mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:30:42 +00:00
Merged via squash.
Prepared head SHA: 39e3cf1df5
Co-authored-by: omarshahine <10343873+omarshahine@users.noreply.github.com>
Co-authored-by: omarshahine <10343873+omarshahine@users.noreply.github.com>
Reviewed-by: @omarshahine
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Agents/CLI transcripts: persist successful CLI-backed turns into the OpenClaw session transcript so google-gemini-cli replies appear in session history and the Control UI again. (#67490) Thanks @obviyus.
|
||||
- Discord/tool-call text: strip standalone Gemma-style `<function>...</function>` tool-call payloads from visible assistant text without truncating prose examples or trailing replies. (#67318) Thanks @joelnishanth.
|
||||
- WhatsApp/web-session: drain the pending per-auth creds save queue before reopening sockets so reconnect-time auth bootstrap no longer races in-flight `creds.json` writes and falsely restores from backup. (#67464) Thanks @neeravmakwana.
|
||||
- BlueBubbles/catchup: add a per-message retry ceiling (`catchup.maxFailureRetries`, default 10) so a persistently-failing message with a malformed payload no longer wedges the catchup cursor forever. After N consecutive `processMessage` failures against the same GUID, catchup logs a WARN, skips that message on subsequent sweeps, and lets the cursor advance past it. Transient failures still retry from the same point as before. Also fixes a lost-update race in the persistent dedupe file lock that silently dropped inbound GUIDs on concurrent writes, a dedupe file naming migration gap on version upgrade, and a balloon-event bypass that let catchup replay debouncer-coalesced events as standalone messages. (#67426, #66870) Thanks @omarshahine.
|
||||
|
||||
## 2026.4.15-beta.1
|
||||
|
||||
|
||||
@@ -428,9 +428,13 @@ describe("runBlueBubblesCatchup", () => {
|
||||
// Cursor is held just before the bad message's timestamp so the next
|
||||
// sweep retries it (and re-queries ok1 which dedupe will drop).
|
||||
expect(summary?.failed).toBe(1);
|
||||
expect(summary?.givenUp).toBe(0);
|
||||
expect(summary?.cursorAfter).toBe(7 * 60 * 1000 - 1);
|
||||
const cursorAfter = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursorAfter?.lastSeenMs).toBe(7 * 60 * 1000 - 1);
|
||||
// Retry counter is persisted so subsequent sweeps know how close we
|
||||
// are to the give-up ceiling.
|
||||
expect(cursorAfter?.failureRetries?.bad).toBe(1);
|
||||
});
|
||||
|
||||
it("clamps held cursor to previous cursor when failure ts is below it", async () => {
|
||||
@@ -606,6 +610,494 @@ describe("runBlueBubblesCatchup", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("runBlueBubblesCatchup — per-message retry cap", () => {
|
||||
let stateDir: string;
|
||||
beforeEach(() => {
|
||||
stateDir = makeStateDir();
|
||||
});
|
||||
afterEach(() => {
|
||||
clearStateDir(stateDir);
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
it("increments retry counter on each consecutive failure and holds cursor", async () => {
|
||||
// Three sweeps, all fail on the same GUID. Counter accumulates and
|
||||
// cursor stays pinned below the failing message so every sweep
|
||||
// retries it. maxFailureRetries: 5 so we don't give up inside this
|
||||
// test.
|
||||
const now1 = 10 * 60 * 1000;
|
||||
const now2 = now1 + 60 * 1000;
|
||||
const now3 = now2 + 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
|
||||
const target = makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { maxFailureRetries: 5 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
});
|
||||
|
||||
const fetchMessages = async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 })],
|
||||
});
|
||||
const processMessageFn = async () => {
|
||||
throw new Error("boom");
|
||||
};
|
||||
|
||||
const s1 = await runBlueBubblesCatchup(target, {
|
||||
now: () => now1,
|
||||
fetchMessages,
|
||||
processMessageFn,
|
||||
});
|
||||
const s2 = await runBlueBubblesCatchup(target, {
|
||||
now: () => now2,
|
||||
fetchMessages,
|
||||
processMessageFn,
|
||||
});
|
||||
const s3 = await runBlueBubblesCatchup(target, {
|
||||
now: () => now3,
|
||||
fetchMessages,
|
||||
processMessageFn,
|
||||
});
|
||||
|
||||
expect(s1?.failed).toBe(1);
|
||||
expect(s1?.givenUp).toBe(0);
|
||||
expect(s2?.givenUp).toBe(0);
|
||||
expect(s3?.givenUp).toBe(0);
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursor?.failureRetries?.wedge).toBe(3);
|
||||
// Cursor still held just below the wedge message's timestamp.
|
||||
expect(cursor?.lastSeenMs).toBe(7 * 60 * 1000 - 1);
|
||||
});
|
||||
|
||||
it("gives up on the Nth consecutive failure and records count >= max", async () => {
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
// Pre-seed a cursor with retries at the one-before-give-up threshold
|
||||
// so a single run trips the ceiling. This mirrors what would happen
|
||||
// after many runs through the incremental-retry path above.
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 2 });
|
||||
|
||||
const warnings: string[] = [];
|
||||
const target = makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { maxFailureRetries: 3 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
});
|
||||
|
||||
const summary = await runBlueBubblesCatchup(target, {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 })],
|
||||
}),
|
||||
processMessageFn: async () => {
|
||||
throw new Error("malformed");
|
||||
},
|
||||
error: (m) => warnings.push(m),
|
||||
});
|
||||
|
||||
expect(summary?.failed).toBe(1);
|
||||
expect(summary?.givenUp).toBe(1);
|
||||
// Give-up no longer holds the cursor: it advances to nowMs so the
|
||||
// wedge message falls out of the next query window entirely.
|
||||
expect(summary?.cursorAfter).toBe(now);
|
||||
|
||||
const persisted = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(persisted?.lastSeenMs).toBe(now);
|
||||
// Counter is persisted at the give-up value so a later sweep that
|
||||
// still sees the message (e.g., because a different GUID is holding
|
||||
// the cursor) will recognize the GUID as given up and skip it.
|
||||
expect(persisted?.failureRetries?.wedge).toBe(3);
|
||||
|
||||
// Distinct WARN log line fired on the give-up transition.
|
||||
const giveUpWarnings = warnings.filter((w) => w.includes("giving up on guid="));
|
||||
expect(giveUpWarnings).toHaveLength(1);
|
||||
expect(giveUpWarnings[0]).toContain("guid=wedge");
|
||||
expect(giveUpWarnings[0]).toContain("3 consecutive failures");
|
||||
});
|
||||
|
||||
it("skips an already-given-up GUID without re-attempting processMessage", async () => {
|
||||
// Setup: the cursor file was written with wedge already at the
|
||||
// give-up threshold from a prior run. On this run, the cursor is
|
||||
// held by a different, still-retrying GUID (`held`), so wedge's
|
||||
// timestamp falls back into the query window. Catchup must skip
|
||||
// wedge without invoking processMessage on it.
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 3 });
|
||||
|
||||
const attempted: string[] = [];
|
||||
const target = makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { maxFailureRetries: 3 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
});
|
||||
|
||||
const summary = await runBlueBubblesCatchup(target, {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "held", dateCreated: 6 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async (m) => {
|
||||
attempted.push(m.messageId ?? "?");
|
||||
if (m.messageId === "held") {
|
||||
throw new Error("transient");
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
// processMessage never runs for wedge.
|
||||
expect(attempted).toEqual(["held"]);
|
||||
expect(summary?.skippedGivenUp).toBe(1);
|
||||
expect(summary?.failed).toBe(1);
|
||||
expect(summary?.givenUp).toBe(0);
|
||||
// Cursor held at `held` so held keeps retrying next sweep.
|
||||
expect(summary?.cursorAfter).toBe(6 * 60 * 1000 - 1);
|
||||
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
// Both entries preserved: held at count 1 (still retrying),
|
||||
// wedge at count 3 (given up, sticky).
|
||||
expect(cursor?.failureRetries?.held).toBe(1);
|
||||
expect(cursor?.failureRetries?.wedge).toBe(3);
|
||||
});
|
||||
|
||||
it("clears the retry counter on successful processing", async () => {
|
||||
// GUID recovered after a transient failure. The counter must drop
|
||||
// so the next failure starts fresh (not carrying forward stale
|
||||
// retry history).
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { flaky: 4 });
|
||||
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "flaky", dateCreated: 6 * 60 * 1000 })],
|
||||
}),
|
||||
processMessageFn: async () => {
|
||||
/* succeeds */
|
||||
},
|
||||
});
|
||||
|
||||
expect(summary?.replayed).toBe(1);
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursor?.failureRetries?.flaky).toBeUndefined();
|
||||
// When the map is empty, the field itself is omitted from the file.
|
||||
expect(cursor?.failureRetries).toBeUndefined();
|
||||
expect(cursor?.lastSeenMs).toBe(now);
|
||||
});
|
||||
|
||||
it("resolves 'earlier retry + later give-up' by holding cursor at earlier and skipping later", async () => {
|
||||
// This is the key scenario issue #66870 exists to solve. GUID A at
|
||||
// t=6min is still retrying (count=1). GUID B at t=7min has been
|
||||
// failing for many runs and crosses the ceiling on this run. The
|
||||
// wrong answer is "advance cursor past B to t=7min" — that would
|
||||
// lose A. The right answer is "hold cursor below A, record B as
|
||||
// given-up, skip B on sight next run".
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { giveUpHere: 2 });
|
||||
|
||||
const target = makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { maxFailureRetries: 3 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
});
|
||||
|
||||
const summary = await runBlueBubblesCatchup(target, {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "retryEarlier", dateCreated: 6 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "giveUpHere", dateCreated: 7 * 60 * 1000 }),
|
||||
],
|
||||
}),
|
||||
processMessageFn: async () => {
|
||||
throw new Error("failing");
|
||||
},
|
||||
});
|
||||
|
||||
expect(summary?.failed).toBe(2);
|
||||
expect(summary?.givenUp).toBe(1);
|
||||
// Cursor held at (earlier message ts - 1) so retryEarlier keeps retrying.
|
||||
expect(summary?.cursorAfter).toBe(6 * 60 * 1000 - 1);
|
||||
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(cursor?.failureRetries?.retryEarlier).toBe(1);
|
||||
// Give-up counter preserved at or above the threshold.
|
||||
expect(cursor?.failureRetries?.giveUpHere).toBe(3);
|
||||
});
|
||||
|
||||
it("uses the default retry cap when maxFailureRetries is omitted from config", async () => {
|
||||
// Boot-strap: record 9 failures, then a 10th should trigger give-up
|
||||
// at the default threshold. We pre-seed the counter at 9 so this
|
||||
// single-run test doesn't need to iterate the whole sequence.
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, { wedge: 9 });
|
||||
|
||||
const warnings: string[] = [];
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "wedge", dateCreated: 6 * 60 * 1000 })],
|
||||
}),
|
||||
processMessageFn: async () => {
|
||||
throw new Error("boom");
|
||||
},
|
||||
error: (m) => warnings.push(m),
|
||||
});
|
||||
expect(summary?.givenUp).toBe(1);
|
||||
expect(warnings.some((w) => w.includes("giving up on guid=wedge"))).toBe(true);
|
||||
expect(warnings.some((w) => w.includes("10 consecutive failures"))).toBe(true);
|
||||
});
|
||||
|
||||
it("clamps maxFailureRetries to >= 1 when configured to zero or negative", async () => {
|
||||
// With clamp floor of 1, the first failure already meets count >= 1
|
||||
// so catchup gives up immediately on first attempt.
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
|
||||
const summary = await runBlueBubblesCatchup(
|
||||
makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { maxFailureRetries: 0 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
}),
|
||||
{
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "wedge", dateCreated: 6 * 60 * 1000 })],
|
||||
}),
|
||||
processMessageFn: async () => {
|
||||
throw new Error("boom");
|
||||
},
|
||||
},
|
||||
);
|
||||
expect(summary?.givenUp).toBe(1);
|
||||
expect(summary?.cursorAfter).toBe(now);
|
||||
});
|
||||
|
||||
it("loads cleanly from a legacy cursor file without a failureRetries field", async () => {
|
||||
// Older cursor files (written before this field existed) must still
|
||||
// parse. Round-trip: save without the field (legacy path), then
|
||||
// run catchup and confirm a normal sweep proceeds.
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000);
|
||||
const loaded = await loadBlueBubblesCatchupCursor("test-account");
|
||||
expect(loaded?.lastSeenMs).toBe(5 * 60 * 1000);
|
||||
expect(loaded?.failureRetries).toBeUndefined();
|
||||
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => 10 * 60 * 1000,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
messages: [makeBbMessage({ guid: "ok", dateCreated: 6 * 60 * 1000 })],
|
||||
}),
|
||||
processMessageFn: async () => {},
|
||||
});
|
||||
expect(summary?.replayed).toBe(1);
|
||||
});
|
||||
|
||||
it("drops retry entries for GUIDs that are no longer in the query window", async () => {
|
||||
// A stale entry carried in the cursor file (e.g., from an older
|
||||
// run whose cursor has since advanced past its timestamp) should
|
||||
// NOT be carried forward if the GUID does not appear in the
|
||||
// current fetch. Otherwise the map grows without bound over time.
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, {
|
||||
staleGuid: 2,
|
||||
alsoStale: 5,
|
||||
});
|
||||
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => now,
|
||||
fetchMessages: async () => ({
|
||||
resolved: true,
|
||||
// Fetch returns entirely different GUIDs from the stored map.
|
||||
messages: [makeBbMessage({ guid: "fresh", dateCreated: 6 * 60 * 1000 })],
|
||||
}),
|
||||
processMessageFn: async () => {},
|
||||
});
|
||||
expect(summary?.replayed).toBe(1);
|
||||
const cursor = await loadBlueBubblesCatchupCursor("test-account");
|
||||
// Both stale entries dropped; no new entries since the fresh message
|
||||
// succeeded.
|
||||
expect(cursor?.failureRetries).toBeUndefined();
|
||||
});
|
||||
|
||||
it("preserves stickiness when a given-up GUID reappears and fails again", async () => {
|
||||
// Setup: cursor advanced, but held by a newer still-retrying GUID
|
||||
// `held`. The wedge GUID is already given up from a prior run and
|
||||
// still appears because `held` is holding the cursor below it.
|
||||
// Catchup must continue to skip wedge on sight across many runs
|
||||
// without ever calling processMessage on it.
|
||||
const now = 10 * 60 * 1000;
|
||||
await saveBlueBubblesCatchupCursor("test-account", 5 * 60 * 1000, {
|
||||
wedge: 10,
|
||||
held: 1,
|
||||
});
|
||||
|
||||
const attempted: string[] = [];
|
||||
const target = makeTarget({
|
||||
account: {
|
||||
accountId: "test-account",
|
||||
enabled: true,
|
||||
configured: true,
|
||||
baseUrl: "http://127.0.0.1:1234",
|
||||
config: {
|
||||
serverUrl: "http://127.0.0.1:1234",
|
||||
password: "x",
|
||||
network: { dangerouslyAllowPrivateNetwork: true },
|
||||
catchup: { maxFailureRetries: 5 },
|
||||
} as unknown as WebhookTarget["account"]["config"],
|
||||
},
|
||||
});
|
||||
const fetchMessages = async () => ({
|
||||
resolved: true,
|
||||
messages: [
|
||||
makeBbMessage({ guid: "held", dateCreated: 6 * 60 * 1000 }),
|
||||
makeBbMessage({ guid: "wedge", dateCreated: 7 * 60 * 1000 }),
|
||||
],
|
||||
});
|
||||
const processMessageFn = async () => {
|
||||
throw new Error("still broken");
|
||||
};
|
||||
|
||||
for (let i = 0; i < 3; i++) {
|
||||
await runBlueBubblesCatchup(target, {
|
||||
now: () => now + i,
|
||||
fetchMessages,
|
||||
processMessageFn: async (m) => {
|
||||
attempted.push(m.messageId ?? "?");
|
||||
return processMessageFn();
|
||||
},
|
||||
});
|
||||
}
|
||||
// wedge is NEVER attempted despite reappearing every sweep.
|
||||
expect(attempted.filter((g) => g === "wedge")).toHaveLength(0);
|
||||
// held is attempted every sweep.
|
||||
expect(attempted.filter((g) => g === "held")).toHaveLength(3);
|
||||
});
|
||||
|
||||
it("summary.skippedGivenUp counter is zero on a clean run", async () => {
|
||||
const summary = await runBlueBubblesCatchup(makeTarget(), {
|
||||
now: () => 10_000,
|
||||
fetchMessages: async () => ({ resolved: true, messages: [] }),
|
||||
processMessageFn: async () => {},
|
||||
});
|
||||
expect(summary?.skippedGivenUp).toBe(0);
|
||||
expect(summary?.givenUp).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("saveBlueBubblesCatchupCursor + loadBlueBubblesCatchupCursor — retry map", () => {
|
||||
let stateDir: string;
|
||||
beforeEach(() => {
|
||||
stateDir = makeStateDir();
|
||||
});
|
||||
afterEach(() => {
|
||||
clearStateDir(stateDir);
|
||||
});
|
||||
|
||||
it("round-trips an empty retry map by omitting the field from the persisted shape", async () => {
|
||||
await saveBlueBubblesCatchupCursor("acct", 100, {});
|
||||
const loaded = await loadBlueBubblesCatchupCursor("acct");
|
||||
expect(loaded?.lastSeenMs).toBe(100);
|
||||
expect(loaded?.failureRetries).toBeUndefined();
|
||||
});
|
||||
|
||||
it("round-trips a populated retry map", async () => {
|
||||
await saveBlueBubblesCatchupCursor("acct", 100, { a: 1, b: 9 });
|
||||
const loaded = await loadBlueBubblesCatchupCursor("acct");
|
||||
expect(loaded?.failureRetries).toEqual({ a: 1, b: 9 });
|
||||
});
|
||||
|
||||
it("filters malformed retry entries during load (zero, negative, non-numeric)", async () => {
|
||||
// Use the public save to produce the on-disk file, then overwrite
|
||||
// its contents with a hand-crafted payload to exercise the loader's
|
||||
// sanitization independently of what the saver would emit.
|
||||
await saveBlueBubblesCatchupCursor("acct", 100);
|
||||
const stateRoot = process.env.OPENCLAW_STATE_DIR;
|
||||
if (!stateRoot) {
|
||||
throw new Error("OPENCLAW_STATE_DIR must be set by the test harness");
|
||||
}
|
||||
const dir = path.join(stateRoot, "bluebubbles", "catchup");
|
||||
const files = fs.readdirSync(dir);
|
||||
expect(files).toHaveLength(1);
|
||||
const firstFile = files[0];
|
||||
if (!firstFile) {
|
||||
throw new Error("expected a cursor file to exist after save");
|
||||
}
|
||||
const badCursor = {
|
||||
lastSeenMs: 100,
|
||||
updatedAt: 0,
|
||||
failureRetries: {
|
||||
good: 3,
|
||||
zero: 0,
|
||||
negative: -1,
|
||||
notANumber: "oops",
|
||||
infinite: Number.POSITIVE_INFINITY,
|
||||
nan: Number.NaN,
|
||||
},
|
||||
};
|
||||
fs.writeFileSync(path.join(dir, firstFile), JSON.stringify(badCursor));
|
||||
|
||||
const loaded = await loadBlueBubblesCatchupCursor("acct");
|
||||
expect(loaded?.lastSeenMs).toBe(100);
|
||||
expect(loaded?.failureRetries).toEqual({ good: 3 });
|
||||
});
|
||||
});
|
||||
|
||||
describe("fetchBlueBubblesMessagesSince", () => {
|
||||
it("returns resolved:false when the network call throws", async () => {
|
||||
// Point at a port nothing is listening on so fetch fails fast.
|
||||
|
||||
@@ -4,6 +4,7 @@ import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plug
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import { resolveBlueBubblesServerAccount } from "./account-resolve.js";
|
||||
import { warmupBlueBubblesInboundDedupe } from "./inbound-dedupe.js";
|
||||
import { asRecord, normalizeWebhookMessage } from "./monitor-normalize.js";
|
||||
import { processMessage } from "./monitor-processing.js";
|
||||
import type { WebhookTarget } from "./monitor-shared.js";
|
||||
@@ -21,6 +22,14 @@ const MAX_MAX_AGE_MINUTES = 12 * 60;
|
||||
const DEFAULT_PER_RUN_LIMIT = 50;
|
||||
const MAX_PER_RUN_LIMIT = 500;
|
||||
const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30;
|
||||
const DEFAULT_MAX_FAILURE_RETRIES = 10;
|
||||
const MAX_MAX_FAILURE_RETRIES = 1_000;
|
||||
// Defense-in-depth bound: a runaway retry map (e.g., a storm of unique
|
||||
// failing GUIDs) should not balloon the cursor file unboundedly. When the
|
||||
// map exceeds this size, we keep only the highest-count entries (the ones
|
||||
// closest to being given up) and drop the rest. Realistic backlogs stay
|
||||
// well under this; the bound exists to cap pathological growth.
|
||||
const MAX_FAILURE_RETRY_MAP_SIZE = 5_000;
|
||||
const FETCH_TIMEOUT_MS = 15_000;
|
||||
|
||||
export type BlueBubblesCatchupConfig = {
|
||||
@@ -28,6 +37,13 @@ export type BlueBubblesCatchupConfig = {
|
||||
maxAgeMinutes?: number;
|
||||
perRunLimit?: number;
|
||||
firstRunLookbackMinutes?: number;
|
||||
/**
|
||||
* Per-message retry ceiling. After this many consecutive failed
|
||||
* `processMessage` attempts against the same GUID, catchup logs a WARN
|
||||
* and force-advances the cursor past the wedged message instead of
|
||||
* holding it indefinitely. Defaults to 10. Clamped to [1, 1000].
|
||||
*/
|
||||
maxFailureRetries?: number;
|
||||
};
|
||||
|
||||
export type BlueBubblesCatchupSummary = {
|
||||
@@ -35,7 +51,21 @@ export type BlueBubblesCatchupSummary = {
|
||||
replayed: number;
|
||||
skippedFromMe: number;
|
||||
skippedPreCursor: number;
|
||||
/**
|
||||
* Messages whose GUID was already recorded as "given up" from a previous
|
||||
* run (count >= `maxFailureRetries`). These are skipped without calling
|
||||
* `processMessage` again. Lets the cursor continue advancing past the
|
||||
* wedged message on the next sweep while avoiding another failed attempt.
|
||||
*/
|
||||
skippedGivenUp: number;
|
||||
failed: number;
|
||||
/**
|
||||
* Messages that crossed the `maxFailureRetries` ceiling ON THIS RUN.
|
||||
* Each transition triggers a WARN log line. Already-given-up messages
|
||||
* in subsequent runs count under `skippedGivenUp`, not here. Lets
|
||||
* operators distinguish fresh give-up events from steady-state skips.
|
||||
*/
|
||||
givenUp: number;
|
||||
cursorBefore: number | null;
|
||||
cursorAfter: number;
|
||||
windowStartMs: number;
|
||||
@@ -43,7 +73,24 @@ export type BlueBubblesCatchupSummary = {
|
||||
fetchedCount: number;
|
||||
};
|
||||
|
||||
export type BlueBubblesCatchupCursor = { lastSeenMs: number; updatedAt: number };
|
||||
export type BlueBubblesCatchupCursor = {
|
||||
lastSeenMs: number;
|
||||
updatedAt: number;
|
||||
/**
|
||||
* Per-GUID failure counter, preserved across runs. Two states:
|
||||
* - `1 <= count < maxFailureRetries`: the GUID is still retrying and
|
||||
* continues to hold the cursor back.
|
||||
* - `count >= maxFailureRetries`: catchup has "given up" on the GUID.
|
||||
* The message is skipped on sight (no `processMessage` attempt) and
|
||||
* the GUID no longer holds the cursor. The entry stays in the map
|
||||
* until the cursor naturally advances past the message's timestamp
|
||||
* (at which point the message stops appearing in queries entirely).
|
||||
*
|
||||
* A successful `processMessage` removes the entry. Optional on the
|
||||
* persisted shape so older cursor files without this field load cleanly.
|
||||
*/
|
||||
failureRetries?: Record<string, number>;
|
||||
};
|
||||
|
||||
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
|
||||
// Explicit OPENCLAW_STATE_DIR overrides take precedence (including
|
||||
@@ -81,6 +128,26 @@ function resolveCursorFilePath(accountId: string): string {
|
||||
);
|
||||
}
|
||||
|
||||
function sanitizeFailureRetriesInput(raw: unknown): Record<string, number> {
|
||||
// Older cursor files don't carry this field; also guard against
|
||||
// hand-edited JSON or future shape drift. Drop any entry whose count is
|
||||
// not a finite positive integer so downstream arithmetic stays sound.
|
||||
if (!raw || typeof raw !== "object") {
|
||||
return {};
|
||||
}
|
||||
const out: Record<string, number> = {};
|
||||
for (const [guid, count] of Object.entries(raw as Record<string, unknown>)) {
|
||||
if (!guid || typeof guid !== "string") {
|
||||
continue;
|
||||
}
|
||||
if (typeof count !== "number" || !Number.isFinite(count) || count <= 0) {
|
||||
continue;
|
||||
}
|
||||
out[guid] = Math.floor(count);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
export async function loadBlueBubblesCatchupCursor(
|
||||
accountId: string,
|
||||
): Promise<BlueBubblesCatchupCursor | null> {
|
||||
@@ -92,18 +159,66 @@ export async function loadBlueBubblesCatchupCursor(
|
||||
if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) {
|
||||
return null;
|
||||
}
|
||||
return value;
|
||||
const failureRetries = sanitizeFailureRetriesInput(value.failureRetries);
|
||||
const hasRetries = Object.keys(failureRetries).length > 0;
|
||||
// Keep the shape consistent with what the writer emits: only carry the
|
||||
// `failureRetries` key when there's something to retry. Old cursor files
|
||||
// without the field continue to round-trip to the same shape.
|
||||
return {
|
||||
lastSeenMs: value.lastSeenMs,
|
||||
updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0,
|
||||
...(hasRetries ? { failureRetries } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
export async function saveBlueBubblesCatchupCursor(
|
||||
accountId: string,
|
||||
lastSeenMs: number,
|
||||
failureRetries?: Record<string, number>,
|
||||
): Promise<void> {
|
||||
const filePath = resolveCursorFilePath(accountId);
|
||||
const cursor: BlueBubblesCatchupCursor = { lastSeenMs, updatedAt: Date.now() };
|
||||
const sanitized = sanitizeFailureRetriesInput(failureRetries);
|
||||
const hasRetries = Object.keys(sanitized).length > 0;
|
||||
const cursor: BlueBubblesCatchupCursor = {
|
||||
lastSeenMs,
|
||||
updatedAt: Date.now(),
|
||||
// Only emit the field when non-empty so unrelated cursor writes from
|
||||
// the happy path don't bloat the cursor file with `"failureRetries": {}`.
|
||||
...(hasRetries ? { failureRetries: sanitized } : {}),
|
||||
};
|
||||
await writeJsonFileAtomically(filePath, cursor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Bound the retry map so a pathological storm of unique failing GUIDs
|
||||
* cannot grow the cursor file without limit. Keeps the `maxSize` entries
|
||||
* with the highest counts (closest to give-up) when over the bound.
|
||||
*
|
||||
* The map is already scoped to "currently failing, still-retrying" GUIDs
|
||||
* and prunes on every run (entries not observed in the fetched window are
|
||||
* dropped), so this is a defense-in-depth cap, not the primary pruning
|
||||
* mechanism.
|
||||
*/
|
||||
function capFailureRetriesMap(
|
||||
map: Record<string, number>,
|
||||
maxSize: number,
|
||||
): Record<string, number> {
|
||||
const entries = Object.entries(map);
|
||||
if (entries.length <= maxSize) {
|
||||
return map;
|
||||
}
|
||||
// Sort by count desc; stable tiebreak on guid string so the retained set
|
||||
// is deterministic across runs (important for cursor-file diffing during
|
||||
// debugging).
|
||||
entries.sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0]));
|
||||
const capped: Record<string, number> = {};
|
||||
for (let i = 0; i < maxSize; i++) {
|
||||
const [guid, count] = entries[i];
|
||||
capped[guid] = count;
|
||||
}
|
||||
return capped;
|
||||
}
|
||||
|
||||
type FetchOpts = {
|
||||
baseUrl: string;
|
||||
password: string;
|
||||
@@ -180,10 +295,15 @@ function clampCatchupConfig(raw?: BlueBubblesCatchupConfig) {
|
||||
Math.max(raw?.firstRunLookbackMinutes ?? DEFAULT_FIRST_RUN_LOOKBACK_MINUTES, 1),
|
||||
MAX_MAX_AGE_MINUTES,
|
||||
);
|
||||
const maxFailureRetries = Math.min(
|
||||
Math.max(Math.floor(raw?.maxFailureRetries ?? DEFAULT_MAX_FAILURE_RETRIES), 1),
|
||||
MAX_MAX_FAILURE_RETRIES,
|
||||
);
|
||||
return {
|
||||
maxAgeMs: maxAgeMinutes * 60_000,
|
||||
perRunLimit,
|
||||
firstRunLookbackMs: firstRunLookbackMinutes * 60_000,
|
||||
maxFailureRetries,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -247,10 +367,11 @@ async function runBlueBubblesCatchupInner(
|
||||
const procFn = deps.processMessageFn ?? processMessage;
|
||||
const accountId = target.account.accountId;
|
||||
|
||||
const { maxAgeMs, perRunLimit, firstRunLookbackMs } = clampCatchupConfig(raw);
|
||||
const { maxAgeMs, perRunLimit, firstRunLookbackMs, maxFailureRetries } = clampCatchupConfig(raw);
|
||||
const nowMs = now();
|
||||
const existing = await loadBlueBubblesCatchupCursor(accountId).catch(() => null);
|
||||
const cursorBefore = existing?.lastSeenMs ?? null;
|
||||
const prevRetries = existing?.failureRetries ?? {};
|
||||
|
||||
// Catchup runs once per gateway startup (called from monitor.ts after
|
||||
// webhook target registration). We deliberately do NOT short-circuit on
|
||||
@@ -295,6 +416,15 @@ async function runBlueBubblesCatchupInner(
|
||||
return null;
|
||||
}
|
||||
|
||||
// Ensure legacy→hashed dedupe file migration runs and the on-disk store
|
||||
// is warm before we replay. Without this, an upgrade from a version that
|
||||
// used the old `${safe}.json` naming to the current `${safe}__${hash}.json`
|
||||
// would start with an empty dedupe cache and re-dispatch every message in
|
||||
// the catchup window — producing duplicate replies.
|
||||
await warmupBlueBubblesInboundDedupe(accountId).catch((err) => {
|
||||
error?.(`[${accountId}] BlueBubbles catchup: dedupe warmup failed: ${String(err)}`);
|
||||
});
|
||||
|
||||
const { resolved, messages } = await fetchFn(windowStartMs, perRunLimit, {
|
||||
baseUrl,
|
||||
password,
|
||||
@@ -306,7 +436,9 @@ async function runBlueBubblesCatchupInner(
|
||||
replayed: 0,
|
||||
skippedFromMe: 0,
|
||||
skippedPreCursor: 0,
|
||||
skippedGivenUp: 0,
|
||||
failed: 0,
|
||||
givenUp: 0,
|
||||
cursorBefore,
|
||||
cursorAfter: nowMs,
|
||||
windowStartMs,
|
||||
@@ -320,18 +452,31 @@ async function runBlueBubblesCatchupInner(
|
||||
return summary;
|
||||
}
|
||||
|
||||
// Track the earliest timestamp where `processMessage` threw so we never
|
||||
// advance the cursor past a retryable failure. Normalize failures (the
|
||||
// record didn't yield a usable NormalizedWebhookMessage) are treated as
|
||||
// permanent skips and do NOT block cursor advance — those payloads are
|
||||
// unlikely to ever normalize on retry, and blocking on them would wedge
|
||||
// catchup forever.
|
||||
// Track the earliest timestamp where `processMessage` threw *and* the
|
||||
// failing message has not yet crossed the per-GUID retry ceiling, so we
|
||||
// never advance the cursor past a retryable failure. Normalize failures
|
||||
// (the record didn't yield a usable NormalizedWebhookMessage) are
|
||||
// treated as permanent skips and do NOT block cursor advance — those
|
||||
// payloads are unlikely to ever normalize on retry, and blocking on
|
||||
// them would wedge catchup forever. Given-up messages (count >= max)
|
||||
// also do NOT contribute here; see `skippedGivenUp` below.
|
||||
let earliestProcessFailureTs: number | null = null;
|
||||
// Track the latest fetched message timestamp regardless of fate, so a
|
||||
// truncated query (fetchedCount === perRunLimit) can advance the cursor
|
||||
// exactly to the page boundary. Without this, the unfetched tail past
|
||||
// the cap is permanently unreachable.
|
||||
let latestFetchedTs = windowStartMs;
|
||||
// Next-run retry map. Built from scratch each run so entries for GUIDs
|
||||
// that didn't appear in this fetch are dropped (the cursor has
|
||||
// advanced past them and they will never be queried again). Entries we
|
||||
// do carry forward encode two states via the stored count:
|
||||
// - `1 <= count < maxFailureRetries`: still-retrying, holds cursor.
|
||||
// - `count >= maxFailureRetries`: given-up, skipped on sight without
|
||||
// another `processMessage` attempt. Preserving the count is what
|
||||
// keeps the give-up state sticky across runs when an earlier
|
||||
// still-retrying failure is holding the cursor and the given-up
|
||||
// message keeps reappearing in the query window.
|
||||
const nextRetries: Record<string, number> = {};
|
||||
|
||||
for (const rec of messages) {
|
||||
// Defense in depth: the server-side `after:` filter should already
|
||||
@@ -353,6 +498,30 @@ async function runBlueBubblesCatchupInner(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip tapback/reaction/balloon events. These carry an
|
||||
// `associatedMessageGuid` pointing at the parent text message and
|
||||
// have a different `guid` of their own. The live webhook path handles
|
||||
// balloons via the debouncer, which coalesces them with their parent.
|
||||
// Without debouncing here, replaying a balloon would dispatch it as a
|
||||
// standalone message — producing a duplicate reply to the parent.
|
||||
//
|
||||
// Guard: only skip when `associatedMessageType` is set (tapbacks and
|
||||
// reactions — e.g., "like", 2000) OR `balloonBundleId` is set (URL
|
||||
// previews, stickers). iMessage threaded replies use a separate
|
||||
// `threadOriginatorGuid` field and do NOT set either of these, so
|
||||
// they pass through for correct catchup replay.
|
||||
const assocGuid =
|
||||
typeof rec.associatedMessageGuid === "string"
|
||||
? rec.associatedMessageGuid.trim()
|
||||
: typeof rec.associated_message_guid === "string"
|
||||
? rec.associated_message_guid.trim()
|
||||
: "";
|
||||
const assocType = rec.associatedMessageType ?? rec.associated_message_type;
|
||||
const balloonId = typeof rec.balloonBundleId === "string" ? rec.balloonBundleId.trim() : "";
|
||||
if (assocGuid && (assocType != null || balloonId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const normalized = normalizeWebhookMessage({ type: "new-message", data: rec });
|
||||
if (!normalized) {
|
||||
summary.failed++;
|
||||
@@ -363,15 +532,62 @@ async function runBlueBubblesCatchupInner(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Prefer the normalized messageId (what the dedupe cache uses) so the
|
||||
// retry counter and downstream dedupe key agree on identity. Fall
|
||||
// back to the raw BB `guid` only when normalization didn't supply one.
|
||||
const retryKey = normalized.messageId ?? (typeof rec.guid === "string" ? rec.guid : "");
|
||||
|
||||
// Already-given-up GUIDs are skipped without another `processMessage`
|
||||
// attempt. This is what lets catchup make forward progress through an
|
||||
// earlier, still-retrying failure while not burning cycles re-running
|
||||
// a permanently broken message every sweep.
|
||||
const prevCount = retryKey ? (prevRetries[retryKey] ?? 0) : 0;
|
||||
if (retryKey && prevCount >= maxFailureRetries) {
|
||||
summary.skippedGivenUp++;
|
||||
// Preserve the count so give-up stickiness survives this run.
|
||||
nextRetries[retryKey] = prevCount;
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await procFn(normalized, target);
|
||||
summary.replayed++;
|
||||
// Success clears any accumulated retries for this GUID. Since we
|
||||
// build `nextRetries` from scratch rather than mutating
|
||||
// `prevRetries`, simply NOT copying the entry is the clear. (We
|
||||
// still need this branch so readers understand the lifecycle.)
|
||||
} catch (err) {
|
||||
summary.failed++;
|
||||
if (ts > 0 && (earliestProcessFailureTs === null || ts < earliestProcessFailureTs)) {
|
||||
earliestProcessFailureTs = ts;
|
||||
const nextCount = prevCount + 1;
|
||||
if (retryKey && nextCount >= maxFailureRetries) {
|
||||
// Crossing the ceiling this run: log WARN once and record the
|
||||
// give-up in the persisted map. Don't contribute to
|
||||
// `earliestProcessFailureTs` — we're intentionally letting the
|
||||
// cursor advance past this GUID on the next sweep.
|
||||
summary.givenUp++;
|
||||
nextRetries[retryKey] = nextCount;
|
||||
error?.(
|
||||
`[${accountId}] BlueBubbles catchup: giving up on guid=${retryKey} ` +
|
||||
`after ${nextCount} consecutive failures; future sweeps will skip ` +
|
||||
`this message. timestamp=${ts}: ${String(err)}`,
|
||||
);
|
||||
} else {
|
||||
// Still retrying: count this failure and hold the cursor so the
|
||||
// next sweep retries the same window. (retryKey may be empty in
|
||||
// the unusual case where neither normalizer nor raw payload
|
||||
// carried a GUID — in that case we hold the cursor but cannot
|
||||
// increment a counter, matching pre-retry-cap behavior.)
|
||||
if (retryKey) {
|
||||
nextRetries[retryKey] = nextCount;
|
||||
}
|
||||
if (ts > 0 && (earliestProcessFailureTs === null || ts < earliestProcessFailureTs)) {
|
||||
earliestProcessFailureTs = ts;
|
||||
}
|
||||
error?.(
|
||||
`[${accountId}] BlueBubbles catchup: processMessage failed (retry ` +
|
||||
`${nextCount}/${maxFailureRetries}): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
error?.(`[${accountId}] BlueBubbles catchup: processMessage failed: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -381,10 +597,17 @@ async function runBlueBubblesCatchupInner(
|
||||
// this sweep finished (avoiding stuck rescans of a message with
|
||||
// `dateCreated > nowMs` from minor clock skew between BB host and
|
||||
// gateway host).
|
||||
// - On retryable failure (any `processMessage` throw): hold the cursor
|
||||
// just before the earliest failed timestamp so the next run retries
|
||||
// from there. The inbound-dedupe cache from #66230 keeps successfully
|
||||
// replayed messages from being re-processed.
|
||||
// - On retryable failure (any still-retrying `processMessage` throw,
|
||||
// where the GUID has NOT crossed `maxFailureRetries`): hold the
|
||||
// cursor just before the earliest still-retrying failed timestamp so
|
||||
// the next run retries from there. The inbound-dedupe cache from
|
||||
// #66230 keeps successfully replayed messages from being re-processed.
|
||||
// - On give-up (failures that crossed `maxFailureRetries`): the GUID
|
||||
// is recorded in the persisted retry map with `count >= max` and
|
||||
// skipped on sight in subsequent runs (without another processMessage
|
||||
// attempt). Give-up GUIDs intentionally do NOT hold the cursor, so
|
||||
// the cursor can advance past them naturally — this is what unwedges
|
||||
// catchup from a permanently malformed message (issue #66870).
|
||||
// - On truncation (fetched === perRunLimit): advance only to the latest
|
||||
// fetched timestamp so the next run picks up from the page boundary.
|
||||
// Otherwise the unfetched tail past the cap (which can be substantial
|
||||
@@ -400,14 +623,18 @@ async function runBlueBubblesCatchupInner(
|
||||
nextCursorMs = Math.min(Math.max(latestFetchedTs, cursorBefore ?? windowStartMs), nowMs);
|
||||
}
|
||||
summary.cursorAfter = nextCursorMs;
|
||||
await saveBlueBubblesCatchupCursor(accountId, nextCursorMs).catch((err) => {
|
||||
// Cap the retry map before writing — defense in depth against a storm
|
||||
// of unique failing GUIDs ballooning the cursor file.
|
||||
const retriesToPersist = capFailureRetriesMap(nextRetries, MAX_FAILURE_RETRY_MAP_SIZE);
|
||||
await saveBlueBubblesCatchupCursor(accountId, nextCursorMs, retriesToPersist).catch((err) => {
|
||||
error?.(`[${accountId}] BlueBubbles catchup: cursor save failed: ${String(err)}`);
|
||||
});
|
||||
|
||||
log?.(
|
||||
`[${accountId}] BlueBubbles catchup: replayed=${summary.replayed} ` +
|
||||
`skipped_fromMe=${summary.skippedFromMe} skipped_preCursor=${summary.skippedPreCursor} ` +
|
||||
`failed=${summary.failed} fetched=${summary.fetchedCount} ` +
|
||||
`skipped_givenUp=${summary.skippedGivenUp} failed=${summary.failed} ` +
|
||||
`given_up=${summary.givenUp} fetched=${summary.fetchedCount} ` +
|
||||
`window_ms=${nowMs - windowStartMs}`,
|
||||
);
|
||||
|
||||
|
||||
@@ -50,6 +50,14 @@ const bluebubblesCatchupSchema = z
|
||||
perRunLimit: z.number().int().positive().optional(),
|
||||
/** First-run lookback used when no cursor has been persisted yet. Clamped to [1, 720]. */
|
||||
firstRunLookbackMinutes: z.number().int().positive().optional(),
|
||||
/**
|
||||
* Consecutive-failure ceiling per message GUID. After this many failed
|
||||
* processMessage attempts against the same GUID, catchup logs a WARN
|
||||
* and skips the message on subsequent sweeps (letting the cursor
|
||||
* advance past a permanently malformed payload). Defaults to 10.
|
||||
* Clamped to [1, 1000].
|
||||
*/
|
||||
maxFailureRetries: z.number().int().positive().optional(),
|
||||
})
|
||||
.strict()
|
||||
.optional();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { type ClaimableDedupe, createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
|
||||
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
|
||||
@@ -33,6 +34,11 @@ function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
|
||||
return resolveStateDir(env);
|
||||
}
|
||||
|
||||
function resolveLegacyNamespaceFilePath(namespace: string): string {
|
||||
const safe = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "global";
|
||||
return path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe", `${safe}.json`);
|
||||
}
|
||||
|
||||
function resolveNamespaceFilePath(namespace: string): string {
|
||||
// Keep a readable prefix for operator debugging, but suffix with a short
|
||||
// hash of the raw namespace so account IDs that only differ by
|
||||
@@ -40,12 +46,42 @@ function resolveNamespaceFilePath(namespace: string): string {
|
||||
// onto the same file.
|
||||
const safePrefix = namespace.replace(/[^a-zA-Z0-9_-]/g, "_") || "ns";
|
||||
const hash = createHash("sha256").update(namespace, "utf8").digest("hex").slice(0, 12);
|
||||
return path.join(
|
||||
resolveStateDirFromEnv(),
|
||||
"bluebubbles",
|
||||
"inbound-dedupe",
|
||||
`${safePrefix}__${hash}.json`,
|
||||
);
|
||||
const dir = path.join(resolveStateDirFromEnv(), "bluebubbles", "inbound-dedupe");
|
||||
const newPath = path.join(dir, `${safePrefix}__${hash}.json`);
|
||||
|
||||
// One-time migration: earlier beta shipped `${safe}.json` (no hash).
|
||||
// Rename so the upgrade preserves existing dedupe entries instead of
|
||||
// starting from an empty file and replaying already-handled messages.
|
||||
migrateLegacyDedupeFile(namespace, newPath);
|
||||
|
||||
return newPath;
|
||||
}
|
||||
|
||||
const migratedNamespaces = new Set<string>();
|
||||
|
||||
function migrateLegacyDedupeFile(namespace: string, newPath: string): void {
|
||||
if (migratedNamespaces.has(namespace)) {
|
||||
return;
|
||||
}
|
||||
migratedNamespaces.add(namespace);
|
||||
try {
|
||||
const legacyPath = resolveLegacyNamespaceFilePath(namespace);
|
||||
if (legacyPath === newPath) {
|
||||
return;
|
||||
}
|
||||
if (!fs.existsSync(legacyPath)) {
|
||||
return;
|
||||
}
|
||||
if (!fs.existsSync(newPath)) {
|
||||
fs.renameSync(legacyPath, newPath);
|
||||
} else {
|
||||
// Both exist: new file is authoritative; remove the stale legacy.
|
||||
fs.unlinkSync(legacyPath);
|
||||
}
|
||||
} catch {
|
||||
// Best-effort migration; a missed rename is strictly less harmful
|
||||
// than crashing the module load path.
|
||||
}
|
||||
}
|
||||
|
||||
function buildPersistentImpl(): ClaimableDedupe {
|
||||
@@ -162,6 +198,18 @@ export async function claimBlueBubblesInboundMessage(params: {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure the legacy→hashed dedupe file migration runs and the on-disk
|
||||
* store is warmed into memory for the given account. Call before any
|
||||
* catchup replay so already-handled GUIDs are recognized even when the
|
||||
* file-naming convention changed between versions.
|
||||
*/
|
||||
export async function warmupBlueBubblesInboundDedupe(accountId: string): Promise<void> {
|
||||
// Trigger the migration side-effect inside resolveNamespaceFilePath.
|
||||
resolveNamespaceFilePath(accountId);
|
||||
await impl.warmup(accountId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset inbound dedupe state between tests. Installs an in-memory-only
|
||||
* implementation so tests do not hit disk, avoiding file-lock timing issues
|
||||
|
||||
@@ -154,6 +154,35 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
|
||||
const lockOptions = mergeLockOptions(options.lockOptions);
|
||||
const memory = createDedupeCache({ ttlMs, maxSize: memoryMaxSize });
|
||||
const inflight = new Map<string, Promise<boolean>>();
|
||||
// In-process write queue per file path. `withFileLock` is re-entrant
|
||||
// within the same process (a second caller for the same path gets
|
||||
// immediate access instead of waiting), so two concurrent
|
||||
// checkAndRecordInner calls for different keys but the same file can
|
||||
// race: both read the same stale data, and the last writer's
|
||||
// writeJsonFileAtomically silently overwrites the first writer's
|
||||
// additions. This queue serializes all read-modify-write cycles
|
||||
// targeting the same file within this process, preventing the lost
|
||||
// update while still allowing cross-process file-lock contention to
|
||||
// be handled by the file lock itself.
|
||||
const fileWriteQueues = new Map<string, Promise<unknown>>();
|
||||
|
||||
function enqueueFileWrite<T>(filePath: string, fn: () => Promise<T>): Promise<T> {
|
||||
const prev = fileWriteQueues.get(filePath) ?? Promise.resolve();
|
||||
const next = prev.then(fn, fn);
|
||||
fileWriteQueues.set(filePath, next);
|
||||
// Cleanup: remove the queue entry once this link settles, but only if
|
||||
// no newer work was chained after us. The `.catch(() => {})` prevents
|
||||
// an unhandled rejection when `next` rejects — callers still observe
|
||||
// the rejection through the returned `next` promise directly.
|
||||
next
|
||||
.finally(() => {
|
||||
if (fileWriteQueues.get(filePath) === next) {
|
||||
fileWriteQueues.delete(filePath);
|
||||
}
|
||||
})
|
||||
.catch(() => {});
|
||||
return next;
|
||||
}
|
||||
|
||||
async function checkAndRecordInner(
|
||||
key: string,
|
||||
@@ -168,19 +197,21 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
|
||||
|
||||
const path = options.resolveFilePath(namespace);
|
||||
try {
|
||||
const duplicate = await withFileLock(path, lockOptions, async () => {
|
||||
const { value } = await readJsonFileWithFallback<PersistentDedupeData>(path, {});
|
||||
const data = sanitizeData(value);
|
||||
const seenAt = data[key];
|
||||
const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs);
|
||||
if (isRecent) {
|
||||
return true;
|
||||
}
|
||||
data[key] = now;
|
||||
pruneData(data, now, ttlMs, fileMaxEntries);
|
||||
await writeJsonFileAtomically(path, data);
|
||||
return false;
|
||||
});
|
||||
const duplicate = await enqueueFileWrite(path, () =>
|
||||
withFileLock(path, lockOptions, async () => {
|
||||
const { value } = await readJsonFileWithFallback<PersistentDedupeData>(path, {});
|
||||
const data = sanitizeData(value);
|
||||
const seenAt = data[key];
|
||||
const isRecent = seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs);
|
||||
if (isRecent) {
|
||||
return true;
|
||||
}
|
||||
data[key] = now;
|
||||
pruneData(data, now, ttlMs, fileMaxEntries);
|
||||
await writeJsonFileAtomically(path, data);
|
||||
return false;
|
||||
}),
|
||||
);
|
||||
return !duplicate;
|
||||
} catch (error) {
|
||||
onDiskError?.(error);
|
||||
|
||||
Reference in New Issue
Block a user