mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-08 20:22:56 +00:00
fix(agents/cli-runner): gate cliSessionBinding persist on transcript flush
When a claude-cli turn produces a session id but the underlying claude subprocess fails to flush an assistant-role record to its ~/.claude/projects/<cwd>/<sid>.jsonl transcript (e.g. mid-turn kill from a concurrent fingerprint-mismatched turn, supervisor restart, internal failure), buildCliRunResult was still persisting that session id into cliSessionBinding. The next turn ran claudeCliSessionTranscriptHasContent, didn't find the file, logged 'cli session reset: reason=missing-transcript', and started a brand-new claude session with empty memory. End-user symptom: agent forgets prior conversation between turns. Gate the cliSessionBinding spread on the same predicate the next-turn invalidator uses, evaluated at write time. Also clear agentMeta.sessionId in the same case so the session-store fallback at command/session-store.ts (which reads agentMeta.sessionId via setCliSessionId when the binding is absent) doesn't re-persist the unflushed sid through a different field path. The fallback is what makes the binding-only gate insufficient on its own; both writes must drop together. The gate only fires for claude-cli providers — other CLI providers don't write to ~/.claude/projects, so probing them would always return false and incorrectly strip valid binding metadata. isCliBindingFlushed now takes the provider id and returns true unconditionally for non-claude-cli sessions. A bounded retry (0 / 50 / 150 ms) tolerates the brief gap between claude-cli's stdio close and the OS making the JSONL line visible to readers (cooperative fsync semantics on APFS, but not guaranteed under stress). The transcript-probe is exposed as an injectable dep (setCliRunnerTestDeps / restoreCliRunnerTestDeps) mirroring the existing pattern in src/agents/cli-runner/prepare.ts so isCliBindingFlushed is testable without touching ~/.claude/projects. AI-assisted: yes. Tooling: Claude Opus + claude-cli. Codex review caught the fallback path and the missing provider gate before this hit upstream. Real-Behavior-Proof: dist-side patch on M5 gateway; branch-build follow-up pending — see PR body.
This commit is contained in:
committed by
Ayaan Zaidi
parent
d13c8b03c9
commit
07c1245db4
103
src/agents/cli-runner.binding-flush.test.ts
Normal file
103
src/agents/cli-runner.binding-flush.test.ts
Normal file
@@ -0,0 +1,103 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
isCliBindingFlushed,
|
||||
restoreCliRunnerTestDeps,
|
||||
setCliRunnerTestDeps,
|
||||
} from "./cli-runner.js";
|
||||
|
||||
describe("isCliBindingFlushed", () => {
|
||||
beforeEach(() => {
|
||||
restoreCliRunnerTestDeps();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
restoreCliRunnerTestDeps();
|
||||
});
|
||||
|
||||
it("returns false when no sessionId is provided", async () => {
|
||||
const probe = vi.fn(async () => true);
|
||||
setCliRunnerTestDeps({ claudeCliSessionTranscriptHasContent: probe });
|
||||
|
||||
expect(await isCliBindingFlushed(undefined, "claude-cli")).toBe(false);
|
||||
expect(probe).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns true when the transcript has content on the first probe", async () => {
|
||||
const probe = vi.fn(async () => true);
|
||||
setCliRunnerTestDeps({ claudeCliSessionTranscriptHasContent: probe });
|
||||
|
||||
expect(await isCliBindingFlushed("sid-fresh", "claude-cli")).toBe(true);
|
||||
expect(probe).toHaveBeenCalledTimes(1);
|
||||
expect(probe).toHaveBeenCalledWith({ sessionId: "sid-fresh" });
|
||||
});
|
||||
|
||||
it("retries up to three times before giving up", async () => {
|
||||
const probe = vi.fn(async () => false);
|
||||
setCliRunnerTestDeps({ claudeCliSessionTranscriptHasContent: probe });
|
||||
|
||||
expect(await isCliBindingFlushed("sid-cold", "claude-cli")).toBe(false);
|
||||
expect(probe).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
|
||||
it("succeeds when the transcript becomes visible on a later retry", async () => {
|
||||
let calls = 0;
|
||||
const probe = vi.fn(async () => {
|
||||
calls += 1;
|
||||
return calls >= 2;
|
||||
});
|
||||
setCliRunnerTestDeps({ claudeCliSessionTranscriptHasContent: probe });
|
||||
|
||||
expect(await isCliBindingFlushed("sid-late", "claude-cli")).toBe(true);
|
||||
expect(probe).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("schedules at most 0 + 50 + 150ms of delay across the bounded retry", async () => {
|
||||
// 0 + 50 + 150 = 200ms of scheduled delay if all three probes return false.
|
||||
// Using fake timers so the assertion measures *scheduled* delay rather
|
||||
// than wall-clock elapsed time (the latter is flaky under CI threadpool
|
||||
// load — the probe itself can spend tens of ms before the first sleep).
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const probe = vi.fn(async () => false);
|
||||
setCliRunnerTestDeps({ claudeCliSessionTranscriptHasContent: probe });
|
||||
|
||||
const settled = vi.fn();
|
||||
const errored = vi.fn();
|
||||
isCliBindingFlushed("sid-bounded", "claude-cli").then(settled, errored);
|
||||
|
||||
// Drain any synchronous probe calls and the queued setTimeouts.
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
await vi.advanceTimersByTimeAsync(50);
|
||||
await vi.advanceTimersByTimeAsync(150);
|
||||
|
||||
expect(settled).toHaveBeenCalledTimes(1);
|
||||
expect(settled.mock.calls[0]?.[0]).toBe(false);
|
||||
expect(errored).not.toHaveBeenCalled();
|
||||
expect(probe).toHaveBeenCalledTimes(3);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("returns true without probing for non-claude-cli providers", async () => {
|
||||
// The transcript probe walks `~/.claude/projects` and only knows about
|
||||
// claude-cli sessions. For codex / openai / anthropic-api / etc., probing
|
||||
// would always return false and incorrectly strip valid binding metadata,
|
||||
// so we must skip the probe entirely.
|
||||
const probe = vi.fn(async () => false);
|
||||
setCliRunnerTestDeps({ claudeCliSessionTranscriptHasContent: probe });
|
||||
|
||||
expect(await isCliBindingFlushed("sid-codex", "codex-cli")).toBe(true);
|
||||
expect(await isCliBindingFlushed("sid-anthropic", "anthropic")).toBe(true);
|
||||
expect(await isCliBindingFlushed("sid-openai", "openai")).toBe(true);
|
||||
expect(probe).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("returns true without probing when provider is undefined", async () => {
|
||||
const probe = vi.fn(async () => false);
|
||||
setCliRunnerTestDeps({ claudeCliSessionTranscriptHasContent: probe });
|
||||
|
||||
expect(await isCliBindingFlushed("sid-x", undefined)).toBe(true);
|
||||
expect(probe).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -2,6 +2,7 @@ import type { ReplyPayload } from "../auto-reply/reply-payload.js";
|
||||
import { SILENT_REPLY_TOKEN } from "../auto-reply/tokens.js";
|
||||
import { formatErrorMessage } from "../infra/errors.js";
|
||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||
import { isClaudeCliProvider } from "../plugin-sdk/anthropic-cli.js";
|
||||
import { buildAgentHookContextChannelFields } from "../plugins/hook-agent-context.js";
|
||||
import { resolveBlockMessage } from "../plugins/hook-decision-types.js";
|
||||
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
||||
@@ -11,6 +12,7 @@ import {
|
||||
loadCliSessionHistoryMessages,
|
||||
} from "./cli-runner/session-history.js";
|
||||
import type { PreparedCliRunContext, RunCliAgentParams } from "./cli-runner/types.js";
|
||||
import { claudeCliSessionTranscriptHasContent as claudeCliSessionTranscriptHasContentImpl } from "./command/attempt-execution.helpers.js";
|
||||
import { classifyFailoverReason, isFailoverErrorMessage } from "./embedded-agent-helpers.js";
|
||||
import type { EmbeddedAgentRunResult } from "./embedded-agent-runner.js";
|
||||
import { FailoverError, isFailoverError, resolveFailoverStatus } from "./failover-error.js";
|
||||
@@ -32,6 +34,39 @@ import { SessionManager } from "./sessions/index.js";
|
||||
|
||||
const log = createSubsystemLogger("agents/cli-runner");
|
||||
|
||||
const cliRunnerDeps = {
|
||||
claudeCliSessionTranscriptHasContent: claudeCliSessionTranscriptHasContentImpl,
|
||||
};
|
||||
|
||||
export function setCliRunnerTestDeps(overrides: Partial<typeof cliRunnerDeps>): void {
|
||||
Object.assign(cliRunnerDeps, overrides);
|
||||
}
|
||||
|
||||
export function restoreCliRunnerTestDeps(): void {
|
||||
cliRunnerDeps.claudeCliSessionTranscriptHasContent = claudeCliSessionTranscriptHasContentImpl;
|
||||
}
|
||||
|
||||
export async function isCliBindingFlushed(
|
||||
sessionId: string | undefined,
|
||||
provider: string | undefined,
|
||||
): Promise<boolean> {
|
||||
if (!provider || !isClaudeCliProvider(provider)) {
|
||||
return true;
|
||||
}
|
||||
if (!sessionId) {
|
||||
return false;
|
||||
}
|
||||
for (const delayMs of [0, 50, 150]) {
|
||||
if (delayMs > 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
}
|
||||
if (await cliRunnerDeps.claudeCliSessionTranscriptHasContent({ sessionId })) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function flushSessionManagerFile(sessionManager: SessionManager): void {
|
||||
(sessionManager as unknown as { rewriteFile?: () => void }).rewriteFile?.();
|
||||
}
|
||||
@@ -503,10 +538,21 @@ export async function runPreparedCliAgent(
|
||||
const buildCliRunResult = (resultParams: {
|
||||
output: Awaited<ReturnType<typeof executePreparedCliRun>>;
|
||||
effectiveCliSessionId?: string;
|
||||
bindingFlushOk?: boolean;
|
||||
}): EmbeddedAgentRunResult => {
|
||||
const text = resultParams.output.text?.trim();
|
||||
const rawText = resultParams.output.rawText?.trim();
|
||||
const payloads = text ? [{ text }] : undefined;
|
||||
const unflushedCliSessionId =
|
||||
resultParams.effectiveCliSessionId && resultParams.bindingFlushOk === false
|
||||
? resultParams.effectiveCliSessionId
|
||||
: undefined;
|
||||
const persistedCliSessionId = unflushedCliSessionId
|
||||
? undefined
|
||||
: resultParams.effectiveCliSessionId;
|
||||
const agentSessionId = unflushedCliSessionId
|
||||
? ""
|
||||
: (resultParams.effectiveCliSessionId ?? params.sessionId ?? "");
|
||||
|
||||
return {
|
||||
payloads,
|
||||
@@ -545,15 +591,15 @@ export async function runPreparedCliAgent(
|
||||
refusal: false,
|
||||
},
|
||||
agentMeta: {
|
||||
sessionId: resultParams.effectiveCliSessionId ?? params.sessionId ?? "",
|
||||
sessionId: agentSessionId,
|
||||
provider: params.provider,
|
||||
model: context.modelId,
|
||||
usage: resultParams.output.usage,
|
||||
...(resultParams.output.usage ? { lastCallUsage: resultParams.output.usage } : {}),
|
||||
...(resultParams.effectiveCliSessionId
|
||||
...(persistedCliSessionId
|
||||
? {
|
||||
cliSessionBinding: {
|
||||
sessionId: resultParams.effectiveCliSessionId,
|
||||
sessionId: persistedCliSessionId,
|
||||
...(context.effectiveAuthProfileId
|
||||
? { authProfileId: context.effectiveAuthProfileId }
|
||||
: {}),
|
||||
@@ -672,6 +718,7 @@ export async function runPreparedCliAgent(
|
||||
assistantText,
|
||||
output,
|
||||
});
|
||||
const bindingFlushOk = await isCliBindingFlushed(effectiveCliSessionId, params.provider);
|
||||
await runCliAgentEndHook(params, {
|
||||
event: {
|
||||
messages: buildAgentEndMessages(lastAssistant),
|
||||
@@ -681,7 +728,7 @@ export async function runPreparedCliAgent(
|
||||
ctx: hookContext,
|
||||
hookRunner,
|
||||
});
|
||||
return buildCliRunResult({ output, effectiveCliSessionId });
|
||||
return buildCliRunResult({ output, effectiveCliSessionId, bindingFlushOk });
|
||||
} catch (err) {
|
||||
if (isFailoverError(err)) {
|
||||
const retryableSessionId = context.reusableCliSession.sessionId ?? params.cliSessionId;
|
||||
@@ -704,6 +751,10 @@ export async function runPreparedCliAgent(
|
||||
assistantText,
|
||||
output,
|
||||
});
|
||||
const bindingFlushOk = await isCliBindingFlushed(
|
||||
effectiveCliSessionId,
|
||||
params.provider,
|
||||
);
|
||||
await runCliAgentEndHook(params, {
|
||||
event: {
|
||||
messages: buildAgentEndMessages(lastAssistant),
|
||||
@@ -713,7 +764,7 @@ export async function runPreparedCliAgent(
|
||||
ctx: hookContext,
|
||||
hookRunner,
|
||||
});
|
||||
return buildCliRunResult({ output, effectiveCliSessionId });
|
||||
return buildCliRunResult({ output, effectiveCliSessionId, bindingFlushOk });
|
||||
} catch (retryErr) {
|
||||
const retryMessage = formatErrorMessage(retryErr);
|
||||
await runCliAgentEndHook(params, {
|
||||
|
||||
Reference in New Issue
Block a user