refactor: extract shared expiring cache

This commit is contained in:
Peter Steinberger
2026-03-22 22:07:52 +00:00
parent 6a228d9145
commit 8eadc2f43b
4 changed files with 178 additions and 31 deletions

View File

@@ -1,5 +1,5 @@
import { describe, expect, it } from "vitest";
import { resolveCacheTtlMs } from "./cache-utils.js";
import { createExpiringMapCache, resolveCacheTtlMs } from "./cache-utils.js";
describe("resolveCacheTtlMs", () => {
it("accepts exact non-negative integers", () => {
@@ -12,3 +12,39 @@ describe("resolveCacheTtlMs", () => {
expect(resolveCacheTtlMs({ envValue: "15ms", defaultTtlMs: 60_000 })).toBe(60_000);
});
});
describe("createExpiringMapCache", () => {
it("expires entries on read after the TTL", () => {
let now = 1_000;
const cache = createExpiringMapCache<string, string>({
ttlMs: 5_000,
clock: () => now,
});
cache.set("alpha", "a");
expect(cache.get("alpha")).toBe("a");
now = 6_001;
expect(cache.get("alpha")).toBeUndefined();
expect(cache.size()).toBe(0);
});
it("supports dynamic TTLs and opportunistic pruning", () => {
let now = 1_000;
let ttlMs = 5_000;
const cache = createExpiringMapCache<string, string>({
ttlMs: () => ttlMs,
pruneIntervalMs: 1_000,
clock: () => now,
});
cache.set("stale", "old");
now = 7_000;
ttlMs = 2_000;
cache.set("fresh", "new");
expect(cache.get("stale")).toBeUndefined();
expect(cache.keys()).toEqual(["fresh"]);
});
});

View File

@@ -19,6 +19,128 @@ export function isCacheEnabled(ttlMs: number): boolean {
return ttlMs > 0;
}
type CacheTtlResolver = number | (() => number);
type CachePruneIntervalResolver = number | ((ttlMs: number) => number);
type ExpiringMapCacheEntry<TValue> = {
storedAt: number;
value: TValue;
};
export type ExpiringMapCache<TKey, TValue> = {
get: (key: TKey) => TValue | undefined;
set: (key: TKey, value: TValue) => void;
delete: (key: TKey) => void;
clear: () => void;
keys: () => TKey[];
size: () => number;
pruneExpired: () => void;
};
function resolveCacheNumeric(value: CacheTtlResolver): number {
return typeof value === "function" ? value() : value;
}
function resolvePruneIntervalMs(
ttlMs: number,
pruneIntervalMs: CachePruneIntervalResolver | undefined,
): number {
if (typeof pruneIntervalMs === "function") {
return Math.max(0, Math.floor(pruneIntervalMs(ttlMs)));
}
if (typeof pruneIntervalMs === "number") {
return Math.max(0, Math.floor(pruneIntervalMs));
}
return ttlMs;
}
function isCacheEntryExpired(storedAt: number, now: number, ttlMs: number): boolean {
return now - storedAt > ttlMs;
}
export function createExpiringMapCache<TKey, TValue>(options: {
ttlMs: CacheTtlResolver;
pruneIntervalMs?: CachePruneIntervalResolver;
clock?: () => number;
}): ExpiringMapCache<TKey, TValue> {
const cache = new Map<TKey, ExpiringMapCacheEntry<TValue>>();
const now = options.clock ?? Date.now;
let lastPruneAt = 0;
function getTtlMs(): number {
return Math.max(0, Math.floor(resolveCacheNumeric(options.ttlMs)));
}
function maybePruneExpiredEntries(nowMs: number, ttlMs: number): void {
if (!isCacheEnabled(ttlMs)) {
return;
}
if (nowMs - lastPruneAt < resolvePruneIntervalMs(ttlMs, options.pruneIntervalMs)) {
return;
}
for (const [key, entry] of cache.entries()) {
if (isCacheEntryExpired(entry.storedAt, nowMs, ttlMs)) {
cache.delete(key);
}
}
lastPruneAt = nowMs;
}
return {
get: (key) => {
const ttlMs = getTtlMs();
if (!isCacheEnabled(ttlMs)) {
return undefined;
}
const nowMs = now();
maybePruneExpiredEntries(nowMs, ttlMs);
const entry = cache.get(key);
if (!entry) {
return undefined;
}
if (isCacheEntryExpired(entry.storedAt, nowMs, ttlMs)) {
cache.delete(key);
return undefined;
}
return entry.value;
},
set: (key, value) => {
const ttlMs = getTtlMs();
if (!isCacheEnabled(ttlMs)) {
return;
}
const nowMs = now();
maybePruneExpiredEntries(nowMs, ttlMs);
cache.set(key, {
storedAt: nowMs,
value,
});
},
delete: (key) => {
cache.delete(key);
},
clear: () => {
cache.clear();
lastPruneAt = 0;
},
keys: () => [...cache.keys()],
size: () => cache.size,
pruneExpired: () => {
const ttlMs = getTtlMs();
if (!isCacheEnabled(ttlMs)) {
return;
}
const nowMs = now();
for (const [key, entry] of cache.entries()) {
if (isCacheEntryExpired(entry.storedAt, nowMs, ttlMs)) {
cache.delete(key);
}
}
lastPruneAt = nowMs;
},
};
}
export type FileStatSnapshot = {
mtimeMs: number;
sizeBytes: number;

View File

@@ -1,17 +1,31 @@
import { createExpiringMapCache, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js";
import type { SessionEntry } from "./types.js";
type SessionStoreCacheEntry = {
store: Record<string, SessionEntry>;
loadedAt: number;
storePath: string;
mtimeMs?: number;
sizeBytes?: number;
serialized?: string;
};
const SESSION_STORE_CACHE = new Map<string, SessionStoreCacheEntry>();
const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s)
const SESSION_STORE_CACHE = createExpiringMapCache<string, SessionStoreCacheEntry>({
ttlMs: getSessionStoreTtl,
});
const SESSION_STORE_SERIALIZED_CACHE = new Map<string, string>();
export function getSessionStoreTtl(): number {
return resolveCacheTtlMs({
envValue: process.env.OPENCLAW_SESSION_CACHE_TTL_MS,
defaultTtlMs: DEFAULT_SESSION_STORE_TTL_MS,
});
}
export function isSessionStoreCacheEnabled(): boolean {
return isCacheEnabled(getSessionStoreTtl());
}
export function clearSessionStoreCaches(): void {
SESSION_STORE_CACHE.clear();
SESSION_STORE_SERIALIZED_CACHE.clear();
@@ -40,7 +54,6 @@ export function dropSessionStoreObjectCache(storePath: string): void {
export function readSessionStoreCache(params: {
storePath: string;
ttlMs: number;
mtimeMs?: number;
sizeBytes?: number;
}): Record<string, SessionEntry> | null {
@@ -48,11 +61,6 @@ export function readSessionStoreCache(params: {
if (!cached) {
return null;
}
const now = Date.now();
if (now - cached.loadedAt > params.ttlMs) {
invalidateSessionStoreCache(params.storePath);
return null;
}
if (params.mtimeMs !== cached.mtimeMs || params.sizeBytes !== cached.sizeBytes) {
invalidateSessionStoreCache(params.storePath);
return null;
@@ -69,8 +77,6 @@ export function writeSessionStoreCache(params: {
}): void {
SESSION_STORE_CACHE.set(params.storePath, {
store: structuredClone(params.store),
loadedAt: Date.now(),
storePath: params.storePath,
mtimeMs: params.mtimeMs,
sizeBytes: params.sizeBytes,
serialized: params.serialized,

View File

@@ -15,13 +15,14 @@ import {
normalizeSessionDeliveryFields,
type DeliveryContext,
} from "../../utils/delivery-context.js";
import { getFileStatSnapshot, isCacheEnabled, resolveCacheTtlMs } from "../cache-utils.js";
import { getFileStatSnapshot } from "../cache-utils.js";
import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js";
import { deriveSessionMetaPatch } from "./metadata.js";
import {
clearSessionStoreCaches,
dropSessionStoreObjectCache,
getSerializedSessionStore,
isSessionStoreCacheEnabled,
readSessionStoreCache,
setSerializedSessionStore,
writeSessionStoreCache,
@@ -45,27 +46,10 @@ import {
const log = createSubsystemLogger("sessions/store");
// ============================================================================
// Session Store Cache with TTL Support
// ============================================================================
const DEFAULT_SESSION_STORE_TTL_MS = 45_000; // 45 seconds (between 30-60s)
function isSessionStoreRecord(value: unknown): value is Record<string, SessionEntry> {
return !!value && typeof value === "object" && !Array.isArray(value);
}
function getSessionStoreTtl(): number {
return resolveCacheTtlMs({
envValue: process.env.OPENCLAW_SESSION_CACHE_TTL_MS,
defaultTtlMs: DEFAULT_SESSION_STORE_TTL_MS,
});
}
function isSessionStoreCacheEnabled(): boolean {
return isCacheEnabled(getSessionStoreTtl());
}
function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry {
const normalized = normalizeSessionDeliveryFields({
channel: entry.channel,
@@ -201,7 +185,6 @@ export function loadSessionStore(
const currentFileStat = getFileStatSnapshot(storePath);
const cached = readSessionStoreCache({
storePath,
ttlMs: getSessionStoreTtl(),
mtimeMs: currentFileStat?.mtimeMs,
sizeBytes: currentFileStat?.sizeBytes,
});