Files
openclaw/extensions/acpx/src/runtime.ts
Onur Solmaz a7d56e3554 feat: ACP thread-bound agents (#23580)
* docs: add ACP thread-bound agents plan doc

* docs: expand ACP implementation specification

* feat(acp): route ACP sessions through core dispatch and lifecycle cleanup

* feat(acp): add /acp commands and Discord spawn gate

* ACP: add acpx runtime plugin backend

* fix(subagents): defer transient lifecycle errors before announce

* Agents: harden ACP sessions_spawn and tighten spawn guidance

* Agents: require explicit ACP target for runtime spawns

* docs: expand ACP control-plane implementation plan

* ACP: harden metadata seeding and spawn guidance

* ACP: centralize runtime control-plane manager and fail-closed dispatch

* ACP: harden runtime manager and unify spawn helpers

* Commands: route ACP sessions through ACP runtime in agent command

* ACP: require persisted metadata for runtime spawns

* Sessions: preserve ACP metadata when updating entries

* Plugins: harden ACP backend registry across loaders

* ACPX: make availability probe compatible with adapters

* E2E: add manual Discord ACP plain-language smoke script

* ACPX: preserve streamed spacing across Discord delivery

* Docs: add ACP Discord streaming strategy

* ACP: harden Discord stream buffering for thread replies

* ACP: reuse shared block reply pipeline for projector

* ACP: unify streaming config and adopt coalesceIdleMs

* Docs: add temporary ACP production hardening plan

* Docs: trim temporary ACP hardening plan goals

* Docs: gate ACP thread controls by backend capabilities

* ACP: add capability-gated runtime controls and /acp operator commands

* Docs: remove temporary ACP hardening plan

* ACP: fix spawn target validation and close cache cleanup

* ACP: harden runtime dispatch and recovery paths

* ACP: split ACP command/runtime internals and centralize policy

* ACP: harden runtime lifecycle, validation, and observability

* ACP: surface runtime and backend session IDs in thread bindings

* docs: add temp plan for binding-service migration

* ACP: migrate thread binding flows to SessionBindingService

* ACP: address review feedback and preserve prompt wording

* ACPX plugin: pin runtime dependency and prefer bundled CLI

* Discord: complete binding-service migration cleanup and restore ACP plan

* Docs: add standalone ACP agents guide

* ACP: route harness intents to thread-bound ACP sessions

* ACP: fix spawn thread routing and queue-owner stall

* ACP: harden startup reconciliation and command bypass handling

* ACP: fix dispatch bypass type narrowing

* ACP: align runtime metadata to agentSessionId

* ACP: normalize session identifier handling and labels

* ACP: mark thread banner session ids provisional until first reply

* ACP: stabilize session identity mapping and startup reconciliation

* ACP: add resolved session-id notices and cwd in thread intros

* Discord: prefix thread meta notices consistently

* Discord: unify ACP/thread meta notices with gear prefix

* Discord: split thread persona naming from meta formatting

* Extensions: bump acpx plugin dependency to 0.1.9

* Agents: gate ACP prompt guidance behind acp.enabled

* Docs: remove temp experiment plan docs

* Docs: scope streaming plan to holy grail refactor

* Docs: refactor ACP agents guide for human-first flow

* Docs/Skill: add ACP feature-flag guidance and direct acpx telephone-game flow

* Docs/Skill: add OpenCode and Pi to ACP harness lists

* Docs/Skill: align ACP harness list with current acpx registry

* Dev/Test: move ACP plain-language smoke script and mark as keep

* Docs/Skill: reorder ACP harness lists with Pi first

* ACP: split control-plane manager into core/types/utils modules

* Docs: refresh ACP thread-bound agents plan

* ACP: extract dispatch lane and split manager domains

* ACP: centralize binding context and remove reverse deps

* Infra: unify system message formatting

* ACP: centralize error boundaries and session id rendering

* ACP: enforce init concurrency cap and strict meta clear

* Tests: fix ACP dispatch binding mock typing

* Tests: fix Discord thread-binding mock drift and ACP request id

* ACP: gate slash bypass and persist cleared overrides

* ACPX: await pre-abort cancel before runTurn return

* Extension: pin acpx runtime dependency to 0.1.11

* Docs: add pinned acpx install strategy for ACP extension

* Extensions/acpx: enforce strict local pinned startup

* Extensions/acpx: tighten acp-router install guidance

* ACPX: retry runtime test temp-dir cleanup

* Extensions/acpx: require proactive ACPX repair for thread spawns

* Extensions/acpx: require restart offer after acpx reinstall

* extensions/acpx: remove workspace protocol devDependency

* extensions/acpx: bump pinned acpx to 0.1.13

* extensions/acpx: sync lockfile after dependency bump

* ACPX: make runtime spawn Windows-safe

* fix: align doctor-config-flow repair tests with default-account migration (#23580) (thanks @osolmaz)
2026-02-26 11:00:09 +01:00

579 lines
18 KiB
TypeScript

import { createInterface } from "node:readline";
import type {
AcpRuntimeCapabilities,
AcpRuntimeDoctorReport,
AcpRuntime,
AcpRuntimeEnsureInput,
AcpRuntimeErrorCode,
AcpRuntimeEvent,
AcpRuntimeHandle,
AcpRuntimeStatus,
AcpRuntimeTurnInput,
PluginLogger,
} from "openclaw/plugin-sdk";
import { AcpRuntimeError } from "openclaw/plugin-sdk";
import {
ACPX_LOCAL_INSTALL_COMMAND,
ACPX_PINNED_VERSION,
type ResolvedAcpxPluginConfig,
} from "./config.js";
import { checkPinnedAcpxVersion } from "./ensure.js";
import {
parseJsonLines,
parsePromptEventLine,
toAcpxErrorEvent,
} from "./runtime-internals/events.js";
import {
resolveSpawnFailure,
spawnAndCollect,
spawnWithResolvedCommand,
waitForExit,
} from "./runtime-internals/process.js";
import {
asOptionalString,
asTrimmedString,
buildPermissionArgs,
deriveAgentFromSessionKey,
isRecord,
type AcpxHandleState,
type AcpxJsonObject,
} from "./runtime-internals/shared.js";
/** Backend identifier stamped on every handle returned by `AcpxRuntime.ensureSession`. */
export const ACPX_BACKEND_ID = "acpx";
/** Marker that precedes the base64url-encoded JSON payload in an encoded runtime handle. */
const ACPX_RUNTIME_HANDLE_PREFIX = "acpx:v1:";
/** Fallback agent passed to `deriveAgentFromSessionKey` when resolving a non-encoded (legacy) handle. */
const DEFAULT_AGENT_FALLBACK = "codex";
/** Capability set returned unchanged by `AcpxRuntime.getCapabilities`. */
const ACPX_CAPABILITIES: AcpRuntimeCapabilities = {
controls: ["session/set_mode", "session/set_config_option", "session/status"],
};
/**
 * Serializes handle state into the opaque string stored as `runtimeSessionName`.
 *
 * The state is written as JSON, base64url-encoded, and prefixed with
 * `ACPX_RUNTIME_HANDLE_PREFIX`.
 *
 * @param state - Handle state to embed in the handle string.
 * @returns The prefixed, base64url-encoded handle string.
 */
export function encodeAcpxRuntimeHandleState(state: AcpxHandleState): string {
  const json = JSON.stringify(state);
  const encoded = Buffer.from(json, "utf8").toString("base64url");
  return ACPX_RUNTIME_HANDLE_PREFIX + encoded;
}
/**
 * Parses a handle string produced by `encodeAcpxRuntimeHandleState`.
 *
 * @param runtimeSessionName - Candidate handle string; surrounding whitespace is ignored.
 * @returns The decoded state, or `null` when the prefix is missing, the payload
 *   is empty, the payload is not a JSON record, `name`/`agent`/`cwd` are absent,
 *   or `mode` is neither `"persistent"` nor `"oneshot"`.
 */
export function decodeAcpxRuntimeHandleState(runtimeSessionName: string): AcpxHandleState | null {
  const candidate = runtimeSessionName.trim();
  if (!candidate.startsWith(ACPX_RUNTIME_HANDLE_PREFIX)) {
    return null;
  }
  const payload = candidate.slice(ACPX_RUNTIME_HANDLE_PREFIX.length);
  if (payload.length === 0) {
    return null;
  }

  try {
    const json = Buffer.from(payload, "base64url").toString("utf8");
    const record: unknown = JSON.parse(json);
    if (!isRecord(record)) {
      return null;
    }

    const name = asTrimmedString(record.name);
    const agent = asTrimmedString(record.agent);
    const cwd = asTrimmedString(record.cwd);
    const mode = asTrimmedString(record.mode);
    const acpxRecordId = asOptionalString(record.acpxRecordId);
    const backendSessionId = asOptionalString(record.backendSessionId);
    const agentSessionId = asOptionalString(record.agentSessionId);

    const hasRequiredFields = Boolean(name && agent && cwd);
    if (!hasRequiredFields) {
      return null;
    }
    if (!(mode === "persistent" || mode === "oneshot")) {
      return null;
    }

    // Optional ids are only materialized as keys when they carry a value.
    return {
      name,
      agent,
      cwd,
      mode,
      ...(acpxRecordId ? { acpxRecordId } : {}),
      ...(backendSessionId ? { backendSessionId } : {}),
      ...(agentSessionId ? { agentSessionId } : {}),
    };
  } catch {
    // Malformed base64/JSON is treated the same as "not an encoded handle".
    return null;
  }
}
/**
 * ACP runtime backend that drives the `acpx` CLI as a child process.
 *
 * Control operations (ensure/status/cancel/close/set-mode/set) run the CLI to
 * completion and parse its JSON-lines stdout. Prompt turns spawn the CLI, feed
 * the prompt text on stdin, and stream parsed events from stdout. Per-session
 * state (name, agent, cwd, mode, ids) travels inside the handle's
 * `runtimeSessionName` string.
 */
export class AcpxRuntime implements AcpRuntime {
  /**
   * Outcome of the latest `probeAvailability`/`doctor` run; also reset to
   * `false` whenever a spawn reports that the acpx command is missing.
   */
  private healthy = false;
  private readonly logger?: PluginLogger;
  /** Value passed to the CLI as `--ttl` on prompt turns. */
  private readonly queueOwnerTtlSeconds: number;

  /**
   * @param config - Resolved plugin configuration (command, cwd, permissions, timeouts).
   * @param opts - Optional logger and an override for the queue-owner TTL. The
   *   override is only honoured when it is a finite, non-negative number;
   *   otherwise `config.queueOwnerTtlSeconds` is used.
   */
  constructor(
    private readonly config: ResolvedAcpxPluginConfig,
    opts?: {
      logger?: PluginLogger;
      queueOwnerTtlSeconds?: number;
    },
  ) {
    this.logger = opts?.logger;
    const requestedQueueOwnerTtlSeconds = opts?.queueOwnerTtlSeconds;
    this.queueOwnerTtlSeconds =
      typeof requestedQueueOwnerTtlSeconds === "number" &&
      Number.isFinite(requestedQueueOwnerTtlSeconds) &&
      requestedQueueOwnerTtlSeconds >= 0
        ? requestedQueueOwnerTtlSeconds
        : this.config.queueOwnerTtlSeconds;
  }

  /** Returns the cached health flag; performs no I/O. */
  isHealthy(): boolean {
    return this.healthy;
  }

  /**
   * Refreshes the health flag: checks the pinned acpx version, then runs
   * `acpx --help` and requires a clean, zero-exit run. Failures of the
   * `--help` spawn are swallowed and recorded as unhealthy.
   */
  async probeAvailability(): Promise<void> {
    const versionCheck = await checkPinnedAcpxVersion({
      command: this.config.command,
      cwd: this.config.cwd,
      expectedVersion: ACPX_PINNED_VERSION,
    });
    if (!versionCheck.ok) {
      this.healthy = false;
      return;
    }
    try {
      const result = await spawnAndCollect({
        command: this.config.command,
        args: ["--help"],
        cwd: this.config.cwd,
      });
      this.healthy = result.error == null && (result.code ?? 0) === 0;
    } catch {
      this.healthy = false;
    }
  }

  /**
   * Runs `<agent> sessions ensure --name <sessionKey>` and builds a handle.
   *
   * @param input - Session key, agent id, optional cwd (falls back to the
   *   configured cwd) and session mode.
   * @returns A handle whose `runtimeSessionName` encodes the session state,
   *   including any record/backend/agent session ids reported by the CLI.
   * @throws AcpRuntimeError `ACP_SESSION_INIT_FAILED` when the key or agent is
   *   blank, or when the control command fails.
   */
  async ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle> {
    const sessionName = asTrimmedString(input.sessionKey);
    if (!sessionName) {
      throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP session key is required.");
    }
    const agent = asTrimmedString(input.agent);
    if (!agent) {
      throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "ACP agent id is required.");
    }
    const cwd = asTrimmedString(input.cwd) || this.config.cwd;
    const mode = input.mode;
    const events = await this.runControlCommand({
      args: this.buildControlArgs({
        cwd,
        command: [agent, "sessions", "ensure", "--name", sessionName],
      }),
      cwd,
      fallbackCode: "ACP_SESSION_INIT_FAILED",
    });
    // The first event that carries any identifier is taken as the "ensured" record.
    const ensuredEvent = events.find(
      (event) =>
        asOptionalString(event.agentSessionId) ||
        asOptionalString(event.acpxSessionId) ||
        asOptionalString(event.acpxRecordId),
    );
    const acpxRecordId = ensuredEvent ? asOptionalString(ensuredEvent.acpxRecordId) : undefined;
    const agentSessionId = ensuredEvent ? asOptionalString(ensuredEvent.agentSessionId) : undefined;
    const backendSessionId = ensuredEvent
      ? asOptionalString(ensuredEvent.acpxSessionId)
      : undefined;
    // Shared by the encoded state and the returned handle; absent ids produce no key.
    const sessionIds = {
      ...(acpxRecordId ? { acpxRecordId } : {}),
      ...(backendSessionId ? { backendSessionId } : {}),
      ...(agentSessionId ? { agentSessionId } : {}),
    };
    return {
      sessionKey: input.sessionKey,
      backend: ACPX_BACKEND_ID,
      runtimeSessionName: encodeAcpxRuntimeHandleState({
        name: sessionName,
        agent,
        cwd,
        mode,
        ...sessionIds,
      }),
      cwd,
      ...sessionIds,
    };
  }

  /**
   * Runs one prompt turn and streams the events the CLI prints on stdout.
   *
   * Behaviour:
   * - If the signal is already aborted, a cancel is issued and nothing is yielded.
   * - While running, an abort triggers a best-effort `cancel` (failures are logged).
   * - A non-zero exit without a prior `error` event yields a synthetic `error`
   *   event (stderr text, or the exit code).
   * - A clean exit without a `done`/`error` event yields a synthetic `done`.
   *
   * @throws AcpRuntimeError `ACP_BACKEND_UNAVAILABLE` when the command is
   *   missing, `ACP_TURN_FAILED` for a missing cwd or any other spawn error.
   */
  async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
    const state = this.resolveHandleState(input.handle);
    const args = this.buildPromptArgs({
      agent: state.agent,
      sessionName: state.name,
      cwd: state.cwd,
    });
    const cancelOnAbort = async () => {
      await this.cancel({
        handle: input.handle,
        reason: "abort-signal",
      }).catch((err) => {
        this.logger?.warn?.(`acpx runtime abort-cancel failed: ${String(err)}`);
      });
    };
    const onAbort = () => {
      void cancelOnAbort();
    };
    if (input.signal?.aborted) {
      await cancelOnAbort();
      return;
    }
    if (input.signal) {
      input.signal.addEventListener("abort", onAbort, { once: true });
    }
    let lines: ReturnType<typeof createInterface> | undefined;
    // Everything after listener registration lives inside try/finally so the
    // listener is always detached, including when the spawn call itself throws.
    try {
      const child = spawnWithResolvedCommand({
        command: this.config.command,
        args,
        cwd: state.cwd,
      });
      // Subscribe to process termination before draining stdout: the child's
      // 'error' (e.g. failed spawn) or 'close' event can fire while stdout is
      // still being read, and a subscription made afterwards would miss it.
      const exitPromise = waitForExit(child);
      // NOTE(review): waitForExit is defined elsewhere; in case it can reject,
      // keep the rejection observed if we leave the generator before awaiting it.
      void exitPromise.catch(() => undefined);
      child.stdin.on("error", () => {
        // Ignore EPIPE when the child exits before stdin flush completes.
      });
      child.stdin.end(input.text);
      let stderr = "";
      child.stderr.on("data", (chunk) => {
        stderr += String(chunk);
      });
      let sawDone = false;
      let sawError = false;
      lines = createInterface({ input: child.stdout });
      for await (const line of lines) {
        const parsed = parsePromptEventLine(line);
        if (!parsed) {
          continue;
        }
        if (parsed.type === "done") {
          sawDone = true;
        }
        if (parsed.type === "error") {
          sawError = true;
        }
        yield parsed;
      }
      const exit = await exitPromise;
      if (exit.error) {
        const spawnFailure = resolveSpawnFailure(exit.error, state.cwd);
        if (spawnFailure === "missing-command") {
          this.healthy = false;
          throw new AcpRuntimeError(
            "ACP_BACKEND_UNAVAILABLE",
            `acpx command not found: ${this.config.command}`,
            { cause: exit.error },
          );
        }
        if (spawnFailure === "missing-cwd") {
          throw new AcpRuntimeError(
            "ACP_TURN_FAILED",
            `ACP runtime working directory does not exist: ${state.cwd}`,
            { cause: exit.error },
          );
        }
        throw new AcpRuntimeError("ACP_TURN_FAILED", exit.error.message, { cause: exit.error });
      }
      if ((exit.code ?? 0) !== 0 && !sawError) {
        yield {
          type: "error",
          message: stderr.trim() || `acpx exited with code ${exit.code ?? "unknown"}`,
        };
        return;
      }
      if (!sawDone && !sawError) {
        yield { type: "done" };
      }
    } finally {
      lines?.close();
      if (input.signal) {
        input.signal.removeEventListener("abort", onAbort);
      }
    }
  }

  /** Returns the static capability descriptor for this backend. */
  getCapabilities(): AcpRuntimeCapabilities {
    return ACPX_CAPABILITIES;
  }

  /**
   * Runs `<agent> status --session <name>` and summarizes the first
   * non-error event (or the first event when all are errors).
   *
   * A `NO_SESSION` error from the CLI is tolerated. When the CLI prints no
   * events at all, a placeholder summary is returned.
   *
   * @throws AcpRuntimeError `ACP_TURN_FAILED` (or `ACP_BACKEND_UNAVAILABLE`
   *   when the command is missing) if the control command fails.
   */
  async getStatus(input: { handle: AcpRuntimeHandle }): Promise<AcpRuntimeStatus> {
    const state = this.resolveHandleState(input.handle);
    const events = await this.runControlCommand({
      args: this.buildControlArgs({
        cwd: state.cwd,
        command: [state.agent, "status", "--session", state.name],
      }),
      cwd: state.cwd,
      fallbackCode: "ACP_TURN_FAILED",
      ignoreNoSession: true,
    });
    const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0];
    if (!detail) {
      return {
        summary: "acpx status unavailable",
      };
    }
    const status = asTrimmedString(detail.status) || "unknown";
    const acpxRecordId = asOptionalString(detail.acpxRecordId);
    const acpxSessionId = asOptionalString(detail.acpxSessionId);
    const agentSessionId = asOptionalString(detail.agentSessionId);
    const pid = typeof detail.pid === "number" && Number.isFinite(detail.pid) ? detail.pid : null;
    const summary = [
      `status=${status}`,
      acpxRecordId ? `acpxRecordId=${acpxRecordId}` : null,
      acpxSessionId ? `acpxSessionId=${acpxSessionId}` : null,
      pid != null ? `pid=${pid}` : null,
    ]
      .filter(Boolean)
      .join(" ");
    return {
      summary,
      ...(acpxRecordId ? { acpxRecordId } : {}),
      ...(acpxSessionId ? { backendSessionId: acpxSessionId } : {}),
      ...(agentSessionId ? { agentSessionId } : {}),
      details: detail,
    };
  }

  /**
   * Runs `<agent> set-mode <mode> --session <name>`.
   *
   * @throws AcpRuntimeError `ACP_TURN_FAILED` when `mode` is blank or the command fails.
   */
  async setMode(input: { handle: AcpRuntimeHandle; mode: string }): Promise<void> {
    const state = this.resolveHandleState(input.handle);
    const mode = asTrimmedString(input.mode);
    if (!mode) {
      throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP runtime mode is required.");
    }
    await this.runControlCommand({
      args: this.buildControlArgs({
        cwd: state.cwd,
        command: [state.agent, "set-mode", mode, "--session", state.name],
      }),
      cwd: state.cwd,
      fallbackCode: "ACP_TURN_FAILED",
    });
  }

  /**
   * Runs `<agent> set <key> <value> --session <name>`.
   *
   * @throws AcpRuntimeError `ACP_TURN_FAILED` when key or value is blank after
   *   trimming, or when the command fails.
   */
  async setConfigOption(input: {
    handle: AcpRuntimeHandle;
    key: string;
    value: string;
  }): Promise<void> {
    const state = this.resolveHandleState(input.handle);
    const key = asTrimmedString(input.key);
    const value = asTrimmedString(input.value);
    if (!key || !value) {
      throw new AcpRuntimeError("ACP_TURN_FAILED", "ACP config option key/value are required.");
    }
    await this.runControlCommand({
      args: this.buildControlArgs({
        cwd: state.cwd,
        command: [state.agent, "set", key, value, "--session", state.name],
      }),
      cwd: state.cwd,
      fallbackCode: "ACP_TURN_FAILED",
    });
  }

  /**
   * Diagnoses backend availability and updates the health flag.
   *
   * Performs the same two checks as `probeAvailability` (pinned version, then
   * `acpx --help`) but returns a structured report describing what failed.
   * Never rejects because of the `--help` spawn; such failures are reported
   * as `ok: false`.
   */
  async doctor(): Promise<AcpRuntimeDoctorReport> {
    const versionCheck = await checkPinnedAcpxVersion({
      command: this.config.command,
      cwd: this.config.cwd,
      expectedVersion: ACPX_PINNED_VERSION,
    });
    if (!versionCheck.ok) {
      this.healthy = false;
      const details = [
        `expected=${versionCheck.expectedVersion}`,
        versionCheck.installedVersion ? `installed=${versionCheck.installedVersion}` : null,
      ].filter((detail): detail is string => Boolean(detail));
      return {
        ok: false,
        code: "ACP_BACKEND_UNAVAILABLE",
        message: versionCheck.message,
        installCommand: versionCheck.installCommand,
        details,
      };
    }
    try {
      const result = await spawnAndCollect({
        command: this.config.command,
        args: ["--help"],
        cwd: this.config.cwd,
      });
      if (result.error) {
        // Every spawn-error outcome below is unhealthy.
        this.healthy = false;
        const spawnFailure = resolveSpawnFailure(result.error, this.config.cwd);
        if (spawnFailure === "missing-command") {
          return {
            ok: false,
            code: "ACP_BACKEND_UNAVAILABLE",
            message: `acpx command not found: ${this.config.command}`,
            installCommand: ACPX_LOCAL_INSTALL_COMMAND,
          };
        }
        if (spawnFailure === "missing-cwd") {
          return {
            ok: false,
            code: "ACP_BACKEND_UNAVAILABLE",
            message: `ACP runtime working directory does not exist: ${this.config.cwd}`,
          };
        }
        return {
          ok: false,
          code: "ACP_BACKEND_UNAVAILABLE",
          message: result.error.message,
          details: [String(result.error)],
        };
      }
      if ((result.code ?? 0) !== 0) {
        this.healthy = false;
        return {
          ok: false,
          code: "ACP_BACKEND_UNAVAILABLE",
          message: result.stderr.trim() || `acpx exited with code ${result.code ?? "unknown"}`,
        };
      }
      this.healthy = true;
      return {
        ok: true,
        message: `acpx command available (${this.config.command}, version ${versionCheck.version})`,
      };
    } catch (error) {
      this.healthy = false;
      return {
        ok: false,
        code: "ACP_BACKEND_UNAVAILABLE",
        message: error instanceof Error ? error.message : String(error),
      };
    }
  }

  /**
   * Runs `<agent> cancel --session <name>`; a `NO_SESSION` reply is tolerated.
   * `input.reason` is accepted but not forwarded to the CLI.
   */
  async cancel(input: { handle: AcpRuntimeHandle; reason?: string }): Promise<void> {
    const state = this.resolveHandleState(input.handle);
    await this.runControlCommand({
      args: this.buildControlArgs({
        cwd: state.cwd,
        command: [state.agent, "cancel", "--session", state.name],
      }),
      cwd: state.cwd,
      fallbackCode: "ACP_TURN_FAILED",
      ignoreNoSession: true,
    });
  }

  /**
   * Runs `<agent> sessions close <name>`; a `NO_SESSION` reply is tolerated.
   * `input.reason` is accepted but not forwarded to the CLI.
   */
  async close(input: { handle: AcpRuntimeHandle; reason: string }): Promise<void> {
    const state = this.resolveHandleState(input.handle);
    await this.runControlCommand({
      args: this.buildControlArgs({
        cwd: state.cwd,
        command: [state.agent, "sessions", "close", state.name],
      }),
      cwd: state.cwd,
      fallbackCode: "ACP_TURN_FAILED",
      ignoreNoSession: true,
    });
  }

  /**
   * Recovers session state from a handle.
   *
   * Encoded handles are decoded directly. Anything else is treated as a
   * legacy plain session name: the agent is derived from the session key,
   * the configured cwd is used, and the mode is assumed persistent.
   *
   * @throws AcpRuntimeError `ACP_SESSION_INIT_FAILED` when `runtimeSessionName` is blank.
   */
  private resolveHandleState(handle: AcpRuntimeHandle): AcpxHandleState {
    const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName);
    if (decoded) {
      return decoded;
    }
    const legacyName = asTrimmedString(handle.runtimeSessionName);
    if (!legacyName) {
      throw new AcpRuntimeError(
        "ACP_SESSION_INIT_FAILED",
        "Invalid acpx runtime handle: runtimeSessionName is missing.",
      );
    }
    return {
      name: legacyName,
      agent: deriveAgentFromSessionKey(handle.sessionKey, DEFAULT_AGENT_FALLBACK),
      cwd: this.config.cwd,
      mode: "persistent",
    };
  }

  /** Prepends the JSON-output and `--cwd` flags shared by all control commands. */
  private buildControlArgs(params: { cwd: string; command: string[] }): string[] {
    return ["--format", "json", "--json-strict", "--cwd", params.cwd, ...params.command];
  }

  /**
   * Builds the argv for a prompt turn: output flags, permission flags,
   * optional `--timeout` (omitted when the configured value is falsy), the
   * queue-owner `--ttl`, then `<agent> prompt --session <name> --file -`
   * (the prompt text is written to the child's stdin).
   */
  private buildPromptArgs(params: { agent: string; sessionName: string; cwd: string }): string[] {
    const args = [
      "--format",
      "json",
      "--json-strict",
      "--cwd",
      params.cwd,
      ...buildPermissionArgs(this.config.permissionMode),
      "--non-interactive-permissions",
      this.config.nonInteractivePermissions,
    ];
    if (this.config.timeoutSeconds) {
      args.push("--timeout", String(this.config.timeoutSeconds));
    }
    args.push("--ttl", String(this.queueOwnerTtlSeconds));
    args.push(params.agent, "prompt", "--session", params.sessionName, "--file", "-");
    return args;
  }

  /**
   * Runs the CLI to completion and returns the JSON-lines events from stdout.
   *
   * @param params.fallbackCode - Error code used for every failure except a
   *   missing command (which is always `ACP_BACKEND_UNAVAILABLE` and also
   *   marks the runtime unhealthy).
   * @param params.ignoreNoSession - When set, a first error event with code
   *   `NO_SESSION` is not treated as a failure and the events are returned as-is.
   * @throws AcpRuntimeError on spawn failure, on an error event in the output,
   *   or on a non-zero exit code.
   */
  private async runControlCommand(params: {
    args: string[];
    cwd: string;
    fallbackCode: AcpRuntimeErrorCode;
    ignoreNoSession?: boolean;
  }): Promise<AcpxJsonObject[]> {
    const result = await spawnAndCollect({
      command: this.config.command,
      args: params.args,
      cwd: params.cwd,
    });
    if (result.error) {
      const spawnFailure = resolveSpawnFailure(result.error, params.cwd);
      if (spawnFailure === "missing-command") {
        this.healthy = false;
        throw new AcpRuntimeError(
          "ACP_BACKEND_UNAVAILABLE",
          `acpx command not found: ${this.config.command}`,
          { cause: result.error },
        );
      }
      if (spawnFailure === "missing-cwd") {
        throw new AcpRuntimeError(
          params.fallbackCode,
          `ACP runtime working directory does not exist: ${params.cwd}`,
          { cause: result.error },
        );
      }
      throw new AcpRuntimeError(params.fallbackCode, result.error.message, { cause: result.error });
    }
    const events = parseJsonLines(result.stdout);
    // Error events in the output take precedence over the exit code.
    const errorEvent = events.map((event) => toAcpxErrorEvent(event)).find(Boolean) ?? null;
    if (errorEvent) {
      if (params.ignoreNoSession && errorEvent.code === "NO_SESSION") {
        return events;
      }
      throw new AcpRuntimeError(
        params.fallbackCode,
        errorEvent.code ? `${errorEvent.code}: ${errorEvent.message}` : errorEvent.message,
      );
    }
    if ((result.code ?? 0) !== 0) {
      throw new AcpRuntimeError(
        params.fallbackCode,
        result.stderr.trim() || `acpx exited with code ${result.code ?? "unknown"}`,
      );
    }
    return events;
  }
}