fix(agents): flush final block tag fragments (#74065)

This commit is contained in:
Peter Steinberger
2026-04-29 05:51:46 +01:00
parent 0a8a255733
commit 5e2f6ce294
5 changed files with 108 additions and 8 deletions

View File

@@ -111,6 +111,11 @@ function createMessageEndContext(
final: false,
inlineCode: createInlineCodeState(),
},
partialBlockState: {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
lastStreamedAssistant: undefined,
lastStreamedAssistantCleaned: undefined,
lastReasoningSent: undefined,

View File

@@ -644,7 +644,7 @@ export function handleMessageUpdate(
) {
const assistantMessageIndex = ctx.state.assistantMessageIndex;
void Promise.resolve()
.then(() => ctx.flushBlockReplyBuffer({ assistantMessageIndex }))
.then(() => ctx.flushBlockReplyBuffer({ assistantMessageIndex, final: true }))
.catch((err) => {
ctx.log.debug(`text_end block reply flush failed: ${String(err)}`);
});
@@ -829,8 +829,15 @@ export function handleMessageEnd(
text !== ctx.state.lastBlockReplyText)
) {
if (hasBufferedBlockReply && ctx.blockChunker?.hasBuffered()) {
ctx.blockChunker.drain({ force: true, emit: ctx.emitBlockChunk });
ctx.blockChunker.reset();
const flushBlockReplyBufferResult = ctx.flushBlockReplyBuffer({
assistantMessageIndex: ctx.state.assistantMessageIndex,
final: true,
});
if (isPromiseLike<void>(flushBlockReplyBufferResult)) {
void flushBlockReplyBufferResult.catch((err) => {
ctx.log.debug(`message_end block reply flush failed: ${String(err)}`);
});
}
// Final-flush the streaming directive accumulator so any partial
// directive tail held back by splitTrailingDirective (for example a
// trailing `MEDIA:<path>` that arrived without a closing newline)

View File

@@ -130,8 +130,14 @@ export type EmbeddedPiSubscribeContext = {
},
options?: { final?: boolean },
) => string;
emitBlockChunk: (text: string, options?: { assistantMessageIndex?: number }) => void;
flushBlockReplyBuffer: (options?: { assistantMessageIndex?: number }) => void | Promise<void>;
emitBlockChunk: (
text: string,
options?: { assistantMessageIndex?: number; final?: boolean },
) => void;
flushBlockReplyBuffer: (options?: {
assistantMessageIndex?: number;
final?: boolean;
}) => void | Promise<void>;
emitReasoningStream: (text: string) => void;
consumeReplyDirectives: (
text: string,

View File

@@ -165,6 +165,58 @@ describe("subscribeEmbeddedPiSession", () => {
expect(payloads.map((payload) => payload.delta).join("")).toBe("Answer ends with <fi");
});
// The streamed text ends in "<fi" — a prefix of the "<final>" tag. On
// text_end the buffered reply must be flushed as literal text rather than
// held back forever as a potential block-tag opener.
it("flushes a literal trailing final-tag prefix in text_end block replies", async () => {
  const harness = createStubSessionHarness();
  const blockReplySpy = vi.fn();
  subscribeEmbeddedPiSession({
    session: harness.session,
    runId: "run",
    onBlockReply: blockReplySpy,
    blockReplyBreak: "text_end",
  });
  const partialTagText = "Answer ends with <fi";
  harness.emit({ type: "message_start", message: { role: "assistant" } });
  emitAssistantTextDelta({ emit: harness.emit, delta: partialTagText });
  emitAssistantTextEnd({ emit: harness.emit });
  await Promise.resolve();
  // Exactly one reply, carrying the tag-prefix text verbatim.
  expect(blockReplySpy).toHaveBeenCalledTimes(1);
  expect(blockReplySpy.mock.calls[0]?.[0]?.text).toBe(partialTagText);
});
// With chunking enabled (minChars: 1) a synchronous message_end arriving
// right after text_end force-drains the chunker; the held-back "<fi"
// fragment must still come out as its own literal chunk instead of being
// swallowed by the final-tag stripper.
it("keeps a trailing final-tag prefix when synchronous message_end drains chunked text_end replies", async () => {
  const harness = createStubSessionHarness();
  const blockReplySpy = vi.fn();
  subscribeEmbeddedPiSession({
    session: harness.session,
    runId: "run",
    onBlockReply: blockReplySpy,
    blockReplyBreak: "text_end",
    blockReplyChunking: { minChars: 1, maxChars: 200 },
  });
  const text = "Answer ends with <fi";
  const finalAssistantMessage = {
    role: "assistant",
    content: [{ type: "text", text }],
  } as AssistantMessage;
  harness.emit({ type: "message_start", message: { role: "assistant" } });
  emitAssistantTextDelta({ emit: harness.emit, delta: text });
  emitAssistantTextEnd({ emit: harness.emit });
  harness.emit({ type: "message_end", message: finalAssistantMessage });
  await Promise.resolve();
  // The reply is split: the safe prefix first, then the literal tag prefix.
  const emittedTexts = blockReplySpy.mock.calls.map((call) => call[0]?.text);
  expect(emittedTexts).toEqual(["Answer ends with", "<fi"]);
});
it("preserves literal trailing tag-prefix text from message end fallback", () => {
const { session, emit } = createStubSessionHarness();

View File

@@ -700,13 +700,18 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
return output;
};
const emitBlockChunk = (text: string, options?: { assistantMessageIndex?: number }) => {
const emitBlockChunk = (
text: string,
options?: { assistantMessageIndex?: number; final?: boolean },
) => {
if (state.suppressBlockChunks || params.silentExpected) {
return;
}
// Strip <think> and <final> blocks across chunk boundaries to avoid leaking reasoning.
// Also strip downgraded tool call text ([Tool Call: ...], [Historical context: ...], etc.).
const chunk = stripDowngradedToolCallText(stripBlockTags(text, state.blockState)).trimEnd();
const chunk = stripDowngradedToolCallText(
stripBlockTags(text, state.blockState, { final: options?.final === true }),
).trimEnd();
if (!chunk) {
return;
}
@@ -769,17 +774,42 @@ export function subscribeEmbeddedPiSession(params: SubscribeEmbeddedPiSessionPar
const flushBlockReplyBuffer = (options?: {
assistantMessageIndex?: number;
final?: boolean;
}): void | Promise<void> => {
if (!params.onBlockReply) {
return;
}
if (blockChunker?.hasBuffered()) {
blockChunker.drain({ force: true, emit: (text) => emitBlockChunk(text, options) });
if (options?.final) {
let pendingChunk: string | undefined;
blockChunker.drain({
force: true,
emit: (text) => {
if (pendingChunk !== undefined) {
emitBlockChunk(pendingChunk, {
assistantMessageIndex: options.assistantMessageIndex,
});
}
pendingChunk = text;
},
});
if (pendingChunk !== undefined) {
emitBlockChunk(pendingChunk, {
assistantMessageIndex: options.assistantMessageIndex,
final: true,
});
}
} else {
blockChunker.drain({ force: true, emit: (text) => emitBlockChunk(text, options) });
}
blockChunker.reset();
} else if (state.blockBuffer.length > 0) {
emitBlockChunk(state.blockBuffer, options);
state.blockBuffer = "";
}
if (options?.final) {
emitBlockChunk("", options);
}
if (pendingBlockReplyTasks.size === 0) {
return;
}