diff --git a/docs/gateway/cli-backends.md b/docs/gateway/cli-backends.md index 1278313f314..6877daab490 100644 --- a/docs/gateway/cli-backends.md +++ b/docs/gateway/cli-backends.md @@ -158,6 +158,12 @@ The provider id becomes the left side of your model ref: - `existing`: only send a session id if one was stored before. - `none`: never send a session id. +Serialization notes: + +- `serialize: true` keeps same-lane runs ordered. +- Most CLIs serialize on one provider lane. +- `claude-cli` is narrower: resumed runs serialize per Claude session id, and fresh runs serialize per workspace path. Independent workspaces can run in parallel. + ## Images (pass-through) If your CLI accepts image paths, set `imageArg`: diff --git a/src/agents/claude-cli-runner.test.ts b/src/agents/claude-cli-runner.test.ts index 17eeda127a8..ca942499c92 100644 --- a/src/agents/claude-cli-runner.test.ts +++ b/src/agents/claude-cli-runner.test.ts @@ -117,7 +117,7 @@ describe("runClaudeCliAgent", () => { expect(spawnInput.argv).toContain("hi"); }); - it("serializes concurrent claude-cli runs", async () => { + it("serializes concurrent claude-cli runs in the same workspace", async () => { const firstDeferred = createDeferred>(); const secondDeferred = createDeferred>(); @@ -155,4 +155,40 @@ describe("runClaudeCliAgent", () => { await Promise.all([firstRun, secondRun]); }); + + it("allows concurrent claude-cli runs across different workspaces", async () => { + const firstDeferred = createDeferred>(); + const secondDeferred = createDeferred>(); + + supervisorSpawnMock + .mockResolvedValueOnce(createManagedRun(firstDeferred.promise)) + .mockResolvedValueOnce(createManagedRun(secondDeferred.promise)); + + const firstRun = runClaudeCliAgent({ + sessionId: "s1", + sessionFile: "/tmp/session-1.jsonl", + workspaceDir: "/tmp/project-a", + prompt: "first", + model: "opus", + timeoutMs: 1_000, + runId: "run-a", + }); + + const secondRun = runClaudeCliAgent({ + sessionId: "s2", + sessionFile: 
"/tmp/session-2.jsonl", + workspaceDir: "/tmp/project-b", + prompt: "second", + model: "opus", + timeoutMs: 1_000, + runId: "run-b", + }); + + await waitForCalls(supervisorSpawnMock, 2); + + firstDeferred.resolve(successExit({ message: "ok", session_id: "sid-a" })); + secondDeferred.resolve(successExit({ message: "ok", session_id: "sid-b" })); + + await Promise.all([firstRun, secondRun]); + }); }); diff --git a/src/agents/cli-output.test.ts b/src/agents/cli-output.test.ts index 8c203058e9d..161a3ba99fe 100644 --- a/src/agents/cli-output.test.ts +++ b/src/agents/cli-output.test.ts @@ -1,5 +1,34 @@ import { describe, expect, it } from "vitest"; -import { parseCliJsonl } from "./cli-output.js"; +import { parseCliJson, parseCliJsonl } from "./cli-output.js"; + +describe("parseCliJson", () => { + it("recovers mixed-output Claude session metadata from embedded JSON objects", () => { + const result = parseCliJson( + [ + "Claude Code starting...", + '{"type":"init","session_id":"session-789"}', + '{"type":"result","result":"Claude says hi","usage":{"input_tokens":9,"output_tokens":4}}', + ].join("\n"), + { + command: "claude", + output: "json", + sessionIdFields: ["session_id"], + }, + ); + + expect(result).toEqual({ + text: "Claude says hi", + sessionId: "session-789", + usage: { + input: 9, + output: 4, + cacheRead: undefined, + cacheWrite: undefined, + total: undefined, + }, + }); + }); +}); describe("parseCliJsonl", () => { it("parses Claude stream-json result events", () => { @@ -72,4 +101,22 @@ describe("parseCliJsonl", () => { }, }); }); + + it("parses multiple JSON objects embedded on the same line", () => { + const result = parseCliJsonl( + '{"type":"init","session_id":"session-999"} {"type":"result","session_id":"session-999","result":"done"}', + { + command: "claude", + output: "jsonl", + sessionIdFields: ["session_id"], + }, + "claude-cli", + ); + + expect(result).toEqual({ + text: "done", + sessionId: "session-999", + usage: undefined, + }); + }); }); diff 
/**
 * Scans `raw` for balanced top-level `{ ... }` spans and returns each span as
 * a substring, in order of appearance.
 *
 * Braces that appear between double quotes are ignored, and a backslash inside
 * a quoted run escapes the following character. The returned spans are only
 * brace-balanced candidates; they are not validated as JSON here.
 *
 * @param raw Text that may contain JSON objects mixed with other output.
 * @returns Brace-balanced candidate substrings; empty when none are found.
 */
