diff --git a/CHANGELOG.md b/CHANGELOG.md index 0469a96ab27..0cf82bf78ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai - Memory/dreaming: stop ordinary transcripts that merely quote the dream-diary prompt from being classified as internal dreaming runs and silently dropped from session recall ingestion. (#66852) Thanks @gumadeiras. - Telegram/documents: sanitize binary reply context and ZIP-like archive extraction so `.epub` and `.mobi` uploads can no longer leak raw binary into prompt context through reply metadata or archive-to-`text/plain` coercion. (#66877) Thanks @martinfrancois. - Telegram/native commands: restore plugin-registry-backed auto defaults for native commands and native skills so Telegram slash commands keep registering when `commands.native` and `commands.nativeSkills` stay on `auto`. (#66843) Thanks @kashevk0. +- fix(bluebubbles): replay missed webhook messages after gateway restart via a persistent per-account cursor and `/api/v1/message/query?after=` pass, so messages delivered while the gateway was down no longer disappear. Uses the existing `processMessage` path and is deduped by #66816's inbound GUID cache. (#66857, #66721) Thanks @omarshahine. ## 2026.4.14 diff --git a/extensions/bluebubbles/src/accounts.ts b/extensions/bluebubbles/src/accounts.ts index 59a692ec077..a2b1276e6f0 100644 --- a/extensions/bluebubbles/src/accounts.ts +++ b/extensions/bluebubbles/src/accounts.ts @@ -48,7 +48,7 @@ function mergeBlueBubblesAccountConfig( accountId, omitKeys: ["defaultAccount"], normalizeAccountId, - nestedObjectKeys: ["network"], + nestedObjectKeys: ["network", "catchup"], }); return { ...merged, diff --git a/extensions/bluebubbles/src/catchup.test.ts b/extensions/bluebubbles/src/catchup.test.ts new file mode 100644 index 00000000000..a87818bdf87 --- /dev/null +++ b/extensions/bluebubbles/src/catchup.test.ts @@ -0,0 +1,621 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + fetchBlueBubblesMessagesSince, + loadBlueBubblesCatchupCursor, + runBlueBubblesCatchup, + saveBlueBubblesCatchupCursor, +} from "./catchup.js"; +import type { NormalizedWebhookMessage } from "./monitor-normalize.js"; +import type { WebhookTarget } from "./monitor-shared.js"; + +function makeStateDir(): string { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-catchup-test-")); + process.env.OPENCLAW_STATE_DIR = dir; + return dir; +} + +function clearStateDir(dir: string): void { + delete process.env.OPENCLAW_STATE_DIR; + fs.rmSync(dir, { recursive: true, force: true }); +} + +function makeTarget(overrides: Partial = {}): WebhookTarget { + const accountId = overrides.accountId ?? "test-account"; + return { + account: { + accountId, + enabled: true, + name: accountId, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "test-password", + network: { dangerouslyAllowPrivateNetwork: true }, + } as unknown as WebhookTarget["account"]["config"], + }, + config: {} as unknown as WebhookTarget["config"], + runtime: { log: () => {}, error: () => {} }, + core: {} as unknown as WebhookTarget["core"], + path: "/bluebubbles-webhook", + ...overrides, + }; +} + +function makeBbMessage(over: Partial> = {}): Record { + return { + guid: `guid-${Math.random().toString(36).slice(2, 10)}`, + text: "hello", + dateCreated: 2_000, + handle: { address: "+15555550123" }, + chats: [{ guid: "iMessage;-;+15555550123" }], + isFromMe: false, + ...over, + }; +} + +describe("catchup cursor persistence", () => { + let stateDir: string; + beforeEach(() => { + stateDir = makeStateDir(); + }); + afterEach(() => { + clearStateDir(stateDir); + }); + + it("returns null before the first save", async () => { + expect(await loadBlueBubblesCatchupCursor("acct")).toBeNull(); + }); + + it("round-trips a saved cursor", async () => { + await saveBlueBubblesCatchupCursor("acct", 1_234_567); + const loaded = await loadBlueBubblesCatchupCursor("acct"); + expect(loaded?.lastSeenMs).toBe(1_234_567); + expect(typeof loaded?.updatedAt).toBe("number"); + }); + + it("scopes cursor files per account", async () => { + await saveBlueBubblesCatchupCursor("a", 100); + await saveBlueBubblesCatchupCursor("b", 200); + expect((await loadBlueBubblesCatchupCursor("a"))?.lastSeenMs).toBe(100); + expect((await loadBlueBubblesCatchupCursor("b"))?.lastSeenMs).toBe(200); + }); + + it("treats filesystem-unsafe account IDs as distinct", async () => { + // Different account IDs that happen to map to the same safePrefix must + // not collide on disk. + await saveBlueBubblesCatchupCursor("acct/a", 111); + await saveBlueBubblesCatchupCursor("acct:a", 222); + expect((await loadBlueBubblesCatchupCursor("acct/a"))?.lastSeenMs).toBe(111); + expect((await loadBlueBubblesCatchupCursor("acct:a"))?.lastSeenMs).toBe(222); + }); +}); + +describe("runBlueBubblesCatchup", () => { + let stateDir: string; + beforeEach(() => { + stateDir = makeStateDir(); + }); + afterEach(() => { + clearStateDir(stateDir); + vi.restoreAllMocks(); + }); + + it("coalesces concurrent runs for the same accountId via in-process singleflight", async () => { + // Two calls firing simultaneously must share one run, one fetch, one + // set of processMessage calls, one cursor write. Without singleflight, + // both calls would read the same cursor, both would process the same + // messages twice (caught by #66816 dedupe, but wasteful), and the + // second writer could regress the cursor if its nowMs is stale. + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + + let fetchCount = 0; + let processCount = 0; + let resolveFetch: (() => void) | null = null; + + const call1 = runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => { + fetchCount++; + // Block until we fire the second call, so we can verify it + // coalesces rather than starting a new run. + await new Promise((resolve) => { + resolveFetch = resolve; + }); + return { + resolved: true, + messages: [makeBbMessage({ guid: "g1", dateCreated: 6 * 60 * 1000 })], + }; + }, + processMessageFn: async () => { + processCount++; + }, + }); + + // Wait a tick for call1 to enter fetchMessages, then fire call2. + await new Promise((resolve) => setTimeout(resolve, 5)); + const call2 = runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => { + fetchCount++; + return { resolved: true, messages: [makeBbMessage({ guid: "g2" })] }; + }, + processMessageFn: async () => { + processCount++; + }, + }); + + resolveFetch!(); + const [r1, r2] = await Promise.all([call1, call2]); + + expect(fetchCount).toBe(1); // second call coalesced, didn't re-fetch + expect(processCount).toBe(1); + expect(r1).toBe(r2); // same summary object returned to both callers + }); + + it("replays messages and advances the cursor on success", async () => { + const now = 10_000; + const processed: NormalizedWebhookMessage[] = []; + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "g1", text: "one", dateCreated: 9_000 }), + makeBbMessage({ guid: "g2", text: "two", dateCreated: 9_500 }), + ], + }), + processMessageFn: async (message) => { + processed.push(message); + }, + }); + + expect(summary?.querySucceeded).toBe(true); + expect(summary?.replayed).toBe(2); + expect(summary?.failed).toBe(0); + expect(processed.map((m) => m.messageId)).toEqual(["g1", "g2"]); + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + expect(cursor?.lastSeenMs).toBe(now); + }); + + it("clamps first-run lookback to maxAgeMinutes when smaller", async () => { + const now = 1_000_000; + let seenSince = -1; + await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + // maxAge tighter than firstRunLookback — must clamp on first run. + catchup: { maxAgeMinutes: 5, firstRunLookbackMinutes: 30 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => now, + fetchMessages: async (sinceMs) => { + seenSince = sinceMs; + return { resolved: true, messages: [] }; + }, + processMessageFn: async () => {}, + }, + ); + expect(seenSince).toBe(now - 5 * 60_000); + }); + + it("uses firstRunLookback when no cursor exists", async () => { + const now = 1_000_000; + let seenSince = 0; + await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { firstRunLookbackMinutes: 5 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => now, + fetchMessages: async (sinceMs) => { + seenSince = sinceMs; + return { resolved: true, messages: [] }; + }, + processMessageFn: async () => {}, + }, + ); + expect(seenSince).toBe(now - 5 * 60_000); + }); + + it("clamps window to maxAgeMinutes when cursor is older", async () => { + const now = 100 * 60_000; + await saveBlueBubblesCatchupCursor("test-account", 0); + let seenSince = -1; + await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { maxAgeMinutes: 10 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => now, + fetchMessages: async (sinceMs) => { + seenSince = sinceMs; + return { resolved: true, messages: [] }; + }, + processMessageFn: async () => {}, + }, + ); + expect(seenSince).toBe(now - 10 * 60_000); + }); + + it("skips when enabled: false", async () => { + const called = { fetch: 0, proc: 0 }; + const summary = await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { enabled: false }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => 1_000, + fetchMessages: async () => { + called.fetch++; + return { resolved: true, messages: [] }; + }, + processMessageFn: async () => { + called.proc++; + }, + }, + ); + expect(summary).toBeNull(); + expect(called.fetch).toBe(0); + expect(called.proc).toBe(0); + }); + + it("runs catchup even on rapid restarts (no min-interval gate)", async () => { + // Catchup runs once per gateway startup, so a quick restart MUST run + // it again — otherwise messages dropped between the two startups + // (gateway down → BB ECONNREFUSED → gateway up <30s later) are lost + // permanently. Bounded by perRunLimit/maxAge + dedupe-protected. + const now = 10_000; + await saveBlueBubblesCatchupCursor("test-account", now - 5_000); + let fetched = false; + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => { + fetched = true; + return { resolved: true, messages: [] }; + }, + processMessageFn: async () => {}, + }); + expect(fetched).toBe(true); + expect(summary).not.toBeNull(); + }); + + it("advances cursor only to last fetched ts when result is truncated (perRunLimit hit)", async () => { + // Long-outage scenario: 4 messages arrived during downtime but + // perRunLimit=2. Sort:ASC means we get the 2 oldest. Cursor must + // advance to the 2nd's timestamp (not nowMs) so the next startup + // picks up the remaining 2. + const now = 100 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 50 * 60 * 1000); + const summary = await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { perRunLimit: 2 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + // Only the 2 the cap allows BB to return (oldest first via ASC). + messages: [ + makeBbMessage({ guid: "p1", dateCreated: 60 * 60 * 1000 }), + makeBbMessage({ guid: "p2", dateCreated: 70 * 60 * 1000 }), + ], + }), + processMessageFn: async () => {}, + }, + ); + expect(summary?.replayed).toBe(2); + expect(summary?.fetchedCount).toBe(2); + expect(summary?.cursorAfter).toBe(70 * 60 * 1000); // page boundary, not nowMs + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + expect(cursor?.lastSeenMs).toBe(70 * 60 * 1000); + }); + + it("filters isFromMe before dispatch and still advances cursor", async () => { + const now = 10_000; + const processed: NormalizedWebhookMessage[] = []; + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "g-me", text: "self", dateCreated: 9_500, isFromMe: true }), + makeBbMessage({ guid: "g-them", text: "them", dateCreated: 9_500 }), + ], + }), + processMessageFn: async (m) => { + processed.push(m); + }, + }); + expect(summary?.replayed).toBe(1); + expect(summary?.skippedFromMe).toBe(1); + expect(processed.map((m) => m.messageId)).toEqual(["g-them"]); + }); + + it("leaves cursor unchanged when the query fails", async () => { + // Use timestamps well past MIN_INTERVAL_MS (30s) so the rate-limit skip + // doesn't short-circuit the run before the fetch path fires. + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ resolved: false, messages: [] }), + processMessageFn: async () => {}, + }); + expect(summary?.querySucceeded).toBe(false); + const cursor = await loadBlueBubblesCatchupCursor("test-account"); + expect(cursor?.lastSeenMs).toBe(5 * 60 * 1000); // unchanged + }); + + it("does NOT advance cursor past a processMessage failure (retryable)", async () => { + const cursorBefore = 5 * 60 * 1000; + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", cursorBefore); + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "ok1", dateCreated: 6 * 60 * 1000 }), + makeBbMessage({ guid: "bad", dateCreated: 7 * 60 * 1000 }), + makeBbMessage({ guid: "ok2", dateCreated: 8 * 60 * 1000 }), + ], + }), + processMessageFn: async (m) => { + if (m.messageId === "bad") { + throw new Error("transient"); + } + }, + }); + // Cursor is held just before the bad message's timestamp so the next + // sweep retries it (and re-queries ok1 which dedupe will drop). + expect(summary?.failed).toBe(1); + expect(summary?.cursorAfter).toBe(7 * 60 * 1000 - 1); + const cursorAfter = await loadBlueBubblesCatchupCursor("test-account"); + expect(cursorAfter?.lastSeenMs).toBe(7 * 60 * 1000 - 1); + }); + + it("clamps held cursor to previous cursor when failure ts is below it", async () => { + // Pathological: failure timestamp is at or below the previous cursor + // (shouldn't happen with server-side `after:` but defense in depth). + // We must never regress the cursor. + const cursorBefore = 9 * 60 * 1000; + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", cursorBefore); + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "bad", dateCreated: 1_000 })], + }), + processMessageFn: async () => { + throw new Error("transient"); + }, + }); + // skippedPreCursor catches the bad record before processMessage runs, + // so no failure is recorded and cursor advances to nowMs normally. + expect(summary?.failed).toBe(0); + expect(summary?.skippedPreCursor).toBe(1); + expect(summary?.cursorAfter).toBe(now); + }); + + it("recovers from a future-dated cursor by falling through to firstRunLookback", async () => { + // Clock-skew scenario: cursor was written with a wall time that is now + // ahead of the corrected clock. Catchup must NOT pass `after=future` + // to BB (which would return zero), and must NOT save cursor=nowMs + // without first replaying the [earliestAllowed, nowMs] window. + const now = 1_000_000; + const futureCursor = now + 60_000; + await saveBlueBubblesCatchupCursor("test-account", futureCursor); + let seenSince = -1; + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async (sinceMs) => { + seenSince = sinceMs; + return { resolved: true, messages: [] }; + }, + processMessageFn: async () => {}, + }); + // Should fall through to firstRunLookback (default 30 min), clamped + // to maxAge (default 120 min) — i.e. nowMs - 30min, NOT nowMs. + expect(seenSince).toBe(now - 30 * 60_000); + expect(summary).not.toBeNull(); + // Cursor should be repaired to nowMs so subsequent runs are normal. + const repaired = await loadBlueBubblesCatchupCursor("test-account"); + expect(repaired?.lastSeenMs).toBe(now); + }); + + it("isolates one failing message and keeps processing the rest", async () => { + const now = 10_000; + const processed: string[] = []; + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "ok1", text: "ok1" }), + makeBbMessage({ guid: "bad", text: "bad" }), + makeBbMessage({ guid: "ok2", text: "ok2" }), + ], + }), + processMessageFn: async (m) => { + if (m.messageId === "bad") { + throw new Error("boom"); + } + processed.push(m.messageId ?? "?"); + }, + }); + expect(summary?.replayed).toBe(2); + expect(summary?.failed).toBe(1); + expect(processed).toEqual(["ok1", "ok2"]); + }); + + it("warns when fetched count hits perRunLimit so silent truncation is visible", async () => { + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + const warnings: string[] = []; + const summary = await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { perRunLimit: 3 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "a", dateCreated: 6 * 60 * 1000 }), + makeBbMessage({ guid: "b", dateCreated: 7 * 60 * 1000 }), + makeBbMessage({ guid: "c", dateCreated: 8 * 60 * 1000 }), + ], + }), + processMessageFn: async () => {}, + error: (msg) => warnings.push(msg), + }, + ); + expect(summary?.replayed).toBe(3); + expect(summary?.fetchedCount).toBe(3); + const truncationWarnings = warnings.filter((w) => w.includes("perRunLimit")); + expect(truncationWarnings).toHaveLength(1); + expect(truncationWarnings[0]).toContain("WARNING"); + expect(truncationWarnings[0]).toContain("perRunLimit=3"); + }); + + it("does not warn when fetched count is below perRunLimit", async () => { + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000); + const warnings: string[] = []; + await runBlueBubblesCatchup( + makeTarget({ + account: { + accountId: "test-account", + enabled: true, + configured: true, + baseUrl: "http://127.0.0.1:1234", + config: { + serverUrl: "http://127.0.0.1:1234", + password: "x", + network: { dangerouslyAllowPrivateNetwork: true }, + catchup: { perRunLimit: 50 }, + } as unknown as WebhookTarget["account"]["config"], + }, + }), + { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [makeBbMessage({ guid: "a" }), makeBbMessage({ guid: "b" })], + }), + processMessageFn: async () => {}, + error: (msg) => warnings.push(msg), + }, + ); + expect(warnings.filter((w) => w.includes("perRunLimit"))).toHaveLength(0); + }); + + it("skips pre-cursor timestamps as defense in depth against server-inclusive bounds", async () => { + const cursor = 5 * 60 * 1000; + const now = 10 * 60 * 1000; + await saveBlueBubblesCatchupCursor("test-account", cursor); + const processed: string[] = []; + const summary = await runBlueBubblesCatchup(makeTarget(), { + now: () => now, + fetchMessages: async () => ({ + resolved: true, + messages: [ + makeBbMessage({ guid: "before", text: "before", dateCreated: cursor - 1_000 }), + makeBbMessage({ guid: "at-boundary", text: "boundary", dateCreated: cursor }), + makeBbMessage({ guid: "after", text: "after", dateCreated: cursor + 1_000 }), + ], + }), + processMessageFn: async (m) => { + processed.push(m.messageId ?? "?"); + }, + }); + expect(summary?.replayed).toBe(1); + expect(summary?.skippedPreCursor).toBe(2); + expect(processed).toEqual(["after"]); + }); +}); + +describe("fetchBlueBubblesMessagesSince", () => { + it("returns resolved:false when the network call throws", async () => { + // Point at a port nothing is listening on so fetch fails fast. + const result = await fetchBlueBubblesMessagesSince(0, 10, { + baseUrl: "http://127.0.0.1:1", + password: "x", + allowPrivateNetwork: true, + timeoutMs: 200, + }); + expect(result.resolved).toBe(false); + expect(result.messages).toEqual([]); + }); +}); diff --git a/extensions/bluebubbles/src/catchup.ts b/extensions/bluebubbles/src/catchup.ts new file mode 100644 index 00000000000..3c1aba299be --- /dev/null +++ b/extensions/bluebubbles/src/catchup.ts @@ -0,0 +1,430 @@ +import { createHash } from "node:crypto"; +import path from "node:path"; +import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; +import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; +import { resolveBlueBubblesServerAccount } from "./account-resolve.js"; +import { asRecord, normalizeWebhookMessage } from "./monitor-normalize.js"; +import { processMessage } from "./monitor-processing.js"; +import type { WebhookTarget } from "./monitor-shared.js"; +import { blueBubblesFetchWithTimeout, buildBlueBubblesApiUrl } from "./types.js"; + +// When the gateway is down, restarting, or wedged, inbound webhook POSTs from +// BB Server fail with ECONNRESET/ECONNREFUSED. BB's WebhookService does not +// retry, and its MessagePoller only re-fires webhooks on BB-side reconnect +// events (Messages.app / APNs), not on webhook-receiver recovery. Without a +// recovery pass, messages delivered during outage windows are permanently +// lost. See #66721 for design discussion and experimental validation. + +const DEFAULT_MAX_AGE_MINUTES = 120; +const MAX_MAX_AGE_MINUTES = 12 * 60; +const DEFAULT_PER_RUN_LIMIT = 50; +const MAX_PER_RUN_LIMIT = 500; +const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30; +const FETCH_TIMEOUT_MS = 15_000; + +export type BlueBubblesCatchupConfig = { + enabled?: boolean; + maxAgeMinutes?: number; + perRunLimit?: number; + firstRunLookbackMinutes?: number; +}; + +export type BlueBubblesCatchupSummary = { + querySucceeded: boolean; + replayed: number; + skippedFromMe: number; + skippedPreCursor: number; + failed: number; + cursorBefore: number | null; + cursorAfter: number; + windowStartMs: number; + windowEndMs: number; + fetchedCount: number; +}; + +export type BlueBubblesCatchupCursor = { lastSeenMs: number; updatedAt: number }; + +function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { + // Explicit OPENCLAW_STATE_DIR overrides take precedence (including + // per-test mkdtemp dirs in this module's test suite). + if (env.OPENCLAW_STATE_DIR?.trim()) { + return resolveStateDir(env); + } + // Default test isolation: per-pid tmpdir, no bleed into real ~/.openclaw. + // Use resolvePreferredOpenClawTmpDir + string concat (mirrors + // inbound-dedupe) so this doesn't trip the tmpdir-path-guard test that + // flags dynamic template-literal suffixes on os.tmpdir() paths. + if (env.VITEST || env.NODE_ENV === "test") { + const name = "openclaw-vitest-" + process.pid; + return path.join(resolvePreferredOpenClawTmpDir(), name); + } + // Canonical OpenClaw state dir: honors `~` expansion + legacy/new + // fallback. Sharing this resolver with inbound-dedupe is what guarantees + // the catchup cursor and the dedupe state always live under the same + // root, so a replayed GUID is recognized by the dedupe after catchup + // re-feeds the message through processMessage. + return resolveStateDir(env); +} + +function resolveCursorFilePath(accountId: string): string { + // Match inbound-dedupe's file layout: readable prefix + short hash so + // account IDs that only differ by filesystem-unsafe characters do not + // collapse onto the same file. + const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account"; + const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12); + return path.join( + resolveStateDirFromEnv(), + "bluebubbles", + "catchup", + `${safePrefix}__${hash}.json`, + ); +} + +export async function loadBlueBubblesCatchupCursor( + accountId: string, +): Promise { + const filePath = resolveCursorFilePath(accountId); + const { value } = await readJsonFileWithFallback(filePath, null); + if (!value || typeof value !== "object") { + return null; + } + if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) { + return null; + } + return value; +} + +export async function saveBlueBubblesCatchupCursor( + accountId: string, + lastSeenMs: number, +): Promise { + const filePath = resolveCursorFilePath(accountId); + const cursor: BlueBubblesCatchupCursor = { lastSeenMs, updatedAt: Date.now() }; + await writeJsonFileAtomically(filePath, cursor); +} + +type FetchOpts = { + baseUrl: string; + password: string; + allowPrivateNetwork: boolean; + timeoutMs?: number; +}; + +export type BlueBubblesCatchupFetchResult = { + resolved: boolean; + messages: Array>; +}; + +export async function fetchBlueBubblesMessagesSince( + sinceMs: number, + limit: number, + opts: FetchOpts, +): Promise { + const ssrfPolicy = opts.allowPrivateNetwork ? { allowPrivateNetwork: true } : {}; + const url = buildBlueBubblesApiUrl({ + baseUrl: opts.baseUrl, + path: "/api/v1/message/query", + password: opts.password, + }); + const body = JSON.stringify({ + limit, + sort: "ASC", + after: sinceMs, + // `with` mirrors what bb-catchup.sh uses and what the normal webhook + // payload carries, so normalizeWebhookMessage has the same fields to + // read during replay as it does on live dispatch. + with: ["chat", "chat.participants", "attachment"], + }); + try { + const res = await blueBubblesFetchWithTimeout( + url, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body, + }, + opts.timeoutMs ?? FETCH_TIMEOUT_MS, + ssrfPolicy, + ); + if (!res.ok) { + return { resolved: false, messages: [] }; + } + const json = (await res.json().catch(() => null)) as { data?: unknown } | null; + if (!json || !Array.isArray(json.data)) { + return { resolved: false, messages: [] }; + } + const messages: Array> = []; + for (const entry of json.data) { + const rec = asRecord(entry); + if (rec) { + messages.push(rec); + } + } + return { resolved: true, messages }; + } catch { + return { resolved: false, messages: [] }; + } +} + +function clampCatchupConfig(raw?: BlueBubblesCatchupConfig) { + const maxAgeMinutes = Math.min( + Math.max(raw?.maxAgeMinutes ?? DEFAULT_MAX_AGE_MINUTES, 1), + MAX_MAX_AGE_MINUTES, + ); + const perRunLimit = Math.min( + Math.max(raw?.perRunLimit ?? DEFAULT_PER_RUN_LIMIT, 1), + MAX_PER_RUN_LIMIT, + ); + const firstRunLookbackMinutes = Math.min( + Math.max(raw?.firstRunLookbackMinutes ?? DEFAULT_FIRST_RUN_LOOKBACK_MINUTES, 1), + MAX_MAX_AGE_MINUTES, + ); + return { + maxAgeMs: maxAgeMinutes * 60_000, + perRunLimit, + firstRunLookbackMs: firstRunLookbackMinutes * 60_000, + }; +} + +export type RunBlueBubblesCatchupDeps = { + fetchMessages?: typeof fetchBlueBubblesMessagesSince; + processMessageFn?: typeof processMessage; + now?: () => number; + log?: (message: string) => void; + error?: (message: string) => void; +}; + +/** + * Fetch and replay BlueBubbles messages delivered since the persisted + * catchup cursor, feeding each through the same `processMessage` pipeline + * live webhooks use. Safe to call on every gateway startup: replays that + * collide with #66230's inbound dedupe cache are dropped there, so a + * message already processed via live webhook will not be processed twice. + * + * Returns the run summary, or `null` when disabled or aborted before the + * first query. + * + * Concurrent calls for the same accountId are coalesced into a single + * in-flight run via a module-level singleflight map. Without this, a + * fire-and-forget trigger (monitor.ts) combined with an overlapping + * webhook-target re-registration could race: two runs would read the + * same cursor, compute divergent `nextCursorMs` values, and the last + * writer could regress the cursor — causing repeated replay of the same + * backlog on every subsequent startup. + */ +const inFlightCatchups = new Map>(); + +export function runBlueBubblesCatchup( + target: WebhookTarget, + deps: RunBlueBubblesCatchupDeps = {}, +): Promise { + const accountId = target.account.accountId; + const existing = inFlightCatchups.get(accountId); + if (existing) { + return existing; + } + const runPromise = runBlueBubblesCatchupInner(target, deps).finally(() => { + inFlightCatchups.delete(accountId); + }); + inFlightCatchups.set(accountId, runPromise); + return runPromise; +} + +async function runBlueBubblesCatchupInner( + target: WebhookTarget, + deps: RunBlueBubblesCatchupDeps, +): Promise { + const raw = (target.account.config as { catchup?: BlueBubblesCatchupConfig }).catchup; + if (raw?.enabled === false) { + return null; + } + + const now = deps.now ?? (() => Date.now()); + const log = deps.log ?? target.runtime.log; + const error = deps.error ?? target.runtime.error; + const fetchFn = deps.fetchMessages ?? fetchBlueBubblesMessagesSince; + const procFn = deps.processMessageFn ?? processMessage; + const accountId = target.account.accountId; + + const { maxAgeMs, perRunLimit, firstRunLookbackMs } = clampCatchupConfig(raw); + const nowMs = now(); + const existing = await loadBlueBubblesCatchupCursor(accountId).catch(() => null); + const cursorBefore = existing?.lastSeenMs ?? null; + + // Catchup runs once per gateway startup (called from monitor.ts after + // webhook target registration). We deliberately do NOT short-circuit on + // a "ran recently" gate, because catchup is the only mechanism that + // recovers messages dropped during the gateway-down window. A short + // gap (e.g. <30s) between two startups can still have lost messages in + // the middle, and skipping the second startup's catchup would lose + // them permanently. The bounded query (perRunLimit, maxAge) and the + // inbound-dedupe cache from #66230 cap the cost of running the query + // every startup. + + const earliestAllowed = nowMs - maxAgeMs; + // A future-dated cursor (clock rollback via NTP correction or manual + // adjust) is unusable: querying with `after` set to a future timestamp + // would return zero records, and saving `nowMs` as the new cursor would + // permanently skip any real messages missed in the + // [earliestAllowed, nowMs] window. Treat it as if no cursor exists and + // fall through to the firstRun lookback path; the inbound-dedupe cache + // from #66230 handles any overlap with already-processed messages, and + // saving cursor = nowMs at the end of the run repairs the cursor. + const cursorIsUsable = existing !== null && existing.lastSeenMs <= nowMs; + // First-run (and recovered-future-cursor) lookback is also clamped to + // the maxAge ceiling so a config with `maxAgeMinutes: 5, + // firstRunLookbackMinutes: 30` doesn't silently exceed the operator's + // stated lookback cap on first startup. + const windowStartMs = cursorIsUsable + ? Math.max(existing.lastSeenMs, earliestAllowed) + : Math.max(nowMs - firstRunLookbackMs, earliestAllowed); + + let baseUrl: string; + let password: string; + let allowPrivateNetwork = false; + try { + ({ baseUrl, password, allowPrivateNetwork } = resolveBlueBubblesServerAccount({ + serverUrl: target.account.baseUrl, + password: target.account.config.password, + accountId, + cfg: target.config, + })); + } catch (err) { + error?.(`[${accountId}] BlueBubbles catchup: cannot resolve server account: ${String(err)}`); + return null; + } + + const { resolved, messages } = await fetchFn(windowStartMs, perRunLimit, { + baseUrl, + password, + allowPrivateNetwork, + }); + + const summary: BlueBubblesCatchupSummary = { + querySucceeded: resolved, + replayed: 0, + skippedFromMe: 0, + skippedPreCursor: 0, + failed: 0, + cursorBefore, + cursorAfter: nowMs, + windowStartMs, + windowEndMs: nowMs, + fetchedCount: messages.length, + }; + + if (!resolved) { + // Leave cursor unchanged so the next run retries the same window. + error?.(`[${accountId}] BlueBubbles catchup: message-query failed; cursor unchanged`); + return summary; + } + + // Track the earliest timestamp where `processMessage` threw so we never + // advance the cursor past a retryable failure. Normalize failures (the + // record didn't yield a usable NormalizedWebhookMessage) are treated as + // permanent skips and do NOT block cursor advance — those payloads are + // unlikely to ever normalize on retry, and blocking on them would wedge + // catchup forever. + let earliestProcessFailureTs: number | null = null; + // Track the latest fetched message timestamp regardless of fate, so a + // truncated query (fetchedCount === perRunLimit) can advance the cursor + // exactly to the page boundary. Without this, the unfetched tail past + // the cap is permanently unreachable. + let latestFetchedTs = windowStartMs; + + for (const rec of messages) { + // Defense in depth: the server-side `after:` filter should already + // exclude pre-cursor messages, but guard here against BB API variants + // that return inclusive-of-boundary data. + const ts = typeof rec.dateCreated === "number" ? rec.dateCreated : 0; + if (ts > 0 && ts > latestFetchedTs) { + latestFetchedTs = ts; + } + if (ts > 0 && ts <= windowStartMs) { + summary.skippedPreCursor++; + continue; + } + + // Filter fromMe early so BB's record of our own outbound sends cannot + // enter the inbound pipeline even if normalization would accept them. + if (rec.isFromMe === true || rec.is_from_me === true) { + summary.skippedFromMe++; + continue; + } + + const normalized = normalizeWebhookMessage({ type: "new-message", data: rec }); + if (!normalized) { + summary.failed++; + continue; + } + if (normalized.fromMe) { + summary.skippedFromMe++; + continue; + } + + try { + await procFn(normalized, target); + summary.replayed++; + } catch (err) { + summary.failed++; + if (ts > 0 && (earliestProcessFailureTs === null || ts < earliestProcessFailureTs)) { + earliestProcessFailureTs = ts; + } + error?.(`[${accountId}] BlueBubbles catchup: processMessage failed: ${String(err)}`); + } + } + + // Compute the new cursor. + // + // - Default: advance to `nowMs` so subsequent runs start from the moment + // this sweep finished (avoiding stuck rescans of a message with + // `dateCreated > nowMs` from minor clock skew between BB host and + // gateway host). + // - On retryable failure (any `processMessage` throw): hold the cursor + // just before the earliest failed timestamp so the next run retries + // from there. The inbound-dedupe cache from #66230 keeps successfully + // replayed messages from being re-processed. + // - On truncation (fetched === perRunLimit): advance only to the latest + // fetched timestamp so the next run picks up from the page boundary. + // Otherwise the unfetched tail past the cap (which can be substantial + // during long outages) would be permanently unreachable. + const isTruncated = summary.fetchedCount >= perRunLimit; + let nextCursorMs = nowMs; + if (earliestProcessFailureTs !== null) { + const heldCursor = Math.max(earliestProcessFailureTs - 1, cursorBefore ?? windowStartMs); + nextCursorMs = Math.min(heldCursor, nowMs); + } else if (isTruncated) { + // Use latestFetchedTs (clamped to >= prior cursor and <= nowMs) so the + // next run starts where this page ended. + nextCursorMs = Math.min(Math.max(latestFetchedTs, cursorBefore ?? windowStartMs), nowMs); + } + summary.cursorAfter = nextCursorMs; + await saveBlueBubblesCatchupCursor(accountId, nextCursorMs).catch((err) => { + error?.(`[${accountId}] BlueBubbles catchup: cursor save failed: ${String(err)}`); + }); + + log?.( + `[${accountId}] BlueBubbles catchup: replayed=${summary.replayed} ` + + `skipped_fromMe=${summary.skippedFromMe} skipped_preCursor=${summary.skippedPreCursor} ` + + `failed=${summary.failed} fetched=${summary.fetchedCount} ` + + `window_ms=${nowMs - windowStartMs}`, + ); + + // Distinct WARNING when the BB result hits perRunLimit so operators + // know a single startup didn't drain the full backlog. The cursor was + // advanced only to the page boundary above, so the unfetched tail will + // be picked up on the next gateway startup — but if startups are + // infrequent, raising perRunLimit drains larger backlogs in one pass. + if (isTruncated) { + error?.( + `[${accountId}] BlueBubbles catchup: WARNING fetched=${summary.fetchedCount} ` + + `hit perRunLimit=${perRunLimit}; cursor advanced only to page boundary, ` + + `remaining messages will be picked up on next startup. Raise ` + + `channels.bluebubbles...catchup.perRunLimit to drain larger backlogs ` + + `in a single pass.`, + ); + } + + return summary; +} diff --git a/extensions/bluebubbles/src/config-schema.ts b/extensions/bluebubbles/src/config-schema.ts index 78df708ed63..22282989475 100644 --- a/extensions/bluebubbles/src/config-schema.ts +++ b/extensions/bluebubbles/src/config-schema.ts @@ -40,6 +40,20 @@ const bluebubblesNetworkSchema = z .strict() .optional(); +const bluebubblesCatchupSchema = z + .object({ + /** Replay messages delivered while the gateway was unreachable. Defaults to on. */ + enabled: z.boolean().optional(), + /** Hard ceiling on lookback window. Clamped to [1, 720] minutes. */ + maxAgeMinutes: z.number().int().positive().optional(), + /** Upper bound on messages replayed in a single startup pass. Clamped to [1, 500]. */ + perRunLimit: z.number().int().positive().optional(), + /** First-run lookback used when no cursor has been persisted yet. Clamped to [1, 720]. */ + firstRunLookbackMinutes: z.number().int().positive().optional(), + }) + .strict() + .optional(); + const bluebubblesAccountSchema = z .object({ name: z.string().optional(), @@ -62,6 +76,7 @@ const bluebubblesAccountSchema = z mediaLocalRoots: z.array(z.string()).optional(), sendReadReceipts: z.boolean().optional(), network: bluebubblesNetworkSchema, + catchup: bluebubblesCatchupSchema, blockStreaming: z.boolean().optional(), groups: z.object({}).catchall(bluebubblesGroupConfigSchema).optional(), }) diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index c081a319b75..f34f725dddb 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -3,6 +3,7 @@ import { safeEqualSecret } from "openclaw/plugin-sdk/browser-security-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; import { resolveBlueBubblesEffectiveAllowPrivateNetwork } from "./accounts.js"; +import { runBlueBubblesCatchup } from "./catchup.js"; import { createBlueBubblesDebounceRegistry } from "./monitor-debounce.js"; import { asRecord, @@ -343,14 +344,15 @@ export async function monitorBlueBubblesProvider( ); } - const unregister = registerBlueBubblesWebhookTarget({ + const target: WebhookTarget = { account, config, runtime, core, path, statusSink, - }); + }; + const unregister = registerBlueBubblesWebhookTarget(target); return await new Promise((resolve) => { const stop = () => { @@ -367,6 +369,17 @@ export async function monitorBlueBubblesProvider( runtime.log?.( `[${account.accountId}] BlueBubbles webhook listening on ${normalizeWebhookPath(path)}`, ); + + // Kick off a catchup pass for messages delivered while the webhook + // target wasn't reachable. Fire-and-forget; the catchup runs through the + // same processMessage path webhooks use, and #66230's inbound dedupe + // drops any GUID that was already handled, so this is safe even if a + // live webhook raced the startup replay. See #66721. + runBlueBubblesCatchup(target).catch((err) => { + runtime.error?.( + `[${account.accountId}] BlueBubbles catchup: unexpected failure: ${String(err)}`, + ); + }); }); }