refactor(telegram): persist plugin state in sqlite

Move Telegram plugin-local state from JSON sidecars into plugin-state SQLite. Keep legacy JSON handling in startup and doctor migration plans, with runtime state now reading and writing SQLite directly. Stabilize the channel Vitest lane by cleaning up typing timers and isolating that lane.
This commit is contained in:
Peter Steinberger
2026-05-31 08:28:53 +01:00
committed by GitHub
parent b9fe0894a6
commit 930b371a2f
21 changed files with 2321 additions and 332 deletions

View File

@@ -2,15 +2,28 @@ import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import path from "node:path";
import type { Message } from "grammy/types";
import { afterEach, describe, expect, it } from "vitest";
import {
createPluginStateKeyedStoreForTests,
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
buildTelegramMessageDispatchReplayKey,
claimTelegramMessageDispatchReplay,
commitTelegramMessageDispatchReplay,
createTelegramMessageDispatchReplayGuard,
releaseTelegramMessageDispatchReplay,
setTelegramMessageDispatchDedupeStoreForTest,
} from "./message-dispatch-dedupe.js";
type MessageDispatchDedupeStore = NonNullable<
Parameters<typeof setTelegramMessageDispatchDedupeStoreForTest>[0]
>;
type SyncMessageDispatchDedupeStore = Extract<MessageDispatchDedupeStore, { entries(): unknown[] }>;
const tempDirs: string[] = [];
function createStorePath(): string {
@@ -27,7 +40,19 @@ function message(params?: { chatId?: number; messageId?: number }): Message {
} as Message;
}
beforeEach(async () => {
resetPluginStateStoreForTests({ closeDatabase: false });
const store = createPluginStateKeyedStoreForTests("telegram", {
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
}) as NonNullable<Parameters<typeof setTelegramMessageDispatchDedupeStoreForTest>[0]>;
await store.clear();
setTelegramMessageDispatchDedupeStoreForTest(store);
});
afterEach(() => {
setTelegramMessageDispatchDedupeStoreForTest(undefined);
resetPluginStateStoreForTests();
for (const dir of tempDirs.splice(0)) {
rmSync(dir, { recursive: true, force: true });
}
@@ -73,6 +98,161 @@ describe("Telegram message dispatch replay guard", () => {
).resolves.toEqual({ kind: "duplicate" });
});
it("preserves concurrent commits that share dedupe buckets", async () => {
const storePath = createStorePath();
const writer = createTelegramMessageDispatchReplayGuard({ storePath });
const keys = Array.from({ length: 400 }, (_, index) =>
JSON.stringify(["message", "1234", index + 1]),
);
await commitTelegramMessageDispatchReplay({
guard: writer,
accountId: "default",
keys,
});
const reader = createTelegramMessageDispatchReplayGuard({ storePath });
await expect(reader.warmup("default")).resolves.toBe(keys.length);
});
it("falls back to same-process replay protection when plugin-state cannot open", async () => {
setTelegramMessageDispatchDedupeStoreForTest(undefined);
const errors: unknown[] = [];
const storePath = createStorePath();
const guard = createTelegramMessageDispatchReplayGuard({
storePath,
onDiskError: (error) => errors.push(error),
});
const first = await claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
});
if (first.kind !== "claimed") {
throw new Error("expected initial claim");
}
await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(false);
await expect(
claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
}),
).resolves.toEqual({ kind: "duplicate" });
await expect(guard.hasRecent(first.key, { namespace: "default" })).resolves.toBe(true);
expect(errors.length).toBeGreaterThan(0);
});
it("keeps same-process replay protection when plugin-state commit fails", async () => {
const failingStore = createPluginStateKeyedStoreForTests("telegram", {
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
}) as NonNullable<Parameters<typeof setTelegramMessageDispatchDedupeStoreForTest>[0]>;
setTelegramMessageDispatchDedupeStoreForTest({
...failingStore,
async register() {
throw new Error("state write failed");
},
});
const storePath = createStorePath();
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
const first = await claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
});
if (first.kind !== "claimed") {
throw new Error("expected initial claim");
}
await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(false);
await expect(
claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
}),
).resolves.toEqual({ kind: "duplicate" });
await expect(guard.hasRecent(first.key, { namespace: "default" })).resolves.toBe(true);
await expect(guard.warmup("default")).resolves.toBe(1);
});
it("keeps same-process replay protection when lookup fails after a successful commit", async () => {
const backingStore = createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
}) as SyncMessageDispatchDedupeStore;
let failLookup = false;
setTelegramMessageDispatchDedupeStoreForTest({
...backingStore,
lookup(key) {
if (failLookup) {
throw new Error("state read failed");
}
return backingStore.lookup(key);
},
});
const storePath = createStorePath();
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
const first = await claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
});
if (first.kind !== "claimed") {
throw new Error("expected initial claim");
}
await expect(guard.commit(first.key, { namespace: "default" })).resolves.toBe(true);
failLookup = true;
await expect(
claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
}),
).resolves.toEqual({ kind: "duplicate" });
});
it("keeps replay histories isolated by session store path", async () => {
const firstStorePath = createStorePath();
const secondStorePath = createStorePath();
const firstGuard = createTelegramMessageDispatchReplayGuard({
storePath: firstStorePath,
});
const first = await claimTelegramMessageDispatchReplay({
guard: firstGuard,
accountId: "default",
msg: message(),
});
if (first.kind !== "claimed") {
throw new Error("expected initial claim");
}
await commitTelegramMessageDispatchReplay({
guard: firstGuard,
accountId: "default",
keys: [first.key],
});
const secondGuard = createTelegramMessageDispatchReplayGuard({
storePath: secondStorePath,
});
await expect(
claimTelegramMessageDispatchReplay({
guard: secondGuard,
accountId: "default",
msg: message(),
}),
).resolves.toEqual({
kind: "claimed",
key: first.key,
});
});
it("keeps accounts isolated and releases retryable pre-dispatch claims", async () => {
const storePath = createStorePath();
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
@@ -112,4 +292,34 @@ describe("Telegram message dispatch replay guard", () => {
key: first.key,
});
});
it("lets an in-flight duplicate retry after the first claim is released", async () => {
const storePath = createStorePath();
const guard = createTelegramMessageDispatchReplayGuard({ storePath });
const first = await claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
});
if (first.kind !== "claimed") {
throw new Error("expected initial claim");
}
const duplicate = claimTelegramMessageDispatchReplay({
guard,
accountId: "default",
msg: message(),
});
releaseTelegramMessageDispatchReplay({
guard,
accountId: "default",
keys: [first.key],
error: new Error("retry"),
});
await expect(duplicate).resolves.toEqual({
kind: "claimed",
key: first.key,
});
});
});

View File

