From 18e7d28b2179604187cdee664baffe3b9dca2197 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 30 May 2026 17:30:34 +0100 Subject: [PATCH] perf(gateway): reuse stable turn metadata --- .../src/app-server/startup-binding.test.ts | 25 +++++++++++ .../codex/src/app-server/startup-binding.ts | 39 ++++++++++++++-- .../command/attempt-execution.shared.ts | 6 ++- .../reply/commands-session-store.ts | 35 ++++++++++----- src/auto-reply/reply/session-updates.ts | 15 +++++-- src/config/sessions/sessions.test.ts | 43 ++++++++++++++++++ src/config/sessions/store.ts | 44 +++++++++++++++++-- src/plugin-sdk/facade-loader.ts | 15 ++----- .../plugin-metadata-snapshot.memo.test.ts | 31 ++++++++++++- src/plugins/plugin-metadata-snapshot.ts | 29 ++++++++++++ 10 files changed, 248 insertions(+), 34 deletions(-) diff --git a/extensions/codex/src/app-server/startup-binding.test.ts b/extensions/codex/src/app-server/startup-binding.test.ts index 0817a44c354..dcc4cb59c04 100644 --- a/extensions/codex/src/app-server/startup-binding.test.ts +++ b/extensions/codex/src/app-server/startup-binding.test.ts @@ -77,6 +77,31 @@ describe("Codex app-server startup binding", () => { expect(savedBinding?.threadId).toBe("thread-existing"); }); + it("reuses the session record cache while sessions.json is unchanged", async () => { + const sessionFile = path.join(tempDir, "session.jsonl"); + const workspaceDir = path.join(tempDir, "workspace"); + const agentDir = path.join(tempDir, "agent"); + await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" }); + await writeSessionRecord(sessionFile, { totalTokens: 12_000 }); + const sessionsJson = path.join(path.dirname(sessionFile), "sessions.json"); + const readFileSpy = vi.spyOn(fs, "readFile"); + + for (let i = 0; i < 2; i += 1) { + const binding = await rotateOversizedCodexAppServerStartupBinding({ + binding: await readCodexAppServerBinding(sessionFile), + sessionFile, + agentDir, + config: undefined, + }); + expect(binding?.threadId).toBe("thread-existing"); + } + + const sessionStoreReads = readFileSpy.mock.calls.filter( + ([file]) => typeof file === "string" && file === sessionsJson, + ); + expect(sessionStoreReads).toHaveLength(1); + }); + it("checks native rollout token pressure under default compaction config", async () => { const sessionFile = path.join(tempDir, "session.jsonl"); const workspaceDir = path.join(tempDir, "workspace"); diff --git a/extensions/codex/src/app-server/startup-binding.ts b/extensions/codex/src/app-server/startup-binding.ts index da568e562e0..46c175c29ae 100644 --- a/extensions/codex/src/app-server/startup-binding.ts +++ b/extensions/codex/src/app-server/startup-binding.ts @@ -30,6 +30,14 @@ const CODEX_APP_SERVER_BYTE_UNITS: Record = { tb: 1024 * 1024 * 1024 * 1024, tib: 1024 * 1024 * 1024 * 1024, }; +type CodexSessionRecordCacheEntry = { + sessionsFile: string; + mtimeMs: number; + size: number; + record: (Record & { sessionKey: string }) | undefined; +}; + +const codexSessionRecordCache = new Map(); function parseCodexAppServerByteLimit(value: unknown): number | undefined { if (typeof value === "number" && Number.isFinite(value) && value > 0) { @@ -112,16 +120,34 @@ async function readCodexSessionRecordForSessionFile( sessionFile: string, ): Promise<(Record & { sessionKey: string }) | undefined> { const sessionsFile = path.join(path.dirname(sessionFile), "sessions.json"); + const resolvedSessionFile = path.resolve(sessionFile); + let stat: Awaited>; + try { + stat = await fs.stat(sessionsFile); + } catch { + codexSessionRecordCache.delete(resolvedSessionFile); + return undefined; + } + const cached = codexSessionRecordCache.get(resolvedSessionFile); + if ( + cached?.sessionsFile === sessionsFile && + cached.mtimeMs === stat.mtimeMs && + cached.size === stat.size + ) { + return cached.record; + } let store: JsonValue | undefined; try { store = JSON.parse(await fs.readFile(sessionsFile, "utf8")) as JsonValue; } catch { + codexSessionRecordCache.delete(resolvedSessionFile); return undefined; } if (!isJsonObject(store)) { + codexSessionRecordCache.delete(resolvedSessionFile); return undefined; } - const resolvedSessionFile = path.resolve(sessionFile); + let found: (Record & { sessionKey: string }) | undefined; for (const [sessionKey, record] of Object.entries(store)) { if (!isJsonObject(record) || typeof record.sessionFile !== "string") { continue; @@ -129,9 +155,16 @@ async function readCodexSessionRecordForSessionFile( if (path.resolve(record.sessionFile) !== resolvedSessionFile) { continue; } - return { sessionKey, ...record }; + found = { sessionKey, ...record }; + break; } - return undefined; + codexSessionRecordCache.set(resolvedSessionFile, { + sessionsFile, + mtimeMs: stat.mtimeMs, + size: stat.size, + record: found, + }); + return found; } type CodexAppServerRolloutTokenSnapshot = { diff --git a/src/agents/command/attempt-execution.shared.ts b/src/agents/command/attempt-execution.shared.ts index 66d4a525026..992a11a434b 100644 --- a/src/agents/command/attempt-execution.shared.ts +++ b/src/agents/command/attempt-execution.shared.ts @@ -38,7 +38,11 @@ export async function persistSessionEntry( store[params.sessionKey] = merged; return merged; }, - { takeCacheOwnership: true }, + { + resolveSingleEntryPersistence: (entry) => + entry ? { sessionKey: params.sessionKey, entry } : null, + takeCacheOwnership: true, + }, ); if (persisted) { params.sessionStore[params.sessionKey] = persisted; diff --git a/src/auto-reply/reply/commands-session-store.ts b/src/auto-reply/reply/commands-session-store.ts index 132d88bdad4..96de4f53a70 100644 --- a/src/auto-reply/reply/commands-session-store.ts +++ b/src/auto-reply/reply/commands-session-store.ts @@ -19,8 +19,13 @@ export async function persistSessionEntry(params: CommandParams): Promise { store[params.sessionKey] = params.sessionEntry as SessionEntry; + return params.sessionEntry as SessionEntry; + }, + { + resolveSingleEntryPersistence: (entry) => + entry ? { sessionKey: params.sessionKey, entry } : null, + skipMaintenance: true, }, - { skipMaintenance: true }, ); } return true; @@ -44,16 +49,24 @@ export async function persistAbortTargetEntry(params: { sessionStore[key] = entry; if (storePath) { - await updateSessionStore(storePath, (store) => { - const nextEntry = store[key] ?? entry; - if (!nextEntry) { - return; - } - nextEntry.abortedLastRun = true; - applyAbortCutoffToSessionEntry(nextEntry, abortCutoff); - nextEntry.updatedAt = Date.now(); - store[key] = nextEntry; - }); + await updateSessionStore( + storePath, + (store) => { + const nextEntry = store[key] ?? entry; + if (!nextEntry) { + return undefined; + } + nextEntry.abortedLastRun = true; + applyAbortCutoffToSessionEntry(nextEntry, abortCutoff); + nextEntry.updatedAt = Date.now(); + store[key] = nextEntry; + return nextEntry; + }, + { + resolveSingleEntryPersistence: (updated) => + updated ? { sessionKey: key, entry: updated } : null, + }, + ); } return true; diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 3ebcd23e8b1..cb230ac9734 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -42,9 +42,18 @@ async function persistSessionEntryUpdate(params: { if (!params.storePath) { return; } - await updateSessionStore(params.storePath, (store) => { - store[params.sessionKey!] = { ...store[params.sessionKey!], ...params.nextEntry }; - }); + await updateSessionStore( + params.storePath, + (store) => { + const next = { ...store[params.sessionKey!], ...params.nextEntry }; + store[params.sessionKey!] = next; + return next; + }, + { + resolveSingleEntryPersistence: (entry) => + entry && params.sessionKey ? { sessionKey: params.sessionKey, entry } : null, + }, + ); } function emitCompactionSessionLifecycleHooks(params: { diff --git a/src/config/sessions/sessions.test.ts b/src/config/sessions/sessions.test.ts index 18d6e568795..49fa7e9b731 100644 --- a/src/config/sessions/sessions.test.ts +++ b/src/config/sessions/sessions.test.ts @@ -756,6 +756,49 @@ describe("session store writer queue", () => { writeSpy.mockRestore(); }); + it("can persist a known single entry without touching hydrated prompts from other sessions", async () => { + const key = "agent:main:single-entry"; + const otherKey = "agent:main:other-entry"; + const otherPrompt = `\n${"other prompt\n".repeat(200)}`; + const { dir, storePath } = await makeTmpStore({ + [key]: { sessionId: "s-single-entry", updatedAt: Date.now(), counter: 0 }, + [otherKey]: { + sessionId: "s-other-entry", + updatedAt: Date.now(), + skillsSnapshot: { + prompt: otherPrompt, + skills: [{ name: "demo" }], + version: 1, + }, + }, + }); + loadSessionStore(storePath); + await updateSessionStore(storePath, () => undefined, { skipMaintenance: true }); + const before = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record; + const beforeOtherEntry = before[otherKey]; + + await updateSessionStore( + storePath, + (store) => { + const next = { ...store[key], counter: 1 } as SessionEntry; + store[key] = next; + return next; + }, + { + resolveSingleEntryPersistence: (entry) => ({ sessionKey: key, entry }), + skipMaintenance: true, + }, + ); + + const persisted = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record< + string, + SessionEntry + >; + expect((persisted[key] as Record | undefined)?.counter).toBe(1); + expect(persisted[otherKey]).toStrictEqual(beforeOtherEntry); + expect(fs.existsSync(path.join(dir, "skills-prompts"))).toBe(true); + }); + it("multiple consecutive errors do not permanently poison the queue", async () => { const key = "agent:main:multi-err"; const { storePath } = await makeTmpStore({ diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index e8883b2f4d5..15b9e6da2dd 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -89,6 +89,12 @@ const writerStoreFileStats = new WeakMap< Record, ReturnType | null >(); +let serializedPromptRefKeyCache: + | { + serialized: string; + keys: Set; + } + | undefined; function loadSessionArchiveRuntime() { sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js"); @@ -365,12 +371,40 @@ function buildSingleEntrySerializedStore(params: { }; } -function storeHasUntouchedHydratedSkillPrompts( +function collectSerializedPromptRefKeys(serialized: string): Set { + if (serializedPromptRefKeyCache?.serialized === serialized) { + return serializedPromptRefKeyCache.keys; + } + const keys = new Set(); + try { + const parsed = JSON.parse(serialized) as Record; + for (const [key, entry] of Object.entries(parsed)) { + if (entry?.skillsSnapshot?.promptRef) { + keys.add(key); + } + } + } catch { + // Malformed serialized cache cannot prove prompt refs are already durable. + } + serializedPromptRefKeyCache = { serialized, keys }; + return keys; +} + +function storeHasUnsafeUntouchedHydratedSkillPrompts( + storePath: string, store: Record, changedSessionKey: string, ): boolean { + const currentSerialized = getSerializedSessionStore(storePath); + const serializedPromptRefKeys = currentSerialized + ? collectSerializedPromptRefKeys(currentSerialized) + : undefined; for (const [key, entry] of Object.entries(store)) { - if (key !== changedSessionKey && typeof entry.skillsSnapshot?.prompt === "string") { + if ( + key !== changedSessionKey && + typeof entry.skillsSnapshot?.prompt === "string" && + !serializedPromptRefKeys?.has(key) + ) { return true; } } @@ -620,7 +654,11 @@ async function saveSessionStoreUnlocked( if ( opts?.singleEntryPersistence && !maintenanceChangedStore && - !storeHasUntouchedHydratedSkillPrompts(store, opts.singleEntryPersistence.sessionKey) + !storeHasUnsafeUntouchedHydratedSkillPrompts( + storePath, + store, + opts.singleEntryPersistence.sessionKey, + ) ) { const normalizedEntry = store[opts.singleEntryPersistence.sessionKey]; const singleEntrySerialized = buildSingleEntrySerializedStore({ diff --git a/src/plugin-sdk/facade-loader.ts b/src/plugin-sdk/facade-loader.ts index 46ed403f187..63a019a8fc3 100644 --- a/src/plugin-sdk/facade-loader.ts +++ b/src/plugin-sdk/facade-loader.ts @@ -1,5 +1,4 @@ import fs from "node:fs"; -import { createRequire } from "node:module"; import path from "node:path"; import { fileURLToPath, pathToFileURL } from "node:url"; import { openRootFileSync } from "../infra/boundary-file-read.js"; @@ -14,22 +13,12 @@ import { resolveBundledFacadeModuleLocation } from "./facade-resolution-shared.j const CURRENT_MODULE_PATH = fileURLToPath(import.meta.url); -const nodeRequire = createRequire(import.meta.url); const moduleLoaders: PluginModuleLoaderCache = new Map(); const loadedFacadeModules = new Map(); const loadedFacadePluginIds = new Set(); let facadeLoaderSourceTransformFactory: PluginModuleLoaderFactory | undefined; let cachedOpenClawPackageRoot: string | undefined; -function getSourceTransformFactory() { - if (facadeLoaderSourceTransformFactory) { - return facadeLoaderSourceTransformFactory; - } - const { createJiti } = nodeRequire("jiti") as typeof import("jiti"); - facadeLoaderSourceTransformFactory = createJiti; - return facadeLoaderSourceTransformFactory; -} - function getOpenClawPackageRoot() { if (cachedOpenClawPackageRoot) { return cachedOpenClawPackageRoot; @@ -63,7 +52,9 @@ function getModuleLoader(modulePath: string) { importerUrl: import.meta.url, preferBuiltDist: true, loaderFilename: import.meta.url, - createLoader: getSourceTransformFactory(), + ...(facadeLoaderSourceTransformFactory + ? { createLoader: facadeLoaderSourceTransformFactory } + : {}), }); } diff --git a/src/plugins/plugin-metadata-snapshot.memo.test.ts b/src/plugins/plugin-metadata-snapshot.memo.test.ts index 60b0ee713d3..ed7134bf767 100644 --- a/src/plugins/plugin-metadata-snapshot.memo.test.ts +++ b/src/plugins/plugin-metadata-snapshot.memo.test.ts @@ -244,6 +244,30 @@ describe("loadPluginMetadataSnapshot process memo", () => { expect(second.byPluginId.get("demo")).toBe(second.plugins[0]); }); + it("skips persisted registry filesystem fingerprints after a process memo hit", () => { + const stateDir = tempStateDir(); + touchPersistedIndex(stateDir); + loadPluginRegistrySnapshotWithMetadata.mockReturnValue({ + source: "persisted", + snapshot: makeIndex(), + diagnostics: [], + }); + + const first = loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir }); + const statSpy = vi.spyOn(fs, "statSync"); + const readdirSpy = vi.spyOn(fs, "readdirSync"); + try { + const second = loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir }); + + expect(second).toBe(first); + expect(statSpy).not.toHaveBeenCalled(); + expect(readdirSpy).not.toHaveBeenCalled(); + } finally { + statSpy.mockRestore(); + readdirSpy.mockRestore(); + } + }); + it("clears the process memo at plugin metadata lifecycle boundaries", () => { const stateDir = tempStateDir(); touchPersistedIndex(stateDir); @@ -481,7 +505,7 @@ describe("loadPluginMetadataSnapshot process memo", () => { expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledOnce(); }); - it("refreshes when the persisted registry file changes", () => { + it("keeps persisted registry snapshots process-stable until lifecycle clear", () => { const stateDir = tempStateDir(); touchPersistedIndex(stateDir, 1); loadPluginRegistrySnapshotWithMetadata.mockReturnValue({ @@ -494,6 +518,11 @@ describe("loadPluginMetadataSnapshot process memo", () => { touchPersistedIndex(stateDir, 22); loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir }); + expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledOnce(); + + clearPluginMetadataLifecycleCaches(); + loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir }); + expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledTimes(2); }); diff --git a/src/plugins/plugin-metadata-snapshot.ts b/src/plugins/plugin-metadata-snapshot.ts index a3c1b343bdb..791e14cca78 100644 --- a/src/plugins/plugin-metadata-snapshot.ts +++ b/src/plugins/plugin-metadata-snapshot.ts @@ -37,6 +37,7 @@ import { type PluginMetadataSnapshotMemo = { key: string; + lookupContextHash: string; registryState?: PersistedRegistryMemoState; snapshot: PluginMetadataSnapshot; }; @@ -231,6 +232,18 @@ function resolvePersistedRegistryMemoContextHash(params: { }); } +function resolvePersistedRegistryMemoLookupContextHash(params: { + env: NodeJS.ProcessEnv; + preferPersisted?: boolean; + stateDir?: string; +}): string { + return hashJson({ + env: pickMemoRelevantEnv(params.env), + preferPersisted: params.preferPersisted ?? null, + stateDir: params.stateDir ?? null, + }); +} + function resolvePersistedRegistryMemoState(params: { env: NodeJS.ProcessEnv; index?: InstalledPluginIndex; @@ -273,6 +286,15 @@ function resolvePersistedRegistryMemoStateForLookup( }, memos: readonly PluginMetadataSnapshotMemo[], ): PersistedRegistryMemoState { + const lookupContextHash = resolvePersistedRegistryMemoLookupContextHash(params); + for (const memo of memos) { + if (memo.lookupContextHash === lookupContextHash && memo.registryState) { + // Gateway runtime metadata is process-stable. Installs/reloads clear the + // memo lifecycle explicitly, so hot lookups can reuse the prepared + // registry stamp instead of re-statting plugin roots on every turn. + return memo.registryState; + } + } const fastFingerprint = resolvePersistedRegistryFastMemoFingerprint(params); const fastHash = hashJson(fastFingerprint); const contextHash = resolvePersistedRegistryMemoContextHash({ @@ -581,6 +603,13 @@ export function loadPluginMetadataSnapshot( : registryState; rememberPluginMetadataSnapshotMemo({ key: computePluginMetadataSnapshotMemoKey({ params, registryState: cachedRegistryState }), + lookupContextHash: resolvePersistedRegistryMemoLookupContextHash({ + env, + ...(params.stateDir ? { stateDir: resolveUserPath(params.stateDir, env) } : {}), + ...(params.preferPersisted !== undefined + ? { preferPersisted: params.preferPersisted } + : {}), + }), registryState: cachedRegistryState, snapshot, });