mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:20:43 +00:00
feat(bluebubbles): replay missed webhook messages after gateway restart (#66857)
Adds an in-process startup catchup pass to the BlueBubbles channel that queries BB Server for messages delivered since a persisted per-account cursor and re-feeds each through the existing processMessage pipeline. Fixes the missed-message hole documented in #66721: BB's WebhookService is fire-and-forget on POST failure, and MessagePoller only re-fires webhooks on BB-side reconnection events, not on webhook-receiver recovery. - New extensions/bluebubbles/src/catchup.ts with singleflight per accountId, cursor persistence via the canonical state-paths resolver, bounded query (perRunLimit + maxAgeMinutes), failure-held cursor, truncation-aware page-boundary advancement, future-cursor recovery, isFromMe filter (pre- and post-normalization). - monitor.ts fires catchup as a background task after the webhook target registers. - config-schema.ts adds optional catchup block; accounts.ts adds catchup to nestedObjectKeys for deep-merge per-account overrides. - Dedupes against #66816's persistent inbound GUID cache. - 22 scoped tests; full BB suite 411/411; pnpm check green; live E2E on macOS 26.3 / BB Server 1.9.x recovered 3/3 missed messages. Closes #66721. Co-authored-by: Omar Shahine <omar@shahine.com>
This commit is contained in:
@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Memory/dreaming: stop ordinary transcripts that merely quote the dream-diary prompt from being classified as internal dreaming runs and silently dropped from session recall ingestion. (#66852) Thanks @gumadeiras.
|
||||
- Telegram/documents: sanitize binary reply context and ZIP-like archive extraction so `.epub` and `.mobi` uploads can no longer leak raw binary into prompt context through reply metadata or archive-to-`text/plain` coercion. (#66877) Thanks @martinfrancois.
|
||||
- Telegram/native commands: restore plugin-registry-backed auto defaults for native commands and native skills so Telegram slash commands keep registering when `commands.native` and `commands.nativeSkills` stay on `auto`. (#66843) Thanks @kashevk0.
|
||||
- fix(bluebubbles): replay missed webhook messages after gateway restart via a persistent per-account cursor and `/api/v1/message/query?after=<ts>` pass, so messages delivered while the gateway was down no longer disappear. Uses the existing `processMessage` path and is deduped by #66816's inbound GUID cache. (#66857, #66721) Thanks @omarshahine.
|
||||
|
||||
## 2026.4.14
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ function mergeBlueBubblesAccountConfig(
|
||||
accountId,
|
||||
omitKeys: ["defaultAccount"],
|
||||
normalizeAccountId,
|
||||
nestedObjectKeys: ["network"],
|
||||
nestedObjectKeys: ["network", "catchup"],
|
||||
});
|
||||
return {
|
||||
...merged,
|
||||
|
||||
621
extensions/bluebubbles/src/catchup.test.ts
Normal file
621
extensions/bluebubbles/src/catchup.test.ts
Normal file
@@ -0,0 +1,621 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
fetchBlueBubblesMessagesSince,
|
||||
loadBlueBubblesCatchupCursor,
|
||||
runBlueBubblesCatchup,
|
||||
saveBlueBubblesCatchupCursor,
|
||||
} from "./catchup.js";
|
||||
import type { NormalizedWebhookMessage } from "./monitor-normalize.js";
|
||||
import type { WebhookTarget } from "./monitor-shared.js";
|
||||
|
||||
function makeStateDir(): string {
|
||||
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-catchup-test-"));
|
||||
process.env.OPENCLAW_STATE_DIR = dir;
|
||||
return dir;
|
||||
}
|
||||
|
||||
function clearStateDir(dir: string): void {
|
||||
delete process.env.OPENCLAW_STATE_DIR;
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
|
||||
function makeTarget(overrides: Partial<WebhookTarget & { accountId: string }> = {}): WebhookTarget {
|
||||
const accountId = overrides.accountId ?? "test-account";
|
||||
return {
|
||||
account: {
|
||||
accountId,
|
||||
enabled: true,
|
||||
name: accountId,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "test-password",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
config: {} as unknown as WebhookTarget["config"],
|
||||
runtime: { log: () => {}, error: () => {} },
|
||||
core: {} as unknown as WebhookTarget["core"],
|
||||
path: "/bluebubbles-webhook",
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeBbMessage(over: Partial<Record<string, unknown>> = {}): Record<string, unknown> {
|
||||
return {
|
||||
guid: `guid-${Math.random().toString(36).slice(2, 10)}`,
|
||||
text: "hello",
|
||||
dateCreated: 2_000,
|
||||
handle: { address: "+15555550123" },
|
||||
chats: [{ guid: "iMessage;-;+15555550123" }],
|
||||
isFromMe: false,
|
||||
...over,
|
||||
};
|
||||
}
|
||||
|
||||
describe("catchup cursor persistence", () => {
|
||||
let stateDir: string;
|
||||
beforeEach(() => {
|
||||
stateDir = makeStateDir();
|
||||
});
|
||||
afterEach(() => {
|
||||
clearStateDir(stateDir);
|
||||
});
|
||||
|
||||
it("returns null before the first save", async () => {
|
||||
expect(await loadBlueBubblesCatchupCursor("acct")).toBeNull();
|
||||
});
|
||||
|
||||
it("round-trips a saved cursor", async () => {
|
||||
await saveBlueBubblesCatchupCursor("acct", 1_234_567);
|
||||
const loaded = await loadBlueBubblesCatchupCursor("acct");
|
||||
expect(loaded?.lastSeenMs).toBe(1_234_567);
|
||||
expect(typeof loaded?.updatedAt).toBe("number");
|
||||
});
|
||||
|
||||
it("scopes cursor files per account", async () => {
|
||||
await saveBlueBubblesCatchupCursor("a", 100);
|
||||
await saveBlueBubblesCatchupCursor("b", 200);
|
||||
expect((await loadBlueBubblesCatchupCursor("a"))?.lastSeenMs).toBe(100);
|
||||
expect((await loadBlueBubblesCatchupCursor("b"))?.lastSeenMs).toBe(200);
|
||||
});
|
||||
|
||||
it("treats filesystem-unsafe account IDs as distinct", async () => {
|
||||
// Different account IDs that happen to map to the same safePrefix must
|
||||
// not collide on disk.
|
||||
await saveBlueBubblesCatchupCursor("acct/a", 111);
|
||||
await saveBlueBubblesCatchupCursor("acct:a", 222);
|
||||
expect((await loadBlueBubblesCatchupCursor("acct/a"))?.lastSeenMs).toBe(111);
|
||||
expect((await loadBlueBubblesCatchupCursor("acct:a"))?.lastSeenMs).toBe(222);
|
||||
});
|
||||
});
|
||||
|
||||
describe("runBlueBubblesCatchup", () => {
|
||||
let stateDir: string;
|
||||
beforeEach(() => {
|
||||
stateDir = makeStateDir();
|
||||
});
|
||||
afterEach(() => {
|
||||
clearStateDir(stateDir);
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("coalesces concurrent runs for the same accountId via in-process singleflight", async () => {
|
||||
// Two calls firing simultaneously must share one run, one fetch, one
|
||||
// set of processMessage calls, one cursor write. Without singleflight,
|
||||
// both calls would read the same cursor, both would process the same
|
||||
// messages twice (caught by #66816 dedupe, but wasteful), and the
|
||||
// second writer could regress the cursor if its nowMs is stale.
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
|
||||
let fetchCount = 0;
|
||||
let processCount = 0;
|
||||
let resolveFetch: (() => void) | null = null;
|
||||
|
||||
const call1 = runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => {
|
||||
fetchCount++;
|
||||
// Block until we fire the second call, so we can verify it
|
||||
// coalesces rather than starting a new run.
|
||||
await new Promise<void>((resolve) => {
|
||||
resolveFetch = resolve;
|
||||
});
|
||||
return {
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "g1", dateCreated: 6 * 60 * 1000 })],
|
||||
};
|
||||
},
|
||||
processMessageFn: async () => {
|
||||
processCount++;
|
||||
},
|
||||
});
|
||||
|
||||
// Wait a tick for call1 to enter fetchMessages, then fire call2.
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 5));
|
||||
const call2 = runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => {
|
||||
fetchCount++;
|
||||
return { resolved: true, messages: [makeBbMessage({ guid: "g2" })] };
|
||||
},
|
||||
processMessageFn: async () => {
|
||||
processCount++;
|
||||
},
|
||||
});
|
||||
|
||||
resolveFetch!();
|
||||
const [r1, r2] = await Promise.all([call1, call2]);
|
||||
|
||||
expect(fetchCount).toBe(1); // second call coalesced, didn't re-fetch
|
||||
expect(processCount).toBe(1);
|
||||
expect(r1).toBe(r2); // same summary object returned to both callers
|
||||
});
|
||||
|
||||
it("replays messages and advances the cursor on success", async () => {
|
||||
const now = 10_000;
|
||||
const processed: NormalizedWebhookMessage[] = [];
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "g1", text: "one", dateCreated: 9_000 }),
|
||||
makeBbMessage({ guid: "g2", text: "two", dateCreated: 9_500 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async (message) => {
|
||||
processed.push(message);
|
||||
},
|
||||
});
|
||||
|
||||
expect(summary?.querySucceeded).toBe(true);
|
||||
expect(summary?.replayed).toBe(2);
|
||||
expect(summary?.failed).toBe(0);
|
||||
expect(processed.map((m) => m.messageId)).toEqual(["g1", "g2"]);
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursor?.lastSeenMs).toBe(now);
|
||||
});
|
||||
|
||||
it("clamps first-run lookback to maxAgeMinutes when smaller", async () => {
|
||||
const now = 1_000_000;
|
||||
let seenSince = -1;
|
||||
await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
// maxAge tighter than firstRunLookback — must clamp on first run.
|
||||
catchup: { maxAgeMinutes: 5, firstRunLookbackMinutes: 30 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => now,
|
||||
fetchMessages: async (sinceMs) => {
|
||||
seenSince = sinceMs;
|
||||
return { resolved: true, messages: [] };
|
||||
},
|
||||
processMessageFn: async () => {},
|
||||
},
|
||||
);
|
||||
expect(seenSince).toBe(now - 5 * 60_000);
|
||||
});
|
||||
|
||||
it("uses firstRunLookback when no cursor exists", async () => {
|
||||
const now = 1_000_000;
|
||||
let seenSince = 0;
|
||||
await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { firstRunLookbackMinutes: 5 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => now,
|
||||
fetchMessages: async (sinceMs) => {
|
||||
seenSince = sinceMs;
|
||||
return { resolved: true, messages: [] };
|
||||
},
|
||||
processMessageFn: async () => {},
|
||||
},
|
||||
);
|
||||
expect(seenSince).toBe(now - 5 * 60_000);
|
||||
});
|
||||
|
||||
it("clamps window to maxAgeMinutes when cursor is older", async () => {
|
||||
const now = 100 * 60_000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 0);
|
||||
let seenSince = -1;
|
||||
await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { maxAgeMinutes: 10 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => now,
|
||||
fetchMessages: async (sinceMs) => {
|
||||
seenSince = sinceMs;
|
||||
return { resolved: true, messages: [] };
|
||||
},
|
||||
processMessageFn: async () => {},
|
||||
},
|
||||
);
|
||||
expect(seenSince).toBe(now - 10 * 60_000);
|
||||
});
|
||||
|
||||
it("skips when enabled: false", async () => {
|
||||
const called = { fetch: 0, proc: 0 };
|
||||
const summary = await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { enabled: false },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => 1_000,
|
||||
fetchMessages: async () => {
|
||||
called.fetch++;
|
||||
return { resolved: true, messages: [] };
|
||||
},
|
||||
processMessageFn: async () => {
|
||||
called.proc++;
|
||||
},
|
||||
},
|
||||
);
|
||||
expect(summary).toBeNull();
|
||||
expect(called.fetch).toBe(0);
|
||||
expect(called.proc).toBe(0);
|
||||
});
|
||||
|
||||
it("runs catchup even on rapid restarts (no min-interval gate)", async () => {
|
||||
// Catchup runs once per gateway startup, so a quick restart MUST run
|
||||
// it again — otherwise messages dropped between the two startups
|
||||
// (gateway down → BB ECONNREFUSED → gateway up <30s later) are lost
|
||||
// permanently. Bounded by perRunLimit/maxAge + dedupe-protected.
|
||||
const now = 10_000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", now - 5_000);
|
||||
let fetched = false;
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => {
|
||||
fetched = true;
|
||||
return { resolved: true, messages: [] };
|
||||
},
|
||||
processMessageFn: async () => {},
|
||||
});
|
||||
expect(fetched).toBe(true);
|
||||
expect(summary).not.toBeNull();
|
||||
});
|
||||
|
||||
it("advances cursor only to last fetched ts when result is truncated (perRunLimit hit)", async () => {
|
||||
// Long-outage scenario: 4 messages arrived during downtime but
|
||||
// perRunLimit=2. Sort:ASC means we get the 2 oldest. Cursor must
|
||||
// advance to the 2nd's timestamp (not nowMs) so the next startup
|
||||
// picks up the remaining 2.
|
||||
const now = 100 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 50 * 60 * 1000);
|
||||
const summary = await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { perRunLimit: 2 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
// Only the 2 the cap allows BB to return (oldest first via ASC).
|
||||
messages: [
|
||||
makeBbMessage({ guid: "p1", dateCreated: 60 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "p2", dateCreated: 70 * 60 * 1000 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async () => {},
|
||||
},
|
||||
);
|
||||
expect(summary?.replayed).toBe(2);
|
||||
expect(summary?.fetchedCount).toBe(2);
|
||||
expect(summary?.cursorAfter).toBe(70 * 60 * 1000); // page boundary, not nowMs
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursor?.lastSeenMs).toBe(70 * 60 * 1000);
|
||||
});
|
||||
|
||||
it("filters isFromMe before dispatch and still advances cursor", async () => {
|
||||
const now = 10_000;
|
||||
const processed: NormalizedWebhookMessage[] = [];
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "g-me", text: "self", dateCreated: 9_500, isFromMe: true }),
|
||||
makeBbMessage({ guid: "g-them", text: "them", dateCreated: 9_500 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async (m) => {
|
||||
processed.push(m);
|
||||
},
|
||||
});
|
||||
expect(summary?.replayed).toBe(1);
|
||||
expect(summary?.skippedFromMe).toBe(1);
|
||||
expect(processed.map((m) => m.messageId)).toEqual(["g-them"]);
|
||||
});
|
||||
|
||||
it("leaves cursor unchanged when the query fails", async () => {
|
||||
// Use timestamps well inside the maxAge window so the fetch path fires
|
||||
// with a meaningful sinceMs (catchup has no min-interval gate — see the
// "runs catchup even on rapid restarts" test above).
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({ resolved: false, messages: [] }),
|
||||
processMessageFn: async () => {},
|
||||
});
|
||||
expect(summary?.querySucceeded).toBe(false);
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursor?.lastSeenMs).toBe(5 * 60 * 1000); // unchanged
|
||||
});
|
||||
|
||||
it("does NOT advance cursor past a processMessage failure (retryable)", async () => {
|
||||
const cursorBefore = 5 * 60 * 1000;
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", cursorBefore);
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "ok1", dateCreated: 6 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "bad", dateCreated: 7 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "ok2", dateCreated: 8 * 60 * 1000 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async (m) => {
|
||||
if (m.messageId === "bad") {
|
||||
throw new Error("transient");
|
||||
}
|
||||
},
|
||||
});
|
||||
// Cursor is held just before the bad message's timestamp so the next
|
||||
// sweep retries it (and re-queries ok1 which dedupe will drop).
|
||||
expect(summary?.failed).toBe(1);
|
||||
expect(summary?.cursorAfter).toBe(7 * 60 * 1000 - 1);
|
||||
const cursorAfter = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursorAfter?.lastSeenMs).toBe(7 * 60 * 1000 - 1);
|
||||
});
|
||||
|
||||
it("clamps held cursor to previous cursor when failure ts is below it", async () => {
|
||||
// Pathological: failure timestamp is at or below the previous cursor
|
||||
// (shouldn't happen with server-side `after:` but defense in depth).
|
||||
// We must never regress the cursor.
|
||||
const cursorBefore = 9 * 60 * 1000;
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", cursorBefore);
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "bad", dateCreated: 1_000 })],
|
||||
}),
|
||||
processMessageFn: async () => {
|
||||
throw new Error("transient");
|
||||
},
|
||||
});
|
||||
// skippedPreCursor catches the bad record before processMessage runs,
|
||||
// so no failure is recorded and cursor advances to nowMs normally.
|
||||
expect(summary?.failed).toBe(0);
|
||||
expect(summary?.skippedPreCursor).toBe(1);
|
||||
expect(summary?.cursorAfter).toBe(now);
|
||||
});
|
||||
|
||||
it("recovers from a future-dated cursor by falling through to firstRunLookback", async () => {
|
||||
// Clock-skew scenario: cursor was written with a wall time that is now
|
||||
// ahead of the corrected clock. Catchup must NOT pass `after=future`
|
||||
// to BB (which would return zero), and must NOT save cursor=nowMs
|
||||
// without first replaying the [earliestAllowed, nowMs] window.
|
||||
const now = 1_000_000;
|
||||
const futureCursor = now + 60_000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", futureCursor);
|
||||
let seenSince = -1;
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async (sinceMs) => {
|
||||
seenSince = sinceMs;
|
||||
return { resolved: true, messages: [] };
|
||||
},
|
||||
processMessageFn: async () => {},
|
||||
});
|
||||
// Should fall through to firstRunLookback (default 30 min), clamped
|
||||
// to maxAge (default 120 min) — i.e. nowMs - 30min, NOT nowMs.
|
||||
expect(seenSince).toBe(now - 30 * 60_000);
|
||||
expect(summary).not.toBeNull();
|
||||
// Cursor should be repaired to nowMs so subsequent runs are normal.
|
||||
const repaired = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(repaired?.lastSeenMs).toBe(now);
|
||||
});
|
||||
|
||||
it("isolates one failing message and keeps processing the rest", async () => {
|
||||
const now = 10_000;
|
||||
const processed: string[] = [];
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "ok1", text: "ok1" }),
|
||||
makeBbMessage({ guid: "bad", text: "bad" }),
|
||||
makeBbMessage({ guid: "ok2", text: "ok2" }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async (m) => {
|
||||
if (m.messageId === "bad") {
|
||||
throw new Error("boom");
|
||||
}
|
||||
processed.push(m.messageId ?? "?");
|
||||
},
|
||||
});
|
||||
expect(summary?.replayed).toBe(2);
|
||||
expect(summary?.failed).toBe(1);
|
||||
expect(processed).toEqual(["ok1", "ok2"]);
|
||||
});
|
||||
|
||||
it("warns when fetched count hits perRunLimit so silent truncation is visible", async () => {
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
const warnings: string[] = [];
|
||||
const summary = await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { perRunLimit: 3 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "a", dateCreated: 6 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "b", dateCreated: 7 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "c", dateCreated: 8 * 60 * 1000 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async () => {},
|
||||
error: (msg) => warnings.push(msg),
|
||||
},
|
||||
);
|
||||
expect(summary?.replayed).toBe(3);
|
||||
expect(summary?.fetchedCount).toBe(3);
|
||||
const truncationWarnings = warnings.filter((w) => w.includes("perRunLimit"));
|
||||
expect(truncationWarnings).toHaveLength(1);
|
||||
expect(truncationWarnings[0]).toContain("WARNING");
|
||||
expect(truncationWarnings[0]).toContain("perRunLimit=3");
|
||||
});
|
||||
|
||||
it("does not warn when fetched count is below perRunLimit", async () => {
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
const warnings: string[] = [];
|
||||
await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { perRunLimit: 50 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "a" }), makeBbMessage({ guid: "b" })],
|
||||
}),
|
||||
processMessageFn: async () => {},
|
||||
error: (msg) => warnings.push(msg),
|
||||
},
|
||||
);
|
||||
expect(warnings.filter((w) => w.includes("perRunLimit"))).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("skips pre-cursor timestamps as defense in depth against server-inclusive bounds", async () => {
|
||||
const cursor = 5 * 60 * 1000;
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", cursor);
|
||||
const processed: string[] = [];
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "before", text: "before", dateCreated: cursor - 1_000 }),
|
||||
makeBbMessage({ guid: "at-boundary", text: "boundary", dateCreated: cursor }),
|
||||
makeBbMessage({ guid: "after", text: "after", dateCreated: cursor + 1_000 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async (m) => {
|
||||
processed.push(m.messageId ?? "?");
|
||||
},
|
||||
});
|
||||
expect(summary?.replayed).toBe(1);
|
||||
expect(summary?.skippedPreCursor).toBe(2);
|
||||
expect(processed).toEqual(["after"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("fetchBlueBubblesMessagesSince", () => {
|
||||
it("returns resolved:false when the network call throws", async () => {
|
||||
// Point at a port nothing is listening on so fetch fails fast.
|
||||
const result = await fetchBlueBubblesMessagesSince(0, 10, {
|
||||
baseUrl: "http://127.0.0.1:1",
|
||||
password: "x",
|
||||
allowPrivateNetwork: true,
|
||||
timeoutMs: 200,
|
||||
});
|
||||
expect(result.resolved).toBe(false);
|
||||
expect(result.messages).toEqual([]);
|
||||
});
|
||||
});
|
||||
430
extensions/bluebubbles/src/catchup.ts
Normal file
430
extensions/bluebubbles/src/catchup.ts
Normal file
@@ -0,0 +1,430 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import path from "node:path";
|
||||
import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import { resolveBlueBubblesServerAccount } from "./account-resolve.js";
|
||||
import { asRecord, normalizeWebhookMessage } from "./monitor-normalize.js";
|
||||
import { processMessage } from "./monitor-processing.js";
|
||||
import type { WebhookTarget } from "./monitor-shared.js";
|
||||
import { blueBubblesFetchWithTimeout, buildBlueBubblesApiUrl } from "./types.js";
|
||||
|
||||
// When the gateway is down, restarting, or wedged, inbound webhook POSTs from
|
||||
// BB Server fail with ECONNRESET/ECONNREFUSED. BB's WebhookService does not
|
||||
// retry, and its MessagePoller only re-fires webhooks on BB-side reconnect
|
||||
// events (Messages.app / APNs), not on webhook-receiver recovery. Without a
|
||||
// recovery pass, messages delivered during outage windows are permanently
|
||||
// lost. See #66721 for design discussion and experimental validation.
|
||||
|
||||
const DEFAULT_MAX_AGE_MINUTES = 120;
|
||||
const MAX_MAX_AGE_MINUTES = 12 * 60;
|
||||
const DEFAULT_PER_RUN_LIMIT = 50;
|
||||
const MAX_PER_RUN_LIMIT = 500;
|
||||
const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30;
|
||||
const FETCH_TIMEOUT_MS = 15_000;
|
||||
|
||||
export type BlueBubblesCatchupConfig = {
|
||||
enabled?: boolean;
|
||||
maxAgeMinutes?: number;
|
||||
perRunLimit?: number;
|
||||
firstRunLookbackMinutes?: number;
|
||||
};
|
||||
|
||||
export type BlueBubblesCatchupSummary = {
|
||||
querySucceeded: boolean;
|
||||
replayed: number;
|
||||
skippedFromMe: number;
|
||||
skippedPreCursor: number;
|
||||
failed: number;
|
||||
cursorBefore: number | null;
|
||||
cursorAfter: number;
|
||||
windowStartMs: number;
|
||||
windowEndMs: number;
|
||||
fetchedCount: number;
|
||||
};
|
||||
|
||||
export type BlueBubblesCatchupCursor = { lastSeenMs: number; updatedAt: number };
|
||||
|
||||
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
|
||||
// Explicit OPENCLAW_STATE_DIR overrides take precedence (including
|
||||
// per-test mkdtemp dirs in this module's test suite).
|
||||
if (env.OPENCLAW_STATE_DIR?.trim()) {
|
||||
return resolveStateDir(env);
|
||||
}
|
||||
// Default test isolation: per-pid tmpdir, no bleed into real ~/.openclaw.
|
||||
// Use resolvePreferredOpenClawTmpDir + string concat (mirrors
|
||||
// inbound-dedupe) so this doesn't trip the tmpdir-path-guard test that
|
||||
// flags dynamic template-literal suffixes on os.tmpdir() paths.
|
||||
if (env.VITEST || env.NODE_ENV === "test") {
|
||||
const name = "openclaw-vitest-" + process.pid;
|
||||
return path.join(resolvePreferredOpenClawTmpDir(), name);
|
||||
}
|
||||
// Canonical OpenClaw state dir: honors `~` expansion + legacy/new
|
||||
// fallback. Sharing this resolver with inbound-dedupe is what guarantees
|
||||
// the catchup cursor and the dedupe state always live under the same
|
||||
// root, so a replayed GUID is recognized by the dedupe after catchup
|
||||
// re-feeds the message through processMessage.
|
||||
return resolveStateDir(env);
|
||||
}
|
||||
|
||||
function resolveCursorFilePath(accountId: string): string {
|
||||
// Match inbound-dedupe's file layout: readable prefix + short hash so
|
||||
// account IDs that only differ by filesystem-unsafe characters do not
|
||||
// collapse onto the same file.
|
||||
const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account";
|
||||
const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12);
|
||||
return path.join(
|
||||
resolveStateDirFromEnv(),
|
||||
"bluebubbles",
|
||||
"catchup",
|
||||
`${safePrefix}__${hash}.json`,
|
||||
);
|
||||
}
|
||||
|
||||
export async function loadBlueBubblesCatchupCursor(
|
||||
accountId: string,
|
||||
): Promise<BlueBubblesCatchupCursor | null> {
|
||||
const filePath = resolveCursorFilePath(accountId);
|
||||
const { value } = await readJsonFileWithFallback<BlueBubblesCatchupCursor | null>(filePath, null);
|
||||
if (!value || typeof value !== "object") {
|
||||
return null;
|
||||
}
|
||||
if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) {
|
||||
return null;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
export async function saveBlueBubblesCatchupCursor(
|
||||
accountId: string,
|
||||
lastSeenMs: number,
|
||||
): Promise<void> {
|
||||
const filePath = resolveCursorFilePath(accountId);
|
||||
const cursor: BlueBubblesCatchupCursor = { lastSeenMs, updatedAt: Date.now() };
|
||||
await writeJsonFileAtomically(filePath, cursor);
|
||||
}
|
||||
|
||||
type FetchOpts = {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
allowPrivateNetwork: boolean;
|
||||
timeoutMs?: number;
|
||||
};
|
||||
|
||||
export type BlueBubblesCatchupFetchResult = {
|
||||
resolved: boolean;
|
||||
messages: Array<Record<string, unknown>>;
|
||||
};
|
||||
|
||||
export async function fetchBlueBubblesMessagesSince(
|
||||
sinceMs: number,
|
||||
limit: number,
|
||||
opts: FetchOpts,
|
||||
): Promise<BlueBubblesCatchupFetchResult> {
|
||||
const ssrfPolicy = opts.allowPrivateNetwork ? { allowPrivateNetwork: true } : {};
|
||||
const url = buildBlueBubblesApiUrl({
|
||||
baseUrl: opts.baseUrl,
|
||||
path: "/api/v1/message/query",
|
||||
password: opts.password,
|
||||
});
|
||||
const body = JSON.stringify({
|
||||
limit,
|
||||
sort: "ASC",
|
||||
after: sinceMs,
|
||||
// `with` mirrors what bb-catchup.sh uses and what the normal webhook
|
||||
// payload carries, so normalizeWebhookMessage has the same fields to
|
||||
// read during replay as it does on live dispatch.
|
||||
with: ["chat", "chat.participants", "attachment"],
|
||||
});
|
||||
try {
|
||||
const res = await blueBubblesFetchWithTimeout(
|
||||
url,
|
||||
{
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body,
|
||||
},
|
||||
opts.timeoutMs ?? FETCH_TIMEOUT_MS,
|
||||
ssrfPolicy,
|
||||
);
|
||||
if (!res.ok) {
|
||||
return { resolved: false, messages: [] };
|
||||
}
|
||||
const json = (await res.json().catch(() => null)) as { data?: unknown } | null;
|
||||
if (!json || !Array.isArray(json.data)) {
|
||||
return { resolved: false, messages: [] };
|
||||
}
|
||||
const messages: Array<Record<string, unknown>> = [];
|
||||
for (const entry of json.data) {
|
||||
const rec = asRecord(entry);
|
||||
if (rec) {
|
||||
messages.push(rec);
|
||||
}
|
||||
}
|
||||
return { resolved: true, messages };
|
||||
} catch {
|
||||
return { resolved: false, messages: [] };
|
||||
}
|
||||
}
|
||||
|
||||
function clampCatchupConfig(raw?: BlueBubblesCatchupConfig) {
|
||||
const maxAgeMinutes = Math.min(
|
||||
Math.max(raw?.maxAgeMinutes ?? DEFAULT_MAX_AGE_MINUTES, 1),
|
||||
MAX_MAX_AGE_MINUTES,
|
||||
);
|
||||
const perRunLimit = Math.min(
|
||||
Math.max(raw?.perRunLimit ?? DEFAULT_PER_RUN_LIMIT, 1),
|
||||
MAX_PER_RUN_LIMIT,
|
||||
);
|
||||
const firstRunLookbackMinutes = Math.min(
|
||||
Math.max(raw?.firstRunLookbackMinutes ?? DEFAULT_FIRST_RUN_LOOKBACK_MINUTES, 1),
|
||||
MAX_MAX_AGE_MINUTES,
|
||||
);
|
||||
return {
|
||||
maxAgeMs: maxAgeMinutes * 60_000,
|
||||
perRunLimit,
|
||||
firstRunLookbackMs: firstRunLookbackMinutes * 60_000,
|
||||
};
|
||||
}
|
||||
|
||||
/**
 * Optional dependency overrides for `runBlueBubblesCatchup`; every field
 * falls back to the production implementation when omitted.
 */
export type RunBlueBubblesCatchupDeps = {
  /** Override for the BB server message query; defaults to `fetchBlueBubblesMessagesSince`. */
  fetchMessages?: typeof fetchBlueBubblesMessagesSince;
  /** Override for the inbound pipeline entry point; defaults to `processMessage`. */
  processMessageFn?: typeof processMessage;
  /** Clock override; defaults to `Date.now`. */
  now?: () => number;
  /** Info-level logger; defaults to `target.runtime.log`. */
  log?: (message: string) => void;
  /** Error-level logger; defaults to `target.runtime.error`. */
  error?: (message: string) => void;
};
|
||||
|
||||
/**
|
||||
* Fetch and replay BlueBubbles messages delivered since the persisted
|
||||
* catchup cursor, feeding each through the same `processMessage` pipeline
|
||||
* live webhooks use. Safe to call on every gateway startup: replays that
|
||||
* collide with #66230's inbound dedupe cache are dropped there, so a
|
||||
* message already processed via live webhook will not be processed twice.
|
||||
*
|
||||
* Returns the run summary, or `null` when disabled or aborted before the
|
||||
* first query.
|
||||
*
|
||||
* Concurrent calls for the same accountId are coalesced into a single
|
||||
* in-flight run via a module-level singleflight map. Without this, a
|
||||
* fire-and-forget trigger (monitor.ts) combined with an overlapping
|
||||
* webhook-target re-registration could race: two runs would read the
|
||||
* same cursor, compute divergent `nextCursorMs` values, and the last
|
||||
* writer could regress the cursor — causing repeated replay of the same
|
||||
* backlog on every subsequent startup.
|
||||
*/
|
||||
const inFlightCatchups = new Map<string, Promise<BlueBubblesCatchupSummary | null>>();
|
||||
|
||||
export function runBlueBubblesCatchup(
|
||||
target: WebhookTarget,
|
||||
deps: RunBlueBubblesCatchupDeps = {},
|
||||
): Promise<BlueBubblesCatchupSummary | null> {
|
||||
const accountId = target.account.accountId;
|
||||
const existing = inFlightCatchups.get(accountId);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const runPromise = runBlueBubblesCatchupInner(target, deps).finally(() => {
|
||||
inFlightCatchups.delete(accountId);
|
||||
});
|
||||
inFlightCatchups.set(accountId, runPromise);
|
||||
return runPromise;
|
||||
}
|
||||
|
||||
/**
 * Single catchup pass for one account — the implementation behind the
 * singleflight wrapper `runBlueBubblesCatchup`.
 *
 * Steps: load and validate the persisted cursor, compute the bounded query
 * window, query the BB server, replay each fetched record through
 * `processMessage`, then persist the advanced (or deliberately held)
 * cursor. The cursor-advance rules at the bottom of this function are
 * order-sensitive; read the inline comments before changing them.
 *
 * @param target Webhook target carrying the account config, runtime
 *   loggers, and resolved gateway config.
 * @param deps Optional overrides; see `RunBlueBubblesCatchupDeps`.
 * @returns The run summary, or `null` when catchup is disabled for this
 *   account or the server account cannot be resolved.
 */
async function runBlueBubblesCatchupInner(
  target: WebhookTarget,
  deps: RunBlueBubblesCatchupDeps,
): Promise<BlueBubblesCatchupSummary | null> {
  const raw = (target.account.config as { catchup?: BlueBubblesCatchupConfig }).catchup;
  // Only an explicit `enabled: false` disables catchup; absent config runs with defaults.
  if (raw?.enabled === false) {
    return null;
  }

  const now = deps.now ?? (() => Date.now());
  const log = deps.log ?? target.runtime.log;
  const error = deps.error ?? target.runtime.error;
  const fetchFn = deps.fetchMessages ?? fetchBlueBubblesMessagesSince;
  const procFn = deps.processMessageFn ?? processMessage;
  const accountId = target.account.accountId;

  const { maxAgeMs, perRunLimit, firstRunLookbackMs } = clampCatchupConfig(raw);
  const nowMs = now();
  // A cursor-load failure is treated as "no cursor" rather than aborting the run.
  const existing = await loadBlueBubblesCatchupCursor(accountId).catch(() => null);
  const cursorBefore = existing?.lastSeenMs ?? null;

  // Catchup runs once per gateway startup (called from monitor.ts after
  // webhook target registration). We deliberately do NOT short-circuit on
  // a "ran recently" gate, because catchup is the only mechanism that
  // recovers messages dropped during the gateway-down window. A short
  // gap (e.g. <30s) between two startups can still have lost messages in
  // the middle, and skipping the second startup's catchup would lose
  // them permanently. The bounded query (perRunLimit, maxAge) and the
  // inbound-dedupe cache from #66230 cap the cost of running the query
  // every startup.

  const earliestAllowed = nowMs - maxAgeMs;
  // A future-dated cursor (clock rollback via NTP correction or manual
  // adjust) is unusable: querying with `after` set to a future timestamp
  // would return zero records, and saving `nowMs` as the new cursor would
  // permanently skip any real messages missed in the
  // [earliestAllowed, nowMs] window. Treat it as if no cursor exists and
  // fall through to the firstRun lookback path; the inbound-dedupe cache
  // from #66230 handles any overlap with already-processed messages, and
  // saving cursor = nowMs at the end of the run repairs the cursor.
  const cursorIsUsable = existing !== null && existing.lastSeenMs <= nowMs;
  // First-run (and recovered-future-cursor) lookback is also clamped to
  // the maxAge ceiling so a config with `maxAgeMinutes: 5,
  // firstRunLookbackMinutes: 30` doesn't silently exceed the operator's
  // stated lookback cap on first startup.
  const windowStartMs = cursorIsUsable
    ? Math.max(existing.lastSeenMs, earliestAllowed)
    : Math.max(nowMs - firstRunLookbackMs, earliestAllowed);

  let baseUrl: string;
  let password: string;
  let allowPrivateNetwork = false;
  try {
    ({ baseUrl, password, allowPrivateNetwork } = resolveBlueBubblesServerAccount({
      serverUrl: target.account.baseUrl,
      password: target.account.config.password,
      accountId,
      cfg: target.config,
    }));
  } catch (err) {
    error?.(`[${accountId}] BlueBubbles catchup: cannot resolve server account: ${String(err)}`);
    return null;
  }

  const { resolved, messages } = await fetchFn(windowStartMs, perRunLimit, {
    baseUrl,
    password,
    allowPrivateNetwork,
  });

  // cursorAfter / windowEndMs start at nowMs and cursorAfter is revised
  // below once per-message outcomes are known.
  const summary: BlueBubblesCatchupSummary = {
    querySucceeded: resolved,
    replayed: 0,
    skippedFromMe: 0,
    skippedPreCursor: 0,
    failed: 0,
    cursorBefore,
    cursorAfter: nowMs,
    windowStartMs,
    windowEndMs: nowMs,
    fetchedCount: messages.length,
  };

  if (!resolved) {
    // Leave cursor unchanged so the next run retries the same window.
    error?.(`[${accountId}] BlueBubbles catchup: message-query failed; cursor unchanged`);
    return summary;
  }

  // Track the earliest timestamp where `processMessage` threw so we never
  // advance the cursor past a retryable failure. Normalize failures (the
  // record didn't yield a usable NormalizedWebhookMessage) are treated as
  // permanent skips and do NOT block cursor advance — those payloads are
  // unlikely to ever normalize on retry, and blocking on them would wedge
  // catchup forever.
  let earliestProcessFailureTs: number | null = null;
  // Track the latest fetched message timestamp regardless of fate, so a
  // truncated query (fetchedCount === perRunLimit) can advance the cursor
  // exactly to the page boundary. Without this, the unfetched tail past
  // the cap is permanently unreachable.
  let latestFetchedTs = windowStartMs;

  for (const rec of messages) {
    // Defense in depth: the server-side `after:` filter should already
    // exclude pre-cursor messages, but guard here against BB API variants
    // that return inclusive-of-boundary data.
    const ts = typeof rec.dateCreated === "number" ? rec.dateCreated : 0;
    if (ts > 0 && ts > latestFetchedTs) {
      latestFetchedTs = ts;
    }
    if (ts > 0 && ts <= windowStartMs) {
      summary.skippedPreCursor++;
      continue;
    }

    // Filter fromMe early so BB's record of our own outbound sends cannot
    // enter the inbound pipeline even if normalization would accept them.
    if (rec.isFromMe === true || rec.is_from_me === true) {
      summary.skippedFromMe++;
      continue;
    }

    const normalized = normalizeWebhookMessage({ type: "new-message", data: rec });
    if (!normalized) {
      summary.failed++;
      continue;
    }
    // Second fromMe gate: catches records the raw-field check above missed
    // but that normalization flagged as our own outbound.
    if (normalized.fromMe) {
      summary.skippedFromMe++;
      continue;
    }

    try {
      await procFn(normalized, target);
      summary.replayed++;
    } catch (err) {
      summary.failed++;
      if (ts > 0 && (earliestProcessFailureTs === null || ts < earliestProcessFailureTs)) {
        earliestProcessFailureTs = ts;
      }
      error?.(`[${accountId}] BlueBubbles catchup: processMessage failed: ${String(err)}`);
    }
  }

  // Compute the new cursor.
  //
  // - Default: advance to `nowMs` so subsequent runs start from the moment
  //   this sweep finished (avoiding stuck rescans of a message with
  //   `dateCreated > nowMs` from minor clock skew between BB host and
  //   gateway host).
  // - On retryable failure (any `processMessage` throw): hold the cursor
  //   just before the earliest failed timestamp so the next run retries
  //   from there. The inbound-dedupe cache from #66230 keeps successfully
  //   replayed messages from being re-processed.
  // - On truncation (fetched === perRunLimit): advance only to the latest
  //   fetched timestamp so the next run picks up from the page boundary.
  //   Otherwise the unfetched tail past the cap (which can be substantial
  //   during long outages) would be permanently unreachable.
  const isTruncated = summary.fetchedCount >= perRunLimit;
  let nextCursorMs = nowMs;
  if (earliestProcessFailureTs !== null) {
    const heldCursor = Math.max(earliestProcessFailureTs - 1, cursorBefore ?? windowStartMs);
    nextCursorMs = Math.min(heldCursor, nowMs);
  } else if (isTruncated) {
    // Use latestFetchedTs (clamped to >= prior cursor and <= nowMs) so the
    // next run starts where this page ended.
    nextCursorMs = Math.min(Math.max(latestFetchedTs, cursorBefore ?? windowStartMs), nowMs);
  }
  summary.cursorAfter = nextCursorMs;
  // A failed cursor save is logged but does not fail the run; the next
  // startup simply re-reads the previous cursor.
  await saveBlueBubblesCatchupCursor(accountId, nextCursorMs).catch((err) => {
    error?.(`[${accountId}] BlueBubbles catchup: cursor save failed: ${String(err)}`);
  });

  log?.(
    `[${accountId}] BlueBubbles catchup: replayed=${summary.replayed} ` +
      `skipped_fromMe=${summary.skippedFromMe} skipped_preCursor=${summary.skippedPreCursor} ` +
      `failed=${summary.failed} fetched=${summary.fetchedCount} ` +
      `window_ms=${nowMs - windowStartMs}`,
  );

  // Distinct WARNING when the BB result hits perRunLimit so operators
  // know a single startup didn't drain the full backlog. The cursor was
  // advanced only to the page boundary above, so the unfetched tail will
  // be picked up on the next gateway startup — but if startups are
  // infrequent, raising perRunLimit drains larger backlogs in one pass.
  if (isTruncated) {
    error?.(
      `[${accountId}] BlueBubbles catchup: WARNING fetched=${summary.fetchedCount} ` +
        `hit perRunLimit=${perRunLimit}; cursor advanced only to page boundary, ` +
        `remaining messages will be picked up on next startup. Raise ` +
        `channels.bluebubbles...catchup.perRunLimit to drain larger backlogs ` +
        `in a single pass.`,
    );
  }

  return summary;
}
|
||||
@@ -40,6 +40,20 @@ const bluebubblesNetworkSchema = z
|
||||
.strict()
|
||||
.optional();
|
||||
|
||||
/**
 * Per-account `catchup` config block: startup replay of messages missed
 * while the gateway's webhook receiver was unreachable. All fields are
 * optional; out-of-range values are clamped at runtime rather than
 * rejected here. `.strict()` rejects unknown keys so typos surface as
 * config errors instead of being silently ignored.
 */
const bluebubblesCatchupSchema = z
  .object({
    /** Replay messages delivered while the gateway was unreachable. Defaults to on. */
    enabled: z.boolean().optional(),
    /** Hard ceiling on lookback window. Clamped to [1, 720] minutes. */
    maxAgeMinutes: z.number().int().positive().optional(),
    /** Upper bound on messages replayed in a single startup pass. Clamped to [1, 500]. */
    perRunLimit: z.number().int().positive().optional(),
    /** First-run lookback used when no cursor has been persisted yet. Clamped to [1, 720]. */
    firstRunLookbackMinutes: z.number().int().positive().optional(),
  })
  .strict()
  .optional();
|
||||
|
||||
const bluebubblesAccountSchema = z
|
||||
.object({
|
||||
name: z.string().optional(),
|
||||
@@ -62,6 +76,7 @@ const bluebubblesAccountSchema = z
|
||||
mediaLocalRoots: z.array(z.string()).optional(),
|
||||
sendReadReceipts: z.boolean().optional(),
|
||||
network: bluebubblesNetworkSchema,
|
||||
catchup: bluebubblesCatchupSchema,
|
||||
blockStreaming: z.boolean().optional(),
|
||||
groups: z.object({}).catchall(bluebubblesGroupConfigSchema).optional(),
|
||||
})
|
||||
|
||||
@@ -3,6 +3,7 @@ import { safeEqualSecret } from "openclaw/plugin-sdk/browser-security-runtime";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { resolveBlueBubblesEffectiveAllowPrivateNetwork } from "./accounts.js";
|
||||
import { runBlueBubblesCatchup } from "./catchup.js";
|
||||
import { createBlueBubblesDebounceRegistry } from "./monitor-debounce.js";
|
||||
import {
|
||||
asRecord,
|
||||
@@ -343,14 +344,15 @@ export async function monitorBlueBubblesProvider(
|
||||
);
|
||||
}
|
||||
|
||||
const unregister = registerBlueBubblesWebhookTarget({
|
||||
const target: WebhookTarget = {
|
||||
account,
|
||||
config,
|
||||
runtime,
|
||||
core,
|
||||
path,
|
||||
statusSink,
|
||||
});
|
||||
};
|
||||
const unregister = registerBlueBubblesWebhookTarget(target);
|
||||
|
||||
return await new Promise((resolve) => {
|
||||
const stop = () => {
|
||||
@@ -367,6 +369,17 @@ export async function monitorBlueBubblesProvider(
|
||||
runtime.log?.(
|
||||
`[${account.accountId}] BlueBubbles webhook listening on ${normalizeWebhookPath(path)}`,
|
||||
);
|
||||
|
||||
// Kick off a catchup pass for messages delivered while the webhook
|
||||
// target wasn't reachable. Fire-and-forget; the catchup runs through the
|
||||
// same processMessage path webhooks use, and #66230's inbound dedupe
|
||||
// drops any GUID that was already handled, so this is safe even if a
|
||||
// live webhook raced the startup replay. See #66721.
|
||||
runBlueBubblesCatchup(target).catch((err) => {
|
||||
runtime.error?.(
|
||||
`[${account.accountId}] BlueBubbles catchup: unexpected failure: ${String(err)}`,
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user