mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-02 02:20:20 +00:00
* fix(acp): implicit streamToParent for mode=run without thread When spawning ACP sessions with mode=run and no thread binding, automatically route output to parent session instead of Discord. This enables agent-to-agent supervision patterns where the spawning agent wants results returned programmatically, not posted as chat. The change makes sessions_spawn with runtime=acp and thread=false behave like direct acpx invocation - output goes to the spawning session, not to Discord. Fixes the issue where mode=run without thread still posted to Discord because hasDeliveryTarget was true when called from a Discord context. * fix: use resolved spawnMode instead of params.mode Move implicit streamToParent check to after resolveSpawnMode so that both explicit mode="run" and omitted mode (which defaults to "run" when thread is false) correctly trigger parent routing. This fixes the issue where callers that rely on default mode selection would not get the intended parent streaming behavior. * fix: tighten implicit ACP parent relay gating (#42404) (thanks @davidguttman) --------- Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com>
379 lines
11 KiB
TypeScript
379 lines
11 KiB
TypeScript
import { appendFile, mkdir } from "node:fs/promises";
|
|
import path from "node:path";
|
|
import { readAcpSessionEntry } from "../acp/runtime/session-meta.js";
|
|
import { resolveSessionFilePath, resolveSessionFilePathOptions } from "../config/sessions/paths.js";
|
|
import { onAgentEvent } from "../infra/agent-events.js";
|
|
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
|
import { enqueueSystemEvent } from "../infra/system-events.js";
|
|
import { scopedHeartbeatWakeOptions } from "../routing/session-key.js";
|
|
|
|
/** Fallback delay (ms) before buffered assistant text is flushed when `streamFlushMs` is not supplied. */
const DEFAULT_STREAM_FLUSH_MS = 2_500;
/** Fallback quiet period (ms) before the "no output" notice when `noOutputNoticeMs` is not supplied. */
const DEFAULT_NO_OUTPUT_NOTICE_MS = 60_000;
/** Fallback polling interval (ms) of the no-output watcher when `noOutputPollMs` is not supplied. */
const DEFAULT_NO_OUTPUT_POLL_MS = 15_000;
/** Fallback relay lifetime (ms) when `maxRelayLifetimeMs` is not supplied: six hours. */
const DEFAULT_MAX_RELAY_LIFETIME_MS = 6 * 60 * 60 * 1_000;
/** Upper bound on the number of characters retained in the pending stream buffer. */
const STREAM_BUFFER_MAX_CHARS = 4_000;
/** Maximum length of a single progress snippet; also the buffer size that triggers an immediate flush. */
const STREAM_SNIPPET_MAX_CHARS = 220;
|
|
|
|
/**
 * Collapses every run of whitespace in `value` into a single space and
 * strips leading/trailing whitespace from the result.
 */
function compactWhitespace(value: string): string {
  const collapsed = value.replace(/\s+/g, " ");
  return collapsed.trim();
}
|
|
|
|
/**
 * Shortens `value` so that it occupies at most `maxChars` UTF-16 code units.
 *
 * When the text has to be cut and the budget is at least two, the last slot is
 * used for an ellipsis ("…"). With a budget of one the first code unit is
 * returned without an ellipsis; with a budget of zero the result is empty.
 *
 * @param value - Text to shorten.
 * @param maxChars - Maximum length of the result. Fractions are floored;
 *   negative or NaN budgets are treated as zero.
 * @returns `value` unchanged when it already fits, otherwise the shortened text.
 */
function truncate(value: string, maxChars: number): string {
  // A negative budget would reach `slice` as a negative end index (counting
  // from the end of the string) and return almost the whole text; NaN would
  // yield a bare ellipsis. Both are normalized to "no room at all".
  const limit = Number.isNaN(maxChars) ? 0 : Math.max(0, Math.floor(maxChars));
  if (value.length <= limit) {
    return value;
  }
  const withEllipsis = limit > 1;
  let head = value.slice(0, withEllipsis ? limit - 1 : limit);
  // Do not leave a dangling high surrogate when the cut lands inside a
  // surrogate pair (e.g. in the middle of an emoji).
  const lastUnit = head.length > 0 ? head.charCodeAt(head.length - 1) : 0;
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
    head = head.slice(0, -1);
  }
  return withEllipsis ? `${head}…` : head;
}
|
|
|
|
/**
 * Returns the trimmed form of `value` when it is a string with visible
 * content; every other input (non-strings, empty or whitespace-only
 * strings) yields `undefined`.
 */