function extractJsonObjectCandidates(raw: string): string[] {
  const candidates: string[] = [];
  let depth = 0;
  let start = -1;
  let inString = false;
  let escaped = false;

  for (let index = 0; index < raw.length; index += 1) {
    const char = raw[index] ?? "";
    // A JSON string cannot contain a raw line break, so a "string" that is
    // still open at a line break while no object is open was started by a
    // stray quote in surrounding non-JSON text. Drop that state here;
    // otherwise one unbalanced quote in a banner line inverts string tracking
    // for the rest of the input and hides every following object.
    if (depth === 0 && inString && (char === "\n" || char === "\r")) {
      inString = false;
      escaped = false;
      continue;
    }
    if (escaped) {
      // The previous character was a backslash inside a string: skip this one.
      escaped = false;
      continue;
    }
    if (char === "\\") {
      if (inString) {
        escaped = true;
      }
      continue;
    }
    if (char === '"') {
      inString = !inString;
      continue;
    }
    if (inString) {
      continue;
    }
    if (char === "{") {
      if (depth === 0) {
        start = index;
      }
      depth += 1;
      continue;
    }
    // A closing brace with no open object is ignored.
    if (char === "}" && depth > 0) {
      depth -= 1;
      if (depth === 0 && start >= 0) {
        candidates.push(raw.slice(start, index + 1));
        start = -1;
      }
    }
  }

  return candidates;
}

/**
 * Parses `raw` into zero or more JSON records.
 *
 * The trimmed input is first parsed as a single JSON document; when that
 * yields a value accepted by `isRecord`, it is returned as the only element.
 * If parsing fails, or the parsed value is not accepted by `isRecord`, the
 * text is scanned with `extractJsonObjectCandidates` and each candidate is
 * parsed on its own. Candidates that fail to parse or are rejected by
 * `isRecord` are dropped.
 *
 * NOTE(review): the element type arguments were not legible in the reviewed
 * text; `Record<string, unknown>` is assumed to match `isRecord` — confirm.
 *
 * @param raw CLI output that may be pure JSON or JSON mixed with other text.
 * @returns Parsed records in order of appearance; empty for blank input or
 *   when nothing parses.
 */
