mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 02:50:35 +00:00
fix(plugin-sdk): restore sqlite compatibility shims
This commit is contained in:
@@ -877,6 +877,10 @@
|
||||
"types": "./dist/plugin-sdk/channel-pairing.d.ts",
|
||||
"default": "./dist/plugin-sdk/channel-pairing.js"
|
||||
},
|
||||
"./plugin-sdk/channel-pairing-paths": {
|
||||
"types": "./dist/plugin-sdk/channel-pairing-paths.d.ts",
|
||||
"default": "./dist/plugin-sdk/channel-pairing-paths.js"
|
||||
},
|
||||
"./plugin-sdk/channel-policy": {
|
||||
"types": "./dist/plugin-sdk/channel-policy.d.ts",
|
||||
"default": "./dist/plugin-sdk/channel-policy.js"
|
||||
@@ -897,6 +901,10 @@
|
||||
"types": "./dist/plugin-sdk/context-visibility-runtime.d.ts",
|
||||
"default": "./dist/plugin-sdk/context-visibility-runtime.js"
|
||||
},
|
||||
"./plugin-sdk/file-lock": {
|
||||
"types": "./dist/plugin-sdk/file-lock.d.ts",
|
||||
"default": "./dist/plugin-sdk/file-lock.js"
|
||||
},
|
||||
"./plugin-sdk/fetch-runtime": {
|
||||
"types": "./dist/plugin-sdk/fetch-runtime.d.ts",
|
||||
"default": "./dist/plugin-sdk/fetch-runtime.js"
|
||||
|
||||
@@ -200,11 +200,13 @@
|
||||
"channel-message-runtime",
|
||||
"channel-outbound",
|
||||
"channel-pairing",
|
||||
"channel-pairing-paths",
|
||||
"channel-policy",
|
||||
"channel-send-result",
|
||||
"channel-route",
|
||||
"channel-targets",
|
||||
"context-visibility-runtime",
|
||||
"file-lock",
|
||||
"fetch-runtime",
|
||||
"runtime-fetch",
|
||||
"response-limit-runtime",
|
||||
|
||||
18
src/plugin-sdk/channel-pairing-paths.ts
Normal file
18
src/plugin-sdk/channel-pairing-paths.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import path from "node:path";
|
||||
import { resolveOAuthDir, resolveStateDir } from "../config/paths.js";
|
||||
import { safeAccountKey, safeChannelKey } from "../pairing/pairing-store-keys.js";
|
||||
import type { PairingChannel } from "../pairing/pairing-store.types.js";
|
||||
|
||||
export function resolveChannelAllowFromPath(
|
||||
channel: PairingChannel,
|
||||
env: NodeJS.ProcessEnv = process.env,
|
||||
accountId?: string,
|
||||
): string {
|
||||
const stateDir = resolveStateDir(env);
|
||||
const base = safeChannelKey(channel);
|
||||
const accountKey = accountId?.trim() ? safeAccountKey(accountId) : null;
|
||||
return path.join(
|
||||
resolveOAuthDir(env, stateDir),
|
||||
accountKey ? `${base}-${accountKey}-allowFrom.json` : `${base}-allowFrom.json`,
|
||||
);
|
||||
}
|
||||
115
src/plugin-sdk/file-lock.ts
Normal file
115
src/plugin-sdk/file-lock.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import "../infra/fs-safe-defaults.js";
|
||||
import {
|
||||
acquireFileLock as acquireFsSafeFileLock,
|
||||
drainFileLockManagerForTest,
|
||||
resetFileLockManagerForTest,
|
||||
} from "@openclaw/fs-safe/file-lock";
|
||||
import { shouldRemoveDeadOwnerOrExpiredLock } from "../infra/stale-lock-file.js";
|
||||
|
||||
export type FileLockOptions = {
|
||||
retries: {
|
||||
retries: number;
|
||||
factor: number;
|
||||
minTimeout: number;
|
||||
maxTimeout: number;
|
||||
randomize?: boolean;
|
||||
};
|
||||
stale: number;
|
||||
};
|
||||
|
||||
export type FileLockHandle = {
|
||||
lockPath: string;
|
||||
release: () => Promise<void>;
|
||||
};
|
||||
|
||||
export const FILE_LOCK_TIMEOUT_ERROR_CODE = "file_lock_timeout";
|
||||
export const FILE_LOCK_STALE_ERROR_CODE = "file_lock_stale";
|
||||
|
||||
export type FileLockTimeoutError = Error & {
|
||||
code: typeof FILE_LOCK_TIMEOUT_ERROR_CODE;
|
||||
lockPath: string;
|
||||
};
|
||||
|
||||
export type FileLockStaleError = Error & {
|
||||
code: typeof FILE_LOCK_STALE_ERROR_CODE;
|
||||
lockPath: string;
|
||||
};
|
||||
|
||||
const FILE_LOCK_MANAGER_KEY = "openclaw.plugin-sdk.file-lock";
|
||||
|
||||
async function shouldReclaimPluginLock(params: {
|
||||
lockPath: string;
|
||||
payload: Record<string, unknown> | null;
|
||||
staleMs: number;
|
||||
nowMs: number;
|
||||
}): Promise<boolean> {
|
||||
return shouldRemoveDeadOwnerOrExpiredLock({
|
||||
payload: params.payload,
|
||||
staleMs: params.staleMs,
|
||||
nowMs: params.nowMs,
|
||||
});
|
||||
}
|
||||
|
||||
function normalizeLockError(err: unknown): never {
|
||||
if ((err as { code?: unknown }).code === FILE_LOCK_TIMEOUT_ERROR_CODE) {
|
||||
throw Object.assign(new Error((err as Error).message), {
|
||||
code: FILE_LOCK_TIMEOUT_ERROR_CODE,
|
||||
lockPath: (err as { lockPath?: string }).lockPath ?? "",
|
||||
}) as FileLockTimeoutError;
|
||||
}
|
||||
if ((err as { code?: unknown }).code === FILE_LOCK_STALE_ERROR_CODE) {
|
||||
throw Object.assign(new Error((err as Error).message), {
|
||||
code: FILE_LOCK_STALE_ERROR_CODE,
|
||||
lockPath: (err as { lockPath?: string }).lockPath ?? "",
|
||||
}) as FileLockStaleError;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
|
||||
export function resetFileLockStateForTest(): void {
|
||||
resetFileLockManagerForTest(FILE_LOCK_MANAGER_KEY, FILE_LOCK_MANAGER_KEY);
|
||||
}
|
||||
|
||||
export async function drainFileLockStateForTest(): Promise<void> {
|
||||
await drainFileLockManagerForTest(FILE_LOCK_MANAGER_KEY, FILE_LOCK_MANAGER_KEY);
|
||||
}
|
||||
|
||||
/** Acquire a re-entrant process-local file lock backed by a `.lock` sidecar file. */
|
||||
export async function acquireFileLock(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
): Promise<FileLockHandle> {
|
||||
try {
|
||||
const lock = await acquireFsSafeFileLock(filePath, {
|
||||
managerKey: FILE_LOCK_MANAGER_KEY,
|
||||
staleMs: options.stale,
|
||||
retry: options.retries,
|
||||
staleRecovery: "remove-if-unchanged",
|
||||
allowReentrant: true,
|
||||
payload: () => ({ pid: process.pid, createdAt: new Date().toISOString() }),
|
||||
shouldReclaim: shouldReclaimPluginLock,
|
||||
shouldRemoveStaleLock: (snapshot) =>
|
||||
shouldRemoveDeadOwnerOrExpiredLock({
|
||||
payload: snapshot.payload,
|
||||
staleMs: options.stale,
|
||||
}),
|
||||
});
|
||||
return { lockPath: lock.lockPath, release: lock.release };
|
||||
} catch (err) {
|
||||
return normalizeLockError(err);
|
||||
}
|
||||
}
|
||||
|
||||
/** Run an async callback while holding a file lock, always releasing the lock afterward. */
|
||||
export async function withFileLock<T>(
|
||||
filePath: string,
|
||||
options: FileLockOptions,
|
||||
fn: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const lock = await acquireFileLock(filePath, options);
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
await lock.release();
|
||||
}
|
||||
}
|
||||
45
src/plugin-sdk/session-store-runtime.test.ts
Normal file
45
src/plugin-sdk/session-store-runtime.test.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
readSessionUpdatedAt,
|
||||
upsertSessionEntry,
|
||||
} from "./session-store-runtime.js";
|
||||
|
||||
describe("session-store-runtime compatibility", () => {
|
||||
it("rejects custom store paths instead of falling back to the default agent", async () => {
|
||||
await withOpenClawTestState(
|
||||
{
|
||||
layout: "state-only",
|
||||
prefix: "openclaw-session-store-compat-",
|
||||
scenario: "minimal",
|
||||
},
|
||||
async (state) => {
|
||||
const customStorePath = path.join(state.path("custom"), "sessions.json");
|
||||
await fs.mkdir(path.dirname(customStorePath), { recursive: true });
|
||||
|
||||
upsertSessionEntry({
|
||||
agentId: "default",
|
||||
env: { ...process.env, OPENCLAW_STATE_DIR: state.stateDir },
|
||||
sessionKey: "discord:default:user:1",
|
||||
entry: {
|
||||
sessionId: "default-session",
|
||||
updatedAt: 123,
|
||||
sessionStartedAt: 123,
|
||||
},
|
||||
});
|
||||
|
||||
expect(() => loadSessionStore(customStorePath)).toThrow(/Custom sessions\.json paths/);
|
||||
expect(
|
||||
readSessionUpdatedAt({
|
||||
agentId: "default",
|
||||
sessionKey: "discord:default:user:1",
|
||||
storePath: customStorePath,
|
||||
}),
|
||||
).toBe(123);
|
||||
},
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -1,8 +1,34 @@
|
||||
// Narrow SQLite session row helpers for channel hot paths.
|
||||
// SQLite session row helpers plus deprecated session-store compatibility shims.
|
||||
|
||||
export { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js";
|
||||
export { resolveSessionRowEntry } from "../config/sessions/store-entry.js";
|
||||
export { resolveAndPersistSessionTranscriptScope } from "../config/sessions/session-scope.js";
|
||||
import path from "node:path";
|
||||
import type { MsgContext } from "../auto-reply/templating.js";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { loadSqliteSessionEntries } from "../config/sessions/session-entries.sqlite.js";
|
||||
import { normalizeSessionEntries } from "../config/sessions/session-entry-normalize.js";
|
||||
import { resolveAndPersistSessionTranscriptScope } from "../config/sessions/session-scope.js";
|
||||
import { resolveSessionRowEntry } from "../config/sessions/store-entry.js";
|
||||
import {
|
||||
deleteSessionEntry,
|
||||
getSessionEntry,
|
||||
listSessionEntries,
|
||||
patchSessionEntry,
|
||||
readSessionUpdatedAt as readSqliteSessionUpdatedAt,
|
||||
recordSessionMetaFromInbound as recordSessionMetaFromInboundSqlite,
|
||||
updateLastRoute as updateLastRouteSqlite,
|
||||
upsertSessionEntry,
|
||||
} from "../config/sessions/store.js";
|
||||
import type { SessionEntry, SessionScope } from "../config/sessions/types.js";
|
||||
import {
|
||||
DEFAULT_AGENT_ID,
|
||||
normalizeAgentId,
|
||||
resolveAgentIdFromSessionKey,
|
||||
} from "../routing/session-key.js";
|
||||
import { closeOpenClawAgentDatabasesForTest } from "../state/openclaw-agent-db.js";
|
||||
import { resolveUserPath } from "../utils.js";
|
||||
|
||||
export { closeOpenClawAgentDatabasesForTest };
|
||||
export { resolveSessionRowEntry };
|
||||
export { resolveAndPersistSessionTranscriptScope };
|
||||
export { readLatestAssistantTextFromSessionTranscript } from "../config/sessions/transcript.js";
|
||||
export { resolveSessionKey } from "../config/sessions/session-key.js";
|
||||
export { resolveGroupSessionKey } from "../config/sessions/group.js";
|
||||
@@ -18,11 +44,8 @@ export {
|
||||
getSessionEntry,
|
||||
listSessionEntries,
|
||||
patchSessionEntry,
|
||||
readSessionUpdatedAt,
|
||||
recordSessionMetaFromInbound,
|
||||
updateLastRoute,
|
||||
upsertSessionEntry,
|
||||
} from "../config/sessions/store.js";
|
||||
};
|
||||
export {
|
||||
evaluateSessionFreshness,
|
||||
resolveChannelResetConfig,
|
||||
@@ -30,4 +53,288 @@ export {
|
||||
resolveSessionResetType,
|
||||
resolveThreadFlag,
|
||||
} from "../config/sessions/reset.js";
|
||||
export type { SessionEntry, SessionScope } from "../config/sessions/types.js";
|
||||
export type { SessionEntry, SessionScope };
|
||||
|
||||
type SessionRowOptions = {
|
||||
agentId: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
};
|
||||
|
||||
type SaveSessionStoreOptions = {
|
||||
skipMaintenance?: boolean;
|
||||
activeSessionKey?: string;
|
||||
allowDropAcpMetaSessionKeys?: string[];
|
||||
onWarn?: (warning: unknown) => void | Promise<void>;
|
||||
onMaintenanceApplied?: (report: unknown) => void | Promise<void>;
|
||||
maintenanceOverride?: unknown;
|
||||
maintenanceConfig?: unknown;
|
||||
};
|
||||
|
||||
type CompatSessionEntry = SessionEntry & { sessionFile?: string };
|
||||
|
||||
const SAFE_SESSION_ID_RE = /^[a-z0-9][a-z0-9._-]{0,127}$/i;
|
||||
|
||||
function optionsWithEnv(agentId: string, env?: NodeJS.ProcessEnv): SessionRowOptions {
|
||||
return env ? { agentId, env } : { agentId };
|
||||
}
|
||||
|
||||
function parseSessionStorePath(storePath: string): { agentId: string; stateDir: string } | null {
|
||||
const resolved = path.resolve(storePath);
|
||||
if (path.basename(resolved) !== "sessions.json") {
|
||||
return null;
|
||||
}
|
||||
const sessionsDir = path.dirname(resolved);
|
||||
if (path.basename(sessionsDir) !== "sessions") {
|
||||
return null;
|
||||
}
|
||||
const agentDir = path.dirname(sessionsDir);
|
||||
const agentsDir = path.dirname(agentDir);
|
||||
if (path.basename(agentsDir) !== "agents") {
|
||||
return null;
|
||||
}
|
||||
const agentId = path.basename(agentDir);
|
||||
if (!agentId) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
agentId: normalizeAgentId(agentId),
|
||||
stateDir: path.dirname(agentsDir),
|
||||
};
|
||||
}
|
||||
|
||||
function resolveSessionRowOptionsFromStorePath(
|
||||
storePath: string,
|
||||
fallback?: { agentId?: string; env?: NodeJS.ProcessEnv },
|
||||
): SessionRowOptions {
|
||||
const parsed = parseSessionStorePath(storePath);
|
||||
if (!parsed) {
|
||||
if (fallback?.agentId) {
|
||||
return optionsWithEnv(normalizeAgentId(fallback.agentId), fallback.env);
|
||||
}
|
||||
throw new Error(
|
||||
"Custom sessions.json paths are not supported by the SQLite session-store compatibility API. Pass a canonical OpenClaw sessions path or use the row APIs with an explicit agentId.",
|
||||
);
|
||||
}
|
||||
return optionsWithEnv(parsed.agentId, {
|
||||
...process.env,
|
||||
OPENCLAW_STATE_DIR: parsed.stateDir,
|
||||
});
|
||||
}
|
||||
|
||||
function resolveSessionRowOptions(params: {
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
storePath?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}): SessionRowOptions {
|
||||
if (params.storePath) {
|
||||
const resolved = resolveSessionRowOptionsFromStorePath(params.storePath, params);
|
||||
return params.env
|
||||
? optionsWithEnv(resolved.agentId, {
|
||||
...params.env,
|
||||
OPENCLAW_STATE_DIR: resolved.env?.OPENCLAW_STATE_DIR,
|
||||
})
|
||||
: resolved;
|
||||
}
|
||||
const agentId =
|
||||
params.agentId ??
|
||||
(params.sessionKey ? resolveAgentIdFromSessionKey(params.sessionKey) : undefined) ??
|
||||
DEFAULT_AGENT_ID;
|
||||
return optionsWithEnv(normalizeAgentId(agentId), params.env);
|
||||
}
|
||||
|
||||
export function clearSessionStoreCacheForTest(): void {
|
||||
closeOpenClawAgentDatabasesForTest();
|
||||
}
|
||||
|
||||
export function resolveSessionStoreEntry(params: {
|
||||
store: Record<string, SessionEntry>;
|
||||
sessionKey: string;
|
||||
}): {
|
||||
normalizedKey: string;
|
||||
existing: SessionEntry | undefined;
|
||||
legacyKeys: string[];
|
||||
} {
|
||||
const resolved = resolveSessionRowEntry({ entries: params.store, sessionKey: params.sessionKey });
|
||||
return { ...resolved, legacyKeys: [] };
|
||||
}
|
||||
|
||||
export function resolveStorePath(
|
||||
store?: string,
|
||||
opts?: { agentId?: string; env?: NodeJS.ProcessEnv },
|
||||
): string {
|
||||
const agentId = normalizeAgentId(opts?.agentId ?? DEFAULT_AGENT_ID);
|
||||
const env = opts?.env ?? process.env;
|
||||
if (!store) {
|
||||
return path.join(resolveStateDir(env), "agents", agentId, "sessions", "sessions.json");
|
||||
}
|
||||
return path.resolve(resolveUserPath(store.replaceAll("{agentId}", agentId), env));
|
||||
}
|
||||
|
||||
export function resolveSessionTranscriptPathInDir(
|
||||
sessionId: string,
|
||||
sessionsDir: string,
|
||||
topicId?: string | number,
|
||||
): string {
|
||||
const trimmed = sessionId.trim();
|
||||
if (!SAFE_SESSION_ID_RE.test(trimmed)) {
|
||||
throw new Error(`Invalid session ID: ${sessionId}`);
|
||||
}
|
||||
const safeTopicId =
|
||||
typeof topicId === "string"
|
||||
? encodeURIComponent(topicId)
|
||||
: typeof topicId === "number"
|
||||
? String(topicId)
|
||||
: undefined;
|
||||
const fileName =
|
||||
safeTopicId === undefined ? `${trimmed}.jsonl` : `${trimmed}-topic-${safeTopicId}.jsonl`;
|
||||
return path.resolve(sessionsDir, fileName);
|
||||
}
|
||||
|
||||
export function loadSessionStore(storePath: string): Record<string, SessionEntry> {
|
||||
return loadSqliteSessionEntries(resolveSessionRowOptionsFromStorePath(storePath));
|
||||
}
|
||||
|
||||
export async function saveSessionStore(
|
||||
storePath: string,
|
||||
store: Record<string, SessionEntry>,
|
||||
_opts?: SaveSessionStoreOptions,
|
||||
): Promise<void> {
|
||||
normalizeSessionEntries(store);
|
||||
const options = resolveSessionRowOptionsFromStorePath(storePath);
|
||||
const existing = loadSqliteSessionEntries(options);
|
||||
for (const sessionKey of Object.keys(existing)) {
|
||||
if (!Object.prototype.hasOwnProperty.call(store, sessionKey)) {
|
||||
deleteSessionEntry({ ...options, sessionKey });
|
||||
}
|
||||
}
|
||||
for (const [sessionKey, entry] of Object.entries(store)) {
|
||||
upsertSessionEntry({ ...options, sessionKey, entry });
|
||||
}
|
||||
}
|
||||
|
||||
export async function updateSessionStore<T>(
|
||||
storePath: string,
|
||||
mutator: (store: Record<string, SessionEntry>) => Promise<T> | T,
|
||||
opts?: SaveSessionStoreOptions,
|
||||
): Promise<T> {
|
||||
const store = loadSessionStore(storePath);
|
||||
const result = await mutator(store);
|
||||
await saveSessionStore(storePath, store, opts);
|
||||
return result;
|
||||
}
|
||||
|
||||
export async function updateSessionStoreEntry(params: {
|
||||
storePath: string;
|
||||
sessionKey: string;
|
||||
update: (entry: SessionEntry) => Promise<Partial<SessionEntry> | null>;
|
||||
}): Promise<SessionEntry | null> {
|
||||
const options = resolveSessionRowOptionsFromStorePath(params.storePath);
|
||||
return await patchSessionEntry({
|
||||
...options,
|
||||
sessionKey: params.sessionKey,
|
||||
update: params.update,
|
||||
});
|
||||
}
|
||||
|
||||
export async function resolveAndPersistSessionFile(params: {
|
||||
sessionId: string;
|
||||
sessionKey: string;
|
||||
sessionStore: Record<string, CompatSessionEntry>;
|
||||
storePath: string;
|
||||
sessionEntry?: CompatSessionEntry;
|
||||
agentId?: string;
|
||||
sessionsDir?: string;
|
||||
fallbackSessionFile?: string;
|
||||
activeSessionKey?: string;
|
||||
maintenanceConfig?: unknown;
|
||||
}): Promise<{ sessionFile: string; sessionEntry: CompatSessionEntry }> {
|
||||
const now = Date.now();
|
||||
const baseEntry = params.sessionEntry ??
|
||||
params.sessionStore[params.sessionKey] ?? {
|
||||
sessionId: params.sessionId,
|
||||
updatedAt: now,
|
||||
sessionStartedAt: now,
|
||||
};
|
||||
const sessionFile =
|
||||
params.fallbackSessionFile?.trim() ??
|
||||
resolveSessionTranscriptPathInDir(
|
||||
params.sessionId,
|
||||
params.sessionsDir ?? path.dirname(path.resolve(params.storePath)),
|
||||
);
|
||||
const sessionEntry: CompatSessionEntry = {
|
||||
...baseEntry,
|
||||
sessionId: params.sessionId,
|
||||
sessionFile,
|
||||
updatedAt: now,
|
||||
sessionStartedAt:
|
||||
baseEntry.sessionId === params.sessionId ? (baseEntry.sessionStartedAt ?? now) : now,
|
||||
};
|
||||
params.sessionStore[params.sessionKey] = sessionEntry;
|
||||
upsertSessionEntry({
|
||||
...resolveSessionRowOptions({
|
||||
storePath: params.storePath,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
}),
|
||||
sessionKey: params.sessionKey,
|
||||
entry: sessionEntry as SessionEntry,
|
||||
});
|
||||
return { sessionFile, sessionEntry };
|
||||
}
|
||||
|
||||
export function readSessionUpdatedAt(params: {
|
||||
agentId?: string;
|
||||
storePath?: string;
|
||||
sessionKey: string;
|
||||
}): number | undefined {
|
||||
return readSqliteSessionUpdatedAt({
|
||||
...resolveSessionRowOptions(params),
|
||||
sessionKey: params.sessionKey,
|
||||
});
|
||||
}
|
||||
|
||||
export async function recordSessionMetaFromInbound(params: {
|
||||
agentId?: string;
|
||||
storePath?: string;
|
||||
sessionKey: string;
|
||||
ctx: MsgContext;
|
||||
groupResolution?: import("../config/sessions/types.js").GroupKeyResolution | null;
|
||||
createIfMissing?: boolean;
|
||||
}): Promise<SessionEntry | null> {
|
||||
return await recordSessionMetaFromInboundSqlite({
|
||||
...resolveSessionRowOptions(params),
|
||||
sessionKey: params.sessionKey,
|
||||
ctx: params.ctx,
|
||||
groupResolution: params.groupResolution,
|
||||
createIfMissing: params.createIfMissing,
|
||||
});
|
||||
}
|
||||
|
||||
export async function updateLastRoute(params: {
|
||||
agentId?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
storePath?: string;
|
||||
sessionKey: string;
|
||||
channel?: SessionEntry["channel"];
|
||||
to?: string;
|
||||
accountId?: string;
|
||||
threadId?: string | number;
|
||||
deliveryContext?: import("../utils/delivery-context.types.js").DeliveryContext;
|
||||
ctx?: MsgContext;
|
||||
groupResolution?: import("../config/sessions/types.js").GroupKeyResolution | null;
|
||||
createIfMissing?: boolean;
|
||||
}): Promise<SessionEntry | null> {
|
||||
return await updateLastRouteSqlite({
|
||||
...resolveSessionRowOptions(params),
|
||||
sessionKey: params.sessionKey,
|
||||
channel: params.channel,
|
||||
to: params.to,
|
||||
accountId: params.accountId,
|
||||
threadId: params.threadId,
|
||||
deliveryContext: params.deliveryContext,
|
||||
ctx: params.ctx,
|
||||
groupResolution: params.groupResolution,
|
||||
createIfMissing: params.createIfMissing,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -70,4 +70,45 @@ describe("withOpenClawStateLock", () => {
|
||||
expect(row).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
it("renews active leases so long critical sections are not stolen", async () => {
|
||||
await withTempDir({ prefix: "openclaw-state-lock-renew-" }, async (dir) => {
|
||||
const dbPath = path.join(dir, "state.sqlite");
|
||||
const order: string[] = [];
|
||||
let releaseFirst!: () => void;
|
||||
const firstCanFinish = new Promise<void>((resolve) => {
|
||||
releaseFirst = resolve;
|
||||
});
|
||||
let first!: Promise<void>;
|
||||
const firstEntered = new Promise<void>((resolve) => {
|
||||
first = withOpenClawStateLock(
|
||||
"shared",
|
||||
{ path: dbPath, stale: 30, retries: FAST_RETRY },
|
||||
async () => {
|
||||
order.push("first-enter");
|
||||
resolve();
|
||||
await firstCanFinish;
|
||||
order.push("first-exit");
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
await firstEntered;
|
||||
await new Promise((resolve) => setTimeout(resolve, 75));
|
||||
const second = withOpenClawStateLock(
|
||||
"shared",
|
||||
{ path: dbPath, stale: 30, retries: FAST_RETRY },
|
||||
async () => {
|
||||
order.push("second-enter");
|
||||
},
|
||||
);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
expect(order).toEqual(["first-enter"]);
|
||||
|
||||
releaseFirst();
|
||||
await Promise.all([first, second]);
|
||||
expect(order).toEqual(["first-enter", "first-exit", "second-enter"]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -176,6 +176,46 @@ function releaseOpenClawStateLock(params: {
|
||||
}, params.options);
|
||||
}
|
||||
|
||||
function renewOpenClawStateLock(params: {
|
||||
key: string;
|
||||
owner: string;
|
||||
scope: string;
|
||||
staleMs: number;
|
||||
options: OpenClawStateDatabaseOptions;
|
||||
}): boolean {
|
||||
const now = Date.now();
|
||||
const expiresAt = now + params.staleMs;
|
||||
return runOpenClawStateWriteTransaction((database) => {
|
||||
const db = getNodeSqliteKysely<LockDatabase>(database.db);
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
db
|
||||
.selectFrom("state_leases")
|
||||
.select(["owner", "expires_at", "payload_json"])
|
||||
.where("scope", "=", params.scope)
|
||||
.where("lease_key", "=", params.key),
|
||||
);
|
||||
const current = parseLockValue(row);
|
||||
if (current?.owner !== params.owner) {
|
||||
return false;
|
||||
}
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
db
|
||||
.updateTable("state_leases")
|
||||
.set({
|
||||
expires_at: expiresAt,
|
||||
heartbeat_at: now,
|
||||
payload_json: JSON.stringify({ owner: params.owner, expiresAt }),
|
||||
updated_at: now,
|
||||
})
|
||||
.where("scope", "=", params.scope)
|
||||
.where("lease_key", "=", params.key),
|
||||
);
|
||||
return true;
|
||||
}, params.options);
|
||||
}
|
||||
|
||||
export async function withOpenClawStateLock<T>(
|
||||
key: string,
|
||||
options: OpenClawStateLockOptions,
|
||||
@@ -200,9 +240,21 @@ export async function withOpenClawStateLock<T>(
|
||||
options: databaseOptions,
|
||||
})
|
||||
) {
|
||||
const renewEveryMs = Math.max(1, Math.floor(staleMs / 2));
|
||||
const renewal = setInterval(() => {
|
||||
renewOpenClawStateLock({
|
||||
key,
|
||||
owner,
|
||||
scope,
|
||||
staleMs,
|
||||
options: databaseOptions,
|
||||
});
|
||||
}, renewEveryMs);
|
||||
renewal.unref?.();
|
||||
try {
|
||||
return await task();
|
||||
} finally {
|
||||
clearInterval(renewal);
|
||||
releaseOpenClawStateLock({ key, owner, scope, options: databaseOptions });
|
||||
}
|
||||
}
|
||||
|
||||
60
src/state/openclaw-state-lock.test.ts
Normal file
60
src/state/openclaw-state-lock.test.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import path from "node:path";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { withTempDir } from "../test-helpers/temp-dir.js";
|
||||
import { closeOpenClawStateDatabaseForTest } from "./openclaw-state-db.js";
|
||||
import { withOpenClawStateLock } from "./openclaw-state-lock.js";
|
||||
|
||||
const FAST_RETRY = {
|
||||
retries: 100,
|
||||
factor: 1,
|
||||
minTimeout: 1,
|
||||
maxTimeout: 1,
|
||||
randomize: false,
|
||||
} as const;
|
||||
|
||||
describe("withOpenClawStateLock", () => {
|
||||
afterEach(() => {
|
||||
closeOpenClawStateDatabaseForTest();
|
||||
});
|
||||
|
||||
it("renews active leases so long critical sections are not stolen", async () => {
|
||||
await withTempDir({ prefix: "openclaw-core-state-lock-renew-" }, async (dir) => {
|
||||
const dbPath = path.join(dir, "state.sqlite");
|
||||
const order: string[] = [];
|
||||
let releaseFirst!: () => void;
|
||||
const firstCanFinish = new Promise<void>((resolve) => {
|
||||
releaseFirst = resolve;
|
||||
});
|
||||
let first!: Promise<void>;
|
||||
const firstEntered = new Promise<void>((resolve) => {
|
||||
first = withOpenClawStateLock(
|
||||
"shared",
|
||||
{ path: dbPath, stale: 30, retries: FAST_RETRY },
|
||||
async () => {
|
||||
order.push("first-enter");
|
||||
resolve();
|
||||
await firstCanFinish;
|
||||
order.push("first-exit");
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
await firstEntered;
|
||||
await new Promise((resolve) => setTimeout(resolve, 75));
|
||||
const second = withOpenClawStateLock(
|
||||
"shared",
|
||||
{ path: dbPath, stale: 30, retries: FAST_RETRY },
|
||||
async () => {
|
||||
order.push("second-enter");
|
||||
},
|
||||
);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
expect(order).toEqual(["first-enter"]);
|
||||
|
||||
releaseFirst();
|
||||
await Promise.all([first, second]);
|
||||
expect(order).toEqual(["first-enter", "first-exit", "second-enter"]);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -176,6 +176,45 @@ function releaseOpenClawStateLock(params: {
|
||||
}, params.options);
|
||||
}
|
||||
|
||||
function renewOpenClawStateLock(params: {
|
||||
key: string;
|
||||
owner: string;
|
||||
scope: string;
|
||||
staleMs: number;
|
||||
options: OpenClawStateDatabaseOptions;
|
||||
}): boolean {
|
||||
const now = Date.now();
|
||||
const expiresAt = now + params.staleMs;
|
||||
return runOpenClawStateWriteTransaction((database) => {
|
||||
const db = getNodeSqliteKysely<LockDatabase>(database.db);
|
||||
const row = executeSqliteQueryTakeFirstSync(
|
||||
database.db,
|
||||
db
|
||||
.selectFrom("state_leases")
|
||||
.select(["owner", "expires_at"])
|
||||
.where("scope", "=", params.scope)
|
||||
.where("lease_key", "=", params.key),
|
||||
);
|
||||
const current = parseLockValue(row);
|
||||
if (current?.owner !== params.owner) {
|
||||
return false;
|
||||
}
|
||||
executeSqliteQuerySync(
|
||||
database.db,
|
||||
db
|
||||
.updateTable("state_leases")
|
||||
.set({
|
||||
expires_at: expiresAt,
|
||||
heartbeat_at: now,
|
||||
updated_at: now,
|
||||
})
|
||||
.where("scope", "=", params.scope)
|
||||
.where("lease_key", "=", params.key),
|
||||
);
|
||||
return true;
|
||||
}, params.options);
|
||||
}
|
||||
|
||||
export async function withOpenClawStateLock<T>(
|
||||
key: string,
|
||||
options: OpenClawStateLockOptions,
|
||||
@@ -200,9 +239,21 @@ export async function withOpenClawStateLock<T>(
|
||||
options: databaseOptions,
|
||||
})
|
||||
) {
|
||||
const renewEveryMs = Math.max(1, Math.floor(staleMs / 2));
|
||||
const renewal = setInterval(() => {
|
||||
renewOpenClawStateLock({
|
||||
key,
|
||||
owner,
|
||||
scope,
|
||||
staleMs,
|
||||
options: databaseOptions,
|
||||
});
|
||||
}, renewEveryMs);
|
||||
renewal.unref?.();
|
||||
try {
|
||||
return await task();
|
||||
} finally {
|
||||
clearInterval(renewal);
|
||||
releaseOpenClawStateLock({ key, owner, scope, options: databaseOptions });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user