feat(imessage): inbound catchup (cursor + replay loop + monitor wiring) (#79387)

Closes #78649. Adds opt-in inbound iMessage catchup that recovers messages landing in chat.db while the gateway is offline (crash, restart, Mac sleep). Mirrors the design of the retired BlueBubbles catchup, adapted for the imsg JSON-RPC chats.list + messages.history fetch path.

- Schema: new channels.imessage.catchup block with enabled / maxAgeMinutes (1..720) / perRunLimit (1..500) / firstRunLookbackMinutes (1..720) / maxFailureRetries (1..1000). Disabled by default — opt-in.
- Cursor + replay loop (extensions/imessage/src/monitor/catchup.ts): per-account state under <openclawStateDir>/imessage/catchup/. Walks rows oldest-first, advances on success/give-up, holds at failed.rowid - 1 when a failure is below maxFailureRetries (cannot leapfrog held failures even when later rows in the same batch succeed). Watermark floor for parse-rejected rows.
- Bridge (extensions/imessage/src/monitor/catchup-bridge.ts): live chats.list + per-chat messages.history fetch adapter; dispatch adapter routes through the live handleMessageNow path so allowlists / group policy / dedupe / echo cache behave identically on replayed and live messages. Watermark clamped to last dispatched rowid when the cap truncates.
- Monitor wiring (extensions/imessage/src/monitor/monitor-provider.ts): catchup runs once between watch.subscribe and the live dispatch loop when enabled. Bypasses the inbound debouncer for serial per-row dispatch.
- Echo-cache TTL bumped 2 min → 12 h so own outbound rows from before a gap are not re-fed as inbound on replay.
- Generated bundled-channel-config-metadata.generated.ts so the runtime AJV schema accepts the new catchup block.
- Docs: new "Catching up after gateway downtime" section + BlueBubbles migration parity update.

Tests: 322/322 in extensions/imessage/, including 5 regression tests covering the cursor-leapfrog, parse-rejected stall, watermark vs held failure, and cap-truncation-cursor-floor edge cases that codex (gpt-5.4) and clawsweeper (gpt-5.5) found during review. Live-tested end-to-end against the running gateway: replayed=1 fetchedCount=1, agent reply observed, cursor persisted at the test row's exact rowid.

Co-authored-by: Omar Shahine <10343873+omarshahine@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Omar Shahine committed 2026-05-08 22:55:59 -04:00 (committed by GitHub)
parent 8989d0a777 · commit 81e0a1a99b
14 changed files with 1806 additions and 31 deletions


@@ -193,6 +193,7 @@ Docs: https://docs.openclaw.ai
- Plugin SDK: add a generic `api.runtime.llm.complete` host completion helper with runtime-derived caller attribution, config-gated model/agent overrides, session-bound context-engine access, request-scoped config, audit metadata, and normalized usage attribution. (#64294) Thanks @DaevMithran.
- Control UI/exec approvals: highlight parsed shell command fragments that may deserve extra review in approval prompts. (#77153) Thanks @jesse-merhi.
- Channels/iMessage: honor `channels.imessage.groups.<chat_id>.systemPrompt` (and the `groups["*"]` wildcard) by forwarding it as `GroupSystemPrompt` on inbound group turns, mirroring the byte-identical resolver semantics from WhatsApp, where defining the key as an empty string on a specific group suppresses the wildcard fallback. Brings iMessage to parity with the per-group `systemPrompt` pattern already supported by Discord, Telegram, IRC, Slack, GoogleChat, and the retired BlueBubbles channel. Fixes #78285. (#79383) Thanks @omarshahine.
- iMessage: add opt-in inbound catchup that replays messages received while the gateway was offline (crash, restart, mac sleep) on next startup. Enable with `channels.imessage.catchup.enabled: true`; tunables for `maxAgeMinutes`, `perRunLimit`, `firstRunLookbackMinutes`, and `maxFailureRetries`. Persists a per-account cursor under the OpenClaw state dir (`<openclawStateDir>/imessage/catchup/`), replays each row through the live dispatch path so allowlists/group policy/dedupe behave identically on replayed and live messages, and force-advances past wedged guids after `maxFailureRetries` to prevent stuck cursors. Extends the persisted echo-cache retention window so the agent's own outbound rows from before a gap are not re-fed as inbound on replay. Includes a regenerated `src/config/bundled-channel-config-metadata.generated.ts` so the runtime AJV schema accepts the new `channels.imessage.catchup` block. Fixes #78649. (#79387) Thanks @omarshahine.
### Breaking


@@ -1,4 +1,4 @@
c8a698cf0968fe5b27b2bbc798d3c811ba989a7207ed372cbfd95965c894f65b config-baseline.json
67c7db6eeb7f74dd454118e17304c5486ab59d33e7899c501b003c326d35db0f config-baseline.core.json
e3160218e86959dfa00f35b8b9eca85c3bf436d83dbbe3e7204247dcb692f0a1 config-baseline.channel.json
7a9ed89a6ff7e578bfcab7828ab660af59e62402a85bfbfc05d5ae3d975e9728 config-baseline.plugin.json


@@ -205,22 +205,22 @@ If the gateway logs `imessage: dropping group message from chat_id=<id>` or the
## Action parity at a glance
| Action | legacy BlueBubbles | bundled iMessage |
| ---------------------------------------------------------- | ----------------------------------- | ----------------------------------------------------------------------------------------------------------------------- |
| Send text / SMS fallback | ✅ | ✅ |
| Send media (photo, video, file, voice) | ✅ | ✅ |
| Threaded reply (`reply_to_guid`) | ✅ | ✅ (closes [#51892](https://github.com/openclaw/openclaw/issues/51892)) |
| Tapback (`react`) | ✅ | ✅ |
| Edit / unsend (macOS 13+ recipients) | ✅ | ✅ |
| Send with screen effect | ✅ | ✅ (closes part of [#9394](https://github.com/openclaw/openclaw/issues/9394)) |
| Rich text bold / italic / underline / strikethrough | ✅ | ✅ (typed-run formatting via attributedBody) |
| Rename group / set group icon | ✅ | ✅ |
| Add / remove participant, leave group | ✅ | ✅ |
| Read receipts and typing indicator | ✅ | ✅ (gated on private API probe) |
| Same-sender DM coalescing | ✅ | ✅ (DM-only; opt-in via `channels.imessage.coalesceSameSenderDms`) |
| Catchup of inbound messages received while gateway is down | ✅ (webhook replay + history fetch) | ✅ (opt-in via `channels.imessage.catchup.enabled`; closes [#78649](https://github.com/openclaw/openclaw/issues/78649)) |
The catchup gap was the most operationally significant one for production deployments: planned restarts, Mac sleep, or an unexpected gateway crash lasting more than a few seconds would silently drop any inbound iMessage traffic that arrived during the gap on bundled iMessage. BlueBubbles' webhook + history-fetch flow recovered those messages on reconnect, but BlueBubbles is no longer supported.
iMessage catchup is now available as an opt-in feature on the bundled plugin. On gateway startup, if `channels.imessage.catchup.enabled` is `true`, the gateway runs one `chats.list` + per-chat `messages.history` pass against the same JSON-RPC client used by `imsg watch`, replays each missed inbound row through the live dispatch path (allowlists, group policy, dedupe, echo cache), and persists a per-account cursor so subsequent startups pick up where they left off. See [Catching up after gateway downtime](/channels/imessage#catching-up-after-gateway-downtime) for tuning.
## Pairing, sessions, and ACP bindings


@@ -9,7 +9,7 @@ title: "iMessage"
<Note>
For OpenClaw iMessage deployments, use `imsg` on a signed-in macOS Messages host. If your Gateway runs on Linux or Windows, point `channels.imessage.cliPath` at an SSH wrapper that runs `imsg` on the Mac.
**Gateway-downtime catchup is opt-in.** When enabled (`channels.imessage.catchup.enabled: true`), the gateway replays inbound messages that landed in `chat.db` while it was offline (crash, restart, Mac sleep) on next startup. Disabled by default — see [Catching up after gateway downtime](#catching-up-after-gateway-downtime). Closes [openclaw#78649](https://github.com/openclaw/openclaw/issues/78649).
</Note>
<Warning>
@@ -634,6 +634,66 @@ The two rows arrive at OpenClaw ~0.8-2.0 s apart on most setups. Without coalesc
| Rapid flood (>10 small DMs inside window) | N rows | N turns | One turn, bounded output (first + latest, text/attachment caps applied) |
| Two people typing in a group chat | N rows from M senders | M+ turns (one per sender bucket) | M+ turns — group chats are not coalesced |
## Catching up after gateway downtime
When the gateway is offline (crash, restart, Mac sleep, machine off), `imsg watch` resumes from the current `chat.db` state once the gateway comes back up — anything that arrived during the gap is, by default, never seen. Catchup replays those messages on the next startup so the agent does not silently miss inbound traffic.
Catchup is **disabled by default**. Enable it per channel:
```ts
channels: {
imessage: {
catchup: {
enabled: true, // master switch (default: false)
maxAgeMinutes: 120, // skip rows older than now - 2h (default: 120, clamp 1..720)
perRunLimit: 50, // max rows replayed per startup (default: 50, clamp 1..500)
firstRunLookbackMinutes: 30, // first run with no cursor: look back 30 min (default: 30)
maxFailureRetries: 10, // give up on a wedged guid after 10 dispatch failures (default: 10)
},
},
}
```
### How it runs
One pass per `monitorIMessageProvider` startup, sequenced as `imsg launch` ready → `watch.subscribe` → `performIMessageCatchup` → live dispatch loop. Catchup itself uses `chats.list` + per-chat `messages.history` against the same JSON-RPC client used by `imsg watch`. Anything that arrives during the catchup pass flows through live dispatch normally; the existing inbound-dedupe cache absorbs any overlap with replayed rows.
Each replayed row is fed through the live dispatch path (`evaluateIMessageInbound` + `dispatchInboundMessage`), so allowlists, group policy, echo cache, and read receipts behave identically on replayed and live messages. The inbound debouncer is bypassed: replayed rows are dispatched serially, one turn per row.
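A minimal sketch of that replay adapter, assuming a post-debounce `handleMessageNow`-style handler. The names and shapes below (`Payload`, `replayThroughLivePath`) are illustrative, not the real `monitor-provider` API:

```typescript
// Illustrative sketch only — `Payload` and `replayThroughLivePath` are made
// up for this example; the real adapter feeds full IMessagePayload objects
// into the live post-debounce handler.
type Payload = { guid: string };

async function replayThroughLivePath(
  rows: Payload[],
  handleMessageNow: (msg: Payload) => Promise<void>,
): Promise<{ replayed: number; failed: number }> {
  let replayed = 0;
  let failed = 0;
  for (const row of rows) {
    try {
      // Serial per-row dispatch: ordering is preserved, and a failure maps
      // cleanly to a single rowid for the cursor hold/retry bookkeeping.
      await handleMessageNow(row);
      replayed += 1;
    } catch {
      failed += 1;
    }
  }
  return { replayed, failed };
}
```

Because the handler is the same one live notifications hit, allowlists and the echo cache apply to replayed rows without any catchup-specific re-implementation.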
### Cursor and retry semantics
Catchup keeps a per-account cursor at `<openclawStateDir>/imessage/catchup/<account>__<hash>.json` (the OpenClaw state dir defaults to `~/.openclaw`, overridable with `OPENCLAW_STATE_DIR`):
```json
{
"lastSeenMs": 1717900800000,
"lastSeenRowid": 482910,
"updatedAt": 1717900801234,
"failureRetries": { "<guid>": 1 }
}
```
- The cursor advances on each successful dispatch and is held when a row's dispatch throws — the next startup retries the same row from the held cursor.
- After `maxFailureRetries` consecutive throws against the same `guid`, catchup logs a `warn` and force-advances the cursor past the wedged message so subsequent startups can make progress.
- Already-given-up guids are skipped on sight (no dispatch attempt) on later runs and counted under `skippedGivenUp` in the run summary.
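The advance/hold/give-up rules above can be modeled as a small fold over one pass's row outcomes. This is a simplified model for illustration — `cursorAfterPass` and its types are made up, not the real `catchup.ts` API:

```typescript
// Simplified model of the cursor rules — not the real catchup.ts code.
// Computes where the cursor lands after one catchup pass.
type RowOutcome = { rowid: number; guid: string; ok: boolean };

function cursorAfterPass(
  startRowid: number,
  outcomes: RowOutcome[],
  retries: Record<string, number>,
  maxFailureRetries: number,
): { lastSeenRowid: number; retries: Record<string, number> } {
  let watermark = startRowid;
  let holdCeiling = Infinity; // lowest held failure keeps the cursor below it
  const nextRetries: Record<string, number> = { ...retries };
  for (const row of outcomes) {
    if (row.ok) {
      watermark = Math.max(watermark, row.rowid);
      delete nextRetries[row.guid];
      continue;
    }
    const n = (nextRetries[row.guid] ?? 0) + 1;
    if (n >= maxFailureRetries) {
      // Give up: force-advance past the wedged row so later startups progress.
      watermark = Math.max(watermark, row.rowid);
      delete nextRetries[row.guid];
    } else {
      // Held failure: the cursor may never leapfrog it, even when later
      // rows in the same batch succeed.
      nextRetries[row.guid] = n;
      holdCeiling = Math.min(holdCeiling, row.rowid - 1);
    }
  }
  return { lastSeenRowid: Math.min(watermark, holdCeiling), retries: nextRetries };
}
```

The `Math.min` against `holdCeiling` is what enforces the no-leapfrog invariant: a success at rowid 501 cannot drag the persisted cursor past a held failure at rowid 500.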
### Operator-visible signals
```
imessage catchup: replayed=N skippedFromMe=… skippedGivenUp=… failed=… givenUp=… fetchedCount=…
imessage catchup: giving up on guid=<guid> after <N> failures; advancing cursor past it
imessage catchup: fetched <X> rows across chats, capped to perRunLimit=<Y>
```
A `WARN ... capped to perRunLimit` line means a single startup did not drain the full backlog. Raise `perRunLimit` (max 500) if your gaps regularly exceed the default 50-row pass.
### When to leave it off
- The gateway runs continuously with watchdog auto-restart and gaps never exceed a few seconds — the default (off) is fine.
- DM volume is low and missed messages would not change agent behavior — the `firstRunLookbackMinutes` window can replay surprisingly old context the first time catchup is enabled.
When you turn catchup on, the first startup with no cursor only looks back `firstRunLookbackMinutes` (30 min default), not the full `maxAgeMinutes` window — this avoids replaying a long history of pre-enable messages.
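The effective lookback window can be sketched like this (an illustrative helper, not the real implementation — the function name and signature are assumptions):

```typescript
// Illustrative only: how the catchup "since" timestamp is derived from the
// tunables described above. First run (no cursor) looks back only
// firstRunLookbackMinutes; later runs resume from the persisted cursor,
// and maxAgeMinutes floors the window in both cases.
function catchupSinceMs(
  nowMs: number,
  cursorLastSeenMs: number | null,
  maxAgeMinutes: number,
  firstRunLookbackMinutes: number,
): number {
  const ageFloor = nowMs - maxAgeMinutes * 60_000;
  if (cursorLastSeenMs === null) {
    // No cursor yet: bounded initial window, not the full maxAgeMinutes.
    return Math.max(ageFloor, nowMs - firstRunLookbackMinutes * 60_000);
  }
  // Resume from the cursor, but never reach further back than maxAgeMinutes.
  return Math.max(ageFloor, cursorLastSeenMs);
}
```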
## Troubleshooting
<AccordionGroup>


@@ -0,0 +1,404 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { runIMessageCatchup } from "./catchup-bridge.js";
import { resolveCatchupConfig } from "./catchup.js";
import type { IMessagePayload } from "./types.js";
type RpcCall = {
method: string;
params: unknown;
};
function makeFakeClient(responder: (call: RpcCall) => unknown): {
client: {
request: <T>(method: string, params: unknown) => Promise<T>;
};
calls: RpcCall[];
} {
const calls: RpcCall[] = [];
const client = {
request: async <T>(method: string, params: unknown): Promise<T> => {
calls.push({ method, params });
return responder({ method, params }) as T;
},
};
return { client, calls };
}
function makeRow(opts: {
id: number;
guid: string;
chat_id: number;
created_at: string;
is_from_me?: boolean;
text?: string;
sender?: string;
}): Record<string, unknown> {
return {
id: opts.id,
guid: opts.guid,
chat_id: opts.chat_id,
sender: opts.sender ?? "+15551234",
is_from_me: opts.is_from_me ?? false,
text: opts.text ?? "hello",
created_at: opts.created_at,
chat_identifier: "+15551234",
chat_guid: `iMessage;-;${opts.sender ?? "+15551234"}`,
is_group: false,
};
}
describe("runIMessageCatchup", () => {
let tempDir: string;
beforeEach(() => {
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-catchup-bridge-"));
vi.stubEnv("OPENCLAW_STATE_DIR", tempDir);
});
afterEach(() => {
vi.unstubAllEnvs();
vi.useRealTimers();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it("fetches chats then per-chat history and dispatches each row in rowid order", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
const dispatched: IMessagePayload[] = [];
const { client, calls } = makeFakeClient(({ method, params }) => {
if (method === "chats.list") {
return {
chats: [
{ id: 1, last_message_at: "2026-05-08T11:55:00.000Z" },
{ id: 2, last_message_at: "2026-05-08T11:50:00.000Z" },
],
};
}
if (method === "messages.history") {
const p = params as { chat_id: number };
if (p.chat_id === 1) {
return {
messages: [
makeRow({ id: 102, guid: "g-102", chat_id: 1, created_at: "2026-05-08T11:55:00Z" }),
makeRow({ id: 100, guid: "g-100", chat_id: 1, created_at: "2026-05-08T11:50:00Z" }),
],
};
}
return {
messages: [
makeRow({ id: 101, guid: "g-101", chat_id: 2, created_at: "2026-05-08T11:51:00Z" }),
],
};
}
throw new Error(`unexpected method ${method}`);
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({ enabled: true, perRunLimit: 50, maxAgeMinutes: 60 }),
includeAttachments: false,
dispatchPayload: async (msg) => {
dispatched.push(msg);
},
});
expect(summary.querySucceeded).toBe(true);
expect(summary.replayed).toBe(3);
expect(dispatched.map((m) => m.guid)).toEqual(["g-100", "g-101", "g-102"]);
expect(calls[0]?.method).toBe("chats.list");
expect(calls.filter((c) => c.method === "messages.history")).toHaveLength(2);
});
it("skips chats whose last_message_at is older than the catchup window", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
let historyCalls = 0;
const { client } = makeFakeClient(({ method, params }) => {
if (method === "chats.list") {
return {
chats: [
{ id: 1, last_message_at: "2026-05-08T11:55:00.000Z" },
{ id: 99, last_message_at: "2025-12-01T00:00:00.000Z" }, // ancient
],
};
}
if (method === "messages.history") {
historyCalls += 1;
const p = params as { chat_id: number };
return {
messages: [
makeRow({
id: 200,
guid: `g-${p.chat_id}`,
chat_id: p.chat_id,
created_at: "2026-05-08T11:55:00Z",
}),
],
};
}
throw new Error(`unexpected method ${method}`);
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({ enabled: true, perRunLimit: 50, maxAgeMinutes: 60 }),
includeAttachments: false,
dispatchPayload: async () => {},
});
expect(summary.querySucceeded).toBe(true);
expect(historyCalls).toBe(1);
expect(summary.replayed).toBe(1);
});
it("returns querySucceeded=false when chats.list throws", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
const { client } = makeFakeClient(({ method }) => {
if (method === "chats.list") {
throw new Error("rpc timeout");
}
throw new Error(`unexpected method ${method}`);
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({ enabled: true, perRunLimit: 50, maxAgeMinutes: 60 }),
includeAttachments: false,
dispatchPayload: async () => {
throw new Error("dispatch should not be called when fetch fails");
},
});
expect(summary.querySucceeded).toBe(false);
expect(summary.replayed).toBe(0);
});
it("continues across chats when a single messages.history call throws", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
const dispatched: string[] = [];
const { client } = makeFakeClient(({ method, params }) => {
if (method === "chats.list") {
return {
chats: [
{ id: 1, last_message_at: "2026-05-08T11:55:00.000Z" },
{ id: 2, last_message_at: "2026-05-08T11:50:00.000Z" },
],
};
}
if (method === "messages.history") {
const p = params as { chat_id: number };
if (p.chat_id === 1) {
throw new Error("permission denied");
}
return {
messages: [
makeRow({ id: 300, guid: "g-300", chat_id: 2, created_at: "2026-05-08T11:51:00Z" }),
],
};
}
throw new Error(`unexpected method ${method}`);
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({ enabled: true, perRunLimit: 50, maxAgeMinutes: 60 }),
includeAttachments: false,
dispatchPayload: async (msg) => {
if (msg.guid) {
dispatched.push(msg.guid);
}
},
});
expect(summary.querySucceeded).toBe(true);
expect(summary.replayed).toBe(1);
expect(dispatched).toEqual(["g-300"]);
});
it("caps cross-chat results at perRunLimit, oldest first", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
const dispatched: string[] = [];
const { client } = makeFakeClient(({ method, params }) => {
if (method === "chats.list") {
return {
chats: [
{ id: 1, last_message_at: "2026-05-08T11:55:00.000Z" },
{ id: 2, last_message_at: "2026-05-08T11:55:00.000Z" },
],
};
}
if (method === "messages.history") {
const p = params as { chat_id: number };
const base = p.chat_id * 100;
return {
messages: Array.from({ length: 4 }, (_, i) =>
makeRow({
id: base + i,
guid: `g-${base + i}`,
chat_id: p.chat_id,
created_at: "2026-05-08T11:55:00Z",
}),
),
};
}
throw new Error(`unexpected method ${method}`);
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({ enabled: true, perRunLimit: 5, maxAgeMinutes: 60 }),
includeAttachments: false,
dispatchPayload: async (msg) => {
if (msg.guid) {
dispatched.push(msg.guid);
}
},
});
expect(summary.fetchedCount).toBe(5);
expect(summary.replayed).toBe(5);
// Oldest-first by rowid: 100, 101, 102, 103, 200 (chat 1's first 4, then chat 2's first).
expect(dispatched).toEqual(["g-100", "g-101", "g-102", "g-103", "g-200"]);
// Regression for clawsweeper #79387 finding: the cursor must NOT
// advance past the last dispatched row when perRunLimit truncates
// the cross-chat page. Without the cap-aware watermark clamp, the
// bridge would emit a watermark covering the raw rows it dropped
// (rowids 201, 202, 203 from chat 2), and the catchup loop would
// persist `lastSeenRowid` past them — so the promised "next startup
// picks up the rest" warning would lie and those rows would be
// permanently lost. Cursor must stop at the last dispatched rowid (200).
expect(summary.cursorAfter.lastSeenRowid).toBe(200);
});
it("treats a dispatch throw as a failure and holds the cursor", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
const { client } = makeFakeClient(({ method }) => {
if (method === "chats.list") {
return { chats: [{ id: 1, last_message_at: "2026-05-08T11:55:00.000Z" }] };
}
return {
messages: [
makeRow({ id: 500, guid: "g-500", chat_id: 1, created_at: "2026-05-08T11:55:00Z" }),
],
};
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({
enabled: true,
perRunLimit: 50,
maxAgeMinutes: 60,
maxFailureRetries: 3,
}),
includeAttachments: false,
dispatchPayload: async () => {
throw new Error("model unavailable");
},
});
expect(summary.failed).toBe(1);
expect(summary.replayed).toBe(0);
// Cursor clamps to `failed.rowid - 1` (== 499), strictly below the held
// failure, so the next pass refetches row 500 — and never leapfrogs it.
expect(summary.cursorAfter.lastSeenRowid).toBe(499);
});
it("emits a high-watermark even when every row fails payload validation", async () => {
// Regression: without this, a chat whose only fresh row is unparseable
// (corrupt text column, schema drift) would stall catchup forever — the
// row never reaches the cursor loop, the cursor never advances past it,
// the next pass re-fetches and re-drops the same row. The bridge probes
// raw `id` / `created_at` per row before parsing and emits the highest
// values it saw as a watermark so the cursor loop can still advance.
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
const { client } = makeFakeClient(({ method }) => {
if (method === "chats.list") {
return { chats: [{ id: 1, last_message_at: "2026-05-08T11:55:00.000Z" }] };
}
return {
messages: [
// Junk row — wrong types in everything except id + created_at, so
// parseIMessageNotification rejects it but the watermark probe
// still records id=999 and the parsed created_at.
{
id: 999,
guid: 42, // wrong type
chat_id: "x", // wrong type
sender: false, // wrong type
is_from_me: "no", // wrong type
text: 7, // wrong type
created_at: "2026-05-08T11:55:00.000Z",
},
],
};
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({ enabled: true, perRunLimit: 50, maxAgeMinutes: 60 }),
includeAttachments: false,
dispatchPayload: async () => {},
});
expect(summary.querySucceeded).toBe(true);
expect(summary.replayed).toBe(0);
expect(summary.fetchedCount).toBe(0);
// Cursor advances to the watermark — next pass won't keep re-fetching this row.
expect(summary.cursorAfter.lastSeenRowid).toBe(999);
});
it("filters rows that fail payload validation", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
const dispatched: string[] = [];
const { client } = makeFakeClient(({ method }) => {
if (method === "chats.list") {
return { chats: [{ id: 1, last_message_at: "2026-05-08T11:55:00.000Z" }] };
}
return {
messages: [
// Valid row.
makeRow({ id: 600, guid: "g-600", chat_id: 1, created_at: "2026-05-08T11:55:00Z" }),
// Junk row — wrong types — must be dropped silently.
{ id: "not-a-number", guid: 42, chat_id: "x" },
// Missing guid.
{
...makeRow({ id: 601, guid: "", chat_id: 1, created_at: "2026-05-08T11:55:00Z" }),
guid: undefined,
},
],
};
});
const summary = await runIMessageCatchup({
client: client as never,
accountId: "default",
config: resolveCatchupConfig({ enabled: true, perRunLimit: 50, maxAgeMinutes: 60 }),
includeAttachments: false,
dispatchPayload: async (msg) => {
if (msg.guid) {
dispatched.push(msg.guid);
}
},
});
expect(summary.replayed).toBe(1);
expect(dispatched).toEqual(["g-600"]);
});
});


@@ -0,0 +1,275 @@
import { warn } from "openclaw/plugin-sdk/runtime-env";
import type { IMessageRpcClient } from "../client.js";
import {
type CatchupDispatchFn,
type CatchupFetchFn,
type IMessageCatchupRow,
type IMessageCatchupSummary,
performIMessageCatchup,
type ResolvedCatchupConfig,
} from "./catchup.js";
import { parseIMessageNotification } from "./parse-notification.js";
import type { IMessagePayload } from "./types.js";
// Per-chat history fetch budget. messages.history is per-chat; we cap each
// chat's fetch to the global perRunLimit so a single noisy group cannot
// dominate the cursor advance — the cross-chat sort + final slice still
// caps the global pass at perRunLimit.
const PER_CHAT_HISTORY_LIMIT_CAP = 500;
// chats.list page size used during catchup. 200 covers far more than any
// realistic offline window worth of distinct chats while staying well under
// any sensible chat.db query cost.
const CATCHUP_CHATS_LIST_LIMIT = 200;
// Per-RPC timeout. Catchup runs once at startup; a slow imsg should not
// stall the live dispatch loop indefinitely.
const CATCHUP_RPC_TIMEOUT_MS = 30_000;
type ChatsListEntry = {
id?: number | null;
last_message_at?: string | null;
};
type MessagesHistoryResult = {
messages?: unknown[];
};
type RuntimeLogger = {
log?: (msg: string) => void;
error?: (msg: string) => void;
};
export type RunIMessageCatchupParams = {
client: IMessageRpcClient;
accountId: string;
config: ResolvedCatchupConfig;
includeAttachments: boolean;
/**
* The same per-message handler the live `imsg watch` notification path
* runs (i.e. the post-debounce `handleMessageNow` in `monitor-provider`).
* Catchup feeds rows in oldest-first by rowid. Throws are recorded as
* dispatch failures; non-throw returns count as successful dispatch
* (including non-error drops, which mirrors the live pipeline).
*/
dispatchPayload: (message: IMessagePayload) => Promise<void>;
runtime?: RuntimeLogger;
/** Override clock for tests. */
now?: () => number;
};
/**
* Wire `performIMessageCatchup` against the live `imsg` JSON-RPC client.
*
* Catchup recovers messages that landed in `chat.db` while the gateway was
* offline (crash, restart, mac sleep) by:
* 1. listing recently-active chats via `chats.list`,
* 2. fetching per-chat history since the cursor via `messages.history`,
* 3. sorting cross-chat by `rowid`, capping at `perRunLimit`,
* 4. replaying each row through the same `dispatchPayload` handler used
* by the live notification loop, so existing dedupe / coalesce / echo
* / read-receipt behavior covers replayed rows for free.
*
* Runs at most once per `monitorIMessageProvider` invocation, between
* `watch.subscribe` and the live dispatch loop. Anything that arrives during
* catchup itself flows through live dispatch; the existing inbound-dedupe
* cache absorbs any overlap.
*/
export async function runIMessageCatchup(
params: RunIMessageCatchupParams,
): Promise<IMessageCatchupSummary> {
const { client, accountId, config, includeAttachments, dispatchPayload, runtime } = params;
const log = (msg: string) => runtime?.log?.(msg);
const warnLog = (msg: string) => runtime?.log?.(warn(msg));
// Map keyed by guid so the dispatch adapter can recover the full payload
// the fetcher pulled from `messages.history`. Local to this catchup pass —
// discarded when the function returns.
const payloadByGuid = new Map<string, IMessagePayload>();
const fetchFn: CatchupFetchFn = async ({ sinceMs, sinceRowid, limit }) => {
let chatsResult: { chats?: ChatsListEntry[] } | undefined;
try {
chatsResult = await client.request<{ chats?: ChatsListEntry[] }>(
"chats.list",
{ limit: CATCHUP_CHATS_LIST_LIMIT },
{ timeoutMs: CATCHUP_RPC_TIMEOUT_MS },
);
} catch (err) {
warnLog(`imessage catchup: chats.list failed: ${String(err)}`);
return { resolved: false, rows: [] };
}
const chats = chatsResult?.chats ?? [];
const sinceISO = new Date(sinceMs).toISOString();
const collected: IMessageCatchupRow[] = [];
const perChatLimit = Math.min(limit, PER_CHAT_HISTORY_LIMIT_CAP);
// Track the highest rowid / date the imsg bridge actually returned across
// all chats, regardless of whether each row passed the parser. The catchup
// loop uses this as a cursor-advance floor so an unparseable row (corrupt
// text column, schema drift, etc.) cannot stall catchup forever — without
// this, the same broken row would be re-fetched and re-dropped on every
// gateway startup.
let rawWatermarkRowid = -Infinity;
let rawWatermarkMs = -Infinity;
for (const chat of chats) {
const chatId = typeof chat.id === "number" && Number.isFinite(chat.id) ? chat.id : null;
if (chatId === null) {
continue;
}
// Skip chats that have not seen activity in the catchup window. Saves
// a per-chat RPC for every old archived conversation.
const lastMs =
typeof chat.last_message_at === "string" ? Date.parse(chat.last_message_at) : Number.NaN;
if (Number.isFinite(lastMs) && lastMs < sinceMs) {
continue;
}
let historyResult: MessagesHistoryResult | undefined;
try {
historyResult = await client.request<MessagesHistoryResult>(
"messages.history",
{
chat_id: chatId,
limit: perChatLimit,
start: sinceISO,
attachments: includeAttachments,
},
{ timeoutMs: CATCHUP_RPC_TIMEOUT_MS },
);
} catch (err) {
// Best-effort per chat. A single broken chat must not poison the
// whole pass — drop and continue.
warnLog(`imessage catchup: messages.history failed for chat_id=${chatId}: ${String(err)}`);
continue;
}
const messages = Array.isArray(historyResult?.messages) ? historyResult.messages : [];
for (const raw of messages) {
// Best-effort raw-watermark probe BEFORE we run the parser, so even
// rows we drop still let the cursor advance past them. We only trust
// numeric `id` / parseable `created_at` — if the row is so malformed
// that we cannot even read those, leave the watermark unchanged for
// this row (same forward-progress behavior as today, just no worse).
const rawRecord = raw && typeof raw === "object" ? (raw as Record<string, unknown>) : null;
const rawRowid =
rawRecord && typeof rawRecord.id === "number" && Number.isFinite(rawRecord.id)
? rawRecord.id
: null;
const rawCreatedAt =
rawRecord && typeof rawRecord.created_at === "string" ? rawRecord.created_at : null;
const rawDateMs = rawCreatedAt ? Date.parse(rawCreatedAt) : Number.NaN;
if (rawRowid !== null) {
rawWatermarkRowid = Math.max(rawWatermarkRowid, rawRowid);
}
if (Number.isFinite(rawDateMs)) {
rawWatermarkMs = Math.max(rawWatermarkMs, rawDateMs);
}
// Reuse the live notification parser by wrapping the row in the same
// `{ message: ... }` envelope. Anything that fails the parser would
// also be dropped on the live path, so the same shape guard applies.
const payload = parseIMessageNotification({ message: raw });
if (!payload) {
continue;
}
const guid = payload.guid?.trim();
const rowid = typeof payload.id === "number" ? payload.id : null;
const dateMs =
typeof payload.created_at === "string" ? Date.parse(payload.created_at) : Number.NaN;
if (!guid || rowid === null || !Number.isFinite(rowid) || !Number.isFinite(dateMs)) {
continue;
}
if (rowid <= sinceRowid) {
continue;
}
collected.push({
guid,
rowid,
date: dateMs,
isFromMe: payload.is_from_me === true,
});
payloadByGuid.set(guid, payload);
}
}
const sorted = collected.toSorted((a, b) => a.rowid - b.rowid);
const capped = sorted.slice(0, limit);
const isCapTruncated = capped.length < sorted.length;
if (isCapTruncated) {
warnLog(
`imessage catchup: fetched ${sorted.length} rows across chats, ` +
`capped to perRunLimit=${limit} (oldest first); next startup picks up the rest`,
);
// Drop payloads we are no longer going to dispatch so the dispatch
// adapter does not have to defend against the discarded ones.
const keep = new Set(capped.map((row) => row.guid));
for (const guid of payloadByGuid.keys()) {
if (!keep.has(guid)) {
payloadByGuid.delete(guid);
}
}
}
// Clamp the raw watermark when cap-truncation hits so the catchup loop
// cannot persist a cursor past undispatched valid rows. Without this,
// a `messages.history` page wider than `perRunLimit` would silently
// skip the cap-truncated tail forever — the WARN above promises the
// next startup picks up the rest, and that promise relies on the
// cursor staying at the last dispatched rowid. When no truncation
// happens, the watermark covers parse-rejected rows interspersed
// with the dispatched batch (the original forward-progress fix).
let effectiveWatermarkRowid = rawWatermarkRowid;
let effectiveWatermarkMs = rawWatermarkMs;
if (isCapTruncated && capped.length > 0) {
const last = capped.at(-1);
if (last) {
effectiveWatermarkRowid = Math.min(effectiveWatermarkRowid, last.rowid);
effectiveWatermarkMs = Math.min(effectiveWatermarkMs, last.date);
}
} else if (isCapTruncated && capped.length === 0) {
// Pathological: cap=0. Don't emit any watermark; preserve the prior
// cursor and let the next pass try again.
effectiveWatermarkRowid = Number.NaN;
effectiveWatermarkMs = Number.NaN;
}
return {
resolved: true,
rows: capped,
...(Number.isFinite(effectiveWatermarkRowid)
? { highWatermarkRowid: effectiveWatermarkRowid }
: {}),
...(Number.isFinite(effectiveWatermarkMs) ? { highWatermarkMs: effectiveWatermarkMs } : {}),
};
};
const dispatchFn: CatchupDispatchFn = async (row) => {
const payload = payloadByGuid.get(row.guid);
if (!payload) {
// Should not happen: the fetcher only emits rows it has stashed. But
// if a future caller wires a different fetcher and forgets to populate
// the map, we would otherwise silently no-op. Treat as a transient
// failure so the cursor stays put and operators see the warning.
warnLog(`imessage catchup: missing payload for guid=${row.guid}, skipping`);
return { ok: false };
}
try {
await dispatchPayload(payload);
return { ok: true };
} catch (err) {
warnLog(`imessage catchup: dispatch threw for guid=${row.guid}: ${String(err)}`);
return { ok: false };
}
};
return await performIMessageCatchup({
accountId,
config,
fetch: fetchFn,
dispatch: dispatchFn,
log,
warn: warnLog,
...(params.now ? { now: params.now() } : {}),
});
}
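The cap-truncation clamp above is the subtle part of the fetch adapter: when `perRunLimit` truncates the sorted batch, the persisted watermark must not pass the last row that will actually be dispatched, or the truncated tail is skipped forever. A standalone sketch of that rule (names are illustrative, not the module's real exports):

```typescript
// Sketch of the cap-truncation watermark clamp. Assumes rows are already
// sorted ascending by rowid, as the fetch adapter guarantees.
type SketchRow = { rowid: number; date: number };

function clampWatermark(
  sorted: SketchRow[],
  limit: number,
  rawWatermarkRowid: number, // highest rowid seen in the raw response
): number {
  const capped = sorted.slice(0, limit);
  if (capped.length === sorted.length) {
    // No truncation: the raw watermark may advance past parse-rejected rows.
    return rawWatermarkRowid;
  }
  const last = capped.at(-1);
  if (!last) {
    // Pathological cap=0: report no advance so the prior cursor is kept.
    return Number.NaN;
  }
  // Truncated: clamp to the last rowid we will actually dispatch.
  return Math.min(rawWatermarkRowid, last.rowid);
}

const rows: SketchRow[] = [
  { rowid: 10, date: 1 },
  { rowid: 11, date: 2 },
  { rowid: 12, date: 3 },
];
console.log(clampWatermark(rows, 2, 12)); // 11 — holds at the dispatched tail
console.log(clampWatermark(rows, 3, 12)); // 12 — untruncated, raw floor wins
```

Without the `Math.min`, a `messages.history` page wider than `perRunLimit` would persist a cursor past rows that were never dispatched.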


@@ -0,0 +1,458 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import {
capFailureRetriesMap,
loadIMessageCatchupCursor,
performIMessageCatchup,
resolveCatchupConfig,
saveIMessageCatchupCursor,
type CatchupDispatchFn,
type CatchupFetchFn,
type IMessageCatchupRow,
} from "./catchup.js";
let tempStateDir: string;
let priorStateDir: string | undefined;
beforeAll(() => {
tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-catchup-"));
priorStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = tempStateDir;
});
afterAll(() => {
if (priorStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = priorStateDir;
}
fs.rmSync(tempStateDir, { recursive: true, force: true });
});
beforeEach(() => {
// Wipe per-account cursor state between tests so each test starts clean.
fs.rmSync(path.join(tempStateDir, "imessage", "catchup"), { recursive: true, force: true });
});
describe("resolveCatchupConfig", () => {
it("falls back to defaults when raw is undefined", () => {
const cfg = resolveCatchupConfig(undefined);
expect(cfg.enabled).toBe(false);
expect(cfg.maxAgeMinutes).toBe(120);
expect(cfg.perRunLimit).toBe(50);
expect(cfg.firstRunLookbackMinutes).toBe(30);
expect(cfg.maxFailureRetries).toBe(10);
});
it("clamps over-limit input to the documented ceiling", () => {
const cfg = resolveCatchupConfig({
enabled: true,
maxAgeMinutes: 99_999,
perRunLimit: 10_000,
maxFailureRetries: 50_000,
});
expect(cfg.enabled).toBe(true);
expect(cfg.maxAgeMinutes).toBe(720);
expect(cfg.perRunLimit).toBe(500);
expect(cfg.maxFailureRetries).toBe(1000);
});
it("clamps zero / negative input to 1", () => {
const cfg = resolveCatchupConfig({
maxAgeMinutes: 0,
perRunLimit: -10,
firstRunLookbackMinutes: -1,
maxFailureRetries: 0,
});
expect(cfg.maxAgeMinutes).toBe(1);
expect(cfg.perRunLimit).toBe(1);
expect(cfg.firstRunLookbackMinutes).toBe(1);
expect(cfg.maxFailureRetries).toBe(1);
});
});
describe("loadIMessageCatchupCursor / saveIMessageCatchupCursor", () => {
it("returns null when no cursor exists", async () => {
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor).toBeNull();
});
it("round-trips a cursor without failureRetries", async () => {
await saveIMessageCatchupCursor("primary", {
lastSeenMs: 1_700_000_000_000,
lastSeenRowid: 42,
});
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor).not.toBeNull();
expect(cursor?.lastSeenMs).toBe(1_700_000_000_000);
expect(cursor?.lastSeenRowid).toBe(42);
expect(cursor?.failureRetries).toBeUndefined();
});
it("round-trips a cursor with failureRetries", async () => {
await saveIMessageCatchupCursor("primary", {
lastSeenMs: 1_700_000_000_000,
lastSeenRowid: 42,
failureRetries: { "GUID-A": 3 },
});
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.failureRetries).toEqual({ "GUID-A": 3 });
});
it("drops malformed failureRetries entries on load", async () => {
await saveIMessageCatchupCursor("primary", {
lastSeenMs: 1_700_000_000_000,
lastSeenRowid: 42,
failureRetries: {
"GUID-A": 3,
"GUID-B": -1,
"GUID-C": Number.NaN,
} as Record<string, number>,
});
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.failureRetries).toEqual({ "GUID-A": 3 });
});
it("isolates state per accountId", async () => {
await saveIMessageCatchupCursor("a", { lastSeenMs: 100, lastSeenRowid: 1 });
await saveIMessageCatchupCursor("b", { lastSeenMs: 200, lastSeenRowid: 2 });
expect((await loadIMessageCatchupCursor("a"))?.lastSeenRowid).toBe(1);
expect((await loadIMessageCatchupCursor("b"))?.lastSeenRowid).toBe(2);
});
});
describe("capFailureRetriesMap", () => {
it("is identity below the cap", () => {
const map = { a: 1, b: 2 };
expect(capFailureRetriesMap(map, 10)).toEqual({ a: 1, b: 2 });
});
it("keeps the highest counts when over the cap", () => {
const map = { a: 1, b: 9, c: 5, d: 9 };
const capped = capFailureRetriesMap(map, 2);
// Both b and d at 9; tiebreak by guid string (alphabetical) → b, d
expect(Object.keys(capped).toSorted()).toEqual(["b", "d"]);
});
});
describe("performIMessageCatchup", () => {
const config = resolveCatchupConfig({ enabled: true });
const now = 1_700_001_000_000; // arbitrary fixed clock
function row(overrides: Partial<IMessageCatchupRow>): IMessageCatchupRow {
return {
guid: "GUID-X",
rowid: 1,
date: now - 60_000,
isFromMe: false,
...overrides,
};
}
function fetchOf(rows: IMessageCatchupRow[]): CatchupFetchFn {
return vi.fn(async () => ({ resolved: true, rows }));
}
function alwaysOk(): CatchupDispatchFn {
return vi.fn(async () => ({ ok: true }));
}
it("replays every fresh inbound row through dispatch and advances the cursor", async () => {
const dispatch = alwaysOk();
const fetch = fetchOf([
row({ guid: "A", rowid: 10, date: now - 30_000 }),
row({ guid: "B", rowid: 11, date: now - 20_000 }),
]);
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.querySucceeded).toBe(true);
expect(summary.replayed).toBe(2);
expect(summary.failed).toBe(0);
expect(summary.cursorAfter.lastSeenRowid).toBe(11);
expect(dispatch).toHaveBeenCalledTimes(2);
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.lastSeenRowid).toBe(11);
});
it("skips is_from_me rows but still advances the cursor past them", async () => {
const dispatch = alwaysOk();
const fetch = fetchOf([
row({ guid: "A", rowid: 10, isFromMe: true }),
row({ guid: "B", rowid: 11, isFromMe: false }),
]);
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.skippedFromMe).toBe(1);
expect(summary.replayed).toBe(1);
expect(summary.cursorAfter.lastSeenRowid).toBe(11);
expect(dispatch).toHaveBeenCalledTimes(1);
});
it("drops rows older than the maxAgeMinutes ceiling and advances past them", async () => {
const tightConfig = resolveCatchupConfig({ enabled: true, maxAgeMinutes: 1 });
const dispatch = alwaysOk();
const fetch = fetchOf([
row({ guid: "OLD", rowid: 10, date: now - 10 * 60_000 }), // 10 min old, > 1 min ceiling
row({ guid: "NEW", rowid: 11, date: now - 30_000 }),
]);
const summary = await performIMessageCatchup({
accountId: "primary",
config: tightConfig,
now,
fetch,
dispatch,
});
expect(summary.skippedPreCursor).toBe(1);
expect(summary.replayed).toBe(1);
expect(summary.cursorAfter.lastSeenRowid).toBe(11);
});
it("holds the cursor on the failing row while count < maxFailureRetries", async () => {
const dispatch = vi.fn<CatchupDispatchFn>(async () => ({ ok: false }));
const fetch = fetchOf([row({ guid: "A", rowid: 10 })]);
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.failed).toBe(1);
expect(summary.givenUp).toBe(0);
// Cursor clamps to `failed.rowid - 1` (== 9), strictly below the held
// failure, so the next pass refetches row 10 — and never leapfrogs it.
expect(summary.cursorAfter.lastSeenRowid).toBe(9);
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.failureRetries?.A).toBe(1);
expect(cursor?.lastSeenRowid).toBe(9);
});
it("crosses the maxFailureRetries ceiling, gives up, and advances past the wedged row", async () => {
const tightConfig = resolveCatchupConfig({ enabled: true, maxFailureRetries: 2 });
const dispatch = vi.fn<CatchupDispatchFn>(async () => ({ ok: false }));
const fetch = fetchOf([row({ guid: "A", rowid: 10 })]);
// First pass: count goes 0 → 1, cursor held below the failed row.
// The clamp is `failed.rowid - 1` (== 9), not the prior cursor (0), so
// the next pass refetches row 10 without re-walking older history.
await performIMessageCatchup({
accountId: "primary",
config: tightConfig,
now,
fetch,
dispatch,
});
expect((await loadIMessageCatchupCursor("primary"))?.lastSeenRowid).toBe(9);
// Second pass: count goes 1 → 2 (== ceiling), give up, cursor advances.
const fetch2 = fetchOf([row({ guid: "A", rowid: 10 })]);
const summary = await performIMessageCatchup({
accountId: "primary",
config: tightConfig,
now,
fetch: fetch2,
dispatch,
});
expect(summary.givenUp).toBe(1);
expect(summary.cursorAfter.lastSeenRowid).toBe(10);
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.failureRetries?.A).toBe(2);
});
it("treats already-given-up rows as skippedGivenUp without dispatching", async () => {
await saveIMessageCatchupCursor("primary", {
lastSeenMs: now - 60_000,
lastSeenRowid: 0,
failureRetries: { "WEDGED-1": 99 },
});
const dispatch = alwaysOk();
const fetch = fetchOf([row({ guid: "WEDGED-1", rowid: 5 }), row({ guid: "FRESH", rowid: 6 })]);
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.skippedGivenUp).toBe(1);
expect(summary.replayed).toBe(1);
expect(dispatch).toHaveBeenCalledTimes(1);
});
it("removes a guid from the retry map after a successful dispatch", async () => {
await saveIMessageCatchupCursor("primary", {
lastSeenMs: now - 60_000,
lastSeenRowid: 0,
failureRetries: { RETRYING: 1 },
});
const dispatch = alwaysOk();
const fetch = fetchOf([row({ guid: "RETRYING", rowid: 5 })]);
await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.failureRetries).toBeUndefined();
});
it("does NOT leapfrog a held failure when a later row in the same batch succeeds", async () => {
// Regression for #78649 cursor-leapfrog bug. Prior to the fix the loop
// advanced lastSeenRowid on every successful row, so a held failure at
// rowid 10 followed by a success at rowid 11 would persist the cursor
// at 11 — and the next pass would filter row 10 out via `row.rowid <=
// sinceRowid` and never retry it. With the fix in place the cursor is
// clamped to `earliestHeldFailureRow.rowid - 1` (== 9) so the next pass
// refetches row 10.
let dispatchCount = 0;
const dispatch = vi.fn<CatchupDispatchFn>(async (row) => {
dispatchCount += 1;
if (row.guid === "A") {
return { ok: false };
}
return { ok: true };
});
const fetch = fetchOf([
row({ guid: "A", rowid: 10, date: now - 40_000 }),
row({ guid: "B", rowid: 11, date: now - 30_000 }),
]);
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.failed).toBe(1);
expect(summary.replayed).toBe(1);
expect(summary.givenUp).toBe(0);
// Cursor must not leapfrog the held failure at rowid 10. The persisted
// cursor lands at rowid 9 so the next pass refetches row 10.
expect(summary.cursorAfter.lastSeenRowid).toBe(9);
expect(dispatchCount).toBe(2);
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.lastSeenRowid).toBe(9);
expect(cursor?.failureRetries?.A).toBe(1);
});
it("advances the cursor past parser-rejected rows via the fetch high-watermark", async () => {
// Regression: without a high-watermark from the fetcher, an unparseable
// row never reaches the loop, so the cursor never advances past it and
// the next pass re-fetches and re-drops the same broken row forever.
// The bridge probes raw `id` / `created_at` per row and emits a
// `highWatermarkRowid` / `highWatermarkMs` floor so the loop can advance
// the cursor even when every fetched row fails the payload parser.
const dispatch = vi.fn<CatchupDispatchFn>(async () => ({ ok: true }));
const fetch: CatchupFetchFn = vi.fn(async () => ({
resolved: true,
rows: [],
highWatermarkRowid: 42,
highWatermarkMs: now - 5_000,
}));
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.querySucceeded).toBe(true);
expect(summary.replayed).toBe(0);
expect(summary.fetchedCount).toBe(0);
expect(summary.cursorAfter.lastSeenRowid).toBe(42);
expect(dispatch).not.toHaveBeenCalled();
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.lastSeenRowid).toBe(42);
expect(cursor?.lastSeenMs).toBe(now - 5_000);
});
it("does not let the high-watermark leapfrog a held failure", async () => {
// The fetcher's watermark is a floor for cursor advance, but a held
// failure must still clamp the cursor below the failed row even when
// the fetcher reports a higher watermark.
const dispatch = vi.fn<CatchupDispatchFn>(async () => ({ ok: false }));
const fetch: CatchupFetchFn = vi.fn(async () => ({
resolved: true,
rows: [row({ guid: "A", rowid: 10, date: now - 40_000 })],
highWatermarkRowid: 99,
highWatermarkMs: now - 100,
}));
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.failed).toBe(1);
// Even though the fetcher reports watermark=99, the held failure at
// rowid 10 clamps the cursor at 9.
expect(summary.cursorAfter.lastSeenRowid).toBe(9);
});
it("returns querySucceeded=false and preserves the cursor on fetch failure", async () => {
await saveIMessageCatchupCursor("primary", { lastSeenMs: now - 60_000, lastSeenRowid: 7 });
const dispatch = alwaysOk();
const fetch = vi.fn<CatchupFetchFn>(async () => {
throw new Error("imsg rpc closed");
});
const warn = vi.fn();
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
warn,
});
expect(summary.querySucceeded).toBe(false);
expect(summary.replayed).toBe(0);
expect(dispatch).not.toHaveBeenCalled();
expect(warn).toHaveBeenCalledWith(expect.stringMatching(/fetch failed/));
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.lastSeenRowid).toBe(7);
});
});
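The leapfrog regression these tests pin down reduces to one cursor rule: successes raise a high watermark, but the persisted cursor is clamped strictly below the earliest held failure so the next pass refetches it. A simplified, self-contained miniature of that rule (illustrative only; the real loop with retry counters lives in `catchup.ts`):

```typescript
// Miniature of the cursor-advance rule: never leapfrog a held failure,
// even when later rows in the same batch succeed.
type Outcome = { rowid: number; ok: boolean };

function cursorAfterPass(outcomes: Outcome[], cursorBefore: number): number {
  let watermark = cursorBefore;
  let earliestHeldFailure: number | null = null;
  const sorted = outcomes.slice().sort((a, b) => a.rowid - b.rowid);
  for (const { rowid, ok } of sorted) {
    if (ok) {
      watermark = Math.max(watermark, rowid);
    } else if (earliestHeldFailure === null) {
      // Sorted ascending, so the first failure seen is the earliest.
      earliestHeldFailure = rowid;
    }
  }
  return earliestHeldFailure === null
    ? watermark
    : Math.max(cursorBefore, earliestHeldFailure - 1);
}

// Failure at 10 + success at 11 must persist 9, never 11.
console.log(cursorAfterPass([{ rowid: 10, ok: false }, { rowid: 11, ok: true }], 0)); // 9
// With no held failure the watermark advances normally.
console.log(cursorAfterPass([{ rowid: 10, ok: true }, { rowid: 11, ok: true }], 0)); // 11
```

The already-successful row above the held failure gets re-replayed on the next pass; the inbound-dedupe cache absorbs the duplicate.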


@@ -0,0 +1,468 @@
import { createHash } from "node:crypto";
import path from "node:path";
import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
// iMessage inbound catchup. When the gateway is offline (crash, restart, mac
// sleep, machine off), `imsg watch` resumes from current state and ignores
// anything that landed in chat.db while the bridge was disconnected.
// Without a recovery pass, those messages are permanently lost.
//
// This module mirrors the design of the retired BlueBubbles catchup
// (`extensions/bluebubbles/src/catchup.ts` in commit 07bf572f35^), adapted
// for the imsg JSON-RPC `messages.history` fetch path. The replay loop is
// pluggable via the `dispatch` callback so the same `evaluateIMessageInbound`
// + `dispatchInboundMessage` path used by the live `imsg watch` loop runs
// unchanged on replayed rows.
//
// See https://github.com/openclaw/openclaw/issues/78649 for design discussion.
const DEFAULT_MAX_AGE_MINUTES = 120;
const MAX_MAX_AGE_MINUTES = 12 * 60;
const DEFAULT_PER_RUN_LIMIT = 50;
const MAX_PER_RUN_LIMIT = 500;
const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30;
const DEFAULT_MAX_FAILURE_RETRIES = 10;
const MAX_MAX_FAILURE_RETRIES = 1_000;
// Defense-in-depth bound on the retry map. A storm of unique failing GUIDs
// should not balloon the cursor file. When over the bound, keep only the
// highest-count entries (closest to give-up) and drop the rest.
const MAX_FAILURE_RETRY_MAP_SIZE = 5_000;
export type IMessageCatchupConfig = {
enabled?: boolean;
maxAgeMinutes?: number;
perRunLimit?: number;
firstRunLookbackMinutes?: number;
maxFailureRetries?: number;
};
export type IMessageCatchupCursor = {
/** Timestamp (ms since epoch) of the highest-watermark message we processed. */
lastSeenMs: number;
/** ROWID of the highest-watermark processed message. Monotonic in chat.db. */
lastSeenRowid: number;
/** UTC ms timestamp of the most recent cursor write. */
updatedAt: number;
/**
* Per-GUID failure counter, preserved across runs. Two states:
* - `1 <= count < maxFailureRetries`: the GUID is still retrying and
* continues to hold the cursor back.
* - `count >= maxFailureRetries`: catchup has given up on the GUID. The
* message is skipped on sight (no dispatch attempt) and the cursor no
* longer waits on it. Entry stays in the map until the cursor naturally
* advances past the message's timestamp.
*
* A successful dispatch removes the entry. Optional on the persisted shape
* so older cursor files without this field load cleanly.
*/
failureRetries?: Record<string, number>;
};
export type IMessageCatchupRow = {
guid: string;
rowid: number;
/** Timestamp in ms since epoch. */
date: number;
isFromMe?: boolean;
};
export type IMessageCatchupSummary = {
querySucceeded: boolean;
fetchedCount: number;
replayed: number;
skippedFromMe: number;
/**
 * Rows skipped without a dispatch attempt because they sit at or below the
 * cursor rowid, or predate the `maxAgeMinutes` recency ceiling. The
 * age-ceiling skip still advances the cursor past the row.
 */
skippedPreCursor: number;
/**
* Messages whose GUID was already recorded as "given up" from a prior
* run (count >= `maxFailureRetries`). Skipped without a dispatch attempt
* so the cursor can advance past them.
*/
skippedGivenUp: number;
failed: number;
/**
* Messages that crossed the `maxFailureRetries` ceiling on this run. Each
* transition triggers a `warn` log line. Already-given-up messages in
* subsequent runs count under `skippedGivenUp`, not here.
*/
givenUp: number;
cursorBefore: { lastSeenMs: number; lastSeenRowid: number } | null;
cursorAfter: { lastSeenMs: number; lastSeenRowid: number };
windowStartMs: number;
windowEndMs: number;
};
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
if (env.OPENCLAW_STATE_DIR?.trim()) {
return resolveStateDir(env);
}
// Default test isolation: per-pid tmpdir. Mirrors the BlueBubbles catchup
// pattern so the tmpdir-path-guard test that flags dynamic template-literal
// suffixes on os.tmpdir() paths stays green.
if (env.VITEST || env.NODE_ENV === "test") {
const name = "openclaw-vitest-" + process.pid;
return path.join(resolvePreferredOpenClawTmpDir(), name);
}
return resolveStateDir(env);
}
function resolveCursorFilePath(accountId: string): string {
// Layout matches inbound-dedupe / persisted-echo-cache so a replayed GUID
// is recognized by the existing dedupe after catchup re-feeds the message
// through the live dispatch path.
const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account";
const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12);
return path.join(resolveStateDirFromEnv(), "imessage", "catchup", `${safePrefix}__${hash}.json`);
}
function sanitizeFailureRetriesInput(raw: unknown): Record<string, number> {
if (!raw || typeof raw !== "object") {
return {};
}
const out: Record<string, number> = {};
for (const [guid, count] of Object.entries(raw as Record<string, unknown>)) {
if (!guid || typeof guid !== "string") {
continue;
}
if (typeof count !== "number" || !Number.isFinite(count) || count <= 0) {
continue;
}
out[guid] = Math.floor(count);
}
return out;
}
/**
* Cursor file path: `<openclawStateDir>/imessage/catchup/<safePrefix>__<sha256[:12]>.json`.
* `openclawStateDir` resolves through `OPENCLAW_STATE_DIR` (or the plugin-sdk default,
* `~/.openclaw`). On a default install the cursor lands at
* `~/.openclaw/imessage/catchup/<safePrefix>__<sha256[:12]>.json`.
*/
export async function loadIMessageCatchupCursor(
accountId: string,
): Promise<IMessageCatchupCursor | null> {
const filePath = resolveCursorFilePath(accountId);
const { value } = await readJsonFileWithFallback<IMessageCatchupCursor | null>(filePath, null);
if (!value || typeof value !== "object") {
return null;
}
if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) {
return null;
}
if (typeof value.lastSeenRowid !== "number" || !Number.isFinite(value.lastSeenRowid)) {
return null;
}
const failureRetries = sanitizeFailureRetriesInput(value.failureRetries);
const hasRetries = Object.keys(failureRetries).length > 0;
return {
lastSeenMs: value.lastSeenMs,
lastSeenRowid: value.lastSeenRowid,
updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0,
...(hasRetries ? { failureRetries } : {}),
};
}
export async function saveIMessageCatchupCursor(
accountId: string,
next: { lastSeenMs: number; lastSeenRowid: number; failureRetries?: Record<string, number> },
): Promise<void> {
const filePath = resolveCursorFilePath(accountId);
const sanitized = sanitizeFailureRetriesInput(next.failureRetries);
const hasRetries = Object.keys(sanitized).length > 0;
const cursor: IMessageCatchupCursor = {
lastSeenMs: next.lastSeenMs,
lastSeenRowid: next.lastSeenRowid,
updatedAt: Date.now(),
...(hasRetries ? { failureRetries: sanitized } : {}),
};
await writeJsonFileAtomically(filePath, cursor);
}
/**
* Bound the retry map so a pathological storm of unique failing GUIDs
* cannot grow the cursor file without limit. Keeps the `maxSize` entries
* with the highest counts (closest to give-up) when over the bound.
*/
export function capFailureRetriesMap(
map: Record<string, number>,
maxSize: number = MAX_FAILURE_RETRY_MAP_SIZE,
): Record<string, number> {
const entries = Object.entries(map);
if (entries.length <= maxSize) {
return map;
}
// Sort by count desc; stable tiebreak on guid string so the retained set
// is deterministic across runs (important for cursor-file diffing during
// debugging).
entries.sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0]));
const capped: Record<string, number> = {};
for (let i = 0; i < maxSize; i++) {
const [guid, count] = entries[i];
capped[guid] = count;
}
return capped;
}
export type ResolvedCatchupConfig = {
enabled: boolean;
maxAgeMinutes: number;
perRunLimit: number;
firstRunLookbackMinutes: number;
maxFailureRetries: number;
};
function clampInt(value: number | undefined, min: number, max: number, fallback: number): number {
if (typeof value !== "number" || !Number.isFinite(value)) {
return fallback;
}
return Math.min(max, Math.max(min, Math.floor(value)));
}
export function resolveCatchupConfig(
raw: IMessageCatchupConfig | undefined,
): ResolvedCatchupConfig {
return {
enabled: Boolean(raw?.enabled),
maxAgeMinutes: clampInt(raw?.maxAgeMinutes, 1, MAX_MAX_AGE_MINUTES, DEFAULT_MAX_AGE_MINUTES),
perRunLimit: clampInt(raw?.perRunLimit, 1, MAX_PER_RUN_LIMIT, DEFAULT_PER_RUN_LIMIT),
firstRunLookbackMinutes: clampInt(
raw?.firstRunLookbackMinutes,
1,
MAX_MAX_AGE_MINUTES,
DEFAULT_FIRST_RUN_LOOKBACK_MINUTES,
),
maxFailureRetries: clampInt(
raw?.maxFailureRetries,
1,
MAX_MAX_FAILURE_RETRIES,
DEFAULT_MAX_FAILURE_RETRIES,
),
};
}
export type CatchupFetchFn = (params: {
sinceMs: number;
sinceRowid: number;
limit: number;
}) => Promise<{
resolved: boolean;
rows: IMessageCatchupRow[];
/**
* Highest `rowid` the fetcher saw in the raw response, including rows it
* dropped (parser failure, schema drift, missing fields). The replay loop
* uses this as a floor for the cursor advance so a single unparseable row
* cannot stall catchup forever — without this, the bridge silently
* dropping a row would mean the next pass re-fetches the same broken row
* indefinitely. Optional so test fetchers that only emit fully-valid rows
* can omit it; when omitted, the cursor advance falls back to the rows
* the loop actually processed.
*/
highWatermarkRowid?: number;
/** Companion to `highWatermarkRowid` — highest `date` seen in the raw response. */
highWatermarkMs?: number;
}>;
export type CatchupDispatchFn = (row: IMessageCatchupRow) => Promise<{ ok: boolean }>;
export type PerformCatchupParams = {
accountId: string;
config: ResolvedCatchupConfig;
now?: number;
fetch: CatchupFetchFn;
dispatch: CatchupDispatchFn;
log?: (message: string) => void;
warn?: (message: string) => void;
};
/**
* One catchup pass. Loads the cursor, fetches `messages.history`, replays
* each row through `dispatch`, advances the cursor on success / give-up,
* persists the cursor, returns a summary.
*
* The fetch and dispatch functions are injected so this loop is unit-testable
* without standing up an `imsg` daemon. The wiring in `monitor-provider.ts`
* passes the live `client.request("messages.history", ...)` adapter as
* `fetch` and the `evaluateIMessageInbound` + `dispatchInboundMessage`
* pipeline as `dispatch`.
*/
export async function performIMessageCatchup(
params: PerformCatchupParams,
): Promise<IMessageCatchupSummary> {
const now = params.now ?? Date.now();
const cfg = params.config;
const cursor = await loadIMessageCatchupCursor(params.accountId);
const lookbackMs =
cursor === null ? cfg.firstRunLookbackMinutes * 60_000 : cfg.maxAgeMinutes * 60_000;
const ageBoundMs = now - cfg.maxAgeMinutes * 60_000;
const windowStartMs = Math.max(cursor?.lastSeenMs ?? now - lookbackMs, ageBoundMs);
const windowEndMs = now;
const sinceRowid = cursor?.lastSeenRowid ?? 0;
const summary: IMessageCatchupSummary = {
querySucceeded: false,
fetchedCount: 0,
replayed: 0,
skippedFromMe: 0,
skippedPreCursor: 0,
skippedGivenUp: 0,
failed: 0,
givenUp: 0,
cursorBefore: cursor
? { lastSeenMs: cursor.lastSeenMs, lastSeenRowid: cursor.lastSeenRowid }
: null,
cursorAfter: {
lastSeenMs: cursor?.lastSeenMs ?? windowStartMs,
lastSeenRowid: cursor?.lastSeenRowid ?? 0,
},
windowStartMs,
windowEndMs,
};
let fetchResult: Awaited<ReturnType<CatchupFetchFn>>;
try {
fetchResult = await params.fetch({
sinceMs: windowStartMs,
sinceRowid,
limit: cfg.perRunLimit,
});
} catch (err) {
params.warn?.(`imessage catchup: fetch failed: ${String(err)}`);
return summary;
}
if (!fetchResult.resolved) {
params.warn?.(`imessage catchup: fetch returned unresolved result`);
return summary;
}
summary.querySucceeded = true;
summary.fetchedCount = fetchResult.rows.length;
// Stable order: process oldest-first so the cursor advances monotonically
// and a mid-run failure leaves a usable lastSeenRowid for the next pass.
const rows = fetchResult.rows.toSorted((a, b) => a.rowid - b.rowid);
const failureRetries = { ...cursor?.failureRetries };
// Two distinct watermarks: `highWatermark*` is the high point we reached on
// any row we processed cleanly (success / skipFromMe / skipPreCursor /
// skipGivenUp / give-up), and `earliestHeldFailureRow` is the smallest-rowid
// row whose dispatch failed below the retry ceiling on this pass. When a
// failure is held, the persisted cursor must NOT leapfrog it — otherwise
// the next pass would filter the failed row out via `row.rowid <= sinceRowid`
// and never retry. Already-successful rows above the held failure get
// re-replayed on the next pass and absorbed by the inbound-dedupe cache.
const cursorBeforeMs = cursor?.lastSeenMs ?? windowStartMs;
const cursorBeforeRowid = cursor?.lastSeenRowid ?? 0;
let highWatermarkMs = cursorBeforeMs;
let highWatermarkRowid = cursorBeforeRowid;
let earliestHeldFailureRow: IMessageCatchupRow | null = null;
for (const row of rows) {
if (row.rowid <= sinceRowid) {
summary.skippedPreCursor += 1;
continue;
}
if (row.date < ageBoundMs) {
// Row predates the recency ceiling. Skip but advance the cursor so we
// don't re-fetch it next pass.
summary.skippedPreCursor += 1;
highWatermarkMs = Math.max(highWatermarkMs, row.date);
highWatermarkRowid = Math.max(highWatermarkRowid, row.rowid);
continue;
}
if (row.isFromMe) {
summary.skippedFromMe += 1;
highWatermarkMs = Math.max(highWatermarkMs, row.date);
highWatermarkRowid = Math.max(highWatermarkRowid, row.rowid);
continue;
}
const priorCount = failureRetries[row.guid] ?? 0;
if (priorCount >= cfg.maxFailureRetries) {
summary.skippedGivenUp += 1;
highWatermarkMs = Math.max(highWatermarkMs, row.date);
highWatermarkRowid = Math.max(highWatermarkRowid, row.rowid);
continue;
}
let dispatched: { ok: boolean };
try {
dispatched = await params.dispatch(row);
} catch (err) {
params.warn?.(`imessage catchup: dispatch threw for guid=${row.guid}: ${String(err)}`);
dispatched = { ok: false };
}
if (dispatched.ok) {
summary.replayed += 1;
delete failureRetries[row.guid];
highWatermarkMs = Math.max(highWatermarkMs, row.date);
highWatermarkRowid = Math.max(highWatermarkRowid, row.rowid);
continue;
}
const nextCount = priorCount + 1;
failureRetries[row.guid] = nextCount;
summary.failed += 1;
if (nextCount >= cfg.maxFailureRetries) {
summary.givenUp += 1;
params.warn?.(
`imessage catchup: giving up on guid=${row.guid} after ${nextCount} failures; advancing cursor past it`,
);
// Cursor advances past the wedged guid so subsequent passes can make
// progress. Already-given-up entries in future runs count under
// skippedGivenUp.
highWatermarkMs = Math.max(highWatermarkMs, row.date);
highWatermarkRowid = Math.max(highWatermarkRowid, row.rowid);
continue;
}
// Below the retry ceiling: hold the cursor BEFORE this row so the next
// pass retries it. Rows are sorted ascending, so the first held failure
// is the lowest-rowid one — clamp the persisted cursor at its rowid - 1.
if (earliestHeldFailureRow === null || row.rowid < earliestHeldFailureRow.rowid) {
earliestHeldFailureRow = row;
}
}
// Apply the bridge's high-watermark floor. The bridge tracks the highest
// rowid in the raw `messages.history` response, including rows it had to
// drop (parser failure, schema drift). Without this, an unparseable row
// could permanently stall the cursor: it never reaches the loop, the loop
// never advances past it, the next pass re-fetches the same broken row.
// The floor only applies when no failure is held — a held failure has
// tighter cursor-cap semantics that must win.
if (earliestHeldFailureRow === null) {
if (typeof fetchResult.highWatermarkMs === "number") {
highWatermarkMs = Math.max(highWatermarkMs, fetchResult.highWatermarkMs);
}
if (typeof fetchResult.highWatermarkRowid === "number") {
highWatermarkRowid = Math.max(highWatermarkRowid, fetchResult.highWatermarkRowid);
}
}
let lastSeenMs: number;
let lastSeenRowid: number;
if (earliestHeldFailureRow !== null) {
// Hold cursor strictly below the failed row. Already-successful rows
// above it get re-replayed next pass; the inbound-dedupe cache absorbs
// the duplicate dispatch.
lastSeenMs = Math.max(cursorBeforeMs, earliestHeldFailureRow.date - 1);
lastSeenRowid = Math.max(cursorBeforeRowid, earliestHeldFailureRow.rowid - 1);
} else {
lastSeenMs = highWatermarkMs;
lastSeenRowid = highWatermarkRowid;
}
const capped = capFailureRetriesMap(failureRetries);
summary.cursorAfter = { lastSeenMs, lastSeenRowid };
await saveIMessageCatchupCursor(params.accountId, {
lastSeenMs,
lastSeenRowid,
failureRetries: capped,
});
if (summary.replayed > 0 || summary.failed > 0 || summary.givenUp > 0) {
params.log?.(
`imessage catchup: replayed=${summary.replayed} skippedFromMe=${summary.skippedFromMe} skippedGivenUp=${summary.skippedGivenUp} failed=${summary.failed} givenUp=${summary.givenUp} fetchedCount=${summary.fetchedCount}`,
);
}
return summary;
}
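The cursor rules in the loop above (advance on success and give-up, hold at the earliest sub-ceiling failure, never move backwards past the pre-pass cursor) can be distilled into a standalone sketch. `computeCatchupCursor` and its types are hypothetical names for illustration, not part of the extension:

```typescript
// Hypothetical distillation of the cursor rules above; names are illustrative.
type Row = { rowid: number; date: number };
type RowResult = { row: Row; outcome: "ok" | "gaveUp" | "heldFailure" };

function computeCatchupCursor(
  cursorBefore: { lastSeenMs: number; lastSeenRowid: number },
  results: RowResult[], // rows sorted ascending by rowid
): { lastSeenMs: number; lastSeenRowid: number } {
  let ms = cursorBefore.lastSeenMs;
  let rowid = cursorBefore.lastSeenRowid;
  let held: Row | null = null;
  for (const { row, outcome } of results) {
    if (outcome === "heldFailure") {
      // Rows are ascending, so the first held failure has the lowest rowid.
      if (held === null) held = row;
      continue;
    }
    // Success and give-up both advance the watermark.
    ms = Math.max(ms, row.date);
    rowid = Math.max(rowid, row.rowid);
  }
  if (held !== null) {
    // Hold strictly below the failed row; clamp so the cursor never regresses.
    return {
      lastSeenMs: Math.max(cursorBefore.lastSeenMs, held.date - 1),
      lastSeenRowid: Math.max(cursorBefore.lastSeenRowid, held.rowid - 1),
    };
  }
  return { lastSeenMs: ms, lastSeenRowid: rowid };
}
```

Note that a success at rowid 103 after a held failure at rowid 102 still leaves the persisted cursor at 101: successes cannot leapfrog a held failure, which is exactly the invariant the regression tests pin down.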


@@ -125,6 +125,35 @@ describe("iMessage sent-message echo cache", () => {
expect(dirMode).toBe(0o700);
});
it("retains entries written hours earlier so catchup replay sees own outbound rows", () => {
// Catchup's default maxAgeMinutes is 120 (2h). The persisted-echo TTL must
// be >= that window, otherwise the agent's own outbound rows from before
// a gateway gap fall out of dedupe before catchup re-feeds the inbound
// rows around them — and the agent's replies to itself land back in the
// inbound pipeline as if they were external sends. Regression guard for
// the echo-cache retention extension that ships with #78649.
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-ttl-"));
tempDirs.push(stateDir);
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
rememberPersistedIMessageEcho({
scope: "acct:imessage:+1555",
text: "agent reply from before the gap",
messageId: "guid-pre-gap",
});
// Advance 3 hours — past the legacy 2-min TTL but well within the 12 h
// retention required by the maxAgeMinutes=720 clamp.
vi.setSystemTime(new Date("2026-05-08T15:00:00Z"));
const cache = createSentMessageCache();
expect(cache.has("acct:imessage:+1555", { text: "agent reply from before the gap" })).toBe(
true,
);
expect(cache.has("acct:imessage:+1555", { messageId: "guid-pre-gap" })).toBe(true);
});
it("clamps pre-existing sent-echoes.jsonl from older 0644/0755 to 0600/0700", () => {
// Older gateway versions wrote with default modes. After upgrade, the next
// remember must clamp the existing file/dir back to owner-only.


@@ -50,6 +50,8 @@ import {
import { sendMessageIMessage } from "../send.js";
import { normalizeIMessageHandle } from "../targets.js";
import { attachIMessageMonitorAbortHandler } from "./abort-handler.js";
import { runIMessageCatchup } from "./catchup-bridge.js";
import { resolveCatchupConfig } from "./catchup.js";
import { combineIMessagePayloads } from "./coalesce.js";
import { createIMessageEchoCachingSend, deliverReplies } from "./deliver.js";
import { createSentMessageCache } from "./echo-cache.js";
@@ -865,6 +867,35 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
return;
}
// Catchup runs once between watch.subscribe and the live dispatch loop.
// Anything that arrives during the catchup pass itself flows through
// `handleMessage` -> `handleMessageNow`; the inbound-dedupe cache absorbs
// any overlap with replayed rows. Disabled by default — opt-in via
// `channels.imessage.catchup.enabled`. See issue #78649.
const catchupCfg = resolveCatchupConfig(imessageCfg.catchup);
if (catchupCfg.enabled && !abort?.aborted) {
try {
await runIMessageCatchup({
client: activeClient,
accountId: accountInfo.accountId,
config: catchupCfg,
includeAttachments,
// Catchup bypasses the inbound debouncer so each row is awaited
// serially and dispatch failure can hold the cursor. Split-sends
// from before the gateway gap therefore arrive as separate turns
// rather than coalesced — same behavior the retired BlueBubbles
// catchup had. Live notifications continue to flow through the
// debouncer.
dispatchPayload: (message) => handleMessageNow(message),
runtime,
});
} catch (err) {
// Catchup is opt-in recovery — surface the error but do not block the
// monitor. The live dispatch loop is already up and running.
runtime.error?.(`imessage catchup: pass failed: ${String(err)}`);
}
}
try {
await activeClient.waitForClose();
} catch (err) {


@@ -10,7 +10,13 @@ type PersistedEchoEntry = {
timestamp: number;
};
// 12h covers the maximum `channels.imessage.catchup.maxAgeMinutes` clamp (720
// minutes). Without this, the live path's previous 2-minute window was
// shorter than any realistic catchup window — own outbound rows from before
// a gateway gap would fall out of the dedupe set before catchup could replay
// the inbound rows around them, and the agent's own messages would land back
// in the inbound pipeline as if they were external sends.
const PERSISTED_ECHO_TTL_MS = 12 * 60 * 60 * 1000;
const MAX_PERSISTED_ECHO_ENTRIES = 256;
// sent-echoes.jsonl carries scope keys + outbound message text + messageIds.

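The two constants above imply TTL-then-cap pruning of the persisted echo set. A minimal sketch of that behavior, assuming a simple entry shape; `pruneEchoEntries` is illustrative, not the extension's actual implementation:

```typescript
// Illustrative pruning sketch for the constants above; not the real module.
type EchoEntry = { scope: string; text?: string; messageId?: string; timestamp: number };

const PERSISTED_ECHO_TTL_MS = 12 * 60 * 60 * 1000; // covers the 720-min catchup clamp
const MAX_PERSISTED_ECHO_ENTRIES = 256;

function pruneEchoEntries(entries: EchoEntry[], nowMs: number): EchoEntry[] {
  // Drop expired entries first, then keep only the newest N survivors.
  const fresh = entries.filter((e) => nowMs - e.timestamp <= PERSISTED_ECHO_TTL_MS);
  fresh.sort((a, b) => a.timestamp - b.timestamp);
  return fresh.slice(-MAX_PERSISTED_ECHO_ENTRIES);
}
```

With a 12 h window, an outbound row remembered 3 hours before a catchup pass survives pruning (the legacy 2-minute TTL would have dropped it), which is the scenario the retention test above locks in.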
File diff suppressed because one or more lines are too long


@@ -116,6 +116,39 @@ export type IMessageAccountConfig = {
systemPrompt?: string;
}
>;
/**
* Catchup: replay inbound messages that arrived in `chat.db` while the
* gateway was offline (crash, restart, mac sleep). Disabled by default.
* See https://github.com/openclaw/openclaw/issues/78649.
*/
catchup?: {
/** Master switch. Default `false`. */
enabled?: boolean;
/**
* Maximum age of replayable messages in minutes. Messages dated before
* `now - maxAgeMinutes` are skipped even when the cursor is older.
* Defense against runaway replay (the inverse of #62761). Default
* `120` (2 h). Clamp `[1, 720]`.
*/
maxAgeMinutes?: number;
/**
* Maximum messages to replay per catchup pass. Default `50`. Clamp
* `[1, 500]`.
*/
perRunLimit?: number;
/**
* On first run, when no cursor exists yet, look back this many
* minutes. Default `30`. Clamp `[1, 720]`.
*/
firstRunLookbackMinutes?: number;
/**
* Per-message retry ceiling. After this many consecutive failed
* dispatch attempts against the same message guid, catchup logs a
* `warn` and force-advances the cursor past the wedged message.
* Default `10`. Clamp `[1, 1000]`.
*/
maxFailureRetries?: number;
};
/** Heartbeat visibility settings for this channel. */
heartbeat?: ChannelHeartbeatVisibilityConfig;
/** Channel health monitor overrides for this channel/account. */

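The options above translate into an opt-in config fragment along these lines. This is a hedged sketch assuming a JSON5-style config file; the surrounding file layout is an assumption, and only the `catchup` keys, defaults, and clamps come from the type above:

```jsonc
{
  "channels": {
    "imessage": {
      "catchup": {
        "enabled": true,                 // default false: catchup is opt-in
        "maxAgeMinutes": 240,            // clamp [1, 720]; default 120
        "perRunLimit": 100,              // clamp [1, 500]; default 50
        "firstRunLookbackMinutes": 30,   // default 30
        "maxFailureRetries": 10          // clamp [1, 1000]; default 10
      }
    }
  }
}
```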

@@ -1438,6 +1438,16 @@ export const IMessageAccountSchemaBase = z
.optional(),
)
.optional(),
catchup: z
.object({
enabled: z.boolean().optional(),
maxAgeMinutes: z.number().int().min(1).max(720).optional(),
perRunLimit: z.number().int().min(1).max(500).optional(),
firstRunLookbackMinutes: z.number().int().min(1).max(720).optional(),
maxFailureRetries: z.number().int().min(1).max(1000).optional(),
})
.strict()
.optional(),
heartbeat: ChannelHeartbeatVisibilitySchema,
healthMonitor: ChannelHealthMonitorSchema,
responsePrefix: z.string().optional(),
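The zod schema above rejects out-of-range values at config load time; the documented defaults and clamps are applied when the config is resolved. A sketch of that resolution step, under the assumption that it mirrors the documented defaults and clamp ranges (the real `resolveCatchupConfig` in `catchup.ts` may differ):

```typescript
// Sketch of default + clamp resolution for the catchup block; mirrors the
// documented defaults/clamps, not necessarily the real resolveCatchupConfig.
type CatchupInput = {
  enabled?: boolean;
  maxAgeMinutes?: number;
  perRunLimit?: number;
  firstRunLookbackMinutes?: number;
  maxFailureRetries?: number;
};

const clamp = (v: number, lo: number, hi: number): number =>
  Math.min(hi, Math.max(lo, v));

function resolveCatchupConfigSketch(input: CatchupInput | undefined) {
  return {
    enabled: input?.enabled ?? false,
    maxAgeMinutes: clamp(input?.maxAgeMinutes ?? 120, 1, 720),
    perRunLimit: clamp(input?.perRunLimit ?? 50, 1, 500),
    firstRunLookbackMinutes: clamp(input?.firstRunLookbackMinutes ?? 30, 1, 720),
    maxFailureRetries: clamp(input?.maxFailureRetries ?? 10, 1, 1000),
  };
}
```

Clamping at resolution time keeps the monitor robust against programmatic or legacy inputs that never passed schema validation, while the `.strict()` schema still rejects unknown keys in user config.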