From 14b5f73e2a5e57925e53465d6b0bee856697f6dd Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 4 May 2026 22:14:06 +0100 Subject: [PATCH] fix(agents): avoid duplicate generated media attachments --- CHANGELOG.md | 1 + ...session.subscribeembeddedpisession.test.ts | 175 ++++++++++++++++++ src/agents/pi-embedded-subscribe.ts | 10 +- 3 files changed, 184 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e6c48d21ac4..6ffb47b09f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ Docs: https://docs.openclaw.ai - Browser: enforce strict SSRF current-URL checks before existing-session screenshots, matching existing-session snapshot handling. Thanks @vincentkoc. - Active Memory: give timeout partial transcript recovery enough abort-settle headroom so temporary recall summaries are returned before cleanup. Thanks @vincentkoc. - Gateway/chat: clear the active reply-run guard before draining queued same-session follow-up turns, so sequential `chat.send` calls no longer trip `ReplyRunAlreadyActiveError` every other request. Fixes #77485. Thanks @bws14email. +- Agents/media: avoid sending generated image, video, and music attachments twice when streamed reply text arrives before the final `MEDIA:` directive. - Doctor/config: restore legacy group chat config migrations for `routing.allowFrom`, `routing.groupChat.*`, and `channels.telegram.requireMention` so upgrades keep WhatsApp, Telegram, and iMessage group mention gates and history settings instead of leaving configs invalid or silently blocked. Thanks @scoootscooob. - CLI/update: make package-update follow-up processes write completion results and exit explicitly, so Windows packaged upgrades do not hang after the new package finishes post-core plugin work. Thanks @vincentkoc. - Release validation: skip Slack live QA unless Slack credentials are explicitly configured, so release gates can keep proving non-Slack surfaces while Slack is still local and credential-gated. Thanks @vincentkoc. diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts index 8db3e5353e6..0586edd1055 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.subscribeembeddedpisession.test.ts @@ -424,6 +424,83 @@ describe("subscribeEmbeddedPiSession", () => { ); }); + it("does not attach generated image media to an early streamed chunk before explicit MEDIA", async () => { + const onToolResult = vi.fn(); + const onBlockReply = vi.fn(); + const { emit } = createSubscribedHarness({ + runId: "run", + onToolResult, + onBlockReply, + verboseLevel: "full", + blockReplyBreak: "text_end", + blockReplyChunking: { minChars: 5, maxChars: 200, breakPreference: "newline" }, + builtinToolNames: new Set(["image_generate"]), + }); + + emitToolRun({ + emit, + toolName: "image_generate", + toolCallId: "tool-1", + isError: false, + result: { + content: [ + { + type: "text", + text: "Generated 1 image with google/gemini-3.1-flash-image-preview.\nMEDIA:/tmp/generated.png", + }, + ], + details: { + media: { + mediaUrls: ["/tmp/generated.png"], + }, + }, + }, + }); + + await vi.waitFor(() => { + expect(onToolResult).toHaveBeenCalled(); + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta(emit, "Generated 1 image.\n"); + + expect(onBlockReply).toHaveBeenCalledWith( + expect.objectContaining({ + text: "Generated 1 image.", + }), + ); + expect(onBlockReply.mock.calls.some(([payload]) => payload.mediaUrls?.length)).toBe(false); + + emitAssistantTextDelta(emit, "MEDIA:/tmp/generated.png"); + emit({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_end", + content: "Generated 1 image.\nMEDIA:/tmp/generated.png", + }, + }); + emit({ + type: "message_end", + message: { + role: "assistant", + content: [ + { + type: "text", + text: "Generated 1 image.\nMEDIA:/tmp/generated.png", + }, + ], + }, + }); + emit({ type: "agent_end" }); + await flushBlockReplyCallbacks(); + + const mediaPayloads = onBlockReply.mock.calls + .map(([payload]) => payload) + .filter((payload) => payload.mediaUrls?.includes("/tmp/generated.png")); + expect(mediaPayloads).toHaveLength(1); + }); + it("attaches media from internal completion events even when assistant omits MEDIA lines", async () => { const onBlockReply = vi.fn(); const { emit } = createSubscribedHarness({ @@ -469,6 +546,104 @@ describe("subscribeEmbeddedPiSession", () => { ); }); + it.each([ + { + label: "music", + source: "music_generation" as const, + childSessionKey: "music_generate:task-123", + announceType: "music generation task", + taskLabel: "launch anthem", + result: "Generated 1 track.\nMEDIA:/tmp/launch-anthem.mp3", + mediaUrl: "/tmp/launch-anthem.mp3", + firstChunk: "Generated 1 track.\n", + finalText: "Generated 1 track.\nMEDIA:/tmp/launch-anthem.mp3", + }, + { + label: "video", + source: "video_generation" as const, + childSessionKey: "video_generate:task-123", + announceType: "video generation task", + taskLabel: "launch reel", + result: "Generated 1 video.\nMEDIA:/tmp/launch-reel.mp4", + mediaUrl: "/tmp/launch-reel.mp4", + firstChunk: "Generated 1 video.\n", + finalText: "Generated 1 video.\nMEDIA:/tmp/launch-reel.mp4", + }, + ])( + "does not attach $label internal completion media to an early streamed chunk before explicit MEDIA", + async ({ + source, + childSessionKey, + announceType, + taskLabel, + result, + mediaUrl, + firstChunk, + finalText, + }) => { + const onBlockReply = vi.fn(); + const { emit } = createSubscribedHarness({ + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + blockReplyChunking: { minChars: 5, maxChars: 200, breakPreference: "newline" }, + internalEvents: [ + { + type: "task_completion", + source, + childSessionKey, + announceType, + taskLabel, + status: "ok", + statusLabel: "completed successfully", + result, + mediaUrls: [mediaUrl], + replyInstruction: "Reply normally.", + }, + ], + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta(emit, firstChunk); + + expect(onBlockReply).toHaveBeenCalledWith( + expect.objectContaining({ + text: firstChunk.trim(), + }), + ); + expect(onBlockReply.mock.calls.some(([payload]) => payload.mediaUrls?.length)).toBe(false); + + emitAssistantTextDelta(emit, `MEDIA:${mediaUrl}`); + emit({ + type: "message_update", + message: { role: "assistant" }, + assistantMessageEvent: { + type: "text_end", + content: finalText, + }, + }); + emit({ + type: "message_end", + message: { + role: "assistant", + content: [ + { + type: "text", + text: finalText, + }, + ], + }, + }); + emit({ type: "agent_end" }); + await flushBlockReplyCallbacks(); + + const mediaPayloads = onBlockReply.mock.calls + .map(([payload]) => payload) + .filter((payload) => payload.mediaUrls?.includes(mediaUrl)); + expect(mediaPayloads).toHaveLength(1); + }, + ); + it("keeps orphaned tool media available for non-block final payload assembly", () => { const { emit, subscription } = createSubscribedSessionHarness({ runId: "run", diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 72d7384ee95..b824aa2a686 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -239,10 +239,14 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar }; const emitBlockReply = ( payload: BlockReplyPayload, - options?: { assistantMessageIndex?: number }, + options?: { assistantMessageIndex?: number; consumePendingToolMedia?: boolean }, ) => { const withAssistantDirectives = consumePendingAssistantReplyDirectivesIntoReply(state, payload); - emitBlockReplySafely(consumePendingToolMediaIntoReply(state, withAssistantDirectives), options); + const withToolMedia = + options?.consumePendingToolMedia === false + ? withAssistantDirectives + : consumePendingToolMediaIntoReply(state, withAssistantDirectives); + emitBlockReplySafely(withToolMedia, options); }; const resetAssistantMessageState = (nextAssistantTextBaseline: number) => { @@ -761,6 +765,8 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar }, { assistantMessageIndex: options?.assistantMessageIndex ?? state.assistantMessageIndex, + consumePendingToolMedia: + options?.final === true || Boolean(mediaUrls?.length || audioAsVoice), }, ); };