From 81ca7bc40b09dbb6386fc5c1cecf237c5f11004a Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 22 Apr 2026 13:44:18 +0530 Subject: [PATCH] fix: keep claude cli sessions warm (#69679) * feat(cli): keep claude cli sessions warm * test(cli): cover claude live session reuse * fix(cli): harden claude live session reuse * fix(cli): redact mcp session key logs * fix(cli): bound claude live session turns * fix(cli): reuse claude live sessions on resume * refactor(cli): canonicalize claude live argv * fix(cli): preserve claude live resume state * fix(cli): close dead claude live sessions * fix(cli): serialize claude live session creates * fix(cli): count pending claude live sessions * fix(cli): tighten claude live resume abort * fix(cli): reject closed claude live sessions * fix(cli): refresh claude live fingerprints * fix(cli): stabilize MCP resume hash * fix: preserve claude live inline resume (#69679) --------- Co-authored-by: Frank Yang --- CHANGELOG.md | 1 + docs/.generated/config-baseline.sha256 | 4 +- docs/gateway/cli-backends.md | 6 + extensions/anthropic/cli-backend.ts | 1 + src/agents/cli-runner.spawn.test.ts | 997 ++++++++++++++++++- src/agents/cli-runner.ts | 3 + src/agents/cli-runner/bundle-mcp.test.ts | 70 ++ src/agents/cli-runner/bundle-mcp.ts | 53 + src/agents/cli-runner/claude-live-session.ts | 913 +++++++++++++++++ src/agents/cli-runner/execute.ts | 94 +- src/agents/cli-runner/prepare.ts | 1 + src/agents/cli-runner/types.ts | 1 + src/agents/cli-session.test.ts | 34 + src/agents/cli-session.ts | 13 + src/config/schema.base.generated.ts | 4 + src/config/sessions/types.ts | 1 + src/config/types.agent-defaults.ts | 2 + src/config/zod-schema.core.ts | 1 + 18 files changed, 2172 insertions(+), 27 deletions(-) create mode 100644 src/agents/cli-runner/claude-live-session.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4451d224f37..e761d22cb69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai - Tokenjuice: add bundled native OpenClaw support for tokenjuice as an opt-in plugin that compacts noisy `exec` and `bash` tool results in Pi embedded runs. (#69946) Thanks @vincentkoc. - Providers/Tencent: add the bundled Tencent Cloud provider plugin with TokenHub and Token Plan onboarding, docs, `hy3-preview` model catalog entries, and tiered Hy3 pricing metadata. (#68460) Thanks @JuniperSling. - TUI: add local embedded mode for running terminal chats without a Gateway while keeping plugin approval gates enforced. (#66767) Thanks @fuller-stack-dev. +- CLI/Claude: keep compatible `claude-cli` runs on a warm stdio session and resume from the stored Claude session after Gateway restarts or idle exits. (#69679) Thanks @obviyus. ### Fixes diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index cd11be17227..fe51591a551 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -e77c14ad4db1be62275667537716917e4d0da73e1afb89be1edeb78d73346ae4 config-baseline.json -ed4e305904b4b954ffa72c07ea1900a116bfd874ac0c637227883abb99f753f9 config-baseline.core.json +3f08544c1a8143755a848aeb731f2eddf4f84cf70950c7d165f8889e01e4985d config-baseline.json +2190e81fcd754b96b48a1e012600f3b74fdb9b91eac280d8e3e038fcb73d6546 config-baseline.core.json 6c0069b971ae298ae68516ebcd3eae0e8c82820d2e8f42ecbd2f53a2f9077371 config-baseline.channel.json 9096ec947597b03f97eef44186a3102fd80ffb7f3e791fb64544464d4571448f config-baseline.plugin.json diff --git a/docs/gateway/cli-backends.md b/docs/gateway/cli-backends.md index b0587467662..646074f3308 100644 --- a/docs/gateway/cli-backends.md +++ b/docs/gateway/cli-backends.md @@ -143,6 +143,8 @@ The provider id becomes the left side of your model ref: 1. **Selects a backend** based on the provider prefix (`codex-cli/...`). 2. **Builds a system prompt** using the same OpenClaw prompt + workspace context. 3. **Executes the CLI** with a session id (if supported) so history stays consistent. + The bundled `claude-cli` backend keeps a Claude stdio process alive per + OpenClaw session and sends follow-up turns over stream-json stdin. 4. **Parses output** (JSON or plain text) and returns the final text. 5. **Persists session ids** per backend, so follow-ups reuse the same CLI session. @@ -179,6 +181,10 @@ child process environment for the run. - `always`: always send a session id (new UUID if none stored). - `existing`: only send a session id if one was stored before. - `none`: never send a session id. +- The bundled `claude-cli` backend uses `liveSession: "claude-stdio"` so + follow-up turns reuse the live Claude process while it is active. If the + Gateway restarts or the idle process exits, OpenClaw resumes from the stored + Claude session id. Serialization notes: diff --git a/extensions/anthropic/cli-backend.ts b/extensions/anthropic/cli-backend.ts index c7aeb31809c..2933fa27741 100644 --- a/extensions/anthropic/cli-backend.ts +++ b/extensions/anthropic/cli-backend.ts @@ -53,6 +53,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin { "{sessionId}", ], output: "jsonl", + liveSession: "claude-stdio", input: "stdin", modelArg: "--model", modelAliases: CLAUDE_CLI_MODEL_ALIASES, diff --git a/src/agents/cli-runner.spawn.test.ts b/src/agents/cli-runner.spawn.test.ts index 4663b3ecc0b..b4a4cde2f5f 100644 --- a/src/agents/cli-runner.spawn.test.ts +++ b/src/agents/cli-runner.spawn.test.ts @@ -3,6 +3,7 @@ import os from "node:os"; import path from "node:path"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { onAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js"; +import type { getProcessSupervisor } from "../process/supervisor/index.js"; import { makeBootstrapWarn as realMakeBootstrapWarn, resolveBootstrapContextForRun as realResolveBootstrapContextForRun, @@ -14,14 +15,23 @@ import { restoreCliRunnerPrepareTestDeps, supervisorSpawnMock, } from "./cli-runner.test-support.js"; +import { + buildClaudeLiveArgs, + resetClaudeLiveSessionsForTest, + runClaudeLiveSessionTurn, +} from "./cli-runner/claude-live-session.js"; import { buildCliEnvAuthLog, executePreparedCliRun } from "./cli-runner/execute.js"; import { buildSystemPrompt } from "./cli-runner/helpers.js"; import { setCliRunnerPrepareTestDeps } from "./cli-runner/prepare.js"; import type { PreparedCliRunContext } from "./cli-runner/types.js"; import { createClaudeApiErrorFixture } from "./test-helpers/claude-api-error-fixture.js"; +type ProcessSupervisor = ReturnType; +type SupervisorSpawnFn = ProcessSupervisor["spawn"]; + beforeEach(() => { resetAgentEventsForTest(); + resetClaudeLiveSessionsForTest(); restoreCliRunnerPrepareTestDeps(); supervisorSpawnMock.mockClear(); }); @@ -31,8 +41,11 @@ function buildPreparedCliRunContext(params: { model: string; runId: string; prompt?: string; + sessionId?: string; + sessionKey?: string; backend?: Partial; config?: PreparedCliRunContext["params"]["config"]; + mcpConfigHash?: string; skillsSnapshot?: PreparedCliRunContext["params"]["skillsSnapshot"]; workspaceDir?: string; }): PreparedCliRunContext { @@ -67,7 +80,8 @@ function buildPreparedCliRunContext(params: { const backend = { ...baseBackend, ...params.backend }; return { params: { - sessionId: "s1", + sessionId: params.sessionId ?? "s1", + sessionKey: params.sessionKey, sessionFile: "/tmp/session.jsonl", workspaceDir, config: params.config, @@ -89,6 +103,7 @@ function buildPreparedCliRunContext(params: { preparedBackend: { backend, env: {}, + ...(params.mcpConfigHash ? { mcpConfigHash: params.mcpConfigHash } : {}), }, reusableCliSession: {}, modelId: params.model, @@ -619,6 +634,986 @@ describe("runCliAgent spawn path", () => { } }); + it("reuses a Claude live session process across turns", async () => { + const agentEvents: unknown[] = []; + const stop = onAgentEvent((evt) => { + if (evt.stream === "assistant") { + agentEvents.push(evt.data); + } + }); + const writes: string[] = []; + let stdoutListener: ((chunk: string) => void) | undefined; + const stdin = { + write: vi.fn((data: string, cb?: (err?: Error | null) => void) => { + writes.push(data); + const prompt = (JSON.parse(data) as { message: { content: string } }).message.content; + const text = prompt === "first" ? "one" : "two"; + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-session-1" }), + JSON.stringify({ + type: "stream_event", + event: { + type: "content_block_delta", + delta: { type: "text_delta", text }, + }, + }), + JSON.stringify({ + type: "result", + session_id: "live-session-1", + result: text, + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel: vi.fn(), + }; + }); + + try { + const first = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-1", + prompt: "first", + backend: { + args: [ + "-p", + "--output-format", + "stream-json", + "--strict-mcp-config", + "--mcp-config", + "/tmp/mcp-one.json", + ], + liveSession: "claude-stdio", + }, + mcpConfigHash: "same-mcp-config", + }), + ); + const second = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-2", + prompt: "second", + backend: { + args: [ + "-p", + "--output-format", + "stream-json", + "--strict-mcp-config", + "--mcp-config", + "/tmp/mcp-two.json", + ], + liveSession: "claude-stdio", + }, + mcpConfigHash: "same-mcp-config", + }), + ); + + const spawnInput = supervisorSpawnMock.mock.calls[0]?.[0] as { + argv?: string[]; + stdinMode?: string; + }; + expect(first.text).toBe("one"); + expect(second.text).toBe("two"); + expect(supervisorSpawnMock).toHaveBeenCalledOnce(); + expect(spawnInput.stdinMode).toBe("pipe-open"); + expect(spawnInput.argv).toContain("--input-format"); + expect(spawnInput.argv).toContain("stream-json"); + expect(spawnInput.argv).toContain("--replay-user-messages"); + expect(spawnInput.argv).not.toContain("--session-id"); + expect(spawnInput.argv).toContain("/tmp/mcp-one.json"); + expect( + writes.map( + (entry) => (JSON.parse(entry) as { message: { content: string } }).message.content, + ), + ).toEqual(["first", "second"]); + expect(agentEvents).toEqual([ + { text: "one", delta: "one" }, + { text: "two", delta: "two" }, + ]); + } finally { + stop(); + } + }); + + it("reuses a Claude live session when resumed turns omit the system prompt arg", async () => { + let stdoutListener: ((chunk: string) => void) | undefined; + let turn = 0; + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + turn += 1; + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-system" }), + JSON.stringify({ + type: "result", + session_id: "live-system", + result: turn === 1 ? "one" : "two", + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel: vi.fn(), + }; + }); + + const backend = { + resumeArgs: ["-p", "--output-format", "stream-json", "--resume={sessionId}"], + liveSession: "claude-stdio" as const, + }; + const first = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-system-1", + prompt: "first", + backend, + }), + ); + const second = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-system-2", + prompt: "second", + backend, + }), + "live-system", + ); + + expect(first.text).toBe("one"); + expect(second.text).toBe("two"); + expect(supervisorSpawnMock).toHaveBeenCalledOnce(); + }); + + it("serializes concurrent Claude live session creation for the same key", async () => { + let stdoutListener: ((chunk: string) => void) | undefined; + let releaseSpawn: (() => void) | undefined; + let turn = 0; + const spawnReady = new Promise((resolve) => { + releaseSpawn = resolve; + }); + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + turn += 1; + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-concurrent" }), + JSON.stringify({ + type: "result", + session_id: "live-concurrent", + result: turn === 1 ? "one" : "two", + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + await spawnReady; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel: vi.fn(), + }; + }); + + const backend = { + liveSession: "claude-stdio" as const, + }; + const first = executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-concurrent-1", + prompt: "first", + backend, + }), + ); + const second = executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-concurrent-2", + prompt: "second", + backend, + }), + ); + await vi.waitFor(() => expect(supervisorSpawnMock).toHaveBeenCalledOnce()); + releaseSpawn?.(); + + await expect(Promise.all([first, second])).resolves.toEqual([ + expect.objectContaining({ text: "one" }), + expect.objectContaining({ text: "two" }), + ]); + expect(supervisorSpawnMock).toHaveBeenCalledOnce(); + }); + + it("counts pending Claude live session creates against the session cap", async () => { + let releaseSpawn: (() => void) | undefined; + const spawnReady = new Promise((resolve) => { + releaseSpawn = resolve; + }); + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + const spawnIndex = supervisorSpawnMock.mock.calls.length; + await spawnReady; + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + input.onStdout?.( + [ + JSON.stringify({ + type: "system", + subtype: "init", + session_id: `live-cap-${spawnIndex}`, + }), + JSON.stringify({ + type: "result", + session_id: `live-cap-${spawnIndex}`, + result: `ok-${spawnIndex}`, + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + return { + runId: `live-run-${spawnIndex}`, + pid: 2300 + spawnIndex, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel: vi.fn(), + }; + }); + + const backend = { + liveSession: "claude-stdio" as const, + }; + const runs = Array.from({ length: 17 }, (_, index) => + (() => { + const context = buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: `run-live-cap-${index}`, + prompt: `prompt ${index}`, + sessionId: `session-${index}`, + backend, + }); + return runClaudeLiveSessionTurn({ + context, + args: context.preparedBackend.backend.args ?? [], + env: {}, + prompt: `prompt ${index}`, + useResume: false, + noOutputTimeoutMs: 1_000, + getProcessSupervisor: () => ({ + spawn: (params: Parameters[0]) => + supervisorSpawnMock(params) as ReturnType, + cancel: vi.fn(), + cancelScope: vi.fn(), + reconcileOrphans: vi.fn(), + getRecord: vi.fn(), + }), + onAssistantDelta: () => {}, + cleanup: async () => {}, + }); + })(), + ); + + await vi.waitFor(() => expect(supervisorSpawnMock).toHaveBeenCalledTimes(16)); + const rejectedRun = runs[16]; + expect(rejectedRun).toBeDefined(); + await expect(rejectedRun).rejects.toThrow("Too many Claude CLI live sessions are active."); + releaseSpawn?.(); + await expect(Promise.all(runs.slice(0, 16))).resolves.toHaveLength(16); + expect(supervisorSpawnMock).toHaveBeenCalledTimes(16); + }); + + it("preserves Claude resume args when building live session argv", () => { + const backend: PreparedCliRunContext["preparedBackend"]["backend"] = { + command: "claude", + args: ["-p", "--output-format", "stream-json"], + output: "jsonl", + input: "stdin", + sessionArg: "--session-id", + systemPromptArg: "--append-system-prompt", + }; + + const args = buildClaudeLiveArgs({ + args: [ + "-p", + "--output-format", + "stream-json", + "--resume", + "claude-session", + "--session-id", + "openclaw-session", + "--append-system-prompt", + "old prompt", + ], + backend, + systemPrompt: "current prompt", + useResume: true, + }); + + expect(args).toContain("--resume"); + expect(args).toContain("claude-session"); + expect(args).not.toContain("--session-id"); + expect(args).not.toContain("openclaw-session"); + expect(args).not.toContain("--append-system-prompt"); + expect(args).not.toContain("old prompt"); + expect(args).not.toContain("current prompt"); + }); + + it("restarts Claude live sessions for env changes and fresh retries", async () => { + const cancels: Array> = []; + const turnResults = ["first-ok", "resume-ok", "env-ok", "fresh-ok"]; + let turnIndex = 0; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const spawnIndex = supervisorSpawnMock.mock.calls.length; + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + const cancel = vi.fn(); + cancels.push(cancel); + return { + runId: `live-run-${spawnIndex}`, + pid: 2345 + spawnIndex, + startedAtMs: Date.now(), + stdin: { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + const result = turnResults[turnIndex] ?? "ok"; + turnIndex += 1; + input.onStdout?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-session" }), + JSON.stringify({ + type: "result", + session_id: "live-session", + result, + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }, + wait: vi.fn(() => new Promise(() => {})), + cancel, + }; + }); + const runTurn = async (runId: string, args: string[], env: Record) => { + const context = buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId, + backend: { + liveSession: "claude-stdio", + resumeArgs: ["-p", "--output-format", "stream-json", "--resume", "{sessionId}"], + }, + }); + const result = await runClaudeLiveSessionTurn({ + context, + args, + env, + prompt: "hi", + useResume: args.some((entry) => entry.startsWith("--resume")), + noOutputTimeoutMs: 1_000, + getProcessSupervisor: () => ({ + spawn: (params: Parameters[0]) => + supervisorSpawnMock(params) as ReturnType, + cancel: vi.fn(), + cancelScope: vi.fn(), + reconcileOrphans: vi.fn(), + getRecord: vi.fn(), + }), + onAssistantDelta: () => {}, + cleanup: async () => {}, + }); + return result.output.text; + }; + const freshArgs = ["-p", "--output-format", "stream-json"]; + const resumeArgs = ["-p", "--output-format", "stream-json", "--resume", "live-session"]; + + await expect( + runTurn("run-live-fresh", freshArgs, { ANTHROPIC_BASE_URL: "https://one.example" }), + ).resolves.toBe("first-ok"); + await expect( + runTurn("run-live-resume", resumeArgs, { ANTHROPIC_BASE_URL: "https://one.example" }), + ).resolves.toBe("resume-ok"); + expect(supervisorSpawnMock).toHaveBeenCalledTimes(1); + expect(cancels[0]).not.toHaveBeenCalled(); + + await expect( + runTurn("run-live-env-change", resumeArgs, { ANTHROPIC_BASE_URL: "https://two.example" }), + ).resolves.toBe("env-ok"); + expect(supervisorSpawnMock).toHaveBeenCalledTimes(2); + expect(cancels[0]).toHaveBeenCalledWith("manual-cancel"); + + await expect( + runTurn("run-live-fresh-retry", freshArgs, { + ANTHROPIC_BASE_URL: "https://two.example", + }), + ).resolves.toBe("fresh-ok"); + + expect(supervisorSpawnMock).toHaveBeenCalledTimes(3); + expect(cancels[1]).toHaveBeenCalledWith("manual-cancel"); + expect(cancels[2]).not.toHaveBeenCalled(); + }); + + it("ignores non-JSON stdout lines from Claude live sessions", async () => { + let stdoutListener: ((chunk: string) => void) | undefined; + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + stdoutListener?.( + [ + "Claude CLI warning", + JSON.stringify({ type: "system", subtype: "init", session_id: "live-mixed" }), + JSON.stringify({ + type: "result", + session_id: "live-mixed", + result: "mixed-ok", + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel: vi.fn(), + }; + }); + + const result = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-mixed", + backend: { + liveSession: "claude-stdio", + }, + }), + ); + + expect(result.text).toBe("mixed-ok"); + }); + + it("fails Claude live turns on is_error results", async () => { + let stdoutListener: ((chunk: string) => void) | undefined; + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-error" }), + JSON.stringify({ + type: "result", + session_id: "live-error", + is_error: true, + result: "Credit balance is too low", + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel: vi.fn(), + }; + }); + + await expect( + executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-error", + backend: { + liveSession: "claude-stdio", + }, + }), + ), + ).rejects.toMatchObject({ + name: "FailoverError", + message: "Credit balance is too low", + }); + }); + + it("fails when Claude exits before a live turn starts", async () => { + supervisorSpawnMock.mockImplementationOnce(async () => ({ + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin: { + write: vi.fn(), + end: vi.fn(), + }, + wait: vi.fn(async () => ({ + reason: "exit", + exitCode: 1, + exitSignal: null, + durationMs: 1, + stdout: "", + stderr: "startup failed", + timedOut: false, + noOutputTimedOut: false, + })), + cancel: vi.fn(), + })); + + await expect( + executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-startup-exit", + backend: { + liveSession: "claude-stdio", + }, + }), + ), + ).rejects.toThrow("Claude CLI live session closed before handling the turn"); + }); + + it("restarts the Claude live process after request abort", async () => { + const abortController = new AbortController(); + let stdoutListener: ((chunk: string) => void) | undefined; + const cancels: Array> = []; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + const spawnIndex = supervisorSpawnMock.mock.calls.length; + const cancel = vi.fn(); + cancels.push(cancel); + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + if (spawnIndex === 2) { + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-abort-2" }), + JSON.stringify({ + type: "result", + session_id: "live-abort-2", + result: "second-ok", + }), + ].join("\n") + "\n", + ); + } + cb?.(); + }), + end: vi.fn(), + }; + return { + runId: `live-run-${spawnIndex}`, + pid: 2345 + spawnIndex, + startedAtMs: Date.now(), + stdin, + wait: vi.fn( + () => + new Promise((resolve) => { + if (spawnIndex === 1) { + cancel.mockImplementationOnce(() => { + resolve({ + reason: "manual-cancel", + exitCode: null, + exitSignal: null, + durationMs: 50, + stdout: "", + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }); + } + }), + ), + cancel, + }; + }); + + const firstContext = buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-abort-1", + backend: { + liveSession: "claude-stdio", + }, + }); + firstContext.params.abortSignal = abortController.signal; + const first = executePreparedCliRun(firstContext); + + await vi.waitFor(() => { + expect(supervisorSpawnMock).toHaveBeenCalledTimes(1); + }); + abortController.abort(); + + await expect(first).rejects.toMatchObject({ name: "AbortError" }); + expect(cancels[0]).toHaveBeenCalledWith("manual-cancel"); + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-abort" }), + JSON.stringify({ + type: "result", + session_id: "live-abort", + result: "discarded", + }), + ].join("\n") + "\n", + ); + + const second = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-abort-2", + backend: { + liveSession: "claude-stdio", + }, + }), + ); + + expect(second.text).toBe("second-ok"); + expect(supervisorSpawnMock).toHaveBeenCalledTimes(2); + }); + + it("restarts Claude live sessions when selected skills change", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-live-skills-")); + const weatherDir = path.join(workspaceDir, "skills", "weather"); + const gitDir = path.join(workspaceDir, "skills", "git"); + await fs.mkdir(weatherDir, { recursive: true }); + await fs.mkdir(gitDir, { recursive: true }); + await fs.writeFile(path.join(weatherDir, "SKILL.md"), "weather instructions\n", "utf-8"); + await fs.writeFile(path.join(gitDir, "SKILL.md"), "git instructions\n", "utf-8"); + + const cancels: Array> = []; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const spawnIndex = supervisorSpawnMock.mock.calls.length; + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + const cancel = vi.fn(); + cancels.push(cancel); + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + const text = spawnIndex === 1 ? "weather-ok" : "git-ok"; + input.onStdout?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: `live-${spawnIndex}` }), + JSON.stringify({ + type: "result", + session_id: `live-${spawnIndex}`, + result: text, + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + return { + runId: `live-run-${spawnIndex}`, + pid: 2345 + spawnIndex, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel, + }; + }); + + try { + const first = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-skills-1", + prompt: "first", + workspaceDir, + backend: { + liveSession: "claude-stdio", + }, + skillsSnapshot: { + prompt: "weather", + skills: [{ name: "weather" }], + resolvedSkills: [ + { + name: "weather", + description: "Weather instructions.", + filePath: path.join(weatherDir, "SKILL.md"), + baseDir: weatherDir, + source: "test", + sourceInfo: { + path: weatherDir, + source: "test", + scope: "project", + origin: "top-level", + baseDir: weatherDir, + }, + disableModelInvocation: false, + }, + ], + }, + }), + ); + const second = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-skills-2", + prompt: "second", + workspaceDir, + backend: { + liveSession: "claude-stdio", + }, + skillsSnapshot: { + prompt: "git", + skills: [{ name: "git" }], + resolvedSkills: [ + { + name: "git", + description: "Git instructions.", + filePath: path.join(gitDir, "SKILL.md"), + baseDir: gitDir, + source: "test", + sourceInfo: { + path: gitDir, + source: "test", + scope: "project", + origin: "top-level", + baseDir: gitDir, + }, + disableModelInvocation: false, + }, + ], + }, + }), + ); + + expect(first.text).toBe("weather-ok"); + expect(second.text).toBe("git-ok"); + expect(supervisorSpawnMock).toHaveBeenCalledTimes(2); + expect(cancels[0]).toHaveBeenCalledWith("manual-cancel"); + expect(cancels[1]).not.toHaveBeenCalled(); + } finally { + await fs.rm(workspaceDir, { recursive: true, force: true }); + } + }); + + it("closes idle Claude live sessions after ten minutes", async () => { + vi.useFakeTimers(); + const writes: string[] = []; + let stdoutListener: ((chunk: string) => void) | undefined; + const cancel = vi.fn(); + const stdin = { + write: vi.fn((data: string, cb?: (err?: Error | null) => void) => { + writes.push(data); + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-session-idle" }), + JSON.stringify({ + type: "result", + session_id: "live-session-idle", + result: "idle-ok", + }), + ].join("\n") + "\n", + ); + cb?.(); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel, + }; + }); + + try { + const result = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-idle", + prompt: "idle", + backend: { + liveSession: "claude-stdio", + }, + }), + ); + + expect(result.text).toBe("idle-ok"); + expect(cancel).not.toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(10 * 60 * 1_000 - 1); + expect(cancel).not.toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(1); + expect(cancel).toHaveBeenCalledWith("manual-cancel"); + expect( + writes.map( + (entry) => (JSON.parse(entry) as { message: { content: string } }).message.content, + ), + ).toEqual(["idle"]); + } finally { + vi.useRealTimers(); + } + }); + + it("does not surface stale stderr after a later Claude live exit", async () => { + let stdoutListener: ((chunk: string) => void) | undefined; + let stderrListener: ((chunk: string) => void) | undefined; + let resolveExit!: (value: { + reason: "exit"; + exitCode: number; + exitSignal: null; + durationMs: number; + stdout: string; + stderr: string; + timedOut: false; + noOutputTimedOut: false; + }) => void; + const wait = new Promise<{ + reason: "exit"; + exitCode: number; + exitSignal: null; + durationMs: number; + stdout: string; + stderr: string; + timedOut: false; + noOutputTimedOut: false; + }>((resolve) => { + resolveExit = resolve; + }); + let writeCount = 0; + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + writeCount += 1; + if (writeCount === 1) { + stderrListener?.("stale stderr from first turn"); + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-stderr" }), + JSON.stringify({ + type: "result", + session_id: "live-stderr", + result: "first-ok", + }), + ].join("\n") + "\n", + ); + cb?.(); + return; + } + cb?.(); + resolveExit({ + reason: "exit", + exitCode: 1, + exitSignal: null, + durationMs: 50, + stdout: "", + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementationOnce(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { + onStdout?: (chunk: string) => void; + onStderr?: (chunk: string) => void; + }; + stdoutListener = input.onStdout; + stderrListener = input.onStderr; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => wait), + cancel: vi.fn(), + }; + }); + + const first = await executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-stderr-1", + prompt: "first", + backend: { + liveSession: "claude-stdio", + }, + }), + ); + const second = executePreparedCliRun( + buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-stderr-2", + prompt: "second", + backend: { + liveSession: "claude-stdio", + }, + }), + ); + + expect(first.text).toBe("first-ok"); + await expect(second).rejects.toMatchObject({ + name: "FailoverError", + message: "Claude CLI failed.", + }); + }); + it("surfaces nested Claude stream-json API errors instead of raw event output", async () => { const { message, jsonl } = createClaudeApiErrorFixture(); diff --git a/src/agents/cli-runner.ts b/src/agents/cli-runner.ts index a4babfd378c..aba115367b1 100644 --- a/src/agents/cli-runner.ts +++ b/src/agents/cli-runner.ts @@ -78,6 +78,9 @@ export async function runPreparedCliAgent( ...(context.preparedBackend.mcpConfigHash ? { mcpConfigHash: context.preparedBackend.mcpConfigHash } : {}), + ...(context.preparedBackend.mcpResumeHash + ? { mcpResumeHash: context.preparedBackend.mcpResumeHash } + : {}), }, } : {}), diff --git a/src/agents/cli-runner/bundle-mcp.test.ts b/src/agents/cli-runner/bundle-mcp.test.ts index 20b62a9c66b..8da07818527 100644 --- a/src/agents/cli-runner/bundle-mcp.test.ts +++ b/src/agents/cli-runner/bundle-mcp.test.ts @@ -103,6 +103,7 @@ describe("prepareCliBundleMcpConfig", () => { }; expect(raw.mcpServers?.bundleProbe?.args).toEqual([await fs.realpath(bundleProbeServerPath)]); expect(prepared.mcpConfigHash).toMatch(/^[0-9a-f]{64}$/); + expect(prepared.mcpResumeHash).toMatch(/^[0-9a-f]{64}$/); await prepared.cleanup?.(); }); @@ -189,6 +190,75 @@ describe("prepareCliBundleMcpConfig", () => { await prepared.cleanup?.(); }); + it("stabilizes the resume hash when only the OpenClaw loopback port changes", async () => { + const first = await prepareBundleProbeCliConfig({ + additionalConfig: { + mcpServers: { + openclaw: { + type: "http", + url: "http://127.0.0.1:23119/mcp", + headers: { + Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}", + }, + }, + }, + }, + }); + const second = await prepareBundleProbeCliConfig({ + additionalConfig: { + mcpServers: { + openclaw: { + type: "http", + url: "http://127.0.0.1:24567/mcp", + headers: { + Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}", + }, + }, + }, + }, + }); + + expect(first.mcpConfigHash).not.toBe(second.mcpConfigHash); + expect(first.mcpResumeHash).toBe(second.mcpResumeHash); + + await first.cleanup?.(); + await second.cleanup?.(); + }); + + it("changes the resume hash when stable MCP semantics change", async () => { + const first = await prepareBundleProbeCliConfig({ + additionalConfig: { + mcpServers: { + openclaw: { + type: "http", + url: "http://127.0.0.1:23119/mcp", + headers: { + Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}", + }, + }, + }, + }, + }); + const second = await prepareBundleProbeCliConfig({ + additionalConfig: { + mcpServers: { + openclaw: { + type: "http", + url: "http://127.0.0.1:23119/other", + headers: { + Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}", + }, + }, + }, + }, + }); + + expect(first.mcpResumeHash).not.toBe(second.mcpResumeHash); + + await first.cleanup?.(); + await second.cleanup?.(); + }); + it("preserves extra env values alongside generated MCP config", async () => { const workspaceDir = await tempHarness.createTempDir("openclaw-cli-bundle-mcp-env-"); diff --git a/src/agents/cli-runner/bundle-mcp.ts b/src/agents/cli-runner/bundle-mcp.ts index 06503763fb6..7be248b4bdc 100644 --- a/src/agents/cli-runner/bundle-mcp.ts +++ b/src/agents/cli-runner/bundle-mcp.ts @@ -22,6 +22,7 @@ type PreparedCliBundleMcpConfig = { backend: CliBackendConfig; cleanup?: () => Promise; mcpConfigHash?: string; + mcpResumeHash?: string; env?: Record; }; @@ -255,6 +256,49 @@ async function writeGeminiSystemSettings( }; } +function sortJsonValue(value: unknown): unknown { + if (Array.isArray(value)) { + return value.map((entry) => sortJsonValue(entry)); + } + if (!isRecord(value)) { + return value; + } + return Object.fromEntries( + Object.keys(value) + .toSorted() + .map((key) => [key, sortJsonValue(value[key])]), + ); +} + +function normalizeOpenClawLoopbackUrl(value: string): string { + const match = + /^(http:\/\/(?:127\.0\.0\.1|localhost|\[::1\])):\d+(\/mcp)$/.exec(value.trim()) ?? undefined; + if (!match) { + return value; + } + return `${match[1]}:${match[2]}`; +} + +function canonicalizeBundleMcpConfigForResume(config: BundleMcpConfig): BundleMcpConfig { + const canonicalServers = Object.fromEntries( + Object.entries(config.mcpServers).map(([name, server]) => { + if (name !== "openclaw" || typeof server.url !== "string") { + return [name, sortJsonValue(server)]; + } + return [ + name, + sortJsonValue({ + ...server, + url: normalizeOpenClawLoopbackUrl(server.url), + }), + ]; + }), + ) as BundleMcpConfig["mcpServers"]; + return { + mcpServers: sortJsonValue(canonicalServers) as BundleMcpConfig["mcpServers"], + }; +} + async function prepareModeSpecificBundleMcpConfig(params: { mode: CliBundleMcpMode; backend: CliBackendConfig; @@ -263,6 +307,12 @@ async function prepareModeSpecificBundleMcpConfig(params: { }): Promise { const serializedConfig = `${JSON.stringify(params.mergedConfig, null, 2)}\n`; const mcpConfigHash = crypto.createHash("sha256").update(serializedConfig).digest("hex"); + const serializedResumeConfig = `${JSON.stringify( + canonicalizeBundleMcpConfigForResume(params.mergedConfig), + null, + 2, + )}\n`; + const mcpResumeHash = crypto.createHash("sha256").update(serializedResumeConfig).digest("hex"); if (params.mode === "codex-config-overrides") { return { @@ -275,6 +325,7 @@ async function prepareModeSpecificBundleMcpConfig(params: { ), }, mcpConfigHash, + mcpResumeHash, env: params.env, }; } @@ -284,6 +335,7 @@ async function prepareModeSpecificBundleMcpConfig(params: { return { backend: params.backend, mcpConfigHash, + mcpResumeHash, env: settings.env, cleanup: settings.cleanup, }; @@ -302,6 +354,7 @@ async function prepareModeSpecificBundleMcpConfig(params: { ), }, mcpConfigHash, + mcpResumeHash, env: params.env, cleanup: async () => { await fs.rm(tempDir, { recursive: true, force: true }); diff --git a/src/agents/cli-runner/claude-live-session.ts b/src/agents/cli-runner/claude-live-session.ts new file mode 100644 index 00000000000..077b8154015 --- /dev/null +++ b/src/agents/cli-runner/claude-live-session.ts @@ -0,0 +1,913 @@ +import crypto from "node:crypto"; +import type { ReplyBackendHandle } from "../../auto-reply/reply/reply-run-registry.js"; +import type { CliBackendConfig } from "../../config/types.js"; +import { + createCliJsonlStreamingParser, + extractCliErrorMessage, + parseCliOutput, + type CliOutput, + type CliStreamingDelta, +} from "../cli-output.js"; +import { FailoverError, resolveFailoverStatus } from "../failover-error.js"; +import { classifyFailoverReason } from "../pi-embedded-helpers.js"; +import { stripSystemPromptCacheBoundary } from "../system-prompt-cache-boundary.js"; +import { cliBackendLog } from "./log.js"; +import type { PreparedCliRunContext } from "./types.js"; + +type ProcessSupervisor = ReturnType< + typeof import("../../process/supervisor/index.js").getProcessSupervisor +>; +type ManagedRun = Awaited>; +type ClaudeLiveTurn = { + backend: CliBackendConfig; + startedAtMs: number; + rawLines: string[]; + rawChars: number; + sessionId?: string; + noOutputTimer: NodeJS.Timeout | null; + timeoutTimer: NodeJS.Timeout | null; + streamingParser: ReturnType; + resolve: (output: CliOutput) => void; + reject: (error: unknown) => void; +}; +type ClaudeLiveSession = { + key: string; + fingerprint: string; + managedRun: ManagedRun; + providerId: string; + modelId: string; + noOutputTimeoutMs: number; + stderr: string; + stdoutBuffer: string; + currentTurn: ClaudeLiveTurn | null; + drainTimer: NodeJS.Timeout | null; + drainingAbortedTurn: boolean; + idleTimer: NodeJS.Timeout | null; + cleanup: () => Promise; + cleanupDone: boolean; + closing: boolean; +}; +type ClaudeLiveRunResult = { + output: CliOutput; +}; + +const CLAUDE_LIVE_IDLE_TIMEOUT_MS = 10 * 60 * 1_000; +const CLAUDE_LIVE_MAX_SESSIONS = 16; +const CLAUDE_LIVE_MAX_STDOUT_BUFFER_CHARS = 256 * 1024; +const CLAUDE_LIVE_MAX_STDERR_CHARS = 64 * 1024; +const CLAUDE_LIVE_MAX_TURN_RAW_CHARS = 2 * 1024 * 1024; +const CLAUDE_LIVE_MAX_TURN_LINES = 5_000; +const liveSessions = new Map(); +const liveSessionCreates = new Map>(); + +function sha256(value: string): string { + return crypto.createHash("sha256").update(value).digest("hex"); +} + +export function resetClaudeLiveSessionsForTest(): void { + for (const session of liveSessions.values()) { + closeLiveSession(session, "restart"); + } + liveSessions.clear(); + liveSessionCreates.clear(); +} + +export function shouldUseClaudeLiveSession(context: PreparedCliRunContext): boolean { + return ( + context.backendResolved.id === "claude-cli" && + context.preparedBackend.backend.liveSession === "claude-stdio" && + context.preparedBackend.backend.output === "jsonl" && + context.preparedBackend.backend.input === "stdin" + ); +} + +function upsertArgValue(args: string[], flag: string, value: string): string[] { + const normalized: string[] = []; + for (let i = 0; i < args.length; i += 1) { + const arg = args[i] ?? ""; + if (arg === flag) { + i += 1; + continue; + } + if (arg.startsWith(`${flag}=`)) { + continue; + } + normalized.push(arg); + } + normalized.push(flag, value); + return normalized; +} + +function appendArg(args: string[], flag: string): string[] { + return args.includes(flag) ? args : [...args, flag]; +} + +function stripLiveProcessArgs(args: string[], backend: CliBackendConfig): string[] { + const liveProcessFlags = new Set( + [backend.sessionArg, backend.systemPromptArg, "--session-id"].filter( + (entry): entry is string => typeof entry === "string" && entry.length > 0, + ), + ); + const stripped: string[] = []; + for (let i = 0; i < args.length; i += 1) { + const arg = args[i] ?? ""; + if (liveProcessFlags.has(arg)) { + i += 1; + continue; + } + if ([...liveProcessFlags].some((flag) => arg.startsWith(`${flag}=`))) { + continue; + } + stripped.push(arg); + } + return stripped; +} + +function appendSystemPromptArg( + args: string[], + backend: CliBackendConfig, + systemPrompt: string, +): string[] { + const prompt = systemPrompt.trim(); + if (!backend.systemPromptArg || !prompt) { + return args; + } + return upsertArgValue(args, backend.systemPromptArg, stripSystemPromptCacheBoundary(prompt)); +} + +export function buildClaudeLiveArgs(params: { + args: string[]; + backend: CliBackendConfig; + systemPrompt: string; + useResume: boolean; +}): string[] { + return appendArg( + upsertArgValue( + upsertArgValue( + params.useResume + ? stripLiveProcessArgs(params.args, params.backend) + : appendSystemPromptArg( + stripLiveProcessArgs(params.args, params.backend), + params.backend, + params.systemPrompt, + ), + "--input-format", + "stream-json", + ), + "--permission-prompt-tool", + "stdio", + ), + "--replay-user-messages", + ); +} + +function buildClaudeLiveKey(context: PreparedCliRunContext): string { + return `${context.backendResolved.id}:${sha256( + JSON.stringify({ + agentAccountId: context.params.agentAccountId, + agentId: context.params.agentId, + authProfileId: context.effectiveAuthProfileId, + sessionId: context.params.sessionId, + sessionKey: context.params.sessionKey, + }), + )}`; +} + +function buildClaudeLiveFingerprint(params: { + context: PreparedCliRunContext; + argv: string[]; + env: Record; +}): string { + const normalizeMcpConfigPath = Boolean(params.context.preparedBackend.mcpConfigHash); + const skillSnapshot = params.context.params.skillsSnapshot; + const skillsFingerprint = skillSnapshot + ? sha256( + JSON.stringify({ + promptHash: sha256(skillSnapshot.prompt), + skillFilter: skillSnapshot.skillFilter, + skills: skillSnapshot.skills, + resolvedSkills: (skillSnapshot.resolvedSkills ?? []).map((skill) => ({ + name: skill.name, + description: skill.description, + filePath: skill.filePath, + sourceInfo: skill.sourceInfo, + })), + version: skillSnapshot.version, + }), + ) + : undefined; + const normalizePluginDir = Boolean(skillsFingerprint); + const omittedValueFlags = new Set( + [params.context.preparedBackend.backend.systemPromptArg, "--resume", "-r"].filter( + (entry): entry is string => typeof entry === "string" && entry.length > 0, + ), + ); + const unstableValueFlags = new Set( + [ + params.context.preparedBackend.backend.sessionArg, + "--session-id", + normalizeMcpConfigPath ? "--mcp-config" : undefined, + normalizePluginDir ? "--plugin-dir" : undefined, + ].filter((entry): entry is string => typeof entry === "string" && entry.length > 0), + ); + const stableArgv: string[] = []; + for (let i = 0; i < params.argv.length; i += 1) { + const entry = params.argv[i] ?? ""; + if (omittedValueFlags.has(entry)) { + i += 1; + continue; + } + if ([...omittedValueFlags].some((flag) => entry.startsWith(`${flag}=`))) { + continue; + } + if (unstableValueFlags.has(entry)) { + stableArgv.push(""); + i += 1; + continue; + } + if ([...unstableValueFlags].some((flag) => entry.startsWith(`${flag}=`))) { + stableArgv.push(""); + continue; + } + stableArgv.push(entry); + } + return JSON.stringify({ + command: params.context.preparedBackend.backend.command, + workspaceDirHash: sha256(params.context.workspaceDir), + provider: params.context.params.provider, + model: params.context.normalizedModel, + systemPromptHash: sha256(params.context.systemPrompt), + authProfileIdHash: params.context.effectiveAuthProfileId + ? sha256(params.context.effectiveAuthProfileId) + : undefined, + authEpochHash: params.context.authEpoch ? sha256(params.context.authEpoch) : undefined, + extraSystemPromptHash: params.context.extraSystemPromptHash, + mcpConfigHash: params.context.preparedBackend.mcpConfigHash, + skillsFingerprint, + argv: stableArgv, + env: Object.keys(params.env) + .toSorted() + .map((key) => [key, params.env[key] ? sha256(params.env[key]) : ""]), + }); +} + +function createAbortError(): Error { + const error = new Error("CLI run aborted"); + error.name = "AbortError"; + return error; +} + +function clearTurnTimers(turn: ClaudeLiveTurn): void { + if (turn.noOutputTimer) { + clearTimeout(turn.noOutputTimer); + turn.noOutputTimer = null; + } + if (turn.timeoutTimer) { + clearTimeout(turn.timeoutTimer); + turn.timeoutTimer = null; + } +} + +function clearDrainTimer(session: ClaudeLiveSession): void { + if (session.drainTimer) { + clearTimeout(session.drainTimer); + session.drainTimer = null; + } +} + +function finishTurn(session: ClaudeLiveSession, output: CliOutput): void { + const turn = session.currentTurn; + if (!turn) { + return; + } + cliBackendLog.info( + `claude live session turn: provider=${session.providerId} model=${session.modelId} durationMs=${Date.now() - turn.startedAtMs} rawLines=${turn.rawLines.length}`, + ); + clearTurnTimers(turn); + turn.streamingParser.finish(); + session.currentTurn = null; + turn.resolve(output); + scheduleIdleClose(session); +} + +function failTurn(session: ClaudeLiveSession, error: unknown): void { + const turn = session.currentTurn; + if (!turn) { + return; + } + const errorKind = error instanceof Error ? error.name : typeof error; + cliBackendLog.warn( + `claude live session turn failed: provider=${session.providerId} model=${session.modelId} durationMs=${Date.now() - turn.startedAtMs} error=${errorKind}`, + ); + clearTurnTimers(turn); + turn.streamingParser.finish(); + session.currentTurn = null; + turn.reject(error); +} + +function abortTurn(session: ClaudeLiveSession, error: Error): void { + const turn = session.currentTurn; + if (!turn) { + return; + } + closeLiveSession(session, "abort", error); +} + +function cleanupLiveSession(session: ClaudeLiveSession): void { + if (session.cleanupDone) { + return; + } + session.cleanupDone = true; + void session.cleanup(); +} + +function closeLiveSession( + session: ClaudeLiveSession, + reason: "idle" | "restart" | "abort", + error?: unknown, +): void { + if (session.closing) { + return; + } + cliBackendLog.info( + `claude live session close: provider=${session.providerId} model=${session.modelId} reason=${reason}`, + ); + session.closing = true; + if (session.idleTimer) { + clearTimeout(session.idleTimer); + session.idleTimer = null; + } + clearDrainTimer(session); + if (liveSessions.get(session.key) === session) { + liveSessions.delete(session.key); + } + if (error) { + failTurn(session, error); + } + session.managedRun.cancel("manual-cancel"); + cleanupLiveSession(session); +} + +function scheduleIdleClose(session: ClaudeLiveSession): void { + if (session.idleTimer) { + clearTimeout(session.idleTimer); + } + session.idleTimer = setTimeout(() => { + if (!session.currentTurn) { + closeLiveSession(session, "idle"); + } + }, CLAUDE_LIVE_IDLE_TIMEOUT_MS); +} + +function createTimeoutError(session: ClaudeLiveSession, message: string): FailoverError { + return new FailoverError(message, { + reason: "timeout", + provider: session.providerId, + model: session.modelId, + status: resolveFailoverStatus("timeout"), + }); +} + +function createOutputLimitError(session: ClaudeLiveSession, message: string): FailoverError { + return new FailoverError(message, { + reason: "format", + provider: session.providerId, + model: session.modelId, + status: resolveFailoverStatus("format"), + }); +} + +function resetNoOutputTimer(session: ClaudeLiveSession): void { + const turn = session.currentTurn; + if (!turn) { + return; + } + if (turn.noOutputTimer) { + clearTimeout(turn.noOutputTimer); + } + turn.noOutputTimer = setTimeout(() => { + closeLiveSession( + session, + "abort", + createTimeoutError( + session, + `CLI produced no output for ${Math.round(session.noOutputTimeoutMs / 1000)}s and was terminated.`, + ), + ); + }, session.noOutputTimeoutMs); +} + +function parseSessionId(parsed: Record): string | undefined { + const sessionId = + typeof parsed.session_id === "string" + ? parsed.session_id.trim() + : typeof parsed.sessionId === "string" + ? parsed.sessionId.trim() + : ""; + return sessionId || undefined; +} + +function isRecord(value: unknown): value is Record { + return Boolean(value && typeof value === "object" && !Array.isArray(value)); +} + +function parseClaudeLiveJsonLine( + session: ClaudeLiveSession, + trimmed: string, +): Record | null { + if (trimmed.length > CLAUDE_LIVE_MAX_STDOUT_BUFFER_CHARS) { + closeLiveSession( + session, + "abort", + createOutputLimitError(session, "Claude CLI JSONL line exceeded output limit."), + ); + return null; + } + let parsed: unknown; + try { + parsed = JSON.parse(trimmed); + } catch { + return null; + } + return isRecord(parsed) ? parsed : null; +} + +function createResultError( + session: ClaudeLiveSession, + parsed: Record, + raw: string, +): FailoverError { + const result = typeof parsed.result === "string" ? parsed.result.trim() : ""; + const message = extractCliErrorMessage(raw) ?? (result || "Claude CLI failed."); + const reason = classifyFailoverReason(message, { provider: session.providerId }) ?? "unknown"; + return new FailoverError(message, { + reason, + provider: session.providerId, + model: session.modelId, + status: resolveFailoverStatus(reason), + }); +} + +function handleClaudeLiveLine(session: ClaudeLiveSession, line: string): void { + const turn = session.currentTurn; + const trimmed = line.trim(); + if (!trimmed) { + return; + } + const parsed = parseClaudeLiveJsonLine(session, trimmed); + if (!parsed) { + return; + } + if (session.drainingAbortedTurn) { + if (parsed.type === "result") { + const turnToClear = session.currentTurn; + if (turnToClear) { + clearTurnTimers(turnToClear); + session.currentTurn = null; + } + session.drainingAbortedTurn = false; + clearDrainTimer(session); + scheduleIdleClose(session); + } + return; + } + if (!turn) { + return; + } + turn.rawChars += trimmed.length + 1; + if ( + turn.rawChars > CLAUDE_LIVE_MAX_TURN_RAW_CHARS || + turn.rawLines.length >= CLAUDE_LIVE_MAX_TURN_LINES + ) { + closeLiveSession( + session, + "abort", + createOutputLimitError(session, "Claude CLI turn output exceeded limit."), + ); + return; + } + turn.rawLines.push(trimmed); + turn.streamingParser.push(`${trimmed}\n`); + turn.sessionId = parseSessionId(parsed) ?? turn.sessionId; + if (parsed.type !== "result") { + return; + } + const raw = turn.rawLines.join("\n"); + if (parsed.is_error === true) { + failTurn(session, createResultError(session, parsed, raw)); + scheduleIdleClose(session); + return; + } + finishTurn( + session, + parseCliOutput({ + raw, + backend: turn.backend, + providerId: session.providerId, + outputMode: "jsonl", + fallbackSessionId: turn.sessionId, + }), + ); +} + +function handleClaudeStdout(session: ClaudeLiveSession, chunk: string) { + resetNoOutputTimer(session); + session.stdoutBuffer += chunk; + if (session.stdoutBuffer.length > CLAUDE_LIVE_MAX_STDOUT_BUFFER_CHARS) { + closeLiveSession( + session, + "abort", + createOutputLimitError(session, "Claude CLI stdout buffer exceeded limit."), + ); + return; + } + const lines = session.stdoutBuffer.split(/\r?\n/g); + session.stdoutBuffer = lines.pop() ?? ""; + try { + for (const line of lines) { + handleClaudeLiveLine(session, line); + } + } catch (error) { + closeLiveSession(session, "abort", error); + } +} + +function handleClaudeExit(session: ClaudeLiveSession, exitCode: number | null): void { + session.closing = true; + if (session.idleTimer) { + clearTimeout(session.idleTimer); + session.idleTimer = null; + } + clearDrainTimer(session); + if (liveSessions.get(session.key) === session) { + liveSessions.delete(session.key); + } + cleanupLiveSession(session); + if (!session.currentTurn) { + return; + } + if (session.stdoutBuffer.trim()) { + try { + handleClaudeLiveLine(session, session.stdoutBuffer); + } catch (error) { + session.stdoutBuffer = ""; + failTurn(session, error); + return; + } + session.stdoutBuffer = ""; + } + if (!session.currentTurn) { + return; + } + const stderr = session.stderr.trim(); + const fallbackMessage = + exitCode === 0 ? "Claude CLI exited before completing the turn." : "Claude CLI failed."; + const message = extractCliErrorMessage(stderr) ?? (stderr || fallbackMessage); + if (exitCode === 0) { + failTurn(session, new Error(message)); + return; + } + const reason = classifyFailoverReason(message, { provider: session.providerId }) ?? "unknown"; + failTurn( + session, + new FailoverError(message, { + reason, + provider: session.providerId, + model: session.modelId, + status: resolveFailoverStatus(reason), + }), + ); +} + +function createClaudeUserInputMessage(content: string): string { + return `${JSON.stringify({ + type: "user", + session_id: "", + parent_tool_use_id: null, + message: { + role: "user", + content, + }, + })}\n`; +} + +async function writeTurnInput(session: ClaudeLiveSession, prompt: string): Promise { + const stdin = session.managedRun.stdin; + if (!stdin) { + throw new Error("Claude CLI live session stdin is unavailable"); + } + await new Promise((resolve, reject) => { + stdin.write(createClaudeUserInputMessage(prompt), (error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); +} + +async function createClaudeLiveSession(params: { + context: PreparedCliRunContext; + argv: string[]; + env: Record; + fingerprint: string; + key: string; + noOutputTimeoutMs: number; + supervisor: ProcessSupervisor; + cleanup: () => Promise; +}): Promise { + let session: ClaudeLiveSession | null = null; + const managedRun = await params.supervisor.spawn({ + sessionId: params.context.params.sessionId, + backendId: params.context.backendResolved.id, + scopeKey: `claude-live:${params.key}`, + replaceExistingScope: true, + mode: "child", + argv: params.argv, + cwd: params.context.workspaceDir, + env: params.env, + stdinMode: "pipe-open", + captureOutput: false, + onStdout: (chunk) => { + if (session) { + handleClaudeStdout(session, chunk); + } + }, + onStderr: (chunk) => { + if (session) { + session.stderr += chunk; + if (session.stderr.length > CLAUDE_LIVE_MAX_STDERR_CHARS) { + closeLiveSession( + session, + "abort", + createOutputLimitError(session, "Claude CLI stderr exceeded limit."), + ); + return; + } + resetNoOutputTimer(session); + } + }, + }); + session = { + key: params.key, + fingerprint: params.fingerprint, + managedRun, + providerId: params.context.params.provider, + modelId: params.context.modelId, + noOutputTimeoutMs: params.noOutputTimeoutMs, + stderr: "", + stdoutBuffer: "", + currentTurn: null, + drainTimer: null, + drainingAbortedTurn: false, + idleTimer: null, + cleanup: params.cleanup, + cleanupDone: false, + closing: false, + }; + void managedRun.wait().then( + (exit) => handleClaudeExit(session, exit.exitCode), + (error) => { + if (session) { + closeLiveSession(session, "abort", error); + } + }, + ); + liveSessions.set(params.key, session); + cliBackendLog.info( + `claude live session start: provider=${session.providerId} model=${session.modelId} activeSessions=${liveSessions.size}`, + ); + return session; +} + +function createTurn(params: { + context: PreparedCliRunContext; + noOutputTimeoutMs: number; + onAssistantDelta: (delta: CliStreamingDelta) => void; + session: ClaudeLiveSession; + resolve: (output: CliOutput) => void; + reject: (error: unknown) => void; +}): ClaudeLiveTurn { + const turn: ClaudeLiveTurn = { + backend: params.context.preparedBackend.backend, + startedAtMs: Date.now(), + rawLines: [], + rawChars: 0, + noOutputTimer: null, + timeoutTimer: null, + streamingParser: createCliJsonlStreamingParser({ + backend: params.context.preparedBackend.backend, + providerId: params.context.backendResolved.id, + onAssistantDelta: params.onAssistantDelta, + }), + resolve: params.resolve, + reject: params.reject, + }; + turn.noOutputTimer = setTimeout(() => { + closeLiveSession( + params.session, + "abort", + createTimeoutError( + params.session, + `CLI produced no output for ${Math.round(params.noOutputTimeoutMs / 1000)}s and was terminated.`, + ), + ); + }, params.noOutputTimeoutMs); + turn.timeoutTimer = setTimeout(() => { + closeLiveSession( + params.session, + "abort", + createTimeoutError( + params.session, + `CLI exceeded timeout (${Math.round(params.context.params.timeoutMs / 1000)}s) and was terminated.`, + ), + ); + }, params.context.params.timeoutMs); + return turn; +} + +function closeOldestIdleSession(): boolean { + for (const session of liveSessions.values()) { + if (!session.currentTurn && !session.drainingAbortedTurn) { + closeLiveSession(session, "idle"); + return true; + } + } + return false; +} + +function ensureLiveSessionCapacity(key: string, context: PreparedCliRunContext): void { + if ( + liveSessions.has(key) || + liveSessionCreates.has(key) || + liveSessions.size + liveSessionCreates.size < CLAUDE_LIVE_MAX_SESSIONS + ) { + return; + } + if (closeOldestIdleSession()) { + return; + } + throw new FailoverError("Too many Claude CLI live sessions are active.", { + reason: "rate_limit", + provider: context.params.provider, + model: context.modelId, + status: resolveFailoverStatus("rate_limit"), + }); +} + +export async function runClaudeLiveSessionTurn(params: { + context: PreparedCliRunContext; + args: string[]; + env: Record; + prompt: string; + useResume: boolean; + noOutputTimeoutMs: number; + getProcessSupervisor: () => ProcessSupervisor; + onAssistantDelta: (delta: CliStreamingDelta) => void; + cleanup: () => Promise; +}): Promise { + const key = buildClaudeLiveKey(params.context); + const resumeCapable = Boolean(params.context.preparedBackend.backend.resumeArgs?.length); + const argv = [ + params.context.preparedBackend.backend.command, + ...buildClaudeLiveArgs({ + args: params.args, + backend: params.context.preparedBackend.backend, + systemPrompt: params.context.systemPrompt, + useResume: params.useResume, + }), + ]; + const fingerprint = buildClaudeLiveFingerprint({ + context: params.context, + argv, + env: params.env, + }); + let cleanupDone = false; + const cleanup = async () => { + if (cleanupDone) { + return; + } + cleanupDone = true; + await params.cleanup(); + }; + let session = liveSessions.get(key) ?? null; + if (session && resumeCapable && !params.useResume) { + closeLiveSession(session, "restart"); + session = null; + } + if (session && session.fingerprint !== fingerprint) { + closeLiveSession(session, "restart"); + session = null; + } + let cleanupTurnArtifacts = Boolean(session); + try { + ensureLiveSessionCapacity(key, params.context); + } catch (error) { + await cleanup(); + throw error; + } + if (!session) { + const pendingSession = liveSessionCreates.get(key); + if (pendingSession) { + try { + session = await pendingSession; + } catch (error) { + await cleanup(); + throw error; + } + if (session.fingerprint !== fingerprint) { + closeLiveSession(session, "restart"); + session = null; + } else if (resumeCapable && !params.useResume) { + closeLiveSession(session, "restart"); + session = null; + } else { + cleanupTurnArtifacts = true; + } + } + if (!session) { + const createSession = createClaudeLiveSession({ + context: params.context, + argv, + env: params.env, + fingerprint, + key, + noOutputTimeoutMs: params.noOutputTimeoutMs, + supervisor: params.getProcessSupervisor(), + cleanup, + }).finally(() => { + if (liveSessionCreates.get(key) === createSession) { + liveSessionCreates.delete(key); + } + }); + liveSessionCreates.set(key, createSession); + try { + session = await createSession; + } catch (error) { + await cleanup(); + throw error; + } + } + } + if (cleanupTurnArtifacts && session) { + await cleanup(); + if (session.idleTimer) { + clearTimeout(session.idleTimer); + session.idleTimer = null; + } + cliBackendLog.info( + `claude live session reuse: provider=${session.providerId} model=${session.modelId}`, + ); + } + if (session.closing) { + await cleanup(); + throw new Error("Claude CLI live session closed before handling the turn"); + } + if (session.currentTurn || session.drainingAbortedTurn) { + throw new Error("Claude CLI live session is already handling a turn"); + } + const liveSession = session; + liveSession.noOutputTimeoutMs = params.noOutputTimeoutMs; + liveSession.stderr = ""; + + const outputPromise = new Promise((resolve, reject) => { + liveSession.currentTurn = createTurn({ + context: params.context, + noOutputTimeoutMs: params.noOutputTimeoutMs, + onAssistantDelta: params.onAssistantDelta, + session: liveSession, + resolve, + reject, + }); + }); + const abort = () => abortTurn(liveSession, createAbortError()); + const replyBackendHandle: ReplyBackendHandle | undefined = params.context.params.replyOperation + ? { + kind: "cli", + cancel: abort, + isStreaming: () => false, + } + : undefined; + params.context.params.abortSignal?.addEventListener("abort", abort, { once: true }); + if (replyBackendHandle) { + params.context.params.replyOperation?.attachBackend(replyBackendHandle); + } + try { + if (params.context.params.abortSignal?.aborted) { + abort(); + } else { + try { + await writeTurnInput(liveSession, params.prompt); + } catch (error) { + closeLiveSession(liveSession, "abort", error); + } + } + return { output: await outputPromise }; + } finally { + params.context.params.abortSignal?.removeEventListener("abort", abort); + if (replyBackendHandle) { + params.context.params.replyOperation?.detachBackend(replyBackendHandle); + } + } +} diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index fa6ea8cffe0..04c6549468d 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -17,6 +17,7 @@ import { FailoverError, resolveFailoverStatus } from "../failover-error.js"; import { classifyFailoverReason } from "../pi-embedded-helpers.js"; import { applyPluginTextReplacements } from "../plugin-text-transforms.js"; import { applySkillEnvOverridesFromSnapshot } from "../skills.js"; +import { runClaudeLiveSessionTurn, shouldUseClaudeLiveSession } from "./claude-live-session.js"; import { prepareClaudeCliSkillsPlugin } from "./claude-skills-plugin.js"; import { buildCliSupervisorScopeKey, @@ -155,7 +156,7 @@ function formatCliEnvKeyList(keys: readonly string[]): string { function buildCliEnvMcpLog(childEnv: Record): string { return [ `token=${childEnv.OPENCLAW_MCP_TOKEN ? "set" : "missing"}`, - `sessionKey=${childEnv.OPENCLAW_MCP_SESSION_KEY || ""}`, + `sessionKey=${childEnv.OPENCLAW_MCP_SESSION_KEY ? "set" : ""}`, `agentId=${childEnv.OPENCLAW_MCP_AGENT_ID || ""}`, `accountId=${childEnv.OPENCLAW_MCP_ACCOUNT_ID || ""}`, `messageChannel=${childEnv.OPENCLAW_MCP_MESSAGE_CHANNEL || ""}`, @@ -235,6 +236,7 @@ export async function executePreparedCliRun( backendId: context.backendResolved.id, skillsSnapshot: params.skillsSnapshot, }); + let claudeSkillsPluginCleanupOwned = false; const args = buildCliArgs({ backend, baseArgs: @@ -329,29 +331,71 @@ export async function executePreparedCliRun( timeoutMs: params.timeoutMs, useResume, }); - const streamingParser = - backend.output === "jsonl" - ? createCliJsonlStreamingParser({ - backend, - providerId: context.backendResolved.id, - onAssistantDelta: ({ text, delta }) => { - emitAgentEvent({ - runId: params.runId, - stream: "assistant", - data: { - text: applyPluginTextReplacements( - text, - context.backendResolved.textTransforms?.output, - ), - delta: applyPluginTextReplacements( - delta, - context.backendResolved.textTransforms?.output, - ), - }, - }); + const hasJsonlOutput = backend.output === "jsonl"; + if (shouldUseClaudeLiveSession(context)) { + if (!hasJsonlOutput) { + throw new Error("Claude live session requires JSONL streaming parser"); + } + claudeSkillsPluginCleanupOwned = true; + const liveResult = await runClaudeLiveSessionTurn({ + context, + args, + env, + prompt, + useResume, + noOutputTimeoutMs, + getProcessSupervisor: executeDeps.getProcessSupervisor, + onAssistantDelta: ({ text, delta }) => { + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { + text: applyPluginTextReplacements( + text, + context.backendResolved.textTransforms?.output, + ), + delta: applyPluginTextReplacements( + delta, + context.backendResolved.textTransforms?.output, + ), }, - }) - : null; + }); + }, + cleanup: claudeSkillsPlugin.cleanup, + }); + const rawText = liveResult.output.text; + return { + ...liveResult.output, + rawText, + finalPromptText: prompt, + text: applyPluginTextReplacements( + rawText, + context.backendResolved.textTransforms?.output, + ), + }; + } + const streamingParser = hasJsonlOutput + ? createCliJsonlStreamingParser({ + backend, + providerId: context.backendResolved.id, + onAssistantDelta: ({ text, delta }) => { + emitAgentEvent({ + runId: params.runId, + stream: "assistant", + data: { + text: applyPluginTextReplacements( + text, + context.backendResolved.textTransforms?.output, + ), + delta: applyPluginTextReplacements( + delta, + context.backendResolved.textTransforms?.output, + ), + }, + }); + }, + }) + : null; const supervisor = executeDeps.getProcessSupervisor(); const scopeKey = buildCliSupervisorScopeKey({ backend, @@ -495,7 +539,9 @@ export async function executePreparedCliRun( } }); } finally { - await claudeSkillsPlugin.cleanup(); + if (!claudeSkillsPluginCleanupOwned) { + await claudeSkillsPlugin.cleanup(); + } if (systemPromptFile) { await systemPromptFile.cleanup(); } diff --git a/src/agents/cli-runner/prepare.ts b/src/agents/cli-runner/prepare.ts index 4edbe220fd3..93bc6265454 100644 --- a/src/agents/cli-runner/prepare.ts +++ b/src/agents/cli-runner/prepare.ts @@ -234,6 +234,7 @@ export async function prepareCliRunContext( authEpoch, extraSystemPromptHash, mcpConfigHash: preparedBackendFinal.mcpConfigHash, + mcpResumeHash: preparedBackendFinal.mcpResumeHash, }) : params.cliSessionId ? { sessionId: params.cliSessionId } diff --git a/src/agents/cli-runner/types.ts b/src/agents/cli-runner/types.ts index 1c2ac804e12..ba686a666a6 100644 --- a/src/agents/cli-runner/types.ts +++ b/src/agents/cli-runner/types.ts @@ -44,6 +44,7 @@ export type CliPreparedBackend = { backend: CliBackendConfig; cleanup?: () => Promise; mcpConfigHash?: string; + mcpResumeHash?: string; env?: Record; }; diff --git a/src/agents/cli-session.test.ts b/src/agents/cli-session.test.ts index 718bb3faf5c..c3ee719dbfd 100644 --- a/src/agents/cli-session.test.ts +++ b/src/agents/cli-session.test.ts @@ -22,6 +22,7 @@ describe("cli-session helpers", () => { authEpoch: "auth-epoch", extraSystemPromptHash: "prompt-hash", mcpConfigHash: "mcp-hash", + mcpResumeHash: "mcp-resume-hash", }); expect(entry.cliSessionIds?.["claude-cli"]).toBe("cli-session-1"); @@ -32,6 +33,7 @@ describe("cli-session helpers", () => { authEpoch: "auth-epoch", extraSystemPromptHash: "prompt-hash", mcpConfigHash: "mcp-hash", + mcpResumeHash: "mcp-resume-hash", }); }); @@ -144,6 +146,38 @@ describe("cli-session helpers", () => { ).toEqual({ sessionId: "cli-session-1" }); }); + it("prefers the stable MCP resume hash over the raw MCP config hash", () => { + const binding = { + sessionId: "cli-session-1", + authProfileId: "anthropic:work", + authEpoch: "auth-epoch-a", + extraSystemPromptHash: "prompt-a", + mcpConfigHash: "mcp-config-a", + mcpResumeHash: "mcp-resume-a", + }; + + expect( + resolveCliSessionReuse({ + binding, + authProfileId: "anthropic:work", + authEpoch: "auth-epoch-a", + extraSystemPromptHash: "prompt-a", + mcpConfigHash: "mcp-config-b", + mcpResumeHash: "mcp-resume-a", + }), + ).toEqual({ sessionId: "cli-session-1" }); + expect( + resolveCliSessionReuse({ + binding, + authProfileId: "anthropic:work", + authEpoch: "auth-epoch-a", + extraSystemPromptHash: "prompt-a", + mcpConfigHash: "mcp-config-a", + mcpResumeHash: "mcp-resume-b", + }), + ).toEqual({ invalidatedReason: "mcp" }); + }); + it("clears provider-scoped and global CLI session state", () => { const entry: SessionEntry = { sessionId: "openclaw-session", diff --git a/src/agents/cli-session.ts b/src/agents/cli-session.ts index 15ad3825ad6..6f721f00d73 100644 --- a/src/agents/cli-session.ts +++ b/src/agents/cli-session.ts @@ -30,6 +30,7 @@ export function getCliSessionBinding( authEpoch: normalizeOptionalString(fromBindings?.authEpoch), extraSystemPromptHash: normalizeOptionalString(fromBindings?.extraSystemPromptHash), mcpConfigHash: normalizeOptionalString(fromBindings?.mcpConfigHash), + mcpResumeHash: normalizeOptionalString(fromBindings?.mcpResumeHash), }; } const fromMap = entry.cliSessionIds?.[normalized]; @@ -83,6 +84,9 @@ export function setCliSessionBinding( ...(normalizeOptionalString(binding.mcpConfigHash) ? { mcpConfigHash: normalizeOptionalString(binding.mcpConfigHash) } : {}), + ...(normalizeOptionalString(binding.mcpResumeHash) + ? { mcpResumeHash: normalizeOptionalString(binding.mcpResumeHash) } + : {}), }, }; entry.cliSessionIds = { ...entry.cliSessionIds, [normalized]: trimmed }; @@ -120,6 +124,7 @@ export function resolveCliSessionReuse(params: { authEpoch?: string; extraSystemPromptHash?: string; mcpConfigHash?: string; + mcpResumeHash?: string; }): { sessionId?: string; invalidatedReason?: "auth-profile" | "auth-epoch" | "system-prompt" | "mcp"; @@ -133,6 +138,7 @@ export function resolveCliSessionReuse(params: { const currentAuthEpoch = normalizeOptionalString(params.authEpoch); const currentExtraSystemPromptHash = normalizeOptionalString(params.extraSystemPromptHash); const currentMcpConfigHash = normalizeOptionalString(params.mcpConfigHash); + const currentMcpResumeHash = normalizeOptionalString(params.mcpResumeHash); const storedAuthProfileId = normalizeOptionalString(binding?.authProfileId); if (storedAuthProfileId !== currentAuthProfileId) { return { invalidatedReason: "auth-profile" }; @@ -145,6 +151,13 @@ export function resolveCliSessionReuse(params: { if (storedExtraSystemPromptHash !== currentExtraSystemPromptHash) { return { invalidatedReason: "system-prompt" }; } + const storedMcpResumeHash = normalizeOptionalString(binding?.mcpResumeHash); + if (storedMcpResumeHash || currentMcpResumeHash) { + if (storedMcpResumeHash !== currentMcpResumeHash) { + return { invalidatedReason: "mcp" }; + } + return { sessionId }; + } const storedMcpConfigHash = normalizeOptionalString(binding?.mcpConfigHash); if (storedMcpConfigHash !== currentMcpConfigHash) { return { invalidatedReason: "mcp" }; diff --git a/src/config/schema.base.generated.ts b/src/config/schema.base.generated.ts index 937f8f32119..244783aafa9 100644 --- a/src/config/schema.base.generated.ts +++ b/src/config/schema.base.generated.ts @@ -3606,6 +3606,10 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = { type: "string", const: "claude-stream-json", }, + liveSession: { + type: "string", + const: "claude-stdio", + }, input: { anyOf: [ { diff --git a/src/config/sessions/types.ts b/src/config/sessions/types.ts index 526bd7e6d35..2ac683aa2ea 100644 --- a/src/config/sessions/types.ts +++ b/src/config/sessions/types.ts @@ -74,6 +74,7 @@ export type CliSessionBinding = { authEpoch?: string; extraSystemPromptHash?: string; mcpConfigHash?: string; + mcpResumeHash?: string; }; export type SessionCompactionCheckpointReason = diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index 04c57f90658..6f118d99d96 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -91,6 +91,8 @@ export type CliBackendConfig = { resumeOutput?: "json" | "text" | "jsonl"; /** JSONL event dialect for CLIs with provider-specific stream formats. */ jsonlDialect?: "claude-stream-json"; + /** Long-lived CLI process mode. */ + liveSession?: "claude-stdio"; /** Prompt input mode (default: arg). */ input?: "arg" | "stdin"; /** Max prompt length for arg mode (if exceeded, stdin is used). */ diff --git a/src/config/zod-schema.core.ts b/src/config/zod-schema.core.ts index cd9bbdabb92..0b88413893b 100644 --- a/src/config/zod-schema.core.ts +++ b/src/config/zod-schema.core.ts @@ -546,6 +546,7 @@ export const CliBackendSchema = z output: z.union([z.literal("json"), z.literal("text"), z.literal("jsonl")]).optional(), resumeOutput: z.union([z.literal("json"), z.literal("text"), z.literal("jsonl")]).optional(), jsonlDialect: z.literal("claude-stream-json").optional(), + liveSession: z.literal("claude-stdio").optional(), input: z.union([z.literal("arg"), z.literal("stdin")]).optional(), maxPromptArgChars: z.number().int().positive().optional(), env: z.record(z.string(), z.string()).optional(),