From efcf170a70e6783e673cef1f23efab888a508489 Mon Sep 17 00:00:00 2001 From: Val Alexander Date: Mon, 27 Apr 2026 10:39:44 -0500 Subject: [PATCH] fix(coven): harden runtime bridge --- CHANGELOG.md | 4 + docs/tools/acp-agents-setup.md | 6 + extensions/coven/index.ts | 2 +- extensions/coven/openclaw.plugin.json | 2 +- extensions/coven/src/client.test.ts | 89 +++++++++++++ extensions/coven/src/client.ts | 121 +++++++++++------- extensions/coven/src/config.test.ts | 67 ++++++++++ extensions/coven/src/config.ts | 49 +++++++- extensions/coven/src/runtime.test.ts | 172 +++++++++++++++++++++++++- extensions/coven/src/runtime.ts | 124 +++++++++++++++---- pnpm-lock.yaml | 6 + 11 files changed, 562 insertions(+), 80 deletions(-) create mode 100644 extensions/coven/src/client.test.ts create mode 100644 extensions/coven/src/config.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 18325c71e66..716424694dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ Docs: https://docs.openclaw.ai - Gateway/startup: pass the plugin metadata snapshot from config validation into plugin bootstrap so startup reuses one manifest product instead of rebuilding plugin metadata. Thanks @shakkernerd. - ACP/runtime: add an opt-in bundled Coven backend extension that routes ACP coding sessions through a local Coven daemon when `acp.backend="coven"`, while preserving the existing ACPX backend as the default fallback path. Thanks @BunsDev. +### Fixes + +- ACP/runtime: harden the opt-in Coven backend with workspace-confined launch paths, home-expanded Coven socket config, bounded socket responses, sanitized daemon output, and controlled polling failure handling. Thanks @BunsDev. + ## 2026.4.26 ### Changes diff --git a/docs/tools/acp-agents-setup.md b/docs/tools/acp-agents-setup.md index 563b8c80d78..f5e69df14f1 100644 --- a/docs/tools/acp-agents-setup.md +++ b/docs/tools/acp-agents-setup.md @@ -235,6 +235,12 @@ the Coven session id in the ACP runtime handle. If the health check or launch fails, OpenClaw falls back to the configured direct ACP backend (`acpx` by default) instead of breaking existing ACP behavior. +For path safety, `~` in `covenHome` and `socketPath` expands to the current +user home directory. Relative Coven paths resolve from the OpenClaw workspace, +not from the process working directory. `socketPath` must stay inside +`covenHome`; use the default `/coven.sock` unless your Coven daemon +uses a different socket filename in the same home directory. + The default harness mapping sends common ACP agent ids such as `codex`, `claude`, `gemini`, and `opencode` to the matching Coven harness id. Override `plugins.entries.coven.config.harnesses` only when your local Coven install uses diff --git a/extensions/coven/index.ts b/extensions/coven/index.ts index 623882bcc11..ef8319e53c3 100644 --- a/extensions/coven/index.ts +++ b/extensions/coven/index.ts @@ -22,7 +22,7 @@ export default definePluginEntry({ }); const runtime = new CovenAcpRuntime({ config, logger: ctx.logger }); registerAcpRuntimeBackend({ id: COVEN_BACKEND_ID, runtime }); - ctx.logger.info(`coven ACP runtime backend registered (socket: ${config.socketPath})`); + ctx.logger.info("coven ACP runtime backend registered"); }, async stop() { unregisterAcpRuntimeBackend(COVEN_BACKEND_ID); diff --git a/extensions/coven/openclaw.plugin.json b/extensions/coven/openclaw.plugin.json index 823f7d2ec02..64cf05de508 100644 --- a/extensions/coven/openclaw.plugin.json +++ b/extensions/coven/openclaw.plugin.json @@ -13,7 +13,7 @@ }, "socketPath": { "type": "string", - "description": "Path to the Coven daemon Unix socket. Defaults to /coven.sock." + "description": "Path to the Coven daemon Unix socket. Defaults to /coven.sock and must stay inside covenHome." }, "fallbackBackend": { "type": "string", diff --git a/extensions/coven/src/client.test.ts b/extensions/coven/src/client.test.ts new file mode 100644 index 00000000000..c028d6e59ff --- /dev/null +++ b/extensions/coven/src/client.test.ts @@ -0,0 +1,89 @@ +import fs from "node:fs/promises"; +import http from "node:http"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { CovenApiError, createCovenClient } from "./client.js"; + +let tmpDir: string; + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-client-")); +}); + +afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); +}); + +async function withServer( + handler: http.RequestListener, + fn: (socketPath: string) => Promise, +): Promise { + const socketPath = path.join(tmpDir, "coven.sock"); + const server = http.createServer(handler); + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(socketPath, () => resolve()); + }); + try { + await fn(socketPath); + } finally { + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + }); + } +} + +describe("createCovenClient", () => { + it("parses daemon JSON over a Unix socket", async () => { + await withServer( + (_req, res) => { + res.setHeader("Content-Type", "application/json"); + res.end(JSON.stringify({ ok: true, daemon: null })); + }, + async (socketPath) => { + await expect(createCovenClient(socketPath).health()).resolves.toEqual({ + ok: true, + daemon: null, + }); + }, + ); + }); + + it("sends the event cursor when listing events", async () => { + await withServer( + (req, res) => { + expect(req.url).toBe("/events?sessionId=session-1&afterEventId=event-1"); + res.setHeader("Content-Type", "application/json"); + res.end("[]"); + }, + async (socketPath) => { + await expect( + createCovenClient(socketPath).listEvents("session-1", { afterEventId: "event-1" }), + ).resolves.toEqual([]); + }, + ); + }); + + it("wraps invalid daemon JSON in a typed API error", async () => { + await withServer( + (_req, res) => { + res.end("{not json"); + }, + async (socketPath) => { + await expect(createCovenClient(socketPath).health()).rejects.toBeInstanceOf(CovenApiError); + }, + ); + }); + + it("rejects daemon responses above the response size limit", async () => { + await withServer( + (_req, res) => { + res.end("x".repeat(1_000_001)); + }, + async (socketPath) => { + await expect(createCovenClient(socketPath).health()).rejects.toThrow(/size limit/); + }, + ); + }); +}); diff --git a/extensions/coven/src/client.ts b/extensions/coven/src/client.ts index 81edfe2b806..f96098fc4c6 100644 --- a/extensions/coven/src/client.ts +++ b/extensions/coven/src/client.ts @@ -1,4 +1,4 @@ -import net from "node:net"; +import http from "node:http"; export type CovenSessionRecord = { id: string; @@ -40,11 +40,19 @@ export interface CovenClient { health(signal?: AbortSignal): Promise; launchSession(input: LaunchCovenSessionInput, signal?: AbortSignal): Promise; getSession(sessionId: string, signal?: AbortSignal): Promise; - listEvents(sessionId: string, signal?: AbortSignal): Promise; + listEvents( + sessionId: string, + options?: CovenListEventsOptions, + signal?: AbortSignal, + ): Promise; sendInput(sessionId: string, data: string, signal?: AbortSignal): Promise; killSession(sessionId: string, signal?: AbortSignal): Promise; } +export type CovenListEventsOptions = { + afterEventId?: string; +}; + type RequestOptions = { socketPath: string; method: "GET" | "POST"; @@ -70,14 +78,8 @@ export class CovenApiError extends Error { } } -function parseHttpResponse(raw: string): HttpResponse { - const [head = "", ...bodyParts] = raw.split("\r\n\r\n"); - const statusMatch = /^HTTP\/\d(?:\.\d)?\s+(\d+)/i.exec(head); - return { - status: statusMatch ? Number(statusMatch[1]) : 0, - body: bodyParts.join("\r\n\r\n"), - }; -} +const DEFAULT_REQUEST_TIMEOUT_MS = 10_000; +const MAX_RESPONSE_BYTES = 1_000_000; function requestOverSocket(options: RequestOptions): Promise { return new Promise((resolve, reject) => { @@ -86,53 +88,71 @@ function requestOverSocket(options: RequestOptions): Promise { return; } - const socket = net.createConnection(options.socketPath); - const chunks: Buffer[] = []; let settled = false; + let body = ""; + let totalBytes = 0; - const settle = (fn: () => void) => { + const settle = (fn: () => void, req?: http.ClientRequest) => { if (settled) { return; } settled = true; - options.signal?.removeEventListener("abort", onAbort); + req?.destroy(); fn(); }; - const onAbort = () => { - socket.destroy(); - settle(() => reject(options.signal?.reason ?? new Error("request aborted"))); - }; - - options.signal?.addEventListener("abort", onAbort, { once: true }); - - socket.on("connect", () => { - const body = options.body === undefined ? "" : JSON.stringify(options.body); - const headers = [ - `${options.method} ${options.path} HTTP/1.1`, - "Host: coven", - "Connection: close", - ...(body - ? ["Content-Type: application/json", `Content-Length: ${Buffer.byteLength(body)}`] - : []), - "", - body, - ]; - socket.write(headers.join("\r\n")); + const requestBody = options.body === undefined ? "" : JSON.stringify(options.body); + const req = http.request( + { + socketPath: options.socketPath, + method: options.method, + path: options.path, + headers: { + Host: "coven", + Connection: "close", + ...(requestBody + ? { + "Content-Type": "application/json", + "Content-Length": Buffer.byteLength(requestBody), + } + : {}), + }, + signal: options.signal, + }, + (res) => { + res.setEncoding("utf8"); + res.on("data", (chunk: string) => { + if (settled) { + return; + } + totalBytes += Buffer.byteLength(chunk); + if (totalBytes > MAX_RESPONSE_BYTES) { + settle(() => reject(new Error("Coven API response exceeded size limit")), req); + return; + } + body += chunk; + }); + res.on("end", () => { + settle(() => + resolve({ + status: res.statusCode ?? 0, + body, + }), + ); + }); + res.on("error", (error) => settle(() => reject(error), req)); + }, + ); + req.setTimeout(DEFAULT_REQUEST_TIMEOUT_MS, () => { + settle(() => reject(new Error("Coven API request timed out")), req); }); - socket.on("data", (chunk) => chunks.push(Buffer.from(chunk))); - socket.on("error", (error) => settle(() => reject(error))); - socket.on("end", () => { - const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8")); - settle(() => resolve(response)); - }); - socket.on("close", () => { + req.on("error", (error) => { if (settled) { return; } - const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8")); - settle(() => resolve(response)); + settle(() => reject(error)); }); + req.end(requestBody); }); } @@ -141,7 +161,11 @@ async function requestJson(options: RequestOptions): Promise { if (response.status < 200 || response.status >= 300) { throw new CovenApiError(response.status, response.body); } - return JSON.parse(response.body || "null") as T; + try { + return JSON.parse(response.body || "null") as T; + } catch (error) { + throw new CovenApiError(response.status, `Invalid JSON response: ${String(error)}`); + } } export function createCovenClient(socketPath: string): CovenClient { @@ -171,11 +195,16 @@ export function createCovenClient(socketPath: string): CovenClient { signal, }); }, - listEvents(sessionId, signal) { + listEvents(sessionId, options, signal) { + const params = new URLSearchParams({ sessionId }); + const afterEventId = options?.afterEventId?.trim(); + if (afterEventId) { + params.set("afterEventId", afterEventId); + } return requestJson({ socketPath, method: "GET", - path: `/events?sessionId=${encodeURIComponent(sessionId)}`, + path: `/events?${params.toString()}`, signal, }); }, diff --git a/extensions/coven/src/config.test.ts b/extensions/coven/src/config.test.ts new file mode 100644 index 00000000000..34bb4cd362c --- /dev/null +++ b/extensions/coven/src/config.test.ts @@ -0,0 +1,67 @@ +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { resolveCovenPluginConfig } from "./config.js"; + +const OLD_COVEN_HOME = process.env.COVEN_HOME; + +afterEach(() => { + if (OLD_COVEN_HOME === undefined) { + delete process.env.COVEN_HOME; + } else { + process.env.COVEN_HOME = OLD_COVEN_HOME; + } +}); + +describe("resolveCovenPluginConfig", () => { + it("expands tilde paths before resolving Coven home and socket path", () => { + const resolved = resolveCovenPluginConfig({ + rawConfig: { + covenHome: "~/.coven", + socketPath: "~/.coven/coven.sock", + }, + workspaceDir: "/repo", + }); + + expect(resolved.covenHome).toBe(path.join(os.homedir(), ".coven")); + expect(resolved.socketPath).toBe(path.join(os.homedir(), ".coven", "coven.sock")); + }); + + it("resolves relative Coven paths from the workspace instead of process cwd", () => { + const resolved = resolveCovenPluginConfig({ + rawConfig: { + covenHome: ".coven", + socketPath: ".coven/coven.sock", + }, + workspaceDir: "/repo", + }); + + expect(resolved.workspaceDir).toBe("/repo"); + expect(resolved.covenHome).toBe("/repo/.coven"); + expect(resolved.socketPath).toBe("/repo/.coven/coven.sock"); + }); + + it("rejects socket paths outside covenHome", () => { + expect(() => + resolveCovenPluginConfig({ + rawConfig: { + covenHome: "~/.coven", + socketPath: "/var/run/docker.sock", + }, + workspaceDir: "/repo", + }), + ).toThrow(/socketPath must stay inside covenHome/); + }); + + it("uses COVEN_HOME with tilde expansion for the default socket path", () => { + process.env.COVEN_HOME = "~/.custom-coven"; + + const resolved = resolveCovenPluginConfig({ + rawConfig: {}, + workspaceDir: "/repo", + }); + + expect(resolved.covenHome).toBe(path.join(os.homedir(), ".custom-coven")); + expect(resolved.socketPath).toBe(path.join(os.homedir(), ".custom-coven", "coven.sock")); + }); +}); diff --git a/extensions/coven/src/config.ts b/extensions/coven/src/config.ts index a8f8ea09c8c..bafc42142d6 100644 --- a/extensions/coven/src/config.ts +++ b/extensions/coven/src/config.ts @@ -14,6 +14,7 @@ export type CovenPluginConfig = { export type ResolvedCovenPluginConfig = { covenHome: string; socketPath: string; + workspaceDir: string; fallbackBackend: string; pollIntervalMs: number; harnesses: Record; @@ -41,18 +42,49 @@ function normalizeBackendId(value: string | undefined): string { return normalized || DEFAULT_FALLBACK_BACKEND; } -function resolveCovenHome(raw: string | undefined): string { +function expandTilde(raw: string): string { + const trimmed = raw.trim(); + if (trimmed === "~") { + return os.homedir(); + } + if (trimmed.startsWith("~/")) { + return path.join(os.homedir(), trimmed.slice(2)); + } + return trimmed; +} + +function resolveConfiguredPath(raw: string, baseDir: string): string { + const expanded = expandTilde(raw); + return path.isAbsolute(expanded) ? path.resolve(expanded) : path.resolve(baseDir, expanded); +} + +function pathIsInside(parent: string, child: string): boolean { + const relative = path.relative(parent, child); + return relative === "" || (!relative.startsWith("..") && !path.isAbsolute(relative)); +} + +function resolveCovenHome(raw: string | undefined, baseDir: string): string { const fromConfig = raw?.trim(); if (fromConfig) { - return path.resolve(fromConfig); + return resolveConfiguredPath(fromConfig, baseDir); } const fromEnv = process.env.COVEN_HOME?.trim(); if (fromEnv) { - return path.resolve(fromEnv); + return resolveConfiguredPath(fromEnv, baseDir); } return path.join(os.homedir(), ".coven"); } +function resolveSocketPath(covenHome: string, raw: string | undefined, baseDir: string): string { + const socketPath = raw?.trim() + ? resolveConfiguredPath(raw, baseDir) + : path.join(covenHome, "coven.sock"); + if (!pathIsInside(covenHome, socketPath)) { + throw new Error("Coven socketPath must stay inside covenHome"); + } + return socketPath; +} + function normalizeHarnesses(value: Record | undefined): Record { return Object.fromEntries( Object.entries(value ?? {}).flatMap(([agent, harness]) => { @@ -72,12 +104,19 @@ export function resolveCovenPluginConfig(params: { throw new Error(parsed.error.issues[0]?.message ?? "invalid Coven plugin config"); } const config = parsed.data as CovenPluginConfig; - const covenHome = resolveCovenHome(config.covenHome); + const workspaceDir = path.resolve(params.workspaceDir ?? process.cwd()); + const covenHome = resolveCovenHome(config.covenHome, workspaceDir); return { covenHome, - socketPath: path.resolve(config.socketPath?.trim() || path.join(covenHome, "coven.sock")), + socketPath: resolveSocketPath(covenHome, config.socketPath, workspaceDir), + workspaceDir, fallbackBackend: normalizeBackendId(config.fallbackBackend), pollIntervalMs: config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS, harnesses: normalizeHarnesses(config.harnesses), }; } + +export const __testing = { + expandTilde, + resolveConfiguredPath, +}; diff --git a/extensions/coven/src/runtime.test.ts b/extensions/coven/src/runtime.test.ts index 93ee7e22ef8..0104c74d786 100644 --- a/extensions/coven/src/runtime.test.ts +++ b/extensions/coven/src/runtime.test.ts @@ -8,11 +8,12 @@ import { import { afterEach, describe, expect, it, vi } from "vitest"; import type { CovenClient, CovenEventRecord, CovenSessionRecord } from "./client.js"; import type { ResolvedCovenPluginConfig } from "./config.js"; -import { CovenAcpRuntime } from "./runtime.js"; +import { __testing, CovenAcpRuntime } from "./runtime.js"; const config: ResolvedCovenPluginConfig = { covenHome: "/tmp/coven", socketPath: "/tmp/coven/coven.sock", + workspaceDir: "/repo", fallbackBackend: "acpx", pollIntervalMs: 1, harnesses: {}, @@ -90,6 +91,7 @@ function fallbackRuntime(): AcpRuntime { } afterEach(() => { + vi.useRealTimers(); unregisterAcpRuntimeBackend("acpx"); }); @@ -99,7 +101,11 @@ describe("CovenAcpRuntime", () => { registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); const runtime = new CovenAcpRuntime({ config, - client: fakeClient({ health: vi.fn(async () => Promise.reject(new Error("offline"))) }), + client: fakeClient({ + health: vi.fn(async () => { + throw new Error("offline"); + }), + }), }); const handle = await runtime.ensureSession({ @@ -113,6 +119,34 @@ describe("CovenAcpRuntime", () => { expect(fallback.ensureSession).toHaveBeenCalledOnce(); }); + it("falls back when Coven health checks do not settle before the deadline", async () => { + vi.useFakeTimers(); + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const client = fakeClient({ + health: vi.fn( + async (signal?: AbortSignal) => + await new Promise((_resolve, reject) => { + signal?.addEventListener("abort", () => reject(signal.reason ?? new Error("aborted")), { + once: true, + }); + }), + ), + }); + const runtime = new CovenAcpRuntime({ config, client }); + + const pending = runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + await vi.advanceTimersByTimeAsync(5_000); + const handle = await pending; + + expect(handle.backend).toBe("acpx"); + }); + it("launches a Coven session and streams output events to ACP", async () => { const client = fakeClient(); const runtime = new CovenAcpRuntime({ config, client }); @@ -150,13 +184,145 @@ describe("CovenAcpRuntime", () => { ]); }); + it("ignores cwd embedded in runtimeSessionName when launching Coven sessions", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + handle.runtimeSessionName = __testing.encodeRuntimeSessionName({ + agent: "codex", + mode: "prompt", + cwd: "/tmp/attacker", + }); + + await collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(client.launchSession).toHaveBeenCalledWith( + expect.objectContaining({ + projectRoot: "/repo", + cwd: "/repo", + }), + undefined, + ); + }); + + it("rejects Coven handles whose cwd is outside the configured workspace", async () => { + const runtime = new CovenAcpRuntime({ config, client: fakeClient() }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + handle.cwd = "/tmp/attacker"; + + await expect( + collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ), + ).rejects.toThrow(/outside workspace/); + }); + + it("requests incremental events after the last processed Coven event", async () => { + const client = fakeClient({ + listEvents: vi + .fn() + .mockResolvedValueOnce([ + event({ + id: "event-1", + kind: "output", + payloadJson: JSON.stringify({ data: "hello\n" }), + }), + ]) + .mockResolvedValueOnce([ + event({ + id: "event-2", + kind: "exit", + payloadJson: JSON.stringify({ status: "completed", exitCode: 0 }), + }), + ]), + getSession: vi.fn(async () => session({ status: "running" })), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + + await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(client.listEvents).toHaveBeenNthCalledWith( + 2, + "session-1", + { + afterEventId: "event-1", + }, + undefined, + ); + }); + + it("converts Coven polling failures into controlled terminal events", async () => { + const client = fakeClient({ + listEvents: vi.fn(async () => { + throw new Error("bad json"); + }), + killSession: vi.fn(async () => undefined), + }); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + + const events = await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(client.killSession).toHaveBeenCalledWith("session-1", undefined); + expect(events).toEqual([ + expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }), + expect.objectContaining({ type: "status", text: "coven session polling failed" }), + expect.objectContaining({ type: "done", stopReason: "error" }), + ]); + }); + + it("strips terminal escape and control characters from Coven output", () => { + expect(__testing.sanitizeTerminalText("\u001b]0;spoof\u0007hi\u001b[31m!\u001b[0m\r\n")).toBe( + "hi!\n", + ); + }); + it("preserves direct fallback when Coven launch fails after detection", async () => { const fallback = fallbackRuntime(); registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); const runtime = new CovenAcpRuntime({ config, client: fakeClient({ - launchSession: vi.fn(async () => Promise.reject(new Error("launch failed"))), + launchSession: vi.fn(async () => { + throw new Error("launch failed"); + }), }), }); const handle = await runtime.ensureSession({ diff --git a/extensions/coven/src/runtime.ts b/extensions/coven/src/runtime.ts index 1e5e7c73afd..7fa5dfcc0e8 100644 --- a/extensions/coven/src/runtime.ts +++ b/extensions/coven/src/runtime.ts @@ -30,10 +30,12 @@ const DEFAULT_HARNESSES: Record = { "google-gemini-cli": "gemini", opencode: "opencode", }; +const HEALTH_CHECK_TIMEOUT_MS = 5_000; +const MAX_TRACKED_EVENT_IDS = 10_000; type CovenRuntimeSessionState = { agent: string; - mode: "prompt" | "steer" | string; + mode: string; sessionMode?: string; cwd?: string; }; @@ -106,10 +108,32 @@ function parsePayload(event: CovenEventRecord): Record { } } +const ESC = String.fromCharCode(0x1b); +const BEL = String.fromCharCode(0x07); +const c0Start = String.fromCharCode(0x00); +const c0Backspace = String.fromCharCode(0x08); +const c0VerticalTab = String.fromCharCode(0x0b); +const c0UnitSeparator = String.fromCharCode(0x1f); +const del = String.fromCharCode(0x7f); +const c1Start = String.fromCharCode(0x80); +const c1End = String.fromCharCode(0x9f); +const ANSI_ESCAPE_REGEX = new RegExp( + `${ESC}(?:\\[[\\x20-\\x3f]*[\\x40-\\x7e]|\\][^${BEL}${ESC}]*(?:${BEL}|${ESC}\\\\)|[\\x40-\\x5f])`, + "g", +); +const TEXT_CONTROL_REGEX = new RegExp( + `[${c0Start}-${c0Backspace}${c0VerticalTab}-${c0UnitSeparator}${del}${c1Start}-${c1End}]`, + "g", +); + +function sanitizeTerminalText(input: string): string { + return input.replace(ANSI_ESCAPE_REGEX, "").replace(TEXT_CONTROL_REGEX, ""); +} + function eventToRuntimeEvents(event: CovenEventRecord): AcpRuntimeEvent[] { const payload = parsePayload(event); if (event.kind === "output") { - const text = typeof payload.data === "string" ? payload.data : ""; + const text = typeof payload.data === "string" ? sanitizeTerminalText(payload.data) : ""; return text ? [{ type: "text_delta", text, stream: "output", tag: "agent_message_chunk" }] : []; } if (event.kind === "exit") { @@ -145,6 +169,11 @@ function terminalStatusEvent(session: CovenSessionRecord): AcpRuntimeEvent { }; } +function pathIsInside(parent: string, child: string): boolean { + const relative = path.relative(parent, child); + return relative === "" || (!relative.startsWith("..") && !path.isAbsolute(relative)); +} + export class CovenAcpRuntime implements AcpRuntime { private readonly config: ResolvedCovenPluginConfig; private readonly client: CovenClient; @@ -192,12 +221,13 @@ export class CovenAcpRuntime implements AcpRuntime { ); } + const cwd = this.resolveWorkspaceCwd(input.handle.cwd); let session: CovenSessionRecord; try { session = await this.client.launchSession( { - projectRoot: state.cwd ?? input.handle.cwd ?? process.cwd(), - cwd: state.cwd ?? input.handle.cwd ?? process.cwd(), + projectRoot: this.config.workspaceDir, + cwd, harness: this.resolveHarness(state.agent), prompt: input.text, title: titleFromPrompt(input.text), @@ -222,32 +252,63 @@ export class CovenAcpRuntime implements AcpRuntime { }; const seenEventIds = new Set(); + const seenEventQueue: string[] = []; + let lastSeenEventId: string | undefined; while (true) { if (input.signal?.aborted) { await this.killActiveSession(session.id, input.signal).catch(() => undefined); throw input.signal.reason ?? new Error("Coven turn aborted"); } - const events = await this.client.listEvents(session.id, input.signal); - for (const event of events) { - if (seenEventIds.has(event.id)) { - continue; - } - seenEventIds.add(event.id); - for (const runtimeEvent of eventToRuntimeEvents(event)) { - yield runtimeEvent; - if (runtimeEvent.type === "done") { - this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); - return; + try { + const events = await this.client.listEvents( + session.id, + lastSeenEventId ? { afterEventId: lastSeenEventId } : undefined, + input.signal, + ); + const cursorIndex = lastSeenEventId + ? events.findIndex((event) => event.id === lastSeenEventId) + : -1; + const nextEvents = cursorIndex >= 0 ? events.slice(cursorIndex + 1) : events; + for (const event of nextEvents) { + if (seenEventIds.has(event.id)) { + continue; + } + seenEventIds.add(event.id); + seenEventQueue.push(event.id); + while (seenEventQueue.length > MAX_TRACKED_EVENT_IDS) { + const removed = seenEventQueue.shift(); + if (removed) { + seenEventIds.delete(removed); + } + } + lastSeenEventId = event.id; + for (const runtimeEvent of eventToRuntimeEvents(event)) { + yield runtimeEvent; + if (runtimeEvent.type === "done") { + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + return; + } } } - } - const latest = await this.client.getSession(session.id, input.signal); - if (sessionIsTerminal(latest)) { - yield terminalStatusEvent(latest); - yield { type: "done", stopReason: latest.status }; + const latest = await this.client.getSession(session.id, input.signal); + if (sessionIsTerminal(latest)) { + yield terminalStatusEvent(latest); + yield { type: "done", stopReason: latest.status }; + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + return; + } + } catch (error) { + if (input.signal?.aborted) { + await this.killActiveSession(session.id, input.signal).catch(() => undefined); + throw input.signal.reason ?? error; + } + this.logger?.warn(`coven polling failed: ${String(error)}`); + await this.killActiveSession(session.id).catch(() => undefined); this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + yield { type: "status", text: "coven session polling failed", tag: "session_info_update" }; + yield { type: "done", stopReason: "error" }; return; } @@ -338,11 +399,18 @@ export class CovenAcpRuntime implements AcpRuntime { } private async isCovenAvailable(): Promise { + const controller = new AbortController(); + const timeout = setTimeout( + () => controller.abort(new Error("Coven health check timed out")), + HEALTH_CHECK_TIMEOUT_MS, + ); try { - const health = await this.client.health(); - return health.ok === true; + const health = await this.client.health(controller.signal); + return health.ok; } catch { return false; + } finally { + clearTimeout(timeout); } } @@ -388,17 +456,24 @@ export class CovenAcpRuntime implements AcpRuntime { state: CovenRuntimeSessionState, ): AsyncIterable { const fallback = this.requireFallbackRuntime(); - const cwd = state.cwd ?? input.handle.cwd; const handle = await fallback.ensureSession({ sessionKey: input.handle.sessionKey, agent: state.agent, mode: state.sessionMode === "persistent" ? "persistent" : "oneshot", - ...(cwd ? { cwd: path.resolve(cwd) } : {}), + cwd: this.resolveWorkspaceCwd(input.handle.cwd), }); Object.assign(input.handle, handle); yield* fallback.runTurn({ ...input, handle }); } + private resolveWorkspaceCwd(candidate: string | undefined): string { + const cwd = path.resolve(candidate ?? this.config.workspaceDir); + if (!pathIsInside(this.config.workspaceDir, cwd)) { + throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "Coven cwd is outside workspace."); + } + return cwd; + } + private async killActiveSession(sessionId: string, signal?: AbortSignal): Promise { await this.client.killSession(sessionId, signal); } @@ -408,4 +483,5 @@ export const __testing = { decodeRuntimeSessionName, encodeRuntimeSessionName, eventToRuntimeEvents, + sanitizeTerminalText, }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5124ee5929b..c9592a2b8ae 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -419,6 +419,12 @@ importers: specifier: workspace:* version: link:../../packages/plugin-sdk + extensions/coven: + devDependencies: + '@openclaw/plugin-sdk': + specifier: workspace:* + version: link:../../packages/plugin-sdk + extensions/deepgram: dependencies: ws: