From 8682d0701b2770b1dec12b6677fffa463576eff0 Mon Sep 17 00:00:00 2001 From: Alex Knight Date: Mon, 15 Jun 2026 04:11:50 -0700 Subject: [PATCH] perf(sessions): share one enumeration across archive retention sweeps (#91957) Co-authored-by: Alex Knight <15041791+amknight@users.noreply.github.com> --- src/config/sessions/disk-budget.ts | 44 +++++--- src/config/sessions/store.ts | 19 ++-- src/cron/session-reaper.ts | 3 +- ...ranscript-files.fs.archive-cleanup.test.ts | 104 ++++++++++++++++++ src/gateway/session-transcript-files.fs.ts | 48 +++++--- 5 files changed, 171 insertions(+), 47 deletions(-) create mode 100644 src/gateway/session-transcript-files.fs.archive-cleanup.test.ts diff --git a/src/config/sessions/disk-budget.ts b/src/config/sessions/disk-budget.ts index 28d61c270e6..19b52c46b17 100644 --- a/src/config/sessions/disk-budget.ts +++ b/src/config/sessions/disk-budget.ts @@ -9,6 +9,7 @@ import { resolveTrajectoryFilePath, resolveTrajectoryPointerFilePath, } from "../../trajectory/paths.js"; +import { runTasksWithConcurrency } from "../../utils/run-with-concurrency.js"; import { isCompactionCheckpointTranscriptFileName, isPrimarySessionTranscriptFileName, @@ -215,29 +216,36 @@ function resolveReferencedSessionArtifactPaths(params: { return referenced; } +const SESSIONS_DIR_STAT_CONCURRENCY = 8; + async function readSessionsDirFiles(sessionsDir: string): Promise { const dirEntries = await fs.promises .readdir(sessionsDir, { withFileTypes: true }) .catch(() => []); - const files: SessionsDirFileStat[] = []; - for (const dirent of dirEntries) { - if (!dirent.isFile()) { - continue; - } - const filePath = path.join(sessionsDir, dirent.name); - const stat = await fs.promises.stat(filePath).catch(() => null); - if (!stat?.isFile()) { - continue; - } - files.push({ - path: filePath, - canonicalPath: canonicalizePathForComparison(filePath), - name: dirent.name, - size: stat.size, - mtimeMs: stat.mtimeMs, + // Stat concurrently: the budget sweep stats every session file, and serial + // stats turn one sweep into per-file latency round trips on networked + // filesystems. + const tasks = dirEntries + .filter((dirent) => dirent.isFile()) + .map((dirent) => async (): Promise => { + const filePath = path.join(sessionsDir, dirent.name); + const stat = await fs.promises.stat(filePath).catch(() => null); + if (!stat?.isFile()) { + return null; + } + return { + path: filePath, + canonicalPath: canonicalizePathForComparison(filePath), + name: dirent.name, + size: stat.size, + mtimeMs: stat.mtimeMs, + }; }); - } - return files; + const { results } = await runTasksWithConcurrency({ + tasks, + limit: SESSIONS_DIR_STAT_CONCURRENCY, + }); + return results.filter((file): file is SessionsDirFileStat => Boolean(file)); } async function readSessionPromptBlobFiles(sessionsDir: string): Promise { diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 96fffe3fc8e..3957c1dfb1b 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -711,18 +711,19 @@ async function saveSessionStoreUnlocked( const { cleanupArchivedSessionTranscripts } = await loadSessionArchiveRuntime(); const targetDirs = archivedDirs.size > 0 ? [...archivedDirs] : [path.dirname(path.resolve(storePath))]; + // Both retention reasons ride one cleanup call so each save enumerates + // the sessions dir at most once; reset retention defaults on, so a + // listing per reason would scan twice per save (costly on NFS). await cleanupArchivedSessionTranscripts({ directories: targetDirs, - olderThanMs: maintenance.pruneAfterMs, - reason: "deleted", + rules: + maintenance.resetArchiveRetentionMs != null + ? [ + { reason: "deleted", olderThanMs: maintenance.pruneAfterMs }, + { reason: "reset", olderThanMs: maintenance.resetArchiveRetentionMs }, + ] + : [{ reason: "deleted", olderThanMs: maintenance.pruneAfterMs }], }); - if (maintenance.resetArchiveRetentionMs != null) { - await cleanupArchivedSessionTranscripts({ - directories: targetDirs, - olderThanMs: maintenance.resetArchiveRetentionMs, - reason: "reset", - }); - } } const diskBudget = await enforceSessionDiskBudget({ diff --git a/src/cron/session-reaper.ts b/src/cron/session-reaper.ts index f98549e23ed..73836696d08 100644 --- a/src/cron/session-reaper.ts +++ b/src/cron/session-reaper.ts @@ -116,8 +116,7 @@ export async function sweepCronRunSessions(params: { if (archivedDirs.size > 0) { await cleanupArchivedSessionTranscripts({ directories: [...archivedDirs], - olderThanMs: retentionMs, - reason: "deleted", + rules: [{ reason: "deleted", olderThanMs: retentionMs }], nowMs: now, }); } diff --git a/src/gateway/session-transcript-files.fs.archive-cleanup.test.ts b/src/gateway/session-transcript-files.fs.archive-cleanup.test.ts new file mode 100644 index 00000000000..b139ad210d5 --- /dev/null +++ b/src/gateway/session-transcript-files.fs.archive-cleanup.test.ts @@ -0,0 +1,104 @@ +// Gateway tests cover archived-transcript retention cleanup: every retention +// rule shares one directory listing per cleanup call. Store maintenance runs +// this on each save, so per-rule listings would multiply READDIR load. +import fsPromises from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { cleanupArchivedSessionTranscripts } from "./session-transcript-files.fs.js"; + +const DAY_MS = 24 * 60 * 60 * 1000; +const NOW_MS = Date.parse("2026-06-02T00:00:00.000Z"); +const OLD_STAMP = "2026-01-01T00-00-00.000Z"; +const FRESH_STAMP = "2026-06-01T00-00-00.000Z"; + +describe("cleanupArchivedSessionTranscripts", () => { + let dir = ""; + + beforeEach(async () => { + dir = await fsPromises.mkdtemp(path.join(os.tmpdir(), "openclaw-archive-cleanup-")); + }); + + afterEach(async () => { + vi.restoreAllMocks(); + await fsPromises.rm(dir, { recursive: true, force: true }); + }); + + async function seed(names: string[]): Promise { + for (const name of names) { + await fsPromises.writeFile(path.join(dir, name), ""); + } + } + + async function remaining(): Promise { + return (await fsPromises.readdir(dir)).toSorted(); + } + + it("applies every retention rule from a single directory listing", async () => { + await seed([ + `a.jsonl.deleted.${OLD_STAMP}`, + `b.jsonl.reset.${OLD_STAMP}`, + `c.jsonl.reset.${FRESH_STAMP}`, + "live.jsonl", + ]); + const readdirSpy = vi.spyOn(fsPromises, "readdir"); + + const result = await cleanupArchivedSessionTranscripts({ + directories: [dir], + rules: [ + { reason: "deleted", olderThanMs: 30 * DAY_MS }, + { reason: "reset", olderThanMs: 30 * DAY_MS }, + ], + nowMs: NOW_MS, + }); + + expect(readdirSpy).toHaveBeenCalledTimes(1); + expect(result).toEqual({ removed: 2, scanned: 3 }); + expect(await remaining()).toEqual([`c.jsonl.reset.${FRESH_STAMP}`, "live.jsonl"]); + }); + + it("applies each rule's age threshold independently", async () => { + await seed([`a.jsonl.deleted.${OLD_STAMP}`, `b.jsonl.reset.${OLD_STAMP}`]); + + const result = await cleanupArchivedSessionTranscripts({ + directories: [dir], + rules: [ + { reason: "deleted", olderThanMs: 30 * DAY_MS }, + { reason: "reset", olderThanMs: 365 * DAY_MS }, + ], + nowMs: NOW_MS, + }); + + expect(result).toEqual({ removed: 1, scanned: 2 }); + expect(await remaining()).toEqual([`b.jsonl.reset.${OLD_STAMP}`]); + }); + + it("keeps archives whose reason has no rule", async () => { + await seed([`a.jsonl.reset.${OLD_STAMP}`]); + + const result = await cleanupArchivedSessionTranscripts({ + directories: [dir], + rules: [{ reason: "deleted", olderThanMs: 0 }], + nowMs: NOW_MS, + }); + + expect(result).toEqual({ removed: 0, scanned: 0 }); + expect(await remaining()).toEqual([`a.jsonl.reset.${OLD_STAMP}`]); + }); + + it("drops invalid rules and never lists when none remain", async () => { + const readdirSpy = vi.spyOn(fsPromises, "readdir"); + + const result = await cleanupArchivedSessionTranscripts({ + directories: [dir], + rules: [ + { reason: "deleted", olderThanMs: Number.NaN }, + { reason: "reset", olderThanMs: -1 }, + ], + nowMs: NOW_MS, + }); + + expect(result).toEqual({ removed: 0, scanned: 0 }); + expect(readdirSpy).not.toHaveBeenCalled(); + }); +}); diff --git a/src/gateway/session-transcript-files.fs.ts b/src/gateway/session-transcript-files.fs.ts index afdabcd4e26..4847d7032d3 100644 --- a/src/gateway/session-transcript-files.fs.ts +++ b/src/gateway/session-transcript-files.fs.ts @@ -502,17 +502,26 @@ export function resolveStableSessionEndTranscript(params: { return {}; } +export type SessionArchiveCleanupRule = { + reason: ArchiveFileReason; + olderThanMs: number; +}; + +// Store maintenance runs this on every session-store save. All retention rules +// share one directory listing: a listing per reason would multiply READDIR +// load on the per-save hot path, which is expensive on networked filesystems. export async function cleanupArchivedSessionTranscripts(opts: { directories: string[]; - olderThanMs: number; - reason?: ArchiveFileReason; + rules: SessionArchiveCleanupRule[]; nowMs?: number; }): Promise<{ removed: number; scanned: number }> { - if (!Number.isFinite(opts.olderThanMs) || opts.olderThanMs < 0) { + const rules = opts.rules.filter( + (rule) => Number.isFinite(rule.olderThanMs) && rule.olderThanMs >= 0, + ); + if (rules.length === 0) { return { removed: 0, scanned: 0 }; } const now = opts.nowMs ?? Date.now(); - const reason: ArchiveFileReason = opts.reason ?? "deleted"; const directories = uniqueStrings(opts.directories.map((dir) => path.resolve(dir))); let removed = 0; let scanned = 0; @@ -520,21 +529,24 @@ export async function cleanupArchivedSessionTranscripts(opts: { for (const dir of directories) { const entries = await fs.promises.readdir(dir).catch(() => []); for (const entry of entries) { - const timestamp = parseSessionArchiveTimestamp(entry, reason); - if (timestamp == null) { - continue; + for (const rule of rules) { + const timestamp = parseSessionArchiveTimestamp(entry, rule.reason); + if (timestamp == null) { + continue; + } + scanned += 1; + if (now - timestamp > rule.olderThanMs) { + const fullPath = path.join(dir, entry); + const stat = await fs.promises.stat(fullPath).catch(() => null); + if (stat?.isFile()) { + await fs.promises.rm(fullPath).catch(() => undefined); + removed += 1; + } + } + // An archive name carries exactly one `.{reason}.{timestamp}` suffix, + // so the first matching rule owns the entry. + break; } - scanned += 1; - if (now - timestamp <= opts.olderThanMs) { - continue; - } - const fullPath = path.join(dir, entry); - const stat = await fs.promises.stat(fullPath).catch(() => null); - if (!stat?.isFile()) { - continue; - } - await fs.promises.rm(fullPath).catch(() => undefined); - removed += 1; } }