From 741eafea5fc7fda8125d4e712e20314c447b5f88 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sun, 17 May 2026 03:43:57 +0000 Subject: [PATCH] fix(telegram): distinguish partial reply snapshots --- extensions/telegram/src/message-cache.test.ts | 51 ++++++++++++ extensions/telegram/src/message-cache.ts | 83 ++++++++++++++----- 2 files changed, 111 insertions(+), 23 deletions(-) diff --git a/extensions/telegram/src/message-cache.test.ts b/extensions/telegram/src/message-cache.test.ts index 97af625f891..723dca46017 100644 --- a/extensions/telegram/src/message-cache.test.ts +++ b/extensions/telegram/src/message-cache.test.ts @@ -221,6 +221,57 @@ describe("telegram message cache", () => { } }); + it("replaces authoritative edited message fields without stale caption carryover", () => { + const cache = createTelegramMessageCache(); + const chat = { id: 7, type: "group", title: "Ops" } as const; + cache.record({ + accountId: "default", + chatId: 7, + msg: { + chat, + message_id: 104, + date: 1736380900, + caption: "old caption", + from: { id: 999, is_bot: true, first_name: "Bot" }, + photo: [ + { + file_id: "generated-photo-2", + file_unique_id: "generated-photo-unique-2", + width: 640, + height: 480, + }, + ], + } as Message, + }); + + const updated = cache.record({ + accountId: "default", + chatId: 7, + msg: { + chat, + message_id: 104, + date: 1736380900, + edit_date: 1736380910, + from: { id: 999, is_bot: true, first_name: "Bot" }, + photo: [ + { + file_id: "generated-photo-2", + file_unique_id: "generated-photo-unique-2", + width: 640, + height: 480, + }, + ], + } as Message, + }); + + expect(updated).toMatchObject({ + messageId: "104", + body: "", + mediaRef: "telegram:file/generated-photo-2", + }); + expect(updated?.body).not.toBe("old caption"); + }); + it("shares one persisted bucket across live cache instances", async () => { const storePath = `/tmp/openclaw-telegram-message-cache-shared-${process.pid}-${Date.now()}.json`; const persistedPath = resolveTelegramMessageCachePath(storePath); diff --git a/extensions/telegram/src/message-cache.ts b/extensions/telegram/src/message-cache.ts index 48c9539a056..18f95133389 100644 --- a/extensions/telegram/src/message-cache.ts +++ b/extensions/telegram/src/message-cache.ts @@ -63,6 +63,13 @@ type PersistedMessageReadResult = TelegramMessageCacheBucket & { needsRewrite: boolean; }; +type TelegramMessageObservationMode = "authoritative" | "partial"; + +type TelegramCachedMessageObservation = { + node: TelegramCachedMessageNode; + mode: TelegramMessageObservationMode; +}; + const DEFAULT_MAX_MESSAGES = 5000; const COMPACT_THRESHOLD_RATIO = 2; const persistedMessageCacheBuckets = new Map(); @@ -164,14 +171,18 @@ function resolveMessageThreadId(msg: Message): number | undefined { function normalizeMessageNodes( msg: Message, params: { threadId?: number }, -): TelegramCachedMessageNode[] { - const nodes: TelegramCachedMessageNode[] = []; +): TelegramCachedMessageObservation[] { + const observations: TelegramCachedMessageObservation[] = []; const visited = new Set(); const nodeThreadId = (node: TelegramCachedMessageNode) => { const threadId = Number(node.threadId); return Number.isFinite(threadId) ? threadId : undefined; }; - const visit = (message: Message, inheritedThreadId?: number) => { + const visit = ( + message: Message, + inheritedThreadId: number | undefined, + mode: TelegramMessageObservationMode, + ) => { const node = normalizeMessageNode(message, { threadId: resolveMessageThreadId(message) ?? inheritedThreadId, }); @@ -181,12 +192,12 @@ function normalizeMessageNodes( visited.add(node.messageId); const replyMessage = resolveEmbeddedReplyMessage(message); if (replyMessage?.message_id != null) { - visit(replyMessage, nodeThreadId(node) ?? inheritedThreadId); + visit(replyMessage, nodeThreadId(node) ?? inheritedThreadId, "partial"); } - nodes.push(node); + observations.push({ node, mode }); }; - visit(msg, params.threadId); - return nodes; + visit(msg, params.threadId, "authoritative"); + return observations; } function isRecord(value: unknown): value is Record { @@ -215,6 +226,7 @@ function isTelegramSourceMessage(value: unknown): value is Message { function parsePersistedEntry(value: unknown): Array<{ key: string; node: TelegramCachedMessageNode; + mode: TelegramMessageObservationMode; }> { if (!isRecord(value) || !isString(value.key)) { return []; @@ -229,10 +241,15 @@ function parsePersistedEntry(value: unknown): Array<{ } const keyPrefix = value.key.slice(0, separatorIndex + 1); const threadId = Number(readOptionalString(value.node, "threadId")); + const sourceMessageId = String(value.node.sourceMessage.message_id); return normalizeMessageNodes( value.node.sourceMessage, Number.isFinite(threadId) ? { threadId } : {}, - ).map((node) => ({ key: `${keyPrefix}${node.messageId}`, node })); + ).map(({ node, mode }) => ({ + key: `${keyPrefix}${node.messageId}`, + node, + mode: node.messageId === sourceMessageId ? "authoritative" : mode, + })); } function findJsonArrayEnd(text: string): number { @@ -337,26 +354,41 @@ function mergeTelegramSourceMessage(existing: Message, incoming: Message): Messa return merged; } +function mergeAuthoritativeTelegramSourceMessage(existing: Message, incoming: Message): Message { + const existingReply = resolveEmbeddedReplyMessage(existing); + const incomingReply = resolveEmbeddedReplyMessage(incoming); + if (existingReply?.message_id != null && incomingReply?.message_id === existingReply.message_id) { + return { + ...incoming, + reply_to_message: mergeTelegramSourceMessage(existingReply, incomingReply), + }; + } + return incoming; +} + function mergeCachedMessageNode( existing: TelegramCachedMessageNode, incoming: TelegramCachedMessageNode, + mode: TelegramMessageObservationMode, ): TelegramCachedMessageNode { const threadId = Number(incoming.threadId ?? existing.threadId); - return normalizeRequiredMessageNode( - mergeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage), - { - ...(Number.isFinite(threadId) ? { threadId } : {}), - }, - ); + const sourceMessage = + mode === "authoritative" + ? mergeAuthoritativeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage) + : mergeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage); + return normalizeRequiredMessageNode(sourceMessage, { + ...(Number.isFinite(threadId) ? { threadId } : {}), + }); } function upsertCachedMessageNode(params: { messages: Map; key: string; node: TelegramCachedMessageNode; + mode: TelegramMessageObservationMode; }): TelegramCachedMessageNode { const existing = params.messages.get(params.key); - const node = existing ? mergeCachedMessageNode(existing, params.node) : params.node; + const node = existing ? mergeCachedMessageNode(existing, params.node, params.mode) : params.node; params.messages.delete(params.key); params.messages.set(params.key, node); return node; @@ -375,7 +407,12 @@ function readPersistedMessages(filePath: string, maxMessages: number): Persisted for (const value of persisted.values) { for (const entry of parsePersistedEntry(value)) { persistedEntryCount++; - upsertCachedMessageNode({ messages, key: entry.key, node: entry.node }); + upsertCachedMessageNode({ + messages, + key: entry.key, + node: entry.node, + mode: entry.mode, + }); trimMessages(messages, maxMessages); } } @@ -515,16 +552,16 @@ export function createTelegramMessageCache(params?: { return { record: ({ accountId, chatId, msg, threadId }) => { - const entries = normalizeMessageNodes(msg, { threadId }); - const entry = entries.at(-1); - if (!entry) { + const observations = normalizeMessageNodes(msg, { threadId }); + const currentObservation = observations.at(-1); + if (!currentObservation) { return null; } let recordedEntry: TelegramCachedMessageNode | null = null; - for (const node of entries) { + for (const { node, mode } of observations) { const key = telegramMessageCacheKey({ accountId, chatId, messageId: node.messageId }); - const cachedNode = upsertCachedMessageNode({ messages, key, node }); - if (node.messageId === entry.messageId) { + const cachedNode = upsertCachedMessageNode({ messages, key, node, mode }); + if (node.messageId === currentObservation.node.messageId) { recordedEntry = cachedNode; } trimMessages(messages, maxMessages); @@ -544,7 +581,7 @@ export function createTelegramMessageCache(params?: { logVerbose(`telegram: failed to persist message cache: ${String(error)}`); } } - return recordedEntry ?? entry; + return recordedEntry ?? currentObservation.node; }, get, recentBefore: ({ accountId, chatId, messageId, threadId, limit }) => {