perf(telegram): append reply-chain cache records

This commit is contained in:
Ayaan Zaidi
2026-05-09 17:22:12 +05:30
parent b39daef3da
commit aec262034b
2 changed files with 169 additions and 42 deletions

View File

@@ -1,6 +1,6 @@
import { rm } from "node:fs/promises";
import { readFile, rm } from "node:fs/promises";
import type { Message } from "@grammyjs/types";
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import {
buildTelegramReplyChain,
createTelegramMessageCache,
@@ -48,8 +48,10 @@ describe("telegram message cache", () => {
} as Message,
});
const secondCache = createTelegramMessageCache({ persistedPath });
const chain = buildTelegramReplyChain({
vi.resetModules();
const reloaded = await import("./message-cache.js");
const secondCache = reloaded.createTelegramMessageCache({ persistedPath });
const chain = reloaded.buildTelegramReplyChain({
cache: secondCache,
accountId: "default",
chatId: 7,
@@ -147,4 +149,74 @@ describe("telegram message cache", () => {
await rm(persistedPath, { force: true });
}
});
it("appends cached records between compactions and reloads the bounded cache window", async () => {
const storePath = `/tmp/openclaw-telegram-message-cache-append-${process.pid}-${Date.now()}.json`;
const persistedPath = resolveTelegramMessageCachePath(storePath);
await rm(persistedPath, { force: true });
try {
const cache = createTelegramMessageCache({ persistedPath, maxMessages: 4 });
for (let index = 0; index < 5; index++) {
cache.record({
accountId: "default",
chatId: 7,
msg: {
chat: { id: 7, type: "private", first_name: "Nora" },
message_id: 9150 + index,
date: 1736380700 + index,
text: `Message ${index}`,
from: { id: 1, is_bot: false, first_name: "Nora" },
} as Message,
});
}
const lines = (await readFile(persistedPath, "utf-8")).trim().split("\n");
expect(lines).toHaveLength(5);
vi.resetModules();
const reloaded = await import("./message-cache.js");
const reloadedCache = reloaded.createTelegramMessageCache({ persistedPath, maxMessages: 4 });
expect(reloadedCache.get({ accountId: "default", chatId: 7, messageId: "9150" })).toBeNull();
expect(
reloadedCache.get({ accountId: "default", chatId: 7, messageId: "9151" })?.messageId,
).toBe("9151");
} finally {
await rm(persistedPath, { force: true });
}
});
it("keeps the persisted log bounded by compacting cached records", async () => {
const storePath = `/tmp/openclaw-telegram-message-cache-compact-${process.pid}-${Date.now()}.json`;
const persistedPath = resolveTelegramMessageCachePath(storePath);
await rm(persistedPath, { force: true });
try {
const cache = createTelegramMessageCache({ persistedPath, maxMessages: 3 });
for (let index = 0; index < 7; index++) {
cache.record({
accountId: "default",
chatId: 7,
msg: {
chat: { id: 7, type: "private", first_name: "Nora" },
message_id: 9200 + index,
date: 1736380700 + index,
text: `Message ${index}`,
from: { id: 1, is_bot: false, first_name: "Nora" },
} as Message,
});
}
const lines = (await readFile(persistedPath, "utf-8")).trim().split("\n");
expect(lines).toHaveLength(3);
expect(
lines.map((line) => {
const entry = JSON.parse(line) as {
node: { sourceMessage: { message_id: number } };
};
return entry.node.sourceMessage.message_id;
}),
).toEqual([9204, 9205, 9206]);
} finally {
await rm(persistedPath, { force: true });
}
});
});

View File

@@ -3,7 +3,7 @@ import type { Message } from "@grammyjs/types";
import { formatLocationText } from "openclaw/plugin-sdk/channel-inbound";
import type { MsgContext } from "openclaw/plugin-sdk/reply-runtime";
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
import { replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime";
import { appendRegularFileSync, replaceFileAtomicSync } from "openclaw/plugin-sdk/security-runtime";
import { resolveTelegramPrimaryMedia } from "./bot/body-helpers.js";
import {
buildSenderName,
@@ -36,9 +36,11 @@ type MessageWithExternalReply = Message & { external_reply?: Message };
type TelegramMessageCacheBucket = {
messages: Map<string, TelegramCachedMessageNode>;
persistedEntryCount: number;
};
const DEFAULT_MAX_MESSAGES = 5000;
const COMPACT_THRESHOLD_RATIO = 2;
const persistedMessageCacheBuckets = new Map<string, TelegramMessageCacheBucket>();
function telegramMessageCacheKey(params: {
@@ -136,55 +138,100 @@ function parsePersistedNode(value: unknown): TelegramCachedMessageNode | null {
return normalizeMessageNode(value.sourceMessage, Number.isFinite(threadId) ? { threadId } : {});
}
function parsePersistedEntry(value: unknown): {
key: string;
node: TelegramCachedMessageNode;
} | null {
if (!isRecord(value) || !isString(value.key)) {
return null;
}
const node = parsePersistedNode(value.node);
return node ? { key: value.key, node } : null;
}
function trimMessages(messages: Map<string, TelegramCachedMessageNode>, maxMessages: number): void {
while (messages.size > maxMessages) {
const oldest = messages.keys().next().value;
if (oldest === undefined) {
break;
}
messages.delete(oldest);
}
}
function readPersistedMessages(filePath: string, maxMessages: number) {
const messages = new Map<string, TelegramCachedMessageNode>();
let persistedEntryCount = 0;
if (!fs.existsSync(filePath)) {
return messages;
return { messages, persistedEntryCount };
}
try {
const parsed = JSON.parse(fs.readFileSync(filePath, "utf-8"));
if (!Array.isArray(parsed)) {
return messages;
}
for (const entry of parsed.slice(-maxMessages)) {
if (!isRecord(entry) || !isString(entry.key)) {
for (const line of fs.readFileSync(filePath, "utf-8").split("\n")) {
if (!line.trim()) {
continue;
}
const node = parsePersistedNode(entry.node);
if (node) {
messages.set(entry.key, node);
const entry = parsePersistedEntry(JSON.parse(line));
if (!entry) {
continue;
}
persistedEntryCount++;
messages.delete(entry.key);
messages.set(entry.key, entry.node);
trimMessages(messages, maxMessages);
}
} catch (error) {
logVerbose(`telegram: failed to read message cache: ${String(error)}`);
}
return messages;
return { messages, persistedEntryCount };
}
function persistMessages(params: {
messages: Map<string, TelegramCachedMessageNode>;
persistedPath?: string;
}) {
const { persistedPath, messages } = params;
if (!persistedPath) {
return;
}
if (messages.size === 0) {
fs.rmSync(persistedPath, { force: true });
return;
}
const serialized = Array.from(messages, ([key, node]) => ({
function serializePersistedEntry(key: string, node: TelegramCachedMessageNode): string {
return `${JSON.stringify({
key,
node: {
sourceMessage: node.sourceMessage,
...(node.threadId ? { threadId: node.threadId } : {}),
},
}));
})}\n`;
}
function replacePersistedMessages(params: {
messages: Map<string, TelegramCachedMessageNode>;
persistedPath?: string;
}): number {
const { persistedPath, messages } = params;
if (!persistedPath) {
return messages.size;
}
if (messages.size === 0) {
fs.rmSync(persistedPath, { force: true });
return 0;
}
const serialized = Array.from(messages, ([key, node]) => serializePersistedEntry(key, node)).join(
"",
);
replaceFileAtomicSync({
filePath: persistedPath,
content: JSON.stringify(serialized),
content: serialized,
tempPrefix: ".telegram-message-cache",
});
return messages.size;
}
function appendPersistedMessage(params: {
key: string;
node: TelegramCachedMessageNode;
persistedPath?: string;
}): number {
const { persistedPath } = params;
if (!persistedPath) {
return 0;
}
appendRegularFileSync({
filePath: persistedPath,
content: serializePersistedEntry(params.key, params.node),
});
return 1;
}
function resolveMessageCacheBucket(params: {
@@ -193,17 +240,20 @@ function resolveMessageCacheBucket(params: {
}): TelegramMessageCacheBucket {
const { persistedPath, maxMessages } = params;
if (!persistedPath) {
return { messages: new Map<string, TelegramCachedMessageNode>() };
return { messages: new Map<string, TelegramCachedMessageNode>(), persistedEntryCount: 0 };
}
const existing = persistedMessageCacheBuckets.get(persistedPath);
if (existing) {
if (!fs.existsSync(persistedPath)) {
existing.messages.clear();
existing.persistedEntryCount = 0;
}
return existing;
}
const persisted = readPersistedMessages(persistedPath, maxMessages);
const bucket = {
messages: readPersistedMessages(persistedPath, maxMessages),
messages: persisted.messages,
persistedEntryCount: persisted.persistedEntryCount,
};
persistedMessageCacheBuckets.set(persistedPath, bucket);
return bucket;
@@ -214,10 +264,11 @@ export function createTelegramMessageCache(params?: {
persistedPath?: string;
}): TelegramMessageCache {
const maxMessages = params?.maxMessages ?? DEFAULT_MAX_MESSAGES;
const { messages } = resolveMessageCacheBucket({
const bucket = resolveMessageCacheBucket({
persistedPath: params?.persistedPath,
maxMessages,
});
const { messages } = bucket;
const get: TelegramMessageCache["get"] = ({ accountId, chatId, messageId }) => {
if (!messageId) {
@@ -242,15 +293,19 @@ export function createTelegramMessageCache(params?: {
const key = telegramMessageCacheKey({ accountId, chatId, messageId: entry.messageId });
messages.delete(key);
messages.set(key, entry);
while (messages.size > maxMessages) {
const oldest = messages.keys().next().value;
if (oldest === undefined) {
break;
}
messages.delete(oldest);
}
trimMessages(messages, maxMessages);
try {
persistMessages({ messages, persistedPath: params?.persistedPath });
bucket.persistedEntryCount += appendPersistedMessage({
key,
node: entry,
persistedPath: params?.persistedPath,
});
if (bucket.persistedEntryCount > maxMessages * COMPACT_THRESHOLD_RATIO) {
bucket.persistedEntryCount = replacePersistedMessages({
messages,
persistedPath: params?.persistedPath,
});
}
} catch (error) {
logVerbose(`telegram: failed to persist message cache: ${String(error)}`);
}