diff --git a/extensions/telegram/src/message-cache.test.ts b/extensions/telegram/src/message-cache.test.ts index 04608709f0b..8efe4c217e9 100644 --- a/extensions/telegram/src/message-cache.test.ts +++ b/extensions/telegram/src/message-cache.test.ts @@ -1,6 +1,6 @@ -import { rm } from "node:fs/promises"; +import { readFile, rm } from "node:fs/promises"; import type { Message } from "@grammyjs/types"; -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { buildTelegramReplyChain, createTelegramMessageCache, @@ -48,8 +48,10 @@ describe("telegram message cache", () => { } as Message, }); - const secondCache = createTelegramMessageCache({ persistedPath }); - const chain = buildTelegramReplyChain({ + vi.resetModules(); + const reloaded = await import("./message-cache.js"); + const secondCache = reloaded.createTelegramMessageCache({ persistedPath }); + const chain = reloaded.buildTelegramReplyChain({ cache: secondCache, accountId: "default", chatId: 7, @@ -147,4 +149,74 @@ describe("telegram message cache", () => { await rm(persistedPath, { force: true }); } }); + + it("appends cached records between compactions and reloads the bounded cache window", async () => { + const storePath = `/tmp/openclaw-telegram-message-cache-append-${process.pid}-${Date.now()}.json`; + const persistedPath = resolveTelegramMessageCachePath(storePath); + await rm(persistedPath, { force: true }); + try { + const cache = createTelegramMessageCache({ persistedPath, maxMessages: 4 }); + for (let index = 0; index < 5; index++) { + cache.record({ + accountId: "default", + chatId: 7, + msg: { + chat: { id: 7, type: "private", first_name: "Nora" }, + message_id: 9150 + index, + date: 1736380700 + index, + text: `Message ${index}`, + from: { id: 1, is_bot: false, first_name: "Nora" }, + } as Message, + }); + } + + const lines = (await readFile(persistedPath, "utf-8")).trim().split("\n"); + expect(lines).toHaveLength(5); + + vi.resetModules(); + const reloaded = await import("./message-cache.js"); + const reloadedCache = reloaded.createTelegramMessageCache({ persistedPath, maxMessages: 4 }); + expect(reloadedCache.get({ accountId: "default", chatId: 7, messageId: "9150" })).toBeNull(); + expect( + reloadedCache.get({ accountId: "default", chatId: 7, messageId: "9151" })?.messageId, + ).toBe("9151"); + } finally { + await rm(persistedPath, { force: true }); + } + }); + + it("keeps the persisted log bounded by compacting cached records", async () => { + const storePath = `/tmp/openclaw-telegram-message-cache-compact-${process.pid}-${Date.now()}.json`; + const persistedPath = resolveTelegramMessageCachePath(storePath); + await rm(persistedPath, { force: true }); + try { + const cache = createTelegramMessageCache({ persistedPath, maxMessages: 3 }); + for (let index = 0; index < 7; index++) { + cache.record({ + accountId: "default", + chatId: 7, + msg: { + chat: { id: 7, type: "private", first_name: "Nora" }, + message_id: 9200 + index, + date: 1736380700 + index, + text: `Message ${index}`, + from: { id: 1, is_bot: false, first_name: "Nora" }, + } as Message, + }); + } + + const lines = (await readFile(persistedPath, "utf-8")).trim().split("\n"); + expect(lines).toHaveLength(3); + expect( + lines.map((line) => { + const entry = JSON.parse(line) as { + node: { sourceMessage: { message_id: number } }; + }; + return entry.node.sourceMessage.message_id; + }), + ).toEqual([9204, 9205, 9206]); + } finally { + await rm(persistedPath, { force: true }); + } + }); }); diff --git a/extensions/telegram/src/message-cache.ts b/extensions/telegram/src/message-cache.ts index c71eb042337..a0ff1598602 100644 --- a/extensions/telegram/src/message-cache.ts +++ b/extensions/telegram/src/message-cache.ts @@ -3,7 +3,7 @@ import type { Message } from "@grammyjs/types"; import { formatLocationText } from "openclaw/plugin-sdk/channel-inbound"; import type { MsgContext } from "openclaw/plugin-sdk/reply-runtime"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; -import { replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime"; +import { appendRegularFileSync, replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime"; import { resolveTelegramPrimaryMedia } from "./bot/body-helpers.js"; import { buildSenderName, @@ -36,9 +36,11 @@ type MessageWithExternalReply = Message & { external_reply?: Message }; type TelegramMessageCacheBucket = { messages: Map; + persistedEntryCount: number; }; const DEFAULT_MAX_MESSAGES = 5000; +const COMPACT_THRESHOLD_RATIO = 2; const persistedMessageCacheBuckets = new Map(); function telegramMessageCacheKey(params: { @@ -136,55 +138,100 @@ function parsePersistedNode(value: unknown): TelegramCachedMessageNode | null { return normalizeMessageNode(value.sourceMessage, Number.isFinite(threadId) ? { threadId } : {}); } +function parsePersistedEntry(value: unknown): { + key: string; + node: TelegramCachedMessageNode; +} | null { + if (!isRecord(value) || !isString(value.key)) { + return null; + } + const node = parsePersistedNode(value.node); + return node ? { key: value.key, node } : null; +} + +function trimMessages(messages: Map, maxMessages: number): void { + while (messages.size > maxMessages) { + const oldest = messages.keys().next().value; + if (oldest === undefined) { + break; + } + messages.delete(oldest); + } +} + function readPersistedMessages(filePath: string, maxMessages: number) { const messages = new Map(); + let persistedEntryCount = 0; if (!fs.existsSync(filePath)) { - return messages; + return { messages, persistedEntryCount }; } try { - const parsed = JSON.parse(fs.readFileSync(filePath, "utf-8")); - if (!Array.isArray(parsed)) { - return messages; - } - for (const entry of parsed.slice(-maxMessages)) { - if (!isRecord(entry) || !isString(entry.key)) { + for (const line of fs.readFileSync(filePath, "utf-8").split("\n")) { + if (!line.trim()) { continue; } - const node = parsePersistedNode(entry.node); - if (node) { - messages.set(entry.key, node); + const entry = parsePersistedEntry(JSON.parse(line)); + if (!entry) { + continue; } + persistedEntryCount++; + messages.delete(entry.key); + messages.set(entry.key, entry.node); + trimMessages(messages, maxMessages); } } catch (error) { logVerbose(`telegram: failed to read message cache: ${String(error)}`); } - return messages; + return { messages, persistedEntryCount }; } -function persistMessages(params: { - messages: Map; - persistedPath?: string; -}) { - const { persistedPath, messages } = params; - if (!persistedPath) { - return; - } - if (messages.size === 0) { - fs.rmSync(persistedPath, { force: true }); - return; - } - const serialized = Array.from(messages, ([key, node]) => ({ +function serializePersistedEntry(key: string, node: TelegramCachedMessageNode): string { + return `${JSON.stringify({ key, node: { sourceMessage: node.sourceMessage, ...(node.threadId ? { threadId: node.threadId } : {}), }, - })); + })}\n`; +} + +function replacePersistedMessages(params: { + messages: Map; + persistedPath?: string; +}): number { + const { persistedPath, messages } = params; + if (!persistedPath) { + return messages.size; + } + if (messages.size === 0) { + fs.rmSync(persistedPath, { force: true }); + return 0; + } + const serialized = Array.from(messages, ([key, node]) => serializePersistedEntry(key, node)).join( + "", + ); replaceFileAtomicSync({ filePath: persistedPath, - content: JSON.stringify(serialized), + content: serialized, tempPrefix: ".telegram-message-cache", }); + return messages.size; +} + +function appendPersistedMessage(params: { + key: string; + node: TelegramCachedMessageNode; + persistedPath?: string; +}): number { + const { persistedPath } = params; + if (!persistedPath) { + return 0; + } + appendRegularFileSync({ + filePath: persistedPath, + content: serializePersistedEntry(params.key, params.node), + }); + return 1; } function resolveMessageCacheBucket(params: { @@ -193,17 +240,20 @@ function resolveMessageCacheBucket(params: { }): TelegramMessageCacheBucket { const { persistedPath, maxMessages } = params; if (!persistedPath) { - return { messages: new Map() }; + return { messages: new Map(), persistedEntryCount: 0 }; } const existing = persistedMessageCacheBuckets.get(persistedPath); if (existing) { if (!fs.existsSync(persistedPath)) { existing.messages.clear(); + existing.persistedEntryCount = 0; } return existing; } + const persisted = readPersistedMessages(persistedPath, maxMessages); const bucket = { - messages: readPersistedMessages(persistedPath, maxMessages), + messages: persisted.messages, + persistedEntryCount: persisted.persistedEntryCount, }; persistedMessageCacheBuckets.set(persistedPath, bucket); return bucket; @@ -214,10 +264,11 @@ export function createTelegramMessageCache(params?: { persistedPath?: string; }): TelegramMessageCache { const maxMessages = params?.maxMessages ?? DEFAULT_MAX_MESSAGES; - const { messages } = resolveMessageCacheBucket({ + const bucket = resolveMessageCacheBucket({ persistedPath: params?.persistedPath, maxMessages, }); + const { messages } = bucket; const get: TelegramMessageCache["get"] = ({ accountId, chatId, messageId }) => { if (!messageId) { @@ -242,15 +293,19 @@ export function createTelegramMessageCache(params?: { const key = telegramMessageCacheKey({ accountId, chatId, messageId: entry.messageId }); messages.delete(key); messages.set(key, entry); - while (messages.size > maxMessages) { - const oldest = messages.keys().next().value; - if (oldest === undefined) { - break; - } - messages.delete(oldest); - } + trimMessages(messages, maxMessages); try { - persistMessages({ messages, persistedPath: params?.persistedPath }); + bucket.persistedEntryCount += appendPersistedMessage({ + key, + node: entry, + persistedPath: params?.persistedPath, + }); + if (bucket.persistedEntryCount > maxMessages * COMPACT_THRESHOLD_RATIO) { + bucket.persistedEntryCount = replacePersistedMessages({ + messages, + persistedPath: params?.persistedPath, + }); + } } catch (error) { logVerbose(`telegram: failed to persist message cache: ${String(error)}`); }