ACPX extension: split ACP stream parser and test fixtures

This commit is contained in:
Onur
2026-02-28 15:35:46 +01:00
committed by Onur Solmaz
parent bdc355d0b0
commit d669b27a45
9 changed files with 634 additions and 544 deletions

View File

@@ -0,0 +1,25 @@
import { describe, expect, it } from "vitest";
import { parseControlJsonError } from "./control-errors.js";
describe("parseControlJsonError", () => {
it("reads structured control-command errors", () => {
expect(
parseControlJsonError({
error: {
code: "NO_SESSION",
message: "No matching session",
retryable: false,
},
}),
).toEqual({
code: "NO_SESSION",
message: "No matching session",
retryable: false,
});
});
it("returns null when payload has no error object", () => {
expect(parseControlJsonError({ action: "session_ensured" })).toBeNull();
expect(parseControlJsonError("bad")).toBeNull();
});
});

View File

@@ -0,0 +1,27 @@
import {
asOptionalBoolean,
asOptionalString,
asTrimmedString,
type AcpxErrorEvent,
isRecord,
} from "./shared.js";
export function parseControlJsonError(value: unknown): AcpxErrorEvent | null {
if (!isRecord(value)) {
return null;
}
const error = isRecord(value.error) ? value.error : null;
if (!error) {
return null;
}
const message = asTrimmedString(error.message) || "acpx reported an error";
const codeValue = error.code;
return {
message,
code:
typeof codeValue === "number" && Number.isFinite(codeValue)
? String(codeValue)
: asOptionalString(codeValue),
retryable: asOptionalBoolean(error.retryable),
};
}

View File

