diff --git a/src/auto-reply/reply/acp-projector.test.ts b/src/auto-reply/reply/acp-projector.test.ts index e413b07888f..7402e07d1cc 100644 --- a/src/auto-reply/reply/acp-projector.test.ts +++ b/src/auto-reply/reply/acp-projector.test.ts @@ -201,7 +201,7 @@ describe("createAcpReplyProjector", () => { expect(onProgress).toHaveBeenCalledTimes(2); }); - it("coalesces text deltas into bounded block chunks", async () => { + it("buffers default final-only text into one final reply", async () => { const { deliveries, projector } = createProjectorHarness(); await projector.onEvent({ @@ -211,10 +211,7 @@ describe("createAcpReplyProjector", () => { }); await projector.flush(true); - expect(deliveries).toEqual([ - { kind: "block", text: "a".repeat(64) }, - { kind: "block", text: "a".repeat(6) }, - ]); + expect(deliveries).toEqual([{ kind: "final", text: "a".repeat(70) }]); }); it("does not suppress identical short text across terminal turn boundaries", async () => { @@ -363,7 +360,7 @@ describe("createAcpReplyProjector", () => { text: prefixSystemMessage("available commands updated (7)"), }); expectToolCallSummary(deliveries[1]); - expect(deliveries[2]).toEqual({ kind: "block", text: "What now?" }); + expect(deliveries[2]).toEqual({ kind: "final", text: "What now?" }); }); it("flushes buffered status/tool output on error in deliveryMode=final_only", async () => { diff --git a/src/auto-reply/reply/acp-projector.ts b/src/auto-reply/reply/acp-projector.ts index 0ab3887476d..3b919a2b3d4 100644 --- a/src/auto-reply/reply/acp-projector.ts +++ b/src/auto-reply/reply/acp-projector.ts @@ -204,6 +204,7 @@ export function createAcpReplyProjector(params: { let lastVisibleOutputTail: string | undefined; let pendingHiddenBoundary = false; let liveBufferText = ""; + let finalOnlyOutputText = ""; let liveIdleTimer: NodeJS.Timeout | undefined; const pendingToolDeliveries: BufferedToolDelivery[] = []; const toolLifecycleById = new Map(); @@ -272,6 +273,7 @@ export function createAcpReplyProjector(params: { lastVisibleOutputTail = undefined; pendingHiddenBoundary = false; liveBufferText = ""; + finalOnlyOutputText = ""; pendingToolDeliveries.length = 0; toolLifecycleById.clear(); }; @@ -291,7 +293,15 @@ export function createAcpReplyProjector(params: { flushLiveBuffer({ force: true }); } await flushBufferedToolDeliveries(force); - drainChunker(force); + if (settings.deliveryMode === "final_only") { + if (force && finalOnlyOutputText.trim().length > 0) { + const text = finalOnlyOutputText; + finalOnlyOutputText = ""; + await params.deliver("final", { text }); + } + } else { + drainChunker(force); + } await blockReplyPipeline.flush({ force }); }; @@ -445,8 +455,7 @@ export function createAcpReplyProjector(params: { scheduleLiveIdleFlush(); } } else { - chunker.append(accepted); - drainChunker(false); + finalOnlyOutputText += accepted; } } if (accepted.length < text.length) { diff --git a/src/auto-reply/reply/dispatch-acp-delivery.ts b/src/auto-reply/reply/dispatch-acp-delivery.ts index fac3af6bd20..276b1dfb06e 100644 --- a/src/auto-reply/reply/dispatch-acp-delivery.ts +++ b/src/auto-reply/reply/dispatch-acp-delivery.ts @@ -140,6 +140,7 @@ type AcpDispatchDeliveryState = { accumulatedBlockText: string; accumulatedVisibleBlockText: string; accumulatedBlockTtsText: string; + accumulatedFinalText: string; cleanBlockTtsDirectiveText?: ReturnType; blockCount: number; deliveredFinalReply: boolean; @@ -162,6 +163,7 @@ export type AcpDispatchDeliveryCoordinator = { getAccumulatedBlockText: () => string; getAccumulatedVisibleBlockText: () => string; getAccumulatedBlockTtsText: () => string; + getAccumulatedFinalText: () => string; settleVisibleText: () => Promise; hasDeliveredFinalReply: () => boolean; hasDeliveredVisibleText: () => boolean; @@ -202,6 +204,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { accumulatedBlockText: "", accumulatedVisibleBlockText: "", accumulatedBlockTtsText: "", + accumulatedFinalText: "", cleanBlockTtsDirectiveText: shouldCleanTtsDirectiveText({ cfg: params.cfg, ttsAuto: params.sessionTtsAuto, @@ -330,6 +333,13 @@ export function createAcpDispatchDeliveryCoordinator(params: { state.accumulatedVisibleBlockText += visiblePayload.text; } } + const rawFinalText = kind === "final" ? normalizeOptionalString(payload.text) : undefined; + if (rawFinalText) { + if (state.accumulatedFinalText.length > 0) { + state.accumulatedFinalText += "\n"; + } + state.accumulatedFinalText += rawFinalText; + } if (hasOutboundReplyContent(visiblePayload, { trimText: true })) { await startReplyLifecycleOnce(); @@ -445,6 +455,7 @@ export function createAcpDispatchDeliveryCoordinator(params: { getAccumulatedBlockText: () => state.accumulatedBlockText, getAccumulatedVisibleBlockText: () => state.accumulatedVisibleBlockText, getAccumulatedBlockTtsText: () => state.accumulatedBlockTtsText, + getAccumulatedFinalText: () => state.accumulatedFinalText, settleVisibleText: settleDirectVisibleText, hasDeliveredFinalReply: () => state.deliveredFinalReply, hasDeliveredVisibleText: () => state.deliveredVisibleText, diff --git a/src/auto-reply/reply/dispatch-acp.test.ts b/src/auto-reply/reply/dispatch-acp.test.ts index 708d6a894f5..ac66ae2b8f9 100644 --- a/src/auto-reply/reply/dispatch-acp.test.ts +++ b/src/auto-reply/reply/dispatch-acp.test.ts @@ -377,7 +377,11 @@ describe("tryDispatchAcpReply", () => { channelPluginMocks.getChannelPlugin.mockClear(); messageActionMocks.runMessageAction.mockReset(); messageActionMocks.runMessageAction.mockResolvedValue({ ok: true as const }); - ttsMocks.maybeApplyTtsToPayload.mockClear(); + ttsMocks.maybeApplyTtsToPayload.mockReset(); + ttsMocks.maybeApplyTtsToPayload.mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { payload: unknown }; + return params.payload; + }); ttsMocks.resolveTtsConfig.mockReset(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); mediaUnderstandingMocks.applyMediaUnderstanding.mockReset(); @@ -393,7 +397,7 @@ describe("tryDispatchAcpReply", () => { globalThis.fetch = originalFetch; }); - it("routes ACP block output to originating channel", async () => { + it("routes default ACP output to the originating channel as a final reply", async () => { setReadyAcpResolution(); mockRoutedTextTurn("hello"); @@ -404,14 +408,17 @@ describe("tryDispatchAcpReply", () => { shouldRouteToOriginating: true, }); - expect(result?.counts.block).toBe(1); + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ channel: "telegram", to: "telegram:thread-1", + payload: expect.objectContaining({ text: "hello" }), }), ); expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); it("persists ACP transcript when routed delivery fails", async () => { @@ -1187,18 +1194,18 @@ describe("tryDispatchAcpReply", () => { ); }); - it("does not deliver final fallback text when routed block text was already visible", async () => { + it("does not add a fallback when routed ACP text was already delivered as final", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); const { result } = await runRoutedAcpTextTurn("CODEX_OK"); - expect(result?.counts.block).toBe(1); - expect(result?.counts.final).toBe(0); + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); }); - it("does not deliver final fallback text when routed discord block text was already visible", async () => { + it("routes default ACP text as one final reply to Discord", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies( @@ -1216,8 +1223,8 @@ describe("tryDispatchAcpReply", () => { originatingTo: "channel:1478836151241412759", }); - expect(result?.counts.block).toBe(1); - expect(result?.counts.final).toBe(0); + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); expect(routeMocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ @@ -1228,7 +1235,7 @@ describe("tryDispatchAcpReply", () => { ); }); - it("does not deliver final fallback text when routed Slack block text was already visible", async () => { + it("routes default ACP text as one final reply to Slack", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies( @@ -1246,8 +1253,8 @@ describe("tryDispatchAcpReply", () => { originatingTo: "channel:C123", }); - expect(result?.counts.block).toBe(1); - expect(result?.counts.final).toBe(0); + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); expect(routeMocks.routeReply).toHaveBeenCalledWith( expect.objectContaining({ @@ -1258,7 +1265,7 @@ describe("tryDispatchAcpReply", () => { ); }); - it("does not deliver final fallback text when direct block text was already visible", async () => { + it("delivers default Telegram ACP text directly as a final reply", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); @@ -1278,13 +1285,14 @@ describe("tryDispatchAcpReply", () => { expect(result?.counts.final).toBe(0); expect(counts.block).toBe(0); expect(counts.final).toBe(0); - expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( + expect(result?.queuedFinal).toBe(true); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "CODEX_OK" }), ); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); - it("does not deliver final fallback text when direct discord block text was already visible", async () => { + it("delivers default Discord ACP text directly as a final reply", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies( @@ -1307,13 +1315,14 @@ describe("tryDispatchAcpReply", () => { expect(result?.counts.final).toBe(0); expect(counts.block).toBe(0); expect(counts.final).toBe(0); - expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( + expect(result?.queuedFinal).toBe(true); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "Received." }), ); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); - it("does not deliver final fallback text when direct Slack block text was already visible", async () => { + it("delivers default Slack ACP text directly as a final reply", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies( @@ -1336,13 +1345,14 @@ describe("tryDispatchAcpReply", () => { expect(result?.counts.final).toBe(0); expect(counts.block).toBe(0); expect(counts.final).toBe(0); - expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( + expect(result?.queuedFinal).toBe(true); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "Slack says hi." }), ); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); - it("treats visible telegram ACP block delivery as a successful final response", async () => { + it("treats Telegram ACP final delivery as a successful final response", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); @@ -1359,13 +1369,13 @@ describe("tryDispatchAcpReply", () => { }); expect(result?.queuedFinal).toBe(true); - expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); + expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "CODEX_OK" }), ); - expect(dispatcher.sendFinalReply).not.toHaveBeenCalled(); }); - it("preserves final fallback when direct block text is filtered by channels without a visibility override", async () => { + it("delivers default ACP text as final for channels without a visibility override", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "CODEX_OK" }, {} as ReturnType); @@ -1385,9 +1395,8 @@ describe("tryDispatchAcpReply", () => { expect(result?.counts.final).toBe(0); expect(counts.block).toBe(0); expect(counts.final).toBe(0); - expect(dispatcher.sendBlockReply).toHaveBeenCalledWith( - expect.objectContaining({ text: "CODEX_OK" }), - ); + expect(result?.queuedFinal).toBe(true); + expect(dispatcher.sendBlockReply).not.toHaveBeenCalled(); expect(dispatcher.sendFinalReply).toHaveBeenCalledWith( expect.objectContaining({ text: "CODEX_OK" }), ); @@ -1450,6 +1459,12 @@ describe("tryDispatchAcpReply", () => { it("honors the configured default account for ACP projector chunking when AccountId is omitted", async () => { setReadyAcpResolution(); const cfg = createAcpTestConfig({ + acp: { + enabled: true, + stream: { + deliveryMode: "live", + }, + }, channels: { discord: { defaultAccount: "work", @@ -1489,7 +1504,7 @@ describe("tryDispatchAcpReply", () => { ); }); - it("does not add a second routed payload when routed block text was already visible", async () => { + it("does not add a second routed payload when routed final text was already visible", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "final" }); queueTtsReplies({ text: "Task completed" }, { @@ -1498,21 +1513,21 @@ describe("tryDispatchAcpReply", () => { } as MockTtsReply); const { result } = await runRoutedAcpTextTurn("Task completed"); - expect(result?.counts.block).toBe(1); - expect(result?.counts.final).toBe(0); + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); expectRoutedPayload(1, { text: "Task completed", }); }); - it("skips fallback when TTS mode is all (blocks already processed with TTS)", async () => { + it("skips fallback when TTS mode is all and final delivery already succeeded", async () => { setReadyAcpResolution(); ttsMocks.resolveTtsConfig.mockReturnValue({ mode: "all" }); const { result } = await runRoutedAcpTextTurn("Response"); - expect(result?.counts.block).toBe(1); - expect(result?.counts.final).toBe(0); + expect(result?.counts.block).toBe(0); + expect(result?.counts.final).toBe(1); expect(routeMocks.routeReply).toHaveBeenCalledTimes(1); }); diff --git a/src/auto-reply/reply/dispatch-acp.ts b/src/auto-reply/reply/dispatch-acp.ts index 66642503af9..1b1f22b435a 100644 --- a/src/auto-reply/reply/dispatch-acp.ts +++ b/src/auto-reply/reply/dispatch-acp.ts @@ -519,7 +519,7 @@ export async function tryDispatchAcpReply(params: { cfg: params.cfg, sessionKey: canonicalSessionKey, promptText, - finalText: delivery.getAccumulatedBlockText(), + finalText: delivery.getAccumulatedFinalText() || delivery.getAccumulatedBlockText(), meta: acpResolution.meta, threadId: params.ctx.MessageThreadId, });