Add ACP session load event ledger (#79093)

* Add ACP session load event ledger

* Record ACP prompts after send acceptance

* Support ACP ledger replay by session key

* Harden ACP ledger replay completeness

* Harden ACP ledger review gaps

* Fix ACP canonical session key handling

---------

Co-authored-by: Alex Knight <15041791+amknight@users.noreply.github.com>
This commit is contained in:
Alex Knight
2026-05-08 13:54:08 +10:00
committed by GitHub
parent 252456e2f6
commit b1eedb2fc8
10 changed files with 1643 additions and 49 deletions

View File

@@ -42,6 +42,7 @@ Docs: https://docs.openclaw.ai
- ACPX/Codex: preserve trusted Codex project declarations when launching isolated Codex ACP sessions, avoiding interactive trust prompts in headless runs. Thanks @Stedyclaw.
- ACPX/Codex: reap stale OpenClaw-owned ACPX/Codex ACP process trees on startup and after ACP session close, preventing orphaned harness processes from slowing the Gateway. Thanks @91wan.
- ACP bridge: implement stable session list, resume, and close handlers so ACP clients can page Gateway sessions, rebind existing sessions without replay, and close bridge sessions cleanly. Thanks @amknight.
- ACP bridge: replay complete ledger-backed ACP sessions on load, including user prompts, tool updates, session metadata, and usage snapshots, while keeping older sessions on the existing transcript fallback. Thanks @amknight.
- ACP sessions: allow parent agents to inspect and message their own spawned cross-agent ACP sessions without enabling broad agent-to-agent visibility. Thanks @barronlroth.
- Talk/voice: unify realtime relay, transcription relay, managed-room handoff, Voice Call, Google Meet, VoiceClaw, and native clients around a shared Talk session controller and add the Gateway-managed `talk.session.*` RPC surface.
- Diagnostics/Talk: export bounded Talk lifecycle/audio metrics and session recovery metrics through OpenTelemetry and Prometheus without exposing transcripts, audio payloads, room ids, turn ids, or session ids.

View File

@@ -44,7 +44,7 @@ Quick rule:
| `initialize`, `newSession`, `prompt`, `cancel` | Implemented | Core bridge flow over stdio to Gateway chat/send + abort. |
| `listSessions`, slash commands | Implemented | Session list works against Gateway session state with bounded cursor pagination and `cwd` filtering where Gateway session rows carry workspace metadata; commands are advertised via `available_commands_update`. |
| `resumeSession`, `closeSession` | Implemented | Resume rebinds an ACP session to an existing Gateway session without replaying history. Close cancels active bridge work, resolves pending prompts as cancelled, and releases bridge session state. |
| `loadSession` | Partial | Rebinds the ACP session to a Gateway session key and replays stored user/assistant text history. Tool/system history is not reconstructed yet. |
| `loadSession` | Partial | Rebinds the ACP session to a Gateway session key and replays ACP event-ledger history for bridge-created sessions. Older/no-ledger sessions fall back to stored user/assistant text. |
| Prompt content (`text`, embedded `resource`, images) | Partial | Text/resources are flattened into chat input; images become Gateway attachments. |
| Session modes | Partial | `session/set_mode` is supported and the bridge exposes initial Gateway-backed session controls for thought level, tool verbosity, reasoning, usage detail, and elevated actions. Broader ACP-native mode/config surfaces are still out of scope. |
| Session info and usage updates | Partial | The bridge emits `session_info_update` and best-effort `usage_update` notifications from cached Gateway session snapshots. Usage is approximate and only sent when Gateway token totals are marked fresh. |
@@ -56,9 +56,9 @@ Quick rule:
## Known Limitations
- `loadSession` replays stored user and assistant text history, but it does not
reconstruct historic tool calls, system notices, or richer ACP-native event
types.
- `loadSession` can replay complete ACP event-ledger history only for
bridge-created sessions. Older/no-ledger sessions still use transcript
fallback and do not reconstruct historic tool calls or system notices.
- If multiple ACP clients share the same Gateway session key, event and cancel
routing are best-effort rather than strictly isolated per client. Prefer the
default isolated `acp:<uuid>` sessions when you need clean editor-local

View File

@@ -0,0 +1,369 @@
import fs from "node:fs/promises";
import path from "node:path";
import { describe, expect, it } from "vitest";
import { withTempDir } from "../test-helpers/temp-dir.js";
import { createFileAcpEventLedger, createInMemoryAcpEventLedger } from "./event-ledger.js";
describe("ACP event ledger", () => {
it("records complete in-memory session updates in sequence", async () => {
const ledger = createInMemoryAcpEventLedger({ now: () => 123 });
await ledger.startSession({
sessionId: "session-1",
sessionKey: "agent:main:work",
cwd: "/work",
complete: true,
});
await ledger.recordUserPrompt({
sessionId: "session-1",
sessionKey: "agent:main:work",
runId: "run-1",
prompt: [{ type: "text", text: "Question" }],
});
await ledger.recordUpdate({
sessionId: "session-1",
sessionKey: "agent:main:work",
runId: "run-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Answer" },
},
});
const replay = await ledger.readReplay({
sessionId: "session-1",
sessionKey: "agent:main:work",
});
expect(replay.complete).toBe(true);
expect(replay.events.map((event) => event.seq)).toEqual([1, 2]);
expect(replay.events.map((event) => event.runId)).toEqual(["run-1", "run-1"]);
expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([
"user_message_chunk",
"agent_message_chunk",
]);
});
it("marks a session incomplete when event retention truncates history", async () => {
const ledger = createInMemoryAcpEventLedger({ maxEventsPerSession: 1 });
await ledger.startSession({
sessionId: "session-1",
sessionKey: "agent:main:work",
cwd: "/work",
complete: true,
});
await ledger.recordUpdate({
sessionId: "session-1",
sessionKey: "agent:main:work",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "First" },
},
});
await ledger.recordUpdate({
sessionId: "session-1",
sessionKey: "agent:main:work",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Second" },
},
});
await expect(
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
).resolves.toEqual({ complete: false, events: [] });
});
it("persists file-backed replay state across ledger instances", async () => {
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
const filePath = path.join(dir, "acp", "event-ledger.json");
const first = createFileAcpEventLedger({ filePath, now: () => 1000 });
await first.startSession({
sessionId: "session-1",
sessionKey: "agent:main:work",
cwd: "/work",
complete: true,
});
await first.recordUpdate({
sessionId: "session-1",
sessionKey: "agent:main:work",
runId: "run-1",
update: {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "Thinking" },
},
});
const second = createFileAcpEventLedger({ filePath });
const replay = await second.readReplay({
sessionId: "session-1",
sessionKey: "agent:main:work",
});
expect(replay.complete).toBe(true);
expect(replay.events).toHaveLength(1);
expect(replay.events[0]?.update).toEqual({
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: "Thinking" },
});
await expect(fs.readFile(filePath, "utf8")).resolves.toContain('"version":1');
});
});
it("can replay a complete session by Gateway session key", async () => {
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
await ledger.startSession({
sessionId: "acp-session-1",
sessionKey: "acp:gateway-session-1",
cwd: "/work",
complete: true,
});
await ledger.recordUpdate({
sessionId: "acp-session-1",
sessionKey: "acp:gateway-session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Answer" },
},
});
const replay = await ledger.readReplayBySessionKey({
sessionKey: "acp:gateway-session-1",
});
expect(replay.complete).toBe(true);
expect(replay.sessionId).toBe("acp-session-1");
expect(replay.sessionKey).toBe("acp:gateway-session-1");
expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([
"agent_message_chunk",
]);
});
it("preserves prompt history when a provisional ACP key becomes a canonical Gateway key", async () => {
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
await ledger.startSession({
sessionId: "acp-session-1",
sessionKey: "acp:gateway-session-1",
cwd: "/work",
complete: true,
});
await ledger.recordUserPrompt({
sessionId: "acp-session-1",
sessionKey: "acp:gateway-session-1",
runId: "run-1",
prompt: [{ type: "text", text: "Question" }],
});
await ledger.recordUpdate({
sessionId: "acp-session-1",
sessionKey: "agent:main:acp:gateway-session-1",
runId: "run-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Answer" },
},
});
const replay = await ledger.readReplayBySessionKey({
sessionKey: "agent:main:acp:gateway-session-1",
});
expect(replay.complete).toBe(true);
expect(replay.sessionId).toBe("acp-session-1");
expect(replay.sessionKey).toBe("agent:main:acp:gateway-session-1");
expect(replay.events.map((event) => event.update.sessionUpdate)).toEqual([
"user_message_chunk",
"agent_message_chunk",
]);
});
it("can replay multi-block prompt history by ACP session id", async () => {
const ledger = createInMemoryAcpEventLedger({ now: () => 1000 });
await ledger.startSession({
sessionId: "acp-session-1",
sessionKey: "acp:gateway-session-1",
cwd: "/work",
complete: true,
});
await ledger.recordUserPrompt({
sessionId: "acp-session-1",
sessionKey: "acp:gateway-session-1",
runId: "run-1",
prompt: [
{ type: "text", text: "First" },
{ type: "text", text: "Second" },
],
});
const replay = await ledger.readReplayBySessionId({ sessionId: "acp-session-1" });
expect(replay.complete).toBe(true);
expect(replay.sessionKey).toBe("acp:gateway-session-1");
expect(
replay.events.map((event) =>
event.update.sessionUpdate === "user_message_chunk" ? event.update.content : undefined,
),
).toEqual([
{ type: "text", text: "First" },
{ type: "text", text: "Second" },
]);
});
it("evicts the oldest complete session when session retention is exceeded", async () => {
let now = 1000;
const ledger = createInMemoryAcpEventLedger({ maxSessions: 1, now: () => now++ });
await ledger.startSession({
sessionId: "old-session",
sessionKey: "acp:old-gateway-session",
cwd: "/work",
complete: true,
});
await ledger.startSession({
sessionId: "new-session",
sessionKey: "acp:new-gateway-session",
cwd: "/work",
complete: true,
});
await expect(
ledger.readReplay({ sessionId: "old-session", sessionKey: "acp:old-gateway-session" }),
).resolves.toEqual({ complete: false, events: [] });
const replay = await ledger.readReplayBySessionId({ sessionId: "new-session" });
expect(replay.complete).toBe(true);
expect(replay.sessionKey).toBe("acp:new-gateway-session");
});
it("resets stale events when a session is restarted with reset", async () => {
const ledger = createInMemoryAcpEventLedger();
await ledger.startSession({
sessionId: "session-1",
sessionKey: "acp:old-session",
cwd: "/work",
complete: true,
});
await ledger.recordUpdate({
sessionId: "session-1",
sessionKey: "acp:old-session",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Old answer" },
},
});
await ledger.startSession({
sessionId: "session-1",
sessionKey: "acp:new-session",
cwd: "/work",
complete: true,
reset: true,
});
await expect(
ledger.readReplay({ sessionId: "session-1", sessionKey: "acp:old-session" }),
).resolves.toEqual({ complete: false, events: [] });
await expect(ledger.readReplayBySessionId({ sessionId: "session-1" })).resolves.toMatchObject({
complete: true,
sessionKey: "acp:new-session",
events: [],
});
});
it("marks replay incomplete when serialized byte retention trims payloads", async () => {
const ledger = createInMemoryAcpEventLedger({ maxSerializedBytes: 900 });
await ledger.startSession({
sessionId: "session-1",
sessionKey: "agent:main:work",
cwd: "/work",
complete: true,
});
await ledger.recordUpdate({
sessionId: "session-1",
sessionKey: "agent:main:work",
update: {
sessionUpdate: "tool_call_update",
toolCallId: "tool-1",
status: "completed",
rawOutput: { content: "x".repeat(5_000) },
},
});
await expect(
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
).resolves.toEqual({ complete: false, events: [] });
});
it("keeps the persisted ledger file under the serialized byte budget", async () => {
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
const filePath = path.join(dir, "acp", "event-ledger.json");
const ledger = createFileAcpEventLedger({ filePath, maxSerializedBytes: 1024 });
await ledger.startSession({
sessionId: "session-1",
sessionKey: "agent:main:work",
cwd: "/work",
complete: true,
});
await ledger.recordUpdate({
sessionId: "session-1",
sessionKey: "agent:main:work",
update: {
sessionUpdate: "tool_call_update",
toolCallId: "tool-1",
status: "completed",
rawOutput: { content: "x".repeat(5_000) },
},
});
const bytes = Buffer.byteLength(await fs.readFile(filePath, "utf8"), "utf8");
expect(bytes).toBeLessThanOrEqual(1024);
await expect(
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
).resolves.toEqual({ complete: false, events: [] });
});
});
it("ignores corrupt ledger files instead of replaying unknown state", async () => {
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
const filePath = path.join(dir, "event-ledger.json");
await fs.writeFile(filePath, "{bad json", "utf8");
const ledger = createFileAcpEventLedger({ filePath });
await expect(
ledger.readReplay({ sessionId: "session-1", sessionKey: "agent:main:work" }),
).resolves.toEqual({ complete: false, events: [] });
});
});
it("reloads file-backed state under lock before writing", async () => {
await withTempDir({ prefix: "openclaw-acp-ledger-" }, async (dir) => {
const filePath = path.join(dir, "acp", "event-ledger.json");
const first = createFileAcpEventLedger({ filePath });
const second = createFileAcpEventLedger({ filePath });
await first.startSession({
sessionId: "session-1",
sessionKey: "acp:gateway-session-1",
cwd: "/work",
complete: true,
});
await second.startSession({
sessionId: "session-2",
sessionKey: "acp:gateway-session-2",
cwd: "/work",
complete: true,
});
await first.recordUpdate({
sessionId: "session-1",
sessionKey: "acp:gateway-session-1",
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Answer" },
},
});
const reader = createFileAcpEventLedger({ filePath });
const replay = await reader.readReplay({
sessionId: "session-2",
sessionKey: "acp:gateway-session-2",
});
expect(replay.complete).toBe(true);
expect(replay.sessionKey).toBe("acp:gateway-session-2");
});
});
});

