diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts index 62e36cf6d44..8e5370090bc 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.test.ts @@ -111,6 +111,11 @@ function createMessageEndContext( final: false, inlineCode: createInlineCodeState(), }, + partialBlockState: { + thinking: false, + final: false, + inlineCode: createInlineCodeState(), + }, lastStreamedAssistant: undefined, lastStreamedAssistantCleaned: undefined, lastReasoningSent: undefined, diff --git a/src/agents/pi-embedded-subscribe.handlers.messages.ts b/src/agents/pi-embedded-subscribe.handlers.messages.ts index 994e3b1b258..a1c56e085ef 100644 --- a/src/agents/pi-embedded-subscribe.handlers.messages.ts +++ b/src/agents/pi-embedded-subscribe.handlers.messages.ts @@ -644,7 +644,7 @@ export function handleMessageUpdate( ) { const assistantMessageIndex = ctx.state.assistantMessageIndex; void Promise.resolve() - .then(() => ctx.flushBlockReplyBuffer({ assistantMessageIndex })) + .then(() => ctx.flushBlockReplyBuffer({ assistantMessageIndex, final: true })) .catch((err) => { ctx.log.debug(`text_end block reply flush failed: ${String(err)}`); }); @@ -829,8 +829,15 @@ export function handleMessageEnd( text !== ctx.state.lastBlockReplyText) ) { if (hasBufferedBlockReply && ctx.blockChunker?.hasBuffered()) { - ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk }); - ctx.blockChunker.reset(); + const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer({ + assistantMessageIndex: ctx.state.assistantMessageIndex, + final: true, + }); + if (isPromiseLike(flushBlockReplyBufferResult)) { + void flushBlockReplyBufferResult.catch((err) => { + ctx.log.debug(`message_end block reply flush failed: ${String(err)}`); + }); + } // Final-flush the streaming directive accumulator so any partial // directive tail held back by splitTrailingDirective (for example a // trailing `MEDIA:` that arrived without a closing newline) diff --git a/src/agents/pi-embedded-subscribe.handlers.types.ts b/src/agents/pi-embedded-subscribe.handlers.types.ts index f2fd3674a8a..2b93baa8ab2 100644 --- a/src/agents/pi-embedded-subscribe.handlers.types.ts +++ b/src/agents/pi-embedded-subscribe.handlers.types.ts @@ -130,8 +130,14 @@ export type EmbeddedPiSubscribeContext = { }, options?: { final?: boolean }, ) => string; - emitBlockChunk: (text: string, options?: { assistantMessageIndex?: number }) => void; - flushBlockReplyBuffer: (options?: { assistantMessageIndex?: number }) => void | Promise; + emitBlockChunk: ( + text: string, + options?: { assistantMessageIndex?: number; final?: boolean }, + ) => void; + flushBlockReplyBuffer: (options?: { + assistantMessageIndex?: number; + final?: boolean; + }) => void | Promise; emitReasoningStream: (text: string) => void; consumeReplyDirectives: ( text: string, diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts index caab1759257..2ca639d30e7 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.test.ts @@ -165,6 +165,58 @@ describe("subscribeEmbeddedPiSession", () => { expect(payloads.map((payload) => payload.delta).join("")).toBe("Answer ends with { + const { session, emit } = createStubSessionHarness(); + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + }); + + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "Answer ends with { + const { session, emit } = createStubSessionHarness(); + + const onBlockReply = vi.fn(); + + subscribeEmbeddedPiSession({ + session, + runId: "run", + onBlockReply, + blockReplyBreak: "text_end", + blockReplyChunking: { minChars: 1, maxChars: 200 }, + }); + + const text = "Answer ends with call[0]?.text)).toEqual([ + "Answer ends with", + " { const { session, emit } = createStubSessionHarness(); diff --git a/src/agents/pi-embedded-subscribe.ts b/src/agents/pi-embedded-subscribe.ts index 95711bc533e..a7ce827f36a 100644 --- a/src/agents/pi-embedded-subscribe.ts +++ b/src/agents/pi-embedded-subscribe.ts @@ -700,13 +700,18 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar return output; }; - const emitBlockChunk = (text: string, options?: { assistantMessageIndex?: number }) => { + const emitBlockChunk = ( + text: string, + options?: { assistantMessageIndex?: number; final?: boolean }, + ) => { if (state.suppressBlockChunks || params.silentExpected) { return; } // Strip and blocks across chunk boundaries to avoid leaking reasoning. // Also strip downgraded tool call text ([Tool Call: ...], [Historical context: ...], etc.). - const chunk = stripDowngradedToolCallText(stripBlockTags(text, state.blockState)).trimEnd(); + const chunk = stripDowngradedToolCallText( + stripBlockTags(text, state.blockState, { final: options?.final === true }), + ).trimEnd(); if (!chunk) { return; } @@ -769,17 +774,42 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar const flushBlockReplyBuffer = (options?: { assistantMessageIndex?: number; + final?: boolean; }): void | Promise => { if (!params.onBlockReply) { return; } if (blockChunker?.hasBuffered()) { - blockChunker.drain({ force: true, emit: (text) => emitBlockChunk(text, options) }); + if (options?.final) { + let pendingChunk: string | undefined; + blockChunker.drain({ + force: true, + emit: (text) => { + if (pendingChunk !== undefined) { + emitBlockChunk(pendingChunk, { + assistantMessageIndex: options.assistantMessageIndex, + }); + } + pendingChunk = text; + }, + }); + if (pendingChunk !== undefined) { + emitBlockChunk(pendingChunk, { + assistantMessageIndex: options.assistantMessageIndex, + final: true, + }); + } + } else { + blockChunker.drain({ force: true, emit: (text) => emitBlockChunk(text, options) }); + } blockChunker.reset(); } else if (state.blockBuffer.length > 0) { emitBlockChunk(state.blockBuffer, options); state.blockBuffer = ""; } + if (options?.final) { + emitBlockChunk("", options); + } if (pendingBlockReplyTasks.size === 0) { return; }