fix(block-streaming): dedupe aborted final text

This commit is contained in:
Peter Steinberger
2026-04-24 05:18:00 +01:00
parent deb300d905
commit a7c1376b20
3 changed files with 89 additions and 1 deletions

View File

@@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai
- Codex harness: route native `request_user_input` prompts back to the originating chat, preserve queued follow-up answers, and honor newer app-server command approval amendment decisions.
- Codex harness/context-engine: redact context-engine assembly failures before logging, so fallback warnings do not serialize raw error objects. (#70809) Thanks @jalehman.
- Block streaming: suppress final assembled text after partial block-delivery aborts when the already-sent text chunks exactly cover the final reply, preventing duplicate replies without dropping unrelated short messages. Fixes #70921.
- Codex harness/Windows: resolve npm-installed `codex.cmd` shims through PATHEXT before starting the native app-server, so `codex/*` models work without a manual `.exe` shim. Fixes #70913.
- Slack/groups: classify MPIM group DMs as group chat context and suppress verbose tool/plan progress on Slack non-DM surfaces, so internal "Working…" traces no longer leak into rooms. Fixes #70912.
- Agents/replay: stop OpenAI/Codex transcript replay from synthesizing missing tool results while still preserving synthetic repair on Anthropic, Gemini, and Bedrock transport-owned sessions. (#61556) Thanks @VictorJeon and @vincentkoc.

View File

@@ -100,3 +100,74 @@ describe("createBlockReplyPipeline dedup with threading", () => {
expect(sent).toEqual(["Alpha", "Beta"]);
});
});
describe("createBlockReplyPipeline content coverage dedup", () => {
it("matches final assembled text to successfully streamed text chunks after abort", async () => {
let callCount = 0;
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {
callCount += 1;
if (callCount === 3) {
await new Promise((resolve) => setTimeout(resolve, 50));
}
},
timeoutMs: 1,
});
pipeline.enqueue({ text: "First paragraph." });
pipeline.enqueue({ text: "Second paragraph." });
pipeline.enqueue({ text: "Third paragraph." });
await pipeline.flush({ force: true });
expect(pipeline.didStream()).toBe(true);
expect(pipeline.isAborted()).toBe(true);
expect(pipeline.hasSentPayload({ text: "First paragraph.\n\nSecond paragraph." })).toBe(true);
});
it("does not match final assembled text with content that was not streamed", async () => {
let callCount = 0;
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {
callCount += 1;
if (callCount === 2) {
await new Promise((resolve) => setTimeout(resolve, 50));
}
},
timeoutMs: 1,
});
pipeline.enqueue({ text: "First paragraph." });
pipeline.enqueue({ text: "Second paragraph." });
await pipeline.flush({ force: true });
expect(pipeline.didStream()).toBe(true);
expect(pipeline.isAborted()).toBe(true);
expect(pipeline.hasSentPayload({ text: "First paragraph.\n\nSecond paragraph." })).toBe(false);
});
it("does not suppress media payloads through streamed text coverage", async () => {
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {},
timeoutMs: 5000,
});
pipeline.enqueue({ text: "Description" });
await pipeline.flush({ force: true });
expect(pipeline.hasSentPayload({ text: "Description", mediaUrl: "file:///photo.jpg" })).toBe(
false,
);
});
it("does not suppress unrelated shorter text that appears inside streamed content", async () => {
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {},
timeoutMs: 5000,
});
pipeline.enqueue({ text: "Here is a summary." });
await pipeline.flush({ force: true });
expect(pipeline.hasSentPayload({ text: "summary" })).toBe(false);
});
});

View File

@@ -91,6 +91,7 @@ export function createBlockReplyPipeline(params: {
const bufferedKeys = new Set<string>();
const bufferedPayloadKeys = new Set<string>();
const bufferedPayloads: ReplyPayload[] = [];
const streamedTextFragments: string[] = [];
let bufferedAssistantMessageIndex: number | undefined;
let sendChain: Promise<void> = Promise.resolve();
let aborted = false;
@@ -147,6 +148,10 @@ export function createBlockReplyPipeline(params: {
}
sentKeys.add(payloadKey);
sentContentKeys.add(contentKey);
const reply = resolveSendableOutboundReplyParts(payload);
if (!reply.hasMedia && reply.trimmedText) {
streamedTextFragments.push(reply.trimmedText);
}
didStream = true;
})
.catch((err) => {
@@ -266,7 +271,18 @@ export function createBlockReplyPipeline(params: {
isAborted: () => aborted,
hasSentPayload: (payload) => {
const payloadKey = createBlockReplyContentKey(payload);
return sentContentKeys.has(payloadKey);
if (sentContentKeys.has(payloadKey)) {
return true;
}
if (!didStream || streamedTextFragments.length === 0) {
return false;
}
const reply = resolveSendableOutboundReplyParts(payload);
if (reply.hasMedia || !reply.trimmedText) {
return false;
}
const normalize = (text: string) => text.replace(/\s+/g, "");
return normalize(streamedTextFragments.join("")) === normalize(reply.trimmedText);
},
};
}