mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 10:30:44 +00:00
refactor(feishu): reuse persistent dedupe lookups
This commit is contained in:
@@ -1,10 +1,6 @@
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import {
|
||||
createDedupeCache,
|
||||
createPersistentDedupe,
|
||||
readJsonFileWithFallback,
|
||||
} from "./dedup-runtime-api.js";
|
||||
import { createDedupeCache, createPersistentDedupe } from "./dedup-runtime-api.js";
|
||||
|
||||
// Persistent TTL: 24 hours — survives restarts & WebSocket reconnects.
|
||||
const DEDUP_TTL_MS = 24 * 60 * 60 * 1000;
|
||||
@@ -12,9 +8,6 @@ const MEMORY_MAX_SIZE = 1_000;
|
||||
const FILE_MAX_ENTRIES = 10_000;
|
||||
const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000;
|
||||
const EVENT_MEMORY_MAX_SIZE = 2_000;
|
||||
type PersistentDedupeData = Record<string, number>;
|
||||
|
||||
const memoryDedupe = createDedupeCache({ ttlMs: DEDUP_TTL_MS, maxSize: MEMORY_MAX_SIZE });
|
||||
const processingClaims = createDedupeCache({
|
||||
ttlMs: EVENT_DEDUP_TTL_MS,
|
||||
maxSize: EVENT_MEMORY_MAX_SIZE,
|
||||
@@ -59,17 +52,6 @@ function normalizeMessageId(messageId: string | undefined | null): string | null
|
||||
return trimmed ? trimmed : null;
|
||||
}
|
||||
|
||||
function resolveMemoryDedupeKey(
|
||||
namespace: string,
|
||||
messageId: string | undefined | null,
|
||||
): string | null {
|
||||
const trimmed = normalizeMessageId(messageId);
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
}
|
||||
return `${namespace}:${trimmed}`;
|
||||
}
|
||||
|
||||
export function tryBeginFeishuMessageProcessing(
|
||||
messageId: string | undefined | null,
|
||||
namespace = "global",
|
||||
@@ -92,17 +74,12 @@ export async function finalizeFeishuMessageProcessing(params: {
|
||||
}): Promise<boolean> {
|
||||
const { messageId, namespace = "global", log, claimHeld = false } = params;
|
||||
const normalizedMessageId = normalizeMessageId(messageId);
|
||||
const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
|
||||
if (!memoryKey || !normalizedMessageId) {
|
||||
if (!normalizedMessageId) {
|
||||
return false;
|
||||
}
|
||||
if (!claimHeld && !tryBeginFeishuMessageProcessing(normalizedMessageId, namespace)) {
|
||||
return false;
|
||||
}
|
||||
if (!tryRecordMessage(memoryKey)) {
|
||||
releaseFeishuMessageProcessing(normalizedMessageId, namespace);
|
||||
return false;
|
||||
}
|
||||
if (!(await tryRecordMessagePersistent(normalizedMessageId, namespace, log))) {
|
||||
releaseFeishuMessageProcessing(normalizedMessageId, namespace);
|
||||
return false;
|
||||
@@ -116,11 +93,9 @@ export async function recordProcessedFeishuMessage(
|
||||
log?: (...args: unknown[]) => void,
|
||||
): Promise<boolean> {
|
||||
const normalizedMessageId = normalizeMessageId(messageId);
|
||||
const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
|
||||
if (!memoryKey || !normalizedMessageId) {
|
||||
if (!normalizedMessageId) {
|
||||
return false;
|
||||
}
|
||||
tryRecordMessage(memoryKey);
|
||||
return await tryRecordMessagePersistent(normalizedMessageId, namespace, log);
|
||||
}
|
||||
|
||||
@@ -130,32 +105,12 @@ export async function hasProcessedFeishuMessage(
|
||||
log?: (...args: unknown[]) => void,
|
||||
): Promise<boolean> {
|
||||
const normalizedMessageId = normalizeMessageId(messageId);
|
||||
const memoryKey = resolveMemoryDedupeKey(namespace, messageId);
|
||||
if (!memoryKey || !normalizedMessageId) {
|
||||
if (!normalizedMessageId) {
|
||||
return false;
|
||||
}
|
||||
if (hasRecordedMessage(memoryKey)) {
|
||||
return true;
|
||||
}
|
||||
return hasRecordedMessagePersistent(normalizedMessageId, namespace, log);
|
||||
}
|
||||
|
||||
/**
|
||||
* Synchronous dedup — memory only.
|
||||
* Kept for backward compatibility; prefer {@link tryRecordMessagePersistent}.
|
||||
*/
|
||||
export function tryRecordMessage(messageId: string): boolean {
|
||||
return !memoryDedupe.check(messageId);
|
||||
}
|
||||
|
||||
export function hasRecordedMessage(messageId: string): boolean {
|
||||
const trimmed = messageId.trim();
|
||||
if (!trimmed) {
|
||||
return false;
|
||||
}
|
||||
return memoryDedupe.peek(trimmed);
|
||||
}
|
||||
|
||||
export async function tryRecordMessagePersistent(
|
||||
messageId: string,
|
||||
namespace = "global",
|
||||
@@ -174,23 +129,12 @@ export async function hasRecordedMessagePersistent(
|
||||
namespace = "global",
|
||||
log?: (...args: unknown[]) => void,
|
||||
): Promise<boolean> {
|
||||
const trimmed = messageId.trim();
|
||||
if (!trimmed) {
|
||||
return false;
|
||||
}
|
||||
const now = Date.now();
|
||||
const filePath = resolveNamespaceFilePath(namespace);
|
||||
try {
|
||||
const { value } = await readJsonFileWithFallback<PersistentDedupeData>(filePath, {});
|
||||
const seenAt = value[trimmed];
|
||||
if (typeof seenAt !== "number" || !Number.isFinite(seenAt)) {
|
||||
return false;
|
||||
}
|
||||
return DEDUP_TTL_MS <= 0 || now - seenAt < DEDUP_TTL_MS;
|
||||
} catch (error) {
|
||||
log?.(`feishu-dedup: persistent peek failed: ${String(error)}`);
|
||||
return false;
|
||||
}
|
||||
return persistentDedupe.hasRecent(messageId, {
|
||||
namespace,
|
||||
onDiskError: (error) => {
|
||||
log?.(`feishu-dedup: persistent peek failed: ${String(error)}`);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export async function warmupDedupFromDisk(
|
||||
|
||||
Reference in New Issue
Block a user