mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-28 07:33:38 +00:00
perf(agents): cache subagent registry reads
This commit is contained in:
85
src/agents/subagent-registry-state.test.ts
Normal file
85
src/agents/subagent-registry-state.test.ts
Normal file
@@ -0,0 +1,85 @@
|
||||
// Subagent registry state tests cover hot read caching over the persisted SQLite snapshot.
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import {
|
||||
clearSubagentRunsReadCacheForTest,
|
||||
getSubagentRunsSnapshotForRead,
|
||||
persistSubagentRunsToDisk,
|
||||
} from "./subagent-registry-state.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
const mocks = vi.hoisted(() => ({
|
||||
loadSubagentRegistryFromSqlite: vi.fn<() => Map<string, SubagentRunRecord>>(),
|
||||
saveSubagentRegistryToSqlite: vi.fn<(runs: Map<string, SubagentRunRecord>) => void>(),
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-registry.store.sqlite.js", () => ({
|
||||
loadSubagentRegistryFromSqlite: mocks.loadSubagentRegistryFromSqlite,
|
||||
saveSubagentRegistryToSqlite: mocks.saveSubagentRegistryToSqlite,
|
||||
}));
|
||||
|
||||
function createRun(runId: string): SubagentRunRecord {
|
||||
return {
|
||||
runId,
|
||||
childSessionKey: `agent:main:subagent:${runId}`,
|
||||
requesterSessionKey: "agent:main:main",
|
||||
requesterDisplayKey: "main",
|
||||
task: `task ${runId}`,
|
||||
cleanup: "keep",
|
||||
createdAt: 1,
|
||||
startedAt: 1,
|
||||
};
|
||||
}
|
||||
|
||||
describe("subagent registry state read cache", () => {
|
||||
const previousReadDiskFlag = process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(1_000);
|
||||
process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK = "1";
|
||||
clearSubagentRunsReadCacheForTest();
|
||||
mocks.loadSubagentRegistryFromSqlite.mockReset();
|
||||
mocks.saveSubagentRegistryToSqlite.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearSubagentRunsReadCacheForTest();
|
||||
if (previousReadDiskFlag === undefined) {
|
||||
delete process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK;
|
||||
} else {
|
||||
process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK = previousReadDiskFlag;
|
||||
}
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("reuses persisted snapshots for hot reads within the ttl", () => {
|
||||
const firstRun = createRun("run-first");
|
||||
const secondRun = createRun("run-second");
|
||||
mocks.loadSubagentRegistryFromSqlite
|
||||
.mockReturnValueOnce(new Map([[firstRun.runId, firstRun]]))
|
||||
.mockReturnValueOnce(new Map([[secondRun.runId, secondRun]]));
|
||||
|
||||
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]);
|
||||
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]);
|
||||
expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(1);
|
||||
|
||||
vi.advanceTimersByTime(500);
|
||||
|
||||
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-second"]);
|
||||
expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("refreshes the local read cache after successful writes", () => {
|
||||
const firstRun = createRun("run-first");
|
||||
const savedRun = createRun("run-saved");
|
||||
mocks.loadSubagentRegistryFromSqlite.mockReturnValue(new Map([[firstRun.runId, firstRun]]));
|
||||
|
||||
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]);
|
||||
|
||||
persistSubagentRunsToDisk(new Map([[savedRun.runId, savedRun]]));
|
||||
|
||||
expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-saved"]);
|
||||
expect(mocks.saveSubagentRegistryToSqlite).toHaveBeenCalledOnce();
|
||||
expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
@@ -9,9 +9,54 @@ import {
|
||||
} from "./subagent-registry.store.sqlite.js";
|
||||
import type { SubagentRunRecord } from "./subagent-registry.types.js";
|
||||
|
||||
const SUBAGENT_RUNS_READ_CACHE_TTL_MS = 500;
|
||||
|
||||
let persistedSubagentRunsReadCache:
|
||||
| {
|
||||
loadedAtMs: number;
|
||||
runs: Map<string, SubagentRunRecord>;
|
||||
}
|
||||
| undefined;
|
||||
|
||||
function cloneSubagentRunsSnapshot(
|
||||
runs: Map<string, SubagentRunRecord>,
|
||||
): Map<string, SubagentRunRecord> {
|
||||
return new Map([...runs.entries()].map(([runId, entry]) => [runId, structuredClone(entry)]));
|
||||
}
|
||||
|
||||
function rememberPersistedSubagentRunsSnapshot(runs: Map<string, SubagentRunRecord>): void {
|
||||
persistedSubagentRunsReadCache = {
|
||||
loadedAtMs: Date.now(),
|
||||
runs: cloneSubagentRunsSnapshot(runs),
|
||||
};
|
||||
}
|
||||
|
||||
function loadPersistedSubagentRunsForRead(): Map<string, SubagentRunRecord> {
|
||||
const nowMs = Date.now();
|
||||
if (
|
||||
persistedSubagentRunsReadCache &&
|
||||
nowMs >= persistedSubagentRunsReadCache.loadedAtMs &&
|
||||
nowMs - persistedSubagentRunsReadCache.loadedAtMs < SUBAGENT_RUNS_READ_CACHE_TTL_MS
|
||||
) {
|
||||
return persistedSubagentRunsReadCache.runs;
|
||||
}
|
||||
|
||||
const runs = loadSubagentRegistryFromSqlite();
|
||||
persistedSubagentRunsReadCache = {
|
||||
loadedAtMs: nowMs,
|
||||
runs,
|
||||
};
|
||||
return runs;
|
||||
}
|
||||
|
||||
export function clearSubagentRunsReadCacheForTest(): void {
|
||||
persistedSubagentRunsReadCache = undefined;
|
||||
}
|
||||
|
||||
export function persistSubagentRunsToDisk(runs: Map<string, SubagentRunRecord>) {
|
||||
try {
|
||||
saveSubagentRegistryToSqlite(runs);
|
||||
rememberPersistedSubagentRunsSnapshot(runs);
|
||||
} catch {
|
||||
// ignore persistence failures
|
||||
}
|
||||
@@ -19,6 +64,7 @@ export function persistSubagentRunsToDisk(runs: Map<string, SubagentRunRecord>)
|
||||
|
||||
export function persistSubagentRunsToDiskOrThrow(runs: Map<string, SubagentRunRecord>) {
|
||||
saveSubagentRegistryToSqlite(runs);
|
||||
rememberPersistedSubagentRunsSnapshot(runs);
|
||||
}
|
||||
|
||||
export function restoreSubagentRunsFromDisk(params: {
|
||||
@@ -53,7 +99,9 @@ export function getSubagentRunsSnapshotForRead(
|
||||
if (shouldReadDisk) {
|
||||
try {
|
||||
// Persisted state lets other worker processes observe active runs.
|
||||
for (const [runId, entry] of loadSubagentRegistryFromSqlite().entries()) {
|
||||
// Cache this hot cross-process snapshot briefly; writes refresh the local
|
||||
// cache and the TTL bounds visibility of changes from other processes.
|
||||
for (const [runId, entry] of loadPersistedSubagentRunsForRead().entries()) {
|
||||
merged.set(runId, entry);
|
||||
}
|
||||
} catch {
|
||||
|
||||
@@ -87,6 +87,7 @@ const mocks = vi.hoisted(() => ({
|
||||
resolveStorePath: vi.fn(() => "/tmp/test-session-store.json"),
|
||||
updateSessionStore: vi.fn(),
|
||||
emitSessionLifecycleEvent: vi.fn(),
|
||||
clearSubagentRunsReadCacheForTest: vi.fn(),
|
||||
persistSubagentRunsToDisk: vi.fn(),
|
||||
persistSubagentRunsToDiskOrThrow: vi.fn(),
|
||||
restoreSubagentRunsFromDisk: vi.fn(() => 0),
|
||||
@@ -133,6 +134,7 @@ vi.mock("../sessions/session-lifecycle-events.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./subagent-registry-state.js", () => ({
|
||||
clearSubagentRunsReadCacheForTest: mocks.clearSubagentRunsReadCacheForTest,
|
||||
getSubagentRunsSnapshotForRead: mocks.getSubagentRunsSnapshotForRead,
|
||||
persistSubagentRunsToDisk: mocks.persistSubagentRunsToDisk,
|
||||
persistSubagentRunsToDiskOrThrow: mocks.persistSubagentRunsToDiskOrThrow,
|
||||
|
||||
@@ -74,6 +74,7 @@ import {
|
||||
type RegisterSubagentRunParams,
|
||||
} from "./subagent-registry-run-manager.js";
|
||||
import {
|
||||
clearSubagentRunsReadCacheForTest,
|
||||
getSubagentRunsSnapshotForRead,
|
||||
persistSubagentRunsToDisk,
|
||||
persistSubagentRunsToDiskOrThrow,
|
||||
@@ -1255,6 +1256,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) {
|
||||
runtimePluginsLoader.clear();
|
||||
subagentAnnounceLoader.clear();
|
||||
browserCleanupLoader.clear();
|
||||
clearSubagentRunsReadCacheForTest();
|
||||
stopSweeper();
|
||||
sweepInProgress = false;
|
||||
restoreAttempted = false;
|
||||
|
||||
Reference in New Issue
Block a user