diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 583024ae5c6..4f5e2484d50 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -963,6 +963,74 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(deliverReplies).not.toHaveBeenCalled(); }); + it("still finalizes the active preview after an archived final edit is retained", async () => { + let answerMessageId: number | undefined; + let answerDraftParams: + | { + onSupersededPreview?: (preview: { messageId: number; textSnapshot: string }) => void; + } + | undefined; + const answerDraftStream = { + update: vi.fn().mockImplementation((text: string) => { + if (text.includes("Message B")) { + answerMessageId = 1002; + } + }), + flush: vi.fn().mockResolvedValue(undefined), + messageId: vi.fn().mockImplementation(() => answerMessageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn().mockImplementation(() => { + answerMessageId = undefined; + }), + }; + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce((params) => { + answerDraftParams = params as typeof answerDraftParams; + return answerDraftStream; + }) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message B partial" }); + answerDraftParams?.onSupersededPreview?.({ + messageId: 1001, + textSnapshot: "Message A partial", + }); + + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram + .mockRejectedValueOnce(new Error("400: Bad Request: message to edit not found")) + .mockResolvedValueOnce({ ok: true, chatId: "123", messageId: "1002" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + expect(answerDraftStream.clear).not.toHaveBeenCalled(); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + it.each(["partial", "block"] as const)( "keeps finalized text preview when the next assistant message is media-only (%s mode)", async (streamMode) => { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index af70bbd9a3f..4d8d2b678e8 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -38,7 +38,7 @@ import { createLaneTextDeliverer, type DraftLaneState, type LaneName, - type LanePreviewDisposition, + type LanePreviewLifecycle, } from "./lane-delivery.js"; import { createTelegramReasoningStepState, @@ -240,10 +240,17 @@ export const dispatchTelegramMessage = async ({ answer: createDraftLane("answer", canStreamAnswerDraft), reasoning: createDraftLane("reasoning", canStreamReasoningDraft), }; - const previewDispositionByLane: Record = { + // Active preview lifecycle answers "can this current preview still be + // finalized?" Cleanup retention is separate so archived-preview decisions do + // not poison the active lane. + const activePreviewLifecycleByLane: Record = { answer: "transient", reasoning: "transient", }; + const retainPreviewOnCleanupByLane: Record = { + answer: false, + reasoning: false, + }; const answerLane = lanes.answer; const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; @@ -289,7 +296,10 @@ export const dispatchTelegramMessage = async ({ // so it remains visible across tool boundaries. const materializedId = await answerLane.stream?.materialize?.(); const previewMessageId = materializedId ?? answerLane.stream?.messageId(); - if (typeof previewMessageId === "number" && previewDispositionByLane.answer === "transient") { + if ( + typeof previewMessageId === "number" && + activePreviewLifecycleByLane.answer === "transient" + ) { archivedAnswerPreviews.push({ messageId: previewMessageId, textSnapshot: answerLane.lastPartialText, @@ -302,7 +312,8 @@ export const dispatchTelegramMessage = async ({ resetDraftLaneState(answerLane); if (didForceNewMessage) { // New assistant message boundary: this lane now tracks a fresh preview lifecycle. - previewDispositionByLane.answer = "transient"; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; } return didForceNewMessage; }; @@ -332,7 +343,7 @@ export const dispatchTelegramMessage = async ({ const ingestDraftLaneSegments = async (text: string | undefined) => { const split = splitTextIntoLaneSegments(text); const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer"); - if (hasAnswerSegment && previewDispositionByLane.answer !== "transient") { + if (hasAnswerSegment && activePreviewLifecycleByLane.answer !== "transient") { // Some providers can emit the first partial of a new assistant message before // onAssistantMessageStart() arrives. Rotate preemptively so we do not edit // the previously finalized preview message with the next message's text. @@ -470,7 +481,8 @@ export const dispatchTelegramMessage = async ({ const deliverLaneText = createLaneTextDeliverer({ lanes, archivedAnswerPreviews, - previewDispositionByLane, + activePreviewLifecycleByLane, + retainPreviewOnCleanupByLane, draftMaxChars, applyTextToPayload, sendPayload, @@ -597,7 +609,8 @@ export const dispatchTelegramMessage = async ({ } if (info.kind === "final") { if (reasoningLane.hasStreamedMessage) { - previewDispositionByLane.reasoning = "finalized"; + activePreviewLifecycleByLane.reasoning = "complete"; + retainPreviewOnCleanupByLane.reasoning = true; } reasoningStepState.resetForNextStep(); } @@ -675,7 +688,8 @@ export const dispatchTelegramMessage = async ({ reasoningStepState.resetForNextStep(); if (skipNextAnswerMessageStartRotation) { skipNextAnswerMessageStartRotation = false; - previewDispositionByLane.answer = "transient"; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; return; } await rotateAnswerLaneForNewAssistantMessage(); @@ -683,7 +697,8 @@ export const dispatchTelegramMessage = async ({ // Even when no forceNewMessage happened (e.g. prior answer had no // streamed partials), the next partial belongs to a fresh lifecycle // and must not trigger late pre-rotation mid-message. - previewDispositionByLane.answer = "transient"; + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; }) : undefined, onReasoningEnd: reasoningLane.stream @@ -732,8 +747,7 @@ export const dispatchTelegramMessage = async ({ (p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId, ); const shouldClear = - previewDispositionByLane[laneState.laneName] === "transient" && - !hasBoundaryFinalizedActivePreview; + !retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview; const existing = streamCleanupStates.get(stream); if (!existing) { streamCleanupStates.set(stream, { shouldClear }); diff --git a/src/telegram/lane-delivery-text-deliverer.ts b/src/telegram/lane-delivery-text-deliverer.ts index 775a7f31e1f..c8eb10a9bb1 100644 --- a/src/telegram/lane-delivery-text-deliverer.ts +++ b/src/telegram/lane-delivery-text-deliverer.ts @@ -49,7 +49,7 @@ export type ArchivedPreview = { deleteIfUnused?: boolean; }; -export type LanePreviewDisposition = "transient" | "retained" | "finalized"; +export type LanePreviewLifecycle = "transient" | "complete"; export type LaneDeliveryResult = | "preview-finalized" @@ -61,7 +61,8 @@ export type LaneDeliveryResult = type CreateLaneTextDelivererParams = { lanes: Record; archivedAnswerPreviews: ArchivedPreview[]; - previewDispositionByLane: Record; + activePreviewLifecycleByLane: Record; + retainPreviewOnCleanupByLane: Record; draftMaxChars: number; applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload; sendPayload: (payload: ReplyPayload) => Promise; @@ -162,6 +163,10 @@ function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTarget export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText; + const markActivePreviewComplete = (laneName: LaneName) => { + params.activePreviewLifecycleByLane[laneName] = "complete"; + params.retainPreviewOnCleanupByLane[laneName] = true; + }; const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft"; const canMaterializeDraftFinal = ( lane: DraftLaneState, @@ -401,7 +406,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { return "preview-finalized"; } if (finalized === "retained") { - params.previewDispositionByLane.answer = "retained"; + params.retainPreviewOnCleanupByLane.answer = true; return "preview-retained"; } } @@ -448,7 +453,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { return archivedResult; } } - if (canEditViaPreview && params.previewDispositionByLane[laneName] === "transient") { + if (canEditViaPreview && params.activePreviewLifecycleByLane[laneName] === "transient") { await params.flushDraftLane(lane); if (laneName === "answer") { const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({ @@ -469,7 +474,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { text, }); if (materialized) { - params.previewDispositionByLane[laneName] = "finalized"; + markActivePreviewComplete(laneName); return "preview-finalized"; } } @@ -483,11 +488,11 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { context: "final", }); if (finalized === "edited") { - params.previewDispositionByLane[laneName] = "finalized"; + markActivePreviewComplete(laneName); return "preview-finalized"; } if (finalized === "retained") { - params.previewDispositionByLane[laneName] = "retained"; + markActivePreviewComplete(laneName); return "preview-retained"; } } else if (!hasMedia && !payload.isError && text.length > params.draftMaxChars) { diff --git a/src/telegram/lane-delivery.test.ts b/src/telegram/lane-delivery.test.ts index f33401875f8..a2dae1f05b9 100644 --- a/src/telegram/lane-delivery.test.ts +++ b/src/telegram/lane-delivery.test.ts @@ -42,7 +42,8 @@ function createHarness(params?: { const deletePreviewMessage = vi.fn().mockResolvedValue(undefined); const log = vi.fn(); const markDelivered = vi.fn(); - const previewDispositionByLane = { answer: "transient", reasoning: "transient" } as const; + const activePreviewLifecycleByLane = { answer: "transient", reasoning: "transient" } as const; + const retainPreviewOnCleanupByLane = { answer: false, reasoning: false } as const; const archivedAnswerPreviews: Array<{ messageId: number; textSnapshot: string; @@ -52,7 +53,8 @@ function createHarness(params?: { const deliverLaneText = createLaneTextDeliverer({ lanes, archivedAnswerPreviews, - previewDispositionByLane: { ...previewDispositionByLane }, + activePreviewLifecycleByLane: { ...activePreviewLifecycleByLane }, + retainPreviewOnCleanupByLane: { ...retainPreviewOnCleanupByLane }, draftMaxChars: params?.draftMaxChars ?? 4_096, applyTextToPayload: (payload: ReplyPayload, text: string) => ({ ...payload, text }), sendPayload, diff --git a/src/telegram/lane-delivery.ts b/src/telegram/lane-delivery.ts index 46c6fda235f..a9114b281ff 100644 --- a/src/telegram/lane-delivery.ts +++ b/src/telegram/lane-delivery.ts @@ -4,7 +4,7 @@ export { type DraftLaneState, type LaneDeliveryResult, type LaneName, - type LanePreviewDisposition, + type LanePreviewLifecycle, } from "./lane-delivery-text-deliverer.js"; export { createLaneDeliveryStateTracker,