fix: relay ACP sessions_spawn parent streaming (#34310) (thanks @vincentkoc) (#34310)

Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com>
This commit is contained in:
Bob
2026-03-04 11:44:20 +01:00
committed by GitHub
parent 61f7cea48b
commit 257e2f5338
10 changed files with 893 additions and 3 deletions

View File

@@ -69,6 +69,9 @@ Docs: https://docs.openclaw.ai
- iOS/TTS playback fallback: keep voice playback resilient by switching from PCM to MP3 when provider format support is unavailable, while avoiding sticky fallback on generic local playback errors. (#33032) thanks @mbelinky.
- Telegram/multi-account default routing clarity: warn only for ambiguous (2+) account setups without an explicit default, add `openclaw doctor` warnings for missing/invalid multi-account defaults across channels, and document explicit-default guidance for channel routing and Telegram config. (#32544) thanks @Sid-Qin.
- Telegram/plugin outbound hook parity: run `message_sending` + `message_sent` in Telegram reply delivery, include reply-path hook metadata (`mediaUrls`, `threadId`), and report `message_sent.success=false` when hooks blank text and no outbound message is delivered. (#32649) Thanks @KimGLee.
- CLI/Coding-agent reliability: switch default `claude-cli` non-interactive args to `--permission-mode bypassPermissions`, auto-normalize legacy `--dangerously-skip-permissions` backend overrides to the modern permission-mode form, align coding-agent + live-test docs with the non-PTY Claude path, and emit session system-event heartbeat notices when CLI watchdog no-output timeouts terminate runs. (#28610, #31149, #34055). Thanks @niceysam, @cryptomaltese and @vincentkoc.
- ACP/ACPX session bootstrap: retry with `sessions new` when `sessions ensure` returns no session identifiers so ACP spawns avoid `NO_SESSION`/`ACP_TURN_FAILED` failures on affected agents. (#28786, #31338, #34055). Thanks @Sid-Qin and @vincentkoc.
- ACP/sessions_spawn parent stream visibility: add `streamTo: "parent"` for `runtime: "acp"` to forward initial child-run progress/no-output/completion updates back into the requester session as system events (instead of direct child delivery), and emit a tail-able session-scoped relay log (`<sessionId>.acp-stream.jsonl`, returned as `streamLogPath` when available), improving orchestrator visibility for blocked or long-running harness turns. (#34310, #29909; reopened from #34055). Thanks @vincentkoc.
- Agents/bootstrap truncation warning handling: unify bootstrap budget/truncation analysis across embedded + CLI runtime, `/context`, and `openclaw doctor`; add `agents.defaults.bootstrapPromptTruncationWarning` (`off|once|always`, default `once`) and persist warning-signature metadata so truncation warnings are consistent and deduped across turns. (#32769) Thanks @gumadeiras.
- Agents/Skills runtime loading: propagate run config into embedded attempt and compaction skill-entry loading so explicitly enabled bundled companion skills are discovered consistently when skill snapshots do not already provide resolved entries. Thanks @gumadeiras.
- Agents/Session startup date grounding: substitute `YYYY-MM-DD` placeholders in startup/post-compaction AGENTS context and append runtime current-time lines for `/new` and `/reset` prompts so daily-memory references resolve correctly. (#32381) Thanks @chengzhichao-xydt.

View File

@@ -119,6 +119,8 @@ Interface details:
- `mode: "session"` requires `thread: true`
- `cwd` (optional): requested runtime working directory (validated by backend/runtime policy).
- `label` (optional): operator-facing label used in session/banner text.
- `streamTo` (optional): `"parent"` streams initial ACP run progress summaries back to the requester session as system events.
- When available, accepted responses include `streamLogPath` pointing to a session-scoped JSONL log (`<sessionId>.acp-stream.jsonl`) you can tail for full relay history.
## Sandbox compatibility

View File

@@ -472,7 +472,7 @@ Core parameters:
- `sessions_list`: `kinds?`, `limit?`, `activeMinutes?`, `messageLimit?` (0 = none)
- `sessions_history`: `sessionKey` (or `sessionId`), `limit?`, `includeTools?`
- `sessions_send`: `sessionKey` (or `sessionId`), `message`, `timeoutSeconds?` (0 = fire-and-forget)
- `sessions_spawn`: `task`, `label?`, `runtime?`, `agentId?`, `model?`, `thinking?`, `cwd?`, `runTimeoutSeconds?`, `thread?`, `mode?`, `cleanup?`, `sandbox?`, `attachments?`, `attachAs?`
- `sessions_spawn`: `task`, `label?`, `runtime?`, `agentId?`, `model?`, `thinking?`, `cwd?`, `runTimeoutSeconds?`, `thread?`, `mode?`, `cleanup?`, `sandbox?`, `streamTo?`, `attachments?`, `attachAs?`
- `session_status`: `sessionKey?` (default current; accepts `sessionId`), `model?` (`default` clears override)
Notes:
@@ -483,6 +483,7 @@ Notes:
- `sessions_send` waits for final completion when `timeoutSeconds > 0`.
- Delivery/announce happens after completion and is best-effort; `status: "ok"` confirms the agent run finished, not that the announce was delivered.
- `sessions_spawn` supports `runtime: "subagent" | "acp"` (`subagent` default). For ACP runtime behavior, see [ACP Agents](/tools/acp-agents).
- For ACP runtime, `streamTo: "parent"` routes initial-run progress summaries back to the requester session as system events instead of direct child delivery.
- `sessions_spawn` starts a sub-agent run and posts an announce reply back to the requester chat.
- Supports one-shot mode (`mode: "run"`) and persistent thread-bound mode (`mode: "session"` with `thread: true`).
- If `thread: true` and `mode` is omitted, mode defaults to `session`.
@@ -496,6 +497,7 @@ Notes:
- Configure limits via `tools.sessions_spawn.attachments` (`enabled`, `maxTotalBytes`, `maxFiles`, `maxFileBytes`, `retainOnSessionKeep`).
- `attachAs.mountPath` is a reserved hint for future mount implementations.
- `sessions_spawn` is non-blocking and returns `status: "accepted"` immediately.
- ACP `streamTo: "parent"` responses may include `streamLogPath` (session-scoped `*.acp-stream.jsonl`) for tailing progress history.
- `sessions_send` runs a replyback pingpong (reply `REPLY_SKIP` to stop; max turns via `session.agentToAgent.maxPingPongTurns`, 05).
- After the pingpong, the target agent runs an **announce step**; reply `ANNOUNCE_SKIP` to suppress the announcement.
- Sandbox clamp: when the current session is sandboxed and `agents.defaults.sandbox.sessionToolsVisibility: "spawned"`, OpenClaw clamps `tools.sessions.visibility` to `tree`.

View File

@@ -0,0 +1,242 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { emitAgentEvent } from "../infra/agent-events.js";
import {
resolveAcpSpawnStreamLogPath,
startAcpSpawnParentStreamRelay,
} from "./acp-spawn-parent-stream.js";
const enqueueSystemEventMock = vi.fn();
const requestHeartbeatNowMock = vi.fn();
const readAcpSessionEntryMock = vi.fn();
const resolveSessionFilePathMock = vi.fn();
const resolveSessionFilePathOptionsMock = vi.fn();
vi.mock("../infra/system-events.js", () => ({
enqueueSystemEvent: (...args: unknown[]) => enqueueSystemEventMock(...args),
}));
vi.mock("../infra/heartbeat-wake.js", () => ({
requestHeartbeatNow: (...args: unknown[]) => requestHeartbeatNowMock(...args),
}));
vi.mock("../acp/runtime/session-meta.js", () => ({
readAcpSessionEntry: (...args: unknown[]) => readAcpSessionEntryMock(...args),
}));
vi.mock("../config/sessions/paths.js", () => ({
resolveSessionFilePath: (...args: unknown[]) => resolveSessionFilePathMock(...args),
resolveSessionFilePathOptions: (...args: unknown[]) => resolveSessionFilePathOptionsMock(...args),
}));
function collectedTexts() {
return enqueueSystemEventMock.mock.calls.map((call) => String(call[0] ?? ""));
}
describe("startAcpSpawnParentStreamRelay", () => {
beforeEach(() => {
enqueueSystemEventMock.mockClear();
requestHeartbeatNowMock.mockClear();
readAcpSessionEntryMock.mockReset();
resolveSessionFilePathMock.mockReset();
resolveSessionFilePathOptionsMock.mockReset();
resolveSessionFilePathOptionsMock.mockImplementation((value: unknown) => value);
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-03-04T01:00:00.000Z"));
});
afterEach(() => {
vi.useRealTimers();
});
it("relays assistant progress and completion to the parent session", () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-1",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child-1",
agentId: "codex",
streamFlushMs: 10,
noOutputNoticeMs: 120_000,
});
emitAgentEvent({
runId: "run-1",
stream: "assistant",
data: {
delta: "hello from child",
},
});
vi.advanceTimersByTime(15);
emitAgentEvent({
runId: "run-1",
stream: "lifecycle",
data: {
phase: "end",
startedAt: 1_000,
endedAt: 3_100,
},
});
const texts = collectedTexts();
expect(texts.some((text) => text.includes("Started codex session"))).toBe(true);
expect(texts.some((text) => text.includes("codex: hello from child"))).toBe(true);
expect(texts.some((text) => text.includes("codex run completed in 2s"))).toBe(true);
expect(requestHeartbeatNowMock).toHaveBeenCalledWith(
expect.objectContaining({
reason: "acp:spawn:stream",
sessionKey: "agent:main:main",
}),
);
relay.dispose();
});
it("emits a no-output notice and a resumed notice when output returns", () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-2",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child-2",
agentId: "codex",
streamFlushMs: 1,
noOutputNoticeMs: 1_000,
noOutputPollMs: 250,
});
vi.advanceTimersByTime(1_500);
expect(collectedTexts().some((text) => text.includes("has produced no output for 1s"))).toBe(
true,
);
emitAgentEvent({
runId: "run-2",
stream: "assistant",
data: {
delta: "resumed output",
},
});
vi.advanceTimersByTime(5);
const texts = collectedTexts();
expect(texts.some((text) => text.includes("resumed output."))).toBe(true);
expect(texts.some((text) => text.includes("codex: resumed output"))).toBe(true);
emitAgentEvent({
runId: "run-2",
stream: "lifecycle",
data: {
phase: "error",
error: "boom",
},
});
expect(collectedTexts().some((text) => text.includes("run failed: boom"))).toBe(true);
relay.dispose();
});
it("auto-disposes stale relays after max lifetime timeout", () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-3",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child-3",
agentId: "codex",
streamFlushMs: 1,
noOutputNoticeMs: 0,
maxRelayLifetimeMs: 1_000,
});
vi.advanceTimersByTime(1_001);
expect(collectedTexts().some((text) => text.includes("stream relay timed out after 1s"))).toBe(
true,
);
const before = enqueueSystemEventMock.mock.calls.length;
emitAgentEvent({
runId: "run-3",
stream: "assistant",
data: {
delta: "late output",
},
});
vi.advanceTimersByTime(5);
expect(enqueueSystemEventMock.mock.calls).toHaveLength(before);
relay.dispose();
});
it("supports delayed start notices", () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-4",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child-4",
agentId: "codex",
emitStartNotice: false,
});
expect(collectedTexts().some((text) => text.includes("Started codex session"))).toBe(false);
relay.notifyStarted();
expect(collectedTexts().some((text) => text.includes("Started codex session"))).toBe(true);
relay.dispose();
});
it("preserves delta whitespace boundaries in progress relays", () => {
const relay = startAcpSpawnParentStreamRelay({
runId: "run-5",
parentSessionKey: "agent:main:main",
childSessionKey: "agent:codex:acp:child-5",
agentId: "codex",
streamFlushMs: 10,
noOutputNoticeMs: 120_000,
});
emitAgentEvent({
runId: "run-5",
stream: "assistant",
data: {
delta: "hello",
},
});
emitAgentEvent({
runId: "run-5",
stream: "assistant",
data: {
delta: " world",
},
});
vi.advanceTimersByTime(15);
const texts = collectedTexts();
expect(texts.some((text) => text.includes("codex: hello world"))).toBe(true);
relay.dispose();
});
it("resolves ACP spawn stream log path from session metadata", () => {
readAcpSessionEntryMock.mockReturnValue({
storePath: "/tmp/openclaw/agents/codex/sessions/sessions.json",
entry: {
sessionId: "sess-123",
sessionFile: "/tmp/openclaw/agents/codex/sessions/sess-123.jsonl",
},
});
resolveSessionFilePathMock.mockReturnValue(
"/tmp/openclaw/agents/codex/sessions/sess-123.jsonl",
);
const resolved = resolveAcpSpawnStreamLogPath({
childSessionKey: "agent:codex:acp:child-1",
});
expect(resolved).toBe("/tmp/openclaw/agents/codex/sessions/sess-123.acp-stream.jsonl");
expect(readAcpSessionEntryMock).toHaveBeenCalledWith({
sessionKey: "agent:codex:acp:child-1",
});
expect(resolveSessionFilePathMock).toHaveBeenCalledWith(
"sess-123",
expect.objectContaining({
sessionId: "sess-123",
}),
expect.objectContaining({
storePath: "/tmp/openclaw/agents/codex/sessions/sessions.json",
}),
);
});
});

