diff --git a/src/agents/cli-output.ts b/src/agents/cli-output.ts index b3b7e585369..93d40301436 100644 --- a/src/agents/cli-output.ts +++ b/src/agents/cli-output.ts @@ -388,14 +388,34 @@ export function createCliJsonlStreamingParser(params: { let assistantText = ""; let sessionId: string | undefined; let usage: CliUsage | undefined; + let output: CliOutput | null = null; + const texts: string[] = []; const handleParsedRecord = (parsed: Record) => { sessionId = pickCliSessionId(parsed, params.backend) ?? sessionId; if (!sessionId && typeof parsed.thread_id === "string") { sessionId = parsed.thread_id.trim(); } - if (isRecord(parsed.usage)) { - usage = toCliUsage(parsed.usage) ?? usage; + usage = readCliUsage(parsed) ?? usage; + + const result = parseClaudeCliJsonlResult({ + backend: params.backend, + providerId: params.providerId, + parsed, + sessionId, + usage, + }); + if (result) { + output = result; + return; + } + + const item = isRecord(parsed.item) ? parsed.item : null; + if (item && typeof item.text === "string") { + const type = normalizeLowercaseStringOrEmpty(item.type); + if (!type || type.includes("message")) { + texts.push(item.text); + } } const delta = parseClaudeCliStreamingDelta({ @@ -452,6 +472,13 @@ export function createCliJsonlStreamingParser(params: { finish() { flushLines(true); }, + getOutput() { + if (output) { + return output; + } + const text = texts.join("\n").trim(); + return text ? { text, sessionId, usage } : null; + }, }; } diff --git a/src/agents/cli-runner.test-support.ts b/src/agents/cli-runner.test-support.ts index aa9b4af2390..565899ad861 100644 --- a/src/agents/cli-runner.test-support.ts +++ b/src/agents/cli-runner.test-support.ts @@ -38,8 +38,44 @@ const hoisted = vi.hoisted( setCliRunnerExecuteTestDeps({ getProcessSupervisor: () => ({ - spawn: (params: Parameters[0]) => - supervisorSpawnMock(params) as ReturnType, + spawn: async (params: Parameters[0]) => { + let stdoutDelivered = false; + let stderrDelivered = false; + const wrappedParams = { + ...params, + onStdout: params.onStdout + ? (chunk: string) => { + stdoutDelivered = true; + params.onStdout?.(chunk); + } + : undefined, + onStderr: params.onStderr + ? (chunk: string) => { + stderrDelivered = true; + params.onStderr?.(chunk); + } + : undefined, + }; + const managedRun = (await supervisorSpawnMock(wrappedParams)) as Awaited< + ReturnType + >; + const wait = managedRun.wait; + return { + ...managedRun, + wait: async () => { + const exit = await wait(); + if (params.captureOutput === false) { + if (!stdoutDelivered && exit.stdout) { + params.onStdout?.(exit.stdout); + } + if (!stderrDelivered && exit.stderr) { + params.onStderr?.(exit.stderr); + } + } + return exit; + }, + }; + }, cancel: vi.fn(), cancelScope: vi.fn(), reconcileOrphans: vi.fn(), diff --git a/src/agents/cli-runner/execute.supervisor-capture.test.ts b/src/agents/cli-runner/execute.supervisor-capture.test.ts new file mode 100644 index 00000000000..bd302267f34 --- /dev/null +++ b/src/agents/cli-runner/execute.supervisor-capture.test.ts @@ -0,0 +1,247 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import { onAgentEvent, resetAgentEventsForTest } from "../../infra/agent-events.js"; +import type { getProcessSupervisor } from "../../process/supervisor/index.js"; +import { createManagedRun, supervisorSpawnMock } from "../cli-runner.test-support.js"; +import { executePreparedCliRun } from "./execute.js"; +import type { PreparedCliRunContext } from "./types.js"; + +type ProcessSupervisor = ReturnType; +type SupervisorSpawnInput = Parameters[0]; + +function buildPreparedCliRunContext(params: { + output: "jsonl" | "text"; + provider?: string; +}): PreparedCliRunContext { + const provider = params.provider ?? "codex-cli"; + const backend = { + command: "agent-cli", + args: [], + output: params.output, + input: "stdin" as const, + serialize: true, + }; + + return { + params: { + sessionId: "session-1", + sessionFile: "/tmp/session.jsonl", + workspaceDir: "/tmp", + prompt: "hi", + provider, + model: "model", + timeoutMs: 1_000, + runId: `run-${params.output}`, + }, + started: Date.now(), + workspaceDir: "/tmp", + backendResolved: { + id: provider, + config: backend, + bundleMcp: false, + }, + preparedBackend: { + backend, + env: {}, + }, + reusableCliSession: {}, + modelId: "model", + normalizedModel: "model", + systemPrompt: "system", + systemPromptReport: {} as PreparedCliRunContext["systemPromptReport"], + bootstrapPromptWarningLines: [], + authEpochVersion: 2, + }; +} + +beforeEach(() => { + resetAgentEventsForTest(); + supervisorSpawnMock.mockReset(); +}); + +describe("executePreparedCliRun supervisor output capture", () => { + it("disables supervisor capture without parsing from the diagnostic stdout tail", async () => { + const fullText = `start-${"x".repeat(80 * 1024)}-end`; + + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = args[0] as SupervisorSpawnInput; + input.onStdout?.(fullText); + return createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: input.captureOutput === false ? "" : fullText, + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }); + + const result = await executePreparedCliRun(buildPreparedCliRunContext({ output: "text" })); + const spawnInput = supervisorSpawnMock.mock.calls[0]?.[0] as SupervisorSpawnInput; + + expect(spawnInput.captureOutput).toBe(false); + expect(result.rawText).toBe(fullText); + }); + + it("rejects oversized successful stdout instead of parsing a truncated tail", async () => { + const noisyPrefix = "x".repeat(2 * 1024 * 1024); + const finalText = "final answer"; + + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = args[0] as SupervisorSpawnInput; + input.onStdout?.(noisyPrefix); + input.onStdout?.(finalText); + return createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: input.captureOutput === false ? "" : `${noisyPrefix}${finalText}`, + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }); + + await expect( + executePreparedCliRun(buildPreparedCliRunContext({ output: "text" })), + ).rejects.toThrow("CLI stdout exceeded"); + const spawnInput = supervisorSpawnMock.mock.calls[0]?.[0] as SupervisorSpawnInput; + + expect(spawnInput.captureOutput).toBe(false); + }); + + it("parses valid oversized JSONL output incrementally", async () => { + const largeToolEvent = `${JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "tool_delta", text: "x".repeat(2 * 1024 * 1024) }, + }, + })}\n`; + const resultEvent = `${JSON.stringify({ + type: "result", + session_id: "session-jsonl-large", + result: "final answer", + })}\n`; + + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = args[0] as SupervisorSpawnInput; + input.onStdout?.(largeToolEvent); + input.onStdout?.(resultEvent); + return createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: input.captureOutput === false ? "" : `${largeToolEvent}${resultEvent}`, + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }); + + const result = await executePreparedCliRun( + buildPreparedCliRunContext({ output: "jsonl", provider: "claude-cli" }), + ); + + expect(result.text).toBe("final answer"); + expect(result.sessionId).toBe("session-jsonl-large"); + }); + + it("classifies failed stdout from the retained parse buffer before the diagnostic tail", async () => { + const errorPrefix = `${JSON.stringify({ + type: "result", + is_error: true, + result: "429 rate limit exceeded", + })}\n`; + const noisyTail = "x".repeat(80 * 1024); + + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = args[0] as SupervisorSpawnInput; + input.onStdout?.(errorPrefix); + input.onStdout?.(noisyTail); + return createManagedRun({ + reason: "exit", + exitCode: 1, + exitSignal: null, + durationMs: 50, + stdout: input.captureOutput === false ? "" : `${errorPrefix}${noisyTail}`, + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }); + + await expect( + executePreparedCliRun(buildPreparedCliRunContext({ output: "text" })), + ).rejects.toMatchObject({ + reason: "rate_limit", + status: 429, + }); + }); + + it("still streams every JSONL stdout chunk with supervisor capture disabled", async () => { + const agentEvents: Array<{ text?: string; delta?: string }> = []; + const stop = onAgentEvent((event) => { + if (event.stream !== "assistant") { + return; + } + agentEvents.push({ + text: typeof event.data.text === "string" ? event.data.text : undefined, + delta: typeof event.data.delta === "string" ? event.data.delta : undefined, + }); + }); + const chunks = [ + `${JSON.stringify({ type: "init", session_id: "session-jsonl" })}\n`, + `${JSON.stringify({ + type: "stream_event", + event: { type: "content_block_delta", delta: { type: "text_delta", text: "Hello" } }, + })}\n`, + `not-json ${"x".repeat(80 * 1024)}\n`, + `${JSON.stringify({ + type: "stream_event", + event: { type: "content_block_delta", delta: { type: "text_delta", text: " world" } }, + })}\n`, + `${JSON.stringify({ + type: "result", + session_id: "session-jsonl", + result: "Hello world", + })}\n`, + ]; + + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = args[0] as SupervisorSpawnInput; + for (const chunk of chunks) { + input.onStdout?.(chunk); + } + return createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: input.captureOutput === false ? "" : chunks.join(""), + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }); + + try { + const result = await executePreparedCliRun( + buildPreparedCliRunContext({ output: "jsonl", provider: "claude-cli" }), + ); + const spawnInput = supervisorSpawnMock.mock.calls[0]?.[0] as SupervisorSpawnInput; + + expect(spawnInput.captureOutput).toBe(false); + expect(result.text).toBe("Hello world"); + expect(agentEvents).toEqual([ + { text: "Hello", delta: "Hello" }, + { text: "Hello world", delta: " world" }, + ]); + } finally { + stop(); + } + }); +}); diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index b043fb616a1..0f07f7949b1 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -45,6 +45,51 @@ const executeDeps = { requestHeartbeat: requestHeartbeatImpl, }; +const CLI_RUNNER_OUTPUT_TAIL_BYTES = 64 * 1024; +const CLI_RUNNER_OUTPUT_PARSE_BYTES = 1024 * 1024; + +function appendCliOutputTail(tail: Buffer, chunk: string): Buffer { + if (!chunk) { + return tail; + } + const chunkBuffer = Buffer.from(chunk); + if (chunkBuffer.byteLength >= CLI_RUNNER_OUTPUT_TAIL_BYTES) { + return Buffer.from(chunkBuffer.subarray(chunkBuffer.byteLength - CLI_RUNNER_OUTPUT_TAIL_BYTES)); + } + const next = Buffer.concat([tail, chunkBuffer], tail.byteLength + chunkBuffer.byteLength); + if (next.byteLength <= CLI_RUNNER_OUTPUT_TAIL_BYTES) { + return next; + } + return Buffer.from(next.subarray(next.byteLength - CLI_RUNNER_OUTPUT_TAIL_BYTES)); +} + +function appendCliOutputParseBuffer( + buffer: Buffer, + chunk: string, +): { buffer: Buffer; exceeded: boolean } { + if (!chunk) { + return { buffer, exceeded: false }; + } + const chunkBuffer = Buffer.from(chunk); + if (buffer.byteLength + chunkBuffer.byteLength > CLI_RUNNER_OUTPUT_PARSE_BYTES) { + const remainingBytes = CLI_RUNNER_OUTPUT_PARSE_BYTES - buffer.byteLength; + if (remainingBytes <= 0) { + return { buffer, exceeded: true }; + } + return { + buffer: Buffer.concat( + [buffer, chunkBuffer.subarray(0, remainingBytes)], + CLI_RUNNER_OUTPUT_PARSE_BYTES, + ), + exceeded: true, + }; + } + return { + buffer: Buffer.concat([buffer, chunkBuffer], buffer.byteLength + chunkBuffer.byteLength), + exceeded: false, + }; +} + export function setCliRunnerExecuteTestDeps(overrides: Partial): void { Object.assign(executeDeps, overrides); } @@ -476,6 +521,12 @@ export async function executePreparedCliRun( backendId: context.backendResolved.id, cliSessionId: useResume ? resolvedSessionId : undefined, }); + let stdoutTail: Buffer = Buffer.alloc(0); + let stdoutParseBuffer: Buffer = Buffer.alloc(0); + let stdoutParseExceeded = false; + let stderrTail: Buffer = Buffer.alloc(0); + let stderrParseBuffer: Buffer = Buffer.alloc(0); + let stderrParseExceeded = false; const managedRun = await supervisor.spawn({ sessionId: params.sessionId, @@ -489,7 +540,24 @@ export async function executePreparedCliRun( cwd: context.workspaceDir, env, input: stdinPayload, - onStdout: streamingParser ? (chunk: string) => streamingParser.push(chunk) : undefined, + captureOutput: false, + onStdout: (chunk: string) => { + stdoutTail = appendCliOutputTail(stdoutTail, chunk); + if (!stdoutParseExceeded) { + const nextStdoutParse = appendCliOutputParseBuffer(stdoutParseBuffer, chunk); + stdoutParseBuffer = nextStdoutParse.buffer; + stdoutParseExceeded = nextStdoutParse.exceeded; + } + streamingParser?.push(chunk); + }, + onStderr: (chunk: string) => { + stderrTail = appendCliOutputTail(stderrTail, chunk); + if (!stderrParseExceeded) { + const nextStderrParse = appendCliOutputParseBuffer(stderrParseBuffer, chunk); + stderrParseBuffer = nextStderrParse.buffer; + stderrParseExceeded = nextStderrParse.exceeded; + } + }, }); let replyBackendCompleted = false; const replyBackendHandle = params.replyOperation @@ -526,22 +594,24 @@ export async function executePreparedCliRun( throw createCliAbortError(); } - const stdout = result.stdout.trim(); - const stderr = result.stderr.trim(); + const stdout = stdoutParseBuffer.toString("utf8").trim(); + const stdoutDiagnostic = stdoutTail.toString("utf8").trim(); + const stderr = stderrParseBuffer.toString("utf8").trim(); + const stderrDiagnostic = stderrTail.toString("utf8").trim(); if (logOutputText) { - if (stdout) { - cliBackendLog.info(`cli stdout:\n${stdout}`); + if (stdoutDiagnostic) { + cliBackendLog.info(`cli stdout:\n${stdoutDiagnostic}`); } - if (stderr) { - cliBackendLog.info(`cli stderr:\n${stderr}`); + if (stderrDiagnostic) { + cliBackendLog.info(`cli stderr:\n${stderrDiagnostic}`); } } if (shouldLogVerbose()) { - if (stdout) { - cliBackendLog.debug(`cli stdout:\n${stdout}`); + if (stdoutDiagnostic) { + cliBackendLog.debug(`cli stdout:\n${stdoutDiagnostic}`); } - if (stderr) { - cliBackendLog.debug(`cli stderr:\n${stderr}`); + if (stderrDiagnostic) { + cliBackendLog.debug(`cli stderr:\n${stderrDiagnostic}`); } } @@ -586,12 +656,27 @@ export async function executePreparedCliRun( status: resolveFailoverStatus("timeout"), }); } - const primaryErrorText = stderr || stdout; + const errorCandidates = [stderr, stdout, stderrDiagnostic, stdoutDiagnostic].filter( + (candidate) => candidate.length > 0, + ); const structuredError = - extractCliErrorMessage(primaryErrorText) ?? - (stderr ? extractCliErrorMessage(stdout) : null); - const err = structuredError || primaryErrorText || "CLI failed."; - const reason = classifyFailoverReason(err, { provider: params.provider }) ?? "unknown"; + errorCandidates.map((candidate) => extractCliErrorMessage(candidate)).find(Boolean) ?? + null; + let classifiedErrorText = structuredError; + let reason = structuredError + ? classifyFailoverReason(structuredError, { provider: params.provider }) + : null; + if (!reason) { + for (const candidate of errorCandidates) { + reason = classifyFailoverReason(candidate, { provider: params.provider }); + if (reason) { + classifiedErrorText = candidate; + break; + } + } + } + const err = structuredError || classifiedErrorText || errorCandidates[0] || "CLI failed."; + reason = reason ?? "unknown"; const status = resolveFailoverStatus(reason); throw new FailoverError(err, { reason, @@ -603,13 +688,33 @@ export async function executePreparedCliRun( }); } - const parsed = parseCliOutput({ - raw: stdout, - backend, - providerId: context.backendResolved.id, - outputMode: useResume ? (backend.resumeOutput ?? backend.output) : backend.output, - fallbackSessionId: resolvedSessionId, - }); + const outputMode = useResume ? (backend.resumeOutput ?? backend.output) : backend.output; + const streamedJsonlOutput = + outputMode === "jsonl" ? (streamingParser?.getOutput() ?? null) : null; + + if (stdoutParseExceeded && !streamedJsonlOutput) { + throw new FailoverError( + `CLI stdout exceeded ${CLI_RUNNER_OUTPUT_PARSE_BYTES} bytes; refusing to parse truncated output.`, + { + reason: "format", + provider: params.provider, + model: context.modelId, + sessionId: params.sessionId, + lane: params.lane, + status: resolveFailoverStatus("format"), + }, + ); + } + + const parsed = + streamedJsonlOutput ?? + parseCliOutput({ + raw: stdout, + backend, + providerId: context.backendResolved.id, + outputMode, + fallbackSessionId: resolvedSessionId, + }); const rawText = parsed.text; return { ...parsed,