mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 18:04:45 +00:00
fix(telegram): distinguish partial reply snapshots
This commit is contained in:
@@ -221,6 +221,57 @@ describe("telegram message cache", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("replaces authoritative edited message fields without stale caption carryover", () => {
|
||||
const cache = createTelegramMessageCache();
|
||||
const chat = { id: 7, type: "group", title: "Ops" } as const;
|
||||
cache.record({
|
||||
accountId: "default",
|
||||
chatId: 7,
|
||||
msg: {
|
||||
chat,
|
||||
message_id: 104,
|
||||
date: 1736380900,
|
||||
caption: "old caption",
|
||||
from: { id: 999, is_bot: true, first_name: "Bot" },
|
||||
photo: [
|
||||
{
|
||||
file_id: "generated-photo-2",
|
||||
file_unique_id: "generated-photo-unique-2",
|
||||
width: 640,
|
||||
height: 480,
|
||||
},
|
||||
],
|
||||
} as Message,
|
||||
});
|
||||
|
||||
const updated = cache.record({
|
||||
accountId: "default",
|
||||
chatId: 7,
|
||||
msg: {
|
||||
chat,
|
||||
message_id: 104,
|
||||
date: 1736380900,
|
||||
edit_date: 1736380910,
|
||||
from: { id: 999, is_bot: true, first_name: "Bot" },
|
||||
photo: [
|
||||
{
|
||||
file_id: "generated-photo-2",
|
||||
file_unique_id: "generated-photo-unique-2",
|
||||
width: 640,
|
||||
height: 480,
|
||||
},
|
||||
],
|
||||
} as Message,
|
||||
});
|
||||
|
||||
expect(updated).toMatchObject({
|
||||
messageId: "104",
|
||||
body: "<media:image>",
|
||||
mediaRef: "telegram:file/generated-photo-2",
|
||||
});
|
||||
expect(updated?.body).not.toBe("old caption");
|
||||
});
|
||||
|
||||
it("shares one persisted bucket across live cache instances", async () => {
|
||||
const storePath = `/tmp/openclaw-telegram-message-cache-shared-${process.pid}-${Date.now()}.json`;
|
||||
const persistedPath = resolveTelegramMessageCachePath(storePath);
|
||||
|
||||
@@ -63,6 +63,13 @@ type PersistedMessageReadResult = TelegramMessageCacheBucket & {
|
||||
needsRewrite: boolean;
|
||||
};
|
||||
|
||||
type TelegramMessageObservationMode = "authoritative" | "partial";
|
||||
|
||||
type TelegramCachedMessageObservation = {
|
||||
node: TelegramCachedMessageNode;
|
||||
mode: TelegramMessageObservationMode;
|
||||
};
|
||||
|
||||
const DEFAULT_MAX_MESSAGES = 5000;
|
||||
const COMPACT_THRESHOLD_RATIO = 2;
|
||||
const persistedMessageCacheBuckets = new Map<string, TelegramMessageCacheBucket>();
|
||||
@@ -164,14 +171,18 @@ function resolveMessageThreadId(msg: Message): number | undefined {
|
||||
function normalizeMessageNodes(
|
||||
msg: Message,
|
||||
params: { threadId?: number },
|
||||
): TelegramCachedMessageNode[] {
|
||||
const nodes: TelegramCachedMessageNode[] = [];
|
||||
): TelegramCachedMessageObservation[] {
|
||||
const observations: TelegramCachedMessageObservation[] = [];
|
||||
const visited = new Set<string>();
|
||||
const nodeThreadId = (node: TelegramCachedMessageNode) => {
|
||||
const threadId = Number(node.threadId);
|
||||
return Number.isFinite(threadId) ? threadId : undefined;
|
||||
};
|
||||
const visit = (message: Message, inheritedThreadId?: number) => {
|
||||
const visit = (
|
||||
message: Message,
|
||||
inheritedThreadId: number | undefined,
|
||||
mode: TelegramMessageObservationMode,
|
||||
) => {
|
||||
const node = normalizeMessageNode(message, {
|
||||
threadId: resolveMessageThreadId(message) ?? inheritedThreadId,
|
||||
});
|
||||
@@ -181,12 +192,12 @@ function normalizeMessageNodes(
|
||||
visited.add(node.messageId);
|
||||
const replyMessage = resolveEmbeddedReplyMessage(message);
|
||||
if (replyMessage?.message_id != null) {
|
||||
visit(replyMessage, nodeThreadId(node) ?? inheritedThreadId);
|
||||
visit(replyMessage, nodeThreadId(node) ?? inheritedThreadId, "partial");
|
||||
}
|
||||
nodes.push(node);
|
||||
observations.push({ node, mode });
|
||||
};
|
||||
visit(msg, params.threadId);
|
||||
return nodes;
|
||||
visit(msg, params.threadId, "authoritative");
|
||||
return observations;
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
@@ -215,6 +226,7 @@ function isTelegramSourceMessage(value: unknown): value is Message {
|
||||
function parsePersistedEntry(value: unknown): Array<{
|
||||
key: string;
|
||||
node: TelegramCachedMessageNode;
|
||||
mode: TelegramMessageObservationMode;
|
||||
}> {
|
||||
if (!isRecord(value) || !isString(value.key)) {
|
||||
return [];
|
||||
@@ -229,10 +241,15 @@ function parsePersistedEntry(value: unknown): Array<{
|
||||
}
|
||||
const keyPrefix = value.key.slice(0, separatorIndex + 1);
|
||||
const threadId = Number(readOptionalString(value.node, "threadId"));
|
||||
const sourceMessageId = String(value.node.sourceMessage.message_id);
|
||||
return normalizeMessageNodes(
|
||||
value.node.sourceMessage,
|
||||
Number.isFinite(threadId) ? { threadId } : {},
|
||||
).map((node) => ({ key: `${keyPrefix}${node.messageId}`, node }));
|
||||
).map(({ node, mode }) => ({
|
||||
key: `${keyPrefix}${node.messageId}`,
|
||||
node,
|
||||
mode: node.messageId === sourceMessageId ? "authoritative" : mode,
|
||||
}));
|
||||
}
|
||||
|
||||
function findJsonArrayEnd(text: string): number {
|
||||
@@ -337,26 +354,41 @@ function mergeTelegramSourceMessage(existing: Message, incoming: Message): Messa
|
||||
return merged;
|
||||
}
|
||||
|
||||
function mergeAuthoritativeTelegramSourceMessage(existing: Message, incoming: Message): Message {
|
||||
const existingReply = resolveEmbeddedReplyMessage(existing);
|
||||
const incomingReply = resolveEmbeddedReplyMessage(incoming);
|
||||
if (existingReply?.message_id != null && incomingReply?.message_id === existingReply.message_id) {
|
||||
return {
|
||||
...incoming,
|
||||
reply_to_message: mergeTelegramSourceMessage(existingReply, incomingReply),
|
||||
};
|
||||
}
|
||||
return incoming;
|
||||
}
|
||||
|
||||
function mergeCachedMessageNode(
|
||||
existing: TelegramCachedMessageNode,
|
||||
incoming: TelegramCachedMessageNode,
|
||||
mode: TelegramMessageObservationMode,
|
||||
): TelegramCachedMessageNode {
|
||||
const threadId = Number(incoming.threadId ?? existing.threadId);
|
||||
return normalizeRequiredMessageNode(
|
||||
mergeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage),
|
||||
{
|
||||
...(Number.isFinite(threadId) ? { threadId } : {}),
|
||||
},
|
||||
);
|
||||
const sourceMessage =
|
||||
mode === "authoritative"
|
||||
? mergeAuthoritativeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage)
|
||||
: mergeTelegramSourceMessage(existing.sourceMessage, incoming.sourceMessage);
|
||||
return normalizeRequiredMessageNode(sourceMessage, {
|
||||
...(Number.isFinite(threadId) ? { threadId } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
function upsertCachedMessageNode(params: {
|
||||
messages: Map<string, TelegramCachedMessageNode>;
|
||||
key: string;
|
||||
node: TelegramCachedMessageNode;
|
||||
mode: TelegramMessageObservationMode;
|
||||
}): TelegramCachedMessageNode {
|
||||
const existing = params.messages.get(params.key);
|
||||
const node = existing ? mergeCachedMessageNode(existing, params.node) : params.node;
|
||||
const node = existing ? mergeCachedMessageNode(existing, params.node, params.mode) : params.node;
|
||||
params.messages.delete(params.key);
|
||||
params.messages.set(params.key, node);
|
||||
return node;
|
||||
@@ -375,7 +407,12 @@ function readPersistedMessages(filePath: string, maxMessages: number): Persisted
|
||||
for (const value of persisted.values) {
|
||||
for (const entry of parsePersistedEntry(value)) {
|
||||
persistedEntryCount++;
|
||||
upsertCachedMessageNode({ messages, key: entry.key, node: entry.node });
|
||||
upsertCachedMessageNode({
|
||||
messages,
|
||||
key: entry.key,
|
||||
node: entry.node,
|
||||
mode: entry.mode,
|
||||
});
|
||||
trimMessages(messages, maxMessages);
|
||||
}
|
||||
}
|
||||
@@ -515,16 +552,16 @@ export function createTelegramMessageCache(params?: {
|
||||
|
||||
return {
|
||||
record: ({ accountId, chatId, msg, threadId }) => {
|
||||
const entries = normalizeMessageNodes(msg, { threadId });
|
||||
const entry = entries.at(-1);
|
||||
if (!entry) {
|
||||
const observations = normalizeMessageNodes(msg, { threadId });
|
||||
const currentObservation = observations.at(-1);
|
||||
if (!currentObservation) {
|
||||
return null;
|
||||
}
|
||||
let recordedEntry: TelegramCachedMessageNode | null = null;
|
||||
for (const node of entries) {
|
||||
for (const { node, mode } of observations) {
|
||||
const key = telegramMessageCacheKey({ accountId, chatId, messageId: node.messageId });
|
||||
const cachedNode = upsertCachedMessageNode({ messages, key, node });
|
||||
if (node.messageId === entry.messageId) {
|
||||
const cachedNode = upsertCachedMessageNode({ messages, key, node, mode });
|
||||
if (node.messageId === currentObservation.node.messageId) {
|
||||
recordedEntry = cachedNode;
|
||||
}
|
||||
trimMessages(messages, maxMessages);
|
||||
@@ -544,7 +581,7 @@ export function createTelegramMessageCache(params?: {
|
||||
logVerbose(`telegram: failed to persist message cache: ${String(error)}`);
|
||||
}
|
||||
}
|
||||
return recordedEntry ?? entry;
|
||||
return recordedEntry ?? currentObservation.node;
|
||||
},
|
||||
get,
|
||||
recentBefore: ({ accountId, chatId, messageId, threadId, limit }) => {
|
||||
|
||||
Reference in New Issue
Block a user