fix: keep claude cli sessions warm (#69679)

* feat(cli): keep claude cli sessions warm

* test(cli): cover claude live session reuse

* fix(cli): harden claude live session reuse

* fix(cli): redact mcp session key logs

* fix(cli): bound claude live session turns

* fix(cli): reuse claude live sessions on resume

* refactor(cli): canonicalize claude live argv

* fix(cli): preserve claude live resume state

* fix(cli): close dead claude live sessions

* fix(cli): serialize claude live session creates

* fix(cli): count pending claude live sessions

* fix(cli): tighten claude live resume abort

* fix(cli): reject closed claude live sessions

* fix(cli): refresh claude live fingerprints

* fix(cli): stabilize MCP resume hash

* fix: preserve claude live inline resume (#69679)

---------

Co-authored-by: Frank Yang <frank.ekn@gmail.com>
This commit is contained in:
Ayaan Zaidi
2026-04-22 13:44:18 +05:30
committed by GitHub
parent dab46a7e98
commit 81ca7bc40b
18 changed files with 2172 additions and 27 deletions

View File

@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
- Tokenjuice: add bundled native OpenClaw support for tokenjuice as an opt-in plugin that compacts noisy `exec` and `bash` tool results in Pi embedded runs. (#69946) Thanks @vincentkoc.
- Providers/Tencent: add the bundled Tencent Cloud provider plugin with TokenHub and Token Plan onboarding, docs, `hy3-preview` model catalog entries, and tiered Hy3 pricing metadata. (#68460) Thanks @JuniperSling.
- TUI: add local embedded mode for running terminal chats without a Gateway while keeping plugin approval gates enforced. (#66767) Thanks @fuller-stack-dev.
- CLI/Claude: keep compatible `claude-cli` runs on a warm stdio session and resume from the stored Claude session after Gateway restarts or idle exits. (#69679) Thanks @obviyus.
### Fixes

View File

@@ -1,4 +1,4 @@
e77c14ad4db1be62275667537716917e4d0da73e1afb89be1edeb78d73346ae4 config-baseline.json
ed4e305904b4b954ffa72c07ea1900a116bfd874ac0c637227883abb99f753f9 config-baseline.core.json
3f08544c1a8143755a848aeb731f2eddf4f84cf70950c7d165f8889e01e4985d config-baseline.json
2190e81fcd754b96b48a1e012600f3b74fdb9b91eac280d8e3e038fcb73d6546 config-baseline.core.json
6c0069b971ae298ae68516ebcd3eae0e8c82820d2e8f42ecbd2f53a2f9077371 config-baseline.channel.json
9096ec947597b03f97eef44186a3102fd80ffb7f3e791fb64544464d4571448f config-baseline.plugin.json

View File

@@ -143,6 +143,8 @@ The provider id becomes the left side of your model ref:
1. **Selects a backend** based on the provider prefix (`codex-cli/...`).
2. **Builds a system prompt** using the same OpenClaw prompt + workspace context.
3. **Executes the CLI** with a session id (if supported) so history stays consistent.
The bundled `claude-cli` backend keeps a Claude stdio process alive per
OpenClaw session and sends follow-up turns over stream-json stdin.
4. **Parses output** (JSON or plain text) and returns the final text.
5. **Persists session ids** per backend, so follow-ups reuse the same CLI session.
@@ -179,6 +181,10 @@ child process environment for the run.
- `always`: always send a session id (new UUID if none stored).
- `existing`: only send a session id if one was stored before.
- `none`: never send a session id.
- The bundled `claude-cli` backend uses `liveSession: "claude-stdio"` so
follow-up turns reuse the live Claude process while it is active. If the
Gateway restarts or the idle process exits, OpenClaw resumes from the stored
Claude session id.
Serialization notes:

View File

@@ -53,6 +53,7 @@ export function buildAnthropicCliBackend(): CliBackendPlugin {
"{sessionId}",
],
output: "jsonl",
liveSession: "claude-stdio",
input: "stdin",
modelArg: "--model",
modelAliases: CLAUDE_CLI_MODEL_ALIASES,

File diff suppressed because it is too large Load Diff

View File

@@ -78,6 +78,9 @@ export async function runPreparedCliAgent(
...(context.preparedBackend.mcpConfigHash
? { mcpConfigHash: context.preparedBackend.mcpConfigHash }
: {}),
...(context.preparedBackend.mcpResumeHash
? { mcpResumeHash: context.preparedBackend.mcpResumeHash }
: {}),
},
}
: {}),

View File

@@ -103,6 +103,7 @@ describe("prepareCliBundleMcpConfig", () => {
};
expect(raw.mcpServers?.bundleProbe?.args).toEqual([await fs.realpath(bundleProbeServerPath)]);
expect(prepared.mcpConfigHash).toMatch(/^[0-9a-f]{64}$/);
expect(prepared.mcpResumeHash).toMatch(/^[0-9a-f]{64}$/);
await prepared.cleanup?.();
});
@@ -189,6 +190,75 @@ describe("prepareCliBundleMcpConfig", () => {
await prepared.cleanup?.();
});
it("stabilizes the resume hash when only the OpenClaw loopback port changes", async () => {
const first = await prepareBundleProbeCliConfig({
additionalConfig: {
mcpServers: {
openclaw: {
type: "http",
url: "http://127.0.0.1:23119/mcp",
headers: {
Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}",
},
},
},
},
});
const second = await prepareBundleProbeCliConfig({
additionalConfig: {
mcpServers: {
openclaw: {
type: "http",
url: "http://127.0.0.1:24567/mcp",
headers: {
Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}",
},
},
},
},
});
expect(first.mcpConfigHash).not.toBe(second.mcpConfigHash);
expect(first.mcpResumeHash).toBe(second.mcpResumeHash);
await first.cleanup?.();
await second.cleanup?.();
});
it("changes the resume hash when stable MCP semantics change", async () => {
const first = await prepareBundleProbeCliConfig({
additionalConfig: {
mcpServers: {
openclaw: {
type: "http",
url: "http://127.0.0.1:23119/mcp",
headers: {
Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}",
},
},
},
},
});
const second = await prepareBundleProbeCliConfig({
additionalConfig: {
mcpServers: {
openclaw: {
type: "http",
url: "http://127.0.0.1:23119/other",
headers: {
Authorization: "Bearer ${OPENCLAW_MCP_TOKEN}",
},
},
},
},
});
expect(first.mcpResumeHash).not.toBe(second.mcpResumeHash);
await first.cleanup?.();
await second.cleanup?.();
});
it("preserves extra env values alongside generated MCP config", async () => {
const workspaceDir = await tempHarness.createTempDir("openclaw-cli-bundle-mcp-env-");

View File

@@ -22,6 +22,7 @@ type PreparedCliBundleMcpConfig = {
backend: CliBackendConfig;
cleanup?: () => Promise<void>;
mcpConfigHash?: string;
mcpResumeHash?: string;
env?: Record<string, string>;
};
@@ -255,6 +256,49 @@ async function writeGeminiSystemSettings(
};
}
function sortJsonValue(value: unknown): unknown {
if (Array.isArray(value)) {
return value.map((entry) => sortJsonValue(entry));
}
if (!isRecord(value)) {
return value;
}
return Object.fromEntries(
Object.keys(value)
.toSorted()
.map((key) => [key, sortJsonValue(value[key])]),
);
}
function normalizeOpenClawLoopbackUrl(value: string): string {
const match =
/^(http:\/\/(?:127\.0\.0\.1|localhost|\[::1\])):\d+(\/mcp)$/.exec(value.trim()) ?? undefined;
if (!match) {
return value;
}
return `${match[1]}:<openclaw-loopback>${match[2]}`;
}
function canonicalizeBundleMcpConfigForResume(config: BundleMcpConfig): BundleMcpConfig {
const canonicalServers = Object.fromEntries(
Object.entries(config.mcpServers).map(([name, server]) => {
if (name !== "openclaw" || typeof server.url !== "string") {
return [name, sortJsonValue(server)];
}
return [
name,
sortJsonValue({
...server,
url: normalizeOpenClawLoopbackUrl(server.url),
}),
];
}),
) as BundleMcpConfig["mcpServers"];
return {
mcpServers: sortJsonValue(canonicalServers) as BundleMcpConfig["mcpServers"],
};
}
async function prepareModeSpecificBundleMcpConfig(params: {
mode: CliBundleMcpMode;
backend: CliBackendConfig;
@@ -263,6 +307,12 @@ async function prepareModeSpecificBundleMcpConfig(params: {
}): Promise<PreparedCliBundleMcpConfig> {
const serializedConfig = `${JSON.stringify(params.mergedConfig, null, 2)}\n`;
const mcpConfigHash = crypto.createHash("sha256").update(serializedConfig).digest("hex");
const serializedResumeConfig = `${JSON.stringify(
canonicalizeBundleMcpConfigForResume(params.mergedConfig),
null,
2,
)}\n`;
const mcpResumeHash = crypto.createHash("sha256").update(serializedResumeConfig).digest("hex");
if (params.mode === "codex-config-overrides") {
return {
@@ -275,6 +325,7 @@ async function prepareModeSpecificBundleMcpConfig(params: {
),
},
mcpConfigHash,
mcpResumeHash,
env: params.env,
};
}
@@ -284,6 +335,7 @@ async function prepareModeSpecificBundleMcpConfig(params: {
return {
backend: params.backend,
mcpConfigHash,
mcpResumeHash,
env: settings.env,
cleanup: settings.cleanup,
};
@@ -302,6 +354,7 @@ async function prepareModeSpecificBundleMcpConfig(params: {
),
},
mcpConfigHash,
mcpResumeHash,
env: params.env,
cleanup: async () => {
await fs.rm(tempDir, { recursive: true, force: true });

View File

@@ -0,0 +1,913 @@
import crypto from "node:crypto";
import type { ReplyBackendHandle } from "../../auto-reply/reply/reply-run-registry.js";
import type { CliBackendConfig } from "../../config/types.js";
import {
createCliJsonlStreamingParser,
extractCliErrorMessage,
parseCliOutput,
type CliOutput,
type CliStreamingDelta,
} from "../cli-output.js";
import { FailoverError, resolveFailoverStatus } from "../failover-error.js";
import { classifyFailoverReason } from "../pi-embedded-helpers.js";
import { stripSystemPromptCacheBoundary } from "../system-prompt-cache-boundary.js";
import { cliBackendLog } from "./log.js";
import type { PreparedCliRunContext } from "./types.js";
type ProcessSupervisor = ReturnType<
typeof import("../../process/supervisor/index.js").getProcessSupervisor
>;
type ManagedRun = Awaited<ReturnType<ProcessSupervisor["spawn"]>>;
type ClaudeLiveTurn = {
backend: CliBackendConfig;
startedAtMs: number;
rawLines: string[];
rawChars: number;
sessionId?: string;
noOutputTimer: NodeJS.Timeout | null;
timeoutTimer: NodeJS.Timeout | null;
streamingParser: ReturnType<typeof createCliJsonlStreamingParser>;
resolve: (output: CliOutput) => void;
reject: (error: unknown) => void;
};
type ClaudeLiveSession = {
key: string;
fingerprint: string;
managedRun: ManagedRun;
providerId: string;
modelId: string;
noOutputTimeoutMs: number;
stderr: string;
stdoutBuffer: string;
currentTurn: ClaudeLiveTurn | null;
drainTimer: NodeJS.Timeout | null;
drainingAbortedTurn: boolean;
idleTimer: NodeJS.Timeout | null;
cleanup: () => Promise<void>;
cleanupDone: boolean;
closing: boolean;
};
type ClaudeLiveRunResult = {
output: CliOutput;
};
const CLAUDE_LIVE_IDLE_TIMEOUT_MS = 10 * 60 * 1_000;
const CLAUDE_LIVE_MAX_SESSIONS = 16;
const CLAUDE_LIVE_MAX_STDOUT_BUFFER_CHARS = 256 * 1024;
const CLAUDE_LIVE_MAX_STDERR_CHARS = 64 * 1024;
const CLAUDE_LIVE_MAX_TURN_RAW_CHARS = 2 * 1024 * 1024;
const CLAUDE_LIVE_MAX_TURN_LINES = 5_000;
const liveSessions = new Map<string, ClaudeLiveSession>();
const liveSessionCreates = new Map<string, Promise<ClaudeLiveSession>>();
function sha256(value: string): string {
return crypto.createHash("sha256").update(value).digest("hex");
}
export function resetClaudeLiveSessionsForTest(): void {
for (const session of liveSessions.values()) {
closeLiveSession(session, "restart");
}
liveSessions.clear();
liveSessionCreates.clear();
}
export function shouldUseClaudeLiveSession(context: PreparedCliRunContext): boolean {
return (
context.backendResolved.id === "claude-cli" &&
context.preparedBackend.backend.liveSession === "claude-stdio" &&
context.preparedBackend.backend.output === "jsonl" &&
context.preparedBackend.backend.input === "stdin"
);
}
function upsertArgValue(args: string[], flag: string, value: string): string[] {
const normalized: string[] = [];
for (let i = 0; i < args.length; i += 1) {
const arg = args[i] ?? "";
if (arg === flag) {
i += 1;
continue;
}
if (arg.startsWith(`${flag}=`)) {
continue;
}
normalized.push(arg);
}
normalized.push(flag, value);
return normalized;
}
function appendArg(args: string[], flag: string): string[] {
return args.includes(flag) ? args : [...args, flag];
}
function stripLiveProcessArgs(args: string[], backend: CliBackendConfig): string[] {
const liveProcessFlags = new Set(
[backend.sessionArg, backend.systemPromptArg, "--session-id"].filter(
(entry): entry is string => typeof entry === "string" && entry.length > 0,
),
);
const stripped: string[] = [];
for (let i = 0; i < args.length; i += 1) {
const arg = args[i] ?? "";
if (liveProcessFlags.has(arg)) {
i += 1;
continue;
}
if ([...liveProcessFlags].some((flag) => arg.startsWith(`${flag}=`))) {
continue;
}
stripped.push(arg);
}
return stripped;
}
function appendSystemPromptArg(
args: string[],
backend: CliBackendConfig,
systemPrompt: string,
): string[] {
const prompt = systemPrompt.trim();
if (!backend.systemPromptArg || !prompt) {
return args;
}
return upsertArgValue(args, backend.systemPromptArg, stripSystemPromptCacheBoundary(prompt));
}
export function buildClaudeLiveArgs(params: {
args: string[];
backend: CliBackendConfig;
systemPrompt: string;
useResume: boolean;
}): string[] {
return appendArg(
upsertArgValue(
upsertArgValue(
params.useResume
? stripLiveProcessArgs(params.args, params.backend)
: appendSystemPromptArg(
stripLiveProcessArgs(params.args, params.backend),
params.backend,
params.systemPrompt,
),
"--input-format",
"stream-json",
),
"--permission-prompt-tool",
"stdio",
),
"--replay-user-messages",
);
}
function buildClaudeLiveKey(context: PreparedCliRunContext): string {
return `${context.backendResolved.id}:${sha256(
JSON.stringify({
agentAccountId: context.params.agentAccountId,
agentId: context.params.agentId,
authProfileId: context.effectiveAuthProfileId,
sessionId: context.params.sessionId,
sessionKey: context.params.sessionKey,
}),
)}`;
}
function buildClaudeLiveFingerprint(params: {
context: PreparedCliRunContext;
argv: string[];
env: Record<string, string>;
}): string {
const normalizeMcpConfigPath = Boolean(params.context.preparedBackend.mcpConfigHash);
const skillSnapshot = params.context.params.skillsSnapshot;
const skillsFingerprint = skillSnapshot
? sha256(
JSON.stringify({
promptHash: sha256(skillSnapshot.prompt),
skillFilter: skillSnapshot.skillFilter,
skills: skillSnapshot.skills,
resolvedSkills: (skillSnapshot.resolvedSkills ?? []).map((skill) => ({
name: skill.name,
description: skill.description,
filePath: skill.filePath,
sourceInfo: skill.sourceInfo,
})),
version: skillSnapshot.version,
}),
)
: undefined;
const normalizePluginDir = Boolean(skillsFingerprint);
const omittedValueFlags = new Set(
[params.context.preparedBackend.backend.systemPromptArg, "--resume", "-r"].filter(
(entry): entry is string => typeof entry === "string" && entry.length > 0,
),
);
const unstableValueFlags = new Set(
[
params.context.preparedBackend.backend.sessionArg,
"--session-id",
normalizeMcpConfigPath ? "--mcp-config" : undefined,
normalizePluginDir ? "--plugin-dir" : undefined,
].filter((entry): entry is string => typeof entry === "string" && entry.length > 0),
);
const stableArgv: string[] = [];
for (let i = 0; i < params.argv.length; i += 1) {
const entry = params.argv[i] ?? "";
if (omittedValueFlags.has(entry)) {
i += 1;
continue;
}
if ([...omittedValueFlags].some((flag) => entry.startsWith(`${flag}=`))) {
continue;
}
if (unstableValueFlags.has(entry)) {
stableArgv.push("<unstable>");
i += 1;
continue;
}
if ([...unstableValueFlags].some((flag) => entry.startsWith(`${flag}=`))) {
stableArgv.push("<unstable>");
continue;
}
stableArgv.push(entry);
}
return JSON.stringify({
command: params.context.preparedBackend.backend.command,
workspaceDirHash: sha256(params.context.workspaceDir),
provider: params.context.params.provider,
model: params.context.normalizedModel,
systemPromptHash: sha256(params.context.systemPrompt),
authProfileIdHash: params.context.effectiveAuthProfileId
? sha256(params.context.effectiveAuthProfileId)
: undefined,
authEpochHash: params.context.authEpoch ? sha256(params.context.authEpoch) : undefined,
extraSystemPromptHash: params.context.extraSystemPromptHash,
mcpConfigHash: params.context.preparedBackend.mcpConfigHash,
skillsFingerprint,
argv: stableArgv,
env: Object.keys(params.env)
.toSorted()
.map((key) => [key, params.env[key] ? sha256(params.env[key]) : ""]),
});
}
function createAbortError(): Error {
const error = new Error("CLI run aborted");
error.name = "AbortError";
return error;
}
function clearTurnTimers(turn: ClaudeLiveTurn): void {
if (turn.noOutputTimer) {
clearTimeout(turn.noOutputTimer);
turn.noOutputTimer = null;
}
if (turn.timeoutTimer) {
clearTimeout(turn.timeoutTimer);
turn.timeoutTimer = null;
}
}
function clearDrainTimer(session: ClaudeLiveSession): void {
if (session.drainTimer) {
clearTimeout(session.drainTimer);
session.drainTimer = null;
}
}
function finishTurn(session: ClaudeLiveSession, output: CliOutput): void {
const turn = session.currentTurn;
if (!turn) {
return;
}
cliBackendLog.info(
`claude live session turn: provider=${session.providerId} model=${session.modelId} durationMs=${Date.now() - turn.startedAtMs} rawLines=${turn.rawLines.length}`,
);
clearTurnTimers(turn);
turn.streamingParser.finish();
session.currentTurn = null;
turn.resolve(output);
scheduleIdleClose(session);
}
function failTurn(session: ClaudeLiveSession, error: unknown): void {
const turn = session.currentTurn;
if (!turn) {
return;
}
const errorKind = error instanceof Error ? error.name : typeof error;
cliBackendLog.warn(
`claude live session turn failed: provider=${session.providerId} model=${session.modelId} durationMs=${Date.now() - turn.startedAtMs} error=${errorKind}`,
);
clearTurnTimers(turn);
turn.streamingParser.finish();
session.currentTurn = null;
turn.reject(error);
}
function abortTurn(session: ClaudeLiveSession, error: Error): void {
const turn = session.currentTurn;
if (!turn) {
return;
}
closeLiveSession(session, "abort", error);
}
function cleanupLiveSession(session: ClaudeLiveSession): void {
if (session.cleanupDone) {
return;
}
session.cleanupDone = true;
void session.cleanup();
}
function closeLiveSession(
session: ClaudeLiveSession,
reason: "idle" | "restart" | "abort",
error?: unknown,
): void {
if (session.closing) {
return;
}
cliBackendLog.info(
`claude live session close: provider=${session.providerId} model=${session.modelId} reason=${reason}`,
);
session.closing = true;
if (session.idleTimer) {
clearTimeout(session.idleTimer);
session.idleTimer = null;
}
clearDrainTimer(session);
if (liveSessions.get(session.key) === session) {
liveSessions.delete(session.key);
}
if (error) {
failTurn(session, error);
}
session.managedRun.cancel("manual-cancel");
cleanupLiveSession(session);
}
function scheduleIdleClose(session: ClaudeLiveSession): void {
if (session.idleTimer) {
clearTimeout(session.idleTimer);
}
session.idleTimer = setTimeout(() => {
if (!session.currentTurn) {
closeLiveSession(session, "idle");
}
}, CLAUDE_LIVE_IDLE_TIMEOUT_MS);
}
function createTimeoutError(session: ClaudeLiveSession, message: string): FailoverError {
return new FailoverError(message, {
reason: "timeout",
provider: session.providerId,
model: session.modelId,
status: resolveFailoverStatus("timeout"),
});
}
function createOutputLimitError(session: ClaudeLiveSession, message: string): FailoverError {
return new FailoverError(message, {
reason: "format",
provider: session.providerId,
model: session.modelId,
status: resolveFailoverStatus("format"),
});
}
function resetNoOutputTimer(session: ClaudeLiveSession): void {
const turn = session.currentTurn;
if (!turn) {
return;
}
if (turn.noOutputTimer) {
clearTimeout(turn.noOutputTimer);
}
turn.noOutputTimer = setTimeout(() => {
closeLiveSession(
session,
"abort",
createTimeoutError(
session,
`CLI produced no output for ${Math.round(session.noOutputTimeoutMs / 1000)}s and was terminated.`,
),
);
}, session.noOutputTimeoutMs);
}
function parseSessionId(parsed: Record<string, unknown>): string | undefined {
const sessionId =
typeof parsed.session_id === "string"
? parsed.session_id.trim()
: typeof parsed.sessionId === "string"
? parsed.sessionId.trim()
: "";
return sessionId || undefined;
}
function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value && typeof value === "object" && !Array.isArray(value));
}
function parseClaudeLiveJsonLine(
session: ClaudeLiveSession,
trimmed: string,
): Record<string, unknown> | null {
if (trimmed.length > CLAUDE_LIVE_MAX_STDOUT_BUFFER_CHARS) {
closeLiveSession(
session,
"abort",
createOutputLimitError(session, "Claude CLI JSONL line exceeded output limit."),
);
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return null;
}
return isRecord(parsed) ? parsed : null;
}
function createResultError(
session: ClaudeLiveSession,
parsed: Record<string, unknown>,
raw: string,
): FailoverError {
const result = typeof parsed.result === "string" ? parsed.result.trim() : "";
const message = extractCliErrorMessage(raw) ?? (result || "Claude CLI failed.");
const reason = classifyFailoverReason(message, { provider: session.providerId }) ?? "unknown";
return new FailoverError(message, {
reason,
provider: session.providerId,
model: session.modelId,
status: resolveFailoverStatus(reason),
});
}
function handleClaudeLiveLine(session: ClaudeLiveSession, line: string): void {
const turn = session.currentTurn;
const trimmed = line.trim();
if (!trimmed) {
return;
}
const parsed = parseClaudeLiveJsonLine(session, trimmed);
if (!parsed) {
return;
}
if (session.drainingAbortedTurn) {
if (parsed.type === "result") {
const turnToClear = session.currentTurn;
if (turnToClear) {
clearTurnTimers(turnToClear);
session.currentTurn = null;
}
session.drainingAbortedTurn = false;
clearDrainTimer(session);
scheduleIdleClose(session);
}
return;
}
if (!turn) {
return;
}
turn.rawChars += trimmed.length + 1;
if (
turn.rawChars > CLAUDE_LIVE_MAX_TURN_RAW_CHARS ||
turn.rawLines.length >= CLAUDE_LIVE_MAX_TURN_LINES
) {
closeLiveSession(
session,
"abort",
createOutputLimitError(session, "Claude CLI turn output exceeded limit."),
);
return;
}
turn.rawLines.push(trimmed);
turn.streamingParser.push(`${trimmed}\n`);
turn.sessionId = parseSessionId(parsed) ?? turn.sessionId;
if (parsed.type !== "result") {
return;
}
const raw = turn.rawLines.join("\n");
if (parsed.is_error === true) {
failTurn(session, createResultError(session, parsed, raw));
scheduleIdleClose(session);
return;
}
finishTurn(
session,
parseCliOutput({
raw,
backend: turn.backend,
providerId: session.providerId,
outputMode: "jsonl",
fallbackSessionId: turn.sessionId,
}),
);
}
function handleClaudeStdout(session: ClaudeLiveSession, chunk: string) {
resetNoOutputTimer(session);
session.stdoutBuffer += chunk;
if (session.stdoutBuffer.length > CLAUDE_LIVE_MAX_STDOUT_BUFFER_CHARS) {
closeLiveSession(
session,
"abort",
createOutputLimitError(session, "Claude CLI stdout buffer exceeded limit."),
);
return;
}
const lines = session.stdoutBuffer.split(/\r?\n/g);
session.stdoutBuffer = lines.pop() ?? "";
try {
for (const line of lines) {
handleClaudeLiveLine(session, line);
}
} catch (error) {
closeLiveSession(session, "abort", error);
}
}
function handleClaudeExit(session: ClaudeLiveSession, exitCode: number | null): void {
session.closing = true;
if (session.idleTimer) {
clearTimeout(session.idleTimer);
session.idleTimer = null;
}
clearDrainTimer(session);
if (liveSessions.get(session.key) === session) {
liveSessions.delete(session.key);
}
cleanupLiveSession(session);
if (!session.currentTurn) {
return;
}
if (session.stdoutBuffer.trim()) {
try {
handleClaudeLiveLine(session, session.stdoutBuffer);
} catch (error) {
session.stdoutBuffer = "";
failTurn(session, error);
return;
}
session.stdoutBuffer = "";
}
if (!session.currentTurn) {
return;
}
const stderr = session.stderr.trim();
const fallbackMessage =
exitCode === 0 ? "Claude CLI exited before completing the turn." : "Claude CLI failed.";
const message = extractCliErrorMessage(stderr) ?? (stderr || fallbackMessage);
if (exitCode === 0) {
failTurn(session, new Error(message));
return;
}
const reason = classifyFailoverReason(message, { provider: session.providerId }) ?? "unknown";
failTurn(
session,
new FailoverError(message, {
reason,
provider: session.providerId,
model: session.modelId,
status: resolveFailoverStatus(reason),
}),
);
}
function createClaudeUserInputMessage(content: string): string {
return `${JSON.stringify({
type: "user",
session_id: "",
parent_tool_use_id: null,
message: {
role: "user",
content,
},
})}\n`;
}
async function writeTurnInput(session: ClaudeLiveSession, prompt: string): Promise<void> {
const stdin = session.managedRun.stdin;
if (!stdin) {
throw new Error("Claude CLI live session stdin is unavailable");
}
await new Promise<void>((resolve, reject) => {
stdin.write(createClaudeUserInputMessage(prompt), (error) => {
if (error) {
reject(error);
return;
}
resolve();
});
});
}
async function createClaudeLiveSession(params: {
context: PreparedCliRunContext;
argv: string[];
env: Record<string, string>;
fingerprint: string;
key: string;
noOutputTimeoutMs: number;
supervisor: ProcessSupervisor;
cleanup: () => Promise<void>;
}): Promise<ClaudeLiveSession> {
let session: ClaudeLiveSession | null = null;
const managedRun = await params.supervisor.spawn({
sessionId: params.context.params.sessionId,
backendId: params.context.backendResolved.id,
scopeKey: `claude-live:${params.key}`,
replaceExistingScope: true,
mode: "child",
argv: params.argv,
cwd: params.context.workspaceDir,
env: params.env,
stdinMode: "pipe-open",
captureOutput: false,
onStdout: (chunk) => {
if (session) {
handleClaudeStdout(session, chunk);
}
},
onStderr: (chunk) => {
if (session) {
session.stderr += chunk;
if (session.stderr.length > CLAUDE_LIVE_MAX_STDERR_CHARS) {
closeLiveSession(
session,
"abort",
createOutputLimitError(session, "Claude CLI stderr exceeded limit."),
);
return;
}
resetNoOutputTimer(session);
}
},
});
session = {
key: params.key,
fingerprint: params.fingerprint,
managedRun,
providerId: params.context.params.provider,
modelId: params.context.modelId,
noOutputTimeoutMs: params.noOutputTimeoutMs,
stderr: "",
stdoutBuffer: "",
currentTurn: null,
drainTimer: null,
drainingAbortedTurn: false,
idleTimer: null,
cleanup: params.cleanup,
cleanupDone: false,
closing: false,
};
void managedRun.wait().then(
(exit) => handleClaudeExit(session, exit.exitCode),
(error) => {
if (session) {
closeLiveSession(session, "abort", error);
}
},
);
liveSessions.set(params.key, session);
cliBackendLog.info(
`claude live session start: provider=${session.providerId} model=${session.modelId} activeSessions=${liveSessions.size}`,
);
return session;
}
function createTurn(params: {
context: PreparedCliRunContext;
noOutputTimeoutMs: number;
onAssistantDelta: (delta: CliStreamingDelta) => void;
session: ClaudeLiveSession;
resolve: (output: CliOutput) => void;
reject: (error: unknown) => void;
}): ClaudeLiveTurn {
const turn: ClaudeLiveTurn = {
backend: params.context.preparedBackend.backend,
startedAtMs: Date.now(),
rawLines: [],
rawChars: 0,
noOutputTimer: null,
timeoutTimer: null,
streamingParser: createCliJsonlStreamingParser({
backend: params.context.preparedBackend.backend,
providerId: params.context.backendResolved.id,
onAssistantDelta: params.onAssistantDelta,
}),
resolve: params.resolve,
reject: params.reject,
};
turn.noOutputTimer = setTimeout(() => {
closeLiveSession(
params.session,
"abort",
createTimeoutError(
params.session,
`CLI produced no output for ${Math.round(params.noOutputTimeoutMs / 1000)}s and was terminated.`,
),
);
}, params.noOutputTimeoutMs);
turn.timeoutTimer = setTimeout(() => {
closeLiveSession(
params.session,
"abort",
createTimeoutError(
params.session,
`CLI exceeded timeout (${Math.round(params.context.params.timeoutMs / 1000)}s) and was terminated.`,
),
);
}, params.context.params.timeoutMs);
return turn;
}
function closeOldestIdleSession(): boolean {
for (const session of liveSessions.values()) {
if (!session.currentTurn && !session.drainingAbortedTurn) {
closeLiveSession(session, "idle");
return true;
}
}
return false;
}
function ensureLiveSessionCapacity(key: string, context: PreparedCliRunContext): void {
if (
liveSessions.has(key) ||
liveSessionCreates.has(key) ||
liveSessions.size + liveSessionCreates.size < CLAUDE_LIVE_MAX_SESSIONS
) {
return;
}
if (closeOldestIdleSession()) {
return;
}
throw new FailoverError("Too many Claude CLI live sessions are active.", {
reason: "rate_limit",
provider: context.params.provider,
model: context.modelId,
status: resolveFailoverStatus("rate_limit"),
});
}
export async function runClaudeLiveSessionTurn(params: {
context: PreparedCliRunContext;
args: string[];
env: Record<string, string>;
prompt: string;
useResume: boolean;
noOutputTimeoutMs: number;
getProcessSupervisor: () => ProcessSupervisor;
onAssistantDelta: (delta: CliStreamingDelta) => void;
cleanup: () => Promise<void>;
}): Promise<ClaudeLiveRunResult> {
const key = buildClaudeLiveKey(params.context);
const resumeCapable = Boolean(params.context.preparedBackend.backend.resumeArgs?.length);
const argv = [
params.context.preparedBackend.backend.command,
...buildClaudeLiveArgs({
args: params.args,
backend: params.context.preparedBackend.backend,
systemPrompt: params.context.systemPrompt,
useResume: params.useResume,
}),
];
const fingerprint = buildClaudeLiveFingerprint({
context: params.context,
argv,
env: params.env,
});
let cleanupDone = false;
const cleanup = async () => {
if (cleanupDone) {
return;
}
cleanupDone = true;
await params.cleanup();
};
let session = liveSessions.get(key) ?? null;
if (session && resumeCapable && !params.useResume) {
closeLiveSession(session, "restart");
session = null;
}
if (session && session.fingerprint !== fingerprint) {
closeLiveSession(session, "restart");
session = null;
}
let cleanupTurnArtifacts = Boolean(session);
try {
ensureLiveSessionCapacity(key, params.context);
} catch (error) {
await cleanup();
throw error;
}
if (!session) {
const pendingSession = liveSessionCreates.get(key);
if (pendingSession) {
try {
session = await pendingSession;
} catch (error) {
await cleanup();
throw error;
}
if (session.fingerprint !== fingerprint) {
closeLiveSession(session, "restart");
session = null;
} else if (resumeCapable && !params.useResume) {
closeLiveSession(session, "restart");
session = null;
} else {
cleanupTurnArtifacts = true;
}
}
if (!session) {
const createSession = createClaudeLiveSession({
context: params.context,
argv,
env: params.env,
fingerprint,
key,
noOutputTimeoutMs: params.noOutputTimeoutMs,
supervisor: params.getProcessSupervisor(),
cleanup,
}).finally(() => {
if (liveSessionCreates.get(key) === createSession) {
liveSessionCreates.delete(key);
}
});
liveSessionCreates.set(key, createSession);
try {
session = await createSession;
} catch (error) {
await cleanup();
throw error;
}
}
}
if (cleanupTurnArtifacts && session) {
await cleanup();
if (session.idleTimer) {
clearTimeout(session.idleTimer);
session.idleTimer = null;
}
cliBackendLog.info(
`claude live session reuse: provider=${session.providerId} model=${session.modelId}`,
);
}
if (session.closing) {
await cleanup();
throw new Error("Claude CLI live session closed before handling the turn");
}
if (session.currentTurn || session.drainingAbortedTurn) {
throw new Error("Claude CLI live session is already handling a turn");
}
const liveSession = session;
liveSession.noOutputTimeoutMs = params.noOutputTimeoutMs;
liveSession.stderr = "";
const outputPromise = new Promise<CliOutput>((resolve, reject) => {
liveSession.currentTurn = createTurn({
context: params.context,
noOutputTimeoutMs: params.noOutputTimeoutMs,
onAssistantDelta: params.onAssistantDelta,
session: liveSession,
resolve,
reject,
});
});
const abort = () => abortTurn(liveSession, createAbortError());
const replyBackendHandle: ReplyBackendHandle | undefined = params.context.params.replyOperation
? {
kind: "cli",
cancel: abort,
isStreaming: () => false,
}
: undefined;
params.context.params.abortSignal?.addEventListener("abort", abort, { once: true });
if (replyBackendHandle) {
params.context.params.replyOperation?.attachBackend(replyBackendHandle);
}
try {
if (params.context.params.abortSignal?.aborted) {
abort();
} else {
try {
await writeTurnInput(liveSession, params.prompt);
} catch (error) {
closeLiveSession(liveSession, "abort", error);
}
}
return { output: await outputPromise };
} finally {
params.context.params.abortSignal?.removeEventListener("abort", abort);
if (replyBackendHandle) {
params.context.params.replyOperation?.detachBackend(replyBackendHandle);
}
}
}

View File

@@ -17,6 +17,7 @@ import { FailoverError, resolveFailoverStatus } from "../failover-error.js";
import { classifyFailoverReason } from "../pi-embedded-helpers.js";
import { applyPluginTextReplacements } from "../plugin-text-transforms.js";
import { applySkillEnvOverridesFromSnapshot } from "../skills.js";
import { runClaudeLiveSessionTurn, shouldUseClaudeLiveSession } from "./claude-live-session.js";
import { prepareClaudeCliSkillsPlugin } from "./claude-skills-plugin.js";
import {
buildCliSupervisorScopeKey,
@@ -155,7 +156,7 @@ function formatCliEnvKeyList(keys: readonly string[]): string {
function buildCliEnvMcpLog(childEnv: Record<string, string>): string {
return [
`token=${childEnv.OPENCLAW_MCP_TOKEN ? "set" : "missing"}`,
`sessionKey=${childEnv.OPENCLAW_MCP_SESSION_KEY || "<empty>"}`,
`sessionKey=${childEnv.OPENCLAW_MCP_SESSION_KEY ? "set" : "<empty>"}`,
`agentId=${childEnv.OPENCLAW_MCP_AGENT_ID || "<empty>"}`,
`accountId=${childEnv.OPENCLAW_MCP_ACCOUNT_ID || "<empty>"}`,
`messageChannel=${childEnv.OPENCLAW_MCP_MESSAGE_CHANNEL || "<empty>"}`,
@@ -235,6 +236,7 @@ export async function executePreparedCliRun(
backendId: context.backendResolved.id,
skillsSnapshot: params.skillsSnapshot,
});
let claudeSkillsPluginCleanupOwned = false;
const args = buildCliArgs({
backend,
baseArgs:
@@ -329,29 +331,71 @@ export async function executePreparedCliRun(
timeoutMs: params.timeoutMs,
useResume,
});
const streamingParser =
backend.output === "jsonl"
? createCliJsonlStreamingParser({
backend,
providerId: context.backendResolved.id,
onAssistantDelta: ({ text, delta }) => {
emitAgentEvent({
runId: params.runId,
stream: "assistant",
data: {
text: applyPluginTextReplacements(
text,
context.backendResolved.textTransforms?.output,
),
delta: applyPluginTextReplacements(
delta,
context.backendResolved.textTransforms?.output,
),
},
});
const hasJsonlOutput = backend.output === "jsonl";
if (shouldUseClaudeLiveSession(context)) {
if (!hasJsonlOutput) {
throw new Error("Claude live session requires JSONL streaming parser");
}
claudeSkillsPluginCleanupOwned = true;
const liveResult = await runClaudeLiveSessionTurn({
context,
args,
env,
prompt,
useResume,
noOutputTimeoutMs,
getProcessSupervisor: executeDeps.getProcessSupervisor,
onAssistantDelta: ({ text, delta }) => {
emitAgentEvent({
runId: params.runId,
stream: "assistant",
data: {
text: applyPluginTextReplacements(
text,
context.backendResolved.textTransforms?.output,
),
delta: applyPluginTextReplacements(
delta,
context.backendResolved.textTransforms?.output,
),
},
})
: null;
});
},
cleanup: claudeSkillsPlugin.cleanup,
});
const rawText = liveResult.output.text;
return {
...liveResult.output,
rawText,
finalPromptText: prompt,
text: applyPluginTextReplacements(
rawText,
context.backendResolved.textTransforms?.output,
),
};
}
const streamingParser = hasJsonlOutput
? createCliJsonlStreamingParser({
backend,
providerId: context.backendResolved.id,
onAssistantDelta: ({ text, delta }) => {
emitAgentEvent({
runId: params.runId,
stream: "assistant",
data: {
text: applyPluginTextReplacements(
text,
context.backendResolved.textTransforms?.output,
),
delta: applyPluginTextReplacements(
delta,
context.backendResolved.textTransforms?.output,
),
},
});
},
})
: null;
const supervisor = executeDeps.getProcessSupervisor();
const scopeKey = buildCliSupervisorScopeKey({
backend,
@@ -495,7 +539,9 @@ export async function executePreparedCliRun(
}
});
} finally {
await claudeSkillsPlugin.cleanup();
if (!claudeSkillsPluginCleanupOwned) {
await claudeSkillsPlugin.cleanup();
}
if (systemPromptFile) {
await systemPromptFile.cleanup();
}

View File

@@ -234,6 +234,7 @@ export async function prepareCliRunContext(
authEpoch,
extraSystemPromptHash,
mcpConfigHash: preparedBackendFinal.mcpConfigHash,
mcpResumeHash: preparedBackendFinal.mcpResumeHash,
})
: params.cliSessionId
? { sessionId: params.cliSessionId }

View File

@@ -44,6 +44,7 @@ export type CliPreparedBackend = {
backend: CliBackendConfig;
cleanup?: () => Promise<void>;
mcpConfigHash?: string;
mcpResumeHash?: string;
env?: Record<string, string>;
};

View File

@@ -22,6 +22,7 @@ describe("cli-session helpers", () => {
authEpoch: "auth-epoch",
extraSystemPromptHash: "prompt-hash",
mcpConfigHash: "mcp-hash",
mcpResumeHash: "mcp-resume-hash",
});
expect(entry.cliSessionIds?.["claude-cli"]).toBe("cli-session-1");
@@ -32,6 +33,7 @@ describe("cli-session helpers", () => {
authEpoch: "auth-epoch",
extraSystemPromptHash: "prompt-hash",
mcpConfigHash: "mcp-hash",
mcpResumeHash: "mcp-resume-hash",
});
});
@@ -144,6 +146,38 @@ describe("cli-session helpers", () => {
).toEqual({ sessionId: "cli-session-1" });
});
it("prefers the stable MCP resume hash over the raw MCP config hash", () => {
const binding = {
sessionId: "cli-session-1",
authProfileId: "anthropic:work",
authEpoch: "auth-epoch-a",
extraSystemPromptHash: "prompt-a",
mcpConfigHash: "mcp-config-a",
mcpResumeHash: "mcp-resume-a",
};
expect(
resolveCliSessionReuse({
binding,
authProfileId: "anthropic:work",
authEpoch: "auth-epoch-a",
extraSystemPromptHash: "prompt-a",
mcpConfigHash: "mcp-config-b",
mcpResumeHash: "mcp-resume-a",
}),
).toEqual({ sessionId: "cli-session-1" });
expect(
resolveCliSessionReuse({
binding,
authProfileId: "anthropic:work",
authEpoch: "auth-epoch-a",
extraSystemPromptHash: "prompt-a",
mcpConfigHash: "mcp-config-a",
mcpResumeHash: "mcp-resume-b",
}),
).toEqual({ invalidatedReason: "mcp" });
});
it("clears provider-scoped and global CLI session state", () => {
const entry: SessionEntry = {
sessionId: "openclaw-session",

View File

@@ -30,6 +30,7 @@ export function getCliSessionBinding(
authEpoch: normalizeOptionalString(fromBindings?.authEpoch),
extraSystemPromptHash: normalizeOptionalString(fromBindings?.extraSystemPromptHash),
mcpConfigHash: normalizeOptionalString(fromBindings?.mcpConfigHash),
mcpResumeHash: normalizeOptionalString(fromBindings?.mcpResumeHash),
};
}
const fromMap = entry.cliSessionIds?.[normalized];
@@ -83,6 +84,9 @@ export function setCliSessionBinding(
...(normalizeOptionalString(binding.mcpConfigHash)
? { mcpConfigHash: normalizeOptionalString(binding.mcpConfigHash) }
: {}),
...(normalizeOptionalString(binding.mcpResumeHash)
? { mcpResumeHash: normalizeOptionalString(binding.mcpResumeHash) }
: {}),
},
};
entry.cliSessionIds = { ...entry.cliSessionIds, [normalized]: trimmed };
@@ -120,6 +124,7 @@ export function resolveCliSessionReuse(params: {
authEpoch?: string;
extraSystemPromptHash?: string;
mcpConfigHash?: string;
mcpResumeHash?: string;
}): {
sessionId?: string;
invalidatedReason?: "auth-profile" | "auth-epoch" | "system-prompt" | "mcp";
@@ -133,6 +138,7 @@ export function resolveCliSessionReuse(params: {
const currentAuthEpoch = normalizeOptionalString(params.authEpoch);
const currentExtraSystemPromptHash = normalizeOptionalString(params.extraSystemPromptHash);
const currentMcpConfigHash = normalizeOptionalString(params.mcpConfigHash);
const currentMcpResumeHash = normalizeOptionalString(params.mcpResumeHash);
const storedAuthProfileId = normalizeOptionalString(binding?.authProfileId);
if (storedAuthProfileId !== currentAuthProfileId) {
return { invalidatedReason: "auth-profile" };
@@ -145,6 +151,13 @@ export function resolveCliSessionReuse(params: {
if (storedExtraSystemPromptHash !== currentExtraSystemPromptHash) {
return { invalidatedReason: "system-prompt" };
}
const storedMcpResumeHash = normalizeOptionalString(binding?.mcpResumeHash);
if (storedMcpResumeHash || currentMcpResumeHash) {
if (storedMcpResumeHash !== currentMcpResumeHash) {
return { invalidatedReason: "mcp" };
}
return { sessionId };
}
const storedMcpConfigHash = normalizeOptionalString(binding?.mcpConfigHash);
if (storedMcpConfigHash !== currentMcpConfigHash) {
return { invalidatedReason: "mcp" };

View File

@@ -3606,6 +3606,10 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
type: "string",
const: "claude-stream-json",
},
liveSession: {
type: "string",
const: "claude-stdio",
},
input: {
anyOf: [
{

View File

@@ -74,6 +74,7 @@ export type CliSessionBinding = {
authEpoch?: string;
extraSystemPromptHash?: string;
mcpConfigHash?: string;
mcpResumeHash?: string;
};
export type SessionCompactionCheckpointReason =

View File

@@ -91,6 +91,8 @@ export type CliBackendConfig = {
resumeOutput?: "json" | "text" | "jsonl";
/** JSONL event dialect for CLIs with provider-specific stream formats. */
jsonlDialect?: "claude-stream-json";
/** Long-lived CLI process mode. */
liveSession?: "claude-stdio";
/** Prompt input mode (default: arg). */
input?: "arg" | "stdin";
/** Max prompt length for arg mode (if exceeded, stdin is used). */

View File

@@ -546,6 +546,7 @@ export const CliBackendSchema = z
output: z.union([z.literal("json"), z.literal("text"), z.literal("jsonl")]).optional(),
resumeOutput: z.union([z.literal("json"), z.literal("text"), z.literal("jsonl")]).optional(),
jsonlDialect: z.literal("claude-stream-json").optional(),
liveSession: z.literal("claude-stdio").optional(),
input: z.union([z.literal("arg"), z.literal("stdin")]).optional(),
maxPromptArgChars: z.number().int().positive().optional(),
env: z.record(z.string(), z.string()).optional(),