From 877b5a14f1a9f674bc252b1122cafb2dcd8d66e5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 14:51:48 +0100 Subject: [PATCH] fix(sessions): batch store cap maintenance --- docs/concepts/session.md | 2 + docs/gateway/config-agents.md | 2 +- .../session-management-compaction.md | 2 + src/config/sessions/store-load.ts | 16 +++- src/config/sessions/store-maintenance.ts | 28 +++++++ .../store.pruning.integration.test.ts | 73 ++++++++++++++++++- src/config/sessions/store.pruning.test.ts | 11 ++- src/config/sessions/store.ts | 34 ++++++--- 8 files changed, 152 insertions(+), 16 deletions(-) diff --git a/docs/concepts/session.md b/docs/concepts/session.md index b4915727711..b2b2a7f8e71 100644 --- a/docs/concepts/session.md +++ b/docs/concepts/session.md @@ -118,6 +118,8 @@ to `"enforce"` for automatic cleanup: } ``` +For production-sized `maxEntries` limits, Gateway runtime writes use a small high-water buffer and clean back down to the configured cap in batches. This avoids running full store cleanup on every isolated cron session. `openclaw sessions cleanup --enforce` applies the cap immediately. + Preview with `openclaw sessions cleanup --dry-run`. ## Inspecting sessions diff --git a/docs/gateway/config-agents.md b/docs/gateway/config-agents.md index dee298aedd6..46258d396db 100644 --- a/docs/gateway/config-agents.md +++ b/docs/gateway/config-agents.md @@ -1194,7 +1194,7 @@ See [Multi-Agent Sandbox & Tools](/tools/multi-agent-sandbox-tools) for preceden - **`maintenance`**: session-store cleanup + retention controls. - `mode`: `warn` emits warnings only; `enforce` applies cleanup. - `pruneAfter`: age cutoff for stale entries (default `30d`). - - `maxEntries`: maximum number of entries in `sessions.json` (default `500`). + - `maxEntries`: maximum number of entries in `sessions.json` (default `500`). Runtime writes batch cleanup with a small high-water buffer for production-sized caps; `openclaw sessions cleanup --enforce` applies the cap immediately. - `rotateBytes`: rotate `sessions.json` when it exceeds this size (default `10mb`). - `resetArchiveRetention`: retention for `*.reset.` transcript archives. Defaults to `pruneAfter`; set `false` to disable. - `maxDiskBytes`: optional sessions-directory disk budget. In `warn` mode it logs warnings; in `enforce` mode it removes oldest artifacts/sessions first. diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 1c9c5f7b6dc..26f9ed61590 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -80,6 +80,8 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f - `maxDiskBytes`: optional sessions-directory budget - `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`) +Normal Gateway writes batch `maxEntries` cleanup for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. `openclaw sessions cleanup --enforce` still applies the configured cap immediately. + Enforcement order for disk budget cleanup (`mode: "enforce"`): 1. Remove oldest archived or orphan transcript artifacts first. diff --git a/src/config/sessions/store-load.ts b/src/config/sessions/store-load.ts index f4db2c41210..a200bb9ebb7 100644 --- a/src/config/sessions/store-load.ts +++ b/src/config/sessions/store-load.ts @@ -12,6 +12,7 @@ import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js"; import { capEntryCount, pruneStaleEntries, + shouldRunSessionEntryMaintenance, type ResolvedSessionMaintenanceConfig, } from "./store-maintenance.js"; import { applySessionStoreMigrations } from "./store-migrations.js"; @@ -20,6 +21,7 @@ import { normalizeSessionRuntimeModelFields, type SessionEntry } from "./types.j export type LoadSessionStoreOptions = { skipCache?: boolean; maintenanceConfig?: ResolvedSessionMaintenanceConfig; + clone?: boolean; }; const log = createSubsystemLogger("sessions/store"); @@ -129,10 +131,16 @@ export function loadSessionStore( applySessionStoreMigrations(store); normalizeSessionStore(store); const maintenance = opts.maintenanceConfig ?? resolveMaintenanceConfig(); - if (maintenance.mode === "enforce" && Object.keys(store).length > maintenance.maxEntries) { - const beforeCount = Object.keys(store).length; + const beforeCount = Object.keys(store).length; + if (maintenance.mode === "enforce" && beforeCount > maintenance.maxEntries) { const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { log: false }); - const capped = capEntryCount(store, maintenance.maxEntries, { log: false }); + const countAfterPrune = Object.keys(store).length; + const capped = shouldRunSessionEntryMaintenance({ + entryCount: countAfterPrune, + maxEntries: maintenance.maxEntries, + }) + ? capEntryCount(store, maintenance.maxEntries, { log: false }) + : 0; const afterCount = Object.keys(store).length; if (pruned > 0 || capped > 0) { serializedFromDisk = undefined; @@ -158,5 +166,5 @@ export function loadSessionStore( }); } - return structuredClone(store); + return opts.clone === false ? store : structuredClone(store); } diff --git a/src/config/sessions/store-maintenance.ts b/src/config/sessions/store-maintenance.ts index 8b64bce67e9..5973823d621 100644 --- a/src/config/sessions/store-maintenance.ts +++ b/src/config/sessions/store-maintenance.ts @@ -14,6 +14,9 @@ const DEFAULT_SESSION_MAX_ENTRIES = 500; const DEFAULT_SESSION_ROTATE_BYTES = 10_485_760; // 10 MB const DEFAULT_SESSION_MAINTENANCE_MODE: SessionMaintenanceMode = "enforce"; const DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO = 0.8; +const STRICT_ENTRY_MAINTENANCE_MAX_ENTRIES = 49; +const MIN_BATCHED_ENTRY_MAINTENANCE_SLACK = 25; +const BATCHED_ENTRY_MAINTENANCE_SLACK_RATIO = 0.1; export type SessionMaintenanceWarning = { activeSessionKey: string; @@ -148,6 +151,31 @@ export function resolveMaintenanceConfigFromInput( }; } +export function resolveSessionEntryMaintenanceHighWater(maxEntries: number): number { + if (!Number.isSafeInteger(maxEntries) || maxEntries <= 0) { + return 1; + } + if (maxEntries <= STRICT_ENTRY_MAINTENANCE_MAX_ENTRIES) { + return maxEntries + 1; + } + const slack = Math.max( + MIN_BATCHED_ENTRY_MAINTENANCE_SLACK, + Math.ceil(maxEntries * BATCHED_ENTRY_MAINTENANCE_SLACK_RATIO), + ); + return maxEntries + slack; +} + +export function shouldRunSessionEntryMaintenance(params: { + entryCount: number; + maxEntries: number; + force?: boolean; +}): boolean { + if (params.force) { + return true; + } + return params.entryCount >= resolveSessionEntryMaintenanceHighWater(params.maxEntries); +} + /** * Remove entries whose `updatedAt` is older than the configured threshold. * Entries without `updatedAt` are kept (cannot determine staleness). diff --git a/src/config/sessions/store.pruning.integration.test.ts b/src/config/sessions/store.pruning.integration.test.ts index 3ac0216956e..70a8209a87a 100644 --- a/src/config/sessions/store.pruning.integration.test.ts +++ b/src/config/sessions/store.pruning.integration.test.ts @@ -12,7 +12,12 @@ vi.mock("../config.js", async () => ({ })); import { getRuntimeConfig } from "../config.js"; -import { clearSessionStoreCacheForTest, loadSessionStore, saveSessionStore } from "./store.js"; +import { + clearSessionStoreCacheForTest, + loadSessionStore, + saveSessionStore, + updateSessionStore, +} from "./store.js"; let mockLoadConfig: ReturnType; @@ -287,6 +292,72 @@ describe("Integration: saveSessionStore with pruning", () => { expect(loaded.newest).toBeDefined(); }); + it("loadSessionStore batches entry-count cleanup until the high-water mark", async () => { + const now = Date.now(); + const store = Object.fromEntries( + Array.from({ length: 51 }, (_, index) => [`session-${index}`, makeEntry(now - index)]), + ); + await fs.writeFile(storePath, JSON.stringify(store), "utf-8"); + + const loaded = loadSessionStore(storePath, { + skipCache: true, + maintenanceConfig: { + ...ENFORCED_MAINTENANCE_OVERRIDE, + maxEntries: 50, + pruneAfterMs: 365 * DAY_MS, + }, + }); + + expect(Object.keys(loaded)).toHaveLength(51); + }); + + it("loadSessionStore caps production-sized stores once they reach the high-water mark", async () => { + const now = Date.now(); + const store = Object.fromEntries( + Array.from({ length: 75 }, (_, index) => [`session-${index}`, makeEntry(now - index)]), + ); + await fs.writeFile(storePath, JSON.stringify(store), "utf-8"); + + const loaded = loadSessionStore(storePath, { + skipCache: true, + maintenanceConfig: { + ...ENFORCED_MAINTENANCE_OVERRIDE, + maxEntries: 50, + pruneAfterMs: 365 * DAY_MS, + }, + }); + + expect(Object.keys(loaded)).toHaveLength(50); + expect(loaded["session-0"]).toBeDefined(); + expect(loaded["session-74"]).toBeUndefined(); + }); + + it("updateSessionStore batches cap-hit maintenance instead of pruning every new session", async () => { + const now = Date.now(); + const store = Object.fromEntries( + Array.from({ length: 50 }, (_, index) => [`session-${index}`, makeEntry(now - index)]), + ); + await fs.writeFile(storePath, JSON.stringify(store), "utf-8"); + mockLoadConfig.mockReturnValue({ + session: { + maintenance: { + mode: "enforce", + pruneAfter: "365d", + maxEntries: 50, + rotateBytes: 10_485_760, + }, + }, + }); + + await updateSessionStore(storePath, (next) => { + next["session-50"] = makeEntry(now + 1); + }); + + const loaded = loadSessionStore(storePath, { skipCache: true }); + expect(Object.keys(loaded)).toHaveLength(51); + expect(loaded["session-50"]).toBeDefined(); + }); + it("loadSessionStore honors configured maxEntries without an explicit override", async () => { mockLoadConfig.mockReturnValue({ session: { diff --git a/src/config/sessions/store.pruning.test.ts b/src/config/sessions/store.pruning.test.ts index 575fa905308..eaee2c9f6ef 100644 --- a/src/config/sessions/store.pruning.test.ts +++ b/src/config/sessions/store.pruning.test.ts @@ -3,7 +3,10 @@ import fs from "node:fs/promises"; import path from "node:path"; import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { createFixtureSuite } from "../../test-utils/fixture-suite.js"; -import { resolveMaintenanceConfigFromInput } from "./store-maintenance.js"; +import { + resolveMaintenanceConfigFromInput, + resolveSessionEntryMaintenanceHighWater, +} from "./store-maintenance.js"; import { capEntryCount, getActiveSessionMaintenanceWarning, @@ -83,6 +86,12 @@ describe("resolveMaintenanceConfigFromInput", () => { expect(maintenance.mode).toBe("enforce"); }); + + it("batches normal entry-count maintenance for production-sized caps", () => { + expect(resolveSessionEntryMaintenanceHighWater(2)).toBe(3); + expect(resolveSessionEntryMaintenanceHighWater(50)).toBe(75); + expect(resolveSessionEntryMaintenanceHighWater(500)).toBe(550); + }); }); describe("getActiveSessionMaintenanceWarning", () => { diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 1fc9445e395..da26c8e8b11 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -40,6 +40,7 @@ import { getActiveSessionMaintenanceWarning, pruneStaleEntries, rotateSessionFile, + shouldRunSessionEntryMaintenance, type ResolvedSessionMaintenanceConfig, type SessionMaintenanceWarning, } from "./store-maintenance.js"; @@ -243,10 +244,16 @@ async function saveSessionStoreUnlocked( : { ...resolveMaintenanceConfig(), ...opts?.maintenanceOverride }; const shouldWarnOnly = maintenance.mode === "warn"; const beforeCount = Object.keys(store).length; + const forceMaintenance = opts?.maintenanceOverride !== undefined; + const shouldRunEntryMaintenance = shouldRunSessionEntryMaintenance({ + entryCount: beforeCount, + maxEntries: maintenance.maxEntries, + force: forceMaintenance, + }); if (shouldWarnOnly) { const activeSessionKey = opts?.activeSessionKey?.trim(); - if (activeSessionKey) { + if (activeSessionKey && shouldRunEntryMaintenance) { const warning = getActiveSessionMaintenanceWarning({ store, activeSessionKey, @@ -292,12 +299,21 @@ async function saveSessionStoreUnlocked( }, preserveKeys: preserveSessionKeys, }); - const capped = capEntryCount(store, maintenance.maxEntries, { - onCapped: ({ entry }) => { - rememberRemovedSessionFile(removedSessionFiles, entry); - }, - preserveKeys: preserveSessionKeys, - }); + const countAfterPrune = Object.keys(store).length; + const shouldRunCapMaintenance = + forceMaintenance || + shouldRunSessionEntryMaintenance({ + entryCount: countAfterPrune, + maxEntries: maintenance.maxEntries, + }); + const capped = shouldRunCapMaintenance + ? capEntryCount(store, maintenance.maxEntries, { + onCapped: ({ entry }) => { + rememberRemovedSessionFile(removedSessionFiles, entry); + }, + preserveKeys: preserveSessionKeys, + }) + : 0; const archivedDirs = new Set(); const referencedSessionIds = new Set( Object.values(store) @@ -425,7 +441,7 @@ export async function updateSessionStore( ): Promise { return await withSessionStoreLock(storePath, async () => { // Always re-read inside the lock to avoid clobbering concurrent writers. - const store = loadSessionStore(storePath, { skipCache: true }); + const store = loadSessionStore(storePath, { skipCache: true, clone: false }); const previousAcpByKey = collectAcpMetadataSnapshot(store); const result = await mutator(store); preserveExistingAcpMetadata({ @@ -648,7 +664,7 @@ export async function updateSessionStoreEntry(params: { }): Promise { const { storePath, sessionKey, update } = params; return await withSessionStoreLock(storePath, async () => { - const store = loadSessionStore(storePath, { skipCache: true }); + const store = loadSessionStore(storePath, { skipCache: true, clone: false }); const resolved = resolveSessionStoreEntry({ store, sessionKey }); const existing = resolved.existing; if (!existing) {