diff --git a/extensions/telegram/src/message-dispatch-dedupe.test.ts b/extensions/telegram/src/message-dispatch-dedupe.test.ts index 4c97e95599e..69d6b277ed4 100644 --- a/extensions/telegram/src/message-dispatch-dedupe.test.ts +++ b/extensions/telegram/src/message-dispatch-dedupe.test.ts @@ -2,15 +2,28 @@ import { mkdtempSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import path from "node:path"; import type { Message } from "grammy/types"; -import { afterEach, describe, expect, it } from "vitest"; import { + createPluginStateKeyedStoreForTests, + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES, + TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE, buildTelegramMessageDispatchReplayKey, claimTelegramMessageDispatchReplay, commitTelegramMessageDispatchReplay, createTelegramMessageDispatchReplayGuard, releaseTelegramMessageDispatchReplay, + setTelegramMessageDispatchDedupeStoreForTest, } from "./message-dispatch-dedupe.js"; +type MessageDispatchDedupeStore = NonNullable< + Parameters[0] +>; +type SyncMessageDispatchDedupeStore = Extract; + const tempDirs: string[] = []; function createStorePath(): string { @@ -27,7 +40,19 @@ function message(params?: { chatId?: number; messageId?: number }): Message { } as Message; } +beforeEach(async () => { + resetPluginStateStoreForTests({ closeDatabase: false }); + const store = createPluginStateKeyedStoreForTests("telegram", { + namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE, + maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES, + }) as NonNullable[0]>; + await store.clear(); + setTelegramMessageDispatchDedupeStoreForTest(store); +}); + afterEach(() => { + setTelegramMessageDispatchDedupeStoreForTest(undefined); + resetPluginStateStoreForTests(); for (const dir of tempDirs.splice(0)) { rmSync(dir, { recursive: true, force: true }); } @@ -73,6 +98,161 @@ describe("Telegram message dispatch replay guard", () => { ).resolves.toEqual({ kind: "duplicate" }); }); + it("preserves concurrent commits that share dedupe buckets", async () => { + const storePath = createStorePath(); + const writer = createTelegramMessageDispatchReplayGuard({ storePath }); + const keys = Array.from({ length: 400 }, (_, index) => + JSON.stringify(["message", "1234", index + 1]), + ); + + await commitTelegramMessageDispatchReplay({ + guard: writer, + accountId: "default", + keys, + }); + + const reader = createTelegramMessageDispatchReplayGuard({ storePath }); + await expect(reader.warmup("default")).resolves.toBe(keys.length); + }); + + it("falls back to same-process replay protection when plugin-state cannot open", async () => { + setTelegramMessageDispatchDedupeStoreForTest(undefined); + const errors: unknown[] = []; + const storePath = createStorePath(); + const guard = createTelegramMessageDispatchReplayGuard({ + storePath, + onDiskError: (error) => errors.push(error), + }); + const first = await claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }); + if (first.kind !== "claimed") { + throw new Error("expected initial claim"); + } + + await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(false); + + await expect( + claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }), + ).resolves.toEqual({ kind: "duplicate" }); + await expect(guard.hasRecent(first.key, { namespace: "default" })).resolves.toBe(true); + expect(errors.length).toBeGreaterThan(0); + }); + + it("keeps same-process replay protection when plugin-state commit fails", async () => { + const failingStore = createPluginStateKeyedStoreForTests("telegram", { + namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE, + maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES, + }) as NonNullable[0]>; + setTelegramMessageDispatchDedupeStoreForTest({ + ...failingStore, + async register() { + throw new Error("state write failed"); + }, + }); + const storePath = createStorePath(); + const guard = createTelegramMessageDispatchReplayGuard({ storePath }); + const first = await claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }); + if (first.kind !== "claimed") { + throw new Error("expected initial claim"); + } + + await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(false); + + await expect( + claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }), + ).resolves.toEqual({ kind: "duplicate" }); + await expect(guard.hasRecent(first.key, { namespace: "default" })).resolves.toBe(true); + await expect(guard.warmup("default")).resolves.toBe(1); + }); + + it("keeps same-process replay protection when lookup fails after a successful commit", async () => { + const backingStore = createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE, + maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES, + }) as SyncMessageDispatchDedupeStore; + let failLookup = false; + setTelegramMessageDispatchDedupeStoreForTest({ + ...backingStore, + lookup(key) { + if (failLookup) { + throw new Error("state read failed"); + } + return backingStore.lookup(key); + }, + }); + const storePath = createStorePath(); + const guard = createTelegramMessageDispatchReplayGuard({ storePath }); + const first = await claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }); + if (first.kind !== "claimed") { + throw new Error("expected initial claim"); + } + await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(true); + + failLookup = true; + + await expect( + claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }), + ).resolves.toEqual({ kind: "duplicate" }); + }); + + it("keeps replay histories isolated by session store path", async () => { + const firstStorePath = createStorePath(); + const secondStorePath = createStorePath(); + const firstGuard = createTelegramMessageDispatchReplayGuard({ + storePath: firstStorePath, + }); + const first = await claimTelegramMessageDispatchReplay({ + guard: firstGuard, + accountId: "default", + msg: message(), + }); + if (first.kind !== "claimed") { + throw new Error("expected initial claim"); + } + await commitTelegramMessageDispatchReplay({ + guard: firstGuard, + accountId: "default", + keys: [first.key], + }); + + const secondGuard = createTelegramMessageDispatchReplayGuard({ + storePath: secondStorePath, + }); + await expect( + claimTelegramMessageDispatchReplay({ + guard: secondGuard, + accountId: "default", + msg: message(), + }), + ).resolves.toEqual({ + kind: "claimed", + key: first.key, + }); + }); + it("keeps accounts isolated and releases retryable pre-dispatch claims", async () => { const storePath = createStorePath(); const guard = createTelegramMessageDispatchReplayGuard({ storePath }); @@ -112,4 +292,34 @@ describe("Telegram message dispatch replay guard", () => { key: first.key, }); }); + + it("lets an in-flight duplicate retry after the first claim is released", async () => { + const storePath = createStorePath(); + const guard = createTelegramMessageDispatchReplayGuard({ storePath }); + const first = await claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }); + if (first.kind !== "claimed") { + throw new Error("expected initial claim"); + } + + const duplicate = claimTelegramMessageDispatchReplay({ + guard, + accountId: "default", + msg: message(), + }); + releaseTelegramMessageDispatchReplay({ + guard, + accountId: "default", + keys: [first.key], + error: new Error("retry"), + }); + + await expect(duplicate).resolves.toEqual({ + kind: "claimed", + key: first.key, + }); + }); }); diff --git a/extensions/telegram/src/message-dispatch-dedupe.ts b/extensions/telegram/src/message-dispatch-dedupe.ts index c7c22fef5dd..529942460fb 100644 --- a/extensions/telegram/src/message-dispatch-dedupe.ts +++ b/extensions/telegram/src/message-dispatch-dedupe.ts @@ -1,19 +1,205 @@ +import { createHash } from "node:crypto"; +import fs from "node:fs"; import path from "node:path"; import type { Message } from "grammy/types"; -import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; +import type { + ClaimableDedupe, + ClaimableDedupeClaimResult, +} from "openclaw/plugin-sdk/persistent-dedupe"; +import type { + PluginStateKeyedStore, + PluginStateSyncKeyedStore, +} from "openclaw/plugin-sdk/plugin-state-runtime"; import { normalizeStringEntries, uniqueStrings } from "openclaw/plugin-sdk/string-coerce-runtime"; +import { getTelegramRuntime } from "./runtime.js"; const TELEGRAM_MESSAGE_DISPATCH_TTL_MS = 7 * 24 * 60 * 60 * 1000; -const TELEGRAM_MESSAGE_DISPATCH_MEMORY_MAX = 5000; -const TELEGRAM_MESSAGE_DISPATCH_FILE_MAX = 50_000; +export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE = "telegram.message-dispatch-dedupe"; +export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES = 4_096; +const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOGICAL_MAX_ENTRIES = 50_000; +const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_COUNT = 256; +const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_MAX_KEYS = 256; +const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_TTL_MS = 30_000; +const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_RETRY_MS = 10; +const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_ATTEMPTS = 50; export type TelegramMessageDispatchReplayGuard = ClaimableDedupe; +type TelegramMessageDispatchDedupeRecord = { + scopeKey: string; + namespace: string; + bucketId: string; + entries: Record; +}; + +type TelegramMessageDispatchDedupeStore = + | PluginStateKeyedStore + | PluginStateSyncKeyedStore; + +type PendingClaim = { + promise: Promise; + resolve: (result: boolean) => void; + reject: (error: unknown) => void; +}; + +type MemoryCommittedClaim = { + namespace: string; + expiresAt: number; +}; + +let dispatchDedupeStoreForTest: TelegramMessageDispatchDedupeStore | undefined; export type TelegramMessageDispatchClaim = | { kind: "claimed"; key: string } | { kind: "duplicate" } | { kind: "invalid" }; +function openDispatchDedupeStore(): TelegramMessageDispatchDedupeStore { + return ( + dispatchDedupeStoreForTest ?? + getTelegramRuntime().state.openKeyedStore({ + namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE, + maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES, + }) + ); +} + +function resolveDispatchScopeKey(storePath: string): string { + return createHash("sha256").update(storePath, "utf8").digest("hex").slice(0, 24); +} + +function dedupeEntryKey(scopeKey: string, namespace: string, key: string): string { + return createHash("sha256") + .update(`${scopeKey}\0${namespace}\0${key}`, "utf8") + .digest("hex") + .slice(0, 32); +} + +function dedupeBucketId(key: string): string { + const bucketIndex = + Number.parseInt(createHash("sha256").update(key, "utf8").digest("hex").slice(0, 8), 16) % + TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_COUNT; + return bucketIndex.toString(16).padStart(2, "0"); +} + +function dedupeBucketEntryKey(scopeKey: string, namespace: string, bucketId: string): string { + return createHash("sha256") + .update(`${scopeKey}\0${namespace}\0${bucketId}`, "utf8") + .digest("hex") + .slice(0, 32); +} + +function dedupeLegacyBucketEntryKey(params: { + scopeKey: string; + namespace: string; + bucketId: string; + sourcePath: string; +}): string { + const sourceKey = createHash("sha256") + .update(params.sourcePath, "utf8") + .digest("hex") + .slice(0, 12); + return dedupeBucketEntryKey(params.scopeKey, params.namespace, `${params.bucketId}:${sourceKey}`); +} + +function dedupeBucketLockKey(bucketKey: string): string { + return `${bucketKey}:lock`; +} + +async function sleep(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +} + +function pruneDedupeBucketEntries(entries: Record, now: number): void { + for (const [key, timestamp] of Object.entries(entries)) { + if (typeof timestamp !== "number" || !Number.isFinite(timestamp)) { + delete entries[key]; + continue; + } + if (now - timestamp >= TELEGRAM_MESSAGE_DISPATCH_TTL_MS) { + delete entries[key]; + } + } + const keys = Object.keys(entries); + if (keys.length <= TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_MAX_KEYS) { + return; + } + for (const key of keys + .toSorted((left, right) => entries[left] - entries[right]) + .slice(0, keys.length - TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_MAX_KEYS)) { + delete entries[key]; + } +} + +function createDedupeBucketRecord(params: { + scopeKey: string; + namespace: string; + bucketId: string; + entries?: Record; +}): TelegramMessageDispatchDedupeRecord { + return { + scopeKey: params.scopeKey, + namespace: params.namespace, + bucketId: params.bucketId, + entries: { ...params.entries }, + }; +} + +function normalizeDedupeBucketRecord( + value: TelegramMessageDispatchDedupeRecord | undefined, + params: { + scopeKey: string; + namespace: string; + bucketId: string; + now: number; + }, +): TelegramMessageDispatchDedupeRecord { + const entries = + value?.scopeKey === params.scopeKey && + value.namespace === params.namespace && + value.bucketId === params.bucketId && + value.entries && + typeof value.entries === "object" + ? { ...value.entries } + : {}; + pruneDedupeBucketEntries(entries, params.now); + return createDedupeBucketRecord({ + scopeKey: params.scopeKey, + namespace: params.namespace, + bucketId: params.bucketId, + entries, + }); +} + +async function lookupDedupeBucketContains(params: { + store: TelegramMessageDispatchDedupeStore; + scopeKey: string; + namespace: string; + bucketId: string; + bucketKey: string; + key: string; + now: number; +}): Promise { + const bucket = normalizeDedupeBucketRecord(await params.store.lookup(params.bucketKey), params); + if (bucket.entries[params.key] !== undefined) { + return true; + } + for (const entry of await params.store.entries()) { + if ( + entry.key === params.bucketKey || + entry.value.scopeKey !== params.scopeKey || + entry.value.namespace !== params.namespace || + entry.value.bucketId !== params.bucketId + ) { + continue; + } + const legacyBucket = normalizeDedupeBucketRecord(entry.value, params); + if (legacyBucket.entries[params.key] !== undefined) { + return true; + } + } + return false; +} + function sanitizeFileSegment(value: string): string { const trimmed = value.trim(); if (!trimmed) { @@ -22,6 +208,18 @@ function sanitizeFileSegment(value: string): string { return trimmed.replace(/[^a-zA-Z0-9_-]/g, "_"); } +export function resolveTelegramMessageDispatchLegacyPath(params: { + storePath: string; + namespace: string; +}): string { + return path.join( + path.dirname(params.storePath), + `${path.basename(params.storePath)}.telegram-message-dispatch-${sanitizeFileSegment( + params.namespace, + )}.json`, + ); +} + export function buildTelegramMessageDispatchReplayKey(msg: Message): string | null { const chatId = msg.chat?.id; const messageId = msg.message_id; @@ -35,19 +233,283 @@ export function createTelegramMessageDispatchReplayGuard(params: { storePath: string; onDiskError?: (error: unknown) => void; }): TelegramMessageDispatchReplayGuard { - return createClaimableDedupe({ - ttlMs: TELEGRAM_MESSAGE_DISPATCH_TTL_MS, - memoryMaxSize: TELEGRAM_MESSAGE_DISPATCH_MEMORY_MAX, - fileMaxEntries: TELEGRAM_MESSAGE_DISPATCH_FILE_MAX, - resolveFilePath: (namespace) => - path.join( - path.dirname(params.storePath), - `${path.basename(params.storePath)}.telegram-message-dispatch-${sanitizeFileSegment( + const scopeKey = resolveDispatchScopeKey(params.storePath); + const onStateError = params.onDiskError; + let store: TelegramMessageDispatchDedupeStore | undefined; + const inflight = new Map(); + const committedInMemory = new Map(); + const bucketWriteQueue = new Map>(); + + function getStore(): TelegramMessageDispatchDedupeStore | undefined { + if (store) { + return store; + } + try { + store = openDispatchDedupeStore(); + return store; + } catch (error) { + onStateError?.(error); + return undefined; + } + } + + function pruneCommittedInMemory(now = Date.now()) { + for (const [entryKey, entry] of committedInMemory) { + if ( + entry.expiresAt <= now || + committedInMemory.size > TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOGICAL_MAX_ENTRIES + ) { + committedInMemory.delete(entryKey); + } + } + } + + function rememberCommittedInMemory(entryKey: string, namespace: string, now: number) { + committedInMemory.set(entryKey, { + namespace, + expiresAt: now + TELEGRAM_MESSAGE_DISPATCH_TTL_MS, + }); + pruneCommittedInMemory(now); + } + + function hasCommittedInMemory(entryKey: string, now = Date.now()): boolean { + const entry = committedInMemory.get(entryKey); + if (!entry) { + return false; + } + if (entry.expiresAt <= now) { + committedInMemory.delete(entryKey); + return false; + } + return true; + } + + function rememberPendingClaim(entryKey: string): PendingClaim { + let resolve!: (result: boolean) => void; + let reject!: (error: unknown) => void; + const promise = new Promise((resolvePromise, rejectPromise) => { + resolve = resolvePromise; + reject = rejectPromise; + }); + void promise.catch(() => {}); + const pending = { promise, resolve, reject }; + inflight.set(entryKey, pending); + return pending; + } + + function enqueueBucketWrite(bucketKey: string, write: () => Promise): Promise { + const previous = bucketWriteQueue.get(bucketKey) ?? Promise.resolve(); + const next = previous.catch(() => undefined).then(write); + const queued = next.then( + () => undefined, + () => undefined, + ); + bucketWriteQueue.set(bucketKey, queued); + void queued.finally(() => { + if (bucketWriteQueue.get(bucketKey) === queued) { + bucketWriteQueue.delete(bucketKey); + } + }); + return next; + } + + async function withBucketLock(params: { + store: TelegramMessageDispatchDedupeStore; + namespace: string; + bucketId: string; + bucketKey: string; + write: () => Promise; + }): Promise { + const lockKey = dedupeBucketLockKey(params.bucketKey); + const lockValue = createDedupeBucketRecord({ + scopeKey, + namespace: `${params.namespace}:lock`, + bucketId: params.bucketId, + }); + let locked = false; + for (let attempt = 0; attempt < TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_ATTEMPTS; attempt += 1) { + if ( + await params.store.registerIfAbsent(lockKey, lockValue, { + ttlMs: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_TTL_MS, + }) + ) { + locked = true; + break; + } + await sleep(TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_RETRY_MS); + } + if (!locked) { + throw new Error( + `timed out acquiring Telegram dispatch dedupe bucket lock: ${params.bucketId}`, + ); + } + try { + return await params.write(); + } finally { + await params.store.delete(lockKey); + } + } + + return { + async claim(key, options): Promise { + const namespace = options?.namespace?.trim() || "global"; + const entryKey = dedupeEntryKey(scopeKey, namespace, key); + const bucketId = dedupeBucketId(key); + const bucketKey = dedupeBucketEntryKey(scopeKey, namespace, bucketId); + if (hasCommittedInMemory(entryKey)) { + return { kind: "duplicate" }; + } + const existing = inflight.get(entryKey); + if (existing) { + return { kind: "inflight", pending: existing.promise }; + } + const pending = rememberPendingClaim(entryKey); + const store = getStore(); + if (!store) { + return { kind: "claimed" }; + } + try { + if ( + await lookupDedupeBucketContains({ + store, + scopeKey, + namespace, + bucketId, + bucketKey, + key, + now: Date.now(), + }) + ) { + pending.resolve(false); + inflight.delete(entryKey); + return { kind: "duplicate" }; + } + return { kind: "claimed" }; + } catch (error) { + onStateError?.(error); + return { kind: "claimed" }; + } + }, + async commit(key, options) { + const namespace = options?.namespace?.trim() || "global"; + const now = options?.now ?? Date.now(); + const entryKey = dedupeEntryKey(scopeKey, namespace, key); + const bucketId = dedupeBucketId(key); + const bucketKey = dedupeBucketEntryKey(scopeKey, namespace, bucketId); + const store = getStore(); + if (!store) { + rememberCommittedInMemory(entryKey, namespace, now); + inflight.get(entryKey)?.resolve(true); + inflight.delete(entryKey); + return false; + } + try { + await enqueueBucketWrite(bucketKey, async () => { + await withBucketLock({ + store, + namespace, + bucketId, + bucketKey, + write: async () => { + const bucket = normalizeDedupeBucketRecord(await store.lookup(bucketKey), { + scopeKey, + namespace, + bucketId, + now, + }); + bucket.entries[key] = now; + pruneDedupeBucketEntries(bucket.entries, now); + await store.register(bucketKey, bucket, { ttlMs: TELEGRAM_MESSAGE_DISPATCH_TTL_MS }); + }, + }); + }); + rememberCommittedInMemory(entryKey, namespace, now); + inflight.get(entryKey)?.resolve(true); + return true; + } catch (error) { + rememberCommittedInMemory(entryKey, namespace, now); + inflight.get(entryKey)?.resolve(true); + onStateError?.(error); + return false; + } finally { + inflight.delete(entryKey); + } + }, + release(key, options) { + const namespace = options?.namespace?.trim() || "global"; + const entryKey = dedupeEntryKey(scopeKey, namespace, key); + const pending = inflight.get(entryKey); + if (pending) { + pending.reject(options?.error ?? new Error(`claim released before commit: ${namespace}`)); + inflight.delete(entryKey); + } + }, + async hasRecent(key, options) { + const namespace = options?.namespace?.trim() || "global"; + const entryKey = dedupeEntryKey(scopeKey, namespace, key); + const bucketId = dedupeBucketId(key); + const bucketKey = dedupeBucketEntryKey(scopeKey, namespace, bucketId); + if (hasCommittedInMemory(entryKey)) { + return true; + } + const store = getStore(); + if (!store) { + return false; + } + try { + return await lookupDedupeBucketContains({ + store, + scopeKey, namespace, - )}.json`, - ), - onDiskError: params.onDiskError, - }); + bucketId, + bucketKey, + key, + now: Date.now(), + }); + } catch (error) { + onStateError?.(error); + return false; + } + }, + async warmup(namespace = "global") { + pruneCommittedInMemory(); + const memoryCount = [...committedInMemory.values()].filter( + (entry) => entry.namespace === namespace, + ).length; + const store = getStore(); + if (!store) { + return memoryCount; + } + try { + const now = Date.now(); + const persistedCount = (await store.entries()) + .filter( + (entry) => entry.value.scopeKey === scopeKey && entry.value.namespace === namespace, + ) + .reduce((count, entry) => { + const bucket = normalizeDedupeBucketRecord(entry.value, { + scopeKey, + namespace, + bucketId: entry.value.bucketId, + now, + }); + return count + Object.keys(bucket.entries).length; + }, 0); + return persistedCount + memoryCount; + } catch (error) { + onStateError?.(error); + return memoryCount; + } + }, + clearMemory() { + inflight.clear(); + committedInMemory.clear(); + }, + memorySize() { + pruneCommittedInMemory(); + return inflight.size + committedInMemory.size; + }, + }; } export async function claimTelegramMessageDispatchReplay(params: { @@ -105,3 +567,64 @@ export function releaseTelegramMessageDispatchReplay(params: { params.guard.release(key, { namespace: params.accountId, error: params.error }); } } + +export function setTelegramMessageDispatchDedupeStoreForTest( + store: TelegramMessageDispatchDedupeStore | undefined, +): void { + dispatchDedupeStoreForTest = store; +} + +export function listTelegramLegacyMessageDispatchDedupeEntries(params: { + storePath: string; + namespace: string; + persistedPath?: string; +}): Array<{ key: string; value: TelegramMessageDispatchDedupeRecord; ttlMs?: number }> { + const filePath = params.persistedPath ?? resolveTelegramMessageDispatchLegacyPath(params); + if (!fs.existsSync(filePath)) { + return []; + } + const now = Date.now(); + let parsed: unknown; + try { + parsed = JSON.parse(fs.readFileSync(filePath, "utf8")); + } catch { + return []; + } + if (!parsed || typeof parsed !== "object") { + return []; + } + const scopeKey = resolveDispatchScopeKey(params.storePath); + const buckets = new Map(); + for (const [key, value] of Object.entries(parsed as Record)) { + if (typeof value !== "number" || !Number.isFinite(value)) { + continue; + } + const ttlMs = TELEGRAM_MESSAGE_DISPATCH_TTL_MS - Math.max(0, now - value); + if (ttlMs <= 0) { + continue; + } + const bucketId = dedupeBucketId(key); + const bucketKey = dedupeLegacyBucketEntryKey({ + scopeKey, + namespace: params.namespace, + bucketId, + sourcePath: filePath, + }); + const bucket = buckets.get(bucketKey) ?? { + value: createDedupeBucketRecord({ + scopeKey, + namespace: params.namespace, + bucketId, + }), + ttlMs: 0, + }; + bucket.value.entries[key] = value; + bucket.ttlMs = Math.max(bucket.ttlMs, ttlMs); + buckets.set(bucketKey, bucket); + } + return [...buckets.entries()].map(([key, bucket]) => ({ + key, + value: bucket.value, + ttlMs: bucket.ttlMs, + })); +} diff --git a/extensions/telegram/src/send.test.ts b/extensions/telegram/src/send.test.ts index a796f90e5ae..c921f7a7d85 100644 --- a/extensions/telegram/src/send.test.ts +++ b/extensions/telegram/src/send.test.ts @@ -1,7 +1,11 @@ import fs from "node:fs"; import type { Bot } from "grammy"; +import { + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures"; -import { afterEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { buildTelegramConversationContext, createTelegramMessageCache, @@ -14,9 +18,12 @@ import { installTelegramSendTestHooks, } from "./send.test-harness.js"; import { + TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES, + TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE, clearSentMessageCache, recordSentMessage, resetSentMessageCacheForTest, + setTelegramSentMessageStoreForTest, wasSentByBot, } from "./sent-message-cache.js"; @@ -51,6 +58,17 @@ const { } = telegramSendModule; const TELEGRAM_TEST_CFG = {}; +let sentMessageStore: NonNullable[0]>; + +beforeEach(() => { + resetPluginStateStoreForTests({ closeDatabase: false }); + sentMessageStore = createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE, + maxEntries: TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES, + }); + sentMessageStore.clear(); + setTelegramSentMessageStoreForTest(sentMessageStore); +}); async function expectChatNotFoundWithChatId( action: Promise, @@ -186,6 +204,9 @@ function capturedLogText(logFile: string): string { } afterEach(() => { + clearSentMessageCache(); + setTelegramSentMessageStoreForTest(undefined); + resetPluginStateStoreForTests(); setLoggerOverride(null); resetLogger(); resetTelegramMessageCacheBucketsForTest(); @@ -195,7 +216,6 @@ afterEach(() => { describe("sent-message-cache", () => { afterEach(() => { vi.useRealTimers(); - clearSentMessageCache(); }); it("records and retrieves sent messages", () => { @@ -224,6 +244,41 @@ describe("sent-message-cache", () => { expect(wasSentByBot(123, 1)).toBe(false); }); + it("keeps sent-message cache storage failures best-effort", () => { + setTelegramSentMessageStoreForTest({ + ...sentMessageStore, + entries() { + throw new Error("read boom"); + }, + register() { + throw new Error("write boom"); + }, + }); + + expect(() => recordSentMessage(123, 1)).not.toThrow(); + expect(wasSentByBot(123, 1)).toBe(true); + }); + + it("persists sent-message rows with their remaining logical ttl", () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-26T12:00:00.000Z")); + const ttlByMessageId = new Map(); + setTelegramSentMessageStoreForTest({ + ...sentMessageStore, + register(key, value, options) { + sentMessageStore.register(key, value, options); + ttlByMessageId.set(value.messageId, options?.ttlMs ?? 0); + }, + }); + + recordSentMessage(123, 1); + vi.advanceTimersByTime(60 * 60 * 1000); + recordSentMessage(123, 2); + + expect(ttlByMessageId.get("1")).toBe(23 * 60 * 60 * 1000); + expect(ttlByMessageId.get("2")).toBe(24 * 60 * 60 * 1000); + }); + it("keeps sent-message ownership across restart", async () => { const persistedStorePath = `/tmp/openclaw-telegram-send-tests-${process.pid}-restart.json`; const sentMessageCfg = { session: { store: persistedStorePath } }; @@ -237,11 +292,13 @@ describe("sent-message-cache", () => { import.meta.url, "./sent-message-cache.js?scope=restart", ); + restartedCache.setTelegramSentMessageStoreForTest(sentMessageStore); try { expect(restartedCache.wasSentByBot(123, 1, sentMessageCfg)).toBe(true); } finally { restartedCache.clearSentMessageCache(); + restartedCache.setTelegramSentMessageStoreForTest(undefined); } }); @@ -293,6 +350,8 @@ describe("sent-message-cache", () => { import.meta.url, "./sent-message-cache.js?scope=shared-b", ); + cacheA.setTelegramSentMessageStoreForTest(sentMessageStore); + cacheB.setTelegramSentMessageStoreForTest(sentMessageStore); cacheA.clearSentMessageCache(); @@ -304,6 +363,8 @@ describe("sent-message-cache", () => { expect(cacheA.wasSentByBot(123, 1)).toBe(false); } finally { cacheA.clearSentMessageCache(); + cacheA.setTelegramSentMessageStoreForTest(undefined); + cacheB.setTelegramSentMessageStoreForTest(undefined); } }); }); diff --git a/extensions/telegram/src/sent-message-cache.ts b/extensions/telegram/src/sent-message-cache.ts index 44307338a0f..af4f3529e53 100644 --- a/extensions/telegram/src/sent-message-cache.ts +++ b/extensions/telegram/src/sent-message-cache.ts @@ -1,23 +1,50 @@ +import { createHash } from "node:crypto"; import fs from "node:fs"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; -import { replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime"; import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime"; +import { getTelegramRuntime } from "./runtime.js"; const TTL_MS = 24 * 60 * 60 * 1000; +export const TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE = "telegram.sent-messages"; +export const TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES = 10_000; const TELEGRAM_SENT_MESSAGES_STATE_KEY = Symbol.for("openclaw.telegramSentMessagesState"); +const TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY = Symbol.for( + "openclaw.telegramSentMessagesStoreForTest", +); + +type PersistedSentMessage = { + scopeKey: string; + chatId: string; + messageId: string; + timestamp: number; +}; type SentMessageStore = Map>; +type SentMessagePersistentStore = PluginStateSyncKeyedStore; type SentMessageBucket = { - persistedPath: string; + scopeKey: string; store: SentMessageStore; }; type SentMessageState = { - bucketsByPath: Map; + bucketsByScope: Map; }; +let sentMessageStoreForTest: SentMessagePersistentStore | undefined; + +function getSentMessageStoreForTest(): SentMessagePersistentStore | undefined { + const globalStore = globalThis as Record; + return ( + sentMessageStoreForTest ?? + (globalStore[TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY] as + | SentMessagePersistentStore + | undefined) + ); +} + function getSentMessageState(): SentMessageState { const globalStore = globalThis as Record; const existing = globalStore[TELEGRAM_SENT_MESSAGES_STATE_KEY] as SentMessageState | undefined; @@ -25,7 +52,7 @@ function getSentMessageState(): SentMessageState { return existing; } const state: SentMessageState = { - bucketsByPath: new Map(), + bucketsByScope: new Map(), }; globalStore[TELEGRAM_SENT_MESSAGES_STATE_KEY] = state; return state; @@ -39,6 +66,28 @@ function resolveSentMessageStorePath(cfg?: Pick): str return `${resolveStorePath(cfg?.session?.store)}.telegram-sent-messages.json`; } +function resolveSentMessageScopeKey(cfg?: Pick): string { + const storePath = resolveStorePath(cfg?.session?.store); + return createHash("sha256").update(storePath, "utf8").digest("hex").slice(0, 24); +} + +function sentMessageEntryKey(scopeKey: string, chatId: string, messageId: string): string { + return createHash("sha256") + .update(`${scopeKey}\0${chatId}\0${messageId}`, "utf8") + .digest("hex") + .slice(0, 32); +} + +function openSentMessageStore(): SentMessagePersistentStore { + return ( + getSentMessageStoreForTest() ?? + getTelegramRuntime().state.openSyncKeyedStore({ + namespace: TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE, + maxEntries: TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES, + }) + ); +} + function cleanupExpired( store: SentMessageStore, scopeKey: string, @@ -46,7 +95,7 @@ function cleanupExpired( now: number, ): void { for (const [id, timestamp] of entry) { - if (now - timestamp > TTL_MS) { + if (now - timestamp >= TTL_MS) { entry.delete(id); } } @@ -55,10 +104,7 @@ function cleanupExpired( } } -function readPersistedSentMessages(filePath: string): SentMessageStore { - if (!fs.existsSync(filePath)) { - return createSentMessageStore(); - } +function readLegacySentMessages(filePath: string): SentMessageStore { try { const raw = fs.readFileSync(filePath, "utf-8"); const parsed = JSON.parse(raw) as Record>; @@ -86,18 +132,39 @@ function readPersistedSentMessages(filePath: string): SentMessageStore { } } +function readPersistedSentMessages(scopeKey: string): SentMessageStore { + const now = Date.now(); + const store = createSentMessageStore(); + try { + for (const entry of openSentMessageStore().entries()) { + if (entry.value.scopeKey !== scopeKey || now - entry.value.timestamp > TTL_MS) { + continue; + } + let messages = store.get(entry.value.chatId); + if (!messages) { + messages = new Map(); + store.set(entry.value.chatId, messages); + } + messages.set(entry.value.messageId, entry.value.timestamp); + } + } catch (error) { + logVerbose(`telegram: failed to read sent-message cache: ${String(error)}`); + } + return store; +} + function getSentMessageBucket(cfg?: Pick): SentMessageBucket { const state = getSentMessageState(); - const persistedPath = resolveSentMessageStorePath(cfg); - const existing = state.bucketsByPath.get(persistedPath); + const scopeKey = resolveSentMessageScopeKey(cfg); + const existing = state.bucketsByScope.get(scopeKey); if (existing) { return existing; } const bucket = { - persistedPath, - store: readPersistedSentMessages(persistedPath), + scopeKey, + store: readPersistedSentMessages(scopeKey), }; - state.bucketsByPath.set(persistedPath, bucket); + state.bucketsByScope.set(scopeKey, bucket); return bucket; } @@ -106,24 +173,22 @@ function getSentMessages(cfg?: Pick): SentMessageStor } function persistSentMessages(bucket: SentMessageBucket): void { - const { store, persistedPath } = bucket; + const { store, scopeKey } = bucket; const now = Date.now(); - const serialized: Record> = {}; for (const [chatId, entry] of store) { cleanupExpired(store, chatId, entry, now); - if (entry.size > 0) { - serialized[chatId] = Object.fromEntries(entry); + for (const [messageId, timestamp] of entry) { + const ttlMs = TTL_MS - Math.max(0, now - timestamp); + if (ttlMs <= 0) { + continue; + } + openSentMessageStore().register( + sentMessageEntryKey(scopeKey, chatId, messageId), + { scopeKey, chatId, messageId, timestamp }, + { ttlMs }, + ); } } - if (Object.keys(serialized).length === 0) { - fs.rmSync(persistedPath, { force: true }); - return; - } - replaceFileAtomicSync({ - filePath: persistedPath, - content: JSON.stringify(serialized), - tempPrefix: ".telegram-sent-message-cache", - }); } export function recordSentMessage( @@ -170,13 +235,43 @@ export function wasSentByBot( export function clearSentMessageCache(): void { const state = getSentMessageState(); - for (const bucket of state.bucketsByPath.values()) { + for (const bucket of state.bucketsByScope.values()) { bucket.store.clear(); - fs.rmSync(bucket.persistedPath, { force: true }); } - state.bucketsByPath.clear(); + state.bucketsByScope.clear(); + openSentMessageStore().clear(); } export function resetSentMessageCacheForTest(): void { - getSentMessageState().bucketsByPath.clear(); + getSentMessageState().bucketsByScope.clear(); +} + +export function setTelegramSentMessageStoreForTest( + store: SentMessagePersistentStore | undefined, +): void { + sentMessageStoreForTest = store; + const globalStore = globalThis as Record; + if (store) { + globalStore[TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY] = store; + } else { + delete globalStore[TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY]; + } +} + +export function listTelegramLegacySentMessageCacheEntries(params: { + cfg?: Pick; + persistedPath?: string; +}): Array<{ key: string; value: PersistedSentMessage; ttlMs?: number }> { + const scopeKey = resolveSentMessageScopeKey(params.cfg); + const filePath = params.persistedPath ?? resolveSentMessageStorePath(params.cfg); + const legacy = fs.existsSync(filePath) + ? readLegacySentMessages(filePath) + : createSentMessageStore(); + return [...legacy.entries()].flatMap(([chatId, messages]) => + [...messages.entries()].map(([messageId, timestamp]) => ({ + key: sentMessageEntryKey(scopeKey, chatId, messageId), + value: { scopeKey, chatId, messageId, timestamp }, + ttlMs: Math.max(1, TTL_MS - Math.max(0, Date.now() - timestamp)), + })), + ); } diff --git a/extensions/telegram/src/state-migrations.test.ts b/extensions/telegram/src/state-migrations.test.ts index 359c67cab5f..be31b368c27 100644 --- a/extensions/telegram/src/state-migrations.test.ts +++ b/extensions/telegram/src/state-migrations.test.ts @@ -7,6 +7,7 @@ import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime"; import { describe, expect, it } from "vitest"; import { resolveTelegramBotInfoCachePath } from "./bot-info-cache.js"; import { resolveTelegramMessageCachePath } from "./message-cache.js"; +import { resolveTelegramMessageDispatchLegacyPath } from "./message-dispatch-dedupe.js"; import { detectTelegramLegacyStateMigrations } from "./state-migrations.js"; import { resolveTopicNameCacheNamespace, @@ -278,4 +279,261 @@ describe("telegram state migrations", () => { await rm(dir, { recursive: true, force: true }); } }); + + it("detects remaining Telegram JSON sidecars for plugin-state import", async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-state-migration-")); + const env = { ...process.env, OPENCLAW_STATE_DIR: dir }; + const storePath = resolveStorePath(undefined, { env }); + const now = Date.now(); + const updateOffsetPath = path.join(dir, "telegram", "update-offset-ops.json"); + const stickerCachePath = path.join(dir, "telegram", "sticker-cache.json"); + const sentMessagePath = `${storePath}.telegram-sent-messages.json`; + const threadBindingsPath = path.join(dir, "telegram", "thread-bindings-ops.json"); + const dispatchPath = resolveTelegramMessageDispatchLegacyPath({ + storePath, + namespace: "ops", + }); + try { + await mkdir(path.dirname(updateOffsetPath), { recursive: true }); + await mkdir(path.dirname(sentMessagePath), { recursive: true }); + await writeFile( + updateOffsetPath, + JSON.stringify({ + version: 3, + lastUpdateId: 12345, + botId: "123456", + tokenFingerprint: "token:fingerprint", + }), + ); + await writeFile( + stickerCachePath, + JSON.stringify({ + version: 1, + stickers: { + unique_sticker: { + fileId: "file-1", + fileUniqueId: "unique_sticker", + description: "Deploy sticker", + cachedAt: "2026-05-24T12:00:00.000Z", + }, + }, + }), + ); + await writeFile(sentMessagePath, JSON.stringify({ 7: { 42: now } })); + await writeFile( + threadBindingsPath, + JSON.stringify({ + version: 1, + bindings: [ + { + accountId: "ops", + conversationId: "-100:topic:7", + targetKind: "subagent", + targetSessionKey: "agent:main:subagent:child", + boundAt: now, + lastActivityAt: now, + }, + ], + }), + ); + await writeFile( + dispatchPath, + JSON.stringify({ [JSON.stringify(["message", "7", 42])]: now }), + ); + + const cfg = { + channels: { + telegram: { + accounts: { + ops: { + botToken: "123456:secret", + }, + }, + }, + }, + } as OpenClawConfig; + const plans = await detectTelegramLegacyStateMigrations({ cfg, env }); + + const byLabel = new Map(plans.map((plan) => [plan.label, plan])); + expect(byLabel.get("Telegram update offset")).toMatchObject({ + kind: "plugin-state-import", + sourcePath: updateOffsetPath, + namespace: "telegram.update-offsets", + }); + expect(byLabel.get("Telegram sticker cache")).toMatchObject({ + kind: "plugin-state-import", + sourcePath: stickerCachePath, + namespace: "telegram.sticker-cache", + }); + expect(byLabel.get("Telegram sent-message cache")).toMatchObject({ + kind: "plugin-state-import", + sourcePath: sentMessagePath, + namespace: "telegram.sent-messages", + }); + expect(byLabel.get("Telegram thread bindings")).toMatchObject({ + kind: "plugin-state-import", + sourcePath: threadBindingsPath, + namespace: "telegram.thread-bindings", + }); + expect(byLabel.get("Telegram message dispatch dedupe")).toMatchObject({ + kind: "plugin-state-import", + sourcePath: dispatchPath, + namespace: "telegram.message-dispatch-dedupe", + }); + + for (const label of [ + "Telegram update offset", + "Telegram sticker cache", + "Telegram sent-message cache", + "Telegram thread bindings", + "Telegram message dispatch dedupe", + ]) { + const plan = byLabel.get(label); + if (!plan || plan.kind !== "plugin-state-import") { + throw new Error(`expected plugin-state import plan: ${label}`); + } + expect(await plan.readEntries()).toHaveLength(1); + } + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + it("detects Telegram account sidecars even after the account was removed from config", async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-state-migration-")); + const env = { ...process.env, OPENCLAW_STATE_DIR: dir }; + const updateOffsetPath = path.join(dir, "telegram", "update-offset-oldbot.json"); + const threadBindingsPath = path.join(dir, "telegram", "thread-bindings-oldbot.json"); + const now = Date.now(); + try { + await mkdir(path.dirname(updateOffsetPath), { recursive: true }); + await writeFile( + updateOffsetPath, + JSON.stringify({ + version: 3, + lastUpdateId: 12345, + botId: "123456", + tokenFingerprint: "token:fingerprint", + }), + ); + await writeFile( + threadBindingsPath, + JSON.stringify({ + version: 1, + bindings: [ + { + accountId: "oldbot", + conversationId: "-100:topic:7", + targetKind: "subagent", + targetSessionKey: "agent:main:subagent:child", + boundAt: now, + lastActivityAt: now, + }, + ], + }), + ); + + const plans = await detectTelegramLegacyStateMigrations({ cfg: {}, env }); + const updateOffsetPlan = plans.find((plan) => plan.sourcePath === updateOffsetPath); + const threadBindingsPlan = plans.find((plan) => plan.sourcePath === threadBindingsPath); + + expect(updateOffsetPlan).toMatchObject({ + kind: "plugin-state-import", + label: "Telegram update offset", + namespace: "telegram.update-offsets", + }); + expect(threadBindingsPlan).toMatchObject({ + kind: "plugin-state-import", + label: "Telegram thread bindings", + namespace: "telegram.thread-bindings", + }); + if (!updateOffsetPlan || updateOffsetPlan.kind !== "plugin-state-import") { + throw new Error("expected orphaned update offset import plan"); + } + if (!threadBindingsPlan || threadBindingsPlan.kind !== "plugin-state-import") { + throw new Error("expected orphaned thread bindings import plan"); + } + expect(await updateOffsetPlan.readEntries()).toHaveLength(1); + expect(await threadBindingsPlan.readEntries()).toHaveLength(1); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); + + it("imports legacy session-store sidecars into the current runtime scope", async () => { + const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-state-migration-")); + const env = { ...process.env, OPENCLAW_STATE_DIR: dir }; + const storePath = resolveStorePath(undefined, { env }); + const legacyStorePath = path.join(dir, "sessions", "sessions.json"); + const currentSentPath = `${storePath}.telegram-sent-messages.json`; + const legacySentPath = `${legacyStorePath}.telegram-sent-messages.json`; + const currentDispatchPath = resolveTelegramMessageDispatchLegacyPath({ + storePath, + namespace: "ops", + }); + const legacyDispatchPath = resolveTelegramMessageDispatchLegacyPath({ + storePath: legacyStorePath, + namespace: "ops", + }); + const now = Date.now(); + try { + await mkdir(path.dirname(currentSentPath), { recursive: true }); + await mkdir(path.dirname(legacySentPath), { recursive: true }); + const sentPayload = JSON.stringify({ 7: { 42: now } }); + const dispatchPayload = JSON.stringify({ [JSON.stringify(["message", "7", 42])]: now }); + await writeFile(currentSentPath, sentPayload); + await writeFile(legacySentPath, sentPayload); + await writeFile(currentDispatchPath, dispatchPayload); + await writeFile(legacyDispatchPath, dispatchPayload); + + const cfg = { + channels: { + telegram: { + accounts: { + ops: { + botToken: "123456:secret", + }, + }, + }, + }, + } as OpenClawConfig; + const plans = await detectTelegramLegacyStateMigrations({ cfg, env }); + const importPlans = plans.filter((plan) => plan.kind === "plugin-state-import"); + const currentSentPlan = importPlans.find( + (plan) => + plan.label === "Telegram sent-message cache" && plan.sourcePath === currentSentPath, + ); + const legacySentPlan = importPlans.find( + (plan) => + plan.label === "Telegram sent-message cache" && plan.sourcePath === legacySentPath, + ); + const currentDispatchPlan = importPlans.find( + (plan) => + plan.label === "Telegram message dispatch dedupe" && + plan.sourcePath === currentDispatchPath, + ); + const legacyDispatchPlan = importPlans.find( + (plan) => + plan.label === "Telegram message dispatch dedupe" && + plan.sourcePath === legacyDispatchPath, + ); + if (!currentSentPlan || !legacySentPlan || !currentDispatchPlan || !legacyDispatchPlan) { + throw new Error("expected current and legacy session-store import plans"); + } + + const stripTtl = (entries: Awaited>) => + entries.map(({ ttlMs: _ttlMs, ...entry }) => entry); + expect(stripTtl(await legacySentPlan.readEntries())).toStrictEqual( + stripTtl(await currentSentPlan.readEntries()), + ); + const stripDispatchSourceKey = ( + entries: Awaited>, + ) => entries.map(({ key: _key, ttlMs: _ttlMs, ...entry }) => entry); + expect(stripDispatchSourceKey(await legacyDispatchPlan.readEntries())).toStrictEqual( + stripDispatchSourceKey(await currentDispatchPlan.readEntries()), + ); + } finally { + await rm(dir, { recursive: true, force: true }); + } + }); }); diff --git a/extensions/telegram/src/state-migrations.ts b/extensions/telegram/src/state-migrations.ts index 79f5cb879eb..c38efa1b664 100644 --- a/extensions/telegram/src/state-migrations.ts +++ b/extensions/telegram/src/state-migrations.ts @@ -1,3 +1,4 @@ +import fs from "node:fs"; import path from "node:path"; import type { ChannelLegacyStateMigrationPlan } from "openclaw/plugin-sdk/channel-contract"; import { resolveChannelAllowFromPath } from "openclaw/plugin-sdk/channel-pairing"; @@ -19,6 +20,29 @@ import { TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES, TELEGRAM_MESSAGE_CACHE_PERSISTENT_NAMESPACE, } from "./message-cache.js"; +import { + listTelegramLegacyMessageDispatchDedupeEntries, + resolveTelegramMessageDispatchLegacyPath, + TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES, + TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE, +} from "./message-dispatch-dedupe.js"; +import { + listTelegramLegacySentMessageCacheEntries, + TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES, + TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE, +} from "./sent-message-cache.js"; +import { + listTelegramLegacyStickerCacheEntries, + TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + TELEGRAM_STICKER_CACHE_NAMESPACE, +} from "./sticker-cache-store.js"; +import { + listTelegramLegacyThreadBindingEntries, + TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES, + TELEGRAM_THREAD_BINDINGS_NAMESPACE, + testing as telegramThreadBindingTesting, +} from "./thread-bindings.js"; +import { resolveTelegramToken } from "./token.js"; import { listTelegramLegacyTopicNameCacheEntries, resolveTopicNameCacheNamespace, @@ -26,6 +50,13 @@ import { resolveTopicNameCacheScope, TELEGRAM_TOPIC_NAME_CACHE_MAX_ENTRIES, } from "./topic-name-cache.js"; +import { + listTelegramLegacyUpdateOffsetEntries, + normalizeTelegramUpdateOffsetAccountId, + shouldReplaceTelegramUpdateOffsetEntry, + TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES, + TELEGRAM_UPDATE_OFFSET_NAMESPACE, +} from "./update-offset-store.js"; function fileExists(pathValue: string): boolean { try { @@ -39,12 +70,40 @@ function resolveLegacySessionStorePath(params: { env: NodeJS.ProcessEnv; stateDir?: string; }): string { - const stateDir = + return path.join(resolveMigrationStateDir(params), "sessions", "sessions.json"); +} + +function resolveMigrationStateDir(params: { env: NodeJS.ProcessEnv; stateDir?: string }): string { + return ( params.stateDir ?? path.dirname( path.dirname(path.dirname(path.dirname(resolveStorePath(undefined, { env: params.env })))), - ); - return path.join(stateDir, "sessions", "sessions.json"); + ) + ); +} + +function listTelegramLegacySidecarAccountIds(params: { + cfg: OpenClawConfig; + stateDir: string; + prefix: string; + suffix: string; +}): string[] { + let persistedAccountIds: string[] = []; + try { + persistedAccountIds = fs + .readdirSync(path.join(params.stateDir, "telegram"), { withFileTypes: true }) + .filter( + (entry) => + entry.isFile() && + entry.name.startsWith(params.prefix) && + entry.name.endsWith(params.suffix), + ) + .map((entry) => entry.name.slice(params.prefix.length, -params.suffix.length)) + .filter(Boolean); + } catch { + persistedAccountIds = []; + } + return uniqueStrings([...listTelegramAccountIds(params.cfg), ...persistedAccountIds]); } function detectTelegramMessageCacheLegacyStateMigration(params: { @@ -113,6 +172,190 @@ function detectTelegramBotInfoCacheLegacyStateMigration(params: { }); } +function detectTelegramUpdateOffsetLegacyStateMigration(params: { + cfg: OpenClawConfig; + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const stateDir = resolveMigrationStateDir(params); + return listTelegramLegacySidecarAccountIds({ + cfg: params.cfg, + stateDir, + prefix: "update-offset-", + suffix: ".json", + }).flatMap((accountId) => { + const normalized = normalizeTelegramUpdateOffsetAccountId(accountId); + const persistedPath = path.join(stateDir, "telegram", `update-offset-${normalized}.json`); + if (!fileExists(persistedPath)) { + return []; + } + let botToken: string | undefined; + try { + botToken = + resolveTelegramToken(params.cfg, { + accountId, + envToken: params.env.TELEGRAM_BOT_TOKEN, + }).token || undefined; + } catch { + botToken = undefined; + } + return { + kind: "plugin-state-import", + label: "Telegram update offset", + sourcePath: persistedPath, + targetPath: `plugin state:${TELEGRAM_UPDATE_OFFSET_NAMESPACE}`, + pluginId: "telegram", + namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE, + maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES, + scopeKey: "", + cleanupSource: "rename", + preview: `- Telegram update offset: ${persistedPath} → plugin state (${TELEGRAM_UPDATE_OFFSET_NAMESPACE})`, + readEntries: () => listTelegramLegacyUpdateOffsetEntries({ accountId, persistedPath }), + shouldReplaceExistingEntry: ({ existingValue, incomingValue }) => + shouldReplaceTelegramUpdateOffsetEntry({ + existingValue, + incomingValue, + botToken, + }), + }; + }); +} + +function detectTelegramStickerCacheLegacyStateMigration(params: { + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const stateDir = resolveMigrationStateDir(params); + const persistedPath = path.join(stateDir, "telegram", "sticker-cache.json"); + if (!fileExists(persistedPath)) { + return []; + } + return [ + { + kind: "plugin-state-import", + label: "Telegram sticker cache", + sourcePath: persistedPath, + targetPath: `plugin state:${TELEGRAM_STICKER_CACHE_NAMESPACE}`, + pluginId: "telegram", + namespace: TELEGRAM_STICKER_CACHE_NAMESPACE, + maxEntries: TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + scopeKey: "", + cleanupSource: "rename", + preview: `- Telegram sticker cache: ${persistedPath} → plugin state (${TELEGRAM_STICKER_CACHE_NAMESPACE})`, + readEntries: () => listTelegramLegacyStickerCacheEntries({ persistedPath }), + }, + ]; +} + +function detectTelegramSentMessageCacheLegacyStateMigration(params: { + cfg: OpenClawConfig; + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const storePath = resolveStorePath(params.cfg.session?.store, { env: params.env }); + const legacyStorePath = resolveLegacySessionStorePath(params); + const sources = uniqueStrings([storePath, legacyStorePath]).map((sourceStorePath) => ({ + targetStorePath: storePath, + sourcePath: `${sourceStorePath}.telegram-sent-messages.json`, + })); + return sources.flatMap((source) => { + if (!fileExists(source.sourcePath)) { + return []; + } + return { + kind: "plugin-state-import", + label: "Telegram sent-message cache", + sourcePath: source.sourcePath, + targetPath: `plugin state:${TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE}`, + pluginId: "telegram", + namespace: TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE, + maxEntries: TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES, + scopeKey: "", + cleanupSource: "rename", + preview: `- Telegram sent-message cache: ${source.sourcePath} → plugin state (${TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE})`, + readEntries: () => + listTelegramLegacySentMessageCacheEntries({ + cfg: { session: { store: source.targetStorePath } }, + persistedPath: source.sourcePath, + }), + }; + }); +} + +function detectTelegramThreadBindingLegacyStateMigration(params: { + cfg: OpenClawConfig; + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const stateDir = resolveMigrationStateDir(params); + return listTelegramLegacySidecarAccountIds({ + cfg: params.cfg, + stateDir, + prefix: "thread-bindings-", + suffix: ".json", + }).flatMap((accountId) => { + const persistedPath = telegramThreadBindingTesting.resolveBindingsPath(accountId, params.env); + if (!fileExists(persistedPath)) { + return []; + } + return { + kind: "plugin-state-import", + label: "Telegram thread bindings", + sourcePath: persistedPath, + targetPath: `plugin state:${TELEGRAM_THREAD_BINDINGS_NAMESPACE}`, + pluginId: "telegram", + namespace: TELEGRAM_THREAD_BINDINGS_NAMESPACE, + maxEntries: TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES, + scopeKey: "", + cleanupSource: "rename", + preview: `- Telegram thread bindings: ${persistedPath} → plugin state (${TELEGRAM_THREAD_BINDINGS_NAMESPACE})`, + readEntries: () => listTelegramLegacyThreadBindingEntries({ accountId, persistedPath }), + }; + }); +} + +function detectTelegramMessageDispatchLegacyStateMigration(params: { + cfg: OpenClawConfig; + env: NodeJS.ProcessEnv; + stateDir?: string; +}): ChannelLegacyStateMigrationPlan[] { + const storePath = resolveStorePath(params.cfg.session?.store, { env: params.env }); + const legacyStorePath = resolveLegacySessionStorePath(params); + return listTelegramAccountIds(params.cfg).flatMap((accountId) => { + const sources = uniqueStrings([storePath, legacyStorePath]).map((sourceStorePath) => ({ + targetStorePath: storePath, + sourcePath: resolveTelegramMessageDispatchLegacyPath({ + storePath: sourceStorePath, + namespace: accountId, + }), + })); + return sources.flatMap((source) => { + const sourcePath = source.sourcePath; + if (!fileExists(sourcePath)) { + return []; + } + return { + kind: "plugin-state-import", + label: "Telegram message dispatch dedupe", + sourcePath, + targetPath: `plugin state:${TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE}`, + pluginId: "telegram", + namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE, + maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES, + scopeKey: "", + cleanupSource: "rename", + preview: `- Telegram message dispatch dedupe: ${sourcePath} → plugin state (${TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE})`, + readEntries: () => + listTelegramLegacyMessageDispatchDedupeEntries({ + storePath: source.targetStorePath, + namespace: accountId, + persistedPath: source.sourcePath, + }), + }; + }); + }); +} + function topicNameCacheImportSource(params: { sourceStorePath: string; targetStorePath?: string; @@ -197,8 +440,13 @@ export async function detectTelegramLegacyStateMigrations(params: { }); } } + plans.push(...detectTelegramUpdateOffsetLegacyStateMigration(params)); plans.push(...detectTelegramBotInfoCacheLegacyStateMigration(params)); + plans.push(...detectTelegramStickerCacheLegacyStateMigration(params)); plans.push(...detectTelegramMessageCacheLegacyStateMigration(params)); + plans.push(...detectTelegramSentMessageCacheLegacyStateMigration(params)); plans.push(...detectTelegramTopicNameCacheLegacyStateMigration(params)); + plans.push(...detectTelegramThreadBindingLegacyStateMigration(params)); + plans.push(...detectTelegramMessageDispatchLegacyStateMigration(params)); return plans; } diff --git a/extensions/telegram/src/sticker-cache-store.ts b/extensions/telegram/src/sticker-cache-store.ts index a4b2720921d..fa230df31a0 100644 --- a/extensions/telegram/src/sticker-cache-store.ts +++ b/extensions/telegram/src/sticker-cache-store.ts @@ -1,8 +1,13 @@ import path from "node:path"; -import { loadJsonFile, saveJsonFile } from "openclaw/plugin-sdk/json-store"; +import { loadJsonFile } from "openclaw/plugin-sdk/json-store"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { getTelegramRuntime } from "./runtime.js"; const CACHE_VERSION = 1; +export const TELEGRAM_STICKER_CACHE_NAMESPACE = "telegram.sticker-cache"; +export const TELEGRAM_STICKER_CACHE_MAX_ENTRIES = 10_000; export interface CachedSticker { fileId: string; @@ -19,57 +24,89 @@ interface StickerCache { stickers: Record; } +type TelegramStickerCacheStore = PluginStateSyncKeyedStore; + +let stickerCacheStoreForTest: TelegramStickerCacheStore | undefined; + function getCacheFile(): string { return path.join(resolveStateDir(), "telegram", "sticker-cache.json"); } -function loadCache(): StickerCache { - const data = loadJsonFile(getCacheFile()); - if (!data || typeof data !== "object") { - return { version: CACHE_VERSION, stickers: {} }; - } - const cache = data as StickerCache; - if (cache.version !== CACHE_VERSION) { - // Future: handle migration if needed - return { version: CACHE_VERSION, stickers: {} }; - } - return cache; +function openStickerCacheStore(): TelegramStickerCacheStore { + return ( + stickerCacheStoreForTest ?? + getTelegramRuntime().state.openSyncKeyedStore({ + namespace: TELEGRAM_STICKER_CACHE_NAMESPACE, + maxEntries: TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + }) + ); } -function saveCache(cache: StickerCache): void { - saveJsonFile(getCacheFile(), cache); +function loadCache(): StickerCache { + return loadCacheFile(getCacheFile()); } function normalizeStickerSearchText(value: unknown): string { return typeof value === "string" ? value.trim().toLowerCase() : ""; } +function normalizeCachedStickerForStore(sticker: CachedSticker): CachedSticker { + return { + fileId: sticker.fileId, + fileUniqueId: sticker.fileUniqueId, + description: sticker.description, + cachedAt: sticker.cachedAt, + ...(sticker.emoji !== undefined ? { emoji: sticker.emoji } : {}), + ...(sticker.setName !== undefined ? { setName: sticker.setName } : {}), + ...(sticker.receivedFrom !== undefined ? { receivedFrom: sticker.receivedFrom } : {}), + }; +} + +function readStickerCacheStore( + operation: string, + read: (store: TelegramStickerCacheStore) => T, + fallback: T, +): T { + try { + return read(openStickerCacheStore()); + } catch (err) { + logVerbose(`telegram sticker cache ${operation} failed: ${String(err)}`); + return fallback; + } +} + /** * Get a cached sticker by its unique ID. */ export function getCachedSticker(fileUniqueId: string): CachedSticker | null { - const cache = loadCache(); - return cache.stickers[fileUniqueId] ?? null; + return readStickerCacheStore("lookup", (store) => store.lookup(fileUniqueId) ?? null, null); } /** * Add or update a sticker in the cache. */ export function cacheSticker(sticker: CachedSticker): void { - const cache = loadCache(); - cache.stickers[sticker.fileUniqueId] = sticker; - saveCache(cache); + readStickerCacheStore( + "register", + (store) => { + store.register(sticker.fileUniqueId, normalizeCachedStickerForStore(sticker)); + }, + undefined, + ); } /** * Search cached stickers by text query (fuzzy match on description + emoji + setName). */ export function searchStickers(query: string, limit = 10): CachedSticker[] { - const cache = loadCache(); const queryLower = normalizeStickerSearchText(query); const results: Array<{ sticker: CachedSticker; score: number }> = []; - for (const sticker of Object.values(cache.stickers)) { + for (const { value: sticker } of readStickerCacheStore( + "entries", + (store) => store.entries(), + [], + )) { let score = 0; const descLower = normalizeStickerSearchText(sticker.description); @@ -112,16 +149,18 @@ export function searchStickers(query: string, limit = 10): CachedSticker[] { * Get all cached stickers (for debugging/listing). */ export function getAllCachedStickers(): CachedSticker[] { - const cache = loadCache(); - return Object.values(cache.stickers); + return readStickerCacheStore( + "entries", + (store) => store.entries().map((entry) => entry.value), + [], + ); } /** * Get cache statistics. */ export function getCacheStats(): { count: number; oldestAt?: string; newestAt?: string } { - const cache = loadCache(); - const stickers = Object.values(cache.stickers); + const stickers = getAllCachedStickers(); if (stickers.length === 0) { return { count: 0 }; } @@ -134,3 +173,37 @@ export function getCacheStats(): { count: number; oldestAt?: string; newestAt?: newestAt: sorted[sorted.length - 1]?.cachedAt, }; } + +export function setTelegramStickerCacheStoreForTest( + store: TelegramStickerCacheStore | undefined, +): void { + stickerCacheStoreForTest = store; +} + +export function clearTelegramStickerCacheForTest(): void { + openStickerCacheStore().clear(); +} + +export function listTelegramLegacyStickerCacheEntries( + params: { + persistedPath?: string; + } = {}, +): Array<{ key: string; value: CachedSticker }> { + const cache = params.persistedPath ? loadCacheFile(params.persistedPath) : loadCache(); + return Object.entries(cache.stickers).map(([key, value]) => ({ + key, + value: normalizeCachedStickerForStore(value), + })); +} + +function loadCacheFile(filePath: string): StickerCache { + const data = loadJsonFile(filePath); + if (!data || typeof data !== "object") { + return { version: CACHE_VERSION, stickers: {} }; + } + const cache = data as StickerCache; + if (cache.version !== CACHE_VERSION) { + return { version: CACHE_VERSION, stickers: {} }; + } + return cache; +} diff --git a/extensions/telegram/src/sticker-cache.test.ts b/extensions/telegram/src/sticker-cache.test.ts index 117e7c10cc9..82868dbb68b 100644 --- a/extensions/telegram/src/sticker-cache.test.ts +++ b/extensions/telegram/src/sticker-cache.test.ts @@ -1,31 +1,29 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import * as stickerCache from "./sticker-cache-store.js"; -const jsonStoreMocks = vi.hoisted(() => { - const store: { value: unknown } = { value: null }; - return { - store, - loadJsonFile: vi.fn(() => store.value), - saveJsonFile: vi.fn((_file: string, value: unknown) => { - store.value = structuredClone(value); - }), - }; -}); - -vi.mock("openclaw/plugin-sdk/json-store", () => ({ - loadJsonFile: jsonStoreMocks.loadJsonFile, - saveJsonFile: jsonStoreMocks.saveJsonFile, -})); - vi.mock("openclaw/plugin-sdk/state-paths", () => ({ resolveStateDir: () => "/tmp/openclaw-test-sticker-cache", })); describe("sticker-cache", () => { beforeEach(() => { - jsonStoreMocks.store.value = null; - jsonStoreMocks.loadJsonFile.mockClear(); - jsonStoreMocks.saveJsonFile.mockClear(); + resetPluginStateStoreForTests({ closeDatabase: false }); + stickerCache.setTelegramStickerCacheStoreForTest( + createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE, + maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + }), + ); + stickerCache.clearTelegramStickerCacheForTest(); + }); + + afterEach(() => { + stickerCache.setTelegramStickerCacheStoreForTest(undefined); + resetPluginStateStoreForTests(); }); describe("getCachedSticker", () => { @@ -65,7 +63,21 @@ describe("sticker-cache", () => { } expect(cachedSticker.fileUniqueId).toBe("unique123"); - jsonStoreMocks.store.value = null; + stickerCache.clearTelegramStickerCacheForTest(); + + expect(stickerCache.getCachedSticker("unique123")).toBeNull(); + }); + + it("treats plugin-state lookup failures as cache misses", () => { + stickerCache.setTelegramStickerCacheStoreForTest({ + ...createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE, + maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + }), + lookup() { + throw new Error("lookup failed"); + }, + }); expect(stickerCache.getCachedSticker("unique123")).toBeNull(); }); @@ -87,6 +99,25 @@ describe("sticker-cache", () => { expect(all[0]).toEqual(sticker); }); + it("omits undefined optional fields before storing", () => { + stickerCache.cacheSticker({ + fileId: "file-undefined", + fileUniqueId: "unique-undefined", + emoji: undefined, + setName: undefined, + description: "Sticker with omitted fields", + cachedAt: "2026-01-26T12:00:00.000Z", + receivedFrom: undefined, + }); + + expect(stickerCache.getCachedSticker("unique-undefined")).toStrictEqual({ + fileId: "file-undefined", + fileUniqueId: "unique-undefined", + description: "Sticker with omitted fields", + cachedAt: "2026-01-26T12:00:00.000Z", + }); + }); + it("updates existing entry", () => { const original = { fileId: "file789", @@ -108,6 +139,27 @@ describe("sticker-cache", () => { expect(result?.description).toBe("Updated description"); expect(result?.fileId).toBe("file789-new"); }); + + it("does not throw when plugin-state writes fail", () => { + stickerCache.setTelegramStickerCacheStoreForTest({ + ...createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE, + maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + }), + register() { + throw new Error("write failed"); + }, + }); + + expect(() => + stickerCache.cacheSticker({ + fileId: "file-failure", + fileUniqueId: "unique-failure", + description: "Write failure should not block sticker handling", + cachedAt: "2026-01-26T13:00:00.000Z", + }), + ).not.toThrow(); + }); }); describe("searchStickers", () => { @@ -201,6 +253,20 @@ describe("sticker-cache", () => { expect(results).toHaveLength(1); expect(results[0]?.fileUniqueId).toBe("cat-unique-1"); }); + + it("returns no matches when plugin-state search reads fail", () => { + stickerCache.setTelegramStickerCacheStoreForTest({ + ...createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE, + maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + }), + entries() { + throw new Error("entries failed"); + }, + }); + + expect(stickerCache.searchStickers("fox")).toStrictEqual([]); + }); }); describe("getAllCachedStickers", () => { @@ -209,6 +275,20 @@ describe("sticker-cache", () => { expect(result).toStrictEqual([]); }); + it("returns empty array when plugin-state list reads fail", () => { + stickerCache.setTelegramStickerCacheStoreForTest({ + ...createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE, + maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES, + }), + entries() { + throw new Error("entries failed"); + }, + }); + + expect(stickerCache.getAllCachedStickers()).toStrictEqual([]); + }); + it("returns all cached stickers", () => { stickerCache.cacheSticker({ fileId: "a", diff --git a/extensions/telegram/src/thread-bindings.test.ts b/extensions/telegram/src/thread-bindings.test.ts index 9efd6ad473e..9a62fe8c2cd 100644 --- a/extensions/telegram/src/thread-bindings.test.ts +++ b/extensions/telegram/src/thread-bindings.test.ts @@ -3,11 +3,14 @@ import os from "node:os"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime"; -import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -const writeJsonFileAtomicallyMock = vi.hoisted(() => vi.fn()); const readAcpSessionEntryMock = vi.hoisted(() => vi.fn()); vi.mock("openclaw/plugin-sdk/acp-runtime", async () => { @@ -21,24 +24,20 @@ vi.mock("openclaw/plugin-sdk/acp-runtime", async () => { }; }); -vi.mock("openclaw/plugin-sdk/json-store", async () => { - const actual = await vi.importActual( - "openclaw/plugin-sdk/json-store", - ); - writeJsonFileAtomicallyMock.mockImplementation(actual.writeJsonFileAtomically); - return { - ...actual, - writeJsonFileAtomically: writeJsonFileAtomicallyMock, - }; -}); - import { + TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES, + TELEGRAM_THREAD_BINDINGS_NAMESPACE, testing, createTelegramThreadBindingManager as createTelegramThreadBindingManagerImpl, setTelegramThreadBindingIdleTimeoutBySessionKey, setTelegramThreadBindingMaxAgeBySessionKey, + setTelegramThreadBindingStoreForTest, } from "./thread-bindings.js"; +type ThreadBindingStoreEntry = ReturnType< + ReturnType["listBindings"] +>[number]; + const TELEGRAM_THREAD_BINDINGS_TEST_CFG = { channels: { telegram: { @@ -63,14 +62,30 @@ function createTelegramThreadBindingManager( async function flushMicrotasks(): Promise { await Promise.resolve(); await new Promise((resolve) => queueMicrotask(resolve)); + await new Promise((resolve) => setImmediate(resolve)); } describe("telegram thread bindings", () => { const originalStateDir = process.env.OPENCLAW_STATE_DIR; let stateDirOverride: string | undefined; + let threadBindingStore: PluginStateSyncKeyedStore; + + function createThreadBindingStore(): PluginStateSyncKeyedStore { + return createPluginStateSyncKeyedStoreForTests("telegram", { + namespace: TELEGRAM_THREAD_BINDINGS_NAMESPACE, + maxEntries: TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES, + }); + } + + function storedBindings(): ThreadBindingStoreEntry[] { + return threadBindingStore.entries().map((entry) => entry.value); + } beforeEach(async () => { - writeJsonFileAtomicallyMock.mockClear(); + resetPluginStateStoreForTests({ closeDatabase: false }); + threadBindingStore = createThreadBindingStore(); + threadBindingStore.clear(); + setTelegramThreadBindingStoreForTest(threadBindingStore); readAcpSessionEntryMock.mockReset(); const acpRuntime = await vi.importActual( "openclaw/plugin-sdk/acp-runtime", @@ -82,6 +97,8 @@ describe("telegram thread bindings", () => { afterEach(async () => { vi.useRealTimers(); await testing.resetTelegramThreadBindingsForTests(); + setTelegramThreadBindingStoreForTest(undefined); + resetPluginStateStoreForTests(); if (stateDirOverride) { fs.rmSync(stateDirOverride, { recursive: true, force: true }); stateDirOverride = undefined; @@ -182,6 +199,8 @@ describe("telegram thread bindings", () => { import.meta.url, "./thread-bindings.js?scope=shared-b", ); + bindingsA.setTelegramThreadBindingStoreForTest(threadBindingStore); + bindingsB.setTelegramThreadBindingStoreForTest(threadBindingStore); await bindingsA.testing.resetTelegramThreadBindingsForTests(); @@ -219,6 +238,8 @@ describe("telegram thread bindings", () => { ).toBe("agent:main:subagent:child-shared"); } finally { await bindingsA.testing.resetTelegramThreadBindingsForTests(); + bindingsA.setTelegramThreadBindingStoreForTest(undefined); + bindingsB.setTelegramThreadBindingStoreForTest(undefined); } }); @@ -301,12 +322,9 @@ describe("telegram thread bindings", () => { maxAgeMs: 2 * 60 * 60 * 1000, }); - const statePath = path.join( - resolveStateDir(process.env, os.homedir), - "telegram", - "thread-bindings-no-persist.json", + expect(storedBindings().filter((binding) => binding.accountId === "no-persist")).toStrictEqual( + [], ); - expect(fs.existsSync(statePath)).toBe(false); }); it("persists unbinds before restart so removed bindings do not come back", async () => { @@ -345,6 +363,62 @@ describe("telegram thread bindings", () => { expect(reloaded.getByConversationId("8460800771")).toBeUndefined(); }); + it("persists bindings with json-clean metadata", async () => { + createTelegramThreadBindingManager({ + accountId: "metadata", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:metadata-child", + targetKind: "subagent", + conversation: { + channel: "telegram", + accountId: "metadata", + conversationId: "metadata-thread", + }, + metadata: { + retained: "yes", + omitted: undefined, + }, + }); + + await testing.resetTelegramThreadBindingsForTests(); + + const reloaded = createTelegramThreadBindingManager({ + accountId: "metadata", + persist: true, + enableSweeper: false, + }); + + expect(reloaded.getByConversationId("metadata-thread")?.metadata).toStrictEqual({ + retained: "yes", + }); + expect( + storedBindings().find((binding) => binding.accountId === "metadata")?.metadata, + ).toStrictEqual({ + retained: "yes", + }); + }); + + it("starts with empty bindings when the plugin-state store cannot be read", () => { + setTelegramThreadBindingStoreForTest({ + ...threadBindingStore, + entries() { + throw new Error("state unavailable"); + }, + }); + + const manager = createTelegramThreadBindingManager({ + accountId: "read-failure", + persist: true, + enableSweeper: false, + }); + + expect(manager.listBindings()).toStrictEqual([]); + }); + it("cleans up stale ACP bindings before restart routing can reuse them", async () => { stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); process.env.OPENCLAW_STATE_DIR = stateDirOverride; @@ -384,19 +458,7 @@ describe("telegram thread bindings", () => { expect(reloaded.getByConversationId("cleanup-me")).toBeUndefined(); await testing.resetTelegramThreadBindingsForTests(); - const persisted = JSON.parse( - fs.readFileSync( - path.join( - resolveStateDir(process.env, os.homedir), - "telegram", - "thread-bindings-default.json", - ), - "utf8", - ), - ) as { bindings?: Array<{ conversationId?: string }> }; - expect(persisted.bindings?.map((binding) => binding.conversationId)).not.toContain( - "cleanup-me", - ); + expect(storedBindings().map((binding) => binding.conversationId)).not.toContain("cleanup-me"); }); it("keeps plugin-owned bindings when ACP cleanup runs on startup", async () => { @@ -505,15 +567,9 @@ describe("telegram thread bindings", () => { await testing.resetTelegramThreadBindingsForTests(); - const statePath = path.join( - resolveStateDir(process.env, os.homedir), - "telegram", - "thread-bindings-persist-reset.json", - ); - const persisted = JSON.parse(fs.readFileSync(statePath, "utf8")) as { - bindings?: Array<{ idleTimeoutMs?: number }>; - }; - expect(persisted.bindings?.[0]?.idleTimeoutMs).toBe(90_000); + expect( + storedBindings().find((binding) => binding.accountId === "persist-reset")?.idleTimeoutMs, + ).toBe(90_000); }); it("does not leak unhandled rejections when a persist write fails", async () => { @@ -542,8 +598,11 @@ describe("telegram thread bindings", () => { }, }); - writeJsonFileAtomicallyMock.mockImplementationOnce(async () => { - throw new Error("persist boom"); + setTelegramThreadBindingStoreForTest({ + ...threadBindingStore, + register() { + throw new Error("persist boom"); + }, }); manager.touchConversation("-100200300:topic:100"); diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index 774b1442d0f..d0d750e39a1 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -1,3 +1,4 @@ +import { createHash } from "node:crypto"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; @@ -14,19 +15,23 @@ import { type SessionBindingRecord, } from "openclaw/plugin-sdk/conversation-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; +import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { normalizeAccountId, isAcpSessionKey } from "openclaw/plugin-sdk/routing"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime"; +import { getTelegramRuntime } from "./runtime.js"; import { resolveTelegramToken } from "./token.js"; const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000; const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0; const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 60_000; const STORE_VERSION = 1; +export const TELEGRAM_THREAD_BINDINGS_NAMESPACE = "telegram.thread-bindings"; +export const TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES = 5_000; let telegramSendModulePromise: Promise | undefined; +let threadBindingStoreForTest: PluginStateSyncKeyedStore | undefined; async function loadTelegramSendModule() { telegramSendModulePromise ??= import("./send.js"); @@ -55,6 +60,8 @@ type StoredTelegramBindingState = { bindings: TelegramThreadBindingRecord[]; }; +type TelegramThreadBindingStore = PluginStateSyncKeyedStore; + type TelegramThreadBindingManager = { accountId: string; shouldPersistMutations: () => boolean; @@ -116,6 +123,23 @@ function resolveBindingKey(params: { accountId: string; conversationId: string } return `${params.accountId}:${params.conversationId}`; } +function resolveStoredBindingKey(params: { accountId: string; conversationId: string }): string { + return createHash("sha256") + .update(`${params.accountId}\0${params.conversationId}`, "utf8") + .digest("hex") + .slice(0, 32); +} + +function openThreadBindingStore(): TelegramThreadBindingStore { + return ( + threadBindingStoreForTest ?? + getTelegramRuntime().state.openSyncKeyedStore({ + namespace: TELEGRAM_THREAD_BINDINGS_NAMESPACE, + maxEntries: TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES, + }) + ); +} + function toSessionBindingTargetKind(raw: TelegramBindingTargetKind): BindingTargetKind { return raw === "subagent" ? "subagent" : "session"; } @@ -228,6 +252,20 @@ function resolveBindingsPath(accountId: string, env: NodeJS.ProcessEnv = process return path.join(stateDir, "telegram", `thread-bindings-${accountId}.json`); } +function normalizeMetadataForStore( + metadata: Record | undefined, +): Record | undefined { + if (!metadata) { + return undefined; + } + const serialized = JSON.stringify(metadata); + if (!serialized) { + return undefined; + } + const parsed = JSON.parse(serialized) as Record; + return Object.keys(parsed).length > 0 ? parsed : undefined; +} + function summarizeLifecycleForLog( record: TelegramThreadBindingRecord, defaults: { @@ -243,8 +281,60 @@ function summarizeLifecycleForLog( return `idle=${idleLabel} maxAge=${maxAgeLabel}`; } -function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[] { - const filePath = resolveBindingsPath(accountId); +function sanitizeStoredBinding( + accountId: string, + entry: Partial | null | undefined, +): TelegramThreadBindingRecord | null { + const conversationId = normalizeOptionalString(entry?.conversationId); + const targetSessionKey = normalizeOptionalString(entry?.targetSessionKey) ?? ""; + const targetKind = entry?.targetKind === "subagent" ? "subagent" : "acp"; + if (!conversationId || !targetSessionKey) { + return null; + } + const boundAt = + typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt) + ? Math.floor(entry.boundAt) + : Date.now(); + const lastActivityAt = + typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt) + ? Math.floor(entry.lastActivityAt) + : boundAt; + const record: TelegramThreadBindingRecord = { + accountId, + conversationId, + targetSessionKey, + targetKind, + boundAt, + lastActivityAt, + }; + if (typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)) { + record.idleTimeoutMs = Math.max(0, Math.floor(entry.idleTimeoutMs)); + } + if (typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)) { + record.maxAgeMs = Math.max(0, Math.floor(entry.maxAgeMs)); + } + if (typeof entry?.agentId === "string" && entry.agentId.trim()) { + record.agentId = entry.agentId.trim(); + } + if (typeof entry?.label === "string" && entry.label.trim()) { + record.label = entry.label.trim(); + } + if (typeof entry?.boundBy === "string" && entry.boundBy.trim()) { + record.boundBy = entry.boundBy.trim(); + } + const metadata = normalizeMetadataForStore( + entry?.metadata && typeof entry.metadata === "object" ? { ...entry.metadata } : undefined, + ); + if (metadata) { + record.metadata = metadata; + } + return record; +} + +function readLegacyBindingsFile( + filePath: string, + accountId: string, +): TelegramThreadBindingRecord[] { try { const raw = fs.readFileSync(filePath, "utf-8"); const parsed = JSON.parse(raw) as StoredTelegramBindingState; @@ -253,47 +343,10 @@ function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[] } const bindings: TelegramThreadBindingRecord[] = []; for (const entry of parsed.bindings) { - const conversationId = normalizeOptionalString(entry?.conversationId); - const targetSessionKey = normalizeOptionalString(entry?.targetSessionKey) ?? ""; - const targetKind = entry?.targetKind === "subagent" ? "subagent" : "acp"; - if (!conversationId || !targetSessionKey) { - continue; + const record = sanitizeStoredBinding(accountId, entry); + if (record) { + bindings.push(record); } - const boundAt = - typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt) - ? Math.floor(entry.boundAt) - : Date.now(); - const lastActivityAt = - typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt) - ? Math.floor(entry.lastActivityAt) - : boundAt; - const record: TelegramThreadBindingRecord = { - accountId, - conversationId, - targetSessionKey, - targetKind, - boundAt, - lastActivityAt, - }; - if (typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)) { - record.idleTimeoutMs = Math.max(0, Math.floor(entry.idleTimeoutMs)); - } - if (typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)) { - record.maxAgeMs = Math.max(0, Math.floor(entry.maxAgeMs)); - } - if (typeof entry?.agentId === "string" && entry.agentId.trim()) { - record.agentId = entry.agentId.trim(); - } - if (typeof entry?.label === "string" && entry.label.trim()) { - record.label = entry.label.trim(); - } - if (typeof entry?.boundBy === "string" && entry.boundBy.trim()) { - record.boundBy = entry.boundBy.trim(); - } - if (entry?.metadata && typeof entry.metadata === "object") { - record.metadata = { ...entry.metadata }; - } - bindings.push(record); } return bindings; } catch (err) { @@ -305,7 +358,43 @@ function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[] } } -async function persistBindingsToDisk(params: { +function loadBindingsFromStore(accountId: string): TelegramThreadBindingRecord[] { + let store: TelegramThreadBindingStore; + try { + store = openThreadBindingStore(); + } catch (err) { + logVerbose(`telegram thread bindings store open failed (${accountId}): ${String(err)}`); + return []; + } + let entries: Array<{ key: string; value: TelegramThreadBindingRecord }>; + try { + entries = store.entries(); + } catch (err) { + logVerbose(`telegram thread bindings store read failed (${accountId}): ${String(err)}`); + return []; + } + const bindings: TelegramThreadBindingRecord[] = []; + for (const entry of entries) { + if (entry.value.accountId !== accountId) { + continue; + } + const sanitized = sanitizeStoredBinding(accountId, entry.value); + if (sanitized) { + bindings.push(sanitized); + continue; + } + try { + store.delete(entry.key); + } catch (err) { + logVerbose( + `telegram thread bindings invalid row cleanup failed (${accountId}): ${String(err)}`, + ); + } + } + return bindings; +} + +async function persistBindingsToStore(params: { accountId: string; persist: boolean; bindings?: TelegramThreadBindingRecord[]; @@ -313,15 +402,27 @@ async function persistBindingsToDisk(params: { if (!params.persist) { return; } - const payload: StoredTelegramBindingState = { - version: STORE_VERSION, - bindings: - params.bindings ?? - [...getThreadBindingsState().bindingsByAccountConversation.values()].filter( - (entry) => entry.accountId === params.accountId, - ), - }; - await writeJsonFileAtomically(resolveBindingsPath(params.accountId), payload); + const store = openThreadBindingStore(); + const bindings = + params.bindings ?? + [...getThreadBindingsState().bindingsByAccountConversation.values()].filter( + (entry) => entry.accountId === params.accountId, + ); + const nextKeys = new Set(); + for (const binding of bindings) { + const stored = sanitizeStoredBinding(params.accountId, binding); + if (!stored) { + continue; + } + const key = resolveStoredBindingKey(stored); + nextKeys.add(key); + store.register(key, stored); + } + for (const entry of store.entries()) { + if (entry.value.accountId === params.accountId && !nextKeys.has(entry.key)) { + store.delete(entry.key); + } + } } function listBindingsForAccount(accountId: string): TelegramThreadBindingRecord[] { @@ -343,7 +444,7 @@ function enqueuePersistBindings(params: { const next = previous .catch(() => undefined) .then(async () => { - await persistBindingsToDisk(params); + await persistBindingsToStore(params); }); getThreadBindingsState().persistQueueByAccountId.set(params.accountId, next); const cleanup = () => { @@ -428,7 +529,7 @@ export function createTelegramThreadBindingManager(params: { ); const maxAgeMs = normalizeDurationMs(params.maxAgeMs, DEFAULT_THREAD_BINDING_MAX_AGE_MS); - const loaded = loadBindingsFromDisk(accountId); + const loaded = loadBindingsFromStore(accountId); for (const entry of loaded) { const key = resolveBindingKey({ accountId, @@ -917,7 +1018,26 @@ export async function resetTelegramThreadBindingsForTests() { getThreadBindingsState().bindingsByAccountConversation.clear(); } +export function setTelegramThreadBindingStoreForTest( + store: TelegramThreadBindingStore | undefined, +): void { + threadBindingStoreForTest = store; +} + +export function listTelegramLegacyThreadBindingEntries(params: { + accountId: string; + persistedPath?: string; +}): Array<{ key: string; value: TelegramThreadBindingRecord }> { + const bindings = readLegacyBindingsFile( + params.persistedPath ?? resolveBindingsPath(params.accountId), + params.accountId, + ); + return bindings.map((value) => ({ key: resolveStoredBindingKey(value), value })); +} + export const testing = { resetTelegramThreadBindingsForTests, + resolveBindingsPath, + resolveStoredBindingKey, }; export { testing as __testing }; diff --git a/extensions/telegram/src/token.test.ts b/extensions/telegram/src/token.test.ts index ec0f77b4f13..cf407d50946 100644 --- a/extensions/telegram/src/token.test.ts +++ b/extensions/telegram/src/token.test.ts @@ -2,10 +2,8 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; -import { withStateDirEnv } from "openclaw/plugin-sdk/test-env"; import { afterEach, describe, expect, it, vi } from "vitest"; import { resolveTelegramToken } from "./token.js"; -import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js"; describe("resolveTelegramToken", () => { const tempDirs: string[] = []; @@ -431,18 +429,3 @@ describe("resolveTelegramToken", () => { expectNoTokenForUnknownAccount(createUnknownAccountConfig()); }); }); - -describe("telegram update offset store", () => { - it("persists and reloads the last update id", async () => { - await withStateDirEnv("openclaw-telegram-", async () => { - expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull(); - - await writeTelegramUpdateOffset({ - accountId: "primary", - updateId: 421, - }); - - expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBe(421); - }); - }); -}); diff --git a/extensions/telegram/src/update-offset-store.test.ts b/extensions/telegram/src/update-offset-store.test.ts index 902f25f322c..876f5ef4935 100644 --- a/extensions/telegram/src/update-offset-store.test.ts +++ b/extensions/telegram/src/update-offset-store.test.ts @@ -1,15 +1,40 @@ -import fs from "node:fs/promises"; -import path from "node:path"; -import { withStateDirEnv } from "openclaw/plugin-sdk/test-env"; -import { describe, expect, it } from "vitest"; +import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { + createPluginStateKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { withStateDirEnv } from "openclaw/plugin-sdk/test-env"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { fingerprintTelegramBotToken } from "./token-fingerprint.js"; +import { + TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES, + TELEGRAM_UPDATE_OFFSET_NAMESPACE, + type TelegramUpdateOffsetState, deleteTelegramUpdateOffset, readTelegramUpdateOffset, + setTelegramUpdateOffsetStoreForTest, + shouldReplaceTelegramUpdateOffsetEntry, writeTelegramUpdateOffset, } from "./update-offset-store.js"; describe("deleteTelegramUpdateOffset", () => { - it("removes the offset file so a new bot starts fresh", async () => { + let updateOffsetStore: PluginStateKeyedStore; + + beforeEach(async () => { + updateOffsetStore = createPluginStateKeyedStoreForTests("telegram", { + namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE, + maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES, + }); + await updateOffsetStore.clear(); + setTelegramUpdateOffsetStoreForTest(updateOffsetStore); + }); + + afterEach(() => { + setTelegramUpdateOffsetStoreForTest(undefined); + resetPluginStateStoreForTests(); + }); + + it("removes the offset row so a new bot starts fresh", async () => { await withStateDirEnv("openclaw-tg-offset-", async () => { await writeTelegramUpdateOffset({ accountId: "default", updateId: 432_000_000 }); expect(await readTelegramUpdateOffset({ accountId: "default" })).toBe(432_000_000); @@ -19,7 +44,7 @@ describe("deleteTelegramUpdateOffset", () => { }); }); - it("keeps a missing offset file absent after delete", async () => { + it("keeps a missing offset row absent after delete", async () => { await withStateDirEnv("openclaw-tg-offset-", async () => { await deleteTelegramUpdateOffset({ accountId: "nonexistent" }); expect(await readTelegramUpdateOffset({ accountId: "nonexistent" })).toBeNull(); @@ -38,6 +63,24 @@ describe("deleteTelegramUpdateOffset", () => { }); }); + it("surfaces plugin-state write failures", async () => { + await withStateDirEnv("openclaw-tg-offset-", async () => { + setTelegramUpdateOffsetStoreForTest({ + ...createPluginStateKeyedStoreForTests("telegram", { + namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE, + maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES, + }), + async register() { + throw new Error("store write failed"); + }, + }); + + await expect( + writeTelegramUpdateOffset({ accountId: "default", updateId: 808 }), + ).rejects.toThrow("store write failed"); + }); + }); + it("returns null when stored offset was written by a different bot token", async () => { await withStateDirEnv("openclaw-tg-offset-", async () => { await writeTelegramUpdateOffset({ @@ -90,15 +133,12 @@ describe("deleteTelegramUpdateOffset", () => { }); }); - it("invokes onRotationDetected for legacy offsets without bot identity", async () => { - await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => { - const legacyPath = path.join(stateDir, "telegram", "update-offset-default.json"); - await fs.mkdir(path.dirname(legacyPath), { recursive: true }); - await fs.writeFile( - legacyPath, - `${JSON.stringify({ version: 1, lastUpdateId: 777 }, null, 2)}\n`, - "utf-8", - ); + it("invokes onRotationDetected for imported legacy offsets without bot identity", async () => { + await withStateDirEnv("openclaw-tg-offset-", async () => { + await updateOffsetStore.register("default", { + version: 1, + lastUpdateId: 777, + } as TelegramUpdateOffsetState); const rotations: Array> = []; const offset = await readTelegramUpdateOffset({ @@ -121,6 +161,83 @@ describe("deleteTelegramUpdateOffset", () => { }); }); + it("returns null when the plugin-state read fails", async () => { + await withStateDirEnv("openclaw-tg-offset-", async () => { + setTelegramUpdateOffsetStoreForTest({ + ...createPluginStateKeyedStoreForTests("telegram", { + namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE, + maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES, + }), + async lookup() { + throw new Error("store unavailable"); + }, + }); + + expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull(); + }); + }); + + it("lets migration replace stale plugin-state with a higher compatible imported offset", () => { + const token = "111111:current"; + expect( + shouldReplaceTelegramUpdateOffsetEntry({ + botToken: token, + existingValue: { + version: 3, + lastUpdateId: 10, + botId: "111111", + tokenFingerprint: fingerprintTelegramBotToken(token), + }, + incomingValue: { + version: 3, + lastUpdateId: 20, + botId: "111111", + tokenFingerprint: fingerprintTelegramBotToken(token), + }, + }), + ).toBe(true); + }); + + it("keeps plugin-state when the imported offset belongs to another bot", () => { + const token = "111111:current"; + expect( + shouldReplaceTelegramUpdateOffsetEntry({ + botToken: token, + existingValue: { + version: 3, + lastUpdateId: 10, + botId: "111111", + tokenFingerprint: fingerprintTelegramBotToken(token), + }, + incomingValue: { + version: 3, + lastUpdateId: 999, + botId: "222222", + tokenFingerprint: "stale", + }, + }), + ).toBe(false); + }); + + it("keeps plugin-state across persisted bot-id conflicts when no token is available", () => { + expect( + shouldReplaceTelegramUpdateOffsetEntry({ + existingValue: { + version: 3, + lastUpdateId: 10, + botId: "111111", + tokenFingerprint: "current-fingerprint", + }, + incomingValue: { + version: 3, + lastUpdateId: 999, + botId: "222222", + tokenFingerprint: "stale-fingerprint", + }, + }), + ).toBe(false); + }); + it("detects same-bot token rotation via the persisted fingerprint", async () => { await withStateDirEnv("openclaw-tg-offset-", async () => { const original = "111111:original-secret"; @@ -160,15 +277,13 @@ describe("deleteTelegramUpdateOffset", () => { }); }); - it("treats v2 bot-id-only offsets as stale when token identity cannot be verified", async () => { - await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => { - const legacyPath = path.join(stateDir, "telegram", "update-offset-default.json"); - await fs.mkdir(path.dirname(legacyPath), { recursive: true }); - await fs.writeFile( - legacyPath, - `${JSON.stringify({ version: 2, lastUpdateId: 999, botId: "111111" }, null, 2)}\n`, - "utf-8", - ); + it("treats imported v2 bot-id-only offsets as stale when token identity cannot be verified", async () => { + await withStateDirEnv("openclaw-tg-offset-", async () => { + await updateOffsetStore.register("default", { + version: 2, + lastUpdateId: 999, + botId: "111111", + } as TelegramUpdateOffsetState); const rotations: Array> = []; const offset = await readTelegramUpdateOffset({ @@ -214,15 +329,12 @@ describe("deleteTelegramUpdateOffset", () => { }); }); - it("treats legacy offset records without bot identity as stale when token is provided", async () => { - await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => { - const legacyPath = path.join(stateDir, "telegram", "update-offset-default.json"); - await fs.mkdir(path.dirname(legacyPath), { recursive: true }); - await fs.writeFile( - legacyPath, - `${JSON.stringify({ version: 1, lastUpdateId: 777 }, null, 2)}\n`, - "utf-8", - ); + it("treats imported legacy offset records without bot identity as stale when token is provided", async () => { + await withStateDirEnv("openclaw-tg-offset-", async () => { + await updateOffsetStore.register("default", { + version: 1, + lastUpdateId: 777, + } as TelegramUpdateOffsetState); expect( await readTelegramUpdateOffset({ @@ -233,22 +345,20 @@ describe("deleteTelegramUpdateOffset", () => { }); }); - it("ignores invalid persisted update IDs from disk", async () => { - await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => { - const offsetPath = path.join(stateDir, "telegram", "update-offset-default.json"); - await fs.mkdir(path.dirname(offsetPath), { recursive: true }); - await fs.writeFile( - offsetPath, - `${JSON.stringify({ version: 2, lastUpdateId: -1, botId: "111111" }, null, 2)}\n`, - "utf-8", - ); + it("ignores invalid persisted update IDs from plugin-state", async () => { + await withStateDirEnv("openclaw-tg-offset-", async () => { + await updateOffsetStore.register("default", { + version: 2, + lastUpdateId: -1, + botId: "111111", + } as TelegramUpdateOffsetState); expect(await readTelegramUpdateOffset({ accountId: "default" })).toBeNull(); - await fs.writeFile( - offsetPath, - `${JSON.stringify({ version: 2, lastUpdateId: Number.POSITIVE_INFINITY, botId: "111111" }, null, 2)}\n`, - "utf-8", - ); + await updateOffsetStore.register("default", { + version: 2, + lastUpdateId: "not-a-number", + botId: "111111", + } as unknown as TelegramUpdateOffsetState); expect(await readTelegramUpdateOffset({ accountId: "default" })).toBeNull(); }); }); diff --git a/extensions/telegram/src/update-offset-store.ts b/extensions/telegram/src/update-offset-store.ts index 9b9fc689064..9e483d348e8 100644 --- a/extensions/telegram/src/update-offset-store.ts +++ b/extensions/telegram/src/update-offset-store.ts @@ -1,24 +1,28 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; -import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; -import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; +import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store"; +import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { getTelegramRuntime } from "./runtime.js"; import { fingerprintTelegramBotToken } from "./token-fingerprint.js"; const STORE_VERSION = 3; +export const TELEGRAM_UPDATE_OFFSET_NAMESPACE = "telegram.update-offsets"; +export const TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES = 1_000; -type TelegramUpdateOffsetState = { +export type TelegramUpdateOffsetState = { version: number; lastUpdateId: number | null; botId: string | null; tokenFingerprint: string | null; }; +type TelegramUpdateOffsetStore = PluginStateKeyedStore; + +let updateOffsetStoreForTest: TelegramUpdateOffsetStore | undefined; + function isValidUpdateId(value: unknown): value is number { return typeof value === "number" && Number.isSafeInteger(value) && value >= 0; } -function normalizeAccountId(accountId?: string) { +export function normalizeTelegramUpdateOffsetAccountId(accountId?: string) { const trimmed = accountId?.trim(); if (!trimmed) { return "default"; @@ -26,13 +30,15 @@ function normalizeAccountId(accountId?: string) { return trimmed.replace(/[^a-z0-9._-]+/gi, "_"); } -function resolveTelegramUpdateOffsetPath( - accountId?: string, - env: NodeJS.ProcessEnv = process.env, -): string { - const stateDir = resolveStateDir(env, os.homedir); - const normalized = normalizeAccountId(accountId); - return path.join(stateDir, "telegram", `update-offset-${normalized}.json`); +function openUpdateOffsetStore(env?: NodeJS.ProcessEnv): TelegramUpdateOffsetStore { + return ( + updateOffsetStoreForTest ?? + getTelegramRuntime().state.openKeyedStore({ + namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE, + maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES, + ...(env ? { env } : {}), + }) + ); } function extractBotIdFromToken(token?: string): string | null { @@ -133,9 +139,14 @@ export async function readTelegramUpdateOffset(params: { env?: NodeJS.ProcessEnv; onRotationDetected?: (info: TelegramUpdateOffsetRotationInfo) => void | Promise; }): Promise { - const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env); - const { value } = await readJsonFileWithFallback(filePath, null); - const parsed = safeParseState(value); + const key = normalizeTelegramUpdateOffsetAccountId(params.accountId); + let storedValue: unknown; + try { + storedValue = await openUpdateOffsetStore(params.env).lookup(key); + } catch { + storedValue = undefined; + } + const parsed = safeParseState(storedValue); if (!parsed) { return null; } @@ -156,28 +167,77 @@ export async function writeTelegramUpdateOffset(params: { if (!isValidUpdateId(params.updateId)) { throw new Error("Telegram update offset must be a non-negative safe integer."); } - const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env); const payload: TelegramUpdateOffsetState = { version: STORE_VERSION, lastUpdateId: params.updateId, botId: extractBotIdFromToken(params.botToken), tokenFingerprint: fingerprintFromToken(params.botToken), }; - await writeJsonFileAtomically(filePath, payload); + await openUpdateOffsetStore(params.env).register( + normalizeTelegramUpdateOffsetAccountId(params.accountId), + payload, + ); } export async function deleteTelegramUpdateOffset(params: { accountId?: string; env?: NodeJS.ProcessEnv; }): Promise { - const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env); - try { - await fs.unlink(filePath); - } catch (err) { - const code = (err as { code?: string }).code; - if (code === "ENOENT") { - return; - } - throw err; - } + await openUpdateOffsetStore(params.env).delete( + normalizeTelegramUpdateOffsetAccountId(params.accountId), + ); +} + +export function setTelegramUpdateOffsetStoreForTest( + store: TelegramUpdateOffsetStore | undefined, +): void { + updateOffsetStoreForTest = store; +} + +export async function listTelegramLegacyUpdateOffsetEntries(params: { + accountId?: string; + persistedPath: string; +}): Promise> { + const { value } = await readJsonFileWithFallback(params.persistedPath, null); + const parsed = safeParseState(value); + if (!parsed || parsed.lastUpdateId === null) { + return []; + } + return [{ key: normalizeTelegramUpdateOffsetAccountId(params.accountId), value: parsed }]; +} + +export function shouldReplaceTelegramUpdateOffsetEntry(params: { + existingValue: unknown; + incomingValue: unknown; + botToken?: string; +}): boolean { + const existing = safeParseState(params.existingValue); + const incoming = safeParseState(params.incomingValue); + if (!incoming || incoming.lastUpdateId === null) { + return false; + } + if (!existing || existing.lastUpdateId === null) { + return true; + } + if (!params.botToken) { + if (existing.botId && incoming.botId && existing.botId !== incoming.botId) { + return false; + } + if ( + existing.tokenFingerprint && + incoming.tokenFingerprint && + existing.tokenFingerprint !== incoming.tokenFingerprint + ) { + return false; + } + } + const incomingRotation = rotationForToken(incoming, params.botToken); + if (incomingRotation) { + return false; + } + const existingRotation = rotationForToken(existing, params.botToken); + if (existingRotation) { + return true; + } + return incoming.lastUpdateId > existing.lastUpdateId; } diff --git a/src/channels/plugins/types.core.ts b/src/channels/plugins/types.core.ts index 21a3f1fca0f..5614b1bc469 100644 --- a/src/channels/plugins/types.core.ts +++ b/src/channels/plugins/types.core.ts @@ -173,6 +173,11 @@ export type ChannelLegacyStateMigrationPlan = stateDir?: string; cleanupSource?: "rename"; preview?: string; + shouldReplaceExistingEntry?: (params: { + key: string; + existingValue: unknown; + incomingValue: unknown; + }) => boolean | Promise; readEntries: () => | Array<{ key: string; value: unknown; ttlMs?: number }> | Promise>; diff --git a/src/channels/typing.test.ts b/src/channels/typing.test.ts index 8d6232d408b..3fb9e109251 100644 --- a/src/channels/typing.test.ts +++ b/src/channels/typing.test.ts @@ -59,10 +59,14 @@ describe("createTypingCallbacks", () => { it("invokes start on reply start", async () => { const { start, onStartError, callbacks } = createTypingHarness(); - await callbacks.onReplyStart(); + try { + await callbacks.onReplyStart(); - expect(start).toHaveBeenCalledTimes(1); - expect(onStartError).not.toHaveBeenCalled(); + expect(start).toHaveBeenCalledTimes(1); + expect(onStartError).not.toHaveBeenCalled(); + } finally { + callbacks.onCleanup?.(); + } }); it("reports start errors", async () => { @@ -70,10 +74,14 @@ describe("createTypingCallbacks", () => { start: vi.fn().mockRejectedValue(new Error("fail")), }); - await callbacks.onReplyStart(); - await flushMicrotasks(); + try { + await callbacks.onReplyStart(); + await flushMicrotasks(); - expect(onStartError).toHaveBeenCalledTimes(1); + expect(onStartError).toHaveBeenCalledTimes(1); + } finally { + callbacks.onCleanup?.(); + } }); it("does not block reply start on a pending typing request", async () => { @@ -87,13 +95,17 @@ describe("createTypingCallbacks", () => { ), }); - await callbacks.onReplyStart(); + try { + await callbacks.onReplyStart(); - expect(start).toHaveBeenCalledTimes(1); - if (!resolveStart) { - throw new Error("Expected typing start resolver to be initialized"); + expect(start).toHaveBeenCalledTimes(1); + if (!resolveStart) { + throw new Error("Expected typing start resolver to be initialized"); + } + resolveStart(); + } finally { + callbacks.onCleanup?.(); } - resolveStart(); }); it("invokes stop on idle and reports stop errors", async () => { diff --git a/src/cli/program/config-guard.test.ts b/src/cli/program/config-guard.test.ts index 527183ed909..aaff24e81b0 100644 --- a/src/cli/program/config-guard.test.ts +++ b/src/cli/program/config-guard.test.ts @@ -211,6 +211,9 @@ describe("ensureConfigReady", () => { ["Discord model picker preferences", "discord/model-picker-preferences.json"], ["Feishu dedupe sidecar", "feishu/dedup/default.json"], ["Telegram bot info cache", "telegram/bot-info-default.json"], + ["Telegram update offset", "telegram/update-offset-default.json"], + ["Telegram sticker cache", "telegram/sticker-cache.json"], + ["Telegram thread bindings", "telegram/thread-bindings-default.json"], ["Telegram pairing allowFrom", "credentials/telegram-allowFrom.json"], ["WhatsApp root auth", "credentials/creds.json"], ])("runs doctor flow for bundled channel legacy state: %s", async (_label, relativePath) => { diff --git a/src/cli/program/config-guard.ts b/src/cli/program/config-guard.ts index 205f7ca64ac..6a6395f78ab 100644 --- a/src/cli/program/config-guard.ts +++ b/src/cli/program/config-guard.ts @@ -57,6 +57,15 @@ function isLegacyWhatsAppAuthFile(name: string): boolean { return name.endsWith(".json") && /^(app-state-sync|session|sender-key|pre-key)-/.test(name); } +function isLegacyTelegramStateFile(name: string): boolean { + return ( + (name.startsWith("bot-info-") && name.endsWith(".json")) || + (name.startsWith("update-offset-") && name.endsWith(".json")) || + name === "sticker-cache.json" || + (name.startsWith("thread-bindings-") && name.endsWith(".json")) + ); +} + function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir: string): boolean { if (fileOrDirExists(path.join(stateDir, "discord", "model-picker-preferences.json"))) { return true; @@ -66,10 +75,7 @@ function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir: } if ( fileOrDirExists(path.join(oauthDir, "telegram-allowFrom.json")) || - dirHasFile( - path.join(stateDir, "telegram"), - (name) => name.startsWith("bot-info-") && name.endsWith(".json"), - ) + dirHasFile(path.join(stateDir, "telegram"), isLegacyTelegramStateFile) ) { return true; } diff --git a/src/commands/doctor-state-migrations.test.ts b/src/commands/doctor-state-migrations.test.ts index 8fb02ca9d32..2ffc589833d 100644 --- a/src/commands/doctor-state-migrations.test.ts +++ b/src/commands/doctor-state-migrations.test.ts @@ -880,6 +880,58 @@ describe("doctor legacy state migrations", () => { }); }); + it("replaces existing plugin-state entries when a channel import plan asks for it", async () => { + const root = await makeTempRoot(); + const sourcePath = path.join(root, "legacy-cache.json"); + fs.writeFileSync(sourcePath, "legacy", "utf-8"); + mockedChannelMigrationPlans.plans = [ + { + kind: "plugin-state-import", + label: "Test replace cache", + sourcePath, + targetPath: "plugin state:test.replace-cache", + pluginId: "telegram", + namespace: "test.replace-cache", + maxEntries: 4, + scopeKey: "", + cleanupSource: "rename", + readEntries: () => [{ key: "existing", value: { offset: 20 } }], + shouldReplaceExistingEntry: (params: { existingValue: unknown; incomingValue: unknown }) => + (params.incomingValue as { offset: number }).offset > + (params.existingValue as { offset: number }).offset, + }, + ]; + + await withStateDir(root, async () => { + const store = createPluginStateKeyedStore<{ offset: number }>("telegram", { + namespace: "test.replace-cache", + maxEntries: 4, + }); + await store.register("existing", { offset: 10 }); + }); + resetPluginStateStoreForTests(); + + const detected = await detectLegacyStateMigrations({ + cfg: {}, + env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv, + }); + const result = await runLegacyStateMigrations({ detected }); + + expect(result.warnings).toStrictEqual([]); + expect(result.changes).toContain("Migrated 1 Test replace cache entry → plugin state"); + expect(result.changes).toContain( + `Archived Test replace cache legacy source → ${sourcePath}.migrated`, + ); + + await withStateDir(root, async () => { + const store = createPluginStateKeyedStore<{ offset: number }>("telegram", { + namespace: "test.replace-cache", + maxEntries: 4, + }); + expect(await store.lookup("existing")).toStrictEqual({ offset: 20 }); + }); + }); + it("keeps plugin-state import source when plugin cap eviction drops an imported row", async () => { const root = await makeTempRoot(); const maxPluginStateEntries = 40; diff --git a/src/infra/state-migrations.ts b/src/infra/state-migrations.ts index 79ee8cabfec..981a6b84630 100644 --- a/src/infra/state-migrations.ts +++ b/src/infra/state-migrations.ts @@ -979,58 +979,87 @@ async function runLegacyMigrationPlans( const expectedKeys = new Set(existingKeys); let remainingCapacity = Math.max(0, plan.maxEntries - storeEntries.length); const entries = await plan.readEntries(); - const missingEntries = entries.filter( - ({ key }) => !existingKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)), - ); + const candidateEntries: Array<{ + key: string; + targetKey: string; + value: unknown; + ttlMs?: number; + existedBefore: boolean; + }> = []; + const failedTargetKeys = new Set(); + let missingEntryCount = 0; + for (const entry of entries) { + const targetKey = resolvePluginStateImportTargetKey(plan.scopeKey, entry.key); + const existingValue = existingValuesByKey.get(targetKey); + if (existingKeys.has(targetKey)) { + const shouldReplace = + existingValue !== undefined && + (await plan.shouldReplaceExistingEntry?.({ + key: entry.key, + existingValue, + incomingValue: entry.value, + })); + if (shouldReplace) { + candidateEntries.push({ ...entry, targetKey, existedBefore: true }); + } + continue; + } + candidateEntries.push({ ...entry, targetKey, existedBefore: false }); + missingEntryCount++; + } const pluginRemainingCapacity = Math.max( 0, MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN - pluginEntryCount, ); - if (missingEntries.length > pluginRemainingCapacity) { + if (missingEntryCount > pluginRemainingCapacity) { warnings.push( - `Skipped migrating ${plan.label} because plugin state has room for ${pluginRemainingCapacity} of ${missingEntries.length} missing entries; left legacy source in place`, + `Skipped migrating ${plan.label} because plugin state has room for ${pluginRemainingCapacity} of ${missingEntryCount} missing entries; left legacy source in place`, ); return; } let imported = 0; - const importedKeys: string[] = []; - for (const entry of entries) { - const targetKey = resolvePluginStateImportTargetKey(plan.scopeKey, entry.key); - if (existingKeys.has(targetKey)) { - continue; - } - if (remainingCapacity <= 0) { + const changedKeys: string[] = []; + for (const entry of candidateEntries) { + if (!entry.existedBefore && remainingCapacity <= 0) { break; } try { await store.register( - targetKey, + entry.targetKey, entry.value, entry.ttlMs != null ? { ttlMs: entry.ttlMs } : undefined, ); const nextExpectedKeys = new Set(expectedKeys); - nextExpectedKeys.add(targetKey); + nextExpectedKeys.add(entry.targetKey); const liveKeys = new Set((await store.entries()).map(({ key }) => key)); const missingKey = findMissingKey(nextExpectedKeys, liveKeys); if (missingKey) { - for (const importedKey of importedKeys.toReversed()) { - await store.delete(importedKey); + for (const changedKey of changedKeys.toReversed()) { + if (existingValuesByKey.has(changedKey)) { + await store.register(changedKey, existingValuesByKey.get(changedKey)); + } else { + await store.delete(changedKey); + } } - await store.delete(targetKey); - if (existingValuesByKey.has(missingKey)) { - await store.register(missingKey, existingValuesByKey.get(missingKey)); + if (existingValuesByKey.has(entry.targetKey)) { + await store.register(entry.targetKey, existingValuesByKey.get(entry.targetKey)); + } else { + await store.delete(entry.targetKey); } warnings.push( `Stopped migrating ${plan.label} because plugin state cap evicted ${missingKey}; left legacy source in place`, ); return; } - expectedKeys.add(targetKey); - existingKeys.add(targetKey); - importedKeys.push(targetKey); - remainingCapacity--; + expectedKeys.add(entry.targetKey); + existingKeys.add(entry.targetKey); + changedKeys.push(entry.targetKey); + if (!entry.existedBefore) { + remainingCapacity--; + } imported++; } catch (err) { + failedTargetKeys.add(entry.targetKey); warnings.push(`Failed migrating ${plan.label} entry ${entry.key}: ${String(err)}`); } } @@ -1045,8 +1074,10 @@ async function runLegacyMigrationPlans( } const allEntriesCovered = entries.length > 0 && - entries.every(({ key }) => - cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)), + entries.every( + ({ key }) => + cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)) && + !failedTargetKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)), ); if (allEntriesCovered && plan.cleanupSource === "rename" && fileExists(plan.sourcePath)) { const archivedPath = `${plan.sourcePath}.migrated`; diff --git a/test/vitest-scoped-config.test.ts b/test/vitest-scoped-config.test.ts index 991a516591f..d1f4ddafb7a 100644 --- a/test/vitest-scoped-config.test.ts +++ b/test/vitest-scoped-config.test.ts @@ -378,7 +378,6 @@ describe("scoped vitest configs", () => { it("keeps scoped lanes on threads with the shared non-isolated runner", () => { for (const config of [ - defaultChannelsConfig, defaultAcpConfig, defaultExtensionsConfig, defaultExtensionChannelsConfig, @@ -453,8 +452,8 @@ describe("scoped vitest configs", () => { expect(testConfig.exclude).not.toContain("chat/slash-command-executor.node.test.ts"); }); - it("defaults channel tests to threads with the non-isolated runner", () => { - expectThreadedNonIsolatedRunner(defaultChannelsConfig); + it("defaults channel tests to isolated threads", () => { + expectThreadedIsolatedRunner(defaultChannelsConfig); }); it("keeps the core channel lane limited to non-extension roots", () => { diff --git a/test/vitest/vitest.channels.config.ts b/test/vitest/vitest.channels.config.ts index e0448ffb98c..deaa43f4f23 100644 --- a/test/vitest/vitest.channels.config.ts +++ b/test/vitest/vitest.channels.config.ts @@ -12,6 +12,7 @@ export function createChannelsVitestConfig(env?: Record