From ed3bf4ff143fdaed1bae01d6059c56d9226921cd Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 21 Feb 2026 17:31:53 +0530 Subject: [PATCH] refactor(telegram): reset draft throttle on message rotation --- src/channels/draft-stream-loop.ts | 8 +++ src/telegram/bot-message-dispatch.test.ts | 72 ----------------------- src/telegram/bot-message-dispatch.ts | 1 - src/telegram/draft-stream.test.ts | 33 +++++++++++ src/telegram/draft-stream.ts | 1 + 5 files changed, 42 insertions(+), 73 deletions(-) diff --git a/src/channels/draft-stream-loop.ts b/src/channels/draft-stream-loop.ts index 69f16c46d78..ed4656dd0f3 100644 --- a/src/channels/draft-stream-loop.ts +++ b/src/channels/draft-stream-loop.ts @@ -3,6 +3,7 @@ export type DraftStreamLoop = { flush: () => Promise; stop: () => void; resetPending: () => void; + resetThrottleWindow: () => void; waitForInFlight: () => Promise; }; @@ -87,6 +88,13 @@ export function createDraftStreamLoop(params: { resetPending: () => { pendingText = ""; }, + resetThrottleWindow: () => { + lastSentAt = 0; + if (timer) { + clearTimeout(timer); + timer = undefined; + } + }, waitForInFlight: async () => { if (inFlightPromise) { await inFlightPromise; diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 1394bded08c..3a88b46026b 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -82,29 +82,6 @@ describe("dispatchTelegramMessage draft streaming", () => { }; } - function createFlushSequencedDraftStream(startMessageId = 2001) { - let activeMessageId: number | undefined; - let nextMessageId = startMessageId; - let pendingText = ""; - return { - update: vi.fn().mockImplementation((text: string) => { - pendingText = text; - }), - flush: vi.fn().mockImplementation(async () => { - if (pendingText && activeMessageId == null) { - activeMessageId = nextMessageId++; - } - }), - messageId: vi.fn().mockImplementation(() => activeMessageId), - clear: vi.fn().mockResolvedValue(undefined), - stop: vi.fn().mockResolvedValue(undefined), - forceNewMessage: vi.fn().mockImplementation(() => { - activeMessageId = undefined; - pendingText = ""; - }), - }; - } - function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) { const answerDraftStream = createDraftStream(params?.answerMessageId); const reasoningDraftStream = createDraftStream(params?.reasoningMessageId); @@ -600,55 +577,6 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(deliverReplies).not.toHaveBeenCalled(); }); - it("flushes each assistant message boundary so previews stream separately before final delivery", async () => { - const answerDraftStream = createFlushSequencedDraftStream(2001); - const reasoningDraftStream = createDraftStream(); - createTelegramDraftStream - .mockImplementationOnce(() => answerDraftStream) - .mockImplementationOnce(() => reasoningDraftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "First chunk" }); - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Second chunk" }); - await replyOptions?.onAssistantMessageStart?.(); - await replyOptions?.onPartialReply?.({ text: "Third chunk" }); - - await dispatcherOptions.deliver({ text: "First final" }, { kind: "final" }); - await dispatcherOptions.deliver({ text: "Second final" }, { kind: "final" }); - await dispatcherOptions.deliver({ text: "Third final" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "2001" }); - - await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - - expect(answerDraftStream.flush).toHaveBeenCalled(); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 1, - 123, - 2001, - "First final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 2, - 123, - 2002, - "Second final", - expect.any(Object), - ); - expect(editMessageTelegram).toHaveBeenNthCalledWith( - 3, - 123, - 2003, - "Third final", - expect.any(Object), - ); - }); - it.each(["block", "partial"] as const)( "splits reasoning lane only when a later reasoning block starts (%s mode)", async (streamMode) => { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index e058f88fb16..9929d3a8ab6 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -672,7 +672,6 @@ export const dispatchTelegramMessage = async ({ ? async () => { reasoningStepState.resetForNextStep(); if (answerLane.hasStreamedMessage) { - await answerLane.stream?.flush(); const previewMessageId = answerLane.stream?.messageId(); if (typeof previewMessageId === "number") { archivedAnswerPreviews.push({ diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 7532015a5bb..fda42e9e9e2 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -134,6 +134,39 @@ describe("createTelegramDraftStream", () => { expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined); }); + it("sends first update immediately after forceNewMessage within throttle window", async () => { + vi.useFakeTimers(); + try { + const api = { + sendMessage: vi + .fn() + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + throttleMs: 1000, + }); + + stream.update("Hello"); + await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(1)); + + stream.update("Hello edited"); + expect(api.editMessageText).not.toHaveBeenCalled(); + + stream.forceNewMessage(); + stream.update("Second message"); + await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(2)); + expect(api.sendMessage).toHaveBeenLastCalledWith(123, "Second message", undefined); + } finally { + vi.useRealTimers(); + } + }); + it("supports rendered previews with parse_mode", async () => { const api = createMockDraftApi(); const stream = createTelegramDraftStream({ diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index a4a6b2db20c..e4fb2ca4136 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -167,6 +167,7 @@ export function createTelegramDraftStream(params: { lastSentText = ""; lastSentParseMode = undefined; loop.resetPending(); + loop.resetThrottleWindow(); }; params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);