fix(agents): suppress commentary-phase output leaks

This commit is contained in:
Mariano Belinky
2026-04-05 12:08:21 +02:00
parent 1bf5339b98
commit eb35c1c280
3 changed files with 368 additions and 12 deletions

View File

@@ -4,6 +4,7 @@ import {
buildAssistantStreamData,
consumePendingToolMediaIntoReply,
consumePendingToolMediaReply,
handleMessageEnd,
handleMessageUpdate,
hasAssistantVisibleReply,
resolveSilentReplyFallbackText,
@@ -140,6 +141,119 @@ describe("consumePendingToolMediaReply", () => {
});
describe("handleMessageUpdate", () => {
it("suppresses commentary-phase partial delivery and text_end flush", async () => {
const onAgentEvent = vi.fn();
const onPartialReply = vi.fn();
const flushBlockReplyBuffer = vi.fn();
const ctx = {
params: {
runId: "run-1",
session: { id: "session-1" },
onAgentEvent,
onPartialReply,
},
state: {
deterministicApprovalPromptSent: false,
reasoningStreamOpen: false,
streamReasoning: false,
deltaBuffer: "",
blockBuffer: "",
partialBlockState: {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
lastStreamedAssistantCleaned: undefined,
emittedAssistantUpdate: false,
shouldEmitPartialReplies: true,
blockReplyBreak: "text_end",
assistantMessageIndex: 0,
},
log: { debug: vi.fn() },
noteLastAssistant: vi.fn(),
stripBlockTags: (text: string) => text,
consumePartialReplyDirectives: vi.fn(() => null),
flushBlockReplyBuffer,
} as unknown as EmbeddedPiSubscribeContext;
handleMessageUpdate(ctx, {
type: "message_update",
message: { role: "assistant", phase: "commentary", content: [] },
assistantMessageEvent: { type: "text_delta", delta: "Need send." },
} as never);
handleMessageUpdate(ctx, {
type: "message_update",
message: { role: "assistant", phase: "commentary", content: [] },
assistantMessageEvent: { type: "text_end", content: "Need send." },
} as never);
await Promise.resolve();
expect(onAgentEvent).not.toHaveBeenCalled();
expect(onPartialReply).not.toHaveBeenCalled();
expect(flushBlockReplyBuffer).not.toHaveBeenCalled();
});
it("suppresses commentary partials when phase exists only in textSignature metadata", async () => {
const onAgentEvent = vi.fn();
const onPartialReply = vi.fn();
const flushBlockReplyBuffer = vi.fn();
const commentaryBlock = {
type: "text",
text: "Need send.",
textSignature: JSON.stringify({ v: 1, id: "msg_sig", phase: "commentary" }),
};
const ctx = {
params: {
runId: "run-1",
session: { id: "session-1" },
onAgentEvent,
onPartialReply,
},
state: {
deterministicApprovalPromptSent: false,
reasoningStreamOpen: false,
streamReasoning: false,
deltaBuffer: "",
blockBuffer: "",
partialBlockState: {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
lastStreamedAssistantCleaned: undefined,
emittedAssistantUpdate: false,
shouldEmitPartialReplies: true,
blockReplyBreak: "text_end",
assistantMessageIndex: 0,
},
log: { debug: vi.fn() },
noteLastAssistant: vi.fn(),
stripBlockTags: (text: string) => text,
consumePartialReplyDirectives: vi.fn(() => null),
flushBlockReplyBuffer,
} as unknown as EmbeddedPiSubscribeContext;
handleMessageUpdate(ctx, {
type: "message_update",
message: { role: "assistant", content: [commentaryBlock] },
assistantMessageEvent: { type: "text_delta", delta: "Need send." },
} as never);
handleMessageUpdate(ctx, {
type: "message_update",
message: { role: "assistant", content: [commentaryBlock] },
assistantMessageEvent: { type: "text_end", content: "Need send." },
} as never);
await Promise.resolve();
expect(onAgentEvent).not.toHaveBeenCalled();
expect(onPartialReply).not.toHaveBeenCalled();
expect(flushBlockReplyBuffer).not.toHaveBeenCalled();
expect(ctx.state.deltaBuffer).toBe("");
expect(ctx.state.blockBuffer).toBe("");
});
it("contains synchronous text_end flush failures", async () => {
const debug = vi.fn();
const ctx = {
@@ -183,3 +297,124 @@ describe("handleMessageUpdate", () => {
});
});
});
describe("handleMessageEnd", () => {
it("suppresses commentary-phase replies from user-visible output", () => {
const onAgentEvent = vi.fn();
const emitBlockReply = vi.fn();
const finalizeAssistantTexts = vi.fn();
const ctx = {
params: {
runId: "run-1",
session: { id: "session-1" },
onAgentEvent,
onBlockReply: vi.fn(),
},
state: {
assistantTexts: [],
assistantTextBaseline: 0,
emittedAssistantUpdate: false,
deterministicApprovalPromptSent: false,
reasoningStreamOpen: false,
includeReasoning: false,
streamReasoning: false,
blockReplyBreak: "message_end",
deltaBuffer: "Need send.",
blockBuffer: "Need send.",
blockState: {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
lastStreamedAssistant: undefined,
lastStreamedAssistantCleaned: undefined,
},
noteLastAssistant: vi.fn(),
recordAssistantUsage: vi.fn(),
log: { debug: vi.fn(), warn: vi.fn() },
stripBlockTags: (text: string) => text,
finalizeAssistantTexts,
emitBlockReply,
consumeReplyDirectives: vi.fn(() => ({ text: "Need send." })),
emitReasoningStream: vi.fn(),
flushBlockReplyBuffer: vi.fn(),
blockChunker: null,
} as unknown as EmbeddedPiSubscribeContext;
void handleMessageEnd(ctx, {
type: "message_end",
message: {
role: "assistant",
phase: "commentary",
content: [{ type: "text", text: "Need send." }],
usage: { input: 1, output: 1, total: 2 },
},
} as never);
expect(onAgentEvent).not.toHaveBeenCalled();
expect(emitBlockReply).not.toHaveBeenCalled();
expect(finalizeAssistantTexts).not.toHaveBeenCalled();
});
it("suppresses commentary message_end when phase exists only in textSignature metadata", () => {
const onAgentEvent = vi.fn();
const emitBlockReply = vi.fn();
const finalizeAssistantTexts = vi.fn();
const ctx = {
params: {
runId: "run-1",
session: { id: "session-1" },
onAgentEvent,
onBlockReply: vi.fn(),
},
state: {
assistantTexts: [],
assistantTextBaseline: 0,
emittedAssistantUpdate: false,
deterministicApprovalPromptSent: false,
reasoningStreamOpen: false,
includeReasoning: false,
streamReasoning: false,
blockReplyBreak: "message_end",
deltaBuffer: "Need send.",
blockBuffer: "Need send.",
blockState: {
thinking: false,
final: false,
inlineCode: createInlineCodeState(),
},
lastStreamedAssistant: undefined,
lastStreamedAssistantCleaned: undefined,
},
noteLastAssistant: vi.fn(),
recordAssistantUsage: vi.fn(),
log: { debug: vi.fn(), warn: vi.fn() },
stripBlockTags: (text: string) => text,
finalizeAssistantTexts,
emitBlockReply,
consumeReplyDirectives: vi.fn(() => ({ text: "Need send." })),
emitReasoningStream: vi.fn(),
flushBlockReplyBuffer: vi.fn(),
blockChunker: null,
} as unknown as EmbeddedPiSubscribeContext;
void handleMessageEnd(ctx, {
type: "message_end",
message: {
role: "assistant",
content: [
{
type: "text",
text: "Need send.",
textSignature: JSON.stringify({ v: 1, id: "msg_sig", phase: "commentary" }),
},
],
usage: { input: 1, output: 1, total: 2 },
},
} as never);
expect(onAgentEvent).not.toHaveBeenCalled();
expect(emitBlockReply).not.toHaveBeenCalled();
expect(finalizeAssistantTexts).not.toHaveBeenCalled();
});
});

View File

@@ -24,6 +24,8 @@ import {
promoteThinkingTagsToBlocks,
} from "./pi-embedded-utils.js";
type AssistantDeliveryPhase = "commentary" | "final_answer";
const stripTrailingDirective = (text: string): string => {
const openIndex = text.lastIndexOf("[[");
if (openIndex < 0) {
@@ -64,6 +66,48 @@ const coerceText = (value: unknown): string => {
return "";
};
function normalizeAssistantDeliveryPhase(value: unknown): AssistantDeliveryPhase | undefined {
return value === "commentary" || value === "final_answer" ? value : undefined;
}
function resolveAssistantDeliveryPhase(
message: AgentMessage | undefined,
): AssistantDeliveryPhase | undefined {
if (!message || message.role !== "assistant") {
return undefined;
}
const directPhase = normalizeAssistantDeliveryPhase((message as { phase?: unknown }).phase);
if (directPhase) {
return directPhase;
}
if (!Array.isArray(message.content)) {
return undefined;
}
for (const part of message.content) {
if (!part || typeof part !== "object") {
continue;
}
const block = part as { type?: unknown; textSignature?: unknown };
if (block.type !== "text" || typeof block.textSignature !== "string") {
continue;
}
try {
const parsed = JSON.parse(block.textSignature) as { phase?: unknown };
const phase = normalizeAssistantDeliveryPhase(parsed.phase);
if (phase) {
return phase;
}
} catch {
continue;
}
}
return undefined;
}
function shouldSuppressAssistantVisibleOutput(message: AgentMessage | undefined): boolean {
return resolveAssistantDeliveryPhase(message) === "commentary";
}
function isTranscriptOnlyOpenClawAssistantMessage(message: AgentMessage | undefined): boolean {
if (!message || message.role !== "assistant") {
return false;
@@ -196,6 +240,7 @@ export function handleMessageUpdate(
}
ctx.noteLastAssistant(msg);
const suppressVisibleAssistantOutput = shouldSuppressAssistantVisibleOutput(msg);
if (ctx.state.deterministicApprovalPromptSent) {
return;
}
@@ -254,6 +299,10 @@ export function handleMessageUpdate(
content,
});
if (suppressVisibleAssistantOutput) {
return;
}
let chunk = "";
if (evtType === "text_delta") {
chunk = delta;
@@ -387,6 +436,7 @@ export function handleMessageEnd(
}
const assistantMessage = msg;
const suppressVisibleAssistantOutput = shouldSuppressAssistantVisibleOutput(assistantMessage);
ctx.noteLastAssistant(assistantMessage);
ctx.recordAssistantUsage((assistantMessage as { usage?: unknown }).usage);
if (ctx.state.deterministicApprovalPromptSent) {
@@ -418,6 +468,24 @@ export function handleMessageEnd(
let cleanedText = parsedText?.text ?? "";
let { mediaUrls, hasMedia } = resolveSendableOutboundReplyParts(parsedText ?? {});
const finalizeMessageEnd = () => {
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.lastStreamedAssistant = undefined;
ctx.state.lastStreamedAssistantCleaned = undefined;
ctx.state.reasoningStreamOpen = false;
};
if (suppressVisibleAssistantOutput) {
emitReasoningEnd(ctx);
finalizeMessageEnd();
return;
}
if (!cleanedText && !hasMedia && !ctx.params.enforceFinalTag) {
const rawTrimmed = coerceText(rawText).trim();
const rawStrippedFinal = rawTrimmed.replace(/<\s*\/?\s*final\s*>/gi, "").trim();
@@ -551,18 +619,6 @@ export function handleMessageEnd(
emitSplitResultAsBlockReply(ctx.consumeReplyDirectives("", { final: true }));
}
const finalizeMessageEnd = () => {
ctx.state.deltaBuffer = "";
ctx.state.blockBuffer = "";
ctx.blockChunker?.reset();
ctx.state.blockState.thinking = false;
ctx.state.blockState.final = false;
ctx.state.blockState.inlineCode = createInlineCodeState();
ctx.state.lastStreamedAssistant = undefined;
ctx.state.lastStreamedAssistantCleaned = undefined;
ctx.state.reasoningStreamOpen = false;
};
if (
!ctx.params.silentExpected &&
ctx.state.blockReplyBreak === "message_end" &&

View File

@@ -0,0 +1,65 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { createSubscribedSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
describe("subscribeEmbeddedPiSession", () => {
it("suppresses commentary-phase assistant messages before tool use", () => {
const onBlockReply = vi.fn();
const onPartialReply = vi.fn();
const { emit, subscription } = createSubscribedSessionHarness({
runId: "run",
onBlockReply,
onPartialReply,
blockReplyBreak: "message_end",
});
const commentaryMessage = {
role: "assistant",
phase: "commentary",
content: [{ type: "text", text: "Need send." }],
stopReason: "toolUse",
} as AssistantMessage;
emit({ type: "message_start", message: commentaryMessage });
emit({ type: "message_end", message: commentaryMessage });
expect(onBlockReply).not.toHaveBeenCalled();
expect(onPartialReply).not.toHaveBeenCalled();
expect(subscription.assistantTexts).toEqual([]);
});
it("suppresses commentary when phase is only present in textSignature metadata", () => {
const onBlockReply = vi.fn();
const onPartialReply = vi.fn();
const { emit, subscription } = createSubscribedSessionHarness({
runId: "run",
onBlockReply,
onPartialReply,
blockReplyBreak: "message_end",
});
const commentaryMessage = {
role: "assistant",
content: [
{
type: "text",
text: "Need send.",
textSignature: JSON.stringify({ v: 1, id: "msg_sig", phase: "commentary" }),
},
],
stopReason: "toolUse",
} as AssistantMessage;
emit({ type: "message_start", message: commentaryMessage });
emit({
type: "message_update",
message: commentaryMessage,
assistantMessageEvent: { type: "text_delta", delta: "Need send." },
});
emit({ type: "message_end", message: commentaryMessage });
expect(onBlockReply).not.toHaveBeenCalled();
expect(onPartialReply).not.toHaveBeenCalled();
expect(subscription.assistantTexts).toEqual([]);
});
});