Files
openclaw/extensions/acpx/src/runtime.ts
Bob 61f7cea48b fix: kill stuck ACP child processes on startup and harden sessions in discord threads (#33699)
* Gateway: resolve agent.wait for chat.send runs

* Discord: harden ACP thread binding + listener timeout

* ACPX: handle already-exited child wait

* Gateway/Discord: address PR review findings

* Discord: keep ACP error-state thread bindings on startup

* gateway: make agent.wait dedupe bridge event-driven

* discord: harden ACP probe classification and cap startup fan-out

* discord: add cooperative timeout cancellation

* discord: fix startup probe concurrency helper typing

* plugin-sdk: avoid Windows root-alias shard timeout

* plugin-sdk: keep root alias reflection path non-blocking

* discord+gateway: resolve remaining PR review findings

* gateway+discord: fix codex review regressions

* Discord/Gateway: address Codex review findings

* Gateway: keep agent.wait lifecycle active with shared run IDs

* Discord: clean up status reactions on aborted runs

* fix: add changelog note for ACP/Discord startup hardening (#33699) (thanks @dutifulbob)

---------

Co-authored-by: Onur <2453968+osolmaz@users.noreply.github.com>
2026-03-04 10:52:28 +01:00

648 lines
20 KiB
TypeScript

import { createInterface } from "node:readline";
import type {
AcpRuntimeCapabilities,
AcpRuntimeDoctorReport,
AcpRuntime,
AcpRuntimeEnsureInput,
AcpRuntimeErrorCode,
AcpRuntimeEvent,
AcpRuntimeHandle,
AcpRuntimeStatus,
AcpRuntimeTurnInput,
PluginLogger,
} from "openclaw/plugin-sdk/acpx";
import { AcpRuntimeError } from "openclaw/plugin-sdk/acpx";
import { type ResolvedAcpxPluginConfig } from "./config.js";
import { checkAcpxVersion } from "./ensure.js";
import {
parseJsonLines,
parsePromptEventLine,
toAcpxErrorEvent,
} from "./runtime-internals/events.js";
import {
resolveSpawnFailure,
type SpawnCommandCache,
type SpawnCommandOptions,
type SpawnResolutionEvent,
spawnAndCollect,
spawnWithResolvedCommand,
waitForExit,
} from "./runtime-internals/process.js";
import {
asOptionalString,
asTrimmedString,
buildPermissionArgs,
deriveAgentFromSessionKey,
isRecord,
type AcpxHandleState,
type AcpxJsonObject,
} from "./runtime-internals/shared.js";
export const ACPX_BACKEND_ID = "acpx";
const ACPX_RUNTIME_HANDLE_PREFIX = "acpx:v1:";
const DEFAULT_AGENT_FALLBACK = "codex";
const ACPX_CAPABILITIES: AcpRuntimeCapabilities = {
controls: ["session/set_mode", "session/set_config_option", "session/status"],
};
export function encodeAcpxRuntimeHandleState(state: AcpxHandleState): string {
const payload = Buffer.from(JSON.stringify(state), "utf8").toString("base64url");
return `${ACPX_RUNTIME_HANDLE_PREFIX}${payload}`;
}
export function decodeAcpxRuntimeHandleState(runtimeSessionName: string): AcpxHandleState | null {
const trimmed = runtimeSessionName.trim();
if (!trimmed.startsWith(ACPX_RUNTIME_HANDLE_PREFIX)) {
return null;
}
const encoded = trimmed.slice(ACPX_RUNTIME_HANDLE_PREFIX.length);
if (!encoded) {
return null;
}
try {
const raw = Buffer.from(encoded, "base64url").toString("utf8");
const parsed = JSON.parse(raw) as unknown;
if (!isRecord(parsed)) {
return null;
}
const name = asTrimmedString(parsed.name);
const agent = asTrimmedString(parsed.agent);
const cwd = asTrimmedString(parsed.cwd);
const mode = asTrimmedString(parsed.mode);
const acpxRecordId = asOptionalString(parsed.acpxRecordId);
const backendSessionId = asOptionalString(parsed.backendSessionId);
const agentSessionId = asOptionalString(parsed.agentSessionId);
if (!name || !agent || !cwd) {
return null;
}
if (mode !== "persistent" && mode !== "oneshot") {
return null;
}
return {
name,
agent,
cwd,
mode,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
} catch {
return null;
}
}
export class AcpxRuntime implements AcpRuntime {
private healthy = false;
private readonly logger?: PluginLogger;
private readonly queueOwnerTtlSeconds: number;
private readonly spawnCommandCache: SpawnCommandCache = {};
private readonly spawnCommandOptions: SpawnCommandOptions;
private readonly loggedSpawnResolutions = new Set<string>();
constructor(
private readonly config: ResolvedAcpxPluginConfig,
opts?: {
logger?: PluginLogger;
queueOwnerTtlSeconds?: number;
},
) {
this.logger = opts?.logger;
const requestedQueueOwnerTtlSeconds = opts?.queueOwnerTtlSeconds;
this.queueOwnerTtlSeconds =
typeof requestedQueueOwnerTtlSeconds === "number" &&
Number.isFinite(requestedQueueOwnerTtlSeconds) &&
requestedQueueOwnerTtlSeconds >= 0
? requestedQueueOwnerTtlSeconds
: this.config.queueOwnerTtlSeconds;
this.spawnCommandOptions = {
strictWindowsCmdWrapper: this.config.strictWindowsCmdWrapper,
cache: this.spawnCommandCache,
onResolved: (event) => {
this.logSpawnResolution(event);
},
};
}
isHealthy(): boolean {
return this.healthy;
}
private logSpawnResolution(event: SpawnResolutionEvent): void {
const key = `${event.command}::${event.strictWindowsCmdWrapper ? "strict" : "compat"}::${event.resolution}`;
if (event.cacheHit || this.loggedSpawnResolutions.has(key)) {
return;
}
this.loggedSpawnResolutions.add(key);
this.logger?.debug?.(
`acpx spawn resolver: command=${event.command} mode=${event.strictWindowsCmdWrapper ? "strict" : "compat"} resolution=${event.resolution}`,
);
}
async probeAvailability(): Promise<void> {
const versionCheck = await checkAcpxVersion({
command: this.config.command,
cwd: this.config.cwd,
expectedVersion: this.config.expectedVersion,
spawnOptions: this.spawnCommandOptions,
});
if (!versionCheck.ok) {
this.healthy = false;
return;
}
try {
const result = await spawnAndCollect(
{
command: this.config.command,
args: ["--help"],
cwd: this.config.cwd,
},
this.spawnCommandOptions,
);
this.healthy = result.error == null && (result.code ?? 0) === 0;
} catch {
this.healthy = false;
}
}
async ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle> {
const sessionName = asTrimmedString(input.sessionKey);
if (!sessionName) {
throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP session key is required.");
}
const agent = asTrimmedString(input.agent);
if (!agent) {
throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP agent id is required.");
}
const cwd = asTrimmedString(input.cwd) || this.config.cwd;
const mode = input.mode;
let events = await this.runControlCommand({
args: this.buildControlArgs({
cwd,
command: [agent, "sessions", "ensure", "--name", sessionName],
}),
cwd,
fallbackCode: "ACP_SESSION_INIT_FAILED",
});
let ensuredEvent = events.find(
(event) =>
asOptionalString(event.agentSessionId) ||
asOptionalString(event.acpxSessionId) ||
asOptionalString(event.acpxRecordId),
);
if (!ensuredEvent) {
events = await this.runControlCommand({
args: this.buildControlArgs({
cwd,
command: [agent, "sessions", "new", "--name", sessionName],
}),
cwd,
fallbackCode: "ACP_SESSION_INIT_FAILED",
});
ensuredEvent = events.find(
(event) =>
asOptionalString(event.agentSessionId) ||
asOptionalString(event.acpxSessionId) ||
asOptionalString(event.acpxRecordId),
);
if (!ensuredEvent) {
throw new AcpRuntimeError(
"ACP_SESSION_INIT_FAILED",
`ACP session init failed: neither 'sessions ensure' nor 'sessions new' returned valid session identifiers for ${sessionName}.`,
);
}
}
const acpxRecordId = ensuredEvent ? asOptionalString(ensuredEvent.acpxRecordId) : undefined;
const agentSessionId = ensuredEvent ? asOptionalString(ensuredEvent.agentSessionId) : undefined;
const backendSessionId = ensuredEvent
? asOptionalString(ensuredEvent.acpxSessionId)
: undefined;
return {
sessionKey: input.sessionKey,
backend: ACPX_BACKEND_ID,
runtimeSessionName: encodeAcpxRuntimeHandleState({
name: sessionName,
agent,
cwd,
mode,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
}),
cwd,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
}
async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
const state = this.resolveHandleState(input.handle);
const args = this.buildPromptArgs({
agent: state.agent,
sessionName: state.name,
cwd: state.cwd,
});
const cancelOnAbort = async () => {
await this.cancel({
handle: input.handle,
reason: "abort-signal",
}).catch((err) => {
this.logger?.warn?.(`acpx runtime abort-cancel failed: ${String(err)}`);
});
};
const onAbort = () => {
void cancelOnAbort();
};
if (input.signal?.aborted) {
await cancelOnAbort();
return;
}
if (input.signal) {
input.signal.addEventListener("abort", onAbort, { once: true });
}
const child = spawnWithResolvedCommand(
{
command: this.config.command,
args,
cwd: state.cwd,
},
this.spawnCommandOptions,
);
child.stdin.on("error", () => {
// Ignore EPIPE when the child exits before stdin flush completes.
});
child.stdin.end(input.text);
let stderr = "";
child.stderr.on("data", (chunk) => {
stderr += String(chunk);
});
let sawDone = false;
let sawError = false;
const lines = createInterface({ input: child.stdout });
try {
for await (const line of lines) {
const parsed = parsePromptEventLine(line);
if (!parsed) {
continue;
}
if (parsed.type === "done") {
if (sawDone) {
continue;
}
sawDone = true;
}
if (parsed.type === "error") {
sawError = true;
}
yield parsed;
}
const exit = await waitForExit(child);
if (exit.error) {
const spawnFailure = resolveSpawnFailure(exit.error, state.cwd);
if (spawnFailure === "missing-command") {
this.healthy = false;
throw new AcpRuntimeError(
"ACP_BACKEND_UNAVAILABLE",
`acpx command not found: ${this.config.command}`,
{ cause: exit.error },
);
}
if (spawnFailure === "missing-cwd") {
throw new AcpRuntimeError(
"ACP_TURN_FAILED",
`ACP runtime working directory does not exist: ${state.cwd}`,
{ cause: exit.error },
);
}
throw new AcpRuntimeError("ACP_TURN_FAILED", exit.error.message, { cause: exit.error });
}
if ((exit.code ?? 0) !== 0 && !sawError) {
yield {
type: "error",
message: stderr.trim() || `acpx exited with code ${exit.code ?? "unknown"}`,
};
return;
}
if (!sawDone && !sawError) {
yield { type: "done" };
}
} finally {
lines.close();
if (input.signal) {
input.signal.removeEventListener("abort", onAbort);
}
}
}
getCapabilities(): AcpRuntimeCapabilities {
return ACPX_CAPABILITIES;
}
async getStatus(input: {
handle: AcpRuntimeHandle;
signal?: AbortSignal;
}): Promise<AcpRuntimeStatus> {
const state = this.resolveHandleState(input.handle);
const events = await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "status", "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
signal: input.signal,
});
const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0];
if (!detail) {
return {
summary: "acpx status unavailable",
};
}
const status = asTrimmedString(detail.status) || "unknown";
const acpxRecordId = asOptionalString(detail.acpxRecordId);
const acpxSessionId = asOptionalString(detail.acpxSessionId);
const agentSessionId = asOptionalString(detail.agentSessionId);
const pid = typeof detail.pid === "number" && Number.isFinite(detail.pid) ? detail.pid : null;
const summary = [
`status=${status}`,
acpxRecordId ? `acpxRecordId=${acpxRecordId}` : null,
acpxSessionId ? `acpxSessionId=${acpxSessionId}` : null,
pid != null ? `pid=${pid}` : null,
]
.filter(Boolean)
.join(" ");
return {
summary,
...(acpxRecordId ? { acpxRecordId } : {}),
...(acpxSessionId ? { backendSessionId: acpxSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
details: detail,
};
}
async setMode(input: { handle: AcpRuntimeHandle; mode: string }): Promise<void> {
const state = this.resolveHandleState(input.handle);
const mode = asTrimmedString(input.mode);
if (!mode) {
throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP runtime mode is required.");
}
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "set-mode", mode, "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
});
}
async setConfigOption(input: {
handle: AcpRuntimeHandle;
key: string;
value: string;
}): Promise<void> {
const state = this.resolveHandleState(input.handle);
const key = asTrimmedString(input.key);
const value = asTrimmedString(input.value);
if (!key || !value) {
throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP config option key/value are required.");
}
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "set", key, value, "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
});
}
async doctor(): Promise<AcpRuntimeDoctorReport> {
const versionCheck = await checkAcpxVersion({
command: this.config.command,
cwd: this.config.cwd,
expectedVersion: this.config.expectedVersion,
spawnOptions: this.spawnCommandOptions,
});
if (!versionCheck.ok) {
this.healthy = false;
const details = [
versionCheck.expectedVersion ? `expected=${versionCheck.expectedVersion}` : null,
versionCheck.installedVersion ? `installed=${versionCheck.installedVersion}` : null,
].filter((detail): detail is string => Boolean(detail));
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: versionCheck.message,
installCommand: versionCheck.installCommand,
details,
};
}
try {
const result = await spawnAndCollect(
{
command: this.config.command,
args: ["--help"],
cwd: this.config.cwd,
},
this.spawnCommandOptions,
);
if (result.error) {
const spawnFailure = resolveSpawnFailure(result.error, this.config.cwd);
if (spawnFailure === "missing-command") {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: `acpx command not found: ${this.config.command}`,
installCommand: this.config.installCommand,
};
}
if (spawnFailure === "missing-cwd") {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: `ACP runtime working directory does not exist: ${this.config.cwd}`,
};
}
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: result.error.message,
details: [String(result.error)],
};
}
if ((result.code ?? 0) !== 0) {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: result.stderr.trim() || `acpx exited with code ${result.code ?? "unknown"}`,
};
}
this.healthy = true;
return {
ok: true,
message: `acpx command available (${this.config.command}, version ${versionCheck.version}${this.config.expectedVersion ? `, expected ${this.config.expectedVersion}` : ""})`,
};
} catch (error) {
this.healthy = false;
return {
ok: false,
code: "ACP_BACKEND_UNAVAILABLE",
message: error instanceof Error ? error.message : String(error),
};
}
}
async cancel(input: { handle: AcpRuntimeHandle; reason?: string }): Promise<void> {
const state = this.resolveHandleState(input.handle);
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "cancel", "--session", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
});
}
async close(input: { handle: AcpRuntimeHandle; reason: string }): Promise<void> {
const state = this.resolveHandleState(input.handle);
await this.runControlCommand({
args: this.buildControlArgs({
cwd: state.cwd,
command: [state.agent, "sessions", "close", state.name],
}),
cwd: state.cwd,
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
});
}
private resolveHandleState(handle: AcpRuntimeHandle): AcpxHandleState {
const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName);
if (decoded) {
return decoded;
}
const legacyName = asTrimmedString(handle.runtimeSessionName);
if (!legacyName) {
throw new AcpRuntimeError(
"ACP_SESSION_INIT_FAILED",
"Invalid acpx runtime handle: runtimeSessionName is missing.",
);
}
return {
name: legacyName,
agent: deriveAgentFromSessionKey(handle.sessionKey, DEFAULT_AGENT_FALLBACK),
cwd: this.config.cwd,
mode: "persistent",
};
}
private buildControlArgs(params: { cwd: string; command: string[] }): string[] {
return ["--format", "json", "--json-strict", "--cwd", params.cwd, ...params.command];
}
private buildPromptArgs(params: { agent: string; sessionName: string; cwd: string }): string[] {
const args = [
"--format",
"json",
"--json-strict",
"--cwd",
params.cwd,
...buildPermissionArgs(this.config.permissionMode),
"--non-interactive-permissions",
this.config.nonInteractivePermissions,
];
if (this.config.timeoutSeconds) {
args.push("--timeout", String(this.config.timeoutSeconds));
}
args.push("--ttl", String(this.queueOwnerTtlSeconds));
args.push(params.agent, "prompt", "--session", params.sessionName, "--file", "-");
return args;
}
private async runControlCommand(params: {
args: string[];
cwd: string;
fallbackCode: AcpRuntimeErrorCode;
ignoreNoSession?: boolean;
signal?: AbortSignal;
}): Promise<AcpxJsonObject[]> {
const result = await spawnAndCollect(
{
command: this.config.command,
args: params.args,
cwd: params.cwd,
},
this.spawnCommandOptions,
{
signal: params.signal,
},
);
if (result.error) {
const spawnFailure = resolveSpawnFailure(result.error, params.cwd);
if (spawnFailure === "missing-command") {
this.healthy = false;
throw new AcpRuntimeError(
"ACP_BACKEND_UNAVAILABLE",
`acpx command not found: ${this.config.command}`,
{ cause: result.error },
);
}
if (spawnFailure === "missing-cwd") {
throw new AcpRuntimeError(
params.fallbackCode,
`ACP runtime working directory does not exist: ${params.cwd}`,
{ cause: result.error },
);
}
throw new AcpRuntimeError(params.fallbackCode, result.error.message, { cause: result.error });
}
const events = parseJsonLines(result.stdout);
const errorEvent = events.map((event) => toAcpxErrorEvent(event)).find(Boolean) ?? null;
if (errorEvent) {
if (params.ignoreNoSession && errorEvent.code === "NO_SESSION") {
return events;
}
throw new AcpRuntimeError(
params.fallbackCode,
errorEvent.code ? `${errorEvent.code}: ${errorEvent.message}` : errorEvent.message,
);
}
if ((result.code ?? 0) !== 0) {
throw new AcpRuntimeError(
params.fallbackCode,
result.stderr.trim() || `acpx exited with code ${result.code ?? "unknown"}`,
);
}
return events;
}
}