From 8db4bb758353911f51b920d856dcdf4e31ce7522 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Tue, 14 Apr 2026 23:44:04 -0400 Subject: [PATCH] Reply: preserve phased block metadata --- ...bedded-subscribe.handlers.messages.test.ts | 56 ++++++ ...pi-embedded-subscribe.handlers.messages.ts | 161 +++++++++++++----- .../pi-embedded-subscribe.handlers.types.ts | 1 + src/agents/pi-embedded-subscribe.ts | 26 +-- .../reply/block-reply-pipeline.test.ts | 27 ++- src/auto-reply/reply/block-reply-pipeline.ts | 34 +++- src/auto-reply/reply/reply-delivery.test.ts | 38 +++++ src/auto-reply/reply/reply-delivery.ts | 37 +++- 8 files changed, 313 insertions(+), 67 deletions(-) diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts index 7075b5f1ad7..1b19786a14a 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts @@ -21,6 +21,7 @@ function createMessageUpdateContext( onAgentEvent?: ReturnType; onPartialReply?: ReturnType; flushBlockReplyBuffer?: ReturnType; + resetAssistantMessageState?: ReturnType; debug?: ReturnType; shouldEmitPartialReplies?: boolean; } = {}, @@ -50,6 +51,8 @@ function createMessageUpdateContext( shouldEmitPartialReplies: params.shouldEmitPartialReplies ?? true, blockReplyBreak: "text_end", assistantMessageIndex: 0, + lastAssistantStreamItemId: undefined, + assistantTexts: [], }, log: { debug: params.debug ?? vi.fn() }, noteLastAssistant: vi.fn(), @@ -57,6 +60,7 @@ function createMessageUpdateContext( consumePartialReplyDirectives: vi.fn(() => null), emitReasoningStream: vi.fn(), flushBlockReplyBuffer: params.flushBlockReplyBuffer ?? vi.fn(), + resetAssistantMessageState: params.resetAssistantMessageState ?? vi.fn(), } as unknown as EmbeddedPiSubscribeContext; } @@ -190,6 +194,58 @@ describe("buildAssistantStreamData", () => { }); }); +describe("handleMessageUpdate", () => { + it("treats phased textSignature item changes as assistant-message boundaries", () => { + const flushBlockReplyBuffer = vi.fn(); + const resetAssistantMessageState = vi.fn(); + const onAssistantMessageStart = vi.fn(); + const context = createMessageUpdateContext({ + flushBlockReplyBuffer, + resetAssistantMessageState, + }); + context.params.onAssistantMessageStart = onAssistantMessageStart; + context.state.lastAssistantStreamItemId = "item-1"; + context.state.assistantMessageIndex = 7; + + handleMessageUpdate(context, { + type: "message_update", + message: { role: "assistant", content: [] }, + assistantMessageEvent: { + type: "text_delta", + contentIndex: 1, + delta: "Second block", + partial: { + role: "assistant", + phase: "final_answer", + content: [ + createOpenAiResponsesTextBlock({ + text: "First block", + id: "item-1", + phase: "final_answer", + }), + createOpenAiResponsesTextBlock({ + text: "Second block", + id: "item-2", + phase: "final_answer", + }), + ], + stopReason: "stop", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: {}, + timestamp: 0, + }, + }, + } as never); + + expect(flushBlockReplyBuffer).toHaveBeenCalledWith({ assistantMessageIndex: 7 }); + expect(resetAssistantMessageState).toHaveBeenCalledWith(0); + expect(onAssistantMessageStart).toHaveBeenCalledTimes(1); + expect(context.state.lastAssistantStreamItemId).toBe("item-2"); + }); +}); + describe("consumePendingToolMediaIntoReply", () => { it("attaches queued tool media to the next assistant reply", () => { const state = { diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 8feeebd4b87..15934558cd0 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -6,6 +6,7 @@ import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js"; import { emitAgentEvent } from "../infra/agent-events.js"; import { createInlineCodeState } from "../markdown/code-spans.js"; import { + parseAssistantTextSignature, resolveAssistantMessagePhase, type AssistantPhase, } from "../shared/chat-message-content.js"; @@ -84,6 +85,38 @@ function isTranscriptOnlyOpenClawAssistantMessage(message: AgentMessage | undefi return provider === "openclaw" && (model === "delivery-mirror" || model === "gateway-injected"); } +function resolveAssistantStreamItemId(params: { + contentIndex?: unknown; + message: AgentMessage | undefined; +}): string | undefined { + const content = (params.message as { content?: unknown } | undefined)?.content; + if (!Array.isArray(content)) { + return undefined; + } + const contentIndex = + typeof params.contentIndex === "number" && + Number.isInteger(params.contentIndex) && + params.contentIndex >= 0 + ? params.contentIndex + : undefined; + const candidateBlocks = + contentIndex !== undefined ? [content[contentIndex]] : content.toReversed(); + for (const block of candidateBlocks) { + if (!block || typeof block !== "object") { + continue; + } + const record = block as { type?: unknown; textSignature?: unknown }; + if (record.type !== "text") { + continue; + } + const signature = parseAssistantTextSignature(record.textSignature); + if (signature?.id) { + return signature.id; + } + } + return undefined; +} + function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) { if (!ctx.state.reasoningStreamOpen) { return; @@ -92,6 +125,66 @@ function emitReasoningEnd(ctx: EmbeddedPiSubscribeContext) { void ctx.params.onReasoningEnd?.(); } +function openReasoningStream(ctx: EmbeddedPiSubscribeContext) { + ctx.state.reasoningStreamOpen = true; +} + +function shouldSuppressDeterministicApprovalOutput( + state: Pick< + EmbeddedPiSubscribeState, + "deterministicApprovalPromptPending" | "deterministicApprovalPromptSent" + >, +): boolean { + return state.deterministicApprovalPromptPending || state.deterministicApprovalPromptSent; +} + +function appendBlockReplyChunk(ctx: EmbeddedPiSubscribeContext, chunk: string) { + if (ctx.blockChunker) { + ctx.blockChunker.append(chunk); + return; + } + ctx.state.blockBuffer += chunk; +} + +function replaceBlockReplyBuffer(ctx: EmbeddedPiSubscribeContext, text: string) { + if (ctx.blockChunker) { + ctx.blockChunker.reset(); + ctx.blockChunker.append(text); + return; + } + ctx.state.blockBuffer = text; +} + +function resolveAssistantTextChunk(params: { + evtType: "text_delta" | "text_start" | "text_end"; + delta: string; + content: string; + accumulatedText: string; +}): string { + const { evtType, delta, content, accumulatedText } = params; + if (evtType === "text_delta") { + return delta; + } + if (delta) { + return delta; + } + if (!content) { + return ""; + } + // KNOWN: Some providers resend full content on `text_end`. + // We only append a suffix (or nothing) to keep output monotonic. + if (content.startsWith(accumulatedText)) { + return content.slice(accumulatedText.length); + } + if (accumulatedText.startsWith(content)) { + return ""; + } + if (!accumulatedText.includes(content)) { + return content; + } + return ""; +} + export function resolveSilentReplyFallbackText(params: { text: unknown; messagingToolSentTexts: string[]; @@ -219,8 +312,7 @@ export function handleMessageUpdate( if (suppressVisibleAssistantOutput) { return; } - const suppressDeterministicApprovalOutput = - ctx.state.deterministicApprovalPromptPending || ctx.state.deterministicApprovalPromptSent; + const suppressDeterministicApprovalOutput = shouldSuppressDeterministicApprovalOutput(ctx.state); const assistantEvent = evt.assistantMessageEvent; const assistantPhase = resolveAssistantMessagePhase(msg); @@ -232,7 +324,7 @@ export function handleMessageUpdate( if (evtType === "thinking_start" || evtType === "thinking_delta" || evtType === "thinking_end") { if (evtType === "thinking_start" || evtType === "thinking_delta") { - ctx.state.reasoningStreamOpen = true; + openReasoningStream(ctx); } const thinkingDelta = typeof assistantRecord?.delta === "string" ? assistantRecord.delta : ""; const thinkingContent = @@ -253,7 +345,7 @@ export function handleMessageUpdate( } if (evtType === "thinking_end") { if (!ctx.state.reasoningStreamOpen) { - ctx.state.reasoningStreamOpen = true; + openReasoningStream(ctx); } emitReasoningEnd(ctx); } @@ -277,30 +369,31 @@ export function handleMessageUpdate( content, }); - let chunk = ""; - if (evtType === "text_delta") { - chunk = delta; - } else if (evtType === "text_start" || evtType === "text_end") { - if (delta) { - chunk = delta; - } else if (content) { - // KNOWN: Some providers resend full content on `text_end`. - // We only append a suffix (or nothing) to keep output monotonic. - if (content.startsWith(ctx.state.deltaBuffer)) { - chunk = content.slice(ctx.state.deltaBuffer.length); - } else if (ctx.state.deltaBuffer.startsWith(content)) { - chunk = ""; - } else if (!ctx.state.deltaBuffer.includes(content)) { - chunk = content; - } - } - } + const chunk = resolveAssistantTextChunk({ + evtType, + delta, + content, + accumulatedText: ctx.state.deltaBuffer, + }); const partialAssistant = assistantRecord?.partial && typeof assistantRecord.partial === "object" ? (assistantRecord.partial as AssistantMessage) : msg; const deliveryPhase = resolveAssistantMessagePhase(partialAssistant); + const streamItemId = resolveAssistantStreamItemId({ + contentIndex: assistantRecord?.contentIndex, + message: partialAssistant, + }); + if (deliveryPhase && streamItemId) { + const previousStreamItemId = ctx.state.lastAssistantStreamItemId; + if (previousStreamItemId && previousStreamItemId !== streamItemId) { + void ctx.flushBlockReplyBuffer({ assistantMessageIndex: ctx.state.assistantMessageIndex }); + ctx.resetAssistantMessageState(ctx.state.assistantTexts.length); + void ctx.params.onAssistantMessageStart?.(); + } + ctx.state.lastAssistantStreamItemId = streamItemId; + } if (deliveryPhase === "commentary") { return; } @@ -310,11 +403,7 @@ export function handleMessageUpdate( if (chunk) { ctx.state.deltaBuffer += chunk; if (!shouldUsePhaseAwareBlockReply) { - if (ctx.blockChunker) { - ctx.blockChunker.append(chunk); - } else { - ctx.state.blockBuffer += chunk; - } + appendBlockReplyChunk(ctx, chunk); } } @@ -337,7 +426,7 @@ export function handleMessageUpdate( const wasThinking = ctx.state.partialBlockState.thinking; const visibleDelta = chunk ? ctx.stripBlockTags(chunk, ctx.state.partialBlockState) : ""; if (!wasThinking && ctx.state.partialBlockState.thinking) { - ctx.state.reasoningStreamOpen = true; + openReasoningStream(ctx); } // Detect when thinking block ends ( tag processed) if (wasThinking && !ctx.state.partialBlockState.thinking) { @@ -370,20 +459,11 @@ export function handleMessageUpdate( } const blockReplyChunk = replace ? cleanedText : deltaText; if (blockReplyChunk) { - if (ctx.blockChunker) { - ctx.blockChunker.append(blockReplyChunk); - } else { - ctx.state.blockBuffer += blockReplyChunk; - } + appendBlockReplyChunk(ctx, blockReplyChunk); } if (evtType === "text_end" && !ctx.state.lastBlockReplyText && cleanedText) { - if (ctx.blockChunker) { - ctx.blockChunker.reset(); - ctx.blockChunker.append(cleanedText); - } else { - ctx.state.blockBuffer = cleanedText; - } + replaceBlockReplyBuffer(ctx, cleanedText); } } @@ -455,8 +535,7 @@ export function handleMessageEnd( const assistantMessage = msg; const assistantPhase = resolveAssistantMessagePhase(assistantMessage); const suppressVisibleAssistantOutput = shouldSuppressAssistantVisibleOutput(assistantMessage); - const suppressDeterministicApprovalOutput = - ctx.state.deterministicApprovalPromptPending || ctx.state.deterministicApprovalPromptSent; + const suppressDeterministicApprovalOutput = shouldSuppressDeterministicApprovalOutput(ctx.state); ctx.noteLastAssistant(assistantMessage); ctx.recordAssistantUsage((assistantMessage as { usage?: unknown }).usage); if (suppressVisibleAssistantOutput) { diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index c8aa60c2bc4..60858ee98d3 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -53,6 +53,7 @@ export type EmbeddedPiSubscribeState = { lastBlockReplyText?: string; reasoningStreamOpen: boolean; assistantMessageIndex: number; + lastAssistantStreamItemId?: string; lastAssistantTextMessageIndex: number; lastAssistantTextNormalized?: string; lastAssistantTextTrimmed?: string; diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index c960572d5e1..2de19270ad2 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -97,6 +97,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar lastBlockReplyText: undefined, reasoningStreamOpen: false, assistantMessageIndex: 0, + lastAssistantStreamItemId: undefined, lastAssistantTextMessageIndex: -1, lastAssistantTextNormalized: undefined, lastAssistantTextTrimmed: undefined, @@ -206,6 +207,7 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar state.reasoningStreamOpen = false; state.suppressBlockChunks = false; state.assistantMessageIndex += 1; + state.lastAssistantStreamItemId = undefined; state.lastAssistantTextMessageIndex = -1; state.lastAssistantTextNormalized = undefined; state.lastAssistantTextTrimmed = undefined; @@ -322,26 +324,26 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar ensureCompactionPromise(); }; + const resolveCompactionPromiseIfIdle = () => { + if (state.pendingCompactionRetry !== 0 || state.compactionInFlight) { + return; + } + state.compactionRetryResolve?.(); + state.compactionRetryResolve = undefined; + state.compactionRetryReject = undefined; + state.compactionRetryPromise = null; + }; + const resolveCompactionRetry = () => { if (state.pendingCompactionRetry <= 0) { return; } state.pendingCompactionRetry -= 1; - if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) { - state.compactionRetryResolve?.(); - state.compactionRetryResolve = undefined; - state.compactionRetryReject = undefined; - state.compactionRetryPromise = null; - } + resolveCompactionPromiseIfIdle(); }; const maybeResolveCompactionWait = () => { - if (state.pendingCompactionRetry === 0 && !state.compactionInFlight) { - state.compactionRetryResolve?.(); - state.compactionRetryResolve = undefined; - state.compactionRetryReject = undefined; - state.compactionRetryPromise = null; - } + resolveCompactionPromiseIfIdle(); }; const recordAssistantUsage = (usageLike: unknown) => { const usage = normalizeUsage((usageLike ?? undefined) as UsageLike | undefined); diff --git a/src/auto-reply/reply/block-reply-pipeline.test.ts b/src/auto-reply/reply/block-reply-pipeline.test.ts index 92564033df5..cd19c92e065 100644 --- a/src/auto-reply/reply/block-reply-pipeline.test.ts +++ b/src/auto-reply/reply/block-reply-pipeline.test.ts @@ -1,4 +1,5 @@ import { describe, expect, it } from "vitest"; +import { setReplyPayloadMetadata } from "../reply-payload.js"; import { createBlockReplyContentKey, createBlockReplyPayloadKey, @@ -55,7 +56,7 @@ describe("createBlockReplyPipeline dedup with threading", () => { pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" }); pipeline.enqueue({ text: "response text", replyToId: undefined }); - await pipeline.flush(); + await pipeline.flush({ force: true }); expect(sent).toEqual([ { text: "response text", replyToId: "thread-root-1" }, @@ -70,10 +71,32 @@ describe("createBlockReplyPipeline dedup with threading", () => { }); pipeline.enqueue({ text: "response text", replyToId: "thread-root-1" }); - await pipeline.flush(); + await pipeline.flush({ force: true }); // Final payload with no replyToId should be recognized as already sent expect(pipeline.hasSentPayload({ text: "response text" })).toBe(true); expect(pipeline.hasSentPayload({ text: "response text", replyToId: "other-id" })).toBe(true); }); + + it("does not coalesce logical assistant blocks across assistantMessageIndex boundaries", async () => { + const sent: string[] = []; + const pipeline = createBlockReplyPipeline({ + onBlockReply: async (payload) => { + sent.push(payload.text ?? ""); + }, + timeoutMs: 5000, + coalescing: { + minChars: 100, + maxChars: 200, + idleMs: 1000, + joiner: "\n\n", + }, + }); + + pipeline.enqueue(setReplyPayloadMetadata({ text: "Alpha" }, { assistantMessageIndex: 0 })); + pipeline.enqueue(setReplyPayloadMetadata({ text: "Beta" }, { assistantMessageIndex: 1 })); + await pipeline.flush({ force: true }); + + expect(sent).toEqual(["Alpha", "Beta"]); + }); }); diff --git a/src/auto-reply/reply/block-reply-pipeline.ts b/src/auto-reply/reply/block-reply-pipeline.ts index 502403e53d2..f63e566b0f7 100644 --- a/src/auto-reply/reply/block-reply-pipeline.ts +++ b/src/auto-reply/reply/block-reply-pipeline.ts @@ -1,5 +1,6 @@ import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import { logVerbose } from "../../globals.js"; +import { getReplyPayloadMetadata } from "../reply-payload.js"; import type { ReplyPayload } from "../types.js"; import { createBlockReplyCoalescer } from "./block-reply-coalescer.js"; import type { BlockStreamingCoalescing } from "./block-streaming.js"; @@ -90,11 +91,20 @@ export function createBlockReplyPipeline(params: { const bufferedKeys = new Set(); const bufferedPayloadKeys = new Set(); const bufferedPayloads: ReplyPayload[] = []; + let bufferedAssistantMessageIndex: number | undefined; let sendChain: Promise = Promise.resolve(); let aborted = false; let didStream = false; let didLogTimeout = false; + const hasSeenOrQueuedPayloadKey = (payloadKey: string) => + seenKeys.has(payloadKey) || sentKeys.has(payloadKey) || pendingKeys.has(payloadKey); + + const flushBufferedAssistantBlock = () => { + bufferedAssistantMessageIndex = undefined; + void coalescer?.flush({ force: true }); + }; + const sendPayload = (payload: ReplyPayload, bypassSeenCheck: boolean = false) => { if (aborted) { return; @@ -163,6 +173,7 @@ export function createBlockReplyPipeline(params: { config: coalescing, shouldAbort: () => aborted, onFlush: (payload) => { + bufferedAssistantMessageIndex = undefined; bufferedKeys.clear(); sendPayload(payload, /* bypassSeenCheck */ true); }, @@ -175,12 +186,7 @@ export function createBlockReplyPipeline(params: { return false; } const payloadKey = createBlockReplyPayloadKey(payload); - if ( - seenKeys.has(payloadKey) || - sentKeys.has(payloadKey) || - pendingKeys.has(payloadKey) || - bufferedPayloadKeys.has(payloadKey) - ) { + if (hasSeenOrQueuedPayloadKey(payloadKey) || bufferedPayloadKeys.has(payloadKey)) { return true; } seenKeys.add(payloadKey); @@ -215,12 +221,25 @@ export function createBlockReplyPipeline(params: { return; } if (coalescer) { + const assistantMessageIndex = getReplyPayloadMetadata(payload)?.assistantMessageIndex; + if ( + assistantMessageIndex !== undefined && + bufferedAssistantMessageIndex !== undefined && + assistantMessageIndex !== bufferedAssistantMessageIndex && + coalescer.hasBuffered() + ) { + // Logical assistant blocks must not be merged together by the generic + // coalescer. Force-flush the previous buffered block before starting a + // new assistant-message block. + flushBufferedAssistantBlock(); + } const payloadKey = createBlockReplyPayloadKey(payload); - if (seenKeys.has(payloadKey) || pendingKeys.has(payloadKey) || bufferedKeys.has(payloadKey)) { + if (hasSeenOrQueuedPayloadKey(payloadKey) || bufferedKeys.has(payloadKey)) { return; } seenKeys.add(payloadKey); bufferedKeys.add(payloadKey); + bufferedAssistantMessageIndex = assistantMessageIndex; coalescer.enqueue(payload); return; } @@ -229,6 +248,7 @@ export function createBlockReplyPipeline(params: { const flush = async (options?: { force?: boolean }) => { await coalescer?.flush(options); + bufferedAssistantMessageIndex = undefined; flushBuffered(); await sendChain; }; diff --git a/src/auto-reply/reply/reply-delivery.test.ts b/src/auto-reply/reply/reply-delivery.test.ts index 5edd8a947c7..d8148268fed 100644 --- a/src/auto-reply/reply/reply-delivery.test.ts +++ b/src/auto-reply/reply/reply-delivery.test.ts @@ -1,5 +1,6 @@ import path from "node:path"; import { describe, expect, it, vi } from "vitest"; +import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js"; import { createBlockReplyContentKey } from "./block-reply-pipeline.js"; import { createBlockReplyDeliveryHandler, @@ -159,4 +160,41 @@ describe("createBlockReplyDeliveryHandler", () => { audioAsVoice: false, }); }); + + it("preserves reply payload metadata across block-reply normalization", async () => { + const enqueue = vi.fn(); + const blockReplyPipeline = { + enqueue, + } as unknown as BlockReplyPipelineLike; + + const handler = createBlockReplyDeliveryHandler({ + onBlockReply: vi.fn(async () => {}), + normalizeStreamingText: (payload) => ({ text: payload.text, skip: false }), + applyReplyToMode: (payload) => ({ ...payload, replyToTag: true }), + typingSignals: { + signalTextDelta: vi.fn(async () => {}), + } as unknown as TypingSignaler, + blockStreamingEnabled: true, + blockReplyPipeline, + directlySentBlockKeys: new Set(), + }); + + const payload = setReplyPayloadMetadata({ text: "Alpha" }, { assistantMessageIndex: 7 }); + + await handler(payload); + + const enqueuedPayload = enqueue.mock.calls[0]?.[0]; + expect(enqueuedPayload).toEqual({ + text: "Alpha", + mediaUrl: undefined, + replyToId: undefined, + replyToCurrent: undefined, + replyToTag: true, + audioAsVoice: false, + mediaUrls: undefined, + }); + expect(getReplyPayloadMetadata(enqueuedPayload)).toEqual({ + assistantMessageIndex: 7, + }); + }); }); diff --git a/src/auto-reply/reply/reply-delivery.ts b/src/auto-reply/reply/reply-delivery.ts index f12ba3b5bef..f08e15ae1fd 100644 --- a/src/auto-reply/reply/reply-delivery.ts +++ b/src/auto-reply/reply/reply-delivery.ts @@ -1,5 +1,6 @@ import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import { logVerbose } from "../../globals.js"; +import { getReplyPayloadMetadata, setReplyPayloadMetadata } from "../reply-payload.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import type { BlockReplyContext, ReplyPayload } from "../types.js"; import type { BlockReplyPipeline } from "./block-reply-pipeline.js"; @@ -58,6 +59,21 @@ export function normalizeReplyPayloadDirectives(params: { }; } +function carryReplyPayloadMetadata(source: ReplyPayload, target: ReplyPayload): ReplyPayload { + const metadata = getReplyPayloadMetadata(source); + return metadata ? setReplyPayloadMetadata(target, metadata) : target; +} + +async function sendDirectBlockReply(params: { + onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise | void; + directlySentBlockKeys: Set; + trackingPayload: ReplyPayload; + payload: ReplyPayload; +}) { + params.directlySentBlockKeys.add(createBlockReplyContentKey(params.trackingPayload)); + await params.onBlockReply(params.payload); +} + export function createBlockReplyDeliveryHandler(params: { onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise | void; currentMessageId?: string; @@ -103,7 +119,10 @@ export function createBlockReplyDeliveryHandler(params: { const mediaNormalizedPayload = params.normalizeMediaPaths ? await params.normalizeMediaPaths(normalized.payload) : normalized.payload; - const blockPayload = params.applyReplyToMode(mediaNormalizedPayload); + const blockPayload = carryReplyPayloadMetadata( + payload, + params.applyReplyToMode(mediaNormalizedPayload), + ); const blockHasMedia = resolveSendableOutboundReplyParts(blockPayload).hasMedia; // Skip empty payloads unless they have audioAsVoice flag (need to track it). @@ -126,14 +145,22 @@ export function createBlockReplyDeliveryHandler(params: { } else if (params.blockStreamingEnabled) { // Send directly when flushing before tool execution (no pipeline but streaming enabled). // Track sent key to avoid duplicate in final payloads. - params.directlySentBlockKeys.add(createBlockReplyContentKey(blockPayload)); - await params.onBlockReply(blockPayload); + await sendDirectBlockReply({ + onBlockReply: params.onBlockReply, + directlySentBlockKeys: params.directlySentBlockKeys, + trackingPayload: blockPayload, + payload: blockPayload, + }); } else if (blockHasMedia) { // When block streaming is disabled, text-only block replies are accumulated into the // final response. Media cannot be reconstructed later, so send it immediately and let // the assistant's final text arrive through the normal final-reply path. - params.directlySentBlockKeys.add(createBlockReplyContentKey(blockPayload)); - await params.onBlockReply({ ...blockPayload, text: undefined }); + await sendDirectBlockReply({ + onBlockReply: params.onBlockReply, + directlySentBlockKeys: params.directlySentBlockKeys, + trackingPayload: blockPayload, + payload: { ...blockPayload, text: undefined }, + }); } // When streaming is disabled entirely, text-only blocks are accumulated in final text. };