mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 15:14:10 +00:00
perf(gateway): reuse stable turn metadata
This commit is contained in:
@@ -77,6 +77,31 @@ describe("Codex app-server startup binding", () => {
|
||||
expect(savedBinding?.threadId).toBe("thread-existing");
|
||||
});
|
||||
|
||||
it("reuses the session record cache while sessions.json is unchanged", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
const agentDir = path.join(tempDir, "agent");
|
||||
await writeExistingBinding(sessionFile, workspaceDir, { dynamicToolsFingerprint: "[]" });
|
||||
await writeSessionRecord(sessionFile, { totalTokens: 12_000 });
|
||||
const sessionsJson = path.join(path.dirname(sessionFile), "sessions.json");
|
||||
const readFileSpy = vi.spyOn(fs, "readFile");
|
||||
|
||||
for (let i = 0; i < 2; i += 1) {
|
||||
const binding = await rotateOversizedCodexAppServerStartupBinding({
|
||||
binding: await readCodexAppServerBinding(sessionFile),
|
||||
sessionFile,
|
||||
agentDir,
|
||||
config: undefined,
|
||||
});
|
||||
expect(binding?.threadId).toBe("thread-existing");
|
||||
}
|
||||
|
||||
const sessionStoreReads = readFileSpy.mock.calls.filter(
|
||||
([file]) => typeof file === "string" && file === sessionsJson,
|
||||
);
|
||||
expect(sessionStoreReads).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("checks native rollout token pressure under default compaction config", async () => {
|
||||
const sessionFile = path.join(tempDir, "session.jsonl");
|
||||
const workspaceDir = path.join(tempDir, "workspace");
|
||||
|
||||
@@ -30,6 +30,14 @@ const CODEX_APP_SERVER_BYTE_UNITS: Record<string, number> = {
|
||||
tb: 1024 * 1024 * 1024 * 1024,
|
||||
tib: 1024 * 1024 * 1024 * 1024,
|
||||
};
|
||||
type CodexSessionRecordCacheEntry = {
|
||||
sessionsFile: string;
|
||||
mtimeMs: number;
|
||||
size: number;
|
||||
record: (Record<string, unknown> & { sessionKey: string }) | undefined;
|
||||
};
|
||||
|
||||
const codexSessionRecordCache = new Map<string, CodexSessionRecordCacheEntry>();
|
||||
|
||||
function parseCodexAppServerByteLimit(value: unknown): number | undefined {
|
||||
if (typeof value === "number" && Number.isFinite(value) && value > 0) {
|
||||
@@ -112,16 +120,34 @@ async function readCodexSessionRecordForSessionFile(
|
||||
sessionFile: string,
|
||||
): Promise<(Record<string, unknown> & { sessionKey: string }) | undefined> {
|
||||
const sessionsFile = path.join(path.dirname(sessionFile), "sessions.json");
|
||||
const resolvedSessionFile = path.resolve(sessionFile);
|
||||
let stat: Awaited<ReturnType<typeof fs.stat>>;
|
||||
try {
|
||||
stat = await fs.stat(sessionsFile);
|
||||
} catch {
|
||||
codexSessionRecordCache.delete(resolvedSessionFile);
|
||||
return undefined;
|
||||
}
|
||||
const cached = codexSessionRecordCache.get(resolvedSessionFile);
|
||||
if (
|
||||
cached?.sessionsFile === sessionsFile &&
|
||||
cached.mtimeMs === stat.mtimeMs &&
|
||||
cached.size === stat.size
|
||||
) {
|
||||
return cached.record;
|
||||
}
|
||||
let store: JsonValue | undefined;
|
||||
try {
|
||||
store = JSON.parse(await fs.readFile(sessionsFile, "utf8")) as JsonValue;
|
||||
} catch {
|
||||
codexSessionRecordCache.delete(resolvedSessionFile);
|
||||
return undefined;
|
||||
}
|
||||
if (!isJsonObject(store)) {
|
||||
codexSessionRecordCache.delete(resolvedSessionFile);
|
||||
return undefined;
|
||||
}
|
||||
const resolvedSessionFile = path.resolve(sessionFile);
|
||||
let found: (Record<string, unknown> & { sessionKey: string }) | undefined;
|
||||
for (const [sessionKey, record] of Object.entries(store)) {
|
||||
if (!isJsonObject(record) || typeof record.sessionFile !== "string") {
|
||||
continue;
|
||||
@@ -129,9 +155,16 @@ async function readCodexSessionRecordForSessionFile(
|
||||
if (path.resolve(record.sessionFile) !== resolvedSessionFile) {
|
||||
continue;
|
||||
}
|
||||
return { sessionKey, ...record };
|
||||
found = { sessionKey, ...record };
|
||||
break;
|
||||
}
|
||||
return undefined;
|
||||
codexSessionRecordCache.set(resolvedSessionFile, {
|
||||
sessionsFile,
|
||||
mtimeMs: stat.mtimeMs,
|
||||
size: stat.size,
|
||||
record: found,
|
||||
});
|
||||
return found;
|
||||
}
|
||||
|
||||
type CodexAppServerRolloutTokenSnapshot = {
|
||||
|
||||
@@ -38,7 +38,11 @@ export async function persistSessionEntry(
|
||||
store[params.sessionKey] = merged;
|
||||
return merged;
|
||||
},
|
||||
{ takeCacheOwnership: true },
|
||||
{
|
||||
resolveSingleEntryPersistence: (entry) =>
|
||||
entry ? { sessionKey: params.sessionKey, entry } : null,
|
||||
takeCacheOwnership: true,
|
||||
},
|
||||
);
|
||||
if (persisted) {
|
||||
params.sessionStore[params.sessionKey] = persisted;
|
||||
|
||||
@@ -19,8 +19,13 @@ export async function persistSessionEntry(params: CommandParams): Promise<boolea
|
||||
params.storePath,
|
||||
(store) => {
|
||||
store[params.sessionKey] = params.sessionEntry as SessionEntry;
|
||||
return params.sessionEntry as SessionEntry;
|
||||
},
|
||||
{
|
||||
resolveSingleEntryPersistence: (entry) =>
|
||||
entry ? { sessionKey: params.sessionKey, entry } : null,
|
||||
skipMaintenance: true,
|
||||
},
|
||||
{ skipMaintenance: true },
|
||||
);
|
||||
}
|
||||
return true;
|
||||
@@ -44,16 +49,24 @@ export async function persistAbortTargetEntry(params: {
|
||||
sessionStore[key] = entry;
|
||||
|
||||
if (storePath) {
|
||||
await updateSessionStore(storePath, (store) => {
|
||||
const nextEntry = store[key] ?? entry;
|
||||
if (!nextEntry) {
|
||||
return;
|
||||
}
|
||||
nextEntry.abortedLastRun = true;
|
||||
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
|
||||
nextEntry.updatedAt = Date.now();
|
||||
store[key] = nextEntry;
|
||||
});
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
const nextEntry = store[key] ?? entry;
|
||||
if (!nextEntry) {
|
||||
return undefined;
|
||||
}
|
||||
nextEntry.abortedLastRun = true;
|
||||
applyAbortCutoffToSessionEntry(nextEntry, abortCutoff);
|
||||
nextEntry.updatedAt = Date.now();
|
||||
store[key] = nextEntry;
|
||||
return nextEntry;
|
||||
},
|
||||
{
|
||||
resolveSingleEntryPersistence: (updated) =>
|
||||
updated ? { sessionKey: key, entry: updated } : null,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
@@ -42,9 +42,18 @@ async function persistSessionEntryUpdate(params: {
|
||||
if (!params.storePath) {
|
||||
return;
|
||||
}
|
||||
await updateSessionStore(params.storePath, (store) => {
|
||||
store[params.sessionKey!] = { ...store[params.sessionKey!], ...params.nextEntry };
|
||||
});
|
||||
await updateSessionStore(
|
||||
params.storePath,
|
||||
(store) => {
|
||||
const next = { ...store[params.sessionKey!], ...params.nextEntry };
|
||||
store[params.sessionKey!] = next;
|
||||
return next;
|
||||
},
|
||||
{
|
||||
resolveSingleEntryPersistence: (entry) =>
|
||||
entry && params.sessionKey ? { sessionKey: params.sessionKey, entry } : null,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function emitCompactionSessionLifecycleHooks(params: {
|
||||
|
||||
@@ -756,6 +756,49 @@ describe("session store writer queue", () => {
|
||||
writeSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("can persist a known single entry without touching hydrated prompts from other sessions", async () => {
|
||||
const key = "agent:main:single-entry";
|
||||
const otherKey = "agent:main:other-entry";
|
||||
const otherPrompt = `<available_skills>\n${"other prompt\n".repeat(200)}</available_skills>`;
|
||||
const { dir, storePath } = await makeTmpStore({
|
||||
[key]: { sessionId: "s-single-entry", updatedAt: Date.now(), counter: 0 },
|
||||
[otherKey]: {
|
||||
sessionId: "s-other-entry",
|
||||
updatedAt: Date.now(),
|
||||
skillsSnapshot: {
|
||||
prompt: otherPrompt,
|
||||
skills: [{ name: "demo" }],
|
||||
version: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
loadSessionStore(storePath);
|
||||
await updateSessionStore(storePath, () => undefined, { skipMaintenance: true });
|
||||
const before = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record<string, SessionEntry>;
|
||||
const beforeOtherEntry = before[otherKey];
|
||||
|
||||
await updateSessionStore(
|
||||
storePath,
|
||||
(store) => {
|
||||
const next = { ...store[key], counter: 1 } as SessionEntry;
|
||||
store[key] = next;
|
||||
return next;
|
||||
},
|
||||
{
|
||||
resolveSingleEntryPersistence: (entry) => ({ sessionKey: key, entry }),
|
||||
skipMaintenance: true,
|
||||
},
|
||||
);
|
||||
|
||||
const persisted = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record<
|
||||
string,
|
||||
SessionEntry
|
||||
>;
|
||||
expect((persisted[key] as Record<string, unknown> | undefined)?.counter).toBe(1);
|
||||
expect(persisted[otherKey]).toStrictEqual(beforeOtherEntry);
|
||||
expect(fs.existsSync(path.join(dir, "skills-prompts"))).toBe(true);
|
||||
});
|
||||
|
||||
it("multiple consecutive errors do not permanently poison the queue", async () => {
|
||||
const key = "agent:main:multi-err";
|
||||
const { storePath } = await makeTmpStore({
|
||||
|
||||
@@ -89,6 +89,12 @@ const writerStoreFileStats = new WeakMap<
|
||||
Record<string, SessionEntry>,
|
||||
ReturnType<typeof getFileStatSnapshot> | null
|
||||
>();
|
||||
let serializedPromptRefKeyCache:
|
||||
| {
|
||||
serialized: string;
|
||||
keys: Set<string>;
|
||||
}
|
||||
| undefined;
|
||||
|
||||
function loadSessionArchiveRuntime() {
|
||||
sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js");
|
||||
@@ -365,12 +371,40 @@ function buildSingleEntrySerializedStore(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function storeHasUntouchedHydratedSkillPrompts(
|
||||
function collectSerializedPromptRefKeys(serialized: string): Set<string> {
|
||||
if (serializedPromptRefKeyCache?.serialized === serialized) {
|
||||
return serializedPromptRefKeyCache.keys;
|
||||
}
|
||||
const keys = new Set<string>();
|
||||
try {
|
||||
const parsed = JSON.parse(serialized) as Record<string, SessionEntry>;
|
||||
for (const [key, entry] of Object.entries(parsed)) {
|
||||
if (entry?.skillsSnapshot?.promptRef) {
|
||||
keys.add(key);
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Malformed serialized cache cannot prove prompt refs are already durable.
|
||||
}
|
||||
serializedPromptRefKeyCache = { serialized, keys };
|
||||
return keys;
|
||||
}
|
||||
|
||||
function storeHasUnsafeUntouchedHydratedSkillPrompts(
|
||||
storePath: string,
|
||||
store: Record<string, SessionEntry>,
|
||||
changedSessionKey: string,
|
||||
): boolean {
|
||||
const currentSerialized = getSerializedSessionStore(storePath);
|
||||
const serializedPromptRefKeys = currentSerialized
|
||||
? collectSerializedPromptRefKeys(currentSerialized)
|
||||
: undefined;
|
||||
for (const [key, entry] of Object.entries(store)) {
|
||||
if (key !== changedSessionKey && typeof entry.skillsSnapshot?.prompt === "string") {
|
||||
if (
|
||||
key !== changedSessionKey &&
|
||||
typeof entry.skillsSnapshot?.prompt === "string" &&
|
||||
!serializedPromptRefKeys?.has(key)
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -620,7 +654,11 @@ async function saveSessionStoreUnlocked(
|
||||
if (
|
||||
opts?.singleEntryPersistence &&
|
||||
!maintenanceChangedStore &&
|
||||
!storeHasUntouchedHydratedSkillPrompts(store, opts.singleEntryPersistence.sessionKey)
|
||||
!storeHasUnsafeUntouchedHydratedSkillPrompts(
|
||||
storePath,
|
||||
store,
|
||||
opts.singleEntryPersistence.sessionKey,
|
||||
)
|
||||
) {
|
||||
const normalizedEntry = store[opts.singleEntryPersistence.sessionKey];
|
||||
const singleEntrySerialized = buildSingleEntrySerializedStore({
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import fs from "node:fs";
|
||||
import { createRequire } from "node:module";
|
||||
import path from "node:path";
|
||||
import { fileURLToPath, pathToFileURL } from "node:url";
|
||||
import { openRootFileSync } from "../infra/boundary-file-read.js";
|
||||
@@ -14,22 +13,12 @@ import { resolveBundledFacadeModuleLocation } from "./facade-resolution-shared.j
|
||||
|
||||
const CURRENT_MODULE_PATH = fileURLToPath(import.meta.url);
|
||||
|
||||
const nodeRequire = createRequire(import.meta.url);
|
||||
const moduleLoaders: PluginModuleLoaderCache = new Map();
|
||||
const loadedFacadeModules = new Map<string, unknown>();
|
||||
const loadedFacadePluginIds = new Set<string>();
|
||||
let facadeLoaderSourceTransformFactory: PluginModuleLoaderFactory | undefined;
|
||||
let cachedOpenClawPackageRoot: string | undefined;
|
||||
|
||||
function getSourceTransformFactory() {
|
||||
if (facadeLoaderSourceTransformFactory) {
|
||||
return facadeLoaderSourceTransformFactory;
|
||||
}
|
||||
const { createJiti } = nodeRequire("jiti") as typeof import("jiti");
|
||||
facadeLoaderSourceTransformFactory = createJiti;
|
||||
return facadeLoaderSourceTransformFactory;
|
||||
}
|
||||
|
||||
function getOpenClawPackageRoot() {
|
||||
if (cachedOpenClawPackageRoot) {
|
||||
return cachedOpenClawPackageRoot;
|
||||
@@ -63,7 +52,9 @@ function getModuleLoader(modulePath: string) {
|
||||
importerUrl: import.meta.url,
|
||||
preferBuiltDist: true,
|
||||
loaderFilename: import.meta.url,
|
||||
createLoader: getSourceTransformFactory(),
|
||||
...(facadeLoaderSourceTransformFactory
|
||||
? { createLoader: facadeLoaderSourceTransformFactory }
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -244,6 +244,30 @@ describe("loadPluginMetadataSnapshot process memo", () => {
|
||||
expect(second.byPluginId.get("demo")).toBe(second.plugins[0]);
|
||||
});
|
||||
|
||||
it("skips persisted registry filesystem fingerprints after a process memo hit", () => {
|
||||
const stateDir = tempStateDir();
|
||||
touchPersistedIndex(stateDir);
|
||||
loadPluginRegistrySnapshotWithMetadata.mockReturnValue({
|
||||
source: "persisted",
|
||||
snapshot: makeIndex(),
|
||||
diagnostics: [],
|
||||
});
|
||||
|
||||
const first = loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
|
||||
const statSpy = vi.spyOn(fs, "statSync");
|
||||
const readdirSpy = vi.spyOn(fs, "readdirSync");
|
||||
try {
|
||||
const second = loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
|
||||
|
||||
expect(second).toBe(first);
|
||||
expect(statSpy).not.toHaveBeenCalled();
|
||||
expect(readdirSpy).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
statSpy.mockRestore();
|
||||
readdirSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("clears the process memo at plugin metadata lifecycle boundaries", () => {
|
||||
const stateDir = tempStateDir();
|
||||
touchPersistedIndex(stateDir);
|
||||
@@ -481,7 +505,7 @@ describe("loadPluginMetadataSnapshot process memo", () => {
|
||||
expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("refreshes when the persisted registry file changes", () => {
|
||||
it("keeps persisted registry snapshots process-stable until lifecycle clear", () => {
|
||||
const stateDir = tempStateDir();
|
||||
touchPersistedIndex(stateDir, 1);
|
||||
loadPluginRegistrySnapshotWithMetadata.mockReturnValue({
|
||||
@@ -494,6 +518,11 @@ describe("loadPluginMetadataSnapshot process memo", () => {
|
||||
touchPersistedIndex(stateDir, 22);
|
||||
loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
|
||||
|
||||
expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledOnce();
|
||||
|
||||
clearPluginMetadataLifecycleCaches();
|
||||
loadPluginMetadataSnapshot({ config: {}, env: {}, stateDir });
|
||||
|
||||
expect(loadPluginRegistrySnapshotWithMetadata).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ import {
|
||||
|
||||
type PluginMetadataSnapshotMemo = {
|
||||
key: string;
|
||||
lookupContextHash: string;
|
||||
registryState?: PersistedRegistryMemoState;
|
||||
snapshot: PluginMetadataSnapshot;
|
||||
};
|
||||
@@ -231,6 +232,18 @@ function resolvePersistedRegistryMemoContextHash(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function resolvePersistedRegistryMemoLookupContextHash(params: {
|
||||
env: NodeJS.ProcessEnv;
|
||||
preferPersisted?: boolean;
|
||||
stateDir?: string;
|
||||
}): string {
|
||||
return hashJson({
|
||||
env: pickMemoRelevantEnv(params.env),
|
||||
preferPersisted: params.preferPersisted ?? null,
|
||||
stateDir: params.stateDir ?? null,
|
||||
});
|
||||
}
|
||||
|
||||
function resolvePersistedRegistryMemoState(params: {
|
||||
env: NodeJS.ProcessEnv;
|
||||
index?: InstalledPluginIndex;
|
||||
@@ -273,6 +286,15 @@ function resolvePersistedRegistryMemoStateForLookup(
|
||||
},
|
||||
memos: readonly PluginMetadataSnapshotMemo[],
|
||||
): PersistedRegistryMemoState {
|
||||
const lookupContextHash = resolvePersistedRegistryMemoLookupContextHash(params);
|
||||
for (const memo of memos) {
|
||||
if (memo.lookupContextHash === lookupContextHash && memo.registryState) {
|
||||
// Gateway runtime metadata is process-stable. Installs/reloads clear the
|
||||
// memo lifecycle explicitly, so hot lookups can reuse the prepared
|
||||
// registry stamp instead of re-statting plugin roots on every turn.
|
||||
return memo.registryState;
|
||||
}
|
||||
}
|
||||
const fastFingerprint = resolvePersistedRegistryFastMemoFingerprint(params);
|
||||
const fastHash = hashJson(fastFingerprint);
|
||||
const contextHash = resolvePersistedRegistryMemoContextHash({
|
||||
@@ -581,6 +603,13 @@ export function loadPluginMetadataSnapshot(
|
||||
: registryState;
|
||||
rememberPluginMetadataSnapshotMemo({
|
||||
key: computePluginMetadataSnapshotMemoKey({ params, registryState: cachedRegistryState }),
|
||||
lookupContextHash: resolvePersistedRegistryMemoLookupContextHash({
|
||||
env,
|
||||
...(params.stateDir ? { stateDir: resolveUserPath(params.stateDir, env) } : {}),
|
||||
...(params.preferPersisted !== undefined
|
||||
? { preferPersisted: params.preferPersisted }
|
||||
: {}),
|
||||
}),
|
||||
registryState: cachedRegistryState,
|
||||
snapshot,
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user