fix(agents): bound cli runner supervisor output

Signed-off-by: samzong <samzong.lu@gmail.com>
This commit is contained in:
samzong
2026-05-09 10:51:03 +08:00
committed by Peter Steinberger
parent 1ab7c7e248
commit 02ca572a26
4 changed files with 442 additions and 27 deletions

View File

@@ -388,14 +388,34 @@ export function createCliJsonlStreamingParser(params: {
let assistantText = "";
let sessionId: string | undefined;
let usage: CliUsage | undefined;
let output: CliOutput | null = null;
const texts: string[] = [];
const handleParsedRecord = (parsed: Record<string, unknown>) => {
sessionId = pickCliSessionId(parsed, params.backend) ?? sessionId;
if (!sessionId && typeof parsed.thread_id === "string") {
sessionId = parsed.thread_id.trim();
}
if (isRecord(parsed.usage)) {
usage = toCliUsage(parsed.usage) ?? usage;
usage = readCliUsage(parsed) ?? usage;
const result = parseClaudeCliJsonlResult({
backend: params.backend,
providerId: params.providerId,
parsed,
sessionId,
usage,
});
if (result) {
output = result;
return;
}
const item = isRecord(parsed.item) ? parsed.item : null;
if (item && typeof item.text === "string") {
const type = normalizeLowercaseStringOrEmpty(item.type);
if (!type || type.includes("message")) {
texts.push(item.text);
}
}
const delta = parseClaudeCliStreamingDelta({
@@ -452,6 +472,13 @@ export function createCliJsonlStreamingParser(params: {
finish() {
flushLines(true);
},
getOutput() {
if (output) {
return output;
}
const text = texts.join("\n").trim();
return text ? { text, sessionId, usage } : null;
},
};
}

View File

@@ -38,8 +38,44 @@ const hoisted = vi.hoisted(
setCliRunnerExecuteTestDeps({
getProcessSupervisor: () => ({
spawn: (params: Parameters<SupervisorSpawnFn>[0]) =>
supervisorSpawnMock(params) as ReturnType<SupervisorSpawnFn>,
spawn: async (params: Parameters<SupervisorSpawnFn>[0]) => {
let stdoutDelivered = false;
let stderrDelivered = false;
const wrappedParams = {
...params,
onStdout: params.onStdout
? (chunk: string) => {
stdoutDelivered = true;
params.onStdout?.(chunk);
}
: undefined,
onStderr: params.onStderr
? (chunk: string) => {
stderrDelivered = true;
params.onStderr?.(chunk);
}
: undefined,
};
const managedRun = (await supervisorSpawnMock(wrappedParams)) as Awaited<
ReturnType<SupervisorSpawnFn>
>;
const wait = managedRun.wait;
return {
...managedRun,
wait: async () => {
const exit = await wait();
if (params.captureOutput === false) {
if (!stdoutDelivered && exit.stdout) {
params.onStdout?.(exit.stdout);
}
if (!stderrDelivered && exit.stderr) {
params.onStderr?.(exit.stderr);
}
}
return exit;
},
};
},
cancel: vi.fn(),
cancelScope: vi.fn(),
reconcileOrphans: vi.fn(),

View File

@@ -0,0 +1,247 @@
import { beforeEach, describe, expect, it } from "vitest";
import { onAgentEvent, resetAgentEventsForTest } from "../../infra/agent-events.js";
import type { getProcessSupervisor } from "../../process/supervisor/index.js";
import { createManagedRun, supervisorSpawnMock } from "../cli-runner.test-support.js";
import { executePreparedCliRun } from "./execute.js";
import type { PreparedCliRunContext } from "./types.js";
type ProcessSupervisor = ReturnType<typeof getProcessSupervisor>;
type SupervisorSpawnInput = Parameters<ProcessSupervisor["spawn"]>[0];
function buildPreparedCliRunContext(params: {
output: "jsonl" | "text";
provider?: string;
}): PreparedCliRunContext {
const provider = params.provider ?? "codex-cli";
const backend = {
command: "agent-cli",
args: [],
output: params.output,
input: "stdin" as const,
serialize: true,
};
return {
params: {
sessionId: "session-1",
sessionFile: "/tmp/session.jsonl",
workspaceDir: "/tmp",
prompt: "hi",
provider,
model: "model",
timeoutMs: 1_000,
runId: `run-${params.output}`,
},
started: Date.now(),
workspaceDir: "/tmp",
backendResolved: {
id: provider,
config: backend,
bundleMcp: false,
},
preparedBackend: {
backend,
env: {},
},
reusableCliSession: {},
modelId: "model",
normalizedModel: "model",
systemPrompt: "system",
systemPromptReport: {} as PreparedCliRunContext["systemPromptReport"],
bootstrapPromptWarningLines: [],
authEpochVersion: 2,
};
}
beforeEach(() => {
resetAgentEventsForTest();
supervisorSpawnMock.mockReset();
});
describe("executePreparedCliRun supervisor output capture", () => {
it("disables supervisor capture without parsing from the diagnostic stdout tail", async () => {
const fullText = `start-${"x".repeat(80 * 1024)}-end`;
supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => {
const input = args[0] as SupervisorSpawnInput;
input.onStdout?.(fullText);
return createManagedRun({
reason: "exit",
exitCode: 0,
exitSignal: null,
durationMs: 50,
stdout: input.captureOutput === false ? "" : fullText,
stderr: "",
timedOut: false,
noOutputTimedOut: false,
});
});
const result = await executePreparedCliRun(buildPreparedCliRunContext({ output: "text" }));
const spawnInput = supervisorSpawnMock.mock.calls[0]?.[0] as SupervisorSpawnInput;
expect(spawnInput.captureOutput).toBe(false);
expect(result.rawText).toBe(fullText);
});
it("rejects oversized successful stdout instead of parsing a truncated tail", async () => {
const noisyPrefix = "x".repeat(2 * 1024 * 1024);
const finalText = "final answer";
supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => {
const input = args[0] as SupervisorSpawnInput;
input.onStdout?.(noisyPrefix);
input.onStdout?.(finalText);
return createManagedRun({
reason: "exit",
exitCode: 0,
exitSignal: null,
durationMs: 50,
stdout: input.captureOutput === false ? "" : `${noisyPrefix}${finalText}`,
stderr: "",
timedOut: false,
noOutputTimedOut: false,
});
});
await expect(
executePreparedCliRun(buildPreparedCliRunContext({ output: "text" })),
).rejects.toThrow("CLI stdout exceeded");
const spawnInput = supervisorSpawnMock.mock.calls[0]?.[0] as SupervisorSpawnInput;
expect(spawnInput.captureOutput).toBe(false);
});
it("parses valid oversized JSONL output incrementally", async () => {
const largeToolEvent = `${JSON.stringify({
type: "stream_event",
event: {
type: "content_block_delta",
delta: { type: "tool_delta", text: "x".repeat(2 * 1024 * 1024) },
},
})}\n`;
const resultEvent = `${JSON.stringify({
type: "result",
session_id: "session-jsonl-large",
result: "final answer",
})}\n`;
supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => {
const input = args[0] as SupervisorSpawnInput;
input.onStdout?.(largeToolEvent);
input.onStdout?.(resultEvent);
return createManagedRun({
reason: "exit",
exitCode: 0,
exitSignal: null,
durationMs: 50,
stdout: input.captureOutput === false ? "" : `${largeToolEvent}${resultEvent}`,
stderr: "",
timedOut: false,
noOutputTimedOut: false,
});
});
const result = await executePreparedCliRun(
buildPreparedCliRunContext({ output: "jsonl", provider: "claude-cli" }),
);
expect(result.text).toBe("final answer");
expect(result.sessionId).toBe("session-jsonl-large");
});
it("classifies failed stdout from the retained parse buffer before the diagnostic tail", async () => {
const errorPrefix = `${JSON.stringify({
type: "result",
is_error: true,
result: "429 rate limit exceeded",
})}\n`;
const noisyTail = "x".repeat(80 * 1024);
supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => {
const input = args[0] as SupervisorSpawnInput;
input.onStdout?.(errorPrefix);
input.onStdout?.(noisyTail);
return createManagedRun({
reason: "exit",
exitCode: 1,
exitSignal: null,
durationMs: 50,
stdout: input.captureOutput === false ? "" : `${errorPrefix}${noisyTail}`,
stderr: "",
timedOut: false,
noOutputTimedOut: false,
});
});
await expect(
executePreparedCliRun(buildPreparedCliRunContext({ output: "text" })),
).rejects.toMatchObject({
reason: "rate_limit",
status: 429,
});
});
it("still streams every JSONL stdout chunk with supervisor capture disabled", async () => {
const agentEvents: Array<{ text?: string; delta?: string }> = [];
const stop = onAgentEvent((event) => {
if (event.stream !== "assistant") {
return;
}
agentEvents.push({
text: typeof event.data.text === "string" ? event.data.text : undefined,
delta: typeof event.data.delta === "string" ? event.data.delta : undefined,
});
});
const chunks = [
`${JSON.stringify({ type: "init", session_id: "session-jsonl" })}\n`,
`${JSON.stringify({
type: "stream_event",
event: { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } },
})}\n`,
`not-json ${"x".repeat(80 * 1024)}\n`,
`${JSON.stringify({
type: "stream_event",
event: { type: "content_block_delta", delta: { type: "text_delta", text: " world" } },
})}\n`,
`${JSON.stringify({
type: "result",
session_id: "session-jsonl",
result: "Hello world",
})}\n`,
];
supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => {
const input = args[0] as SupervisorSpawnInput;
for (const chunk of chunks) {
input.onStdout?.(chunk);
}
return createManagedRun({
reason: "exit",
exitCode: 0,
exitSignal: null,
durationMs: 50,
stdout: input.captureOutput === false ? "" : chunks.join(""),
stderr: "",
timedOut: false,
noOutputTimedOut: false,
});
});
try {
const result = await executePreparedCliRun(
buildPreparedCliRunContext({ output: "jsonl", provider: "claude-cli" }),
);
const spawnInput = supervisorSpawnMock.mock.calls[0]?.[0] as SupervisorSpawnInput;
expect(spawnInput.captureOutput).toBe(false);
expect(result.text).toBe("Hello world");
expect(agentEvents).toEqual([
{ text: "Hello", delta: "Hello" },
{ text: "Hello world", delta: " world" },
]);
} finally {
stop();
}
});
});

View File

@@ -45,6 +45,51 @@ const executeDeps = {
requestHeartbeat: requestHeartbeatImpl,
};
const CLI_RUNNER_OUTPUT_TAIL_BYTES = 64 * 1024;
const CLI_RUNNER_OUTPUT_PARSE_BYTES = 1024 * 1024;
function appendCliOutputTail(tail: Buffer, chunk: string): Buffer {
if (!chunk) {
return tail;
}
const chunkBuffer = Buffer.from(chunk);
if (chunkBuffer.byteLength >= CLI_RUNNER_OUTPUT_TAIL_BYTES) {
return Buffer.from(chunkBuffer.subarray(chunkBuffer.byteLength - CLI_RUNNER_OUTPUT_TAIL_BYTES));
}
const next = Buffer.concat([tail, chunkBuffer], tail.byteLength + chunkBuffer.byteLength);
if (next.byteLength <= CLI_RUNNER_OUTPUT_TAIL_BYTES) {
return next;
}
return Buffer.from(next.subarray(next.byteLength - CLI_RUNNER_OUTPUT_TAIL_BYTES));
}
function appendCliOutputParseBuffer(
buffer: Buffer,
chunk: string,
): { buffer: Buffer; exceeded: boolean } {
if (!chunk) {
return { buffer, exceeded: false };
}
const chunkBuffer = Buffer.from(chunk);
if (buffer.byteLength + chunkBuffer.byteLength > CLI_RUNNER_OUTPUT_PARSE_BYTES) {
const remainingBytes = CLI_RUNNER_OUTPUT_PARSE_BYTES - buffer.byteLength;
if (remainingBytes <= 0) {
return { buffer, exceeded: true };
}
return {
buffer: Buffer.concat(
[buffer, chunkBuffer.subarray(0, remainingBytes)],
CLI_RUNNER_OUTPUT_PARSE_BYTES,
),
exceeded: true,
};
}
return {
buffer: Buffer.concat([buffer, chunkBuffer], buffer.byteLength + chunkBuffer.byteLength),
exceeded: false,
};
}
export function setCliRunnerExecuteTestDeps(overrides: Partial<typeof executeDeps>): void {
Object.assign(executeDeps, overrides);
}
@@ -476,6 +521,12 @@ export async function executePreparedCliRun(
backendId: context.backendResolved.id,
cliSessionId: useResume ? resolvedSessionId : undefined,
});
let stdoutTail: Buffer = Buffer.alloc(0);
let stdoutParseBuffer: Buffer = Buffer.alloc(0);
let stdoutParseExceeded = false;
let stderrTail: Buffer = Buffer.alloc(0);
let stderrParseBuffer: Buffer = Buffer.alloc(0);
let stderrParseExceeded = false;
const managedRun = await supervisor.spawn({
sessionId: params.sessionId,
@@ -489,7 +540,24 @@ export async function executePreparedCliRun(
cwd: context.workspaceDir,
env,
input: stdinPayload,
onStdout: streamingParser ? (chunk: string) => streamingParser.push(chunk) : undefined,
captureOutput: false,
onStdout: (chunk: string) => {
stdoutTail = appendCliOutputTail(stdoutTail, chunk);
if (!stdoutParseExceeded) {
const nextStdoutParse = appendCliOutputParseBuffer(stdoutParseBuffer, chunk);
stdoutParseBuffer = nextStdoutParse.buffer;
stdoutParseExceeded = nextStdoutParse.exceeded;
}
streamingParser?.push(chunk);
},
onStderr: (chunk: string) => {
stderrTail = appendCliOutputTail(stderrTail, chunk);
if (!stderrParseExceeded) {
const nextStderrParse = appendCliOutputParseBuffer(stderrParseBuffer, chunk);
stderrParseBuffer = nextStderrParse.buffer;
stderrParseExceeded = nextStderrParse.exceeded;
}
},
});
let replyBackendCompleted = false;
const replyBackendHandle = params.replyOperation
@@ -526,22 +594,24 @@ export async function executePreparedCliRun(
throw createCliAbortError();
}
const stdout = result.stdout.trim();
const stderr = result.stderr.trim();
const stdout = stdoutParseBuffer.toString("utf8").trim();
const stdoutDiagnostic = stdoutTail.toString("utf8").trim();
const stderr = stderrParseBuffer.toString("utf8").trim();
const stderrDiagnostic = stderrTail.toString("utf8").trim();
if (logOutputText) {
if (stdout) {
cliBackendLog.info(`cli stdout:\n${stdout}`);
if (stdoutDiagnostic) {
cliBackendLog.info(`cli stdout:\n${stdoutDiagnostic}`);
}
if (stderr) {
cliBackendLog.info(`cli stderr:\n${stderr}`);
if (stderrDiagnostic) {
cliBackendLog.info(`cli stderr:\n${stderrDiagnostic}`);
}
}
if (shouldLogVerbose()) {
if (stdout) {
cliBackendLog.debug(`cli stdout:\n${stdout}`);
if (stdoutDiagnostic) {
cliBackendLog.debug(`cli stdout:\n${stdoutDiagnostic}`);
}
if (stderr) {
cliBackendLog.debug(`cli stderr:\n${stderr}`);
if (stderrDiagnostic) {
cliBackendLog.debug(`cli stderr:\n${stderrDiagnostic}`);
}
}
@@ -586,12 +656,27 @@ export async function executePreparedCliRun(
status: resolveFailoverStatus("timeout"),
});
}
const primaryErrorText = stderr || stdout;
const errorCandidates = [stderr, stdout, stderrDiagnostic, stdoutDiagnostic].filter(
(candidate) => candidate.length > 0,
);
const structuredError =
extractCliErrorMessage(primaryErrorText) ??
(stderr ? extractCliErrorMessage(stdout) : null);
const err = structuredError || primaryErrorText || "CLI failed.";
const reason = classifyFailoverReason(err, { provider: params.provider }) ?? "unknown";
errorCandidates.map((candidate) => extractCliErrorMessage(candidate)).find(Boolean) ??
null;
let classifiedErrorText = structuredError;
let reason = structuredError
? classifyFailoverReason(structuredError, { provider: params.provider })
: null;
if (!reason) {
for (const candidate of errorCandidates) {
reason = classifyFailoverReason(candidate, { provider: params.provider });
if (reason) {
classifiedErrorText = candidate;
break;
}
}
}
const err = structuredError || classifiedErrorText || errorCandidates[0] || "CLI failed.";
reason = reason ?? "unknown";
const status = resolveFailoverStatus(reason);
throw new FailoverError(err, {
reason,
@@ -603,13 +688,33 @@ export async function executePreparedCliRun(
});
}
const parsed = parseCliOutput({
raw: stdout,
backend,
providerId: context.backendResolved.id,
outputMode: useResume ? (backend.resumeOutput ?? backend.output) : backend.output,
fallbackSessionId: resolvedSessionId,
});
const outputMode = useResume ? (backend.resumeOutput ?? backend.output) : backend.output;
const streamedJsonlOutput =
outputMode === "jsonl" ? (streamingParser?.getOutput() ?? null) : null;
if (stdoutParseExceeded && !streamedJsonlOutput) {
throw new FailoverError(
`CLI stdout exceeded ${CLI_RUNNER_OUTPUT_PARSE_BYTES} bytes; refusing to parse truncated output.`,
{
reason: "format",
provider: params.provider,
model: context.modelId,
sessionId: params.sessionId,
lane: params.lane,
status: resolveFailoverStatus("format"),
},
);
}
const parsed =
streamedJsonlOutput ??
parseCliOutput({
raw: stdout,
backend,
providerId: context.backendResolved.id,
outputMode,
fallbackSessionId: resolvedSessionId,
});
const rawText = parsed.text;
return {
...parsed,