fix(coven): harden runtime bridge

This commit is contained in:
Val Alexander
2026-04-27 10:39:44 -05:00
parent 79792c5a3e
commit efcf170a70
11 changed files with 562 additions and 80 deletions

View File

@@ -11,6 +11,10 @@ Docs: https://docs.openclaw.ai
- Gateway/startup: pass the plugin metadata snapshot from config validation into plugin bootstrap so startup reuses one manifest product instead of rebuilding plugin metadata. Thanks @shakkernerd.
- ACP/runtime: add an opt-in bundled Coven backend extension that routes ACP coding sessions through a local Coven daemon when `acp.backend="coven"`, while preserving the existing ACPX backend as the default fallback path. Thanks @BunsDev.
### Fixes
- ACP/runtime: harden the opt-in Coven backend with workspace-confined launch paths, home-expanded Coven socket config, bounded socket responses, sanitized daemon output, and controlled polling failure handling. Thanks @BunsDev.
## 2026.4.26
### Changes

View File

@@ -235,6 +235,12 @@ the Coven session id in the ACP runtime handle. If the health check or launch
fails, OpenClaw falls back to the configured direct ACP backend (`acpx` by
default) instead of breaking existing ACP behavior.
For path safety, `~` in `covenHome` and `socketPath` expands to the current
user home directory. Relative Coven paths resolve from the OpenClaw workspace,
not from the process working directory. `socketPath` must stay inside
`covenHome`; use the default `<covenHome>/coven.sock` unless your Coven daemon
uses a different socket filename in the same home directory.
The default harness mapping sends common ACP agent ids such as `codex`,
`claude`, `gemini`, and `opencode` to the matching Coven harness id. Override
`plugins.entries.coven.config.harnesses` only when your local Coven install uses

View File

@@ -22,7 +22,7 @@ export default definePluginEntry({
});
const runtime = new CovenAcpRuntime({ config, logger: ctx.logger });
registerAcpRuntimeBackend({ id: COVEN_BACKEND_ID, runtime });
ctx.logger.info(`coven ACP runtime backend registered (socket: ${config.socketPath})`);
ctx.logger.info("coven ACP runtime backend registered");
},
async stop() {
unregisterAcpRuntimeBackend(COVEN_BACKEND_ID);

View File

@@ -13,7 +13,7 @@
},
"socketPath": {
"type": "string",
"description": "Path to the Coven daemon Unix socket. Defaults to <covenHome>/coven.sock."
"description": "Path to the Coven daemon Unix socket. Defaults to <covenHome>/coven.sock and must stay inside covenHome."
},
"fallbackBackend": {
"type": "string",

View File

@@ -0,0 +1,89 @@
import fs from "node:fs/promises";
import http from "node:http";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { CovenApiError, createCovenClient } from "./client.js";
let tmpDir: string;
beforeEach(async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-coven-client-"));
});
afterEach(async () => {
await fs.rm(tmpDir, { recursive: true, force: true });
});
async function withServer(
handler: http.RequestListener,
fn: (socketPath: string) => Promise<void>,
): Promise<void> {
const socketPath = path.join(tmpDir, "coven.sock");
const server = http.createServer(handler);
await new Promise<void>((resolve, reject) => {
server.once("error", reject);
server.listen(socketPath, () => resolve());
});
try {
await fn(socketPath);
} finally {
await new Promise<void>((resolve, reject) => {
server.close((error) => (error ? reject(error) : resolve()));
});
}
}
describe("createCovenClient", () => {
it("parses daemon JSON over a Unix socket", async () => {
await withServer(
(_req, res) => {
res.setHeader("Content-Type", "application/json");
res.end(JSON.stringify({ ok: true, daemon: null }));
},
async (socketPath) => {
await expect(createCovenClient(socketPath).health()).resolves.toEqual({
ok: true,
daemon: null,
});
},
);
});
it("sends the event cursor when listing events", async () => {
await withServer(
(req, res) => {
expect(req.url).toBe("/events?sessionId=session-1&afterEventId=event-1");
res.setHeader("Content-Type", "application/json");
res.end("[]");
},
async (socketPath) => {
await expect(
createCovenClient(socketPath).listEvents("session-1", { afterEventId: "event-1" }),
).resolves.toEqual([]);
},
);
});
it("wraps invalid daemon JSON in a typed API error", async () => {
await withServer(
(_req, res) => {
res.end("{not json");
},
async (socketPath) => {
await expect(createCovenClient(socketPath).health()).rejects.toBeInstanceOf(CovenApiError);
},
);
});
it("rejects daemon responses above the response size limit", async () => {
await withServer(
(_req, res) => {
res.end("x".repeat(1_000_001));
},
async (socketPath) => {
await expect(createCovenClient(socketPath).health()).rejects.toThrow(/size limit/);
},
);
});
});

