mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-07 07:22:54 +00:00
perf(sessions): patch single-entry store writes
This commit is contained in:
@@ -214,6 +214,19 @@ describe("updateSessionStoreAfterAgentRun", () => {
|
||||
maxEntries: 42,
|
||||
},
|
||||
});
|
||||
expect(typeof updateOptions?.resolveSingleEntryPersistence).toBe("function");
|
||||
expect(
|
||||
updateOptions?.resolveSingleEntryPersistence?.({
|
||||
sessionId,
|
||||
updatedAt: 2,
|
||||
} as SessionEntry),
|
||||
).toEqual({
|
||||
sessionKey,
|
||||
entry: {
|
||||
sessionId,
|
||||
updatedAt: 2,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -280,7 +280,11 @@ export async function updateSessionStoreAfterAgentRun(params: {
|
||||
store[sessionKey] = merged;
|
||||
return merged;
|
||||
},
|
||||
{ takeCacheOwnership: true, maintenanceConfig },
|
||||
{
|
||||
takeCacheOwnership: true,
|
||||
maintenanceConfig,
|
||||
resolveSingleEntryPersistence: (entry) => (entry ? { sessionKey, entry } : undefined),
|
||||
},
|
||||
);
|
||||
if (persisted) {
|
||||
sessionStore[sessionKey] = persisted;
|
||||
|
||||
@@ -725,6 +725,86 @@ describe("Session Store Cache", () => {
|
||||
expect(cached["session:1"].deliveryContext?.to).toBe("chat-1");
|
||||
});
|
||||
|
||||
it("patches serialized JSON for one-entry updates without stringifying untouched entries", async () => {
|
||||
await saveSessionStore(storePath, {
|
||||
"session:1": createSessionEntry({ sessionId: "id-1", displayName: "Before" }),
|
||||
"session:2": createSessionEntry({ sessionId: "id-2", displayName: "Untouched" }),
|
||||
});
|
||||
const cached = loadSessionStore(storePath, { clone: false });
|
||||
Object.defineProperty(cached["session:2"], "toJSON", {
|
||||
value: () => {
|
||||
throw new Error("full store stringify touched session:2");
|
||||
},
|
||||
});
|
||||
|
||||
await updateSessionStoreEntry({
|
||||
storePath,
|
||||
sessionKey: "session:1",
|
||||
update: () => ({ displayName: "After", updatedAt: 123 }),
|
||||
takeCacheOwnership: true,
|
||||
});
|
||||
|
||||
const disk = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record<string, SessionEntry>;
|
||||
expect(disk["session:1"].displayName).toBe("After");
|
||||
expect(disk["session:2"].displayName).toBe("Untouched");
|
||||
});
|
||||
|
||||
it("falls back to full projection when untouched entries need prompt blob repair", async () => {
|
||||
const prompt = "skill prompt ".repeat(80);
|
||||
await saveSessionStore(storePath, {
|
||||
"session:1": createSessionEntry({ sessionId: "id-1", displayName: "Before" }),
|
||||
"session:2": createSessionEntry({
|
||||
sessionId: "id-2",
|
||||
skillsSnapshot: {
|
||||
prompt,
|
||||
skills: [{ name: "alpha" }],
|
||||
},
|
||||
}),
|
||||
});
|
||||
await fs.promises.rm(path.join(testDir, "skills-prompts"), {
|
||||
recursive: true,
|
||||
force: true,
|
||||
});
|
||||
|
||||
await updateSessionStoreEntry({
|
||||
storePath,
|
||||
sessionKey: "session:1",
|
||||
update: () => ({ displayName: "After" }),
|
||||
takeCacheOwnership: true,
|
||||
});
|
||||
|
||||
clearSessionStoreCacheForTest();
|
||||
const loaded = loadSessionStore(storePath);
|
||||
expect(loaded["session:1"].displayName).toBe("After");
|
||||
expect(loaded["session:2"].skillsSnapshot?.prompt).toBe(prompt);
|
||||
});
|
||||
|
||||
it("serializes the normalized entry when applying the one-entry fast path", async () => {
|
||||
await saveSessionStore(storePath, {
|
||||
"session:1": createSessionEntry({ sessionId: "id-1", displayName: "Before" }),
|
||||
"session:2": createSessionEntry({ sessionId: "id-2", displayName: "Untouched" }),
|
||||
});
|
||||
|
||||
await updateSessionStoreEntry({
|
||||
storePath,
|
||||
sessionKey: "session:1",
|
||||
update: () => ({
|
||||
displayName: "After",
|
||||
skillsSnapshot: {
|
||||
prompt: "short prompt",
|
||||
skills: [{ name: "alpha" }],
|
||||
resolvedSkills: [{ name: "alpha", body: "transient" }],
|
||||
} as SessionEntry["skillsSnapshot"],
|
||||
}),
|
||||
takeCacheOwnership: true,
|
||||
});
|
||||
|
||||
const disk = JSON.parse(fs.readFileSync(storePath, "utf8")) as Record<string, SessionEntry>;
|
||||
expect(disk["session:1"].displayName).toBe("After");
|
||||
expect(disk["session:1"].skillsSnapshot?.prompt).toBe("short prompt");
|
||||
expect("resolvedSkills" in (disk["session:1"].skillsSnapshot ?? {})).toBe(false);
|
||||
});
|
||||
|
||||
it("restores the writer-owned cache when update result proves the store unchanged", async () => {
|
||||
await saveSessionStore(storePath, {
|
||||
"session:1": createSessionEntry({ sessionId: "id-1" }),
|
||||
|
||||
@@ -166,6 +166,8 @@ type SaveSessionStoreOptions = {
|
||||
maintenanceOverride?: Partial<ResolvedSessionMaintenanceConfig>;
|
||||
/** Fully resolved maintenance settings when the caller already has config loaded. */
|
||||
maintenanceConfig?: ResolvedSessionMaintenanceConfig;
|
||||
/** Changed top-level entry when a hot path only updated one existing session. */
|
||||
singleEntryPersistence?: SingleEntryPersistencePatch;
|
||||
};
|
||||
|
||||
type UpdateSessionStoreOptions<T> = SaveSessionStoreOptions & {
|
||||
@@ -174,6 +176,12 @@ type UpdateSessionStoreOptions<T> = SaveSessionStoreOptions & {
|
||||
* When true, the writer-owned object cache is restored and sessions.json is untouched.
|
||||
*/
|
||||
skipSaveWhenResult?: (result: T) => boolean;
|
||||
resolveSingleEntryPersistence?: (result: T) => SingleEntryPersistencePatch | null | undefined;
|
||||
};
|
||||
|
||||
type SingleEntryPersistencePatch = {
|
||||
sessionKey: string;
|
||||
entry: SessionEntry;
|
||||
};
|
||||
|
||||
type SessionEntryWorkflowOptions = {
|
||||
@@ -277,6 +285,97 @@ function restoreUnchangedSessionStoreCache(
|
||||
}
|
||||
}
|
||||
|
||||
function findJsonValueEnd(json: string, valueStart: number): number | null {
|
||||
let depth = 0;
|
||||
let inString = false;
|
||||
let escaped = false;
|
||||
for (let index = valueStart; index < json.length; index += 1) {
|
||||
const char = json[index];
|
||||
if (inString) {
|
||||
if (escaped) {
|
||||
escaped = false;
|
||||
} else if (char === "\\") {
|
||||
escaped = true;
|
||||
} else if (char === '"') {
|
||||
inString = false;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (char === '"') {
|
||||
inString = true;
|
||||
continue;
|
||||
}
|
||||
if (char === "{" || char === "[") {
|
||||
depth += 1;
|
||||
continue;
|
||||
}
|
||||
if (char !== "}" && char !== "]") {
|
||||
continue;
|
||||
}
|
||||
depth -= 1;
|
||||
if (depth === 0) {
|
||||
return index + 1;
|
||||
}
|
||||
if (depth < 0) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function indentTopLevelEntryJson(json: string): string {
|
||||
return json.replaceAll("\n", "\n ");
|
||||
}
|
||||
|
||||
function buildSingleEntrySerializedStore(params: {
|
||||
storePath: string;
|
||||
patch: SingleEntryPersistencePatch;
|
||||
}): { serialized: string; promptBlobs: SessionSkillPromptBlobProjection[] } | null {
|
||||
const currentSerialized = getSerializedSessionStore(params.storePath);
|
||||
if (currentSerialized === undefined) {
|
||||
return null;
|
||||
}
|
||||
const marker = `\n ${JSON.stringify(params.patch.sessionKey)}: `;
|
||||
const markerIndex = currentSerialized.indexOf(marker);
|
||||
if (markerIndex < 0) {
|
||||
return null;
|
||||
}
|
||||
const valueStart = markerIndex + marker.length;
|
||||
if (currentSerialized[valueStart] !== "{") {
|
||||
return null;
|
||||
}
|
||||
const valueEnd = findJsonValueEnd(currentSerialized, valueStart);
|
||||
if (valueEnd === null) {
|
||||
return null;
|
||||
}
|
||||
const projected = projectSessionStoreForPersistence({
|
||||
storePath: params.storePath,
|
||||
store: { [params.patch.sessionKey]: params.patch.entry },
|
||||
});
|
||||
const projectedEntry = projected.store[params.patch.sessionKey];
|
||||
if (!projectedEntry) {
|
||||
return null;
|
||||
}
|
||||
const entryJson = indentTopLevelEntryJson(JSON.stringify(projectedEntry, null, 2));
|
||||
return {
|
||||
serialized:
|
||||
currentSerialized.slice(0, valueStart) + entryJson + currentSerialized.slice(valueEnd),
|
||||
promptBlobs: [...projected.promptBlobs.values()],
|
||||
};
|
||||
}
|
||||
|
||||
function storeHasUntouchedHydratedSkillPrompts(
|
||||
store: Record<string, SessionEntry>,
|
||||
changedSessionKey: string,
|
||||
): boolean {
|
||||
for (const [key, entry] of Object.entries(store)) {
|
||||
if (key !== changedSessionKey && typeof entry.skillsSnapshot?.prompt === "string") {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
function loadMutableSessionStoreForWriter(storePath: string): Record<string, SessionEntry> {
|
||||
const currentFileStat = getFileStatSnapshot(storePath);
|
||||
if (isSessionStoreCacheEnabled()) {
|
||||
@@ -517,6 +616,32 @@ async function saveSessionStoreUnlocked(
|
||||
}
|
||||
|
||||
await fs.promises.mkdir(path.dirname(storePath), { recursive: true });
|
||||
if (
|
||||
opts?.singleEntryPersistence &&
|
||||
!maintenanceChangedStore &&
|
||||
!storeHasUntouchedHydratedSkillPrompts(store, opts.singleEntryPersistence.sessionKey)
|
||||
) {
|
||||
const normalizedEntry = store[opts.singleEntryPersistence.sessionKey];
|
||||
const singleEntrySerialized = buildSingleEntrySerializedStore({
|
||||
storePath,
|
||||
patch: normalizedEntry
|
||||
? {
|
||||
sessionKey: opts.singleEntryPersistence.sessionKey,
|
||||
entry: normalizedEntry,
|
||||
}
|
||||
: opts.singleEntryPersistence,
|
||||
});
|
||||
if (singleEntrySerialized) {
|
||||
await writeSessionStoreAtomic({
|
||||
storePath,
|
||||
store,
|
||||
serialized: singleEntrySerialized.serialized,
|
||||
promptBlobs: singleEntrySerialized.promptBlobs,
|
||||
takeOwnership: opts?.takeCacheOwnership,
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
const persisted = projectSessionStoreForPersistence({ storePath, store });
|
||||
const promptBlobs = [...persisted.promptBlobs.values()];
|
||||
const json = JSON.stringify(persisted.store, null, 2);
|
||||
@@ -632,7 +757,10 @@ export async function updateSessionStore<T>(
|
||||
nextStore: store,
|
||||
allowDropSessionKeys: opts?.allowDropAcpMetaSessionKeys,
|
||||
});
|
||||
await saveSessionStoreUnlocked(storePath, store, opts);
|
||||
await saveSessionStoreUnlocked(storePath, store, {
|
||||
...opts,
|
||||
singleEntryPersistence: opts?.resolveSingleEntryPersistence?.(result) ?? undefined,
|
||||
});
|
||||
return result;
|
||||
});
|
||||
}
|
||||
@@ -761,6 +889,10 @@ async function persistResolvedSessionEntry(params: {
|
||||
activeSessionKey: params.resolved.normalizedKey,
|
||||
skipMaintenance: params.skipMaintenance,
|
||||
skipSerializeForUnchangedStore: entryUnchanged,
|
||||
singleEntryPersistence:
|
||||
params.resolved.legacyKeys.length === 0 && params.resolved.existing
|
||||
? { sessionKey: params.resolved.normalizedKey, entry: next }
|
||||
: undefined,
|
||||
takeCacheOwnership: params.takeCacheOwnership,
|
||||
});
|
||||
return entryUnchanged || params.returnDetached ? cloneSessionEntry(next) : next;
|
||||
|
||||
Reference in New Issue
Block a user