diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index 3a5d960fb00..f3c73a3f416 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -51,15 +51,15 @@ describe("createAcpReplyProjector", () => { }); await projector.onEvent({ type: "text_delta", text: "A", tag: "agent_message_chunk" }); - await vi.advanceTimersByTimeAsync(60); + await vi.advanceTimersByTimeAsync(760); await projector.flush(false); await projector.onEvent({ type: "text_delta", text: "B", tag: "agent_message_chunk" }); - await vi.advanceTimersByTimeAsync(60); + await vi.advanceTimersByTimeAsync(760); await projector.flush(false); await projector.onEvent({ type: "text_delta", text: "C", tag: "agent_message_chunk" }); - await vi.advanceTimersByTimeAsync(60); + await vi.advanceTimersByTimeAsync(760); await projector.flush(false); expect(deliveries.filter((entry) => entry.kind === "block")).toEqual([ @@ -103,6 +103,55 @@ describe("createAcpReplyProjector", () => { ]); }); + it("does not flush short live fragments mid-phrase on idle", async () => { + vi.useFakeTimers(); + try { + const deliveries: Array<{ kind: string; text?: string }> = []; + const projector = createAcpReplyProjector({ + cfg: createCfg({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + coalesceIdleMs: 100, + maxChunkChars: 256, + }, + }, + }), + shouldSendToolSummaries: true, + deliver: async (kind, payload) => { + deliveries.push({ kind, text: payload.text }); + return true; + }, + }); + + await projector.onEvent({ + type: "text_delta", + text: "Yes. Send me the term(s), and I’ll run ", + tag: "agent_message_chunk", + }); + + await vi.advanceTimersByTimeAsync(1200); + expect(deliveries).toEqual([]); + + await projector.onEvent({ + type: "text_delta", + text: "`wd-cli` searches right away. ", + tag: "agent_message_chunk", + }); + await projector.flush(false); + + expect(deliveries).toEqual([ + { + kind: "block", + text: "Yes. Send me the term(s), and I’ll run `wd-cli` searches right away. ", + }, + ]); + } finally { + vi.useRealTimers(); + } + }); + it("supports deliveryMode=final_only by buffering all projected output until done", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ @@ -649,7 +698,7 @@ describe("createAcpReplyProjector", () => { expect(deliveries[0]?.text).toContain("Tool Call"); }); - it("inserts a paragraph boundary before visible text after hidden tool updates by default", async () => { + it("inserts a space boundary before visible text after hidden tool updates by default", async () => { const deliveries: Array<{ kind: string; text?: string }> = []; const projector = createAcpReplyProjector({ cfg: createCfg({ @@ -685,7 +734,7 @@ describe("createAcpReplyProjector", () => { .filter((entry) => entry.kind === "block") .map((entry) => entry.text ?? "") .join(""); - expect(combinedText).toBe("fallback.\n\nI don't"); + expect(combinedText).toBe("fallback. I don't"); }); it("supports hiddenBoundarySeparator=space", async () => { diff --git a/src/auto-reply/reply/acp-projector.ts b/src/auto-reply/reply/acp-projector.ts index 18326a0a3f8..7ec4ca4c5e3 100644 --- a/src/auto-reply/reply/acp-projector.ts +++ b/src/auto-reply/reply/acp-projector.ts @@ -14,6 +14,10 @@ import { createBlockReplyPipeline } from "./block-reply-pipeline.js"; import type { ReplyDispatchKind } from "./reply-dispatcher.js"; const ACP_BLOCK_REPLY_TIMEOUT_MS = 15_000; +const ACP_LIVE_IDLE_FLUSH_FLOOR_MS = 750; +const ACP_LIVE_IDLE_MIN_CHARS = 80; +const ACP_LIVE_SOFT_FLUSH_CHARS = 220; +const ACP_LIVE_HARD_FLUSH_CHARS = 480; const TERMINAL_TOOL_STATUSES = new Set(["completed", "failed", "cancelled", "done", "error"]); const HIDDEN_BOUNDARY_TAGS = new Set(["tool_call", "tool_call_update"]); @@ -99,6 +103,41 @@ function shouldInsertSeparator(params: { return true; } +function shouldFlushLiveBufferOnBoundary(text: string): boolean { + if (!text) { + return false; + } + if (text.length >= ACP_LIVE_HARD_FLUSH_CHARS) { + return true; + } + if (text.endsWith("\n\n")) { + return true; + } + if (/[.!?][)"'`]*\s$/.test(text)) { + return true; + } + if (text.length >= ACP_LIVE_SOFT_FLUSH_CHARS && /\s$/.test(text)) { + return true; + } + return false; +} + +function shouldFlushLiveBufferOnIdle(text: string): boolean { + if (!text) { + return false; + } + if (text.length >= ACP_LIVE_IDLE_MIN_CHARS) { + return true; + } + if (/[.!?][)"'`]*$/.test(text.trimEnd())) { + return true; + } + if (text.includes("\n")) { + return true; + } + return false; +} + function renderToolSummaryText(event: Extract): string { const detailParts: string[] = []; const title = event.title?.trim(); @@ -148,9 +187,10 @@ export function createAcpReplyProjector(params: { await params.deliver("block", payload); }, timeoutMs: ACP_BLOCK_REPLY_TIMEOUT_MS, - coalescing: streaming.coalescing, + coalescing: settings.deliveryMode === "live" ? undefined : streaming.coalescing, }); const chunker = new EmbeddedBlockChunker(streaming.chunking); + const liveIdleFlushMs = Math.max(streaming.coalescing.idleMs, ACP_LIVE_IDLE_FLUSH_FLOOR_MS); let emittedTurnChars = 0; let emittedMetaEvents = 0; @@ -160,10 +200,65 @@ export function createAcpReplyProjector(params: { let lastUsageTuple: string | undefined; let lastVisibleOutputTail: string | undefined; let pendingHiddenBoundary = false; + let liveBufferText = ""; + let liveIdleTimer: NodeJS.Timeout | undefined; const pendingToolDeliveries: BufferedToolDelivery[] = []; const toolLifecycleById = new Map(); + const clearLiveIdleTimer = () => { + if (!liveIdleTimer) { + return; + } + clearTimeout(liveIdleTimer); + liveIdleTimer = undefined; + }; + + const drainChunker = (force: boolean) => { + if (settings.deliveryMode === "final_only" && !force) { + return; + } + chunker.drain({ + force, + emit: (chunk) => { + blockReplyPipeline.enqueue({ text: chunk }); + }, + }); + }; + + const flushLiveBuffer = (opts?: { force?: boolean; idle?: boolean }) => { + if (settings.deliveryMode !== "live") { + return; + } + if (!liveBufferText) { + return; + } + if (opts?.idle && !shouldFlushLiveBufferOnIdle(liveBufferText)) { + return; + } + const text = liveBufferText; + liveBufferText = ""; + chunker.append(text); + drainChunker(opts?.force === true); + }; + + const scheduleLiveIdleFlush = () => { + if (settings.deliveryMode !== "live") { + return; + } + if (liveIdleFlushMs <= 0 || !liveBufferText) { + return; + } + clearLiveIdleTimer(); + liveIdleTimer = setTimeout(() => { + flushLiveBuffer({ force: true, idle: true }); + if (liveBufferText) { + scheduleLiveIdleFlush(); + } + }, liveIdleFlushMs); + }; + const resetTurnState = () => { + clearLiveIdleTimer(); emittedTurnChars = 0; emittedMetaEvents = 0; truncationNoticeEmitted = false; @@ -172,23 +267,11 @@ export function createAcpReplyProjector(params: { lastUsageTuple = undefined; lastVisibleOutputTail = undefined; pendingHiddenBoundary = false; + liveBufferText = ""; pendingToolDeliveries.length = 0; toolLifecycleById.clear(); }; - const drainChunker = (force: boolean) => { - if (settings.deliveryMode === "final_only" && !force) { - return; - } - const effectiveForce = settings.deliveryMode === "live" ? true : force; - chunker.drain({ - force: effectiveForce, - emit: (chunk) => { - blockReplyPipeline.enqueue({ text: chunk }); - }, - }); - }; - const flushBufferedToolDeliveries = async (force: boolean) => { if (!(settings.deliveryMode === "final_only" && force)) { return; @@ -199,6 +282,10 @@ export function createAcpReplyProjector(params: { }; const flush = async (force = false): Promise => { + if (settings.deliveryMode === "live") { + clearLiveIdleTimer(); + flushLiveBuffer({ force: true }); + } await flushBufferedToolDeliveries(force); drainChunker(force); await blockReplyPipeline.flush({ force }); @@ -362,10 +449,20 @@ export function createAcpReplyProjector(params: { const remaining = settings.maxTurnChars - emittedTurnChars; const accepted = remaining < text.length ? text.slice(0, remaining) : text; if (accepted.length > 0) { - chunker.append(accepted); emittedTurnChars += accepted.length; lastVisibleOutputTail = accepted.slice(-1); - drainChunker(false); + if (settings.deliveryMode === "live") { + liveBufferText += accepted; + if (shouldFlushLiveBufferOnBoundary(liveBufferText)) { + clearLiveIdleTimer(); + flushLiveBuffer({ force: true }); + } else { + scheduleLiveIdleFlush(); + } + } else { + chunker.append(accepted); + drainChunker(false); + } } if (accepted.length < text.length) { await emitTruncationNotice(); @@ -396,7 +493,9 @@ export function createAcpReplyProjector(params: { if (event.type === "tool_call") { if (!isAcpTagVisible(settings, event.tag)) { if (event.tag && HIDDEN_BOUNDARY_TAGS.has(event.tag)) { - pendingHiddenBoundary = true; + const status = normalizeToolStatus(event.status); + const isTerminal = status ? TERMINAL_TOOL_STATUSES.has(status) : false; + pendingHiddenBoundary = event.tag === "tool_call" || isTerminal; } return; } diff --git a/src/auto-reply/reply/acp-stream-settings.test.ts b/src/auto-reply/reply/acp-stream-settings.test.ts index a5ffd1a6d44..c176cbccdba 100644 --- a/src/auto-reply/reply/acp-stream-settings.test.ts +++ b/src/auto-reply/reply/acp-stream-settings.test.ts @@ -54,6 +54,7 @@ describe("acp stream settings", () => { }), ); expect(settings.deliveryMode).toBe("live"); + expect(settings.hiddenBoundarySeparator).toBe("space"); }); it("uses default tag visibility when no override is provided", () => { diff --git a/src/auto-reply/reply/acp-stream-settings.ts b/src/auto-reply/reply/acp-stream-settings.ts index 2eae975f89d..00e7dd8a1f7 100644 --- a/src/auto-reply/reply/acp-stream-settings.ts +++ b/src/auto-reply/reply/acp-stream-settings.ts @@ -7,6 +7,7 @@ const DEFAULT_ACP_STREAM_MAX_CHUNK_CHARS = 1800; const DEFAULT_ACP_REPEAT_SUPPRESSION = true; const DEFAULT_ACP_DELIVERY_MODE = "final_only"; const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR = "paragraph"; +const DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR_LIVE = "space"; const DEFAULT_ACP_MAX_TURN_CHARS = 24_000; const DEFAULT_ACP_MAX_TOOL_SUMMARY_CHARS = 320; const DEFAULT_ACP_MAX_STATUS_CHARS = 320; @@ -68,11 +69,14 @@ function resolveAcpDeliveryMode(value: unknown): AcpDeliveryMode { return DEFAULT_ACP_DELIVERY_MODE; } -function resolveAcpHiddenBoundarySeparator(value: unknown): AcpHiddenBoundarySeparator { +function resolveAcpHiddenBoundarySeparator( + value: unknown, + fallback: AcpHiddenBoundarySeparator, +): AcpHiddenBoundarySeparator { if (value === "none" || value === "space" || value === "newline" || value === "paragraph") { return value; } - return DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR; + return fallback; } function resolveAcpStreamCoalesceIdleMs(cfg: OpenClawConfig): number { @@ -95,9 +99,17 @@ function resolveAcpStreamMaxChunkChars(cfg: OpenClawConfig): number { export function resolveAcpProjectionSettings(cfg: OpenClawConfig): AcpProjectionSettings { const stream = cfg.acp?.stream; + const deliveryMode = resolveAcpDeliveryMode(stream?.deliveryMode); + const hiddenBoundaryFallback: AcpHiddenBoundarySeparator = + deliveryMode === "live" + ? DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR_LIVE + : DEFAULT_ACP_HIDDEN_BOUNDARY_SEPARATOR; return { - deliveryMode: resolveAcpDeliveryMode(stream?.deliveryMode), - hiddenBoundarySeparator: resolveAcpHiddenBoundarySeparator(stream?.hiddenBoundarySeparator), + deliveryMode, + hiddenBoundarySeparator: resolveAcpHiddenBoundarySeparator( + stream?.hiddenBoundarySeparator, + hiddenBoundaryFallback, + ), repeatSuppression: clampBoolean(stream?.repeatSuppression, DEFAULT_ACP_REPEAT_SUPPRESSION), maxTurnChars: clampPositiveInteger(stream?.maxTurnChars, DEFAULT_ACP_MAX_TURN_CHARS, { min: 1,