function toTrimmedString(value: unknown): string | undefined {
  const text = typeof value === "string" ? value.trim() : "";
  return text.length > 0 ? text : undefined;
}
|
|
|
|
/**
 * Passes `value` through when it is a finite number; anything else
 * (non-numbers, NaN, ±Infinity) becomes `undefined`.
 */
function toFiniteNumber(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  return Number.isFinite(value) ? value : undefined;
}
|
|
|
|
/**
 * Builds the path of the ACP stream log that sits next to a session file.
 *
 * @param sessionFile - Path of the session file; resolved to an absolute path first.
 * @param sessionId - Session identifier used as the log file's base name.
 * @returns `<directory of sessionFile>/<sessionId>.acp-stream.jsonl`.
 */
function resolveAcpStreamLogPathFromSessionFile(sessionFile: string, sessionId: string): string {
  const absoluteSessionFile = path.resolve(sessionFile);
  const logFileName = `${sessionId}.acp-stream.jsonl`;
  return path.join(path.dirname(absoluteSessionFile), logFileName);
}
|
|
|
|
/**
 * Resolves the stream-log path for a spawned ACP child session.
 *
 * Looks up the ACP session entry for `childSessionKey`, resolves that
 * session's file path, and returns the `.acp-stream.jsonl` path next to it.
 *
 * @param params.childSessionKey - Session key of the child; surrounding whitespace is ignored.
 * @returns The log path, or `undefined` when the key is blank, no entry or
 *   session id is found, or resolving the session file path throws.
 */
export function resolveAcpSpawnStreamLogPath(params: {
  childSessionKey: string;
}): string | undefined {
  const sessionKey = params.childSessionKey.trim();
  if (sessionKey.length === 0) {
    return undefined;
  }

  const acpEntry = readAcpSessionEntry({ sessionKey });
  const sessionId = acpEntry?.entry?.sessionId?.trim();
  if (!acpEntry || !sessionId) {
    return undefined;
  }

  try {
    const pathOptions = resolveSessionFilePathOptions({ storePath: acpEntry.storePath });
    const sessionFile = resolveSessionFilePath(sessionId, acpEntry.entry, pathOptions);
    return resolveAcpStreamLogPathFromSessionFile(sessionFile, sessionId);
  } catch {
    // Path resolution failures simply mean there is no log path to report.
    return undefined;
  }
}
|
|
|
|
/**
 * Largest delay Node.js timers accept. `setTimeout`/`setInterval` coerce any
 * larger delay to 1 ms, which would make a "very long" timer fire immediately.
 */
const MAX_TIMER_DELAY_MS = 2_147_483_647;

/**
 * Normalizes an optional millisecond option.
 *
 * @param value - Caller-supplied value; ignored unless it is a finite number.
 * @param fallback - Returned as-is when `value` is missing or not finite.
 * @param minimum - Lower bound applied to the floored value.
 * @param maximum - Upper bound applied to the floored value (unbounded by default).
 */
function resolveRelayDurationMs(
  value: number | undefined,
  fallback: number,
  minimum: number,
  maximum: number = Number.POSITIVE_INFINITY,
): number {
  if (typeof value !== "number" || !Number.isFinite(value)) {
    return fallback;
  }
  return Math.min(maximum, Math.max(minimum, Math.floor(value)));
}

