mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-24 13:39:29 +00:00
perf(sessions): share one enumeration across archive retention sweeps (#91957)
Co-authored-by: Alex Knight <15041791+amknight@users.noreply.github.com>
This commit is contained in:
@@ -9,6 +9,7 @@ import {
|
||||
resolveTrajectoryFilePath,
|
||||
resolveTrajectoryPointerFilePath,
|
||||
} from "../../trajectory/paths.js";
|
||||
import { runTasksWithConcurrency } from "../../utils/run-with-concurrency.js";
|
||||
import {
|
||||
isCompactionCheckpointTranscriptFileName,
|
||||
isPrimarySessionTranscriptFileName,
|
||||
@@ -215,29 +216,36 @@ function resolveReferencedSessionArtifactPaths(params: {
|
||||
return referenced;
|
||||
}
|
||||
|
||||
// Upper bound on the number of stat() calls issued at once while enumerating
// a sessions directory.
const SESSIONS_DIR_STAT_CONCURRENCY = 8;

/**
 * Lists the regular files directly inside `sessionsDir` together with their
 * size and modification time.
 *
 * Failures are treated as "nothing there": an unreadable directory yields an
 * empty list, and an entry whose stat fails (or that is not a regular file by
 * the time it is stat'ed) is dropped from the result.
 *
 * @param sessionsDir - Directory to enumerate; subdirectories are not descended into.
 * @returns One record per regular file that could be stat'ed.
 */
async function readSessionsDirFiles(sessionsDir: string): Promise<SessionsDirFileStat[]> {
  const dirEntries = await fs.promises
    .readdir(sessionsDir, { withFileTypes: true })
    .catch(() => []);
  // Stat concurrently: the budget sweep stats every session file, and serial
  // stats turn one sweep into per-file latency round trips on networked
  // filesystems.
  const tasks = dirEntries
    .filter((dirent) => dirent.isFile())
    .map((dirent) => async (): Promise<SessionsDirFileStat | null> => {
      const filePath = path.join(sessionsDir, dirent.name);
      // The entry may vanish or change type between readdir and stat.
      const stat = await fs.promises.stat(filePath).catch(() => null);
      if (!stat?.isFile()) {
        return null;
      }
      return {
        path: filePath,
        canonicalPath: canonicalizePathForComparison(filePath),
        name: dirent.name,
        size: stat.size,
        mtimeMs: stat.mtimeMs,
      };
    });
  // NOTE(review): result ordering and per-task failure handling are whatever
  // runTasksWithConcurrency provides — confirm against that helper.
  const { results } = await runTasksWithConcurrency({
    tasks,
    limit: SESSIONS_DIR_STAT_CONCURRENCY,
  });
  // Drop the slots of entries that were skipped above.
  return results.filter((file): file is SessionsDirFileStat => Boolean(file));
}
|
||||
|
||||
async function readSessionPromptBlobFiles(sessionsDir: string): Promise<SessionsDirFileStat[]> {
|
||||
|
||||
@@ -711,18 +711,19 @@ async function saveSessionStoreUnlocked(
|
||||
const { cleanupArchivedSessionTranscripts } = await loadSessionArchiveRuntime();
|
||||
const targetDirs =
|
||||
archivedDirs.size > 0 ? [...archivedDirs] : [path.dirname(path.resolve(storePath))];
|
||||
// Both retention reasons ride one cleanup call so each save enumerates
|
||||
// the sessions dir at most once; reset retention defaults on, so a
|
||||
// listing per reason would scan twice per save (costly on NFS).
|
||||
await cleanupArchivedSessionTranscripts({
|
||||
directories: targetDirs,
|
||||
olderThanMs: maintenance.pruneAfterMs,
|
||||
reason: "deleted",
|
||||
rules:
|
||||
maintenance.resetArchiveRetentionMs != null
|
||||
? [
|
||||
{ reason: "deleted", olderThanMs: maintenance.pruneAfterMs },
|
||||
{ reason: "reset", olderThanMs: maintenance.resetArchiveRetentionMs },
|
||||
]
|
||||
: [{ reason: "deleted", olderThanMs: maintenance.pruneAfterMs }],
|
||||
});
|
||||
if (maintenance.resetArchiveRetentionMs != null) {
|
||||
await cleanupArchivedSessionTranscripts({
|
||||
directories: targetDirs,
|
||||
olderThanMs: maintenance.resetArchiveRetentionMs,
|
||||
reason: "reset",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const diskBudget = await enforceSessionDiskBudget({
|
||||
|
||||
@@ -116,8 +116,7 @@ export async function sweepCronRunSessions(params: {
|
||||
if (archivedDirs.size > 0) {
|
||||
await cleanupArchivedSessionTranscripts({
|
||||
directories: [...archivedDirs],
|
||||
olderThanMs: retentionMs,
|
||||
reason: "deleted",
|
||||
rules: [{ reason: "deleted", olderThanMs: retentionMs }],
|
||||
nowMs: now,
|
||||
});
|
||||
}
|
||||
|
||||
104
src/gateway/session-transcript-files.fs.archive-cleanup.test.ts
Normal file
104
src/gateway/session-transcript-files.fs.archive-cleanup.test.ts
Normal file
@@ -0,0 +1,104 @@
|
||||
// Gateway tests cover archived-transcript retention cleanup: every retention
|
||||
// rule shares one directory listing per cleanup call. Store maintenance runs
|
||||
// this on each save, so per-rule listings would multiply READDIR load.
|
||||
import fsPromises from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { cleanupArchivedSessionTranscripts } from "./session-transcript-files.fs.js";
|
||||
|
||||
const DAY_MS = 24 * 60 * 60 * 1000;
// Fixed clock passed as `nowMs` so archive ages are deterministic.
const NOW_MS = Date.parse("2026-06-02T00:00:00.000Z");
// Timestamp suffixes used in the seeded archive file names: the tests expect
// OLD_STAMP to exceed a 30-day threshold at NOW_MS and FRESH_STAMP not to.
const OLD_STAMP = "2026-01-01T00-00-00.000Z";
const FRESH_STAMP = "2026-06-01T00-00-00.000Z";

describe("cleanupArchivedSessionTranscripts", () => {
  // Scratch directory, recreated for every test case.
  let tempDir = "";

  // Creates one empty file per name inside the scratch directory.
  const seed = async (names: string[]): Promise<void> => {
    for (const fileName of names) {
      const target = path.join(tempDir, fileName);
      await fsPromises.writeFile(target, "");
    }
  };

  // Returns the scratch directory's current entries in sorted order.
  const listRemaining = async (): Promise<string[]> => {
    const entries = await fsPromises.readdir(tempDir);
    return entries.toSorted();
  };

  beforeEach(async () => {
    const prefix = path.join(os.tmpdir(), "openclaw-archive-cleanup-");
    tempDir = await fsPromises.mkdtemp(prefix);
  });

  afterEach(async () => {
    vi.restoreAllMocks();
    await fsPromises.rm(tempDir, { recursive: true, force: true });
  });

  it("applies every retention rule from a single directory listing", async () => {
    const freshReset = `c.jsonl.reset.${FRESH_STAMP}`;
    await seed([
      `a.jsonl.deleted.${OLD_STAMP}`,
      `b.jsonl.reset.${OLD_STAMP}`,
      freshReset,
      "live.jsonl",
    ]);
    // Spy only after seeding so the count reflects the cleanup call alone.
    const listingSpy = vi.spyOn(fsPromises, "readdir");

    const outcome = await cleanupArchivedSessionTranscripts({
      directories: [tempDir],
      rules: [
        { reason: "deleted", olderThanMs: 30 * DAY_MS },
        { reason: "reset", olderThanMs: 30 * DAY_MS },
      ],
      nowMs: NOW_MS,
    });

    // Check the listing count before listRemaining() issues its own readdir.
    expect(listingSpy).toHaveBeenCalledTimes(1);
    expect(outcome).toEqual({ removed: 2, scanned: 3 });
    expect(await listRemaining()).toEqual([freshReset, "live.jsonl"]);
  });

  it("applies each rule's age threshold independently", async () => {
    const oldDeleted = `a.jsonl.deleted.${OLD_STAMP}`;
    const oldReset = `b.jsonl.reset.${OLD_STAMP}`;
    await seed([oldDeleted, oldReset]);

    const outcome = await cleanupArchivedSessionTranscripts({
      directories: [tempDir],
      rules: [
        { reason: "deleted", olderThanMs: 30 * DAY_MS },
        { reason: "reset", olderThanMs: 365 * DAY_MS },
      ],
      nowMs: NOW_MS,
    });

    expect(outcome).toEqual({ removed: 1, scanned: 2 });
    expect(await listRemaining()).toEqual([oldReset]);
  });

  it("keeps archives whose reason has no rule", async () => {
    const oldReset = `a.jsonl.reset.${OLD_STAMP}`;
    await seed([oldReset]);

    const outcome = await cleanupArchivedSessionTranscripts({
      directories: [tempDir],
      rules: [{ reason: "deleted", olderThanMs: 0 }],
      nowMs: NOW_MS,
    });

    expect(outcome).toEqual({ removed: 0, scanned: 0 });
    expect(await listRemaining()).toEqual([oldReset]);
  });

  it("drops invalid rules and never lists when none remain", async () => {
    const listingSpy = vi.spyOn(fsPromises, "readdir");

    const outcome = await cleanupArchivedSessionTranscripts({
      directories: [tempDir],
      rules: [
        { reason: "deleted", olderThanMs: Number.NaN },
        { reason: "reset", olderThanMs: -1 },
      ],
      nowMs: NOW_MS,
    });

    expect(outcome).toEqual({ removed: 0, scanned: 0 });
    expect(listingSpy).not.toHaveBeenCalled();
  });
});
|
||||
@@ -502,17 +502,26 @@ export function resolveStableSessionEndTranscript(params: {
|
||||
return {};
|
||||
}
|
||||
|
||||
/**
 * One retention rule for archived session transcripts: which archive reason it
 * covers and how old such an archive must be before it is removed.
 */
export type SessionArchiveCleanupRule = {
  /** Archive reason this rule applies to; matched against the archive file name. */
  reason: ArchiveFileReason;
  /**
   * Age threshold in milliseconds; archives strictly older than this are
   * removed. Rules with a non-finite or negative value are ignored by cleanup.
   */
  olderThanMs: number;
};
|
||||
|
||||
// Store maintenance runs this on every session-store save. All retention rules
|
||||
// share one directory listing: a listing per reason would multiply READDIR
|
||||
// load on the per-save hot path, which is expensive on networked filesystems.
|
||||
export async function cleanupArchivedSessionTranscripts(opts: {
|
||||
directories: string[];
|
||||
olderThanMs: number;
|
||||
reason?: ArchiveFileReason;
|
||||
rules: SessionArchiveCleanupRule[];
|
||||
nowMs?: number;
|
||||
}): Promise<{ removed: number; scanned: number }> {
|
||||
if (!Number.isFinite(opts.olderThanMs) || opts.olderThanMs < 0) {
|
||||
const rules = opts.rules.filter(
|
||||
(rule) => Number.isFinite(rule.olderThanMs) && rule.olderThanMs >= 0,
|
||||
);
|
||||
if (rules.length === 0) {
|
||||
return { removed: 0, scanned: 0 };
|
||||
}
|
||||
const now = opts.nowMs ?? Date.now();
|
||||
const reason: ArchiveFileReason = opts.reason ?? "deleted";
|
||||
const directories = uniqueStrings(opts.directories.map((dir) => path.resolve(dir)));
|
||||
let removed = 0;
|
||||
let scanned = 0;
|
||||
@@ -520,21 +529,24 @@ export async function cleanupArchivedSessionTranscripts(opts: {
|
||||
for (const dir of directories) {
|
||||
const entries = await fs.promises.readdir(dir).catch(() => []);
|
||||
for (const entry of entries) {
|
||||
const timestamp = parseSessionArchiveTimestamp(entry, reason);
|
||||
if (timestamp == null) {
|
||||
continue;
|
||||
for (const rule of rules) {
|
||||
const timestamp = parseSessionArchiveTimestamp(entry, rule.reason);
|
||||
if (timestamp == null) {
|
||||
continue;
|
||||
}
|
||||
scanned += 1;
|
||||
if (now - timestamp > rule.olderThanMs) {
|
||||
const fullPath = path.join(dir, entry);
|
||||
const stat = await fs.promises.stat(fullPath).catch(() => null);
|
||||
if (stat?.isFile()) {
|
||||
await fs.promises.rm(fullPath).catch(() => undefined);
|
||||
removed += 1;
|
||||
}
|
||||
}
|
||||
// An archive name carries exactly one `.{reason}.{timestamp}` suffix,
|
||||
// so the first matching rule owns the entry.
|
||||
break;
|
||||
}
|
||||
scanned += 1;
|
||||
if (now - timestamp <= opts.olderThanMs) {
|
||||
continue;
|
||||
}
|
||||
const fullPath = path.join(dir, entry);
|
||||
const stat = await fs.promises.stat(fullPath).catch(() => null);
|
||||
if (!stat?.isFile()) {
|
||||
continue;
|
||||
}
|
||||
await fs.promises.rm(fullPath).catch(() => undefined);
|
||||
removed += 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user