diff --git a/CHANGELOG.md b/CHANGELOG.md index f6210640f80..09ef18320e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,9 @@ Docs: https://docs.openclaw.ai - Codex/approvals: in Codex approval modes, stop installing the pre-guardian native `PermissionRequest` hook by default so Codex's reviewer can approve safe commands before OpenClaw surfaces an approval, remember `allow-always` decisions for identical Codex native `PermissionRequest` payloads within the active session window, and make plugin approval requests validate/render their actual allowed decisions so Telegram and other native approval UIs cannot offer stale actions. Thanks @shakkernerd. - PR triage: mark external pull requests with `proof: supplied` when Barnacle finds structured real behavior proof, keep stale negative proof labels in sync across CRLF-edited PR bodies, and let ClawSweeper own the stronger `proof: sufficient` judgement. - Sessions CLI: show the selected agent runtime in the `openclaw sessions` table so terminal output matches the runtime visibility already present in JSON/status surfaces. Thanks @vincentkoc. +- ACPX/Codex: preserve trusted Codex project declarations when launching isolated Codex ACP sessions, avoiding interactive trust prompts in headless runs. Thanks @Stedyclaw. +- ACPX/Codex: reap stale OpenClaw-owned ACPX/Codex ACP process trees on startup and after ACP session close, preventing orphaned harness processes from slowing the Gateway. Thanks @91wan. +- ACP sessions: allow parent agents to inspect and message their own spawned cross-agent ACP sessions without enabling broad agent-to-agent visibility. Thanks @barronlroth. - Talk/voice: unify realtime relay, transcription relay, managed-room handoff, Voice Call, Google Meet, VoiceClaw, and native clients around a shared Talk session controller and add the Gateway-managed `talk.session.*` RPC surface. 
- Diagnostics/Talk: export bounded Talk lifecycle/audio metrics and session recovery metrics through OpenTelemetry and Prometheus without exposing transcripts, audio payloads, room ids, turn ids, or session ids. - Logging/Talk: route shared Talk lifecycle events into bounded file and OTLP log records while keeping transcript text, audio payloads, turn ids, call ids, and provider item ids out of logs. diff --git a/docs/.i18n/glossary.zh-CN.json b/docs/.i18n/glossary.zh-CN.json index 98af41ede60..91a1508c089 100644 --- a/docs/.i18n/glossary.zh-CN.json +++ b/docs/.i18n/glossary.zh-CN.json @@ -31,6 +31,10 @@ "source": "Message lifecycle refactor", "target": "消息生命周期重构" }, + { + "source": "ACP lifecycle refactor", + "target": "ACP 生命周期重构" + }, { "source": "Channel message API", "target": "频道消息 API" diff --git a/docs/docs.json b/docs/docs.json index 6dd32dfe62a..59679e162d0 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -1208,6 +1208,7 @@ "plugins/sdk-channel-plugins", "plugins/sdk-channel-message", "plugins/sdk-provider-plugins", + "plugins/cli-backend-plugins", "plugins/adding-capabilities", "plugins/compatibility", "plugins/sdk-migration" diff --git a/docs/gateway/cli-backends.md b/docs/gateway/cli-backends.md index c754064b33f..9b7056a7afa 100644 --- a/docs/gateway/cli-backends.md +++ b/docs/gateway/cli-backends.md @@ -23,6 +23,12 @@ If you want a full harness runtime with ACP session controls, background tasks, thread/conversation binding, and persistent external coding sessions, use [ACP Agents](/tools/acp-agents) instead. CLI backends are not ACP. + + Building a new backend plugin? Use + [CLI backend plugins](/plugins/cli-backend-plugins). This page is for users + configuring and operating an already registered backend. 
+ + ## Beginner-friendly quick start You can use Codex CLI **without any config** (the bundled OpenAI plugin diff --git a/docs/plugins/building-plugins.md b/docs/plugins/building-plugins.md index ab35a819640..efd88963ffc 100644 --- a/docs/plugins/building-plugins.md +++ b/docs/plugins/building-plugins.md @@ -35,6 +35,9 @@ install from npm during the launch cutover. Add a model provider (LLM, proxy, or custom endpoint) + + Map a local AI CLI into OpenClaw's text fallback runner + Register agent tools, event hooks, or services - continue below @@ -160,7 +163,7 @@ A single plugin can register any number of capabilities via the `api` object: | Capability | Registration method | Detailed guide | | ---------------------- | ------------------------------------------------ | ------------------------------------------------------------------------------- | | Text inference (LLM) | `api.registerProvider(...)` | [Provider Plugins](/plugins/sdk-provider-plugins) | -| CLI inference backend | `api.registerCliBackend(...)` | [CLI Backends](/gateway/cli-backends) | +| CLI inference backend | `api.registerCliBackend(...)` | [CLI Backend Plugins](/plugins/cli-backend-plugins) | | Channel / messaging | `api.registerChannel(...)` | [Channel Plugins](/plugins/sdk-channel-plugins) | | Speech (TTS/STT) | `api.registerSpeechProvider(...)` | [Provider Plugins](/plugins/sdk-provider-plugins#step-5-add-extra-capabilities) | | Realtime transcription | `api.registerRealtimeTranscriptionProvider(...)` | [Provider Plugins](/plugins/sdk-provider-plugins#step-5-add-extra-capabilities) | @@ -382,6 +385,9 @@ reserved surfaces, not as the default pattern for new third-party plugins. 
Build a model provider plugin + + Register a local AI CLI backend + Import map and registration API reference diff --git a/docs/plugins/cli-backend-plugins.md b/docs/plugins/cli-backend-plugins.md new file mode 100644 index 00000000000..43a2c1929f8 --- /dev/null +++ b/docs/plugins/cli-backend-plugins.md @@ -0,0 +1,310 @@ +--- +summary: "Build a plugin that registers a local AI CLI backend" +title: "Building CLI backend plugins" +sidebarTitle: "CLI backend plugins" +read_when: + - You are building a local AI CLI backend plugin + - You want to register a backend for model refs such as acme-cli/model + - You need to map a third-party CLI into OpenClaw's text fallback runner +--- + +CLI backend plugins let OpenClaw call a local AI CLI as a text inference +backend. The backend appears as a provider prefix in model refs: + +```text +acme-cli/acme-large +``` + +Use a CLI backend when the upstream integration is already exposed as a local +command, when the CLI owns local login state, or when the CLI is a useful +fallback if API providers are unavailable. + + + If the upstream service exposes a normal HTTP model API, write a + [provider plugin](/plugins/sdk-provider-plugins) instead. If the upstream + runtime owns complete agent sessions, tool events, compaction, or background + task state, use an [agent harness](/plugins/sdk-agent-harness). + + +## What the plugin owns + +A CLI backend plugin has three contracts: + +| Contract | File | Purpose | +| -------------------- | ---------------------- | --------------------------------------------------------- | +| Package entry | `package.json` | Points OpenClaw at the plugin runtime module | +| Manifest ownership | `openclaw.plugin.json` | Declares the backend id before runtime loads | +| Runtime registration | `index.ts` | Calls `api.registerCliBackend(...)` with command defaults | + +The manifest is discovery metadata. It does not execute the CLI and does not +register runtime behavior. 
Runtime behavior starts when the plugin entry calls +`api.registerCliBackend(...)`. + +## Minimal backend plugin + + + + ```json package.json + { + "name": "@acme/openclaw-acme-cli", + "version": "1.0.0", + "type": "module", + "openclaw": { + "extensions": ["./index.ts"], + "compat": { + "pluginApi": ">=2026.3.24-beta.2", + "minGatewayVersion": "2026.3.24-beta.2" + }, + "build": { + "openclawVersion": "2026.3.24-beta.2", + "pluginSdkVersion": "2026.3.24-beta.2" + } + }, + "dependencies": { + "openclaw": "^2026.3.24" + }, + "devDependencies": { + "typescript": "^5.9.0" + } + } + ``` + + Published packages must ship built JavaScript runtime files. If your source + entry is `./src/index.ts`, add `openclaw.runtimeExtensions` that points at + the built JavaScript peer. See [Entry points](/plugins/sdk-entrypoints). + + + + + ```json openclaw.plugin.json + { + "id": "acme-cli", + "name": "Acme CLI", + "description": "Run Acme's local AI CLI through OpenClaw", + "cliBackends": ["acme-cli"], + "setup": { + "cliBackends": ["acme-cli"], + "requiresRuntime": false + }, + "activation": { + "onStartup": false + }, + "configSchema": { + "type": "object", + "additionalProperties": false + } + } + ``` + + `cliBackends` is the runtime ownership list. It lets OpenClaw auto-load the + plugin when config or model selection mentions `acme-cli/...`. + + `setup.cliBackends` is the descriptor-first setup surface. Add it when + model discovery, onboarding, or status should recognize the backend without + loading plugin runtime. Use `requiresRuntime: false` only when those static + descriptors are enough for setup. 
+ + + + + ```typescript index.ts + import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry"; + import { + CLI_FRESH_WATCHDOG_DEFAULTS, + CLI_RESUME_WATCHDOG_DEFAULTS, + type CliBackendPlugin, + } from "openclaw/plugin-sdk/cli-backend"; + + function buildAcmeCliBackend(): CliBackendPlugin { + return { + id: "acme-cli", + liveTest: { + defaultModelRef: "acme-cli/acme-large", + defaultImageProbe: false, + defaultMcpProbe: false, + docker: { + npmPackage: "@acme/acme-cli", + binaryName: "acme", + }, + }, + config: { + command: "acme", + args: ["chat", "--json"], + output: "json", + input: "stdin", + modelArg: "--model", + sessionArg: "--session", + sessionMode: "existing", + sessionIdFields: ["session_id", "conversation_id"], + systemPromptFileArg: "--system-file", + systemPromptWhen: "first", + imageArg: "--image", + imageMode: "repeat", + reliability: { + watchdog: { + fresh: { ...CLI_FRESH_WATCHDOG_DEFAULTS }, + resume: { ...CLI_RESUME_WATCHDOG_DEFAULTS }, + }, + }, + serialize: true, + }, + }; + } + + export default definePluginEntry({ + id: "acme-cli", + name: "Acme CLI", + description: "Run Acme's local AI CLI through OpenClaw", + register(api) { + api.registerCliBackend(buildAcmeCliBackend()); + }, + }); + ``` + + The backend id must match the manifest `cliBackends` entry. The registered + `config` is only the default; user config under + `agents.defaults.cliBackends.acme-cli` is merged over it at runtime. 
+ + + + +## Config shape + +`CliBackendConfig` describes how OpenClaw should launch and parse the CLI: + +| Field | Use | +| ----------------------------------------- | ----------------------------------------------------------- | +| `command` | Binary name or absolute command path | +| `args` | Base argv for fresh runs | +| `resumeArgs` | Alternate argv for resumed sessions; supports `{sessionId}` | +| `output` / `resumeOutput` | Parser: `json`, `jsonl`, or `text` | +| `input` | Prompt transport: `arg` or `stdin` | +| `modelArg` | Flag used before the model id | +| `modelAliases` | Map OpenClaw model ids to CLI-native ids | +| `sessionArg` / `sessionArgs` | How to pass a session id | +| `sessionMode` | `always`, `existing`, or `none` | +| `sessionIdFields` | JSON fields OpenClaw reads from CLI output | +| `systemPromptArg` / `systemPromptFileArg` | System prompt transport | +| `systemPromptWhen` | `first`, `always`, or `never` | +| `imageArg` / `imageMode` | Image path support | +| `serialize` | Keep same-backend runs ordered | +| `reliability.watchdog` | No-output timeout tuning | + +Prefer the smallest static config that matches the CLI. Add plugin callbacks +only for behavior that really belongs to the backend. 
+ +## Advanced backend hooks + +`CliBackendPlugin` can also define: + +| Hook | Use | +| ---------------------------------- | ------------------------------------------------------ | +| `normalizeConfig(config, context)` | Rewrite legacy user config after merge | +| `resolveExecutionArgs(ctx)` | Add request-scoped flags such as thinking effort | +| `prepareExecution(ctx)` | Create temporary auth or config bridges before launch | +| `transformSystemPrompt(ctx)` | Apply a final CLI-specific system prompt transform | +| `textTransforms` | Bidirectional prompt/output replacements | +| `defaultAuthProfileId` | Prefer a specific OpenClaw auth profile | +| `authEpochMode` | Decide how auth changes invalidate stored CLI sessions | +| `nativeToolMode` | Declare whether the CLI has always-on native tools | +| `bundleMcp` / `bundleMcpMode` | Opt into OpenClaw's loopback MCP tool bridge | + +Keep these hooks provider-owned. Do not add CLI-specific branches to core when a +backend hook can express the behavior. + +## MCP tool bridge + +CLI backends do not receive OpenClaw tools by default. If the CLI can consume an +MCP configuration, opt in explicitly: + +```typescript +return { + id: "acme-cli", + bundleMcp: true, + bundleMcpMode: "codex-config-overrides", + config: { + command: "acme", + args: ["chat", "--json"], + output: "json", + }, +}; +``` + +Supported bridge modes are: + +| Mode | Use | +| ------------------------ | ---------------------------------------------------------------- | +| `claude-config-file` | CLIs that accept an MCP config file | +| `codex-config-overrides` | CLIs that accept config overrides on argv | +| `gemini-system-settings` | CLIs that read MCP settings from their system settings directory | + +Only enable the bridge when the CLI can actually consume it. If the CLI has its +own built-in tool layer that cannot be disabled, set `nativeToolMode: +"always-on"` so OpenClaw can fail closed when a caller requires no native tools. 
+ +## User configuration + +Users can override any backend default: + +```json5 +{ + agents: { + defaults: { + cliBackends: { + "acme-cli": { + command: "/opt/acme/bin/acme", + args: ["chat", "--json", "--profile", "work"], + modelAliases: { + large: "acme-large-2026", + }, + }, + }, + model: { + primary: "openai/gpt-5.5", + fallbacks: ["acme-cli/large"], + }, + }, + }, +} +``` + +Document the minimum override users are likely to need. Usually that is only +`command` when the binary is outside `PATH`. + +## Verification + +For bundled plugins, add a focused test around the builder and setup +registration, then run the plugin's targeted test lane: + +```bash +pnpm test extensions/acme-cli +``` + +For local or installed plugins, verify discovery and one real model run: + +```bash +openclaw plugins inspect acme-cli --runtime --json +openclaw agent --message "reply exactly: backend ok" --model acme-cli/acme-large +``` + +If the backend supports images or MCP, add a live smoke that proves those paths +with the real CLI. Do not rely on static inspection for prompt, image, MCP, or +session-resume behavior. 
+ +## Checklist + +`package.json` has `openclaw.extensions` and built runtime entries for published packages +`openclaw.plugin.json` declares `cliBackends` and intentional `activation.onStartup` +`setup.cliBackends` is present when setup/model discovery should see the backend cold +`api.registerCliBackend(...)` uses the same backend id as the manifest +User overrides under `agents.defaults.cliBackends.<id>` still win +Session, system prompt, image, and output parser settings match the real CLI contract +Targeted tests and at least one live CLI smoke prove the backend path + +## Related + +- [CLI backends](/gateway/cli-backends) - user configuration and runtime behavior +- [Building plugins](/plugins/building-plugins) - package and manifest basics +- [Plugin SDK overview](/plugins/sdk-overview) - registration API reference +- [Plugin manifest](/plugins/manifest) - `cliBackends` and setup descriptors +- [Agent harness](/plugins/sdk-agent-harness) - full external agent runtimes diff --git a/docs/plugins/sdk-overview.md b/docs/plugins/sdk-overview.md index b3a253d1b3c..5c237ba6a98 100644 --- a/docs/plugins/sdk-overview.md +++ b/docs/plugins/sdk-overview.md @@ -20,7 +20,7 @@ reference for **what to import** and **what you can register**. -Looking for a how-to guide instead? Start with [Building plugins](/plugins/building-plugins), use [Channel plugins](/plugins/sdk-channel-plugins) for channel plugins, [Provider plugins](/plugins/sdk-provider-plugins) for provider plugins, and [Plugin hooks](/plugins/hooks) for tool or lifecycle hook plugins. +Looking for a how-to guide instead? Start with [Building plugins](/plugins/building-plugins), use [Channel plugins](/plugins/sdk-channel-plugins) for channel plugins, [Provider plugins](/plugins/sdk-provider-plugins) for provider plugins, [CLI backend plugins](/plugins/cli-backend-plugins) for local AI CLI backends, and [Plugin hooks](/plugins/hooks) for tool or lifecycle hook plugins. 
## Import convention @@ -261,6 +261,9 @@ AI CLI backend such as `codex-cli`. the CLI dialect, such as mapping OpenClaw thinking levels to a native effort flag. +For an end-to-end authoring guide, see +[CLI backend plugins](/plugins/cli-backend-plugins). + ### Exclusive slots | Method | What it registers | diff --git a/docs/refactor/acp.md b/docs/refactor/acp.md new file mode 100644 index 00000000000..2c9fa2e15b5 --- /dev/null +++ b/docs/refactor/acp.md @@ -0,0 +1,298 @@ +--- +summary: "Migration plan for making ACP session and ACPX process ownership explicit" +read_when: + - Refactoring ACP session lifecycle or ACPX process cleanup + - Debugging ACPX orphan processes, PID reuse, or multi-gateway cleanup safety + - Changing sessions_list visibility for spawned ACP or subagent sessions + - Designing ownership metadata for background tasks, ACP sessions, or process leases +title: "ACP lifecycle refactor" +sidebarTitle: "ACP lifecycle refactor" +--- + +ACP lifecycle currently works, but too much of it is inferred after the fact. +Process cleanup reconstructs ownership from PIDs, command strings, wrapper +paths, and the live process table. Session visibility reconstructs ownership +from session-key strings plus secondary `sessions.list({ spawnedBy })` lookups. +That makes narrow fixes possible, but it also makes edge cases easy to miss: +PID reuse, quoted commands, adapter grandchildren, multi-gateway state roots, +`cancel` versus `close`, and `tree` versus `all` visibility all become separate +places to rediscover the same ownership rules. + +This refactor makes ownership first-class. The goal is not a new ACP product +surface; it is a safer internal contract for the existing ACP and ACPX behavior. + +## Goals + +- Cleanup never signals a process unless current live evidence matches an + OpenClaw-owned lease. +- `cancel`, `close`, and startup reaping have distinct lifecycle intents. 
+- `sessions_list`, `sessions_history`, `sessions_send`, and status checks use + the same requester-owned session model. +- Multi-gateway installs cannot reap each other's ACPX wrappers. +- Old ACPX session records keep working during migration. +- The runtime remains plugin-owned; core does not learn ACPX package details. + +## Non-goals + +- Replacing ACPX or changing the public `/acp` command surface. +- Moving vendor-specific ACP adapter behavior into core. +- Requiring users to manually clean state before upgrading. +- Making `cancel` close reusable ACP sessions. + +## Target Model + +### Gateway Instance Identity + +Each Gateway process should have a stable runtime instance id: + +```ts +type GatewayInstanceId = string; +``` + +It can be generated on Gateway startup and persisted in state for the life of +that install. It is not a security secret; it is an ownership discriminator used +to avoid confusing one Gateway's ACP processes with another Gateway's processes. + +### ACP Session Ownership + +Every spawned ACP session should have normalized ownership metadata: + +```ts +type AcpSessionOwner = { + sessionKey: string; + spawnedBy?: string; + parentSessionKey?: string; + ownerSessionKey: string; + agentId: string; + backend: "acpx"; + gatewayInstanceId: GatewayInstanceId; + createdAt: number; +}; +``` + +The Gateway should return these fields on session rows where they are known. +Visibility filtering should be a pure check over row metadata: + +```ts +canSeeSessionRow({ + row, + requesterSessionKey, + visibility, + a2aPolicy, +}); +``` + +That removes hidden secondary `sessions.list({ spawnedBy })` calls from +visibility checks. A spawned cross-agent ACP child is requester-owned because +the row says so, not because a second query happens to find it. 
+ +### ACPX Process Leases + +Every generated wrapper launch should create a lease record: + +```ts +type AcpxProcessLease = { + leaseId: string; + gatewayInstanceId: GatewayInstanceId; + sessionKey: string; + wrapperRoot: string; + wrapperPath: string; + rootPid: number; + processGroupId?: number; + commandHash: string; + startedAt: number; + state: "open" | "closing" | "closed" | "lost"; +}; +``` + +The wrapper process should receive the lease id and gateway instance id in its +environment: + +```sh +OPENCLAW_ACPX_LEASE_ID=... +OPENCLAW_GATEWAY_INSTANCE_ID=... +``` + +When the platform allows it, verification should prefer live process metadata +that cannot be confused by command quoting: + +- root PID still exists +- live wrapper path is under `wrapperRoot` +- process group matches the lease when available +- environment contains the expected lease id when readable +- command hash or executable path matches the lease + +If the live process cannot be verified, cleanup fails closed. + +## Lifecycle Controller + +Introduce one ACPX lifecycle controller that owns process leases and cleanup +policy: + +```ts +interface AcpxLifecycleController { + ensureSession(input: AcpRuntimeEnsureInput): Promise; + cancelTurn(handle: AcpRuntimeHandle): Promise; + closeSession(input: { + handle: AcpRuntimeHandle; + discardPersistentState?: boolean; + reason?: string; + }): Promise; + reapStartupOrphans(): Promise; + verifyOwnedTree(lease: AcpxProcessLease): Promise; +} +``` + +`cancelTurn` requests turn cancellation only. It must not reap reusable wrapper +or adapter processes. + +`closeSession` is allowed to reap, but only after loading the session record, +loading the lease, and verifying the live process tree still belongs to that +lease. + +`reapStartupOrphans` starts from open leases in state. It may use the process +table to find descendants, but it should not scan arbitrary ACP-looking +commands first and then decide they are probably ours. 
+ +## Wrapper Contract + +Generated wrappers should stay small. They should: + +- start the adapter in a process group where supported +- forward normal termination signals to the process group +- detect parent death +- on parent death, send SIGTERM, then keep the wrapper alive until the SIGKILL + fallback runs +- report root PID and process group id back to the lifecycle controller when + that is available + +Wrappers should not decide session policy. They only enforce local process-tree +cleanup for their own adapter group. + +## Session Visibility Contract + +Visibility should use normalized row ownership: + +```ts +type SessionVisibilityInput = { + requesterSessionKey: string; + row: { + key: string; + agentId: string; + ownerSessionKey?: string; + spawnedBy?: string; + parentSessionKey?: string; + }; + visibility: "self" | "tree" | "agent" | "all"; + a2aPolicy: AgentToAgentPolicy; +}; +``` + +Rules: + +- `self`: only the requester session. +- `tree`: requester session plus rows owned by or spawned from the requester. +- `all`: all same-agent rows, a2a-allowed cross-agent rows, and requester-owned + spawned cross-agent rows even when general a2a is disabled. +- `agent`: same agent only, unless an explicit owner relationship says the row + belongs to the requester. + +This makes `tree` and `all` monotonic: `all` must not hide an owned child that +`tree` would show. + +## Migration Plan + +### Phase 1: Add Identity And Leases + +- Add `gatewayInstanceId` to Gateway state. +- Add an ACPX lease store under the ACPX state directory. +- Write a lease before spawning a generated wrapper. +- Store `leaseId` on new ACPX session records. +- Keep existing PID and command fields for old records. + +### Phase 2: Lease-First Cleanup + +- Change close cleanup to load `leaseId` first. +- Verify live process ownership against the lease before signaling. +- Keep the current root PID and wrapper-root fallback only for legacy records. 
+- Mark leases `closed` after verified cleanup. +- Mark leases `lost` when the process is gone before cleanup. + +### Phase 3: Lease-First Startup Reaping + +- Startup reaping scans open leases. +- For each lease, verify the root process and collect descendants. +- Reap verified trees children-first. +- Expire old `closed` and `lost` leases with a bounded retention window. +- Keep command-marker scanning only as a temporary legacy fallback, guarded by + wrapper root and Gateway instance where possible. + +### Phase 4: Session Ownership Rows + +- Add ownership metadata to Gateway session rows. +- Teach ACPX, subagent, background-task, and session-store writers to populate + `ownerSessionKey` or `spawnedBy`. +- Convert session visibility checks to use row metadata. +- Remove visibility-time secondary `sessions.list({ spawnedBy })` lookups. + +### Phase 5: Remove Legacy Heuristics + +After one release window: + +- stop relying on stored root command strings for non-legacy ACPX cleanup +- remove command-marker startup scans +- remove visibility fallback list lookups +- keep defensive fail-closed behavior for missing or unverifiable leases + +## Tests + +Add two table-driven suites. 
+ +Process lifecycle simulator: + +- PID reused by unrelated process +- PID reused by another Gateway's wrapper root +- stored wrapper command is shell-quoted, live `ps` command is not +- adapter child exits, grandchild remains in the process group +- parent death SIGTERM fallback reaches SIGKILL +- process listing unavailable +- stale lease with missing process +- startup orphan with wrapper, adapter child, and grandchild + +Session visibility matrix: + +- `self`, `tree`, `agent`, `all` +- a2a enabled and disabled +- same-agent row +- cross-agent row +- requester-owned spawned cross-agent ACP row +- sandboxed requester clamped to `tree` +- list, history, send, and status actions + +The important invariant: a requester-owned spawned child is visible wherever +the configured visibility includes the requester session tree, and `all` is not +less capable than `tree`. + +## Compatibility Notes + +Old session records may not have `leaseId`. They should use the legacy +fail-closed cleanup path: + +- require a live root process +- require wrapper-root ownership when a generated wrapper is expected +- require command agreement for non-wrapper roots +- never signal based only on stale stored PID metadata + +If a legacy record cannot be verified, leave it alone. Startup lease cleanup and +the next release window should eventually retire the fallback. + +## Success Criteria + +- Closing an old or stale ACPX session cannot kill another Gateway's process. +- Parent death does not leave stubborn adapter grandchildren running. +- `cancel` aborts the active turn without closing reusable sessions. +- `sessions_list` can show requester-owned cross-agent ACP children under both + `tree` and `all`. +- Startup cleanup is driven by leases, not broad command-string scans. +- The focused process and visibility matrix tests cover every edge case that + previously required one-off review fixes. 
diff --git a/docs/tools/acp-agents.md b/docs/tools/acp-agents.md index 8726b636c06..bec9d0b618d 100644 --- a/docs/tools/acp-agents.md +++ b/docs/tools/acp-agents.md @@ -60,6 +60,7 @@ an unavailable backend. - If `plugins.allow` is set, it is a restrictive plugin inventory and **must** include `acpx`; otherwise the installed ACP backend is intentionally blocked and `/acp doctor` reports the missing allowlist entry. - The Codex ACP adapter is staged with the `acpx` plugin and launched locally when possible. + - Codex ACP runs with an isolated `CODEX_HOME`; OpenClaw copies only trusted project entries from the host Codex config and trusts the active workspace, leaving auth, notifications, and hooks on the host config. - Other target harness adapters may still be fetched on demand with `npx` the first time you use them. - Vendor auth still has to exist on the host for that harness. - If the host has no npm or network access, first-run adapter fetches fail until caches are pre-warmed or the adapter is installed another way. @@ -154,6 +155,7 @@ Quick `/acp` flow from chat: - Gateway commands stay local. `/acp ...`, `/status`, and `/unfocus` are never sent as normal prompt text to a bound ACP harness. - `cancel` aborts the active turn when the backend supports cancellation; it does not delete the binding or session metadata. - `close` ends the ACP session from OpenClaw's point of view and removes the binding. A harness may still keep its own upstream history if it supports resume. + - The acpx plugin cleans up OpenClaw-owned wrapper and adapter process trees after `close`, and reaps stale OpenClaw-owned ACPX orphans during Gateway startup. - Idle runtime workers are eligible for cleanup after `acp.runtime.ttlMinutes`; stored session metadata remains available for `/acp sessions`. @@ -830,7 +832,7 @@ permission modes, see | Missing ACP metadata for bound session | Stale/deleted ACP session metadata. | Recreate with `/acp spawn`, then rebind/focus thread. 
| | `AcpRuntimeError: Permission prompt unavailable in non-interactive mode` | `permissionMode` blocks writes/exec in non-interactive ACP session. | Set `plugins.entries.acpx.config.permissionMode` to `approve-all` and restart gateway. See [Permission configuration](/tools/acp-agents-setup#permission-configuration). | | ACP session fails early with little output | Permission prompts are blocked by `permissionMode`/`nonInteractivePermissions`. | Check gateway logs for `AcpRuntimeError`. For full permissions, set `permissionMode=approve-all`; for graceful degradation, set `nonInteractivePermissions=deny`. | -| ACP session stalls indefinitely after completing work | Harness process finished but ACP session did not report completion. | Monitor with `ps aux \| grep acpx`; kill stale processes manually. | +| ACP session stalls indefinitely after completing work | Harness process finished but ACP session did not report completion. | Update OpenClaw; current acpx cleanup reaps OpenClaw-owned stale wrapper and adapter processes on close and Gateway startup. | | Harness sees `<<>>` | Internal event envelope leaked across the ACP boundary. | Update OpenClaw and rerun the completion flow; external harnesses should receive plain completion prompts only. 
| ## Related diff --git a/extensions/acpx/src/codex-auth-bridge.test.ts b/extensions/acpx/src/codex-auth-bridge.test.ts index 3dfee4a1401..18176d39259 100644 --- a/extensions/acpx/src/codex-auth-bridge.test.ts +++ b/extensions/acpx/src/codex-auth-bridge.test.ts @@ -210,6 +210,34 @@ describe("prepareAcpxCodexAuthConfig", () => { expect(wrapper).toContain("defaultArgs = [installedBinPath]"); }); + it("keeps the orphaned wrapper alive long enough to force-kill the child process group", async () => { + const root = await makeTempDir(); + const stateDir = path.join(root, "state"); + const generated = generatedCodexPaths(stateDir); + const pluginConfig = resolveAcpxPluginConfig({ + rawConfig: {}, + workspaceDir: root, + }); + + await prepareAcpxCodexAuthConfig({ + pluginConfig, + stateDir, + }); + + const wrapper = await fs.readFile(generated.wrapperPath, "utf8"); + expect(wrapper).toContain('killChildTree("SIGTERM")'); + expect(wrapper).toContain('killChildTree("SIGKILL", { force: true })'); + expect(wrapper).toMatch( + /forceKillTimer = setTimeout\(\(\) => \{\s*killChildTree\("SIGKILL", \{ force: true \}\);\s*process\.exit\(1\);/s, + ); + expect(wrapper).toMatch( + /child\.on\("exit", \(code, signal\) => \{\s*if \(parentWatcher\) \{\s*clearInterval\(parentWatcher\);\s*\}\s*if \(orphanCleanupStarted\) \{\s*return;\s*\}/s, + ); + expect(wrapper).not.toMatch( + /forceKillTimer = setTimeout\(\(\) => killChildTree\("SIGKILL"\), 1_500\);\s*forceKillTimer\.unref\?\.\(\);\s*process\.exit\(1\);/s, + ); + }); + it("uses the bundled Claude ACP dependency by default when it is installed", async () => { const root = await makeTempDir(); const stateDir = path.join(root, "state"); @@ -251,9 +279,19 @@ describe("prepareAcpxCodexAuthConfig", () => { resolveInstalledCodexAcpBinPath: async () => installedBinPath, }); - const { stdout } = await execFileAsync(process.execPath, [generated.wrapperPath], { - cwd: root, - }); + const { stdout } = await execFileAsync( + process.execPath, + [ + 
generated.wrapperPath, + "--openclaw-acpx-lease-id", + "lease-1", + "--openclaw-gateway-instance-id", + "gateway-1", + ], + { + cwd: root, + }, + ); const launched = JSON.parse(stdout.trim()) as { argv?: unknown; codexHome?: unknown }; expect(launched.argv).toEqual([]); const expectedCodexHome = await fs.realpath(path.join(stateDir, "acpx", "codex-home")); @@ -326,6 +364,8 @@ describe("prepareAcpxCodexAuthConfig", () => { const isolatedConfig = await fs.readFile(generated.configPath, "utf8"); expect(isolatedConfig).not.toContain("notify"); expect(isolatedConfig).not.toContain("SkyComputerUseClient"); + expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(root))}]`); + expect(isolatedConfig).toContain('trust_level = "trusted"'); const wrapper = await fs.readFile(generated.wrapperPath, "utf8"); expect(wrapper).toContain("CODEX_HOME: codexHome"); expect(wrapper).not.toContain(sourceCodexHome); @@ -337,6 +377,50 @@ describe("prepareAcpxCodexAuthConfig", () => { ).rejects.toMatchObject({ code: "ENOENT" }); }); + it("copies only trusted Codex project declarations into the isolated Codex home", async () => { + const root = await makeTempDir(); + const sourceCodexHome = path.join(root, "source-codex"); + const stateDir = path.join(root, "state"); + const explicitProject = path.join(root, "explicit project"); + const inlineProject = path.join(root, "inline-project"); + const mapProject = path.join(root, "map-project"); + const untrustedProject = path.join(root, "untrusted-project"); + const generated = generatedCodexPaths(stateDir); + await fs.mkdir(sourceCodexHome, { recursive: true }); + await fs.writeFile( + path.join(sourceCodexHome, "config.toml"), + [ + 'notify = ["SkyComputerUseClient", "turn-ended"]', + `projects = { ${JSON.stringify(mapProject)} = { trust_level = "trusted" }, ${JSON.stringify(untrustedProject)} = { trust_level = "untrusted" } }`, + "[projects]", + `${JSON.stringify(inlineProject)} = { trust_level = "trusted" }`, + 
`[projects.${JSON.stringify(explicitProject)}]`, + 'trust_level = "trusted"', + "", + ].join("\n"), + ); + process.env.CODEX_HOME = sourceCodexHome; + const pluginConfig = resolveAcpxPluginConfig({ + rawConfig: {}, + workspaceDir: root, + }); + + await prepareAcpxCodexAuthConfig({ + pluginConfig, + stateDir, + resolveInstalledCodexAcpBinPath: async () => undefined, + }); + + const isolatedConfig = await fs.readFile(generated.configPath, "utf8"); + expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(root))}]`); + expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(explicitProject))}]`); + expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(inlineProject))}]`); + expect(isolatedConfig).toContain(`[projects.${JSON.stringify(path.resolve(mapProject))}]`); + expect(isolatedConfig).not.toContain(untrustedProject); + expect(isolatedConfig).not.toContain("notify"); + expect(isolatedConfig).not.toContain("SkyComputerUseClient"); + }); + it("normalizes an explicitly configured Codex ACP command to the local wrapper", async () => { const root = await makeTempDir(); const sourceCodexHome = path.join(root, "source-codex"); diff --git a/extensions/acpx/src/codex-auth-bridge.ts b/extensions/acpx/src/codex-auth-bridge.ts index 9b322fa3466..4f76e661037 100644 --- a/extensions/acpx/src/codex-auth-bridge.ts +++ b/extensions/acpx/src/codex-auth-bridge.ts @@ -1,10 +1,16 @@ import fsSync from "node:fs"; import fs from "node:fs/promises"; import { createRequire } from "node:module"; +import os from "node:os"; import path from "node:path"; import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store"; +import { + extractTrustedCodexProjectPaths, + renderIsolatedCodexProjectTrustConfig, +} from "./codex-trust-config.js"; import { resolveAcpxPluginRoot } from "./config.js"; import type { ResolvedAcpxPluginConfig } from "./config.js"; +import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from 
"./process-lease.js"; const CODEX_ACP_PACKAGE = "@zed-industries/codex-acp"; const CODEX_ACP_BIN = "codex-acp"; @@ -156,7 +162,25 @@ import { spawn } from "node:child_process"; import { fileURLToPath } from "node:url"; ${params.envSetup} -const configuredArgs = process.argv.slice(2); +const openClawWrapperArgs = new Set([ + ${quoteCommandPart(OPENCLAW_ACPX_LEASE_ID_ARG)}, + ${quoteCommandPart(OPENCLAW_GATEWAY_INSTANCE_ID_ARG)}, +]); + +function stripOpenClawWrapperArgs(args) { + const stripped = []; + for (let index = 0; index < args.length; index += 1) { + const value = args[index]; + if (openClawWrapperArgs.has(value)) { + index += 1; + continue; + } + stripped.push(value); + } + return stripped; +} + +const configuredArgs = stripOpenClawWrapperArgs(process.argv.slice(2)); function resolveNpmCliPath() { const candidate = path.resolve( @@ -198,23 +222,78 @@ if (!command) { } const child = spawn(command, args, { + detached: process.platform !== "win32", env, stdio: "inherit", windowsHide: true, }); +let forceKillTimer; +let orphanCleanupStarted = false; + +function killChildTree(signal, options = {}) { + if (!child.pid || (!options.force && child.killed)) { + return; + } + if (process.platform !== "win32") { + try { + // The adapter can spawn grandchildren; signaling the process group keeps + // the generated wrapper from leaving an ACP tree behind. + process.kill(-child.pid, signal); + return; + } catch { + // Fall back to direct child signaling below. + } + } + child.kill(signal); +} + for (const signal of ["SIGINT", "SIGTERM", "SIGHUP"]) { process.once(signal, () => { - child.kill(signal); + killChildTree(signal); }); } +const originalParentPid = process.ppid; +const parentWatcher = + process.platform === "win32" + ? 
undefined + : setInterval(() => { + if (process.ppid === originalParentPid || process.ppid !== 1) { + return; + } + if (orphanCleanupStarted) { + return; + } + orphanCleanupStarted = true; + if (parentWatcher) { + clearInterval(parentWatcher); + } + killChildTree("SIGTERM"); + // Keep the wrapper alive long enough for stubborn adapters to receive + // a forced fallback signal after SIGTERM. + forceKillTimer = setTimeout(() => { + killChildTree("SIGKILL", { force: true }); + process.exit(1); + }, 1_500); + }, 1_000); +parentWatcher?.unref?.(); + child.on("error", (error) => { console.error(\`[openclaw] failed to launch ${params.displayName} ACP wrapper: \${error.message}\`); process.exit(1); }); child.on("exit", (code, signal) => { + if (parentWatcher) { + clearInterval(parentWatcher); + } + if (orphanCleanupStarted) { + return; + } + if (forceKillTimer) { + clearTimeout(forceKillTimer); + } if (code !== null) { process.exit(code); } @@ -250,12 +329,32 @@ function buildClaudeAcpWrapperScript(installedBinPath?: string): string { }); } -async function prepareIsolatedCodexHome(baseDir: string): Promise { - const codexHome = path.join(baseDir, "codex-home"); +async function readSourceCodexConfig(codexHome: string): Promise { + try { + return await fs.readFile(path.join(codexHome, "config.toml"), "utf8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "ENOENT") { + return undefined; + } + throw error; + } +} + +async function prepareIsolatedCodexHome(params: { + baseDir: string; + workspaceDir: string; +}): Promise { + const sourceCodexHome = process.env.CODEX_HOME || path.join(os.homedir(), ".codex"); + const sourceConfig = await readSourceCodexConfig(sourceCodexHome); + const trustedProjectPaths = [ + ...(sourceConfig ? 
extractTrustedCodexProjectPaths(sourceConfig) : []), + params.workspaceDir, + ]; + const codexHome = path.join(params.baseDir, "codex-home"); await fs.mkdir(codexHome, { recursive: true }); await fs.writeFile( path.join(codexHome, "config.toml"), - "# Generated by OpenClaw for Codex ACP sessions.\n", + renderIsolatedCodexProjectTrustConfig(trustedProjectPaths), "utf8", ); return codexHome; @@ -383,7 +482,10 @@ export async function prepareAcpxCodexAuthConfig(params: { }): Promise { void params.logger; const codexBaseDir = path.join(params.stateDir, "acpx"); - await prepareIsolatedCodexHome(codexBaseDir); + await prepareIsolatedCodexHome({ + baseDir: codexBaseDir, + workspaceDir: params.pluginConfig.cwd, + }); const installedCodexBinPath = await ( params.resolveInstalledCodexAcpBinPath ?? resolveInstalledCodexAcpBinPath )(); diff --git a/extensions/acpx/src/codex-trust-config.ts b/extensions/acpx/src/codex-trust-config.ts new file mode 100644 index 00000000000..386eee640a0 --- /dev/null +++ b/extensions/acpx/src/codex-trust-config.ts @@ -0,0 +1,181 @@ +import path from "node:path"; + +function stripTomlComment(line: string): string { + let quote: "'" | '"' | null = null; + let escaping = false; + for (let index = 0; index < line.length; index += 1) { + const ch = line[index]; + if (escaping) { + escaping = false; + continue; + } + if (quote === '"' && ch === "\\") { + escaping = true; + continue; + } + if (quote) { + if (ch === quote) { + quote = null; + } + continue; + } + if (ch === "'" || ch === '"') { + quote = ch; + continue; + } + if (ch === "#") { + return line.slice(0, index); + } + } + return line; +} + +function parseTomlString(value: string): string | undefined { + const trimmed = value.trim(); + if (trimmed.startsWith('"') && trimmed.endsWith('"')) { + try { + return JSON.parse(trimmed) as string; + } catch { + return undefined; + } + } + if (trimmed.startsWith("'") && trimmed.endsWith("'")) { + return trimmed.slice(1, -1); + } + return undefined; +} + 
+function parseTomlDottedKey(value: string): string[] { + const parts: string[] = []; + let current = ""; + let quote: "'" | '"' | null = null; + let escaping = false; + + for (const ch of value.trim()) { + if (escaping) { + current += ch; + escaping = false; + continue; + } + if (quote === '"' && ch === "\\") { + current += ch; + escaping = true; + continue; + } + if (quote) { + current += ch; + if (ch === quote) { + quote = null; + } + continue; + } + if (ch === "'" || ch === '"') { + quote = ch; + current += ch; + continue; + } + if (ch === ".") { + parts.push(current.trim()); + current = ""; + continue; + } + current += ch; + } + if (current.trim()) { + parts.push(current.trim()); + } + return parts.map((part) => parseTomlString(part) ?? part); +} + +function parseProjectHeader(line: string): string | undefined { + const trimmed = line.trim(); + if (!trimmed.startsWith("[") || !trimmed.endsWith("]") || trimmed.startsWith("[[")) { + return undefined; + } + const parts = parseTomlDottedKey(trimmed.slice(1, -1)); + return parts.length === 2 && parts[0] === "projects" ? parts[1] : undefined; +} + +function parseTrustedInlineProjectEntries(value: string): string[] { + const trusted: string[] = []; + const entryPattern = + /(?"(?:\\.|[^"\\])*"|'[^']*'|[A-Za-z0-9_\-/.~:]+)\s*=\s*\{(?[^{}]*(?:\{[^{}]*\}[^{}]*)*)\}/g; + for (const match of value.matchAll(entryPattern)) { + const key = match.groups?.key; + const body = match.groups?.body; + if (!key || !body || !/\btrust_level\s*=\s*["']trusted["']/.test(body)) { + continue; + } + const projectPath = parseTomlString(key) ?? 
key.trim(); + if (projectPath) { + trusted.push(projectPath); + } + } + return trusted; +} + +export function extractTrustedCodexProjectPaths(configToml: string): string[] { + const trusted = new Set(); + let currentProjectPath: string | undefined; + let inProjectsTable = false; + + for (const rawLine of configToml.split(/\r?\n/)) { + const line = stripTomlComment(rawLine).trim(); + if (!line) { + continue; + } + if (line.startsWith("[")) { + currentProjectPath = parseProjectHeader(line); + inProjectsTable = line === "[projects]"; + continue; + } + + if (currentProjectPath && /^trust_level\s*=\s*["']trusted["']\s*$/.test(line)) { + trusted.add(currentProjectPath); + continue; + } + + const assignment = + /^(?"(?:\\.|[^"\\])*"|'[^']*'|[A-Za-z0-9_\-/.~:]+)\s*=\s*(?.+)$/.exec(line); + if (!assignment?.groups) { + continue; + } + + const key = parseTomlString(assignment.groups.key) ?? assignment.groups.key; + const value = assignment.groups.value.trim(); + if (inProjectsTable && /^\{.*\}$/.test(value)) { + if (/\btrust_level\s*=\s*["']trusted["']/.test(value) && key) { + trusted.add(key); + } + continue; + } + if (key === "projects" || inProjectsTable) { + for (const projectPath of parseTrustedInlineProjectEntries(value)) { + trusted.add(projectPath); + } + } + } + + return Array.from(trusted); +} + +export function renderIsolatedCodexProjectTrustConfig(projectPaths: string[]): string { + const normalized = Array.from( + new Set( + projectPaths + .map((projectPath) => projectPath.trim()) + .filter(Boolean) + .map((projectPath) => path.resolve(projectPath)), + ), + ).toSorted((left, right) => left.localeCompare(right)); + + return [ + "# Generated by OpenClaw for Codex ACP sessions.", + ...normalized.flatMap((projectPath) => [ + "", + `[projects.${JSON.stringify(projectPath)}]`, + 'trust_level = "trusted"', + ]), + "", + ].join("\n"); +} diff --git a/extensions/acpx/src/process-lease.test.ts b/extensions/acpx/src/process-lease.test.ts new file mode 100644 index 
00000000000..e33e8ac2553 --- /dev/null +++ b/extensions/acpx/src/process-lease.test.ts @@ -0,0 +1,36 @@ +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { createAcpxProcessLeaseStore, type AcpxProcessLease } from "./process-lease.js"; + +function makeLease(index: number): AcpxProcessLease { + return { + leaseId: `lease-${index}`, + gatewayInstanceId: "gateway-test", + sessionKey: `agent:codex:acp:${index}`, + wrapperRoot: "/tmp/openclaw/acpx", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + rootPid: 1000 + index, + commandHash: `hash-${index}`, + startedAt: index, + state: "open", + }; +} + +describe("createAcpxProcessLeaseStore", () => { + it("serializes concurrent lease saves without dropping records", async () => { + const stateDir = await mkdtemp(path.join(tmpdir(), "openclaw-acpx-leases-")); + try { + const store = createAcpxProcessLeaseStore({ stateDir }); + await Promise.all(Array.from({ length: 25 }, (_, index) => store.save(makeLease(index)))); + + const leases = await store.listOpen("gateway-test"); + expect(leases.map((lease) => lease.leaseId).toSorted()).toEqual( + Array.from({ length: 25 }, (_, index) => `lease-${index}`).toSorted(), + ); + } finally { + await rm(stateDir, { recursive: true, force: true }); + } + }); +}); diff --git a/extensions/acpx/src/process-lease.ts b/extensions/acpx/src/process-lease.ts new file mode 100644 index 00000000000..bed260e7add --- /dev/null +++ b/extensions/acpx/src/process-lease.ts @@ -0,0 +1,169 @@ +import { randomUUID, createHash } from "node:crypto"; +import fs from "node:fs/promises"; +import path from "node:path"; +import { readJsonFileWithFallback, writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; + +export const OPENCLAW_ACPX_LEASE_ID_ENV = "OPENCLAW_ACPX_LEASE_ID"; +export const OPENCLAW_GATEWAY_INSTANCE_ID_ENV = "OPENCLAW_GATEWAY_INSTANCE_ID"; +export const 
OPENCLAW_ACPX_LEASE_ID_ARG = "--openclaw-acpx-lease-id"; +export const OPENCLAW_GATEWAY_INSTANCE_ID_ARG = "--openclaw-gateway-instance-id"; + +export type AcpxProcessLeaseState = "open" | "closing" | "closed" | "lost"; + +export type AcpxProcessLease = { + leaseId: string; + gatewayInstanceId: string; + sessionKey: string; + wrapperRoot: string; + wrapperPath: string; + rootPid: number; + processGroupId?: number; + commandHash: string; + startedAt: number; + state: AcpxProcessLeaseState; +}; + +export type AcpxProcessLeaseStore = { + load(leaseId: string): Promise; + listOpen(gatewayInstanceId?: string): Promise; + save(lease: AcpxProcessLease): Promise; + markState(leaseId: string, state: AcpxProcessLeaseState): Promise; +}; + +type LeaseFile = { + version: 1; + leases: AcpxProcessLease[]; +}; + +const LEASE_FILE = "process-leases.json"; + +function normalizeLease(value: unknown): AcpxProcessLease | undefined { + if (typeof value !== "object" || value === null) { + return undefined; + } + const record = value as Record; + if ( + typeof record.leaseId !== "string" || + typeof record.gatewayInstanceId !== "string" || + typeof record.sessionKey !== "string" || + typeof record.wrapperRoot !== "string" || + typeof record.wrapperPath !== "string" || + typeof record.rootPid !== "number" || + typeof record.commandHash !== "string" || + typeof record.startedAt !== "number" || + !["open", "closing", "closed", "lost"].includes(String(record.state)) + ) { + return undefined; + } + return { + leaseId: record.leaseId, + gatewayInstanceId: record.gatewayInstanceId, + sessionKey: record.sessionKey, + wrapperRoot: record.wrapperRoot, + wrapperPath: record.wrapperPath, + rootPid: record.rootPid, + ...(typeof record.processGroupId === "number" ? 
{ processGroupId: record.processGroupId } : {}), + commandHash: record.commandHash, + startedAt: record.startedAt, + state: record.state as AcpxProcessLeaseState, + }; +} + +async function readLeaseFile(filePath: string): Promise { + const { value } = await readJsonFileWithFallback>(filePath, { + version: 1, + leases: [], + }); + const leases = Array.isArray(value.leases) + ? value.leases.map(normalizeLease).filter((lease): lease is AcpxProcessLease => !!lease) + : []; + return { version: 1, leases }; +} + +function writeLeaseFile(filePath: string, value: LeaseFile): Promise { + return writeJsonFileAtomically(filePath, value); +} + +export function createAcpxProcessLeaseStore(params: { stateDir: string }): AcpxProcessLeaseStore { + const filePath = path.join(params.stateDir, LEASE_FILE); + let updateQueue: Promise = Promise.resolve(); + + async function update( + mutator: (leases: AcpxProcessLease[]) => AcpxProcessLease[], + ): Promise { + const run = updateQueue.then(async () => { + await fs.mkdir(params.stateDir, { recursive: true }); + const current = await readLeaseFile(filePath); + await writeLeaseFile(filePath, { + version: 1, + leases: mutator(current.leases), + }); + }); + updateQueue = run.catch(() => {}); + await run; + } + + async function readCurrent(): Promise { + await updateQueue; + return await readLeaseFile(filePath); + } + + return { + async load(leaseId) { + const current = await readCurrent(); + return current.leases.find((lease) => lease.leaseId === leaseId); + }, + async listOpen(gatewayInstanceId) { + const current = await readCurrent(); + return current.leases.filter( + (lease) => + (lease.state === "open" || lease.state === "closing") && + (!gatewayInstanceId || lease.gatewayInstanceId === gatewayInstanceId), + ); + }, + async save(lease) { + await update((leases) => [ + ...leases.filter((entry) => entry.leaseId !== lease.leaseId), + lease, + ]); + }, + async markState(leaseId, state) { + await update((leases) => + leases.map((lease) => 
(lease.leaseId === leaseId ? { ...lease, state } : lease)), + ); + }, + }; +} + +export function createAcpxProcessLeaseId(): string { + return randomUUID(); +} + +export function hashAcpxProcessCommand(command: string): string { + return createHash("sha256").update(command).digest("hex"); +} + +function quoteEnvValue(value: string): string { + return /^[A-Za-z0-9_./:=@+-]+$/.test(value) ? value : `'${value.replace(/'/g, "'\\''")}'`; +} + +export function withAcpxLeaseEnvironment(params: { + command: string; + leaseId: string; + gatewayInstanceId: string; + platform?: NodeJS.Platform; +}): string { + if ((params.platform ?? process.platform) === "win32") { + return params.command; + } + return [ + "env", + `${OPENCLAW_ACPX_LEASE_ID_ENV}=${quoteEnvValue(params.leaseId)}`, + `${OPENCLAW_GATEWAY_INSTANCE_ID_ENV}=${quoteEnvValue(params.gatewayInstanceId)}`, + params.command, + OPENCLAW_ACPX_LEASE_ID_ARG, + quoteEnvValue(params.leaseId), + OPENCLAW_GATEWAY_INSTANCE_ID_ARG, + quoteEnvValue(params.gatewayInstanceId), + ].join(" "); +} diff --git a/extensions/acpx/src/process-reaper.test.ts b/extensions/acpx/src/process-reaper.test.ts new file mode 100644 index 00000000000..80e6775e1be --- /dev/null +++ b/extensions/acpx/src/process-reaper.test.ts @@ -0,0 +1,262 @@ +import { describe, expect, it, vi } from "vitest"; +import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from "./process-lease.js"; +import { + cleanupOpenClawOwnedAcpxProcessTree, + isOpenClawOwnedAcpxProcessCommand, + reapStaleOpenClawOwnedAcpxOrphans, + type AcpxProcessInfo, +} from "./process-reaper.js"; + +const WRAPPER_ROOT = "/tmp/openclaw-state/acpx"; +const CODEX_WRAPPER_COMMAND = `node ${WRAPPER_ROOT}/codex-acp-wrapper.mjs`; +const CODEX_WRAPPER_COMMAND_WITH_LEASE = `${CODEX_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-1 ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-1`; +const CLAUDE_WRAPPER_COMMAND = `node ${WRAPPER_ROOT}/claude-agent-acp-wrapper.mjs`; +const 
PLUGIN_DEPS_CODEX_COMMAND = + "node /tmp/openclaw/plugin-runtime-deps/node_modules/@zed-industries/codex-acp/bin/codex-acp.js"; + +function cleanupDeps(processes: AcpxProcessInfo[]) { + const killed: Array<{ pid: number; signal: NodeJS.Signals }> = []; + return { + killed, + deps: { + listProcesses: vi.fn(async () => processes), + killProcess: vi.fn((pid: number, signal: NodeJS.Signals) => { + killed.push({ pid, signal }); + }), + sleep: vi.fn(async () => {}), + }, + }; +} + +describe("process reaper", () => { + it("recognizes generated Codex and Claude wrappers only under the configured root", () => { + expect( + isOpenClawOwnedAcpxProcessCommand({ + command: CODEX_WRAPPER_COMMAND, + wrapperRoot: WRAPPER_ROOT, + }), + ).toBe(true); + expect( + isOpenClawOwnedAcpxProcessCommand({ + command: CLAUDE_WRAPPER_COMMAND, + wrapperRoot: WRAPPER_ROOT, + }), + ).toBe(true); + expect( + isOpenClawOwnedAcpxProcessCommand({ + command: "node /tmp/other/codex-acp-wrapper.mjs", + wrapperRoot: WRAPPER_ROOT, + }), + ).toBe(false); + }); + + it("recognizes OpenClaw plugin-runtime-deps ACP adapter children", () => { + expect(isOpenClawOwnedAcpxProcessCommand({ command: PLUGIN_DEPS_CODEX_COMMAND })).toBe(true); + expect(isOpenClawOwnedAcpxProcessCommand({ command: "npx @zed-industries/codex-acp" })).toBe( + false, + ); + }); + + it("kills an owned recorded process tree children first", async () => { + const { deps, killed } = cleanupDeps([ + { pid: 100, ppid: 1, command: CODEX_WRAPPER_COMMAND }, + { pid: 101, ppid: 100, command: PLUGIN_DEPS_CODEX_COMMAND }, + { pid: 102, ppid: 101, command: "node child.js" }, + ]); + + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 100, + rootCommand: CODEX_WRAPPER_COMMAND, + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result.skippedReason).toBeUndefined(); + expect(result.inspectedPids).toEqual([100, 101, 102]); + expect(killed.slice(0, 3)).toEqual([ + { pid: 102, signal: "SIGTERM" }, + { pid: 101, signal: "SIGTERM" }, + 
{ pid: 100, signal: "SIGTERM" }, + ]); + }); + + it("allows wrapper-root verification when stored wrapper commands are shell-quoted", async () => { + const { deps, killed } = cleanupDeps([{ pid: 110, ppid: 1, command: CODEX_WRAPPER_COMMAND }]); + + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 110, + rootCommand: `"/usr/local/bin/node" "${WRAPPER_ROOT}/codex-acp-wrapper.mjs"`, + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result.skippedReason).toBeUndefined(); + expect(killed[0]).toEqual({ pid: 110, signal: "SIGTERM" }); + }); + + it("requires matching lease identity before killing a leased process tree", async () => { + const { deps, killed } = cleanupDeps([ + { pid: 112, ppid: 1, command: CODEX_WRAPPER_COMMAND_WITH_LEASE }, + ]); + + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 112, + rootCommand: CODEX_WRAPPER_COMMAND, + expectedLeaseId: "lease-1", + expectedGatewayInstanceId: "gateway-1", + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result.skippedReason).toBeUndefined(); + expect(killed[0]).toEqual({ pid: 112, signal: "SIGTERM" }); + }); + + it("does not kill a reused same-root wrapper pid with a different lease identity", async () => { + const { deps, killed } = cleanupDeps([ + { + pid: 113, + ppid: 1, + command: `${CODEX_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} other-lease ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-1`, + }, + ]); + + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 113, + rootCommand: CODEX_WRAPPER_COMMAND, + expectedLeaseId: "lease-1", + expectedGatewayInstanceId: "gateway-1", + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result).toEqual({ + inspectedPids: [113], + terminatedPids: [], + skippedReason: "not-openclaw-owned", + }); + expect(killed).toEqual([]); + }); + + it("skips recorded pid cleanup when process listing is unavailable", async () => { + const killed: Array<{ pid: number; signal: NodeJS.Signals }> = []; + const result = await 
cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 200, + rootCommand: CODEX_WRAPPER_COMMAND, + wrapperRoot: WRAPPER_ROOT, + deps: { + listProcesses: vi.fn(async () => { + throw new Error("ps unavailable"); + }), + killProcess: vi.fn((pid, signal) => { + killed.push({ pid, signal }); + }), + sleep: vi.fn(async () => {}), + }, + }); + + expect(result).toEqual({ + inspectedPids: [], + terminatedPids: [], + skippedReason: "unverified-root", + }); + expect(killed).toEqual([]); + }); + + it("does not kill a reused pid when the live command is not OpenClaw-owned", async () => { + const { deps, killed } = cleanupDeps([{ pid: 250, ppid: 1, command: "node unrelated.js" }]); + + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 250, + rootCommand: CODEX_WRAPPER_COMMAND, + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result).toEqual({ + inspectedPids: [250], + terminatedPids: [], + skippedReason: "not-openclaw-owned", + }); + expect(killed).toEqual([]); + }); + + it("does not kill a reused adapter pid when the stored root was a generated wrapper", async () => { + const { deps, killed } = cleanupDeps([ + { + pid: 260, + ppid: 1, + command: PLUGIN_DEPS_CODEX_COMMAND, + }, + ]); + + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 260, + rootCommand: CODEX_WRAPPER_COMMAND, + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result).toEqual({ + inspectedPids: [260], + terminatedPids: [], + skippedReason: "not-openclaw-owned", + }); + expect(killed).toEqual([]); + }); + + it("skips non-owned recorded process trees", async () => { + const { deps, killed } = cleanupDeps([{ pid: 300, ppid: 1, command: "node server.js" }]); + + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: 300, + rootCommand: "node server.js", + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result.skippedReason).toBe("not-openclaw-owned"); + expect(killed).toEqual([]); + }); + + it("reaps stale OpenClaw-owned wrapper and adapter orphans on 
startup", async () => { + const { deps, killed } = cleanupDeps([ + { pid: 400, ppid: 1, command: CODEX_WRAPPER_COMMAND }, + { pid: 401, ppid: 400, command: PLUGIN_DEPS_CODEX_COMMAND }, + { pid: 402, ppid: 401, command: "node child.js" }, + { pid: 403, ppid: 1, command: CLAUDE_WRAPPER_COMMAND }, + { pid: 404, ppid: 403, command: "node claude-child.js" }, + { pid: 405, ppid: 1, command: PLUGIN_DEPS_CODEX_COMMAND }, + { pid: 406, ppid: 1, command: "node /tmp/other/codex-acp-wrapper.mjs" }, + ]); + + const result = await reapStaleOpenClawOwnedAcpxOrphans({ + wrapperRoot: WRAPPER_ROOT, + deps, + }); + + expect(result.skippedReason).toBeUndefined(); + expect(result.inspectedPids).toEqual([400, 401, 402, 403, 404, 405]); + expect(killed.filter((entry) => entry.signal === "SIGTERM").map((entry) => entry.pid)).toEqual([ + 402, 401, 400, 404, 403, 405, + ]); + }); + + it("keeps startup scans quiet when process listing is unavailable", async () => { + const result = await reapStaleOpenClawOwnedAcpxOrphans({ + wrapperRoot: WRAPPER_ROOT, + deps: { + listProcesses: vi.fn(async () => { + throw new Error("ps unavailable"); + }), + sleep: vi.fn(async () => {}), + }, + }); + + expect(result).toEqual({ + inspectedPids: [], + terminatedPids: [], + skippedReason: "process-list-unavailable", + }); + }); +}); diff --git a/extensions/acpx/src/process-reaper.ts b/extensions/acpx/src/process-reaper.ts new file mode 100644 index 00000000000..f7c3b7aad7f --- /dev/null +++ b/extensions/acpx/src/process-reaper.ts @@ -0,0 +1,381 @@ +import { execFile } from "node:child_process"; +import { promisify } from "node:util"; +import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from "./process-lease.js"; + +const execFileAsync = promisify(execFile); +const GENERATED_WRAPPER_BASENAMES = new Set([ + "codex-acp-wrapper.mjs", + "claude-agent-acp-wrapper.mjs", +]); +const OPENCLAW_PLUGIN_DEPS_MARKER = "/plugin-runtime-deps/"; +const ACP_PACKAGE_MARKERS = [ + "/@zed-industries/codex-acp/", 
+ "/@agentclientprotocol/claude-agent-acp/", + "/acpx/dist/", +]; + +export type AcpxProcessInfo = { + pid: number; + ppid: number; + command: string; +}; + +export type AcpxProcessCleanupDeps = { + listProcesses?: () => Promise; + killProcess?: (pid: number, signal: NodeJS.Signals) => void; + sleep?: (ms: number) => Promise; +}; + +export type AcpxProcessCleanupResult = { + inspectedPids: number[]; + terminatedPids: number[]; + skippedReason?: "missing-root" | "not-openclaw-owned" | "unverified-root"; +}; + +export type AcpxStartupReapResult = { + inspectedPids: number[]; + terminatedPids: number[]; + skippedReason?: "unsupported-platform" | "process-list-unavailable"; +}; + +function normalizePathLike(value: string): string { + return value.replaceAll("\\", "/"); +} + +function commandMentionsGeneratedWrapper(command: string): boolean { + return Array.from(GENERATED_WRAPPER_BASENAMES).some((basename) => command.includes(basename)); +} + +function commandWrapperBelongsToRoot(command: string, wrapperRoot: string | undefined): boolean { + if (!wrapperRoot) { + return true; + } + const normalizedCommand = normalizePathLike(command); + const normalizedRoot = normalizePathLike(wrapperRoot).replace(/\/+$/, ""); + return Array.from(GENERATED_WRAPPER_BASENAMES).some((basename) => + normalizedCommand.includes(`${normalizedRoot}/${basename}`), + ); +} + +function commandsReferToSameRootCommand(liveCommand: string, storedCommand: string | undefined) { + if (!storedCommand?.trim()) { + return true; + } + return normalizePathLike(liveCommand).trim() === normalizePathLike(storedCommand).trim(); +} + +function splitCommandParts(value: string): string[] { + const parts: string[] = []; + let current = ""; + let quote: "'" | '"' | null = null; + let escaping = false; + + for (const ch of value) { + if (escaping) { + current += ch; + escaping = false; + continue; + } + if (ch === "\\" && quote !== "'") { + escaping = true; + continue; + } + if (quote) { + if (ch === quote) { + quote 
= null; + } else { + current += ch; + } + continue; + } + if (ch === "'" || ch === '"') { + quote = ch; + continue; + } + if (/\s/.test(ch)) { + if (current) { + parts.push(current); + current = ""; + } + continue; + } + current += ch; + } + + if (escaping) { + current += "\\"; + } + if (current) { + parts.push(current); + } + return parts; +} + +function commandOptionEquals( + parts: string[], + option: string, + expected: string | undefined, +): boolean { + if (!expected) { + return true; + } + const index = parts.indexOf(option); + return index >= 0 && parts[index + 1] === expected; +} + +function liveCommandMatchesLeaseIdentity(params: { + command: string | undefined; + expectedLeaseId?: string; + expectedGatewayInstanceId?: string; +}): boolean { + if (!params.expectedLeaseId && !params.expectedGatewayInstanceId) { + return true; + } + const parts = splitCommandParts(params.command ?? ""); + return ( + commandOptionEquals(parts, OPENCLAW_ACPX_LEASE_ID_ARG, params.expectedLeaseId) && + commandOptionEquals(parts, OPENCLAW_GATEWAY_INSTANCE_ID_ARG, params.expectedGatewayInstanceId) + ); +} + +export function isOpenClawOwnedAcpxProcessCommand(params: { + command: string | undefined; + wrapperRoot?: string; +}): boolean { + const command = params.command?.trim(); + if (!command) { + return false; + } + const normalized = normalizePathLike(command); + if (commandMentionsGeneratedWrapper(normalized)) { + return commandWrapperBelongsToRoot(normalized, params.wrapperRoot); + } + if (!normalized.includes(OPENCLAW_PLUGIN_DEPS_MARKER)) { + return false; + } + return ACP_PACKAGE_MARKERS.some((marker) => normalized.includes(marker)); +} + +function parseProcessList(stdout: string): AcpxProcessInfo[] { + const processes: AcpxProcessInfo[] = []; + for (const line of stdout.split(/\r?\n/)) { + const match = /^\s*(?\d+)\s+(?\d+)\s+(?.+?)\s*$/.exec(line); + if (!match?.groups) { + continue; + } + processes.push({ + pid: Number.parseInt(match.groups.pid, 10), + ppid: 
Number.parseInt(match.groups.ppid, 10), + command: match.groups.command, + }); + } + return processes; +} + +export async function listPlatformProcesses(): Promise { + if (process.platform === "win32") { + return []; + } + const { stdout } = await execFileAsync("ps", ["-axo", "pid=,ppid=,command="], { + maxBuffer: 8 * 1024 * 1024, + }); + return parseProcessList(stdout); +} + +function collectProcessTree(processes: AcpxProcessInfo[], rootPid: number): AcpxProcessInfo[] { + const childrenByParent = new Map(); + for (const processInfo of processes) { + const children = childrenByParent.get(processInfo.ppid) ?? []; + children.push(processInfo); + childrenByParent.set(processInfo.ppid, children); + } + + const byPid = new Map(processes.map((processInfo) => [processInfo.pid, processInfo])); + const root = byPid.get(rootPid); + const collected: AcpxProcessInfo[] = []; + if (root) { + collected.push(root); + } + + const queue = [...(childrenByParent.get(rootPid) ?? [])]; + while (queue.length > 0) { + const next = queue.shift(); + if (!next || collected.some((processInfo) => processInfo.pid === next.pid)) { + continue; + } + collected.push(next); + queue.push(...(childrenByParent.get(next.pid) ?? [])); + } + + return collected; +} + +function uniquePids(processes: AcpxProcessInfo[]): number[] { + return Array.from( + new Set( + processes + .map((processInfo) => processInfo.pid) + .filter((pid) => Number.isInteger(pid) && pid > 0 && pid !== process.pid), + ), + ); +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +async function terminatePids( + pids: number[], + deps: AcpxProcessCleanupDeps | undefined, +): Promise { + const killProcess = deps?.killProcess ?? ((pid, signal) => process.kill(pid, signal)); + const sleep = deps?.sleep ?? 
((ms) => new Promise((resolve) => setTimeout(resolve, ms))); + const terminated: number[] = []; + + for (const pid of pids) { + try { + killProcess(pid, "SIGTERM"); + terminated.push(pid); + } catch { + // The process may already be gone. + } + } + if (terminated.length === 0) { + return terminated; + } + await sleep(750); + for (const pid of terminated) { + if (deps?.killProcess || isProcessAlive(pid)) { + try { + killProcess(pid, "SIGKILL"); + } catch { + // Best-effort cleanup only. + } + } + } + return terminated; +} + +export async function cleanupOpenClawOwnedAcpxProcessTree(params: { + rootPid?: number; + rootCommand?: string; + expectedLeaseId?: string; + expectedGatewayInstanceId?: string; + wrapperRoot?: string; + deps?: AcpxProcessCleanupDeps; +}): Promise { + const rootPid = params.rootPid; + if (!rootPid || rootPid <= 0 || rootPid === process.pid) { + return { inspectedPids: [], terminatedPids: [], skippedReason: "missing-root" }; + } + + let processes: AcpxProcessInfo[] = []; + try { + processes = await (params.deps?.listProcesses ?? listPlatformProcesses)(); + } catch { + processes = []; + } + + const listedTree = collectProcessTree(processes, rootPid); + // Session-store PIDs are stale data. If the live process table cannot prove + // that this PID still belongs to an OpenClaw-owned wrapper, fail closed to + // avoid killing an unrelated process after PID reuse. + if (listedTree.length === 0) { + return { inspectedPids: [], terminatedPids: [], skippedReason: "unverified-root" }; + } + const rootCommand = listedTree[0]?.command ?? params.rootCommand; + const liveCommandWasGeneratedWrapper = commandMentionsGeneratedWrapper( + normalizePathLike(rootCommand ?? ""), + ); + const storedCommandWasGeneratedWrapper = commandMentionsGeneratedWrapper( + normalizePathLike(params.rootCommand ?? 
""), + ); + if (!liveCommandWasGeneratedWrapper && storedCommandWasGeneratedWrapper) { + return { + inspectedPids: listedTree.map((processInfo) => processInfo.pid), + terminatedPids: [], + skippedReason: "not-openclaw-owned", + }; + } + if ( + !liveCommandWasGeneratedWrapper && + !commandsReferToSameRootCommand(rootCommand ?? "", params.rootCommand) + ) { + return { + inspectedPids: listedTree.map((processInfo) => processInfo.pid), + terminatedPids: [], + skippedReason: "not-openclaw-owned", + }; + } + if ( + !isOpenClawOwnedAcpxProcessCommand({ + command: rootCommand, + wrapperRoot: params.wrapperRoot, + }) + ) { + return { + inspectedPids: listedTree.map((processInfo) => processInfo.pid), + terminatedPids: [], + skippedReason: "not-openclaw-owned", + }; + } + if ( + !liveCommandMatchesLeaseIdentity({ + command: rootCommand, + expectedLeaseId: params.expectedLeaseId, + expectedGatewayInstanceId: params.expectedGatewayInstanceId, + }) + ) { + return { + inspectedPids: listedTree.map((processInfo) => processInfo.pid), + terminatedPids: [], + skippedReason: "not-openclaw-owned", + }; + } + + const pids = uniquePids(listedTree.toReversed()); + return { + inspectedPids: uniquePids(listedTree), + terminatedPids: await terminatePids(pids, params.deps), + }; +} + +export async function reapStaleOpenClawOwnedAcpxOrphans(params: { + wrapperRoot: string; + deps?: AcpxProcessCleanupDeps; +}): Promise { + if (process.platform === "win32") { + return { inspectedPids: [], terminatedPids: [], skippedReason: "unsupported-platform" }; + } + + let processes: AcpxProcessInfo[]; + try { + processes = await (params.deps?.listProcesses ?? 
listPlatformProcesses)(); + } catch { + return { inspectedPids: [], terminatedPids: [], skippedReason: "process-list-unavailable" }; + } + + const orphans = processes.filter( + (processInfo) => + processInfo.ppid === 1 && + isOpenClawOwnedAcpxProcessCommand({ + command: processInfo.command, + wrapperRoot: params.wrapperRoot, + }), + ); + // Startup reaping starts from currently visible orphan roots and then expands + // each tree, so adapter grandchildren do not survive as fresh orphans after + // the wrapper root exits. + const orphanTrees = orphans.map((orphan) => collectProcessTree(processes, orphan.pid)); + const inspectedPids = uniquePids(orphanTrees.flat()); + const pids = uniquePids(orphanTrees.flatMap((tree) => tree.toReversed())); + return { + inspectedPids, + terminatedPids: await terminatePids(pids, params.deps), + }; +} diff --git a/extensions/acpx/src/runtime.test.ts b/extensions/acpx/src/runtime.test.ts index ca7e1ccda40..ec5e37d9429 100644 --- a/extensions/acpx/src/runtime.test.ts +++ b/extensions/acpx/src/runtime.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { AcpRuntimeError, type AcpRuntime } from "../runtime-api.js"; +import { OPENCLAW_ACPX_LEASE_ID_ARG, OPENCLAW_GATEWAY_INSTANCE_ID_ARG } from "./process-lease.js"; import { AcpxRuntime, __testing } from "./runtime.js"; type TestSessionStore = { @@ -11,14 +12,17 @@ const DOCUMENTED_OPENCLAW_BRIDGE_COMMAND = "env OPENCLAW_HIDE_BANNER=1 OPENCLAW_SUPPRESS_NOTES=1 openclaw acp --url ws://127.0.0.1:18789 --token-file ~/.openclaw/gateway.token --session agent:main:main"; const CODEX_ACP_COMMAND = "npx @zed-industries/codex-acp@0.13.0"; const CODEX_ACP_WRAPPER_COMMAND = `node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"`; +const CODEX_ACP_WRAPPER_COMMAND_WITH_LEASE = `${CODEX_ACP_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-close ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-test`; function makeRuntime( baseStore: TestSessionStore, options: Partial[0]> 
= {}, + testOptions?: ConstructorParameters[1], ): { runtime: AcpxRuntime; wrappedStore: TestSessionStore & { markFresh: (sessionKey: string) => void }; delegate: { + cancel: AcpRuntime["cancel"]; close: AcpRuntime["close"]; ensureSession: AcpRuntime["ensureSession"]; getStatus: NonNullable; @@ -35,16 +39,19 @@ function makeRuntime( probeAvailability(): Promise; }; } { - const runtime = new AcpxRuntime({ - cwd: "/tmp", - sessionStore: baseStore, - agentRegistry: { - resolve: (agentName: string) => (agentName === "openclaw" ? "openclaw acp" : agentName), - list: () => ["codex", "openclaw"], + const runtime = new AcpxRuntime( + { + cwd: "/tmp", + sessionStore: baseStore, + agentRegistry: { + resolve: (agentName: string) => (agentName === "openclaw" ? "openclaw acp" : agentName), + list: () => ["codex", "openclaw"], + }, + permissionMode: "approve-reads", + ...options, }, - permissionMode: "approve-reads", - ...options, - }); + testOptions, + ); return { runtime, @@ -56,6 +63,7 @@ function makeRuntime( delegate: ( runtime as unknown as { delegate: { + cancel: AcpRuntime["cancel"]; close: AcpRuntime["close"]; ensureSession: AcpRuntime["ensureSession"]; getStatus: NonNullable; @@ -80,6 +88,26 @@ function makeRuntime( }; } +function makeLeaseStore() { + const leases = new Map>(); + return { + leases, + store: { + load: vi.fn(async (leaseId: string) => leases.get(leaseId) as never), + listOpen: vi.fn(async () => Array.from(leases.values()) as never), + save: vi.fn(async (lease: Record) => { + leases.set(String(lease.leaseId), lease); + }), + markState: vi.fn(async (leaseId: string, state: string) => { + const lease = leases.get(leaseId); + if (lease) { + lease.state = state; + } + }), + }, + }; +} + describe("AcpxRuntime fresh reset wrapper", () => { beforeEach(() => { vi.restoreAllMocks(); @@ -517,6 +545,483 @@ describe("AcpxRuntime fresh reset wrapper", () => { expect(baseStore.load).toHaveBeenCalledOnce(); }); + it("cleans up OpenClaw-owned ACPX process trees after 
close", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + acpxRecordId: "agent:codex:acp:binding:test", + agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + pid: 900, + })), + save: vi.fn(async () => {}), + }; + const killed: Array<{ pid: number; signal: NodeJS.Signals }> = []; + const { runtime, delegate } = makeRuntime( + baseStore, + { + openclawWrapperRoot: "/tmp/openclaw/acpx", + }, + { + openclawProcessCleanup: { + listProcesses: vi.fn(async () => [ + { + pid: 900, + ppid: 1, + command: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + }, + { + pid: 901, + ppid: 900, + command: + "node /tmp/openclaw/plugin-runtime-deps/node_modules/@zed-industries/codex-acp/bin/codex-acp.js", + }, + ]), + killProcess: vi.fn((pid, signal) => { + killed.push({ pid, signal }); + }), + sleep: vi.fn(async () => {}), + }, + }, + ); + vi.spyOn(delegate, "close").mockResolvedValue(undefined); + + await runtime.close({ + handle: { + sessionKey: "agent:codex:acp:binding:test", + backend: "acpx", + runtimeSessionName: "agent:codex:acp:binding:test", + }, + reason: "user-close", + }); + + expect(killed.slice(0, 2)).toEqual([ + { pid: 901, signal: "SIGTERM" }, + { pid: 900, signal: "SIGTERM" }, + ]); + }); + + it("records ACPX process leases without persisting lease-specific agent commands", async () => { + const savedRecords: Record[] = []; + const launchCommands: string[] = []; + const baseStore: TestSessionStore = { + load: vi.fn(async () => undefined), + save: vi.fn(async (record) => { + savedRecords.push(record); + }), + }; + const leaseStore = makeLeaseStore(); + const { runtime, delegate, wrappedStore } = makeRuntime(baseStore, { + openclawGatewayInstanceId: "gateway-test", + openclawProcessLeaseStore: leaseStore.store, + openclawWrapperRoot: "/tmp/openclaw/acpx", + agentRegistry: { + resolve: (agentName: string) => + agentName === "codex" ? 
CODEX_ACP_WRAPPER_COMMAND : agentName, + list: () => ["codex"], + }, + }); + vi.spyOn(delegate, "ensureSession").mockImplementation(async (input) => { + const command = ( + runtime as unknown as { scopedAgentRegistry: { resolve(agent: string): string } } + ).scopedAgentRegistry.resolve("codex"); + launchCommands.push(command); + await wrappedStore.save({ + name: input.sessionKey, + agentCommand: command, + pid: 777, + }); + return { + sessionKey: input.sessionKey, + backend: "acpx", + runtimeSessionName: input.sessionKey, + }; + }); + + await runtime.ensureSession({ + sessionKey: "agent:codex:acp:binding:test", + agent: "codex", + mode: "persistent", + }); + + expect(leaseStore.store.save).toHaveBeenCalledTimes(2); + const leases = Array.from(leaseStore.leases.values()); + expect(leases).toHaveLength(1); + expect(leases[0]).toMatchObject({ + gatewayInstanceId: "gateway-test", + sessionKey: "agent:codex:acp:binding:test", + rootPid: 777, + state: "open", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + }); + expect(launchCommands[0]).toContain("OPENCLAW_ACPX_LEASE_ID="); + expect(launchCommands[0]).toContain("OPENCLAW_GATEWAY_INSTANCE_ID=gateway-test"); + expect(savedRecords[0]?.agentCommand).toBe(CODEX_ACP_WRAPPER_COMMAND); + expect(savedRecords[0]).toMatchObject({ + openclawGatewayInstanceId: "gateway-test", + openclawLeaseId: leases[0]?.leaseId, + }); + }); + + it("keeps reusable persistent ACP launch commands stable across ensures", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + name: "agent:codex:acp:binding:test", + acpxRecordId: "record-1", + acpSessionId: "session-1", + agentCommand: CODEX_ACP_WRAPPER_COMMAND, + cwd: "/tmp", + closed: false, + })), + save: vi.fn(async () => {}), + }; + const leaseStore = makeLeaseStore(); + const { runtime, delegate } = makeRuntime(baseStore, { + openclawGatewayInstanceId: "gateway-test", + openclawProcessLeaseStore: leaseStore.store, + openclawWrapperRoot: 
"/tmp/openclaw/acpx", + agentRegistry: { + resolve: (agentName: string) => + agentName === "codex" ? CODEX_ACP_WRAPPER_COMMAND : agentName, + list: () => ["codex"], + }, + }); + const resolvedCommands: string[] = []; + vi.spyOn(delegate, "ensureSession").mockImplementation(async (input) => { + resolvedCommands.push( + ( + runtime as unknown as { scopedAgentRegistry: { resolve(agent: string): string } } + ).scopedAgentRegistry.resolve("codex"), + ); + return { + sessionKey: input.sessionKey, + backend: "acpx", + runtimeSessionName: input.sessionKey, + }; + }); + + await runtime.ensureSession({ + sessionKey: "agent:codex:acp:binding:test", + agent: "codex", + mode: "persistent", + }); + + expect(resolvedCommands).toEqual([CODEX_ACP_WRAPPER_COMMAND]); + expect(leaseStore.store.save).not.toHaveBeenCalled(); + }); + + it("merges sidecar lease ids into loaded ACPX session records", async () => { + const leaseStore = makeLeaseStore(); + leaseStore.leases.set("lease-loaded", { + leaseId: "lease-loaded", + gatewayInstanceId: "gateway-test", + sessionKey: "agent:codex:acp:binding:test", + wrapperRoot: "/tmp/openclaw/acpx", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + rootPid: 777, + commandHash: "hash", + startedAt: 1, + state: "open", + }); + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + name: "agent:codex:acp:binding:test", + agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + pid: 777, + })), + save: vi.fn(async () => {}), + }; + const { wrappedStore } = makeRuntime(baseStore, { + openclawGatewayInstanceId: "gateway-test", + openclawProcessLeaseStore: leaseStore.store, + openclawWrapperRoot: "/tmp/openclaw/acpx", + }); + + await expect(wrappedStore.load("agent:codex:acp:binding:test")).resolves.toMatchObject({ + openclawGatewayInstanceId: "gateway-test", + openclawLeaseId: "lease-loaded", + }); + }); + + it("merges the lease for the current ACPX session process when old leases exist", async () => { + const leaseStore = 
makeLeaseStore(); + leaseStore.leases.set("lease-old", { + leaseId: "lease-old", + gatewayInstanceId: "gateway-test", + sessionKey: "agent:codex:acp:binding:test", + wrapperRoot: "/tmp/openclaw/acpx", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + rootPid: 700, + commandHash: "hash", + startedAt: 1, + state: "open", + }); + leaseStore.leases.set("lease-current", { + leaseId: "lease-current", + gatewayInstanceId: "gateway-test", + sessionKey: "agent:codex:acp:binding:test", + wrapperRoot: "/tmp/openclaw/acpx", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + rootPid: 777, + commandHash: "hash", + startedAt: 2, + state: "open", + }); + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + name: "agent:codex:acp:binding:test", + agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + pid: 777, + })), + save: vi.fn(async () => {}), + }; + const { wrappedStore } = makeRuntime(baseStore, { + openclawGatewayInstanceId: "gateway-test", + openclawProcessLeaseStore: leaseStore.store, + openclawWrapperRoot: "/tmp/openclaw/acpx", + }); + + await expect(wrappedStore.load("agent:codex:acp:binding:test")).resolves.toMatchObject({ + openclawGatewayInstanceId: "gateway-test", + openclawLeaseId: "lease-current", + }); + }); + + it("uses matching leases before legacy pid cleanup on close", async () => { + const leaseStore = makeLeaseStore(); + leaseStore.leases.set("lease-close", { + leaseId: "lease-close", + gatewayInstanceId: "gateway-test", + sessionKey: "agent:codex:acp:binding:test", + wrapperRoot: "/tmp/openclaw/acpx", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + rootPid: 930, + commandHash: "hash", + startedAt: 1, + state: "open", + }); + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + acpxRecordId: "agent:codex:acp:binding:test", + agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + openclawLeaseId: "lease-close", + pid: 930, + })), + save: vi.fn(async () => {}), + }; + 
const killed: Array<{ pid: number; signal: NodeJS.Signals }> = []; + const { runtime, delegate } = makeRuntime( + baseStore, + { + openclawGatewayInstanceId: "gateway-test", + openclawProcessLeaseStore: leaseStore.store, + openclawWrapperRoot: "/tmp/openclaw/acpx", + }, + { + openclawProcessCleanup: { + listProcesses: vi.fn(async () => [ + { + pid: 930, + ppid: 1, + command: CODEX_ACP_WRAPPER_COMMAND_WITH_LEASE, + }, + { pid: 931, ppid: 930, command: "node child.js" }, + ]), + killProcess: vi.fn((pid, signal) => { + killed.push({ pid, signal }); + }), + sleep: vi.fn(async () => {}), + }, + }, + ); + vi.spyOn(delegate, "close").mockResolvedValue(undefined); + + await runtime.close({ + handle: { + sessionKey: "agent:codex:acp:binding:test", + backend: "acpx", + runtimeSessionName: "agent:codex:acp:binding:test", + }, + reason: "user-close", + }); + + expect(killed.slice(0, 2)).toEqual([ + { pid: 931, signal: "SIGTERM" }, + { pid: 930, signal: "SIGTERM" }, + ]); + expect(leaseStore.store.markState).toHaveBeenCalledWith("lease-close", "closing"); + expect(leaseStore.store.markState).toHaveBeenLastCalledWith("lease-close", "closed"); + }); + + it("closes the current process lease when the saved lease id is stale", async () => { + const leaseStore = makeLeaseStore(); + leaseStore.leases.set("lease-old", { + leaseId: "lease-old", + gatewayInstanceId: "gateway-test", + sessionKey: "agent:codex:acp:binding:test", + wrapperRoot: "/tmp/openclaw/acpx", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + rootPid: 930, + commandHash: "hash", + startedAt: 1, + state: "open", + }); + leaseStore.leases.set("lease-current", { + leaseId: "lease-current", + gatewayInstanceId: "gateway-test", + sessionKey: "agent:codex:acp:binding:test", + wrapperRoot: "/tmp/openclaw/acpx", + wrapperPath: "/tmp/openclaw/acpx/codex-acp-wrapper.mjs", + rootPid: 940, + commandHash: "hash", + startedAt: 2, + state: "open", + }); + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ 
+ acpxRecordId: "agent:codex:acp:binding:test", + agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + openclawLeaseId: "lease-old", + pid: 940, + })), + save: vi.fn(async () => {}), + }; + const killed: Array<{ pid: number; signal: NodeJS.Signals }> = []; + const { runtime, delegate } = makeRuntime( + baseStore, + { + openclawGatewayInstanceId: "gateway-test", + openclawProcessLeaseStore: leaseStore.store, + openclawWrapperRoot: "/tmp/openclaw/acpx", + }, + { + openclawProcessCleanup: { + listProcesses: vi.fn(async () => [ + { + pid: 930, + ppid: 1, + command: `${CODEX_ACP_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-old ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-test`, + }, + { + pid: 940, + ppid: 1, + command: `${CODEX_ACP_WRAPPER_COMMAND} ${OPENCLAW_ACPX_LEASE_ID_ARG} lease-current ${OPENCLAW_GATEWAY_INSTANCE_ID_ARG} gateway-test`, + }, + { pid: 941, ppid: 940, command: "node child.js" }, + ]), + killProcess: vi.fn((pid, signal) => { + killed.push({ pid, signal }); + }), + sleep: vi.fn(async () => {}), + }, + }, + ); + vi.spyOn(delegate, "close").mockResolvedValue(undefined); + + await runtime.close({ + handle: { + sessionKey: "agent:codex:acp:binding:test", + backend: "acpx", + runtimeSessionName: "agent:codex:acp:binding:test", + }, + reason: "user-close", + }); + + expect(killed.slice(0, 2)).toEqual([ + { pid: 941, signal: "SIGTERM" }, + { pid: 940, signal: "SIGTERM" }, + ]); + expect(leaseStore.store.markState).not.toHaveBeenCalledWith("lease-old", expect.any(String)); + expect(leaseStore.store.markState).toHaveBeenCalledWith("lease-current", "closing"); + expect(leaseStore.store.markState).toHaveBeenLastCalledWith("lease-current", "closed"); + }); + + it("does not clean up a stale close pid reused by another wrapper root", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + acpxRecordId: "agent:codex:acp:binding:test", + agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + pid: 920, + 
})), + save: vi.fn(async () => {}), + }; + const killed: Array<{ pid: number; signal: NodeJS.Signals }> = []; + const { runtime, delegate } = makeRuntime( + baseStore, + { + openclawWrapperRoot: "/tmp/openclaw/acpx", + }, + { + openclawProcessCleanup: { + listProcesses: vi.fn(async () => [ + { + pid: 920, + ppid: 1, + command: 'node "/tmp/other-gateway/acpx/codex-acp-wrapper.mjs"', + }, + ]), + killProcess: vi.fn((pid, signal) => { + killed.push({ pid, signal }); + }), + sleep: vi.fn(async () => {}), + }, + }, + ); + vi.spyOn(delegate, "close").mockResolvedValue(undefined); + + await runtime.close({ + handle: { + sessionKey: "agent:codex:acp:binding:test", + backend: "acpx", + runtimeSessionName: "agent:codex:acp:binding:test", + }, + reason: "user-close", + }); + + expect(killed).toEqual([]); + }); + + it("does not tear down reusable ACPX sessions after cancel", async () => { + const baseStore: TestSessionStore = { + load: vi.fn(async () => ({ + acpxRecordId: "agent:codex:acp:binding:test", + agentCommand: 'node "/tmp/openclaw/acpx/codex-acp-wrapper.mjs"', + processId: "910", + })), + save: vi.fn(async () => {}), + }; + const killed: Array<{ pid: number; signal: NodeJS.Signals }> = []; + const listProcesses = vi.fn(async () => { + throw new Error("process listing should not run on cancel"); + }); + const { runtime, delegate } = makeRuntime( + baseStore, + {}, + { + openclawProcessCleanup: { + listProcesses, + killProcess: vi.fn((pid, signal) => { + killed.push({ pid, signal }); + }), + sleep: vi.fn(async () => {}), + }, + }, + ); + const cancel = vi.spyOn(delegate, "cancel").mockResolvedValue(undefined); + + const input = { + handle: { + sessionKey: "agent:codex:acp:binding:test", + backend: "acpx", + runtimeSessionName: "agent:codex:acp:binding:test", + }, + } satisfies Parameters[0]; + + await runtime.cancel(input); + + expect(cancel).toHaveBeenCalledWith(input); + expect(listProcesses).not.toHaveBeenCalled(); + expect(killed).toEqual([]); + }); + it("routes 
openclaw ensureSession through the bridge-safe delegate when MCP servers are configured", async () => { const baseStore: TestSessionStore = { load: vi.fn(async () => undefined), diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index 252eef132ea..0b8e37e23c9 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -1,4 +1,5 @@ import { AsyncLocalStorage } from "node:async_hooks"; +import { resolve as resolvePath } from "node:path"; import { ACPX_BACKEND_ID, AcpxRuntime as BaseAcpxRuntime, @@ -15,16 +16,45 @@ import { type AcpRuntimeStatus, } from "acpx/runtime"; import { AcpRuntimeError, type AcpRuntime } from "../runtime-api.js"; +import { + createAcpxProcessLeaseId, + hashAcpxProcessCommand, + withAcpxLeaseEnvironment, + type AcpxProcessLease, + type AcpxProcessLeaseStore, +} from "./process-lease.js"; +import { + cleanupOpenClawOwnedAcpxProcessTree, + isOpenClawOwnedAcpxProcessCommand, + type AcpxProcessCleanupDeps, +} from "./process-reaper.js"; type AcpSessionStore = AcpRuntimeOptions["sessionStore"]; type AcpSessionRecord = Parameters[0]; type AcpLoadedSessionRecord = Awaited>; +type BaseAcpxRuntimeTestOptions = ConstructorParameters[1]; +type OpenClawAcpxRuntimeOptions = AcpRuntimeOptions & { + openclawWrapperRoot?: string; + openclawGatewayInstanceId?: string; + openclawProcessLeaseStore?: AcpxProcessLeaseStore; +}; +type AcpxRuntimeTestOptions = Record & { + openclawProcessCleanup?: AcpxProcessCleanupDeps; +}; type ResetAwareSessionStore = AcpSessionStore & { markFresh: (sessionKey: string) => void; }; -function readSessionRecordName(record: AcpSessionRecord): string { +type AcpxLaunchLeaseContext = { + leaseId: string; + gatewayInstanceId: string; + sessionKey: string; + wrapperRoot: string; + stableCommand?: string; +}; + +function readSessionRecordName(record: unknown): string { if (typeof record !== "object" || record === null) { return ""; } @@ -32,7 +62,88 @@ function readSessionRecordName(record: 
AcpSessionRecord): string { return typeof name === "string" ? name.trim() : ""; } -function createResetAwareSessionStore(baseStore: AcpSessionStore): ResetAwareSessionStore { +function readRecordAgentCommand(record: unknown): string | undefined { + if (typeof record !== "object" || record === null) { + return undefined; + } + const { agentCommand } = record as { agentCommand?: unknown }; + return typeof agentCommand === "string" ? agentCommand.trim() || undefined : undefined; +} + +function readRecordCwd(record: unknown): string | undefined { + if (typeof record !== "object" || record === null) { + return undefined; + } + const { cwd } = record as { cwd?: unknown }; + return typeof cwd === "string" ? cwd.trim() || undefined : undefined; +} + +function readRecordResetOnNextEnsure(record: unknown): boolean { + if (typeof record !== "object" || record === null) { + return false; + } + const { acpx } = record as { acpx?: unknown }; + if (typeof acpx !== "object" || acpx === null) { + return false; + } + return (acpx as { reset_on_next_ensure?: unknown }).reset_on_next_ensure === true; +} + +function readRecordAgentPid(record: unknown): number | undefined { + if (typeof record !== "object" || record === null) { + return undefined; + } + const { pid, processId } = record as { pid?: unknown; processId?: unknown }; + const rawPid = pid ?? processId; + const numericPid = + typeof rawPid === "number" + ? rawPid + : typeof rawPid === "string" + ? Number.parseInt(rawPid, 10) + : undefined; + return numericPid && Number.isInteger(numericPid) && numericPid > 0 ? numericPid : undefined; +} + +function readOpenClawLeaseIdFromRecord(record: AcpLoadedSessionRecord): string | undefined { + if (typeof record !== "object" || record === null) { + return undefined; + } + const { openclawLeaseId } = record as { openclawLeaseId?: unknown }; + return typeof openclawLeaseId === "string" ? 
openclawLeaseId.trim() || undefined : undefined; +} + +function extractGeneratedWrapperPath(command: string | undefined): string { + const parts = splitCommandParts(command ?? ""); + return ( + parts.find( + (part) => + basename(part) === "codex-acp-wrapper.mjs" || + basename(part) === "claude-agent-acp-wrapper.mjs", + ) ?? "" + ); +} + +function selectCurrentSessionLease(params: { + leases: AcpxProcessLease[]; + sessionKeys: string[]; + rootPid?: number; +}): AcpxProcessLease | undefined { + const sessionKeys = new Set(params.sessionKeys.map((entry) => entry.trim()).filter(Boolean)); + const candidates = params.leases.filter((lease) => sessionKeys.has(lease.sessionKey)); + if (params.rootPid) { + return candidates.find((lease) => lease.rootPid === params.rootPid); + } + return candidates.toSorted((a, b) => b.startedAt - a.startedAt)[0]; +} + +function createResetAwareSessionStore( + baseStore: AcpSessionStore, + params?: { + gatewayInstanceId?: string; + leaseStore?: AcpxProcessLeaseStore; + launchScope?: AsyncLocalStorage; + }, +): ResetAwareSessionStore { const freshSessionKeys = new Set(); return { @@ -41,11 +152,61 @@ function createResetAwareSessionStore(baseStore: AcpSessionStore): ResetAwareSes if (normalized && freshSessionKeys.has(normalized)) { return undefined; } - return await baseStore.load(sessionId); + const record = await baseStore.load(sessionId); + if (!record || !params?.leaseStore || !params.gatewayInstanceId) { + return record; + } + const sessionName = readSessionRecordName(record) || normalized; + const lease = selectCurrentSessionLease({ + leases: await params.leaseStore.listOpen(params.gatewayInstanceId), + sessionKeys: [sessionName, normalized], + rootPid: readRecordAgentPid(record), + }); + if (!lease) { + return record; + } + return { + ...(record as Record), + openclawLeaseId: lease.leaseId, + openclawGatewayInstanceId: lease.gatewayInstanceId, + } as AcpLoadedSessionRecord; }, async save(record: AcpSessionRecord): Promise { - await 
baseStore.save(record); + let recordToSave = record; + const launch = params?.launchScope?.getStore(); const sessionName = readSessionRecordName(record); + const rootPid = readRecordAgentPid(record); + const agentCommand = readRecordAgentCommand(record); + const stableAgentCommand = launch?.stableCommand ?? agentCommand; + if ( + launch && + params?.leaseStore && + sessionName === launch.sessionKey && + rootPid && + stableAgentCommand + ) { + const lease: AcpxProcessLease = { + leaseId: launch.leaseId, + gatewayInstanceId: launch.gatewayInstanceId, + sessionKey: launch.sessionKey, + wrapperRoot: launch.wrapperRoot, + wrapperPath: extractGeneratedWrapperPath(stableAgentCommand), + rootPid, + commandHash: hashAcpxProcessCommand(stableAgentCommand), + startedAt: Date.now(), + state: "open", + }; + await params.leaseStore.save(lease); + recordToSave = { + ...(record as Record), + // ACPX uses agentCommand as reuse identity. Lease metadata belongs to + // our sidecar record, so keep the persisted command stable. + agentCommand: stableAgentCommand, + openclawLeaseId: launch.leaseId, + openclawGatewayInstanceId: launch.gatewayInstanceId, + } as AcpSessionRecord; + } + await baseStore.save(recordToSave); if (sessionName) { freshSessionKeys.delete(sessionName); } @@ -109,11 +270,11 @@ function readAgentFromHandle(handle: AcpRuntimeHandle): string | undefined { } function readAgentCommandFromRecord(record: AcpLoadedSessionRecord): string | undefined { - if (typeof record !== "object" || record === null) { - return undefined; - } - const { agentCommand } = record as { agentCommand?: unknown }; - return typeof agentCommand === "string" ? 
agentCommand.trim() || undefined : undefined; + return readRecordAgentCommand(record); +} + +function readAgentPidFromRecord(record: AcpLoadedSessionRecord): number | undefined { + return readRecordAgentPid(record); } function splitCommandParts(value: string): string[] { @@ -338,6 +499,7 @@ function appendCodexAcpConfigOverrides(command: string, override: CodexAcpModelO function createModelScopedAgentRegistry(params: { agentRegistry: AcpAgentRegistry; scope: AsyncLocalStorage; + leaseCommand: (command: string | undefined) => string | undefined; }): AcpAgentRegistry { return { resolve(agentName: string): string | undefined { @@ -349,9 +511,9 @@ function createModelScopedAgentRegistry(params: { typeof command !== "string" || !isCodexAcpCommand(command) ) { - return command; + return params.leaseCommand(command); } - return appendCodexAcpConfigOverrides(command, override); + return params.leaseCommand(appendCodexAcpConfigOverrides(command, override)); }, list(): string[] { return params.agentRegistry.list(); @@ -402,30 +564,47 @@ export class AcpxRuntime implements AcpRuntime { private readonly delegate: BaseAcpxRuntime; private readonly bridgeSafeDelegate: BaseAcpxRuntime; private readonly probeDelegate: BaseAcpxRuntime; + private readonly processCleanupDeps: AcpxProcessCleanupDeps | undefined; + private readonly wrapperRoot: string | undefined; + private readonly gatewayInstanceId: string | undefined; + private readonly processLeaseStore: AcpxProcessLeaseStore | undefined; + private readonly launchLeaseScope = new AsyncLocalStorage(); + private readonly cwd: string; - constructor( - options: AcpRuntimeOptions, - testOptions?: ConstructorParameters[1], - ) { - this.sessionStore = createResetAwareSessionStore(options.sessionStore); + constructor(options: OpenClawAcpxRuntimeOptions, testOptions?: AcpxRuntimeTestOptions) { + const { openclawProcessCleanup, ...delegateTestOptions } = testOptions ?? 
{}; + this.processCleanupDeps = openclawProcessCleanup; + this.wrapperRoot = options.openclawWrapperRoot; + this.gatewayInstanceId = options.openclawGatewayInstanceId; + this.processLeaseStore = options.openclawProcessLeaseStore; + this.cwd = options.cwd; + this.sessionStore = createResetAwareSessionStore(options.sessionStore, { + gatewayInstanceId: this.gatewayInstanceId, + leaseStore: this.processLeaseStore, + launchScope: this.launchLeaseScope, + }); this.agentRegistry = options.agentRegistry; this.scopedAgentRegistry = createModelScopedAgentRegistry({ agentRegistry: this.agentRegistry, scope: this.codexAcpModelOverrideScope, + leaseCommand: (command) => this.commandWithLaunchLease(command), }); const sharedOptions = { ...options, sessionStore: this.sessionStore, agentRegistry: this.scopedAgentRegistry, }; - this.delegate = new BaseAcpxRuntime(sharedOptions, testOptions); + this.delegate = new BaseAcpxRuntime( + sharedOptions, + delegateTestOptions as BaseAcpxRuntimeTestOptions, + ); this.bridgeSafeDelegate = shouldUseDistinctBridgeDelegate(options) ? new BaseAcpxRuntime( { ...sharedOptions, mcpServers: [], }, - testOptions, + delegateTestOptions as BaseAcpxRuntimeTestOptions, ) : this.delegate; this.probeDelegate = this.resolveDelegateForAgent(resolveProbeAgentName(options)); @@ -445,6 +624,13 @@ export class AcpxRuntime implements AcpRuntime { private async resolveDelegateForHandle(handle: AcpRuntimeHandle): Promise { const record = await this.sessionStore.load(handle.acpxRecordId ?? 
handle.sessionKey); + return this.resolveDelegateForLoadedRecord(handle, record); + } + + private resolveDelegateForLoadedRecord( + handle: AcpRuntimeHandle, + record: AcpLoadedSessionRecord, + ): BaseAcpxRuntime { const recordCommand = readAgentCommandFromRecord(record); if (recordCommand) { return this.resolveDelegateForCommand(recordCommand); @@ -464,6 +650,150 @@ export class AcpxRuntime implements AcpRuntime { }); } + private commandWithLaunchLease(command: string | undefined): string | undefined { + const launch = this.launchLeaseScope.getStore(); + if (!command || !launch) { + return command; + } + launch.stableCommand = command; + return withAcpxLeaseEnvironment({ + command, + leaseId: launch.leaseId, + gatewayInstanceId: launch.gatewayInstanceId, + }); + } + + private async canReuseStablePersistentSession(params: { + sessionKey: string; + mode: Parameters[0]["mode"]; + cwd: string | undefined; + command: string | undefined; + resumeSessionId: string | undefined; + }): Promise { + if (params.mode !== "persistent" || !params.command) { + return false; + } + const existing = await this.sessionStore.load(params.sessionKey); + if (!existing || readRecordResetOnNextEnsure(existing)) { + return false; + } + const recordCwd = readRecordCwd(existing); + if (!recordCwd || resolvePath(recordCwd) !== resolvePath(params.cwd?.trim() || this.cwd)) { + return false; + } + if (readRecordAgentCommand(existing) !== params.command) { + return false; + } + const existingSessionId = + typeof existing === "object" && existing !== null + ? 
(existing as { acpSessionId?: unknown }).acpSessionId + : undefined; + return !params.resumeSessionId || existingSessionId === params.resumeSessionId; + } + + private async runWithLaunchLease(params: { + sessionKey: string; + command: string | undefined; + enabled?: boolean; + run: () => Promise; + }): Promise { + if ( + params.enabled === false || + !params.command || + !this.wrapperRoot || + !this.gatewayInstanceId || + !this.processLeaseStore || + !isOpenClawOwnedAcpxProcessCommand({ + command: params.command, + wrapperRoot: this.wrapperRoot, + }) + ) { + return await params.run(); + } + const launch: AcpxLaunchLeaseContext = { + leaseId: createAcpxProcessLeaseId(), + gatewayInstanceId: this.gatewayInstanceId, + sessionKey: params.sessionKey, + wrapperRoot: this.wrapperRoot, + stableCommand: params.command, + }; + // The pending lease is written before acpx spawns. The session-store save + // path fills in the live PID after acpx connects and exposes the process. + await this.processLeaseStore.save({ + leaseId: launch.leaseId, + gatewayInstanceId: launch.gatewayInstanceId, + sessionKey: launch.sessionKey, + wrapperRoot: launch.wrapperRoot, + wrapperPath: extractGeneratedWrapperPath(params.command), + rootPid: 0, + commandHash: hashAcpxProcessCommand(params.command), + startedAt: Date.now(), + state: "open", + }); + return await this.launchLeaseScope.run(launch, params.run); + } + + private async cleanupProcessTreeForRecord( + handle: AcpRuntimeHandle, + record: AcpLoadedSessionRecord, + ): Promise { + const leaseId = readOpenClawLeaseIdFromRecord(record); + const rootPid = readAgentPidFromRecord(record); + const sessionKeys = [handle.sessionKey, readSessionRecordName(record)]; + const openLeases = + this.gatewayInstanceId && this.processLeaseStore + ? await this.processLeaseStore.listOpen(this.gatewayInstanceId) + : []; + const selectedLease = selectCurrentSessionLease({ + leases: openLeases, + sessionKeys, + rootPid, + }); + const loadedLease = leaseId ? 
await this.processLeaseStore?.load(leaseId) : undefined; + const lease = + selectedLease ?? + (loadedLease && + loadedLease.gatewayInstanceId === this.gatewayInstanceId && + (!rootPid || loadedLease.rootPid === rootPid) && + sessionKeys.includes(loadedLease.sessionKey) + ? loadedLease + : undefined); + if (lease && lease.gatewayInstanceId === this.gatewayInstanceId && lease.rootPid > 0) { + await this.processLeaseStore?.markState(lease.leaseId, "closing"); + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: lease.rootPid, + rootCommand: readAgentCommandFromRecord(record), + expectedLeaseId: lease.leaseId, + expectedGatewayInstanceId: lease.gatewayInstanceId, + wrapperRoot: lease.wrapperRoot, + deps: this.processCleanupDeps, + }); + await this.processLeaseStore?.markState( + lease.leaseId, + result.terminatedPids.length > 0 || result.skippedReason === "missing-root" + ? "closed" + : "lost", + ); + return; + } + + const rootCommand = + readAgentCommandFromRecord(record) ?? + resolveAgentCommandForName({ + agentName: readAgentFromHandle(handle), + agentRegistry: this.agentRegistry, + }); + if (!rootPid || !rootCommand) { + return; + } + await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid, + rootCommand, + wrapperRoot: this.wrapperRoot, + deps: this.processCleanupDeps, + }); + } + isHealthy(): boolean { return this.probeDelegate.isHealthy(); } @@ -489,9 +819,25 @@ export class AcpxRuntime implements AcpRuntime { normalizeAgentName(input.agent) === CODEX_ACP_AGENT_ID && isCodexAcpCommand(command) ? normalizeCodexAcpModelOverride(input.model, input.thinking) : undefined; + const stableLaunchCommand = + codexModelOverride && command + ? 
appendCodexAcpConfigOverrides(command, codexModelOverride) + : command; + const shouldStartWithLease = !(await this.canReuseStablePersistentSession({ + sessionKey: input.sessionKey, + mode: input.mode, + cwd: input.cwd, + command: stableLaunchCommand, + resumeSessionId: input.resumeSessionId, + })); if (!codexModelOverride) { - return delegate.ensureSession(input); + return await this.runWithLaunchLease({ + sessionKey: input.sessionKey, + command: stableLaunchCommand, + enabled: shouldStartWithLease, + run: () => delegate.ensureSession(input), + }); } const normalizedInput = { @@ -500,9 +846,15 @@ export class AcpxRuntime implements AcpRuntime { ? { model: codexAcpSessionModelId(codexModelOverride) } : {}), }; - return this.codexAcpModelOverrideScope.run(codexModelOverride, () => - delegate.ensureSession(normalizedInput), - ); + return await this.runWithLaunchLease({ + sessionKey: input.sessionKey, + command: stableLaunchCommand, + enabled: shouldStartWithLease, + run: () => + this.codexAcpModelOverrideScope.run(codexModelOverride, () => + delegate.ensureSession(normalizedInput), + ), + }); } async *runTurn(input: Parameters[0]): AsyncIterable { @@ -571,7 +923,10 @@ export class AcpxRuntime implements AcpRuntime { } async cancel(input: Parameters[0]): Promise { - const delegate = await this.resolveDelegateForHandle(input.handle); + const record = await this.sessionStore.load( + input.handle.acpxRecordId ?? input.handle.sessionKey, + ); + const delegate = this.resolveDelegateForLoadedRecord(input.handle, record); await delegate.cancel(input); } @@ -580,14 +935,21 @@ export class AcpxRuntime implements AcpRuntime { } async close(input: Parameters[0]): Promise { - await ( - await this.resolveDelegateForHandle(input.handle) - ).close({ - handle: input.handle, - reason: input.reason, - discardPersistentState: input.discardPersistentState, - }); - if (input.discardPersistentState) { + const record = await this.sessionStore.load( + input.handle.acpxRecordId ?? 
input.handle.sessionKey, + ); + let closeSucceeded = false; + try { + await this.resolveDelegateForLoadedRecord(input.handle, record).close({ + handle: input.handle, + reason: input.reason, + discardPersistentState: input.discardPersistentState, + }); + closeSucceeded = true; + } finally { + await this.cleanupProcessTreeForRecord(input.handle, record); + } + if (closeSucceeded && input.discardPersistentState) { this.sessionStore.markFresh(input.handle.sessionKey); } } diff --git a/extensions/acpx/src/service.test.ts b/extensions/acpx/src/service.test.ts index 085b592375f..88c55d1c577 100644 --- a/extensions/acpx/src/service.test.ts +++ b/extensions/acpx/src/service.test.ts @@ -11,6 +11,30 @@ const { prepareAcpxCodexAuthConfigMock } = vi.hoisted(() => ({ async ({ pluginConfig }: { pluginConfig: unknown }) => pluginConfig, ), })); +const { cleanupOpenClawOwnedAcpxProcessTreeMock } = vi.hoisted(() => ({ + cleanupOpenClawOwnedAcpxProcessTreeMock: vi.fn( + async (): Promise<{ + inspectedPids: number[]; + terminatedPids: number[]; + skippedReason?: string; + }> => ({ + inspectedPids: [], + terminatedPids: [], + }), + ), +})); +const { reapStaleOpenClawOwnedAcpxOrphansMock } = vi.hoisted(() => ({ + reapStaleOpenClawOwnedAcpxOrphansMock: vi.fn( + async (): Promise<{ + inspectedPids: number[]; + terminatedPids: number[]; + skippedReason?: string; + }> => ({ + inspectedPids: [], + terminatedPids: [], + }), + ), +})); const { acpxRuntimeConstructorMock, createAgentRegistryMock, createFileSessionStoreMock } = vi.hoisted(() => ({ acpxRuntimeConstructorMock: vi.fn(function AcpxRuntime(options: unknown) { @@ -59,6 +83,11 @@ vi.mock("./codex-auth-bridge.js", () => ({ prepareAcpxCodexAuthConfig: prepareAcpxCodexAuthConfigMock, })); +vi.mock("./process-reaper.js", () => ({ + cleanupOpenClawOwnedAcpxProcessTree: cleanupOpenClawOwnedAcpxProcessTreeMock, + reapStaleOpenClawOwnedAcpxOrphans: reapStaleOpenClawOwnedAcpxOrphansMock, +})); + import { getAcpRuntimeBackend } from 
"../runtime-api.js"; import { createAcpxRuntimeService } from "./service.js"; @@ -73,6 +102,8 @@ async function makeTempDir(): Promise { afterEach(async () => { runtimeRegistry.clear(); prepareAcpxCodexAuthConfigMock.mockClear(); + cleanupOpenClawOwnedAcpxProcessTreeMock.mockClear(); + reapStaleOpenClawOwnedAcpxOrphansMock.mockClear(); acpxRuntimeConstructorMock.mockClear(); createAgentRegistryMock.mockClear(); createFileSessionStoreMock.mockClear(); @@ -155,6 +186,123 @@ describe("createAcpxRuntimeService", () => { await service.stop?.(ctx); }); + it("reaps stale ACPX process leases from the generated wrapper root at startup", async () => { + const workspaceDir = await makeTempDir(); + const ctx = createServiceContext(workspaceDir); + const runtime = createMockRuntime(); + const processCleanupDeps = { sleep: vi.fn(async () => {}) }; + await fs.mkdir(path.join(ctx.stateDir, "acpx"), { recursive: true }); + await fs.writeFile(path.join(ctx.stateDir, "gateway-instance-id"), "gw-test\n"); + await fs.writeFile( + path.join(ctx.stateDir, "acpx", "process-leases.json"), + JSON.stringify({ + version: 1, + leases: [ + { + leaseId: "lease-1", + gatewayInstanceId: "gw-test", + sessionKey: "agent:codex:acp:test", + wrapperRoot: path.join(ctx.stateDir, "acpx"), + wrapperPath: path.join(ctx.stateDir, "acpx", "codex-acp-wrapper.mjs"), + rootPid: 101, + commandHash: "hash", + startedAt: 1, + state: "open", + }, + ], + }), + ); + cleanupOpenClawOwnedAcpxProcessTreeMock.mockResolvedValueOnce({ + inspectedPids: [101, 102], + terminatedPids: [101, 102], + }); + const service = createAcpxRuntimeService({ + runtimeFactory: () => runtime as never, + processCleanupDeps, + }); + + await service.start(ctx); + + expect(cleanupOpenClawOwnedAcpxProcessTreeMock).toHaveBeenCalledWith({ + rootPid: 101, + expectedLeaseId: "lease-1", + expectedGatewayInstanceId: "gw-test", + wrapperRoot: path.join(ctx.stateDir, "acpx"), + deps: processCleanupDeps, + }); + 
expect(ctx.logger.info).toHaveBeenCalledWith("reaped 2 stale OpenClaw-owned ACPX processes"); + + await service.stop?.(ctx); + }); + + it("runs wrapper-root orphan cleanup before dropping pending ACPX leases", async () => { + const workspaceDir = await makeTempDir(); + const ctx = createServiceContext(workspaceDir); + const runtime = createMockRuntime(); + const processCleanupDeps = { sleep: vi.fn(async () => {}) }; + const wrapperRoot = path.join(ctx.stateDir, "acpx"); + await fs.mkdir(wrapperRoot, { recursive: true }); + await fs.writeFile(path.join(ctx.stateDir, "gateway-instance-id"), "gw-test\n"); + await fs.writeFile( + path.join(wrapperRoot, "process-leases.json"), + JSON.stringify({ + version: 1, + leases: [ + { + leaseId: "lease-pending", + gatewayInstanceId: "gw-test", + sessionKey: "agent:codex:acp:test", + wrapperRoot, + wrapperPath: path.join(wrapperRoot, "codex-acp-wrapper.mjs"), + rootPid: 0, + commandHash: "hash", + startedAt: 1, + state: "open", + }, + ], + }), + ); + reapStaleOpenClawOwnedAcpxOrphansMock.mockResolvedValueOnce({ + inspectedPids: [201, 202], + terminatedPids: [201, 202], + }); + const service = createAcpxRuntimeService({ + runtimeFactory: () => runtime as never, + processCleanupDeps, + }); + + await service.start(ctx); + + expect(cleanupOpenClawOwnedAcpxProcessTreeMock).not.toHaveBeenCalled(); + expect(reapStaleOpenClawOwnedAcpxOrphansMock).toHaveBeenCalledWith({ + wrapperRoot, + deps: processCleanupDeps, + }); + expect(ctx.logger.info).toHaveBeenCalledWith("reaped 2 stale OpenClaw-owned ACPX processes"); + const leaseFile = JSON.parse( + await fs.readFile(path.join(wrapperRoot, "process-leases.json"), "utf8"), + ); + expect(leaseFile.leases[0].state).toBe("closed"); + + await service.stop?.(ctx); + }); + + it("keeps startup quiet when no process leases are open", async () => { + const workspaceDir = await makeTempDir(); + const ctx = createServiceContext(workspaceDir); + const runtime = createMockRuntime(); + const service = 
createAcpxRuntimeService({ + runtimeFactory: () => runtime as never, + }); + + await service.start(ctx); + + expect(cleanupOpenClawOwnedAcpxProcessTreeMock).not.toHaveBeenCalled(); + expect(ctx.logger.warn).not.toHaveBeenCalled(); + + await service.stop?.(ctx); + }); + it("registers the default backend without importing ACPX runtime until first use", async () => { const workspaceDir = await makeTempDir(); const ctx = createServiceContext(workspaceDir); diff --git a/extensions/acpx/src/service.ts b/extensions/acpx/src/service.ts index 1c989bfce8a..2a06060f5d1 100644 --- a/extensions/acpx/src/service.ts +++ b/extensions/acpx/src/service.ts @@ -1,4 +1,6 @@ +import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; +import path from "node:path"; import { inspect } from "node:util"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import type { @@ -14,6 +16,12 @@ import { toAcpMcpServers, type ResolvedAcpxPluginConfig, } from "./config.js"; +import { createAcpxProcessLeaseStore, type AcpxProcessLeaseStore } from "./process-lease.js"; +import { + cleanupOpenClawOwnedAcpxProcessTree, + reapStaleOpenClawOwnedAcpxOrphans, + type AcpxProcessCleanupDeps, +} from "./process-reaper.js"; type AcpxRuntimeLike = AcpRuntime & { probeAvailability(): Promise; @@ -33,12 +41,16 @@ let runtimeModulePromise: Promise | null = null; type AcpxRuntimeFactoryParams = { pluginConfig: ResolvedAcpxPluginConfig; + gatewayInstanceId: string; + processLeaseStore: AcpxProcessLeaseStore; + wrapperRoot: string; logger?: PluginLogger; }; type CreateAcpxRuntimeServiceParams = { pluginConfig?: unknown; runtimeFactory?: (params: AcpxRuntimeFactoryParams) => AcpxRuntimeLike | Promise; + processCleanupDeps?: AcpxProcessCleanupDeps; }; function loadRuntimeModule(): Promise { @@ -57,6 +69,9 @@ function createLazyDefaultRuntime(params: AcpxRuntimeFactoryParams): AcpxRuntime runtimePromise ??= loadRuntimeModule().then((module) => { runtime = new module.AcpxRuntime({ 
cwd: params.pluginConfig.cwd, + openclawGatewayInstanceId: params.gatewayInstanceId, + openclawProcessLeaseStore: params.processLeaseStore, + openclawWrapperRoot: params.wrapperRoot, sessionStore: module.createFileSessionStore({ stateDir: params.pluginConfig.stateDir, }), @@ -188,6 +203,73 @@ function shouldRunStartupProbe(env: NodeJS.ProcessEnv = process.env): boolean { return env[ENABLE_STARTUP_PROBE_ENV] === "1"; } +async function resolveGatewayInstanceId(stateDir: string): Promise { + const filePath = path.join(stateDir, "gateway-instance-id"); + try { + const existing = (await fs.readFile(filePath, "utf8")).trim(); + if (existing) { + return existing; + } + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + } + const next = randomUUID(); + await fs.mkdir(stateDir, { recursive: true }); + await fs.writeFile(filePath, `${next}\n`, { mode: 0o600 }); + return next; +} + +async function reapOpenAcpxProcessLeases(params: { + gatewayInstanceId: string; + leaseStore: AcpxProcessLeaseStore; + deps?: AcpxProcessCleanupDeps; +}): Promise<{ inspectedPids: number[]; terminatedPids: number[] }> { + const leases = await params.leaseStore.listOpen(params.gatewayInstanceId); + const inspectedPids: number[] = []; + const terminatedPids: number[] = []; + const pendingLeaseRootResults = new Map< + string, + { inspectedPids: number[]; terminatedPids: number[] } + >(); + for (const lease of leases) { + if (lease.rootPid <= 0) { + await params.leaseStore.markState(lease.leaseId, "closing"); + let result = pendingLeaseRootResults.get(lease.wrapperRoot); + if (!result) { + result = await reapStaleOpenClawOwnedAcpxOrphans({ + wrapperRoot: lease.wrapperRoot, + deps: params.deps, + }); + pendingLeaseRootResults.set(lease.wrapperRoot, result); + inspectedPids.push(...result.inspectedPids); + terminatedPids.push(...result.terminatedPids); + } + await params.leaseStore.markState( + lease.leaseId, + result.terminatedPids.length > 0 ? 
"closed" : "lost", + ); + continue; + } + await params.leaseStore.markState(lease.leaseId, "closing"); + const result = await cleanupOpenClawOwnedAcpxProcessTree({ + rootPid: lease.rootPid, + expectedLeaseId: lease.leaseId, + expectedGatewayInstanceId: lease.gatewayInstanceId, + wrapperRoot: lease.wrapperRoot, + deps: params.deps, + }); + inspectedPids.push(...result.inspectedPids); + terminatedPids.push(...result.terminatedPids); + await params.leaseStore.markState( + lease.leaseId, + result.terminatedPids.length > 0 ? "closed" : "lost", + ); + } + return { inspectedPids, terminatedPids }; +} + export function createAcpxRuntimeService( params: CreateAcpxRuntimeServiceParams = {}, ): OpenClawPluginService { @@ -215,7 +297,21 @@ export function createAcpxRuntimeService( stateDir: ctx.stateDir, logger: ctx.logger, }); + const wrapperRoot = path.join(ctx.stateDir, "acpx"); await fs.mkdir(pluginConfig.stateDir, { recursive: true }); + await fs.mkdir(wrapperRoot, { recursive: true }); + const gatewayInstanceId = await resolveGatewayInstanceId(ctx.stateDir); + const processLeaseStore = createAcpxProcessLeaseStore({ stateDir: wrapperRoot }); + const startupReap = await reapOpenAcpxProcessLeases({ + gatewayInstanceId, + leaseStore: processLeaseStore, + deps: params.processCleanupDeps, + }); + if (startupReap.terminatedPids.length > 0) { + ctx.logger.info( + `reaped ${startupReap.terminatedPids.length} stale OpenClaw-owned ACPX process${startupReap.terminatedPids.length === 1 ? "" : "es"}`, + ); + } warnOnIgnoredLegacyCompatibilityConfig({ pluginConfig, logger: ctx.logger, @@ -224,10 +320,16 @@ export function createAcpxRuntimeService( runtime = params.runtimeFactory ? 
await params.runtimeFactory({ pluginConfig, + gatewayInstanceId, + processLeaseStore, + wrapperRoot, logger: ctx.logger, }) : createLazyDefaultRuntime({ pluginConfig, + gatewayInstanceId, + processLeaseStore, + wrapperRoot, logger: ctx.logger, }); diff --git a/src/agents/tools/cron-tool.ts b/src/agents/tools/cron-tool.ts index 445c949cc9a..58e6669e02d 100644 --- a/src/agents/tools/cron-tool.ts +++ b/src/agents/tools/cron-tool.ts @@ -735,20 +735,23 @@ Use jobId as the canonical identifier; id is accepted for compatibility. Use con const includeDisabled = Boolean(params.includeDisabled); let offset = 0; let result: unknown; - for (;;) { + let shouldContinue = true; + while (shouldContinue) { result = await callGateway("cron.list", gatewayOpts, { includeDisabled, agentId: listAgentId, ...(selfRemoveOnlyJobId ? { limit: 200, offset } : {}), }); if (!selfRemoveOnlyJobId || cronListResultHasJob(result, selfRemoveOnlyJobId)) { - break; + shouldContinue = false; + } else { + const nextOffset = readCronListNextOffset(result, offset); + if (nextOffset === undefined) { + shouldContinue = false; + } else { + offset = nextOffset; + } } - const nextOffset = readCronListNextOffset(result, offset); - if (nextOffset === undefined) { - break; - } - offset = nextOffset; } return jsonResult( selfRemoveOnlyJobId ? 
filterCronListResultToJobId(result, selfRemoveOnlyJobId) : result, diff --git a/src/agents/tools/sessions-access.test.ts b/src/agents/tools/sessions-access.test.ts index 533c5b70b0b..79685beb722 100644 --- a/src/agents/tools/sessions-access.test.ts +++ b/src/agents/tools/sessions-access.test.ts @@ -3,6 +3,7 @@ import type { OpenClawConfig } from "../../config/config.js"; import { createAgentToAgentPolicy, createSessionVisibilityGuard, + createSessionVisibilityRowChecker, resolveEffectiveSessionToolsVisibility, resolveSandboxSessionToolsVisibility, resolveSessionToolsVisibility, @@ -109,6 +110,175 @@ describe("createAgentToAgentPolicy", () => { }); describe("createSessionVisibilityGuard", () => { + it("allows cross-agent spawned child rows in list results with tree visibility", () => { + const guard = createSessionVisibilityRowChecker({ + action: "list", + requesterSessionKey: "agent:main:main", + visibility: "tree", + a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig), + }); + + expect( + guard.check({ + key: "agent:codex:acp:child-1", + spawnedBy: "agent:main:main", + }), + ).toEqual({ allowed: true }); + }); + + it("allows cross-agent spawned child rows in all-visibility list results when a2a is disabled", () => { + const guard = createSessionVisibilityRowChecker({ + action: "list", + requesterSessionKey: "agent:main:main", + visibility: "all", + a2aPolicy: createAgentToAgentPolicy({ + tools: { agentToAgent: { enabled: false } }, + } as unknown as OpenClawConfig), + }); + + expect( + guard.check({ + key: "agent:codex:acp:child-1", + spawnedBy: "agent:main:main", + }), + ).toEqual({ allowed: true }); + }); + + it("keeps agent visibility same-agent-only for cross-agent owned child rows", () => { + const guard = createSessionVisibilityRowChecker({ + action: "list", + requesterSessionKey: "agent:main:main", + visibility: "agent", + a2aPolicy: createAgentToAgentPolicy({ + tools: { agentToAgent: { enabled: true, allow: ["main", "codex"] } }, + } as 
unknown as OpenClawConfig), + }); + + expect( + guard.check({ + key: "agent:codex:acp:child-1", + spawnedBy: "agent:main:main", + }), + ).toEqual({ + allowed: false, + status: "forbidden", + error: + "Session list visibility is restricted. Set tools.sessions.visibility=all to allow cross-agent access.", + }); + }); + + it("does not do spawned lookup for list visibility without row metadata", async () => { + const callGateway = vi.fn(async () => ({ + sessions: [{ key: "agent:codex:acp:child-1" }], + })); + sessionsResolutionTesting.setDepsForTest({ + callGateway: callGateway as never, + }); + + const guard = await createSessionVisibilityGuard({ + action: "list", + requesterSessionKey: "agent:main:main", + visibility: "tree", + a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig), + }); + + expect(guard.check("agent:codex:acp:child-1")).toMatchObject({ allowed: false }); + expect(callGateway).not.toHaveBeenCalled(); + + sessionsResolutionTesting.setDepsForTest(); + }); + + it("allows cross-agent spawned child sessions with tree visibility", async () => { + sessionsResolutionTesting.setDepsForTest({ + callGateway: vi.fn(async (request: { method?: string; params?: { spawnedBy?: string } }) => { + if (request.method === "sessions.list") { + expect(request.params?.spawnedBy).toBe("agent:main:main"); + return { + sessions: [{ key: "agent:codex:acp:child-1" }], + }; + } + return {}; + }) as never, + }); + + const guard = await createSessionVisibilityGuard({ + action: "history", + requesterSessionKey: "agent:main:main", + visibility: "tree", + a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig), + }); + + expect(guard.check("agent:codex:acp:child-1")).toEqual({ allowed: true }); + + sessionsResolutionTesting.setDepsForTest(); + }); + + it("keeps self visibility restricted even for spawned child sessions", async () => { + const guard = await createSessionVisibilityGuard({ + action: "history", + requesterSessionKey: "agent:main:main", + 
visibility: "self", + a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig), + }); + + expect(guard.check("agent:codex:acp:child-1")).toEqual({ + allowed: false, + status: "forbidden", + error: + "Session history visibility is restricted. Set tools.sessions.visibility=all to allow cross-agent access.", + }); + }); + + it("allows cross-agent spawned child sessions before agent-to-agent checks with all visibility", async () => { + sessionsResolutionTesting.setDepsForTest({ + callGateway: vi.fn(async (request: { method?: string; params?: { spawnedBy?: string } }) => { + if (request.method === "sessions.list") { + expect(request.params?.spawnedBy).toBe("agent:main:main"); + return { + sessions: [{ key: "agent:codex:acp:child-1" }], + }; + } + return {}; + }) as never, + }); + + const guard = await createSessionVisibilityGuard({ + action: "send", + requesterSessionKey: "agent:main:main", + visibility: "all", + a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig), + }); + + expect(guard.check("agent:codex:acp:child-1")).toEqual({ allowed: true }); + + sessionsResolutionTesting.setDepsForTest(); + }); + + it("allows cross-agent spawned child status before agent-to-agent checks with all visibility", async () => { + sessionsResolutionTesting.setDepsForTest({ + callGateway: vi.fn(async (request: { method?: string; params?: { spawnedBy?: string } }) => { + if (request.method === "sessions.list") { + expect(request.params?.spawnedBy).toBe("agent:main:main"); + return { + sessions: [{ key: "agent:codex:acp:child-1" }], + }; + } + return {}; + }) as never, + }); + + const guard = await createSessionVisibilityGuard({ + action: "status", + requesterSessionKey: "agent:main:main", + visibility: "all", + a2aPolicy: createAgentToAgentPolicy({} as unknown as OpenClawConfig), + }); + + expect(guard.check("agent:codex:acp:child-1")).toEqual({ allowed: true }); + + sessionsResolutionTesting.setDepsForTest(); + }); + it("does not block exact same-agent 
spawned targets that fall past the spawned list cap", async () => { sessionsResolutionTesting.setDepsForTest({ callGateway: vi.fn(async (request: { method?: string; params?: { key?: string } }) => { diff --git a/src/agents/tools/sessions-access.ts b/src/agents/tools/sessions-access.ts index 63f6eeeecd5..46aa69ad0d0 100644 --- a/src/agents/tools/sessions-access.ts +++ b/src/agents/tools/sessions-access.ts @@ -3,6 +3,7 @@ import { createAgentToAgentPolicy, createSessionVisibilityChecker, createSessionVisibilityGuard, + createSessionVisibilityRowChecker, listSpawnedSessionKeys, resolveEffectiveSessionToolsVisibility, resolveSandboxSessionToolsVisibility, @@ -15,6 +16,7 @@ export { createAgentToAgentPolicy, createSessionVisibilityChecker, createSessionVisibilityGuard, + createSessionVisibilityRowChecker, listSpawnedSessionKeys, resolveEffectiveSessionToolsVisibility, } from "../../plugin-sdk/session-visibility.js"; diff --git a/src/agents/tools/sessions-helpers.ts b/src/agents/tools/sessions-helpers.ts index 2f7b96feb88..f05e5d4e5f7 100644 --- a/src/agents/tools/sessions-helpers.ts +++ b/src/agents/tools/sessions-helpers.ts @@ -1,6 +1,7 @@ export { createAgentToAgentPolicy, createSessionVisibilityGuard, + createSessionVisibilityRowChecker, resolveEffectiveSessionToolsVisibility, resolveSandboxedSessionToolContext, } from "./sessions-access.js"; diff --git a/src/agents/tools/sessions-list-tool.ts b/src/agents/tools/sessions-list-tool.ts index dc046dbbec2..02309a9e934 100644 --- a/src/agents/tools/sessions-list-tool.ts +++ b/src/agents/tools/sessions-list-tool.ts @@ -22,8 +22,8 @@ import { import type { AnyAgentTool } from "./common.js"; import { jsonResult, readStringArrayParam, readStringParam } from "./common.js"; import { - createSessionVisibilityGuard, createAgentToAgentPolicy, + createSessionVisibilityRowChecker, classifySessionKind, deriveChannel, resolveDisplaySessionKey, @@ -136,7 +136,7 @@ export function createSessionsListTool(opts?: { const sessions = 
Array.isArray(list?.sessions) ? list.sessions : []; const storePath = typeof list?.path === "string" ? list.path : undefined; - const visibilityGuard = await createSessionVisibilityGuard({ + const visibilityGuard = createSessionVisibilityRowChecker({ action: "list", requesterSessionKey: effectiveRequesterKey, visibility, @@ -160,7 +160,17 @@ export function createSessionsListTool(opts?: { if (!key) { continue; } - const access = visibilityGuard.check(key); + const access = visibilityGuard.check({ + key, + agentId: typeof entry.agentId === "string" ? entry.agentId : undefined, + ownerSessionKey: + typeof (entry as { ownerSessionKey?: unknown }).ownerSessionKey === "string" + ? (entry as { ownerSessionKey?: string }).ownerSessionKey + : undefined, + spawnedBy: typeof entry.spawnedBy === "string" ? entry.spawnedBy : undefined, + parentSessionKey: + typeof entry.parentSessionKey === "string" ? entry.parentSessionKey : undefined, + }); if (!access.allowed) { continue; } diff --git a/src/agents/tools/sessions-send-tool.ts b/src/agents/tools/sessions-send-tool.ts index 26ed967a54d..0f14d5b86da 100644 --- a/src/agents/tools/sessions-send-tool.ts +++ b/src/agents/tools/sessions-send-tool.ts @@ -269,6 +269,15 @@ export function createSessionsSendTool(opts?: { const announceTimeoutMs = timeoutSeconds === 0 ? 30_000 : timeoutMs; const idempotencyKey = crypto.randomUUID(); let runId: string = idempotencyKey; + if (parseSessionThreadInfoFast(resolvedKey).threadId) { + return jsonResult({ + runId: crypto.randomUUID(), + status: "error", + error: + "sessions_send cannot target a thread session for inter-agent coordination. 
Use the parent channel session key instead.", + sessionKey: displayKey, + }); + } const visibilityGuard = await createSessionVisibilityGuard({ action: "send", requesterSessionKey: effectiveRequesterKey, @@ -284,15 +293,6 @@ export function createSessionsSendTool(opts?: { sessionKey: displayKey, }); } - if (parseSessionThreadInfoFast(resolvedKey).threadId) { - return jsonResult({ - runId: crypto.randomUUID(), - status: "error", - error: - "sessions_send cannot target a thread session for inter-agent coordination. Use the parent channel session key instead.", - sessionKey: displayKey, - }); - } // Capture the pre-run assistant snapshot before starting the nested run. // Fast in-process test doubles and short-circuit agent paths can finish diff --git a/src/agents/tools/sessions.test.ts b/src/agents/tools/sessions.test.ts index bd0a0f142e0..01d33e1f340 100644 --- a/src/agents/tools/sessions.test.ts +++ b/src/agents/tools/sessions.test.ts @@ -14,7 +14,7 @@ type SessionsToolTestConfig = { session: { scope: "per-sender"; mainKey: string }; tools: { agentToAgent: { enabled: boolean }; - sessions?: { visibility: "all" | "own" }; + sessions?: { visibility: "self" | "tree" | "agent" | "all" }; }; }; @@ -417,13 +417,20 @@ describe("resolveAnnounceTarget", () => { describe("sessions_list gating", () => { beforeEach(() => { callGatewayMock.mockClear(); - callGatewayMock.mockResolvedValue({ - path: "/tmp/sessions.json", - sessions: [ - { key: "agent:main:main", kind: "direct" }, - { key: "agent:other:main", kind: "direct" }, - ], - }); + callGatewayMock.mockImplementation( + (request: { method?: string; params?: { spawnedBy?: string } }) => { + if (request.method === "sessions.list" && request.params?.spawnedBy) { + return Promise.resolve({ path: "/tmp/sessions.json", sessions: [] }); + } + return Promise.resolve({ + path: "/tmp/sessions.json", + sessions: [ + { key: "agent:main:main", kind: "direct" }, + { key: "agent:other:main", kind: "direct" }, + ], + }); + }, + ); }); 
it("filters out other agents when tools.agentToAgent.enabled is false", async () => { @@ -435,6 +442,62 @@ describe("sessions_list gating", () => { }); }); + it("keeps requester-owned cross-agent rows with tree visibility without a spawned lookup", async () => { + loadConfigMock.mockReturnValue({ + session: { scope: "per-sender", mainKey: "main" }, + tools: { + agentToAgent: { enabled: false }, + sessions: { visibility: "tree" }, + }, + }); + callGatewayMock.mockResolvedValueOnce({ + path: "/tmp/sessions.json", + sessions: [ + { + key: "agent:codex:acp:child-1", + kind: "direct", + spawnedBy: MAIN_AGENT_SESSION_KEY, + }, + ], + }); + + const result = await createMainSessionsListTool().execute("call1", {}); + + expect(result.details).toMatchObject({ + count: 1, + sessions: [{ key: "agent:codex:acp:child-1", spawnedBy: MAIN_AGENT_SESSION_KEY }], + }); + expect(callGatewayMock).toHaveBeenCalledTimes(1); + }); + + it("keeps requester-owned cross-agent rows with all visibility when a2a is disabled", async () => { + loadConfigMock.mockReturnValue({ + session: { scope: "per-sender", mainKey: "main" }, + tools: { + agentToAgent: { enabled: false }, + sessions: { visibility: "all" }, + }, + }); + callGatewayMock.mockResolvedValueOnce({ + path: "/tmp/sessions.json", + sessions: [ + { + key: "agent:codex:acp:child-1", + kind: "direct", + parentSessionKey: MAIN_AGENT_SESSION_KEY, + }, + ], + }); + + const result = await createMainSessionsListTool().execute("call1", {}); + + expect(result.details).toMatchObject({ + count: 1, + sessions: [{ key: "agent:codex:acp:child-1", parentSessionKey: MAIN_AGENT_SESSION_KEY }], + }); + expect(callGatewayMock).toHaveBeenCalledTimes(1); + }); + it("keeps literal current keys for message previews", async () => { callGatewayMock.mockReset(); callGatewayMock @@ -442,7 +505,6 @@ describe("sessions_list gating", () => { path: "/tmp/sessions.json", sessions: [{ key: "current", kind: "direct" }], }) - .mockResolvedValueOnce({ sessions: [{ key: 
"current" }] }) .mockResolvedValueOnce({ messages: [{ role: "assistant", content: [] }] }); await createMainSessionsListTool().execute("call1", { messageLimit: 1 }); @@ -478,7 +540,6 @@ describe("sessions_list transcriptPath resolution", () => { }, ], }); - const result = await executeMainSessionsList(); expectWorkerTranscriptPath(result, { containsPath: path.join("agents", "worker", "sessions"), @@ -498,7 +559,6 @@ describe("sessions_list transcriptPath resolution", () => { }, ], }); - const result = await executeMainSessionsList(); expectWorkerTranscriptPath(result, { containsPath: path.join("agents", "worker", "sessions"), @@ -519,7 +579,6 @@ describe("sessions_list transcriptPath resolution", () => { }, ], }); - const result = await executeMainSessionsList(); expectWorkerTranscriptPath(result, { containsPath: path.join("agents", "worker", "sessions"), @@ -540,7 +599,6 @@ describe("sessions_list transcriptPath resolution", () => { }, ], }); - const result = await executeMainSessionsList(); expectWorkerTranscriptPath(result, { containsPath: path.join(stateDir, "agents", "worker", "sessions"), @@ -562,7 +620,6 @@ describe("sessions_list transcriptPath resolution", () => { }, ], }); - const result = await executeMainSessionsList(); const expectedSessionsDir = path.dirname(templateStorePath.replace("{agentId}", "worker")); expectWorkerTranscriptPath(result, { @@ -595,7 +652,6 @@ describe("sessions_list channel derivation", () => { }, ], }); - const result = await executeMainSessionsList(); expect(result.details).toMatchObject({ diff --git a/src/plugin-sdk/session-visibility.ts b/src/plugin-sdk/session-visibility.ts index 6a6015ccec9..81b71ecce69 100644 --- a/src/plugin-sdk/session-visibility.ts +++ b/src/plugin-sdk/session-visibility.ts @@ -31,6 +31,14 @@ export type SessionAccessResult = | { allowed: true } | { allowed: false; error: string; status: "forbidden" }; +export type SessionVisibilityRow = { + key: string; + agentId?: string; + ownerSessionKey?: string; + 
spawnedBy?: string; + parentSessionKey?: string; +}; + export async function listSpawnedSessionKeys(params: { requesterSessionKey: string; limit?: number; @@ -191,11 +199,56 @@ export function createSessionVisibilityChecker(params: { a2aPolicy: AgentToAgentPolicy; spawnedKeys: Set | null; }): { check: (targetSessionKey: string) => SessionAccessResult } { - const requesterAgentId = resolveAgentIdFromSessionKey(params.requesterSessionKey); const spawnedKeys = params.spawnedKeys; + const rowChecker = createSessionVisibilityRowChecker({ + action: params.action, + requesterSessionKey: params.requesterSessionKey, + visibility: params.visibility, + a2aPolicy: params.a2aPolicy, + }); const check = (targetSessionKey: string): SessionAccessResult => { - const targetAgentId = resolveAgentIdFromSessionKey(targetSessionKey); + const isSpawnedSession = spawnedKeys?.has(targetSessionKey) === true; + return rowChecker.check({ + key: targetSessionKey, + spawnedBy: isSpawnedSession ? params.requesterSessionKey : undefined, + }); + }; + + return { check }; +} + +function rowOwnedByRequester(row: SessionVisibilityRow, requesterSessionKey: string): boolean { + return ( + row.ownerSessionKey === requesterSessionKey || + row.spawnedBy === requesterSessionKey || + row.parentSessionKey === requesterSessionKey + ); +} + +export function createSessionVisibilityRowChecker(params: { + action: SessionAccessAction; + requesterSessionKey: string; + visibility: SessionToolsVisibility; + a2aPolicy: AgentToAgentPolicy; +}): { check: (row: SessionVisibilityRow) => SessionAccessResult } { + const requesterAgentId = resolveAgentIdFromSessionKey(params.requesterSessionKey); + + const check = (row: SessionVisibilityRow): SessionAccessResult => { + const targetSessionKey = row.key; + const targetAgentId = row.agentId ?? 
resolveAgentIdFromSessionKey(targetSessionKey); + const isRequesterSession = + targetSessionKey === params.requesterSessionKey || targetSessionKey === "current"; + const isRequesterOwned = rowOwnedByRequester(row, params.requesterSessionKey); + // Row ownership is stronger than agent ids: ACP children may use a backend + // agent id while still belonging to the requester that spawned them. + if ( + !isRequesterSession && + isRequesterOwned && + (params.visibility === "tree" || params.visibility === "all") + ) { + return { allowed: true }; + } const isCrossAgent = targetAgentId !== requesterAgentId; if (isCrossAgent) { if (params.visibility !== "all") { @@ -222,7 +275,7 @@ export function createSessionVisibilityChecker(params: { return { allowed: true }; } - if (params.visibility === "self" && targetSessionKey !== params.requesterSessionKey) { + if (params.visibility === "self" && !isRequesterSession) { return { allowed: false, status: "forbidden", @@ -230,11 +283,7 @@ export function createSessionVisibilityChecker(params: { }; } - if ( - params.visibility === "tree" && - targetSessionKey !== params.requesterSessionKey && - !spawnedKeys?.has(targetSessionKey) - ) { + if (params.visibility === "tree" && !isRequesterSession && !isRequesterOwned) { return { allowed: false, status: "forbidden", @@ -256,8 +305,10 @@ export async function createSessionVisibilityGuard(params: { }): Promise<{ check: (targetSessionKey: string) => SessionAccessResult; }> { + // Listing already has row ownership metadata; direct key actions still need + // this lookup until every caller can pass a normalized session row. const spawnedKeys = - params.visibility === "tree" + params.action !== "list" && (params.visibility === "tree" || params.visibility === "all") ? await listSpawnedSessionKeys({ requesterSessionKey: params.requesterSessionKey }) : null; return createSessionVisibilityChecker({