From 392897304cc57b96c74e52b0457d141332037948 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 4 May 2026 00:18:12 +0100 Subject: [PATCH] fix(channels): delay progress drafts until work is visible --- docs/concepts/progress-drafts.md | 23 ++-- .../monitor/message-handler.draft-preview.ts | 112 ++++++++++++------ .../monitor/message-handler.process.test.ts | 15 ++- .../src/monitor/message-handler.process.ts | 21 ++-- .../matrix/src/matrix/monitor/handler.test.ts | 9 +- .../matrix/src/matrix/monitor/handler.ts | 97 +++++++++------ .../msteams/src/reply-dispatcher.test.ts | 37 ++++-- extensions/msteams/src/reply-dispatcher.ts | 27 +++++ .../src/reply-stream-controller.test.ts | 3 +- .../msteams/src/reply-stream-controller.ts | 98 +++++++++------ .../dispatch.preview-fallback.test.ts | 26 ++++ .../src/monitor/message-handler/dispatch.ts | 90 ++++++++++---- .../telegram/src/bot-message-dispatch.test.ts | 16 ++- .../telegram/src/bot-message-dispatch.ts | 106 +++++++++++------ src/plugin-sdk/channel-streaming.test.ts | 44 ++++++- src/plugin-sdk/channel-streaming.ts | 91 ++++++++++++++ 16 files changed, 596 insertions(+), 219 deletions(-) diff --git a/docs/concepts/progress-drafts.md b/docs/concepts/progress-drafts.md index d0f16b4f14e..ee13f79e95f 100644 --- a/docs/concepts/progress-drafts.md +++ b/docs/concepts/progress-drafts.md @@ -12,9 +12,9 @@ Progress drafts make long-running agent turns feel alive in chat without turning the conversation into a stack of temporary status replies. When progress drafts are enabled, OpenClaw creates one visible work-in-progress -message, updates it while the agent reads, plans, calls tools, or waits for -approval, and then turns that draft into the final answer when the channel can -do that safely. +message only after the turn proves it is doing real work, updates it while the +agent reads, plans, calls tools, or waits for approval, and then turns that draft +into the final answer when the channel can do that safely. ```text Shelling... @@ -42,9 +42,10 @@ Enable progress drafts per channel with `streaming.mode: "progress"`: } ``` -That is usually enough. OpenClaw will pick an automatic one-word label, add -compact progress lines while useful work happens, and suppress duplicate -standalone progress chatter for that turn. +That is usually enough. OpenClaw will pick an automatic one-word label, wait +until work lasts at least five seconds or emits a second work event, add compact +progress lines while useful work happens, and suppress duplicate standalone +progress chatter for that turn. ## What Users See @@ -55,10 +56,12 @@ A progress draft has two parts: | Label | A short title such as `Thinking...` or `Shelling...`. | | Progress lines | Compact run updates such as tool calls, task steps, or approvals. | -The label appears immediately when the agent starts replying. Progress lines are -added only when the agent emits useful work updates. The final answer replaces -the draft when possible; otherwise OpenClaw sends the final answer normally and -cleans up or stops updating the draft according to the channel's transport. +The label appears after the agent starts meaningful work and either remains busy +for five seconds or emits a second work event. Plain text-only replies do not +show a progress draft. Progress lines are added only when the agent emits useful +work updates. The final answer replaces the draft when possible; otherwise +OpenClaw sends the final answer normally and cleans up or stops updating the +draft according to the channel's transport. ## Choose A Mode diff --git a/extensions/discord/src/monitor/message-handler.draft-preview.ts b/extensions/discord/src/monitor/message-handler.draft-preview.ts index f6a973a6e9f..8e327736c0c 100644 --- a/extensions/discord/src/monitor/message-handler.draft-preview.ts +++ b/extensions/discord/src/monitor/message-handler.draft-preview.ts @@ -1,6 +1,8 @@ import { EmbeddedBlockChunker } from "openclaw/plugin-sdk/agent-runtime"; import { + createChannelProgressDraftGate, formatChannelProgressDraftText, + isChannelProgressDraftWorkToolName, resolveChannelProgressDraftMaxLines, resolveChannelStreamingBlockEnabled, resolveChannelStreamingPreviewToolProgress, @@ -70,7 +72,6 @@ export function createDiscordDraftPreviewController(params: { let hasStreamedMessage = false; let finalizedViaPreviewMessage = false; let finalDeliveryHandled = false; - let progressDraftStarted = false; const previewToolProgressEnabled = Boolean(draftStream) && resolveChannelStreamingPreviewToolProgress(params.discordConfig); const suppressDefaultToolProgressMessages = @@ -83,6 +84,32 @@ export function createDiscordDraftPreviewController(params: { let previewToolProgressLines: string[] = []; const progressSeed = `${params.accountId}:${params.deliverChannelId}`; + const renderProgressDraft = async (options?: { flush?: boolean }) => { + if (!draftStream || discordStreamMode !== "progress") { + return; + } + const previewText = formatChannelProgressDraftText({ + entry: params.discordConfig, + lines: previewToolProgressLines, + seed: progressSeed, + }); + if (!previewText || previewText === lastPartialText) { + return; + } + lastPartialText = previewText; + draftText = previewText; + hasStreamedMessage = true; + draftChunker?.reset(); + draftStream.update(previewText); + if (options?.flush) { + await draftStream.flush(); + } + }; + + const progressDraftGate = createChannelProgressDraftGate({ + onStart: () => renderProgressDraft({ flush: true }), + }); + const resetProgressState = () => { lastPartialText = ""; draftText = ""; @@ -106,6 +133,9 @@ export function createDiscordDraftPreviewController(params: { get isProgressMode() { return discordStreamMode === "progress"; }, + get hasProgressDraftStarted() { + return progressDraftGate.hasStarted; + }, get finalizedViaPreviewMessage() { return finalizedViaPreviewMessage; }, @@ -120,50 +150,55 @@ export function createDiscordDraftPreviewController(params: { if (!draftStream || discordStreamMode !== "progress") { return; } - if (progressDraftStarted) { - return; - } - const previewText = formatChannelProgressDraftText({ - entry: params.discordConfig, - lines: [], - seed: progressSeed, - }); - if (!previewText || previewText === lastPartialText) { - return; - } - progressDraftStarted = true; - lastPartialText = previewText; - draftText = previewText; - hasStreamedMessage = true; - draftChunker?.reset(); - draftStream.update(previewText); - await draftStream.flush(); + await progressDraftGate.startNow(); }, - pushToolProgress(line?: string) { - if (!draftStream || !previewToolProgressEnabled || previewToolProgressSuppressed) { + async pushToolProgress(line?: string, options?: { toolName?: string }) { + if (!draftStream) { + return; + } + if ( + options?.toolName !== undefined && + !isChannelProgressDraftWorkToolName(options.toolName) + ) { return; } const normalized = line?.replace(/\s+/g, " ").trim(); - if (!normalized) { + if (discordStreamMode !== "progress") { + if (!previewToolProgressEnabled || previewToolProgressSuppressed || !normalized) { + return; + } + const previous = previewToolProgressLines.at(-1); + if (previous === normalized) { + return; + } + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(params.discordConfig), + ); + const previewText = formatChannelProgressDraftText({ + entry: params.discordConfig, + lines: previewToolProgressLines, + seed: progressSeed, + }); + lastPartialText = previewText; + draftText = previewText; + hasStreamedMessage = true; + draftChunker?.reset(); + draftStream.update(previewText); return; } - const previous = previewToolProgressLines.at(-1); - if (previous === normalized) { - return; + if (previewToolProgressEnabled && !previewToolProgressSuppressed && normalized) { + const previous = previewToolProgressLines.at(-1); + if (previous !== normalized) { + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(params.discordConfig), + ); + } + } + const alreadyStarted = progressDraftGate.hasStarted; + await progressDraftGate.noteWork(); + if (alreadyStarted && progressDraftGate.hasStarted) { + await renderProgressDraft(); } - previewToolProgressLines = [...previewToolProgressLines, normalized].slice( - -resolveChannelProgressDraftMaxLines(params.discordConfig), - ); - const previewText = formatChannelProgressDraftText({ - entry: params.discordConfig, - lines: previewToolProgressLines, - seed: progressSeed, - }); - lastPartialText = previewText; - draftText = previewText; - hasStreamedMessage = true; - draftChunker?.reset(); - draftStream.update(previewText); }, resolvePreviewFinalText(text?: string) { if (typeof text !== "string") { @@ -281,6 +316,7 @@ export function createDiscordDraftPreviewController(params: { }, async cleanup() { try { + progressDraftGate.cancel(); if (!finalDeliveryHandled) { await draftStream?.discardPending(); } diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 009364a06d0..c83004a7ecd 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -1452,6 +1452,7 @@ describe("processDiscordMessage draft streaming", () => { dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { await params?.replyOptions?.onReplyStart?.(); await params?.replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + await params?.replyOptions?.onItemEvent?.({ progressText: "exec done" }); return createNoQueuedDispatchResult(); }); @@ -1477,7 +1478,7 @@ describe("processDiscordMessage draft streaming", () => { }); }); - it("starts Discord progress drafts when accepted turns dispatch", async () => { + it("does not start Discord progress drafts for text-only accepted turns", async () => { const draftStream = createMockDraftStreamForTest(); dispatchInboundMessage.mockImplementationOnce(async () => createNoQueuedDispatchResult()); @@ -1495,17 +1496,17 @@ describe("processDiscordMessage draft streaming", () => { await runProcessDiscordMessage(ctx); - expect(draftStream.update).toHaveBeenCalledTimes(1); - expect(draftStream.update).toHaveBeenCalledWith("Shelling"); - expect(draftStream.flush).toHaveBeenCalledTimes(1); + expect(draftStream.update).not.toHaveBeenCalled(); + expect(draftStream.flush).not.toHaveBeenCalled(); }); - it("keeps Discord progress drafts instead of delivering text-only interim blocks", async () => { + it("keeps Discord progress drafts instead of delivering text-only interim blocks after work expands", async () => { const draftStream = createMockDraftStreamForTest(); dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { await params?.dispatcher.sendBlockReply({ text: "on it" }); await params?.replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + await params?.replyOptions?.onItemEvent?.({ progressText: "exec done" }); await params?.dispatcher.sendFinalReply({ text: "done" }); return { queuedFinal: true, counts: { final: 1, tool: 0, block: 1 } }; }); @@ -1523,8 +1524,7 @@ describe("processDiscordMessage draft streaming", () => { await runProcessDiscordMessage(ctx); - expect(draftStream.update).toHaveBeenCalledWith("Shelling"); - expect(draftStream.update).toHaveBeenCalledWith("Shelling\n• tool: exec"); + expect(draftStream.update).toHaveBeenCalledWith("Shelling\n• tool: exec\n• exec done"); expect(deliverDiscordReply).not.toHaveBeenCalled(); expect(editMessageDiscord).toHaveBeenCalledWith( "c1", @@ -1557,7 +1557,6 @@ describe("processDiscordMessage draft streaming", () => { await runProcessDiscordMessage(ctx); - expect(draftStream.update).toHaveBeenCalledWith("Shelling\n• tool: first"); expect(draftStream.update).toHaveBeenCalledWith("Shelling\n• tool: first\n• tool: second"); expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); }); diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index 28f28b4f727..c9ec233d228 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -436,7 +436,11 @@ export async function processDiscordMessage( return; } } - if (draftStream && isFinal) { + if ( + draftStream && + isFinal && + (!draftPreview.isProgressMode || draftPreview.hasProgressDraftStarted) + ) { draftPreview.markFinalDeliveryHandled(); const reply = resolveSendableOutboundReplyParts(payload); const hasMedia = reply.hasMedia; @@ -571,7 +575,6 @@ export async function processDiscordMessage( } await replyPipeline.typingCallbacks?.onReplyStart(); await statusReactions.setThinking(); - await draftPreview.startProgressDraft(); }, }); @@ -625,7 +628,6 @@ export async function processDiscordMessage( }, onPreDispatchFailure: settleDispatchBeforeStart, runDispatch: async () => { - await draftPreview.startProgressDraft(); return await dispatchInboundMessage({ ctx: ctxPayload, cfg, @@ -662,12 +664,13 @@ export async function processDiscordMessage( } await maybeBindStatusReactionsToToolReaction(payload); await statusReactions.setTool(payload.name); - draftPreview.pushToolProgress( + await draftPreview.pushToolProgress( payload.name ? `tool: ${payload.name}` : "tool running", + { toolName: payload.name }, ); }, onItemEvent: async (payload) => { - draftPreview.pushToolProgress( + await draftPreview.pushToolProgress( payload.progressText ?? payload.summary ?? payload.title ?? payload.name, ); }, @@ -675,7 +678,7 @@ export async function processDiscordMessage( if (payload.phase !== "update") { return; } - draftPreview.pushToolProgress( + await draftPreview.pushToolProgress( payload.explanation ?? payload.steps?.[0] ?? "planning", ); }, @@ -683,7 +686,7 @@ export async function processDiscordMessage( if (payload.phase !== "requested") { return; } - draftPreview.pushToolProgress( + await draftPreview.pushToolProgress( payload.command ? `approval: ${payload.command}` : "approval requested", ); }, @@ -691,7 +694,7 @@ export async function processDiscordMessage( if (payload.phase !== "end") { return; } - draftPreview.pushToolProgress( + await draftPreview.pushToolProgress( payload.name ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` : payload.title, @@ -701,7 +704,7 @@ export async function processDiscordMessage( if (payload.phase !== "end") { return; } - draftPreview.pushToolProgress( + await draftPreview.pushToolProgress( payload.summary ?? payload.title ?? "patch applied", ); }, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 9392d01ed8e..026e00bff19 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -2745,14 +2745,7 @@ describe("matrix monitor handler draft streaming", () => { await vi.waitFor(() => { expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); }); - await vi.waitFor(() => { - expect(editMessageMatrixMock).toHaveBeenCalledWith( - "!room:example.org", - "$draft1", - "Pearling\n- `second`", - expect.anything(), - ); - }); + expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toBe("Pearling\n- `second`"); await finish(); }); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index e1581a38f17..fa86eb11735 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1,5 +1,7 @@ import { + createChannelProgressDraftGate, formatChannelProgressDraftText, + isChannelProgressDraftWorkToolName, resolveChannelProgressDraftMaxLines, } from "openclaw/plugin-sdk/channel-streaming"; import { resolveControlCommandGate } from "openclaw/plugin-sdk/command-gating"; @@ -1498,21 +1500,10 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam // Set after the first final payload consumes or discards the draft event // so subsequent finals go through normal delivery. - const pushPreviewToolProgress = (line?: string) => { - if (!draftStream || !shouldStreamPreviewToolProgress || previewToolProgressSuppressed) { + const renderProgressDraft = () => { + if (!draftStream || !progressDraftStreaming) { return; } - const normalized = line?.replace(/\s+/g, " ").trim(); - if (!normalized) { - return; - } - const previous = previewToolProgressLines.at(-1); - if (previous === normalized) { - return; - } - previewToolProgressLines = [...previewToolProgressLines, normalized].slice( - -resolveChannelProgressDraftMaxLines(progressConfigEntry), - ); draftStream.update( formatChannelProgressDraftText({ entry: progressConfigEntry, @@ -1523,6 +1514,57 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }), ); }; + const progressDraftGate = createChannelProgressDraftGate({ + onStart: renderProgressDraft, + }); + + const pushPreviewToolProgress = async (line?: string, options?: { toolName?: string }) => { + if (!draftStream) { + return; + } + if ( + options?.toolName !== undefined && + !isChannelProgressDraftWorkToolName(options.toolName) + ) { + return; + } + const normalized = line?.replace(/\s+/g, " ").trim(); + if (!progressDraftStreaming) { + if (!shouldStreamPreviewToolProgress || previewToolProgressSuppressed || !normalized) { + return; + } + const previous = previewToolProgressLines.at(-1); + if (previous === normalized) { + return; + } + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(progressConfigEntry), + ); + draftStream.update( + formatChannelProgressDraftText({ + entry: progressConfigEntry, + lines: previewToolProgressLines, + seed: progressSeed, + formatLine: formatMatrixToolProgressMarkdownCode, + bullet: "-", + }), + ); + return; + } + if (shouldStreamPreviewToolProgress && !previewToolProgressSuppressed && normalized) { + const previous = previewToolProgressLines.at(-1); + if (previous !== normalized) { + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(progressConfigEntry), + ); + } + } + const alreadyStarted = progressDraftGate.hasStarted; + await progressDraftGate.noteWork(); + if (alreadyStarted && progressDraftGate.hasStarted) { + renderProgressDraft(); + } + }; const suppressPreviewToolProgressForAnswerText = (text: string | undefined) => { if (!text?.trim()) { @@ -1551,10 +1593,12 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam ...options, onToolStart: async (payload) => { const toolName = payload.name?.trim(); - pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running"); + await pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running", { + toolName, + }); }, onItemEvent: async (payload) => { - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.progressText ?? payload.summary ?? payload.title ?? payload.name, ); }, @@ -1562,13 +1606,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (payload.phase !== "update") { return; } - pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning"); + await pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning"); }, onApprovalEvent: async (payload) => { if (payload.phase !== "requested") { return; } - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.command ? `approval: ${payload.command}` : "approval requested", ); }, @@ -1576,13 +1620,13 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (payload.phase !== "end") { return; } - pushPreviewToolProgress(formatMatrixCommandOutputToolProgress(payload)); + await pushPreviewToolProgress(formatMatrixCommandOutputToolProgress(payload)); }, onPatchSummary: async (payload) => { if (payload.phase !== "end") { return; } - pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); + await pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); }, }; }; @@ -1958,20 +2002,6 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam // for the current assistant block, while block deliveries // finalize completed blocks into their own preserved events. disableBlockStreaming: !blockStreamingEnabled, - onReplyStart: - draftStream && progressDraftStreaming - ? () => { - draftStream.update( - formatChannelProgressDraftText({ - entry: progressConfigEntry, - lines: [], - seed: progressSeed, - formatLine: formatMatrixToolProgressMarkdownCode, - bullet: "-", - }), - ); - } - : undefined, onPartialReply: draftStream ? (payload) => { if (progressDraftStreaming) { @@ -2004,6 +2034,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); } finally { + progressDraftGate.cancel(); markRunComplete(); } }, diff --git a/extensions/msteams/src/reply-dispatcher.test.ts b/extensions/msteams/src/reply-dispatcher.test.ts index 90f6257122a..425c599eea7 100644 --- a/extensions/msteams/src/reply-dispatcher.test.ts +++ b/extensions/msteams/src/reply-dispatcher.test.ts @@ -166,11 +166,13 @@ describe("createMSTeamsReplyDispatcher", () => { lastCreatedDispatcher.replyOptions.onPartialReply?.({ text }); } - it("sends an informative status update on reply start for personal chats", async () => { - createDispatcher("personal"); + it("sends an informative status update once work expands in personal chats", async () => { + const dispatcher = createDispatcher("personal", { streaming: { mode: "progress" } }); const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; await options.onReplyStart?.(); + await dispatcher.replyOptions.onToolStart?.({ name: "exec" }); + await dispatcher.replyOptions.onItemEvent?.({ progressText: "done" }); expect(streamInstances).toHaveLength(1); expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1); @@ -194,9 +196,7 @@ describe("createMSTeamsReplyDispatcher", () => { await options.onReplyStart?.(); - // Even though we still send the informative update, the opt-out - // disables the typing keepalive. - expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1); + expect(streamInstances[0]?.sendInformativeUpdate).not.toHaveBeenCalled(); expect(typingCallbacks.onReplyStart).not.toHaveBeenCalled(); }); @@ -314,14 +314,16 @@ describe("createMSTeamsReplyDispatcher", () => { expect(typingCallbacks.onReplyStart).not.toHaveBeenCalled(); }); - it("only sends the informative status update once", async () => { - createDispatcher("personal"); - const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + it("delays the informative status update until work expands", async () => { + const dispatcher = createDispatcher("personal", { streaming: { mode: "progress" } }); - await options.onReplyStart?.(); - await options.onReplyStart?.(); + await dispatcher.replyOptions.onToolStart?.({ name: "exec" }); + expect(streamInstances[0]?.sendInformativeUpdate).not.toHaveBeenCalled(); - expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(1); + await dispatcher.replyOptions.onItemEvent?.({ progressText: "done" }); + await dispatcher.replyOptions.onPatchSummary?.({ phase: "end", summary: "patched" }); + + expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledTimes(2); }); it("forwards partial replies into the Teams stream", async () => { @@ -344,9 +346,12 @@ describe("createMSTeamsReplyDispatcher", () => { expect(dispatcher.replyOptions.suppressDefaultToolProgressMessages).toBe(true); await dispatcher.replyOptions.onToolStart?.({ name: "web_search" }); + expect(streamInstances[0]?.sendInformativeUpdate).not.toHaveBeenCalled(); + + await dispatcher.replyOptions.onToolStart?.({ name: "exec" }); expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledWith( - "Working\n- tool: web_search", + "Working\n- tool: web_search\n- tool: exec", ); }); @@ -361,8 +366,14 @@ describe("createMSTeamsReplyDispatcher", () => { }); expect(dispatcher.replyOptions.suppressDefaultToolProgressMessages).toBe(true); - expect(dispatcher.replyOptions.onToolStart).toBeUndefined(); + await dispatcher.replyOptions.onToolStart?.({ name: "web_search" }); expect(streamInstances[0]?.sendInformativeUpdate).not.toHaveBeenCalled(); + + await dispatcher.replyOptions.onToolStart?.({ name: "exec" }); + + expect(streamInstances[0]?.sendInformativeUpdate).toHaveBeenCalledWith( + expect.stringMatching(/^[^\n]+\.\.\.$/), + ); }); it("does not create a stream for channel conversations", async () => { diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index f66f17ec9d7..9af66b02f66 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -343,6 +343,32 @@ export function createMSTeamsReplyDispatcher(params: { ? { onPartialReply: (payload: { text?: string }) => streamController.onPartialReply(payload), + onToolStart: async (payload: { name?: string }) => { + await streamController.noteProgressWork({ toolName: payload.name }); + }, + onItemEvent: async () => { + await streamController.noteProgressWork(); + }, + onPlanUpdate: async (payload: { phase?: string }) => { + if (payload.phase === "update") { + await streamController.noteProgressWork(); + } + }, + onApprovalEvent: async (payload: { phase?: string }) => { + if (payload.phase === "requested") { + await streamController.noteProgressWork(); + } + }, + onCommandOutput: async (payload: { phase?: string }) => { + if (payload.phase === "end") { + await streamController.noteProgressWork(); + } + }, + onPatchSummary: async (payload: { phase?: string }) => { + if (payload.phase === "end") { + await streamController.noteProgressWork(); + } + }, } : {}), ...(streamController.shouldSuppressDefaultToolProgressMessages() @@ -353,6 +379,7 @@ export function createMSTeamsReplyDispatcher(params: { onToolStart: async (payload: { name?: string; phase?: string }) => { await streamController.pushProgressLine( payload.name ? `tool: ${payload.name}` : (payload.phase ?? "tool running"), + { toolName: payload.name }, ); }, onItemEvent: async (payload: { diff --git a/extensions/msteams/src/reply-stream-controller.test.ts b/extensions/msteams/src/reply-stream-controller.test.ts index e5632ab562b..e73a8c9627e 100644 --- a/extensions/msteams/src/reply-stream-controller.test.ts +++ b/extensions/msteams/src/reply-stream-controller.test.ts @@ -213,7 +213,8 @@ describe("createTeamsReplyStreamController", () => { log: { debug: vi.fn() } as never, msteamsConfig: { streaming: { mode: "progress" } } as never, }); - await ctrl.onReplyStart(); + await ctrl.noteProgressWork({ toolName: "exec" }); + await ctrl.noteProgressWork(); const fullText = "x".repeat(4200); const result = await ctrl.preparePayload({ text: fullText }); diff --git a/extensions/msteams/src/reply-stream-controller.ts b/extensions/msteams/src/reply-stream-controller.ts index c8cb8834c8c..29c188c10b7 100644 --- a/extensions/msteams/src/reply-stream-controller.ts +++ b/extensions/msteams/src/reply-stream-controller.ts @@ -1,5 +1,7 @@ import { + createChannelProgressDraftGate, formatChannelProgressDraftText, + isChannelProgressDraftWorkToolName, resolveChannelPreviewStreamMode, resolveChannelProgressDraftMaxLines, resolveChannelProgressDraftLabel, @@ -61,32 +63,67 @@ export function createTeamsReplyStreamController(params: { let streamReceivedTokens = false; let informativeUpdateSent = false; let progressLines: string[] = []; + let lastInformativeText = ""; let pendingFinalize: Promise | undefined; - const pushProgressLine = async (line?: string): Promise => { - if (!stream || !shouldStreamPreviewToolProgress) { + const renderInformativeUpdate = async () => { + if (!stream) { return; } - const normalized = line?.replace(/\s+/g, " ").trim(); - if (!normalized) { + const informativeText = formatChannelProgressDraftText({ + entry: params.msteamsConfig, + lines: shouldStreamPreviewToolProgress ? progressLines : [], + seed: params.progressSeed, + bullet: "-", + }); + if (!informativeText || informativeText === lastInformativeText) { return; } - const previous = progressLines.at(-1); - if (previous === normalized) { - return; - } - progressLines = [...progressLines, normalized].slice( - -resolveChannelProgressDraftMaxLines(params.msteamsConfig), - ); + lastInformativeText = informativeText; informativeUpdateSent = true; - await stream.sendInformativeUpdate( - formatChannelProgressDraftText({ - entry: params.msteamsConfig, - lines: progressLines, - seed: params.progressSeed, - bullet: "-", - }), - ); + await stream.sendInformativeUpdate(informativeText); + }; + + const progressDraftGate = createChannelProgressDraftGate({ + onStart: renderInformativeUpdate, + }); + + const noteProgressWork = async (options?: { toolName?: string }): Promise => { + if (!stream || streamMode !== "progress") { + return; + } + if (options?.toolName !== undefined && !isChannelProgressDraftWorkToolName(options.toolName)) { + return; + } + const hadStarted = progressDraftGate.hasStarted; + await progressDraftGate.noteWork(); + if (hadStarted && progressDraftGate.hasStarted) { + await renderInformativeUpdate(); + } + }; + + const pushProgressLine = async ( + line?: string, + options?: { toolName?: string }, + ): Promise => { + if (!stream || streamMode !== "progress") { + return; + } + if (options?.toolName !== undefined && !isChannelProgressDraftWorkToolName(options.toolName)) { + return; + } + if (shouldStreamPreviewToolProgress) { + const normalized = line?.replace(/\s+/g, " ").trim(); + if (normalized) { + const previous = progressLines.at(-1); + if (previous !== normalized) { + progressLines = [...progressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(params.msteamsConfig), + ); + } + } + } + await noteProgressWork(); }; const fallbackAfterStreamFailure = ( @@ -109,19 +146,11 @@ export function createTeamsReplyStreamController(params: { return { async onReplyStart(): Promise { - if (!stream || informativeUpdateSent) { - return; - } - const informativeText = pickInformativeStatusText({ - config: params.msteamsConfig, - seed: params.progressSeed, - random: params.random, - }); - if (!informativeText) { - return; - } - informativeUpdateSent = true; - await stream.sendInformativeUpdate(informativeText); + return; + }, + + async noteProgressWork(options?: { toolName?: string }): Promise { + await noteProgressWork(options); }, onPartialReply(payload: { text?: string }): void { @@ -135,8 +164,8 @@ export function createTeamsReplyStreamController(params: { stream.update(payload.text); }, - async pushProgressLine(line?: string): Promise { - await pushProgressLine(line); + async pushProgressLine(line?: string, options?: { toolName?: string }): Promise { + await pushProgressLine(line, options); }, shouldSuppressDefaultToolProgressMessages(): boolean { @@ -191,6 +220,7 @@ export function createTeamsReplyStreamController(params: { }, async finalize(): Promise { + progressDraftGate.cancel(); await pendingFinalize; await stream?.finalize(); }, diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 8c69a7ddcb4..5392517d4c6 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -231,6 +231,30 @@ vi.mock("openclaw/plugin-sdk/channel-reply-pipeline", () => ({ })); vi.mock("openclaw/plugin-sdk/channel-streaming", () => ({ + createChannelProgressDraftGate: (params: { onStart: () => void | Promise }) => { + let started = false; + let workEvents = 0; + return { + get hasStarted() { + return started; + }, + async noteWork() { + workEvents += 1; + if (!started && workEvents > 1) { + started = true; + await params.onStart(); + } + return started; + }, + async startNow() { + if (!started) { + started = true; + await params.onStart(); + } + }, + cancel() {}, + }; + }, formatChannelProgressDraftText: (params: { entry?: { streaming?: { progress?: { label?: string; maxLines?: number } } }; lines: string[]; @@ -269,6 +293,8 @@ vi.mock("openclaw/plugin-sdk/channel-streaming", () => ({ } return options?.previewToolProgressEnabled ?? true; }, + isChannelProgressDraftWorkToolName: (name?: string) => + Boolean(name && !["message", "react", "reaction"].includes(name.toLowerCase())), })); vi.mock("openclaw/plugin-sdk/outbound-runtime", () => ({ diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index 8c0d2b72a7d..7a26fd1cd34 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -13,7 +13,9 @@ import { resolveChannelSourceReplyDeliveryMode, } from "openclaw/plugin-sdk/channel-reply-pipeline"; import { + createChannelProgressDraftGate, formatChannelProgressDraftText, + isChannelProgressDraftWorkToolName, resolveChannelProgressDraftMaxLines, resolveChannelStreamingBlockEnabled, resolveChannelStreamingNativeTransport, @@ -885,22 +887,10 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag let statusUpdateCount = 0; const progressSeed = `${account.accountId}:${message.channel}`; - const pushPreviewToolProgress = (line?: string) => { - if (!draftStream || !previewToolProgressEnabled || previewToolProgressSuppressed) { + const renderProgressDraft = () => { + if (!draftStream || streamMode !== "status_final") { return; } - const normalized = line?.replace(/\s+/g, " ").trim(); - if (!normalized) { - return; - } - const escaped = escapeSlackMrkdwn(normalized); - const previous = previewToolProgressLines.at(-1); - if (previous === escaped) { - return; - } - previewToolProgressLines = [...previewToolProgressLines, escaped].slice( - -resolveChannelProgressDraftMaxLines(account.config), - ); draftStream.update( formatChannelProgressDraftText({ entry: account.config, @@ -910,6 +900,55 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag ); hasStreamedMessage = true; }; + const progressDraftGate = createChannelProgressDraftGate({ + onStart: renderProgressDraft, + }); + + const pushPreviewToolProgress = async (line?: string, options?: { toolName?: string }) => { + if (!draftStream) { + return; + } + if (options?.toolName !== undefined && !isChannelProgressDraftWorkToolName(options.toolName)) { + return; + } + const normalized = line?.replace(/\s+/g, " ").trim(); + if (streamMode !== "status_final") { + if (!previewToolProgressEnabled || previewToolProgressSuppressed || !normalized) { + return; + } + const escaped = escapeSlackMrkdwn(normalized); + const previous = previewToolProgressLines.at(-1); + if (previous === escaped) { + return; + } + previewToolProgressLines = [...previewToolProgressLines, escaped].slice( + -resolveChannelProgressDraftMaxLines(account.config), + ); + draftStream.update( + formatChannelProgressDraftText({ + entry: account.config, + lines: previewToolProgressLines, + seed: progressSeed, + }), + ); + hasStreamedMessage = true; + return; + } + if (previewToolProgressEnabled && !previewToolProgressSuppressed && normalized) { + const escaped = escapeSlackMrkdwn(normalized); + const previous = previewToolProgressLines.at(-1); + if (previous !== escaped) { + previewToolProgressLines = [...previewToolProgressLines, escaped].slice( + -resolveChannelProgressDraftMaxLines(account.config), + ); + } + } + const alreadyStarted = progressDraftGate.hasStarted; + await progressDraftGate.noteWork(); + if (alreadyStarted && progressDraftGate.hasStarted) { + renderProgressDraft(); + } + }; const updateDraftFromPartial = (text?: string) => { const trimmed = text?.trimEnd(); @@ -936,6 +975,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } if (streamMode === "status_final") { + if (!progressDraftGate.hasStarted) { + return; + } statusUpdateCount += 1; if (statusUpdateCount > 1 && statusUpdateCount % 4 !== 0) { return; @@ -1036,10 +1078,13 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (statusReactionsEnabled) { await statusReactions.setTool(payload.name); } - pushPreviewToolProgress(payload.name ? `tool: ${payload.name}` : "tool running"); + await pushPreviewToolProgress( + payload.name ? `tool: ${payload.name}` : "tool running", + { toolName: payload.name }, + ); }, onItemEvent: async (payload) => { - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.progressText ?? payload.summary ?? payload.title ?? payload.name, ); }, @@ -1047,13 +1092,15 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (payload.phase !== "update") { return; } - pushPreviewToolProgress(payload.explanation ?? payload.steps?.[0] ?? "planning"); + await pushPreviewToolProgress( + payload.explanation ?? payload.steps?.[0] ?? "planning", + ); }, onApprovalEvent: async (payload) => { if (payload.phase !== "requested") { return; } - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.command ? `approval: ${payload.command}` : "approval requested", ); }, @@ -1061,7 +1108,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (payload.phase !== "end") { return; } - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.name ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` : payload.title, @@ -1071,7 +1118,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (payload.phase !== "end") { return; } - pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); + await pushPreviewToolProgress( + payload.summary ?? payload.title ?? "patch applied", + ); }, }, }), @@ -1087,6 +1136,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag } catch (err) { dispatchError = err; } finally { + progressDraftGate.cancel(); await draftStream?.discardPending(); if (!dispatchSettledBeforeStart) { markDispatchIdle(); diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index b8d79d157df..f64e1f50395 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -752,7 +752,7 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.update).toHaveBeenCalledWith("HelloWorld"); }); - it("reuses the Telegram progress draft for the first assistant final", async () => { + it("does not create a Telegram progress draft for a text-only final", async () => { const draftStream = createSequencedDraftStream(2001); createTelegramDraftStream.mockReturnValue(draftStream); dispatchReplyWithBufferedBlockDispatcher.mockImplementation( @@ -770,10 +770,14 @@ describe("dispatchTelegramMessage draft streaming", () => { telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, }); - expect(draftStream.update).toHaveBeenCalledWith("Shelling"); + expect(draftStream.update).not.toHaveBeenCalled(); expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); - expect(editMessageTelegram).toHaveBeenCalledWith(123, 2001, "Final answer", expect.any(Object)); - expect(draftStream.clear).not.toHaveBeenCalled(); + expect(editMessageTelegram).not.toHaveBeenCalled(); + expect(deliverReplies).toHaveBeenCalledWith( + expect.objectContaining({ + replies: [expect.objectContaining({ text: "Final answer" })], + }), + ); }); it("keeps the Telegram progress draft across post-tool assistant boundaries", async () => { @@ -784,6 +788,7 @@ describe("dispatchTelegramMessage draft streaming", () => { await replyOptions?.onReplyStart?.(); await replyOptions?.onAssistantMessageStart?.(); await replyOptions?.onItemEvent?.({ progressText: "exec ls ~/Desktop" }); + await replyOptions?.onItemEvent?.({ progressText: "tests passed" }); await replyOptions?.onAssistantMessageStart?.(); await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" }); return { queuedFinal: true }; @@ -796,9 +801,8 @@ describe("dispatchTelegramMessage draft streaming", () => { telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, }); - expect(draftStream.update).toHaveBeenCalledWith("Shelling"); expect(draftStream.update).toHaveBeenCalledWith( - expect.stringMatching(/^Shelling\n• `exec ls ~\/Desktop`$/), + expect.stringMatching(/^Shelling\n• `exec ls ~\/Desktop`\n• `tests passed`$/), ); expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); expect(draftStream.materialize).not.toHaveBeenCalled(); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index fc2030a68bb..7f0cfab7eaf 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -7,7 +7,9 @@ import { } from "openclaw/plugin-sdk/channel-feedback"; import { createChannelReplyPipeline } from "openclaw/plugin-sdk/channel-reply-pipeline"; import { + createChannelProgressDraftGate, formatChannelProgressDraftText, + isChannelProgressDraftWorkToolName, resolveChannelProgressDraftMaxLines, resolveChannelStreamingBlockEnabled, resolveChannelStreamingPreviewToolProgress, @@ -476,30 +478,72 @@ export const dispatchTelegramMessage = async ({ Boolean(answerLane.stream) && resolveChannelStreamingPreviewToolProgress(telegramCfg); let previewToolProgressSuppressed = false; let previewToolProgressLines: string[] = []; - const pushPreviewToolProgress = (line?: string) => { - if (!previewToolProgressEnabled || previewToolProgressSuppressed || !answerLane.stream) { + const renderProgressDraft = async (options?: { flush?: boolean }) => { + if (!answerLane.stream || streamMode !== "progress") { return; } - const normalized = line?.replace(/\s+/g, " ").trim(); - if (!normalized) { - return; - } - const previous = previewToolProgressLines.at(-1); - if (previous === normalized) { - return; - } - previewToolProgressLines = [...previewToolProgressLines, normalized].slice( - -resolveChannelProgressDraftMaxLines(telegramCfg), - ); const previewText = formatChannelProgressDraftText({ entry: telegramCfg, lines: previewToolProgressLines, seed: progressSeed, formatLine: formatProgressAsMarkdownCode, }); + if (!previewText || previewText === answerLane.lastPartialText) { + return; + } answerLane.lastPartialText = previewText; answerLane.hasStreamedMessage = true; answerLane.stream.update(previewText); + if (options?.flush) { + await answerLane.stream.flush(); + } + }; + const progressDraftGate = createChannelProgressDraftGate({ + onStart: () => renderProgressDraft({ flush: true }), + }); + const pushPreviewToolProgress = async (line?: string, options?: { toolName?: string }) => { + if (!answerLane.stream) { + return; + } + if (options?.toolName !== undefined && !isChannelProgressDraftWorkToolName(options.toolName)) { + return; + } + const normalized = line?.replace(/\s+/g, " ").trim(); + if (streamMode !== "progress") { + if (!previewToolProgressEnabled || previewToolProgressSuppressed || !normalized) { + return; + } + const previous = previewToolProgressLines.at(-1); + if (previous === normalized) { + return; + } + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(telegramCfg), + ); + const previewText = formatChannelProgressDraftText({ + entry: telegramCfg, + lines: previewToolProgressLines, + seed: progressSeed, + formatLine: formatProgressAsMarkdownCode, + }); + answerLane.lastPartialText = previewText; + answerLane.hasStreamedMessage = true; + answerLane.stream.update(previewText); + return; + } + if (previewToolProgressEnabled && !previewToolProgressSuppressed && normalized) { + const previous = previewToolProgressLines.at(-1); + if (previous !== normalized) { + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(telegramCfg), + ); + } + } + const alreadyStarted = progressDraftGate.hasStarted; + await progressDraftGate.noteWork(); + if (alreadyStarted && progressDraftGate.hasStarted) { + await renderProgressDraft(); + } }; let splitReasoningOnNextStream = false; let skipNextAnswerMessageStartRotation = false; @@ -1067,25 +1111,6 @@ export const dispatchTelegramMessage = async ({ replyOptions: { skillFilter, disableBlockStreaming, - onReplyStart: - answerLane.stream && streamMode === "progress" - ? () => - enqueueDraftLaneEvent(async () => { - const previewText = formatChannelProgressDraftText({ - entry: telegramCfg, - lines: [], - seed: progressSeed, - formatLine: formatProgressAsMarkdownCode, - }); - if (!previewText || previewText === answerLane.lastPartialText) { - return; - } - answerLane.lastPartialText = previewText; - answerLane.hasStreamedMessage = true; - answerLane.stream?.update(previewText); - await answerLane.stream?.flush(); - }) - : undefined, onPartialReply: answerLane.stream || reasoningLane.stream ? (payload) => @@ -1147,10 +1172,12 @@ export const dispatchTelegramMessage = async ({ if (statusReactionController && toolName) { await statusReactionController.setTool(toolName); } - pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running"); + await pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running", { + toolName, + }); }, onItemEvent: async (payload) => { - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.progressText ?? payload.summary ?? payload.title ?? payload.name, ); }, @@ -1158,7 +1185,7 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "update") { return; } - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.explanation ?? payload.steps?.[0] ?? "planning", ); }, @@ -1166,7 +1193,7 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "requested") { return; } - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.command ? `approval: ${payload.command}` : "approval requested", ); }, @@ -1174,7 +1201,7 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "end") { return; } - pushPreviewToolProgress( + await pushPreviewToolProgress( payload.name ? `${payload.name}${payload.exitCode === 0 ? " ✓" : payload.exitCode != null ? ` (exit ${payload.exitCode})` : ""}` : payload.title, @@ -1184,7 +1211,9 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "end") { return; } - pushPreviewToolProgress(payload.summary ?? payload.title ?? "patch applied"); + await pushPreviewToolProgress( + payload.summary ?? payload.title ?? "patch applied", + ); }, onCompactionStart: statusReactionController || answerLane.stream @@ -1223,6 +1252,7 @@ export const dispatchTelegramMessage = async ({ runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`)); } finally { await draftLaneEventQueue; + progressDraftGate.cancel(); if (isDispatchSuperseded()) { if (answerLane.hasStreamedMessage || typeof answerLane.stream?.messageId() === "number") { retainPreviewOnCleanupByLane.answer = true; diff --git a/src/plugin-sdk/channel-streaming.test.ts b/src/plugin-sdk/channel-streaming.test.ts index 155801fd824..da4cf534c44 100644 --- a/src/plugin-sdk/channel-streaming.test.ts +++ b/src/plugin-sdk/channel-streaming.test.ts @@ -1,8 +1,10 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { + createChannelProgressDraftGate, DEFAULT_PROGRESS_DRAFT_LABELS, formatChannelProgressDraftText, getChannelStreamingConfigObject, + isChannelProgressDraftWorkToolName, resolveChannelPreviewStreamMode, resolveChannelProgressDraftLabel, resolveChannelProgressDraftMaxLines, @@ -16,6 +18,10 @@ import { } from "./channel-streaming.js"; describe("channel-streaming", () => { + afterEach(() => { + vi.useRealTimers(); + }); + it("reads canonical nested streaming config first", () => { const entry = { streaming: { @@ -172,4 +178,40 @@ describe("channel-streaming", () => { }), ).toBe("Shelling\n• `patch applied`\n• `tests done`"); }); + + it("starts progress drafts after five seconds or a second work event", async () => { + vi.useFakeTimers(); + const onStart = vi.fn(async () => {}); + const gate = createChannelProgressDraftGate({ onStart }); + + await expect(gate.noteWork()).resolves.toBe(false); + expect(onStart).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(4_999); + expect(onStart).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(onStart).toHaveBeenCalledTimes(1); + expect(gate.hasStarted).toBe(true); + }); + + it("starts progress drafts immediately on the second work event", async () => { + vi.useFakeTimers(); + const onStart = vi.fn(async () => {}); + const gate = createChannelProgressDraftGate({ onStart }); + + await gate.noteWork(); + await expect(gate.noteWork()).resolves.toBe(true); + + expect(onStart).toHaveBeenCalledTimes(1); + await vi.advanceTimersByTimeAsync(5_000); + expect(onStart).toHaveBeenCalledTimes(1); + }); + + it("ignores message-like tools for progress draft work", () => { + expect(isChannelProgressDraftWorkToolName("message")).toBe(false); + expect(isChannelProgressDraftWorkToolName("react")).toBe(false); + expect(isChannelProgressDraftWorkToolName("web_search")).toBe(true); + expect(isChannelProgressDraftWorkToolName("exec")).toBe(true); + }); }); diff --git a/src/plugin-sdk/channel-streaming.ts b/src/plugin-sdk/channel-streaming.ts index 47926521e83..bbb80b4d25e 100644 --- a/src/plugin-sdk/channel-streaming.ts +++ b/src/plugin-sdk/channel-streaming.ts @@ -107,6 +107,97 @@ export const DEFAULT_PROGRESS_DRAFT_LABELS = [ "Surfacing...", ] as const; +export const DEFAULT_PROGRESS_DRAFT_INITIAL_DELAY_MS = 5_000; + +const NON_WORK_PROGRESS_TOOL_NAMES = new Set([ + "message", + "messages", + "reply", + "send", + "reaction", + "react", + "typing", +]); + +export function isChannelProgressDraftWorkToolName(name: string | null | undefined): boolean { + const normalized = normalizeOptionalLowercaseString(name); + return Boolean(normalized && !NON_WORK_PROGRESS_TOOL_NAMES.has(normalized)); +} + +export function createChannelProgressDraftGate(params: { + onStart: () => void | Promise; + initialDelayMs?: number; + setTimeoutFn?: typeof setTimeout; + clearTimeoutFn?: typeof clearTimeout; +}) { + const initialDelayMs = params.initialDelayMs ?? DEFAULT_PROGRESS_DRAFT_INITIAL_DELAY_MS; + const setTimeoutFn = params.setTimeoutFn ?? setTimeout; + const clearTimeoutFn = params.clearTimeoutFn ?? clearTimeout; + let started = false; + let disposed = false; + let workEvents = 0; + let timer: ReturnType | undefined; + let startPromise: Promise | undefined; + + const clearTimer = () => { + if (timer) { + clearTimeoutFn(timer); + timer = undefined; + } + }; + + const start = (): Promise => { + if (disposed || started) { + return startPromise ?? Promise.resolve(); + } + started = true; + clearTimer(); + startPromise = Promise.resolve().then(params.onStart); + return startPromise; + }; + + const schedule = () => { + if (timer || started || disposed || initialDelayMs < 0) { + return; + } + timer = setTimeoutFn(() => { + timer = undefined; + void start().catch(() => {}); + }, initialDelayMs); + }; + + return { + get hasStarted() { + return started; + }, + get workEvents() { + return workEvents; + }, + async noteWork(): Promise { + if (disposed) { + return false; + } + workEvents += 1; + if (started) { + return true; + } + if (workEvents > 1) { + await start(); + return true; + } + schedule(); + return false; + }, + async startNow(): Promise { + await start(); + }, + cancel(): void { + disposed = true; + clearTimer(); + }, + }; +} + export function getChannelStreamingConfigObject( entry: StreamingCompatEntry | null | undefined, ): ChannelStreamingConfig | undefined {