diff --git a/extensions/whatsapp/src/inbound/dedupe.ts b/extensions/whatsapp/src/inbound/dedupe.ts index 676717fa137..765623acc46 100644 --- a/extensions/whatsapp/src/inbound/dedupe.ts +++ b/extensions/whatsapp/src/inbound/dedupe.ts @@ -1,4 +1,3 @@ -import { createDedupeCache } from "openclaw/plugin-sdk/core"; import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; const RECENT_WEB_MESSAGE_TTL_MS = 20 * 60_000; @@ -6,19 +5,69 @@ const RECENT_WEB_MESSAGE_MAX = 5000; const RECENT_OUTBOUND_MESSAGE_TTL_MS = 20 * 60_000; const RECENT_OUTBOUND_MESSAGE_MAX = 5000; -const recentInboundMessages = createDedupeCache({ - ttlMs: RECENT_WEB_MESSAGE_TTL_MS, - maxSize: RECENT_WEB_MESSAGE_MAX, -}); const claimableInboundMessages = createClaimableDedupe({ ttlMs: RECENT_WEB_MESSAGE_TTL_MS, memoryMaxSize: RECENT_WEB_MESSAGE_MAX, }); -const recentOutboundMessages = createDedupeCache({ +const recentOutboundMessages = createRecentMessageCache({ ttlMs: RECENT_OUTBOUND_MESSAGE_TTL_MS, maxSize: RECENT_OUTBOUND_MESSAGE_MAX, }); +function createRecentMessageCache(options: { ttlMs: number; maxSize: number }) { + const ttlMs = Math.max(0, options.ttlMs); + const maxSize = Math.max(0, Math.floor(options.maxSize)); + const cache = new Map(); + + const prune = (now: number) => { + if (ttlMs > 0) { + const cutoff = now - ttlMs; + for (const [key, timestamp] of cache) { + if (timestamp < cutoff) { + cache.delete(key); + } + } + } + while (cache.size > maxSize) { + const oldest = cache.keys().next().value; + if (!oldest) { + break; + } + cache.delete(oldest); + } + }; + + const peek = (key: string | null, now = Date.now()): boolean => { + if (!key) { + return false; + } + const timestamp = cache.get(key); + if (timestamp === undefined) { + return false; + } + if (ttlMs > 0 && now - timestamp >= ttlMs) { + cache.delete(key); + return false; + } + return true; + }; + + return { + check: (key: string | null, now = Date.now()): boolean => { + if (!key) { + return false; + } + const existed = peek(key, now); + cache.delete(key); + cache.set(key, now); + prune(now); + return existed; + }, + peek, + clear: () => cache.clear(), + }; +} + export class WhatsAppRetryableInboundError extends Error { constructor(message: string, options?: ErrorOptions) { super(message, options); @@ -41,7 +90,6 @@ function buildMessageKey(params: { } export function resetWebInboundDedupe(): void { - recentInboundMessages.clear(); claimableInboundMessages.clearMemory(); recentOutboundMessages.clear(); } @@ -53,7 +101,6 @@ export async function claimRecentInboundMessage(key: string): Promise { export async function commitRecentInboundMessage(key: string): Promise { await claimableInboundMessages.commit(key); - recentInboundMessages.check(key); } export function releaseRecentInboundMessage(key: string, error?: unknown): void {