From b1eedb2fc8a1036f0fa534bd0a90f3685a1b984e Mon Sep 17 00:00:00 2001 From: Alex Knight Date: Fri, 8 May 2026 13:54:08 +1000 Subject: [PATCH] Add ACP session load event ledger (#79093) * Add ACP session load event ledger * Record ACP prompts after send acceptance * Support ACP ledger replay by session key * Harden ACP ledger replay completeness * Harden ACP ledger review gaps * Fix ACP canonical session key handling --------- Co-authored-by: Alex Knight <15041791+amknight@users.noreply.github.com> --- CHANGELOG.md | 1 + docs/cli/acp.md | 8 +- src/acp/event-ledger.test.ts | 369 ++++++++++++++++++ src/acp/event-ledger.ts | 485 ++++++++++++++++++++++++ src/acp/server.ts | 6 +- src/acp/session.ts | 11 +- src/acp/translator.event-ledger.test.ts | 396 +++++++++++++++++++ src/acp/translator.stop-reason.test.ts | 57 +++ src/acp/translator.ts | 358 ++++++++++++++--- src/acp/types.ts | 1 + 10 files changed, 1643 insertions(+), 49 deletions(-) create mode 100644 src/acp/event-ledger.test.ts create mode 100644 src/acp/event-ledger.ts create mode 100644 src/acp/translator.event-ledger.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d4715c6f6a..182aa1f74ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai - ACPX/Codex: preserve trusted Codex project declarations when launching isolated Codex ACP sessions, avoiding interactive trust prompts in headless runs. Thanks @Stedyclaw. - ACPX/Codex: reap stale OpenClaw-owned ACPX/Codex ACP process trees on startup and after ACP session close, preventing orphaned harness processes from slowing the Gateway. Thanks @91wan. - ACP bridge: implement stable session list, resume, and close handlers so ACP clients can page Gateway sessions, rebind existing sessions without replay, and close bridge sessions cleanly. Thanks @amknight. +- ACP bridge: replay complete ledger-backed ACP sessions on load, including user prompts, tool updates, session metadata, and usage snapshots, while keeping older sessions on the existing transcript fallback. Thanks @amknight. - ACP sessions: allow parent agents to inspect and message their own spawned cross-agent ACP sessions without enabling broad agent-to-agent visibility. Thanks @barronlroth. - Talk/voice: unify realtime relay, transcription relay, managed-room handoff, Voice Call, Google Meet, VoiceClaw, and native clients around a shared Talk session controller and add the Gateway-managed `talk.session.*` RPC surface. - Diagnostics/Talk: export bounded Talk lifecycle/audio metrics and session recovery metrics through OpenTelemetry and Prometheus without exposing transcripts, audio payloads, room ids, turn ids, or session ids. diff --git a/docs/cli/acp.md b/docs/cli/acp.md index d336c44200d..09f1ad3006d 100644 --- a/docs/cli/acp.md +++ b/docs/cli/acp.md @@ -44,7 +44,7 @@ Quick rule: | `initialize`, `newSession`, `prompt`, `cancel` | Implemented | Core bridge flow over stdio to Gateway chat/send + abort. | | `listSessions`, slash commands | Implemented | Session list works against Gateway session state with bounded cursor pagination and `cwd` filtering where Gateway session rows carry workspace metadata; commands are advertised via `available_commands_update`. | | `resumeSession`, `closeSession` | Implemented | Resume rebinds an ACP session to an existing Gateway session without replaying history. Close cancels active bridge work, resolves pending prompts as cancelled, and releases bridge session state. | -| `loadSession` | Partial | Rebinds the ACP session to a Gateway session key and replays stored user/assistant text history. Tool/system history is not reconstructed yet. | +| `loadSession` | Partial | Rebinds the ACP session to a Gateway session key and replays ACP event-ledger history for bridge-created sessions. Older/no-ledger sessions fall back to stored user/assistant text. | | Prompt content (`text`, embedded `resource`, images) | Partial | Text/resources are flattened into chat input; images become Gateway attachments. | | Session modes | Partial | `session/set_mode` is supported and the bridge exposes initial Gateway-backed session controls for thought level, tool verbosity, reasoning, usage detail, and elevated actions. Broader ACP-native mode/config surfaces are still out of scope. | | Session info and usage updates | Partial | The bridge emits `session_info_update` and best-effort `usage_update` notifications from cached Gateway session snapshots. Usage is approximate and only sent when Gateway token totals are marked fresh. | @@ -56,9 +56,9 @@ Quick rule: ## Known Limitations -- `loadSession` replays stored user and assistant text history, but it does not - reconstruct historic tool calls, system notices, or richer ACP-native event - types. +- `loadSession` can replay complete ACP event-ledger history only for + bridge-created sessions. Older/no-ledger sessions still use transcript + fallback and do not reconstruct historic tool calls or system notices. - If multiple ACP clients share the same Gateway session key, event and cancel routing are best-effort rather than strictly isolated per client. Prefer the default isolated `acp:` sessions when you need clean editor-local diff --git a/src/acp/event-ledger.test.ts b/src/acp/event-ledger.test.ts new file mode 100644 index 00000000000..46091a6ea7e --- /dev/null +++ b/src/acp/event-ledger.test.ts @@ -0,0 +1,369 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { createFileAcpEventLedger, createInMemoryAcpEventLedger } from "./event-ledger.js"; + +describe("ACP event ledger", () => { + it("records complete in-memory session updates in sequence", async () => { + const ledger = createInMemoryAcpEventLedger({ now: () => 123 }); + await ledger.startSession({ + sessionId: "session-1", + sessionKey: "agent:main:work", + cwd: "/work", + complete: true, + }); + await ledger.recordUserPrompt({ + sessionId: "session-1", + sessionKey: "agent:main:work", + runId: "run-1", + prompt: [{ type: "text", text: "Question" }], + }); + await ledger.recordUpdate({ + sessionId: "session-1", + sessionKey: "agent:main:work", + runId: "run-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Answer" }, + }, + }); + + const replay = await ledger.readReplay({ + sessionId: "session-1", + sessionKey: "agent:main:work", + }); + + expect(replay.complete).toBe(true); + expect(replay.events.map((event) => event.seq)).toEqual([1, 2]); + expect(replay.events.map((event) => event.runId)).toEqual(["run-1", "run-1"]); + expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([ + "user_message_chunk", + "agent_message_chunk", + ]); + }); + + it("marks a session incomplete when event retention truncates history", async () => { + const ledger = createInMemoryAcpEventLedger({ maxEventsPerSession: 1 }); + await ledger.startSession({ + sessionId: "session-1", + sessionKey: "agent:main:work", + cwd: "/work", + complete: true, + }); + await ledger.recordUpdate({ + sessionId: "session-1", + sessionKey: "agent:main:work", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "First" }, + }, + }); + await ledger.recordUpdate({ + sessionId: "session-1", + sessionKey: "agent:main:work", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Second" }, + }, + }); + + await expect( + ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }), + ).resolves.toEqual({ complete: false, events: [] }); + }); + + it("persists file-backed replay state across ledger instances", async () => { + await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => { + const filePath = path.join(dir, "acp", "event-ledger.json"); + const first = createFileAcpEventLedger({ filePath, now: () => 1000 }); + await first.startSession({ + sessionId: "session-1", + sessionKey: "agent:main:work", + cwd: "/work", + complete: true, + }); + await first.recordUpdate({ + sessionId: "session-1", + sessionKey: "agent:main:work", + runId: "run-1", + update: { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "Thinking" }, + }, + }); + + const second = createFileAcpEventLedger({ filePath }); + const replay = await second.readReplay({ + sessionId: "session-1", + sessionKey: "agent:main:work", + }); + + expect(replay.complete).toBe(true); + expect(replay.events).toHaveLength(1); + expect(replay.events[0]?.update).toEqual({ + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "Thinking" }, + }); + await expect(fs.readFile(filePath, "utf8")).resolves.toContain('"version":1'); + }); + }); + + it("can replay a complete session by Gateway session key", async () => { + const ledger = createInMemoryAcpEventLedger({ now: () => 1000 }); + await ledger.startSession({ + sessionId: "acp-session-1", + sessionKey: "acp:gateway-session-1", + cwd: "/work", + complete: true, + }); + await ledger.recordUpdate({ + sessionId: "acp-session-1", + sessionKey: "acp:gateway-session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Answer" }, + }, + }); + + const replay = await ledger.readReplayBySessionKey({ + sessionKey: "acp:gateway-session-1", + }); + + expect(replay.complete).toBe(true); + expect(replay.sessionId).toBe("acp-session-1"); + expect(replay.sessionKey).toBe("acp:gateway-session-1"); + expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([ + "agent_message_chunk", + ]); + }); + + it("preserves prompt history when a provisional ACP key becomes a canonical Gateway key", async () => { + const ledger = createInMemoryAcpEventLedger({ now: () => 1000 }); + await ledger.startSession({ + sessionId: "acp-session-1", + sessionKey: "acp:gateway-session-1", + cwd: "/work", + complete: true, + }); + await ledger.recordUserPrompt({ + sessionId: "acp-session-1", + sessionKey: "acp:gateway-session-1", + runId: "run-1", + prompt: [{ type: "text", text: "Question" }], + }); + await ledger.recordUpdate({ + sessionId: "acp-session-1", + sessionKey: "agent:main:acp:gateway-session-1", + runId: "run-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Answer" }, + }, + }); + + const replay = await ledger.readReplayBySessionKey({ + sessionKey: "agent:main:acp:gateway-session-1", + }); + + expect(replay.complete).toBe(true); + expect(replay.sessionId).toBe("acp-session-1"); + expect(replay.sessionKey).toBe("agent:main:acp:gateway-session-1"); + expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([ + "user_message_chunk", + "agent_message_chunk", + ]); + }); + + it("can replay multi-block prompt history by ACP session id", async () => { + const ledger = createInMemoryAcpEventLedger({ now: () => 1000 }); + await ledger.startSession({ + sessionId: "acp-session-1", + sessionKey: "acp:gateway-session-1", + cwd: "/work", + complete: true, + }); + await ledger.recordUserPrompt({ + sessionId: "acp-session-1", + sessionKey: "acp:gateway-session-1", + runId: "run-1", + prompt: [ + { type: "text", text: "First" }, + { type: "text", text: "Second" }, + ], + }); + + const replay = await ledger.readReplayBySessionId({ sessionId: "acp-session-1" }); + + expect(replay.complete).toBe(true); + expect(replay.sessionKey).toBe("acp:gateway-session-1"); + expect( + replay.events.map((event) => + event.update.sessionUpdate === "user_message_chunk" ? event.update.content : undefined, + ), + ).toEqual([ + { type: "text", text: "First" }, + { type: "text", text: "Second" }, + ]); + }); + + it("evicts the oldest complete session when session retention is exceeded", async () => { + let now = 1000; + const ledger = createInMemoryAcpEventLedger({ maxSessions: 1, now: () => now++ }); + await ledger.startSession({ + sessionId: "old-session", + sessionKey: "acp:old-gateway-session", + cwd: "/work", + complete: true, + }); + await ledger.startSession({ + sessionId: "new-session", + sessionKey: "acp:new-gateway-session", + cwd: "/work", + complete: true, + }); + + await expect( + ledger.readReplay({ sessionId: "old-session", sessionKey: "acp:old-gateway-session" }), + ).resolves.toEqual({ complete: false, events: [] }); + const replay = await ledger.readReplayBySessionId({ sessionId: "new-session" }); + expect(replay.complete).toBe(true); + expect(replay.sessionKey).toBe("acp:new-gateway-session"); + }); + + it("resets stale events when a session is restarted with reset", async () => { + const ledger = createInMemoryAcpEventLedger(); + await ledger.startSession({ + sessionId: "session-1", + sessionKey: "acp:old-session", + cwd: "/work", + complete: true, + }); + await ledger.recordUpdate({ + sessionId: "session-1", + sessionKey: "acp:old-session", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Old answer" }, + }, + }); + await ledger.startSession({ + sessionId: "session-1", + sessionKey: "acp:new-session", + cwd: "/work", + complete: true, + reset: true, + }); + + await expect( + ledger.readReplay({ sessionId: "session-1", sessionKey: "acp:old-session" }), + ).resolves.toEqual({ complete: false, events: [] }); + await expect(ledger.readReplayBySessionId({ sessionId: "session-1" })).resolves.toMatchObject({ + complete: true, + sessionKey: "acp:new-session", + events: [], + }); + }); + + it("marks replay incomplete when serialized byte retention trims payloads", async () => { + const ledger = createInMemoryAcpEventLedger({ maxSerializedBytes: 900 }); + await ledger.startSession({ + sessionId: "session-1", + sessionKey: "agent:main:work", + cwd: "/work", + complete: true, + }); + await ledger.recordUpdate({ + sessionId: "session-1", + sessionKey: "agent:main:work", + update: { + sessionUpdate: "tool_call_update", + toolCallId: "tool-1", + status: "completed", + rawOutput: { content: "x".repeat(5_000) }, + }, + }); + + await expect( + ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }), + ).resolves.toEqual({ complete: false, events: [] }); + }); + + it("keeps the persisted ledger file under the serialized byte budget", async () => { + await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => { + const filePath = path.join(dir, "acp", "event-ledger.json"); + const ledger = createFileAcpEventLedger({ filePath, maxSerializedBytes: 1024 }); + await ledger.startSession({ + sessionId: "session-1", + sessionKey: "agent:main:work", + cwd: "/work", + complete: true, + }); + await ledger.recordUpdate({ + sessionId: "session-1", + sessionKey: "agent:main:work", + update: { + sessionUpdate: "tool_call_update", + toolCallId: "tool-1", + status: "completed", + rawOutput: { content: "x".repeat(5_000) }, + }, + }); + + const bytes = Buffer.byteLength(await fs.readFile(filePath, "utf8"), "utf8"); + expect(bytes).toBeLessThanOrEqual(1024); + await expect( + ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }), + ).resolves.toEqual({ complete: false, events: [] }); + }); + }); + + it("ignores corrupt ledger files instead of replaying unknown state", async () => { + await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => { + const filePath = path.join(dir, "event-ledger.json"); + await fs.writeFile(filePath, "{bad json", "utf8"); + const ledger = createFileAcpEventLedger({ filePath }); + + await expect( + ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }), + ).resolves.toEqual({ complete: false, events: [] }); + }); + }); + + it("reloads file-backed state under lock before writing", async () => { + await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => { + const filePath = path.join(dir, "acp", "event-ledger.json"); + const first = createFileAcpEventLedger({ filePath }); + const second = createFileAcpEventLedger({ filePath }); + + await first.startSession({ + sessionId: "session-1", + sessionKey: "acp:gateway-session-1", + cwd: "/work", + complete: true, + }); + await second.startSession({ + sessionId: "session-2", + sessionKey: "acp:gateway-session-2", + cwd: "/work", + complete: true, + }); + await first.recordUpdate({ + sessionId: "session-1", + sessionKey: "acp:gateway-session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Answer" }, + }, + }); + + const reader = createFileAcpEventLedger({ filePath }); + const replay = await reader.readReplay({ + sessionId: "session-2", + sessionKey: "acp:gateway-session-2", + }); + expect(replay.complete).toBe(true); + expect(replay.sessionKey).toBe("acp:gateway-session-2"); + }); + }); +}); diff --git a/src/acp/event-ledger.ts b/src/acp/event-ledger.ts new file mode 100644 index 00000000000..bc87a679ec6 --- /dev/null +++ b/src/acp/event-ledger.ts @@ -0,0 +1,485 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import type { ContentBlock, SessionUpdate } from "@agentclientprotocol/sdk"; +import { resolveStateDir } from "../config/paths.js"; +import { withFileLock } from "../infra/file-lock.js"; +import { readJsonFile, writeTextAtomic } from "../infra/json-files.js"; +import { isRecord } from "../utils.js"; + +const LEDGER_VERSION = 1; +const DEFAULT_MAX_SESSIONS = 200; +const DEFAULT_MAX_EVENTS_PER_SESSION = 5_000; +const DEFAULT_MAX_SERIALIZED_BYTES = 16 * 1024 * 1024; +const FILE_LEDGER_LOCK_OPTIONS = { + retries: { + retries: 8, + factor: 2, + minTimeout: 50, + maxTimeout: 5_000, + randomize: true, + }, + stale: 15_000, +} as const; + +export type AcpEventLedgerEntry = { + seq: number; + at: number; + sessionId: string; + sessionKey: string; + runId?: string; + update: SessionUpdate; +}; + +export type AcpEventLedgerReplay = { + complete: boolean; + sessionId?: string; + sessionKey?: string; + events: AcpEventLedgerEntry[]; +}; + +export type AcpEventLedger = { + startSession: (params: { + sessionId: string; + sessionKey: string; + cwd: string; + complete: boolean; + reset?: boolean; + }) => Promise; + recordUserPrompt: (params: { + sessionId: string; + sessionKey: string; + runId: string; + prompt: readonly ContentBlock[]; + }) => Promise; + recordUpdate: (params: { + sessionId: string; + sessionKey: string; + runId?: string; + update: SessionUpdate; + }) => Promise; + markIncomplete: (params: { sessionId: string; sessionKey: string }) => Promise; + readReplay: (params: { sessionId: string; sessionKey: string }) => Promise; + readReplayBySessionId: (params: { sessionId: string }) => Promise; + readReplayBySessionKey: (params: { sessionKey: string }) => Promise; +}; + +type LedgerSession = { + sessionId: string; + sessionKey: string; + cwd: string; + complete: boolean; + createdAt: number; + updatedAt: number; + nextSeq: number; + events: AcpEventLedgerEntry[]; +}; + +type LedgerStore = { + version: 1; + sessions: Record; +}; + +type LedgerOptions = { + maxSessions?: number; + maxEventsPerSession?: number; + maxSerializedBytes?: number; + now?: () => number; +}; + +type MutableLedgerState = { + store: LedgerStore; + maxSessions: number; + maxEventsPerSession: number; + maxSerializedBytes: number; + now: () => number; +}; + +function createEmptyStore(): LedgerStore { + return { + version: LEDGER_VERSION, + sessions: {}, + }; +} + +function normalizeLedgerOptions(options: LedgerOptions = {}) { + return { + maxSessions: Math.max(1, Math.floor(options.maxSessions ?? DEFAULT_MAX_SESSIONS)), + maxEventsPerSession: Math.max( + 1, + Math.floor(options.maxEventsPerSession ?? DEFAULT_MAX_EVENTS_PER_SESSION), + ), + maxSerializedBytes: Math.max( + 1_024, + Math.floor(options.maxSerializedBytes ?? DEFAULT_MAX_SERIALIZED_BYTES), + ), + now: options.now ?? Date.now, + }; +} + +function cloneJsonValue(value: T): T { + return structuredClone(value); +} + +function createUserPromptUpdates(prompt: readonly ContentBlock[]): SessionUpdate[] { + return prompt.map((content) => ({ + sessionUpdate: "user_message_chunk", + content: cloneJsonValue(content), + })); +} + +function serializeLedgerStore(store: LedgerStore): string { + return JSON.stringify(store); +} + +function getSerializedLedgerByteLength(store: LedgerStore): number { + return Buffer.byteLength(serializeLedgerStore(store), "utf8"); +} + +function normalizeEvent(raw: unknown): AcpEventLedgerEntry | undefined { + if (!isRecord(raw) || !isRecord(raw.update)) { + return undefined; + } + const seq = raw.seq; + const at = raw.at; + const sessionId = raw.sessionId; + const sessionKey = raw.sessionKey; + const runId = raw.runId; + const sessionUpdate = raw.update.sessionUpdate; + if ( + typeof seq !== "number" || + !Number.isInteger(seq) || + seq < 0 || + typeof at !== "number" || + !Number.isFinite(at) || + typeof sessionId !== "string" || + typeof sessionKey !== "string" || + typeof sessionUpdate !== "string" + ) { + return undefined; + } + return { + seq, + at, + sessionId, + sessionKey, + ...(typeof runId === "string" && runId ? { runId } : {}), + update: cloneJsonValue(raw.update) as SessionUpdate, + }; +} + +function normalizeSession(raw: unknown): LedgerSession | undefined { + if (!isRecord(raw)) { + return undefined; + } + const sessionId = raw.sessionId; + const sessionKey = raw.sessionKey; + const cwd = raw.cwd; + const createdAt = raw.createdAt; + const updatedAt = raw.updatedAt; + const nextSeq = raw.nextSeq; + if ( + typeof sessionId !== "string" || + typeof sessionKey !== "string" || + typeof cwd !== "string" || + typeof createdAt !== "number" || + !Number.isFinite(createdAt) || + typeof updatedAt !== "number" || + !Number.isFinite(updatedAt) || + typeof nextSeq !== "number" || + !Number.isInteger(nextSeq) || + nextSeq < 1 + ) { + return undefined; + } + const events = Array.isArray(raw.events) + ? raw.events.map(normalizeEvent).filter((event): event is AcpEventLedgerEntry => Boolean(event)) + : []; + return { + sessionId, + sessionKey, + cwd, + complete: raw.complete === true, + createdAt, + updatedAt, + nextSeq, + events, + }; +} + +function normalizeStore(raw: unknown): LedgerStore { + if (!isRecord(raw) || raw.version !== LEDGER_VERSION || !isRecord(raw.sessions)) { + return createEmptyStore(); + } + const sessions: Record = {}; + for (const [sessionId, value] of Object.entries(raw.sessions)) { + const session = normalizeSession(value); + if (!session || session.sessionId !== sessionId) { + continue; + } + sessions[sessionId] = session; + } + return { version: LEDGER_VERSION, sessions }; +} + +function getOrCreateSession( + state: MutableLedgerState, + params: { + sessionId: string; + sessionKey: string; + cwd: string; + complete: boolean; + reset?: boolean; + }, +): LedgerSession { + const now = state.now(); + const existing = state.store.sessions[params.sessionId]; + if (!params.reset && existing) { + existing.sessionKey = params.sessionKey; + if (params.cwd) { + existing.cwd = params.cwd; + } + existing.complete = existing.complete || params.complete; + existing.updatedAt = now; + return existing; + } + const session: LedgerSession = { + sessionId: params.sessionId, + sessionKey: params.sessionKey, + cwd: params.cwd, + complete: params.complete, + createdAt: now, + updatedAt: now, + nextSeq: 1, + events: [], + }; + state.store.sessions[params.sessionId] = session; + return session; +} + +function trimLedger(state: MutableLedgerState): void { + for (const session of Object.values(state.store.sessions)) { + if (session.events.length <= state.maxEventsPerSession) { + continue; + } + session.events = session.events.slice(-state.maxEventsPerSession); + session.complete = false; + } + + const sessions = Object.values(state.store.sessions); + if (sessions.length > state.maxSessions) { + for (const session of sessions + .toSorted((a, b) => b.updatedAt - a.updatedAt) + .slice(state.maxSessions)) { + delete state.store.sessions[session.sessionId]; + } + } + + let serializedBytes = getSerializedLedgerByteLength(state.store); + while (serializedBytes > state.maxSerializedBytes) { + const session = Object.values(state.store.sessions) + .filter((candidate) => candidate.events.length > 0) + .toSorted((a, b) => a.updatedAt - b.updatedAt)[0]; + if (!session) { + break; + } + session.events.shift(); + session.complete = false; + serializedBytes = getSerializedLedgerByteLength(state.store); + } + + while (serializedBytes > state.maxSerializedBytes) { + const session = Object.values(state.store.sessions).toSorted( + (a, b) => a.updatedAt - b.updatedAt, + )[0]; + if (!session) { + break; + } + delete state.store.sessions[session.sessionId]; + serializedBytes = getSerializedLedgerByteLength(state.store); + } +} + +function appendUpdate( + state: MutableLedgerState, + params: { + sessionId: string; + sessionKey: string; + runId?: string; + update: SessionUpdate; + }, +): void { + const session = getOrCreateSession(state, { + sessionId: params.sessionId, + sessionKey: params.sessionKey, + cwd: "", + complete: false, + }); + const now = state.now(); + session.updatedAt = now; + session.events.push({ + seq: session.nextSeq, + at: now, + sessionId: params.sessionId, + sessionKey: params.sessionKey, + ...(params.runId ? { runId: params.runId } : {}), + update: cloneJsonValue(params.update), + }); + session.nextSeq += 1; + trimLedger(state); +} + +function createLedgerApi(params: { + state: MutableLedgerState; + mutate: (fn: () => void) => Promise; + read: (fn: () => T) => Promise; +}): AcpEventLedger { + const buildReplay = (session: LedgerSession): AcpEventLedgerReplay => ({ + complete: true, + sessionId: session.sessionId, + sessionKey: session.sessionKey, + events: session.events.map((event) => cloneJsonValue(event)), + }); + + return { + async startSession(sessionParams) { + await params.mutate(() => { + getOrCreateSession(params.state, sessionParams); + trimLedger(params.state); + }); + }, + + async recordUserPrompt(promptParams) { + await params.mutate(() => { + for (const update of createUserPromptUpdates(promptParams.prompt)) { + appendUpdate(params.state, { + sessionId: promptParams.sessionId, + sessionKey: promptParams.sessionKey, + runId: promptParams.runId, + update, + }); + } + }); + }, + + async recordUpdate(updateParams) { + await params.mutate(() => { + appendUpdate(params.state, updateParams); + }); + }, + + async markIncomplete(markParams) { + await params.mutate(() => { + const session = params.state.store.sessions[markParams.sessionId]; + if (!session || session.sessionKey !== markParams.sessionKey) { + return; + } + session.complete = false; + session.updatedAt = params.state.now(); + }); + }, + + async readReplay(replayParams) { + return params.read(() => { + const session = params.state.store.sessions[replayParams.sessionId]; + if (!session || session.sessionKey !== replayParams.sessionKey || !session.complete) { + return { complete: false, events: [] }; + } + return buildReplay(session); + }); + }, + + async readReplayBySessionId(replayParams) { + return params.read(() => { + const session = params.state.store.sessions[replayParams.sessionId]; + if (!session || !session.complete) { + return { complete: false, events: [] }; + } + return buildReplay(session); + }); + }, + + async readReplayBySessionKey(replayParams) { + return params.read(() => { + const session = Object.values(params.state.store.sessions) + .filter( + (candidate) => candidate.sessionKey === replayParams.sessionKey && candidate.complete, + ) + .toSorted((a, b) => b.updatedAt - a.updatedAt)[0]; + if (!session) { + return { complete: false, events: [] }; + } + return buildReplay(session); + }); + }, + }; +} + +export function createInMemoryAcpEventLedger(options: LedgerOptions = {}): AcpEventLedger { + const normalized = normalizeLedgerOptions(options); + const state: MutableLedgerState = { + store: createEmptyStore(), + ...normalized, + }; + return createLedgerApi({ + state, + mutate: async (fn) => { + fn(); + }, + read: async (fn) => fn(), + }); +} + +export function resolveDefaultAcpEventLedgerPath(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveStateDir(env), "acp", "event-ledger.json"); +} + +export function createFileAcpEventLedger( + params: { filePath: string } & LedgerOptions, +): AcpEventLedger { + const normalized = normalizeLedgerOptions(params); + const state: MutableLedgerState = { + store: createEmptyStore(), + ...normalized, + }; + let operation = Promise.resolve(); + + const load = async () => { + state.store = normalizeStore(await readJsonFile(params.filePath)); + }; + const ensureParentDir = async () => { + await fs.mkdir(path.dirname(params.filePath), { recursive: true, mode: 0o700 }); + }; + + const enqueue = async (fn: () => Promise): Promise => { + const task = operation.then(fn, fn); + operation = task.then( + () => {}, + () => {}, + ); + return task; + }; + + return createLedgerApi({ + state, + mutate: async (fn) => + enqueue(async () => { + await ensureParentDir(); + await withFileLock(params.filePath, FILE_LEDGER_LOCK_OPTIONS, async () => { + await load(); + fn(); + await writeTextAtomic(params.filePath, serializeLedgerStore(state.store), { + mode: 0o600, + dirMode: 0o700, + }); + }); + }), + read: async (fn) => + enqueue(async () => { + await ensureParentDir(); + return await withFileLock(params.filePath, FILE_LEDGER_LOCK_OPTIONS, async () => { + await load(); + return fn(); + }); + }), + }); +} diff --git a/src/acp/server.ts b/src/acp/server.ts index 1d47545a256..f6759e080ad 100644 --- a/src/acp/server.ts +++ b/src/acp/server.ts @@ -10,6 +10,7 @@ import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/ import { isMainModule } from "../infra/is-main.js"; import { routeLogsToStderr } from "../logging/console.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; +import { createFileAcpEventLedger, resolveDefaultAcpEventLedgerPath } from "./event-ledger.js"; import { readSecretFromFile } from "./secret-file.js"; import { AcpGatewayAgent } from "./translator.js"; import { normalizeAcpProvenanceMode, type AcpServerOptions } from "./types.js"; @@ -121,9 +122,12 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise; const stream = ndJsonStream(input, output); + const eventLedger = createFileAcpEventLedger({ + filePath: resolveDefaultAcpEventLedgerPath(process.env), + }); const _connection = new AgentSideConnection((conn: AgentSideConnection) => { - agent = new AcpGatewayAgent(conn, gateway, opts); + agent = new AcpGatewayAgent(conn, gateway, { ...opts, eventLedger }); agent.start(); return agent; }, stream); diff --git a/src/acp/session.ts b/src/acp/session.ts index 53be711bd66..168e7bebe06 100644 --- a/src/acp/session.ts +++ b/src/acp/session.ts @@ -2,7 +2,12 @@ import { randomUUID } from "node:crypto"; import type { AcpSession } from "./types.js"; export type AcpSessionStore = { - createSession: (params: { sessionKey: string; cwd: string; sessionId?: string }) => AcpSession; + createSession: (params: { + sessionKey: string; + cwd: string; + sessionId?: string; + ledgerSessionId?: string; + }) => AcpSession; hasSession: (sessionId: string) => boolean; getSession: (sessionId: string) => AcpSession | undefined; getSessionByRunId: (runId: string) => AcpSession | undefined; @@ -84,6 +89,9 @@ export function createInMemorySessionStore(options: AcpSessionStoreOptions = {}) const existingSession = sessions.get(sessionId); if (existingSession) { existingSession.sessionKey = params.sessionKey; + if ("ledgerSessionId" in params) { + existingSession.ledgerSessionId = params.ledgerSessionId; + } existingSession.cwd = params.cwd; touchSession(existingSession, nowMs); return existingSession; @@ -97,6 +105,7 @@ export function createInMemorySessionStore(options: AcpSessionStoreOptions = {}) const session: AcpSession = { sessionId, sessionKey: params.sessionKey, + ...(params.ledgerSessionId ? { ledgerSessionId: params.ledgerSessionId } : {}), cwd: params.cwd, createdAt: nowMs, lastTouchedAt: nowMs, diff --git a/src/acp/translator.event-ledger.test.ts b/src/acp/translator.event-ledger.test.ts new file mode 100644 index 00000000000..1a163ec0dad --- /dev/null +++ b/src/acp/translator.event-ledger.test.ts @@ -0,0 +1,396 @@ +import type { + LoadSessionRequest, + NewSessionRequest, + PromptRequest, +} from "@agentclientprotocol/sdk"; +import { describe, expect, it, vi } from "vitest"; +import type { GatewayClient } from "../gateway/client.js"; +import type { EventFrame } from "../gateway/protocol/index.js"; +import { createInMemoryAcpEventLedger, type AcpEventLedger } from "./event-ledger.js"; +import { createInMemorySessionStore } from "./session.js"; +import { AcpGatewayAgent } from "./translator.js"; +import { createAcpConnection, createAcpGateway } from "./translator.test-helpers.js"; + +vi.mock("./commands.js", () => ({ + getAvailableCommands: () => [], +})); + +function createNewSessionRequest(cwd = "/tmp"): NewSessionRequest { + return { + cwd, + mcpServers: [], + _meta: {}, + } as unknown as NewSessionRequest; +} + +function createLoadSessionRequest(sessionId: string, cwd = "/tmp"): LoadSessionRequest { + return { + sessionId, + cwd, + mcpServers: [], + _meta: {}, + } as unknown as LoadSessionRequest; +} + +function createPromptRequest(sessionId: string, text: string): PromptRequest { + return { + sessionId, + prompt: [{ type: "text", text }], + _meta: {}, + } as unknown as PromptRequest; +} + +function createToolEvent(params: { + sessionKey: string; + runId: string; + phase: "start" | "result"; + toolCallId: string; +}): EventFrame { + return { + event: "agent", + payload: { + sessionKey: params.sessionKey, + runId: params.runId, + stream: "tool", + data: { + phase: params.phase, + toolCallId: params.toolCallId, + name: "read", + args: { path: "src/app.ts" }, + result: { content: [{ type: "text", text: "FILE:src/app.ts" }] }, + }, + }, + } as unknown as EventFrame; +} + +function createChatEvent(params: { + sessionKey: string; + runId: string; + state: "delta" | "final"; + text: string; +}): EventFrame { + return { + event: "chat", + payload: { + sessionKey: params.sessionKey, + runId: params.runId, + state: params.state, + message: { + content: [{ type: "text", text: params.text }], + }, + }, + } as unknown as EventFrame; +} + +describe("ACP translator event ledger replay", () => { + it("loads complete ledger-backed sessions without the lossy Gateway transcript fallback", async () => { + const eventLedger = createInMemoryAcpEventLedger(); + const firstSessionStore = createInMemorySessionStore(); + const firstConnection = createAcpConnection(); + const firstRequestMock = vi.fn(async (method: string) => { + if (method === "chat.send") { + return { ok: true }; + } + return { ok: true }; + }); + const firstRequest = firstRequestMock as GatewayClient["request"]; + const firstAgent = new AcpGatewayAgent(firstConnection, createAcpGateway(firstRequest), { + eventLedger, + sessionStore: firstSessionStore, + }); + + const created = await firstAgent.newSession(createNewSessionRequest()); + const firstSession = firstSessionStore.getSession(created.sessionId); + if (!firstSession) { + throw new Error("Expected new ACP session to be stored"); + } + firstConnection.__sessionUpdateMock.mockClear(); + + const promptPromise = firstAgent.prompt(createPromptRequest(created.sessionId, "Question")); + for (let attempt = 0; attempt < 10; attempt += 1) { + if (firstRequestMock.mock.calls.some((call) => call[0] === "chat.send")) { + break; + } + await new Promise((resolve) => { + setImmediate(resolve); + }); + } + const runId = firstSessionStore.getSession(created.sessionId)?.activeRunId; + if (!runId) { + throw new Error("Expected active ACP run"); + } + + await firstAgent.handleGatewayEvent( + createToolEvent({ + sessionKey: firstSession.sessionKey, + runId, + phase: "start", + toolCallId: "tool-1", + }), + ); + await firstAgent.handleGatewayEvent( + createToolEvent({ + sessionKey: firstSession.sessionKey, + runId, + phase: "result", + toolCallId: "tool-1", + }), + ); + await firstAgent.handleGatewayEvent( + createChatEvent({ + sessionKey: firstSession.sessionKey, + runId, + state: "delta", + text: "Answer", + }), + ); + await firstAgent.handleGatewayEvent( + createChatEvent({ + sessionKey: firstSession.sessionKey, + runId, + state: "final", + text: "Answer", + }), + ); + await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" }); + + const secondConnection = createAcpConnection(); + const secondRequestMock = vi.fn(async (method: string) => { + if (method === "sessions.get") { + throw new Error("ledger replay should not call sessions.get"); + } + return { ok: true }; + }); + const secondRequest = secondRequestMock as GatewayClient["request"]; + const secondAgent = new AcpGatewayAgent(secondConnection, createAcpGateway(secondRequest), { + eventLedger, + sessionStore: createInMemorySessionStore(), + }); + + await secondAgent.loadSession(createLoadSessionRequest(created.sessionId)); + + expect(secondRequestMock).not.toHaveBeenCalledWith("sessions.get", expect.anything()); + const replayedUpdates = secondConnection.__sessionUpdateMock.mock.calls.map( + (call) => call[0]?.update, + ); + const replayedUpdateTypes = replayedUpdates.map((update) => update?.sessionUpdate); + expect(replayedUpdateTypes).toEqual( + expect.arrayContaining([ + "session_info_update", + "available_commands_update", + "user_message_chunk", + "tool_call", + "tool_call_update", + "agent_message_chunk", + ]), + ); + expect(replayedUpdates).toContainEqual({ + sessionUpdate: "user_message_chunk", + content: { type: "text", text: "Question" }, + }); + expect(replayedUpdates).toContainEqual({ + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "Answer" }, + }); + expect(replayedUpdateTypes.indexOf("user_message_chunk")).toBeLessThan( + replayedUpdateTypes.indexOf("agent_message_chunk"), + ); + + const ledgerReplay = await eventLedger.readReplay({ + sessionId: created.sessionId, + sessionKey: firstSession.sessionKey, + }); + expect( + ledgerReplay.events.filter((event) => event.update.sessionUpdate === "user_message_chunk"), + ).toHaveLength(1); + + const listedSessionStore = createInMemorySessionStore(); + const listedConnection = createAcpConnection(); + const listedRequestMock = vi.fn(async (method: string) => { + if (method === "sessions.get") { + throw new Error("listed session ledger replay should not call sessions.get"); + } + return { ok: true }; + }); + const listedAgent = new AcpGatewayAgent( + listedConnection, + createAcpGateway(listedRequestMock as GatewayClient["request"]), + { + eventLedger, + sessionStore: listedSessionStore, + }, + ); + + await listedAgent.loadSession(createLoadSessionRequest(firstSession.sessionKey)); + + expect(listedRequestMock).not.toHaveBeenCalledWith("sessions.get", expect.anything()); + const listedReplayTypes = listedConnection.__sessionUpdateMock.mock.calls.map( + (call) => call[0]?.update?.sessionUpdate, + ); + expect(listedReplayTypes).toEqual( + expect.arrayContaining(["user_message_chunk", "tool_call", "agent_message_chunk"]), + ); + + const listedPrompt = listedAgent.prompt( + createPromptRequest(firstSession.sessionKey, "Follow-up"), + ); + for (let attempt = 0; attempt < 10; attempt += 1) { + if (listedRequestMock.mock.calls.some((call) => call[0] === "chat.send")) { + break; + } + await new Promise((resolve) => { + setImmediate(resolve); + }); + } + const listedRunId = listedSessionStore.getSession(firstSession.sessionKey)?.activeRunId; + if (!listedRunId) { + throw new Error("Expected listed ACP session to have an active run"); + } + await listedAgent.handleGatewayEvent( + createChatEvent({ + sessionKey: firstSession.sessionKey, + runId: listedRunId, + state: "final", + text: "Follow-up answer", + }), + ); + await expect(listedPrompt).resolves.toEqual({ stopReason: "end_turn" }); + + const canonicalReplay = await eventLedger.readReplay({ + sessionId: created.sessionId, + sessionKey: firstSession.sessionKey, + }); + expect( + canonicalReplay.events.filter((event) => event.update.sessionUpdate === "user_message_chunk"), + ).toHaveLength(2); + await expect( + eventLedger.readReplayBySessionId({ sessionId: firstSession.sessionKey }), + ).resolves.toMatchObject({ complete: false }); + + firstSessionStore.clearAllSessionsForTest(); + }); + + it("does not replay prompts that Gateway rejected before accepting the send", async () => { + const eventLedger = createInMemoryAcpEventLedger(); + const sessionStore = createInMemorySessionStore(); + const connection = createAcpConnection(); + const requestMock = vi.fn(async (method: string) => { + if (method === "chat.send") { + throw new Error("send failed before acceptance"); + } + return { ok: true }; + }); + const agent = new AcpGatewayAgent( + connection, + createAcpGateway(requestMock as GatewayClient["request"]), + { + eventLedger, + sessionStore, + }, + ); + + const created = await agent.newSession(createNewSessionRequest()); + const session = sessionStore.getSession(created.sessionId); + if (!session) { + throw new Error("Expected new ACP session to be stored"); + } + + await expect( + agent.prompt(createPromptRequest(created.sessionId, "Never accepted")), + ).rejects.toThrow("send failed before acceptance"); + + const replay = await eventLedger.readReplay({ + sessionId: created.sessionId, + sessionKey: session.sessionKey, + }); + expect(replay.events.map((event) => event.update.sessionUpdate)).not.toContain( + "user_message_chunk", + ); + + const loadConnection = createAcpConnection(); + const loadRequestMock = vi.fn(async (method: string) => { + if (method === "sessions.get") { + throw new Error("ledger replay should not call sessions.get"); + } + return { ok: true }; + }); + const loadAgent = new AcpGatewayAgent( + loadConnection, + createAcpGateway(loadRequestMock as GatewayClient["request"]), + { + eventLedger, + sessionStore: createInMemorySessionStore(), + }, + ); + + await loadAgent.loadSession(createLoadSessionRequest(created.sessionId)); + + const replayedUpdates = loadConnection.__sessionUpdateMock.mock.calls.map( + (call) => call[0]?.update?.sessionUpdate, + ); + expect(replayedUpdates).not.toContain("user_message_chunk"); + }); + + it("marks replay incomplete when an accepted prompt cannot be recorded", async () => { + const innerLedger = createInMemoryAcpEventLedger(); + let markIncompleteResolve: ((value: unknown) => void) | undefined; + const markIncompletePromise = new Promise((resolve) => { + markIncompleteResolve = resolve; + }); + const eventLedger: AcpEventLedger = { + ...innerLedger, + recordUserPrompt: async () => { + throw new Error("ledger write failed"); + }, + markIncomplete: async (params) => { + await innerLedger.markIncomplete(params); + markIncompleteResolve?.(params); + }, + }; + const sessionStore = createInMemorySessionStore(); + const connection = createAcpConnection(); + const requestMock = vi.fn(async (_method: string) => ({ ok: true })); + const agent = new AcpGatewayAgent( + connection, + createAcpGateway(requestMock as GatewayClient["request"]), + { + eventLedger, + sessionStore, + }, + ); + + const created = await agent.newSession(createNewSessionRequest()); + const session = sessionStore.getSession(created.sessionId); + if (!session) { + throw new Error("Expected new ACP session to be stored"); + } + + const prompt = agent.prompt(createPromptRequest(created.sessionId, "Question")); + for (let attempt = 0; attempt < 10; attempt += 1) { + if (requestMock.mock.calls.some((call) => call[0] === "chat.send")) { + break; + } + await new Promise((resolve) => { + setImmediate(resolve); + }); + } + await markIncompletePromise; + const runId = sessionStore.getSession(created.sessionId)?.activeRunId; + if (!runId) { + throw new Error("Expected active ACP run"); + } + await agent.handleGatewayEvent( + createChatEvent({ + sessionKey: session.sessionKey, + runId, + state: "final", + text: "Answer", + }), + ); + await expect(prompt).resolves.toEqual({ stopReason: "end_turn" }); + + await expect( + innerLedger.readReplay({ sessionId: created.sessionId, sessionKey: session.sessionKey }), + ).resolves.toEqual({ complete: false, events: [] }); + }); +}); diff --git a/src/acp/translator.stop-reason.test.ts b/src/acp/translator.stop-reason.test.ts index f110f23c06c..f1d97f9b7a7 100644 --- a/src/acp/translator.stop-reason.test.ts +++ b/src/acp/translator.stop-reason.test.ts @@ -59,6 +59,63 @@ describe("acp translator stop reason mapping", () => { await expect(promptPromise).resolves.toEqual({ stopReason: "cancelled" }); }); + it("reconciles provisional ACP session keys to canonical Gateway keys by run id", async () => { + const sentRunIds: string[] = []; + const request = vi.fn(async (method: string, params?: Record) => { + if (method === "chat.send") { + const runId = params?.idempotencyKey; + if (typeof runId === "string") { + sentRunIds.push(runId); + } + } + return {}; + }) as GatewayClient["request"]; + const { agent, sessionId, sessionStore } = createSessionAgentHarness(request, { + sessionKey: "acp:session-1", + }); + + const firstPrompt = promptAgent(agent, sessionId); + await vi.waitFor(() => { + expect(sentRunIds).toHaveLength(1); + }); + await agent.handleGatewayEvent( + createChatEvent({ + runId: sentRunIds[0], + sessionKey: "agent:main:acp:session-1", + seq: 1, + state: "final", + message: { + content: [{ type: "text", text: "first" }], + }, + }), + ); + + await expect(firstPrompt).resolves.toEqual({ stopReason: "end_turn" }); + expect(sessionStore.getSession(sessionId)?.sessionKey).toBe("agent:main:acp:session-1"); + + const secondPrompt = promptAgent(agent, sessionId, "again"); + await vi.waitFor(() => { + expect(sentRunIds).toHaveLength(2); + }); + expect(request).toHaveBeenLastCalledWith( + "chat.send", + expect.objectContaining({ + sessionKey: "agent:main:acp:session-1", + }), + { timeoutMs: null }, + ); + await agent.handleGatewayEvent( + createChatEvent({ + runId: sentRunIds[1], + sessionKey: "agent:main:acp:session-1", + seq: 2, + state: "final", + }), + ); + + await expect(secondPrompt).resolves.toEqual({ stopReason: "end_turn" }); + }); + it("keeps in-flight prompts pending across transient gateway disconnects", async () => { const { agent, promptPromise, runId } = await createPendingPromptHarness(); const settleSpy = observeSettlement(promptPromise); diff --git a/src/acp/translator.ts b/src/acp/translator.ts index b8dcbd4e795..27fcb73928f 100644 --- a/src/acp/translator.ts +++ b/src/acp/translator.ts @@ -24,6 +24,7 @@ import type { SessionConfigOption, SessionInfo, SessionModeState, + SessionUpdate, SetSessionConfigOptionRequest, SetSessionConfigOptionResponse, SetSessionModeRequest, @@ -42,6 +43,11 @@ import { } from "../infra/fixed-window-rate-limit.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; import { shortenHomePath } from "../utils.js"; +import { + createInMemoryAcpEventLedger, + type AcpEventLedger, + type AcpEventLedgerReplay, +} from "./event-ledger.js"; import { extractAttachmentsFromPrompt, extractToolCallContent, @@ -97,6 +103,7 @@ type DisconnectContext = { type PendingPrompt = { sessionId: string; sessionKey: string; + ledgerSessionId?: string; idempotencyKey: string; sendAccepted?: boolean; disconnectContext?: DisconnectContext; @@ -117,6 +124,7 @@ type PendingToolCall = { }; type AcpGatewayAgentOptions = AcpServerOptions & { + eventLedger?: AcpEventLedger; sessionStore?: AcpSessionStore; }; @@ -501,12 +509,26 @@ function buildSystemProvenanceReceipt(params: { ].join("\n"); } +function hasExplicitSessionRouting( + meta: ReturnType, + opts: AcpServerOptions, +): boolean { + return Boolean( + meta.sessionKey || meta.sessionLabel || opts.defaultSessionKey || opts.defaultSessionLabel, + ); +} + +function resolveLedgerSessionId(session: { sessionId: string; ledgerSessionId?: string }): string { + return session.ledgerSessionId ?? session.sessionId; +} + export class AcpGatewayAgent implements Agent { private connection: AgentSideConnection; private gateway: GatewayClient; private opts: AcpGatewayAgentOptions; private log: (msg: string) => void; private sessionStore: AcpSessionStore; + private eventLedger: AcpEventLedger; private sessionCreateRateLimiter: FixedWindowRateLimiter; private pendingPrompts = new Map(); private disconnectTimer: NodeJS.Timeout | null = null; @@ -531,6 +553,7 @@ export class AcpGatewayAgent implements Agent { this.opts = opts; this.log = opts.verbose ? (msg: string) => process.stderr.write(`[acp] ${msg}\n`) : () => {}; this.sessionStore = opts.sessionStore ?? defaultAcpSessionStore; + this.eventLedger = opts.eventLedger ?? createInMemoryAcpEventLedger(); this.sessionCreateRateLimiter = createFixedWindowRateLimiter({ maxRequests: Math.max( 1, @@ -625,12 +648,14 @@ export class AcpGatewayAgent implements Agent { sessionKey, cwd: params.cwd, }); + await this.startLedgerSession(session, { complete: true, reset: true }); this.log(`newSession: ${session.sessionId} -> ${session.sessionKey}`); const sessionSnapshot = await this.getSessionSnapshot(session.sessionKey); - await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, { + await this.sendSessionSnapshotUpdate(session, sessionSnapshot, { includeControls: false, + record: true, }); - await this.sendAvailableCommands(session.sessionId); + await this.sendAvailableCommands(session, { record: true }); const { configOptions, modes } = sessionSnapshot; return { sessionId: session.sessionId, @@ -646,29 +671,56 @@ export class AcpGatewayAgent implements Agent { } const meta = parseSessionMeta(params._meta); + const hasExplicitRouting = hasExplicitSessionRouting(meta, this.opts); + const exactLedgerReplay: AcpEventLedgerReplay = hasExplicitRouting + ? { complete: false, events: [] } + : await this.readLedgerReplayBySessionId(params.sessionId); + const listedLedgerReplay: AcpEventLedgerReplay = + !hasExplicitRouting && !exactLedgerReplay.complete + ? await this.readLedgerReplayBySessionKey(params.sessionId) + : { complete: false, events: [] }; + const routedLedgerReplay = exactLedgerReplay.complete ? exactLedgerReplay : listedLedgerReplay; const sessionKey = await this.resolveSessionKeyFromMeta({ meta, - fallbackKey: params.sessionId, + fallbackKey: routedLedgerReplay.sessionKey ?? params.sessionId, }); + const ledgerReplay = + exactLedgerReplay.complete && exactLedgerReplay.sessionKey === sessionKey + ? exactLedgerReplay + : listedLedgerReplay.complete && listedLedgerReplay.sessionKey === sessionKey + ? listedLedgerReplay + : await this.readLedgerReplay({ + sessionId: params.sessionId, + sessionKey, + }); const session = this.sessionStore.createSession({ sessionId: params.sessionId, sessionKey, + ...(ledgerReplay.sessionId ? { ledgerSessionId: ledgerReplay.sessionId } : {}), cwd: params.cwd, }); + await this.startLedgerSession(session, { complete: ledgerReplay.complete }); this.log(`loadSession: ${session.sessionId} -> ${session.sessionKey}`); const [sessionSnapshot, transcript] = await Promise.all([ this.getSessionSnapshot(session.sessionKey), - this.getSessionTranscript(session.sessionKey).catch((err) => { - this.log(`session transcript fallback for ${session.sessionKey}: ${String(err)}`); - return []; - }), + ledgerReplay.complete + ? Promise.resolve([]) + : this.getSessionTranscript(session.sessionKey).catch((err) => { + this.log(`session transcript fallback for ${session.sessionKey}: ${String(err)}`); + return []; + }), ]); - await this.replaySessionTranscript(session.sessionId, transcript); - await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, { + if (ledgerReplay.complete) { + await this.replayLedgerSession(session.sessionId, ledgerReplay); + } else { + await this.replaySessionTranscript(session.sessionId, transcript); + } + await this.sendSessionSnapshotUpdate(session, sessionSnapshot, { includeControls: false, + record: false, }); - await this.sendAvailableCommands(session.sessionId); + await this.sendAvailableCommands(session, { record: false }); const { configOptions, modes } = sessionSnapshot; return { configOptions, modes }; } @@ -754,11 +806,13 @@ export class AcpGatewayAgent implements Agent { sessionKey, cwd: params.cwd, }); + await this.startLedgerSession(session, { complete: false }); this.log(`resumeSession: ${session.sessionId} -> ${session.sessionKey}`); - await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, { + await this.sendSessionSnapshotUpdate(session, sessionSnapshot, { includeControls: false, + record: false, }); - await this.sendAvailableCommands(session.sessionId); + await this.sendAvailableCommands(session, { record: false }); const { configOptions, modes } = sessionSnapshot; return { configOptions, modes }; } @@ -795,8 +849,9 @@ export class AcpGatewayAgent implements Agent { const sessionSnapshot = await this.getSessionSnapshot(session.sessionKey, { thinkingLevel: params.modeId, }); - await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, { + await this.sendSessionSnapshotUpdate(session, sessionSnapshot, { includeControls: true, + record: true, }); } catch (err) { this.log(`setSessionMode error: ${String(err)}`); @@ -828,8 +883,9 @@ export class AcpGatewayAgent implements Agent { session.sessionKey, sessionPatch.overrides, ); - await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, { + await this.sendSessionSnapshotUpdate(session, sessionSnapshot, { includeControls: true, + record: true, }); return { configOptions: sessionSnapshot.configOptions, @@ -892,6 +948,7 @@ export class AcpGatewayAgent implements Agent { this.pendingPrompts.set(params.sessionId, { sessionId: params.sessionId, sessionKey: session.sessionKey, + ...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}), idempotencyKey: runId, disconnectContext: this.activeDisconnectContext ?? undefined, resolve, @@ -902,6 +959,12 @@ export class AcpGatewayAgent implements Agent { } const sendWithProvenanceFallback = async () => { + const markSendAccepted = () => { + const pending = this.getPendingPrompt(params.sessionId, runId); + if (pending) { + pending.sendAccepted = true; + } + }; try { await this.gateway.request( "chat.send", @@ -912,20 +975,16 @@ export class AcpGatewayAgent implements Agent { }, { timeoutMs: null }, ); - const pending = this.getPendingPrompt(params.sessionId, runId); - if (pending) { - pending.sendAccepted = true; - } + markSendAccepted(); + await this.recordUserPrompt(session, runId, params.prompt); } catch (err) { if ( (systemInputProvenance || systemProvenanceReceipt) && isAdminScopeProvenanceRejection(err) ) { await this.gateway.request("chat.send", requestParams, { timeoutMs: null }); - const pending = this.getPendingPrompt(params.sessionId, runId); - if (pending) { - pending.sendAccepted = true; - } + markSendAccepted(); + await this.recordUserPrompt(session, runId, params.prompt); return; } throw err; @@ -1018,8 +1077,12 @@ export class AcpGatewayAgent implements Agent { rawInput: args, locations, }); - await this.connection.sessionUpdate({ + await this.emitSessionUpdate({ sessionId: pending.sessionId, + sessionKey: pending.sessionKey, + ...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}), + runId: pending.idempotencyKey, + record: true, update: { sessionUpdate: "tool_call", toolCallId, @@ -1036,8 +1099,12 @@ export class AcpGatewayAgent implements Agent { if (phase === "update") { const toolState = pending.toolCalls?.get(toolCallId); const partialResult = data.partialResult; - await this.connection.sessionUpdate({ + await this.emitSessionUpdate({ sessionId: pending.sessionId, + sessionKey: pending.sessionKey, + ...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}), + runId: pending.idempotencyKey, + record: true, update: { sessionUpdate: "tool_call_update", toolCallId, @@ -1054,8 +1121,12 @@ export class AcpGatewayAgent implements Agent { const isError = Boolean(data.isError); const toolState = pending.toolCalls?.get(toolCallId); pending.toolCalls?.delete(toolCallId); - await this.connection.sessionUpdate({ + await this.emitSessionUpdate({ sessionId: pending.sessionId, + sessionKey: pending.sessionKey, + ...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}), + runId: pending.idempotencyKey, + record: true, update: { sessionUpdate: "tool_call_update", toolCallId, @@ -1135,8 +1206,12 @@ export class AcpGatewayAgent implements Agent { const newThought = fullThought.slice(sentThoughtSoFar); pending.sentThoughtLength = fullThought.length; pending.sentThought = fullThought; - await this.connection.sessionUpdate({ + await this.emitSessionUpdate({ sessionId, + sessionKey: pending.sessionKey, + ...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}), + runId: pending.idempotencyKey, + record: true, update: { sessionUpdate: "agent_thought_chunk", content: { type: "text", text: newThought }, @@ -1157,8 +1232,12 @@ export class AcpGatewayAgent implements Agent { const newText = fullText.slice(sentSoFar); pending.sentTextLength = fullText.length; pending.sentText = fullText; - await this.connection.sessionUpdate({ + await this.emitSessionUpdate({ sessionId, + sessionKey: pending.sessionKey, + ...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}), + runId: pending.idempotencyKey, + record: true, update: { sessionUpdate: "agent_message_chunk", content: { type: "text", text: newText }, @@ -1178,9 +1257,19 @@ export class AcpGatewayAgent implements Agent { } const sessionSnapshot = await this.getSessionSnapshot(pending.sessionKey); try { - await this.sendSessionSnapshotUpdate(sessionId, sessionSnapshot, { - includeControls: false, - }); + await this.sendSessionSnapshotUpdate( + { + sessionId, + sessionKey: pending.sessionKey, + ...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}), + }, + sessionSnapshot, + { + includeControls: false, + record: true, + runId: pending.idempotencyKey, + }, + ); } catch (err) { this.log(`session snapshot update failed for ${sessionId}: ${String(err)}`); } @@ -1197,9 +1286,30 @@ export class AcpGatewayAgent implements Agent { } return pending; } + if (runId) { + for (const pending of this.pendingPrompts.values()) { + if (pending.idempotencyKey !== runId) { + continue; + } + this.reconcilePendingSessionKey(pending, sessionKey); + return pending; + } + } return undefined; } + private reconcilePendingSessionKey(pending: PendingPrompt, sessionKey: string): void { + if (pending.sessionKey === sessionKey) { + return; + } + this.log(`session key reconciled: ${pending.sessionKey} -> ${sessionKey}`); + pending.sessionKey = sessionKey; + const session = this.sessionStore.getSession(pending.sessionId); + if (session?.activeRunId === pending.idempotencyKey) { + session.sessionKey = sessionKey; + } + } + private clearDisconnectTimer(): void { if (!this.disconnectTimer) { return; @@ -1351,9 +1461,142 @@ export class AcpGatewayAgent implements Agent { return true; } - private async sendAvailableCommands(sessionId: string): Promise { + private async startLedgerSession( + session: { sessionId: string; sessionKey: string; ledgerSessionId?: string; cwd: string }, + options: { complete: boolean; reset?: boolean }, + ): Promise { + try { + await this.eventLedger.startSession({ + sessionId: resolveLedgerSessionId(session), + sessionKey: session.sessionKey, + cwd: session.cwd, + complete: options.complete, + ...(options.reset ? { reset: true } : {}), + }); + } catch (err) { + this.log(`event ledger session start failed for ${session.sessionId}: ${String(err)}`); + } + } + + private async readLedgerReplay(params: { + sessionId: string; + sessionKey: string; + }): Promise { + try { + return await this.eventLedger.readReplay(params); + } catch (err) { + this.log(`event ledger replay fallback for ${params.sessionId}: ${String(err)}`); + return { complete: false, events: [] }; + } + } + + private async readLedgerReplayBySessionId(sessionId: string): Promise { + try { + return await this.eventLedger.readReplayBySessionId({ sessionId }); + } catch (err) { + this.log(`event ledger exact replay fallback for ${sessionId}: ${String(err)}`); + return { complete: false, events: [] }; + } + } + + private async readLedgerReplayBySessionKey(sessionKey: string): Promise { + try { + return await this.eventLedger.readReplayBySessionKey({ sessionKey }); + } catch (err) { + this.log(`event ledger session-key replay fallback for ${sessionKey}: ${String(err)}`); + return { complete: false, events: [] }; + } + } + + private async recordUserPrompt( + session: { sessionId: string; sessionKey: string; ledgerSessionId?: string }, + runId: string, + prompt: PromptRequest["prompt"], + ): Promise { + try { + await this.eventLedger.recordUserPrompt({ + sessionId: resolveLedgerSessionId(session), + sessionKey: session.sessionKey, + runId, + prompt, + }); + } catch (err) { + this.log(`event ledger prompt record failed for ${session.sessionId}: ${String(err)}`); + await this.markLedgerIncomplete(session); + } + } + + private async recordLedgerUpdate(params: { + sessionId: string; + sessionKey: string; + ledgerSessionId?: string; + runId?: string; + update: SessionUpdate; + }): Promise { + try { + await this.eventLedger.recordUpdate({ + sessionId: params.ledgerSessionId ?? params.sessionId, + sessionKey: params.sessionKey, + ...(params.runId ? { runId: params.runId } : {}), + update: params.update, + }); + } catch (err) { + this.log(`event ledger update record failed for ${params.sessionId}: ${String(err)}`); + await this.markLedgerIncomplete({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + ...(params.ledgerSessionId ? { ledgerSessionId: params.ledgerSessionId } : {}), + }); + } + } + + private async markLedgerIncomplete(session: { + sessionId: string; + sessionKey: string; + ledgerSessionId?: string; + }): Promise { + try { + await this.eventLedger.markIncomplete({ + sessionId: resolveLedgerSessionId(session), + sessionKey: session.sessionKey, + }); + } catch (err) { + this.log(`event ledger incomplete mark failed for ${session.sessionId}: ${String(err)}`); + } + } + + private async emitSessionUpdate(params: { + sessionId: string; + sessionKey?: string; + ledgerSessionId?: string; + runId?: string; + update: SessionUpdate; + record?: boolean; + }): Promise { await this.connection.sessionUpdate({ - sessionId, + sessionId: params.sessionId, + update: params.update, + }); + if (params.record && params.sessionKey) { + await this.recordLedgerUpdate({ + sessionId: params.sessionId, + sessionKey: params.sessionKey, + ...(params.ledgerSessionId ? { ledgerSessionId: params.ledgerSessionId } : {}), + ...(params.runId ? { runId: params.runId } : {}), + update: params.update, + }); + } + } + + private async sendAvailableCommands( + session: { sessionId: string; sessionKey: string; ledgerSessionId?: string }, + options: { record: boolean }, + ): Promise { + await this.emitSessionUpdate({ + sessionId: session.sessionId, + sessionKey: session.sessionKey, + ...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}), + record: options.record, update: { sessionUpdate: "available_commands_update", availableCommands: await getAvailableCommandsForAcp(), @@ -1552,7 +1795,7 @@ export class AcpGatewayAgent implements Agent { for (const message of transcript) { const replayChunks = extractReplayChunks(message); for (const chunk of replayChunks) { - await this.connection.sessionUpdate({ + await this.emitSessionUpdate({ sessionId, update: { sessionUpdate: chunk.sessionUpdate, @@ -1563,21 +1806,42 @@ export class AcpGatewayAgent implements Agent { } } - private async sendSessionSnapshotUpdate( + private async replayLedgerSession( sessionId: string, + ledgerReplay: AcpEventLedgerReplay, + ): Promise { + for (const event of ledgerReplay.events) { + await this.emitSessionUpdate({ + sessionId, + update: event.update, + record: false, + }); + } + } + + private async sendSessionSnapshotUpdate( + session: { sessionId: string; sessionKey: string; ledgerSessionId?: string }, sessionSnapshot: SessionSnapshot, - options: { includeControls: boolean }, + options: { includeControls: boolean; record: boolean; runId?: string }, ): Promise { if (options.includeControls) { - await this.connection.sessionUpdate({ - sessionId, + await this.emitSessionUpdate({ + sessionId: session.sessionId, + sessionKey: session.sessionKey, + ...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}), + runId: options.runId, + record: options.record, update: { sessionUpdate: "current_mode_update", currentModeId: sessionSnapshot.modes.currentModeId, }, }); - await this.connection.sessionUpdate({ - sessionId, + await this.emitSessionUpdate({ + sessionId: session.sessionId, + sessionKey: session.sessionKey, + ...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}), + runId: options.runId, + record: options.record, update: { sessionUpdate: "config_option_update", configOptions: sessionSnapshot.configOptions, @@ -1585,8 +1849,12 @@ export class AcpGatewayAgent implements Agent { }); } if (sessionSnapshot.metadata) { - await this.connection.sessionUpdate({ - sessionId, + await this.emitSessionUpdate({ + sessionId: session.sessionId, + sessionKey: session.sessionKey, + ...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}), + runId: options.runId, + record: options.record, update: { sessionUpdate: "session_info_update", ...sessionSnapshot.metadata, @@ -1594,8 +1862,12 @@ export class AcpGatewayAgent implements Agent { }); } if (sessionSnapshot.usage) { - await this.connection.sessionUpdate({ - sessionId, + await this.emitSessionUpdate({ + sessionId: session.sessionId, + sessionKey: session.sessionKey, + ...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}), + runId: options.runId, + record: options.record, update: { sessionUpdate: "usage_update", used: sessionSnapshot.usage.used, diff --git a/src/acp/types.ts b/src/acp/types.ts index e28d962b82a..126bf0e9eb1 100644 --- a/src/acp/types.ts +++ b/src/acp/types.ts @@ -21,6 +21,7 @@ export function normalizeAcpProvenanceMode( export type AcpSession = { sessionId: SessionId; sessionKey: string; + ledgerSessionId?: string; cwd: string; createdAt: number; lastTouchedAt: number;