From cc9c2b13a94c59e6169739251be3d75bff3e282f Mon Sep 17 00:00:00 2001 From: Peter Lindsey Date: Thu, 2 Jul 2026 20:51:51 +0800 Subject: [PATCH] fix(telegram): reposition streaming window post-first, delete-old-deferred MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Peter isolated the on-off focus-jump precisely: when a durable 🧠 posts BELOW the streaming window, the window (now above the reasoning) is repositioned to stay newest by DELETING it and reposting below. Telegram cannot move messages, so the delete-then-repost order scroll-jumps the client. Root cause: rotateAnswerLaneAfterToolProgress rewound the tool-progress window with stream.clear(), which deletes the old message IMMEDIATELY when it has been on screen past the dwell — before the replacement message is sent. Delete-first, post-second. Fix — invert the order, preserving arrival order: - draft-stream: new rotateToNewMessageDeferringDelete() rewinds the stream so the NEXT update creates a fresh message, and schedules the superseded message's delete for AFTER it (detached, floored at 1.5s so the new message lands first). Extracted the shared deferred-delete scheduler (scheduleDetachedDelete) used by both clear() and the reposition. (draft-stream.ts) - dispatch: rotateAnswerLaneAfterToolProgress now repositions via that method instead of clear()+forceNewMessage, so no window reposition deletes before the replacement lands. (bot-message-dispatch.ts:1259) Tests: draft-stream unit tests prove the sequencing (new message sent before the old is deleted; delete deferred; no-op with no live message). Added a dispatch repro (durable 🧠 then answer text mid-turn -> reposition, no clear). Updated the predating tool-progress-rotation tests to assert the deferred-delete reposition and that any deliverer-cleanup clear() runs only AFTER the rewind. 228 green across bot-message-dispatch, draft-stream, progress-summary; extensions/telegram typechecks clean. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../telegram/src/bot-message-dispatch.test.ts | 120 +++++++++++++++--- .../telegram/src/bot-message-dispatch.ts | 11 +- .../telegram/src/draft-stream.test-helpers.ts | 19 +++ extensions/telegram/src/draft-stream.test.ts | 42 ++++++ extensions/telegram/src/draft-stream.ts | 79 +++++++++--- 5 files changed, 228 insertions(+), 43 deletions(-) diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index f9c445be913f..4a26fb592e71 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -2551,8 +2551,16 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ text: expect.stringMatching(/🛠️ Exec<\/b>$/) }), ); expect(answerDraftStream.update).toHaveBeenNthCalledWith(3, "Final answer"); - expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2); + // The tool-progress window repositions before the final (deferred delete), + // never an immediate clear/delete. + expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1); + // The reposition rewinds the stream BEFORE any deliverer cleanup clear(), + // so that clear finds no live message id and never deletes the window. + if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) { + expect( + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0], + ).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]); + } const progressResetOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; const progressUpdateOrder = answerDraftStream.updatePreview.mock.invocationCallOrder[0]; expect(progressResetOrder).toBeLessThan(progressUpdateOrder); @@ -2579,8 +2587,16 @@ describe("dispatchTelegramMessage draft streaming", () => { ); expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Site B shows Y."); expect(answerDraftStream.update).toHaveBeenNthCalledWith(3, "Final answer"); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2); - expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); + // The tool-progress window repositions (deferred delete) rather than an + // immediate clear when the following text block takes over the lane. + expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1); + // The reposition rewinds the stream BEFORE any deliverer cleanup clear(), + // so that clear finds no live message id and never deletes the window. + if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) { + expect( + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0], + ).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]); + } expect(deliverReplies).not.toHaveBeenCalled(); }); @@ -2620,12 +2636,20 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ text: expect.stringMatching(/🛠️ Exec<\/b>$/) }), ); expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Branch is up to date"); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); - const clearOrder = answerDraftStream.clear.mock.invocationCallOrder[0]; - const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; + // Reposition, not delete-then-repost: the tool-progress window is rewound + // for a new message and its delete deferred until after the replacement + // lands. clear() (immediate delete) must NOT run — that scroll-jumps. + expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1); + // The reposition rewinds the stream BEFORE any deliverer cleanup clear(), + // so that clear finds no live message id and never deletes the window. + if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) { + expect( + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0], + ).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]); + } + const rotationOrder = + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0]; const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0]; - expect(clearOrder).toBeLessThan(rotationOrder); expect(rotationOrder).toBeLessThan(finalUpdateOrder); }); @@ -2646,12 +2670,19 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ text: expect.stringMatching(/🛠️ Exec<\/b>$/) }), ); expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Branch is up to date"); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); - const clearOrder = answerDraftStream.clear.mock.invocationCallOrder[0]; - const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; + // Across an assistant boundary the tool-progress window still repositions + // (new message first, deferred delete) rather than deleting immediately. + expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1); + // The reposition rewinds the stream BEFORE any deliverer cleanup clear(), + // so that clear finds no live message id and never deletes the window. + if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) { + expect( + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0], + ).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]); + } + const rotationOrder = + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0]; const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0]; - expect(clearOrder).toBeLessThan(rotationOrder); expect(rotationOrder).toBeLessThan(finalUpdateOrder); }); @@ -2667,12 +2698,19 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "🛠️ Exec: pnpm test"); expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Tests passed"); - expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); - expect(answerDraftStream.clear).toHaveBeenCalledTimes(1); - const clearOrder = answerDraftStream.clear.mock.invocationCallOrder[0]; - const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0]; + // Verbose tool result window repositions before the final: new message + // first, superseded delete deferred (no immediate clear/delete). + expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1); + // The reposition rewinds the stream BEFORE any deliverer cleanup clear(), + // so that clear finds no live message id and never deletes the window. + if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) { + expect( + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0], + ).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]); + } + const rotationOrder = + answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0]; const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1]; - expect(clearOrder).toBeLessThan(rotationOrder); expect(rotationOrder).toBeLessThan(finalUpdateOrder); }); @@ -2863,6 +2901,42 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(texts).toContain("Done"); }); + it("repositions the tool-progress window (deferred delete) when text follows durable reasoning", async () => { + // on-off mid-stream jump: a durable 🧠 posts BELOW the tool-progress window; + // when answer text then takes over the lane, the window must reposition + // (send-new-first, delete-old-deferred) rather than delete-then-repost, + // which scroll-jumps the Telegram client. + loadSessionStore.mockReturnValue({ s1: { reasoningLevel: "on" } }); + const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + await dispatcherOptions.deliver( + { text: "hidden", isReasoning: true }, + { kind: "block" }, + ); + // Answer text mid-turn takes the lane over from the tool-progress window. + await dispatcherOptions.deliver({ text: "Here is the answer" }, { kind: "block" }); + await dispatcherOptions.deliver({ text: "Here is the answer." }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext({ + ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"], + }), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress" } }, + }); + + // The tool-progress window was repositioned via the deferred-delete path, + // never an immediate clear() (which deletes the window above the durable 🧠 + // and reposts below — the focus-jump). + expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalled(); + expect(answerDraftStream.clear).not.toHaveBeenCalled(); + }); + it("posts the collapse bar durably with no delete when the window has no live message", async () => { // When finalizeToPreview cannot edit in place (no live window message id), // the bar is still surfaced — as a durable post — and the window is NOT @@ -3773,9 +3847,13 @@ describe("dispatchTelegramMessage draft streaming", () => { "Shelling\n🔎 Web Search docs lookup\nUpdate tests passed", ), ); - expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1); expect(draftStream.materialize).not.toHaveBeenCalled(); - expect(draftStream.clear).toHaveBeenCalledTimes(1); + // A tool-progress-only window with nothing to summarize is torn down via the + // deferred-delete reposition (new content first, delete later), not a bare + // immediate clear/delete or forceNewMessage. + expect(draftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1); + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(draftStream.clear).not.toHaveBeenCalled(); expectDeliveredReply(0, { text: "Final after tool" }); expect(editMessageTelegram).not.toHaveBeenCalled(); }); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index a220e58cbc77..de1fac85745b 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -1260,8 +1260,15 @@ export const dispatchTelegramMessage = async ({ if (!activeAnswerDraftIsToolProgressOnly) { return false; } - await answerLane.stream?.clear(); - answerLane.stream?.forceNewMessage(); + // Reposition, don't delete-then-repost: rewind so the replacement message + // sends below, and defer the tool-progress window's delete until after it + // lands. Deleting first (clear) scroll-jumps the client when a durable 🧠 + // was posted between the window and the replacement (the on-off jump). + if (answerLane.stream?.rotateToNewMessageDeferringDelete) { + answerLane.stream.rotateToNewMessageDeferringDelete(); + } else { + answerLane.stream?.forceNewMessage(); + } resetDraftLaneState(answerLane); suppressProgressDraftState(); rotateAnswerLaneWhenQueuedBlocksSettle = false; diff --git a/extensions/telegram/src/draft-stream.test-helpers.ts b/extensions/telegram/src/draft-stream.test-helpers.ts index d60a755eaad9..55badf065c3a 100644 --- a/extensions/telegram/src/draft-stream.test-helpers.ts +++ b/extensions/telegram/src/draft-stream.test-helpers.ts @@ -18,6 +18,7 @@ type TestDraftStream = { typeof vi.fn<(preview: TelegramDraftPreview) => Promise> >; forceNewMessage: ReturnType void>>; + rotateToNewMessageDeferringDelete: ReturnType number | undefined>>; sendMayHaveLanded: ReturnType boolean>>; setMessageId: (value: number | undefined) => void; }; @@ -85,6 +86,18 @@ export function createTestDraftStream(params?: { } visibleSinceMs = undefined; }), + rotateToNewMessageDeferringDelete: vi.fn().mockImplementation(() => { + // Mirror forceNewMessage's message-id handling (a sequenced harness swaps + // ids on the next send; the fixed harness keeps its id unless configured + // otherwise) so the rewind semantics match; return the superseded id. + const superseded = messageId; + stopped = false; + if (params?.clearMessageIdOnForceNew) { + messageId = undefined; + } + visibleSinceMs = undefined; + return superseded; + }), sendMayHaveLanded: vi.fn().mockReturnValue(false), setMessageId: (value: number | undefined) => { messageId = value; @@ -137,6 +150,12 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft activeMessageId = undefined; visibleSinceMs = undefined; }), + rotateToNewMessageDeferringDelete: vi.fn().mockImplementation(() => { + const superseded = activeMessageId; + activeMessageId = undefined; + visibleSinceMs = undefined; + return superseded; + }), sendMayHaveLanded: vi.fn().mockReturnValue(false), setMessageId: (value: number | undefined) => { activeMessageId = value; diff --git a/extensions/telegram/src/draft-stream.test.ts b/extensions/telegram/src/draft-stream.test.ts index fe945d92d8b7..f341be732ef0 100644 --- a/extensions/telegram/src/draft-stream.test.ts +++ b/extensions/telegram/src/draft-stream.test.ts @@ -380,6 +380,48 @@ describe("createTelegramDraftStream", () => { } }); + it("rotateToNewMessageDeferringDelete posts the new message before deleting the old", async () => { + vi.useFakeTimers(); + try { + const api = createMockDraftApi(); + api.sendMessage + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }); + const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" }); + + stream.update("🛠️ Exec"); + await stream.flush(); + // Reposition: rewind for a new message; the old one's delete is deferred. + const superseded = stream.rotateToNewMessageDeferringDelete(); + expect(superseded).toBe(17); + + // The NEW message is sent first... + stream.update("Answer below"); + await stream.flush(); + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Answer below", { + message_thread_id: 42, + }); + // ...and the superseded message is NOT deleted immediately (deferred so + // the new message lands first — no scroll-jump). + expect(api.deleteMessage).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(4_000); + expect(api.deleteMessage).toHaveBeenCalledWith(123, 17); + // Only the superseded (old) message is deleted; the new one stays. + expect(api.deleteMessage).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); + + it("rotateToNewMessageDeferringDelete is a no-op with no live message", () => { + const api = createMockDraftApi(); + const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" }); + + expect(stream.rotateToNewMessageDeferringDelete()).toBeUndefined(); + expect(api.deleteMessage).not.toHaveBeenCalled(); + }); + it("creates new message after forceNewMessage is called", async () => { const { api, stream } = createForceNewMessageHarness(); diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index 7c58021b7f9d..051f3cb39d0b 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -68,6 +68,13 @@ export type TelegramDraftStream = { finalizeToPreview: (preview: TelegramDraftPreview) => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ forceNewMessage: () => void; + /** + * Reposition the window: rewind so the next update creates a new message, + * and schedule the superseded message's delete for AFTER the new one lands + * (post-new-then-delete-old, never delete-then-repost — avoids the client + * scroll-jump). Returns the superseded message id, if any. + */ + rotateToNewMessageDeferringDelete: () => number | undefined; /** True when a preview sendMessage was attempted but the response was lost. */ sendMayHaveLanded?: () => boolean; }; @@ -546,6 +553,37 @@ export function createTelegramDraftStream(params: { loop.resetThrottleWindow(); }; + // Delete a superseded preview message DETACHED (scheduled, never awaited) so + // teardown is never stalled. The delay is at least the remaining on-screen + // dwell (so a preview is never flashed), and at least `minDelayMs` — a + // reposition passes a small floor so the NEW message has landed below before + // the old one disappears, keeping the viewport anchored instead of jumping. + const scheduleDetachedDelete = ( + messageId: number, + visibleSince: number | undefined, + minDelayMs = 0, + ) => { + const runDelete = async () => { + try { + await params.api.deleteMessage(chatId, messageId); + params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); + } catch (err) { + params.warn?.(`telegram stream preview cleanup failed: ${formatErrorMessage(err)}`); + } + }; + const elapsedMs = + typeof visibleSince === "number" ? Date.now() - visibleSince : MIN_PREVIEW_DWELL_MS; + const remainingDwellMs = Math.max(0, MIN_PREVIEW_DWELL_MS - elapsedMs); + const delayMs = Math.max(remainingDwellMs, minDelayMs); + if (delayMs <= 0) { + void runDelete(); + } else { + setTimeout(() => { + void runDelete(); + }, delayMs); + } + }; + const clear = async () => { // Capture before the stop; takeMessageIdAfterStop resets streamVisibleSinceMs. const visibleSince = streamVisibleSinceMs; @@ -557,30 +595,30 @@ export function createTelegramDraftStream(params: { }, }); if (typeof messageId === "number" && Number.isFinite(messageId)) { - const runDelete = async () => { - try { - await params.api.deleteMessage(chatId, messageId); - params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`); - } catch (err) { - params.warn?.(`telegram stream preview cleanup failed: ${formatErrorMessage(err)}`); - } - }; // Keep the preview on screen for at least MIN_PREVIEW_DWELL_MS from when it - // first appeared, then delete DETACHED (scheduled, not awaited) so teardown - // is never stalled waiting for the dwell. - const elapsedMs = - typeof visibleSince === "number" ? Date.now() - visibleSince : MIN_PREVIEW_DWELL_MS; - const remainingMs = Math.max(0, MIN_PREVIEW_DWELL_MS - elapsedMs); - if (remainingMs <= 0) { - void runDelete(); - } else { - setTimeout(() => { - void runDelete(); - }, remainingMs); - } + // first appeared, then delete. + scheduleDetachedDelete(messageId, visibleSince); } }; + // Reposition the window: rewind so the NEXT update creates a fresh message + // (below anything posted since), then delete the superseded one AFTER a short + // delay so the new message lands first. Post-new-then-delete-old — never + // delete-then-repost, which scroll-jumps the Telegram client (the on-off + // durable-🧠 jump). Returns the superseded message id (for tests). + const REPOSITION_DELETE_DELAY_MS = 1_500; + const rotateToNewMessageDeferringDelete = (): number | undefined => { + const supersededMessageId = streamMessageId; + const supersededVisibleSince = streamVisibleSinceMs; + // Rewind WITHOUT deleting; the old id is captured above. + resetStreamToNewMessage(); + if (typeof supersededMessageId === "number" && Number.isFinite(supersededMessageId)) { + scheduleDetachedDelete(supersededMessageId, supersededVisibleSince, REPOSITION_DELETE_DELAY_MS); + return supersededMessageId; + } + return undefined; + }; + const discard = async () => { await stopForClear(); }; @@ -646,6 +684,7 @@ export function createTelegramDraftStream(params: { materialize, finalizeToPreview, forceNewMessage, + rotateToNewMessageDeferringDelete, sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number", }; }