diff --git a/CHANGELOG.md b/CHANGELOG.md index b7846c22e15..f64a0832296 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ Docs: https://docs.openclaw.ai - Channels/QQBot: add full group chat support (history tracking, @-mention gating, activation modes, per-group config, FIFO message queue with deliver debounce), C2C `stream_messages` streaming with a `StreamingController` lifecycle manager, unified `sendMedia` with chunked upload for large files, and refactor the engine into pipeline stages, focused outbound submodules, builtin slash-command modules, and explicit DI ports via `createEngineAdapters()`. (#70624) Thanks @cxyhhhhh. - Gateway/runtime: reuse the current plugin metadata snapshot for provider discovery so repeated model-provider discovery avoids rebuilding plugin manifest metadata. Thanks @shakkernerd. - Gateway/startup: pass the plugin metadata snapshot from config validation into plugin bootstrap so startup reuses one manifest product instead of rebuilding plugin metadata. Thanks @shakkernerd. +- ACP/runtime: add an opt-in bundled Coven backend extension that routes ACP coding sessions through a local Coven daemon when `acp.backend="coven"`, while preserving the existing ACPX backend as the default fallback path. Thanks @BunsDev. + +### Fixes + +- ACP/runtime: harden the opt-in Coven backend with workspace-confined launch paths, home-expanded Coven socket config, bounded socket responses, sanitized daemon output, and controlled polling failure handling. Thanks @BunsDev. ## 2026.4.26 diff --git a/docs/tools/acp-agents-setup.md b/docs/tools/acp-agents-setup.md index 44787262c0f..863fa31eedd 100644 --- a/docs/tools/acp-agents-setup.md +++ b/docs/tools/acp-agents-setup.md @@ -186,6 +186,77 @@ Override the command or version in plugin config: See [Plugins](/tools/plugin). +## Optional Coven backend + +OpenClaw can also register a bundled, opt-in `coven` ACP backend for operators +who want ACP coding sessions supervised by a local [Coven](https://github.com/OpenCoven/coven) +daemon instead of launched directly through ACPX. + +This is intentionally an extension, not a core runtime path: + +- the default ACPX backend stays unchanged for normal installs; +- Coven has its own daemon, socket, session store, harness mapping, and project + boundary model; +- the bridge can be enabled, disabled, configured, and reviewed independently + through the plugin system; and +- OpenClaw remains responsible for ACP session routing, chat bindings, task + state, and fallback policy while Coven owns harness supervision. + +Minimal opt-in config: + +```json5 +{ + acp: { + enabled: true, + backend: "coven", + defaultAgent: "codex", + }, + plugins: { + entries: { + coven: { + enabled: true, + config: { + // Optional. Defaults to ~/.coven. Environment variables are not used for this trust anchor. + covenHome: "~/.coven", + // Optional. Defaults to /coven.sock; overrides must resolve to that path. + socketPath: "~/.coven/coven.sock", + // Optional. Defaults to false; enable only when direct ACP fallback is acceptable. + allowFallback: false, + // Optional. Used only when allowFallback is true. + fallbackBackend: "acpx", + }, + }, + }, + }, +} +``` + +When selected, OpenClaw checks Coven daemon health over the configured Unix +socket before launching. A successful launch creates a Coven session and records +the Coven session id in the ACP runtime handle. If the health check or launch +fails, OpenClaw fails closed by default so `acp.backend="coven"` cannot silently +downgrade to direct ACP execution. Set `allowFallback: true` only when direct +ACP fallback is an explicit, acceptable operator choice. + +For path safety, `~` in `covenHome` and `socketPath` expands to the current +user home directory, and configured Coven paths must be absolute after that +expansion. OpenClaw rejects workspace-relative Coven daemon paths because the +daemon socket is a local user trust anchor, not repository-controlled state. +`socketPath` must resolve to `/coven.sock`; OpenClaw does not allow +arbitrary Coven socket filenames because the daemon socket is the local trust +anchor. Keep `covenHome` owned by the OpenClaw user and private (`0700`); +OpenClaw rejects symlinked, shared-accessible, shared-writable, or non-socket +Coven socket paths before connecting. The Coven backend currently requires Unix +socket validation and fails closed on Windows rather than trusting a socket path +whose owner and permissions cannot be validated by this plugin. + +The default harness mapping sends known ACP agent ids such as `codex`, `claude`, +`gemini`, and `opencode` to explicitly authorized Coven harness ids. Unknown +ACP agent ids are rejected instead of being forwarded as harness names. Override +`plugins.entries.coven.config.harnesses` only when your local Coven install uses +custom harness names, and keep `acp.allowedAgents` aligned with the intended +chat-exposed harness set. + ### Automatic dependency install When you install OpenClaw globally with `npm install -g openclaw`, the acpx diff --git a/extensions/coven/index.ts b/extensions/coven/index.ts new file mode 100644 index 00000000000..ef8319e53c3 --- /dev/null +++ b/extensions/coven/index.ts @@ -0,0 +1,32 @@ +import { + registerAcpRuntimeBackend, + unregisterAcpRuntimeBackend, +} from "openclaw/plugin-sdk/acp-runtime"; +import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry"; +import { createCovenPluginConfigSchema, resolveCovenPluginConfig } from "./src/config.js"; +import { CovenAcpRuntime, COVEN_BACKEND_ID } from "./src/runtime.js"; + +export default definePluginEntry({ + id: COVEN_BACKEND_ID, + name: "Coven ACP Runtime", + description: + "Opt-in ACP runtime backend that launches coding tasks through a local Coven daemon.", + configSchema: () => createCovenPluginConfigSchema(), + register(api) { + api.registerService({ + id: "coven-runtime", + async start(ctx) { + const config = resolveCovenPluginConfig({ + rawConfig: api.pluginConfig, + workspaceDir: ctx.workspaceDir, + }); + const runtime = new CovenAcpRuntime({ config, logger: ctx.logger }); + registerAcpRuntimeBackend({ id: COVEN_BACKEND_ID, runtime }); + ctx.logger.info("coven ACP runtime backend registered"); + }, + async stop() { + unregisterAcpRuntimeBackend(COVEN_BACKEND_ID); + }, + }); + }, +}); diff --git a/extensions/coven/openclaw.plugin.json b/extensions/coven/openclaw.plugin.json new file mode 100644 index 00000000000..fd347e3a32a --- /dev/null +++ b/extensions/coven/openclaw.plugin.json @@ -0,0 +1,37 @@ +{ + "id": "coven", + "enabledByDefault": false, + "name": "Coven ACP Runtime", + "description": "Opt-in ACP runtime backend that launches coding tasks through a local Coven daemon.", + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": { + "covenHome": { + "type": "string", + "description": "Path to the Coven daemon state directory. Defaults to ~/.coven; environment variables are not used for this trust anchor." + }, + "socketPath": { + "type": "string", + "description": "Path to the Coven daemon Unix socket. Defaults to /coven.sock; overrides must resolve to that fixed socket filename." + }, + "allowFallback": { + "type": "boolean", + "description": "When true, fall back to fallbackBackend if Coven is unavailable or launch fails. Defaults to false." + }, + "fallbackBackend": { + "type": "string", + "description": "ACP backend to use only when allowFallback is true. Defaults to acpx." + }, + "pollIntervalMs": { + "type": "number", + "description": "Polling interval for Coven session events." + }, + "harnesses": { + "type": "object", + "additionalProperties": { "type": "string" }, + "description": "Explicitly map additional OpenClaw ACP agent ids to authorized Coven harness ids. Unknown agent ids are rejected." + } + } + } +} diff --git a/extensions/coven/package.json b/extensions/coven/package.json new file mode 100644 index 00000000000..2fdf9518fc9 --- /dev/null +++ b/extensions/coven/package.json @@ -0,0 +1,15 @@ +{ + "name": "@openclaw/coven", + "version": "2026.4.26", + "private": true, + "description": "OpenClaw Coven ACP runtime bridge", + "type": "module", + "devDependencies": { + "@openclaw/plugin-sdk": "workspace:*" + }, + "openclaw": { + "extensions": [ + "./index.ts" + ] + } +} diff --git a/extensions/coven/src/client.test.ts b/extensions/coven/src/client.test.ts new file mode 100644 index 00000000000..4682a92e272 --- /dev/null +++ b/extensions/coven/src/client.test.ts @@ -0,0 +1,224 @@ +import fs from "node:fs/promises"; +import http from "node:http"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { __testing, CovenApiError, createCovenClient } from "./client.js"; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-client-")); +}); + +afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); +}); + +async function withServer( + handler: http.RequestListener, + fn: (socketPath: string) => Promise, +): Promise { + const socketPath = path.join(tmpDir, "coven.sock"); + const server = http.createServer(handler); + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(socketPath, () => resolve()); + }); + try { + await fn(socketPath); + } finally { + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + }); + } +} + +describe("createCovenClient", () => { + it("parses daemon JSON over a Unix socket", async () => { + await withServer( + (_req, res) => { + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ ok: true, daemon: null })); + }, + async (socketPath) => { + await expect(createCovenClient(socketPath).health()).resolves.toEqual({ + ok: true, + daemon: null, + }); + }, + ); + }); + + it("validates a real socket inside the configured socket root", async () => { + await withServer( + (_req, res) => { + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ ok: true, daemon: null })); + }, + async (socketPath) => { + await expect( + createCovenClient(socketPath, { socketRoot: tmpDir }).health(), + ).resolves.toEqual({ + ok: true, + daemon: null, + }); + }, + ); + }); + + it("sends the event cursor when listing events", async () => { + await withServer( + (req, res) => { + expect(req.url).toBe("/events?sessionId=session-1&afterEventId=event-1"); + res.setHeader("Content-Type", "application/json"); + res.end("[]"); + }, + async (socketPath) => { + await expect( + createCovenClient(socketPath).listEvents("session-1", { afterEventId: "event-1" }), + ).resolves.toEqual([]); + }, + ); + }); + + it("rejects oversized event cursors before building the events URL", () => { + expect(() => + createCovenClient("/tmp/coven.sock").listEvents("session-1", { + afterEventId: "e".repeat(257), + }), + ).toThrow(/event id is invalid/); + }); + + it("wraps invalid daemon JSON in a typed API error", async () => { + await withServer( + (_req, res) => { + res.end("{not json"); + }, + async (socketPath) => { + await expect(createCovenClient(socketPath).health()).rejects.toBeInstanceOf(CovenApiError); + }, + ); + }); + + it("rejects daemon responses above the response size limit", async () => { + await withServer( + (_req, res) => { + res.end("x".repeat(1_000_001)); + }, + async (socketPath) => { + await expect(createCovenClient(socketPath).health()).rejects.toThrow(/size limit/); + }, + ); + }); + + it("rejects request bodies above the request size limit", async () => { + await withServer( + (_req, res) => { + res.end("{}"); + }, + async (socketPath) => { + await expect( + createCovenClient(socketPath).launchSession({ + projectRoot: "/repo", + cwd: "/repo", + harness: "codex", + prompt: "x".repeat(1_000_001), + title: "Large prompt", + }), + ).rejects.toThrow(/request exceeded size limit/); + }, + ); + }); + + it("revalidates socket paths before connecting", async () => { + const covenHome = path.join(tmpDir, ".coven"); + await fs.mkdir(covenHome); + await fs.chmod(covenHome, 0o700); + const socketPath = path.join(covenHome, "coven.sock"); + await fs.symlink("/var/run/docker.sock", socketPath); + + await expect(createCovenClient(socketPath, { socketRoot: covenHome }).health()).rejects.toThrow( + /must not be a symlink/, + ); + }); + + it("rejects a socket root that resolves through a symlink", async () => { + const realHome = path.join(tmpDir, "real-coven"); + const symlinkHome = path.join(tmpDir, "symlink-coven"); + await fs.mkdir(realHome); + await fs.chmod(realHome, 0o700); + await fs.symlink(realHome, symlinkHome); + + await expect( + createCovenClient(path.join(symlinkHome, "coven.sock"), { socketRoot: symlinkHome }).health(), + ).rejects.toThrow(/covenHome must not be a symlink/); + }); + + it("rejects missing socket roots with a validation error", async () => { + const covenHome = path.join(tmpDir, "missing-coven"); + + await expect( + createCovenClient(path.join(covenHome, "coven.sock"), { socketRoot: covenHome }).health(), + ).rejects.toThrow(/covenHome must exist/); + }); + + it("rejects a group or world writable socket root", async () => { + if (process.platform === "win32") { + return; + } + const covenHome = path.join(tmpDir, ".coven"); + await fs.mkdir(covenHome); + await fs.chmod(covenHome, 0o777); + + await expect( + createCovenClient(path.join(covenHome, "coven.sock"), { socketRoot: covenHome }).health(), + ).rejects.toThrow(/covenHome must not be group or world writable/); + }); + + it("rejects socket paths that are not Unix sockets", async () => { + const covenHome = path.join(tmpDir, ".coven"); + await fs.mkdir(covenHome); + await fs.chmod(covenHome, 0o700); + const socketPath = path.join(covenHome, "coven.sock"); + await fs.writeFile(socketPath, ""); + + await expect(createCovenClient(socketPath, { socketRoot: covenHome }).health()).rejects.toThrow( + /must be a Unix socket/, + ); + }); + + it("rejects socket path overrides even when they are inside covenHome", async () => { + const covenHome = path.join(tmpDir, ".coven"); + await fs.mkdir(covenHome); + await fs.chmod(covenHome, 0o700); + const socketPath = path.join(covenHome, "other.sock"); + const server = http.createServer((_req, res) => { + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ ok: true, daemon: null })); + }); + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(socketPath, () => resolve()); + }); + try { + await expect( + createCovenClient(socketPath, { socketRoot: covenHome }).health(), + ).rejects.toThrow(/socketPath must be \/coven\.sock/); + } finally { + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + }); + } + }); + + it("fails closed instead of bypassing socket validation on Windows", () => { + expect(() => + __testing.validateSocketPathForUse( + path.join(tmpDir, ".coven", "coven.sock"), + path.join(tmpDir, ".coven"), + "win32", + ), + ).toThrow(/not supported on Windows/); + }); +}); diff --git a/extensions/coven/src/client.ts b/extensions/coven/src/client.ts new file mode 100644 index 00000000000..3887893800d --- /dev/null +++ b/extensions/coven/src/client.ts @@ -0,0 +1,440 @@ +import fs from "node:fs"; +import http from "node:http"; +import net from "node:net"; +import path from "node:path"; +import { lstatIfExists, pathIsInside } from "./path-utils.js"; + +export type CovenSessionRecord = { + id: string; + projectRoot: string; + harness: string; + title: string; + status: string; + exitCode: number | null; + createdAt: string; + updatedAt: string; +}; + +export type CovenEventRecord = { + id: string; + sessionId: string; + kind: string; + payloadJson: string; + createdAt: string; +}; + +export type CovenHealthResponse = { + ok: boolean; + daemon?: { + pid: number; + startedAt: string; + socket: string; + } | null; +}; + +export type LaunchCovenSessionInput = { + projectRoot: string; + cwd: string; + harness: string; + prompt: string; + title: string; +}; + +export interface CovenClient { + health(signal?: AbortSignal): Promise; + launchSession(input: LaunchCovenSessionInput, signal?: AbortSignal): Promise; + getSession(sessionId: string, signal?: AbortSignal): Promise; + listEvents( + sessionId: string, + options?: CovenListEventsOptions, + signal?: AbortSignal, + ): Promise; + sendInput(sessionId: string, data: string, signal?: AbortSignal): Promise; + killSession(sessionId: string, signal?: AbortSignal): Promise; +} + +export type CovenListEventsOptions = { + afterEventId?: string; +}; + +type RequestOptions = { + socketPath: string; + socketRoot?: string; + method: "GET" | "POST"; + path: string; + body?: unknown; + signal?: AbortSignal; +}; + +type HttpResponse = { + status: number; + body: string; +}; + +type SocketFingerprint = { + dev: number; + ino: number; + mode: number; + uid: number; + gid: number; +}; + +export class CovenApiError extends Error { + readonly status: number; + readonly body: string; + + constructor(status: number, body: string) { + super(`Coven API returned HTTP ${status || "unknown"}`); + this.name = "CovenApiError"; + this.status = status; + this.body = body; + } +} + +const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; +const MAX_REQUEST_BYTES = 1_000_000; +const MAX_RESPONSE_BYTES = 1_000_000; +const DEFAULT_SOCKET_FILENAME = "coven.sock"; +const SAFE_QUERY_ID_REGEX = /^[A-Za-z0-9._:-]+$/; +const MAX_QUERY_ID_CHARS = 256; + +function statExistingPath(filePath: string, label: string): fs.Stats { + try { + return fs.statSync(filePath); + } catch { + throw new Error(`${label} must exist`); + } +} + +function realpathExistingPath(filePath: string, label: string): string { + try { + return fs.realpathSync.native(filePath); + } catch { + throw new Error(`${label} must exist`); + } +} + +function fingerprintSocket(stat: fs.Stats): SocketFingerprint { + return { + dev: stat.dev, + ino: stat.ino, + mode: stat.mode, + uid: stat.uid, + gid: stat.gid, + }; +} + +function socketFingerprintMatches(left: SocketFingerprint, right: SocketFingerprint): boolean { + return ( + left.dev === right.dev && + left.ino === right.ino && + left.mode === right.mode && + left.uid === right.uid && + left.gid === right.gid + ); +} + +function validateSocketPathForUse( + socketPath: string, + socketRoot: string | undefined, + platform: NodeJS.Platform = process.platform, +): SocketFingerprint | null { + if (!socketRoot) { + return null; + } + validateSocketPlatform(platform); + const socketRootLstat = lstatIfExists(socketRoot); + if (socketRootLstat?.isSymbolicLink()) { + throw new Error("Coven covenHome must not be a symlink"); + } + const socketRootStat = statExistingPath(socketRoot, "Coven covenHome"); + validateSocketOwnerAndMode(socketRootStat, "Coven covenHome", platform); + validatePrivateDirectory(socketRootStat, "Coven covenHome", platform); + const expectedSocketPath = path.resolve(socketRoot, DEFAULT_SOCKET_FILENAME); + if (path.resolve(socketPath) !== expectedSocketPath) { + throw new Error("Coven socketPath must be /coven.sock"); + } + + const socketStat = lstatIfExists(socketPath); + if (socketStat?.isSymbolicLink()) { + throw new Error("Coven socketPath must not be a symlink"); + } + const resolvedSocketStat = statExistingPath(socketPath, "Coven socketPath"); + if (!resolvedSocketStat.isSocket()) { + throw new Error("Coven socketPath must be a Unix socket"); + } + validateSocketOwnerAndMode(resolvedSocketStat, "Coven socketPath", platform); + + const realSocketRoot = realpathExistingPath(socketRoot, "Coven covenHome"); + const realSocketDir = realpathExistingPath( + path.dirname(socketPath), + "Coven socketPath directory", + ); + const socketDirStat = statExistingPath(path.dirname(socketPath), "Coven socketPath directory"); + validateSocketOwnerAndMode(socketDirStat, "Coven socketPath directory", platform); + validatePrivateDirectory(socketDirStat, "Coven socketPath directory", platform); + if (!pathIsInside(realSocketRoot, realSocketDir)) { + throw new Error("Coven socketPath must stay inside covenHome"); + } + const realSocketPath = realpathExistingPath(socketPath, "Coven socketPath"); + if (!pathIsInside(realSocketRoot, realSocketPath)) { + throw new Error("Coven socketPath must stay inside covenHome"); + } + return fingerprintSocket(resolvedSocketStat); +} + +function validateSocketPlatform(platform: NodeJS.Platform): void { + if (platform === "win32") { + throw new Error("Coven Unix socket validation is not supported on Windows"); + } +} + +function requireSafeQueryId(input: string, label: string): string { + const value = input.trim(); + if (!value || value.length > MAX_QUERY_ID_CHARS || !SAFE_QUERY_ID_REGEX.test(value)) { + throw new Error(`${label} is invalid`); + } + return value; +} + +function validateSocketOwnerAndMode( + stat: fs.Stats, + label: string, + platform: NodeJS.Platform, +): void { + validateSocketPlatform(platform); + const currentUid = typeof process.getuid === "function" ? process.getuid() : null; + if (currentUid != null && stat.uid !== currentUid) { + throw new Error(`${label} must be owned by the current user`); + } + if ((stat.mode & 0o022) !== 0) { + throw new Error(`${label} must not be group or world writable`); + } +} + +function validatePrivateDirectory(stat: fs.Stats, label: string, platform: NodeJS.Platform): void { + validateSocketPlatform(platform); + if (!stat.isDirectory()) { + throw new Error(`${label} must be a directory`); + } + if ((stat.mode & 0o077) !== 0) { + throw new Error(`${label} must not be group or world accessible`); + } +} + +function serializeRequestBody(body: unknown): { text: string; byteLength: number } { + if (body === undefined) { + return { text: "", byteLength: 0 }; + } + const text = JSON.stringify(body) ?? ""; + const byteLength = Buffer.byteLength(text, "utf8"); + if (byteLength > MAX_REQUEST_BYTES) { + throw new Error("Coven API request exceeded size limit"); + } + return { text, byteLength }; +} + +function errorToError(error: unknown): Error { + return error instanceof Error ? error : new Error(String(error)); +} + +function socketThatFailsWith(error: unknown): net.Socket { + const socket = new net.Socket(); + queueMicrotask(() => socket.destroy(errorToError(error))); + return socket; +} + +function requestOverSocket(options: RequestOptions): Promise { + return new Promise((resolve, reject) => { + if (options.signal?.aborted) { + reject(options.signal.reason ?? new Error("request aborted")); + return; + } + let requestBody = ""; + let requestBodyBytes = 0; + let socketFingerprint: SocketFingerprint | null = null; + try { + socketFingerprint = validateSocketPathForUse(options.socketPath, options.socketRoot); + const serialized = serializeRequestBody(options.body); + requestBody = serialized.text; + requestBodyBytes = serialized.byteLength; + } catch (error) { + reject(error); + return; + } + + let settled = false; + let body = ""; + let totalBytes = 0; + + const settle = (fn: () => void, req?: http.ClientRequest) => { + if (settled) { + return; + } + settled = true; + req?.destroy(); + fn(); + }; + + const req = http.request( + { + createConnection: () => { + try { + const beforeConnect = validateSocketPathForUse(options.socketPath, options.socketRoot); + const socket = net.createConnection({ path: options.socketPath }); + socket.once("connect", () => { + try { + const afterConnect = validateSocketPathForUse( + options.socketPath, + options.socketRoot, + ); + const expected = beforeConnect ?? socketFingerprint; + if (expected && afterConnect && !socketFingerprintMatches(expected, afterConnect)) { + socket.destroy(new Error("Coven socketPath changed during connection")); + } + } catch (error) { + socket.destroy(errorToError(error)); + } + }); + return socket; + } catch (error) { + return socketThatFailsWith(error); + } + }, + method: options.method, + path: options.path, + headers: { + Host: "coven", + Connection: "close", + ...(requestBody + ? { + "Content-Type": "application/json", + "Content-Length": requestBodyBytes, + } + : {}), + }, + signal: options.signal, + }, + (res) => { + res.setEncoding("utf8"); + res.on("data", (chunk: string) => { + if (settled) { + return; + } + totalBytes += Buffer.byteLength(chunk); + if (totalBytes > MAX_RESPONSE_BYTES) { + settle(() => reject(new Error("Coven API response exceeded size limit")), req); + return; + } + body += chunk; + }); + res.on("end", () => { + settle(() => + resolve({ + status: res.statusCode ?? 0, + body, + }), + ); + }); + res.on("error", (error) => settle(() => reject(error), req)); + }, + ); + req.setTimeout(DEFAULT_REQUEST_TIMEOUT_MS, () => { + settle(() => reject(new Error("Coven API request timed out")), req); + }); + req.on("error", (error) => { + if (settled) { + return; + } + settle(() => reject(error)); + }); + req.end(requestBody); + }); +} + +async function requestJson(options: RequestOptions): Promise { + const response = await requestOverSocket(options); + if (response.status < 200 || response.status >= 300) { + throw new CovenApiError(response.status, response.body); + } + try { + return JSON.parse(response.body || "null") as T; + } catch (error) { + throw new CovenApiError(response.status, `Invalid JSON response: ${String(error)}`); + } +} + +export function createCovenClient( + socketPath: string, + clientOptions: { socketRoot?: string } = {}, +): CovenClient { + return { + health(signal) { + return requestJson({ + socketPath, + socketRoot: clientOptions.socketRoot, + method: "GET", + path: "/health", + signal, + }); + }, + launchSession(input, signal) { + return requestJson({ + socketPath, + socketRoot: clientOptions.socketRoot, + method: "POST", + path: "/sessions", + body: input, + signal, + }); + }, + getSession(sessionId, signal) { + return requestJson({ + socketPath, + socketRoot: clientOptions.socketRoot, + method: "GET", + path: `/sessions/${encodeURIComponent(sessionId)}`, + signal, + }); + }, + listEvents(sessionId, options, signal) { + const params = new URLSearchParams({ + sessionId: requireSafeQueryId(sessionId, "Coven session id"), + }); + const afterEventId = options?.afterEventId?.trim(); + if (afterEventId) { + params.set("afterEventId", requireSafeQueryId(afterEventId, "Coven event id")); + } + return requestJson({ + socketPath, + socketRoot: clientOptions.socketRoot, + method: "GET", + path: `/events?${params.toString()}`, + signal, + }); + }, + async sendInput(sessionId, data, signal) { + await requestJson({ + socketPath, + socketRoot: clientOptions.socketRoot, + method: "POST", + path: `/sessions/${encodeURIComponent(sessionId)}/input`, + body: { data }, + signal, + }); + }, + async killSession(sessionId, signal) { + await requestJson({ + socketPath, + socketRoot: clientOptions.socketRoot, + method: "POST", + path: `/sessions/${encodeURIComponent(sessionId)}/kill`, + signal, + }); + }, + }; +} + +export const __testing = { + validateSocketPathForUse, +}; diff --git a/extensions/coven/src/config.test.ts b/extensions/coven/src/config.test.ts new file mode 100644 index 00000000000..44a5cb661a6 --- /dev/null +++ b/extensions/coven/src/config.test.ts @@ -0,0 +1,129 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { resolveCovenPluginConfig } from "./config.js"; + +const OLD_COVEN_HOME = process.env.COVEN_HOME; + +afterEach(() => { + if (OLD_COVEN_HOME === undefined) { + delete process.env.COVEN_HOME; + } else { + process.env.COVEN_HOME = OLD_COVEN_HOME; + } +}); + +describe("resolveCovenPluginConfig", () => { + it("expands tilde paths before resolving Coven home and socket path", () => { + const resolved = resolveCovenPluginConfig({ + rawConfig: { + covenHome: "~/.coven", + socketPath: "~/.coven/coven.sock", + }, + workspaceDir: "/repo", + }); + + expect(resolved.covenHome).toBe(path.join(os.homedir(), ".coven")); + expect(resolved.socketPath).toBe(path.join(os.homedir(), ".coven", "coven.sock")); + }); + + it("rejects relative Coven paths instead of trusting workspace contents", () => { + expect(() => + resolveCovenPluginConfig({ + rawConfig: { + covenHome: ".coven", + socketPath: ".coven/coven.sock", + }, + workspaceDir: "/repo", + }), + ).toThrow(/covenHome must be absolute/); + }); + + it("rejects socket paths outside covenHome", () => { + expect(() => + resolveCovenPluginConfig({ + rawConfig: { + covenHome: "~/.coven", + socketPath: "/var/run/docker.sock", + }, + workspaceDir: "/repo", + }), + ).toThrow(/socketPath must stay inside covenHome/); + }); + + it("rejects alternate socket filenames inside covenHome", () => { + expect(() => + resolveCovenPluginConfig({ + rawConfig: { + covenHome: "~/.coven", + socketPath: "~/.coven/other.sock", + }, + workspaceDir: "/repo", + }), + ).toThrow(/socketPath overrides are not supported/); + }); + + it("rejects socket paths that are symlinks", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-config-")); + const covenHome = path.join(workspaceDir, ".coven"); + await fs.mkdir(covenHome); + const socketPath = path.join(covenHome, "coven.sock"); + await fs.symlink("/var/run/docker.sock", socketPath); + try { + expect(() => + resolveCovenPluginConfig({ + rawConfig: { + covenHome, + socketPath, + }, + workspaceDir, + }), + ).toThrow(/must not be a symlink/); + } finally { + await fs.rm(workspaceDir, { recursive: true, force: true }); + } + }); + + it("rejects covenHome when it is a symlink", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-config-")); + const realHome = path.join(workspaceDir, "real-coven"); + const symlinkHome = path.join(workspaceDir, "symlink-coven"); + await fs.mkdir(realHome); + await fs.symlink(realHome, symlinkHome); + try { + expect(() => + resolveCovenPluginConfig({ + rawConfig: { + covenHome: symlinkHome, + }, + workspaceDir, + }), + ).toThrow(/covenHome must not be a symlink/); + } finally { + await fs.rm(workspaceDir, { recursive: true, force: true }); + } + }); + + it("ignores COVEN_HOME when resolving the socket trust anchor", () => { + process.env.COVEN_HOME = "~/.custom-coven"; + + const resolved = resolveCovenPluginConfig({ + rawConfig: {}, + workspaceDir: "/repo", + }); + + expect(resolved.covenHome).toBe(path.join(os.homedir(), ".coven")); + expect(resolved.socketPath).toBe(path.join(os.homedir(), ".coven", "coven.sock")); + expect(resolved.allowFallback).toBe(false); + }); + + it("only enables fallback when configured explicitly", () => { + const resolved = resolveCovenPluginConfig({ + rawConfig: { allowFallback: true }, + workspaceDir: "/repo", + }); + + expect(resolved.allowFallback).toBe(true); + }); +}); diff --git a/extensions/coven/src/config.ts b/extensions/coven/src/config.ts new file mode 100644 index 00000000000..e6206b7f326 --- /dev/null +++ b/extensions/coven/src/config.ts @@ -0,0 +1,140 @@ +import os from "node:os"; +import path from "node:path"; +import { buildPluginConfigSchema } from "openclaw/plugin-sdk/core"; +import { z } from "openclaw/plugin-sdk/zod"; +import { lstatIfExists, pathIsInside, realpathIfExists } from "./path-utils.js"; + +export type CovenPluginConfig = { + covenHome?: string; + socketPath?: string; + allowFallback?: boolean; + fallbackBackend?: string; + pollIntervalMs?: number; + harnesses?: Record; +}; + +export type ResolvedCovenPluginConfig = { + covenHome: string; + socketPath: string; + workspaceDir: string; + allowFallback: boolean; + fallbackBackend: string; + pollIntervalMs: number; + harnesses: Record; +}; + +const DEFAULT_FALLBACK_BACKEND = "acpx"; +const DEFAULT_POLL_INTERVAL_MS = 250; +const DEFAULT_SOCKET_FILENAME = "coven.sock"; + +const nonEmptyString = z.string().trim().min(1); + +export const CovenPluginConfigSchema = z.strictObject({ + covenHome: nonEmptyString.optional(), + socketPath: nonEmptyString.optional(), + allowFallback: z.boolean().optional(), + fallbackBackend: nonEmptyString.optional(), + pollIntervalMs: z.number().min(25).max(10_000).optional(), + harnesses: z.record(z.string(), nonEmptyString).optional(), +}); + +export function createCovenPluginConfigSchema() { + return buildPluginConfigSchema(CovenPluginConfigSchema); +} + +function normalizeBackendId(value: string | undefined): string { + const normalized = value?.trim().toLowerCase(); + return normalized || DEFAULT_FALLBACK_BACKEND; +} + +function expandTilde(raw: string): string { + const trimmed = raw.trim(); + if (trimmed === "~") { + return os.homedir(); + } + if (trimmed.startsWith("~/")) { + return path.join(os.homedir(), trimmed.slice(2)); + } + return trimmed; +} + +function resolveConfiguredPath(raw: string, label: "covenHome" | "socketPath"): string { + const expanded = expandTilde(raw); + if (!path.isAbsolute(expanded)) { + throw new Error(`Coven ${label} must be absolute`); + } + return path.resolve(expanded); +} + +function resolveCovenHome(raw: string | undefined): string { + const fromConfig = raw?.trim(); + if (fromConfig) { + return resolveConfiguredPath(fromConfig, "covenHome"); + } + return path.join(os.homedir(), ".coven"); +} + +function resolveSocketPath(covenHome: string, raw: string | undefined): string { + if (lstatIfExists(covenHome)?.isSymbolicLink()) { + throw new Error("Coven covenHome must not be a symlink"); + } + const defaultSocketPath = path.join(covenHome, DEFAULT_SOCKET_FILENAME); + const socketPath = raw?.trim() ? resolveConfiguredPath(raw, "socketPath") : defaultSocketPath; + if (!pathIsInside(covenHome, socketPath)) { + throw new Error("Coven socketPath must stay inside covenHome"); + } + if (socketPath !== defaultSocketPath) { + throw new Error("Coven socketPath overrides are not supported"); + } + const socketStat = lstatIfExists(socketPath); + if (socketStat?.isSymbolicLink()) { + throw new Error("Coven socketPath must not be a symlink"); + } + const realCovenHome = realpathIfExists(covenHome); + const realSocketDir = realpathIfExists(path.dirname(socketPath)); + if (realCovenHome && realSocketDir && !pathIsInside(realCovenHome, realSocketDir)) { + throw new Error("Coven socketPath must stay inside covenHome"); + } + const realSocketPath = realpathIfExists(socketPath); + if (realCovenHome && realSocketPath && !pathIsInside(realCovenHome, realSocketPath)) { + throw new Error("Coven socketPath must stay inside covenHome"); + } + return socketPath; +} + +function normalizeHarnesses(value: Record | undefined): Record { + return Object.fromEntries( + Object.entries(value ?? {}).flatMap(([agent, harness]) => { + const normalizedAgent = agent.trim().toLowerCase(); + const normalizedHarness = harness.trim(); + return normalizedAgent && normalizedHarness ? [[normalizedAgent, normalizedHarness]] : []; + }), + ); +} + +export function resolveCovenPluginConfig(params: { + rawConfig: unknown; + workspaceDir?: string; +}): ResolvedCovenPluginConfig { + const parsed = CovenPluginConfigSchema.safeParse(params.rawConfig ?? {}); + if (!parsed.success) { + throw new Error(parsed.error.issues[0]?.message ?? "invalid Coven plugin config"); + } + const config = parsed.data as CovenPluginConfig; + const workspaceDir = path.resolve(params.workspaceDir ?? process.cwd()); + const covenHome = resolveCovenHome(config.covenHome); + return { + covenHome, + socketPath: resolveSocketPath(covenHome, config.socketPath), + workspaceDir, + allowFallback: config.allowFallback === true, + fallbackBackend: normalizeBackendId(config.fallbackBackend), + pollIntervalMs: config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS, + harnesses: normalizeHarnesses(config.harnesses), + }; +} + +export const __testing = { + expandTilde, + resolveConfiguredPath, +}; diff --git a/extensions/coven/src/path-utils.ts b/extensions/coven/src/path-utils.ts new file mode 100644 index 00000000000..41d377e746e --- /dev/null +++ b/extensions/coven/src/path-utils.ts @@ -0,0 +1,23 @@ +import fs from "node:fs"; +import path from "node:path"; + +export function pathIsInside(parent: string, child: string): boolean { + const relative = path.relative(parent, child); + return relative === "" || (!relative.startsWith("..") && !path.isAbsolute(relative)); +} + +export function realpathIfExists(filePath: string): string | null { + try { + return fs.realpathSync.native(filePath); + } catch { + return null; + } +} + +export function lstatIfExists(filePath: string): fs.Stats | null { + try { + return fs.lstatSync(filePath); + } catch { + return null; + } +} diff --git a/extensions/coven/src/runtime.test.ts b/extensions/coven/src/runtime.test.ts new file mode 100644 index 00000000000..107684fe09c --- /dev/null +++ b/extensions/coven/src/runtime.test.ts @@ -0,0 +1,931 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { + registerAcpRuntimeBackend, + unregisterAcpRuntimeBackend, + type AcpRuntime, + type AcpRuntimeEvent, + type AcpRuntimeHandle, +} from "openclaw/plugin-sdk/acp-runtime"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { CovenClient, CovenEventRecord, CovenSessionRecord } from "./client.js"; +import type { ResolvedCovenPluginConfig } from "./config.js"; +import { __testing, CovenAcpRuntime } from "./runtime.js"; + +const baseConfig: ResolvedCovenPluginConfig = { + covenHome: "", + socketPath: "", + workspaceDir: "", + allowFallback: false, + fallbackBackend: "acpx", + pollIntervalMs: 25, + harnesses: {}, +}; + +let workspaceDir: string; +let config: ResolvedCovenPluginConfig; + +beforeEach(async () => { + workspaceDir = await fs.realpath( + await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-workspace-")), + ); + const covenHome = path.join(workspaceDir, ".coven"); + await fs.mkdir(covenHome); + config = { + ...baseConfig, + covenHome, + socketPath: path.join(covenHome, "coven.sock"), + workspaceDir, + }; +}); + +function session(overrides: Partial = {}): CovenSessionRecord { + return { + id: "session-1", + projectRoot: workspaceDir, + harness: "codex", + title: "Fix tests", + status: "running", + exitCode: null, + createdAt: "2026-04-27T10:00:00Z", + updatedAt: "2026-04-27T10:00:00Z", + ...overrides, + }; +} + +function event(overrides: Partial): CovenEventRecord { + return { + id: "event-1", + sessionId: "session-1", + kind: "output", + payloadJson: JSON.stringify({ data: "hello\n" }), + createdAt: "2026-04-27T10:00:00Z", + ...overrides, + }; +} + +function fakeClient(overrides: Partial = {}): CovenClient { + return { + health: vi.fn(async () => ({ ok: true, daemon: null })), + launchSession: vi.fn(async () => session()), + getSession: vi.fn(async () => session({ status: "completed", exitCode: 0 })), + listEvents: vi.fn(async () => [ + event({ id: "event-1", kind: "output", payloadJson: JSON.stringify({ data: "hello\n" }) }), + event({ + id: "event-2", + kind: "exit", + payloadJson: JSON.stringify({ status: "completed", exitCode: 0 }), + }), + ]), + sendInput: vi.fn(async () => undefined), + killSession: vi.fn(async () => undefined), + ...overrides, + }; +} + +async function collect(iterable: AsyncIterable): Promise { + const events: AcpRuntimeEvent[] = []; + for await (const item of iterable) { + events.push(item); + } + return events; +} + +function fallbackRuntime(): AcpRuntime { + const handle: AcpRuntimeHandle = { + sessionKey: "agent:codex:test", + backend: "acpx", + runtimeSessionName: "fallback-session", + cwd: workspaceDir, + }; + return { + ensureSession: vi.fn(async () => handle), + async *runTurn() { + yield { type: "text_delta", text: "direct fallback\n", stream: "output" }; + yield { type: "done", stopReason: "complete" }; + }, + getStatus: vi.fn(async () => ({ summary: "fallback active" })), + cancel: vi.fn(async () => undefined), + close: vi.fn(async () => undefined), + }; +} + +afterEach(() => { + vi.useRealTimers(); + unregisterAcpRuntimeBackend("acpx"); + return fs.rm(workspaceDir, { recursive: true, force: true }); +}); + +describe("CovenAcpRuntime", () => { + it("fails closed by default when Coven is unavailable", async () => { + const runtime = new CovenAcpRuntime({ + config, + client: fakeClient({ + health: vi.fn(async () => { + throw new Error("offline"); + }), + }), + }); + + await expect( + runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }), + ).rejects.toThrow(/fallback is disabled/); + }); + + it("falls back to the direct ACP backend when Coven is unavailable and fallback is enabled", async () => { + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const runtime = new CovenAcpRuntime({ + config: { ...config, allowFallback: true }, + client: fakeClient({ + health: vi.fn(async () => { + throw new Error("offline"); + }), + }), + }); + + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + expect(handle.backend).toBe("acpx"); + expect(fallback.ensureSession).toHaveBeenCalledOnce(); + }); + + it("falls back when Coven health checks do not settle before the deadline", async () => { + vi.useFakeTimers(); + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const client = fakeClient({ + health: vi.fn( + async (signal?: AbortSignal) => + await new Promise((_resolve, reject) => { + signal?.addEventListener("abort", () => reject(signal.reason ?? new Error("aborted")), { + once: true, + }); + }), + ), + }); + const runtime = new CovenAcpRuntime({ config: { ...config, allowFallback: true }, client }); + + const pending = runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + await vi.advanceTimersByTimeAsync(5_000); + const handle = await pending; + + expect(handle.backend).toBe("acpx"); + }); + + it("launches a Coven session and streams output events to ACP", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(client.launchSession).toHaveBeenCalledWith( + expect.objectContaining({ + projectRoot: workspaceDir, + cwd: workspaceDir, + harness: "codex", + prompt: "Fix tests", + }), + undefined, + ); + expect(handle.backendSessionId).toBe("session-1"); + expect(events).toEqual([ + expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }), + expect.objectContaining({ type: "text_delta", text: "hello\n" }), + expect.objectContaining({ type: "status", text: "coven session completed exitCode=0" }), + expect.objectContaining({ type: "done", stopReason: "completed" }), + ]); + }); + + it("rejects unknown ACP agent ids instead of forwarding them as Coven harness names", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + + await expect( + runtime.ensureSession({ + sessionKey: "agent:attacker:test", + agent: "attacker-harness", + mode: "oneshot", + cwd: workspaceDir, + }), + ).rejects.toThrow(/Unknown or unauthorized ACP agent/); + expect(client.health).not.toHaveBeenCalled(); + }); + + it("allows explicit configured agent-to-harness mappings", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ + config: { ...config, harnesses: { assistant: "codex" } }, + client, + }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:assistant:test", + agent: "assistant", + mode: "oneshot", + cwd: workspaceDir, + }); + + await collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(client.launchSession).toHaveBeenCalledWith( + expect.objectContaining({ harness: "codex" }), + undefined, + ); + }); + + it("sanitizes daemon-controlled harness fields in start status", async () => { + const client = fakeClient({ + launchSession: vi.fn(async () => + session({ + harness: "\u001b[31mcodex\u001b[0m", + }), + ), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(events).toContainEqual( + expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }), + ); + }); + + it("rejects unsafe daemon-controlled session ids before exposing handle fields", async () => { + const client = fakeClient({ + launchSession: vi.fn(async () => + session({ + id: "\u001b]0;spoof\u0007session-1\r", + }), + ), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + await expect( + collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ), + ).rejects.toThrow(/session id is invalid/); + expect(handle.backendSessionId).toBeUndefined(); + expect(handle.agentSessionId).toBeUndefined(); + expect(client.killSession).toHaveBeenCalledWith("\u001b]0;spoof\u0007session-1\r", undefined); + }); + + it("kills an already-launched Coven session before falling back on unsafe session ids", async () => { + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const client = fakeClient({ + launchSession: vi.fn(async () => session({ id: "bad\nsession" })), + killSession: vi.fn(async () => undefined), + }); + const runtime = new CovenAcpRuntime({ config: { ...config, allowFallback: true }, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(client.killSession).toHaveBeenCalledWith("bad\nsession", undefined); + expect(handle.backend).toBe("acpx"); + expect(events).toEqual([ + expect.objectContaining({ type: "text_delta", text: "direct fallback\n" }), + expect.objectContaining({ type: "done", stopReason: "complete" }), + ]); + }); + + it("fails closed without launching Coven when prompts exceed the Coven request limit", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + await expect( + collect( + runtime.runTurn({ + handle, + text: "x".repeat(500_001), + mode: "prompt", + requestId: "req-1", + }), + ), + ).rejects.toThrow(/fallback is disabled/); + + expect(client.launchSession).not.toHaveBeenCalled(); + }); + + it("falls back on oversized prompts when fallback is explicitly enabled", async () => { + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config: { ...config, allowFallback: true }, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ + handle, + text: "x".repeat(500_001), + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(client.launchSession).not.toHaveBeenCalled(); + expect(events).toEqual([ + expect.objectContaining({ type: "text_delta", text: "direct fallback\n" }), + expect.objectContaining({ type: "done", stopReason: "complete" }), + ]); + }); + + it("ignores cwd embedded in runtimeSessionName when launching Coven sessions", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + handle.runtimeSessionName = `coven:${Buffer.from( + JSON.stringify({ + agent: "codex", + mode: "prompt", + cwd: "/tmp/attacker", + }), + "utf8", + ).toString("base64url")}`; + + await collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(client.launchSession).toHaveBeenCalledWith( + expect.objectContaining({ + projectRoot: workspaceDir, + cwd: workspaceDir, + }), + undefined, + ); + }); + + it("rejects Coven handles whose cwd is outside the configured workspace", async () => { + const runtime = new CovenAcpRuntime({ config, client: fakeClient() }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + handle.cwd = "/tmp/attacker"; + + await expect( + collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ), + ).rejects.toThrow(/outside workspace/); + }); + + it("rejects Coven cwd symlinks that resolve outside the workspace", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-workspace-")); + const outsideDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-outside-")); + const symlinkPath = path.join(workspaceDir, "outside"); + await fs.symlink(outsideDir, symlinkPath); + try { + const runtime = new CovenAcpRuntime({ + config: { ...config, workspaceDir }, + client: fakeClient(), + }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: symlinkPath, + }); + + await expect( + collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ), + ).rejects.toThrow(/outside workspace/); + } finally { + await fs.rm(workspaceDir, { recursive: true, force: true }); + await fs.rm(outsideDir, { recursive: true, force: true }); + } + }); + + it("requests incremental events after the last processed Coven event", async () => { + const client = fakeClient({ + listEvents: vi + .fn() + .mockResolvedValueOnce([ + event({ + id: "event-1", + kind: "output", + payloadJson: JSON.stringify({ data: "hello\n" }), + }), + ]) + .mockResolvedValueOnce([ + event({ + id: "event-2", + kind: "exit", + payloadJson: JSON.stringify({ status: "completed", exitCode: 0 }), + }), + ]), + getSession: vi.fn(async () => session({ status: "running" })), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(client.listEvents).toHaveBeenNthCalledWith( + 2, + "session-1", + { + afterEventId: "event-1", + }, + undefined, + ); + }); + + it("fails and kills the Coven session when the daemon returns an unsafe event id", async () => { + const client = fakeClient({ + listEvents: vi.fn(async () => [ + event({ + id: "e".repeat(257), + kind: "output", + payloadJson: JSON.stringify({ data: "hello\n" }), + }), + ]), + killSession: vi.fn(async () => undefined), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(client.killSession).toHaveBeenCalledWith("session-1", undefined); + expect(events).toEqual([ + expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }), + expect.objectContaining({ type: "status", text: "coven session polling failed" }), + expect.objectContaining({ type: "done", stopReason: "error" }), + ]); + }); + + it("clamps malformed runtime poll intervals before sleeping", async () => { + const sleep = vi.fn(async () => undefined); + const client = fakeClient({ + listEvents: vi + .fn() + .mockResolvedValueOnce([]) + .mockResolvedValueOnce([ + event({ + id: "event-1", + kind: "exit", + payloadJson: JSON.stringify({ status: "completed", exitCode: 0 }), + }), + ]), + getSession: vi.fn(async () => session({ status: "running" })), + }); + const runtime = new CovenAcpRuntime({ + config: { ...config, pollIntervalMs: 0 }, + client, + sleep, + }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(sleep).toHaveBeenCalledWith(25, undefined); + }); + + it("fails the turn when the daemon returns too many events in one poll", async () => { + const client = fakeClient({ + listEvents: vi.fn(async () => + Array.from({ length: 600 }, (_, index) => + event({ + id: `event-${index}`, + kind: "output", + payloadJson: JSON.stringify({ data: `line-${index}\n` }), + }), + ), + ), + killSession: vi.fn(async () => undefined), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(client.killSession).toHaveBeenCalledWith("session-1", undefined); + expect(events).toEqual([ + expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }), + expect.objectContaining({ type: "status", text: "coven session polling failed" }), + expect.objectContaining({ type: "done", stopReason: "error" }), + ]); + }); + + it("converts Coven polling failures into controlled terminal events", async () => { + const client = fakeClient({ + listEvents: vi.fn(async () => { + throw new Error("bad json"); + }), + killSession: vi.fn(async () => undefined), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(client.killSession).toHaveBeenCalledWith("session-1", undefined); + expect(events).toEqual([ + expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }), + expect.objectContaining({ type: "status", text: "coven session polling failed" }), + expect.objectContaining({ type: "done", stopReason: "error" }), + ]); + }); + + it("sanitizes Coven polling errors before logging", async () => { + const logger = { warn: vi.fn(), info: vi.fn(), error: vi.fn(), debug: vi.fn() }; + const client = fakeClient({ + listEvents: vi.fn(async () => { + throw new Error("\u001b]0;spoof\u0007bad\r\njson"); + }), + killSession: vi.fn(async () => undefined), + }); + const runtime = new CovenAcpRuntime({ config, client, logger }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(logger.warn).toHaveBeenCalledWith("coven polling failed: Error: bad json"); + }); + + it("strips terminal escape and control characters from Coven output", () => { + expect( + __testing.sanitizeTerminalText( + "\u001b]0;spoof\u0007hi\u001b[31m!\u001b[0m\u001b7\u001bc\u202e\r\n", + ), + ).toBe("hi!\n"); + }); + + it("sanitizes prompt-derived session titles", () => { + expect(__testing.titleFromPrompt("\u001b]0;spoof\u0007Fix\u001b[31m tests\r\nnow")).toBe( + "Fix tests now", + ); + }); + + it("normalizes untrusted Coven exit status into bounded stop reasons", () => { + expect(__testing.normalizeStopReason("completed")).toBe("completed"); + expect(__testing.normalizeStopReason("killed")).toBe("cancelled"); + expect(__testing.normalizeStopReason("refusal")).toBe("completed"); + + expect( + __testing.eventToRuntimeEvents( + event({ + kind: "exit", + payloadJson: JSON.stringify({ status: "refusal", exitCode: 0 }), + }), + ), + ).toContainEqual(expect.objectContaining({ type: "done", stopReason: "completed" })); + }); + + it("guards daemon exitCode types before rendering terminal status text", () => { + expect( + __testing.terminalStatusEvent( + session({ status: "completed", exitCode: "\u001b[31m1" as unknown as number }), + ), + ).toEqual({ + type: "status", + text: "coven session completed", + tag: "session_info_update", + }); + }); + + it("drops oversized daemon event payloads before parsing", () => { + expect( + __testing.eventToRuntimeEvents( + event({ + kind: "output", + payloadJson: JSON.stringify({ data: "x".repeat(64_001) }), + }), + ), + ).toEqual([]); + }); + + it("rejects oversized Coven runtime session metadata", () => { + expect(__testing.decodeRuntimeSessionName(`coven:${"a".repeat(2_049)}`)).toBeNull(); + }); + + it("bounds encoded Coven runtime session metadata before persistence", () => { + const encoded = __testing.encodeRuntimeSessionName({ + agent: "A".repeat(5_000), + mode: "prompt".repeat(1_000), + sessionMode: "persistent".repeat(1_000), + }); + + expect(Buffer.byteLength(encoded, "utf8")).toBeLessThanOrEqual("coven:".length + 2_048); + expect(__testing.decodeRuntimeSessionName(encoded)).toEqual({ + agent: "a".repeat(128), + mode: "promptpromptpromptpromptpromptpr", + sessionMode: "persistentpersistentpersistentpe", + }); + }); + + it("rejects missing Coven cwd paths before launching", async () => { + const workspaceDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-workspace-")); + try { + const runtime = new CovenAcpRuntime({ + config: { ...config, workspaceDir }, + client: fakeClient(), + }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: path.join(workspaceDir, "missing"), + }); + + await expect( + collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ), + ).rejects.toThrow(/outside workspace/); + } finally { + await fs.rm(workspaceDir, { recursive: true, force: true }); + } + }); + + it("rejects Coven cwd paths that are not directories", async () => { + const filePath = path.join(workspaceDir, "not-a-directory"); + await fs.writeFile(filePath, "not a directory"); + const runtime = new CovenAcpRuntime({ config, client: fakeClient() }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: filePath, + }); + + await expect( + collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ), + ).rejects.toThrow(/cwd must be a directory/); + }); + + it("does not trust persisted backendSessionId without an active tracked Coven session", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + const handle: AcpRuntimeHandle = { + sessionKey: "agent:codex:test", + backend: "coven", + runtimeSessionName: __testing.encodeRuntimeSessionName({ + agent: "codex", + mode: "prompt", + }), + cwd: workspaceDir, + backendSessionId: "attacker-session", + }; + + await expect(runtime.getStatus({ handle })).resolves.toEqual({ + summary: "coven runtime ready", + }); + await expect(runtime.cancel({ handle })).resolves.toBeUndefined(); + await expect(runtime.close({ handle, reason: "user" })).resolves.toBeUndefined(); + expect(client.getSession).not.toHaveBeenCalledWith("attacker-session", undefined); + expect(client.killSession).not.toHaveBeenCalledWith("attacker-session", undefined); + }); + + it("rejects backendSessionId values that conflict with the active tracked Coven session", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + const turn = runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }); + const iterator = turn[Symbol.asyncIterator](); + await iterator.next(); + handle.backendSessionId = "attacker-session"; + + await expect(runtime.getStatus({ handle })).rejects.toThrow(/does not match/); + await expect(runtime.cancel({ handle })).rejects.toThrow(/does not match/); + await expect(runtime.close({ handle, reason: "user" })).rejects.toThrow(/does not match/); + expect(client.getSession).not.toHaveBeenCalledWith("attacker-session", undefined); + expect(client.killSession).not.toHaveBeenCalledWith("attacker-session", undefined); + await iterator.return?.(); + }); + + it("preserves direct fallback when Coven launch fails after detection", async () => { + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const runtime = new CovenAcpRuntime({ + config: { ...config, allowFallback: true }, + client: fakeClient({ + launchSession: vi.fn(async () => { + throw new Error("launch failed"); + }), + }), + }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + const events = await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(handle.backend).toBe("acpx"); + expect(events).toEqual([ + expect.objectContaining({ type: "text_delta", text: "direct fallback\n" }), + expect.objectContaining({ type: "done", stopReason: "complete" }), + ]); + }); + + it("fails closed when Coven launch fails after detection and fallback is disabled", async () => { + const runtime = new CovenAcpRuntime({ + config, + client: fakeClient({ + launchSession: vi.fn(async () => { + throw new Error("\u001b]0;spoof\u0007launch\r\nfailed"); + }), + }), + }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: workspaceDir, + }); + + await expect( + collect(runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" })), + ).rejects.toThrow(/Error: launch failed/); + }); + + it("sanitizes Coven doctor error details", async () => { + const runtime = new CovenAcpRuntime({ + config, + client: fakeClient({ + health: vi.fn(async () => { + throw new Error("\u001b[31moffline\r\nnow"); + }), + }), + }); + + await expect(runtime.doctor()).resolves.toMatchObject({ + ok: false, + details: ["Error: offline now"], + }); + }); +}); diff --git a/extensions/coven/src/runtime.ts b/extensions/coven/src/runtime.ts new file mode 100644 index 00000000000..03c131be47c --- /dev/null +++ b/extensions/coven/src/runtime.ts @@ -0,0 +1,676 @@ +import fs from "node:fs"; +import path from "node:path"; +import { + AcpRuntimeError, + getAcpRuntimeBackend, + type AcpRuntime, + type AcpRuntimeDoctorReport, + type AcpRuntimeEvent, + type AcpRuntimeHandle, + type AcpRuntimeStatus, + type AcpRuntimeTurnInput, +} from "openclaw/plugin-sdk/acp-runtime"; +import type { PluginLogger } from "openclaw/plugin-sdk/plugin-entry"; +import { + createCovenClient, + type CovenClient, + type CovenEventRecord, + type CovenSessionRecord, +} from "./client.js"; +import type { ResolvedCovenPluginConfig } from "./config.js"; +import { pathIsInside, realpathIfExists } from "./path-utils.js"; + +export const COVEN_BACKEND_ID = "coven"; + +const DEFAULT_HARNESSES: Record = { + codex: "codex", + "openai-codex": "codex", + "codex-cli": "codex", + claude: "claude", + "claude-cli": "claude", + gemini: "gemini", + "google-gemini-cli": "gemini", + opencode: "opencode", +}; +const HEALTH_CHECK_TIMEOUT_MS = 5_000; +const MAX_COVEN_PROMPT_BYTES = 500_000; +const MIN_POLL_INTERVAL_MS = 25; +const MAX_POLL_INTERVAL_MS = 10_000; +const DEFAULT_POLL_INTERVAL_MS = 250; +const MAX_EVENTS_PER_POLL = 500; +const MAX_EVENT_PAYLOAD_BYTES = 64_000; +const MAX_TRACKED_EVENT_IDS = 10_000; +const MAX_RUNTIME_SESSION_NAME_BYTES = 2_048; +const MAX_RUNTIME_AGENT_CHARS = 128; +const MAX_RUNTIME_MODE_CHARS = 32; +const MAX_STATUS_FIELD_CHARS = 256; +const MAX_SESSION_ID_CHARS = 128; +const MAX_EVENT_ID_CHARS = 256; +const SAFE_SESSION_ID_REGEX = /^[A-Za-z0-9._:-]+$/; + +type CovenRuntimeSessionState = { + agent: string; + mode: string; + sessionMode?: string; +}; + +type CovenAcpRuntimeParams = { + config: ResolvedCovenPluginConfig; + logger?: PluginLogger; + client?: CovenClient; + sleep?: (ms: number, signal?: AbortSignal) => Promise; +}; + +function normalizeAgentId(value: string | undefined): string { + return value?.trim().toLowerCase() || "codex"; +} + +function encodeRuntimeSessionName(state: CovenRuntimeSessionState): string { + const prefix = "coven:"; + const safeState: CovenRuntimeSessionState = { + agent: normalizeAgentId(state.agent).slice(0, MAX_RUNTIME_AGENT_CHARS) || "codex", + mode: (state.mode.trim() || "prompt").slice(0, MAX_RUNTIME_MODE_CHARS), + ...(state.sessionMode + ? { sessionMode: state.sessionMode.trim().slice(0, MAX_RUNTIME_MODE_CHARS) } + : {}), + }; + const encoded = Buffer.from(JSON.stringify(safeState), "utf8").toString("base64url"); + const value = `${prefix}${encoded}`; + if (Buffer.byteLength(value, "utf8") > prefix.length + MAX_RUNTIME_SESSION_NAME_BYTES) { + throw new AcpRuntimeError( + "ACP_SESSION_INIT_FAILED", + "Coven runtime session metadata is too large.", + ); + } + return value; +} + +function decodeRuntimeSessionName(value: string): CovenRuntimeSessionState | null { + const prefix = "coven:"; + if (!value.startsWith(prefix) || value.length > prefix.length + MAX_RUNTIME_SESSION_NAME_BYTES) { + return null; + } + const encoded = value.slice(prefix.length); + if (!encoded) { + return null; + } + try { + const decoded = Buffer.from(encoded, "base64url"); + if (decoded.byteLength > MAX_RUNTIME_SESSION_NAME_BYTES) { + return null; + } + const jsonText = decoded.toString("utf8"); + if (Buffer.byteLength(jsonText, "utf8") > MAX_RUNTIME_SESSION_NAME_BYTES) { + return null; + } + const parsed = JSON.parse(jsonText) as Partial; + const agent = normalizeAgentId(typeof parsed.agent === "string" ? parsed.agent : undefined); + return { + agent, + mode: typeof parsed.mode === "string" ? parsed.mode : "prompt", + ...(typeof parsed.sessionMode === "string" ? { sessionMode: parsed.sessionMode } : {}), + }; + } catch { + return null; + } +} + +function defaultSleep(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(signal.reason ?? new Error("sleep aborted")); + return; + } + const timeout = setTimeout(resolve, ms); + signal?.addEventListener( + "abort", + () => { + clearTimeout(timeout); + reject(signal.reason ?? new Error("sleep aborted")); + }, + { once: true }, + ); + }); +} + +function titleFromPrompt(prompt: string): string { + const compact = sanitizeStatusText(prompt); + return compact.slice(0, 80) || "OpenClaw task"; +} + +function parsePayload(event: CovenEventRecord): Record { + if (Buffer.byteLength(event.payloadJson, "utf8") > MAX_EVENT_PAYLOAD_BYTES) { + return {}; + } + try { + const parsed = JSON.parse(event.payloadJson) as unknown; + return typeof parsed === "object" && parsed !== null ? (parsed as Record) : {}; + } catch { + return {}; + } +} + +const ESC = String.fromCharCode(0x1b); +const BEL = String.fromCharCode(0x07); +const c0Start = String.fromCharCode(0x00); +const c0Backspace = String.fromCharCode(0x08); +const c0VerticalTab = String.fromCharCode(0x0b); +const c0UnitSeparator = String.fromCharCode(0x1f); +const del = String.fromCharCode(0x7f); +const c1Start = String.fromCharCode(0x80); +const c1End = String.fromCharCode(0x9f); +const BIDI_CONTROL_REGEX = /\p{Bidi_Control}/gu; +const ANSI_ESCAPE_REGEX = new RegExp( + `${ESC}(?:\\][\\s\\S]*?(?:${BEL}|${ESC}\\\\)|P[\\s\\S]*?${ESC}\\\\|\\[[\\x20-\\x3f]*[\\x40-\\x7e]|[\\x20-\\x2f]*[\\x30-\\x7e])`, + "g", +); +const TEXT_CONTROL_REGEX = new RegExp( + `[${c0Start}-${c0Backspace}${c0VerticalTab}-${c0UnitSeparator}${del}${c1Start}-${c1End}]`, + "g", +); + +function sanitizeTerminalText(input: string): string { + return input + .replace(ANSI_ESCAPE_REGEX, "") + .replace(TEXT_CONTROL_REGEX, "") + .replace(BIDI_CONTROL_REGEX, ""); +} + +function sanitizeStatusText(input: string): string { + return sanitizeTerminalText(input).replace(/\s+/g, " ").trim(); +} + +function sanitizeStatusField(input: string, fallback = "unknown"): string { + return sanitizeStatusText(input).slice(0, MAX_STATUS_FIELD_CHARS) || fallback; +} + +function sanitizeErrorText(error: unknown): string { + const raw = error instanceof Error ? `${error.name}: ${error.message}` : String(error); + return sanitizeStatusField(raw, "unknown error"); +} + +function requireSafeSessionId(input: string): string { + const value = input.trim(); + if (!value || value.length > MAX_SESSION_ID_CHARS || !SAFE_SESSION_ID_REGEX.test(value)) { + throw new Error("Coven session id is invalid"); + } + return value; +} + +function requireSafeEventId(input: string): string { + const value = input.trim(); + if (!value || value.length > MAX_EVENT_ID_CHARS || !SAFE_SESSION_ID_REGEX.test(value)) { + throw new Error("Coven event id is invalid"); + } + return value; +} + +function boundedCovenPrompt(input: string): string { + if (Buffer.byteLength(input, "utf8") > MAX_COVEN_PROMPT_BYTES) { + throw new Error("Coven prompt exceeded size limit"); + } + return input; +} + +function normalizePollIntervalMs(value: number): number { + if (!Number.isFinite(value)) { + return DEFAULT_POLL_INTERVAL_MS; + } + return Math.min(MAX_POLL_INTERVAL_MS, Math.max(MIN_POLL_INTERVAL_MS, value)); +} + +function normalizeStopReason(value: unknown): string { + const normalized = + typeof value === "string" ? sanitizeStatusText(value).toLowerCase() : "completed"; + if (normalized === "completed" || normalized === "complete" || normalized === "success") { + return "completed"; + } + if (normalized === "killed" || normalized === "cancelled" || normalized === "canceled") { + return "cancelled"; + } + if (normalized === "failed" || normalized === "failure" || normalized === "error") { + return "error"; + } + return "completed"; +} + +function eventToRuntimeEvents(event: CovenEventRecord): AcpRuntimeEvent[] { + const payload = parsePayload(event); + if (event.kind === "output") { + const text = typeof payload.data === "string" ? sanitizeTerminalText(payload.data) : ""; + return text ? [{ type: "text_delta", text, stream: "output", tag: "agent_message_chunk" }] : []; + } + if (event.kind === "exit") { + const status = sanitizeStatusField( + typeof payload.status === "string" ? payload.status : "completed", + "completed", + ); + const exitCode = typeof payload.exitCode === "number" ? payload.exitCode : null; + return [ + { + type: "status", + text: `coven session ${status}${exitCode == null ? "" : ` exitCode=${exitCode}`}`, + tag: "session_info_update", + }, + { type: "done", stopReason: normalizeStopReason(status) }, + ]; + } + if (event.kind === "kill") { + return [ + { type: "status", text: "coven session killed", tag: "session_info_update" }, + { type: "done", stopReason: "cancelled" }, + ]; + } + return []; +} + +function sessionIsTerminal(session: CovenSessionRecord): boolean { + return session.status !== "running" && session.status !== "created"; +} + +function terminalStatusEvent(session: CovenSessionRecord): AcpRuntimeEvent { + const status = sanitizeStatusField(session.status, "completed"); + const exitCode = typeof session.exitCode === "number" ? session.exitCode : null; + return { + type: "status", + text: `coven session ${status}${exitCode == null ? "" : ` exitCode=${exitCode}`}`, + tag: "session_info_update", + }; +} + +export class CovenAcpRuntime implements AcpRuntime { + private readonly config: ResolvedCovenPluginConfig; + private readonly client: CovenClient; + private readonly logger?: PluginLogger; + private readonly sleep: (ms: number, signal?: AbortSignal) => Promise; + private readonly activeSessionIdsBySessionKey = new Map(); + + constructor(params: CovenAcpRuntimeParams) { + this.config = { + ...params.config, + pollIntervalMs: normalizePollIntervalMs(params.config.pollIntervalMs), + }; + this.logger = params.logger; + this.client = + params.client ?? + createCovenClient(params.config.socketPath, { socketRoot: params.config.covenHome }); + this.sleep = params.sleep ?? defaultSleep; + } + + async ensureSession( + input: Parameters[0], + ): Promise { + const agent = normalizeAgentId(input.agent); + this.resolveHarness(agent); + if (!(await this.isCovenAvailable())) { + if (!this.config.allowFallback) { + throw new AcpRuntimeError( + "ACP_BACKEND_UNAVAILABLE", + "Coven is unavailable and fallback is disabled.", + ); + } + return await this.ensureFallbackSession(input); + } + return { + sessionKey: input.sessionKey, + backend: COVEN_BACKEND_ID, + runtimeSessionName: encodeRuntimeSessionName({ + agent, + mode: "prompt", + sessionMode: input.mode, + }), + ...(input.cwd ? { cwd: input.cwd } : {}), + }; + } + + async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable { + if (input.handle.backend !== COVEN_BACKEND_ID) { + yield* this.runFallbackTurn(input, input.handle); + return; + } + const state = decodeRuntimeSessionName(input.handle.runtimeSessionName); + if (!state) { + throw new AcpRuntimeError( + "ACP_SESSION_INIT_FAILED", + "Coven runtime session metadata is missing.", + ); + } + + const cwd = this.resolveWorkspaceCwd(input.handle.cwd); + const harness = this.resolveHarness(state.agent); + let session: CovenSessionRecord | undefined; + let sessionId: string; + try { + const prompt = boundedCovenPrompt(input.text); + session = await this.client.launchSession( + { + projectRoot: this.config.workspaceDir, + cwd, + harness, + prompt, + title: titleFromPrompt(prompt), + }, + input.signal, + ); + } catch (error) { + const safeError = sanitizeErrorText(error); + if (!this.config.allowFallback) { + throw new AcpRuntimeError( + "ACP_TURN_FAILED", + `Coven launch failed and fallback is disabled: ${safeError}`, + { cause: error }, + ); + } + this.logger?.warn( + `coven launch failed; falling back to ${this.config.fallbackBackend}: ${safeError}`, + ); + yield* this.runFallbackFromCovenHandle(input, state); + return; + } + try { + if (!session) { + throw new Error("Coven launch did not return a session"); + } + sessionId = requireSafeSessionId(session.id); + } catch (error) { + await this.killLaunchedSessionBestEffort(session?.id); + const safeError = sanitizeErrorText(error); + if (!this.config.allowFallback) { + throw new AcpRuntimeError( + "ACP_TURN_FAILED", + `Coven launch failed and fallback is disabled: ${safeError}`, + { cause: error }, + ); + } + this.logger?.warn( + `coven launch failed; falling back to ${this.config.fallbackBackend}: ${safeError}`, + ); + yield* this.runFallbackFromCovenHandle(input, state); + return; + } + + input.handle.backendSessionId = sessionId; + input.handle.agentSessionId = sessionId; + this.activeSessionIdsBySessionKey.set(input.handle.sessionKey, sessionId); + yield { + type: "status", + text: `coven session ${sessionId} started (${sanitizeStatusField(session.harness)})`, + tag: "session_info_update", + }; + + const seenEventIds = new Set(); + const seenEventQueue: string[] = []; + let lastSeenEventId: string | undefined; + while (true) { + if (input.signal?.aborted) { + await this.killActiveSession(sessionId).catch(() => undefined); + throw input.signal.reason ?? new Error("Coven turn aborted"); + } + + try { + const events = await this.client.listEvents( + sessionId, + lastSeenEventId ? { afterEventId: lastSeenEventId } : undefined, + input.signal, + ); + if (events.length > MAX_EVENTS_PER_POLL) { + throw new Error("Coven daemon returned too many events"); + } + for (const event of events) { + const eventId = requireSafeEventId(event.id); + if (seenEventIds.has(eventId)) { + continue; + } + seenEventIds.add(eventId); + seenEventQueue.push(eventId); + while (seenEventQueue.length > MAX_TRACKED_EVENT_IDS) { + const removed = seenEventQueue.shift(); + if (removed) { + seenEventIds.delete(removed); + } + } + lastSeenEventId = eventId; + for (const runtimeEvent of eventToRuntimeEvents(event)) { + yield runtimeEvent; + if (runtimeEvent.type === "done") { + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + return; + } + } + } + + const latest = await this.client.getSession(sessionId, input.signal); + if (sessionIsTerminal(latest)) { + yield terminalStatusEvent(latest); + yield { type: "done", stopReason: normalizeStopReason(latest.status) }; + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + return; + } + } catch (error) { + if (input.signal?.aborted) { + await this.killActiveSession(sessionId).catch(() => undefined); + throw input.signal.reason ?? error; + } + this.logger?.warn(`coven polling failed: ${sanitizeErrorText(error)}`); + await this.killActiveSession(sessionId).catch(() => undefined); + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + yield { type: "status", text: "coven session polling failed", tag: "session_info_update" }; + yield { type: "done", stopReason: "error" }; + return; + } + + await this.sleep(this.config.pollIntervalMs, input.signal); + } + } + + getCapabilities() { + return { controls: ["session/status" as const] }; + } + + async getStatus( + input: Parameters>[0], + ): Promise { + if (input.handle.backend !== COVEN_BACKEND_ID) { + const fallback = this.requireFallbackRuntime(input.handle.backend); + return fallback.getStatus + ? await fallback.getStatus(input) + : { summary: `fallback backend ${input.handle.backend} active` }; + } + const sessionId = this.getTrackedSessionId(input.handle); + if (!sessionId) { + return { summary: "coven runtime ready" }; + } + const session = await this.client.getSession(sessionId, input.signal); + const status = sanitizeStatusField(session.status, "completed"); + const harness = sanitizeStatusField(session.harness); + const title = sanitizeStatusField(session.title, "untitled"); + return { + summary: `${status} ${harness} ${title}`, + backendSessionId: sessionId, + agentSessionId: sessionId, + details: { + projectRoot: sanitizeStatusField(session.projectRoot), + harness, + status, + exitCode: session.exitCode, + }, + }; + } + + async doctor(): Promise { + try { + const health = await this.client.health(); + return health.ok + ? { ok: true, message: "Coven daemon is reachable." } + : { ok: false, code: "COVEN_UNHEALTHY", message: "Coven daemon did not report healthy." }; + } catch (error) { + return { + ok: false, + code: "COVEN_UNAVAILABLE", + message: "Coven daemon is not reachable; direct ACP fallback remains available.", + details: [sanitizeErrorText(error)], + }; + } + } + + async cancel(input: Parameters[0]): Promise { + if (input.handle.backend !== COVEN_BACKEND_ID) { + await this.requireFallbackRuntime(input.handle.backend).cancel(input); + return; + } + const sessionId = this.getTrackedSessionId(input.handle); + if (sessionId) { + await this.killActiveSession(sessionId); + } + } + + async close(input: Parameters[0]): Promise { + if (input.handle.backend !== COVEN_BACKEND_ID) { + await this.requireFallbackRuntime(input.handle.backend).close(input); + return; + } + const sessionId = this.getTrackedSessionId(input.handle); + if (sessionId && input.reason !== "oneshot-complete") { + await this.killActiveSession(sessionId).catch(() => undefined); + } + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + } + + async prepareFreshSession(input: { sessionKey: string }): Promise { + this.activeSessionIdsBySessionKey.delete(input.sessionKey); + const fallback = this.getFallbackRuntime(); + await fallback?.prepareFreshSession?.(input); + } + + private async isCovenAvailable(): Promise { + const controller = new AbortController(); + const timeout = setTimeout( + () => controller.abort(new Error("Coven health check timed out")), + HEALTH_CHECK_TIMEOUT_MS, + ); + try { + const health = await this.client.health(controller.signal); + return health.ok; + } catch { + return false; + } finally { + clearTimeout(timeout); + } + } + + private resolveHarness(agent: string): string { + const normalized = normalizeAgentId(agent); + const harness = this.config.harnesses[normalized] ?? DEFAULT_HARNESSES[normalized]; + if (!harness) { + throw new AcpRuntimeError( + "ACP_INVALID_RUNTIME_OPTION", + `Unknown or unauthorized ACP agent "${normalized}" for Coven backend.`, + ); + } + return harness; + } + + private getFallbackRuntime(backendId = this.config.fallbackBackend): AcpRuntime | null { + const normalized = backendId.trim().toLowerCase(); + if (!normalized || normalized === COVEN_BACKEND_ID) { + return null; + } + return getAcpRuntimeBackend(normalized)?.runtime ?? null; + } + + private requireFallbackRuntime(backendId = this.config.fallbackBackend): AcpRuntime { + const runtime = this.getFallbackRuntime(backendId); + if (!runtime) { + throw new AcpRuntimeError( + "ACP_BACKEND_UNAVAILABLE", + `Coven fallback ACP backend "${backendId}" is not registered.`, + ); + } + return runtime; + } + + private async ensureFallbackSession( + input: Parameters[0], + ): Promise { + return await this.requireFallbackRuntime().ensureSession(input); + } + + private async *runFallbackTurn( + input: AcpRuntimeTurnInput, + handle: AcpRuntimeHandle, + ): AsyncIterable { + yield* this.requireFallbackRuntime(handle.backend).runTurn({ ...input, handle }); + } + + private async *runFallbackFromCovenHandle( + input: AcpRuntimeTurnInput, + state: CovenRuntimeSessionState, + ): AsyncIterable { + const fallback = this.requireFallbackRuntime(); + const handle = await fallback.ensureSession({ + sessionKey: input.handle.sessionKey, + agent: state.agent, + mode: state.sessionMode === "persistent" ? "persistent" : "oneshot", + cwd: this.resolveWorkspaceCwd(input.handle.cwd), + }); + Object.assign(input.handle, handle); + yield* fallback.runTurn({ ...input, handle }); + } + + private resolveWorkspaceCwd(candidate: string | undefined): string { + const cwd = path.resolve(candidate ?? this.config.workspaceDir); + const workspaceReal = realpathIfExists(this.config.workspaceDir); + const cwdReal = realpathIfExists(cwd); + if (!workspaceReal || !cwdReal || !pathIsInside(workspaceReal, cwdReal)) { + throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "Coven cwd is outside workspace."); + } + try { + if (!fs.statSync(cwdReal).isDirectory()) { + throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "Coven cwd must be a directory."); + } + } catch (error) { + if (error instanceof AcpRuntimeError) { + throw error; + } + throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "Coven cwd must be a directory."); + } + return cwdReal; + } + + private getTrackedSessionId(handle: AcpRuntimeHandle): string | undefined { + const tracked = this.activeSessionIdsBySessionKey.get(handle.sessionKey); + if (!tracked) { + return undefined; + } + if (handle.backendSessionId && handle.backendSessionId !== tracked) { + throw new AcpRuntimeError( + "ACP_INVALID_RUNTIME_OPTION", + "Coven session handle does not match this runtime session.", + ); + } + return tracked; + } + + private async killActiveSession(sessionId: string, signal?: AbortSignal): Promise { + await this.client.killSession(sessionId, signal); + } + + private async killLaunchedSessionBestEffort(sessionId: string | undefined): Promise { + if (!sessionId) { + return; + } + await this.client.killSession(sessionId, undefined).catch(() => undefined); + } +} + +export const __testing = { + decodeRuntimeSessionName, + encodeRuntimeSessionName, + eventToRuntimeEvents, + normalizeStopReason, + sanitizeErrorText, + sanitizeStatusField, + sanitizeTerminalText, + terminalStatusEvent, + titleFromPrompt, +}; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5124ee5929b..c9592a2b8ae 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -419,6 +419,12 @@ importers: specifier: workspace:* version: link:../../packages/plugin-sdk + extensions/coven: + devDependencies: + '@openclaw/plugin-sdk': + specifier: workspace:* + version: link:../../packages/plugin-sdk + extensions/deepgram: dependencies: ws: diff --git a/src/proxy-capture/runtime.test.ts b/src/proxy-capture/runtime.test.ts index 40f81d30782..1bf8d9ca1c8 100644 --- a/src/proxy-capture/runtime.test.ts +++ b/src/proxy-capture/runtime.test.ts @@ -89,6 +89,28 @@ describe("debug proxy runtime", () => { expect(events.some((event) => event.kind === "response")).toBe(true); }); + it("reinstalls ambient global fetch capture when fetch changes after initialization", async () => { + globalThis.fetch = vi.fn(async () => ({ status: 200 }) as Response) as typeof fetch; + + initializeDebugProxyCapture("test"); + const replacementFetch = vi.fn(async () => ({ status: 204 }) as Response) as typeof fetch; + globalThis.fetch = replacementFetch; + initializeDebugProxyCapture("test"); + + await globalThis.fetch("https://api.minimax.io/anthropic/messages", { + method: "POST", + headers: { "content-type": "application/json" }, + body: '{"input":"hello"}', + }); + finalizeDebugProxyCapture(); + + expect(replacementFetch).toHaveBeenCalledTimes(1); + const events = storeState.events.filter((event) => event.sessionId === "runtime-test-session"); + expect(events.some((event) => event.host === "api.minimax.io")).toBe(true); + expect(events.some((event) => event.kind === "request")).toBe(true); + expect(events.some((event) => event.kind === "response" && event.status === 204)).toBe(true); + }); + it("redacts sensitive request and response headers before persistence", async () => { initializeDebugProxyCapture("test"); captureHttpExchange({ diff --git a/src/proxy-capture/runtime.ts b/src/proxy-capture/runtime.ts index 33b3ed8f8bc..0211af64d0b 100644 --- a/src/proxy-capture/runtime.ts +++ b/src/proxy-capture/runtime.ts @@ -41,6 +41,7 @@ const SENSITIVE_CAPTURE_HEADER_NAME_FRAGMENTS = [ type GlobalFetchPatchedState = { originalFetch: typeof globalThis.fetch; + patchedFetch: typeof globalThis.fetch; }; type GlobalFetchPatchTarget = typeof globalThis & { @@ -134,15 +135,16 @@ function installDebugProxyGlobalFetchPatch(settings: DebugProxySettings): void { return; } const patched = globalThis as GlobalFetchPatchTarget; - if (patched[DEBUG_PROXY_FETCH_PATCH_KEY]) { + const existing = patched[DEBUG_PROXY_FETCH_PATCH_KEY]; + if (existing && globalThis.fetch === existing.patchedFetch) { return; } - const originalFetch = globalThis.fetch.bind(globalThis); - patched[DEBUG_PROXY_FETCH_PATCH_KEY] = { originalFetch }; - globalThis.fetch = (async (input: RequestInfo | URL, init?: RequestInit) => { + const originalFetch = globalThis.fetch; + const callOriginalFetch = originalFetch.bind(globalThis); + const patchedFetch = (async (input: RequestInfo | URL, init?: RequestInit) => { const url = resolveUrlString(input); try { - const response = await originalFetch(input, init); + const response = await callOriginalFetch(input, init); if (url && /^https?:/i.test(url)) { captureHttpExchange({ url, @@ -199,6 +201,8 @@ function installDebugProxyGlobalFetchPatch(settings: DebugProxySettings): void { throw error; } }) as typeof globalThis.fetch; + patched[DEBUG_PROXY_FETCH_PATCH_KEY] = { originalFetch, patchedFetch }; + globalThis.fetch = patchedFetch; } function uninstallDebugProxyGlobalFetchPatch(): void { @@ -207,12 +211,15 @@ function uninstallDebugProxyGlobalFetchPatch(): void { if (!state) { return; } - globalThis.fetch = state.originalFetch; + if (globalThis.fetch === state.patchedFetch) { + globalThis.fetch = state.originalFetch; + } delete patched[DEBUG_PROXY_FETCH_PATCH_KEY]; } export function isDebugProxyGlobalFetchPatchInstalled(): boolean { - return Boolean((globalThis as GlobalFetchPatchTarget)[DEBUG_PROXY_FETCH_PATCH_KEY]); + const state = (globalThis as GlobalFetchPatchTarget)[DEBUG_PROXY_FETCH_PATCH_KEY]; + return Boolean(state && globalThis.fetch === state.patchedFetch); } export function initializeDebugProxyCapture(mode: string, resolved?: DebugProxySettings): void {