Migrate iMessage monitor state to SQLite (#88797)

* refactor: move imessage monitor state to sqlite

* test: use OpenClaw temp root in iMessage state helper

* test: avoid pending promise lint in chat tests

* test: harden gateway ci flakes

* test: align session list merge expectation
This commit is contained in:
Peter Steinberger
2026-06-01 00:19:51 +01:00
committed by GitHub
parent 12cf34a8ea
commit 3491834d49
33 changed files with 1691 additions and 745 deletions

View File

@@ -248,7 +248,7 @@ iMessage catchup is now available as an opt-in feature on the bundled plugin. On
There is no supported BlueBubbles runtime to switch back to. If iMessage verification fails, set `channels.imessage.enabled: false`, restart the Gateway, fix the `imsg` blocker, and retry the cutover.
The reply cache lives at `~/.openclaw/state/imessage/reply-cache.jsonl` (mode `0600`, parent dir `0700`). It is safe to delete if you want a clean slate.
The reply cache lives in SQLite plugin state. `openclaw doctor --fix` imports and archives the old `imessage/reply-cache.jsonl` sidecar when present.
## Related

View File

@@ -533,7 +533,7 @@ When `imsg launch` is running and `openclaw channels status --probe` reports `pr
</Accordion>
<Accordion title="Message IDs">
Inbound iMessage context includes both short `MessageSid` values and full message GUIDs when available. Short IDs are scoped to the recent in-memory reply cache and are checked against the current chat before use. If a short ID has expired or belongs to another chat, retry with the full `MessageSidFull`.
Inbound iMessage context includes both short `MessageSid` values and full message GUIDs when available. Short IDs are scoped to the recent SQLite-backed reply cache and are checked against the current chat before use. If a short ID has expired or belongs to another chat, retry with the full `MessageSidFull`.
</Accordion>
@@ -714,7 +714,7 @@ Each replayed row is fed through the live dispatch path (`evaluateIMessageInboun
### Cursor and retry semantics
Catchup keeps a per-account cursor at `<openclawStateDir>/imessage/catchup/<account>__<hash>.json` (the OpenClaw state dir defaults to `~/.openclaw`, overridable with `OPENCLAW_STATE_DIR`):
Catchup keeps a per-account cursor in SQLite plugin state:
```json
{
@@ -729,6 +729,7 @@ Catchup keeps a per-account cursor at `<openclawStateDir>/imessage/catchup/<acco
- After the startup catchup query succeeds, later live-handled rows also advance the same cursor so a gateway restart does not replay messages that were already handled live. Live cursor writes do not jump past catchup failures that are still below `maxFailureRetries`.
- After `maxFailureRetries` consecutive throws against the same `guid`, catchup logs a `warn` and force-advances the cursor past the wedged message so subsequent startups can make progress.
- Already-given-up guids are skipped on sight (no dispatch attempt) on later runs and counted under `skippedGivenUp` in the run summary.
- `openclaw doctor --fix` imports legacy `<openclawStateDir>/imessage/catchup/*.json` cursor files into SQLite plugin state and archives the old files.
### Operator-visible signals

View File

@@ -0,0 +1 @@
export { detectIMessageLegacyStateMigrations } from "./src/state-migrations.js";

View File

@@ -12,6 +12,9 @@
"./index.ts"
],
"setupEntry": "./setup-entry.ts",
"setupFeatures": {
"legacyStateMigrations": true
},
"channel": {
"id": "imessage",
"label": "iMessage",

View File

@@ -2,8 +2,15 @@ import { defineBundledChannelSetupEntry } from "openclaw/plugin-sdk/channel-entr
export default defineBundledChannelSetupEntry({
importMetaUrl: import.meta.url,
features: {
legacyStateMigrations: true,
},
plugin: {
specifier: "./api.js",
exportName: "imessageSetupPlugin",
},
legacyStateMigrations: {
specifier: "./legacy-state-migrations-api.js",
exportName: "detectIMessageLegacyStateMigrations",
},
});

View File

@@ -1,7 +1,4 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
resetIMessageShortIdState,
findLatestIMessageEntryForChat,
@@ -9,41 +6,15 @@ import {
rememberIMessageReplyCache,
resolveIMessageMessageId,
} from "./monitor-reply-cache.js";
// Isolate from any live ~/.openclaw/imessage/reply-cache.jsonl that the
// developer might have from a running gateway. Without this, the on-disk
// hydrate path picks up production data and tests get cross-pollinated.
//
// vi.stubEnv defaults to per-test scoping in this codebase, which means a
// beforeAll-only stub gets unstubbed between tests. Mutate process.env
// directly so the override holds across the whole file.
let tempStateDir: string;
let priorStateDir: string | undefined;
beforeAll(() => {
tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-reply-cache-"));
priorStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = tempStateDir;
});
afterAll(() => {
if (priorStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = priorStateDir;
}
fs.rmSync(tempStateDir, { recursive: true, force: true });
});
import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js";
beforeEach(() => {
installIMessageStateRuntimeForTest();
resetIMessageShortIdState();
// Belt-and-suspenders: also nuke the persisted file directly. The
// _reset helper does this when OPENCLAW_STATE_DIR is set, but explicitly
// clearing here protects the test from any future refactor of _reset's
// gating logic.
try {
fs.rmSync(path.join(tempStateDir, "imessage", "reply-cache.jsonl"), { force: true });
} catch {
// best-effort
}
});
afterEach(() => {
vi.useRealTimers();
});
describe("imessage short message id resolution", () => {
@@ -271,11 +242,6 @@ describe("findLatestIMessageEntryForChat", () => {
});
it("never crosses account boundaries", () => {
// Diagnostic: verify the temp-dir env stub is actually visible.
expect(process.env.OPENCLAW_STATE_DIR).toBe(tempStateDir);
const cachePath = path.join(tempStateDir, "imessage", "reply-cache.jsonl");
expect(fs.existsSync(cachePath)).toBe(false);
rememberIMessageReplyCache({
accountId: "other-account",
messageId: "foreign-account",
@@ -342,57 +308,8 @@ describe("findLatestIMessageEntryForChat", () => {
});
});
describe("reply cache disk permissions", () => {
it("clamps pre-existing reply-cache.jsonl from older 0644/0755 to 0600/0700", () => {
// Older gateway versions wrote with default modes. Every append must
// clamp existing files back to owner-only — appendFileSync's `mode`
// only applies on creation, so a chmod-on-create-only path would leave
// the upgrade case world-readable forever.
const imsgDir = path.join(tempStateDir, "imessage");
fs.mkdirSync(imsgDir, { recursive: true, mode: 0o755 });
const cacheFile = path.join(imsgDir, "reply-cache.jsonl");
fs.writeFileSync(cacheFile, "", { mode: 0o644 });
fs.chmodSync(imsgDir, 0o755);
fs.chmodSync(cacheFile, 0o644);
rememberIMessageReplyCache({
accountId: "default",
messageId: "clamp-test-guid",
chatIdentifier: "+12069106512",
timestamp: Date.now(),
});
const fileMode = fs.statSync(cacheFile).mode & 0o777;
const dirMode = fs.statSync(imsgDir).mode & 0o777;
expect(fileMode).toBe(0o600);
expect(dirMode).toBe(0o700);
});
it("writes the cache file 0600 and parent dir 0700", () => {
// Map gateway-allocated short-ids to message guids; a hostile same-UID
// process reading or writing this file could (a) enumerate active
// conversation guids or (b) inject lines so a future shortId resolves
// to an attacker-chosen guid. Owner-only mode is the mitigation.
rememberIMessageReplyCache({
accountId: "default",
messageId: "perm-test-guid",
chatIdentifier: "+12069106512",
timestamp: Date.now(),
});
const cacheFile = path.join(tempStateDir, "imessage", "reply-cache.jsonl");
const cacheDir = path.dirname(cacheFile);
expect(fs.existsSync(cacheFile)).toBe(true);
const fileMode = fs.statSync(cacheFile).mode & 0o777;
const dirMode = fs.statSync(cacheDir).mode & 0o777;
expect(fileMode).toBe(0o600);
expect(dirMode).toBe(0o700);
});
});
describe("hydrate-on-resolve (post-restart short-id persistence)", () => {
it("hydrates the on-disk JSONL before resolving a short id whose mapping predates this run", () => {
it("hydrates SQLite state before resolving a short id whose mapping predates this run", () => {
// Issue-then-restart contract: a shortId we issued before a gateway
// restart must still resolve afterwards. The first resolve call after
// process boot would otherwise miss the persisted mapping because the
@@ -407,15 +324,9 @@ describe("hydrate-on-resolve (post-restart short-id persistence)", () => {
});
expect(issued.shortId).not.toBe("");
// Simulate a restart: clear the in-memory state but leave the JSONL on
// disk. resetIMessageShortIdState only deletes the persisted file when
// OPENCLAW_STATE_DIR is set, so we have to keep the file ourselves
// since this test runs under the suite's temp state dir.
const cachePath = path.join(tempStateDir, "imessage", "reply-cache.jsonl");
const persisted = fs.readFileSync(cachePath, "utf8");
resetIMessageShortIdState();
fs.mkdirSync(path.dirname(cachePath), { recursive: true });
fs.writeFileSync(cachePath, persisted, "utf8");
// Simulate a restart: clear only the process-local maps and leave the
// SQLite plugin-state rows intact.
resetIMessageShortIdState({ clearPersistent: false });
// Now resolve the short id we issued before the "restart". Without the
// hydrate-on-resolve fix this throws "no longer available" because the
@@ -428,6 +339,47 @@ describe("hydrate-on-resolve (post-restart short-id persistence)", () => {
}),
).toBe("outbound-guid-pre-restart");
});
it("persists entries when optional chat fields are explicitly undefined", () => {
const issued = rememberIMessageReplyCache({
accountId: "default",
messageId: "guid-with-undefined-optionals",
chatGuid: undefined,
chatIdentifier: undefined,
chatId: undefined,
timestamp: Date.now(),
});
resetIMessageShortIdState({ clearPersistent: false });
expect(
resolveIMessageMessageId(issued.shortId, {
requireKnownShortId: true,
chatContext: { chatIdentifier: "+15551234567" },
}),
).toBe("guid-with-undefined-optionals");
});
it("does not reuse short ids after cached rows expire", () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T00:00:00Z"));
const first = rememberIMessageReplyCache({
accountId: "default",
messageId: "old-guid",
timestamp: Date.now(),
});
expect(first.shortId).toBe("1");
vi.setSystemTime(new Date("2026-05-08T07:00:00Z"));
resetIMessageShortIdState({ clearPersistent: false });
const second = rememberIMessageReplyCache({
accountId: "default",
messageId: "new-guid",
timestamp: Date.now(),
});
expect(second.shortId).toBe("2");
});
});
describe("hydrate counter advancement (rowid-collision protection)", () => {

View File

@@ -1,15 +1,18 @@
import fs from "node:fs";
import path from "node:path";
import { createHash } from "node:crypto";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
import { getIMessageRuntime } from "./runtime.js";
const REPLY_CACHE_MAX = 2000;
export const IMESSAGE_REPLY_CACHE_NAMESPACE = "imessage.reply-cache";
export const IMESSAGE_REPLY_CACHE_MAX_ENTRIES = 2000;
export const IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE = "imessage.reply-cache-counter";
export const IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES = 1;
export const IMESSAGE_REPLY_CACHE_COUNTER_KEY = "short-id-counter";
const REPLY_CACHE_TTL_MS = 6 * 60 * 60 * 1000;
/** Recency window for the "react to the latest message" fallback. */
const LATEST_FALLBACK_MS = 10 * 60 * 1000;
let persistenceFailureLogged = false;
let parseFailureLogged = false;
function reportPersistenceFailure(scope: string, err: unknown): void {
if (persistenceFailureLogged) {
return;
@@ -47,162 +50,70 @@ type IMessageReplyCacheEntry = IMessageChatContext & {
isFromMe?: boolean;
};
type IMessageReplyCacheStore = PluginStateSyncKeyedStore<IMessageReplyCacheEntry>;
type IMessageReplyCacheCounter = { counter: number };
const imessageReplyCacheByMessageId = new Map<string, IMessageReplyCacheEntry>();
const imessageShortIdToUuid = new Map<string, string>();
const imessageUuidToShortId = new Map<string, string>();
let imessageShortIdCounter = 0;
// On-disk persistence: short-id ↔ UUID mappings need to survive gateway
// restarts so an agent that received "[message_id:5]" before a restart can
// still react to that message after the restart. The on-disk store is
// best-effort — corruption or write failure falls back to the in-memory
// cache, so the worst case is the same as before persistence existed.
function resolveReplyCachePath(): string {
return path.join(resolveStateDir(), "imessage", "reply-cache.jsonl");
export function resolveIMessageReplyCacheEntryKey(messageId: string): string {
return createHash("sha256").update(messageId, "utf8").digest("hex").slice(0, 32);
}
function readPersistedEntries(): {
entries: IMessageReplyCacheEntry[];
maxObservedShortId: number;
} {
let raw: string;
try {
raw = fs.readFileSync(resolveReplyCachePath(), "utf8");
} catch (err) {
if ((err as NodeJS.ErrnoException)?.code !== "ENOENT") {
reportPersistenceFailure("read", err);
}
return { entries: [], maxObservedShortId: 0 };
}
const cutoff = Date.now() - REPLY_CACHE_TTL_MS;
const out: IMessageReplyCacheEntry[] = [];
// The counter must advance past every shortId we have ever observed in
// the file — including lines we skip because they are stale or malformed.
// Otherwise a future allocation can collide with a still-live mapping
// that came earlier in the file.
let maxObservedShortId = 0;
for (const line of raw.split(/\n+/)) {
if (!line) {
continue;
}
let parsed: Partial<IMessageReplyCacheEntry> | null;
try {
parsed = JSON.parse(line) as Partial<IMessageReplyCacheEntry>;
} catch {
if (!parseFailureLogged) {
parseFailureLogged = true;
logVerbose(
`imessage reply-cache: dropping unparseable line (further parse errors suppressed)`,
);
}
continue;
}
if (parsed && typeof parsed.shortId === "string") {
const numeric = Number.parseInt(parsed.shortId, 10);
if (Number.isFinite(numeric) && numeric > maxObservedShortId) {
maxObservedShortId = numeric;
}
}
if (
typeof parsed?.accountId !== "string" ||
typeof parsed.messageId !== "string" ||
typeof parsed.shortId !== "string" ||
typeof parsed.timestamp !== "number"
) {
continue;
}
if (parsed.timestamp < cutoff) {
continue;
}
out.push({
accountId: parsed.accountId,
messageId: parsed.messageId,
shortId: parsed.shortId,
timestamp: parsed.timestamp,
chatGuid: typeof parsed.chatGuid === "string" ? parsed.chatGuid : undefined,
chatIdentifier: typeof parsed.chatIdentifier === "string" ? parsed.chatIdentifier : undefined,
chatId: typeof parsed.chatId === "number" ? parsed.chatId : undefined,
isFromMe: typeof parsed.isFromMe === "boolean" ? parsed.isFromMe : undefined,
});
}
return { entries: out.slice(-REPLY_CACHE_MAX), maxObservedShortId };
function openReplyCacheStore(): IMessageReplyCacheStore {
return getIMessageRuntime().state.openSyncKeyedStore<IMessageReplyCacheEntry>({
namespace: IMESSAGE_REPLY_CACHE_NAMESPACE,
maxEntries: IMESSAGE_REPLY_CACHE_MAX_ENTRIES,
});
}
// reply-cache.jsonl maps gateway-allocated short-ids to message guids. A
// hostile same-UID process could otherwise (a) read the file to learn
// active conversation guids, or (b) inject lines so a future shortId
// resolution returns an attacker-chosen guid (allowing the agent to
// react/edit/unsend a message it never saw). Owner-only mode on both the
// directory and file closes that vector — defaults are 0755/0644 which
// are world-readable on a multi-user Mac.
const REPLY_CACHE_DIR_MODE = 0o700;
const REPLY_CACHE_FILE_MODE = 0o600;
function writePersistedEntries(entries: IMessageReplyCacheEntry[]): void {
const filePath = resolveReplyCachePath();
try {
fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: REPLY_CACHE_DIR_MODE });
fs.writeFileSync(
filePath,
entries.map((entry) => JSON.stringify(entry)).join("\n") + (entries.length ? "\n" : ""),
{ encoding: "utf8", mode: REPLY_CACHE_FILE_MODE },
);
// mkdirSync's mode is masked by umask and only applies on creation. If
// the dir already existed from an older gateway version, clamp it now.
try {
fs.chmodSync(path.dirname(filePath), REPLY_CACHE_DIR_MODE);
fs.chmodSync(filePath, REPLY_CACHE_FILE_MODE);
} catch {
// best-effort — fs may not support chmod on every platform
}
} catch (err) {
reportPersistenceFailure("write", err);
}
function openReplyCacheCounterStore(): PluginStateSyncKeyedStore<IMessageReplyCacheCounter> {
return getIMessageRuntime().state.openSyncKeyedStore<IMessageReplyCacheCounter>({
namespace: IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE,
maxEntries: IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES,
});
}
function appendPersistedEntry(entry: IMessageReplyCacheEntry): void {
const filePath = resolveReplyCachePath();
try {
fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: REPLY_CACHE_DIR_MODE });
fs.appendFileSync(filePath, `${JSON.stringify(entry)}\n`, {
encoding: "utf8",
mode: REPLY_CACHE_FILE_MODE,
});
// Always clamp — appendFileSync's `mode` only applies on creation, so
// an existing 0644 file from an older gateway version would otherwise
// never get tightened. chmod is microseconds; doing it every append
// keeps the security guarantee monotonic instead of conditional on
// creation order.
try {
fs.chmodSync(path.dirname(filePath), REPLY_CACHE_DIR_MODE);
fs.chmodSync(filePath, REPLY_CACHE_FILE_MODE);
} catch {
// best-effort
}
} catch (err) {
reportPersistenceFailure("append", err);
}
function remainingTtlMs(timestamp: number): number | undefined {
const remaining = REPLY_CACHE_TTL_MS - Math.max(0, Date.now() - timestamp);
return remaining > 0 ? remaining : undefined;
}
let hydrated = false;
function hydrateFromDiskOnce(): void {
function hydrateFromStoreOnce(): void {
if (hydrated) {
return;
}
hydrated = true;
const { entries, maxObservedShortId } = readPersistedEntries();
// Bump the counter past every observed shortId, even from dropped lines —
// see comment in readPersistedEntries.
if (maxObservedShortId > imessageShortIdCounter) {
imessageShortIdCounter = maxObservedShortId;
const cutoff = Date.now() - REPLY_CACHE_TTL_MS;
let entries: IMessageReplyCacheEntry[];
try {
const counter = openReplyCacheCounterStore().lookup(IMESSAGE_REPLY_CACHE_COUNTER_KEY);
if (counter && Number.isSafeInteger(counter.counter) && counter.counter > 0) {
imessageShortIdCounter = Math.max(imessageShortIdCounter, counter.counter);
}
const store = openReplyCacheStore();
entries = store
.entries()
.map(({ value }) => value)
.filter((entry) => entry.timestamp >= cutoff)
.toSorted((a, b) => a.timestamp - b.timestamp)
.slice(-IMESSAGE_REPLY_CACHE_MAX_ENTRIES);
for (const entry of entries) {
const numeric = Number.parseInt(entry.shortId, 10);
if (Number.isFinite(numeric) && numeric > imessageShortIdCounter) {
imessageShortIdCounter = numeric;
}
}
} catch (err) {
reportPersistenceFailure("read", err);
return;
}
if (entries.length === 0) {
return;
}
// Entries are appended chronologically, so iterate forward to keep the
// newest entry as the "live" mapping when the same messageId appears
// multiple times (e.g. after a write-rewrite cycle).
for (const entry of entries) {
imessageReplyCacheByMessageId.set(entry.messageId, entry);
imessageShortIdToUuid.set(entry.shortId, entry.messageId);
@@ -210,53 +121,104 @@ function hydrateFromDiskOnce(): void {
}
}
function persistReplyCacheEntry(entry: IMessageReplyCacheEntry): void {
const ttlMs = remainingTtlMs(entry.timestamp);
if (!ttlMs) {
return;
}
try {
openReplyCacheStore().register(resolveIMessageReplyCacheEntryKey(entry.messageId), entry, {
ttlMs,
});
} catch (err) {
reportPersistenceFailure("write", err);
}
}
function deleteReplyCacheEntry(messageId: string): void {
try {
openReplyCacheStore().delete(resolveIMessageReplyCacheEntryKey(messageId));
} catch (err) {
reportPersistenceFailure("delete", err);
}
}
function persistReplyCacheCounter(): void {
try {
openReplyCacheCounterStore().register(IMESSAGE_REPLY_CACHE_COUNTER_KEY, {
counter: imessageShortIdCounter,
});
} catch (err) {
reportPersistenceFailure("counter", err);
}
}
function buildReplyCacheEntry(
entry: Omit<IMessageReplyCacheEntry, "shortId">,
messageId: string,
shortId: string,
): IMessageReplyCacheEntry {
return {
accountId: entry.accountId,
messageId,
shortId,
timestamp: entry.timestamp,
...(typeof entry.chatGuid === "string" ? { chatGuid: entry.chatGuid } : {}),
...(typeof entry.chatIdentifier === "string" ? { chatIdentifier: entry.chatIdentifier } : {}),
...(typeof entry.chatId === "number" ? { chatId: entry.chatId } : {}),
...(typeof entry.isFromMe === "boolean" ? { isFromMe: entry.isFromMe } : {}),
};
}
function generateShortId(): string {
imessageShortIdCounter += 1;
persistReplyCacheCounter();
return String(imessageShortIdCounter);
}
export function rememberIMessageReplyCache(
entry: Omit<IMessageReplyCacheEntry, "shortId">,
): IMessageReplyCacheEntry {
hydrateFromDiskOnce();
hydrateFromStoreOnce();
const messageId = entry.messageId.trim();
if (!messageId) {
return { ...entry, shortId: "" };
}
let shortId = imessageUuidToShortId.get(messageId);
let allocatedNew = false;
if (!shortId) {
shortId = generateShortId();
imessageShortIdToUuid.set(shortId, messageId);
imessageUuidToShortId.set(messageId, shortId);
allocatedNew = true;
}
const fullEntry: IMessageReplyCacheEntry = { ...entry, messageId, shortId };
const fullEntry = buildReplyCacheEntry(entry, messageId, shortId);
imessageReplyCacheByMessageId.delete(messageId);
imessageReplyCacheByMessageId.set(messageId, fullEntry);
const cutoff = Date.now() - REPLY_CACHE_TTL_MS;
let evicted = false;
const deletedMessageIds: string[] = [];
for (const [key, value] of imessageReplyCacheByMessageId) {
if (value.timestamp >= cutoff) {
break;
}
imessageReplyCacheByMessageId.delete(key);
deletedMessageIds.push(key);
if (value.shortId) {
imessageShortIdToUuid.delete(value.shortId);
imessageUuidToShortId.delete(key);
}
evicted = true;
}
while (imessageReplyCacheByMessageId.size > REPLY_CACHE_MAX) {
while (imessageReplyCacheByMessageId.size > IMESSAGE_REPLY_CACHE_MAX_ENTRIES) {
const oldest = imessageReplyCacheByMessageId.keys().next().value;
if (!oldest) {
break;
}
const oldEntry = imessageReplyCacheByMessageId.get(oldest);
imessageReplyCacheByMessageId.delete(oldest);
deletedMessageIds.push(oldest);
if (oldEntry?.shortId) {
imessageShortIdToUuid.delete(oldEntry.shortId);
imessageUuidToShortId.delete(oldest);
@@ -264,14 +226,12 @@ export function rememberIMessageReplyCache(
evicted = true;
}
// Append-only is hot-path cheap; periodic rewrite happens when we evict
// stale entries so the file does not grow unbounded across restarts.
if (allocatedNew) {
appendPersistedEntry(fullEntry);
}
if (evicted) {
writePersistedEntries([...imessageReplyCacheByMessageId.values()]);
for (const messageIdToDelete of deletedMessageIds) {
deleteReplyCacheEntry(messageIdToDelete);
}
}
persistReplyCacheEntry(fullEntry);
return fullEntry;
}
@@ -413,14 +373,12 @@ export function resolveIMessageMessageId(
if (!trimmed) {
return trimmed;
}
// Hydrate the on-disk JSONL into the in-memory maps before reading them.
// Without this, the first post-restart action that arrives with a short
// MessageSid would miss `imessageShortIdToUuid` and fall through to the
// "no longer available" path, breaking the persistence contract — the
// mapping was on disk, we just hadn't read it yet on this read path.
// Hydrate SQLite-backed mappings before reading them. Without this, the
// first post-restart action with a short MessageSid would miss
// `imessageShortIdToUuid` and fall through to "no longer available".
// `rememberIMessageReplyCache` already hydrates on its own, so this only
// matters for the resolve-first-after-restart sequence.
hydrateFromDiskOnce();
hydrateFromStoreOnce();
if (/^\d+$/.test(trimmed)) {
// Cache hit: the cached entry carries the chat info this short id was
@@ -476,7 +434,7 @@ export function isKnownFromMeIMessageMessageId(
if (!trimmed || !ctx.accountId || !hasChatScope(ctx)) {
return false;
}
hydrateFromDiskOnce();
hydrateFromStoreOnce();
const cached = imessageReplyCacheByMessageId.get(trimmed);
if (!cached || cached.isFromMe !== true || cached.accountId !== ctx.accountId) {
return false;
@@ -579,24 +537,19 @@ function isPositiveChatMatch(entry: IMessageReplyCacheEntry, ctx: IMessageChatCo
return false;
}
export function resetIMessageShortIdState(): void {
export function resetIMessageShortIdState(options: { clearPersistent?: boolean } = {}): void {
imessageReplyCacheByMessageId.clear();
imessageShortIdToUuid.clear();
imessageUuidToShortId.clear();
imessageShortIdCounter = 0;
hydrated = false;
persistenceFailureLogged = false;
parseFailureLogged = false;
// Only delete the persisted file when the test harness has explicitly
// pointed us at an isolated state directory. Otherwise we would nuke
// whatever live gateway happens to share `~/.openclaw` — and in vitest
// file-level parallelism, two test files calling this at once could
// race a peer's appendFileSync mid-write.
if (!process.env.OPENCLAW_STATE_DIR) {
if (options.clearPersistent === false) {
return;
}
try {
fs.rmSync(resolveReplyCachePath(), { force: true });
openReplyCacheStore().clear();
openReplyCacheCounterStore().clear();
} catch {
// best-effort
}

View File

@@ -7,8 +7,10 @@ import {
} from "./monitor/inbound-processing.js";
import { parseIMessageNotification } from "./monitor/parse-notification.js";
import type { IMessagePayload } from "./monitor/types.js";
import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js";
beforeEach(() => {
installIMessageStateRuntimeForTest();
resetIMessageShortIdState();
});

View File

@@ -7,6 +7,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { createIMessageRpcClient } from "./client.js";
import { monitorIMessageProvider } from "./monitor.js";
import { loadIMessageCatchupCursor } from "./monitor/catchup.js";
import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js";
const waitForTransportReadyMock = vi.hoisted(() =>
vi.fn<typeof waitForTransportReady>(async () => {}),
@@ -107,6 +108,7 @@ describe("iMessage monitor last-route updates", () => {
}
beforeEach(() => {
installIMessageStateRuntimeForTest();
waitForTransportReadyMock.mockReset().mockResolvedValue(undefined);
createIMessageRpcClientMock.mockReset();
readChannelAllowFromStoreMock.mockReset().mockResolvedValue([]);

View File

@@ -1,9 +1,11 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js";
import { runIMessageCatchup } from "./catchup-bridge.js";
import { resolveCatchupConfig, saveIMessageCatchupCursor } from "./catchup.js";
import {
resetIMessageCatchupCursorStoreForTest,
resolveCatchupConfig,
saveIMessageCatchupCursor,
} from "./catchup.js";
import type { IMessagePayload } from "./types.js";
type RpcCall = {
@@ -51,17 +53,14 @@ function makeRow(opts: {
}
describe("runIMessageCatchup", () => {
let tempDir: string;
beforeEach(() => {
tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-catchup-bridge-"));
vi.stubEnv("OPENCLAW_STATE_DIR", tempDir);
installIMessageStateRuntimeForTest();
resetIMessageCatchupCursorStoreForTest();
});
afterEach(() => {
vi.unstubAllEnvs();
resetIMessageCatchupCursorStoreForTest();
vi.useRealTimers();
fs.rmSync(tempDir, { recursive: true, force: true });
});
it("fetches chats then per-chat history and dispatches each row in rowid order", async () => {

View File

@@ -1,12 +1,11 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js";
import {
advanceIMessageCatchupCursor,
capFailureRetriesMap,
loadIMessageCatchupCursor,
performIMessageCatchup,
resetIMessageCatchupCursorStoreForTest,
resolveCatchupConfig,
saveIMessageCatchupCursor,
type CatchupDispatchFn,
@@ -14,27 +13,9 @@ import {
type IMessageCatchupRow,
} from "./catchup.js";
let tempStateDir: string;
let priorStateDir: string | undefined;
beforeAll(() => {
tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-catchup-"));
priorStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = tempStateDir;
});
afterAll(() => {
if (priorStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = priorStateDir;
}
fs.rmSync(tempStateDir, { recursive: true, force: true });
});
beforeEach(() => {
// Wipe per-account cursor state between tests so each test starts clean.
fs.rmSync(path.join(tempStateDir, "imessage", "catchup"), { recursive: true, force: true });
installIMessageStateRuntimeForTest();
resetIMessageCatchupCursorStoreForTest();
});
describe("resolveCatchupConfig", () => {
@@ -217,6 +198,17 @@ describe("capFailureRetriesMap", () => {
// Both b and d at 9; tiebreak by guid string (alphabetical) → b, d
expect(Object.keys(capped).toSorted()).toEqual(["b", "d"]);
});
it("keeps the persisted retry map under the plugin-state value budget", () => {
const map = Object.fromEntries(
Array.from({ length: 800 }, (_, index) => [`GUID-${index}-${"x".repeat(120)}`, index + 1]),
);
const capped = capFailureRetriesMap(map);
expect(Object.keys(capped).length).toBeLessThanOrEqual(512);
expect(new TextEncoder().encode(JSON.stringify(capped)).byteLength).toBeLessThanOrEqual(48_000);
});
});
describe("performIMessageCatchup", () => {
@@ -452,6 +444,33 @@ describe("performIMessageCatchup", () => {
expect(cursor?.failureRetries?.A).toBe(1);
});
it("keeps held failure state when a live monitor advances the same cursor mid-pass", async () => {
const dispatch = vi.fn<CatchupDispatchFn>(async () => {
await advanceIMessageCatchupCursor(
"primary",
{ lastSeenMs: now - 10_000, lastSeenRowid: 50 },
config,
);
return { ok: false };
});
const fetch = fetchOf([row({ guid: "A", rowid: 10, date: now - 40_000 })]);
const summary = await performIMessageCatchup({
accountId: "primary",
config,
now,
fetch,
dispatch,
});
expect(summary.failed).toBe(1);
expect(summary.cursorAfter.lastSeenRowid).toBe(9);
const cursor = await loadIMessageCatchupCursor("primary");
expect(cursor?.lastSeenRowid).toBe(9);
expect(cursor?.failureRetries?.A).toBe(1);
});
it("advances the cursor past parser-rejected rows via the fetch high-watermark", async () => {
// Regression: without a high-watermark from the fetcher, an unparseable
// row never reaches the loop, so the cursor never advances past it and

View File

@@ -1,10 +1,6 @@
import { createHash } from "node:crypto";
import path from "node:path";
import type { FileLockOptions } from "openclaw/plugin-sdk/file-lock";
import { withFileLock } from "openclaw/plugin-sdk/file-lock";
import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { getIMessageRuntime } from "../runtime.js";
// iMessage inbound catchup. When the gateway is offline (crash, restart, mac
// sleep, machine off), `imsg watch` resumes from current state and ignores
@@ -25,20 +21,13 @@ const MAX_PER_RUN_LIMIT = 500;
const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30;
const DEFAULT_MAX_FAILURE_RETRIES = 10;
const MAX_MAX_FAILURE_RETRIES = 1_000;
// Defense-in-depth bound on the retry map. A storm of unique failing GUIDs
// should not balloon the cursor file. When over the bound, keep only the
// highest-count entries (closest to give-up) and drop the rest.
const MAX_FAILURE_RETRY_MAP_SIZE = 5_000;
const CATCHUP_CURSOR_LOCK_OPTIONS: FileLockOptions = {
retries: {
retries: 6,
factor: 1.35,
minTimeout: 8,
maxTimeout: 180,
randomize: true,
},
stale: 60_000,
};
// Defense-in-depth bound on the retry map. The cursor is one plugin-state
// value, so keep the retry payload well below the 64KB store limit.
const MAX_FAILURE_RETRY_MAP_SIZE = 512;
const MAX_FAILURE_RETRY_MAP_JSON_BYTES = 48_000;
const textEncoder = new TextEncoder();
export const IMESSAGE_CATCHUP_CURSOR_NAMESPACE = "imessage.catchup-cursors";
export const IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES = 256;
const cursorWriteQueues = new Map<string, Promise<unknown>>();
export type IMessageCatchupConfig = {
@@ -105,37 +94,37 @@ export type IMessageCatchupSummary = {
windowEndMs: number;
};
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
if (env.OPENCLAW_STATE_DIR?.trim()) {
return resolveStateDir(env);
}
// Default test isolation: per-pid tmpdir. Mirrors the BB catchup pattern so
// the tmpdir-path-guard test that flags dynamic template-literal suffixes
// on os.tmpdir() paths stays green.
if (env.VITEST || env.NODE_ENV === "test") {
const name = "openclaw-vitest-" + process.pid;
return path.join(resolvePreferredOpenClawTmpDir(), name);
}
return resolveStateDir(env);
export function resolveIMessageCatchupCursorKey(accountId: string): string {
return createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 32);
}
function resolveCursorFilePath(accountId: string): string {
// Layout matches inbound-dedupe / persisted-echo-cache so a replayed GUID
// is recognized by the existing dedupe after catchup re-feeds the message
// through the live dispatch path.
const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account";
const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12);
return path.join(resolveStateDirFromEnv(), "imessage", "catchup", `${safePrefix}__${hash}.json`);
function openCatchupCursorStore(): PluginStateSyncKeyedStore<IMessageCatchupCursor> {
return getIMessageRuntime().state.openSyncKeyedStore<IMessageCatchupCursor>({
namespace: IMESSAGE_CATCHUP_CURSOR_NAMESPACE,
maxEntries: IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES,
});
}
function enqueueCursorWrite<T>(filePath: string, fn: () => Promise<T>): Promise<T> {
const prev = cursorWriteQueues.get(filePath) ?? Promise.resolve();
function updateCatchupCursorStore(
key: string,
updateValue: (current: IMessageCatchupCursor | undefined) => IMessageCatchupCursor | undefined,
): boolean {
const store = openCatchupCursorStore();
if (!store.update) {
throw new Error("iMessage catchup cursor persistence requires atomic plugin-state update.");
}
return store.update(key, updateValue);
}
function enqueueCursorWrite<T>(accountId: string, fn: () => Promise<T>): Promise<T> {
const key = resolveIMessageCatchupCursorKey(accountId);
const prev = cursorWriteQueues.get(key) ?? Promise.resolve();
const next = prev.then(fn, fn);
cursorWriteQueues.set(filePath, next);
cursorWriteQueues.set(key, next);
next
.finally(() => {
if (cursorWriteQueues.get(filePath) === next) {
cursorWriteQueues.delete(filePath);
if (cursorWriteQueues.get(key) === next) {
cursorWriteQueues.delete(key);
}
})
.catch(() => {});
@@ -159,63 +148,78 @@ function sanitizeFailureRetriesInput(raw: unknown): Record<string, number> {
return out;
}
/**
* Cursor file path: `<openclawStateDir>/imessage/catchup/<safePrefix>__<sha256[:12]>.json`.
* `openclawStateDir` resolves through `OPENCLAW_STATE_DIR` (or the plugin-sdk default,
* `~/.openclaw`). On a default install the cursor lands at
* `~/.openclaw/imessage/catchup/<safePrefix>__<sha256[:12]>.json`.
*/
export async function loadIMessageCatchupCursor(
accountId: string,
): Promise<IMessageCatchupCursor | null> {
const filePath = resolveCursorFilePath(accountId);
return await loadIMessageCatchupCursorFromPath(filePath);
}
async function loadIMessageCatchupCursorFromPath(
filePath: string,
): Promise<IMessageCatchupCursor | null> {
const { value } = await readJsonFileWithFallback<IMessageCatchupCursor | null>(filePath, null);
function normalizeIMessageCatchupCursor(value: unknown): IMessageCatchupCursor | null {
if (!value || typeof value !== "object") {
return null;
}
if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) {
const raw = value as Partial<IMessageCatchupCursor>;
if (typeof raw.lastSeenMs !== "number" || !Number.isFinite(raw.lastSeenMs)) {
return null;
}
if (typeof value.lastSeenRowid !== "number" || !Number.isFinite(value.lastSeenRowid)) {
if (typeof raw.lastSeenRowid !== "number" || !Number.isFinite(raw.lastSeenRowid)) {
return null;
}
const failureRetries = sanitizeFailureRetriesInput(value.failureRetries);
const failureRetries = sanitizeFailureRetriesInput(raw.failureRetries);
const hasRetries = Object.keys(failureRetries).length > 0;
return {
lastSeenMs: value.lastSeenMs,
lastSeenRowid: value.lastSeenRowid,
updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0,
lastSeenMs: raw.lastSeenMs,
lastSeenRowid: raw.lastSeenRowid,
updatedAt: typeof raw.updatedAt === "number" ? raw.updatedAt : 0,
...(hasRetries ? { failureRetries } : {}),
};
}
function readIMessageCatchupCursor(accountId: string): IMessageCatchupCursor | null {
return normalizeIMessageCatchupCursor(
openCatchupCursorStore().lookup(resolveIMessageCatchupCursorKey(accountId)),
);
}
export async function loadIMessageCatchupCursor(
accountId: string,
): Promise<IMessageCatchupCursor | null> {
return readIMessageCatchupCursor(accountId);
}
function buildIMessageCatchupCursor(next: {
lastSeenMs: number;
lastSeenRowid: number;
failureRetries?: Record<string, number>;
}): IMessageCatchupCursor {
const sanitized = sanitizeFailureRetriesInput(next.failureRetries);
const hasRetries = Object.keys(sanitized).length > 0;
return {
lastSeenMs: next.lastSeenMs,
lastSeenRowid: next.lastSeenRowid,
updatedAt: Date.now(),
...(hasRetries ? { failureRetries: sanitized } : {}),
};
}
export async function saveIMessageCatchupCursor(
accountId: string,
next: { lastSeenMs: number; lastSeenRowid: number; failureRetries?: Record<string, number> },
options: { allowCursorRewindForRetries?: boolean } = {},
): Promise<void> {
const filePath = resolveCursorFilePath(accountId);
await saveIMessageCatchupCursorToPath(filePath, next);
const cursor = buildIMessageCatchupCursor(next);
updateCatchupCursorStore(resolveIMessageCatchupCursorKey(accountId), (existingValue) => {
const existing = normalizeIMessageCatchupCursor(existingValue);
if (existing && cursor.lastSeenRowid < existing.lastSeenRowid) {
if (!options.allowCursorRewindForRetries) {
return undefined;
}
return buildIMessageCatchupCursor({
lastSeenMs: cursor.lastSeenMs,
lastSeenRowid: cursor.lastSeenRowid,
failureRetries: { ...existing.failureRetries, ...cursor.failureRetries },
});
}
return cursor;
});
}
async function saveIMessageCatchupCursorToPath(
filePath: string,
next: { lastSeenMs: number; lastSeenRowid: number; failureRetries?: Record<string, number> },
): Promise<void> {
const sanitized = sanitizeFailureRetriesInput(next.failureRetries);
const hasRetries = Object.keys(sanitized).length > 0;
const cursor: IMessageCatchupCursor = {
lastSeenMs: next.lastSeenMs,
lastSeenRowid: next.lastSeenRowid,
updatedAt: Date.now(),
...(hasRetries ? { failureRetries: sanitized } : {}),
};
await writeJsonFileAtomically(filePath, cursor);
export function resetIMessageCatchupCursorStoreForTest(): void {
openCatchupCursorStore().clear();
}
/**
@@ -226,9 +230,10 @@ async function saveIMessageCatchupCursorToPath(
export function capFailureRetriesMap(
map: Record<string, number>,
maxSize: number = MAX_FAILURE_RETRY_MAP_SIZE,
maxBytes: number = MAX_FAILURE_RETRY_MAP_JSON_BYTES,
): Record<string, number> {
const entries = Object.entries(map);
if (entries.length <= maxSize) {
if (entries.length <= maxSize && textEncoder.encode(JSON.stringify(map)).byteLength <= maxBytes) {
return map;
}
// Sort by count desc; stable tiebreak on guid string so the retained set
@@ -236,9 +241,13 @@ export function capFailureRetriesMap(
// debugging).
entries.sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0]));
const capped: Record<string, number> = {};
for (let i = 0; i < maxSize; i++) {
for (let i = 0; i < entries.length && i < maxSize; i++) {
const [guid, count] = entries[i];
capped[guid] = count;
if (textEncoder.encode(JSON.stringify(capped)).byteLength > maxBytes) {
delete capped[guid];
break;
}
}
return capped;
}
@@ -329,29 +338,30 @@ export async function advanceIMessageCatchupCursor(
return false;
}
const filePath = resolveCursorFilePath(accountId);
return await enqueueCursorWrite(filePath, () =>
withFileLock(filePath, CATCHUP_CURSOR_LOCK_OPTIONS, async () => {
const cursor = await loadIMessageCatchupCursorFromPath(filePath);
return await enqueueCursorWrite(accountId, async () => {
let advanced = false;
updateCatchupCursorStore(resolveIMessageCatchupCursorKey(accountId), (existingValue) => {
const cursor = normalizeIMessageCatchupCursor(existingValue);
if (cursor && next.lastSeenRowid <= cursor.lastSeenRowid) {
return false;
return undefined;
}
const blockingFailure = Object.values(cursor?.failureRetries ?? {}).some(
(count) => count < config.maxFailureRetries,
);
if (blockingFailure) {
return false;
return undefined;
}
await saveIMessageCatchupCursorToPath(filePath, {
advanced = true;
return buildIMessageCatchupCursor({
lastSeenMs: Math.max(cursor?.lastSeenMs ?? next.lastSeenMs, next.lastSeenMs),
lastSeenRowid: next.lastSeenRowid,
failureRetries: cursor?.failureRetries,
});
return true;
}),
);
});
return advanced;
});
}
/**
@@ -534,11 +544,17 @@ export async function performIMessageCatchup(
const capped = capFailureRetriesMap(failureRetries);
summary.cursorAfter = { lastSeenMs, lastSeenRowid };
await saveIMessageCatchupCursor(params.accountId, {
lastSeenMs,
lastSeenRowid,
failureRetries: capped,
});
await saveIMessageCatchupCursor(
params.accountId,
{
lastSeenMs,
lastSeenRowid,
failureRetries: capped,
},
{
allowCursorRewindForRetries: earliestHeldFailureRow !== null,
},
);
if (summary.replayed > 0 || summary.failed > 0 || summary.givenUp > 0) {
params.log?.(

View File

@@ -1,10 +1,8 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { sanitizeTerminalText } from "openclaw/plugin-sdk/test-fixtures";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { resetIMessageShortIdState, rememberIMessageReplyCache } from "../monitor-reply-cache.js";
import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js";
import {
buildIMessageInboundContext,
describeIMessageEchoDropLog,
@@ -13,6 +11,11 @@ import {
} from "./inbound-processing.js";
import { createSelfChatCache } from "./self-chat-cache.js";
beforeEach(() => {
installIMessageStateRuntimeForTest();
resetIMessageShortIdState();
});
describe("resolveIMessageInboundDecision echo detection", () => {
const cfg = {} as OpenClawConfig;
type InboundDecisionParams = Parameters<typeof resolveIMessageInboundDecision>[0];
@@ -543,56 +546,42 @@ describe("resolveIMessageInboundDecision echo detection", () => {
});
it("uses the production reply-cache lookup for bot-authored reaction targets", async () => {
const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-reaction-cache-"));
const priorStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = tempStateDir;
try {
resetIMessageShortIdState();
rememberIMessageReplyCache({
accountId: "default",
messageId: "p:0/imsg-production",
chatGuid: "any;-;+15555550123",
chatIdentifier: "+15555550123",
chatId: 3,
timestamp: Date.now(),
isFromMe: true,
});
rememberIMessageReplyCache({
accountId: "default",
messageId: "p:0/imsg-production",
chatGuid: "any;-;+15555550123",
chatIdentifier: "+15555550123",
chatId: 3,
timestamp: Date.now(),
isFromMe: true,
});
const decision = await resolveDecision({
message: {
guid: "reaction-guid",
is_reaction: true,
reaction_emoji: "❤️",
is_reaction_add: true,
associated_message_guid: "p:0/imsg-production",
associated_message_type: 2000,
text: "Loved “tapback target”",
chat_id: 3,
chat_guid: "any;-;+15555550123",
chat_identifier: "+15555550123",
},
messageText: "Loved “tapback target”",
bodyText: "Loved “tapback target”",
echoCache: { has: () => false },
isKnownFromMeMessageId: undefined,
});
const decision = await resolveDecision({
message: {
guid: "reaction-guid",
is_reaction: true,
reaction_emoji: "❤️",
is_reaction_add: true,
associated_message_guid: "p:0/imsg-production",
associated_message_type: 2000,
text: "Loved “tapback target”",
chat_id: 3,
chat_guid: "any;-;+15555550123",
chat_identifier: "+15555550123",
},
messageText: "Loved “tapback target”",
bodyText: "Loved “tapback target”",
echoCache: { has: () => false },
isKnownFromMeMessageId: undefined,
});
expect(decision.kind).toBe("reaction");
if (decision.kind !== "reaction") {
throw new Error("expected reaction decision");
}
expect(decision.text).toBe(
"iMessage reaction added: ❤️ by +15555550123 on msg imsg-production",
);
} finally {
resetIMessageShortIdState();
if (priorStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = priorStateDir;
}
fs.rmSync(tempStateDir, { recursive: true, force: true });
expect(decision.kind).toBe("reaction");
if (decision.kind !== "reaction") {
throw new Error("expected reaction decision");
}
expect(decision.text).toBe(
"iMessage reaction added: ❤️ by +15555550123 on msg imsg-production",
);
});
it("matches prefixed tapback targets against prefixed echo-cache ids in own mode", async () => {
@@ -988,30 +977,6 @@ describe("resolveIMessageInboundDecision command auth", () => {
});
describe("buildIMessageInboundContext MessageSid handling (rowid-leak regression)", () => {
let tempStateDir: string;
let priorStateDir: string | undefined;
beforeAll(() => {
tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-inbound-"));
priorStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = tempStateDir;
});
afterAll(() => {
if (priorStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = priorStateDir;
}
fs.rmSync(tempStateDir, { recursive: true, force: true });
});
beforeEach(() => {
resetIMessageShortIdState();
try {
fs.rmSync(path.join(tempStateDir, "imessage", "reply-cache.jsonl"), { force: true });
} catch {
// best-effort
}
});
function buildParams(messageOverrides: Partial<{ id: number; guid: string }>) {
const decision = {
kind: "dispatch" as const,

View File

@@ -1,19 +1,28 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
createIMessagePluginStateSyncStoreForTest,
installIMessageFailingStateRuntimeForTest,
installIMessageStateRuntimeForTest,
} from "../test-support/runtime.js";
import { createSentMessageCache } from "./echo-cache.js";
import { rememberPersistedIMessageEcho } from "./persisted-echo-cache.js";
import {
IMESSAGE_SENT_ECHOES_MAX_ENTRIES,
IMESSAGE_SENT_ECHOES_NAMESPACE,
IMESSAGE_SENT_ECHOES_TTL_MS,
hasPersistedIMessageEcho,
rememberPersistedIMessageEcho,
resetPersistedIMessageEchoCacheForTest,
resolveIMessageSentEchoEntryKey,
} from "./persisted-echo-cache.js";
describe("iMessage sent-message echo cache", () => {
const tempDirs: string[] = [];
beforeEach(() => {
installIMessageStateRuntimeForTest();
resetPersistedIMessageEchoCacheForTest();
});
afterEach(() => {
vi.useRealTimers();
vi.unstubAllEnvs();
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { recursive: true, force: true });
}
});
it("matches recent text within the same scope", () => {
@@ -82,47 +91,59 @@ describe("iMessage sent-message echo cache", () => {
expect(cache.has("acct:imessage:+1555", { messageId: "m-1" })).toBe(true);
});
it("matches persisted echoes written by another process", () => {
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-"));
tempDirs.push(stateDir);
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
const cache = createSentMessageCache();
it("matches persisted echoes written before the monitor cache is created", () => {
rememberPersistedIMessageEcho({
scope: "acct:imessage:+1555",
text: "OpenClaw imsg live test",
messageId: "guid-1",
});
const cache = createSentMessageCache();
expect(cache.has("acct:imessage:+1555", { text: "OpenClaw imsg live test" })).toBe(true);
expect(cache.has("acct:imessage:+1666", { text: "OpenClaw imsg live test" })).toBe(false);
expect(cache.has("acct:imessage:+1555", { messageId: "guid-1" })).toBe(true);
});
it("writes sent-echoes.jsonl 0600 and parent dir 0700", () => {
// sent-echoes.jsonl carries scope keys + outbound message text + messageIds.
// Same threat model as reply-cache.jsonl: a same-UID hostile process could
// enumerate active conversations or inject lines so a future inbound dedupe
// call wrongly suppresses a legitimate inbound. Owner-only mode is the
// mitigation.
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-perm-"));
tempDirs.push(stateDir);
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
it("persists text-only and id-only echoes without undefined fields", () => {
const scope = "acct:imessage:+1555";
rememberPersistedIMessageEcho({ scope, text: "text-only" });
rememberPersistedIMessageEcho({ scope, messageId: "id-only" });
rememberPersistedIMessageEcho({
scope: "acct:imessage:+1555",
text: "perm-test",
messageId: "guid-perm",
resetPersistedIMessageEchoCacheForTest({ clearPersistent: false });
const cache = createSentMessageCache();
expect(cache.has(scope, { text: "text-only" })).toBe(true);
expect(cache.has(scope, { messageId: "id-only" })).toBe(true);
});
it("refreshes persisted echoes written after an earlier empty lookup", () => {
const cache = createSentMessageCache();
const scope = "acct:imessage:+1555";
expect(cache.has(scope, { messageId: "guid-late" })).toBe(false);
const entry = { scope, messageId: "guid-late", timestamp: Date.now() };
createIMessagePluginStateSyncStoreForTest({
namespace: IMESSAGE_SENT_ECHOES_NAMESPACE,
maxEntries: IMESSAGE_SENT_ECHOES_MAX_ENTRIES,
}).register(resolveIMessageSentEchoEntryKey(entry), entry, {
ttlMs: IMESSAGE_SENT_ECHOES_TTL_MS,
});
const echoFile = path.join(stateDir, "imessage", "sent-echoes.jsonl");
const echoDir = path.dirname(echoFile);
expect(fs.existsSync(echoFile)).toBe(true);
expect(cache.has(scope, { messageId: "guid-late" })).toBe(true);
});
const fileMode = fs.statSync(echoFile).mode & 0o777;
const dirMode = fs.statSync(echoDir).mode & 0o777;
expect(fileMode).toBe(0o600);
expect(dirMode).toBe(0o700);
it("drops the in-memory mirror on persisted read failure so expired echoes do not match", () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-02-25T00:00:00Z"));
const scope = "acct:imessage:+1555";
rememberPersistedIMessageEcho({ scope, text: "stale echo" });
expect(hasPersistedIMessageEcho({ scope, text: "stale echo" })).toBe(true);
vi.advanceTimersByTime(IMESSAGE_SENT_ECHOES_TTL_MS + 1);
installIMessageFailingStateRuntimeForTest();
expect(hasPersistedIMessageEcho({ scope, text: "stale echo" })).toBe(false);
});
it("retains entries written hours earlier so catchup replay sees own outbound rows", () => {
@@ -132,10 +153,6 @@ describe("iMessage sent-message echo cache", () => {
// rows around them — and the agent's replies to itself land back in the
// inbound pipeline as if they were external sends. Regression guard for
// the echo-cache retention extension that ships with #78649.
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-ttl-"));
tempDirs.push(stateDir);
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-05-08T12:00:00Z"));
rememberPersistedIMessageEcho({
@@ -153,30 +170,4 @@ describe("iMessage sent-message echo cache", () => {
);
expect(cache.has("acct:imessage:+1555", { messageId: "guid-pre-gap" })).toBe(true);
});
it("clamps pre-existing sent-echoes.jsonl from older 0644/0755 to 0600/0700", () => {
// Older gateway versions wrote with default modes. After upgrade, the next
// remember must clamp the existing file/dir back to owner-only.
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-clamp-"));
tempDirs.push(stateDir);
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
const imsgDir = path.join(stateDir, "imessage");
fs.mkdirSync(imsgDir, { recursive: true, mode: 0o755 });
const echoFile = path.join(imsgDir, "sent-echoes.jsonl");
fs.writeFileSync(echoFile, "", { mode: 0o644 });
fs.chmodSync(imsgDir, 0o755);
fs.chmodSync(echoFile, 0o644);
rememberPersistedIMessageEcho({
scope: "acct:imessage:+1555",
text: "clamp-test",
messageId: "guid-clamp",
});
const fileMode = fs.statSync(echoFile).mode & 0o777;
const dirMode = fs.statSync(imsgDir).mode & 0o777;
expect(fileMode).toBe(0o600);
expect(dirMode).toBe(0o700);
});
});

View File

@@ -1,7 +1,7 @@
import fs from "node:fs";
import path from "node:path";
import { createHash } from "node:crypto";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { getIMessageRuntime } from "../runtime.js";
type PersistedEchoEntry = {
scope: string;
@@ -16,32 +16,11 @@ type PersistedEchoEntry = {
// a gateway gap would fall out of the dedupe set before catchup could replay
// the inbound rows around them, and the agent's own messages would land back
// in the inbound pipeline as if they were external sends.
const PERSISTED_ECHO_TTL_MS = 12 * 60 * 60 * 1000;
const MAX_PERSISTED_ECHO_ENTRIES = 256;
export const IMESSAGE_SENT_ECHOES_TTL_MS = 12 * 60 * 60 * 1000;
export const IMESSAGE_SENT_ECHOES_NAMESPACE = "imessage.sent-echoes";
export const IMESSAGE_SENT_ECHOES_MAX_ENTRIES = 256;
// sent-echoes.jsonl carries scope keys + outbound message text + messageIds.
// A hostile same-UID process could otherwise (a) read the file to enumerate
// active conversations and outbound content, or (b) inject lines so a future
// inbound dedupe call wrongly suppresses a legitimate inbound message. Owner-
// only mode on both the directory and file closes that vector — defaults are
// 0755/0644 which are world-readable on a multi-user Mac.
const PERSISTED_ECHO_DIR_MODE = 0o700;
const PERSISTED_ECHO_FILE_MODE = 0o600;
function resolvePersistedEchoPath(): string {
return path.join(resolveStateDir(), "imessage", "sent-echoes.jsonl");
}
function clampPersistedEchoModes(filePath: string): void {
// mkdirSync's mode is masked by umask and only applies on creation. If the
// dir or file already exists from an older gateway version, clamp now.
try {
fs.chmodSync(path.dirname(filePath), PERSISTED_ECHO_DIR_MODE);
fs.chmodSync(filePath, PERSISTED_ECHO_FILE_MODE);
} catch {
// best-effort — fs may not support chmod on every platform
}
}
type PersistedEchoStore = PluginStateSyncKeyedStore<PersistedEchoEntry>;
function normalizeText(text: string | undefined): string | undefined {
const normalized = text?.replace(/\r\n?/g, "\n").trim();
@@ -56,29 +35,7 @@ function normalizeMessageId(messageId: string | undefined): string | undefined {
return normalized;
}
function parseEntry(line: string): PersistedEchoEntry | null {
try {
const parsed = JSON.parse(line) as Partial<PersistedEchoEntry>;
if (typeof parsed.scope !== "string" || typeof parsed.timestamp !== "number") {
return null;
}
return {
scope: parsed.scope,
text: typeof parsed.text === "string" ? parsed.text : undefined,
messageId: typeof parsed.messageId === "string" ? parsed.messageId : undefined,
timestamp: parsed.timestamp,
};
} catch {
return null;
}
}
// In-memory mirror of the persisted file. The echo cache is consulted on
// every inbound message; without a cache, group-chat bursts trigger a
// readFileSync + JSON.parse for every member's reply. The mirror is
// invalidated by file mtime so concurrent gateway processes (rare) and
// post-restart hydrate still see fresh data.
let mirror: { entries: PersistedEchoEntry[]; mtimeMs: number } | null = null;
let mirror: PersistedEchoEntry[] | null = null;
let persistenceFailureLogged = false;
function reportFailure(scope: string, err: unknown): void {
if (persistenceFailureLogged) {
@@ -88,105 +45,54 @@ function reportFailure(scope: string, err: unknown): void {
logVerbose(`imessage echo-cache: ${scope} disabled after first failure: ${String(err)}`);
}
function loadMirrorIfStale(): void {
const filePath = resolvePersistedEchoPath();
let mtimeMs: number;
export function resolveIMessageSentEchoEntryKey(entry: PersistedEchoEntry): string {
return createHash("sha256")
.update(JSON.stringify([entry.scope, entry.text ?? "", entry.messageId ?? "", entry.timestamp]))
.digest("hex")
.slice(0, 32);
}
function openPersistedEchoStore(): PersistedEchoStore {
return getIMessageRuntime().state.openSyncKeyedStore<PersistedEchoEntry>({
namespace: IMESSAGE_SENT_ECHOES_NAMESPACE,
maxEntries: IMESSAGE_SENT_ECHOES_MAX_ENTRIES,
});
}
function remainingTtlMs(timestamp: number): number | undefined {
const remaining = IMESSAGE_SENT_ECHOES_TTL_MS - Math.max(0, Date.now() - timestamp);
return remaining > 0 ? remaining : undefined;
}
function loadMirrorFromStore(): void {
try {
mtimeMs = fs.statSync(filePath).mtimeMs;
} catch (err) {
if ((err as NodeJS.ErrnoException)?.code !== "ENOENT") {
reportFailure("stat", err);
}
mirror = { entries: [], mtimeMs: 0 };
return;
}
if (mirror && mirror.mtimeMs === mtimeMs) {
return;
}
let raw: string;
try {
raw = fs.readFileSync(filePath, "utf8");
const cutoff = Date.now() - IMESSAGE_SENT_ECHOES_TTL_MS;
mirror = openPersistedEchoStore()
.entries()
.map(({ value }) => value)
.filter((entry) => entry.timestamp >= cutoff)
.toSorted((a, b) => a.timestamp - b.timestamp)
.slice(-IMESSAGE_SENT_ECHOES_MAX_ENTRIES);
} catch (err) {
reportFailure("read", err);
mirror = { entries: [], mtimeMs };
return;
mirror = [];
}
const cutoff = Date.now() - PERSISTED_ECHO_TTL_MS;
const entries = raw
.split(/\n+/)
.map(parseEntry)
.filter((entry): entry is PersistedEchoEntry => Boolean(entry && entry.timestamp >= cutoff))
.slice(-MAX_PERSISTED_ECHO_ENTRIES);
mirror = { entries, mtimeMs };
}
function readRecentEntries(): PersistedEchoEntry[] {
loadMirrorIfStale();
return mirror?.entries ?? [];
loadMirrorFromStore();
return mirror ?? [];
}
// Trigger compaction once the on-disk file grows past 2x the cap or holds
// stale entries beyond the TTL window. Until then, every remember is an
// O(1) append rather than a full rewrite — group-chat bursts that send 5+
// outbound messages back-to-back used to write the entire file 5+ times.
const COMPACT_AT_ENTRY_COUNT = MAX_PERSISTED_ECHO_ENTRIES * 2;
function compactRecentEntries(entries: PersistedEchoEntry[]): void {
const filePath = resolvePersistedEchoPath();
try {
fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: PERSISTED_ECHO_DIR_MODE });
fs.writeFileSync(
filePath,
entries.map((entry) => JSON.stringify(entry)).join("\n") + (entries.length ? "\n" : ""),
{ encoding: "utf8", mode: PERSISTED_ECHO_FILE_MODE },
);
clampPersistedEchoModes(filePath);
} catch (err) {
reportFailure("compact", err);
// Persistence failed; don't update the in-memory mirror so the next
// read still reflects what's actually on disk.
function persistEntry(entry: PersistedEchoEntry): void {
const ttlMs = remainingTtlMs(entry.timestamp);
if (!ttlMs) {
return;
}
// Update mirror to reflect what we just wrote, so the next has() call
// doesn't re-read the file we just authored.
let mtimeMs = 0;
try {
mtimeMs = fs.statSync(filePath).mtimeMs;
} catch {
// ignore — stale mirror will refresh on next access
}
mirror = { entries: [...entries], mtimeMs };
}
function appendEntry(entry: PersistedEchoEntry): void {
const filePath = resolvePersistedEchoPath();
try {
fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: PERSISTED_ECHO_DIR_MODE });
fs.appendFileSync(filePath, `${JSON.stringify(entry)}\n`, {
encoding: "utf8",
mode: PERSISTED_ECHO_FILE_MODE,
});
// Always clamp — appendFileSync's `mode` only applies on creation, and
// an older gateway version may have left an existing 0644 file behind.
// chmod is microseconds; doing it every append keeps the security
// guarantee monotonic instead of conditional on creation order.
clampPersistedEchoModes(filePath);
openPersistedEchoStore().register(resolveIMessageSentEchoEntryKey(entry), entry, { ttlMs });
} catch (err) {
reportFailure("append", err);
return;
}
// Mirror stays in sync without re-reading the file: append our entry to
// the in-memory copy and bump the mtime to whatever the FS reports now.
let mtimeMs = 0;
try {
mtimeMs = fs.statSync(filePath).mtimeMs;
} catch {
// ignore
}
if (mirror) {
mirror = { entries: [...mirror.entries, entry], mtimeMs };
} else {
mirror = { entries: [entry], mtimeMs };
reportFailure("write", err);
}
}
@@ -195,26 +101,23 @@ export function rememberPersistedIMessageEcho(params: {
text?: string;
messageId?: string;
}): void {
const text = normalizeText(params.text);
const messageId = normalizeMessageId(params.messageId);
const entry: PersistedEchoEntry = {
scope: params.scope,
text: normalizeText(params.text),
messageId: normalizeMessageId(params.messageId),
timestamp: Date.now(),
...(text ? { text } : {}),
...(messageId ? { messageId } : {}),
};
if (!entry.text && !entry.messageId) {
return;
}
// Make sure the mirror reflects whatever's on disk before we decide
// whether a compaction is due.
loadMirrorIfStale();
appendEntry(entry);
const total = mirror?.entries.length ?? 0;
const cutoff = Date.now() - PERSISTED_ECHO_TTL_MS;
const oldestStale = mirror?.entries[0] && mirror.entries[0].timestamp < cutoff;
if (total > COMPACT_AT_ENTRY_COUNT || oldestStale) {
const fresh = (mirror?.entries ?? []).filter((e) => e.timestamp >= cutoff);
compactRecentEntries(fresh.slice(-MAX_PERSISTED_ECHO_ENTRIES));
}
loadMirrorFromStore();
persistEntry(entry);
const cutoff = Date.now() - IMESSAGE_SENT_ECHOES_TTL_MS;
mirror = [...(mirror ?? []), entry]
.filter((candidate) => candidate.timestamp >= cutoff)
.slice(-IMESSAGE_SENT_ECHOES_MAX_ENTRIES);
}
export function hasPersistedIMessageEcho(params: {
@@ -240,3 +143,18 @@ export function hasPersistedIMessageEcho(params: {
}
return false;
}
export function resetPersistedIMessageEchoCacheForTest(
options: { clearPersistent?: boolean } = {},
): void {
mirror = null;
persistenceFailureLogged = false;
if (options.clearPersistent === false) {
return;
}
try {
openPersistedEchoStore().clear();
} catch {
// best-effort
}
}

View File

@@ -1,7 +1,9 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js";
import { createSentMessageCache } from "./echo-cache.js";
import { resolveIMessageInboundDecision } from "./inbound-processing.js";
import { resetPersistedIMessageEchoCacheForTest } from "./persisted-echo-cache.js";
import { createSelfChatCache } from "./self-chat-cache.js";
/**
@@ -24,6 +26,11 @@ type InboundDecisionParams = Parameters<typeof resolveIMessageInboundDecision>[0
const cfg = {} as OpenClawConfig;
beforeEach(() => {
installIMessageStateRuntimeForTest();
resetPersistedIMessageEchoCacheForTest();
});
function createParams(
overrides: Omit<Partial<InboundDecisionParams>, "message"> & {
message?: Partial<InboundDecisionParams["message"]>;

View File

@@ -1,13 +1,13 @@
import type { PluginRuntime } from "openclaw/plugin-sdk/core";
import { createPluginRuntimeStore } from "openclaw/plugin-sdk/runtime-store";
const { setRuntime: setIMessageRuntime, tryGetRuntime: getOptionalIMessageRuntime } =
createPluginRuntimeStore<PluginRuntime>({
pluginId: "imessage",
errorMessage: "iMessage runtime not initialized",
});
// Only the optional accessor is exported: approval-reactions.ts opens a
// persistent keyed store best-effort and must never throw if the runtime has
// not yet bound. If a future caller genuinely needs a throwing accessor,
// re-export `getRuntime` here intentionally.
export { getOptionalIMessageRuntime, setIMessageRuntime };
const {
clearRuntime: clearIMessageRuntime,
getRuntime: getIMessageRuntime,
setRuntime: setIMessageRuntime,
tryGetRuntime: getOptionalIMessageRuntime,
} = createPluginRuntimeStore<PluginRuntime>({
pluginId: "imessage",
errorMessage: "iMessage runtime not initialized",
});
export { clearIMessageRuntime, getIMessageRuntime, getOptionalIMessageRuntime, setIMessageRuntime };

View File

@@ -1,7 +1,7 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
clearIMessageApprovalReactionTargetsForTest,
resolveIMessageApprovalReactionTargetWithPersistence,
@@ -11,8 +11,12 @@ import {
findLatestIMessageEntryForChat,
resetIMessageShortIdState,
} from "./monitor-reply-cache.js";
import { hasPersistedIMessageEcho } from "./monitor/persisted-echo-cache.js";
import {
hasPersistedIMessageEcho,
resetPersistedIMessageEchoCacheForTest,
} from "./monitor/persisted-echo-cache.js";
import { sendMessageIMessage } from "./send.js";
import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js";
const IMESSAGE_TEST_CFG = {
channels: {
@@ -61,9 +65,16 @@ function createApprovalText(id = "approval-123"): string {
}
describe("sendMessageIMessage receipts", () => {
beforeEach(() => {
installIMessageStateRuntimeForTest();
resetIMessageShortIdState();
resetPersistedIMessageEchoCacheForTest();
});
afterEach(() => {
clearIMessageApprovalReactionTargetsForTest();
resetIMessageShortIdState();
resetPersistedIMessageEchoCacheForTest();
vi.restoreAllMocks();
vi.unstubAllEnvs();
vi.useRealTimers();
@@ -291,19 +302,18 @@ describe("sendMessageIMessage receipts", () => {
expect(attachmentArgs[1]).toBe("--chat");
expect(attachmentArgs[2]).toBe("any;-;+15550004567");
expect(attachmentArgs.slice(3)).toEqual(["--file", "/tmp/image.png", "--transport", "auto"]);
expect(
findLatestIMessageEntryForChat({
accountId: "default",
chatIdentifier: "any;-;+15550004567",
}),
).toEqual(
const cachedEntry = findLatestIMessageEntryForChat({
accountId: "default",
chatIdentifier: "any;-;+15550004567",
});
expect(cachedEntry).toEqual(
expect.objectContaining({
messageId: "p:0/dm-media-guid",
chatGuid: undefined,
chatIdentifier: "any;-;+15550004567",
isFromMe: true,
}),
);
expect(cachedEntry).not.toHaveProperty("chatGuid");
expect(getClientMocks(client).request).not.toHaveBeenCalled();
});

View File

@@ -0,0 +1,294 @@
import { createHash } from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { resolveIMessageCatchupCursorKey } from "./monitor/catchup.js";
import { detectIMessageLegacyStateMigrations } from "./state-migrations.js";
describe("detectIMessageLegacyStateMigrations", () => {
const tempDirs: string[] = [];
afterEach(() => {
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { recursive: true, force: true });
}
});
function makeStateDir(): string {
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-migration-"));
tempDirs.push(dir);
return dir;
}
function legacyCatchupFilename(accountId: string): string {
return `${accountId}__${createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12)}.json`;
}
it("imports reply, echo, and catchup sidecars into plugin state plans", async () => {
const stateDir = makeStateDir();
const imsgDir = path.join(stateDir, "imessage");
fs.mkdirSync(path.join(imsgDir, "catchup"), { recursive: true });
fs.writeFileSync(
path.join(imsgDir, "reply-cache.jsonl"),
JSON.stringify({
accountId: "default",
messageId: "guid-1",
shortId: "1",
timestamp: Date.now(),
chatIdentifier: "+15551234567",
}) + "\n",
);
fs.writeFileSync(
path.join(imsgDir, "sent-echoes.jsonl"),
JSON.stringify({
scope: "default:imessage:+15551234567",
text: "hello",
timestamp: Date.now(),
}) + "\n",
);
fs.writeFileSync(
path.join(imsgDir, "catchup", "default__37a8eec1ce19.json"),
JSON.stringify({
lastSeenMs: 1_700_000_000_000,
lastSeenRowid: 42,
updatedAt: 1_700_000_000_123,
}),
);
const plans = await detectIMessageLegacyStateMigrations({
cfg: { channels: { imessage: { enabled: true } } } as never,
env: {},
stateDir,
});
expect(plans.map((plan) => plan.label)).toEqual([
"iMessage catchup cursor",
"iMessage reply short-id counter",
"iMessage reply short-id cache",
"iMessage sent-echo dedupe cache",
]);
for (const plan of plans) {
expect(plan.kind).toBe("plugin-state-import");
if (plan.kind !== "plugin-state-import") {
throw new Error("expected plugin-state-import plan");
}
expect(plan.pluginId).toBe("imessage");
if (plan.label !== "iMessage reply short-id counter") {
expect(plan.cleanupSource).toBe("rename");
}
if (
plan.label === "iMessage reply short-id cache" ||
plan.label === "iMessage sent-echo dedupe cache"
) {
expect(plan.cleanupWhenEmpty).toBe(true);
}
const entries = await plan.readEntries();
expect(entries).toHaveLength(1);
}
const catchupPlan = plans.find((plan) => plan.label === "iMessage catchup cursor");
expect(catchupPlan?.kind).toBe("plugin-state-import");
if (!catchupPlan || catchupPlan.kind !== "plugin-state-import") {
throw new Error("expected catchup plugin-state-import plan");
}
const [catchupEntry] = await catchupPlan.readEntries();
expect(
await catchupPlan.shouldReplaceExistingEntry?.({
key: catchupEntry?.key ?? "",
existingValue: { lastSeenMs: 1_600_000_000_000, lastSeenRowid: 10, updatedAt: 0 },
incomingValue: catchupEntry?.value,
}),
).toBe(true);
expect(
await catchupPlan.shouldReplaceExistingEntry?.({
key: catchupEntry?.key ?? "",
existingValue: { lastSeenMs: 1_800_000_000_000, lastSeenRowid: 99, updatedAt: 0 },
incomingValue: catchupEntry?.value,
}),
).toBe(false);
const counterPlan = plans.find((plan) => plan.label === "iMessage reply short-id counter");
expect(counterPlan?.kind).toBe("plugin-state-import");
if (!counterPlan || counterPlan.kind !== "plugin-state-import") {
throw new Error("expected reply counter plugin-state-import plan");
}
expect(
await counterPlan.shouldReplaceExistingEntry?.({
key: "short-id-counter",
existingValue: { counter: 0 },
incomingValue: { counter: 1 },
}),
).toBe(true);
expect(
await counterPlan.shouldReplaceExistingEntry?.({
key: "short-id-counter",
existingValue: { counter: 2 },
incomingValue: { counter: 1 },
}),
).toBe(false);
});
it("leaves unreadable reply-cache sidecars for a later migration attempt", async () => {
const stateDir = makeStateDir();
const imsgDir = path.join(stateDir, "imessage");
fs.mkdirSync(imsgDir, { recursive: true });
const sourcePath = path.join(imsgDir, "reply-cache.jsonl");
fs.writeFileSync(sourcePath, "\n");
const plans = await detectIMessageLegacyStateMigrations({
cfg: { channels: { imessage: { enabled: true } } } as never,
env: {},
stateDir,
});
const replyPlan = plans.find((plan) => plan.label === "iMessage reply short-id cache");
expect(replyPlan?.kind).toBe("plugin-state-import");
if (!replyPlan || replyPlan.kind !== "plugin-state-import") {
throw new Error("expected reply cache plugin-state-import plan");
}
fs.rmSync(sourcePath);
expect(() => replyPlan.readEntries()).toThrow("Failed reading");
});
it("keeps the latest live reply-cache row for duplicate message ids", async () => {
const stateDir = makeStateDir();
const imsgDir = path.join(stateDir, "imessage");
fs.mkdirSync(imsgDir, { recursive: true });
const now = Date.now();
fs.writeFileSync(
path.join(imsgDir, "reply-cache.jsonl"),
[
JSON.stringify({
accountId: "default",
messageId: "guid-dup",
shortId: "1",
timestamp: now - 2_000,
}),
JSON.stringify({
accountId: "default",
messageId: "guid-dup",
shortId: "7",
timestamp: now - 1_000,
}),
].join("\n"),
);
const plans = await detectIMessageLegacyStateMigrations({
cfg: { channels: { imessage: { enabled: true } } } as never,
env: {},
stateDir,
});
const replyPlan = plans.find((plan) => plan.label === "iMessage reply short-id cache");
const counterPlan = plans.find((plan) => plan.label === "iMessage reply short-id counter");
if (!replyPlan || replyPlan.kind !== "plugin-state-import") {
throw new Error("expected reply cache plugin-state-import plan");
}
if (!counterPlan || counterPlan.kind !== "plugin-state-import") {
throw new Error("expected reply counter plugin-state-import plan");
}
const replyEntries = await replyPlan.readEntries();
const counterEntries = await counterPlan.readEntries();
expect(replyEntries).toHaveLength(1);
const replyEntry = replyEntries[0];
if (!replyEntry) {
throw new Error("expected reply cache entry");
}
expect((replyEntry.value as { shortId?: string }).shortId).toBe("7");
expect(counterEntries[0]?.value).toEqual({ counter: 7 });
});
it("archives catchup cursor files that do not match configured accounts", async () => {
const stateDir = makeStateDir();
const catchupDir = path.join(stateDir, "imessage", "catchup");
fs.mkdirSync(catchupDir, { recursive: true });
const sourcePath = path.join(catchupDir, "removed-account__123456789abc.json");
fs.writeFileSync(sourcePath, JSON.stringify({ lastSeenMs: 1, lastSeenRowid: 2 }));
const plans = await detectIMessageLegacyStateMigrations({
cfg: { channels: { imessage: { enabled: true } } } as never,
env: {},
stateDir,
});
const orphanPlan = plans.find((plan) => plan.label === "iMessage orphan catchup cursor");
expect(orphanPlan).toMatchObject({
kind: "plugin-state-import",
sourcePath,
cleanupSource: "rename",
cleanupWhenEmpty: true,
});
if (!orphanPlan || orphanPlan.kind !== "plugin-state-import") {
throw new Error("expected orphan catchup plugin-state-import plan");
}
expect(await orphanPlan.readEntries()).toEqual([]);
});
it("normalizes configured account ids before importing catchup cursor files", async () => {
const stateDir = makeStateDir();
const catchupDir = path.join(stateDir, "imessage", "catchup");
fs.mkdirSync(catchupDir, { recursive: true });
const sourcePath = path.join(catchupDir, legacyCatchupFilename("work"));
fs.writeFileSync(sourcePath, JSON.stringify({ lastSeenMs: 1, lastSeenRowid: 2 }));
const plans = await detectIMessageLegacyStateMigrations({
cfg: {
channels: {
imessage: {
enabled: true,
accounts: {
Work: { cliPath: "imsg-work" },
},
},
},
} as never,
env: {},
stateDir,
});
expect(plans.map((plan) => plan.label)).toEqual(["iMessage catchup cursor"]);
const [plan] = plans;
expect(plan?.kind).toBe("plugin-state-import");
if (!plan || plan.kind !== "plugin-state-import") {
throw new Error("expected catchup plugin-state-import plan");
}
expect(plan.sourcePath).toBe(sourcePath);
const [entry] = await plan.readEntries();
expect(entry?.key).toBe(resolveIMessageCatchupCursorKey("work"));
});
it("caps imported catchup retry maps for plugin-state value limits", async () => {
const stateDir = makeStateDir();
const catchupDir = path.join(stateDir, "imessage", "catchup");
fs.mkdirSync(catchupDir, { recursive: true });
fs.writeFileSync(
path.join(catchupDir, "default__37a8eec1ce19.json"),
JSON.stringify({
lastSeenMs: 1,
lastSeenRowid: 2,
failureRetries: Object.fromEntries(
Array.from({ length: 800 }, (_, index) => [
`GUID-${index}-${"x".repeat(120)}`,
index + 1,
]),
),
}),
);
const plans = await detectIMessageLegacyStateMigrations({
cfg: { channels: { imessage: { enabled: true } } } as never,
env: {},
stateDir,
});
const catchupPlan = plans.find((plan) => plan.label === "iMessage catchup cursor");
if (!catchupPlan || catchupPlan.kind !== "plugin-state-import") {
throw new Error("expected catchup plugin-state-import plan");
}
const [entry] = await catchupPlan.readEntries();
expect(new TextEncoder().encode(JSON.stringify(entry?.value)).byteLength).toBeLessThanOrEqual(
65_536,
);
});
});

View File

@@ -0,0 +1,438 @@
import { createHash } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import type { ChannelLegacyStateMigrationPlan } from "openclaw/plugin-sdk/channel-contract";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { statRegularFileSync } from "openclaw/plugin-sdk/security-runtime";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { uniqueStrings } from "openclaw/plugin-sdk/string-coerce-runtime";
import {
listIMessageAccountIds,
resolveDefaultIMessageAccountId,
resolveIMessageAccount,
} from "./accounts.js";
import {
IMESSAGE_REPLY_CACHE_MAX_ENTRIES,
IMESSAGE_REPLY_CACHE_COUNTER_KEY,
IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES,
IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE,
IMESSAGE_REPLY_CACHE_NAMESPACE,
resolveIMessageReplyCacheEntryKey,
} from "./monitor-reply-cache.js";
import {
capFailureRetriesMap,
IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES,
IMESSAGE_CATCHUP_CURSOR_NAMESPACE,
resolveIMessageCatchupCursorKey,
type IMessageCatchupCursor,
} from "./monitor/catchup.js";
import {
IMESSAGE_SENT_ECHOES_MAX_ENTRIES,
IMESSAGE_SENT_ECHOES_NAMESPACE,
IMESSAGE_SENT_ECHOES_TTL_MS,
resolveIMessageSentEchoEntryKey,
} from "./monitor/persisted-echo-cache.js";
type ReplyCacheEntry = {
accountId: string;
messageId: string;
shortId: string;
timestamp: number;
chatGuid?: string;
chatIdentifier?: string;
chatId?: number;
isFromMe?: boolean;
};
type SentEchoEntry = {
scope: string;
text?: string;
messageId?: string;
timestamp: number;
};
const REPLY_CACHE_TTL_MS = 6 * 60 * 60 * 1000;
function fileExists(pathValue: string): boolean {
try {
return !statRegularFileSync(pathValue).missing;
} catch {
return false;
}
}
function resolveMigrationStateDir(params: { env: NodeJS.ProcessEnv; stateDir?: string }): string {
return params.stateDir ?? resolveStateDir(params.env);
}
function remainingTtlMs(timestamp: number, ttlMs: number): number | undefined {
const remaining = ttlMs - Math.max(0, Date.now() - timestamp);
return remaining > 0 ? remaining : undefined;
}
function readJsonl(pathValue: string): unknown[] {
try {
return fs
.readFileSync(pathValue, "utf8")
.split(/\n+/)
.flatMap((line) => {
if (!line) {
return [];
}
try {
return [JSON.parse(line) as unknown];
} catch {
return [];
}
});
} catch (err) {
throw new Error(`Failed reading ${pathValue}: ${String(err)}`, { cause: err });
}
}
function parseReplyCacheEntry(raw: unknown): ReplyCacheEntry | null {
if (!raw || typeof raw !== "object") {
return null;
}
const parsed = raw as Partial<ReplyCacheEntry>;
if (
typeof parsed.accountId !== "string" ||
typeof parsed.messageId !== "string" ||
typeof parsed.shortId !== "string" ||
typeof parsed.timestamp !== "number"
) {
return null;
}
return {
accountId: parsed.accountId,
messageId: parsed.messageId,
shortId: parsed.shortId,
timestamp: parsed.timestamp,
...(typeof parsed.chatGuid === "string" ? { chatGuid: parsed.chatGuid } : {}),
...(typeof parsed.chatIdentifier === "string" ? { chatIdentifier: parsed.chatIdentifier } : {}),
...(typeof parsed.chatId === "number" ? { chatId: parsed.chatId } : {}),
...(typeof parsed.isFromMe === "boolean" ? { isFromMe: parsed.isFromMe } : {}),
};
}
function readReplyCacheMaxShortId(sourcePath: string): number {
let max = 0;
for (const raw of readJsonl(sourcePath)) {
if (!raw || typeof raw !== "object") {
continue;
}
const shortId = (raw as { shortId?: unknown }).shortId;
if (typeof shortId !== "string") {
continue;
}
const numeric = Number.parseInt(shortId, 10);
if (Number.isFinite(numeric) && numeric > max) {
max = numeric;
}
}
return max;
}
function readReplyCounterValue(value: unknown): number | null {
if (!value || typeof value !== "object") {
return null;
}
const counter = (value as { counter?: unknown }).counter;
return typeof counter === "number" && Number.isFinite(counter) ? counter : null;
}
function shouldReplaceReplyCounter(existingValue: unknown, incomingValue: unknown): boolean {
const incomingCounter = readReplyCounterValue(incomingValue);
if (incomingCounter === null) {
return false;
}
const existingCounter = readReplyCounterValue(existingValue);
return existingCounter === null || incomingCounter > existingCounter;
}
function parseSentEchoEntry(raw: unknown): SentEchoEntry | null {
if (!raw || typeof raw !== "object") {
return null;
}
const parsed = raw as Partial<SentEchoEntry>;
if (typeof parsed.scope !== "string" || typeof parsed.timestamp !== "number") {
return null;
}
return {
scope: parsed.scope,
timestamp: parsed.timestamp,
...(typeof parsed.text === "string" ? { text: parsed.text } : {}),
...(typeof parsed.messageId === "string" ? { messageId: parsed.messageId } : {}),
};
}
function listReplyCacheEntries(sourcePath: string): Array<{
key: string;
value: ReplyCacheEntry;
ttlMs?: number;
}> {
const entriesByKey = new Map<string, { value: ReplyCacheEntry; ttlMs: number }>();
for (const entry of readJsonl(sourcePath).map(parseReplyCacheEntry)) {
if (!entry) {
continue;
}
const ttlMs = remainingTtlMs(entry.timestamp, REPLY_CACHE_TTL_MS);
if (!ttlMs) {
continue;
}
const key = resolveIMessageReplyCacheEntryKey(entry.messageId);
entriesByKey.delete(key);
entriesByKey.set(key, { value: entry, ttlMs });
}
return [...entriesByKey.entries()]
.slice(-IMESSAGE_REPLY_CACHE_MAX_ENTRIES)
.map(([key, entry]) => ({ key, value: entry.value, ttlMs: entry.ttlMs }));
}
function listSentEchoEntries(sourcePath: string): Array<{
key: string;
value: SentEchoEntry;
ttlMs?: number;
}> {
return readJsonl(sourcePath)
.map(parseSentEchoEntry)
.filter((entry): entry is SentEchoEntry => Boolean(entry))
.slice(-IMESSAGE_SENT_ECHOES_MAX_ENTRIES)
.flatMap((entry) => {
const ttlMs = remainingTtlMs(entry.timestamp, IMESSAGE_SENT_ECHOES_TTL_MS);
if (!ttlMs) {
return [];
}
return [{ key: resolveIMessageSentEchoEntryKey(entry), value: entry, ttlMs }];
});
}
function resolveLegacyCatchupCursorPath(stateDir: string, accountId: string): string {
const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account";
const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12);
return path.join(stateDir, "imessage", "catchup", `${safePrefix}__${hash}.json`);
}
function listLegacyCatchupCursorPaths(stateDir: string): string[] {
const dir = path.join(stateDir, "imessage", "catchup");
try {
return fs
.readdirSync(dir, { withFileTypes: true })
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
.map((entry) => path.join(dir, entry.name));
} catch {
return [];
}
}
function normalizeCatchupCursor(raw: unknown): IMessageCatchupCursor | null {
if (!raw || typeof raw !== "object") {
return null;
}
const value = raw as Partial<IMessageCatchupCursor>;
if (
typeof value.lastSeenMs !== "number" ||
!Number.isFinite(value.lastSeenMs) ||
typeof value.lastSeenRowid !== "number" ||
!Number.isFinite(value.lastSeenRowid)
) {
return null;
}
const failureRetries = sanitizeCatchupFailureRetries(value.failureRetries);
const hasRetries = Object.keys(failureRetries).length > 0;
return {
lastSeenMs: value.lastSeenMs,
lastSeenRowid: value.lastSeenRowid,
updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0,
...(hasRetries ? { failureRetries } : {}),
};
}
function readCatchupCursor(sourcePath: string): IMessageCatchupCursor {
let parsed: unknown;
try {
parsed = JSON.parse(fs.readFileSync(sourcePath, "utf8")) as unknown;
} catch (err) {
throw new Error(`Failed reading ${sourcePath}: ${String(err)}`, { cause: err });
}
const cursor = normalizeCatchupCursor(parsed);
if (!cursor) {
throw new Error(`Invalid iMessage catchup cursor: ${sourcePath}`);
}
return cursor;
}
function sanitizeCatchupFailureRetries(raw: unknown): Record<string, number> {
if (!raw || typeof raw !== "object") {
return {};
}
const out: Record<string, number> = {};
for (const [guid, count] of Object.entries(raw as Record<string, unknown>)) {
if (typeof count === "number" && Number.isFinite(count) && count > 0) {
out[guid] = Math.floor(count);
}
}
return capFailureRetriesMap(out);
}
function shouldReplaceCatchupCursor(existingValue: unknown, incomingValue: unknown): boolean {
const incoming = normalizeCatchupCursor(incomingValue);
if (!incoming) {
return false;
}
const existing = normalizeCatchupCursor(existingValue);
return (
!existing ||
incoming.lastSeenRowid > existing.lastSeenRowid ||
(incoming.lastSeenRowid === existing.lastSeenRowid && incoming.lastSeenMs > existing.lastSeenMs)
);
}
function detectReplyCacheMigration(params: {
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const stateDir = resolveMigrationStateDir(params);
const sourcePath = path.join(stateDir, "imessage", "reply-cache.jsonl");
if (!fileExists(sourcePath)) {
return [];
}
const plans: ChannelLegacyStateMigrationPlan[] = [];
plans.push({
kind: "plugin-state-import",
label: "iMessage reply short-id counter",
sourcePath,
targetPath: `plugin state:${IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE}`,
pluginId: "imessage",
namespace: IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE,
maxEntries: IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES,
scopeKey: "",
stateDir,
preview: `- iMessage reply short-id counter: ${sourcePath} → plugin state (${IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE})`,
readEntries: () => {
const maxShortId = readReplyCacheMaxShortId(sourcePath);
return maxShortId > 0
? [{ key: IMESSAGE_REPLY_CACHE_COUNTER_KEY, value: { counter: maxShortId } }]
: [];
},
shouldReplaceExistingEntry: ({ existingValue, incomingValue }) =>
shouldReplaceReplyCounter(existingValue, incomingValue),
});
plans.push({
kind: "plugin-state-import",
label: "iMessage reply short-id cache",
sourcePath,
targetPath: `plugin state:${IMESSAGE_REPLY_CACHE_NAMESPACE}`,
pluginId: "imessage",
namespace: IMESSAGE_REPLY_CACHE_NAMESPACE,
maxEntries: IMESSAGE_REPLY_CACHE_MAX_ENTRIES,
scopeKey: "",
stateDir,
cleanupSource: "rename",
cleanupWhenEmpty: true,
preview: `- iMessage reply short-id cache: ${sourcePath} → plugin state (${IMESSAGE_REPLY_CACHE_NAMESPACE})`,
readEntries: () => listReplyCacheEntries(sourcePath),
});
return plans;
}
function detectSentEchoMigration(params: {
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const stateDir = resolveMigrationStateDir(params);
const sourcePath = path.join(stateDir, "imessage", "sent-echoes.jsonl");
if (!fileExists(sourcePath)) {
return [];
}
return [
{
kind: "plugin-state-import",
label: "iMessage sent-echo dedupe cache",
sourcePath,
targetPath: `plugin state:${IMESSAGE_SENT_ECHOES_NAMESPACE}`,
pluginId: "imessage",
namespace: IMESSAGE_SENT_ECHOES_NAMESPACE,
maxEntries: IMESSAGE_SENT_ECHOES_MAX_ENTRIES,
scopeKey: "",
stateDir,
cleanupSource: "rename",
cleanupWhenEmpty: true,
preview: `- iMessage sent-echo dedupe cache: ${sourcePath} → plugin state (${IMESSAGE_SENT_ECHOES_NAMESPACE})`,
readEntries: () => listSentEchoEntries(sourcePath),
},
];
}
function detectCatchupCursorMigrations(params: {
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const stateDir = resolveMigrationStateDir(params);
const accountIds = uniqueStrings(
[resolveDefaultIMessageAccountId(params.cfg), ...listIMessageAccountIds(params.cfg)].map(
(accountId) => resolveIMessageAccount({ cfg: params.cfg, accountId }).accountId,
),
);
const configuredPaths = new Set(
accountIds.map((accountId) => resolveLegacyCatchupCursorPath(stateDir, accountId)),
);
const configuredPlans = accountIds.flatMap((accountId) => {
const sourcePath = resolveLegacyCatchupCursorPath(stateDir, accountId);
if (!fileExists(sourcePath)) {
return [];
}
return {
kind: "plugin-state-import" as const,
label: "iMessage catchup cursor",
sourcePath,
targetPath: `plugin state:${IMESSAGE_CATCHUP_CURSOR_NAMESPACE}`,
pluginId: "imessage",
namespace: IMESSAGE_CATCHUP_CURSOR_NAMESPACE,
maxEntries: IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES,
scopeKey: "",
stateDir,
cleanupSource: "rename" as const,
preview: `- iMessage catchup cursor: ${sourcePath} → plugin state (${IMESSAGE_CATCHUP_CURSOR_NAMESPACE})`,
readEntries: () => {
const cursor = readCatchupCursor(sourcePath);
return [{ key: resolveIMessageCatchupCursorKey(accountId), value: cursor }];
},
shouldReplaceExistingEntry: (replaceParams: {
existingValue: unknown;
incomingValue: unknown;
}) => shouldReplaceCatchupCursor(replaceParams.existingValue, replaceParams.incomingValue),
};
});
const orphanPlans = listLegacyCatchupCursorPaths(stateDir)
.filter((sourcePath) => !configuredPaths.has(sourcePath))
.map((sourcePath) => ({
kind: "plugin-state-import" as const,
label: "iMessage orphan catchup cursor",
sourcePath,
targetPath: `plugin state:${IMESSAGE_CATCHUP_CURSOR_NAMESPACE}`,
pluginId: "imessage",
namespace: IMESSAGE_CATCHUP_CURSOR_NAMESPACE,
maxEntries: IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES,
scopeKey: "",
stateDir,
cleanupSource: "rename" as const,
cleanupWhenEmpty: true,
preview: `- iMessage orphan catchup cursor: ${sourcePath} → archived legacy state`,
readEntries: () => [],
}));
return [...configuredPlans, ...orphanPlans];
}
export async function detectIMessageLegacyStateMigrations(params: {
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
stateDir?: string;
}): Promise<ChannelLegacyStateMigrationPlan[]> {
return [
...detectCatchupCursorMigrations(params),
...detectReplyCacheMigration(params),
...detectSentEchoMigration(params),
];
}

View File

@@ -0,0 +1,65 @@
import fs from "node:fs";
import path from "node:path";
import type {
OpenKeyedStoreOptions,
PluginStateSyncKeyedStore,
} from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateKeyedStoreForTests,
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import type { PluginRuntime } from "openclaw/plugin-sdk/runtime-store";
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
import { setIMessageRuntime } from "../runtime.js";
function createIMessageTestEnv(): NodeJS.ProcessEnv {
const stateDir = fs.mkdtempSync(
path.join(resolvePreferredOpenClawTmpDir(), "openclaw-imessage-state-"),
);
return { ...process.env, OPENCLAW_STATE_DIR: stateDir };
}
let imessageTestEnv = createIMessageTestEnv();
export function createIMessagePluginStateSyncStoreForTest<T>(
options: OpenKeyedStoreOptions,
): PluginStateSyncKeyedStore<T> {
return createPluginStateSyncKeyedStoreForTests<T>("imessage", {
...options,
env: imessageTestEnv,
});
}
export function installIMessageStateRuntimeForTest(): void {
imessageTestEnv = createIMessageTestEnv();
resetPluginStateStoreForTests();
setIMessageRuntime({
state: {
openKeyedStore: ((options) =>
createPluginStateKeyedStoreForTests("imessage", {
...options,
env: imessageTestEnv,
})) as PluginRuntime["state"]["openKeyedStore"],
openSyncKeyedStore: ((options) =>
createIMessagePluginStateSyncStoreForTest(
options,
)) as PluginRuntime["state"]["openSyncKeyedStore"],
},
channel: {},
} as PluginRuntime);
}
export function installIMessageFailingStateRuntimeForTest(): void {
setIMessageRuntime({
state: {
openKeyedStore: (() => {
throw new Error("test plugin-state failure");
}) as PluginRuntime["state"]["openKeyedStore"],
openSyncKeyedStore: (() => {
throw new Error("test plugin-state failure");
}) as PluginRuntime["state"]["openSyncKeyedStore"],
},
channel: {},
} as PluginRuntime);
}

View File

@@ -42,6 +42,15 @@ function createMemoryStore<T>(): PluginStateKeyedStore<T> {
values.set(key, { key, value, createdAt: Date.now() });
return true;
},
async update(key, updateValue) {
const next = updateValue(values.get(key)?.value);
if (next === undefined) {
return false;
}
assertNoUndefinedFields(next);
values.set(key, { key, value: next, createdAt: Date.now() });
return true;
},
async lookup(key) {
return values.get(key)?.value;
},
@@ -146,6 +155,9 @@ describe("createDurableInboundReceiveJournal", () => {
async registerIfAbsent() {
return false;
},
async update() {
return false;
},
async lookup() {
return undefined;
},
@@ -167,6 +179,9 @@ describe("createDurableInboundReceiveJournal", () => {
async registerIfAbsent() {
return false;
},
async update() {
return false;
},
async lookup() {
completedLookups += 1;
return completedLookups === 2
@@ -213,6 +228,9 @@ describe("createDurableInboundReceiveJournal", () => {
async registerIfAbsent() {
return false;
},
async update() {
return false;
},
async lookup() {
completedLookups += 1;
return completedLookups === 2

View File

@@ -172,6 +172,7 @@ export type ChannelLegacyStateMigrationPlan =
scopeKey: string;
stateDir?: string;
cleanupSource?: "rename";
cleanupWhenEmpty?: boolean;
preview?: string;
shouldReplaceExistingEntry?: (params: {
key: string;

View File

@@ -215,6 +215,9 @@ describe("ensureConfigReady", () => {
["Telegram sticker cache", "telegram/sticker-cache.json"],
["Telegram thread bindings", "telegram/thread-bindings-default.json"],
["Telegram pairing allowFrom", "credentials/telegram-allowFrom.json"],
["iMessage reply short-id cache", "imessage/reply-cache.jsonl"],
["iMessage sent echo cache", "imessage/sent-echoes.jsonl"],
["iMessage catchup cursor", "imessage/catchup/default.json"],
["WhatsApp root auth", "credentials/creds.json"],
])("runs doctor flow for bundled channel legacy state: %s", async (_label, relativePath) => {
const root = useTempOpenClawHome();

View File

@@ -66,6 +66,14 @@ function isLegacyTelegramStateFile(name: string): boolean {
);
}
function hasLegacyIMessageStateFiles(stateDir: string): boolean {
return (
fileOrDirExists(path.join(stateDir, "imessage", "reply-cache.jsonl")) ||
fileOrDirExists(path.join(stateDir, "imessage", "sent-echoes.jsonl")) ||
dirHasFile(path.join(stateDir, "imessage", "catchup"), (name) => name.endsWith(".json"))
);
}
function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir: string): boolean {
if (fileOrDirExists(path.join(stateDir, "discord", "model-picker-preferences.json"))) {
return true;
@@ -73,6 +81,9 @@ function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir:
if (dirHasFile(path.join(stateDir, "feishu", "dedup"), (name) => name.endsWith(".json"))) {
return true;
}
if (hasLegacyIMessageStateFiles(stateDir)) {
return true;
}
if (
fileOrDirExists(path.join(oauthDir, "telegram-allowFrom.json")) ||
dirHasFile(path.join(stateDir, "telegram"), isLegacyTelegramStateFile)

View File

@@ -1008,6 +1008,85 @@ describe("doctor legacy state migrations", () => {
});
});
it("archives empty plugin-state import sources when the channel plan asks for cleanup", async () => {
const root = await makeTempRoot();
const sourceDir = path.join(root, "imessage");
fs.mkdirSync(sourceDir, { recursive: true });
const sourcePath = path.join(sourceDir, "reply-cache.jsonl");
fs.writeFileSync(sourcePath, "expired\n", "utf-8");
if (process.platform !== "win32") {
fs.chmodSync(sourceDir, 0o755);
fs.chmodSync(sourcePath, 0o644);
}
mockedChannelMigrationPlans.plans = [
{
kind: "plugin-state-import",
label: "Test expired cache",
sourcePath,
targetPath: "plugin state:test.expired-cache",
pluginId: "telegram",
namespace: "test.expired-cache",
maxEntries: 4,
scopeKey: "",
cleanupSource: "rename",
cleanupWhenEmpty: true,
readEntries: () => [],
},
];
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
const result = await runLegacyStateMigrations({ detected });
expect(result.warnings).toStrictEqual([]);
expect(result.changes).toContain(
`Archived Test expired cache legacy source → ${sourcePath}.migrated`,
);
expect(fs.existsSync(sourcePath)).toBe(false);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(true);
if (process.platform !== "win32") {
expect(fs.statSync(`${sourcePath}.migrated`).mode & 0o777).toBe(0o600);
}
});
it("keeps plugin-state import sources when reading entries fails", async () => {
const root = await makeTempRoot();
const sourcePath = path.join(root, "legacy-cache.json");
fs.writeFileSync(sourcePath, "legacy", "utf-8");
mockedChannelMigrationPlans.plans = [
{
kind: "plugin-state-import",
label: "Test unreadable cache",
sourcePath,
targetPath: "plugin state:test.unreadable-cache",
pluginId: "telegram",
namespace: "test.unreadable-cache",
maxEntries: 4,
scopeKey: "",
cleanupSource: "rename",
cleanupWhenEmpty: true,
readEntries: () => {
throw new Error("read failed");
},
},
];
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
const result = await runLegacyStateMigrations({ detected });
expect(result.changes).toStrictEqual([]);
expect(result.warnings).toStrictEqual([
"Failed reading Test unreadable cache legacy source: Error: read failed",
]);
expect(fs.existsSync(sourcePath)).toBe(true);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(false);
});
it("keeps plugin-state import source when plugin cap eviction drops an imported row", async () => {
const root = await makeTempRoot();
const maxPluginStateEntries = 40;

View File

@@ -125,7 +125,7 @@ test("sessions.list keeps bulk rows lightweight and uses persisted model fields"
"dashboard:child": sessionStoreEntry("sess-child", {
updatedAt: Date.now() - 1_000,
modelProvider: "anthropic",
model: "claude-sonnet-4-6",
model: "test-model-without-catalog-context",
parentSessionKey: "agent:main:main",
totalTokens: 0,
totalTokensFresh: false,
@@ -161,10 +161,10 @@ test("sessions.list keeps bulk rows lightweight and uses persisted model fields"
expect(child?.parentSessionKey).toBe("agent:main:main");
expect(child?.totalTokens).toBeUndefined();
expect(child?.totalTokensFresh).toBe(false);
expect(child?.contextTokens).toBe(1_048_576);
expect(child?.contextTokens).toBeUndefined();
expect(child?.estimatedCostUsd).toBeUndefined();
expect(child?.modelProvider).toBe("anthropic");
expect(child?.model).toBe("claude-sonnet-4-6");
expect(child?.model).toBe("test-model-without-catalog-context");
ws.close();
});

View File

@@ -317,6 +317,51 @@ function archiveLegacyTaskStateSidecar(params: {
);
}
function hardenLegacyImportSource(params: {
sourcePath: string;
label: string;
warnings: string[];
}): boolean {
try {
fs.chmodSync(params.sourcePath, 0o600);
return true;
} catch (err) {
params.warnings.push(`Failed securing ${params.label} legacy source: ${String(err)}`);
return false;
}
}
function archiveLegacyImportSource(params: {
sourcePath: string;
label: string;
changes: string[];
warnings: string[];
}): void {
const archivedPath = `${params.sourcePath}.migrated`;
if (fileExists(archivedPath)) {
params.warnings.push(
`Left migrated ${params.label} source in place because ${archivedPath} already exists`,
);
return;
}
if (!hardenLegacyImportSource(params)) {
return;
}
try {
fs.renameSync(params.sourcePath, archivedPath);
try {
fs.chmodSync(archivedPath, 0o600);
} catch (err) {
params.warnings.push(
`Failed securing archived ${params.label} legacy source: ${String(err)}`,
);
}
params.changes.push(`Archived ${params.label} legacy source → ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`);
}
}
function listSqliteColumns(db: DatabaseSync, table: string): Set<string> {
const rows = db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name?: string }>;
return new Set(rows.flatMap((row) => (row.name ? [row.name] : [])));
@@ -1271,7 +1316,13 @@ async function runLegacyMigrationPlans(
const existingValuesByKey = new Map(storeEntries.map(({ key, value }) => [key, value]));
const expectedKeys = new Set(existingKeys);
let remainingCapacity = Math.max(0, plan.maxEntries - storeEntries.length);
const entries = await plan.readEntries();
let entries: Awaited<ReturnType<typeof plan.readEntries>>;
try {
entries = await plan.readEntries();
} catch (err) {
warnings.push(`Failed reading ${plan.label} legacy source: ${String(err)}`);
return;
}
const candidateEntries: Array<{
key: string;
targetKey: string;
@@ -1366,26 +1417,20 @@ async function runLegacyMigrationPlans(
cleanupKeys = expectedKeys;
}
const allEntriesCovered =
entries.length > 0 &&
entries.every(
({ key }) =>
cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)) &&
!failedTargetKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
);
(entries.length === 0 && plan.cleanupWhenEmpty === true) ||
(entries.length > 0 &&
entries.every(
({ key }) =>
cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)) &&
!failedTargetKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
));
if (allEntriesCovered && plan.cleanupSource === "rename" && fileExists(plan.sourcePath)) {
const archivedPath = `${plan.sourcePath}.migrated`;
if (fileExists(archivedPath)) {
warnings.push(
`Left migrated ${plan.label} source in place because ${archivedPath} already exists`,
);
return;
}
try {
fs.renameSync(plan.sourcePath, archivedPath);
changes.push(`Archived ${plan.label} legacy source → ${archivedPath}`);
} catch (err) {
warnings.push(`Failed archiving ${plan.label} legacy source: ${String(err)}`);
}
archiveLegacyImportSource({
sourcePath: plan.sourcePath,
label: plan.label,
changes,
warnings,
});
}
});
continue;

View File

@@ -552,6 +552,75 @@ export function pluginStateRegisterIfAbsent(params: {
}
}
export function pluginStateUpdate(params: {
pluginId: string;
namespace: string;
key: string;
maxEntries: number;
updateValueJson: (current: unknown) => { valueJson: string; ttlMs?: number } | undefined;
env?: NodeJS.ProcessEnv;
}): boolean {
try {
return runWriteTransaction(
"register",
(store) => {
const now = Date.now();
deleteExpiredPluginStateNamespaceEntries(store.db, {
pluginId: params.pluginId,
namespace: params.namespace,
now,
});
const existing = selectPluginStateEntry(store.db, {
pluginId: params.pluginId,
namespace: params.namespace,
key: params.key,
now,
});
const next = params.updateValueJson(
existing ? parseStoredJson(existing.value_json, "lookup") : undefined,
);
if (!next) {
return false;
}
const expiresAt = resolvePluginStateExpiresAtMs({
ttlMs: next.ttlMs,
now,
operation: "register",
path: store.path,
});
upsertPluginStateEntry(
store.db,
bindPluginStateEntry({
pluginId: params.pluginId,
namespace: params.namespace,
key: params.key,
valueJson: next.valueJson,
createdAt: now,
expiresAt,
}),
);
enforcePostRegisterLimits({
store,
pluginId: params.pluginId,
namespace: params.namespace,
maxEntries: params.maxEntries,
now,
protectedKey: params.key,
});
return true;
},
envOptions(params.env),
);
} catch (error) {
throw wrapPluginStateError(
error,
"register",
"PLUGIN_STATE_WRITE_FAILED",
"Failed to update plugin state entry.",
);
}
}
export function pluginStateLookup(params: {
pluginId: string;
namespace: string;

View File

@@ -100,6 +100,24 @@ describe("plugin state keyed store", () => {
});
});
it("updates a key from the current stored value", async () => {
await withPluginStateTestState(async () => {
const store = createPluginStateSyncKeyedStore<{ count: number }>("discord", {
namespace: "sync-update",
maxEntries: 10,
});
const update = store.update;
if (!update) {
throw new Error("expected sync keyed store update support");
}
expect(update("counter", (current) => ({ count: (current?.count ?? 0) + 1 }))).toBe(true);
expect(update("counter", (current) => ({ count: (current?.count ?? 0) + 1 }))).toBe(true);
expect(update("counter", () => undefined)).toBe(false);
expect(store.lookup("counter")).toEqual({ count: 2 });
});
});
it("honors explicit store env without mutating process state", async () => {
await withOpenClawTestState(
{ label: "plugin-state-explicit-env-a", applyEnv: false },

View File

@@ -10,6 +10,7 @@ import {
pluginStateLookup,
pluginStateRegister,
pluginStateRegisterIfAbsent,
pluginStateUpdate,
} from "./plugin-state-store.sqlite.js";
import type {
OpenKeyedStoreOptions,
@@ -279,6 +280,27 @@ function createKeyedStoreForPluginId<T>(
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
});
},
async update(key, updateValue, opts) {
const normalizedKey = validateKey(key, "register");
return pluginStateUpdate({
pluginId,
namespace,
key: normalizedKey,
maxEntries,
updateValueJson: (current) => {
const next = updateValue(current as T | undefined);
if (next === undefined) {
return undefined;
}
const params = prepareRegisterParams(normalizedKey, next, defaultTtlMs, opts);
return {
valueJson: params.valueJson,
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
};
},
...(env ? { env } : {}),
});
},
async lookup(key) {
const normalizedKey = validateKey(key, "lookup");
return pluginStateLookup({
@@ -354,6 +376,27 @@ function createSyncKeyedStoreForPluginId<T>(
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
});
},
update(key, updateValue, opts) {
const normalizedKey = validateKey(key, "register");
return pluginStateUpdate({
pluginId,
namespace,
key: normalizedKey,
maxEntries,
updateValueJson: (current) => {
const next = updateValue(current as T | undefined);
if (next === undefined) {
return undefined;
}
const params = prepareRegisterParams(normalizedKey, next, defaultTtlMs, opts);
return {
valueJson: params.valueJson,
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
};
},
...(env ? { env } : {}),
});
},
lookup(key) {
const normalizedKey = validateKey(key, "lookup");
return pluginStateLookup({

View File

@@ -8,6 +8,11 @@ export type PluginStateEntry<T> = {
export type PluginStateKeyedStore<T> = {
register(key: string, value: T, opts?: { ttlMs?: number }): Promise<void>;
registerIfAbsent(key: string, value: T, opts?: { ttlMs?: number }): Promise<boolean>;
update?: (
key: string,
updateValue: (current: T | undefined) => T | undefined,
opts?: { ttlMs?: number },
) => Promise<boolean>;
lookup(key: string): Promise<T | undefined>;
consume(key: string): Promise<T | undefined>;
delete(key: string): Promise<boolean>;
@@ -18,6 +23,11 @@ export type PluginStateKeyedStore<T> = {
export type PluginStateSyncKeyedStore<T> = {
register(key: string, value: T, opts?: { ttlMs?: number }): void;
registerIfAbsent(key: string, value: T, opts?: { ttlMs?: number }): boolean;
update?: (
key: string,
updateValue: (current: T | undefined) => T | undefined,
opts?: { ttlMs?: number },
) => boolean;
lookup(key: string): T | undefined;
consume(key: string): T | undefined;
delete(key: string): boolean;

View File

@@ -209,6 +209,12 @@ function createDeferred<T>() {
return { promise, resolve, reject };
}
const neverSettlesPromise: Promise<never> = Promise.race([]);
function pendingPromise<T = unknown>(): Promise<T> {
return neverSettlesPromise as Promise<T>;
}
async function raceWithMacrotask(promise: Promise<unknown>): Promise<"resolved" | "pending"> {
return await Promise.race([
promise.then(() => "resolved" as const),
@@ -224,7 +230,7 @@ describe("refreshChat", () => {
});
it("dispatches chat refresh work without waiting for slow history or metadata RPCs", async () => {
const request = vi.fn(() => new Promise<unknown>(() => {}));
const request = vi.fn(() => pendingPromise());
const requestUpdate = vi.fn();
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
@@ -260,7 +266,7 @@ describe("refreshChat", () => {
});
it("scopes global chat refresh session rows to the selected agent", async () => {
const request = vi.fn(() => new Promise<unknown>(() => {}));
const request = vi.fn(() => pendingPromise());
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
sessionKey: "global",
@@ -288,7 +294,7 @@ describe("refreshChat", () => {
});
it("scopes agent main aliases as selected global chat refreshes", async () => {
const request = vi.fn(() => new Promise<unknown>(() => {}));
const request = vi.fn(() => pendingPromise());
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
sessionKey: "agent:work:main",
@@ -308,7 +314,7 @@ describe("refreshChat", () => {
});
it("scopes agent session refresh rows before the list limit", async () => {
const request = vi.fn(() => new Promise<unknown>(() => {}));
const request = vi.fn(() => pendingPromise());
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
sessionKey: "agent:work:dashboard",
@@ -327,7 +333,7 @@ describe("refreshChat", () => {
});
it("uses hello default for global chat refresh before agents list loads", async () => {
const request = vi.fn(() => new Promise<unknown>(() => {}));
const request = vi.fn(() => pendingPromise());
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
sessionKey: "global",
@@ -352,7 +358,7 @@ describe("refreshChat", () => {
});
it("keeps unknown chat refresh session rows unscoped", async () => {
const request = vi.fn(() => new Promise<unknown>(() => {}));
const request = vi.fn(() => pendingPromise());
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
sessionKey: "unknown",
@@ -378,7 +384,7 @@ describe("refreshChat", () => {
if (method === "chat.history") {
return history.promise;
}
return new Promise<unknown>(() => {});
return pendingPromise();
});
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
@@ -987,7 +993,7 @@ describe("refreshChat", () => {
it("does not wait for secondary chat metadata refreshes before showing history", async () => {
const previousFetch = globalThis.fetch;
globalThis.fetch = vi.fn(() => new Promise<Response>(() => {})) as never;
globalThis.fetch = vi.fn(() => pendingPromise<Response>()) as never;
try {
const request = vi.fn((method: string) => {
if (method === "chat.history") {
@@ -995,7 +1001,7 @@ describe("refreshChat", () => {
messages: [{ role: "assistant", content: [{ type: "text", text: "ready" }] }],
});
}
return new Promise(() => {});
return pendingPromise();
});
const host = makeHost({
client: { request } as unknown as ChatHost["client"],