fix(reply): dedupe block-streamed media

This commit is contained in:
Peter Steinberger
2026-04-25 05:35:14 +01:00
parent 98a99765af
commit 04c5bbf33d
6 changed files with 131 additions and 4 deletions

View File

@@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai
- Config/recovery: skip whole-file last-known-good rollback when invalidity is scoped to `plugins.entries.*`, preserving unrelated user settings during plugin schema or host-version skew. Fixes #71289. Thanks @jalehman.
- Agents/tools: keep resolved reply-run configs from being overwritten by stale runtime snapshots, and let empty web runtime metadata fall back to configured provider auto-detection so standard and queued turns expose the same tool set. Fixes #71355. Thanks @c-g14.
- Agents/TTS: pass the resolved shared config into the `tts` tool, so tool-triggered speech uses configured providers and voices instead of falling back to a fresh config load.
- Reply media: strip `MEDIA:` attachments from final replies when the same media already went out through block streaming, preventing duplicate Telegram voice notes and files. Fixes #65468. Thanks @aurora-openclaw.
- Agents/TTS: preserve voice media when a tool-generated reply is paired with an exact `NO_REPLY` sentinel, stripping the sentinel text instead of dropping the audio payload. Fixes #66092.
- Compaction: honor explicit `agents.defaults.compaction.keepRecentTokens` for manual `/compact`, re-distill safeguard summaries instead of snowballing previous summaries, and enable safeguard summary quality checks by default. Fixes #71357. Thanks @WhiteGiverMa.
- Sessions: honor configured `session.maintenance` settings during load-time maintenance instead of falling back to default entry caps. Fixes #71356. Thanks @comolago.

View File

