mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:10:43 +00:00
fix(sessions): batch store cap maintenance
This commit is contained in:
@@ -118,6 +118,8 @@ to `"enforce"` for automatic cleanup:
|
||||
}
|
||||
```
|
||||
|
||||
For production-sized `maxEntries` limits, Gateway runtime writes use a small high-water buffer and clean back down to the configured cap in batches. This avoids running full store cleanup on every isolated cron session. `openclaw sessions cleanup --enforce` applies the cap immediately.
|
||||
|
||||
Preview with `openclaw sessions cleanup --dry-run`.
|
||||
|
||||
## Inspecting sessions
|
||||
|
||||
@@ -1194,7 +1194,7 @@ See [Multi-Agent Sandbox & Tools](/tools/multi-agent-sandbox-tools) for preceden
|
||||
- **`maintenance`**: session-store cleanup + retention controls.
|
||||
- `mode`: `warn` emits warnings only; `enforce` applies cleanup.
|
||||
- `pruneAfter`: age cutoff for stale entries (default `30d`).
|
||||
- `maxEntries`: maximum number of entries in `sessions.json` (default `500`).
|
||||
- `maxEntries`: maximum number of entries in `sessions.json` (default `500`). Runtime writes batch cleanup with a small high-water buffer for production-sized caps; `openclaw sessions cleanup --enforce` applies the cap immediately.
|
||||
- `rotateBytes`: rotate `sessions.json` when it exceeds this size (default `10mb`).
|
||||
- `resetArchiveRetention`: retention for `*.reset.<timestamp>` transcript archives. Defaults to `pruneAfter`; set `false` to disable.
|
||||
- `maxDiskBytes`: optional sessions-directory disk budget. In `warn` mode it logs warnings; in `enforce` mode it removes oldest artifacts/sessions first.
|
||||
|
||||
@@ -80,6 +80,8 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f
|
||||
- `maxDiskBytes`: optional sessions-directory budget
|
||||
- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`)
|
||||
|
||||
Normal Gateway writes batch `maxEntries` cleanup for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. `openclaw sessions cleanup --enforce` still applies the configured cap immediately.
|
||||
|
||||
Enforcement order for disk budget cleanup (`mode: "enforce"`):
|
||||
|
||||
1. Remove oldest archived or orphan transcript artifacts first.
|
||||
|
||||
@@ -12,6 +12,7 @@ import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
pruneStaleEntries,
|
||||
shouldRunSessionEntryMaintenance,
|
||||
type ResolvedSessionMaintenanceConfig,
|
||||
} from "./store-maintenance.js";
|
||||
import { applySessionStoreMigrations } from "./store-migrations.js";
|
||||
@@ -20,6 +21,7 @@ import { normalizeSessionRuntimeModelFields, type SessionEntry } from "./types.j
|
||||
export type LoadSessionStoreOptions = {
|
||||
skipCache?: boolean;
|
||||
maintenanceConfig?: ResolvedSessionMaintenanceConfig;
|
||||
clone?: boolean;
|
||||
};
|
||||
|
||||
const log = createSubsystemLogger("sessions/store");
|
||||
@@ -129,10 +131,16 @@ export function loadSessionStore(
|
||||
applySessionStoreMigrations(store);
|
||||
normalizeSessionStore(store);
|
||||
const maintenance = opts.maintenanceConfig ?? resolveMaintenanceConfig();
|
||||
if (maintenance.mode === "enforce" && Object.keys(store).length > maintenance.maxEntries) {
|
||||
const beforeCount = Object.keys(store).length;
|
||||
const beforeCount = Object.keys(store).length;
|
||||
if (maintenance.mode === "enforce" && beforeCount > maintenance.maxEntries) {
|
||||
const pruned = pruneStaleEntries(store, maintenance.pruneAfterMs, { log: false });
|
||||
const capped = capEntryCount(store, maintenance.maxEntries, { log: false });
|
||||
const countAfterPrune = Object.keys(store).length;
|
||||
const capped = shouldRunSessionEntryMaintenance({
|
||||
entryCount: countAfterPrune,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
})
|
||||
? capEntryCount(store, maintenance.maxEntries, { log: false })
|
||||
: 0;
|
||||
const afterCount = Object.keys(store).length;
|
||||
if (pruned > 0 || capped > 0) {
|
||||
serializedFromDisk = undefined;
|
||||
@@ -158,5 +166,5 @@ export function loadSessionStore(
|
||||
});
|
||||
}
|
||||
|
||||
return structuredClone(store);
|
||||
return opts.clone === false ? store : structuredClone(store);
|
||||
}
|
||||
|
||||
@@ -14,6 +14,9 @@ const DEFAULT_SESSION_MAX_ENTRIES = 500;
|
||||
const DEFAULT_SESSION_ROTATE_BYTES = 10_485_760; // 10 MB
|
||||
const DEFAULT_SESSION_MAINTENANCE_MODE: SessionMaintenanceMode = "enforce";
|
||||
const DEFAULT_SESSION_DISK_BUDGET_HIGH_WATER_RATIO = 0.8;
|
||||
const STRICT_ENTRY_MAINTENANCE_MAX_ENTRIES = 49;
|
||||
const MIN_BATCHED_ENTRY_MAINTENANCE_SLACK = 25;
|
||||
const BATCHED_ENTRY_MAINTENANCE_SLACK_RATIO = 0.1;
|
||||
|
||||
export type SessionMaintenanceWarning = {
|
||||
activeSessionKey: string;
|
||||
@@ -148,6 +151,31 @@ export function resolveMaintenanceConfigFromInput(
|
||||
};
|
||||
}
|
||||
|
||||
export function resolveSessionEntryMaintenanceHighWater(maxEntries: number): number {
|
||||
if (!Number.isSafeInteger(maxEntries) || maxEntries <= 0) {
|
||||
return 1;
|
||||
}
|
||||
if (maxEntries <= STRICT_ENTRY_MAINTENANCE_MAX_ENTRIES) {
|
||||
return maxEntries + 1;
|
||||
}
|
||||
const slack = Math.max(
|
||||
MIN_BATCHED_ENTRY_MAINTENANCE_SLACK,
|
||||
Math.ceil(maxEntries * BATCHED_ENTRY_MAINTENANCE_SLACK_RATIO),
|
||||
);
|
||||
return maxEntries + slack;
|
||||
}
|
||||
|
||||
export function shouldRunSessionEntryMaintenance(params: {
|
||||
entryCount: number;
|
||||
maxEntries: number;
|
||||
force?: boolean;
|
||||
}): boolean {
|
||||
if (params.force) {
|
||||
return true;
|
||||
}
|
||||
return params.entryCount >= resolveSessionEntryMaintenanceHighWater(params.maxEntries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove entries whose `updatedAt` is older than the configured threshold.
|
||||
* Entries without `updatedAt` are kept (cannot determine staleness).
|
||||
|
||||
@@ -12,7 +12,12 @@ vi.mock("../config.js", async () => ({
|
||||
}));
|
||||
|
||||
import { getRuntimeConfig } from "../config.js";
|
||||
import { clearSessionStoreCacheForTest, loadSessionStore, saveSessionStore } from "./store.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
loadSessionStore,
|
||||
saveSessionStore,
|
||||
updateSessionStore,
|
||||
} from "./store.js";
|
||||
|
||||
let mockLoadConfig: ReturnType<typeof vi.fn>;
|
||||
|
||||
@@ -287,6 +292,72 @@ describe("Integration: saveSessionStore with pruning", () => {
|
||||
expect(loaded.newest).toBeDefined();
|
||||
});
|
||||
|
||||
it("loadSessionStore batches entry-count cleanup until the high-water mark", async () => {
|
||||
const now = Date.now();
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 51 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 50,
|
||||
pruneAfterMs: 365 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(51);
|
||||
});
|
||||
|
||||
it("loadSessionStore caps production-sized stores once they reach the high-water mark", async () => {
|
||||
const now = Date.now();
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 75 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
|
||||
const loaded = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
maintenanceConfig: {
|
||||
...ENFORCED_MAINTENANCE_OVERRIDE,
|
||||
maxEntries: 50,
|
||||
pruneAfterMs: 365 * DAY_MS,
|
||||
},
|
||||
});
|
||||
|
||||
expect(Object.keys(loaded)).toHaveLength(50);
|
||||
expect(loaded["session-0"]).toBeDefined();
|
||||
expect(loaded["session-74"]).toBeUndefined();
|
||||
});
|
||||
|
||||
it("updateSessionStore batches cap-hit maintenance instead of pruning every new session", async () => {
|
||||
const now = Date.now();
|
||||
const store = Object.fromEntries(
|
||||
Array.from({ length: 50 }, (_, index) => [`session-${index}`, makeEntry(now - index)]),
|
||||
);
|
||||
await fs.writeFile(storePath, JSON.stringify(store), "utf-8");
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
maintenance: {
|
||||
mode: "enforce",
|
||||
pruneAfter: "365d",
|
||||
maxEntries: 50,
|
||||
rotateBytes: 10_485_760,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
await updateSessionStore(storePath, (next) => {
|
||||
next["session-50"] = makeEntry(now + 1);
|
||||
});
|
||||
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true });
|
||||
expect(Object.keys(loaded)).toHaveLength(51);
|
||||
expect(loaded["session-50"]).toBeDefined();
|
||||
});
|
||||
|
||||
it("loadSessionStore honors configured maxEntries without an explicit override", async () => {
|
||||
mockLoadConfig.mockReturnValue({
|
||||
session: {
|
||||
|
||||
@@ -3,7 +3,10 @@ import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createFixtureSuite } from "../../test-utils/fixture-suite.js";
|
||||
import { resolveMaintenanceConfigFromInput } from "./store-maintenance.js";
|
||||
import {
|
||||
resolveMaintenanceConfigFromInput,
|
||||
resolveSessionEntryMaintenanceHighWater,
|
||||
} from "./store-maintenance.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
getActiveSessionMaintenanceWarning,
|
||||
@@ -83,6 +86,12 @@ describe("resolveMaintenanceConfigFromInput", () => {
|
||||
|
||||
expect(maintenance.mode).toBe("enforce");
|
||||
});
|
||||
|
||||
it("batches normal entry-count maintenance for production-sized caps", () => {
|
||||
expect(resolveSessionEntryMaintenanceHighWater(2)).toBe(3);
|
||||
expect(resolveSessionEntryMaintenanceHighWater(50)).toBe(75);
|
||||
expect(resolveSessionEntryMaintenanceHighWater(500)).toBe(550);
|
||||
});
|
||||
});
|
||||
|
||||
describe("getActiveSessionMaintenanceWarning", () => {
|
||||
|
||||
@@ -40,6 +40,7 @@ import {
|
||||
getActiveSessionMaintenanceWarning,
|
||||
pruneStaleEntries,
|
||||
rotateSessionFile,
|
||||
shouldRunSessionEntryMaintenance,
|
||||
type ResolvedSessionMaintenanceConfig,
|
||||
type SessionMaintenanceWarning,
|
||||
} from "./store-maintenance.js";
|
||||
@@ -243,10 +244,16 @@ async function saveSessionStoreUnlocked(
|
||||
: { ...resolveMaintenanceConfig(), ...opts?.maintenanceOverride };
|
||||
const shouldWarnOnly = maintenance.mode === "warn";
|
||||
const beforeCount = Object.keys(store).length;
|
||||
const forceMaintenance = opts?.maintenanceOverride !== undefined;
|
||||
const shouldRunEntryMaintenance = shouldRunSessionEntryMaintenance({
|
||||
entryCount: beforeCount,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
force: forceMaintenance,
|
||||
});
|
||||
|
||||
if (shouldWarnOnly) {
|
||||
const activeSessionKey = opts?.activeSessionKey?.trim();
|
||||
if (activeSessionKey) {
|
||||
if (activeSessionKey && shouldRunEntryMaintenance) {
|
||||
const warning = getActiveSessionMaintenanceWarning({
|
||||
store,
|
||||
activeSessionKey,
|
||||
@@ -292,12 +299,21 @@ async function saveSessionStoreUnlocked(
|
||||
},
|
||||
preserveKeys: preserveSessionKeys,
|
||||
});
|
||||
const capped = capEntryCount(store, maintenance.maxEntries, {
|
||||
onCapped: ({ entry }) => {
|
||||
rememberRemovedSessionFile(removedSessionFiles, entry);
|
||||
},
|
||||
preserveKeys: preserveSessionKeys,
|
||||
});
|
||||
const countAfterPrune = Object.keys(store).length;
|
||||
const shouldRunCapMaintenance =
|
||||
forceMaintenance ||
|
||||
shouldRunSessionEntryMaintenance({
|
||||
entryCount: countAfterPrune,
|
||||
maxEntries: maintenance.maxEntries,
|
||||
});
|
||||
const capped = shouldRunCapMaintenance
|
||||
? capEntryCount(store, maintenance.maxEntries, {
|
||||
onCapped: ({ entry }) => {
|
||||
rememberRemovedSessionFile(removedSessionFiles, entry);
|
||||
},
|
||||
preserveKeys: preserveSessionKeys,
|
||||
})
|
||||
: 0;
|
||||
const archivedDirs = new Set<string>();
|
||||
const referencedSessionIds = new Set(
|
||||
Object.values(store)
|
||||
@@ -425,7 +441,7 @@ export async function updateSessionStore<T>(
|
||||
): Promise<T> {
|
||||
return await withSessionStoreLock(storePath, async () => {
|
||||
// Always re-read inside the lock to avoid clobbering concurrent writers.
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const store = loadSessionStore(storePath, { skipCache: true, clone: false });
|
||||
const previousAcpByKey = collectAcpMetadataSnapshot(store);
|
||||
const result = await mutator(store);
|
||||
preserveExistingAcpMetadata({
|
||||
@@ -648,7 +664,7 @@ export async function updateSessionStoreEntry(params: {
|
||||
}): Promise<SessionEntry | null> {
|
||||
const { storePath, sessionKey, update } = params;
|
||||
return await withSessionStoreLock(storePath, async () => {
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const store = loadSessionStore(storePath, { skipCache: true, clone: false });
|
||||
const resolved = resolveSessionStoreEntry({ store, sessionKey });
|
||||
const existing = resolved.existing;
|
||||
if (!existing) {
|
||||
|
||||
Reference in New Issue
Block a user