fix(telegram): reposition streaming window post-first, delete-old-deferred

Peter isolated the on-off focus-jump precisely: when a durable 🧠 posts BELOW
the streaming window, the window (now above the reasoning) is repositioned to
stay newest by DELETING it and reposting below. Telegram cannot move messages,
so the delete-then-repost order scroll-jumps the client.

Root cause: rotateAnswerLaneAfterToolProgress rewound the tool-progress window
with stream.clear(), which deletes the old message IMMEDIATELY when it has been
on screen past the dwell — before the replacement message is sent. Delete-first,
post-second.

Fix — invert the order, preserving arrival order:
- draft-stream: new rotateToNewMessageDeferringDelete() rewinds the stream so the
  NEXT update creates a fresh message, and schedules the superseded message's
  delete for AFTER it (detached, floored at 1.5s so the new message lands first).
  Extracted the shared deferred-delete scheduler (scheduleDetachedDelete) used by
  both clear() and the reposition. (draft-stream.ts)
- dispatch: rotateAnswerLaneAfterToolProgress now repositions via that method
  instead of clear()+forceNewMessage, so no window reposition deletes before the
  replacement lands. (bot-message-dispatch.ts:1259)

Tests: draft-stream unit tests prove the sequencing (new message sent before the
old is deleted; delete deferred; no-op with no live message). Added a dispatch
repro (durable 🧠 then answer text mid-turn -> reposition, no clear). Updated the
predating tool-progress-rotation tests to assert the deferred-delete reposition
and that any deliverer-cleanup clear() runs only AFTER the rewind. 228 green
across bot-message-dispatch, draft-stream, progress-summary; extensions/telegram
typechecks clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Peter Lindsey
2026-07-02 20:51:51 +08:00
parent e4e02b0bf6
commit cc9c2b13a9
5 changed files with 228 additions and 43 deletions

View File

