diff --git a/src/config/sessions/store-load.ts b/src/config/sessions/store-load.ts new file mode 100644 index 00000000000..d8a07aa658a --- /dev/null +++ b/src/config/sessions/store-load.ts @@ -0,0 +1,136 @@ +import fs from "node:fs"; +import { + normalizeSessionDeliveryFields, + type DeliveryContext, +} from "../../utils/delivery-context.js"; +import { getFileStatSnapshot } from "../cache-utils.js"; +import { + isSessionStoreCacheEnabled, + readSessionStoreCache, + setSerializedSessionStore, + writeSessionStoreCache, +} from "./store-cache.js"; +import { applySessionStoreMigrations } from "./store-migrations.js"; +import { normalizeSessionRuntimeModelFields, type SessionEntry } from "./types.js"; + +export type LoadSessionStoreOptions = { + skipCache?: boolean; +}; + +function isSessionStoreRecord(value: unknown): value is Record { + return !!value && typeof value === "object" && !Array.isArray(value); +} + +function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { + const normalized = normalizeSessionDeliveryFields({ + channel: entry.channel, + lastChannel: entry.lastChannel, + lastTo: entry.lastTo, + lastAccountId: entry.lastAccountId, + lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId, + deliveryContext: entry.deliveryContext, + }); + const nextDelivery = normalized.deliveryContext; + const sameDelivery = + (entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel && + (entry.deliveryContext?.to ?? undefined) === nextDelivery?.to && + (entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId && + (entry.deliveryContext?.threadId ?? undefined) === nextDelivery?.threadId; + const sameLast = + entry.lastChannel === normalized.lastChannel && + entry.lastTo === normalized.lastTo && + entry.lastAccountId === normalized.lastAccountId && + entry.lastThreadId === normalized.lastThreadId; + if (sameDelivery && sameLast) { + return entry; + } + return { + ...entry, + deliveryContext: nextDelivery, + lastChannel: normalized.lastChannel, + lastTo: normalized.lastTo, + lastAccountId: normalized.lastAccountId, + lastThreadId: normalized.lastThreadId, + }; +} + +export function normalizeSessionStore(store: Record): void { + for (const [key, entry] of Object.entries(store)) { + if (!entry) { + continue; + } + const normalized = normalizeSessionEntryDelivery(normalizeSessionRuntimeModelFields(entry)); + if (normalized !== entry) { + store[key] = normalized; + } + } +} + +export function loadSessionStore( + storePath: string, + opts: LoadSessionStoreOptions = {}, +): Record { + if (!opts.skipCache && isSessionStoreCacheEnabled()) { + const currentFileStat = getFileStatSnapshot(storePath); + const cached = readSessionStoreCache({ + storePath, + mtimeMs: currentFileStat?.mtimeMs, + sizeBytes: currentFileStat?.sizeBytes, + }); + if (cached) { + return cached; + } + } + + // Retry a few times on Windows because readers can briefly observe empty or + // transiently invalid content while another process is swapping the file. + let store: Record = {}; + let fileStat = getFileStatSnapshot(storePath); + let mtimeMs = fileStat?.mtimeMs; + let serializedFromDisk: string | undefined; + const maxReadAttempts = process.platform === "win32" ? 3 : 1; + const retryBuf = maxReadAttempts > 1 ? new Int32Array(new SharedArrayBuffer(4)) : undefined; + for (let attempt = 0; attempt < maxReadAttempts; attempt += 1) { + try { + const raw = fs.readFileSync(storePath, "utf-8"); + if (raw.length === 0 && attempt < maxReadAttempts - 1) { + Atomics.wait(retryBuf!, 0, 0, 50); + continue; + } + const parsed = JSON.parse(raw); + if (isSessionStoreRecord(parsed)) { + store = parsed; + serializedFromDisk = raw; + } + fileStat = getFileStatSnapshot(storePath) ?? fileStat; + mtimeMs = fileStat?.mtimeMs; + break; + } catch { + if (attempt < maxReadAttempts - 1) { + Atomics.wait(retryBuf!, 0, 0, 50); + continue; + } + } + } + + if (serializedFromDisk !== undefined) { + setSerializedSessionStore(storePath, serializedFromDisk); + } else { + setSerializedSessionStore(storePath, undefined); + } + + applySessionStoreMigrations(store); + normalizeSessionStore(store); + + if (!opts.skipCache && isSessionStoreCacheEnabled()) { + writeSessionStoreCache({ + storePath, + store, + mtimeMs, + sizeBytes: fileStat?.sizeBytes, + serialized: serializedFromDisk, + }); + } + + return structuredClone(store); +} diff --git a/src/config/sessions/store.ts b/src/config/sessions/store.ts index ff6c62b3494..18bc41ec802 100644 --- a/src/config/sessions/store.ts +++ b/src/config/sessions/store.ts @@ -21,10 +21,10 @@ import { dropSessionStoreObjectCache, getSerializedSessionStore, isSessionStoreCacheEnabled, - readSessionStoreCache, setSerializedSessionStore, writeSessionStoreCache, } from "./store-cache.js"; +import { loadSessionStore, normalizeSessionStore } from "./store-load.js"; import { clearSessionStoreCacheForTest, drainSessionStoreLockQueuesForTest, @@ -42,11 +42,9 @@ import { type ResolvedSessionMaintenanceConfig, type SessionMaintenanceWarning, } from "./store-maintenance.js"; -import { applySessionStoreMigrations } from "./store-migrations.js"; import { mergeSessionEntry, mergeSessionEntryPreserveActivity, - normalizeSessionRuntimeModelFields, type SessionEntry, } from "./types.js"; @@ -55,6 +53,7 @@ export { drainSessionStoreLockQueuesForTest, getSessionStoreLockQueueSizeForTest, } from "./store-lock-state.js"; +export { loadSessionStore } from "./store-load.js"; const log = createSubsystemLogger("sessions/store"); let sessionArchiveRuntimePromise: Promise< @@ -67,43 +66,6 @@ function loadSessionArchiveRuntime() { return sessionArchiveRuntimePromise; } -function isSessionStoreRecord(value: unknown): value is Record { - return !!value && typeof value === "object" && !Array.isArray(value); -} - -function normalizeSessionEntryDelivery(entry: SessionEntry): SessionEntry { - const normalized = normalizeSessionDeliveryFields({ - channel: entry.channel, - lastChannel: entry.lastChannel, - lastTo: entry.lastTo, - lastAccountId: entry.lastAccountId, - lastThreadId: entry.lastThreadId ?? entry.deliveryContext?.threadId ?? entry.origin?.threadId, - deliveryContext: entry.deliveryContext, - }); - const nextDelivery = normalized.deliveryContext; - const sameDelivery = - (entry.deliveryContext?.channel ?? undefined) === nextDelivery?.channel && - (entry.deliveryContext?.to ?? undefined) === nextDelivery?.to && - (entry.deliveryContext?.accountId ?? undefined) === nextDelivery?.accountId && - (entry.deliveryContext?.threadId ?? undefined) === nextDelivery?.threadId; - const sameLast = - entry.lastChannel === normalized.lastChannel && - entry.lastTo === normalized.lastTo && - entry.lastAccountId === normalized.lastAccountId && - entry.lastThreadId === normalized.lastThreadId; - if (sameDelivery && sameLast) { - return entry; - } - return { - ...entry, - deliveryContext: nextDelivery, - lastChannel: normalized.lastChannel, - lastTo: normalized.lastTo, - lastAccountId: normalized.lastAccountId, - lastThreadId: normalized.lastThreadId, - }; -} - function removeThreadFromDeliveryContext(context?: DeliveryContext): DeliveryContext | undefined { if (!context || context.threadId == null) { return context; @@ -158,18 +120,6 @@ export function resolveSessionStoreEntry(params: { }; } -function normalizeSessionStore(store: Record): void { - for (const [key, entry] of Object.entries(store)) { - if (!entry) { - continue; - } - const normalized = normalizeSessionEntryDelivery(normalizeSessionRuntimeModelFields(entry)); - if (normalized !== entry) { - store[key] = normalized; - } - } -} - export function setSessionWriteLockAcquirerForTests( acquirer: typeof acquireSessionWriteLock | null, ): void { @@ -188,86 +138,6 @@ export async function withSessionStoreLockForTest( return await withSessionStoreLock(storePath, fn, opts); } -type LoadSessionStoreOptions = { - skipCache?: boolean; -}; - -export function loadSessionStore( - storePath: string, - opts: LoadSessionStoreOptions = {}, -): Record { - // Check cache first if enabled - if (!opts.skipCache && isSessionStoreCacheEnabled()) { - const currentFileStat = getFileStatSnapshot(storePath); - const cached = readSessionStoreCache({ - storePath, - mtimeMs: currentFileStat?.mtimeMs, - sizeBytes: currentFileStat?.sizeBytes, - }); - if (cached) { - return cached; - } - } - - // Cache miss or disabled - load from disk. - // Retry up to 3 times when the file is empty or unparseable. On Windows the - // temp-file + rename write is not fully atomic: a concurrent reader can briefly - // observe a 0-byte file (between truncate and write) or a stale/locked state. - // A short synchronous backoff (50 ms via `Atomics.wait`) is enough for the - // writer to finish. - let store: Record = {}; - let fileStat = getFileStatSnapshot(storePath); - let mtimeMs = fileStat?.mtimeMs; - let serializedFromDisk: string | undefined; - const maxReadAttempts = process.platform === "win32" ? 3 : 1; - const retryBuf = maxReadAttempts > 1 ? new Int32Array(new SharedArrayBuffer(4)) : undefined; - for (let attempt = 0; attempt < maxReadAttempts; attempt++) { - try { - const raw = fs.readFileSync(storePath, "utf-8"); - if (raw.length === 0 && attempt < maxReadAttempts - 1) { - // File is empty — likely caught mid-write; retry after a brief pause. - Atomics.wait(retryBuf!, 0, 0, 50); - continue; - } - const parsed = JSON.parse(raw); - if (isSessionStoreRecord(parsed)) { - store = parsed; - serializedFromDisk = raw; - } - fileStat = getFileStatSnapshot(storePath) ?? fileStat; - mtimeMs = fileStat?.mtimeMs; - break; - } catch { - // File missing, locked, or transiently corrupt — retry on Windows. - if (attempt < maxReadAttempts - 1) { - Atomics.wait(retryBuf!, 0, 0, 50); - continue; - } - // Final attempt failed; proceed with an empty store. - } - } - if (serializedFromDisk !== undefined) { - setSerializedSessionStore(storePath, serializedFromDisk); - } else { - setSerializedSessionStore(storePath, undefined); - } - - applySessionStoreMigrations(store); - - // Cache the result if caching is enabled - if (!opts.skipCache && isSessionStoreCacheEnabled()) { - writeSessionStoreCache({ - storePath, - store, - mtimeMs, - sizeBytes: fileStat?.sizeBytes, - serialized: serializedFromDisk, - }); - } - - return structuredClone(store); -} - export function readSessionUpdatedAt(params: { storePath: string; sessionKey: string; diff --git a/src/cron/isolated-agent/delivery-target.test.ts b/src/cron/isolated-agent/delivery-target.test.ts index 1ae36782396..91403b0cf73 100644 --- a/src/cron/isolated-agent/delivery-target.test.ts +++ b/src/cron/isolated-agent/delivery-target.test.ts @@ -12,7 +12,7 @@ vi.mock("../../config/sessions/paths.js", () => ({ resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"), })); -vi.mock("../../config/sessions/store.js", () => ({ +vi.mock("../../config/sessions/store-load.js", () => ({ loadSessionStore: vi.fn().mockReturnValue({}), })); @@ -33,13 +33,13 @@ vi.mock("../../pairing/pairing-store.js", () => ({ const mockedModuleIds = [ "../../config/sessions/main-session.js", "../../config/sessions/paths.js", - "../../config/sessions/store.js", + "../../config/sessions/store-load.js", "../../infra/outbound/channel-selection.js", "../../infra/outbound/target-resolver.js", "../../pairing/pairing-store.js", ]; -import { loadSessionStore } from "../../config/sessions/store.js"; +import { loadSessionStore } from "../../config/sessions/store-load.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js"; import { readChannelAllowFromStoreSync } from "../../pairing/pairing-store.js"; diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index dae114fb28b..bc6ee39ef4f 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -3,7 +3,7 @@ import type { ChannelId } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; import { resolveAgentMainSessionKey } from "../../config/sessions/main-session.js"; import { resolveStorePath } from "../../config/sessions/paths.js"; -import { loadSessionStore } from "../../config/sessions/store.js"; +import { loadSessionStore } from "../../config/sessions/store-load.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js"; import type { OutboundChannel } from "../../infra/outbound/targets.js"; diff --git a/src/cron/isolated-agent/session.test.ts b/src/cron/isolated-agent/session.test.ts index 8310276d75a..b3ac4b4a9c6 100644 --- a/src/cron/isolated-agent/session.test.ts +++ b/src/cron/isolated-agent/session.test.ts @@ -1,9 +1,15 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; -vi.mock("../../config/sessions.js", () => ({ +vi.mock("../../config/sessions/store-load.js", () => ({ loadSessionStore: vi.fn(), +})); + +vi.mock("../../config/sessions/paths.js", () => ({ resolveStorePath: vi.fn().mockReturnValue("/tmp/test-store.json"), +})); + +vi.mock("../../config/sessions/reset.js", () => ({ evaluateSessionFreshness: vi.fn().mockReturnValue({ fresh: true }), resolveSessionResetPolicy: vi.fn().mockReturnValue({ mode: "idle", idleMinutes: 60 }), })); @@ -18,7 +24,8 @@ vi.mock("../../agents/bootstrap-cache.js", () => ({ })); import { clearBootstrapSnapshot } from "../../agents/bootstrap-cache.js"; -import { loadSessionStore, evaluateSessionFreshness } from "../../config/sessions.js"; +import { evaluateSessionFreshness } from "../../config/sessions/reset.js"; +import { loadSessionStore } from "../../config/sessions/store-load.js"; import { resolveCronSession } from "./session.js"; const NOW_MS = 1_737_600_000_000; diff --git a/src/cron/isolated-agent/session.ts b/src/cron/isolated-agent/session.ts index c7bde5cea2d..8c94a670412 100644 --- a/src/cron/isolated-agent/session.ts +++ b/src/cron/isolated-agent/session.ts @@ -1,13 +1,10 @@ import crypto from "node:crypto"; import { clearBootstrapSnapshotOnSessionRollover } from "../../agents/bootstrap-cache.js"; import type { OpenClawConfig } from "../../config/config.js"; -import { - evaluateSessionFreshness, - loadSessionStore, - resolveSessionResetPolicy, - resolveStorePath, - type SessionEntry, -} from "../../config/sessions.js"; +} from "../../config/sessions/reset.js"; +import { resolveStorePath } from "../../config/sessions/paths.js"; +import { loadSessionStore } from "../../config/sessions/store-load.js"; +import type { SessionEntry } from "../../config/sessions/types.js"; export function resolveCronSession(params: { cfg: OpenClawConfig;