diff --git a/extensions/acpx/src/runtime-internals/events.test.ts b/extensions/acpx/src/runtime-internals/events.test.ts new file mode 100644 index 00000000000..e5688361d43 --- /dev/null +++ b/extensions/acpx/src/runtime-internals/events.test.ts @@ -0,0 +1,208 @@ +import { describe, expect, it } from "vitest"; +import { parsePromptEventLine, toAcpxErrorEvent } from "./events.js"; + +function jsonLine(payload: unknown): string { + return JSON.stringify(payload); +} + +describe("acpx runtime event parsing", () => { + it("maps agent message chunks to output deltas", () => { + const event = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "hello world", + }, + }, + }, + }), + ); + + expect(event).toEqual({ + type: "text_delta", + text: "hello world", + stream: "output", + }); + }); + + it("preserves leading spaces in streamed output chunks", () => { + const event = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: " indented", + }, + }, + }, + }), + ); + + expect(event).toEqual({ + type: "text_delta", + text: " indented", + stream: "output", + }); + }); + + it("maps agent thought chunks to thought deltas", () => { + const event = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: "thinking", + }, + }, + }, + }), + ); + + expect(event).toEqual({ + type: "text_delta", + text: "thinking", + stream: "thought", + }); + }); + + it("maps tool call updates to tool_call events", () => { + const event = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "tool_call", + toolCallId: "call-1", + title: "exec", + status: "in_progress", + }, + }, + }), + ); + + expect(event).toEqual({ + type: "tool_call", + text: "exec (in_progress)", + }); + }); + + it("maps prompt response stop reasons to done events", () => { + const event = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + id: "req-1", + result: { + stopReason: "end_turn", + }, + }), + ); + + expect(event).toEqual({ + type: "done", + stopReason: "end_turn", + }); + }); + + it("maps json-rpc errors to runtime errors", () => { + const event = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + id: "req-1", + error: { + code: -32000, + message: "adapter failed", + }, + }), + ); + + expect(event).toEqual({ + type: "error", + message: "adapter failed", + code: "-32000", + retryable: undefined, + }); + }); + + it("ignores non-prompt response errors when parse context is provided", () => { + const context = { + promptRequestIds: new Set(), + }; + + const promptRequest = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + id: 3, + method: "session/prompt", + params: { + sessionId: "session-1", + prompt: [{ type: "text", text: "hello" }], + }, + }), + context, + ); + const loadError = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + id: 1, + error: { + code: -32002, + message: "Resource not found", + }, + }), + context, + ); + const promptDone = parsePromptEventLine( + jsonLine({ + jsonrpc: "2.0", + id: 3, + result: { + stopReason: "end_turn", + }, + }), + context, + ); + + expect(promptRequest).toBeNull(); + expect(loadError).toBeNull(); + expect(promptDone).toEqual({ + type: "done", + stopReason: "end_turn", + }); + }); +}); + +describe("toAcpxErrorEvent", () => { + it("reads control command errors from json output", () => { + expect( + toAcpxErrorEvent({ + error: { + code: "NO_SESSION", + message: "No matching session", + retryable: false, + }, + }), + ).toEqual({ + code: "NO_SESSION", + message: "No matching session", + retryable: false, + }); + }); +}); diff --git a/extensions/acpx/src/runtime-internals/events.ts b/extensions/acpx/src/runtime-internals/events.ts index 074787b3fdf..8d840ad5457 100644 --- a/extensions/acpx/src/runtime-internals/events.ts +++ b/extensions/acpx/src/runtime-internals/events.ts @@ -9,17 +9,72 @@ import { isRecord, } from "./shared.js"; +type JsonRpcId = string | number | null; + +export type PromptParseContext = { + promptRequestIds: Set; +}; + +function isJsonRpcId(value: unknown): value is JsonRpcId { + return ( + value === null || + typeof value === "string" || + (typeof value === "number" && Number.isFinite(value)) + ); +} + +function normalizeJsonRpcId(value: unknown): string | null { + if (!isJsonRpcId(value) || value == null) { + return null; + } + return String(value); +} + +function isAcpJsonRpcMessage(value: unknown): value is Record { + if (!isRecord(value) || value.jsonrpc !== "2.0") { + return false; + } + + const hasMethod = typeof value.method === "string" && value.method.length > 0; + const hasId = Object.hasOwn(value, "id"); + + if (hasMethod && !hasId) { + return true; + } + + if (hasMethod && hasId) { + return isJsonRpcId(value.id); + } + + if (!hasMethod && hasId) { + if (!isJsonRpcId(value.id)) { + return false; + } + const hasResult = Object.hasOwn(value, "result"); + const hasError = Object.hasOwn(value, "error"); + return hasResult !== hasError; + } + + return false; +} + export function toAcpxErrorEvent(value: unknown): AcpxErrorEvent | null { if (!isRecord(value)) { return null; } - if (asTrimmedString(value.type) !== "error") { + const error = isRecord(value.error) ? value.error : null; + if (!error) { return null; } + const message = asTrimmedString(error.message) || "acpx reported an error"; + const codeValue = error.code; return { - message: asTrimmedString(value.message) || "acpx reported an error", - code: asOptionalString(value.code), - retryable: asOptionalBoolean(value.retryable), + message, + code: + typeof codeValue === "number" && Number.isFinite(codeValue) + ? String(codeValue) + : asOptionalString(codeValue), + retryable: asOptionalBoolean(error.retryable), }; } @@ -42,7 +97,155 @@ export function parseJsonLines(value: string): AcpxJsonObject[] { return events; } -export function parsePromptEventLine(line: string): AcpRuntimeEvent | null { +function parsePromptStopReason(message: Record): string | undefined { + if (!Object.hasOwn(message, "result")) { + return undefined; + } + const result = isRecord(message.result) ? message.result : null; + if (!result) { + return undefined; + } + const stopReason = asString(result.stopReason); + return stopReason && stopReason.trim().length > 0 ? stopReason : undefined; +} + +function parseSessionUpdateEvent(message: Record): AcpRuntimeEvent | null { + if (asTrimmedString(message.method) !== "session/update") { + return null; + } + const params = isRecord(message.params) ? message.params : null; + if (!params) { + return null; + } + const update = isRecord(params.update) ? params.update : null; + if (!update) { + return null; + } + + const sessionUpdate = asTrimmedString(update.sessionUpdate); + switch (sessionUpdate) { + case "agent_message_chunk": { + const content = isRecord(update.content) ? update.content : null; + if (!content || asTrimmedString(content.type) !== "text") { + return null; + } + const text = asString(content.text); + if (!text) { + return null; + } + return { + type: "text_delta", + text, + stream: "output", + }; + } + case "agent_thought_chunk": { + const content = isRecord(update.content) ? update.content : null; + if (!content || asTrimmedString(content.type) !== "text") { + return null; + } + const text = asString(content.text); + if (!text) { + return null; + } + return { + type: "text_delta", + text, + stream: "thought", + }; + } + case "tool_call": + case "tool_call_update": { + const title = + asTrimmedString(update.title) || + asTrimmedString(update.toolCallId) || + asTrimmedString(update.kind) || + "tool"; + const status = asTrimmedString(update.status); + return { + type: "tool_call", + text: status ? `${title} (${status})` : title, + }; + } + case "plan": { + const entries = Array.isArray(update.entries) ? update.entries : []; + const first = entries.find((entry) => isRecord(entry)) as Record | undefined; + const content = asTrimmedString(first?.content); + if (!content) { + return { type: "status", text: "plan updated" }; + } + const status = asTrimmedString(first?.status); + return { + type: "status", + text: status ? `plan: [${status}] ${content}` : `plan: ${content}`, + }; + } + case "available_commands_update": { + const commands = Array.isArray(update.availableCommands) + ? update.availableCommands.length + : 0; + return { + type: "status", + text: `available commands updated (${commands})`, + }; + } + case "current_mode_update": { + const modeId = asTrimmedString(update.currentModeId); + return { + type: "status", + text: modeId ? `mode updated: ${modeId}` : "mode updated", + }; + } + case "config_option_update": { + const options = Array.isArray(update.configOptions) ? update.configOptions.length : 0; + return { + type: "status", + text: `config options updated (${options})`, + }; + } + case "session_info_update": { + const title = asTrimmedString(update.title); + return { + type: "status", + text: title ? `session info updated: ${title}` : "session info updated", + }; + } + case "usage_update": { + const used = + typeof update.used === "number" && Number.isFinite(update.used) ? update.used : null; + const size = + typeof update.size === "number" && Number.isFinite(update.size) ? update.size : null; + if (used == null || size == null) { + return { type: "status", text: "usage updated" }; + } + return { + type: "status", + text: `usage updated: ${used}/${size}`, + }; + } + default: + return null; + } +} + +function shouldHandlePromptResponse(params: { + message: Record; + context?: PromptParseContext; +}): boolean { + const id = normalizeJsonRpcId(params.message.id); + if (!id) { + return false; + } + if (!params.context) { + return true; + } + return params.context.promptRequestIds.has(id); +} + +export function parsePromptEventLine( + line: string, + context?: PromptParseContext, +): AcpRuntimeEvent | null { const trimmed = line.trim(); if (!trimmed) { return null; @@ -61,80 +264,60 @@ export function parsePromptEventLine(line: string): AcpRuntimeEvent | null { return null; } - const type = asTrimmedString(parsed.type); - switch (type) { - case "text": { - const content = asString(parsed.content); - if (content == null || content.length === 0) { - return null; - } - return { - type: "text_delta", - text: content, - stream: "output", - }; - } - case "thought": { - const content = asString(parsed.content); - if (content == null || content.length === 0) { - return null; - } - return { - type: "text_delta", - text: content, - stream: "thought", - }; - } - case "tool_call": { - const title = asTrimmedString(parsed.title) || asTrimmedString(parsed.toolCallId) || "tool"; - const status = asTrimmedString(parsed.status); - return { - type: "tool_call", - text: status ? `${title} (${status})` : title, - }; - } - case "client_operation": { - const method = asTrimmedString(parsed.method) || "operation"; - const status = asTrimmedString(parsed.status); - const summary = asTrimmedString(parsed.summary); - const text = [method, status, summary].filter(Boolean).join(" "); - if (!text) { - return null; - } - return { type: "status", text }; - } - case "plan": { - const entries = Array.isArray(parsed.entries) ? parsed.entries : []; - const first = entries.find((entry) => isRecord(entry)) as Record | undefined; - const content = asTrimmedString(first?.content); - if (!content) { - return null; - } - return { type: "status", text: `plan: ${content}` }; - } - case "update": { - const update = asTrimmedString(parsed.update); - if (!update) { - return null; - } - return { type: "status", text: update }; - } - case "done": { - return { - type: "done", - stopReason: asOptionalString(parsed.stopReason), - }; - } - case "error": { - const message = asTrimmedString(parsed.message) || "acpx runtime error"; + if (!isAcpJsonRpcMessage(parsed)) { + const fallbackError = toAcpxErrorEvent(parsed); + if (fallbackError) { return { type: "error", - message, - code: asOptionalString(parsed.code), - retryable: asOptionalBoolean(parsed.retryable), + message: fallbackError.message, + code: fallbackError.code, + retryable: fallbackError.retryable, }; } - default: - return null; + return null; } + + const updateEvent = parseSessionUpdateEvent(parsed); + if (updateEvent) { + return updateEvent; + } + + if (asTrimmedString(parsed.method) === "session/prompt") { + const id = normalizeJsonRpcId(parsed.id); + if (id && context) { + context.promptRequestIds.add(id); + } + return null; + } + + if (Object.hasOwn(parsed, "error")) { + if (!shouldHandlePromptResponse({ message: parsed, context })) { + return null; + } + const error = isRecord(parsed.error) ? parsed.error : null; + const message = asTrimmedString(error?.message); + const codeValue = error?.code; + return { + type: "error", + message: message || "acpx runtime error", + code: + typeof codeValue === "number" && Number.isFinite(codeValue) + ? String(codeValue) + : asOptionalString(codeValue), + retryable: asOptionalBoolean(error?.retryable), + }; + } + + const stopReason = parsePromptStopReason(parsed); + if (stopReason) { + if (!shouldHandlePromptResponse({ message: parsed, context })) { + return null; + } + return { + type: "done", + stopReason, + }; + } + + return null; } diff --git a/extensions/acpx/src/runtime.test.ts b/extensions/acpx/src/runtime.test.ts index f9096bbc73e..d05c4f0c8aa 100644 --- a/extensions/acpx/src/runtime.test.ts +++ b/extensions/acpx/src/runtime.test.ts @@ -23,6 +23,13 @@ const writeLog = (entry) => { if (!logPath) return; fs.appendFileSync(logPath, JSON.stringify(entry) + "\n"); }; +const emitJson = (payload) => process.stdout.write(JSON.stringify(payload) + "\n"); +const emitUpdate = (sessionId, update) => + emitJson({ + jsonrpc: "2.0", + method: "session/update", + params: { sessionId, update }, + }); if (args.includes("--version")) { process.stdout.write("mock-acpx ${ACPX_PINNED_VERSION}\\n"); @@ -61,33 +68,33 @@ const setValue = command === "set" ? String(args[commandIndex + 2] || "") : ""; if (command === "sessions" && args[commandIndex + 1] === "ensure") { writeLog({ kind: "ensure", agent, args, sessionName: ensureName }); - process.stdout.write(JSON.stringify({ - type: "session_ensured", + emitJson({ + action: "session_ensured", acpxRecordId: "rec-" + ensureName, acpxSessionId: "sid-" + ensureName, agentSessionId: "inner-" + ensureName, name: ensureName, created: true, - }) + "\n"); + }); process.exit(0); } if (command === "cancel") { writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption }); - process.stdout.write(JSON.stringify({ + emitJson({ acpxSessionId: "sid-" + sessionFromOption, cancelled: true, - }) + "\n"); + }); process.exit(0); } if (command === "set-mode") { writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue }); - process.stdout.write(JSON.stringify({ - type: "mode_set", + emitJson({ + action: "mode_set", acpxSessionId: "sid-" + sessionFromOption, mode: setModeValue, - }) + "\n"); + }); process.exit(0); } @@ -100,148 +107,167 @@ if (command === "set") { key: setKey, value: setValue, }); - process.stdout.write(JSON.stringify({ - type: "config_set", + emitJson({ + action: "config_set", acpxSessionId: "sid-" + sessionFromOption, key: setKey, value: setValue, - }) + "\n"); + }); process.exit(0); } if (command === "status") { writeLog({ kind: "status", agent, args, sessionName: sessionFromOption }); - process.stdout.write(JSON.stringify({ + emitJson({ acpxRecordId: sessionFromOption ? "rec-" + sessionFromOption : null, acpxSessionId: sessionFromOption ? "sid-" + sessionFromOption : null, agentSessionId: sessionFromOption ? "inner-" + sessionFromOption : null, status: sessionFromOption ? "alive" : "no-session", pid: 4242, uptime: 120, - }) + "\n"); + }); process.exit(0); } if (command === "sessions" && args[commandIndex + 1] === "close") { writeLog({ kind: "close", agent, args, sessionName: closeName }); - process.stdout.write(JSON.stringify({ - type: "session_closed", + emitJson({ + action: "session_closed", acpxRecordId: "rec-" + closeName, acpxSessionId: "sid-" + closeName, name: closeName, - }) + "\n"); + }); process.exit(0); } if (command === "prompt") { const stdinText = fs.readFileSync(0, "utf8"); writeLog({ kind: "prompt", agent, args, sessionName: sessionFromOption, stdinText }); - const acpxSessionId = "sid-" + sessionFromOption; + const requestId = "req-1"; + + emitJson({ + jsonrpc: "2.0", + id: 0, + method: "session/load", + params: { + sessionId: sessionFromOption, + cwd: process.cwd(), + mcpServers: [], + }, + }); + emitJson({ + jsonrpc: "2.0", + id: 0, + error: { + code: -32002, + message: "Resource not found", + }, + }); + + emitJson({ + jsonrpc: "2.0", + id: requestId, + method: "session/prompt", + params: { + sessionId: sessionFromOption, + prompt: [ + { + type: "text", + text: stdinText.trim(), + }, + ], + }, + }); if (stdinText.includes("trigger-error")) { - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 0, - stream: "prompt", - type: "error", - code: "RUNTIME", - message: "mock failure", - }) + "\n"); + emitJson({ + jsonrpc: "2.0", + id: requestId, + error: { + code: -32000, + message: "mock failure", + }, + }); process.exit(1); } if (stdinText.includes("split-spacing")) { - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 0, - stream: "prompt", - type: "text", - content: "alpha", - }) + "\n"); - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 1, - stream: "prompt", - type: "text", - content: " beta", - }) + "\n"); - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 2, - stream: "prompt", - type: "text", - content: " gamma", - }) + "\n"); - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 3, - stream: "prompt", - type: "done", - stopReason: "end_turn", - }) + "\n"); + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "alpha" }, + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: " beta" }, + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: " gamma" }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); process.exit(0); } - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 0, - stream: "prompt", - type: "thought", - content: "thinking", - }) + "\n"); - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 1, - stream: "prompt", - type: "tool_call", + if (stdinText.includes("double-done")) { + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "ok" }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); + process.exit(0); + } + + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "thinking" }, + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "tool_call", + toolCallId: "tool-1", title: "run-tests", status: "in_progress", - }) + "\n"); - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 2, - stream: "prompt", - type: "text", - content: "echo:" + stdinText.trim(), - }) + "\n"); - process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId, - requestId: "req-1", - seq: 3, - stream: "prompt", - type: "done", - stopReason: "end_turn", - }) + "\n"); + kind: "command", + }); + emitUpdate(sessionFromOption, { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "echo:" + stdinText.trim() }, + }); + emitJson({ + jsonrpc: "2.0", + id: requestId, + result: { + stopReason: "end_turn", + }, + }); process.exit(0); } writeLog({ kind: "unknown", args }); -process.stdout.write(JSON.stringify({ - eventVersion: 1, - acpxSessionId: "unknown", - seq: 0, - stream: "control", - type: "error", - code: "USAGE", - message: "unknown command", -}) + "\n"); +emitJson({ + error: { + code: "USAGE", + message: "unknown command", + }, +}); process.exit(2); `; @@ -444,6 +470,28 @@ describe("AcpxRuntime", () => { expect(textDeltas.join("")).toBe("alpha beta gamma"); }); + it("emits done once when ACP stream repeats stop reason responses", async () => { + const { runtime } = await createMockRuntime(); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:acp:double-done", + agent: "codex", + mode: "persistent", + }); + + const events = []; + for await (const event of runtime.runTurn({ + handle, + text: "double-done", + mode: "prompt", + requestId: "req-double-done", + })) { + events.push(event); + } + + const doneCount = events.filter((event) => event.type === "done").length; + expect(doneCount).toBe(1); + }); + it("maps acpx error events into ACP runtime error events", async () => { const { runtime } = await createMockRuntime(); const handle = await runtime.ensureSession({ @@ -465,7 +513,7 @@ describe("AcpxRuntime", () => { expect(events).toContainEqual({ type: "error", message: "mock failure", - code: "RUNTIME", + code: "-32000", retryable: undefined, }); }); diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index 1256c8903c3..c0d2cca94bc 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -197,6 +197,9 @@ export class AcpxRuntime implements AcpRuntime { sessionName: state.name, cwd: state.cwd, }); + const parseContext = { + promptRequestIds: new Set(), + }; const cancelOnAbort = async () => { await this.cancel({ @@ -238,11 +241,14 @@ export class AcpxRuntime implements AcpRuntime { const lines = createInterface({ input: child.stdout }); try { for await (const line of lines) { - const parsed = parsePromptEventLine(line); + const parsed = parsePromptEventLine(line, parseContext); if (!parsed) { continue; } if (parsed.type === "done") { + if (sawDone) { + continue; + } sawDone = true; } if (parsed.type === "error") {