Deduplicate preview-streamed final replies (#82625)

Track the latest partial-preview reply text during reply-agent runs and suppress matching final text-only payloads so Telegram partial streaming does not resend already-previewed blocks when block streaming is disabled.

Keep the dedupe exact-match based to avoid dropping unrelated short finals, preserve errors, and keep unsent media while stripping duplicate caption text.
This commit is contained in:
Gio Della-Libera
2026-05-16 21:24:34 -07:00
committed by GitHub
parent 5c02b72413
commit bd51d8f2dd
4 changed files with 146 additions and 2 deletions

View File

@@ -490,6 +490,63 @@ describe("buildReplyPayloads media filter integration", () => {
expect(replyPayloads).toHaveLength(0);
});
it("suppresses final text payloads already covered by partial preview streaming", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
previewStreamedText: "First block\n\nSecond block",
payloads: [{ text: "First block" }, { text: "Second block" }],
});
expect(replyPayloads).toHaveLength(0);
});
it("keeps final text that was not covered by partial preview streaming", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
previewStreamedText: "Working...",
payloads: [{ text: "Done." }],
});
expect(replyPayloads).toHaveLength(1);
expectFields(replyPayloads[0], { text: "Done." });
});
it("does not suppress short final text just because it appears inside preview text", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
previewStreamedText: "Working on item 3",
payloads: [{ text: "3" }],
});
expect(replyPayloads).toHaveLength(1);
expectFields(replyPayloads[0], { text: "3" });
});
it("preserves media while removing duplicate preview-streamed caption text", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
previewStreamedText: "Here is the chart",
payloads: [{ text: "Here is the chart", mediaUrl: "file:///tmp/chart.png" }],
});
expect(replyPayloads).toHaveLength(1);
expectFields(replyPayloads[0], {
text: undefined,
mediaUrl: "file:///tmp/chart.png",
});
});
it("preserves errors even when their text appears in partial preview streaming", async () => {
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
previewStreamedText: "Agent couldn't generate a response. Please try again.",
payloads: [{ text: "Agent couldn't generate a response. Please try again.", isError: true }],
});
expect(replyPayloads).toHaveLength(1);
expectFields(replyPayloads[0], { isError: true });
});
it("drops all final payloads when block pipeline streamed successfully", async () => {
const pipeline: Parameters<typeof buildReplyPayloads>[0]["blockReplyPipeline"] = {
didStream: () => true,

View File

@@ -158,6 +158,7 @@ export async function buildReplyPayloads(params: {
blockStreamingEnabled: boolean;
blockReplyPipeline: BlockReplyPipeline | null;
/** Payload keys sent directly (not via pipeline) during tool flush. */
previewStreamedText?: string;
directlySentBlockKeys?: Set<string>;
replyToMode: ReplyToMode;
replyToChannel?: OriginatingChannelType;
@@ -323,6 +324,54 @@ export async function buildReplyPayloads(params: {
: mediaFilteredPayloads;
const isDirectlySentBlockPayload = (payload: ReplyPayload) =>
Boolean(params.directlySentBlockKeys?.has(createBlockReplyContentKey(payload)));
const normalizePreviewDedupeText = (value: string | undefined): string =>
(value ?? "").replace(/\s+/g, " ").trim();
const buildPreviewDedupeTextSet = (value: string | undefined): Set<string> => {
const dedupeText = new Set<string>();
const normalizedWhole = normalizePreviewDedupeText(value);
if (normalizedWhole) {
dedupeText.add(normalizedWhole);
}
for (const block of (value ?? "").split(/\n{2,}/u)) {
const normalizedBlock = normalizePreviewDedupeText(block);
if (normalizedBlock) {
dedupeText.add(normalizedBlock);
}
}
return dedupeText;
};
const previewStreamedText = buildPreviewDedupeTextSet(params.previewStreamedText);
const isPreviewStreamedTextPayload = (payload: ReplyPayload): boolean => {
if (previewStreamedText.size === 0 || payload.isError) {
return false;
}
const text = normalizePreviewDedupeText(payload.text);
return Boolean(text && previewStreamedText.has(text));
};
const preserveUnsentMediaAfterPreviewStream = (payload: ReplyPayload): ReplyPayload | null => {
if (!isPreviewStreamedTextPayload(payload)) {
return payload;
}
const reply = resolveSendableOutboundReplyParts(payload);
if (!reply.hasMedia) {
return null;
}
return copyReplyPayloadMetadata(payload, {
...payload,
text: undefined,
audioAsVoice: payload.audioAsVoice || undefined,
});
};
const suppressPreviewStreamedPayloads = (payloads: ReplyPayload[]): ReplyPayload[] => {
const unsent: ReplyPayload[] = [];
for (const payload of payloads) {
const next = preserveUnsentMediaAfterPreviewStream(payload);
if (next) {
unsent.push(next);
}
}
return unsent;
};
const preserveUnsentMediaAfterBlockStream = (payload: ReplyPayload): ReplyPayload | null => {
if (payload.isError || payload.isFallbackNotice) {
return payload;
@@ -383,7 +432,9 @@ export async function buildReplyPayloads(params: {
}
return unsent;
})()
: dedupedPayloads;
: previewStreamedText.size > 0
? suppressPreviewStreamedPayloads(dedupedPayloads)
: dedupedPayloads;
const blockSentMediaUrls = params.blockStreamingEnabled
? await normalizeSentMediaUrlsForDedupe({
sentMediaUrls: params.blockReplyPipeline?.getSentMediaUrls() ?? [],

View File

@@ -622,6 +622,27 @@ describe("runReplyAgent typing (heartbeat)", () => {
}
});
it("suppresses final text blocks already delivered through partial preview streaming", async () => {
const onPartialReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "First block\n\nSecond block" });
return {
payloads: [{ text: "First block" }, { text: "Second block" }],
meta: {},
};
});
const { run } = createMinimalRun({
opts: { onPartialReply },
typingMode: "message",
});
const result = await run();
expect(onPartialReply).toHaveBeenCalledWith({ text: "First block\n\nSecond block" });
expect(result).toBeUndefined();
});
it("suppresses narrated silent-turn partials, block replies, and final payloads", async () => {
const onPartialReply = vi.fn();
const onBlockReply = vi.fn();

View File

@@ -1277,6 +1277,20 @@ export async function runReplyAgent(params: {
: null;
const replySessionKey = sessionKey ?? followupRun.run.sessionKey;
let latestPreviewStreamedText: string | undefined;
const effectiveOpts = opts?.onPartialReply
? {
...opts,
onPartialReply: async (
payload: Parameters<NonNullable<GetReplyOptions["onPartialReply"]>>[0],
) => {
if (typeof payload.text === "string" && payload.text.trim()) {
latestPreviewStreamedText = payload.text;
}
await opts.onPartialReply?.(payload);
},
}
: opts;
let replyOperation: ReplyOperation;
try {
replyOperation =
@@ -1459,7 +1473,7 @@ export async function runReplyAgent(params: {
sessionCtx,
replyThreading: replyThreadingOverride ?? sessionCtx.ReplyThreading,
replyOperation,
opts,
opts: effectiveOpts,
typingSignals,
blockReplyPipeline,
blockStreamingEnabled,
@@ -1737,6 +1751,7 @@ export async function runReplyAgent(params: {
silentExpected: followupRun.run.silentExpected,
blockStreamingEnabled,
blockReplyPipeline,
previewStreamedText: latestPreviewStreamedText,
directlySentBlockKeys,
replyToMode,
replyToChannel,