From f504f4cea70bb2d8b0cdf58c7d4d30778a16e9d4 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 16:21:52 +0100 Subject: [PATCH] fix(plugin-sdk): restore sqlite compatibility shims --- package.json | 8 + scripts/lib/plugin-sdk-entrypoints.json | 2 + src/plugin-sdk/channel-pairing-paths.ts | 18 + src/plugin-sdk/file-lock.ts | 115 +++++++ src/plugin-sdk/session-store-runtime.test.ts | 45 +++ src/plugin-sdk/session-store-runtime.ts | 325 ++++++++++++++++++- src/plugin-sdk/sqlite-state-lock.test.ts | 41 +++ src/plugin-sdk/sqlite-state-lock.ts | 52 +++ src/state/openclaw-state-lock.test.ts | 60 ++++ src/state/openclaw-state-lock.ts | 51 +++ 10 files changed, 708 insertions(+), 9 deletions(-) create mode 100644 src/plugin-sdk/channel-pairing-paths.ts create mode 100644 src/plugin-sdk/file-lock.ts create mode 100644 src/plugin-sdk/session-store-runtime.test.ts create mode 100644 src/state/openclaw-state-lock.test.ts diff --git a/package.json b/package.json index a25f1071035..6c0a2dc5e9a 100644 --- a/package.json +++ b/package.json @@ -877,6 +877,10 @@ "types": "./dist/plugin-sdk/channel-pairing.d.ts", "default": "./dist/plugin-sdk/channel-pairing.js" }, + "./plugin-sdk/channel-pairing-paths": { + "types": "./dist/plugin-sdk/channel-pairing-paths.d.ts", + "default": "./dist/plugin-sdk/channel-pairing-paths.js" + }, "./plugin-sdk/channel-policy": { "types": "./dist/plugin-sdk/channel-policy.d.ts", "default": "./dist/plugin-sdk/channel-policy.js" @@ -897,6 +901,10 @@ "types": "./dist/plugin-sdk/context-visibility-runtime.d.ts", "default": "./dist/plugin-sdk/context-visibility-runtime.js" }, + "./plugin-sdk/file-lock": { + "types": "./dist/plugin-sdk/file-lock.d.ts", + "default": "./dist/plugin-sdk/file-lock.js" + }, "./plugin-sdk/fetch-runtime": { "types": "./dist/plugin-sdk/fetch-runtime.d.ts", "default": "./dist/plugin-sdk/fetch-runtime.js" diff --git a/scripts/lib/plugin-sdk-entrypoints.json b/scripts/lib/plugin-sdk-entrypoints.json index c43d6dfd716..45de2737ef6 100644 --- a/scripts/lib/plugin-sdk-entrypoints.json +++ b/scripts/lib/plugin-sdk-entrypoints.json @@ -200,11 +200,13 @@ "channel-message-runtime", "channel-outbound", "channel-pairing", + "channel-pairing-paths", "channel-policy", "channel-send-result", "channel-route", "channel-targets", "context-visibility-runtime", + "file-lock", "fetch-runtime", "runtime-fetch", "response-limit-runtime", diff --git a/src/plugin-sdk/channel-pairing-paths.ts b/src/plugin-sdk/channel-pairing-paths.ts new file mode 100644 index 00000000000..2293704a671 --- /dev/null +++ b/src/plugin-sdk/channel-pairing-paths.ts @@ -0,0 +1,18 @@ +import path from "node:path"; +import { resolveOAuthDir, resolveStateDir } from "../config/paths.js"; +import { safeAccountKey, safeChannelKey } from "../pairing/pairing-store-keys.js"; +import type { PairingChannel } from "../pairing/pairing-store.types.js"; + +export function resolveChannelAllowFromPath( + channel: PairingChannel, + env: NodeJS.ProcessEnv = process.env, + accountId?: string, +): string { + const stateDir = resolveStateDir(env); + const base = safeChannelKey(channel); + const accountKey = accountId?.trim() ? safeAccountKey(accountId) : null; + return path.join( + resolveOAuthDir(env, stateDir), + accountKey ? `${base}-${accountKey}-allowFrom.json` : `${base}-allowFrom.json`, + ); +} diff --git a/src/plugin-sdk/file-lock.ts b/src/plugin-sdk/file-lock.ts new file mode 100644 index 00000000000..06b5208dd62 --- /dev/null +++ b/src/plugin-sdk/file-lock.ts @@ -0,0 +1,115 @@ +import "../infra/fs-safe-defaults.js"; +import { + acquireFileLock as acquireFsSafeFileLock, + drainFileLockManagerForTest, + resetFileLockManagerForTest, +} from "@openclaw/fs-safe/file-lock"; +import { shouldRemoveDeadOwnerOrExpiredLock } from "../infra/stale-lock-file.js"; + +export type FileLockOptions = { + retries: { + retries: number; + factor: number; + minTimeout: number; + maxTimeout: number; + randomize?: boolean; + }; + stale: number; +}; + +export type FileLockHandle = { + lockPath: string; + release: () => Promise; +}; + +export const FILE_LOCK_TIMEOUT_ERROR_CODE = "file_lock_timeout"; +export const FILE_LOCK_STALE_ERROR_CODE = "file_lock_stale"; + +export type FileLockTimeoutError = Error & { + code: typeof FILE_LOCK_TIMEOUT_ERROR_CODE; + lockPath: string; +}; + +export type FileLockStaleError = Error & { + code: typeof FILE_LOCK_STALE_ERROR_CODE; + lockPath: string; +}; + +const FILE_LOCK_MANAGER_KEY = "openclaw.plugin-sdk.file-lock"; + +async function shouldReclaimPluginLock(params: { + lockPath: string; + payload: Record | null; + staleMs: number; + nowMs: number; +}): Promise { + return shouldRemoveDeadOwnerOrExpiredLock({ + payload: params.payload, + staleMs: params.staleMs, + nowMs: params.nowMs, + }); +} + +function normalizeLockError(err: unknown): never { + if ((err as { code?: unknown }).code === FILE_LOCK_TIMEOUT_ERROR_CODE) { + throw Object.assign(new Error((err as Error).message), { + code: FILE_LOCK_TIMEOUT_ERROR_CODE, + lockPath: (err as { lockPath?: string }).lockPath ?? "", + }) as FileLockTimeoutError; + } + if ((err as { code?: unknown }).code === FILE_LOCK_STALE_ERROR_CODE) { + throw Object.assign(new Error((err as Error).message), { + code: FILE_LOCK_STALE_ERROR_CODE, + lockPath: (err as { lockPath?: string }).lockPath ?? "", + }) as FileLockStaleError; + } + throw err; +} + +export function resetFileLockStateForTest(): void { + resetFileLockManagerForTest(FILE_LOCK_MANAGER_KEY, FILE_LOCK_MANAGER_KEY); +} + +export async function drainFileLockStateForTest(): Promise { + await drainFileLockManagerForTest(FILE_LOCK_MANAGER_KEY, FILE_LOCK_MANAGER_KEY); +} + +/** Acquire a re-entrant process-local file lock backed by a `.lock` sidecar file. */ +export async function acquireFileLock( + filePath: string, + options: FileLockOptions, +): Promise { + try { + const lock = await acquireFsSafeFileLock(filePath, { + managerKey: FILE_LOCK_MANAGER_KEY, + staleMs: options.stale, + retry: options.retries, + staleRecovery: "remove-if-unchanged", + allowReentrant: true, + payload: () => ({ pid: process.pid, createdAt: new Date().toISOString() }), + shouldReclaim: shouldReclaimPluginLock, + shouldRemoveStaleLock: (snapshot) => + shouldRemoveDeadOwnerOrExpiredLock({ + payload: snapshot.payload, + staleMs: options.stale, + }), + }); + return { lockPath: lock.lockPath, release: lock.release }; + } catch (err) { + return normalizeLockError(err); + } +} + +/** Run an async callback while holding a file lock, always releasing the lock afterward. */ +export async function withFileLock( + filePath: string, + options: FileLockOptions, + fn: () => Promise, +): Promise { + const lock = await acquireFileLock(filePath, options); + try { + return await fn(); + } finally { + await lock.release(); + } +} diff --git a/src/plugin-sdk/session-store-runtime.test.ts b/src/plugin-sdk/session-store-runtime.test.ts new file mode 100644 index 00000000000..ce3411e71d6 --- /dev/null +++ b/src/plugin-sdk/session-store-runtime.test.ts @@ -0,0 +1,45 @@ +import fs from "node:fs/promises"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; +import { + loadSessionStore, + readSessionUpdatedAt, + upsertSessionEntry, +} from "./session-store-runtime.js"; + +describe("session-store-runtime compatibility", () => { + it("rejects custom store paths instead of falling back to the default agent", async () => { + await withOpenClawTestState( + { + layout: "state-only", + prefix: "openclaw-session-store-compat-", + scenario: "minimal", + }, + async (state) => { + const customStorePath = path.join(state.path("custom"), "sessions.json"); + await fs.mkdir(path.dirname(customStorePath), { recursive: true }); + + upsertSessionEntry({ + agentId: "default", + env: { ...process.env, OPENCLAW_STATE_DIR: state.stateDir }, + sessionKey: "discord:default:user:1", + entry: { + sessionId: "default-session", + updatedAt: 123, + sessionStartedAt: 123, + }, + }); + + expect(() => loadSessionStore(customStorePath)).toThrow(/Custom sessions\.json paths/); + expect( + readSessionUpdatedAt({ + agentId: "default", + sessionKey: "discord:default:user:1", + storePath: customStorePath, + }), + ).toBe(123); + }, + ); + }); +}); diff --git a/src/plugin-sdk/session-store-runtime.ts b/src/plugin-sdk/session-store-runtime.ts index 5807f293e1a..0c7d962f57a 100644 --- a/src/plugin-sdk/session-store-runtime.ts +++ b/src/plugin-sdk/session-store-runtime.ts @@ -1,8 +1,34 @@ -// Narrow SQLite session row helpers for channel hot paths. +// SQLite session row helpers plus deprecated session-store compatibility shims. -export { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js"; -export { resolveSessionRowEntry } from "../config/sessions/store-entry.js"; -export { resolveAndPersistSessionTranscriptScope } from "../config/sessions/session-scope.js"; +import path from "node:path"; +import type { MsgContext } from "../auto-reply/templating.js"; +import { resolveStateDir } from "../config/paths.js"; +import { loadSqliteSessionEntries } from "../config/sessions/session-entries.sqlite.js"; +import { normalizeSessionEntries } from "../config/sessions/session-entry-normalize.js"; +import { resolveAndPersistSessionTranscriptScope } from "../config/sessions/session-scope.js"; +import { resolveSessionRowEntry } from "../config/sessions/store-entry.js"; +import { + deleteSessionEntry, + getSessionEntry, + listSessionEntries, + patchSessionEntry, + readSessionUpdatedAt as readSqliteSessionUpdatedAt, + recordSessionMetaFromInbound as recordSessionMetaFromInboundSqlite, + updateLastRoute as updateLastRouteSqlite, + upsertSessionEntry, +} from "../config/sessions/store.js"; +import type { SessionEntry, SessionScope } from "../config/sessions/types.js"; +import { + DEFAULT_AGENT_ID, + normalizeAgentId, + resolveAgentIdFromSessionKey, +} from "../routing/session-key.js"; +import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js"; +import { resolveUserPath } from "../utils.js"; + +export { closeOpenClawAgentDatabasesForTest }; +export { resolveSessionRowEntry }; +export { resolveAndPersistSessionTranscriptScope }; export { readLatestAssistantTextFromSessionTranscript } from "../config/sessions/transcript.js"; export { resolveSessionKey } from "../config/sessions/session-key.js"; export { resolveGroupSessionKey } from "../config/sessions/group.js"; @@ -18,11 +44,8 @@ export { getSessionEntry, listSessionEntries, patchSessionEntry, - readSessionUpdatedAt, - recordSessionMetaFromInbound, - updateLastRoute, upsertSessionEntry, -} from "../config/sessions/store.js"; +}; export { evaluateSessionFreshness, resolveChannelResetConfig, @@ -30,4 +53,288 @@ export { resolveSessionResetType, resolveThreadFlag, } from "../config/sessions/reset.js"; -export type { SessionEntry, SessionScope } from "../config/sessions/types.js"; +export type { SessionEntry, SessionScope }; + +type SessionRowOptions = { + agentId: string; + env?: NodeJS.ProcessEnv; +}; + +type SaveSessionStoreOptions = { + skipMaintenance?: boolean; + activeSessionKey?: string; + allowDropAcpMetaSessionKeys?: string[]; + onWarn?: (warning: unknown) => void | Promise; + onMaintenanceApplied?: (report: unknown) => void | Promise; + maintenanceOverride?: unknown; + maintenanceConfig?: unknown; +}; + +type CompatSessionEntry = SessionEntry & { sessionFile?: string }; + +const SAFE_SESSION_ID_RE = /^[a-z0-9][a-z0-9._-]{0,127}$/i; + +function optionsWithEnv(agentId: string, env?: NodeJS.ProcessEnv): SessionRowOptions { + return env ? { agentId, env } : { agentId }; +} + +function parseSessionStorePath(storePath: string): { agentId: string; stateDir: string } | null { + const resolved = path.resolve(storePath); + if (path.basename(resolved) !== "sessions.json") { + return null; + } + const sessionsDir = path.dirname(resolved); + if (path.basename(sessionsDir) !== "sessions") { + return null; + } + const agentDir = path.dirname(sessionsDir); + const agentsDir = path.dirname(agentDir); + if (path.basename(agentsDir) !== "agents") { + return null; + } + const agentId = path.basename(agentDir); + if (!agentId) { + return null; + } + return { + agentId: normalizeAgentId(agentId), + stateDir: path.dirname(agentsDir), + }; +} + +function resolveSessionRowOptionsFromStorePath( + storePath: string, + fallback?: { agentId?: string; env?: NodeJS.ProcessEnv }, +): SessionRowOptions { + const parsed = parseSessionStorePath(storePath); + if (!parsed) { + if (fallback?.agentId) { + return optionsWithEnv(normalizeAgentId(fallback.agentId), fallback.env); + } + throw new Error( + "Custom sessions.json paths are not supported by the SQLite session-store compatibility API. Pass a canonical OpenClaw sessions path or use the row APIs with an explicit agentId.", + ); + } + return optionsWithEnv(parsed.agentId, { + ...process.env, + OPENCLAW_STATE_DIR: parsed.stateDir, + }); +} + +function resolveSessionRowOptions(params: { + agentId?: string; + sessionKey?: string; + storePath?: string; + env?: NodeJS.ProcessEnv; +}): SessionRowOptions { + if (params.storePath) { + const resolved = resolveSessionRowOptionsFromStorePath(params.storePath, params); + return params.env + ? optionsWithEnv(resolved.agentId, { + ...params.env, + OPENCLAW_STATE_DIR: resolved.env?.OPENCLAW_STATE_DIR, + }) + : resolved; + } + const agentId = + params.agentId ?? + (params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined) ?? + DEFAULT_AGENT_ID; + return optionsWithEnv(normalizeAgentId(agentId), params.env); +} + +export function clearSessionStoreCacheForTest(): void { + closeOpenClawAgentDatabasesForTest(); +} + +export function resolveSessionStoreEntry(params: { + store: Record; + sessionKey: string; +}): { + normalizedKey: string; + existing: SessionEntry | undefined; + legacyKeys: string[]; +} { + const resolved = resolveSessionRowEntry({ entries: params.store, sessionKey: params.sessionKey }); + return { ...resolved, legacyKeys: [] }; +} + +export function resolveStorePath( + store?: string, + opts?: { agentId?: string; env?: NodeJS.ProcessEnv }, +): string { + const agentId = normalizeAgentId(opts?.agentId ?? DEFAULT_AGENT_ID); + const env = opts?.env ?? process.env; + if (!store) { + return path.join(resolveStateDir(env), "agents", agentId, "sessions", "sessions.json"); + } + return path.resolve(resolveUserPath(store.replaceAll("{agentId}", agentId), env)); +} + +export function resolveSessionTranscriptPathInDir( + sessionId: string, + sessionsDir: string, + topicId?: string | number, +): string { + const trimmed = sessionId.trim(); + if (!SAFE_SESSION_ID_RE.test(trimmed)) { + throw new Error(`Invalid session ID: ${sessionId}`); + } + const safeTopicId = + typeof topicId === "string" + ? encodeURIComponent(topicId) + : typeof topicId === "number" + ? String(topicId) + : undefined; + const fileName = + safeTopicId === undefined ? `${trimmed}.jsonl` : `${trimmed}-topic-${safeTopicId}.jsonl`; + return path.resolve(sessionsDir, fileName); +} + +export function loadSessionStore(storePath: string): Record { + return loadSqliteSessionEntries(resolveSessionRowOptionsFromStorePath(storePath)); +} + +export async function saveSessionStore( + storePath: string, + store: Record, + _opts?: SaveSessionStoreOptions, +): Promise { + normalizeSessionEntries(store); + const options = resolveSessionRowOptionsFromStorePath(storePath); + const existing = loadSqliteSessionEntries(options); + for (const sessionKey of Object.keys(existing)) { + if (!Object.prototype.hasOwnProperty.call(store, sessionKey)) { + deleteSessionEntry({ ...options, sessionKey }); + } + } + for (const [sessionKey, entry] of Object.entries(store)) { + upsertSessionEntry({ ...options, sessionKey, entry }); + } +} + +export async function updateSessionStore( + storePath: string, + mutator: (store: Record) => Promise | T, + opts?: SaveSessionStoreOptions, +): Promise { + const store = loadSessionStore(storePath); + const result = await mutator(store); + await saveSessionStore(storePath, store, opts); + return result; +} + +export async function updateSessionStoreEntry(params: { + storePath: string; + sessionKey: string; + update: (entry: SessionEntry) => Promise | null>; +}): Promise { + const options = resolveSessionRowOptionsFromStorePath(params.storePath); + return await patchSessionEntry({ + ...options, + sessionKey: params.sessionKey, + update: params.update, + }); +} + +export async function resolveAndPersistSessionFile(params: { + sessionId: string; + sessionKey: string; + sessionStore: Record; + storePath: string; + sessionEntry?: CompatSessionEntry; + agentId?: string; + sessionsDir?: string; + fallbackSessionFile?: string; + activeSessionKey?: string; + maintenanceConfig?: unknown; +}): Promise<{ sessionFile: string; sessionEntry: CompatSessionEntry }> { + const now = Date.now(); + const baseEntry = params.sessionEntry ?? + params.sessionStore[params.sessionKey] ?? { + sessionId: params.sessionId, + updatedAt: now, + sessionStartedAt: now, + }; + const sessionFile = + params.fallbackSessionFile?.trim() ?? + resolveSessionTranscriptPathInDir( + params.sessionId, + params.sessionsDir ?? path.dirname(path.resolve(params.storePath)), + ); + const sessionEntry: CompatSessionEntry = { + ...baseEntry, + sessionId: params.sessionId, + sessionFile, + updatedAt: now, + sessionStartedAt: + baseEntry.sessionId === params.sessionId ? (baseEntry.sessionStartedAt ?? now) : now, + }; + params.sessionStore[params.sessionKey] = sessionEntry; + upsertSessionEntry({ + ...resolveSessionRowOptions({ + storePath: params.storePath, + agentId: params.agentId, + sessionKey: params.sessionKey, + }), + sessionKey: params.sessionKey, + entry: sessionEntry as SessionEntry, + }); + return { sessionFile, sessionEntry }; +} + +export function readSessionUpdatedAt(params: { + agentId?: string; + storePath?: string; + sessionKey: string; +}): number | undefined { + return readSqliteSessionUpdatedAt({ + ...resolveSessionRowOptions(params), + sessionKey: params.sessionKey, + }); +} + +export async function recordSessionMetaFromInbound(params: { + agentId?: string; + storePath?: string; + sessionKey: string; + ctx: MsgContext; + groupResolution?: import("../config/sessions/types.js").GroupKeyResolution | null; + createIfMissing?: boolean; +}): Promise { + return await recordSessionMetaFromInboundSqlite({ + ...resolveSessionRowOptions(params), + sessionKey: params.sessionKey, + ctx: params.ctx, + groupResolution: params.groupResolution, + createIfMissing: params.createIfMissing, + }); +} + +export async function updateLastRoute(params: { + agentId?: string; + env?: NodeJS.ProcessEnv; + storePath?: string; + sessionKey: string; + channel?: SessionEntry["channel"]; + to?: string; + accountId?: string; + threadId?: string | number; + deliveryContext?: import("../utils/delivery-context.types.js").DeliveryContext; + ctx?: MsgContext; + groupResolution?: import("../config/sessions/types.js").GroupKeyResolution | null; + createIfMissing?: boolean; +}): Promise { + return await updateLastRouteSqlite({ + ...resolveSessionRowOptions(params), + sessionKey: params.sessionKey, + channel: params.channel, + to: params.to, + accountId: params.accountId, + threadId: params.threadId, + deliveryContext: params.deliveryContext, + ctx: params.ctx, + groupResolution: params.groupResolution, + createIfMissing: params.createIfMissing, + }); +} diff --git a/src/plugin-sdk/sqlite-state-lock.test.ts b/src/plugin-sdk/sqlite-state-lock.test.ts index 2619972bddb..8341f2b8b75 100644 --- a/src/plugin-sdk/sqlite-state-lock.test.ts +++ b/src/plugin-sdk/sqlite-state-lock.test.ts @@ -70,4 +70,45 @@ describe("withOpenClawStateLock", () => { expect(row).toBeUndefined(); }); }); + + it("renews active leases so long critical sections are not stolen", async () => { + await withTempDir({ prefix: "openclaw-state-lock-renew-" }, async (dir) => { + const dbPath = path.join(dir, "state.sqlite"); + const order: string[] = []; + let releaseFirst!: () => void; + const firstCanFinish = new Promise((resolve) => { + releaseFirst = resolve; + }); + let first!: Promise; + const firstEntered = new Promise((resolve) => { + first = withOpenClawStateLock( + "shared", + { path: dbPath, stale: 30, retries: FAST_RETRY }, + async () => { + order.push("first-enter"); + resolve(); + await firstCanFinish; + order.push("first-exit"); + }, + ); + }); + + await firstEntered; + await new Promise((resolve) => setTimeout(resolve, 75)); + const second = withOpenClawStateLock( + "shared", + { path: dbPath, stale: 30, retries: FAST_RETRY }, + async () => { + order.push("second-enter"); + }, + ); + + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(order).toEqual(["first-enter"]); + + releaseFirst(); + await Promise.all([first, second]); + expect(order).toEqual(["first-enter", "first-exit", "second-enter"]); + }); + }); }); diff --git a/src/plugin-sdk/sqlite-state-lock.ts b/src/plugin-sdk/sqlite-state-lock.ts index 1cfc9008293..21e1b2c6f64 100644 --- a/src/plugin-sdk/sqlite-state-lock.ts +++ b/src/plugin-sdk/sqlite-state-lock.ts @@ -176,6 +176,46 @@ function releaseOpenClawStateLock(params: { }, params.options); } +function renewOpenClawStateLock(params: { + key: string; + owner: string; + scope: string; + staleMs: number; + options: OpenClawStateDatabaseOptions; +}): boolean { + const now = Date.now(); + const expiresAt = now + params.staleMs; + return runOpenClawStateWriteTransaction((database) => { + const db = getNodeSqliteKysely(database.db); + const row = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("state_leases") + .select(["owner", "expires_at", "payload_json"]) + .where("scope", "=", params.scope) + .where("lease_key", "=", params.key), + ); + const current = parseLockValue(row); + if (current?.owner !== params.owner) { + return false; + } + executeSqliteQuerySync( + database.db, + db + .updateTable("state_leases") + .set({ + expires_at: expiresAt, + heartbeat_at: now, + payload_json: JSON.stringify({ owner: params.owner, expiresAt }), + updated_at: now, + }) + .where("scope", "=", params.scope) + .where("lease_key", "=", params.key), + ); + return true; + }, params.options); +} + export async function withOpenClawStateLock( key: string, options: OpenClawStateLockOptions, @@ -200,9 +240,21 @@ export async function withOpenClawStateLock( options: databaseOptions, }) ) { + const renewEveryMs = Math.max(1, Math.floor(staleMs / 2)); + const renewal = setInterval(() => { + renewOpenClawStateLock({ + key, + owner, + scope, + staleMs, + options: databaseOptions, + }); + }, renewEveryMs); + renewal.unref?.(); try { return await task(); } finally { + clearInterval(renewal); releaseOpenClawStateLock({ key, owner, scope, options: databaseOptions }); } } diff --git a/src/state/openclaw-state-lock.test.ts b/src/state/openclaw-state-lock.test.ts new file mode 100644 index 00000000000..d5d9cb7514c --- /dev/null +++ b/src/state/openclaw-state-lock.test.ts @@ -0,0 +1,60 @@ +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { withTempDir } from "../test-helpers/temp-dir.js"; +import { closeOpenClawStateDatabaseForTest } from "./openclaw-state-db.js"; +import { withOpenClawStateLock } from "./openclaw-state-lock.js"; + +const FAST_RETRY = { + retries: 100, + factor: 1, + minTimeout: 1, + maxTimeout: 1, + randomize: false, +} as const; + +describe("withOpenClawStateLock", () => { + afterEach(() => { + closeOpenClawStateDatabaseForTest(); + }); + + it("renews active leases so long critical sections are not stolen", async () => { + await withTempDir({ prefix: "openclaw-core-state-lock-renew-" }, async (dir) => { + const dbPath = path.join(dir, "state.sqlite"); + const order: string[] = []; + let releaseFirst!: () => void; + const firstCanFinish = new Promise((resolve) => { + releaseFirst = resolve; + }); + let first!: Promise; + const firstEntered = new Promise((resolve) => { + first = withOpenClawStateLock( + "shared", + { path: dbPath, stale: 30, retries: FAST_RETRY }, + async () => { + order.push("first-enter"); + resolve(); + await firstCanFinish; + order.push("first-exit"); + }, + ); + }); + + await firstEntered; + await new Promise((resolve) => setTimeout(resolve, 75)); + const second = withOpenClawStateLock( + "shared", + { path: dbPath, stale: 30, retries: FAST_RETRY }, + async () => { + order.push("second-enter"); + }, + ); + + await new Promise((resolve) => setTimeout(resolve, 20)); + expect(order).toEqual(["first-enter"]); + + releaseFirst(); + await Promise.all([first, second]); + expect(order).toEqual(["first-enter", "first-exit", "second-enter"]); + }); + }); +}); diff --git a/src/state/openclaw-state-lock.ts b/src/state/openclaw-state-lock.ts index 74958ca81ed..3c17eb5c984 100644 --- a/src/state/openclaw-state-lock.ts +++ b/src/state/openclaw-state-lock.ts @@ -176,6 +176,45 @@ function releaseOpenClawStateLock(params: { }, params.options); } +function renewOpenClawStateLock(params: { + key: string; + owner: string; + scope: string; + staleMs: number; + options: OpenClawStateDatabaseOptions; +}): boolean { + const now = Date.now(); + const expiresAt = now + params.staleMs; + return runOpenClawStateWriteTransaction((database) => { + const db = getNodeSqliteKysely(database.db); + const row = executeSqliteQueryTakeFirstSync( + database.db, + db + .selectFrom("state_leases") + .select(["owner", "expires_at"]) + .where("scope", "=", params.scope) + .where("lease_key", "=", params.key), + ); + const current = parseLockValue(row); + if (current?.owner !== params.owner) { + return false; + } + executeSqliteQuerySync( + database.db, + db + .updateTable("state_leases") + .set({ + expires_at: expiresAt, + heartbeat_at: now, + updated_at: now, + }) + .where("scope", "=", params.scope) + .where("lease_key", "=", params.key), + ); + return true; + }, params.options); +} + export async function withOpenClawStateLock( key: string, options: OpenClawStateLockOptions, @@ -200,9 +239,21 @@ export async function withOpenClawStateLock( options: databaseOptions, }) ) { + const renewEveryMs = Math.max(1, Math.floor(staleMs / 2)); + const renewal = setInterval(() => { + renewOpenClawStateLock({ + key, + owner, + scope, + staleMs, + options: databaseOptions, + }); + }, renewEveryMs); + renewal.unref?.(); try { return await task(); } finally { + clearInterval(renewal); releaseOpenClawStateLock({ key, owner, scope, options: databaseOptions }); } }