fix(acpx): retry persisted resume ids cleanly (#52209)

* fix(acpx): store agent session ID when session/load fails

When an ACP agent (e.g. Gemini CLI) rejects the acpx-generated session
ID via session/load and falls back to session/new, the agent-returned
session ID was previously discarded. This caused identity stuck at
pending forever, multi-turn failures, lost completion events, and
persistent reconcile warnings.

- Parse ACP protocol stream in runTurn() to capture agent session IDs
- Flip resolveRuntimeResumeSessionId() to prefer agentSessionId
- Add createIdentityFromHandleEvent() for handle-sourced identity
- Layer handle event identity before status in reconcile
- Add regression tests for load fallback and restart resume

Closes #52182

* ACPX: prefer decoded session ids

* ACPX: refresh runtime handle state from status

---------

Co-authored-by: Wesley <imwyvern@users.noreply.github.com>
This commit is contained in:
wesley
2026-04-05 17:01:59 +08:00
committed by GitHub
parent cc09171929
commit 1030b498de
7 changed files with 690 additions and 57 deletions

View File

@@ -5,7 +5,11 @@ import { pathToFileURL } from "node:url";
import { afterEach, describe, expect, it, vi } from "vitest";
import { runAcpRuntimeAdapterContract } from "../../../src/acp/runtime/adapter-contract.testkit.js";
import { resolveAcpxPluginConfig } from "./config.js";
import { AcpxRuntime, decodeAcpxRuntimeHandleState } from "./runtime.js";
import {
AcpxRuntime,
decodeAcpxRuntimeHandleState,
encodeAcpxRuntimeHandleState,
} from "./runtime.js";
import {
cleanupMockRuntimeFixtures,
createMockRuntimeFixture,
@@ -452,9 +456,10 @@ describe("AcpxRuntime", () => {
it("serializes text plus image attachments into ACP prompt blocks", async () => {
const { runtime, logPath } = await createMockRuntimeFixture();
const sessionKey = "agent:codex:acp:with-image";
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:with-image",
sessionKey,
agent: "codex",
mode: "persistent",
});
@@ -472,7 +477,7 @@ describe("AcpxRuntime", () => {
const logs = await readMockRuntimeLogEntries(logPath);
const prompt = logs.find(
(entry) =>
entry.kind === "prompt" && String(entry.sessionName ?? "") === "agent:codex:acp:with-image",
entry.kind === "prompt" && String(entry.sessionName ?? "") === handle.agentSessionId,
);
expect(prompt).toBeDefined();
@@ -489,8 +494,9 @@ describe("AcpxRuntime", () => {
try {
const { runtime, logPath } = await createMockRuntimeFixture();
const sessionKey = "agent:codex:acp:custom-env";
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:custom-env",
sessionKey,
agent: "codex",
mode: "persistent",
});
@@ -507,8 +513,7 @@ describe("AcpxRuntime", () => {
const logs = await readMockRuntimeLogEntries(logPath);
const prompt = logs.find(
(entry) =>
entry.kind === "prompt" &&
String(entry.sessionName ?? "") === "agent:codex:acp:custom-env",
entry.kind === "prompt" && String(entry.sessionName ?? "") === handle.agentSessionId,
);
expect(prompt?.openaiApiKey).toBe("openai-secret");
expect(prompt?.githubToken).toBe("gh-secret");
@@ -546,7 +551,7 @@ describe("AcpxRuntime", () => {
const logs = await readMockRuntimeLogEntries(String(activeLogPath));
const prompt = logs.find(
(entry) =>
entry.kind === "prompt" && String(entry.sessionName ?? "") === "agent:codex:acp:space",
entry.kind === "prompt" && String(entry.sessionName ?? "") === handle.agentSessionId,
);
expect(prompt).toBeDefined();
const promptArgs = (prompt?.args as string[]) ?? [];
@@ -654,7 +659,7 @@ describe("AcpxRuntime", () => {
const logs = await readMockRuntimeLogEntries(logPath);
const cancel = logs.find((entry) => entry.kind === "cancel");
const close = logs.find((entry) => entry.kind === "close");
expect(cancel?.sessionName).toBe("agent:claude:acp:789");
expect(cancel?.sessionName).toBe(handle.agentSessionId);
expect(close?.sessionName).toBe("agent:claude:acp:789");
});
@@ -1014,4 +1019,176 @@ describe("AcpxRuntime", () => {
delete process.env.MOCK_ACPX_NEW_EMPTY;
}
});
it("stores agent session IDs returned after prompt-time load fallback and reuses them", async () => {
process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID = "1";
process.env.MOCK_ACPX_PROMPT_LOAD_INVALID = "1";
process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID = "gemini-session-123";
try {
const { runtime, logPath } = await createMockRuntimeFixture();
const sessionKey = "agent:gemini:acp:load-fallback";
const handle = await runtime.ensureSession({
sessionKey,
agent: "gemini",
mode: "persistent",
});
expect(handle.agentSessionId).toBeUndefined();
for await (const _event of runtime.runTurn({
handle,
text: "first turn",
mode: "prompt",
requestId: "req-gemini-1",
})) {
// Drain the prompt stream so the fallback session id can be captured.
}
expect(handle.agentSessionId).toBe("gemini-session-123");
expect(decodeAcpxRuntimeHandleState(handle.runtimeSessionName)?.agentSessionId).toBe(
"gemini-session-123",
);
const status = await runtime.getStatus({ handle });
expect(status.agentSessionId).toBe("gemini-session-123");
for await (const _event of runtime.runTurn({
handle,
text: "second turn",
mode: "prompt",
requestId: "req-gemini-2",
})) {
// The second turn should reuse the learned agent session id.
}
const promptEntries = (await readMockRuntimeLogEntries(logPath)).filter(
(entry) => entry.kind === "prompt",
);
expect(promptEntries).toHaveLength(2);
expect(promptEntries[0]?.sessionName).toBe(sessionKey);
expect(promptEntries[1]?.sessionName).toBe("gemini-session-123");
} finally {
delete process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID;
delete process.env.MOCK_ACPX_PROMPT_LOAD_INVALID;
delete process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID;
}
});
it("prefers decoded runtime session identifiers over stale handle fallbacks", async () => {
const { runtime, logPath } = await createMockRuntimeFixture();
const sessionKey = "agent:gemini:acp:stale-handle-fallback";
const handle = await runtime.ensureSession({
sessionKey,
agent: "gemini",
mode: "persistent",
});
const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName);
expect(decoded).not.toBeNull();
handle.runtimeSessionName = encodeAcpxRuntimeHandleState({
...decoded!,
backendSessionId: "sid-decoded-gemini-session",
agentSessionId: "decoded-gemini-session",
});
handle.backendSessionId = "sid-stale-gemini-session";
handle.agentSessionId = "stale-gemini-session";
await runtime.getStatus({ handle });
const statusEntries = (await readMockRuntimeLogEntries(logPath)).filter(
(entry) => entry.kind === "status",
);
expect(statusEntries.length).toBeGreaterThan(0);
expect(statusEntries.at(-1)?.sessionName).toBe("decoded-gemini-session");
});
it("refreshes encoded runtime session identifiers after status learns a newer agent id", async () => {
const { runtime, logPath } = await createMockRuntimeFixture();
const sessionKey = "agent:gemini:acp:status-refresh";
const handle = await runtime.ensureSession({
sessionKey,
agent: "gemini",
mode: "persistent",
});
const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName);
expect(decoded).not.toBeNull();
handle.runtimeSessionName = encodeAcpxRuntimeHandleState({
...decoded!,
agentSessionId: "decoded-gemini-session",
});
handle.agentSessionId = "fresh-gemini-session";
const statePath = process.env.MOCK_ACPX_STATE;
expect(statePath).toBeTruthy();
fs.writeFileSync(
String(statePath),
JSON.stringify({
byName: {
[sessionKey]: {
acpxRecordId: `rec-${sessionKey}`,
acpxSessionId: `sid-${sessionKey}`,
agentSessionId: "fresh-gemini-session",
},
},
byAgentSessionId: {
"decoded-gemini-session": sessionKey,
"fresh-gemini-session": sessionKey,
},
}),
"utf8",
);
const status = await runtime.getStatus({ handle });
expect(status.agentSessionId).toBe("fresh-gemini-session");
expect(handle.agentSessionId).toBe("fresh-gemini-session");
expect(decodeAcpxRuntimeHandleState(handle.runtimeSessionName)?.agentSessionId).toBe(
"fresh-gemini-session",
);
await runtime.cancel({ handle, reason: "test-refresh" });
const statusEntries = (await readMockRuntimeLogEntries(logPath)).filter(
(entry) => entry.kind === "status",
);
const cancelEntries = (await readMockRuntimeLogEntries(logPath)).filter(
(entry) => entry.kind === "cancel",
);
expect(statusEntries.at(-1)?.sessionName).toBe("decoded-gemini-session");
expect(cancelEntries.at(-1)?.sessionName).toBe("fresh-gemini-session");
});
it("does not promote session/update params.sessionId into the runtime handle", async () => {
process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID = "1";
process.env.MOCK_ACPX_PROMPT_OMIT_LOAD_RESULT = "1";
try {
const { runtime } = await createMockRuntimeFixture();
const handle = await runtime.ensureSession({
sessionKey: "agent:codex:acp:update-echo",
agent: "codex",
mode: "persistent",
});
expect(handle.agentSessionId).toBeUndefined();
for await (const _event of runtime.runTurn({
handle,
text: "session-update-echo",
mode: "prompt",
requestId: "req-session-update-echo",
})) {
// Drain the prompt stream so any stray identifier promotion would be applied.
}
expect(handle.agentSessionId).toBeUndefined();
expect(
decodeAcpxRuntimeHandleState(handle.runtimeSessionName)?.agentSessionId,
).toBeUndefined();
} finally {
delete process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID;
delete process.env.MOCK_ACPX_PROMPT_OMIT_LOAD_RESULT;
}
});
});

View File

@@ -52,6 +52,12 @@ const ACPX_CAPABILITIES: AcpRuntimeCapabilities = {
controls: ["session/set_mode", "session/set_config_option", "session/status"],
};
type AcpxSessionIdentifiers = {
acpxRecordId?: string;
backendSessionId?: string;
agentSessionId?: string;
};
type AcpxHealthCheckResult =
| {
ok: true;
@@ -166,6 +172,111 @@ function findSessionIdentifierEvent(events: AcpxJsonObject[]): AcpxJsonObject |
);
}
function resolveSessionIdentifiersFromEvent(
event: AcpxJsonObject | undefined,
): AcpxSessionIdentifiers {
if (!event) {
return {};
}
const acpxRecordId = asOptionalString(event.acpxRecordId);
const backendSessionId = asOptionalString(event.acpxSessionId);
const agentSessionId = asOptionalString(event.agentSessionId);
return {
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
}
function hasSessionIdentifiers(identifiers: AcpxSessionIdentifiers): boolean {
return Boolean(
identifiers.acpxRecordId || identifiers.backendSessionId || identifiers.agentSessionId,
);
}
function parsePromptProtocolEvent(line: string): AcpxJsonObject | null {
const trimmed = line.trim();
if (!trimmed) {
return null;
}
try {
const parsed = JSON.parse(trimmed) as unknown;
return isRecord(parsed) ? parsed : null;
} catch {
return null;
}
}
function extractPromptSessionIdentifiers(line: string): AcpxSessionIdentifiers {
const parsed = parsePromptProtocolEvent(line);
if (!parsed) {
return {};
}
const direct = resolveSessionIdentifiersFromEvent(parsed);
if (hasSessionIdentifiers(direct)) {
return direct;
}
if (isRecord(parsed.result)) {
// Prompt turns only emit result.sessionId from session/load and session/new responses today,
// so an unrestricted capture remains scoped to backend-issued session identifiers.
const agentSessionId = asOptionalString(parsed.result.sessionId);
if (agentSessionId) {
return { agentSessionId };
}
}
return {};
}
function mergeHandleStateWithIdentifiers(
state: AcpxHandleState,
identifiers: AcpxSessionIdentifiers,
): AcpxHandleState {
const acpxRecordId = identifiers.acpxRecordId ?? state.acpxRecordId;
const backendSessionId = identifiers.backendSessionId ?? state.backendSessionId;
const agentSessionId = identifiers.agentSessionId ?? state.agentSessionId;
return {
...state,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
}
function mergeHandleStateWithFallbackIdentifiers(
state: AcpxHandleState,
identifiers: AcpxSessionIdentifiers,
): AcpxHandleState {
const acpxRecordId = state.acpxRecordId ?? identifiers.acpxRecordId;
const backendSessionId = state.backendSessionId ?? identifiers.backendSessionId;
const agentSessionId = state.agentSessionId ?? identifiers.agentSessionId;
return {
...state,
...(acpxRecordId ? { acpxRecordId } : {}),
...(backendSessionId ? { backendSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
}
function writeHandleState(handle: AcpRuntimeHandle, state: AcpxHandleState): void {
handle.runtimeSessionName = encodeAcpxRuntimeHandleState(state);
if (state.acpxRecordId) {
handle.acpxRecordId = state.acpxRecordId;
}
if (state.backendSessionId) {
handle.backendSessionId = state.backendSessionId;
}
if (state.agentSessionId) {
handle.agentSessionId = state.agentSessionId;
}
}
function resolveInteractiveSessionReference(state: AcpxHandleState): string {
return state.agentSessionId ?? state.name;
}
export function encodeAcpxRuntimeHandleState(state: AcpxHandleState): string {
const payload = Buffer.from(JSON.stringify(state), "utf8").toString("base64url");
return `${ACPX_RUNTIME_HANDLE_PREFIX}${payload}`;
@@ -647,11 +758,10 @@ export class AcpxRuntime implements AcpRuntime {
);
}
const acpxRecordId = ensuredEvent ? asOptionalString(ensuredEvent.acpxRecordId) : undefined;
const agentSessionId = ensuredEvent ? asOptionalString(ensuredEvent.agentSessionId) : undefined;
const backendSessionId = ensuredEvent
? asOptionalString(ensuredEvent.acpxSessionId)
: undefined;
const identifiers = resolveSessionIdentifiersFromEvent(ensuredEvent);
const acpxRecordId = identifiers.acpxRecordId;
const agentSessionId = identifiers.agentSessionId;
const backendSessionId = identifiers.backendSessionId;
return {
sessionKey: input.sessionKey,
@@ -673,10 +783,10 @@ export class AcpxRuntime implements AcpRuntime {
}
async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
const state = this.resolveHandleState(input.handle);
let state = this.resolveHandleState(input.handle);
const args = await this.buildPromptArgs({
agent: state.agent,
sessionName: state.name,
sessionName: resolveInteractiveSessionReference(state),
cwd: state.cwd,
});
@@ -737,6 +847,11 @@ export class AcpxRuntime implements AcpRuntime {
const lines = createInterface({ input: child.stdout });
try {
for await (const line of lines) {
const promptIdentifiers = extractPromptSessionIdentifiers(line);
if (hasSessionIdentifiers(promptIdentifiers)) {
state = mergeHandleStateWithIdentifiers(state, promptIdentifiers);
writeHandleState(input.handle, state);
}
const parsed = parsePromptEventLine(line);
if (!parsed) {
continue;
@@ -813,7 +928,7 @@ export class AcpxRuntime implements AcpRuntime {
const args = await this.buildVerbArgs({
agent: state.agent,
cwd: state.cwd,
command: ["status", "--session", state.name],
command: ["status", "--session", resolveInteractiveSessionReference(state)],
});
const events = await this.runControlCommand({
args,
@@ -832,6 +947,14 @@ export class AcpxRuntime implements AcpRuntime {
const acpxRecordId = asOptionalString(detail.acpxRecordId);
const acpxSessionId = asOptionalString(detail.acpxSessionId);
const agentSessionId = asOptionalString(detail.agentSessionId);
const refreshedIdentifiers: AcpxSessionIdentifiers = {
...(acpxRecordId ? { acpxRecordId } : {}),
...(acpxSessionId ? { backendSessionId: acpxSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
};
if (hasSessionIdentifiers(refreshedIdentifiers)) {
writeHandleState(input.handle, mergeHandleStateWithIdentifiers(state, refreshedIdentifiers));
}
const pid = typeof detail.pid === "number" && Number.isFinite(detail.pid) ? detail.pid : null;
const summary = [
`status=${status}`,
@@ -859,7 +982,7 @@ export class AcpxRuntime implements AcpRuntime {
const args = await this.buildVerbArgs({
agent: state.agent,
cwd: state.cwd,
command: ["set-mode", mode, "--session", state.name],
command: ["set-mode", mode, "--session", resolveInteractiveSessionReference(state)],
});
await this.runControlCommand({
args,
@@ -882,7 +1005,7 @@ export class AcpxRuntime implements AcpRuntime {
const args = await this.buildVerbArgs({
agent: state.agent,
cwd: state.cwd,
command: ["set", key, value, "--session", state.name],
command: ["set", key, value, "--session", resolveInteractiveSessionReference(state)],
});
await this.runControlCommand({
args,
@@ -971,7 +1094,7 @@ export class AcpxRuntime implements AcpRuntime {
const args = await this.buildVerbArgs({
agent: state.agent,
cwd: state.cwd,
command: ["cancel", "--session", state.name],
command: ["cancel", "--session", resolveInteractiveSessionReference(state)],
});
await this.runControlCommand({
args,
@@ -999,7 +1122,13 @@ export class AcpxRuntime implements AcpRuntime {
private resolveHandleState(handle: AcpRuntimeHandle): AcpxHandleState {
const decoded = decodeAcpxRuntimeHandleState(handle.runtimeSessionName);
if (decoded) {
return decoded;
// Prefer the encoded runtime state over legacy handle fields so stale
// fallback metadata does not resurrect older session identifiers.
return mergeHandleStateWithFallbackIdentifiers(decoded, {
acpxRecordId: asOptionalString((handle as { acpxRecordId?: unknown }).acpxRecordId),
backendSessionId: asOptionalString(handle.backendSessionId),
agentSessionId: asOptionalString(handle.agentSessionId),
});
}
const legacyName = asTrimmedString(handle.runtimeSessionName);
@@ -1015,6 +1144,17 @@ export class AcpxRuntime implements AcpRuntime {
agent: deriveAgentFromSessionKey(handle.sessionKey, DEFAULT_AGENT_FALLBACK),
cwd: this.config.cwd,
mode: "persistent",
...(asOptionalString((handle as { acpxRecordId?: unknown }).acpxRecordId)
? {
acpxRecordId: asOptionalString((handle as { acpxRecordId?: unknown }).acpxRecordId),
}
: {}),
...(asOptionalString(handle.backendSessionId)
? { backendSessionId: asOptionalString(handle.backendSessionId) }
: {}),
...(asOptionalString(handle.agentSessionId)
? { agentSessionId: asOptionalString(handle.agentSessionId) }
: {}),
};
}

View File

@@ -19,10 +19,14 @@ let logFileSequence = 0;
const MOCK_CLI_SCRIPT = String.raw`#!/usr/bin/env node
const fs = require("node:fs");
const path = require("node:path");
(async () => {
const args = process.argv.slice(2);
const logPath = process.env.MOCK_ACPX_LOG;
const statePath =
process.env.MOCK_ACPX_STATE ||
path.join(path.dirname(logPath || process.cwd()), "mock-acpx-state.json");
const openclawShell = process.env.OPENCLAW_SHELL || "";
const writeLog = (entry) => {
if (!logPath) return;
@@ -41,6 +45,95 @@ const emitUpdate = (sessionId, update) =>
method: "session/update",
params: { sessionId, update },
});
const readState = () => {
try {
const raw = fs.readFileSync(statePath, "utf8");
const parsed = JSON.parse(raw);
if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
return {
byName:
parsed.byName && typeof parsed.byName === "object" && !Array.isArray(parsed.byName)
? parsed.byName
: {},
byAgentSessionId:
parsed.byAgentSessionId &&
typeof parsed.byAgentSessionId === "object" &&
!Array.isArray(parsed.byAgentSessionId)
? parsed.byAgentSessionId
: {},
};
}
} catch {}
return { byName: {}, byAgentSessionId: {} };
};
const writeState = (state) => {
fs.writeFileSync(statePath, JSON.stringify(state), "utf8");
};
const defaultAgentSessionIdForName = (name) => {
if (process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID === "1") {
return "";
}
const prefix = process.env.MOCK_ACPX_AGENT_SESSION_PREFIX || "inner-";
return prefix + name;
};
const cleanupAgentLookup = (state, name) => {
for (const [sessionId, mappedName] of Object.entries(state.byAgentSessionId)) {
if (mappedName === name) {
delete state.byAgentSessionId[sessionId];
}
}
};
const storeSessionByName = (name, overrides = {}) => {
const state = readState();
const existing = state.byName[name] && typeof state.byName[name] === "object" ? state.byName[name] : {};
const next = {
acpxRecordId: "rec-" + name,
acpxSessionId: "sid-" + name,
agentSessionId: defaultAgentSessionIdForName(name),
...existing,
...overrides,
};
if (!next.acpxRecordId) {
next.acpxRecordId = "rec-" + name;
}
if (!next.acpxSessionId) {
next.acpxSessionId = "sid-" + name;
}
cleanupAgentLookup(state, name);
state.byName[name] = next;
if (next.agentSessionId) {
state.byAgentSessionId[next.agentSessionId] = name;
}
writeState(state);
return { name, ...next };
};
const findSessionByReference = (reference) => {
if (!reference) {
return null;
}
const state = readState();
const byName = state.byName[reference];
if (byName && typeof byName === "object") {
return { name: reference, ...byName };
}
const mappedName = state.byAgentSessionId[reference];
if (mappedName) {
const mapped = state.byName[mappedName];
if (mapped && typeof mapped === "object") {
return { name: mappedName, ...mapped };
}
}
for (const [name, session] of Object.entries(state.byName)) {
if (!session || typeof session !== "object") {
continue;
}
if (session.acpxSessionId === reference) {
return { name, ...session };
}
}
return null;
};
const resolveSession = (reference) => findSessionByReference(reference) || storeSessionByName(reference);
if (args.includes("--version")) {
return emitTextAndExit("mock-acpx ${ACPX_PINNED_VERSION}\\n");
@@ -74,6 +167,7 @@ const readFlag = (flag) => {
const sessionFromOption = readFlag("--session");
const ensureName = readFlag("--name");
const resumeSessionId = readFlag("--resume-session");
const closeName =
command === "sessions" && args[commandIndex + 1] === "close"
? String(args[commandIndex + 2] || "")
@@ -88,6 +182,7 @@ if (command === "sessions" && args[commandIndex + 1] === "ensure") {
process.stderr.write(String(process.env.MOCK_ACPX_ENSURE_STDERR) + "\n");
}
if (process.env.MOCK_ACPX_ENSURE_EXIT_1 === "1") {
storeSessionByName(ensureName, resumeSessionId ? { agentSessionId: resumeSessionId } : {});
return emitJsonAndExit({
jsonrpc: "2.0",
id: null,
@@ -100,11 +195,12 @@ if (command === "sessions" && args[commandIndex + 1] === "ensure") {
if (process.env.MOCK_ACPX_ENSURE_EMPTY === "1") {
emitJson({ action: "session_ensured", name: ensureName });
} else {
const session = storeSessionByName(ensureName, resumeSessionId ? { agentSessionId: resumeSessionId } : {});
emitJson({
action: "session_ensured",
acpxRecordId: "rec-" + ensureName,
acpxSessionId: "sid-" + ensureName,
agentSessionId: "inner-" + ensureName,
acpxRecordId: session.acpxRecordId,
acpxSessionId: session.acpxSessionId,
...(session.agentSessionId ? { agentSessionId: session.agentSessionId } : {}),
name: ensureName,
created: true,
});
@@ -131,11 +227,12 @@ if (command === "sessions" && args[commandIndex + 1] === "new") {
if (process.env.MOCK_ACPX_NEW_EMPTY === "1") {
emitJson({ action: "session_created", name: ensureName });
} else {
const session = storeSessionByName(ensureName, resumeSessionId ? { agentSessionId: resumeSessionId } : {});
emitJson({
action: "session_created",
acpxRecordId: "rec-" + ensureName,
acpxSessionId: "sid-" + ensureName,
agentSessionId: "inner-" + ensureName,
acpxRecordId: session.acpxRecordId,
acpxSessionId: session.acpxSessionId,
...(session.agentSessionId ? { agentSessionId: session.agentSessionId } : {}),
name: ensureName,
created: true,
});
@@ -172,23 +269,26 @@ if (command === "config" && args[commandIndex + 1] === "show") {
}
if (command === "cancel") {
const session = findSessionByReference(sessionFromOption);
writeLog({ kind: "cancel", agent, args, sessionName: sessionFromOption });
return emitJsonAndExit({
acpxSessionId: "sid-" + sessionFromOption,
acpxSessionId: session ? session.acpxSessionId : "sid-" + sessionFromOption,
cancelled: true,
});
}
if (command === "set-mode") {
const session = findSessionByReference(sessionFromOption);
writeLog({ kind: "set-mode", agent, args, sessionName: sessionFromOption, mode: setModeValue });
return emitJsonAndExit({
action: "mode_set",
acpxSessionId: "sid-" + sessionFromOption,
acpxSessionId: session ? session.acpxSessionId : "sid-" + sessionFromOption,
mode: setModeValue,
});
}
if (command === "set") {
const session = findSessionByReference(sessionFromOption);
writeLog({
kind: "set",
agent,
@@ -199,7 +299,7 @@ if (command === "set") {
});
emitJson({
action: "config_set",
acpxSessionId: "sid-" + sessionFromOption,
acpxSessionId: session ? session.acpxSessionId : "sid-" + sessionFromOption,
key: setKey,
value: setValue,
});
@@ -208,6 +308,7 @@ if (command === "set") {
}
if (command === "status") {
const session = findSessionByReference(sessionFromOption);
writeLog({ kind: "status", agent, args, sessionName: sessionFromOption });
if (process.env.MOCK_ACPX_STATUS_SIGNAL) {
process.kill(process.pid, process.env.MOCK_ACPX_STATUS_SIGNAL);
@@ -216,9 +317,9 @@ if (command === "status") {
const summary = process.env.MOCK_ACPX_STATUS_SUMMARY || "";
const omitStatusIds = process.env.MOCK_ACPX_STATUS_NO_IDS === "1";
emitJson({
acpxRecordId: sessionFromOption && !omitStatusIds ? "rec-" + sessionFromOption : null,
acpxSessionId: sessionFromOption && !omitStatusIds ? "sid-" + sessionFromOption : null,
agentSessionId: sessionFromOption && !omitStatusIds ? "inner-" + sessionFromOption : null,
acpxRecordId: !omitStatusIds && session ? session.acpxRecordId : null,
acpxSessionId: !omitStatusIds && session ? session.acpxSessionId : null,
agentSessionId: !omitStatusIds && session ? session.agentSessionId || null : null,
status,
...(summary ? { summary } : {}),
pid: 4242,
@@ -229,17 +330,19 @@ if (command === "status") {
}
if (command === "sessions" && args[commandIndex + 1] === "close") {
const session = findSessionByReference(closeName) || storeSessionByName(closeName);
writeLog({ kind: "close", agent, args, sessionName: closeName });
return emitJsonAndExit({
action: "session_closed",
acpxRecordId: "rec-" + closeName,
acpxSessionId: "sid-" + closeName,
acpxRecordId: session.acpxRecordId,
acpxSessionId: session.acpxSessionId,
name: closeName,
});
}
if (command === "prompt") {
const stdinText = fs.readFileSync(0, "utf8");
const session = resolveSession(sessionFromOption);
writeLog({
kind: "prompt",
agent,
@@ -251,6 +354,7 @@ if (command === "prompt") {
githubToken: process.env.GITHUB_TOKEN || "",
});
const requestId = "req-1";
let activeSessionId = session.agentSessionId || sessionFromOption;
emitJson({
jsonrpc: "2.0",
@@ -262,21 +366,50 @@ if (command === "prompt") {
mcpServers: [],
},
});
emitJson({
jsonrpc: "2.0",
id: 0,
error: {
code: -32002,
message: "Resource not found",
},
});
const shouldRejectLoad =
process.env.MOCK_ACPX_PROMPT_LOAD_INVALID === "1" &&
(!session.agentSessionId || sessionFromOption !== session.agentSessionId);
if (shouldRejectLoad) {
const nextAgentSessionId =
process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID || "agent-fallback-" + session.name;
const refreshed = storeSessionByName(session.name, {
agentSessionId: nextAgentSessionId,
});
emitJson({
jsonrpc: "2.0",
id: 0,
error: {
code: -32002,
message: "Invalid session identifier",
},
});
emitJson({
jsonrpc: "2.0",
id: 0,
result: {
sessionId: nextAgentSessionId,
},
});
activeSessionId = refreshed.agentSessionId || nextAgentSessionId;
} else {
if (process.env.MOCK_ACPX_PROMPT_OMIT_LOAD_RESULT !== "1") {
emitJson({
jsonrpc: "2.0",
id: 0,
result: {
sessionId: activeSessionId,
},
});
}
}
emitJson({
jsonrpc: "2.0",
id: requestId,
method: "session/prompt",
params: {
sessionId: sessionFromOption,
sessionId: activeSessionId,
prompt: [
{
type: "text",
@@ -304,15 +437,15 @@ if (command === "prompt") {
}
if (stdinText.includes("split-spacing")) {
emitUpdate(sessionFromOption, {
emitUpdate(activeSessionId, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "alpha" },
});
emitUpdate(sessionFromOption, {
emitUpdate(activeSessionId, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " beta" },
});
emitUpdate(sessionFromOption, {
emitUpdate(activeSessionId, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " gamma" },
});
@@ -322,7 +455,7 @@ if (command === "prompt") {
}
if (stdinText.includes("double-done")) {
emitUpdate(sessionFromOption, {
emitUpdate(activeSessionId, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "ok" },
});
@@ -332,18 +465,18 @@ if (command === "prompt") {
return;
}
emitUpdate(sessionFromOption, {
emitUpdate(activeSessionId, {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "thinking" },
});
emitUpdate(sessionFromOption, {
emitUpdate(activeSessionId, {
sessionUpdate: "tool_call",
toolCallId: "tool-1",
title: "run-tests",
status: "in_progress",
kind: "command",
});
emitUpdate(sessionFromOption, {
emitUpdate(activeSessionId, {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "echo:" + stdinText.trim() },
});
@@ -376,7 +509,9 @@ export async function createMockRuntimeFixture(params?: {
const scriptPath = await ensureMockCliScriptPath();
const dir = path.dirname(scriptPath);
const logPath = path.join(dir, `calls-${logFileSequence++}.log`);
const statePath = path.join(dir, `state-${logFileSequence - 1}.json`);
process.env.MOCK_ACPX_LOG = logPath;
process.env.MOCK_ACPX_STATE = statePath;
const config: ResolvedAcpxPluginConfig = {
command: scriptPath,
@@ -435,11 +570,18 @@ export async function readMockRuntimeLogEntries(
export async function cleanupMockRuntimeFixtures(): Promise<void> {
delete process.env.MOCK_ACPX_LOG;
delete process.env.MOCK_ACPX_STATE;
delete process.env.MOCK_ACPX_CONFIG_SHOW_AGENTS;
delete process.env.MOCK_ACPX_ENSURE_ERROR_MESSAGE;
delete process.env.MOCK_ACPX_ENSURE_EXIT_1;
delete process.env.MOCK_ACPX_ENSURE_STDERR;
delete process.env.MOCK_ACPX_NEW_FAIL_ON_RESUME;
delete process.env.MOCK_ACPX_ENSURE_EMPTY;
delete process.env.MOCK_ACPX_ENSURE_NO_AGENT_SESSION_ID;
delete process.env.MOCK_ACPX_NEW_EMPTY;
delete process.env.MOCK_ACPX_AGENT_SESSION_PREFIX;
delete process.env.MOCK_ACPX_PROMPT_LOAD_INVALID;
delete process.env.MOCK_ACPX_PROMPT_NEW_AGENT_SESSION_ID;
delete process.env.MOCK_ACPX_STATUS_STATUS;
delete process.env.MOCK_ACPX_STATUS_NO_IDS;
delete process.env.MOCK_ACPX_STATUS_SUMMARY;

View File

@@ -1354,6 +1354,7 @@ export class AcpSessionManager {
const runtime = backend.runtime;
const previousMeta = params.meta;
const previousIdentity = resolveSessionIdentityFromMeta(previousMeta);
let identityForEnsure = previousIdentity;
const persistedResumeSessionId =
mode === "persistent" ? resolveRuntimeResumeSessionId(previousIdentity) : undefined;
const ensureSession = async (resumeSessionId?: string) =>
@@ -1385,6 +1386,19 @@ export class AcpSessionManager {
logVerbose(
`acp-manager: resume init failed for ${params.sessionKey}; retrying without persisted ACP session id: ${acpError.message}`,
);
if (identityForEnsure) {
const {
acpxSessionId: _staleAcpxSessionId,
agentSessionId: _staleAgentSessionId,
...retryIdentity
} = identityForEnsure;
// The persisted resume identifiers already failed, so do not merge them back into the
// fresh named-session handle returned by the retry path.
identityForEnsure = {
...retryIdentity,
state: "pending",
};
}
ensured = await ensureSession();
}
} else {
@@ -1399,13 +1413,13 @@ export class AcpSessionManager {
});
const nextIdentity =
mergeSessionIdentity({
current: previousIdentity,
current: identityForEnsure,
incoming: createIdentityFromEnsure({
handle: ensured,
now,
}),
now,
}) ?? previousIdentity;
}) ?? identityForEnsure;
const nextHandleIdentifiers = resolveRuntimeHandleIdentifiersFromIdentity(nextIdentity);
const nextHandle: AcpRuntimeHandle = {
...ensured,

View File

@@ -2,6 +2,7 @@ import type { OpenClawConfig } from "../../config/config.js";
import { logVerbose } from "../../globals.js";
import { withAcpRuntimeErrorBoundary } from "../runtime/errors.js";
import {
createIdentityFromHandleEvent,
createIdentityFromStatus,
identityEquals,
mergeSessionIdentity,
@@ -63,15 +64,25 @@ export async function reconcileManagerRuntimeSessionIdentifiers(params: {
const now = Date.now();
const currentIdentity = resolveSessionIdentityFromMeta(params.meta);
const nextIdentity =
const eventIdentity = createIdentityFromHandleEvent({
handle: params.handle,
now,
});
const identityAfterEvent =
mergeSessionIdentity({
current: currentIdentity,
incoming: eventIdentity,
now,
}) ?? currentIdentity;
const nextIdentity =
mergeSessionIdentity({
current: identityAfterEvent,
incoming: createIdentityFromStatus({
status: runtimeStatus,
now,
}),
now,
}) ?? currentIdentity;
}) ?? identityAfterEvent;
const handleIdentifiers = resolveRuntimeHandleIdentifiersFromIdentity(nextIdentity);
const handleChanged =
handleIdentifiers.backendSessionId !== params.handle.backendSessionId ||

View File

@@ -843,6 +843,51 @@ describe("AcpSessionManager", () => {
);
});
it("prefers the persisted agent session id when reopening an ACP runtime after restart", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
const sessionKey = "agent:gemini:acp:binding:discord:default:restart";
hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => {
const key = (paramsUnknown as { sessionKey?: string }).sessionKey ?? sessionKey;
return {
sessionKey: key,
storeSessionKey: key,
acp: {
...readySessionMeta(),
agent: "gemini",
runtimeSessionName: key,
identity: {
state: "resolved",
source: "status",
acpxSessionId: "acpx-sid-1",
agentSessionId: "gemini-sid-1",
lastUpdatedAt: Date.now(),
},
},
};
});
const manager = new AcpSessionManager();
await manager.runTurn({
cfg: baseCfg,
sessionKey,
text: "after restart",
mode: "prompt",
requestId: "r-binding-restart-gemini",
});
expect(runtimeState.ensureSession).toHaveBeenCalledWith(
expect.objectContaining({
sessionKey,
agent: "gemini",
resumeSessionId: "gemini-sid-1",
}),
);
});
it("does not resume persisted ACP identity for oneshot sessions after restart", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
@@ -890,7 +935,7 @@ describe("AcpSessionManager", () => {
expect(ensureInput?.resumeSessionId).toBeUndefined();
});
it("falls back to a fresh ensure when reopening a persisted ACP backend session id fails", async () => {
it("falls back to a fresh ensure without reusing stale agent session ids", async () => {
const runtimeState = createRuntime();
runtimeState.ensureSession.mockImplementation(async (inputUnknown: unknown) => {
const input = inputUnknown as {
@@ -929,6 +974,7 @@ describe("AcpSessionManager", () => {
state: "resolved",
source: "status",
acpxSessionId: "acpx-sid-stale",
agentSessionId: "agent-sid-stale",
lastUpdatedAt: Date.now(),
},
};
@@ -971,13 +1017,19 @@ describe("AcpSessionManager", () => {
expect(runtimeState.ensureSession.mock.calls[0]?.[0]).toMatchObject({
sessionKey,
agent: "codex",
resumeSessionId: "acpx-sid-stale",
resumeSessionId: "agent-sid-stale",
});
const retryInput = runtimeState.ensureSession.mock.calls[1]?.[0] as
| { resumeSessionId?: string }
| undefined;
expect(retryInput?.resumeSessionId).toBeUndefined();
const runTurnInput = runtimeState.runTurn.mock.calls[0]?.[0] as
| { handle?: { agentSessionId?: string; backendSessionId?: string } }
| undefined;
expect(runTurnInput?.handle?.backendSessionId).toBe("acpx-sid-fresh");
expect(runTurnInput?.handle?.agentSessionId).toBeUndefined();
expect(currentMeta.identity?.acpxSessionId).toBe("acpx-sid-fresh");
expect(currentMeta.identity?.agentSessionId).toBeUndefined();
});
it("enforces acp.maxConcurrentSessions when opening new runtime handles", async () => {
@@ -1812,6 +1864,83 @@ describe("AcpSessionManager", () => {
expect(currentMeta.identity?.agentSessionId).toBe("agent-session-1");
});
it("reconciles prompt-learned agent session IDs even when runtime status omits them", async () => {
const runtimeState = createRuntime();
runtimeState.ensureSession.mockResolvedValue({
sessionKey: "agent:gemini:acp:session-1",
backend: "acpx",
runtimeSessionName: "runtime-3",
backendSessionId: "acpx-stale",
});
runtimeState.runTurn.mockImplementation(async function* (inputUnknown: unknown) {
const input = inputUnknown as {
handle: {
agentSessionId?: string;
};
};
input.handle.agentSessionId = "gemini-session-1";
yield { type: "done" as const };
});
runtimeState.getStatus.mockResolvedValue({
summary: "status=alive",
details: { status: "alive" },
});
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({
id: "acpx",
runtime: runtimeState.runtime,
});
let currentMeta: SessionAcpMeta = {
...readySessionMeta(),
agent: "gemini",
identity: {
state: "pending",
source: "ensure",
acpxSessionId: "acpx-stale",
lastUpdatedAt: Date.now(),
},
};
const sessionKey = "agent:gemini:acp:session-1";
hoisted.readAcpSessionEntryMock.mockImplementation((paramsUnknown: unknown) => {
const key = (paramsUnknown as { sessionKey?: string }).sessionKey ?? sessionKey;
return {
sessionKey: key,
storeSessionKey: key,
acp: currentMeta,
};
});
hoisted.upsertAcpSessionMetaMock.mockImplementation(async (paramsUnknown: unknown) => {
const params = paramsUnknown as {
mutate: (
current: SessionAcpMeta | undefined,
entry: { acp?: SessionAcpMeta } | undefined,
) => SessionAcpMeta | null | undefined;
};
const next = params.mutate(currentMeta, { acp: currentMeta });
if (next) {
currentMeta = next;
}
return {
sessionId: "session-1",
updatedAt: Date.now(),
acp: currentMeta,
};
});
const manager = new AcpSessionManager();
await manager.runTurn({
cfg: baseCfg,
sessionKey,
text: "learn prompt session",
mode: "prompt",
requestId: "run-prompt-learned-agent-id",
});
expect(currentMeta.identity?.state).toBe("resolved");
expect(currentMeta.identity?.agentSessionId).toBe("gemini-session-1");
expect(currentMeta.identity?.acpxSessionId).toBe("acpx-stale");
});
it("skips startup identity reconciliation for already resolved sessions", async () => {
const runtimeState = createRuntime();
hoisted.requireAcpRuntimeBackendMock.mockReturnValue({

View File

@@ -77,7 +77,7 @@ export function resolveRuntimeResumeSessionId(
if (!identity) {
return undefined;
}
return normalizeText(identity.acpxSessionId) ?? normalizeText(identity.agentSessionId);
return normalizeText(identity.agentSessionId) ?? normalizeText(identity.acpxSessionId);
}
export function isSessionIdentityPending(identity: SessionAcpIdentity | undefined): boolean {
@@ -175,6 +175,26 @@ export function createIdentityFromEnsure(params: {
};
}
export function createIdentityFromHandleEvent(params: {
handle: AcpRuntimeHandle;
now: number;
}): SessionAcpIdentity | undefined {
const acpxRecordId = normalizeText((params.handle as { acpxRecordId?: unknown }).acpxRecordId);
const acpxSessionId = normalizeText(params.handle.backendSessionId);
const agentSessionId = normalizeText(params.handle.agentSessionId);
if (!acpxRecordId && !acpxSessionId && !agentSessionId) {
return undefined;
}
return {
state: agentSessionId ? "resolved" : "pending",
...(acpxRecordId ? { acpxRecordId } : {}),
...(acpxSessionId ? { acpxSessionId } : {}),
...(agentSessionId ? { agentSessionId } : {}),
source: "event",
lastUpdatedAt: params.now,
};
}
export function createIdentityFromStatus(params: {
status: AcpRuntimeStatus | undefined;
now: number;