perf: route session store writes through writer queue

This commit is contained in:
Peter Steinberger
2026-05-02 12:33:03 +01:00
parent ffc79532b8
commit b4437047f4
32 changed files with 497 additions and 395 deletions

View File

@@ -38,6 +38,7 @@ Docs: https://docs.openclaw.ai
- Models CLI: restore `openclaw models list --provider <id>` catalog and registry fallback rows for unconfigured providers, so provider-specific verification commands no longer report "No models found." Fixes #75517; supersedes #75615. Thanks @lotsoftick and @koshaji.
- Gateway/macOS: write LaunchAgent services with a canonical system PATH and stop preserving old plist PATH entries, so Volta, asdf, fnm, and pnpm shell paths no longer affect gateway child-process Node resolution. Fixes #75233; supersedes #75246. Thanks @nphyde2.
- Slack/hooks: preserve bot alert attachment text in message-received hook content when command text is blank. Fixes #76035; refs #76036. Thanks @amsminn.
- Sessions: route Gateway session-store writes through a dedicated in-process writer and borrow the validated mutable cache during the writer slot, avoiding runtime file locks plus repeated `sessions.json` rereads and JSON clones on hot metadata updates. Refs #68554. Thanks @henkterharmsel.
- Control UI/chat: show inline feedback when local slash-command dispatch is unavailable or fails unexpectedly instead of clearing the composer silently. Fixes #52105. Thanks @MooreQiao.
- Memory/markdown: replace CRLF managed blocks in place and collapse duplicate marker blocks without rewriting unmanaged markdown, so Dreaming and Memory Wiki files self-heal from repeated generated sections. Fixes #75491; supersedes #75495, #75810, and #76008. Thanks @asaenokkostya-coder, @ottodeng, @everettjf, and @lrg913427-dot.
- Agents/tools: return critical tool-loop circuit-breaker stops as blocked tool results instead of thrown tool failures, so models see the guardrail and stop retrying the same call. Thanks @rayraiser.

View File

