From eca0809a6ddcbd75a1af4204b1f445c31d47ab2c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Mar 2026 22:08:33 +0000 Subject: [PATCH] refactor: convert session manager cache to factory --- .../session-manager-cache.test.ts | 53 +++---- .../session-manager-cache.ts | 142 +++++++++--------- 2 files changed, 89 insertions(+), 106 deletions(-) diff --git a/src/agents/pi-embedded-runner/session-manager-cache.test.ts b/src/agents/pi-embedded-runner/session-manager-cache.test.ts index f4ac51697e9..4ec2d159f03 100644 --- a/src/agents/pi-embedded-runner/session-manager-cache.test.ts +++ b/src/agents/pi-embedded-runner/session-manager-cache.test.ts @@ -1,40 +1,31 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { importFreshModule } from "../../../test/helpers/import-fresh.js"; - -type SessionManagerCacheModule = typeof import("./session-manager-cache.js"); +import { describe, expect, it } from "vitest"; +import { createSessionManagerCache } from "./session-manager-cache.js"; describe("session manager cache", () => { - let savedSessionManagerTtl: string | undefined; + it("prunes expired entries during later cache activity even without revisiting them", () => { + let now = 1_000; + const cache = createSessionManagerCache({ + clock: () => now, + ttlMs: 5_000, + }); - beforeEach(() => { - savedSessionManagerTtl = process.env.OPENCLAW_SESSION_MANAGER_CACHE_TTL_MS; - process.env.OPENCLAW_SESSION_MANAGER_CACHE_TTL_MS = "5000"; - vi.useFakeTimers(); - vi.setSystemTime(new Date("2026-03-22T00:00:00Z")); - }); - - afterEach(() => { - if (savedSessionManagerTtl === undefined) { - delete process.env.OPENCLAW_SESSION_MANAGER_CACHE_TTL_MS; - } else { - process.env.OPENCLAW_SESSION_MANAGER_CACHE_TTL_MS = savedSessionManagerTtl; - } - vi.useRealTimers(); - }); - - it("prunes expired entries during later cache activity even without revisiting them", async () => { - const cache = await importFreshModule( - import.meta.url, - "./session-manager-cache.js?session-manager-cache-prune-on-access", - ); - - cache.__testing.resetSessionManagerCache(); cache.trackSessionManagerAccess("/tmp/stale-session.jsonl"); - expect(cache.__testing.getSessionManagerCacheKeys()).toEqual(["/tmp/stale-session.jsonl"]); + expect(cache.keys()).toEqual(["/tmp/stale-session.jsonl"]); - await vi.advanceTimersByTimeAsync(6_000); + now = 7_000; cache.trackSessionManagerAccess("/tmp/fresh-session.jsonl"); - expect(cache.__testing.getSessionManagerCacheKeys()).toEqual(["/tmp/fresh-session.jsonl"]); + expect(cache.keys()).toEqual(["/tmp/fresh-session.jsonl"]); + }); + + it("can disable caching via the injected TTL resolver", () => { + const cache = createSessionManagerCache({ + ttlMs: 0, + }); + + cache.trackSessionManagerAccess("/tmp/session.jsonl"); + + expect(cache.isSessionManagerCached("/tmp/session.jsonl")).toBe(false); + expect(cache.keys()).toEqual([]); }); }); diff --git a/src/agents/pi-embedded-runner/session-manager-cache.ts b/src/agents/pi-embedded-runner/session-manager-cache.ts index 5804c2ffac3..800f89c1573 100644 --- a/src/agents/pi-embedded-runner/session-manager-cache.ts +++ b/src/agents/pi-embedded-runner/session-manager-cache.ts @@ -1,19 +1,15 @@ import { Buffer } from "node:buffer"; import fs from "node:fs/promises"; -import { isCacheEnabled, resolveCacheTtlMs } from "../../config/cache-utils.js"; +import { + createExpiringMapCache, + isCacheEnabled, + resolveCacheTtlMs, +} from "../../config/cache-utils.js"; -type SessionManagerCacheEntry = { - sessionFile: string; - loadedAt: number; -}; - -const SESSION_MANAGER_CACHE = new Map(); const DEFAULT_SESSION_MANAGER_TTL_MS = 45_000; // 45 seconds const MIN_SESSION_MANAGER_CACHE_PRUNE_INTERVAL_MS = 1_000; const MAX_SESSION_MANAGER_CACHE_PRUNE_INTERVAL_MS = 30_000; -let lastSessionManagerCachePruneAt = 0; - function getSessionManagerTtl(): number { return resolveCacheTtlMs({ envValue: process.env.OPENCLAW_SESSION_MANAGER_CACHE_TTL_MS, @@ -21,10 +17,6 @@ function getSessionManagerTtl(): number { }); } -function isSessionManagerCacheEnabled(): boolean { - return isCacheEnabled(getSessionManagerTtl()); -} - function resolveSessionManagerCachePruneInterval(ttlMs: number): number { return Math.min( Math.max(ttlMs, MIN_SESSION_MANAGER_CACHE_PRUNE_INTERVAL_MS), @@ -32,74 +24,74 @@ function resolveSessionManagerCachePruneInterval(ttlMs: number): number { ); } -function maybePruneExpiredSessionManagerCache(now: number, ttlMs: number): void { - if (now - lastSessionManagerCachePruneAt < resolveSessionManagerCachePruneInterval(ttlMs)) { - return; - } - for (const [sessionFile, entry] of SESSION_MANAGER_CACHE.entries()) { - if (now - entry.loadedAt > ttlMs) { - SESSION_MANAGER_CACHE.delete(sessionFile); - } - } - lastSessionManagerCachePruneAt = now; +export type SessionManagerCache = { + clear: () => void; + isSessionManagerCached: (sessionFile: string) => boolean; + keys: () => string[]; + prewarmSessionFile: (sessionFile: string) => Promise; + trackSessionManagerAccess: (sessionFile: string) => void; +}; + +export function createSessionManagerCache(options?: { + clock?: () => number; + fsModule?: Pick; + ttlMs?: number | (() => number); +}): SessionManagerCache { + const getTtlMs = () => + typeof options?.ttlMs === "function" + ? options.ttlMs() + : (options?.ttlMs ?? getSessionManagerTtl()); + const cache = createExpiringMapCache({ + ttlMs: getTtlMs, + pruneIntervalMs: resolveSessionManagerCachePruneInterval, + clock: options?.clock, + }); + const fsModule = options?.fsModule ?? fs; + + return { + clear: () => { + cache.clear(); + }, + isSessionManagerCached: (sessionFile) => cache.get(sessionFile) === true, + keys: () => cache.keys(), + prewarmSessionFile: async (sessionFile) => { + if (!isCacheEnabled(getTtlMs())) { + return; + } + if (cache.get(sessionFile) === true) { + return; + } + + try { + // Read a small chunk to encourage OS page cache warmup. + const handle = await fsModule.open(sessionFile, "r"); + try { + const buffer = Buffer.alloc(4096); + await handle.read(buffer, 0, buffer.length, 0); + } finally { + await handle.close(); + } + cache.set(sessionFile, true); + } catch { + // File doesn't exist yet, SessionManager will create it + } + }, + trackSessionManagerAccess: (sessionFile) => { + cache.set(sessionFile, true); + }, + }; } +const sessionManagerCache = createSessionManagerCache(); + export function trackSessionManagerAccess(sessionFile: string): void { - const ttl = getSessionManagerTtl(); - if (!isCacheEnabled(ttl)) { - return; - } - const now = Date.now(); - maybePruneExpiredSessionManagerCache(now, ttl); - SESSION_MANAGER_CACHE.set(sessionFile, { - sessionFile, - loadedAt: now, - }); + sessionManagerCache.trackSessionManagerAccess(sessionFile); } -function isSessionManagerCached(sessionFile: string): boolean { - const ttl = getSessionManagerTtl(); - if (!isCacheEnabled(ttl)) { - return false; - } - const now = Date.now(); - maybePruneExpiredSessionManagerCache(now, ttl); - const entry = SESSION_MANAGER_CACHE.get(sessionFile); - if (!entry) { - return false; - } - return now - entry.loadedAt <= ttl; +export function isSessionManagerCached(sessionFile: string): boolean { + return sessionManagerCache.isSessionManagerCached(sessionFile); } export async function prewarmSessionFile(sessionFile: string): Promise { - if (!isSessionManagerCacheEnabled()) { - return; - } - if (isSessionManagerCached(sessionFile)) { - return; - } - - try { - // Read a small chunk to encourage OS page cache warmup. - const handle = await fs.open(sessionFile, "r"); - try { - const buffer = Buffer.alloc(4096); - await handle.read(buffer, 0, buffer.length, 0); - } finally { - await handle.close(); - } - trackSessionManagerAccess(sessionFile); - } catch { - // File doesn't exist yet, SessionManager will create it - } + await sessionManagerCache.prewarmSessionFile(sessionFile); } - -export const __testing = { - getSessionManagerCacheKeys(): string[] { - return [...SESSION_MANAGER_CACHE.keys()]; - }, - resetSessionManagerCache(): void { - SESSION_MANAGER_CACHE.clear(); - lastSessionManagerCachePruneAt = 0; - }, -};