fix: keep cli reply runs streaming

This commit is contained in:
Peter Steinberger
2026-04-22 23:53:39 +01:00
parent c4e5ca8625
commit 53e822f407
5 changed files with 155 additions and 4 deletions

View File

@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
- Agents/MCP: keep `mcp.servers` and bundle MCP tools available in Pi embedded
`coding` and `messaging` sessions while preserving `minimal` profile and
`tools.deny: ["bundle-mcp"]` opt-out behavior. Fixes #68875 and #68818.
- CLI/Claude: report CLI-backed reply runs as streaming while Claude/Codex CLI turns are still in flight, so WebChat keeps visible response state until the backend finishes. Fixes #70125.
- Codex harness: rotate the shared app-server websocket client when the configured bearer token changes, so auth-token refreshes reconnect with the new `Authorization` header instead of reusing a stale socket. (#70328) Thanks @Lucenx9.
- Telegram/sandbox: keep Telegram bot DMs on per-account sender session keys even when `session.dmScope=main`, so sandbox/tool policy can distinguish Telegram-originated direct chats from the agent main session.
- Config/models: merge provider-scoped model allowlist updates and protect model/provider map writes from accidental full replacement, adding `config set --merge` for additive updates and `--replace` for intentional clobbers. Fixes #65920, #68392, and #68653.

View File

@@ -1,4 +1,9 @@
import { describe, expect, it } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
__testing as replyRunTesting,
createReplyOperation,
replyRunRegistry,
} from "../auto-reply/reply/reply-run-registry.js";
import { runPreparedCliAgent } from "./cli-runner.js";
import {
createManagedRun,
@@ -60,6 +65,10 @@ function buildPreparedContext(params?: {
}
describe("runCliAgent reliability", () => {
// After every test in this suite, invoke the reply-run registry's test-only
// reset helper so operations registered by one case are not visible to the next.
afterEach(function resetReplyRunState(): void {
  replyRunTesting.resetReplyRunRegistry();
});
it("fails with timeout when no-output watchdog trips", async () => {
supervisorSpawnMock.mockResolvedValueOnce(
createManagedRun({
@@ -219,6 +228,62 @@ describe("runCliAgent reliability", () => {
});
});
// Checks that, for a one-shot CLI run attached to a reply operation,
// replyRunRegistry.isStreaming(sessionKey) is true while managedRun.wait() is
// pending and false once executePreparedCliRun has resolved.
it("reports CLI reply backends as streaming until the managed run finishes", async () => {
  const sessionKey = "agent:main:main";

  // Create the reply operation up front and move it into the "running" phase
  // before the CLI run is started.
  const operation = createReplyOperation({
    sessionKey,
    sessionId: "s1",
    resetTriggered: false,
  });
  operation.setPhase("running");

  // Deferred exit result: wait() stays pending until finishRun() is invoked,
  // which lets the test observe the registry while the run is in flight.
  let finishRun: (() => void) | undefined;
  const waitForExit = new Promise<
    Awaited<ReturnType<ReturnType<typeof createManagedRun>["wait"]>>
  >((resolve) => {
    finishRun = () => {
      resolve({
        reason: "exit",
        exitCode: 0,
        exitSignal: null,
        durationMs: 50,
        stdout: "hello from cli",
        stderr: "",
        timedOut: false,
        noOutputTimedOut: false,
      });
    };
  });

  // Reuse the shape produced by createManagedRun but override wait() with the
  // deferred promise; the "unused" stdout passed here is never the value that
  // wait() hands back.
  supervisorSpawnMock.mockResolvedValueOnce({
    ...createManagedRun({
      reason: "exit",
      exitCode: 0,
      exitSignal: null,
      durationMs: 50,
      stdout: "unused",
      stderr: "",
      timedOut: false,
      noOutputTimedOut: false,
    }),
    wait: vi.fn(() => waitForExit),
  });

  // Build the prepared context once so the top-level fields and `params` come
  // from the same object instead of two separate builder calls.
  const context = buildPreparedContext({ sessionKey });
  const run = executePreparedCliRun({
    ...context,
    params: {
      ...context.params,
      replyOperation: operation,
    },
  });

  // The run is started but not awaited; retry the assertion until the
  // registry reports the session as streaming.
  await vi.waitFor(() => {
    expect(replyRunRegistry.isStreaming(sessionKey)).toBe(true);
  });

  // Let the managed run exit, then confirm the result text and that the
  // registry no longer reports streaming.
  finishRun?.();
  await expect(run).resolves.toMatchObject({ text: "hello from cli" });
  expect(replyRunRegistry.isStreaming(sessionKey)).toBe(false);
  operation.complete();
});
it("keeps raw assistant output separate from transformed visible CLI output", async () => {
supervisorSpawnMock.mockResolvedValueOnce(
createManagedRun({

View File

@@ -1,7 +1,12 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
__testing as replyRunTesting,
createReplyOperation,
replyRunRegistry,
} from "../auto-reply/reply/reply-run-registry.js";
import { onAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js";
import type { getProcessSupervisor } from "../process/supervisor/index.js";
import {
@@ -32,10 +37,16 @@ type SupervisorSpawnFn = ProcessSupervisor["spawn"];
// Per-test setup: reset agent events, Claude live sessions and the reply-run
// registry, restore the CLI runner's prepare-time test deps, and clear the
// supervisor spawn mock. The calls run in the same order as before.
// NOTE(review): per the Vitest docs, mockClear() clears recorded calls but keeps
// any implementation installed with mockImplementation() — confirm that tests
// relying on a custom spawn implementation always install their own.
beforeEach(function resetCliRunnerTestState(): void {
  resetAgentEventsForTest();
  resetClaudeLiveSessionsForTest();
  replyRunTesting.resetReplyRunRegistry();
  restoreCliRunnerPrepareTestDeps();
  supervisorSpawnMock.mockClear();
});
// Per-test teardown: reset Claude live sessions and the reply-run registry
// again so state created during a test does not outlive it.
afterEach(function resetLiveSessionAndReplyRunState(): void {
  resetClaudeLiveSessionsForTest();
  replyRunTesting.resetReplyRunRegistry();
});
function buildPreparedCliRunContext(params: {
provider: "claude-cli" | "codex-cli";
model: string;
@@ -769,6 +780,76 @@ describe("runCliAgent spawn path", () => {
}
});
// Checks that, for a Claude live-session ("claude-stdio") turn attached to a
// reply operation, replyRunRegistry.isStreaming(sessionKey) is true once the
// runner has written to stdin and false after the turn's result has been
// delivered on stdout and the run promise has resolved.
it("reports Claude live session reply backends as streaming until the turn finishes", async () => {
// Captured from the spawn input so the test can push stdout chunks by hand.
let stdoutListener: ((chunk: string) => void) | undefined;
// writeReady resolves the first time the stubbed stdin.write is called.
let markWriteReady: (() => void) | undefined;
const writeReady = new Promise<void>((resolve) => {
markWriteReady = resolve;
});
const stdin = {
// Signal the test, then acknowledge the write with no error.
write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => {
markWriteReady?.();
cb?.();
}),
end: vi.fn(),
};
// Fake supervisor spawn: remember the onStdout callback and return a run
// whose wait() never settles, so completion can only come from stdout data.
supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => {
const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void };
stdoutListener = input.onStdout;
return {
runId: "live-run",
pid: 2345,
startedAtMs: Date.now(),
stdin,
wait: vi.fn(() => new Promise(() => {})),
cancel: vi.fn(),
};
});
// Create the reply operation and move it into the "running" phase before
// the turn is started.
const operation = createReplyOperation({
sessionKey: "agent:main:main",
sessionId: "live-session-reply",
resetTriggered: false,
});
operation.setPhase("running");
const context = buildPreparedCliRunContext({
provider: "claude-cli",
model: "sonnet",
runId: "run-live-reply-streaming",
sessionId: "live-session-reply",
sessionKey: "agent:main:main",
prompt: "hello",
backend: {
liveSession: "claude-stdio",
},
});
// Start the run without awaiting it; the reply operation is injected into
// the prepared params.
const run = executePreparedCliRun({
...context,
params: {
...context.params,
replyOperation: operation,
},
});
// Once stdin has been written to, the turn is in flight and must be
// reported as streaming.
await writeReady;
expect(replyRunRegistry.isStreaming("agent:main:main")).toBe(true);
// Feed two newline-delimited JSON messages: a system/init message followed
// by the result message carrying the text "done".
stdoutListener?.(
[
JSON.stringify({ type: "system", subtype: "init", session_id: "live-session-reply" }),
JSON.stringify({
type: "result",
session_id: "live-session-reply",
result: "done",
}),
].join("\n") + "\n",
);
// The run resolves with the result text and the registry stops reporting
// the session as streaming.
await expect(run).resolves.toMatchObject({ text: "done" });
expect(replyRunRegistry.isStreaming("agent:main:main")).toBe(false);
operation.complete();
});
it("reuses a Claude live session when resumed turns omit the system prompt arg", async () => {
let stdoutListener: ((chunk: string) => void) | undefined;
let turn = 0;

View File

@@ -882,11 +882,12 @@ export async function runClaudeLiveSessionTurn(params: {
});
});
const abort = () => abortTurn(liveSession, createAbortError());
let replyBackendCompleted = false;
const replyBackendHandle: ReplyBackendHandle | undefined = params.context.params.replyOperation
? {
kind: "cli",
cancel: abort,
isStreaming: () => false,
isStreaming: () => !replyBackendCompleted,
}
: undefined;
params.context.params.abortSignal?.addEventListener("abort", abort, { once: true });
@@ -905,6 +906,7 @@ export async function runClaudeLiveSessionTurn(params: {
}
return { output: await outputPromise };
} finally {
replyBackendCompleted = true;
params.context.params.abortSignal?.removeEventListener("abort", abort);
if (replyBackendHandle) {
params.context.params.replyOperation?.detachBackend(replyBackendHandle);

View File

@@ -417,13 +417,14 @@ export async function executePreparedCliRun(
input: stdinPayload,
onStdout: streamingParser ? (chunk: string) => streamingParser.push(chunk) : undefined,
});
let replyBackendCompleted = false;
const replyBackendHandle = params.replyOperation
? {
kind: "cli" as const,
cancel: () => {
managedRun.cancel("manual-cancel");
},
isStreaming: () => false,
isStreaming: () => !replyBackendCompleted,
}
: undefined;
if (replyBackendHandle) {
@@ -440,6 +441,7 @@ export async function executePreparedCliRun(
try {
result = await managedRun.wait();
} finally {
replyBackendCompleted = true;
if (replyBackendHandle) {
params.replyOperation?.detachBackend(replyBackendHandle);
}