View File

@@ -1,4 +1,4 @@
import net from "node:net";
import http from "node:http";
export type CovenSessionRecord = {
id: string;
@@ -40,11 +40,19 @@ export interface CovenClient {
health(signal?: AbortSignal): Promise<CovenHealthResponse>;
launchSession(input: LaunchCovenSessionInput, signal?: AbortSignal): Promise<CovenSessionRecord>;
getSession(sessionId: string, signal?: AbortSignal): Promise<CovenSessionRecord>;
listEvents(sessionId: string, signal?: AbortSignal): Promise<CovenEventRecord[]>;
listEvents(
sessionId: string,
options?: CovenListEventsOptions,
signal?: AbortSignal,
): Promise<CovenEventRecord[]>;
sendInput(sessionId: string, data: string, signal?: AbortSignal): Promise<void>;
killSession(sessionId: string, signal?: AbortSignal): Promise<void>;
}
export type CovenListEventsOptions = {
afterEventId?: string;
};
type RequestOptions = {
socketPath: string;
method: "GET" | "POST";
@@ -70,14 +78,8 @@ export class CovenApiError extends Error {
}
}
function parseHttpResponse(raw: string): HttpResponse {
const [head = "", ...bodyParts] = raw.split("\r\n\r\n");
const statusMatch = /^HTTP\/\d(?:\.\d)?\s+(\d+)/i.exec(head);
return {
status: statusMatch ? Number(statusMatch[1]) : 0,
body: bodyParts.join("\r\n\r\n"),
};
}
const DEFAULT_REQUEST_TIMEOUT_MS = 10_000;
const MAX_RESPONSE_BYTES = 1_000_000;
function requestOverSocket(options: RequestOptions): Promise<HttpResponse> {
return new Promise((resolve, reject) => {
@@ -86,53 +88,71 @@ function requestOverSocket(options: RequestOptions): Promise<HttpResponse> {
return;
}
const socket = net.createConnection(options.socketPath);
const chunks: Buffer[] = [];
let settled = false;
let body = "";
let totalBytes = 0;
const settle = (fn: () => void) => {
const settle = (fn: () => void, req?: http.ClientRequest) => {
if (settled) {
return;
}
settled = true;
options.signal?.removeEventListener("abort", onAbort);
req?.destroy();
fn();
};
const onAbort = () => {
socket.destroy();
settle(() => reject(options.signal?.reason ?? new Error("request aborted")));
};
options.signal?.addEventListener("abort", onAbort, { once: true });
socket.on("connect", () => {
const body = options.body === undefined ? "" : JSON.stringify(options.body);
const headers = [
`${options.method} ${options.path} HTTP/1.1`,
"Host: coven",
"Connection: close",
...(body
? ["Content-Type: application/json", `Content-Length: ${Buffer.byteLength(body)}`]
: []),
"",
body,
];
socket.write(headers.join("\r\n"));
const requestBody = options.body === undefined ? "" : JSON.stringify(options.body);
const req = http.request(
{
socketPath: options.socketPath,
method: options.method,
path: options.path,
headers: {
Host: "coven",
Connection: "close",
...(requestBody
? {
"Content-Type": "application/json",
"Content-Length": Buffer.byteLength(requestBody),
}
: {}),
},
signal: options.signal,
},
(res) => {
res.setEncoding("utf8");
res.on("data", (chunk: string) => {
if (settled) {
return;
}
totalBytes += Buffer.byteLength(chunk);
if (totalBytes > MAX_RESPONSE_BYTES) {
settle(() => reject(new Error("Coven API response exceeded size limit")), req);
return;
}
body += chunk;
});
res.on("end", () => {
settle(() =>
resolve({
status: res.statusCode ?? 0,
body,
}),
);
});
res.on("error", (error) => settle(() => reject(error), req));
},
);
req.setTimeout(DEFAULT_REQUEST_TIMEOUT_MS, () => {
settle(() => reject(new Error("Coven API request timed out")), req);
});
socket.on("data", (chunk) => chunks.push(Buffer.from(chunk)));
socket.on("error", (error) => settle(() => reject(error)));
socket.on("end", () => {
const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8"));
settle(() => resolve(response));
});
socket.on("close", () => {
req.on("error", (error) => {
if (settled) {
return;
}
const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8"));
settle(() => resolve(response));
settle(() => reject(error));
});
req.end(requestBody);
});
}
@@ -141,7 +161,11 @@ async function requestJson<T>(options: RequestOptions): Promise<T> {
if (response.status < 200 || response.status >= 300) {
throw new CovenApiError(response.status, response.body);
}
return JSON.parse(response.body || "null") as T;
try {
return JSON.parse(response.body || "null") as T;
} catch (error) {
throw new CovenApiError(response.status, `Invalid JSON response: ${String(error)}`);
}
}
export function createCovenClient(socketPath: string): CovenClient {
@@ -171,11 +195,16 @@ export function createCovenClient(socketPath: string): CovenClient {
signal,
});
},
listEvents(sessionId, signal) {
listEvents(sessionId, options, signal) {
const params = new URLSearchParams({ sessionId });
const afterEventId = options?.afterEventId?.trim();
if (afterEventId) {
params.set("afterEventId", afterEventId);
}
return requestJson<CovenEventRecord[]>({
socketPath,
method: "GET",
path: `/events?sessionId=${encodeURIComponent(sessionId)}`,
path: `/events?${params.toString()}`,
signal,
});
},

View File

@@ -0,0 +1,67 @@
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { resolveCovenPluginConfig } from "./config.js";
const OLD_COVEN_HOME = process.env.COVEN_HOME;
afterEach(() => {
if (OLD_COVEN_HOME === undefined) {
delete process.env.COVEN_HOME;
} else {
process.env.COVEN_HOME = OLD_COVEN_HOME;
}
});
describe("resolveCovenPluginConfig", () => {
it("expands tilde paths before resolving Coven home and socket path", () => {
const resolved = resolveCovenPluginConfig({
rawConfig: {
covenHome: "~/.coven",
socketPath: "~/.coven/coven.sock",
},
workspaceDir: "/repo",
});
expect(resolved.covenHome).toBe(path.join(os.homedir(), ".coven"));
expect(resolved.socketPath).toBe(path.join(os.homedir(), ".coven", "coven.sock"));
});
it("resolves relative Coven paths from the workspace instead of process cwd", () => {
const resolved = resolveCovenPluginConfig({
rawConfig: {
covenHome: ".coven",
socketPath: ".coven/coven.sock",
},
workspaceDir: "/repo",
});
expect(resolved.workspaceDir).toBe("/repo");
expect(resolved.covenHome).toBe("/repo/.coven");
expect(resolved.socketPath).toBe("/repo/.coven/coven.sock");
});
it("rejects socket paths outside covenHome", () => {
expect(() =>
resolveCovenPluginConfig({
rawConfig: {
covenHome: "~/.coven",
socketPath: "/var/run/docker.sock",
},
workspaceDir: "/repo",
}),
).toThrow(/socketPath must stay inside covenHome/);
});
it("uses COVEN_HOME with tilde expansion for the default socket path", () => {
process.env.COVEN_HOME = "~/.custom-coven";
const resolved = resolveCovenPluginConfig({
rawConfig: {},
workspaceDir: "/repo",
});
expect(resolved.covenHome).toBe(path.join(os.homedir(), ".custom-coven"));
expect(resolved.socketPath).toBe(path.join(os.homedir(), ".custom-coven", "coven.sock"));
});
});

View File

@@ -14,6 +14,7 @@ export type CovenPluginConfig = {
export type ResolvedCovenPluginConfig = {
covenHome: string;
socketPath: string;
workspaceDir: string;
fallbackBackend: string;
pollIntervalMs: number;
harnesses: Record<string, string>;
@@ -41,18 +42,49 @@ function normalizeBackendId(value: string | undefined): string {
return normalized || DEFAULT_FALLBACK_BACKEND;
}
function resolveCovenHome(raw: string | undefined): string {
function expandTilde(raw: string): string {
const trimmed = raw.trim();
if (trimmed === "~") {
return os.homedir();
}
if (trimmed.startsWith("~/")) {
return path.join(os.homedir(), trimmed.slice(2));
}
return trimmed;
}
function resolveConfiguredPath(raw: string, baseDir: string): string {
const expanded = expandTilde(raw);
return path.isAbsolute(expanded) ? path.resolve(expanded) : path.resolve(baseDir, expanded);
}
function pathIsInside(parent: string, child: string): boolean {
const relative = path.relative(parent, child);
return relative === "" || (!relative.startsWith("..") && !path.isAbsolute(relative));
}
function resolveCovenHome(raw: string | undefined, baseDir: string): string {
const fromConfig = raw?.trim();
if (fromConfig) {
return path.resolve(fromConfig);
return resolveConfiguredPath(fromConfig, baseDir);
}
const fromEnv = process.env.COVEN_HOME?.trim();
if (fromEnv) {
return path.resolve(fromEnv);
return resolveConfiguredPath(fromEnv, baseDir);
}
return path.join(os.homedir(), ".coven");
}
function resolveSocketPath(covenHome: string, raw: string | undefined, baseDir: string): string {
const socketPath = raw?.trim()
? resolveConfiguredPath(raw, baseDir)
: path.join(covenHome, "coven.sock");
if (!pathIsInside(covenHome, socketPath)) {
throw new Error("Coven socketPath must stay inside covenHome");
}
return socketPath;
}
function normalizeHarnesses(value: Record<string, string> | undefined): Record<string, string> {
return Object.fromEntries(
Object.entries(value ?? {}).flatMap(([agent, harness]) => {
@@ -72,12 +104,19 @@ export function resolveCovenPluginConfig(params: {
throw new Error(parsed.error.issues[0]?.message ?? "invalid Coven plugin config");
}
const config = parsed.data as CovenPluginConfig;
const covenHome = resolveCovenHome(config.covenHome);
const workspaceDir = path.resolve(params.workspaceDir ?? process.cwd());
const covenHome = resolveCovenHome(config.covenHome, workspaceDir);
return {
covenHome,
socketPath: path.resolve(config.socketPath?.trim() || path.join(covenHome, "coven.sock")),
socketPath: resolveSocketPath(covenHome, config.socketPath, workspaceDir),
workspaceDir,
fallbackBackend: normalizeBackendId(config.fallbackBackend),
pollIntervalMs: config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS,
harnesses: normalizeHarnesses(config.harnesses),
};
}
export const __testing = {
expandTilde,
resolveConfiguredPath,
};

View File

@@ -8,11 +8,12 @@ import {
import { afterEach, describe, expect, it, vi } from "vitest";
import type { CovenClient, CovenEventRecord, CovenSessionRecord } from "./client.js";
import type { ResolvedCovenPluginConfig } from "./config.js";
import { CovenAcpRuntime } from "./runtime.js";
import { __testing, CovenAcpRuntime } from "./runtime.js";
const config: ResolvedCovenPluginConfig = {
covenHome: "/tmp/coven",
socketPath: "/tmp/coven/coven.sock",
workspaceDir: "/repo",
fallbackBackend: "acpx",
pollIntervalMs: 1,
harnesses: {},
@@ -90,6 +91,7 @@ function fallbackRuntime(): AcpRuntime {
}
afterEach(() => {
vi.useRealTimers();
unregisterAcpRuntimeBackend("acpx");
});
@@ -99,7 +101,11 @@ describe("CovenAcpRuntime", () => {
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
const runtime = new CovenAcpRuntime({
config,
client: fakeClient({ health: vi.fn(async () => Promise.reject(new Error("offline"))) }),
client: fakeClient({
health: vi.fn(async () => {
throw new Error("offline");
}),
}),
});
const handle = await runtime.ensureSession({
@@ -113,6 +119,34 @@ describe("CovenAcpRuntime", () => {
expect(fallback.ensureSession).toHaveBeenCalledOnce();
});
it("falls back when Coven health checks do not settle before the deadline", async () => {
vi.useFakeTimers();
const fallback = fallbackRuntime();
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
const client = fakeClient({
health: vi.fn(
async (signal?: AbortSignal) =>
await new Promise<never>((_resolve, reject) => {
signal?.addEventListener("abort", () => reject(signal.reason ?? new Error("aborted")), {
once: true,
});
}),
),
});
const runtime = new CovenAcpRuntime({ config, client });
const pending = runtime.ensureSession({
sessionKey: "agent:codex:test",
agent: "codex",
mode: "oneshot",
cwd: "/repo",
});
await vi.advanceTimersByTimeAsync(5_000);
const handle = await pending;
expect(handle.backend).toBe("acpx");
});
it("launches a Coven session and streams output events to ACP", async () => {
const client = fakeClient();
const runtime = new CovenAcpRuntime({ config, client });
@@ -150,13 +184,145 @@ describe("CovenAcpRuntime", () => {
]);
});
it("ignores cwd embedded in runtimeSessionName when launching Coven sessions", async () => {
const client = fakeClient();
const runtime = new CovenAcpRuntime({ config, client });
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:test",
agent: "codex",
mode: "oneshot",
cwd: "/repo",
});
handle.runtimeSessionName = __testing.encodeRuntimeSessionName({
agent: "codex",
mode: "prompt",
cwd: "/tmp/attacker",
});
await collect(
runtime.runTurn({
handle,
text: "Fix tests",
mode: "prompt",
requestId: "req-1",
}),
);
expect(client.launchSession).toHaveBeenCalledWith(
expect.objectContaining({
projectRoot: "/repo",
cwd: "/repo",
}),
undefined,
);
});
it("rejects Coven handles whose cwd is outside the configured workspace", async () => {
const runtime = new CovenAcpRuntime({ config, client: fakeClient() });
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:test",
agent: "codex",
mode: "oneshot",
cwd: "/repo",
});
handle.cwd = "/tmp/attacker";
await expect(
collect(
runtime.runTurn({
handle,
text: "Fix tests",
mode: "prompt",
requestId: "req-1",
}),
),
).rejects.toThrow(/outside workspace/);
});
it("requests incremental events after the last processed Coven event", async () => {
const client = fakeClient({
listEvents: vi
.fn()
.mockResolvedValueOnce([
event({
id: "event-1",
kind: "output",
payloadJson: JSON.stringify({ data: "hello\n" }),
}),
])
.mockResolvedValueOnce([
event({
id: "event-2",
kind: "exit",
payloadJson: JSON.stringify({ status: "completed", exitCode: 0 }),
}),
]),
getSession: vi.fn(async () => session({ status: "running" })),
});
const runtime = new CovenAcpRuntime({ config, client });
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:test",
agent: "codex",
mode: "oneshot",
cwd: "/repo",
});
await collect(
runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }),
);
expect(client.listEvents).toHaveBeenNthCalledWith(
2,
"session-1",
{
afterEventId: "event-1",
},
undefined,
);
});
it("converts Coven polling failures into controlled terminal events", async () => {
const client = fakeClient({
listEvents: vi.fn(async () => {
throw new Error("bad json");
}),
killSession: vi.fn(async () => undefined),
});
const runtime = new CovenAcpRuntime({ config, client });
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:test",
agent: "codex",
mode: "oneshot",
cwd: "/repo",
});
const events = await collect(
runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }),
);
expect(client.killSession).toHaveBeenCalledWith("session-1", undefined);
expect(events).toEqual([
expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }),
expect.objectContaining({ type: "status", text: "coven session polling failed" }),
expect.objectContaining({ type: "done", stopReason: "error" }),
]);
});
it("strips terminal escape and control characters from Coven output", () => {
expect(__testing.sanitizeTerminalText("\u001b]0;spoof\u0007hi\u001b[31m!\u001b[0m\r\n")).toBe(
"hi!\n",
);
});
it("preserves direct fallback when Coven launch fails after detection", async () => {
const fallback = fallbackRuntime();
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
const runtime = new CovenAcpRuntime({
config,
client: fakeClient({
launchSession: vi.fn(async () => Promise.reject(new Error("launch failed"))),
launchSession: vi.fn(async () => {
throw new Error("launch failed");
}),
}),
});
const handle = await runtime.ensureSession({

View File

@@ -30,10 +30,12 @@ const DEFAULT_HARNESSES: Record<string, string> = {
"google-gemini-cli": "gemini",
opencode: "opencode",
};
const HEALTH_CHECK_TIMEOUT_MS = 5_000;
const MAX_TRACKED_EVENT_IDS = 10_000;
type CovenRuntimeSessionState = {
agent: string;
mode: "prompt" | "steer" | string;
mode: string;
sessionMode?: string;
cwd?: string;
};
@@ -106,10 +108,32 @@ function parsePayload(event: CovenEventRecord): Record<string, unknown> {
}
}
const ESC = String.fromCharCode(0x1b);
const BEL = String.fromCharCode(0x07);
const c0Start = String.fromCharCode(0x00);
const c0Backspace = String.fromCharCode(0x08);
const c0VerticalTab = String.fromCharCode(0x0b);
const c0UnitSeparator = String.fromCharCode(0x1f);
const del = String.fromCharCode(0x7f);
const c1Start = String.fromCharCode(0x80);
const c1End = String.fromCharCode(0x9f);
const ANSI_ESCAPE_REGEX = new RegExp(
`${ESC}(?:\\[[\\x20-\\x3f]*[\\x40-\\x7e]|\\][^${BEL}${ESC}]*(?:${BEL}|${ESC}\\\\)|[\\x40-\\x5f])`,
"g",
);
const TEXT_CONTROL_REGEX = new RegExp(
`[${c0Start}-${c0Backspace}${c0VerticalTab}-${c0UnitSeparator}${del}${c1Start}-${c1End}]`,
"g",
);
function sanitizeTerminalText(input: string): string {
return input.replace(ANSI_ESCAPE_REGEX, "").replace(TEXT_CONTROL_REGEX, "");
}
function eventToRuntimeEvents(event: CovenEventRecord): AcpRuntimeEvent[] {
const payload = parsePayload(event);
if (event.kind === "output") {
const text = typeof payload.data === "string" ? payload.data : "";
const text = typeof payload.data === "string" ? sanitizeTerminalText(payload.data) : "";
return text ? [{ type: "text_delta", text, stream: "output", tag: "agent_message_chunk" }] : [];
}
if (event.kind === "exit") {
@@ -145,6 +169,11 @@ function terminalStatusEvent(session: CovenSessionRecord): AcpRuntimeEvent {
};
}
function pathIsInside(parent: string, child: string): boolean {
const relative = path.relative(parent, child);
return relative === "" || (!relative.startsWith("..") && !path.isAbsolute(relative));
}
export class CovenAcpRuntime implements AcpRuntime {
private readonly config: ResolvedCovenPluginConfig;
private readonly client: CovenClient;
@@ -192,12 +221,13 @@ export class CovenAcpRuntime implements AcpRuntime {
);
}
const cwd = this.resolveWorkspaceCwd(input.handle.cwd);
let session: CovenSessionRecord;
try {
session = await this.client.launchSession(
{
projectRoot: state.cwd ?? input.handle.cwd ?? process.cwd(),
cwd: state.cwd ?? input.handle.cwd ?? process.cwd(),
projectRoot: this.config.workspaceDir,
cwd,
harness: this.resolveHarness(state.agent),
prompt: input.text,
title: titleFromPrompt(input.text),
@@ -222,32 +252,63 @@ export class CovenAcpRuntime implements AcpRuntime {
};
const seenEventIds = new Set<string>();
const seenEventQueue: string[] = [];
let lastSeenEventId: string | undefined;
while (true) {
if (input.signal?.aborted) {
await this.killActiveSession(session.id, input.signal).catch(() => undefined);
throw input.signal.reason ?? new Error("Coven turn aborted");
}
const events = await this.client.listEvents(session.id, input.signal);
for (const event of events) {
if (seenEventIds.has(event.id)) {
continue;
}
seenEventIds.add(event.id);
for (const runtimeEvent of eventToRuntimeEvents(event)) {
yield runtimeEvent;
if (runtimeEvent.type === "done") {
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
return;
try {
const events = await this.client.listEvents(
session.id,
lastSeenEventId ? { afterEventId: lastSeenEventId } : undefined,
input.signal,
);
const cursorIndex = lastSeenEventId
? events.findIndex((event) => event.id === lastSeenEventId)
: -1;
const nextEvents = cursorIndex >= 0 ? events.slice(cursorIndex + 1) : events;
for (const event of nextEvents) {
if (seenEventIds.has(event.id)) {
continue;
}
seenEventIds.add(event.id);
seenEventQueue.push(event.id);
while (seenEventQueue.length > MAX_TRACKED_EVENT_IDS) {
const removed = seenEventQueue.shift();
if (removed) {
seenEventIds.delete(removed);
}
}
lastSeenEventId = event.id;
for (const runtimeEvent of eventToRuntimeEvents(event)) {
yield runtimeEvent;
if (runtimeEvent.type === "done") {
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
return;
}
}
}
}
const latest = await this.client.getSession(session.id, input.signal);
if (sessionIsTerminal(latest)) {
yield terminalStatusEvent(latest);
yield { type: "done", stopReason: latest.status };
const latest = await this.client.getSession(session.id, input.signal);
if (sessionIsTerminal(latest)) {
yield terminalStatusEvent(latest);
yield { type: "done", stopReason: latest.status };
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
return;
}
} catch (error) {
if (input.signal?.aborted) {
await this.killActiveSession(session.id, input.signal).catch(() => undefined);
throw input.signal.reason ?? error;
}
this.logger?.warn(`coven polling failed: ${String(error)}`);
await this.killActiveSession(session.id).catch(() => undefined);
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
yield { type: "status", text: "coven session polling failed", tag: "session_info_update" };
yield { type: "done", stopReason: "error" };
return;
}
@@ -338,11 +399,18 @@ export class CovenAcpRuntime implements AcpRuntime {
}
private async isCovenAvailable(): Promise<boolean> {
const controller = new AbortController();
const timeout = setTimeout(
() => controller.abort(new Error("Coven health check timed out")),
HEALTH_CHECK_TIMEOUT_MS,
);
try {
const health = await this.client.health();
return health.ok === true;
const health = await this.client.health(controller.signal);
return health.ok;
} catch {
return false;
} finally {
clearTimeout(timeout);
}
}
@@ -388,17 +456,24 @@ export class CovenAcpRuntime implements AcpRuntime {
state: CovenRuntimeSessionState,
): AsyncIterable<AcpRuntimeEvent> {
const fallback = this.requireFallbackRuntime();
const cwd = state.cwd ?? input.handle.cwd;
const handle = await fallback.ensureSession({
sessionKey: input.handle.sessionKey,
agent: state.agent,
mode: state.sessionMode === "persistent" ? "persistent" : "oneshot",
...(cwd ? { cwd: path.resolve(cwd) } : {}),
cwd: this.resolveWorkspaceCwd(input.handle.cwd),
});
Object.assign(input.handle, handle);
yield* fallback.runTurn({ ...input, handle });
}
private resolveWorkspaceCwd(candidate: string | undefined): string {
const cwd = path.resolve(candidate ?? this.config.workspaceDir);
if (!pathIsInside(this.config.workspaceDir, cwd)) {
throw new AcpRuntimeError("ACP_SESSION_INIT_FAILED", "Coven cwd is outside workspace.");
}
return cwd;
}
private async killActiveSession(sessionId: string, signal?: AbortSignal): Promise<void> {
await this.client.killSession(sessionId, signal);
}
@@ -408,4 +483,5 @@ export const __testing = {
decodeRuntimeSessionName,
encodeRuntimeSessionName,
eventToRuntimeEvents,
sanitizeTerminalText,
};

6
pnpm-lock.yaml generated
View File

@@ -419,6 +419,12 @@ importers:
specifier: workspace:*
version: link:../../packages/plugin-sdk
extensions/coven:
devDependencies:
'@openclaw/plugin-sdk':
specifier: workspace:*
version: link:../../packages/plugin-sdk
extensions/deepgram:
dependencies:
ws: