diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts
index de7cd69eb4a..3ffbf1812bb 100644
--- a/src/auto-reply/reply/agent-runner-payloads.test.ts
+++ b/src/auto-reply/reply/agent-runner-payloads.test.ts
@@ -490,6 +490,63 @@ describe("buildReplyPayloads media filter integration", () => {
     expect(replyPayloads).toHaveLength(0);
   });
 
+  it("suppresses final text payloads already covered by partial preview streaming", async () => {
+    const { replyPayloads } = await buildReplyPayloads({
+      ...baseParams,
+      previewStreamedText: "First block\n\nSecond block",
+      payloads: [{ text: "First block" }, { text: "Second block" }],
+    });
+
+    expect(replyPayloads).toHaveLength(0);
+  });
+
+  it("keeps final text that was not covered by partial preview streaming", async () => {
+    const { replyPayloads } = await buildReplyPayloads({
+      ...baseParams,
+      previewStreamedText: "Working...",
+      payloads: [{ text: "Done." }],
+    });
+
+    expect(replyPayloads).toHaveLength(1);
+    expectFields(replyPayloads[0], { text: "Done." });
+  });
+
+  it("does not suppress short final text just because it appears inside preview text", async () => {
+    const { replyPayloads } = await buildReplyPayloads({
+      ...baseParams,
+      previewStreamedText: "Working on item 3",
+      payloads: [{ text: "3" }],
+    });
+
+    expect(replyPayloads).toHaveLength(1);
+    expectFields(replyPayloads[0], { text: "3" });
+  });
+
+  it("preserves media while removing duplicate preview-streamed caption text", async () => {
+    const { replyPayloads } = await buildReplyPayloads({
+      ...baseParams,
+      previewStreamedText: "Here is the chart",
+      payloads: [{ text: "Here is the chart", mediaUrl: "file:///tmp/chart.png" }],
+    });
+
+    expect(replyPayloads).toHaveLength(1);
+    expectFields(replyPayloads[0], {
+      text: undefined,
+      mediaUrl: "file:///tmp/chart.png",
+    });
+  });
+
+  it("preserves errors even when their text appears in partial preview streaming", async () => {
+    const { replyPayloads } = await buildReplyPayloads({
+      ...baseParams,
+      previewStreamedText: "Agent couldn't generate a response. Please try again.",
+      payloads: [{ text: "Agent couldn't generate a response. Please try again.", isError: true }],
+    });
+
+    expect(replyPayloads).toHaveLength(1);
+    expectFields(replyPayloads[0], { isError: true });
+  });
+
   it("drops all final payloads when block pipeline streamed successfully", async () => {
     const pipeline: Parameters[0]["blockReplyPipeline"] = {
       didStream: () => true,
diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts
index 0ef83994a7f..1a77b2df6f2 100644
--- a/src/auto-reply/reply/agent-runner-payloads.ts
+++ b/src/auto-reply/reply/agent-runner-payloads.ts
@@ -158,6 +158,8 @@ export async function buildReplyPayloads(params: {
   blockStreamingEnabled: boolean;
   blockReplyPipeline: BlockReplyPipeline | null;
+  /** Text already shown through partial preview streaming; matching final text is dropped. */
+  previewStreamedText?: string;
   /** Payload keys sent directly (not via pipeline) during tool flush. */
   directlySentBlockKeys?: Set;
   replyToMode: ReplyToMode;
   replyToChannel?: OriginatingChannelType;
@@ -323,6 +325,64 @@ export async function buildReplyPayloads(params: {
     : mediaFilteredPayloads;
   const isDirectlySentBlockPayload = (payload: ReplyPayload) =>
     Boolean(params.directlySentBlockKeys?.has(createBlockReplyContentKey(payload)));
+  // Collapse every whitespace run to a single space so streamed preview text and final
+  // payload text compare equal regardless of line breaks or indentation.
+  const normalizePreviewDedupeText = (value: string | undefined): string =>
+    (value ?? "").replace(/\s+/g, " ").trim();
+  // Build the lookup of normalized preview text: the whole preview plus each
+  // blank-line-separated block, because final payloads may arrive one block at a time.
+  const buildPreviewDedupeTextSet = (value: string | undefined): Set<string> => {
+    const dedupeText = new Set<string>();
+    const normalizedWhole = normalizePreviewDedupeText(value);
+    if (normalizedWhole) {
+      dedupeText.add(normalizedWhole);
+    }
+    for (const block of (value ?? "").split(/\n{2,}/u)) {
+      const normalizedBlock = normalizePreviewDedupeText(block);
+      if (normalizedBlock) {
+        dedupeText.add(normalizedBlock);
+      }
+    }
+    return dedupeText;
+  };
+  const previewStreamedText = buildPreviewDedupeTextSet(params.previewStreamedText);
+  // True when the payload's text exactly matches preview-streamed text after normalization.
+  // Matching is whole-entry (never substring), and errors and fallback notices are never
+  // treated as already delivered.
+  const isPreviewStreamedTextPayload = (payload: ReplyPayload): boolean => {
+    if (previewStreamedText.size === 0 || payload.isError || payload.isFallbackNotice) {
+      return false;
+    }
+    const text = normalizePreviewDedupeText(payload.text);
+    return Boolean(text && previewStreamedText.has(text));
+  };
+  // Drop a payload whose text was already previewed; if it also carries media, keep the
+  // payload with the duplicate text removed so the media is still sent.
+  const preserveUnsentMediaAfterPreviewStream = (payload: ReplyPayload): ReplyPayload | null => {
+    if (!isPreviewStreamedTextPayload(payload)) {
+      return payload;
+    }
+    const reply = resolveSendableOutboundReplyParts(payload);
+    if (!reply.hasMedia) {
+      return null;
+    }
+    return copyReplyPayloadMetadata(payload, {
+      ...payload,
+      text: undefined,
+      audioAsVoice: payload.audioAsVoice || undefined,
+    });
+  };
+  // Apply preview dedupe to every payload, keeping only what still needs to be sent.
+  const suppressPreviewStreamedPayloads = (payloads: ReplyPayload[]): ReplyPayload[] => {
+    const unsent: ReplyPayload[] = [];
+    for (const payload of payloads) {
+      const next = preserveUnsentMediaAfterPreviewStream(payload);
+      if (next) {
+        unsent.push(next);
+      }
+    }
+    return unsent;
+  };
   const preserveUnsentMediaAfterBlockStream = (payload: ReplyPayload): ReplyPayload | null => {
     if (payload.isError || payload.isFallbackNotice) {
       return payload;
@@ -383,7 +443,9 @@ export async function buildReplyPayloads(params: {
         }
         return unsent;
       })()
-    : dedupedPayloads;
+    : previewStreamedText.size > 0
+      ? suppressPreviewStreamedPayloads(dedupedPayloads)
+      : dedupedPayloads;
   const blockSentMediaUrls = params.blockStreamingEnabled
     ? await normalizeSentMediaUrlsForDedupe({
         sentMediaUrls: params.blockReplyPipeline?.getSentMediaUrls() ?? [],
diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts
index 32518ca1ba1..ff9b89a4e6f 100644
--- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts
+++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts
@@ -622,6 +622,27 @@ describe("runReplyAgent typing (heartbeat)", () => {
     }
   });
 
+  it("suppresses final text blocks already delivered through partial preview streaming", async () => {
+    const onPartialReply = vi.fn();
+    state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
+      await params.onPartialReply?.({ text: "First block\n\nSecond block" });
+      return {
+        payloads: [{ text: "First block" }, { text: "Second block" }],
+        meta: {},
+      };
+    });
+
+    const { run } = createMinimalRun({
+      opts: { onPartialReply },
+      typingMode: "message",
+    });
+
+    const result = await run();
+
+    expect(onPartialReply).toHaveBeenCalledWith({ text: "First block\n\nSecond block" });
+    expect(result).toBeUndefined();
+  });
+
   it("suppresses narrated silent-turn partials, block replies, and final payloads", async () => {
     const onPartialReply = vi.fn();
     const onBlockReply = vi.fn();
diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts
index 23653db7cb7..27d3b953c2b 100644
--- a/src/auto-reply/reply/agent-runner.ts
+++ b/src/auto-reply/reply/agent-runner.ts
@@ -1277,6 +1277,33 @@ export async function runReplyAgent(params: {
       : null;
 
   const replySessionKey = sessionKey ?? followupRun.run.sessionKey;
+  // Latest non-blank text handed to the caller's partial-reply (preview) callback; passed on
+  // below as `previewStreamedText` so final text that was already previewed is not re-sent.
+  let latestPreviewStreamedText: string | undefined;
+  const effectiveOpts = opts?.onPartialReply
+    ? {
+        ...opts,
+        onPartialReply: async (
+          payload: Parameters<NonNullable<NonNullable<typeof opts>["onPartialReply"]>>[0],
+        ) => {
+          const previousPreviewText = latestPreviewStreamedText;
+          const previewText =
+            typeof payload.text === "string" && payload.text.trim() ? payload.text : undefined;
+          if (previewText !== undefined) {
+            latestPreviewStreamedText = previewText;
+          }
+          try {
+            await opts.onPartialReply?.(payload);
+          } catch (err) {
+            // Delivery rejected: this preview is not known to have reached the user, so it
+            // must not suppress the final reply. Skip the rollback if a newer partial won.
+            if (previewText !== undefined && latestPreviewStreamedText === previewText) {
+              latestPreviewStreamedText = previousPreviewText;
+            }
+            throw err;
+          }
+        },
+      }
+    : opts;
   let replyOperation: ReplyOperation;
   try {
     replyOperation =
@@ -1459,7 +1486,7 @@ export async function runReplyAgent(params: {
       sessionCtx,
       replyThreading: replyThreadingOverride ?? sessionCtx.ReplyThreading,
       replyOperation,
-      opts,
+      opts: effectiveOpts,
       typingSignals,
       blockReplyPipeline,
       blockStreamingEnabled,
@@ -1737,6 +1764,7 @@ export async function runReplyAgent(params: {
       silentExpected: followupRun.run.silentExpected,
       blockStreamingEnabled,
       blockReplyPipeline,
+      previewStreamedText: latestPreviewStreamedText,
       directlySentBlockKeys,
       replyToMode,
       replyToChannel,