mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:40:44 +00:00
fix(coven): fail closed and verify daemon trust
This commit is contained in:
@@ -220,7 +220,9 @@ Minimal opt-in config:
|
||||
covenHome: "~/.coven",
|
||||
// Optional. Defaults to <covenHome>/coven.sock.
|
||||
socketPath: "~/.coven/coven.sock",
|
||||
// Optional. Used when Coven is unavailable or launch fails.
|
||||
// Optional. Defaults to false; enable only when direct ACP fallback is acceptable.
|
||||
allowFallback: false,
|
||||
// Optional. Used only when allowFallback is true.
|
||||
fallbackBackend: "acpx",
|
||||
},
|
||||
},
|
||||
@@ -232,8 +234,9 @@ Minimal opt-in config:
|
||||
When selected, OpenClaw checks Coven daemon health over the configured Unix
|
||||
socket before launching. A successful launch creates a Coven session and records
|
||||
the Coven session id in the ACP runtime handle. If the health check or launch
|
||||
fails, OpenClaw falls back to the configured direct ACP backend (`acpx` by
|
||||
default) instead of breaking existing ACP behavior.
|
||||
fails, OpenClaw fails closed by default so `acp.backend="coven"` cannot silently
|
||||
downgrade to direct ACP execution. Set `allowFallback: true` only when direct
|
||||
ACP fallback is an explicit, acceptable operator choice.
|
||||
|
||||
For path safety, `~` in `covenHome` and `socketPath` expands to the current
|
||||
user home directory, and configured Coven paths must be absolute after that
|
||||
@@ -242,8 +245,8 @@ daemon socket is a local user trust anchor, not repository-controlled state.
|
||||
`socketPath` must stay inside `covenHome`; use the default
|
||||
`<covenHome>/coven.sock` unless your Coven daemon uses a different socket
|
||||
filename in the same home directory. Keep `covenHome` owned by the OpenClaw user
|
||||
and not group/world-writable; OpenClaw rejects symlinked, shared-writable, or
|
||||
non-socket Coven socket paths before connecting.
|
||||
and private (`0700`); OpenClaw rejects symlinked, shared-accessible,
|
||||
shared-writable, or non-socket Coven socket paths before connecting.
|
||||
|
||||
The default harness mapping sends common ACP agent ids such as `codex`,
|
||||
`claude`, `gemini`, and `opencode` to the matching Coven harness id. Override
|
||||
|
||||
@@ -15,9 +15,13 @@
|
||||
"type": "string",
|
||||
"description": "Path to the Coven daemon Unix socket. Defaults to <covenHome>/coven.sock and must stay inside covenHome."
|
||||
},
|
||||
"allowFallback": {
|
||||
"type": "boolean",
|
||||
"description": "When true, fall back to fallbackBackend if Coven is unavailable or launch fails. Defaults to false."
|
||||
},
|
||||
"fallbackBackend": {
|
||||
"type": "string",
|
||||
"description": "ACP backend to use when Coven is unavailable. Defaults to acpx."
|
||||
"description": "ACP backend to use only when allowFallback is true. Defaults to acpx."
|
||||
},
|
||||
"pollIntervalMs": {
|
||||
"type": "number",
|
||||
|
||||
@@ -126,6 +126,7 @@ describe("createCovenClient", () => {
|
||||
it("revalidates socket paths before connecting", async () => {
|
||||
const covenHome = path.join(tmpDir, ".coven");
|
||||
await fs.mkdir(covenHome);
|
||||
await fs.chmod(covenHome, 0o700);
|
||||
const socketPath = path.join(covenHome, "coven.sock");
|
||||
await fs.symlink("/var/run/docker.sock", socketPath);
|
||||
|
||||
@@ -138,6 +139,7 @@ describe("createCovenClient", () => {
|
||||
const realHome = path.join(tmpDir, "real-coven");
|
||||
const symlinkHome = path.join(tmpDir, "symlink-coven");
|
||||
await fs.mkdir(realHome);
|
||||
await fs.chmod(realHome, 0o700);
|
||||
await fs.symlink(realHome, symlinkHome);
|
||||
|
||||
await expect(
|
||||
@@ -169,6 +171,7 @@ describe("createCovenClient", () => {
|
||||
it("rejects socket paths that are not Unix sockets", async () => {
|
||||
const covenHome = path.join(tmpDir, ".coven");
|
||||
await fs.mkdir(covenHome);
|
||||
await fs.chmod(covenHome, 0o700);
|
||||
const socketPath = path.join(covenHome, "coven.sock");
|
||||
await fs.writeFile(socketPath, "");
|
||||
|
||||
|
||||
@@ -71,6 +71,14 @@ type HttpResponse = {
|
||||
body: string;
|
||||
};
|
||||
|
||||
type SocketFingerprint = {
|
||||
dev: number;
|
||||
ino: number;
|
||||
mode: number;
|
||||
uid: number;
|
||||
gid: number;
|
||||
};
|
||||
|
||||
export class CovenApiError extends Error {
|
||||
readonly status: number;
|
||||
readonly body: string;
|
||||
@@ -103,9 +111,32 @@ function realpathExistingPath(filePath: string, label: string): string {
|
||||
}
|
||||
}
|
||||
|
||||
function validateSocketPathForUse(socketPath: string, socketRoot: string | undefined): void {
|
||||
function fingerprintSocket(stat: fs.Stats): SocketFingerprint {
|
||||
return {
|
||||
dev: stat.dev,
|
||||
ino: stat.ino,
|
||||
mode: stat.mode,
|
||||
uid: stat.uid,
|
||||
gid: stat.gid,
|
||||
};
|
||||
}
|
||||
|
||||
function socketFingerprintMatches(left: SocketFingerprint, right: SocketFingerprint): boolean {
|
||||
return (
|
||||
left.dev === right.dev &&
|
||||
left.ino === right.ino &&
|
||||
left.mode === right.mode &&
|
||||
left.uid === right.uid &&
|
||||
left.gid === right.gid
|
||||
);
|
||||
}
|
||||
|
||||
function validateSocketPathForUse(
|
||||
socketPath: string,
|
||||
socketRoot: string | undefined,
|
||||
): SocketFingerprint | null {
|
||||
if (!socketRoot) {
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
const socketRootLstat = lstatIfExists(socketRoot);
|
||||
if (socketRootLstat?.isSymbolicLink()) {
|
||||
@@ -113,6 +144,7 @@ function validateSocketPathForUse(socketPath: string, socketRoot: string | undef
|
||||
}
|
||||
const socketRootStat = statExistingPath(socketRoot, "Coven covenHome");
|
||||
validateSocketOwnerAndMode(socketRootStat, "Coven covenHome");
|
||||
validatePrivateDirectory(socketRootStat, "Coven covenHome");
|
||||
|
||||
const socketStat = lstatIfExists(socketPath);
|
||||
if (socketStat?.isSymbolicLink()) {
|
||||
@@ -129,6 +161,9 @@ function validateSocketPathForUse(socketPath: string, socketRoot: string | undef
|
||||
path.dirname(socketPath),
|
||||
"Coven socketPath directory",
|
||||
);
|
||||
const socketDirStat = statExistingPath(path.dirname(socketPath), "Coven socketPath directory");
|
||||
validateSocketOwnerAndMode(socketDirStat, "Coven socketPath directory");
|
||||
validatePrivateDirectory(socketDirStat, "Coven socketPath directory");
|
||||
if (!pathIsInside(realSocketRoot, realSocketDir)) {
|
||||
throw new Error("Coven socketPath must stay inside covenHome");
|
||||
}
|
||||
@@ -136,6 +171,7 @@ function validateSocketPathForUse(socketPath: string, socketRoot: string | undef
|
||||
if (!pathIsInside(realSocketRoot, realSocketPath)) {
|
||||
throw new Error("Coven socketPath must stay inside covenHome");
|
||||
}
|
||||
return fingerprintSocket(resolvedSocketStat);
|
||||
}
|
||||
|
||||
function validateSocketOwnerAndMode(stat: fs.Stats, label: string): void {
|
||||
@@ -151,6 +187,18 @@ function validateSocketOwnerAndMode(stat: fs.Stats, label: string): void {
|
||||
}
|
||||
}
|
||||
|
||||
function validatePrivateDirectory(stat: fs.Stats, label: string): void {
|
||||
if (process.platform === "win32") {
|
||||
return;
|
||||
}
|
||||
if (!stat.isDirectory()) {
|
||||
throw new Error(`${label} must be a directory`);
|
||||
}
|
||||
if ((stat.mode & 0o077) !== 0) {
|
||||
throw new Error(`${label} must not be group or world accessible`);
|
||||
}
|
||||
}
|
||||
|
||||
function serializeRequestBody(body: unknown): { text: string; byteLength: number } {
|
||||
if (body === undefined) {
|
||||
return { text: "", byteLength: 0 };
|
||||
@@ -181,8 +229,9 @@ function requestOverSocket(options: RequestOptions): Promise<HttpResponse> {
|
||||
}
|
||||
let requestBody = "";
|
||||
let requestBodyBytes = 0;
|
||||
let socketFingerprint: SocketFingerprint | null = null;
|
||||
try {
|
||||
validateSocketPathForUse(options.socketPath, options.socketRoot);
|
||||
socketFingerprint = validateSocketPathForUse(options.socketPath, options.socketRoot);
|
||||
const serialized = serializeRequestBody(options.body);
|
||||
requestBody = serialized.text;
|
||||
requestBodyBytes = serialized.byteLength;
|
||||
@@ -208,8 +257,23 @@ function requestOverSocket(options: RequestOptions): Promise<HttpResponse> {
|
||||
{
|
||||
createConnection: () => {
|
||||
try {
|
||||
validateSocketPathForUse(options.socketPath, options.socketRoot);
|
||||
return net.createConnection({ path: options.socketPath });
|
||||
const beforeConnect = validateSocketPathForUse(options.socketPath, options.socketRoot);
|
||||
const socket = net.createConnection({ path: options.socketPath });
|
||||
socket.once("connect", () => {
|
||||
try {
|
||||
const afterConnect = validateSocketPathForUse(
|
||||
options.socketPath,
|
||||
options.socketRoot,
|
||||
);
|
||||
const expected = beforeConnect ?? socketFingerprint;
|
||||
if (expected && afterConnect && !socketFingerprintMatches(expected, afterConnect)) {
|
||||
socket.destroy(new Error("Coven socketPath changed during connection"));
|
||||
}
|
||||
} catch (error) {
|
||||
socket.destroy(errorToError(error));
|
||||
}
|
||||
});
|
||||
return socket;
|
||||
} catch (error) {
|
||||
return socketThatFailsWith(error);
|
||||
}
|
||||
|
||||
@@ -103,5 +103,15 @@ describe("resolveCovenPluginConfig", () => {
|
||||
|
||||
expect(resolved.covenHome).toBe(path.join(os.homedir(), ".custom-coven"));
|
||||
expect(resolved.socketPath).toBe(path.join(os.homedir(), ".custom-coven", "coven.sock"));
|
||||
expect(resolved.allowFallback).toBe(false);
|
||||
});
|
||||
|
||||
it("only enables fallback when configured explicitly", () => {
|
||||
const resolved = resolveCovenPluginConfig({
|
||||
rawConfig: { allowFallback: true },
|
||||
workspaceDir: "/repo",
|
||||
});
|
||||
|
||||
expect(resolved.allowFallback).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -7,6 +7,7 @@ import { lstatIfExists, pathIsInside, realpathIfExists } from "./path-utils.js";
|
||||
export type CovenPluginConfig = {
|
||||
covenHome?: string;
|
||||
socketPath?: string;
|
||||
allowFallback?: boolean;
|
||||
fallbackBackend?: string;
|
||||
pollIntervalMs?: number;
|
||||
harnesses?: Record<string, string>;
|
||||
@@ -16,6 +17,7 @@ export type ResolvedCovenPluginConfig = {
|
||||
covenHome: string;
|
||||
socketPath: string;
|
||||
workspaceDir: string;
|
||||
allowFallback: boolean;
|
||||
fallbackBackend: string;
|
||||
pollIntervalMs: number;
|
||||
harnesses: Record<string, string>;
|
||||
@@ -29,6 +31,7 @@ const nonEmptyString = z.string().trim().min(1);
|
||||
export const CovenPluginConfigSchema = z.strictObject({
|
||||
covenHome: nonEmptyString.optional(),
|
||||
socketPath: nonEmptyString.optional(),
|
||||
allowFallback: z.boolean().optional(),
|
||||
fallbackBackend: nonEmptyString.optional(),
|
||||
pollIntervalMs: z.number().min(25).max(10_000).optional(),
|
||||
harnesses: z.record(z.string(), nonEmptyString).optional(),
|
||||
@@ -125,6 +128,7 @@ export function resolveCovenPluginConfig(params: {
|
||||
covenHome,
|
||||
socketPath: resolveSocketPath(covenHome, config.socketPath),
|
||||
workspaceDir,
|
||||
allowFallback: config.allowFallback === true,
|
||||
fallbackBackend: normalizeBackendId(config.fallbackBackend),
|
||||
pollIntervalMs: config.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS,
|
||||
harnesses: normalizeHarnesses(config.harnesses),
|
||||
|
||||
@@ -17,6 +17,7 @@ const baseConfig: ResolvedCovenPluginConfig = {
|
||||
covenHome: "",
|
||||
socketPath: "",
|
||||
workspaceDir: "",
|
||||
allowFallback: false,
|
||||
fallbackBackend: "acpx",
|
||||
pollIntervalMs: 25,
|
||||
harnesses: {},
|
||||
@@ -117,11 +118,31 @@ afterEach(() => {
|
||||
});
|
||||
|
||||
describe("CovenAcpRuntime", () => {
|
||||
it("falls back to the direct ACP backend when Coven is unavailable", async () => {
|
||||
it("fails closed by default when Coven is unavailable", async () => {
|
||||
const runtime = new CovenAcpRuntime({
|
||||
config,
|
||||
client: fakeClient({
|
||||
health: vi.fn(async () => {
|
||||
throw new Error("offline");
|
||||
}),
|
||||
}),
|
||||
});
|
||||
|
||||
await expect(
|
||||
runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
mode: "oneshot",
|
||||
cwd: workspaceDir,
|
||||
}),
|
||||
).rejects.toThrow(/fallback is disabled/);
|
||||
});
|
||||
|
||||
it("falls back to the direct ACP backend when Coven is unavailable and fallback is enabled", async () => {
|
||||
const fallback = fallbackRuntime();
|
||||
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
|
||||
const runtime = new CovenAcpRuntime({
|
||||
config,
|
||||
config: { ...config, allowFallback: true },
|
||||
client: fakeClient({
|
||||
health: vi.fn(async () => {
|
||||
throw new Error("offline");
|
||||
@@ -154,7 +175,7 @@ describe("CovenAcpRuntime", () => {
|
||||
}),
|
||||
),
|
||||
});
|
||||
const runtime = new CovenAcpRuntime({ config, client });
|
||||
const runtime = new CovenAcpRuntime({ config: { ...config, allowFallback: true }, client });
|
||||
|
||||
const pending = runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
@@ -205,11 +226,10 @@ describe("CovenAcpRuntime", () => {
|
||||
]);
|
||||
});
|
||||
|
||||
it("sanitizes daemon-controlled session fields in start status", async () => {
|
||||
it("sanitizes daemon-controlled harness fields in start status", async () => {
|
||||
const client = fakeClient({
|
||||
launchSession: vi.fn(async () =>
|
||||
session({
|
||||
id: "\u001b]0;spoof\u0007session-1\r",
|
||||
harness: "\u001b[31mcodex\u001b[0m",
|
||||
}),
|
||||
),
|
||||
@@ -236,11 +256,65 @@ describe("CovenAcpRuntime", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back without launching Coven when prompts exceed the Coven request limit", async () => {
|
||||
it("rejects unsafe daemon-controlled session ids before exposing handle fields", async () => {
|
||||
const client = fakeClient({
|
||||
launchSession: vi.fn(async () =>
|
||||
session({
|
||||
id: "\u001b]0;spoof\u0007session-1\r",
|
||||
}),
|
||||
),
|
||||
});
|
||||
const runtime = new CovenAcpRuntime({ config, client });
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
mode: "oneshot",
|
||||
cwd: workspaceDir,
|
||||
});
|
||||
|
||||
await expect(
|
||||
collect(
|
||||
runtime.runTurn({
|
||||
handle,
|
||||
text: "Fix tests",
|
||||
mode: "prompt",
|
||||
requestId: "req-1",
|
||||
}),
|
||||
),
|
||||
).rejects.toThrow(/session id is invalid/);
|
||||
expect(handle.backendSessionId).toBeUndefined();
|
||||
expect(handle.agentSessionId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("fails closed without launching Coven when prompts exceed the Coven request limit", async () => {
|
||||
const client = fakeClient();
|
||||
const runtime = new CovenAcpRuntime({ config, client });
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
mode: "oneshot",
|
||||
cwd: workspaceDir,
|
||||
});
|
||||
|
||||
await expect(
|
||||
collect(
|
||||
runtime.runTurn({
|
||||
handle,
|
||||
text: "x".repeat(500_001),
|
||||
mode: "prompt",
|
||||
requestId: "req-1",
|
||||
}),
|
||||
),
|
||||
).rejects.toThrow(/fallback is disabled/);
|
||||
|
||||
expect(client.launchSession).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("falls back on oversized prompts when fallback is explicitly enabled", async () => {
|
||||
const fallback = fallbackRuntime();
|
||||
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
|
||||
const client = fakeClient();
|
||||
const runtime = new CovenAcpRuntime({ config, client });
|
||||
const runtime = new CovenAcpRuntime({ config: { ...config, allowFallback: true }, client });
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
@@ -428,7 +502,7 @@ describe("CovenAcpRuntime", () => {
|
||||
expect(sleep).toHaveBeenCalledWith(25, undefined);
|
||||
});
|
||||
|
||||
it("bounds daemon events processed during one poll cycle", async () => {
|
||||
it("fails the turn when the daemon returns too many events in one poll", async () => {
|
||||
const client = fakeClient({
|
||||
listEvents: vi.fn(async () =>
|
||||
Array.from({ length: 600 }, (_, index) =>
|
||||
@@ -439,7 +513,7 @@ describe("CovenAcpRuntime", () => {
|
||||
}),
|
||||
),
|
||||
),
|
||||
getSession: vi.fn(async () => session({ status: "completed", exitCode: 0 })),
|
||||
killSession: vi.fn(async () => undefined),
|
||||
});
|
||||
const runtime = new CovenAcpRuntime({ config, client });
|
||||
const handle = await runtime.ensureSession({
|
||||
@@ -453,11 +527,12 @@ describe("CovenAcpRuntime", () => {
|
||||
runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" }),
|
||||
);
|
||||
|
||||
const outputEvents = events.filter((item) => item.type === "text_delta");
|
||||
expect(outputEvents).toHaveLength(500);
|
||||
expect(outputEvents).not.toContainEqual(
|
||||
expect.objectContaining({ type: "text_delta", text: "line-500\n" }),
|
||||
);
|
||||
expect(client.killSession).toHaveBeenCalledWith("session-1", undefined);
|
||||
expect(events).toEqual([
|
||||
expect.objectContaining({ type: "status", text: "coven session session-1 started (codex)" }),
|
||||
expect.objectContaining({ type: "status", text: "coven session polling failed" }),
|
||||
expect.objectContaining({ type: "done", stopReason: "error" }),
|
||||
]);
|
||||
});
|
||||
|
||||
it("converts Coven polling failures into controlled terminal events", async () => {
|
||||
@@ -648,7 +723,7 @@ describe("CovenAcpRuntime", () => {
|
||||
const fallback = fallbackRuntime();
|
||||
registerAcpRuntimeBackend({ id: "acpx", runtime: fallback });
|
||||
const runtime = new CovenAcpRuntime({
|
||||
config,
|
||||
config: { ...config, allowFallback: true },
|
||||
client: fakeClient({
|
||||
launchSession: vi.fn(async () => {
|
||||
throw new Error("launch failed");
|
||||
@@ -672,4 +747,25 @@ describe("CovenAcpRuntime", () => {
|
||||
expect.objectContaining({ type: "done", stopReason: "complete" }),
|
||||
]);
|
||||
});
|
||||
|
||||
it("fails closed when Coven launch fails after detection and fallback is disabled", async () => {
|
||||
const runtime = new CovenAcpRuntime({
|
||||
config,
|
||||
client: fakeClient({
|
||||
launchSession: vi.fn(async () => {
|
||||
throw new Error("launch failed");
|
||||
}),
|
||||
}),
|
||||
});
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:test",
|
||||
agent: "codex",
|
||||
mode: "oneshot",
|
||||
cwd: workspaceDir,
|
||||
});
|
||||
|
||||
await expect(
|
||||
collect(runtime.runTurn({ handle, text: "Fix tests", mode: "prompt", requestId: "req-1" })),
|
||||
).rejects.toThrow(/fallback is disabled/);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -44,6 +44,8 @@ const MAX_RUNTIME_SESSION_NAME_BYTES = 2_048;
|
||||
const MAX_RUNTIME_AGENT_CHARS = 128;
|
||||
const MAX_RUNTIME_MODE_CHARS = 32;
|
||||
const MAX_STATUS_FIELD_CHARS = 256;
|
||||
const MAX_SESSION_ID_CHARS = 128;
|
||||
const SAFE_SESSION_ID_REGEX = /^[A-Za-z0-9._:-]+$/;
|
||||
|
||||
type CovenRuntimeSessionState = {
|
||||
agent: string;
|
||||
@@ -183,6 +185,14 @@ function sanitizeStatusField(input: string, fallback = "unknown"): string {
|
||||
return sanitizeStatusText(input).slice(0, MAX_STATUS_FIELD_CHARS) || fallback;
|
||||
}
|
||||
|
||||
function requireSafeSessionId(input: string): string {
|
||||
const value = input.trim();
|
||||
if (!value || value.length > MAX_SESSION_ID_CHARS || !SAFE_SESSION_ID_REGEX.test(value)) {
|
||||
throw new Error("Coven session id is invalid");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
function boundedCovenPrompt(input: string): string {
|
||||
if (Buffer.byteLength(input, "utf8") > MAX_COVEN_PROMPT_BYTES) {
|
||||
throw new Error("Coven prompt exceeded size limit");
|
||||
@@ -278,6 +288,12 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
input: Parameters<AcpRuntime["ensureSession"]>[0],
|
||||
): Promise<AcpRuntimeHandle> {
|
||||
if (!(await this.isCovenAvailable())) {
|
||||
if (!this.config.allowFallback) {
|
||||
throw new AcpRuntimeError(
|
||||
"ACP_BACKEND_UNAVAILABLE",
|
||||
"Coven is unavailable and fallback is disabled.",
|
||||
);
|
||||
}
|
||||
return await this.ensureFallbackSession(input);
|
||||
}
|
||||
const agent = normalizeAgentId(input.agent);
|
||||
@@ -308,6 +324,7 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
|
||||
const cwd = this.resolveWorkspaceCwd(input.handle.cwd);
|
||||
let session: CovenSessionRecord;
|
||||
let sessionId: string;
|
||||
try {
|
||||
const prompt = boundedCovenPrompt(input.text);
|
||||
session = await this.client.launchSession(
|
||||
@@ -320,7 +337,15 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
},
|
||||
input.signal,
|
||||
);
|
||||
sessionId = requireSafeSessionId(session.id);
|
||||
} catch (error) {
|
||||
if (!this.config.allowFallback) {
|
||||
throw new AcpRuntimeError(
|
||||
"ACP_TURN_FAILED",
|
||||
`Coven launch failed and fallback is disabled: ${String(error)}`,
|
||||
{ cause: error },
|
||||
);
|
||||
}
|
||||
this.logger?.warn(
|
||||
`coven launch failed; falling back to ${this.config.fallbackBackend}: ${String(error)}`,
|
||||
);
|
||||
@@ -328,12 +353,12 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
return;
|
||||
}
|
||||
|
||||
input.handle.backendSessionId = session.id;
|
||||
input.handle.agentSessionId = session.id;
|
||||
this.activeSessionIdsBySessionKey.set(input.handle.sessionKey, session.id);
|
||||
input.handle.backendSessionId = sessionId;
|
||||
input.handle.agentSessionId = sessionId;
|
||||
this.activeSessionIdsBySessionKey.set(input.handle.sessionKey, sessionId);
|
||||
yield {
|
||||
type: "status",
|
||||
text: `coven session ${sanitizeStatusField(session.id)} started (${sanitizeStatusField(session.harness)})`,
|
||||
text: `coven session ${sessionId} started (${sanitizeStatusField(session.harness)})`,
|
||||
tag: "session_info_update",
|
||||
};
|
||||
|
||||
@@ -342,29 +367,23 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
let lastSeenEventId: string | undefined;
|
||||
while (true) {
|
||||
if (input.signal?.aborted) {
|
||||
await this.killActiveSession(session.id).catch(() => undefined);
|
||||
await this.killActiveSession(sessionId).catch(() => undefined);
|
||||
throw input.signal.reason ?? new Error("Coven turn aborted");
|
||||
}
|
||||
|
||||
try {
|
||||
const events = await this.client.listEvents(
|
||||
session.id,
|
||||
sessionId,
|
||||
lastSeenEventId ? { afterEventId: lastSeenEventId } : undefined,
|
||||
input.signal,
|
||||
);
|
||||
const cursorIndex = lastSeenEventId
|
||||
? events.findIndex((event) => event.id === lastSeenEventId)
|
||||
: -1;
|
||||
const nextEvents = cursorIndex >= 0 ? events.slice(cursorIndex + 1) : events;
|
||||
let processedEvents = 0;
|
||||
for (const event of nextEvents) {
|
||||
if (events.length > MAX_EVENTS_PER_POLL) {
|
||||
throw new Error("Coven daemon returned too many events");
|
||||
}
|
||||
for (const event of events) {
|
||||
if (seenEventIds.has(event.id)) {
|
||||
continue;
|
||||
}
|
||||
if (processedEvents >= MAX_EVENTS_PER_POLL) {
|
||||
break;
|
||||
}
|
||||
processedEvents += 1;
|
||||
seenEventIds.add(event.id);
|
||||
seenEventQueue.push(event.id);
|
||||
while (seenEventQueue.length > MAX_TRACKED_EVENT_IDS) {
|
||||
@@ -383,7 +402,7 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
}
|
||||
}
|
||||
|
||||
const latest = await this.client.getSession(session.id, input.signal);
|
||||
const latest = await this.client.getSession(sessionId, input.signal);
|
||||
if (sessionIsTerminal(latest)) {
|
||||
yield terminalStatusEvent(latest);
|
||||
yield { type: "done", stopReason: normalizeStopReason(latest.status) };
|
||||
@@ -392,11 +411,11 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
}
|
||||
} catch (error) {
|
||||
if (input.signal?.aborted) {
|
||||
await this.killActiveSession(session.id).catch(() => undefined);
|
||||
await this.killActiveSession(sessionId).catch(() => undefined);
|
||||
throw input.signal.reason ?? error;
|
||||
}
|
||||
this.logger?.warn(`coven polling failed: ${String(error)}`);
|
||||
await this.killActiveSession(session.id).catch(() => undefined);
|
||||
await this.killActiveSession(sessionId).catch(() => undefined);
|
||||
this.activeSessionIdsBySessionKey.delete(input.handle.sessionKey);
|
||||
yield { type: "status", text: "coven session polling failed", tag: "session_info_update" };
|
||||
yield { type: "done", stopReason: "error" };
|
||||
@@ -424,14 +443,15 @@ export class CovenAcpRuntime implements AcpRuntime {
|
||||
if (!sessionId) {
|
||||
return { summary: "coven runtime ready" };
|
||||
}
|
||||
const session = await this.client.getSession(sessionId, input.signal);
|
||||
const status = sanitizeStatusField(session.status, "completed");
|
||||
const harness = sanitizeStatusField(session.harness);
|
||||
const title = sanitizeStatusField(session.title, "untitled");
|
||||
const sessionId = sanitizeStatusField(session.id);
|
||||
return {
|
||||
summary: `${status} ${harness} ${title}`,
|
||||
backendSessionId: sessionId,
|
||||
agentSessionId: sessionId,
|
||||
details: {
|
||||
projectRoot: sanitizeStatusField(session.projectRoot),
|
||||
harness,
|
||||
status,
|
||||
|
||||
Reference in New Issue
Block a user