View File

@@ -0,0 +1,376 @@
import { appendFile, mkdir } from "node:fs/promises";
import path from "node:path";
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js";
import { onAgentEvent } from "../infra/agent-events.js";
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { scopedHeartbeatWakeOptions } from "../routing/session-key.js";
const DEFAULT_STREAM_FLUSH_MS = 2_500;
const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000;
const DEFAULT_NO_OUTPUT_POLL_MS = 15_000;
const DEFAULT_MAX_RELAY_LIFETIME_MS = 6 * 60 * 60 * 1000;
const STREAM_BUFFER_MAX_CHARS = 4_000;
const STREAM_SNIPPET_MAX_CHARS = 220;
function compactWhitespace(value: string): string {
return value.replace(/\s+/g, " ").trim();
}
function truncate(value: string, maxChars: number): string {
if (value.length <= maxChars) {
return value;
}
if (maxChars <= 1) {
return value.slice(0, maxChars);
}
return `${value.slice(0, maxChars - 1)}`;
}
function toTrimmedString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed || undefined;
}
function toFiniteNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function resolveAcpStreamLogPathFromSessionFile(sessionFile: string, sessionId: string): string {
const baseDir = path.dirname(path.resolve(sessionFile));
return path.join(baseDir, `${sessionId}.acp-stream.jsonl`);
}
export function resolveAcpSpawnStreamLogPath(params: {
childSessionKey: string;
}): string | undefined {
const childSessionKey = params.childSessionKey.trim();
if (!childSessionKey) {
return undefined;
}
const storeEntry = readAcpSessionEntry({
sessionKey: childSessionKey,
});
const sessionId = storeEntry?.entry?.sessionId?.trim();
if (!storeEntry || !sessionId) {
return undefined;
}
try {
const sessionFile = resolveSessionFilePath(
sessionId,
storeEntry.entry,
resolveSessionFilePathOptions({
storePath: storeEntry.storePath,
}),
);
return resolveAcpStreamLogPathFromSessionFile(sessionFile, sessionId);
} catch {
return undefined;
}
}
export function startAcpSpawnParentStreamRelay(params: {
runId: string;
parentSessionKey: string;
childSessionKey: string;
agentId: string;
logPath?: string;
streamFlushMs?: number;
noOutputNoticeMs?: number;
noOutputPollMs?: number;
maxRelayLifetimeMs?: number;
emitStartNotice?: boolean;
}): AcpSpawnParentRelayHandle {
const runId = params.runId.trim();
const parentSessionKey = params.parentSessionKey.trim();
if (!runId || !parentSessionKey) {
return {
dispose: () => {},
notifyStarted: () => {},
};
}
const streamFlushMs =
typeof params.streamFlushMs === "number" && Number.isFinite(params.streamFlushMs)
? Math.max(0, Math.floor(params.streamFlushMs))
: DEFAULT_STREAM_FLUSH_MS;
const noOutputNoticeMs =
typeof params.noOutputNoticeMs === "number" && Number.isFinite(params.noOutputNoticeMs)
? Math.max(0, Math.floor(params.noOutputNoticeMs))
: DEFAULT_NO_OUTPUT_NOTICE_MS;
const noOutputPollMs =
typeof params.noOutputPollMs === "number" && Number.isFinite(params.noOutputPollMs)
? Math.max(250, Math.floor(params.noOutputPollMs))
: DEFAULT_NO_OUTPUT_POLL_MS;
const maxRelayLifetimeMs =
typeof params.maxRelayLifetimeMs === "number" && Number.isFinite(params.maxRelayLifetimeMs)
? Math.max(1_000, Math.floor(params.maxRelayLifetimeMs))
: DEFAULT_MAX_RELAY_LIFETIME_MS;
const relayLabel = truncate(compactWhitespace(params.agentId), 40) || "ACP child";
const contextPrefix = `acp-spawn:${runId}`;
const logPath = toTrimmedString(params.logPath);
let logDirReady = false;
let pendingLogLines = "";
let logFlushScheduled = false;
let logWriteChain: Promise<void> = Promise.resolve();
const flushLogBuffer = () => {
if (!logPath || !pendingLogLines) {
return;
}
const chunk = pendingLogLines;
pendingLogLines = "";
logWriteChain = logWriteChain
.then(async () => {
if (!logDirReady) {
await mkdir(path.dirname(logPath), {
recursive: true,
});
logDirReady = true;
}
await appendFile(logPath, chunk, {
encoding: "utf-8",
mode: 0o600,
});
})
.catch(() => {
// Best-effort diagnostics; never break relay flow.
});
};
const scheduleLogFlush = () => {
if (!logPath || logFlushScheduled) {
return;
}
logFlushScheduled = true;
queueMicrotask(() => {
logFlushScheduled = false;
flushLogBuffer();
});
};
const writeLogLine = (entry: Record<string, unknown>) => {
if (!logPath) {
return;
}
try {
pendingLogLines += `${JSON.stringify(entry)}\n`;
if (pendingLogLines.length >= 16_384) {
flushLogBuffer();
return;
}
scheduleLogFlush();
} catch {
// Best-effort diagnostics; never break relay flow.
}
};
const logEvent = (kind: string, fields?: Record<string, unknown>) => {
writeLogLine({
ts: new Date().toISOString(),
epochMs: Date.now(),
runId,
parentSessionKey,
childSessionKey: params.childSessionKey,
agentId: params.agentId,
kind,
...fields,
});
};
const wake = () => {
requestHeartbeatNow(
scopedHeartbeatWakeOptions(parentSessionKey, { reason: "acp:spawn:stream" }),
);
};
const emit = (text: string, contextKey: string) => {
const cleaned = text.trim();
if (!cleaned) {
return;
}
logEvent("system_event", { contextKey, text: cleaned });
enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey });
wake();
};
const emitStartNotice = () => {
emit(
`Started ${relayLabel} session ${params.childSessionKey}. Streaming progress updates to parent session.`,
`${contextPrefix}:start`,
);
};
let disposed = false;
let pendingText = "";
let lastProgressAt = Date.now();
let stallNotified = false;
let flushTimer: NodeJS.Timeout | undefined;
let relayLifetimeTimer: NodeJS.Timeout | undefined;
const clearFlushTimer = () => {
if (!flushTimer) {
return;
}
clearTimeout(flushTimer);
flushTimer = undefined;
};
const clearRelayLifetimeTimer = () => {
if (!relayLifetimeTimer) {
return;
}
clearTimeout(relayLifetimeTimer);
relayLifetimeTimer = undefined;
};
const flushPending = () => {
clearFlushTimer();
if (!pendingText) {
return;
}
const snippet = truncate(compactWhitespace(pendingText), STREAM_SNIPPET_MAX_CHARS);
pendingText = "";
if (!snippet) {
return;
}
emit(`${relayLabel}: ${snippet}`, `${contextPrefix}:progress`);
};
const scheduleFlush = () => {
if (disposed || flushTimer || streamFlushMs <= 0) {
return;
}
flushTimer = setTimeout(() => {
flushPending();
}, streamFlushMs);
flushTimer.unref?.();
};
const noOutputWatcherTimer = setInterval(() => {
if (disposed || noOutputNoticeMs <= 0) {
return;
}
if (stallNotified) {
return;
}
if (Date.now() - lastProgressAt < noOutputNoticeMs) {
return;
}
stallNotified = true;
emit(
`${relayLabel} has produced no output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for interactive input.`,
`${contextPrefix}:stall`,
);
}, noOutputPollMs);
noOutputWatcherTimer.unref?.();
relayLifetimeTimer = setTimeout(() => {
if (disposed) {
return;
}
emit(
`${relayLabel} stream relay timed out after ${Math.max(1, Math.round(maxRelayLifetimeMs / 1000))}s without completion.`,
`${contextPrefix}:timeout`,
);
dispose();
}, maxRelayLifetimeMs);
relayLifetimeTimer.unref?.();
if (params.emitStartNotice !== false) {
emitStartNotice();
}
const unsubscribe = onAgentEvent((event) => {
if (disposed || event.runId !== runId) {
return;
}
if (event.stream === "assistant") {
const data = event.data;
const deltaCandidate =
(data as { delta?: unknown } | undefined)?.delta ??
(data as { text?: unknown } | undefined)?.text;
const delta = typeof deltaCandidate === "string" ? deltaCandidate : undefined;
if (!delta || !delta.trim()) {
return;
}
logEvent("assistant_delta", { delta });
if (stallNotified) {
stallNotified = false;
emit(`${relayLabel} resumed output.`, `${contextPrefix}:resumed`);
}
lastProgressAt = Date.now();
pendingText += delta;
if (pendingText.length > STREAM_BUFFER_MAX_CHARS) {
pendingText = pendingText.slice(-STREAM_BUFFER_MAX_CHARS);
}
if (pendingText.length >= STREAM_SNIPPET_MAX_CHARS || delta.includes("\n\n")) {
flushPending();
return;
}
scheduleFlush();
return;
}
if (event.stream !== "lifecycle") {
return;
}
const phase = toTrimmedString((event.data as { phase?: unknown } | undefined)?.phase);
logEvent("lifecycle", { phase: phase ?? "unknown", data: event.data });
if (phase === "end") {
flushPending();
const startedAt = toFiniteNumber(
(event.data as { startedAt?: unknown } | undefined)?.startedAt,
);
const endedAt = toFiniteNumber((event.data as { endedAt?: unknown } | undefined)?.endedAt);
const durationMs =
startedAt != null && endedAt != null && endedAt >= startedAt
? endedAt - startedAt
: undefined;
if (durationMs != null) {
emit(
`${relayLabel} run completed in ${Math.max(1, Math.round(durationMs / 1000))}s.`,
`${contextPrefix}:done`,
);
} else {
emit(`${relayLabel} run completed.`, `${contextPrefix}:done`);
}
dispose();
return;
}
if (phase === "error") {
flushPending();
const errorText = toTrimmedString((event.data as { error?: unknown } | undefined)?.error);
if (errorText) {
emit(`${relayLabel} run failed: ${errorText}`, `${contextPrefix}:error`);
} else {
emit(`${relayLabel} run failed.`, `${contextPrefix}:error`);
}
dispose();
}
});
const dispose = () => {
if (disposed) {
return;
}
disposed = true;
clearFlushTimer();
clearRelayLifetimeTimer();
flushLogBuffer();
clearInterval(noOutputWatcherTimer);
unsubscribe();
};
return {
dispose,
notifyStarted: emitStartNotice,
};
}
export type AcpSpawnParentRelayHandle = {
dispose: () => void;
notifyStarted: () => void;
};

