diff --git a/docs/channels/imessage-from-bluebubbles.md b/docs/channels/imessage-from-bluebubbles.md index f40b1ac08bc..0e145d38536 100644 --- a/docs/channels/imessage-from-bluebubbles.md +++ b/docs/channels/imessage-from-bluebubbles.md @@ -248,7 +248,7 @@ iMessage catchup is now available as an opt-in feature on the bundled plugin. On There is no supported BlueBubbles runtime to switch back to. If iMessage verification fails, set `channels.imessage.enabled: false`, restart the Gateway, fix the `imsg` blocker, and retry the cutover. -The reply cache lives at `~/.openclaw/state/imessage/reply-cache.jsonl` (mode `0600`, parent dir `0700`). It is safe to delete if you want a clean slate. +The reply cache lives in SQLite plugin state. `openclaw doctor --fix` imports and archives the old `imessage/reply-cache.jsonl` sidecar when present. ## Related diff --git a/docs/channels/imessage.md b/docs/channels/imessage.md index c16ab0b6c33..a851998b3b3 100644 --- a/docs/channels/imessage.md +++ b/docs/channels/imessage.md @@ -533,7 +533,7 @@ When `imsg launch` is running and `openclaw channels status --probe` reports `pr - Inbound iMessage context includes both short `MessageSid` values and full message GUIDs when available. Short IDs are scoped to the recent in-memory reply cache and are checked against the current chat before use. If a short ID has expired or belongs to another chat, retry with the full `MessageSidFull`. + Inbound iMessage context includes both short `MessageSid` values and full message GUIDs when available. Short IDs are scoped to the recent SQLite-backed reply cache and are checked against the current chat before use. If a short ID has expired or belongs to another chat, retry with the full `MessageSidFull`. @@ -714,7 +714,7 @@ Each replayed row is fed through the live dispatch path (`evaluateIMessageInboun ### Cursor and retry semantics -Catchup keeps a per-account cursor at `/imessage/catchup/__.json` (the OpenClaw state dir defaults to `~/.openclaw`, overridable with `OPENCLAW_STATE_DIR`): +Catchup keeps a per-account cursor in SQLite plugin state: ```json { @@ -729,6 +729,7 @@ Catchup keeps a per-account cursor at `/imessage/catchup//imessage/catchup/*.json` cursor files into SQLite plugin state and archives the old files. ### Operator-visible signals diff --git a/extensions/imessage/legacy-state-migrations-api.ts b/extensions/imessage/legacy-state-migrations-api.ts new file mode 100644 index 00000000000..8836acf9994 --- /dev/null +++ b/extensions/imessage/legacy-state-migrations-api.ts @@ -0,0 +1 @@ +export { detectIMessageLegacyStateMigrations } from "./src/state-migrations.js"; diff --git a/extensions/imessage/package.json b/extensions/imessage/package.json index 2baf0b0ccb7..6549667291f 100644 --- a/extensions/imessage/package.json +++ b/extensions/imessage/package.json @@ -12,6 +12,9 @@ "./index.ts" ], "setupEntry": "./setup-entry.ts", + "setupFeatures": { + "legacyStateMigrations": true + }, "channel": { "id": "imessage", "label": "iMessage", diff --git a/extensions/imessage/setup-entry.ts b/extensions/imessage/setup-entry.ts index 0852fd76983..9c0bd7825c6 100644 --- a/extensions/imessage/setup-entry.ts +++ b/extensions/imessage/setup-entry.ts @@ -2,8 +2,15 @@ import { defineBundledChannelSetupEntry } from "openclaw/plugin-sdk/channel-entr export default defineBundledChannelSetupEntry({ importMetaUrl: import.meta.url, + features: { + legacyStateMigrations: true, + }, plugin: { specifier: "./api.js", exportName: "imessageSetupPlugin", }, + legacyStateMigrations: { + specifier: "./legacy-state-migrations-api.js", + exportName: "detectIMessageLegacyStateMigrations", + }, }); diff --git a/extensions/imessage/src/monitor-reply-cache.test.ts b/extensions/imessage/src/monitor-reply-cache.test.ts index 177ff87ab2c..bba49d10ff4 100644 --- a/extensions/imessage/src/monitor-reply-cache.test.ts +++ b/extensions/imessage/src/monitor-reply-cache.test.ts @@ -1,7 +1,4 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; -import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { resetIMessageShortIdState, findLatestIMessageEntryForChat, @@ -9,41 +6,15 @@ import { rememberIMessageReplyCache, resolveIMessageMessageId, } from "./monitor-reply-cache.js"; - -// Isolate from any live ~/.openclaw/imessage/reply-cache.jsonl that the -// developer might have from a running gateway. Without this, the on-disk -// hydrate path picks up production data and tests get cross-pollinated. -// -// vi.stubEnv defaults to per-test scoping in this codebase, which means a -// beforeAll-only stub gets unstubbed between tests. Mutate process.env -// directly so the override holds across the whole file. -let tempStateDir: string; -let priorStateDir: string | undefined; -beforeAll(() => { - tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-reply-cache-")); - priorStateDir = process.env.OPENCLAW_STATE_DIR; - process.env.OPENCLAW_STATE_DIR = tempStateDir; -}); -afterAll(() => { - if (priorStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = priorStateDir; - } - fs.rmSync(tempStateDir, { recursive: true, force: true }); -}); +import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js"; beforeEach(() => { + installIMessageStateRuntimeForTest(); resetIMessageShortIdState(); - // Belt-and-suspenders: also nuke the persisted file directly. The - // _reset helper does this when OPENCLAW_STATE_DIR is set, but explicitly - // clearing here protects the test from any future refactor of _reset's - // gating logic. - try { - fs.rmSync(path.join(tempStateDir, "imessage", "reply-cache.jsonl"), { force: true }); - } catch { - // best-effort - } +}); + +afterEach(() => { + vi.useRealTimers(); }); describe("imessage short message id resolution", () => { @@ -271,11 +242,6 @@ describe("findLatestIMessageEntryForChat", () => { }); it("never crosses account boundaries", () => { - // Diagnostic: verify the temp-dir env stub is actually visible. - expect(process.env.OPENCLAW_STATE_DIR).toBe(tempStateDir); - const cachePath = path.join(tempStateDir, "imessage", "reply-cache.jsonl"); - expect(fs.existsSync(cachePath)).toBe(false); - rememberIMessageReplyCache({ accountId: "other-account", messageId: "foreign-account", @@ -342,57 +308,8 @@ describe("findLatestIMessageEntryForChat", () => { }); }); -describe("reply cache disk permissions", () => { - it("clamps pre-existing reply-cache.jsonl from older 0644/0755 to 0600/0700", () => { - // Older gateway versions wrote with default modes. Every append must - // clamp existing files back to owner-only — appendFileSync's `mode` - // only applies on creation, so a chmod-on-create-only path would leave - // the upgrade case world-readable forever. - const imsgDir = path.join(tempStateDir, "imessage"); - fs.mkdirSync(imsgDir, { recursive: true, mode: 0o755 }); - const cacheFile = path.join(imsgDir, "reply-cache.jsonl"); - fs.writeFileSync(cacheFile, "", { mode: 0o644 }); - fs.chmodSync(imsgDir, 0o755); - fs.chmodSync(cacheFile, 0o644); - - rememberIMessageReplyCache({ - accountId: "default", - messageId: "clamp-test-guid", - chatIdentifier: "+12069106512", - timestamp: Date.now(), - }); - - const fileMode = fs.statSync(cacheFile).mode & 0o777; - const dirMode = fs.statSync(imsgDir).mode & 0o777; - expect(fileMode).toBe(0o600); - expect(dirMode).toBe(0o700); - }); - - it("writes the cache file 0600 and parent dir 0700", () => { - // Map gateway-allocated short-ids to message guids; a hostile same-UID - // process reading or writing this file could (a) enumerate active - // conversation guids or (b) inject lines so a future shortId resolves - // to an attacker-chosen guid. Owner-only mode is the mitigation. - rememberIMessageReplyCache({ - accountId: "default", - messageId: "perm-test-guid", - chatIdentifier: "+12069106512", - timestamp: Date.now(), - }); - - const cacheFile = path.join(tempStateDir, "imessage", "reply-cache.jsonl"); - const cacheDir = path.dirname(cacheFile); - expect(fs.existsSync(cacheFile)).toBe(true); - - const fileMode = fs.statSync(cacheFile).mode & 0o777; - const dirMode = fs.statSync(cacheDir).mode & 0o777; - expect(fileMode).toBe(0o600); - expect(dirMode).toBe(0o700); - }); -}); - describe("hydrate-on-resolve (post-restart short-id persistence)", () => { - it("hydrates the on-disk JSONL before resolving a short id whose mapping predates this run", () => { + it("hydrates SQLite state before resolving a short id whose mapping predates this run", () => { // Issue-then-restart contract: a shortId we issued before a gateway // restart must still resolve afterwards. The first resolve call after // process boot would otherwise miss the persisted mapping because the @@ -407,15 +324,9 @@ describe("hydrate-on-resolve (post-restart short-id persistence)", () => { }); expect(issued.shortId).not.toBe(""); - // Simulate a restart: clear the in-memory state but leave the JSONL on - // disk. resetIMessageShortIdState only deletes the persisted file when - // OPENCLAW_STATE_DIR is set, so we have to keep the file ourselves - // since this test runs under the suite's temp state dir. - const cachePath = path.join(tempStateDir, "imessage", "reply-cache.jsonl"); - const persisted = fs.readFileSync(cachePath, "utf8"); - resetIMessageShortIdState(); - fs.mkdirSync(path.dirname(cachePath), { recursive: true }); - fs.writeFileSync(cachePath, persisted, "utf8"); + // Simulate a restart: clear only the process-local maps and leave the + // SQLite plugin-state rows intact. + resetIMessageShortIdState({ clearPersistent: false }); // Now resolve the short id we issued before the "restart". Without the // hydrate-on-resolve fix this throws "no longer available" because the @@ -428,6 +339,47 @@ describe("hydrate-on-resolve (post-restart short-id persistence)", () => { }), ).toBe("outbound-guid-pre-restart"); }); + + it("persists entries when optional chat fields are explicitly undefined", () => { + const issued = rememberIMessageReplyCache({ + accountId: "default", + messageId: "guid-with-undefined-optionals", + chatGuid: undefined, + chatIdentifier: undefined, + chatId: undefined, + timestamp: Date.now(), + }); + + resetIMessageShortIdState({ clearPersistent: false }); + + expect( + resolveIMessageMessageId(issued.shortId, { + requireKnownShortId: true, + chatContext: { chatIdentifier: "+15551234567" }, + }), + ).toBe("guid-with-undefined-optionals"); + }); + + it("does not reuse short ids after cached rows expire", () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-08T00:00:00Z")); + const first = rememberIMessageReplyCache({ + accountId: "default", + messageId: "old-guid", + timestamp: Date.now(), + }); + expect(first.shortId).toBe("1"); + + vi.setSystemTime(new Date("2026-05-08T07:00:00Z")); + resetIMessageShortIdState({ clearPersistent: false }); + const second = rememberIMessageReplyCache({ + accountId: "default", + messageId: "new-guid", + timestamp: Date.now(), + }); + + expect(second.shortId).toBe("2"); + }); }); describe("hydrate counter advancement (rowid-collision protection)", () => { diff --git a/extensions/imessage/src/monitor-reply-cache.ts b/extensions/imessage/src/monitor-reply-cache.ts index f6ca2627ee6..724fb3d90fc 100644 --- a/extensions/imessage/src/monitor-reply-cache.ts +++ b/extensions/imessage/src/monitor-reply-cache.ts @@ -1,15 +1,18 @@ -import fs from "node:fs"; -import path from "node:path"; +import { createHash } from "node:crypto"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; -import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime"; +import { getIMessageRuntime } from "./runtime.js"; -const REPLY_CACHE_MAX = 2000; +export const IMESSAGE_REPLY_CACHE_NAMESPACE = "imessage.reply-cache"; +export const IMESSAGE_REPLY_CACHE_MAX_ENTRIES = 2000; +export const IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE = "imessage.reply-cache-counter"; +export const IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES = 1; +export const IMESSAGE_REPLY_CACHE_COUNTER_KEY = "short-id-counter"; const REPLY_CACHE_TTL_MS = 6 * 60 * 60 * 1000; /** Recency window for the "react to the latest message" fallback. */ const LATEST_FALLBACK_MS = 10 * 60 * 1000; let persistenceFailureLogged = false; -let parseFailureLogged = false; function reportPersistenceFailure(scope: string, err: unknown): void { if (persistenceFailureLogged) { return; @@ -47,162 +50,70 @@ type IMessageReplyCacheEntry = IMessageChatContext & { isFromMe?: boolean; }; +type IMessageReplyCacheStore = PluginStateSyncKeyedStore; +type IMessageReplyCacheCounter = { counter: number }; + const imessageReplyCacheByMessageId = new Map(); const imessageShortIdToUuid = new Map(); const imessageUuidToShortId = new Map(); let imessageShortIdCounter = 0; -// On-disk persistence: short-id ↔ UUID mappings need to survive gateway -// restarts so an agent that received "[message_id:5]" before a restart can -// still react to that message after the restart. The on-disk store is -// best-effort — corruption or write failure falls back to the in-memory -// cache, so the worst case is the same as before persistence existed. - -function resolveReplyCachePath(): string { - return path.join(resolveStateDir(), "imessage", "reply-cache.jsonl"); +export function resolveIMessageReplyCacheEntryKey(messageId: string): string { + return createHash("sha256").update(messageId, "utf8").digest("hex").slice(0, 32); } -function readPersistedEntries(): { - entries: IMessageReplyCacheEntry[]; - maxObservedShortId: number; -} { - let raw: string; - try { - raw = fs.readFileSync(resolveReplyCachePath(), "utf8"); - } catch (err) { - if ((err as NodeJS.ErrnoException)?.code !== "ENOENT") { - reportPersistenceFailure("read", err); - } - return { entries: [], maxObservedShortId: 0 }; - } - const cutoff = Date.now() - REPLY_CACHE_TTL_MS; - const out: IMessageReplyCacheEntry[] = []; - // The counter must advance past every shortId we have ever observed in - // the file — including lines we skip because they are stale or malformed. - // Otherwise a future allocation can collide with a still-live mapping - // that came earlier in the file. - let maxObservedShortId = 0; - for (const line of raw.split(/\n+/)) { - if (!line) { - continue; - } - let parsed: Partial | null; - try { - parsed = JSON.parse(line) as Partial; - } catch { - if (!parseFailureLogged) { - parseFailureLogged = true; - logVerbose( - `imessage reply-cache: dropping unparseable line (further parse errors suppressed)`, - ); - } - continue; - } - if (parsed && typeof parsed.shortId === "string") { - const numeric = Number.parseInt(parsed.shortId, 10); - if (Number.isFinite(numeric) && numeric > maxObservedShortId) { - maxObservedShortId = numeric; - } - } - if ( - typeof parsed?.accountId !== "string" || - typeof parsed.messageId !== "string" || - typeof parsed.shortId !== "string" || - typeof parsed.timestamp !== "number" - ) { - continue; - } - if (parsed.timestamp < cutoff) { - continue; - } - out.push({ - accountId: parsed.accountId, - messageId: parsed.messageId, - shortId: parsed.shortId, - timestamp: parsed.timestamp, - chatGuid: typeof parsed.chatGuid === "string" ? parsed.chatGuid : undefined, - chatIdentifier: typeof parsed.chatIdentifier === "string" ? parsed.chatIdentifier : undefined, - chatId: typeof parsed.chatId === "number" ? parsed.chatId : undefined, - isFromMe: typeof parsed.isFromMe === "boolean" ? parsed.isFromMe : undefined, - }); - } - return { entries: out.slice(-REPLY_CACHE_MAX), maxObservedShortId }; +function openReplyCacheStore(): IMessageReplyCacheStore { + return getIMessageRuntime().state.openSyncKeyedStore({ + namespace: IMESSAGE_REPLY_CACHE_NAMESPACE, + maxEntries: IMESSAGE_REPLY_CACHE_MAX_ENTRIES, + }); } -// reply-cache.jsonl maps gateway-allocated short-ids to message guids. A -// hostile same-UID process could otherwise (a) read the file to learn -// active conversation guids, or (b) inject lines so a future shortId -// resolution returns an attacker-chosen guid (allowing the agent to -// react/edit/unsend a message it never saw). Owner-only mode on both the -// directory and file closes that vector — defaults are 0755/0644 which -// are world-readable on a multi-user Mac. -const REPLY_CACHE_DIR_MODE = 0o700; -const REPLY_CACHE_FILE_MODE = 0o600; - -function writePersistedEntries(entries: IMessageReplyCacheEntry[]): void { - const filePath = resolveReplyCachePath(); - try { - fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: REPLY_CACHE_DIR_MODE }); - fs.writeFileSync( - filePath, - entries.map((entry) => JSON.stringify(entry)).join("\n") + (entries.length ? "\n" : ""), - { encoding: "utf8", mode: REPLY_CACHE_FILE_MODE }, - ); - // mkdirSync's mode is masked by umask and only applies on creation. If - // the dir already existed from an older gateway version, clamp it now. - try { - fs.chmodSync(path.dirname(filePath), REPLY_CACHE_DIR_MODE); - fs.chmodSync(filePath, REPLY_CACHE_FILE_MODE); - } catch { - // best-effort — fs may not support chmod on every platform - } - } catch (err) { - reportPersistenceFailure("write", err); - } +function openReplyCacheCounterStore(): PluginStateSyncKeyedStore { + return getIMessageRuntime().state.openSyncKeyedStore({ + namespace: IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE, + maxEntries: IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES, + }); } -function appendPersistedEntry(entry: IMessageReplyCacheEntry): void { - const filePath = resolveReplyCachePath(); - try { - fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: REPLY_CACHE_DIR_MODE }); - fs.appendFileSync(filePath, `${JSON.stringify(entry)}\n`, { - encoding: "utf8", - mode: REPLY_CACHE_FILE_MODE, - }); - // Always clamp — appendFileSync's `mode` only applies on creation, so - // an existing 0644 file from an older gateway version would otherwise - // never get tightened. chmod is microseconds; doing it every append - // keeps the security guarantee monotonic instead of conditional on - // creation order. - try { - fs.chmodSync(path.dirname(filePath), REPLY_CACHE_DIR_MODE); - fs.chmodSync(filePath, REPLY_CACHE_FILE_MODE); - } catch { - // best-effort - } - } catch (err) { - reportPersistenceFailure("append", err); - } +function remainingTtlMs(timestamp: number): number | undefined { + const remaining = REPLY_CACHE_TTL_MS - Math.max(0, Date.now() - timestamp); + return remaining > 0 ? remaining : undefined; } let hydrated = false; -function hydrateFromDiskOnce(): void { +function hydrateFromStoreOnce(): void { if (hydrated) { return; } hydrated = true; - const { entries, maxObservedShortId } = readPersistedEntries(); - // Bump the counter past every observed shortId, even from dropped lines — - // see comment in readPersistedEntries. - if (maxObservedShortId > imessageShortIdCounter) { - imessageShortIdCounter = maxObservedShortId; + const cutoff = Date.now() - REPLY_CACHE_TTL_MS; + let entries: IMessageReplyCacheEntry[]; + try { + const counter = openReplyCacheCounterStore().lookup(IMESSAGE_REPLY_CACHE_COUNTER_KEY); + if (counter && Number.isSafeInteger(counter.counter) && counter.counter > 0) { + imessageShortIdCounter = Math.max(imessageShortIdCounter, counter.counter); + } + const store = openReplyCacheStore(); + entries = store + .entries() + .map(({ value }) => value) + .filter((entry) => entry.timestamp >= cutoff) + .toSorted((a, b) => a.timestamp - b.timestamp) + .slice(-IMESSAGE_REPLY_CACHE_MAX_ENTRIES); + for (const entry of entries) { + const numeric = Number.parseInt(entry.shortId, 10); + if (Number.isFinite(numeric) && numeric > imessageShortIdCounter) { + imessageShortIdCounter = numeric; + } + } + } catch (err) { + reportPersistenceFailure("read", err); + return; } if (entries.length === 0) { return; } - // Entries are appended chronologically, so iterate forward to keep the - // newest entry as the "live" mapping when the same messageId appears - // multiple times (e.g. after a write-rewrite cycle). for (const entry of entries) { imessageReplyCacheByMessageId.set(entry.messageId, entry); imessageShortIdToUuid.set(entry.shortId, entry.messageId); @@ -210,53 +121,104 @@ function hydrateFromDiskOnce(): void { } } +function persistReplyCacheEntry(entry: IMessageReplyCacheEntry): void { + const ttlMs = remainingTtlMs(entry.timestamp); + if (!ttlMs) { + return; + } + try { + openReplyCacheStore().register(resolveIMessageReplyCacheEntryKey(entry.messageId), entry, { + ttlMs, + }); + } catch (err) { + reportPersistenceFailure("write", err); + } +} + +function deleteReplyCacheEntry(messageId: string): void { + try { + openReplyCacheStore().delete(resolveIMessageReplyCacheEntryKey(messageId)); + } catch (err) { + reportPersistenceFailure("delete", err); + } +} + +function persistReplyCacheCounter(): void { + try { + openReplyCacheCounterStore().register(IMESSAGE_REPLY_CACHE_COUNTER_KEY, { + counter: imessageShortIdCounter, + }); + } catch (err) { + reportPersistenceFailure("counter", err); + } +} + +function buildReplyCacheEntry( + entry: Omit, + messageId: string, + shortId: string, +): IMessageReplyCacheEntry { + return { + accountId: entry.accountId, + messageId, + shortId, + timestamp: entry.timestamp, + ...(typeof entry.chatGuid === "string" ? { chatGuid: entry.chatGuid } : {}), + ...(typeof entry.chatIdentifier === "string" ? { chatIdentifier: entry.chatIdentifier } : {}), + ...(typeof entry.chatId === "number" ? { chatId: entry.chatId } : {}), + ...(typeof entry.isFromMe === "boolean" ? { isFromMe: entry.isFromMe } : {}), + }; +} + function generateShortId(): string { imessageShortIdCounter += 1; + persistReplyCacheCounter(); return String(imessageShortIdCounter); } export function rememberIMessageReplyCache( entry: Omit, ): IMessageReplyCacheEntry { - hydrateFromDiskOnce(); + hydrateFromStoreOnce(); const messageId = entry.messageId.trim(); if (!messageId) { return { ...entry, shortId: "" }; } let shortId = imessageUuidToShortId.get(messageId); - let allocatedNew = false; if (!shortId) { shortId = generateShortId(); imessageShortIdToUuid.set(shortId, messageId); imessageUuidToShortId.set(messageId, shortId); - allocatedNew = true; } - const fullEntry: IMessageReplyCacheEntry = { ...entry, messageId, shortId }; + const fullEntry = buildReplyCacheEntry(entry, messageId, shortId); imessageReplyCacheByMessageId.delete(messageId); imessageReplyCacheByMessageId.set(messageId, fullEntry); const cutoff = Date.now() - REPLY_CACHE_TTL_MS; let evicted = false; + const deletedMessageIds: string[] = []; for (const [key, value] of imessageReplyCacheByMessageId) { if (value.timestamp >= cutoff) { break; } imessageReplyCacheByMessageId.delete(key); + deletedMessageIds.push(key); if (value.shortId) { imessageShortIdToUuid.delete(value.shortId); imessageUuidToShortId.delete(key); } evicted = true; } - while (imessageReplyCacheByMessageId.size > REPLY_CACHE_MAX) { + while (imessageReplyCacheByMessageId.size > IMESSAGE_REPLY_CACHE_MAX_ENTRIES) { const oldest = imessageReplyCacheByMessageId.keys().next().value; if (!oldest) { break; } const oldEntry = imessageReplyCacheByMessageId.get(oldest); imessageReplyCacheByMessageId.delete(oldest); + deletedMessageIds.push(oldest); if (oldEntry?.shortId) { imessageShortIdToUuid.delete(oldEntry.shortId); imessageUuidToShortId.delete(oldest); @@ -264,14 +226,12 @@ export function rememberIMessageReplyCache( evicted = true; } - // Append-only is hot-path cheap; periodic rewrite happens when we evict - // stale entries so the file does not grow unbounded across restarts. - if (allocatedNew) { - appendPersistedEntry(fullEntry); - } if (evicted) { - writePersistedEntries([...imessageReplyCacheByMessageId.values()]); + for (const messageIdToDelete of deletedMessageIds) { + deleteReplyCacheEntry(messageIdToDelete); + } } + persistReplyCacheEntry(fullEntry); return fullEntry; } @@ -413,14 +373,12 @@ export function resolveIMessageMessageId( if (!trimmed) { return trimmed; } - // Hydrate the on-disk JSONL into the in-memory maps before reading them. - // Without this, the first post-restart action that arrives with a short - // MessageSid would miss `imessageShortIdToUuid` and fall through to the - // "no longer available" path, breaking the persistence contract — the - // mapping was on disk, we just hadn't read it yet on this read path. + // Hydrate SQLite-backed mappings before reading them. Without this, the + // first post-restart action with a short MessageSid would miss + // `imessageShortIdToUuid` and fall through to "no longer available". // `rememberIMessageReplyCache` already hydrates on its own, so this only // matters for the resolve-first-after-restart sequence. - hydrateFromDiskOnce(); + hydrateFromStoreOnce(); if (/^\d+$/.test(trimmed)) { // Cache hit: the cached entry carries the chat info this short id was @@ -476,7 +434,7 @@ export function isKnownFromMeIMessageMessageId( if (!trimmed || !ctx.accountId || !hasChatScope(ctx)) { return false; } - hydrateFromDiskOnce(); + hydrateFromStoreOnce(); const cached = imessageReplyCacheByMessageId.get(trimmed); if (!cached || cached.isFromMe !== true || cached.accountId !== ctx.accountId) { return false; @@ -579,24 +537,19 @@ function isPositiveChatMatch(entry: IMessageReplyCacheEntry, ctx: IMessageChatCo return false; } -export function resetIMessageShortIdState(): void { +export function resetIMessageShortIdState(options: { clearPersistent?: boolean } = {}): void { imessageReplyCacheByMessageId.clear(); imessageShortIdToUuid.clear(); imessageUuidToShortId.clear(); imessageShortIdCounter = 0; hydrated = false; persistenceFailureLogged = false; - parseFailureLogged = false; - // Only delete the persisted file when the test harness has explicitly - // pointed us at an isolated state directory. Otherwise we would nuke - // whatever live gateway happens to share `~/.openclaw` — and in vitest - // file-level parallelism, two test files calling this at once could - // race a peer's appendFileSync mid-write. - if (!process.env.OPENCLAW_STATE_DIR) { + if (options.clearPersistent === false) { return; } try { - fs.rmSync(resolveReplyCachePath(), { force: true }); + openReplyCacheStore().clear(); + openReplyCacheCounterStore().clear(); } catch { // best-effort } diff --git a/extensions/imessage/src/monitor.gating.test.ts b/extensions/imessage/src/monitor.gating.test.ts index c766a254c46..5927e34a763 100644 --- a/extensions/imessage/src/monitor.gating.test.ts +++ b/extensions/imessage/src/monitor.gating.test.ts @@ -7,8 +7,10 @@ import { } from "./monitor/inbound-processing.js"; import { parseIMessageNotification } from "./monitor/parse-notification.js"; import type { IMessagePayload } from "./monitor/types.js"; +import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js"; beforeEach(() => { + installIMessageStateRuntimeForTest(); resetIMessageShortIdState(); }); diff --git a/extensions/imessage/src/monitor.last-route.test.ts b/extensions/imessage/src/monitor.last-route.test.ts index 4b15a68d76b..92607eb4c7a 100644 --- a/extensions/imessage/src/monitor.last-route.test.ts +++ b/extensions/imessage/src/monitor.last-route.test.ts @@ -7,6 +7,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import type { createIMessageRpcClient } from "./client.js"; import { monitorIMessageProvider } from "./monitor.js"; import { loadIMessageCatchupCursor } from "./monitor/catchup.js"; +import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js"; const waitForTransportReadyMock = vi.hoisted(() => vi.fn(async () => {}), @@ -107,6 +108,7 @@ describe("iMessage monitor last-route updates", () => { } beforeEach(() => { + installIMessageStateRuntimeForTest(); waitForTransportReadyMock.mockReset().mockResolvedValue(undefined); createIMessageRpcClientMock.mockReset(); readChannelAllowFromStoreMock.mockReset().mockResolvedValue([]); diff --git a/extensions/imessage/src/monitor/catchup-bridge.test.ts b/extensions/imessage/src/monitor/catchup-bridge.test.ts index c9e960e210e..02a28773e8c 100644 --- a/extensions/imessage/src/monitor/catchup-bridge.test.ts +++ b/extensions/imessage/src/monitor/catchup-bridge.test.ts @@ -1,9 +1,11 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js"; import { runIMessageCatchup } from "./catchup-bridge.js"; -import { resolveCatchupConfig, saveIMessageCatchupCursor } from "./catchup.js"; +import { + resetIMessageCatchupCursorStoreForTest, + resolveCatchupConfig, + saveIMessageCatchupCursor, +} from "./catchup.js"; import type { IMessagePayload } from "./types.js"; type RpcCall = { @@ -51,17 +53,14 @@ function makeRow(opts: { } describe("runIMessageCatchup", () => { - let tempDir: string; - beforeEach(() => { - tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-catchup-bridge-")); - vi.stubEnv("OPENCLAW_STATE_DIR", tempDir); + installIMessageStateRuntimeForTest(); + resetIMessageCatchupCursorStoreForTest(); }); afterEach(() => { - vi.unstubAllEnvs(); + resetIMessageCatchupCursorStoreForTest(); vi.useRealTimers(); - fs.rmSync(tempDir, { recursive: true, force: true }); }); it("fetches chats then per-chat history and dispatches each row in rowid order", async () => { diff --git a/extensions/imessage/src/monitor/catchup.test.ts b/extensions/imessage/src/monitor/catchup.test.ts index 14c6dccc93b..68690e8cd79 100644 --- a/extensions/imessage/src/monitor/catchup.test.ts +++ b/extensions/imessage/src/monitor/catchup.test.ts @@ -1,12 +1,11 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js"; import { advanceIMessageCatchupCursor, capFailureRetriesMap, loadIMessageCatchupCursor, performIMessageCatchup, + resetIMessageCatchupCursorStoreForTest, resolveCatchupConfig, saveIMessageCatchupCursor, type CatchupDispatchFn, @@ -14,27 +13,9 @@ import { type IMessageCatchupRow, } from "./catchup.js"; -let tempStateDir: string; -let priorStateDir: string | undefined; - -beforeAll(() => { - tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-catchup-")); - priorStateDir = process.env.OPENCLAW_STATE_DIR; - process.env.OPENCLAW_STATE_DIR = tempStateDir; -}); - -afterAll(() => { - if (priorStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = priorStateDir; - } - fs.rmSync(tempStateDir, { recursive: true, force: true }); -}); - beforeEach(() => { - // Wipe per-account cursor state between tests so each test starts clean. - fs.rmSync(path.join(tempStateDir, "imessage", "catchup"), { recursive: true, force: true }); + installIMessageStateRuntimeForTest(); + resetIMessageCatchupCursorStoreForTest(); }); describe("resolveCatchupConfig", () => { @@ -217,6 +198,17 @@ describe("capFailureRetriesMap", () => { // Both b and d at 9; tiebreak by guid string (alphabetical) → b, d expect(Object.keys(capped).toSorted()).toEqual(["b", "d"]); }); + + it("keeps the persisted retry map under the plugin-state value budget", () => { + const map = Object.fromEntries( + Array.from({ length: 800 }, (_, index) => [`GUID-${index}-${"x".repeat(120)}`, index + 1]), + ); + + const capped = capFailureRetriesMap(map); + + expect(Object.keys(capped).length).toBeLessThanOrEqual(512); + expect(new TextEncoder().encode(JSON.stringify(capped)).byteLength).toBeLessThanOrEqual(48_000); + }); }); describe("performIMessageCatchup", () => { @@ -452,6 +444,33 @@ describe("performIMessageCatchup", () => { expect(cursor?.failureRetries?.A).toBe(1); }); + it("keeps held failure state when a live monitor advances the same cursor mid-pass", async () => { + const dispatch = vi.fn(async () => { + await advanceIMessageCatchupCursor( + "primary", + { lastSeenMs: now - 10_000, lastSeenRowid: 50 }, + config, + ); + return { ok: false }; + }); + const fetch = fetchOf([row({ guid: "A", rowid: 10, date: now - 40_000 })]); + + const summary = await performIMessageCatchup({ + accountId: "primary", + config, + now, + fetch, + dispatch, + }); + + expect(summary.failed).toBe(1); + expect(summary.cursorAfter.lastSeenRowid).toBe(9); + + const cursor = await loadIMessageCatchupCursor("primary"); + expect(cursor?.lastSeenRowid).toBe(9); + expect(cursor?.failureRetries?.A).toBe(1); + }); + it("advances the cursor past parser-rejected rows via the fetch high-watermark", async () => { // Regression: without a high-watermark from the fetcher, an unparseable // row never reaches the loop, so the cursor never advances past it and diff --git a/extensions/imessage/src/monitor/catchup.ts b/extensions/imessage/src/monitor/catchup.ts index 4cecf96a4f3..a67c2da90ad 100644 --- a/extensions/imessage/src/monitor/catchup.ts +++ b/extensions/imessage/src/monitor/catchup.ts @@ -1,10 +1,6 @@ import { createHash } from "node:crypto"; -import path from "node:path"; -import type { FileLockOptions } from "openclaw/plugin-sdk/file-lock"; -import { withFileLock } from "openclaw/plugin-sdk/file-lock"; -import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; -import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; -import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { getIMessageRuntime } from "../runtime.js"; // iMessage inbound catchup. When the gateway is offline (crash, restart, mac // sleep, machine off), `imsg watch` resumes from current state and ignores @@ -25,20 +21,13 @@ const MAX_PER_RUN_LIMIT = 500; const DEFAULT_FIRST_RUN_LOOKBACK_MINUTES = 30; const DEFAULT_MAX_FAILURE_RETRIES = 10; const MAX_MAX_FAILURE_RETRIES = 1_000; -// Defense-in-depth bound on the retry map. A storm of unique failing GUIDs -// should not balloon the cursor file. When over the bound, keep only the -// highest-count entries (closest to give-up) and drop the rest. -const MAX_FAILURE_RETRY_MAP_SIZE = 5_000; -const CATCHUP_CURSOR_LOCK_OPTIONS: FileLockOptions = { - retries: { - retries: 6, - factor: 1.35, - minTimeout: 8, - maxTimeout: 180, - randomize: true, - }, - stale: 60_000, -}; +// Defense-in-depth bound on the retry map. The cursor is one plugin-state +// value, so keep the retry payload well below the 64KB store limit. +const MAX_FAILURE_RETRY_MAP_SIZE = 512; +const MAX_FAILURE_RETRY_MAP_JSON_BYTES = 48_000; +const textEncoder = new TextEncoder(); +export const IMESSAGE_CATCHUP_CURSOR_NAMESPACE = "imessage.catchup-cursors"; +export const IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES = 256; const cursorWriteQueues = new Map>(); export type IMessageCatchupConfig = { @@ -105,37 +94,37 @@ export type IMessageCatchupSummary = { windowEndMs: number; }; -function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { - if (env.OPENCLAW_STATE_DIR?.trim()) { - return resolveStateDir(env); - } - // Default test isolation: per-pid tmpdir. Mirrors the BB catchup pattern so - // the tmpdir-path-guard test that flags dynamic template-literal suffixes - // on os.tmpdir() paths stays green. - if (env.VITEST || env.NODE_ENV === "test") { - const name = "openclaw-vitest-" + process.pid; - return path.join(resolvePreferredOpenClawTmpDir(), name); - } - return resolveStateDir(env); +export function resolveIMessageCatchupCursorKey(accountId: string): string { + return createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 32); } -function resolveCursorFilePath(accountId: string): string { - // Layout matches inbound-dedupe / persisted-echo-cache so a replayed GUID - // is recognized by the existing dedupe after catchup re-feeds the message - // through the live dispatch path. - const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account"; - const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12); - return path.join(resolveStateDirFromEnv(), "imessage", "catchup", `${safePrefix}__${hash}.json`); +function openCatchupCursorStore(): PluginStateSyncKeyedStore { + return getIMessageRuntime().state.openSyncKeyedStore({ + namespace: IMESSAGE_CATCHUP_CURSOR_NAMESPACE, + maxEntries: IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES, + }); } -function enqueueCursorWrite(filePath: string, fn: () => Promise): Promise { - const prev = cursorWriteQueues.get(filePath) ?? Promise.resolve(); +function updateCatchupCursorStore( + key: string, + updateValue: (current: IMessageCatchupCursor | undefined) => IMessageCatchupCursor | undefined, +): boolean { + const store = openCatchupCursorStore(); + if (!store.update) { + throw new Error("iMessage catchup cursor persistence requires atomic plugin-state update."); + } + return store.update(key, updateValue); +} + +function enqueueCursorWrite(accountId: string, fn: () => Promise): Promise { + const key = resolveIMessageCatchupCursorKey(accountId); + const prev = cursorWriteQueues.get(key) ?? Promise.resolve(); const next = prev.then(fn, fn); - cursorWriteQueues.set(filePath, next); + cursorWriteQueues.set(key, next); next .finally(() => { - if (cursorWriteQueues.get(filePath) === next) { - cursorWriteQueues.delete(filePath); + if (cursorWriteQueues.get(key) === next) { + cursorWriteQueues.delete(key); } }) .catch(() => {}); @@ -159,63 +148,78 @@ function sanitizeFailureRetriesInput(raw: unknown): Record { return out; } -/** - * Cursor file path: `/imessage/catchup/__.json`. - * `openclawStateDir` resolves through `OPENCLAW_STATE_DIR` (or the plugin-sdk default, - * `~/.openclaw`). On a default install the cursor lands at - * `~/.openclaw/imessage/catchup/__.json`. - */ -export async function loadIMessageCatchupCursor( - accountId: string, -): Promise { - const filePath = resolveCursorFilePath(accountId); - return await loadIMessageCatchupCursorFromPath(filePath); -} - -async function loadIMessageCatchupCursorFromPath( - filePath: string, -): Promise { - const { value } = await readJsonFileWithFallback(filePath, null); +function normalizeIMessageCatchupCursor(value: unknown): IMessageCatchupCursor | null { if (!value || typeof value !== "object") { return null; } - if (typeof value.lastSeenMs !== "number" || !Number.isFinite(value.lastSeenMs)) { + const raw = value as Partial; + if (typeof raw.lastSeenMs !== "number" || !Number.isFinite(raw.lastSeenMs)) { return null; } - if (typeof value.lastSeenRowid !== "number" || !Number.isFinite(value.lastSeenRowid)) { + if (typeof raw.lastSeenRowid !== "number" || !Number.isFinite(raw.lastSeenRowid)) { return null; } - const failureRetries = sanitizeFailureRetriesInput(value.failureRetries); + const failureRetries = sanitizeFailureRetriesInput(raw.failureRetries); const hasRetries = Object.keys(failureRetries).length > 0; return { - lastSeenMs: value.lastSeenMs, - lastSeenRowid: value.lastSeenRowid, - updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0, + lastSeenMs: raw.lastSeenMs, + lastSeenRowid: raw.lastSeenRowid, + updatedAt: typeof raw.updatedAt === "number" ? raw.updatedAt : 0, ...(hasRetries ? { failureRetries } : {}), }; } +function readIMessageCatchupCursor(accountId: string): IMessageCatchupCursor | null { + return normalizeIMessageCatchupCursor( + openCatchupCursorStore().lookup(resolveIMessageCatchupCursorKey(accountId)), + ); +} + +export async function loadIMessageCatchupCursor( + accountId: string, +): Promise { + return readIMessageCatchupCursor(accountId); +} + +function buildIMessageCatchupCursor(next: { + lastSeenMs: number; + lastSeenRowid: number; + failureRetries?: Record; +}): IMessageCatchupCursor { + const sanitized = sanitizeFailureRetriesInput(next.failureRetries); + const hasRetries = Object.keys(sanitized).length > 0; + return { + lastSeenMs: next.lastSeenMs, + lastSeenRowid: next.lastSeenRowid, + updatedAt: Date.now(), + ...(hasRetries ? { failureRetries: sanitized } : {}), + }; +} + export async function saveIMessageCatchupCursor( accountId: string, next: { lastSeenMs: number; lastSeenRowid: number; failureRetries?: Record }, + options: { allowCursorRewindForRetries?: boolean } = {}, ): Promise { - const filePath = resolveCursorFilePath(accountId); - await saveIMessageCatchupCursorToPath(filePath, next); + const cursor = buildIMessageCatchupCursor(next); + updateCatchupCursorStore(resolveIMessageCatchupCursorKey(accountId), (existingValue) => { + const existing = normalizeIMessageCatchupCursor(existingValue); + if (existing && cursor.lastSeenRowid < existing.lastSeenRowid) { + if (!options.allowCursorRewindForRetries) { + return undefined; + } + return buildIMessageCatchupCursor({ + lastSeenMs: cursor.lastSeenMs, + lastSeenRowid: cursor.lastSeenRowid, + failureRetries: { ...existing.failureRetries, ...cursor.failureRetries }, + }); + } + return cursor; + }); } -async function saveIMessageCatchupCursorToPath( - filePath: string, - next: { lastSeenMs: number; lastSeenRowid: number; failureRetries?: Record }, -): Promise { - const sanitized = sanitizeFailureRetriesInput(next.failureRetries); - const hasRetries = Object.keys(sanitized).length > 0; - const cursor: IMessageCatchupCursor = { - lastSeenMs: next.lastSeenMs, - lastSeenRowid: next.lastSeenRowid, - updatedAt: Date.now(), - ...(hasRetries ? { failureRetries: sanitized } : {}), - }; - await writeJsonFileAtomically(filePath, cursor); +export function resetIMessageCatchupCursorStoreForTest(): void { + openCatchupCursorStore().clear(); } /** @@ -226,9 +230,10 @@ async function saveIMessageCatchupCursorToPath( export function capFailureRetriesMap( map: Record, maxSize: number = MAX_FAILURE_RETRY_MAP_SIZE, + maxBytes: number = MAX_FAILURE_RETRY_MAP_JSON_BYTES, ): Record { const entries = Object.entries(map); - if (entries.length <= maxSize) { + if (entries.length <= maxSize && textEncoder.encode(JSON.stringify(map)).byteLength <= maxBytes) { return map; } // Sort by count desc; stable tiebreak on guid string so the retained set @@ -236,9 +241,13 @@ export function capFailureRetriesMap( // debugging). entries.sort((a, b) => b[1] - a[1] || a[0].localeCompare(b[0])); const capped: Record = {}; - for (let i = 0; i < maxSize; i++) { + for (let i = 0; i < entries.length && i < maxSize; i++) { const [guid, count] = entries[i]; capped[guid] = count; + if (textEncoder.encode(JSON.stringify(capped)).byteLength > maxBytes) { + delete capped[guid]; + break; + } } return capped; } @@ -329,29 +338,30 @@ export async function advanceIMessageCatchupCursor( return false; } - const filePath = resolveCursorFilePath(accountId); - return await enqueueCursorWrite(filePath, () => - withFileLock(filePath, CATCHUP_CURSOR_LOCK_OPTIONS, async () => { - const cursor = await loadIMessageCatchupCursorFromPath(filePath); + return await enqueueCursorWrite(accountId, async () => { + let advanced = false; + updateCatchupCursorStore(resolveIMessageCatchupCursorKey(accountId), (existingValue) => { + const cursor = normalizeIMessageCatchupCursor(existingValue); if (cursor && next.lastSeenRowid <= cursor.lastSeenRowid) { - return false; + return undefined; } const blockingFailure = Object.values(cursor?.failureRetries ?? {}).some( (count) => count < config.maxFailureRetries, ); if (blockingFailure) { - return false; + return undefined; } - await saveIMessageCatchupCursorToPath(filePath, { + advanced = true; + return buildIMessageCatchupCursor({ lastSeenMs: Math.max(cursor?.lastSeenMs ?? next.lastSeenMs, next.lastSeenMs), lastSeenRowid: next.lastSeenRowid, failureRetries: cursor?.failureRetries, }); - return true; - }), - ); + }); + return advanced; + }); } /** @@ -534,11 +544,17 @@ export async function performIMessageCatchup( const capped = capFailureRetriesMap(failureRetries); summary.cursorAfter = { lastSeenMs, lastSeenRowid }; - await saveIMessageCatchupCursor(params.accountId, { - lastSeenMs, - lastSeenRowid, - failureRetries: capped, - }); + await saveIMessageCatchupCursor( + params.accountId, + { + lastSeenMs, + lastSeenRowid, + failureRetries: capped, + }, + { + allowCursorRewindForRetries: earliestHeldFailureRow !== null, + }, + ); if (summary.replayed > 0 || summary.failed > 0 || summary.givenUp > 0) { params.log?.( diff --git a/extensions/imessage/src/monitor/inbound-processing.test.ts b/extensions/imessage/src/monitor/inbound-processing.test.ts index 96555e2a510..8a040804218 100644 --- a/extensions/imessage/src/monitor/inbound-processing.test.ts +++ b/extensions/imessage/src/monitor/inbound-processing.test.ts @@ -1,10 +1,8 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; import { sanitizeTerminalText } from "openclaw/plugin-sdk/test-fixtures"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import { resetIMessageShortIdState, rememberIMessageReplyCache } from "../monitor-reply-cache.js"; +import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js"; import { buildIMessageInboundContext, describeIMessageEchoDropLog, @@ -13,6 +11,11 @@ import { } from "./inbound-processing.js"; import { createSelfChatCache } from "./self-chat-cache.js"; +beforeEach(() => { + installIMessageStateRuntimeForTest(); + resetIMessageShortIdState(); +}); + describe("resolveIMessageInboundDecision echo detection", () => { const cfg = {} as OpenClawConfig; type InboundDecisionParams = Parameters[0]; @@ -543,56 +546,42 @@ describe("resolveIMessageInboundDecision echo detection", () => { }); it("uses the production reply-cache lookup for bot-authored reaction targets", async () => { - const tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-reaction-cache-")); - const priorStateDir = process.env.OPENCLAW_STATE_DIR; - process.env.OPENCLAW_STATE_DIR = tempStateDir; - try { - resetIMessageShortIdState(); - rememberIMessageReplyCache({ - accountId: "default", - messageId: "p:0/imsg-production", - chatGuid: "any;-;+15555550123", - chatIdentifier: "+15555550123", - chatId: 3, - timestamp: Date.now(), - isFromMe: true, - }); + rememberIMessageReplyCache({ + accountId: "default", + messageId: "p:0/imsg-production", + chatGuid: "any;-;+15555550123", + chatIdentifier: "+15555550123", + chatId: 3, + timestamp: Date.now(), + isFromMe: true, + }); - const decision = await resolveDecision({ - message: { - guid: "reaction-guid", - is_reaction: true, - reaction_emoji: "❤️", - is_reaction_add: true, - associated_message_guid: "p:0/imsg-production", - associated_message_type: 2000, - text: "Loved “tapback target”", - chat_id: 3, - chat_guid: "any;-;+15555550123", - chat_identifier: "+15555550123", - }, - messageText: "Loved “tapback target”", - bodyText: "Loved “tapback target”", - echoCache: { has: () => false }, - isKnownFromMeMessageId: undefined, - }); + const decision = await resolveDecision({ + message: { + guid: "reaction-guid", + is_reaction: true, + reaction_emoji: "❤️", + is_reaction_add: true, + associated_message_guid: "p:0/imsg-production", + associated_message_type: 2000, + text: "Loved “tapback target”", + chat_id: 3, + chat_guid: "any;-;+15555550123", + chat_identifier: "+15555550123", + }, + messageText: "Loved “tapback target”", + bodyText: "Loved “tapback target”", + echoCache: { has: () => false }, + isKnownFromMeMessageId: undefined, + }); - expect(decision.kind).toBe("reaction"); - if (decision.kind !== "reaction") { - throw new Error("expected reaction decision"); - } - expect(decision.text).toBe( - "iMessage reaction added: ❤️ by +15555550123 on msg imsg-production", - ); - } finally { - resetIMessageShortIdState(); - if (priorStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = priorStateDir; - } - fs.rmSync(tempStateDir, { recursive: true, force: true }); + expect(decision.kind).toBe("reaction"); + if (decision.kind !== "reaction") { + throw new Error("expected reaction decision"); } + expect(decision.text).toBe( + "iMessage reaction added: ❤️ by +15555550123 on msg imsg-production", + ); }); it("matches prefixed tapback targets against prefixed echo-cache ids in own mode", async () => { @@ -988,30 +977,6 @@ describe("resolveIMessageInboundDecision command auth", () => { }); describe("buildIMessageInboundContext MessageSid handling (rowid-leak regression)", () => { - let tempStateDir: string; - let priorStateDir: string | undefined; - beforeAll(() => { - tempStateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-inbound-")); - priorStateDir = process.env.OPENCLAW_STATE_DIR; - process.env.OPENCLAW_STATE_DIR = tempStateDir; - }); - afterAll(() => { - if (priorStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = priorStateDir; - } - fs.rmSync(tempStateDir, { recursive: true, force: true }); - }); - beforeEach(() => { - resetIMessageShortIdState(); - try { - fs.rmSync(path.join(tempStateDir, "imessage", "reply-cache.jsonl"), { force: true }); - } catch { - // best-effort - } - }); - function buildParams(messageOverrides: Partial<{ id: number; guid: string }>) { const decision = { kind: "dispatch" as const, diff --git a/extensions/imessage/src/monitor/monitor-provider.echo-cache.test.ts b/extensions/imessage/src/monitor/monitor-provider.echo-cache.test.ts index 961671e2a2b..3f3ee36a9ee 100644 --- a/extensions/imessage/src/monitor/monitor-provider.echo-cache.test.ts +++ b/extensions/imessage/src/monitor/monitor-provider.echo-cache.test.ts @@ -1,19 +1,28 @@ -import fs from "node:fs"; -import os from "node:os"; -import path from "node:path"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + createIMessagePluginStateSyncStoreForTest, + installIMessageFailingStateRuntimeForTest, + installIMessageStateRuntimeForTest, +} from "../test-support/runtime.js"; import { createSentMessageCache } from "./echo-cache.js"; -import { rememberPersistedIMessageEcho } from "./persisted-echo-cache.js"; +import { + IMESSAGE_SENT_ECHOES_MAX_ENTRIES, + IMESSAGE_SENT_ECHOES_NAMESPACE, + IMESSAGE_SENT_ECHOES_TTL_MS, + hasPersistedIMessageEcho, + rememberPersistedIMessageEcho, + resetPersistedIMessageEchoCacheForTest, + resolveIMessageSentEchoEntryKey, +} from "./persisted-echo-cache.js"; describe("iMessage sent-message echo cache", () => { - const tempDirs: string[] = []; + beforeEach(() => { + installIMessageStateRuntimeForTest(); + resetPersistedIMessageEchoCacheForTest(); + }); afterEach(() => { vi.useRealTimers(); - vi.unstubAllEnvs(); - for (const dir of tempDirs.splice(0)) { - fs.rmSync(dir, { recursive: true, force: true }); - } }); it("matches recent text within the same scope", () => { @@ -82,47 +91,59 @@ describe("iMessage sent-message echo cache", () => { expect(cache.has("acct:imessage:+1555", { messageId: "m-1" })).toBe(true); }); - it("matches persisted echoes written by another process", () => { - const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-")); - tempDirs.push(stateDir); - vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); - const cache = createSentMessageCache(); - + it("matches persisted echoes written before the monitor cache is created", () => { rememberPersistedIMessageEcho({ scope: "acct:imessage:+1555", text: "OpenClaw imsg live test", messageId: "guid-1", }); + const cache = createSentMessageCache(); expect(cache.has("acct:imessage:+1555", { text: "OpenClaw imsg live test" })).toBe(true); expect(cache.has("acct:imessage:+1666", { text: "OpenClaw imsg live test" })).toBe(false); expect(cache.has("acct:imessage:+1555", { messageId: "guid-1" })).toBe(true); }); - it("writes sent-echoes.jsonl 0600 and parent dir 0700", () => { - // sent-echoes.jsonl carries scope keys + outbound message text + messageIds. - // Same threat model as reply-cache.jsonl: a same-UID hostile process could - // enumerate active conversations or inject lines so a future inbound dedupe - // call wrongly suppresses a legitimate inbound. Owner-only mode is the - // mitigation. - const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-perm-")); - tempDirs.push(stateDir); - vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); + it("persists text-only and id-only echoes without undefined fields", () => { + const scope = "acct:imessage:+1555"; + rememberPersistedIMessageEcho({ scope, text: "text-only" }); + rememberPersistedIMessageEcho({ scope, messageId: "id-only" }); - rememberPersistedIMessageEcho({ - scope: "acct:imessage:+1555", - text: "perm-test", - messageId: "guid-perm", + resetPersistedIMessageEchoCacheForTest({ clearPersistent: false }); + const cache = createSentMessageCache(); + + expect(cache.has(scope, { text: "text-only" })).toBe(true); + expect(cache.has(scope, { messageId: "id-only" })).toBe(true); + }); + + it("refreshes persisted echoes written after an earlier empty lookup", () => { + const cache = createSentMessageCache(); + const scope = "acct:imessage:+1555"; + expect(cache.has(scope, { messageId: "guid-late" })).toBe(false); + + const entry = { scope, messageId: "guid-late", timestamp: Date.now() }; + createIMessagePluginStateSyncStoreForTest({ + namespace: IMESSAGE_SENT_ECHOES_NAMESPACE, + maxEntries: IMESSAGE_SENT_ECHOES_MAX_ENTRIES, + }).register(resolveIMessageSentEchoEntryKey(entry), entry, { + ttlMs: IMESSAGE_SENT_ECHOES_TTL_MS, }); - const echoFile = path.join(stateDir, "imessage", "sent-echoes.jsonl"); - const echoDir = path.dirname(echoFile); - expect(fs.existsSync(echoFile)).toBe(true); + expect(cache.has(scope, { messageId: "guid-late" })).toBe(true); + }); - const fileMode = fs.statSync(echoFile).mode & 0o777; - const dirMode = fs.statSync(echoDir).mode & 0o777; - expect(fileMode).toBe(0o600); - expect(dirMode).toBe(0o700); + it("drops the in-memory mirror on persisted read failure so expired echoes do not match", () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-02-25T00:00:00Z")); + const scope = "acct:imessage:+1555"; + + rememberPersistedIMessageEcho({ scope, text: "stale echo" }); + expect(hasPersistedIMessageEcho({ scope, text: "stale echo" })).toBe(true); + + vi.advanceTimersByTime(IMESSAGE_SENT_ECHOES_TTL_MS + 1); + installIMessageFailingStateRuntimeForTest(); + + expect(hasPersistedIMessageEcho({ scope, text: "stale echo" })).toBe(false); }); it("retains entries written hours earlier so catchup replay sees own outbound rows", () => { @@ -132,10 +153,6 @@ describe("iMessage sent-message echo cache", () => { // rows around them — and the agent's replies to itself land back in the // inbound pipeline as if they were external sends. Regression guard for // the echo-cache retention extension that ships with #78649. - const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-ttl-")); - tempDirs.push(stateDir); - vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); - vi.useFakeTimers(); vi.setSystemTime(new Date("2026-05-08T12:00:00Z")); rememberPersistedIMessageEcho({ @@ -153,30 +170,4 @@ describe("iMessage sent-message echo cache", () => { ); expect(cache.has("acct:imessage:+1555", { messageId: "guid-pre-gap" })).toBe(true); }); - - it("clamps pre-existing sent-echoes.jsonl from older 0644/0755 to 0600/0700", () => { - // Older gateway versions wrote with default modes. After upgrade, the next - // remember must clamp the existing file/dir back to owner-only. - const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-echo-clamp-")); - tempDirs.push(stateDir); - vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); - - const imsgDir = path.join(stateDir, "imessage"); - fs.mkdirSync(imsgDir, { recursive: true, mode: 0o755 }); - const echoFile = path.join(imsgDir, "sent-echoes.jsonl"); - fs.writeFileSync(echoFile, "", { mode: 0o644 }); - fs.chmodSync(imsgDir, 0o755); - fs.chmodSync(echoFile, 0o644); - - rememberPersistedIMessageEcho({ - scope: "acct:imessage:+1555", - text: "clamp-test", - messageId: "guid-clamp", - }); - - const fileMode = fs.statSync(echoFile).mode & 0o777; - const dirMode = fs.statSync(imsgDir).mode & 0o777; - expect(fileMode).toBe(0o600); - expect(dirMode).toBe(0o700); - }); }); diff --git a/extensions/imessage/src/monitor/persisted-echo-cache.ts b/extensions/imessage/src/monitor/persisted-echo-cache.ts index 7b3c96d4143..9d97d13e79d 100644 --- a/extensions/imessage/src/monitor/persisted-echo-cache.ts +++ b/extensions/imessage/src/monitor/persisted-echo-cache.ts @@ -1,7 +1,7 @@ -import fs from "node:fs"; -import path from "node:path"; +import { createHash } from "node:crypto"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; -import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { getIMessageRuntime } from "../runtime.js"; type PersistedEchoEntry = { scope: string; @@ -16,32 +16,11 @@ type PersistedEchoEntry = { // a gateway gap would fall out of the dedupe set before catchup could replay // the inbound rows around them, and the agent's own messages would land back // in the inbound pipeline as if they were external sends. -const PERSISTED_ECHO_TTL_MS = 12 * 60 * 60 * 1000; -const MAX_PERSISTED_ECHO_ENTRIES = 256; +export const IMESSAGE_SENT_ECHOES_TTL_MS = 12 * 60 * 60 * 1000; +export const IMESSAGE_SENT_ECHOES_NAMESPACE = "imessage.sent-echoes"; +export const IMESSAGE_SENT_ECHOES_MAX_ENTRIES = 256; -// sent-echoes.jsonl carries scope keys + outbound message text + messageIds. -// A hostile same-UID process could otherwise (a) read the file to enumerate -// active conversations and outbound content, or (b) inject lines so a future -// inbound dedupe call wrongly suppresses a legitimate inbound message. Owner- -// only mode on both the directory and file closes that vector — defaults are -// 0755/0644 which are world-readable on a multi-user Mac. -const PERSISTED_ECHO_DIR_MODE = 0o700; -const PERSISTED_ECHO_FILE_MODE = 0o600; - -function resolvePersistedEchoPath(): string { - return path.join(resolveStateDir(), "imessage", "sent-echoes.jsonl"); -} - -function clampPersistedEchoModes(filePath: string): void { - // mkdirSync's mode is masked by umask and only applies on creation. If the - // dir or file already exists from an older gateway version, clamp now. - try { - fs.chmodSync(path.dirname(filePath), PERSISTED_ECHO_DIR_MODE); - fs.chmodSync(filePath, PERSISTED_ECHO_FILE_MODE); - } catch { - // best-effort — fs may not support chmod on every platform - } -} +type PersistedEchoStore = PluginStateSyncKeyedStore; function normalizeText(text: string | undefined): string | undefined { const normalized = text?.replace(/\r\n?/g, "\n").trim(); @@ -56,29 +35,7 @@ function normalizeMessageId(messageId: string | undefined): string | undefined { return normalized; } -function parseEntry(line: string): PersistedEchoEntry | null { - try { - const parsed = JSON.parse(line) as Partial; - if (typeof parsed.scope !== "string" || typeof parsed.timestamp !== "number") { - return null; - } - return { - scope: parsed.scope, - text: typeof parsed.text === "string" ? parsed.text : undefined, - messageId: typeof parsed.messageId === "string" ? parsed.messageId : undefined, - timestamp: parsed.timestamp, - }; - } catch { - return null; - } -} - -// In-memory mirror of the persisted file. The echo cache is consulted on -// every inbound message; without a cache, group-chat bursts trigger a -// readFileSync + JSON.parse for every member's reply. The mirror is -// invalidated by file mtime so concurrent gateway processes (rare) and -// post-restart hydrate still see fresh data. -let mirror: { entries: PersistedEchoEntry[]; mtimeMs: number } | null = null; +let mirror: PersistedEchoEntry[] | null = null; let persistenceFailureLogged = false; function reportFailure(scope: string, err: unknown): void { if (persistenceFailureLogged) { @@ -88,105 +45,54 @@ function reportFailure(scope: string, err: unknown): void { logVerbose(`imessage echo-cache: ${scope} disabled after first failure: ${String(err)}`); } -function loadMirrorIfStale(): void { - const filePath = resolvePersistedEchoPath(); - let mtimeMs: number; +export function resolveIMessageSentEchoEntryKey(entry: PersistedEchoEntry): string { + return createHash("sha256") + .update(JSON.stringify([entry.scope, entry.text ?? "", entry.messageId ?? "", entry.timestamp])) + .digest("hex") + .slice(0, 32); +} + +function openPersistedEchoStore(): PersistedEchoStore { + return getIMessageRuntime().state.openSyncKeyedStore({ + namespace: IMESSAGE_SENT_ECHOES_NAMESPACE, + maxEntries: IMESSAGE_SENT_ECHOES_MAX_ENTRIES, + }); +} + +function remainingTtlMs(timestamp: number): number | undefined { + const remaining = IMESSAGE_SENT_ECHOES_TTL_MS - Math.max(0, Date.now() - timestamp); + return remaining > 0 ? remaining : undefined; +} + +function loadMirrorFromStore(): void { try { - mtimeMs = fs.statSync(filePath).mtimeMs; - } catch (err) { - if ((err as NodeJS.ErrnoException)?.code !== "ENOENT") { - reportFailure("stat", err); - } - mirror = { entries: [], mtimeMs: 0 }; - return; - } - if (mirror && mirror.mtimeMs === mtimeMs) { - return; - } - let raw: string; - try { - raw = fs.readFileSync(filePath, "utf8"); + const cutoff = Date.now() - IMESSAGE_SENT_ECHOES_TTL_MS; + mirror = openPersistedEchoStore() + .entries() + .map(({ value }) => value) + .filter((entry) => entry.timestamp >= cutoff) + .toSorted((a, b) => a.timestamp - b.timestamp) + .slice(-IMESSAGE_SENT_ECHOES_MAX_ENTRIES); } catch (err) { reportFailure("read", err); - mirror = { entries: [], mtimeMs }; - return; + mirror = []; } - const cutoff = Date.now() - PERSISTED_ECHO_TTL_MS; - const entries = raw - .split(/\n+/) - .map(parseEntry) - .filter((entry): entry is PersistedEchoEntry => Boolean(entry && entry.timestamp >= cutoff)) - .slice(-MAX_PERSISTED_ECHO_ENTRIES); - mirror = { entries, mtimeMs }; } function readRecentEntries(): PersistedEchoEntry[] { - loadMirrorIfStale(); - return mirror?.entries ?? []; + loadMirrorFromStore(); + return mirror ?? []; } -// Trigger compaction once the on-disk file grows past 2x the cap or holds -// stale entries beyond the TTL window. Until then, every remember is an -// O(1) append rather than a full rewrite — group-chat bursts that send 5+ -// outbound messages back-to-back used to write the entire file 5+ times. -const COMPACT_AT_ENTRY_COUNT = MAX_PERSISTED_ECHO_ENTRIES * 2; - -function compactRecentEntries(entries: PersistedEchoEntry[]): void { - const filePath = resolvePersistedEchoPath(); - try { - fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: PERSISTED_ECHO_DIR_MODE }); - fs.writeFileSync( - filePath, - entries.map((entry) => JSON.stringify(entry)).join("\n") + (entries.length ? "\n" : ""), - { encoding: "utf8", mode: PERSISTED_ECHO_FILE_MODE }, - ); - clampPersistedEchoModes(filePath); - } catch (err) { - reportFailure("compact", err); - // Persistence failed; don't update the in-memory mirror so the next - // read still reflects what's actually on disk. +function persistEntry(entry: PersistedEchoEntry): void { + const ttlMs = remainingTtlMs(entry.timestamp); + if (!ttlMs) { return; } - // Update mirror to reflect what we just wrote, so the next has() call - // doesn't re-read the file we just authored. - let mtimeMs = 0; try { - mtimeMs = fs.statSync(filePath).mtimeMs; - } catch { - // ignore — stale mirror will refresh on next access - } - mirror = { entries: [...entries], mtimeMs }; -} - -function appendEntry(entry: PersistedEchoEntry): void { - const filePath = resolvePersistedEchoPath(); - try { - fs.mkdirSync(path.dirname(filePath), { recursive: true, mode: PERSISTED_ECHO_DIR_MODE }); - fs.appendFileSync(filePath, `${JSON.stringify(entry)}\n`, { - encoding: "utf8", - mode: PERSISTED_ECHO_FILE_MODE, - }); - // Always clamp — appendFileSync's `mode` only applies on creation, and - // an older gateway version may have left an existing 0644 file behind. - // chmod is microseconds; doing it every append keeps the security - // guarantee monotonic instead of conditional on creation order. - clampPersistedEchoModes(filePath); + openPersistedEchoStore().register(resolveIMessageSentEchoEntryKey(entry), entry, { ttlMs }); } catch (err) { - reportFailure("append", err); - return; - } - // Mirror stays in sync without re-reading the file: append our entry to - // the in-memory copy and bump the mtime to whatever the FS reports now. - let mtimeMs = 0; - try { - mtimeMs = fs.statSync(filePath).mtimeMs; - } catch { - // ignore - } - if (mirror) { - mirror = { entries: [...mirror.entries, entry], mtimeMs }; - } else { - mirror = { entries: [entry], mtimeMs }; + reportFailure("write", err); } } @@ -195,26 +101,23 @@ export function rememberPersistedIMessageEcho(params: { text?: string; messageId?: string; }): void { + const text = normalizeText(params.text); + const messageId = normalizeMessageId(params.messageId); const entry: PersistedEchoEntry = { scope: params.scope, - text: normalizeText(params.text), - messageId: normalizeMessageId(params.messageId), timestamp: Date.now(), + ...(text ? { text } : {}), + ...(messageId ? { messageId } : {}), }; if (!entry.text && !entry.messageId) { return; } - // Make sure the mirror reflects whatever's on disk before we decide - // whether a compaction is due. - loadMirrorIfStale(); - appendEntry(entry); - const total = mirror?.entries.length ?? 0; - const cutoff = Date.now() - PERSISTED_ECHO_TTL_MS; - const oldestStale = mirror?.entries[0] && mirror.entries[0].timestamp < cutoff; - if (total > COMPACT_AT_ENTRY_COUNT || oldestStale) { - const fresh = (mirror?.entries ?? []).filter((e) => e.timestamp >= cutoff); - compactRecentEntries(fresh.slice(-MAX_PERSISTED_ECHO_ENTRIES)); - } + loadMirrorFromStore(); + persistEntry(entry); + const cutoff = Date.now() - IMESSAGE_SENT_ECHOES_TTL_MS; + mirror = [...(mirror ?? []), entry] + .filter((candidate) => candidate.timestamp >= cutoff) + .slice(-IMESSAGE_SENT_ECHOES_MAX_ENTRIES); } export function hasPersistedIMessageEcho(params: { @@ -240,3 +143,18 @@ export function hasPersistedIMessageEcho(params: { } return false; } + +export function resetPersistedIMessageEchoCacheForTest( + options: { clearPersistent?: boolean } = {}, +): void { + mirror = null; + persistenceFailureLogged = false; + if (options.clearPersistent === false) { + return; + } + try { + openPersistedEchoStore().clear(); + } catch { + // best-effort + } +} diff --git a/extensions/imessage/src/monitor/self-chat-dedupe.test.ts b/extensions/imessage/src/monitor/self-chat-dedupe.test.ts index 4c5632b09bd..f94c130972a 100644 --- a/extensions/imessage/src/monitor/self-chat-dedupe.test.ts +++ b/extensions/imessage/src/monitor/self-chat-dedupe.test.ts @@ -1,7 +1,9 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { installIMessageStateRuntimeForTest } from "../test-support/runtime.js"; import { createSentMessageCache } from "./echo-cache.js"; import { resolveIMessageInboundDecision } from "./inbound-processing.js"; +import { resetPersistedIMessageEchoCacheForTest } from "./persisted-echo-cache.js"; import { createSelfChatCache } from "./self-chat-cache.js"; /** @@ -24,6 +26,11 @@ type InboundDecisionParams = Parameters[0 const cfg = {} as OpenClawConfig; +beforeEach(() => { + installIMessageStateRuntimeForTest(); + resetPersistedIMessageEchoCacheForTest(); +}); + function createParams( overrides: Omit, "message"> & { message?: Partial; diff --git a/extensions/imessage/src/runtime.ts b/extensions/imessage/src/runtime.ts index fb092c6093e..074831f287a 100644 --- a/extensions/imessage/src/runtime.ts +++ b/extensions/imessage/src/runtime.ts @@ -1,13 +1,13 @@ import type { PluginRuntime } from "openclaw/plugin-sdk/core"; import { createPluginRuntimeStore } from "openclaw/plugin-sdk/runtime-store"; -const { setRuntime: setIMessageRuntime, tryGetRuntime: getOptionalIMessageRuntime } = - createPluginRuntimeStore({ - pluginId: "imessage", - errorMessage: "iMessage runtime not initialized", - }); -// Only the optional accessor is exported: approval-reactions.ts opens a -// persistent keyed store best-effort and must never throw if the runtime has -// not yet bound. If a future caller genuinely needs a throwing accessor, -// re-export `getRuntime` here intentionally. -export { getOptionalIMessageRuntime, setIMessageRuntime }; +const { + clearRuntime: clearIMessageRuntime, + getRuntime: getIMessageRuntime, + setRuntime: setIMessageRuntime, + tryGetRuntime: getOptionalIMessageRuntime, +} = createPluginRuntimeStore({ + pluginId: "imessage", + errorMessage: "iMessage runtime not initialized", +}); +export { clearIMessageRuntime, getIMessageRuntime, getOptionalIMessageRuntime, setIMessageRuntime }; diff --git a/extensions/imessage/src/send.test.ts b/extensions/imessage/src/send.test.ts index 64e2617a56d..c02b080bf1f 100644 --- a/extensions/imessage/src/send.test.ts +++ b/extensions/imessage/src/send.test.ts @@ -1,7 +1,7 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { clearIMessageApprovalReactionTargetsForTest, resolveIMessageApprovalReactionTargetWithPersistence, @@ -11,8 +11,12 @@ import { findLatestIMessageEntryForChat, resetIMessageShortIdState, } from "./monitor-reply-cache.js"; -import { hasPersistedIMessageEcho } from "./monitor/persisted-echo-cache.js"; +import { + hasPersistedIMessageEcho, + resetPersistedIMessageEchoCacheForTest, +} from "./monitor/persisted-echo-cache.js"; import { sendMessageIMessage } from "./send.js"; +import { installIMessageStateRuntimeForTest } from "./test-support/runtime.js"; const IMESSAGE_TEST_CFG = { channels: { @@ -61,9 +65,16 @@ function createApprovalText(id = "approval-123"): string { } describe("sendMessageIMessage receipts", () => { + beforeEach(() => { + installIMessageStateRuntimeForTest(); + resetIMessageShortIdState(); + resetPersistedIMessageEchoCacheForTest(); + }); + afterEach(() => { clearIMessageApprovalReactionTargetsForTest(); resetIMessageShortIdState(); + resetPersistedIMessageEchoCacheForTest(); vi.restoreAllMocks(); vi.unstubAllEnvs(); vi.useRealTimers(); @@ -291,19 +302,18 @@ describe("sendMessageIMessage receipts", () => { expect(attachmentArgs[1]).toBe("--chat"); expect(attachmentArgs[2]).toBe("any;-;+15550004567"); expect(attachmentArgs.slice(3)).toEqual(["--file", "/tmp/image.png", "--transport", "auto"]); - expect( - findLatestIMessageEntryForChat({ - accountId: "default", - chatIdentifier: "any;-;+15550004567", - }), - ).toEqual( + const cachedEntry = findLatestIMessageEntryForChat({ + accountId: "default", + chatIdentifier: "any;-;+15550004567", + }); + expect(cachedEntry).toEqual( expect.objectContaining({ messageId: "p:0/dm-media-guid", - chatGuid: undefined, chatIdentifier: "any;-;+15550004567", isFromMe: true, }), ); + expect(cachedEntry).not.toHaveProperty("chatGuid"); expect(getClientMocks(client).request).not.toHaveBeenCalled(); }); diff --git a/extensions/imessage/src/state-migrations.test.ts b/extensions/imessage/src/state-migrations.test.ts new file mode 100644 index 00000000000..36a0ad5d7dd --- /dev/null +++ b/extensions/imessage/src/state-migrations.test.ts @@ -0,0 +1,294 @@ +import { createHash } from "node:crypto"; +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { resolveIMessageCatchupCursorKey } from "./monitor/catchup.js"; +import { detectIMessageLegacyStateMigrations } from "./state-migrations.js"; + +describe("detectIMessageLegacyStateMigrations", () => { + const tempDirs: string[] = []; + + afterEach(() => { + for (const dir of tempDirs.splice(0)) { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + function makeStateDir(): string { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-imsg-migration-")); + tempDirs.push(dir); + return dir; + } + + function legacyCatchupFilename(accountId: string): string { + return `${accountId}__${createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12)}.json`; + } + + it("imports reply, echo, and catchup sidecars into plugin state plans", async () => { + const stateDir = makeStateDir(); + const imsgDir = path.join(stateDir, "imessage"); + fs.mkdirSync(path.join(imsgDir, "catchup"), { recursive: true }); + fs.writeFileSync( + path.join(imsgDir, "reply-cache.jsonl"), + JSON.stringify({ + accountId: "default", + messageId: "guid-1", + shortId: "1", + timestamp: Date.now(), + chatIdentifier: "+15551234567", + }) + "\n", + ); + fs.writeFileSync( + path.join(imsgDir, "sent-echoes.jsonl"), + JSON.stringify({ + scope: "default:imessage:+15551234567", + text: "hello", + timestamp: Date.now(), + }) + "\n", + ); + fs.writeFileSync( + path.join(imsgDir, "catchup", "default__37a8eec1ce19.json"), + JSON.stringify({ + lastSeenMs: 1_700_000_000_000, + lastSeenRowid: 42, + updatedAt: 1_700_000_000_123, + }), + ); + + const plans = await detectIMessageLegacyStateMigrations({ + cfg: { channels: { imessage: { enabled: true } } } as never, + env: {}, + stateDir, + }); + + expect(plans.map((plan) => plan.label)).toEqual([ + "iMessage catchup cursor", + "iMessage reply short-id counter", + "iMessage reply short-id cache", + "iMessage sent-echo dedupe cache", + ]); + for (const plan of plans) { + expect(plan.kind).toBe("plugin-state-import"); + if (plan.kind !== "plugin-state-import") { + throw new Error("expected plugin-state-import plan"); + } + expect(plan.pluginId).toBe("imessage"); + if (plan.label !== "iMessage reply short-id counter") { + expect(plan.cleanupSource).toBe("rename"); + } + if ( + plan.label === "iMessage reply short-id cache" || + plan.label === "iMessage sent-echo dedupe cache" + ) { + expect(plan.cleanupWhenEmpty).toBe(true); + } + const entries = await plan.readEntries(); + expect(entries).toHaveLength(1); + } + + const catchupPlan = plans.find((plan) => plan.label === "iMessage catchup cursor"); + expect(catchupPlan?.kind).toBe("plugin-state-import"); + if (!catchupPlan || catchupPlan.kind !== "plugin-state-import") { + throw new Error("expected catchup plugin-state-import plan"); + } + const [catchupEntry] = await catchupPlan.readEntries(); + expect( + await catchupPlan.shouldReplaceExistingEntry?.({ + key: catchupEntry?.key ?? "", + existingValue: { lastSeenMs: 1_600_000_000_000, lastSeenRowid: 10, updatedAt: 0 }, + incomingValue: catchupEntry?.value, + }), + ).toBe(true); + expect( + await catchupPlan.shouldReplaceExistingEntry?.({ + key: catchupEntry?.key ?? "", + existingValue: { lastSeenMs: 1_800_000_000_000, lastSeenRowid: 99, updatedAt: 0 }, + incomingValue: catchupEntry?.value, + }), + ).toBe(false); + + const counterPlan = plans.find((plan) => plan.label === "iMessage reply short-id counter"); + expect(counterPlan?.kind).toBe("plugin-state-import"); + if (!counterPlan || counterPlan.kind !== "plugin-state-import") { + throw new Error("expected reply counter plugin-state-import plan"); + } + expect( + await counterPlan.shouldReplaceExistingEntry?.({ + key: "short-id-counter", + existingValue: { counter: 0 }, + incomingValue: { counter: 1 }, + }), + ).toBe(true); + expect( + await counterPlan.shouldReplaceExistingEntry?.({ + key: "short-id-counter", + existingValue: { counter: 2 }, + incomingValue: { counter: 1 }, + }), + ).toBe(false); + }); + + it("leaves unreadable reply-cache sidecars for a later migration attempt", async () => { + const stateDir = makeStateDir(); + const imsgDir = path.join(stateDir, "imessage"); + fs.mkdirSync(imsgDir, { recursive: true }); + const sourcePath = path.join(imsgDir, "reply-cache.jsonl"); + fs.writeFileSync(sourcePath, "\n"); + + const plans = await detectIMessageLegacyStateMigrations({ + cfg: { channels: { imessage: { enabled: true } } } as never, + env: {}, + stateDir, + }); + const replyPlan = plans.find((plan) => plan.label === "iMessage reply short-id cache"); + expect(replyPlan?.kind).toBe("plugin-state-import"); + if (!replyPlan || replyPlan.kind !== "plugin-state-import") { + throw new Error("expected reply cache plugin-state-import plan"); + } + fs.rmSync(sourcePath); + + expect(() => replyPlan.readEntries()).toThrow("Failed reading"); + }); + + it("keeps the latest live reply-cache row for duplicate message ids", async () => { + const stateDir = makeStateDir(); + const imsgDir = path.join(stateDir, "imessage"); + fs.mkdirSync(imsgDir, { recursive: true }); + const now = Date.now(); + fs.writeFileSync( + path.join(imsgDir, "reply-cache.jsonl"), + [ + JSON.stringify({ + accountId: "default", + messageId: "guid-dup", + shortId: "1", + timestamp: now - 2_000, + }), + JSON.stringify({ + accountId: "default", + messageId: "guid-dup", + shortId: "7", + timestamp: now - 1_000, + }), + ].join("\n"), + ); + + const plans = await detectIMessageLegacyStateMigrations({ + cfg: { channels: { imessage: { enabled: true } } } as never, + env: {}, + stateDir, + }); + const replyPlan = plans.find((plan) => plan.label === "iMessage reply short-id cache"); + const counterPlan = plans.find((plan) => plan.label === "iMessage reply short-id counter"); + if (!replyPlan || replyPlan.kind !== "plugin-state-import") { + throw new Error("expected reply cache plugin-state-import plan"); + } + if (!counterPlan || counterPlan.kind !== "plugin-state-import") { + throw new Error("expected reply counter plugin-state-import plan"); + } + + const replyEntries = await replyPlan.readEntries(); + const counterEntries = await counterPlan.readEntries(); + expect(replyEntries).toHaveLength(1); + const replyEntry = replyEntries[0]; + if (!replyEntry) { + throw new Error("expected reply cache entry"); + } + expect((replyEntry.value as { shortId?: string }).shortId).toBe("7"); + expect(counterEntries[0]?.value).toEqual({ counter: 7 }); + }); + + it("archives catchup cursor files that do not match configured accounts", async () => { + const stateDir = makeStateDir(); + const catchupDir = path.join(stateDir, "imessage", "catchup"); + fs.mkdirSync(catchupDir, { recursive: true }); + const sourcePath = path.join(catchupDir, "removed-account__123456789abc.json"); + fs.writeFileSync(sourcePath, JSON.stringify({ lastSeenMs: 1, lastSeenRowid: 2 })); + + const plans = await detectIMessageLegacyStateMigrations({ + cfg: { channels: { imessage: { enabled: true } } } as never, + env: {}, + stateDir, + }); + + const orphanPlan = plans.find((plan) => plan.label === "iMessage orphan catchup cursor"); + expect(orphanPlan).toMatchObject({ + kind: "plugin-state-import", + sourcePath, + cleanupSource: "rename", + cleanupWhenEmpty: true, + }); + if (!orphanPlan || orphanPlan.kind !== "plugin-state-import") { + throw new Error("expected orphan catchup plugin-state-import plan"); + } + expect(await orphanPlan.readEntries()).toEqual([]); + }); + + it("normalizes configured account ids before importing catchup cursor files", async () => { + const stateDir = makeStateDir(); + const catchupDir = path.join(stateDir, "imessage", "catchup"); + fs.mkdirSync(catchupDir, { recursive: true }); + const sourcePath = path.join(catchupDir, legacyCatchupFilename("work")); + fs.writeFileSync(sourcePath, JSON.stringify({ lastSeenMs: 1, lastSeenRowid: 2 })); + + const plans = await detectIMessageLegacyStateMigrations({ + cfg: { + channels: { + imessage: { + enabled: true, + accounts: { + Work: { cliPath: "imsg-work" }, + }, + }, + }, + } as never, + env: {}, + stateDir, + }); + + expect(plans.map((plan) => plan.label)).toEqual(["iMessage catchup cursor"]); + const [plan] = plans; + expect(plan?.kind).toBe("plugin-state-import"); + if (!plan || plan.kind !== "plugin-state-import") { + throw new Error("expected catchup plugin-state-import plan"); + } + expect(plan.sourcePath).toBe(sourcePath); + const [entry] = await plan.readEntries(); + expect(entry?.key).toBe(resolveIMessageCatchupCursorKey("work")); + }); + + it("caps imported catchup retry maps for plugin-state value limits", async () => { + const stateDir = makeStateDir(); + const catchupDir = path.join(stateDir, "imessage", "catchup"); + fs.mkdirSync(catchupDir, { recursive: true }); + fs.writeFileSync( + path.join(catchupDir, "default__37a8eec1ce19.json"), + JSON.stringify({ + lastSeenMs: 1, + lastSeenRowid: 2, + failureRetries: Object.fromEntries( + Array.from({ length: 800 }, (_, index) => [ + `GUID-${index}-${"x".repeat(120)}`, + index + 1, + ]), + ), + }), + ); + + const plans = await detectIMessageLegacyStateMigrations({ + cfg: { channels: { imessage: { enabled: true } } } as never, + env: {}, + stateDir, + }); + const catchupPlan = plans.find((plan) => plan.label === "iMessage catchup cursor"); + if (!catchupPlan || catchupPlan.kind !== "plugin-state-import") { + throw new Error("expected catchup plugin-state-import plan"); + } + + const [entry] = await catchupPlan.readEntries(); + expect(new TextEncoder().encode(JSON.stringify(entry?.value)).byteLength).toBeLessThanOrEqual( + 65_536, + ); + }); +}); diff --git a/extensions/imessage/src/state-migrations.ts b/extensions/imessage/src/state-migrations.ts new file mode 100644 index 00000000000..54cf09b492d --- /dev/null +++ b/extensions/imessage/src/state-migrations.ts @@ -0,0 +1,438 @@ +import { createHash } from "node:crypto"; +import fs from "node:fs"; +import path from "node:path"; +import type { ChannelLegacyStateMigrationPlan } from "openclaw/plugin-sdk/channel-contract"; +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; +import { statRegularFileSync } from "openclaw/plugin-sdk/security-runtime"; +import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { uniqueStrings } from "openclaw/plugin-sdk/string-coerce-runtime"; +import { + listIMessageAccountIds, + resolveDefaultIMessageAccountId, + resolveIMessageAccount, +} from "./accounts.js"; +import { + IMESSAGE_REPLY_CACHE_MAX_ENTRIES, + IMESSAGE_REPLY_CACHE_COUNTER_KEY, + IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES, + IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE, + IMESSAGE_REPLY_CACHE_NAMESPACE, + resolveIMessageReplyCacheEntryKey, +} from "./monitor-reply-cache.js"; +import { + capFailureRetriesMap, + IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES, + IMESSAGE_CATCHUP_CURSOR_NAMESPACE, + resolveIMessageCatchupCursorKey, + type IMessageCatchupCursor, +} from "./monitor/catchup.js"; +import { + IMESSAGE_SENT_ECHOES_MAX_ENTRIES, + IMESSAGE_SENT_ECHOES_NAMESPACE, + IMESSAGE_SENT_ECHOES_TTL_MS, + resolveIMessageSentEchoEntryKey, +} from "./monitor/persisted-echo-cache.js"; + +type ReplyCacheEntry = { + accountId: string; + messageId: string; + shortId: string; + timestamp: number; + chatGuid?: string; + chatIdentifier?: string; + chatId?: number; + isFromMe?: boolean; +}; + +type SentEchoEntry = { + scope: string; + text?: string; + messageId?: string; + timestamp: number; +}; + +const REPLY_CACHE_TTL_MS = 6 * 60 * 60 * 1000; + +function fileExists(pathValue: string): boolean { + try { + return !statRegularFileSync(pathValue).missing; + } catch { + return false; + } +} + +function resolveMigrationStateDir(params: { env: NodeJS.ProcessEnv; stateDir?: string }): string { + return params.stateDir ?? resolveStateDir(params.env); +} + +function remainingTtlMs(timestamp: number, ttlMs: number): number | undefined { + const remaining = ttlMs - Math.max(0, Date.now() - timestamp); + return remaining > 0 ? remaining : undefined; +} + +function readJsonl(pathValue: string): unknown[] { + try { + return fs + .readFileSync(pathValue, "utf8") + .split(/\n+/) + .flatMap((line) => { + if (!line) { + return []; + } + try { + return [JSON.parse(line) as unknown]; + } catch { + return []; + } + }); + } catch (err) { + throw new Error(`Failed reading ${pathValue}: ${String(err)}`, { cause: err }); + } +} + +function parseReplyCacheEntry(raw: unknown): ReplyCacheEntry | null { + if (!raw || typeof raw !== "object") { + return null; + } + const parsed = raw as Partial; + if ( + typeof parsed.accountId !== "string" || + typeof parsed.messageId !== "string" || + typeof parsed.shortId !== "string" || + typeof parsed.timestamp !== "number" + ) { + return null; + } + return { + accountId: parsed.accountId, + messageId: parsed.messageId, + shortId: parsed.shortId, + timestamp: parsed.timestamp, + ...(typeof parsed.chatGuid === "string" ? { chatGuid: parsed.chatGuid } : {}), + ...(typeof parsed.chatIdentifier === "string" ? { chatIdentifier: parsed.chatIdentifier } : {}), + ...(typeof parsed.chatId === "number" ? { chatId: parsed.chatId } : {}), + ...(typeof parsed.isFromMe === "boolean" ? { isFromMe: parsed.isFromMe } : {}), + }; +} + +function readReplyCacheMaxShortId(sourcePath: string): number { + let max = 0; + for (const raw of readJsonl(sourcePath)) { + if (!raw || typeof raw !== "object") { + continue; + } + const shortId = (raw as { shortId?: unknown }).shortId; + if (typeof shortId !== "string") { + continue; + } + const numeric = Number.parseInt(shortId, 10); + if (Number.isFinite(numeric) && numeric > max) { + max = numeric; + } + } + return max; +} + +function readReplyCounterValue(value: unknown): number | null { + if (!value || typeof value !== "object") { + return null; + } + const counter = (value as { counter?: unknown }).counter; + return typeof counter === "number" && Number.isFinite(counter) ? counter : null; +} + +function shouldReplaceReplyCounter(existingValue: unknown, incomingValue: unknown): boolean { + const incomingCounter = readReplyCounterValue(incomingValue); + if (incomingCounter === null) { + return false; + } + const existingCounter = readReplyCounterValue(existingValue); + return existingCounter === null || incomingCounter > existingCounter; +} + +function parseSentEchoEntry(raw: unknown): SentEchoEntry | null { + if (!raw || typeof raw !== "object") { + return null; + } + const parsed = raw as Partial; + if (typeof parsed.scope !== "string" || typeof parsed.timestamp !== "number") { + return null; + } + return { + scope: parsed.scope, + timestamp: parsed.timestamp, + ...(typeof parsed.text === "string" ? { text: parsed.text } : {}), + ...(typeof parsed.messageId === "string" ? { messageId: parsed.messageId } : {}), + }; +} + +function listReplyCacheEntries(sourcePath: string): Array<{ + key: string; + value: ReplyCacheEntry; + ttlMs?: number; +}> { + const entriesByKey = new Map(); + for (const entry of readJsonl(sourcePath).map(parseReplyCacheEntry)) { + if (!entry) { + continue; + } + const ttlMs = remainingTtlMs(entry.timestamp, REPLY_CACHE_TTL_MS); + if (!ttlMs) { + continue; + } + const key = resolveIMessageReplyCacheEntryKey(entry.messageId); + entriesByKey.delete(key); + entriesByKey.set(key, { value: entry, ttlMs }); + } + return [...entriesByKey.entries()] + .slice(-IMESSAGE_REPLY_CACHE_MAX_ENTRIES) + .map(([key, entry]) => ({ key, value: entry.value, ttlMs: entry.ttlMs })); +} + +function listSentEchoEntries(sourcePath: string): Array<{ + key: string; + value: SentEchoEntry; + ttlMs?: number; +}> { + return readJsonl(sourcePath) + .map(parseSentEchoEntry) + .filter((entry): entry is SentEchoEntry => Boolean(entry)) + .slice(-IMESSAGE_SENT_ECHOES_MAX_ENTRIES) + .flatMap((entry) => { + const ttlMs = remainingTtlMs(entry.timestamp, IMESSAGE_SENT_ECHOES_TTL_MS); + if (!ttlMs) { + return []; + } + return [{ key: resolveIMessageSentEchoEntryKey(entry), value: entry, ttlMs }]; + }); +} + +function resolveLegacyCatchupCursorPath(stateDir: string, accountId: string): string { + const safePrefix = accountId.replace(/[^a-zA-Z0-9_-]/g, "_") || "account"; + const hash = createHash("sha256").update(accountId, "utf8").digest("hex").slice(0, 12); + return path.join(stateDir, "imessage", "catchup", `${safePrefix}__${hash}.json`); +} + +function listLegacyCatchupCursorPaths(stateDir: string): string[] { + const dir = path.join(stateDir, "imessage", "catchup"); + try { + return fs + .readdirSync(dir, { withFileTypes: true }) + .filter((entry) => entry.isFile() && entry.name.endsWith(".json")) + .map((entry) => path.join(dir, entry.name)); + } catch { + return []; + } +} + +function normalizeCatchupCursor(raw: unknown): IMessageCatchupCursor | null { + if (!raw || typeof raw !== "object") { + return null; + } + const value = raw as Partial; + if ( + typeof value.lastSeenMs !== "number" || + !Number.isFinite(value.lastSeenMs) || + typeof value.lastSeenRowid !== "number" || + !Number.isFinite(value.lastSeenRowid) + ) { + return null; + } + const failureRetries = sanitizeCatchupFailureRetries(value.failureRetries); + const hasRetries = Object.keys(failureRetries).length > 0; + return { + lastSeenMs: value.lastSeenMs, + lastSeenRowid: value.lastSeenRowid, + updatedAt: typeof value.updatedAt === "number" ? value.updatedAt : 0, + ...(hasRetries ? { failureRetries } : {}), + }; +} + +function readCatchupCursor(sourcePath: string): IMessageCatchupCursor { + let parsed: unknown; + try { + parsed = JSON.parse(fs.readFileSync(sourcePath, "utf8")) as unknown; + } catch (err) { + throw new Error(`Failed reading ${sourcePath}: ${String(err)}`, { cause: err }); + } + const cursor = normalizeCatchupCursor(parsed); + if (!cursor) { + throw new Error(`Invalid iMessage catchup cursor: ${sourcePath}`); + } + return cursor; +} + +function sanitizeCatchupFailureRetries(raw: unknown): Record { + if (!raw || typeof raw !== "object") { + return {}; + } + const out: Record = {}; + for (const [guid, count] of Object.entries(raw as Record)) { + if (typeof count === "number" && Number.isFinite(count) && count > 0) { + out[guid] = Math.floor(count); + } + } + return capFailureRetriesMap(out); +} + +function shouldReplaceCatchupCursor(existingValue: unknown, incomingValue: unknown): boolean { + const incoming = normalizeCatchupCursor(incomingValue); + if (!incoming) { + return false; + } + const existing = normalizeCatchupCursor(existingValue); + return ( + !existing || + incoming.lastSeenRowid > existing.lastSeenRowid || + (incoming.lastSeenRowid === existing.lastSeenRowid && incoming.lastSeenMs > existing.lastSeenMs) + ); +} + +function detectReplyCacheMigration(params: { + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const stateDir = resolveMigrationStateDir(params); + const sourcePath = path.join(stateDir, "imessage", "reply-cache.jsonl"); + if (!fileExists(sourcePath)) { + return []; + } + const plans: ChannelLegacyStateMigrationPlan[] = []; + plans.push({ + kind: "plugin-state-import", + label: "iMessage reply short-id counter", + sourcePath, + targetPath: `plugin state:${IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE}`, + pluginId: "imessage", + namespace: IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE, + maxEntries: IMESSAGE_REPLY_CACHE_COUNTER_MAX_ENTRIES, + scopeKey: "", + stateDir, + preview: `- iMessage reply short-id counter: ${sourcePath} → plugin state (${IMESSAGE_REPLY_CACHE_COUNTER_NAMESPACE})`, + readEntries: () => { + const maxShortId = readReplyCacheMaxShortId(sourcePath); + return maxShortId > 0 + ? [{ key: IMESSAGE_REPLY_CACHE_COUNTER_KEY, value: { counter: maxShortId } }] + : []; + }, + shouldReplaceExistingEntry: ({ existingValue, incomingValue }) => + shouldReplaceReplyCounter(existingValue, incomingValue), + }); + plans.push({ + kind: "plugin-state-import", + label: "iMessage reply short-id cache", + sourcePath, + targetPath: `plugin state:${IMESSAGE_REPLY_CACHE_NAMESPACE}`, + pluginId: "imessage", + namespace: IMESSAGE_REPLY_CACHE_NAMESPACE, + maxEntries: IMESSAGE_REPLY_CACHE_MAX_ENTRIES, + scopeKey: "", + stateDir, + cleanupSource: "rename", + cleanupWhenEmpty: true, + preview: `- iMessage reply short-id cache: ${sourcePath} → plugin state (${IMESSAGE_REPLY_CACHE_NAMESPACE})`, + readEntries: () => listReplyCacheEntries(sourcePath), + }); + return plans; +} + +function detectSentEchoMigration(params: { + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const stateDir = resolveMigrationStateDir(params); + const sourcePath = path.join(stateDir, "imessage", "sent-echoes.jsonl"); + if (!fileExists(sourcePath)) { + return []; + } + return [ + { + kind: "plugin-state-import", + label: "iMessage sent-echo dedupe cache", + sourcePath, + targetPath: `plugin state:${IMESSAGE_SENT_ECHOES_NAMESPACE}`, + pluginId: "imessage", + namespace: IMESSAGE_SENT_ECHOES_NAMESPACE, + maxEntries: IMESSAGE_SENT_ECHOES_MAX_ENTRIES, + scopeKey: "", + stateDir, + cleanupSource: "rename", + cleanupWhenEmpty: true, + preview: `- iMessage sent-echo dedupe cache: ${sourcePath} → plugin state (${IMESSAGE_SENT_ECHOES_NAMESPACE})`, + readEntries: () => listSentEchoEntries(sourcePath), + }, + ]; +} + +function detectCatchupCursorMigrations(params: { + cfg: OpenClawConfig; + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const stateDir = resolveMigrationStateDir(params); + const accountIds = uniqueStrings( + [resolveDefaultIMessageAccountId(params.cfg), ...listIMessageAccountIds(params.cfg)].map( + (accountId) => resolveIMessageAccount({ cfg: params.cfg, accountId }).accountId, + ), + ); + const configuredPaths = new Set( + accountIds.map((accountId) => resolveLegacyCatchupCursorPath(stateDir, accountId)), + ); + const configuredPlans = accountIds.flatMap((accountId) => { + const sourcePath = resolveLegacyCatchupCursorPath(stateDir, accountId); + if (!fileExists(sourcePath)) { + return []; + } + return { + kind: "plugin-state-import" as const, + label: "iMessage catchup cursor", + sourcePath, + targetPath: `plugin state:${IMESSAGE_CATCHUP_CURSOR_NAMESPACE}`, + pluginId: "imessage", + namespace: IMESSAGE_CATCHUP_CURSOR_NAMESPACE, + maxEntries: IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES, + scopeKey: "", + stateDir, + cleanupSource: "rename" as const, + preview: `- iMessage catchup cursor: ${sourcePath} → plugin state (${IMESSAGE_CATCHUP_CURSOR_NAMESPACE})`, + readEntries: () => { + const cursor = readCatchupCursor(sourcePath); + return [{ key: resolveIMessageCatchupCursorKey(accountId), value: cursor }]; + }, + shouldReplaceExistingEntry: (replaceParams: { + existingValue: unknown; + incomingValue: unknown; + }) => shouldReplaceCatchupCursor(replaceParams.existingValue, replaceParams.incomingValue), + }; + }); + const orphanPlans = listLegacyCatchupCursorPaths(stateDir) + .filter((sourcePath) => !configuredPaths.has(sourcePath)) + .map((sourcePath) => ({ + kind: "plugin-state-import" as const, + label: "iMessage orphan catchup cursor", + sourcePath, + targetPath: `plugin state:${IMESSAGE_CATCHUP_CURSOR_NAMESPACE}`, + pluginId: "imessage", + namespace: IMESSAGE_CATCHUP_CURSOR_NAMESPACE, + maxEntries: IMESSAGE_CATCHUP_CURSOR_MAX_ENTRIES, + scopeKey: "", + stateDir, + cleanupSource: "rename" as const, + cleanupWhenEmpty: true, + preview: `- iMessage orphan catchup cursor: ${sourcePath} → archived legacy state`, + readEntries: () => [], + })); + return [...configuredPlans, ...orphanPlans]; +} + +export async function detectIMessageLegacyStateMigrations(params: { + cfg: OpenClawConfig; + env: NodeJS.ProcessEnv; + stateDir?: string; +}): Promise { + return [ + ...detectCatchupCursorMigrations(params), + ...detectReplyCacheMigration(params), + ...detectSentEchoMigration(params), + ]; +} diff --git a/extensions/imessage/src/test-support/runtime.ts b/extensions/imessage/src/test-support/runtime.ts new file mode 100644 index 00000000000..3b0cf1f29f0 --- /dev/null +++ b/extensions/imessage/src/test-support/runtime.ts @@ -0,0 +1,65 @@ +import fs from "node:fs"; +import path from "node:path"; +import type { + OpenKeyedStoreOptions, + PluginStateSyncKeyedStore, +} from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + createPluginStateKeyedStoreForTests, + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import type { PluginRuntime } from "openclaw/plugin-sdk/runtime-store"; +import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; +import { setIMessageRuntime } from "../runtime.js"; + +function createIMessageTestEnv(): NodeJS.ProcessEnv { + const stateDir = fs.mkdtempSync( + path.join(resolvePreferredOpenClawTmpDir(), "openclaw-imessage-state-"), + ); + return { ...process.env, OPENCLAW_STATE_DIR: stateDir }; +} + +let imessageTestEnv = createIMessageTestEnv(); + +export function createIMessagePluginStateSyncStoreForTest( + options: OpenKeyedStoreOptions, +): PluginStateSyncKeyedStore { + return createPluginStateSyncKeyedStoreForTests("imessage", { + ...options, + env: imessageTestEnv, + }); +} + +export function installIMessageStateRuntimeForTest(): void { + imessageTestEnv = createIMessageTestEnv(); + resetPluginStateStoreForTests(); + setIMessageRuntime({ + state: { + openKeyedStore: ((options) => + createPluginStateKeyedStoreForTests("imessage", { + ...options, + env: imessageTestEnv, + })) as PluginRuntime["state"]["openKeyedStore"], + openSyncKeyedStore: ((options) => + createIMessagePluginStateSyncStoreForTest( + options, + )) as PluginRuntime["state"]["openSyncKeyedStore"], + }, + channel: {}, + } as PluginRuntime); +} + +export function installIMessageFailingStateRuntimeForTest(): void { + setIMessageRuntime({ + state: { + openKeyedStore: (() => { + throw new Error("test plugin-state failure"); + }) as PluginRuntime["state"]["openKeyedStore"], + openSyncKeyedStore: (() => { + throw new Error("test plugin-state failure"); + }) as PluginRuntime["state"]["openSyncKeyedStore"], + }, + channel: {}, + } as PluginRuntime); +} diff --git a/src/channels/message/durable-receive.test.ts b/src/channels/message/durable-receive.test.ts index 32151adb7e8..cae29288793 100644 --- a/src/channels/message/durable-receive.test.ts +++ b/src/channels/message/durable-receive.test.ts @@ -42,6 +42,15 @@ function createMemoryStore(): PluginStateKeyedStore { values.set(key, { key, value, createdAt: Date.now() }); return true; }, + async update(key, updateValue) { + const next = updateValue(values.get(key)?.value); + if (next === undefined) { + return false; + } + assertNoUndefinedFields(next); + values.set(key, { key, value: next, createdAt: Date.now() }); + return true; + }, async lookup(key) { return values.get(key)?.value; }, @@ -146,6 +155,9 @@ describe("createDurableInboundReceiveJournal", () => { async registerIfAbsent() { return false; }, + async update() { + return false; + }, async lookup() { return undefined; }, @@ -167,6 +179,9 @@ describe("createDurableInboundReceiveJournal", () => { async registerIfAbsent() { return false; }, + async update() { + return false; + }, async lookup() { completedLookups += 1; return completedLookups === 2 @@ -213,6 +228,9 @@ describe("createDurableInboundReceiveJournal", () => { async registerIfAbsent() { return false; }, + async update() { + return false; + }, async lookup() { completedLookups += 1; return completedLookups === 2 diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 564e2efcc79..5ad83d37dfc 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -172,6 +172,7 @@ export type ChannelLegacyStateMigrationPlan = scopeKey: string; stateDir?: string; cleanupSource?: "rename"; + cleanupWhenEmpty?: boolean; preview?: string; shouldReplaceExistingEntry?: (params: { key: string; diff --git a/src/cli/program/config-guard.test.ts b/src/cli/program/config-guard.test.ts index aaff24e81b0..095b8949d95 100644 --- a/src/cli/program/config-guard.test.ts +++ b/src/cli/program/config-guard.test.ts @@ -215,6 +215,9 @@ describe("ensureConfigReady", () => { ["Telegram sticker cache", "telegram/sticker-cache.json"], ["Telegram thread bindings", "telegram/thread-bindings-default.json"], ["Telegram pairing allowFrom", "credentials/telegram-allowFrom.json"], + ["iMessage reply short-id cache", "imessage/reply-cache.jsonl"], + ["iMessage sent echo cache", "imessage/sent-echoes.jsonl"], + ["iMessage catchup cursor", "imessage/catchup/default.json"], ["WhatsApp root auth", "credentials/creds.json"], ])("runs doctor flow for bundled channel legacy state: %s", async (_label, relativePath) => { const root = useTempOpenClawHome(); diff --git a/src/cli/program/config-guard.ts b/src/cli/program/config-guard.ts index 6a6395f78ab..9bc1875004a 100644 --- a/src/cli/program/config-guard.ts +++ b/src/cli/program/config-guard.ts @@ -66,6 +66,14 @@ function isLegacyTelegramStateFile(name: string): boolean { ); } +function hasLegacyIMessageStateFiles(stateDir: string): boolean { + return ( + fileOrDirExists(path.join(stateDir, "imessage", "reply-cache.jsonl")) || + fileOrDirExists(path.join(stateDir, "imessage", "sent-echoes.jsonl")) || + dirHasFile(path.join(stateDir, "imessage", "catchup"), (name) => name.endsWith(".json")) + ); +} + function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir: string): boolean { if (fileOrDirExists(path.join(stateDir, "discord", "model-picker-preferences.json"))) { return true; @@ -73,6 +81,9 @@ function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir: if (dirHasFile(path.join(stateDir, "feishu", "dedup"), (name) => name.endsWith(".json"))) { return true; } + if (hasLegacyIMessageStateFiles(stateDir)) { + return true; + } if ( fileOrDirExists(path.join(oauthDir, "telegram-allowFrom.json")) || dirHasFile(path.join(stateDir, "telegram"), isLegacyTelegramStateFile) diff --git a/src/commands/doctor-state-migrations.test.ts b/src/commands/doctor-state-migrations.test.ts index 30506ae6d8b..dd214714557 100644 --- a/src/commands/doctor-state-migrations.test.ts +++ b/src/commands/doctor-state-migrations.test.ts @@ -1008,6 +1008,85 @@ describe("doctor legacy state migrations", () => { }); }); + it("archives empty plugin-state import sources when the channel plan asks for cleanup", async () => { + const root = await makeTempRoot(); + const sourceDir = path.join(root, "imessage"); + fs.mkdirSync(sourceDir, { recursive: true }); + const sourcePath = path.join(sourceDir, "reply-cache.jsonl"); + fs.writeFileSync(sourcePath, "expired\n", "utf-8"); + if (process.platform !== "win32") { + fs.chmodSync(sourceDir, 0o755); + fs.chmodSync(sourcePath, 0o644); + } + mockedChannelMigrationPlans.plans = [ + { + kind: "plugin-state-import", + label: "Test expired cache", + sourcePath, + targetPath: "plugin state:test.expired-cache", + pluginId: "telegram", + namespace: "test.expired-cache", + maxEntries: 4, + scopeKey: "", + cleanupSource: "rename", + cleanupWhenEmpty: true, + readEntries: () => [], + }, + ]; + + const detected = await detectLegacyStateMigrations({ + cfg: {}, + env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv, + }); + const result = await runLegacyStateMigrations({ detected }); + + expect(result.warnings).toStrictEqual([]); + expect(result.changes).toContain( + `Archived Test expired cache legacy source → ${sourcePath}.migrated`, + ); + expect(fs.existsSync(sourcePath)).toBe(false); + expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(true); + if (process.platform !== "win32") { + expect(fs.statSync(`${sourcePath}.migrated`).mode & 0o777).toBe(0o600); + } + }); + + it("keeps plugin-state import sources when reading entries fails", async () => { + const root = await makeTempRoot(); + const sourcePath = path.join(root, "legacy-cache.json"); + fs.writeFileSync(sourcePath, "legacy", "utf-8"); + mockedChannelMigrationPlans.plans = [ + { + kind: "plugin-state-import", + label: "Test unreadable cache", + sourcePath, + targetPath: "plugin state:test.unreadable-cache", + pluginId: "telegram", + namespace: "test.unreadable-cache", + maxEntries: 4, + scopeKey: "", + cleanupSource: "rename", + cleanupWhenEmpty: true, + readEntries: () => { + throw new Error("read failed"); + }, + }, + ]; + + const detected = await detectLegacyStateMigrations({ + cfg: {}, + env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv, + }); + const result = await runLegacyStateMigrations({ detected }); + + expect(result.changes).toStrictEqual([]); + expect(result.warnings).toStrictEqual([ + "Failed reading Test unreadable cache legacy source: Error: read failed", + ]); + expect(fs.existsSync(sourcePath)).toBe(true); + expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(false); + }); + it("keeps plugin-state import source when plugin cap eviction drops an imported row", async () => { const root = await makeTempRoot(); const maxPluginStateEntries = 40; diff --git a/src/gateway/server.sessions.list-changed.test.ts b/src/gateway/server.sessions.list-changed.test.ts index 3297aa06f27..2e93f671bb7 100644 --- a/src/gateway/server.sessions.list-changed.test.ts +++ b/src/gateway/server.sessions.list-changed.test.ts @@ -125,7 +125,7 @@ test("sessions.list keeps bulk rows lightweight and uses persisted model fields" "dashboard:child": sessionStoreEntry("sess-child", { updatedAt: Date.now() - 1_000, modelProvider: "anthropic", - model: "claude-sonnet-4-6", + model: "test-model-without-catalog-context", parentSessionKey: "agent:main:main", totalTokens: 0, totalTokensFresh: false, @@ -161,10 +161,10 @@ test("sessions.list keeps bulk rows lightweight and uses persisted model fields" expect(child?.parentSessionKey).toBe("agent:main:main"); expect(child?.totalTokens).toBeUndefined(); expect(child?.totalTokensFresh).toBe(false); - expect(child?.contextTokens).toBe(1_048_576); + expect(child?.contextTokens).toBeUndefined(); expect(child?.estimatedCostUsd).toBeUndefined(); expect(child?.modelProvider).toBe("anthropic"); - expect(child?.model).toBe("claude-sonnet-4-6"); + expect(child?.model).toBe("test-model-without-catalog-context"); ws.close(); }); diff --git a/src/infra/state-migrations.ts b/src/infra/state-migrations.ts index fe1e0f10b73..a3800599d4e 100644 --- a/src/infra/state-migrations.ts +++ b/src/infra/state-migrations.ts @@ -317,6 +317,51 @@ function archiveLegacyTaskStateSidecar(params: { ); } +function hardenLegacyImportSource(params: { + sourcePath: string; + label: string; + warnings: string[]; +}): boolean { + try { + fs.chmodSync(params.sourcePath, 0o600); + return true; + } catch (err) { + params.warnings.push(`Failed securing ${params.label} legacy source: ${String(err)}`); + return false; + } +} + +function archiveLegacyImportSource(params: { + sourcePath: string; + label: string; + changes: string[]; + warnings: string[]; +}): void { + const archivedPath = `${params.sourcePath}.migrated`; + if (fileExists(archivedPath)) { + params.warnings.push( + `Left migrated ${params.label} source in place because ${archivedPath} already exists`, + ); + return; + } + if (!hardenLegacyImportSource(params)) { + return; + } + try { + fs.renameSync(params.sourcePath, archivedPath); + try { + fs.chmodSync(archivedPath, 0o600); + } catch (err) { + params.warnings.push( + `Failed securing archived ${params.label} legacy source: ${String(err)}`, + ); + } + params.changes.push(`Archived ${params.label} legacy source → ${archivedPath}`); + } catch (err) { + params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`); + } +} + function listSqliteColumns(db: DatabaseSync, table: string): Set { const rows = db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name?: string }>; return new Set(rows.flatMap((row) => (row.name ? [row.name] : []))); @@ -1271,7 +1316,13 @@ async function runLegacyMigrationPlans( const existingValuesByKey = new Map(storeEntries.map(({ key, value }) => [key, value])); const expectedKeys = new Set(existingKeys); let remainingCapacity = Math.max(0, plan.maxEntries - storeEntries.length); - const entries = await plan.readEntries(); + let entries: Awaited>; + try { + entries = await plan.readEntries(); + } catch (err) { + warnings.push(`Failed reading ${plan.label} legacy source: ${String(err)}`); + return; + } const candidateEntries: Array<{ key: string; targetKey: string; @@ -1366,26 +1417,20 @@ async function runLegacyMigrationPlans( cleanupKeys = expectedKeys; } const allEntriesCovered = - entries.length > 0 && - entries.every( - ({ key }) => - cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)) && - !failedTargetKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)), - ); + (entries.length === 0 && plan.cleanupWhenEmpty === true) || + (entries.length > 0 && + entries.every( + ({ key }) => + cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)) && + !failedTargetKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)), + )); if (allEntriesCovered && plan.cleanupSource === "rename" && fileExists(plan.sourcePath)) { - const archivedPath = `${plan.sourcePath}.migrated`; - if (fileExists(archivedPath)) { - warnings.push( - `Left migrated ${plan.label} source in place because ${archivedPath} already exists`, - ); - return; - } - try { - fs.renameSync(plan.sourcePath, archivedPath); - changes.push(`Archived ${plan.label} legacy source → ${archivedPath}`); - } catch (err) { - warnings.push(`Failed archiving ${plan.label} legacy source: ${String(err)}`); - } + archiveLegacyImportSource({ + sourcePath: plan.sourcePath, + label: plan.label, + changes, + warnings, + }); } }); continue; diff --git a/src/plugin-state/plugin-state-store.sqlite.ts b/src/plugin-state/plugin-state-store.sqlite.ts index 1508e57c09b..4fcfe1418ce 100644 --- a/src/plugin-state/plugin-state-store.sqlite.ts +++ b/src/plugin-state/plugin-state-store.sqlite.ts @@ -552,6 +552,75 @@ export function pluginStateRegisterIfAbsent(params: { } } +export function pluginStateUpdate(params: { + pluginId: string; + namespace: string; + key: string; + maxEntries: number; + updateValueJson: (current: unknown) => { valueJson: string; ttlMs?: number } | undefined; + env?: NodeJS.ProcessEnv; +}): boolean { + try { + return runWriteTransaction( + "register", + (store) => { + const now = Date.now(); + deleteExpiredPluginStateNamespaceEntries(store.db, { + pluginId: params.pluginId, + namespace: params.namespace, + now, + }); + const existing = selectPluginStateEntry(store.db, { + pluginId: params.pluginId, + namespace: params.namespace, + key: params.key, + now, + }); + const next = params.updateValueJson( + existing ? parseStoredJson(existing.value_json, "lookup") : undefined, + ); + if (!next) { + return false; + } + const expiresAt = resolvePluginStateExpiresAtMs({ + ttlMs: next.ttlMs, + now, + operation: "register", + path: store.path, + }); + upsertPluginStateEntry( + store.db, + bindPluginStateEntry({ + pluginId: params.pluginId, + namespace: params.namespace, + key: params.key, + valueJson: next.valueJson, + createdAt: now, + expiresAt, + }), + ); + enforcePostRegisterLimits({ + store, + pluginId: params.pluginId, + namespace: params.namespace, + maxEntries: params.maxEntries, + now, + protectedKey: params.key, + }); + return true; + }, + envOptions(params.env), + ); + } catch (error) { + throw wrapPluginStateError( + error, + "register", + "PLUGIN_STATE_WRITE_FAILED", + "Failed to update plugin state entry.", + ); + } +} + export function pluginStateLookup(params: { pluginId: string; namespace: string; diff --git a/src/plugin-state/plugin-state-store.test.ts b/src/plugin-state/plugin-state-store.test.ts index c7d3c29b204..0c125e7ef0d 100644 --- a/src/plugin-state/plugin-state-store.test.ts +++ b/src/plugin-state/plugin-state-store.test.ts @@ -100,6 +100,24 @@ describe("plugin state keyed store", () => { }); }); + it("updates a key from the current stored value", async () => { + await withPluginStateTestState(async () => { + const store = createPluginStateSyncKeyedStore<{ count: number }>("discord", { + namespace: "sync-update", + maxEntries: 10, + }); + const update = store.update; + if (!update) { + throw new Error("expected sync keyed store update support"); + } + + expect(update("counter", (current) => ({ count: (current?.count ?? 0) + 1 }))).toBe(true); + expect(update("counter", (current) => ({ count: (current?.count ?? 0) + 1 }))).toBe(true); + expect(update("counter", () => undefined)).toBe(false); + expect(store.lookup("counter")).toEqual({ count: 2 }); + }); + }); + it("honors explicit store env without mutating process state", async () => { await withOpenClawTestState( { label: "plugin-state-explicit-env-a", applyEnv: false }, diff --git a/src/plugin-state/plugin-state-store.ts b/src/plugin-state/plugin-state-store.ts index e800ee46d96..92188296920 100644 --- a/src/plugin-state/plugin-state-store.ts +++ b/src/plugin-state/plugin-state-store.ts @@ -10,6 +10,7 @@ import { pluginStateLookup, pluginStateRegister, pluginStateRegisterIfAbsent, + pluginStateUpdate, } from "./plugin-state-store.sqlite.js"; import type { OpenKeyedStoreOptions, @@ -279,6 +280,27 @@ function createKeyedStoreForPluginId( ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), }); }, + async update(key, updateValue, opts) { + const normalizedKey = validateKey(key, "register"); + return pluginStateUpdate({ + pluginId, + namespace, + key: normalizedKey, + maxEntries, + updateValueJson: (current) => { + const next = updateValue(current as T | undefined); + if (next === undefined) { + return undefined; + } + const params = prepareRegisterParams(normalizedKey, next, defaultTtlMs, opts); + return { + valueJson: params.valueJson, + ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), + }; + }, + ...(env ? { env } : {}), + }); + }, async lookup(key) { const normalizedKey = validateKey(key, "lookup"); return pluginStateLookup({ @@ -354,6 +376,27 @@ function createSyncKeyedStoreForPluginId( ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), }); }, + update(key, updateValue, opts) { + const normalizedKey = validateKey(key, "register"); + return pluginStateUpdate({ + pluginId, + namespace, + key: normalizedKey, + maxEntries, + updateValueJson: (current) => { + const next = updateValue(current as T | undefined); + if (next === undefined) { + return undefined; + } + const params = prepareRegisterParams(normalizedKey, next, defaultTtlMs, opts); + return { + valueJson: params.valueJson, + ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), + }; + }, + ...(env ? { env } : {}), + }); + }, lookup(key) { const normalizedKey = validateKey(key, "lookup"); return pluginStateLookup({ diff --git a/src/plugin-state/plugin-state-store.types.ts b/src/plugin-state/plugin-state-store.types.ts index dad9cd1eb02..2d868b76175 100644 --- a/src/plugin-state/plugin-state-store.types.ts +++ b/src/plugin-state/plugin-state-store.types.ts @@ -8,6 +8,11 @@ export type PluginStateEntry = { export type PluginStateKeyedStore = { register(key: string, value: T, opts?: { ttlMs?: number }): Promise; registerIfAbsent(key: string, value: T, opts?: { ttlMs?: number }): Promise; + update?: ( + key: string, + updateValue: (current: T | undefined) => T | undefined, + opts?: { ttlMs?: number }, + ) => Promise; lookup(key: string): Promise; consume(key: string): Promise; delete(key: string): Promise; @@ -18,6 +23,11 @@ export type PluginStateKeyedStore = { export type PluginStateSyncKeyedStore = { register(key: string, value: T, opts?: { ttlMs?: number }): void; registerIfAbsent(key: string, value: T, opts?: { ttlMs?: number }): boolean; + update?: ( + key: string, + updateValue: (current: T | undefined) => T | undefined, + opts?: { ttlMs?: number }, + ) => boolean; lookup(key: string): T | undefined; consume(key: string): T | undefined; delete(key: string): boolean; diff --git a/ui/src/ui/app-chat.test.ts b/ui/src/ui/app-chat.test.ts index 75618bdbc97..16351ba14a0 100644 --- a/ui/src/ui/app-chat.test.ts +++ b/ui/src/ui/app-chat.test.ts @@ -209,6 +209,12 @@ function createDeferred() { return { promise, resolve, reject }; } +const neverSettlesPromise: Promise = Promise.race([]); + +function pendingPromise(): Promise { + return neverSettlesPromise as Promise; +} + async function raceWithMacrotask(promise: Promise): Promise<"resolved" | "pending"> { return await Promise.race([ promise.then(() => "resolved" as const), @@ -224,7 +230,7 @@ describe("refreshChat", () => { }); it("dispatches chat refresh work without waiting for slow history or metadata RPCs", async () => { - const request = vi.fn(() => new Promise(() => {})); + const request = vi.fn(() => pendingPromise()); const requestUpdate = vi.fn(); const host = makeHost({ client: { request } as unknown as ChatHost["client"], @@ -260,7 +266,7 @@ describe("refreshChat", () => { }); it("scopes global chat refresh session rows to the selected agent", async () => { - const request = vi.fn(() => new Promise(() => {})); + const request = vi.fn(() => pendingPromise()); const host = makeHost({ client: { request } as unknown as ChatHost["client"], sessionKey: "global", @@ -288,7 +294,7 @@ describe("refreshChat", () => { }); it("scopes agent main aliases as selected global chat refreshes", async () => { - const request = vi.fn(() => new Promise(() => {})); + const request = vi.fn(() => pendingPromise()); const host = makeHost({ client: { request } as unknown as ChatHost["client"], sessionKey: "agent:work:main", @@ -308,7 +314,7 @@ describe("refreshChat", () => { }); it("scopes agent session refresh rows before the list limit", async () => { - const request = vi.fn(() => new Promise(() => {})); + const request = vi.fn(() => pendingPromise()); const host = makeHost({ client: { request } as unknown as ChatHost["client"], sessionKey: "agent:work:dashboard", @@ -327,7 +333,7 @@ describe("refreshChat", () => { }); it("uses hello default for global chat refresh before agents list loads", async () => { - const request = vi.fn(() => new Promise(() => {})); + const request = vi.fn(() => pendingPromise()); const host = makeHost({ client: { request } as unknown as ChatHost["client"], sessionKey: "global", @@ -352,7 +358,7 @@ describe("refreshChat", () => { }); it("keeps unknown chat refresh session rows unscoped", async () => { - const request = vi.fn(() => new Promise(() => {})); + const request = vi.fn(() => pendingPromise()); const host = makeHost({ client: { request } as unknown as ChatHost["client"], sessionKey: "unknown", @@ -378,7 +384,7 @@ describe("refreshChat", () => { if (method === "chat.history") { return history.promise; } - return new Promise(() => {}); + return pendingPromise(); }); const host = makeHost({ client: { request } as unknown as ChatHost["client"], @@ -987,7 +993,7 @@ describe("refreshChat", () => { it("does not wait for secondary chat metadata refreshes before showing history", async () => { const previousFetch = globalThis.fetch; - globalThis.fetch = vi.fn(() => new Promise(() => {})) as never; + globalThis.fetch = vi.fn(() => pendingPromise()) as never; try { const request = vi.fn((method: string) => { if (method === "chat.history") { @@ -995,7 +1001,7 @@ describe("refreshChat", () => { messages: [{ role: "assistant", content: [{ type: "text", text: "ready" }] }], }); } - return new Promise(() => {}); + return pendingPromise(); }); const host = makeHost({ client: { request } as unknown as ChatHost["client"],