485
src/acp/event-ledger.ts Normal file
View File

@@ -0,0 +1,485 @@
import fs from "node:fs/promises";
import path from "node:path";
import type { ContentBlock, SessionUpdate } from "@agentclientprotocol/sdk";
import { resolveStateDir } from "../config/paths.js";
import { withFileLock } from "../infra/file-lock.js";
import { readJsonFile, writeTextAtomic } from "../infra/json-files.js";
import { isRecord } from "../utils.js";
const LEDGER_VERSION = 1;
const DEFAULT_MAX_SESSIONS = 200;
const DEFAULT_MAX_EVENTS_PER_SESSION = 5_000;
const DEFAULT_MAX_SERIALIZED_BYTES = 16 * 1024 * 1024;
const FILE_LEDGER_LOCK_OPTIONS = {
retries: {
retries: 8,
factor: 2,
minTimeout: 50,
maxTimeout: 5_000,
randomize: true,
},
stale: 15_000,
} as const;
export type AcpEventLedgerEntry = {
seq: number;
at: number;
sessionId: string;
sessionKey: string;
runId?: string;
update: SessionUpdate;
};
export type AcpEventLedgerReplay = {
complete: boolean;
sessionId?: string;
sessionKey?: string;
events: AcpEventLedgerEntry[];
};
export type AcpEventLedger = {
startSession: (params: {
sessionId: string;
sessionKey: string;
cwd: string;
complete: boolean;
reset?: boolean;
}) => Promise<void>;
recordUserPrompt: (params: {
sessionId: string;
sessionKey: string;
runId: string;
prompt: readonly ContentBlock[];
}) => Promise<void>;
recordUpdate: (params: {
sessionId: string;
sessionKey: string;
runId?: string;
update: SessionUpdate;
}) => Promise<void>;
markIncomplete: (params: { sessionId: string; sessionKey: string }) => Promise<void>;
readReplay: (params: { sessionId: string; sessionKey: string }) => Promise<AcpEventLedgerReplay>;
readReplayBySessionId: (params: { sessionId: string }) => Promise<AcpEventLedgerReplay>;
readReplayBySessionKey: (params: { sessionKey: string }) => Promise<AcpEventLedgerReplay>;
};
type LedgerSession = {
sessionId: string;
sessionKey: string;
cwd: string;
complete: boolean;
createdAt: number;
updatedAt: number;
nextSeq: number;
events: AcpEventLedgerEntry[];
};
type LedgerStore = {
version: 1;
sessions: Record<string, LedgerSession>;
};
type LedgerOptions = {
maxSessions?: number;
maxEventsPerSession?: number;
maxSerializedBytes?: number;
now?: () => number;
};
type MutableLedgerState = {
store: LedgerStore;
maxSessions: number;
maxEventsPerSession: number;
maxSerializedBytes: number;
now: () => number;
};
function createEmptyStore(): LedgerStore {
return {
version: LEDGER_VERSION,
sessions: {},
};
}
function normalizeLedgerOptions(options: LedgerOptions = {}) {
return {
maxSessions: Math.max(1, Math.floor(options.maxSessions ?? DEFAULT_MAX_SESSIONS)),
maxEventsPerSession: Math.max(
1,
Math.floor(options.maxEventsPerSession ?? DEFAULT_MAX_EVENTS_PER_SESSION),
),
maxSerializedBytes: Math.max(
1_024,
Math.floor(options.maxSerializedBytes ?? DEFAULT_MAX_SERIALIZED_BYTES),
),
now: options.now ?? Date.now,
};
}
function cloneJsonValue<T>(value: T): T {
return structuredClone(value);
}
function createUserPromptUpdates(prompt: readonly ContentBlock[]): SessionUpdate[] {
return prompt.map((content) => ({
sessionUpdate: "user_message_chunk",
content: cloneJsonValue(content),
}));
}
function serializeLedgerStore(store: LedgerStore): string {
return JSON.stringify(store);
}
function getSerializedLedgerByteLength(store: LedgerStore): number {
return Buffer.byteLength(serializeLedgerStore(store), "utf8");
}
function normalizeEvent(raw: unknown): AcpEventLedgerEntry | undefined {
if (!isRecord(raw) || !isRecord(raw.update)) {
return undefined;
}
const seq = raw.seq;
const at = raw.at;
const sessionId = raw.sessionId;
const sessionKey = raw.sessionKey;
const runId = raw.runId;
const sessionUpdate = raw.update.sessionUpdate;
if (
typeof seq !== "number" ||
!Number.isInteger(seq) ||
seq < 0 ||
typeof at !== "number" ||
!Number.isFinite(at) ||
typeof sessionId !== "string" ||
typeof sessionKey !== "string" ||
typeof sessionUpdate !== "string"
) {
return undefined;
}
return {
seq,
at,
sessionId,
sessionKey,
...(typeof runId === "string" && runId ? { runId } : {}),
update: cloneJsonValue(raw.update) as SessionUpdate,
};
}
function normalizeSession(raw: unknown): LedgerSession | undefined {
if (!isRecord(raw)) {
return undefined;
}
const sessionId = raw.sessionId;
const sessionKey = raw.sessionKey;
const cwd = raw.cwd;
const createdAt = raw.createdAt;
const updatedAt = raw.updatedAt;
const nextSeq = raw.nextSeq;
if (
typeof sessionId !== "string" ||
typeof sessionKey !== "string" ||
typeof cwd !== "string" ||
typeof createdAt !== "number" ||
!Number.isFinite(createdAt) ||
typeof updatedAt !== "number" ||
!Number.isFinite(updatedAt) ||
typeof nextSeq !== "number" ||
!Number.isInteger(nextSeq) ||
nextSeq < 1
) {
return undefined;
}
const events = Array.isArray(raw.events)
? raw.events.map(normalizeEvent).filter((event): event is AcpEventLedgerEntry => Boolean(event))
: [];
return {
sessionId,
sessionKey,
cwd,
complete: raw.complete === true,
createdAt,
updatedAt,
nextSeq,
events,
};
}
function normalizeStore(raw: unknown): LedgerStore {
if (!isRecord(raw) || raw.version !== LEDGER_VERSION || !isRecord(raw.sessions)) {
return createEmptyStore();
}
const sessions: Record<string, LedgerSession> = {};
for (const [sessionId, value] of Object.entries(raw.sessions)) {
const session = normalizeSession(value);
if (!session || session.sessionId !== sessionId) {
continue;
}
sessions[sessionId] = session;
}
return { version: LEDGER_VERSION, sessions };
}
function getOrCreateSession(
state: MutableLedgerState,
params: {
sessionId: string;
sessionKey: string;
cwd: string;
complete: boolean;
reset?: boolean;
},
): LedgerSession {
const now = state.now();
const existing = state.store.sessions[params.sessionId];
if (!params.reset && existing) {
existing.sessionKey = params.sessionKey;
if (params.cwd) {
existing.cwd = params.cwd;
}
existing.complete = existing.complete || params.complete;
existing.updatedAt = now;
return existing;
}
const session: LedgerSession = {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
cwd: params.cwd,
complete: params.complete,
createdAt: now,
updatedAt: now,
nextSeq: 1,
events: [],
};
state.store.sessions[params.sessionId] = session;
return session;
}
function trimLedger(state: MutableLedgerState): void {
for (const session of Object.values(state.store.sessions)) {
if (session.events.length <= state.maxEventsPerSession) {
continue;
}
session.events = session.events.slice(-state.maxEventsPerSession);
session.complete = false;
}
const sessions = Object.values(state.store.sessions);
if (sessions.length > state.maxSessions) {
for (const session of sessions
.toSorted((a, b) => b.updatedAt - a.updatedAt)
.slice(state.maxSessions)) {
delete state.store.sessions[session.sessionId];
}
}
let serializedBytes = getSerializedLedgerByteLength(state.store);
while (serializedBytes > state.maxSerializedBytes) {
const session = Object.values(state.store.sessions)
.filter((candidate) => candidate.events.length > 0)
.toSorted((a, b) => a.updatedAt - b.updatedAt)[0];
if (!session) {
break;
}
session.events.shift();
session.complete = false;
serializedBytes = getSerializedLedgerByteLength(state.store);
}
while (serializedBytes > state.maxSerializedBytes) {
const session = Object.values(state.store.sessions).toSorted(
(a, b) => a.updatedAt - b.updatedAt,
)[0];
if (!session) {
break;
}
delete state.store.sessions[session.sessionId];
serializedBytes = getSerializedLedgerByteLength(state.store);
}
}
function appendUpdate(
state: MutableLedgerState,
params: {
sessionId: string;
sessionKey: string;
runId?: string;
update: SessionUpdate;
},
): void {
const session = getOrCreateSession(state, {
sessionId: params.sessionId,
sessionKey: params.sessionKey,
cwd: "",
complete: false,
});
const now = state.now();
session.updatedAt = now;
session.events.push({
seq: session.nextSeq,
at: now,
sessionId: params.sessionId,
sessionKey: params.sessionKey,
...(params.runId ? { runId: params.runId } : {}),
update: cloneJsonValue(params.update),
});
session.nextSeq += 1;
trimLedger(state);
}
function createLedgerApi(params: {
state: MutableLedgerState;
mutate: (fn: () => void) => Promise<void>;
read: <T>(fn: () => T) => Promise<T>;
}): AcpEventLedger {
const buildReplay = (session: LedgerSession): AcpEventLedgerReplay => ({
complete: true,
sessionId: session.sessionId,
sessionKey: session.sessionKey,
events: session.events.map((event) => cloneJsonValue(event)),
});
return {
async startSession(sessionParams) {
await params.mutate(() => {
getOrCreateSession(params.state, sessionParams);
trimLedger(params.state);
});
},
async recordUserPrompt(promptParams) {
await params.mutate(() => {
for (const update of createUserPromptUpdates(promptParams.prompt)) {
appendUpdate(params.state, {
sessionId: promptParams.sessionId,
sessionKey: promptParams.sessionKey,
runId: promptParams.runId,
update,
});
}
});
},
async recordUpdate(updateParams) {
await params.mutate(() => {
appendUpdate(params.state, updateParams);
});
},
async markIncomplete(markParams) {
await params.mutate(() => {
const session = params.state.store.sessions[markParams.sessionId];
if (!session || session.sessionKey !== markParams.sessionKey) {
return;
}
session.complete = false;
session.updatedAt = params.state.now();
});
},
async readReplay(replayParams) {
return params.read(() => {
const session = params.state.store.sessions[replayParams.sessionId];
if (!session || session.sessionKey !== replayParams.sessionKey || !session.complete) {
return { complete: false, events: [] };
}
return buildReplay(session);
});
},
async readReplayBySessionId(replayParams) {
return params.read(() => {
const session = params.state.store.sessions[replayParams.sessionId];
if (!session || !session.complete) {
return { complete: false, events: [] };
}
return buildReplay(session);
});
},
async readReplayBySessionKey(replayParams) {
return params.read(() => {
const session = Object.values(params.state.store.sessions)
.filter(
(candidate) => candidate.sessionKey === replayParams.sessionKey && candidate.complete,
)
.toSorted((a, b) => b.updatedAt - a.updatedAt)[0];
if (!session) {
return { complete: false, events: [] };
}
return buildReplay(session);
});
},
};
}
export function createInMemoryAcpEventLedger(options: LedgerOptions = {}): AcpEventLedger {
const normalized = normalizeLedgerOptions(options);
const state: MutableLedgerState = {
store: createEmptyStore(),
...normalized,
};
return createLedgerApi({
state,
mutate: async (fn) => {
fn();
},
read: async (fn) => fn(),
});
}
export function resolveDefaultAcpEventLedgerPath(env: NodeJS.ProcessEnv = process.env): string {
return path.join(resolveStateDir(env), "acp", "event-ledger.json");
}
export function createFileAcpEventLedger(
params: { filePath: string } & LedgerOptions,
): AcpEventLedger {
const normalized = normalizeLedgerOptions(params);
const state: MutableLedgerState = {
store: createEmptyStore(),
...normalized,
};
let operation = Promise.resolve();
const load = async () => {
state.store = normalizeStore(await readJsonFile(params.filePath));
};
const ensureParentDir = async () => {
await fs.mkdir(path.dirname(params.filePath), { recursive: true, mode: 0o700 });
};
const enqueue = async <T>(fn: () => Promise<T>): Promise<T> => {
const task = operation.then(fn, fn);
operation = task.then(
() => {},
() => {},
);
return task;
};
return createLedgerApi({
state,
mutate: async (fn) =>
enqueue(async () => {
await ensureParentDir();
await withFileLock(params.filePath, FILE_LEDGER_LOCK_OPTIONS, async () => {
await load();
fn();
await writeTextAtomic(params.filePath, serializeLedgerStore(state.store), {
mode: 0o600,
dirMode: 0o700,
});
});
}),
read: async (fn) =>
enqueue(async () => {
await ensureParentDir();
return await withFileLock(params.filePath, FILE_LEDGER_LOCK_OPTIONS, async () => {
await load();
return fn();
});
}),
});
}

