From a4b09d72b92b07b971543307614874cf4e2ee57f Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 3 Jun 2026 10:12:52 +0530 Subject: [PATCH] refactor(channels): share progress draft compositor --- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- .../progress-draft-compositor.test.ts | 106 +++++ src/channels/progress-draft-compositor.ts | 387 ++++++++++++++++++ src/channels/streaming.ts | 2 +- src/plugin-sdk/channel-outbound.ts | 1 + 5 files changed, 497 insertions(+), 3 deletions(-) create mode 100644 src/channels/progress-draft-compositor.test.ts create mode 100644 src/channels/progress-draft-compositor.ts diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index a104254a0b6..8a712e32512 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -f3e0379cbe0e584a8c9658253d4a808356fe80fb5ec775bbee9e968e8d815380 plugin-sdk-api-baseline.json -601b55acafbd1e00b850c9b0c15d587029050906960071d448d37538b223e226 plugin-sdk-api-baseline.jsonl +a9501e226bb26befb02072cf5e60c3dc124cbd5dc0b16eb281789d0843f72f71 plugin-sdk-api-baseline.json +b106090dc12bf7e46beac4ed160f0cff0ef8039291f24172b693e8d8b752d571 plugin-sdk-api-baseline.jsonl diff --git a/src/channels/progress-draft-compositor.test.ts b/src/channels/progress-draft-compositor.test.ts new file mode 100644 index 00000000000..aaa42fdf266 --- /dev/null +++ b/src/channels/progress-draft-compositor.test.ts @@ -0,0 +1,106 @@ +import { describe, expect, it, vi } from "vitest"; +import { createChannelProgressDraftCompositor } from "./progress-draft-compositor.js"; + +describe("createChannelProgressDraftCompositor", () => { + it("keeps the progress label visible when tool lines are hidden", async () => { + const update = vi.fn(); + const progress = createChannelProgressDraftCompositor({ + entry: { + streaming: { mode: "progress", progress: { label: "Shelling", toolProgress: false } }, + }, + mode: "progress", + active: true, + seed: "test", + update, + }); + + await progress.pushToolProgress("🛠️ Exec", { startImmediately: true }); + + expect(update).toHaveBeenCalledWith("Shelling", { flush: true }); + }); + + it("keeps reasoning details hidden when tool progress lines are hidden", async () => { + const update = vi.fn(); + const progress = createChannelProgressDraftCompositor({ + entry: { + streaming: { mode: "progress", progress: { label: "Shelling", toolProgress: false } }, + }, + mode: "progress", + active: true, + seed: "test", + update, + }); + + await progress.pushToolProgress("🛠️ Exec", { startImmediately: true }); + await progress.pushReasoningProgress("Reading files"); + + expect(update).toHaveBeenCalledWith("Shelling", { flush: true }); + expect(update).not.toHaveBeenCalledWith(expect.stringContaining("Reading"), undefined); + }); + + it("does not resurrect progress after suppression", async () => { + const update = vi.fn(); + const progress = createChannelProgressDraftCompositor({ + entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + mode: "progress", + active: true, + seed: "test", + update, + }); + + progress.suppress(); + await progress.pushReasoningProgress("Reading files"); + + expect(update).not.toHaveBeenCalled(); + }); + + it("composes reasoning deltas with tool progress", async () => { + const update = vi.fn(); + const progress = createChannelProgressDraftCompositor({ + entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + mode: "progress", + active: true, + seed: "test", + update, + }); + + await progress.pushToolProgress("🛠️ Exec", { startImmediately: true }); + await progress.pushReasoningProgress("Reading"); + await progress.pushReasoningProgress(" files"); + + expect(update).toHaveBeenLastCalledWith("Shelling\n\n🛠️ Exec\n• _Reading files_", undefined); + }); + + it("preserves tagged reasoning content without leaking tags", async () => { + const update = vi.fn(); + const progress = createChannelProgressDraftCompositor({ + entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + mode: "progress", + active: true, + seed: "test", + update, + }); + + await progress.pushToolProgress("🛠️ Exec", { startImmediately: true }); + await progress.pushReasoningProgress("Checking files"); + + expect(update).toHaveBeenLastCalledWith("Shelling\n\n🛠️ Exec\n• _Checking files_", undefined); + }); + + it("replaces repeated formatted reasoning snapshots", async () => { + const update = vi.fn(); + const progress = createChannelProgressDraftCompositor({ + entry: { streaming: { mode: "progress", progress: { label: "Shelling" } } }, + mode: "progress", + active: true, + seed: "test", + update, + }); + + await progress.pushToolProgress("🛠️ Exec", { startImmediately: true }); + await progress.pushReasoningProgress("Thinking\n\n_Reading_"); + await progress.pushReasoningProgress("Thinking\n\n_Reading files_"); + + expect(update).toHaveBeenLastCalledWith("Shelling\n\n🛠️ Exec\n• _Reading files_", undefined); + }); +}); diff --git a/src/channels/progress-draft-compositor.ts b/src/channels/progress-draft-compositor.ts new file mode 100644 index 00000000000..c9d11ada599 --- /dev/null +++ b/src/channels/progress-draft-compositor.ts @@ -0,0 +1,387 @@ +import { formatReasoningMessage } from "../agents/embedded-agent-utils.js"; +import { stripInlineDirectiveTagsForDelivery } from "../utils/directive-tags.js"; +import { + createChannelProgressDraftGate, + type ChannelProgressDraftLine, + formatChannelProgressDraftText, + isChannelProgressDraftWorkToolName, + mergeChannelProgressDraftLine, + normalizeChannelProgressDraftLineIdentity, + resolveChannelProgressDraftMaxLineChars, + resolveChannelProgressDraftMaxLines, + resolveChannelStreamingProgressCommentary, + resolveChannelStreamingPreviewToolProgress, + resolveChannelStreamingSuppressDefaultToolProgressMessages, + type StreamingCompatEntry, + type StreamingMode, +} from "./streaming.js"; + +export type ChannelProgressDraftMode = StreamingMode; + +export type ChannelProgressDraftCompositor = ReturnType< + typeof createChannelProgressDraftCompositor +>; +type ProgressDraftLine = string | ChannelProgressDraftLine; + +export function createChannelProgressDraftCompositor(params: { + entry: StreamingCompatEntry | null | undefined; + mode: ChannelProgressDraftMode; + active: boolean; + seed: string; + update: (text: string, options?: { flush?: boolean }) => Promise | void; + deleteCurrent?: () => Promise | void; + tryNativeUpdate?: (text: string) => Promise | boolean; + formatLine?: (line: string) => string; + isEmptyLine?: (line: ProgressDraftLine | undefined) => boolean; + shouldStartNow?: (line: ProgressDraftLine | undefined) => boolean; +}) { + const previewToolProgressEnabled = + params.active && resolveChannelStreamingPreviewToolProgress(params.entry); + const commentaryProgressEnabled = + params.active && resolveChannelStreamingProgressCommentary(params.entry); + const suppressDefaultToolProgressMessages = + params.active && + resolveChannelStreamingSuppressDefaultToolProgressMessages(params.entry, { + draftStreamActive: true, + previewToolProgressEnabled, + }); + let progressSuppressed = false; + let lines: ProgressDraftLine[] = []; + let lastRenderedText = ""; + let reasoningRawText = ""; + let lastReasoningLine: string | undefined; + let finalReplyStarted = false; + let finalReplyDelivered = false; + + const formatDraftText = (draftLines = lines, options?: { formatted?: boolean }) => + formatChannelProgressDraftText({ + entry: params.entry, + lines: draftLines, + seed: params.seed, + formatLine: options?.formatted === false ? undefined : params.formatLine, + }); + + const clearProgressState = (suppressed: boolean) => { + progressSuppressed = suppressed; + lines = []; + lastRenderedText = ""; + reasoningRawText = ""; + lastReasoningLine = undefined; + }; + + const render = async (options?: { flush?: boolean }): Promise => { + if (!params.active || params.mode !== "progress") { + return false; + } + const text = formatDraftText(); + if (!text || text === lastRenderedText) { + return false; + } + lastRenderedText = text; + await params.update(text, options); + return true; + }; + + const gate = createChannelProgressDraftGate({ + onStart: async () => { + await render({ flush: true }); + }, + }); + + const clearLine = async (lineId: string) => { + const nextLines = lines.filter( + (line) => typeof line !== "object" || line.id?.trim() !== lineId, + ); + if (nextLines.length === lines.length) { + return; + } + lines = nextLines; + if (!gate.hasStarted) { + return; + } + const text = formatDraftText(); + if (text) { + await render(); + return; + } + lastRenderedText = ""; + await params.deleteCurrent?.(); + }; + + const noteProgress = async ( + line?: ProgressDraftLine, + options?: { toolName?: string; startImmediately?: boolean }, + ) => { + if (!params.active || finalReplyStarted || finalReplyDelivered) { + return false; + } + if (options?.toolName !== undefined && !isChannelProgressDraftWorkToolName(options.toolName)) { + return false; + } + if (params.isEmptyLine?.(line)) { + return false; + } + const normalized = normalizeChannelProgressDraftLineIdentity(line); + if (!normalized || progressSuppressed) { + return false; + } + if (params.mode !== "progress" && !previewToolProgressEnabled) { + return false; + } + const progressLine = typeof line === "object" && line !== undefined ? line : normalized; + const shouldStoreLine = previewToolProgressEnabled; + const nextLines = shouldStoreLine + ? mergeChannelProgressDraftLine(lines, progressLine, { + maxLines: resolveChannelProgressDraftMaxLines(params.entry), + }) + : lines; + if (shouldStoreLine && nextLines === lines) { + return false; + } + if (shouldStoreLine && params.tryNativeUpdate) { + const text = formatDraftText(nextLines, { formatted: false }); + if (text && (await params.tryNativeUpdate(text))) { + lines = nextLines; + lastRenderedText = text; + return true; + } + } + lines = nextLines; + if (params.mode !== "progress") { + if (!shouldStoreLine) { + return false; + } + const text = formatDraftText(); + if (!text || text === lastRenderedText) { + return false; + } + lastRenderedText = text; + await params.update(text); + return true; + } + if (options?.startImmediately || params.shouldStartNow?.(line)) { + await gate.startNow(); + return gate.hasStarted ? await render() : false; + } + const alreadyStarted = gate.hasStarted; + const progressActive = await gate.noteWork(); + if ((alreadyStarted || progressActive) && gate.hasStarted) { + return await render(); + } + return false; + }; + + return { + get previewToolProgressEnabled() { + return previewToolProgressEnabled; + }, + get commentaryProgressEnabled() { + return commentaryProgressEnabled; + }, + get suppressDefaultToolProgressMessages() { + return suppressDefaultToolProgressMessages; + }, + get hasStarted() { + return gate.hasStarted; + }, + markFinalReplyStarted() { + finalReplyStarted = true; + }, + markFinalReplyDelivered() { + finalReplyDelivered = true; + }, + reset() { + clearProgressState(false); + }, + suppress() { + clearProgressState(true); + }, + cancel() { + gate.cancel(); + }, + start() { + return gate.startNow(); + }, + pushToolProgress: noteProgress, + async pushReasoningProgress(text?: string, options?: { snapshot?: boolean }) { + if ( + !params.active || + params.mode !== "progress" || + !text || + progressSuppressed || + finalReplyDelivered + ) { + return false; + } + reasoningRawText = mergeReasoningProgressText(reasoningRawText, text, { + snapshot: options?.snapshot === true, + }); + const normalized = normalizeReasoningProgressLine(reasoningRawText); + if (!normalized) { + return false; + } + const displayLine = formatReasoningProgressDisplayLine( + normalized, + resolveChannelProgressDraftMaxLineChars(params.entry), + ); + if (!displayLine) { + return false; + } + if (previewToolProgressEnabled) { + const priorIndex = + lastReasoningLine === undefined ? -1 : lines.lastIndexOf(lastReasoningLine); + if (priorIndex >= 0) { + lines = [...lines]; + lines[priorIndex] = displayLine; + } else { + lines = [...lines, displayLine].slice(-resolveChannelProgressDraftMaxLines(params.entry)); + } + lastReasoningLine = displayLine; + } + const progressActive = await gate.noteWork(); + if (progressActive && gate.hasStarted) { + return await render(); + } + return false; + }, + async pushCommentaryProgress(text?: string, options?: { itemId?: string }) { + if (!params.active || params.mode !== "progress" || !commentaryProgressEnabled) { + return false; + } + if (finalReplyStarted || finalReplyDelivered) { + return false; + } + const itemId = options?.itemId?.trim(); + if (!text && !itemId) { + return false; + } + const normalized = normalizeCommentaryProgressText(text ?? ""); + const lineId = itemId ? `commentary:${itemId}` : normalized ? `commentary:${normalized}` : ""; + if (!normalized) { + if (lineId) { + await clearLine(lineId); + } + return false; + } + const line: ChannelProgressDraftLine = { + id: lineId, + kind: "item", + text: normalized, + label: "Commentary", + prefix: false, + }; + lines = mergeChannelProgressDraftLine(lines, line, { + maxLines: resolveChannelProgressDraftMaxLines(params.entry), + }); + await gate.startNow(); + return await render(); + }, + }; +} + +function normalizeReasoningProgressLine(text: string): string { + return stripReasoningProgressTags(text) + .replace( + /^\s*(?:>\s*)?(?:Reasoning:\s*(?:\r?\n|\r)\s*|Thinking\.{0,3}\s*(?:\r?\n|\r)\s*(?:\r?\n|\r)\s*)/i, + "", + ) + .replace(/\s+/g, " ") + .trim(); +} + +function stripReasoningProgressTags(text: string): string { + return text.replace( + /<\s*\/?\s*(?:(?:antml:)?(?:think(?:ing)?|thought)|antthinking)\b[^<>]*>/giu, + "", + ); +} + +function normalizeReasoningProgressInput(text: string): string { + const normalized = normalizeReasoningProgressLine(text); + const italic = normalized.match(/^_(.*)_$/u); + return (italic?.[1] ?? normalized).trim(); +} + +function formatReasoningProgressDisplayLine(text: string, maxChars: number): string { + const normalizedText = normalizeReasoningProgressInput(text); + const formatted = normalizeReasoningProgressLine(formatReasoningMessage(normalizedText)); + if (!formatted) { + return ""; + } + if (Array.from(formatted).length <= maxChars) { + return formatted; + } + const italic = formatted.match(/^_(.*)_$/u); + if (!italic) { + return compactReasoningProgressDisplayLine(formatted, maxChars); + } + const body = compactReasoningProgressDisplayLine(italic[1] ?? "", Math.max(1, maxChars - 2)); + return body ? `_${body}_` : ""; +} + +function compactReasoningProgressDisplayLine(text: string, maxChars: number): string { + const normalized = text.replace(/\s+/g, " ").trim(); + const chars = Array.from(normalized); + if (chars.length <= maxChars) { + return normalized; + } + if (maxChars <= 1) { + return "…"; + } + const head = chars + .slice(0, maxChars - 1) + .join("") + .trimEnd(); + const boundary = head.search(/\s+\S*$/u); + if (boundary > Math.floor(maxChars * 0.6)) { + return `${head.slice(0, boundary).trimEnd()}…`; + } + return `${head}…`; +} + +function normalizeCommentaryProgressText(text: string): string { + const cleaned = stripInlineDirectiveTagsForDelivery(text).text.trim(); + if (!cleaned || isSilentCommentaryProgressText(cleaned)) { + return ""; + } + return cleaned + .split(/\r?\n/u) + .map((line) => line.replace(/\s+/g, " ").trim()) + .filter(Boolean) + .map((line) => `_${line}_`) + .join("\n"); +} + +function isSilentCommentaryProgressText(text: string): boolean { + const normalized = text.replace(/^[\s*_`~]+|[\s*_`~]+$/gu, "").trim(); + return /^NO_REPLY$/iu.test(normalized); +} + +function mergeReasoningProgressText( + current: string, + incoming: string, + options?: { snapshot?: boolean }, +): string { + if (!current) { + return incoming; + } + const normalizedCurrent = normalizeReasoningProgressInput(current); + const normalizedIncoming = normalizeReasoningProgressInput(incoming); + if (!normalizedIncoming || normalizedIncoming === normalizedCurrent) { + return current; + } + if ( + options?.snapshot === true || + isReasoningSnapshotText(incoming) || + normalizedIncoming.startsWith(normalizedCurrent) + ) { + return incoming; + } + return `${current}${incoming}`; +} + +function isReasoningSnapshotText(text: string): boolean { + return /^\s*(?:>\s*)?(?:Reasoning:\s*(?:\r?\n|\r)\s*|Thinking\.{0,3}\s*(?:\r?\n|\r)\s*(?:\r?\n|\r)\s*)/i.test( + text, + ); +} diff --git a/src/channels/streaming.ts b/src/channels/streaming.ts index ce266928891..103f51f90a9 100644 --- a/src/channels/streaming.ts +++ b/src/channels/streaming.ts @@ -26,7 +26,7 @@ export type { } from "../config/types.base.js"; export type { SlackChannelStreamingConfig } from "../config/types.slack.js"; -type StreamingCompatEntry = { +export type StreamingCompatEntry = { streaming?: unknown; streamMode?: unknown; chunkMode?: unknown; diff --git a/src/plugin-sdk/channel-outbound.ts b/src/plugin-sdk/channel-outbound.ts index c051514acc5..413fd2db80a 100644 --- a/src/plugin-sdk/channel-outbound.ts +++ b/src/plugin-sdk/channel-outbound.ts @@ -75,6 +75,7 @@ export type { OutboundSendDeps } from "../infra/outbound/send-deps.js"; export { sanitizeForPlainText } from "../infra/outbound/sanitize-text.js"; export { logAckFailure, logTypingFailure } from "../channels/logging.js"; export * from "../channels/streaming.js"; +export * from "../channels/progress-draft-compositor.js"; export { classifyDurableSendRecoveryState, createChannelMessageAdapterFromOutbound,