fix(acpx): harden session lifecycle cleanup

Harden ACPX process cleanup with lease-backed ownership verification, startup orphan reaping, reusable cancel semantics, and spawned-session visibility fixes.
Peter Steinberger
2026-05-07 07:30:37 +01:00
committed by GitHub
parent 5b9672b4bb
commit 42ecd5d95e
28 changed files with 3353 additions and 95 deletions


@@ -23,6 +23,9 @@ Docs: https://docs.openclaw.ai
- Codex/approvals: in Codex approval modes, stop installing the pre-guardian native `PermissionRequest` hook by default so Codex's reviewer can approve safe commands before OpenClaw surfaces an approval, remember `allow-always` decisions for identical Codex native `PermissionRequest` payloads within the active session window, and make plugin approval requests validate/render their actual allowed decisions so Telegram and other native approval UIs cannot offer stale actions. Thanks @shakkernerd.
- PR triage: mark external pull requests with `proof: supplied` when Barnacle finds structured real behavior proof, keep stale negative proof labels in sync across CRLF-edited PR bodies, and let ClawSweeper own the stronger `proof: sufficient` judgement.
- Sessions CLI: show the selected agent runtime in the `openclaw sessions` table so terminal output matches the runtime visibility already present in JSON/status surfaces. Thanks @vincentkoc.
- ACPX/Codex: preserve trusted Codex project declarations when launching isolated Codex ACP sessions, avoiding interactive trust prompts in headless runs. Thanks @Stedyclaw.
- ACPX/Codex: reap stale OpenClaw-owned ACPX/Codex ACP process trees on startup and after ACP session close, preventing orphaned harness processes from slowing the Gateway. Thanks @91wan.
- ACP sessions: allow parent agents to inspect and message their own spawned cross-agent ACP sessions without enabling broad agent-to-agent visibility. Thanks @barronlroth.
- Talk/voice: unify realtime relay, transcription relay, managed-room handoff, Voice Call, Google Meet, VoiceClaw, and native clients around a shared Talk session controller and add the Gateway-managed `talk.session.*` RPC surface.
- Diagnostics/Talk: export bounded Talk lifecycle/audio metrics and session recovery metrics through OpenTelemetry and Prometheus without exposing transcripts, audio payloads, room ids, turn ids, or session ids.
- Logging/Talk: route shared Talk lifecycle events into bounded file and OTLP log records while keeping transcript text, audio payloads, turn ids, call ids, and provider item ids out of logs.


@@ -31,6 +31,10 @@
"source": "Message lifecycle refactor",
"target": "消息生命周期重构"
},
{
"source": "ACP lifecycle refactor",
"target": "ACP 生命周期重构"
},
{
"source": "Channel message API",
"target": "频道消息 API"


@@ -1208,6 +1208,7 @@
"plugins/sdk-channel-plugins",
"plugins/sdk-channel-message",
"plugins/sdk-provider-plugins",
"plugins/cli-backend-plugins",
"plugins/adding-capabilities",
"plugins/compatibility",
"plugins/sdk-migration"


@@ -23,6 +23,12 @@ If you want a full harness runtime with ACP session controls, background tasks,
thread/conversation binding, and persistent external coding sessions, use
[ACP Agents](/tools/acp-agents) instead. CLI backends are not ACP.
<Tip>
Building a new backend plugin? Use
[CLI backend plugins](/plugins/cli-backend-plugins). This page is for users
configuring and operating an already registered backend.
</Tip>
## Beginner-friendly quick start
You can use Codex CLI **without any config** (the bundled OpenAI plugin


@@ -35,6 +35,9 @@ install from npm during the launch cutover.
<Card title="Provider plugin" icon="cpu" href="/plugins/sdk-provider-plugins">
Add a model provider (LLM, proxy, or custom endpoint)
</Card>
<Card title="CLI backend plugin" icon="terminal" href="/plugins/cli-backend-plugins">
Map a local AI CLI into OpenClaw's text fallback runner
</Card>
<Card title="Tool / hook plugin" icon="wrench" href="/plugins/hooks">
Register agent tools, event hooks, or services - continue below
</Card>
@@ -160,7 +163,7 @@ A single plugin can register any number of capabilities via the `api` object:
| Capability | Registration method | Detailed guide |
| ---------------------- | ------------------------------------------------ | ------------------------------------------------------------------------------- |
| Text inference (LLM) | `api.registerProvider(...)` | [Provider Plugins](/plugins/sdk-provider-plugins) |
| CLI inference backend | `api.registerCliBackend(...)` | [CLI Backends](/gateway/cli-backends) |
| CLI inference backend | `api.registerCliBackend(...)` | [CLI Backend Plugins](/plugins/cli-backend-plugins) |
| Channel / messaging | `api.registerChannel(...)` | [Channel Plugins](/plugins/sdk-channel-plugins) |
| Speech (TTS/STT) | `api.registerSpeechProvider(...)` | [Provider Plugins](/plugins/sdk-provider-plugins#step-5-add-extra-capabilities) |
| Realtime transcription | `api.registerRealtimeTranscriptionProvider(...)` | [Provider Plugins](/plugins/sdk-provider-plugins#step-5-add-extra-capabilities) |
@@ -382,6 +385,9 @@ reserved surfaces, not as the default pattern for new third-party plugins.
<Card title="Provider Plugins" icon="cpu" href="/plugins/sdk-provider-plugins">
Build a model provider plugin
</Card>
<Card title="CLI Backend Plugins" icon="terminal" href="/plugins/cli-backend-plugins">
Register a local AI CLI backend
</Card>
<Card title="SDK Overview" icon="book-open" href="/plugins/sdk-overview">
Import map and registration API reference
</Card>


@@ -0,0 +1,310 @@
---
summary: "Build a plugin that registers a local AI CLI backend"
title: "Building CLI backend plugins"
sidebarTitle: "CLI backend plugins"
read_when:
- You are building a local AI CLI backend plugin
- You want to register a backend for model refs such as acme-cli/model
- You need to map a third-party CLI into OpenClaw's text fallback runner
---
CLI backend plugins let OpenClaw call a local AI CLI as a text inference
backend. The backend appears as a provider prefix in model refs:
```text
acme-cli/acme-large
```
Use a CLI backend when the upstream integration is already exposed as a local
command, when the CLI owns local login state, or when the CLI is a useful
fallback if API providers are unavailable.
<Info>
If the upstream service exposes a normal HTTP model API, write a
[provider plugin](/plugins/sdk-provider-plugins) instead. If the upstream
runtime owns complete agent sessions, tool events, compaction, or background
task state, use an [agent harness](/plugins/sdk-agent-harness).
</Info>
## What the plugin owns
A CLI backend plugin has three contracts:
| Contract | File | Purpose |
| -------------------- | ---------------------- | --------------------------------------------------------- |
| Package entry | `package.json` | Points OpenClaw at the plugin runtime module |
| Manifest ownership | `openclaw.plugin.json` | Declares the backend id before runtime loads |
| Runtime registration | `index.ts` | Calls `api.registerCliBackend(...)` with command defaults |
The manifest is discovery metadata. It does not execute the CLI and does not
register runtime behavior. Runtime behavior starts when the plugin entry calls
`api.registerCliBackend(...)`.
## Minimal backend plugin
<Steps>
<Step title="Create package metadata">
```json package.json
{
"name": "@acme/openclaw-acme-cli",
"version": "1.0.0",
"type": "module",
"openclaw": {
"extensions": ["./index.ts"],
"compat": {
"pluginApi": ">=2026.3.24-beta.2",
"minGatewayVersion": "2026.3.24-beta.2"
},
"build": {
"openclawVersion": "2026.3.24-beta.2",
"pluginSdkVersion": "2026.3.24-beta.2"
}
},
"dependencies": {
"openclaw": "^2026.3.24"
},
"devDependencies": {
"typescript": "^5.9.0"
}
}
```
Published packages must ship built JavaScript runtime files. If your source
entry is `./src/index.ts`, add `openclaw.runtimeExtensions` that points at
the built JavaScript peer. See [Entry points](/plugins/sdk-entrypoints).
</Step>
<Step title="Declare backend ownership">
```json openclaw.plugin.json
{
"id": "acme-cli",
"name": "Acme CLI",
"description": "Run Acme's local AI CLI through OpenClaw",
"cliBackends": ["acme-cli"],
"setup": {
"cliBackends": ["acme-cli"],
"requiresRuntime": false
},
"activation": {
"onStartup": false
},
"configSchema": {
"type": "object",
"additionalProperties": false
}
}
```
`cliBackends` is the runtime ownership list. It lets OpenClaw auto-load the
plugin when config or model selection mentions `acme-cli/...`.
`setup.cliBackends` is the descriptor-first setup surface. Add it when
model discovery, onboarding, or status should recognize the backend without
loading plugin runtime. Use `requiresRuntime: false` only when those static
descriptors are enough for setup.
</Step>
<Step title="Register the backend">
```typescript index.ts
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
import {
CLI_FRESH_WATCHDOG_DEFAULTS,
CLI_RESUME_WATCHDOG_DEFAULTS,
type CliBackendPlugin,
} from "openclaw/plugin-sdk/cli-backend";
function buildAcmeCliBackend(): CliBackendPlugin {
return {
id: "acme-cli",
liveTest: {
defaultModelRef: "acme-cli/acme-large",
defaultImageProbe: false,
defaultMcpProbe: false,
docker: {
npmPackage: "@acme/acme-cli",
binaryName: "acme",
},
},
config: {
command: "acme",
args: ["chat", "--json"],
output: "json",
input: "stdin",
modelArg: "--model",
sessionArg: "--session",
sessionMode: "existing",
sessionIdFields: ["session_id", "conversation_id"],
systemPromptFileArg: "--system-file",
systemPromptWhen: "first",
imageArg: "--image",
imageMode: "repeat",
reliability: {
watchdog: {
fresh: { ...CLI_FRESH_WATCHDOG_DEFAULTS },
resume: { ...CLI_RESUME_WATCHDOG_DEFAULTS },
},
},
serialize: true,
},
};
}
export default definePluginEntry({
id: "acme-cli",
name: "Acme CLI",
description: "Run Acme's local AI CLI through OpenClaw",
register(api) {
api.registerCliBackend(buildAcmeCliBackend());
},
});
```
The backend id must match the manifest `cliBackends` entry. The registered
`config` is only the default; user config under
`agents.defaults.cliBackends.acme-cli` is merged over it at runtime.
</Step>
</Steps>
## Config shape
`CliBackendConfig` describes how OpenClaw should launch and parse the CLI:
| Field | Use |
| ----------------------------------------- | ----------------------------------------------------------- |
| `command` | Binary name or absolute command path |
| `args` | Base argv for fresh runs |
| `resumeArgs` | Alternate argv for resumed sessions; supports `{sessionId}` |
| `output` / `resumeOutput` | Parser: `json`, `jsonl`, or `text` |
| `input` | Prompt transport: `arg` or `stdin` |
| `modelArg` | Flag used before the model id |
| `modelAliases` | Map OpenClaw model ids to CLI-native ids |
| `sessionArg` / `sessionArgs` | How to pass a session id |
| `sessionMode` | `always`, `existing`, or `none` |
| `sessionIdFields` | JSON fields OpenClaw reads from CLI output |
| `systemPromptArg` / `systemPromptFileArg` | System prompt transport |
| `systemPromptWhen` | `first`, `always`, or `never` |
| `imageArg` / `imageMode` | Image path support |
| `serialize` | Keep same-backend runs ordered |
| `reliability.watchdog` | No-output timeout tuning |
Prefer the smallest static config that matches the CLI. Add plugin callbacks
only for behavior that really belongs to the backend.
## Advanced backend hooks
`CliBackendPlugin` can also define:
| Hook | Use |
| ---------------------------------- | ------------------------------------------------------ |
| `normalizeConfig(config, context)` | Rewrite legacy user config after merge |
| `resolveExecutionArgs(ctx)` | Add request-scoped flags such as thinking effort |
| `prepareExecution(ctx)` | Create temporary auth or config bridges before launch |
| `transformSystemPrompt(ctx)` | Apply a final CLI-specific system prompt transform |
| `textTransforms` | Bidirectional prompt/output replacements |
| `defaultAuthProfileId` | Prefer a specific OpenClaw auth profile |
| `authEpochMode` | Decide how auth changes invalidate stored CLI sessions |
| `nativeToolMode` | Declare whether the CLI has always-on native tools |
| `bundleMcp` / `bundleMcpMode` | Opt into OpenClaw's loopback MCP tool bridge |
Keep these hooks provider-owned. Do not add CLI-specific branches to core when a
backend hook can express the behavior.
## MCP tool bridge
CLI backends do not receive OpenClaw tools by default. If the CLI can consume an
MCP configuration, opt in explicitly:
```typescript
return {
id: "acme-cli",
bundleMcp: true,
bundleMcpMode: "codex-config-overrides",
config: {
command: "acme",
args: ["chat", "--json"],
output: "json",
},
};
```
Supported bridge modes are:
| Mode | Use |
| ------------------------ | ---------------------------------------------------------------- |
| `claude-config-file` | CLIs that accept an MCP config file |
| `codex-config-overrides` | CLIs that accept config overrides on argv |
| `gemini-system-settings` | CLIs that read MCP settings from their system settings directory |
Only enable the bridge when the CLI can actually consume it. If the CLI has its
own built-in tool layer that cannot be disabled, set `nativeToolMode:
"always-on"` so OpenClaw can fail closed when a caller requires no native tools.
## User configuration
Users can override any backend default:
```json5
{
agents: {
defaults: {
cliBackends: {
"acme-cli": {
command: "/opt/acme/bin/acme",
args: ["chat", "--json", "--profile", "work"],
modelAliases: {
large: "acme-large-2026",
},
},
},
model: {
primary: "openai/gpt-5.5",
fallbacks: ["acme-cli/large"],
},
},
},
}
```
Document the minimum override users are likely to need. Usually that is only
`command` when the binary is outside `PATH`.
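As a rough illustration of how an override layers over the registered default, the sketch below assumes a shallow, field-by-field merge in which user values replace plugin defaults. The `mergeCliBackendConfig` helper and the trimmed `BackendConfigSketch` type are hypothetical and exist only for this example; OpenClaw's actual merge may treat nested fields such as `modelAliases` differently.

```typescript
// Hypothetical, trimmed-down view of a CLI backend config; not the SDK type.
type BackendConfigSketch = {
  command: string;
  args?: string[];
  output?: "json" | "jsonl" | "text";
  modelAliases?: Record<string, string>;
};

// Assumption: user overrides win field by field (shallow merge).
function mergeCliBackendConfig(
  defaults: BackendConfigSketch,
  override: Partial<BackendConfigSketch>,
): BackendConfigSketch {
  return { ...defaults, ...override };
}

// Plugin-registered default, mirroring the minimal backend shown earlier.
const pluginDefault: BackendConfigSketch = {
  command: "acme",
  args: ["chat", "--json"],
  output: "json",
};

// A user who only needs to point at a binary outside PATH.
const effective = mergeCliBackendConfig(pluginDefault, {
  command: "/opt/acme/bin/acme",
});
```

Under this assumption, `effective` keeps the plugin's `args` and `output` and only swaps `command`, which is why a single-field override is usually enough to document.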
## Verification
For bundled plugins, add a focused test around the builder and setup
registration, then run the plugin's targeted test lane:
```bash
pnpm test extensions/acme-cli
```
For local or installed plugins, verify discovery and one real model run:
```bash
openclaw plugins inspect acme-cli --runtime --json
openclaw agent --message "reply exactly: backend ok" --model acme-cli/acme-large
```
If the backend supports images or MCP, add a live smoke that proves those paths
with the real CLI. Do not rely on static inspection for prompt, image, MCP, or
session-resume behavior.
## Checklist
<Check>`package.json` has `openclaw.extensions` and built runtime entries for published packages</Check>
<Check>`openclaw.plugin.json` declares `cliBackends` and intentional `activation.onStartup`</Check>
<Check>`setup.cliBackends` is present when setup/model discovery should see the backend cold</Check>
<Check>`api.registerCliBackend(...)` uses the same backend id as the manifest</Check>
<Check>User overrides under `agents.defaults.cliBackends.<id>` still win</Check>
<Check>Session, system prompt, image, and output parser settings match the real CLI contract</Check>
<Check>Targeted tests and at least one live CLI smoke prove the backend path</Check>
## Related
- [CLI backends](/gateway/cli-backends) - user configuration and runtime behavior
- [Building plugins](/plugins/building-plugins) - package and manifest basics
- [Plugin SDK overview](/plugins/sdk-overview) - registration API reference
- [Plugin manifest](/plugins/manifest) - `cliBackends` and setup descriptors
- [Agent harness](/plugins/sdk-agent-harness) - full external agent runtimes


@@ -20,7 +20,7 @@ reference for **what to import** and **what you can register**.
</Note>
<Tip>
Looking for a how-to guide instead? Start with [Building plugins](/plugins/building-plugins), use [Channel plugins](/plugins/sdk-channel-plugins) for channel plugins, [Provider plugins](/plugins/sdk-provider-plugins) for provider plugins, and [Plugin hooks](/plugins/hooks) for tool or lifecycle hook plugins.
Looking for a how-to guide instead? Start with [Building plugins](/plugins/building-plugins), use [Channel plugins](/plugins/sdk-channel-plugins) for channel plugins, [Provider plugins](/plugins/sdk-provider-plugins) for provider plugins, [CLI backend plugins](/plugins/cli-backend-plugins) for local AI CLI backends, and [Plugin hooks](/plugins/hooks) for tool or lifecycle hook plugins.
</Tip>
## Import convention
@@ -261,6 +261,9 @@ AI CLI backend such as `codex-cli`.
the CLI dialect, such as mapping OpenClaw thinking levels to a native effort
flag.
For an end-to-end authoring guide, see
[CLI backend plugins](/plugins/cli-backend-plugins).
### Exclusive slots
| Method | What it registers |

docs/refactor/acp.md (new file, 298 lines)

@@ -0,0 +1,298 @@
---
summary: "Migration plan for making ACP session and ACPX process ownership explicit"
read_when:
- Refactoring ACP session lifecycle or ACPX process cleanup
- Debugging ACPX orphan processes, PID reuse, or multi-gateway cleanup safety
- Changing sessions_list visibility for spawned ACP or subagent sessions
- Designing ownership metadata for background tasks, ACP sessions, or process leases
title: "ACP lifecycle refactor"
sidebarTitle: "ACP lifecycle refactor"
---
ACP lifecycle currently works, but too much of it is inferred after the fact.
Process cleanup reconstructs ownership from PIDs, command strings, wrapper
paths, and the live process table. Session visibility reconstructs ownership
from session-key strings plus secondary `sessions.list({ spawnedBy })` lookups.
That makes narrow fixes possible, but it also makes edge cases easy to miss:
PID reuse, quoted commands, adapter grandchildren, multi-gateway state roots,
`cancel` versus `close`, and `tree` versus `all` visibility all become separate
places to rediscover the same ownership rules.
This refactor makes ownership first-class. The goal is not a new ACP product
surface; it is a safer internal contract for the existing ACP and ACPX behavior.
## Goals
- Cleanup never signals a process unless current live evidence matches an
OpenClaw-owned lease.
- `cancel`, `close`, and startup reaping have distinct lifecycle intents.
- `sessions_list`, `sessions_history`, `sessions_send`, and status checks use
the same requester-owned session model.
- Multi-gateway installs cannot reap each other's ACPX wrappers.
- Old ACPX session records keep working during migration.
- The runtime remains plugin-owned; core does not learn ACPX package details.
## Non-goals
- Replacing ACPX or changing the public `/acp` command surface.
- Moving vendor-specific ACP adapter behavior into core.
- Requiring users to manually clean state before upgrading.
- Making `cancel` close reusable ACP sessions.
## Target Model
### Gateway Instance Identity
Each Gateway process should have a stable runtime instance id:
```ts
type GatewayInstanceId = string;
```
It can be generated on Gateway startup and persisted in state for the life of
that install. It is not a security secret; it is an ownership discriminator used
to avoid confusing one Gateway's ACP processes with another Gateway's processes.
### ACP Session Ownership
Every spawned ACP session should have normalized ownership metadata:
```ts
type AcpSessionOwner = {
sessionKey: string;
spawnedBy?: string;
parentSessionKey?: string;
ownerSessionKey: string;
agentId: string;
backend: "acpx";
gatewayInstanceId: GatewayInstanceId;
createdAt: number;
};
```
The Gateway should return these fields on session rows where they are known.
Visibility filtering should be a pure check over row metadata:
```ts
canSeeSessionRow({
row,
requesterSessionKey,
visibility,
a2aPolicy,
});
```
That removes hidden secondary `sessions.list({ spawnedBy })` calls from
visibility checks. A spawned cross-agent ACP child is requester-owned because
the row says so, not because a second query happens to find it.
### ACPX Process Leases
Every generated wrapper launch should create a lease record:
```ts
type AcpxProcessLease = {
leaseId: string;
gatewayInstanceId: GatewayInstanceId;
sessionKey: string;
wrapperRoot: string;
wrapperPath: string;
rootPid: number;
processGroupId?: number;
commandHash: string;
startedAt: number;
state: "open" | "closing" | "closed" | "lost";
};
```
The wrapper process should receive the lease id and gateway instance id in its
environment:
```sh
OPENCLAW_ACPX_LEASE_ID=...
OPENCLAW_GATEWAY_INSTANCE_ID=...
```
When the platform allows it, verification should prefer live process metadata
that cannot be confused by command quoting:
- root PID still exists
- live wrapper path is under `wrapperRoot`
- process group matches the lease when available
- environment contains the expected lease id when readable
- command hash or executable path matches the lease
If the live process cannot be verified, cleanup fails closed.
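The checks above can be sketched as a pure predicate. This is a minimal illustration rather than the ACPX implementation: `LiveProcessSnapshot`, `LeaseEvidence`, and `verifyLeaseOwnership` are hypothetical names, paths are assumed to be already resolved and POSIX-style, and requiring at least one positive identity match (lease id or command hash) is an assumption about how "fails closed" could be applied.

```typescript
// Hypothetical snapshot of what the platform reports for a live PID.
type LiveProcessSnapshot = {
  pid: number;
  executablePath: string; // assumed already resolved (no "..", no symlinks)
  processGroupId?: number; // undefined when the platform cannot report it
  leaseIdFromEnv?: string; // undefined when the environment is unreadable
  commandHash?: string;
};

// Subset of the lease fields that the verification needs.
type LeaseEvidence = {
  leaseId: string;
  wrapperRoot: string;
  rootPid: number;
  processGroupId?: number;
  commandHash: string;
};

function isUnderRoot(candidate: string, root: string): boolean {
  // Trailing slash prevents "/a/wrappers-other" from matching "/a/wrappers".
  const normalizedRoot = root.endsWith("/") ? root : `${root}/`;
  return candidate.startsWith(normalizedRoot);
}

// Returns true only when every readable signal agrees with the lease.
function verifyLeaseOwnership(
  lease: LeaseEvidence,
  live: LiveProcessSnapshot | undefined,
): boolean {
  // Root PID must still exist.
  if (!live || live.pid !== lease.rootPid) return false;
  // Live wrapper path must sit under this Gateway's wrapper root.
  if (!isUnderRoot(live.executablePath, lease.wrapperRoot)) return false;
  // Process group is compared only when both sides know it.
  if (
    lease.processGroupId !== undefined &&
    live.processGroupId !== undefined &&
    live.processGroupId !== lease.processGroupId
  ) {
    return false;
  }
  // A readable but different lease id means a different owner.
  if (live.leaseIdFromEnv !== undefined && live.leaseIdFromEnv !== lease.leaseId) {
    return false;
  }
  // Fail closed unless at least one positive identity signal matches.
  const leaseIdMatches = live.leaseIdFromEnv === lease.leaseId;
  const hashMatches = live.commandHash === lease.commandHash;
  return leaseIdMatches || hashMatches;
}
```

A reused PID that now belongs to an unrelated program, or to a wrapper under another Gateway's state root, fails the path check before any signal is considered.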
## Lifecycle Controller
Introduce one ACPX lifecycle controller that owns process leases and cleanup
policy:
```ts
interface AcpxLifecycleController {
ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle>;
cancelTurn(handle: AcpRuntimeHandle): Promise<void>;
closeSession(input: {
handle: AcpRuntimeHandle;
discardPersistentState?: boolean;
reason?: string;
}): Promise<void>;
reapStartupOrphans(): Promise<void>;
verifyOwnedTree(lease: AcpxProcessLease): Promise<OwnedProcessTree | null>;
}
```
`cancelTurn` requests turn cancellation only. It must not reap reusable wrapper
or adapter processes.
`closeSession` is allowed to reap, but only after loading the session record,
loading the lease, and verifying the live process tree still belongs to that
lease.
`reapStartupOrphans` starts from open leases in state. It may use the process
table to find descendants, but it should not scan arbitrary ACP-looking
commands first and then decide they are probably ours.
## Wrapper Contract
Generated wrappers should stay small. They should:
- start the adapter in a process group where supported
- forward normal termination signals to the process group
- detect parent death
- on parent death, send SIGTERM, then keep the wrapper alive until the SIGKILL
fallback runs
- report root PID and process group id back to the lifecycle controller when
that is available
Wrappers should not decide session policy. They only enforce local process-tree
cleanup for their own adapter group.
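The parent-death escalation can be sketched with injected dependencies so the ordering is easy to test. This is an illustrative sketch, not the generated wrapper: `startOrphanCleanup` and `OrphanCleanupDeps` are hypothetical names, and the 1500 ms grace period is an assumed value. The point it shows is that the wrapper exits only inside the timer callback, after the SIGKILL fallback has run.

```typescript
type CleanupSignal = "SIGTERM" | "SIGKILL";

// Hypothetical seams for the wrapper's side effects.
type OrphanCleanupDeps = {
  killChildTree: (signal: CleanupSignal) => void;
  exit: (code: number) => void;
  schedule: (callback: () => void, delayMs: number) => void;
};

// Called once the wrapper detects that its parent Gateway has died.
function startOrphanCleanup(deps: OrphanCleanupDeps, graceMs: number = 1500): void {
  // Ask the adapter process group to stop gracefully first.
  deps.killChildTree("SIGTERM");
  // Do not exit here: the wrapper must stay alive so the fallback can fire.
  deps.schedule(() => {
    deps.killChildTree("SIGKILL");
    deps.exit(1);
  }, graceMs);
}
```

In a real wrapper, `schedule` would typically be backed by `setTimeout` with a timer that is not unref'ed, and `exit` by `process.exit`; exiting right after scheduling the timer would drop the SIGKILL fallback.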
## Session Visibility Contract
Visibility should use normalized row ownership:
```ts
type SessionVisibilityInput = {
requesterSessionKey: string;
row: {
key: string;
agentId: string;
ownerSessionKey?: string;
spawnedBy?: string;
parentSessionKey?: string;
};
visibility: "self" | "tree" | "agent" | "all";
a2aPolicy: AgentToAgentPolicy;
};
```
Rules:
- `self`: only the requester session.
- `tree`: requester session plus rows owned by or spawned from the requester.
- `agent`: same agent only, unless an explicit owner relationship says the row
belongs to the requester.
- `all`: all same-agent rows, a2a-allowed cross-agent rows, and requester-owned
spawned cross-agent rows even when general a2a is disabled.
This makes `tree` and `all` monotonic: `all` must not hide an owned child that
`tree` would show.
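The rules can be sketched as a pure function over row metadata. This is a minimal sketch under stated assumptions: `canSeeSessionRowSketch` is a hypothetical name, `requesterAgentId` is added here because the rules compare agents, `a2aAllows` is a stand-in for `AgentToAgentPolicy`, and only direct ownership through `ownerSessionKey` or `spawnedBy` is considered (deeper descendants are out of scope).

```typescript
type Visibility = "self" | "tree" | "agent" | "all";

type SessionRow = {
  key: string;
  agentId: string;
  ownerSessionKey?: string;
  spawnedBy?: string;
};

type VisibilityCheck = {
  requesterSessionKey: string;
  requesterAgentId: string; // assumption: not part of the input type above
  row: SessionRow;
  visibility: Visibility;
  // Stand-in for AgentToAgentPolicy; hypothetical shape.
  a2aAllows: (fromAgentId: string, toAgentId: string) => boolean;
};

function isRequesterOwned(row: SessionRow, requesterSessionKey: string): boolean {
  return row.ownerSessionKey === requesterSessionKey || row.spawnedBy === requesterSessionKey;
}

function canSeeSessionRowSketch(input: VisibilityCheck): boolean {
  const { row, requesterSessionKey, requesterAgentId, visibility } = input;
  // The requester's own session is visible at every level.
  if (row.key === requesterSessionKey) return true;
  const owned = isRequesterOwned(row, requesterSessionKey);
  const sameAgent = row.agentId === requesterAgentId;
  switch (visibility) {
    case "self":
      return false;
    case "tree":
      return owned;
    case "agent":
      return sameAgent || owned;
    case "all":
      // Ownership is checked before a2a so "all" never hides an owned child.
      return sameAgent || owned || input.a2aAllows(requesterAgentId, row.agentId);
  }
}
```

Because `owned` appears in the `tree`, `agent`, and `all` branches, anything visible under `tree` stays visible under the wider settings, even when the a2a policy denies everything.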
## Migration Plan
### Phase 1: Add Identity And Leases
- Add `gatewayInstanceId` to Gateway state.
- Add an ACPX lease store under the ACPX state directory.
- Write a lease before spawning a generated wrapper.
- Store `leaseId` on new ACPX session records.
- Keep existing PID and command fields for old records.
### Phase 2: Lease-First Cleanup
- Change close cleanup to load `leaseId` first.
- Verify live process ownership against the lease before signaling.
- Keep the current root PID and wrapper-root fallback only for legacy records.
- Mark leases `closed` after verified cleanup.
- Mark leases `lost` when the process is gone before cleanup.
### Phase 3: Lease-First Startup Reaping
- Startup reaping scans open leases.
- For each lease, verify the root process and collect descendants.
- Reap verified trees children-first.
- Expire old `closed` and `lost` leases with a bounded retention window.
- Keep command-marker scanning only as a temporary legacy fallback, guarded by
wrapper root and Gateway instance where possible.
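Children-first reaping amounts to a post-order walk of the process table starting from a verified root. The sketch below is illustrative only: `ProcessEntry` and `collectTreeChildrenFirst` are hypothetical names, and the table is assumed to be a simple pid/ppid snapshot taken after the root has been verified against its lease.

```typescript
// Hypothetical row from a process-table snapshot.
type ProcessEntry = { pid: number; ppid: number };

// Returns the root and its descendants, deepest processes first.
function collectTreeChildrenFirst(rootPid: number, table: ProcessEntry[]): number[] {
  // A root that is no longer in the table yields nothing to signal.
  if (!table.some((entry) => entry.pid === rootPid)) return [];

  const childrenByParent = new Map<number, number[]>();
  for (const entry of table) {
    const siblings = childrenByParent.get(entry.ppid) ?? [];
    siblings.push(entry.pid);
    childrenByParent.set(entry.ppid, siblings);
  }

  const ordered: number[] = [];
  const seen = new Set<number>(); // guards against malformed, cyclic tables
  const visit = (pid: number): void => {
    if (seen.has(pid)) return;
    seen.add(pid);
    for (const child of childrenByParent.get(pid) ?? []) visit(child);
    ordered.push(pid); // pushed after its children: post-order
  };
  visit(rootPid);
  return ordered;
}

// Wrapper 100 -> adapter 101 -> grandchild 102; 200 is unrelated.
const reapOrder = collectTreeChildrenFirst(100, [
  { pid: 100, ppid: 1 },
  { pid: 101, ppid: 100 },
  { pid: 102, ppid: 101 },
  { pid: 200, ppid: 1 },
]);
```

Here `reapOrder` lists the grandchild, then the adapter, then the wrapper, and never includes the unrelated PID 200.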
### Phase 4: Session Ownership Rows
- Add ownership metadata to Gateway session rows.
- Teach ACPX, subagent, background-task, and session-store writers to populate
`ownerSessionKey` or `spawnedBy`.
- Convert session visibility checks to use row metadata.
- Remove visibility-time secondary `sessions.list({ spawnedBy })` lookups.
### Phase 5: Remove Legacy Heuristics
After one release window:
- stop relying on stored root command strings for non-legacy ACPX cleanup
- remove command-marker startup scans
- remove visibility fallback list lookups
- keep defensive fail-closed behavior for missing or unverifiable leases
## Tests
Add two table-driven suites.
Process lifecycle simulator:
- PID reused by unrelated process
- PID reused by another Gateway's wrapper root
- stored wrapper command is shell-quoted, live `ps` command is not
- adapter child exits, grandchild remains in the process group
- parent death SIGTERM fallback reaches SIGKILL
- process listing unavailable
- stale lease with missing process
- startup orphan with wrapper, adapter child, and grandchild
Session visibility matrix:
- `self`, `tree`, `agent`, `all`
- a2a enabled and disabled
- same-agent row
- cross-agent row
- requester-owned spawned cross-agent ACP row
- sandboxed requester clamped to `tree`
- list, history, send, and status actions
The important invariant: a requester-owned spawned child is visible wherever
the configured visibility includes the requester session tree, and `all` is not
less capable than `tree`.
## Compatibility Notes
Old session records may not have `leaseId`. They should use the legacy
fail-closed cleanup path:
- require a live root process
- require wrapper-root ownership when a generated wrapper is expected
- require command agreement for non-wrapper roots
- never signal based only on stale stored PID metadata
If a legacy record cannot be verified, leave it alone. Once lease-first startup
cleanup has shipped and one release window has passed, the fallback can be retired.
## Success Criteria
- Closing an old or stale ACPX session cannot kill another Gateway's process.
- Parent death does not leave stubborn adapter grandchildren running.
- `cancel` aborts the active turn without closing reusable sessions.
- `sessions_list` can show requester-owned cross-agent ACP children under both
`tree` and `all`.
- Startup cleanup is driven by leases, not broad command-string scans.
- The focused process and visibility matrix tests cover every edge case that
previously required one-off review fixes.


@@ -60,6 +60,7 @@ an unavailable backend.
<Accordion title="First-run gotchas">
- If `plugins.allow` is set, it is a restrictive plugin inventory and **must** include `acpx`; otherwise the installed ACP backend is intentionally blocked and `/acp doctor` reports the missing allowlist entry.
- The Codex ACP adapter is staged with the `acpx` plugin and launched locally when possible.
- Codex ACP runs with an isolated `CODEX_HOME`; OpenClaw copies only trusted project entries from the host Codex config and trusts the active workspace, leaving auth, notifications, and hooks on the host config.
- Other target harness adapters may still be fetched on demand with `npx` the first time you use them.
- Vendor auth still has to exist on the host for that harness.
- If the host has no npm or network access, first-run adapter fetches fail until caches are pre-warmed or the adapter is installed another way.
@@ -154,6 +155,7 @@ Quick `/acp` flow from chat:
- Gateway commands stay local. `/acp ...`, `/status`, and `/unfocus` are never sent as normal prompt text to a bound ACP harness.
- `cancel` aborts the active turn when the backend supports cancellation; it does not delete the binding or session metadata.
- `close` ends the ACP session from OpenClaw's point of view and removes the binding. A harness may still keep its own upstream history if it supports resume.
- The acpx plugin cleans up OpenClaw-owned wrapper and adapter process trees after `close`, and reaps stale OpenClaw-owned ACPX orphans during Gateway startup.
- Idle runtime workers are eligible for cleanup after `acp.runtime.ttlMinutes`; stored session metadata remains available for `/acp sessions`.
</Accordion>
@@ -830,7 +832,7 @@ permission modes, see
| Missing ACP metadata for bound session | Stale/deleted ACP session metadata. | Recreate with `/acp spawn`, then rebind/focus thread. |
| `AcpRuntimeError: Permission prompt unavailable in non-interactive mode` | `permissionMode` blocks writes/exec in non-interactive ACP session. | Set `plugins.entries.acpx.config.permissionMode` to `approve-all` and restart gateway. See [Permission configuration](/tools/acp-agents-setup#permission-configuration). |
| ACP session fails early with little output | Permission prompts are blocked by `permissionMode`/`nonInteractivePermissions`. | Check gateway logs for `AcpRuntimeError`. For full permissions, set `permissionMode=approve-all`; for graceful degradation, set `nonInteractivePermissions=deny`. |
| ACP session stalls indefinitely after completing work | Harness process finished but ACP session did not report completion. | Monitor with `ps aux \| grep acpx`; kill stale processes manually. |
| ACP session stalls indefinitely after completing work | Harness process finished but ACP session did not report completion. | Update OpenClaw; current acpx cleanup reaps OpenClaw-owned stale wrapper and adapter processes on close and Gateway startup. |
| Harness sees `<<<BEGIN_OPENCLAW_INTERNAL_CONTEXT>>>` | Internal event envelope leaked across the ACP boundary. | Update OpenClaw and rerun the completion flow; external harnesses should receive plain completion prompts only. |
## Related


@@ -210,6 +210,34 @@ describe("prepareAcpxCodexAuthConfig", () => {
expect(wrapper).toContain("defaultArgs = [installedBinPath]");
});
it("keeps the orphaned wrapper alive long enough to force-kill the child process group", async () => {
const root = await makeTempDir();
const stateDir = path.join(root, "state");
const generated = generatedCodexPaths(stateDir);
const pluginConfig = resolveAcpxPluginConfig({
rawConfig: {},
workspaceDir: root,
});
await prepareAcpxCodexAuthConfig({
pluginConfig,
stateDir,
});
const wrapper = await fs.readFile(generated.wrapperPath, "utf8");
expect(wrapper).toContain('killChildTree("SIGTERM")');
expect(wrapper).toContain('killChildTree("SIGKILL", { force: true })');
expect(wrapper).toMatch(
/forceKillTimer = setTimeout\(\(\) => \{\s*killChildTree\("SIGKILL", \{ force: true \}\);\s*process\.exit\(1\);/s,
);
expect(wrapper).toMatch(
/child\.on\("exit", \(code, signal\) => \{\s*if \(parentWatcher\) \{\s*clearInterval\(parentWatcher\);\s*\}\s*if \(orphanCleanupStarted\) \{\s*return;\s*\}/s,
);
expect(wrapper).not.toMatch(
/forceKillTimer = setTimeout\(\(\) => killChildTree\("SIGKILL"\), 1_500\);\s*forceKillTimer\.unref\?\.\(\);\s*process\.exit\(1\);/s,
);
});
it("uses the bundled Claude ACP dependency by default when it is installed", async () => {
const root = await makeTempDir();
const stateDir = path.join(root, "state");
@@ -251,9 +279,19 @@ describe("prepareAcpxCodexAuthConfig", () => {
resolveInstalledCodexAcpBinPath: async () => installedBinPath,
});
const { stdout } = await execFileAsync(process.execPath, [generated.wrapperPath], {
cwd: root,
});
const { stdout } = await execFileAsync(
process.execPath,
[
generated.wrapperPath,
"--openclaw-acpx-lease-id",
"lease-1",
"--openclaw-gateway-instance-id",
"gateway-1",
],
{
cwd: root,
},
);
const launched = JSON.parse(stdout.trim()) as { argv?: unknown; codexHome?: unknown };
expect(launched.argv).toEqual([]);
const expectedCodexHome = await fs.realpath(path.join(stateDir, "acpx", "codex-home"));
@@ -326,6 +364,8 @@ describe("prepareAcpxCodexAuthConfig", () => {
const isolatedConfig = await fs.readFile(generated.configPath, "utf8");
expect(isolatedConfig).not.toContain("notify");
expect(isolatedConfig).not.toContain("SkyComputerUseClient");
expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(root))}]`);
expect(isolatedConfig).toContain('trust_level = "trusted"');
const wrapper = await fs.readFile(generated.wrapperPath, "utf8");
expect(wrapper).toContain("CODEX_HOME: codexHome");
expect(wrapper).not.toContain(sourceCodexHome);
@@ -337,6 +377,50 @@ describe("prepareAcpxCodexAuthConfig", () => {
).rejects.toMatchObject({ code: "ENOENT" });
});
it("copies only trusted Codex project declarations into the isolated Codex home", async () => {
const root = await makeTempDir();
const sourceCodexHome = path.join(root, "source-codex");
const stateDir = path.join(root, "state");
const explicitProject = path.join(root, "explicit project");
const inlineProject = path.join(root, "inline-project");
const mapProject = path.join(root, "map-project");
const untrustedProject = path.join(root, "untrusted-project");
const generated = generatedCodexPaths(stateDir);
await fs.mkdir(sourceCodexHome, { recursive: true });
await fs.writeFile(
path.join(sourceCodexHome, "config.toml"),
[
'notify = ["SkyComputerUseClient", "turn-ended"]',
`projects = { ${JSON.stringify(mapProject)} = { trust_level = "trusted" }, ${JSON.stringify(untrustedProject)} = { trust_level = "untrusted" } }`,
"[projects]",
`${JSON.stringify(inlineProject)} = { trust_level = "trusted" }`,
`[projects.${JSON.stringify(explicitProject)}]`,
'trust_level = "trusted"',
"",
].join("\n"),
);
process.env.CODEX_HOME = sourceCodexHome;
const pluginConfig = resolveAcpxPluginConfig({
rawConfig: {},
workspaceDir: root,
});
await prepareAcpxCodexAuthConfig({
pluginConfig,
stateDir,
resolveInstalledCodexAcpBinPath: async () => undefined,
});
const isolatedConfig = await fs.readFile(generated.configPath, "utf8");
expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(root))}]`);
expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(explicitProject))}]`);
expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(inlineProject))}]`);
expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(mapProject))}]`);
expect(isolatedConfig).not.toContain(untrustedProject);
expect(isolatedConfig).not.toContain("notify");
expect(isolatedConfig).not.toContain("SkyComputerUseClient");
});
it("normalizes an explicitly configured Codex ACP command to the local wrapper", async () => {
const root = await makeTempDir();
const sourceCodexHome = path.join(root, "source-codex");


@@ -1,10 +1,16 @@
import fsSync from "node:fs";
import fs from "node:fs/promises";
import { createRequire } from "node:module";
import os from "node:os";
import path from "node:path";
import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store";
import {
extractTrustedCodexProjectPaths,
renderIsolatedCodexProjectTrustConfig,
} from "./codex-trust-config.js";
import { resolveAcpxPluginRoot } from "./config.js";
import type { ResolvedAcpxPluginConfig } from "./config.js";
import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from "./process-lease.js";
const CODEX_ACP_PACKAGE = "@zed-industries/codex-acp";
const CODEX_ACP_BIN = "codex-acp";
@@ -156,7 +162,25 @@ import { spawn } from "node:child_process";
import { fileURLToPath } from "node:url";
${params.envSetup}
const configuredArgs = process.argv.slice(2);
const openClawWrapperArgs = new Set([
${quoteCommandPart(OPENCLAW_ACPX_LEASE_ID_ARG)},
${quoteCommandPart(OPENCLAW_GATEWAY_INSTANCE_ID_ARG)},
]);
function stripOpenClawWrapperArgs(args) {
const stripped = [];
for (let index = 0; index < args.length; index += 1) {
const value = args[index];
if (openClawWrapperArgs.has(value)) {
index += 1;
continue;
}
stripped.push(value);
}
return stripped;
}
const configuredArgs = stripOpenClawWrapperArgs(process.argv.slice(2));
function resolveNpmCliPath() {
const candidate = path.resolve(
@@ -198,23 +222,78 @@ if (!command) {
}
const child = spawn(command, args, {
detached: process.platform !== "win32",
env,
stdio: "inherit",
windowsHide: true,
});
let forceKillTimer;
let orphanCleanupStarted = false;
function killChildTree(signal, options = {}) {
if (!child.pid || (!options.force && child.killed)) {
return;
}
if (process.platform !== "win32") {
try {
// The adapter can spawn grandchildren; signaling the process group keeps
// the generated wrapper from leaving an ACP tree behind.
process.kill(-child.pid, signal);
return;
} catch {
// Fall back to direct child signaling below.
}
}
child.kill(signal);
}
for (const signal of ["SIGINT", "SIGTERM", "SIGHUP"]) {
process.once(signal, () => {
child.kill(signal);
killChildTree(signal);
});
}
const originalParentPid = process.ppid;
const parentWatcher =
process.platform === "win32"
? undefined
: setInterval(() => {
if (process.ppid === originalParentPid || process.ppid !== 1) {
return;
}
if (orphanCleanupStarted) {
return;
}
orphanCleanupStarted = true;
if (parentWatcher) {
clearInterval(parentWatcher);
}
killChildTree("SIGTERM");
// Keep the wrapper alive long enough for stubborn adapters to receive
// a forced fallback signal after SIGTERM.
forceKillTimer = setTimeout(() => {
killChildTree("SIGKILL", { force: true });
process.exit(1);
}, 1_500);
}, 1_000);
parentWatcher?.unref?.();
child.on("error", (error) => {
console.error(\`[openclaw] failed to launch ${params.displayName} ACP wrapper: \${error.message}\`);
process.exit(1);
});
child.on("exit", (code, signal) => {
if (parentWatcher) {
clearInterval(parentWatcher);
}
if (orphanCleanupStarted) {
return;
}
if (forceKillTimer) {
clearTimeout(forceKillTimer);
}
if (code !== null) {
process.exit(code);
}
@@ -250,12 +329,32 @@ function buildClaudeAcpWrapperScript(installedBinPath?: string): string {
});
}
async function prepareIsolatedCodexHome(baseDir: string): Promise<string> {
const codexHome = path.join(baseDir, "codex-home");
async function readSourceCodexConfig(codexHome: string): Promise<string | undefined> {
try {
return await fs.readFile(path.join(codexHome, "config.toml"), "utf8");
} catch (error) {
if ((error as NodeJS.ErrnoException).code === "ENOENT") {
return undefined;
}
throw error;
}
}
async function prepareIsolatedCodexHome(params: {
baseDir: string;
workspaceDir: string;
}): Promise<string> {
const sourceCodexHome = process.env.CODEX_HOME || path.join(os.homedir(), ".codex");
const sourceConfig = await readSourceCodexConfig(sourceCodexHome);
const trustedProjectPaths = [
...(sourceConfig ? extractTrustedCodexProjectPaths(sourceConfig) : []),
params.workspaceDir,
];
const codexHome = path.join(params.baseDir, "codex-home");
await fs.mkdir(codexHome, { recursive: true });
await fs.writeFile(
path.join(codexHome, "config.toml"),
"# Generated by OpenClaw for Codex ACP sessions.\n",
renderIsolatedCodexProjectTrustConfig(trustedProjectPaths),
"utf8",
);
return codexHome;
@@ -383,7 +482,10 @@ export async function prepareAcpxCodexAuthConfig(params: {
}): Promise<ResolvedAcpxPluginConfig> {
void params.logger;
const codexBaseDir = path.join(params.stateDir, "acpx");
await prepareIsolatedCodexHome(codexBaseDir);
await prepareIsolatedCodexHome({
baseDir: codexBaseDir,
workspaceDir: params.pluginConfig.cwd,
});
const installedCodexBinPath = await (
params.resolveInstalledCodexAcpBinPath ?? resolveInstalledCodexAcpBinPath
)();

View File

@@ -0,0 +1,181 @@
import path from "node:path";
function stripTomlComment(line: string): string {
let quote: "'" | '"' | null = null;
let escaping = false;
for (let index = 0; index < line.length; index += 1) {
const ch = line[index];
if (escaping) {
escaping = false;
continue;
}
if (quote === '"' && ch === "\\") {
escaping = true;
continue;
}
if (quote) {
if (ch === quote) {
quote = null;
}
continue;
}
if (ch === "'" || ch === '"') {
quote = ch;
continue;
}
if (ch === "#") {
return line.slice(0, index);
}
}
return line;
}
function parseTomlString(value: string): string | undefined {
const trimmed = value.trim();
if (trimmed.startsWith('"') && trimmed.endsWith('"')) {
try {
return JSON.parse(trimmed) as string;
} catch {
return undefined;
}
}
if (trimmed.startsWith("'") && trimmed.endsWith("'")) {
return trimmed.slice(1, -1);
}
return undefined;
}
function parseTomlDottedKey(value: string): string[] {
const parts: string[] = [];
let current = "";
let quote: "'" | '"' | null = null;
let escaping = false;
for (const ch of value.trim()) {
if (escaping) {
current += ch;
escaping = false;
continue;
}
if (quote === '"' && ch === "\\") {
current += ch;
escaping = true;
continue;
}
if (quote) {
current += ch;
if (ch === quote) {
quote = null;
}
continue;
}
if (ch === "'" || ch === '"') {
quote = ch;
current += ch;
continue;
}
if (ch === ".") {
parts.push(current.trim());
current = "";
continue;
}
current += ch;
}
if (current.trim()) {
parts.push(current.trim());
}
return parts.map((part) => parseTomlString(part) ?? part);
}
function parseProjectHeader(line: string): string | undefined {
const trimmed = line.trim();
if (!trimmed.startsWith("[") || !trimmed.endsWith("]") || trimmed.startsWith("[[")) {
return undefined;
}
const parts = parseTomlDottedKey(trimmed.slice(1, -1));
return parts.length === 2 && parts[0] === "projects" ? parts[1] : undefined;
}
function parseTrustedInlineProjectEntries(value: string): string[] {
const trusted: string[] = [];
const entryPattern =
/(?<key>"(?:\\.|[^"\\])*"|'[^']*'|[A-Za-z0-9_\-/.~:]+)\s*=\s*\{(?<body>[^{}]*(?:\{[^{}]*\}[^{}]*)*)\}/g;
for (const match of value.matchAll(entryPattern)) {
const key = match.groups?.key;
const body = match.groups?.body;
if (!key || !body || !/\btrust_level\s*=\s*["']trusted["']/.test(body)) {
continue;
}
const projectPath = parseTomlString(key) ?? key.trim();
if (projectPath) {
trusted.push(projectPath);
}
}
return trusted;
}
export function extractTrustedCodexProjectPaths(configToml: string): string[] {
const trusted = new Set<string>();
let currentProjectPath: string | undefined;
let inProjectsTable = false;
for (const rawLine of configToml.split(/\r?\n/)) {
const line = stripTomlComment(rawLine).trim();
if (!line) {
continue;
}
if (line.startsWith("[")) {
currentProjectPath = parseProjectHeader(line);
inProjectsTable = line === "[projects]";
continue;
}
if (currentProjectPath && /^trust_level\s*=\s*["']trusted["']\s*$/.test(line)) {
trusted.add(currentProjectPath);
continue;
}
const assignment =
/^(?<key>"(?:\\.|[^"\\])*"|'[^']*'|[A-Za-z0-9_\-/.~:]+)\s*=\s*(?<value>.+)$/.exec(line);
if (!assignment?.groups) {
continue;
}
const key = parseTomlString(assignment.groups.key) ?? assignment.groups.key;
const value = assignment.groups.value.trim();
if (inProjectsTable && /^\{.*\}$/.test(value)) {
if (/\btrust_level\s*=\s*["']trusted["']/.test(value) && key) {
trusted.add(key);
}
continue;
}
if (key === "projects" || inProjectsTable) {
for (const projectPath of parseTrustedInlineProjectEntries(value)) {
trusted.add(projectPath);
}
}
}
return Array.from(trusted);
}
export function renderIsolatedCodexProjectTrustConfig(projectPaths: string[]): string {
const normalized = Array.from(
new Set(
projectPaths
.map((projectPath) => projectPath.trim())
.filter(Boolean)
.map((projectPath) => path.resolve(projectPath)),
),
).toSorted((left, right) => left.localeCompare(right));
return [
"# Generated by OpenClaw for Codex ACP sessions.",
...normalized.flatMap((projectPath) => [
"",
`[projects.${JSON.stringify(projectPath)}]`,
'trust_level = "trusted"',
]),
"",
].join("\n");
}
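
Reviewer note: the rendering step above can be illustrated with a small, self-contained sketch. It is not the code from this commit. `renderTrustConfigSketch` is a hypothetical name, the `/work/...` paths are made up, and the sketch uses `Array.prototype.sort` instead of `toSorted` only to stay runnable on older runtimes. It assumes, as the diff does, that `JSON.stringify` produces a quoted key that is acceptable as a TOML basic string for typical paths.

```typescript
import path from "node:path";

// Hypothetical helper (not part of the commit): emits one trusted
// `[projects."<path>"]` table per unique, absolute project path.
function renderTrustConfigSketch(projectPaths: string[]): string {
  const unique = Array.from(
    new Set(
      projectPaths
        .map((projectPath) => projectPath.trim())
        .filter(Boolean) // drop empty entries
        .map((projectPath) => path.resolve(projectPath)),
    ),
  ).sort((left, right) => left.localeCompare(right));

  const lines = ["# Generated sketch, not the real OpenClaw header."];
  for (const projectPath of unique) {
    // JSON.stringify yields a double-quoted, escaped key.
    lines.push("", `[projects.${JSON.stringify(projectPath)}]`, 'trust_level = "trusted"');
  }
  lines.push("");
  return lines.join("\n");
}

// Duplicates, padding, and empty strings collapse to two sorted tables.
const rendered = renderTrustConfigSketch(["/work/b", " /work/a ", "/work/b", ""]);
console.log(rendered);
```

Deduplicating after `path.resolve` (rather than before) means two spellings of the same directory end up as a single table, which appears to be the intent of the ordering in the diff.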


@@ -0,0 +1,36 @@
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { createAcpxProcessLeaseStore, type AcpxProcessLease } from "./process-lease.js";
function makeLease(index: number): AcpxProcessLease {
return {
leaseId: `lease-${index}`,
gatewayInstanceId: "gateway-test",
sessionKey: `agent:codex:acp:${index}`,
wrapperRoot: "/tmp/openclaw/acpx",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
rootPid: 1000 + index,
commandHash: `hash-${index}`,
startedAt: index,
state: "open",
};
}
describe("createAcpxProcessLeaseStore", () => {
it("serializes concurrent lease saves without dropping records", async () => {
const stateDir = await mkdtemp(path.join(tmpdir(), "openclaw-acpx-leases-"));
try {
const store = createAcpxProcessLeaseStore({ stateDir });
await Promise.all(Array.from({ length: 25 }, (_, index) => store.save(makeLease(index))));
const leases = await store.listOpen("gateway-test");
expect(leases.map((lease) => lease.leaseId).toSorted()).toEqual(
Array.from({ length: 25 }, (_, index) => `lease-${index}`).toSorted(),
);
} finally {
await rm(stateDir, { recursive: true, force: true });
}
});
});


@@ -0,0 +1,169 @@
import { randomUUID, createHash } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
export const OPENCLAW_ACPX_LEASE_ID_ENV = "OPENCLAW_ACPX_LEASE_ID";
export const OPENCLAW_GATEWAY_INSTANCE_ID_ENV = "OPENCLAW_GATEWAY_INSTANCE_ID";
export const OPENCLAW_ACPX_LEASE_ID_ARG = "--openclaw-acpx-lease-id";
export const OPENCLAW_GATEWAY_INSTANCE_ID_ARG = "--openclaw-gateway-instance-id";
export type AcpxProcessLeaseState = "open" | "closing" | "closed" | "lost";
export type AcpxProcessLease = {
leaseId: string;
gatewayInstanceId: string;
sessionKey: string;
wrapperRoot: string;
wrapperPath: string;
rootPid: number;
processGroupId?: number;
commandHash: string;
startedAt: number;
state: AcpxProcessLeaseState;
};
export type AcpxProcessLeaseStore = {
load(leaseId: string): Promise<AcpxProcessLease | undefined>;
listOpen(gatewayInstanceId?: string): Promise<AcpxProcessLease[]>;
save(lease: AcpxProcessLease): Promise<void>;
markState(leaseId: string, state: AcpxProcessLeaseState): Promise<void>;
};
type LeaseFile = {
version: 1;
leases: AcpxProcessLease[];
};
const LEASE_FILE = "process-leases.json";
function normalizeLease(value: unknown): AcpxProcessLease | undefined {
if (typeof value !== "object" || value === null) {
return undefined;
}
const record = value as Record<string, unknown>;
if (
typeof record.leaseId !== "string" ||
typeof record.gatewayInstanceId !== "string" ||
typeof record.sessionKey !== "string" ||
typeof record.wrapperRoot !== "string" ||
typeof record.wrapperPath !== "string" ||
typeof record.rootPid !== "number" ||
typeof record.commandHash !== "string" ||
typeof record.startedAt !== "number" ||
!["open", "closing", "closed", "lost"].includes(String(record.state))
) {
return undefined;
}
return {
leaseId: record.leaseId,
gatewayInstanceId: record.gatewayInstanceId,
sessionKey: record.sessionKey,
wrapperRoot: record.wrapperRoot,
wrapperPath: record.wrapperPath,
rootPid: record.rootPid,
...(typeof record.processGroupId === "number" ? { processGroupId: record.processGroupId } : {}),
commandHash: record.commandHash,
startedAt: record.startedAt,
state: record.state as AcpxProcessLeaseState,
};
}
async function readLeaseFile(filePath: string): Promise<LeaseFile> {
const { value } = await readJsonFileWithFallback<Partial<LeaseFile>>(filePath, {
version: 1,
leases: [],
});
const leases = Array.isArray(value.leases)
? value.leases.map(normalizeLease).filter((lease): lease is AcpxProcessLease => !!lease)
: [];
return { version: 1, leases };
}
function writeLeaseFile(filePath: string, value: LeaseFile): Promise<void> {
return writeJsonFileAtomically(filePath, value);
}
export function createAcpxProcessLeaseStore(params: { stateDir: string }): AcpxProcessLeaseStore {
const filePath = path.join(params.stateDir, LEASE_FILE);
let updateQueue: Promise<void> = Promise.resolve();
async function update(
mutator: (leases: AcpxProcessLease[]) => AcpxProcessLease[],
): Promise<void> {
const run = updateQueue.then(async () => {
await fs.mkdir(params.stateDir, { recursive: true });
const current = await readLeaseFile(filePath);
await writeLeaseFile(filePath, {
version: 1,
leases: mutator(current.leases),
});
});
updateQueue = run.catch(() => {});
await run;
}
async function readCurrent(): Promise<LeaseFile> {
await updateQueue;
return await readLeaseFile(filePath);
}
return {
async load(leaseId) {
const current = await readCurrent();
return current.leases.find((lease) => lease.leaseId === leaseId);
},
async listOpen(gatewayInstanceId) {
const current = await readCurrent();
return current.leases.filter(
(lease) =>
(lease.state === "open" || lease.state === "closing") &&
(!gatewayInstanceId || lease.gatewayInstanceId === gatewayInstanceId),
);
},
async save(lease) {
await update((leases) => [
...leases.filter((entry) => entry.leaseId !== lease.leaseId),
lease,
]);
},
async markState(leaseId, state) {
await update((leases) =>
leases.map((lease) => (lease.leaseId === leaseId ? { ...lease, state } : lease)),
);
},
};
}
export function createAcpxProcessLeaseId(): string {
return randomUUID();
}
export function hashAcpxProcessCommand(command: string): string {
return createHash("sha256").update(command).digest("hex");
}
function quoteEnvValue(value: string): string {
return /^[A-Za-z0-9_./:=@+-]+$/.test(value) ? value : `'${value.replace(/'/g, "'\\''")}'`;
}
export function withAcpxLeaseEnvironment(params: {
command: string;
leaseId: string;
gatewayInstanceId: string;
platform?: NodeJS.Platform;
}): string {
if ((params.platform ?? process.platform) === "win32") {
return params.command;
}
return [
"env",
`${OPENCLAW_ACPX_LEASE_ID_ENV}=${quoteEnvValue(params.leaseId)}`,
`${OPENCLAW_GATEWAY_INSTANCE_ID_ENV}=${quoteEnvValue(params.gatewayInstanceId)}`,
params.command,
OPENCLAW_ACPX_LEASE_ID_ARG,
quoteEnvValue(params.leaseId),
OPENCLAW_GATEWAY_INSTANCE_ID_ARG,
quoteEnvValue(params.gatewayInstanceId),
].join(" ");
}
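
Reviewer note: the `updateQueue` chaining in `createAcpxProcessLeaseStore` is what keeps concurrent `save` calls from overwriting each other. Below is a minimal sketch of that pattern, assuming an in-memory array in place of the lease file. `createSerializedStore` and `demoSerializedUpdates` are hypothetical names, and the `setTimeout` only simulates I/O latency between the read and the write.

```typescript
// Hypothetical in-memory stand-in for the lease file; names are illustrative.
type Mutator<T> = (current: T[]) => T[];

function createSerializedStore<T>() {
  let stored: T[] = [];
  let updateQueue: Promise<void> = Promise.resolve();

  async function update(mutator: Mutator<T>): Promise<void> {
    // Each update starts only after the previous one has settled.
    const run = updateQueue.then(async () => {
      const snapshot = [...stored]; // "read"
      await new Promise<void>((resolve) => setTimeout(resolve, 1)); // simulated I/O
      stored = mutator(snapshot); // "write"
    });
    // Swallow failures on the queue itself so one failed update does not
    // block later ones; the caller still sees the failure through `run`.
    updateQueue = run.catch(() => {});
    await run;
  }

  async function read(): Promise<T[]> {
    await updateQueue; // wait for pending writes before reading
    return [...stored];
  }

  return { update, read };
}

async function demoSerializedUpdates(): Promise<number[]> {
  const store = createSerializedStore<number>();
  await Promise.all(
    Array.from({ length: 25 }, (_, index) => store.update((items) => [...items, index])),
  );
  return store.read();
}

demoSerializedUpdates().then((items) => console.log(items.length));
```

Without the queue, all 25 updates would take their snapshot before any write landed, and only one record would survive. Note that this kind of serialization only covers callers inside one process; whether the real store needs cross-process locking is outside what the diff shows.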


@@ -0,0 +1,262 @@
import { describe, expect, it, vi } from "vitest";
import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from "./process-lease.js";
import {
cleanupOpenClawOwnedAcpxProcessTree,
isOpenClawOwnedAcpxProcessCommand,
reapStaleOpenClawOwnedAcpxOrphans,
type AcpxProcessInfo,
} from "./process-reaper.js";
const WRAPPER_ROOT = "/tmp/openclaw-state/acpx";
const CODEX_WRAPPER_COMMAND = `node ${WRAPPER_ROOT}/codex-acp-wrapper.mjs`;
const CODEX_WRAPPER_COMMAND_WITH_LEASE = `${CODEX_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-1 ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-1`;
const CLAUDE_WRAPPER_COMMAND = `node ${WRAPPER_ROOT}/claude-agent-acp-wrapper.mjs`;
const PLUGIN_DEPS_CODEX_COMMAND =
"node /tmp/openclaw/plugin-runtime-deps/node_modules/@zed-industries/codex-acp/bin/codex-acp.js";
function cleanupDeps(processes: AcpxProcessInfo[]) {
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = [];
return {
killed,
deps: {
listProcesses: vi.fn(async () => processes),
killProcess: vi.fn((pid: number, signal: NodeJS.Signals) => {
killed.push({ pid, signal });
}),
sleep: vi.fn(async () => {}),
},
};
}
describe("process reaper", () => {
it("recognizes generated Codex and Claude wrappers only under the configured root", () => {
expect(
isOpenClawOwnedAcpxProcessCommand({
command: CODEX_WRAPPER_COMMAND,
wrapperRoot: WRAPPER_ROOT,
}),
).toBe(true);
expect(
isOpenClawOwnedAcpxProcessCommand({
command: CLAUDE_WRAPPER_COMMAND,
wrapperRoot: WRAPPER_ROOT,
}),
).toBe(true);
expect(
isOpenClawOwnedAcpxProcessCommand({
command: "node /tmp/other/codex-acp-wrapper.mjs",
wrapperRoot: WRAPPER_ROOT,
}),
).toBe(false);
});
it("recognizes OpenClaw plugin-runtime-deps ACP adapter children", () => {
expect(isOpenClawOwnedAcpxProcessCommand({ command: PLUGIN_DEPS_CODEX_COMMAND })).toBe(true);
expect(isOpenClawOwnedAcpxProcessCommand({ command: "npx @zed-industries/codex-acp" })).toBe(
false,
);
});
it("kills an owned recorded process tree children first", async () => {
const { deps, killed } = cleanupDeps([
{ pid: 100, ppid: 1, command: CODEX_WRAPPER_COMMAND },
{ pid: 101, ppid: 100, command: PLUGIN_DEPS_CODEX_COMMAND },
{ pid: 102, ppid: 101, command: "node child.js" },
]);
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 100,
rootCommand: CODEX_WRAPPER_COMMAND,
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result.skippedReason).toBeUndefined();
expect(result.inspectedPids).toEqual([100, 101, 102]);
expect(killed.slice(0, 3)).toEqual([
{ pid: 102, signal: "SIGTERM" },
{ pid: 101, signal: "SIGTERM" },
{ pid: 100, signal: "SIGTERM" },
]);
});
it("allows wrapper-root verification when stored wrapper commands are shell-quoted", async () => {
const { deps, killed } = cleanupDeps([{ pid: 110, ppid: 1, command: CODEX_WRAPPER_COMMAND }]);
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 110,
rootCommand: `"/usr/local/bin/node" "${WRAPPER_ROOT}/codex-acp-wrapper.mjs"`,
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result.skippedReason).toBeUndefined();
expect(killed[0]).toEqual({ pid: 110, signal: "SIGTERM" });
});
it("requires matching lease identity before killing a leased process tree", async () => {
const { deps, killed } = cleanupDeps([
{ pid: 112, ppid: 1, command: CODEX_WRAPPER_COMMAND_WITH_LEASE },
]);
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 112,
rootCommand: CODEX_WRAPPER_COMMAND,
expectedLeaseId: "lease-1",
expectedGatewayInstanceId: "gateway-1",
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result.skippedReason).toBeUndefined();
expect(killed[0]).toEqual({ pid: 112, signal: "SIGTERM" });
});
it("does not kill a reused same-root wrapper pid with a different lease identity", async () => {
const { deps, killed } = cleanupDeps([
{
pid: 113,
ppid: 1,
command: `${CODEX_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} other-lease ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-1`,
},
]);
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 113,
rootCommand: CODEX_WRAPPER_COMMAND,
expectedLeaseId: "lease-1",
expectedGatewayInstanceId: "gateway-1",
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result).toEqual({
inspectedPids: [113],
terminatedPids: [],
skippedReason: "not-openclaw-owned",
});
expect(killed).toEqual([]);
});
it("skips recorded pid cleanup when process listing is unavailable", async () => {
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = [];
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 200,
rootCommand: CODEX_WRAPPER_COMMAND,
wrapperRoot: WRAPPER_ROOT,
deps: {
listProcesses: vi.fn(async () => {
throw new Error("ps unavailable");
}),
killProcess: vi.fn((pid, signal) => {
killed.push({ pid, signal });
}),
sleep: vi.fn(async () => {}),
},
});
expect(result).toEqual({
inspectedPids: [],
terminatedPids: [],
skippedReason: "unverified-root",
});
expect(killed).toEqual([]);
});
it("does not kill a reused pid when the live command is not OpenClaw-owned", async () => {
const { deps, killed } = cleanupDeps([{ pid: 250, ppid: 1, command: "node unrelated.js" }]);
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 250,
rootCommand: CODEX_WRAPPER_COMMAND,
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result).toEqual({
inspectedPids: [250],
terminatedPids: [],
skippedReason: "not-openclaw-owned",
});
expect(killed).toEqual([]);
});
it("does not kill a reused adapter pid when the stored root was a generated wrapper", async () => {
const { deps, killed } = cleanupDeps([
{
pid: 260,
ppid: 1,
command: PLUGIN_DEPS_CODEX_COMMAND,
},
]);
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 260,
rootCommand: CODEX_WRAPPER_COMMAND,
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result).toEqual({
inspectedPids: [260],
terminatedPids: [],
skippedReason: "not-openclaw-owned",
});
expect(killed).toEqual([]);
});
it("skips non-owned recorded process trees", async () => {
const { deps, killed } = cleanupDeps([{ pid: 300, ppid: 1, command: "node server.js" }]);
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: 300,
rootCommand: "node server.js",
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result.skippedReason).toBe("not-openclaw-owned");
expect(killed).toEqual([]);
});
it("reaps stale OpenClaw-owned wrapper and adapter orphans on startup", async () => {
const { deps, killed } = cleanupDeps([
{ pid: 400, ppid: 1, command: CODEX_WRAPPER_COMMAND },
{ pid: 401, ppid: 400, command: PLUGIN_DEPS_CODEX_COMMAND },
{ pid: 402, ppid: 401, command: "node child.js" },
{ pid: 403, ppid: 1, command: CLAUDE_WRAPPER_COMMAND },
{ pid: 404, ppid: 403, command: "node claude-child.js" },
{ pid: 405, ppid: 1, command: PLUGIN_DEPS_CODEX_COMMAND },
{ pid: 406, ppid: 1, command: "node /tmp/other/codex-acp-wrapper.mjs" },
]);
const result = await reapStaleOpenClawOwnedAcpxOrphans({
wrapperRoot: WRAPPER_ROOT,
deps,
});
expect(result.skippedReason).toBeUndefined();
expect(result.inspectedPids).toEqual([400, 401, 402, 403, 404, 405]);
expect(killed.filter((entry) => entry.signal === "SIGTERM").map((entry) => entry.pid)).toEqual([
402, 401, 400, 404, 403, 405,
]);
});
it("keeps startup scans quiet when process listing is unavailable", async () => {
const result = await reapStaleOpenClawOwnedAcpxOrphans({
wrapperRoot: WRAPPER_ROOT,
deps: {
listProcesses: vi.fn(async () => {
throw new Error("ps unavailable");
}),
sleep: vi.fn(async () => {}),
},
});
expect(result).toEqual({
inspectedPids: [],
terminatedPids: [],
skippedReason: "process-list-unavailable",
});
});
});
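
Reviewer note: several tests above assert that descendants receive `SIGTERM` before their ancestors. One way to get that ordering is a breadth-first walk from the root followed by a reversal, sketched below. `killOrderSketch` and `ProcessRow` are hypothetical names, and unlike the commit's `collectProcessTree`, this sketch returns an empty list when the root PID is absent from the process table.

```typescript
type ProcessRow = { pid: number; ppid: number };

// Hypothetical helper: breadth-first walk from the root, then reversed so
// that every descendant comes before its ancestors.
function killOrderSketch(rows: ProcessRow[], rootPid: number): number[] {
  const childrenByParent = new Map<number, number[]>();
  for (const row of rows) {
    const siblings = childrenByParent.get(row.ppid) ?? [];
    siblings.push(row.pid);
    childrenByParent.set(row.ppid, siblings);
  }

  const visited = new Set<number>();
  const discovered: number[] = [];
  // Assumption of this sketch: an unlisted root yields no work at all.
  const pending: number[] = rows.some((row) => row.pid === rootPid) ? [rootPid] : [];
  while (pending.length > 0) {
    const pid = pending.shift() as number;
    if (visited.has(pid)) {
      continue; // guards against duplicate rows or cycles in bad input
    }
    visited.add(pid);
    discovered.push(pid);
    pending.push(...(childrenByParent.get(pid) ?? []));
  }
  // In breadth-first order a parent always precedes its children, so the
  // reversed list signals children first.
  return discovered.reverse();
}

const killOrder = killOrderSketch(
  [
    { pid: 100, ppid: 1 },
    { pid: 101, ppid: 100 },
    { pid: 102, ppid: 101 },
  ],
  100,
);
console.log(killOrder); // [102, 101, 100]
```

Signaling leaves first reduces the chance that a parent exits early and leaves its children reparented before they are signaled.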


@@ -0,0 +1,381 @@
import { execFile } from "node:child_process";
import { promisify } from "node:util";
import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from "./process-lease.js";
const execFileAsync = promisify(execFile);
const GENERATED_WRAPPER_BASENAMES = new Set([
"codex-acp-wrapper.mjs",
"claude-agent-acp-wrapper.mjs",
]);
const OPENCLAW_PLUGIN_DEPS_MARKER = "/plugin-runtime-deps/";
const ACP_PACKAGE_MARKERS = [
"/@zed-industries/codex-acp/",
"/@agentclientprotocol/claude-agent-acp/",
"/acpx/dist/",
];
export type AcpxProcessInfo = {
pid: number;
ppid: number;
command: string;
};
export type AcpxProcessCleanupDeps = {
listProcesses?: () => Promise<AcpxProcessInfo[]>;
killProcess?: (pid: number, signal: NodeJS.Signals) => void;
sleep?: (ms: number) => Promise<void>;
};
export type AcpxProcessCleanupResult = {
inspectedPids: number[];
terminatedPids: number[];
skippedReason?: "missing-root" | "not-openclaw-owned" | "unverified-root";
};
export type AcpxStartupReapResult = {
inspectedPids: number[];
terminatedPids: number[];
skippedReason?: "unsupported-platform" | "process-list-unavailable";
};
function normalizePathLike(value: string): string {
return value.replaceAll("\\", "/");
}
function commandMentionsGeneratedWrapper(command: string): boolean {
return Array.from(GENERATED_WRAPPER_BASENAMES).some((basename) => command.includes(basename));
}
function commandWrapperBelongsToRoot(command: string, wrapperRoot: string | undefined): boolean {
if (!wrapperRoot) {
return true;
}
const normalizedCommand = normalizePathLike(command);
const normalizedRoot = normalizePathLike(wrapperRoot).replace(/\/+$/, "");
return Array.from(GENERATED_WRAPPER_BASENAMES).some((basename) =>
normalizedCommand.includes(`${normalizedRoot}/${basename}`),
);
}
function commandsReferToSameRootCommand(liveCommand: string, storedCommand: string | undefined) {
if (!storedCommand?.trim()) {
return true;
}
return normalizePathLike(liveCommand).trim() === normalizePathLike(storedCommand).trim();
}
function splitCommandParts(value: string): string[] {
const parts: string[] = [];
let current = "";
let quote: "'" | '"' | null = null;
let escaping = false;
for (const ch of value) {
if (escaping) {
current += ch;
escaping = false;
continue;
}
if (ch === "\\" && quote !== "'") {
escaping = true;
continue;
}
if (quote) {
if (ch === quote) {
quote = null;
} else {
current += ch;
}
continue;
}
if (ch === "'" || ch === '"') {
quote = ch;
continue;
}
if (/\s/.test(ch)) {
if (current) {
parts.push(current);
current = "";
}
continue;
}
current += ch;
}
if (escaping) {
current += "\\";
}
if (current) {
parts.push(current);
}
return parts;
}
function commandOptionEquals(
parts: string[],
option: string,
expected: string | undefined,
): boolean {
if (!expected) {
return true;
}
const index = parts.indexOf(option);
return index >= 0 && parts[index + 1] === expected;
}
function liveCommandMatchesLeaseIdentity(params: {
command: string | undefined;
expectedLeaseId?: string;
expectedGatewayInstanceId?: string;
}): boolean {
if (!params.expectedLeaseId && !params.expectedGatewayInstanceId) {
return true;
}
const parts = splitCommandParts(params.command ?? "");
return (
commandOptionEquals(parts, OPENCLAW_ACPX_LEASE_ID_ARG, params.expectedLeaseId) &&
commandOptionEquals(parts, OPENCLAW_GATEWAY_INSTANCE_ID_ARG, params.expectedGatewayInstanceId)
);
}
export function isOpenClawOwnedAcpxProcessCommand(params: {
command: string | undefined;
wrapperRoot?: string;
}): boolean {
const command = params.command?.trim();
if (!command) {
return false;
}
const normalized = normalizePathLike(command);
if (commandMentionsGeneratedWrapper(normalized)) {
return commandWrapperBelongsToRoot(normalized, params.wrapperRoot);
}
if (!normalized.includes(OPENCLAW_PLUGIN_DEPS_MARKER)) {
return false;
}
return ACP_PACKAGE_MARKERS.some((marker) => normalized.includes(marker));
}
function parseProcessList(stdout: string): AcpxProcessInfo[] {
const processes: AcpxProcessInfo[] = [];
for (const line of stdout.split(/\r?\n/)) {
const match = /^\s*(?<pid>\d+)\s+(?<ppid>\d+)\s+(?<command>.+?)\s*$/.exec(line);
if (!match?.groups) {
continue;
}
processes.push({
pid: Number.parseInt(match.groups.pid, 10),
ppid: Number.parseInt(match.groups.ppid, 10),
command: match.groups.command,
});
}
return processes;
}
export async function listPlatformProcesses(): Promise<AcpxProcessInfo[]> {
if (process.platform === "win32") {
return [];
}
const { stdout } = await execFileAsync("ps", ["-axo", "pid=,ppid=,command="], {
maxBuffer: 8 * 1024 * 1024,
});
return parseProcessList(stdout);
}
function collectProcessTree(processes: AcpxProcessInfo[], rootPid: number): AcpxProcessInfo[] {
const childrenByParent = new Map<number, AcpxProcessInfo[]>();
for (const processInfo of processes) {
const children = childrenByParent.get(processInfo.ppid) ?? [];
children.push(processInfo);
childrenByParent.set(processInfo.ppid, children);
}
const byPid = new Map(processes.map((processInfo) => [processInfo.pid, processInfo]));
const root = byPid.get(rootPid);
const collected: AcpxProcessInfo[] = [];
if (root) {
collected.push(root);
}
const queue = [...(childrenByParent.get(rootPid) ?? [])];
while (queue.length > 0) {
const next = queue.shift();
if (!next || collected.some((processInfo) => processInfo.pid === next.pid)) {
continue;
}
collected.push(next);
queue.push(...(childrenByParent.get(next.pid) ?? []));
}
return collected;
}
function uniquePids(processes: AcpxProcessInfo[]): number[] {
return Array.from(
new Set(
processes
.map((processInfo) => processInfo.pid)
.filter((pid) => Number.isInteger(pid) && pid > 0 && pid !== process.pid),
),
);
}
function isProcessAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
async function terminatePids(
pids: number[],
deps: AcpxProcessCleanupDeps | undefined,
): Promise<number[]> {
const killProcess = deps?.killProcess ?? ((pid, signal) => process.kill(pid, signal));
const sleep = deps?.sleep ?? ((ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
const terminated: number[] = [];
for (const pid of pids) {
try {
killProcess(pid, "SIGTERM");
terminated.push(pid);
} catch {
// The process may already be gone.
}
}
if (terminated.length === 0) {
return terminated;
}
await sleep(750);
for (const pid of terminated) {
if (deps?.killProcess || isProcessAlive(pid)) {
try {
killProcess(pid, "SIGKILL");
} catch {
// Best-effort cleanup only.
}
}
}
return terminated;
}
export async function cleanupOpenClawOwnedAcpxProcessTree(params: {
rootPid?: number;
rootCommand?: string;
expectedLeaseId?: string;
expectedGatewayInstanceId?: string;
wrapperRoot?: string;
deps?: AcpxProcessCleanupDeps;
}): Promise<AcpxProcessCleanupResult> {
const rootPid = params.rootPid;
if (!rootPid || rootPid <= 0 || rootPid === process.pid) {
return { inspectedPids: [], terminatedPids: [], skippedReason: "missing-root" };
}
let processes: AcpxProcessInfo[] = [];
try {
processes = await (params.deps?.listProcesses ?? listPlatformProcesses)();
} catch {
processes = [];
}
const listedTree = collectProcessTree(processes, rootPid);
// PIDs recorded in the session store may be stale. If the live process table
// cannot show that this PID still belongs to an OpenClaw-owned wrapper, fail
// closed so that an unrelated process that reused the PID is never killed.
if (listedTree.length === 0) {
return { inspectedPids: [], terminatedPids: [], skippedReason: "unverified-root" };
}
const rootCommand = listedTree[0]?.command ?? params.rootCommand;
const liveCommandWasGeneratedWrapper = commandMentionsGeneratedWrapper(
normalizePathLike(rootCommand ?? ""),
);
const storedCommandWasGeneratedWrapper = commandMentionsGeneratedWrapper(
normalizePathLike(params.rootCommand ?? ""),
);
if (!liveCommandWasGeneratedWrapper && storedCommandWasGeneratedWrapper) {
return {
inspectedPids: listedTree.map((processInfo) => processInfo.pid),
terminatedPids: [],
skippedReason: "not-openclaw-owned",
};
}
if (
!liveCommandWasGeneratedWrapper &&
!commandsReferToSameRootCommand(rootCommand ?? "", params.rootCommand)
) {
return {
inspectedPids: listedTree.map((processInfo) => processInfo.pid),
terminatedPids: [],
skippedReason: "not-openclaw-owned",
};
}
if (
!isOpenClawOwnedAcpxProcessCommand({
command: rootCommand,
wrapperRoot: params.wrapperRoot,
})
) {
return {
inspectedPids: listedTree.map((processInfo) => processInfo.pid),
terminatedPids: [],
skippedReason: "not-openclaw-owned",
};
}
if (
!liveCommandMatchesLeaseIdentity({
command: rootCommand,
expectedLeaseId: params.expectedLeaseId,
expectedGatewayInstanceId: params.expectedGatewayInstanceId,
})
) {
return {
inspectedPids: listedTree.map((processInfo) => processInfo.pid),
terminatedPids: [],
skippedReason: "not-openclaw-owned",
};
}
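// collectProcessTree lists parents before children; reverse the order so
// descendants are signalled before the wrapper root.
const pids = uniquePids(listedTree.toReversed());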
return {
inspectedPids: uniquePids(listedTree),
terminatedPids: await terminatePids(pids, params.deps),
};
}
export async function reapStaleOpenClawOwnedAcpxOrphans(params: {
wrapperRoot: string;
deps?: AcpxProcessCleanupDeps;
}): Promise<AcpxStartupReapResult> {
if (process.platform === "win32") {
return { inspectedPids: [], terminatedPids: [], skippedReason: "unsupported-platform" };
}
let processes: AcpxProcessInfo[];
try {
processes = await (params.deps?.listProcesses ?? listPlatformProcesses)();
} catch {
return { inspectedPids: [], terminatedPids: [], skippedReason: "process-list-unavailable" };
}
const orphans = processes.filter(
(processInfo) =>
processInfo.ppid === 1 &&
isOpenClawOwnedAcpxProcessCommand({
command: processInfo.command,
wrapperRoot: params.wrapperRoot,
}),
);
// Startup reaping starts from currently visible orphan roots and then expands
// each tree, so adapter grandchildren do not survive as fresh orphans after
// the wrapper root exits.
const orphanTrees = orphans.map((orphan) => collectProcessTree(processes, orphan.pid));
const inspectedPids = uniquePids(orphanTrees.flat());
const pids = uniquePids(orphanTrees.flatMap((tree) => tree.toReversed()));
return {
inspectedPids,
terminatedPids: await terminatePids(pids, params.deps),
};
}
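
The cleanup and startup-reap paths above both walk a process tree from a root PID and then signal it in reverse, so children receive SIGTERM before their parent. The following is a minimal, self-contained sketch of that ordering only. It is not the code from this commit: `ProcessInfo`, `collectTree`, `terminationOrder`, and the `selfPid` parameter are hypothetical names chosen for illustration, and the sketch assumes that an unlisted root yields an empty result.

```typescript
// Hypothetical, simplified shape; the real AcpxProcessInfo may carry more fields.
type ProcessInfo = { pid: number; ppid: number; command: string };

// Breadth-first walk from rootPid. Parents always precede their descendants.
function collectTree(processes: ProcessInfo[], rootPid: number): ProcessInfo[] {
  const childrenByParent = new Map<number, ProcessInfo[]>();
  for (const info of processes) {
    const siblings = childrenByParent.get(info.ppid) ?? [];
    siblings.push(info);
    childrenByParent.set(info.ppid, siblings);
  }
  const root = processes.find((info) => info.pid === rootPid);
  if (!root) {
    // Assumption for this sketch: an unlisted root means nothing is verified.
    return [];
  }
  const seen = new Set<number>([root.pid]);
  const collected: ProcessInfo[] = [root];
  const queue: ProcessInfo[] = [...(childrenByParent.get(root.pid) ?? [])];
  while (queue.length > 0) {
    const next = queue.shift();
    if (!next || seen.has(next.pid)) {
      continue; // Guards against duplicate or self-parented entries.
    }
    seen.add(next.pid);
    collected.push(next);
    queue.push(...(childrenByParent.get(next.pid) ?? []));
  }
  return collected;
}

// Reverses the walk so descendants come first, and never includes selfPid.
function terminationOrder(
  processes: ProcessInfo[],
  rootPid: number,
  selfPid: number,
): number[] {
  return collectTree(processes, rootPid)
    .map((info) => info.pid)
    .filter((pid) => Number.isInteger(pid) && pid > 0 && pid !== selfPid)
    .reverse();
}

const sampleTable: ProcessInfo[] = [
  { pid: 900, ppid: 1, command: "wrapper" },
  { pid: 901, ppid: 900, command: "adapter" },
  { pid: 902, ppid: 901, command: "worker" },
  { pid: 50, ppid: 1, command: "unrelated" },
];
const sampleOrder = terminationOrder(sampleTable, 900, 4242); // [902, 901, 900]
```

Signalling leaf-first is what lets the close tests further down expect the child PID to receive SIGTERM before the wrapper PID.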

View File

@@ -1,5 +1,6 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { AcpRuntimeError, type AcpRuntime } from "../runtime-api.js";
import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from "./process-lease.js";
import { AcpxRuntime, __testing } from "./runtime.js";
type TestSessionStore = {
@@ -11,14 +12,17 @@ const DOCUMENTED_OPENCLAW_BRIDGE_COMMAND =
"env OPENCLAW_HIDE_BANNER=1 OPENCLAW_SUPPRESS_NOTES=1 openclaw acp --url ws://127.0.0.1:18789 --token-file ~/.openclaw/gateway.token --session agent:main:main";
const CODEX_ACP_COMMAND = "npx @zed-industries/codex-acp@0.13.0";
const CODEX_ACP_WRAPPER_COMMAND = `node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"`;
const CODEX_ACP_WRAPPER_COMMAND_WITH_LEASE = `${CODEX_ACP_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-close ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-test`;
function makeRuntime(
baseStore: TestSessionStore,
options: Partial<ConstructorParameters<typeof AcpxRuntime>[0]> = {},
testOptions?: ConstructorParameters<typeof AcpxRuntime>[1],
): {
runtime: AcpxRuntime;
wrappedStore: TestSessionStore & { markFresh: (sessionKey: string) => void };
delegate: {
cancel: AcpRuntime["cancel"];
close: AcpRuntime["close"];
ensureSession: AcpRuntime["ensureSession"];
getStatus: NonNullable<AcpRuntime["getStatus"]>;
@@ -35,16 +39,19 @@ function makeRuntime(
probeAvailability(): Promise<void>;
};
} {
const runtime = new AcpxRuntime({
cwd: "/tmp",
sessionStore: baseStore,
agentRegistry: {
resolve: (agentName: string) => (agentName === "openclaw" ? "openclaw acp" : agentName),
list: () => ["codex", "openclaw"],
const runtime = new AcpxRuntime(
{
cwd: "/tmp",
sessionStore: baseStore,
agentRegistry: {
resolve: (agentName: string) => (agentName === "openclaw" ? "openclaw acp" : agentName),
list: () => ["codex", "openclaw"],
},
permissionMode: "approve-reads",
...options,
},
permissionMode: "approve-reads",
...options,
});
testOptions,
);
return {
runtime,
@@ -56,6 +63,7 @@ function makeRuntime(
delegate: (
runtime as unknown as {
delegate: {
cancel: AcpRuntime["cancel"];
close: AcpRuntime["close"];
ensureSession: AcpRuntime["ensureSession"];
getStatus: NonNullable<AcpRuntime["getStatus"]>;
@@ -80,6 +88,26 @@ function makeRuntime(
};
}
function makeLeaseStore() {
const leases = new Map<string, Record<string, unknown>>();
return {
leases,
store: {
load: vi.fn(async (leaseId: string) => leases.get(leaseId) as never),
listOpen: vi.fn(async () => Array.from(leases.values()) as never),
save: vi.fn(async (lease: Record<string, unknown>) => {
leases.set(String(lease.leaseId), lease);
}),
markState: vi.fn(async (leaseId: string, state: string) => {
const lease = leases.get(leaseId);
if (lease) {
lease.state = state;
}
}),
},
};
}
describe("AcpxRuntime fresh reset wrapper", () => {
beforeEach(() => {
vi.restoreAllMocks();
@@ -517,6 +545,483 @@ describe("AcpxRuntime fresh reset wrapper", () => {
expect(baseStore.load).toHaveBeenCalledOnce();
});
it("cleans up OpenClaw-owned ACPX process trees after close", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
acpxRecordId: "agent:codex:acp:binding:test",
agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
pid: 900,
})),
save: vi.fn(async () => {}),
};
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = [];
const { runtime, delegate } = makeRuntime(
baseStore,
{
openclawWrapperRoot: "/tmp/openclaw/acpx",
},
{
openclawProcessCleanup: {
listProcesses: vi.fn(async () => [
{
pid: 900,
ppid: 1,
command: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
},
{
pid: 901,
ppid: 900,
command:
"node /tmp/openclaw/plugin-runtime-deps/node_modules/@zed-industries/codex-acp/bin/codex-acp.js",
},
]),
killProcess: vi.fn((pid, signal) => {
killed.push({ pid, signal });
}),
sleep: vi.fn(async () => {}),
},
},
);
vi.spyOn(delegate, "close").mockResolvedValue(undefined);
await runtime.close({
handle: {
sessionKey: "agent:codex:acp:binding:test",
backend: "acpx",
runtimeSessionName: "agent:codex:acp:binding:test",
},
reason: "user-close",
});
expect(killed.slice(0, 2)).toEqual([
{ pid: 901, signal: "SIGTERM" },
{ pid: 900, signal: "SIGTERM" },
]);
});
it("records ACPX process leases without persisting lease-specific agent commands", async () => {
const savedRecords: Record<string, unknown>[] = [];
const launchCommands: string[] = [];
const baseStore: TestSessionStore = {
load: vi.fn(async () => undefined),
save: vi.fn(async (record) => {
savedRecords.push(record);
}),
};
const leaseStore = makeLeaseStore();
const { runtime, delegate, wrappedStore } = makeRuntime(baseStore, {
openclawGatewayInstanceId: "gateway-test",
openclawProcessLeaseStore: leaseStore.store,
openclawWrapperRoot: "/tmp/openclaw/acpx",
agentRegistry: {
resolve: (agentName: string) =>
agentName === "codex" ? CODEX_ACP_WRAPPER_COMMAND : agentName,
list: () => ["codex"],
},
});
vi.spyOn(delegate, "ensureSession").mockImplementation(async (input) => {
const command = (
runtime as unknown as { scopedAgentRegistry: { resolve(agent: string): string } }
).scopedAgentRegistry.resolve("codex");
launchCommands.push(command);
await wrappedStore.save({
name: input.sessionKey,
agentCommand: command,
pid: 777,
});
return {
sessionKey: input.sessionKey,
backend: "acpx",
runtimeSessionName: input.sessionKey,
};
});
await runtime.ensureSession({
sessionKey: "agent:codex:acp:binding:test",
agent: "codex",
mode: "persistent",
});
expect(leaseStore.store.save).toHaveBeenCalledTimes(2);
const leases = Array.from(leaseStore.leases.values());
expect(leases).toHaveLength(1);
expect(leases[0]).toMatchObject({
gatewayInstanceId: "gateway-test",
sessionKey: "agent:codex:acp:binding:test",
rootPid: 777,
state: "open",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
});
expect(launchCommands[0]).toContain("OPENCLAW_ACPX_LEASE_ID=");
expect(launchCommands[0]).toContain("OPENCLAW_GATEWAY_INSTANCE_ID=gateway-test");
expect(savedRecords[0]?.agentCommand).toBe(CODEX_ACP_WRAPPER_COMMAND);
expect(savedRecords[0]).toMatchObject({
openclawGatewayInstanceId: "gateway-test",
openclawLeaseId: leases[0]?.leaseId,
});
});
it("keeps reusable persistent ACP launch commands stable across ensures", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
name: "agent:codex:acp:binding:test",
acpxRecordId: "record-1",
acpSessionId: "session-1",
agentCommand: CODEX_ACP_WRAPPER_COMMAND,
cwd: "/tmp",
closed: false,
})),
save: vi.fn(async () => {}),
};
const leaseStore = makeLeaseStore();
const { runtime, delegate } = makeRuntime(baseStore, {
openclawGatewayInstanceId: "gateway-test",
openclawProcessLeaseStore: leaseStore.store,
openclawWrapperRoot: "/tmp/openclaw/acpx",
agentRegistry: {
resolve: (agentName: string) =>
agentName === "codex" ? CODEX_ACP_WRAPPER_COMMAND : agentName,
list: () => ["codex"],
},
});
const resolvedCommands: string[] = [];
vi.spyOn(delegate, "ensureSession").mockImplementation(async (input) => {
resolvedCommands.push(
(
runtime as unknown as { scopedAgentRegistry: { resolve(agent: string): string } }
).scopedAgentRegistry.resolve("codex"),
);
return {
sessionKey: input.sessionKey,
backend: "acpx",
runtimeSessionName: input.sessionKey,
};
});
await runtime.ensureSession({
sessionKey: "agent:codex:acp:binding:test",
agent: "codex",
mode: "persistent",
});
expect(resolvedCommands).toEqual([CODEX_ACP_WRAPPER_COMMAND]);
expect(leaseStore.store.save).not.toHaveBeenCalled();
});
it("merges sidecar lease ids into loaded ACPX session records", async () => {
const leaseStore = makeLeaseStore();
leaseStore.leases.set("lease-loaded", {
leaseId: "lease-loaded",
gatewayInstanceId: "gateway-test",
sessionKey: "agent:codex:acp:binding:test",
wrapperRoot: "/tmp/openclaw/acpx",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
rootPid: 777,
commandHash: "hash",
startedAt: 1,
state: "open",
});
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
name: "agent:codex:acp:binding:test",
agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
pid: 777,
})),
save: vi.fn(async () => {}),
};
const { wrappedStore } = makeRuntime(baseStore, {
openclawGatewayInstanceId: "gateway-test",
openclawProcessLeaseStore: leaseStore.store,
openclawWrapperRoot: "/tmp/openclaw/acpx",
});
await expect(wrappedStore.load("agent:codex:acp:binding:test")).resolves.toMatchObject({
openclawGatewayInstanceId: "gateway-test",
openclawLeaseId: "lease-loaded",
});
});
it("merges the lease for the current ACPX session process when old leases exist", async () => {
const leaseStore = makeLeaseStore();
leaseStore.leases.set("lease-old", {
leaseId: "lease-old",
gatewayInstanceId: "gateway-test",
sessionKey: "agent:codex:acp:binding:test",
wrapperRoot: "/tmp/openclaw/acpx",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
rootPid: 700,
commandHash: "hash",
startedAt: 1,
state: "open",
});
leaseStore.leases.set("lease-current", {
leaseId: "lease-current",
gatewayInstanceId: "gateway-test",
sessionKey: "agent:codex:acp:binding:test",
wrapperRoot: "/tmp/openclaw/acpx",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
rootPid: 777,
commandHash: "hash",
startedAt: 2,
state: "open",
});
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
name: "agent:codex:acp:binding:test",
agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
pid: 777,
})),
save: vi.fn(async () => {}),
};
const { wrappedStore } = makeRuntime(baseStore, {
openclawGatewayInstanceId: "gateway-test",
openclawProcessLeaseStore: leaseStore.store,
openclawWrapperRoot: "/tmp/openclaw/acpx",
});
await expect(wrappedStore.load("agent:codex:acp:binding:test")).resolves.toMatchObject({
openclawGatewayInstanceId: "gateway-test",
openclawLeaseId: "lease-current",
});
});
it("uses matching leases before legacy pid cleanup on close", async () => {
const leaseStore = makeLeaseStore();
leaseStore.leases.set("lease-close", {
leaseId: "lease-close",
gatewayInstanceId: "gateway-test",
sessionKey: "agent:codex:acp:binding:test",
wrapperRoot: "/tmp/openclaw/acpx",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
rootPid: 930,
commandHash: "hash",
startedAt: 1,
state: "open",
});
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
acpxRecordId: "agent:codex:acp:binding:test",
agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
openclawLeaseId: "lease-close",
pid: 930,
})),
save: vi.fn(async () => {}),
};
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = [];
const { runtime, delegate } = makeRuntime(
baseStore,
{
openclawGatewayInstanceId: "gateway-test",
openclawProcessLeaseStore: leaseStore.store,
openclawWrapperRoot: "/tmp/openclaw/acpx",
},
{
openclawProcessCleanup: {
listProcesses: vi.fn(async () => [
{
pid: 930,
ppid: 1,
command: CODEX_ACP_WRAPPER_COMMAND_WITH_LEASE,
},
{ pid: 931, ppid: 930, command: "node child.js" },
]),
killProcess: vi.fn((pid, signal) => {
killed.push({ pid, signal });
}),
sleep: vi.fn(async () => {}),
},
},
);
vi.spyOn(delegate, "close").mockResolvedValue(undefined);
await runtime.close({
handle: {
sessionKey: "agent:codex:acp:binding:test",
backend: "acpx",
runtimeSessionName: "agent:codex:acp:binding:test",
},
reason: "user-close",
});
expect(killed.slice(0, 2)).toEqual([
{ pid: 931, signal: "SIGTERM" },
{ pid: 930, signal: "SIGTERM" },
]);
expect(leaseStore.store.markState).toHaveBeenCalledWith("lease-close", "closing");
expect(leaseStore.store.markState).toHaveBeenLastCalledWith("lease-close", "closed");
});
it("closes the current process lease when the saved lease id is stale", async () => {
const leaseStore = makeLeaseStore();
leaseStore.leases.set("lease-old", {
leaseId: "lease-old",
gatewayInstanceId: "gateway-test",
sessionKey: "agent:codex:acp:binding:test",
wrapperRoot: "/tmp/openclaw/acpx",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
rootPid: 930,
commandHash: "hash",
startedAt: 1,
state: "open",
});
leaseStore.leases.set("lease-current", {
leaseId: "lease-current",
gatewayInstanceId: "gateway-test",
sessionKey: "agent:codex:acp:binding:test",
wrapperRoot: "/tmp/openclaw/acpx",
wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs",
rootPid: 940,
commandHash: "hash",
startedAt: 2,
state: "open",
});
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
acpxRecordId: "agent:codex:acp:binding:test",
agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
openclawLeaseId: "lease-old",
pid: 940,
})),
save: vi.fn(async () => {}),
};
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = [];
const { runtime, delegate } = makeRuntime(
baseStore,
{
openclawGatewayInstanceId: "gateway-test",
openclawProcessLeaseStore: leaseStore.store,
openclawWrapperRoot: "/tmp/openclaw/acpx",
},
{
openclawProcessCleanup: {
listProcesses: vi.fn(async () => [
{
pid: 930,
ppid: 1,
command: `${CODEX_ACP_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-old ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-test`,
},
{
pid: 940,
ppid: 1,
command: `${CODEX_ACP_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-current ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-test`,
},
{ pid: 941, ppid: 940, command: "node child.js" },
]),
killProcess: vi.fn((pid, signal) => {
killed.push({ pid, signal });
}),
sleep: vi.fn(async () => {}),
},
},
);
vi.spyOn(delegate, "close").mockResolvedValue(undefined);
await runtime.close({
handle: {
sessionKey: "agent:codex:acp:binding:test",
backend: "acpx",
runtimeSessionName: "agent:codex:acp:binding:test",
},
reason: "user-close",
});
expect(killed.slice(0, 2)).toEqual([
{ pid: 941, signal: "SIGTERM" },
{ pid: 940, signal: "SIGTERM" },
]);
expect(leaseStore.store.markState).not.toHaveBeenCalledWith("lease-old", expect.any(String));
expect(leaseStore.store.markState).toHaveBeenCalledWith("lease-current", "closing");
expect(leaseStore.store.markState).toHaveBeenLastCalledWith("lease-current", "closed");
});
it("does not clean up a stale close pid reused by another wrapper root", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
acpxRecordId: "agent:codex:acp:binding:test",
agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
pid: 920,
})),
save: vi.fn(async () => {}),
};
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = [];
const { runtime, delegate } = makeRuntime(
baseStore,
{
openclawWrapperRoot: "/tmp/openclaw/acpx",
},
{
openclawProcessCleanup: {
listProcesses: vi.fn(async () => [
{
pid: 920,
ppid: 1,
command: 'node "/tmp/other-gateway/acpx/codex-acp-wrapper.mjs"',
},
]),
killProcess: vi.fn((pid, signal) => {
killed.push({ pid, signal });
}),
sleep: vi.fn(async () => {}),
},
},
);
vi.spyOn(delegate, "close").mockResolvedValue(undefined);
await runtime.close({
handle: {
sessionKey: "agent:codex:acp:binding:test",
backend: "acpx",
runtimeSessionName: "agent:codex:acp:binding:test",
},
reason: "user-close",
});
expect(killed).toEqual([]);
});
it("does not tear down reusable ACPX sessions after cancel", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => ({
acpxRecordId: "agent:codex:acp:binding:test",
agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"',
processId: "910",
})),
save: vi.fn(async () => {}),
};
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = [];
const listProcesses = vi.fn(async () => {
throw new Error("process listing should not run on cancel");
});
const { runtime, delegate } = makeRuntime(
baseStore,
{},
{
openclawProcessCleanup: {
listProcesses,
killProcess: vi.fn((pid, signal) => {
killed.push({ pid, signal });
}),
sleep: vi.fn(async () => {}),
},
},
);
const cancel = vi.spyOn(delegate, "cancel").mockResolvedValue(undefined);
const input = {
handle: {
sessionKey: "agent:codex:acp:binding:test",
backend: "acpx",
runtimeSessionName: "agent:codex:acp:binding:test",
},
} satisfies Parameters<AcpRuntime["cancel"]>[0];
await runtime.cancel(input);
expect(cancel).toHaveBeenCalledWith(input);
expect(listProcesses).not.toHaveBeenCalled();
expect(killed).toEqual([]);
});
it("routes openclaw ensureSession through the bridge-safe delegate when MCP servers are configured", async () => {
const baseStore: TestSessionStore = {
load: vi.fn(async () => undefined),
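
The lease tests above ("merges the lease for the current ACPX session process when old leases exist" and "closes the current process lease when the saved lease id is stale") depend on a selection rule: when the session record carries a PID, only the lease with that root PID qualifies, and otherwise the most recently started lease is used. A minimal sketch of that rule, with a hypothetical trimmed-down `Lease` type and a hypothetical `pickLease` helper that are not part of this commit:

```typescript
// Hypothetical, trimmed-down lease shape for illustration only.
type Lease = {
  leaseId: string;
  sessionKey: string;
  rootPid: number;
  startedAt: number;
};

function pickLease(
  leases: Lease[],
  sessionKeys: string[],
  rootPid?: number,
): Lease | undefined {
  const keys = new Set(sessionKeys.map((key) => key.trim()).filter(Boolean));
  const candidates = leases.filter((lease) => keys.has(lease.sessionKey));
  if (rootPid) {
    // A known PID must match exactly; there is no fallback to another lease.
    return candidates.find((lease) => lease.rootPid === rootPid);
  }
  // Without a PID, prefer the most recently started lease.
  return [...candidates].sort((a, b) => b.startedAt - a.startedAt)[0];
}

const exampleKey = "agent:codex:acp:binding:test";
const exampleLeases: Lease[] = [
  { leaseId: "lease-old", sessionKey: exampleKey, rootPid: 700, startedAt: 1 },
  { leaseId: "lease-current", sessionKey: exampleKey, rootPid: 777, startedAt: 2 },
];
const byPid = pickLease(exampleLeases, [exampleKey], 777); // lease-current
const newest = pickLease(exampleLeases, [exampleKey]); // lease-current
const noMatch = pickLease(exampleLeases, [exampleKey], 999); // undefined
```

Returning `undefined` for a PID that matches no lease, rather than falling back to the newest one, keeps a stale lease from being attached to a different process.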

View File

@@ -1,4 +1,5 @@
import { AsyncLocalStorage } from "node:async_hooks";
import { resolve as resolvePath } from "node:path";
import {
ACPX_BACKEND_ID,
AcpxRuntime as BaseAcpxRuntime,
@@ -15,16 +16,45 @@ import {
type AcpRuntimeStatus,
} from "acpx/runtime";
import { AcpRuntimeError, type AcpRuntime } from "../runtime-api.js";
import {
createAcpxProcessLeaseId,
hashAcpxProcessCommand,
withAcpxLeaseEnvironment,
type AcpxProcessLease,
type AcpxProcessLeaseStore,
} from "./process-lease.js";
import {
cleanupOpenClawOwnedAcpxProcessTree,
isOpenClawOwnedAcpxProcessCommand,
type AcpxProcessCleanupDeps,
} from "./process-reaper.js";
type AcpSessionStore = AcpRuntimeOptions["sessionStore"];
type AcpSessionRecord = Parameters<AcpSessionStore["save"]>[0];
type AcpLoadedSessionRecord = Awaited<ReturnType<AcpSessionStore["load"]>>;
type BaseAcpxRuntimeTestOptions = ConstructorParameters<typeof BaseAcpxRuntime>[1];
type OpenClawAcpxRuntimeOptions = AcpRuntimeOptions & {
openclawWrapperRoot?: string;
openclawGatewayInstanceId?: string;
openclawProcessLeaseStore?: AcpxProcessLeaseStore;
};
type AcpxRuntimeTestOptions = Record<string, unknown> & {
openclawProcessCleanup?: AcpxProcessCleanupDeps;
};
type ResetAwareSessionStore = AcpSessionStore & {
markFresh: (sessionKey: string) => void;
};
function readSessionRecordName(record: AcpSessionRecord): string {
type AcpxLaunchLeaseContext = {
leaseId: string;
gatewayInstanceId: string;
sessionKey: string;
wrapperRoot: string;
stableCommand?: string;
};
function readSessionRecordName(record: unknown): string {
if (typeof record !== "object" || record === null) {
return "";
}
@@ -32,7 +62,88 @@ function readSessionRecordName(record: AcpSessionRecord): string {
return typeof name === "string" ? name.trim() : "";
}
function createResetAwareSessionStore(baseStore: AcpSessionStore): ResetAwareSessionStore {
function readRecordAgentCommand(record: unknown): string | undefined {
if (typeof record !== "object" || record === null) {
return undefined;
}
const { agentCommand } = record as { agentCommand?: unknown };
return typeof agentCommand === "string" ? agentCommand.trim() || undefined : undefined;
}
function readRecordCwd(record: unknown): string | undefined {
if (typeof record !== "object" || record === null) {
return undefined;
}
const { cwd } = record as { cwd?: unknown };
return typeof cwd === "string" ? cwd.trim() || undefined : undefined;
}
function readRecordResetOnNextEnsure(record: unknown): boolean {
if (typeof record !== "object" || record === null) {
return false;
}
const { acpx } = record as { acpx?: unknown };
if (typeof acpx !== "object" || acpx === null) {
return false;
}
return (acpx as { reset_on_next_ensure?: unknown }).reset_on_next_ensure === true;
}
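// Reads the agent PID from `pid` or, failing that, `processId`. Numeric strings
// are parsed; anything that is not a positive integer yields undefined.
function readRecordAgentPid(record: unknown): number | undefined {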
if (typeof record !== "object" || record === null) {
return undefined;
}
const { pid, processId } = record as { pid?: unknown; processId?: unknown };
const rawPid = pid ?? processId;
const numericPid =
typeof rawPid === "number"
? rawPid
: typeof rawPid === "string"
? Number.parseInt(rawPid, 10)
: undefined;
return numericPid && Number.isInteger(numericPid) && numericPid > 0 ? numericPid : undefined;
}
function readOpenClawLeaseIdFromRecord(record: AcpLoadedSessionRecord): string | undefined {
if (typeof record !== "object" || record === null) {
return undefined;
}
const { openclawLeaseId } = record as { openclawLeaseId?: unknown };
return typeof openclawLeaseId === "string" ? openclawLeaseId.trim() || undefined : undefined;
}
function extractGeneratedWrapperPath(command: string | undefined): string {
const parts = splitCommandParts(command ?? "");
return (
parts.find(
(part) =>
basename(part) === "codex-acp-wrapper.mjs" ||
basename(part) === "claude-agent-acp-wrapper.mjs",
) ?? ""
);
}
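// Picks the lease for a session from the supplied candidates. When a root PID
// is given, only a lease with that rootPid qualifies; otherwise the most
// recently started lease wins.
function selectCurrentSessionLease(params: {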
leases: AcpxProcessLease[];
sessionKeys: string[];
rootPid?: number;
}): AcpxProcessLease | undefined {
const sessionKeys = new Set(params.sessionKeys.map((entry) => entry.trim()).filter(Boolean));
const candidates = params.leases.filter((lease) => sessionKeys.has(lease.sessionKey));
if (params.rootPid) {
return candidates.find((lease) => lease.rootPid === params.rootPid);
}
return candidates.toSorted((a, b) => b.startedAt - a.startedAt)[0];
}
function createResetAwareSessionStore(
baseStore: AcpSessionStore,
params?: {
gatewayInstanceId?: string;
leaseStore?: AcpxProcessLeaseStore;
launchScope?: AsyncLocalStorage<AcpxLaunchLeaseContext | undefined>;
},
): ResetAwareSessionStore {
const freshSessionKeys = new Set<string>();
return {
@@ -41,11 +152,61 @@ function createResetAwareSessionStore(baseStore: AcpSessionStore): ResetAwareSes
if (normalized && freshSessionKeys.has(normalized)) {
return undefined;
}
return await baseStore.load(sessionId);
const record = await baseStore.load(sessionId);
if (!record || !params?.leaseStore || !params.gatewayInstanceId) {
return record;
}
const sessionName = readSessionRecordName(record) || normalized;
const lease = selectCurrentSessionLease({
leases: await params.leaseStore.listOpen(params.gatewayInstanceId),
sessionKeys: [sessionName, normalized],
rootPid: readRecordAgentPid(record),
});
if (!lease) {
return record;
}
return {
...(record as Record<string, unknown>),
openclawLeaseId: lease.leaseId,
openclawGatewayInstanceId: lease.gatewayInstanceId,
} as AcpLoadedSessionRecord;
},
async save(record: AcpSessionRecord): Promise<void> {
await baseStore.save(record);
let recordToSave = record;
const launch = params?.launchScope?.getStore();
const sessionName = readSessionRecordName(record);
const rootPid = readRecordAgentPid(record);
const agentCommand = readRecordAgentCommand(record);
const stableAgentCommand = launch?.stableCommand ?? agentCommand;
if (
launch &&
params?.leaseStore &&
sessionName === launch.sessionKey &&
rootPid &&
stableAgentCommand
) {
const lease: AcpxProcessLease = {
leaseId: launch.leaseId,
gatewayInstanceId: launch.gatewayInstanceId,
sessionKey: launch.sessionKey,
wrapperRoot: launch.wrapperRoot,
wrapperPath: extractGeneratedWrapperPath(stableAgentCommand),
rootPid,
commandHash: hashAcpxProcessCommand(stableAgentCommand),
startedAt: Date.now(),
state: "open",
};
await params.leaseStore.save(lease);
recordToSave = {
...(record as Record<string, unknown>),
// ACPX uses agentCommand as reuse identity. Lease metadata belongs to
// our sidecar record, so keep the persisted command stable.
agentCommand: stableAgentCommand,
openclawLeaseId: launch.leaseId,
openclawGatewayInstanceId: launch.gatewayInstanceId,
} as AcpSessionRecord;
}
await baseStore.save(recordToSave);
if (sessionName) {
freshSessionKeys.delete(sessionName);
}
@@ -109,11 +270,11 @@ function readAgentFromHandle(handle: AcpRuntimeHandle): string | undefined {
}
function readAgentCommandFromRecord(record: AcpLoadedSessionRecord): string | undefined {
if (typeof record !== "object" || record === null) {
return undefined;
}
const { agentCommand } = record as { agentCommand?: unknown };
return typeof agentCommand === "string" ? agentCommand.trim() || undefined : undefined;
return readRecordAgentCommand(record);
}
function readAgentPidFromRecord(record: AcpLoadedSessionRecord): number | undefined {
return readRecordAgentPid(record);
}
function splitCommandParts(value: string): string[] {
@@ -338,6 +499,7 @@ function appendCodexAcpConfigOverrides(command: string, override: CodexAcpModelO
function createModelScopedAgentRegistry(params: {
agentRegistry: AcpAgentRegistry;
scope: AsyncLocalStorage<CodexAcpModelOverride | undefined>;
leaseCommand: (command: string | undefined) => string | undefined;
}): AcpAgentRegistry {
return {
resolve(agentName: string): string | undefined {
@@ -349,9 +511,9 @@ function createModelScopedAgentRegistry(params: {
typeof command !== "string" ||
!isCodexAcpCommand(command)
) {
return command;
return params.leaseCommand(command);
}
return appendCodexAcpConfigOverrides(command, override);
return params.leaseCommand(appendCodexAcpConfigOverrides(command, override));
},
list(): string[] {
return params.agentRegistry.list();
@@ -402,30 +564,47 @@ export class AcpxRuntime implements AcpRuntime {
private readonly delegate: BaseAcpxRuntime;
private readonly bridgeSafeDelegate: BaseAcpxRuntime;
private readonly probeDelegate: BaseAcpxRuntime;
private readonly processCleanupDeps: AcpxProcessCleanupDeps | undefined;
private readonly wrapperRoot: string | undefined;
private readonly gatewayInstanceId: string | undefined;
private readonly processLeaseStore: AcpxProcessLeaseStore | undefined;
private readonly launchLeaseScope = new AsyncLocalStorage<AcpxLaunchLeaseContext | undefined>();
private readonly cwd: string;
constructor(
options: AcpRuntimeOptions,
testOptions?: ConstructorParameters<typeof BaseAcpxRuntime>[1],
) {
this.sessionStore = createResetAwareSessionStore(options.sessionStore);
constructor(options: OpenClawAcpxRuntimeOptions, testOptions?: AcpxRuntimeTestOptions) {
const { openclawProcessCleanup, ...delegateTestOptions } = testOptions ?? {};
this.processCleanupDeps = openclawProcessCleanup;
this.wrapperRoot = options.openclawWrapperRoot;
this.gatewayInstanceId = options.openclawGatewayInstanceId;
this.processLeaseStore = options.openclawProcessLeaseStore;
this.cwd = options.cwd;
this.sessionStore = createResetAwareSessionStore(options.sessionStore, {
gatewayInstanceId: this.gatewayInstanceId,
leaseStore: this.processLeaseStore,
launchScope: this.launchLeaseScope,
});
this.agentRegistry = options.agentRegistry;
this.scopedAgentRegistry = createModelScopedAgentRegistry({
agentRegistry: this.agentRegistry,
scope: this.codexAcpModelOverrideScope,
leaseCommand: (command) => this.commandWithLaunchLease(command),
});
const sharedOptions = {
...options,
sessionStore: this.sessionStore,
agentRegistry: this.scopedAgentRegistry,
};
this.delegate = new BaseAcpxRuntime(sharedOptions, testOptions);
this.delegate = new BaseAcpxRuntime(
sharedOptions,
delegateTestOptions as BaseAcpxRuntimeTestOptions,
);
this.bridgeSafeDelegate = shouldUseDistinctBridgeDelegate(options)
? new BaseAcpxRuntime(
{
...sharedOptions,
mcpServers: [],
},
testOptions,
delegateTestOptions as BaseAcpxRuntimeTestOptions,
)
: this.delegate;
this.probeDelegate = this.resolveDelegateForAgent(resolveProbeAgentName(options));
@@ -445,6 +624,13 @@ export class AcpxRuntime implements AcpRuntime {
private async resolveDelegateForHandle(handle: AcpRuntimeHandle): Promise<BaseAcpxRuntime> {
const record = await this.sessionStore.load(handle.acpxRecordId ?? handle.sessionKey);
return this.resolveDelegateForLoadedRecord(handle, record);
}
private resolveDelegateForLoadedRecord(
handle: AcpRuntimeHandle,
record: AcpLoadedSessionRecord,
): BaseAcpxRuntime {
const recordCommand = readAgentCommandFromRecord(record);
if (recordCommand) {
return this.resolveDelegateForCommand(recordCommand);
@@ -464,6 +650,150 @@ export class AcpxRuntime implements AcpRuntime {
});
}
private commandWithLaunchLease(command: string | undefined): string | undefined {
const launch = this.launchLeaseScope.getStore();
if (!command || !launch) {
return command;
}
launch.stableCommand = command;
return withAcpxLeaseEnvironment({
command,
leaseId: launch.leaseId,
gatewayInstanceId: launch.gatewayInstanceId,
});
}
private async canReuseStablePersistentSession(params: {
sessionKey: string;
mode: Parameters<AcpRuntime["ensureSession"]>[0]["mode"];
cwd: string | undefined;
command: string | undefined;
resumeSessionId: string | undefined;
}): Promise<boolean> {
if (params.mode !== "persistent" || !params.command) {
return false;
}
const existing = await this.sessionStore.load(params.sessionKey);
if (!existing || readRecordResetOnNextEnsure(existing)) {
return false;
}
const recordCwd = readRecordCwd(existing);
if (!recordCwd || resolvePath(recordCwd) !== resolvePath(params.cwd?.trim() || this.cwd)) {
return false;
}
if (readRecordAgentCommand(existing) !== params.command) {
return false;
}
const existingSessionId =
typeof existing === "object" && existing !== null
? (existing as { acpSessionId?: unknown }).acpSessionId
: undefined;
return !params.resumeSessionId || existingSessionId === params.resumeSessionId;
}
private async runWithLaunchLease<T>(params: {
sessionKey: string;
command: string | undefined;
enabled?: boolean;
run: () => Promise<T>;
}): Promise<T> {
if (
params.enabled === false ||
!params.command ||
!this.wrapperRoot ||
!this.gatewayInstanceId ||
!this.processLeaseStore ||
!isOpenClawOwnedAcpxProcessCommand({
command: params.command,
wrapperRoot: this.wrapperRoot,
})
) {
return await params.run();
}
const launch: AcpxLaunchLeaseContext = {
leaseId: createAcpxProcessLeaseId(),
gatewayInstanceId: this.gatewayInstanceId,
sessionKey: params.sessionKey,
wrapperRoot: this.wrapperRoot,
stableCommand: params.command,
};
// The pending lease is written before acpx spawns. The session-store save
// path fills in the live PID after acpx connects and exposes the process.
await this.processLeaseStore.save({
leaseId: launch.leaseId,
gatewayInstanceId: launch.gatewayInstanceId,
sessionKey: launch.sessionKey,
wrapperRoot: launch.wrapperRoot,
wrapperPath: extractGeneratedWrapperPath(params.command),
rootPid: 0,
commandHash: hashAcpxProcessCommand(params.command),
startedAt: Date.now(),
state: "open",
});
return await this.launchLeaseScope.run(launch, params.run);
}
private async cleanupProcessTreeForRecord(
handle: AcpRuntimeHandle,
record: AcpLoadedSessionRecord,
): Promise<void> {
const leaseId = readOpenClawLeaseIdFromRecord(record);
const rootPid = readAgentPidFromRecord(record);
const sessionKeys = [handle.sessionKey, readSessionRecordName(record)];
const openLeases =
this.gatewayInstanceId && this.processLeaseStore
? await this.processLeaseStore.listOpen(this.gatewayInstanceId)
: [];
const selectedLease = selectCurrentSessionLease({
leases: openLeases,
sessionKeys,
rootPid,
});
const loadedLease = leaseId ? await this.processLeaseStore?.load(leaseId) : undefined;
const lease =
selectedLease ??
(loadedLease &&
loadedLease.gatewayInstanceId === this.gatewayInstanceId &&
(!rootPid || loadedLease.rootPid === rootPid) &&
sessionKeys.includes(loadedLease.sessionKey)
? loadedLease
: undefined);
if (lease && lease.gatewayInstanceId === this.gatewayInstanceId && lease.rootPid > 0) {
await this.processLeaseStore?.markState(lease.leaseId, "closing");
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: lease.rootPid,
rootCommand: readAgentCommandFromRecord(record),
expectedLeaseId: lease.leaseId,
expectedGatewayInstanceId: lease.gatewayInstanceId,
wrapperRoot: lease.wrapperRoot,
deps: this.processCleanupDeps,
});
await this.processLeaseStore?.markState(
lease.leaseId,
result.terminatedPids.length > 0 || result.skippedReason === "missing-root"
? "closed"
: "lost",
);
return;
}
const rootCommand =
readAgentCommandFromRecord(record) ??
resolveAgentCommandForName({
agentName: readAgentFromHandle(handle),
agentRegistry: this.agentRegistry,
});
if (!rootPid || !rootCommand) {
return;
}
await cleanupOpenClawOwnedAcpxProcessTree({
rootPid,
rootCommand,
wrapperRoot: this.wrapperRoot,
deps: this.processCleanupDeps,
});
}
isHealthy(): boolean {
return this.probeDelegate.isHealthy();
}
@@ -489,9 +819,25 @@ export class AcpxRuntime implements AcpRuntime {
normalizeAgentName(input.agent) === CODEX_ACP_AGENT_ID && isCodexAcpCommand(command)
? normalizeCodexAcpModelOverride(input.model, input.thinking)
: undefined;
const stableLaunchCommand =
codexModelOverride && command
? appendCodexAcpConfigOverrides(command, codexModelOverride)
: command;
const shouldStartWithLease = !(await this.canReuseStablePersistentSession({
sessionKey: input.sessionKey,
mode: input.mode,
cwd: input.cwd,
command: stableLaunchCommand,
resumeSessionId: input.resumeSessionId,
}));
if (!codexModelOverride) {
return delegate.ensureSession(input);
return await this.runWithLaunchLease({
sessionKey: input.sessionKey,
command: stableLaunchCommand,
enabled: shouldStartWithLease,
run: () => delegate.ensureSession(input),
});
}
const normalizedInput = {
@@ -500,9 +846,15 @@ export class AcpxRuntime implements AcpRuntime {
? { model: codexAcpSessionModelId(codexModelOverride) }
: {}),
};
return this.codexAcpModelOverrideScope.run(codexModelOverride, () =>
delegate.ensureSession(normalizedInput),
);
return await this.runWithLaunchLease({
sessionKey: input.sessionKey,
command: stableLaunchCommand,
enabled: shouldStartWithLease,
run: () =>
this.codexAcpModelOverrideScope.run(codexModelOverride, () =>
delegate.ensureSession(normalizedInput),
),
});
}
async *runTurn(input: Parameters<AcpRuntime["runTurn"]>[0]): AsyncIterable<AcpRuntimeEvent> {
@@ -571,7 +923,10 @@ export class AcpxRuntime implements AcpRuntime {
}
async cancel(input: Parameters<AcpRuntime["cancel"]>[0]): Promise<void> {
const delegate = await this.resolveDelegateForHandle(input.handle);
const record = await this.sessionStore.load(
input.handle.acpxRecordId ?? input.handle.sessionKey,
);
const delegate = this.resolveDelegateForLoadedRecord(input.handle, record);
await delegate.cancel(input);
}
@@ -580,14 +935,21 @@ export class AcpxRuntime implements AcpRuntime {
}
async close(input: Parameters<AcpRuntime["close"]>[0]): Promise<void> {
await (
await this.resolveDelegateForHandle(input.handle)
).close({
handle: input.handle,
reason: input.reason,
discardPersistentState: input.discardPersistentState,
});
if (input.discardPersistentState) {
const record = await this.sessionStore.load(
input.handle.acpxRecordId ?? input.handle.sessionKey,
);
let closeSucceeded = false;
try {
await this.resolveDelegateForLoadedRecord(input.handle, record).close({
handle: input.handle,
reason: input.reason,
discardPersistentState: input.discardPersistentState,
});
closeSucceeded = true;
} finally {
await this.cleanupProcessTreeForRecord(input.handle, record);
}
if (closeSucceeded && input.discardPersistentState) {
this.sessionStore.markFresh(input.handle.sessionKey);
}
}
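
Note on the lease state recorded after close-time cleanup: `cleanupProcessTreeForRecord` marks the lease `closed` when the cleanup terminated at least one process or reported `skippedReason === "missing-root"`, and `lost` otherwise. The sketch below isolates that decision so it can be read on its own. The helper name `resolveLeaseStateAfterCleanup` and the trimmed result type are illustrative assumptions, not part of the commit.

```typescript
// Trimmed shape of the cleanup result; only the fields the decision reads
// are modeled here.
type CleanupResult = {
  terminatedPids: number[];
  skippedReason?: string;
};

type FinalLeaseState = "closed" | "lost";

// Hypothetical helper (not in the commit): mirrors the ternary applied in
// cleanupProcessTreeForRecord once the process-tree cleanup has returned.
function resolveLeaseStateAfterCleanup(result: CleanupResult): FinalLeaseState {
  const terminatedSomething = result.terminatedPids.length > 0;
  // A root process that is already gone needs no further work.
  const rootAlreadyGone = result.skippedReason === "missing-root";
  return terminatedSomething || rootAlreadyGone ? "closed" : "lost";
}
```

The startup reaper in the service file checks only `terminatedPids.length > 0`, so a lease whose root has already exited would be recorded as `lost` on that path.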

@@ -11,6 +11,30 @@ const { prepareAcpxCodexAuthConfigMock } = vi.hoisted(() => ({
async ({ pluginConfig }: { pluginConfig: unknown }) => pluginConfig,
),
}));
const { cleanupOpenClawOwnedAcpxProcessTreeMock } = vi.hoisted(() => ({
cleanupOpenClawOwnedAcpxProcessTreeMock: vi.fn(
async (): Promise<{
inspectedPids: number[];
terminatedPids: number[];
skippedReason?: string;
}> => ({
inspectedPids: [],
terminatedPids: [],
}),
),
}));
const { reapStaleOpenClawOwnedAcpxOrphansMock } = vi.hoisted(() => ({
reapStaleOpenClawOwnedAcpxOrphansMock: vi.fn(
async (): Promise<{
inspectedPids: number[];
terminatedPids: number[];
skippedReason?: string;
}> => ({
inspectedPids: [],
terminatedPids: [],
}),
),
}));
const { acpxRuntimeConstructorMock, createAgentRegistryMock, createFileSessionStoreMock } =
vi.hoisted(() => ({
acpxRuntimeConstructorMock: vi.fn(function AcpxRuntime(options: unknown) {
@@ -59,6 +83,11 @@ vi.mock("./codex-auth-bridge.js", () => ({
prepareAcpxCodexAuthConfig: prepareAcpxCodexAuthConfigMock,
}));
vi.mock("./process-reaper.js", () => ({
cleanupOpenClawOwnedAcpxProcessTree: cleanupOpenClawOwnedAcpxProcessTreeMock,
reapStaleOpenClawOwnedAcpxOrphans: reapStaleOpenClawOwnedAcpxOrphansMock,
}));
import { getAcpRuntimeBackend } from "../runtime-api.js";
import { createAcpxRuntimeService } from "./service.js";
@@ -73,6 +102,8 @@ async function makeTempDir(): Promise<string> {
afterEach(async () => {
runtimeRegistry.clear();
prepareAcpxCodexAuthConfigMock.mockClear();
cleanupOpenClawOwnedAcpxProcessTreeMock.mockClear();
reapStaleOpenClawOwnedAcpxOrphansMock.mockClear();
acpxRuntimeConstructorMock.mockClear();
createAgentRegistryMock.mockClear();
createFileSessionStoreMock.mockClear();
@@ -155,6 +186,123 @@ describe("createAcpxRuntimeService", () => {
await service.stop?.(ctx);
});
it("reaps stale ACPX process leases from the generated wrapper root at startup", async () => {
const workspaceDir = await makeTempDir();
const ctx = createServiceContext(workspaceDir);
const runtime = createMockRuntime();
const processCleanupDeps = { sleep: vi.fn(async () => {}) };
await fs.mkdir(path.join(ctx.stateDir, "acpx"), { recursive: true });
await fs.writeFile(path.join(ctx.stateDir, "gateway-instance-id"), "gw-test\n");
await fs.writeFile(
path.join(ctx.stateDir, "acpx", "process-leases.json"),
JSON.stringify({
version: 1,
leases: [
{
leaseId: "lease-1",
gatewayInstanceId: "gw-test",
sessionKey: "agent:codex:acp:test",
wrapperRoot: path.join(ctx.stateDir, "acpx"),
wrapperPath: path.join(ctx.stateDir, "acpx", "codex-acp-wrapper.mjs"),
rootPid: 101,
commandHash: "hash",
startedAt: 1,
state: "open",
},
],
}),
);
cleanupOpenClawOwnedAcpxProcessTreeMock.mockResolvedValueOnce({
inspectedPids: [101, 102],
terminatedPids: [101, 102],
});
const service = createAcpxRuntimeService({
runtimeFactory: () => runtime as never,
processCleanupDeps,
});
await service.start(ctx);
expect(cleanupOpenClawOwnedAcpxProcessTreeMock).toHaveBeenCalledWith({
rootPid: 101,
expectedLeaseId: "lease-1",
expectedGatewayInstanceId: "gw-test",
wrapperRoot: path.join(ctx.stateDir, "acpx"),
deps: processCleanupDeps,
});
expect(ctx.logger.info).toHaveBeenCalledWith("reaped 2 stale OpenClaw-owned ACPX processes");
await service.stop?.(ctx);
});
it("runs wrapper-root orphan cleanup before dropping pending ACPX leases", async () => {
const workspaceDir = await makeTempDir();
const ctx = createServiceContext(workspaceDir);
const runtime = createMockRuntime();
const processCleanupDeps = { sleep: vi.fn(async () => {}) };
const wrapperRoot = path.join(ctx.stateDir, "acpx");
await fs.mkdir(wrapperRoot, { recursive: true });
await fs.writeFile(path.join(ctx.stateDir, "gateway-instance-id"), "gw-test\n");
await fs.writeFile(
path.join(wrapperRoot, "process-leases.json"),
JSON.stringify({
version: 1,
leases: [
{
leaseId: "lease-pending",
gatewayInstanceId: "gw-test",
sessionKey: "agent:codex:acp:test",
wrapperRoot,
wrapperPath: path.join(wrapperRoot, "codex-acp-wrapper.mjs"),
rootPid: 0,
commandHash: "hash",
startedAt: 1,
state: "open",
},
],
}),
);
reapStaleOpenClawOwnedAcpxOrphansMock.mockResolvedValueOnce({
inspectedPids: [201, 202],
terminatedPids: [201, 202],
});
const service = createAcpxRuntimeService({
runtimeFactory: () => runtime as never,
processCleanupDeps,
});
await service.start(ctx);
expect(cleanupOpenClawOwnedAcpxProcessTreeMock).not.toHaveBeenCalled();
expect(reapStaleOpenClawOwnedAcpxOrphansMock).toHaveBeenCalledWith({
wrapperRoot,
deps: processCleanupDeps,
});
expect(ctx.logger.info).toHaveBeenCalledWith("reaped 2 stale OpenClaw-owned ACPX processes");
const leaseFile = JSON.parse(
await fs.readFile(path.join(wrapperRoot, "process-leases.json"), "utf8"),
);
expect(leaseFile.leases[0].state).toBe("closed");
await service.stop?.(ctx);
});
it("keeps startup quiet when no process leases are open", async () => {
const workspaceDir = await makeTempDir();
const ctx = createServiceContext(workspaceDir);
const runtime = createMockRuntime();
const service = createAcpxRuntimeService({
runtimeFactory: () => runtime as never,
});
await service.start(ctx);
expect(cleanupOpenClawOwnedAcpxProcessTreeMock).not.toHaveBeenCalled();
expect(ctx.logger.warn).not.toHaveBeenCalled();
await service.stop?.(ctx);
});
it("registers the default backend without importing ACPX runtime until first use", async () => {
const workspaceDir = await makeTempDir();
const ctx = createServiceContext(workspaceDir);

@@ -1,4 +1,6 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import { inspect } from "node:util";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import type {
@@ -14,6 +16,12 @@ import {
toAcpMcpServers,
type ResolvedAcpxPluginConfig,
} from "./config.js";
import { createAcpxProcessLeaseStore, type AcpxProcessLeaseStore } from "./process-lease.js";
import {
cleanupOpenClawOwnedAcpxProcessTree,
reapStaleOpenClawOwnedAcpxOrphans,
type AcpxProcessCleanupDeps,
} from "./process-reaper.js";
type AcpxRuntimeLike = AcpRuntime & {
probeAvailability(): Promise<void>;
@@ -33,12 +41,16 @@ let runtimeModulePromise: Promise<AcpxRuntimeModule> | null = null;
type AcpxRuntimeFactoryParams = {
pluginConfig: ResolvedAcpxPluginConfig;
gatewayInstanceId: string;
processLeaseStore: AcpxProcessLeaseStore;
wrapperRoot: string;
logger?: PluginLogger;
};
type CreateAcpxRuntimeServiceParams = {
pluginConfig?: unknown;
runtimeFactory?: (params: AcpxRuntimeFactoryParams) => AcpxRuntimeLike | Promise<AcpxRuntimeLike>;
processCleanupDeps?: AcpxProcessCleanupDeps;
};
function loadRuntimeModule(): Promise<AcpxRuntimeModule> {
@@ -57,6 +69,9 @@ function createLazyDefaultRuntime(params: AcpxRuntimeFactoryParams): AcpxRuntime
runtimePromise ??= loadRuntimeModule().then((module) => {
runtime = new module.AcpxRuntime({
cwd: params.pluginConfig.cwd,
openclawGatewayInstanceId: params.gatewayInstanceId,
openclawProcessLeaseStore: params.processLeaseStore,
openclawWrapperRoot: params.wrapperRoot,
sessionStore: module.createFileSessionStore({
stateDir: params.pluginConfig.stateDir,
}),
@@ -188,6 +203,73 @@ function shouldRunStartupProbe(env: NodeJS.ProcessEnv = process.env): boolean {
return env[ENABLE_STARTUP_PROBE_ENV] === "1";
}
async function resolveGatewayInstanceId(stateDir: string): Promise<string> {
const filePath = path.join(stateDir, "gateway-instance-id");
try {
const existing = (await fs.readFile(filePath, "utf8")).trim();
if (existing) {
return existing;
}
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
throw error;
}
}
const next = randomUUID();
await fs.mkdir(stateDir, { recursive: true });
await fs.writeFile(filePath, `${next}\n`, { mode: 0o600 });
return next;
}
async function reapOpenAcpxProcessLeases(params: {
gatewayInstanceId: string;
leaseStore: AcpxProcessLeaseStore;
deps?: AcpxProcessCleanupDeps;
}): Promise<{ inspectedPids: number[]; terminatedPids: number[] }> {
const leases = await params.leaseStore.listOpen(params.gatewayInstanceId);
const inspectedPids: number[] = [];
const terminatedPids: number[] = [];
const pendingLeaseRootResults = new Map<
string,
{ inspectedPids: number[]; terminatedPids: number[] }
>();
for (const lease of leases) {
if (lease.rootPid <= 0) {
await params.leaseStore.markState(lease.leaseId, "closing");
let result = pendingLeaseRootResults.get(lease.wrapperRoot);
if (!result) {
result = await reapStaleOpenClawOwnedAcpxOrphans({
wrapperRoot: lease.wrapperRoot,
deps: params.deps,
});
pendingLeaseRootResults.set(lease.wrapperRoot, result);
inspectedPids.push(...result.inspectedPids);
terminatedPids.push(...result.terminatedPids);
}
await params.leaseStore.markState(
lease.leaseId,
result.terminatedPids.length > 0 ? "closed" : "lost",
);
continue;
}
await params.leaseStore.markState(lease.leaseId, "closing");
const result = await cleanupOpenClawOwnedAcpxProcessTree({
rootPid: lease.rootPid,
expectedLeaseId: lease.leaseId,
expectedGatewayInstanceId: lease.gatewayInstanceId,
wrapperRoot: lease.wrapperRoot,
deps: params.deps,
});
inspectedPids.push(...result.inspectedPids);
terminatedPids.push(...result.terminatedPids);
await params.leaseStore.markState(
lease.leaseId,
result.terminatedPids.length > 0 ? "closed" : "lost",
);
}
return { inspectedPids, terminatedPids };
}
export function createAcpxRuntimeService(
params: CreateAcpxRuntimeServiceParams = {},
): OpenClawPluginService {
@@ -215,7 +297,21 @@ export function createAcpxRuntimeService(
stateDir: ctx.stateDir,
logger: ctx.logger,
});
const wrapperRoot = path.join(ctx.stateDir, "acpx");
await fs.mkdir(pluginConfig.stateDir, { recursive: true });
await fs.mkdir(wrapperRoot, { recursive: true });
const gatewayInstanceId = await resolveGatewayInstanceId(ctx.stateDir);
const processLeaseStore = createAcpxProcessLeaseStore({ stateDir: wrapperRoot });
const startupReap = await reapOpenAcpxProcessLeases({
gatewayInstanceId,
leaseStore: processLeaseStore,
deps: params.processCleanupDeps,
});
if (startupReap.terminatedPids.length > 0) {
ctx.logger.info(
`reaped ${startupReap.terminatedPids.length} stale OpenClaw-owned ACPX process${startupReap.terminatedPids.length === 1 ? "" : "es"}`,
);
}
warnOnIgnoredLegacyCompatibilityConfig({
pluginConfig,
logger: ctx.logger,
@@ -224,10 +320,16 @@ export function createAcpxRuntimeService(
runtime = params.runtimeFactory
? await params.runtimeFactory({
pluginConfig,
gatewayInstanceId,
processLeaseStore,
wrapperRoot,
logger: ctx.logger,
})
: createLazyDefaultRuntime({
pluginConfig,
gatewayInstanceId,
processLeaseStore,
wrapperRoot,
logger: ctx.logger,
});
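
Note on pending leases at startup: in `reapOpenAcpxProcessLeases`, a lease with `rootPid <= 0` has no PID to verify, so the reaper falls back to an orphan sweep of the lease's wrapper root and reuses that result for every other pending lease under the same root. The following is a simplified, synchronous sketch of that caching pattern only. `reapPendingLeases` and the injected `sweepRoot` callback are hypothetical stand-ins for the async code above, and the `closing` state transition is omitted.

```typescript
type PendingLease = { leaseId: string; wrapperRoot: string };
type SweepResult = { inspectedPids: number[]; terminatedPids: number[] };

// Hypothetical synchronous stand-in for the pending-lease branch: each
// wrapper root is swept at most once, and later leases under the same root
// reuse the cached result.
function reapPendingLeases(
  leases: PendingLease[],
  sweepRoot: (wrapperRoot: string) => SweepResult,
): { states: Map<string, "closed" | "lost">; terminatedPids: number[] } {
  const resultsByRoot = new Map<string, SweepResult>();
  const states = new Map<string, "closed" | "lost">();
  const terminatedPids: number[] = [];
  for (const lease of leases) {
    let result = resultsByRoot.get(lease.wrapperRoot);
    if (!result) {
      result = sweepRoot(lease.wrapperRoot);
      resultsByRoot.set(lease.wrapperRoot, result);
      // PIDs are counted once per root so the totals are not inflated.
      terminatedPids.push(...result.terminatedPids);
    }
    states.set(lease.leaseId, result.terminatedPids.length > 0 ? "closed" : "lost");
  }
  return { states, terminatedPids };
}
```

Counting PIDs only on the first sweep of a root keeps the "reaped N stale ... processes" log line accurate when several pending leases share a wrapper root.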

@@ -735,20 +735,23 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con
const includeDisabled = Boolean(params.includeDisabled);
let offset = 0;
let result: unknown;
for (;;) {
let shouldContinue = true;
while (shouldContinue) {
result = await callGateway("cron.list", gatewayOpts, {
includeDisabled,
agentId: listAgentId,
...(selfRemoveOnlyJobId ? { limit: 200, offset } : {}),
});
if (!selfRemoveOnlyJobId || cronListResultHasJob(result, selfRemoveOnlyJobId)) {
break;
shouldContinue = false;
} else {
const nextOffset = readCronListNextOffset(result, offset);
if (nextOffset === undefined) {
shouldContinue = false;
} else {
offset = nextOffset;
}
}
const nextOffset = readCronListNextOffset(result, offset);
if (nextOffset === undefined) {
break;
}
offset = nextOffset;
}
return jsonResult(
selfRemoveOnlyJobId ? filterCronListResultToJobId(result, selfRemoveOnlyJobId) : result,
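
Note on the loop rewrite in this hunk: the `for (;;)` loop with `break` statements becomes a `while` loop driven by a `shouldContinue` flag, with the same termination conditions (job found, or no next offset). The sketch below shows the flag-driven form against a fake pager. `Page`, `fetchPage`, and `findPageWithJob` are hypothetical names, and the check that the offset must advance is an added assumption rather than something the hunk shows.

```typescript
type Page = { jobs: { id: string }[]; nextOffset?: number };

// Hypothetical helper: fetches pages until one contains the job or the
// pager reports no further offset, then returns the last page fetched.
function findPageWithJob(
  fetchPage: (offset: number) => Page,
  jobId: string,
): Page | undefined {
  let offset = 0;
  let result: Page | undefined;
  let shouldContinue = true;
  while (shouldContinue) {
    result = fetchPage(offset);
    if (result.jobs.some((job) => job.id === jobId)) {
      shouldContinue = false;
    } else if (result.nextOffset === undefined || result.nextOffset <= offset) {
      // Assumption: a non-advancing offset is treated as the end of the list.
      shouldContinue = false;
    } else {
      offset = result.nextOffset;
    }
  }
  return result;
}
```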

@@ -3,6 +3,7 @@ import type { OpenClawConfig } from "../../config/config.js";
import {
createAgentToAgentPolicy,
createSessionVisibilityGuard,
createSessionVisibilityRowChecker,
resolveEffectiveSessionToolsVisibility,
resolveSandboxSessionToolsVisibility,
resolveSessionToolsVisibility,
@@ -109,6 +110,175 @@ describe("createAgentToAgentPolicy", () => {
});
describe("createSessionVisibilityGuard", () => {
it("allows cross-agent spawned child rows in list results with tree visibility", () => {
const guard = createSessionVisibilityRowChecker({
action: "list",
requesterSessionKey: "agent:main:main",
visibility: "tree",
a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig),
});
expect(
guard.check({
key: "agent:codex:acp:child-1",
spawnedBy: "agent:main:main",
}),
).toEqual({ allowed: true });
});
it("allows cross-agent spawned child rows in all-visibility list results when a2a is disabled", () => {
const guard = createSessionVisibilityRowChecker({
action: "list",
requesterSessionKey: "agent:main:main",
visibility: "all",
a2aPolicy: createAgentToAgentPolicy({
tools: { agentToAgent: { enabled: false } },
} as unknown as OpenClawConfig),
});
expect(
guard.check({
key: "agent:codex:acp:child-1",
spawnedBy: "agent:main:main",
}),
).toEqual({ allowed: true });
});
it("keeps agent visibility same-agent-only for cross-agent owned child rows", () => {
const guard = createSessionVisibilityRowChecker({
action: "list",
requesterSessionKey: "agent:main:main",
visibility: "agent",
a2aPolicy: createAgentToAgentPolicy({
tools: { agentToAgent: { enabled: true, allow: ["main", "codex"] } },
} as unknown as OpenClawConfig),
});
expect(
guard.check({
key: "agent:codex:acp:child-1",
spawnedBy: "agent:main:main",
}),
).toEqual({
allowed: false,
status: "forbidden",
error:
"Session list visibility is restricted. Set tools.sessions.visibility=all to allow cross-agent access.",
});
});
it("does not do spawned lookup for list visibility without row metadata", async () => {
const callGateway = vi.fn(async () => ({
sessions: [{ key: "agent:codex:acp:child-1" }],
}));
sessionsResolutionTesting.setDepsForTest({
callGateway: callGateway as never,
});
const guard = await createSessionVisibilityGuard({
action: "list",
requesterSessionKey: "agent:main:main",
visibility: "tree",
a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig),
});
expect(guard.check("agent:codex:acp:child-1")).toMatchObject({ allowed: false });
expect(callGateway).not.toHaveBeenCalled();
sessionsResolutionTesting.setDepsForTest();
});
it("allows cross-agent spawned child sessions with tree visibility", async () => {
sessionsResolutionTesting.setDepsForTest({
callGateway: vi.fn(async (request: { method?: string; params?: { spawnedBy?: string } }) => {
if (request.method === "sessions.list") {
expect(request.params?.spawnedBy).toBe("agent:main:main");
return {
sessions: [{ key: "agent:codex:acp:child-1" }],
};
}
return {};
}) as never,
});
const guard = await createSessionVisibilityGuard({
action: "history",
requesterSessionKey: "agent:main:main",
visibility: "tree",
a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig),
});
expect(guard.check("agent:codex:acp:child-1")).toEqual({ allowed: true });
sessionsResolutionTesting.setDepsForTest();
});
it("keeps self visibility restricted even for spawned child sessions", async () => {
const guard = await createSessionVisibilityGuard({
action: "history",
requesterSessionKey: "agent:main:main",
visibility: "self",
a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig),
});
expect(guard.check("agent:codex:acp:child-1")).toEqual({
allowed: false,
status: "forbidden",
error:
"Session history visibility is restricted. Set tools.sessions.visibility=all to allow cross-agent access.",
});
});
it("allows cross-agent spawned child sessions before agent-to-agent checks with all visibility", async () => {
sessionsResolutionTesting.setDepsForTest({
callGateway: vi.fn(async (request: { method?: string; params?: { spawnedBy?: string } }) => {
if (request.method === "sessions.list") {
expect(request.params?.spawnedBy).toBe("agent:main:main");
return {
sessions: [{ key: "agent:codex:acp:child-1" }],
};
}
return {};
}) as never,
});
const guard = await createSessionVisibilityGuard({
action: "send",
requesterSessionKey: "agent:main:main",
visibility: "all",
a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig),
});
expect(guard.check("agent:codex:acp:child-1")).toEqual({ allowed: true });
sessionsResolutionTesting.setDepsForTest();
});
it("allows cross-agent spawned child status before agent-to-agent checks with all visibility", async () => {
sessionsResolutionTesting.setDepsForTest({
callGateway: vi.fn(async (request: { method?: string; params?: { spawnedBy?: string } }) => {
if (request.method === "sessions.list") {
expect(request.params?.spawnedBy).toBe("agent:main:main");
return {
sessions: [{ key: "agent:codex:acp:child-1" }],
};
}
return {};
}) as never,
});
const guard = await createSessionVisibilityGuard({
action: "status",
requesterSessionKey: "agent:main:main",
visibility: "all",
a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig),
});
expect(guard.check("agent:codex:acp:child-1")).toEqual({ allowed: true });
sessionsResolutionTesting.setDepsForTest();
});
it("does not block exact same-agent spawned targets that fall past the spawned list cap", async () => {
sessionsResolutionTesting.setDepsForTest({
callGateway: vi.fn(async (request: { method?: string; params?: { key?: string } }) => {

@@ -3,6 +3,7 @@ import {
createAgentToAgentPolicy,
createSessionVisibilityChecker,
createSessionVisibilityGuard,
createSessionVisibilityRowChecker,
listSpawnedSessionKeys,
resolveEffectiveSessionToolsVisibility,
resolveSandboxSessionToolsVisibility,
@@ -15,6 +16,7 @@ export {
createAgentToAgentPolicy,
createSessionVisibilityChecker,
createSessionVisibilityGuard,
createSessionVisibilityRowChecker,
listSpawnedSessionKeys,
resolveEffectiveSessionToolsVisibility,
} from "../../plugin-sdk/session-visibility.js";
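
Note on the newly re-exported `createSessionVisibilityRowChecker`: the tests in this commit pin down a rule where a row owned by the requester (via `ownerSessionKey`, `spawnedBy`, or `parentSessionKey`) is visible under `tree` or `all` visibility before any cross-agent check runs. The sketch below is a simplified model of that ordering, not the real checker. `agentIdOf`, `isRowVisible`, and the `crossAgentAllowed` flag are hypothetical, the agent-to-agent policy is collapsed into a single boolean, and the special `"current"` key is not modeled.

```typescript
type Visibility = "self" | "tree" | "agent" | "all";
type Row = {
  key: string;
  ownerSessionKey?: string;
  spawnedBy?: string;
  parentSessionKey?: string;
};

// Assumption: session keys look like "agent:<agentId>:..." as in the tests.
function agentIdOf(sessionKey: string): string {
  const parts = sessionKey.split(":");
  return parts[0] === "agent" && parts[1] ? parts[1] : "main";
}

// Hypothetical simplified decision: ownership is checked first, then the
// cross-agent rule, then the same-agent visibility levels.
function isRowVisible(
  params: { requesterSessionKey: string; visibility: Visibility; crossAgentAllowed: boolean },
  row: Row,
): boolean {
  const requester = params.requesterSessionKey;
  const isRequester = row.key === requester;
  const isOwned =
    row.ownerSessionKey === requester ||
    row.spawnedBy === requester ||
    row.parentSessionKey === requester;
  if (!isRequester && isOwned && (params.visibility === "tree" || params.visibility === "all")) {
    return true;
  }
  if (agentIdOf(row.key) !== agentIdOf(requester)) {
    return params.visibility === "all" && params.crossAgentAllowed;
  }
  if (params.visibility === "self") {
    return isRequester;
  }
  if (params.visibility === "tree") {
    return isRequester || isOwned;
  }
  return true;
}
```

Checking ownership before agent ids is what lets an ACP child with a backend agent id such as `codex` stay visible to the `main` session that spawned it, while `agent` and `self` visibility keep their stricter behavior.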

@@ -1,6 +1,7 @@
export {
createAgentToAgentPolicy,
createSessionVisibilityGuard,
createSessionVisibilityRowChecker,
resolveEffectiveSessionToolsVisibility,
resolveSandboxedSessionToolContext,
} from "./sessions-access.js";

@@ -22,8 +22,8 @@ import {
import type { AnyAgentTool } from "./common.js";
import { jsonResult, readStringArrayParam, readStringParam } from "./common.js";
import {
createSessionVisibilityGuard,
createAgentToAgentPolicy,
createSessionVisibilityRowChecker,
classifySessionKind,
deriveChannel,
resolveDisplaySessionKey,
@@ -136,7 +136,7 @@ export function createSessionsListTool(opts?: {
const sessions = Array.isArray(list?.sessions) ? list.sessions : [];
const storePath = typeof list?.path === "string" ? list.path : undefined;
const visibilityGuard = await createSessionVisibilityGuard({
const visibilityGuard = createSessionVisibilityRowChecker({
action: "list",
requesterSessionKey: effectiveRequesterKey,
visibility,
@@ -160,7 +160,17 @@ export function createSessionsListTool(opts?: {
if (!key) {
continue;
}
const access = visibilityGuard.check(key);
const access = visibilityGuard.check({
key,
agentId: typeof entry.agentId === "string" ? entry.agentId : undefined,
ownerSessionKey:
typeof (entry as { ownerSessionKey?: unknown }).ownerSessionKey === "string"
? (entry as { ownerSessionKey?: string }).ownerSessionKey
: undefined,
spawnedBy: typeof entry.spawnedBy === "string" ? entry.spawnedBy : undefined,
parentSessionKey:
typeof entry.parentSessionKey === "string" ? entry.parentSessionKey : undefined,
});
if (!access.allowed) {
continue;
}
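
Note on the row passed to `visibilityGuard.check`: the list tool copies each ownership field from the gateway entry only when it is a string, so malformed metadata cannot grant access. The repeated `typeof ... === "string"` checks could be factored roughly as below. This is a sketch only: `readOptionalString` and `toVisibilityRow` are hypothetical helpers, not functions from the commit, and the entry is modeled as a plain record.

```typescript
type SessionVisibilityRow = {
  key: string;
  agentId?: string;
  ownerSessionKey?: string;
  spawnedBy?: string;
  parentSessionKey?: string;
};

// Hypothetical helper: keeps a value only when it is a string.
function readOptionalString(value: unknown): string | undefined {
  return typeof value === "string" ? value : undefined;
}

// Hypothetical helper: builds the row from an untyped gateway entry,
// dropping any ownership field that is not a string.
function toVisibilityRow(key: string, entry: Record<string, unknown>): SessionVisibilityRow {
  return {
    key,
    agentId: readOptionalString(entry["agentId"]),
    ownerSessionKey: readOptionalString(entry["ownerSessionKey"]),
    spawnedBy: readOptionalString(entry["spawnedBy"]),
    parentSessionKey: readOptionalString(entry["parentSessionKey"]),
  };
}
```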

@@ -269,6 +269,15 @@ export function createSessionsSendTool(opts?: {
const announceTimeoutMs = timeoutSeconds === 0 ? 30_000 : timeoutMs;
const idempotencyKey = crypto.randomUUID();
let runId: string = idempotencyKey;
if (parseSessionThreadInfoFast(resolvedKey).threadId) {
return jsonResult({
runId: crypto.randomUUID(),
status: "error",
error:
"sessions_send cannot target a thread session for inter-agent coordination. Use the parent channel session key instead.",
sessionKey: displayKey,
});
}
const visibilityGuard = await createSessionVisibilityGuard({
action: "send",
requesterSessionKey: effectiveRequesterKey,
@@ -284,15 +293,6 @@ export function createSessionsSendTool(opts?: {
sessionKey: displayKey,
});
}
if (parseSessionThreadInfoFast(resolvedKey).threadId) {
return jsonResult({
runId: crypto.randomUUID(),
status: "error",
error:
"sessions_send cannot target a thread session for inter-agent coordination. Use the parent channel session key instead.",
sessionKey: displayKey,
});
}
// Capture the pre-run assistant snapshot before starting the nested run.
// Fast in-process test doubles and short-circuit agent paths can finish

@@ -14,7 +14,7 @@ type SessionsToolTestConfig = {
session: { scope: "per-sender"; mainKey: string };
tools: {
agentToAgent: { enabled: boolean };
sessions?: { visibility: "all" | "own" };
sessions?: { visibility: "self" | "tree" | "agent" | "all" };
};
};
@@ -417,13 +417,20 @@ describe("resolveAnnounceTarget", () => {
describe("sessions_list gating", () => {
beforeEach(() => {
callGatewayMock.mockClear();
callGatewayMock.mockResolvedValue({
path: "/tmp/sessions.json",
sessions: [
{ key: "agent:main:main", kind: "direct" },
{ key: "agent:other:main", kind: "direct" },
],
});
callGatewayMock.mockImplementation(
(request: { method?: string; params?: { spawnedBy?: string } }) => {
if (request.method === "sessions.list" && request.params?.spawnedBy) {
return Promise.resolve({ path: "/tmp/sessions.json", sessions: [] });
}
return Promise.resolve({
path: "/tmp/sessions.json",
sessions: [
{ key: "agent:main:main", kind: "direct" },
{ key: "agent:other:main", kind: "direct" },
],
});
},
);
});
it("filters out other agents when tools.agentToAgent.enabled is false", async () => {
@@ -435,6 +442,62 @@ describe("sessions_list gating", () => {
});
});
it("keeps requester-owned cross-agent rows with tree visibility without a spawned lookup", async () => {
loadConfigMock.mockReturnValue({
session: { scope: "per-sender", mainKey: "main" },
tools: {
agentToAgent: { enabled: false },
sessions: { visibility: "tree" },
},
});
callGatewayMock.mockResolvedValueOnce({
path: "/tmp/sessions.json",
sessions: [
{
key: "agent:codex:acp:child-1",
kind: "direct",
spawnedBy: MAIN_AGENT_SESSION_KEY,
},
],
});
const result = await createMainSessionsListTool().execute("call1", {});
expect(result.details).toMatchObject({
count: 1,
sessions: [{ key: "agent:codex:acp:child-1", spawnedBy: MAIN_AGENT_SESSION_KEY }],
});
expect(callGatewayMock).toHaveBeenCalledTimes(1);
});
it("keeps requester-owned cross-agent rows with all visibility when a2a is disabled", async () => {
loadConfigMock.mockReturnValue({
session: { scope: "per-sender", mainKey: "main" },
tools: {
agentToAgent: { enabled: false },
sessions: { visibility: "all" },
},
});
callGatewayMock.mockResolvedValueOnce({
path: "/tmp/sessions.json",
sessions: [
{
key: "agent:codex:acp:child-1",
kind: "direct",
parentSessionKey: MAIN_AGENT_SESSION_KEY,
},
],
});
const result = await createMainSessionsListTool().execute("call1", {});
expect(result.details).toMatchObject({
count: 1,
sessions: [{ key: "agent:codex:acp:child-1", parentSessionKey: MAIN_AGENT_SESSION_KEY }],
});
expect(callGatewayMock).toHaveBeenCalledTimes(1);
});
it("keeps literal current keys for message previews", async () => {
callGatewayMock.mockReset();
callGatewayMock
@@ -442,7 +505,6 @@ describe("sessions_list gating", () => {
path: "/tmp/sessions.json",
sessions: [{ key: "current", kind: "direct" }],
})
.mockResolvedValueOnce({ sessions: [{ key: "current" }] })
.mockResolvedValueOnce({ messages: [{ role: "assistant", content: [] }] });
await createMainSessionsListTool().execute("call1", { messageLimit: 1 });
@@ -478,7 +540,6 @@ describe("sessions_list transcriptPath resolution", () => {
},
],
});
const result = await executeMainSessionsList();
expectWorkerTranscriptPath(result, {
containsPath: path.join("agents", "worker", "sessions"),
@@ -498,7 +559,6 @@ describe("sessions_list transcriptPath resolution", () => {
},
],
});
const result = await executeMainSessionsList();
expectWorkerTranscriptPath(result, {
containsPath: path.join("agents", "worker", "sessions"),
@@ -519,7 +579,6 @@ describe("sessions_list transcriptPath resolution", () => {
},
],
});
const result = await executeMainSessionsList();
expectWorkerTranscriptPath(result, {
containsPath: path.join("agents", "worker", "sessions"),
@@ -540,7 +599,6 @@ describe("sessions_list transcriptPath resolution", () => {
},
],
});
const result = await executeMainSessionsList();
expectWorkerTranscriptPath(result, {
containsPath: path.join(stateDir, "agents", "worker", "sessions"),
@@ -562,7 +620,6 @@ describe("sessions_list transcriptPath resolution", () => {
},
],
});
const result = await executeMainSessionsList();
const expectedSessionsDir = path.dirname(templateStorePath.replace("{agentId}", "worker"));
expectWorkerTranscriptPath(result, {
@@ -595,7 +652,6 @@ describe("sessions_list channel derivation", () => {
},
],
});
const result = await executeMainSessionsList();
expect(result.details).toMatchObject({

@@ -31,6 +31,14 @@ export type SessionAccessResult =
| { allowed: true }
| { allowed: false; error: string; status: "forbidden" };
export type SessionVisibilityRow = {
key: string;
agentId?: string;
ownerSessionKey?: string;
spawnedBy?: string;
parentSessionKey?: string;
};
export async function listSpawnedSessionKeys(params: {
requesterSessionKey: string;
limit?: number;
@@ -191,11 +199,56 @@ export function createSessionVisibilityChecker(params: {
a2aPolicy: AgentToAgentPolicy;
spawnedKeys: Set<string> | null;
}): { check: (targetSessionKey: string) => SessionAccessResult } {
const requesterAgentId = resolveAgentIdFromSessionKey(params.requesterSessionKey);
const spawnedKeys = params.spawnedKeys;
const rowChecker = createSessionVisibilityRowChecker({
action: params.action,
requesterSessionKey: params.requesterSessionKey,
visibility: params.visibility,
a2aPolicy: params.a2aPolicy,
});
const check = (targetSessionKey: string): SessionAccessResult => {
const targetAgentId = resolveAgentIdFromSessionKey(targetSessionKey);
const isSpawnedSession = spawnedKeys?.has(targetSessionKey) === true;
return rowChecker.check({
key: targetSessionKey,
spawnedBy: isSpawnedSession ? params.requesterSessionKey : undefined,
});
};
return { check };
}
function rowOwnedByRequester(row: SessionVisibilityRow, requesterSessionKey: string): boolean {
return (
row.ownerSessionKey === requesterSessionKey ||
row.spawnedBy === requesterSessionKey ||
row.parentSessionKey === requesterSessionKey
);
}
export function createSessionVisibilityRowChecker(params: {
action: SessionAccessAction;
requesterSessionKey: string;
visibility: SessionToolsVisibility;
a2aPolicy: AgentToAgentPolicy;
}): { check: (row: SessionVisibilityRow) => SessionAccessResult } {
const requesterAgentId = resolveAgentIdFromSessionKey(params.requesterSessionKey);
const check = (row: SessionVisibilityRow): SessionAccessResult => {
const targetSessionKey = row.key;
const targetAgentId = row.agentId ?? resolveAgentIdFromSessionKey(targetSessionKey);
const isRequesterSession =
targetSessionKey === params.requesterSessionKey || targetSessionKey === "current";
const isRequesterOwned = rowOwnedByRequester(row, params.requesterSessionKey);
// Row ownership is stronger than agent ids: ACP children may use a backend
// agent id while still belonging to the requester that spawned them.
if (
!isRequesterSession &&
isRequesterOwned &&
(params.visibility === "tree" || params.visibility === "all")
) {
return { allowed: true };
}
const isCrossAgent = targetAgentId !== requesterAgentId;
if (isCrossAgent) {
if (params.visibility !== "all") {
@@ -222,7 +275,7 @@ export function createSessionVisibilityChecker(params: {
return { allowed: true };
}
if (params.visibility === "self" && targetSessionKey !== params.requesterSessionKey) {
if (params.visibility === "self" && !isRequesterSession) {
return {
allowed: false,
status: "forbidden",
@@ -230,11 +283,7 @@ export function createSessionVisibilityChecker(params: {
};
}
if (
params.visibility === "tree" &&
targetSessionKey !== params.requesterSessionKey &&
!spawnedKeys?.has(targetSessionKey)
) {
if (params.visibility === "tree" && !isRequesterSession && !isRequesterOwned) {
return {
allowed: false,
status: "forbidden",
@@ -256,8 +305,10 @@ export async function createSessionVisibilityGuard(params: {
}): Promise<{
check: (targetSessionKey: string) => SessionAccessResult;
}> {
// Listing already has row ownership metadata; direct key actions still need
// this lookup until every caller can pass a normalized session row.
const spawnedKeys =
params.visibility === "tree"
params.action !== "list" && (params.visibility === "tree" || params.visibility === "all")
? await listSpawnedSessionKeys({ requesterSessionKey: params.requesterSessionKey })
: null;
return createSessionVisibilityChecker({