View File

@@ -10,6 +10,7 @@ import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/
import { isMainModule } from "../infra/is-main.js";
import { routeLogsToStderr } from "../logging/console.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { createFileAcpEventLedger, resolveDefaultAcpEventLedgerPath } from "./event-ledger.js";
import { readSecretFromFile } from "./secret-file.js";
import { AcpGatewayAgent } from "./translator.js";
import { normalizeAcpProvenanceMode, type AcpServerOptions } from "./types.js";
@@ -121,9 +122,12 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise<void
const input = Writable.toWeb(process.stdout);
const output = Readable.toWeb(process.stdin) as unknown as ReadableStream<Uint8Array>;
const stream = ndJsonStream(input, output);
const eventLedger = createFileAcpEventLedger({
filePath: resolveDefaultAcpEventLedgerPath(process.env),
});
const _connection = new AgentSideConnection((conn: AgentSideConnection) => {
agent = new AcpGatewayAgent(conn, gateway, opts);
agent = new AcpGatewayAgent(conn, gateway, { ...opts, eventLedger });
agent.start();
return agent;
}, stream);

View File

@@ -2,7 +2,12 @@ import { randomUUID } from "node:crypto";
import type { AcpSession } from "./types.js";
export type AcpSessionStore = {
createSession: (params: { sessionKey: string; cwd: string; sessionId?: string }) => AcpSession;
createSession: (params: {
sessionKey: string;
cwd: string;
sessionId?: string;
ledgerSessionId?: string;
}) => AcpSession;
hasSession: (sessionId: string) => boolean;
getSession: (sessionId: string) => AcpSession | undefined;
getSessionByRunId: (runId: string) => AcpSession | undefined;
@@ -84,6 +89,9 @@ export function createInMemorySessionStore(options: AcpSessionStoreOptions = {})
const existingSession = sessions.get(sessionId);
if (existingSession) {
existingSession.sessionKey = params.sessionKey;
if ("ledgerSessionId" in params) {
existingSession.ledgerSessionId = params.ledgerSessionId;
}
existingSession.cwd = params.cwd;
touchSession(existingSession, nowMs);
return existingSession;
@@ -97,6 +105,7 @@ export function createInMemorySessionStore(options: AcpSessionStoreOptions = {})
const session: AcpSession = {
sessionId,
sessionKey: params.sessionKey,
...(params.ledgerSessionId ? { ledgerSessionId: params.ledgerSessionId } : {}),
cwd: params.cwd,
createdAt: nowMs,
lastTouchedAt: nowMs,

View File

@@ -0,0 +1,396 @@
import type {
LoadSessionRequest,
NewSessionRequest,
PromptRequest,
} from "@agentclientprotocol/sdk";
import { describe, expect, it, vi } from "vitest";
import type { GatewayClient } from "../gateway/client.js";
import type { EventFrame } from "../gateway/protocol/index.js";
import { createInMemoryAcpEventLedger, type AcpEventLedger } from "./event-ledger.js";
import { createInMemorySessionStore } from "./session.js";
import { AcpGatewayAgent } from "./translator.js";
import { createAcpConnection, createAcpGateway } from "./translator.test-helpers.js";
vi.mock("./commands.js", () => ({
getAvailableCommands: () => [],
}));
function createNewSessionRequest(cwd = "/tmp"): NewSessionRequest {
return {
cwd,
mcpServers: [],
_meta: {},
} as unknown as NewSessionRequest;
}
function createLoadSessionRequest(sessionId: string, cwd = "/tmp"): LoadSessionRequest {
return {
sessionId,
cwd,
mcpServers: [],
_meta: {},
} as unknown as LoadSessionRequest;
}
function createPromptRequest(sessionId: string, text: string): PromptRequest {
return {
sessionId,
prompt: [{ type: "text", text }],
_meta: {},
} as unknown as PromptRequest;
}
function createToolEvent(params: {
sessionKey: string;
runId: string;
phase: "start" | "result";
toolCallId: string;
}): EventFrame {
return {
event: "agent",
payload: {
sessionKey: params.sessionKey,
runId: params.runId,
stream: "tool",
data: {
phase: params.phase,
toolCallId: params.toolCallId,
name: "read",
args: { path: "src/app.ts" },
result: { content: [{ type: "text", text: "FILE:src/app.ts" }] },
},
},
} as unknown as EventFrame;
}
function createChatEvent(params: {
sessionKey: string;
runId: string;
state: "delta" | "final";
text: string;
}): EventFrame {
return {
event: "chat",
payload: {
sessionKey: params.sessionKey,
runId: params.runId,
state: params.state,
message: {
content: [{ type: "text", text: params.text }],
},
},
} as unknown as EventFrame;
}
describe("ACP translator event ledger replay", () => {
it("loads complete ledger-backed sessions without the lossy Gateway transcript fallback", async () => {
const eventLedger = createInMemoryAcpEventLedger();
const firstSessionStore = createInMemorySessionStore();
const firstConnection = createAcpConnection();
const firstRequestMock = vi.fn(async (method: string) => {
if (method === "chat.send") {
return { ok: true };
}
return { ok: true };
});
const firstRequest = firstRequestMock as GatewayClient["request"];
const firstAgent = new AcpGatewayAgent(firstConnection, createAcpGateway(firstRequest), {
eventLedger,
sessionStore: firstSessionStore,
});
const created = await firstAgent.newSession(createNewSessionRequest());
const firstSession = firstSessionStore.getSession(created.sessionId);
if (!firstSession) {
throw new Error("Expected new ACP session to be stored");
}
firstConnection.__sessionUpdateMock.mockClear();
const promptPromise = firstAgent.prompt(createPromptRequest(created.sessionId, "Question"));
for (let attempt = 0; attempt < 10; attempt += 1) {
if (firstRequestMock.mock.calls.some((call) => call[0] === "chat.send")) {
break;
}
await new Promise<void>((resolve) => {
setImmediate(resolve);
});
}
const runId = firstSessionStore.getSession(created.sessionId)?.activeRunId;
if (!runId) {
throw new Error("Expected active ACP run");
}
await firstAgent.handleGatewayEvent(
createToolEvent({
sessionKey: firstSession.sessionKey,
runId,
phase: "start",
toolCallId: "tool-1",
}),
);
await firstAgent.handleGatewayEvent(
createToolEvent({
sessionKey: firstSession.sessionKey,
runId,
phase: "result",
toolCallId: "tool-1",
}),
);
await firstAgent.handleGatewayEvent(
createChatEvent({
sessionKey: firstSession.sessionKey,
runId,
state: "delta",
text: "Answer",
}),
);
await firstAgent.handleGatewayEvent(
createChatEvent({
sessionKey: firstSession.sessionKey,
runId,
state: "final",
text: "Answer",
}),
);
await expect(promptPromise).resolves.toEqual({ stopReason: "end_turn" });
const secondConnection = createAcpConnection();
const secondRequestMock = vi.fn(async (method: string) => {
if (method === "sessions.get") {
throw new Error("ledger replay should not call sessions.get");
}
return { ok: true };
});
const secondRequest = secondRequestMock as GatewayClient["request"];
const secondAgent = new AcpGatewayAgent(secondConnection, createAcpGateway(secondRequest), {
eventLedger,
sessionStore: createInMemorySessionStore(),
});
await secondAgent.loadSession(createLoadSessionRequest(created.sessionId));
expect(secondRequestMock).not.toHaveBeenCalledWith("sessions.get", expect.anything());
const replayedUpdates = secondConnection.__sessionUpdateMock.mock.calls.map(
(call) => call[0]?.update,
);
const replayedUpdateTypes = replayedUpdates.map((update) => update?.sessionUpdate);
expect(replayedUpdateTypes).toEqual(
expect.arrayContaining([
"session_info_update",
"available_commands_update",
"user_message_chunk",
"tool_call",
"tool_call_update",
"agent_message_chunk",
]),
);
expect(replayedUpdates).toContainEqual({
sessionUpdate: "user_message_chunk",
content: { type: "text", text: "Question" },
});
expect(replayedUpdates).toContainEqual({
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "Answer" },
});
expect(replayedUpdateTypes.indexOf("user_message_chunk")).toBeLessThan(
replayedUpdateTypes.indexOf("agent_message_chunk"),
);
const ledgerReplay = await eventLedger.readReplay({
sessionId: created.sessionId,
sessionKey: firstSession.sessionKey,
});
expect(
ledgerReplay.events.filter((event) => event.update.sessionUpdate === "user_message_chunk"),
).toHaveLength(1);
const listedSessionStore = createInMemorySessionStore();
const listedConnection = createAcpConnection();
const listedRequestMock = vi.fn(async (method: string) => {
if (method === "sessions.get") {
throw new Error("listed session ledger replay should not call sessions.get");
}
return { ok: true };
});
const listedAgent = new AcpGatewayAgent(
listedConnection,
createAcpGateway(listedRequestMock as GatewayClient["request"]),
{
eventLedger,
sessionStore: listedSessionStore,
},
);
await listedAgent.loadSession(createLoadSessionRequest(firstSession.sessionKey));
expect(listedRequestMock).not.toHaveBeenCalledWith("sessions.get", expect.anything());
const listedReplayTypes = listedConnection.__sessionUpdateMock.mock.calls.map(
(call) => call[0]?.update?.sessionUpdate,
);
expect(listedReplayTypes).toEqual(
expect.arrayContaining(["user_message_chunk", "tool_call", "agent_message_chunk"]),
);
const listedPrompt = listedAgent.prompt(
createPromptRequest(firstSession.sessionKey, "Follow-up"),
);
for (let attempt = 0; attempt < 10; attempt += 1) {
if (listedRequestMock.mock.calls.some((call) => call[0] === "chat.send")) {
break;
}
await new Promise<void>((resolve) => {
setImmediate(resolve);
});
}
const listedRunId = listedSessionStore.getSession(firstSession.sessionKey)?.activeRunId;
if (!listedRunId) {
throw new Error("Expected listed ACP session to have an active run");
}
await listedAgent.handleGatewayEvent(
createChatEvent({
sessionKey: firstSession.sessionKey,
runId: listedRunId,
state: "final",
text: "Follow-up answer",
}),
);
await expect(listedPrompt).resolves.toEqual({ stopReason: "end_turn" });
const canonicalReplay = await eventLedger.readReplay({
sessionId: created.sessionId,
sessionKey: firstSession.sessionKey,
});
expect(
canonicalReplay.events.filter((event) => event.update.sessionUpdate === "user_message_chunk"),
).toHaveLength(2);
await expect(
eventLedger.readReplayBySessionId({ sessionId: firstSession.sessionKey }),
).resolves.toMatchObject({ complete: false });
firstSessionStore.clearAllSessionsForTest();
});
it("does not replay prompts that Gateway rejected before accepting the send", async () => {
const eventLedger = createInMemoryAcpEventLedger();
const sessionStore = createInMemorySessionStore();
const connection = createAcpConnection();
const requestMock = vi.fn(async (method: string) => {
if (method === "chat.send") {
throw new Error("send failed before acceptance");
}
return { ok: true };
});
const agent = new AcpGatewayAgent(
connection,
createAcpGateway(requestMock as GatewayClient["request"]),
{
eventLedger,
sessionStore,
},
);
const created = await agent.newSession(createNewSessionRequest());
const session = sessionStore.getSession(created.sessionId);
if (!session) {
throw new Error("Expected new ACP session to be stored");
}
await expect(
agent.prompt(createPromptRequest(created.sessionId, "Never accepted")),
).rejects.toThrow("send failed before acceptance");
const replay = await eventLedger.readReplay({
sessionId: created.sessionId,
sessionKey: session.sessionKey,
});
expect(replay.events.map((event) => event.update.sessionUpdate)).not.toContain(
"user_message_chunk",
);
const loadConnection = createAcpConnection();
const loadRequestMock = vi.fn(async (method: string) => {
if (method === "sessions.get") {
throw new Error("ledger replay should not call sessions.get");
}
return { ok: true };
});
const loadAgent = new AcpGatewayAgent(
loadConnection,
createAcpGateway(loadRequestMock as GatewayClient["request"]),
{
eventLedger,
sessionStore: createInMemorySessionStore(),
},
);
await loadAgent.loadSession(createLoadSessionRequest(created.sessionId));
const replayedUpdates = loadConnection.__sessionUpdateMock.mock.calls.map(
(call) => call[0]?.update?.sessionUpdate,
);
expect(replayedUpdates).not.toContain("user_message_chunk");
});
it("marks replay incomplete when an accepted prompt cannot be recorded", async () => {
const innerLedger = createInMemoryAcpEventLedger();
let markIncompleteResolve: ((value: unknown) => void) | undefined;
const markIncompletePromise = new Promise((resolve) => {
markIncompleteResolve = resolve;
});
const eventLedger: AcpEventLedger = {
...innerLedger,
recordUserPrompt: async () => {
throw new Error("ledger write failed");
},
markIncomplete: async (params) => {
await innerLedger.markIncomplete(params);
markIncompleteResolve?.(params);
},
};
const sessionStore = createInMemorySessionStore();
const connection = createAcpConnection();
const requestMock = vi.fn(async (_method: string) => ({ ok: true }));
const agent = new AcpGatewayAgent(
connection,
createAcpGateway(requestMock as GatewayClient["request"]),
{
eventLedger,
sessionStore,
},
);
const created = await agent.newSession(createNewSessionRequest());
const session = sessionStore.getSession(created.sessionId);
if (!session) {
throw new Error("Expected new ACP session to be stored");
}
const prompt = agent.prompt(createPromptRequest(created.sessionId, "Question"));
for (let attempt = 0; attempt < 10; attempt += 1) {
if (requestMock.mock.calls.some((call) => call[0] === "chat.send")) {
break;
}
await new Promise<void>((resolve) => {
setImmediate(resolve);
});
}
await markIncompletePromise;
const runId = sessionStore.getSession(created.sessionId)?.activeRunId;
if (!runId) {
throw new Error("Expected active ACP run");
}
await agent.handleGatewayEvent(
createChatEvent({
sessionKey: session.sessionKey,
runId,
state: "final",
text: "Answer",
}),
);
await expect(prompt).resolves.toEqual({ stopReason: "end_turn" });
await expect(
innerLedger.readReplay({ sessionId: created.sessionId, sessionKey: session.sessionKey }),
).resolves.toEqual({ complete: false, events: [] });
});
});

View File

@@ -59,6 +59,63 @@ describe("acp translator stop reason mapping", () => {
await expect(promptPromise).resolves.toEqual({ stopReason: "cancelled" });
});
it("reconciles provisional ACP session keys to canonical Gateway keys by run id", async () => {
const sentRunIds: string[] = [];
const request = vi.fn(async (method: string, params?: Record<string, unknown>) => {
if (method === "chat.send") {
const runId = params?.idempotencyKey;
if (typeof runId === "string") {
sentRunIds.push(runId);
}
}
return {};
}) as GatewayClient["request"];
const { agent, sessionId, sessionStore } = createSessionAgentHarness(request, {
sessionKey: "acp:session-1",
});
const firstPrompt = promptAgent(agent, sessionId);
await vi.waitFor(() => {
expect(sentRunIds).toHaveLength(1);
});
await agent.handleGatewayEvent(
createChatEvent({
runId: sentRunIds[0],
sessionKey: "agent:main:acp:session-1",
seq: 1,
state: "final",
message: {
content: [{ type: "text", text: "first" }],
},
}),
);
await expect(firstPrompt).resolves.toEqual({ stopReason: "end_turn" });
expect(sessionStore.getSession(sessionId)?.sessionKey).toBe("agent:main:acp:session-1");
const secondPrompt = promptAgent(agent, sessionId, "again");
await vi.waitFor(() => {
expect(sentRunIds).toHaveLength(2);
});
expect(request).toHaveBeenLastCalledWith(
"chat.send",
expect.objectContaining({
sessionKey: "agent:main:acp:session-1",
}),
{ timeoutMs: null },
);
await agent.handleGatewayEvent(
createChatEvent({
runId: sentRunIds[1],
sessionKey: "agent:main:acp:session-1",
seq: 2,
state: "final",
}),
);
await expect(secondPrompt).resolves.toEqual({ stopReason: "end_turn" });
});
it("keeps in-flight prompts pending across transient gateway disconnects", async () => {
const { agent, promptPromise, runId } = await createPendingPromptHarness();
const settleSpy = observeSettlement(promptPromise);

View File

@@ -24,6 +24,7 @@ import type {
SessionConfigOption,
SessionInfo,
SessionModeState,
SessionUpdate,
SetSessionConfigOptionRequest,
SetSessionConfigOptionResponse,
SetSessionModeRequest,
@@ -42,6 +43,11 @@ import {
} from "../infra/fixed-window-rate-limit.js";
import { normalizeOptionalString } from "../shared/string-coerce.js";
import { shortenHomePath } from "../utils.js";
import {
createInMemoryAcpEventLedger,
type AcpEventLedger,
type AcpEventLedgerReplay,
} from "./event-ledger.js";
import {
extractAttachmentsFromPrompt,
extractToolCallContent,
@@ -97,6 +103,7 @@ type DisconnectContext = {
type PendingPrompt = {
sessionId: string;
sessionKey: string;
ledgerSessionId?: string;
idempotencyKey: string;
sendAccepted?: boolean;
disconnectContext?: DisconnectContext;
@@ -117,6 +124,7 @@ type PendingToolCall = {
};
type AcpGatewayAgentOptions = AcpServerOptions & {
eventLedger?: AcpEventLedger;
sessionStore?: AcpSessionStore;
};
@@ -501,12 +509,26 @@ function buildSystemProvenanceReceipt(params: {
].join("\n");
}
function hasExplicitSessionRouting(
meta: ReturnType<typeof parseSessionMeta>,
opts: AcpServerOptions,
): boolean {
return Boolean(
meta.sessionKey || meta.sessionLabel || opts.defaultSessionKey || opts.defaultSessionLabel,
);
}
function resolveLedgerSessionId(session: { sessionId: string; ledgerSessionId?: string }): string {
return session.ledgerSessionId ?? session.sessionId;
}
export class AcpGatewayAgent implements Agent {
private connection: AgentSideConnection;
private gateway: GatewayClient;
private opts: AcpGatewayAgentOptions;
private log: (msg: string) => void;
private sessionStore: AcpSessionStore;
private eventLedger: AcpEventLedger;
private sessionCreateRateLimiter: FixedWindowRateLimiter;
private pendingPrompts = new Map<string, PendingPrompt>();
private disconnectTimer: NodeJS.Timeout | null = null;
@@ -531,6 +553,7 @@ export class AcpGatewayAgent implements Agent {
this.opts = opts;
this.log = opts.verbose ? (msg: string) => process.stderr.write(`[acp] ${msg}\n`) : () => {};
this.sessionStore = opts.sessionStore ?? defaultAcpSessionStore;
this.eventLedger = opts.eventLedger ?? createInMemoryAcpEventLedger();
this.sessionCreateRateLimiter = createFixedWindowRateLimiter({
maxRequests: Math.max(
1,
@@ -625,12 +648,14 @@ export class AcpGatewayAgent implements Agent {
sessionKey,
cwd: params.cwd,
});
await this.startLedgerSession(session, { complete: true, reset: true });
this.log(`newSession: ${session.sessionId} -> ${session.sessionKey}`);
const sessionSnapshot = await this.getSessionSnapshot(session.sessionKey);
await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, {
await this.sendSessionSnapshotUpdate(session, sessionSnapshot, {
includeControls: false,
record: true,
});
await this.sendAvailableCommands(session.sessionId);
await this.sendAvailableCommands(session, { record: true });
const { configOptions, modes } = sessionSnapshot;
return {
sessionId: session.sessionId,
@@ -646,29 +671,56 @@ export class AcpGatewayAgent implements Agent {
}
const meta = parseSessionMeta(params._meta);
const hasExplicitRouting = hasExplicitSessionRouting(meta, this.opts);
const exactLedgerReplay: AcpEventLedgerReplay = hasExplicitRouting
? { complete: false, events: [] }
: await this.readLedgerReplayBySessionId(params.sessionId);
const listedLedgerReplay: AcpEventLedgerReplay =
!hasExplicitRouting && !exactLedgerReplay.complete
? await this.readLedgerReplayBySessionKey(params.sessionId)
: { complete: false, events: [] };
const routedLedgerReplay = exactLedgerReplay.complete ? exactLedgerReplay : listedLedgerReplay;
const sessionKey = await this.resolveSessionKeyFromMeta({
meta,
fallbackKey: params.sessionId,
fallbackKey: routedLedgerReplay.sessionKey ?? params.sessionId,
});
const ledgerReplay =
exactLedgerReplay.complete && exactLedgerReplay.sessionKey === sessionKey
? exactLedgerReplay
: listedLedgerReplay.complete && listedLedgerReplay.sessionKey === sessionKey
? listedLedgerReplay
: await this.readLedgerReplay({
sessionId: params.sessionId,
sessionKey,
});
const session = this.sessionStore.createSession({
sessionId: params.sessionId,
sessionKey,
...(ledgerReplay.sessionId ? { ledgerSessionId: ledgerReplay.sessionId } : {}),
cwd: params.cwd,
});
await this.startLedgerSession(session, { complete: ledgerReplay.complete });
this.log(`loadSession: ${session.sessionId} -> ${session.sessionKey}`);
const [sessionSnapshot, transcript] = await Promise.all([
this.getSessionSnapshot(session.sessionKey),
this.getSessionTranscript(session.sessionKey).catch((err) => {
this.log(`session transcript fallback for ${session.sessionKey}: ${String(err)}`);
return [];
}),
ledgerReplay.complete
? Promise.resolve([])
: this.getSessionTranscript(session.sessionKey).catch((err) => {
this.log(`session transcript fallback for ${session.sessionKey}: ${String(err)}`);
return [];
}),
]);
await this.replaySessionTranscript(session.sessionId, transcript);
await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, {
if (ledgerReplay.complete) {
await this.replayLedgerSession(session.sessionId, ledgerReplay);
} else {
await this.replaySessionTranscript(session.sessionId, transcript);
}
await this.sendSessionSnapshotUpdate(session, sessionSnapshot, {
includeControls: false,
record: false,
});
await this.sendAvailableCommands(session.sessionId);
await this.sendAvailableCommands(session, { record: false });
const { configOptions, modes } = sessionSnapshot;
return { configOptions, modes };
}
@@ -754,11 +806,13 @@ export class AcpGatewayAgent implements Agent {
sessionKey,
cwd: params.cwd,
});
await this.startLedgerSession(session, { complete: false });
this.log(`resumeSession: ${session.sessionId} -> ${session.sessionKey}`);
await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, {
await this.sendSessionSnapshotUpdate(session, sessionSnapshot, {
includeControls: false,
record: false,
});
await this.sendAvailableCommands(session.sessionId);
await this.sendAvailableCommands(session, { record: false });
const { configOptions, modes } = sessionSnapshot;
return { configOptions, modes };
}
@@ -795,8 +849,9 @@ export class AcpGatewayAgent implements Agent {
const sessionSnapshot = await this.getSessionSnapshot(session.sessionKey, {
thinkingLevel: params.modeId,
});
await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, {
await this.sendSessionSnapshotUpdate(session, sessionSnapshot, {
includeControls: true,
record: true,
});
} catch (err) {
this.log(`setSessionMode error: ${String(err)}`);
@@ -828,8 +883,9 @@ export class AcpGatewayAgent implements Agent {
session.sessionKey,
sessionPatch.overrides,
);
await this.sendSessionSnapshotUpdate(session.sessionId, sessionSnapshot, {
await this.sendSessionSnapshotUpdate(session, sessionSnapshot, {
includeControls: true,
record: true,
});
return {
configOptions: sessionSnapshot.configOptions,
@@ -892,6 +948,7 @@ export class AcpGatewayAgent implements Agent {
this.pendingPrompts.set(params.sessionId, {
sessionId: params.sessionId,
sessionKey: session.sessionKey,
...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}),
idempotencyKey: runId,
disconnectContext: this.activeDisconnectContext ?? undefined,
resolve,
@@ -902,6 +959,12 @@ export class AcpGatewayAgent implements Agent {
}
const sendWithProvenanceFallback = async () => {
const markSendAccepted = () => {
const pending = this.getPendingPrompt(params.sessionId, runId);
if (pending) {
pending.sendAccepted = true;
}
};
try {
await this.gateway.request(
"chat.send",
@@ -912,20 +975,16 @@ export class AcpGatewayAgent implements Agent {
},
{ timeoutMs: null },
);
const pending = this.getPendingPrompt(params.sessionId, runId);
if (pending) {
pending.sendAccepted = true;
}
markSendAccepted();
await this.recordUserPrompt(session, runId, params.prompt);
} catch (err) {
if (
(systemInputProvenance || systemProvenanceReceipt) &&
isAdminScopeProvenanceRejection(err)
) {
await this.gateway.request("chat.send", requestParams, { timeoutMs: null });
const pending = this.getPendingPrompt(params.sessionId, runId);
if (pending) {
pending.sendAccepted = true;
}
markSendAccepted();
await this.recordUserPrompt(session, runId, params.prompt);
return;
}
throw err;
@@ -1018,8 +1077,12 @@ export class AcpGatewayAgent implements Agent {
rawInput: args,
locations,
});
await this.connection.sessionUpdate({
await this.emitSessionUpdate({
sessionId: pending.sessionId,
sessionKey: pending.sessionKey,
...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}),
runId: pending.idempotencyKey,
record: true,
update: {
sessionUpdate: "tool_call",
toolCallId,
@@ -1036,8 +1099,12 @@ export class AcpGatewayAgent implements Agent {
if (phase === "update") {
const toolState = pending.toolCalls?.get(toolCallId);
const partialResult = data.partialResult;
await this.connection.sessionUpdate({
await this.emitSessionUpdate({
sessionId: pending.sessionId,
sessionKey: pending.sessionKey,
...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}),
runId: pending.idempotencyKey,
record: true,
update: {
sessionUpdate: "tool_call_update",
toolCallId,
@@ -1054,8 +1121,12 @@ export class AcpGatewayAgent implements Agent {
const isError = Boolean(data.isError);
const toolState = pending.toolCalls?.get(toolCallId);
pending.toolCalls?.delete(toolCallId);
await this.connection.sessionUpdate({
await this.emitSessionUpdate({
sessionId: pending.sessionId,
sessionKey: pending.sessionKey,
...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}),
runId: pending.idempotencyKey,
record: true,
update: {
sessionUpdate: "tool_call_update",
toolCallId,
@@ -1135,8 +1206,12 @@ export class AcpGatewayAgent implements Agent {
const newThought = fullThought.slice(sentThoughtSoFar);
pending.sentThoughtLength = fullThought.length;
pending.sentThought = fullThought;
await this.connection.sessionUpdate({
await this.emitSessionUpdate({
sessionId,
sessionKey: pending.sessionKey,
...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}),
runId: pending.idempotencyKey,
record: true,
update: {
sessionUpdate: "agent_thought_chunk",
content: { type: "text", text: newThought },
@@ -1157,8 +1232,12 @@ export class AcpGatewayAgent implements Agent {
const newText = fullText.slice(sentSoFar);
pending.sentTextLength = fullText.length;
pending.sentText = fullText;
await this.connection.sessionUpdate({
await this.emitSessionUpdate({
sessionId,
sessionKey: pending.sessionKey,
...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}),
runId: pending.idempotencyKey,
record: true,
update: {
sessionUpdate: "agent_message_chunk",
content: { type: "text", text: newText },
@@ -1178,9 +1257,19 @@ export class AcpGatewayAgent implements Agent {
}
const sessionSnapshot = await this.getSessionSnapshot(pending.sessionKey);
try {
await this.sendSessionSnapshotUpdate(sessionId, sessionSnapshot, {
includeControls: false,
});
await this.sendSessionSnapshotUpdate(
{
sessionId,
sessionKey: pending.sessionKey,
...(pending.ledgerSessionId ? { ledgerSessionId: pending.ledgerSessionId } : {}),
},
sessionSnapshot,
{
includeControls: false,
record: true,
runId: pending.idempotencyKey,
},
);
} catch (err) {
this.log(`session snapshot update failed for ${sessionId}: ${String(err)}`);
}
@@ -1197,9 +1286,30 @@ export class AcpGatewayAgent implements Agent {
}
return pending;
}
if (runId) {
for (const pending of this.pendingPrompts.values()) {
if (pending.idempotencyKey !== runId) {
continue;
}
this.reconcilePendingSessionKey(pending, sessionKey);
return pending;
}
}
return undefined;
}
private reconcilePendingSessionKey(pending: PendingPrompt, sessionKey: string): void {
if (pending.sessionKey === sessionKey) {
return;
}
this.log(`session key reconciled: ${pending.sessionKey} -> ${sessionKey}`);
pending.sessionKey = sessionKey;
const session = this.sessionStore.getSession(pending.sessionId);
if (session?.activeRunId === pending.idempotencyKey) {
session.sessionKey = sessionKey;
}
}
private clearDisconnectTimer(): void {
if (!this.disconnectTimer) {
return;
@@ -1351,9 +1461,142 @@ export class AcpGatewayAgent implements Agent {
return true;
}
private async sendAvailableCommands(sessionId: string): Promise<void> {
private async startLedgerSession(
session: { sessionId: string; sessionKey: string; ledgerSessionId?: string; cwd: string },
options: { complete: boolean; reset?: boolean },
): Promise<void> {
try {
await this.eventLedger.startSession({
sessionId: resolveLedgerSessionId(session),
sessionKey: session.sessionKey,
cwd: session.cwd,
complete: options.complete,
...(options.reset ? { reset: true } : {}),
});
} catch (err) {
this.log(`event ledger session start failed for ${session.sessionId}: ${String(err)}`);
}
}
private async readLedgerReplay(params: {
sessionId: string;
sessionKey: string;
}): Promise<AcpEventLedgerReplay> {
try {
return await this.eventLedger.readReplay(params);
} catch (err) {
this.log(`event ledger replay fallback for ${params.sessionId}: ${String(err)}`);
return { complete: false, events: [] };
}
}
private async readLedgerReplayBySessionId(sessionId: string): Promise<AcpEventLedgerReplay> {
try {
return await this.eventLedger.readReplayBySessionId({ sessionId });
} catch (err) {
this.log(`event ledger exact replay fallback for ${sessionId}: ${String(err)}`);
return { complete: false, events: [] };
}
}
private async readLedgerReplayBySessionKey(sessionKey: string): Promise<AcpEventLedgerReplay> {
try {
return await this.eventLedger.readReplayBySessionKey({ sessionKey });
} catch (err) {
this.log(`event ledger session-key replay fallback for ${sessionKey}: ${String(err)}`);
return { complete: false, events: [] };
}
}
private async recordUserPrompt(
session: { sessionId: string; sessionKey: string; ledgerSessionId?: string },
runId: string,
prompt: PromptRequest["prompt"],
): Promise<void> {
try {
await this.eventLedger.recordUserPrompt({
sessionId: resolveLedgerSessionId(session),
sessionKey: session.sessionKey,
runId,
prompt,
});
} catch (err) {
this.log(`event ledger prompt record failed for ${session.sessionId}: ${String(err)}`);
await this.markLedgerIncomplete(session);
}
}
private async recordLedgerUpdate(params: {
sessionId: string;
sessionKey: string;
ledgerSessionId?: string;
runId?: string;
update: SessionUpdate;
}): Promise<void> {
try {
await this.eventLedger.recordUpdate({
sessionId: params.ledgerSessionId ?? params.sessionId,
sessionKey: params.sessionKey,
...(params.runId ? { runId: params.runId } : {}),
update: params.update,
});
} catch (err) {
this.log(`event ledger update record failed for ${params.sessionId}: ${String(err)}`);
await this.markLedgerIncomplete({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
...(params.ledgerSessionId ? { ledgerSessionId: params.ledgerSessionId } : {}),
});
}
}
private async markLedgerIncomplete(session: {
sessionId: string;
sessionKey: string;
ledgerSessionId?: string;
}): Promise<void> {
try {
await this.eventLedger.markIncomplete({
sessionId: resolveLedgerSessionId(session),
sessionKey: session.sessionKey,
});
} catch (err) {
this.log(`event ledger incomplete mark failed for ${session.sessionId}: ${String(err)}`);
}
}
private async emitSessionUpdate(params: {
sessionId: string;
sessionKey?: string;
ledgerSessionId?: string;
runId?: string;
update: SessionUpdate;
record?: boolean;
}): Promise<void> {
await this.connection.sessionUpdate({
sessionId,
sessionId: params.sessionId,
update: params.update,
});
if (params.record && params.sessionKey) {
await this.recordLedgerUpdate({
sessionId: params.sessionId,
sessionKey: params.sessionKey,
...(params.ledgerSessionId ? { ledgerSessionId: params.ledgerSessionId } : {}),
...(params.runId ? { runId: params.runId } : {}),
update: params.update,
});
}
}
private async sendAvailableCommands(
session: { sessionId: string; sessionKey: string; ledgerSessionId?: string },
options: { record: boolean },
): Promise<void> {
await this.emitSessionUpdate({
sessionId: session.sessionId,
sessionKey: session.sessionKey,
...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}),
record: options.record,
update: {
sessionUpdate: "available_commands_update",
availableCommands: await getAvailableCommandsForAcp(),
@@ -1552,7 +1795,7 @@ export class AcpGatewayAgent implements Agent {
for (const message of transcript) {
const replayChunks = extractReplayChunks(message);
for (const chunk of replayChunks) {
await this.connection.sessionUpdate({
await this.emitSessionUpdate({
sessionId,
update: {
sessionUpdate: chunk.sessionUpdate,
@@ -1563,21 +1806,42 @@ export class AcpGatewayAgent implements Agent {
}
}
private async sendSessionSnapshotUpdate(
private async replayLedgerSession(
sessionId: string,
ledgerReplay: AcpEventLedgerReplay,
): Promise<void> {
for (const event of ledgerReplay.events) {
await this.emitSessionUpdate({
sessionId,
update: event.update,
record: false,
});
}
}
private async sendSessionSnapshotUpdate(
session: { sessionId: string; sessionKey: string; ledgerSessionId?: string },
sessionSnapshot: SessionSnapshot,
options: { includeControls: boolean },
options: { includeControls: boolean; record: boolean; runId?: string },
): Promise<void> {
if (options.includeControls) {
await this.connection.sessionUpdate({
sessionId,
await this.emitSessionUpdate({
sessionId: session.sessionId,
sessionKey: session.sessionKey,
...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}),
runId: options.runId,
record: options.record,
update: {
sessionUpdate: "current_mode_update",
currentModeId: sessionSnapshot.modes.currentModeId,
},
});
await this.connection.sessionUpdate({
sessionId,
await this.emitSessionUpdate({
sessionId: session.sessionId,
sessionKey: session.sessionKey,
...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}),
runId: options.runId,
record: options.record,
update: {
sessionUpdate: "config_option_update",
configOptions: sessionSnapshot.configOptions,
@@ -1585,8 +1849,12 @@ export class AcpGatewayAgent implements Agent {
});
}
if (sessionSnapshot.metadata) {
await this.connection.sessionUpdate({
sessionId,
await this.emitSessionUpdate({
sessionId: session.sessionId,
sessionKey: session.sessionKey,
...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}),
runId: options.runId,
record: options.record,
update: {
sessionUpdate: "session_info_update",
...sessionSnapshot.metadata,
@@ -1594,8 +1862,12 @@ export class AcpGatewayAgent implements Agent {
});
}
if (sessionSnapshot.usage) {
await this.connection.sessionUpdate({
sessionId,
await this.emitSessionUpdate({
sessionId: session.sessionId,
sessionKey: session.sessionKey,
...(session.ledgerSessionId ? { ledgerSessionId: session.ledgerSessionId } : {}),
runId: options.runId,
record: options.record,
update: {
sessionUpdate: "usage_update",
used: sessionSnapshot.usage.used,

View File

@@ -21,6 +21,7 @@ export function normalizeAcpProvenanceMode(
export type AcpSession = {
sessionId: SessionId;
sessionKey: string;
ledgerSessionId?: string;
cwd: string;
createdAt: number;
lastTouchedAt: number;