diff --git a/src/agents/subagent-registry-state.test.ts b/src/agents/subagent-registry-state.test.ts new file mode 100644 index 00000000000..13ec57427cf --- /dev/null +++ b/src/agents/subagent-registry-state.test.ts @@ -0,0 +1,85 @@ +// Subagent registry state tests cover hot read caching over the persisted SQLite snapshot. +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + clearSubagentRunsReadCacheForTest, + getSubagentRunsSnapshotForRead, + persistSubagentRunsToDisk, +} from "./subagent-registry-state.js"; +import type { SubagentRunRecord } from "./subagent-registry.types.js"; + +const mocks = vi.hoisted(() => ({ + loadSubagentRegistryFromSqlite: vi.fn<() => Map>(), + saveSubagentRegistryToSqlite: vi.fn<(runs: Map) => void>(), +})); + +vi.mock("./subagent-registry.store.sqlite.js", () => ({ + loadSubagentRegistryFromSqlite: mocks.loadSubagentRegistryFromSqlite, + saveSubagentRegistryToSqlite: mocks.saveSubagentRegistryToSqlite, +})); + +function createRun(runId: string): SubagentRunRecord { + return { + runId, + childSessionKey: `agent:main:subagent:${runId}`, + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + task: `task ${runId}`, + cleanup: "keep", + createdAt: 1, + startedAt: 1, + }; +} + +describe("subagent registry state read cache", () => { + const previousReadDiskFlag = process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK; + + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(1_000); + process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK = "1"; + clearSubagentRunsReadCacheForTest(); + mocks.loadSubagentRegistryFromSqlite.mockReset(); + mocks.saveSubagentRegistryToSqlite.mockReset(); + }); + + afterEach(() => { + clearSubagentRunsReadCacheForTest(); + if (previousReadDiskFlag === undefined) { + delete process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK; + } else { + process.env.OPENCLAW_TEST_READ_SUBAGENT_RUNS_FROM_DISK = previousReadDiskFlag; + } + vi.useRealTimers(); + }); + + it("reuses persisted snapshots for hot reads within the ttl", () => { + const firstRun = createRun("run-first"); + const secondRun = createRun("run-second"); + mocks.loadSubagentRegistryFromSqlite + .mockReturnValueOnce(new Map([[firstRun.runId, firstRun]])) + .mockReturnValueOnce(new Map([[secondRun.runId, secondRun]])); + + expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]); + expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]); + expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(1); + + vi.advanceTimersByTime(500); + + expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-second"]); + expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(2); + }); + + it("refreshes the local read cache after successful writes", () => { + const firstRun = createRun("run-first"); + const savedRun = createRun("run-saved"); + mocks.loadSubagentRegistryFromSqlite.mockReturnValue(new Map([[firstRun.runId, firstRun]])); + + expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-first"]); + + persistSubagentRunsToDisk(new Map([[savedRun.runId, savedRun]])); + + expect([...getSubagentRunsSnapshotForRead(new Map()).keys()]).toEqual(["run-saved"]); + expect(mocks.saveSubagentRegistryToSqlite).toHaveBeenCalledOnce(); + expect(mocks.loadSubagentRegistryFromSqlite).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/agents/subagent-registry-state.ts b/src/agents/subagent-registry-state.ts index ab49f8943f7..999d9623f19 100644 --- a/src/agents/subagent-registry-state.ts +++ b/src/agents/subagent-registry-state.ts @@ -9,9 +9,54 @@ import { } from "./subagent-registry.store.sqlite.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; +const SUBAGENT_RUNS_READ_CACHE_TTL_MS = 500; + +let persistedSubagentRunsReadCache: + | { + loadedAtMs: number; + runs: Map; + } + | undefined; + +function cloneSubagentRunsSnapshot( + runs: Map, +): Map { + return new Map([...runs.entries()].map(([runId, entry]) => [runId, structuredClone(entry)])); +} + +function rememberPersistedSubagentRunsSnapshot(runs: Map): void { + persistedSubagentRunsReadCache = { + loadedAtMs: Date.now(), + runs: cloneSubagentRunsSnapshot(runs), + }; +} + +function loadPersistedSubagentRunsForRead(): Map { + const nowMs = Date.now(); + if ( + persistedSubagentRunsReadCache && + nowMs >= persistedSubagentRunsReadCache.loadedAtMs && + nowMs - persistedSubagentRunsReadCache.loadedAtMs < SUBAGENT_RUNS_READ_CACHE_TTL_MS + ) { + return persistedSubagentRunsReadCache.runs; + } + + const runs = loadSubagentRegistryFromSqlite(); + persistedSubagentRunsReadCache = { + loadedAtMs: nowMs, + runs, + }; + return runs; +} + +export function clearSubagentRunsReadCacheForTest(): void { + persistedSubagentRunsReadCache = undefined; +} + export function persistSubagentRunsToDisk(runs: Map) { try { saveSubagentRegistryToSqlite(runs); + rememberPersistedSubagentRunsSnapshot(runs); } catch { // ignore persistence failures } @@ -19,6 +64,7 @@ export function persistSubagentRunsToDisk(runs: Map) export function persistSubagentRunsToDiskOrThrow(runs: Map) { saveSubagentRegistryToSqlite(runs); + rememberPersistedSubagentRunsSnapshot(runs); } export function restoreSubagentRunsFromDisk(params: { @@ -53,7 +99,9 @@ export function getSubagentRunsSnapshotForRead( if (shouldReadDisk) { try { // Persisted state lets other worker processes observe active runs. - for (const [runId, entry] of loadSubagentRegistryFromSqlite().entries()) { + // Cache this hot cross-process snapshot briefly; writes refresh the local + // cache and the TTL bounds visibility of changes from other processes. + for (const [runId, entry] of loadPersistedSubagentRunsForRead().entries()) { merged.set(runId, entry); } } catch { diff --git a/src/agents/subagent-registry.test.ts b/src/agents/subagent-registry.test.ts index 146665c7c4f..053209260d0 100644 --- a/src/agents/subagent-registry.test.ts +++ b/src/agents/subagent-registry.test.ts @@ -87,6 +87,7 @@ const mocks = vi.hoisted(() => ({ resolveStorePath: vi.fn(() => "/tmp/test-session-store.json"), updateSessionStore: vi.fn(), emitSessionLifecycleEvent: vi.fn(), + clearSubagentRunsReadCacheForTest: vi.fn(), persistSubagentRunsToDisk: vi.fn(), persistSubagentRunsToDiskOrThrow: vi.fn(), restoreSubagentRunsFromDisk: vi.fn(() => 0), @@ -133,6 +134,7 @@ vi.mock("../sessions/session-lifecycle-events.js", () => ({ })); vi.mock("./subagent-registry-state.js", () => ({ + clearSubagentRunsReadCacheForTest: mocks.clearSubagentRunsReadCacheForTest, getSubagentRunsSnapshotForRead: mocks.getSubagentRunsSnapshotForRead, persistSubagentRunsToDisk: mocks.persistSubagentRunsToDisk, persistSubagentRunsToDiskOrThrow: mocks.persistSubagentRunsToDiskOrThrow, diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 78a4a239622..28e6aaf35aa 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -74,6 +74,7 @@ import { type RegisterSubagentRunParams, } from "./subagent-registry-run-manager.js"; import { + clearSubagentRunsReadCacheForTest, getSubagentRunsSnapshotForRead, persistSubagentRunsToDisk, persistSubagentRunsToDiskOrThrow, @@ -1255,6 +1256,7 @@ export function resetSubagentRegistryForTests(opts?: { persist?: boolean }) { runtimePluginsLoader.clear(); subagentAnnounceLoader.clear(); browserCleanupLoader.clear(); + clearSubagentRunsReadCacheForTest(); stopSweeper(); sweepInProgress = false; restoreAttempted = false;