diff --git a/CHANGELOG.md b/CHANGELOG.md index ce2120b3959..627e62542f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai - Agents/MCP: keep `mcp.servers` and bundle MCP tools available in Pi embedded `coding` and `messaging` sessions while preserving `minimal` profile and `tools.deny: ["bundle-mcp"]` opt-out behavior. Fixes #68875 and #68818. +- CLI/Claude: report CLI-backed reply runs as streaming while Claude/Codex CLI turns are still in flight, so WebChat keeps visible response state until the backend finishes. Fixes #70125. - Codex harness: rotate the shared app-server websocket client when the configured bearer token changes, so auth-token refreshes reconnect with the new `Authorization` header instead of reusing a stale socket. (#70328) Thanks @Lucenx9. - Telegram/sandbox: keep Telegram bot DMs on per-account sender session keys even when `session.dmScope=main`, so sandbox/tool policy can distinguish Telegram-originated direct chats from the agent main session. - Config/models: merge provider-scoped model allowlist updates and protect model/provider map writes from accidental full replacement, adding `config set --merge` for additive updates and `--replace` for intentional clobbers. Fixes #65920, #68392, and #68653. diff --git a/src/agents/cli-runner.reliability.test.ts b/src/agents/cli-runner.reliability.test.ts index 613e2f860ba..eb495361513 100644 --- a/src/agents/cli-runner.reliability.test.ts +++ b/src/agents/cli-runner.reliability.test.ts @@ -1,4 +1,9 @@ -import { describe, expect, it } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { + __testing as replyRunTesting, + createReplyOperation, + replyRunRegistry, +} from "../auto-reply/reply/reply-run-registry.js"; import { runPreparedCliAgent } from "./cli-runner.js"; import { createManagedRun, @@ -60,6 +65,10 @@ function buildPreparedContext(params?: { } describe("runCliAgent reliability", () => { + afterEach(() => { + replyRunTesting.resetReplyRunRegistry(); + }); + it("fails with timeout when no-output watchdog trips", async () => { supervisorSpawnMock.mockResolvedValueOnce( createManagedRun({ @@ -219,6 +228,62 @@ describe("runCliAgent reliability", () => { }); }); + it("reports CLI reply backends as streaming until the managed run finishes", async () => { + const operation = createReplyOperation({ + sessionKey: "agent:main:main", + sessionId: "s1", + resetTriggered: false, + }); + operation.setPhase("running"); + let finishRun: (() => void) | undefined; + const waitForExit = new Promise< + Awaited["wait"]>> + >((resolve) => { + finishRun = () => { + resolve({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: "hello from cli", + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }); + }; + }); + supervisorSpawnMock.mockResolvedValueOnce({ + ...createManagedRun({ + reason: "exit", + exitCode: 0, + exitSignal: null, + durationMs: 50, + stdout: "unused", + stderr: "", + timedOut: false, + noOutputTimedOut: false, + }), + wait: vi.fn(() => waitForExit), + }); + + const run = executePreparedCliRun({ + ...buildPreparedContext({ sessionKey: "agent:main:main" }), + params: { + ...buildPreparedContext({ sessionKey: "agent:main:main" }).params, + replyOperation: operation, + }, + }); + + await vi.waitFor(() => { + expect(replyRunRegistry.isStreaming("agent:main:main")).toBe(true); + }); + + finishRun?.(); + await expect(run).resolves.toMatchObject({ text: "hello from cli" }); + expect(replyRunRegistry.isStreaming("agent:main:main")).toBe(false); + operation.complete(); + }); + it("keeps raw assistant output separate from transformed visible CLI output", async () => { supervisorSpawnMock.mockResolvedValueOnce( createManagedRun({ diff --git a/src/agents/cli-runner.spawn.test.ts b/src/agents/cli-runner.spawn.test.ts index afbf6c47fed..065fa704b00 100644 --- a/src/agents/cli-runner.spawn.test.ts +++ b/src/agents/cli-runner.spawn.test.ts @@ -1,7 +1,12 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + __testing as replyRunTesting, + createReplyOperation, + replyRunRegistry, +} from "../auto-reply/reply/reply-run-registry.js"; import { onAgentEvent, resetAgentEventsForTest } from "../infra/agent-events.js"; import type { getProcessSupervisor } from "../process/supervisor/index.js"; import { @@ -32,10 +37,16 @@ type SupervisorSpawnFn = ProcessSupervisor["spawn"]; beforeEach(() => { resetAgentEventsForTest(); resetClaudeLiveSessionsForTest(); + replyRunTesting.resetReplyRunRegistry(); restoreCliRunnerPrepareTestDeps(); supervisorSpawnMock.mockClear(); }); +afterEach(() => { + resetClaudeLiveSessionsForTest(); + replyRunTesting.resetReplyRunRegistry(); +}); + function buildPreparedCliRunContext(params: { provider: "claude-cli" | "codex-cli"; model: string; @@ -769,6 +780,76 @@ describe("runCliAgent spawn path", () => { } }); + it("reports Claude live session reply backends as streaming until the turn finishes", async () => { + let stdoutListener: ((chunk: string) => void) | undefined; + let markWriteReady: (() => void) | undefined; + const writeReady = new Promise((resolve) => { + markWriteReady = resolve; + }); + const stdin = { + write: vi.fn((_data: string, cb?: (err?: Error | null) => void) => { + markWriteReady?.(); + cb?.(); + }), + end: vi.fn(), + }; + supervisorSpawnMock.mockImplementation(async (...args: unknown[]) => { + const input = (args[0] ?? {}) as { onStdout?: (chunk: string) => void }; + stdoutListener = input.onStdout; + return { + runId: "live-run", + pid: 2345, + startedAtMs: Date.now(), + stdin, + wait: vi.fn(() => new Promise(() => {})), + cancel: vi.fn(), + }; + }); + const operation = createReplyOperation({ + sessionKey: "agent:main:main", + sessionId: "live-session-reply", + resetTriggered: false, + }); + operation.setPhase("running"); + const context = buildPreparedCliRunContext({ + provider: "claude-cli", + model: "sonnet", + runId: "run-live-reply-streaming", + sessionId: "live-session-reply", + sessionKey: "agent:main:main", + prompt: "hello", + backend: { + liveSession: "claude-stdio", + }, + }); + + const run = executePreparedCliRun({ + ...context, + params: { + ...context.params, + replyOperation: operation, + }, + }); + + await writeReady; + expect(replyRunRegistry.isStreaming("agent:main:main")).toBe(true); + + stdoutListener?.( + [ + JSON.stringify({ type: "system", subtype: "init", session_id: "live-session-reply" }), + JSON.stringify({ + type: "result", + session_id: "live-session-reply", + result: "done", + }), + ].join("\n") + "\n", + ); + + await expect(run).resolves.toMatchObject({ text: "done" }); + expect(replyRunRegistry.isStreaming("agent:main:main")).toBe(false); + operation.complete(); + }); + it("reuses a Claude live session when resumed turns omit the system prompt arg", async () => { let stdoutListener: ((chunk: string) => void) | undefined; let turn = 0; diff --git a/src/agents/cli-runner/claude-live-session.ts b/src/agents/cli-runner/claude-live-session.ts index 077b8154015..a00751f06ec 100644 --- a/src/agents/cli-runner/claude-live-session.ts +++ b/src/agents/cli-runner/claude-live-session.ts @@ -882,11 +882,12 @@ export async function runClaudeLiveSessionTurn(params: { }); }); const abort = () => abortTurn(liveSession, createAbortError()); + let replyBackendCompleted = false; const replyBackendHandle: ReplyBackendHandle | undefined = params.context.params.replyOperation ? { kind: "cli", cancel: abort, - isStreaming: () => false, + isStreaming: () => !replyBackendCompleted, } : undefined; params.context.params.abortSignal?.addEventListener("abort", abort, { once: true }); @@ -905,6 +906,7 @@ export async function runClaudeLiveSessionTurn(params: { } return { output: await outputPromise }; } finally { + replyBackendCompleted = true; params.context.params.abortSignal?.removeEventListener("abort", abort); if (replyBackendHandle) { params.context.params.replyOperation?.detachBackend(replyBackendHandle); diff --git a/src/agents/cli-runner/execute.ts b/src/agents/cli-runner/execute.ts index 04c6549468d..65a4e990a35 100644 --- a/src/agents/cli-runner/execute.ts +++ b/src/agents/cli-runner/execute.ts @@ -417,13 +417,14 @@ export async function executePreparedCliRun( input: stdinPayload, onStdout: streamingParser ? (chunk: string) => streamingParser.push(chunk) : undefined, }); + let replyBackendCompleted = false; const replyBackendHandle = params.replyOperation ? { kind: "cli" as const, cancel: () => { managedRun.cancel("manual-cancel"); }, - isStreaming: () => false, + isStreaming: () => !replyBackendCompleted, } : undefined; if (replyBackendHandle) { @@ -440,6 +441,7 @@ export async function executePreparedCliRun( try { result = await managedRun.wait(); } finally { + replyBackendCompleted = true; if (replyBackendHandle) { params.replyOperation?.detachBackend(replyBackendHandle); }