From 8b1fe0d1e2408546264e2cf5ebb56277880c7dae Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 21 Feb 2026 18:05:23 +0530 Subject: [PATCH] fix(telegram): split streaming preview per assistant block (#22613) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 26f35f4411e65cf14587efeedc4e326a71d54ee0 Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com> Reviewed-by: @obviyus --- CHANGELOG.md | 1 + .../reply/agent-runner-execution.ts | 5 +- .../reply/agent-runner.runreplyagent.test.ts | 44 ++++++++++ src/auto-reply/tokens.ts | 20 +++++ src/channels/draft-stream-loop.ts | 8 ++ ...tion.rejects-routing-allowfrom.e2e.test.ts | 4 +- src/config/schema.help.ts | 2 +- src/config/zod-schema.providers-core.ts | 2 +- src/telegram/bot-message-dispatch.test.ts | 83 +++++++++++++++++-- src/telegram/bot-message-dispatch.ts | 71 ++++++++++++++-- src/telegram/bot.helpers.test.ts | 20 +++++ src/telegram/bot/helpers.ts | 2 +- src/telegram/draft-stream.test.ts | 33 ++++++++ src/telegram/draft-stream.ts | 1 + 14 files changed, 277 insertions(+), 19 deletions(-) create mode 100644 src/telegram/bot.helpers.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d9247fb854..02bb3e8ac8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ Docs: https://docs.openclaw.ai - Memory: return empty snippets when `memory_get`/QMD read files that have not been created yet, and harden memory indexing/session helpers against ENOENT races so missing Markdown no longer crashes tools. (#20680) Thanks @pahdo. - Telegram/Streaming: always clean up draft previews even when dispatch throws before fallback handling, preventing orphaned preview messages during failed runs. (#19041) thanks @mudrii. - Telegram/Streaming: split reasoning and answer draft preview lanes to prevent cross-lane overwrites, and ignore literal `` tags inside inline/fenced code snippets so sample markup is not misrouted as reasoning. (#20774) Thanks @obviyus. +- Telegram/Streaming: restore 30-char first-preview debounce and scope `NO_REPLY` prefix suppression to partial sentinel fragments so normal `No...` text is not filtered. (#22613) thanks @obviyus. - Telegram/Status reactions: refresh stall timers on repeated phase updates and honor ack-reaction scope when lifecycle reactions are enabled, preventing false stall emojis and unwanted group reactions. Thanks @wolly-tundracube and @thewilloftheshadow. - Telegram/Status reactions: keep lifecycle reactions active when available-reactions lookup fails by falling back to unrestricted variant selection instead of suppressing reaction updates. (#22380) thanks @obviyus. - Discord/Streaming: apply `replyToMode: first` only to the first Discord chunk so block-streamed replies do not spam mention pings. (#20726) Thanks @thewilloftheshadow for the report. diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 4becf72c780..eaabfe2f2d3 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -28,7 +28,7 @@ import { import { stripHeartbeatToken } from "../heartbeat.js"; import type { TemplateContext } from "../templating.js"; import type { VerboseLevel } from "../thinking.js"; -import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; +import { isSilentReplyPrefixText, isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import type { GetReplyOptions, ReplyPayload } from "../types.js"; import { buildEmbeddedRunBaseParams, @@ -157,6 +157,9 @@ export async function runAgentTurnWithFallback(params: { return { text: sanitized, skip: false }; }; const handlePartialForTyping = async (payload: ReplyPayload): Promise => { + if (isSilentReplyPrefixText(payload.text, SILENT_REPLY_TOKEN)) { + return undefined; + } const { text, skip } = normalizeStreamingText(payload); if (skip || !text) { return undefined; diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts index b009a8b633b..f7fd979c1fe 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.test.ts @@ -383,6 +383,50 @@ describe("runReplyAgent typing (heartbeat)", () => { expect(typing.startTypingLoop).not.toHaveBeenCalled(); }); + it("suppresses partial streaming for NO_REPLY prefixes", async () => { + const onPartialReply = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "NO_" }); + await params.onPartialReply?.({ text: "NO_RE" }); + await params.onPartialReply?.({ text: "NO_REPLY" }); + return { payloads: [{ text: "NO_REPLY" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + opts: { isHeartbeat: false, onPartialReply }, + typingMode: "message", + }); + await run(); + + expect(onPartialReply).not.toHaveBeenCalled(); + expect(typing.startTypingOnText).not.toHaveBeenCalled(); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + + it("does not suppress partial streaming for normal 'No' prefixes", async () => { + const onPartialReply = vi.fn(); + state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { + await params.onPartialReply?.({ text: "No" }); + await params.onPartialReply?.({ text: "No, that is valid" }); + return { payloads: [{ text: "No, that is valid" }], meta: {} }; + }); + + const { run, typing } = createMinimalRun({ + opts: { isHeartbeat: false, onPartialReply }, + typingMode: "message", + }); + await run(); + + expect(onPartialReply).toHaveBeenCalledTimes(2); + expect(onPartialReply).toHaveBeenNthCalledWith(1, { text: "No", mediaUrls: undefined }); + expect(onPartialReply).toHaveBeenNthCalledWith(2, { + text: "No, that is valid", + mediaUrls: undefined, + }); + expect(typing.startTypingOnText).toHaveBeenCalled(); + expect(typing.startTypingLoop).not.toHaveBeenCalled(); + }); + it("does not start typing on assistant message start without prior text in message mode", async () => { state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => { await params.onAssistantMessageStart?.(); diff --git a/src/auto-reply/tokens.ts b/src/auto-reply/tokens.ts index b305391dcd0..c0bce2a2da3 100644 --- a/src/auto-reply/tokens.ts +++ b/src/auto-reply/tokens.ts @@ -18,3 +18,23 @@ export function isSilentReplyText( const suffix = new RegExp(`\\b${escaped}\\b\\W*$`); return suffix.test(text); } + +export function isSilentReplyPrefixText( + text: string | undefined, + token: string = SILENT_REPLY_TOKEN, +): boolean { + if (!text) { + return false; + } + const normalized = text.trimStart().toUpperCase(); + if (!normalized) { + return false; + } + if (!normalized.includes("_")) { + return false; + } + if (/[^A-Z_]/.test(normalized)) { + return false; + } + return token.toUpperCase().startsWith(normalized); +} diff --git a/src/channels/draft-stream-loop.ts b/src/channels/draft-stream-loop.ts index 69f16c46d78..ed4656dd0f3 100644 --- a/src/channels/draft-stream-loop.ts +++ b/src/channels/draft-stream-loop.ts @@ -3,6 +3,7 @@ export type DraftStreamLoop = { flush: () => Promise; stop: () => void; resetPending: () => void; + resetThrottleWindow: () => void; waitForInFlight: () => Promise; }; @@ -87,6 +88,13 @@ export function createDraftStreamLoop(params: { resetPending: () => { pendingText = ""; }, + resetThrottleWindow: () => { + lastSentAt = 0; + if (timer) { + clearTimeout(timer); + timer = undefined; + } + }, waitForInFlight: async () => { if (inFlightPromise) { await inFlightPromise; diff --git a/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts b/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts index 1a88e3e785d..ac83e659af2 100644 --- a/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts +++ b/src/config/config.legacy-config-detection.rejects-routing-allowfrom.e2e.test.ts @@ -378,11 +378,11 @@ describe("legacy config detection", () => { expect(res.config.channels?.telegram?.groupPolicy).toBe("allowlist"); } }); - it("defaults telegram.streaming to true when telegram section exists", async () => { + it("defaults telegram.streaming to false when telegram section exists", async () => { const res = validateConfigObject({ channels: { telegram: {} } }); expect(res.ok).toBe(true); if (res.ok) { - expect(res.config.channels?.telegram?.streaming).toBe(true); + expect(res.config.channels?.telegram?.streaming).toBe(false); expect(res.config.channels?.telegram?.streamMode).toBeUndefined(); } }); diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 1c7dffda860..e96e6f149f0 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -392,7 +392,7 @@ export const FIELD_HELP: Record = { "channels.telegram.dmPolicy": 'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].', "channels.telegram.streaming": - "Enable Telegram live stream preview via message edits (default: true; legacy streamMode auto-maps here).", + "Enable Telegram live stream preview via message edits (default: false; legacy streamMode auto-maps here).", "channels.discord.streamMode": "Live stream preview mode for Discord replies (off | partial | block). Separate from block streaming; uses sendMessage + editMessage.", "channels.discord.draftChunk.minChars": diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 22c55b4035d..668c413a4e0 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -117,7 +117,7 @@ function normalizeTelegramStreamingConfig(value: { delete value.streamMode; return; } - value.streaming = true; + value.streaming = false; } export const TelegramAccountSchemaBase = z diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index c2f84730938..ede7a128856 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -63,6 +63,25 @@ describe("dispatchTelegramMessage draft streaming", () => { }; } + function createSequencedDraftStream(startMessageId = 1001) { + let activeMessageId: number | undefined; + let nextMessageId = startMessageId; + return { + update: vi.fn().mockImplementation(() => { + if (activeMessageId == null) { + activeMessageId = nextMessageId++; + } + }), + flush: vi.fn().mockResolvedValue(undefined), + messageId: vi.fn().mockImplementation(() => activeMessageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn().mockImplementation(() => { + activeMessageId = undefined; + }), + }; + } + function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) { const answerDraftStream = createDraftStream(params?.answerMessageId); const reasoningDraftStream = createDraftStream(params?.reasoningMessageId); @@ -172,7 +191,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ chatId: 123, thread: { id: 777, scope: "dm" }, - minInitialChars: 1, + minInitialChars: 30, }), ); expect(draftStream.update).toHaveBeenCalledWith("Hello"); @@ -193,7 +212,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.clear).toHaveBeenCalledTimes(1); }); - it("uses immediate preview updates for legacy block stream mode", async () => { + it("uses 30-char preview debounce for legacy block stream mode", async () => { const draftStream = createDraftStream(); createTelegramDraftStream.mockReturnValue(draftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( @@ -209,7 +228,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(createTelegramDraftStream).toHaveBeenCalledWith( expect.objectContaining({ - minInitialChars: 1, + minInitialChars: 30, }), ); }); @@ -445,7 +464,7 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); - it("does not force new message for legacy block stream mode", async () => { + it("forces new message for next assistant block in legacy block stream mode", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( @@ -464,10 +483,10 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode: "block" }); - expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1); }); - it("does not force new message in partial mode when assistant message restarts", async () => { + it("forces new message in partial mode when assistant message restarts", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( @@ -483,7 +502,7 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1); }); it("does not force new message on first assistant message start", async () => { @@ -508,6 +527,56 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); }); + it("finalizes multi-message assistant stream to matching preview messages in order", async () => { + const answerDraftStream = createSequencedDraftStream(1001); + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Message A partial" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message B partial" }); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onPartialReply?.({ text: "Message C partial" }); + + await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" }); + await dispatcherOptions.deliver({ text: "Message C final" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 1001, + "Message A final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 1002, + "Message B final", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 3, + 123, + 1003, + "Message C final", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + it.each(["block", "partial"] as const)( "splits reasoning lane only when a later reasoning block starts (%s mode)", async (streamMode) => { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index 7026b8cca6c..71e53528051 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -147,8 +147,7 @@ export const dispatchTelegramMessage = async ({ const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft; const draftReplyToMessageId = replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; - const draftMinInitialChars = - previewStreamingEnabled || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS; + const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); type LaneName = "answer" | "reasoning"; type DraftLaneState = { @@ -184,6 +183,8 @@ export const dispatchTelegramMessage = async ({ const reasoningLane = lanes.reasoning; let splitReasoningOnNextStream = false; const reasoningStepState = createTelegramReasoningStepState(); + type ArchivedPreview = { messageId: number; textSnapshot: string }; + const archivedAnswerPreviews: ArchivedPreview[] = []; type SplitLaneSegment = { lane: LaneName; text: string }; const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => { const split = splitTelegramReasoningText(text); @@ -353,6 +354,8 @@ export const dispatchTelegramMessage = async ({ updateLaneSnapshot?: boolean; skipRegressive: "always" | "existingOnly"; context: "final" | "update"; + previewMessageId?: number; + previewTextSnapshot?: string; }): Promise => { const { lane, @@ -363,19 +366,26 @@ export const dispatchTelegramMessage = async ({ updateLaneSnapshot = false, skipRegressive, context, + previewMessageId: previewMessageIdOverride, + previewTextSnapshot, } = params; if (!lane.stream) { return false; } - const hadPreviewMessage = typeof lane.stream.messageId() === "number"; + const lanePreviewMessageId = lane.stream.messageId(); + const hadPreviewMessage = + typeof previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number"; if (stopBeforeEdit) { await lane.stream.stop(); } - const previewMessageId = lane.stream.messageId(); + const previewMessageId = + typeof previewMessageIdOverride === "number" + ? previewMessageIdOverride + : lane.stream.messageId(); if (typeof previewMessageId !== "number") { return false; } - const currentPreviewText = getLanePreviewText(lane); + const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane); const shouldSkipRegressive = Boolean(currentPreviewText) && currentPreviewText.startsWith(text) && @@ -446,6 +456,36 @@ export const dispatchTelegramMessage = async ({ !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; if (infoKind === "final") { + if (laneName === "answer" && archivedAnswerPreviews.length > 0) { + const archivedPreview = archivedAnswerPreviews.shift(); + if (archivedPreview) { + if (canEditViaPreview) { + const finalized = await tryUpdatePreviewForLane({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit: false, + skipRegressive: "existingOnly", + context: "final", + previewMessageId: archivedPreview.messageId, + previewTextSnapshot: archivedPreview.textSnapshot, + }); + if (finalized) { + return "preview-finalized"; + } + } + try { + await bot.api.deleteMessage(chatId, archivedPreview.messageId); + } catch (err) { + logVerbose( + `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, + ); + } + const delivered = await sendPayload(applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; + } + } if (canEditViaPreview && !finalizedPreviewByLane[laneName]) { await flushDraftLane(lane); const finalized = await tryUpdatePreviewForLane({ @@ -628,8 +668,18 @@ export const dispatchTelegramMessage = async ({ } : undefined, onAssistantMessageStart: answerLane.stream - ? () => { + ? async () => { reasoningStepState.resetForNextStep(); + if (answerLane.hasStreamedMessage) { + const previewMessageId = answerLane.stream?.messageId(); + if (typeof previewMessageId === "number") { + archivedAnswerPreviews.push({ + messageId: previewMessageId, + textSnapshot: answerLane.lastPartialText, + }); + } + answerLane.stream?.forceNewMessage(); + } resetDraftLaneState(answerLane); } : undefined, @@ -676,6 +726,15 @@ export const dispatchTelegramMessage = async ({ await stream.clear(); } } + for (const archivedPreview of archivedAnswerPreviews) { + try { + await bot.api.deleteMessage(chatId, archivedPreview.messageId); + } catch (err) { + logVerbose( + `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, + ); + } + } } let sentFallback = false; if ( diff --git a/src/telegram/bot.helpers.test.ts b/src/telegram/bot.helpers.test.ts new file mode 100644 index 00000000000..aa68107bf91 --- /dev/null +++ b/src/telegram/bot.helpers.test.ts @@ -0,0 +1,20 @@ +import { describe, expect, it } from "vitest"; +import { resolveTelegramStreamMode } from "./bot/helpers.js"; + +describe("resolveTelegramStreamMode", () => { + it("defaults to off when telegram streaming is unset", () => { + expect(resolveTelegramStreamMode(undefined)).toBe("off"); + expect(resolveTelegramStreamMode({})).toBe("off"); + }); + + it("prefers explicit streaming boolean", () => { + expect(resolveTelegramStreamMode({ streaming: true })).toBe("partial"); + expect(resolveTelegramStreamMode({ streaming: false })).toBe("off"); + }); + + it("maps legacy streamMode values", () => { + expect(resolveTelegramStreamMode({ streamMode: "off" })).toBe("off"); + expect(resolveTelegramStreamMode({ streamMode: "partial" })).toBe("partial"); + expect(resolveTelegramStreamMode({ streamMode: "block" })).toBe("partial"); + }); +}); diff --git a/src/telegram/bot/helpers.ts b/src/telegram/bot/helpers.ts index 4ee7036553d..59e0634135d 100644 --- a/src/telegram/bot/helpers.ts +++ b/src/telegram/bot/helpers.ts @@ -167,7 +167,7 @@ export function resolveTelegramStreamMode(telegramCfg?: { if (raw === "partial" || raw === "block") { return "partial"; } - return "partial"; + return "off"; } export function buildTelegramGroupPeerId(chatId: number | string, messageThreadId?: number) { diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index 7532015a5bb..fda42e9e9e2 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -134,6 +134,39 @@ describe("createTelegramDraftStream", () => { expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined); }); + it("sends first update immediately after forceNewMessage within throttle window", async () => { + vi.useFakeTimers(); + try { + const api = { + sendMessage: vi + .fn() + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }), + editMessageText: vi.fn().mockResolvedValue(true), + deleteMessage: vi.fn().mockResolvedValue(true), + }; + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + throttleMs: 1000, + }); + + stream.update("Hello"); + await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(1)); + + stream.update("Hello edited"); + expect(api.editMessageText).not.toHaveBeenCalled(); + + stream.forceNewMessage(); + stream.update("Second message"); + await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(2)); + expect(api.sendMessage).toHaveBeenLastCalledWith(123, "Second message", undefined); + } finally { + vi.useRealTimers(); + } + }); + it("supports rendered previews with parse_mode", async () => { const api = createMockDraftApi(); const stream = createTelegramDraftStream({ diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index a4a6b2db20c..e4fb2ca4136 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -167,6 +167,7 @@ export function createTelegramDraftStream(params: { lastSentText = ""; lastSentParseMode = undefined; loop.resetPending(); + loop.resetThrottleWindow(); }; params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);