From c65f356ddc958f1930b083ccba38103d16772993 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Cuevas?= Date: Fri, 17 Apr 2026 01:27:46 -0400 Subject: [PATCH] fix: keep telegram transient preview across compaction retry (#66939) (thanks @rubencu) * fix(telegram): keep transient previews across compaction * test(telegram): cover suppressed approval previews after compaction * fix(telegram): preserve delayed message-start boundaries * fix: keep telegram transient preview across compaction retry (#66939) (thanks @rubencu) --------- Co-authored-by: Ayaan Zaidi --- CHANGELOG.md | 1 + .../telegram/src/bot-message-dispatch.test.ts | 274 ++++++++++++++++++ .../telegram/src/bot-message-dispatch.ts | 33 ++- 3 files changed, 305 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2646aa6b0c..b6ec1a7a771 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai - Models/config: preserve an existing `models.json` provider `baseUrl` during merge-mode regeneration so custom endpoints do not get reset on restart. (#67893) Thanks @lawrence3699. - Plugins/discovery: reuse bundled and global plugin discovery results across workspace cache misses so Windows multi-workspace startup stops redoing the shared synchronous scan. (#67940) Thanks @obviyus. - Plugins/webhooks: enforce synchronous plugin registration with full rollback of failed plugin side effects, and cache SecretRef-backed webhook auth per route so plugin startup and inbound webhook auth stay deterministic. (#67941) Thanks @obviyus. +- Telegram/streaming: keep a transient preview on the same Telegram message when auto-compaction retries an in-flight answer, so streamed replies no longer appear duplicated after compaction. (#66939) Thanks @rubencu. ## 2026.4.15 diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index d34bc8dede7..08a63e0f422 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -800,6 +800,82 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("preserves pre-rotation skip until queued message-start callbacks flush", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + await replyOptions?.onPartialReply?.({ text: "Message B early" }); + void replyOptions?.onAssistantMessageStart?.(); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + }); + + it("does not double-rotate when assistant_message_start arrives after final delivery drains", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + await replyOptions?.onPartialReply?.({ text: "Message B early" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + await replyOptions?.onAssistantMessageStart?.(); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + }); + it("clears active preview even when an unrelated boundary archive exists", async () => { const answerDraftStream = createDraftStream(999); answerDraftStream.materialize.mockResolvedValue(4321); @@ -1054,6 +1130,204 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B second chunk"); }); + it("does not rotate the streamed preview when compaction retries replay the same assistant message", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onCompactionStart?.(); + await replyOptions?.onCompactionEnd?.(); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onPartialReply?.({ text: "Message A partial extended" }); + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(answerDraftStream.materialize).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 1001, + "Message A final", + expect.any(Object), + ); + }); + + it("clears the compaction replay skip after the retried message finalizes", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onCompactionStart?.(); + await replyOptions?.onCompactionEnd?.(); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message A partial extended" }); + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message B partial" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + }); + + it("preserves the compaction replay flag until queued retry callbacks flush", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onCompactionStart?.(); + await replyOptions?.onCompactionEnd?.(); + void replyOptions?.onAssistantMessageStart?.(); + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 1001, + "Message A final", + expect.any(Object), + ); + }); + + it("keeps the existing preview when the retried answer only arrives as final text", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onCompactionStart?.(); + await replyOptions?.onCompactionEnd?.(); + await replyOptions?.onAssistantMessageStart?.(); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(answerDraftStream.materialize).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 1001, + "Message B final", + expect.any(Object), + ); + }); + + it("keeps the transient preview when a local exec approval prompt is suppressed after compaction", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onCompactionStart?.(); + await replyOptions?.onCompactionEnd?.(); + await dispatcherOptions.deliver( + { + text: "Approval required.\n\n```txt\n/approve 7f423fdc allow-once\n```", + channelData: { + execApproval: { + approvalId: "7f423fdc-1111-2222-3333-444444444444", + approvalSlug: "7f423fdc", + allowedDecisions: ["allow-once", "allow-always", "deny"], + }, + }, + }, + { kind: "tool" }, + ); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message B partial" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ + context: createContext(), + streamMode: "partial", + cfg: { + channels: { + telegram: { + execApprovals: { + enabled: true, + approvers: ["12345"], + target: "dm", + }, + }, + }, + }, + }); + + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 1001, + "Message B final", + expect.any(Object), + ); + }); + it("finalizes multi-message assistant stream to matching preview messages in order", async () => { const answerDraftStream = createSequencedDraftStream(1001); const reasoningDraftStream = createDraftStream(); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index bbe0e4a0bc1..29937de7df4 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -280,6 +280,10 @@ export const dispatchTelegramMessage = async ({ const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; let skipNextAnswerMessageStartRotation = false; + // If compaction interrupts a still-transient answer preview, keep the next + // assistant-message boundary on that same preview instead of materializing a + // duplicate retry message. + let pendingCompactionReplayBoundary = false; let draftLaneEventQueue = Promise.resolve(); const reasoningStepState = createTelegramReasoningStepState(); const enqueueDraftLaneEvent = (task: () => Promise): Promise => { @@ -693,6 +697,9 @@ export const dispatchTelegramMessage = async ({ } } if (segments.length > 0) { + if (info.kind === "final") { + pendingCompactionReplayBoundary = false; + } return; } if (split.suppressedReasoningOnly) { @@ -703,6 +710,7 @@ export const dispatchTelegramMessage = async ({ } if (info.kind === "final") { await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; } return; } @@ -716,12 +724,14 @@ export const dispatchTelegramMessage = async ({ if (!canSendAsIs) { if (info.kind === "final") { await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; } return; } await sendPayload(payload); if (info.kind === "final") { await flushBufferedFinalAnswer(); + pendingCompactionReplayBoundary = false; } }, onSkip: (payload, info) => { @@ -793,6 +803,12 @@ export const dispatchTelegramMessage = async ({ retainPreviewOnCleanupByLane.answer = false; return; } + if (pendingCompactionReplayBoundary) { + pendingCompactionReplayBoundary = false; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; + return; + } await rotateAnswerLaneForNewAssistantMessage(); // Message-start is an explicit assistant-message boundary. // Even when no forceNewMessage happened (e.g. prior answer had no @@ -817,9 +833,20 @@ export const dispatchTelegramMessage = async ({ } } : undefined, - onCompactionStart: statusReactionController - ? () => statusReactionController.setCompacting() - : undefined, + onCompactionStart: + statusReactionController || answerLane.stream + ? async () => { + if ( + answerLane.hasStreamedMessage && + activePreviewLifecycleByLane.answer === "transient" + ) { + pendingCompactionReplayBoundary = true; + } + if (statusReactionController) { + await statusReactionController.setCompacting(); + } + } + : undefined, onCompactionEnd: statusReactionController ? async () => { statusReactionController.cancelPending();