@@ -114,11 +114,16 @@ Provider and channel execution paths must use the active runtime config snapshot
```typescript
const storePath = api.runtime.agent.session.resolveStorePath(cfg);
const store = api.runtime.agent.session.loadSessionStore(cfg);
await api.runtime.agent.session.saveSessionStore(cfg, store);
const store = api.runtime.agent.session.loadSessionStore(storePath);
await api.runtime.agent.session.updateSessionStore(storePath, (nextStore) => {
// Patch one entry without replacing the whole file from stale state.
nextStore[sessionKey] = { ...nextStore[sessionKey], thinkingLevel: "high" };
});
const filePath = api.runtime.agent.session.resolveSessionFilePath(cfg, sessionId);
```
Prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)` for runtime writes. They route through the Gateway-owned session-store writer, preserve concurrent updates, and reuse the hot cache. `saveSessionStore(...)` remains available for compatibility and offline maintenance-style rewrites.
</Accordion>
<Accordion title="api.runtime.agent.defaults">
Default model and provider constants:

View File

@@ -85,7 +85,7 @@ Session persistence has automatic maintenance controls (`session.maintenance`) f
- `maxDiskBytes`: optional sessions-directory budget
- `highWaterBytes`: optional target after cleanup (default `80%` of `maxDiskBytes`)
Normal Gateway writes batch `maxEntries` cleanup for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately.
Normal Gateway writes flow through a per-store session writer that serializes in-process mutations without taking a runtime file lock. Hot-path patch helpers borrow the validated mutable cache while they hold that writer slot, so large `sessions.json` files are not cloned or reread for every metadata update. Runtime code should prefer `updateSessionStore(...)` or `updateSessionStoreEntry(...)`; direct whole-store saves are compatibility and offline-maintenance tools. `maxEntries` cleanup is still batched for production-sized caps, so a store may briefly exceed the configured cap before the next high-water cleanup rewrites it back down. Session store reads do not prune or cap entries during Gateway startup; use writes or `openclaw sessions cleanup --enforce` for cleanup. `openclaw sessions cleanup --enforce` still applies the configured cap immediately.
Maintenance keeps durable external conversation pointers such as group sessions
and thread-scoped chat sessions, but synthetic runtime entries for cron, hooks,

View File

@@ -6,6 +6,14 @@ import { generateVoiceResponse } from "./response-generator.js";
function createAgentRuntime(payloads: Array<Record<string, unknown>>) {
const sessionStore: Record<string, { sessionId: string; updatedAt: number }> = {};
const saveSessionStore = vi.fn(async () => {});
const updateSessionStore = vi.fn(
async (
_storePath: string,
mutator: (store: Record<string, { sessionId: string; updatedAt: number }>) => unknown,
) => {
return await mutator(sessionStore);
},
);
const runEmbeddedPiAgent = vi.fn(async () => ({
payloads,
meta: { durationMs: 12, aborted: false },
@@ -44,6 +52,7 @@ function createAgentRuntime(payloads: Array<Record<string, unknown>>) {
resolveStorePath,
loadSessionStore: () => sessionStore,
saveSessionStore,
updateSessionStore,
resolveSessionFilePath,
},
} as unknown as CoreAgentDeps;
@@ -52,6 +61,7 @@ function createAgentRuntime(payloads: Array<Record<string, unknown>>) {
runtime,
runEmbeddedPiAgent,
saveSessionStore,
updateSessionStore,
sessionStore,
resolveAgentDir,
resolveAgentWorkspaceDir,
@@ -157,7 +167,7 @@ describe("generateVoiceResponse", () => {
});
it("pins the voice session to responseModel before running the embedded agent", async () => {
const { runtime, runEmbeddedPiAgent, saveSessionStore, sessionStore } = createAgentRuntime([
const { runtime, runEmbeddedPiAgent, updateSessionStore, sessionStore } = createAgentRuntime([
{ text: '{"spoken":"Pinned model works."}' },
]);
const voiceConfig = VoiceCallConfigSchema.parse({
@@ -181,7 +191,10 @@ describe("generateVoiceResponse", () => {
modelOverride: "gpt-4.1-nano",
modelOverrideSource: "auto",
});
expect(saveSessionStore).toHaveBeenCalledWith("/tmp/openclaw/main/sessions.json", sessionStore);
expect(updateSessionStore).toHaveBeenCalledWith(
"/tmp/openclaw/main/sessions.json",
expect.any(Function),
);
expect(runEmbeddedPiAgent).toHaveBeenCalledWith(
expect.objectContaining({
provider: "openai",

View File

@@ -224,34 +224,34 @@ export async function generateVoiceResponse(
// Load or create session entry
const sessionStore = agentRuntime.session.loadSessionStore(storePath);
const now = Date.now();
let sessionEntry = sessionStore[resolvedSessionKey] as SessionEntry | undefined;
let sessionEntryUpdated = false;
if (!sessionEntry) {
sessionEntry = {
sessionId: crypto.randomUUID(),
updatedAt: now,
};
sessionStore[resolvedSessionKey] = sessionEntry;
sessionEntryUpdated = true;
}
const sessionId = sessionEntry.sessionId;
const existingSessionEntry = sessionStore[resolvedSessionKey] as SessionEntry | undefined;
// Resolve model from config
const { provider, model } = resolveVoiceResponseModel({ voiceConfig, agentRuntime });
if (voiceConfig.responseModel) {
sessionEntryUpdated =
applyModelOverrideToSessionEntry({
entry: sessionEntry,
selection: { provider, model },
selectionSource: "auto",
}).updated || sessionEntryUpdated;
}
if (sessionEntryUpdated) {
await agentRuntime.session.saveSessionStore(storePath, sessionStore);
let sessionEntry = existingSessionEntry;
if (!sessionEntry?.sessionId || voiceConfig.responseModel) {
sessionEntry = await agentRuntime.session.updateSessionStore(storePath, (store) => {
let entry = store[resolvedSessionKey] as SessionEntry | undefined;
if (!entry?.sessionId) {
entry = {
...entry,
sessionId: crypto.randomUUID(),
updatedAt: now,
};
store[resolvedSessionKey] = entry;
}
if (voiceConfig.responseModel) {
applyModelOverrideToSessionEntry({
entry,
selection: { provider, model },
selectionSource: "auto",
});
}
return entry;
});
}
const sessionId = sessionEntry.sessionId;
const sessionFile = agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, {
agentId,

View File

@@ -337,6 +337,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
resolveStorePath: vi.fn(() => "/tmp/sessions.json"),
loadSessionStore: vi.fn(() => sessionStore),
saveSessionStore: vi.fn(async () => {}),
updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)),
resolveSessionFilePath: vi.fn(() => "/tmp/session.json"),
},
runEmbeddedPiAgent,
@@ -421,6 +422,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
resolveStorePath: vi.fn(() => "/tmp/sessions.json"),
loadSessionStore: vi.fn(() => sessionStore),
saveSessionStore: vi.fn(async () => {}),
updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)),
resolveSessionFilePath: vi.fn(() => "/tmp/session.json"),
},
runEmbeddedPiAgent,
@@ -483,6 +485,7 @@ describe("createVoiceCallRuntime lifecycle", () => {
resolveStorePath: vi.fn(() => "/tmp/sessions.json"),
loadSessionStore: vi.fn(() => sessionStore),
saveSessionStore: vi.fn(async () => {}),
updateSessionStore: vi.fn(async (_storePath, mutator) => mutator(sessionStore as never)),
resolveSessionFilePath: vi.fn(() => "/tmp/session.json"),
},
runEmbeddedPiAgent,

View File

@@ -1,12 +1,8 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
drainSessionStoreLockQueuesForTest,
resetSessionStoreLockRuntimeForTests,
setSessionWriteLockAcquirerForTests,
} from "../config/sessions.js";
import { afterEach, describe, expect, it, vi } from "vitest";
import { drainSessionStoreWriterQueuesForTest } from "../config/sessions.js";
import {
readCompactionCount,
seedSessionStore,
@@ -57,15 +53,8 @@ function createCompactionContext(params: {
} as unknown as EmbeddedPiSubscribeContext;
}
beforeEach(() => {
setSessionWriteLockAcquirerForTests(async () => ({
release: async () => {},
}));
});
afterEach(async () => {
resetSessionStoreLockRuntimeForTests();
await drainSessionStoreLockQueuesForTest();
await drainSessionStoreWriterQueuesForTest();
});
describe("reconcileSessionStoreCompactionCountAfterSuccess", () => {

View File

@@ -5,7 +5,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite
import "./subagent-registry.mocks.shared.js";
import {
clearSessionStoreCacheForTest,
drainSessionStoreLockQueuesForTest,
drainSessionStoreWriterQueuesForTest,
} from "../config/sessions/store.js";
import { captureEnv } from "../test-utils/env.js";
import {
@@ -131,7 +131,7 @@ describe("subagent registry persistence resume", () => {
announceSpy.mockClear();
mod.__testing.setDepsForTest();
mod.resetSubagentRegistryForTests({ persist: false });
await drainSessionStoreLockQueuesForTest();
await drainSessionStoreWriterQueuesForTest();
clearSessionStoreCacheForTest();
if (tempStateDir) {
await fs.rm(tempStateDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });

View File

@@ -6,7 +6,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import "./subagent-registry.mocks.shared.js";
import {
clearSessionStoreCacheForTest,
drainSessionStoreLockQueuesForTest,
drainSessionStoreWriterQueuesForTest,
} from "../config/sessions/store.js";
import { callGateway } from "../gateway/call.js";
import { onAgentEvent } from "../infra/agent-events.js";
@@ -207,7 +207,7 @@ describe("subagent registry persistence", () => {
announceSpy.mockClear();
__testing.setDepsForTest();
resetSubagentRegistryForTests({ persist: false });
await drainSessionStoreLockQueuesForTest();
await drainSessionStoreWriterQueuesForTest();
clearSessionStoreCacheForTest();
if (tempStateDir) {
await fs.rm(tempStateDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });

View File

@@ -1,7 +1,8 @@
import fsSync from "node:fs";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, describe, expect, it } from "vitest";
import { afterAll, beforeAll, describe, expect, it, vi } from "vitest";
import { withEnv } from "../test-utils/env.js";
import {
buildGroupDisplayName,
@@ -798,7 +799,7 @@ describe("sessions", () => {
await expect(fs.stat(`${storePath}.lock`)).rejects.toThrow();
});
it("updateSessionStoreEntry re-reads disk inside lock instead of using stale cache", async () => {
it("updateSessionStoreEntry re-reads disk inside the writer slot instead of using stale cache", async () => {
const mainSessionKey = "agent:main:main";
const { storePath } = await createSessionStoreFixture({
prefix: "updateSessionStoreEntry-cache-bypass",
@@ -838,4 +839,91 @@ describe("sessions", () => {
expect(store[mainSessionKey]?.providerOverride).toBe("anthropic");
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
});
it("updateSessionStore uses the writer-owned mutable cache without disk read or parse", async () => {
const mainSessionKey = "agent:main:main";
const { storePath } = await createSessionStoreFixture({
prefix: "updateSessionStore-mutable-cache",
entries: {
[mainSessionKey]: {
sessionId: "sess-1",
updatedAt: 123,
thinkingLevel: "low",
},
},
});
expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low");
const readSpy = vi.spyOn(fsSync, "readFileSync");
const parseSpy = vi.spyOn(JSON, "parse");
try {
await updateSessionStore(
storePath,
(store) => {
const existing = store[mainSessionKey];
if (!existing) {
throw new Error("missing session entry");
}
store[mainSessionKey] = {
...existing,
thinkingLevel: "high",
};
},
{ skipMaintenance: true },
);
expect(readSpy).not.toHaveBeenCalled();
expect(parseSpy).not.toHaveBeenCalled();
} finally {
readSpy.mockRestore();
parseSpy.mockRestore();
}
const store = loadSessionStore(storePath, { skipCache: true });
expect(store[mainSessionKey]?.thinkingLevel).toBe("high");
});
it("updateSessionStore drops a borrowed cache entry when a mutator throws", async () => {
const mainSessionKey = "agent:main:main";
const { storePath } = await createSessionStoreFixture({
prefix: "updateSessionStore-mutable-cache-throw",
entries: {
[mainSessionKey]: {
sessionId: "sess-1",
updatedAt: 123,
thinkingLevel: "low",
},
},
});
expect(loadSessionStore(storePath)[mainSessionKey]?.thinkingLevel).toBe("low");
await expect(
updateSessionStore(
storePath,
(store) => {
const existing = store[mainSessionKey];
if (!existing) {
throw new Error("missing session entry");
}
store[mainSessionKey] = {
...existing,
thinkingLevel: "mutated-before-throw",
};
throw new Error("boom");
},
{ skipMaintenance: true },
),
).rejects.toThrow("boom");
const readSpy = vi.spyOn(fsSync, "readFileSync");
try {
const store = loadSessionStore(storePath);
expect(readSpy).toHaveBeenCalled();
expect(store[mainSessionKey]?.thinkingLevel).toBe("low");
} finally {
readSpy.mockRestore();
}
});
});

View File

@@ -271,15 +271,13 @@ describe("session lifecycle timestamps", () => {
});
});
describe("session store lock (Promise chain mutex)", () => {
const lockFixtureRootTracker = createSuiteTempRootTracker({ prefix: "openclaw-lock-test-" });
let lockTmpDirs: string[] = [];
describe("session store writer queue", () => {
const writerFixtureRootTracker = createSuiteTempRootTracker({ prefix: "openclaw-writer-test-" });
async function makeTmpStore(
initial: Record<string, unknown> = {},
): Promise<{ dir: string; storePath: string }> {
const dir = await lockFixtureRootTracker.make("case");
lockTmpDirs.push(dir);
const dir = await writerFixtureRootTracker.make("case");
const storePath = path.join(dir, "sessions.json");
if (Object.keys(initial).length > 0) {
await fsPromises.writeFile(storePath, JSON.stringify(initial, null, 2), "utf-8");
@@ -288,16 +286,15 @@ describe("session store lock (Promise chain mutex)", () => {
}
beforeAll(async () => {
await lockFixtureRootTracker.setup();
await writerFixtureRootTracker.setup();
});
afterAll(async () => {
await lockFixtureRootTracker.cleanup();
await writerFixtureRootTracker.cleanup();
});
afterEach(async () => {
clearSessionStoreCacheForTest();
lockTmpDirs = [];
});
it("serializes concurrent updateSessionStore calls without data loss", async () => {

View File

@@ -75,6 +75,23 @@ export function readSessionStoreCache(params: {
return cloneSessionStoreRecord(cached.store, cached.serialized);
}
export function takeMutableSessionStoreCache(params: {
storePath: string;
mtimeMs?: number;
sizeBytes?: number;
}): Record<string, SessionEntry> | null {
const cached = SESSION_STORE_CACHE.get(params.storePath);
if (!cached) {
return null;
}
if (params.mtimeMs !== cached.mtimeMs || params.sizeBytes !== cached.sizeBytes) {
invalidateSessionStoreCache(params.storePath);
return null;
}
SESSION_STORE_CACHE.delete(params.storePath);
return cached.store;
}
export function writeSessionStoreCache(params: {
storePath: string;
store: Record<string, SessionEntry>;

View File

@@ -1,34 +1,32 @@
import { clearSessionStoreCaches } from "./store-cache.js";
export type SessionStoreLockTask = {
export type SessionStoreWriterTask = {
fn: () => Promise<unknown>;
resolve: (value: unknown) => void;
reject: (reason: unknown) => void;
timeoutMs?: number;
staleMs: number;
};
export type SessionStoreLockQueue = {
export type SessionStoreWriterQueue = {
running: boolean;
pending: SessionStoreLockTask[];
pending: SessionStoreWriterTask[];
drainPromise: Promise<void> | null;
};
export const LOCK_QUEUES = new Map<string, SessionStoreLockQueue>();
export const WRITER_QUEUES = new Map<string, SessionStoreWriterQueue>();
export function clearSessionStoreCacheForTest(): void {
clearSessionStoreCaches();
for (const queue of LOCK_QUEUES.values()) {
for (const queue of WRITER_QUEUES.values()) {
for (const task of queue.pending) {
task.reject(new Error("session store queue cleared for test"));
}
}
LOCK_QUEUES.clear();
WRITER_QUEUES.clear();
}
export async function drainSessionStoreLockQueuesForTest(): Promise<void> {
while (LOCK_QUEUES.size > 0) {
const queues = [...LOCK_QUEUES.values()];
export async function drainSessionStoreWriterQueuesForTest(): Promise<void> {
while (WRITER_QUEUES.size > 0) {
const queues = [...WRITER_QUEUES.values()];
for (const queue of queues) {
for (const task of queue.pending) {
task.reject(new Error("session store queue cleared for test"));
@@ -39,13 +37,13 @@ export async function drainSessionStoreLockQueuesForTest(): Promise<void> {
queue.drainPromise ? [queue.drainPromise] : [],
);
if (activeDrains.length === 0) {
LOCK_QUEUES.clear();
WRITER_QUEUES.clear();
return;
}
await Promise.allSettled(activeDrains);
}
}
export function getSessionStoreLockQueueSizeForTest(): number {
return LOCK_QUEUES.size;
export function getSessionStoreWriterQueueSizeForTest(): number {
return WRITER_QUEUES.size;
}

View File

@@ -0,0 +1,56 @@
import { afterEach, describe, expect, it } from "vitest";
import {
clearSessionStoreCacheForTest,
getSessionStoreWriterQueueSizeForTest,
withSessionStoreWriterForTest,
} from "./store.js";
const createDeferred = <T>() => {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((nextResolve, nextReject) => {
resolve = nextResolve;
reject = nextReject;
});
return { promise, resolve, reject };
};
describe("session store writer", () => {
afterEach(() => {
clearSessionStoreCacheForTest();
});
it("serializes runtime writes through one in-process writer", async () => {
const storePath = "/tmp/openclaw-store.json";
const firstStarted = createDeferred<void>();
const releaseFirst = createDeferred<void>();
const order: string[] = [];
const first = withSessionStoreWriterForTest(storePath, async () => {
order.push("first:start");
firstStarted.resolve();
await releaseFirst.promise;
order.push("first:end");
});
const second = withSessionStoreWriterForTest(storePath, async () => {
order.push("second");
});
await firstStarted.promise;
expect(getSessionStoreWriterQueueSizeForTest()).toBe(1);
expect(order).toEqual(["first:start"]);
releaseFirst.resolve();
await Promise.all([first, second]);
expect(order).toEqual(["first:start", "first:end", "second"]);
expect(getSessionStoreWriterQueueSizeForTest()).toBe(0);
});
it("rejects empty store paths before enqueuing work", async () => {
await expect(withSessionStoreWriterForTest("", async () => undefined)).rejects.toThrow(
/storePath must be a non-empty string/,
);
expect(getSessionStoreWriterQueueSizeForTest()).toBe(0);
});
});

View File

@@ -0,0 +1,97 @@
import {
WRITER_QUEUES,
type SessionStoreWriterQueue,
type SessionStoreWriterTask,
} from "./store-writer-state.js";
export async function withSessionStoreWriterForTest<T>(
storePath: string,
fn: () => Promise<T>,
): Promise<T> {
return await runExclusiveSessionStoreWrite(storePath, fn);
}
function getOrCreateWriterQueue(storePath: string): SessionStoreWriterQueue {
const existing = WRITER_QUEUES.get(storePath);
if (existing) {
return existing;
}
const created: SessionStoreWriterQueue = { running: false, pending: [], drainPromise: null };
WRITER_QUEUES.set(storePath, created);
return created;
}
async function drainSessionStoreWriterQueue(storePath: string): Promise<void> {
const queue = WRITER_QUEUES.get(storePath);
if (!queue) {
return;
}
if (queue.drainPromise) {
await queue.drainPromise;
return;
}
queue.running = true;
queue.drainPromise = (async () => {
try {
while (queue.pending.length > 0) {
const task = queue.pending.shift();
if (!task) {
continue;
}
let result: unknown;
let failed: unknown;
let hasFailure = false;
try {
result = await task.fn();
} catch (err) {
hasFailure = true;
failed = err;
}
if (hasFailure) {
task.reject(failed);
continue;
}
task.resolve(result);
}
} finally {
queue.running = false;
queue.drainPromise = null;
if (queue.pending.length === 0) {
WRITER_QUEUES.delete(storePath);
} else {
queueMicrotask(() => {
void drainSessionStoreWriterQueue(storePath);
});
}
}
})();
await queue.drainPromise;
}
export async function runExclusiveSessionStoreWrite<T>(
storePath: string,
fn: () => Promise<T>,
): Promise<T> {
if (!storePath || typeof storePath !== "string") {
throw new Error(
`runExclusiveSessionStoreWrite: storePath must be a non-empty string, got ${JSON.stringify(
storePath,
)}`,
);
}
const queue = getOrCreateWriterQueue(storePath);
const promise = new Promise<T>((resolve, reject) => {
const task: SessionStoreWriterTask = {
fn: async () => await fn(),
resolve: (value) => resolve(value as T),
reject,
};
queue.pending.push(task);
void drainSessionStoreWriterQueue(storePath);
});
return await promise;
}

View File

@@ -1,50 +0,0 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
clearSessionStoreCacheForTest,
resetSessionStoreLockRuntimeForTests,
setSessionWriteLockAcquirerForTests,
withSessionStoreLockForTest,
} from "./store.js";
const acquireSessionWriteLockMock = vi.hoisted(() =>
vi.fn(async () => ({ release: vi.fn(async () => {}) })),
);
describe("withSessionStoreLock", () => {
beforeEach(() => {
acquireSessionWriteLockMock.mockClear();
setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock);
});
afterEach(() => {
clearSessionStoreCacheForTest();
resetSessionStoreLockRuntimeForTests();
vi.restoreAllMocks();
});
it("derives session lock hold time from the store lock timeout", async () => {
await withSessionStoreLockForTest("/tmp/openclaw-store.json", async () => {}, {
timeoutMs: 10_000,
});
expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({
sessionFile: "/tmp/openclaw-store.json",
timeoutMs: 10_000,
staleMs: 30_000,
maxHoldMs: 15_000,
});
});
it("leaves the session lock hold time unset when store locking has no timeout", async () => {
await withSessionStoreLockForTest("/tmp/openclaw-store.json", async () => {}, {
timeoutMs: 0,
});
expect(acquireSessionWriteLockMock).toHaveBeenCalledWith({
sessionFile: "/tmp/openclaw-store.json",
timeoutMs: Number.POSITIVE_INFINITY,
staleMs: 30_000,
maxHoldMs: undefined,
});
});
});

View File

@@ -1,9 +1,5 @@
import fs from "node:fs";
import path from "node:path";
import {
acquireSessionWriteLock,
resolveSessionLockMaxHoldFromTimeout,
} from "../../agents/session-write-lock.js";
import type { MsgContext } from "../../auto-reply/templating.js";
import { writeTextAtomic } from "../../infra/json-files.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
@@ -22,18 +18,11 @@ import {
getSerializedSessionStore,
isSessionStoreCacheEnabled,
setSerializedSessionStore,
takeMutableSessionStoreCache,
writeSessionStoreCache,
} from "./store-cache.js";
import { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js";
import { loadSessionStore, normalizeSessionStore } from "./store-load.js";
import {
clearSessionStoreCacheForTest,
drainSessionStoreLockQueuesForTest,
getSessionStoreLockQueueSizeForTest,
LOCK_QUEUES,
type SessionStoreLockQueue,
type SessionStoreLockTask,
} from "./store-lock-state.js";
import { resolveMaintenanceConfig } from "./store-maintenance-runtime.js";
import {
capEntryCount,
@@ -43,6 +32,7 @@ import {
type ResolvedSessionMaintenanceConfig,
type SessionMaintenanceWarning,
} from "./store-maintenance.js";
import { runExclusiveSessionStoreWrite } from "./store-writer.js";
import {
mergeSessionEntry,
mergeSessionEntryPreserveActivity,
@@ -51,9 +41,10 @@ import {
export {
clearSessionStoreCacheForTest,
drainSessionStoreLockQueuesForTest,
getSessionStoreLockQueueSizeForTest,
} from "./store-lock-state.js";
drainSessionStoreWriterQueuesForTest,
getSessionStoreWriterQueueSizeForTest,
} from "./store-writer-state.js";
export { withSessionStoreWriterForTest } from "./store-writer.js";
export { loadSessionStore } from "./store-load.js";
export { normalizeStoreSessionKey, resolveSessionStoreEntry } from "./store-entry.js";
@@ -63,7 +54,6 @@ let sessionArchiveRuntimePromise: Promise<
> | null = null;
let trajectoryCleanupRuntimePromise: Promise<typeof import("../../trajectory/cleanup.js")> | null =
null;
let sessionWriteLockAcquirerForTests: typeof acquireSessionWriteLock | null = null;
function loadSessionArchiveRuntime() {
sessionArchiveRuntimePromise ??= import("../../gateway/session-archive.runtime.js");
@@ -84,24 +74,6 @@ function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryCon
return next;
}
export function setSessionWriteLockAcquirerForTests(
acquirer: typeof acquireSessionWriteLock | null,
): void {
sessionWriteLockAcquirerForTests = acquirer;
}
export function resetSessionStoreLockRuntimeForTests(): void {
sessionWriteLockAcquirerForTests = null;
}
export async function withSessionStoreLockForTest<T>(
storePath: string,
fn: () => Promise<T>,
opts: SessionStoreLockOptions = {},
): Promise<T> {
return await withSessionStoreLock(storePath, fn, opts);
}
export function readSessionUpdatedAt(params: {
storePath: string;
sessionKey: string;
@@ -177,6 +149,21 @@ function updateSessionStoreWriteCaches(params: {
});
}
function loadMutableSessionStoreForWriter(storePath: string): Record<string, SessionEntry> {
if (isSessionStoreCacheEnabled()) {
const currentFileStat = getFileStatSnapshot(storePath);
const cached = takeMutableSessionStoreCache({
storePath,
mtimeMs: currentFileStat?.mtimeMs,
sizeBytes: currentFileStat?.sizeBytes,
});
if (cached) {
return cached;
}
}
return loadSessionStore(storePath, { skipCache: true, clone: false });
}
function resolveMutableSessionStoreKey(
store: Record<string, SessionEntry>,
sessionKey: string,
@@ -403,7 +390,7 @@ async function saveSessionStoreUnlocked(
await new Promise((r) => setTimeout(r, 50 * (i + 1)));
continue;
}
// Final attempt failed skip this save. The write lock ensures
// Final attempt failed - skip this save. The writer queue ensures
// the next save will retry with fresh data. Log for diagnostics.
log.warn(`atomic write failed after 5 attempts: ${storePath}`);
}
@@ -440,7 +427,7 @@ export async function saveSessionStore(
store: Record<string, SessionEntry>,
opts?: SaveSessionStoreOptions,
): Promise<void> {
await withSessionStoreLock(storePath, async () => {
await runExclusiveSessionStoreWrite(storePath, async () => {
await saveSessionStoreUnlocked(storePath, store, opts);
});
}
@@ -450,9 +437,8 @@ export async function updateSessionStore<T>(
mutator: (store: Record<string, SessionEntry>) => Promise<T> | T,
opts?: SaveSessionStoreOptions,
): Promise<T> {
return await withSessionStoreLock(storePath, async () => {
// Always re-read inside the lock to avoid clobbering concurrent writers.
const store = loadSessionStore(storePath, { skipCache: true, clone: false });
return await runExclusiveSessionStoreWrite(storePath, async () => {
const store = loadMutableSessionStoreForWriter(storePath);
const previousAcpByKey = collectAcpMetadataSnapshot(store);
const result = await mutator(store);
preserveExistingAcpMetadata({
@@ -465,15 +451,6 @@ export async function updateSessionStore<T>(
});
}
type SessionStoreLockOptions = {
timeoutMs?: number;
pollIntervalMs?: number;
staleMs?: number;
};
const SESSION_STORE_LOCK_MIN_HOLD_MS = 5_000;
const SESSION_STORE_LOCK_TIMEOUT_GRACE_MS = 5_000;
function getErrorCode(error: unknown): string | null {
if (!error || typeof error !== "object" || !("code" in error)) {
return null;
@@ -546,136 +523,14 @@ async function persistResolvedSessionEntry(params: {
return params.next;
}
function lockTimeoutError(storePath: string): Error {
return new Error(`timeout waiting for session store lock: ${storePath}`);
}
function resolveSessionStoreLockMaxHoldMs(timeoutMs: number | undefined): number | undefined {
if (timeoutMs == null || !Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return undefined;
}
return resolveSessionLockMaxHoldFromTimeout({
timeoutMs,
graceMs: SESSION_STORE_LOCK_TIMEOUT_GRACE_MS,
minMs: SESSION_STORE_LOCK_MIN_HOLD_MS,
});
}
function getOrCreateLockQueue(storePath: string): SessionStoreLockQueue {
const existing = LOCK_QUEUES.get(storePath);
if (existing) {
return existing;
}
const created: SessionStoreLockQueue = { running: false, pending: [], drainPromise: null };
LOCK_QUEUES.set(storePath, created);
return created;
}
async function drainSessionStoreLockQueue(storePath: string): Promise<void> {
const queue = LOCK_QUEUES.get(storePath);
if (!queue) {
return;
}
if (queue.drainPromise) {
await queue.drainPromise;
return;
}
queue.running = true;
queue.drainPromise = (async () => {
try {
while (queue.pending.length > 0) {
const task = queue.pending.shift();
if (!task) {
continue;
}
const remainingTimeoutMs = task.timeoutMs ?? Number.POSITIVE_INFINITY;
if (task.timeoutMs != null && remainingTimeoutMs <= 0) {
task.reject(lockTimeoutError(storePath));
continue;
}
let lock: { release: () => Promise<void> } | undefined;
let result: unknown;
let failed: unknown;
let hasFailure = false;
try {
lock = await (sessionWriteLockAcquirerForTests ?? acquireSessionWriteLock)({
sessionFile: storePath,
timeoutMs: remainingTimeoutMs,
staleMs: task.staleMs,
maxHoldMs: resolveSessionStoreLockMaxHoldMs(task.timeoutMs),
});
result = await task.fn();
} catch (err) {
hasFailure = true;
failed = err;
} finally {
await lock?.release().catch(() => undefined);
}
if (hasFailure) {
task.reject(failed);
continue;
}
task.resolve(result);
}
} finally {
queue.running = false;
queue.drainPromise = null;
if (queue.pending.length === 0) {
LOCK_QUEUES.delete(storePath);
} else {
queueMicrotask(() => {
void drainSessionStoreLockQueue(storePath);
});
}
}
})();
await queue.drainPromise;
}
async function withSessionStoreLock<T>(
storePath: string,
fn: () => Promise<T>,
opts: SessionStoreLockOptions = {},
): Promise<T> {
if (!storePath || typeof storePath !== "string") {
throw new Error(
`withSessionStoreLock: storePath must be a non-empty string, got ${JSON.stringify(storePath)}`,
);
}
const timeoutMs = opts.timeoutMs ?? 10_000;
const staleMs = opts.staleMs ?? 30_000;
// `pollIntervalMs` is retained for API compatibility with older lock options.
void opts.pollIntervalMs;
const hasTimeout = timeoutMs > 0 && Number.isFinite(timeoutMs);
const queue = getOrCreateLockQueue(storePath);
const promise = new Promise<T>((resolve, reject) => {
const task: SessionStoreLockTask = {
fn: async () => await fn(),
resolve: (value) => resolve(value as T),
reject,
timeoutMs: hasTimeout ? timeoutMs : undefined,
staleMs,
};
queue.pending.push(task);
void drainSessionStoreLockQueue(storePath);
});
return await promise;
}
export async function updateSessionStoreEntry(params: {
storePath: string;
sessionKey: string;
update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
}): Promise<SessionEntry | null> {
const { storePath, sessionKey, update } = params;
return await withSessionStoreLock(storePath, async () => {
const store = loadSessionStore(storePath, { skipCache: true, clone: false });
return await runExclusiveSessionStoreWrite(storePath, async () => {
const store = loadMutableSessionStoreForWriter(storePath);
const resolved = resolveSessionStoreEntry({ store, sessionKey });
const existing = resolved.existing;
if (!existing) {
@@ -756,8 +611,8 @@ export async function updateLastRoute(params: {
}): Promise<SessionEntry | null> {
const { storePath, sessionKey, channel, to, accountId, threadId, ctx } = params;
const createIfMissing = params.createIfMissing ?? true;
return await withSessionStoreLock(storePath, async () => {
const store = loadSessionStore(storePath);
return await runExclusiveSessionStoreWrite(storePath, async () => {
const store = loadMutableSessionStoreForWriter(storePath);
const resolved = resolveSessionStoreEntry({ store, sessionKey });
const existing = resolved.existing;
if (!existing && !createIfMissing) {

View File

@@ -106,12 +106,13 @@ export function resolveCronSession(params: {
nowMs: number;
agentId: string;
forceNew?: boolean;
store?: Record<string, SessionEntry>;
}) {
const sessionCfg = params.cfg.session;
const storePath = resolveStorePath(sessionCfg?.store, {
agentId: params.agentId,
});
const store = loadSessionStore(storePath);
const store = params.store ?? loadSessionStore(storePath);
const entry = store[params.sessionKey];
// Check if we can reuse an existing session

View File

@@ -31,5 +31,7 @@ export {
resolveStorePath,
loadSessionStore,
saveSessionStore,
updateSessionStore,
updateSessionStoreEntry,
resolveSessionFilePath,
} from "./config/sessions.js";

View File

@@ -214,7 +214,7 @@ test("sessions.reset returns unavailable when active run does not stop", async (
expect(filesAfterResetAttempt.some((f) => f.startsWith("sess-main.jsonl.reset."))).toBe(false);
});
test("sessions.reset emits before_reset for the entry actually reset under the store lock", async () => {
test("sessions.reset emits before_reset for the entry actually reset in the writer slot", async () => {
const { dir } = await createSessionStoreDir();
const oldTranscriptPath = path.join(dir, "sess-old.jsonl");
const newTranscriptPath = path.join(dir, "sess-new.jsonl");
@@ -251,7 +251,7 @@ test("sessions.reset emits before_reset for the entry actually reset under the s
const [
{ getRuntimeConfig },
{ resolveGatewaySessionStoreTarget },
{ withSessionStoreLockForTest },
{ withSessionStoreWriterForTest },
] = await Promise.all([
import("../config/config.js"),
import("./session-utils.js"),
@@ -266,7 +266,7 @@ test("sessions.reset emits before_reset for the entry actually reset under the s
| ReturnType<(typeof import("./session-reset-service.js"))["performGatewaySessionReset"]>
| undefined;
const { performGatewaySessionReset } = await import("./session-reset-service.js");
await withSessionStoreLockForTest(gatewayStorePath, async () => {
await withSessionStoreWriterForTest(gatewayStorePath, async () => {
pendingReset = performGatewaySessionReset({
key: "main",
reason: "new",

View File

@@ -54,11 +54,7 @@ import {
} from "../config/sessions/main-session.js";
import { resolveStorePath } from "../config/sessions/paths.js";
import { loadSessionStore } from "../config/sessions/store-load.js";
import {
archiveRemovedSessionTranscripts,
saveSessionStore,
updateSessionStore,
} from "../config/sessions/store.js";
import { archiveRemovedSessionTranscripts, updateSessionStore } from "../config/sessions/store.js";
import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { hasActiveCronJobs } from "../cron/active-jobs.js";
@@ -1179,40 +1175,46 @@ export async function runHeartbeatOnce(opts: {
configuredSessionKey: configuredSession.sessionKey,
sessionEntry: entry,
});
const cronSession = resolveCronSession({
cfg,
sessionKey: isolatedSessionKey,
agentId,
nowMs: startedAt,
forceNew: true,
});
const isolatedStorePath = resolveStorePath(cfg.session?.store, { agentId });
const staleIsolatedSessionKey = resolveStaleHeartbeatIsolatedSessionKey({
sessionKey,
isolatedSessionKey,
isolatedBaseSessionKey,
});
const removedSessionFiles = new Map<string, string | undefined>();
if (staleIsolatedSessionKey) {
const staleEntry = cronSession.store[staleIsolatedSessionKey];
if (staleEntry?.sessionId) {
removedSessionFiles.set(staleEntry.sessionId, staleEntry.sessionFile);
let referencedSessionIds = new Set<string>();
await updateSessionStore(isolatedStorePath, (store) => {
const cronSession = resolveCronSession({
cfg,
sessionKey: isolatedSessionKey,
agentId,
nowMs: startedAt,
forceNew: true,
store,
});
if (staleIsolatedSessionKey) {
const staleEntry = store[staleIsolatedSessionKey];
if (staleEntry?.sessionId) {
removedSessionFiles.set(staleEntry.sessionId, staleEntry.sessionFile);
}
delete store[staleIsolatedSessionKey];
}
delete cronSession.store[staleIsolatedSessionKey];
}
cronSession.sessionEntry.heartbeatIsolatedBaseSessionKey = isolatedBaseSessionKey;
cronSession.store[isolatedSessionKey] = cronSession.sessionEntry;
await saveSessionStore(cronSession.storePath, cronSession.store);
store[isolatedSessionKey] = {
...cronSession.sessionEntry,
heartbeatIsolatedBaseSessionKey: isolatedBaseSessionKey,
};
referencedSessionIds = new Set(
Object.values(store)
.map((sessionEntry) => sessionEntry?.sessionId)
.filter((sessionId): sessionId is string => Boolean(sessionId)),
);
});
if (removedSessionFiles.size > 0) {
try {
const referencedSessionIds = new Set(
Object.values(cronSession.store)
.map((sessionEntry) => sessionEntry?.sessionId)
.filter((sessionId): sessionId is string => Boolean(sessionId)),
);
await archiveRemovedSessionTranscripts({
removedSessionFiles,
referencedSessionIds,
storePath: cronSession.storePath,
storePath: isolatedStorePath,
reason: "deleted",
restrictToStoreDir: true,
});
@@ -1242,29 +1244,30 @@ export async function runHeartbeatOnce(opts: {
if (!preflight.tasks || preflight.tasks.length === 0) {
return;
}
const tasks = preflight.tasks;
const store = loadSessionStore(storePath);
const current = store[sessionKey];
// Initialize stub entry on first run when current doesn't exist
const base = current ?? {
// Generate valid sessionId - derive from sessionKey without colons
sessionId: sessionKey.replace(/:/g, "_"),
updatedAt: startedAt,
createdAt: startedAt,
messageCount: 0,
lastMessageAt: startedAt,
heartbeatTaskState: {},
};
const taskState = { ...base.heartbeatTaskState };
await updateSessionStore(storePath, (store) => {
const current = store[sessionKey];
// Initialize stub entry on first run when current doesn't exist.
const base = current ?? {
// Generate valid sessionId - derive from sessionKey without colons.
sessionId: sessionKey.replace(/:/g, "_"),
updatedAt: startedAt,
createdAt: startedAt,
messageCount: 0,
lastMessageAt: startedAt,
heartbeatTaskState: {},
};
const taskState = { ...base.heartbeatTaskState };
for (const task of preflight.tasks) {
if (isTaskDue(taskState[task.name], task.interval, startedAt)) {
taskState[task.name] = startedAt;
for (const task of tasks) {
if (isTaskDue(taskState[task.name], task.interval, startedAt)) {
taskState[task.name] = startedAt;
}
}
}
store[sessionKey] = { ...base, heartbeatTaskState: taskState };
await saveSessionStore(storePath, store);
store[sessionKey] = { ...base, heartbeatTaskState: taskState };
});
};
const consumeInspectedSystemEvents = () => {
@@ -1658,16 +1661,17 @@ export async function runHeartbeatOnce(opts: {
// Record last delivered heartbeat payload for dedupe.
if (!shouldSkipMain && normalized.text.trim()) {
const store = loadSessionStore(storePath);
const current = store[sessionKey];
if (current) {
await updateSessionStore(storePath, (store) => {
const current = store[sessionKey];
if (!current) {
return;
}
store[sessionKey] = {
...current,
lastHeartbeatText: normalized.text,
lastHeartbeatSentAt: startedAt,
};
await saveSessionStore(storePath, store);
}
});
}
emitHeartbeatEvent({

View File

@@ -135,6 +135,7 @@ export {
saveSessionStore,
updateLastRoute,
updateSessionStore,
updateSessionStoreEntry,
resolveSessionStoreEntry,
} from "../config/sessions/store.js";
export { resolveSessionKey } from "../config/sessions/session-key.js";

View File

@@ -14,6 +14,7 @@ export {
saveSessionStore,
updateLastRoute,
updateSessionStore,
updateSessionStoreEntry,
} from "../config/sessions/store.js";
export {
evaluateSessionFreshness,

View File

@@ -353,6 +353,16 @@ export function createPluginRuntimeMock(overrides: DeepPartial<PluginRuntime> =
.mockResolvedValue(
undefined,
) as unknown as PluginRuntime["agent"]["session"]["saveSessionStore"],
updateSessionStore: vi
.fn()
.mockResolvedValue(
undefined,
) as unknown as PluginRuntime["agent"]["session"]["updateSessionStore"],
updateSessionStoreEntry: vi
.fn()
.mockResolvedValue(
null,
) as unknown as PluginRuntime["agent"]["session"]["updateSessionStoreEntry"],
resolveSessionFilePath: vi.fn(
(sessionId: string) => `/tmp/${sessionId}.json`,
) as unknown as PluginRuntime["agent"]["session"]["resolveSessionFilePath"],

View File

@@ -274,6 +274,8 @@ describe("plugin runtime command execution", () => {
"resolveAgentDir",
]);
expectFunctionKeys(runtime.agent.session as Record<string, unknown>, [
"updateSessionStore",
"updateSessionStoreEntry",
"resolveSessionFilePath",
]);
},

View File

@@ -10,7 +10,12 @@ import { ensureAgentWorkspace } from "../../agents/workspace.js";
import { normalizeThinkLevel, resolveThinkingProfile } from "../../auto-reply/thinking.js";
import { getRuntimeConfig } from "../../config/config.js";
import { resolveSessionFilePath, resolveStorePath } from "../../config/sessions/paths.js";
import { loadSessionStore, saveSessionStore } from "../../config/sessions/store.js";
import {
loadSessionStore,
saveSessionStore,
updateSessionStore,
updateSessionStoreEntry,
} from "../../config/sessions/store.js";
import { createLazyRuntimeMethod, createLazyRuntimeModule } from "../../shared/lazy-runtime.js";
import { defineCachedValue } from "./runtime-cache.js";
import type { PluginRuntime } from "./types.js";
@@ -68,6 +73,8 @@ export function createRuntimeAgent(): PluginRuntime["agent"] {
resolveStorePath,
loadSessionStore,
saveSessionStore,
updateSessionStore,
updateSessionStoreEntry,
resolveSessionFilePath,
}));

View File

@@ -147,6 +147,8 @@ export type PluginRuntimeCore = {
resolveStorePath: typeof import("../../config/sessions/paths.js").resolveStorePath;
loadSessionStore: typeof import("../../config/sessions/store-load.js").loadSessionStore;
saveSessionStore: import("../../config/sessions/runtime-types.js").SaveSessionStore;
updateSessionStore: typeof import("../../config/sessions/store.js").updateSessionStore;
updateSessionStoreEntry: typeof import("../../config/sessions/store.js").updateSessionStoreEntry;
resolveSessionFilePath: typeof import("../../config/sessions/paths.js").resolveSessionFilePath;
};
};

View File

@@ -12,6 +12,14 @@ function createAgentRuntime(payloads: unknown[] = [{ text: "Speak this." }]) {
payloads,
meta: {},
}));
const updateSessionStore = vi.fn(
async (
_storePath: string,
mutator: (store: Record<string, { sessionId?: string; updatedAt?: number }>) => unknown,
) => {
return await mutator(sessionStore);
},
);
return {
runtime: {
resolveAgentDir: vi.fn(() => "/tmp/agent"),
@@ -22,6 +30,7 @@ function createAgentRuntime(payloads: unknown[] = [{ text: "Speak this." }]) {
resolveStorePath: vi.fn(() => "/tmp/sessions.json"),
loadSessionStore: vi.fn(() => sessionStore),
saveSessionStore: vi.fn(async () => {}),
updateSessionStore,
resolveSessionFilePath: vi.fn(() => "/tmp/session.json"),
},
runEmbeddedPiAgent,

View File

@@ -1,5 +1,6 @@
import { randomUUID } from "node:crypto";
import type { RunEmbeddedPiAgentParams } from "../agents/pi-embedded-runner/run/params.js";
import type { SessionEntry } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { RuntimeLogger, PluginRuntimeCore } from "../plugins/runtime/types-core.js";
import {
@@ -54,20 +55,19 @@ export async function consultRealtimeVoiceAgent(params: {
const storePath = params.agentRuntime.session.resolveStorePath(params.cfg.session?.store, {
agentId,
});
const sessionStore = params.agentRuntime.session.loadSessionStore(storePath);
const now = Date.now();
const existing = sessionStore[params.sessionKey] as
| { sessionId?: string; updatedAt?: number }
| undefined;
const sessionId = existing?.sessionId?.trim() || randomUUID();
sessionStore[params.sessionKey] = { ...existing, sessionId, updatedAt: now };
await params.agentRuntime.session.saveSessionStore(storePath, sessionStore);
const sessionEntry = await params.agentRuntime.session.updateSessionStore(storePath, (store) => {
const existing = store[params.sessionKey] as SessionEntry | undefined;
const sessionId = existing?.sessionId?.trim() || randomUUID();
const next: SessionEntry = { ...existing, sessionId, updatedAt: now };
store[params.sessionKey] = next;
return next;
});
const sessionId = sessionEntry.sessionId;
const sessionFile = params.agentRuntime.session.resolveSessionFilePath(
sessionId,
sessionStore[params.sessionKey],
{ agentId },
);
const sessionFile = params.agentRuntime.session.resolveSessionFilePath(sessionId, sessionEntry, {
agentId,
});
const result = await params.agentRuntime.runEmbeddedPiAgent({
sessionId,
sessionKey: params.sessionKey,

View File

@@ -5,10 +5,8 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { resetSessionWriteLockStateForTest } from "../agents/session-write-lock.js";
import {
clearSessionStoreCacheForTest,
getSessionStoreLockQueueSizeForTest,
resetSessionStoreLockRuntimeForTests,
setSessionWriteLockAcquirerForTests,
withSessionStoreLockForTest,
getSessionStoreWriterQueueSizeForTest,
withSessionStoreWriterForTest,
} from "../config/sessions/store.js";
import { resetFileLockStateForTest } from "../infra/file-lock.js";
import {
@@ -17,11 +15,8 @@ import {
setSessionStateCleanupRuntimeForTests,
} from "./session-state-cleanup.js";
const acquireSessionWriteLockMock = vi.hoisted(() =>
vi.fn(async () => ({ release: vi.fn(async () => {}) })),
);
const drainFileLockStateMock = vi.hoisted(() => vi.fn(async () => undefined));
const drainSessionStoreLockQueuesMock = vi.hoisted(() => vi.fn(async () => undefined));
const drainSessionStoreWriterQueuesMock = vi.hoisted(() => vi.fn(async () => undefined));
const drainSessionWriteLockStateMock = vi.hoisted(() => vi.fn(async () => undefined));
function createDeferred<T>() {
@@ -46,14 +41,12 @@ describe("cleanupSessionStateForTest", () => {
clearSessionStoreCacheForTest();
resetFileLockStateForTest();
resetSessionWriteLockStateForTest();
acquireSessionWriteLockMock.mockClear();
drainFileLockStateMock.mockClear();
drainSessionStoreLockQueuesMock.mockClear();
drainSessionStoreWriterQueuesMock.mockClear();
drainSessionWriteLockStateMock.mockClear();
setSessionWriteLockAcquirerForTests(acquireSessionWriteLockMock);
setSessionStateCleanupRuntimeForTests({
drainFileLockStateForTest: drainFileLockStateMock,
drainSessionStoreLockQueuesForTest: drainSessionStoreLockQueuesMock,
drainSessionStoreWriterQueuesForTest: drainSessionStoreWriterQueuesMock,
drainSessionWriteLockStateForTest: drainSessionWriteLockStateMock,
});
});
@@ -63,19 +56,18 @@ describe("cleanupSessionStateForTest", () => {
clearSessionStoreCacheForTest();
resetFileLockStateForTest();
resetSessionWriteLockStateForTest();
resetSessionStoreLockRuntimeForTests();
resetSessionStateCleanupRuntimeForTests();
vi.restoreAllMocks();
});
it("waits for in-flight session store locks before clearing test state", async () => {
it("waits for in-flight session store writer queues before clearing test state", async () => {
const fixtureRoot = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-cleanup-"));
const storePath = path.join(fixtureRoot, "openclaw-sessions.json");
const started = createDeferred<void>();
const release = createDeferred<void>();
const drainRequested = createDeferred<void>();
let finishDrain: () => void = () => undefined;
drainSessionStoreLockQueuesMock.mockImplementationOnce(async () => {
drainSessionStoreWriterQueuesMock.mockImplementationOnce(async () => {
drainRequested.resolve();
await new Promise<void>((resolve) => {
finishDrain = resolve;
@@ -83,13 +75,13 @@ describe("cleanupSessionStateForTest", () => {
});
let running: Promise<void> | undefined;
try {
running = withSessionStoreLockForTest(storePath, async () => {
running = withSessionStoreWriterForTest(storePath, async () => {
started.resolve();
await release.promise;
});
await started.promise;
expect(getSessionStoreLockQueueSizeForTest()).toBe(1);
expect(getSessionStoreWriterQueueSizeForTest()).toBe(1);
let settled = false;
const cleanupPromise = cleanupSessionStateForTest().then(() => {
@@ -99,7 +91,7 @@ describe("cleanupSessionStateForTest", () => {
await drainRequested.promise;
await flushMicrotasks();
expect(settled).toBe(false);
expect(drainSessionStoreLockQueuesMock).toHaveBeenCalledTimes(1);
expect(drainSessionStoreWriterQueuesMock).toHaveBeenCalledTimes(1);
expect(drainFileLockStateMock).not.toHaveBeenCalled();
expect(drainSessionWriteLockStateMock).not.toHaveBeenCalled();
@@ -108,7 +100,7 @@ describe("cleanupSessionStateForTest", () => {
finishDrain();
await cleanupPromise;
expect(getSessionStoreLockQueueSizeForTest()).toBe(0);
expect(getSessionStoreWriterQueueSizeForTest()).toBe(0);
expect(drainFileLockStateMock).toHaveBeenCalledTimes(1);
expect(drainSessionWriteLockStateMock).toHaveBeenCalledTimes(1);
} finally {

View File

@@ -1,22 +1,23 @@
import { drainSessionWriteLockStateForTest } from "../agents/session-write-lock.js";
import { clearSessionStoreCaches } from "../config/sessions/store-cache.js";
import { drainSessionStoreLockQueuesForTest } from "../config/sessions/store-lock-state.js";
import { drainSessionStoreWriterQueuesForTest } from "../config/sessions/store-writer-state.js";
import { drainFileLockStateForTest } from "../infra/file-lock.js";
let fileLockDrainerForTests: typeof drainFileLockStateForTest | null = null;
let sessionStoreLockQueueDrainerForTests: typeof drainSessionStoreLockQueuesForTest | null = null;
let sessionStoreWriterQueueDrainerForTests: typeof drainSessionStoreWriterQueuesForTest | null =
null;
let sessionWriteLockDrainerForTests: typeof drainSessionWriteLockStateForTest | null = null;
export function setSessionStateCleanupRuntimeForTests(params: {
drainFileLockStateForTest?: typeof drainFileLockStateForTest | null;
drainSessionStoreLockQueuesForTest?: typeof drainSessionStoreLockQueuesForTest | null;
drainSessionStoreWriterQueuesForTest?: typeof drainSessionStoreWriterQueuesForTest | null;
drainSessionWriteLockStateForTest?: typeof drainSessionWriteLockStateForTest | null;
}): void {
if ("drainFileLockStateForTest" in params) {
fileLockDrainerForTests = params.drainFileLockStateForTest ?? null;
}
if ("drainSessionStoreLockQueuesForTest" in params) {
sessionStoreLockQueueDrainerForTests = params.drainSessionStoreLockQueuesForTest ?? null;
if ("drainSessionStoreWriterQueuesForTest" in params) {
sessionStoreWriterQueueDrainerForTests = params.drainSessionStoreWriterQueuesForTest ?? null;
}
if ("drainSessionWriteLockStateForTest" in params) {
sessionWriteLockDrainerForTests = params.drainSessionWriteLockStateForTest ?? null;
@@ -25,12 +26,12 @@ export function setSessionStateCleanupRuntimeForTests(params: {
export function resetSessionStateCleanupRuntimeForTests(): void {
fileLockDrainerForTests = null;
sessionStoreLockQueueDrainerForTests = null;
sessionStoreWriterQueueDrainerForTests = null;
sessionWriteLockDrainerForTests = null;
}
export async function cleanupSessionStateForTest(): Promise<void> {
await (sessionStoreLockQueueDrainerForTests ?? drainSessionStoreLockQueuesForTest)();
await (sessionStoreWriterQueueDrainerForTests ?? drainSessionStoreWriterQueuesForTest)();
clearSessionStoreCaches();
await (fileLockDrainerForTests ?? drainFileLockStateForTest)();
await (sessionWriteLockDrainerForTests ?? drainSessionWriteLockStateForTest)();

View File

@@ -25,7 +25,7 @@ type WorkerPluginRuntimeHelpers = {
type WorkerCleanupHelpers = {
clearSessionStoreCaches: typeof import("../src/config/sessions/store-cache.js").clearSessionStoreCaches;
drainFileLockStateForTest: typeof import("../src/infra/file-lock.js").drainFileLockStateForTest;
drainSessionStoreLockQueuesForTest: typeof import("../src/config/sessions/store-lock-state.js").drainSessionStoreLockQueuesForTest;
drainSessionStoreWriterQueuesForTest: typeof import("../src/config/sessions/store-writer-state.js").drainSessionStoreWriterQueuesForTest;
drainSessionWriteLockStateForTest: typeof import("../src/agents/session-write-lock.js").drainSessionWriteLockStateForTest;
resetContextWindowCacheForTest: typeof import("../src/agents/context-runtime-state.js").resetContextWindowCacheForTest;
resetFileLockStateForTest: typeof import("../src/infra/file-lock.js").resetFileLockStateForTest;
@@ -80,8 +80,8 @@ function loadWorkerCleanupHelpers(): Promise<WorkerCleanupHelpers> {
vi.importActual<typeof import("../src/config/sessions/store-cache.js")>(
"../src/config/sessions/store-cache.js",
),
vi.importActual<typeof import("../src/config/sessions/store-lock-state.js")>(
"../src/config/sessions/store-lock-state.js",
vi.importActual<typeof import("../src/config/sessions/store-writer-state.js")>(
"../src/config/sessions/store-writer-state.js",
),
vi.importActual<typeof import("../src/infra/file-lock.js")>("../src/infra/file-lock.js"),
]).then(
@@ -90,12 +90,13 @@ function loadWorkerCleanupHelpers(): Promise<WorkerCleanupHelpers> {
modelsConfigState,
sessionWriteLock,
sessionStoreCache,
sessionStoreLockState,
sessionStoreWriterState,
fileLock,
]) => ({
clearSessionStoreCaches: sessionStoreCache.clearSessionStoreCaches,
drainFileLockStateForTest: fileLock.drainFileLockStateForTest,
drainSessionStoreLockQueuesForTest: sessionStoreLockState.drainSessionStoreLockQueuesForTest,
drainSessionStoreWriterQueuesForTest:
sessionStoreWriterState.drainSessionStoreWriterQueuesForTest,
drainSessionWriteLockStateForTest: sessionWriteLock.drainSessionWriteLockStateForTest,
resetContextWindowCacheForTest: contextRuntimeState.resetContextWindowCacheForTest,
resetFileLockStateForTest: fileLock.resetFileLockStateForTest,
@@ -393,14 +394,14 @@ afterEach(async () => {
const {
clearSessionStoreCaches,
drainFileLockStateForTest,
drainSessionStoreLockQueuesForTest,
drainSessionStoreWriterQueuesForTest,
drainSessionWriteLockStateForTest,
resetContextWindowCacheForTest,
resetFileLockStateForTest,
resetModelsJsonReadyCacheForTest,
resetSessionWriteLockStateForTest,
} = await loadWorkerCleanupHelpers();
await drainSessionStoreLockQueuesForTest();
await drainSessionStoreWriterQueuesForTest();
clearSessionStoreCaches();
await drainFileLockStateForTest();
await drainSessionWriteLockStateForTest();