mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 17:10:49 +00:00
feat: route acp sessions through coven
This commit is contained in:
32
extensions/coven/index.ts
Normal file
32
extensions/coven/index.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
import {
|
||||
registerAcpRuntimeBackend,
|
||||
unregisterAcpRuntimeBackend,
|
||||
} from "openclaw/plugin-sdk/acp-runtime";
|
||||
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
|
||||
import { createCovenPluginConfigSchema, resolveCovenPluginConfig } from "./src/config.js";
|
||||
import { CovenAcpRuntime, COVEN_BACKEND_ID } from "./src/runtime.js";
|
||||
|
||||
export default definePluginEntry({
|
||||
id: COVEN_BACKEND_ID,
|
||||
name: "Coven ACP Runtime",
|
||||
description:
|
||||
"Opt-in ACP runtime backend that launches coding tasks through a local Coven daemon.",
|
||||
configSchema: () => createCovenPluginConfigSchema(),
|
||||
register(api) {
|
||||
api.registerService({
|
||||
id: "coven-runtime",
|
||||
async start(ctx) {
|
||||
const config = resolveCovenPluginConfig({
|
||||
rawConfig: api.pluginConfig,
|
||||
workspaceDir: ctx.workspaceDir,
|
||||
});
|
||||
const runtime = new CovenAcpRuntime({ config, logger: ctx.logger });
|
||||
registerAcpRuntimeBackend({ id: COVEN_BACKEND_ID, runtime });
|
||||
ctx.logger.info(`coven ACP runtime backend registered (socket: ${config.socketPath})`);
|
||||
},
|
||||
async stop() {
|
||||
unregisterAcpRuntimeBackend(COVEN_BACKEND_ID);
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
33
extensions/coven/openclaw.plugin.json
Normal file
33
extensions/coven/openclaw.plugin.json
Normal file
@@ -0,0 +1,33 @@
|
||||
{
|
||||
"id": "coven",
|
||||
"enabledByDefault": false,
|
||||
"name": "Coven ACP Runtime",
|
||||
"description": "Opt-in ACP runtime backend that launches coding tasks through a local Coven daemon.",
|
||||
"configSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"covenHome": {
|
||||
"type": "string",
|
||||
"description": "Path to COVEN_HOME. Defaults to COVEN_HOME or ~/.coven."
|
||||
},
|
||||
"socketPath": {
|
||||
"type": "string",
|
||||
"description": "Path to the Coven daemon Unix socket. Defaults to <covenHome>/coven.sock."
|
||||
},
|
||||
"fallbackBackend": {
|
||||
"type": "string",
|
||||
"description": "ACP backend to use when Coven is unavailable. Defaults to acpx."
|
||||
},
|
||||
"pollIntervalMs": {
|
||||
"type": "number",
|
||||
"description": "Polling interval for Coven session events."
|
||||
},
|
||||
"harnesses": {
|
||||
"type": "object",
|
||||
"additionalProperties": { "type": "string" },
|
||||
"description": "Map OpenClaw ACP agent ids to Coven harness ids."
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
15
extensions/coven/package.json
Normal file
15
extensions/coven/package.json
Normal file
@@ -0,0 +1,15 @@
|
||||
{
|
||||
"name": "@openclaw/coven-runtime",
|
||||
"version": "2026.4.26",
|
||||
"private": true,
|
||||
"description": "OpenClaw Coven ACP runtime bridge",
|
||||
"type": "module",
|
||||
"devDependencies": {
|
||||
"@openclaw/plugin-sdk": "workspace:*"
|
||||
},
|
||||
"openclaw": {
|
||||
"extensions": [
|
||||
"./index.ts"
|
||||
]
|
||||
}
|
||||
}
|
||||
200
extensions/coven/src/client.ts
Normal file
200
extensions/coven/src/client.ts
Normal file
@@ -0,0 +1,200 @@
|
||||
import net from "node:net";
|
||||
|
||||
export type CovenSessionRecord = {
|
||||
id: string;
|
||||
projectRoot: string;
|
||||
harness: string;
|
||||
title: string;
|
||||
status: string;
|
||||
exitCode: number | null;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
export type CovenEventRecord = {
|
||||
id: string;
|
||||
sessionId: string;
|
||||
kind: string;
|
||||
payloadJson: string;
|
||||
createdAt: string;
|
||||
};
|
||||
|
||||
export type CovenHealthResponse = {
|
||||
ok: boolean;
|
||||
daemon?: {
|
||||
pid: number;
|
||||
startedAt: string;
|
||||
socket: string;
|
||||
} | null;
|
||||
};
|
||||
|
||||
export type LaunchCovenSessionInput = {
|
||||
projectRoot: string;
|
||||
cwd: string;
|
||||
harness: string;
|
||||
prompt: string;
|
||||
title: string;
|
||||
};
|
||||
|
||||
export interface CovenClient {
|
||||
health(signal?: AbortSignal): Promise<CovenHealthResponse>;
|
||||
launchSession(input: LaunchCovenSessionInput, signal?: AbortSignal): Promise<CovenSessionRecord>;
|
||||
getSession(sessionId: string, signal?: AbortSignal): Promise<CovenSessionRecord>;
|
||||
listEvents(sessionId: string, signal?: AbortSignal): Promise<CovenEventRecord[]>;
|
||||
sendInput(sessionId: string, data: string, signal?: AbortSignal): Promise<void>;
|
||||
killSession(sessionId: string, signal?: AbortSignal): Promise<void>;
|
||||
}
|
||||
|
||||
type RequestOptions = {
|
||||
socketPath: string;
|
||||
method: "GET" | "POST";
|
||||
path: string;
|
||||
body?: unknown;
|
||||
signal?: AbortSignal;
|
||||
};
|
||||
|
||||
type HttpResponse = {
|
||||
status: number;
|
||||
body: string;
|
||||
};
|
||||
|
||||
export class CovenApiError extends Error {
|
||||
readonly status: number;
|
||||
readonly body: string;
|
||||
|
||||
constructor(status: number, body: string) {
|
||||
super(`Coven API returned HTTP ${status || "unknown"}`);
|
||||
this.name = "CovenApiError";
|
||||
this.status = status;
|
||||
this.body = body;
|
||||
}
|
||||
}
|
||||
|
||||
function parseHttpResponse(raw: string): HttpResponse {
|
||||
const [head = "", ...bodyParts] = raw.split("\r\n\r\n");
|
||||
const statusMatch = /^HTTP\/\d(?:\.\d)?\s+(\d+)/i.exec(head);
|
||||
return {
|
||||
status: statusMatch ? Number(statusMatch[1]) : 0,
|
||||
body: bodyParts.join("\r\n\r\n"),
|
||||
};
|
||||
}
|
||||
|
||||
function requestOverSocket(options: RequestOptions): Promise<HttpResponse> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (options.signal?.aborted) {
|
||||
reject(options.signal.reason ?? new Error("request aborted"));
|
||||
return;
|
||||
}
|
||||
|
||||
const socket = net.createConnection(options.socketPath);
|
||||
const chunks: Buffer[] = [];
|
||||
let settled = false;
|
||||
|
||||
const settle = (fn: () => void) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
options.signal?.removeEventListener("abort", onAbort);
|
||||
fn();
|
||||
};
|
||||
|
||||
const onAbort = () => {
|
||||
socket.destroy();
|
||||
settle(() => reject(options.signal?.reason ?? new Error("request aborted")));
|
||||
};
|
||||
|
||||
options.signal?.addEventListener("abort", onAbort, { once: true });
|
||||
|
||||
socket.on("connect", () => {
|
||||
const body = options.body === undefined ? "" : JSON.stringify(options.body);
|
||||
const headers = [
|
||||
`${options.method} ${options.path} HTTP/1.1`,
|
||||
"Host: coven",
|
||||
"Connection: close",
|
||||
...(body
|
||||
? ["Content-Type: application/json", `Content-Length: ${Buffer.byteLength(body)}`]
|
||||
: []),
|
||||
"",
|
||||
body,
|
||||
];
|
||||
socket.write(headers.join("\r\n"));
|
||||
});
|
||||
socket.on("data", (chunk) => chunks.push(Buffer.from(chunk)));
|
||||
socket.on("error", (error) => settle(() => reject(error)));
|
||||
socket.on("end", () => {
|
||||
const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8"));
|
||||
settle(() => resolve(response));
|
||||
});
|
||||
socket.on("close", () => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8"));
|
||||
settle(() => resolve(response));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function requestJson<T>(options: RequestOptions): Promise<T> {
|
||||
const response = await requestOverSocket(options);
|
||||
if (response.status < 200 || response.status >= 300) {
|
||||
throw new CovenApiError(response.status, response.body);
|
||||
}
|
||||
return JSON.parse(response.body || "null") as T;
|
||||
}
|
||||
|
||||
export function createCovenClient(socketPath: string): CovenClient {
|
||||
return {
|
||||
health(signal) {
|
||||
return requestJson<CovenHealthResponse>({
|
||||
socketPath,
|
||||
method: "GET",
|
||||
path: "/health",
|
||||
signal,
|
||||
});
|
||||
},
|
||||
launchSession(input, signal) {
|
||||
return requestJson<CovenSessionRecord>({
|
||||
socketPath,
|
||||
method: "POST",
|
||||
path: "/sessions",
|
||||
body: input,
|
||||
signal,
|
||||
});
|
||||
},
|
||||
getSession(sessionId, signal) {
|
||||
return requestJson<CovenSessionRecord>({
|
||||
socketPath,
|
||||
method: "GET",
|
||||
path: `/sessions/${encodeURIComponent(sessionId)}`,
|
||||
signal,
|
||||
});
|
||||
},
|
||||
listEvents(sessionId, signal) {
|
||||
return requestJson<CovenEventRecord[]>({
|
||||
socketPath,
|
||||
method: "GET",
|
||||
path: `/events?sessionId=${encodeURIComponent(sessionId)}`,
|
||||
signal,
|
||||
});
|
||||
},
|
||||
async sendInput(sessionId, data, signal) {
|
||||
await requestJson<unknown>({
|
||||
socketPath,
|
||||
method: "POST",
|
||||
path: `/sessions/${encodeURIComponent(sessionId)}/input`,
|
||||
body: { data },
|
||||
signal,
|
||||
});
|
||||
},
|
||||
async killSession(sessionId, signal) {
|
||||
await requestJson<unknown>({
|
||||
socketPath,
|
||||
method: "POST",
|
||||
path: `/sessions/${encodeURIComponent(sessionId)}/kill`,
|
||||
signal,
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
83
extensions/coven/src/config.ts
Normal file
83
extensions/coven/src/config.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { buildPluginConfigSchema } from "openclaw/plugin-sdk/core";
|
||||
import { z } from "openclaw/plugin-sdk/zod";
|
||||
|
||||
export type CovenPluginConfig = {
|
||||
covenHome?: string;
|
||||
socketPath?: string;
|
||||
fallbackBackend?: string;
|
||||
pollIntervalMs?: number;
|
||||
harnesses?: Record<string, string>;
|
||||
};
|
||||
|
||||
export type ResolvedCovenPluginConfig = {
|
||||
covenHome: string;
|
||||
socketPath: string;
|
||||
fallbackBackend: string;
|
||||
pollIntervalMs: number;
|
||||
harnesses: Record<string, string>;
|
||||
};
|
||||
|
||||
const DEFAULT_FALLBACK_BACKEND = "acpx";
|
||||
const DEFAULT_POLL_INTERVAL_MS = 250;
|
||||
|
||||
const nonEmptyString = z.string().trim().min(1);
|
||||
|
||||
export const CovenPluginConfigSchema = z.strictObject({
|
||||
covenHome: nonEmptyString.optional(),
|
||||
socketPath: nonEmptyString.optional(),
|
||||
fallbackBackend: nonEmptyString.optional(),
|
||||
pollIntervalMs: z.number().min(25).max(10_000).optional(),
|
||||
harnesses: z.record(z.string(), nonEmptyString).optional(),
|
||||
});
|
||||
|
||||
export function createCovenPluginConfigSchema() {
|
||||
return buildPluginConfigSchema(CovenPluginConfigSchema);
|
||||
}
|
||||
|
||||
function normalizeBackendId(value: string | undefined): string {
|
||||
const normalized = value?.trim().toLowerCase();
|
||||
return normalized || DEFAULT_FALLBACK_BACKEND;
|
||||
}
|
||||
|
||||
function resolveCovenHome(raw: string | undefined): string {
|
||||
const fromConfig = raw?.trim();
|
||||
if (fromConfig) {
|
||||
return path.resolve(fromConfig);
|
||||
}
|
||||
const fromEnv = process.env.COVEN_HOME?.trim();
|
||||
if (fromEnv) {
|
||||
return path.resolve(fromEnv);
|
||||
}
|
||||
return path.join(os.homedir(), ".coven");
|
||||
}
|
||||
|
||||
function normalizeHarnesses(value: Record<string, string> | undefined): Record<string, string> {
|
||||
return Object.fromEntries(
|
||||
Object.entries(value ?? {}).flatMap(([agent, harness]) => {
|
||||
const normalizedAgent = agent.trim().toLowerCase();
|
||||
const normalizedHarness = harness.trim();
|
||||
return normalizedAgent && normalizedHarness ? [[normalizedAgent, normalizedHarness]] : [];
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
export function resolveCovenPluginConfig(params: {
|
||||
rawConfig: unknown;
|
||||
workspaceDir?: string;
|
||||
}): ResolvedCovenPluginConfig {
|
||||
const parsed = CovenPluginConfigSchema.safeParse(params.rawConfig ?? {});
|
||||
if (!parsed.success) {
|
||||
throw new Error(parsed.error.issues[0]?.message ?? "invalid Coven plugin config");
|
||||
}
|
||||
const config = parsed.data as CovenPluginConfig;
|
||||
const covenHome = resolveCovenHome(config.covenHome);
|
||||
return {
|
||||
covenHome,
|
||||
socketPath: path.resolve(config.socketPath?.trim() || path.join(covenHome, "coven.sock")),
|
||||
fallbackBackend: normalizeBackendId(config.fallbackBackend),
|
||||
pollIntervalMs: config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS,
|
||||
harnesses: normalizeHarnesses(config.harnesses),
|
||||
};
|
||||
}
|
||||
179
extensions/coven/src/runtime.test.ts
Normal file
179
extensions/coven/src/runtime.test.ts
Normal file
@@ -0,0 +1,179 @@
|
||||
import {
|
||||
registerAcpRuntimeBackend,
|
||||
unregisterAcpRuntimeBackend,
|
||||
type AcpRuntime,
|
||||
type AcpRuntimeEvent,
|
||||
type AcpRuntimeHandle,
|
||||
} from "openclaw/plugin-sdk/acp-runtime";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type { CovenClient, CovenEventRecord, CovenSessionRecord } from "./client.js";
|
||||
import type { ResolvedCovenPluginConfig } from "./config.js";
|
||||
import { CovenAcpRuntime } from "./runtime.js";
|
||||
|
||||
const config: ResolvedCovenPluginConfig = {
|
||||
covenHome: "/tmp/coven",
|
||||
socketPath: "/tmp/coven/coven.sock",
|
||||
fallbackBackend: "acpx",
|
||||
pollIntervalMs: 1,
|
||||
harnesses: {},
|
||||
};
|
||||
|
||||
function session(overrides: Partial<CovenSessionRecord> = {}): CovenSessionRecord {
|
||||
return {
|
||||
id: "session-1",
|
||||
projectRoot: "/repo",
|
||||
harness: "codex",
|
||||
title: "Fix tests",
|
||||
status: "running",
|
||||
exitCode: null,
|
||||
createdAt: "2026-04-27T10:00:00Z",
|
||||
updatedAt: "2026-04-27T10:00:00Z",
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function event(overrides: Partial<CovenEventRecord>): CovenEventRecord {
|
||||
return {
|
||||
id: "event-1",
|
||||
sessionId: "session-1",
|
||||
kind: "output",
|
||||
payloadJson: JSON.stringify({ data: "hello\n" }),
|
||||
createdAt: "2026-04-27T10:00:00Z",
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function fakeClient(overrides: Partial<CovenClient> = {}): CovenClient {
|
||||
return {
|
||||
health: vi.fn(async () => ({ ok: true, daemon: null })),
|
||||
launchSession: vi.fn(async () => session()),
|
||||
getSession: vi.fn(async () => session({ status: "completed", exitCode: 0 })),
|
||||
listEvents: vi.fn(async () => [
|
||||
event({ id: "event-1", kind: "output", payloadJson: JSON.stringify({ data: "hello\n" }) }),
|
||||
event({
|
||||
id: "event-2",
|
||||
kind: "exit",
|
||||
payloadJson: JSON.stringify({ status: "completed", exitCode: 0 }),
|
||||
}),
|
||||
]),
|
||||
sendInput: vi.fn(async () => undefined),
|
||||
killSession: vi.fn(async () => undefined),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
async function collect(iterable: AsyncIterable<AcpRuntimeEvent>): Promise<AcpRuntimeEvent[]> {
|
||||
const events: AcpRuntimeEvent[] = [];
|
||||
for await (const item of iterable) {
|
||||
events.push(item);
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
function fallbackRuntime(): AcpRuntime {
|
||||
const handle: AcpRuntimeHandle = {
|
||||
sessionKey: "agent:codex:test",
|
||||
backend: "acpx",
|
||||
runtimeSessionName: "fallback-session",
|
||||
cwd: "/repo",
|
||||
};
|
||||
return {
|
||||
ensureSession: vi.fn(async () => handle),
|
||||
async *runTurn() {
|
||||
yield { type: "text_delta", text: "direct fallback\n", stream: "output" };
|
||||
yield { type: "done", stopReason: "complete" };
|
||||
},
|
||||
getStatus: vi.fn(async () => ({ summary: "fallback active" })),
|
||||
cancel: vi.fn(async () => undefined),
|
||||
close: vi.fn(async () => undefined),
|
||||
};
|
||||
}
|
||||
|
||||
afterEach(() => {
|
||||
unregisterAcpRuntimeBackend("acpx");
|
||||
});
|
||||
|
||||
describe("CovenAcpRuntime", () => {
|
||||
it("falls back to the direct ACP backend when Coven is unavailable", async () => {
|
||||
const fallback = fallbackRuntime();
|
||||
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
|
||||
const runtime = new CovenAcpRuntime({
|
||||
config,
|
||||
client: fakeClient({ health: vi.fn(async () => Promise.reject(new Error("offline"))) }),
|
||||
});
|
||||
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
mode: "oneshot",
|
||||
cwd: "/repo",
|
||||
});
|
||||
|
||||
expect(handle.backend).toBe("acpx");
|
||||
expect(fallback.ensureSession).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("launches a Coven session and streams output events to ACP", async () => {
|
||||
const client = fakeClient();
|
||||
const runtime = new CovenAcpRuntime({ config, client });
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
mode: "oneshot",
|
||||
cwd: "/repo",
|
||||
});
|
||||
|
||||
const events = await collect(
|
||||
runtime.runTurn({
|
||||
handle,
|
||||
text: "Fix tests",
|
||||
mode: "prompt",
|
||||
requestId: "req-1",
|
||||
}),
|
||||
);
|
||||
|
||||
expect(client.launchSession).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectRoot: "/repo",
|
||||
cwd: "/repo",
|
||||
harness: "codex",
|
||||
prompt: "Fix tests",
|
||||
}),
|
||||
undefined,
|
||||
);
|
||||
expect(handle.backendSessionId).toBe("session-1");
|
||||
expect(events).toEqual([
|
||||
expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }),
|
||||
expect.objectContaining({ type: "text_delta", text: "hello\n" }),
|
||||
expect.objectContaining({ type: "status", text: "coven session completed exitCode=0" }),
|
||||
expect.objectContaining({ type: "done", stopReason: "completed" }),
|
||||
]);
|
||||
});
|
||||
|
||||
it("preserves direct fallback when Coven launch fails after detection", async () => {
|
||||
const fallback = fallbackRuntime();
|
||||
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
|
||||
const runtime = new CovenAcpRuntime({
|
||||
config,
|
||||
client: fakeClient({
|
||||
launchSession: vi.fn(async () => Promise.reject(new Error("launch failed"))),
|
||||
}),
|
||||
});
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
mode: "oneshot",
|
||||
cwd: "/repo",
|
||||
});
|
||||
|
||||
const events = await collect(
|
||||
runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }),
|
||||
);
|
||||
|
||||
expect(handle.backend).toBe("acpx");
|
||||
expect(events).toEqual([
|
||||
expect.objectContaining({ type: "text_delta", text: "direct fallback\n" }),
|
||||
expect.objectContaining({ type: "done", stopReason: "complete" }),
|
||||
]);
|
||||
});
|
||||
});
|
||||
411
extensions/coven/src/runtime.ts
Normal file
411
extensions/coven/src/runtime.ts
Normal file
@@ -0,0 +1,411 @@
|
||||
import path from "node:path";
|
||||
import {
|
||||
AcpRuntimeError,
|
||||
getAcpRuntimeBackend,
|
||||
type AcpRuntime,
|
||||
type AcpRuntimeDoctorReport,
|
||||
type AcpRuntimeEvent,
|
||||
type AcpRuntimeHandle,
|
||||
type AcpRuntimeStatus,
|
||||
type AcpRuntimeTurnInput,
|
||||
} from "openclaw/plugin-sdk/acp-runtime";
|
||||
import type { PluginLogger } from "openclaw/plugin-sdk/plugin-entry";
|
||||
import {
|
||||
createCovenClient,
|
||||
type CovenClient,
|
||||
type CovenEventRecord,
|
||||
type CovenSessionRecord,
|
||||
} from "./client.js";
|
||||
import type { ResolvedCovenPluginConfig } from "./config.js";
|
||||
|
||||
export const COVEN_BACKEND_ID = "coven";
|
||||
|
||||
const DEFAULT_HARNESSES: Record<string, string> = {
|
||||
codex: "codex",
|
||||
"openai-codex": "codex",
|
||||
"codex-cli": "codex",
|
||||
claude: "claude",
|
||||
"claude-cli": "claude",
|
||||
gemini: "gemini",
|
||||
"google-gemini-cli": "gemini",
|
||||
opencode: "opencode",
|
||||
};
|
||||
|
||||
type CovenRuntimeSessionState = {
|
||||
agent: string;
|
||||
mode: "prompt" | "steer" | string;
|
||||
sessionMode?: string;
|
||||
cwd?: string;
|
||||
};
|
||||
|
||||
type CovenAcpRuntimeParams = {
|
||||
config: ResolvedCovenPluginConfig;
|
||||
logger?: PluginLogger;
|
||||
client?: CovenClient;
|
||||
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;
|
||||
};
|
||||
|
||||
function normalizeAgentId(value: string | undefined): string {
|
||||
return value?.trim().toLowerCase() || "codex";
|
||||
}
|
||||
|
||||
function encodeRuntimeSessionName(state: CovenRuntimeSessionState): string {
|
||||
return `coven:${Buffer.from(JSON.stringify(state), "utf8").toString("base64url")}`;
|
||||
}
|
||||
|
||||
function decodeRuntimeSessionName(value: string): CovenRuntimeSessionState | null {
|
||||
const encoded = value.startsWith("coven:") ? value.slice("coven:".length) : "";
|
||||
if (!encoded) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(
|
||||
Buffer.from(encoded, "base64url").toString("utf8"),
|
||||
) as Partial<CovenRuntimeSessionState>;
|
||||
const agent = normalizeAgentId(typeof parsed.agent === "string" ? parsed.agent : undefined);
|
||||
return {
|
||||
agent,
|
||||
mode: typeof parsed.mode === "string" ? parsed.mode : "prompt",
|
||||
...(typeof parsed.sessionMode === "string" ? { sessionMode: parsed.sessionMode } : {}),
|
||||
...(typeof parsed.cwd === "string" && parsed.cwd.trim() ? { cwd: parsed.cwd.trim() } : {}),
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function defaultSleep(ms: number, signal?: AbortSignal): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (signal?.aborted) {
|
||||
reject(signal.reason ?? new Error("sleep aborted"));
|
||||
return;
|
||||
}
|
||||
const timeout = setTimeout(resolve, ms);
|
||||
signal?.addEventListener(
|
||||
"abort",
|
||||
() => {
|
||||
clearTimeout(timeout);
|
||||
reject(signal.reason ?? new Error("sleep aborted"));
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
function titleFromPrompt(prompt: string): string {
|
||||
const compact = prompt.replace(/\s+/g, " ").trim();
|
||||
return compact.slice(0, 80) || "OpenClaw task";
|
||||
}
|
||||
|
||||
function parsePayload(event: CovenEventRecord): Record<string, unknown> {
|
||||
try {
|
||||
const parsed = JSON.parse(event.payloadJson) as unknown;
|
||||
return typeof parsed === "object" && parsed !== null ? (parsed as Record<string, unknown>) : {};
|
||||
} catch {
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
function eventToRuntimeEvents(event: CovenEventRecord): AcpRuntimeEvent[] {
|
||||
const payload = parsePayload(event);
|
||||
if (event.kind === "output") {
|
||||
const text = typeof payload.data === "string" ? payload.data : "";
|
||||
return text ? [{ type: "text_delta", text, stream: "output", tag: "agent_message_chunk" }] : [];
|
||||
}
|
||||
if (event.kind === "exit") {
|
||||
const status = typeof payload.status === "string" ? payload.status : "completed";
|
||||
const exitCode = typeof payload.exitCode === "number" ? payload.exitCode : null;
|
||||
return [
|
||||
{
|
||||
type: "status",
|
||||
text: `coven session ${status}${exitCode == null ? "" : ` exitCode=${exitCode}`}`,
|
||||
tag: "session_info_update",
|
||||
},
|
||||
{ type: "done", stopReason: status },
|
||||
];
|
||||
}
|
||||
if (event.kind === "kill") {
|
||||
return [
|
||||
{ type: "status", text: "coven session killed", tag: "session_info_update" },
|
||||
{ type: "done", stopReason: "killed" },
|
||||
];
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
function sessionIsTerminal(session: CovenSessionRecord): boolean {
|
||||
return session.status !== "running" && session.status !== "created";
|
||||
}
|
||||
|
||||
function terminalStatusEvent(session: CovenSessionRecord): AcpRuntimeEvent {
|
||||
return {
|
||||
type: "status",
|
||||
text: `coven session ${session.status}${session.exitCode == null ? "" : ` exitCode=${session.exitCode}`}`,
|
||||
tag: "session_info_update",
|
||||
};
|
||||
}
|
||||
|
||||
export class CovenAcpRuntime implements AcpRuntime {
|
||||
private readonly config: ResolvedCovenPluginConfig;
|
||||
private readonly client: CovenClient;
|
||||
private readonly logger?: PluginLogger;
|
||||
private readonly sleep: (ms: number, signal?: AbortSignal) => Promise<void>;
|
||||
private readonly activeSessionIdsBySessionKey = new Map<string, string>();
|
||||
|
||||
constructor(params: CovenAcpRuntimeParams) {
|
||||
this.config = params.config;
|
||||
this.logger = params.logger;
|
||||
this.client = params.client ?? createCovenClient(params.config.socketPath);
|
||||
this.sleep = params.sleep ?? defaultSleep;
|
||||
}
|
||||
|
||||
async ensureSession(
|
||||
input: Parameters<AcpRuntime["ensureSession"]>[0],
|
||||
): Promise<AcpRuntimeHandle> {
|
||||
if (!(await this.isCovenAvailable())) {
|
||||
return await this.ensureFallbackSession(input);
|
||||
}
|
||||
const agent = normalizeAgentId(input.agent);
|
||||
return {
|
||||
sessionKey: input.sessionKey,
|
||||
backend: COVEN_BACKEND_ID,
|
||||
runtimeSessionName: encodeRuntimeSessionName({
|
||||
agent,
|
||||
mode: "prompt",
|
||||
sessionMode: input.mode,
|
||||
...(input.cwd ? { cwd: input.cwd } : {}),
|
||||
}),
|
||||
...(input.cwd ? { cwd: input.cwd } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
|
||||
if (input.handle.backend !== COVEN_BACKEND_ID) {
|
||||
yield* this.runFallbackTurn(input, input.handle);
|
||||
return;
|
||||
}
|
||||
const state = decodeRuntimeSessionName(input.handle.runtimeSessionName);
|
||||
if (!state) {
|
||||
throw new AcpRuntimeError(
|
||||
"ACP_SESSION_INIT_FAILED",
|
||||
"Coven runtime session metadata is missing.",
|
||||
);
|
||||
}
|
||||
|
||||
let session: CovenSessionRecord;
|
||||
try {
|
||||
session = await this.client.launchSession(
|
||||
{
|
||||
projectRoot: state.cwd ?? input.handle.cwd ?? process.cwd(),
|
||||
cwd: state.cwd ?? input.handle.cwd ?? process.cwd(),
|
||||
harness: this.resolveHarness(state.agent),
|
||||
prompt: input.text,
|
||||
title: titleFromPrompt(input.text),
|
||||
},
|
||||
input.signal,
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger?.warn(
|
||||
`coven launch failed; falling back to ${this.config.fallbackBackend}: ${String(error)}`,
|
||||
);
|
||||
yield* this.runFallbackFromCovenHandle(input, state);
|
||||
return;
|
||||
}
|
||||
|
||||
input.handle.backendSessionId = session.id;
|
||||
input.handle.agentSessionId = session.id;
|
||||
this.activeSessionIdsBySessionKey.set(input.handle.sessionKey, session.id);
|
||||
yield {
|
||||
type: "status",
|
||||
text: `coven session ${session.id} started (${session.harness})`,
|
||||
tag: "session_info_update",
|
||||
};
|
||||
|
||||
const seenEventIds = new Set<string>();
|
||||
while (true) {
|
||||
if (input.signal?.aborted) {
|
||||
await this.killActiveSession(session.id, input.signal).catch(() => undefined);
|
||||
throw input.signal.reason ?? new Error("Coven turn aborted");
|
||||
}
|
||||
|
||||
const events = await this.client.listEvents(session.id, input.signal);
|
||||
for (const event of events) {
|
||||
if (seenEventIds.has(event.id)) {
|
||||
continue;
|
||||
}
|
||||
seenEventIds.add(event.id);
|
||||
for (const runtimeEvent of eventToRuntimeEvents(event)) {
|
||||
yield runtimeEvent;
|
||||
if (runtimeEvent.type === "done") {
|
||||
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const latest = await this.client.getSession(session.id, input.signal);
|
||||
if (sessionIsTerminal(latest)) {
|
||||
yield terminalStatusEvent(latest);
|
||||
yield { type: "done", stopReason: latest.status };
|
||||
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.sleep(this.config.pollIntervalMs, input.signal);
|
||||
}
|
||||
}
|
||||
|
||||
getCapabilities() {
|
||||
return { controls: ["session/status" as const] };
|
||||
}
|
||||
|
||||
async getStatus(
|
||||
input: Parameters<NonNullable<AcpRuntime["getStatus"]>>[0],
|
||||
): Promise<AcpRuntimeStatus> {
|
||||
if (input.handle.backend !== COVEN_BACKEND_ID) {
|
||||
const fallback = this.requireFallbackRuntime(input.handle.backend);
|
||||
return fallback.getStatus
|
||||
? await fallback.getStatus(input)
|
||||
: { summary: `fallback backend ${input.handle.backend} active` };
|
||||
}
|
||||
const sessionId =
|
||||
input.handle.backendSessionId ??
|
||||
this.activeSessionIdsBySessionKey.get(input.handle.sessionKey);
|
||||
if (!sessionId) {
|
||||
return { summary: "coven runtime ready" };
|
||||
}
|
||||
const session = await this.client.getSession(sessionId, input.signal);
|
||||
return {
|
||||
summary: `${session.status} ${session.harness} ${session.title}`,
|
||||
backendSessionId: session.id,
|
||||
agentSessionId: session.id,
|
||||
details: {
|
||||
projectRoot: session.projectRoot,
|
||||
harness: session.harness,
|
||||
status: session.status,
|
||||
exitCode: session.exitCode,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async doctor(): Promise<AcpRuntimeDoctorReport> {
|
||||
try {
|
||||
const health = await this.client.health();
|
||||
return health.ok
|
||||
? { ok: true, message: "Coven daemon is reachable." }
|
||||
: { ok: false, code: "COVEN_UNHEALTHY", message: "Coven daemon did not report healthy." };
|
||||
} catch (error) {
|
||||
return {
|
||||
ok: false,
|
||||
code: "COVEN_UNAVAILABLE",
|
||||
message: "Coven daemon is not reachable; direct ACP fallback remains available.",
|
||||
details: [String(error)],
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async cancel(input: Parameters<AcpRuntime["cancel"]>[0]): Promise<void> {
|
||||
if (input.handle.backend !== COVEN_BACKEND_ID) {
|
||||
await this.requireFallbackRuntime(input.handle.backend).cancel(input);
|
||||
return;
|
||||
}
|
||||
const sessionId =
|
||||
input.handle.backendSessionId ??
|
||||
this.activeSessionIdsBySessionKey.get(input.handle.sessionKey);
|
||||
if (sessionId) {
|
||||
await this.killActiveSession(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
async close(input: Parameters<AcpRuntime["close"]>[0]): Promise<void> {
|
||||
if (input.handle.backend !== COVEN_BACKEND_ID) {
|
||||
await this.requireFallbackRuntime(input.handle.backend).close(input);
|
||||
return;
|
||||
}
|
||||
const sessionId =
|
||||
input.handle.backendSessionId ??
|
||||
this.activeSessionIdsBySessionKey.get(input.handle.sessionKey);
|
||||
if (sessionId && input.reason !== "oneshot-complete") {
|
||||
await this.killActiveSession(sessionId).catch(() => undefined);
|
||||
}
|
||||
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
|
||||
}
|
||||
|
||||
async prepareFreshSession(input: { sessionKey: string }): Promise<void> {
|
||||
this.activeSessionIdsBySessionKey.delete(input.sessionKey);
|
||||
const fallback = this.getFallbackRuntime();
|
||||
await fallback?.prepareFreshSession?.(input);
|
||||
}
|
||||
|
||||
private async isCovenAvailable(): Promise<boolean> {
|
||||
try {
|
||||
const health = await this.client.health();
|
||||
return health.ok === true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private resolveHarness(agent: string): string {
|
||||
const normalized = normalizeAgentId(agent);
|
||||
return this.config.harnesses[normalized] ?? DEFAULT_HARNESSES[normalized] ?? normalized;
|
||||
}
|
||||
|
||||
private getFallbackRuntime(backendId = this.config.fallbackBackend): AcpRuntime | null {
|
||||
const normalized = backendId.trim().toLowerCase();
|
||||
if (!normalized || normalized === COVEN_BACKEND_ID) {
|
||||
return null;
|
||||
}
|
||||
return getAcpRuntimeBackend(normalized)?.runtime ?? null;
|
||||
}
|
||||
|
||||
private requireFallbackRuntime(backendId = this.config.fallbackBackend): AcpRuntime {
|
||||
const runtime = this.getFallbackRuntime(backendId);
|
||||
if (!runtime) {
|
||||
throw new AcpRuntimeError(
|
||||
"ACP_BACKEND_UNAVAILABLE",
|
||||
`Coven fallback ACP backend "${backendId}" is not registered.`,
|
||||
);
|
||||
}
|
||||
return runtime;
|
||||
}
|
||||
|
||||
private async ensureFallbackSession(
|
||||
input: Parameters<AcpRuntime["ensureSession"]>[0],
|
||||
): Promise<AcpRuntimeHandle> {
|
||||
return await this.requireFallbackRuntime().ensureSession(input);
|
||||
}
|
||||
|
||||
private async *runFallbackTurn(
|
||||
input: AcpRuntimeTurnInput,
|
||||
handle: AcpRuntimeHandle,
|
||||
): AsyncIterable<AcpRuntimeEvent> {
|
||||
yield* this.requireFallbackRuntime(handle.backend).runTurn({ ...input, handle });
|
||||
}
|
||||
|
||||
private async *runFallbackFromCovenHandle(
|
||||
input: AcpRuntimeTurnInput,
|
||||
state: CovenRuntimeSessionState,
|
||||
): AsyncIterable<AcpRuntimeEvent> {
|
||||
const fallback = this.requireFallbackRuntime();
|
||||
const cwd = state.cwd ?? input.handle.cwd;
|
||||
const handle = await fallback.ensureSession({
|
||||
sessionKey: input.handle.sessionKey,
|
||||
agent: state.agent,
|
||||
mode: state.sessionMode === "persistent" ? "persistent" : "oneshot",
|
||||
...(cwd ? { cwd: path.resolve(cwd) } : {}),
|
||||
});
|
||||
Object.assign(input.handle, handle);
|
||||
yield* fallback.runTurn({ ...input, handle });
|
||||
}
|
||||
|
||||
private async killActiveSession(sessionId: string, signal?: AbortSignal): Promise<void> {
|
||||
await this.client.killSession(sessionId, signal);
|
||||
}
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
decodeRuntimeSessionName,
|
||||
encodeRuntimeSessionName,
|
||||
eventToRuntimeEvents,
|
||||
};
|
||||
Reference in New Issue
Block a user