diff --git a/CHANGELOG.md b/CHANGELOG.md index 714384f9116..019bf9029a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -149,6 +149,7 @@ Docs: https://docs.openclaw.ai - Feishu/Plugin sdk compatibility: add safe webhook default fallbacks when loading Feishu monitor state so mixed-version installs no longer crash if older `openclaw/plugin-sdk` builds omit webhook default constants. (#31606) - Feishu/Inbound debounce: debounce rapid same-chat sender bursts into one ordered dispatch turn, skip already-processed retries when composing merged text, and preserve bot-mention intent across merged entries to reduce duplicate or late inbound handling. (#31548) - Feishu/Inbound ordering: serialize message handling per chat while preserving cross-chat concurrency to avoid same-chat race drops under bursty inbound traffic. (#31807) +- Feishu/Dedup restart resilience: warm persistent dedup state into memory on monitor startup so retry events after gateway restart stay suppressed without requiring initial on-disk probe misses. (#31605) - BlueBubbles/Message metadata: harden send response ID extraction, include sender identity in DM context, and normalize inbound `message_id` selection to avoid duplicate ID metadata. (#23970) Thanks @tyler6204. - Docker/Image health checks: add Dockerfile `HEALTHCHECK` that probes gateway `GET /healthz` so container runtimes can mark unhealthy instances without requiring auth credentials in the probe command. (#11478) Thanks @U-C4N and @vincentkoc. - Docker/Sandbox bootstrap hardening: make `OPENCLAW_SANDBOX` opt-in parsing explicit (`1|true|yes|on`), support custom Docker socket paths via `OPENCLAW_DOCKER_SOCKET`, defer docker.sock exposure until sandbox prerequisites pass, and reset/roll back persisted sandbox mode to `off` when setup is skipped or partially fails to avoid stale broken sandbox state. (#29974) Thanks @jamtujest and @vincentkoc. diff --git a/extensions/feishu/src/dedup.ts b/extensions/feishu/src/dedup.ts index ffeb5b76944..408a53d5d1a 100644 --- a/extensions/feishu/src/dedup.ts +++ b/extensions/feishu/src/dedup.ts @@ -89,3 +89,12 @@ export async function hasRecordedMessagePersistent( return false; } } + +export async function warmupDedupFromDisk( + namespace: string, + log?: (...args: unknown[]) => void, +): Promise { + return persistentDedupe.warmup(namespace, (error) => { + log?.(`feishu-dedup: warmup disk error: ${String(error)}`); + }); +} diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index 655a2510234..4e8d30b2359 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -16,6 +16,7 @@ import { hasRecordedMessagePersistent, tryRecordMessage, tryRecordMessagePersistent, + warmupDedupFromDisk, } from "./dedup.js"; import { isMentionForwardRequest } from "./mention.js"; import { fetchBotOpenIdForMonitor } from "./monitor.startup.js"; @@ -510,6 +511,11 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams): throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`); } + const warmupCount = await warmupDedupFromDisk(accountId, log); + if (warmupCount > 0) { + log(`feishu[${accountId}]: dedup warmup loaded ${warmupCount} entries from disk`); + } + const eventDispatcher = createEventDispatcher(account); const chatHistories = new Map(); diff --git a/src/plugin-sdk/persistent-dedupe.test.ts b/src/plugin-sdk/persistent-dedupe.test.ts index e1a1e3faefa..485c143ea75 100644 --- a/src/plugin-sdk/persistent-dedupe.test.ts +++ b/src/plugin-sdk/persistent-dedupe.test.ts @@ -70,4 +70,69 @@ describe("createPersistentDedupe", () => { expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(true); expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(false); }); + + it("warmup loads persisted entries into memory", async () => { + const root = await makeTmpRoot(); + const resolveFilePath = (namespace: string) => path.join(root, `${namespace}.json`); + + const writer = createPersistentDedupe({ + ttlMs: 24 * 60 * 60 * 1000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath, + }); + expect(await writer.checkAndRecord("msg-1", { namespace: "acct" })).toBe(true); + expect(await writer.checkAndRecord("msg-2", { namespace: "acct" })).toBe(true); + + const reader = createPersistentDedupe({ + ttlMs: 24 * 60 * 60 * 1000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath, + }); + const loaded = await reader.warmup("acct"); + expect(loaded).toBe(2); + expect(await reader.checkAndRecord("msg-1", { namespace: "acct" })).toBe(false); + expect(await reader.checkAndRecord("msg-2", { namespace: "acct" })).toBe(false); + expect(await reader.checkAndRecord("msg-3", { namespace: "acct" })).toBe(true); + }); + + it("warmup returns 0 when no disk file exists", async () => { + const root = await makeTmpRoot(); + const dedupe = createPersistentDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath: (ns) => path.join(root, `${ns}.json`), + }); + const loaded = await dedupe.warmup("nonexistent"); + expect(loaded).toBe(0); + }); + + it("warmup skips expired entries", async () => { + const root = await makeTmpRoot(); + const resolveFilePath = (namespace: string) => path.join(root, `${namespace}.json`); + const ttlMs = 1000; + + const writer = createPersistentDedupe({ + ttlMs, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath, + }); + const oldNow = Date.now() - 2000; + expect(await writer.checkAndRecord("old-msg", { namespace: "acct", now: oldNow })).toBe(true); + expect(await writer.checkAndRecord("new-msg", { namespace: "acct" })).toBe(true); + + const reader = createPersistentDedupe({ + ttlMs, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath, + }); + const loaded = await reader.warmup("acct"); + expect(loaded).toBe(1); + expect(await reader.checkAndRecord("old-msg", { namespace: "acct" })).toBe(true); + expect(await reader.checkAndRecord("new-msg", { namespace: "acct" })).toBe(false); + }); }); diff --git a/src/plugin-sdk/persistent-dedupe.ts b/src/plugin-sdk/persistent-dedupe.ts index 947217fda68..0b33824c795 100644 --- a/src/plugin-sdk/persistent-dedupe.ts +++ b/src/plugin-sdk/persistent-dedupe.ts @@ -22,6 +22,7 @@ export type PersistentDedupeCheckOptions = { export type PersistentDedupe = { checkAndRecord: (key: string, options?: PersistentDedupeCheckOptions) => Promise; + warmup: (namespace?: string, onError?: (error: unknown) => void) => Promise; clearMemory: () => void; memorySize: () => number; }; @@ -127,10 +128,33 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis return !duplicate; } catch (error) { onDiskError?.(error); + memory.check(scopedKey, now); return true; } } + async function warmup(namespace = "global", onError?: (error: unknown) => void): Promise { + const filePath = options.resolveFilePath(namespace); + const now = Date.now(); + try { + const { value } = await readJsonFileWithFallback(filePath, {}); + const data = sanitizeData(value); + let loaded = 0; + for (const [key, ts] of Object.entries(data)) { + if (ttlMs > 0 && now - ts >= ttlMs) { + continue; + } + const scopedKey = `${namespace}:${key}`; + memory.check(scopedKey, ts); + loaded++; + } + return loaded; + } catch (error) { + onError?.(error); + return 0; + } + } + async function checkAndRecord( key: string, dedupeOptions?: PersistentDedupeCheckOptions, @@ -158,6 +182,7 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis return { checkAndRecord, + warmup, clearMemory: () => memory.clear(), memorySize: () => memory.size(), };