From a7c1376b207e337cd7c3058b3a163fac1235fd7d Mon Sep 17 00:00:00 2001
From: Peter Steinberger
Date: Fri, 24 Apr 2026 05:18:00 +0100
Subject: [PATCH] fix(block-streaming): dedupe aborted final text

---
 CHANGELOG.md                                 |  1 +
 .../reply/block-reply-pipeline.test.ts       | 71 +++++++++++++++++++
 src/auto-reply/reply/block-reply-pipeline.ts | 18 ++++-
 3 files changed, 89 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index eebc60af1dd..9e4767bafda 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
 - Codex harness: route native `request_user_input` prompts back to the originating chat, preserve queued follow-up answers, and honor newer app-server command approval amendment decisions.
 - Codex harness/context-engine: redact context-engine assembly failures before logging, so fallback warnings do not serialize raw error objects. (#70809) Thanks @jalehman.
+- Block streaming: suppress final assembled text after partial block-delivery aborts when the already-sent text chunks exactly cover the final reply, preventing duplicate replies without dropping unrelated short messages. Fixes #70921.
 - Codex harness/Windows: resolve npm-installed `codex.cmd` shims through PATHEXT before starting the native app-server, so `codex/*` models work without a manual `.exe` shim. Fixes #70913.
 - Slack/groups: classify MPIM group DMs as group chat context and suppress verbose tool/plan progress on Slack non-DM surfaces, so internal "Working…" traces no longer leak into rooms. Fixes #70912.
 - Agents/replay: stop OpenAI/Codex transcript replay from synthesizing missing tool results while still preserving synthetic repair on Anthropic, Gemini, and Bedrock transport-owned sessions. (#61556) Thanks @VictorJeon and @vincentkoc.
diff --git a/src/auto-reply/reply/block-reply-pipeline.test.ts b/src/auto-reply/reply/block-reply-pipeline.test.ts
index cd19c92e065..a7bc76b58b1 100644
--- a/src/auto-reply/reply/block-reply-pipeline.test.ts
+++ b/src/auto-reply/reply/block-reply-pipeline.test.ts
@@ -100,3 +100,74 @@ describe("createBlockReplyPipeline dedup with threading", () => {
     expect(sent).toEqual(["Alpha", "Beta"]);
   });
 });
+
+describe("createBlockReplyPipeline content coverage dedup", () => {
+  it("matches final assembled text to successfully streamed text chunks after abort", async () => {
+    let callCount = 0;
+    const pipeline = createBlockReplyPipeline({
+      onBlockReply: async () => {
+        callCount += 1;
+        if (callCount === 3) {
+          await new Promise((resolve) => setTimeout(resolve, 50));
+        }
+      },
+      timeoutMs: 1,
+    });
+
+    pipeline.enqueue({ text: "First paragraph." });
+    pipeline.enqueue({ text: "Second paragraph." });
+    pipeline.enqueue({ text: "Third paragraph." });
+    await pipeline.flush({ force: true });
+
+    expect(pipeline.didStream()).toBe(true);
+    expect(pipeline.isAborted()).toBe(true);
+    expect(pipeline.hasSentPayload({ text: "First paragraph.\n\nSecond paragraph." })).toBe(true);
+  });
+
+  it("does not match final assembled text with content that was not streamed", async () => {
+    let callCount = 0;
+    const pipeline = createBlockReplyPipeline({
+      onBlockReply: async () => {
+        callCount += 1;
+        if (callCount === 2) {
+          await new Promise((resolve) => setTimeout(resolve, 50));
+        }
+      },
+      timeoutMs: 1,
+    });
+
+    pipeline.enqueue({ text: "First paragraph." });
+    pipeline.enqueue({ text: "Second paragraph." });
+    await pipeline.flush({ force: true });
+
+    expect(pipeline.didStream()).toBe(true);
+    expect(pipeline.isAborted()).toBe(true);
+    expect(pipeline.hasSentPayload({ text: "First paragraph.\n\nSecond paragraph." })).toBe(false);
+  });
+
+  it("does not suppress media payloads through streamed text coverage", async () => {
+    const pipeline = createBlockReplyPipeline({
+      onBlockReply: async () => {},
+      timeoutMs: 5000,
+    });
+
+    pipeline.enqueue({ text: "Description" });
+    await pipeline.flush({ force: true });
+
+    expect(pipeline.hasSentPayload({ text: "Description", mediaUrl: "file:///photo.jpg" })).toBe(
+      false,
+    );
+  });
+
+  it("does not suppress unrelated shorter text that appears inside streamed content", async () => {
+    const pipeline = createBlockReplyPipeline({
+      onBlockReply: async () => {},
+      timeoutMs: 5000,
+    });
+
+    pipeline.enqueue({ text: "Here is a summary." });
+    await pipeline.flush({ force: true });
+
+    expect(pipeline.hasSentPayload({ text: "summary" })).toBe(false);
+  });
+});
diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts
index f63e566b0f7..1898d5d8443 100644
--- a/src/auto-reply/reply/block-reply-pipeline.ts
+++ b/src/auto-reply/reply/block-reply-pipeline.ts
@@ -91,6 +91,7 @@ export function createBlockReplyPipeline(params: {
   const bufferedKeys = new Set<string>();
   const bufferedPayloadKeys = new Set<string>();
   const bufferedPayloads: ReplyPayload[] = [];
+  const streamedTextFragments: string[] = [];
   let bufferedAssistantMessageIndex: number | undefined;
   let sendChain: Promise<void> = Promise.resolve();
   let aborted = false;
@@ -147,6 +148,10 @@
         }
         sentKeys.add(payloadKey);
         sentContentKeys.add(contentKey);
+        const reply = resolveSendableOutboundReplyParts(payload);
+        if (!reply.hasMedia && reply.trimmedText) {
+          streamedTextFragments.push(reply.trimmedText);
+        }
         didStream = true;
       })
       .catch((err) => {
@@ -266,7 +271,18 @@
     isAborted: () => aborted,
     hasSentPayload: (payload) => {
       const payloadKey = createBlockReplyContentKey(payload);
-      return sentContentKeys.has(payloadKey);
+      if (sentContentKeys.has(payloadKey)) {
+        return true;
+      }
+      if (!didStream || streamedTextFragments.length === 0) {
+        return false;
+      }
+      const reply = resolveSendableOutboundReplyParts(payload);
+      if (reply.hasMedia || !reply.trimmedText) {
+        return false;
+      }
+      const normalize = (text: string) => text.replace(/\s+/g, "");
+      return normalize(streamedTextFragments.join("")) === normalize(reply.trimmedText);
     },
   };
 }
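Reviewer note (not part of the patch): the dedup added to `hasSentPayload` boils down to a whitespace-insensitive comparison between the concatenation of the successfully streamed text fragments and the final assembled reply. Below is a minimal standalone sketch of that idea in TypeScript; `StreamedCoverage` and `coversFinalReply` are illustrative names for this note, not the pipeline's real API.

// Sketch only: illustrative names, not the pipeline's real internals.
class StreamedCoverage {
  private readonly fragments: string[] = [];

  // Record a text chunk that was successfully delivered before the abort.
  record(fragment: string): void {
    const trimmed = fragment.trim();
    if (trimmed.length > 0) {
      this.fragments.push(trimmed);
    }
  }

  // True only when the streamed chunks exactly cover the final reply once
  // all whitespace is stripped; substrings and supersets do not match, which
  // is what keeps unrelated short messages from being dropped.
  coversFinalReply(finalText: string): boolean {
    if (this.fragments.length === 0) {
      return false;
    }
    const normalize = (text: string): string => text.replace(/\s+/g, "");
    return normalize(this.fragments.join("")) === normalize(finalText);
  }
}

// Two of three paragraphs streamed before the abort:
const coverage = new StreamedCoverage();
coverage.record("First paragraph.");
coverage.record("Second paragraph.");

// Exactly covered, so resending would duplicate the reply -> suppress.
console.log(coverage.coversFinalReply("First paragraph.\n\nSecond paragraph.")); // true
// Contains unstreamed content, so it must still be sent.
console.log(coverage.coversFinalReply("First paragraph.\n\nSecond paragraph.\n\nThird paragraph.")); // false
// A shorter unrelated message is not a cover match either.
console.log(coverage.coversFinalReply("First paragraph.")); // false

Equality after normalization, rather than substring containment, is what the last two test cases in the patch pin down: media payloads bail out before the comparison, and "summary" does not equal the streamed text even though it appears inside it.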