diff --git a/.github/codeql/openclaw-boundary/queries/raw-socket-callsite-classification.ql b/.github/codeql/openclaw-boundary/queries/raw-socket-callsite-classification.ql
index 555651b03df..311f7fced14 100644
--- a/.github/codeql/openclaw-boundary/queries/raw-socket-callsite-classification.ql
+++ b/.github/codeql/openclaw-boundary/queries/raw-socket-callsite-classification.ql
@@ -76,6 +76,8 @@ predicate allowedRawSocketClientCall(Expr call) {
or
allowedOwnerScope(call, "src/proxy-capture/proxy-server.ts", "startDebugProxyServer")
or
+ allowedOwnerScope(call, "extensions/codex-supervisor/src/json-rpc-client.ts", "connectCodexSupervisorUnixSocket")
+ or
allowedOwnerScope(call, "extensions/irc/src/client.ts", "connectIrcClient")
or
allowedOwnerScope(call, "extensions/qa-lab/src/lab-server-capture.ts", "probeTcpReachability")
diff --git a/.github/labeler.yml b/.github/labeler.yml
index ad68ab3502b..b79dc872728 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -399,6 +399,12 @@
- changed-files:
- any-glob-to-any-file:
- "extensions/codex/**"
+"extensions: codex-supervisor":
+ - changed-files:
+ - any-glob-to-any-file:
+ - "extensions/codex-supervisor/**"
+ - "docs/plugins/reference/codex-supervisor.md"
+ - "docs/specs/claw-supervisor.md"
"extensions: kimi-coding":
- changed-files:
- any-glob-to-any-file:
diff --git a/config/knip.config.ts b/config/knip.config.ts
index 23d44365c77..eab9c2c1a46 100644
--- a/config/knip.config.ts
+++ b/config/knip.config.ts
@@ -27,7 +27,7 @@ const bundledPluginEntries = [
"setup-entry.ts!",
"{api,contract-api,helper-api,runtime-api,light-runtime-api,update-offset-runtime-api,channel-plugin-api,provider-plugin-api,setup-api}.ts!",
"subagent-hooks-api.ts!",
- "src/{api,runtime-api,light-runtime-api,update-offset-runtime-api,channel-plugin-api,provider-plugin-api,doctor-contract,setup-surface}.ts!",
+ "src/{api,runtime-api,light-runtime-api,update-offset-runtime-api,channel-plugin-api,provider-plugin-api,doctor-contract,setup-surface,mcp-serve}.ts!",
"src/subagent-hooks-api.ts!",
] as const;
diff --git a/docs/plugins/plugin-inventory.md b/docs/plugins/plugin-inventory.md
index d8e54455258..9317884db54 100644
--- a/docs/plugins/plugin-inventory.md
+++ b/docs/plugins/plugin-inventory.md
@@ -64,6 +64,7 @@ commands.
| [chutes](/plugins/reference/chutes) | Adds Chutes model provider support to OpenClaw. | `@openclaw/chutes-provider`<br />included in OpenClaw | providers: chutes |
| [clickclack](/plugins/reference/clickclack) | Adds the Clickclack channel surface for sending and receiving OpenClaw messages. | `@openclaw/clickclack`<br />included in OpenClaw | channels: clickclack |
| [cloudflare-ai-gateway](/plugins/reference/cloudflare-ai-gateway) | Adds Cloudflare AI Gateway model provider support to OpenClaw. | `@openclaw/cloudflare-ai-gateway-provider`<br />included in OpenClaw | providers: cloudflare-ai-gateway |
+| [codex-supervisor](/plugins/reference/codex-supervisor) | Supervise Codex app-server sessions from OpenClaw. | `@openclaw/codex-supervisor`<br />included in OpenClaw | contracts: tools |
| [comfy](/plugins/reference/comfy) | Adds ComfyUI model provider support to OpenClaw. | `@openclaw/comfy-provider`<br />included in OpenClaw | providers: comfy; contracts: imageGenerationProviders, musicGenerationProviders, videoGenerationProviders |
| [copilot-proxy](/plugins/reference/copilot-proxy) | Adds Copilot Proxy model provider support to OpenClaw. | `@openclaw/copilot-proxy`<br />included in OpenClaw | providers: copilot-proxy |
| [deepgram](/plugins/reference/deepgram) | Adds media understanding provider support. Adds realtime transcription provider support. | `@openclaw/deepgram-provider`<br />included in OpenClaw | contracts: mediaUnderstandingProviders, realtimeTranscriptionProviders |
diff --git a/docs/plugins/reference.md b/docs/plugins/reference.md
index c50ae192c71..20c2cc058c2 100644
--- a/docs/plugins/reference.md
+++ b/docs/plugins/reference.md
@@ -36,6 +36,7 @@ pnpm plugins:inventory:gen
| [clickclack](/plugins/reference/clickclack) | Adds the Clickclack channel surface for sending and receiving OpenClaw messages. | `@openclaw/clickclack`<br />included in OpenClaw | channels: clickclack |
| [cloudflare-ai-gateway](/plugins/reference/cloudflare-ai-gateway) | Adds Cloudflare AI Gateway model provider support to OpenClaw. | `@openclaw/cloudflare-ai-gateway-provider`<br />included in OpenClaw | providers: cloudflare-ai-gateway |
| [codex](/plugins/reference/codex) | OpenClaw Codex app-server harness and model provider plugin with a Codex-managed GPT catalog. | `@openclaw/codex`<br />npm; ClawHub | providers: codex; contracts: mediaUnderstandingProviders, migrationProviders |
+| [codex-supervisor](/plugins/reference/codex-supervisor) | Supervise Codex app-server sessions from OpenClaw. | `@openclaw/codex-supervisor`<br />included in OpenClaw | contracts: tools |
| [comfy](/plugins/reference/comfy) | Adds ComfyUI model provider support to OpenClaw. | `@openclaw/comfy-provider`<br />included in OpenClaw | providers: comfy; contracts: imageGenerationProviders, musicGenerationProviders, videoGenerationProviders |
| [copilot-proxy](/plugins/reference/copilot-proxy) | Adds Copilot Proxy model provider support to OpenClaw. | `@openclaw/copilot-proxy`<br />included in OpenClaw | providers: copilot-proxy |
| [deepgram](/plugins/reference/deepgram) | Adds media understanding provider support. Adds realtime transcription provider support. | `@openclaw/deepgram-provider`<br />included in OpenClaw | contracts: mediaUnderstandingProviders, realtimeTranscriptionProviders |
diff --git a/docs/plugins/reference/codex-supervisor.md b/docs/plugins/reference/codex-supervisor.md
new file mode 100644
index 00000000000..a8cc7074c32
--- /dev/null
+++ b/docs/plugins/reference/codex-supervisor.md
@@ -0,0 +1,19 @@
+---
+summary: "Supervise Codex app-server sessions from OpenClaw."
+read_when:
+ - You are installing, configuring, or auditing the codex-supervisor plugin
+title: "Codex Supervisor plugin"
+---
+
+# Codex Supervisor plugin
+
+Supervise Codex app-server sessions from OpenClaw.
+
+## Distribution
+
+- Package: `@openclaw/codex-supervisor`
+- Install route: included in OpenClaw
+
+## Surface
+
+contracts: tools
diff --git a/docs/specs/claw-supervisor.md b/docs/specs/claw-supervisor.md
new file mode 100644
index 00000000000..f29794dec1b
--- /dev/null
+++ b/docs/specs/claw-supervisor.md
@@ -0,0 +1,247 @@
+---
+title: Claw Supervisor
+description: Fleet supervision plan for Codex app-server sessions controlled by OpenClaw.
+readWhen:
+ - Designing Codex fleet supervision
+ - Building OpenClaw tools that read, steer, or spawn Codex sessions
+ - Choosing between local, Cloudflare, and VPS deployment for supervised Codex
+---
+
+# Claw Supervisor
+
+## Goal
+
+Claw Supervisor lets one always-on OpenClaw instance monitor and drive a fleet of Codex sessions without changing the normal Codex user experience. A user can SSH into a host, start Codex, work in the TUI, and still have the supervisor read the session, steer it, interrupt it, spawn related sessions, and accept handoffs. Codex sessions can also call back into OpenClaw through MCP.
+
+## Product Model
+
+Codex remains the primary work surface. OpenClaw supervises Codex rather than hiding Codex inside an opaque OpenClaw subagent.
+
+The OpenClaw plugin is named `codex-supervisor`. `crabfleet` remains the deployment
+and host-fleet profile for CRAB machines rather than the reusable plugin name.
+
+The model has three roles:
+
+- Human-attached Codex: a normal interactive Codex TUI launched through a shared app-server.
+- Autonomous Codex: a Codex app-server thread spawned by the supervisor that a human can later attach to.
+- Supervisor Claw: an always-on OpenClaw agent with tools for fleet state, transcript reads, steering, interruption, spawning, and handoff.
+
+OpenClaw may use its existing subagent machinery internally, but the external contract is an attachable Codex session with a Codex thread id.
+
+## Architecture
+
+```text
+user SSH session
+ -> codex --remote unix://... or ws://...
+ -> local codex app-server daemon
+ <-> host sidecar / supervisor connector
+ <-> OpenClaw fleet supervisor
+ <-> supervisor MCP exposed back to Codex
+```
+
+Each Codex-capable host runs:
+
+- Codex app-server daemon.
+- A launcher that always starts interactive Codex with `--remote`.
+- A connector that registers app-server endpoints and live threads with the supervisor.
+
+The supervisor runs:
+
+- Endpoint registry.
+- Session registry.
+- Codex app-server JSON-RPC client pool.
+- MCP server for Codex-to-Claw calls.
+- OpenClaw tools for Claw-to-Codex control.
+- Policy engine for autonomous actions, approvals, and loop prevention.
+
+## Codex App-Server Contract
+
+Use Codex app-server APIs as the canonical control plane:
+
+- `initialize`, `initialized`
+- `thread/loaded/list`
+- `thread/list`
+- `thread/read`
+- `thread/resume`
+- `thread/start`
+- `turn/start`
+- `turn/steer`
+- `turn/interrupt`
+- `model/list`
+
+Interactive Codex must be launched with `codex --remote <endpoint>` so the TUI and supervisor connect to the same app-server. Standalone `codex exec` is not a live-shared session today; use app-server APIs for autonomous work until Codex supports `exec --remote`.
+
+## Session Registry
+
+Supervisor stores one record per observed Codex thread:
+
+```json
+{
+ "sessionId": "codex-thread-id",
+ "endpointId": "host-a",
+ "host": "host-a.example",
+ "workspace": "/workspace/repo",
+ "repo": "owner/repo",
+ "branch": "feature/example",
+ "source": "vscode",
+ "status": "idle",
+ "humanAttached": true,
+ "lastSeenAt": "2026-05-28T10:00:00.000Z",
+ "summary": "Short working-state summary"
+}
+```
+
+The local implementation can derive most fields from Codex thread metadata. Fleet deployment should enrich records with host identity, user attachment state, git state, and sidecar health.
+
+## MCP Surface For Codex
+
+Every supervised Codex gets an MCP server named `openclaw-codex-supervisor`.
+
+Tools:
+
+- `codex_sessions_list`: list visible Codex sessions.
+- `codex_session_read`: read one transcript.
+- `codex_session_send`: send a message to an idle thread or steer an active thread.
+- `codex_session_interrupt`: interrupt the active turn.
+- `codex_endpoint_probe`: verify endpoint connectivity.
+- `claw_report_progress`: publish current task state to the supervisor.
+- `claw_ask`: ask the supervisor for help or delegation.
+- `codex_spawn`: create a new autonomous Codex session.
+- `codex_handoff`: request human or peer takeover.
+
+Resources:
+
+- `codex://sessions`
+- `codex://sessions/{sessionId}`
+- `codex://sessions/{sessionId}/transcript`
+
+## Claw Control Surface
+
+The always-on Claw gets the same primitives as internal tools:
+
+- list sessions and endpoints
+- read transcripts
+- send/steer text
+- interrupt active work
+- spawn new sessions
+- summarize and assign sessions
+- broadcast instructions to a filtered group
+- mark sessions blocked, done, or abandoned
+
+Tool behavior:
+
+- If a target thread is idle, `codex_session_send` maps to `turn/start`.
+- If a target thread is active and an in-progress turn id is visible, it maps to `turn/steer`.
+- If the active turn cannot be identified, the tool fails closed instead of creating an unrelated turn.
+- Codex-exposed MCP write controls stay disabled unless a trusted supervisor-only policy enables them.
+- Raw transcript reads stay disabled unless a trusted supervisor-only policy enables them.
+- Autonomous approval defaults deny tool/file approvals unless an explicit policy says otherwise.
+
+## Launch Flow
+
+Interactive host login:
+
+1. User SSHes into a CRAB host.
+2. SSH service starts or verifies `codex app-server daemon start`.
+3. Login wrapper launches `codex --remote unix:// --cd <workspace>`.
+4. Host connector registers endpoint and loaded thread.
+5. Supervisor emits a high-priority fleet event: new Codex session, workspace, human-attached state, current task preview.
+6. Supervisor Claw can read and steer immediately.
+
+Autonomous spawn:
+
+1. Supervisor selects host and workspace.
+2. Host connector opens or resumes a Codex app-server thread.
+3. Supervisor starts the first turn with task text and MCP config.
+4. Session registry marks it autonomous and attachable.
+5. Human can later attach with `codex --remote <endpoint> resume <thread-id>` once Codex supports that exact UX, or via current resume flow on the same app-server.
+
+## Deployment
+
+Preferred control plane:
+
+- Host connectors keep outbound WebSocket connections to the supervisor.
+- Supervisor state lives in OpenClaw Gateway storage.
+- Codex app-server remains local to each host; never expose a raw unauthenticated app-server to the public internet.
+
+Cloudflare viability:
+
+- Good for registry, durable objects, WebSocket fan-in, lightweight event routing, and public MCP/gateway endpoints.
+- Not enough by itself for direct private host control because Workers cannot dial arbitrary private Unix sockets or local loopback app-servers.
+- Use Cloudflare when every host connector phones home over outbound WebSocket.
+
+VPS fallback:
+
+- Use a Hetzner service when long-lived process control, SSH tunnels, private network routing, or local filesystem access is needed.
+- Keep the same protocol: host connectors outbound, supervisor registry central, Codex app-server local.
+
+## Security
+
+- Default bind is local Unix socket.
+- Remote app-server uses token or signed bearer auth.
+- Host connector authenticates to supervisor with a scoped host token.
+- Supervisor tools enforce per-session policy: read, steer, interrupt, spawn, approval.
+- Cross-agent messages include `originSessionId`; self-echo is dropped.
+- Broadcast requires an explicit filter and bounded target count.
+- Transcript reads redact secrets at OpenClaw boundary.
+- Approval requests default to deny for supervisor-originated turns unless policy allows them.
+
+## Implementation Plan
+
+Phase 1: Local supervisor MVP
+
+- Add Codex app-server JSON-RPC client for stdio proxy and WebSocket endpoints.
+- Add supervisor endpoint/session registry.
+- Add MCP tools: list, read, send, interrupt, probe.
+- Add local env config for endpoints.
+- Add fake app-server tests and one live local app-server smoke.
+
+Phase 2: OpenClaw integration
+
+- Register supervisor tools in the `codex-supervisor` plugin.
+- Inject supervisor MCP into Codex thread config.
+- Add session summaries to agent context.
+- Add event notifications when new Codex threads appear.
+- Add policy config for autonomous send/interrupt/spawn.
+
+Phase 3: Fleet connector
+
+- Host sidecar registers app-server endpoint, host metadata, git/workspace metadata, and human attachment state.
+- Add outbound WebSocket connector for Cloudflare or VPS control plane.
+- Add reconnect, heartbeat, and stale-session cleanup.
+- Add CRAB SSH launcher wrapper.
+
+Phase 4: Autonomous operation
+
+- Add spawn/resume/takeover flows.
+- Add broadcast and delegation.
+- Add progress reports and task-state summaries.
+- Add loop prevention and rate limits.
+- Add dashboard views.
+
+Phase 5: Multi-Claw
+
+- Shard sessions by group.
+- Add leadership/lease for each session.
+- Add audit log and replay.
+- Add escalation between Claw groups.
+
+## Acceptance Tests
+
+- A human launches Codex TUI through a shared app-server.
+- Supervisor lists the live thread via `thread/loaded/list`.
+- Supervisor reads transcript via `thread/read`.
+- Supervisor sends text to an idle thread via `turn/start`.
+- Supervisor steers an active thread via `turn/steer`.
+- Supervisor interrupt stops an active turn via `turn/interrupt`.
+- Codex calls supervisor MCP and lists peer sessions.
+- An autonomous Codex is spawned and later human-attached.
+- Lost host connector marks sessions stale without deleting history.
+
+## Open Questions
+
+- Exact Codex TUI attach UX for an app-server thread spawned without a TUI.
+- Whether Codex should add `exec --remote` for headless live-shared runs.
+- Durable state owner: OpenClaw Gateway DB, Cloudflare Durable Object, or VPS database.
+- Approval policy granularity for supervisor-originated turns.
+- How much transcript summary should be injected into the always-on Claw context versus kept as a tool/resource.
diff --git a/extensions/codex-supervisor/index.test.ts b/extensions/codex-supervisor/index.test.ts
new file mode 100644
index 00000000000..c5f566bdb9a
--- /dev/null
+++ b/extensions/codex-supervisor/index.test.ts
@@ -0,0 +1,43 @@
+import { createCapturedPluginRegistration } from "openclaw/plugin-sdk/plugin-test-runtime";
+import { describe, expect, it } from "vitest";
+import entry from "./index.js";
+
+describe("codex-supervisor plugin entry", () => {
+ it("registers supervisor tools from plugin config", () => {
+ const captured = createCapturedPluginRegistration({ id: "codex-supervisor" });
+ captured.api.pluginConfig = { // one websocket endpoint with both policy flags switched on
+ endpoints: [
+ {
+ id: "test",
+ transport: "websocket",
+ url: "ws://127.0.0.1:12345",
+ },
+ ],
+ allowRawTranscripts: true,
+ allowWriteControls: true,
+ };
+
+ entry.register(captured.api);
+
+ expect(captured.tools.map((tool) => tool.name).toSorted()).toEqual([ // names are sorted so registration order does not matter
+ "codex_endpoint_probe",
+ "codex_session_interrupt",
+ "codex_session_read",
+ "codex_session_send",
+ "codex_sessions_list",
+ ]);
+ expect(captured.runtimeLifecycles).toHaveLength(1); // register() adds exactly one runtime lifecycle
+ expect(captured.runtimeLifecycles[0]).toMatchObject({
+ id: "codex-supervisor",
+ description: "Close Codex supervisor app-server connections.",
+ });
+ expect(entry.configSchema.jsonSchema).toMatchObject({ // partial match: only the top-level property types are pinned
+ type: "object",
+ properties: {
+ endpoints: { type: "array" },
+ allowRawTranscripts: { type: "boolean" },
+ allowWriteControls: { type: "boolean" },
+ },
+ });
+ });
+});
diff --git a/extensions/codex-supervisor/index.ts b/extensions/codex-supervisor/index.ts
new file mode 100644
index 00000000000..88e86d6e4dc
--- /dev/null
+++ b/extensions/codex-supervisor/index.ts
@@ -0,0 +1,36 @@
+import { buildJsonPluginConfigSchema, definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
+import {
+ CodexSupervisorPluginConfigSchema,
+ resolveCodexSupervisorPluginConfig,
+} from "./src/config.js";
+import { createCodexSupervisorTools } from "./src/plugin-tools.js";
+import { CodexSupervisor } from "./src/supervisor.js";
+
+export default definePluginEntry({ // plugin entry: metadata, config schema, and the register hook
+ id: "codex-supervisor",
+ name: "Codex Supervisor",
+ description: "Supervise Codex app-server sessions from OpenClaw.",
+ configSchema: buildJsonPluginConfigSchema(
+ CodexSupervisorPluginConfigSchema as unknown as Parameters< // NOTE(review): double cast bypasses type checking; presumably bridges the typebox schema type to the SDK parameter type — confirm
+ typeof buildJsonPluginConfigSchema
+ >[0],
+ ),
+ register(api) {
+ const config = resolveCodexSupervisorPluginConfig(api.pluginConfig); // normalizes raw plugin config into endpoints plus two boolean flags
+ const supervisor = new CodexSupervisor(config.endpoints);
+ api.lifecycle.registerRuntimeLifecycle({
+ id: "codex-supervisor",
+ description: "Close Codex supervisor app-server connections.",
+ cleanup: () => supervisor.close(), // cleanup hook delegates to the supervisor's close()
+ });
+ for (const tool of createCodexSupervisorTools({ // every tool shares the one supervisor instance and policy
+ supervisor,
+ policy: {
+ allowRawTranscripts: config.allowRawTranscripts,
+ allowWriteControls: config.allowWriteControls,
+ },
+ })) {
+ api.registerTool(tool);
+ }
+ },
+});
diff --git a/extensions/codex-supervisor/openclaw.plugin.json b/extensions/codex-supervisor/openclaw.plugin.json
new file mode 100644
index 00000000000..fa37a86efb7
--- /dev/null
+++ b/extensions/codex-supervisor/openclaw.plugin.json
@@ -0,0 +1,87 @@
+{
+ "id": "codex-supervisor",
+ "activation": {
+ "onStartup": false
+ },
+ "name": "Codex Supervisor",
+ "description": "Supervise Codex app-server sessions from OpenClaw.",
+ "contracts": {
+ "tools": [
+ "codex_endpoint_probe",
+ "codex_sessions_list",
+ "codex_session_read",
+ "codex_session_send",
+ "codex_session_interrupt"
+ ]
+ },
+ "configSchema": {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "endpoints": {
+ "type": "array",
+ "items": {
+ "anyOf": [
+ {
+ "type": "object",
+ "additionalProperties": false,
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "label": {
+ "type": "string"
+ },
+ "transport": {
+ "const": "stdio-proxy"
+ },
+ "command": {
+ "type": "string"
+ },
+ "args": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ "cwd": {
+ "type": "string"
+ }
+ }
+ },
+ {
+ "type": "object",
+ "additionalProperties": false,
+ "required": ["transport", "url"],
+ "properties": {
+ "id": {
+ "type": "string"
+ },
+ "label": {
+ "type": "string"
+ },
+ "transport": {
+ "const": "websocket"
+ },
+ "url": {
+ "type": "string"
+ },
+ "authTokenEnv": {
+ "type": "string"
+ }
+ }
+ }
+ ]
+ }
+ },
+ "allowRawTranscripts": {
+ "type": "boolean",
+ "default": false
+ },
+ "allowWriteControls": {
+ "type": "boolean",
+ "default": false
+ }
+ }
+ }
+}
diff --git a/extensions/codex-supervisor/package.json b/extensions/codex-supervisor/package.json
new file mode 100644
index 00000000000..fd1c9eee77a
--- /dev/null
+++ b/extensions/codex-supervisor/package.json
@@ -0,0 +1,21 @@
+{
+ "name": "@openclaw/codex-supervisor",
+ "version": "2026.5.28",
+ "private": true,
+ "description": "OpenClaw Codex app-server fleet supervision plugin.",
+ "type": "module",
+ "dependencies": {
+ "@modelcontextprotocol/sdk": "1.29.0",
+ "typebox": "1.1.38",
+ "ws": "8.21.0",
+ "zod": "4.4.3"
+ },
+ "devDependencies": {
+ "@openclaw/plugin-sdk": "workspace:*"
+ },
+ "openclaw": {
+ "extensions": [
+ "./index.ts"
+ ]
+ }
+}
diff --git a/extensions/codex-supervisor/src/api.ts b/extensions/codex-supervisor/src/api.ts
new file mode 100644
index 00000000000..a7425b4f025
--- /dev/null
+++ b/extensions/codex-supervisor/src/api.ts
@@ -0,0 +1,19 @@
+export {
+ CodexSupervisorPluginConfigSchema,
+ loadCodexSupervisorEndpoints,
+ resolveCodexSupervisorPluginConfig,
+} from "./config.js";
+export { CodexSupervisor } from "./supervisor.js";
+export { createCodexSupervisorTools } from "./plugin-tools.js";
+export { createCodexSupervisorMcpServer, serveCodexSupervisorMcp } from "./mcp-server.js";
+export type { CodexSupervisorPluginConfig, ResolvedCodexSupervisorPluginConfig } from "./config.js";
+export type {
+ CodexJsonRpcConnection,
+ CodexSupervisorEndpoint,
+ CodexSupervisorEndpointHealth,
+ CodexSupervisorSendResult,
+ CodexSupervisorSession,
+ CodexSupervisorSessionListResult,
+ CodexSupervisorThreadStatus,
+ CodexSupervisorTurnMode,
+} from "./types.js";
diff --git a/extensions/codex-supervisor/src/config.ts b/extensions/codex-supervisor/src/config.ts
new file mode 100644
index 00000000000..51b9c82919c
--- /dev/null
+++ b/extensions/codex-supervisor/src/config.ts
@@ -0,0 +1,201 @@
+import { Type, type Static } from "typebox";
+import type { CodexSupervisorEndpoint } from "./types.js";
+
+const ENDPOINTS_ENV = "OPENCLAW_CODEX_SUPERVISOR_ENDPOINTS";
+
+const StdioEndpointSchema = Type.Object( // stdio-proxy endpoint: every field is optional, including the transport tag
+ {
+ id: Type.Optional(Type.String()),
+ label: Type.Optional(Type.String()),
+ transport: Type.Optional(Type.Literal("stdio-proxy")),
+ command: Type.Optional(Type.String()),
+ args: Type.Optional(Type.Array(Type.String())),
+ cwd: Type.Optional(Type.String()),
+ },
+ { additionalProperties: false },
+);
+
+const WebSocketEndpointSchema = Type.Object( // websocket endpoint: transport tag and url are required
+ {
+ id: Type.Optional(Type.String()),
+ label: Type.Optional(Type.String()),
+ transport: Type.Literal("websocket"),
+ url: Type.String(),
+ authTokenEnv: Type.Optional(Type.String()), // presumably the name of an env var holding the auth token — confirm against the client
+ },
+ { additionalProperties: false },
+);
+
+export const CodexSupervisorPluginConfigSchema = Type.Object( // top-level plugin config; unknown keys are rejected
+ {
+ endpoints: Type.Optional(
+ Type.Array(Type.Union([StdioEndpointSchema, WebSocketEndpointSchema])),
+ ),
+ allowRawTranscripts: Type.Optional(Type.Boolean({ default: false })),
+ allowWriteControls: Type.Optional(Type.Boolean({ default: false })),
+ },
+ { additionalProperties: false },
+);
+
+export type CodexSupervisorPluginConfig = Static<typeof CodexSupervisorPluginConfigSchema>; // raw, pre-normalization config shape derived from the schema
+
+export type ResolvedCodexSupervisorPluginConfig = { // normalized config: every field is required
+ endpoints: CodexSupervisorEndpoint[];
+ allowRawTranscripts: boolean;
+ allowWriteControls: boolean;
+};
+
+function normalizeEndpointId(value: string, index: number): string { // sanitizes an endpoint id, or synthesizes one from the position
+ const trimmed = value.trim();
+ if (trimmed) {
+ return trimmed.replace(/[^a-zA-Z0-9_.:-]/g, "-"); // every character outside [a-zA-Z0-9_.:-] becomes "-"
+ }
+ return `endpoint-${index + 1}`; // blank input: 1-based positional fallback
+}
+
+function isRecord(value: unknown): value is Record<string, unknown> { // guard for non-array objects
+  return Boolean(value) && typeof value === "object" && !Array.isArray(value); // rejects null, primitives, and arrays
+}
+
+function parseEndpointRecord(value: unknown, index: number): CodexSupervisorEndpoint | undefined { // turns one untyped entry into an endpoint; undefined when it cannot be interpreted
+ if (!isRecord(value)) {
+ return undefined;
+ }
+ const transport = typeof value.transport === "string" ? value.transport : undefined; // a non-string transport is treated the same as a missing one
+ const id =
+ typeof value.id === "string"
+ ? normalizeEndpointId(value.id, index)
+ : normalizeEndpointId(typeof value.label === "string" ? value.label : "", index); // no id: derive it from the label, else positional fallback
+ const label = typeof value.label === "string" ? value.label : undefined;
+ if (transport === "websocket" && typeof value.url === "string") {
+ return {
+ id,
+ transport,
+ url: value.url,
+ ...(label ? { label } : {}), // empty labels are omitted
+ ...(typeof value.authTokenEnv === "string" ? { authTokenEnv: value.authTokenEnv } : {}),
+ };
+ }
+ if (transport === "stdio-proxy" || transport === undefined) { // stdio-proxy is the default when no transport is given
+ const args = Array.isArray(value.args)
+ ? value.args.filter((entry): entry is string => typeof entry === "string") // non-string args are dropped
+ : undefined;
+ return {
+ id,
+ transport: "stdio-proxy",
+ ...(label ? { label } : {}),
+ ...(typeof value.command === "string" ? { command: value.command } : {}),
+ ...(args && args.length > 0 ? { args } : {}), // an empty args list is omitted entirely
+ ...(typeof value.cwd === "string" ? { cwd: value.cwd } : {}),
+ };
+ }
+ return undefined; // unknown transport, or websocket without a string url
+}
+
+function requireUniqueEndpointIds(endpoints: CodexSupervisorEndpoint[]): CodexSupervisorEndpoint[] { // returns the same array; throws on the first repeated id
+ const seen = new Set();
+ for (const endpoint of endpoints) {
+ if (seen.has(endpoint.id)) {
+ throw new Error(`duplicate Codex supervisor endpoint id: ${endpoint.id}`);
+ }
+ seen.add(endpoint.id);
+ }
+ return endpoints;
+}
+
+function endpointFromToken(token: string, index: number): CodexSupervisorEndpoint | undefined { // parses one comma-separated token; undefined when unrecognized
+ const trimmed = token.trim();
+ if (!trimmed) {
+ return undefined;
+ }
+ if ( // bare URL form: the id is synthesized from the position
+ trimmed.startsWith("ws://") ||
+ trimmed.startsWith("wss://") ||
+ trimmed.startsWith("unix://")
+ ) {
+ return {
+ id: normalizeEndpointId("", index),
+ transport: "websocket",
+ url: trimmed,
+ };
+ }
+ if (trimmed === "local" || trimmed === "proxy" || trimmed === "stdio") { // NOTE(review): "proxy" and "stdio" resolve to a websocket unix:// endpoint, not the stdio-proxy transport — confirm intended
+ return {
+ id: "local",
+ label: "local Codex app-server daemon",
+ transport: "websocket",
+ url: "unix://",
+ };
+ }
+ const separatorIndex = trimmed.indexOf("="); // remaining form is id=url, split at the first "="
+ const id = separatorIndex >= 0 ? trimmed.slice(0, separatorIndex) : trimmed;
+ const url = separatorIndex >= 0 ? trimmed.slice(separatorIndex + 1) : undefined;
+ if (url?.startsWith("ws://") || url?.startsWith("wss://") || url?.startsWith("unix://")) {
+ return {
+ id: normalizeEndpointId(id ?? "", index),
+ transport: "websocket",
+ url,
+ };
+ }
+ return undefined; // no "=" or an unsupported URL scheme
+}
+
+export function loadCodexSupervisorEndpoints(
+  env: NodeJS.ProcessEnv = process.env, // only the ENDPOINTS_ENV variable is read from it
+): CodexSupervisorEndpoint[] {
+  const raw = env[ENDPOINTS_ENV]?.trim();
+  if (!raw) { // unset or blank: default to the single local endpoint
+    return requireUniqueEndpointIds([
+      {
+        id: "local",
+        label: "local Codex app-server daemon",
+        transport: "websocket",
+        url: "unix://",
+      },
+    ]);
+  }
+  if (raw.startsWith("[")) { // JSON form; JSON.parse throws on malformed input
+    const parsed = JSON.parse(raw) as unknown;
+    if (!Array.isArray(parsed)) {
+      throw new Error(`${ENDPOINTS_ENV} must be a JSON array`);
+    }
+    return requireUniqueEndpointIds(
+      parsed
+        .map((entry, index) => parseEndpointRecord(entry, index))
+        .filter((entry): entry is CodexSupervisorEndpoint => Boolean(entry)), // entries that cannot be parsed are dropped
+    );
+  }
+  return requireUniqueEndpointIds( // otherwise the value is a comma-separated token list
+    raw
+      .split(",")
+      .map(endpointFromToken)
+      .filter((entry): entry is CodexSupervisorEndpoint => Boolean(entry)),
+  );
+}
+
+function normalizeConfiguredEndpoints( // normalizes endpoints from plugin config; undefined means none are usable
+ endpoints: CodexSupervisorPluginConfig["endpoints"],
+): CodexSupervisorEndpoint[] | undefined {
+ if (!endpoints || endpoints.length === 0) {
+ return undefined;
+ }
+ const normalized = endpoints
+ .map((entry, index) => parseEndpointRecord(entry, index))
+ .filter((entry): entry is CodexSupervisorEndpoint => Boolean(entry)); // entries that cannot be parsed are dropped
+ return normalized.length > 0 ? requireUniqueEndpointIds(normalized) : undefined; // throws on duplicate ids
+}
+
+export function resolveCodexSupervisorPluginConfig(
+  rawConfig: unknown,
+  env: NodeJS.ProcessEnv = process.env, // consulted only when the config supplies no usable endpoints
+): ResolvedCodexSupervisorPluginConfig {
+  const config =
+    rawConfig && typeof rawConfig === "object" && !Array.isArray(rawConfig)
+      ? (rawConfig as CodexSupervisorPluginConfig)
+      : {}; // anything that is not a plain object is treated as empty config
+  return {
+    endpoints: normalizeConfiguredEndpoints(config.endpoints) ?? loadCodexSupervisorEndpoints(env),
+    allowRawTranscripts: config.allowRawTranscripts === true, // opt-in: anything but literal true stays false
+    allowWriteControls: config.allowWriteControls === true,
+  };
+}
diff --git a/extensions/codex-supervisor/src/json-rpc-client.ts b/extensions/codex-supervisor/src/json-rpc-client.ts
new file mode 100644
index 00000000000..ac532e8e632
--- /dev/null
+++ b/extensions/codex-supervisor/src/json-rpc-client.ts
@@ -0,0 +1,334 @@
+import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process";
+import { randomUUID } from "node:crypto";
+import * as net from "node:net";
+import * as os from "node:os";
+import * as path from "node:path";
+import WebSocket from "ws";
+import type { CodexJsonRpcConnection, CodexSupervisorEndpoint } from "./types.js";
+
+/** Bookkeeping for one in-flight request: its settle callbacks and its timeout timer. */
+type PendingRequest = {
+ reject: (error: Error) => void;
+ resolve: (value: unknown) => void;
+ timeout: NodeJS.Timeout;
+};
+
+/** Type guard: true for truthy, non-array values whose `typeof` is "object". */
+function isRecord(value: unknown): value is Record {
+ return Boolean(value) && typeof value === "object" && !Array.isArray(value);
+}
+
+/**
+ * Builds an Error from a JSON-RPC error response.
+ *
+ * Uses `message.error.message` when it is a string; otherwise falls back to a generic
+ * failure message. Other fields of the error object are not included.
+ */
+function formatJsonRpcError(message: Record): Error {
+ const error = isRecord(message.error) ? message.error : {};
+ const detail =
+ typeof error.message === "string" ? error.message : "Codex app-server request failed";
+ return new Error(detail);
+}
+
+/** Wraps a thrown value in an Error labelled as a malformed app-server message. */
+function formatMalformedMessageError(error: unknown): Error {
+  if (error instanceof Error) {
+    return new Error(`Malformed Codex app-server message: ${error.message}`);
+  }
+  return new Error(`Malformed Codex app-server message: ${String(error)}`);
+}
+
+/**
+ * Returns the canned reply for a request initiated by the app-server, or `undefined`
+ * when the method is not one this supervisor answers.
+ *
+ * Tool calls are answered with `success: false`, approval requests with a "decline"
+ * decision, permission requests with an empty `permissions` object, user-input requests
+ * with empty `answers`, and MCP elicitation requests with a "decline" action.
+ */
+export function resolveSafeApprovalResult(method: string): Record | undefined {
+ if (method === "item/tool/call") {
+ return {
+ contentItems: [
+ {
+ type: "inputText",
+ text: "OpenClaw Codex supervisor did not register a handler for this app-server tool call.",
+ },
+ ],
+ success: false,
+ };
+ }
+ if (method === "item/commandExecution/requestApproval") {
+ return { decision: "decline" };
+ }
+ if (method === "item/fileChange/requestApproval") {
+ return { decision: "decline" };
+ }
+ if (method === "item/permissions/requestApproval") {
+ return { permissions: {}, scope: "turn" };
+ }
+ // Catch-all for any other approval method; the exact matches above take precedence.
+ if (method.endsWith("/requestApproval")) {
+ return {
+ decision: "decline",
+ reason: "OpenClaw Codex supervisor does not grant native approvals.",
+ };
+ }
+ if (method === "item/tool/requestUserInput") {
+ return { answers: {} };
+ }
+ if (method === "mcpServer/elicitation/request") {
+ return { action: "decline" };
+ }
+ return undefined;
+}
+
+/**
+ * Transport-independent JSON-RPC plumbing shared by the concrete connections.
+ *
+ * Tracks in-flight requests by id, answers requests initiated by the app-server with
+ * canned replies, and rejects pending requests once the connection has failed.
+ * Subclasses provide `sendRaw` and `close`, and pass parsed messages to `handleMessage`.
+ */
+abstract class BaseCodexJsonRpcConnection implements CodexJsonRpcConnection {
+ // In-flight requests keyed by the string request id.
+ private readonly pending = new Map();
+ // First failure seen; once set, new requests are rejected with it.
+ private closedError: Error | undefined;
+
+ abstract close(): Promise;
+ protected abstract sendRaw(line: string): void;
+
+ /** Sends the `initialize` request and, once it succeeds, the `initialized` notification. */
+ async initialize(): Promise {
+ await this.request("initialize", {
+ clientInfo: {
+ name: "openclaw-codex-supervisor",
+ title: "OpenClaw Codex Supervisor",
+ version: "0.1.0",
+ },
+ capabilities: {
+ experimentalApi: true,
+ },
+ });
+ this.notify("initialized");
+ }
+
+ /**
+ * Sends a request and resolves with the `result` of the matching response.
+ *
+ * Rejects immediately if the connection has already failed, when `sendRaw` throws,
+ * when the response carries an `error` member, or after 60 seconds without a response.
+ */
+ request(method: string, params?: Record): Promise {
+ if (this.closedError) {
+ return Promise.reject(this.closedError);
+ }
+ const id = randomUUID();
+ const payload: Record = { id, method, params: params ?? {} };
+ return new Promise((resolve, reject) => {
+ const timeout = setTimeout(() => {
+ this.pending.delete(id);
+ reject(new Error(`Codex app-server request timed out: ${method}`));
+ }, 60_000);
+ // Register before sending so a response cannot arrive ahead of the bookkeeping.
+ this.pending.set(id, { resolve, reject, timeout });
+ try {
+ this.sendRaw(JSON.stringify(payload));
+ } catch (error) {
+ clearTimeout(timeout);
+ this.pending.delete(id);
+ reject(error instanceof Error ? error : new Error(String(error)));
+ }
+ });
+ }
+
+ /** Sends a message without an id; `params` is sent as `null` when omitted. */
+ notify(method: string, params?: Record): void {
+ const payload: Record = { method, params: params ?? null };
+ this.sendRaw(JSON.stringify(payload));
+ }
+
+ /**
+ * Dispatches one parsed incoming message.
+ *
+ * A message with both `id` and `method` is a request from the app-server and is
+ * answered right away. A message with only `id` settles the matching pending request.
+ * Anything else (non-objects, messages without a usable id) is ignored.
+ */
+ protected handleMessage(message: unknown): void {
+ if (!isRecord(message)) {
+ return;
+ }
+ const id =
+ typeof message.id === "string" || typeof message.id === "number" ? message.id : undefined;
+ const method = typeof message.method === "string" ? message.method : undefined;
+ if (id !== undefined && method) {
+ const result = resolveSafeApprovalResult(method);
+ this.sendRaw(
+ JSON.stringify(
+ result === undefined
+ ? {
+ id,
+ error: {
+ // -32601 is the JSON-RPC "Method not found" code.
+ code: -32601,
+ message: `OpenClaw Codex supervisor cannot handle app-server request: ${method}`,
+ },
+ }
+ : { id, result },
+ ),
+ );
+ return;
+ }
+ if (id !== undefined) {
+ // Request ids are generated as strings, so numeric ids are stringified for lookup.
+ const pending = this.pending.get(String(id));
+ if (!pending) {
+ return;
+ }
+ clearTimeout(pending.timeout);
+ this.pending.delete(String(id));
+ if ("error" in message) {
+ pending.reject(formatJsonRpcError(message));
+ return;
+ }
+ pending.resolve(message.result);
+ }
+ }
+
+ /** Rejects every pending request with `error`, clearing its timer and map entry. */
+ protected rejectAll(error: Error): void {
+ for (const [id, pending] of this.pending) {
+ clearTimeout(pending.timeout);
+ this.pending.delete(id);
+ pending.reject(error);
+ }
+ }
+
+ /** Records the first failure (later ones keep the original) and rejects pending requests with it. */
+ protected fail(error: Error): void {
+ this.closedError ??= error;
+ this.rejectAll(this.closedError);
+ }
+}
+
+/**
+ * Connection that spawns the app-server as a child process and exchanges
+ * newline-delimited JSON messages over its stdin/stdout.
+ */
+class StdioCodexJsonRpcConnection extends BaseCodexJsonRpcConnection {
+ // Unprocessed stdout text; holds the trailing partial line between chunks.
+ private buffer = "";
+ private readonly proc: ChildProcessWithoutNullStreams;
+ // Most recent non-empty stderr lines, reported when the process closes.
+ private readonly stderrTail: string[] = [];
+
+ /**
+ * Spawns the process. Defaults to `codex app-server --listen stdio://` when the
+ * endpoint does not specify `command` / `args`.
+ */
+ constructor(endpoint: Extract) {
+ super();
+ this.proc = spawn(
+ endpoint.command ?? "codex",
+ endpoint.args ?? ["app-server", "--listen", "stdio://"],
+ {
+ cwd: endpoint.cwd,
+ stdio: "pipe",
+ },
+ );
+ this.proc.stdout.setEncoding("utf8");
+ this.proc.stderr.setEncoding("utf8");
+ this.proc.stdout.on("data", (chunk: string) => this.handleStdout(chunk));
+ this.proc.stderr.on("data", (chunk: string) => {
+ this.stderrTail.push(...chunk.split(/\r?\n/).filter(Boolean));
+ // Keep only the last 40 lines.
+ this.stderrTail.splice(0, Math.max(0, this.stderrTail.length - 40));
+ });
+ this.proc.stdin.once("error", (error) => this.fail(error));
+ this.proc.once("error", (error) => this.fail(error));
+ this.proc.once("close", () =>
+ this.fail(
+ new Error(
+ // The stderr tail is capped at 1200 characters in the error message.
+ `Codex app-server stdio transport closed. stderr_tail=${this.stderrTail.join("\n").slice(0, 1200)}`,
+ ),
+ ),
+ );
+ }
+
+ /** Writes one message line to the child's stdin; a write error fails the connection. */
+ protected sendRaw(line: string): void {
+ this.proc.stdin.write(`${line}\n`, (error) => {
+ if (error) {
+ this.fail(error);
+ }
+ });
+ }
+
+ /** Ends stdin and sends SIGTERM to the child; does not wait for it to exit. */
+ async close(): Promise {
+ this.proc.stdin.end();
+ this.proc.kill("SIGTERM");
+ }
+
+ /**
+ * Appends a stdout chunk and processes every complete line in the buffer.
+ *
+ * Blank lines are skipped. If parsing or handling a line throws, the connection is
+ * failed and closed, and the rest of the buffer is left unprocessed.
+ */
+ private handleStdout(chunk: string): void {
+ this.buffer += chunk;
+ for (;;) {
+ const index = this.buffer.indexOf("\n");
+ if (index < 0) {
+ return;
+ }
+ const line = this.buffer.slice(0, index).trim();
+ this.buffer = this.buffer.slice(index + 1);
+ if (!line) {
+ continue;
+ }
+ try {
+ this.handleMessage(JSON.parse(line) as unknown);
+ } catch (error) {
+ this.fail(formatMalformedMessageError(error));
+ void this.close();
+ return;
+ }
+ }
+ }
+}
+
+/**
+ * Default control-socket path: `<codex home>/app-server-control/app-server-control.sock`.
+ *
+ * The Codex home is the trimmed `CODEX_HOME` environment variable when non-empty,
+ * otherwise `.codex` under the user's home directory.
+ */
+function defaultCodexControlSocketPath(): string {
+  const configuredHome = process.env.CODEX_HOME?.trim();
+  const codexHome = configuredHome ? configuredHome : path.join(os.homedir(), ".codex");
+  return path.join(codexHome, "app-server-control", "app-server-control.sock");
+}
+
+/**
+ * Extracts the socket path that follows the `unix://` prefix.
+ * A bare `unix://` resolves to the default control-socket path.
+ */
+function resolveUnixWebSocketPath(url: string): string {
+  const socketPath = url.slice("unix://".length);
+  if (socketPath) {
+    return socketPath;
+  }
+  return defaultCodexControlSocketPath();
+}
+
+/** Opens a raw connection to the Unix socket path resolved from a `unix://` URL. */
+function connectCodexSupervisorUnixSocket(url: string): net.Socket {
+ return net.createConnection(resolveUnixWebSocketPath(url));
+}
+
+/** Decodes a WebSocket message payload (string, Buffer, Buffer list, or other binary) as UTF-8 text. */
+function websocketMessageToString(data: WebSocket.RawData): string {
+  if (typeof data === "string") {
+    return data;
+  }
+  let bytes: Buffer;
+  if (Buffer.isBuffer(data)) {
+    bytes = data;
+  } else if (Array.isArray(data)) {
+    // Fragmented payloads arrive as a list of buffers.
+    bytes = Buffer.concat(data);
+  } else {
+    bytes = Buffer.from(data);
+  }
+  return bytes.toString("utf8");
+}
+
+/**
+ * Connection that exchanges one JSON message per WebSocket frame.
+ *
+ * URLs starting with `unix://` are connected through a Unix domain socket; any other
+ * URL is passed to the WebSocket client as-is.
+ */
+class WebSocketCodexJsonRpcConnection extends BaseCodexJsonRpcConnection {
+ private readonly ws: WebSocket;
+ // Settles on the first "open" (resolve) or "error" (reject) event.
+ private readonly openPromise: Promise;
+
+ constructor(endpoint: Extract) {
+ super();
+ const headers: Record = {};
+ // `authTokenEnv` names an environment variable; a non-empty value is sent as a Bearer token.
+ if (endpoint.authTokenEnv) {
+ const token = process.env[endpoint.authTokenEnv];
+ if (token) {
+ headers.authorization = `Bearer ${token}`;
+ }
+ }
+ // For unix:// endpoints the socket comes from createConnection; the ws:// URL
+ // appears to be a placeholder for the handshake only (confirm against the ws docs).
+ this.ws = endpoint.url.startsWith("unix://")
+ ? new WebSocket("ws://localhost/", {
+ headers,
+ createConnection: () => connectCodexSupervisorUnixSocket(endpoint.url),
+ })
+ : new WebSocket(endpoint.url, { headers });
+ this.openPromise = new Promise((resolve, reject) => {
+ this.ws.once("open", resolve);
+ this.ws.once("error", reject);
+ });
+ this.ws.on("message", (data) => {
+ const text = websocketMessageToString(data);
+ try {
+ this.handleMessage(JSON.parse(text) as unknown);
+ } catch (error) {
+ // A frame that cannot be parsed or handled fails and closes the connection.
+ this.fail(formatMalformedMessageError(error));
+ void this.close();
+ }
+ });
+ this.ws.once("error", (error) => this.fail(error));
+ this.ws.once("close", () => this.fail(new Error("Codex app-server websocket closed")));
+ }
+
+ /** Resolves once the socket has opened; rejects if an error is emitted first. */
+ async ready(): Promise {
+ await this.openPromise;
+ }
+
+ /** Sends one message frame; a send error fails the connection. */
+ protected sendRaw(line: string): void {
+ this.ws.send(line, (error) => {
+ if (error) {
+ this.fail(error);
+ }
+ });
+ }
+
+ /** Starts closing the WebSocket; does not wait for the close to complete. */
+ async close(): Promise {
+ this.ws.close();
+ }
+}
+
+/**
+ * Opens a connection to an endpoint and completes the initialize handshake.
+ *
+ * "websocket" endpoints use the WebSocket connection; every other transport uses the
+ * stdio connection. When the connection exposes `ready()`, it is awaited before
+ * `initialize()`. On any failure the connection is closed (close errors are ignored)
+ * and the original error is rethrown.
+ */
+export async function connectCodexAppServerEndpoint(
+ endpoint: CodexSupervisorEndpoint,
+): Promise {
+ const connection =
+ endpoint.transport === "websocket"
+ ? new WebSocketCodexJsonRpcConnection(endpoint)
+ : new StdioCodexJsonRpcConnection(endpoint);
+ try {
+ if ("ready" in connection && typeof connection.ready === "function") {
+ await connection.ready();
+ }
+ await connection.initialize();
+ return connection;
+ } catch (error) {
+ await connection.close().catch(() => undefined);
+ throw error;
+ }
+}
diff --git a/extensions/codex-supervisor/src/mcp-serve.ts b/extensions/codex-supervisor/src/mcp-serve.ts
new file mode 100644
index 00000000000..9d5485b507f
--- /dev/null
+++ b/extensions/codex-supervisor/src/mcp-serve.ts
@@ -0,0 +1,18 @@
+/**
+ * Standalone MCP server for OpenClaw Codex supervision.
+ *
+ * Run via: node --import tsx extensions/codex-supervisor/src/mcp-serve.ts
+ */
+import { pathToFileURL } from "node:url";
+import { serveCodexSupervisorMcp } from "./mcp-server.js";
+
+/** Extracts a printable message from a thrown value. */
+function formatErrorMessage(error: unknown): string {
+  if (error instanceof Error) {
+    return error.message;
+  }
+  return String(error);
+}
+
+// Start the server only when this module is the script given to node (argv[1]),
+// not when it is imported by another module.
+if (import.meta.url === pathToFileURL(process.argv[1] ?? "").href) {
+ serveCodexSupervisorMcp().catch((err) => {
+ // Report on stderr and exit non-zero.
+ process.stderr.write(`codex-supervisor-serve: ${formatErrorMessage(err)}\n`);
+ process.exit(1);
+ });
+}
diff --git a/extensions/codex-supervisor/src/mcp-server.ts b/extensions/codex-supervisor/src/mcp-server.ts
new file mode 100644
index 00000000000..f2689907f5e
--- /dev/null
+++ b/extensions/codex-supervisor/src/mcp-server.ts
@@ -0,0 +1,83 @@
+import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
+import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
+import { loadCodexSupervisorEndpoints } from "./config.js";
+import {
+ registerCodexSupervisorMcpTools,
+ type CodexSupervisorMcpToolOptions,
+} from "./mcp-tools.js";
+import { CodexSupervisor } from "./supervisor.js";
+
+/** Version string passed to the MCP server constructor. */
+const VERSION = "0.1.0";
+
+/**
+ * Replaces `console.log/info/warn/error/debug` with writers that emit to stderr.
+ *
+ * Each call writes its arguments converted with `String`, joined by single spaces and
+ * followed by a newline.
+ */
+function routeLogsToStderr(): void {
+  const createStderrWriter =
+    () =>
+    (...args: unknown[]): void => {
+      const line = args.map((arg) => String(arg)).join(" ");
+      process.stderr.write(`${line}\n`);
+    };
+  (["log", "info", "warn", "error", "debug"] as const).forEach((method) => {
+    console[method] = createStderrWriter();
+  });
+}
+
+/**
+ * Options for creating or serving the supervisor MCP server.
+ * `supervisor` replaces the default supervisor; `toolOptions` is passed to tool registration.
+ */
+export type CodexSupervisorMcpServeOptions = {
+ supervisor?: CodexSupervisor;
+ toolOptions?: CodexSupervisorMcpToolOptions;
+};
+
+/**
+ * Builds an MCP server with the supervisor tools registered.
+ *
+ * Uses `opts.supervisor` when given; otherwise creates a supervisor over the endpoints
+ * returned by `loadCodexSupervisorEndpoints()`. The returned `close` closes the
+ * supervisor first, then the server.
+ */
+export function createCodexSupervisorMcpServer(opts: CodexSupervisorMcpServeOptions = {}): {
+ server: McpServer;
+ supervisor: CodexSupervisor;
+ close: () => Promise;
+} {
+ const supervisor = opts.supervisor ?? new CodexSupervisor(loadCodexSupervisorEndpoints());
+ const server = new McpServer({ name: "openclaw-codex-supervisor", version: VERSION });
+ registerCodexSupervisorMcpTools(server, supervisor, opts.toolOptions);
+ return {
+ server,
+ supervisor,
+ close: async () => {
+ await supervisor.close();
+ await server.close();
+ },
+ };
+}
+
+/**
+ * Serves the supervisor MCP server over stdio and resolves after shutdown completes.
+ *
+ * Shutdown is triggered by the transport closing, stdin ending or closing, or a
+ * SIGINT/SIGTERM signal. Console output is redirected to stderr first.
+ */
+export async function serveCodexSupervisorMcp(
+ opts: CodexSupervisorMcpServeOptions = {},
+): Promise {
+ routeLogsToStderr();
+ const { server, close } = createCodexSupervisorMcpServer(opts);
+ const transport = new StdioServerTransport();
+
+ let shuttingDown = false;
+ let resolveClosed!: () => void;
+ // Resolved once close() has settled, whether it succeeded or failed.
+ const closed = new Promise((resolve) => {
+ resolveClosed = resolve;
+ });
+
+ // Idempotent: the first call detaches every trigger and starts closing.
+ const shutdown = () => {
+ if (shuttingDown) {
+ return;
+ }
+ shuttingDown = true;
+ process.stdin.off("end", shutdown);
+ process.stdin.off("close", shutdown);
+ process.off("SIGINT", shutdown);
+ process.off("SIGTERM", shutdown);
+ transport["onclose"] = undefined;
+ // Errors from close() are not propagated; `closed` resolves either way.
+ close().then(resolveClosed, resolveClosed);
+ };
+
+ transport["onclose"] = shutdown;
+ process.stdin.once("end", shutdown);
+ process.stdin.once("close", shutdown);
+ process.once("SIGINT", shutdown);
+ process.once("SIGTERM", shutdown);
+
+ try {
+ await server.connect(transport);
+ await closed;
+ } finally {
+ // Also reached when connect() throws, so shutdown still runs.
+ shutdown();
+ await closed;
+ }
+}
diff --git a/extensions/codex-supervisor/src/mcp-tools.test.ts b/extensions/codex-supervisor/src/mcp-tools.test.ts
new file mode 100644
index 00000000000..8c08ae11147
--- /dev/null
+++ b/extensions/codex-supervisor/src/mcp-tools.test.ts
@@ -0,0 +1,104 @@
+import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
+import { describe, expect, it } from "vitest";
+import {
+ redactCodexSupervisorEndpoint,
+ redactCodexSupervisorValue,
+ registerCodexSupervisorMcpTools,
+ sanitizeCodexSupervisorSessionListResult,
+} from "./mcp-tools.js";
+import type { CodexSupervisor } from "./supervisor.js";
+
+// Key-based redaction (authorization, apiKey) and pattern-based redaction inside free text.
+describe("redactCodexSupervisorValue", () => {
+ it("redacts sensitive keys and common bearer-like secrets", () => {
+ expect(
+ redactCodexSupervisorValue({
+ authorization: "Bearer abcdefghijklmnopqrstuvwxyz012345",
+ nested: {
+ apiKey: "sk-abcdefghijklmnopqrstuvwxyz012345",
+ text: "token ghp_abcdefghijklmnopqrstuvwxyz012345 remains hidden",
+ },
+ }),
+ ).toEqual({
+ authorization: "[redacted]",
+ nested: {
+ apiKey: "[redacted]",
+ text: "token [redacted] remains hidden",
+ },
+ });
+ });
+});
+
+// Userinfo is stripped from the URL and the whole query string is replaced.
+describe("redactCodexSupervisorEndpoint", () => {
+ it("removes websocket credentials and query values", () => {
+ expect(
+ redactCodexSupervisorEndpoint({
+ id: "prod",
+ transport: "websocket",
+ url: "wss://user:secret@example.invalid/control?token=a=b",
+ }),
+ ).toEqual({
+ id: "prod",
+ transport: "websocket",
+ url: "wss://example.invalid/control?[redacted]",
+ });
+ });
+});
+
+// Untrusted callers lose `preview`, `name`, and error `detail`; trusted callers get the full result.
+describe("sanitizeCodexSupervisorSessionListResult", () => {
+ it("omits transcript-derived fields unless explicitly trusted", () => {
+ const result = {
+ sessions: [
+ {
+ endpointId: "local",
+ threadId: "thread-1",
+ status: "idle",
+ preview: "first prompt",
+ name: "thread title",
+ },
+ ],
+ errors: [{ endpointId: "down", ok: false, detail: "stderr secret" }],
+ };
+
+ expect(sanitizeCodexSupervisorSessionListResult(result, false)).toEqual({
+ sessions: [{ endpointId: "local", threadId: "thread-1", status: "idle" }],
+ errors: [{ endpointId: "down", ok: false }],
+ });
+ expect(sanitizeCodexSupervisorSessionListResult(result, true)).toEqual(result);
+ });
+});
+
+// The policy callback passed in the options must decide what the list tool exposes.
+describe("registerCodexSupervisorMcpTools", () => {
+ it("uses per-server transcript policy when listing sessions", async () => {
+ // Stub server that records each registered tool handler by name.
+ const handlers = new Map) => Promise>();
+ const server = {
+ tool(name: string, _description: string, _schema: unknown, handler: unknown) {
+ handlers.set(name, handler as (params: Record) => Promise);
+ },
+ } as unknown as McpServer;
+ const supervisor = {
+ listSessionSnapshot: async () => ({
+ sessions: [
+ {
+ endpointId: "local",
+ threadId: "thread-1",
+ status: "idle",
+ preview: "first prompt",
+ name: "thread title",
+ },
+ ],
+ errors: [{ endpointId: "down", ok: false, detail: "stderr secret" }],
+ }),
+ } as unknown as CodexSupervisor;
+
+ registerCodexSupervisorMcpTools(server, supervisor, {
+ rawTranscriptReadsAllowed: () => false,
+ });
+
+ await expect(handlers.get("codex_sessions_list")?.({})).resolves.toMatchObject({
+ structuredContent: {
+ sessions: [{ endpointId: "local", threadId: "thread-1", status: "idle" }],
+ errors: [{ endpointId: "down", ok: false }],
+ },
+ });
+ });
+});
diff --git a/extensions/codex-supervisor/src/mcp-tools.ts b/extensions/codex-supervisor/src/mcp-tools.ts
new file mode 100644
index 00000000000..cf6f75d40be
--- /dev/null
+++ b/extensions/codex-supervisor/src/mcp-tools.ts
@@ -0,0 +1,259 @@
+import type { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
+import { z } from "zod";
+import type { CodexSupervisor } from "./supervisor.js";
+import type {
+ CodexSupervisorEndpoint,
+ CodexSupervisorSession,
+ CodexSupervisorSessionListResult,
+} from "./types.js";
+
+// Environment variable names for the default policies; each is enabled only by the value "1".
+export const RAW_TRANSCRIPTS_ENV = "OPENCLAW_CODEX_SUPERVISOR_ALLOW_RAW_TRANSCRIPTS";
+export const WRITE_CONTROLS_ENV = "OPENCLAW_CODEX_SUPERVISOR_ALLOW_WRITE_CONTROLS";
+
+/**
+ * Optional policy callbacks for the MCP tools.
+ * When a callback is omitted, the corresponding environment variable decides instead.
+ */
+export type CodexSupervisorMcpToolOptions = {
+ rawTranscriptReadsAllowed?: () => boolean;
+ writeControlsAllowed?: () => boolean;
+};
+
+/**
+ * Builds a tool result with a single text item.
+ * `structuredContent` is included only when a truthy value is passed.
+ */
+function textResult(text: string, structuredContent?: Record) {
+ return {
+ content: [{ type: "text" as const, text }],
+ ...(structuredContent ? { structuredContent } : {}),
+ };
+}
+
+/** Builds a tool result flagged as an error, carrying `message` as its only text item. */
+function errorResult(message: string) {
+  const content = [{ type: "text" as const, text: message }];
+  return { content, isError: true };
+}
+
+/**
+ * Masks credential-shaped substrings inside free text.
+ *
+ * Covers `sk-`, `glpat-` and `xox?-` prefixed tokens, `ghp_`/`gho_`/`ghu_`/`ghs_`
+ * prefixed tokens (12 or more token characters after the prefix), and the value that
+ * follows a `Bearer` keyword.
+ */
+function redactString(value: string): string {
+  return value
+    .replace(/\b(?:sk|glpat|xox[baprs])-[-_a-zA-Z0-9]{12,}\b/g, "[redacted]")
+    .replace(/\b(?:ghp|gho|ghu|ghs)_[-_a-zA-Z0-9]{12,}\b/g, "[redacted]")
+    .replace(/\bBearer\s+[-._~+/a-zA-Z0-9]+=*/g, "Bearer [redacted]");
+}
+
+/**
+ * Recursively redacts secrets from a JSON-like value and returns a redacted copy.
+ *
+ * - A string stored under a sensitive key (authorization, password, secret, token,
+ *   api key) is replaced entirely with "[redacted]".
+ * - Any other string has credential-shaped substrings masked.
+ * - Arrays and plain objects are walked; other values are returned unchanged.
+ *
+ * @param value Value to redact.
+ * @param key Name of the property that holds `value`; empty at the top level.
+ * @returns The redacted copy (the input itself for non-string primitives).
+ */
+export function redactCodexSupervisorValue(value: unknown, key = ""): unknown {
+  if (typeof value === "string") {
+    if (/authorization|password|secret|token|api[-_]?key/i.test(key)) {
+      return "[redacted]";
+    }
+    return redactString(value);
+  }
+  if (Array.isArray(value)) {
+    // Elements inherit the owning key, so `tokens: ["..."]` is masked like `token: "..."`.
+    return value.map((entry) => redactCodexSupervisorValue(entry, key));
+  }
+  if (!value || typeof value !== "object") {
+    return value;
+  }
+  return Object.fromEntries(
+    Object.entries(value).map(([entryKey, entryValue]) => [
+      entryKey,
+      redactCodexSupervisorValue(entryValue, entryKey),
+    ]),
+  );
+}
+
+/**
+ * Produces a display-safe form of an endpoint URL.
+ *
+ * `unix://` URLs collapse to the bare scheme. Other URLs lose their username and
+ * password, and any query string is replaced with `?[redacted]`. Values that do not
+ * parse as a URL become "[redacted]".
+ */
+function redactEndpointUrl(value: string): string {
+  if (value.startsWith("unix://")) {
+    return "unix://";
+  }
+  let parsed: URL;
+  try {
+    parsed = new URL(value);
+  } catch {
+    return "[redacted]";
+  }
+  parsed.username = "";
+  parsed.password = "";
+  if (parsed.search) {
+    parsed.search = "?[redacted]";
+  }
+  return parsed.toString();
+}
+
+/**
+ * Returns the fields of an endpoint that are reported to tool callers.
+ *
+ * Always includes `id` and `transport`; includes `label` when set, and a redacted
+ * `url` for websocket endpoints. No other endpoint fields are copied.
+ */
+export function redactCodexSupervisorEndpoint(
+ endpoint: CodexSupervisorEndpoint,
+): Record {
+ return {
+ id: endpoint.id,
+ transport: endpoint.transport,
+ ...(endpoint.label ? { label: endpoint.label } : {}),
+ ...(endpoint.transport === "websocket" ? { url: redactEndpointUrl(endpoint.url) } : {}),
+ };
+}
+
+/** Environment default: raw transcript reads are allowed only when the variable equals "1". */
+function rawTranscriptReadsAllowed(): boolean {
+  const flag = process.env[RAW_TRANSCRIPTS_ENV];
+  return flag === "1";
+}
+
+/** Environment default: write controls are allowed only when the variable equals "1". */
+function writeControlsAllowed(): boolean {
+  const flag = process.env[WRITE_CONTROLS_ENV];
+  return flag === "1";
+}
+
+/** Resolves the transcript-read policy: the options callback when provided, else the environment default. */
+function rawTranscriptReadsAllowedFor(opts: CodexSupervisorMcpToolOptions): boolean {
+  if (opts.rawTranscriptReadsAllowed) {
+    return opts.rawTranscriptReadsAllowed();
+  }
+  return rawTranscriptReadsAllowed();
+}
+
+/** Resolves the write-control policy: the options callback when provided, else the environment default. */
+function writeControlsAllowedFor(opts: CodexSupervisorMcpToolOptions): boolean {
+  if (opts.writeControlsAllowed) {
+    return opts.writeControlsAllowed();
+  }
+  return writeControlsAllowed();
+}
+
+/**
+ * Redacts one session for tool output.
+ * Unless transcript-derived fields are allowed, `preview` and `name` are removed.
+ */
+function sanitizeSessionForMcp(
+ session: CodexSupervisorSession,
+ includeTranscriptDerivedFields: boolean,
+): Record {
+ // Redaction returns a fresh object, so the deletes below do not touch `session`.
+ const sanitized = redactCodexSupervisorValue(session) as Record;
+ if (!includeTranscriptDerivedFields) {
+ delete sanitized.preview;
+ delete sanitized.name;
+ }
+ return sanitized;
+}
+
+/**
+ * Prepares a session list result for tool output.
+ *
+ * Sessions are always redacted. When transcript-derived fields are not allowed,
+ * sessions also lose `preview`/`name` and each error is reduced to `endpointId` and
+ * `ok`; otherwise errors are redacted but keep their other fields.
+ *
+ * @param includeTranscriptDerivedFields Defaults to the environment policy.
+ */
+export function sanitizeCodexSupervisorSessionListResult(
+ result: CodexSupervisorSessionListResult,
+ includeTranscriptDerivedFields = rawTranscriptReadsAllowed(),
+): Record {
+ return {
+ sessions: result.sessions.map((session) =>
+ sanitizeSessionForMcp(session, includeTranscriptDerivedFields),
+ ),
+ errors: includeTranscriptDerivedFields
+ ? redactCodexSupervisorValue(result.errors)
+ : result.errors.map(({ endpointId, ok }) => ({ endpointId, ok })),
+ };
+}
+
+/**
+ * Registers the supervisor tools on an MCP server: `codex_endpoint_probe`,
+ * `codex_sessions_list`, `codex_session_read`, `codex_session_send`, and
+ * `codex_session_interrupt`.
+ *
+ * Session reads require the raw-transcript policy; send and interrupt require the
+ * write-control policy. Each policy comes from `opts` when a callback is provided,
+ * otherwise from the environment. A denied call returns an error result rather than
+ * throwing.
+ */
+export function registerCodexSupervisorMcpTools(
+ server: McpServer,
+ supervisor: CodexSupervisor,
+ opts: CodexSupervisorMcpToolOptions = {},
+): void {
+ server.tool(
+ "codex_endpoint_probe",
+ "Check configured Codex app-server endpoints.",
+ {},
+ async () => {
+ const endpoints = supervisor.listEndpoints().map(redactCodexSupervisorEndpoint);
+ // Only endpointId and ok are reported; any other probe fields are dropped.
+ const health = (await supervisor.probeEndpoints()).map(({ endpointId, ok }) => ({
+ endpointId,
+ ok,
+ }));
+ return textResult(
+ `codex endpoints: ${health.filter((entry) => entry.ok).length}/${health.length} ok`,
+ {
+ endpoints,
+ health,
+ },
+ );
+ },
+ );
+
+ server.tool(
+ "codex_sessions_list",
+ "List Codex sessions visible to the OpenClaw supervisor.",
+ {
+ include_stored: z.boolean().optional(),
+ },
+ async ({ include_stored }) => {
+ const result = await supervisor.listSessionSnapshot({
+ includeStored: include_stored ?? false,
+ });
+ // Always available; the transcript policy only controls which fields are returned.
+ return textResult(
+ `codex sessions: ${result.sessions.length}`,
+ sanitizeCodexSupervisorSessionListResult(result, rawTranscriptReadsAllowedFor(opts)),
+ );
+ },
+ );
+
+ server.tool(
+ "codex_session_read",
+ "Read one Codex session transcript from app-server.",
+ {
+ endpoint_id: z.string().optional(),
+ thread_id: z.string().min(1),
+ include_turns: z.boolean().optional(),
+ },
+ async ({ endpoint_id, thread_id, include_turns }) => {
+ if (!rawTranscriptReadsAllowedFor(opts)) {
+ return errorResult(
+ `Codex session reads are disabled; set ${RAW_TRANSCRIPTS_ENV}=1 for a trusted supervisor-only MCP`,
+ );
+ }
+ const includeTurns = include_turns ?? false;
+ try {
+ const response = await supervisor.readSession({
+ endpointId: endpoint_id,
+ threadId: thread_id,
+ includeTurns,
+ });
+ return textResult(`codex session: ${thread_id}`, {
+ response: redactCodexSupervisorValue(response),
+ });
+ } catch (error) {
+ return errorResult(error instanceof Error ? error.message : String(error));
+ }
+ },
+ );
+
+ server.tool(
+ "codex_session_send",
+ "Send text to a Codex session. Idle sessions start a turn; active sessions are steered.",
+ {
+ endpoint_id: z.string().optional(),
+ thread_id: z.string().min(1),
+ text: z.string().min(1),
+ mode: z.enum(["auto", "start", "steer"]).optional(),
+ },
+ async ({ endpoint_id, thread_id, text, mode }) => {
+ if (!writeControlsAllowedFor(opts)) {
+ return errorResult(
+ `Codex write controls are disabled; set ${WRITE_CONTROLS_ENV}=1 for a trusted supervisor-only MCP`,
+ );
+ }
+ try {
+ const result = await supervisor.sendToSession({
+ endpointId: endpoint_id,
+ threadId: thread_id,
+ text,
+ mode,
+ });
+ // The summary falls back to the thread id when no turn id is returned.
+ return textResult(`codex ${result.mode}: ${result.turnId ?? thread_id}`, { result });
+ } catch (error) {
+ return errorResult(error instanceof Error ? error.message : String(error));
+ }
+ },
+ );
+
+ server.tool(
+ "codex_session_interrupt",
+ "Interrupt an active Codex turn.",
+ {
+ endpoint_id: z.string().optional(),
+ thread_id: z.string().min(1),
+ turn_id: z.string().optional(),
+ },
+ async ({ endpoint_id, thread_id, turn_id }) => {
+ if (!writeControlsAllowedFor(opts)) {
+ return errorResult(
+ `Codex write controls are disabled; set ${WRITE_CONTROLS_ENV}=1 for a trusted supervisor-only MCP`,
+ );
+ }
+ try {
+ const result = await supervisor.interruptSession({
+ endpointId: endpoint_id,
+ threadId: thread_id,
+ turnId: turn_id,
+ });
+ return textResult(`codex interrupted: ${result.turnId}`, { result });
+ } catch (error) {
+ return errorResult(error instanceof Error ? error.message : String(error));
+ }
+ },
+ );
+}
diff --git a/extensions/codex-supervisor/src/plugin-tools.test.ts b/extensions/codex-supervisor/src/plugin-tools.test.ts
new file mode 100644
index 00000000000..d28db87f142
--- /dev/null
+++ b/extensions/codex-supervisor/src/plugin-tools.test.ts
@@ -0,0 +1,139 @@
+import { describe, expect, it } from "vitest";
+import { createCodexSupervisorTools } from "./plugin-tools.js";
+import type { CodexSupervisor } from "./supervisor.js";
+
+/**
+ * Builds a supervisor stub with fixed responses for the methods the tools call.
+ * `calls` records each send (with its mode, "auto" when omitted) and each interrupt.
+ */
+function createSupervisorStub() {
+ const calls: string[] = [];
+ const supervisor = {
+ listEndpoints: () => [
+ {
+ id: "prod",
+ transport: "websocket",
+ url: "wss://user:secret@example.invalid/control?token=hidden",
+ },
+ ],
+ probeEndpoints: async () => [{ endpointId: "prod", ok: true }],
+ listSessionSnapshot: async () => ({
+ sessions: [
+ {
+ endpointId: "prod",
+ threadId: "thread-1",
+ status: "idle",
+ preview: "secret prompt",
+ name: "secret title",
+ },
+ ],
+ errors: [{ endpointId: "down", ok: false, detail: "secret stderr" }],
+ }),
+ readSession: async () => ({
+ thread: {
+ id: "thread-1",
+ authorization: "Bearer abcdefghijklmnopqrstuvwxyz012345",
+ },
+ }),
+ sendToSession: async (params: { mode?: string }) => {
+ calls.push(`send:${params.mode ?? "auto"}`);
+ return {
+ endpointId: "prod",
+ threadId: "thread-1",
+ mode: "start" as const,
+ turnId: "turn-1",
+ };
+ },
+ interruptSession: async () => {
+ calls.push("interrupt");
+ return {
+ endpointId: "prod",
+ threadId: "thread-1",
+ turnId: "turn-1",
+ };
+ },
+ } satisfies Pick<
+ CodexSupervisor,
+ | "interruptSession"
+ | "listEndpoints"
+ | "listSessionSnapshot"
+ | "probeEndpoints"
+ | "readSession"
+ | "sendToSession"
+ >;
+ return { calls, supervisor: supervisor as unknown as CodexSupervisor };
+}
+
+/** Finds a tool by name, throwing when it is missing so tests fail with a clear message. */
+function toolByName(tools: ReturnType, name: string) {
+ const tool = tools.find((entry) => entry.name === name);
+ if (!tool) {
+ throw new Error(`missing tool: ${name}`);
+ }
+ return tool;
+}
+
+// Covers default redaction, policy gating (denied calls reject), and the enabled paths.
+describe("createCodexSupervisorTools", () => {
+ it("registers redacted read-only supervisor tools by default", async () => {
+ const { supervisor } = createSupervisorStub();
+ const tools = createCodexSupervisorTools({
+ supervisor,
+ policy: { allowRawTranscripts: false, allowWriteControls: false },
+ });
+
+ const probe = await toolByName(tools, "codex_endpoint_probe").execute("call-1", {});
+ expect(probe.details).toMatchObject({
+ summary: "codex endpoints: 1/1 ok",
+ endpoints: [
+ { id: "prod", transport: "websocket", url: "wss://example.invalid/control?[redacted]" },
+ ],
+ });
+
+ const list = await toolByName(tools, "codex_sessions_list").execute("call-2", {});
+ expect(list.details).toEqual({
+ summary: "codex sessions: 1",
+ sessions: [{ endpointId: "prod", threadId: "thread-1", status: "idle" }],
+ errors: [{ endpointId: "down", ok: false }],
+ });
+ });
+
+ it("gates transcript reads and write controls", async () => {
+ const { supervisor } = createSupervisorStub();
+ const tools = createCodexSupervisorTools({
+ supervisor,
+ policy: { allowRawTranscripts: false, allowWriteControls: false },
+ });
+
+ await expect(
+ toolByName(tools, "codex_session_read").execute("call-1", { thread_id: "thread-1" }),
+ ).rejects.toThrow("Codex session reads are disabled");
+ await expect(
+ toolByName(tools, "codex_session_send").execute("call-2", {
+ thread_id: "thread-1",
+ text: "continue",
+ }),
+ ).rejects.toThrow("Codex write controls are disabled");
+ });
+
+ it("allows trusted read and write tools when policy enables them", async () => {
+ const { calls, supervisor } = createSupervisorStub();
+ const tools = createCodexSupervisorTools({
+ supervisor,
+ policy: { allowRawTranscripts: true, allowWriteControls: true },
+ });
+
+ const read = await toolByName(tools, "codex_session_read").execute("call-1", {
+ thread_id: "thread-1",
+ });
+ expect(read.details).toEqual({
+ summary: "codex session: thread-1",
+ response: { thread: { id: "thread-1", authorization: "[redacted]" } },
+ });
+
+ const sent = await toolByName(tools, "codex_session_send").execute("call-2", {
+ thread_id: "thread-1",
+ text: "continue",
+ mode: "start",
+ });
+ expect(sent.details).toMatchObject({
+ summary: "codex start: turn-1",
+ result: { turnId: "turn-1" },
+ });
+ expect(calls).toEqual(["send:start"]);
+ });
+});
diff --git a/extensions/codex-supervisor/src/plugin-tools.ts b/extensions/codex-supervisor/src/plugin-tools.ts
new file mode 100644
index 00000000000..fcad3be3a97
--- /dev/null
+++ b/extensions/codex-supervisor/src/plugin-tools.ts
@@ -0,0 +1,193 @@
+import { jsonResult, readStringParam, type AnyAgentTool } from "openclaw/plugin-sdk/core";
+import { Type } from "typebox";
+import {
+ redactCodexSupervisorEndpoint,
+ redactCodexSupervisorValue,
+ sanitizeCodexSupervisorSessionListResult,
+} from "./mcp-tools.js";
+import type { CodexSupervisor } from "./supervisor.js";
+import type { CodexSupervisorTurnMode } from "./types.js";
+
+// Parameter schemas for the plugin tools. Every schema rejects unknown properties.
+
+/** Schema for tools that take no parameters. */
+const EmptyParamsSchema = Type.Object({}, { additionalProperties: false });
+
+/** Parameters for `codex_sessions_list`. */
+const SessionsListParamsSchema = Type.Object(
+ {
+ include_stored: Type.Optional(Type.Boolean()),
+ },
+ { additionalProperties: false },
+);
+
+/** Parameters for `codex_session_read`; only `thread_id` is required. */
+const SessionReadParamsSchema = Type.Object(
+ {
+ endpoint_id: Type.Optional(Type.String()),
+ thread_id: Type.String(),
+ include_turns: Type.Optional(Type.Boolean()),
+ },
+ { additionalProperties: false },
+);
+
+/** Parameters for `codex_session_send`; `thread_id` and `text` are required. */
+const SessionSendParamsSchema = Type.Object(
+ {
+ endpoint_id: Type.Optional(Type.String()),
+ thread_id: Type.String(),
+ text: Type.String(),
+ mode: Type.Optional(
+ Type.Union([Type.Literal("auto"), Type.Literal("start"), Type.Literal("steer")]),
+ ),
+ },
+ { additionalProperties: false },
+);
+
+/** Parameters for `codex_session_interrupt`; only `thread_id` is required. */
+const SessionInterruptParamsSchema = Type.Object(
+ {
+ endpoint_id: Type.Optional(Type.String()),
+ thread_id: Type.String(),
+ turn_id: Type.Optional(Type.String()),
+ },
+ { additionalProperties: false },
+);
+
+/** Policy flags: whether transcript reads and write controls (send, interrupt) are permitted. */
+export type CodexSupervisorToolPolicy = {
+ allowRawTranscripts: boolean;
+ allowWriteControls: boolean;
+};
+
+/** Inputs for building the plugin tools: the supervisor to call and the policy to enforce. */
+export type CodexSupervisorToolOptions = {
+ supervisor: CodexSupervisor;
+ policy: CodexSupervisorToolPolicy;
+};
+
+/** Returns `params` when it is a non-array object; otherwise an empty object. */
+function asRecord(params: unknown): Record {
+ return params && typeof params === "object" && !Array.isArray(params)
+ ? (params as Record)
+ : {};
+}
+
+/** True only when `params[key]` is the boolean `true`; any other value reads as false. */
+function readBooleanParam(params: Record, key: string): boolean {
+ return params[key] === true;
+}
+
+/**
+ * Reads the optional `mode` parameter.
+ *
+ * @returns `undefined` when `readStringParam` yields a falsy value, otherwise the validated mode.
+ * @throws Error when the value is not "auto", "start", or "steer".
+ */
+function readModeParam(params: Record): CodexSupervisorTurnMode | undefined {
+ const mode = readStringParam(params, "mode");
+ if (!mode) {
+ return undefined;
+ }
+ if (mode === "auto" || mode === "start" || mode === "steer") {
+ return mode;
+ }
+ throw new Error("mode must be auto, start, or steer");
+}
+
+/** Throws unless the policy allows raw transcript reads. */
+function requireRawTranscriptAccess(policy: CodexSupervisorToolPolicy): void {
+  if (policy.allowRawTranscripts) {
+    return;
+  }
+  throw new Error("Codex session reads are disabled for this codex-supervisor plugin config.");
+}
+
+/** Throws unless the policy allows write controls. */
+function requireWriteAccess(policy: CodexSupervisorToolPolicy): void {
+  if (policy.allowWriteControls) {
+    return;
+  }
+  throw new Error("Codex write controls are disabled for this codex-supervisor plugin config.");
+}
+
+/**
+ * Builds the agent tools backed by a supervisor: endpoint probe, session list,
+ * session read, session send, and session interrupt.
+ *
+ * The read tool throws when `policy.allowRawTranscripts` is false; the send and
+ * interrupt tools throw when `policy.allowWriteControls` is false. The policy check
+ * runs before any parameter is read.
+ */
+export function createCodexSupervisorTools({
+ supervisor,
+ policy,
+}: CodexSupervisorToolOptions): AnyAgentTool[] {
+ return [
+ {
+ name: "codex_endpoint_probe",
+ label: "Codex Endpoint Probe",
+ description: "Check configured Codex app-server endpoints.",
+ parameters: EmptyParamsSchema,
+ execute: async () => {
+ const endpoints = supervisor.listEndpoints().map(redactCodexSupervisorEndpoint);
+ // Only endpointId and ok are reported; any other probe fields are dropped.
+ const health = (await supervisor.probeEndpoints()).map(({ endpointId, ok }) => ({
+ endpointId,
+ ok,
+ }));
+ return jsonResult({
+ summary: `codex endpoints: ${health.filter((entry) => entry.ok).length}/${health.length} ok`,
+ endpoints,
+ health,
+ });
+ },
+ },
+ {
+ name: "codex_sessions_list",
+ label: "Codex Sessions List",
+ description: "List Codex sessions visible to the OpenClaw supervisor.",
+ parameters: SessionsListParamsSchema,
+ execute: async (_toolCallId, rawParams) => {
+ const params = asRecord(rawParams);
+ const result = await supervisor.listSessionSnapshot({
+ includeStored: readBooleanParam(params, "include_stored"),
+ });
+ // Always available; the policy only controls which fields are returned.
+ return jsonResult({
+ summary: `codex sessions: ${result.sessions.length}`,
+ ...sanitizeCodexSupervisorSessionListResult(result, policy.allowRawTranscripts),
+ });
+ },
+ },
+ {
+ name: "codex_session_read",
+ label: "Codex Session Read",
+ description: "Read one Codex session transcript from app-server.",
+ parameters: SessionReadParamsSchema,
+ execute: async (_toolCallId, rawParams) => {
+ requireRawTranscriptAccess(policy);
+ const params = asRecord(rawParams);
+ const threadId = readStringParam(params, "thread_id", { required: true });
+ const response = await supervisor.readSession({
+ endpointId: readStringParam(params, "endpoint_id"),
+ threadId,
+ includeTurns: readBooleanParam(params, "include_turns"),
+ });
+ return jsonResult({
+ summary: `codex session: ${threadId}`,
+ response: redactCodexSupervisorValue(response),
+ });
+ },
+ },
+ {
+ name: "codex_session_send",
+ label: "Codex Session Send",
+ description:
+ "Send text to a Codex session. Idle sessions start a turn; active sessions are steered.",
+ parameters: SessionSendParamsSchema,
+ execute: async (_toolCallId, rawParams) => {
+ requireWriteAccess(policy);
+ const params = asRecord(rawParams);
+ const result = await supervisor.sendToSession({
+ endpointId: readStringParam(params, "endpoint_id"),
+ threadId: readStringParam(params, "thread_id", { required: true }),
+ text: readStringParam(params, "text", { required: true, allowEmpty: false }),
+ mode: readModeParam(params),
+ });
+ // The summary falls back to the thread id when no turn id is returned.
+ return jsonResult({
+ summary: `codex ${result.mode}: ${result.turnId ?? result.threadId}`,
+ result,
+ });
+ },
+ },
+ {
+ name: "codex_session_interrupt",
+ label: "Codex Session Interrupt",
+ description: "Interrupt an active Codex turn.",
+ parameters: SessionInterruptParamsSchema,
+ execute: async (_toolCallId, rawParams) => {
+ requireWriteAccess(policy);
+ const params = asRecord(rawParams);
+ const result = await supervisor.interruptSession({
+ endpointId: readStringParam(params, "endpoint_id"),
+ threadId: readStringParam(params, "thread_id", { required: true }),
+ turnId: readStringParam(params, "turn_id"),
+ });
+ return jsonResult({
+ summary: `codex interrupted: ${result.turnId}`,
+ result,
+ });
+ },
+ },
+ ];
+}
diff --git a/extensions/codex-supervisor/src/supervisor.test.ts b/extensions/codex-supervisor/src/supervisor.test.ts
new file mode 100644
index 00000000000..38f1eb69ce2
--- /dev/null
+++ b/extensions/codex-supervisor/src/supervisor.test.ts
@@ -0,0 +1,766 @@
+import * as fs from "node:fs/promises";
+import * as os from "node:os";
+import * as path from "node:path";
+import { describe, expect, it } from "vitest";
+import { loadCodexSupervisorEndpoints, resolveCodexSupervisorPluginConfig } from "./config.js";
+import { connectCodexAppServerEndpoint, resolveSafeApprovalResult } from "./json-rpc-client.js";
+import { CodexSupervisor } from "./supervisor.js";
+import type { CodexJsonRpcConnection, CodexSupervisorEndpoint } from "./types.js";
+
+class FakeCodexConnection implements CodexJsonRpcConnection { // Test double: records each request and answers a fixed set of methods from one in-memory thread record.
+ readonly calls: Array<{ method: string; params?: Record }> = []; // Every request() call, in order, for later assertions.
+ closeCount = 0; // Incremented once per close() call.
+
+ constructor(
+ private thread: Record,
+ private readonly failIncludeTurnsUntilMaterialized = false, // When true, thread/read with includeTurns === true throws.
+ ) {}
+
+ async request(method: string, params?: Record): Promise { // Logs the call, then dispatches on the method name; unknown methods throw.
+ this.calls.push({ method, params });
+ if (method === "thread/loaded/list") {
+ return { data: [this.thread.id].filter((id) => typeof id === "string"), nextCursor: null }; // Ids only; a non-string thread id yields an empty list.
+ }
+ if (method === "thread/list") {
+ return { threads: [this.thread] };
+ }
+ if (method === "thread/read") {
+ if (this.failIncludeTurnsUntilMaterialized && params?.includeTurns === true) {
+ throw new Error(
+ "thread is not materialized yet; includeTurns is unavailable before first user message",
+ );
+ }
+ return { thread: this.thread };
+ }
+ if (method === "turn/start") {
+ return { turn: { id: "turn-started", status: "inProgress" } }; // Fixed turn id that the tests assert against.
+ }
+ if (method === "turn/steer") {
+ return {};
+ }
+ if (method === "turn/interrupt") {
+ return {};
+ }
+ throw new Error(`unexpected method: ${method}`);
+ }
+
+ notify(): void {} // Intentionally a no-op.
+
+ async close(): Promise {
+ this.closeCount += 1;
+ }
+}
+
+const endpoint: CodexSupervisorEndpoint = { // Shared endpoint fixture handed to CodexSupervisor in the tests below.
+ id: "local",
+ transport: "stdio-proxy",
+};
+
+describe("loadCodexSupervisorEndpoints", () => { // Endpoint parsing from an env-style object and from plugin config.
+ it("defaults to the local app-server Unix websocket", () => {
+ expect(loadCodexSupervisorEndpoints({})).toEqual([ // Empty env object: a single default "local" endpoint is expected.
+ {
+ id: "local",
+ label: "local Codex app-server daemon",
+ transport: "websocket",
+ url: "unix://",
+ },
+ ]);
+ });
+
+ it("parses websocket shorthand endpoints", () => {
+ expect(
+ loadCodexSupervisorEndpoints({
+ OPENCLAW_CODEX_SUPERVISOR_ENDPOINTS: "crab=ws://127.0.0.1:18080,local", // Comma-separated entries: an id=url pair and the bare keyword "local".
+ }),
+ ).toEqual([
+ {
+ id: "crab",
+ transport: "websocket",
+ url: "ws://127.0.0.1:18080",
+ },
+ {
+ id: "local",
+ label: "local Codex app-server daemon",
+ transport: "websocket",
+ url: "unix://",
+ },
+ ]);
+ });
+
+ it("keeps equals signs inside endpoint URLs", () => {
+ expect(
+ loadCodexSupervisorEndpoints({
+ OPENCLAW_CODEX_SUPERVISOR_ENDPOINTS: "prod=wss://example.invalid/control?token=a=b&next=c", // Only the first "=" separates the id from the URL.
+ }),
+ ).toEqual([
+ {
+ id: "prod",
+ transport: "websocket",
+ url: "wss://example.invalid/control?token=a=b&next=c",
+ },
+ ]);
+ });
+
+ it("does not derive generated endpoint ids from secret-bearing URLs", () => {
+ expect(
+ loadCodexSupervisorEndpoints({
+ OPENCLAW_CODEX_SUPERVISOR_ENDPOINTS: "wss://user:secret@example.invalid/control?token=a=b", // No id given: the generated id must be positional, not URL-derived.
+ }),
+ ).toEqual([
+ {
+ id: "endpoint-1",
+ transport: "websocket",
+ url: "wss://user:secret@example.invalid/control?token=a=b",
+ },
+ ]);
+ expect(
+ loadCodexSupervisorEndpoints({
+ OPENCLAW_CODEX_SUPERVISOR_ENDPOINTS: JSON.stringify([ // Same expectation for the JSON-array form of the variable.
+ {
+ transport: "websocket",
+ url: "wss://example.invalid/control?token=secret",
+ },
+ ]),
+ }),
+ ).toEqual([
+ {
+ id: "endpoint-1",
+ transport: "websocket",
+ url: "wss://example.invalid/control?token=secret",
+ },
+ ]);
+ });
+
+ it("rejects duplicate normalized endpoint ids", () => {
+ expect(() =>
+ loadCodexSupervisorEndpoints({
+ OPENCLAW_CODEX_SUPERVISOR_ENDPOINTS: "fleet/a=ws://one.invalid,fleet-a=ws://two.invalid", // Per the asserted message, "fleet/a" is expected to collide with "fleet-a".
+ }),
+ ).toThrow("duplicate Codex supervisor endpoint id: fleet-a");
+ expect(() =>
+ resolveCodexSupervisorPluginConfig({
+ endpoints: [
+ { id: "fleet/a", transport: "websocket", url: "ws://one.invalid" },
+ { id: "fleet-a", transport: "websocket", url: "ws://two.invalid" },
+ ],
+ }),
+ ).toThrow("duplicate Codex supervisor endpoint id: fleet-a");
+ });
+
+ it("prefers plugin-configured endpoints over environment defaults", () => {
+ expect(
+ resolveCodexSupervisorPluginConfig(
+ {
+ endpoints: [
+ {
+ id: "fleet",
+ transport: "websocket",
+ url: "wss://fleet.example.invalid/codex",
+ },
+ ],
+ allowRawTranscripts: true,
+ allowWriteControls: true,
+ },
+ {
+ OPENCLAW_CODEX_SUPERVISOR_ENDPOINTS: "local", // Must not appear in the result: the plugin config takes precedence.
+ },
+ ),
+ ).toEqual({
+ endpoints: [
+ {
+ id: "fleet",
+ transport: "websocket",
+ url: "wss://fleet.example.invalid/codex",
+ },
+ ],
+ allowRawTranscripts: true,
+ allowWriteControls: true,
+ });
+ });
+});
+
+describe("CodexSupervisor", () => { // Behavior tests for CodexSupervisor driven through FakeCodexConnection; several tests replace fake.request to mimic specific response shapes.
+ it("does not permanently cache failed endpoint connections", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ });
+ let attempts = 0;
+ const supervisor = new CodexSupervisor([endpoint], async () => {
+ attempts += 1;
+ if (attempts === 1) {
+ throw new Error("daemon unavailable"); // Only the first connect attempt fails.
+ }
+ return fake;
+ });
+
+ await expect(supervisor.probeEndpoints()).resolves.toEqual([
+ { endpointId: "local", ok: false, detail: "daemon unavailable" },
+ ]);
+ await expect(supervisor.probeEndpoints()).resolves.toEqual([{ endpointId: "local", ok: true }]);
+ expect(attempts).toBe(2); // The second probe must have triggered a fresh connect.
+ });
+
+ it("lists loaded sessions", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ cwd: "/workspace",
+ preview: "work",
+ sessionId: "session-1",
+ source: "vscode",
+ status: { type: "idle" },
+ updatedAt: 10,
+ turns: [],
+ });
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(supervisor.listSessions()).resolves.toEqual([
+ {
+ endpointId: "local",
+ threadId: "thread-1",
+ cwd: "/workspace",
+ preview: "work",
+ sessionId: "session-1",
+ source: "vscode",
+ status: "idle",
+ updatedAt: 10,
+ humanAttached: true, // Sessions found through the loaded list are reported as human-attached.
+ },
+ ]);
+ });
+
+ it("lists loaded sessions from real app-server data responses", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ cwd: "/workspace",
+ status: { type: "idle" },
+ turns: [],
+ });
+ fake.request = async (method, params) => { // Override: the loaded list returns bare thread ids under "data".
+ fake.calls.push({ method, params });
+ if (method === "thread/loaded/list") {
+ return { data: ["thread-1"], nextCursor: null };
+ }
+ if (method === "thread/read") {
+ return {
+ thread: { id: "thread-1", cwd: "/workspace", status: { type: "idle" }, turns: [] },
+ };
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(supervisor.listSessions()).resolves.toEqual([
+ {
+ endpointId: "local",
+ threadId: "thread-1",
+ cwd: "/workspace",
+ status: "idle",
+ humanAttached: true,
+ },
+ ]);
+ });
+
+ it("hydrates loaded-only sessions without stored history", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-live",
+ cwd: "/workspace",
+ status: { type: "active", activeFlags: [] },
+ turns: [],
+ });
+ fake.request = async (method, params) => {
+ fake.calls.push({ method, params });
+ if (method === "thread/loaded/list") {
+ return { data: ["thread-live"], nextCursor: null };
+ }
+ if (method === "thread/read") {
+ return {
+ thread: {
+ id: "thread-live",
+ cwd: "/workspace",
+ status: { type: "active", activeFlags: [] },
+ turns: [],
+ },
+ };
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(supervisor.listSessions()).resolves.toEqual([
+ {
+ endpointId: "local",
+ threadId: "thread-live",
+ cwd: "/workspace",
+ status: "active",
+ humanAttached: true,
+ },
+ ]);
+ expect(fake.calls.map((call) => call.method)).toEqual(["thread/loaded/list", "thread/read"]); // No thread/list call: stored history is not consulted.
+ });
+
+ it("does not enumerate stored sessions unless requested", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "notLoaded" },
+ turns: [],
+ });
+ fake.request = async (method, params) => { // Override: any method other than thread/loaded/list fails the test.
+ fake.calls.push({ method, params });
+ if (method === "thread/loaded/list") {
+ return { data: [], nextCursor: null };
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(supervisor.listSessions()).resolves.toEqual([]);
+ expect(fake.calls.map((call) => call.method)).toEqual(["thread/loaded/list"]);
+ });
+
+ it("reads stored sessions from real app-server data responses", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ });
+ fake.request = async (method, params) => { // Override: thread/list answers with a "data" array instead of "threads".
+ fake.calls.push({ method, params });
+ if (method === "thread/loaded/list") {
+ return { data: [], nextCursor: null };
+ }
+ if (method === "thread/list") {
+ return {
+ data: [{ id: "thread-1", status: { type: "notLoaded" }, turns: [] }],
+ nextCursor: null,
+ };
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(supervisor.listSessions({ includeStored: true })).resolves.toEqual([
+ {
+ endpointId: "local",
+ threadId: "thread-1",
+ status: "notLoaded",
+ },
+ ]);
+ expect(fake.calls.find((call) => call.method === "thread/list")?.params).toMatchObject({
+ sourceKinds: ["cli", "vscode", "exec", "appServer", "unknown"], // The stored listing must request every source kind.
+ });
+ });
+
+ it("reads every stored session page", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ });
+ fake.request = async (method, params) => { // Override: two-page thread/list keyed on the "page-2" cursor.
+ fake.calls.push({ method, params });
+ if (method === "thread/loaded/list") {
+ return { data: [], nextCursor: null };
+ }
+ if (method === "thread/list") {
+ if (params?.cursor === "page-2") {
+ return {
+ data: [{ id: "thread-2", status: { type: "notLoaded" }, turns: [] }],
+ nextCursor: null,
+ };
+ }
+ return {
+ data: [{ id: "thread-1", status: { type: "notLoaded" }, turns: [] }],
+ nextCursor: "page-2",
+ };
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(supervisor.listSessions({ includeStored: true })).resolves.toEqual([
+ {
+ endpointId: "local",
+ threadId: "thread-1",
+ status: "notLoaded",
+ },
+ {
+ endpointId: "local",
+ threadId: "thread-2",
+ status: "notLoaded",
+ },
+ ]);
+ });
+
+ it("closes settled connections when evicting them", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ });
+ fake.request = async (method, params) => {
+ fake.calls.push({ method, params });
+ if (method === "thread/read") {
+ throw new Error("transport closed");
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(
+ supervisor.readSession({ endpointId: "local", threadId: "thread-1" }),
+ ).rejects.toThrow("transport closed");
+ await Promise.resolve(); // Yields one microtask turn before checking closeCount; presumably lets the background close() run - confirm.
+ expect(fake.closeCount).toBe(1);
+ });
+
+ it("keeps listing healthy endpoints when one endpoint is down", async () => {
+ const downEndpoint: CodexSupervisorEndpoint = {
+ id: "down",
+ transport: "stdio-proxy",
+ };
+ const upEndpoint: CodexSupervisorEndpoint = {
+ id: "up",
+ transport: "stdio-proxy",
+ };
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ });
+ const supervisor = new CodexSupervisor([downEndpoint, upEndpoint], async (target) => {
+ if (target.id === "down") {
+ throw new Error("host offline");
+ }
+ return fake;
+ });
+
+ await expect(supervisor.listSessionSnapshot()).resolves.toEqual({ // The snapshot returns sessions and per-endpoint errors side by side.
+ sessions: [
+ {
+ endpointId: "up",
+ threadId: "thread-1",
+ status: "idle",
+ humanAttached: true,
+ },
+ ],
+ errors: [{ endpointId: "down", ok: false, detail: "host offline" }],
+ });
+ });
+
+ it("starts a new turn for idle sessions", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ });
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(
+ supervisor.sendToSession({ endpointId: "local", threadId: "thread-1", text: "continue" }),
+ ).resolves.toMatchObject({
+ endpointId: "local",
+ threadId: "thread-1",
+ mode: "start",
+ turnId: "turn-started",
+ });
+ expect(fake.calls.at(-1)).toEqual({ // The last request must be turn/start carrying the text input.
+ method: "turn/start",
+ params: {
+ threadId: "thread-1",
+ input: [{ type: "text", text: "continue", text_elements: [] }],
+ },
+ });
+ });
+
+ it("resolves omitted endpoint ids from loaded-only sessions", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ });
+ fake.request = async (method, params) => { // Override: the thread is in the loaded list while the stored list is empty.
+ fake.calls.push({ method, params });
+ if (method === "thread/loaded/list") {
+ return { data: ["thread-1"], nextCursor: null };
+ }
+ if (method === "thread/read") {
+ return { thread: { id: "thread-1", status: { type: "idle" }, turns: [] } };
+ }
+ if (method === "thread/list") {
+ return { data: [], nextCursor: null };
+ }
+ if (method === "turn/start") {
+ return { turn: { id: "turn-started", status: "inProgress" } };
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(
+ supervisor.sendToSession({ threadId: "thread-1", text: "continue" }), // No endpointId: the supervisor has to look it up.
+ ).resolves.toMatchObject({
+ endpointId: "local",
+ threadId: "thread-1",
+ mode: "start",
+ });
+ });
+
+ it("steers active sessions when the in-progress turn is readable", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "active", activeFlags: [] },
+ turns: [
+ { id: "turn-old", status: "completed", items: [] },
+ { id: "turn-active", status: "inProgress", items: [] },
+ ],
+ });
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(
+ supervisor.sendToSession({ endpointId: "local", threadId: "thread-1", text: "heads up" }),
+ ).resolves.toEqual({
+ endpointId: "local",
+ threadId: "thread-1",
+ mode: "steer",
+ turnId: "turn-active",
+ status: "active",
+ });
+ expect(fake.calls.at(-1)).toEqual({
+ method: "turn/steer",
+ params: {
+ threadId: "thread-1",
+ expectedTurnId: "turn-active", // The steer request is pinned to the in-progress turn id.
+ input: [{ type: "text", text: "heads up", text_elements: [] }],
+ },
+ });
+ });
+
+ it("steers active sessions through the live turns list fallback", async () => {
+ const fake = new FakeCodexConnection({
+ id: "thread-1",
+ status: { type: "active", activeFlags: [] },
+ turns: [],
+ });
+ fake.request = async (method, params) => { // Override: thread/read shows no turns; only thread/turns/list exposes the active turn.
+ fake.calls.push({ method, params });
+ if (method === "thread/list") {
+ return {
+ data: [{ id: "thread-1", status: { type: "active", activeFlags: [] }, turns: [] }],
+ nextCursor: null,
+ };
+ }
+ if (method === "thread/read") {
+ return {
+ thread: {
+ id: "thread-1",
+ status: { type: "active", activeFlags: [] },
+ turns: [],
+ },
+ };
+ }
+ if (method === "thread/turns/list") {
+ return {
+ data: [{ id: "turn-active", status: "inProgress", items: [] }],
+ nextCursor: null,
+ };
+ }
+ if (method === "turn/steer") {
+ return {};
+ }
+ throw new Error(`unexpected method: ${method}`);
+ };
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(
+ supervisor.sendToSession({ endpointId: "local", threadId: "thread-1", text: "heads up" }),
+ ).resolves.toEqual({
+ endpointId: "local",
+ threadId: "thread-1",
+ mode: "steer",
+ turnId: "turn-active",
+ status: "active",
+ });
+ });
+
+ it("fails closed when active turn id is not readable", async () => {
+ const fake = new FakeCodexConnection({ // The default fake throws on thread/turns/list, so no turn id can be found.
+ id: "thread-1",
+ status: { type: "active", activeFlags: [] },
+ turns: [],
+ });
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(
+ supervisor.sendToSession({ endpointId: "local", threadId: "thread-1", text: "heads up" }),
+ ).rejects.toThrow("active but no in-progress turn is readable");
+ });
+
+ it("falls back to reading empty unmaterialized threads without turns", async () => {
+ const fake = new FakeCodexConnection(
+ {
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ },
+ true, // failIncludeTurnsUntilMaterialized: thread/read with includeTurns === true throws.
+ );
+ const supervisor = new CodexSupervisor([endpoint], async () => fake);
+
+ await expect(
+ supervisor.readSession({ endpointId: "local", threadId: "thread-1", includeTurns: true }),
+ ).resolves.toEqual({
+ thread: {
+ id: "thread-1",
+ status: { type: "idle" },
+ turns: [],
+ },
+ });
+ expect(
+ fake.calls.filter((call) => call.method === "thread/read").map((call) => call.params),
+ ).toEqual([ // Exactly one failed attempt with turns, then one retry without.
+ { threadId: "thread-1", includeTurns: true },
+ { threadId: "thread-1", includeTurns: false },
+ ]);
+ });
+});
+
+describe("resolveSafeApprovalResult", () => { // Pins the response returned for each server-request method name.
+ it("returns a valid fail-closed permissions response", () => {
+ expect(resolveSafeApprovalResult("item/permissions/requestApproval")).toEqual({
+ permissions: {}, // An empty permissions object is the expected fail-closed answer.
+ scope: "turn",
+ });
+ });
+
+ it("returns valid fail-closed responses for non-approval server requests", () => {
+ expect(resolveSafeApprovalResult("item/tool/call")).toEqual({
+ contentItems: [
+ {
+ type: "inputText",
+ text: "OpenClaw Codex supervisor did not register a handler for this app-server tool call.",
+ },
+ ],
+ success: false,
+ });
+ expect(resolveSafeApprovalResult("item/tool/requestUserInput")).toEqual({ answers: {} });
+ expect(resolveSafeApprovalResult("mcpServer/elicitation/request")).toEqual({
+ action: "decline",
+ });
+ expect(resolveSafeApprovalResult("unknown/request")).toBeUndefined(); // Unrecognized methods yield no canned response.
+ });
+});
+
+async function waitForFile(filePath: string): Promise { // Polls until filePath can be read and resolves with its UTF-8 contents; throws after 50 failed attempts.
+ for (let attempt = 0; attempt < 50; attempt += 1) {
+ try {
+ return await fs.readFile(filePath, "utf8");
+ } catch (error) {
+ if ((error as NodeJS.ErrnoException).code !== "ENOENT") { // Only "file does not exist yet" is retried; any other error propagates.
+ throw error;
+ }
+ await new Promise((resolve) => setTimeout(resolve, 20)); // 20 ms between attempts, so roughly one second in total.
+ }
+ }
+ throw new Error(`timed out waiting for ${filePath}`);
+}
+
+describe("connectCodexAppServerEndpoint", () => { // Integration-style tests that spawn real Node child processes (process.execPath -e script) as stdio endpoints.
+ it("rejects malformed stdio frames instead of throwing out of band", async () => {
+ const markerDir = await fs.mkdtemp(path.join(os.tmpdir(), "codex-supervisor-malformed-")); // NOTE(review): this temp directory is never removed in the test - consider cleanup.
+ const marker = path.join(markerDir, "closed"); // The child writes this file from its SIGTERM handler.
+ const script = `
+ const fs = require("node:fs");
+ const readline = require("node:readline");
+ process.on("SIGTERM", () => {
+ fs.writeFileSync(${JSON.stringify(marker)}, "closed");
+ process.exit(0);
+ });
+ readline.createInterface({ input: process.stdin }).on("line", () => {
+ process.stdout.write("not-json\\n");
+ });
+ setTimeout(() => {}, 10_000);
+ `; // Child replies to every input line with a non-JSON line and stays alive for up to 10 s.
+
+ await expect(
+ connectCodexAppServerEndpoint({
+ id: "bad",
+ transport: "stdio-proxy",
+ command: process.execPath,
+ args: ["-e", script],
+ }),
+ ).rejects.toThrow("Malformed Codex app-server message");
+ await expect(waitForFile(marker)).resolves.toBe("closed"); // The marker file proves the child received SIGTERM.
+ });
+
+ it("closes stdio connections when initialization fails", async () => {
+ const markerDir = await fs.mkdtemp(path.join(os.tmpdir(), "codex-supervisor-init-")); // NOTE(review): this temp directory is never removed in the test - consider cleanup.
+ const marker = path.join(markerDir, "closed");
+ const script = `
+ const fs = require("node:fs");
+ const readline = require("node:readline");
+ process.on("SIGTERM", () => {
+ fs.writeFileSync(${JSON.stringify(marker)}, "closed");
+ process.exit(0);
+ });
+ readline.createInterface({ input: process.stdin }).on("line", (line) => {
+ const request = JSON.parse(line);
+ process.stdout.write(JSON.stringify({
+ id: request.id,
+ error: { code: -32000, message: "init failed" }
+ }) + "\\n");
+ });
+ setTimeout(() => {}, 10_000);
+ `; // Child answers every request with a JSON-RPC style error object.
+
+ await expect(
+ connectCodexAppServerEndpoint({
+ id: "bad",
+ transport: "stdio-proxy",
+ command: process.execPath,
+ args: ["-e", script],
+ }),
+ ).rejects.toThrow("init failed");
+ await expect(waitForFile(marker)).resolves.toBe("closed");
+ });
+
+ it("fails a cached stdio connection cleanly after the child exits", async () => {
+ const script = `
+ const readline = require("node:readline");
+ readline.createInterface({ input: process.stdin }).on("line", (line) => {
+ const request = JSON.parse(line);
+ if (request.method === "initialize") {
+ process.stdout.write(JSON.stringify({ id: request.id, result: {} }) + "\\n");
+ return;
+ }
+ if (request.method === "thread/list") {
+ process.stdout.write(JSON.stringify({ id: request.id, result: { threads: [] } }) + "\\n");
+ setTimeout(() => process.exit(0), 0);
+ }
+ });
+ `; // Child answers initialize, then answers the first thread/list and exits.
+ const supervisor = new CodexSupervisor(
+ [
+ {
+ id: "exits",
+ transport: "stdio-proxy",
+ command: process.execPath,
+ args: ["-e", script],
+ },
+ ],
+ connectCodexAppServerEndpoint,
+ );
+
+ await expect(supervisor.probeEndpoints()).resolves.toEqual([{ endpointId: "exits", ok: true }]);
+ await new Promise((resolve) => setTimeout(resolve, 50)); // Fixed 50 ms wait, presumably to let the child exit - timing-dependent.
+ await expect(supervisor.probeEndpoints()).resolves.toMatchObject([
+ {
+ endpointId: "exits",
+ ok: false,
+ },
+ ]);
+ await supervisor.close();
+ });
+});
diff --git a/extensions/codex-supervisor/src/supervisor.ts b/extensions/codex-supervisor/src/supervisor.ts
new file mode 100644
index 00000000000..f43f69dab5e
--- /dev/null
+++ b/extensions/codex-supervisor/src/supervisor.ts
@@ -0,0 +1,476 @@
+import { connectCodexAppServerEndpoint } from "./json-rpc-client.js";
+import type {
+ CodexJsonRpcConnection,
+ CodexSupervisorEndpoint,
+ CodexSupervisorEndpointHealth,
+ CodexSupervisorSendResult,
+ CodexSupervisorSession,
+ CodexSupervisorSessionListResult,
+ CodexSupervisorThreadStatus,
+ CodexSupervisorTurnMode,
+} from "./types.js";
+
+type EndpointConnector = (endpoint: CodexSupervisorEndpoint) => Promise; // Async factory that opens a connection for one endpoint.
+
+const ALL_CODEX_THREAD_SOURCE_KINDS = ["cli", "vscode", "exec", "appServer", "unknown"]; // Sent as sourceKinds on thread/list requests.
+
+function isRecord(value: unknown): value is Record { // Type guard: true for truthy, non-array values whose typeof is "object".
+ return Boolean(value) && typeof value === "object" && !Array.isArray(value); // Boolean(value) rules out null, whose typeof is also "object".
+}
+
+function asRecordArray(value: unknown): Record[] { // Returns the record-like entries of value, or [] when value is not an array.
+ if (!Array.isArray(value)) {
+ return [];
+ }
+ return value.filter(isRecord); // Non-record entries are dropped silently.
+}
+
+function extractThread(value: unknown): Record | undefined { // Returns value.thread when both value and value.thread are records; otherwise undefined.
+ if (!isRecord(value)) {
+ return undefined;
+ }
+ if (isRecord(value.thread)) {
+ return value.thread;
+ }
+ return undefined;
+}
+
+function extractThreadList(value: unknown): Record[] { // Returns the records from the first array-valued key among data, threads, loadedThreads; [] when none is an array.
+ if (!isRecord(value)) {
+ return [];
+ }
+ if (Array.isArray(value.data)) { // "data" is checked first; later keys are ignored once an array is found.
+ return asRecordArray(value.data);
+ }
+ if (Array.isArray(value.threads)) {
+ return asRecordArray(value.threads);
+ }
+ if (Array.isArray(value.loadedThreads)) {
+ return asRecordArray(value.loadedThreads);
+ }
+ return [];
+}
+
+function extractStringList(value: unknown): string[] { // Returns the string entries of value.data, or [] when value is not a record with an array "data".
+ if (!isRecord(value) || !Array.isArray(value.data)) {
+ return [];
+ }
+ return value.data.filter((entry) => typeof entry === "string"); // Non-string entries are dropped.
+}
+
+function getStatusType(thread: Record): CodexSupervisorThreadStatus { // Normalizes thread.status to a string; accepts { type: string } or a bare string, else "unknown".
+ const status = thread.status;
+ if (isRecord(status) && typeof status.type === "string") {
+ return status.type;
+ }
+ if (typeof status === "string") {
+ return status;
+ }
+ return "unknown";
+}
+
+function toSession( // Builds a session summary from a raw thread record; returns undefined when thread.id is not a string.
+ endpointId: string,
+ thread: Record,
+ humanAttached?: boolean,
+): CodexSupervisorSession | undefined {
+ if (typeof thread.id !== "string") {
+ return undefined;
+ }
+ return { // Optional fields are spread in only when present with the expected type, so absent keys stay absent.
+ endpointId,
+ threadId: thread.id,
+ status: getStatusType(thread),
+ ...(typeof thread.sessionId === "string" ? { sessionId: thread.sessionId } : {}),
+ ...(typeof thread.cwd === "string" ? { cwd: thread.cwd } : {}),
+ ...(typeof thread.preview === "string" ? { preview: thread.preview } : {}),
+ ...("name" in thread && (typeof thread.name === "string" || thread.name === null) // An explicit null name is preserved.
+ ? { name: thread.name }
+ : {}),
+ ...(typeof thread.source === "string" ? { source: thread.source } : {}),
+ ...(typeof thread.updatedAt === "number" ? { updatedAt: thread.updatedAt } : {}),
+ ...(humanAttached !== undefined ? { humanAttached } : {}),
+ };
+}
+
+function findInProgressTurnId(thread: Record): string | undefined { // Returns the id of the last turn in thread.turns whose status is "inProgress", if any.
+ const turns = asRecordArray(thread.turns);
+ for (const turn of turns.toReversed()) { // Scans from the end of the array; toReversed() leaves the original array untouched.
+ if (turn.status === "inProgress" && typeof turn.id === "string") {
+ return turn.id;
+ }
+ }
+ return undefined;
+}
+
+function isLoadedThreadReadMiss(error: unknown): boolean { // True when the error text contains "thread not found" or "thread not loaded".
+ const message = error instanceof Error ? error.message : String(error);
+ return message.includes("thread not found") || message.includes("thread not loaded"); // Substring match on the message; no error code is inspected.
+}
+
+export class CodexSupervisor { // Lists sessions, reads threads and starts/steers/interrupts turns across the configured endpoints, reusing one connection per endpoint.
+ private readonly connections = new Map>(); // Keyed by endpoint id; stores the connect promise itself, so pending and settled connections are both reused.
+
+ constructor(
+ private readonly endpoints: CodexSupervisorEndpoint[],
+ private readonly connector: EndpointConnector = connectCodexAppServerEndpoint, // Replaceable connection factory.
+ ) {}
+
+ listEndpoints(): CodexSupervisorEndpoint[] { // Returns the configured endpoint array itself (not a copy).
+ return this.endpoints;
+ }
+
+ async close(): Promise { // Waits for every cached connect promise, clears the cache, then closes the connections that resolved.
+ const settled = await Promise.allSettled(this.connections.values());
+ this.connections.clear();
+ await Promise.all(
+ settled.map(async (entry) => {
+ if (entry.status === "fulfilled") { // Failed connects have nothing to close.
+ await entry.value.close();
+ }
+ }),
+ );
+ }
+
+ async probeEndpoints(): Promise { // Checks all endpoints in parallel; each result has ok plus an error detail on failure. Never rejects for a single endpoint failure.
+ return await Promise.all(
+ this.endpoints.map(async (endpoint) => {
+ try {
+ const connection = await this.connectionFor(endpoint.id);
+ await connection.request("thread/list", { limit: 1 }); // Minimal request used as the liveness check.
+ return { endpointId: endpoint.id, ok: true };
+ } catch (error) {
+ this.forgetEndpoint(endpoint.id); // Drop the cached connection so the next call reconnects.
+ return {
+ endpointId: endpoint.id,
+ ok: false,
+ detail: error instanceof Error ? error.message : String(error),
+ };
+ }
+ }),
+ );
+ }
+
+ async listSessions(params: { includeStored?: boolean } = {}): Promise { // Sessions only; per-endpoint errors from the snapshot are discarded.
+ return (await this.listSessionSnapshot(params)).sessions;
+ }
+
+ async listSessionSnapshot( // Visits endpoints one after another and returns both the sessions found and the endpoints that failed.
+ params: { includeStored?: boolean } = {},
+ ): Promise {
+ const sessions: CodexSupervisorSession[] = [];
+ const errors: CodexSupervisorEndpointHealth[] = [];
+ for (const endpoint of this.endpoints) {
+ try {
+ sessions.push(...(await this.listEndpointSessions(endpoint, params)));
+ } catch (error) {
+ this.forgetEndpoint(endpoint.id);
+ errors.push({ // A failing endpoint is recorded and the loop continues with the next one.
+ endpointId: endpoint.id,
+ ok: false,
+ detail: error instanceof Error ? error.message : String(error),
+ });
+ }
+ }
+ return { sessions, errors };
+ }
+
+ async readSession(params: { // Reads one thread via thread/read; throws when the response is not an object.
+ endpointId?: string;
+ threadId: string;
+ includeTurns?: boolean;
+ }): Promise> {
+ const endpointId = await this.resolveEndpointId(params);
+ const connection = await this.connectionFor(endpointId);
+ try {
+ const result = await this.readThread(
+ connection,
+ params.threadId,
+ params.includeTurns === true, // Anything other than literal true reads without turns.
+ );
+ if (!isRecord(result)) {
+ throw new Error("Codex thread/read returned a non-object response");
+ }
+ return result;
+ } catch (error) {
+ this.forgetEndpoint(endpointId); // Any failure here, including the non-object check, evicts the cached connection.
+ throw error;
+ }
+ }
+
+ async sendToSession(params: { // Sends text to a thread: starts a new turn, or steers the in-progress turn, depending on mode and thread status.
+ endpointId?: string;
+ threadId: string;
+ text: string;
+ mode?: CodexSupervisorTurnMode;
+ }): Promise {
+ const endpointId = await this.resolveEndpointId(params);
+ const connection = await this.connectionFor(endpointId);
+ try {
+ const mode = params.mode ?? "auto";
+ if (mode === "start") { // Explicit start skips the status read entirely.
+ return await this.startTurn(connection, endpointId, params.threadId, params.text);
+ }
+
+ const read = await this.readThread(connection, params.threadId, false);
+ const thread = extractThread(read);
+ if (!thread) {
+ throw new Error(`Codex thread not found: ${params.threadId}`);
+ }
+ const status = getStatusType(thread);
+ if (mode === "steer" || status === "active") {
+ const detailed = await this.readThread(connection, params.threadId, true);
+ const detailedThread = extractThread(detailed);
+ const turnId = // Lookup order: read with turns, then the earlier read, then thread/turns/list.
+ (detailedThread ? findInProgressTurnId(detailedThread) : undefined) ??
+ findInProgressTurnId(thread) ??
+ (await this.readActiveTurnId(connection, params.threadId));
+ if (!turnId) {
+ throw new Error( // NOTE(review): also reached when mode is "steer" on a non-active thread, where "is active" may be misleading.
+ `Codex thread ${params.threadId} is active but no in-progress turn is readable`,
+ );
+ }
+ await connection.request("turn/steer", {
+ threadId: params.threadId,
+ expectedTurnId: turnId,
+ input: [{ type: "text", text: params.text, text_elements: [] }],
+ });
+ return { endpointId, threadId: params.threadId, mode: "steer", turnId, status };
+ }
+ return await this.startTurn(connection, endpointId, params.threadId, params.text);
+ } catch (error) {
+ this.forgetEndpoint(endpointId);
+ throw error;
+ }
+ }
+
+ async interruptSession(params: { // Interrupts a turn; when turnId is omitted, the in-progress turn is looked up first.
+ endpointId?: string;
+ threadId: string;
+ turnId?: string;
+ }): Promise<{ endpointId: string; threadId: string; turnId: string }> {
+ const endpointId = await this.resolveEndpointId(params);
+ const connection = await this.connectionFor(endpointId);
+ try {
+ let turnId = params.turnId;
+ if (!turnId) {
+ const read = await this.readThread(connection, params.threadId, true);
+ const thread = extractThread(read);
+ turnId =
+ (thread ? findInProgressTurnId(thread) : undefined) ??
+ (await this.readActiveTurnId(connection, params.threadId)); // Falls back to thread/turns/list.
+ }
+ if (!turnId) {
+ throw new Error(`Codex thread ${params.threadId} has no readable in-progress turn`);
+ }
+ await connection.request("turn/interrupt", { threadId: params.threadId, turnId });
+ return { endpointId, threadId: params.threadId, turnId };
+ } catch (error) {
+ this.forgetEndpoint(endpointId);
+ throw error;
+ }
+ }
+
+ private async listEndpointSessions( // Loaded sessions for one endpoint, plus stored sessions when includeStored === true.
+ endpoint: CodexSupervisorEndpoint,
+ params: { includeStored?: boolean },
+ ): Promise {
+ if (params.includeStored === true) {
+ const loaded = await this.listLoadedThreadSessions(endpoint);
+ const sessions = [...loaded];
+ for (const stored of await this.listStoredThreadSessions(endpoint)) {
+ if (!sessions.some((session) => session.threadId === stored.threadId)) { // The loaded entry wins when both lists contain the thread.
+ sessions.push(stored);
+ }
+ }
+ return sessions;
+ }
+ return await this.listLoadedThreadSessions(endpoint);
+ }
+
+ private async listLoadedThreadSessions( // Pages through thread/loaded/list and reads each listed thread to build its session entry.
+ endpoint: CodexSupervisorEndpoint,
+ ): Promise {
+ const sessions: CodexSupervisorSession[] = [];
+ const connection = await this.connectionFor(endpoint.id);
+ let cursor: string | undefined;
+ do { // NOTE(review): continues while the server returns a non-empty string nextCursor; there is no guard against a repeated cursor.
+ const listed = await connection.request("thread/loaded/list", {
+ limit: 100,
+ ...(cursor ? { cursor } : {}),
+ });
+ for (const threadId of extractStringList(listed)) {
+ if (sessions.some((entry) => entry.threadId === threadId)) { // Skip ids already collected from an earlier page.
+ continue;
+ }
+ const read = await this.readOptionalLoadedThread(connection, threadId);
+ const thread = extractThread(read);
+ const session = thread ? toSession(endpoint.id, thread, true) : undefined; // humanAttached is set to true for loaded threads.
+ if (session) {
+ sessions.push(session);
+ }
+ }
+ cursor =
+ isRecord(listed) && typeof listed.nextCursor === "string" ? listed.nextCursor : undefined;
+ } while (cursor);
+ return sessions;
+ }
+
+ private async listStoredThreadSessions( // Pages through thread/list for all source kinds; entries come straight from the listing without a per-thread read.
+ endpoint: CodexSupervisorEndpoint,
+ ): Promise {
+ const sessions: CodexSupervisorSession[] = [];
+ const connection = await this.connectionFor(endpoint.id);
+ let cursor: string | undefined;
+ do { // NOTE(review): same unguarded cursor loop as the loaded listing.
+ const listed = await connection.request("thread/list", {
+ limit: 100,
+ sourceKinds: ALL_CODEX_THREAD_SOURCE_KINDS,
+ ...(cursor ? { cursor } : {}),
+ });
+ for (const thread of extractThreadList(listed)) {
+ if (typeof thread.id !== "string") {
+ continue;
+ }
+ if (
+ sessions.some((entry) => entry.endpointId === endpoint.id && entry.threadId === thread.id)
+ ) {
+ continue;
+ }
+ const session = toSession(endpoint.id, thread); // humanAttached is left unset for stored threads.
+ if (session) {
+ sessions.push(session);
+ }
+ }
+ cursor =
+ isRecord(listed) && typeof listed.nextCursor === "string" ? listed.nextCursor : undefined;
+ } while (cursor);
+ return sessions;
+ }
+
+ private async readOptionalLoadedThread( // Reads a thread without turns; resolves to undefined when the read fails with a "not found"/"not loaded" message.
+ connection: CodexJsonRpcConnection,
+ threadId: string,
+ ): Promise {
+ try {
+ return await this.readLoadedThread(connection, threadId, false);
+ } catch (error) {
+ if (isLoadedThreadReadMiss(error)) {
+ return undefined;
+ }
+ throw error;
+ }
+ }
+
+ private async readLoadedThread( // Issues thread/read; when a read with turns fails with "not materialized yet", retries once without turns.
+ connection: CodexJsonRpcConnection,
+ threadId: string,
+ includeTurns: boolean,
+ ): Promise {
+ try {
+ return await connection.request("thread/read", { threadId, includeTurns });
+ } catch (error) {
+ if (!includeTurns) { // Nothing to fall back to when turns were not requested.
+ throw error;
+ }
+ const message = error instanceof Error ? error.message : String(error);
+ if (!message.includes("not materialized yet")) {
+ throw error;
+ }
+ return await connection.request("thread/read", { threadId, includeTurns: false });
+ }
+ }
+
+ private async startTurn( // Sends turn/start with the text and reports the returned turn id and status when they are strings.
+ connection: CodexJsonRpcConnection,
+ endpointId: string,
+ threadId: string,
+ text: string,
+ ): Promise {
+ const result = await connection.request("turn/start", {
+ threadId,
+ input: [{ type: "text", text, text_elements: [] }],
+ });
+ const turn = isRecord(result) && isRecord(result.turn) ? result.turn : undefined;
+ return {
+ endpointId,
+ threadId,
+ mode: "start",
+ ...(typeof turn?.id === "string" ? { turnId: turn.id } : {}),
+ ...(typeof turn?.status === "string" ? { status: turn.status } : {}),
+ };
+ }
+
+ private async readThread( // Thin wrapper that delegates to readLoadedThread with the same arguments.
+ connection: CodexJsonRpcConnection,
+ threadId: string,
+ includeTurns: boolean,
+ ): Promise {
+ return await this.readLoadedThread(connection, threadId, includeTurns);
+ }
+
+ private async readActiveTurnId( // Best-effort lookup of an in-progress turn via thread/turns/list; resolves to undefined on any error.
+ connection: CodexJsonRpcConnection,
+ threadId: string,
+ ): Promise {
+ try {
+ const response = await connection.request("thread/turns/list", {
+ threadId,
+ limit: 10,
+ sortDirection: "desc",
+ itemsView: "summary",
+ });
+ return extractThreadList(response).find( // extractThreadList is reused here to pull the turn records out of the response.
+ (turn) => turn.status === "inProgress" && typeof turn.id === "string",
+ )?.id as string | undefined;
+ } catch { // Deliberately swallowed: callers treat undefined as "no readable turn".
+ return undefined;
+ }
+ }
+
+ private async resolveEndpointId(params: { // Returns the explicit endpointId when given; otherwise finds the single endpoint that lists the thread.
+ endpointId?: string;
+ threadId: string;
+ }): Promise {
+ if (params.endpointId) {
+ return params.endpointId;
+ }
+ const sessions = await this.listSessions({ includeStored: true }); // NOTE(review): listSessions drops endpoint errors, so a thread on an unreachable endpoint surfaces below as "not found".
+ const matches = sessions.filter((session) => session.threadId === params.threadId);
+ if (matches.length === 1) {
+ return matches[0].endpointId;
+ }
+ if (matches.length > 1) {
+ throw new Error(`Codex thread id is ambiguous across endpoints: ${params.threadId}`);
+ }
+ throw new Error(`Codex thread not found: ${params.threadId}`);
+ }
+
+ private async connectionFor(endpointId: string): Promise { // Returns the cached connection for a configured endpoint, connecting on first use; throws for unknown ids.
+ const endpoint = this.endpoints.find((entry) => entry.id === endpointId);
+ if (!endpoint) {
+ throw new Error(`Unknown Codex supervisor endpoint: ${endpointId}`);
+ }
+ const existing = this.connections.get(endpoint.id);
+ if (existing) {
+ return await existing;
+ }
+ const created = this.connector(endpoint);
+ this.connections.set(endpoint.id, created); // Cached before awaiting, so callers arriving during the connect share this promise.
+ void created.catch(() => {
+ if (this.connections.get(endpoint.id) === created) { // Remove only if the cache still holds this attempt.
+ this.connections.delete(endpoint.id);
+ }
+ });
+ return await created;
+ }
+
+ private forgetEndpoint(endpointId: string): void { // Evicts the cached connection and closes it in the background.
+ const existing = this.connections.get(endpointId);
+ if (!existing) {
+ return;
+ }
+ this.connections.delete(endpointId);
+ void existing.then((connection) => connection.close()).catch(() => undefined); // Connect and close failures are ignored here.
+ }
+}
diff --git a/extensions/codex-supervisor/src/types.ts b/extensions/codex-supervisor/src/types.ts
new file mode 100644
index 00000000000..9594920b49c
--- /dev/null
+++ b/extensions/codex-supervisor/src/types.ts
@@ -0,0 +1,58 @@
+export type CodexSupervisorEndpoint = // Configuration for one endpoint; a union discriminated by `transport`.
+ | {
+ id: string;
+ label?: string;
+ transport: "stdio-proxy"; // Command-based variant: command, args and cwd are all optional.
+ command?: string;
+ args?: string[];
+ cwd?: string;
+ }
+ | {
+ id: string;
+ label?: string;
+ transport: "websocket"; // URL-based variant: `url` is required.
+ url: string;
+ authTokenEnv?: string; // NOTE(review): presumably names an environment variable holding the auth token — confirm.
+ };
+
+export type CodexSupervisorTurnMode = "auto" | "start" | "steer"; // NOTE(review): "auto" presumably resolves to "start" or "steer", the only modes CodexSupervisorSendResult reports — confirm.
+
+export type CodexSupervisorThreadStatus = string; // Plain string alias: status values are not constrained at the type level.
+
+export type CodexSupervisorSession = { // Session record; only endpointId, threadId and status are required.
+ endpointId: string;
+ threadId: string;
+ sessionId?: string;
+ cwd?: string;
+ preview?: string;
+ name?: string | null; // May be absent or explicitly null.
+ source?: string;
+ status: CodexSupervisorThreadStatus;
+ updatedAt?: number; // NOTE(review): numeric timestamp; the unit (ms vs s) is not visible here — confirm.
+ humanAttached?: boolean; // NOTE(review): presumably true while a human is attached to the thread — confirm.
+};
+
+export type CodexSupervisorSendResult = { // Outcome of a send, tagged with the endpoint and thread it went to.
+ endpointId: string;
+ threadId: string;
+ mode: "start" | "steer"; // Always a concrete mode; "auto" is not representable here.
+ turnId?: string;
+ status?: string;
+};
+
+/**
+ * Minimal JSON-RPC connection surface.
+ *
+ * - `request` sends a method call and resolves with the response payload. The
+ *   payload is typed `unknown`, so callers must narrow it before use.
+ * - `notify` sends a message without returning a result.
+ * - `close` is awaitable.
+ *
+ * NOTE(review): the type arguments were missing from this declaration and have
+ * been restored with the most conservative choices; confirm against the
+ * implementations.
+ */
+export type CodexJsonRpcConnection = {
+  request(method: string, params?: Record<string, unknown>): Promise<unknown>;
+  notify(method: string, params?: Record<string, unknown>): void;
+  close(): Promise<void>;
+};
+
+export type CodexSupervisorEndpointHealth = { // Per-endpoint health entry: an `ok` flag plus optional free-form `detail`.
+ endpointId: string;
+ ok: boolean;
+ detail?: string;
+};
+
+export type CodexSupervisorSessionListResult = { // Session listing paired with per-endpoint health entries.
+ sessions: CodexSupervisorSession[];
+ errors: CodexSupervisorEndpointHealth[]; // NOTE(review): presumably one entry per endpoint that failed to list — confirm.
+};
diff --git a/extensions/codex-supervisor/tsconfig.json b/extensions/codex-supervisor/tsconfig.json
new file mode 100644
index 00000000000..b8a85a99ac3
--- /dev/null
+++ b/extensions/codex-supervisor/tsconfig.json
@@ -0,0 +1,16 @@
+{
+ "extends": "../tsconfig.package-boundary.base.json",
+ "compilerOptions": {
+ "rootDir": "."
+ },
+ "include": ["./*.ts", "./src/**/*.ts"],
+ "exclude": [
+ "./**/*.test.ts",
+ "./dist/**",
+ "./node_modules/**",
+ "./src/test-support/**",
+ "./src/**/*test-helpers.ts",
+ "./src/**/*test-harness.ts",
+ "./src/**/*test-support.ts"
+ ]
+}
diff --git a/package.json b/package.json
index d643214861d..25e08039541 100644
--- a/package.json
+++ b/package.json
@@ -1434,6 +1434,7 @@
"clean:dist": "node -e \"require('fs').rmSync('dist', {recursive: true, force: true})\"",
"codex-app-server:protocol:check": "node --import tsx scripts/check-codex-app-server-protocol.ts",
"codex-app-server:protocol:sync": "node --import tsx scripts/sync-codex-app-server-protocol.ts",
+ "codex-supervisor:mcp": "node --import tsx extensions/codex-supervisor/src/mcp-serve.ts",
"config:channels:check": "node --import tsx scripts/generate-bundled-channel-config-metadata.ts --check",
"config:channels:gen": "node --import tsx scripts/generate-bundled-channel-config-metadata.ts --write",
"config:docs:check": "node --import tsx scripts/generate-config-doc-baseline.ts --check",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 446c9502a77..ccc93ef5f05 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -545,6 +545,25 @@ importers:
specifier: workspace:*
version: link:../../packages/plugin-sdk
+ extensions/codex-supervisor:
+ dependencies:
+ '@modelcontextprotocol/sdk':
+ specifier: 1.29.0
+ version: 1.29.0(zod@4.4.3)
+ typebox:
+ specifier: 1.1.38
+ version: 1.1.38
+ ws:
+ specifier: 8.21.0
+ version: 8.21.0
+ zod:
+ specifier: 4.4.3
+ version: 4.4.3
+ devDependencies:
+ '@openclaw/plugin-sdk':
+ specifier: workspace:*
+ version: link:../../packages/plugin-sdk
+
extensions/comfy:
devDependencies:
'@openclaw/plugin-sdk':
diff --git a/src/agents/models-config.uses-first-github-copilot-profile-env-tokens.test.ts b/src/agents/models-config.uses-first-github-copilot-profile-env-tokens.test.ts
index c90372db6ed..46f39ccaaa7 100644
--- a/src/agents/models-config.uses-first-github-copilot-profile-env-tokens.test.ts
+++ b/src/agents/models-config.uses-first-github-copilot-profile-env-tokens.test.ts
@@ -133,6 +133,7 @@ describe("models-config", () => {
expect(resolveImplicitProviders).toHaveBeenCalledOnce();
expect(plan).toEqual({
action: "write",
+ pluginCatalogWrites: {},
contents: `${JSON.stringify(
{
providers: {
@@ -195,7 +196,7 @@ describe("models-config", () => {
},
);
- expect(plan).toEqual({ action: "noop" });
+ expect(plan).toEqual({ action: "noop", pluginCatalogWrites: {} });
});
it("uses tokenRef env var when github-copilot profile omits plaintext token", () => {
diff --git a/src/agents/sessions/extensions/loader.ts b/src/agents/sessions/extensions/loader.ts
index 1354a845be6..4e426932f9e 100644
--- a/src/agents/sessions/extensions/loader.ts
+++ b/src/agents/sessions/extensions/loader.ts
@@ -15,6 +15,7 @@ import * as bundledLlm from "openclaw/plugin-sdk/llm";
// The virtualModules option then makes them available to extensions.
import * as bundledTypebox from "typebox";
import * as bundledTypeboxCompile from "typebox/compile";
+import * as bundledTypeboxFormat from "typebox/format";
import * as bundledTypeboxValue from "typebox/value";
import {
buildPluginLoaderAliasMap,
@@ -44,9 +45,11 @@ import type {
const VIRTUAL_MODULES: Record = {
typebox: bundledTypebox,
"typebox/compile": bundledTypeboxCompile,
+ "typebox/format": bundledTypeboxFormat,
"typebox/value": bundledTypeboxValue,
"@sinclair/typebox": bundledTypebox,
"@sinclair/typebox/compile": bundledTypeboxCompile,
+ "@sinclair/typebox/format": bundledTypeboxFormat,
"@sinclair/typebox/value": bundledTypeboxValue,
"openclaw/plugin-sdk/agent-core": bundledAgentCore,
"@openclaw/plugin-sdk/agent-core": bundledAgentCore,
@@ -74,6 +77,7 @@ function getExtensionLoaderAliases(): Record {
const agentSessionsEntry = resolveExtensionSafeAgentSessionsEntry();
const typeboxEntry = require.resolve("typebox");
const typeboxCompileEntry = require.resolve("typebox/compile");
+ const typeboxFormatEntry = require.resolve("typebox/format");
const typeboxValueEntry = require.resolve("typebox/value");
const loaderModulePath = fileURLToPath(import.meta.url);
@@ -85,9 +89,11 @@ function getExtensionLoaderAliases(): Record {
"@openclaw/plugin-sdk/agent-sessions": agentSessionsEntry,
typebox: typeboxEntry,
"typebox/compile": typeboxCompileEntry,
+ "typebox/format": typeboxFormatEntry,
"typebox/value": typeboxValueEntry,
"@sinclair/typebox": typeboxEntry,
"@sinclair/typebox/compile": typeboxCompileEntry,
+ "@sinclair/typebox/format": typeboxFormatEntry,
"@sinclair/typebox/value": typeboxValueEntry,
};
diff --git a/src/gateway/client.ts b/src/gateway/client.ts
index 78da18de8ca..1b7821fa0e7 100644
--- a/src/gateway/client.ts
+++ b/src/gateway/client.ts
@@ -149,6 +149,13 @@ export type GatewayClientOptions = {
onGap?: (info: { expected: number; received: number }) => void;
};
+export type GatewayClientConnectionMetadata = { // Subset of client options returned by GatewayClient.getConnectionMetadata().
+ clientName?: GatewayClientName;
+ hasDeviceIdentity: boolean; // Presence flag only; the device identity itself is not exposed.
+ mode?: GatewayClientMode;
+ preauthHandshakeTimeoutMs?: number;
+};
+
function createOpenClawGatewayClientHostDeps(
overrides?: GatewayClientHostDeps,
): GatewayClientHostDeps {
@@ -210,6 +217,16 @@ export class GatewayClient {
): Promise {
return this.#client.request(method, params, opts);
}
+
+  /**
+   * Reports a subset of the options held by the underlying client.
+   * The device identity is reduced to a presence flag rather than returned.
+   */
+  getConnectionMetadata(): GatewayClientConnectionMetadata {
+    // The options live on the wrapped client, which does not expose them in
+    // its public type, hence the cast.
+    const { clientName, deviceIdentity, mode, preauthHandshakeTimeoutMs } = (
+      this.#client as unknown as { opts: GatewayClientOptions }
+    ).opts;
+    return {
+      clientName,
+      hasDeviceIdentity: Boolean(deviceIdentity),
+      mode,
+      preauthHandshakeTimeoutMs,
+    };
+  }
}
export type { DeviceIdentity };
diff --git a/test/vitest/vitest.unit-support.config.ts b/test/vitest/vitest.unit-support.config.ts
index cba4deedf3c..db333cd673a 100644
--- a/test/vitest/vitest.unit-support.config.ts
+++ b/test/vitest/vitest.unit-support.config.ts
@@ -4,6 +4,8 @@ export default createUnitVitestConfigWithOptions(process.env, {
name: "unit-support",
includePatterns: ["packages/**/*.test.ts"],
extraExcludePatterns: [
+ // The gateway-client package owns its own browser/runtime protocol lane.
+ "packages/gateway-client/src/**/*.test.ts",
// The gateway-protocol package rides with gateway-client because the client
// package owns the browser/runtime protocol compatibility lane.
"packages/gateway-protocol/src/**/*.test.ts",