diff --git a/src/auto-reply/reply/block-reply-pipeline.multi-message.test.ts b/src/auto-reply/reply/block-reply-pipeline.multi-message.test.ts new file mode 100644 index 00000000000..3c35699ddec --- /dev/null +++ b/src/auto-reply/reply/block-reply-pipeline.multi-message.test.ts @@ -0,0 +1,61 @@ +import { describe, expect, it } from "vitest"; +import { setReplyPayloadMetadata } from "../reply-payload.js"; +import { createBlockReplyPipeline } from "./block-reply-pipeline.js"; + +function blockFor(text: string, assistantMessageIndex: number) { + return setReplyPayloadMetadata({ text }, { assistantMessageIndex }); +} + +describe("block reply pipeline multi-assistant-message suppression", () => { + it("recognizes each fully-streamed message across a multi-message turn", async () => { + const sent: string[] = []; + const pipeline = createBlockReplyPipeline({ + onBlockReply: async (payload) => { + if (payload.text) { + sent.push(payload.text); + } + }, + timeoutMs: 5000, + }); + + pipeline.enqueue(blockFor("Alpha one.", 0)); + pipeline.enqueue(blockFor("Alpha two.", 0)); + pipeline.enqueue(blockFor("Beta one.", 1)); + pipeline.enqueue(blockFor("Beta two.", 1)); + await pipeline.flush({ force: true }); + + expect(sent).toEqual(["Alpha one.", "Alpha two.", "Beta one.", "Beta two."]); + expect(pipeline.hasSentPayload({ text: "Alpha one. Alpha two." })).toBe(true); + expect(pipeline.hasSentPayload({ text: "Beta one. Beta two." })).toBe(true); + }); + + it("does not treat one message as covering another message's text", async () => { + const pipeline = createBlockReplyPipeline({ + onBlockReply: async () => {}, + timeoutMs: 5000, + }); + + pipeline.enqueue(blockFor("Alpha one.", 0)); + pipeline.enqueue(blockFor("Alpha two.", 0)); + pipeline.enqueue(blockFor("Beta one.", 1)); + pipeline.enqueue(blockFor("Beta two.", 1)); + await pipeline.flush({ force: true }); + + expect(pipeline.hasSentPayload({ text: "Alpha one. Alpha two. Beta one. Beta two." })).toBe( + false, + ); + }); + + it("suppresses a single message split into multiple blocks", async () => { + const pipeline = createBlockReplyPipeline({ + onBlockReply: async () => {}, + timeoutMs: 5000, + }); + + pipeline.enqueue(blockFor("Gamma one.", 0)); + pipeline.enqueue(blockFor("Gamma two.", 0)); + await pipeline.flush({ force: true }); + + expect(pipeline.hasSentPayload({ text: "Gamma one. Gamma two." })).toBe(true); + }); +}); diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts index 08e8ed01833..014f622bda1 100644 --- a/src/auto-reply/reply/block-reply-pipeline.ts +++ b/src/auto-reply/reply/block-reply-pipeline.ts @@ -120,7 +120,7 @@ export function createBlockReplyPipeline(params: { const bufferedKeys = new Set(); const bufferedPayloadKeys = new Set(); const bufferedPayloads: ReplyPayload[] = []; - const streamedTextFragments: string[] = []; + const streamedTextFragmentsByMessage = new Map(); let bufferedAssistantMessageIndex: number | undefined; let sendChain: Promise = Promise.resolve(); let aborted = false; @@ -186,7 +186,10 @@ export function createBlockReplyPipeline(params: { sentMediaUrls.add(mediaUrl); } if (!isStatusNotice && reply.trimmedText) { - streamedTextFragments.push(reply.trimmedText); + const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex; + const fragments = streamedTextFragmentsByMessage.get(assistantMessageIndex) ?? []; + fragments.push(reply.trimmedText); + streamedTextFragmentsByMessage.set(assistantMessageIndex, fragments); } if (!isStatusNotice) { didStream = true; @@ -328,7 +331,7 @@ export function createBlockReplyPipeline(params: { if (sentContentKeys.has(payloadKey)) { return true; } - if (!didStream || streamedTextFragments.length === 0) { + if (!didStream) { return false; } const reply = resolveSendableOutboundReplyParts(payload); @@ -336,7 +339,13 @@ export function createBlockReplyPipeline(params: { return false; } const normalize = (text: string) => text.replace(/\s+/g, ""); - return normalize(streamedTextFragments.join("")) === normalize(reply.trimmedText); + const target = normalize(reply.trimmedText); + for (const fragments of streamedTextFragmentsByMessage.values()) { + if (fragments.length > 0 && normalize(fragments.join("")) === target) { + return true; + } + } + return false; }, getSentMediaUrls: () => Array.from(sentMediaUrls), };