mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-06 06:41:08 +00:00
refactor: isolate session store test cleanup state
This commit is contained in:
51
src/config/sessions/store-lock-state.ts
Normal file
51
src/config/sessions/store-lock-state.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { clearSessionStoreCaches } from "./store-cache.js";
|
||||
|
||||
export type SessionStoreLockTask = {
|
||||
fn: () => Promise<unknown>;
|
||||
resolve: (value: unknown) => void;
|
||||
reject: (reason: unknown) => void;
|
||||
timeoutMs?: number;
|
||||
staleMs: number;
|
||||
};
|
||||
|
||||
export type SessionStoreLockQueue = {
|
||||
running: boolean;
|
||||
pending: SessionStoreLockTask[];
|
||||
drainPromise: Promise<void> | null;
|
||||
};
|
||||
|
||||
export const LOCK_QUEUES = new Map<string, SessionStoreLockQueue>();
|
||||
|
||||
export function clearSessionStoreCacheForTest(): void {
|
||||
clearSessionStoreCaches();
|
||||
for (const queue of LOCK_QUEUES.values()) {
|
||||
for (const task of queue.pending) {
|
||||
task.reject(new Error("session store queue cleared for test"));
|
||||
}
|
||||
}
|
||||
LOCK_QUEUES.clear();
|
||||
}
|
||||
|
||||
export async function drainSessionStoreLockQueuesForTest(): Promise<void> {
|
||||
while (LOCK_QUEUES.size > 0) {
|
||||
const queues = [...LOCK_QUEUES.values()];
|
||||
for (const queue of queues) {
|
||||
for (const task of queue.pending) {
|
||||
task.reject(new Error("session store queue cleared for test"));
|
||||
}
|
||||
queue.pending.length = 0;
|
||||
}
|
||||
const activeDrains = queues.flatMap((queue) =>
|
||||
queue.drainPromise ? [queue.drainPromise] : [],
|
||||
);
|
||||
if (activeDrains.length === 0) {
|
||||
LOCK_QUEUES.clear();
|
||||
return;
|
||||
}
|
||||
await Promise.allSettled(activeDrains);
|
||||
}
|
||||
}
|
||||
|
||||
export function getSessionStoreLockQueueSizeForTest(): number {
|
||||
return LOCK_QUEUES.size;
|
||||
}
|
||||
@@ -18,7 +18,6 @@ import { getFileStatSnapshot } from "../cache-utils.js";
|
||||
import { enforceSessionDiskBudget, type SessionDiskBudgetSweepResult } from "./disk-budget.js";
|
||||
import { deriveSessionMetaPatch } from "./metadata.js";
|
||||
import {
|
||||
clearSessionStoreCaches,
|
||||
dropSessionStoreObjectCache,
|
||||
getSerializedSessionStore,
|
||||
isSessionStoreCacheEnabled,
|
||||
@@ -26,6 +25,14 @@ import {
|
||||
setSerializedSessionStore,
|
||||
writeSessionStoreCache,
|
||||
} from "./store-cache.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
drainSessionStoreLockQueuesForTest,
|
||||
getSessionStoreLockQueueSizeForTest,
|
||||
LOCK_QUEUES,
|
||||
type SessionStoreLockQueue,
|
||||
type SessionStoreLockTask,
|
||||
} from "./store-lock-state.js";
|
||||
import {
|
||||
capEntryCount,
|
||||
getActiveSessionMaintenanceWarning,
|
||||
@@ -43,6 +50,12 @@ import {
|
||||
type SessionEntry,
|
||||
} from "./types.js";
|
||||
|
||||
export {
|
||||
clearSessionStoreCacheForTest,
|
||||
drainSessionStoreLockQueuesForTest,
|
||||
getSessionStoreLockQueueSizeForTest,
|
||||
} from "./store-lock-state.js";
|
||||
|
||||
const log = createSubsystemLogger("sessions/store");
|
||||
let sessionArchiveRuntimePromise: Promise<
|
||||
typeof import("../../gateway/session-archive.runtime.js")
|
||||
@@ -157,16 +170,6 @@ function normalizeSessionStore(store: Record<string, SessionEntry>): void {
|
||||
}
|
||||
}
|
||||
|
||||
export function clearSessionStoreCacheForTest(): void {
|
||||
clearSessionStoreCaches();
|
||||
for (const queue of LOCK_QUEUES.values()) {
|
||||
for (const task of queue.pending) {
|
||||
task.reject(new Error("session store queue cleared for test"));
|
||||
}
|
||||
}
|
||||
LOCK_QUEUES.clear();
|
||||
}
|
||||
|
||||
export function setSessionWriteLockAcquirerForTests(
|
||||
acquirer: typeof acquireSessionWriteLock | null,
|
||||
): void {
|
||||
@@ -177,31 +180,6 @@ export function resetSessionStoreLockRuntimeForTests(): void {
|
||||
sessionWriteLockAcquirerForTests = null;
|
||||
}
|
||||
|
||||
export async function drainSessionStoreLockQueuesForTest(): Promise<void> {
|
||||
while (LOCK_QUEUES.size > 0) {
|
||||
const queues = [...LOCK_QUEUES.values()];
|
||||
for (const queue of queues) {
|
||||
for (const task of queue.pending) {
|
||||
task.reject(new Error("session store queue cleared for test"));
|
||||
}
|
||||
queue.pending.length = 0;
|
||||
}
|
||||
const activeDrains = queues.flatMap((queue) =>
|
||||
queue.drainPromise ? [queue.drainPromise] : [],
|
||||
);
|
||||
if (activeDrains.length === 0) {
|
||||
LOCK_QUEUES.clear();
|
||||
return;
|
||||
}
|
||||
await Promise.allSettled(activeDrains);
|
||||
}
|
||||
}
|
||||
|
||||
/** Expose lock queue size for tests. */
|
||||
export function getSessionStoreLockQueueSizeForTest(): number {
|
||||
return LOCK_QUEUES.size;
|
||||
}
|
||||
|
||||
export async function withSessionStoreLockForTest<T>(
|
||||
storePath: string,
|
||||
fn: () => Promise<T>,
|
||||
@@ -633,22 +611,6 @@ type SessionStoreLockOptions = {
|
||||
const SESSION_STORE_LOCK_MIN_HOLD_MS = 5_000;
|
||||
const SESSION_STORE_LOCK_TIMEOUT_GRACE_MS = 5_000;
|
||||
|
||||
type SessionStoreLockTask = {
|
||||
fn: () => Promise<unknown>;
|
||||
resolve: (value: unknown) => void;
|
||||
reject: (reason: unknown) => void;
|
||||
timeoutMs?: number;
|
||||
staleMs: number;
|
||||
};
|
||||
|
||||
type SessionStoreLockQueue = {
|
||||
running: boolean;
|
||||
pending: SessionStoreLockTask[];
|
||||
drainPromise: Promise<void> | null;
|
||||
};
|
||||
|
||||
const LOCK_QUEUES = new Map<string, SessionStoreLockQueue>();
|
||||
|
||||
function getErrorCode(error: unknown): string | null {
|
||||
if (!error || typeof error !== "object" || !("code" in error)) {
|
||||
return null;
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
import { drainSessionWriteLockStateForTest } from "../agents/session-write-lock.js";
|
||||
import {
|
||||
clearSessionStoreCacheForTest,
|
||||
drainSessionStoreLockQueuesForTest,
|
||||
} from "../config/sessions/store.js";
|
||||
import { clearSessionStoreCaches } from "../config/sessions/store-cache.js";
|
||||
import { drainSessionStoreLockQueuesForTest } from "../config/sessions/store-lock-state.js";
|
||||
import { drainFileLockStateForTest } from "../infra/file-lock.js";
|
||||
|
||||
let fileLockDrainerForTests: typeof drainFileLockStateForTest | null = null;
|
||||
@@ -33,7 +31,7 @@ export function resetSessionStateCleanupRuntimeForTests(): void {
|
||||
|
||||
export async function cleanupSessionStateForTest(): Promise<void> {
|
||||
await (sessionStoreLockQueueDrainerForTests ?? drainSessionStoreLockQueuesForTest)();
|
||||
clearSessionStoreCacheForTest();
|
||||
clearSessionStoreCaches();
|
||||
await (fileLockDrainerForTests ?? drainFileLockStateForTest)();
|
||||
await (sessionWriteLockDrainerForTests ?? drainSessionWriteLockStateForTest)();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user