mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 22:24:06 +00:00
perf: skip session store clones in turn hot paths
This commit is contained in:
@@ -11,7 +11,8 @@ const hoisted = vi.hoisted(() => {
|
||||
});
|
||||
|
||||
vi.mock("../../config/sessions/store-load.js", () => ({
|
||||
loadSessionStore: (storePath: string) => hoisted.loadSessionStoreMock(storePath),
|
||||
loadSessionStore: (storePath: string, opts?: unknown) =>
|
||||
hoisted.loadSessionStoreMock(storePath, opts),
|
||||
}));
|
||||
|
||||
vi.mock("../../config/sessions/targets.js", () => ({
|
||||
@@ -57,7 +58,10 @@ describe("listAcpSessionEntries", () => {
|
||||
const entries = await listAcpSessionEntries({ cfg });
|
||||
|
||||
expect(hoisted.resolveAllAgentSessionStoreTargetsMock).toHaveBeenCalledWith(cfg, undefined);
|
||||
expect(hoisted.loadSessionStoreMock).toHaveBeenCalledWith("/custom/sessions/ops.json");
|
||||
expect(hoisted.loadSessionStoreMock).toHaveBeenCalledWith(
|
||||
"/custom/sessions/ops.json",
|
||||
undefined,
|
||||
);
|
||||
expect(entries).toEqual([
|
||||
{
|
||||
acp: storedEntry.acp,
|
||||
@@ -69,4 +73,18 @@ describe("listAcpSessionEntries", () => {
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("can skip cloning for maintenance callers that only inspect ACP entries", async () => {
|
||||
const cfg = { session: { store: "/custom/sessions/{agentId}.json" } } as OpenClawConfig;
|
||||
hoisted.resolveAllAgentSessionStoreTargetsMock.mockResolvedValue([
|
||||
{ agentId: "ops", storePath: "/custom/sessions/ops.json" },
|
||||
]);
|
||||
hoisted.loadSessionStoreMock.mockReturnValue({});
|
||||
|
||||
await listAcpSessionEntries({ cfg, clone: false });
|
||||
|
||||
expect(hoisted.loadSessionStoreMock).toHaveBeenCalledWith("/custom/sessions/ops.json", {
|
||||
clone: false,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -99,6 +99,7 @@ export function readAcpSessionEntry(params: {
|
||||
export async function listAcpSessionEntries(params: {
|
||||
cfg?: OpenClawConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
clone?: boolean;
|
||||
}): Promise<AcpSessionStoreEntry[]> {
|
||||
const cfg = params.cfg ?? getRuntimeConfig();
|
||||
const storeTargets = await resolveAllAgentSessionStoreTargets(
|
||||
@@ -111,7 +112,7 @@ export async function listAcpSessionEntries(params: {
|
||||
const storePath = target.storePath;
|
||||
let store: Record<string, SessionEntry>;
|
||||
try {
|
||||
store = loadSessionStore(storePath);
|
||||
store = loadSessionStore(storePath, params.clone === false ? { clone: false } : undefined);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1004,7 +1004,7 @@ function refreshSessionEntryFromStore(params: {
|
||||
return fallbackEntry;
|
||||
}
|
||||
try {
|
||||
const latestStore = loadSessionStore(storePath, { skipCache: true });
|
||||
const latestStore = loadSessionStore(storePath, { skipCache: true, clone: false });
|
||||
const latestEntry = latestStore?.[sessionKey];
|
||||
if (!latestEntry) {
|
||||
return fallbackEntry;
|
||||
|
||||
@@ -215,6 +215,7 @@ export function initFastReplySessionState(params: {
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
const sessionStore: Record<string, SessionEntry> = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
clone: false,
|
||||
});
|
||||
const existingEntry = sessionStore[sessionKey];
|
||||
const commandSource = ctx.BodyForCommands ?? ctx.CommandBody ?? ctx.RawBody ?? ctx.Body ?? "";
|
||||
|
||||
@@ -284,6 +284,7 @@ export async function initSessionState(params: {
|
||||
const sessionStoreLoadStartMs = ingressTimingEnabled ? Date.now() : 0;
|
||||
const sessionStore: Record<string, SessionEntry> = loadSessionStore(storePath, {
|
||||
skipCache: true,
|
||||
clone: false,
|
||||
});
|
||||
if (ingressTimingEnabled) {
|
||||
log.info(
|
||||
|
||||
@@ -703,6 +703,33 @@ describe("session store writer queue", () => {
|
||||
expect(cached?.[key]?.sessionId).toBe("s-serialized-cache");
|
||||
});
|
||||
|
||||
it("returns an owned parsed store for fresh skip-cache loads without cloning again", async () => {
|
||||
const key = "agent:main:owned-skip-cache";
|
||||
const { storePath } = await makeTmpStore({
|
||||
[key]: {
|
||||
sessionId: "s-owned-skip-cache",
|
||||
updatedAt: Date.now(),
|
||||
skillsSnapshot: {
|
||||
prompt: "owned prompt",
|
||||
skills: [{ name: "demo" }],
|
||||
version: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
const parseSpy = vi.spyOn(JSON, "parse");
|
||||
try {
|
||||
const loaded = loadSessionStore(storePath, { skipCache: true, clone: false });
|
||||
loaded[key].sessionId = "mutated-owned-store";
|
||||
|
||||
expect(parseSpy).toHaveBeenCalledTimes(1);
|
||||
expect(loadSessionStore(storePath, { skipCache: true, clone: false })[key].sessionId).toBe(
|
||||
"s-owned-skip-cache",
|
||||
);
|
||||
} finally {
|
||||
parseSpy.mockRestore();
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps session store writes atomic while skipping durable fsync inside the writer lock", async () => {
|
||||
const key = "agent:main:no-fsync";
|
||||
const { storePath } = await makeTmpStore({
|
||||
|
||||
@@ -503,6 +503,7 @@ function hasBackingSession(task: TaskRecord, context?: BackingSessionLookupConte
|
||||
if (task.runtime === "acp") {
|
||||
const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({
|
||||
sessionKey: childSessionKey,
|
||||
clone: false,
|
||||
});
|
||||
if (!acpEntry || acpEntry.storeReadFailed) {
|
||||
return true;
|
||||
@@ -648,7 +649,10 @@ function shouldCloseTerminalAcpSession(task: TaskRecord): boolean {
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey });
|
||||
const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({
|
||||
sessionKey,
|
||||
clone: false,
|
||||
});
|
||||
if (!acpEntry || acpEntry.storeReadFailed || !acpEntry.acp) {
|
||||
return false;
|
||||
}
|
||||
@@ -686,7 +690,10 @@ async function cleanupTerminalAcpSession(task: TaskRecord): Promise<void> {
|
||||
if (!sessionKey) {
|
||||
return;
|
||||
}
|
||||
const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({ sessionKey });
|
||||
const acpEntry = taskRegistryMaintenanceRuntime.readAcpSessionEntry({
|
||||
sessionKey,
|
||||
clone: false,
|
||||
});
|
||||
const closeAcpSession = taskRegistryMaintenanceRuntime.closeAcpSession;
|
||||
if (!acpEntry || !closeAcpSession) {
|
||||
return;
|
||||
@@ -722,7 +729,7 @@ async function cleanupTerminalAcpSession(task: TaskRecord): Promise<void> {
|
||||
async function cleanupOrphanedParentOwnedAcpSessions(): Promise<void> {
|
||||
let acpSessions: AcpSessionStoreEntry[];
|
||||
try {
|
||||
acpSessions = await taskRegistryMaintenanceRuntime.listAcpSessionEntries({});
|
||||
acpSessions = await taskRegistryMaintenanceRuntime.listAcpSessionEntries({ clone: false });
|
||||
} catch (error) {
|
||||
log.warn("Failed to list ACP sessions during task maintenance", { error });
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user