diff --git a/CHANGELOG.md b/CHANGELOG.md index c20a8f58c53..4ef32c766b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Agents/CLI transcripts: persist successful CLI-backed turns into the OpenClaw session transcript so google-gemini-cli replies appear in session history and the Control UI again. (#67490) Thanks @obviyus. - Discord/tool-call text: strip standalone Gemma-style `...` tool-call payloads from visible assistant text without truncating prose examples or trailing replies. (#67318) Thanks @joelnishanth. - WhatsApp/web-session: drain the pending per-auth creds save queue before reopening sockets so reconnect-time auth bootstrap no longer races in-flight `creds.json` writes and falsely restores from backup. (#67464) Thanks @neeravmakwana. +- BlueBubbles/catchup: add a per-message retry ceiling (`catchup.maxFailureRetries`, default 10) so a persistently-failing message with a malformed payload no longer wedges the catchup cursor forever. After N consecutive `processMessage` failures against the same GUID, catchup logs a WARN, skips that message on subsequent sweeps, and lets the cursor advance past it. Transient failures still retry from the same point as before. Also fixes a lost-update race in the persistent dedupe file lock that silently dropped inbound GUIDs on concurrent writes, a dedupe file naming migration gap on version upgrade, and a balloon-event bypass that let catchup replay debouncer-coalesced events as standalone messages. (#67426, #66870) Thanks @omarshahine. 
## 2026.4.15-beta.1 diff --git a/extensions/bluebubbles/src/catchup.test.ts b/extensions/bluebubbles/src/catchup.test.ts index a87818bdf87..7e4a265dea7 100644 --- a/extensions/bluebubbles/src/catchup.test.ts +++ b/extensions/bluebubbles/src/catchup.test.ts @@ -428,9 +428,13 @@ describe("runBlueBubblesCatchup", () => { // Cursor is held just before the bad message's timestamp so the next // sweep retries it (and re-queries ok1 which dedupe will drop). expect(summary?.failed).toBe(1); + expect(summary?.givenUp).toBe(0); expect(summary?.cursorAfter).toBe(7 * 60 * 1000 - 1); const cursorAfter = await loadBlueBubblesCatchupCursor("test-account"); expect(cursorAfter?.lastSeenMs).toBe(7 * 60 * 1000 - 1); + // Retry counter is persisted so subsequent sweeps know how close we + // are to the give-up ceiling. + expect(cursorAfter?.failureRetries?.bad).toBe(1); }); it("clamps held cursor to previous cursor when failure ts is below it", async () => { @@ -606,6 +610,494 @@ describe("runBlueBubblesCatchup", () => { }); }); +describe("runBlueBubblesCatchup — per-message retry cap", () => { + let stateDir: string; + beforeEach(() => { + stateDir = makeStateDir(); + }); + afterEach(() => { + clearStateDir(stateDir); + vi.restoreAllMocks(); + }); + + it("increments retry counter on each consecutive failure and holds cursor", async () => { + // Three sweeps, all fail on the same GUID. Counter accumulates and + // cursor stays pinned below the failing message so every sweep + // retries it. maxFailureRetries: 5 so we don't give up inside this + // test. 
+ const now1 = 10 * 60 * 1000; + const now2 = now1 + 60 * 1000; + const now3 = now2 + 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + + const target = makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { maxFailureRetries: 5 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }); + + const fetchMessages = async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 })], + }); + const processMessageFn = async () => { + throw new Error("boom"); + }; + + const s1 = await runBlueBubblesCatchup(target, { + now: () => now1, + fetchMessages, + processMessageFn, + }); + const s2 = await runBlueBubblesCatchup(target, { + now: () => now2, + fetchMessages, + processMessageFn, + }); + const s3 = await runBlueBubblesCatchup(target, { + now: () => now3, + fetchMessages, + processMessageFn, + }); + + expect(s1?.failed).toBe(1); + expect(s1?.givenUp).toBe(0); + expect(s2?.givenUp).toBe(0); + expect(s3?.givenUp).toBe(0); + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + expect(cursor?.failureRetries?.wedge).toBe(3); + // Cursor still held just below the wedge message's timestamp. + expect(cursor?.lastSeenMs).toBe(7 * 60 * 1000 - 1); + }); + + it("gives up on the Nth consecutive failure and records count >= max", async () => { + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + // Pre-seed a cursor with retries at the one-before-give-up threshold + // so a single run trips the ceiling. This mirrors what would happen + // after many runs through the incremental-retry path above. 
+ await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 2 }); + + const warnings: string[] = []; + const target = makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { maxFailureRetries: 3 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }); + + const summary = await runBlueBubblesCatchup(target, { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 })], + }), + processMessageFn: async () => { + throw new Error("malformed"); + }, + error: (m) => warnings.push(m), + }); + + expect(summary?.failed).toBe(1); + expect(summary?.givenUp).toBe(1); + // Give-up no longer holds the cursor: it advances to nowMs so the + // wedge message falls out of the next query window entirely. + expect(summary?.cursorAfter).toBe(now); + + const persisted = await loadBlueBubblesCatchupCursor("test-account"); + expect(persisted?.lastSeenMs).toBe(now); + // Counter is persisted at the give-up value so a later sweep that + // still sees the message (e.g., because a different GUID is holding + // the cursor) will recognize the GUID as given up and skip it. + expect(persisted?.failureRetries?.wedge).toBe(3); + + // Distinct WARN log line fired on the give-up transition. + const giveUpWarnings = warnings.filter((w) => w.includes("giving up on guid=")); + expect(giveUpWarnings).toHaveLength(1); + expect(giveUpWarnings[0]).toContain("guid=wedge"); + expect(giveUpWarnings[0]).toContain("3 consecutive failures"); + }); + + it("skips an already-given-up GUID without re-attempting processMessage", async () => { + // Setup: the cursor file was written with wedge already at the + // give-up threshold from a prior run. 
On this run, the cursor is + // held by a different, still-retrying GUID (`held`), so wedge's + // timestamp falls back into the query window. Catchup must skip + // wedge without invoking processMessage on it. + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 3 }); + + const attempted: string[] = []; + const target = makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { maxFailureRetries: 3 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }); + + const summary = await runBlueBubblesCatchup(target, { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "held", dateCreated: 6 * 60 * 1000 }), + makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 }), + ], + }), + processMessageFn: async (m) => { + attempted.push(m.messageId ?? "?"); + if (m.messageId === "held") { + throw new Error("transient"); + } + }, + }); + + // processMessage never runs for wedge. + expect(attempted).toEqual(["held"]); + expect(summary?.skippedGivenUp).toBe(1); + expect(summary?.failed).toBe(1); + expect(summary?.givenUp).toBe(0); + // Cursor held at `held` so held keeps retrying next sweep. + expect(summary?.cursorAfter).toBe(6 * 60 * 1000 - 1); + + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + // Both entries preserved: held at count 1 (still retrying), + // wedge at count 3 (given up, sticky). + expect(cursor?.failureRetries?.held).toBe(1); + expect(cursor?.failureRetries?.wedge).toBe(3); + }); + + it("clears the retry counter on successful processing", async () => { + // GUID recovered after a transient failure. The counter must drop + // so the next failure starts fresh (not carrying forward stale + // retry history). 
+ const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { flaky: 4 }); + + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "flaky", dateCreated: 6 * 60 * 1000 })], + }), + processMessageFn: async () => { + /* succeeds */ + }, + }); + + expect(summary?.replayed).toBe(1); + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + expect(cursor?.failureRetries?.flaky).toBeUndefined(); + // When the map is empty, the field itself is omitted from the file. + expect(cursor?.failureRetries).toBeUndefined(); + expect(cursor?.lastSeenMs).toBe(now); + }); + + it("resolves 'earlier retry + later give-up' by holding cursor at earlier and skipping later", async () => { + // This is the key scenario issue #66870 exists to solve. GUID A at + // t=6min is still retrying (count=1). GUID B at t=7min has been + // failing for many runs and crosses the ceiling on this run. The + // wrong answer is "advance cursor past B to t=7min" — that would + // lose A. The right answer is "hold cursor below A, record B as + // given-up, skip B on sight next run". 
+ const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { giveUpHere: 2 }); + + const target = makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { maxFailureRetries: 3 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }); + + const summary = await runBlueBubblesCatchup(target, { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "retryEarlier", dateCreated: 6 * 60 * 1000 }), + makeBbMessage({ guid: "giveUpHere", dateCreated: 7 * 60 * 1000 }), + ], + }), + processMessageFn: async () => { + throw new Error("failing"); + }, + }); + + expect(summary?.failed).toBe(2); + expect(summary?.givenUp).toBe(1); + // Cursor held at (earlier message ts - 1) so retryEarlier keeps retrying. + expect(summary?.cursorAfter).toBe(6 * 60 * 1000 - 1); + + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + expect(cursor?.failureRetries?.retryEarlier).toBe(1); + // Give-up counter preserved at or above the threshold. + expect(cursor?.failureRetries?.giveUpHere).toBe(3); + }); + + it("uses the default retry cap when maxFailureRetries is omitted from config", async () => { + // Boot-strap: record 9 failures, then a 10th should trigger give-up + // at the default threshold. We pre-seed the counter at 9 so this + // single-run test doesn't need to iterate the whole sequence. 
+ const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 9 }); + + const warnings: string[] = []; + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "wedge", dateCreated: 6 * 60 * 1000 })], + }), + processMessageFn: async () => { + throw new Error("boom"); + }, + error: (m) => warnings.push(m), + }); + expect(summary?.givenUp).toBe(1); + expect(warnings.some((w) => w.includes("giving up on guid=wedge"))).toBe(true); + expect(warnings.some((w) => w.includes("10 consecutive failures"))).toBe(true); + }); + + it("clamps maxFailureRetries to >= 1 when configured to zero or negative", async () => { + // With clamp floor of 1, the first failure already meets count >= 1 + // so catchup gives up immediately on first attempt. + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + + const summary = await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { maxFailureRetries: 0 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "wedge", dateCreated: 6 * 60 * 1000 })], + }), + processMessageFn: async () => { + throw new Error("boom"); + }, + }, + ); + expect(summary?.givenUp).toBe(1); + expect(summary?.cursorAfter).toBe(now); + }); + + it("loads cleanly from a legacy cursor file without a failureRetries field", async () => { + // Older cursor files (written before this field existed) must still + // parse. Round-trip: save without the field (legacy path), then + // run catchup and confirm a normal sweep proceeds. 
+ await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + const loaded = await loadBlueBubblesCatchupCursor("test-account"); + expect(loaded?.lastSeenMs).toBe(5 * 60 * 1000); + expect(loaded?.failureRetries).toBeUndefined(); + + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => 10 * 60 * 1000, + fetchMessages: async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "ok", dateCreated: 6 * 60 * 1000 })], + }), + processMessageFn: async () => {}, + }); + expect(summary?.replayed).toBe(1); + }); + + it("drops retry entries for GUIDs that are no longer in the query window", async () => { + // A stale entry carried in the cursor file (e.g., from an older + // run whose cursor has since advanced past its timestamp) should + // NOT be carried forward if the GUID does not appear in the + // current fetch. Otherwise the map grows without bound over time. + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { + staleGuid: 2, + alsoStale: 5, + }); + + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + // Fetch returns entirely different GUIDs from the stored map. + messages: [makeBbMessage({ guid: "fresh", dateCreated: 6 * 60 * 1000 })], + }), + processMessageFn: async () => {}, + }); + expect(summary?.replayed).toBe(1); + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + // Both stale entries dropped; no new entries since the fresh message + // succeeded. + expect(cursor?.failureRetries).toBeUndefined(); + }); + + it("preserves stickiness when a given-up GUID reappears and fails again", async () => { + // Setup: cursor advanced, but held by a newer still-retrying GUID + // `held`. The wedge GUID is already given up from a prior run and + // still appears because `held` is holding the cursor below it. 
+ // Catchup must continue to skip wedge on sight across many runs + // without ever calling processMessage on it. + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { + wedge: 10, + held: 1, + }); + + const attempted: string[] = []; + const target = makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { maxFailureRetries: 5 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }); + const fetchMessages = async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "held", dateCreated: 6 * 60 * 1000 }), + makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 }), + ], + }); + const processMessageFn = async () => { + throw new Error("still broken"); + }; + + for (let i = 0; i < 3; i++) { + await runBlueBubblesCatchup(target, { + now: () => now + i, + fetchMessages, + processMessageFn: async (m) => { + attempted.push(m.messageId ?? "?"); + return processMessageFn(); + }, + }); + } + // wedge is NEVER attempted despite reappearing every sweep. + expect(attempted.filter((g) => g === "wedge")).toHaveLength(0); + // held is attempted every sweep. 
+ expect(attempted.filter((g) => g === "held")).toHaveLength(3); + }); + + it("summary.skippedGivenUp counter is zero on a clean run", async () => { + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => 10_000, + fetchMessages: async () => ({ resolved: true, messages: [] }), + processMessageFn: async () => {}, + }); + expect(summary?.skippedGivenUp).toBe(0); + expect(summary?.givenUp).toBe(0); + }); +}); + +describe("saveBlueBubblesCatchupCursor + loadBlueBubblesCatchupCursor — retry map", () => { + let stateDir: string; + beforeEach(() => { + stateDir = makeStateDir(); + }); + afterEach(() => { + clearStateDir(stateDir); + }); + + it("round-trips an empty retry map by omitting the field from the persisted shape", async () => { + await saveBlueBubblesCatchupCursor("acct", 100, {}); + const loaded = await loadBlueBubblesCatchupCursor("acct"); + expect(loaded?.lastSeenMs).toBe(100); + expect(loaded?.failureRetries).toBeUndefined(); + }); + + it("round-trips a populated retry map", async () => { + await saveBlueBubblesCatchupCursor("acct", 100, { a: 1, b: 9 }); + const loaded = await loadBlueBubblesCatchupCursor("acct"); + expect(loaded?.failureRetries).toEqual({ a: 1, b: 9 }); + }); + + it("filters malformed retry entries during load (zero, negative, non-numeric)", async () => { + // Use the public save to produce the on-disk file, then overwrite + // its contents with a hand-crafted payload to exercise the loader's + // sanitization independently of what the saver would emit. 
+ await saveBlueBubblesCatchupCursor("acct", 100); + const stateRoot = process.env.OPENCLAW_STATE_DIR; + if (!stateRoot) { + throw new Error("OPENCLAW_STATE_DIR must be set by the test harness"); + } + const dir = path.join(stateRoot, "bluebubbles", "catchup"); + const files = fs.readdirSync(dir); + expect(files).toHaveLength(1); + const firstFile = files[0]; + if (!firstFile) { + throw new Error("expected a cursor file to exist after save"); + } + const badCursor = { + lastSeenMs: 100, + updatedAt: 0, + failureRetries: { + good: 3, + zero: 0, + negative: -1, + notANumber: "oops", + infinite: Number.POSITIVE_INFINITY, + nan: Number.NaN, + }, + }; + fs.writeFileSync(path.join(dir, firstFile), JSON.stringify(badCursor)); + + const loaded = await loadBlueBubblesCatchupCursor("acct"); + expect(loaded?.lastSeenMs).toBe(100); + expect(loaded?.failureRetries).toEqual({ good: 3 }); + }); +}); + describe("fetchBlueBubblesMessagesSince", () => { it("returns resolved:false when the network call throws", async () => { // Point at a port nothing is listening on so fetch fails fast. 
diff --git a/extensions/bluebubbles/src/catchup.ts b/extensions/bluebubbles/src/catchup.ts index 3c1aba299be..0aa70342264 100644 --- a/extensions/bluebubbles/src/catchup.ts +++ b/extensions/bluebubbles/src/catchup.ts @@ -4,6 +4,7 @@ import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plug import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import { resolveBlueBubblesServerAccount } from "./account-resolve.js"; +import { warmupBlueBubblesInboundDedupe } from "./inbound-dedupe.js"; import { asRecord, normalizeWebhookMessage } from "./monitor-normalize.js"; import { processMessage } from "./monitor-processing.js"; import type { WebhookTarget } from "./monitor-shared.js"; @@ -21,6 +22,14 @@ const MAX_MAX_AGE_MINUTES = 12 * 60; const DEFAULT_PER_RUN_LIMIT = 50; const MAX_PER_RUN_LIMIT = 500; const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30; +const DEFAULT_MAX_FAILURE_RETRIES = 10; +const MAX_MAX_FAILURE_RETRIES = 1_000; +// Defense-in-depth bound: a runaway retry map (e.g., a storm of unique +// failing GUIDs) should not balloon the cursor file unboundedly. When the +// map exceeds this size, we keep only the highest-count entries (the ones +// closest to being given up) and drop the rest. Realistic backlogs stay +// well under this; the bound exists to cap pathological growth. +const MAX_FAILURE_RETRY_MAP_SIZE = 5_000; const FETCH_TIMEOUT_MS = 15_000; export type BlueBubblesCatchupConfig = { @@ -28,6 +37,13 @@ export type BlueBubblesCatchupConfig = { maxAgeMinutes?: number; perRunLimit?: number; firstRunLookbackMinutes?: number; + /** + * Per-message retry ceiling. After this many consecutive failed + * `processMessage` attempts against the same GUID, catchup logs a WARN + * and force-advances the cursor past the wedged message instead of + * holding it indefinitely. Defaults to 10. Clamped to [1, 1000]. 
+ */ + maxFailureRetries?: number; }; export type BlueBubblesCatchupSummary = { @@ -35,7 +51,21 @@ export type BlueBubblesCatchupSummary = { replayed: number; skippedFromMe: number; skippedPreCursor: number; + /** + * Messages whose GUID was already recorded as "given up" from a previous + * run (count >= `maxFailureRetries`). These are skipped without calling + * `processMessage` again. Lets the cursor continue advancing past the + * wedged message on the next sweep while avoiding another failed attempt. + */ + skippedGivenUp: number; failed: number; + /** + * Messages that crossed the `maxFailureRetries` ceiling ON THIS RUN. + * Each transition triggers a WARN log line. Already-given-up messages + * in subsequent runs count under `skippedGivenUp`, not here. Lets + * operators distinguish fresh give-up events from steady-state skips. + */ + givenUp: number; cursorBefore: number | null; cursorAfter: number; windowStartMs: number; @@ -43,7 +73,24 @@ export type BlueBubblesCatchupSummary = { fetchedCount: number; }; -export type BlueBubblesCatchupCursor = { lastSeenMs: number; updatedAt: number }; +export type BlueBubblesCatchupCursor = { + lastSeenMs: number; + updatedAt: number; + /** + * Per-GUID failure counter, preserved across runs. Two states: + * - `1 <= count < maxFailureRetries`: the GUID is still retrying and + * continues to hold the cursor back. + * - `count >= maxFailureRetries`: catchup has "given up" on the GUID. + * The message is skipped on sight (no `processMessage` attempt) and + * the GUID no longer holds the cursor. The entry stays in the map + * until the cursor naturally advances past the message's timestamp + * (at which point the message stops appearing in queries entirely). + * + * A successful `processMessage` removes the entry. Optional on the + * persisted shape so older cursor files without this field load cleanly. 
+ */ + failureRetries?: Record<string, number>; }; function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { // Explicit OPENCLAW_STATE_DIR overrides take precedence (including @@ -81,6 +128,26 @@ function resolveCursorFilePath(accountId: string): string { ); } +function sanitizeFailureRetriesInput(raw: unknown): Record<string, number> { + // Older cursor files don't carry this field; also guard against + // hand-edited JSON or future shape drift. Drop any entry whose count is + // not a finite positive integer so downstream arithmetic stays sound. + if (!raw || typeof raw !== "object") { + return {}; + } + const out: Record<string, number> = {}; + for (const [guid, count] of Object.entries(raw as Record<string, unknown>)) { + if (!guid || typeof guid !== "string") { + continue; + } + if (typeof count !== "number" || !Number.isFinite(count) || count <= 0) { + continue; + } + out[guid] = Math.floor(count); + } + return out; +} + export async function loadBlueBubblesCatchupCursor( accountId: string, ): Promise<BlueBubblesCatchupCursor | null> { @@ -92,18 +159,66 @@ if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) { return null; } - return value; + const failureRetries = sanitizeFailureRetriesInput(value.failureRetries); + const hasRetries = Object.keys(failureRetries).length > 0; + // Keep the shape consistent with what the writer emits: only carry the + // `failureRetries` key when there's something to retry. Old cursor files + // without the field continue to round-trip to the same shape. + return { + lastSeenMs: value.lastSeenMs, + updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0, + ...(hasRetries ? 
{ failureRetries } : {}), + }; } export async function saveBlueBubblesCatchupCursor( accountId: string, lastSeenMs: number, + failureRetries?: Record<string, number>, ): Promise<void> { const filePath = resolveCursorFilePath(accountId); - const cursor: BlueBubblesCatchupCursor = { lastSeenMs, updatedAt: Date.now() }; + const sanitized = sanitizeFailureRetriesInput(failureRetries); + const hasRetries = Object.keys(sanitized).length > 0; + const cursor: BlueBubblesCatchupCursor = { + lastSeenMs, + updatedAt: Date.now(), + // Only emit the field when non-empty so unrelated cursor writes from + // the happy path don't bloat the cursor file with `"failureRetries": {}`. + ...(hasRetries ? { failureRetries: sanitized } : {}), + }; await writeJsonFileAtomically(filePath, cursor); } +/** + * Bound the retry map so a pathological storm of unique failing GUIDs + * cannot grow the cursor file without limit. Keeps the `maxSize` entries + * with the highest counts (closest to give-up) when over the bound. + * + * The map is already scoped to "currently failing, still-retrying" GUIDs + * and prunes on every run (entries not observed in the fetched window are + * dropped), so this is a defense-in-depth cap, not the primary pruning + * mechanism. + */ +function capFailureRetriesMap( + map: Record<string, number>, + maxSize: number, +): Record<string, number> { + const entries = Object.entries(map); + if (entries.length <= maxSize) { + return map; + } + // Sort by count desc; stable tiebreak on guid string so the retained set + // is deterministic across runs (important for cursor-file diffing during + // debugging). + entries.sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0])); + const capped: Record<string, number> = {}; + for (let i = 0; i < maxSize; i++) { + const [guid, count] = entries[i]; + capped[guid] = count; + } + return capped; +} + type FetchOpts = { baseUrl: string; password: string; @@ -180,10 +295,15 @@ function clampCatchupConfig(raw?: BlueBubblesCatchupConfig) { Math.max(raw?.firstRunLookbackMinutes ?? 
DEFAULT_FIRST_RUN_LOOKBACK_MINUTES, 1), MAX_MAX_AGE_MINUTES, ); + const maxFailureRetries = Math.min( + Math.max(Math.floor(raw?.maxFailureRetries ?? DEFAULT_MAX_FAILURE_RETRIES), 1), + MAX_MAX_FAILURE_RETRIES, + ); return { maxAgeMs: maxAgeMinutes * 60_000, perRunLimit, firstRunLookbackMs: firstRunLookbackMinutes * 60_000, + maxFailureRetries, }; } @@ -247,10 +367,11 @@ async function runBlueBubblesCatchupInner( const procFn = deps.processMessageFn ?? processMessage; const accountId = target.account.accountId; - const { maxAgeMs, perRunLimit, firstRunLookbackMs } = clampCatchupConfig(raw); + const { maxAgeMs, perRunLimit, firstRunLookbackMs, maxFailureRetries } = clampCatchupConfig(raw); const nowMs = now(); const existing = await loadBlueBubblesCatchupCursor(accountId).catch(() => null); const cursorBefore = existing?.lastSeenMs ?? null; + const prevRetries = existing?.failureRetries ?? {}; // Catchup runs once per gateway startup (called from monitor.ts after // webhook target registration). We deliberately do NOT short-circuit on @@ -295,6 +416,15 @@ async function runBlueBubblesCatchupInner( return null; } + // Ensure legacy→hashed dedupe file migration runs and the on-disk store + // is warm before we replay. Without this, an upgrade from a version that + // used the old `${safe}.json` naming to the current `${safe}__${hash}.json` + // would start with an empty dedupe cache and re-dispatch every message in + // the catchup window — producing duplicate replies. 
+ await warmupBlueBubblesInboundDedupe(accountId).catch((err) => { + error?.(`[${accountId}] BlueBubbles catchup: dedupe warmup failed: ${String(err)}`); + }); + const { resolved, messages } = await fetchFn(windowStartMs, perRunLimit, { baseUrl, password, @@ -306,7 +436,9 @@ async function runBlueBubblesCatchupInner( replayed: 0, skippedFromMe: 0, skippedPreCursor: 0, + skippedGivenUp: 0, failed: 0, + givenUp: 0, cursorBefore, cursorAfter: nowMs, windowStartMs, @@ -320,18 +452,31 @@ async function runBlueBubblesCatchupInner( return summary; } - // Track the earliest timestamp where `processMessage` threw so we never - // advance the cursor past a retryable failure. Normalize failures (the - // record didn't yield a usable NormalizedWebhookMessage) are treated as - // permanent skips and do NOT block cursor advance — those payloads are - // unlikely to ever normalize on retry, and blocking on them would wedge - // catchup forever. + // Track the earliest timestamp where `processMessage` threw *and* the + // failing message has not yet crossed the per-GUID retry ceiling, so we + // never advance the cursor past a retryable failure. Normalize failures + // (the record didn't yield a usable NormalizedWebhookMessage) are + // treated as permanent skips and do NOT block cursor advance — those + // payloads are unlikely to ever normalize on retry, and blocking on + // them would wedge catchup forever. Given-up messages (count >= max) + // also do NOT contribute here; see `skippedGivenUp` below. let earliestProcessFailureTs: number | null = null; // Track the latest fetched message timestamp regardless of fate, so a // truncated query (fetchedCount === perRunLimit) can advance the cursor // exactly to the page boundary. Without this, the unfetched tail past // the cap is permanently unreachable. let latestFetchedTs = windowStartMs; + // Next-run retry map. 
+ Built from scratch each run so entries for GUIDs + that didn't appear in this fetch are dropped (the cursor has + advanced past them and they will never be queried again). Entries we + do carry forward encode two states via the stored count: + - `1 <= count < maxFailureRetries`: still-retrying, holds cursor. + - `count >= maxFailureRetries`: given-up, skipped on sight without + another `processMessage` attempt. Preserving the count is what + keeps the give-up state sticky across runs when an earlier + still-retrying failure is holding the cursor and the given-up + message keeps reappearing in the query window. + const nextRetries: Record<string, number> = {}; for (const rec of messages) { // Defense in depth: the server-side `after:` filter should already @@ -353,6 +498,30 @@ continue; } + // Skip tapback/reaction/balloon events. These carry an + // `associatedMessageGuid` pointing at the parent text message and + // have a different `guid` of their own. The live webhook path handles + // balloons via the debouncer, which coalesces them with their parent. + // Without debouncing here, replaying a balloon would dispatch it as a + // standalone message — producing a duplicate reply to the parent. + // + // Guard: only skip when `associatedMessageType` is set (tapbacks and + // reactions — e.g., "like", 2000) OR `balloonBundleId` is set (URL + // previews, stickers). iMessage threaded replies use a separate + // `threadOriginatorGuid` field and do NOT set either of these, so + // they pass through for correct catchup replay. + const assocGuid = + typeof rec.associatedMessageGuid === "string" + ? rec.associatedMessageGuid.trim() + : typeof rec.associated_message_guid === "string" + ? rec.associated_message_guid.trim() + : ""; + const assocType = rec.associatedMessageType ?? rec.associated_message_type; + const balloonId = typeof rec.balloonBundleId === "string" ? 
rec.balloonBundleId.trim() : ""; + if (assocGuid && (assocType != null || balloonId)) { + continue; + } + const normalized = normalizeWebhookMessage({ type: "new-message", data: rec }); if (!normalized) { summary.failed++; @@ -363,15 +532,62 @@ async function runBlueBubblesCatchupInner( continue; } + // Prefer the normalized messageId (what the dedupe cache uses) so the + // retry counter and downstream dedupe key agree on identity. Fall + // back to the raw BB `guid` only when normalization didn't supply one. + const retryKey = normalized.messageId ?? (typeof rec.guid === "string" ? rec.guid : ""); + + // Already-given-up GUIDs are skipped without another `processMessage` + // attempt. This is what lets catchup make forward progress through an + // earlier, still-retrying failure while not burning cycles re-running + // a permanently broken message every sweep. + const prevCount = retryKey ? (prevRetries[retryKey] ?? 0) : 0; + if (retryKey && prevCount >= maxFailureRetries) { + summary.skippedGivenUp++; + // Preserve the count so give-up stickiness survives this run. + nextRetries[retryKey] = prevCount; + continue; + } + try { await procFn(normalized, target); summary.replayed++; + // Success clears any accumulated retries for this GUID. Since we + // build `nextRetries` from scratch rather than mutating + // `prevRetries`, simply NOT copying the entry is the clear. (We + // still need this branch so readers understand the lifecycle.) } catch (err) { summary.failed++; - if (ts > 0 && (earliestProcessFailureTs === null || ts < earliestProcessFailureTs)) { - earliestProcessFailureTs = ts; + const nextCount = prevCount + 1; + if (retryKey && nextCount >= maxFailureRetries) { + // Crossing the ceiling this run: log WARN once and record the + // give-up in the persisted map. Don't contribute to + // `earliestProcessFailureTs` — we're intentionally letting the + // cursor advance past this GUID on the next sweep. 
+ summary.givenUp++; + nextRetries[retryKey] = nextCount; + error?.( + `[${accountId}] BlueBubbles catchup: giving up on guid=${retryKey} ` + + `after ${nextCount} consecutive failures; future sweeps will skip ` + + `this message. timestamp=${ts}: ${String(err)}`, + ); + } else { + // Still retrying: count this failure and hold the cursor so the + // next sweep retries the same window. (retryKey may be empty in + // the unusual case where neither normalizer nor raw payload + // carried a GUID — in that case we hold the cursor but cannot + // increment a counter, matching pre-retry-cap behavior.) + if (retryKey) { + nextRetries[retryKey] = nextCount; + } + if (ts > 0 && (earliestProcessFailureTs === null || ts < earliestProcessFailureTs)) { + earliestProcessFailureTs = ts; + } + error?.( + `[${accountId}] BlueBubbles catchup: processMessage failed (retry ` + + `${nextCount}/${maxFailureRetries}): ${String(err)}`, + ); } - error?.(`[${accountId}] BlueBubbles catchup: processMessage failed: ${String(err)}`); } } @@ -381,10 +597,17 @@ async function runBlueBubblesCatchupInner( // this sweep finished (avoiding stuck rescans of a message with // `dateCreated > nowMs` from minor clock skew between BB host and // gateway host). - // - On retryable failure (any `processMessage` throw): hold the cursor - // just before the earliest failed timestamp so the next run retries - // from there. The inbound-dedupe cache from #66230 keeps successfully - // replayed messages from being re-processed. + // - On retryable failure (any still-retrying `processMessage` throw, + // where the GUID has NOT crossed `maxFailureRetries`): hold the + // cursor just before the earliest still-retrying failed timestamp so + // the next run retries from there. The inbound-dedupe cache from + // #66230 keeps successfully replayed messages from being re-processed. 
+ // - On give-up (failures that crossed `maxFailureRetries`): the GUID + // is recorded in the persisted retry map with `count >= max` and + // skipped on sight in subsequent runs (without another processMessage + // attempt). Give-up GUIDs intentionally do NOT hold the cursor, so + // the cursor can advance past them naturally — this is what unwedges + // catchup from a permanently malformed message (issue #66870). // - On truncation (fetched === perRunLimit): advance only to the latest // fetched timestamp so the next run picks up from the page boundary. // Otherwise the unfetched tail past the cap (which can be substantial @@ -400,14 +623,18 @@ async function runBlueBubblesCatchupInner( nextCursorMs = Math.min(Math.max(latestFetchedTs, cursorBefore ?? windowStartMs), nowMs); } summary.cursorAfter = nextCursorMs; - await saveBlueBubblesCatchupCursor(accountId, nextCursorMs).catch((err) => { + // Cap the retry map before writing — defense in depth against a storm + // of unique failing GUIDs ballooning the cursor file. 
+ const retriesToPersist = capFailureRetriesMap(nextRetries, MAX_FAILURE_RETRY_MAP_SIZE); + await saveBlueBubblesCatchupCursor(accountId, nextCursorMs, retriesToPersist).catch((err) => { error?.(`[${accountId}] BlueBubbles catchup: cursor save failed: ${String(err)}`); }); log?.( `[${accountId}] BlueBubbles catchup: replayed=${summary.replayed} ` + `skipped_fromMe=${summary.skippedFromMe} skipped_preCursor=${summary.skippedPreCursor} ` + - `failed=${summary.failed} fetched=${summary.fetchedCount} ` + + `skipped_givenUp=${summary.skippedGivenUp} failed=${summary.failed} ` + + `given_up=${summary.givenUp} fetched=${summary.fetchedCount} ` + `window_ms=${nowMs - windowStartMs}`, ); diff --git a/extensions/bluebubbles/src/config-schema.ts b/extensions/bluebubbles/src/config-schema.ts index 22282989475..a4b7fb5e7b3 100644 --- a/extensions/bluebubbles/src/config-schema.ts +++ b/extensions/bluebubbles/src/config-schema.ts @@ -50,6 +50,14 @@ const bluebubblesCatchupSchema = z perRunLimit: z.number().int().positive().optional(), /** First-run lookback used when no cursor has been persisted yet. Clamped to [1, 720]. */ firstRunLookbackMinutes: z.number().int().positive().optional(), + /** + * Consecutive-failure ceiling per message GUID. After this many failed + * processMessage attempts against the same GUID, catchup logs a WARN + * and skips the message on subsequent sweeps (letting the cursor + * advance past a permanently malformed payload). Defaults to 10. + * Clamped to [1, 1000]. 
+ */ + maxFailureRetries: z.number().int().positive().optional(), }) .strict() .optional(); diff --git a/extensions/bluebubbles/src/inbound-dedupe.ts b/extensions/bluebubbles/src/inbound-dedupe.ts index 8327b899710..deffa49685f 100644 --- a/extensions/bluebubbles/src/inbound-dedupe.ts +++ b/extensions/bluebubbles/src/inbound-dedupe.ts @@ -1,4 +1,5 @@ import { createHash } from "node:crypto"; +import fs from "node:fs"; import path from "node:path"; import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; @@ -33,6 +34,11 @@ function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { return resolveStateDir(env); } +function resolveLegacyNamespaceFilePath(namespace: string): string { + const safe = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "global"; + return path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe", `${safe}.json`); +} + function resolveNamespaceFilePath(namespace: string): string { // Keep a readable prefix for operator debugging, but suffix with a short // hash of the raw namespace so account IDs that only differ by @@ -40,12 +46,42 @@ function resolveNamespaceFilePath(namespace: string): string { // onto the same file. const safePrefix = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "ns"; const hash = createHash("sha256").update(namespace, "utf8").digest("hex").slice(0, 12); - return path.join( - resolveStateDirFromEnv(), - "bluebubbles", - "inbound-dedupe", - `${safePrefix}__${hash}.json`, - ); + const dir = path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe"); + const newPath = path.join(dir, `${safePrefix}__${hash}.json`); + + // One-time migration: earlier beta shipped `${safe}.json` (no hash). + // Rename so the upgrade preserves existing dedupe entries instead of + // starting from an empty file and replaying already-handled messages. 
+ migrateLegacyDedupeFile(namespace, newPath); + + return newPath; +} + +const migratedNamespaces = new Set<string>(); + +function migrateLegacyDedupeFile(namespace: string, newPath: string): void { + if (migratedNamespaces.has(namespace)) { + return; + } + migratedNamespaces.add(namespace); + try { + const legacyPath = resolveLegacyNamespaceFilePath(namespace); + if (legacyPath === newPath) { + return; + } + if (!fs.existsSync(legacyPath)) { + return; + } + if (!fs.existsSync(newPath)) { + fs.renameSync(legacyPath, newPath); + } else { + // Both exist: new file is authoritative; remove the stale legacy. + fs.unlinkSync(legacyPath); + } + } catch { + // Best-effort migration; a missed rename is strictly less harmful + // than crashing the module load path. + } } function buildPersistentImpl(): ClaimableDedupe { @@ -162,6 +198,18 @@ export async function claimBlueBubblesInboundMessage(params: { }; } +/** + * Ensure the legacy→hashed dedupe file migration runs and the on-disk + * store is warmed into memory for the given account. Call before any + * catchup replay so already-handled GUIDs are recognized even when the + * file-naming convention changed between versions. + */ +export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise<void> { + // Trigger the migration side-effect inside resolveNamespaceFilePath. + resolveNamespaceFilePath(accountId); + await impl.warmup(accountId); +} + +/** + * Reset inbound dedupe state between tests.
Installs an in-memory-only * implementation so tests do not hit disk, avoiding file-lock timing issues diff --git a/src/plugin-sdk/persistent-dedupe.ts b/src/plugin-sdk/persistent-dedupe.ts index 79dd33e2779..651a5b9193f 100644 --- a/src/plugin-sdk/persistent-dedupe.ts +++ b/src/plugin-sdk/persistent-dedupe.ts @@ -154,6 +154,35 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis const lockOptions = mergeLockOptions(options.lockOptions); const memory = createDedupeCache({ ttlMs, maxSize: memoryMaxSize }); const inflight = new Map<string, Promise<boolean>>(); + // In-process write queue per file path. `withFileLock` is re-entrant + // within the same process (a second caller for the same path gets + // immediate access instead of waiting), so two concurrent + // checkAndRecordInner calls for different keys but the same file can + // race: both read the same stale data, and the last writer's + // writeJsonFileAtomically silently overwrites the first writer's + // additions. This queue serializes all read-modify-write cycles + // targeting the same file within this process, preventing the lost + // update while still allowing cross-process file-lock contention to + // be handled by the file lock itself. + const fileWriteQueues = new Map<string, Promise<unknown>>(); + + function enqueueFileWrite<T>(filePath: string, fn: () => Promise<T>): Promise<T> { + const prev = fileWriteQueues.get(filePath) ?? Promise.resolve(); + const next = prev.then(fn, fn); + fileWriteQueues.set(filePath, next); + // Cleanup: remove the queue entry once this link settles, but only if + // no newer work was chained after us. The `.catch(() => {})` prevents + // an unhandled rejection when `next` rejects — callers still observe + // the rejection through the returned `next` promise directly.
+ next + .finally(() => { + if (fileWriteQueues.get(filePath) === next) { + fileWriteQueues.delete(filePath); + } + }) + .catch(() => {}); + return next; + } async function checkAndRecordInner( key: string, @@ -168,19 +197,21 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis const path = options.resolveFilePath(namespace); try { - const duplicate = await withFileLock(path, lockOptions, async () => { - const { value } = await readJsonFileWithFallback(path, {}); - const data = sanitizeData(value); - const seenAt = data[key]; - const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs); - if (isRecent) { - return true; - } - data[key] = now; - pruneData(data, now, ttlMs, fileMaxEntries); - await writeJsonFileAtomically(path, data); - return false; - }); + const duplicate = await enqueueFileWrite(path, () => + withFileLock(path, lockOptions, async () => { + const { value } = await readJsonFileWithFallback(path, {}); + const data = sanitizeData(value); + const seenAt = data[key]; + const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs); + if (isRecent) { + return true; + } + data[key] = now; + pruneData(data, now, ttlMs, fileMaxEntries); + await writeJsonFileAtomically(path, data); + return false; + }), + ); return !duplicate; } catch (error) { onDiskError?.(error);