diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ea86670b58..42105b9aebc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ Docs: https://docs.openclaw.ai - Config/recovery: skip whole-file last-known-good rollback when invalidity is scoped to `plugins.entries.*`, preserving unrelated user settings during plugin schema or host-version skew. Fixes #71289. Thanks @jalehman. - Agents/tools: keep resolved reply-run configs from being overwritten by stale runtime snapshots, and let empty web runtime metadata fall back to configured provider auto-detection so standard and queued turns expose the same tool set. Fixes #71355. Thanks @c-g14. - Agents/TTS: pass the resolved shared config into the `tts` tool, so tool-triggered speech uses configured providers and voices instead of falling back to a fresh config load. +- Reply media: strip `MEDIA:` attachments from final replies when the same media already went out through block streaming, preventing duplicate Telegram voice notes and files. Fixes #65468. Thanks @aurora-openclaw. - Agents/TTS: preserve voice media when a tool-generated reply is paired with an exact `NO_REPLY` sentinel, stripping the sentinel text instead of dropping the audio payload. Fixes #66092. - Compaction: honor explicit `agents.defaults.compaction.keepRecentTokens` for manual `/compact`, re-distill safeguard summaries instead of snowballing previous summaries, and enable safeguard summary quality checks by default. Fixes #71357. Thanks @WhiteGiverMa. - Sessions: honor configured `session.maintenance` settings during load-time maintenance instead of falling back to default entry caps. Fixes #71356. Thanks @comolago. diff --git a/src/auto-reply/reply/agent-runner-execution.test.ts b/src/auto-reply/reply/agent-runner-execution.test.ts index e664e205997..8ef3fa8bdf9 100644 --- a/src/auto-reply/reply/agent-runner-execution.test.ts +++ b/src/auto-reply/reply/agent-runner-execution.test.ts @@ -691,6 +691,7 @@ describe("runAgentTurnWithFallback", () => { didStream: vi.fn(() => false), isAborted: vi.fn(() => false), hasSentPayload: vi.fn(() => false), + getSentMediaUrls: vi.fn(() => []), }; state.runWithModelFallbackMock.mockImplementationOnce(async (params: FallbackRunnerParams) => { const result = { payloads: [], meta: {} }; diff --git a/src/auto-reply/reply/agent-runner-payloads.test.ts b/src/auto-reply/reply/agent-runner-payloads.test.ts index 79cd98d6bea..c29fa559ddd 100644 --- a/src/auto-reply/reply/agent-runner-payloads.test.ts +++ b/src/auto-reply/reply/agent-runner-payloads.test.ts @@ -201,6 +201,77 @@ describe("buildReplyPayloads media filter integration", () => { await expectSameTargetRepliesSuppressed({ provider: "lark", to: "ou_abc123" }); }); + it("strips media already sent by the block pipeline after normalizing both paths", async () => { + const normalizeMediaPaths = async (payload: { mediaUrl?: string; mediaUrls?: string[] }) => { + const rewrite = (value?: string) => + value === "file:///tmp/voice.ogg" ? "file:///tmp/outbound/voice.ogg" : value; + return { + ...payload, + mediaUrl: rewrite(payload.mediaUrl), + mediaUrls: payload.mediaUrls?.map((value) => rewrite(value) ?? value), + }; + }; + const pipeline: Parameters[0]["blockReplyPipeline"] = { + didStream: () => false, + isAborted: () => false, + hasSentPayload: () => false, + enqueue: () => {}, + flush: async () => {}, + stop: () => {}, + hasBuffered: () => false, + getSentMediaUrls: () => ["file:///tmp/voice.ogg"], + }; + + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + blockStreamingEnabled: true, + blockReplyPipeline: pipeline, + normalizeMediaPaths, + payloads: [{ text: "caption", mediaUrl: "file:///tmp/voice.ogg" }], + }); + + expect(replyPayloads).toHaveLength(1); + expect(replyPayloads[0]).toMatchObject({ + text: "caption", + mediaUrl: undefined, + mediaUrls: undefined, + }); + }); + + it("suppresses already-sent text plus media before stripping block-sent media", async () => { + const sentKey = JSON.stringify({ + text: "caption", + mediaList: ["file:///tmp/outbound/voice.ogg"], + }); + const pipeline: Parameters[0]["blockReplyPipeline"] = { + didStream: () => false, + isAborted: () => false, + hasSentPayload: (payload) => + JSON.stringify({ + text: (payload.text ?? "").trim(), + mediaList: [ + ...(payload.mediaUrl ? [payload.mediaUrl] : []), + ...(payload.mediaUrls ?? []), + ], + }) === sentKey, + enqueue: () => {}, + flush: async () => {}, + stop: () => {}, + hasBuffered: () => false, + getSentMediaUrls: () => ["file:///tmp/outbound/voice.ogg"], + }; + + const { replyPayloads } = await buildReplyPayloads({ + ...baseParams, + blockStreamingEnabled: true, + blockReplyPipeline: pipeline, + normalizeMediaPaths: async (payload) => payload, + payloads: [{ text: "caption", mediaUrl: "file:///tmp/outbound/voice.ogg" }], + }); + + expect(replyPayloads).toHaveLength(0); + }); + it("drops all final payloads when block pipeline streamed successfully", async () => { const pipeline: Parameters[0]["blockReplyPipeline"] = { didStream: () => true, @@ -210,6 +281,7 @@ describe("buildReplyPayloads media filter integration", () => { flush: async () => {}, stop: () => {}, hasBuffered: () => false, + getSentMediaUrls: () => [], }; // shouldDropFinalPayloads short-circuits to [] when the pipeline streamed // without aborting, so hasSentPayload is never reached. @@ -233,6 +305,7 @@ describe("buildReplyPayloads media filter integration", () => { flush: async () => {}, stop: () => {}, hasBuffered: () => false, + getSentMediaUrls: () => [], }; const { replyPayloads } = await buildReplyPayloads({ diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index e1777261af3..1f6e7d2e000 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -47,11 +47,11 @@ async function normalizeReplyPayloadMedia(params: { } async function normalizeSentMediaUrlsForDedupe(params: { - sentMediaUrls: string[]; + sentMediaUrls: readonly string[]; normalizeMediaPaths?: (payload: ReplyPayload) => Promise; }): Promise { if (params.sentMediaUrls.length === 0 || !params.normalizeMediaPaths) { - return params.sentMediaUrls; + return [...params.sentMediaUrls]; } const normalizedUrls: string[] = []; @@ -222,8 +222,7 @@ export async function buildReplyPayloads(params: { : mediaFilteredPayloads; const isDirectlySentBlockPayload = (payload: ReplyPayload) => Boolean(params.directlySentBlockKeys?.has(createBlockReplyContentKey(payload))); - // Filter out payloads already sent via pipeline or directly during tool flush. - const filteredPayloads = shouldDropFinalPayloads + const contentSuppressedPayloads = shouldDropFinalPayloads ? dedupedPayloads.filter((payload) => payload.isError) : params.blockStreamingEnabled ? dedupedPayloads.filter( @@ -236,6 +235,21 @@ export async function buildReplyPayloads(params: { (payload) => !params.directlySentBlockKeys!.has(createBlockReplyContentKey(payload)), ) : dedupedPayloads; + const blockSentMediaUrls = params.blockStreamingEnabled + ? await normalizeSentMediaUrlsForDedupe({ + sentMediaUrls: params.blockReplyPipeline?.getSentMediaUrls() ?? [], + normalizeMediaPaths: params.normalizeMediaPaths, + }) + : []; + const filteredPayloads = + blockSentMediaUrls.length > 0 + ? ( + dedupeRuntime ?? (await loadReplyPayloadsDedupeRuntime()) + ).filterMessagingToolMediaDuplicates({ + payloads: contentSuppressedPayloads, + sentMediaUrls: blockSentMediaUrls, + }) + : contentSuppressedPayloads; const replyPayloads = suppressMessagingToolReplies ? [] : filteredPayloads; return { diff --git a/src/auto-reply/reply/block-reply-pipeline.test.ts b/src/auto-reply/reply/block-reply-pipeline.test.ts index a7bc76b58b1..72f2d48cfa8 100644 --- a/src/auto-reply/reply/block-reply-pipeline.test.ts +++ b/src/auto-reply/reply/block-reply-pipeline.test.ts @@ -78,6 +78,38 @@ describe("createBlockReplyPipeline dedup with threading", () => { expect(pipeline.hasSentPayload({ text: "response text", replyToId: "other-id" })).toBe(true); }); + it("tracks media URLs delivered via block replies", async () => { + const pipeline = createBlockReplyPipeline({ + onBlockReply: async () => {}, + timeoutMs: 5000, + }); + + expect(pipeline.getSentMediaUrls()).toEqual([]); + + pipeline.enqueue({ text: "caption", mediaUrl: "file:///a.ogg" }); + pipeline.enqueue({ mediaUrls: ["file:///b.ogg", "file:///c.ogg"] }); + await pipeline.flush({ force: true }); + + expect(pipeline.getSentMediaUrls()).toEqual([ + "file:///a.ogg", + "file:///b.ogg", + "file:///c.ogg", + ]); + }); + + it("does not track media when text-only blocks are delivered", async () => { + const pipeline = createBlockReplyPipeline({ + onBlockReply: async () => {}, + timeoutMs: 5000, + }); + + pipeline.enqueue({ text: "hello" }); + pipeline.enqueue({ text: "world" }); + await pipeline.flush({ force: true }); + + expect(pipeline.getSentMediaUrls()).toEqual([]); + }); + it("does not coalesce logical assistant blocks across assistantMessageIndex boundaries", async () => { const sent: string[] = []; const pipeline = createBlockReplyPipeline({ diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts index 1898d5d8443..1531826e8cb 100644 --- a/src/auto-reply/reply/block-reply-pipeline.ts +++ b/src/auto-reply/reply/block-reply-pipeline.ts @@ -13,6 +13,7 @@ export type BlockReplyPipeline = { didStream: () => boolean; isAborted: () => boolean; hasSentPayload: (payload: ReplyPayload) => boolean; + getSentMediaUrls: () => readonly string[]; }; export type BlockReplyBuffer = { @@ -86,6 +87,7 @@ export function createBlockReplyPipeline(params: { const { onBlockReply, timeoutMs, coalescing, buffer } = params; const sentKeys = new Set(); const sentContentKeys = new Set(); + const sentMediaUrls = new Set(); const pendingKeys = new Set(); const seenKeys = new Set(); const bufferedKeys = new Set(); @@ -149,6 +151,9 @@ export function createBlockReplyPipeline(params: { sentKeys.add(payloadKey); sentContentKeys.add(contentKey); const reply = resolveSendableOutboundReplyParts(payload); + for (const mediaUrl of reply.mediaUrls) { + sentMediaUrls.add(mediaUrl); + } if (!reply.hasMedia && reply.trimmedText) { streamedTextFragments.push(reply.trimmedText); } @@ -284,5 +289,6 @@ export function createBlockReplyPipeline(params: { const normalize = (text: string) => text.replace(/\s+/g, ""); return normalize(streamedTextFragments.join("")) === normalize(reply.trimmedText); }, + getSentMediaUrls: () => Array.from(sentMediaUrls), }; }