From 257e2f5338d13ca634869670c88c7baa73d8d059 Mon Sep 17 00:00:00 2001 From: Bob Date: Wed, 4 Mar 2026 11:44:20 +0100 Subject: [PATCH] fix: relay ACP sessions_spawn parent streaming (#34310) (thanks @vincentkoc) (#34310) Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com> --- CHANGELOG.md | 3 + docs/tools/acp-agents.md | 2 + docs/tools/index.md | 4 +- src/agents/acp-spawn-parent-stream.test.ts | 242 ++++++++++++ src/agents/acp-spawn-parent-stream.ts | 376 +++++++++++++++++++ src/agents/acp-spawn.test.ts | 170 +++++++++ src/agents/acp-spawn.ts | 63 +++- src/agents/openclaw-tools.sessions.test.ts | 1 + src/agents/tools/sessions-spawn-tool.test.ts | 23 ++ src/agents/tools/sessions-spawn-tool.ts | 12 +- 10 files changed, 893 insertions(+), 3 deletions(-) create mode 100644 src/agents/acp-spawn-parent-stream.test.ts create mode 100644 src/agents/acp-spawn-parent-stream.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index db6e5f310e8..563deedf307 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -69,6 +69,9 @@ Docs: https://docs.openclaw.ai - iOS/TTS playback fallback: keep voice playback resilient by switching from PCM to MP3 when provider format support is unavailable, while avoiding sticky fallback on generic local playback errors. (#33032) thanks @mbelinky. - Telegram/multi-account default routing clarity: warn only for ambiguous (2+) account setups without an explicit default, add `openclaw doctor` warnings for missing/invalid multi-account defaults across channels, and document explicit-default guidance for channel routing and Telegram config. (#32544) thanks @Sid-Qin. - Telegram/plugin outbound hook parity: run `message_sending` + `message_sent` in Telegram reply delivery, include reply-path hook metadata (`mediaUrls`, `threadId`), and report `message_sent.success=false` when hooks blank text and no outbound message is delivered. (#32649) Thanks @KimGLee. +- CLI/Coding-agent reliability: switch default `claude-cli` non-interactive args to `--permission-mode bypassPermissions`, auto-normalize legacy `--dangerously-skip-permissions` backend overrides to the modern permission-mode form, align coding-agent + live-test docs with the non-PTY Claude path, and emit session system-event heartbeat notices when CLI watchdog no-output timeouts terminate runs. (#28610, #31149, #34055). Thanks @niceysam, @cryptomaltese and @vincentkoc. +- ACP/ACPX session bootstrap: retry with `sessions new` when `sessions ensure` returns no session identifiers so ACP spawns avoid `NO_SESSION`/`ACP_TURN_FAILED` failures on affected agents. (#28786, #31338, #34055). Thanks @Sid-Qin and @vincentkoc. +- ACP/sessions_spawn parent stream visibility: add `streamTo: "parent"` for `runtime: "acp"` to forward initial child-run progress/no-output/completion updates back into the requester session as system events (instead of direct child delivery), and emit a tail-able session-scoped relay log (`.acp-stream.jsonl`, returned as `streamLogPath` when available), improving orchestrator visibility for blocked or long-running harness turns. (#34310, #29909; reopened from #34055). Thanks @vincentkoc. - Agents/bootstrap truncation warning handling: unify bootstrap budget/truncation analysis across embedded + CLI runtime, `/context`, and `openclaw doctor`; add `agents.defaults.bootstrapPromptTruncationWarning` (`off|once|always`, default `once`) and persist warning-signature metadata so truncation warnings are consistent and deduped across turns. (#32769) Thanks @gumadeiras. - Agents/Skills runtime loading: propagate run config into embedded attempt and compaction skill-entry loading so explicitly enabled bundled companion skills are discovered consistently when skill snapshots do not already provide resolved entries. Thanks @gumadeiras. - Agents/Session startup date grounding: substitute `YYYY-MM-DD` placeholders in startup/post-compaction AGENTS context and append runtime current-time lines for `/new` and `/reset` prompts so daily-memory references resolve correctly. (#32381) Thanks @chengzhichao-xydt. diff --git a/docs/tools/acp-agents.md b/docs/tools/acp-agents.md index d16bfc3868b..f6c1d5734cb 100644 --- a/docs/tools/acp-agents.md +++ b/docs/tools/acp-agents.md @@ -119,6 +119,8 @@ Interface details: - `mode: "session"` requires `thread: true` - `cwd` (optional): requested runtime working directory (validated by backend/runtime policy). - `label` (optional): operator-facing label used in session/banner text. +- `streamTo` (optional): `"parent"` streams initial ACP run progress summaries back to the requester session as system events. + - When available, accepted responses include `streamLogPath` pointing to a session-scoped JSONL log (`.acp-stream.jsonl`) you can tail for full relay history. ## Sandbox compatibility diff --git a/docs/tools/index.md b/docs/tools/index.md index fdbc0250833..47366f25e3a 100644 --- a/docs/tools/index.md +++ b/docs/tools/index.md @@ -472,7 +472,7 @@ Core parameters: - `sessions_list`: `kinds?`, `limit?`, `activeMinutes?`, `messageLimit?` (0 = none) - `sessions_history`: `sessionKey` (or `sessionId`), `limit?`, `includeTools?` - `sessions_send`: `sessionKey` (or `sessionId`), `message`, `timeoutSeconds?` (0 = fire-and-forget) -- `sessions_spawn`: `task`, `label?`, `runtime?`, `agentId?`, `model?`, `thinking?`, `cwd?`, `runTimeoutSeconds?`, `thread?`, `mode?`, `cleanup?`, `sandbox?`, `attachments?`, `attachAs?` +- `sessions_spawn`: `task`, `label?`, `runtime?`, `agentId?`, `model?`, `thinking?`, `cwd?`, `runTimeoutSeconds?`, `thread?`, `mode?`, `cleanup?`, `sandbox?`, `streamTo?`, `attachments?`, `attachAs?` - `session_status`: `sessionKey?` (default current; accepts `sessionId`), `model?` (`default` clears override) Notes: @@ -483,6 +483,7 @@ Notes: - `sessions_send` waits for final completion when `timeoutSeconds > 0`. - Delivery/announce happens after completion and is best-effort; `status: "ok"` confirms the agent run finished, not that the announce was delivered. - `sessions_spawn` supports `runtime: "subagent" | "acp"` (`subagent` default). For ACP runtime behavior, see [ACP Agents](/tools/acp-agents). +- For ACP runtime, `streamTo: "parent"` routes initial-run progress summaries back to the requester session as system events instead of direct child delivery. - `sessions_spawn` starts a sub-agent run and posts an announce reply back to the requester chat. - Supports one-shot mode (`mode: "run"`) and persistent thread-bound mode (`mode: "session"` with `thread: true`). - If `thread: true` and `mode` is omitted, mode defaults to `session`. @@ -496,6 +497,7 @@ Notes: - Configure limits via `tools.sessions_spawn.attachments` (`enabled`, `maxTotalBytes`, `maxFiles`, `maxFileBytes`, `retainOnSessionKeep`). - `attachAs.mountPath` is a reserved hint for future mount implementations. - `sessions_spawn` is non-blocking and returns `status: "accepted"` immediately. +- ACP `streamTo: "parent"` responses may include `streamLogPath` (session-scoped `*.acp-stream.jsonl`) for tailing progress history. - `sessions_send` runs a reply‑back ping‑pong (reply `REPLY_SKIP` to stop; max turns via `session.agentToAgent.maxPingPongTurns`, 0–5). - After the ping‑pong, the target agent runs an **announce step**; reply `ANNOUNCE_SKIP` to suppress the announcement. - Sandbox clamp: when the current session is sandboxed and `agents.defaults.sandbox.sessionToolsVisibility: "spawned"`, OpenClaw clamps `tools.sessions.visibility` to `tree`. diff --git a/src/agents/acp-spawn-parent-stream.test.ts b/src/agents/acp-spawn-parent-stream.test.ts new file mode 100644 index 00000000000..010cd596e7f --- /dev/null +++ b/src/agents/acp-spawn-parent-stream.test.ts @@ -0,0 +1,242 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { emitAgentEvent } from "../infra/agent-events.js"; +import { + resolveAcpSpawnStreamLogPath, + startAcpSpawnParentStreamRelay, +} from "./acp-spawn-parent-stream.js"; + +const enqueueSystemEventMock = vi.fn(); +const requestHeartbeatNowMock = vi.fn(); +const readAcpSessionEntryMock = vi.fn(); +const resolveSessionFilePathMock = vi.fn(); +const resolveSessionFilePathOptionsMock = vi.fn(); + +vi.mock("../infra/system-events.js", () => ({ + enqueueSystemEvent: (...args: unknown[]) => enqueueSystemEventMock(...args), +})); + +vi.mock("../infra/heartbeat-wake.js", () => ({ + requestHeartbeatNow: (...args: unknown[]) => requestHeartbeatNowMock(...args), +})); + +vi.mock("../acp/runtime/session-meta.js", () => ({ + readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args), +})); + +vi.mock("../config/sessions/paths.js", () => ({ + resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args), + resolveSessionFilePathOptions: (...args: unknown[]) => resolveSessionFilePathOptionsMock(...args), +})); + +function collectedTexts() { + return enqueueSystemEventMock.mock.calls.map((call) => String(call[0] ?? "")); +} + +describe("startAcpSpawnParentStreamRelay", () => { + beforeEach(() => { + enqueueSystemEventMock.mockClear(); + requestHeartbeatNowMock.mockClear(); + readAcpSessionEntryMock.mockReset(); + resolveSessionFilePathMock.mockReset(); + resolveSessionFilePathOptionsMock.mockReset(); + resolveSessionFilePathOptionsMock.mockImplementation((value: unknown) => value); + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-04T01:00:00.000Z")); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("relays assistant progress and completion to the parent session", () => { + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-1", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child-1", + agentId: "codex", + streamFlushMs: 10, + noOutputNoticeMs: 120_000, + }); + + emitAgentEvent({ + runId: "run-1", + stream: "assistant", + data: { + delta: "hello from child", + }, + }); + vi.advanceTimersByTime(15); + + emitAgentEvent({ + runId: "run-1", + stream: "lifecycle", + data: { + phase: "end", + startedAt: 1_000, + endedAt: 3_100, + }, + }); + + const texts = collectedTexts(); + expect(texts.some((text) => text.includes("Started codex session"))).toBe(true); + expect(texts.some((text) => text.includes("codex: hello from child"))).toBe(true); + expect(texts.some((text) => text.includes("codex run completed in 2s"))).toBe(true); + expect(requestHeartbeatNowMock).toHaveBeenCalledWith( + expect.objectContaining({ + reason: "acp:spawn:stream", + sessionKey: "agent:main:main", + }), + ); + relay.dispose(); + }); + + it("emits a no-output notice and a resumed notice when output returns", () => { + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-2", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child-2", + agentId: "codex", + streamFlushMs: 1, + noOutputNoticeMs: 1_000, + noOutputPollMs: 250, + }); + + vi.advanceTimersByTime(1_500); + expect(collectedTexts().some((text) => text.includes("has produced no output for 1s"))).toBe( + true, + ); + + emitAgentEvent({ + runId: "run-2", + stream: "assistant", + data: { + delta: "resumed output", + }, + }); + vi.advanceTimersByTime(5); + + const texts = collectedTexts(); + expect(texts.some((text) => text.includes("resumed output."))).toBe(true); + expect(texts.some((text) => text.includes("codex: resumed output"))).toBe(true); + + emitAgentEvent({ + runId: "run-2", + stream: "lifecycle", + data: { + phase: "error", + error: "boom", + }, + }); + expect(collectedTexts().some((text) => text.includes("run failed: boom"))).toBe(true); + relay.dispose(); + }); + + it("auto-disposes stale relays after max lifetime timeout", () => { + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-3", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child-3", + agentId: "codex", + streamFlushMs: 1, + noOutputNoticeMs: 0, + maxRelayLifetimeMs: 1_000, + }); + + vi.advanceTimersByTime(1_001); + expect(collectedTexts().some((text) => text.includes("stream relay timed out after 1s"))).toBe( + true, + ); + + const before = enqueueSystemEventMock.mock.calls.length; + emitAgentEvent({ + runId: "run-3", + stream: "assistant", + data: { + delta: "late output", + }, + }); + vi.advanceTimersByTime(5); + + expect(enqueueSystemEventMock.mock.calls).toHaveLength(before); + relay.dispose(); + }); + + it("supports delayed start notices", () => { + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-4", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child-4", + agentId: "codex", + emitStartNotice: false, + }); + + expect(collectedTexts().some((text) => text.includes("Started codex session"))).toBe(false); + + relay.notifyStarted(); + + expect(collectedTexts().some((text) => text.includes("Started codex session"))).toBe(true); + relay.dispose(); + }); + + it("preserves delta whitespace boundaries in progress relays", () => { + const relay = startAcpSpawnParentStreamRelay({ + runId: "run-5", + parentSessionKey: "agent:main:main", + childSessionKey: "agent:codex:acp:child-5", + agentId: "codex", + streamFlushMs: 10, + noOutputNoticeMs: 120_000, + }); + + emitAgentEvent({ + runId: "run-5", + stream: "assistant", + data: { + delta: "hello", + }, + }); + emitAgentEvent({ + runId: "run-5", + stream: "assistant", + data: { + delta: " world", + }, + }); + vi.advanceTimersByTime(15); + + const texts = collectedTexts(); + expect(texts.some((text) => text.includes("codex: hello world"))).toBe(true); + relay.dispose(); + }); + + it("resolves ACP spawn stream log path from session metadata", () => { + readAcpSessionEntryMock.mockReturnValue({ + storePath: "/tmp/openclaw/agents/codex/sessions/sessions.json", + entry: { + sessionId: "sess-123", + sessionFile: "/tmp/openclaw/agents/codex/sessions/sess-123.jsonl", + }, + }); + resolveSessionFilePathMock.mockReturnValue( + "/tmp/openclaw/agents/codex/sessions/sess-123.jsonl", + ); + + const resolved = resolveAcpSpawnStreamLogPath({ + childSessionKey: "agent:codex:acp:child-1", + }); + + expect(resolved).toBe("/tmp/openclaw/agents/codex/sessions/sess-123.acp-stream.jsonl"); + expect(readAcpSessionEntryMock).toHaveBeenCalledWith({ + sessionKey: "agent:codex:acp:child-1", + }); + expect(resolveSessionFilePathMock).toHaveBeenCalledWith( + "sess-123", + expect.objectContaining({ + sessionId: "sess-123", + }), + expect.objectContaining({ + storePath: "/tmp/openclaw/agents/codex/sessions/sessions.json", + }), + ); + }); +}); diff --git a/src/agents/acp-spawn-parent-stream.ts b/src/agents/acp-spawn-parent-stream.ts new file mode 100644 index 00000000000..94f04ce3940 --- /dev/null +++ b/src/agents/acp-spawn-parent-stream.ts @@ -0,0 +1,376 @@ +import { appendFile, mkdir } from "node:fs/promises"; +import path from "node:path"; +import { readAcpSessionEntry } from "../acp/runtime/session-meta.js"; +import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js"; +import { onAgentEvent } from "../infra/agent-events.js"; +import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; +import { enqueueSystemEvent } from "../infra/system-events.js"; +import { scopedHeartbeatWakeOptions } from "../routing/session-key.js"; + +const DEFAULT_STREAM_FLUSH_MS = 2_500; +const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000; +const DEFAULT_NO_OUTPUT_POLL_MS = 15_000; +const DEFAULT_MAX_RELAY_LIFETIME_MS = 6 * 60 * 60 * 1000; +const STREAM_BUFFER_MAX_CHARS = 4_000; +const STREAM_SNIPPET_MAX_CHARS = 220; + +function compactWhitespace(value: string): string { + return value.replace(/\s+/g, " ").trim(); +} + +function truncate(value: string, maxChars: number): string { + if (value.length <= maxChars) { + return value; + } + if (maxChars <= 1) { + return value.slice(0, maxChars); + } + return `${value.slice(0, maxChars - 1)}…`; +} + +function toTrimmedString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed || undefined; +} + +function toFiniteNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +function resolveAcpStreamLogPathFromSessionFile(sessionFile: string, sessionId: string): string { + const baseDir = path.dirname(path.resolve(sessionFile)); + return path.join(baseDir, `${sessionId}.acp-stream.jsonl`); +} + +export function resolveAcpSpawnStreamLogPath(params: { + childSessionKey: string; +}): string | undefined { + const childSessionKey = params.childSessionKey.trim(); + if (!childSessionKey) { + return undefined; + } + const storeEntry = readAcpSessionEntry({ + sessionKey: childSessionKey, + }); + const sessionId = storeEntry?.entry?.sessionId?.trim(); + if (!storeEntry || !sessionId) { + return undefined; + } + try { + const sessionFile = resolveSessionFilePath( + sessionId, + storeEntry.entry, + resolveSessionFilePathOptions({ + storePath: storeEntry.storePath, + }), + ); + return resolveAcpStreamLogPathFromSessionFile(sessionFile, sessionId); + } catch { + return undefined; + } +} + +export function startAcpSpawnParentStreamRelay(params: { + runId: string; + parentSessionKey: string; + childSessionKey: string; + agentId: string; + logPath?: string; + streamFlushMs?: number; + noOutputNoticeMs?: number; + noOutputPollMs?: number; + maxRelayLifetimeMs?: number; + emitStartNotice?: boolean; +}): AcpSpawnParentRelayHandle { + const runId = params.runId.trim(); + const parentSessionKey = params.parentSessionKey.trim(); + if (!runId || !parentSessionKey) { + return { + dispose: () => {}, + notifyStarted: () => {}, + }; + } + + const streamFlushMs = + typeof params.streamFlushMs === "number" && Number.isFinite(params.streamFlushMs) + ? Math.max(0, Math.floor(params.streamFlushMs)) + : DEFAULT_STREAM_FLUSH_MS; + const noOutputNoticeMs = + typeof params.noOutputNoticeMs === "number" && Number.isFinite(params.noOutputNoticeMs) + ? Math.max(0, Math.floor(params.noOutputNoticeMs)) + : DEFAULT_NO_OUTPUT_NOTICE_MS; + const noOutputPollMs = + typeof params.noOutputPollMs === "number" && Number.isFinite(params.noOutputPollMs) + ? Math.max(250, Math.floor(params.noOutputPollMs)) + : DEFAULT_NO_OUTPUT_POLL_MS; + const maxRelayLifetimeMs = + typeof params.maxRelayLifetimeMs === "number" && Number.isFinite(params.maxRelayLifetimeMs) + ? Math.max(1_000, Math.floor(params.maxRelayLifetimeMs)) + : DEFAULT_MAX_RELAY_LIFETIME_MS; + + const relayLabel = truncate(compactWhitespace(params.agentId), 40) || "ACP child"; + const contextPrefix = `acp-spawn:${runId}`; + const logPath = toTrimmedString(params.logPath); + let logDirReady = false; + let pendingLogLines = ""; + let logFlushScheduled = false; + let logWriteChain: Promise = Promise.resolve(); + const flushLogBuffer = () => { + if (!logPath || !pendingLogLines) { + return; + } + const chunk = pendingLogLines; + pendingLogLines = ""; + logWriteChain = logWriteChain + .then(async () => { + if (!logDirReady) { + await mkdir(path.dirname(logPath), { + recursive: true, + }); + logDirReady = true; + } + await appendFile(logPath, chunk, { + encoding: "utf-8", + mode: 0o600, + }); + }) + .catch(() => { + // Best-effort diagnostics; never break relay flow. + }); + }; + const scheduleLogFlush = () => { + if (!logPath || logFlushScheduled) { + return; + } + logFlushScheduled = true; + queueMicrotask(() => { + logFlushScheduled = false; + flushLogBuffer(); + }); + }; + const writeLogLine = (entry: Record) => { + if (!logPath) { + return; + } + try { + pendingLogLines += `${JSON.stringify(entry)}\n`; + if (pendingLogLines.length >= 16_384) { + flushLogBuffer(); + return; + } + scheduleLogFlush(); + } catch { + // Best-effort diagnostics; never break relay flow. + } + }; + const logEvent = (kind: string, fields?: Record) => { + writeLogLine({ + ts: new Date().toISOString(), + epochMs: Date.now(), + runId, + parentSessionKey, + childSessionKey: params.childSessionKey, + agentId: params.agentId, + kind, + ...fields, + }); + }; + const wake = () => { + requestHeartbeatNow( + scopedHeartbeatWakeOptions(parentSessionKey, { reason: "acp:spawn:stream" }), + ); + }; + const emit = (text: string, contextKey: string) => { + const cleaned = text.trim(); + if (!cleaned) { + return; + } + logEvent("system_event", { contextKey, text: cleaned }); + enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey }); + wake(); + }; + const emitStartNotice = () => { + emit( + `Started ${relayLabel} session ${params.childSessionKey}. Streaming progress updates to parent session.`, + `${contextPrefix}:start`, + ); + }; + + let disposed = false; + let pendingText = ""; + let lastProgressAt = Date.now(); + let stallNotified = false; + let flushTimer: NodeJS.Timeout | undefined; + let relayLifetimeTimer: NodeJS.Timeout | undefined; + + const clearFlushTimer = () => { + if (!flushTimer) { + return; + } + clearTimeout(flushTimer); + flushTimer = undefined; + }; + const clearRelayLifetimeTimer = () => { + if (!relayLifetimeTimer) { + return; + } + clearTimeout(relayLifetimeTimer); + relayLifetimeTimer = undefined; + }; + + const flushPending = () => { + clearFlushTimer(); + if (!pendingText) { + return; + } + const snippet = truncate(compactWhitespace(pendingText), STREAM_SNIPPET_MAX_CHARS); + pendingText = ""; + if (!snippet) { + return; + } + emit(`${relayLabel}: ${snippet}`, `${contextPrefix}:progress`); + }; + + const scheduleFlush = () => { + if (disposed || flushTimer || streamFlushMs <= 0) { + return; + } + flushTimer = setTimeout(() => { + flushPending(); + }, streamFlushMs); + flushTimer.unref?.(); + }; + + const noOutputWatcherTimer = setInterval(() => { + if (disposed || noOutputNoticeMs <= 0) { + return; + } + if (stallNotified) { + return; + } + if (Date.now() - lastProgressAt < noOutputNoticeMs) { + return; + } + stallNotified = true; + emit( + `${relayLabel} has produced no output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for interactive input.`, + `${contextPrefix}:stall`, + ); + }, noOutputPollMs); + noOutputWatcherTimer.unref?.(); + + relayLifetimeTimer = setTimeout(() => { + if (disposed) { + return; + } + emit( + `${relayLabel} stream relay timed out after ${Math.max(1, Math.round(maxRelayLifetimeMs / 1000))}s without completion.`, + `${contextPrefix}:timeout`, + ); + dispose(); + }, maxRelayLifetimeMs); + relayLifetimeTimer.unref?.(); + + if (params.emitStartNotice !== false) { + emitStartNotice(); + } + + const unsubscribe = onAgentEvent((event) => { + if (disposed || event.runId !== runId) { + return; + } + + if (event.stream === "assistant") { + const data = event.data; + const deltaCandidate = + (data as { delta?: unknown } | undefined)?.delta ?? + (data as { text?: unknown } | undefined)?.text; + const delta = typeof deltaCandidate === "string" ? deltaCandidate : undefined; + if (!delta || !delta.trim()) { + return; + } + logEvent("assistant_delta", { delta }); + + if (stallNotified) { + stallNotified = false; + emit(`${relayLabel} resumed output.`, `${contextPrefix}:resumed`); + } + + lastProgressAt = Date.now(); + pendingText += delta; + if (pendingText.length > STREAM_BUFFER_MAX_CHARS) { + pendingText = pendingText.slice(-STREAM_BUFFER_MAX_CHARS); + } + if (pendingText.length >= STREAM_SNIPPET_MAX_CHARS || delta.includes("\n\n")) { + flushPending(); + return; + } + scheduleFlush(); + return; + } + + if (event.stream !== "lifecycle") { + return; + } + + const phase = toTrimmedString((event.data as { phase?: unknown } | undefined)?.phase); + logEvent("lifecycle", { phase: phase ?? "unknown", data: event.data }); + if (phase === "end") { + flushPending(); + const startedAt = toFiniteNumber( + (event.data as { startedAt?: unknown } | undefined)?.startedAt, + ); + const endedAt = toFiniteNumber((event.data as { endedAt?: unknown } | undefined)?.endedAt); + const durationMs = + startedAt != null && endedAt != null && endedAt >= startedAt + ? endedAt - startedAt + : undefined; + if (durationMs != null) { + emit( + `${relayLabel} run completed in ${Math.max(1, Math.round(durationMs / 1000))}s.`, + `${contextPrefix}:done`, + ); + } else { + emit(`${relayLabel} run completed.`, `${contextPrefix}:done`); + } + dispose(); + return; + } + + if (phase === "error") { + flushPending(); + const errorText = toTrimmedString((event.data as { error?: unknown } | undefined)?.error); + if (errorText) { + emit(`${relayLabel} run failed: ${errorText}`, `${contextPrefix}:error`); + } else { + emit(`${relayLabel} run failed.`, `${contextPrefix}:error`); + } + dispose(); + } + }); + + const dispose = () => { + if (disposed) { + return; + } + disposed = true; + clearFlushTimer(); + clearRelayLifetimeTimer(); + flushLogBuffer(); + clearInterval(noOutputWatcherTimer); + unsubscribe(); + }; + + return { + dispose, + notifyStarted: emitStartNotice, + }; +} + +export type AcpSpawnParentRelayHandle = { + dispose: () => void; + notifyStarted: () => void; +}; diff --git a/src/agents/acp-spawn.test.ts b/src/agents/acp-spawn.test.ts index 732a465142d..b9b768361b2 100644 --- a/src/agents/acp-spawn.test.ts +++ b/src/agents/acp-spawn.test.ts @@ -33,6 +33,8 @@ const hoisted = vi.hoisted(() => { const sessionBindingListBySessionMock = vi.fn(); const closeSessionMock = vi.fn(); const initializeSessionMock = vi.fn(); + const startAcpSpawnParentStreamRelayMock = vi.fn(); + const resolveAcpSpawnStreamLogPathMock = vi.fn(); const state = { cfg: createDefaultSpawnConfig(), }; @@ -45,6 +47,8 @@ const hoisted = vi.hoisted(() => { sessionBindingListBySessionMock, closeSessionMock, initializeSessionMock, + startAcpSpawnParentStreamRelayMock, + resolveAcpSpawnStreamLogPathMock, state, }; }); @@ -100,6 +104,13 @@ vi.mock("../infra/outbound/session-binding-service.js", async (importOriginal) = }; }); +vi.mock("./acp-spawn-parent-stream.js", () => ({ + startAcpSpawnParentStreamRelay: (...args: unknown[]) => + hoisted.startAcpSpawnParentStreamRelayMock(...args), + resolveAcpSpawnStreamLogPath: (...args: unknown[]) => + hoisted.resolveAcpSpawnStreamLogPathMock(...args), +})); + const { spawnAcpDirect } = await import("./acp-spawn.js"); function createSessionBindingCapabilities() { @@ -132,6 +143,16 @@ function createSessionBinding(overrides?: Partial): Sessio }; } +function createRelayHandle(overrides?: { + dispose?: ReturnType; + notifyStarted?: ReturnType; +}) { + return { + dispose: overrides?.dispose ?? vi.fn(), + notifyStarted: overrides?.notifyStarted ?? vi.fn(), + }; +} + function expectResolvedIntroTextInBindMetadata(): void { const callWithMetadata = hoisted.sessionBindingBindMock.mock.calls.find( (call: unknown[]) => @@ -236,6 +257,12 @@ describe("spawnAcpDirect", () => { hoisted.sessionBindingResolveByConversationMock.mockReset().mockReturnValue(null); hoisted.sessionBindingListBySessionMock.mockReset().mockReturnValue([]); hoisted.sessionBindingUnbindMock.mockReset().mockResolvedValue([]); + hoisted.startAcpSpawnParentStreamRelayMock + .mockReset() + .mockImplementation(() => createRelayHandle()); + hoisted.resolveAcpSpawnStreamLogPathMock + .mockReset() + .mockReturnValue("/tmp/sess-main.acp-stream.jsonl"); }); it("spawns ACP session, binds a new thread, and dispatches initial task", async () => { @@ -423,4 +450,147 @@ describe("spawnAcpDirect", () => { expect(hoisted.callGatewayMock).not.toHaveBeenCalled(); expect(hoisted.initializeSessionMock).not.toHaveBeenCalled(); }); + + it('streams ACP progress to parent when streamTo="parent"', async () => { + const firstHandle = createRelayHandle(); + const secondHandle = createRelayHandle(); + hoisted.startAcpSpawnParentStreamRelayMock + .mockReset() + .mockReturnValueOnce(firstHandle) + .mockReturnValueOnce(secondHandle); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + streamTo: "parent", + }, + { + agentSessionKey: "agent:main:main", + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.streamLogPath).toBe("/tmp/sess-main.acp-stream.jsonl"); + const agentCall = hoisted.callGatewayMock.mock.calls + .map((call: unknown[]) => call[0] as { method?: string; params?: Record }) + .find((request) => request.method === "agent"); + const agentCallIndex = hoisted.callGatewayMock.mock.calls.findIndex( + (call: unknown[]) => (call[0] as { method?: string }).method === "agent", + ); + const relayCallOrder = hoisted.startAcpSpawnParentStreamRelayMock.mock.invocationCallOrder[0]; + const agentCallOrder = hoisted.callGatewayMock.mock.invocationCallOrder[agentCallIndex]; + expect(agentCall?.params?.deliver).toBe(false); + expect(typeof relayCallOrder).toBe("number"); + expect(typeof agentCallOrder).toBe("number"); + expect(relayCallOrder < agentCallOrder).toBe(true); + expect(hoisted.startAcpSpawnParentStreamRelayMock).toHaveBeenCalledWith( + expect.objectContaining({ + parentSessionKey: "agent:main:main", + agentId: "codex", + logPath: "/tmp/sess-main.acp-stream.jsonl", + emitStartNotice: false, + }), + ); + const relayRuns = hoisted.startAcpSpawnParentStreamRelayMock.mock.calls.map( + (call: unknown[]) => (call[0] as { runId?: string }).runId, + ); + expect(relayRuns).toContain(agentCall?.params?.idempotencyKey); + expect(relayRuns).toContain(result.runId); + expect(hoisted.resolveAcpSpawnStreamLogPathMock).toHaveBeenCalledWith({ + childSessionKey: expect.stringMatching(/^agent:codex:acp:/), + }); + expect(firstHandle.dispose).toHaveBeenCalledTimes(1); + expect(firstHandle.notifyStarted).not.toHaveBeenCalled(); + expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1); + }); + + it("announces parent relay start only after successful child dispatch", async () => { + const firstHandle = createRelayHandle(); + const secondHandle = createRelayHandle(); + hoisted.startAcpSpawnParentStreamRelayMock + .mockReset() + .mockReturnValueOnce(firstHandle) + .mockReturnValueOnce(secondHandle); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + streamTo: "parent", + }, + { + agentSessionKey: "agent:main:main", + }, + ); + + expect(result.status).toBe("accepted"); + expect(firstHandle.notifyStarted).not.toHaveBeenCalled(); + expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1); + const notifyOrder = secondHandle.notifyStarted.mock.invocationCallOrder; + const agentCallIndex = hoisted.callGatewayMock.mock.calls.findIndex( + (call: unknown[]) => (call[0] as { method?: string }).method === "agent", + ); + const agentCallOrder = hoisted.callGatewayMock.mock.invocationCallOrder[agentCallIndex]; + expect(typeof agentCallOrder).toBe("number"); + expect(typeof notifyOrder[0]).toBe("number"); + expect(notifyOrder[0] > agentCallOrder).toBe(true); + }); + + it("disposes pre-registered parent relay when initial ACP dispatch fails", async () => { + const relayHandle = createRelayHandle(); + hoisted.startAcpSpawnParentStreamRelayMock.mockReturnValueOnce(relayHandle); + hoisted.callGatewayMock.mockImplementation(async (argsUnknown: unknown) => { + const args = argsUnknown as { method?: string }; + if (args.method === "sessions.patch") { + return { ok: true }; + } + if (args.method === "agent") { + throw new Error("agent dispatch failed"); + } + if (args.method === "sessions.delete") { + return { ok: true }; + } + return {}; + }); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + streamTo: "parent", + }, + { + agentSessionKey: "agent:main:main", + }, + ); + + expect(result.status).toBe("error"); + expect(result.error).toContain("agent dispatch failed"); + expect(relayHandle.dispose).toHaveBeenCalledTimes(1); + expect(relayHandle.notifyStarted).not.toHaveBeenCalled(); + }); + + it('rejects streamTo="parent" without requester session context', async () => { + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + streamTo: "parent", + }, + { + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + }, + ); + + expect(result.status).toBe("error"); + expect(result.error).toContain('streamTo="parent"'); + expect(hoisted.callGatewayMock).not.toHaveBeenCalled(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); }); diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index ff475e54ebf..d5da9d199d8 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -32,12 +32,19 @@ import { } from "../infra/outbound/session-binding-service.js"; import { normalizeAgentId } from "../routing/session-key.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.js"; +import { + type AcpSpawnParentRelayHandle, + resolveAcpSpawnStreamLogPath, + startAcpSpawnParentStreamRelay, +} from "./acp-spawn-parent-stream.js"; import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js"; export const ACP_SPAWN_MODES = ["run", "session"] as const; export type SpawnAcpMode = (typeof ACP_SPAWN_MODES)[number]; export const ACP_SPAWN_SANDBOX_MODES = ["inherit", "require"] as const; export type SpawnAcpSandboxMode = (typeof ACP_SPAWN_SANDBOX_MODES)[number]; +export const ACP_SPAWN_STREAM_TARGETS = ["parent"] as const; +export type SpawnAcpStreamTarget = (typeof ACP_SPAWN_STREAM_TARGETS)[number]; export type SpawnAcpParams = { task: string; @@ -47,6 +54,7 @@ export type SpawnAcpParams = { mode?: SpawnAcpMode; thread?: boolean; sandbox?: SpawnAcpSandboxMode; + streamTo?: SpawnAcpStreamTarget; }; export type SpawnAcpContext = { @@ -63,6 +71,7 @@ export type SpawnAcpResult = { childSessionKey?: string; runId?: string; mode?: SpawnAcpMode; + streamLogPath?: string; note?: string; error?: string; }; @@ -234,6 +243,14 @@ export async function spawnAcpDirect( }; } const sandboxMode = params.sandbox === "require" ? "require" : "inherit"; + const streamToParentRequested = params.streamTo === "parent"; + const parentSessionKey = ctx.agentSessionKey?.trim(); + if (streamToParentRequested && !parentSessionKey) { + return { + status: "error", + error: 'sessions_spawn streamTo="parent" requires an active requester session context.', + }; + } const requesterRuntime = resolveSandboxRuntimeStatus({ cfg, sessionKey: ctx.agentSessionKey, @@ -410,8 +427,27 @@ export async function spawnAcpDirect( ? `channel:${boundThreadId}` : requesterOrigin?.to?.trim() || (deliveryThreadId ? `channel:${deliveryThreadId}` : undefined); const hasDeliveryTarget = Boolean(requesterOrigin?.channel && inferredDeliveryTo); + const deliverToBoundTarget = hasDeliveryTarget && !streamToParentRequested; const childIdem = crypto.randomUUID(); let childRunId: string = childIdem; + const streamLogPath = + streamToParentRequested && parentSessionKey + ? resolveAcpSpawnStreamLogPath({ + childSessionKey: sessionKey, + }) + : undefined; + let parentRelay: AcpSpawnParentRelayHandle | undefined; + if (streamToParentRequested && parentSessionKey) { + // Register relay before dispatch so fast lifecycle failures are not missed. + parentRelay = startAcpSpawnParentStreamRelay({ + runId: childIdem, + parentSessionKey, + childSessionKey: sessionKey, + agentId: targetAgentId, + logPath: streamLogPath, + emitStartNotice: false, + }); + } try { const response = await callGateway<{ runId?: string }>({ method: "agent", @@ -423,7 +459,7 @@ export async function spawnAcpDirect( accountId: hasDeliveryTarget ? (requesterOrigin?.accountId ?? undefined) : undefined, threadId: hasDeliveryTarget ? deliveryThreadId : undefined, idempotencyKey: childIdem, - deliver: hasDeliveryTarget, + deliver: deliverToBoundTarget, label: params.label || undefined, }, timeoutMs: 10_000, @@ -432,6 +468,7 @@ export async function spawnAcpDirect( childRunId = response.runId.trim(); } } catch (err) { + parentRelay?.dispose(); await cleanupFailedAcpSpawn({ cfg, sessionKey, @@ -445,6 +482,30 @@ export async function spawnAcpDirect( }; } + if (streamToParentRequested && parentSessionKey) { + if (parentRelay && childRunId !== childIdem) { + parentRelay.dispose(); + // Defensive fallback if gateway returns a runId that differs from idempotency key. + parentRelay = startAcpSpawnParentStreamRelay({ + runId: childRunId, + parentSessionKey, + childSessionKey: sessionKey, + agentId: targetAgentId, + logPath: streamLogPath, + emitStartNotice: false, + }); + } + parentRelay?.notifyStarted(); + return { + status: "accepted", + childSessionKey: sessionKey, + runId: childRunId, + mode: spawnMode, + ...(streamLogPath ? { streamLogPath } : {}), + note: spawnMode === "session" ? ACP_SPAWN_SESSION_ACCEPTED_NOTE : ACP_SPAWN_ACCEPTED_NOTE, + }; + } + return { status: "accepted", childSessionKey: sessionKey, diff --git a/src/agents/openclaw-tools.sessions.test.ts b/src/agents/openclaw-tools.sessions.test.ts index 9b07fafc4da..36c1f420af4 100644 --- a/src/agents/openclaw-tools.sessions.test.ts +++ b/src/agents/openclaw-tools.sessions.test.ts @@ -93,6 +93,7 @@ describe("sessions tools", () => { expect(schemaProp("sessions_spawn", "thread").type).toBe("boolean"); expect(schemaProp("sessions_spawn", "mode").type).toBe("string"); expect(schemaProp("sessions_spawn", "sandbox").type).toBe("string"); + expect(schemaProp("sessions_spawn", "streamTo").type).toBe("string"); expect(schemaProp("sessions_spawn", "runtime").type).toBe("string"); expect(schemaProp("sessions_spawn", "cwd").type).toBe("string"); expect(schemaProp("subagents", "recentMinutes").type).toBe("number"); diff --git a/src/agents/tools/sessions-spawn-tool.test.ts b/src/agents/tools/sessions-spawn-tool.test.ts index 3b6b67dbe47..a000000f1ee 100644 --- a/src/agents/tools/sessions-spawn-tool.test.ts +++ b/src/agents/tools/sessions-spawn-tool.test.ts @@ -16,6 +16,7 @@ vi.mock("../subagent-spawn.js", () => ({ vi.mock("../acp-spawn.js", () => ({ ACP_SPAWN_MODES: ["run", "session"], + ACP_SPAWN_STREAM_TARGETS: ["parent"], spawnAcpDirect: (...args: unknown[]) => hoisted.spawnAcpDirectMock(...args), })); @@ -94,6 +95,7 @@ describe("sessions_spawn tool", () => { cwd: "/workspace", thread: true, mode: "session", + streamTo: "parent", }); expect(result.details).toMatchObject({ @@ -108,6 +110,7 @@ describe("sessions_spawn tool", () => { cwd: "/workspace", thread: true, mode: "session", + streamTo: "parent", }), expect.objectContaining({ agentSessionKey: "agent:main:main", @@ -165,6 +168,26 @@ describe("sessions_spawn tool", () => { expect(hoisted.spawnSubagentDirectMock).not.toHaveBeenCalled(); }); + it('rejects streamTo when runtime is not "acp"', async () => { + const tool = createSessionsSpawnTool({ + agentSessionKey: "agent:main:main", + }); + + const result = await tool.execute("call-3b", { + runtime: "subagent", + task: "analyze file", + streamTo: "parent", + }); + + expect(result.details).toMatchObject({ + status: "error", + }); + const details = result.details as { error?: string }; + expect(details.error).toContain("streamTo is only supported for runtime=acp"); + expect(hoisted.spawnAcpDirectMock).not.toHaveBeenCalled(); + expect(hoisted.spawnSubagentDirectMock).not.toHaveBeenCalled(); + }); + it("keeps attachment content schema unconstrained for llama.cpp grammar safety", () => { const tool = createSessionsSpawnTool(); const schema = tool.parameters as { diff --git a/src/agents/tools/sessions-spawn-tool.ts b/src/agents/tools/sessions-spawn-tool.ts index 7ea48ded44f..03a138e8a0f 100644 --- a/src/agents/tools/sessions-spawn-tool.ts +++ b/src/agents/tools/sessions-spawn-tool.ts @@ -1,6 +1,6 @@ import { Type } from "@sinclair/typebox"; import type { GatewayMessageChannel } from "../../utils/message-channel.js"; -import { ACP_SPAWN_MODES, spawnAcpDirect } from "../acp-spawn.js"; +import { ACP_SPAWN_MODES, ACP_SPAWN_STREAM_TARGETS, spawnAcpDirect } from "../acp-spawn.js"; import { optionalStringEnum } from "../schema/typebox.js"; import { SUBAGENT_SPAWN_MODES, spawnSubagentDirect } from "../subagent-spawn.js"; import type { AnyAgentTool } from "./common.js"; @@ -34,6 +34,7 @@ const SessionsSpawnToolSchema = Type.Object({ mode: optionalStringEnum(SUBAGENT_SPAWN_MODES), cleanup: optionalStringEnum(["delete", "keep"] as const), sandbox: optionalStringEnum(SESSIONS_SPAWN_SANDBOX_MODES), + streamTo: optionalStringEnum(ACP_SPAWN_STREAM_TARGETS), // Inline attachments (snapshot-by-value). // NOTE: Attachment contents are redacted from transcript persistence by sanitizeToolCallInputs. @@ -97,6 +98,7 @@ export function createSessionsSpawnTool(opts?: { const cleanup = params.cleanup === "keep" || params.cleanup === "delete" ? params.cleanup : "keep"; const sandbox = params.sandbox === "require" ? "require" : "inherit"; + const streamTo = params.streamTo === "parent" ? "parent" : undefined; // Back-compat: older callers used timeoutSeconds for this tool. const timeoutSecondsCandidate = typeof params.runTimeoutSeconds === "number" @@ -118,6 +120,13 @@ export function createSessionsSpawnTool(opts?: { }>) : undefined; + if (streamTo && runtime !== "acp") { + return jsonResult({ + status: "error", + error: `streamTo is only supported for runtime=acp; got runtime=${runtime}`, + }); + } + if (runtime === "acp") { if (Array.isArray(attachments) && attachments.length > 0) { return jsonResult({ @@ -135,6 +144,7 @@ export function createSessionsSpawnTool(opts?: { mode: mode && ACP_SPAWN_MODES.includes(mode) ? mode : undefined, thread, sandbox, + streamTo, }, { agentSessionKey: opts?.agentSessionKey,