@@ -1,19 +1,205 @@
import { createHash } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import type { Message } from "grammy/types";
import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import type {
ClaimableDedupe,
ClaimableDedupeClaimResult,
} from "openclaw/plugin-sdk/persistent-dedupe";
import type {
PluginStateKeyedStore,
PluginStateSyncKeyedStore,
} from "openclaw/plugin-sdk/plugin-state-runtime";
import { normalizeStringEntries, uniqueStrings } from "openclaw/plugin-sdk/string-coerce-runtime";
import { getTelegramRuntime } from "./runtime.js";
const TELEGRAM_MESSAGE_DISPATCH_TTL_MS = 7 * 24 * 60 * 60 * 1000;
const TELEGRAM_MESSAGE_DISPATCH_MEMORY_MAX = 5000;
const TELEGRAM_MESSAGE_DISPATCH_FILE_MAX = 50_000;
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE = "telegram.message-dispatch-dedupe";
export const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES = 4_096;
const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOGICAL_MAX_ENTRIES = 50_000;
const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_COUNT = 256;
const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_MAX_KEYS = 256;
const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_TTL_MS = 30_000;
const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_RETRY_MS = 10;
const TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_ATTEMPTS = 50;
export type TelegramMessageDispatchReplayGuard = ClaimableDedupe;
type TelegramMessageDispatchDedupeRecord = {
scopeKey: string;
namespace: string;
bucketId: string;
entries: Record<string, number>;
};
type TelegramMessageDispatchDedupeStore =
| PluginStateKeyedStore<TelegramMessageDispatchDedupeRecord>
| PluginStateSyncKeyedStore<TelegramMessageDispatchDedupeRecord>;
type PendingClaim = {
promise: Promise<boolean>;
resolve: (result: boolean) => void;
reject: (error: unknown) => void;
};
type MemoryCommittedClaim = {
namespace: string;
expiresAt: number;
};
let dispatchDedupeStoreForTest: TelegramMessageDispatchDedupeStore | undefined;
export type TelegramMessageDispatchClaim =
| { kind: "claimed"; key: string }
| { kind: "duplicate" }
| { kind: "invalid" };
function openDispatchDedupeStore(): TelegramMessageDispatchDedupeStore {
return (
dispatchDedupeStoreForTest ??
getTelegramRuntime().state.openKeyedStore<TelegramMessageDispatchDedupeRecord>({
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
})
);
}
function resolveDispatchScopeKey(storePath: string): string {
return createHash("sha256").update(storePath, "utf8").digest("hex").slice(0, 24);
}
function dedupeEntryKey(scopeKey: string, namespace: string, key: string): string {
return createHash("sha256")
.update(`${scopeKey}\0${namespace}\0${key}`, "utf8")
.digest("hex")
.slice(0, 32);
}
function dedupeBucketId(key: string): string {
const bucketIndex =
Number.parseInt(createHash("sha256").update(key, "utf8").digest("hex").slice(0, 8), 16) %
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_COUNT;
return bucketIndex.toString(16).padStart(2, "0");
}
function dedupeBucketEntryKey(scopeKey: string, namespace: string, bucketId: string): string {
return createHash("sha256")
.update(`${scopeKey}\0${namespace}\0${bucketId}`, "utf8")
.digest("hex")
.slice(0, 32);
}
function dedupeLegacyBucketEntryKey(params: {
scopeKey: string;
namespace: string;
bucketId: string;
sourcePath: string;
}): string {
const sourceKey = createHash("sha256")
.update(params.sourcePath, "utf8")
.digest("hex")
.slice(0, 12);
return dedupeBucketEntryKey(params.scopeKey, params.namespace, `${params.bucketId}:${sourceKey}`);
}
function dedupeBucketLockKey(bucketKey: string): string {
return `${bucketKey}:lock`;
}
async function sleep(ms: number): Promise<void> {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
}
function pruneDedupeBucketEntries(entries: Record<string, number>, now: number): void {
for (const [key, timestamp] of Object.entries(entries)) {
if (typeof timestamp !== "number" || !Number.isFinite(timestamp)) {
delete entries[key];
continue;
}
if (now - timestamp >= TELEGRAM_MESSAGE_DISPATCH_TTL_MS) {
delete entries[key];
}
}
const keys = Object.keys(entries);
if (keys.length <= TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_MAX_KEYS) {
return;
}
for (const key of keys
.toSorted((left, right) => entries[left] - entries[right])
.slice(0, keys.length - TELEGRAM_MESSAGE_DISPATCH_DEDUPE_BUCKET_MAX_KEYS)) {
delete entries[key];
}
}
function createDedupeBucketRecord(params: {
scopeKey: string;
namespace: string;
bucketId: string;
entries?: Record<string, number>;
}): TelegramMessageDispatchDedupeRecord {
return {
scopeKey: params.scopeKey,
namespace: params.namespace,
bucketId: params.bucketId,
entries: { ...params.entries },
};
}
function normalizeDedupeBucketRecord(
value: TelegramMessageDispatchDedupeRecord | undefined,
params: {
scopeKey: string;
namespace: string;
bucketId: string;
now: number;
},
): TelegramMessageDispatchDedupeRecord {
const entries =
value?.scopeKey === params.scopeKey &&
value.namespace === params.namespace &&
value.bucketId === params.bucketId &&
value.entries &&
typeof value.entries === "object"
? { ...value.entries }
: {};
pruneDedupeBucketEntries(entries, params.now);
return createDedupeBucketRecord({
scopeKey: params.scopeKey,
namespace: params.namespace,
bucketId: params.bucketId,
entries,
});
}
async function lookupDedupeBucketContains(params: {
store: TelegramMessageDispatchDedupeStore;
scopeKey: string;
namespace: string;
bucketId: string;
bucketKey: string;
key: string;
now: number;
}): Promise<boolean> {
const bucket = normalizeDedupeBucketRecord(await params.store.lookup(params.bucketKey), params);
if (bucket.entries[params.key] !== undefined) {
return true;
}
for (const entry of await params.store.entries()) {
if (
entry.key === params.bucketKey ||
entry.value.scopeKey !== params.scopeKey ||
entry.value.namespace !== params.namespace ||
entry.value.bucketId !== params.bucketId
) {
continue;
}
const legacyBucket = normalizeDedupeBucketRecord(entry.value, params);
if (legacyBucket.entries[params.key] !== undefined) {
return true;
}
}
return false;
}
function sanitizeFileSegment(value: string): string {
const trimmed = value.trim();
if (!trimmed) {
@@ -22,6 +208,18 @@ function sanitizeFileSegment(value: string): string {
return trimmed.replace(/[^a-zA-Z0-9_-]/g, "_");
}
export function resolveTelegramMessageDispatchLegacyPath(params: {
storePath: string;
namespace: string;
}): string {
return path.join(
path.dirname(params.storePath),
`${path.basename(params.storePath)}.telegram-message-dispatch-${sanitizeFileSegment(
params.namespace,
)}.json`,
);
}
export function buildTelegramMessageDispatchReplayKey(msg: Message): string | null {
const chatId = msg.chat?.id;
const messageId = msg.message_id;
@@ -35,19 +233,283 @@ export function createTelegramMessageDispatchReplayGuard(params: {
storePath: string;
onDiskError?: (error: unknown) => void;
}): TelegramMessageDispatchReplayGuard {
return createClaimableDedupe({
ttlMs: TELEGRAM_MESSAGE_DISPATCH_TTL_MS,
memoryMaxSize: TELEGRAM_MESSAGE_DISPATCH_MEMORY_MAX,
fileMaxEntries: TELEGRAM_MESSAGE_DISPATCH_FILE_MAX,
resolveFilePath: (namespace) =>
path.join(
path.dirname(params.storePath),
`${path.basename(params.storePath)}.telegram-message-dispatch-${sanitizeFileSegment(
const scopeKey = resolveDispatchScopeKey(params.storePath);
const onStateError = params.onDiskError;
let store: TelegramMessageDispatchDedupeStore | undefined;
const inflight = new Map<string, PendingClaim>();
const committedInMemory = new Map<string, MemoryCommittedClaim>();
const bucketWriteQueue = new Map<string, Promise<void>>();
function getStore(): TelegramMessageDispatchDedupeStore | undefined {
if (store) {
return store;
}
try {
store = openDispatchDedupeStore();
return store;
} catch (error) {
onStateError?.(error);
return undefined;
}
}
function pruneCommittedInMemory(now = Date.now()) {
for (const [entryKey, entry] of committedInMemory) {
if (
entry.expiresAt <= now ||
committedInMemory.size > TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOGICAL_MAX_ENTRIES
) {
committedInMemory.delete(entryKey);
}
}
}
function rememberCommittedInMemory(entryKey: string, namespace: string, now: number) {
committedInMemory.set(entryKey, {
namespace,
expiresAt: now + TELEGRAM_MESSAGE_DISPATCH_TTL_MS,
});
pruneCommittedInMemory(now);
}
function hasCommittedInMemory(entryKey: string, now = Date.now()): boolean {
const entry = committedInMemory.get(entryKey);
if (!entry) {
return false;
}
if (entry.expiresAt <= now) {
committedInMemory.delete(entryKey);
return false;
}
return true;
}
function rememberPendingClaim(entryKey: string): PendingClaim {
let resolve!: (result: boolean) => void;
let reject!: (error: unknown) => void;
const promise = new Promise<boolean>((resolvePromise, rejectPromise) => {
resolve = resolvePromise;
reject = rejectPromise;
});
void promise.catch(() => {});
const pending = { promise, resolve, reject };
inflight.set(entryKey, pending);
return pending;
}
function enqueueBucketWrite<T>(bucketKey: string, write: () => Promise<T>): Promise<T> {
const previous = bucketWriteQueue.get(bucketKey) ?? Promise.resolve();
const next = previous.catch(() => undefined).then(write);
const queued = next.then(
() => undefined,
() => undefined,
);
bucketWriteQueue.set(bucketKey, queued);
void queued.finally(() => {
if (bucketWriteQueue.get(bucketKey) === queued) {
bucketWriteQueue.delete(bucketKey);
}
});
return next;
}
async function withBucketLock<T>(params: {
store: TelegramMessageDispatchDedupeStore;
namespace: string;
bucketId: string;
bucketKey: string;
write: () => Promise<T>;
}): Promise<T> {
const lockKey = dedupeBucketLockKey(params.bucketKey);
const lockValue = createDedupeBucketRecord({
scopeKey,
namespace: `${params.namespace}:lock`,
bucketId: params.bucketId,
});
let locked = false;
for (let attempt = 0; attempt < TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_ATTEMPTS; attempt += 1) {
if (
await params.store.registerIfAbsent(lockKey, lockValue, {
ttlMs: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_TTL_MS,
})
) {
locked = true;
break;
}
await sleep(TELEGRAM_MESSAGE_DISPATCH_DEDUPE_LOCK_RETRY_MS);
}
if (!locked) {
throw new Error(
`timed out acquiring Telegram dispatch dedupe bucket lock: ${params.bucketId}`,
);
}
try {
return await params.write();
} finally {
await params.store.delete(lockKey);
}
}
return {
async claim(key, options): Promise<ClaimableDedupeClaimResult> {
const namespace = options?.namespace?.trim() || "global";
const entryKey = dedupeEntryKey(scopeKey, namespace, key);
const bucketId = dedupeBucketId(key);
const bucketKey = dedupeBucketEntryKey(scopeKey, namespace, bucketId);
if (hasCommittedInMemory(entryKey)) {
return { kind: "duplicate" };
}
const existing = inflight.get(entryKey);
if (existing) {
return { kind: "inflight", pending: existing.promise };
}
const pending = rememberPendingClaim(entryKey);
const store = getStore();
if (!store) {
return { kind: "claimed" };
}
try {
if (
await lookupDedupeBucketContains({
store,
scopeKey,
namespace,
bucketId,
bucketKey,
key,
now: Date.now(),
})
) {
pending.resolve(false);
inflight.delete(entryKey);
return { kind: "duplicate" };
}
return { kind: "claimed" };
} catch (error) {
onStateError?.(error);
return { kind: "claimed" };
}
},
async commit(key, options) {
const namespace = options?.namespace?.trim() || "global";
const now = options?.now ?? Date.now();
const entryKey = dedupeEntryKey(scopeKey, namespace, key);
const bucketId = dedupeBucketId(key);
const bucketKey = dedupeBucketEntryKey(scopeKey, namespace, bucketId);
const store = getStore();
if (!store) {
rememberCommittedInMemory(entryKey, namespace, now);
inflight.get(entryKey)?.resolve(true);
inflight.delete(entryKey);
return false;
}
try {
await enqueueBucketWrite(bucketKey, async () => {
await withBucketLock({
store,
namespace,
bucketId,
bucketKey,
write: async () => {
const bucket = normalizeDedupeBucketRecord(await store.lookup(bucketKey), {
scopeKey,
namespace,
bucketId,
now,
});
bucket.entries[key] = now;
pruneDedupeBucketEntries(bucket.entries, now);
await store.register(bucketKey, bucket, { ttlMs: TELEGRAM_MESSAGE_DISPATCH_TTL_MS });
},
});
});
rememberCommittedInMemory(entryKey, namespace, now);
inflight.get(entryKey)?.resolve(true);
return true;
} catch (error) {
rememberCommittedInMemory(entryKey, namespace, now);
inflight.get(entryKey)?.resolve(true);
onStateError?.(error);
return false;
} finally {
inflight.delete(entryKey);
}
},
release(key, options) {
const namespace = options?.namespace?.trim() || "global";
const entryKey = dedupeEntryKey(scopeKey, namespace, key);
const pending = inflight.get(entryKey);
if (pending) {
pending.reject(options?.error ?? new Error(`claim released before commit: ${namespace}`));
inflight.delete(entryKey);
}
},
async hasRecent(key, options) {
const namespace = options?.namespace?.trim() || "global";
const entryKey = dedupeEntryKey(scopeKey, namespace, key);
const bucketId = dedupeBucketId(key);
const bucketKey = dedupeBucketEntryKey(scopeKey, namespace, bucketId);
if (hasCommittedInMemory(entryKey)) {
return true;
}
const store = getStore();
if (!store) {
return false;
}
try {
return await lookupDedupeBucketContains({
store,
scopeKey,
namespace,
)}.json`,
),
onDiskError: params.onDiskError,
});
bucketId,
bucketKey,
key,
now: Date.now(),
});
} catch (error) {
onStateError?.(error);
return false;
}
},
async warmup(namespace = "global") {
pruneCommittedInMemory();
const memoryCount = [...committedInMemory.values()].filter(
(entry) => entry.namespace === namespace,
).length;
const store = getStore();
if (!store) {
return memoryCount;
}
try {
const now = Date.now();
const persistedCount = (await store.entries())
.filter(
(entry) => entry.value.scopeKey === scopeKey && entry.value.namespace === namespace,
)
.reduce((count, entry) => {
const bucket = normalizeDedupeBucketRecord(entry.value, {
scopeKey,
namespace,
bucketId: entry.value.bucketId,
now,
});
return count + Object.keys(bucket.entries).length;
}, 0);
return persistedCount + memoryCount;
} catch (error) {
onStateError?.(error);
return memoryCount;
}
},
clearMemory() {
inflight.clear();
committedInMemory.clear();
},
memorySize() {
pruneCommittedInMemory();
return inflight.size + committedInMemory.size;
},
};
}
export async function claimTelegramMessageDispatchReplay(params: {
@@ -105,3 +567,64 @@ export function releaseTelegramMessageDispatchReplay(params: {
params.guard.release(key, { namespace: params.accountId, error: params.error });
}
}
export function setTelegramMessageDispatchDedupeStoreForTest(
store: TelegramMessageDispatchDedupeStore | undefined,
): void {
dispatchDedupeStoreForTest = store;
}
export function listTelegramLegacyMessageDispatchDedupeEntries(params: {
storePath: string;
namespace: string;
persistedPath?: string;
}): Array<{ key: string; value: TelegramMessageDispatchDedupeRecord; ttlMs?: number }> {
const filePath = params.persistedPath ?? resolveTelegramMessageDispatchLegacyPath(params);
if (!fs.existsSync(filePath)) {
return [];
}
const now = Date.now();
let parsed: unknown;
try {
parsed = JSON.parse(fs.readFileSync(filePath, "utf8"));
} catch {
return [];
}
if (!parsed || typeof parsed !== "object") {
return [];
}
const scopeKey = resolveDispatchScopeKey(params.storePath);
const buckets = new Map<string, { value: TelegramMessageDispatchDedupeRecord; ttlMs: number }>();
for (const [key, value] of Object.entries(parsed as Record<string, unknown>)) {
if (typeof value !== "number" || !Number.isFinite(value)) {
continue;
}
const ttlMs = TELEGRAM_MESSAGE_DISPATCH_TTL_MS - Math.max(0, now - value);
if (ttlMs <= 0) {
continue;
}
const bucketId = dedupeBucketId(key);
const bucketKey = dedupeLegacyBucketEntryKey({
scopeKey,
namespace: params.namespace,
bucketId,
sourcePath: filePath,
});
const bucket = buckets.get(bucketKey) ?? {
value: createDedupeBucketRecord({
scopeKey,
namespace: params.namespace,
bucketId,
}),
ttlMs: 0,
};
bucket.value.entries[key] = value;
bucket.ttlMs = Math.max(bucket.ttlMs, ttlMs);
buckets.set(bucketKey, bucket);
}
return [...buckets.entries()].map(([key, bucket]) => ({
key,
value: bucket.value,
ttlMs: bucket.ttlMs,
}));
}

View File

@@ -1,7 +1,11 @@
import fs from "node:fs";
import type { Bot } from "grammy";
import {
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures";
import { afterEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
buildTelegramConversationContext,
createTelegramMessageCache,
@@ -14,9 +18,12 @@ import {
installTelegramSendTestHooks,
} from "./send.test-harness.js";
import {
TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES,
TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE,
clearSentMessageCache,
recordSentMessage,
resetSentMessageCacheForTest,
setTelegramSentMessageStoreForTest,
wasSentByBot,
} from "./sent-message-cache.js";
@@ -51,6 +58,17 @@ const {
} = telegramSendModule;
const TELEGRAM_TEST_CFG = {};
let sentMessageStore: NonNullable<Parameters<typeof setTelegramSentMessageStoreForTest>[0]>;
beforeEach(() => {
resetPluginStateStoreForTests({ closeDatabase: false });
sentMessageStore = createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE,
maxEntries: TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES,
});
sentMessageStore.clear();
setTelegramSentMessageStoreForTest(sentMessageStore);
});
async function expectChatNotFoundWithChatId(
action: Promise<unknown>,
@@ -186,6 +204,9 @@ function capturedLogText(logFile: string): string {
}
afterEach(() => {
clearSentMessageCache();
setTelegramSentMessageStoreForTest(undefined);
resetPluginStateStoreForTests();
setLoggerOverride(null);
resetLogger();
resetTelegramMessageCacheBucketsForTest();
@@ -195,7 +216,6 @@ afterEach(() => {
describe("sent-message-cache", () => {
afterEach(() => {
vi.useRealTimers();
clearSentMessageCache();
});
it("records and retrieves sent messages", () => {
@@ -224,6 +244,41 @@ describe("sent-message-cache", () => {
expect(wasSentByBot(123, 1)).toBe(false);
});
it("keeps sent-message cache storage failures best-effort", () => {
setTelegramSentMessageStoreForTest({
...sentMessageStore,
entries() {
throw new Error("read boom");
},
register() {
throw new Error("write boom");
},
});
expect(() => recordSentMessage(123, 1)).not.toThrow();
expect(wasSentByBot(123, 1)).toBe(true);
});
it("persists sent-message rows with their remaining logical ttl", () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-01-26T12:00:00.000Z"));
const ttlByMessageId = new Map<string, number>();
setTelegramSentMessageStoreForTest({
...sentMessageStore,
register(key, value, options) {
sentMessageStore.register(key, value, options);
ttlByMessageId.set(value.messageId, options?.ttlMs ?? 0);
},
});
recordSentMessage(123, 1);
vi.advanceTimersByTime(60 * 60 * 1000);
recordSentMessage(123, 2);
expect(ttlByMessageId.get("1")).toBe(23 * 60 * 60 * 1000);
expect(ttlByMessageId.get("2")).toBe(24 * 60 * 60 * 1000);
});
it("keeps sent-message ownership across restart", async () => {
const persistedStorePath = `/tmp/openclaw-telegram-send-tests-${process.pid}-restart.json`;
const sentMessageCfg = { session: { store: persistedStorePath } };
@@ -237,11 +292,13 @@ describe("sent-message-cache", () => {
import.meta.url,
"./sent-message-cache.js?scope=restart",
);
restartedCache.setTelegramSentMessageStoreForTest(sentMessageStore);
try {
expect(restartedCache.wasSentByBot(123, 1, sentMessageCfg)).toBe(true);
} finally {
restartedCache.clearSentMessageCache();
restartedCache.setTelegramSentMessageStoreForTest(undefined);
}
});
@@ -293,6 +350,8 @@ describe("sent-message-cache", () => {
import.meta.url,
"./sent-message-cache.js?scope=shared-b",
);
cacheA.setTelegramSentMessageStoreForTest(sentMessageStore);
cacheB.setTelegramSentMessageStoreForTest(sentMessageStore);
cacheA.clearSentMessageCache();
@@ -304,6 +363,8 @@ describe("sent-message-cache", () => {
expect(cacheA.wasSentByBot(123, 1)).toBe(false);
} finally {
cacheA.clearSentMessageCache();
cacheA.setTelegramSentMessageStoreForTest(undefined);
cacheB.setTelegramSentMessageStoreForTest(undefined);
}
});
});

View File

@@ -1,23 +1,50 @@
import { createHash } from "node:crypto";
import fs from "node:fs";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime";
import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime";
import { getTelegramRuntime } from "./runtime.js";
const TTL_MS = 24 * 60 * 60 * 1000;
export const TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE = "telegram.sent-messages";
export const TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES = 10_000;
const TELEGRAM_SENT_MESSAGES_STATE_KEY = Symbol.for("openclaw.telegramSentMessagesState");
const TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY = Symbol.for(
"openclaw.telegramSentMessagesStoreForTest",
);
type PersistedSentMessage = {
scopeKey: string;
chatId: string;
messageId: string;
timestamp: number;
};
type SentMessageStore = Map<string, Map<string, number>>;
type SentMessagePersistentStore = PluginStateSyncKeyedStore<PersistedSentMessage>;
type SentMessageBucket = {
persistedPath: string;
scopeKey: string;
store: SentMessageStore;
};
type SentMessageState = {
bucketsByPath: Map<string, SentMessageBucket>;
bucketsByScope: Map<string, SentMessageBucket>;
};
let sentMessageStoreForTest: SentMessagePersistentStore | undefined;
function getSentMessageStoreForTest(): SentMessagePersistentStore | undefined {
const globalStore = globalThis as Record<PropertyKey, unknown>;
return (
sentMessageStoreForTest ??
(globalStore[TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY] as
| SentMessagePersistentStore
| undefined)
);
}
function getSentMessageState(): SentMessageState {
const globalStore = globalThis as Record<PropertyKey, unknown>;
const existing = globalStore[TELEGRAM_SENT_MESSAGES_STATE_KEY] as SentMessageState | undefined;
@@ -25,7 +52,7 @@ function getSentMessageState(): SentMessageState {
return existing;
}
const state: SentMessageState = {
bucketsByPath: new Map(),
bucketsByScope: new Map(),
};
globalStore[TELEGRAM_SENT_MESSAGES_STATE_KEY] = state;
return state;
@@ -39,6 +66,28 @@ function resolveSentMessageStorePath(cfg?: Pick<OpenClawConfig, "session">): str
return `${resolveStorePath(cfg?.session?.store)}.telegram-sent-messages.json`;
}
function resolveSentMessageScopeKey(cfg?: Pick<OpenClawConfig, "session">): string {
const storePath = resolveStorePath(cfg?.session?.store);
return createHash("sha256").update(storePath, "utf8").digest("hex").slice(0, 24);
}
function sentMessageEntryKey(scopeKey: string, chatId: string, messageId: string): string {
return createHash("sha256")
.update(`${scopeKey}\0${chatId}\0${messageId}`, "utf8")
.digest("hex")
.slice(0, 32);
}
function openSentMessageStore(): SentMessagePersistentStore {
return (
getSentMessageStoreForTest() ??
getTelegramRuntime().state.openSyncKeyedStore<PersistedSentMessage>({
namespace: TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE,
maxEntries: TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES,
})
);
}
function cleanupExpired(
store: SentMessageStore,
scopeKey: string,
@@ -46,7 +95,7 @@ function cleanupExpired(
now: number,
): void {
for (const [id, timestamp] of entry) {
if (now - timestamp > TTL_MS) {
if (now - timestamp >= TTL_MS) {
entry.delete(id);
}
}
@@ -55,10 +104,7 @@ function cleanupExpired(
}
}
function readPersistedSentMessages(filePath: string): SentMessageStore {
if (!fs.existsSync(filePath)) {
return createSentMessageStore();
}
function readLegacySentMessages(filePath: string): SentMessageStore {
try {
const raw = fs.readFileSync(filePath, "utf-8");
const parsed = JSON.parse(raw) as Record<string, Record<string, number>>;
@@ -86,18 +132,39 @@ function readPersistedSentMessages(filePath: string): SentMessageStore {
}
}
function readPersistedSentMessages(scopeKey: string): SentMessageStore {
const now = Date.now();
const store = createSentMessageStore();
try {
for (const entry of openSentMessageStore().entries()) {
if (entry.value.scopeKey !== scopeKey || now - entry.value.timestamp > TTL_MS) {
continue;
}
let messages = store.get(entry.value.chatId);
if (!messages) {
messages = new Map<string, number>();
store.set(entry.value.chatId, messages);
}
messages.set(entry.value.messageId, entry.value.timestamp);
}
} catch (error) {
logVerbose(`telegram: failed to read sent-message cache: ${String(error)}`);
}
return store;
}
function getSentMessageBucket(cfg?: Pick<OpenClawConfig, "session">): SentMessageBucket {
const state = getSentMessageState();
const persistedPath = resolveSentMessageStorePath(cfg);
const existing = state.bucketsByPath.get(persistedPath);
const scopeKey = resolveSentMessageScopeKey(cfg);
const existing = state.bucketsByScope.get(scopeKey);
if (existing) {
return existing;
}
const bucket = {
persistedPath,
store: readPersistedSentMessages(persistedPath),
scopeKey,
store: readPersistedSentMessages(scopeKey),
};
state.bucketsByPath.set(persistedPath, bucket);
state.bucketsByScope.set(scopeKey, bucket);
return bucket;
}
@@ -106,24 +173,22 @@ function getSentMessages(cfg?: Pick<OpenClawConfig, "session">): SentMessageStor
}
function persistSentMessages(bucket: SentMessageBucket): void {
const { store, persistedPath } = bucket;
const { store, scopeKey } = bucket;
const now = Date.now();
const serialized: Record<string, Record<string, number>> = {};
for (const [chatId, entry] of store) {
cleanupExpired(store, chatId, entry, now);
if (entry.size > 0) {
serialized[chatId] = Object.fromEntries(entry);
for (const [messageId, timestamp] of entry) {
const ttlMs = TTL_MS - Math.max(0, now - timestamp);
if (ttlMs <= 0) {
continue;
}
openSentMessageStore().register(
sentMessageEntryKey(scopeKey, chatId, messageId),
{ scopeKey, chatId, messageId, timestamp },
{ ttlMs },
);
}
}
if (Object.keys(serialized).length === 0) {
fs.rmSync(persistedPath, { force: true });
return;
}
replaceFileAtomicSync({
filePath: persistedPath,
content: JSON.stringify(serialized),
tempPrefix: ".telegram-sent-message-cache",
});
}
export function recordSentMessage(
@@ -170,13 +235,43 @@ export function wasSentByBot(
export function clearSentMessageCache(): void {
const state = getSentMessageState();
for (const bucket of state.bucketsByPath.values()) {
for (const bucket of state.bucketsByScope.values()) {
bucket.store.clear();
fs.rmSync(bucket.persistedPath, { force: true });
}
state.bucketsByPath.clear();
state.bucketsByScope.clear();
openSentMessageStore().clear();
}
export function resetSentMessageCacheForTest(): void {
getSentMessageState().bucketsByPath.clear();
getSentMessageState().bucketsByScope.clear();
}
export function setTelegramSentMessageStoreForTest(
store: SentMessagePersistentStore | undefined,
): void {
sentMessageStoreForTest = store;
const globalStore = globalThis as Record<PropertyKey, unknown>;
if (store) {
globalStore[TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY] = store;
} else {
delete globalStore[TELEGRAM_SENT_MESSAGES_STORE_FOR_TEST_KEY];
}
}
export function listTelegramLegacySentMessageCacheEntries(params: {
cfg?: Pick<OpenClawConfig, "session">;
persistedPath?: string;
}): Array<{ key: string; value: PersistedSentMessage; ttlMs?: number }> {
const scopeKey = resolveSentMessageScopeKey(params.cfg);
const filePath = params.persistedPath ?? resolveSentMessageStorePath(params.cfg);
const legacy = fs.existsSync(filePath)
? readLegacySentMessages(filePath)
: createSentMessageStore();
return [...legacy.entries()].flatMap(([chatId, messages]) =>
[...messages.entries()].map(([messageId, timestamp]) => ({
key: sentMessageEntryKey(scopeKey, chatId, messageId),
value: { scopeKey, chatId, messageId, timestamp },
ttlMs: Math.max(1, TTL_MS - Math.max(0, Date.now() - timestamp)),
})),
);
}

View File

@@ -7,6 +7,7 @@ import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime";
import { describe, expect, it } from "vitest";
import { resolveTelegramBotInfoCachePath } from "./bot-info-cache.js";
import { resolveTelegramMessageCachePath } from "./message-cache.js";
import { resolveTelegramMessageDispatchLegacyPath } from "./message-dispatch-dedupe.js";
import { detectTelegramLegacyStateMigrations } from "./state-migrations.js";
import {
resolveTopicNameCacheNamespace,
@@ -278,4 +279,261 @@ describe("telegram state migrations", () => {
await rm(dir, { recursive: true, force: true });
}
});
it("detects remaining Telegram JSON sidecars for plugin-state import", async () => {
const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-state-migration-"));
const env = { ...process.env, OPENCLAW_STATE_DIR: dir };
const storePath = resolveStorePath(undefined, { env });
const now = Date.now();
const updateOffsetPath = path.join(dir, "telegram", "update-offset-ops.json");
const stickerCachePath = path.join(dir, "telegram", "sticker-cache.json");
const sentMessagePath = `${storePath}.telegram-sent-messages.json`;
const threadBindingsPath = path.join(dir, "telegram", "thread-bindings-ops.json");
const dispatchPath = resolveTelegramMessageDispatchLegacyPath({
storePath,
namespace: "ops",
});
try {
await mkdir(path.dirname(updateOffsetPath), { recursive: true });
await mkdir(path.dirname(sentMessagePath), { recursive: true });
await writeFile(
updateOffsetPath,
JSON.stringify({
version: 3,
lastUpdateId: 12345,
botId: "123456",
tokenFingerprint: "token:fingerprint",
}),
);
await writeFile(
stickerCachePath,
JSON.stringify({
version: 1,
stickers: {
unique_sticker: {
fileId: "file-1",
fileUniqueId: "unique_sticker",
description: "Deploy sticker",
cachedAt: "2026-05-24T12:00:00.000Z",
},
},
}),
);
await writeFile(sentMessagePath, JSON.stringify({ 7: { 42: now } }));
await writeFile(
threadBindingsPath,
JSON.stringify({
version: 1,
bindings: [
{
accountId: "ops",
conversationId: "-100:topic:7",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
boundAt: now,
lastActivityAt: now,
},
],
}),
);
await writeFile(
dispatchPath,
JSON.stringify({ [JSON.stringify(["message", "7", 42])]: now }),
);
const cfg = {
channels: {
telegram: {
accounts: {
ops: {
botToken: "123456:secret",
},
},
},
},
} as OpenClawConfig;
const plans = await detectTelegramLegacyStateMigrations({ cfg, env });
const byLabel = new Map(plans.map((plan) => [plan.label, plan]));
expect(byLabel.get("Telegram update offset")).toMatchObject({
kind: "plugin-state-import",
sourcePath: updateOffsetPath,
namespace: "telegram.update-offsets",
});
expect(byLabel.get("Telegram sticker cache")).toMatchObject({
kind: "plugin-state-import",
sourcePath: stickerCachePath,
namespace: "telegram.sticker-cache",
});
expect(byLabel.get("Telegram sent-message cache")).toMatchObject({
kind: "plugin-state-import",
sourcePath: sentMessagePath,
namespace: "telegram.sent-messages",
});
expect(byLabel.get("Telegram thread bindings")).toMatchObject({
kind: "plugin-state-import",
sourcePath: threadBindingsPath,
namespace: "telegram.thread-bindings",
});
expect(byLabel.get("Telegram message dispatch dedupe")).toMatchObject({
kind: "plugin-state-import",
sourcePath: dispatchPath,
namespace: "telegram.message-dispatch-dedupe",
});
for (const label of [
"Telegram update offset",
"Telegram sticker cache",
"Telegram sent-message cache",
"Telegram thread bindings",
"Telegram message dispatch dedupe",
]) {
const plan = byLabel.get(label);
if (!plan || plan.kind !== "plugin-state-import") {
throw new Error(`expected plugin-state import plan: ${label}`);
}
expect(await plan.readEntries()).toHaveLength(1);
}
} finally {
await rm(dir, { recursive: true, force: true });
}
});
it("detects Telegram account sidecars even after the account was removed from config", async () => {
const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-state-migration-"));
const env = { ...process.env, OPENCLAW_STATE_DIR: dir };
const updateOffsetPath = path.join(dir, "telegram", "update-offset-oldbot.json");
const threadBindingsPath = path.join(dir, "telegram", "thread-bindings-oldbot.json");
const now = Date.now();
try {
await mkdir(path.dirname(updateOffsetPath), { recursive: true });
await writeFile(
updateOffsetPath,
JSON.stringify({
version: 3,
lastUpdateId: 12345,
botId: "123456",
tokenFingerprint: "token:fingerprint",
}),
);
await writeFile(
threadBindingsPath,
JSON.stringify({
version: 1,
bindings: [
{
accountId: "oldbot",
conversationId: "-100:topic:7",
targetKind: "subagent",
targetSessionKey: "agent:main:subagent:child",
boundAt: now,
lastActivityAt: now,
},
],
}),
);
const plans = await detectTelegramLegacyStateMigrations({ cfg: {}, env });
const updateOffsetPlan = plans.find((plan) => plan.sourcePath === updateOffsetPath);
const threadBindingsPlan = plans.find((plan) => plan.sourcePath === threadBindingsPath);
expect(updateOffsetPlan).toMatchObject({
kind: "plugin-state-import",
label: "Telegram update offset",
namespace: "telegram.update-offsets",
});
expect(threadBindingsPlan).toMatchObject({
kind: "plugin-state-import",
label: "Telegram thread bindings",
namespace: "telegram.thread-bindings",
});
if (!updateOffsetPlan || updateOffsetPlan.kind !== "plugin-state-import") {
throw new Error("expected orphaned update offset import plan");
}
if (!threadBindingsPlan || threadBindingsPlan.kind !== "plugin-state-import") {
throw new Error("expected orphaned thread bindings import plan");
}
expect(await updateOffsetPlan.readEntries()).toHaveLength(1);
expect(await threadBindingsPlan.readEntries()).toHaveLength(1);
} finally {
await rm(dir, { recursive: true, force: true });
}
});
it("imports legacy session-store sidecars into the current runtime scope", async () => {
const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-state-migration-"));
const env = { ...process.env, OPENCLAW_STATE_DIR: dir };
const storePath = resolveStorePath(undefined, { env });
const legacyStorePath = path.join(dir, "sessions", "sessions.json");
const currentSentPath = `${storePath}.telegram-sent-messages.json`;
const legacySentPath = `${legacyStorePath}.telegram-sent-messages.json`;
const currentDispatchPath = resolveTelegramMessageDispatchLegacyPath({
storePath,
namespace: "ops",
});
const legacyDispatchPath = resolveTelegramMessageDispatchLegacyPath({
storePath: legacyStorePath,
namespace: "ops",
});
const now = Date.now();
try {
await mkdir(path.dirname(currentSentPath), { recursive: true });
await mkdir(path.dirname(legacySentPath), { recursive: true });
const sentPayload = JSON.stringify({ 7: { 42: now } });
const dispatchPayload = JSON.stringify({ [JSON.stringify(["message", "7", 42])]: now });
await writeFile(currentSentPath, sentPayload);
await writeFile(legacySentPath, sentPayload);
await writeFile(currentDispatchPath, dispatchPayload);
await writeFile(legacyDispatchPath, dispatchPayload);
const cfg = {
channels: {
telegram: {
accounts: {
ops: {
botToken: "123456:secret",
},
},
},
},
} as OpenClawConfig;
const plans = await detectTelegramLegacyStateMigrations({ cfg, env });
const importPlans = plans.filter((plan) => plan.kind === "plugin-state-import");
const currentSentPlan = importPlans.find(
(plan) =>
plan.label === "Telegram sent-message cache" && plan.sourcePath === currentSentPath,
);
const legacySentPlan = importPlans.find(
(plan) =>
plan.label === "Telegram sent-message cache" && plan.sourcePath === legacySentPath,
);
const currentDispatchPlan = importPlans.find(
(plan) =>
plan.label === "Telegram message dispatch dedupe" &&
plan.sourcePath === currentDispatchPath,
);
const legacyDispatchPlan = importPlans.find(
(plan) =>
plan.label === "Telegram message dispatch dedupe" &&
plan.sourcePath === legacyDispatchPath,
);
if (!currentSentPlan || !legacySentPlan || !currentDispatchPlan || !legacyDispatchPlan) {
throw new Error("expected current and legacy session-store import plans");
}
const stripTtl = (entries: Awaited<ReturnType<typeof currentSentPlan.readEntries>>) =>
entries.map(({ ttlMs: _ttlMs, ...entry }) => entry);
expect(stripTtl(await legacySentPlan.readEntries())).toStrictEqual(
stripTtl(await currentSentPlan.readEntries()),
);
const stripDispatchSourceKey = (
entries: Awaited<ReturnType<typeof currentDispatchPlan.readEntries>>,
) => entries.map(({ key: _key, ttlMs: _ttlMs, ...entry }) => entry);
expect(stripDispatchSourceKey(await legacyDispatchPlan.readEntries())).toStrictEqual(
stripDispatchSourceKey(await currentDispatchPlan.readEntries()),
);
} finally {
await rm(dir, { recursive: true, force: true });
}
});
});

View File

@@ -1,3 +1,4 @@
import fs from "node:fs";
import path from "node:path";
import type { ChannelLegacyStateMigrationPlan } from "openclaw/plugin-sdk/channel-contract";
import { resolveChannelAllowFromPath } from "openclaw/plugin-sdk/channel-pairing";
@@ -19,6 +20,29 @@ import {
TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES,
TELEGRAM_MESSAGE_CACHE_PERSISTENT_NAMESPACE,
} from "./message-cache.js";
import {
listTelegramLegacyMessageDispatchDedupeEntries,
resolveTelegramMessageDispatchLegacyPath,
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
} from "./message-dispatch-dedupe.js";
import {
listTelegramLegacySentMessageCacheEntries,
TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES,
TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE,
} from "./sent-message-cache.js";
import {
listTelegramLegacyStickerCacheEntries,
TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
TELEGRAM_STICKER_CACHE_NAMESPACE,
} from "./sticker-cache-store.js";
import {
listTelegramLegacyThreadBindingEntries,
TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES,
TELEGRAM_THREAD_BINDINGS_NAMESPACE,
testing as telegramThreadBindingTesting,
} from "./thread-bindings.js";
import { resolveTelegramToken } from "./token.js";
import {
listTelegramLegacyTopicNameCacheEntries,
resolveTopicNameCacheNamespace,
@@ -26,6 +50,13 @@ import {
resolveTopicNameCacheScope,
TELEGRAM_TOPIC_NAME_CACHE_MAX_ENTRIES,
} from "./topic-name-cache.js";
import {
listTelegramLegacyUpdateOffsetEntries,
normalizeTelegramUpdateOffsetAccountId,
shouldReplaceTelegramUpdateOffsetEntry,
TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES,
TELEGRAM_UPDATE_OFFSET_NAMESPACE,
} from "./update-offset-store.js";
function fileExists(pathValue: string): boolean {
try {
@@ -39,12 +70,40 @@ function resolveLegacySessionStorePath(params: {
env: NodeJS.ProcessEnv;
stateDir?: string;
}): string {
const stateDir =
return path.join(resolveMigrationStateDir(params), "sessions", "sessions.json");
}
function resolveMigrationStateDir(params: { env: NodeJS.ProcessEnv; stateDir?: string }): string {
return (
params.stateDir ??
path.dirname(
path.dirname(path.dirname(path.dirname(resolveStorePath(undefined, { env: params.env })))),
);
return path.join(stateDir, "sessions", "sessions.json");
)
);
}
function listTelegramLegacySidecarAccountIds(params: {
cfg: OpenClawConfig;
stateDir: string;
prefix: string;
suffix: string;
}): string[] {
let persistedAccountIds: string[] = [];
try {
persistedAccountIds = fs
.readdirSync(path.join(params.stateDir, "telegram"), { withFileTypes: true })
.filter(
(entry) =>
entry.isFile() &&
entry.name.startsWith(params.prefix) &&
entry.name.endsWith(params.suffix),
)
.map((entry) => entry.name.slice(params.prefix.length, -params.suffix.length))
.filter(Boolean);
} catch {
persistedAccountIds = [];
}
return uniqueStrings([...listTelegramAccountIds(params.cfg), ...persistedAccountIds]);
}
function detectTelegramMessageCacheLegacyStateMigration(params: {
@@ -113,6 +172,190 @@ function detectTelegramBotInfoCacheLegacyStateMigration(params: {
});
}
function detectTelegramUpdateOffsetLegacyStateMigration(params: {
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const stateDir = resolveMigrationStateDir(params);
return listTelegramLegacySidecarAccountIds({
cfg: params.cfg,
stateDir,
prefix: "update-offset-",
suffix: ".json",
}).flatMap((accountId) => {
const normalized = normalizeTelegramUpdateOffsetAccountId(accountId);
const persistedPath = path.join(stateDir, "telegram", `update-offset-${normalized}.json`);
if (!fileExists(persistedPath)) {
return [];
}
let botToken: string | undefined;
try {
botToken =
resolveTelegramToken(params.cfg, {
accountId,
envToken: params.env.TELEGRAM_BOT_TOKEN,
}).token || undefined;
} catch {
botToken = undefined;
}
return {
kind: "plugin-state-import",
label: "Telegram update offset",
sourcePath: persistedPath,
targetPath: `plugin state:${TELEGRAM_UPDATE_OFFSET_NAMESPACE}`,
pluginId: "telegram",
namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE,
maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES,
scopeKey: "",
cleanupSource: "rename",
preview: `- Telegram update offset: ${persistedPath} → plugin state (${TELEGRAM_UPDATE_OFFSET_NAMESPACE})`,
readEntries: () => listTelegramLegacyUpdateOffsetEntries({ accountId, persistedPath }),
shouldReplaceExistingEntry: ({ existingValue, incomingValue }) =>
shouldReplaceTelegramUpdateOffsetEntry({
existingValue,
incomingValue,
botToken,
}),
};
});
}
function detectTelegramStickerCacheLegacyStateMigration(params: {
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const stateDir = resolveMigrationStateDir(params);
const persistedPath = path.join(stateDir, "telegram", "sticker-cache.json");
if (!fileExists(persistedPath)) {
return [];
}
return [
{
kind: "plugin-state-import",
label: "Telegram sticker cache",
sourcePath: persistedPath,
targetPath: `plugin state:${TELEGRAM_STICKER_CACHE_NAMESPACE}`,
pluginId: "telegram",
namespace: TELEGRAM_STICKER_CACHE_NAMESPACE,
maxEntries: TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
scopeKey: "",
cleanupSource: "rename",
preview: `- Telegram sticker cache: ${persistedPath} → plugin state (${TELEGRAM_STICKER_CACHE_NAMESPACE})`,
readEntries: () => listTelegramLegacyStickerCacheEntries({ persistedPath }),
},
];
}
function detectTelegramSentMessageCacheLegacyStateMigration(params: {
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const storePath = resolveStorePath(params.cfg.session?.store, { env: params.env });
const legacyStorePath = resolveLegacySessionStorePath(params);
const sources = uniqueStrings([storePath, legacyStorePath]).map((sourceStorePath) => ({
targetStorePath: storePath,
sourcePath: `${sourceStorePath}.telegram-sent-messages.json`,
}));
return sources.flatMap((source) => {
if (!fileExists(source.sourcePath)) {
return [];
}
return {
kind: "plugin-state-import",
label: "Telegram sent-message cache",
sourcePath: source.sourcePath,
targetPath: `plugin state:${TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE}`,
pluginId: "telegram",
namespace: TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE,
maxEntries: TELEGRAM_SENT_MESSAGE_CACHE_MAX_ENTRIES,
scopeKey: "",
cleanupSource: "rename",
preview: `- Telegram sent-message cache: ${source.sourcePath} → plugin state (${TELEGRAM_SENT_MESSAGE_CACHE_NAMESPACE})`,
readEntries: () =>
listTelegramLegacySentMessageCacheEntries({
cfg: { session: { store: source.targetStorePath } },
persistedPath: source.sourcePath,
}),
};
});
}
function detectTelegramThreadBindingLegacyStateMigration(params: {
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const stateDir = resolveMigrationStateDir(params);
return listTelegramLegacySidecarAccountIds({
cfg: params.cfg,
stateDir,
prefix: "thread-bindings-",
suffix: ".json",
}).flatMap((accountId) => {
const persistedPath = telegramThreadBindingTesting.resolveBindingsPath(accountId, params.env);
if (!fileExists(persistedPath)) {
return [];
}
return {
kind: "plugin-state-import",
label: "Telegram thread bindings",
sourcePath: persistedPath,
targetPath: `plugin state:${TELEGRAM_THREAD_BINDINGS_NAMESPACE}`,
pluginId: "telegram",
namespace: TELEGRAM_THREAD_BINDINGS_NAMESPACE,
maxEntries: TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES,
scopeKey: "",
cleanupSource: "rename",
preview: `- Telegram thread bindings: ${persistedPath} → plugin state (${TELEGRAM_THREAD_BINDINGS_NAMESPACE})`,
readEntries: () => listTelegramLegacyThreadBindingEntries({ accountId, persistedPath }),
};
});
}
function detectTelegramMessageDispatchLegacyStateMigration(params: {
cfg: OpenClawConfig;
env: NodeJS.ProcessEnv;
stateDir?: string;
}): ChannelLegacyStateMigrationPlan[] {
const storePath = resolveStorePath(params.cfg.session?.store, { env: params.env });
const legacyStorePath = resolveLegacySessionStorePath(params);
return listTelegramAccountIds(params.cfg).flatMap((accountId) => {
const sources = uniqueStrings([storePath, legacyStorePath]).map((sourceStorePath) => ({
targetStorePath: storePath,
sourcePath: resolveTelegramMessageDispatchLegacyPath({
storePath: sourceStorePath,
namespace: accountId,
}),
}));
return sources.flatMap((source) => {
const sourcePath = source.sourcePath;
if (!fileExists(sourcePath)) {
return [];
}
return {
kind: "plugin-state-import",
label: "Telegram message dispatch dedupe",
sourcePath,
targetPath: `plugin state:${TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE}`,
pluginId: "telegram",
namespace: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE,
maxEntries: TELEGRAM_MESSAGE_DISPATCH_DEDUPE_MAX_ENTRIES,
scopeKey: "",
cleanupSource: "rename",
preview: `- Telegram message dispatch dedupe: ${sourcePath} → plugin state (${TELEGRAM_MESSAGE_DISPATCH_DEDUPE_NAMESPACE})`,
readEntries: () =>
listTelegramLegacyMessageDispatchDedupeEntries({
storePath: source.targetStorePath,
namespace: accountId,
persistedPath: source.sourcePath,
}),
};
});
});
}
function topicNameCacheImportSource(params: {
sourceStorePath: string;
targetStorePath?: string;
@@ -197,8 +440,13 @@ export async function detectTelegramLegacyStateMigrations(params: {
});
}
}
plans.push(...detectTelegramUpdateOffsetLegacyStateMigration(params));
plans.push(...detectTelegramBotInfoCacheLegacyStateMigration(params));
plans.push(...detectTelegramStickerCacheLegacyStateMigration(params));
plans.push(...detectTelegramMessageCacheLegacyStateMigration(params));
plans.push(...detectTelegramSentMessageCacheLegacyStateMigration(params));
plans.push(...detectTelegramTopicNameCacheLegacyStateMigration(params));
plans.push(...detectTelegramThreadBindingLegacyStateMigration(params));
plans.push(...detectTelegramMessageDispatchLegacyStateMigration(params));
return plans;
}

View File

@@ -1,8 +1,13 @@
import path from "node:path";
import { loadJsonFile, saveJsonFile } from "openclaw/plugin-sdk/json-store";
import { loadJsonFile } from "openclaw/plugin-sdk/json-store";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { getTelegramRuntime } from "./runtime.js";
const CACHE_VERSION = 1;
export const TELEGRAM_STICKER_CACHE_NAMESPACE = "telegram.sticker-cache";
export const TELEGRAM_STICKER_CACHE_MAX_ENTRIES = 10_000;
export interface CachedSticker {
fileId: string;
@@ -19,57 +24,89 @@ interface StickerCache {
stickers: Record<string, CachedSticker>;
}
type TelegramStickerCacheStore = PluginStateSyncKeyedStore<CachedSticker>;
let stickerCacheStoreForTest: TelegramStickerCacheStore | undefined;
function getCacheFile(): string {
return path.join(resolveStateDir(), "telegram", "sticker-cache.json");
}
function loadCache(): StickerCache {
const data = loadJsonFile(getCacheFile());
if (!data || typeof data !== "object") {
return { version: CACHE_VERSION, stickers: {} };
}
const cache = data as StickerCache;
if (cache.version !== CACHE_VERSION) {
// Future: handle migration if needed
return { version: CACHE_VERSION, stickers: {} };
}
return cache;
function openStickerCacheStore(): TelegramStickerCacheStore {
return (
stickerCacheStoreForTest ??
getTelegramRuntime().state.openSyncKeyedStore<CachedSticker>({
namespace: TELEGRAM_STICKER_CACHE_NAMESPACE,
maxEntries: TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
})
);
}
function saveCache(cache: StickerCache): void {
saveJsonFile(getCacheFile(), cache);
function loadCache(): StickerCache {
return loadCacheFile(getCacheFile());
}
function normalizeStickerSearchText(value: unknown): string {
return typeof value === "string" ? value.trim().toLowerCase() : "";
}
function normalizeCachedStickerForStore(sticker: CachedSticker): CachedSticker {
return {
fileId: sticker.fileId,
fileUniqueId: sticker.fileUniqueId,
description: sticker.description,
cachedAt: sticker.cachedAt,
...(sticker.emoji !== undefined ? { emoji: sticker.emoji } : {}),
...(sticker.setName !== undefined ? { setName: sticker.setName } : {}),
...(sticker.receivedFrom !== undefined ? { receivedFrom: sticker.receivedFrom } : {}),
};
}
function readStickerCacheStore<T>(
operation: string,
read: (store: TelegramStickerCacheStore) => T,
fallback: T,
): T {
try {
return read(openStickerCacheStore());
} catch (err) {
logVerbose(`telegram sticker cache ${operation} failed: ${String(err)}`);
return fallback;
}
}
/**
* Get a cached sticker by its unique ID.
*/
export function getCachedSticker(fileUniqueId: string): CachedSticker | null {
const cache = loadCache();
return cache.stickers[fileUniqueId] ?? null;
return readStickerCacheStore("lookup", (store) => store.lookup(fileUniqueId) ?? null, null);
}
/**
* Add or update a sticker in the cache.
*/
export function cacheSticker(sticker: CachedSticker): void {
const cache = loadCache();
cache.stickers[sticker.fileUniqueId] = sticker;
saveCache(cache);
readStickerCacheStore(
"register",
(store) => {
store.register(sticker.fileUniqueId, normalizeCachedStickerForStore(sticker));
},
undefined,
);
}
/**
* Search cached stickers by text query (fuzzy match on description + emoji + setName).
*/
export function searchStickers(query: string, limit = 10): CachedSticker[] {
const cache = loadCache();
const queryLower = normalizeStickerSearchText(query);
const results: Array<{ sticker: CachedSticker; score: number }> = [];
for (const sticker of Object.values(cache.stickers)) {
for (const { value: sticker } of readStickerCacheStore(
"entries",
(store) => store.entries(),
[],
)) {
let score = 0;
const descLower = normalizeStickerSearchText(sticker.description);
@@ -112,16 +149,18 @@ export function searchStickers(query: string, limit = 10): CachedSticker[] {
* Get all cached stickers (for debugging/listing).
*/
export function getAllCachedStickers(): CachedSticker[] {
const cache = loadCache();
return Object.values(cache.stickers);
return readStickerCacheStore(
"entries",
(store) => store.entries().map((entry) => entry.value),
[],
);
}
/**
* Get cache statistics.
*/
export function getCacheStats(): { count: number; oldestAt?: string; newestAt?: string } {
const cache = loadCache();
const stickers = Object.values(cache.stickers);
const stickers = getAllCachedStickers();
if (stickers.length === 0) {
return { count: 0 };
}
@@ -134,3 +173,37 @@ export function getCacheStats(): { count: number; oldestAt?: string; newestAt?:
newestAt: sorted[sorted.length - 1]?.cachedAt,
};
}
export function setTelegramStickerCacheStoreForTest(
store: TelegramStickerCacheStore | undefined,
): void {
stickerCacheStoreForTest = store;
}
export function clearTelegramStickerCacheForTest(): void {
openStickerCacheStore().clear();
}
export function listTelegramLegacyStickerCacheEntries(
params: {
persistedPath?: string;
} = {},
): Array<{ key: string; value: CachedSticker }> {
const cache = params.persistedPath ? loadCacheFile(params.persistedPath) : loadCache();
return Object.entries(cache.stickers).map(([key, value]) => ({
key,
value: normalizeCachedStickerForStore(value),
}));
}
function loadCacheFile(filePath: string): StickerCache {
const data = loadJsonFile(filePath);
if (!data || typeof data !== "object") {
return { version: CACHE_VERSION, stickers: {} };
}
const cache = data as StickerCache;
if (cache.version !== CACHE_VERSION) {
return { version: CACHE_VERSION, stickers: {} };
}
return cache;
}

View File

@@ -1,31 +1,29 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import * as stickerCache from "./sticker-cache-store.js";
const jsonStoreMocks = vi.hoisted(() => {
const store: { value: unknown } = { value: null };
return {
store,
loadJsonFile: vi.fn(() => store.value),
saveJsonFile: vi.fn((_file: string, value: unknown) => {
store.value = structuredClone(value);
}),
};
});
vi.mock("openclaw/plugin-sdk/json-store", () => ({
loadJsonFile: jsonStoreMocks.loadJsonFile,
saveJsonFile: jsonStoreMocks.saveJsonFile,
}));
vi.mock("openclaw/plugin-sdk/state-paths", () => ({
resolveStateDir: () => "/tmp/openclaw-test-sticker-cache",
}));
describe("sticker-cache", () => {
beforeEach(() => {
jsonStoreMocks.store.value = null;
jsonStoreMocks.loadJsonFile.mockClear();
jsonStoreMocks.saveJsonFile.mockClear();
resetPluginStateStoreForTests({ closeDatabase: false });
stickerCache.setTelegramStickerCacheStoreForTest(
createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE,
maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
}),
);
stickerCache.clearTelegramStickerCacheForTest();
});
afterEach(() => {
stickerCache.setTelegramStickerCacheStoreForTest(undefined);
resetPluginStateStoreForTests();
});
describe("getCachedSticker", () => {
@@ -65,7 +63,21 @@ describe("sticker-cache", () => {
}
expect(cachedSticker.fileUniqueId).toBe("unique123");
jsonStoreMocks.store.value = null;
stickerCache.clearTelegramStickerCacheForTest();
expect(stickerCache.getCachedSticker("unique123")).toBeNull();
});
it("treats plugin-state lookup failures as cache misses", () => {
stickerCache.setTelegramStickerCacheStoreForTest({
...createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE,
maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
}),
lookup() {
throw new Error("lookup failed");
},
});
expect(stickerCache.getCachedSticker("unique123")).toBeNull();
});
@@ -87,6 +99,25 @@ describe("sticker-cache", () => {
expect(all[0]).toEqual(sticker);
});
it("omits undefined optional fields before storing", () => {
stickerCache.cacheSticker({
fileId: "file-undefined",
fileUniqueId: "unique-undefined",
emoji: undefined,
setName: undefined,
description: "Sticker with omitted fields",
cachedAt: "2026-01-26T12:00:00.000Z",
receivedFrom: undefined,
});
expect(stickerCache.getCachedSticker("unique-undefined")).toStrictEqual({
fileId: "file-undefined",
fileUniqueId: "unique-undefined",
description: "Sticker with omitted fields",
cachedAt: "2026-01-26T12:00:00.000Z",
});
});
it("updates existing entry", () => {
const original = {
fileId: "file789",
@@ -108,6 +139,27 @@ describe("sticker-cache", () => {
expect(result?.description).toBe("Updated description");
expect(result?.fileId).toBe("file789-new");
});
it("does not throw when plugin-state writes fail", () => {
stickerCache.setTelegramStickerCacheStoreForTest({
...createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE,
maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
}),
register() {
throw new Error("write failed");
},
});
expect(() =>
stickerCache.cacheSticker({
fileId: "file-failure",
fileUniqueId: "unique-failure",
description: "Write failure should not block sticker handling",
cachedAt: "2026-01-26T13:00:00.000Z",
}),
).not.toThrow();
});
});
describe("searchStickers", () => {
@@ -201,6 +253,20 @@ describe("sticker-cache", () => {
expect(results).toHaveLength(1);
expect(results[0]?.fileUniqueId).toBe("cat-unique-1");
});
it("returns no matches when plugin-state search reads fail", () => {
stickerCache.setTelegramStickerCacheStoreForTest({
...createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE,
maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
}),
entries() {
throw new Error("entries failed");
},
});
expect(stickerCache.searchStickers("fox")).toStrictEqual([]);
});
});
describe("getAllCachedStickers", () => {
@@ -209,6 +275,20 @@ describe("sticker-cache", () => {
expect(result).toStrictEqual([]);
});
it("returns empty array when plugin-state list reads fail", () => {
stickerCache.setTelegramStickerCacheStoreForTest({
...createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: stickerCache.TELEGRAM_STICKER_CACHE_NAMESPACE,
maxEntries: stickerCache.TELEGRAM_STICKER_CACHE_MAX_ENTRIES,
}),
entries() {
throw new Error("entries failed");
},
});
expect(stickerCache.getAllCachedStickers()).toStrictEqual([]);
});
it("returns all cached stickers", () => {
stickerCache.cacheSticker({
fileId: "a",

View File

@@ -3,11 +3,14 @@ import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { getSessionBindingService } from "openclaw/plugin-sdk/conversation-runtime";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { importFreshModule } from "openclaw/plugin-sdk/test-fixtures";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const writeJsonFileAtomicallyMock = vi.hoisted(() => vi.fn());
const readAcpSessionEntryMock = vi.hoisted(() => vi.fn());
vi.mock("openclaw/plugin-sdk/acp-runtime", async () => {
@@ -21,24 +24,20 @@ vi.mock("openclaw/plugin-sdk/acp-runtime", async () => {
};
});
vi.mock("openclaw/plugin-sdk/json-store", async () => {
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/json-store")>(
"openclaw/plugin-sdk/json-store",
);
writeJsonFileAtomicallyMock.mockImplementation(actual.writeJsonFileAtomically);
return {
...actual,
writeJsonFileAtomically: writeJsonFileAtomicallyMock,
};
});
import {
TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES,
TELEGRAM_THREAD_BINDINGS_NAMESPACE,
testing,
createTelegramThreadBindingManager as createTelegramThreadBindingManagerImpl,
setTelegramThreadBindingIdleTimeoutBySessionKey,
setTelegramThreadBindingMaxAgeBySessionKey,
setTelegramThreadBindingStoreForTest,
} from "./thread-bindings.js";
type ThreadBindingStoreEntry = ReturnType<
ReturnType<typeof createTelegramThreadBindingManagerImpl>["listBindings"]
>[number];
const TELEGRAM_THREAD_BINDINGS_TEST_CFG = {
channels: {
telegram: {
@@ -63,14 +62,30 @@ function createTelegramThreadBindingManager(
async function flushMicrotasks(): Promise<void> {
await Promise.resolve();
await new Promise<void>((resolve) => queueMicrotask(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
}
describe("telegram thread bindings", () => {
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
let stateDirOverride: string | undefined;
let threadBindingStore: PluginStateSyncKeyedStore<ThreadBindingStoreEntry>;
function createThreadBindingStore(): PluginStateSyncKeyedStore<ThreadBindingStoreEntry> {
return createPluginStateSyncKeyedStoreForTests("telegram", {
namespace: TELEGRAM_THREAD_BINDINGS_NAMESPACE,
maxEntries: TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES,
});
}
function storedBindings(): ThreadBindingStoreEntry[] {
return threadBindingStore.entries().map((entry) => entry.value);
}
beforeEach(async () => {
writeJsonFileAtomicallyMock.mockClear();
resetPluginStateStoreForTests({ closeDatabase: false });
threadBindingStore = createThreadBindingStore();
threadBindingStore.clear();
setTelegramThreadBindingStoreForTest(threadBindingStore);
readAcpSessionEntryMock.mockReset();
const acpRuntime = await vi.importActual<typeof import("openclaw/plugin-sdk/acp-runtime")>(
"openclaw/plugin-sdk/acp-runtime",
@@ -82,6 +97,8 @@ describe("telegram thread bindings", () => {
afterEach(async () => {
vi.useRealTimers();
await testing.resetTelegramThreadBindingsForTests();
setTelegramThreadBindingStoreForTest(undefined);
resetPluginStateStoreForTests();
if (stateDirOverride) {
fs.rmSync(stateDirOverride, { recursive: true, force: true });
stateDirOverride = undefined;
@@ -182,6 +199,8 @@ describe("telegram thread bindings", () => {
import.meta.url,
"./thread-bindings.js?scope=shared-b",
);
bindingsA.setTelegramThreadBindingStoreForTest(threadBindingStore);
bindingsB.setTelegramThreadBindingStoreForTest(threadBindingStore);
await bindingsA.testing.resetTelegramThreadBindingsForTests();
@@ -219,6 +238,8 @@ describe("telegram thread bindings", () => {
).toBe("agent:main:subagent:child-shared");
} finally {
await bindingsA.testing.resetTelegramThreadBindingsForTests();
bindingsA.setTelegramThreadBindingStoreForTest(undefined);
bindingsB.setTelegramThreadBindingStoreForTest(undefined);
}
});
@@ -301,12 +322,9 @@ describe("telegram thread bindings", () => {
maxAgeMs: 2 * 60 * 60 * 1000,
});
const statePath = path.join(
resolveStateDir(process.env, os.homedir),
"telegram",
"thread-bindings-no-persist.json",
expect(storedBindings().filter((binding) => binding.accountId === "no-persist")).toStrictEqual(
[],
);
expect(fs.existsSync(statePath)).toBe(false);
});
it("persists unbinds before restart so removed bindings do not come back", async () => {
@@ -345,6 +363,62 @@ describe("telegram thread bindings", () => {
expect(reloaded.getByConversationId("8460800771")).toBeUndefined();
});
it("persists bindings with json-clean metadata", async () => {
createTelegramThreadBindingManager({
accountId: "metadata",
persist: true,
enableSweeper: false,
});
await getSessionBindingService().bind({
targetSessionKey: "agent:main:subagent:metadata-child",
targetKind: "subagent",
conversation: {
channel: "telegram",
accountId: "metadata",
conversationId: "metadata-thread",
},
metadata: {
retained: "yes",
omitted: undefined,
},
});
await testing.resetTelegramThreadBindingsForTests();
const reloaded = createTelegramThreadBindingManager({
accountId: "metadata",
persist: true,
enableSweeper: false,
});
expect(reloaded.getByConversationId("metadata-thread")?.metadata).toStrictEqual({
retained: "yes",
});
expect(
storedBindings().find((binding) => binding.accountId === "metadata")?.metadata,
).toStrictEqual({
retained: "yes",
});
});
it("starts with empty bindings when the plugin-state store cannot be read", () => {
setTelegramThreadBindingStoreForTest({
...threadBindingStore,
entries() {
throw new Error("state unavailable");
},
});
const manager = createTelegramThreadBindingManager({
accountId: "read-failure",
persist: true,
enableSweeper: false,
});
expect(manager.listBindings()).toStrictEqual([]);
});
it("cleans up stale ACP bindings before restart routing can reuse them", async () => {
stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-"));
process.env.OPENCLAW_STATE_DIR = stateDirOverride;
@@ -384,19 +458,7 @@ describe("telegram thread bindings", () => {
expect(reloaded.getByConversationId("cleanup-me")).toBeUndefined();
await testing.resetTelegramThreadBindingsForTests();
const persisted = JSON.parse(
fs.readFileSync(
path.join(
resolveStateDir(process.env, os.homedir),
"telegram",
"thread-bindings-default.json",
),
"utf8",
),
) as { bindings?: Array<{ conversationId?: string }> };
expect(persisted.bindings?.map((binding) => binding.conversationId)).not.toContain(
"cleanup-me",
);
expect(storedBindings().map((binding) => binding.conversationId)).not.toContain("cleanup-me");
});
it("keeps plugin-owned bindings when ACP cleanup runs on startup", async () => {
@@ -505,15 +567,9 @@ describe("telegram thread bindings", () => {
await testing.resetTelegramThreadBindingsForTests();
const statePath = path.join(
resolveStateDir(process.env, os.homedir),
"telegram",
"thread-bindings-persist-reset.json",
);
const persisted = JSON.parse(fs.readFileSync(statePath, "utf8")) as {
bindings?: Array<{ idleTimeoutMs?: number }>;
};
expect(persisted.bindings?.[0]?.idleTimeoutMs).toBe(90_000);
expect(
storedBindings().find((binding) => binding.accountId === "persist-reset")?.idleTimeoutMs,
).toBe(90_000);
});
it("does not leak unhandled rejections when a persist write fails", async () => {
@@ -542,8 +598,11 @@ describe("telegram thread bindings", () => {
},
});
writeJsonFileAtomicallyMock.mockImplementationOnce(async () => {
throw new Error("persist boom");
setTelegramThreadBindingStoreForTest({
...threadBindingStore,
register() {
throw new Error("persist boom");
},
});
manager.touchConversation("-100200300:topic:100");

View File

@@ -1,3 +1,4 @@
import { createHash } from "node:crypto";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
@@ -14,19 +15,23 @@ import {
type SessionBindingRecord,
} from "openclaw/plugin-sdk/conversation-runtime";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
import type { PluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { normalizeAccountId, isAcpSessionKey } from "openclaw/plugin-sdk/routing";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime";
import { getTelegramRuntime } from "./runtime.js";
import { resolveTelegramToken } from "./token.js";
const DEFAULT_THREAD_BINDING_IDLE_TIMEOUT_MS = 24 * 60 * 60 * 1000;
const DEFAULT_THREAD_BINDING_MAX_AGE_MS = 0;
const THREAD_BINDINGS_SWEEP_INTERVAL_MS = 60_000;
const STORE_VERSION = 1;
export const TELEGRAM_THREAD_BINDINGS_NAMESPACE = "telegram.thread-bindings";
export const TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES = 5_000;
let telegramSendModulePromise: Promise<typeof import("./send.js")> | undefined;
let threadBindingStoreForTest: PluginStateSyncKeyedStore<TelegramThreadBindingRecord> | undefined;
async function loadTelegramSendModule() {
telegramSendModulePromise ??= import("./send.js");
@@ -55,6 +60,8 @@ type StoredTelegramBindingState = {
bindings: TelegramThreadBindingRecord[];
};
type TelegramThreadBindingStore = PluginStateSyncKeyedStore<TelegramThreadBindingRecord>;
type TelegramThreadBindingManager = {
accountId: string;
shouldPersistMutations: () => boolean;
@@ -116,6 +123,23 @@ function resolveBindingKey(params: { accountId: string; conversationId: string }
return `${params.accountId}:${params.conversationId}`;
}
function resolveStoredBindingKey(params: { accountId: string; conversationId: string }): string {
return createHash("sha256")
.update(`${params.accountId}\0${params.conversationId}`, "utf8")
.digest("hex")
.slice(0, 32);
}
function openThreadBindingStore(): TelegramThreadBindingStore {
return (
threadBindingStoreForTest ??
getTelegramRuntime().state.openSyncKeyedStore<TelegramThreadBindingRecord>({
namespace: TELEGRAM_THREAD_BINDINGS_NAMESPACE,
maxEntries: TELEGRAM_THREAD_BINDINGS_MAX_ENTRIES,
})
);
}
function toSessionBindingTargetKind(raw: TelegramBindingTargetKind): BindingTargetKind {
return raw === "subagent" ? "subagent" : "session";
}
@@ -228,6 +252,20 @@ function resolveBindingsPath(accountId: string, env: NodeJS.ProcessEnv = process
return path.join(stateDir, "telegram", `thread-bindings-${accountId}.json`);
}
function normalizeMetadataForStore(
metadata: Record<string, unknown> | undefined,
): Record<string, unknown> | undefined {
if (!metadata) {
return undefined;
}
const serialized = JSON.stringify(metadata);
if (!serialized) {
return undefined;
}
const parsed = JSON.parse(serialized) as Record<string, unknown>;
return Object.keys(parsed).length > 0 ? parsed : undefined;
}
function summarizeLifecycleForLog(
record: TelegramThreadBindingRecord,
defaults: {
@@ -243,8 +281,60 @@ function summarizeLifecycleForLog(
return `idle=${idleLabel} maxAge=${maxAgeLabel}`;
}
function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[] {
const filePath = resolveBindingsPath(accountId);
function sanitizeStoredBinding(
accountId: string,
entry: Partial<TelegramThreadBindingRecord> | null | undefined,
): TelegramThreadBindingRecord | null {
const conversationId = normalizeOptionalString(entry?.conversationId);
const targetSessionKey = normalizeOptionalString(entry?.targetSessionKey) ?? "";
const targetKind = entry?.targetKind === "subagent" ? "subagent" : "acp";
if (!conversationId || !targetSessionKey) {
return null;
}
const boundAt =
typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt)
? Math.floor(entry.boundAt)
: Date.now();
const lastActivityAt =
typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt)
? Math.floor(entry.lastActivityAt)
: boundAt;
const record: TelegramThreadBindingRecord = {
accountId,
conversationId,
targetSessionKey,
targetKind,
boundAt,
lastActivityAt,
};
if (typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)) {
record.idleTimeoutMs = Math.max(0, Math.floor(entry.idleTimeoutMs));
}
if (typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)) {
record.maxAgeMs = Math.max(0, Math.floor(entry.maxAgeMs));
}
if (typeof entry?.agentId === "string" && entry.agentId.trim()) {
record.agentId = entry.agentId.trim();
}
if (typeof entry?.label === "string" && entry.label.trim()) {
record.label = entry.label.trim();
}
if (typeof entry?.boundBy === "string" && entry.boundBy.trim()) {
record.boundBy = entry.boundBy.trim();
}
const metadata = normalizeMetadataForStore(
entry?.metadata && typeof entry.metadata === "object" ? { ...entry.metadata } : undefined,
);
if (metadata) {
record.metadata = metadata;
}
return record;
}
function readLegacyBindingsFile(
filePath: string,
accountId: string,
): TelegramThreadBindingRecord[] {
try {
const raw = fs.readFileSync(filePath, "utf-8");
const parsed = JSON.parse(raw) as StoredTelegramBindingState;
@@ -253,47 +343,10 @@ function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[]
}
const bindings: TelegramThreadBindingRecord[] = [];
for (const entry of parsed.bindings) {
const conversationId = normalizeOptionalString(entry?.conversationId);
const targetSessionKey = normalizeOptionalString(entry?.targetSessionKey) ?? "";
const targetKind = entry?.targetKind === "subagent" ? "subagent" : "acp";
if (!conversationId || !targetSessionKey) {
continue;
const record = sanitizeStoredBinding(accountId, entry);
if (record) {
bindings.push(record);
}
const boundAt =
typeof entry?.boundAt === "number" && Number.isFinite(entry.boundAt)
? Math.floor(entry.boundAt)
: Date.now();
const lastActivityAt =
typeof entry?.lastActivityAt === "number" && Number.isFinite(entry.lastActivityAt)
? Math.floor(entry.lastActivityAt)
: boundAt;
const record: TelegramThreadBindingRecord = {
accountId,
conversationId,
targetSessionKey,
targetKind,
boundAt,
lastActivityAt,
};
if (typeof entry?.idleTimeoutMs === "number" && Number.isFinite(entry.idleTimeoutMs)) {
record.idleTimeoutMs = Math.max(0, Math.floor(entry.idleTimeoutMs));
}
if (typeof entry?.maxAgeMs === "number" && Number.isFinite(entry.maxAgeMs)) {
record.maxAgeMs = Math.max(0, Math.floor(entry.maxAgeMs));
}
if (typeof entry?.agentId === "string" && entry.agentId.trim()) {
record.agentId = entry.agentId.trim();
}
if (typeof entry?.label === "string" && entry.label.trim()) {
record.label = entry.label.trim();
}
if (typeof entry?.boundBy === "string" && entry.boundBy.trim()) {
record.boundBy = entry.boundBy.trim();
}
if (entry?.metadata && typeof entry.metadata === "object") {
record.metadata = { ...entry.metadata };
}
bindings.push(record);
}
return bindings;
} catch (err) {
@@ -305,7 +358,43 @@ function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[]
}
}
async function persistBindingsToDisk(params: {
function loadBindingsFromStore(accountId: string): TelegramThreadBindingRecord[] {
let store: TelegramThreadBindingStore;
try {
store = openThreadBindingStore();
} catch (err) {
logVerbose(`telegram thread bindings store open failed (${accountId}): ${String(err)}`);
return [];
}
let entries: Array<{ key: string; value: TelegramThreadBindingRecord }>;
try {
entries = store.entries();
} catch (err) {
logVerbose(`telegram thread bindings store read failed (${accountId}): ${String(err)}`);
return [];
}
const bindings: TelegramThreadBindingRecord[] = [];
for (const entry of entries) {
if (entry.value.accountId !== accountId) {
continue;
}
const sanitized = sanitizeStoredBinding(accountId, entry.value);
if (sanitized) {
bindings.push(sanitized);
continue;
}
try {
store.delete(entry.key);
} catch (err) {
logVerbose(
`telegram thread bindings invalid row cleanup failed (${accountId}): ${String(err)}`,
);
}
}
return bindings;
}
async function persistBindingsToStore(params: {
accountId: string;
persist: boolean;
bindings?: TelegramThreadBindingRecord[];
@@ -313,15 +402,27 @@ async function persistBindingsToDisk(params: {
if (!params.persist) {
return;
}
const payload: StoredTelegramBindingState = {
version: STORE_VERSION,
bindings:
params.bindings ??
[...getThreadBindingsState().bindingsByAccountConversation.values()].filter(
(entry) => entry.accountId === params.accountId,
),
};
await writeJsonFileAtomically(resolveBindingsPath(params.accountId), payload);
const store = openThreadBindingStore();
const bindings =
params.bindings ??
[...getThreadBindingsState().bindingsByAccountConversation.values()].filter(
(entry) => entry.accountId === params.accountId,
);
const nextKeys = new Set<string>();
for (const binding of bindings) {
const stored = sanitizeStoredBinding(params.accountId, binding);
if (!stored) {
continue;
}
const key = resolveStoredBindingKey(stored);
nextKeys.add(key);
store.register(key, stored);
}
for (const entry of store.entries()) {
if (entry.value.accountId === params.accountId && !nextKeys.has(entry.key)) {
store.delete(entry.key);
}
}
}
function listBindingsForAccount(accountId: string): TelegramThreadBindingRecord[] {
@@ -343,7 +444,7 @@ function enqueuePersistBindings(params: {
const next = previous
.catch(() => undefined)
.then(async () => {
await persistBindingsToDisk(params);
await persistBindingsToStore(params);
});
getThreadBindingsState().persistQueueByAccountId.set(params.accountId, next);
const cleanup = () => {
@@ -428,7 +529,7 @@ export function createTelegramThreadBindingManager(params: {
);
const maxAgeMs = normalizeDurationMs(params.maxAgeMs, DEFAULT_THREAD_BINDING_MAX_AGE_MS);
const loaded = loadBindingsFromDisk(accountId);
const loaded = loadBindingsFromStore(accountId);
for (const entry of loaded) {
const key = resolveBindingKey({
accountId,
@@ -917,7 +1018,26 @@ export async function resetTelegramThreadBindingsForTests() {
getThreadBindingsState().bindingsByAccountConversation.clear();
}
export function setTelegramThreadBindingStoreForTest(
store: TelegramThreadBindingStore | undefined,
): void {
threadBindingStoreForTest = store;
}
export function listTelegramLegacyThreadBindingEntries(params: {
accountId: string;
persistedPath?: string;
}): Array<{ key: string; value: TelegramThreadBindingRecord }> {
const bindings = readLegacyBindingsFile(
params.persistedPath ?? resolveBindingsPath(params.accountId),
params.accountId,
);
return bindings.map((value) => ({ key: resolveStoredBindingKey(value), value }));
}
export const testing = {
resetTelegramThreadBindingsForTests,
resolveBindingsPath,
resolveStoredBindingKey,
};
export { testing as __testing };

View File

@@ -2,10 +2,8 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import { withStateDirEnv } from "openclaw/plugin-sdk/test-env";
import { afterEach, describe, expect, it, vi } from "vitest";
import { resolveTelegramToken } from "./token.js";
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
describe("resolveTelegramToken", () => {
const tempDirs: string[] = [];
@@ -431,18 +429,3 @@ describe("resolveTelegramToken", () => {
expectNoTokenForUnknownAccount(createUnknownAccountConfig());
});
});
describe("telegram update offset store", () => {
it("persists and reloads the last update id", async () => {
await withStateDirEnv("openclaw-telegram-", async () => {
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
await writeTelegramUpdateOffset({
accountId: "primary",
updateId: 421,
});
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBe(421);
});
});
});

View File

@@ -1,15 +1,40 @@
import fs from "node:fs/promises";
import path from "node:path";
import { withStateDirEnv } from "openclaw/plugin-sdk/test-env";
import { describe, expect, it } from "vitest";
import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { withStateDirEnv } from "openclaw/plugin-sdk/test-env";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { fingerprintTelegramBotToken } from "./token-fingerprint.js";
import {
TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES,
TELEGRAM_UPDATE_OFFSET_NAMESPACE,
type TelegramUpdateOffsetState,
deleteTelegramUpdateOffset,
readTelegramUpdateOffset,
setTelegramUpdateOffsetStoreForTest,
shouldReplaceTelegramUpdateOffsetEntry,
writeTelegramUpdateOffset,
} from "./update-offset-store.js";
describe("deleteTelegramUpdateOffset", () => {
it("removes the offset file so a new bot starts fresh", async () => {
let updateOffsetStore: PluginStateKeyedStore<TelegramUpdateOffsetState>;
beforeEach(async () => {
updateOffsetStore = createPluginStateKeyedStoreForTests<TelegramUpdateOffsetState>("telegram", {
namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE,
maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES,
});
await updateOffsetStore.clear();
setTelegramUpdateOffsetStoreForTest(updateOffsetStore);
});
afterEach(() => {
setTelegramUpdateOffsetStoreForTest(undefined);
resetPluginStateStoreForTests();
});
it("removes the offset row so a new bot starts fresh", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
await writeTelegramUpdateOffset({ accountId: "default", updateId: 432_000_000 });
expect(await readTelegramUpdateOffset({ accountId: "default" })).toBe(432_000_000);
@@ -19,7 +44,7 @@ describe("deleteTelegramUpdateOffset", () => {
});
});
it("keeps a missing offset file absent after delete", async () => {
it("keeps a missing offset row absent after delete", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
await deleteTelegramUpdateOffset({ accountId: "nonexistent" });
expect(await readTelegramUpdateOffset({ accountId: "nonexistent" })).toBeNull();
@@ -38,6 +63,24 @@ describe("deleteTelegramUpdateOffset", () => {
});
});
it("surfaces plugin-state write failures", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
setTelegramUpdateOffsetStoreForTest({
...createPluginStateKeyedStoreForTests<TelegramUpdateOffsetState>("telegram", {
namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE,
maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES,
}),
async register() {
throw new Error("store write failed");
},
});
await expect(
writeTelegramUpdateOffset({ accountId: "default", updateId: 808 }),
).rejects.toThrow("store write failed");
});
});
it("returns null when stored offset was written by a different bot token", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
await writeTelegramUpdateOffset({
@@ -90,15 +133,12 @@ describe("deleteTelegramUpdateOffset", () => {
});
});
it("invokes onRotationDetected for legacy offsets without bot identity", async () => {
await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => {
const legacyPath = path.join(stateDir, "telegram", "update-offset-default.json");
await fs.mkdir(path.dirname(legacyPath), { recursive: true });
await fs.writeFile(
legacyPath,
`${JSON.stringify({ version: 1, lastUpdateId: 777 }, null, 2)}\n`,
"utf-8",
);
it("invokes onRotationDetected for imported legacy offsets without bot identity", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
await updateOffsetStore.register("default", {
version: 1,
lastUpdateId: 777,
} as TelegramUpdateOffsetState);
const rotations: Array<Record<string, unknown>> = [];
const offset = await readTelegramUpdateOffset({
@@ -121,6 +161,83 @@ describe("deleteTelegramUpdateOffset", () => {
});
});
it("returns null when the plugin-state read fails", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
setTelegramUpdateOffsetStoreForTest({
...createPluginStateKeyedStoreForTests<TelegramUpdateOffsetState>("telegram", {
namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE,
maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES,
}),
async lookup() {
throw new Error("store unavailable");
},
});
expect(await readTelegramUpdateOffset({ accountId: "primary" })).toBeNull();
});
});
it("lets migration replace stale plugin-state with a higher compatible imported offset", () => {
const token = "111111:current";
expect(
shouldReplaceTelegramUpdateOffsetEntry({
botToken: token,
existingValue: {
version: 3,
lastUpdateId: 10,
botId: "111111",
tokenFingerprint: fingerprintTelegramBotToken(token),
},
incomingValue: {
version: 3,
lastUpdateId: 20,
botId: "111111",
tokenFingerprint: fingerprintTelegramBotToken(token),
},
}),
).toBe(true);
});
it("keeps plugin-state when the imported offset belongs to another bot", () => {
const token = "111111:current";
expect(
shouldReplaceTelegramUpdateOffsetEntry({
botToken: token,
existingValue: {
version: 3,
lastUpdateId: 10,
botId: "111111",
tokenFingerprint: fingerprintTelegramBotToken(token),
},
incomingValue: {
version: 3,
lastUpdateId: 999,
botId: "222222",
tokenFingerprint: "stale",
},
}),
).toBe(false);
});
it("keeps plugin-state across persisted bot-id conflicts when no token is available", () => {
expect(
shouldReplaceTelegramUpdateOffsetEntry({
existingValue: {
version: 3,
lastUpdateId: 10,
botId: "111111",
tokenFingerprint: "current-fingerprint",
},
incomingValue: {
version: 3,
lastUpdateId: 999,
botId: "222222",
tokenFingerprint: "stale-fingerprint",
},
}),
).toBe(false);
});
it("detects same-bot token rotation via the persisted fingerprint", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
const original = "111111:original-secret";
@@ -160,15 +277,13 @@ describe("deleteTelegramUpdateOffset", () => {
});
});
it("treats v2 bot-id-only offsets as stale when token identity cannot be verified", async () => {
await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => {
const legacyPath = path.join(stateDir, "telegram", "update-offset-default.json");
await fs.mkdir(path.dirname(legacyPath), { recursive: true });
await fs.writeFile(
legacyPath,
`${JSON.stringify({ version: 2, lastUpdateId: 999, botId: "111111" }, null, 2)}\n`,
"utf-8",
);
it("treats imported v2 bot-id-only offsets as stale when token identity cannot be verified", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
await updateOffsetStore.register("default", {
version: 2,
lastUpdateId: 999,
botId: "111111",
} as TelegramUpdateOffsetState);
const rotations: Array<Record<string, unknown>> = [];
const offset = await readTelegramUpdateOffset({
@@ -214,15 +329,12 @@ describe("deleteTelegramUpdateOffset", () => {
});
});
it("treats legacy offset records without bot identity as stale when token is provided", async () => {
await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => {
const legacyPath = path.join(stateDir, "telegram", "update-offset-default.json");
await fs.mkdir(path.dirname(legacyPath), { recursive: true });
await fs.writeFile(
legacyPath,
`${JSON.stringify({ version: 1, lastUpdateId: 777 }, null, 2)}\n`,
"utf-8",
);
it("treats imported legacy offset records without bot identity as stale when token is provided", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
await updateOffsetStore.register("default", {
version: 1,
lastUpdateId: 777,
} as TelegramUpdateOffsetState);
expect(
await readTelegramUpdateOffset({
@@ -233,22 +345,20 @@ describe("deleteTelegramUpdateOffset", () => {
});
});
it("ignores invalid persisted update IDs from disk", async () => {
await withStateDirEnv("openclaw-tg-offset-", async ({ stateDir }) => {
const offsetPath = path.join(stateDir, "telegram", "update-offset-default.json");
await fs.mkdir(path.dirname(offsetPath), { recursive: true });
await fs.writeFile(
offsetPath,
`${JSON.stringify({ version: 2, lastUpdateId: -1, botId: "111111" }, null, 2)}\n`,
"utf-8",
);
it("ignores invalid persisted update IDs from plugin-state", async () => {
await withStateDirEnv("openclaw-tg-offset-", async () => {
await updateOffsetStore.register("default", {
version: 2,
lastUpdateId: -1,
botId: "111111",
} as TelegramUpdateOffsetState);
expect(await readTelegramUpdateOffset({ accountId: "default" })).toBeNull();
await fs.writeFile(
offsetPath,
`${JSON.stringify({ version: 2, lastUpdateId: Number.POSITIVE_INFINITY, botId: "111111" }, null, 2)}\n`,
"utf-8",
);
await updateOffsetStore.register("default", {
version: 2,
lastUpdateId: "not-a-number",
botId: "111111",
} as unknown as TelegramUpdateOffsetState);
expect(await readTelegramUpdateOffset({ accountId: "default" })).toBeNull();
});
});

View File

@@ -1,24 +1,28 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
import { resolveStateDir } from "openclaw/plugin-sdk/state-paths";
import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store";
import type { PluginStateKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { getTelegramRuntime } from "./runtime.js";
import { fingerprintTelegramBotToken } from "./token-fingerprint.js";
const STORE_VERSION = 3;
export const TELEGRAM_UPDATE_OFFSET_NAMESPACE = "telegram.update-offsets";
export const TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES = 1_000;
type TelegramUpdateOffsetState = {
export type TelegramUpdateOffsetState = {
version: number;
lastUpdateId: number | null;
botId: string | null;
tokenFingerprint: string | null;
};
type TelegramUpdateOffsetStore = PluginStateKeyedStore<TelegramUpdateOffsetState>;
let updateOffsetStoreForTest: TelegramUpdateOffsetStore | undefined;
function isValidUpdateId(value: unknown): value is number {
return typeof value === "number" && Number.isSafeInteger(value) && value >= 0;
}
function normalizeAccountId(accountId?: string) {
export function normalizeTelegramUpdateOffsetAccountId(accountId?: string) {
const trimmed = accountId?.trim();
if (!trimmed) {
return "default";
@@ -26,13 +30,15 @@ function normalizeAccountId(accountId?: string) {
return trimmed.replace(/[^a-z0-9._-]+/gi, "_");
}
function resolveTelegramUpdateOffsetPath(
accountId?: string,
env: NodeJS.ProcessEnv = process.env,
): string {
const stateDir = resolveStateDir(env, os.homedir);
const normalized = normalizeAccountId(accountId);
return path.join(stateDir, "telegram", `update-offset-${normalized}.json`);
function openUpdateOffsetStore(env?: NodeJS.ProcessEnv): TelegramUpdateOffsetStore {
return (
updateOffsetStoreForTest ??
getTelegramRuntime().state.openKeyedStore<TelegramUpdateOffsetState>({
namespace: TELEGRAM_UPDATE_OFFSET_NAMESPACE,
maxEntries: TELEGRAM_UPDATE_OFFSET_MAX_ENTRIES,
...(env ? { env } : {}),
})
);
}
function extractBotIdFromToken(token?: string): string | null {
@@ -133,9 +139,14 @@ export async function readTelegramUpdateOffset(params: {
env?: NodeJS.ProcessEnv;
onRotationDetected?: (info: TelegramUpdateOffsetRotationInfo) => void | Promise<void>;
}): Promise<number | null> {
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
const { value } = await readJsonFileWithFallback<unknown>(filePath, null);
const parsed = safeParseState(value);
const key = normalizeTelegramUpdateOffsetAccountId(params.accountId);
let storedValue: unknown;
try {
storedValue = await openUpdateOffsetStore(params.env).lookup(key);
} catch {
storedValue = undefined;
}
const parsed = safeParseState(storedValue);
if (!parsed) {
return null;
}
@@ -156,28 +167,77 @@ export async function writeTelegramUpdateOffset(params: {
if (!isValidUpdateId(params.updateId)) {
throw new Error("Telegram update offset must be a non-negative safe integer.");
}
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
const payload: TelegramUpdateOffsetState = {
version: STORE_VERSION,
lastUpdateId: params.updateId,
botId: extractBotIdFromToken(params.botToken),
tokenFingerprint: fingerprintFromToken(params.botToken),
};
await writeJsonFileAtomically(filePath, payload);
await openUpdateOffsetStore(params.env).register(
normalizeTelegramUpdateOffsetAccountId(params.accountId),
payload,
);
}
export async function deleteTelegramUpdateOffset(params: {
accountId?: string;
env?: NodeJS.ProcessEnv;
}): Promise<void> {
const filePath = resolveTelegramUpdateOffsetPath(params.accountId, params.env);
try {
await fs.unlink(filePath);
} catch (err) {
const code = (err as { code?: string }).code;
if (code === "ENOENT") {
return;
}
throw err;
}
await openUpdateOffsetStore(params.env).delete(
normalizeTelegramUpdateOffsetAccountId(params.accountId),
);
}
export function setTelegramUpdateOffsetStoreForTest(
store: TelegramUpdateOffsetStore | undefined,
): void {
updateOffsetStoreForTest = store;
}
export async function listTelegramLegacyUpdateOffsetEntries(params: {
accountId?: string;
persistedPath: string;
}): Promise<Array<{ key: string; value: TelegramUpdateOffsetState }>> {
const { value } = await readJsonFileWithFallback<unknown>(params.persistedPath, null);
const parsed = safeParseState(value);
if (!parsed || parsed.lastUpdateId === null) {
return [];
}
return [{ key: normalizeTelegramUpdateOffsetAccountId(params.accountId), value: parsed }];
}
export function shouldReplaceTelegramUpdateOffsetEntry(params: {
existingValue: unknown;
incomingValue: unknown;
botToken?: string;
}): boolean {
const existing = safeParseState(params.existingValue);
const incoming = safeParseState(params.incomingValue);
if (!incoming || incoming.lastUpdateId === null) {
return false;
}
if (!existing || existing.lastUpdateId === null) {
return true;
}
if (!params.botToken) {
if (existing.botId && incoming.botId && existing.botId !== incoming.botId) {
return false;
}
if (
existing.tokenFingerprint &&
incoming.tokenFingerprint &&
existing.tokenFingerprint !== incoming.tokenFingerprint
) {
return false;
}
}
const incomingRotation = rotationForToken(incoming, params.botToken);
if (incomingRotation) {
return false;
}
const existingRotation = rotationForToken(existing, params.botToken);
if (existingRotation) {
return true;
}
return incoming.lastUpdateId > existing.lastUpdateId;
}

View File

@@ -173,6 +173,11 @@ export type ChannelLegacyStateMigrationPlan =
stateDir?: string;
cleanupSource?: "rename";
preview?: string;
shouldReplaceExistingEntry?: (params: {
key: string;
existingValue: unknown;
incomingValue: unknown;
}) => boolean | Promise<boolean>;
readEntries: () =>
| Array<{ key: string; value: unknown; ttlMs?: number }>
| Promise<Array<{ key: string; value: unknown; ttlMs?: number }>>;

View File

@@ -59,10 +59,14 @@ describe("createTypingCallbacks", () => {
it("invokes start on reply start", async () => {
const { start, onStartError, callbacks } = createTypingHarness();
await callbacks.onReplyStart();
try {
await callbacks.onReplyStart();
expect(start).toHaveBeenCalledTimes(1);
expect(onStartError).not.toHaveBeenCalled();
expect(start).toHaveBeenCalledTimes(1);
expect(onStartError).not.toHaveBeenCalled();
} finally {
callbacks.onCleanup?.();
}
});
it("reports start errors", async () => {
@@ -70,10 +74,14 @@ describe("createTypingCallbacks", () => {
start: vi.fn().mockRejectedValue(new Error("fail")),
});
await callbacks.onReplyStart();
await flushMicrotasks();
try {
await callbacks.onReplyStart();
await flushMicrotasks();
expect(onStartError).toHaveBeenCalledTimes(1);
expect(onStartError).toHaveBeenCalledTimes(1);
} finally {
callbacks.onCleanup?.();
}
});
it("does not block reply start on a pending typing request", async () => {
@@ -87,13 +95,17 @@ describe("createTypingCallbacks", () => {
),
});
await callbacks.onReplyStart();
try {
await callbacks.onReplyStart();
expect(start).toHaveBeenCalledTimes(1);
if (!resolveStart) {
throw new Error("Expected typing start resolver to be initialized");
expect(start).toHaveBeenCalledTimes(1);
if (!resolveStart) {
throw new Error("Expected typing start resolver to be initialized");
}
resolveStart();
} finally {
callbacks.onCleanup?.();
}
resolveStart();
});
it("invokes stop on idle and reports stop errors", async () => {

View File

@@ -211,6 +211,9 @@ describe("ensureConfigReady", () => {
["Discord model picker preferences", "discord/model-picker-preferences.json"],
["Feishu dedupe sidecar", "feishu/dedup/default.json"],
["Telegram bot info cache", "telegram/bot-info-default.json"],
["Telegram update offset", "telegram/update-offset-default.json"],
["Telegram sticker cache", "telegram/sticker-cache.json"],
["Telegram thread bindings", "telegram/thread-bindings-default.json"],
["Telegram pairing allowFrom", "credentials/telegram-allowFrom.json"],
["WhatsApp root auth", "credentials/creds.json"],
])("runs doctor flow for bundled channel legacy state: %s", async (_label, relativePath) => {

View File

@@ -57,6 +57,15 @@ function isLegacyWhatsAppAuthFile(name: string): boolean {
return name.endsWith(".json") && /^(app-state-sync|session|sender-key|pre-key)-/.test(name);
}
function isLegacyTelegramStateFile(name: string): boolean {
return (
(name.startsWith("bot-info-") && name.endsWith(".json")) ||
(name.startsWith("update-offset-") && name.endsWith(".json")) ||
name === "sticker-cache.json" ||
(name.startsWith("thread-bindings-") && name.endsWith(".json"))
);
}
function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir: string): boolean {
if (fileOrDirExists(path.join(stateDir, "discord", "model-picker-preferences.json"))) {
return true;
@@ -66,10 +75,7 @@ function hasBundledChannelLegacyStateMigrationInputs(stateDir: string, oauthDir:
}
if (
fileOrDirExists(path.join(oauthDir, "telegram-allowFrom.json")) ||
dirHasFile(
path.join(stateDir, "telegram"),
(name) => name.startsWith("bot-info-") && name.endsWith(".json"),
)
dirHasFile(path.join(stateDir, "telegram"), isLegacyTelegramStateFile)
) {
return true;
}

View File

@@ -880,6 +880,58 @@ describe("doctor legacy state migrations", () => {
});
});
it("replaces existing plugin-state entries when a channel import plan asks for it", async () => {
const root = await makeTempRoot();
const sourcePath = path.join(root, "legacy-cache.json");
fs.writeFileSync(sourcePath, "legacy", "utf-8");
mockedChannelMigrationPlans.plans = [
{
kind: "plugin-state-import",
label: "Test replace cache",
sourcePath,
targetPath: "plugin state:test.replace-cache",
pluginId: "telegram",
namespace: "test.replace-cache",
maxEntries: 4,
scopeKey: "",
cleanupSource: "rename",
readEntries: () => [{ key: "existing", value: { offset: 20 } }],
shouldReplaceExistingEntry: (params: { existingValue: unknown; incomingValue: unknown }) =>
(params.incomingValue as { offset: number }).offset >
(params.existingValue as { offset: number }).offset,
},
];
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ offset: number }>("telegram", {
namespace: "test.replace-cache",
maxEntries: 4,
});
await store.register("existing", { offset: 10 });
});
resetPluginStateStoreForTests();
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
const result = await runLegacyStateMigrations({ detected });
expect(result.warnings).toStrictEqual([]);
expect(result.changes).toContain("Migrated 1 Test replace cache entry → plugin state");
expect(result.changes).toContain(
`Archived Test replace cache legacy source → ${sourcePath}.migrated`,
);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ offset: number }>("telegram", {
namespace: "test.replace-cache",
maxEntries: 4,
});
expect(await store.lookup("existing")).toStrictEqual({ offset: 20 });
});
});
it("keeps plugin-state import source when plugin cap eviction drops an imported row", async () => {
const root = await makeTempRoot();
const maxPluginStateEntries = 40;

View File

@@ -979,58 +979,87 @@ async function runLegacyMigrationPlans(
const expectedKeys = new Set(existingKeys);
let remainingCapacity = Math.max(0, plan.maxEntries - storeEntries.length);
const entries = await plan.readEntries();
const missingEntries = entries.filter(
({ key }) => !existingKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
);
const candidateEntries: Array<{
key: string;
targetKey: string;
value: unknown;
ttlMs?: number;
existedBefore: boolean;
}> = [];
const failedTargetKeys = new Set<string>();
let missingEntryCount = 0;
for (const entry of entries) {
const targetKey = resolvePluginStateImportTargetKey(plan.scopeKey, entry.key);
const existingValue = existingValuesByKey.get(targetKey);
if (existingKeys.has(targetKey)) {
const shouldReplace =
existingValue !== undefined &&
(await plan.shouldReplaceExistingEntry?.({
key: entry.key,
existingValue,
incomingValue: entry.value,
}));
if (shouldReplace) {
candidateEntries.push({ ...entry, targetKey, existedBefore: true });
}
continue;
}
candidateEntries.push({ ...entry, targetKey, existedBefore: false });
missingEntryCount++;
}
const pluginRemainingCapacity = Math.max(
0,
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN - pluginEntryCount,
);
if (missingEntries.length > pluginRemainingCapacity) {
if (missingEntryCount > pluginRemainingCapacity) {
warnings.push(
`Skipped migrating ${plan.label} because plugin state has room for ${pluginRemainingCapacity} of ${missingEntries.length} missing entries; left legacy source in place`,
`Skipped migrating ${plan.label} because plugin state has room for ${pluginRemainingCapacity} of ${missingEntryCount} missing entries; left legacy source in place`,
);
return;
}
let imported = 0;
const importedKeys: string[] = [];
for (const entry of entries) {
const targetKey = resolvePluginStateImportTargetKey(plan.scopeKey, entry.key);
if (existingKeys.has(targetKey)) {
continue;
}
if (remainingCapacity <= 0) {
const changedKeys: string[] = [];
for (const entry of candidateEntries) {
if (!entry.existedBefore && remainingCapacity <= 0) {
break;
}
try {
await store.register(
targetKey,
entry.targetKey,
entry.value,
entry.ttlMs != null ? { ttlMs: entry.ttlMs } : undefined,
);
const nextExpectedKeys = new Set(expectedKeys);
nextExpectedKeys.add(targetKey);
nextExpectedKeys.add(entry.targetKey);
const liveKeys = new Set((await store.entries()).map(({ key }) => key));
const missingKey = findMissingKey(nextExpectedKeys, liveKeys);
if (missingKey) {
for (const importedKey of importedKeys.toReversed()) {
await store.delete(importedKey);
for (const changedKey of changedKeys.toReversed()) {
if (existingValuesByKey.has(changedKey)) {
await store.register(changedKey, existingValuesByKey.get(changedKey));
} else {
await store.delete(changedKey);
}
}
await store.delete(targetKey);
if (existingValuesByKey.has(missingKey)) {
await store.register(missingKey, existingValuesByKey.get(missingKey));
if (existingValuesByKey.has(entry.targetKey)) {
await store.register(entry.targetKey, existingValuesByKey.get(entry.targetKey));
} else {
await store.delete(entry.targetKey);
}
warnings.push(
`Stopped migrating ${plan.label} because plugin state cap evicted ${missingKey}; left legacy source in place`,
);
return;
}
expectedKeys.add(targetKey);
existingKeys.add(targetKey);
importedKeys.push(targetKey);
remainingCapacity--;
expectedKeys.add(entry.targetKey);
existingKeys.add(entry.targetKey);
changedKeys.push(entry.targetKey);
if (!entry.existedBefore) {
remainingCapacity--;
}
imported++;
} catch (err) {
failedTargetKeys.add(entry.targetKey);
warnings.push(`Failed migrating ${plan.label} entry ${entry.key}: ${String(err)}`);
}
}
@@ -1045,8 +1074,10 @@ async function runLegacyMigrationPlans(
}
const allEntriesCovered =
entries.length > 0 &&
entries.every(({ key }) =>
cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
entries.every(
({ key }) =>
cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)) &&
!failedTargetKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
);
if (allEntriesCovered && plan.cleanupSource === "rename" && fileExists(plan.sourcePath)) {
const archivedPath = `${plan.sourcePath}.migrated`;

View File

@@ -378,7 +378,6 @@ describe("scoped vitest configs", () => {
it("keeps scoped lanes on threads with the shared non-isolated runner", () => {
for (const config of [
defaultChannelsConfig,
defaultAcpConfig,
defaultExtensionsConfig,
defaultExtensionChannelsConfig,
@@ -453,8 +452,8 @@ describe("scoped vitest configs", () => {
expect(testConfig.exclude).not.toContain("chat/slash-command-executor.node.test.ts");
});
it("defaults channel tests to threads with the non-isolated runner", () => {
expectThreadedNonIsolatedRunner(defaultChannelsConfig);
it("defaults channel tests to isolated threads", () => {
expectThreadedIsolatedRunner(defaultChannelsConfig);
});
it("keeps the core channel lane limited to non-extension roots", () => {

View File

@@ -12,6 +12,7 @@ export function createChannelsVitestConfig(env?: Record<string, string | undefin
return createScopedVitestConfig(loadIncludePatternsFromEnv(env) ?? coreChannelTestInclude, {
env,
exclude: ["src/gateway/**", "src/channels/plugins/contracts/**"],
isolate: true,
name: "channels",
passWithNoTests: true,
});