/**
 * Starts relaying a spawned ACP run's progress to its parent session.
 *
 * Subscribes to agent events for `runId` and turns them into system events
 * enqueued on `parentSessionKey` (each followed by a heartbeat wake request):
 * - assistant deltas are buffered and relayed as compact progress snippets,
 * - a notice is emitted when no output arrived for `noOutputNoticeMs`, and
 *   another one when output resumes,
 * - lifecycle `end` / `error` events emit a completion / failure notice and
 *   dispose the relay,
 * - the relay disposes itself with a timeout notice after `maxRelayLifetimeMs`.
 *
 * When `logPath` is set, relayed events are additionally appended to that file
 * as JSON lines on a best-effort basis.
 *
 * @param params.runId - Run whose events are relayed; blank disables the relay.
 * @param params.parentSessionKey - Session receiving the system events; blank disables the relay.
 * @param params.childSessionKey - Child session key, used in notices and log entries.
 * @param params.agentId - Agent identifier; a compacted, truncated form labels the notices.
 * @param params.logPath - Optional JSONL diagnostics file.
 * @param params.streamFlushMs - Delay before buffered text is flushed (min 0; 0 disables the timed flush).
 * @param params.noOutputNoticeMs - Quiet period before the stall notice (min 0; 0 disables it).
 * @param params.noOutputPollMs - Polling interval of the stall watcher (min 250).
 * @param params.maxRelayLifetimeMs - Maximum relay lifetime (min 1000).
 * @param params.emitStartNotice - Set to `false` to skip the start notice on creation.
 * @returns A handle to dispose the relay or to emit the start notice manually.
 *   A no-op handle is returned when `runId` or `parentSessionKey` is blank.
 */