View File

@@ -33,6 +33,8 @@ const hoisted = vi.hoisted(() => {
const sessionBindingListBySessionMock = vi.fn();
const closeSessionMock = vi.fn();
const initializeSessionMock = vi.fn();
const startAcpSpawnParentStreamRelayMock = vi.fn();
const resolveAcpSpawnStreamLogPathMock = vi.fn();
const state = {
cfg: createDefaultSpawnConfig(),
};
@@ -45,6 +47,8 @@ const hoisted = vi.hoisted(() => {
sessionBindingListBySessionMock,
closeSessionMock,
initializeSessionMock,
startAcpSpawnParentStreamRelayMock,
resolveAcpSpawnStreamLogPathMock,
state,
};
});
@@ -100,6 +104,13 @@ vi.mock("../infra/outbound/session-binding-service.js", async (importOriginal) =
};
});
vi.mock("./acp-spawn-parent-stream.js", () => ({
startAcpSpawnParentStreamRelay: (...args: unknown[]) =>
hoisted.startAcpSpawnParentStreamRelayMock(...args),
resolveAcpSpawnStreamLogPath: (...args: unknown[]) =>
hoisted.resolveAcpSpawnStreamLogPathMock(...args),
}));
const { spawnAcpDirect } = await import("./acp-spawn.js");
function createSessionBindingCapabilities() {
@@ -132,6 +143,16 @@ function createSessionBinding(overrides?: Partial<SessionBindingRecord>): Sessio
};
}
function createRelayHandle(overrides?: {
dispose?: ReturnType<typeof vi.fn>;
notifyStarted?: ReturnType<typeof vi.fn>;
}) {
return {
dispose: overrides?.dispose ?? vi.fn(),
notifyStarted: overrides?.notifyStarted ?? vi.fn(),
};
}
function expectResolvedIntroTextInBindMetadata(): void {
const callWithMetadata = hoisted.sessionBindingBindMock.mock.calls.find(
(call: unknown[]) =>
@@ -236,6 +257,12 @@ describe("spawnAcpDirect", () => {
hoisted.sessionBindingResolveByConversationMock.mockReset().mockReturnValue(null);
hoisted.sessionBindingListBySessionMock.mockReset().mockReturnValue([]);
hoisted.sessionBindingUnbindMock.mockReset().mockResolvedValue([]);
hoisted.startAcpSpawnParentStreamRelayMock
.mockReset()
.mockImplementation(() => createRelayHandle());
hoisted.resolveAcpSpawnStreamLogPathMock
.mockReset()
.mockReturnValue("/tmp/sess-main.acp-stream.jsonl");
});
it("spawns ACP session, binds a new thread, and dispatches initial task", async () => {
@@ -423,4 +450,147 @@ describe("spawnAcpDirect", () => {
expect(hoisted.callGatewayMock).not.toHaveBeenCalled();
expect(hoisted.initializeSessionMock).not.toHaveBeenCalled();
});
it('streams ACP progress to parent when streamTo="parent"', async () => {
const firstHandle = createRelayHandle();
const secondHandle = createRelayHandle();
hoisted.startAcpSpawnParentStreamRelayMock
.mockReset()
.mockReturnValueOnce(firstHandle)
.mockReturnValueOnce(secondHandle);
const result = await spawnAcpDirect(
{
task: "Investigate flaky tests",
agentId: "codex",
streamTo: "parent",
},
{
agentSessionKey: "agent:main:main",
agentChannel: "discord",
agentAccountId: "default",
agentTo: "channel:parent-channel",
},
);
expect(result.status).toBe("accepted");
expect(result.streamLogPath).toBe("/tmp/sess-main.acp-stream.jsonl");
const agentCall = hoisted.callGatewayMock.mock.calls
.map((call: unknown[]) => call[0] as { method?: string; params?: Record<string, unknown> })
.find((request) => request.method === "agent");
const agentCallIndex = hoisted.callGatewayMock.mock.calls.findIndex(
(call: unknown[]) => (call[0] as { method?: string }).method === "agent",
);
const relayCallOrder = hoisted.startAcpSpawnParentStreamRelayMock.mock.invocationCallOrder[0];
const agentCallOrder = hoisted.callGatewayMock.mock.invocationCallOrder[agentCallIndex];
expect(agentCall?.params?.deliver).toBe(false);
expect(typeof relayCallOrder).toBe("number");
expect(typeof agentCallOrder).toBe("number");
expect(relayCallOrder < agentCallOrder).toBe(true);
expect(hoisted.startAcpSpawnParentStreamRelayMock).toHaveBeenCalledWith(
expect.objectContaining({
parentSessionKey: "agent:main:main",
agentId: "codex",
logPath: "/tmp/sess-main.acp-stream.jsonl",
emitStartNotice: false,
}),
);
const relayRuns = hoisted.startAcpSpawnParentStreamRelayMock.mock.calls.map(
(call: unknown[]) => (call[0] as { runId?: string }).runId,
);
expect(relayRuns).toContain(agentCall?.params?.idempotencyKey);
expect(relayRuns).toContain(result.runId);
expect(hoisted.resolveAcpSpawnStreamLogPathMock).toHaveBeenCalledWith({
childSessionKey: expect.stringMatching(/^agent:codex:acp:/),
});
expect(firstHandle.dispose).toHaveBeenCalledTimes(1);
expect(firstHandle.notifyStarted).not.toHaveBeenCalled();
expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1);
});
it("announces parent relay start only after successful child dispatch", async () => {
const firstHandle = createRelayHandle();
const secondHandle = createRelayHandle();
hoisted.startAcpSpawnParentStreamRelayMock
.mockReset()
.mockReturnValueOnce(firstHandle)
.mockReturnValueOnce(secondHandle);
const result = await spawnAcpDirect(
{
task: "Investigate flaky tests",
agentId: "codex",
streamTo: "parent",
},
{
agentSessionKey: "agent:main:main",
},
);
expect(result.status).toBe("accepted");
expect(firstHandle.notifyStarted).not.toHaveBeenCalled();
expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1);
const notifyOrder = secondHandle.notifyStarted.mock.invocationCallOrder;
const agentCallIndex = hoisted.callGatewayMock.mock.calls.findIndex(
(call: unknown[]) => (call[0] as { method?: string }).method === "agent",
);
const agentCallOrder = hoisted.callGatewayMock.mock.invocationCallOrder[agentCallIndex];
expect(typeof agentCallOrder).toBe("number");
expect(typeof notifyOrder[0]).toBe("number");
expect(notifyOrder[0] > agentCallOrder).toBe(true);
});
it("disposes pre-registered parent relay when initial ACP dispatch fails", async () => {
const relayHandle = createRelayHandle();
hoisted.startAcpSpawnParentStreamRelayMock.mockReturnValueOnce(relayHandle);
hoisted.callGatewayMock.mockImplementation(async (argsUnknown: unknown) => {
const args = argsUnknown as { method?: string };
if (args.method === "sessions.patch") {
return { ok: true };
}
if (args.method === "agent") {
throw new Error("agent dispatch failed");
}
if (args.method === "sessions.delete") {
return { ok: true };
}
return {};
});
const result = await spawnAcpDirect(
{
task: "Investigate flaky tests",
agentId: "codex",
streamTo: "parent",
},
{
agentSessionKey: "agent:main:main",
},
);
expect(result.status).toBe("error");
expect(result.error).toContain("agent dispatch failed");
expect(relayHandle.dispose).toHaveBeenCalledTimes(1);
expect(relayHandle.notifyStarted).not.toHaveBeenCalled();
});
it('rejects streamTo="parent" without requester session context', async () => {
const result = await spawnAcpDirect(
{
task: "Investigate flaky tests",
agentId: "codex",
streamTo: "parent",
},
{
agentChannel: "discord",
agentAccountId: "default",
agentTo: "channel:parent-channel",
},
);
expect(result.status).toBe("error");
expect(result.error).toContain('streamTo="parent"');
expect(hoisted.callGatewayMock).not.toHaveBeenCalled();
expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
});

View File

@@ -32,12 +32,19 @@ import {
} from "../infra/outbound/session-binding-service.js";
import { normalizeAgentId } from "../routing/session-key.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.js";
import {
type AcpSpawnParentRelayHandle,
resolveAcpSpawnStreamLogPath,
startAcpSpawnParentStreamRelay,
} from "./acp-spawn-parent-stream.js";
import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js";
export const ACP_SPAWN_MODES = ["run", "session"] as const;
export type SpawnAcpMode = (typeof ACP_SPAWN_MODES)[number];
export const ACP_SPAWN_SANDBOX_MODES = ["inherit", "require"] as const;
export type SpawnAcpSandboxMode = (typeof ACP_SPAWN_SANDBOX_MODES)[number];
export const ACP_SPAWN_STREAM_TARGETS = ["parent"] as const;
export type SpawnAcpStreamTarget = (typeof ACP_SPAWN_STREAM_TARGETS)[number];
export type SpawnAcpParams = {
task: string;
@@ -47,6 +54,7 @@ export type SpawnAcpParams = {
mode?: SpawnAcpMode;
thread?: boolean;
sandbox?: SpawnAcpSandboxMode;
streamTo?: SpawnAcpStreamTarget;
};
export type SpawnAcpContext = {
@@ -63,6 +71,7 @@ export type SpawnAcpResult = {
childSessionKey?: string;
runId?: string;
mode?: SpawnAcpMode;
streamLogPath?: string;
note?: string;
error?: string;
};
@@ -234,6 +243,14 @@ export async function spawnAcpDirect(
};
}
const sandboxMode = params.sandbox === "require" ? "require" : "inherit";
const streamToParentRequested = params.streamTo === "parent";
const parentSessionKey = ctx.agentSessionKey?.trim();
if (streamToParentRequested && !parentSessionKey) {
return {
status: "error",
error: 'sessions_spawn streamTo="parent" requires an active requester session context.',
};
}
const requesterRuntime = resolveSandboxRuntimeStatus({
cfg,
sessionKey: ctx.agentSessionKey,
@@ -410,8 +427,27 @@ export async function spawnAcpDirect(
? `channel:${boundThreadId}`
: requesterOrigin?.to?.trim() || (deliveryThreadId ? `channel:${deliveryThreadId}` : undefined);
const hasDeliveryTarget = Boolean(requesterOrigin?.channel && inferredDeliveryTo);
const deliverToBoundTarget = hasDeliveryTarget && !streamToParentRequested;
const childIdem = crypto.randomUUID();
let childRunId: string = childIdem;
const streamLogPath =
streamToParentRequested && parentSessionKey
? resolveAcpSpawnStreamLogPath({
childSessionKey: sessionKey,
})
: undefined;
let parentRelay: AcpSpawnParentRelayHandle | undefined;
if (streamToParentRequested && parentSessionKey) {
// Register relay before dispatch so fast lifecycle failures are not missed.
parentRelay = startAcpSpawnParentStreamRelay({
runId: childIdem,
parentSessionKey,
childSessionKey: sessionKey,
agentId: targetAgentId,
logPath: streamLogPath,
emitStartNotice: false,
});
}
try {
const response = await callGateway<{ runId?: string }>({
method: "agent",
@@ -423,7 +459,7 @@ export async function spawnAcpDirect(
accountId: hasDeliveryTarget ? (requesterOrigin?.accountId ?? undefined) : undefined,
threadId: hasDeliveryTarget ? deliveryThreadId : undefined,
idempotencyKey: childIdem,
deliver: hasDeliveryTarget,
deliver: deliverToBoundTarget,
label: params.label || undefined,
},
timeoutMs: 10_000,
@@ -432,6 +468,7 @@ export async function spawnAcpDirect(
childRunId = response.runId.trim();
}
} catch (err) {
parentRelay?.dispose();
await cleanupFailedAcpSpawn({
cfg,
sessionKey,
@@ -445,6 +482,30 @@ export async function spawnAcpDirect(
};
}
if (streamToParentRequested && parentSessionKey) {
if (parentRelay && childRunId !== childIdem) {
parentRelay.dispose();
// Defensive fallback if gateway returns a runId that differs from idempotency key.
parentRelay = startAcpSpawnParentStreamRelay({
runId: childRunId,
parentSessionKey,
childSessionKey: sessionKey,
agentId: targetAgentId,
logPath: streamLogPath,
emitStartNotice: false,
});
}
parentRelay?.notifyStarted();
return {
status: "accepted",
childSessionKey: sessionKey,
runId: childRunId,
mode: spawnMode,
...(streamLogPath ? { streamLogPath } : {}),
note: spawnMode === "session" ? ACP_SPAWN_SESSION_ACCEPTED_NOTE : ACP_SPAWN_ACCEPTED_NOTE,
};
}
return {
status: "accepted",
childSessionKey: sessionKey,

View File

@@ -93,6 +93,7 @@ describe("sessions tools", () => {
expect(schemaProp("sessions_spawn", "thread").type).toBe("boolean");
expect(schemaProp("sessions_spawn", "mode").type).toBe("string");
expect(schemaProp("sessions_spawn", "sandbox").type).toBe("string");
expect(schemaProp("sessions_spawn", "streamTo").type).toBe("string");
expect(schemaProp("sessions_spawn", "runtime").type).toBe("string");
expect(schemaProp("sessions_spawn", "cwd").type).toBe("string");
expect(schemaProp("subagents", "recentMinutes").type).toBe("number");

View File

@@ -16,6 +16,7 @@ vi.mock("../subagent-spawn.js", () => ({
vi.mock("../acp-spawn.js", () => ({
ACP_SPAWN_MODES: ["run", "session"],
ACP_SPAWN_STREAM_TARGETS: ["parent"],
spawnAcpDirect: (...args: unknown[]) => hoisted.spawnAcpDirectMock(...args),
}));
@@ -94,6 +95,7 @@ describe("sessions_spawn tool", () => {
cwd: "/workspace",
thread: true,
mode: "session",
streamTo: "parent",
});
expect(result.details).toMatchObject({
@@ -108,6 +110,7 @@ describe("sessions_spawn tool", () => {
cwd: "/workspace",
thread: true,
mode: "session",
streamTo: "parent",
}),
expect.objectContaining({
agentSessionKey: "agent:main:main",
@@ -165,6 +168,26 @@ describe("sessions_spawn tool", () => {
expect(hoisted.spawnSubagentDirectMock).not.toHaveBeenCalled();
});
it('rejects streamTo when runtime is not "acp"', async () => {
const tool = createSessionsSpawnTool({
agentSessionKey: "agent:main:main",
});
const result = await tool.execute("call-3b", {
runtime: "subagent",
task: "analyze file",
streamTo: "parent",
});
expect(result.details).toMatchObject({
status: "error",
});
const details = result.details as { error?: string };
expect(details.error).toContain("streamTo is only supported for runtime=acp");
expect(hoisted.spawnAcpDirectMock).not.toHaveBeenCalled();
expect(hoisted.spawnSubagentDirectMock).not.toHaveBeenCalled();
});
it("keeps attachment content schema unconstrained for llama.cpp grammar safety", () => {
const tool = createSessionsSpawnTool();
const schema = tool.parameters as {

View File

@@ -1,6 +1,6 @@
import { Type } from "@sinclair/typebox";
import type { GatewayMessageChannel } from "../../utils/message-channel.js";
import { ACP_SPAWN_MODES, spawnAcpDirect } from "../acp-spawn.js";
import { ACP_SPAWN_MODES, ACP_SPAWN_STREAM_TARGETS, spawnAcpDirect } from "../acp-spawn.js";
import { optionalStringEnum } from "../schema/typebox.js";
import { SUBAGENT_SPAWN_MODES, spawnSubagentDirect } from "../subagent-spawn.js";
import type { AnyAgentTool } from "./common.js";
@@ -34,6 +34,7 @@ const SessionsSpawnToolSchema = Type.Object({
mode: optionalStringEnum(SUBAGENT_SPAWN_MODES),
cleanup: optionalStringEnum(["delete", "keep"] as const),
sandbox: optionalStringEnum(SESSIONS_SPAWN_SANDBOX_MODES),
streamTo: optionalStringEnum(ACP_SPAWN_STREAM_TARGETS),
// Inline attachments (snapshot-by-value).
// NOTE: Attachment contents are redacted from transcript persistence by sanitizeToolCallInputs.
@@ -97,6 +98,7 @@ export function createSessionsSpawnTool(opts?: {
const cleanup =
params.cleanup === "keep" || params.cleanup === "delete" ? params.cleanup : "keep";
const sandbox = params.sandbox === "require" ? "require" : "inherit";
const streamTo = params.streamTo === "parent" ? "parent" : undefined;
// Back-compat: older callers used timeoutSeconds for this tool.
const timeoutSecondsCandidate =
typeof params.runTimeoutSeconds === "number"
@@ -118,6 +120,13 @@ export function createSessionsSpawnTool(opts?: {
}>)
: undefined;
if (streamTo && runtime !== "acp") {
return jsonResult({
status: "error",
error: `streamTo is only supported for runtime=acp; got runtime=${runtime}`,
});
}
if (runtime === "acp") {
if (Array.isArray(attachments) && attachments.length > 0) {
return jsonResult({
@@ -135,6 +144,7 @@ export function createSessionsSpawnTool(opts?: {
mode: mode && ACP_SPAWN_MODES.includes(mode) ? mode : undefined,
thread,
sandbox,
streamTo,
},
{
agentSessionKey: opts?.agentSessionKey,