From 70eea0235fdc317802d4232bd8290b6216beb613 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Mon, 9 Mar 2026 16:16:12 +0530 Subject: [PATCH] fix: unify telegram streaming answer delivery --- CHANGELOG.md | 1 + src/telegram/bot-message-dispatch.test.ts | 512 ++----------------- src/telegram/bot-message-dispatch.ts | 230 ++++----- src/telegram/lane-delivery-text-deliverer.ts | 108 +--- src/telegram/lane-delivery.test.ts | 43 +- src/telegram/lane-delivery.ts | 1 - src/telegram/reasoning-lane-coordinator.ts | 25 - 7 files changed, 153 insertions(+), 767 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f987feeec35..a3e2d864723 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes - macOS/LaunchAgent install: tighten LaunchAgent directory and plist permissions during install so launchd bootstrap does not fail when the target home path or generated plist inherited group/world-writable modes. +- Telegram/streaming: keep one answer preview lane per inbound turn and never send a replacement final text bubble when preview finalization edits fail, fixing duplicate split replies during streamed answers. ## 2026.3.8 diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 8972532e139..59595fe61e4 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -405,7 +405,7 @@ describe("dispatchTelegramMessage draft streaming", () => { }); it.each(["block", "partial"] as const)( - "forces new message when assistant message restarts (%s mode)", + "keeps one answer preview when assistant message restarts (%s mode)", async (streamMode) => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); @@ -419,146 +419,22 @@ describe("dispatchTelegramMessage draft streaming", () => { }, ); deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); await dispatchWithContext({ context: createContext(), streamMode }); - expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1); + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "First response After tool call", + expect.any(Object), + ); }, ); - it("materializes boundary preview and keeps it when no matching final arrives", async () => { - const answerDraftStream = createDraftStream(999); - answerDraftStream.materialize.mockResolvedValue(4321); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Before tool boundary" }); - await replyOptions?.onAssistantMessageStart?.(); - return { queuedFinal: false }; - }); - - const bot = createBot(); - await dispatchWithContext({ context: createContext(), streamMode: "partial", bot }); - - expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); - const deleteMessageCalls = ( - bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } } - ).deleteMessage.mock.calls; - expect(deleteMessageCalls).not.toContainEqual([123, 4321]); - }); - - it("waits for queued boundary rotation before final lane delivery", async () => { - const answerDraftStream = createSequencedDraftStream(1001); - let resolveMaterialize: ((value: number | undefined) => void) | undefined; - const materializePromise = new Promise((resolve) => { - resolveMaterialize = resolve; - }); - answerDraftStream.materialize.mockImplementation(() => materializePromise); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Message A partial" }); - await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); - const startPromise = replyOptions?.onAssistantMessageStart?.(); - const finalPromise = dispatcherOptions.deliver( - { text: "Message B final" }, - { kind: "final" }, - ); - resolveMaterialize?.(1001); - await startPromise; - await finalPromise; - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); - - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(editMessageTelegram).toHaveBeenCalledTimes(2); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 1002, - "Message B final", - expect.any(Object), - ); - }); - - it("clears active preview even when an unrelated boundary archive exists", async () => { - const answerDraftStream = createDraftStream(999); - answerDraftStream.materialize.mockResolvedValue(4321); - answerDraftStream.forceNewMessage.mockImplementation(() => { - answerDraftStream.setMessageId(5555); - }); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Before tool boundary" }); - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Unfinalized next preview" }); - return { queuedFinal: false }; - }); - - const bot = createBot(); - await dispatchWithContext({ context: createContext(), streamMode: "partial", bot }); - - expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); - const deleteMessageCalls = ( - bot.api as unknown as { deleteMessage: { mock: { calls: unknown[][] } } } - ).deleteMessage.mock.calls; - expect(deleteMessageCalls).not.toContainEqual([123, 4321]); - }); - - it("queues late partials behind async boundary materialization", async () => { - const answerDraftStream = createDraftStream(999); - let resolveMaterialize: ((value: number | undefined) => void) | undefined; - const materializePromise = new Promise((resolve) => { - resolveMaterialize = resolve; - }); - answerDraftStream.materialize.mockImplementation(() => materializePromise); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Message A partial" }); - - // Simulate provider fire-and-forget ordering: boundary callback starts - // and a new partial arrives before boundary materialization resolves. - const startPromise = replyOptions?.onAssistantMessageStart?.(); - const nextPartialPromise = replyOptions?.onPartialReply?.({ text: "Message B early" }); - - expect(answerDraftStream.update).toHaveBeenCalledTimes(1); - resolveMaterialize?.(4321); - - await startPromise; - await nextPartialPromise; - return { queuedFinal: false }; - }); - - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - - expect(answerDraftStream.materialize).toHaveBeenCalledTimes(1); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(answerDraftStream.update).toHaveBeenCalledTimes(2); - expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early"); - const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; - const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1]; - expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder); - }); - - it("keeps final-only preview lane finalized until a real boundary rotation happens", async () => { + it("keeps one answer lane when a later partial arrives before assistant message start", async () => { const answerDraftStream = createSequencedDraftStream(1001); const reasoningDraftStream = createDraftStream(); createTelegramDraftStream @@ -566,9 +442,7 @@ describe("dispatchTelegramMessage draft streaming", () => { .mockImplementationOnce(() => reasoningDraftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - // Final-only first response (no streamed partials). await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); - // Simulate provider ordering bug: first chunk arrives before message-start callback. await replyOptions?.onPartialReply?.({ text: "Message B early" }); await replyOptions?.onAssistantMessageStart?.(); await replyOptions?.onPartialReply?.({ text: "Message B partial" }); @@ -581,173 +455,22 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 1, - 123, - 1001, - "Message A final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 1002, - "Message B final", - expect.any(Object), - ); - }); - - it("does not force new message on first assistant message start", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - // First assistant message starts (no previous output) - await replyOptions?.onAssistantMessageStart?.(); - // Partial updates - await replyOptions?.onPartialReply?.({ text: "Hello" }); - await replyOptions?.onPartialReply?.({ text: "Hello world" }); - await dispatcherOptions.deliver({ text: "Hello world" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - - await dispatchWithContext({ context: createContext(), streamMode: "block" }); - - // First message start shouldn't trigger forceNewMessage (no previous output) - expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); - }); - - it("rotates before a late second-message partial so finalized preview is not overwritten", async () => { - const answerDraftStream = createSequencedDraftStream(1001); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Message A partial" }); - await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); - // Simulate provider ordering bug: first chunk arrives before message-start callback. - await replyOptions?.onPartialReply?.({ text: "Message B early" }); - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Message B partial" }); - await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); - - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B early"); - const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; - const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1]; - expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 1, - 123, - 1001, - "Message A final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 1002, - "Message B final", - expect.any(Object), - ); - }); - - it("does not skip message-start rotation when pre-rotation did not force a new message", async () => { - const answerDraftStream = createSequencedDraftStream(1002); - answerDraftStream.setMessageId(1001); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - // First message has only final text (no streamed partials), so answer lane - // reaches finalized state with hasStreamedMessage still false. - await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); - // Provider ordering bug: next message partial arrives before message-start. - await replyOptions?.onPartialReply?.({ text: "Message B early" }); - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Message B partial" }); - await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); - const bot = createBot(); - - await dispatchWithContext({ context: createContext(), streamMode: "partial", bot }); - - // Early pre-rotation could not force (no streamed partials yet), so the - // real assistant message_start must still rotate once. - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B early"); - expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B partial"); - const earlyUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0]; - const boundaryRotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; - const secondUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1]; - expect(earlyUpdateOrder).toBeLessThan(boundaryRotationOrder); - expect(boundaryRotationOrder).toBeLessThan(secondUpdateOrder); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 1, - 123, - 1001, - "Message A final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 1002, - "Message B final", - expect.any(Object), - ); - expect((bot.api.deleteMessage as ReturnType).mock.calls).toHaveLength(0); - }); - - it("does not trigger late pre-rotation mid-message after an explicit assistant message start", async () => { - const answerDraftStream = createDraftStream(1001); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - // Message A finalizes without streamed partials. - await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); - // Message B starts normally before partials. - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Message B first chunk" }); - await replyOptions?.onPartialReply?.({ text: "Message B second chunk" }); - await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); - - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - - // The explicit message_start boundary must clear finalized state so - // same-message partials do not force a new preview mid-stream. expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); - expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message B first chunk"); - expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Message B second chunk"); + expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Message A final Message B early"); + expect(answerDraftStream.update).toHaveBeenNthCalledWith( + 2, + "Message A final Message B partial", + ); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 1001, + "Message A final Message B final", + expect.any(Object), + ); }); - it("finalizes multi-message assistant stream to matching preview messages in order", async () => { + it("collapses multi-message assistant finals into one final edit", async () => { const answerDraftStream = createSequencedDraftStream(1001); const reasoningDraftStream = createDraftStream(); createTelegramDraftStream @@ -760,7 +483,6 @@ describe("dispatchTelegramMessage draft streaming", () => { await replyOptions?.onPartialReply?.({ text: "Message B partial" }); await replyOptions?.onAssistantMessageStart?.(); await replyOptions?.onPartialReply?.({ text: "Message C partial" }); - await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); await dispatcherOptions.deliver({ text: "Message C final" }, { kind: "final" }); @@ -772,111 +494,20 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 1, - 123, - 1001, - "Message A final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 1002, - "Message B final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 3, - 123, - 1003, - "Message C final", - expect.any(Object), - ); - expect(deliverReplies).not.toHaveBeenCalled(); - }); - - it("maps finals correctly when first preview id resolves after message boundary", async () => { - let answerMessageId: number | undefined; - let answerDraftParams: - | { - onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void; - } - | undefined; - const answerDraftStream = { - update: vi.fn().mockImplementation((text: string) => { - if (text.includes("Message B")) { - answerMessageId = 1002; - } - }), - flush: vi.fn().mockResolvedValue(undefined), - messageId: vi.fn().mockImplementation(() => answerMessageId), - clear: vi.fn().mockResolvedValue(undefined), - stop: vi.fn().mockResolvedValue(undefined), - forceNewMessage: vi.fn().mockImplementation(() => { - answerMessageId = undefined; - }), - }; - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce((params) => { - answerDraftParams = params as typeof answerDraftParams; - return answerDraftStream; - }) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Message A partial" }); - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Message B partial" }); - // Simulate late resolution of message A preview ID after boundary rotation. - answerDraftParams?.onSupersededPreview?.({ - messageId: 1001, - textSnapshot: "Message A partial", - }); - - await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); - await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); - - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 1, - 123, - 1001, - "Message A final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 1002, - "Message B final", - expect.any(Object), - ); + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram.mock.calls[0]?.[0]).toBe(123); + expect(editMessageTelegram.mock.calls[0]?.[1]).toBe(1001); + expect(editMessageTelegram.mock.calls[0]?.[2]).toContain("Message A final"); + expect(editMessageTelegram.mock.calls[0]?.[2]).toContain("Message B final"); + expect(editMessageTelegram.mock.calls[0]?.[2]).toContain("Message C final"); expect(deliverReplies).not.toHaveBeenCalled(); }); it.each(["partial", "block"] as const)( "keeps finalized text preview when the next assistant message is media-only (%s mode)", async (streamMode) => { - let answerMessageId: number | undefined = 1001; - const answerDraftStream = { - update: vi.fn(), - flush: vi.fn().mockResolvedValue(undefined), - messageId: vi.fn().mockImplementation(() => answerMessageId), - clear: vi.fn().mockResolvedValue(undefined), - stop: vi.fn().mockResolvedValue(undefined), - forceNewMessage: vi.fn().mockImplementation(() => { - answerMessageId = undefined; - }), - }; + const answerDraftStream = createDraftStream(1001); const reasoningDraftStream = createDraftStream(); createTelegramDraftStream .mockImplementationOnce(() => answerDraftStream) @@ -896,6 +527,7 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode, bot }); + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); expect(editMessageTelegram).toHaveBeenCalledWith( 123, 1001, @@ -909,75 +541,6 @@ describe("dispatchTelegramMessage draft streaming", () => { }, ); - it("maps finals correctly when archived preview id arrives during final flush", async () => { - let answerMessageId: number | undefined; - let answerDraftParams: - | { - onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void; - } - | undefined; - let emittedSupersededPreview = false; - const answerDraftStream = { - update: vi.fn().mockImplementation((text: string) => { - if (text.includes("Message B")) { - answerMessageId = 1002; - } - }), - flush: vi.fn().mockImplementation(async () => { - if (!emittedSupersededPreview) { - emittedSupersededPreview = true; - answerDraftParams?.onSupersededPreview?.({ - messageId: 1001, - textSnapshot: "Message A partial", - }); - } - }), - messageId: vi.fn().mockImplementation(() => answerMessageId), - clear: vi.fn().mockResolvedValue(undefined), - stop: vi.fn().mockResolvedValue(undefined), - forceNewMessage: vi.fn().mockImplementation(() => { - answerMessageId = undefined; - }), - }; - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce((params) => { - answerDraftParams = params as typeof answerDraftParams; - return answerDraftStream; - }) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Message A partial" }); - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Message B partial" }); - await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); - await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); - - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 1, - 123, - 1001, - "Message A final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 1002, - "Message B final", - expect.any(Object), - ); - expect(deliverReplies).not.toHaveBeenCalled(); - }); - it.each(["block", "partial"] as const)( "splits reasoning lane only when a later reasoning block starts (%s mode)", async (streamMode) => { @@ -1349,9 +912,8 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createReasoningStreamContext(), streamMode: "partial" }); - expect(editMessageTelegram).toHaveBeenNthCalledWith(1, 123, 999, "3", expect.any(Object)); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, + expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "3", expect.any(Object)); + expect(editMessageTelegram).toHaveBeenCalledWith( 123, 111, "Reasoning:\nIf I count r in strawberry, I see positions 3, 8, and 9. So the total is 3.", @@ -1712,7 +1274,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ replies: [ expect.objectContaining({ - text: expect.stringContaining("No response"), + text: "Something went wrong while processing your request. Please try again.", }), ], }), @@ -1741,7 +1303,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ replies: [ expect.objectContaining({ - text: expect.stringContaining("No response"), + text: "Something went wrong while processing your request. Please try again.", }), ], }), diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 63e7b6e8e8f..03b9acd8c2b 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -32,7 +32,6 @@ import type { TelegramInlineButtons } from "./button-types.js"; import { createTelegramDraftStream } from "./draft-stream.js"; import { renderTelegramHtmlText } from "./format.js"; import { - type ArchivedPreview, createLaneDeliveryStateTracker, createLaneTextDeliverer, type DraftLaneState, @@ -49,6 +48,24 @@ const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again."; /** Minimum chars before sending first streaming message (improves push notification UX) */ const DRAFT_MIN_INITIAL_CHARS = 30; +const ANSWER_SEGMENT_NO_SPACE_BEFORE_RE = /^[,.;:!?)}\]]/; + +function appendAnswerSegment(prefix: string, segment: string): string { + if (!prefix) { + return segment; + } + if (!segment) { + return prefix; + } + if ( + /\s$/.test(prefix) || + /^\s/.test(segment) || + ANSWER_SEGMENT_NO_SPACE_BEFORE_RE.test(segment) + ) { + return `${prefix}${segment}`; + } + return `${prefix} ${segment}`; +} async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) { try { @@ -195,7 +212,6 @@ export const dispatchTelegramMessage = async ({ // a visible duplicate flash at finalize time. const useMessagePreviewTransportForDm = threadSpec?.scope === "dm" && canStreamAnswerDraft; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); - const archivedAnswerPreviews: ArchivedPreview[] = []; const archivedReasoningPreviewIds: number[] = []; const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => { const stream = enabled @@ -209,19 +225,11 @@ export const dispatchTelegramMessage = async ({ minInitialChars: draftMinInitialChars, renderText: renderDraftPreview, onSupersededPreview: - laneName === "answer" || laneName === "reasoning" + laneName === "reasoning" ? (preview) => { - if (laneName === "reasoning") { - if (!archivedReasoningPreviewIds.includes(preview.messageId)) { - archivedReasoningPreviewIds.push(preview.messageId); - } - return; + if (!archivedReasoningPreviewIds.includes(preview.messageId)) { + archivedReasoningPreviewIds.push(preview.messageId); } - archivedAnswerPreviews.push({ - messageId: preview.messageId, - textSnapshot: preview.textSnapshot, - deleteIfUnused: true, - }); } : undefined, log: logVerbose, @@ -245,7 +253,14 @@ export const dispatchTelegramMessage = async ({ const answerLane = lanes.answer; const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; - let skipNextAnswerMessageStartRotation = false; + let answerSegmentPrefixText = ""; + let pendingAnswerFinalSlots = 1; + let bufferedAnswerFinal: + | { + payload: ReplyPayload; + text: string; + } + | undefined; let draftLaneEventQueue = Promise.resolve(); const reasoningStepState = createTelegramReasoningStepState(); const enqueueDraftLaneEvent = (task: () => Promise): Promise => { @@ -276,34 +291,20 @@ export const dispatchTelegramMessage = async ({ Boolean(split.reasoningText) && suppressReasoning && !split.answerText, }; }; + const getCurrentAnswerText = () => bufferedAnswerFinal?.text ?? answerLane.lastPartialText; + const composeAnswerSegmentText = (text: string) => + appendAnswerSegment(answerSegmentPrefixText, text); + const rememberAnswerBoundary = () => { + answerSegmentPrefixText = getCurrentAnswerText(); + }; + const bufferAnswerFinal = (payload: ReplyPayload, text: string) => { + bufferedAnswerFinal = { payload, text }; + answerSegmentPrefixText = text; + }; const resetDraftLaneState = (lane: DraftLaneState) => { lane.lastPartialText = ""; lane.hasStreamedMessage = false; }; - const rotateAnswerLaneForNewAssistantMessage = async () => { - let didForceNewMessage = false; - if (answerLane.hasStreamedMessage) { - // Materialize the current streamed draft into a permanent message - // so it remains visible across tool boundaries. - const materializedId = await answerLane.stream?.materialize?.(); - const previewMessageId = materializedId ?? answerLane.stream?.messageId(); - if (typeof previewMessageId === "number" && !finalizedPreviewByLane.answer) { - archivedAnswerPreviews.push({ - messageId: previewMessageId, - textSnapshot: answerLane.lastPartialText, - deleteIfUnused: false, - }); - } - answerLane.stream?.forceNewMessage(); - didForceNewMessage = true; - } - resetDraftLaneState(answerLane); - if (didForceNewMessage) { - // New assistant message boundary: this lane now tracks a fresh preview lifecycle. - finalizedPreviewByLane.answer = false; - } - return didForceNewMessage; - }; const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => { const laneStream = lane.stream; if (!laneStream || !text) { @@ -329,19 +330,14 @@ export const dispatchTelegramMessage = async ({ }; const ingestDraftLaneSegments = async (text: string | undefined) => { const split = splitTextIntoLaneSegments(text); - const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer"); - if (hasAnswerSegment && finalizedPreviewByLane.answer) { - // Some providers can emit the first partial of a new assistant message before - // onAssistantMessageStart() arrives. Rotate preemptively so we do not edit - // the previously finalized preview message with the next message's text. - skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage(); - } for (const segment of split.segments) { if (segment.lane === "reasoning") { reasoningStepState.noteReasoningHint(); reasoningStepState.noteReasoningDelivered(); + updateDraftFromPartial(lanes.reasoning, segment.text); + continue; } - updateDraftFromPartial(lanes[segment.lane], segment.text); + updateDraftFromPartial(lanes.answer, composeAnswerSegmentText(segment.text)); } }; const flushDraftLane = async (lane: DraftLaneState) => { @@ -464,7 +460,6 @@ export const dispatchTelegramMessage = async ({ }; const deliverLaneText = createLaneTextDeliverer({ lanes, - archivedAnswerPreviews, finalizedPreviewByLane, draftMaxChars, applyTextToPayload, @@ -482,14 +477,29 @@ export const dispatchTelegramMessage = async ({ buttons: previewButtons, }); }, - deletePreviewMessage: async (messageId) => { - await bot.api.deleteMessage(chatId, messageId); - }, log: logVerbose, markDelivered: () => { deliveryState.markDelivered(); }, }); + const flushBufferedAnswerFinal = async () => { + if (!bufferedAnswerFinal) { + return; + } + const { payload, text } = bufferedAnswerFinal; + bufferedAnswerFinal = undefined; + const previewButtons = ( + payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined + )?.buttons; + await deliverLaneText({ + laneName: "answer", + text, + payload, + infoKind: "final", + previewButtons, + }); + reasoningStepState.resetForNextStep(); + }; let queuedFinal = false; @@ -530,59 +540,39 @@ export const dispatchTelegramMessage = async ({ const segments = split.segments; const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const flushBufferedFinalAnswer = async () => { - const buffered = reasoningStepState.takeBufferedFinalAnswer(); - if (!buffered) { - return; - } - const bufferedButtons = ( - buffered.payload.channelData?.telegram as - | { buttons?: TelegramInlineButtons } - | undefined - )?.buttons; - await deliverLaneText({ - laneName: "answer", - text: buffered.text, - payload: buffered.payload, - infoKind: "final", - previewButtons: bufferedButtons, - }); - reasoningStepState.resetForNextStep(); - }; - for (const segment of segments) { - if ( - segment.lane === "answer" && - info.kind === "final" && - reasoningStepState.shouldBufferFinalAnswer() - ) { - reasoningStepState.bufferFinalAnswer({ payload, text: segment.text }); - continue; - } if (segment.lane === "reasoning") { reasoningStepState.noteReasoningHint(); + const result = await deliverLaneText({ + laneName: "reasoning", + text: segment.text, + payload, + infoKind: info.kind, + previewButtons, + allowPreviewUpdateForNonFinal: true, + }); + if (result !== "skipped") { + reasoningStepState.noteReasoningDelivered(); + } + continue; } - const result = await deliverLaneText({ - laneName: segment.lane, - text: segment.text, + const answerText = composeAnswerSegmentText(segment.text); + if (info.kind === "final") { + if (pendingAnswerFinalSlots <= 0) { + await sendPayload(payload); + continue; + } + pendingAnswerFinalSlots -= 1; + bufferAnswerFinal(payload, answerText); + continue; + } + await deliverLaneText({ + laneName: "answer", + text: answerText, payload, infoKind: info.kind, previewButtons, - allowPreviewUpdateForNonFinal: segment.lane === "reasoning", }); - if (segment.lane === "reasoning") { - if (result !== "skipped") { - reasoningStepState.noteReasoningDelivered(); - await flushBufferedFinalAnswer(); - } - continue; - } - if (info.kind === "final") { - if (reasoningLane.hasStreamedMessage) { - finalizedPreviewByLane.reasoning = true; - } - reasoningStepState.resetForNextStep(); - } } if (segments.length > 0) { return; @@ -593,9 +583,6 @@ export const dispatchTelegramMessage = async ({ typeof payload.text === "string" ? { ...payload, text: "" } : payload; await sendPayload(payloadWithoutSuppressedReasoning); } - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - } return; } @@ -607,15 +594,9 @@ export const dispatchTelegramMessage = async ({ const canSendAsIs = hasMedia || (typeof payload.text === "string" && payload.text.length > 0); if (!canSendAsIs) { - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - } return; } await sendPayload(payload); - if (info.kind === "final") { - await flushBufferedFinalAnswer(); - } }, onSkip: (_payload, info) => { if (info.reason !== "silent") { @@ -655,17 +636,10 @@ export const dispatchTelegramMessage = async ({ ? () => enqueueDraftLaneEvent(async () => { reasoningStepState.resetForNextStep(); - if (skipNextAnswerMessageStartRotation) { - skipNextAnswerMessageStartRotation = false; - finalizedPreviewByLane.answer = false; - return; + if (getCurrentAnswerText()) { + pendingAnswerFinalSlots += 1; + rememberAnswerBoundary(); } - await rotateAnswerLaneForNewAssistantMessage(); - // Message-start is an explicit assistant-message boundary. - // Even when no forceNewMessage happened (e.g. prior answer had no - // streamed partials), the next partial belongs to a fresh lifecycle - // and must not trigger late pre-rotation mid-message. - finalizedPreviewByLane.answer = false; }) : undefined, onReasoningEnd: reasoningLane.stream @@ -683,6 +657,10 @@ export const dispatchTelegramMessage = async ({ onModelSelected, }, })); + await flushBufferedAnswerFinal(); + if (reasoningLane.hasStreamedMessage) { + finalizedPreviewByLane.reasoning = true; + } } catch (err) { dispatchError = err; runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`)); @@ -704,17 +682,7 @@ export const dispatchTelegramMessage = async ({ if (!stream) { continue; } - // Don't clear (delete) the stream if: (a) it was finalized, or - // (b) the active stream message is itself a boundary-finalized archive. - const activePreviewMessageId = stream.messageId(); - const hasBoundaryFinalizedActivePreview = - laneState.laneName === "answer" && - typeof activePreviewMessageId === "number" && - archivedAnswerPreviews.some( - (p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId, - ); - const shouldClear = - !finalizedPreviewByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview; + const shouldClear = !finalizedPreviewByLane[laneState.laneName]; const existing = streamCleanupStates.get(stream); if (!existing) { streamCleanupStates.set(stream, { shouldClear }); @@ -728,18 +696,6 @@ export const dispatchTelegramMessage = async ({ await stream.clear(); } } - for (const archivedPreview of archivedAnswerPreviews) { - if (archivedPreview.deleteIfUnused === false) { - continue; - } - try { - await bot.api.deleteMessage(chatId, archivedPreview.messageId); - } catch (err) { - logVerbose( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } - } for (const messageId of archivedReasoningPreviewIds) { try { await bot.api.deleteMessage(chatId, messageId); diff --git a/src/telegram/lane-delivery-text-deliverer.ts b/src/telegram/lane-delivery-text-deliverer.ts index f244d086657..5e80432d6de 100644 --- a/src/telegram/lane-delivery-text-deliverer.ts +++ b/src/telegram/lane-delivery-text-deliverer.ts @@ -27,19 +27,10 @@ export type DraftLaneState = { hasStreamedMessage: boolean; }; -export type ArchivedPreview = { - messageId: number; - textSnapshot: string; - // Boundary-finalized previews should remain visible even if no matching - // final edit arrives; superseded previews can be safely deleted. - deleteIfUnused?: boolean; -}; - export type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped"; type CreateLaneTextDelivererParams = { lanes: Record; - archivedAnswerPreviews: ArchivedPreview[]; finalizedPreviewByLane: Record; draftMaxChars: number; applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload; @@ -53,7 +44,6 @@ type CreateLaneTextDelivererParams = { context: "final" | "update"; previewButtons?: TelegramInlineButtons; }) => Promise; - deletePreviewMessage: (messageId: number) => Promise; log: (message: string) => void; markDelivered: () => void; }; @@ -80,14 +70,6 @@ type TryUpdatePreviewParams = { previewTextSnapshot?: string; }; -type ConsumeArchivedAnswerPreviewParams = { - lane: DraftLaneState; - text: string; - payload: ReplyPayload; - previewButtons?: TelegramInlineButtons; - canEditViaPreview: boolean; -}; - type PreviewUpdateContext = "final" | "update"; type RegressiveSkipMode = "always" | "existingOnly"; @@ -140,6 +122,10 @@ function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTarget export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText; const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft"; + const markLaneDelivered = (lane: DraftLaneState, text: string) => { + lane.lastPartialText = text; + params.markDelivered(); + }; const canMaterializeDraftFinal = ( lane: DraftLaneState, previewButtons?: TelegramInlineButtons, @@ -171,8 +157,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { ); return false; } - args.lane.lastPartialText = args.text; - params.markDelivered(); + markLaneDelivered(args.lane, args.text); return true; }; @@ -194,13 +179,16 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { previewButtons: args.previewButtons, context: args.context, }); - if (args.updateLaneSnapshot) { + if (args.updateLaneSnapshot || args.context === "final") { args.lane.lastPartialText = args.text; } params.markDelivered(); return true; } catch (err) { if (isMessageNotModifiedError(err)) { + if (args.context === "final") { + args.lane.lastPartialText = args.text; + } params.log( `telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`, ); @@ -208,8 +196,11 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { return true; } if (args.treatEditFailureAsDelivered) { + if (args.context === "final") { + args.lane.lastPartialText = args.text; + } params.log( - `telegram: ${args.laneName} preview ${args.context} edit failed after stop-created flush; treating as delivered (${String(err)})`, + `telegram: ${args.laneName} preview ${args.context} edit failed; keeping existing preview (${String(err)})`, ); params.markDelivered(); return true; @@ -300,55 +291,11 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } return finalizePreview( previewTargetAfterStop.previewMessageId, - false, + context === "final", previewTargetAfterStop.hadPreviewMessage, ); }; - const consumeArchivedAnswerPreviewForFinal = async ({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }: ConsumeArchivedAnswerPreviewParams): Promise => { - const archivedPreview = params.archivedAnswerPreviews.shift(); - if (!archivedPreview) { - return undefined; - } - if (canEditViaPreview) { - const finalized = await tryUpdatePreviewForLane({ - lane, - laneName: "answer", - text, - previewButtons, - stopBeforeEdit: false, - skipRegressive: "existingOnly", - context: "final", - previewMessageId: archivedPreview.messageId, - previewTextSnapshot: archivedPreview.textSnapshot, - }); - if (finalized) { - return "preview-finalized"; - } - } - // Send the replacement message first, then clean up the old preview. - // This avoids the visual "disappear then reappear" flash. - const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); - // Once this archived preview is consumed by a fallback final send, delete it - // regardless of deleteIfUnused. That flag only applies to unconsumed boundaries. - if (delivered || archivedPreview.deleteIfUnused !== false) { - try { - await params.deletePreviewMessage(archivedPreview.messageId); - } catch (err) { - params.log( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } - } - return delivered ? "sent" : "skipped"; - }; - return async ({ laneName, text, @@ -363,32 +310,8 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { !hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError; if (infoKind === "final") { - if (laneName === "answer") { - const archivedResult = await consumeArchivedAnswerPreviewForFinal({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }); - if (archivedResult) { - return archivedResult; - } - } if (canEditViaPreview && !params.finalizedPreviewByLane[laneName]) { await params.flushDraftLane(lane); - if (laneName === "answer") { - const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }); - if (archivedResultAfterFlush) { - return archivedResultAfterFlush; - } - } if (canMaterializeDraftFinal(lane, previewButtons)) { const materialized = await tryMaterializeDraftPreviewForFinal({ lane, @@ -420,6 +343,9 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } await params.stopDraftLane(lane); const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + if (delivered) { + lane.lastPartialText = text; + } return delivered ? "sent" : "skipped"; } diff --git a/src/telegram/lane-delivery.test.ts b/src/telegram/lane-delivery.test.ts index 1cd1d36cf4c..41698ef00e1 100644 --- a/src/telegram/lane-delivery.test.ts +++ b/src/telegram/lane-delivery.test.ts @@ -39,19 +39,12 @@ function createHarness(params?: { await lane.stream?.stop(); }); const editPreview = vi.fn().mockResolvedValue(undefined); - const deletePreviewMessage = vi.fn().mockResolvedValue(undefined); const log = vi.fn(); const markDelivered = vi.fn(); const finalizedPreviewByLane: Record = { answer: false, reasoning: false }; - const archivedAnswerPreviews: Array<{ - messageId: number; - textSnapshot: string; - deleteIfUnused?: boolean; - }> = []; const deliverLaneText = createLaneTextDeliverer({ lanes, - archivedAnswerPreviews, finalizedPreviewByLane, draftMaxChars: params?.draftMaxChars ?? 4_096, applyTextToPayload: (payload: ReplyPayload, text: string) => ({ ...payload, text }), @@ -59,7 +52,6 @@ function createHarness(params?: { flushDraftLane, stopDraftLane, editPreview, - deletePreviewMessage, log, markDelivered, }); @@ -75,10 +67,8 @@ function createHarness(params?: { flushDraftLane, stopDraftLane, editPreview, - deletePreviewMessage, log, markDelivered, - archivedAnswerPreviews, }; } @@ -143,7 +133,7 @@ describe("createLaneTextDeliverer", () => { expect(result).toBe("preview-finalized"); expect(harness.editPreview).toHaveBeenCalledTimes(1); expect(harness.sendPayload).not.toHaveBeenCalled(); - expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("treating as delivered")); + expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("keeping existing preview")); }); it("treats 'message is not modified' preview edit errors as delivered", async () => { @@ -170,7 +160,7 @@ describe("createLaneTextDeliverer", () => { ); }); - it("falls back to normal delivery when editing an existing preview fails", async () => { + it("keeps existing preview when editing an existing preview fails", async () => { const harness = createHarness({ answerMessageId: 999 }); harness.editPreview.mockRejectedValue(new Error("500: preview edit failed")); @@ -181,11 +171,10 @@ describe("createLaneTextDeliverer", () => { infoKind: "final", }); - expect(result).toBe("sent"); + expect(result).toBe("preview-finalized"); expect(harness.editPreview).toHaveBeenCalledTimes(1); - expect(harness.sendPayload).toHaveBeenCalledWith( - expect.objectContaining({ text: "Hello final" }), - ); + expect(harness.sendPayload).not.toHaveBeenCalled(); + expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("keeping existing preview")); }); it("falls back to normal delivery when stop-created preview has no message id", async () => { @@ -361,26 +350,4 @@ describe("createLaneTextDeliverer", () => { ); expect(harness.markDelivered).not.toHaveBeenCalled(); }); - - it("deletes consumed boundary previews after fallback final send", async () => { - const harness = createHarness(); - harness.archivedAnswerPreviews.push({ - messageId: 4444, - textSnapshot: "Boundary preview", - deleteIfUnused: false, - }); - - const result = await harness.deliverLaneText({ - laneName: "answer", - text: "Final with media", - payload: { text: "Final with media", mediaUrl: "file:///tmp/example.png" }, - infoKind: "final", - }); - - expect(result).toBe("sent"); - expect(harness.sendPayload).toHaveBeenCalledWith( - expect.objectContaining({ text: "Final with media", mediaUrl: "file:///tmp/example.png" }), - ); - expect(harness.deletePreviewMessage).toHaveBeenCalledWith(4444); - }); }); diff --git a/src/telegram/lane-delivery.ts b/src/telegram/lane-delivery.ts index 213b05e1158..c7b558de7a6 100644 --- a/src/telegram/lane-delivery.ts +++ b/src/telegram/lane-delivery.ts @@ -1,5 +1,4 @@ export { - type ArchivedPreview, createLaneTextDeliverer, type DraftLaneState, type LaneDeliveryResult, diff --git a/src/telegram/reasoning-lane-coordinator.ts b/src/telegram/reasoning-lane-coordinator.ts index a0207a39c72..90fad99e54d 100644 --- a/src/telegram/reasoning-lane-coordinator.ts +++ b/src/telegram/reasoning-lane-coordinator.ts @@ -1,5 +1,4 @@ import { formatReasoningMessage } from "../agents/pi-embedded-utils.js"; -import type { ReplyPayload } from "../auto-reply/types.js"; import { findCodeRegions, isInsideCode } from "../shared/text/code-regions.js"; import { stripReasoningTagsFromText } from "../shared/text/reasoning-tags.js"; @@ -87,14 +86,8 @@ export function splitTelegramReasoningText(text?: string): TelegramReasoningSpli return { reasoningText, answerText }; } -export type BufferedFinalAnswer = { - payload: ReplyPayload; - text: string; -}; - export function createTelegramReasoningStepState() { let reasoningStatus: "none" | "hinted" | "delivered" = "none"; - let bufferedFinalAnswer: BufferedFinalAnswer | undefined; const noteReasoningHint = () => { if (reasoningStatus === "none") { @@ -106,31 +99,13 @@ export function createTelegramReasoningStepState() { reasoningStatus = "delivered"; }; - const shouldBufferFinalAnswer = () => { - return reasoningStatus === "hinted" && !bufferedFinalAnswer; - }; - - const bufferFinalAnswer = (value: BufferedFinalAnswer) => { - bufferedFinalAnswer = value; - }; - - const takeBufferedFinalAnswer = (): BufferedFinalAnswer | undefined => { - const value = bufferedFinalAnswer; - bufferedFinalAnswer = undefined; - return value; - }; - const resetForNextStep = () => { reasoningStatus = "none"; - bufferedFinalAnswer = undefined; }; return { noteReasoningHint, noteReasoningDelivered, - shouldBufferFinalAnswer, - bufferFinalAnswer, - takeBufferedFinalAnswer, resetForNextStep, }; }