function parseJsonRecordCandidates(raw: string): Record<string, unknown>[] {
  const parsedRecords: Record<string, unknown>[] = [];
  const trimmed = raw.trim();
  if (!trimmed) {
    return parsedRecords;
  }

  try {
    const parsed = JSON.parse(trimmed);
    if (isRecord(parsed)) {
      parsedRecords.push(parsed);
      return parsedRecords;
    }
  } catch {
    // Fall back to scanning for top-level JSON objects embedded in mixed output.
  }

  for (const candidate of extractJsonObjectCandidates(trimmed)) {
    try {
      const parsed = JSON.parse(candidate);
      if (isRecord(parsed)) {
        parsedRecords.push(parsed);
      }
    } catch {
      // Ignore malformed fragments and keep scanning remaining objects.
    }
  }

  return parsedRecords;
}
raw[key] : undefined; @@ -79,27 +155,40 @@ function pickCliSessionId( } export function parseCliJson(raw: string, backend: CliBackendConfig): CliOutput | null { - const trimmed = raw.trim(); - if (!trimmed) { + const parsedRecords = parseJsonRecordCandidates(raw); + if (parsedRecords.length === 0) { return null; } - let parsed: unknown; - try { - parsed = JSON.parse(trimmed); - } catch { + + let sessionId: string | undefined; + let usage: CliUsage | undefined; + let text = ""; + let sawStructuredOutput = false; + for (const parsed of parsedRecords) { + sessionId = pickCliSessionId(parsed, backend) ?? sessionId; + if (isRecord(parsed.usage)) { + usage = toCliUsage(parsed.usage) ?? usage; + } + const nextText = + collectCliText(parsed.message) || + collectCliText(parsed.content) || + collectCliText(parsed.result) || + collectCliText(parsed); + const trimmedText = nextText.trim(); + if (trimmedText) { + text = trimmedText; + sawStructuredOutput = true; + continue; + } + if (sessionId || usage) { + sawStructuredOutput = true; + } + } + + if (!text && !sawStructuredOutput) { return null; } - if (!isRecord(parsed)) { - return null; - } - const sessionId = pickCliSessionId(parsed, backend); - const usage = isRecord(parsed.usage) ? 
toCliUsage(parsed.usage) : undefined; - const text = - collectCliText(parsed.message) || - collectCliText(parsed.content) || - collectCliText(parsed.result) || - collectCliText(parsed); - return { text: text.trim(), sessionId, usage }; + return { text, sessionId, usage }; } function parseClaudeCliJsonlResult(params: { @@ -143,40 +232,33 @@ export function parseCliJsonl( let usage: CliUsage | undefined; const texts: string[] = []; for (const line of lines) { - let parsed: unknown; - try { - parsed = JSON.parse(line); - } catch { - continue; - } - if (!isRecord(parsed)) { - continue; - } - if (!sessionId) { - sessionId = pickCliSessionId(parsed, backend); - } - if (!sessionId && typeof parsed.thread_id === "string") { - sessionId = parsed.thread_id.trim(); - } - if (isRecord(parsed.usage)) { - usage = toCliUsage(parsed.usage) ?? usage; - } + for (const parsed of parseJsonRecordCandidates(line)) { + if (!sessionId) { + sessionId = pickCliSessionId(parsed, backend); + } + if (!sessionId && typeof parsed.thread_id === "string") { + sessionId = parsed.thread_id.trim(); + } + if (isRecord(parsed.usage)) { + usage = toCliUsage(parsed.usage) ?? usage; + } - const claudeResult = parseClaudeCliJsonlResult({ - providerId, - parsed, - sessionId, - usage, - }); - if (claudeResult) { - return claudeResult; - } + const claudeResult = parseClaudeCliJsonlResult({ + providerId, + parsed, + sessionId, + usage, + }); + if (claudeResult) { + return claudeResult; + } - const item = isRecord(parsed.item) ? parsed.item : null; - if (item && typeof item.text === "string") { - const type = typeof item.type === "string" ? item.type.toLowerCase() : ""; - if (!type || type.includes("message")) { - texts.push(item.text); + const item = isRecord(parsed.item) ? parsed.item : null; + if (item && typeof item.text === "string") { + const type = typeof item.type === "string" ? 
item.type.toLowerCase() : ""; + if (!type || type.includes("message")) { + texts.push(item.text); + } } } } diff --git a/src/agents/cli-runner.helpers.test.ts b/src/agents/cli-runner.helpers.test.ts index 8cbe04fd246..209f35c2631 100644 --- a/src/agents/cli-runner.helpers.test.ts +++ b/src/agents/cli-runner.helpers.test.ts @@ -1,7 +1,7 @@ import type { ImageContent } from "@mariozechner/pi-ai"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { MAX_IMAGE_BYTES } from "../media/constants.js"; -import { buildCliArgs, loadPromptRefImages } from "./cli-runner/helpers.js"; +import { buildCliArgs, loadPromptRefImages, resolveCliRunQueueKey } from "./cli-runner/helpers.js"; import * as promptImageUtils from "./pi-embedded-runner/run/images.js"; import type { SandboxFsBridge } from "./sandbox/fs-bridge.js"; import { SYSTEM_PROMPT_CACHE_BOUNDARY } from "./system-prompt-cache-boundary.js"; @@ -134,3 +134,51 @@ describe("buildCliArgs", () => { ).toEqual(["-p", "--append-system-prompt", "Stable prefix\nDynamic suffix"]); }); }); + +describe("resolveCliRunQueueKey", () => { + it("scopes Claude CLI serialization to the workspace for fresh runs", () => { + expect( + resolveCliRunQueueKey({ + backendId: "claude-cli", + serialize: true, + runId: "run-1", + workspaceDir: "/tmp/project-a", + }), + ).toBe("claude-cli:workspace:/tmp/project-a"); + }); + + it("scopes Claude CLI serialization to the resumed CLI session id", () => { + expect( + resolveCliRunQueueKey({ + backendId: "claude-cli", + serialize: true, + runId: "run-2", + workspaceDir: "/tmp/project-a", + cliSessionId: "claude-session-123", + }), + ).toBe("claude-cli:session:claude-session-123"); + }); + + it("keeps non-Claude backends on the provider lane when serialized", () => { + expect( + resolveCliRunQueueKey({ + backendId: "codex-cli", + serialize: true, + runId: "run-3", + workspaceDir: "/tmp/project-a", + cliSessionId: "thread-123", + }), + ).toBe("codex-cli"); + }); + + it("disables serialization 
when serialize=false", () => { + expect( + resolveCliRunQueueKey({ + backendId: "claude-cli", + serialize: false, + runId: "run-4", + workspaceDir: "/tmp/project-a", + }), + ).toBe("claude-cli:run-4"); + }); +}); diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index c5a5b830f60..e2caf37c9eb 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -13,6 +13,7 @@ import { appendImagePathsToPrompt, buildCliSupervisorScopeKey, buildCliArgs, + resolveCliRunQueueKey, enqueueCliRun, loadPromptRefImages, resolveCliNoOutputTimeoutMs, @@ -138,10 +139,13 @@ export async function executePreparedCliRun( useResume, }); - const serialize = backend.serialize ?? true; - const queueKey = serialize - ? context.backendResolved.id - : `${context.backendResolved.id}:${params.runId}`; + const queueKey = resolveCliRunQueueKey({ + backendId: context.backendResolved.id, + serialize: backend.serialize, + runId: params.runId, + workspaceDir: context.workspaceDir, + cliSessionId: useResume ? resolvedSessionId : undefined, + }); try { return await enqueueCliRun(queueKey, async () => { diff --git a/src/agents/cli-runner/helpers.ts b/src/agents/cli-runner/helpers.ts index bc709681fb6..1479194f130 100644 --- a/src/agents/cli-runner/helpers.ts +++ b/src/agents/cli-runner/helpers.ts @@ -9,6 +9,7 @@ import type { ThinkLevel } from "../../auto-reply/thinking.js"; import type { OpenClawConfig } from "../../config/config.js"; import type { CliBackendConfig } from "../../config/types.js"; import { MAX_IMAGE_BYTES } from "../../media/constants.js"; +import { isClaudeCliProvider } from "../../plugin-sdk/anthropic-cli.js"; import { buildTtsSystemPromptHint } from "../../tts/tts.js"; import { buildModelAliasLines } from "../model-alias-lines.js"; import { resolveDefaultModelForAgent } from "../model-selection.js"; @@ -28,6 +29,29 @@ export function enqueueCliRun(key: string, task: () => Promise): Promise