From e7277b4e3a4bebc21ec603495a2604faf4c53c17 Mon Sep 17 00:00:00 2001 From: Super Zheng Date: Fri, 8 May 2026 21:08:21 +0800 Subject: [PATCH] refactor(agents): preserve raw reasoning stream and push formatting to edge (#78397) Merged via squash. Prepared head SHA: bb56f7ee000d6ea3334d378d6f5f209e086f7201 Co-authored-by: medns <1575008+medns@users.noreply.github.com> Co-authored-by: odysseus0 <8635094+odysseus0@users.noreply.github.com> Reviewed-by: @odysseus0 --- CHANGELOG.md | 1 + .../monitor/message-handler.process.test.ts | 10 ++-- .../src/monitor/message-handler.process.ts | 11 ++++- .../feishu/src/reply-dispatcher.test.ts | 9 ++-- extensions/feishu/src/reply-dispatcher.ts | 7 ++- .../src/mattermost/reply-delivery.test.ts | 2 +- .../dispatch.preview-fallback.test.ts | 2 +- .../telegram/src/bot-message-dispatch.ts | 15 +++--- .../src/reasoning-lane-coordinator.ts | 9 +++- .../src/auto-reply/deliver-reply.test.ts | 2 +- .../monitor/inbound-dispatch.test.ts | 2 +- src/agents/pi-embedded-runner/run/payloads.ts | 3 +- ...pi-embedded-subscribe.handlers.messages.ts | 13 +++-- ...soning-as-separate-message-enabled.test.ts | 2 +- ...session.subscribeembeddedpisession.test.ts | 47 ++++++++++++++++++- src/agents/pi-embedded-subscribe.ts | 17 ++++--- .../reply/dispatch-from-config.test.ts | 6 +-- src/auto-reply/reply/reply-utils.test.ts | 4 +- src/auto-reply/reply/route-reply.test.ts | 2 +- src/daemon/service-env.test.ts | 4 +- .../server-methods/chat-webchat-media.test.ts | 2 +- .../chat.directive-tags.test.ts | 2 +- ...tbeat-runner.returns-default-unset.test.ts | 14 ++++-- src/infra/heartbeat-runner.ts | 23 +++++++-- 24 files changed, 146 insertions(+), 63 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab4d0a2cfa9..3a7bbe85929 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -648,6 +648,7 @@ Docs: https://docs.openclaw.ai - Browser/downloads: route explicit and managed browser download output directories through `fs-safe` validation before staging final files, so symlinked output roots are rejected before writes. (#78780) Thanks @jesse-merhi. - Agents/PI: skip the idle wait during aborted embedded-run cleanup, so stopped or timed-out runs clear pending tool state and release the session lock promptly. (#74919) Thanks @medns. - Agents/current-time: split UTC into a separate `Reference UTC:` prompt line so local `Current time:` stays anchored to the user's timezone. (#42654) Thanks @chencheng-li. +- Agents/reasoning: keep embedded reasoning deltas raw for correct same-line streaming while preserving formatted Telegram, Feishu, Discord, and heartbeat delivery at the channel edge. (#78397) Thanks @medns. ## 2026.5.3-1 diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 3f738cc3229..16343b8c7b4 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -1725,8 +1725,8 @@ describe("processDiscordMessage draft streaming", () => { kind: "analysis", title: "Reasoning", }); - await params?.replyOptions?.onReasoningStream?.({ text: "Reading " }); - await params?.replyOptions?.onReasoningStream?.({ text: "the event projector" }); + await params?.replyOptions?.onReasoningStream?.({ text: "Reading" }); + await params?.replyOptions?.onReasoningStream?.({ text: "Reading the event projector" }); return createNoQueuedDispatchResult(); }); @@ -1744,7 +1744,7 @@ describe("processDiscordMessage draft streaming", () => { await runProcessDiscordMessage(ctx); expect(draftStream.update).toHaveBeenCalledWith( - "Clawing...\n🛠️ Exec\n• Reading the event projector", + "Clawing...\n🛠️ Exec\n• _Reading the event projector_", ); expect(draftStream.update).not.toHaveBeenCalledWith(expect.stringContaining("Reasoning")); }); @@ -1754,9 +1754,9 @@ describe("processDiscordMessage draft streaming", () => { dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { await params?.replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); - await params?.replyOptions?.onReasoningStream?.({ text: "Reasoning:\n_Checking files_" }); + await params?.replyOptions?.onReasoningStream?.({ text: "Checking files" }); await params?.replyOptions?.onReasoningStream?.({ - text: "Reasoning:\n_Checking files and tests_", + text: "Checking files and tests", }); return createNoQueuedDispatchResult(); }); diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index 2810dc5f5bf..65324c88774 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -1,4 +1,8 @@ -import { resolveAckReaction, resolveHumanDelayConfig } from "openclaw/plugin-sdk/agent-runtime"; +import { + formatReasoningMessage, + resolveAckReaction, + resolveHumanDelayConfig, +} from "openclaw/plugin-sdk/agent-runtime"; import { createStatusReactionController, DEFAULT_TIMING, @@ -665,7 +669,10 @@ export async function processDiscordMessage( draftPreview.suppressDefaultToolProgressMessages ? true : undefined, onReasoningStream: async (payload) => { await statusReactions.setThinking(); - await draftPreview.pushReasoningProgress(payload?.text); + const formattedText = payload?.text + ? formatReasoningMessage(payload.text) + : undefined; + await draftPreview.pushReasoningProgress(formattedText); }, onToolStart: async (payload) => { if (isProcessAborted(abortSignal)) { diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index 85d36c61721..66560972291 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -888,10 +888,9 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); await options.onReplyStart?.(); - // Core agent sends pre-formatted text from formatReasoningMessage - result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thinking step 1_" }); + result.replyOptions.onReasoningStream?.({ text: "thinking step 1" }); result.replyOptions.onReasoningStream?.({ - text: "Reasoning:\n_thinking step 1_\n_step 2_", + text: "thinking step 1\nstep 2", }); result.replyOptions.onPartialReply?.({ text: "answer part" }); result.replyOptions.onReasoningEnd?.(); @@ -967,7 +966,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); await options.onReplyStart?.(); - result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_deep thought_" }); + result.replyOptions.onReasoningStream?.({ text: "deep thought" }); result.replyOptions.onReasoningEnd?.(); await options.onIdle?.(); @@ -1005,7 +1004,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); await options.onReplyStart?.(); - result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thought_" }); + result.replyOptions.onReasoningStream?.({ text: "thought" }); result.replyOptions.onReasoningEnd?.(); await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" }); await options.onIdle?.(); diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 2c11c3e546d..9c56f9fa17e 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -1,3 +1,4 @@ +import { formatReasoningMessage } from "openclaw/plugin-sdk/agent-runtime"; import { logTypingFailure } from "openclaw/plugin-sdk/channel-feedback"; import { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message"; import { @@ -522,7 +523,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP await typingCallbacks?.onReplyStart?.(); }, deliver: async (payload: ReplyPayload, info) => { - const reply = resolveSendableOutboundReplyParts(payload); + const payloadText = + payload.isReasoning && payload.text ? formatReasoningMessage(payload.text) : payload.text; + const reply = resolveSendableOutboundReplyParts({ ...payload, text: payloadText }); const text = reply.text; const hasText = reply.hasText; const hasMedia = reply.hasMedia; @@ -694,7 +697,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP return; } startStreaming(); - queueReasoningUpdate(payload.text); + queueReasoningUpdate(formatReasoningMessage(payload.text)); } : undefined, onReasoningEnd: reasoningPreviewEnabled ? () => {} : undefined, diff --git a/extensions/mattermost/src/mattermost/reply-delivery.test.ts b/extensions/mattermost/src/mattermost/reply-delivery.test.ts index 7714121c855..4d2828e20b1 100644 --- a/extensions/mattermost/src/mattermost/reply-delivery.test.ts +++ b/extensions/mattermost/src/mattermost/reply-delivery.test.ts @@ -46,7 +46,7 @@ describe("deliverMattermostReplyPayload", () => { await deliverMattermostReplyPayload({ core, cfg, - payload: { text: "Reasoning:\n_hidden_", isReasoning: true }, + payload: { text: "hidden", isReasoning: true }, to: "channel:town-square", accountId: "default", agentId: "agent-1", diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 689ee830b07..266df7bfd0b 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -971,7 +971,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { it("suppresses reasoning payloads before Slack native streaming delivery", async () => { mockedNativeStreaming = true; mockedDispatchSequence = [ - { kind: "block", payload: { text: "Reasoning:\n_hidden_", isReasoning: true } }, + { kind: "block", payload: { text: "hidden", isReasoning: true } }, { kind: "final", payload: { text: FINAL_REPLY_TEXT } }, ]; diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 5cb0388ca5c..e5d9fb373c9 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -574,8 +574,11 @@ export const dispatchTelegramMessage = async ({ segments: SplitLaneSegment[]; suppressedReasoningOnly: boolean; }; - const splitTextIntoLaneSegments = (text?: string): SplitLaneSegmentsResult => { - const split = splitTelegramReasoningText(text); + const splitTextIntoLaneSegments = ( + text?: string, + isReasoning?: boolean, + ): SplitLaneSegmentsResult => { + const split = splitTelegramReasoningText(text, isReasoning); const segments: SplitLaneSegment[] = []; const suppressReasoning = resolvedReasoningLevel === "off"; if (split.reasoningText && !suppressReasoning) { @@ -637,8 +640,8 @@ export const dispatchTelegramMessage = async ({ lane.lastPartialText = text; laneStream.update(text); }; - const ingestDraftLaneSegments = async (text: string | undefined) => { - const split = splitTextIntoLaneSegments(text); + const ingestDraftLaneSegments = async (text: string | undefined, isReasoning?: boolean) => { + const split = splitTextIntoLaneSegments(text, isReasoning); for (const segment of split.segments) { if (segment.lane === "answer") { await prepareAnswerLaneForText(); @@ -1037,7 +1040,7 @@ export const dispatchTelegramMessage = async ({ | { buttons?: TelegramInlineButtons } | undefined )?.buttons; - const split = splitTextIntoLaneSegments(payload.text); + const split = splitTextIntoLaneSegments(payload.text, payload.isReasoning); const segments = split.segments; const reply = resolveSendableOutboundReplyParts(payload); const _hasMedia = reply.hasMedia; @@ -1192,7 +1195,7 @@ export const dispatchTelegramMessage = async ({ resetDraftLaneState(reasoningLane); splitReasoningOnNextStream = false; } - await ingestDraftLaneSegments(payload.text); + await ingestDraftLaneSegments(payload.text, true); }) : undefined, onAssistantMessageStart: answerLane.stream diff --git a/extensions/telegram/src/reasoning-lane-coordinator.ts b/extensions/telegram/src/reasoning-lane-coordinator.ts index 42acc89fb7f..b18e5f92db3 100644 --- a/extensions/telegram/src/reasoning-lane-coordinator.ts +++ b/extensions/telegram/src/reasoning-lane-coordinator.ts @@ -62,7 +62,10 @@ type TelegramReasoningSplit = { answerText?: string; }; -export function splitTelegramReasoningText(text?: string): TelegramReasoningSplit { +export function splitTelegramReasoningText( + text?: string, + isReasoning?: boolean, +): TelegramReasoningSplit { if (typeof text !== "string") { return {}; } @@ -81,6 +84,10 @@ export function splitTelegramReasoningText(text?: string): TelegramReasoningSpli const taggedReasoning = extractThinkingFromTaggedStreamOutsideCode(text); const strippedAnswer = stripReasoningTagsFromText(text, { mode: "strict", trim: "both" }); + if (isReasoning === true) { + return { reasoningText: formatReasoningMessage(taggedReasoning || strippedAnswer || text) }; + } + if (!taggedReasoning && strippedAnswer === text) { return { answerText: text }; } diff --git a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts index e745bbc7207..11cb361f0ac 100644 --- a/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts +++ b/extensions/whatsapp/src/auto-reply/deliver-reply.test.ts @@ -161,7 +161,7 @@ describe("deliverWebReply", () => { }); it("suppresses payloads flagged as reasoning", async () => { - await expectReplySuppressed({ text: "Reasoning:\n_hidden_", isReasoning: true }); + await expectReplySuppressed({ text: "hidden", isReasoning: true }); }); it("suppresses payloads that start with reasoning prefix text", async () => { diff --git a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts index 0a26be224b5..bea5848990e 100644 --- a/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts +++ b/extensions/whatsapp/src/auto-reply/monitor/inbound-dispatch.test.ts @@ -699,7 +699,7 @@ describe("whatsapp inbound dispatch", () => { const deliver = getCapturedDeliver(); expect(deliver).toBeTypeOf("function"); - await deliver?.({ text: "Reasoning:\n_hidden_", isReasoning: true }, { kind: "block" }); + await deliver?.({ text: "hidden", isReasoning: true }, { kind: "block" }); await deliver?.( { text: "🧹 Compacting context...", isCompactionNotice: true }, { kind: "block" }, diff --git a/src/agents/pi-embedded-runner/run/payloads.ts b/src/agents/pi-embedded-runner/run/payloads.ts index 2499d83b9cd..ad7e014ebef 100644 --- a/src/agents/pi-embedded-runner/run/payloads.ts +++ b/src/agents/pi-embedded-runner/run/payloads.ts @@ -27,7 +27,6 @@ import type { ToolResultFormat } from "../../pi-embedded-subscribe.shared-types. import { extractAssistantThinking, extractAssistantVisibleText, - formatReasoningMessage, } from "../../pi-embedded-utils.js"; import { isExecLikeToolName, type ToolErrorSummary } from "../../tool-error-summary.js"; import { isLikelyMutatingToolName } from "../../tool-mutation.js"; @@ -283,7 +282,7 @@ export function buildEmbeddedRunPayloads(params: { const reasoningText = suppressAssistantArtifacts ? "" : params.lastAssistant && params.reasoningLevel === "on" && params.thinkingLevel !== "off" - ? formatReasoningMessage(extractAssistantThinking(params.lastAssistant)) + ? extractAssistantThinking(params.lastAssistant) : ""; if (reasoningText) { replyItems.push({ text: reasoningText, isReasoning: true }); diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index a1c56e085ef..625e735bc15 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -34,7 +34,6 @@ import { extractAssistantVisibleText, extractThinkingFromTaggedStream, extractThinkingFromTaggedText, - formatReasoningMessage, promoteThinkingTagsToBlocks, } from "./pi-embedded-utils.js"; @@ -692,7 +691,7 @@ export function handleMessageEnd( ctx.state.includeReasoning || ctx.state.streamReasoning ? extractAssistantThinking(assistantMessage) || extractThinkingFromTaggedText(rawText) : ""; - const formattedReasoning = rawThinking ? formatReasoningMessage(rawThinking) : ""; + const trimmedReasoning = rawThinking ? rawThinking.trim() : ""; const trimmedText = text.trim(); const parsedText = trimmedText ? parseReplyDirectives(splitTrailingDirective(trimmedText, { final: true }).text) @@ -770,18 +769,18 @@ export function handleMessageEnd( !ctx.params.silentExpected && !suppressDeterministicApprovalOutput && ctx.state.includeReasoning && - formattedReasoning && + trimmedReasoning && onBlockReply && - formattedReasoning !== ctx.state.lastReasoningSent, + trimmedReasoning !== ctx.state.lastReasoningSent, ); const shouldEmitReasoningBeforeAnswer = shouldEmitReasoning && ctx.state.blockReplyBreak === "message_end" && !addedDuringMessage; const maybeEmitReasoning = () => { - if (!shouldEmitReasoning || !formattedReasoning) { + if (!shouldEmitReasoning || !trimmedReasoning) { return; } - ctx.state.lastReasoningSent = formattedReasoning; - ctx.emitBlockReply({ text: formattedReasoning, isReasoning: true }); + ctx.state.lastReasoningSent = trimmedReasoning; + ctx.emitBlockReply({ text: trimmedReasoning, isReasoning: true }); }; if (shouldEmitReasoningBeforeAnswer) { diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.test.ts index 3bceb2dd171..82a6e375b8e 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-reasoning-as-separate-message-enabled.test.ts @@ -26,7 +26,7 @@ describe("subscribeEmbeddedPiSession", () => { function expectReasoningAndAnswerCalls(onBlockReply: ReturnType) { expect(onBlockReply).toHaveBeenCalledTimes(2); - expect(onBlockReply.mock.calls[0][0].text).toBe("Reasoning:\n_Because it helps_"); + expect(onBlockReply.mock.calls[0][0].text).toBe("Because it helps"); expect(onBlockReply.mock.calls[1][0].text).toBe("Final answer"); } diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index cdb23c295ff..26c4e845a7d 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -1,5 +1,6 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; +import * as agentEvents from "../infra/agent-events.js"; import { THINKING_TAG_CASES, createSubscribedSessionHarness, @@ -216,7 +217,7 @@ describe("subscribeEmbeddedPiSession", () => { const streamTexts = onReasoningStream.mock.calls .map((call) => call[0]?.text) .filter((value): value is string => typeof value === "string"); - expect(streamTexts.at(-1)).toBe("Reasoning:\n_Because it helps_"); + expect(streamTexts.at(-1)).toBe("Because it helps"); expect(assistantMessage.content).toEqual([ { type: "thinking", thinking: "Because it helps" }, @@ -757,10 +758,52 @@ describe("subscribeEmbeddedPiSession", () => { const streamTexts = onReasoningStream.mock.calls .map((call) => call[0]?.text) .filter((value): value is string => typeof value === "string"); - expect(streamTexts.at(-1)).toBe("Reasoning:\n_Checking files done_"); + expect(streamTexts.at(-1)).toBe("Checking files done"); expect(onReasoningEnd).toHaveBeenCalledTimes(1); }); + it("extracts correct reasoning delta for incremental stream updates", () => { + const emitAgentEventSpy = vi.spyOn(agentEvents, "emitAgentEvent").mockImplementation(() => {}); + const { emit } = createSubscribedHarness({ + runId: "run", + reasoningMode: "stream", + onReasoningStream: vi.fn(), + }); + + emit({ + type: "message_update", + message: { + role: "assistant", + content: [{ type: "thinking", thinking: "Step 1" }], + }, + assistantMessageEvent: { + type: "thinking_delta", + delta: "Step 1", + }, + }); + + emit({ + type: "message_update", + message: { + role: "assistant", + content: [{ type: "thinking", thinking: "Step 1 and Step 2" }], + }, + assistantMessageEvent: { + type: "thinking_delta", + delta: " and Step 2", + }, + }); + + const thinkingEvents = emitAgentEventSpy.mock.calls + .map((call) => call[0]) + .filter((evt) => evt?.stream === "thinking"); + + expect(thinkingEvents.length).toBe(2); + expect(thinkingEvents[0]?.data?.delta).toBe("Step 1"); + expect(thinkingEvents[1]?.data?.delta).toBe(" and Step 2"); + emitAgentEventSpy.mockRestore(); + }); + it("emits reasoning end once when native and tagged reasoning end overlap", () => { const onReasoningEnd = vi.fn(); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index b4431a0b6ab..51aaad9dc71 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -36,7 +36,6 @@ import { isPromiseLike } from "./pi-embedded-subscribe.promise.js"; import { filterToolResultMediaUrls } from "./pi-embedded-subscribe.tools.js"; import type { SubscribeEmbeddedPiSessionParams } from "./pi-embedded-subscribe.types.js"; import { - formatReasoningMessage, stripDowngradedToolCallText, THINKING_TAG_SCAN_RE, } from "./pi-embedded-utils.js"; @@ -831,31 +830,31 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar if (!state.streamReasoning || !params.onReasoningStream) { return; } - const formatted = formatReasoningMessage(text); - if (!formatted) { + const trimmed = text.trim(); + if (!trimmed) { return; } - if (formatted === state.lastStreamedReasoning) { + if (trimmed === state.lastStreamedReasoning) { return; } // Compute delta: new text since the last emitted reasoning. - // Guard against non-prefix changes (e.g. trim/format altering earlier content). + // Guard against non-prefix changes (e.g. trim altering earlier content). const prior = state.lastStreamedReasoning ?? ""; - const delta = formatted.startsWith(prior) ? formatted.slice(prior.length) : formatted; - state.lastStreamedReasoning = formatted; + const delta = trimmed.startsWith(prior) ? trimmed.slice(prior.length) : trimmed; + state.lastStreamedReasoning = trimmed; // Broadcast thinking event to WebSocket clients in real-time emitAgentEvent({ runId: params.runId, stream: "thinking", data: { - text: formatted, + text: trimmed, delta, }, }); void params.onReasoningStream({ - text: formatted, + text: trimmed, }); }; diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index df747c9c61e..7fb857b4a02 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -3911,7 +3911,7 @@ describe("dispatchReplyFromConfig", () => { const ctx = buildTestCtx({ Provider: "whatsapp" }); const replyResolver = async () => [ - { text: "Reasoning:\n_thinking..._", isReasoning: true }, + { text: "thinking...", isReasoning: true }, { text: "The answer is 42" }, ] satisfies ReplyPayload[]; await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver }); @@ -3930,7 +3930,7 @@ describe("dispatchReplyFromConfig", () => { opts?: GetReplyOptions, ): Promise => { // Simulate block reply with reasoning payload - await opts?.onBlockReply?.({ text: "Reasoning:\n_thinking..._", isReasoning: true }); + await opts?.onBlockReply?.({ text: "thinking...", isReasoning: true }); await opts?.onBlockReply?.({ text: "The answer is 42" }); return { text: "The answer is 42" }; }; @@ -3944,7 +3944,7 @@ describe("dispatchReplyFromConfig", () => { }, ); await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver }); - expect(blockReplySentTexts).not.toContain("Reasoning:\n_thinking..._"); + expect(blockReplySentTexts).not.toContain("thinking..."); expect(blockReplySentTexts).toContain("The answer is 42"); }); diff --git a/src/auto-reply/reply/reply-utils.test.ts b/src/auto-reply/reply/reply-utils.test.ts index 91b438462b3..b09d354ced9 100644 --- a/src/auto-reply/reply/reply-utils.test.ts +++ b/src/auto-reply/reply/reply-utils.test.ts @@ -850,12 +850,12 @@ describe("block reply coalescer", () => { }, }); - coalescer.enqueue({ text: "Reasoning:\n_hidden_", isReasoning: true }); + coalescer.enqueue({ text: "hidden", isReasoning: true }); coalescer.enqueue({ text: "Visible answer" }); await coalescer.flush({ force: true }); expect(flushes).toEqual([ - { text: "Reasoning:\n_hidden_", isReasoning: true }, + { text: "hidden", isReasoning: true }, { text: "Visible answer", isReasoning: undefined }, ]); coalescer.stop(); diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index 444bfc8671f..ffcb93cf3d4 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -204,7 +204,7 @@ describe("routeReply", () => { }); it("suppresses reasoning payloads", async () => { - await expectSlackNoDelivery({ text: "Reasoning:\n_step_", isReasoning: true }); + await expectSlackNoDelivery({ text: "step", isReasoning: true }); }); it("drops silent token payloads", async () => { diff --git a/src/daemon/service-env.test.ts b/src/daemon/service-env.test.ts index 3a709063304..c9691e57a89 100644 --- a/src/daemon/service-env.test.ts +++ b/src/daemon/service-env.test.ts @@ -964,7 +964,7 @@ describe("shared Node TLS env defaults focused", () => { }); it("defaults NODE_EXTRA_CA_CERTS on Linux when NVM_DIR is set", () => { - const expected = resolveLinuxSystemCaBundle(); + const expected = resolveLinuxSystemCaBundle({ platform: "linux" }); const env = buildServiceEnvironment({ env: { HOME: "/home/user", NVM_DIR: "/home/user/.nvm" }, port: 18789, @@ -975,7 +975,7 @@ describe("shared Node TLS env defaults focused", () => { }); it("defaults NODE_EXTRA_CA_CERTS on Linux when execPath is under nvm", () => { - const expected = resolveLinuxSystemCaBundle(); + const expected = resolveLinuxSystemCaBundle({ platform: "linux" }); const env = buildNodeServiceEnvironment({ env: { HOME: "/home/user" }, platform: "linux", diff --git a/src/gateway/server-methods/chat-webchat-media.test.ts b/src/gateway/server-methods/chat-webchat-media.test.ts index af62b1a27db..c83cd97bcbd 100644 --- a/src/gateway/server-methods/chat-webchat-media.test.ts +++ b/src/gateway/server-methods/chat-webchat-media.test.ts @@ -52,7 +52,7 @@ describe("buildWebchatAudioContentBlocksFromReplyPayloads", () => { const blocks = await buildWebchatAudioContentBlocksFromReplyPayloads( [ { - text: "Reasoning:\n_step_", + text: "step", mediaUrl: audioPath, trustedLocalMedia: true, isReasoning: true, diff --git a/src/gateway/server-methods/chat.directive-tags.test.ts b/src/gateway/server-methods/chat.directive-tags.test.ts index bb54099cc85..22e1c23064a 100644 --- a/src/gateway/server-methods/chat.directive-tags.test.ts +++ b/src/gateway/server-methods/chat.directive-tags.test.ts @@ -927,7 +927,7 @@ describe("chat directive tag stripping for non-streaming final payloads", () => mockState.dispatchedReplies = [ { kind: "final", - payload: { text: "Reasoning:\n_step_", isReasoning: true }, + payload: { text: "step", isReasoning: true }, }, { kind: "final", diff --git a/src/infra/heartbeat-runner.returns-default-unset.test.ts b/src/infra/heartbeat-runner.returns-default-unset.test.ts index 5a790370f53..c60762a9ff7 100644 --- a/src/infra/heartbeat-runner.returns-default-unset.test.ts +++ b/src/infra/heartbeat-runner.returns-default-unset.test.ts @@ -1134,19 +1134,25 @@ describe("runHeartbeatOnce", () => { typedCases<{ name: string; caseDir: string; - replies: Array<{ text: string }>; + replies: Array<{ text: string; isReasoning?: boolean }>; expectedTexts: string[]; }>([ { - name: "reasoning + final payload", + name: "legacy-prefixed reasoning + final payload", caseDir: "hb-reasoning", replies: [{ text: "Reasoning:\n_Because it helps_" }, { text: "Final alert" }], expectedTexts: ["Reasoning:\n_Because it helps_", "Final alert"], }, { - name: "reasoning + HEARTBEAT_OK", + name: "raw flagged reasoning + final payload", + caseDir: "hb-reasoning-raw", + replies: [{ text: "Because it helps", isReasoning: true }, { text: "Final alert" }], + expectedTexts: ["Reasoning:\n_Because it helps_", "Final alert"], + }, + { + name: "raw flagged reasoning + HEARTBEAT_OK", caseDir: "hb-reasoning-heartbeat-ok", - replies: [{ text: "Reasoning:\n_Because it helps_" }, { text: "HEARTBEAT_OK" }], + replies: [{ text: "Because it helps", isReasoning: true }, { text: "HEARTBEAT_OK" }], expectedTexts: ["Reasoning:\n_Because it helps_"], }, ]), diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 1e0dd20c75e..a82138c1f50 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -16,6 +16,7 @@ import { appendCronStyleCurrentTimeLine } from "../agents/current-time.js"; import { isNestedAgentLane } from "../agents/lanes.js"; import { resolveModelRefFromString, type ModelRef } from "../agents/model-selection.js"; import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js"; +import { formatReasoningMessage } from "../agents/pi-embedded-utils.js"; import { DEFAULT_HEARTBEAT_FILENAME } from "../agents/workspace.js"; import { resolveHeartbeatReplyPayload } from "../auto-reply/heartbeat-reply-payload.js"; import { @@ -639,10 +640,26 @@ function resolveHeartbeatReasoningPayloads( replyResult: ReplyPayload | ReplyPayload[] | undefined, ): ReplyPayload[] { const payloads = Array.isArray(replyResult) ? replyResult : replyResult ? [replyResult] : []; - return payloads.filter((payload) => { + const reasoningPayloads: ReplyPayload[] = []; + for (const payload of payloads) { const text = typeof payload.text === "string" ? payload.text : ""; - return text.trimStart().startsWith("Reasoning:"); - }); + const hasLegacyReasoningPrefix = text.trimStart().startsWith("Reasoning:"); + if (payload.isReasoning !== true && !hasLegacyReasoningPrefix) { + continue; + } + + const formattedText = hasLegacyReasoningPrefix ? text : formatReasoningMessage(text); + if (!formattedText.trim()) { + continue; + } + + const deliverablePayload: ReplyPayload = { ...payload, text: formattedText }; + delete deliverablePayload.isReasoning; + delete deliverablePayload.mediaUrl; + delete deliverablePayload.mediaUrls; + reasoningPayloads.push(deliverablePayload); + } + return reasoningPayloads; } async function restoreHeartbeatUpdatedAt(params: {