From b4437047f4777b1ff95f9908b654121887c71ff6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 2 May 2026 12:33:03 +0100 Subject: [PATCH] perf: route session store writes through writer queue --- CHANGELOG.md | 1 + docs/plugins/sdk-runtime.md | 9 +- .../session-management-compaction.md | 2 +- .../voice-call/src/response-generator.test.ts | 17 +- .../voice-call/src/response-generator.ts | 46 ++-- extensions/voice-call/src/runtime.test.ts | 3 + ...dded-subscribe.handlers.compaction.test.ts | 17 +- ...bagent-registry.persistence.resume.test.ts | 4 +- .../subagent-registry.persistence.test.ts | 4 +- src/config/sessions.test.ts | 92 +++++++- src/config/sessions/sessions.test.ts | 13 +- src/config/sessions/store-cache.ts | 17 ++ ...re-lock-state.ts => store-writer-state.ts} | 26 ++- src/config/sessions/store-writer.test.ts | 56 +++++ src/config/sessions/store-writer.ts | 97 +++++++++ src/config/sessions/store.lock.test.ts | 50 ----- src/config/sessions/store.ts | 203 +++--------------- src/cron/isolated-agent/session.ts | 3 +- src/extensionAPI.ts | 2 + .../server.sessions.reset-hooks.test.ts | 6 +- src/infra/heartbeat-runner.ts | 106 ++++----- src/plugin-sdk/config-runtime.ts | 1 + src/plugin-sdk/session-store-runtime.ts | 1 + .../test-helpers/plugin-runtime-mock.ts | 10 + src/plugins/runtime/index.test.ts | 2 + src/plugins/runtime/runtime-agent.ts | 9 +- src/plugins/runtime/types-core.ts | 2 + .../agent-consult-runtime.test.ts | 9 + src/realtime-voice/agent-consult-runtime.ts | 24 +-- src/test-utils/session-state-cleanup.test.ts | 30 +-- src/test-utils/session-state-cleanup.ts | 15 +- test/setup-openclaw-runtime.ts | 15 +- 32 files changed, 497 insertions(+), 395 deletions(-) rename src/config/sessions/{store-lock-state.ts => store-writer-state.ts} (60%) create mode 100644 src/config/sessions/store-writer.test.ts create mode 100644 src/config/sessions/store-writer.ts delete mode 100644 src/config/sessions/store.lock.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 74523cf9072..d017c67c547 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai - Models CLI: restore `openclaw models list --provider ` catalog and registry fallback rows for unconfigured providers, so provider-specific verification commands no longer report "No models found." Fixes #75517; supersedes #75615. Thanks @lotsoftick and @koshaji. - Gateway/macOS: write LaunchAgent services with a canonical system PATH and stop preserving old plist PATH entries, so Volta, asdf, fnm, and pnpm shell paths no longer affect gateway child-process Node resolution. Fixes #75233; supersedes #75246. Thanks @nphyde2. - Slack/hooks: preserve bot alert attachment text in message-received hook content when command text is blank. Fixes #76035; refs #76036. Thanks @amsminn. +- Sessions: route Gateway session-store writes through a dedicated in-process writer and borrow the validated mutable cache during the writer slot, avoiding runtime file locks plus repeated `sessions.json` rereads and JSON clones on hot metadata updates. Refs #68554. Thanks @henkterharmsel. - Control UI/chat: show inline feedback when local slash-command dispatch is unavailable or fails unexpectedly instead of clearing the composer silently. Fixes #52105. Thanks @MooreQiao. - Memory/markdown: replace CRLF managed blocks in place and collapse duplicate marker blocks without rewriting unmanaged markdown, so Dreaming and Memory Wiki files self-heal from repeated generated sections. Fixes #75491; supersedes #75495, #75810, and #76008. Thanks @asaenokkostya-coder, @ottodeng, @everettjf, and @lrg913427-dot. - Agents/tools: return critical tool-loop circuit-breaker stops as blocked tool results instead of thrown tool failures, so models see the guardrail and stop retrying the same call. Thanks @rayraiser. diff --git a/docs/plugins/sdk-runtime.md b/docs/plugins/sdk-runtime.md index 5668bf72e29..1d36afe5401 100644 --- a/docs/plugins/sdk-runtime.md +++ b/docs/plugins/sdk-runtime.md @@ -114,11 +114,16 @@ Provider and channel execution paths must use the active runtime config snapshot ```typescript const storePath = api.runtime.agent.session.resolveStorePath(cfg); - const store = api.runtime.agent.session.loadSessionStore(cfg); - await api.runtime.agent.session.saveSessionStore(cfg, store); + const store = api.runtime.agent.session.loadSessionStore(storePath); + await api.runtime.agent.session.updateSessionStore(storePath, (nextStore) => { + // Patch one entry without replacing the whole file from stale state. + nextStore[sessionKey] = { ...nextStore[sessionKey], thinkingLevel: "high" }; + }); const filePath = api.runtime.agent.session.resolveSessionFilePath(cfg, sessionId); ``` + Prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)` for runtime writes. They route through the Gateway-owned session-store writer, preserve concurrent updates, and reuse the hot cache. `saveSessionStore(...)` remains available for compatibility and offline maintenance-style rewrites. + Default model and provider constants: diff --git a/docs/reference/session-management-compaction.md b/docs/reference/session-management-compaction.md index 2e83120668d..520256966da 100644 --- a/docs/reference/session-management-compaction.md +++ b/docs/reference/session-management-compaction.md @@ -85,7 +85,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f - `maxDiskBytes`: optional sessions-directory budget - `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`) -Normal Gateway writes batch `maxEntries` cleanup for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately. +Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately. Maintenance keeps durable external conversation pointers such as group sessions and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks, diff --git a/extensions/voice-call/src/response-generator.test.ts b/extensions/voice-call/src/response-generator.test.ts index fa59e451df7..72a9fadc15a 100644 --- a/extensions/voice-call/src/response-generator.test.ts +++ b/extensions/voice-call/src/response-generator.test.ts @@ -6,6 +6,14 @@ import { generateVoiceResponse } from "./response-generator.js"; function createAgentRuntime(payloads: Array>) { const sessionStore: Record = {}; const saveSessionStore = vi.fn(async () => {}); + const updateSessionStore = vi.fn( + async ( + _storePath: string, + mutator: (store: Record) => unknown, + ) => { + return await mutator(sessionStore); + }, + ); const runEmbeddedPiAgent = vi.fn(async () => ({ payloads, meta: { durationMs: 12, aborted: false }, @@ -44,6 +52,7 @@ function createAgentRuntime(payloads: Array>) { resolveStorePath, loadSessionStore: () => sessionStore, saveSessionStore, + updateSessionStore, resolveSessionFilePath, }, } as unknown as CoreAgentDeps; @@ -52,6 +61,7 @@ function createAgentRuntime(payloads: Array>) { runtime, runEmbeddedPiAgent, saveSessionStore, + updateSessionStore, sessionStore, resolveAgentDir, resolveAgentWorkspaceDir, @@ -157,7 +167,7 @@ describe("generateVoiceResponse", () => { }); it("pins the voice session to responseModel before running the embedded agent", async () => { - const { runtime, runEmbeddedPiAgent, saveSessionStore, sessionStore } = createAgentRuntime([ + const { runtime, runEmbeddedPiAgent, updateSessionStore, sessionStore } = createAgentRuntime([ { text: '{"spoken":"Pinned model works."}' }, ]); const voiceConfig = VoiceCallConfigSchema.parse({ @@ -181,7 +191,10 @@ describe("generateVoiceResponse", () => { modelOverride: "gpt-4.1-nano", modelOverrideSource: "auto", }); - expect(saveSessionStore).toHaveBeenCalledWith("/tmp/openclaw/main/sessions.json", sessionStore); + expect(updateSessionStore).toHaveBeenCalledWith( + "/tmp/openclaw/main/sessions.json", + expect.any(Function), + ); expect(runEmbeddedPiAgent).toHaveBeenCalledWith( expect.objectContaining({ provider: "openai", diff --git a/extensions/voice-call/src/response-generator.ts b/extensions/voice-call/src/response-generator.ts index 4f18c6586e2..ed38654ad3c 100644 --- a/extensions/voice-call/src/response-generator.ts +++ b/extensions/voice-call/src/response-generator.ts @@ -224,34 +224,34 @@ export async function generateVoiceResponse( // Load or create session entry const sessionStore = agentRuntime.session.loadSessionStore(storePath); const now = Date.now(); - let sessionEntry = sessionStore[resolvedSessionKey] as SessionEntry | undefined; - let sessionEntryUpdated = false; - - if (!sessionEntry) { - sessionEntry = { - sessionId: crypto.randomUUID(), - updatedAt: now, - }; - sessionStore[resolvedSessionKey] = sessionEntry; - sessionEntryUpdated = true; - } - - const sessionId = sessionEntry.sessionId; + const existingSessionEntry = sessionStore[resolvedSessionKey] as SessionEntry | undefined; // Resolve model from config const { provider, model } = resolveVoiceResponseModel({ voiceConfig, agentRuntime }); - if (voiceConfig.responseModel) { - sessionEntryUpdated = - applyModelOverrideToSessionEntry({ - entry: sessionEntry, - selection: { provider, model }, - selectionSource: "auto", - }).updated || sessionEntryUpdated; - } - if (sessionEntryUpdated) { - await agentRuntime.session.saveSessionStore(storePath, sessionStore); + let sessionEntry = existingSessionEntry; + if (!sessionEntry?.sessionId || voiceConfig.responseModel) { + sessionEntry = await agentRuntime.session.updateSessionStore(storePath, (store) => { + let entry = store[resolvedSessionKey] as SessionEntry | undefined; + if (!entry?.sessionId) { + entry = { + ...entry, + sessionId: crypto.randomUUID(), + updatedAt: now, + }; + store[resolvedSessionKey] = entry; + } + if (voiceConfig.responseModel) { + applyModelOverrideToSessionEntry({ + entry, + selection: { provider, model }, + selectionSource: "auto", + }); + } + return entry; + }); } + const sessionId = sessionEntry.sessionId; const sessionFile = agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, { agentId, diff --git a/extensions/voice-call/src/runtime.test.ts b/extensions/voice-call/src/runtime.test.ts index a2fd56f8a28..6510b205184 100644 --- a/extensions/voice-call/src/runtime.test.ts +++ b/extensions/voice-call/src/runtime.test.ts @@ -337,6 +337,7 @@ describe("createVoiceCallRuntime lifecycle", () => { resolveStorePath: vi.fn(() => "/tmp/sessions.json"), loadSessionStore: vi.fn(() => sessionStore), saveSessionStore: vi.fn(async () => {}), + updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)), resolveSessionFilePath: vi.fn(() => "/tmp/session.json"), }, runEmbeddedPiAgent, @@ -421,6 +422,7 @@ describe("createVoiceCallRuntime lifecycle", () => { resolveStorePath: vi.fn(() => "/tmp/sessions.json"), loadSessionStore: vi.fn(() => sessionStore), saveSessionStore: vi.fn(async () => {}), + updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)), resolveSessionFilePath: vi.fn(() => "/tmp/session.json"), }, runEmbeddedPiAgent, @@ -483,6 +485,7 @@ describe("createVoiceCallRuntime lifecycle", () => { resolveStorePath: vi.fn(() => "/tmp/sessions.json"), loadSessionStore: vi.fn(() => sessionStore), saveSessionStore: vi.fn(async () => {}), + updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)), resolveSessionFilePath: vi.fn(() => "/tmp/session.json"), }, runEmbeddedPiAgent, diff --git a/src/agents/pi-embedded-subscribe.handlers.compaction.test.ts b/src/agents/pi-embedded-subscribe.handlers.compaction.test.ts index a7120e14151..3079afeb09f 100644 --- a/src/agents/pi-embedded-subscribe.handlers.compaction.test.ts +++ b/src/agents/pi-embedded-subscribe.handlers.compaction.test.ts @@ -1,12 +1,8 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { - drainSessionStoreLockQueuesForTest, - resetSessionStoreLockRuntimeForTests, - setSessionWriteLockAcquirerForTests, -} from "../config/sessions.js"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { drainSessionStoreWriterQueuesForTest } from "../config/sessions.js"; import { readCompactionCount, seedSessionStore, @@ -57,15 +53,8 @@ function createCompactionContext(params: { } as unknown as EmbeddedPiSubscribeContext; } -beforeEach(() => { - setSessionWriteLockAcquirerForTests(async () => ({ - release: async () => {}, - })); -}); - afterEach(async () => { - resetSessionStoreLockRuntimeForTests(); - await drainSessionStoreLockQueuesForTest(); + await drainSessionStoreWriterQueuesForTest(); }); describe("reconcileSessionStoreCompactionCountAfterSuccess", () => { diff --git a/src/agents/subagent-registry.persistence.resume.test.ts b/src/agents/subagent-registry.persistence.resume.test.ts index d46e70313a2..60aec9a328d 100644 --- a/src/agents/subagent-registry.persistence.resume.test.ts +++ b/src/agents/subagent-registry.persistence.resume.test.ts @@ -5,7 +5,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite import "./subagent-registry.mocks.shared.js"; import { clearSessionStoreCacheForTest, - drainSessionStoreLockQueuesForTest, + drainSessionStoreWriterQueuesForTest, } from "../config/sessions/store.js"; import { captureEnv } from "../test-utils/env.js"; import { @@ -131,7 +131,7 @@ describe("subagent registry persistence resume", () => { announceSpy.mockClear(); mod.__testing.setDepsForTest(); mod.resetSubagentRegistryForTests({ persist: false }); - await drainSessionStoreLockQueuesForTest(); + await drainSessionStoreWriterQueuesForTest(); clearSessionStoreCacheForTest(); if (tempStateDir) { await fs.rm(tempStateDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }); diff --git a/src/agents/subagent-registry.persistence.test.ts b/src/agents/subagent-registry.persistence.test.ts index 6352e88cfa0..fcb04a3b6cf 100644 --- a/src/agents/subagent-registry.persistence.test.ts +++ b/src/agents/subagent-registry.persistence.test.ts @@ -6,7 +6,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import "./subagent-registry.mocks.shared.js"; import { clearSessionStoreCacheForTest, - drainSessionStoreLockQueuesForTest, + drainSessionStoreWriterQueuesForTest, } from "../config/sessions/store.js"; import { callGateway } from "../gateway/call.js"; import { onAgentEvent } from "../infra/agent-events.js"; @@ -207,7 +207,7 @@ describe("subagent registry persistence", () => { announceSpy.mockClear(); __testing.setDepsForTest(); resetSubagentRegistryForTests({ persist: false }); - await drainSessionStoreLockQueuesForTest(); + await drainSessionStoreWriterQueuesForTest(); clearSessionStoreCacheForTest(); if (tempStateDir) { await fs.rm(tempStateDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 }); diff --git a/src/config/sessions.test.ts b/src/config/sessions.test.ts index 638a8be8c20..9de3d1d4d4f 100644 --- a/src/config/sessions.test.ts +++ b/src/config/sessions.test.ts @@ -1,7 +1,8 @@ +import fsSync from "node:fs"; import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { afterAll, beforeAll, describe, expect, it } from "vitest"; +import { afterAll, beforeAll, describe, expect, it, vi } from "vitest"; import { withEnv } from "../test-utils/env.js"; import { buildGroupDisplayName, @@ -798,7 +799,7 @@ describe("sessions", () => { await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow(); }); - it("updateSessionStoreEntry re-reads disk inside lock instead of using stale cache", async () => { + it("updateSessionStoreEntry re-reads disk inside the writer slot instead of using stale cache", async () => { const mainSessionKey = "agent:main:main"; const { storePath } = await createSessionStoreFixture({ prefix: "updateSessionStoreEntry-cache-bypass", @@ -838,4 +839,91 @@ describe("sessions", () => { expect(store[mainSessionKey]?.providerOverride).toBe("anthropic"); expect(store[mainSessionKey]?.thinkingLevel).toBe("high"); }); + + it("updateSessionStore uses the writer-owned mutable cache without disk read or parse", async () => { + const mainSessionKey = "agent:main:main"; + const { storePath } = await createSessionStoreFixture({ + prefix: "updateSessionStore-mutable-cache", + entries: { + [mainSessionKey]: { + sessionId: "sess-1", + updatedAt: 123, + thinkingLevel: "low", + }, + }, + }); + + expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low"); + + const readSpy = vi.spyOn(fsSync, "readFileSync"); + const parseSpy = vi.spyOn(JSON, "parse"); + try { + await updateSessionStore( + storePath, + (store) => { + const existing = store[mainSessionKey]; + if (!existing) { + throw new Error("missing session entry"); + } + store[mainSessionKey] = { + ...existing, + thinkingLevel: "high", + }; + }, + { skipMaintenance: true }, + ); + + expect(readSpy).not.toHaveBeenCalled(); + expect(parseSpy).not.toHaveBeenCalled(); + } finally { + readSpy.mockRestore(); + parseSpy.mockRestore(); + } + + const store = loadSessionStore(storePath, { skipCache: true }); + expect(store[mainSessionKey]?.thinkingLevel).toBe("high"); + }); + + it("updateSessionStore drops a borrowed cache entry when a mutator throws", async () => { + const mainSessionKey = "agent:main:main"; + const { storePath } = await createSessionStoreFixture({ + prefix: "updateSessionStore-mutable-cache-throw", + entries: { + [mainSessionKey]: { + sessionId: "sess-1", + updatedAt: 123, + thinkingLevel: "low", + }, + }, + }); + + expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low"); + + await expect( + updateSessionStore( + storePath, + (store) => { + const existing = store[mainSessionKey]; + if (!existing) { + throw new Error("missing session entry"); + } + store[mainSessionKey] = { + ...existing, + thinkingLevel: "mutated-before-throw", + }; + throw new Error("boom"); + }, + { skipMaintenance: true }, + ), + ).rejects.toThrow("boom"); + + const readSpy = vi.spyOn(fsSync, "readFileSync"); + try { + const store = loadSessionStore(storePath); + expect(readSpy).toHaveBeenCalled(); + expect(store[mainSessionKey]?.thinkingLevel).toBe("low"); + } finally { + readSpy.mockRestore(); + } + }); }); diff --git a/src/config/sessions/sessions.test.ts b/src/config/sessions/sessions.test.ts index 53f9cd6b983..6525be9754d 100644 --- a/src/config/sessions/sessions.test.ts +++ b/src/config/sessions/sessions.test.ts @@ -271,15 +271,13 @@ describe("session lifecycle timestamps", () => { }); }); -describe("session store lock (Promise chain mutex)", () => { - const lockFixtureRootTracker = createSuiteTempRootTracker({ prefix: "openclaw-lock-test-" }); - let lockTmpDirs: string[] = []; +describe("session store writer queue", () => { + const writerFixtureRootTracker = createSuiteTempRootTracker({ prefix: "openclaw-writer-test-" }); async function makeTmpStore( initial: Record = {}, ): Promise<{ dir: string; storePath: string }> { - const dir = await lockFixtureRootTracker.make("case"); - lockTmpDirs.push(dir); + const dir = await writerFixtureRootTracker.make("case"); const storePath = path.join(dir, "sessions.json"); if (Object.keys(initial).length > 0) { await fsPromises.writeFile(storePath, JSON.stringify(initial, null, 2), "utf-8"); @@ -288,16 +286,15 @@ describe("session store lock (Promise chain mutex)", () => { } beforeAll(async () => { - await lockFixtureRootTracker.setup(); + await writerFixtureRootTracker.setup(); }); afterAll(async () => { - await lockFixtureRootTracker.cleanup(); + await writerFixtureRootTracker.cleanup(); }); afterEach(async () => { clearSessionStoreCacheForTest(); - lockTmpDirs = []; }); it("serializes concurrent updateSessionStore calls without data loss", async () => { diff --git a/src/config/sessions/store-cache.ts b/src/config/sessions/store-cache.ts index 85693b329b8..a9fc012b3a6 100644 --- a/src/config/sessions/store-cache.ts +++ b/src/config/sessions/store-cache.ts @@ -75,6 +75,23 @@ export function readSessionStoreCache(params: { return cloneSessionStoreRecord(cached.store, cached.serialized); } +export function takeMutableSessionStoreCache(params: { + storePath: string; + mtimeMs?: number; + sizeBytes?: number; +}): Record | null { + const cached = SESSION_STORE_CACHE.get(params.storePath); + if (!cached) { + return null; + } + if (params.mtimeMs !== cached.mtimeMs || params.sizeBytes !== cached.sizeBytes) { + invalidateSessionStoreCache(params.storePath); + return null; + } + SESSION_STORE_CACHE.delete(params.storePath); + return cached.store; +} + export function writeSessionStoreCache(params: { storePath: string; store: Record; diff --git a/src/config/sessions/store-lock-state.ts b/src/config/sessions/store-writer-state.ts similarity index 60% rename from src/config/sessions/store-lock-state.ts rename to src/config/sessions/store-writer-state.ts index 7ac92f93e31..f847c078772 100644 --- a/src/config/sessions/store-lock-state.ts +++ b/src/config/sessions/store-writer-state.ts @@ -1,34 +1,32 @@ import { clearSessionStoreCaches } from "./store-cache.js"; -export type SessionStoreLockTask = { +export type SessionStoreWriterTask = { fn: () => Promise; resolve: (value: unknown) => void; reject: (reason: unknown) => void; - timeoutMs?: number; - staleMs: number; }; -export type SessionStoreLockQueue = { +export type SessionStoreWriterQueue = { running: boolean; - pending: SessionStoreLockTask[]; + pending: SessionStoreWriterTask[]; drainPromise: Promise | null; }; -export const LOCK_QUEUES = new Map(); +export const WRITER_QUEUES = new Map(); export function clearSessionStoreCacheForTest(): void { clearSessionStoreCaches(); - for (const queue of LOCK_QUEUES.values()) { + for (const queue of WRITER_QUEUES.values()) { for (const task of queue.pending) { task.reject(new Error("session store queue cleared for test")); } } - LOCK_QUEUES.clear(); + WRITER_QUEUES.clear(); } -export async function drainSessionStoreLockQueuesForTest(): Promise { - while (LOCK_QUEUES.size > 0) { - const queues = [...LOCK_QUEUES.values()]; +export async function drainSessionStoreWriterQueuesForTest(): Promise { + while (WRITER_QUEUES.size > 0) { + const queues = [...WRITER_QUEUES.values()]; for (const queue of queues) { for (const task of queue.pending) { task.reject(new Error("session store queue cleared for test")); @@ -39,13 +37,13 @@ export async function drainSessionStoreLockQueuesForTest(): Promise { queue.drainPromise ? [queue.drainPromise] : [], ); if (activeDrains.length === 0) { - LOCK_QUEUES.clear(); + WRITER_QUEUES.clear(); return; } await Promise.allSettled(activeDrains); } } -export function getSessionStoreLockQueueSizeForTest(): number { - return LOCK_QUEUES.size; +export function getSessionStoreWriterQueueSizeForTest(): number { + return WRITER_QUEUES.size; } diff --git a/src/config/sessions/store-writer.test.ts b/src/config/sessions/store-writer.test.ts new file mode 100644 index 00000000000..f6e91055344 --- /dev/null +++ b/src/config/sessions/store-writer.test.ts @@ -0,0 +1,56 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { + clearSessionStoreCacheForTest, + getSessionStoreWriterQueueSizeForTest, + withSessionStoreWriterForTest, +} from "./store.js"; + +const createDeferred = () => { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((nextResolve, nextReject) => { + resolve = nextResolve; + reject = nextReject; + }); + return { promise, resolve, reject }; +}; + +describe("session store writer", () => { + afterEach(() => { + clearSessionStoreCacheForTest(); + }); + + it("serializes runtime writes through one in-process writer", async () => { + const storePath = "/tmp/openclaw-store.json"; + const firstStarted = createDeferred(); + const releaseFirst = createDeferred(); + const order: string[] = []; + + const first = withSessionStoreWriterForTest(storePath, async () => { + order.push("first:start"); + firstStarted.resolve(); + await releaseFirst.promise; + order.push("first:end"); + }); + const second = withSessionStoreWriterForTest(storePath, async () => { + order.push("second"); + }); + + await firstStarted.promise; + expect(getSessionStoreWriterQueueSizeForTest()).toBe(1); + expect(order).toEqual(["first:start"]); + + releaseFirst.resolve(); + await Promise.all([first, second]); + + expect(order).toEqual(["first:start", "first:end", "second"]); + expect(getSessionStoreWriterQueueSizeForTest()).toBe(0); + }); + + it("rejects empty store paths before enqueuing work", async () => { + await expect(withSessionStoreWriterForTest("", async () => undefined)).rejects.toThrow( + /storePath must be a non-empty string/, + ); + expect(getSessionStoreWriterQueueSizeForTest()).toBe(0); + }); +}); diff --git a/src/config/sessions/store-writer.ts b/src/config/sessions/store-writer.ts new file mode 100644 index 00000000000..392e7d03c9a --- /dev/null +++ b/src/config/sessions/store-writer.ts @@ -0,0 +1,97 @@ +import { + WRITER_QUEUES, + type SessionStoreWriterQueue, + type SessionStoreWriterTask, +} from "./store-writer-state.js"; + +export async function withSessionStoreWriterForTest( + storePath: string, + fn: () => Promise, +): Promise { + return await runExclusiveSessionStoreWrite(storePath, fn); +} + +function getOrCreateWriterQueue(storePath: string): SessionStoreWriterQueue { + const existing = WRITER_QUEUES.get(storePath); + if (existing) { + return existing; + } + const created: SessionStoreWriterQueue = { running: false, pending: [], drainPromise: null }; + WRITER_QUEUES.set(storePath, created); + return created; +} + +async function drainSessionStoreWriterQueue(storePath: string): Promise { + const queue = WRITER_QUEUES.get(storePath); + if (!queue) { + return; + } + if (queue.drainPromise) { + await queue.drainPromise; + return; + } + queue.running = true; + queue.drainPromise = (async () => { + try { + while (queue.pending.length > 0) { + const task = queue.pending.shift(); + if (!task) { + continue; + } + + let result: unknown; + let failed: unknown; + let hasFailure = false; + try { + result = await task.fn(); + } catch (err) { + hasFailure = true; + failed = err; + } + if (hasFailure) { + task.reject(failed); + continue; + } + task.resolve(result); + } + } finally { + queue.running = false; + queue.drainPromise = null; + if (queue.pending.length === 0) { + WRITER_QUEUES.delete(storePath); + } else { + queueMicrotask(() => { + void drainSessionStoreWriterQueue(storePath); + }); + } + } + })(); + await queue.drainPromise; +} + +export async function runExclusiveSessionStoreWrite( + storePath: string, + fn: () => Promise, +): Promise { + if (!storePath || typeof storePath !== "string") { + throw new Error( + `runExclusiveSessionStoreWrite: storePath must be a non-empty string, got ${JSON.stringify( + storePath, + )}`, + ); + } + const queue = getOrCreateWriterQueue(storePath); + + const promise = new Promise((resolve, reject) => { + const task: SessionStoreWriterTask = { + fn: async () => await fn(), + resolve: (value) => resolve(value as T), + reject, + }; + + queue.pending.push(task); + void drainSessionStoreWriterQueue(storePath); + }); + + return await promise; +} diff --git a/src/config/sessions/store.lock.test.ts b/src/config/sessions/store.lock.test.ts deleted file mode 100644 index b8def5188c9..00000000000 --- a/src/config/sessions/store.lock.test.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { - clearSessionStoreCacheForTest, - resetSessionStoreLockRuntimeForTests, - setSessionWriteLockAcquirerForTests, - withSessionStoreLockForTest, -} from "./store.js"; - -const acquireSessionWriteLockMock = vi.hoisted(() => - vi.fn(async () => ({ release: vi.fn(async () => {}) })), -); - -describe("withSessionStoreLock", () => { - beforeEach(() => { - acquireSessionWriteLockMock.mockClear(); - setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock); - }); - - afterEach(() => { - clearSessionStoreCacheForTest(); - resetSessionStoreLockRuntimeForTests(); - vi.restoreAllMocks(); - }); - - it("derives session lock hold time from the store lock timeout", async () => { - await withSessionStoreLockForTest("/tmp/openclaw-store.json", async () => {}, { - timeoutMs: 10_000, - }); - - expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({ - sessionFile: "/tmp/openclaw-store.json", - timeoutMs: 10_000, - staleMs: 30_000, - maxHoldMs: 15_000, - }); - }); - - it("leaves the session lock hold time unset when store locking has no timeout", async () => { - await withSessionStoreLockForTest("/tmp/openclaw-store.json", async () => {}, { - timeoutMs: 0, - }); - - expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({ - sessionFile: "/tmp/openclaw-store.json", - timeoutMs: Number.POSITIVE_INFINITY, - staleMs: 30_000, - maxHoldMs: undefined, - }); - }); -}); diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index 7456c85495a..43bf615d83c 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -1,9 +1,5 @@ import fs from "node:fs"; import path from "node:path"; -import { - acquireSessionWriteLock, - resolveSessionLockMaxHoldFromTimeout, -} from "../../agents/session-write-lock.js"; import type { MsgContext } from "../../auto-reply/templating.js"; import { writeTextAtomic } from "../../infra/json-files.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; @@ -22,18 +18,11 @@ import { getSerializedSessionStore, isSessionStoreCacheEnabled, setSerializedSessionStore, + takeMutableSessionStoreCache, writeSessionStoreCache, } from "./store-cache.js"; import { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js"; import { loadSessionStore, normalizeSessionStore } from "./store-load.js"; -import { - clearSessionStoreCacheForTest, - drainSessionStoreLockQueuesForTest, - getSessionStoreLockQueueSizeForTest, - LOCK_QUEUES, - type SessionStoreLockQueue, - type SessionStoreLockTask, -} from "./store-lock-state.js"; import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js"; import { capEntryCount, @@ -43,6 +32,7 @@ import { type ResolvedSessionMaintenanceConfig, type SessionMaintenanceWarning, } from "./store-maintenance.js"; +import { runExclusiveSessionStoreWrite } from "./store-writer.js"; import { mergeSessionEntry, mergeSessionEntryPreserveActivity, @@ -51,9 +41,10 @@ import { export { clearSessionStoreCacheForTest, - drainSessionStoreLockQueuesForTest, - getSessionStoreLockQueueSizeForTest, -} from "./store-lock-state.js"; + drainSessionStoreWriterQueuesForTest, + getSessionStoreWriterQueueSizeForTest, +} from "./store-writer-state.js"; +export { withSessionStoreWriterForTest } from "./store-writer.js"; export { loadSessionStore } from "./store-load.js"; export { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js"; @@ -63,7 +54,6 @@ let sessionArchiveRuntimePromise: Promise< > | null = null; let trajectoryCleanupRuntimePromise: Promise | null = null; -let sessionWriteLockAcquirerForTests: typeof acquireSessionWriteLock | null = null; function loadSessionArchiveRuntime() { sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js"); @@ -84,24 +74,6 @@ function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryCon return next; } -export function setSessionWriteLockAcquirerForTests( - acquirer: typeof acquireSessionWriteLock | null, -): void { - sessionWriteLockAcquirerForTests = acquirer; -} - -export function resetSessionStoreLockRuntimeForTests(): void { - sessionWriteLockAcquirerForTests = null; -} - -export async function withSessionStoreLockForTest( - storePath: string, - fn: () => Promise, - opts: SessionStoreLockOptions = {}, -): Promise { - return await withSessionStoreLock(storePath, fn, opts); -} - export function readSessionUpdatedAt(params: { storePath: string; sessionKey: string; @@ -177,6 +149,21 @@ function updateSessionStoreWriteCaches(params: { }); } +function loadMutableSessionStoreForWriter(storePath: string): Record { + if (isSessionStoreCacheEnabled()) { + const currentFileStat = getFileStatSnapshot(storePath); + const cached = takeMutableSessionStoreCache({ + storePath, + mtimeMs: currentFileStat?.mtimeMs, + sizeBytes: currentFileStat?.sizeBytes, + }); + if (cached) { + return cached; + } + } + return loadSessionStore(storePath, { skipCache: true, clone: false }); +} + function resolveMutableSessionStoreKey( store: Record, sessionKey: string, @@ -403,7 +390,7 @@ async function saveSessionStoreUnlocked( await new Promise((r) => setTimeout(r, 50 * (i + 1))); continue; } - // Final attempt failed — skip this save. The write lock ensures + // Final attempt failed - skip this save. The writer queue ensures // the next save will retry with fresh data. Log for diagnostics. log.warn(`atomic write failed after 5 attempts: ${storePath}`); } @@ -440,7 +427,7 @@ export async function saveSessionStore( store: Record, opts?: SaveSessionStoreOptions, ): Promise { - await withSessionStoreLock(storePath, async () => { + await runExclusiveSessionStoreWrite(storePath, async () => { await saveSessionStoreUnlocked(storePath, store, opts); }); } @@ -450,9 +437,8 @@ export async function updateSessionStore( mutator: (store: Record) => Promise | T, opts?: SaveSessionStoreOptions, ): Promise { - return await withSessionStoreLock(storePath, async () => { - // Always re-read inside the lock to avoid clobbering concurrent writers. - const store = loadSessionStore(storePath, { skipCache: true, clone: false }); + return await runExclusiveSessionStoreWrite(storePath, async () => { + const store = loadMutableSessionStoreForWriter(storePath); const previousAcpByKey = collectAcpMetadataSnapshot(store); const result = await mutator(store); preserveExistingAcpMetadata({ @@ -465,15 +451,6 @@ export async function updateSessionStore( }); } -type SessionStoreLockOptions = { - timeoutMs?: number; - pollIntervalMs?: number; - staleMs?: number; -}; - -const SESSION_STORE_LOCK_MIN_HOLD_MS = 5_000; -const SESSION_STORE_LOCK_TIMEOUT_GRACE_MS = 5_000; - function getErrorCode(error: unknown): string | null { if (!error || typeof error !== "object" || !("code" in error)) { return null; @@ -546,136 +523,14 @@ async function persistResolvedSessionEntry(params: { return params.next; } -function lockTimeoutError(storePath: string): Error { - return new Error(`timeout waiting for session store lock: ${storePath}`); -} - -function resolveSessionStoreLockMaxHoldMs(timeoutMs: number | undefined): number | undefined { - if (timeoutMs == null || !Number.isFinite(timeoutMs) || timeoutMs <= 0) { - return undefined; - } - return resolveSessionLockMaxHoldFromTimeout({ - timeoutMs, - graceMs: SESSION_STORE_LOCK_TIMEOUT_GRACE_MS, - minMs: SESSION_STORE_LOCK_MIN_HOLD_MS, - }); -} - -function getOrCreateLockQueue(storePath: string): SessionStoreLockQueue { - const existing = LOCK_QUEUES.get(storePath); - if (existing) { - return existing; - } - const created: SessionStoreLockQueue = { running: false, pending: [], drainPromise: null }; - LOCK_QUEUES.set(storePath, created); - return created; -} - -async function drainSessionStoreLockQueue(storePath: string): Promise { - const queue = LOCK_QUEUES.get(storePath); - if (!queue) { - return; - } - if (queue.drainPromise) { - await queue.drainPromise; - return; - } - queue.running = true; - queue.drainPromise = (async () => { - try { - while (queue.pending.length > 0) { - const task = queue.pending.shift(); - if (!task) { - continue; - } - - const remainingTimeoutMs = task.timeoutMs ?? Number.POSITIVE_INFINITY; - if (task.timeoutMs != null && remainingTimeoutMs <= 0) { - task.reject(lockTimeoutError(storePath)); - continue; - } - - let lock: { release: () => Promise } | undefined; - let result: unknown; - let failed: unknown; - let hasFailure = false; - try { - lock = await (sessionWriteLockAcquirerForTests ?? acquireSessionWriteLock)({ - sessionFile: storePath, - timeoutMs: remainingTimeoutMs, - staleMs: task.staleMs, - maxHoldMs: resolveSessionStoreLockMaxHoldMs(task.timeoutMs), - }); - result = await task.fn(); - } catch (err) { - hasFailure = true; - failed = err; - } finally { - await lock?.release().catch(() => undefined); - } - if (hasFailure) { - task.reject(failed); - continue; - } - task.resolve(result); - } - } finally { - queue.running = false; - queue.drainPromise = null; - if (queue.pending.length === 0) { - LOCK_QUEUES.delete(storePath); - } else { - queueMicrotask(() => { - void drainSessionStoreLockQueue(storePath); - }); - } - } - })(); - await queue.drainPromise; -} - -async function withSessionStoreLock( - storePath: string, - fn: () => Promise, - opts: SessionStoreLockOptions = {}, -): Promise { - if (!storePath || typeof storePath !== "string") { - throw new Error( - `withSessionStoreLock: storePath must be a non-empty string, got ${JSON.stringify(storePath)}`, - ); - } - const timeoutMs = opts.timeoutMs ?? 10_000; - const staleMs = opts.staleMs ?? 30_000; - // `pollIntervalMs` is retained for API compatibility with older lock options. - void opts.pollIntervalMs; - - const hasTimeout = timeoutMs > 0 && Number.isFinite(timeoutMs); - const queue = getOrCreateLockQueue(storePath); - - const promise = new Promise((resolve, reject) => { - const task: SessionStoreLockTask = { - fn: async () => await fn(), - resolve: (value) => resolve(value as T), - reject, - timeoutMs: hasTimeout ? timeoutMs : undefined, - staleMs, - }; - - queue.pending.push(task); - void drainSessionStoreLockQueue(storePath); - }); - - return await promise; -} - export async function updateSessionStoreEntry(params: { storePath: string; sessionKey: string; update: (entry: SessionEntry) => Promise | null>; }): Promise { const { storePath, sessionKey, update } = params; - return await withSessionStoreLock(storePath, async () => { - const store = loadSessionStore(storePath, { skipCache: true, clone: false }); + return await runExclusiveSessionStoreWrite(storePath, async () => { + const store = loadMutableSessionStoreForWriter(storePath); const resolved = resolveSessionStoreEntry({ store, sessionKey }); const existing = resolved.existing; if (!existing) { @@ -756,8 +611,8 @@ export async function updateLastRoute(params: { }): Promise { const { storePath, sessionKey, channel, to, accountId, threadId, ctx } = params; const createIfMissing = params.createIfMissing ?? true; - return await withSessionStoreLock(storePath, async () => { - const store = loadSessionStore(storePath); + return await runExclusiveSessionStoreWrite(storePath, async () => { + const store = loadMutableSessionStoreForWriter(storePath); const resolved = resolveSessionStoreEntry({ store, sessionKey }); const existing = resolved.existing; if (!existing && !createIfMissing) { diff --git a/src/cron/isolated-agent/session.ts b/src/cron/isolated-agent/session.ts index fc59ef9199b..5596f17a34c 100644 --- a/src/cron/isolated-agent/session.ts +++ b/src/cron/isolated-agent/session.ts @@ -106,12 +106,13 @@ export function resolveCronSession(params: { nowMs: number; agentId: string; forceNew?: boolean; + store?: Record; }) { const sessionCfg = params.cfg.session; const storePath = resolveStorePath(sessionCfg?.store, { agentId: params.agentId, }); - const store = loadSessionStore(storePath); + const store = params.store ?? loadSessionStore(storePath); const entry = store[params.sessionKey]; // Check if we can reuse an existing session diff --git a/src/extensionAPI.ts b/src/extensionAPI.ts index 46da1b0284c..9e354998fa6 100644 --- a/src/extensionAPI.ts +++ b/src/extensionAPI.ts @@ -31,5 +31,7 @@ export { resolveStorePath, loadSessionStore, saveSessionStore, + updateSessionStore, + updateSessionStoreEntry, resolveSessionFilePath, } from "./config/sessions.js"; diff --git a/src/gateway/server.sessions.reset-hooks.test.ts b/src/gateway/server.sessions.reset-hooks.test.ts index 3ec77cefd8a..b61311b1a6c 100644 --- a/src/gateway/server.sessions.reset-hooks.test.ts +++ b/src/gateway/server.sessions.reset-hooks.test.ts @@ -214,7 +214,7 @@ test("sessions.reset returns unavailable when active run does not stop", async ( expect(filesAfterResetAttempt.some((f) => f.startsWith("sess-main.jsonl.reset."))).toBe(false); }); -test("sessions.reset emits before_reset for the entry actually reset under the store lock", async () => { +test("sessions.reset emits before_reset for the entry actually reset in the writer slot", async () => { const { dir } = await createSessionStoreDir(); const oldTranscriptPath = path.join(dir, "sess-old.jsonl"); const newTranscriptPath = path.join(dir, "sess-new.jsonl"); @@ -251,7 +251,7 @@ test("sessions.reset emits before_reset for the entry actually reset under the s const [ { getRuntimeConfig }, { resolveGatewaySessionStoreTarget }, - { withSessionStoreLockForTest }, + { withSessionStoreWriterForTest }, ] = await Promise.all([ import("../config/config.js"), import("./session-utils.js"), @@ -266,7 +266,7 @@ test("sessions.reset emits before_reset for the entry actually reset under the s | ReturnType<(typeof import("./session-reset-service.js"))["performGatewaySessionReset"]> | undefined; const { performGatewaySessionReset } = await import("./session-reset-service.js"); - await withSessionStoreLockForTest(gatewayStorePath, async () => { + await withSessionStoreWriterForTest(gatewayStorePath, async () => { pendingReset = performGatewaySessionReset({ key: "main", reason: "new", diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 3809d2ae4b4..3a1c97b3e4f 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -54,11 +54,7 @@ import { } from "../config/sessions/main-session.js"; import { resolveStorePath } from "../config/sessions/paths.js"; import { loadSessionStore } from "../config/sessions/store-load.js"; -import { - archiveRemovedSessionTranscripts, - saveSessionStore, - updateSessionStore, -} from "../config/sessions/store.js"; +import { archiveRemovedSessionTranscripts, updateSessionStore } from "../config/sessions/store.js"; import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import { hasActiveCronJobs } from "../cron/active-jobs.js"; @@ -1179,40 +1175,46 @@ export async function runHeartbeatOnce(opts: { configuredSessionKey: configuredSession.sessionKey, sessionEntry: entry, }); - const cronSession = resolveCronSession({ - cfg, - sessionKey: isolatedSessionKey, - agentId, - nowMs: startedAt, - forceNew: true, - }); + const isolatedStorePath = resolveStorePath(cfg.session?.store, { agentId }); const staleIsolatedSessionKey = resolveStaleHeartbeatIsolatedSessionKey({ sessionKey, isolatedSessionKey, isolatedBaseSessionKey, }); const removedSessionFiles = new Map(); - if (staleIsolatedSessionKey) { - const staleEntry = cronSession.store[staleIsolatedSessionKey]; - if (staleEntry?.sessionId) { - removedSessionFiles.set(staleEntry.sessionId, staleEntry.sessionFile); + let referencedSessionIds = new Set(); + await updateSessionStore(isolatedStorePath, (store) => { + const cronSession = resolveCronSession({ + cfg, + sessionKey: isolatedSessionKey, + agentId, + nowMs: startedAt, + forceNew: true, + store, + }); + if (staleIsolatedSessionKey) { + const staleEntry = store[staleIsolatedSessionKey]; + if (staleEntry?.sessionId) { + removedSessionFiles.set(staleEntry.sessionId, staleEntry.sessionFile); + } + delete store[staleIsolatedSessionKey]; } - delete cronSession.store[staleIsolatedSessionKey]; - } - cronSession.sessionEntry.heartbeatIsolatedBaseSessionKey = isolatedBaseSessionKey; - cronSession.store[isolatedSessionKey] = cronSession.sessionEntry; - await saveSessionStore(cronSession.storePath, cronSession.store); + store[isolatedSessionKey] = { + ...cronSession.sessionEntry, + heartbeatIsolatedBaseSessionKey: isolatedBaseSessionKey, + }; + referencedSessionIds = new Set( + Object.values(store) + .map((sessionEntry) => sessionEntry?.sessionId) + .filter((sessionId): sessionId is string => Boolean(sessionId)), + ); + }); if (removedSessionFiles.size > 0) { try { - const referencedSessionIds = new Set( - Object.values(cronSession.store) - .map((sessionEntry) => sessionEntry?.sessionId) - .filter((sessionId): sessionId is string => Boolean(sessionId)), - ); await archiveRemovedSessionTranscripts({ removedSessionFiles, referencedSessionIds, - storePath: cronSession.storePath, + storePath: isolatedStorePath, reason: "deleted", restrictToStoreDir: true, }); @@ -1242,29 +1244,30 @@ export async function runHeartbeatOnce(opts: { if (!preflight.tasks || preflight.tasks.length === 0) { return; } + const tasks = preflight.tasks; - const store = loadSessionStore(storePath); - const current = store[sessionKey]; - // Initialize stub entry on first run when current doesn't exist - const base = current ?? { - // Generate valid sessionId - derive from sessionKey without colons - sessionId: sessionKey.replace(/:/g, "_"), - updatedAt: startedAt, - createdAt: startedAt, - messageCount: 0, - lastMessageAt: startedAt, - heartbeatTaskState: {}, - }; - const taskState = { ...base.heartbeatTaskState }; + await updateSessionStore(storePath, (store) => { + const current = store[sessionKey]; + // Initialize stub entry on first run when current doesn't exist. + const base = current ?? { + // Generate valid sessionId - derive from sessionKey without colons. + sessionId: sessionKey.replace(/:/g, "_"), + updatedAt: startedAt, + createdAt: startedAt, + messageCount: 0, + lastMessageAt: startedAt, + heartbeatTaskState: {}, + }; + const taskState = { ...base.heartbeatTaskState }; - for (const task of preflight.tasks) { - if (isTaskDue(taskState[task.name], task.interval, startedAt)) { - taskState[task.name] = startedAt; + for (const task of tasks) { + if (isTaskDue(taskState[task.name], task.interval, startedAt)) { + taskState[task.name] = startedAt; + } } - } - store[sessionKey] = { ...base, heartbeatTaskState: taskState }; - await saveSessionStore(storePath, store); + store[sessionKey] = { ...base, heartbeatTaskState: taskState }; + }); }; const consumeInspectedSystemEvents = () => { @@ -1658,16 +1661,17 @@ export async function runHeartbeatOnce(opts: { // Record last delivered heartbeat payload for dedupe. if (!shouldSkipMain && normalized.text.trim()) { - const store = loadSessionStore(storePath); - const current = store[sessionKey]; - if (current) { + await updateSessionStore(storePath, (store) => { + const current = store[sessionKey]; + if (!current) { + return; + } store[sessionKey] = { ...current, lastHeartbeatText: normalized.text, lastHeartbeatSentAt: startedAt, }; - await saveSessionStore(storePath, store); - } + }); } emitHeartbeatEvent({ diff --git a/src/plugin-sdk/config-runtime.ts b/src/plugin-sdk/config-runtime.ts index 8fe5fc4138f..139da1a7bcb 100644 --- a/src/plugin-sdk/config-runtime.ts +++ b/src/plugin-sdk/config-runtime.ts @@ -135,6 +135,7 @@ export { saveSessionStore, updateLastRoute, updateSessionStore, + updateSessionStoreEntry, resolveSessionStoreEntry, } from "../config/sessions/store.js"; export { resolveSessionKey } from "../config/sessions/session-key.js"; diff --git a/src/plugin-sdk/session-store-runtime.ts b/src/plugin-sdk/session-store-runtime.ts index cae9de10352..3370552e5d9 100644 --- a/src/plugin-sdk/session-store-runtime.ts +++ b/src/plugin-sdk/session-store-runtime.ts @@ -14,6 +14,7 @@ export { saveSessionStore, updateLastRoute, updateSessionStore, + updateSessionStoreEntry, } from "../config/sessions/store.js"; export { evaluateSessionFreshness, diff --git a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts index 95cfda24494..85b1e89c240 100644 --- a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts +++ b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts @@ -353,6 +353,16 @@ export function createPluginRuntimeMock(overrides: DeepPartial = .mockResolvedValue( undefined, ) as unknown as PluginRuntime["agent"]["session"]["saveSessionStore"], + updateSessionStore: vi + .fn() + .mockResolvedValue( + undefined, + ) as unknown as PluginRuntime["agent"]["session"]["updateSessionStore"], + updateSessionStoreEntry: vi + .fn() + .mockResolvedValue( + null, + ) as unknown as PluginRuntime["agent"]["session"]["updateSessionStoreEntry"], resolveSessionFilePath: vi.fn( (sessionId: string) => `/tmp/${sessionId}.json`, ) as unknown as PluginRuntime["agent"]["session"]["resolveSessionFilePath"], diff --git a/src/plugins/runtime/index.test.ts b/src/plugins/runtime/index.test.ts index b65cb068c64..fc751fc1323 100644 --- a/src/plugins/runtime/index.test.ts +++ b/src/plugins/runtime/index.test.ts @@ -274,6 +274,8 @@ describe("plugin runtime command execution", () => { "resolveAgentDir", ]); expectFunctionKeys(runtime.agent.session as Record, [ + "updateSessionStore", + "updateSessionStoreEntry", "resolveSessionFilePath", ]); }, diff --git a/src/plugins/runtime/runtime-agent.ts b/src/plugins/runtime/runtime-agent.ts index 9a1f1a44fc1..fbf097f2262 100644 --- a/src/plugins/runtime/runtime-agent.ts +++ b/src/plugins/runtime/runtime-agent.ts @@ -10,7 +10,12 @@ import { ensureAgentWorkspace } from "../../agents/workspace.js"; import { normalizeThinkLevel, resolveThinkingProfile } from "../../auto-reply/thinking.js"; import { getRuntimeConfig } from "../../config/config.js"; import { resolveSessionFilePath, resolveStorePath } from "../../config/sessions/paths.js"; -import { loadSessionStore, saveSessionStore } from "../../config/sessions/store.js"; +import { + loadSessionStore, + saveSessionStore, + updateSessionStore, + updateSessionStoreEntry, +} from "../../config/sessions/store.js"; import { createLazyRuntimeMethod, createLazyRuntimeModule } from "../../shared/lazy-runtime.js"; import { defineCachedValue } from "./runtime-cache.js"; import type { PluginRuntime } from "./types.js"; @@ -68,6 +73,8 @@ export function createRuntimeAgent(): PluginRuntime["agent"] { resolveStorePath, loadSessionStore, saveSessionStore, + updateSessionStore, + updateSessionStoreEntry, resolveSessionFilePath, })); diff --git a/src/plugins/runtime/types-core.ts b/src/plugins/runtime/types-core.ts index 2d76d7bed7b..bd088ca44ab 100644 --- a/src/plugins/runtime/types-core.ts +++ b/src/plugins/runtime/types-core.ts @@ -147,6 +147,8 @@ export type PluginRuntimeCore = { resolveStorePath: typeof import("../../config/sessions/paths.js").resolveStorePath; loadSessionStore: typeof import("../../config/sessions/store-load.js").loadSessionStore; saveSessionStore: import("../../config/sessions/runtime-types.js").SaveSessionStore; + updateSessionStore: typeof import("../../config/sessions/store.js").updateSessionStore; + updateSessionStoreEntry: typeof import("../../config/sessions/store.js").updateSessionStoreEntry; resolveSessionFilePath: typeof import("../../config/sessions/paths.js").resolveSessionFilePath; }; }; diff --git a/src/realtime-voice/agent-consult-runtime.test.ts b/src/realtime-voice/agent-consult-runtime.test.ts index 2a24ba02df2..568970e38ab 100644 --- a/src/realtime-voice/agent-consult-runtime.test.ts +++ b/src/realtime-voice/agent-consult-runtime.test.ts @@ -12,6 +12,14 @@ function createAgentRuntime(payloads: unknown[] = [{ text: "Speak this." }]) { payloads, meta: {}, })); + const updateSessionStore = vi.fn( + async ( + _storePath: string, + mutator: (store: Record) => unknown, + ) => { + return await mutator(sessionStore); + }, + ); return { runtime: { resolveAgentDir: vi.fn(() => "/tmp/agent"), @@ -22,6 +30,7 @@ function createAgentRuntime(payloads: unknown[] = [{ text: "Speak this." }]) { resolveStorePath: vi.fn(() => "/tmp/sessions.json"), loadSessionStore: vi.fn(() => sessionStore), saveSessionStore: vi.fn(async () => {}), + updateSessionStore, resolveSessionFilePath: vi.fn(() => "/tmp/session.json"), }, runEmbeddedPiAgent, diff --git a/src/realtime-voice/agent-consult-runtime.ts b/src/realtime-voice/agent-consult-runtime.ts index e256c80d896..df2a57bef75 100644 --- a/src/realtime-voice/agent-consult-runtime.ts +++ b/src/realtime-voice/agent-consult-runtime.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import type { RunEmbeddedPiAgentParams } from "../agents/pi-embedded-runner/run/params.js"; +import type { SessionEntry } from "../config/sessions/types.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; import type { RuntimeLogger, PluginRuntimeCore } from "../plugins/runtime/types-core.js"; import { @@ -54,20 +55,19 @@ export async function consultRealtimeVoiceAgent(params: { const storePath = params.agentRuntime.session.resolveStorePath(params.cfg.session?.store, { agentId, }); - const sessionStore = params.agentRuntime.session.loadSessionStore(storePath); const now = Date.now(); - const existing = sessionStore[params.sessionKey] as - | { sessionId?: string; updatedAt?: number } - | undefined; - const sessionId = existing?.sessionId?.trim() || randomUUID(); - sessionStore[params.sessionKey] = { ...existing, sessionId, updatedAt: now }; - await params.agentRuntime.session.saveSessionStore(storePath, sessionStore); + const sessionEntry = await params.agentRuntime.session.updateSessionStore(storePath, (store) => { + const existing = store[params.sessionKey] as SessionEntry | undefined; + const sessionId = existing?.sessionId?.trim() || randomUUID(); + const next: SessionEntry = { ...existing, sessionId, updatedAt: now }; + store[params.sessionKey] = next; + return next; + }); + const sessionId = sessionEntry.sessionId; - const sessionFile = params.agentRuntime.session.resolveSessionFilePath( - sessionId, - sessionStore[params.sessionKey], - { agentId }, - ); + const sessionFile = params.agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, { + agentId, + }); const result = await params.agentRuntime.runEmbeddedPiAgent({ sessionId, sessionKey: params.sessionKey, diff --git a/src/test-utils/session-state-cleanup.test.ts b/src/test-utils/session-state-cleanup.test.ts index 4c22e128a14..79cfd4de925 100644 --- a/src/test-utils/session-state-cleanup.test.ts +++ b/src/test-utils/session-state-cleanup.test.ts @@ -5,10 +5,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { resetSessionWriteLockStateForTest } from "../agents/session-write-lock.js"; import { clearSessionStoreCacheForTest, - getSessionStoreLockQueueSizeForTest, - resetSessionStoreLockRuntimeForTests, - setSessionWriteLockAcquirerForTests, - withSessionStoreLockForTest, + getSessionStoreWriterQueueSizeForTest, + withSessionStoreWriterForTest, } from "../config/sessions/store.js"; import { resetFileLockStateForTest } from "../infra/file-lock.js"; import { @@ -17,11 +15,8 @@ import { setSessionStateCleanupRuntimeForTests, } from "./session-state-cleanup.js"; -const acquireSessionWriteLockMock = vi.hoisted(() => - vi.fn(async () => ({ release: vi.fn(async () => {}) })), -); const drainFileLockStateMock = vi.hoisted(() => vi.fn(async () => undefined)); -const drainSessionStoreLockQueuesMock = vi.hoisted(() => vi.fn(async () => undefined)); +const drainSessionStoreWriterQueuesMock = vi.hoisted(() => vi.fn(async () => undefined)); const drainSessionWriteLockStateMock = vi.hoisted(() => vi.fn(async () => undefined)); function createDeferred() { @@ -46,14 +41,12 @@ describe("cleanupSessionStateForTest", () => { clearSessionStoreCacheForTest(); resetFileLockStateForTest(); resetSessionWriteLockStateForTest(); - acquireSessionWriteLockMock.mockClear(); drainFileLockStateMock.mockClear(); - drainSessionStoreLockQueuesMock.mockClear(); + drainSessionStoreWriterQueuesMock.mockClear(); drainSessionWriteLockStateMock.mockClear(); - setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock); setSessionStateCleanupRuntimeForTests({ drainFileLockStateForTest: drainFileLockStateMock, - drainSessionStoreLockQueuesForTest: drainSessionStoreLockQueuesMock, + drainSessionStoreWriterQueuesForTest: drainSessionStoreWriterQueuesMock, drainSessionWriteLockStateForTest: drainSessionWriteLockStateMock, }); }); @@ -63,19 +56,18 @@ describe("cleanupSessionStateForTest", () => { clearSessionStoreCacheForTest(); resetFileLockStateForTest(); resetSessionWriteLockStateForTest(); - resetSessionStoreLockRuntimeForTests(); resetSessionStateCleanupRuntimeForTests(); vi.restoreAllMocks(); }); - it("waits for in-flight session store locks before clearing test state", async () => { + it("waits for in-flight session store writer queues before clearing test state", async () => { const fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-cleanup-")); const storePath = path.join(fixtureRoot, "openclaw-sessions.json"); const started = createDeferred(); const release = createDeferred(); const drainRequested = createDeferred(); let finishDrain: () => void = () => undefined; - drainSessionStoreLockQueuesMock.mockImplementationOnce(async () => { + drainSessionStoreWriterQueuesMock.mockImplementationOnce(async () => { drainRequested.resolve(); await new Promise((resolve) => { finishDrain = resolve; @@ -83,13 +75,13 @@ describe("cleanupSessionStateForTest", () => { }); let running: Promise | undefined; try { - running = withSessionStoreLockForTest(storePath, async () => { + running = withSessionStoreWriterForTest(storePath, async () => { started.resolve(); await release.promise; }); await started.promise; - expect(getSessionStoreLockQueueSizeForTest()).toBe(1); + expect(getSessionStoreWriterQueueSizeForTest()).toBe(1); let settled = false; const cleanupPromise = cleanupSessionStateForTest().then(() => { @@ -99,7 +91,7 @@ describe("cleanupSessionStateForTest", () => { await drainRequested.promise; await flushMicrotasks(); expect(settled).toBe(false); - expect(drainSessionStoreLockQueuesMock).toHaveBeenCalledTimes(1); + expect(drainSessionStoreWriterQueuesMock).toHaveBeenCalledTimes(1); expect(drainFileLockStateMock).not.toHaveBeenCalled(); expect(drainSessionWriteLockStateMock).not.toHaveBeenCalled(); @@ -108,7 +100,7 @@ describe("cleanupSessionStateForTest", () => { finishDrain(); await cleanupPromise; - expect(getSessionStoreLockQueueSizeForTest()).toBe(0); + expect(getSessionStoreWriterQueueSizeForTest()).toBe(0); expect(drainFileLockStateMock).toHaveBeenCalledTimes(1); expect(drainSessionWriteLockStateMock).toHaveBeenCalledTimes(1); } finally { diff --git a/src/test-utils/session-state-cleanup.ts b/src/test-utils/session-state-cleanup.ts index dcf188f62ba..fba5a7418e1 100644 --- a/src/test-utils/session-state-cleanup.ts +++ b/src/test-utils/session-state-cleanup.ts @@ -1,22 +1,23 @@ import { drainSessionWriteLockStateForTest } from "../agents/session-write-lock.js"; import { clearSessionStoreCaches } from "../config/sessions/store-cache.js"; -import { drainSessionStoreLockQueuesForTest } from "../config/sessions/store-lock-state.js"; +import { drainSessionStoreWriterQueuesForTest } from "../config/sessions/store-writer-state.js"; import { drainFileLockStateForTest } from "../infra/file-lock.js"; let fileLockDrainerForTests: typeof drainFileLockStateForTest | null = null; -let sessionStoreLockQueueDrainerForTests: typeof drainSessionStoreLockQueuesForTest | null = null; +let sessionStoreWriterQueueDrainerForTests: typeof drainSessionStoreWriterQueuesForTest | null = + null; let sessionWriteLockDrainerForTests: typeof drainSessionWriteLockStateForTest | null = null; export function setSessionStateCleanupRuntimeForTests(params: { drainFileLockStateForTest?: typeof drainFileLockStateForTest | null; - drainSessionStoreLockQueuesForTest?: typeof drainSessionStoreLockQueuesForTest | null; + drainSessionStoreWriterQueuesForTest?: typeof drainSessionStoreWriterQueuesForTest | null; drainSessionWriteLockStateForTest?: typeof drainSessionWriteLockStateForTest | null; }): void { if ("drainFileLockStateForTest" in params) { fileLockDrainerForTests = params.drainFileLockStateForTest ?? null; } - if ("drainSessionStoreLockQueuesForTest" in params) { - sessionStoreLockQueueDrainerForTests = params.drainSessionStoreLockQueuesForTest ?? null; + if ("drainSessionStoreWriterQueuesForTest" in params) { + sessionStoreWriterQueueDrainerForTests = params.drainSessionStoreWriterQueuesForTest ?? null; } if ("drainSessionWriteLockStateForTest" in params) { sessionWriteLockDrainerForTests = params.drainSessionWriteLockStateForTest ?? null; @@ -25,12 +26,12 @@ export function setSessionStateCleanupRuntimeForTests(params: { export function resetSessionStateCleanupRuntimeForTests(): void { fileLockDrainerForTests = null; - sessionStoreLockQueueDrainerForTests = null; + sessionStoreWriterQueueDrainerForTests = null; sessionWriteLockDrainerForTests = null; } export async function cleanupSessionStateForTest(): Promise { - await (sessionStoreLockQueueDrainerForTests ?? drainSessionStoreLockQueuesForTest)(); + await (sessionStoreWriterQueueDrainerForTests ?? drainSessionStoreWriterQueuesForTest)(); clearSessionStoreCaches(); await (fileLockDrainerForTests ?? drainFileLockStateForTest)(); await (sessionWriteLockDrainerForTests ?? drainSessionWriteLockStateForTest)(); diff --git a/test/setup-openclaw-runtime.ts b/test/setup-openclaw-runtime.ts index 5b674f967a3..393c105ca5b 100644 --- a/test/setup-openclaw-runtime.ts +++ b/test/setup-openclaw-runtime.ts @@ -25,7 +25,7 @@ type WorkerPluginRuntimeHelpers = { type WorkerCleanupHelpers = { clearSessionStoreCaches: typeof import("../src/config/sessions/store-cache.js").clearSessionStoreCaches; drainFileLockStateForTest: typeof import("../src/infra/file-lock.js").drainFileLockStateForTest; - drainSessionStoreLockQueuesForTest: typeof import("../src/config/sessions/store-lock-state.js").drainSessionStoreLockQueuesForTest; + drainSessionStoreWriterQueuesForTest: typeof import("../src/config/sessions/store-writer-state.js").drainSessionStoreWriterQueuesForTest; drainSessionWriteLockStateForTest: typeof import("../src/agents/session-write-lock.js").drainSessionWriteLockStateForTest; resetContextWindowCacheForTest: typeof import("../src/agents/context-runtime-state.js").resetContextWindowCacheForTest; resetFileLockStateForTest: typeof import("../src/infra/file-lock.js").resetFileLockStateForTest; @@ -80,8 +80,8 @@ function loadWorkerCleanupHelpers(): Promise { vi.importActual( "../src/config/sessions/store-cache.js", ), - vi.importActual( - "../src/config/sessions/store-lock-state.js", + vi.importActual( + "../src/config/sessions/store-writer-state.js", ), vi.importActual("../src/infra/file-lock.js"), ]).then( @@ -90,12 +90,13 @@ function loadWorkerCleanupHelpers(): Promise { modelsConfigState, sessionWriteLock, sessionStoreCache, - sessionStoreLockState, + sessionStoreWriterState, fileLock, ]) => ({ clearSessionStoreCaches: sessionStoreCache.clearSessionStoreCaches, drainFileLockStateForTest: fileLock.drainFileLockStateForTest, - drainSessionStoreLockQueuesForTest: sessionStoreLockState.drainSessionStoreLockQueuesForTest, + drainSessionStoreWriterQueuesForTest: + sessionStoreWriterState.drainSessionStoreWriterQueuesForTest, drainSessionWriteLockStateForTest: sessionWriteLock.drainSessionWriteLockStateForTest, resetContextWindowCacheForTest: contextRuntimeState.resetContextWindowCacheForTest, resetFileLockStateForTest: fileLock.resetFileLockStateForTest, @@ -393,14 +394,14 @@ afterEach(async () => { const { clearSessionStoreCaches, drainFileLockStateForTest, - drainSessionStoreLockQueuesForTest, + drainSessionStoreWriterQueuesForTest, drainSessionWriteLockStateForTest, resetContextWindowCacheForTest, resetFileLockStateForTest, resetModelsJsonReadyCacheForTest, resetSessionWriteLockStateForTest, } = await loadWorkerCleanupHelpers(); - await drainSessionStoreLockQueuesForTest(); + await drainSessionStoreWriterQueuesForTest(); clearSessionStoreCaches(); await drainFileLockStateForTest(); await drainSessionWriteLockStateForTest();