From c33e578554695145c1428268b740891bdf01deaa Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 3 May 2026 22:01:08 +0100 Subject: [PATCH] feat: add channel progress drafts Adds unified progress-draft streaming for chat channels, with docs and per-channel regressions. --- CHANGELOG.md | 1 + docs/.generated/config-baseline.sha256 | 4 +- docs/.i18n/glossary.zh-CN.json | 36 ++ docs/channels/discord.md | 2 +- docs/channels/telegram.md | 8 +- docs/concepts/progress-drafts.md | 284 ++++++++++++++ docs/concepts/streaming.md | 25 +- docs/docs.json | 1 + docs/gateway/config-channels.md | 2 +- extensions/discord/src/config-ui-hints.ts | 4 +- .../monitor/message-handler.draft-preview.ts | 48 ++- .../monitor/message-handler.process.test.ts | 84 ++++- .../src/monitor/message-handler.process.ts | 6 +- extensions/discord/src/preview-streaming.ts | 29 +- extensions/matrix/src/config-schema.test.ts | 18 +- extensions/matrix/src/config-schema.ts | 13 +- .../matrix/monitor/handler.test-helpers.ts | 9 +- .../matrix/src/matrix/monitor/handler.test.ts | 60 ++- .../matrix/src/matrix/monitor/handler.ts | 58 ++- .../matrix/src/matrix/monitor/index.test.ts | 8 + extensions/matrix/src/matrix/monitor/index.ts | 12 +- extensions/matrix/src/types.ts | 10 +- extensions/msteams/src/config-ui-hints.ts | 8 + .../msteams/src/reply-dispatcher.test.ts | 27 +- extensions/msteams/src/reply-dispatcher.ts | 18 +- .../src/reply-stream-controller.test.ts | 160 ++++++-- .../msteams/src/reply-stream-controller.ts | 97 +++-- .../msteams/src/streaming-message.test.ts | 24 ++ extensions/msteams/src/streaming-message.ts | 16 + .../dispatch.preview-fallback.test.ts | 167 ++++++++- .../src/monitor/message-handler/dispatch.ts | 64 ++-- .../telegram/src/bot-message-dispatch.test.ts | 85 ++++- .../telegram/src/bot-message-dispatch.ts | 47 ++- extensions/telegram/src/bot.helpers.test.ts | 4 +- extensions/telegram/src/bot/types.ts | 2 +- extensions/telegram/src/config-ui-hints.ts | 4 +- extensions/telegram/src/preview-streaming.ts | 7 +- ...ndled-channel-config-metadata.generated.ts | 353 +++++++++++++++++- src/config/types.base.ts | 16 +- src/config/types.msteams.ts | 3 + src/config/zod-schema.providers-core.ts | 10 + src/plugin-sdk/channel-streaming.test.ts | 92 +++++ src/plugin-sdk/channel-streaming.ts | 144 ++++++- 43 files changed, 1861 insertions(+), 209 deletions(-) create mode 100644 docs/concepts/progress-drafts.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 33483a87488..0d769988c21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai ### Changes +- Channels/streaming: add unified `streaming.mode: "progress"` drafts with auto single-word status labels and shared progress configuration across Discord, Telegram, Matrix, Slack, and Microsoft Teams. - Tools/BTW: add `/side` as a text and native slash-command alias for `/btw` side questions. - Agents/tools: skip optional media and PDF tool factories when the effective tool denylist already blocks them, avoiding unnecessary hot-path setup for tools that will be filtered out before model use. (#76773) Thanks @dorukardahan. - Discord/status: let explicit reaction tool calls opt into tracking subsequent tool progress on the reacted message with `trackToolCalls: true`, and use the shared tool display emoji table for status reactions. diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 4397159a04c..b7f68687da2 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -b3d5eb9ae0e53dfd8bd8e2b06373fd80b84db1057fe88075afc4b890ff179371 config-baseline.json +c165172059698df0d806ee9861b9ba1433c2055935bbf81497437897bfcc4e6f config-baseline.json bfb7ade43e58c630d0480eaa215ef22bf0d5030136c3e24cdd2c2a4c73d1b663 config-baseline.core.json -09a952cf734a5b4a30f760e570c0f106d54aa8e74bf439dd4d07013f9f7607e4 config-baseline.channel.json +f0e77e90432c987c27143194f70df649bac7595e78613b3708366ff32401369f config-baseline.channel.json 055fae0d0067a751dc10125af7421da45633f73519c94c982d02b0c4eb2bdf67 config-baseline.plugin.json diff --git a/docs/.i18n/glossary.zh-CN.json b/docs/.i18n/glossary.zh-CN.json index fe937d74086..ad249ac7376 100644 --- a/docs/.i18n/glossary.zh-CN.json +++ b/docs/.i18n/glossary.zh-CN.json @@ -287,6 +287,42 @@ "source": "Block streaming", "target": "分块流式传输" }, + { + "source": "Progress drafts", + "target": "进度草稿" + }, + { + "source": "Streaming and chunking", + "target": "流式传输和分块" + }, + { + "source": "Messages", + "target": "消息" + }, + { + "source": "Channel configuration", + "target": "频道配置" + }, + { + "source": "Discord", + "target": "Discord" + }, + { + "source": "Matrix", + "target": "Matrix" + }, + { + "source": "Microsoft Teams", + "target": "Microsoft Teams" + }, + { + "source": "Slack", + "target": "Slack" + }, + { + "source": "Telegram", + "target": "Telegram" + }, { "source": "Discovery + transports", "target": "设备发现 + 传输协议" diff --git a/docs/channels/discord.md b/docs/channels/discord.md index e59efbfcc7b..35a4442c28e 100644 --- a/docs/channels/discord.md +++ b/docs/channels/discord.md @@ -660,7 +660,7 @@ Default slash command settings: - OpenClaw can stream draft replies by sending a temporary message and editing it as text arrives. `channels.discord.streaming` takes `off` (default) | `partial` | `block` | `progress`. `progress` maps to `partial` on Discord; `streamMode` is a legacy alias and is auto-migrated. + OpenClaw can stream draft replies by sending a temporary message and editing it as text arrives. `channels.discord.streaming` takes `off` (default) | `partial` | `block` | `progress`. `progress` keeps one editable status draft and updates it with tool progress until final delivery; `streamMode` is a legacy alias and is auto-migrated. Default stays `off` because Discord preview edits hit rate limits quickly when multiple bots or gateways share an account. diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index c32cb4159dc..0f9762156c2 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -278,11 +278,11 @@ curl "https://api.telegram.org/bot/getUpdates" Requirement: - `channels.telegram.streaming` is `off | partial | block | progress` (default: `partial`) - - `progress` maps to `partial` on Telegram (compat with cross-channel naming) + - `progress` keeps one editable status draft and updates it with tool progress until final delivery - `streaming.preview.toolProgress` controls whether tool/progress updates reuse the same edited preview message (default: `true` when preview streaming is active) - legacy `channels.telegram.streamMode` and boolean `streaming` values are detected; run `openclaw doctor --fix` to migrate them to `channels.telegram.streaming.mode` - Tool-progress preview updates are the short "Working..." lines shown while tools run, for example command execution, file reads, planning updates, or patch summaries. Telegram keeps these enabled by default to match released OpenClaw behavior from `v2026.4.22` and later. To keep the edited preview for answer text but hide tool-progress lines, set: + Tool-progress preview updates are the short status lines shown while tools run, for example command execution, file reads, planning updates, or patch summaries. Telegram keeps these enabled by default to match released OpenClaw behavior from `v2026.4.22` and later. To keep the edited preview for answer text but hide tool-progress lines, set: ```json { @@ -299,10 +299,10 @@ curl "https://api.telegram.org/bot/getUpdates" } ``` - Use `streaming.mode: "off"` only when you want final-only delivery: Telegram preview edits are disabled and generic tool/progress chatter is suppressed instead of being sent as standalone "Working..." messages. Approval prompts, media payloads, and errors still route through normal final delivery. Use `streaming.preview.toolProgress: false` when you only want to keep answer preview edits while hiding the tool-progress status lines. + Use `streaming.mode: "off"` only when you want final-only delivery: Telegram preview edits are disabled and generic tool/progress chatter is suppressed instead of being sent as standalone status messages. Approval prompts, media payloads, and errors still route through normal final delivery. Use `streaming.preview.toolProgress: false` when you only want to keep answer preview edits while hiding the tool-progress status lines. - Telegram selected quote replies are the exception. When `replyToMode` is `"first"`, `"all"`, or `"batched"` and the inbound message includes selected quote text, OpenClaw sends the final answer through Telegram's native quote-reply path instead of editing the answer preview, so `streaming.preview.toolProgress` cannot show the short "Working..." lines for that turn. Current-message replies without selected quote text still keep preview streaming. Set `replyToMode: "off"` when tool-progress visibility matters more than native quote replies, or set `streaming.preview.toolProgress: false` to acknowledge the trade-off. + Telegram selected quote replies are the exception. When `replyToMode` is `"first"`, `"all"`, or `"batched"` and the inbound message includes selected quote text, OpenClaw sends the final answer through Telegram's native quote-reply path instead of editing the answer preview, so `streaming.preview.toolProgress` cannot show the short status lines for that turn. Current-message replies without selected quote text still keep preview streaming. Set `replyToMode: "off"` when tool-progress visibility matters more than native quote replies, or set `streaming.preview.toolProgress: false` to acknowledge the trade-off. For text-only replies: diff --git a/docs/concepts/progress-drafts.md b/docs/concepts/progress-drafts.md new file mode 100644 index 00000000000..d93722a3ea2 --- /dev/null +++ b/docs/concepts/progress-drafts.md @@ -0,0 +1,284 @@ +--- +summary: "Progress drafts: one visible work-in-progress message that updates while an agent runs" +read_when: + - Configuring visible progress updates for long-running chat turns + - Choosing between partial, block, and progress streaming modes + - Explaining how OpenClaw updates one channel message while work is in progress + - Troubleshooting progress drafts, standalone progress messages, or finalization fallback +title: "Progress drafts" +--- + +Progress drafts make long-running agent turns feel alive in chat without turning +the conversation into a stack of temporary status replies. + +When progress drafts are enabled, OpenClaw creates one visible work-in-progress +message, updates it while the agent reads, plans, calls tools, or waits for +approval, and then turns that draft into the final answer when the channel can +do that safely. + +```text +Shelling +- reading recent channel context +- checking matching issues +- preparing reply +``` + +Use progress drafts when you want one tidy status message during tool-heavy work +and the final answer when the turn is done. + +## Quick Start + +Enable progress drafts per channel with `streaming.mode: "progress"`: + +```json5 +{ + channels: { + discord: { + streaming: { + mode: "progress", + }, + }, + }, +} +``` + +That is usually enough. OpenClaw will pick an automatic one-word label, add +compact progress lines while useful work happens, and suppress duplicate +standalone progress chatter for that turn. + +## What Users See + +A progress draft has two parts: + +| Part | Purpose | +| -------------- | ----------------------------------------------------------------- | +| Label | A short title such as `Thinking` or `Shelling`. | +| Progress lines | Compact run updates such as tool calls, task steps, or approvals. | + +The label appears immediately when the agent starts replying. Progress lines are +added only when the agent emits useful work updates. The final answer replaces +the draft when possible; otherwise OpenClaw sends the final answer normally and +cleans up or stops updating the draft according to the channel's transport. + +## Choose A Mode + +`channels..streaming.mode` controls the visible in-progress behavior: + +| Mode | Best for | What appears in chat | +| ---------- | -------------------------------- | ------------------------------------------------- | +| `off` | Quiet channels | Only the final answer. | +| `partial` | Watching answer text appear | One draft edited with the latest answer text. | +| `block` | Larger answer-preview chunks | One preview updated or appended in bigger chunks. | +| `progress` | Tool-heavy or long-running turns | One status draft, then the final answer. | + +Choose `progress` when users care more about "what is happening" than watching +the answer text stream token by token. + +Choose `partial` when the answer itself is the progress signal. + +Choose `block` when you want draft preview updates in larger text chunks. On +Discord and Telegram, `streaming.mode: "block"` is still preview streaming, not +normal block delivery. Use `streaming.block.enabled` or legacy +`blockStreaming` when you want normal block replies. + +## Configure Labels + +Progress labels live under `channels..streaming.progress`. + +The default label is `auto`, which chooses from OpenClaw's built-in single-word +label pool: + +```text +Thinking +Shelling +Scuttling +Clawing +Pinching +Molting +Bubbling +Tiding +Reefing +Cracking +Sifting +Brining +Nautiling +Krilling +Barnacling +Lobstering +Tidepooling +Pearling +Snapping +Surfacing +``` + +Use a fixed label: + +```json5 +{ + channels: { + discord: { + streaming: { + mode: "progress", + progress: { + label: "Investigating", + }, + }, + }, + }, +} +``` + +Use your own automatic label pool: + +```json5 +{ + channels: { + discord: { + streaming: { + mode: "progress", + progress: { + label: "auto", + labels: ["Checking", "Reading", "Testing", "Finishing"], + }, + }, + }, + }, +} +``` + +Hide the label and show only progress lines: + +```json5 +{ + channels: { + discord: { + streaming: { + mode: "progress", + progress: { + label: false, + }, + }, + }, + }, +} +``` + +## Control Progress Lines + +Progress lines are enabled by default in progress mode. They come from real run +events: tool starts, item updates, task plans, approvals, command output, patch +summaries, and similar agent activity. + +Limit how many lines stay visible: + +```json5 +{ + channels: { + discord: { + streaming: { + mode: "progress", + progress: { + maxLines: 4, + }, + }, + }, + }, +} +``` + +Keep the single progress draft but hide tool and task lines: + +```json5 +{ + channels: { + discord: { + streaming: { + mode: "progress", + progress: { + toolProgress: false, + }, + }, + }, + }, +} +``` + +With `toolProgress: false`, OpenClaw still suppresses the older standalone +tool-progress messages for that turn. The channel stays visually quiet until the +final answer, except for the label if one is configured. + +## Channel Behavior + +Each channel uses the cleanest transport it supports: + +| Channel | Progress transport | Notes | +| --------------- | -------------------------------------- | --------------------------------------------------------------------- | +| Discord | Send one message, then edit it. | Final text edits in place when it fits one safe preview message. | +| Matrix | Send one event, then edit it. | Account-level streaming config controls account-level drafts. | +| Microsoft Teams | Native Teams stream in personal chats. | `streaming.mode: "block"` maps to Teams block delivery. | +| Slack | Native stream or editable draft post. | Thread availability affects whether native streaming can be used. | +| Telegram | Send one message, then edit it. | Older visible drafts may be replaced so final timestamps stay useful. | +| Mattermost | Editable draft post. | Tool activity is folded into the same draft-style post. | + +Channels without safe edit support usually fall back to typing indicators or +final-only delivery. + +## Finalization + +When the final answer is ready, OpenClaw tries to keep the chat clean: + +- If the draft can safely become the final answer, OpenClaw edits it in place. +- If the channel uses native progress streaming, OpenClaw finalizes that stream + when the native transport accepts the final text. +- If the final answer has media, an approval prompt, an explicit reply target, + too many chunks, or a failed edit/send, OpenClaw sends the final answer through + the normal channel delivery path. + +The fallback path is intentional. It is better to send a fresh final answer than +to lose text, mis-thread a reply, or overwrite a draft with a payload the channel +cannot represent safely. + +## Troubleshooting + +**I only see the final answer.** + +Check that `channels..streaming.mode` is set to `progress` for the +account or channel that handled the message. Some group or quote-reply paths may +disable draft previews for a turn when the channel cannot safely edit the right +message. + +**I see the label but no tool lines.** + +Check `streaming.progress.toolProgress`. If it is `false`, OpenClaw keeps the +single draft behavior but hides tool and task progress lines. + +**I see a fresh final message instead of an edited draft.** + +That is a safety fallback. It can happen for media replies, long answers, +explicit reply targets, old Telegram drafts, missing Slack thread targets, +deleted preview messages, or failed native stream finalization. + +**I still see standalone progress messages.** + +Progress mode suppresses default standalone tool-progress messages when a draft +is active. If standalone messages still appear, verify that the turn is actually +using progress mode and not `streaming.mode: "off"` or a channel path that +cannot create a draft for that message. + +**Teams behaves differently from Discord or Telegram.** + +Microsoft Teams uses a native stream in personal chats instead of the generic +send-and-edit preview transport. Teams also treats `streaming.mode: "block"` as +Teams block delivery because it does not have the same draft-preview block mode +used by Discord and Telegram. + +## Related + +- [Streaming and chunking](/concepts/streaming) +- [Messages](/concepts/messages) +- [Channel configuration](/gateway/config-channels) +- [Discord](/channels/discord) +- [Matrix](/channels/matrix) +- [Microsoft Teams](/channels/msteams) +- [Slack](/channels/slack) +- [Telegram](/channels/telegram) diff --git a/docs/concepts/streaming.md b/docs/concepts/streaming.md index e98bbb1ee04..9e8aab948bc 100644 --- a/docs/concepts/streaming.md +++ b/docs/concepts/streaming.md @@ -127,14 +127,22 @@ Modes: - `block`: preview updates in chunked/appended steps. - `progress`: progress/status preview during generation, final answer at completion. +`streaming.mode: "block"` is a preview-streaming mode for edit-capable channels +such as Discord and Telegram. It does not enable channel block delivery there. +Use `streaming.block.enabled` or the legacy `blockStreaming` channel key when +you want normal block replies. Microsoft Teams is the exception: it has no +draft-preview block transport, so `streaming.mode: "block"` maps to Teams block +delivery instead of native partial/progress streaming. + ### Channel mapping -| Channel | `off` | `partial` | `block` | `progress` | -| ---------- | ----- | --------- | ------- | ----------------- | -| Telegram | ✅ | ✅ | ✅ | maps to `partial` | -| Discord | ✅ | ✅ | ✅ | maps to `partial` | -| Slack | ✅ | ✅ | ✅ | ✅ | -| Mattermost | ✅ | ✅ | ✅ | ✅ | +| Channel | `off` | `partial` | `block` | `progress` | +| ---------- | ----- | --------- | ------- | ----------------------- | +| Telegram | ✅ | ✅ | ✅ | editable progress draft | +| Discord | ✅ | ✅ | ✅ | editable progress draft | +| Slack | ✅ | ✅ | ✅ | ✅ | +| Mattermost | ✅ | ✅ | ✅ | ✅ | +| MS Teams | ✅ | ✅ | ✅ | native progress stream | Slack-only: @@ -189,10 +197,10 @@ Preview streaming can also include **tool-progress** updates — short status li Supported surfaces: -- **Discord**, **Slack**, **Telegram**, and **Matrix** stream tool-progress into the live preview edit by default when preview streaming is active. +- **Discord**, **Slack**, **Telegram**, and **Matrix** stream tool-progress into the live preview edit by default when preview streaming is active. Microsoft Teams uses its native progress stream in personal chats. - Telegram has shipped with tool-progress preview updates enabled since `v2026.4.22`; keeping them enabled preserves that released behavior. - **Mattermost** already folds tool activity into its single draft preview post (see above). -- Tool-progress edits follow the active preview streaming mode; they are skipped when preview streaming is `off` or when block streaming has taken over the message. On Telegram, `streaming.mode: "off"` is final-only: generic progress chatter is also suppressed instead of being delivered as standalone "Working..." messages, while approval prompts, media payloads, and errors still route normally. +- Tool-progress edits follow the active preview streaming mode; they are skipped when preview streaming is `off` or when block streaming has taken over the message. On Telegram, `streaming.mode: "off"` is final-only: generic progress chatter is also suppressed instead of being delivered as standalone status messages, while approval prompts, media payloads, and errors still route normally. - To keep preview streaming but hide tool-progress lines, set `streaming.preview.toolProgress` to `false` for that channel. To disable preview edits entirely, set `streaming.mode` to `off`. - Telegram selected quote replies are an exception: when `replyToMode` is not `"off"` and selected quote text is present, OpenClaw skips the answer preview stream for that turn so tool-progress preview lines cannot render. Current-message replies without selected quote text still keep preview streaming. See [Telegram channel docs](/channels/telegram) for details. @@ -215,6 +223,7 @@ Example: ## Related +- [Progress drafts](/concepts/progress-drafts) — visible work-in-progress messages that update during long turns - [Messages](/concepts/messages) — message lifecycle and delivery - [Retry](/concepts/retry) — retry behavior on delivery failure - [Channels](/channels) — per-channel streaming support diff --git a/docs/docs.json b/docs/docs.json index f70885136ed..0150d0d303a 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -1164,6 +1164,7 @@ "pages": [ "concepts/messages", "concepts/streaming", + "concepts/progress-drafts", "concepts/retry", "concepts/queue", "concepts/queue-steering" diff --git a/docs/gateway/config-channels.md b/docs/gateway/config-channels.md index 86b5eca5ceb..5116acd23a5 100644 --- a/docs/gateway/config-channels.md +++ b/docs/gateway/config-channels.md @@ -272,7 +272,7 @@ WhatsApp runs through the gateway's web channel (Baileys Web). It starts automat historyLimit: 20, textChunkLimit: 2000, chunkMode: "length", // length | newline - streaming: "off", // off | partial | block | progress (progress maps to partial on Discord) + streaming: "off", // off | partial | block | progress maxLinesPerMessage: 17, ui: { components: { diff --git a/extensions/discord/src/config-ui-hints.ts b/extensions/discord/src/config-ui-hints.ts index 0c0377a096f..b54f4aebbc0 100644 --- a/extensions/discord/src/config-ui-hints.ts +++ b/extensions/discord/src/config-ui-hints.ts @@ -31,11 +31,11 @@ export const discordChannelConfigUiHints = { }, streaming: { label: "Discord Streaming Mode", - help: 'Unified Discord stream preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Discord. Legacy boolean/streamMode keys are auto-mapped.', + help: 'Unified Discord stream preview mode: "off" | "partial" | "block" | "progress". "progress" keeps a single editable progress draft until final delivery. Legacy boolean/streamMode keys are auto-mapped.', }, "streaming.mode": { label: "Discord Streaming Mode", - help: 'Canonical Discord preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Discord.', + help: 'Canonical Discord preview mode: "off" | "partial" | "block" | "progress".', }, "streaming.chunkMode": { label: "Discord Chunk Mode", diff --git a/extensions/discord/src/monitor/message-handler.draft-preview.ts b/extensions/discord/src/monitor/message-handler.draft-preview.ts index 28e24fe10fe..f7a12b4855b 100644 --- a/extensions/discord/src/monitor/message-handler.draft-preview.ts +++ b/extensions/discord/src/monitor/message-handler.draft-preview.ts @@ -1,7 +1,10 @@ import { EmbeddedBlockChunker } from "openclaw/plugin-sdk/agent-runtime"; import { + formatChannelProgressDraftText, + resolveChannelProgressDraftMaxLines, resolveChannelStreamingBlockEnabled, resolveChannelStreamingPreviewToolProgress, + resolveChannelStreamingSuppressDefaultToolProgressMessages, } from "openclaw/plugin-sdk/channel-streaming"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { @@ -50,7 +53,7 @@ export function createDiscordDraftPreviewController(params: { channelId: params.deliverChannelId, maxChars: draftMaxChars, replyToMessageId: () => params.replyReference.peek(), - minInitialChars: 30, + minInitialChars: discordStreamMode === "progress" ? 0 : 30, throttleMs: 1200, log: params.log, warn: params.log, @@ -69,8 +72,15 @@ export function createDiscordDraftPreviewController(params: { let finalDeliveryHandled = false; const previewToolProgressEnabled = Boolean(draftStream) && resolveChannelStreamingPreviewToolProgress(params.discordConfig); + const suppressDefaultToolProgressMessages = + Boolean(draftStream) && + resolveChannelStreamingSuppressDefaultToolProgressMessages(params.discordConfig, { + draftStreamActive: true, + previewToolProgressEnabled, + }); let previewToolProgressSuppressed = false; let previewToolProgressLines: string[] = []; + const progressSeed = `${params.accountId}:${params.deliverChannelId}`; const resetProgressState = () => { lastPartialText = ""; @@ -91,6 +101,7 @@ export function createDiscordDraftPreviewController(params: { return { draftStream, previewToolProgressEnabled, + suppressDefaultToolProgressMessages, get finalizedViaPreviewMessage() { return finalizedViaPreviewMessage; }, @@ -101,6 +112,25 @@ export function createDiscordDraftPreviewController(params: { finalizedViaPreviewMessage = true; }, disableBlockStreamingForDraft: draftStream ? true : undefined, + async startProgressDraft() { + if (!draftStream || discordStreamMode !== "progress") { + return; + } + const previewText = formatChannelProgressDraftText({ + entry: params.discordConfig, + lines: [], + seed: progressSeed, + }); + if (!previewText || previewText === lastPartialText) { + return; + } + lastPartialText = previewText; + draftText = previewText; + hasStreamedMessage = true; + draftChunker?.reset(); + draftStream.update(previewText); + await draftStream.flush(); + }, pushToolProgress(line?: string) { if (!draftStream || !previewToolProgressEnabled || previewToolProgressSuppressed) { return; @@ -113,11 +143,14 @@ export function createDiscordDraftPreviewController(params: { if (previous === normalized) { return; } - previewToolProgressLines = [...previewToolProgressLines, normalized].slice(-8); - const previewText = [ - "Working…", - ...previewToolProgressLines.map((entry) => `• ${entry}`), - ].join("\n"); + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(params.discordConfig), + ); + const previewText = formatChannelProgressDraftText({ + entry: params.discordConfig, + lines: previewToolProgressLines, + seed: progressSeed, + }); lastPartialText = previewText; draftText = previewText; hasStreamedMessage = true; @@ -170,6 +203,9 @@ export function createDiscordDraftPreviewController(params: { if (cleaned === lastPartialText) { return; } + if (discordStreamMode === "progress") { + return; + } previewToolProgressSuppressed = true; previewToolProgressLines = []; hasStreamedMessage = true; diff --git a/extensions/discord/src/monitor/message-handler.process.test.ts b/extensions/discord/src/monitor/message-handler.process.test.ts index 2a11c8a1a99..e9639a21138 100644 --- a/extensions/discord/src/monitor/message-handler.process.test.ts +++ b/extensions/discord/src/monitor/message-handler.process.test.ts @@ -126,6 +126,7 @@ type DispatchInboundParams = { summary?: string; title?: string; }) => Promise | void; + onReplyStart?: () => Promise | void; sourceReplyDeliveryMode?: "automatic" | "message_tool_only"; disableBlockStreaming?: boolean; suppressDefaultToolProgressMessages?: boolean; @@ -190,6 +191,7 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({ }, createReplyDispatcherWithTyping: (opts: { deliver: (payload: unknown, info: { kind: string }) => Promise | void; + onReplyStart?: () => Promise | void; }) => ({ dispatcher: { sendToolResult: vi.fn(() => true), @@ -205,7 +207,9 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", () => ({ getQueuedCounts: vi.fn(() => ({ tool: 0, block: 0, final: 0 })), markComplete: vi.fn(), }, - replyOptions: {}, + replyOptions: { + onReplyStart: opts.onReplyStart, + }, markDispatchIdle: vi.fn(), markRunComplete: vi.fn(), }), @@ -1237,7 +1241,9 @@ describe("processDiscordMessage draft streaming", () => { await runProcessDiscordMessage(ctx); } - async function createBlockModeContext() { + async function createBlockModeContext( + discordConfig: Record = { streamMode: "block" }, + ) { return await createAutomaticSourceDeliveryContext({ cfg: { messages: { ackReaction: "👀" }, @@ -1248,7 +1254,7 @@ describe("processDiscordMessage draft streaming", () => { }, }, }, - discordConfig: { streamMode: "block" }, + discordConfig, }); } @@ -1422,6 +1428,78 @@ describe("processDiscordMessage draft streaming", () => { expect(updates).toEqual(["Hello", "HelloWorld"]); }); + it("keeps canonical block mode on the Discord draft preview path", async () => { + const draftStream = createMockDraftStreamForTest(); + + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.replyOptions?.onPartialReply?.({ text: "HelloWorld" }); + return createNoQueuedDispatchResult(); + }); + + const ctx = await createBlockModeContext({ streaming: { mode: "block" } }); + + await runProcessDiscordMessage(ctx); + + expect(draftStream.update).toHaveBeenCalledWith("Hello"); + expect(dispatchInboundMessage.mock.calls[0]?.[0]?.replyOptions?.disableBlockStreaming).toBe( + true, + ); + }); + + it("keeps progress label visible when Discord tool progress lines are disabled", async () => { + const draftStream = createMockDraftStreamForTest(); + + dispatchInboundMessage.mockImplementationOnce(async (params?: DispatchInboundParams) => { + await params?.replyOptions?.onReplyStart?.(); + await params?.replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + return createNoQueuedDispatchResult(); + }); + + const ctx = await createAutomaticSourceDeliveryContext({ + discordConfig: { + streaming: { + mode: "progress", + progress: { + label: "Shelling", + toolProgress: false, + }, + }, + }, + }); + + await runProcessDiscordMessage(ctx); + + expect(draftStream.update).toHaveBeenCalledTimes(1); + expect(draftStream.update).toHaveBeenCalledWith("Shelling"); + expect(draftStream.flush).toHaveBeenCalledTimes(1); + expect(dispatchInboundMessage.mock.calls[0]?.[0]?.replyOptions).toMatchObject({ + suppressDefaultToolProgressMessages: true, + }); + }); + + it("keeps standalone Discord tool progress when partial preview lines are disabled", async () => { + createMockDraftStreamForTest(); + + dispatchInboundMessage.mockImplementationOnce(async () => createNoQueuedDispatchResult()); + + const ctx = await createAutomaticSourceDeliveryContext({ + discordConfig: { + streaming: { + mode: "partial", + preview: { + toolProgress: false, + }, + }, + }, + }); + + await runProcessDiscordMessage(ctx); + + expect( + dispatchInboundMessage.mock.calls[0]?.[0]?.replyOptions?.suppressDefaultToolProgressMessages, + ).toBeUndefined(); + }); + it("strips reply tags from preview partials", async () => { const draftStream = createMockDraftStreamForTest(); diff --git a/extensions/discord/src/monitor/message-handler.process.ts b/extensions/discord/src/monitor/message-handler.process.ts index fe7482fbbb0..91435dafd55 100644 --- a/extensions/discord/src/monitor/message-handler.process.ts +++ b/extensions/discord/src/monitor/message-handler.process.ts @@ -565,6 +565,7 @@ export async function processDiscordMessage( } await replyPipeline.typingCallbacks?.onReplyStart(); await statusReactions.setThinking(); + await draftPreview.startProgressDraft(); }, }); @@ -643,9 +644,8 @@ export async function processDiscordMessage( ? draftPreview.handleAssistantMessageBoundary : undefined, onModelSelected, - suppressDefaultToolProgressMessages: draftPreview.previewToolProgressEnabled - ? true - : undefined, + suppressDefaultToolProgressMessages: + draftPreview.suppressDefaultToolProgressMessages ? true : undefined, onReasoningStream: async () => { await statusReactions.setThinking(); }, diff --git a/extensions/discord/src/preview-streaming.ts b/extensions/discord/src/preview-streaming.ts index afc11d286d1..d52e17ce98f 100644 --- a/extensions/discord/src/preview-streaming.ts +++ b/extensions/discord/src/preview-streaming.ts @@ -1,8 +1,9 @@ -type DiscordPreviewStreamMode = "off" | "partial" | "block"; +import { + resolveChannelPreviewStreamMode, + type StreamingMode, +} from "openclaw/plugin-sdk/channel-streaming"; -function parsePreviewStreamingMode(value: unknown): DiscordPreviewStreamMode | undefined { - return value === "off" || value === "partial" || value === "block" ? value : undefined; -} +type DiscordPreviewStreamMode = StreamingMode; export function resolveDiscordPreviewStreamMode( params: { @@ -10,23 +11,5 @@ export function resolveDiscordPreviewStreamMode( streaming?: unknown; } = {}, ): DiscordPreviewStreamMode { - const parsedStreaming = - params.streaming && typeof params.streaming === "object" && !Array.isArray(params.streaming) - ? parsePreviewStreamingMode( - (params.streaming as Record).mode ?? - (params.streaming as Record).streaming, - ) - : parsePreviewStreamingMode(params.streaming); - if (parsedStreaming) { - return parsedStreaming; - } - - const legacy = parsePreviewStreamingMode(params.streamMode); - if (legacy) { - return legacy; - } - if (typeof params.streaming === "boolean") { - return params.streaming ? "partial" : "off"; - } - return "off"; + return resolveChannelPreviewStreamMode(params, "off"); } diff --git a/extensions/matrix/src/config-schema.test.ts b/extensions/matrix/src/config-schema.test.ts index 7788bc4f036..e99405ffd18 100644 --- a/extensions/matrix/src/config-schema.test.ts +++ b/extensions/matrix/src/config-schema.test.ts @@ -88,15 +88,29 @@ describe("MatrixConfigSchema SecretInput", () => { expect(result.success).toBe(true); }); + it("accepts scalar progress Matrix streaming mode", () => { + const result = MatrixConfigSchema.safeParse({ + homeserver: "https://matrix.example.org", + accessToken: "token", + streaming: "progress", + }); + expect(result.success).toBe(true); + }); + it("accepts Matrix streaming preview tool progress config", () => { const result = MatrixConfigSchema.safeParse({ homeserver: "https://matrix.example.org", accessToken: "token", streaming: { - mode: "partial", - preview: { + mode: "progress", + progress: { + label: "Shelling", + maxLines: 4, toolProgress: false, }, + preview: { + toolProgress: true, + }, }, }); expect(result.success).toBe(true); diff --git a/extensions/matrix/src/config-schema.ts b/extensions/matrix/src/config-schema.ts index 54db054c648..95ce8e43c69 100644 --- a/extensions/matrix/src/config-schema.ts +++ b/extensions/matrix/src/config-schema.ts @@ -66,7 +66,16 @@ const matrixNetworkSchema = z const matrixStreamingSchema = z .object({ - mode: z.enum(["partial", "quiet", "off"]).optional(), + mode: z.enum(["partial", "quiet", "progress", "off"]).optional(), + progress: z + .object({ + label: z.union([z.string(), z.literal(false)]).optional(), + labels: z.array(z.string()).optional(), + maxLines: z.number().int().positive().optional(), + toolProgress: z.boolean().optional(), + }) + .strict() + .optional(), preview: z .object({ toolProgress: z.boolean().optional(), @@ -99,7 +108,7 @@ export const MatrixConfigSchema = z.object({ contextVisibility: ContextVisibilityModeSchema.optional(), blockStreaming: z.boolean().optional(), streaming: z - .union([z.enum(["partial", "quiet", "off"]), z.boolean(), matrixStreamingSchema]) + .union([z.enum(["partial", "quiet", "progress", "off"]), z.boolean(), matrixStreamingSchema]) .optional(), replyToMode: z.enum(["off", "first", "all", "batched"]).optional(), threadReplies: z.enum(["off", "inbound", "always"]).optional(), diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 4859873619a..b0d486e090f 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -1,6 +1,11 @@ import { vi } from "vitest"; import type { RuntimeEnv, RuntimeLogger } from "../../runtime-api.js"; -import type { MatrixRoomConfig, MatrixStreamingMode, ReplyToMode } from "../../types.js"; +import type { + MatrixConfig, + MatrixRoomConfig, + MatrixStreamingMode, + ReplyToMode, +} from "../../types.js"; import type { MatrixClient } from "../sdk.js"; import { createMatrixRoomMessageHandler, type MatrixMonitorHandlerParams } from "./handler.js"; import { EventType, type MatrixRawEvent, type RoomMessageEventContent } from "./types.js"; @@ -16,6 +21,7 @@ const DEFAULT_ROUTE = { type MatrixHandlerTestHarnessOptions = { accountId?: string; + accountConfig?: MatrixConfig; cfg?: unknown; client?: Partial; runtime?: RuntimeEnv; @@ -264,6 +270,7 @@ export function createMatrixHandlerTestHarness( } as never, cfg: cfgForHandler as never, accountId: options.accountId ?? "ops", + accountConfig: options.accountConfig, runtime: options.runtime ?? ({ diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index 429f8685b5e..9392d01ed8e 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -2536,6 +2536,7 @@ describe("matrix monitor handler draft streaming", () => { info: { kind: string }, ) => Promise; type ReplyOpts = { + onReplyStart?: () => Promise | void; onPartialReply?: (payload: { text: string }) => void; onBlockReplyQueued?: ( payload: { @@ -2576,8 +2577,9 @@ describe("matrix monitor handler draft streaming", () => { function createStreamingHarness(opts?: { replyToMode?: "off" | "first" | "all" | "batched"; blockStreamingEnabled?: boolean; - streaming?: "partial" | "quiet"; + streaming?: "partial" | "quiet" | "progress"; previewToolProgressEnabled?: boolean; + accountConfig?: import("../../types.js").MatrixConfig; }) { let capturedDeliver: DeliverFn | undefined; let capturedReplyOpts: ReplyOpts | undefined; @@ -2607,6 +2609,7 @@ describe("matrix monitor handler draft streaming", () => { const { handler } = createMatrixHandlerTestHarness({ streaming: opts?.streaming ?? "quiet", + accountConfig: opts?.accountConfig, previewToolProgressEnabled: opts?.previewToolProgressEnabled ?? false, blockStreamingEnabled: opts?.blockStreamingEnabled ?? false, replyToMode: opts?.replyToMode ?? "off", @@ -2702,9 +2705,7 @@ describe("matrix monitor handler draft streaming", () => { await vi.waitFor(() => { expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); }); - expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toBe( - "Working...\n- `tool: read_file`", - ); + expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toMatch(/\n- `tool: read_file`$/); await deliver({ text: "Done" }, { kind: "final" }); @@ -2721,6 +2722,40 @@ describe("matrix monitor handler draft streaming", () => { await finish(); }); + it("uses resolved Matrix account progress config for draft text", async () => { + const { dispatch } = createStreamingHarness({ + streaming: "progress", + previewToolProgressEnabled: true, + accountConfig: { + streaming: { + mode: "progress", + progress: { + label: "Pearling", + maxLines: 1, + }, + }, + } as never, + }); + const { opts, finish } = await dispatch(); + + await opts.onReplyStart?.(); + await opts.onItemEvent?.({ progressText: "first" }); + await opts.onItemEvent?.({ progressText: "second" }); + + await vi.waitFor(() => { + expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); + }); + await vi.waitFor(() => { + expect(editMessageMatrixMock).toHaveBeenCalledWith( + "!room:example.org", + "$draft1", + "Pearling\n- `second`", + expect.anything(), + ); + }); + await finish(); + }); + it("keeps Matrix tool progress mentions inside code formatting", async () => { const { dispatch } = createStreamingHarness({ previewToolProgressEnabled: true, @@ -2735,8 +2770,8 @@ describe("matrix monitor handler draft streaming", () => { await vi.waitFor(() => { expect(sendSingleTextMessageMatrixMock).toHaveBeenCalledTimes(1); }); - expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toBe( - "Working...\n- `@room ping @alice:example.org [label](https://example.org)`", + expect(sendSingleTextMessageMatrixMock.mock.calls[0]?.[1]).toMatch( + /\n- `@room ping @alice:example\.org \[label\]\(https:\/\/example\.org\)`$/, ); await finish(); }); @@ -2753,6 +2788,19 @@ describe("matrix monitor handler draft streaming", () => { await finish(); }); + it("suppresses standalone Matrix tool progress in progress mode when draft lines are disabled", async () => { + const { dispatch } = createStreamingHarness({ + streaming: "progress", + previewToolProgressEnabled: false, + }); + const { opts, finish } = await dispatch(); + + expect(opts.suppressDefaultToolProgressMessages).toBe(true); + expect(opts.onToolStart).toBeUndefined(); + expect(sendSingleTextMessageMatrixMock).not.toHaveBeenCalled(); + await finish(); + }); + it("keeps partial preview-first finalization on the existing draft when text is unchanged", async () => { const { dispatch, redactEventMock } = createStreamingHarness({ blockStreamingEnabled: true, diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index a0217127ed6..e1581a38f17 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -1,3 +1,7 @@ +import { + formatChannelProgressDraftText, + resolveChannelProgressDraftMaxLines, +} from "openclaw/plugin-sdk/channel-streaming"; import { resolveControlCommandGate } from "openclaw/plugin-sdk/command-gating"; import { evaluateSupplementalContextVisibility, @@ -13,6 +17,7 @@ import { import { normalizeOptionalString } from "openclaw/plugin-sdk/string-coerce-runtime"; import type { CoreConfig, + MatrixConfig, MatrixRoomConfig, MatrixStreamingMode, ReplyToMode, @@ -157,6 +162,7 @@ export type MatrixMonitorHandlerParams = { core: PluginRuntime; cfg: CoreConfig; accountId: string; + accountConfig?: MatrixConfig; runtime: RuntimeEnv; logger: RuntimeLogger; logVerboseMessage: (message: string) => void; @@ -1449,7 +1455,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }, }); const draftStreamingEnabled = streaming !== "off"; - const quietDraftStreaming = streaming === "quiet"; + const quietDraftStreaming = streaming === "quiet" || streaming === "progress"; + const progressDraftStreaming = streaming === "progress"; const draftReplyToId = replyToMode !== "off" && !threadTarget ? _messageId : undefined; const draftStream: MatrixDraftStreamHandle | undefined = draftStreamingEnabled ? await loadMatrixDraftStream().then(({ createMatrixDraftStream }) => @@ -1468,6 +1475,9 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam : undefined; draftStreamRef = draftStream; const shouldStreamPreviewToolProgress = Boolean(draftStream) && previewToolProgressEnabled; + const shouldSuppressDefaultToolProgressMessages = + Boolean(draftStream) && + (shouldStreamPreviewToolProgress || params.streaming === "progress"); type PendingDraftBoundary = { messageGeneration: number; endOffset: number; @@ -1483,6 +1493,8 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam let currentDraftReplyToId = draftReplyToId; let previewToolProgressSuppressed = false; let previewToolProgressLines: string[] = []; + const progressConfigEntry = params.accountConfig ?? cfg.channels?.matrix; + const progressSeed = `${_route.accountId}:${roomId}`; // Set after the first final payload consumes or discards the draft event // so subsequent finals go through normal delivery. @@ -1498,14 +1510,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam if (previous === normalized) { return; } - previewToolProgressLines = [...previewToolProgressLines, normalized].slice(-8); + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(progressConfigEntry), + ); draftStream.update( - [ - "Working...", - ...previewToolProgressLines.map( - (entry) => `- ${formatMatrixToolProgressMarkdownCode(entry)}`, - ), - ].join("\n"), + formatChannelProgressDraftText({ + entry: progressConfigEntry, + lines: previewToolProgressLines, + seed: progressSeed, + formatLine: formatMatrixToolProgressMarkdownCode, + bullet: "-", + }), ); }; @@ -1523,11 +1538,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam }; const buildPreviewToolProgressReplyOptions = (): Partial => { - if (!shouldStreamPreviewToolProgress) { + if (!shouldSuppressDefaultToolProgressMessages) { return {}; } - return { + const options: Partial = { suppressDefaultToolProgressMessages: true, + }; + if (!shouldStreamPreviewToolProgress) { + return options; + } + return { + ...options, onToolStart: async (payload) => { const toolName = payload.name?.trim(); pushPreviewToolProgress(toolName ? `tool: ${toolName}` : "tool running"); @@ -1937,8 +1958,25 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam // for the current assistant block, while block deliveries // finalize completed blocks into their own preserved events. disableBlockStreaming: !blockStreamingEnabled, + onReplyStart: + draftStream && progressDraftStreaming + ? () => { + draftStream.update( + formatChannelProgressDraftText({ + entry: progressConfigEntry, + lines: [], + seed: progressSeed, + formatLine: formatMatrixToolProgressMarkdownCode, + bullet: "-", + }), + ); + } + : undefined, onPartialReply: draftStream ? (payload) => { + if (progressDraftStreaming) { + return; + } latestDraftFullText = payload.text ?? ""; suppressPreviewToolProgressForAnswerText(latestDraftFullText); updateDraftFromLatestFullText(); diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index bf6f7d0a593..96f111c022b 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -475,12 +475,20 @@ describe("monitorMatrixProvider", () => { ["off", "off", false], ["partial", "partial", true], ["quiet", "quiet", true], + ["progress", "progress", true], [{}, "off", false], [{ mode: "off" }, "off", false], [{ mode: "partial" }, "partial", true], [{ mode: "quiet" }, "quiet", true], + [{ mode: "progress" }, "progress", true], [{ mode: "partial", preview: { toolProgress: false } }, "partial", false], [{ mode: "quiet", preview: { toolProgress: false } }, "quiet", false], + [{ mode: "progress", progress: { toolProgress: false } }, "progress", false], + [ + { mode: "progress", progress: { toolProgress: false }, preview: { toolProgress: true } }, + "progress", + false, + ], [{ mode: "off", preview: { toolProgress: true } }, "off", false], ] satisfies Array<[MatrixConfig["streaming"], MatrixStreamingMode, boolean]>)( "resolves streaming=%j to mode=%s and toolProgress=%s", diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index 4074029c886..88ad887c601 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -79,8 +79,15 @@ function resolveMatrixStreamingMode(streaming: MatrixConfig["streaming"]): Matri if (streaming === "quiet") { return "quiet"; } + if (streaming === "progress") { + return "progress"; + } if (isMatrixStreamingConfig(streaming)) { - if (streaming.mode === "partial" || streaming.mode === "quiet") { + if ( + streaming.mode === "partial" || + streaming.mode === "quiet" || + streaming.mode === "progress" + ) { return streaming.mode; } } @@ -91,7 +98,7 @@ function resolveMatrixPreviewToolProgress(streaming: MatrixConfig["streaming"]): if (!isMatrixStreamingConfig(streaming)) { return true; } - return streaming.preview?.toolProgress ?? true; + return streaming.progress?.toolProgress ?? streaming.preview?.toolProgress ?? true; } function resolveMatrixPreviewToolProgressEnabled(streaming: MatrixConfig["streaming"]): boolean { @@ -368,6 +375,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi core, cfg, accountId: effectiveAccountId, + accountConfig, runtime, logger, logVerboseMessage, diff --git a/extensions/matrix/src/types.ts b/extensions/matrix/src/types.ts index 8fd263ddf9b..d923373bef4 100644 --- a/extensions/matrix/src/types.ts +++ b/extensions/matrix/src/types.ts @@ -86,11 +86,12 @@ export type MatrixExecApprovalConfig = { target?: MatrixExecApprovalTarget; }; -export type MatrixStreamingMode = "partial" | "quiet" | "off"; +export type MatrixStreamingMode = "partial" | "quiet" | "progress" | "off"; export type MatrixStreamingConfig = { /** Preview streaming mode for Matrix replies. Default: "off". */ mode?: MatrixStreamingMode; + progress?: import("openclaw/plugin-sdk/channel-streaming").ChannelStreamingProgressConfig; preview?: { /** Show tool/progress activity in the live draft preview. Default: true. */ toolProgress?: boolean; @@ -207,13 +208,16 @@ export type MatrixConfig = { * messages. This preserves legacy preview-first notification behavior. * - `"quiet"`: edit a single quiet draft notice in place for the current * assistant block as the model generates text. + * - `"progress"`: edit a single draft status message with shared progress + * labels and optional tool/task lines until the final answer is ready. * - `"off"`: deliver the full reply once the model finishes. * - Use `blockStreaming: true` when you want completed assistant blocks to * stay visible as separate progress messages. When combined with * preview streaming, Matrix keeps a live draft for the current block and * preserves completed blocks as separate messages. - * - `streaming.preview.toolProgress: false` keeps answer preview edits but - * hides interim tool/progress lines. + * - `streaming.progress.toolProgress: false` hides interim tool/progress + * lines in progress mode. `streaming.preview.toolProgress: false` keeps + * legacy answer preview edits but hides interim tool/progress lines. * - `true` maps to `"partial"`, `false` maps to `"off"` for backward * compatibility. Object form uses `streaming.mode`. * Default: `"off"`. diff --git a/extensions/msteams/src/config-ui-hints.ts b/extensions/msteams/src/config-ui-hints.ts index 5649376ff2b..44dcd61af11 100644 --- a/extensions/msteams/src/config-ui-hints.ts +++ b/extensions/msteams/src/config-ui-hints.ts @@ -9,4 +9,12 @@ export const msTeamsChannelConfigUiHints = { label: "MS Teams Config Writes", help: "Allow Microsoft Teams to write config in response to channel events/commands (default: true).", }, + streaming: { + label: "MS Teams Streaming", + help: 'Microsoft Teams preview/progress streaming mode: "off" | "partial" | "block" | "progress". Personal chats use Teams native streaminfo progress when available.', + }, + "streaming.progress.label": { + label: "MS Teams Progress Label", + help: 'Initial progress title. Use "auto" for built-in single-word labels, a custom string, or false to hide the title.', + }, } satisfies Record; diff --git a/extensions/msteams/src/reply-dispatcher.test.ts b/extensions/msteams/src/reply-dispatcher.test.ts index 22f86d908ab..2228bcb1180 100644 --- a/extensions/msteams/src/reply-dispatcher.test.ts +++ b/extensions/msteams/src/reply-dispatcher.test.ts @@ -344,6 +344,21 @@ describe("createMSTeamsReplyDispatcher", () => { expect(dispatcher.replyOptions.disableBlockStreaming).toBe(false); }); + it("maps streaming.mode=block to block delivery without native Teams streaming", async () => { + renderReplyPayloadsToMessagesMock.mockReturnValue([{ content: "hello" }] as never); + sendMSTeamsMessagesMock.mockResolvedValue(["id-1"] as never); + + const dispatcher = createDispatcher("personal", { streaming: { mode: "block" } }); + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + + await options.deliver({ text: "block content" }); + + expect(streamInstances).toHaveLength(0); + expect(dispatcher.replyOptions.onPartialReply).toBeUndefined(); + expect(dispatcher.replyOptions.disableBlockStreaming).toBe(false); + expect(sendMSTeamsMessagesMock).toHaveBeenCalledTimes(1); + }); + it("sets disableBlockStreaming=true when blockStreaming=false", () => { const dispatcher = createDispatcher("personal", { blockStreaming: false }); @@ -431,7 +446,15 @@ describe("createMSTeamsReplyDispatcher", () => { describe("pickInformativeStatusText", () => { it("selects a deterministic status line for a fixed random source", () => { - expect(pickInformativeStatusText(() => 0)).toBe("Thinking..."); - expect(pickInformativeStatusText(() => 0.99)).toBe("Putting an answer together..."); + expect(pickInformativeStatusText(() => 0)).toBe("Thinking"); + expect(pickInformativeStatusText(() => 0.99)).toBe("Surfacing"); + }); + + it("honors disabled progress labels", () => { + expect( + pickInformativeStatusText({ + config: { streaming: { progress: { label: false } } } as never, + }), + ).toBeUndefined(); }); }); diff --git a/extensions/msteams/src/reply-dispatcher.ts b/extensions/msteams/src/reply-dispatcher.ts index 01e78bff0d6..1566e6f2751 100644 --- a/extensions/msteams/src/reply-dispatcher.ts +++ b/extensions/msteams/src/reply-dispatcher.ts @@ -1,3 +1,7 @@ +import { + resolveChannelPreviewStreamMode, + resolveChannelStreamingBlockEnabled, +} from "openclaw/plugin-sdk/channel-streaming"; import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/text-runtime"; import { createChannelReplyPipeline, @@ -147,12 +151,16 @@ export function createMSTeamsReplyDispatcher(params: { context: params.context, feedbackLoopEnabled, log: params.log, + msteamsConfig: msteamsCfg, + progressSeed: `${params.accountId ?? "default"}:${params.conversationRef.conversation?.id ?? ""}`, }); // Wire the forward-declared gate used by sendTypingIndicator. streamActiveRef.current = () => streamController.isStreamActive(); - const blockStreamingEnabled = - typeof msteamsCfg?.blockStreaming === "boolean" ? msteamsCfg.blockStreaming : false; + const teamsStreamMode = resolveChannelPreviewStreamMode(msteamsCfg, "partial"); + const resolvedBlockStreamingEnabled = + teamsStreamMode === "block" ? true : resolveChannelStreamingBlockEnabled(msteamsCfg); + const blockStreamingEnabled = resolvedBlockStreamingEnabled ?? false; const typingIndicatorEnabled = typeof msteamsCfg?.typingIndicator === "boolean" ? msteamsCfg.typingIndicator : true; @@ -268,7 +276,7 @@ export function createMSTeamsReplyDispatcher(params: { }, typingCallbacks, deliver: async (payload) => { - const preparedPayload = streamController.preparePayload(payload); + const preparedPayload = await streamController.preparePayload(payload); if (!preparedPayload) { return; } @@ -338,7 +346,9 @@ export function createMSTeamsReplyDispatcher(params: { } : {}), disableBlockStreaming: - typeof msteamsCfg?.blockStreaming === "boolean" ? !msteamsCfg.blockStreaming : undefined, + typeof resolvedBlockStreamingEnabled === "boolean" + ? !resolvedBlockStreamingEnabled + : undefined, onModelSelected, }, markDispatchIdle, diff --git a/extensions/msteams/src/reply-stream-controller.test.ts b/extensions/msteams/src/reply-stream-controller.test.ts index 96a3ee707b1..97358efe960 100644 --- a/extensions/msteams/src/reply-stream-controller.test.ts +++ b/extensions/msteams/src/reply-stream-controller.test.ts @@ -9,6 +9,7 @@ const streamInstances = vi.hoisted( streamedLength: number; sendInformativeUpdate: ReturnType; update: ReturnType; + replaceInformativeWithFinal: ReturnType; finalize: ReturnType; }>, ); @@ -21,12 +22,35 @@ vi.mock("./streaming-message.js", () => ({ streamedLength = 0; sendInformativeUpdate = vi.fn(async () => {}); update = vi.fn(function ( - this: { hasContent: boolean; streamedLength: number }, + this: { hasContent: boolean; isFailed: boolean; streamedLength: number }, payloadText?: string, ) { + if ((payloadText?.length ?? 0) > 4000) { + this.hasContent = false; + this.isFailed = true; + this.streamedLength = 0; + return; + } this.hasContent = true; this.streamedLength = payloadText?.length ?? 0; }); + replaceInformativeWithFinal = vi.fn(async function ( + this: { + hasContent: boolean; + isFailed: boolean; + isFinalized: boolean; + streamedLength: number; + update: (payloadText?: string) => void; + }, + payloadText: string, + ) { + this.update(payloadText); + if (this.isFailed) { + return false; + } + this.isFinalized = true; + return this.hasContent; + }); finalize = vi.fn(async function (this: { isFinalized: boolean }) { this.isFinalized = true; }); @@ -50,15 +74,15 @@ describe("createTeamsReplyStreamController", () => { }); } - it("suppresses fallback for first text segment that was streamed", () => { + it("suppresses fallback for first text segment that was streamed", async () => { const ctrl = createController(); ctrl.onPartialReply({ text: "Hello world" }); - const result = ctrl.preparePayload({ text: "Hello world" }); + const result = await ctrl.preparePayload({ text: "Hello world" }); expect(result).toBeUndefined(); }); - it("when stream fails after partial delivery, fallback sends only remaining text", () => { + it("when stream fails after partial delivery, fallback sends only remaining text", async () => { const ctrl = createController(); const fullText = "a".repeat(4000) + "b".repeat(200); @@ -68,11 +92,11 @@ describe("createTeamsReplyStreamController", () => { streamInstances[0].isFinalized = true; streamInstances[0].streamedLength = 4000; - const result = ctrl.preparePayload({ text: fullText }); + const result = await ctrl.preparePayload({ text: fullText }); expect(result).toEqual({ text: "b".repeat(200) }); }); - it("when stream fails before sending content, fallback sends full text", () => { + it("when stream fails before sending content, fallback sends full text", async () => { const ctrl = createController(); const fullText = "Failure at first chunk"; @@ -82,43 +106,43 @@ describe("createTeamsReplyStreamController", () => { streamInstances[0].isFinalized = true; streamInstances[0].streamedLength = 0; - const result = ctrl.preparePayload({ text: fullText }); + const result = await ctrl.preparePayload({ text: fullText }); expect(result).toEqual({ text: fullText }); }); - it("allows fallback delivery for second text segment after tool calls", () => { + it("allows fallback delivery for second text segment after tool calls", async () => { const ctrl = createController(); // First text segment: streaming tokens arrive ctrl.onPartialReply({ text: "First segment" }); // First segment complete: preparePayload suppresses (stream handled it) - const result1 = ctrl.preparePayload({ text: "First segment" }); + const result1 = await ctrl.preparePayload({ text: "First segment" }); expect(result1).toBeUndefined(); // Tool calls happen... then second text segment arrives via deliver() // preparePayload should allow fallback delivery for this segment - const result2 = ctrl.preparePayload({ text: "Second segment after tools" }); + const result2 = await ctrl.preparePayload({ text: "Second segment after tools" }); expect(result2).toEqual({ text: "Second segment after tools" }); }); - it("finalizes the stream when suppressing first segment", () => { + it("finalizes the stream when suppressing first segment", async () => { const ctrl = createController(); ctrl.onPartialReply({ text: "Streamed text" }); - ctrl.preparePayload({ text: "Streamed text" }); + await ctrl.preparePayload({ text: "Streamed text" }); expect(streamInstances[0]?.finalize).toHaveBeenCalled(); }); - it("uses fallback even when onPartialReply fires after stream finalized", () => { + it("uses fallback even when onPartialReply fires after stream finalized", async () => { const ctrl = createController(); // First text segment: streaming tokens arrive ctrl.onPartialReply({ text: "First segment" }); // First segment complete: preparePayload suppresses and finalizes stream - const result1 = ctrl.preparePayload({ text: "First segment" }); + const result1 = await ctrl.preparePayload({ text: "First segment" }); expect(result1).toBeUndefined(); expect(streamInstances[0]?.isFinalized).toBe(true); @@ -126,37 +150,37 @@ describe("createTeamsReplyStreamController", () => { ctrl.onPartialReply({ text: "Second segment" }); // Must still use fallback because stream is finalized and can't deliver - const result2 = ctrl.preparePayload({ text: "Second segment" }); + const result2 = await ctrl.preparePayload({ text: "Second segment" }); expect(result2).toEqual({ text: "Second segment" }); }); - it("delivers all segments across 3+ tool call rounds", () => { + it("delivers all segments across 3+ tool call rounds", async () => { const ctrl = createController(); // Round 1: text → tool ctrl.onPartialReply({ text: "Segment 1" }); - expect(ctrl.preparePayload({ text: "Segment 1" })).toBeUndefined(); + await expect(ctrl.preparePayload({ text: "Segment 1" })).resolves.toBeUndefined(); // Round 2: text → tool ctrl.onPartialReply({ text: "Segment 2" }); - const r2 = ctrl.preparePayload({ text: "Segment 2" }); + const r2 = await ctrl.preparePayload({ text: "Segment 2" }); expect(r2).toEqual({ text: "Segment 2" }); // Round 3: final text ctrl.onPartialReply({ text: "Segment 3" }); - const r3 = ctrl.preparePayload({ text: "Segment 3" }); + const r3 = await ctrl.preparePayload({ text: "Segment 3" }); expect(r3).toEqual({ text: "Segment 3" }); }); - it("passes media+text payload through fully after stream finalized", () => { + it("passes media+text payload through fully after stream finalized", async () => { const ctrl = createController(); // First segment streamed and finalized ctrl.onPartialReply({ text: "Streamed text" }); - ctrl.preparePayload({ text: "Streamed text" }); + await ctrl.preparePayload({ text: "Streamed text" }); // Second segment has both text and media — should pass through fully - const result = ctrl.preparePayload({ + const result = await ctrl.preparePayload({ text: "Post-tool text with image", mediaUrl: "https://example.com/tool-output.png", }); @@ -166,11 +190,11 @@ describe("createTeamsReplyStreamController", () => { }); }); - it("still strips text from media payloads when stream handled text", () => { + it("still strips text from media payloads when stream handled text", async () => { const ctrl = createController(); ctrl.onPartialReply({ text: "Some text" }); - const result = ctrl.preparePayload({ + const result = await ctrl.preparePayload({ text: "Some text", mediaUrl: "https://example.com/image.png", }); @@ -180,6 +204,90 @@ describe("createTeamsReplyStreamController", () => { }); }); + it("falls back to normal delivery when progress final streaming fails", async () => { + streamInstances.length = 0; + const ctrl = createTeamsReplyStreamController({ + conversationType: "personal", + context: { sendActivity: vi.fn(async () => ({ id: "a" })) } as never, + feedbackLoopEnabled: false, + log: { debug: vi.fn() } as never, + msteamsConfig: { streaming: { mode: "progress" } } as never, + }); + await ctrl.onReplyStart(); + const fullText = "x".repeat(4200); + + const result = await ctrl.preparePayload({ text: fullText }); + + expect(result).toEqual({ text: fullText }); + expect(streamInstances[0]?.replaceInformativeWithFinal).toHaveBeenCalledWith(fullText); + }); + + it("falls back with full text when progress final send fails after streaming text", async () => { + streamInstances.length = 0; + const ctrl = createTeamsReplyStreamController({ + conversationType: "personal", + context: { sendActivity: vi.fn(async () => ({ id: "a" })) } as never, + feedbackLoopEnabled: false, + log: { debug: vi.fn() } as never, + msteamsConfig: { streaming: { mode: "progress" } } as never, + }); + await ctrl.onReplyStart(); + streamInstances[0].replaceInformativeWithFinal.mockImplementationOnce( + async function (this: { + hasContent: boolean; + isFailed: boolean; + isFinalized: boolean; + streamedLength: number; + }) { + this.hasContent = true; + this.isFailed = true; + this.isFinalized = true; + this.streamedLength = 12; + return false; + }, + ); + + const result = await ctrl.preparePayload({ text: "complete final answer" }); + + expect(result).toEqual({ text: "complete final answer" }); + }); + + it("honors disabled Teams progress labels", async () => { + streamInstances.length = 0; + const ctrl = createTeamsReplyStreamController({ + conversationType: "personal", + context: { sendActivity: vi.fn(async () => ({ id: "a" })) } as never, + feedbackLoopEnabled: false, + log: { debug: vi.fn() } as never, + msteamsConfig: { streaming: { mode: "progress", progress: { label: false } } } as never, + }); + + await ctrl.onReplyStart(); + + expect(streamInstances).toHaveLength(1); + expect(streamInstances[0]?.sendInformativeUpdate).not.toHaveBeenCalled(); + }); + + it("does not start native streaming for Teams block mode", async () => { + streamInstances.length = 0; + const ctrl = createTeamsReplyStreamController({ + conversationType: "personal", + context: { sendActivity: vi.fn(async () => ({ id: "a" })) } as never, + feedbackLoopEnabled: false, + log: { debug: vi.fn() } as never, + msteamsConfig: { streaming: { mode: "block" } } as never, + }); + + await ctrl.onReplyStart(); + ctrl.onPartialReply({ text: "block partial" }); + + expect(streamInstances).toHaveLength(0); + await expect(ctrl.preparePayload({ text: "block final" })).resolves.toEqual({ + text: "block final", + }); + expect(ctrl.hasStream()).toBe(false); + }); + describe("isStreamActive", () => { it("returns false before any tokens arrive so typing keepalive can warm up", () => { const ctrl = createController(); @@ -198,7 +306,7 @@ describe("createTeamsReplyStreamController", () => { expect(ctrl.isStreamActive()).toBe(true); }); - it("returns false after the stream is finalized between tool rounds", () => { + it("returns false after the stream is finalized between tool rounds", async () => { const ctrl = createController(); ctrl.onPartialReply({ text: "First segment" }); @@ -206,7 +314,7 @@ describe("createTeamsReplyStreamController", () => { // First segment complete: stream is finalized so the typing keepalive // can resume during the tool chain that follows. - ctrl.preparePayload({ text: "First segment" }); + await ctrl.preparePayload({ text: "First segment" }); expect(ctrl.isStreamActive()).toBe(false); }); diff --git a/extensions/msteams/src/reply-stream-controller.ts b/extensions/msteams/src/reply-stream-controller.ts index 38faf643b28..6d683d396cf 100644 --- a/extensions/msteams/src/reply-stream-controller.ts +++ b/extensions/msteams/src/reply-stream-controller.ts @@ -1,5 +1,9 @@ +import { + resolveChannelPreviewStreamMode, + resolveChannelProgressDraftLabel, +} from "openclaw/plugin-sdk/channel-streaming"; import { normalizeOptionalLowercaseString } from "openclaw/plugin-sdk/text-runtime"; -import type { ReplyPayload } from "../runtime-api.js"; +import type { MSTeamsConfig, ReplyPayload } from "../runtime-api.js"; import { formatUnknownError } from "./errors.js"; import type { MSTeamsMonitorLogger } from "./monitor-types.js"; import type { MSTeamsTurnContext } from "./sdk-types.js"; @@ -12,16 +16,15 @@ import { TeamsHttpStream } from "./streaming-message.js"; // when combined with `undefined` in a union. type Maybe = T | undefined; -const INFORMATIVE_STATUS_TEXTS = [ - "Thinking...", - "Working on that...", - "Checking the details...", - "Putting an answer together...", -]; - -export function pickInformativeStatusText(random = Math.random): string { - const index = Math.floor(random() * INFORMATIVE_STATUS_TEXTS.length); - return INFORMATIVE_STATUS_TEXTS[index] ?? INFORMATIVE_STATUS_TEXTS[0]; +export function pickInformativeStatusText( + params: { config?: MSTeamsConfig; seed?: string; random?: () => number } | (() => number) = {}, +): string | undefined { + const options = typeof params === "function" ? { random: params } : params; + return resolveChannelProgressDraftLabel({ + entry: options.config, + seed: options.seed, + random: options.random, + }); } export function createTeamsReplyStreamController(params: { @@ -29,10 +32,15 @@ export function createTeamsReplyStreamController(params: { context: MSTeamsTurnContext; feedbackLoopEnabled: boolean; log: MSTeamsMonitorLogger; + msteamsConfig?: MSTeamsConfig; + progressSeed?: string; random?: () => number; }) { const isPersonal = normalizeOptionalLowercaseString(params.conversationType) === "personal"; - const stream = isPersonal + const streamMode = resolveChannelPreviewStreamMode(params.msteamsConfig, "partial"); + const shouldUseNativeStream = + isPersonal && (streamMode === "partial" || streamMode === "progress"); + const stream = shouldUseNativeStream ? new TeamsHttpStream({ sendActivity: (activity) => params.context.sendActivity(activity), feedbackLoopEnabled: params.feedbackLoopEnabled, @@ -46,50 +54,77 @@ export function createTeamsReplyStreamController(params: { let informativeUpdateSent = false; let pendingFinalize: Promise | undefined; + const fallbackAfterStreamFailure = ( + payload: ReplyPayload, + hasMedia: boolean, + ): Maybe => { + if (!payload.text) { + return payload; + } + const streamedLength = stream?.streamedLength ?? 0; + if (streamedLength <= 0) { + return payload; + } + const remainingText = payload.text.slice(streamedLength); + if (!remainingText) { + return hasMedia ? { ...payload, text: undefined } : undefined; + } + return { ...payload, text: remainingText }; + }; + return { async onReplyStart(): Promise { if (!stream || informativeUpdateSent) { return; } + const informativeText = pickInformativeStatusText({ + config: params.msteamsConfig, + seed: params.progressSeed, + random: params.random, + }); + if (!informativeText) { + return; + } informativeUpdateSent = true; - await stream.sendInformativeUpdate(pickInformativeStatusText(params.random)); + await stream.sendInformativeUpdate(informativeText); }, onPartialReply(payload: { text?: string }): void { if (!stream || !payload.text) { return; } + if (streamMode === "progress") { + return; + } streamReceivedTokens = true; stream.update(payload.text); }, - preparePayload(payload: ReplyPayload): Maybe { + async preparePayload(payload: ReplyPayload): Promise> { + const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length); + + if (stream && streamMode === "progress" && informativeUpdateSent && !stream.isFinalized) { + if (!payload.text) { + return payload; + } + const finalized = await stream.replaceInformativeWithFinal(payload.text); + informativeUpdateSent = false; + if (!finalized || stream.isFailed) { + return payload; + } + return hasMedia ? { ...payload, text: undefined } : undefined; + } + if (!stream || !streamReceivedTokens) { return payload; } - const hasMedia = Boolean(payload.mediaUrl || payload.mediaUrls?.length); - // Stream failed after partial delivery (e.g. > 4000 chars). Send only // the unstreamed suffix via block delivery to avoid duplicate text. if (stream.isFailed) { streamReceivedTokens = false; - if (!payload.text) { - return payload; - } - - const streamedLength = stream.streamedLength; - if (streamedLength <= 0) { - return payload; - } - - const remainingText = payload.text.slice(streamedLength); - if (!remainingText) { - return hasMedia ? { ...payload, text: undefined } : undefined; - } - - return { ...payload, text: remainingText }; + return fallbackAfterStreamFailure(payload, hasMedia); } if (!stream.hasContent || stream.isFinalized) { diff --git a/extensions/msteams/src/streaming-message.test.ts b/extensions/msteams/src/streaming-message.test.ts index 42704a28d34..66389d22dcf 100644 --- a/extensions/msteams/src/streaming-message.test.ts +++ b/extensions/msteams/src/streaming-message.test.ts @@ -202,6 +202,30 @@ describe("TeamsHttpStream", () => { ); }); + it("reports failure when replacing informative progress with final text fails", async () => { + const sendActivity = vi.fn(async (activity: Record) => { + if (activity.type === "message") { + throw new Error("final send rejected"); + } + return { id: "stream-1" }; + }); + const stream = new TeamsHttpStream({ sendActivity, throttleMs: 1 }); + + await stream.sendInformativeUpdate("Thinking"); + const carried = await stream.replaceInformativeWithFinal( + "Final response long enough to stream before the final message send fails.", + ); + + expect(carried).toBe(false); + expect(stream.isFailed).toBe(true); + expect(sendActivity).toHaveBeenCalledWith( + expect.objectContaining({ + type: "message", + text: "Final response long enough to stream before the final message send fails.", + }), + ); + }); + it("hasContent is true after update", () => { const stream = new TeamsHttpStream({ sendActivity: vi.fn(async () => ({ id: "x" })), diff --git a/extensions/msteams/src/streaming-message.ts b/extensions/msteams/src/streaming-message.ts index 9142f22b1b0..dd1d486f6d2 100644 --- a/extensions/msteams/src/streaming-message.ts +++ b/extensions/msteams/src/streaming-message.ts @@ -163,6 +163,21 @@ export class TeamsHttpStream { this.loop.update(this.accumulatedText); } + /** + * Replace an informative progress update with final answer text. + * Returns false when the stream could not safely carry the final text, so + * callers can deliver the answer through the normal Teams message path. + */ + async replaceInformativeWithFinal(text: string): Promise { + if (this.stopped || this.finalized) { + return false; + } + this.update(text); + await this.loop.flush(); + await this.finalize(); + return !this.streamFailed && this.hasContent; + } + /** * Finalize the stream — send the final message activity. */ @@ -222,6 +237,7 @@ export class TeamsHttpStream { await this.sendActivity(finalActivity); } catch (err) { + this.streamFailed = true; this.onError?.(err); } } diff --git a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts index 521e6a372dc..8c69a7ddcb4 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.preview-fallback.test.ts @@ -31,7 +31,16 @@ class TestSlackStreamNotDeliveredError extends Error { } let mockedNativeStreaming = false; let mockedBlockStreamingEnabled: boolean | undefined = false; -let capturedReplyOptions: { disableBlockStreaming?: boolean } | undefined; +let mockedSlackStreamingMode: "off" | "partial" | "block" | "progress" = "partial"; +let mockedSlackDraftMode: "replace" | "status_final" | "append" = "append"; +let capturedReplyOptions: + | { + disableBlockStreaming?: boolean; + suppressDefaultToolProgressMessages?: boolean; + onItemEvent?: (payload: { progressText: string }) => Promise | void; + onPartialReply?: (payload: { text: string }) => Promise | void; + } + | undefined; let capturedStatusReactionOptions: { enabled?: boolean; initialEmoji?: string } | undefined; const statusReactionControllerMock = { setQueued: vi.fn(async () => {}), @@ -63,6 +72,9 @@ let mockedDispatchSequence: Array<{ }; }> = []; let mockedProgressEvents: string[] = []; +let mockedReplyOptionEvents: Array< + { kind: "item"; progressText: string } | { kind: "partial"; text: string } +> = []; const noop = () => {}; const noopAsync = async () => {}; @@ -83,6 +95,7 @@ function createDraftStreamStub() { function createPreparedSlackMessage(params?: { cfg?: Record; + accountConfig?: Record; ctxPayload?: Record; message?: Partial<{ channel: string; @@ -117,7 +130,7 @@ function createPreparedSlackMessage(params?: { }, account: { accountId: "default", - config: {}, + config: params?.accountConfig ?? {}, }, message: { channel: "C123", @@ -218,9 +231,44 @@ vi.mock("openclaw/plugin-sdk/channel-reply-pipeline", () => ({ })); vi.mock("openclaw/plugin-sdk/channel-streaming", () => ({ + formatChannelProgressDraftText: (params: { + entry?: { streaming?: { progress?: { label?: string; maxLines?: number } } }; + lines: string[]; + }) => + [ + params.entry?.streaming?.progress?.label ?? "Thinking", + ...params.lines.map((line) => `• ${line}`), + ].join("\n"), + resolveChannelProgressDraftMaxLines: (entry?: { + streaming?: { progress?: { maxLines?: number } }; + }) => entry?.streaming?.progress?.maxLines ?? 8, resolveChannelStreamingBlockEnabled: () => mockedBlockStreamingEnabled, resolveChannelStreamingNativeTransport: () => mockedNativeStreaming, - resolveChannelStreamingPreviewToolProgress: () => true, + resolveChannelStreamingPreviewToolProgress: (entry?: { + streaming?: { progress?: { toolProgress?: boolean }; preview?: { toolProgress?: boolean } }; + }) => entry?.streaming?.progress?.toolProgress ?? entry?.streaming?.preview?.toolProgress ?? true, + resolveChannelStreamingSuppressDefaultToolProgressMessages: ( + entry?: { + streaming?: { + mode?: string; + progress?: { toolProgress?: boolean }; + preview?: { toolProgress?: boolean }; + }; + }, + options?: { + draftStreamActive?: boolean; + previewStreamingEnabled?: boolean; + previewToolProgressEnabled?: boolean; + }, + ) => { + if (options?.draftStreamActive === false || options?.previewStreamingEnabled === false) { + return false; + } + if (entry?.streaming?.mode === "progress") { + return true; + } + return options?.previewToolProgressEnabled ?? true; + }, })); vi.mock("openclaw/plugin-sdk/outbound-runtime", () => ({ @@ -292,9 +340,9 @@ vi.mock("../../stream-mode.js", () => ({ }), buildStatusFinalPreviewText: () => "status", resolveSlackStreamingConfig: () => ({ - mode: "partial", + mode: mockedSlackStreamingMode, nativeStreaming: mockedNativeStreaming, - draftMode: "append", + draftMode: mockedSlackDraftMode, }), })); @@ -364,7 +412,9 @@ vi.mock("../reply.runtime.js", () => ({ dispatchInboundMessage: async (params: { replyOptions?: { disableBlockStreaming?: boolean; + suppressDefaultToolProgressMessages?: boolean; onItemEvent?: (payload: { progressText: string }) => Promise | void; + onPartialReply?: (payload: { text: string }) => Promise | void; }; dispatcher: { deliver: ( @@ -380,8 +430,18 @@ vi.mock("../reply.runtime.js", () => ({ }; }) => { capturedReplyOptions = params.replyOptions; - for (const progressText of mockedProgressEvents) { - await params.replyOptions?.onItemEvent?.({ progressText }); + if (mockedReplyOptionEvents.length > 0) { + for (const entry of mockedReplyOptionEvents) { + if (entry.kind === "item") { + await params.replyOptions?.onItemEvent?.({ progressText: entry.progressText }); + } else { + await params.replyOptions?.onPartialReply?.({ text: entry.text }); + } + } + } else { + for (const progressText of mockedProgressEvents) { + await params.replyOptions?.onItemEvent?.({ progressText }); + } } for (const entry of mockedDispatchSequence) { await params.dispatcher.deliver(entry.payload, { kind: entry.kind }); @@ -421,6 +481,8 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { } mockedNativeStreaming = false; mockedBlockStreamingEnabled = false; + mockedSlackStreamingMode = "partial"; + mockedSlackDraftMode = "append"; capturedReplyOptions = undefined; capturedStatusReactionOptions = undefined; capturedTyping = undefined; @@ -428,6 +490,7 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { mockedReplyThreadTsSequence = undefined; mockedDispatchSequence = [{ kind: "final", payload: { text: FINAL_REPLY_TEXT } }]; mockedProgressEvents = []; + mockedReplyOptionEvents = []; createSlackDraftStreamMock.mockReturnValue(createDraftStreamStub()); finalizeSlackPreviewEditMock.mockRejectedValue(new Error("socket closed")); @@ -564,13 +627,99 @@ describe("dispatchPreparedSlackMessage preview fallback", () => { mockedDispatchSequence = []; mockedProgressEvents = ["ran <@U123> *bold* `code` & done"]; - await dispatchPreparedSlackMessage(createPreparedSlackMessage()); + await dispatchPreparedSlackMessage( + createPreparedSlackMessage({ + accountConfig: { streaming: { progress: { label: "Shelling" } } }, + }), + ); expect(draftStream.update).toHaveBeenCalledWith( - "Working…\n• ran <!here> <@U123> \\*bold\\* \\`code\\` & done", + "Shelling\n• ran <!here> <@U123> \\*bold\\* \\`code\\` & done", ); }); + it("honors Slack progress maxLines above the legacy eight-line cap", async () => { + const draftStream = createDraftStreamStub(); + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + mockedDispatchSequence = []; + mockedProgressEvents = Array.from({ length: 10 }, (_value, index) => `step ${index + 1}`); + + await dispatchPreparedSlackMessage( + createPreparedSlackMessage({ + accountConfig: { streaming: { progress: { label: "Shelling", maxLines: 10 } } }, + }), + ); + + expect(draftStream.update).toHaveBeenLastCalledWith( + [ + "Shelling", + "• step 1", + "• step 2", + "• step 3", + "• step 4", + "• step 5", + "• step 6", + "• step 7", + "• step 8", + "• step 9", + "• step 10", + ].join("\n"), + ); + }); + + it("preserves Slack progress lines across status-final answer partials", async () => { + const draftStream = createDraftStreamStub(); + createSlackDraftStreamMock.mockReturnValueOnce(draftStream); + mockedSlackStreamingMode = "progress"; + mockedSlackDraftMode = "status_final"; + mockedDispatchSequence = []; + mockedReplyOptionEvents = [ + { kind: "item", progressText: "tool one" }, + { kind: "partial", text: "partial answer" }, + { kind: "item", progressText: "tool two" }, + ]; + + await dispatchPreparedSlackMessage( + createPreparedSlackMessage({ + accountConfig: { streaming: { progress: { label: "Shelling" } } }, + }), + ); + + expect(draftStream.update).toHaveBeenLastCalledWith( + ["Shelling", "• tool one", "• tool two"].join("\n"), + ); + }); + + it("suppresses standalone Slack tool progress when progress lines are disabled", async () => { + mockedSlackStreamingMode = "progress"; + mockedSlackDraftMode = "status_final"; + mockedDispatchSequence = []; + + await dispatchPreparedSlackMessage( + createPreparedSlackMessage({ + accountConfig: { streaming: { mode: "progress", progress: { toolProgress: false } } }, + }), + ); + + expect(capturedReplyOptions?.suppressDefaultToolProgressMessages).toBe(true); + expect(capturedReplyOptions?.onItemEvent).toBeDefined(); + }); + + it("keeps standalone Slack tool progress when partial preview lines are disabled", async () => { + mockedSlackStreamingMode = "partial"; + mockedSlackDraftMode = "replace"; + mockedDispatchSequence = []; + + await dispatchPreparedSlackMessage( + createPreparedSlackMessage({ + accountConfig: { streaming: { mode: "partial", preview: { toolProgress: false } } }, + }), + ); + + expect(capturedReplyOptions?.suppressDefaultToolProgressMessages).toBeUndefined(); + expect(capturedReplyOptions?.onItemEvent).toBeDefined(); + }); + it("starts native streams in the first-reply thread for top-level channel messages", async () => { mockedNativeStreaming = true; mockedReplyThreadTs = "171234.111"; diff --git a/extensions/slack/src/monitor/message-handler/dispatch.ts b/extensions/slack/src/monitor/message-handler/dispatch.ts index f6e47f95556..8c0d2b72a7d 100644 --- a/extensions/slack/src/monitor/message-handler/dispatch.ts +++ b/extensions/slack/src/monitor/message-handler/dispatch.ts @@ -13,9 +13,12 @@ import { resolveChannelSourceReplyDeliveryMode, } from "openclaw/plugin-sdk/channel-reply-pipeline"; import { + formatChannelProgressDraftText, + resolveChannelProgressDraftMaxLines, resolveChannelStreamingBlockEnabled, resolveChannelStreamingNativeTransport, resolveChannelStreamingPreviewToolProgress, + resolveChannelStreamingSuppressDefaultToolProgressMessages, } from "openclaw/plugin-sdk/channel-streaming"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { @@ -39,11 +42,7 @@ import { } from "../../interactive-replies.js"; import { SLACK_TEXT_LIMIT } from "../../limits.js"; import { recordSlackThreadParticipation } from "../../sent-thread-cache.js"; -import { - applyAppendOnlyStreamUpdate, - buildStatusFinalPreviewText, - resolveSlackStreamingConfig, -} from "../../stream-mode.js"; +import { applyAppendOnlyStreamUpdate, resolveSlackStreamingConfig } from "../../stream-mode.js"; import type { SlackStreamSession } from "../../streaming.js"; import { appendSlackStream, @@ -776,24 +775,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const slackBlocks = readSlackReplyBlocks(payload); const trimmedFinalText = reply.trimmedText; - if (previewStreamingEnabled && streamMode === "status_final" && hasStreamedMessage) { - try { - const statusChannelId = draftStream?.channelId(); - const statusMessageId = draftStream?.messageId(); - if (statusChannelId && statusMessageId) { - await ctx.app.client.chat.update({ - token: ctx.botToken, - channel: statusChannelId, - ts: statusMessageId, - text: "Status: complete. Final answer posted below.", - }); - } - } catch (err) { - logVerbose(`slack: status_final completion update failed (${formatErrorMessage(err)})`); - } - hasStreamedMessage = false; - } - const result = await deliverFinalizableDraftPreview({ kind: info.kind, payload, @@ -813,7 +794,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag buildFinalEdit: () => { if ( !previewStreamingEnabled || - streamMode === "status_final" || reply.hasMedia || payload.isError || (trimmedFinalText.length === 0 && !slackBlocks?.length) @@ -892,11 +872,18 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag const streamMode = slackStreaming.draftMode; const previewToolProgressEnabled = Boolean(draftStream) && resolveChannelStreamingPreviewToolProgress(account.config); + const suppressDefaultToolProgressMessages = + resolveChannelStreamingSuppressDefaultToolProgressMessages(account.config, { + draftStreamActive: Boolean(draftStream), + previewToolProgressEnabled, + previewStreamingEnabled, + }); let previewToolProgressSuppressed = false; let previewToolProgressLines: string[] = []; let appendRenderedText = ""; let appendSourceText = ""; let statusUpdateCount = 0; + const progressSeed = `${account.accountId}:${message.channel}`; const pushPreviewToolProgress = (line?: string) => { if (!draftStream || !previewToolProgressEnabled || previewToolProgressSuppressed) { @@ -911,9 +898,15 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (previous === escaped) { return; } - previewToolProgressLines = [...previewToolProgressLines, escaped].slice(-8); + previewToolProgressLines = [...previewToolProgressLines, escaped].slice( + -resolveChannelProgressDraftMaxLines(account.config), + ); draftStream.update( - ["Working…", ...previewToolProgressLines.map((entry) => `• ${entry}`)].join("\n"), + formatChannelProgressDraftText({ + entry: account.config, + lines: previewToolProgressLines, + seed: progressSeed, + }), ); hasStreamedMessage = true; }; @@ -924,10 +917,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag return; } - previewToolProgressSuppressed = true; - previewToolProgressLines = []; - if (streamMode === "append") { + previewToolProgressSuppressed = true; + previewToolProgressLines = []; const next = applyAppendOnlyStreamUpdate({ incoming: trimmed, rendered: appendRenderedText, @@ -948,11 +940,19 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag if (statusUpdateCount > 1 && statusUpdateCount % 4 !== 0) { return; } - draftStream?.update(buildStatusFinalPreviewText(statusUpdateCount)); + draftStream?.update( + formatChannelProgressDraftText({ + entry: account.config, + lines: previewToolProgressLines, + seed: progressSeed, + }), + ); hasStreamedMessage = true; return; } + previewToolProgressSuppressed = true; + previewToolProgressLines = []; draftStream?.update(trimmed); hasStreamedMessage = true; }; @@ -1015,7 +1015,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag hasRepliedRef, disableBlockStreaming, onModelSelected, - suppressDefaultToolProgressMessages: previewToolProgressEnabled ? true : undefined, + suppressDefaultToolProgressMessages: suppressDefaultToolProgressMessages + ? true + : undefined, onPartialReply: useStreaming ? undefined : !previewStreamingEnabled diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 30ff8dcc104..b8d79d157df 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -730,6 +730,87 @@ describe("dispatchTelegramMessage draft streaming", () => { ); }); + it("keeps canonical block mode on the Telegram draft preview path", async () => { + const draftStream = createDraftStream(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "HelloWorld" }); + await dispatcherOptions.deliver({ text: "HelloWorld" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ + context: createContext(), + streamMode: "block", + telegramCfg: { streaming: { mode: "block" } }, + }); + + expect(createTelegramDraftStream).toHaveBeenCalled(); + expect(draftStream.update).toHaveBeenCalledWith("HelloWorld"); + }); + + it("reuses the Telegram progress draft for the first assistant final", async () => { + const draftStream = createSequencedDraftStream(2001); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReplyStart?.(); + await replyOptions?.onAssistantMessageStart?.(); + await dispatcherOptions.deliver({ text: "Final answer" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(draftStream.update).toHaveBeenCalledWith("Shelling"); + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith(123, 2001, "Final answer", expect.any(Object)); + expect(draftStream.clear).not.toHaveBeenCalled(); + }); + + it("keeps the Telegram progress draft across post-tool assistant boundaries", async () => { + const draftStream = createSequencedDraftStream(2001); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onReplyStart?.(); + await replyOptions?.onAssistantMessageStart?.(); + await replyOptions?.onItemEvent?.({ progressText: "exec ls ~/Desktop" }); + await replyOptions?.onAssistantMessageStart?.(); + await dispatcherOptions.deliver({ text: "Final after tool" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "progress", + telegramCfg: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + }); + + expect(draftStream.update).toHaveBeenCalledWith("Shelling"); + expect(draftStream.update).toHaveBeenCalledWith( + expect.stringMatching(/^Shelling\n• `exec ls ~\/Desktop`$/), + ); + expect(draftStream.forceNewMessage).not.toHaveBeenCalled(); + expect(draftStream.materialize).not.toHaveBeenCalled(); + expect(editMessageTelegram).toHaveBeenCalledWith( + 123, + 2001, + "Final after tool", + expect.any(Object), + ); + expect(draftStream.clear).not.toHaveBeenCalled(); + }); + it("streams Telegram tool progress by default when preview streaming is active", async () => { const draftStream = createDraftStream(); createTelegramDraftStream.mockReturnValue(draftStream); @@ -742,7 +823,7 @@ describe("dispatchTelegramMessage draft streaming", () => { await dispatchWithContext({ context: createContext(), streamMode: "partial" }); expect(draftStream.update).toHaveBeenCalledWith( - "Working…\n• `tool: exec`\n• `exec ls ~/Desktop`", + expect.stringMatching(/\n• `tool: exec`\n• `exec ls ~\/Desktop`$/), ); expect(dispatchReplyWithBufferedBlockDispatcher).toHaveBeenCalledWith( expect.objectContaining({ @@ -812,7 +893,7 @@ describe("dispatchTelegramMessage draft streaming", () => { }); const lastPreviewText = draftStream.update.mock.calls.at(-1)?.[0]; - expect(lastPreviewText).toBe("Working…\n• `tool: exec`\n• `read [label](tg://user?id=123)`"); + expect(lastPreviewText).toMatch(/\n• `tool: exec`\n• `read \[label\]\(tg:\/\/user\?id=123\)`$/); expect(renderTelegramHtmlText(lastPreviewText ?? "")).not.toContain(" `• ${formatProgressAsMarkdownCode(entry)}`), - ].join("\n"); + previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + -resolveChannelProgressDraftMaxLines(telegramCfg), + ); + const previewText = formatChannelProgressDraftText({ + entry: telegramCfg, + lines: previewToolProgressLines, + seed: progressSeed, + formatLine: formatProgressAsMarkdownCode, + }); answerLane.lastPartialText = previewText; + answerLane.hasStreamedMessage = true; answerLane.stream.update(previewText); }; let splitReasoningOnNextStream = false; @@ -570,6 +578,9 @@ export const dispatchTelegramMessage = async ({ return; } if (lane === answerLane) { + if (streamMode === "progress") { + return; + } previewToolProgressSuppressed = true; previewToolProgressLines = []; } @@ -1056,6 +1067,25 @@ export const dispatchTelegramMessage = async ({ replyOptions: { skillFilter, disableBlockStreaming, + onReplyStart: + answerLane.stream && streamMode === "progress" + ? () => + enqueueDraftLaneEvent(async () => { + const previewText = formatChannelProgressDraftText({ + entry: telegramCfg, + lines: [], + seed: progressSeed, + formatLine: formatProgressAsMarkdownCode, + }); + if (!previewText || previewText === answerLane.lastPartialText) { + return; + } + answerLane.lastPartialText = previewText; + answerLane.hasStreamedMessage = true; + answerLane.stream?.update(previewText); + await answerLane.stream?.flush(); + }) + : undefined, onPartialReply: answerLane.stream || reasoningLane.stream ? (payload) => @@ -1086,6 +1116,11 @@ export const dispatchTelegramMessage = async ({ retainPreviewOnCleanupByLane.answer = false; return; } + if (streamMode === "progress") { + activePreviewLifecycleByLane.answer = "transient"; + retainPreviewOnCleanupByLane.answer = false; + return; + } if (pendingCompactionReplayBoundary) { pendingCompactionReplayBoundary = false; activePreviewLifecycleByLane.answer = "transient"; diff --git a/extensions/telegram/src/bot.helpers.test.ts b/extensions/telegram/src/bot.helpers.test.ts index cc08232ec40..0823438e3a5 100644 --- a/extensions/telegram/src/bot.helpers.test.ts +++ b/extensions/telegram/src/bot.helpers.test.ts @@ -20,8 +20,8 @@ describe("resolveTelegramStreamMode", () => { expect(resolveTelegramStreamMode({ streamMode: "block" })).toBe("block"); }); - it("maps unified progress mode to partial on Telegram", () => { - expect(resolveTelegramStreamMode({ streaming: "progress" })).toBe("partial"); + it("preserves unified progress mode on Telegram", () => { + expect(resolveTelegramStreamMode({ streaming: "progress" })).toBe("progress"); }); }); diff --git a/extensions/telegram/src/bot/types.ts b/extensions/telegram/src/bot/types.ts index 5903d321cbb..bbb7ff60d1d 100644 --- a/extensions/telegram/src/bot/types.ts +++ b/extensions/telegram/src/bot/types.ts @@ -1,7 +1,7 @@ import type { ChatFullInfo, Message, UserFromGetMe } from "@grammyjs/types"; /** App-specific stream mode for Telegram stream previews. */ -export type TelegramStreamMode = "off" | "partial" | "block"; +export type TelegramStreamMode = "off" | "partial" | "block" | "progress"; type TelegramGetFile = () => Promise<{ file_path?: string }>; export type TelegramChatDetails = { diff --git a/extensions/telegram/src/config-ui-hints.ts b/extensions/telegram/src/config-ui-hints.ts index ee38028adbb..9557cf989f4 100644 --- a/extensions/telegram/src/config-ui-hints.ts +++ b/extensions/telegram/src/config-ui-hints.ts @@ -39,11 +39,11 @@ export const telegramChannelConfigUiHints = { }, streaming: { label: "Telegram Streaming Mode", - help: 'Unified Telegram stream preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" maps to "partial" on Telegram. Legacy boolean/streamMode keys are detected; run doctor --fix to migrate.', + help: 'Unified Telegram stream preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" keeps a single editable progress draft until final delivery. Legacy boolean/streamMode keys are detected; run doctor --fix to migrate.', }, "streaming.mode": { label: "Telegram Streaming Mode", - help: 'Canonical Telegram preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" maps to "partial" on Telegram.', + help: 'Canonical Telegram preview mode: "off" | "partial" | "block" | "progress" (default: "partial").', }, "streaming.chunkMode": { label: "Telegram Chunk Mode", diff --git a/extensions/telegram/src/preview-streaming.ts b/extensions/telegram/src/preview-streaming.ts index 5bbdfac566f..e9db669d7e1 100644 --- a/extensions/telegram/src/preview-streaming.ts +++ b/extensions/telegram/src/preview-streaming.ts @@ -1,6 +1,9 @@ -import { resolveChannelPreviewStreamMode } from "openclaw/plugin-sdk/channel-streaming"; +import { + resolveChannelPreviewStreamMode, + type StreamingMode, +} from "openclaw/plugin-sdk/channel-streaming"; -type TelegramPreviewStreamMode = "off" | "partial" | "block"; +type TelegramPreviewStreamMode = StreamingMode; export function resolveTelegramPreviewStreamMode( params: { diff --git a/src/config/bundled-channel-config-metadata.generated.ts b/src/config/bundled-channel-config-metadata.generated.ts index 0b117949d5f..54a511b9eee 100644 --- a/src/config/bundled-channel-config-metadata.generated.ts +++ b/src/config/bundled-channel-config-metadata.generated.ts @@ -933,6 +933,37 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, additionalProperties: false, }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, block: { type: "object", properties: { @@ -2336,6 +2367,37 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, additionalProperties: false, }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, block: { type: "object", properties: { @@ -3517,11 +3579,11 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, streaming: { label: "Discord Streaming Mode", - help: 'Unified Discord stream preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Discord. Legacy boolean/streamMode keys are auto-mapped.', + help: 'Unified Discord stream preview mode: "off" | "partial" | "block" | "progress". "progress" keeps a single editable progress draft until final delivery. Legacy boolean/streamMode keys are auto-mapped.', }, "streaming.mode": { label: "Discord Streaming Mode", - help: 'Canonical Discord preview mode: "off" | "partial" | "block" | "progress". "progress" maps to "partial" on Discord.', + help: 'Canonical Discord preview mode: "off" | "partial" | "block" | "progress".', }, "streaming.chunkMode": { label: "Discord Chunk Mode", @@ -7516,7 +7578,7 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ anyOf: [ { type: "string", - enum: ["partial", "quiet", "off"], + enum: ["partial", "quiet", "progress", "off"], }, { type: "boolean", @@ -7526,7 +7588,38 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ properties: { mode: { type: "string", - enum: ["partial", "quiet", "off"], + enum: ["partial", "quiet", "progress", "off"], + }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, }, preview: { type: "object", @@ -8699,6 +8792,122 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ type: "string", enum: ["length", "newline"], }, + streaming: { + type: "object", + properties: { + mode: { + type: "string", + enum: ["off", "partial", "block", "progress"], + }, + chunkMode: { + type: "string", + enum: ["length", "newline"], + }, + preview: { + type: "object", + properties: { + chunk: { + type: "object", + properties: { + minChars: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + maxChars: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + breakPreference: { + anyOf: [ + { + type: "string", + const: "paragraph", + }, + { + type: "string", + const: "newline", + }, + { + type: "string", + const: "sentence", + }, + ], + }, + }, + additionalProperties: false, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, + block: { + type: "object", + properties: { + enabled: { + type: "boolean", + }, + coalesce: { + type: "object", + properties: { + minChars: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + maxChars: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + idleMs: { + type: "integer", + minimum: 0, + maximum: 9007199254740991, + }, + }, + additionalProperties: false, + }, + }, + additionalProperties: false, + }, + }, + additionalProperties: false, + }, typingIndicator: { type: "boolean", }, @@ -9015,6 +9224,14 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ label: "MS Teams Config Writes", help: "Allow Microsoft Teams to write config in response to channel events/commands (default: true).", }, + streaming: { + label: "MS Teams Streaming", + help: 'Microsoft Teams preview/progress streaming mode: "off" | "partial" | "block" | "progress". Personal chats use Teams native streaminfo progress when available.', + }, + "streaming.progress.label": { + label: "MS Teams Progress Label", + help: 'Initial progress title. Use "auto" for built-in single-word labels, a custom string, or false to hide the title.', + }, }, }, { @@ -11841,6 +12058,37 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, additionalProperties: false, }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, block: { type: "object", properties: { @@ -12772,6 +13020,37 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, additionalProperties: false, }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, block: { type: "object", properties: { @@ -14092,6 +14371,37 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, additionalProperties: false, }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, block: { type: "object", properties: { @@ -15161,6 +15471,37 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, additionalProperties: false, }, + progress: { + type: "object", + properties: { + label: { + anyOf: [ + { + type: "string", + }, + { + type: "boolean", + const: false, + }, + ], + }, + labels: { + type: "array", + items: { + type: "string", + }, + }, + maxLines: { + type: "integer", + exclusiveMinimum: 0, + maximum: 9007199254740991, + }, + toolProgress: { + type: "boolean", + }, + }, + additionalProperties: false, + }, block: { type: "object", properties: { @@ -15552,11 +15893,11 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ }, streaming: { label: "Telegram Streaming Mode", - help: 'Unified Telegram stream preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" maps to "partial" on Telegram. Legacy boolean/streamMode keys are detected; run doctor --fix to migrate.', + help: 'Unified Telegram stream preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" keeps a single editable progress draft until final delivery. Legacy boolean/streamMode keys are detected; run doctor --fix to migrate.', }, "streaming.mode": { label: "Telegram Streaming Mode", - help: 'Canonical Telegram preview mode: "off" | "partial" | "block" | "progress" (default: "partial"). "progress" maps to "partial" on Telegram.', + help: 'Canonical Telegram preview mode: "off" | "partial" | "block" | "progress" (default: "partial").', }, "streaming.chunkMode": { label: "Telegram Chunk Mode", diff --git a/src/config/types.base.ts b/src/config/types.base.ts index 29cc00e9007..bb38356f3d3 100644 --- a/src/config/types.base.ts +++ b/src/config/types.base.ts @@ -34,6 +34,17 @@ export type BlockStreamingChunkConfig = { breakPreference?: "paragraph" | "newline" | "sentence"; }; +export type ChannelStreamingProgressConfig = { + /** Initial progress title. "auto" picks from labels; false hides the title. Default: "auto". */ + label?: string | false; + /** Candidate labels for label="auto". Defaults to OpenClaw's built-in progress labels. */ + labels?: string[]; + /** Maximum number of progress lines to keep below the label. Default: 8. */ + maxLines?: number; + /** Include compact tool/task progress in the draft. Default: true. */ + toolProgress?: boolean; +}; + export type ChannelStreamingPreviewConfig = { /** Chunking thresholds for preview-draft updates while streaming. */ chunk?: BlockStreamingChunkConfig; @@ -69,6 +80,7 @@ export type ChannelStreamingConfig = { */ nativeTransport?: boolean; preview?: ChannelStreamingPreviewConfig; + progress?: ChannelStreamingProgressConfig; block?: ChannelStreamingBlockConfig; }; @@ -76,12 +88,12 @@ export type ChannelDeliveryStreamingConfig = Pick; export type SlackChannelStreamingConfig = Pick< ChannelStreamingConfig, - "mode" | "chunkMode" | "preview" | "block" | "nativeTransport" + "mode" | "chunkMode" | "preview" | "progress" | "block" | "nativeTransport" >; export type MarkdownTableMode = "off" | "bullets" | "code" | "block"; diff --git a/src/config/types.msteams.ts b/src/config/types.msteams.ts index ad6ea6549fd..c4dc8f4cc5d 100644 --- a/src/config/types.msteams.ts +++ b/src/config/types.msteams.ts @@ -1,5 +1,6 @@ import type { BlockStreamingCoalesceConfig, + ChannelPreviewStreamingConfig, ContextVisibilityMode, DmPolicy, GroupPolicy, @@ -131,6 +132,8 @@ export type MSTeamsConfig = { textChunkLimit?: number; /** Chunking mode: "length" (default) splits by size; "newline" splits on every newline. */ chunkMode?: "length" | "newline"; + /** Preview/progress streaming config for visible in-progress replies. */ + streaming?: ChannelPreviewStreamingConfig; /** Send native Teams typing indicator before replies. Default: true for groups/channels; DMs use informative stream status. */ typingIndicator?: boolean; /** Enable progressive block-by-block message delivery instead of a single reply. */ diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 9c29c6ae366..a03ff2f1420 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -86,11 +86,20 @@ const ChannelStreamingPreviewSchema = z toolProgress: z.boolean().optional(), }) .strict(); +const ChannelStreamingProgressSchema = z + .object({ + label: z.union([z.string(), z.literal(false)]).optional(), + labels: z.array(z.string()).optional(), + maxLines: z.number().int().positive().optional(), + toolProgress: z.boolean().optional(), + }) + .strict(); const ChannelPreviewStreamingConfigSchema = z .object({ mode: UnifiedStreamingModeSchema.optional(), chunkMode: TextChunkModeSchema.optional(), preview: ChannelStreamingPreviewSchema.optional(), + progress: ChannelStreamingProgressSchema.optional(), block: ChannelStreamingBlockSchema.optional(), }) .strict(); @@ -1618,6 +1627,7 @@ export const MSTeamsConfigSchema = z contextVisibility: ContextVisibilityModeSchema.optional(), textChunkLimit: z.number().int().positive().optional(), chunkMode: z.enum(["length", "newline"]).optional(), + streaming: ChannelPreviewStreamingConfigSchema.optional(), typingIndicator: z.boolean().optional(), blockStreaming: z.boolean().optional(), blockStreamingCoalesce: BlockStreamingCoalesceSchema.optional(), diff --git a/src/plugin-sdk/channel-streaming.test.ts b/src/plugin-sdk/channel-streaming.test.ts index 9203eef5a15..fc2a4b4bf21 100644 --- a/src/plugin-sdk/channel-streaming.test.ts +++ b/src/plugin-sdk/channel-streaming.test.ts @@ -1,11 +1,17 @@ import { describe, expect, it } from "vitest"; import { + DEFAULT_PROGRESS_DRAFT_LABELS, + formatChannelProgressDraftText, getChannelStreamingConfigObject, + resolveChannelPreviewStreamMode, + resolveChannelProgressDraftLabel, + resolveChannelProgressDraftMaxLines, resolveChannelStreamingBlockCoalesce, resolveChannelStreamingBlockEnabled, resolveChannelStreamingChunkMode, resolveChannelStreamingNativeTransport, resolveChannelStreamingPreviewChunk, + resolveChannelStreamingSuppressDefaultToolProgressMessages, resolveChannelStreamingPreviewToolProgress, } from "./channel-streaming.js"; @@ -73,4 +79,90 @@ describe("channel-streaming", () => { }); expect(resolveChannelStreamingPreviewToolProgress(entry)).toBe(true); }); + + it("preserves progress as a first-class preview mode", () => { + expect(resolveChannelPreviewStreamMode({ streaming: "progress" }, "off")).toBe("progress"); + expect(resolveChannelPreviewStreamMode({ streaming: { mode: "progress" } }, "off")).toBe( + "progress", + ); + }); + + it("keeps block preview mode separate from block delivery", () => { + expect(resolveChannelStreamingBlockEnabled({ streaming: "block" })).toBeUndefined(); + expect(resolveChannelStreamingBlockEnabled({ streaming: { mode: "block" } })).toBeUndefined(); + expect( + resolveChannelStreamingBlockEnabled({ + streaming: { mode: "block", block: { enabled: true } }, + }), + ).toBe(true); + expect(resolveChannelStreamingBlockEnabled({ streaming: "block", blockStreaming: false })).toBe( + false, + ); + }); + + it("suppresses standalone tool progress for active preview drafts", () => { + expect( + resolveChannelStreamingSuppressDefaultToolProgressMessages({ + streaming: { mode: "progress", progress: { toolProgress: false } }, + }), + ).toBe(true); + expect( + resolveChannelStreamingSuppressDefaultToolProgressMessages( + { streaming: { mode: "partial", preview: { toolProgress: false } } }, + { draftStreamActive: true }, + ), + ).toBe(false); + expect( + resolveChannelStreamingSuppressDefaultToolProgressMessages( + { streaming: { mode: "partial", preview: { toolProgress: false } } }, + { draftStreamActive: true, previewToolProgressEnabled: true }, + ), + ).toBe(true); + expect( + resolveChannelStreamingSuppressDefaultToolProgressMessages( + { streaming: { mode: "progress" } }, + { draftStreamActive: false }, + ), + ).toBe(false); + }); + + it("uses auto progress labels when no explicit label is configured", () => { + expect(resolveChannelProgressDraftLabel({ random: () => 0 })).toBe( + DEFAULT_PROGRESS_DRAFT_LABELS[0], + ); + expect(resolveChannelProgressDraftLabel({ random: () => 0.99 })).toBe( + DEFAULT_PROGRESS_DRAFT_LABELS.at(-1), + ); + }); + + it("supports explicit progress labels and custom label sets", () => { + expect( + resolveChannelProgressDraftLabel({ + entry: { streaming: { progress: { label: "Crunching" } } }, + }), + ).toBe("Crunching"); + expect( + resolveChannelProgressDraftLabel({ + entry: { streaming: { progress: { labels: ["Pearling"] } } }, + random: () => 0.5, + }), + ).toBe("Pearling"); + expect( + resolveChannelProgressDraftLabel({ + entry: { streaming: { progress: { label: false } } }, + }), + ).toBeUndefined(); + }); + + it("formats bounded progress draft text", () => { + const entry = { streaming: { progress: { label: "Shelling", maxLines: 2 } } }; + expect(resolveChannelProgressDraftMaxLines(entry)).toBe(2); + expect( + formatChannelProgressDraftText({ + entry, + lines: [" tool: read ", "patch applied", "tests done"], + formatLine: (line) => `\`${line}\``, + }), + ).toBe("Shelling\n• `patch applied`\n• `tests done`"); + }); }); diff --git a/src/plugin-sdk/channel-streaming.ts b/src/plugin-sdk/channel-streaming.ts index 73c77c58611..d0c0ffbda2d 100644 --- a/src/plugin-sdk/channel-streaming.ts +++ b/src/plugin-sdk/channel-streaming.ts @@ -3,8 +3,10 @@ import type { BlockStreamingCoalesceConfig, ChannelDeliveryStreamingConfig, ChannelPreviewStreamingConfig, + ChannelStreamingProgressConfig, ChannelStreamingConfig, SlackChannelStreamingConfig, + StreamingMode, TextChunkMode, } from "../config/types.base.js"; import { normalizeOptionalLowercaseString } from "../shared/string-coerce.js"; @@ -14,6 +16,7 @@ export type { ChannelPreviewStreamingConfig, ChannelStreamingBlockConfig, ChannelStreamingConfig, + ChannelStreamingProgressConfig, ChannelStreamingPreviewConfig, SlackChannelStreamingConfig, StreamingMode, @@ -44,6 +47,10 @@ function asBoolean(value: unknown): boolean | undefined { return typeof value === "boolean" ? value : undefined; } +function asInteger(value: unknown): number | undefined { + return typeof value === "number" && Number.isInteger(value) ? value : undefined; +} + function normalizeStreamingMode(value: unknown): string | null { if (typeof value !== "string") { return null; @@ -52,7 +59,7 @@ function normalizeStreamingMode(value: unknown): string | null { return normalized || null; } -function parsePreviewStreamingMode(value: unknown): "off" | "partial" | "block" | null { +function parsePreviewStreamingMode(value: unknown): StreamingMode | null { const normalized = normalizeStreamingMode(value); if ( normalized === "off" || @@ -60,7 +67,7 @@ function parsePreviewStreamingMode(value: unknown): "off" | "partial" | "block" normalized === "block" || normalized === "progress" ) { - return normalized === "progress" ? "partial" : normalized; + return normalized; } return null; } @@ -73,6 +80,33 @@ function asBlockStreamingChunkConfig(value: unknown): BlockStreamingChunkConfig return asObjectRecord(value) as BlockStreamingChunkConfig | undefined; } +function asProgressConfig(value: unknown): ChannelStreamingProgressConfig | undefined { + return asObjectRecord(value) as ChannelStreamingProgressConfig | undefined; +} + +export const DEFAULT_PROGRESS_DRAFT_LABELS = [ + "Thinking", + "Shelling", + "Scuttling", + "Clawing", + "Pinching", + "Molting", + "Bubbling", + "Tiding", + "Reefing", + "Cracking", + "Sifting", + "Brining", + "Nautiling", + "Krilling", + "Barnacling", + "Lobstering", + "Tidepooling", + "Pearling", + "Snapping", + "Surfacing", +] as const; + export function getChannelStreamingConfigObject( entry: StreamingCompatEntry | null | undefined, ): ChannelStreamingConfig | undefined { @@ -121,7 +155,32 @@ export function resolveChannelStreamingPreviewToolProgress( defaultValue = true, ): boolean { const config = getChannelStreamingConfigObject(entry); - return asBoolean(config?.preview?.toolProgress) ?? defaultValue; + return ( + asBoolean(config?.progress?.toolProgress) ?? + asBoolean(config?.preview?.toolProgress) ?? + defaultValue + ); +} + +export function resolveChannelStreamingSuppressDefaultToolProgressMessages( + entry: StreamingCompatEntry | null | undefined, + options?: { + draftStreamActive?: boolean; + previewToolProgressEnabled?: boolean; + previewStreamingEnabled?: boolean; + }, +): boolean { + if (options?.draftStreamActive === false || options?.previewStreamingEnabled === false) { + return false; + } + const mode = resolveChannelPreviewStreamMode(entry, "off"); + if (mode === "off") { + return false; + } + if (mode === "progress") { + return true; + } + return options?.previewToolProgressEnabled ?? resolveChannelStreamingPreviewToolProgress(entry); } export function resolveChannelStreamingNativeTransport( @@ -134,7 +193,7 @@ export function resolveChannelStreamingNativeTransport( export function resolveChannelPreviewStreamMode( entry: StreamingCompatEntry | null | undefined, defaultMode: "off" | "partial", -): "off" | "partial" | "block" { +): StreamingMode { const parsedStreaming = parsePreviewStreamingMode( getChannelStreamingConfigObject(entry)?.mode ?? entry?.streaming, ); @@ -151,3 +210,80 @@ export function resolveChannelPreviewStreamMode( } return defaultMode; } + +export function resolveChannelProgressDraftConfig( + entry: StreamingCompatEntry | null | undefined, +): ChannelStreamingProgressConfig { + return asProgressConfig(getChannelStreamingConfigObject(entry)?.progress) ?? {}; +} + +function normalizeProgressLabels(labels: unknown): string[] { + if (!Array.isArray(labels)) { + return [...DEFAULT_PROGRESS_DRAFT_LABELS]; + } + const normalized = labels + .map((entry) => (typeof entry === "string" ? entry.trim() : "")) + .filter((entry) => entry.length > 0); + return normalized.length > 0 ? normalized : [...DEFAULT_PROGRESS_DRAFT_LABELS]; +} + +function hashProgressSeed(seed: string): number { + let hash = 2166136261; + for (let index = 0; index < seed.length; index += 1) { + hash ^= seed.charCodeAt(index); + hash = Math.imul(hash, 16777619); + } + return hash >>> 0; +} + +export function resolveChannelProgressDraftLabel(params: { + entry?: StreamingCompatEntry | null; + seed?: string; + random?: () => number; +}): string | undefined { + const progress = resolveChannelProgressDraftConfig(params.entry); + if (progress.label === false) { + return undefined; + } + if (typeof progress.label === "string" && progress.label.trim() && progress.label !== "auto") { + return progress.label.trim(); + } + const labels = normalizeProgressLabels(progress.labels); + const index = + typeof params.seed === "string" && params.seed.length > 0 + ? hashProgressSeed(params.seed) % labels.length + : Math.floor(Math.max(0, Math.min(0.999999, params.random?.() ?? 0)) * labels.length); + return labels[index] ?? labels[0]; +} + +export function resolveChannelProgressDraftMaxLines( + entry: StreamingCompatEntry | null | undefined, + defaultValue = 8, +): number { + const configured = asInteger(resolveChannelProgressDraftConfig(entry).maxLines); + return configured && configured > 0 ? configured : defaultValue; +} + +export function formatChannelProgressDraftText(params: { + entry?: StreamingCompatEntry | null; + lines: string[]; + seed?: string; + random?: () => number; + formatLine?: (line: string) => string; + bullet?: string; +}): string { + const label = resolveChannelProgressDraftLabel({ + entry: params.entry, + seed: params.seed, + random: params.random, + }); + const maxLines = resolveChannelProgressDraftMaxLines(params.entry); + const formatLine = params.formatLine ?? ((line: string) => line); + const bullet = params.bullet ?? "•"; + const lines = params.lines + .map((line) => line.replace(/\s+/g, " ").trim()) + .filter((line) => line.length > 0) + .slice(-maxLines) + .map((line) => `${bullet} ${formatLine(line)}`); + return [label, ...lines].filter((line): line is string => Boolean(line)).join("\n"); +}