fix(auto-reply): require durable restart recovery

This commit is contained in:
Vincent Koc
2026-06-23 16:45:11 +08:00
committed by GitHub
parent 450630c307
commit edb75a944f
2 changed files with 74 additions and 4 deletions

View File

@@ -0,0 +1,61 @@
import { describe, expect, it } from "vitest";
import { setReplyPayloadMetadata } from "../reply-payload.js";
import { createBlockReplyPipeline } from "./block-reply-pipeline.js";
function blockFor(text: string, assistantMessageIndex: number) {
return setReplyPayloadMetadata({ text }, { assistantMessageIndex });
}
describe("block reply pipeline multi-assistant-message suppression", () => {
it("recognizes each fully-streamed message across a multi-message turn", async () => {
const sent: string[] = [];
const pipeline = createBlockReplyPipeline({
onBlockReply: async (payload) => {
if (payload.text) {
sent.push(payload.text);
}
},
timeoutMs: 5000,
});
pipeline.enqueue(blockFor("Alpha one.", 0));
pipeline.enqueue(blockFor("Alpha two.", 0));
pipeline.enqueue(blockFor("Beta one.", 1));
pipeline.enqueue(blockFor("Beta two.", 1));
await pipeline.flush({ force: true });
expect(sent).toEqual(["Alpha one.", "Alpha two.", "Beta one.", "Beta two."]);
expect(pipeline.hasSentPayload({ text: "Alpha one. Alpha two." })).toBe(true);
expect(pipeline.hasSentPayload({ text: "Beta one. Beta two." })).toBe(true);
});
it("does not treat one message as covering another message's text", async () => {
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {},
timeoutMs: 5000,
});
pipeline.enqueue(blockFor("Alpha one.", 0));
pipeline.enqueue(blockFor("Alpha two.", 0));
pipeline.enqueue(blockFor("Beta one.", 1));
pipeline.enqueue(blockFor("Beta two.", 1));
await pipeline.flush({ force: true });
expect(pipeline.hasSentPayload({ text: "Alpha one. Alpha two. Beta one. Beta two." })).toBe(
false,
);
});
it("suppresses a single message split into multiple blocks", async () => {
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {},
timeoutMs: 5000,
});
pipeline.enqueue(blockFor("Gamma one.", 0));
pipeline.enqueue(blockFor("Gamma two.", 0));
await pipeline.flush({ force: true });
expect(pipeline.hasSentPayload({ text: "Gamma one. Gamma two." })).toBe(true);
});
});

View File

@@ -120,7 +120,7 @@ export function createBlockReplyPipeline(params: {
const bufferedKeys = new Set<string>();
const bufferedPayloadKeys = new Set<string>();
const bufferedPayloads: ReplyPayload[] = [];
const streamedTextFragments: string[] = [];
const streamedTextFragmentsByMessage = new Map<number | undefined, string[]>();
let bufferedAssistantMessageIndex: number | undefined;
let sendChain: Promise<void> = Promise.resolve();
let aborted = false;
@@ -186,7 +186,10 @@ export function createBlockReplyPipeline(params: {
sentMediaUrls.add(mediaUrl);
}
if (!isStatusNotice && reply.trimmedText) {
streamedTextFragments.push(reply.trimmedText);
const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex;
const fragments = streamedTextFragmentsByMessage.get(assistantMessageIndex) ?? [];
fragments.push(reply.trimmedText);
streamedTextFragmentsByMessage.set(assistantMessageIndex, fragments);
}
if (!isStatusNotice) {
didStream = true;
@@ -328,7 +331,7 @@ export function createBlockReplyPipeline(params: {
if (sentContentKeys.has(payloadKey)) {
return true;
}
if (!didStream || streamedTextFragments.length === 0) {
if (!didStream) {
return false;
}
const reply = resolveSendableOutboundReplyParts(payload);
@@ -336,7 +339,13 @@ export function createBlockReplyPipeline(params: {
return false;
}
const normalize = (text: string) => text.replace(/\s+/g, "");
return normalize(streamedTextFragments.join("")) === normalize(reply.trimmedText);
const target = normalize(reply.trimmedText);
for (const fragments of streamedTextFragmentsByMessage.values()) {
if (fragments.length > 0 && normalize(fragments.join("")) === target) {
return true;
}
}
return false;
},
getSentMediaUrls: () => Array.from(sentMediaUrls),
};