@@ -691,6 +691,7 @@ describe("runAgentTurnWithFallback", () => {
didStream: vi.fn(() => false),
isAborted: vi.fn(() => false),
hasSentPayload: vi.fn(() => false),
getSentMediaUrls: vi.fn(() => []),
};
state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => {
const result = { payloads: [], meta: {} };

View File

@@ -201,6 +201,77 @@ describe("buildReplyPayloads media filter integration", () => {
await expectSameTargetRepliesSuppressed({ provider: "lark", to: "ou_abc123" });
});
it("strips media already sent by the block pipeline after normalizing both paths", async () => {
const normalizeMediaPaths = async (payload: { mediaUrl?: string; mediaUrls?: string[] }) => {
const rewrite = (value?: string) =>
value === "file:///tmp/voice.ogg" ? "file:///tmp/outbound/voice.ogg" : value;
return {
...payload,
mediaUrl: rewrite(payload.mediaUrl),
mediaUrls: payload.mediaUrls?.map((value) => rewrite(value) ?? value),
};
};
const pipeline: Parameters<typeof buildReplyPayloads>[0]["blockReplyPipeline"] = {
didStream: () => false,
isAborted: () => false,
hasSentPayload: () => false,
enqueue: () => {},
flush: async () => {},
stop: () => {},
hasBuffered: () => false,
getSentMediaUrls: () => ["file:///tmp/voice.ogg"],
};
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
blockStreamingEnabled: true,
blockReplyPipeline: pipeline,
normalizeMediaPaths,
payloads: [{ text: "caption", mediaUrl: "file:///tmp/voice.ogg" }],
});
expect(replyPayloads).toHaveLength(1);
expect(replyPayloads[0]).toMatchObject({
text: "caption",
mediaUrl: undefined,
mediaUrls: undefined,
});
});
it("suppresses already-sent text plus media before stripping block-sent media", async () => {
const sentKey = JSON.stringify({
text: "caption",
mediaList: ["file:///tmp/outbound/voice.ogg"],
});
const pipeline: Parameters<typeof buildReplyPayloads>[0]["blockReplyPipeline"] = {
didStream: () => false,
isAborted: () => false,
hasSentPayload: (payload) =>
JSON.stringify({
text: (payload.text ?? "").trim(),
mediaList: [
...(payload.mediaUrl ? [payload.mediaUrl] : []),
...(payload.mediaUrls ?? []),
],
}) === sentKey,
enqueue: () => {},
flush: async () => {},
stop: () => {},
hasBuffered: () => false,
getSentMediaUrls: () => ["file:///tmp/outbound/voice.ogg"],
};
const { replyPayloads } = await buildReplyPayloads({
...baseParams,
blockStreamingEnabled: true,
blockReplyPipeline: pipeline,
normalizeMediaPaths: async (payload) => payload,
payloads: [{ text: "caption", mediaUrl: "file:///tmp/outbound/voice.ogg" }],
});
expect(replyPayloads).toHaveLength(0);
});
it("drops all final payloads when block pipeline streamed successfully", async () => {
const pipeline: Parameters<typeof buildReplyPayloads>[0]["blockReplyPipeline"] = {
didStream: () => true,
@@ -210,6 +281,7 @@ describe("buildReplyPayloads media filter integration", () => {
flush: async () => {},
stop: () => {},
hasBuffered: () => false,
getSentMediaUrls: () => [],
};
// shouldDropFinalPayloads short-circuits to [] when the pipeline streamed
// without aborting, so hasSentPayload is never reached.
@@ -233,6 +305,7 @@ describe("buildReplyPayloads media filter integration", () => {
flush: async () => {},
stop: () => {},
hasBuffered: () => false,
getSentMediaUrls: () => [],
};
const { replyPayloads } = await buildReplyPayloads({

View File

@@ -47,11 +47,11 @@ async function normalizeReplyPayloadMedia(params: {
}
async function normalizeSentMediaUrlsForDedupe(params: {
sentMediaUrls: string[];
sentMediaUrls: readonly string[];
normalizeMediaPaths?: (payload: ReplyPayload) => Promise<ReplyPayload>;
}): Promise<string[]> {
if (params.sentMediaUrls.length === 0 || !params.normalizeMediaPaths) {
return params.sentMediaUrls;
return [...params.sentMediaUrls];
}
const normalizedUrls: string[] = [];
@@ -222,8 +222,7 @@ export async function buildReplyPayloads(params: {
: mediaFilteredPayloads;
const isDirectlySentBlockPayload = (payload: ReplyPayload) =>
Boolean(params.directlySentBlockKeys?.has(createBlockReplyContentKey(payload)));
// Filter out payloads already sent via pipeline or directly during tool flush.
const filteredPayloads = shouldDropFinalPayloads
const contentSuppressedPayloads = shouldDropFinalPayloads
? dedupedPayloads.filter((payload) => payload.isError)
: params.blockStreamingEnabled
? dedupedPayloads.filter(
@@ -236,6 +235,21 @@ export async function buildReplyPayloads(params: {
(payload) => !params.directlySentBlockKeys!.has(createBlockReplyContentKey(payload)),
)
: dedupedPayloads;
const blockSentMediaUrls = params.blockStreamingEnabled
? await normalizeSentMediaUrlsForDedupe({
sentMediaUrls: params.blockReplyPipeline?.getSentMediaUrls() ?? [],
normalizeMediaPaths: params.normalizeMediaPaths,
})
: [];
const filteredPayloads =
blockSentMediaUrls.length > 0
? (
dedupeRuntime ?? (await loadReplyPayloadsDedupeRuntime())
).filterMessagingToolMediaDuplicates({
payloads: contentSuppressedPayloads,
sentMediaUrls: blockSentMediaUrls,
})
: contentSuppressedPayloads;
const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads;
return {

View File

@@ -78,6 +78,38 @@ describe("createBlockReplyPipeline dedup with threading", () => {
expect(pipeline.hasSentPayload({ text: "response text", replyToId: "other-id" })).toBe(true);
});
it("tracks media URLs delivered via block replies", async () => {
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {},
timeoutMs: 5000,
});
expect(pipeline.getSentMediaUrls()).toEqual([]);
pipeline.enqueue({ text: "caption", mediaUrl: "file:///a.ogg" });
pipeline.enqueue({ mediaUrls: ["file:///b.ogg", "file:///c.ogg"] });
await pipeline.flush({ force: true });
expect(pipeline.getSentMediaUrls()).toEqual([
"file:///a.ogg",
"file:///b.ogg",
"file:///c.ogg",
]);
});
it("does not track media when text-only blocks are delivered", async () => {
const pipeline = createBlockReplyPipeline({
onBlockReply: async () => {},
timeoutMs: 5000,
});
pipeline.enqueue({ text: "hello" });
pipeline.enqueue({ text: "world" });
await pipeline.flush({ force: true });
expect(pipeline.getSentMediaUrls()).toEqual([]);
});
it("does not coalesce logical assistant blocks across assistantMessageIndex boundaries", async () => {
const sent: string[] = [];
const pipeline = createBlockReplyPipeline({

View File

@@ -13,6 +13,7 @@ export type BlockReplyPipeline = {
didStream: () => boolean;
isAborted: () => boolean;
hasSentPayload: (payload: ReplyPayload) => boolean;
getSentMediaUrls: () => readonly string[];
};
export type BlockReplyBuffer = {
@@ -86,6 +87,7 @@ export function createBlockReplyPipeline(params: {
const { onBlockReply, timeoutMs, coalescing, buffer } = params;
const sentKeys = new Set<string>();
const sentContentKeys = new Set<string>();
const sentMediaUrls = new Set<string>();
const pendingKeys = new Set<string>();
const seenKeys = new Set<string>();
const bufferedKeys = new Set<string>();
@@ -149,6 +151,9 @@ export function createBlockReplyPipeline(params: {
sentKeys.add(payloadKey);
sentContentKeys.add(contentKey);
const reply = resolveSendableOutboundReplyParts(payload);
for (const mediaUrl of reply.mediaUrls) {
sentMediaUrls.add(mediaUrl);
}
if (!reply.hasMedia && reply.trimmedText) {
streamedTextFragments.push(reply.trimmedText);
}
@@ -284,5 +289,6 @@ export function createBlockReplyPipeline(params: {
const normalize = (text: string) => text.replace(/\s+/g, "");
return normalize(streamedTextFragments.join("")) === normalize(reply.trimmedText);
},
getSentMediaUrls: () => Array.from(sentMediaUrls),
};
}