From d669b27a45c34131405f884bb2a82d79ca58bfbd Mon Sep 17 00:00:00 2001 From: Onur <2453968+osolmaz@users.noreply.github.com> Date: Sat, 28 Feb 2026 15:35:46 +0100 Subject: [PATCH] ACPX extension: split ACP stream parser and test fixtures --- .../runtime-internals/control-errors.test.ts | 25 ++ .../src/runtime-internals/control-errors.ts | 27 ++ .../acpx/src/runtime-internals/events.test.ts | 80 ++-- .../acpx/src/runtime-internals/events.ts | 202 +++------- .../src/runtime-internals/jsonrpc.test.ts | 67 ++++ .../acpx/src/runtime-internals/jsonrpc.ts | 47 +++ .../src/runtime-internals/test-fixtures.ts | 342 ++++++++++++++++ extensions/acpx/src/runtime.test.ts | 371 ++---------------- extensions/acpx/src/runtime.ts | 17 +- 9 files changed, 634 insertions(+), 544 deletions(-) create mode 100644 extensions/acpx/src/runtime-internals/control-errors.test.ts create mode 100644 extensions/acpx/src/runtime-internals/control-errors.ts create mode 100644 extensions/acpx/src/runtime-internals/jsonrpc.test.ts create mode 100644 extensions/acpx/src/runtime-internals/jsonrpc.ts create mode 100644 extensions/acpx/src/runtime-internals/test-fixtures.ts diff --git a/extensions/acpx/src/runtime-internals/control-errors.test.ts b/extensions/acpx/src/runtime-internals/control-errors.test.ts new file mode 100644 index 00000000000..7af3ddcb265 --- /dev/null +++ b/extensions/acpx/src/runtime-internals/control-errors.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from "vitest"; +import { parseControlJsonError } from "./control-errors.js"; + +describe("parseControlJsonError", () => { + it("reads structured control-command errors", () => { + expect( + parseControlJsonError({ + error: { + code: "NO_SESSION", + message: "No matching session", + retryable: false, + }, + }), + ).toEqual({ + code: "NO_SESSION", + message: "No matching session", + retryable: false, + }); + }); + + it("returns null when payload has no error object", () => { + expect(parseControlJsonError({ action: "session_ensured" })).toBeNull(); + expect(parseControlJsonError("bad")).toBeNull(); + }); +}); diff --git a/extensions/acpx/src/runtime-internals/control-errors.ts b/extensions/acpx/src/runtime-internals/control-errors.ts new file mode 100644 index 00000000000..0f8f49ffc80 --- /dev/null +++ b/extensions/acpx/src/runtime-internals/control-errors.ts @@ -0,0 +1,27 @@ +import { + asOptionalBoolean, + asOptionalString, + asTrimmedString, + type AcpxErrorEvent, + isRecord, +} from "./shared.js"; + +export function parseControlJsonError(value: unknown): AcpxErrorEvent | null { + if (!isRecord(value)) { + return null; + } + const error = isRecord(value.error) ? value.error : null; + if (!error) { + return null; + } + const message = asTrimmedString(error.message) || "acpx reported an error"; + const codeValue = error.code; + return { + message, + code: + typeof codeValue === "number" && Number.isFinite(codeValue) + ? String(codeValue) + : asOptionalString(codeValue), + retryable: asOptionalBoolean(error.retryable), + }; +} diff --git a/extensions/acpx/src/runtime-internals/events.test.ts b/extensions/acpx/src/runtime-internals/events.test.ts index e5688361d43..018d1a3c3c7 100644 --- a/extensions/acpx/src/runtime-internals/events.test.ts +++ b/extensions/acpx/src/runtime-internals/events.test.ts @@ -1,13 +1,14 @@ import { describe, expect, it } from "vitest"; -import { parsePromptEventLine, toAcpxErrorEvent } from "./events.js"; +import { PromptStreamProjector } from "./events.js"; function jsonLine(payload: unknown): string { return JSON.stringify(payload); } -describe("acpx runtime event parsing", () => { +describe("PromptStreamProjector", () => { it("maps agent message chunks to output deltas", () => { - const event = parsePromptEventLine( + const projector = new PromptStreamProjector(); + const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", method: "session/update", @@ -32,7 +33,8 @@ describe("acpx runtime event parsing", () => { }); it("preserves leading spaces in streamed output chunks", () => { - const event = parsePromptEventLine( + const projector = new PromptStreamProjector(); + const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", method: "session/update", @@ -57,7 +59,8 @@ describe("acpx runtime event parsing", () => { }); it("maps agent thought chunks to thought deltas", () => { - const event = parsePromptEventLine( + const projector = new PromptStreamProjector(); + const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", method: "session/update", @@ -82,7 +85,8 @@ describe("acpx runtime event parsing", () => { }); it("maps tool call updates to tool_call events", () => { - const event = parsePromptEventLine( + const projector = new PromptStreamProjector(); + const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", method: "session/update", @@ -105,7 +109,19 @@ describe("acpx runtime event parsing", () => { }); it("maps prompt response stop reasons to done events", () => { - const event = parsePromptEventLine( + const projector = new PromptStreamProjector(); + projector.ingestLine( + jsonLine({ + jsonrpc: "2.0", + id: "req-1", + method: "session/prompt", + params: { + sessionId: "session-1", + prompt: [{ type: "text", text: "hello" }], + }, + }), + ); + const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", id: "req-1", @@ -122,7 +138,19 @@ describe("acpx runtime event parsing", () => { }); it("maps json-rpc errors to runtime errors", () => { - const event = parsePromptEventLine( + const projector = new PromptStreamProjector(); + projector.ingestLine( + jsonLine({ + jsonrpc: "2.0", + id: "req-1", + method: "session/prompt", + params: { + sessionId: "session-1", + prompt: [{ type: "text", text: "hello" }], + }, + }), + ); + const event = projector.ingestLine( jsonLine({ jsonrpc: "2.0", id: "req-1", @@ -137,16 +165,12 @@ describe("acpx runtime event parsing", () => { type: "error", message: "adapter failed", code: "-32000", - retryable: undefined, }); }); - it("ignores non-prompt response errors when parse context is provided", () => { - const context = { - promptRequestIds: new Set(), - }; - - const promptRequest = parsePromptEventLine( + it("ignores non-prompt response errors", () => { + const projector = new PromptStreamProjector(); + projector.ingestLine( jsonLine({ jsonrpc: "2.0", id: 3, @@ -156,9 +180,8 @@ describe("acpx runtime event parsing", () => { prompt: [{ type: "text", text: "hello" }], }, }), - context, ); - const loadError = parsePromptEventLine( + const loadError = projector.ingestLine( jsonLine({ jsonrpc: "2.0", id: 1, @@ -167,9 +190,8 @@ describe("acpx runtime event parsing", () => { message: "Resource not found", }, }), - context, ); - const promptDone = parsePromptEventLine( + const promptDone = projector.ingestLine( jsonLine({ jsonrpc: "2.0", id: 3, @@ -177,10 +199,8 @@ describe("acpx runtime event parsing", () => { stopReason: "end_turn", }, }), - context, ); - expect(promptRequest).toBeNull(); expect(loadError).toBeNull(); expect(promptDone).toEqual({ type: "done", @@ -188,21 +208,3 @@ describe("acpx runtime event parsing", () => { }); }); }); - -describe("toAcpxErrorEvent", () => { - it("reads control command errors from json output", () => { - expect( - toAcpxErrorEvent({ - error: { - code: "NO_SESSION", - message: "No matching session", - retryable: false, - }, - }), - ).toEqual({ - code: "NO_SESSION", - message: "No matching session", - retryable: false, - }); - }); -}); diff --git a/extensions/acpx/src/runtime-internals/events.ts b/extensions/acpx/src/runtime-internals/events.ts index 8d840ad5457..bb13e2f3c78 100644 --- a/extensions/acpx/src/runtime-internals/events.ts +++ b/extensions/acpx/src/runtime-internals/events.ts @@ -1,83 +1,13 @@ import type { AcpRuntimeEvent } from "openclaw/plugin-sdk"; +import { isAcpJsonRpcMessage, normalizeJsonRpcId } from "./jsonrpc.js"; import { - asOptionalBoolean, asOptionalString, asString, asTrimmedString, - type AcpxErrorEvent, type AcpxJsonObject, isRecord, } from "./shared.js"; -type JsonRpcId = string | number | null; - -export type PromptParseContext = { - promptRequestIds: Set; -}; - -function isJsonRpcId(value: unknown): value is JsonRpcId { - return ( - value === null || - typeof value === "string" || - (typeof value === "number" && Number.isFinite(value)) - ); -} - -function normalizeJsonRpcId(value: unknown): string | null { - if (!isJsonRpcId(value) || value == null) { - return null; - } - return String(value); -} - -function isAcpJsonRpcMessage(value: unknown): value is Record { - if (!isRecord(value) || value.jsonrpc !== "2.0") { - return false; - } - - const hasMethod = typeof value.method === "string" && value.method.length > 0; - const hasId = Object.hasOwn(value, "id"); - - if (hasMethod && !hasId) { - return true; - } - - if (hasMethod && hasId) { - return isJsonRpcId(value.id); - } - - if (!hasMethod && hasId) { - if (!isJsonRpcId(value.id)) { - return false; - } - const hasResult = Object.hasOwn(value, "result"); - const hasError = Object.hasOwn(value, "error"); - return hasResult !== hasError; - } - - return false; -} - -export function toAcpxErrorEvent(value: unknown): AcpxErrorEvent | null { - if (!isRecord(value)) { - return null; - } - const error = isRecord(value.error) ? value.error : null; - if (!error) { - return null; - } - const message = asTrimmedString(error.message) || "acpx reported an error"; - const codeValue = error.code; - return { - message, - code: - typeof codeValue === "number" && Number.isFinite(codeValue) - ? String(codeValue) - : asOptionalString(codeValue), - retryable: asOptionalBoolean(error.retryable), - }; -} - export function parseJsonLines(value: string): AcpxJsonObject[] { const events: AcpxJsonObject[] = []; for (const line of value.split(/\r?\n/)) { @@ -228,96 +158,74 @@ function parseSessionUpdateEvent(message: Record): AcpRuntimeEv } } -function shouldHandlePromptResponse(params: { - message: Record; - context?: PromptParseContext; -}): boolean { - const id = normalizeJsonRpcId(params.message.id); - if (!id) { - return false; - } - if (!params.context) { - return true; - } - return params.context.promptRequestIds.has(id); -} +export class PromptStreamProjector { + private readonly promptRequestIds = new Set(); -export function parsePromptEventLine( - line: string, - context?: PromptParseContext, -): AcpRuntimeEvent | null { - const trimmed = line.trim(); - if (!trimmed) { - return null; - } - let parsed: unknown; - try { - parsed = JSON.parse(trimmed); - } catch { - return { - type: "status", - text: trimmed, - }; - } - - if (!isRecord(parsed)) { - return null; - } - - if (!isAcpJsonRpcMessage(parsed)) { - const fallbackError = toAcpxErrorEvent(parsed); - if (fallbackError) { + ingestLine(line: string): AcpRuntimeEvent | null { + const trimmed = line.trim(); + if (!trimmed) { + return null; + } + let parsed: unknown; + try { + parsed = JSON.parse(trimmed); + } catch { return { - type: "error", - message: fallbackError.message, - code: fallbackError.code, - retryable: fallbackError.retryable, + type: "status", + text: trimmed, }; } - return null; - } - const updateEvent = parseSessionUpdateEvent(parsed); - if (updateEvent) { - return updateEvent; - } - - if (asTrimmedString(parsed.method) === "session/prompt") { - const id = normalizeJsonRpcId(parsed.id); - if (id && context) { - context.promptRequestIds.add(id); - } - return null; - } - - if (Object.hasOwn(parsed, "error")) { - if (!shouldHandlePromptResponse({ message: parsed, context })) { + if (!isRecord(parsed) || !isAcpJsonRpcMessage(parsed)) { return null; } - const error = isRecord(parsed.error) ? parsed.error : null; - const message = asTrimmedString(error?.message); - const codeValue = error?.code; - return { - type: "error", - message: message || "acpx runtime error", - code: - typeof codeValue === "number" && Number.isFinite(codeValue) - ? String(codeValue) - : asOptionalString(codeValue), - retryable: asOptionalBoolean(error?.retryable), - }; - } - const stopReason = parsePromptStopReason(parsed); - if (stopReason) { - if (!shouldHandlePromptResponse({ message: parsed, context })) { + const updateEvent = parseSessionUpdateEvent(parsed); + if (updateEvent) { + return updateEvent; + } + + if (asTrimmedString(parsed.method) === "session/prompt") { + const id = normalizeJsonRpcId(parsed.id); + if (id) { + this.promptRequestIds.add(id); + } return null; } + + if (Object.hasOwn(parsed, "error")) { + if (!this.shouldHandlePromptResponse(parsed)) { + return null; + } + const error = isRecord(parsed.error) ? parsed.error : null; + const message = asTrimmedString(error?.message); + const codeValue = error?.code; + return { + type: "error", + message: message || "acpx runtime error", + code: + typeof codeValue === "number" && Number.isFinite(codeValue) + ? String(codeValue) + : asOptionalString(codeValue), + }; + } + + const stopReason = parsePromptStopReason(parsed); + if (!stopReason || !this.shouldHandlePromptResponse(parsed)) { + return null; + } + return { type: "done", stopReason, }; } - return null; + private shouldHandlePromptResponse(message: Record): boolean { + const id = normalizeJsonRpcId(message.id); + if (!id) { + return false; + } + return this.promptRequestIds.has(id); + } } diff --git a/extensions/acpx/src/runtime-internals/jsonrpc.test.ts b/extensions/acpx/src/runtime-internals/jsonrpc.test.ts new file mode 100644 index 00000000000..fcac107320c --- /dev/null +++ b/extensions/acpx/src/runtime-internals/jsonrpc.test.ts @@ -0,0 +1,67 @@ +import { describe, expect, it } from "vitest"; +import { isAcpJsonRpcMessage, isJsonRpcId, normalizeJsonRpcId } from "./jsonrpc.js"; + +describe("jsonrpc helpers", () => { + it("validates json-rpc ids", () => { + expect(isJsonRpcId(null)).toBe(true); + expect(isJsonRpcId("abc")).toBe(true); + expect(isJsonRpcId(12)).toBe(true); + expect(isJsonRpcId(Number.NaN)).toBe(false); + expect(isJsonRpcId({})).toBe(false); + }); + + it("normalizes json-rpc ids", () => { + expect(normalizeJsonRpcId("abc")).toBe("abc"); + expect(normalizeJsonRpcId(12)).toBe("12"); + expect(normalizeJsonRpcId(null)).toBeNull(); + expect(normalizeJsonRpcId(undefined)).toBeNull(); + }); + + it("accepts request, response, and notification shapes", () => { + expect( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + method: "session/prompt", + id: 1, + }), + ).toBe(true); + + expect( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + id: 1, + result: { + stopReason: "end_turn", + }, + }), + ).toBe(true); + + expect( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + method: "session/update", + }), + ).toBe(true); + }); + + it("rejects malformed result/error response shapes", () => { + expect( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + id: 1, + }), + ).toBe(false); + + expect( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + id: 1, + result: {}, + error: { + code: -1, + message: "bad", + }, + }), + ).toBe(false); + }); +}); diff --git a/extensions/acpx/src/runtime-internals/jsonrpc.ts b/extensions/acpx/src/runtime-internals/jsonrpc.ts new file mode 100644 index 00000000000..5779c15e6de --- /dev/null +++ b/extensions/acpx/src/runtime-internals/jsonrpc.ts @@ -0,0 +1,47 @@ +import { isRecord } from "./shared.js"; + +export type JsonRpcId = string | number | null; + +function hasExclusiveResultOrError(value: Record): boolean { + const hasResult = Object.hasOwn(value, "result"); + const hasError = Object.hasOwn(value, "error"); + return hasResult !== hasError; +} + +export function isJsonRpcId(value: unknown): value is JsonRpcId { + return ( + value === null || + typeof value === "string" || + (typeof value === "number" && Number.isFinite(value)) + ); +} + +export function normalizeJsonRpcId(value: unknown): string | null { + if (!isJsonRpcId(value) || value == null) { + return null; + } + return String(value); +} + +export function isAcpJsonRpcMessage(value: unknown): value is Record { + if (!isRecord(value) || value.jsonrpc !== "2.0") { + return false; + } + + const hasMethod = typeof value.method === "string" && value.method.length > 0; + const hasId = Object.hasOwn(value, "id"); + + if (hasMethod && !hasId) { + return true; + } + + if (hasMethod && hasId) { + return isJsonRpcId(value.id); + } + + if (!hasMethod && hasId) { + return isJsonRpcId(value.id) && hasExclusiveResultOrError(value); + } + + return false; +} diff --git a/extensions/acpx/src/runtime-internals/test-fixtures.ts b/extensions/acpx/src/runtime-internals/test-fixtures.ts new file mode 100644 index 00000000000..d056cdc3874 --- /dev/null +++ b/extensions/acpx/src/runtime-internals/test-fixtures.ts @@ -0,0 +1,342 @@ +import fs from "node:fs"; +import { chmod, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import type { ResolvedAcpxPluginConfig } from "../config.js"; +import { ACPX_PINNED_VERSION } from "../config.js"; +import { AcpxRuntime } from "../runtime.js"; + +export const NOOP_LOGGER = { + info: (_message: string) => {}, + warn: (_message: string) => {}, + error: (_message: string) => {}, + debug: (_message: string) => {}, +}; + +const tempDirs: string[] = []; + +const MOCK_CLI_SCRIPT = String.raw`#!/usr/bin/env node +const fs = require("node:fs"); + +const args = process.argv.slice(2); +const logPath = process.env.MOCK_ACPX_LOG; +const writeLog = (entry) => { + if (!logPath) return; + fs.appendFileSync(logPath, JSON.stringify(entry) + "\n"); +}; +const emitJson = (payload) => process.stdout.write(JSON.stringify(payload) + "\n"); +const emitUpdate = (sessionId, update) => + emitJson({ + jsonrpc: "2.0", + method: "session/update", + params: { sessionId, update }, + }); + +if (args.includes("--version")) { + process.stdout.write("mock-acpx ${ACPX_PINNED_VERSION}\\n"); + process.exit(0); +} + +if (args.includes("--help")) { + process.stdout.write("mock-acpx help\\n"); + process.exit(0); +} + +const commandIndex = args.findIndex( + (arg) => + arg === "prompt" || + arg === "cancel" || + arg === "sessions" || + arg === "set-mode" || + arg === "set" || + arg === "status", +); +const command = commandIndex >= 0 ? args[commandIndex] : ""; +const agent = commandIndex > 0 ? args[commandIndex - 1] : "unknown"; + +const readFlag = (flag) => { + const idx = args.indexOf(flag); + if (idx < 0) return ""; + return String(args[idx + 1] || ""); +}; + +const sessionFromOption = readFlag("--session"); +const ensureName = readFlag("--name"); +const closeName = + command === "sessions" && args[commandIndex + 1] === "close" + ? String(args[commandIndex + 2] || "") + : ""; +const setModeValue = command === "set-mode" ? String(args[commandIndex + 1] || "") : ""; +const setKey = command === "set" ? String(args[commandIndex + 1] || "") : ""; +const setValue = command === "set" ? String(args[commandIndex + 2] || "") : ""; + +if (command === "sessions" && args[commandIndex + 1] === "ensure") { + writeLog({ kind: "ensure", agent, args, sessionName: ensureName }); + emitJson({ + action: "session_ensured", + acpxRecordId: "rec-" + ensureName, + acpxSessionId: "sid-" + ensureName, + agentSessionId: "inner-" + ensureName, + name: ensureName, + created: true, + }); + process.exit(0); +} + +if (command === "cancel") { + writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption }); + emitJson({ + acpxSessionId: "sid-" + sessionFromOption, + cancelled: true, + }); + process.exit(0); +} + +if (command === "set-mode") { + writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue }); + emitJson({ + action: "mode_set", + acpxSessionId: "sid-" + sessionFromOption, + mode: setModeValue, + }); + process.exit(0); +} + +if (command === "set") { + writeLog({ + kind: "set", + agent, + args, + sessionName: sessionFromOption, + key: setKey, + value: setValue, + }); + emitJson({ + action: "config_set", + acpxSessionId: "sid-" + sessionFromOption, + key: setKey, + value: setValue, + }); + process.exit(0); +} + +if (command === "status") { + writeLog({ kind: "status", agent, args, sessionName: sessionFromOption }); + emitJson({ + acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null, + acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null, + agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null, + status: sessionFromOption ? "alive" : "no-session", + pid: 4242, + uptime: 120, + }); + process.exit(0); +} + +if (command === "sessions" && args[commandIndex + 1] === "close") { + writeLog({ kind: "close", agent, args, sessionName: closeName }); + emitJson({ + action: "session_closed", + acpxRecordId: "rec-" + closeName, + acpxSessionId: "sid-" + closeName, + name: closeName, + }); + process.exit(0); +} + +if (command === "prompt") { + const stdinText = fs.readFileSync(0, "utf8"); + writeLog({ kind: "prompt", agent, args, sessionName: sessionFromOption, stdinText }); + const requestId = "req-1"; + + emitJson({ + jsonrpc: "2.0", + id: 0, + method: "session/load", + params: { + sessionId: sessionFromOption, + cwd: process.cwd(), + mcpServers: [], + }, + }); + emitJson({ + jsonrpc: "2.0", + id: 0, + error: { + code: -32002, + message: "Resource not found", + }, + }); + + emitJson({ + jsonrpc: "2.0", + id: requestId, + method: "session/prompt", + params: { + sessionId: sessionFromOption, + prompt: [ + { + type: "text", + text: stdinText.trim(), + }, + ], + }, + }); + + if (stdinText.includes("trigger-error")) { + emitJson({ + jsonrpc: "2.0", + id: requestId, + error: { + code: -32000, + message: "mock failure", + }, + }); + process.exit(1); + } + + if (stdinText.includes("split-spacing")) { + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "alpha" }, + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: " beta" }, + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: " gamma" }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); + process.exit(0); + } + + if (stdinText.includes("double-done")) { + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "ok" }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); + process.exit(0); + } + + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "thinking" }, + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "tool_call", + toolCallId: "tool-1", + title: "run-tests", + status: "in_progress", + kind: "command", + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "echo:" + stdinText.trim() }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); + process.exit(0); +} + +writeLog({ kind: "unknown", args }); +emitJson({ + error: { + code: "USAGE", + message: "unknown command", + }, +}); +process.exit(2); +`; + +export async function createMockRuntimeFixture(params?: { + permissionMode?: ResolvedAcpxPluginConfig["permissionMode"]; + queueOwnerTtlSeconds?: number; +}): Promise<{ + runtime: AcpxRuntime; + logPath: string; + config: ResolvedAcpxPluginConfig; +}> { + const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-acpx-runtime-test-")); + tempDirs.push(dir); + const scriptPath = path.join(dir, "mock-acpx.cjs"); + const logPath = path.join(dir, "calls.log"); + await writeFile(scriptPath, MOCK_CLI_SCRIPT, "utf8"); + await chmod(scriptPath, 0o755); + process.env.MOCK_ACPX_LOG = logPath; + + const config: ResolvedAcpxPluginConfig = { + command: scriptPath, + allowPluginLocalInstall: false, + installCommand: "n/a", + cwd: dir, + permissionMode: params?.permissionMode ?? "approve-all", + nonInteractivePermissions: "fail", + queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds ?? 0.1, + }; + + return { + runtime: new AcpxRuntime(config, { + queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds, + logger: NOOP_LOGGER, + }), + logPath, + config, + }; +} + +export async function readMockRuntimeLogEntries( + logPath: string, +): Promise>> { + if (!fs.existsSync(logPath)) { + return []; + } + const raw = await readFile(logPath, "utf8"); + return raw + .split(/\r?\n/) + .map((line) => line.trim()) + .filter(Boolean) + .map((line) => JSON.parse(line) as Record); +} + +export async function cleanupMockRuntimeFixtures(): Promise { + delete process.env.MOCK_ACPX_LOG; + while (tempDirs.length > 0) { + const dir = tempDirs.pop(); + if (!dir) { + continue; + } + await rm(dir, { + recursive: true, + force: true, + maxRetries: 10, + retryDelay: 10, + }); + } +} diff --git a/extensions/acpx/src/runtime.test.ts b/extensions/acpx/src/runtime.test.ts index d05c4f0c8aa..05c7e65268d 100644 --- a/extensions/acpx/src/runtime.test.ts +++ b/extensions/acpx/src/runtime.test.ts @@ -1,345 +1,22 @@ -import fs from "node:fs"; -import { chmod, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; import { runAcpRuntimeAdapterContract } from "../../../src/acp/runtime/adapter-contract.testkit.js"; -import { ACPX_PINNED_VERSION, type ResolvedAcpxPluginConfig } from "./config.js"; +import { + cleanupMockRuntimeFixtures, + createMockRuntimeFixture, + NOOP_LOGGER, + readMockRuntimeLogEntries, +} from "./runtime-internals/test-fixtures.js"; import { AcpxRuntime, decodeAcpxRuntimeHandleState } from "./runtime.js"; -const NOOP_LOGGER = { - info: (_message: string) => {}, - warn: (_message: string) => {}, - error: (_message: string) => {}, - debug: (_message: string) => {}, -}; - -const MOCK_CLI_SCRIPT = String.raw`#!/usr/bin/env node -const fs = require("node:fs"); - -const args = process.argv.slice(2); -const logPath = process.env.MOCK_ACPX_LOG; -const writeLog = (entry) => { - if (!logPath) return; - fs.appendFileSync(logPath, JSON.stringify(entry) + "\n"); -}; -const emitJson = (payload) => process.stdout.write(JSON.stringify(payload) + "\n"); -const emitUpdate = (sessionId, update) => - emitJson({ - jsonrpc: "2.0", - method: "session/update", - params: { sessionId, update }, - }); - -if (args.includes("--version")) { - process.stdout.write("mock-acpx ${ACPX_PINNED_VERSION}\\n"); - process.exit(0); -} - -if (args.includes("--help")) { - process.stdout.write("mock-acpx help\\n"); - process.exit(0); -} - -const commandIndex = args.findIndex( - (arg) => - arg === "prompt" || - arg === "cancel" || - arg === "sessions" || - arg === "set-mode" || - arg === "set" || - arg === "status", -); -const command = commandIndex >= 0 ? args[commandIndex] : ""; -const agent = commandIndex > 0 ? args[commandIndex - 1] : "unknown"; - -const readFlag = (flag) => { - const idx = args.indexOf(flag); - if (idx < 0) return ""; - return String(args[idx + 1] || ""); -}; - -const sessionFromOption = readFlag("--session"); -const ensureName = readFlag("--name"); -const closeName = command === "sessions" && args[commandIndex + 1] === "close" ? String(args[commandIndex + 2] || "") : ""; -const setModeValue = command === "set-mode" ? String(args[commandIndex + 1] || "") : ""; -const setKey = command === "set" ? String(args[commandIndex + 1] || "") : ""; -const setValue = command === "set" ? String(args[commandIndex + 2] || "") : ""; - -if (command === "sessions" && args[commandIndex + 1] === "ensure") { - writeLog({ kind: "ensure", agent, args, sessionName: ensureName }); - emitJson({ - action: "session_ensured", - acpxRecordId: "rec-" + ensureName, - acpxSessionId: "sid-" + ensureName, - agentSessionId: "inner-" + ensureName, - name: ensureName, - created: true, - }); - process.exit(0); -} - -if (command === "cancel") { - writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption }); - emitJson({ - acpxSessionId: "sid-" + sessionFromOption, - cancelled: true, - }); - process.exit(0); -} - -if (command === "set-mode") { - writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue }); - emitJson({ - action: "mode_set", - acpxSessionId: "sid-" + sessionFromOption, - mode: setModeValue, - }); - process.exit(0); -} - -if (command === "set") { - writeLog({ - kind: "set", - agent, - args, - sessionName: sessionFromOption, - key: setKey, - value: setValue, - }); - emitJson({ - action: "config_set", - acpxSessionId: "sid-" + sessionFromOption, - key: setKey, - value: setValue, - }); - process.exit(0); -} - -if (command === "status") { - writeLog({ kind: "status", agent, args, sessionName: sessionFromOption }); - emitJson({ - acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null, - acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null, - agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null, - status: sessionFromOption ? "alive" : "no-session", - pid: 4242, - uptime: 120, - }); - process.exit(0); -} - -if (command === "sessions" && args[commandIndex + 1] === "close") { - writeLog({ kind: "close", agent, args, sessionName: closeName }); - emitJson({ - action: "session_closed", - acpxRecordId: "rec-" + closeName, - acpxSessionId: "sid-" + closeName, - name: closeName, - }); - process.exit(0); -} - -if (command === "prompt") { - const stdinText = fs.readFileSync(0, "utf8"); - writeLog({ kind: "prompt", agent, args, sessionName: sessionFromOption, stdinText }); - const requestId = "req-1"; - - emitJson({ - jsonrpc: "2.0", - id: 0, - method: "session/load", - params: { - sessionId: sessionFromOption, - cwd: process.cwd(), - mcpServers: [], - }, - }); - emitJson({ - jsonrpc: "2.0", - id: 0, - error: { - code: -32002, - message: "Resource not found", - }, - }); - - emitJson({ - jsonrpc: "2.0", - id: requestId, - method: "session/prompt", - params: { - sessionId: sessionFromOption, - prompt: [ - { - type: "text", - text: stdinText.trim(), - }, - ], - }, - }); - - if (stdinText.includes("trigger-error")) { - emitJson({ - jsonrpc: "2.0", - id: requestId, - error: { - code: -32000, - message: "mock failure", - }, - }); - process.exit(1); - } - - if (stdinText.includes("split-spacing")) { - emitUpdate(sessionFromOption, { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: "alpha" }, - }); - emitUpdate(sessionFromOption, { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: " beta" }, - }); - emitUpdate(sessionFromOption, { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: " gamma" }, - }); - emitJson({ - jsonrpc: "2.0", - id: requestId, - result: { - stopReason: "end_turn", - }, - }); - process.exit(0); - } - - if (stdinText.includes("double-done")) { - emitUpdate(sessionFromOption, { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: "ok" }, - }); - emitJson({ - jsonrpc: "2.0", - id: requestId, - result: { - stopReason: "end_turn", - }, - }); - emitJson({ - jsonrpc: "2.0", - id: requestId, - result: { - stopReason: "end_turn", - }, - }); - process.exit(0); - } - - emitUpdate(sessionFromOption, { - sessionUpdate: "agent_thought_chunk", - content: { type: "text", text: "thinking" }, - }); - emitUpdate(sessionFromOption, { - sessionUpdate: "tool_call", - toolCallId: "tool-1", - title: "run-tests", - status: "in_progress", - kind: "command", - }); - emitUpdate(sessionFromOption, { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text: "echo:" + stdinText.trim() }, - }); - emitJson({ - jsonrpc: "2.0", - id: requestId, - result: { - stopReason: "end_turn", - }, - }); - process.exit(0); -} - -writeLog({ kind: "unknown", args }); -emitJson({ - error: { - code: "USAGE", - message: "unknown command", - }, -}); -process.exit(2); -`; - -const tempDirs: string[] = []; - -async function createMockRuntime(params?: { - permissionMode?: ResolvedAcpxPluginConfig["permissionMode"]; - queueOwnerTtlSeconds?: number; -}): Promise<{ - runtime: AcpxRuntime; - logPath: string; - config: ResolvedAcpxPluginConfig; -}> { - const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-acpx-runtime-test-")); - tempDirs.push(dir); - const scriptPath = path.join(dir, "mock-acpx.cjs"); - const logPath = path.join(dir, "calls.log"); - await writeFile(scriptPath, MOCK_CLI_SCRIPT, "utf8"); - await chmod(scriptPath, 0o755); - process.env.MOCK_ACPX_LOG = logPath; - - const config: ResolvedAcpxPluginConfig = { - command: scriptPath, - allowPluginLocalInstall: false, - installCommand: "n/a", - cwd: dir, - permissionMode: params?.permissionMode ?? "approve-all", - nonInteractivePermissions: "fail", - queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds ?? 0.1, - }; - - return { - runtime: new AcpxRuntime(config, { - queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds, - logger: NOOP_LOGGER, - }), - logPath, - config, - }; -} - -async function readLogEntries(logPath: string): Promise>> { - if (!fs.existsSync(logPath)) { - return []; - } - const raw = await readFile(logPath, "utf8"); - return raw - .split(/\r?\n/) - .map((line) => line.trim()) - .filter(Boolean) - .map((line) => JSON.parse(line) as Record); -} - afterEach(async () => { - delete process.env.MOCK_ACPX_LOG; - while (tempDirs.length > 0) { - const dir = tempDirs.pop(); - if (!dir) { - continue; - } - await rm(dir, { - recursive: true, - force: true, - maxRetries: 10, - retryDelay: 10, - }); - } + await cleanupMockRuntimeFixtures(); }); describe("AcpxRuntime", () => { it("passes the shared ACP adapter contract suite", async () => { - const fixture = await createMockRuntime(); + const fixture = await createMockRuntimeFixture(); await runAcpRuntimeAdapterContract({ createRuntime: async () => fixture.runtime, agentId: "codex", @@ -353,7 +30,7 @@ describe("AcpxRuntime", () => { }, }); - const logs = await readLogEntries(fixture.logPath); + const logs = await readMockRuntimeLogEntries(fixture.logPath); expect(logs.some((entry) => entry.kind === "ensure")).toBe(true); expect(logs.some((entry) => entry.kind === "status")).toBe(true); expect(logs.some((entry) => entry.kind === "set-mode")).toBe(true); @@ -363,7 +40,7 @@ describe("AcpxRuntime", () => { }); it("ensures sessions and streams prompt events", async () => { - const { runtime, logPath } = await createMockRuntime({ queueOwnerTtlSeconds: 180 }); + const { runtime, logPath } = await createMockRuntimeFixture({ queueOwnerTtlSeconds: 180 }); const handle = await runtime.ensureSession({ sessionKey: "agent:codex:acp:123", @@ -408,7 +85,7 @@ describe("AcpxRuntime", () => { stopReason: "end_turn", }); - const logs = await readLogEntries(logPath); + const logs = await readMockRuntimeLogEntries(logPath); const ensure = logs.find((entry) => entry.kind === "ensure"); const prompt = logs.find((entry) => entry.kind === "prompt"); expect(ensure).toBeDefined(); @@ -421,7 +98,7 @@ describe("AcpxRuntime", () => { }); it("passes a queue-owner TTL by default to avoid long idle stalls", async () => { - const { runtime, logPath } = await createMockRuntime(); + const { runtime, logPath } = await createMockRuntimeFixture(); const handle = await runtime.ensureSession({ sessionKey: "agent:codex:acp:ttl-default", agent: "codex", @@ -437,7 +114,7 @@ describe("AcpxRuntime", () => { // drain } - const logs = await readLogEntries(logPath); + const logs = await readMockRuntimeLogEntries(logPath); const prompt = logs.find((entry) => entry.kind === "prompt"); expect(prompt).toBeDefined(); const promptArgs = (prompt?.args as string[]) ?? []; @@ -447,7 +124,7 @@ describe("AcpxRuntime", () => { }); it("preserves leading spaces across streamed text deltas", async () => { - const { runtime } = await createMockRuntime(); + const { runtime } = await createMockRuntimeFixture(); const handle = await runtime.ensureSession({ sessionKey: "agent:codex:acp:space", agent: "codex", @@ -471,7 +148,7 @@ describe("AcpxRuntime", () => { }); it("emits done once when ACP stream repeats stop reason responses", async () => { - const { runtime } = await createMockRuntime(); + const { runtime } = await createMockRuntimeFixture(); const handle = await runtime.ensureSession({ sessionKey: "agent:codex:acp:double-done", agent: "codex", @@ -493,7 +170,7 @@ describe("AcpxRuntime", () => { }); it("maps acpx error events into ACP runtime error events", async () => { - const { runtime } = await createMockRuntime(); + const { runtime } = await createMockRuntimeFixture(); const handle = await runtime.ensureSession({ sessionKey: "agent:codex:acp:456", agent: "codex", @@ -519,7 +196,7 @@ describe("AcpxRuntime", () => { }); it("supports cancel and close using encoded runtime handle state", async () => { - const { runtime, logPath, config } = await createMockRuntime(); + const { runtime, logPath, config } = await createMockRuntimeFixture(); const handle = await runtime.ensureSession({ sessionKey: "agent:claude:acp:789", agent: "claude", @@ -534,7 +211,7 @@ describe("AcpxRuntime", () => { await secondRuntime.cancel({ handle, reason: "test" }); await secondRuntime.close({ handle, reason: "test" }); - const logs = await readLogEntries(logPath); + const logs = await readMockRuntimeLogEntries(logPath); const cancel = logs.find((entry) => entry.kind === "cancel"); const close = logs.find((entry) => entry.kind === "close"); expect(cancel?.sessionName).toBe("agent:claude:acp:789"); @@ -542,7 +219,7 @@ describe("AcpxRuntime", () => { }); it("exposes control capabilities and runs set-mode/set/status commands", async () => { - const { runtime, logPath } = await createMockRuntime(); + const { runtime, logPath } = await createMockRuntimeFixture(); const handle = await runtime.ensureSession({ sessionKey: "agent:codex:acp:controls", agent: "codex", @@ -574,14 +251,14 @@ describe("AcpxRuntime", () => { expect(status.details?.status).toBe("alive"); expect(status.details?.pid).toBe(4242); - const logs = await readLogEntries(logPath); + const logs = await readMockRuntimeLogEntries(logPath); expect(logs.find((entry) => entry.kind === "set-mode")?.mode).toBe("plan"); expect(logs.find((entry) => entry.kind === "set")?.key).toBe("model"); expect(logs.find((entry) => entry.kind === "status")).toBeDefined(); }); it("skips prompt execution when runTurn starts with an already-aborted signal", async () => { - const { runtime, logPath } = await createMockRuntime(); + const { runtime, logPath } = await createMockRuntimeFixture(); const handle = await runtime.ensureSession({ sessionKey: "agent:codex:acp:aborted", agent: "codex", @@ -601,13 +278,13 @@ describe("AcpxRuntime", () => { events.push(event); } - const logs = await readLogEntries(logPath); + const logs = await readMockRuntimeLogEntries(logPath); expect(events).toEqual([]); expect(logs.some((entry) => entry.kind === "prompt")).toBe(false); }); it("does not mark backend unhealthy when a per-session cwd is missing", async () => { - const { runtime } = await createMockRuntime(); + const { runtime } = await createMockRuntimeFixture(); const missingCwd = path.join(os.tmpdir(), "openclaw-acpx-runtime-test-missing-cwd"); await runtime.probeAvailability(); @@ -646,7 +323,7 @@ describe("AcpxRuntime", () => { }); it("marks runtime healthy when command is available", async () => { - const { runtime } = await createMockRuntime(); + const { runtime } = await createMockRuntimeFixture(); await runtime.probeAvailability(); expect(runtime.isHealthy()).toBe(true); }); diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index c0d2cca94bc..5c9a4d890a2 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -14,11 +14,8 @@ import type { import { AcpRuntimeError } from "openclaw/plugin-sdk"; import { type ResolvedAcpxPluginConfig } from "./config.js"; import { checkAcpxVersion } from "./ensure.js"; -import { - parseJsonLines, - parsePromptEventLine, - toAcpxErrorEvent, -} from "./runtime-internals/events.js"; +import { parseControlJsonError } from "./runtime-internals/control-errors.js"; +import { parseJsonLines, PromptStreamProjector } from "./runtime-internals/events.js"; import { resolveSpawnFailure, spawnAndCollect, @@ -197,9 +194,7 @@ export class AcpxRuntime implements AcpRuntime { sessionName: state.name, cwd: state.cwd, }); - const parseContext = { - promptRequestIds: new Set(), - }; + const projector = new PromptStreamProjector(); const cancelOnAbort = async () => { await this.cancel({ @@ -241,7 +236,7 @@ export class AcpxRuntime implements AcpRuntime { const lines = createInterface({ input: child.stdout }); try { for await (const line of lines) { - const parsed = parsePromptEventLine(line, parseContext); + const parsed = projector.ingestLine(line); if (!parsed) { continue; } @@ -312,7 +307,7 @@ export class AcpxRuntime implements AcpRuntime { fallbackCode: "ACP_TURN_FAILED", ignoreNoSession: true, }); - const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0]; + const detail = events.find((event) => !parseControlJsonError(event)) ?? events[0]; if (!detail) { return { summary: "acpx status unavailable", @@ -558,7 +553,7 @@ export class AcpxRuntime implements AcpRuntime { } const events = parseJsonLines(result.stdout); - const errorEvent = events.map((event) => toAcpxErrorEvent(event)).find(Boolean) ?? null; + const errorEvent = events.map((event) => parseControlJsonError(event)).find(Boolean) ?? null; if (errorEvent) { if (params.ignoreNoSession && errorEvent.code === "NO_SESSION") { return events;