refactor(gateway): finish async session read paths (#75892)

* refactor(gateway): finish async session read paths

* fix(gateway): migrate async checkpoint forks
This commit is contained in:
Peter Steinberger
2026-05-02 02:58:34 +01:00
committed by GitHub
parent 7ed73f5383
commit 4c9390a36e
10 changed files with 398 additions and 309 deletions

View File

@@ -780,6 +780,7 @@ describe("CodexAppServerEventProjector", () => {
it("fires before_compaction and after_compaction hooks for codex compaction items", async () => {
const { projector, beforeCompaction, afterCompaction } = await createProjectorWithHooks();
const openSpy = vi.spyOn(SessionManager, "open");
await projector.handleNotification(
forCurrentTurn("item/started", {
@@ -791,6 +792,7 @@ describe("CodexAppServerEventProjector", () => {
item: { type: "contextCompaction", id: "compact-1" },
}),
);
expect(openSpy).not.toHaveBeenCalled();
expect(beforeCompaction).toHaveBeenCalledWith(
expect.objectContaining({

View File

@@ -1,5 +1,4 @@
import type { AssistantMessage, Usage } from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
classifyAgentHarnessTerminalOutcome,
embeddedAgentLog,
@@ -27,6 +26,7 @@ import {
type JsonObject,
type JsonValue,
} from "./protocol.js";
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
export type CodexAppServerToolTelemetry = {
didSendViaMessagingTool: boolean;
@@ -337,7 +337,7 @@ export class CodexAppServerEventProjector {
this.activeCompactionItemIds.add(itemId);
await runAgentHarnessBeforeCompactionHook({
sessionFile: this.params.sessionFile,
messages: this.readMirroredSessionMessages(),
messages: await this.readMirroredSessionMessages(),
ctx: {
runId: this.params.runId,
agentId: this.params.agentId,
@@ -388,7 +388,7 @@ export class CodexAppServerEventProjector {
this.completedCompactionCount += 1;
await runAgentHarnessAfterCompactionHook({
sessionFile: this.params.sessionFile,
messages: this.readMirroredSessionMessages(),
messages: await this.readMirroredSessionMessages(),
compactedCount: -1,
ctx: {
runId: this.params.runId,
@@ -763,12 +763,8 @@ export class CodexAppServerEventProjector {
this.assistantItemOrder.push(itemId);
}
private readMirroredSessionMessages(): AgentMessage[] {
try {
return SessionManager.open(this.params.sessionFile).buildSessionContext().messages;
} catch {
return [];
}
private async readMirroredSessionMessages(): Promise<AgentMessage[]> {
return (await readCodexMirroredSessionHistoryMessages(this.params.sessionFile)) ?? [];
}
private createAssistantMessage(text: string): AssistantMessage {

View File

@@ -74,6 +74,7 @@ import {
type JsonValue,
} from "./protocol.js";
import { readCodexAppServerBinding, type CodexAppServerThreadBinding } from "./session-binding.js";
import { readCodexMirroredSessionHistoryMessages } from "./session-history.js";
import { clearSharedCodexAppServerClient } from "./shared-client.js";
import {
buildDeveloperInstructions,
@@ -400,9 +401,9 @@ export async function runCodexAppServerAttempt(
},
});
const hadSessionFile = await fileExists(params.sessionFile);
const sessionManager = SessionManager.open(params.sessionFile);
const sessionManager = activeContextEngine ? SessionManager.open(params.sessionFile) : undefined;
let historyMessages =
readMirroredSessionHistoryMessages(params.sessionFile, sessionManager) ?? [];
(await readMirroredSessionHistoryMessages(params.sessionFile, sessionManager)) ?? [];
const hookContext = {
runId: params.runId,
agentId: sessionAgentId,
@@ -430,7 +431,8 @@ export async function runCodexAppServerAttempt(
runMaintenance: runHarnessContextEngineMaintenance,
warn: (message) => embeddedAgentLog.warn(message),
});
historyMessages = readMirroredSessionHistoryMessages(params.sessionFile) ?? historyMessages;
historyMessages =
(await readMirroredSessionHistoryMessages(params.sessionFile)) ?? historyMessages;
}
const baseDeveloperInstructions = buildDeveloperInstructions(params);
let promptText = params.prompt;
@@ -1097,7 +1099,7 @@ export async function runCodexAppServerAttempt(
}
if (activeContextEngine) {
const finalMessages =
readMirroredSessionHistoryMessages(params.sessionFile) ??
(await readMirroredSessionHistoryMessages(params.sessionFile)) ??
historyMessages.concat(result.messagesSnapshot);
await finalizeHarnessContextEngineTurn({
contextEngine: activeContextEngine,
@@ -1553,19 +1555,17 @@ function readString(record: JsonObject, key: string): string | undefined {
return typeof value === "string" ? value : undefined;
}
function readMirroredSessionHistoryMessages(
async function readMirroredSessionHistoryMessages(
sessionFile: string,
sessionManager?: SessionManager,
): AgentMessage[] | undefined {
try {
return (sessionManager ?? SessionManager.open(sessionFile)).buildSessionContext().messages;
} catch (error) {
sessionManager?: Pick<SessionManager, "buildSessionContext">,
): Promise<AgentMessage[] | undefined> {
const messages = await readCodexMirroredSessionHistoryMessages(sessionFile, sessionManager);
if (!messages) {
embeddedAgentLog.warn("failed to read mirrored session history for codex harness hooks", {
error,
sessionFile,
});
return undefined;
}
return messages;
}
async function mirrorTranscriptBestEffort(params: {

View File

@@ -0,0 +1,44 @@
import fs from "node:fs/promises";
import type { SessionEntry, SessionManager } from "@mariozechner/pi-coding-agent";
import {
buildSessionContext,
migrateSessionEntries,
parseSessionEntries,
} from "@mariozechner/pi-coding-agent";
import type { AgentMessage } from "openclaw/plugin-sdk/agent-harness-runtime";
/** True when the error is Node's ENOENT ("no such file") errno error. */
function isMissingFileError(error: unknown): boolean {
  if (typeof error !== "object" || error === null) {
    return false;
  }
  const code = (error as { code?: unknown }).code;
  return code === "ENOENT";
}
export async function readCodexMirroredSessionHistoryMessages(
sessionFile: string,
sessionManager?: Pick<SessionManager, "buildSessionContext">,
): Promise<AgentMessage[] | undefined> {
try {
if (sessionManager) {
return sessionManager.buildSessionContext().messages;
}
const raw = await fs.readFile(sessionFile, "utf-8");
const entries = parseSessionEntries(raw);
const firstEntry = entries[0] as { type?: unknown; id?: unknown } | undefined;
if (firstEntry?.type !== "session" || typeof firstEntry.id !== "string") {
return undefined;
}
migrateSessionEntries(entries);
const sessionEntries = entries.filter(
(entry): entry is SessionEntry => entry.type !== "session",
);
return buildSessionContext(sessionEntries).messages;
} catch (error) {
if (isMissingFileError(error)) {
return [];
}
return undefined;
}
}

View File

@@ -1,7 +1,7 @@
import { randomUUID } from "node:crypto";
import fs from "node:fs";
import path from "node:path";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import { CURRENT_SESSION_VERSION } from "@mariozechner/pi-coding-agent";
import { resolveAgentRuntimeMetadata } from "../../agents/agent-runtime-metadata.js";
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../../agents/agent-scope.js";
import {
@@ -66,6 +66,7 @@ import {
} from "../protocol/index.js";
import { resolveSessionKeyForRun } from "../server-session-key.js";
import {
forkCompactionCheckpointTranscriptAsync,
getSessionCompactionCheckpoint,
listSessionCompactionCheckpoints,
} from "../session-compaction-checkpoints.js";
@@ -1089,26 +1090,11 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
if (!fs.existsSync(checkpoint.preCompaction.sessionFile)) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, "checkpoint snapshot transcript is missing"),
);
return;
}
const snapshotSession = SessionManager.open(
checkpoint.preCompaction.sessionFile,
path.dirname(checkpoint.preCompaction.sessionFile),
);
const branchedSession = SessionManager.forkFrom(
checkpoint.preCompaction.sessionFile,
snapshotSession.getCwd(),
path.dirname(checkpoint.preCompaction.sessionFile),
);
const branchedSessionFile = branchedSession.getSessionFile();
if (!branchedSessionFile) {
const branchedSession = await forkCompactionCheckpointTranscriptAsync({
sourceFile: checkpoint.preCompaction.sessionFile,
sessionDir: path.dirname(checkpoint.preCompaction.sessionFile),
});
if (!branchedSession?.sessionFile) {
respond(
false,
undefined,
@@ -1120,8 +1106,8 @@ export const sessionsHandlers: GatewayRequestHandlers = {
const label = entry.label?.trim() ? `${entry.label.trim()} (checkpoint)` : "Checkpoint branch";
const nextEntry = cloneCheckpointSessionEntry({
currentEntry: entry,
nextSessionId: branchedSession.getSessionId(),
nextSessionFile: branchedSessionFile,
nextSessionId: branchedSession.sessionId,
nextSessionFile: branchedSession.sessionFile,
label,
parentSessionKey: canonicalKey,
totalTokens: checkpoint.tokensBefore,
@@ -1203,15 +1189,6 @@ export const sessionsHandlers: GatewayRequestHandlers = {
);
return;
}
if (!fs.existsSync(checkpoint.preCompaction.sessionFile)) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, "checkpoint snapshot transcript is missing"),
);
return;
}
const interruptResult = await interruptSessionRunIfActive({
req,
context,
@@ -1226,17 +1203,11 @@ export const sessionsHandlers: GatewayRequestHandlers = {
return;
}
const snapshotSession = SessionManager.open(
checkpoint.preCompaction.sessionFile,
path.dirname(checkpoint.preCompaction.sessionFile),
);
const restoredSession = SessionManager.forkFrom(
checkpoint.preCompaction.sessionFile,
snapshotSession.getCwd(),
path.dirname(checkpoint.preCompaction.sessionFile),
);
const restoredSessionFile = restoredSession.getSessionFile();
if (!restoredSessionFile) {
const restoredSession = await forkCompactionCheckpointTranscriptAsync({
sourceFile: checkpoint.preCompaction.sessionFile,
sessionDir: path.dirname(checkpoint.preCompaction.sessionFile),
});
if (!restoredSession?.sessionFile) {
respond(
false,
undefined,
@@ -1246,8 +1217,8 @@ export const sessionsHandlers: GatewayRequestHandlers = {
}
const nextEntry = cloneCheckpointSessionEntry({
currentEntry: entry,
nextSessionId: restoredSession.getSessionId(),
nextSessionFile: restoredSessionFile,
nextSessionId: restoredSession.sessionId,
nextSessionFile: restoredSession.sessionFile,
totalTokens: checkpoint.tokensBefore,
preserveCompactionCheckpoints: true,
});

View File

@@ -1,7 +1,7 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { expect, test } from "vitest";
import { expect, test, vi } from "vitest";
import { withEnvAsync } from "../test-utils/env.js";
import {
embeddedRunMock,
@@ -106,15 +106,34 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre-
fixture.preCompactionSessionFile,
);
const branched = await rpcReq<{
ok: true;
sourceKey: string;
key: string;
entry: { sessionId: string; sessionFile?: string; parentSessionKey?: string };
}>(ws, "sessions.compaction.branch", {
key: "main",
checkpointId: "checkpoint-1",
});
const sessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
const sessionManagerForkFromSpy = vi.spyOn(SessionManager, "forkFrom");
let branched: Awaited<
ReturnType<
typeof rpcReq<{
ok: true;
sourceKey: string;
key: string;
entry: { sessionId: string; sessionFile?: string; parentSessionKey?: string };
}>
>
>;
try {
branched = await rpcReq<{
ok: true;
sourceKey: string;
key: string;
entry: { sessionId: string; sessionFile?: string; parentSessionKey?: string };
}>(ws, "sessions.compaction.branch", {
key: "main",
checkpointId: "checkpoint-1",
});
expect(sessionManagerOpenSpy).not.toHaveBeenCalled();
expect(sessionManagerForkFromSpy).not.toHaveBeenCalled();
} finally {
sessionManagerOpenSpy.mockRestore();
sessionManagerForkFromSpy.mockRestore();
}
expect(branched.ok).toBe(true);
expect(branched.payload?.sourceKey).toBe("agent:main:main");
expect(branched.payload?.entry.parentSessionKey).toBe("agent:main:main");
@@ -137,15 +156,34 @@ test("sessions.compaction.* lists checkpoints and branches or restores from pre-
expect(branchedEntry?.parentSessionKey).toBe("agent:main:main");
expect(branchedEntry?.compactionCheckpoints).toBeUndefined();
const restored = await rpcReq<{
ok: true;
key: string;
sessionId: string;
entry: { sessionId: string; sessionFile?: string; compactionCheckpoints?: unknown[] };
}>(ws, "sessions.compaction.restore", {
key: "main",
checkpointId: "checkpoint-1",
});
const restoreSessionManagerOpenSpy = vi.spyOn(SessionManager, "open");
const restoreSessionManagerForkFromSpy = vi.spyOn(SessionManager, "forkFrom");
let restored: Awaited<
ReturnType<
typeof rpcReq<{
ok: true;
key: string;
sessionId: string;
entry: { sessionId: string; sessionFile?: string; compactionCheckpoints?: unknown[] };
}>
>
>;
try {
restored = await rpcReq<{
ok: true;
key: string;
sessionId: string;
entry: { sessionId: string; sessionFile?: string; compactionCheckpoints?: unknown[] };
}>(ws, "sessions.compaction.restore", {
key: "main",
checkpointId: "checkpoint-1",
});
expect(restoreSessionManagerOpenSpy).not.toHaveBeenCalled();
expect(restoreSessionManagerForkFromSpy).not.toHaveBeenCalled();
} finally {
restoreSessionManagerOpenSpy.mockRestore();
restoreSessionManagerForkFromSpy.mockRestore();
}
expect(restored.ok).toBe(true);
expect(restored.payload?.key).toBe("agent:main:main");
expect(restored.payload?.sessionId).not.toBe(fixture.sessionId);

View File

@@ -2,14 +2,14 @@ import fsSync from "node:fs";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { AssistantMessage, UserMessage } from "@mariozechner/pi-ai";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { CURRENT_SESSION_VERSION, SessionManager } from "@mariozechner/pi-coding-agent";
import { afterEach, describe, expect, test, vi } from "vitest";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import {
captureCompactionCheckpointSnapshot,
captureCompactionCheckpointSnapshotAsync,
cleanupCompactionCheckpointSnapshot,
forkCompactionCheckpointTranscriptAsync,
MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
persistSessionCompactionCheckpoint,
readSessionLeafIdFromTranscriptAsync,
@@ -22,71 +22,6 @@ afterEach(async () => {
});
describe("session-compaction-checkpoints", () => {
test("capture stores the copied pre-compaction transcript path and cleanup removes only the copy", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-"));
tempDirs.push(dir);
const session = SessionManager.create(dir, dir);
const userMessage: UserMessage = {
role: "user",
content: "before compaction",
timestamp: Date.now(),
};
const assistantMessage: AssistantMessage = {
role: "assistant",
content: [{ type: "text", text: "working on it" }],
api: "responses",
provider: "openai",
model: "gpt-test",
usage: {
input: 1,
output: 1,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 2,
cost: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
},
},
stopReason: "stop",
timestamp: Date.now(),
};
session.appendMessage(userMessage);
session.appendMessage(assistantMessage);
const sessionFile = session.getSessionFile();
const leafId = session.getLeafId();
expect(sessionFile).toBeTruthy();
expect(leafId).toBeTruthy();
const originalBefore = await fs.readFile(sessionFile!, "utf-8");
const snapshot = captureCompactionCheckpointSnapshot({
sessionManager: session,
sessionFile: sessionFile!,
});
expect(snapshot).not.toBeNull();
expect(snapshot?.leafId).toBe(leafId);
expect(snapshot?.sessionFile).not.toBe(sessionFile);
expect(snapshot?.sessionFile).toContain(".checkpoint.");
expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(true);
expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore);
session.appendCompaction("checkpoint summary", leafId!, 123, { ok: true });
expect(await fs.readFile(snapshot!.sessionFile, "utf-8")).toBe(originalBefore);
expect(await fs.readFile(sessionFile!, "utf-8")).not.toBe(originalBefore);
await cleanupCompactionCheckpointSnapshot(snapshot);
expect(fsSync.existsSync(snapshot!.sessionFile)).toBe(false);
expect(fsSync.existsSync(sessionFile!)).toBe(true);
});
test("async capture stores the copied pre-compaction transcript without sync copy", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-async-"));
tempDirs.push(dir);
@@ -225,29 +160,145 @@ describe("session-compaction-checkpoints", () => {
}
});
test("capture skips oversized pre-compaction transcripts", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-oversized-"));
test("async fork creates a checkpoint branch transcript without SessionManager sync reads", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-fork-"));
tempDirs.push(dir);
const session = SessionManager.create(dir, dir);
session.appendMessage({
role: "user",
content: "before compaction",
content: "before checkpoint fork",
timestamp: Date.now(),
});
session.appendMessage({
role: "assistant",
content: "fork me",
api: "responses",
provider: "openai",
model: "gpt-test",
timestamp: Date.now(),
} as unknown as AssistantMessage);
const sessionFile = session.getSessionFile();
expect(sessionFile).toBeTruthy();
await fs.appendFile(sessionFile!, "x".repeat(128), "utf-8");
await fs.appendFile(sessionFile!, "\nnot-json\n", "utf-8");
const snapshot = captureCompactionCheckpointSnapshot({
sessionManager: session,
sessionFile: sessionFile!,
maxBytes: 64,
const openSpy = vi.spyOn(SessionManager, "open");
const forkSpy = vi.spyOn(SessionManager, "forkFrom");
let forked: Awaited<ReturnType<typeof forkCompactionCheckpointTranscriptAsync>> = null;
try {
forked = await forkCompactionCheckpointTranscriptAsync({
sourceFile: sessionFile!,
sessionDir: dir,
});
expect(openSpy).not.toHaveBeenCalled();
expect(forkSpy).not.toHaveBeenCalled();
expect(forked).not.toBeNull();
expect(forked?.sessionFile).not.toBe(sessionFile);
expect(forked?.sessionId).toBeTruthy();
} finally {
openSpy.mockRestore();
forkSpy.mockRestore();
}
const forkedLines = (await fs.readFile(forked!.sessionFile, "utf-8")).trim().split(/\r?\n/);
const forkedEntries = forkedLines.map((line) => JSON.parse(line) as Record<string, unknown>);
const sourceEntries = (await fs.readFile(sessionFile!, "utf-8"))
.trim()
.split(/\r?\n/)
.flatMap((line) => {
try {
return [JSON.parse(line) as Record<string, unknown>];
} catch {
return [];
}
});
expect(forkedEntries[0]).toMatchObject({
type: "session",
id: forked!.sessionId,
cwd: dir,
parentSession: sessionFile,
});
expect(forkedEntries.slice(1)).toEqual(
sourceEntries.filter((entry) => entry.type !== "session"),
);
});
test("async fork migrates legacy checkpoint snapshots before writing a current header", async () => {
const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-legacy-fork-"));
tempDirs.push(dir);
const legacySessionFile = path.join(dir, "legacy.jsonl");
const firstMessage = {
type: "message",
timestamp: new Date(0).toISOString(),
message: {
role: "user",
content: "legacy first",
timestamp: 1,
},
};
const secondMessage = {
type: "message",
timestamp: new Date(1).toISOString(),
message: {
role: "assistant",
content: "legacy second",
api: "responses",
provider: "openai",
model: "gpt-test",
timestamp: 2,
},
};
await fs.writeFile(
legacySessionFile,
[
JSON.stringify({
type: "session",
id: "legacy-session",
timestamp: new Date(0).toISOString(),
cwd: dir,
}),
JSON.stringify(firstMessage),
JSON.stringify(secondMessage),
"",
].join("\n"),
"utf-8",
);
const forked = await forkCompactionCheckpointTranscriptAsync({
sourceFile: legacySessionFile,
sessionDir: dir,
});
expect(snapshot).toBeNull();
expect(MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES).toBeGreaterThan(64);
expect(fsSync.readdirSync(dir).filter((file) => file.includes(".checkpoint."))).toEqual([]);
expect(forked).not.toBeNull();
const forkedEntries = (await fs.readFile(forked!.sessionFile, "utf-8"))
.trim()
.split(/\r?\n/)
.map((line) => JSON.parse(line) as Record<string, unknown>);
expect(forkedEntries[0]).toMatchObject({
type: "session",
version: CURRENT_SESSION_VERSION,
id: forked!.sessionId,
parentSession: legacySessionFile,
});
expect(forkedEntries[1]).toMatchObject({
type: "message",
parentId: null,
message: expect.objectContaining({ content: "legacy first" }),
});
expect(forkedEntries[1]?.id).toEqual(expect.any(String));
expect(forkedEntries[2]).toMatchObject({
type: "message",
parentId: forkedEntries[1]?.id,
message: expect.objectContaining({ content: "legacy second" }),
});
expect(forkedEntries[2]?.id).toEqual(expect.any(String));
const messages = SessionManager.open(forked!.sessionFile, dir).buildSessionContext().messages;
expect(messages.map((message) => message.content)).toEqual(["legacy first", "legacy second"]);
});
test("persist trims old checkpoint metadata and removes trimmed snapshot files", async () => {

View File

@@ -1,8 +1,13 @@
import { randomUUID } from "node:crypto";
import fsSync from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import { SessionManager } from "@mariozechner/pi-coding-agent";
import {
CURRENT_SESSION_VERSION,
migrateSessionEntries,
SessionManager,
type FileEntry as PiSessionFileEntry,
} from "@mariozechner/pi-coding-agent";
import { v7 as uuidv7 } from "uuid";
import { updateSessionStore } from "../config/sessions.js";
import type {
SessionCompactionCheckpoint,
@@ -24,6 +29,11 @@ export type CapturedCompactionCheckpointSnapshot = {
leafId: string;
};
export type ForkedCompactionCheckpointTranscript = {
sessionId: string;
sessionFile: string;
};
function trimSessionCheckpoints(checkpoints: SessionCompactionCheckpoint[] | undefined): {
kept: SessionCompactionCheckpoint[] | undefined;
removed: SessionCompactionCheckpoint[];
@@ -82,7 +92,9 @@ async function readFileRangeAsync(
return offset === length ? buffer : buffer.subarray(0, offset);
}
async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Promise<string | null> {
async function readSessionHeaderFromTranscriptAsync(
sessionFile: string,
): Promise<{ id: string; cwd?: string } | null> {
let fileHandle: AsyncTranscriptFileHandle | undefined;
try {
fileHandle = await fs.open(sessionFile, "r");
@@ -98,10 +110,14 @@ async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Prom
if (!firstLine) {
return null;
}
const parsed = JSON.parse(firstLine) as { type?: unknown; id?: unknown };
return parsed.type === "session" && typeof parsed.id === "string" && parsed.id.trim()
? parsed.id.trim()
: null;
const parsed = JSON.parse(firstLine) as { type?: unknown; id?: unknown; cwd?: unknown };
if (parsed.type !== "session" || typeof parsed.id !== "string" || !parsed.id.trim()) {
return null;
}
return {
id: parsed.id.trim(),
...(typeof parsed.cwd === "string" && parsed.cwd.trim() ? { cwd: parsed.cwd } : {}),
};
} catch {
return null;
} finally {
@@ -111,6 +127,10 @@ async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Prom
}
}
/** Resolves just the session id from a transcript's header line, or null when absent/invalid. */
async function readSessionIdFromTranscriptHeaderAsync(sessionFile: string): Promise<string | null> {
  const header = await readSessionHeaderFromTranscriptAsync(sessionFile);
  return header ? header.id : null;
}
function parseTranscriptLineId(
line: string,
): { kind: "session" } | { kind: "entry"; id: string } | null {
@@ -128,6 +148,39 @@ function parseTranscriptLineId(
return null;
}
/**
 * Loads every parseable JSONL entry from a session transcript for forking.
 *
 * Blank lines are ignored and malformed lines are skipped, mirroring
 * pi-coding-agent's loader. The result is rejected (null) unless the first
 * entry is a session header carrying a string id; any I/O failure also yields
 * null. The file handle is always closed, even on error.
 */
async function readTranscriptEntriesForForkAsync(
  sessionFile: string,
): Promise<PiSessionFileEntry[] | null> {
  let handle: AsyncTranscriptFileHandle | undefined;
  try {
    handle = await fs.open(sessionFile, "r");
    const raw = await handle.readFile("utf-8");
    const parsed = raw
      .trim()
      .split(/\r?\n/)
      .map((line) => line.trim())
      .filter((line) => line.length > 0)
      .flatMap((line) => {
        try {
          return [JSON.parse(line) as PiSessionFileEntry];
        } catch {
          // Match pi-coding-agent's loader: malformed JSONL entries are ignored.
          return [];
        }
      });
    const header = parsed[0] as { type?: unknown; id?: unknown } | undefined;
    if (header?.type !== "session" || typeof header.id !== "string") {
      return null;
    }
    return parsed;
  } catch {
    return null;
  } finally {
    // Best-effort close; never let a close failure mask the result.
    await handle?.close().catch(() => undefined);
  }
}
export async function readSessionLeafIdFromTranscriptAsync(
sessionFile: string,
maxBytes = MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES,
@@ -187,77 +240,63 @@ export async function readSessionLeafIdFromTranscriptAsync(
return null;
}
/**
* Synchronous version — kept for callers that cannot be made async.
* Prefer captureCompactionCheckpointSnapshotAsync for large transcripts
* to avoid blocking the event loop during file copy.
*/
export function captureCompactionCheckpointSnapshot(params: {
sessionManager: Pick<SessionManager, "getLeafId">;
sessionFile: string;
maxBytes?: number;
}): CapturedCompactionCheckpointSnapshot | null {
const getLeafId =
params.sessionManager && typeof params.sessionManager.getLeafId === "function"
? params.sessionManager.getLeafId.bind(params.sessionManager)
: null;
const sessionFile = params.sessionFile.trim();
if (!getLeafId || !sessionFile) {
export async function forkCompactionCheckpointTranscriptAsync(params: {
sourceFile: string;
targetCwd?: string;
sessionDir?: string;
}): Promise<ForkedCompactionCheckpointTranscript | null> {
const sourceFile = params.sourceFile.trim();
if (!sourceFile) {
return null;
}
const maxBytes = params.maxBytes ?? MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES;
const sourceHeader = await readSessionHeaderFromTranscriptAsync(sourceFile);
if (!sourceHeader) {
return null;
}
const entries = await readTranscriptEntriesForForkAsync(sourceFile);
if (!entries) {
return null;
}
migrateSessionEntries(entries);
const targetCwd = params.targetCwd ?? sourceHeader.cwd ?? process.cwd();
const sessionDir = params.sessionDir ?? path.dirname(sourceFile);
const sessionId = uuidv7();
const timestamp = new Date().toISOString();
const fileTimestamp = timestamp.replace(/[:.]/g, "-");
const sessionFile = path.join(sessionDir, `${fileTimestamp}_${sessionId}.jsonl`);
const header = {
type: "session",
version: CURRENT_SESSION_VERSION,
id: sessionId,
timestamp,
cwd: targetCwd,
parentSession: sourceFile,
};
try {
const stat = fsSync.statSync(sessionFile);
if (!stat.isFile() || stat.size > maxBytes) {
return null;
await fs.mkdir(sessionDir, { recursive: true });
const lines = [JSON.stringify(header)];
for (const entry of entries) {
if ((entry as { type?: unknown }).type !== "session") {
lines.push(JSON.stringify(entry));
}
}
} catch {
return null;
}
const leafId = getLeafId();
if (!leafId) {
return null;
}
const parsedSessionFile = path.parse(sessionFile);
const snapshotFile = path.join(
parsedSessionFile.dir,
`${parsedSessionFile.name}.checkpoint.${randomUUID()}${parsedSessionFile.ext || ".jsonl"}`,
);
try {
fsSync.copyFileSync(sessionFile, snapshotFile);
} catch {
return null;
}
let snapshotSession: SessionManager;
try {
snapshotSession = SessionManager.open(snapshotFile, path.dirname(snapshotFile));
await fs.writeFile(sessionFile, `${lines.join("\n")}\n`, { encoding: "utf-8", flag: "wx" });
return { sessionId, sessionFile };
} catch {
try {
fsSync.unlinkSync(snapshotFile);
await fs.unlink(sessionFile);
} catch {
// Best-effort cleanup if the copied transcript cannot be reopened.
// Best-effort cleanup for partial fork files.
}
return null;
}
const getSessionId =
snapshotSession && typeof snapshotSession.getSessionId === "function"
? snapshotSession.getSessionId.bind(snapshotSession)
: null;
if (!getSessionId) {
return null;
}
return {
sessionId: getSessionId(),
sessionFile: snapshotFile,
leafId,
};
}
/**
* Async version of captureCompactionCheckpointSnapshot that uses async file
* operations to avoid blocking the event loop. Large transcript files (20MB+)
* were observed blocking the event loop for minutes when copied synchronously
* (see issue #75414).
* Capture a bounded pre-compaction transcript snapshot without blocking the
* Gateway event loop on synchronous file reads/copies.
*/
export async function captureCompactionCheckpointSnapshotAsync(params: {
sessionManager?: Pick<SessionManager, "getLeafId">;

View File

@@ -5,7 +5,7 @@ import * as sessionUtils from "./session-utils.js";
describe("SessionHistorySseState", () => {
test("uses the initial raw snapshot for both first history and seq seeding", () => {
const readSpy = vi.spyOn(sessionUtils, "readSessionMessages").mockReturnValue([
const readSpy = vi.spyOn(sessionUtils, "readSessionMessagesAsync").mockResolvedValue([
{
role: "assistant",
content: [{ type: "text", text: "stale disk message" }],
@@ -96,21 +96,11 @@ describe("SessionHistorySseState", () => {
expect(snapshot.rawTranscriptSeq).toBe(99);
});
test("refreshes limited SSE history from bounded tail reads", () => {
const fullReadSpy = vi.spyOn(sessionUtils, "readSessionMessages").mockReturnValue([]);
test("refreshes limited SSE history from bounded async tail reads", async () => {
const fullReadSpy = vi.spyOn(sessionUtils, "readSessionMessagesAsync").mockResolvedValue([]);
const tailReadSpy = vi
.spyOn(sessionUtils, "readRecentSessionMessagesWithStats")
.mockReturnValueOnce({
messages: [
{
role: "assistant",
content: [{ type: "text", text: "tail one" }],
__openclaw: { seq: 7 },
},
],
totalMessages: 7,
})
.mockReturnValueOnce({
.spyOn(sessionUtils, "readRecentSessionMessagesWithStatsAsync")
.mockResolvedValueOnce({
messages: [
{
role: "assistant",
@@ -121,18 +111,27 @@ describe("SessionHistorySseState", () => {
totalMessages: 8,
});
try {
const state = new SessionHistorySseState({
const state = SessionHistorySseState.fromRawSnapshot({
target: { sessionId: "sess-main" },
rawMessages: [
{
role: "assistant",
content: [{ type: "text", text: "tail one" }],
__openclaw: { seq: 7 },
},
],
rawTranscriptSeq: 7,
totalRawMessages: 7,
limit: 1,
});
expect(state.snapshot().messages[0]?.__openclaw?.seq).toBe(7);
const refreshed = state.refresh();
const refreshed = await state.refreshAsync();
expect(refreshed.hasMore).toBe(true);
expect(refreshed.nextCursor).toBe("8");
expect(refreshed.messages[0]?.__openclaw?.seq).toBe(8);
expect(tailReadSpy).toHaveBeenCalledTimes(2);
expect(tailReadSpy).toHaveBeenCalledTimes(1);
expect(fullReadSpy).not.toHaveBeenCalled();
} finally {
fullReadSpy.mockRestore();

View File

@@ -4,9 +4,7 @@ import {
} from "./chat-display-projection.js";
import {
attachOpenClawTranscriptMeta,
readRecentSessionMessagesWithStats,
readRecentSessionMessagesWithStatsAsync,
readSessionMessages,
readSessionMessagesAsync,
} from "./session-utils.js";
@@ -181,12 +179,12 @@ export class SessionHistorySseState {
});
}
constructor(params: {
private constructor(params: {
target: SessionHistoryTranscriptTarget;
maxChars?: number;
limit?: number;
cursor?: string;
initialRawMessages?: unknown[];
initialRawMessages: unknown[];
rawTranscriptSeq?: number;
totalRawMessages?: number;
}) {
@@ -194,18 +192,15 @@ export class SessionHistorySseState {
this.maxChars = params.maxChars ?? DEFAULT_CHAT_HISTORY_TEXT_MAX_CHARS;
this.limit = params.limit;
this.cursor = params.cursor;
const rawSnapshot =
params.initialRawMessages === undefined
? this.readRawSnapshot()
: {
rawMessages: params.initialRawMessages,
...(typeof params.rawTranscriptSeq === "number"
? { rawTranscriptSeq: params.rawTranscriptSeq }
: {}),
...(typeof params.totalRawMessages === "number"
? { totalRawMessages: params.totalRawMessages }
: {}),
};
const rawSnapshot = {
rawMessages: params.initialRawMessages,
...(typeof params.rawTranscriptSeq === "number"
? { rawTranscriptSeq: params.rawTranscriptSeq }
: {}),
...(typeof params.totalRawMessages === "number"
? { totalRawMessages: params.totalRawMessages }
: {}),
};
const snapshot = buildSessionHistorySnapshot({
rawMessages: rawSnapshot.rawMessages,
maxChars: this.maxChars,
@@ -255,25 +250,6 @@ export class SessionHistorySseState {
};
}
refresh(): PaginatedSessionHistory {
const rawSnapshot = this.readRawSnapshot();
const snapshot = buildSessionHistorySnapshot({
rawMessages: rawSnapshot.rawMessages,
maxChars: this.maxChars,
limit: this.limit,
cursor: this.cursor,
...(typeof rawSnapshot.rawTranscriptSeq === "number"
? { rawTranscriptSeq: rawSnapshot.rawTranscriptSeq }
: {}),
...(typeof rawSnapshot.totalRawMessages === "number"
? { totalRawMessages: rawSnapshot.totalRawMessages }
: {}),
});
this.rawTranscriptSeq = snapshot.rawTranscriptSeq;
this.sentHistory = snapshot.history;
return snapshot.history;
}
async refreshAsync(): Promise<PaginatedSessionHistory> {
const rawSnapshot = await this.readRawSnapshotAsync();
const snapshot = buildSessionHistorySnapshot({
@@ -293,33 +269,6 @@ export class SessionHistorySseState {
return snapshot.history;
}
private readRawSnapshot(): SessionHistoryRawSnapshot {
if (this.cursor === undefined && typeof this.limit === "number") {
const snapshot = readRecentSessionMessagesWithStats(
this.target.sessionId,
this.target.storePath,
this.target.sessionFile,
resolveSessionHistoryTailReadOptions(this.limit),
);
return {
rawMessages: snapshot.messages,
rawTranscriptSeq: snapshot.totalMessages,
totalRawMessages: snapshot.totalMessages,
};
}
return {
rawMessages: this.readRawMessages(),
};
}
private readRawMessages(): unknown[] {
return readSessionMessages(
this.target.sessionId,
this.target.storePath,
this.target.sessionFile,
);
}
private async readRawSnapshotAsync(): Promise<SessionHistoryRawSnapshot> {
if (this.cursor === undefined && typeof this.limit === "number") {
const snapshot = await readRecentSessionMessagesWithStatsAsync(