From 4c9390a36eecd98eb88d41761e8b829653c2f325 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 02:58:34 +0100 Subject: [PATCH] refactor(gateway): finish async session read paths (#75892) * refactor(gateway): finish async session read paths * fix(gateway): migrate async checkpoint forks --- .../src/app-server/event-projector.test.ts | 2 + .../codex/src/app-server/event-projector.ts | 14 +- .../codex/src/app-server/run-attempt.ts | 24 +- .../codex/src/app-server/session-history.ts | 44 ++++ src/gateway/server-methods/sessions.ts | 61 ++--- .../server.sessions.compaction.test.ts | 76 +++++-- .../session-compaction-checkpoints.test.ts | 209 +++++++++++------- src/gateway/session-compaction-checkpoints.ts | 169 ++++++++------ src/gateway/session-history-state.test.ts | 35 ++- src/gateway/session-history-state.ts | 73 +----- 10 files changed, 398 insertions(+), 309 deletions(-) create mode 100644 extensions/codex/src/app-server/session-history.ts diff --git a/extensions/codex/src/app-server/event-projector.test.ts b/extensions/codex/src/app-server/event-projector.test.ts index cfcb22b6f47..56fdf31b243 100644 --- a/extensions/codex/src/app-server/event-projector.test.ts +++ b/extensions/codex/src/app-server/event-projector.test.ts @@ -780,6 +780,7 @@ describe("CodexAppServerEventProjector", () => { it("fires before_compaction and after_compaction hooks for codex compaction items", async () => { const { projector, beforeCompaction, afterCompaction } = await createProjectorWithHooks(); + const openSpy = vi.spyOn(SessionManager, "open"); await projector.handleNotification( forCurrentTurn("item/started", { @@ -791,6 +792,7 @@ describe("CodexAppServerEventProjector", () => { item: { type: "contextCompaction", id: "compact-1" }, }), ); + expect(openSpy).not.toHaveBeenCalled(); expect(beforeCompaction).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/extensions/codex/src/app-server/event-projector.ts b/extensions/codex/src/app-server/event-projector.ts index ace1519135b..6e4f9b6264f 100644 --- a/extensions/codex/src/app-server/event-projector.ts +++ b/extensions/codex/src/app-server/event-projector.ts @@ -1,5 +1,4 @@ import type { AssistantMessage, Usage } from "@mariozechner/pi-ai"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; import { classifyAgentHarnessTerminalOutcome, embeddedAgentLog, @@ -27,6 +26,7 @@ import { type JsonObject, type JsonValue, } from "./protocol.js"; +import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; export type CodexAppServerToolTelemetry = { didSendViaMessagingTool: boolean; @@ -337,7 +337,7 @@ export class CodexAppServerEventProjector { this.activeCompactionItemIds.add(itemId); await runAgentHarnessBeforeCompactionHook({ sessionFile: this.params.sessionFile, - messages: this.readMirroredSessionMessages(), + messages: await this.readMirroredSessionMessages(), ctx: { runId: this.params.runId, agentId: this.params.agentId, @@ -388,7 +388,7 @@ export class CodexAppServerEventProjector { this.completedCompactionCount += 1; await runAgentHarnessAfterCompactionHook({ sessionFile: this.params.sessionFile, - messages: this.readMirroredSessionMessages(), + messages: await this.readMirroredSessionMessages(), compactedCount: -1, ctx: { runId: this.params.runId, @@ -763,12 +763,8 @@ export class CodexAppServerEventProjector { this.assistantItemOrder.push(itemId); } - private readMirroredSessionMessages(): AgentMessage[] { - try { - return SessionManager.open(this.params.sessionFile).buildSessionContext().messages; - } catch { - return []; - } + private async readMirroredSessionMessages(): Promise { + return (await readCodexMirroredSessionHistoryMessages(this.params.sessionFile)) ?? []; } private createAssistantMessage(text: string): AssistantMessage { diff --git a/extensions/codex/src/app-server/run-attempt.ts b/extensions/codex/src/app-server/run-attempt.ts index 85b89dcbe01..20f4f1f76a6 100644 --- a/extensions/codex/src/app-server/run-attempt.ts +++ b/extensions/codex/src/app-server/run-attempt.ts @@ -74,6 +74,7 @@ import { type JsonValue, } from "./protocol.js"; import { readCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js"; +import { readCodexMirroredSessionHistoryMessages } from "./session-history.js"; import { clearSharedCodexAppServerClient } from "./shared-client.js"; import { buildDeveloperInstructions, @@ -400,9 +401,9 @@ export async function runCodexAppServerAttempt( }, }); const hadSessionFile = await fileExists(params.sessionFile); - const sessionManager = SessionManager.open(params.sessionFile); + const sessionManager = activeContextEngine ? SessionManager.open(params.sessionFile) : undefined; let historyMessages = - readMirroredSessionHistoryMessages(params.sessionFile, sessionManager) ?? []; + (await readMirroredSessionHistoryMessages(params.sessionFile, sessionManager)) ?? []; const hookContext = { runId: params.runId, agentId: sessionAgentId, @@ -430,7 +431,8 @@ export async function runCodexAppServerAttempt( runMaintenance: runHarnessContextEngineMaintenance, warn: (message) => embeddedAgentLog.warn(message), }); - historyMessages = readMirroredSessionHistoryMessages(params.sessionFile) ?? historyMessages; + historyMessages = + (await readMirroredSessionHistoryMessages(params.sessionFile)) ?? historyMessages; } const baseDeveloperInstructions = buildDeveloperInstructions(params); let promptText = params.prompt; @@ -1097,7 +1099,7 @@ export async function runCodexAppServerAttempt( } if (activeContextEngine) { const finalMessages = - readMirroredSessionHistoryMessages(params.sessionFile) ?? + (await readMirroredSessionHistoryMessages(params.sessionFile)) ?? historyMessages.concat(result.messagesSnapshot); await finalizeHarnessContextEngineTurn({ contextEngine: activeContextEngine, @@ -1553,19 +1555,17 @@ function readString(record: JsonObject, key: string): string | undefined { return typeof value === "string" ? value : undefined; } -function readMirroredSessionHistoryMessages( +async function readMirroredSessionHistoryMessages( sessionFile: string, - sessionManager?: SessionManager, -): AgentMessage[] | undefined { - try { - return (sessionManager ?? SessionManager.open(sessionFile)).buildSessionContext().messages; - } catch (error) { + sessionManager?: Pick, +): Promise { + const messages = await readCodexMirroredSessionHistoryMessages(sessionFile, sessionManager); + if (!messages) { embeddedAgentLog.warn("failed to read mirrored session history for codex harness hooks", { - error, sessionFile, }); - return undefined; } + return messages; } async function mirrorTranscriptBestEffort(params: { diff --git a/extensions/codex/src/app-server/session-history.ts b/extensions/codex/src/app-server/session-history.ts new file mode 100644 index 00000000000..469c4034364 --- /dev/null +++ b/extensions/codex/src/app-server/session-history.ts @@ -0,0 +1,44 @@ +import fs from "node:fs/promises"; +import type { SessionEntry, SessionManager } from "@mariozechner/pi-coding-agent"; +import { + buildSessionContext, + migrateSessionEntries, + parseSessionEntries, +} from "@mariozechner/pi-coding-agent"; +import type { AgentMessage } from "openclaw/plugin-sdk/agent-harness-runtime"; + +function isMissingFileError(error: unknown): boolean { + return Boolean( + error && + typeof error === "object" && + "code" in error && + (error as { code?: unknown }).code === "ENOENT", + ); +} + +export async function readCodexMirroredSessionHistoryMessages( + sessionFile: string, + sessionManager?: Pick, +): Promise { + try { + if (sessionManager) { + return sessionManager.buildSessionContext().messages; + } + const raw = await fs.readFile(sessionFile, "utf-8"); + const entries = parseSessionEntries(raw); + const firstEntry = entries[0] as { type?: unknown; id?: unknown } | undefined; + if (firstEntry?.type !== "session" || typeof firstEntry.id !== "string") { + return undefined; + } + migrateSessionEntries(entries); + const sessionEntries = entries.filter( + (entry): entry is SessionEntry => entry.type !== "session", + ); + return buildSessionContext(sessionEntries).messages; + } catch (error) { + if (isMissingFileError(error)) { + return []; + } + return undefined; + } +} diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index 5505296a292..54bfa66ba4a 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -1,7 +1,7 @@ import { randomUUID } from "node:crypto"; import fs from "node:fs"; import path from "node:path"; -import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; +import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent"; import { resolveAgentRuntimeMetadata } from "../../agents/agent-runtime-metadata.js"; import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js"; import { @@ -66,6 +66,7 @@ import { } from "../protocol/index.js"; import { resolveSessionKeyForRun } from "../server-session-key.js"; import { + forkCompactionCheckpointTranscriptAsync, getSessionCompactionCheckpoint, listSessionCompactionCheckpoints, } from "../session-compaction-checkpoints.js"; @@ -1089,26 +1090,11 @@ export const sessionsHandlers: GatewayRequestHandlers = { ); return; } - if (!fs.existsSync(checkpoint.preCompaction.sessionFile)) { - respond( - false, - undefined, - errorShape(ErrorCodes.UNAVAILABLE, "checkpoint snapshot transcript is missing"), - ); - return; - } - - const snapshotSession = SessionManager.open( - checkpoint.preCompaction.sessionFile, - path.dirname(checkpoint.preCompaction.sessionFile), - ); - const branchedSession = SessionManager.forkFrom( - checkpoint.preCompaction.sessionFile, - snapshotSession.getCwd(), - path.dirname(checkpoint.preCompaction.sessionFile), - ); - const branchedSessionFile = branchedSession.getSessionFile(); - if (!branchedSessionFile) { + const branchedSession = await forkCompactionCheckpointTranscriptAsync({ + sourceFile: checkpoint.preCompaction.sessionFile, + sessionDir: path.dirname(checkpoint.preCompaction.sessionFile), + }); + if (!branchedSession?.sessionFile) { respond( false, undefined, @@ -1120,8 +1106,8 @@ export const sessionsHandlers: GatewayRequestHandlers = { const label = entry.label?.trim() ? `${entry.label.trim()} (checkpoint)` : "Checkpoint branch"; const nextEntry = cloneCheckpointSessionEntry({ currentEntry: entry, - nextSessionId: branchedSession.getSessionId(), - nextSessionFile: branchedSessionFile, + nextSessionId: branchedSession.sessionId, + nextSessionFile: branchedSession.sessionFile, label, parentSessionKey: canonicalKey, totalTokens: checkpoint.tokensBefore, @@ -1203,15 +1189,6 @@ export const sessionsHandlers: GatewayRequestHandlers = { ); return; } - if (!fs.existsSync(checkpoint.preCompaction.sessionFile)) { - respond( - false, - undefined, - errorShape(ErrorCodes.UNAVAILABLE, "checkpoint snapshot transcript is missing"), - ); - return; - } - const interruptResult = await interruptSessionRunIfActive({ req, context, @@ -1226,17 +1203,11 @@ export const sessionsHandlers: GatewayRequestHandlers = { return; } - const snapshotSession = SessionManager.open( - checkpoint.preCompaction.sessionFile, - path.dirname(checkpoint.preCompaction.sessionFile), - ); - const restoredSession = SessionManager.forkFrom( - checkpoint.preCompaction.sessionFile, - snapshotSession.getCwd(), - path.dirname(checkpoint.preCompaction.sessionFile), - ); - const restoredSessionFile = restoredSession.getSessionFile(); - if (!restoredSessionFile) { + const restoredSession = await forkCompactionCheckpointTranscriptAsync({ + sourceFile: checkpoint.preCompaction.sessionFile, + sessionDir: path.dirname(checkpoint.preCompaction.sessionFile), + }); + if (!restoredSession?.sessionFile) { respond( false, undefined, @@ -1246,8 +1217,8 @@ export const sessionsHandlers: GatewayRequestHandlers = { } const nextEntry = cloneCheckpointSessionEntry({ currentEntry: entry, - nextSessionId: restoredSession.getSessionId(), - nextSessionFile: restoredSessionFile, + nextSessionId: restoredSession.sessionId, + nextSessionFile: restoredSession.sessionFile, totalTokens: checkpoint.tokensBefore, preserveCompactionCheckpoints: true, }); diff --git a/src/gateway/server.sessions.compaction.test.ts b/src/gateway/server.sessions.compaction.test.ts index 973cbeab7ab..e8eb5bd4dd9 100644 --- a/src/gateway/server.sessions.compaction.test.ts +++ b/src/gateway/server.sessions.compaction.test.ts @@ -1,7 +1,7 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { expect, test } from "vitest"; +import { expect, test, vi } from "vitest"; import { withEnvAsync } from "../test-utils/env.js"; import { embeddedRunMock, @@ -106,15 +106,34 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre- fixture.preCompactionSessionFile, ); - const branched = await rpcReq<{ - ok: true; - sourceKey: string; - key: string; - entry: { sessionId: string; sessionFile?: string; parentSessionKey?: string }; - }>(ws, "sessions.compaction.branch", { - key: "main", - checkpointId: "checkpoint-1", - }); + const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open"); + const sessionManagerForkFromSpy = vi.spyOn(SessionManager, "forkFrom"); + let branched: Awaited< + ReturnType< + typeof rpcReq<{ + ok: true; + sourceKey: string; + key: string; + entry: { sessionId: string; sessionFile?: string; parentSessionKey?: string }; + }> + > + >; + try { + branched = await rpcReq<{ + ok: true; + sourceKey: string; + key: string; + entry: { sessionId: string; sessionFile?: string; parentSessionKey?: string }; + }>(ws, "sessions.compaction.branch", { + key: "main", + checkpointId: "checkpoint-1", + }); + expect(sessionManagerOpenSpy).not.toHaveBeenCalled(); + expect(sessionManagerForkFromSpy).not.toHaveBeenCalled(); + } finally { + sessionManagerOpenSpy.mockRestore(); + sessionManagerForkFromSpy.mockRestore(); + } expect(branched.ok).toBe(true); expect(branched.payload?.sourceKey).toBe("agent:main:main"); expect(branched.payload?.entry.parentSessionKey).toBe("agent:main:main"); @@ -137,15 +156,34 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre- expect(branchedEntry?.parentSessionKey).toBe("agent:main:main"); expect(branchedEntry?.compactionCheckpoints).toBeUndefined(); - const restored = await rpcReq<{ - ok: true; - key: string; - sessionId: string; - entry: { sessionId: string; sessionFile?: string; compactionCheckpoints?: unknown[] }; - }>(ws, "sessions.compaction.restore", { - key: "main", - checkpointId: "checkpoint-1", - }); + const restoreSessionManagerOpenSpy = vi.spyOn(SessionManager, "open"); + const restoreSessionManagerForkFromSpy = vi.spyOn(SessionManager, "forkFrom"); + let restored: Awaited< + ReturnType< + typeof rpcReq<{ + ok: true; + key: string; + sessionId: string; + entry: { sessionId: string; sessionFile?: string; compactionCheckpoints?: unknown[] }; + }> + > + >; + try { + restored = await rpcReq<{ + ok: true; + key: string; + sessionId: string; + entry: { sessionId: string; sessionFile?: string; compactionCheckpoints?: unknown[] }; + }>(ws, "sessions.compaction.restore", { + key: "main", + checkpointId: "checkpoint-1", + }); + expect(restoreSessionManagerOpenSpy).not.toHaveBeenCalled(); + expect(restoreSessionManagerForkFromSpy).not.toHaveBeenCalled(); + } finally { + restoreSessionManagerOpenSpy.mockRestore(); + restoreSessionManagerForkFromSpy.mockRestore(); + } expect(restored.ok).toBe(true); expect(restored.payload?.key).toBe("agent:main:main"); expect(restored.payload?.sessionId).not.toBe(fixture.sessionId); diff --git a/src/gateway/session-compaction-checkpoints.test.ts b/src/gateway/session-compaction-checkpoints.test.ts index 12b5e3c48a6..7ea7e968425 100644 --- a/src/gateway/session-compaction-checkpoints.test.ts +++ b/src/gateway/session-compaction-checkpoints.test.ts @@ -2,14 +2,14 @@ import fsSync from "node:fs"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import type { AssistantMessage, UserMessage } from "@mariozechner/pi-ai"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent"; import { afterEach, describe, expect, test, vi } from "vitest"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { - captureCompactionCheckpointSnapshot, captureCompactionCheckpointSnapshotAsync, cleanupCompactionCheckpointSnapshot, + forkCompactionCheckpointTranscriptAsync, MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES, persistSessionCompactionCheckpoint, readSessionLeafIdFromTranscriptAsync, @@ -22,71 +22,6 @@ afterEach(async () => { }); describe("session-compaction-checkpoints", () => { - test("capture stores the copied pre-compaction transcript path and cleanup removes only the copy", async () => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-")); - tempDirs.push(dir); - - const session = SessionManager.create(dir, dir); - const userMessage: UserMessage = { - role: "user", - content: "before compaction", - timestamp: Date.now(), - }; - const assistantMessage: AssistantMessage = { - role: "assistant", - content: [{ type: "text", text: "working on it" }], - api: "responses", - provider: "openai", - model: "gpt-test", - usage: { - input: 1, - output: 1, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 2, - cost: { - input: 0, - output: 0, - cacheRead: 0, - cacheWrite: 0, - total: 0, - }, - }, - stopReason: "stop", - timestamp: Date.now(), - }; - session.appendMessage(userMessage); - session.appendMessage(assistantMessage); - - const sessionFile = session.getSessionFile(); - const leafId = session.getLeafId(); - expect(sessionFile).toBeTruthy(); - expect(leafId).toBeTruthy(); - - const originalBefore = await fs.readFile(sessionFile!, "utf-8"); - const snapshot = captureCompactionCheckpointSnapshot({ - sessionManager: session, - sessionFile: sessionFile!, - }); - - expect(snapshot).not.toBeNull(); - expect(snapshot?.leafId).toBe(leafId); - expect(snapshot?.sessionFile).not.toBe(sessionFile); - expect(snapshot?.sessionFile).toContain(".checkpoint."); - expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(true); - expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore); - - session.appendCompaction("checkpoint summary", leafId!, 123, { ok: true }); - - expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore); - expect(await fs.readFile(sessionFile!, "utf-8")).not.toBe(originalBefore); - - await cleanupCompactionCheckpointSnapshot(snapshot); - - expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(false); - expect(fsSync.existsSync(sessionFile!)).toBe(true); - }); - test("async capture stores the copied pre-compaction transcript without sync copy", async () => { const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-")); tempDirs.push(dir); @@ -225,29 +160,145 @@ describe("session-compaction-checkpoints", () => { } }); - test("capture skips oversized pre-compaction transcripts", async () => { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-oversized-")); + test("async fork creates a checkpoint branch transcript without SessionManager sync reads", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-fork-")); tempDirs.push(dir); const session = SessionManager.create(dir, dir); session.appendMessage({ role: "user", - content: "before compaction", + content: "before checkpoint fork", timestamp: Date.now(), }); + session.appendMessage({ + role: "assistant", + content: "fork me", + api: "responses", + provider: "openai", + model: "gpt-test", + timestamp: Date.now(), + } as unknown as AssistantMessage); + const sessionFile = session.getSessionFile(); expect(sessionFile).toBeTruthy(); - await fs.appendFile(sessionFile!, "x".repeat(128), "utf-8"); + await fs.appendFile(sessionFile!, "\nnot-json\n", "utf-8"); - const snapshot = captureCompactionCheckpointSnapshot({ - sessionManager: session, - sessionFile: sessionFile!, - maxBytes: 64, + const openSpy = vi.spyOn(SessionManager, "open"); + const forkSpy = vi.spyOn(SessionManager, "forkFrom"); + let forked: Awaited> = null; + try { + forked = await forkCompactionCheckpointTranscriptAsync({ + sourceFile: sessionFile!, + sessionDir: dir, + }); + + expect(openSpy).not.toHaveBeenCalled(); + expect(forkSpy).not.toHaveBeenCalled(); + expect(forked).not.toBeNull(); + expect(forked?.sessionFile).not.toBe(sessionFile); + expect(forked?.sessionId).toBeTruthy(); + } finally { + openSpy.mockRestore(); + forkSpy.mockRestore(); + } + + const forkedLines = (await fs.readFile(forked!.sessionFile, "utf-8")).trim().split(/\r?\n/); + const forkedEntries = forkedLines.map((line) => JSON.parse(line) as Record); + const sourceEntries = (await fs.readFile(sessionFile!, "utf-8")) + .trim() + .split(/\r?\n/) + .flatMap((line) => { + try { + return [JSON.parse(line) as Record]; + } catch { + return []; + } + }); + + expect(forkedEntries[0]).toMatchObject({ + type: "session", + id: forked!.sessionId, + cwd: dir, + parentSession: sessionFile, + }); + expect(forkedEntries.slice(1)).toEqual( + sourceEntries.filter((entry) => entry.type !== "session"), + ); + }); + + test("async fork migrates legacy checkpoint snapshots before writing a current header", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-legacy-fork-")); + tempDirs.push(dir); + + const legacySessionFile = path.join(dir, "legacy.jsonl"); + const firstMessage = { + type: "message", + timestamp: new Date(0).toISOString(), + message: { + role: "user", + content: "legacy first", + timestamp: 1, + }, + }; + const secondMessage = { + type: "message", + timestamp: new Date(1).toISOString(), + message: { + role: "assistant", + content: "legacy second", + api: "responses", + provider: "openai", + model: "gpt-test", + timestamp: 2, + }, + }; + await fs.writeFile( + legacySessionFile, + [ + JSON.stringify({ + type: "session", + id: "legacy-session", + timestamp: new Date(0).toISOString(), + cwd: dir, + }), + JSON.stringify(firstMessage), + JSON.stringify(secondMessage), + "", + ].join("\n"), + "utf-8", + ); + + const forked = await forkCompactionCheckpointTranscriptAsync({ + sourceFile: legacySessionFile, + sessionDir: dir, }); - expect(snapshot).toBeNull(); - expect(MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES).toBeGreaterThan(64); - expect(fsSync.readdirSync(dir).filter((file) => file.includes(".checkpoint."))).toEqual([]); + expect(forked).not.toBeNull(); + const forkedEntries = (await fs.readFile(forked!.sessionFile, "utf-8")) + .trim() + .split(/\r?\n/) + .map((line) => JSON.parse(line) as Record); + expect(forkedEntries[0]).toMatchObject({ + type: "session", + version: CURRENT_SESSION_VERSION, + id: forked!.sessionId, + parentSession: legacySessionFile, + }); + expect(forkedEntries[1]).toMatchObject({ + type: "message", + parentId: null, + message: expect.objectContaining({ content: "legacy first" }), + }); + expect(forkedEntries[1]?.id).toEqual(expect.any(String)); + expect(forkedEntries[2]).toMatchObject({ + type: "message", + parentId: forkedEntries[1]?.id, + message: expect.objectContaining({ content: "legacy second" }), + }); + expect(forkedEntries[2]?.id).toEqual(expect.any(String)); + + const messages = SessionManager.open(forked!.sessionFile, dir).buildSessionContext().messages; + expect(messages.map((message) => message.content)).toEqual(["legacy first", "legacy second"]); }); test("persist trims old checkpoint metadata and removes trimmed snapshot files", async () => { diff --git a/src/gateway/session-compaction-checkpoints.ts b/src/gateway/session-compaction-checkpoints.ts index cbca36fb186..efed3a22600 100644 --- a/src/gateway/session-compaction-checkpoints.ts +++ b/src/gateway/session-compaction-checkpoints.ts @@ -1,8 +1,13 @@ import { randomUUID } from "node:crypto"; -import fsSync from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; -import { SessionManager } from "@mariozechner/pi-coding-agent"; +import { + CURRENT_SESSION_VERSION, + migrateSessionEntries, + SessionManager, + type FileEntry as PiSessionFileEntry, +} from "@mariozechner/pi-coding-agent"; +import { v7 as uuidv7 } from "uuid"; import { updateSessionStore } from "../config/sessions.js"; import type { SessionCompactionCheckpoint, @@ -24,6 +29,11 @@ export type CapturedCompactionCheckpointSnapshot = { leafId: string; }; +export type ForkedCompactionCheckpointTranscript = { + sessionId: string; + sessionFile: string; +}; + function trimSessionCheckpoints(checkpoints: SessionCompactionCheckpoint[] | undefined): { kept: SessionCompactionCheckpoint[] | undefined; removed: SessionCompactionCheckpoint[]; @@ -82,7 +92,9 @@ async function readFileRangeAsync( return offset === length ? buffer : buffer.subarray(0, offset); } -async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Promise { +async function readSessionHeaderFromTranscriptAsync( + sessionFile: string, +): Promise<{ id: string; cwd?: string } | null> { let fileHandle: AsyncTranscriptFileHandle | undefined; try { fileHandle = await fs.open(sessionFile, "r"); @@ -98,10 +110,14 @@ async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Prom if (!firstLine) { return null; } - const parsed = JSON.parse(firstLine) as { type?: unknown; id?: unknown }; - return parsed.type === "session" && typeof parsed.id === "string" && parsed.id.trim() - ? parsed.id.trim() - : null; + const parsed = JSON.parse(firstLine) as { type?: unknown; id?: unknown; cwd?: unknown }; + if (parsed.type !== "session" || typeof parsed.id !== "string" || !parsed.id.trim()) { + return null; + } + return { + id: parsed.id.trim(), + ...(typeof parsed.cwd === "string" && parsed.cwd.trim() ? { cwd: parsed.cwd } : {}), + }; } catch { return null; } finally { @@ -111,6 +127,10 @@ async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Prom } } +async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Promise { + return (await readSessionHeaderFromTranscriptAsync(sessionFile))?.id ?? null; +} + function parseTranscriptLineId( line: string, ): { kind: "session" } | { kind: "entry"; id: string } | null { @@ -128,6 +148,39 @@ function parseTranscriptLineId( return null; } +async function readTranscriptEntriesForForkAsync( + sessionFile: string, +): Promise { + let fileHandle: AsyncTranscriptFileHandle | undefined; + try { + fileHandle = await fs.open(sessionFile, "r"); + const content = await fileHandle.readFile("utf-8"); + const entries: PiSessionFileEntry[] = []; + for (const line of content.trim().split(/\r?\n/)) { + const trimmed = line.trim(); + if (!trimmed) { + continue; + } + try { + entries.push(JSON.parse(trimmed) as PiSessionFileEntry); + } catch { + // Match pi-coding-agent's loader: malformed JSONL entries are ignored. + } + } + const firstEntry = entries[0] as { type?: unknown; id?: unknown } | undefined; + if (firstEntry?.type !== "session" || typeof firstEntry.id !== "string") { + return null; + } + return entries; + } catch { + return null; + } finally { + if (fileHandle) { + await fileHandle.close().catch(() => undefined); + } + } +} + export async function readSessionLeafIdFromTranscriptAsync( sessionFile: string, maxBytes = MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES, @@ -187,77 +240,63 @@ export async function readSessionLeafIdFromTranscriptAsync( return null; } -/** - * Synchronous version — kept for callers that cannot be made async. - * Prefer captureCompactionCheckpointSnapshotAsync for large transcripts - * to avoid blocking the event loop during file copy. - */ -export function captureCompactionCheckpointSnapshot(params: { - sessionManager: Pick; - sessionFile: string; - maxBytes?: number; -}): CapturedCompactionCheckpointSnapshot | null { - const getLeafId = - params.sessionManager && typeof params.sessionManager.getLeafId === "function" - ? params.sessionManager.getLeafId.bind(params.sessionManager) - : null; - const sessionFile = params.sessionFile.trim(); - if (!getLeafId || !sessionFile) { +export async function forkCompactionCheckpointTranscriptAsync(params: { + sourceFile: string; + targetCwd?: string; + sessionDir?: string; +}): Promise { + const sourceFile = params.sourceFile.trim(); + if (!sourceFile) { return null; } - const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES; + const sourceHeader = await readSessionHeaderFromTranscriptAsync(sourceFile); + if (!sourceHeader) { + return null; + } + const entries = await readTranscriptEntriesForForkAsync(sourceFile); + if (!entries) { + return null; + } + migrateSessionEntries(entries); + + const targetCwd = params.targetCwd ?? sourceHeader.cwd ?? process.cwd(); + const sessionDir = params.sessionDir ?? path.dirname(sourceFile); + const sessionId = uuidv7(); + const timestamp = new Date().toISOString(); + const fileTimestamp = timestamp.replace(/[:.]/g, "-"); + const sessionFile = path.join(sessionDir, `${fileTimestamp}_${sessionId}.jsonl`); + const header = { + type: "session", + version: CURRENT_SESSION_VERSION, + id: sessionId, + timestamp, + cwd: targetCwd, + parentSession: sourceFile, + }; + try { - const stat = fsSync.statSync(sessionFile); - if (!stat.isFile() || stat.size > maxBytes) { - return null; + await fs.mkdir(sessionDir, { recursive: true }); + const lines = [JSON.stringify(header)]; + for (const entry of entries) { + if ((entry as { type?: unknown }).type !== "session") { + lines.push(JSON.stringify(entry)); + } } - } catch { - return null; - } - const leafId = getLeafId(); - if (!leafId) { - return null; - } - const parsedSessionFile = path.parse(sessionFile); - const snapshotFile = path.join( - parsedSessionFile.dir, - `${parsedSessionFile.name}.checkpoint.${randomUUID()}${parsedSessionFile.ext || ".jsonl"}`, - ); - try { - fsSync.copyFileSync(sessionFile, snapshotFile); - } catch { - return null; - } - let snapshotSession: SessionManager; - try { - snapshotSession = SessionManager.open(snapshotFile, path.dirname(snapshotFile)); + await fs.writeFile(sessionFile, `${lines.join("\n")}\n`, { encoding: "utf-8", flag: "wx" }); + return { sessionId, sessionFile }; } catch { try { - fsSync.unlinkSync(snapshotFile); + await fs.unlink(sessionFile); } catch { - // Best-effort cleanup if the copied transcript cannot be reopened. + // Best-effort cleanup for partial fork files. } return null; } - const getSessionId = - snapshotSession && typeof snapshotSession.getSessionId === "function" - ? snapshotSession.getSessionId.bind(snapshotSession) - : null; - if (!getSessionId) { - return null; - } - return { - sessionId: getSessionId(), - sessionFile: snapshotFile, - leafId, - }; } /** - * Async version of captureCompactionCheckpointSnapshot that uses async file - * operations to avoid blocking the event loop. Large transcript files (20MB+) - * were observed blocking the event loop for minutes when copied synchronously - * (see issue #75414). + * Capture a bounded pre-compaction transcript snapshot without blocking the + * Gateway event loop on synchronous file reads/copies. */ export async function captureCompactionCheckpointSnapshotAsync(params: { sessionManager?: Pick; diff --git a/src/gateway/session-history-state.test.ts b/src/gateway/session-history-state.test.ts index a6704f5f6d8..4847b8cb0f7 100644 --- a/src/gateway/session-history-state.test.ts +++ b/src/gateway/session-history-state.test.ts @@ -5,7 +5,7 @@ import * as sessionUtils from "./session-utils.js"; describe("SessionHistorySseState", () => { test("uses the initial raw snapshot for both first history and seq seeding", () => { - const readSpy = vi.spyOn(sessionUtils, "readSessionMessages").mockReturnValue([ + const readSpy = vi.spyOn(sessionUtils, "readSessionMessagesAsync").mockResolvedValue([ { role: "assistant", content: [{ type: "text", text: "stale disk message" }], @@ -96,21 +96,11 @@ describe("SessionHistorySseState", () => { expect(snapshot.rawTranscriptSeq).toBe(99); }); - test("refreshes limited SSE history from bounded tail reads", () => { - const fullReadSpy = vi.spyOn(sessionUtils, "readSessionMessages").mockReturnValue([]); + test("refreshes limited SSE history from bounded async tail reads", async () => { + const fullReadSpy = vi.spyOn(sessionUtils, "readSessionMessagesAsync").mockResolvedValue([]); const tailReadSpy = vi - .spyOn(sessionUtils, "readRecentSessionMessagesWithStats") - .mockReturnValueOnce({ - messages: [ - { - role: "assistant", - content: [{ type: "text", text: "tail one" }], - __openclaw: { seq: 7 }, - }, - ], - totalMessages: 7, - }) - .mockReturnValueOnce({ + .spyOn(sessionUtils, "readRecentSessionMessagesWithStatsAsync") + .mockResolvedValueOnce({ messages: [ { role: "assistant", @@ -121,18 +111,27 @@ describe("SessionHistorySseState", () => { totalMessages: 8, }); try { - const state = new SessionHistorySseState({ + const state = SessionHistorySseState.fromRawSnapshot({ target: { sessionId: "sess-main" }, + rawMessages: [ + { + role: "assistant", + content: [{ type: "text", text: "tail one" }], + __openclaw: { seq: 7 }, + }, + ], + rawTranscriptSeq: 7, + totalRawMessages: 7, limit: 1, }); expect(state.snapshot().messages[0]?.__openclaw?.seq).toBe(7); - const refreshed = state.refresh(); + const refreshed = await state.refreshAsync(); expect(refreshed.hasMore).toBe(true); expect(refreshed.nextCursor).toBe("8"); expect(refreshed.messages[0]?.__openclaw?.seq).toBe(8); - expect(tailReadSpy).toHaveBeenCalledTimes(2); + expect(tailReadSpy).toHaveBeenCalledTimes(1); expect(fullReadSpy).not.toHaveBeenCalled(); } finally { fullReadSpy.mockRestore(); diff --git a/src/gateway/session-history-state.ts b/src/gateway/session-history-state.ts index 3f444cc7452..30e600caf3e 100644 --- a/src/gateway/session-history-state.ts +++ b/src/gateway/session-history-state.ts @@ -4,9 +4,7 @@ import { } from "./chat-display-projection.js"; import { attachOpenClawTranscriptMeta, - readRecentSessionMessagesWithStats, readRecentSessionMessagesWithStatsAsync, - readSessionMessages, readSessionMessagesAsync, } from "./session-utils.js"; @@ -181,12 +179,12 @@ export class SessionHistorySseState { }); } - constructor(params: { + private constructor(params: { target: SessionHistoryTranscriptTarget; maxChars?: number; limit?: number; cursor?: string; - initialRawMessages?: unknown[]; + initialRawMessages: unknown[]; rawTranscriptSeq?: number; totalRawMessages?: number; }) { @@ -194,18 +192,15 @@ export class SessionHistorySseState { this.maxChars = params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS; this.limit = params.limit; this.cursor = params.cursor; - const rawSnapshot = - params.initialRawMessages === undefined - ? this.readRawSnapshot() - : { - rawMessages: params.initialRawMessages, - ...(typeof params.rawTranscriptSeq === "number" - ? { rawTranscriptSeq: params.rawTranscriptSeq } - : {}), - ...(typeof params.totalRawMessages === "number" - ? { totalRawMessages: params.totalRawMessages } - : {}), - }; + const rawSnapshot = { + rawMessages: params.initialRawMessages, + ...(typeof params.rawTranscriptSeq === "number" + ? { rawTranscriptSeq: params.rawTranscriptSeq } + : {}), + ...(typeof params.totalRawMessages === "number" + ? { totalRawMessages: params.totalRawMessages } + : {}), + }; const snapshot = buildSessionHistorySnapshot({ rawMessages: rawSnapshot.rawMessages, maxChars: this.maxChars, @@ -255,25 +250,6 @@ export class SessionHistorySseState { }; } - refresh(): PaginatedSessionHistory { - const rawSnapshot = this.readRawSnapshot(); - const snapshot = buildSessionHistorySnapshot({ - rawMessages: rawSnapshot.rawMessages, - maxChars: this.maxChars, - limit: this.limit, - cursor: this.cursor, - ...(typeof rawSnapshot.rawTranscriptSeq === "number" - ? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq } - : {}), - ...(typeof rawSnapshot.totalRawMessages === "number" - ? { totalRawMessages: rawSnapshot.totalRawMessages } - : {}), - }); - this.rawTranscriptSeq = snapshot.rawTranscriptSeq; - this.sentHistory = snapshot.history; - return snapshot.history; - } - async refreshAsync(): Promise { const rawSnapshot = await this.readRawSnapshotAsync(); const snapshot = buildSessionHistorySnapshot({ @@ -293,33 +269,6 @@ export class SessionHistorySseState { return snapshot.history; } - private readRawSnapshot(): SessionHistoryRawSnapshot { - if (this.cursor === undefined && typeof this.limit === "number") { - const snapshot = readRecentSessionMessagesWithStats( - this.target.sessionId, - this.target.storePath, - this.target.sessionFile, - resolveSessionHistoryTailReadOptions(this.limit), - ); - return { - rawMessages: snapshot.messages, - rawTranscriptSeq: snapshot.totalMessages, - totalRawMessages: snapshot.totalMessages, - }; - } - return { - rawMessages: this.readRawMessages(), - }; - } - - private readRawMessages(): unknown[] { - return readSessionMessages( - this.target.sessionId, - this.target.storePath, - this.target.sessionFile, - ); - } - private async readRawSnapshotAsync(): Promise { if (this.cursor === undefined && typeof this.limit === "number") { const snapshot = await readRecentSessionMessagesWithStatsAsync(