From d7361eff662db64820839cc3fdcb772a58acbcda Mon Sep 17 00:00:00 2001 From: Galin Iliev Date: Mon, 25 May 2026 16:04:04 -0700 Subject: [PATCH] fix(gateway): cap retained compaction checkpoint bytes Cap retained compaction checkpoint snapshots by total bytes per session while preserving the existing count cap. The gateway now stats retained checkpoint snapshots inside the session-store writer before trimming, deletes older trimmed checkpoint files, and keeps the newest checkpoint available. Regression coverage uses real sparse checkpoint files to prove byte-budget cleanup. Closes #84822. --- .../session-compaction-checkpoints.test.ts | 93 +++++++++++++++++++ src/gateway/session-compaction-checkpoints.ts | 76 +++++++++++++-- 2 files changed, 163 insertions(+), 6 deletions(-) diff --git a/src/gateway/session-compaction-checkpoints.test.ts b/src/gateway/session-compaction-checkpoints.test.ts index 3185269b7b2..887700b47c3 100644 --- a/src/gateway/session-compaction-checkpoints.test.ts +++ b/src/gateway/session-compaction-checkpoints.test.ts @@ -10,6 +10,7 @@ import { captureCompactionCheckpointSnapshotAsync, cleanupCompactionCheckpointSnapshot, forkCompactionCheckpointTranscriptAsync, + MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION, MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES, persistSessionCompactionCheckpoint, readSessionLeafIdFromTranscriptAsync, @@ -492,4 +493,96 @@ describe("session-compaction-checkpoints", () => { Object.values(nextStore).find((entry) => entry.compactionCheckpoints)?.compactionCheckpoints, ).toHaveLength(25); }); + + test("persist trims retained checkpoint snapshots by total byte budget", async () => { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-checkpoint-byte-trim-")); + tempDirs.push(dir); + + const storePath = path.join(dir, "sessions.json"); + const sessionId = "sess"; + const sessionKey = "agent:main:main"; + const now = Date.now(); + const checkpointSize = Math.floor(MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION / 6); + const existingCheckpoints = await Promise.all( + Array.from({ length: 8 }, async (_, index) => { + const uuid = `${String(index + 1).padStart(8, "0")}-1111-4111-8111-111111111111`; + const sessionFile = path.join(dir, `sess.checkpoint.${uuid}.jsonl`); + await fs.writeFile(sessionFile, "", "utf-8"); + await fs.truncate(sessionFile, checkpointSize); + return { + checkpointId: `old-${index}`, + sessionKey, + sessionId, + createdAt: now + index, + reason: "manual" as const, + preCompaction: { + sessionId, + sessionFile, + leafId: `old-leaf-${index}`, + }, + postCompaction: { sessionId }, + }; + }), + ); + await fs.writeFile( + storePath, + JSON.stringify( + { + [sessionKey]: { + sessionId, + updatedAt: now, + compactionCheckpoints: existingCheckpoints, + }, + }, + null, + 2, + ), + "utf-8", + ); + + const currentSnapshotFile = path.join( + dir, + "sess.checkpoint.99999999-9999-4999-8999-999999999999.jsonl", + ); + await fs.writeFile(currentSnapshotFile, "", "utf-8"); + await fs.truncate(currentSnapshotFile, checkpointSize); + + await persistSessionCompactionCheckpoint({ + cfg: { + session: { store: storePath }, + agents: { list: [{ id: "main", default: true }] }, + } as OpenClawConfig, + sessionKey: "main", + sessionId, + reason: "manual", + snapshot: { + sessionId, + sessionFile: currentSnapshotFile, + leafId: "current-leaf", + }, + createdAt: now + 100, + }); + + expect(fsSync.existsSync(existingCheckpoints[0].preCompaction.sessionFile)).toBe(false); + expect(fsSync.existsSync(existingCheckpoints[1].preCompaction.sessionFile)).toBe(false); + expect(fsSync.existsSync(existingCheckpoints[2].preCompaction.sessionFile)).toBe(false); + expect(fsSync.existsSync(existingCheckpoints[3].preCompaction.sessionFile)).toBe(true); + expect(fsSync.existsSync(currentSnapshotFile)).toBe(true); + + const nextStore = JSON.parse(await fs.readFile(storePath, "utf-8")) as Record< + string, + { compactionCheckpoints?: Array<{ checkpointId?: string }> } + >; + const retained = Object.values(nextStore).find( + (entry) => entry.compactionCheckpoints, + )?.compactionCheckpoints; + expect(retained?.map((checkpoint) => checkpoint.checkpointId)).toEqual([ + "old-3", + "old-4", + "old-5", + "old-6", + "old-7", + expect.any(String), + ]); + }); }); diff --git a/src/gateway/session-compaction-checkpoints.ts b/src/gateway/session-compaction-checkpoints.ts index d9ce7de532f..b6ef4728381 100644 --- a/src/gateway/session-compaction-checkpoints.ts +++ b/src/gateway/session-compaction-checkpoints.ts @@ -22,6 +22,7 @@ import { resolveGatewaySessionStoreTarget } from "./session-utils.js"; const log = createSubsystemLogger("gateway/session-compaction-checkpoints"); const MAX_COMPACTION_CHECKPOINTS_PER_SESSION = 25; export const MAX_COMPACTION_CHECKPOINT_SNAPSHOT_BYTES = 64 * 1024 * 1024; +export const MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION = 128 * 1024 * 1024; export type CapturedCompactionCheckpointSnapshot = { sessionId: string; @@ -34,17 +35,58 @@ type ForkedCompactionCheckpointTranscript = { sessionFile: string; }; -function trimSessionCheckpoints(checkpoints: SessionCompactionCheckpoint[] | undefined): { +function checkpointSnapshotPath(checkpoint: SessionCompactionCheckpoint): string | undefined { + return checkpoint.preCompaction.sessionFile?.trim() || undefined; +} + +function checkpointSnapshotBytes( + checkpoint: SessionCompactionCheckpoint, + snapshotBytesByPath: ReadonlyMap, +): number { + const sessionFile = checkpointSnapshotPath(checkpoint); + if (!sessionFile) { + return 0; + } + const bytes = snapshotBytesByPath.get(sessionFile); + return typeof bytes === "number" && Number.isFinite(bytes) && bytes > 0 ? bytes : 0; +} + +function trimSessionCheckpoints( + checkpoints: SessionCompactionCheckpoint[] | undefined, + snapshotBytesByPath: ReadonlyMap = new Map(), +): { kept: SessionCompactionCheckpoint[] | undefined; removed: SessionCompactionCheckpoint[]; } { if (!Array.isArray(checkpoints) || checkpoints.length === 0) { return { kept: undefined, removed: [] }; } - const kept = checkpoints.slice(-MAX_COMPACTION_CHECKPOINTS_PER_SESSION); + const countTrimmed = checkpoints.slice(-MAX_COMPACTION_CHECKPOINTS_PER_SESSION); + const countRemoved = checkpoints.slice(0, Math.max(0, checkpoints.length - countTrimmed.length)); + const keptNewestFirst: SessionCompactionCheckpoint[] = []; + const byteRemovedNewestFirst: SessionCompactionCheckpoint[] = []; + let retainedBytes = 0; + for (let index = countTrimmed.length - 1; index >= 0; index -= 1) { + const checkpoint = countTrimmed[index]; + if (!checkpoint) { + continue; + } + const checkpointBytes = checkpointSnapshotBytes(checkpoint, snapshotBytesByPath); + const keepNewestCheckpoint = keptNewestFirst.length === 0; + if ( + keepNewestCheckpoint || + retainedBytes + checkpointBytes <= MAX_COMPACTION_CHECKPOINT_RETAINED_BYTES_PER_SESSION + ) { + keptNewestFirst.push(checkpoint); + retainedBytes += checkpointBytes; + } else { + byteRemovedNewestFirst.push(checkpoint); + } + } + const kept = keptNewestFirst.toReversed(); return { - kept, - removed: checkpoints.slice(0, Math.max(0, checkpoints.length - kept.length)), + kept: kept.length > 0 ? kept : undefined, + removed: [...countRemoved, ...byteRemovedNewestFirst.toReversed()], }; } @@ -54,6 +96,27 @@ function sessionStoreCheckpoints( return Array.isArray(entry?.compactionCheckpoints) ? [...entry.compactionCheckpoints] : []; } +async function statCheckpointSnapshotBytes( + checkpoints: readonly SessionCompactionCheckpoint[], +): Promise> { + const bytesByPath = new Map(); + await Promise.all( + checkpoints.map(async (checkpoint) => { + const sessionFile = checkpointSnapshotPath(checkpoint); + if (!sessionFile || bytesByPath.has(sessionFile)) { + return; + } + try { + const stat = await fs.stat(sessionFile); + bytesByPath.set(sessionFile, stat.isFile() ? stat.size : 0); + } catch { + bytesByPath.set(sessionFile, 0); + } + }), + ); + return bytesByPath; +} + export function resolveSessionCompactionCheckpointReason(params: { trigger?: "budget" | "overflow" | "manual"; timedOut?: boolean; @@ -443,14 +506,15 @@ export async function persistSessionCompactionCheckpoint(params: { removed: SessionCompactionCheckpoint[]; } | undefined; - await updateSessionStore(target.storePath, (store) => { + await updateSessionStore(target.storePath, async (store) => { const existing = store[target.canonicalKey]; if (!existing?.sessionId) { return; } const checkpoints = sessionStoreCheckpoints(existing); checkpoints.push(checkpoint); - trimmedCheckpoints = trimSessionCheckpoints(checkpoints); + const snapshotBytesByPath = await statCheckpointSnapshotBytes(checkpoints); + trimmedCheckpoints = trimSessionCheckpoints(checkpoints, snapshotBytesByPath); store[target.canonicalKey] = { ...existing, updatedAt: Math.max(existing.updatedAt ?? 0, createdAt),