From 4f411485fc3ad485dc3343a434e2876f2f9615fd Mon Sep 17 00:00:00 2001 From: Val Alexander Date: Mon, 27 Apr 2026 09:58:04 -0500 Subject: [PATCH] feat: route acp sessions through coven --- extensions/coven/index.ts | 32 ++ extensions/coven/openclaw.plugin.json | 33 +++ extensions/coven/package.json | 15 + extensions/coven/src/client.ts | 200 +++++++++++++ extensions/coven/src/config.ts | 83 ++++++ extensions/coven/src/runtime.test.ts | 179 +++++++++++ extensions/coven/src/runtime.ts | 411 ++++++++++++++++++++++++++ 7 files changed, 953 insertions(+) create mode 100644 extensions/coven/index.ts create mode 100644 extensions/coven/openclaw.plugin.json create mode 100644 extensions/coven/package.json create mode 100644 extensions/coven/src/client.ts create mode 100644 extensions/coven/src/config.ts create mode 100644 extensions/coven/src/runtime.test.ts create mode 100644 extensions/coven/src/runtime.ts diff --git a/extensions/coven/index.ts b/extensions/coven/index.ts new file mode 100644 index 00000000000..623882bcc11 --- /dev/null +++ b/extensions/coven/index.ts @@ -0,0 +1,32 @@ +import { + registerAcpRuntimeBackend, + unregisterAcpRuntimeBackend, +} from "openclaw/plugin-sdk/acp-runtime"; +import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry"; +import { createCovenPluginConfigSchema, resolveCovenPluginConfig } from "./src/config.js"; +import { CovenAcpRuntime, COVEN_BACKEND_ID } from "./src/runtime.js"; + +export default definePluginEntry({ + id: COVEN_BACKEND_ID, + name: "Coven ACP Runtime", + description: + "Opt-in ACP runtime backend that launches coding tasks through a local Coven daemon.", + configSchema: () => createCovenPluginConfigSchema(), + register(api) { + api.registerService({ + id: "coven-runtime", + async start(ctx) { + const config = resolveCovenPluginConfig({ + rawConfig: api.pluginConfig, + workspaceDir: ctx.workspaceDir, + }); + const runtime = new CovenAcpRuntime({ config, logger: ctx.logger }); + registerAcpRuntimeBackend({ id: COVEN_BACKEND_ID, runtime }); + ctx.logger.info(`coven ACP runtime backend registered (socket: ${config.socketPath})`); + }, + async stop() { + unregisterAcpRuntimeBackend(COVEN_BACKEND_ID); + }, + }); + }, +}); diff --git a/extensions/coven/openclaw.plugin.json b/extensions/coven/openclaw.plugin.json new file mode 100644 index 00000000000..823f7d2ec02 --- /dev/null +++ b/extensions/coven/openclaw.plugin.json @@ -0,0 +1,33 @@ +{ + "id": "coven", + "enabledByDefault": false, + "name": "Coven ACP Runtime", + "description": "Opt-in ACP runtime backend that launches coding tasks through a local Coven daemon.", + "configSchema": { + "type": "object", + "additionalProperties": false, + "properties": { + "covenHome": { + "type": "string", + "description": "Path to COVEN_HOME. Defaults to COVEN_HOME or ~/.coven." + }, + "socketPath": { + "type": "string", + "description": "Path to the Coven daemon Unix socket. Defaults to /coven.sock." + }, + "fallbackBackend": { + "type": "string", + "description": "ACP backend to use when Coven is unavailable. Defaults to acpx." + }, + "pollIntervalMs": { + "type": "number", + "description": "Polling interval for Coven session events." + }, + "harnesses": { + "type": "object", + "additionalProperties": { "type": "string" }, + "description": "Map OpenClaw ACP agent ids to Coven harness ids." + } + } + } +} diff --git a/extensions/coven/package.json b/extensions/coven/package.json new file mode 100644 index 00000000000..7f0247eb409 --- /dev/null +++ b/extensions/coven/package.json @@ -0,0 +1,15 @@ +{ + "name": "@openclaw/coven-runtime", + "version": "2026.4.26", + "private": true, + "description": "OpenClaw Coven ACP runtime bridge", + "type": "module", + "devDependencies": { + "@openclaw/plugin-sdk": "workspace:*" + }, + "openclaw": { + "extensions": [ + "./index.ts" + ] + } +} diff --git a/extensions/coven/src/client.ts b/extensions/coven/src/client.ts new file mode 100644 index 00000000000..81edfe2b806 --- /dev/null +++ b/extensions/coven/src/client.ts @@ -0,0 +1,200 @@ +import net from "node:net"; + +export type CovenSessionRecord = { + id: string; + projectRoot: string; + harness: string; + title: string; + status: string; + exitCode: number | null; + createdAt: string; + updatedAt: string; +}; + +export type CovenEventRecord = { + id: string; + sessionId: string; + kind: string; + payloadJson: string; + createdAt: string; +}; + +export type CovenHealthResponse = { + ok: boolean; + daemon?: { + pid: number; + startedAt: string; + socket: string; + } | null; +}; + +export type LaunchCovenSessionInput = { + projectRoot: string; + cwd: string; + harness: string; + prompt: string; + title: string; +}; + +export interface CovenClient { + health(signal?: AbortSignal): Promise; + launchSession(input: LaunchCovenSessionInput, signal?: AbortSignal): Promise; + getSession(sessionId: string, signal?: AbortSignal): Promise; + listEvents(sessionId: string, signal?: AbortSignal): Promise; + sendInput(sessionId: string, data: string, signal?: AbortSignal): Promise; + killSession(sessionId: string, signal?: AbortSignal): Promise; +} + +type RequestOptions = { + socketPath: string; + method: "GET" | "POST"; + path: string; + body?: unknown; + signal?: AbortSignal; +}; + +type HttpResponse = { + status: number; + body: string; +}; + +export class CovenApiError extends Error { + readonly status: number; + readonly body: string; + + constructor(status: number, body: string) { + super(`Coven API returned HTTP ${status || "unknown"}`); + this.name = "CovenApiError"; + this.status = status; + this.body = body; + } +} + +function parseHttpResponse(raw: string): HttpResponse { + const [head = "", ...bodyParts] = raw.split("\r\n\r\n"); + const statusMatch = /^HTTP\/\d(?:\.\d)?\s+(\d+)/i.exec(head); + return { + status: statusMatch ? Number(statusMatch[1]) : 0, + body: bodyParts.join("\r\n\r\n"), + }; +} + +function requestOverSocket(options: RequestOptions): Promise { + return new Promise((resolve, reject) => { + if (options.signal?.aborted) { + reject(options.signal.reason ?? new Error("request aborted")); + return; + } + + const socket = net.createConnection(options.socketPath); + const chunks: Buffer[] = []; + let settled = false; + + const settle = (fn: () => void) => { + if (settled) { + return; + } + settled = true; + options.signal?.removeEventListener("abort", onAbort); + fn(); + }; + + const onAbort = () => { + socket.destroy(); + settle(() => reject(options.signal?.reason ?? new Error("request aborted"))); + }; + + options.signal?.addEventListener("abort", onAbort, { once: true }); + + socket.on("connect", () => { + const body = options.body === undefined ? "" : JSON.stringify(options.body); + const headers = [ + `${options.method} ${options.path} HTTP/1.1`, + "Host: coven", + "Connection: close", + ...(body + ? ["Content-Type: application/json", `Content-Length: ${Buffer.byteLength(body)}`] + : []), + "", + body, + ]; + socket.write(headers.join("\r\n")); + }); + socket.on("data", (chunk) => chunks.push(Buffer.from(chunk))); + socket.on("error", (error) => settle(() => reject(error))); + socket.on("end", () => { + const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8")); + settle(() => resolve(response)); + }); + socket.on("close", () => { + if (settled) { + return; + } + const response = parseHttpResponse(Buffer.concat(chunks).toString("utf8")); + settle(() => resolve(response)); + }); + }); +} + +async function requestJson(options: RequestOptions): Promise { + const response = await requestOverSocket(options); + if (response.status < 200 || response.status >= 300) { + throw new CovenApiError(response.status, response.body); + } + return JSON.parse(response.body || "null") as T; +} + +export function createCovenClient(socketPath: string): CovenClient { + return { + health(signal) { + return requestJson({ + socketPath, + method: "GET", + path: "/health", + signal, + }); + }, + launchSession(input, signal) { + return requestJson({ + socketPath, + method: "POST", + path: "/sessions", + body: input, + signal, + }); + }, + getSession(sessionId, signal) { + return requestJson({ + socketPath, + method: "GET", + path: `/sessions/${encodeURIComponent(sessionId)}`, + signal, + }); + }, + listEvents(sessionId, signal) { + return requestJson({ + socketPath, + method: "GET", + path: `/events?sessionId=${encodeURIComponent(sessionId)}`, + signal, + }); + }, + async sendInput(sessionId, data, signal) { + await requestJson({ + socketPath, + method: "POST", + path: `/sessions/${encodeURIComponent(sessionId)}/input`, + body: { data }, + signal, + }); + }, + async killSession(sessionId, signal) { + await requestJson({ + socketPath, + method: "POST", + path: `/sessions/${encodeURIComponent(sessionId)}/kill`, + signal, + }); + }, + }; +} diff --git a/extensions/coven/src/config.ts b/extensions/coven/src/config.ts new file mode 100644 index 00000000000..a8f8ea09c8c --- /dev/null +++ b/extensions/coven/src/config.ts @@ -0,0 +1,83 @@ +import os from "node:os"; +import path from "node:path"; +import { buildPluginConfigSchema } from "openclaw/plugin-sdk/core"; +import { z } from "openclaw/plugin-sdk/zod"; + +export type CovenPluginConfig = { + covenHome?: string; + socketPath?: string; + fallbackBackend?: string; + pollIntervalMs?: number; + harnesses?: Record; +}; + +export type ResolvedCovenPluginConfig = { + covenHome: string; + socketPath: string; + fallbackBackend: string; + pollIntervalMs: number; + harnesses: Record; +}; + +const DEFAULT_FALLBACK_BACKEND = "acpx"; +const DEFAULT_POLL_INTERVAL_MS = 250; + +const nonEmptyString = z.string().trim().min(1); + +export const CovenPluginConfigSchema = z.strictObject({ + covenHome: nonEmptyString.optional(), + socketPath: nonEmptyString.optional(), + fallbackBackend: nonEmptyString.optional(), + pollIntervalMs: z.number().min(25).max(10_000).optional(), + harnesses: z.record(z.string(), nonEmptyString).optional(), +}); + +export function createCovenPluginConfigSchema() { + return buildPluginConfigSchema(CovenPluginConfigSchema); +} + +function normalizeBackendId(value: string | undefined): string { + const normalized = value?.trim().toLowerCase(); + return normalized || DEFAULT_FALLBACK_BACKEND; +} + +function resolveCovenHome(raw: string | undefined): string { + const fromConfig = raw?.trim(); + if (fromConfig) { + return path.resolve(fromConfig); + } + const fromEnv = process.env.COVEN_HOME?.trim(); + if (fromEnv) { + return path.resolve(fromEnv); + } + return path.join(os.homedir(), ".coven"); +} + +function normalizeHarnesses(value: Record | undefined): Record { + return Object.fromEntries( + Object.entries(value ?? {}).flatMap(([agent, harness]) => { + const normalizedAgent = agent.trim().toLowerCase(); + const normalizedHarness = harness.trim(); + return normalizedAgent && normalizedHarness ? [[normalizedAgent, normalizedHarness]] : []; + }), + ); +} + +export function resolveCovenPluginConfig(params: { + rawConfig: unknown; + workspaceDir?: string; +}): ResolvedCovenPluginConfig { + const parsed = CovenPluginConfigSchema.safeParse(params.rawConfig ?? {}); + if (!parsed.success) { + throw new Error(parsed.error.issues[0]?.message ?? "invalid Coven plugin config"); + } + const config = parsed.data as CovenPluginConfig; + const covenHome = resolveCovenHome(config.covenHome); + return { + covenHome, + socketPath: path.resolve(config.socketPath?.trim() || path.join(covenHome, "coven.sock")), + fallbackBackend: normalizeBackendId(config.fallbackBackend), + pollIntervalMs: config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS, + harnesses: normalizeHarnesses(config.harnesses), + }; +} diff --git a/extensions/coven/src/runtime.test.ts b/extensions/coven/src/runtime.test.ts new file mode 100644 index 00000000000..93ee7e22ef8 --- /dev/null +++ b/extensions/coven/src/runtime.test.ts @@ -0,0 +1,179 @@ +import { + registerAcpRuntimeBackend, + unregisterAcpRuntimeBackend, + type AcpRuntime, + type AcpRuntimeEvent, + type AcpRuntimeHandle, +} from "openclaw/plugin-sdk/acp-runtime"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { CovenClient, CovenEventRecord, CovenSessionRecord } from "./client.js"; +import type { ResolvedCovenPluginConfig } from "./config.js"; +import { CovenAcpRuntime } from "./runtime.js"; + +const config: ResolvedCovenPluginConfig = { + covenHome: "/tmp/coven", + socketPath: "/tmp/coven/coven.sock", + fallbackBackend: "acpx", + pollIntervalMs: 1, + harnesses: {}, +}; + +function session(overrides: Partial = {}): CovenSessionRecord { + return { + id: "session-1", + projectRoot: "/repo", + harness: "codex", + title: "Fix tests", + status: "running", + exitCode: null, + createdAt: "2026-04-27T10:00:00Z", + updatedAt: "2026-04-27T10:00:00Z", + ...overrides, + }; +} + +function event(overrides: Partial): CovenEventRecord { + return { + id: "event-1", + sessionId: "session-1", + kind: "output", + payloadJson: JSON.stringify({ data: "hello\n" }), + createdAt: "2026-04-27T10:00:00Z", + ...overrides, + }; +} + +function fakeClient(overrides: Partial = {}): CovenClient { + return { + health: vi.fn(async () => ({ ok: true, daemon: null })), + launchSession: vi.fn(async () => session()), + getSession: vi.fn(async () => session({ status: "completed", exitCode: 0 })), + listEvents: vi.fn(async () => [ + event({ id: "event-1", kind: "output", payloadJson: JSON.stringify({ data: "hello\n" }) }), + event({ + id: "event-2", + kind: "exit", + payloadJson: JSON.stringify({ status: "completed", exitCode: 0 }), + }), + ]), + sendInput: vi.fn(async () => undefined), + killSession: vi.fn(async () => undefined), + ...overrides, + }; +} + +async function collect(iterable: AsyncIterable): Promise { + const events: AcpRuntimeEvent[] = []; + for await (const item of iterable) { + events.push(item); + } + return events; +} + +function fallbackRuntime(): AcpRuntime { + const handle: AcpRuntimeHandle = { + sessionKey: "agent:codex:test", + backend: "acpx", + runtimeSessionName: "fallback-session", + cwd: "/repo", + }; + return { + ensureSession: vi.fn(async () => handle), + async *runTurn() { + yield { type: "text_delta", text: "direct fallback\n", stream: "output" }; + yield { type: "done", stopReason: "complete" }; + }, + getStatus: vi.fn(async () => ({ summary: "fallback active" })), + cancel: vi.fn(async () => undefined), + close: vi.fn(async () => undefined), + }; +} + +afterEach(() => { + unregisterAcpRuntimeBackend("acpx"); +}); + +describe("CovenAcpRuntime", () => { + it("falls back to the direct ACP backend when Coven is unavailable", async () => { + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const runtime = new CovenAcpRuntime({ + config, + client: fakeClient({ health: vi.fn(async () => Promise.reject(new Error("offline"))) }), + }); + + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + + expect(handle.backend).toBe("acpx"); + expect(fallback.ensureSession).toHaveBeenCalledOnce(); + }); + + it("launches a Coven session and streams output events to ACP", async () => { + const client = fakeClient(); + const runtime = new CovenAcpRuntime({ config, client }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + + const events = await collect( + runtime.runTurn({ + handle, + text: "Fix tests", + mode: "prompt", + requestId: "req-1", + }), + ); + + expect(client.launchSession).toHaveBeenCalledWith( + expect.objectContaining({ + projectRoot: "/repo", + cwd: "/repo", + harness: "codex", + prompt: "Fix tests", + }), + undefined, + ); + expect(handle.backendSessionId).toBe("session-1"); + expect(events).toEqual([ + expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }), + expect.objectContaining({ type: "text_delta", text: "hello\n" }), + expect.objectContaining({ type: "status", text: "coven session completed exitCode=0" }), + expect.objectContaining({ type: "done", stopReason: "completed" }), + ]); + }); + + it("preserves direct fallback when Coven launch fails after detection", async () => { + const fallback = fallbackRuntime(); + registerAcpRuntimeBackend({ id: "acpx", runtime: fallback }); + const runtime = new CovenAcpRuntime({ + config, + client: fakeClient({ + launchSession: vi.fn(async () => Promise.reject(new Error("launch failed"))), + }), + }); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:test", + agent: "codex", + mode: "oneshot", + cwd: "/repo", + }); + + const events = await collect( + runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }), + ); + + expect(handle.backend).toBe("acpx"); + expect(events).toEqual([ + expect.objectContaining({ type: "text_delta", text: "direct fallback\n" }), + expect.objectContaining({ type: "done", stopReason: "complete" }), + ]); + }); +}); diff --git a/extensions/coven/src/runtime.ts b/extensions/coven/src/runtime.ts new file mode 100644 index 00000000000..1e5e7c73afd --- /dev/null +++ b/extensions/coven/src/runtime.ts @@ -0,0 +1,411 @@ +import path from "node:path"; +import { + AcpRuntimeError, + getAcpRuntimeBackend, + type AcpRuntime, + type AcpRuntimeDoctorReport, + type AcpRuntimeEvent, + type AcpRuntimeHandle, + type AcpRuntimeStatus, + type AcpRuntimeTurnInput, +} from "openclaw/plugin-sdk/acp-runtime"; +import type { PluginLogger } from "openclaw/plugin-sdk/plugin-entry"; +import { + createCovenClient, + type CovenClient, + type CovenEventRecord, + type CovenSessionRecord, +} from "./client.js"; +import type { ResolvedCovenPluginConfig } from "./config.js"; + +export const COVEN_BACKEND_ID = "coven"; + +const DEFAULT_HARNESSES: Record = { + codex: "codex", + "openai-codex": "codex", + "codex-cli": "codex", + claude: "claude", + "claude-cli": "claude", + gemini: "gemini", + "google-gemini-cli": "gemini", + opencode: "opencode", +}; + +type CovenRuntimeSessionState = { + agent: string; + mode: "prompt" | "steer" | string; + sessionMode?: string; + cwd?: string; +}; + +type CovenAcpRuntimeParams = { + config: ResolvedCovenPluginConfig; + logger?: PluginLogger; + client?: CovenClient; + sleep?: (ms: number, signal?: AbortSignal) => Promise; +}; + +function normalizeAgentId(value: string | undefined): string { + return value?.trim().toLowerCase() || "codex"; +} + +function encodeRuntimeSessionName(state: CovenRuntimeSessionState): string { + return `coven:${Buffer.from(JSON.stringify(state), "utf8").toString("base64url")}`; +} + +function decodeRuntimeSessionName(value: string): CovenRuntimeSessionState | null { + const encoded = value.startsWith("coven:") ? value.slice("coven:".length) : ""; + if (!encoded) { + return null; + } + try { + const parsed = JSON.parse( + Buffer.from(encoded, "base64url").toString("utf8"), + ) as Partial; + const agent = normalizeAgentId(typeof parsed.agent === "string" ? parsed.agent : undefined); + return { + agent, + mode: typeof parsed.mode === "string" ? parsed.mode : "prompt", + ...(typeof parsed.sessionMode === "string" ? { sessionMode: parsed.sessionMode } : {}), + ...(typeof parsed.cwd === "string" && parsed.cwd.trim() ? { cwd: parsed.cwd.trim() } : {}), + }; + } catch { + return null; + } +} + +function defaultSleep(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(signal.reason ?? new Error("sleep aborted")); + return; + } + const timeout = setTimeout(resolve, ms); + signal?.addEventListener( + "abort", + () => { + clearTimeout(timeout); + reject(signal.reason ?? new Error("sleep aborted")); + }, + { once: true }, + ); + }); +} + +function titleFromPrompt(prompt: string): string { + const compact = prompt.replace(/\s+/g, " ").trim(); + return compact.slice(0, 80) || "OpenClaw task"; +} + +function parsePayload(event: CovenEventRecord): Record { + try { + const parsed = JSON.parse(event.payloadJson) as unknown; + return typeof parsed === "object" && parsed !== null ? (parsed as Record) : {}; + } catch { + return {}; + } +} + +function eventToRuntimeEvents(event: CovenEventRecord): AcpRuntimeEvent[] { + const payload = parsePayload(event); + if (event.kind === "output") { + const text = typeof payload.data === "string" ? payload.data : ""; + return text ? [{ type: "text_delta", text, stream: "output", tag: "agent_message_chunk" }] : []; + } + if (event.kind === "exit") { + const status = typeof payload.status === "string" ? payload.status : "completed"; + const exitCode = typeof payload.exitCode === "number" ? payload.exitCode : null; + return [ + { + type: "status", + text: `coven session ${status}${exitCode == null ? "" : ` exitCode=${exitCode}`}`, + tag: "session_info_update", + }, + { type: "done", stopReason: status }, + ]; + } + if (event.kind === "kill") { + return [ + { type: "status", text: "coven session killed", tag: "session_info_update" }, + { type: "done", stopReason: "killed" }, + ]; + } + return []; +} + +function sessionIsTerminal(session: CovenSessionRecord): boolean { + return session.status !== "running" && session.status !== "created"; +} + +function terminalStatusEvent(session: CovenSessionRecord): AcpRuntimeEvent { + return { + type: "status", + text: `coven session ${session.status}${session.exitCode == null ? "" : ` exitCode=${session.exitCode}`}`, + tag: "session_info_update", + }; +} + +export class CovenAcpRuntime implements AcpRuntime { + private readonly config: ResolvedCovenPluginConfig; + private readonly client: CovenClient; + private readonly logger?: PluginLogger; + private readonly sleep: (ms: number, signal?: AbortSignal) => Promise; + private readonly activeSessionIdsBySessionKey = new Map(); + + constructor(params: CovenAcpRuntimeParams) { + this.config = params.config; + this.logger = params.logger; + this.client = params.client ?? createCovenClient(params.config.socketPath); + this.sleep = params.sleep ?? defaultSleep; + } + + async ensureSession( + input: Parameters[0], + ): Promise { + if (!(await this.isCovenAvailable())) { + return await this.ensureFallbackSession(input); + } + const agent = normalizeAgentId(input.agent); + return { + sessionKey: input.sessionKey, + backend: COVEN_BACKEND_ID, + runtimeSessionName: encodeRuntimeSessionName({ + agent, + mode: "prompt", + sessionMode: input.mode, + ...(input.cwd ? { cwd: input.cwd } : {}), + }), + ...(input.cwd ? { cwd: input.cwd } : {}), + }; + } + + async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable { + if (input.handle.backend !== COVEN_BACKEND_ID) { + yield* this.runFallbackTurn(input, input.handle); + return; + } + const state = decodeRuntimeSessionName(input.handle.runtimeSessionName); + if (!state) { + throw new AcpRuntimeError( + "ACP_SESSION_INIT_FAILED", + "Coven runtime session metadata is missing.", + ); + } + + let session: CovenSessionRecord; + try { + session = await this.client.launchSession( + { + projectRoot: state.cwd ?? input.handle.cwd ?? process.cwd(), + cwd: state.cwd ?? input.handle.cwd ?? process.cwd(), + harness: this.resolveHarness(state.agent), + prompt: input.text, + title: titleFromPrompt(input.text), + }, + input.signal, + ); + } catch (error) { + this.logger?.warn( + `coven launch failed; falling back to ${this.config.fallbackBackend}: ${String(error)}`, + ); + yield* this.runFallbackFromCovenHandle(input, state); + return; + } + + input.handle.backendSessionId = session.id; + input.handle.agentSessionId = session.id; + this.activeSessionIdsBySessionKey.set(input.handle.sessionKey, session.id); + yield { + type: "status", + text: `coven session ${session.id} started (${session.harness})`, + tag: "session_info_update", + }; + + const seenEventIds = new Set(); + while (true) { + if (input.signal?.aborted) { + await this.killActiveSession(session.id, input.signal).catch(() => undefined); + throw input.signal.reason ?? new Error("Coven turn aborted"); + } + + const events = await this.client.listEvents(session.id, input.signal); + for (const event of events) { + if (seenEventIds.has(event.id)) { + continue; + } + seenEventIds.add(event.id); + for (const runtimeEvent of eventToRuntimeEvents(event)) { + yield runtimeEvent; + if (runtimeEvent.type === "done") { + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + return; + } + } + } + + const latest = await this.client.getSession(session.id, input.signal); + if (sessionIsTerminal(latest)) { + yield terminalStatusEvent(latest); + yield { type: "done", stopReason: latest.status }; + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + return; + } + + await this.sleep(this.config.pollIntervalMs, input.signal); + } + } + + getCapabilities() { + return { controls: ["session/status" as const] }; + } + + async getStatus( + input: Parameters>[0], + ): Promise { + if (input.handle.backend !== COVEN_BACKEND_ID) { + const fallback = this.requireFallbackRuntime(input.handle.backend); + return fallback.getStatus + ? await fallback.getStatus(input) + : { summary: `fallback backend ${input.handle.backend} active` }; + } + const sessionId = + input.handle.backendSessionId ?? + this.activeSessionIdsBySessionKey.get(input.handle.sessionKey); + if (!sessionId) { + return { summary: "coven runtime ready" }; + } + const session = await this.client.getSession(sessionId, input.signal); + return { + summary: `${session.status} ${session.harness} ${session.title}`, + backendSessionId: session.id, + agentSessionId: session.id, + details: { + projectRoot: session.projectRoot, + harness: session.harness, + status: session.status, + exitCode: session.exitCode, + }, + }; + } + + async doctor(): Promise { + try { + const health = await this.client.health(); + return health.ok + ? { ok: true, message: "Coven daemon is reachable." } + : { ok: false, code: "COVEN_UNHEALTHY", message: "Coven daemon did not report healthy." }; + } catch (error) { + return { + ok: false, + code: "COVEN_UNAVAILABLE", + message: "Coven daemon is not reachable; direct ACP fallback remains available.", + details: [String(error)], + }; + } + } + + async cancel(input: Parameters[0]): Promise { + if (input.handle.backend !== COVEN_BACKEND_ID) { + await this.requireFallbackRuntime(input.handle.backend).cancel(input); + return; + } + const sessionId = + input.handle.backendSessionId ?? + this.activeSessionIdsBySessionKey.get(input.handle.sessionKey); + if (sessionId) { + await this.killActiveSession(sessionId); + } + } + + async close(input: Parameters[0]): Promise { + if (input.handle.backend !== COVEN_BACKEND_ID) { + await this.requireFallbackRuntime(input.handle.backend).close(input); + return; + } + const sessionId = + input.handle.backendSessionId ?? + this.activeSessionIdsBySessionKey.get(input.handle.sessionKey); + if (sessionId && input.reason !== "oneshot-complete") { + await this.killActiveSession(sessionId).catch(() => undefined); + } + this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey); + } + + async prepareFreshSession(input: { sessionKey: string }): Promise { + this.activeSessionIdsBySessionKey.delete(input.sessionKey); + const fallback = this.getFallbackRuntime(); + await fallback?.prepareFreshSession?.(input); + } + + private async isCovenAvailable(): Promise { + try { + const health = await this.client.health(); + return health.ok === true; + } catch { + return false; + } + } + + private resolveHarness(agent: string): string { + const normalized = normalizeAgentId(agent); + return this.config.harnesses[normalized] ?? DEFAULT_HARNESSES[normalized] ?? normalized; + } + + private getFallbackRuntime(backendId = this.config.fallbackBackend): AcpRuntime | null { + const normalized = backendId.trim().toLowerCase(); + if (!normalized || normalized === COVEN_BACKEND_ID) { + return null; + } + return getAcpRuntimeBackend(normalized)?.runtime ?? null; + } + + private requireFallbackRuntime(backendId = this.config.fallbackBackend): AcpRuntime { + const runtime = this.getFallbackRuntime(backendId); + if (!runtime) { + throw new AcpRuntimeError( + "ACP_BACKEND_UNAVAILABLE", + `Coven fallback ACP backend "${backendId}" is not registered.`, + ); + } + return runtime; + } + + private async ensureFallbackSession( + input: Parameters[0], + ): Promise { + return await this.requireFallbackRuntime().ensureSession(input); + } + + private async *runFallbackTurn( + input: AcpRuntimeTurnInput, + handle: AcpRuntimeHandle, + ): AsyncIterable { + yield* this.requireFallbackRuntime(handle.backend).runTurn({ ...input, handle }); + } + + private async *runFallbackFromCovenHandle( + input: AcpRuntimeTurnInput, + state: CovenRuntimeSessionState, + ): AsyncIterable { + const fallback = this.requireFallbackRuntime(); + const cwd = state.cwd ?? input.handle.cwd; + const handle = await fallback.ensureSession({ + sessionKey: input.handle.sessionKey, + agent: state.agent, + mode: state.sessionMode === "persistent" ? "persistent" : "oneshot", + ...(cwd ? { cwd: path.resolve(cwd) } : {}), + }); + Object.assign(input.handle, handle); + yield* fallback.runTurn({ ...input, handle }); + } + + private async killActiveSession(sessionId: string, signal?: AbortSignal): Promise { + await this.client.killSession(sessionId, signal); + } +} + +export const __testing = { + decodeRuntimeSessionName, + encodeRuntimeSessionName, + eventToRuntimeEvents, +};