diff --git a/extensions/acpx/src/runtime.test.ts b/extensions/acpx/src/runtime.test.ts index 5c94bffa647..29b23a6444e 100644 --- a/extensions/acpx/src/runtime.test.ts +++ b/extensions/acpx/src/runtime.test.ts @@ -5,7 +5,11 @@ import { pathToFileURL } from "node:url"; import { afterEach, describe, expect, it, vi } from "vitest"; import { runAcpRuntimeAdapterContract } from "../../../src/acp/runtime/adapter-contract.testkit.js"; import { resolveAcpxPluginConfig } from "./config.js"; -import { AcpxRuntime, decodeAcpxRuntimeHandleState } from "./runtime.js"; +import { + AcpxRuntime, + decodeAcpxRuntimeHandleState, + encodeAcpxRuntimeHandleState, +} from "./runtime.js"; import { cleanupMockRuntimeFixtures, createMockRuntimeFixture, @@ -452,9 +456,10 @@ describe("AcpxRuntime", () => { it("serializes text plus image attachments into ACP prompt blocks", async () => { const { runtime, logPath } = await createMockRuntimeFixture(); + const sessionKey = "agent:codex:acp:with-image"; const handle = await runtime.ensureSession({ - sessionKey: "agent:codex:acp:with-image", + sessionKey, agent: "codex", mode: "persistent", }); @@ -472,7 +477,7 @@ describe("AcpxRuntime", () => { const logs = await readMockRuntimeLogEntries(logPath); const prompt = logs.find( (entry) => - entry.kind === "prompt" && String(entry.sessionName ?? "") === "agent:codex:acp:with-image", + entry.kind === "prompt" && String(entry.sessionName ?? "") === handle.agentSessionId, ); expect(prompt).toBeDefined(); @@ -489,8 +494,9 @@ describe("AcpxRuntime", () => { try { const { runtime, logPath } = await createMockRuntimeFixture(); + const sessionKey = "agent:codex:acp:custom-env"; const handle = await runtime.ensureSession({ - sessionKey: "agent:codex:acp:custom-env", + sessionKey, agent: "codex", mode: "persistent", }); @@ -507,8 +513,7 @@ describe("AcpxRuntime", () => { const logs = await readMockRuntimeLogEntries(logPath); const prompt = logs.find( (entry) => - entry.kind === "prompt" && - String(entry.sessionName ?? "") === "agent:codex:acp:custom-env", + entry.kind === "prompt" && String(entry.sessionName ?? "") === handle.agentSessionId, ); expect(prompt?.openaiApiKey).toBe("openai-secret"); expect(prompt?.githubToken).toBe("gh-secret"); @@ -546,7 +551,7 @@ describe("AcpxRuntime", () => { const logs = await readMockRuntimeLogEntries(String(activeLogPath)); const prompt = logs.find( (entry) => - entry.kind === "prompt" && String(entry.sessionName ?? "") === "agent:codex:acp:space", + entry.kind === "prompt" && String(entry.sessionName ?? "") === handle.agentSessionId, ); expect(prompt).toBeDefined(); const promptArgs = (prompt?.args as string[]) ?? []; @@ -654,7 +659,7 @@ describe("AcpxRuntime", () => { const logs = await readMockRuntimeLogEntries(logPath); const cancel = logs.find((entry) => entry.kind === "cancel"); const close = logs.find((entry) => entry.kind === "close"); - expect(cancel?.sessionName).toBe("agent:claude:acp:789"); + expect(cancel?.sessionName).toBe(handle.agentSessionId); expect(close?.sessionName).toBe("agent:claude:acp:789"); }); @@ -1014,4 +1019,176 @@ describe("AcpxRuntime", () => { delete process.env.MOCK_ACPX_NEW_EMPTY; } }); + + it("stores agent session IDs returned after prompt-time load fallback and reuses them", async () => { + process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID = "1"; + process.env.MOCK_ACPX_PROMPT_LOAD_INVALID = "1"; + process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID = "gemini-session-123"; + try { + const { runtime, logPath } = await createMockRuntimeFixture(); + const sessionKey = "agent:gemini:acp:load-fallback"; + const handle = await runtime.ensureSession({ + sessionKey, + agent: "gemini", + mode: "persistent", + }); + + expect(handle.agentSessionId).toBeUndefined(); + + for await (const _event of runtime.runTurn({ + handle, + text: "first turn", + mode: "prompt", + requestId: "req-gemini-1", + })) { + // Drain the prompt stream so the fallback session id can be captured. + } + + expect(handle.agentSessionId).toBe("gemini-session-123"); + expect(decodeAcpxRuntimeHandleState(handle.runtimeSessionName)?.agentSessionId).toBe( + "gemini-session-123", + ); + + const status = await runtime.getStatus({ handle }); + expect(status.agentSessionId).toBe("gemini-session-123"); + + for await (const _event of runtime.runTurn({ + handle, + text: "second turn", + mode: "prompt", + requestId: "req-gemini-2", + })) { + // The second turn should reuse the learned agent session id. + } + + const promptEntries = (await readMockRuntimeLogEntries(logPath)).filter( + (entry) => entry.kind === "prompt", + ); + expect(promptEntries).toHaveLength(2); + expect(promptEntries[0]?.sessionName).toBe(sessionKey); + expect(promptEntries[1]?.sessionName).toBe("gemini-session-123"); + } finally { + delete process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID; + delete process.env.MOCK_ACPX_PROMPT_LOAD_INVALID; + delete process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID; + } + }); + + it("prefers decoded runtime session identifiers over stale handle fallbacks", async () => { + const { runtime, logPath } = await createMockRuntimeFixture(); + const sessionKey = "agent:gemini:acp:stale-handle-fallback"; + const handle = await runtime.ensureSession({ + sessionKey, + agent: "gemini", + mode: "persistent", + }); + + const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName); + expect(decoded).not.toBeNull(); + + handle.runtimeSessionName = encodeAcpxRuntimeHandleState({ + ...decoded!, + backendSessionId: "sid-decoded-gemini-session", + agentSessionId: "decoded-gemini-session", + }); + handle.backendSessionId = "sid-stale-gemini-session"; + handle.agentSessionId = "stale-gemini-session"; + + await runtime.getStatus({ handle }); + + const statusEntries = (await readMockRuntimeLogEntries(logPath)).filter( + (entry) => entry.kind === "status", + ); + expect(statusEntries.length).toBeGreaterThan(0); + expect(statusEntries.at(-1)?.sessionName).toBe("decoded-gemini-session"); + }); + + it("refreshes encoded runtime session identifiers after status learns a newer agent id", async () => { + const { runtime, logPath } = await createMockRuntimeFixture(); + const sessionKey = "agent:gemini:acp:status-refresh"; + const handle = await runtime.ensureSession({ + sessionKey, + agent: "gemini", + mode: "persistent", + }); + + const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName); + expect(decoded).not.toBeNull(); + + handle.runtimeSessionName = encodeAcpxRuntimeHandleState({ + ...decoded!, + agentSessionId: "decoded-gemini-session", + }); + handle.agentSessionId = "fresh-gemini-session"; + + const statePath = process.env.MOCK_ACPX_STATE; + expect(statePath).toBeTruthy(); + fs.writeFileSync( + String(statePath), + JSON.stringify({ + byName: { + [sessionKey]: { + acpxRecordId: `rec-${sessionKey}`, + acpxSessionId: `sid-${sessionKey}`, + agentSessionId: "fresh-gemini-session", + }, + }, + byAgentSessionId: { + "decoded-gemini-session": sessionKey, + "fresh-gemini-session": sessionKey, + }, + }), + "utf8", + ); + + const status = await runtime.getStatus({ handle }); + expect(status.agentSessionId).toBe("fresh-gemini-session"); + expect(handle.agentSessionId).toBe("fresh-gemini-session"); + expect(decodeAcpxRuntimeHandleState(handle.runtimeSessionName)?.agentSessionId).toBe( + "fresh-gemini-session", + ); + + await runtime.cancel({ handle, reason: "test-refresh" }); + + const statusEntries = (await readMockRuntimeLogEntries(logPath)).filter( + (entry) => entry.kind === "status", + ); + const cancelEntries = (await readMockRuntimeLogEntries(logPath)).filter( + (entry) => entry.kind === "cancel", + ); + expect(statusEntries.at(-1)?.sessionName).toBe("decoded-gemini-session"); + expect(cancelEntries.at(-1)?.sessionName).toBe("fresh-gemini-session"); + }); + + it("does not promote session/update params.sessionId into the runtime handle", async () => { + process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID = "1"; + process.env.MOCK_ACPX_PROMPT_OMIT_LOAD_RESULT = "1"; + try { + const { runtime } = await createMockRuntimeFixture(); + const handle = await runtime.ensureSession({ + sessionKey: "agent:codex:acp:update-echo", + agent: "codex", + mode: "persistent", + }); + + expect(handle.agentSessionId).toBeUndefined(); + + for await (const _event of runtime.runTurn({ + handle, + text: "session-update-echo", + mode: "prompt", + requestId: "req-session-update-echo", + })) { + // Drain the prompt stream so any stray identifier promotion would be applied. + } + + expect(handle.agentSessionId).toBeUndefined(); + expect( + decodeAcpxRuntimeHandleState(handle.runtimeSessionName)?.agentSessionId, + ).toBeUndefined(); + } finally { + delete process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID; + delete process.env.MOCK_ACPX_PROMPT_OMIT_LOAD_RESULT; + } + }); }); diff --git a/extensions/acpx/src/runtime.ts b/extensions/acpx/src/runtime.ts index bf00185ff3e..b5a0b54bf58 100644 --- a/extensions/acpx/src/runtime.ts +++ b/extensions/acpx/src/runtime.ts @@ -52,6 +52,12 @@ const ACPX_CAPABILITIES: AcpRuntimeCapabilities = { controls: ["session/set_mode", "session/set_config_option", "session/status"], }; +type AcpxSessionIdentifiers = { + acpxRecordId?: string; + backendSessionId?: string; + agentSessionId?: string; +}; + type AcpxHealthCheckResult = | { ok: true; @@ -166,6 +172,111 @@ function findSessionIdentifierEvent(events: AcpxJsonObject[]): AcpxJsonObject | ); } +function resolveSessionIdentifiersFromEvent( + event: AcpxJsonObject | undefined, +): AcpxSessionIdentifiers { + if (!event) { + return {}; + } + const acpxRecordId = asOptionalString(event.acpxRecordId); + const backendSessionId = asOptionalString(event.acpxSessionId); + const agentSessionId = asOptionalString(event.agentSessionId); + return { + ...(acpxRecordId ? { acpxRecordId } : {}), + ...(backendSessionId ? { backendSessionId } : {}), + ...(agentSessionId ? { agentSessionId } : {}), + }; +} + +function hasSessionIdentifiers(identifiers: AcpxSessionIdentifiers): boolean { + return Boolean( + identifiers.acpxRecordId || identifiers.backendSessionId || identifiers.agentSessionId, + ); +} + +function parsePromptProtocolEvent(line: string): AcpxJsonObject | null { + const trimmed = line.trim(); + if (!trimmed) { + return null; + } + try { + const parsed = JSON.parse(trimmed) as unknown; + return isRecord(parsed) ? parsed : null; + } catch { + return null; + } +} + +function extractPromptSessionIdentifiers(line: string): AcpxSessionIdentifiers { + const parsed = parsePromptProtocolEvent(line); + if (!parsed) { + return {}; + } + + const direct = resolveSessionIdentifiersFromEvent(parsed); + if (hasSessionIdentifiers(direct)) { + return direct; + } + + if (isRecord(parsed.result)) { + // Prompt turns only emit result.sessionId from session/load and session/new responses today, + // so an unrestricted capture remains scoped to backend-issued session identifiers. + const agentSessionId = asOptionalString(parsed.result.sessionId); + if (agentSessionId) { + return { agentSessionId }; + } + } + + return {}; +} + +function mergeHandleStateWithIdentifiers( + state: AcpxHandleState, + identifiers: AcpxSessionIdentifiers, +): AcpxHandleState { + const acpxRecordId = identifiers.acpxRecordId ?? state.acpxRecordId; + const backendSessionId = identifiers.backendSessionId ?? state.backendSessionId; + const agentSessionId = identifiers.agentSessionId ?? state.agentSessionId; + return { + ...state, + ...(acpxRecordId ? { acpxRecordId } : {}), + ...(backendSessionId ? { backendSessionId } : {}), + ...(agentSessionId ? { agentSessionId } : {}), + }; +} + +function mergeHandleStateWithFallbackIdentifiers( + state: AcpxHandleState, + identifiers: AcpxSessionIdentifiers, +): AcpxHandleState { + const acpxRecordId = state.acpxRecordId ?? identifiers.acpxRecordId; + const backendSessionId = state.backendSessionId ?? identifiers.backendSessionId; + const agentSessionId = state.agentSessionId ?? identifiers.agentSessionId; + return { + ...state, + ...(acpxRecordId ? { acpxRecordId } : {}), + ...(backendSessionId ? { backendSessionId } : {}), + ...(agentSessionId ? { agentSessionId } : {}), + }; +} + +function writeHandleState(handle: AcpRuntimeHandle, state: AcpxHandleState): void { + handle.runtimeSessionName = encodeAcpxRuntimeHandleState(state); + if (state.acpxRecordId) { + handle.acpxRecordId = state.acpxRecordId; + } + if (state.backendSessionId) { + handle.backendSessionId = state.backendSessionId; + } + if (state.agentSessionId) { + handle.agentSessionId = state.agentSessionId; + } +} + +function resolveInteractiveSessionReference(state: AcpxHandleState): string { + return state.agentSessionId ?? state.name; +} + export function encodeAcpxRuntimeHandleState(state: AcpxHandleState): string { const payload = Buffer.from(JSON.stringify(state), "utf8").toString("base64url"); return `${ACPX_RUNTIME_HANDLE_PREFIX}${payload}`; @@ -647,11 +758,10 @@ export class AcpxRuntime implements AcpRuntime { ); } - const acpxRecordId = ensuredEvent ? asOptionalString(ensuredEvent.acpxRecordId) : undefined; - const agentSessionId = ensuredEvent ? asOptionalString(ensuredEvent.agentSessionId) : undefined; - const backendSessionId = ensuredEvent - ? asOptionalString(ensuredEvent.acpxSessionId) - : undefined; + const identifiers = resolveSessionIdentifiersFromEvent(ensuredEvent); + const acpxRecordId = identifiers.acpxRecordId; + const agentSessionId = identifiers.agentSessionId; + const backendSessionId = identifiers.backendSessionId; return { sessionKey: input.sessionKey, @@ -673,10 +783,10 @@ export class AcpxRuntime implements AcpRuntime { } async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable { - const state = this.resolveHandleState(input.handle); + let state = this.resolveHandleState(input.handle); const args = await this.buildPromptArgs({ agent: state.agent, - sessionName: state.name, + sessionName: resolveInteractiveSessionReference(state), cwd: state.cwd, }); @@ -737,6 +847,11 @@ export class AcpxRuntime implements AcpRuntime { const lines = createInterface({ input: child.stdout }); try { for await (const line of lines) { + const promptIdentifiers = extractPromptSessionIdentifiers(line); + if (hasSessionIdentifiers(promptIdentifiers)) { + state = mergeHandleStateWithIdentifiers(state, promptIdentifiers); + writeHandleState(input.handle, state); + } const parsed = parsePromptEventLine(line); if (!parsed) { continue; @@ -813,7 +928,7 @@ export class AcpxRuntime implements AcpRuntime { const args = await this.buildVerbArgs({ agent: state.agent, cwd: state.cwd, - command: ["status", "--session", state.name], + command: ["status", "--session", resolveInteractiveSessionReference(state)], }); const events = await this.runControlCommand({ args, @@ -832,6 +947,14 @@ export class AcpxRuntime implements AcpRuntime { const acpxRecordId = asOptionalString(detail.acpxRecordId); const acpxSessionId = asOptionalString(detail.acpxSessionId); const agentSessionId = asOptionalString(detail.agentSessionId); + const refreshedIdentifiers: AcpxSessionIdentifiers = { + ...(acpxRecordId ? { acpxRecordId } : {}), + ...(acpxSessionId ? { backendSessionId: acpxSessionId } : {}), + ...(agentSessionId ? { agentSessionId } : {}), + }; + if (hasSessionIdentifiers(refreshedIdentifiers)) { + writeHandleState(input.handle, mergeHandleStateWithIdentifiers(state, refreshedIdentifiers)); + } const pid = typeof detail.pid === "number" && Number.isFinite(detail.pid) ? detail.pid : null; const summary = [ `status=${status}`, @@ -859,7 +982,7 @@ export class AcpxRuntime implements AcpRuntime { const args = await this.buildVerbArgs({ agent: state.agent, cwd: state.cwd, - command: ["set-mode", mode, "--session", state.name], + command: ["set-mode", mode, "--session", resolveInteractiveSessionReference(state)], }); await this.runControlCommand({ args, @@ -882,7 +1005,7 @@ export class AcpxRuntime implements AcpRuntime { const args = await this.buildVerbArgs({ agent: state.agent, cwd: state.cwd, - command: ["set", key, value, "--session", state.name], + command: ["set", key, value, "--session", resolveInteractiveSessionReference(state)], }); await this.runControlCommand({ args, @@ -971,7 +1094,7 @@ export class AcpxRuntime implements AcpRuntime { const args = await this.buildVerbArgs({ agent: state.agent, cwd: state.cwd, - command: ["cancel", "--session", state.name], + command: ["cancel", "--session", resolveInteractiveSessionReference(state)], }); await this.runControlCommand({ args, @@ -999,7 +1122,13 @@ export class AcpxRuntime implements AcpRuntime { private resolveHandleState(handle: AcpRuntimeHandle): AcpxHandleState { const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName); if (decoded) { - return decoded; + // Prefer the encoded runtime state over legacy handle fields so stale + // fallback metadata does not resurrect older session identifiers. + return mergeHandleStateWithFallbackIdentifiers(decoded, { + acpxRecordId: asOptionalString((handle as { acpxRecordId?: unknown }).acpxRecordId), + backendSessionId: asOptionalString(handle.backendSessionId), + agentSessionId: asOptionalString(handle.agentSessionId), + }); } const legacyName = asTrimmedString(handle.runtimeSessionName); @@ -1015,6 +1144,17 @@ export class AcpxRuntime implements AcpRuntime { agent: deriveAgentFromSessionKey(handle.sessionKey, DEFAULT_AGENT_FALLBACK), cwd: this.config.cwd, mode: "persistent", + ...(asOptionalString((handle as { acpxRecordId?: unknown }).acpxRecordId) + ? { + acpxRecordId: asOptionalString((handle as { acpxRecordId?: unknown }).acpxRecordId), + } + : {}), + ...(asOptionalString(handle.backendSessionId) + ? { backendSessionId: asOptionalString(handle.backendSessionId) } + : {}), + ...(asOptionalString(handle.agentSessionId) + ? { agentSessionId: asOptionalString(handle.agentSessionId) } + : {}), }; } diff --git a/extensions/acpx/src/test-utils/runtime-fixtures.ts b/extensions/acpx/src/test-utils/runtime-fixtures.ts index 8d5b66b08f1..412343c603a 100644 --- a/extensions/acpx/src/test-utils/runtime-fixtures.ts +++ b/extensions/acpx/src/test-utils/runtime-fixtures.ts @@ -19,10 +19,14 @@ let logFileSequence = 0; const MOCK_CLI_SCRIPT = String.raw`#!/usr/bin/env node const fs = require("node:fs"); +const path = require("node:path"); (async () => { const args = process.argv.slice(2); const logPath = process.env.MOCK_ACPX_LOG; +const statePath = + process.env.MOCK_ACPX_STATE || + path.join(path.dirname(logPath || process.cwd()), "mock-acpx-state.json"); const openclawShell = process.env.OPENCLAW_SHELL || ""; const writeLog = (entry) => { if (!logPath) return; @@ -41,6 +45,95 @@ const emitUpdate = (sessionId, update) => method: "session/update", params: { sessionId, update }, }); +const readState = () => { + try { + const raw = fs.readFileSync(statePath, "utf8"); + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) { + return { + byName: + parsed.byName && typeof parsed.byName === "object" && !Array.isArray(parsed.byName) + ? parsed.byName + : {}, + byAgentSessionId: + parsed.byAgentSessionId && + typeof parsed.byAgentSessionId === "object" && + !Array.isArray(parsed.byAgentSessionId) + ? parsed.byAgentSessionId + : {}, + }; + } + } catch {} + return { byName: {}, byAgentSessionId: {} }; +}; +const writeState = (state) => { + fs.writeFileSync(statePath, JSON.stringify(state), "utf8"); +}; +const defaultAgentSessionIdForName = (name) => { + if (process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID === "1") { + return ""; + } + const prefix = process.env.MOCK_ACPX_AGENT_SESSION_PREFIX || "inner-"; + return prefix + name; +}; +const cleanupAgentLookup = (state, name) => { + for (const [sessionId, mappedName] of Object.entries(state.byAgentSessionId)) { + if (mappedName === name) { + delete state.byAgentSessionId[sessionId]; + } + } +}; +const storeSessionByName = (name, overrides = {}) => { + const state = readState(); + const existing = state.byName[name] && typeof state.byName[name] === "object" ? state.byName[name] : {}; + const next = { + acpxRecordId: "rec-" + name, + acpxSessionId: "sid-" + name, + agentSessionId: defaultAgentSessionIdForName(name), + ...existing, + ...overrides, + }; + if (!next.acpxRecordId) { + next.acpxRecordId = "rec-" + name; + } + if (!next.acpxSessionId) { + next.acpxSessionId = "sid-" + name; + } + cleanupAgentLookup(state, name); + state.byName[name] = next; + if (next.agentSessionId) { + state.byAgentSessionId[next.agentSessionId] = name; + } + writeState(state); + return { name, ...next }; +}; +const findSessionByReference = (reference) => { + if (!reference) { + return null; + } + const state = readState(); + const byName = state.byName[reference]; + if (byName && typeof byName === "object") { + return { name: reference, ...byName }; + } + const mappedName = state.byAgentSessionId[reference]; + if (mappedName) { + const mapped = state.byName[mappedName]; + if (mapped && typeof mapped === "object") { + return { name: mappedName, ...mapped }; + } + } + for (const [name, session] of Object.entries(state.byName)) { + if (!session || typeof session !== "object") { + continue; + } + if (session.acpxSessionId === reference) { + return { name, ...session }; + } + } + return null; +}; +const resolveSession = (reference) => findSessionByReference(reference) || storeSessionByName(reference); if (args.includes("--version")) { return emitTextAndExit("mock-acpx ${ACPX_PINNED_VERSION}\\n"); @@ -74,6 +167,7 @@ const readFlag = (flag) => { const sessionFromOption = readFlag("--session"); const ensureName = readFlag("--name"); +const resumeSessionId = readFlag("--resume-session"); const closeName = command === "sessions" && args[commandIndex + 1] === "close" ? String(args[commandIndex + 2] || "") @@ -88,6 +182,7 @@ if (command === "sessions" && args[commandIndex + 1] === "ensure") { process.stderr.write(String(process.env.MOCK_ACPX_ENSURE_STDERR) + "\n"); } if (process.env.MOCK_ACPX_ENSURE_EXIT_1 === "1") { + storeSessionByName(ensureName, resumeSessionId ? { agentSessionId: resumeSessionId } : {}); return emitJsonAndExit({ jsonrpc: "2.0", id: null, @@ -100,11 +195,12 @@ if (command === "sessions" && args[commandIndex + 1] === "ensure") { if (process.env.MOCK_ACPX_ENSURE_EMPTY === "1") { emitJson({ action: "session_ensured", name: ensureName }); } else { + const session = storeSessionByName(ensureName, resumeSessionId ? { agentSessionId: resumeSessionId } : {}); emitJson({ action: "session_ensured", - acpxRecordId: "rec-" + ensureName, - acpxSessionId: "sid-" + ensureName, - agentSessionId: "inner-" + ensureName, + acpxRecordId: session.acpxRecordId, + acpxSessionId: session.acpxSessionId, + ...(session.agentSessionId ? { agentSessionId: session.agentSessionId } : {}), name: ensureName, created: true, }); @@ -131,11 +227,12 @@ if (command === "sessions" && args[commandIndex + 1] === "new") { if (process.env.MOCK_ACPX_NEW_EMPTY === "1") { emitJson({ action: "session_created", name: ensureName }); } else { + const session = storeSessionByName(ensureName, resumeSessionId ? { agentSessionId: resumeSessionId } : {}); emitJson({ action: "session_created", - acpxRecordId: "rec-" + ensureName, - acpxSessionId: "sid-" + ensureName, - agentSessionId: "inner-" + ensureName, + acpxRecordId: session.acpxRecordId, + acpxSessionId: session.acpxSessionId, + ...(session.agentSessionId ? { agentSessionId: session.agentSessionId } : {}), name: ensureName, created: true, }); @@ -172,23 +269,26 @@ if (command === "config" && args[commandIndex + 1] === "show") { } if (command === "cancel") { + const session = findSessionByReference(sessionFromOption); writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption }); return emitJsonAndExit({ - acpxSessionId: "sid-" + sessionFromOption, + acpxSessionId: session ? session.acpxSessionId : "sid-" + sessionFromOption, cancelled: true, }); } if (command === "set-mode") { + const session = findSessionByReference(sessionFromOption); writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue }); return emitJsonAndExit({ action: "mode_set", - acpxSessionId: "sid-" + sessionFromOption, + acpxSessionId: session ? session.acpxSessionId : "sid-" + sessionFromOption, mode: setModeValue, }); } if (command === "set") { + const session = findSessionByReference(sessionFromOption); writeLog({ kind: "set", agent, @@ -199,7 +299,7 @@ if (command === "set") { }); emitJson({ action: "config_set", - acpxSessionId: "sid-" + sessionFromOption, + acpxSessionId: session ? session.acpxSessionId : "sid-" + sessionFromOption, key: setKey, value: setValue, }); @@ -208,6 +308,7 @@ if (command === "set") { } if (command === "status") { + const session = findSessionByReference(sessionFromOption); writeLog({ kind: "status", agent, args, sessionName: sessionFromOption }); if (process.env.MOCK_ACPX_STATUS_SIGNAL) { process.kill(process.pid, process.env.MOCK_ACPX_STATUS_SIGNAL); @@ -216,9 +317,9 @@ if (command === "status") { const summary = process.env.MOCK_ACPX_STATUS_SUMMARY || ""; const omitStatusIds = process.env.MOCK_ACPX_STATUS_NO_IDS === "1"; emitJson({ - acpxRecordId: sessionFromOption && !omitStatusIds ? "rec-" + sessionFromOption : null, - acpxSessionId: sessionFromOption && !omitStatusIds ? "sid-" + sessionFromOption : null, - agentSessionId: sessionFromOption && !omitStatusIds ? "inner-" + sessionFromOption : null, + acpxRecordId: !omitStatusIds && session ? session.acpxRecordId : null, + acpxSessionId: !omitStatusIds && session ? session.acpxSessionId : null, + agentSessionId: !omitStatusIds && session ? session.agentSessionId || null : null, status, ...(summary ? { summary } : {}), pid: 4242, @@ -229,17 +330,19 @@ if (command === "status") { } if (command === "sessions" && args[commandIndex + 1] === "close") { + const session = findSessionByReference(closeName) || storeSessionByName(closeName); writeLog({ kind: "close", agent, args, sessionName: closeName }); return emitJsonAndExit({ action: "session_closed", - acpxRecordId: "rec-" + closeName, - acpxSessionId: "sid-" + closeName, + acpxRecordId: session.acpxRecordId, + acpxSessionId: session.acpxSessionId, name: closeName, }); } if (command === "prompt") { const stdinText = fs.readFileSync(0, "utf8"); + const session = resolveSession(sessionFromOption); writeLog({ kind: "prompt", agent, @@ -251,6 +354,7 @@ if (command === "prompt") { githubToken: process.env.GITHUB_TOKEN || "", }); const requestId = "req-1"; + let activeSessionId = session.agentSessionId || sessionFromOption; emitJson({ jsonrpc: "2.0", @@ -262,21 +366,50 @@ if (command === "prompt") { mcpServers: [], }, }); - emitJson({ - jsonrpc: "2.0", - id: 0, - error: { - code: -32002, - message: "Resource not found", - }, - }); + + const shouldRejectLoad = + process.env.MOCK_ACPX_PROMPT_LOAD_INVALID === "1" && + (!session.agentSessionId || sessionFromOption !== session.agentSessionId); + if (shouldRejectLoad) { + const nextAgentSessionId = + process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID || "agent-fallback-" + session.name; + const refreshed = storeSessionByName(session.name, { + agentSessionId: nextAgentSessionId, + }); + emitJson({ + jsonrpc: "2.0", + id: 0, + error: { + code: -32002, + message: "Invalid session identifier", + }, + }); + emitJson({ + jsonrpc: "2.0", + id: 0, + result: { + sessionId: nextAgentSessionId, + }, + }); + activeSessionId = refreshed.agentSessionId || nextAgentSessionId; + } else { + if (process.env.MOCK_ACPX_PROMPT_OMIT_LOAD_RESULT !== "1") { + emitJson({ + jsonrpc: "2.0", + id: 0, + result: { + sessionId: activeSessionId, + }, + }); + } + } emitJson({ jsonrpc: "2.0", id: requestId, method: "session/prompt", params: { - sessionId: sessionFromOption, + sessionId: activeSessionId, prompt: [ { type: "text", @@ -304,15 +437,15 @@ if (command === "prompt") { } if (stdinText.includes("split-spacing")) { - emitUpdate(sessionFromOption, { + emitUpdate(activeSessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: "alpha" }, }); - emitUpdate(sessionFromOption, { + emitUpdate(activeSessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: " beta" }, }); - emitUpdate(sessionFromOption, { + emitUpdate(activeSessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: " gamma" }, }); @@ -322,7 +455,7 @@ if (command === "prompt") { } if (stdinText.includes("double-done")) { - emitUpdate(sessionFromOption, { + emitUpdate(activeSessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: "ok" }, }); @@ -332,18 +465,18 @@ if (command === "prompt") { return; } - emitUpdate(sessionFromOption, { + emitUpdate(activeSessionId, { sessionUpdate: "agent_thought_chunk", content: { type: "text", text: "thinking" }, }); - emitUpdate(sessionFromOption, { + emitUpdate(activeSessionId, { sessionUpdate: "tool_call", toolCallId: "tool-1", title: "run-tests", status: "in_progress", kind: "command", }); - emitUpdate(sessionFromOption, { + emitUpdate(activeSessionId, { sessionUpdate: "agent_message_chunk", content: { type: "text", text: "echo:" + stdinText.trim() }, }); @@ -376,7 +509,9 @@ export async function createMockRuntimeFixture(params?: { const scriptPath = await ensureMockCliScriptPath(); const dir = path.dirname(scriptPath); const logPath = path.join(dir, `calls-${logFileSequence++}.log`); + const statePath = path.join(dir, `state-${logFileSequence - 1}.json`); process.env.MOCK_ACPX_LOG = logPath; + process.env.MOCK_ACPX_STATE = statePath; const config: ResolvedAcpxPluginConfig = { command: scriptPath, @@ -435,11 +570,18 @@ export async function readMockRuntimeLogEntries( export async function cleanupMockRuntimeFixtures(): Promise { delete process.env.MOCK_ACPX_LOG; + delete process.env.MOCK_ACPX_STATE; delete process.env.MOCK_ACPX_CONFIG_SHOW_AGENTS; delete process.env.MOCK_ACPX_ENSURE_ERROR_MESSAGE; delete process.env.MOCK_ACPX_ENSURE_EXIT_1; delete process.env.MOCK_ACPX_ENSURE_STDERR; delete process.env.MOCK_ACPX_NEW_FAIL_ON_RESUME; + delete process.env.MOCK_ACPX_ENSURE_EMPTY; + delete process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID; + delete process.env.MOCK_ACPX_NEW_EMPTY; + delete process.env.MOCK_ACPX_AGENT_SESSION_PREFIX; + delete process.env.MOCK_ACPX_PROMPT_LOAD_INVALID; + delete process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID; delete process.env.MOCK_ACPX_STATUS_STATUS; delete process.env.MOCK_ACPX_STATUS_NO_IDS; delete process.env.MOCK_ACPX_STATUS_SUMMARY; diff --git a/src/acp/control-plane/manager.core.ts b/src/acp/control-plane/manager.core.ts index d4c52f3d534..c03ff37e405 100644 --- a/src/acp/control-plane/manager.core.ts +++ b/src/acp/control-plane/manager.core.ts @@ -1354,6 +1354,7 @@ export class AcpSessionManager { const runtime = backend.runtime; const previousMeta = params.meta; const previousIdentity = resolveSessionIdentityFromMeta(previousMeta); + let identityForEnsure = previousIdentity; const persistedResumeSessionId = mode === "persistent" ? resolveRuntimeResumeSessionId(previousIdentity) : undefined; const ensureSession = async (resumeSessionId?: string) => @@ -1385,6 +1386,19 @@ export class AcpSessionManager { logVerbose( `acp-manager: resume init failed for ${params.sessionKey}; retrying without persisted ACP session id: ${acpError.message}`, ); + if (identityForEnsure) { + const { + acpxSessionId: _staleAcpxSessionId, + agentSessionId: _staleAgentSessionId, + ...retryIdentity + } = identityForEnsure; + // The persisted resume identifiers already failed, so do not merge them back into the + // fresh named-session handle returned by the retry path. + identityForEnsure = { + ...retryIdentity, + state: "pending", + }; + } ensured = await ensureSession(); } } else { @@ -1399,13 +1413,13 @@ export class AcpSessionManager { }); const nextIdentity = mergeSessionIdentity({ - current: previousIdentity, + current: identityForEnsure, incoming: createIdentityFromEnsure({ handle: ensured, now, }), now, - }) ?? previousIdentity; + }) ?? identityForEnsure; const nextHandleIdentifiers = resolveRuntimeHandleIdentifiersFromIdentity(nextIdentity); const nextHandle: AcpRuntimeHandle = { ...ensured, diff --git a/src/acp/control-plane/manager.identity-reconcile.ts b/src/acp/control-plane/manager.identity-reconcile.ts index d78a22ea04f..45c9817967b 100644 --- a/src/acp/control-plane/manager.identity-reconcile.ts +++ b/src/acp/control-plane/manager.identity-reconcile.ts @@ -2,6 +2,7 @@ import type { OpenClawConfig } from "../../config/config.js"; import { logVerbose } from "../../globals.js"; import { withAcpRuntimeErrorBoundary } from "../runtime/errors.js"; import { + createIdentityFromHandleEvent, createIdentityFromStatus, identityEquals, mergeSessionIdentity, @@ -63,15 +64,25 @@ export async function reconcileManagerRuntimeSessionIdentifiers(params: { const now = Date.now(); const currentIdentity = resolveSessionIdentityFromMeta(params.meta); - const nextIdentity = + const eventIdentity = createIdentityFromHandleEvent({ + handle: params.handle, + now, + }); + const identityAfterEvent = mergeSessionIdentity({ current: currentIdentity, + incoming: eventIdentity, + now, + }) ?? currentIdentity; + const nextIdentity = + mergeSessionIdentity({ + current: identityAfterEvent, incoming: createIdentityFromStatus({ status: runtimeStatus, now, }), now, - }) ?? currentIdentity; + }) ?? identityAfterEvent; const handleIdentifiers = resolveRuntimeHandleIdentifiersFromIdentity(nextIdentity); const handleChanged = handleIdentifiers.backendSessionId !== params.handle.backendSessionId || diff --git a/src/acp/control-plane/manager.test.ts b/src/acp/control-plane/manager.test.ts index 028d0ad4ff9..241a2145ffb 100644 --- a/src/acp/control-plane/manager.test.ts +++ b/src/acp/control-plane/manager.test.ts @@ -843,6 +843,51 @@ describe("AcpSessionManager", () => { ); }); + it("prefers the persisted agent session id when reopening an ACP runtime after restart", async () => { + const runtimeState = createRuntime(); + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + const sessionKey = "agent:gemini:acp:binding:discord:default:restart"; + hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => { + const key = (paramsUnknown as { sessionKey?: string }).sessionKey ?? sessionKey; + return { + sessionKey: key, + storeSessionKey: key, + acp: { + ...readySessionMeta(), + agent: "gemini", + runtimeSessionName: key, + identity: { + state: "resolved", + source: "status", + acpxSessionId: "acpx-sid-1", + agentSessionId: "gemini-sid-1", + lastUpdatedAt: Date.now(), + }, + }, + }; + }); + + const manager = new AcpSessionManager(); + await manager.runTurn({ + cfg: baseCfg, + sessionKey, + text: "after restart", + mode: "prompt", + requestId: "r-binding-restart-gemini", + }); + + expect(runtimeState.ensureSession).toHaveBeenCalledWith( + expect.objectContaining({ + sessionKey, + agent: "gemini", + resumeSessionId: "gemini-sid-1", + }), + ); + }); + it("does not resume persisted ACP identity for oneshot sessions after restart", async () => { const runtimeState = createRuntime(); hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ @@ -890,7 +935,7 @@ describe("AcpSessionManager", () => { expect(ensureInput?.resumeSessionId).toBeUndefined(); }); - it("falls back to a fresh ensure when reopening a persisted ACP backend session id fails", async () => { + it("falls back to a fresh ensure without reusing stale agent session ids", async () => { const runtimeState = createRuntime(); runtimeState.ensureSession.mockImplementation(async (inputUnknown: unknown) => { const input = inputUnknown as { @@ -929,6 +974,7 @@ describe("AcpSessionManager", () => { state: "resolved", source: "status", acpxSessionId: "acpx-sid-stale", + agentSessionId: "agent-sid-stale", lastUpdatedAt: Date.now(), }, }; @@ -971,13 +1017,19 @@ describe("AcpSessionManager", () => { expect(runtimeState.ensureSession.mock.calls[0]?.[0]).toMatchObject({ sessionKey, agent: "codex", - resumeSessionId: "acpx-sid-stale", + resumeSessionId: "agent-sid-stale", }); const retryInput = runtimeState.ensureSession.mock.calls[1]?.[0] as | { resumeSessionId?: string } | undefined; expect(retryInput?.resumeSessionId).toBeUndefined(); + const runTurnInput = runtimeState.runTurn.mock.calls[0]?.[0] as + | { handle?: { agentSessionId?: string; backendSessionId?: string } } + | undefined; + expect(runTurnInput?.handle?.backendSessionId).toBe("acpx-sid-fresh"); + expect(runTurnInput?.handle?.agentSessionId).toBeUndefined(); expect(currentMeta.identity?.acpxSessionId).toBe("acpx-sid-fresh"); + expect(currentMeta.identity?.agentSessionId).toBeUndefined(); }); it("enforces acp.maxConcurrentSessions when opening new runtime handles", async () => { @@ -1812,6 +1864,83 @@ describe("AcpSessionManager", () => { expect(currentMeta.identity?.agentSessionId).toBe("agent-session-1"); }); + it("reconciles prompt-learned agent session IDs even when runtime status omits them", async () => { + const runtimeState = createRuntime(); + runtimeState.ensureSession.mockResolvedValue({ + sessionKey: "agent:gemini:acp:session-1", + backend: "acpx", + runtimeSessionName: "runtime-3", + backendSessionId: "acpx-stale", + }); + runtimeState.runTurn.mockImplementation(async function* (inputUnknown: unknown) { + const input = inputUnknown as { + handle: { + agentSessionId?: string; + }; + }; + input.handle.agentSessionId = "gemini-session-1"; + yield { type: "done" as const }; + }); + runtimeState.getStatus.mockResolvedValue({ + summary: "status=alive", + details: { status: "alive" }, + }); + hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ + id: "acpx", + runtime: runtimeState.runtime, + }); + + let currentMeta: SessionAcpMeta = { + ...readySessionMeta(), + agent: "gemini", + identity: { + state: "pending", + source: "ensure", + acpxSessionId: "acpx-stale", + lastUpdatedAt: Date.now(), + }, + }; + const sessionKey = "agent:gemini:acp:session-1"; + hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => { + const key = (paramsUnknown as { sessionKey?: string }).sessionKey ?? sessionKey; + return { + sessionKey: key, + storeSessionKey: key, + acp: currentMeta, + }; + }); + hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => { + const params = paramsUnknown as { + mutate: ( + current: SessionAcpMeta | undefined, + entry: { acp?: SessionAcpMeta } | undefined, + ) => SessionAcpMeta | null | undefined; + }; + const next = params.mutate(currentMeta, { acp: currentMeta }); + if (next) { + currentMeta = next; + } + return { + sessionId: "session-1", + updatedAt: Date.now(), + acp: currentMeta, + }; + }); + + const manager = new AcpSessionManager(); + await manager.runTurn({ + cfg: baseCfg, + sessionKey, + text: "learn prompt session", + mode: "prompt", + requestId: "run-prompt-learned-agent-id", + }); + + expect(currentMeta.identity?.state).toBe("resolved"); + expect(currentMeta.identity?.agentSessionId).toBe("gemini-session-1"); + expect(currentMeta.identity?.acpxSessionId).toBe("acpx-stale"); + }); + it("skips startup identity reconciliation for already resolved sessions", async () => { const runtimeState = createRuntime(); hoisted.requireAcpRuntimeBackendMock.mockReturnValue({ diff --git a/src/acp/runtime/session-identity.ts b/src/acp/runtime/session-identity.ts index 1ff808bd28c..57f002be307 100644 --- a/src/acp/runtime/session-identity.ts +++ b/src/acp/runtime/session-identity.ts @@ -77,7 +77,7 @@ export function resolveRuntimeResumeSessionId( if (!identity) { return undefined; } - return normalizeText(identity.acpxSessionId) ?? normalizeText(identity.agentSessionId); + return normalizeText(identity.agentSessionId) ?? normalizeText(identity.acpxSessionId); } export function isSessionIdentityPending(identity: SessionAcpIdentity | undefined): boolean { @@ -175,6 +175,26 @@ export function createIdentityFromEnsure(params: { }; } +export function createIdentityFromHandleEvent(params: { + handle: AcpRuntimeHandle; + now: number; +}): SessionAcpIdentity | undefined { + const acpxRecordId = normalizeText((params.handle as { acpxRecordId?: unknown }).acpxRecordId); + const acpxSessionId = normalizeText(params.handle.backendSessionId); + const agentSessionId = normalizeText(params.handle.agentSessionId); + if (!acpxRecordId && !acpxSessionId && !agentSessionId) { + return undefined; + } + return { + state: agentSessionId ? "resolved" : "pending", + ...(acpxRecordId ? { acpxRecordId } : {}), + ...(acpxSessionId ? { acpxSessionId } : {}), + ...(agentSessionId ? { agentSessionId } : {}), + source: "event", + lastUpdatedAt: params.now, + }; +} + export function createIdentityFromStatus(params: { status: AcpRuntimeStatus | undefined; now: number;