diff --git a/CHANGELOG.md b/CHANGELOG.md index e1af9a7119c..c53b601307a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ Docs: https://docs.openclaw.ai - Config/Memory: restore schema help/label metadata for hybrid `mmr` and `temporalDecay` settings so configuration surfaces show correct names and guidance. (#18786) Thanks @rodrigouroz. - Tools/web_search: handle xAI Responses API payloads that emit top-level `output_text` blocks (without a `message` wrapper) so Grok web_search no longer returns `No response` for those results. (#20508) Thanks @echoVic. - Telegram/Streaming: always clean up draft previews even when dispatch throws before fallback handling, preventing orphaned preview messages during failed runs. (#19041) thanks @mudrii. +- Telegram/Streaming: split reasoning and answer draft preview lanes to prevent cross-lane overwrites, and ignore literal `<think>` tags inside inline/fenced code snippets so sample markup is not misrouted as reasoning. (#20774) Thanks @obviyus. - Discord/Gateway: handle close code 4014 (missing privileged gateway intents) without crashing the gateway. Thanks @thewilloftheshadow. - Security/Net: strip sensitive headers (`Authorization`, `Proxy-Authorization`, `Cookie`, `Cookie2`) on cross-origin redirects in `fetchWithSsrFGuard` to prevent credential forwarding across origin boundaries. (#20313) Thanks @afurm. 
diff --git a/src/telegram/bot-message-dispatch.test.ts b/src/telegram/bot-message-dispatch.test.ts index 8f47ce636b9..95c2c0af73c 100644 --- a/src/telegram/bot-message-dispatch.test.ts +++ b/src/telegram/bot-message-dispatch.test.ts @@ -7,6 +7,8 @@ const createTelegramDraftStream = vi.hoisted(() => vi.fn()); const dispatchReplyWithBufferedBlockDispatcher = vi.hoisted(() => vi.fn()); const deliverReplies = vi.hoisted(() => vi.fn()); const editMessageTelegram = vi.hoisted(() => vi.fn()); +const loadSessionStore = vi.hoisted(() => vi.fn()); +const resolveStorePath = vi.hoisted(() => vi.fn(() => "/tmp/sessions.json")); vi.mock("./draft-stream.js", () => ({ createTelegramDraftStream, @@ -24,6 +26,11 @@ vi.mock("./send.js", () => ({ editMessageTelegram, })); +vi.mock("../config/sessions.js", async () => ({ + loadSessionStore, + resolveStorePath, +})); + vi.mock("./sticker-cache.js", () => ({ cacheSticker: vi.fn(), describeStickerImage: vi.fn(), @@ -39,6 +46,10 @@ describe("dispatchTelegramMessage draft streaming", () => { dispatchReplyWithBufferedBlockDispatcher.mockReset(); deliverReplies.mockReset(); editMessageTelegram.mockReset(); + loadSessionStore.mockReset(); + resolveStorePath.mockReset(); + resolveStorePath.mockReturnValue("/tmp/sessions.json"); + loadSessionStore.mockReturnValue({}); }); function createDraftStream(messageId?: number) { @@ -52,6 +63,15 @@ describe("dispatchTelegramMessage draft streaming", () => { }; } + function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) { + const answerDraftStream = createDraftStream(params?.answerMessageId); + const reasoningDraftStream = createDraftStream(params?.reasoningMessageId); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + return { answerDraftStream, reasoningDraftStream }; + } + function createContext(overrides?: Partial): TelegramMessageContext { const base = { ctxPayload: {}, @@ -152,6 
+172,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect.objectContaining({ chatId: 123, thread: { id: 777, scope: "dm" }, + minInitialChars: 1, }), ); expect(draftStream.update).toHaveBeenCalledWith("Hello"); @@ -172,6 +193,27 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.clear).toHaveBeenCalledTimes(1); }); + it("keeps a higher initial debounce threshold in block stream mode", async () => { + const draftStream = createDraftStream(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Hello" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode: "block" }); + + expect(createTelegramDraftStream).toHaveBeenCalledWith( + expect.objectContaining({ + minInitialChars: 30, + }), + ); + }); + it("keeps block streaming enabled when account config enables it", async () => { dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); @@ -195,6 +237,66 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("keeps block streaming enabled when session reasoning level is on", async () => { + loadSessionStore.mockReturnValue({ + s1: { reasoningLevel: "on" }, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "Reasoning:\n_step_" }, { kind: "block" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + 
context: createContext({ + ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"], + }), + }); + + expect(createTelegramDraftStream).not.toHaveBeenCalled(); + expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith( + expect.objectContaining({ + replyOptions: expect.objectContaining({ + disableBlockStreaming: false, + }), + }), + ); + expect(loadSessionStore).toHaveBeenCalledWith("/tmp/sessions.json", { skipCache: true }); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Reasoning:\n_step_" })], + }), + ); + }); + + it("streams reasoning draft updates even when answer stream mode is off", async () => { + loadSessionStore.mockReturnValue({ + s1: { reasoningLevel: "stream" }, + }); + const reasoningDraftStream = createDraftStream(111); + createTelegramDraftStream.mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_step_" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext({ + ctxPayload: { SessionKey: "s1" } as unknown as TelegramMessageContext["ctxPayload"], + }), + streamMode: "off", + }); + + expect(createTelegramDraftStream).toHaveBeenCalledTimes(1); + expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_step_"); + expect(loadSessionStore).toHaveBeenCalledWith("/tmp/sessions.json", { skipCache: true }); + }); + it("finalizes text-only replies by editing the preview message in place", async () => { const draftStream = createDraftStream(999); createTelegramDraftStream.mockReturnValue(draftStream); @@ -407,71 +509,398 @@ describe("dispatchTelegramMessage draft streaming", () => { 
expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); }); - it("forces new message when reasoning ends after previous output", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); + it.each(["block", "partial"] as const)( + "splits reasoning lane only when a later reasoning block starts (%s mode)", + async (streamMode) => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" }); + await replyOptions?.onReasoningEnd?.(); + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); + await replyOptions?.onPartialReply?.({ text: "checking files..." }); + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_second block_" }); + await dispatcherOptions.deliver({ text: "Done" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode }); + + expect(reasoningDraftStream.forceNewMessage).toHaveBeenCalledTimes(1); + }, + ); + + it.each(["block", "partial"] as const)( + "does not split reasoning lane on reasoning end without a later reasoning block (%s mode)", + async (streamMode) => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_first block_" }); + await replyOptions?.onReasoningEnd?.(); + await replyOptions?.onPartialReply?.({ text: "Here's the answer" }); + await 
dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), streamMode }); + + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); + }, + ); + + it("does not finalize preview with reasoning payloads before answer payloads", async () => { + setupDraftStreams({ answerMessageId: 999 }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - // First partial: text before thinking - await replyOptions?.onPartialReply?.({ text: "Let me check" }); - // Reasoning stream (thinking block) - await replyOptions?.onReasoningStream?.({ text: "Analyzing..." }); - // Reasoning ends - await replyOptions?.onReasoningEnd?.(); - // Second partial: text after thinking - await replyOptions?.onPartialReply?.({ text: "Here's the answer" }); - await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" }); - return { queuedFinal: true }; - }, - ); - deliverReplies.mockResolvedValue({ delivered: true }); - - await dispatchWithContext({ context: createContext(), streamMode: "block" }); - - // Should force new message when reasoning ends - expect(draftStream.forceNewMessage).toHaveBeenCalled(); - }); - - it("does not force new message in partial mode when reasoning ends", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); - dispatchReplyWithBufferedBlockDispatcher.mockImplementation( - async ({ dispatcherOptions, replyOptions }) => { - await replyOptions?.onPartialReply?.({ text: "Let me check" }); - await replyOptions?.onReasoningEnd?.(); - await replyOptions?.onPartialReply?.({ text: "Here's the answer" }); - await dispatcherOptions.deliver({ text: "Here's the answer" }, { kind: "final" }); + await replyOptions?.onPartialReply?.({ text: "Hi, I did what you asked 
and..." }); + await dispatcherOptions.deliver({ text: "Reasoning:\n_step one_" }, { kind: "final" }); + await dispatcherOptions.deliver( + { text: "Hi, I did what you asked and..." }, + { kind: "final" }, + ); return { queuedFinal: true }; }, ); deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); await dispatchWithContext({ context: createContext(), streamMode: "partial" }); - expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + // Keep reasoning as its own message. + expect(deliverReplies).toHaveBeenCalledTimes(1); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Reasoning:\n_step one_" })], + }), + ); + // Finalize preview with the actual answer instead of overwriting with reasoning. + expect(editMessageTelegram).toHaveBeenCalledTimes(1); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "Hi, I did what you asked and...", + expect.any(Object), + ); }); - it("does not force new message on reasoning end without previous output", async () => { - const draftStream = createDraftStream(999); - createTelegramDraftStream.mockReturnValue(draftStream); + it("keeps reasoning and answer streaming in separate preview lanes", async () => { + const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( async ({ dispatcherOptions, replyOptions }) => { - // Reasoning starts immediately (no previous text output) - await replyOptions?.onReasoningStream?.({ text: "Thinking..." 
}); - // Reasoning ends - await replyOptions?.onReasoningEnd?.(); - // First actual text output - await replyOptions?.onPartialReply?.({ text: "Here's my answer" }); - await dispatcherOptions.deliver({ text: "Here's my answer" }, { kind: "final" }); + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Working on it..._" }); + await replyOptions?.onPartialReply?.({ text: "Checking the directory..." }); + await dispatcherOptions.deliver({ text: "Checking the directory..." }, { kind: "final" }); return { queuedFinal: true }; }, ); deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith("Reasoning:\n_Working on it..._"); + expect(answerDraftStream.update).toHaveBeenCalledWith("Checking the directory..."); + expect(answerDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); + }); + + it("does not edit reasoning preview bubble with final answer when no assistant partial arrived yet", async () => { + setupDraftStreams({ reasoningMessageId: 999 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Working on it..._" }); + await dispatcherOptions.deliver({ text: "Here's what I found." 
}, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).not.toHaveBeenCalled(); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Here's what I found." })], + }), + ); + }); + + it.each(["partial", "block"] as const)( + "does not duplicate reasoning final after reasoning end (%s mode)", + async (streamMode) => { + let reasoningMessageId: number | undefined = 111; + const reasoningDraftStream = { + update: vi.fn(), + flush: vi.fn().mockResolvedValue(undefined), + messageId: vi.fn().mockImplementation(() => reasoningMessageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockResolvedValue(undefined), + forceNewMessage: vi.fn().mockImplementation(() => { + reasoningMessageId = undefined; + }), + }; + const answerDraftStream = createDraftStream(999); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_step one_" }); + await replyOptions?.onReasoningEnd?.(); + await dispatcherOptions.deliver( + { text: "Reasoning:\n_step one expanded_" }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "111" }); + + await dispatchWithContext({ context: createContext(), streamMode }); + + expect(reasoningDraftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 111, + 
"Reasoning:\n_step one expanded_", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); + }, + ); + + it("updates reasoning preview for reasoning block payloads instead of sending duplicates", async () => { + setupDraftStreams({ answerMessageId: 999, reasoningMessageId: 111 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReasoningStream?.({ + text: "Reasoning:\nIf I count r in strawberry, I see positions 3, 8, and", + }); + await replyOptions?.onReasoningEnd?.(); + await replyOptions?.onPartialReply?.({ text: "3" }); + await dispatcherOptions.deliver({ text: "3" }, { kind: "final" }); + await dispatcherOptions.deliver( + { + text: "Reasoning:\nIf I count r in strawberry, I see positions 3, 8, and 9. So the total is 3.", + }, + { kind: "block" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).toHaveBeenNthCalledWith(1, 123, 999, "3", expect.any(Object)); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 111, + "Reasoning:\nIf I count r in strawberry, I see positions 3, 8, and 9. 
So the total is 3.", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalledWith( + expect.objectContaining({ + replies: [ + expect.objectContaining({ + text: expect.stringContaining("Reasoning:\nIf I count r in strawberry"), + }), + ], + }), + ); + }); + + it("routes think-tag partials to reasoning lane and keeps answer lane clean", async () => { + const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Counting letters in strawberry3", + }); + await dispatcherOptions.deliver({ text: "3" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith( + "Reasoning:\n_Counting letters in strawberry_", + ); + expect(answerDraftStream.update).toHaveBeenCalledWith("3"); + expect( + answerDraftStream.update.mock.calls.some((call) => String(call[0] ?? "").includes("")), + ).toBe(false); + expect(editMessageTelegram).toHaveBeenCalledWith(123, 999, "3", expect.any(Object)); + }); + + it("routes unmatched think partials to reasoning lane without leaking answer lane", async () => { + const { answerDraftStream, reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Counting letters in strawberry", + }); + await dispatcherOptions.deliver( + { text: "There are 3 r's in strawberry." 
}, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith( + "Reasoning:\n_Counting letters in strawberry_", + ); + expect( + answerDraftStream.update.mock.calls.some((call) => String(call[0] ?? "").includes("<")), + ).toBe(false); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "There are 3 r's in strawberry.", + expect.any(Object), + ); + }); + + it("keeps reasoning preview message when reasoning is streamed but final is answer-only", async () => { + const { reasoningDraftStream } = setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Word: strawberry. r appears at 3, 8, 9.", + }); + await dispatcherOptions.deliver( + { text: "There are 3 r's in strawberry." }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(reasoningDraftStream.update).toHaveBeenCalledWith( + "Reasoning:\n_Word: strawberry. 
r appears at 3, 8, 9._", + ); + expect(reasoningDraftStream.clear).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "There are 3 r's in strawberry.", + expect.any(Object), + ); + }); + + it("splits think-tag final payload into reasoning and answer lanes", async () => { + setupDraftStreams({ + answerMessageId: 999, + reasoningMessageId: 111, + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver( + { + text: "Word: strawberry. r appears at 3, 8, 9.There are 3 r's in strawberry.", + }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); + + await dispatchWithContext({ context: createContext(), streamMode: "partial" }); + + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 1, + 123, + 111, + "Reasoning:\n_Word: strawberry. 
r appears at 3, 8, 9._", + expect.any(Object), + ); + expect(editMessageTelegram).toHaveBeenNthCalledWith( + 2, + 123, + 999, + "There are 3 r's in strawberry.", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); + }); + + it("edits stop-created preview when final text is shorter than buffered draft", async () => { + let answerMessageId: number | undefined; + const answerDraftStream = { + update: vi.fn(), + flush: vi.fn().mockResolvedValue(undefined), + messageId: vi.fn().mockImplementation(() => answerMessageId), + clear: vi.fn().mockResolvedValue(undefined), + stop: vi.fn().mockImplementation(async () => { + answerMessageId = 999; + }), + forceNewMessage: vi.fn(), + }; + const reasoningDraftStream = createDraftStream(); + createTelegramDraftStream + .mockImplementationOnce(() => answerDraftStream) + .mockImplementationOnce(() => reasoningDraftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ + text: "Let me check that file and confirm details for you.", + }); + await dispatcherOptions.deliver({ text: "Let me check that file." 
}, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "999" }); await dispatchWithContext({ context: createContext(), streamMode: "block" }); - // No previous text output, so no forceNewMessage needed - expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 999, + "Let me check that file.", + expect.any(Object), + ); + expect(deliverReplies).not.toHaveBeenCalled(); }); it("does not edit preview message when final payload is an error", async () => { diff --git a/src/telegram/bot-message-dispatch.ts b/src/telegram/bot-message-dispatch.ts index ad62cf54a0b..14a6787b586 100644 --- a/src/telegram/bot-message-dispatch.ts +++ b/src/telegram/bot-message-dispatch.ts @@ -10,11 +10,13 @@ import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js"; import { resolveChunkMode } from "../auto-reply/chunk.js"; import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; +import type { ReplyPayload } from "../auto-reply/types.js"; import { removeAckReactionAfterReply } from "../channels/ack-reactions.js"; import { logAckFailure, logTypingFailure } from "../channels/logging.js"; import { createReplyPrefixOptions } from "../channels/reply-prefix.js"; import { createTypingCallbacks } from "../channels/typing.js"; import { resolveMarkdownTableMode } from "../config/markdown-tables.js"; +import { loadSessionStore, resolveStorePath } from "../config/sessions.js"; import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js"; import { danger, logVerbose } from "../globals.js"; import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js"; @@ -26,6 +28,11 @@ import type { TelegramStreamMode } from 
"./bot/types.js"; import type { TelegramInlineButtons } from "./button-types.js"; import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js"; import { createTelegramDraftStream } from "./draft-stream.js"; +import { renderTelegramHtmlText } from "./format.js"; +import { + createTelegramReasoningStepState, + splitTelegramReasoningText, +} from "./reasoning-lane-coordinator.js"; import { editMessageTelegram } from "./send.js"; import { cacheSticker, describeStickerImage } from "./sticker-cache.js"; @@ -60,6 +67,31 @@ type DispatchTelegramMessageParams = { opts: Pick; }; +type TelegramReasoningLevel = "off" | "on" | "stream"; + +function resolveTelegramReasoningLevel(params: { + cfg: OpenClawConfig; + sessionKey?: string; + agentId: string; +}): TelegramReasoningLevel { + const { cfg, sessionKey, agentId } = params; + if (!sessionKey) { + return "off"; + } + try { + const storePath = resolveStorePath(cfg.session?.store, { agentId }); + const store = loadSessionStore(storePath, { skipCache: true }); + const entry = store[sessionKey.toLowerCase()] ?? store[sessionKey]; + const level = entry?.reasoningLevel; + if (level === "on" || level === "stream") { + return level; + } + } catch { + // Fall through to default. + } + return "off"; +} + export const dispatchTelegramMessage = async ({ context, bot, @@ -90,112 +122,183 @@ export const dispatchTelegramMessage = async ({ } = context; const draftMaxChars = Math.min(textLimit, 4096); + const tableMode = resolveMarkdownTableMode({ + cfg, + channel: "telegram", + accountId: route.accountId, + }); + const renderDraftPreview = (text: string) => ({ + text: renderTelegramHtmlText(text, { tableMode }), + parseMode: "HTML" as const, + }); const accountBlockStreamingEnabled = typeof telegramCfg.blockStreaming === "boolean" ? 
telegramCfg.blockStreaming : cfg.agents?.defaults?.blockStreamingDefault === "on"; - const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled; + const resolvedReasoningLevel = resolveTelegramReasoningLevel({ + cfg, + sessionKey: ctxPayload.SessionKey, + agentId: route.agentId, + }); + const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on"; + const streamReasoningDraft = resolvedReasoningLevel === "stream"; + const canStreamAnswerDraft = + streamMode !== "off" && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning; + const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft; const draftReplyToMessageId = replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined; - const draftStream = canStreamDraft - ? createTelegramDraftStream({ - api: bot.api, - chatId, - maxChars: draftMaxChars, - thread: threadSpec, - replyToMessageId: draftReplyToMessageId, - minInitialChars: DRAFT_MIN_INITIAL_CHARS, - log: logVerbose, - warn: logVerbose, - }) - : undefined; - const draftChunking = - draftStream && streamMode === "block" - ? resolveTelegramDraftStreamingChunking(cfg, route.accountId) - : undefined; - const shouldSplitPreviewMessages = streamMode === "block"; - const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined; + const draftMinInitialChars = + streamMode === "partial" || streamReasoningDraft ? 
1 : DRAFT_MIN_INITIAL_CHARS; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); - let lastPartialText = ""; - let draftText = ""; - let hasStreamedMessage = false; - const updateDraftFromPartial = (text?: string) => { - if (!draftStream || !text) { + type LaneName = "answer" | "reasoning"; + type DraftLaneState = { + stream: ReturnType | undefined; + lastPartialText: string; + draftText: string; + hasStreamedMessage: boolean; + chunker: EmbeddedBlockChunker | undefined; + }; + const createDraftLane = (enabled: boolean): DraftLaneState => { + const stream = enabled + ? createTelegramDraftStream({ + api: bot.api, + chatId, + maxChars: draftMaxChars, + thread: threadSpec, + replyToMessageId: draftReplyToMessageId, + minInitialChars: draftMinInitialChars, + renderText: renderDraftPreview, + log: logVerbose, + warn: logVerbose, + }) + : undefined; + const chunker = + stream && streamMode === "block" + ? new EmbeddedBlockChunker(resolveTelegramDraftStreamingChunking(cfg, route.accountId)) + : undefined; + return { + stream, + lastPartialText: "", + draftText: "", + hasStreamedMessage: false, + chunker, + }; + }; + const lanes: Record = { + answer: createDraftLane(canStreamAnswerDraft), + reasoning: createDraftLane(canStreamReasoningDraft), + }; + const answerLane = lanes.answer; + const reasoningLane = lanes.reasoning; + let splitReasoningOnNextStream = false; + const reasoningStepState = createTelegramReasoningStepState(); + type SplitLaneSegment = { lane: LaneName; text: string }; + const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => { + const split = splitTelegramReasoningText(text); + const segments: SplitLaneSegment[] = []; + if (split.reasoningText) { + segments.push({ lane: "reasoning", text: split.reasoningText }); + } + if (split.answerText) { + segments.push({ lane: "answer", text: split.answerText }); + } + return segments; + }; + const resetDraftLaneState = (lane: DraftLaneState) => { + lane.lastPartialText = ""; + 
lane.draftText = ""; + lane.hasStreamedMessage = false; + lane.chunker?.reset(); + }; + const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => { + const laneStream = lane.stream; + if (!laneStream || !text) { return; } - if (text === lastPartialText) { + if (text === lane.lastPartialText) { return; } // Mark that we've received streaming content (for forceNewMessage decision). - hasStreamedMessage = true; + lane.hasStreamedMessage = true; if (streamMode === "partial") { // Some providers briefly emit a shorter prefix snapshot (for example // "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid // visible punctuation flicker. if ( - lastPartialText && - lastPartialText.startsWith(text) && - text.length < lastPartialText.length + lane.lastPartialText && + lane.lastPartialText.startsWith(text) && + text.length < lane.lastPartialText.length ) { return; } - lastPartialText = text; - draftStream.update(text); + lane.lastPartialText = text; + laneStream.update(text); return; } let delta = text; - if (text.startsWith(lastPartialText)) { - delta = text.slice(lastPartialText.length); + if (text.startsWith(lane.lastPartialText)) { + delta = text.slice(lane.lastPartialText.length); } else { // Streaming buffer reset (or non-monotonic stream). Start fresh. 
- draftChunker?.reset(); - draftText = ""; + lane.chunker?.reset(); + lane.draftText = ""; } - lastPartialText = text; + lane.lastPartialText = text; if (!delta) { return; } - if (!draftChunker) { - draftText = text; - draftStream.update(draftText); + if (!lane.chunker) { + lane.draftText = text; + laneStream.update(lane.draftText); return; } - draftChunker.append(delta); - draftChunker.drain({ + lane.chunker.append(delta); + lane.chunker.drain({ force: false, emit: (chunk) => { - draftText += chunk; - draftStream.update(draftText); + lane.draftText += chunk; + laneStream.update(lane.draftText); }, }); }; - const flushDraft = async () => { - if (!draftStream) { + const ingestDraftLaneSegments = (text: string | undefined) => { + for (const segment of splitTextIntoLaneSegments(text)) { + if (segment.lane === "reasoning") { + reasoningStepState.noteReasoningHint(); + reasoningStepState.noteReasoningDelivered(); + } + updateDraftFromPartial(lanes[segment.lane], segment.text); + } + }; + const flushDraftLane = async (lane: DraftLaneState) => { + if (!lane.stream) { return; } - if (draftChunker?.hasBuffered()) { - draftChunker.drain({ + if (lane.chunker?.hasBuffered()) { + lane.chunker.drain({ force: true, emit: (chunk) => { - draftText += chunk; + lane.draftText += chunk; }, }); - draftChunker.reset(); - if (draftText) { - draftStream.update(draftText); + lane.chunker.reset(); + if (lane.draftText) { + lane.stream.update(lane.draftText); } } - await draftStream.flush(); + await lane.stream.flush(); }; const disableBlockStreaming = streamMode === "off" - ? true // off mode must always disable block streaming - : typeof telegramCfg.blockStreaming === "boolean" - ? !telegramCfg.blockStreaming - : draftStream - ? true - : undefined; + ? true + : forceBlockStreamingForReasoning + ? false + : typeof telegramCfg.blockStreaming === "boolean" + ? !telegramCfg.blockStreaming + : canStreamAnswerDraft + ? 
true + : undefined; const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({ cfg, @@ -203,11 +306,6 @@ export const dispatchTelegramMessage = async ({ channel: "telegram", accountId: route.accountId, }); - const tableMode = resolveMarkdownTableMode({ - cfg, - channel: "telegram", - accountId: route.accountId, - }); const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId); // Handle uncached stickers: get a dedicated vision description before dispatch @@ -271,26 +369,12 @@ export const dispatchTelegramMessage = async ({ const deliveryState = { delivered: false, skippedNonSilent: 0, - failedDeliveries: 0, + failedNonSilent: 0, }; - let finalizedViaPreviewMessage = false; - - /** - * Clean up the draft preview message. The preview must be removed in every - * case EXCEPT when it was successfully finalized as the actual response via - * an in-place edit (`finalizedViaPreviewMessage === true`). - */ - const clearDraftPreviewIfNeeded = async () => { - if (finalizedViaPreviewMessage) { - return; - } - try { - await draftStream?.clear(); - } catch (err) { - logVerbose(`telegram: draft preview cleanup failed: ${String(err)}`); - } + const finalizedPreviewByLane: Record = { + answer: false, + reasoning: false, }; - const clearGroupHistory = () => { if (isGroup && historyKey) { clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit }); @@ -310,9 +394,157 @@ export const dispatchTelegramMessage = async ({ linkPreview: telegramCfg.linkPreview, replyQuoteText, }; + const getLanePreviewText = (lane: DraftLaneState) => + streamMode === "block" ? 
lane.draftText : lane.lastPartialText; + const tryUpdatePreviewForLane = async (params: { + lane: DraftLaneState; + laneName: LaneName; + text: string; + previewButtons?: TelegramInlineButtons; + stopBeforeEdit?: boolean; + updateLaneSnapshot?: boolean; + skipRegressive: "always" | "existingOnly"; + context: "final" | "update"; + }): Promise => { + const { + lane, + laneName, + text, + previewButtons, + stopBeforeEdit = false, + updateLaneSnapshot = false, + skipRegressive, + context, + } = params; + if (!lane.stream) { + return false; + } + const hadPreviewMessage = typeof lane.stream.messageId() === "number"; + if (stopBeforeEdit) { + await lane.stream.stop(); + } + const previewMessageId = lane.stream.messageId(); + if (typeof previewMessageId !== "number") { + return false; + } + const currentPreviewText = getLanePreviewText(lane); + const shouldSkipRegressive = + Boolean(currentPreviewText) && + currentPreviewText.startsWith(text) && + text.length < currentPreviewText.length && + (skipRegressive === "always" || hadPreviewMessage); + if (shouldSkipRegressive) { + // Avoid regressive punctuation/wording flicker from occasional shorter finals. 
+ deliveryState.delivered = true; + return true; + } + try { + await editMessageTelegram(chatId, previewMessageId, text, { + api: bot.api, + cfg, + accountId: route.accountId, + linkPreview: telegramCfg.linkPreview, + buttons: previewButtons, + }); + if (updateLaneSnapshot) { + lane.lastPartialText = text; + lane.draftText = text; + } + deliveryState.delivered = true; + return true; + } catch (err) { + logVerbose( + `telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`, + ); + return false; + } + }; + const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => { + if (payload.text === text) { + return payload; + } + return { ...payload, text }; + }; + const sendPayload = async (payload: ReplyPayload) => { + const result = await deliverReplies({ + ...deliveryBaseOptions, + replies: [payload], + onVoiceRecording: sendRecordVoice, + }); + if (result.delivered) { + deliveryState.delivered = true; + } + return result.delivered; + }; + type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped"; + const deliverLaneText = async (params: { + laneName: LaneName; + text: string; + payload: ReplyPayload; + infoKind: string; + previewButtons?: TelegramInlineButtons; + allowPreviewUpdateForNonFinal?: boolean; + }): Promise => { + const { + laneName, + text, + payload, + infoKind, + previewButtons, + allowPreviewUpdateForNonFinal = false, + } = params; + const lane = lanes[laneName]; + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 
0) > 0; + const canEditViaPreview = + !hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError; + + if (infoKind === "final") { + if (canEditViaPreview && !finalizedPreviewByLane[laneName]) { + await flushDraftLane(lane); + const finalized = await tryUpdatePreviewForLane({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit: true, + skipRegressive: "existingOnly", + context: "final", + }); + if (finalized) { + finalizedPreviewByLane[laneName] = true; + return "preview-finalized"; + } + } else if (!hasMedia && !payload.isError && text.length > draftMaxChars) { + logVerbose( + `telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`, + ); + } + await lane.stream?.stop(); + const delivered = await sendPayload(applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; + } + + if (allowPreviewUpdateForNonFinal && canEditViaPreview) { + const updated = await tryUpdatePreviewForLane({ + lane, + laneName, + text, + previewButtons, + stopBeforeEdit: false, + updateLaneSnapshot: true, + skipRegressive: "always", + context: "update", + }); + if (updated) { + return "preview-updated"; + } + } + + const delivered = await sendPayload(applyTextToPayload(payload, text)); + return delivered ? "sent" : "skipped"; + }; let queuedFinal = false; - let dispatchError: unknown; try { ({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, @@ -320,117 +552,86 @@ export const dispatchTelegramMessage = async ({ dispatcherOptions: { ...prefixOptions, deliver: async (payload, info) => { - if (info.kind === "final") { - await flushDraft(); - const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const previewMessageId = draftStream?.messageId(); - const finalText = payload.text; - const currentPreviewText = streamMode === "block" ? 
draftText : lastPartialText; - const previewButtons = ( - payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined + const previewButtons = ( + payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined + )?.buttons; + const segments = splitTextIntoLaneSegments(payload.text); + const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; + + const flushBufferedFinalAnswer = async () => { + const buffered = reasoningStepState.takeBufferedFinalAnswer(); + if (!buffered) { + return; + } + const bufferedButtons = ( + buffered.payload.channelData?.telegram as + | { buttons?: TelegramInlineButtons } + | undefined )?.buttons; - let draftStoppedForPreviewEdit = false; - // Skip preview edit for error payloads to avoid overwriting previous content - const canFinalizeViaPreviewEdit = - !finalizedViaPreviewMessage && - !hasMedia && - typeof finalText === "string" && - finalText.length > 0 && - typeof previewMessageId === "number" && - finalText.length <= draftMaxChars && - !payload.isError; - if (canFinalizeViaPreviewEdit) { - await draftStream?.stop(); - draftStoppedForPreviewEdit = true; - if ( - currentPreviewText && - currentPreviewText.startsWith(finalText) && - finalText.length < currentPreviewText.length - ) { - // Ignore regressive final edits (e.g., "Okay." -> "Ok"), which - // can appear transiently in some provider streams. 
- return; - } - try { - await editMessageTelegram(chatId, previewMessageId, finalText, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); - finalizedViaPreviewMessage = true; - deliveryState.delivered = true; - logVerbose( - `telegram: finalized response via preview edit (messageId=${previewMessageId})`, - ); - return; - } catch (err) { - logVerbose( - `telegram: preview final edit failed; falling back to standard send (${String(err)})`, - ); - } - } + await deliverLaneText({ + laneName: "answer", + text: buffered.text, + payload: buffered.payload, + infoKind: "final", + previewButtons: bufferedButtons, + }); + reasoningStepState.resetForNextStep(); + }; + + for (const segment of segments) { if ( - !hasMedia && - !payload.isError && - typeof finalText === "string" && - finalText.length > draftMaxChars + segment.lane === "answer" && + info.kind === "final" && + reasoningStepState.shouldBufferFinalAnswer() ) { - logVerbose( - `telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`, - ); + reasoningStepState.bufferFinalAnswer({ payload, text: segment.text }); + continue; } - if (!draftStoppedForPreviewEdit) { - await draftStream?.stop(); + if (segment.lane === "reasoning") { + reasoningStepState.noteReasoningHint(); } - // Check if stop() sent a message (debounce released on isFinal) - // If so, edit that message instead of sending a new one - const messageIdAfterStop = draftStream?.messageId(); - if ( - !finalizedViaPreviewMessage && - typeof messageIdAfterStop === "number" && - typeof finalText === "string" && - finalText.length > 0 && - finalText.length <= draftMaxChars && - !hasMedia && - !payload.isError - ) { - try { - await editMessageTelegram(chatId, messageIdAfterStop, finalText, { - api: bot.api, - cfg, - accountId: route.accountId, - linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, - }); - 
finalizedViaPreviewMessage = true; - deliveryState.delivered = true; - logVerbose( - `telegram: finalized response via post-stop preview edit (messageId=${messageIdAfterStop})`, - ); - return; - } catch (err) { - logVerbose( - `telegram: post-stop preview edit failed; falling back to standard send (${String(err)})`, - ); + const result = await deliverLaneText({ + laneName: segment.lane, + text: segment.text, + payload, + infoKind: info.kind, + previewButtons, + allowPreviewUpdateForNonFinal: segment.lane === "reasoning", + }); + if (segment.lane === "reasoning") { + if (result !== "skipped") { + reasoningStepState.noteReasoningDelivered(); + await flushBufferedFinalAnswer(); } + continue; + } + if (info.kind === "final") { + if (reasoningLane.hasStreamedMessage) { + finalizedPreviewByLane.reasoning = true; + } + reasoningStepState.resetForNextStep(); } } - const result = await deliverReplies({ - ...deliveryBaseOptions, - replies: [payload], - onVoiceRecording: sendRecordVoice, - }); - if (result.delivered) { - deliveryState.delivered = true; - logVerbose( - `telegram: ${info.kind} reply delivered to chat ${chatId}${payload.isError ? 
" (error payload)" : ""}`, - ); - } else { - logVerbose( - `telegram: ${info.kind} reply delivery returned not-delivered for chat ${chatId}`, - ); + if (segments.length > 0) { + return; + } + + if (info.kind === "final") { + await answerLane.stream?.stop(); + await reasoningLane.stream?.stop(); + reasoningStepState.resetForNextStep(); + } + const canSendAsIs = + hasMedia || typeof payload.text !== "string" || payload.text.length > 0; + if (!canSendAsIs) { + if (info.kind === "final") { + await flushBufferedFinalAnswer(); + } + return; + } + await sendPayload(payload); + if (info.kind === "final") { + await flushBufferedFinalAnswer(); } }, onSkip: (_payload, info) => { @@ -439,7 +640,7 @@ export const dispatchTelegramMessage = async ({ } }, onError: (err, info) => { - deliveryState.failedDeliveries += 1; + deliveryState.failedNonSilent += 1; runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`)); }, onReplyStart: createTypingCallbacks({ @@ -457,60 +658,82 @@ export const dispatchTelegramMessage = async ({ replyOptions: { skillFilter, disableBlockStreaming, - onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined, - onAssistantMessageStart: draftStream - ? () => { - // Only split preview bubbles in block mode. In partial mode, keep - // editing one preview message to avoid flooding the chat. - logVerbose( - `telegram: onAssistantMessageStart called, hasStreamedMessage=${hasStreamedMessage}`, - ); - if (shouldSplitPreviewMessages && hasStreamedMessage) { - logVerbose(`telegram: calling forceNewMessage()`); - draftStream.forceNewMessage(); + onPartialReply: + answerLane.stream || reasoningLane.stream + ? (payload) => ingestDraftLaneSegments(payload.text) + : undefined, + onReasoningStream: reasoningLane.stream + ? (payload) => { + // Split between reasoning blocks only when the next reasoning + // stream starts. 
Splitting at reasoning-end can orphan the active + // preview and cause duplicate reasoning sends on reasoning final. + if (splitReasoningOnNextStream) { + reasoningLane.stream?.forceNewMessage(); + resetDraftLaneState(reasoningLane); + splitReasoningOnNextStream = false; } - lastPartialText = ""; - draftText = ""; - draftChunker?.reset(); + ingestDraftLaneSegments(payload.text); } : undefined, - onReasoningEnd: draftStream + onAssistantMessageStart: answerLane.stream ? () => { - // Same policy as assistant-message boundaries: split only in block mode. - if (shouldSplitPreviewMessages && hasStreamedMessage) { - draftStream.forceNewMessage(); + reasoningStepState.resetForNextStep(); + // Keep answer blocks separated in block mode; partial mode keeps one answer lane. + if (streamMode === "block" && answerLane.hasStreamedMessage) { + answerLane.stream?.forceNewMessage(); } - lastPartialText = ""; - draftText = ""; - draftChunker?.reset(); + resetDraftLaneState(answerLane); + } + : undefined, + onReasoningEnd: reasoningLane.stream + ? () => { + // Split when/if a later reasoning block begins. + splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; } : undefined, onModelSelected, }, })); - } catch (err) { - dispatchError = err; } finally { - await draftStream?.stop(); + // Must stop() first to flush debounced content before clear() wipes state. 
+ const streamCleanupStates = new Map< + NonNullable, + { shouldClear: boolean } + >(); + const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [ + { laneName: "answer", lane: answerLane }, + { laneName: "reasoning", lane: reasoningLane }, + ]; + for (const laneState of lanesToCleanup) { + const stream = laneState.lane.stream; + if (!stream) { + continue; + } + const shouldClear = !finalizedPreviewByLane[laneState.laneName]; + const existing = streamCleanupStates.get(stream); + if (!existing) { + streamCleanupStates.set(stream, { shouldClear }); + continue; + } + existing.shouldClear = existing.shouldClear && shouldClear; + } + for (const [stream, cleanupState] of streamCleanupStates) { + await stream.stop(); + if (cleanupState.shouldClear) { + await stream.clear(); + } + } } let sentFallback = false; - try { - if ( - !dispatchError && - !deliveryState.delivered && - (deliveryState.skippedNonSilent > 0 || deliveryState.failedDeliveries > 0) - ) { - const result = await deliverReplies({ - replies: [{ text: EMPTY_RESPONSE_FALLBACK }], - ...deliveryBaseOptions, - }); - sentFallback = result.delivered; - } - } finally { - await clearDraftPreviewIfNeeded(); - } - if (dispatchError) { - throw dispatchError; + if ( + !deliveryState.delivered && + (deliveryState.skippedNonSilent > 0 || deliveryState.failedNonSilent > 0) + ) { + const result = await deliverReplies({ + replies: [{ text: EMPTY_RESPONSE_FALLBACK }], + ...deliveryBaseOptions, + }); + sentFallback = result.delivered; } const hasFinalResponse = queuedFinal || sentFallback; diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index d01a6c29c5e..7532015a5bb 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -133,6 +133,48 @@ describe("createTelegramDraftStream", () => { expect(api.sendMessage).toHaveBeenCalledTimes(2); expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined); }); + + it("supports 
rendered previews with parse_mode", async () => { + const api = createMockDraftApi(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + renderText: (text) => ({ text: `${text}`, parseMode: "HTML" }), + }); + + stream.update("hello"); + await stream.flush(); + expect(api.sendMessage).toHaveBeenCalledWith(123, "hello", { parse_mode: "HTML" }); + + stream.update("hello again"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledWith(123, 17, "hello again", { + parse_mode: "HTML", + }); + }); + + it("enforces maxChars after renderText expansion", async () => { + const api = createMockDraftApi(); + const warn = vi.fn(); + const stream = createTelegramDraftStream({ + // oxlint-disable-next-line typescript/no-explicit-any + api: api as any, + chatId: 123, + maxChars: 100, + renderText: () => ({ text: `${"<".repeat(120)}`, parseMode: "HTML" }), + warn, + }); + + stream.update("short raw text"); + await stream.flush(); + + expect(api.sendMessage).not.toHaveBeenCalled(); + expect(api.editMessageText).not.toHaveBeenCalled(); + expect(warn).toHaveBeenCalledWith( + expect.stringContaining("telegram stream preview stopped (text length 127 > 100)"), + ); + }); }); describe("draft stream initial message debounce", () => { diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index 9d87358671d..a4a6b2db20c 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -15,6 +15,11 @@ export type TelegramDraftStream = { forceNewMessage: () => void; }; +type TelegramDraftPreview = { + text: string; + parseMode?: "HTML"; +}; + export function createTelegramDraftStream(params: { api: Bot["api"]; chatId: number; @@ -24,6 +29,8 @@ export function createTelegramDraftStream(params: { throttleMs?: number; /** Minimum chars before sending first message (debounce for push notifications) */ minInitialChars?: number; + /** Optional preview renderer (e.g. 
markdown -> HTML + parse mode). */ + renderText?: (text: string) => TelegramDraftPreview; log?: (message: string) => void; warn?: (message: string) => void; }): TelegramDraftStream { @@ -42,6 +49,7 @@ export function createTelegramDraftStream(params: { let streamMessageId: number | undefined; let lastSentText = ""; + let lastSentParseMode: "HTML" | undefined; let stopped = false; let isFinal = false; @@ -54,33 +62,52 @@ export function createTelegramDraftStream(params: { if (!trimmed) { return false; } - if (trimmed.length > maxChars) { + const rendered = params.renderText?.(trimmed) ?? { text: trimmed }; + const renderedText = rendered.text.trimEnd(); + const renderedParseMode = rendered.parseMode; + if (!renderedText) { + return false; + } + if (renderedText.length > maxChars) { // Telegram text messages/edits cap at 4096 chars. // Stop streaming once we exceed the cap to avoid repeated API failures. stopped = true; params.warn?.( - `telegram stream preview stopped (text length ${trimmed.length} > ${maxChars})`, + `telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`, ); return false; } - if (trimmed === lastSentText) { + if (renderedText === lastSentText && renderedParseMode === lastSentParseMode) { return true; } // Debounce first preview send for better push notification quality. 
if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) { - if (trimmed.length < minInitialChars) { + if (renderedText.length < minInitialChars) { return false; } } - lastSentText = trimmed; + lastSentText = renderedText; + lastSentParseMode = renderedParseMode; try { if (typeof streamMessageId === "number") { - await params.api.editMessageText(chatId, streamMessageId, trimmed); + if (renderedParseMode) { + await params.api.editMessageText(chatId, streamMessageId, renderedText, { + parse_mode: renderedParseMode, + }); + } else { + await params.api.editMessageText(chatId, streamMessageId, renderedText); + } return true; } - const sent = await params.api.sendMessage(chatId, trimmed, replyParams); + const sendParams = renderedParseMode + ? { + ...replyParams, + parse_mode: renderedParseMode, + } + : replyParams; + const sent = await params.api.sendMessage(chatId, renderedText, sendParams); const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { stopped = true; @@ -138,6 +165,7 @@ export function createTelegramDraftStream(params: { const forceNewMessage = () => { streamMessageId = undefined; lastSentText = ""; + lastSentParseMode = undefined; loop.resetPending(); }; diff --git a/src/telegram/reasoning-lane-coordinator.test.ts b/src/telegram/reasoning-lane-coordinator.test.ts new file mode 100644 index 00000000000..2dd3a94647f --- /dev/null +++ b/src/telegram/reasoning-lane-coordinator.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from "vitest"; +import { splitTelegramReasoningText } from "./reasoning-lane-coordinator.js"; + +describe("splitTelegramReasoningText", () => { + it("splits real tagged reasoning and answer", () => { + expect(splitTelegramReasoningText("exampleDone")).toEqual({ + reasoningText: "Reasoning:\n_example_", + answerText: "Done", + }); + }); + + it("ignores literal think tags inside inline code", () => { + const text = "Use `example` literally."; + 
expect(splitTelegramReasoningText(text)).toEqual({ + answerText: text, + }); + }); + + it("ignores literal think tags inside fenced code", () => { + const text = "```xml\nexample\n```"; + expect(splitTelegramReasoningText(text)).toEqual({ + answerText: text, + }); + }); +}); diff --git a/src/telegram/reasoning-lane-coordinator.ts b/src/telegram/reasoning-lane-coordinator.ts new file mode 100644 index 00000000000..045ab46b28f --- /dev/null +++ b/src/telegram/reasoning-lane-coordinator.ts @@ -0,0 +1,167 @@ +import { formatReasoningMessage } from "../agents/pi-embedded-utils.js"; +import type { ReplyPayload } from "../auto-reply/types.js"; +import { stripReasoningTagsFromText } from "../shared/text/reasoning-tags.js"; + +const REASONING_MESSAGE_PREFIX = "Reasoning:\n"; +const REASONING_TAG_PREFIXES = [ + "]*>/gi; + +interface CodeRegion { + start: number; + end: number; +} + +function findCodeRegions(text: string): CodeRegion[] { + const regions: CodeRegion[] = []; + + const fencedRe = /(^|\n)(```|~~~)[^\n]*\n[\s\S]*?(?:\n\2(?:\n|$)|$)/g; + for (const match of text.matchAll(fencedRe)) { + const start = (match.index ?? 0) + match[1].length; + regions.push({ start, end: start + match[0].length - match[1].length }); + } + + const inlineRe = /`+[^`]+`+/g; + for (const match of text.matchAll(inlineRe)) { + const start = match.index ?? 
0; + const end = start + match[0].length; + const insideFenced = regions.some((r) => start >= r.start && end <= r.end); + if (!insideFenced) { + regions.push({ start, end }); + } + } + + regions.sort((a, b) => a.start - b.start); + return regions; +} + +function isInsideCode(pos: number, regions: CodeRegion[]): boolean { + return regions.some((r) => pos >= r.start && pos < r.end); +} + +function extractThinkingFromTaggedStreamOutsideCode(text: string): string { + if (!text) { + return ""; + } + const codeRegions = findCodeRegions(text); + let result = ""; + let lastIndex = 0; + let inThinking = false; + THINKING_TAG_RE.lastIndex = 0; + for (const match of text.matchAll(THINKING_TAG_RE)) { + const idx = match.index ?? 0; + if (isInsideCode(idx, codeRegions)) { + continue; + } + if (inThinking) { + result += text.slice(lastIndex, idx); + } + const isClose = match[1] === "/"; + inThinking = !isClose; + lastIndex = idx + match[0].length; + } + if (inThinking) { + result += text.slice(lastIndex); + } + return result.trim(); +} + +function isPartialReasoningTagPrefix(text: string): boolean { + const trimmed = text.trimStart().toLowerCase(); + if (!trimmed.startsWith("<")) { + return false; + } + if (trimmed.includes(">")) { + return false; + } + return REASONING_TAG_PREFIXES.some((prefix) => prefix.startsWith(trimmed)); +} + +export type TelegramReasoningSplit = { + reasoningText?: string; + answerText?: string; +}; + +export function splitTelegramReasoningText(text?: string): TelegramReasoningSplit { + if (typeof text !== "string") { + return {}; + } + + const trimmed = text.trim(); + if (isPartialReasoningTagPrefix(trimmed)) { + return {}; + } + if ( + trimmed.startsWith(REASONING_MESSAGE_PREFIX) && + trimmed.length > REASONING_MESSAGE_PREFIX.length + ) { + return { reasoningText: trimmed }; + } + + const taggedReasoning = extractThinkingFromTaggedStreamOutsideCode(text); + const strippedAnswer = stripReasoningTagsFromText(text, { mode: "strict", trim: "both" }); + + 
if (!taggedReasoning && strippedAnswer === text) { + return { answerText: text }; + } + + const reasoningText = taggedReasoning ? formatReasoningMessage(taggedReasoning) : undefined; + const answerText = strippedAnswer || undefined; + return { reasoningText, answerText }; +} + +export type BufferedFinalAnswer = { + payload: ReplyPayload; + text: string; +}; + +export function createTelegramReasoningStepState() { + let reasoningStatus: "none" | "hinted" | "delivered" = "none"; + let bufferedFinalAnswer: BufferedFinalAnswer | undefined; + + const noteReasoningHint = () => { + if (reasoningStatus === "none") { + reasoningStatus = "hinted"; + } + }; + + const noteReasoningDelivered = () => { + reasoningStatus = "delivered"; + }; + + const shouldBufferFinalAnswer = () => { + return reasoningStatus === "hinted" && !bufferedFinalAnswer; + }; + + const bufferFinalAnswer = (value: BufferedFinalAnswer) => { + bufferedFinalAnswer = value; + }; + + const takeBufferedFinalAnswer = (): BufferedFinalAnswer | undefined => { + const value = bufferedFinalAnswer; + bufferedFinalAnswer = undefined; + return value; + }; + + const resetForNextStep = () => { + reasoningStatus = "none"; + bufferedFinalAnswer = undefined; + }; + + return { + noteReasoningHint, + noteReasoningDelivered, + shouldBufferFinalAnswer, + bufferFinalAnswer, + takeBufferedFinalAnswer, + resetForNextStep, + }; +}