fix(feishu): persist dedup cache across gateway restarts via warmup (openclaw#31605) thanks @Sid-Qin

Verified:
- pnpm install --frozen-lockfile
- pnpm build
- pnpm check
- pnpm test:macmini (fails on unrelated baseline test: src/config/config.legacy-config-detection.rejects-routing-allowfrom.test.ts)

Co-authored-by: Sid-Qin <201593046+Sid-Qin@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
Sid
2026-03-03 07:30:40 +08:00
committed by GitHub
parent 132794fe74
commit 481da215b9
5 changed files with 106 additions and 0 deletions

View File

@@ -149,6 +149,7 @@ Docs: https://docs.openclaw.ai
- Feishu/Plugin sdk compatibility: add safe webhook default fallbacks when loading Feishu monitor state so mixed-version installs no longer crash if older `openclaw/plugin-sdk` builds omit webhook default constants. (#31606)
- Feishu/Inbound debounce: debounce rapid same-chat sender bursts into one ordered dispatch turn, skip already-processed retries when composing merged text, and preserve bot-mention intent across merged entries to reduce duplicate or late inbound handling. (#31548)
- Feishu/Inbound ordering: serialize message handling per chat while preserving cross-chat concurrency to avoid same-chat race drops under bursty inbound traffic. (#31807)
- Feishu/Dedup restart resilience: warm persistent dedup state into memory on monitor startup so retry events after gateway restart stay suppressed without requiring initial on-disk probe misses. (#31605)
- BlueBubbles/Message metadata: harden send response ID extraction, include sender identity in DM context, and normalize inbound `message_id` selection to avoid duplicate ID metadata. (#23970) Thanks @tyler6204.
- Docker/Image health checks: add Dockerfile `HEALTHCHECK` that probes gateway `GET /healthz` so container runtimes can mark unhealthy instances without requiring auth credentials in the probe command. (#11478) Thanks @U-C4N and @vincentkoc.
- Docker/Sandbox bootstrap hardening: make `OPENCLAW_SANDBOX` opt-in parsing explicit (`1|true|yes|on`), support custom Docker socket paths via `OPENCLAW_DOCKER_SOCKET`, defer docker.sock exposure until sandbox prerequisites pass, and reset/roll back persisted sandbox mode to `off` when setup is skipped or partially fails to avoid stale broken sandbox state. (#29974) Thanks @jamtujest and @vincentkoc.

View File

@@ -89,3 +89,12 @@ export async function hasRecordedMessagePersistent(
return false;
}
}
export async function warmupDedupFromDisk(
namespace: string,
log?: (...args: unknown[]) => void,
): Promise<number> {
return persistentDedupe.warmup(namespace, (error) => {
log?.(`feishu-dedup: warmup disk error: ${String(error)}`);
});
}

View File

@@ -16,6 +16,7 @@ import {
hasRecordedMessagePersistent,
tryRecordMessage,
tryRecordMessagePersistent,
warmupDedupFromDisk,
} from "./dedup.js";
import { isMentionForwardRequest } from "./mention.js";
import { fetchBotOpenIdForMonitor } from "./monitor.startup.js";
@@ -510,6 +511,11 @@ export async function monitorSingleAccount(params: MonitorSingleAccountParams):
throw new Error(`Feishu account "${accountId}" webhook mode requires verificationToken`);
}
const warmupCount = await warmupDedupFromDisk(accountId, log);
if (warmupCount > 0) {
log(`feishu[${accountId}]: dedup warmup loaded ${warmupCount} entries from disk`);
}
const eventDispatcher = createEventDispatcher(account);
const chatHistories = new Map<string, HistoryEntry[]>();

View File

@@ -70,4 +70,69 @@ describe("createPersistentDedupe", () => {
expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(true);
expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(false);
});
it("warmup loads persisted entries into memory", async () => {
const root = await makeTmpRoot();
const resolveFilePath = (namespace: string) => path.join(root, `${namespace}.json`);
const writer = createPersistentDedupe({
ttlMs: 24 * 60 * 60 * 1000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath,
});
expect(await writer.checkAndRecord("msg-1", { namespace: "acct" })).toBe(true);
expect(await writer.checkAndRecord("msg-2", { namespace: "acct" })).toBe(true);
const reader = createPersistentDedupe({
ttlMs: 24 * 60 * 60 * 1000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath,
});
const loaded = await reader.warmup("acct");
expect(loaded).toBe(2);
expect(await reader.checkAndRecord("msg-1", { namespace: "acct" })).toBe(false);
expect(await reader.checkAndRecord("msg-2", { namespace: "acct" })).toBe(false);
expect(await reader.checkAndRecord("msg-3", { namespace: "acct" })).toBe(true);
});
it("warmup returns 0 when no disk file exists", async () => {
const root = await makeTmpRoot();
const dedupe = createPersistentDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath: (ns) => path.join(root, `${ns}.json`),
});
const loaded = await dedupe.warmup("nonexistent");
expect(loaded).toBe(0);
});
it("warmup skips expired entries", async () => {
const root = await makeTmpRoot();
const resolveFilePath = (namespace: string) => path.join(root, `${namespace}.json`);
const ttlMs = 1000;
const writer = createPersistentDedupe({
ttlMs,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath,
});
const oldNow = Date.now() - 2000;
expect(await writer.checkAndRecord("old-msg", { namespace: "acct", now: oldNow })).toBe(true);
expect(await writer.checkAndRecord("new-msg", { namespace: "acct" })).toBe(true);
const reader = createPersistentDedupe({
ttlMs,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath,
});
const loaded = await reader.warmup("acct");
expect(loaded).toBe(1);
expect(await reader.checkAndRecord("old-msg", { namespace: "acct" })).toBe(true);
expect(await reader.checkAndRecord("new-msg", { namespace: "acct" })).toBe(false);
});
});

View File

@@ -22,6 +22,7 @@ export type PersistentDedupeCheckOptions = {
export type PersistentDedupe = {
checkAndRecord: (key: string, options?: PersistentDedupeCheckOptions) => Promise<boolean>;
warmup: (namespace?: string, onError?: (error: unknown) => void) => Promise<number>;
clearMemory: () => void;
memorySize: () => number;
};
@@ -127,10 +128,33 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
return !duplicate;
} catch (error) {
onDiskError?.(error);
memory.check(scopedKey, now);
return true;
}
}
async function warmup(namespace = "global", onError?: (error: unknown) => void): Promise<number> {
const filePath = options.resolveFilePath(namespace);
const now = Date.now();
try {
const { value } = await readJsonFileWithFallback<PersistentDedupeData>(filePath, {});
const data = sanitizeData(value);
let loaded = 0;
for (const [key, ts] of Object.entries(data)) {
if (ttlMs > 0 && now - ts >= ttlMs) {
continue;
}
const scopedKey = `${namespace}:${key}`;
memory.check(scopedKey, ts);
loaded++;
}
return loaded;
} catch (error) {
onError?.(error);
return 0;
}
}
async function checkAndRecord(
key: string,
dedupeOptions?: PersistentDedupeCheckOptions,
@@ -158,6 +182,7 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
return {
checkAndRecord,
warmup,
clearMemory: () => memory.clear(),
memorySize: () => memory.size(),
};