diff --git a/src/gateway/session-compaction-checkpoints.test.ts b/src/gateway/session-compaction-checkpoints.test.ts index 3185269b7b2..887700b47c3 100644 --- a/src/gateway/session-compaction-checkpoints.test.ts +++ b/src/gateway/session-compaction-checkpoints.test.ts @@ -10,6 +10,7 @@ import { captureCompactionCheckpointSnapshotAsync, cleanupCompactionCheckpointSnapshot, forkCompactionCheckpointTranscriptAsync, + MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION, MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES, persistSessionCompactionCheckpoint, readSessionLeafIdFromTranscriptAsync, @@ -492,4 +493,96 @@ describe("session-compaction-checkpoints", () => { Object.values(nextStore).find((entry) => entry.compactionCheckpoints)?.compactionCheckpoints, ).toHaveLength(25); }); + + test("persist trims retained checkpoint snapshots by total byte budget", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-byte-trim-")); + tempDirs.push(dir); + + const storePath = path.join(dir, "sessions.json"); + const sessionId = "sess"; + const sessionKey = "agent:main:main"; + const now = Date.now(); + const checkpointSize = Math.floor(MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION / 6); + const existingCheckpoints = await Promise.all( + Array.from({ length: 8 }, async (_, index) => { + const uuid = `${String(index + 1).padStart(8, "0")}-1111-4111-8111-111111111111`; + const sessionFile = path.join(dir, `sess.checkpoint.${uuid}.jsonl`); + await fs.writeFile(sessionFile, "", "utf-8"); + await fs.truncate(sessionFile, checkpointSize); + return { + checkpointId: `old-${index}`, + sessionKey, + sessionId, + createdAt: now + index, + reason: "manual" as const, + preCompaction: { + sessionId, + sessionFile, + leafId: `old-leaf-${index}`, + }, + postCompaction: { sessionId }, + }; + }), + ); + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId, + updatedAt: now, + compactionCheckpoints: existingCheckpoints, + }, + }, + null, + 2, + ), + "utf-8", + ); + + const currentSnapshotFile = path.join( + dir, + "sess.checkpoint.99999999-9999-4999-8999-999999999999.jsonl", + ); + await fs.writeFile(currentSnapshotFile, "", "utf-8"); + await fs.truncate(currentSnapshotFile, checkpointSize); + + await persistSessionCompactionCheckpoint({ + cfg: { + session: { store: storePath }, + agents: { list: [{ id: "main", default: true }] }, + } as OpenClawConfig, + sessionKey: "main", + sessionId, + reason: "manual", + snapshot: { + sessionId, + sessionFile: currentSnapshotFile, + leafId: "current-leaf", + }, + createdAt: now + 100, + }); + + expect(fsSync.existsSync(existingCheckpoints[0].preCompaction.sessionFile)).toBe(false); + expect(fsSync.existsSync(existingCheckpoints[1].preCompaction.sessionFile)).toBe(false); + expect(fsSync.existsSync(existingCheckpoints[2].preCompaction.sessionFile)).toBe(false); + expect(fsSync.existsSync(existingCheckpoints[3].preCompaction.sessionFile)).toBe(true); + expect(fsSync.existsSync(currentSnapshotFile)).toBe(true); + + const nextStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { compactionCheckpoints?: Array<{ checkpointId?: string }> } + >; + const retained = Object.values(nextStore).find( + (entry) => entry.compactionCheckpoints, + )?.compactionCheckpoints; + expect(retained?.map((checkpoint) => checkpoint.checkpointId)).toEqual([ + "old-3", + "old-4", + "old-5", + "old-6", + "old-7", + expect.any(String), + ]); + }); }); diff --git a/src/gateway/session-compaction-checkpoints.ts b/src/gateway/session-compaction-checkpoints.ts index d9ce7de532f..b6ef4728381 100644 --- a/src/gateway/session-compaction-checkpoints.ts +++ b/src/gateway/session-compaction-checkpoints.ts @@ -22,6 +22,7 @@ import { resolveGatewaySessionStoreTarget } from "./session-utils.js"; const log = createSubsystemLogger("gateway/session-compaction-checkpoints"); const MAX_COMPACTION_CHECKPOINTS_PER_SESSION = 25; export const MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES = 64 * 1024 * 1024; +export const MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION = 128 * 1024 * 1024; export type CapturedCompactionCheckpointSnapshot = { sessionId: string; @@ -34,17 +35,58 @@ type ForkedCompactionCheckpointTranscript = { sessionFile: string; }; -function trimSessionCheckpoints(checkpoints: SessionCompactionCheckpoint[] | undefined): { +function checkpointSnapshotPath(checkpoint: SessionCompactionCheckpoint): string | undefined { + return checkpoint.preCompaction.sessionFile?.trim() || undefined; +} + +function checkpointSnapshotBytes( + checkpoint: SessionCompactionCheckpoint, + snapshotBytesByPath: ReadonlyMap, +): number { + const sessionFile = checkpointSnapshotPath(checkpoint); + if (!sessionFile) { + return 0; + } + const bytes = snapshotBytesByPath.get(sessionFile); + return typeof bytes === "number" && Number.isFinite(bytes) && bytes > 0 ? bytes : 0; +} + +function trimSessionCheckpoints( + checkpoints: SessionCompactionCheckpoint[] | undefined, + snapshotBytesByPath: ReadonlyMap = new Map(), +): { kept: SessionCompactionCheckpoint[] | undefined; removed: SessionCompactionCheckpoint[]; } { if (!Array.isArray(checkpoints) || checkpoints.length === 0) { return { kept: undefined, removed: [] }; } - const kept = checkpoints.slice(-MAX_COMPACTION_CHECKPOINTS_PER_SESSION); + const countTrimmed = checkpoints.slice(-MAX_COMPACTION_CHECKPOINTS_PER_SESSION); + const countRemoved = checkpoints.slice(0, Math.max(0, checkpoints.length - countTrimmed.length)); + const keptNewestFirst: SessionCompactionCheckpoint[] = []; + const byteRemovedNewestFirst: SessionCompactionCheckpoint[] = []; + let retainedBytes = 0; + for (let index = countTrimmed.length - 1; index >= 0; index -= 1) { + const checkpoint = countTrimmed[index]; + if (!checkpoint) { + continue; + } + const checkpointBytes = checkpointSnapshotBytes(checkpoint, snapshotBytesByPath); + const keepNewestCheckpoint = keptNewestFirst.length === 0; + if ( + keepNewestCheckpoint || + retainedBytes + checkpointBytes <= MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION + ) { + keptNewestFirst.push(checkpoint); + retainedBytes += checkpointBytes; + } else { + byteRemovedNewestFirst.push(checkpoint); + } + } + const kept = keptNewestFirst.toReversed(); return { - kept, - removed: checkpoints.slice(0, Math.max(0, checkpoints.length - kept.length)), + kept: kept.length > 0 ? kept : undefined, + removed: [...countRemoved, ...byteRemovedNewestFirst.toReversed()], }; } @@ -54,6 +96,27 @@ function sessionStoreCheckpoints( return Array.isArray(entry?.compactionCheckpoints) ? [...entry.compactionCheckpoints] : []; } +async function statCheckpointSnapshotBytes( + checkpoints: readonly SessionCompactionCheckpoint[], +): Promise> { + const bytesByPath = new Map(); + await Promise.all( + checkpoints.map(async (checkpoint) => { + const sessionFile = checkpointSnapshotPath(checkpoint); + if (!sessionFile || bytesByPath.has(sessionFile)) { + return; + } + try { + const stat = await fs.stat(sessionFile); + bytesByPath.set(sessionFile, stat.isFile() ? stat.size : 0); + } catch { + bytesByPath.set(sessionFile, 0); + } + }), + ); + return bytesByPath; +} + export function resolveSessionCompactionCheckpointReason(params: { trigger?: "budget" | "overflow" | "manual"; timedOut?: boolean; @@ -443,14 +506,15 @@ export async function persistSessionCompactionCheckpoint(params: { removed: SessionCompactionCheckpoint[]; } | undefined; - await updateSessionStore(target.storePath, (store) => { + await updateSessionStore(target.storePath, async (store) => { const existing = store[target.canonicalKey]; if (!existing?.sessionId) { return; } const checkpoints = sessionStoreCheckpoints(existing); checkpoints.push(checkpoint); - trimmedCheckpoints = trimSessionCheckpoints(checkpoints); + const snapshotBytesByPath = await statCheckpointSnapshotBytes(checkpoints); + trimmedCheckpoints = trimSessionCheckpoints(checkpoints, snapshotBytesByPath); store[target.canonicalKey] = { ...existing, updatedAt: Math.max(existing.updatedAt ?? 0, createdAt),