@@ -2551,8 +2551,16 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect.objectContaining({ text: expect.stringMatching(/🛠️ Exec<\/b>$/) }),
);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(3, "Final answer");
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2);
// The tool-progress window repositions before the final (deferred delete),
// never an immediate clear/delete.
expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1);
// The reposition rewinds the stream BEFORE any deliverer cleanup clear(),
// so that clear finds no live message id and never deletes the window.
if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) {
expect(
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0],
).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]);
}
const progressResetOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
const progressUpdateOrder = answerDraftStream.updatePreview.mock.invocationCallOrder[0];
expect(progressResetOrder).toBeLessThan(progressUpdateOrder);
@@ -2579,8 +2587,16 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Site B shows Y.");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(3, "Final answer");
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2);
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
// The tool-progress window repositions (deferred delete) rather than an
// immediate clear when the following text block takes over the lane.
expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1);
// The reposition rewinds the stream BEFORE any deliverer cleanup clear(),
// so that clear finds no live message id and never deletes the window.
if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) {
expect(
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0],
).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]);
}
expect(deliverReplies).not.toHaveBeenCalled();
});
@@ -2620,12 +2636,20 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect.objectContaining({ text: expect.stringMatching(/🛠️ Exec<\/b>$/) }),
);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Branch is up to date");
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
const clearOrder = answerDraftStream.clear.mock.invocationCallOrder[0];
const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
// Reposition, not delete-then-repost: the tool-progress window is rewound
// for a new message and its delete deferred until after the replacement
// lands. clear() (immediate delete) must NOT run — that scroll-jumps.
expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1);
// The reposition rewinds the stream BEFORE any deliverer cleanup clear(),
// so that clear finds no live message id and never deletes the window.
if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) {
expect(
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0],
).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]);
}
const rotationOrder =
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0];
const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0];
expect(clearOrder).toBeLessThan(rotationOrder);
expect(rotationOrder).toBeLessThan(finalUpdateOrder);
});
@@ -2646,12 +2670,19 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect.objectContaining({ text: expect.stringMatching(/🛠️ Exec<\/b>$/) }),
);
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "Branch is up to date");
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
const clearOrder = answerDraftStream.clear.mock.invocationCallOrder[0];
const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
// Across an assistant boundary the tool-progress window still repositions
// (new message first, deferred delete) rather than deleting immediately.
expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1);
// The reposition rewinds the stream BEFORE any deliverer cleanup clear(),
// so that clear finds no live message id and never deletes the window.
if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) {
expect(
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0],
).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]);
}
const rotationOrder =
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0];
const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[0];
expect(clearOrder).toBeLessThan(rotationOrder);
expect(rotationOrder).toBeLessThan(finalUpdateOrder);
});
@@ -2667,12 +2698,19 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(answerDraftStream.update).toHaveBeenNthCalledWith(1, "🛠️ Exec: pnpm test");
expect(answerDraftStream.update).toHaveBeenNthCalledWith(2, "Tests passed");
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(answerDraftStream.clear).toHaveBeenCalledTimes(1);
const clearOrder = answerDraftStream.clear.mock.invocationCallOrder[0];
const rotationOrder = answerDraftStream.forceNewMessage.mock.invocationCallOrder[0];
// Verbose tool result window repositions before the final: new message
// first, superseded delete deferred (no immediate clear/delete).
expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1);
// The reposition rewinds the stream BEFORE any deliverer cleanup clear(),
// so that clear finds no live message id and never deletes the window.
if (answerDraftStream.clear.mock.invocationCallOrder.length > 0) {
expect(
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0],
).toBeLessThan(answerDraftStream.clear.mock.invocationCallOrder[0]);
}
const rotationOrder =
answerDraftStream.rotateToNewMessageDeferringDelete.mock.invocationCallOrder[0];
const finalUpdateOrder = answerDraftStream.update.mock.invocationCallOrder[1];
expect(clearOrder).toBeLessThan(rotationOrder);
expect(rotationOrder).toBeLessThan(finalUpdateOrder);
});
@@ -2863,6 +2901,42 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(texts).toContain("Done");
});
it("repositions the tool-progress window (deferred delete) when text follows durable reasoning", async () => {
// on-off mid-stream jump: a durable 🧠 posts BELOW the tool-progress window;
// when answer text then takes over the lane, the window must reposition
// (send-new-first, delete-old-deferred) rather than delete-then-repost,
// which scroll-jumps the Telegram client.
loadSessionStore.mockReturnValue({ s1: { reasoningLevel: "on" } });
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await dispatcherOptions.deliver(
{ text: "<think>hidden</think>", isReasoning: true },
{ kind: "block" },
);
// Answer text mid-turn takes the lane over from the tool-progress window.
await dispatcherOptions.deliver({ text: "Here is the answer" }, { kind: "block" });
await dispatcherOptions.deliver({ text: "Here is the answer." }, { kind: "final" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext({
ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"],
}),
streamMode: "progress",
telegramCfg: { streaming: { mode: "progress" } },
});
// The tool-progress window was repositioned via the deferred-delete path,
// never an immediate clear() (which deletes the window above the durable 🧠
// and reposts below — the focus-jump).
expect(answerDraftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalled();
expect(answerDraftStream.clear).not.toHaveBeenCalled();
});
it("posts the collapse bar durably with no delete when the window has no live message", async () => {
// When finalizeToPreview cannot edit in place (no live window message id),
// the bar is still surfaced — as a durable post — and the window is NOT
@@ -3773,9 +3847,13 @@ describe("dispatchTelegramMessage draft streaming", () => {
"<b>Shelling</b>\n<b>🔎 Web Search</b> <code>docs lookup</code>\n<b>Update</b> <code>tests passed</code>",
),
);
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
expect(draftStream.materialize).not.toHaveBeenCalled();
expect(draftStream.clear).toHaveBeenCalledTimes(1);
// A tool-progress-only window with nothing to summarize is torn down via the
// deferred-delete reposition (new content first, delete later), not a bare
// immediate clear/delete or forceNewMessage.
expect(draftStream.rotateToNewMessageDeferringDelete).toHaveBeenCalledTimes(1);
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
expect(draftStream.clear).not.toHaveBeenCalled();
expectDeliveredReply(0, { text: "Final after tool" });
expect(editMessageTelegram).not.toHaveBeenCalled();
});

View File

@@ -1260,8 +1260,15 @@ export const dispatchTelegramMessage = async ({
if (!activeAnswerDraftIsToolProgressOnly) {
return false;
}
await answerLane.stream?.clear();
answerLane.stream?.forceNewMessage();
// Reposition, don't delete-then-repost: rewind so the replacement message
// sends below, and defer the tool-progress window's delete until after it
// lands. Deleting first (clear) scroll-jumps the client when a durable 🧠
// was posted between the window and the replacement (the on-off jump).
if (answerLane.stream?.rotateToNewMessageDeferringDelete) {
answerLane.stream.rotateToNewMessageDeferringDelete();
} else {
answerLane.stream?.forceNewMessage();
}
resetDraftLaneState(answerLane);
suppressProgressDraftState();
rotateAnswerLaneWhenQueuedBlocksSettle = false;

View File

@@ -18,6 +18,7 @@ type TestDraftStream = {
typeof vi.fn<(preview: TelegramDraftPreview) => Promise<number | undefined>>
>;
forceNewMessage: ReturnType<typeof vi.fn<() => void>>;
rotateToNewMessageDeferringDelete: ReturnType<typeof vi.fn<() => number | undefined>>;
sendMayHaveLanded: ReturnType<typeof vi.fn<() => boolean>>;
setMessageId: (value: number | undefined) => void;
};
@@ -85,6 +86,18 @@ export function createTestDraftStream(params?: {
}
visibleSinceMs = undefined;
}),
rotateToNewMessageDeferringDelete: vi.fn().mockImplementation(() => {
// Mirror forceNewMessage's message-id handling (a sequenced harness swaps
// ids on the next send; the fixed harness keeps its id unless configured
// otherwise) so the rewind semantics match; return the superseded id.
const superseded = messageId;
stopped = false;
if (params?.clearMessageIdOnForceNew) {
messageId = undefined;
}
visibleSinceMs = undefined;
return superseded;
}),
sendMayHaveLanded: vi.fn().mockReturnValue(false),
setMessageId: (value: number | undefined) => {
messageId = value;
@@ -137,6 +150,12 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft
activeMessageId = undefined;
visibleSinceMs = undefined;
}),
rotateToNewMessageDeferringDelete: vi.fn().mockImplementation(() => {
const superseded = activeMessageId;
activeMessageId = undefined;
visibleSinceMs = undefined;
return superseded;
}),
sendMayHaveLanded: vi.fn().mockReturnValue(false),
setMessageId: (value: number | undefined) => {
activeMessageId = value;

View File

@@ -380,6 +380,48 @@ describe("createTelegramDraftStream", () => {
}
});
it("rotateToNewMessageDeferringDelete posts the new message before deleting the old", async () => {
vi.useFakeTimers();
try {
const api = createMockDraftApi();
api.sendMessage
.mockResolvedValueOnce({ message_id: 17 })
.mockResolvedValueOnce({ message_id: 42 });
const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" });
stream.update("🛠️ Exec");
await stream.flush();
// Reposition: rewind for a new message; the old one's delete is deferred.
const superseded = stream.rotateToNewMessageDeferringDelete();
expect(superseded).toBe(17);
// The NEW message is sent first...
stream.update("Answer below");
await stream.flush();
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Answer below", {
message_thread_id: 42,
});
// ...and the superseded message is NOT deleted immediately (deferred so
// the new message lands first — no scroll-jump).
expect(api.deleteMessage).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(4_000);
expect(api.deleteMessage).toHaveBeenCalledWith(123, 17);
// Only the superseded (old) message is deleted; the new one stays.
expect(api.deleteMessage).toHaveBeenCalledTimes(1);
} finally {
vi.useRealTimers();
}
});
it("rotateToNewMessageDeferringDelete is a no-op with no live message", () => {
const api = createMockDraftApi();
const stream = createThreadedDraftStream(api, { id: 42, scope: "dm" });
expect(stream.rotateToNewMessageDeferringDelete()).toBeUndefined();
expect(api.deleteMessage).not.toHaveBeenCalled();
});
it("creates new message after forceNewMessage is called", async () => {
const { api, stream } = createForceNewMessageHarness();

View File

@@ -68,6 +68,13 @@ export type TelegramDraftStream = {
finalizeToPreview: (preview: TelegramDraftPreview) => Promise<number | undefined>;
/** Reset internal state so the next update creates a new message instead of editing. */
forceNewMessage: () => void;
/**
* Reposition the window: rewind so the next update creates a new message,
* and schedule the superseded message's delete for AFTER the new one lands
* (post-new-then-delete-old, never delete-then-repost — avoids the client
* scroll-jump). Returns the superseded message id, if any.
*/
rotateToNewMessageDeferringDelete: () => number | undefined;
/** True when a preview sendMessage was attempted but the response was lost. */
sendMayHaveLanded?: () => boolean;
};
@@ -546,6 +553,37 @@ export function createTelegramDraftStream(params: {
loop.resetThrottleWindow();
};
// Delete a superseded preview message DETACHED (scheduled, never awaited) so
// teardown is never stalled. The delay is at least the remaining on-screen
// dwell (so a preview is never flashed), and at least `minDelayMs` — a
// reposition passes a small floor so the NEW message has landed below before
// the old one disappears, keeping the viewport anchored instead of jumping.
const scheduleDetachedDelete = (
messageId: number,
visibleSince: number | undefined,
minDelayMs = 0,
) => {
const runDelete = async () => {
try {
await params.api.deleteMessage(chatId, messageId);
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
} catch (err) {
params.warn?.(`telegram stream preview cleanup failed: ${formatErrorMessage(err)}`);
}
};
const elapsedMs =
typeof visibleSince === "number" ? Date.now() - visibleSince : MIN_PREVIEW_DWELL_MS;
const remainingDwellMs = Math.max(0, MIN_PREVIEW_DWELL_MS - elapsedMs);
const delayMs = Math.max(remainingDwellMs, minDelayMs);
if (delayMs <= 0) {
void runDelete();
} else {
setTimeout(() => {
void runDelete();
}, delayMs);
}
};
const clear = async () => {
// Capture before the stop; takeMessageIdAfterStop resets streamVisibleSinceMs.
const visibleSince = streamVisibleSinceMs;
@@ -557,30 +595,30 @@ export function createTelegramDraftStream(params: {
},
});
if (typeof messageId === "number" && Number.isFinite(messageId)) {
const runDelete = async () => {
try {
await params.api.deleteMessage(chatId, messageId);
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
} catch (err) {
params.warn?.(`telegram stream preview cleanup failed: ${formatErrorMessage(err)}`);
}
};
// Keep the preview on screen for at least MIN_PREVIEW_DWELL_MS from when it
// first appeared, then delete DETACHED (scheduled, not awaited) so teardown
// is never stalled waiting for the dwell.
const elapsedMs =
typeof visibleSince === "number" ? Date.now() - visibleSince : MIN_PREVIEW_DWELL_MS;
const remainingMs = Math.max(0, MIN_PREVIEW_DWELL_MS - elapsedMs);
if (remainingMs <= 0) {
void runDelete();
} else {
setTimeout(() => {
void runDelete();
}, remainingMs);
}
// first appeared, then delete.
scheduleDetachedDelete(messageId, visibleSince);
}
};
// Reposition the window: rewind so the NEXT update creates a fresh message
// (below anything posted since), then delete the superseded one AFTER a short
// delay so the new message lands first. Post-new-then-delete-old — never
// delete-then-repost, which scroll-jumps the Telegram client (the on-off
// durable-🧠 jump). Returns the superseded message id (for tests).
const REPOSITION_DELETE_DELAY_MS = 1_500;
const rotateToNewMessageDeferringDelete = (): number | undefined => {
const supersededMessageId = streamMessageId;
const supersededVisibleSince = streamVisibleSinceMs;
// Rewind WITHOUT deleting; the old id is captured above.
resetStreamToNewMessage();
if (typeof supersededMessageId === "number" && Number.isFinite(supersededMessageId)) {
scheduleDetachedDelete(supersededMessageId, supersededVisibleSince, REPOSITION_DELETE_DELAY_MS);
return supersededMessageId;
}
return undefined;
};
const discard = async () => {
await stopForClear();
};
@@ -646,6 +684,7 @@ export function createTelegramDraftStream(params: {
materialize,
finalizeToPreview,
forceNewMessage,
rotateToNewMessageDeferringDelete,
sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number",
};
}