mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
ACPX extension: parse pure ACP JSON-RPC stream
This commit is contained in:
208
extensions/acpx/src/runtime-internals/events.test.ts
Normal file
208
extensions/acpx/src/runtime-internals/events.test.ts
Normal file
@@ -0,0 +1,208 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { parsePromptEventLine, toAcpxErrorEvent } from "./events.js";
|
||||
|
||||
function jsonLine(payload: unknown): string {
|
||||
return JSON.stringify(payload);
|
||||
}
|
||||
|
||||
describe("acpx runtime event parsing", () => {
|
||||
it("maps agent message chunks to output deltas", () => {
|
||||
const event = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
method: "session/update",
|
||||
params: {
|
||||
sessionId: "session-1",
|
||||
update: {
|
||||
sessionUpdate: "agent_message_chunk",
|
||||
content: {
|
||||
type: "text",
|
||||
text: "hello world",
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(event).toEqual({
|
||||
type: "text_delta",
|
||||
text: "hello world",
|
||||
stream: "output",
|
||||
});
|
||||
});
|
||||
|
||||
it("preserves leading spaces in streamed output chunks", () => {
|
||||
const event = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
method: "session/update",
|
||||
params: {
|
||||
sessionId: "session-1",
|
||||
update: {
|
||||
sessionUpdate: "agent_message_chunk",
|
||||
content: {
|
||||
type: "text",
|
||||
text: " indented",
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(event).toEqual({
|
||||
type: "text_delta",
|
||||
text: " indented",
|
||||
stream: "output",
|
||||
});
|
||||
});
|
||||
|
||||
it("maps agent thought chunks to thought deltas", () => {
|
||||
const event = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
method: "session/update",
|
||||
params: {
|
||||
sessionId: "session-1",
|
||||
update: {
|
||||
sessionUpdate: "agent_thought_chunk",
|
||||
content: {
|
||||
type: "text",
|
||||
text: "thinking",
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(event).toEqual({
|
||||
type: "text_delta",
|
||||
text: "thinking",
|
||||
stream: "thought",
|
||||
});
|
||||
});
|
||||
|
||||
it("maps tool call updates to tool_call events", () => {
|
||||
const event = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
method: "session/update",
|
||||
params: {
|
||||
sessionId: "session-1",
|
||||
update: {
|
||||
sessionUpdate: "tool_call",
|
||||
toolCallId: "call-1",
|
||||
title: "exec",
|
||||
status: "in_progress",
|
||||
},
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(event).toEqual({
|
||||
type: "tool_call",
|
||||
text: "exec (in_progress)",
|
||||
});
|
||||
});
|
||||
|
||||
it("maps prompt response stop reasons to done events", () => {
|
||||
const event = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
id: "req-1",
|
||||
result: {
|
||||
stopReason: "end_turn",
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(event).toEqual({
|
||||
type: "done",
|
||||
stopReason: "end_turn",
|
||||
});
|
||||
});
|
||||
|
||||
it("maps json-rpc errors to runtime errors", () => {
|
||||
const event = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
id: "req-1",
|
||||
error: {
|
||||
code: -32000,
|
||||
message: "adapter failed",
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
expect(event).toEqual({
|
||||
type: "error",
|
||||
message: "adapter failed",
|
||||
code: "-32000",
|
||||
retryable: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it("ignores non-prompt response errors when parse context is provided", () => {
|
||||
const context = {
|
||||
promptRequestIds: new Set<string>(),
|
||||
};
|
||||
|
||||
const promptRequest = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
id: 3,
|
||||
method: "session/prompt",
|
||||
params: {
|
||||
sessionId: "session-1",
|
||||
prompt: [{ type: "text", text: "hello" }],
|
||||
},
|
||||
}),
|
||||
context,
|
||||
);
|
||||
const loadError = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
id: 1,
|
||||
error: {
|
||||
code: -32002,
|
||||
message: "Resource not found",
|
||||
},
|
||||
}),
|
||||
context,
|
||||
);
|
||||
const promptDone = parsePromptEventLine(
|
||||
jsonLine({
|
||||
jsonrpc: "2.0",
|
||||
id: 3,
|
||||
result: {
|
||||
stopReason: "end_turn",
|
||||
},
|
||||
}),
|
||||
context,
|
||||
);
|
||||
|
||||
expect(promptRequest).toBeNull();
|
||||
expect(loadError).toBeNull();
|
||||
expect(promptDone).toEqual({
|
||||
type: "done",
|
||||
stopReason: "end_turn",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("toAcpxErrorEvent", () => {
|
||||
it("reads control command errors from json output", () => {
|
||||
expect(
|
||||
toAcpxErrorEvent({
|
||||
error: {
|
||||
code: "NO_SESSION",
|
||||
message: "No matching session",
|
||||
retryable: false,
|
||||
},
|
||||
}),
|
||||
).toEqual({
|
||||
code: "NO_SESSION",
|
||||
message: "No matching session",
|
||||
retryable: false,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -9,17 +9,72 @@ import {
|
||||
isRecord,
|
||||
} from "./shared.js";
|
||||
|
||||
type JsonRpcId = string | number | null;
|
||||
|
||||
export type PromptParseContext = {
|
||||
promptRequestIds: Set<string>;
|
||||
};
|
||||
|
||||
function isJsonRpcId(value: unknown): value is JsonRpcId {
|
||||
return (
|
||||
value === null ||
|
||||
typeof value === "string" ||
|
||||
(typeof value === "number" && Number.isFinite(value))
|
||||
);
|
||||
}
|
||||
|
||||
function normalizeJsonRpcId(value: unknown): string | null {
|
||||
if (!isJsonRpcId(value) || value == null) {
|
||||
return null;
|
||||
}
|
||||
return String(value);
|
||||
}
|
||||
|
||||
function isAcpJsonRpcMessage(value: unknown): value is Record<string, unknown> {
|
||||
if (!isRecord(value) || value.jsonrpc !== "2.0") {
|
||||
return false;
|
||||
}
|
||||
|
||||
const hasMethod = typeof value.method === "string" && value.method.length > 0;
|
||||
const hasId = Object.hasOwn(value, "id");
|
||||
|
||||
if (hasMethod && !hasId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (hasMethod && hasId) {
|
||||
return isJsonRpcId(value.id);
|
||||
}
|
||||
|
||||
if (!hasMethod && hasId) {
|
||||
if (!isJsonRpcId(value.id)) {
|
||||
return false;
|
||||
}
|
||||
const hasResult = Object.hasOwn(value, "result");
|
||||
const hasError = Object.hasOwn(value, "error");
|
||||
return hasResult !== hasError;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
export function toAcpxErrorEvent(value: unknown): AcpxErrorEvent | null {
|
||||
if (!isRecord(value)) {
|
||||
return null;
|
||||
}
|
||||
if (asTrimmedString(value.type) !== "error") {
|
||||
const error = isRecord(value.error) ? value.error : null;
|
||||
if (!error) {
|
||||
return null;
|
||||
}
|
||||
const message = asTrimmedString(error.message) || "acpx reported an error";
|
||||
const codeValue = error.code;
|
||||
return {
|
||||
message: asTrimmedString(value.message) || "acpx reported an error",
|
||||
code: asOptionalString(value.code),
|
||||
retryable: asOptionalBoolean(value.retryable),
|
||||
message,
|
||||
code:
|
||||
typeof codeValue === "number" && Number.isFinite(codeValue)
|
||||
? String(codeValue)
|
||||
: asOptionalString(codeValue),
|
||||
retryable: asOptionalBoolean(error.retryable),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -42,7 +97,155 @@ export function parseJsonLines(value: string): AcpxJsonObject[] {
|
||||
return events;
|
||||
}
|
||||
|
||||
export function parsePromptEventLine(line: string): AcpRuntimeEvent | null {
|
||||
function parsePromptStopReason(message: Record<string, unknown>): string | undefined {
|
||||
if (!Object.hasOwn(message, "result")) {
|
||||
return undefined;
|
||||
}
|
||||
const result = isRecord(message.result) ? message.result : null;
|
||||
if (!result) {
|
||||
return undefined;
|
||||
}
|
||||
const stopReason = asString(result.stopReason);
|
||||
return stopReason && stopReason.trim().length > 0 ? stopReason : undefined;
|
||||
}
|
||||
|
||||
function parseSessionUpdateEvent(message: Record<string, unknown>): AcpRuntimeEvent | null {
|
||||
if (asTrimmedString(message.method) !== "session/update") {
|
||||
return null;
|
||||
}
|
||||
const params = isRecord(message.params) ? message.params : null;
|
||||
if (!params) {
|
||||
return null;
|
||||
}
|
||||
const update = isRecord(params.update) ? params.update : null;
|
||||
if (!update) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const sessionUpdate = asTrimmedString(update.sessionUpdate);
|
||||
switch (sessionUpdate) {
|
||||
case "agent_message_chunk": {
|
||||
const content = isRecord(update.content) ? update.content : null;
|
||||
if (!content || asTrimmedString(content.type) !== "text") {
|
||||
return null;
|
||||
}
|
||||
const text = asString(content.text);
|
||||
if (!text) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
type: "text_delta",
|
||||
text,
|
||||
stream: "output",
|
||||
};
|
||||
}
|
||||
case "agent_thought_chunk": {
|
||||
const content = isRecord(update.content) ? update.content : null;
|
||||
if (!content || asTrimmedString(content.type) !== "text") {
|
||||
return null;
|
||||
}
|
||||
const text = asString(content.text);
|
||||
if (!text) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
type: "text_delta",
|
||||
text,
|
||||
stream: "thought",
|
||||
};
|
||||
}
|
||||
case "tool_call":
|
||||
case "tool_call_update": {
|
||||
const title =
|
||||
asTrimmedString(update.title) ||
|
||||
asTrimmedString(update.toolCallId) ||
|
||||
asTrimmedString(update.kind) ||
|
||||
"tool";
|
||||
const status = asTrimmedString(update.status);
|
||||
return {
|
||||
type: "tool_call",
|
||||
text: status ? `${title} (${status})` : title,
|
||||
};
|
||||
}
|
||||
case "plan": {
|
||||
const entries = Array.isArray(update.entries) ? update.entries : [];
|
||||
const first = entries.find((entry) => isRecord(entry)) as Record<string, unknown> | undefined;
|
||||
const content = asTrimmedString(first?.content);
|
||||
if (!content) {
|
||||
return { type: "status", text: "plan updated" };
|
||||
}
|
||||
const status = asTrimmedString(first?.status);
|
||||
return {
|
||||
type: "status",
|
||||
text: status ? `plan: [${status}] ${content}` : `plan: ${content}`,
|
||||
};
|
||||
}
|
||||
case "available_commands_update": {
|
||||
const commands = Array.isArray(update.availableCommands)
|
||||
? update.availableCommands.length
|
||||
: 0;
|
||||
return {
|
||||
type: "status",
|
||||
text: `available commands updated (${commands})`,
|
||||
};
|
||||
}
|
||||
case "current_mode_update": {
|
||||
const modeId = asTrimmedString(update.currentModeId);
|
||||
return {
|
||||
type: "status",
|
||||
text: modeId ? `mode updated: ${modeId}` : "mode updated",
|
||||
};
|
||||
}
|
||||
case "config_option_update": {
|
||||
const options = Array.isArray(update.configOptions) ? update.configOptions.length : 0;
|
||||
return {
|
||||
type: "status",
|
||||
text: `config options updated (${options})`,
|
||||
};
|
||||
}
|
||||
case "session_info_update": {
|
||||
const title = asTrimmedString(update.title);
|
||||
return {
|
||||
type: "status",
|
||||
text: title ? `session info updated: ${title}` : "session info updated",
|
||||
};
|
||||
}
|
||||
case "usage_update": {
|
||||
const used =
|
||||
typeof update.used === "number" && Number.isFinite(update.used) ? update.used : null;
|
||||
const size =
|
||||
typeof update.size === "number" && Number.isFinite(update.size) ? update.size : null;
|
||||
if (used == null || size == null) {
|
||||
return { type: "status", text: "usage updated" };
|
||||
}
|
||||
return {
|
||||
type: "status",
|
||||
text: `usage updated: ${used}/${size}`,
|
||||
};
|
||||
}
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function shouldHandlePromptResponse(params: {
|
||||
message: Record<string, unknown>;
|
||||
context?: PromptParseContext;
|
||||
}): boolean {
|
||||
const id = normalizeJsonRpcId(params.message.id);
|
||||
if (!id) {
|
||||
return false;
|
||||
}
|
||||
if (!params.context) {
|
||||
return true;
|
||||
}
|
||||
return params.context.promptRequestIds.has(id);
|
||||
}
|
||||
|
||||
export function parsePromptEventLine(
|
||||
line: string,
|
||||
context?: PromptParseContext,
|
||||
): AcpRuntimeEvent | null {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) {
|
||||
return null;
|
||||
@@ -61,80 +264,60 @@ export function parsePromptEventLine(line: string): AcpRuntimeEvent | null {
|
||||
return null;
|
||||
}
|
||||
|
||||
const type = asTrimmedString(parsed.type);
|
||||
switch (type) {
|
||||
case "text": {
|
||||
const content = asString(parsed.content);
|
||||
if (content == null || content.length === 0) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
type: "text_delta",
|
||||
text: content,
|
||||
stream: "output",
|
||||
};
|
||||
}
|
||||
case "thought": {
|
||||
const content = asString(parsed.content);
|
||||
if (content == null || content.length === 0) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
type: "text_delta",
|
||||
text: content,
|
||||
stream: "thought",
|
||||
};
|
||||
}
|
||||
case "tool_call": {
|
||||
const title = asTrimmedString(parsed.title) || asTrimmedString(parsed.toolCallId) || "tool";
|
||||
const status = asTrimmedString(parsed.status);
|
||||
return {
|
||||
type: "tool_call",
|
||||
text: status ? `${title} (${status})` : title,
|
||||
};
|
||||
}
|
||||
case "client_operation": {
|
||||
const method = asTrimmedString(parsed.method) || "operation";
|
||||
const status = asTrimmedString(parsed.status);
|
||||
const summary = asTrimmedString(parsed.summary);
|
||||
const text = [method, status, summary].filter(Boolean).join(" ");
|
||||
if (!text) {
|
||||
return null;
|
||||
}
|
||||
return { type: "status", text };
|
||||
}
|
||||
case "plan": {
|
||||
const entries = Array.isArray(parsed.entries) ? parsed.entries : [];
|
||||
const first = entries.find((entry) => isRecord(entry)) as Record<string, unknown> | undefined;
|
||||
const content = asTrimmedString(first?.content);
|
||||
if (!content) {
|
||||
return null;
|
||||
}
|
||||
return { type: "status", text: `plan: ${content}` };
|
||||
}
|
||||
case "update": {
|
||||
const update = asTrimmedString(parsed.update);
|
||||
if (!update) {
|
||||
return null;
|
||||
}
|
||||
return { type: "status", text: update };
|
||||
}
|
||||
case "done": {
|
||||
return {
|
||||
type: "done",
|
||||
stopReason: asOptionalString(parsed.stopReason),
|
||||
};
|
||||
}
|
||||
case "error": {
|
||||
const message = asTrimmedString(parsed.message) || "acpx runtime error";
|
||||
if (!isAcpJsonRpcMessage(parsed)) {
|
||||
const fallbackError = toAcpxErrorEvent(parsed);
|
||||
if (fallbackError) {
|
||||
return {
|
||||
type: "error",
|
||||
message,
|
||||
code: asOptionalString(parsed.code),
|
||||
retryable: asOptionalBoolean(parsed.retryable),
|
||||
message: fallbackError.message,
|
||||
code: fallbackError.code,
|
||||
retryable: fallbackError.retryable,
|
||||
};
|
||||
}
|
||||
default:
|
||||
return null;
|
||||
return null;
|
||||
}
|
||||
|
||||
const updateEvent = parseSessionUpdateEvent(parsed);
|
||||
if (updateEvent) {
|
||||
return updateEvent;
|
||||
}
|
||||
|
||||
if (asTrimmedString(parsed.method) === "session/prompt") {
|
||||
const id = normalizeJsonRpcId(parsed.id);
|
||||
if (id && context) {
|
||||
context.promptRequestIds.add(id);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
if (Object.hasOwn(parsed, "error")) {
|
||||
if (!shouldHandlePromptResponse({ message: parsed, context })) {
|
||||
return null;
|
||||
}
|
||||
const error = isRecord(parsed.error) ? parsed.error : null;
|
||||
const message = asTrimmedString(error?.message);
|
||||
const codeValue = error?.code;
|
||||
return {
|
||||
type: "error",
|
||||
message: message || "acpx runtime error",
|
||||
code:
|
||||
typeof codeValue === "number" && Number.isFinite(codeValue)
|
||||
? String(codeValue)
|
||||
: asOptionalString(codeValue),
|
||||
retryable: asOptionalBoolean(error?.retryable),
|
||||
};
|
||||
}
|
||||
|
||||
const stopReason = parsePromptStopReason(parsed);
|
||||
if (stopReason) {
|
||||
if (!shouldHandlePromptResponse({ message: parsed, context })) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
type: "done",
|
||||
stopReason,
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -23,6 +23,13 @@ const writeLog = (entry) => {
|
||||
if (!logPath) return;
|
||||
fs.appendFileSync(logPath, JSON.stringify(entry) + "\n");
|
||||
};
|
||||
const emitJson = (payload) => process.stdout.write(JSON.stringify(payload) + "\n");
|
||||
const emitUpdate = (sessionId, update) =>
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
method: "session/update",
|
||||
params: { sessionId, update },
|
||||
});
|
||||
|
||||
if (args.includes("--version")) {
|
||||
process.stdout.write("mock-acpx ${ACPX_PINNED_VERSION}\\n");
|
||||
@@ -61,33 +68,33 @@ const setValue = command === "set" ? String(args[commandIndex + 2] || "") : "";
|
||||
|
||||
if (command === "sessions" && args[commandIndex + 1] === "ensure") {
|
||||
writeLog({ kind: "ensure", agent, args, sessionName: ensureName });
|
||||
process.stdout.write(JSON.stringify({
|
||||
type: "session_ensured",
|
||||
emitJson({
|
||||
action: "session_ensured",
|
||||
acpxRecordId: "rec-" + ensureName,
|
||||
acpxSessionId: "sid-" + ensureName,
|
||||
agentSessionId: "inner-" + ensureName,
|
||||
name: ensureName,
|
||||
created: true,
|
||||
}) + "\n");
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (command === "cancel") {
|
||||
writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption });
|
||||
process.stdout.write(JSON.stringify({
|
||||
emitJson({
|
||||
acpxSessionId: "sid-" + sessionFromOption,
|
||||
cancelled: true,
|
||||
}) + "\n");
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (command === "set-mode") {
|
||||
writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue });
|
||||
process.stdout.write(JSON.stringify({
|
||||
type: "mode_set",
|
||||
emitJson({
|
||||
action: "mode_set",
|
||||
acpxSessionId: "sid-" + sessionFromOption,
|
||||
mode: setModeValue,
|
||||
}) + "\n");
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
@@ -100,148 +107,167 @@ if (command === "set") {
|
||||
key: setKey,
|
||||
value: setValue,
|
||||
});
|
||||
process.stdout.write(JSON.stringify({
|
||||
type: "config_set",
|
||||
emitJson({
|
||||
action: "config_set",
|
||||
acpxSessionId: "sid-" + sessionFromOption,
|
||||
key: setKey,
|
||||
value: setValue,
|
||||
}) + "\n");
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (command === "status") {
|
||||
writeLog({ kind: "status", agent, args, sessionName: sessionFromOption });
|
||||
process.stdout.write(JSON.stringify({
|
||||
emitJson({
|
||||
acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null,
|
||||
acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null,
|
||||
agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null,
|
||||
status: sessionFromOption ? "alive" : "no-session",
|
||||
pid: 4242,
|
||||
uptime: 120,
|
||||
}) + "\n");
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (command === "sessions" && args[commandIndex + 1] === "close") {
|
||||
writeLog({ kind: "close", agent, args, sessionName: closeName });
|
||||
process.stdout.write(JSON.stringify({
|
||||
type: "session_closed",
|
||||
emitJson({
|
||||
action: "session_closed",
|
||||
acpxRecordId: "rec-" + closeName,
|
||||
acpxSessionId: "sid-" + closeName,
|
||||
name: closeName,
|
||||
}) + "\n");
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (command === "prompt") {
|
||||
const stdinText = fs.readFileSync(0, "utf8");
|
||||
writeLog({ kind: "prompt", agent, args, sessionName: sessionFromOption, stdinText });
|
||||
const acpxSessionId = "sid-" + sessionFromOption;
|
||||
const requestId = "req-1";
|
||||
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: 0,
|
||||
method: "session/load",
|
||||
params: {
|
||||
sessionId: sessionFromOption,
|
||||
cwd: process.cwd(),
|
||||
mcpServers: [],
|
||||
},
|
||||
});
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: 0,
|
||||
error: {
|
||||
code: -32002,
|
||||
message: "Resource not found",
|
||||
},
|
||||
});
|
||||
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: requestId,
|
||||
method: "session/prompt",
|
||||
params: {
|
||||
sessionId: sessionFromOption,
|
||||
prompt: [
|
||||
{
|
||||
type: "text",
|
||||
text: stdinText.trim(),
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
|
||||
if (stdinText.includes("trigger-error")) {
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 0,
|
||||
stream: "prompt",
|
||||
type: "error",
|
||||
code: "RUNTIME",
|
||||
message: "mock failure",
|
||||
}) + "\n");
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: requestId,
|
||||
error: {
|
||||
code: -32000,
|
||||
message: "mock failure",
|
||||
},
|
||||
});
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (stdinText.includes("split-spacing")) {
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 0,
|
||||
stream: "prompt",
|
||||
type: "text",
|
||||
content: "alpha",
|
||||
}) + "\n");
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 1,
|
||||
stream: "prompt",
|
||||
type: "text",
|
||||
content: " beta",
|
||||
}) + "\n");
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 2,
|
||||
stream: "prompt",
|
||||
type: "text",
|
||||
content: " gamma",
|
||||
}) + "\n");
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 3,
|
||||
stream: "prompt",
|
||||
type: "done",
|
||||
stopReason: "end_turn",
|
||||
}) + "\n");
|
||||
emitUpdate(sessionFromOption, {
|
||||
sessionUpdate: "agent_message_chunk",
|
||||
content: { type: "text", text: "alpha" },
|
||||
});
|
||||
emitUpdate(sessionFromOption, {
|
||||
sessionUpdate: "agent_message_chunk",
|
||||
content: { type: "text", text: " beta" },
|
||||
});
|
||||
emitUpdate(sessionFromOption, {
|
||||
sessionUpdate: "agent_message_chunk",
|
||||
content: { type: "text", text: " gamma" },
|
||||
});
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: requestId,
|
||||
result: {
|
||||
stopReason: "end_turn",
|
||||
},
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 0,
|
||||
stream: "prompt",
|
||||
type: "thought",
|
||||
content: "thinking",
|
||||
}) + "\n");
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 1,
|
||||
stream: "prompt",
|
||||
type: "tool_call",
|
||||
if (stdinText.includes("double-done")) {
|
||||
emitUpdate(sessionFromOption, {
|
||||
sessionUpdate: "agent_message_chunk",
|
||||
content: { type: "text", text: "ok" },
|
||||
});
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: requestId,
|
||||
result: {
|
||||
stopReason: "end_turn",
|
||||
},
|
||||
});
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: requestId,
|
||||
result: {
|
||||
stopReason: "end_turn",
|
||||
},
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
emitUpdate(sessionFromOption, {
|
||||
sessionUpdate: "agent_thought_chunk",
|
||||
content: { type: "text", text: "thinking" },
|
||||
});
|
||||
emitUpdate(sessionFromOption, {
|
||||
sessionUpdate: "tool_call",
|
||||
toolCallId: "tool-1",
|
||||
title: "run-tests",
|
||||
status: "in_progress",
|
||||
}) + "\n");
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 2,
|
||||
stream: "prompt",
|
||||
type: "text",
|
||||
content: "echo:" + stdinText.trim(),
|
||||
}) + "\n");
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId,
|
||||
requestId: "req-1",
|
||||
seq: 3,
|
||||
stream: "prompt",
|
||||
type: "done",
|
||||
stopReason: "end_turn",
|
||||
}) + "\n");
|
||||
kind: "command",
|
||||
});
|
||||
emitUpdate(sessionFromOption, {
|
||||
sessionUpdate: "agent_message_chunk",
|
||||
content: { type: "text", text: "echo:" + stdinText.trim() },
|
||||
});
|
||||
emitJson({
|
||||
jsonrpc: "2.0",
|
||||
id: requestId,
|
||||
result: {
|
||||
stopReason: "end_turn",
|
||||
},
|
||||
});
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
writeLog({ kind: "unknown", args });
|
||||
process.stdout.write(JSON.stringify({
|
||||
eventVersion: 1,
|
||||
acpxSessionId: "unknown",
|
||||
seq: 0,
|
||||
stream: "control",
|
||||
type: "error",
|
||||
code: "USAGE",
|
||||
message: "unknown command",
|
||||
}) + "\n");
|
||||
emitJson({
|
||||
error: {
|
||||
code: "USAGE",
|
||||
message: "unknown command",
|
||||
},
|
||||
});
|
||||
process.exit(2);
|
||||
`;
|
||||
|
||||
@@ -444,6 +470,28 @@ describe("AcpxRuntime", () => {
|
||||
expect(textDeltas.join("")).toBe("alpha beta gamma");
|
||||
});
|
||||
|
||||
it("emits done once when ACP stream repeats stop reason responses", async () => {
|
||||
const { runtime } = await createMockRuntime();
|
||||
const handle = await runtime.ensureSession({
|
||||
sessionKey: "agent:codex:acp:double-done",
|
||||
agent: "codex",
|
||||
mode: "persistent",
|
||||
});
|
||||
|
||||
const events = [];
|
||||
for await (const event of runtime.runTurn({
|
||||
handle,
|
||||
text: "double-done",
|
||||
mode: "prompt",
|
||||
requestId: "req-double-done",
|
||||
})) {
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
const doneCount = events.filter((event) => event.type === "done").length;
|
||||
expect(doneCount).toBe(1);
|
||||
});
|
||||
|
||||
it("maps acpx error events into ACP runtime error events", async () => {
|
||||
const { runtime } = await createMockRuntime();
|
||||
const handle = await runtime.ensureSession({
|
||||
@@ -465,7 +513,7 @@ describe("AcpxRuntime", () => {
|
||||
expect(events).toContainEqual({
|
||||
type: "error",
|
||||
message: "mock failure",
|
||||
code: "RUNTIME",
|
||||
code: "-32000",
|
||||
retryable: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
@@ -197,6 +197,9 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
sessionName: state.name,
|
||||
cwd: state.cwd,
|
||||
});
|
||||
const parseContext = {
|
||||
promptRequestIds: new Set<string>(),
|
||||
};
|
||||
|
||||
const cancelOnAbort = async () => {
|
||||
await this.cancel({
|
||||
@@ -238,11 +241,14 @@ export class AcpxRuntime implements AcpRuntime {
|
||||
const lines = createInterface({ input: child.stdout });
|
||||
try {
|
||||
for await (const line of lines) {
|
||||
const parsed = parsePromptEventLine(line);
|
||||
const parsed = parsePromptEventLine(line, parseContext);
|
||||
if (!parsed) {
|
||||
continue;
|
||||
}
|
||||
if (parsed.type === "done") {
|
||||
if (sawDone) {
|
||||
continue;
|
||||
}
|
||||
sawDone = true;
|
||||
}
|
||||
if (parsed.type === "error") {
|
||||
|
||||
Reference in New Issue
Block a user