@@ -1,13 +1,14 @@
import { describe, expect, it } from "vitest";
import { parsePromptEventLine, toAcpxErrorEvent } from "./events.js";
import { PromptStreamProjector } from "./events.js";
function jsonLine(payload: unknown): string {
return JSON.stringify(payload);
}
describe("acpx runtime event parsing", () => {
describe("PromptStreamProjector", () => {
it("maps agent message chunks to output deltas", () => {
const event = parsePromptEventLine(
const projector = new PromptStreamProjector();
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
@@ -32,7 +33,8 @@ describe("acpx runtime event parsing", () => {
});
it("preserves leading spaces in streamed output chunks", () => {
const event = parsePromptEventLine(
const projector = new PromptStreamProjector();
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
@@ -57,7 +59,8 @@ describe("acpx runtime event parsing", () => {
});
it("maps agent thought chunks to thought deltas", () => {
const event = parsePromptEventLine(
const projector = new PromptStreamProjector();
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
@@ -82,7 +85,8 @@ describe("acpx runtime event parsing", () => {
});
it("maps tool call updates to tool_call events", () => {
const event = parsePromptEventLine(
const projector = new PromptStreamProjector();
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
method: "session/update",
@@ -105,7 +109,19 @@ describe("acpx runtime event parsing", () => {
});
it("maps prompt response stop reasons to done events", () => {
const event = parsePromptEventLine(
const projector = new PromptStreamProjector();
projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: "req-1",
method: "session/prompt",
params: {
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
},
}),
);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: "req-1",
@@ -122,7 +138,19 @@ describe("acpx runtime event parsing", () => {
});
it("maps json-rpc errors to runtime errors", () => {
const event = parsePromptEventLine(
const projector = new PromptStreamProjector();
projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: "req-1",
method: "session/prompt",
params: {
sessionId: "session-1",
prompt: [{ type: "text", text: "hello" }],
},
}),
);
const event = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: "req-1",
@@ -137,16 +165,12 @@ describe("acpx runtime event parsing", () => {
type: "error",
message: "adapter failed",
code: "-32000",
retryable: undefined,
});
});
it("ignores non-prompt response errors when parse context is provided", () => {
const context = {
promptRequestIds: new Set<string>(),
};
const promptRequest = parsePromptEventLine(
it("ignores non-prompt response errors", () => {
const projector = new PromptStreamProjector();
projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: 3,
@@ -156,9 +180,8 @@ describe("acpx runtime event parsing", () => {
prompt: [{ type: "text", text: "hello" }],
},
}),
context,
);
const loadError = parsePromptEventLine(
const loadError = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: 1,
@@ -167,9 +190,8 @@ describe("acpx runtime event parsing", () => {
message: "Resource not found",
},
}),
context,
);
const promptDone = parsePromptEventLine(
const promptDone = projector.ingestLine(
jsonLine({
jsonrpc: "2.0",
id: 3,
@@ -177,10 +199,8 @@ describe("acpx runtime event parsing", () => {
stopReason: "end_turn",
},
}),
context,
);
expect(promptRequest).toBeNull();
expect(loadError).toBeNull();
expect(promptDone).toEqual({
type: "done",
@@ -188,21 +208,3 @@ describe("acpx runtime event parsing", () => {
});
});
});
describe("toAcpxErrorEvent", () => {
it("reads control command errors from json output", () => {
expect(
toAcpxErrorEvent({
error: {
code: "NO_SESSION",
message: "No matching session",
retryable: false,
},
}),
).toEqual({
code: "NO_SESSION",
message: "No matching session",
retryable: false,
});
});
});

View File

@@ -1,83 +1,13 @@
import type { AcpRuntimeEvent } from "openclaw/plugin-sdk";
import { isAcpJsonRpcMessage, normalizeJsonRpcId } from "./jsonrpc.js";
import {
asOptionalBoolean,
asOptionalString,
asString,
asTrimmedString,
type AcpxErrorEvent,
type AcpxJsonObject,
isRecord,
} from "./shared.js";
type JsonRpcId = string | number | null;
export type PromptParseContext = {
promptRequestIds: Set<string>;
};
function isJsonRpcId(value: unknown): value is JsonRpcId {
return (
value === null ||
typeof value === "string" ||
(typeof value === "number" && Number.isFinite(value))
);
}
function normalizeJsonRpcId(value: unknown): string | null {
if (!isJsonRpcId(value) || value == null) {
return null;
}
return String(value);
}
function isAcpJsonRpcMessage(value: unknown): value is Record<string, unknown> {
if (!isRecord(value) || value.jsonrpc !== "2.0") {
return false;
}
const hasMethod = typeof value.method === "string" && value.method.length > 0;
const hasId = Object.hasOwn(value, "id");
if (hasMethod && !hasId) {
return true;
}
if (hasMethod && hasId) {
return isJsonRpcId(value.id);
}
if (!hasMethod && hasId) {
if (!isJsonRpcId(value.id)) {
return false;
}
const hasResult = Object.hasOwn(value, "result");
const hasError = Object.hasOwn(value, "error");
return hasResult !== hasError;
}
return false;
}
export function toAcpxErrorEvent(value: unknown): AcpxErrorEvent | null {
if (!isRecord(value)) {
return null;
}
const error = isRecord(value.error) ? value.error : null;
if (!error) {
return null;
}
const message = asTrimmedString(error.message) || "acpx reported an error";
const codeValue = error.code;
return {
message,
code:
typeof codeValue === "number" && Number.isFinite(codeValue)
? String(codeValue)
: asOptionalString(codeValue),
retryable: asOptionalBoolean(error.retryable),
};
}
export function parseJsonLines(value: string): AcpxJsonObject[] {
const events: AcpxJsonObject[] = [];
for (const line of value.split(/\r?\n/)) {
@@ -228,96 +158,74 @@ function parseSessionUpdateEvent(message: Record<string, unknown>): AcpRuntimeEv
}
}
function shouldHandlePromptResponse(params: {
message: Record<string, unknown>;
context?: PromptParseContext;
}): boolean {
const id = normalizeJsonRpcId(params.message.id);
if (!id) {
return false;
}
if (!params.context) {
return true;
}
return params.context.promptRequestIds.has(id);
}
export class PromptStreamProjector {
private readonly promptRequestIds = new Set<string>();
export function parsePromptEventLine(
line: string,
context?: PromptParseContext,
): AcpRuntimeEvent | null {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return {
type: "status",
text: trimmed,
};
}
if (!isRecord(parsed)) {
return null;
}
if (!isAcpJsonRpcMessage(parsed)) {
const fallbackError = toAcpxErrorEvent(parsed);
if (fallbackError) {
ingestLine(line: string): AcpRuntimeEvent | null {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
let parsed: unknown;
try {
parsed = JSON.parse(trimmed);
} catch {
return {
type: "error",
message: fallbackError.message,
code: fallbackError.code,
retryable: fallbackError.retryable,
type: "status",
text: trimmed,
};
}
return null;
}
const updateEvent = parseSessionUpdateEvent(parsed);
if (updateEvent) {
return updateEvent;
}
if (asTrimmedString(parsed.method) === "session/prompt") {
const id = normalizeJsonRpcId(parsed.id);
if (id && context) {
context.promptRequestIds.add(id);
}
return null;
}
if (Object.hasOwn(parsed, "error")) {
if (!shouldHandlePromptResponse({ message: parsed, context })) {
if (!isRecord(parsed) || !isAcpJsonRpcMessage(parsed)) {
return null;
}
const error = isRecord(parsed.error) ? parsed.error : null;
const message = asTrimmedString(error?.message);
const codeValue = error?.code;
return {
type: "error",
message: message || "acpx runtime error",
code:
typeof codeValue === "number" && Number.isFinite(codeValue)
? String(codeValue)
: asOptionalString(codeValue),
retryable: asOptionalBoolean(error?.retryable),
};
}
const stopReason = parsePromptStopReason(parsed);
if (stopReason) {
if (!shouldHandlePromptResponse({ message: parsed, context })) {
const updateEvent = parseSessionUpdateEvent(parsed);
if (updateEvent) {
return updateEvent;
}
if (asTrimmedString(parsed.method) === "session/prompt") {
const id = normalizeJsonRpcId(parsed.id);
if (id) {
this.promptRequestIds.add(id);
}
return null;
}
if (Object.hasOwn(parsed, "error")) {
if (!this.shouldHandlePromptResponse(parsed)) {
return null;
}
const error = isRecord(parsed.error) ? parsed.error : null;
const message = asTrimmedString(error?.message);
const codeValue = error?.code;
return {
type: "error",
message: message || "acpx runtime error",
code:
typeof codeValue === "number" && Number.isFinite(codeValue)
? String(codeValue)
: asOptionalString(codeValue),
};
}
const stopReason = parsePromptStopReason(parsed);
if (!stopReason || !this.shouldHandlePromptResponse(parsed)) {
return null;
}
return {
type: "done",
stopReason,
};
}
return null;
private shouldHandlePromptResponse(message: Record<string, unknown>): boolean {
const id = normalizeJsonRpcId(message.id);
if (!id) {
return false;
}
return this.promptRequestIds.has(id);
}
}

View File

@@ -0,0 +1,67 @@
import { describe, expect, it } from "vitest";
import { isAcpJsonRpcMessage, isJsonRpcId, normalizeJsonRpcId } from "./jsonrpc.js";
describe("jsonrpc helpers", () => {
it("validates json-rpc ids", () => {
expect(isJsonRpcId(null)).toBe(true);
expect(isJsonRpcId("abc")).toBe(true);
expect(isJsonRpcId(12)).toBe(true);
expect(isJsonRpcId(Number.NaN)).toBe(false);
expect(isJsonRpcId({})).toBe(false);
});
it("normalizes json-rpc ids", () => {
expect(normalizeJsonRpcId("abc")).toBe("abc");
expect(normalizeJsonRpcId(12)).toBe("12");
expect(normalizeJsonRpcId(null)).toBeNull();
expect(normalizeJsonRpcId(undefined)).toBeNull();
});
it("accepts request, response, and notification shapes", () => {
expect(
isAcpJsonRpcMessage({
jsonrpc: "2.0",
method: "session/prompt",
id: 1,
}),
).toBe(true);
expect(
isAcpJsonRpcMessage({
jsonrpc: "2.0",
id: 1,
result: {
stopReason: "end_turn",
},
}),
).toBe(true);
expect(
isAcpJsonRpcMessage({
jsonrpc: "2.0",
method: "session/update",
}),
).toBe(true);
});
it("rejects malformed result/error response shapes", () => {
expect(
isAcpJsonRpcMessage({
jsonrpc: "2.0",
id: 1,
}),
).toBe(false);
expect(
isAcpJsonRpcMessage({
jsonrpc: "2.0",
id: 1,
result: {},
error: {
code: -1,
message: "bad",
},
}),
).toBe(false);
});
});

View File

@@ -0,0 +1,47 @@
import { isRecord } from "./shared.js";
export type JsonRpcId = string | number | null;
function hasExclusiveResultOrError(value: Record<string, unknown>): boolean {
const hasResult = Object.hasOwn(value, "result");
const hasError = Object.hasOwn(value, "error");
return hasResult !== hasError;
}
export function isJsonRpcId(value: unknown): value is JsonRpcId {
return (
value === null ||
typeof value === "string" ||
(typeof value === "number" && Number.isFinite(value))
);
}
export function normalizeJsonRpcId(value: unknown): string | null {
if (!isJsonRpcId(value) || value == null) {
return null;
}
return String(value);
}
export function isAcpJsonRpcMessage(value: unknown): value is Record<string, unknown> {
if (!isRecord(value) || value.jsonrpc !== "2.0") {
return false;
}
const hasMethod = typeof value.method === "string" && value.method.length > 0;
const hasId = Object.hasOwn(value, "id");
if (hasMethod && !hasId) {
return true;
}
if (hasMethod && hasId) {
return isJsonRpcId(value.id);
}
if (!hasMethod && hasId) {
return isJsonRpcId(value.id) && hasExclusiveResultOrError(value);
}
return false;
}

View File

@@ -0,0 +1,342 @@
import fs from "node:fs";
import { chmod, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { ResolvedAcpxPluginConfig } from "../config.js";
import { ACPX_PINNED_VERSION } from "../config.js";
import { AcpxRuntime } from "../runtime.js";
export const NOOP_LOGGER = {
info: (_message: string) => {},
warn: (_message: string) => {},
error: (_message: string) => {},
debug: (_message: string) => {},
};
const tempDirs: string[] = [];
const MOCK_CLI_SCRIPT = String.raw`#!/usr/bin/env node
const fs = require("node:fs");
const args = process.argv.slice(2);
const logPath = process.env.MOCK_ACPX_LOG;
const writeLog = (entry) => {
if (!logPath) return;
fs.appendFileSync(logPath, JSON.stringify(entry) + "\n");
};
const emitJson = (payload) => process.stdout.write(JSON.stringify(payload) + "\n");
const emitUpdate = (sessionId, update) =>
emitJson({
jsonrpc: "2.0",
method: "session/update",
params: { sessionId, update },
});
if (args.includes("--version")) {
process.stdout.write("mock-acpx ${ACPX_PINNED_VERSION}\\n");
process.exit(0);
}
if (args.includes("--help")) {
process.stdout.write("mock-acpx help\\n");
process.exit(0);
}
const commandIndex = args.findIndex(
(arg) =>
arg === "prompt" ||
arg === "cancel" ||
arg === "sessions" ||
arg === "set-mode" ||
arg === "set" ||
arg === "status",
);
const command = commandIndex >= 0 ? args[commandIndex] : "";
const agent = commandIndex > 0 ? args[commandIndex - 1] : "unknown";
const readFlag = (flag) => {
const idx = args.indexOf(flag);
if (idx < 0) return "";
return String(args[idx + 1] || "");
};
const sessionFromOption = readFlag("--session");
const ensureName = readFlag("--name");
const closeName =
command === "sessions" && args[commandIndex + 1] === "close"
? String(args[commandIndex + 2] || "")
: "";
const setModeValue = command === "set-mode" ? String(args[commandIndex + 1] || "") : "";
const setKey = command === "set" ? String(args[commandIndex + 1] || "") : "";
const setValue = command === "set" ? String(args[commandIndex + 2] || "") : "";
if (command === "sessions" && args[commandIndex + 1] === "ensure") {
writeLog({ kind: "ensure", agent, args, sessionName: ensureName });
emitJson({
action: "session_ensured",
acpxRecordId: "rec-" + ensureName,
acpxSessionId: "sid-" + ensureName,
agentSessionId: "inner-" + ensureName,
name: ensureName,
created: true,
});
process.exit(0);
}
if (command === "cancel") {
writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption });
emitJson({
acpxSessionId: "sid-" + sessionFromOption,
cancelled: true,
});
process.exit(0);
}
if (command === "set-mode") {
writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue });
emitJson({
action: "mode_set",
acpxSessionId: "sid-" + sessionFromOption,
mode: setModeValue,
});
process.exit(0);
}
if (command === "set") {
writeLog({
kind: "set",
agent,
args,
sessionName: sessionFromOption,
key: setKey,
value: setValue,
});
emitJson({
action: "config_set",
acpxSessionId: "sid-" + sessionFromOption,
key: setKey,
value: setValue,
});
process.exit(0);
}
if (command === "status") {
writeLog({ kind: "status", agent, args, sessionName: sessionFromOption });
emitJson({
acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null,
acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null,
agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null,
status: sessionFromOption ? "alive" : "no-session",
pid: 4242,
uptime: 120,
});
process.exit(0);
}
if (command === "sessions" && args[commandIndex + 1] === "close") {
writeLog({ kind: "close", agent, args, sessionName: closeName });
emitJson({
action: "session_closed",
acpxRecordId: "rec-" + closeName,
acpxSessionId: "sid-" + closeName,
name: closeName,
});
process.exit(0);
}
if (command === "prompt") {
const stdinText = fs.readFileSync(0, "utf8");
writeLog({ kind: "prompt", agent, args, sessionName: sessionFromOption, stdinText });
const requestId = "req-1";
emitJson({
jsonrpc: "2.0",
id: 0,
method: "session/load",
params: {
sessionId: sessionFromOption,
cwd: process.cwd(),
mcpServers: [],
},
});
emitJson({
jsonrpc: "2.0",
id: 0,
error: {
code: -32002,
message: "Resource not found",
},
});
emitJson({
jsonrpc: "2.0",
id: requestId,
method: "session/prompt",
params: {
sessionId: sessionFromOption,
prompt: [
{
type: "text",
text: stdinText.trim(),
},
],
},
});
if (stdinText.includes("trigger-error")) {
emitJson({
jsonrpc: "2.0",
id: requestId,
error: {
code: -32000,
message: "mock failure",
},
});
process.exit(1);
}
if (stdinText.includes("split-spacing")) {
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "alpha" },
});
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " beta" },
});
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " gamma" },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
process.exit(0);
}
if (stdinText.includes("double-done")) {
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "ok" },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
process.exit(0);
}
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "thinking" },
});
emitUpdate(sessionFromOption, {
sessionUpdate: "tool_call",
toolCallId: "tool-1",
title: "run-tests",
status: "in_progress",
kind: "command",
});
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "echo:" + stdinText.trim() },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
process.exit(0);
}
writeLog({ kind: "unknown", args });
emitJson({
error: {
code: "USAGE",
message: "unknown command",
},
});
process.exit(2);
`;
export async function createMockRuntimeFixture(params?: {
permissionMode?: ResolvedAcpxPluginConfig["permissionMode"];
queueOwnerTtlSeconds?: number;
}): Promise<{
runtime: AcpxRuntime;
logPath: string;
config: ResolvedAcpxPluginConfig;
}> {
const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-acpx-runtime-test-"));
tempDirs.push(dir);
const scriptPath = path.join(dir, "mock-acpx.cjs");
const logPath = path.join(dir, "calls.log");
await writeFile(scriptPath, MOCK_CLI_SCRIPT, "utf8");
await chmod(scriptPath, 0o755);
process.env.MOCK_ACPX_LOG = logPath;
const config: ResolvedAcpxPluginConfig = {
command: scriptPath,
allowPluginLocalInstall: false,
installCommand: "n/a",
cwd: dir,
permissionMode: params?.permissionMode ?? "approve-all",
nonInteractivePermissions: "fail",
queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds ?? 0.1,
};
return {
runtime: new AcpxRuntime(config, {
queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds,
logger: NOOP_LOGGER,
}),
logPath,
config,
};
}
export async function readMockRuntimeLogEntries(
logPath: string,
): Promise<Array<Record<string, unknown>>> {
if (!fs.existsSync(logPath)) {
return [];
}
const raw = await readFile(logPath, "utf8");
return raw
.split(/\r?\n/)
.map((line) => line.trim())
.filter(Boolean)
.map((line) => JSON.parse(line) as Record<string, unknown>);
}
export async function cleanupMockRuntimeFixtures(): Promise<void> {
delete process.env.MOCK_ACPX_LOG;
while (tempDirs.length > 0) {
const dir = tempDirs.pop();
if (!dir) {
continue;
}
await rm(dir, {
recursive: true,
force: true,
maxRetries: 10,
retryDelay: 10,
});
}
}

View File

@@ -1,345 +1,22 @@
import fs from "node:fs";
import { chmod, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { runAcpRuntimeAdapterContract } from "../../../src/acp/runtime/adapter-contract.testkit.js";
import { ACPX_PINNED_VERSION, type ResolvedAcpxPluginConfig } from "./config.js";
import {
cleanupMockRuntimeFixtures,
createMockRuntimeFixture,
NOOP_LOGGER,
readMockRuntimeLogEntries,
} from "./runtime-internals/test-fixtures.js";
import { AcpxRuntime, decodeAcpxRuntimeHandleState } from "./runtime.js";
const NOOP_LOGGER = {
info: (_message: string) => {},
warn: (_message: string) => {},
error: (_message: string) => {},
debug: (_message: string) => {},
};
const MOCK_CLI_SCRIPT = String.raw`#!/usr/bin/env node
const fs = require("node:fs");
const args = process.argv.slice(2);
const logPath = process.env.MOCK_ACPX_LOG;
const writeLog = (entry) => {
if (!logPath) return;
fs.appendFileSync(logPath, JSON.stringify(entry) + "\n");
};
const emitJson = (payload) => process.stdout.write(JSON.stringify(payload) + "\n");
const emitUpdate = (sessionId, update) =>
emitJson({
jsonrpc: "2.0",
method: "session/update",
params: { sessionId, update },
});
if (args.includes("--version")) {
process.stdout.write("mock-acpx ${ACPX_PINNED_VERSION}\\n");
process.exit(0);
}
if (args.includes("--help")) {
process.stdout.write("mock-acpx help\\n");
process.exit(0);
}
const commandIndex = args.findIndex(
(arg) =>
arg === "prompt" ||
arg === "cancel" ||
arg === "sessions" ||
arg === "set-mode" ||
arg === "set" ||
arg === "status",
);
const command = commandIndex >= 0 ? args[commandIndex] : "";
const agent = commandIndex > 0 ? args[commandIndex - 1] : "unknown";
const readFlag = (flag) => {
const idx = args.indexOf(flag);
if (idx < 0) return "";
return String(args[idx + 1] || "");
};
const sessionFromOption = readFlag("--session");
const ensureName = readFlag("--name");
const closeName = command === "sessions" && args[commandIndex + 1] === "close" ? String(args[commandIndex + 2] || "") : "";
const setModeValue = command === "set-mode" ? String(args[commandIndex + 1] || "") : "";
const setKey = command === "set" ? String(args[commandIndex + 1] || "") : "";
const setValue = command === "set" ? String(args[commandIndex + 2] || "") : "";
if (command === "sessions" && args[commandIndex + 1] === "ensure") {
writeLog({ kind: "ensure", agent, args, sessionName: ensureName });
emitJson({
action: "session_ensured",
acpxRecordId: "rec-" + ensureName,
acpxSessionId: "sid-" + ensureName,
agentSessionId: "inner-" + ensureName,
name: ensureName,
created: true,
});
process.exit(0);
}
if (command === "cancel") {
writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption });
emitJson({
acpxSessionId: "sid-" + sessionFromOption,
cancelled: true,
});
process.exit(0);
}
if (command === "set-mode") {
writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue });
emitJson({
action: "mode_set",
acpxSessionId: "sid-" + sessionFromOption,
mode: setModeValue,
});
process.exit(0);
}
if (command === "set") {
writeLog({
kind: "set",
agent,
args,
sessionName: sessionFromOption,
key: setKey,
value: setValue,
});
emitJson({
action: "config_set",
acpxSessionId: "sid-" + sessionFromOption,
key: setKey,
value: setValue,
});
process.exit(0);
}
if (command === "status") {
writeLog({ kind: "status", agent, args, sessionName: sessionFromOption });
emitJson({
acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null,
acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null,
agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null,
status: sessionFromOption ? "alive" : "no-session",
pid: 4242,
uptime: 120,
});
process.exit(0);
}
if (command === "sessions" && args[commandIndex + 1] === "close") {
writeLog({ kind: "close", agent, args, sessionName: closeName });
emitJson({
action: "session_closed",
acpxRecordId: "rec-" + closeName,
acpxSessionId: "sid-" + closeName,
name: closeName,
});
process.exit(0);
}
if (command === "prompt") {
const stdinText = fs.readFileSync(0, "utf8");
writeLog({ kind: "prompt", agent, args, sessionName: sessionFromOption, stdinText });
const requestId = "req-1";
emitJson({
jsonrpc: "2.0",
id: 0,
method: "session/load",
params: {
sessionId: sessionFromOption,
cwd: process.cwd(),
mcpServers: [],
},
});
emitJson({
jsonrpc: "2.0",
id: 0,
error: {
code: -32002,
message: "Resource not found",
},
});
emitJson({
jsonrpc: "2.0",
id: requestId,
method: "session/prompt",
params: {
sessionId: sessionFromOption,
prompt: [
{
type: "text",
text: stdinText.trim(),
},
],
},
});
if (stdinText.includes("trigger-error")) {
emitJson({
jsonrpc: "2.0",
id: requestId,
error: {
code: -32000,
message: "mock failure",
},
});
process.exit(1);
}
if (stdinText.includes("split-spacing")) {
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "alpha" },
});
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " beta" },
});
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " gamma" },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
process.exit(0);
}
if (stdinText.includes("double-done")) {
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "ok" },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
process.exit(0);
}
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "thinking" },
});
emitUpdate(sessionFromOption, {
sessionUpdate: "tool_call",
toolCallId: "tool-1",
title: "run-tests",
status: "in_progress",
kind: "command",
});
emitUpdate(sessionFromOption, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "echo:" + stdinText.trim() },
});
emitJson({
jsonrpc: "2.0",
id: requestId,
result: {
stopReason: "end_turn",
},
});
process.exit(0);
}
writeLog({ kind: "unknown", args });
emitJson({
error: {
code: "USAGE",
message: "unknown command",
},
});
process.exit(2);
`;
const tempDirs: string[] = [];
async function createMockRuntime(params?: {
permissionMode?: ResolvedAcpxPluginConfig["permissionMode"];
queueOwnerTtlSeconds?: number;
}): Promise<{
runtime: AcpxRuntime;
logPath: string;
config: ResolvedAcpxPluginConfig;
}> {
const dir = await mkdtemp(path.join(os.tmpdir(), "openclaw-acpx-runtime-test-"));
tempDirs.push(dir);
const scriptPath = path.join(dir, "mock-acpx.cjs");
const logPath = path.join(dir, "calls.log");
await writeFile(scriptPath, MOCK_CLI_SCRIPT, "utf8");
await chmod(scriptPath, 0o755);
process.env.MOCK_ACPX_LOG = logPath;
const config: ResolvedAcpxPluginConfig = {
command: scriptPath,
allowPluginLocalInstall: false,
installCommand: "n/a",
cwd: dir,
permissionMode: params?.permissionMode ?? "approve-all",
nonInteractivePermissions: "fail",
queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds ?? 0.1,
};
return {
runtime: new AcpxRuntime(config, {
queueOwnerTtlSeconds: params?.queueOwnerTtlSeconds,
logger: NOOP_LOGGER,
}),
logPath,
config,
};
}
async function readLogEntries(logPath: string): Promise<Array<Record<string, unknown>>> {
if (!fs.existsSync(logPath)) {
return [];
}
const raw = await readFile(logPath, "utf8");
return raw
.split(/\r?\n/)
.map((line) => line.trim())
.filter(Boolean)
.map((line) => JSON.parse(line) as Record<string, unknown>);
}
afterEach(async () => {
delete process.env.MOCK_ACPX_LOG;
while (tempDirs.length > 0) {
const dir = tempDirs.pop();
if (!dir) {
continue;
}
await rm(dir, {
recursive: true,
force: true,
maxRetries: 10,
retryDelay: 10,
});
}
await cleanupMockRuntimeFixtures();
});
describe("AcpxRuntime", () => {
it("passes the shared ACP adapter contract suite", async () => {
const fixture = await createMockRuntime();
const fixture = await createMockRuntimeFixture();
await runAcpRuntimeAdapterContract({
createRuntime: async () => fixture.runtime,
agentId: "codex",
@@ -353,7 +30,7 @@ describe("AcpxRuntime", () => {
},
});
const logs = await readLogEntries(fixture.logPath);
const logs = await readMockRuntimeLogEntries(fixture.logPath);
expect(logs.some((entry) => entry.kind === "ensure")).toBe(true);
expect(logs.some((entry) => entry.kind === "status")).toBe(true);
expect(logs.some((entry) => entry.kind === "set-mode")).toBe(true);
@@ -363,7 +40,7 @@ describe("AcpxRuntime", () => {
});
it("ensures sessions and streams prompt events", async () => {
const { runtime, logPath } = await createMockRuntime({ queueOwnerTtlSeconds: 180 });
const { runtime, logPath } = await createMockRuntimeFixture({ queueOwnerTtlSeconds: 180 });
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:123",
@@ -408,7 +85,7 @@ describe("AcpxRuntime", () => {
stopReason: "end_turn",
});
const logs = await readLogEntries(logPath);
const logs = await readMockRuntimeLogEntries(logPath);
const ensure = logs.find((entry) => entry.kind === "ensure");
const prompt = logs.find((entry) => entry.kind === "prompt");
expect(ensure).toBeDefined();
@@ -421,7 +98,7 @@ describe("AcpxRuntime", () => {
});
it("passes a queue-owner TTL by default to avoid long idle stalls", async () => {
const { runtime, logPath } = await createMockRuntime();
const { runtime, logPath } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:ttl-default",
agent: "codex",
@@ -437,7 +114,7 @@ describe("AcpxRuntime", () => {
// drain
}
const logs = await readLogEntries(logPath);
const logs = await readMockRuntimeLogEntries(logPath);
const prompt = logs.find((entry) => entry.kind === "prompt");
expect(prompt).toBeDefined();
const promptArgs = (prompt?.args as string[]) ?? [];
@@ -447,7 +124,7 @@ describe("AcpxRuntime", () => {
});
it("preserves leading spaces across streamed text deltas", async () => {
const { runtime } = await createMockRuntime();
const { runtime } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:space",
agent: "codex",
@@ -471,7 +148,7 @@ describe("AcpxRuntime", () => {
});
it("emits done once when ACP stream repeats stop reason responses", async () => {
const { runtime } = await createMockRuntime();
const { runtime } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:double-done",
agent: "codex",
@@ -493,7 +170,7 @@ describe("AcpxRuntime", () => {
});
it("maps acpx error events into ACP runtime error events", async () => {
const { runtime } = await createMockRuntime();
const { runtime } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:456",
agent: "codex",
@@ -519,7 +196,7 @@ describe("AcpxRuntime", () => {
});
it("supports cancel and close using encoded runtime handle state", async () => {
const { runtime, logPath, config } = await createMockRuntime();
const { runtime, logPath, config } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:claude:acp:789",
agent: "claude",
@@ -534,7 +211,7 @@ describe("AcpxRuntime", () => {
await secondRuntime.cancel({ handle, reason: "test" });
await secondRuntime.close({ handle, reason: "test" });
const logs = await readLogEntries(logPath);
const logs = await readMockRuntimeLogEntries(logPath);
const cancel = logs.find((entry) => entry.kind === "cancel");
const close = logs.find((entry) => entry.kind === "close");
expect(cancel?.sessionName).toBe("agent:claude:acp:789");
@@ -542,7 +219,7 @@ describe("AcpxRuntime", () => {
});
it("exposes control capabilities and runs set-mode/set/status commands", async () => {
const { runtime, logPath } = await createMockRuntime();
const { runtime, logPath } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:controls",
agent: "codex",
@@ -574,14 +251,14 @@ describe("AcpxRuntime", () => {
expect(status.details?.status).toBe("alive");
expect(status.details?.pid).toBe(4242);
const logs = await readLogEntries(logPath);
const logs = await readMockRuntimeLogEntries(logPath);
expect(logs.find((entry) => entry.kind === "set-mode")?.mode).toBe("plan");
expect(logs.find((entry) => entry.kind === "set")?.key).toBe("model");
expect(logs.find((entry) => entry.kind === "status")).toBeDefined();
});
it("skips prompt execution when runTurn starts with an already-aborted signal", async () => {
const { runtime, logPath } = await createMockRuntime();
const { runtime, logPath } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:aborted",
agent: "codex",
@@ -601,13 +278,13 @@ describe("AcpxRuntime", () => {
events.push(event);
}
const logs = await readLogEntries(logPath);
const logs = await readMockRuntimeLogEntries(logPath);
expect(events).toEqual([]);
expect(logs.some((entry) => entry.kind === "prompt")).toBe(false);
});
it("does not mark backend unhealthy when a per-session cwd is missing", async () => {
const { runtime } = await createMockRuntime();
const { runtime } = await createMockRuntimeFixture();
const missingCwd = path.join(os.tmpdir(), "openclaw-acpx-runtime-test-missing-cwd");
await runtime.probeAvailability();
@@ -646,7 +323,7 @@ describe("AcpxRuntime", () => {
});
it("marks runtime healthy when command is available", async () => {
const { runtime } = await createMockRuntime();
const { runtime } = await createMockRuntimeFixture();
await runtime.probeAvailability();
expect(runtime.isHealthy()).toBe(true);
});

View File

@@ -14,11 +14,8 @@ import type {
import { AcpRuntimeError } from "openclaw/plugin-sdk";
import { type ResolvedAcpxPluginConfig } from "./config.js";
import { checkAcpxVersion } from "./ensure.js";
import {
parseJsonLines,
parsePromptEventLine,
toAcpxErrorEvent,
} from "./runtime-internals/events.js";
import { parseControlJsonError } from "./runtime-internals/control-errors.js";
import { parseJsonLines, PromptStreamProjector } from "./runtime-internals/events.js";
import {
resolveSpawnFailure,
spawnAndCollect,
@@ -197,9 +194,7 @@ export class AcpxRuntime implements AcpRuntime {
sessionName: state.name,
cwd: state.cwd,
});
const parseContext = {
promptRequestIds: new Set<string>(),
};
const projector = new PromptStreamProjector();
const cancelOnAbort = async () => {
await this.cancel({
@@ -241,7 +236,7 @@ export class AcpxRuntime implements AcpRuntime {
const lines = createInterface({ input: child.stdout });
try {
for await (const line of lines) {
const parsed = parsePromptEventLine(line, parseContext);
const parsed = projector.ingestLine(line);
if (!parsed) {
continue;
}
@@ -312,7 +307,7 @@ export class AcpxRuntime implements AcpRuntime {
fallbackCode: "ACP_TURN_FAILED",
ignoreNoSession: true,
});
const detail = events.find((event) => !toAcpxErrorEvent(event)) ?? events[0];
const detail = events.find((event) => !parseControlJsonError(event)) ?? events[0];
if (!detail) {
return {
summary: "acpx status unavailable",
@@ -558,7 +553,7 @@ export class AcpxRuntime implements AcpRuntime {
}
const events = parseJsonLines(result.stdout);
const errorEvent = events.map((event) => toAcpxErrorEvent(event)).find(Boolean) ?? null;
const errorEvent = events.map((event) => parseControlJsonError(event)).find(Boolean) ?? null;
if (errorEvent) {
if (params.ignoreNoSession && errorEvent.code === "NO_SESSION") {
return events;