export function startAcpSpawnParentStreamRelay(params: {
  runId: string;
  parentSessionKey: string;
  childSessionKey: string;
  agentId: string;
  logPath?: string;
  streamFlushMs?: number;
  noOutputNoticeMs?: number;
  noOutputPollMs?: number;
  maxRelayLifetimeMs?: number;
  emitStartNotice?: boolean;
}): AcpSpawnParentRelayHandle {
  const runId = params.runId.trim();
  const parentSessionKey = params.parentSessionKey.trim();
  if (!runId || !parentSessionKey) {
    return {
      dispose: () => {},
      notifyStarted: () => {},
    };
  }

  // Values handed to setTimeout/setInterval are capped at MAX_TIMER_DELAY_MS so
  // an over-large option cannot overflow into an immediately firing timer.
  const streamFlushMs = resolveRelayDurationMs(
    params.streamFlushMs,
    DEFAULT_STREAM_FLUSH_MS,
    0,
    MAX_TIMER_DELAY_MS,
  );
  // Only compared against elapsed time, never used as a timer delay: no cap needed.
  const noOutputNoticeMs = resolveRelayDurationMs(
    params.noOutputNoticeMs,
    DEFAULT_NO_OUTPUT_NOTICE_MS,
    0,
  );
  const noOutputPollMs = resolveRelayDurationMs(
    params.noOutputPollMs,
    DEFAULT_NO_OUTPUT_POLL_MS,
    250,
    MAX_TIMER_DELAY_MS,
  );
  const maxRelayLifetimeMs = resolveRelayDurationMs(
    params.maxRelayLifetimeMs,
    DEFAULT_MAX_RELAY_LIFETIME_MS,
    1_000,
    MAX_TIMER_DELAY_MS,
  );

  const relayLabel = truncate(compactWhitespace(params.agentId), 40) || "ACP child";
  const contextPrefix = `acp-spawn:${runId}`;
  const logPath = toTrimmedString(params.logPath);
  let logDirReady = false;
  let pendingLogLines = "";
  let logFlushScheduled = false;
  // Serializes appends so chunks reach the file in the order they were produced.
  let logWriteChain: Promise<void> = Promise.resolve();

  // Hands the buffered log lines to the write chain and clears the buffer.
  const flushLogBuffer = () => {
    if (!logPath || !pendingLogLines) {
      return;
    }
    const chunk = pendingLogLines;
    pendingLogLines = "";
    logWriteChain = logWriteChain
      .then(async () => {
        if (!logDirReady) {
          await mkdir(path.dirname(logPath), {
            recursive: true,
          });
          logDirReady = true;
        }
        await appendFile(logPath, chunk, {
          encoding: "utf-8",
          mode: 0o600,
        });
      })
      .catch(() => {
        // Best-effort diagnostics; never break relay flow.
      });
  };

  // Coalesces log lines written in the same tick into a single append.
  const scheduleLogFlush = () => {
    if (!logPath || logFlushScheduled) {
      return;
    }
    logFlushScheduled = true;
    queueMicrotask(() => {
      logFlushScheduled = false;
      flushLogBuffer();
    });
  };

  const writeLogLine = (entry: Record<string, unknown>) => {
    if (!logPath) {
      return;
    }
    try {
      pendingLogLines += `${JSON.stringify(entry)}\n`;
      // Large buffers are flushed right away instead of waiting for the microtask.
      if (pendingLogLines.length >= 16_384) {
        flushLogBuffer();
        return;
      }
      scheduleLogFlush();
    } catch {
      // Best-effort diagnostics; never break relay flow.
    }
  };

  const logEvent = (kind: string, fields?: Record<string, unknown>) => {
    writeLogLine({
      ts: new Date().toISOString(),
      epochMs: Date.now(),
      runId,
      parentSessionKey,
      childSessionKey: params.childSessionKey,
      agentId: params.agentId,
      kind,
      ...fields,
    });
  };

  const wake = () => {
    requestHeartbeatNow(
      scopedHeartbeatWakeOptions(parentSessionKey, {
        reason: "acp:spawn:stream",
      }),
    );
  };

  // Logs the text, enqueues it as a system event on the parent session, then requests a wake.
  const emit = (text: string, contextKey: string) => {
    const cleaned = text.trim();
    if (!cleaned) {
      return;
    }
    logEvent("system_event", { contextKey, text: cleaned });
    enqueueSystemEvent(cleaned, { sessionKey: parentSessionKey, contextKey });
    wake();
  };

  const emitStartNotice = () => {
    emit(
      `Started ${relayLabel} session ${params.childSessionKey}. Streaming progress updates to parent session.`,
      `${contextPrefix}:start`,
    );
  };

  let disposed = false;
  let pendingText = "";
  let lastProgressAt = Date.now();
  let stallNotified = false;
  let flushTimer: NodeJS.Timeout | undefined;
  let relayLifetimeTimer: NodeJS.Timeout | undefined;

  const clearFlushTimer = () => {
    if (!flushTimer) {
      return;
    }
    clearTimeout(flushTimer);
    flushTimer = undefined;
  };

  const clearRelayLifetimeTimer = () => {
    if (!relayLifetimeTimer) {
      return;
    }
    clearTimeout(relayLifetimeTimer);
    relayLifetimeTimer = undefined;
  };

  // Emits the buffered assistant text as one compact, truncated progress snippet.
  const flushPending = () => {
    clearFlushTimer();
    if (!pendingText) {
      return;
    }
    const snippet = truncate(compactWhitespace(pendingText), STREAM_SNIPPET_MAX_CHARS);
    pendingText = "";
    if (!snippet) {
      return;
    }
    emit(`${relayLabel}: ${snippet}`, `${contextPrefix}:progress`);
  };

  const scheduleFlush = () => {
    if (disposed || flushTimer || streamFlushMs <= 0) {
      return;
    }
    flushTimer = setTimeout(() => {
      flushPending();
    }, streamFlushMs);
    flushTimer.unref?.();
  };

  // Periodically checks for a stalled child; notifies once per quiet period.
  const noOutputWatcherTimer = setInterval(() => {
    if (disposed || noOutputNoticeMs <= 0) {
      return;
    }
    if (stallNotified) {
      return;
    }
    if (Date.now() - lastProgressAt < noOutputNoticeMs) {
      return;
    }
    stallNotified = true;
    emit(
      `${relayLabel} has produced no output for ${Math.round(noOutputNoticeMs / 1000)}s. It may be waiting for interactive input.`,
      `${contextPrefix}:stall`,
    );
  }, noOutputPollMs);
  noOutputWatcherTimer.unref?.();

  // Hard upper bound on the relay's lifetime.
  relayLifetimeTimer = setTimeout(() => {
    if (disposed) {
      return;
    }
    emit(
      `${relayLabel} stream relay timed out after ${Math.max(1, Math.round(maxRelayLifetimeMs / 1000))}s without completion.`,
      `${contextPrefix}:timeout`,
    );
    dispose();
  }, maxRelayLifetimeMs);
  relayLifetimeTimer.unref?.();

  if (params.emitStartNotice !== false) {
    emitStartNotice();
  }

  const unsubscribe = onAgentEvent((event) => {
    if (disposed || event.runId !== runId) {
      return;
    }

    if (event.stream === "assistant") {
      const data = event.data;
      // Prefer `delta`; fall back to `text` when `delta` is null/undefined.
      const deltaCandidate =
        (data as { delta?: unknown } | undefined)?.delta ??
        (data as { text?: unknown } | undefined)?.text;
      const delta = typeof deltaCandidate === "string" ? deltaCandidate : undefined;
      if (!delta || !delta.trim()) {
        return;
      }
      logEvent("assistant_delta", { delta });

      if (stallNotified) {
        stallNotified = false;
        emit(`${relayLabel} resumed output.`, `${contextPrefix}:resumed`);
      }

      lastProgressAt = Date.now();
      pendingText += delta;
      // Keep only the most recent characters when the buffer grows too large.
      if (pendingText.length > STREAM_BUFFER_MAX_CHARS) {
        pendingText = pendingText.slice(-STREAM_BUFFER_MAX_CHARS);
      }
      // Flush immediately once a full snippet is available or a paragraph break arrives.
      if (pendingText.length >= STREAM_SNIPPET_MAX_CHARS || delta.includes("\n\n")) {
        flushPending();
        return;
      }
      scheduleFlush();
      return;
    }

    if (event.stream !== "lifecycle") {
      return;
    }

    const phase = toTrimmedString((event.data as { phase?: unknown } | undefined)?.phase);
    logEvent("lifecycle", { phase: phase ?? "unknown", data: event.data });
    if (phase === "end") {
      flushPending();
      const startedAt = toFiniteNumber(
        (event.data as { startedAt?: unknown } | undefined)?.startedAt,
      );
      const endedAt = toFiniteNumber((event.data as { endedAt?: unknown } | undefined)?.endedAt);
      // Report a duration only when both timestamps are present and ordered.
      const durationMs =
        startedAt != null && endedAt != null && endedAt >= startedAt
          ? endedAt - startedAt
          : undefined;
      if (durationMs != null) {
        emit(
          `${relayLabel} run completed in ${Math.max(1, Math.round(durationMs / 1000))}s.`,
          `${contextPrefix}:done`,
        );
      } else {
        emit(`${relayLabel} run completed.`, `${contextPrefix}:done`);
      }
      dispose();
      return;
    }

    if (phase === "error") {
      flushPending();
      const errorText = toTrimmedString((event.data as { error?: unknown } | undefined)?.error);
      if (errorText) {
        emit(`${relayLabel} run failed: ${errorText}`, `${contextPrefix}:error`);
      } else {
        emit(`${relayLabel} run failed.`, `${contextPrefix}:error`);
      }
      dispose();
    }
  });

  // Idempotent teardown: stops all timers, flushes the log buffer and unsubscribes.
  const dispose = () => {
    if (disposed) {
      return;
    }
    disposed = true;
    clearFlushTimer();
    clearRelayLifetimeTimer();
    flushLogBuffer();
    clearInterval(noOutputWatcherTimer);
    unsubscribe();
  };

  return {
    dispose,
    notifyStarted: emitStartNotice,
  };
}
|
|
|
|
/** Handle returned by `startAcpSpawnParentStreamRelay` for controlling a running relay. */
export type AcpSpawnParentRelayHandle = {
  /** Stops the relay; calling it again after the first time has no further effect. */
  dispose: () => void;
  /** Emits the "started" notice to the parent session. */
  notifyStarted: () => void;
};
|