perf: extract memory sync control helpers

This commit is contained in:
Peter Steinberger
2026-04-06 20:20:54 +01:00
parent 4ad1d96e5d
commit 8d2daf7ef2
4 changed files with 320 additions and 221 deletions

View File

@@ -0,0 +1,15 @@
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import { ensureDir, requireNodeSqlite } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
export function openMemoryDatabaseAtPath(dbPath: string, allowExtension: boolean): DatabaseSync {
const dir = path.dirname(dbPath);
ensureDir(dir);
const { DatabaseSync } = requireNodeSqlite();
const db = new DatabaseSync(dbPath, { allowExtension });
// busy_timeout is per-connection and resets to 0 on restart.
// Set it on every open so concurrent processes retry instead of
// failing immediately with SQLITE_BUSY.
db.exec("PRAGMA busy_timeout = 5000");
return db;
}

View File

@@ -0,0 +1,189 @@
import type { DatabaseSync } from "node:sqlite";
import {
createSubsystemLogger,
type OpenClawConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import type { MemorySyncProgressUpdate } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
const log = createSubsystemLogger("memory");
export type MemoryReadonlyRecoveryState = {
closed: boolean;
db: DatabaseSync;
vectorReady: Promise<boolean> | null;
vector: {
enabled: boolean;
available: boolean | null;
extensionPath?: string;
loadError?: string;
dims?: number;
};
readonlyRecoveryAttempts: number;
readonlyRecoverySuccesses: number;
readonlyRecoveryFailures: number;
readonlyRecoveryLastError?: string;
runSync: (params?: {
reason?: string;
force?: boolean;
sessionFiles?: string[];
progress?: (update: MemorySyncProgressUpdate) => void;
}) => Promise<void>;
openDatabase: () => DatabaseSync;
ensureSchema: () => void;
readMeta: () => { vectorDims?: number } | undefined;
};
export function isMemoryReadonlyDbError(err: unknown): boolean {
const readonlyPattern =
/attempt to write a readonly database|database is read-only|SQLITE_READONLY/i;
const messages = new Set<string>();
const pushValue = (value: unknown): void => {
if (typeof value !== "string") {
return;
}
const normalized = value.trim();
if (!normalized) {
return;
}
messages.add(normalized);
};
pushValue(err instanceof Error ? err.message : String(err));
if (err && typeof err === "object") {
const record = err as Record<string, unknown>;
pushValue(record.message);
pushValue(record.code);
pushValue(record.name);
if (record.cause && typeof record.cause === "object") {
const cause = record.cause as Record<string, unknown>;
pushValue(cause.message);
pushValue(cause.code);
pushValue(cause.name);
}
}
return [...messages].some((value) => readonlyPattern.test(value));
}
export function extractMemoryErrorReason(err: unknown): string {
if (err instanceof Error && err.message.trim()) {
return err.message;
}
if (err && typeof err === "object") {
const record = err as Record<string, unknown>;
if (typeof record.message === "string" && record.message.trim()) {
return record.message;
}
if (typeof record.code === "string" && record.code.trim()) {
return record.code;
}
}
return String(err);
}
export async function runMemorySyncWithReadonlyRecovery(
state: MemoryReadonlyRecoveryState,
params?: {
reason?: string;
force?: boolean;
sessionFiles?: string[];
progress?: (update: MemorySyncProgressUpdate) => void;
},
): Promise<void> {
try {
await state.runSync(params);
return;
} catch (err) {
if (!isMemoryReadonlyDbError(err) || state.closed) {
throw err;
}
const reason = extractMemoryErrorReason(err);
state.readonlyRecoveryAttempts += 1;
state.readonlyRecoveryLastError = reason;
log.warn(`memory sync readonly handle detected; reopening sqlite connection`, { reason });
try {
state.db.close();
} catch {}
state.db = state.openDatabase();
state.vectorReady = null;
state.vector.available = null;
state.vector.loadError = undefined;
state.ensureSchema();
const meta = state.readMeta();
state.vector.dims = meta?.vectorDims;
try {
await state.runSync(params);
state.readonlyRecoverySuccesses += 1;
} catch (retryErr) {
state.readonlyRecoveryFailures += 1;
throw retryErr;
}
}
}
export function enqueueMemoryTargetedSessionSync(
state: {
closed: boolean;
syncing: Promise<void> | null;
queuedSessionFiles: Set<string>;
queuedSessionSync: Promise<void> | null;
sync: (params?: {
reason?: string;
force?: boolean;
sessionFiles?: string[];
progress?: (update: MemorySyncProgressUpdate) => void;
}) => Promise<void>;
},
sessionFiles?: string[],
): Promise<void> {
for (const sessionFile of sessionFiles ?? []) {
const trimmed = sessionFile.trim();
if (trimmed) {
state.queuedSessionFiles.add(trimmed);
}
}
if (state.queuedSessionFiles.size === 0) {
return state.syncing ?? Promise.resolve();
}
if (!state.queuedSessionSync) {
state.queuedSessionSync = (async () => {
try {
await state.syncing?.catch(() => undefined);
while (!state.closed && state.queuedSessionFiles.size > 0) {
const queuedSessionFiles = Array.from(state.queuedSessionFiles);
state.queuedSessionFiles.clear();
await state.sync({
reason: "queued-session-files",
sessionFiles: queuedSessionFiles,
});
}
} finally {
state.queuedSessionSync = null;
}
})();
}
return state.queuedSessionSync;
}
export function _createMemorySyncControlConfigForTests(
workspaceDir: string,
indexPath: string,
): OpenClawConfig {
return {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "mock-embed",
store: { path: indexPath, vector: { enabled: false } },
cache: { enabled: false },
query: { minScore: 0, hybrid: { enabled: false } },
sync: { watch: false, onSessionStart: false, onSearch: false },
},
},
list: [{ id: "main", default: true }],
},
} as OpenClawConfig;
}

View File

@@ -4,26 +4,18 @@ import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { openMemoryDatabaseAtPath } from "./manager-sync-ops.js";
import { runMemorySyncWithReadonlyRecovery } from "./manager.js";
import { openMemoryDatabaseAtPath } from "./manager-db.js";
import {
_createMemorySyncControlConfigForTests,
enqueueMemoryTargetedSessionSync,
runMemorySyncWithReadonlyRecovery,
type MemoryReadonlyRecoveryState,
} from "./manager-sync-control.js";
type ReadonlyRecoveryHarness = {
closed: boolean;
type ReadonlyRecoveryHarness = MemoryReadonlyRecoveryState & {
syncing: Promise<void> | null;
queuedSessionFiles: Set<string>;
queuedSessionSync: Promise<void> | null;
db: DatabaseSync;
vectorReady: Promise<boolean> | null;
vector: {
enabled: boolean;
available: boolean | null;
loadError?: string;
dims?: number;
};
readonlyRecoveryAttempts: number;
readonlyRecoverySuccesses: number;
readonlyRecoveryFailures: number;
readonlyRecoveryLastError?: string;
ensureProviderInitialized: ReturnType<typeof vi.fn>;
enqueueTargetedSessionSync: ReturnType<typeof vi.fn>;
runSync: ReturnType<typeof vi.fn>;
@@ -37,22 +29,7 @@ describe("memory manager readonly recovery", () => {
let indexPath = "";
function _createMemoryConfig(): OpenClawConfig {
return {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "mock-embed",
store: { path: indexPath, vector: { enabled: false } },
cache: { enabled: false },
query: { minScore: 0, hybrid: { enabled: false } },
sync: { watch: false, onSessionStart: false, onSearch: false },
},
},
list: [{ id: "main", default: true }],
},
} as OpenClawConfig;
return _createMemorySyncControlConfigForTests(workspaceDir, indexPath);
}
function createReadonlyRecoveryHarness() {
@@ -97,10 +74,7 @@ describe("memory manager readonly recovery", () => {
harness: ReadonlyRecoveryHarness,
params?: { reason?: string; force?: boolean; sessionFiles?: string[] },
) {
return await runMemorySyncWithReadonlyRecovery(
harness as unknown as Parameters<typeof runMemorySyncWithReadonlyRecovery>[0],
params,
);
return await runMemorySyncWithReadonlyRecovery(harness, params);
}
function expectReadonlyRecoveryStatus(
@@ -188,4 +162,91 @@ describe("memory manager readonly recovery", () => {
expect(busyTimeout).toBe(5000);
db.close();
});
it("queues targeted session files behind an in-flight sync", async () => {
let releaseSync = () => {};
const pendingSync = new Promise<void>((resolve) => {
releaseSync = () => resolve();
});
const harness = {
closed: false,
syncing: pendingSync,
queuedSessionFiles: new Set<string>(),
queuedSessionSync: null as Promise<void> | null,
sync: vi.fn(async () => {}),
};
const queued = enqueueMemoryTargetedSessionSync(harness, [
" /tmp/first.jsonl ",
"",
"/tmp/second.jsonl",
]);
expect(harness.sync).not.toHaveBeenCalled();
releaseSync();
await queued;
expect(harness.sync).toHaveBeenCalledTimes(1);
expect(harness.sync).toHaveBeenCalledWith({
reason: "queued-session-files",
sessionFiles: ["/tmp/first.jsonl", "/tmp/second.jsonl"],
});
expect(harness.queuedSessionSync).toBeNull();
});
it("merges repeated queued requests while the active sync is still running", async () => {
let releaseSync = () => {};
const pendingSync = new Promise<void>((resolve) => {
releaseSync = () => resolve();
});
const harness = {
closed: false,
syncing: pendingSync,
queuedSessionFiles: new Set<string>(),
queuedSessionSync: null as Promise<void> | null,
sync: vi.fn(async () => {}),
};
const first = enqueueMemoryTargetedSessionSync(harness, [
"/tmp/first.jsonl",
"/tmp/second.jsonl",
]);
const second = enqueueMemoryTargetedSessionSync(harness, [
"/tmp/second.jsonl",
"/tmp/third.jsonl",
]);
expect(first).toBe(second);
releaseSync();
await second;
expect(harness.sync).toHaveBeenCalledTimes(1);
expect(harness.sync).toHaveBeenCalledWith({
reason: "queued-session-files",
sessionFiles: ["/tmp/first.jsonl", "/tmp/second.jsonl", "/tmp/third.jsonl"],
});
});
it("falls back to the active sync when no usable session files were queued", async () => {
let releaseSync = () => {};
const pendingSync = new Promise<void>((resolve) => {
releaseSync = () => resolve();
});
const harness = {
closed: false,
syncing: pendingSync,
queuedSessionFiles: new Set<string>(),
queuedSessionSync: null as Promise<void> | null,
sync: vi.fn(async () => {}),
};
const queued = enqueueMemoryTargetedSessionSync(harness, ["", " "]);
expect(queued).toBe(pendingSync);
releaseSync();
await queued;
expect(harness.sync).not.toHaveBeenCalled();
});
});

View File

@@ -1,10 +1,8 @@
import type { DatabaseSync } from "node:sqlite";
import { type FSWatcher } from "chokidar";
import {
resolveAgentDir,
resolveAgentWorkspaceDir,
resolveMemorySearchConfig,
createSubsystemLogger,
type OpenClawConfig,
type ResolvedMemorySearchConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
@@ -35,7 +33,18 @@ import {
} from "./manager-cache.js";
import { MemoryManagerEmbeddingOps } from "./manager-embedding-ops.js";
import { searchKeyword, searchVector } from "./manager-search.js";
import { resolveInitialMemoryDirty, resolveStatusProviderInfo } from "./manager-status-state.js";
import {
collectMemoryStatusAggregate,
resolveInitialMemoryDirty,
resolveStatusProviderInfo,
} from "./manager-status-state.js";
import {
enqueueMemoryTargetedSessionSync,
extractMemoryErrorReason,
isMemoryReadonlyDbError,
runMemorySyncWithReadonlyRecovery,
type MemoryReadonlyRecoveryState,
} from "./manager-sync-control.js";
const SNIPPET_MAX_CHARS = 700;
const VECTOR_TABLE = "chunks_vec";
const FTS_TABLE = "chunks_fts";
@@ -44,126 +53,8 @@ const BATCH_FAILURE_LIMIT = 2;
const MEMORY_INDEX_MANAGER_CACHE_KEY = Symbol.for("openclaw.memoryIndexManagerCache");
const log = createSubsystemLogger("memory");
const { cache: INDEX_CACHE, pending: INDEX_CACHE_PENDING } =
resolveSingletonManagedCache<MemoryIndexManager>(MEMORY_INDEX_MANAGER_CACHE_KEY);
type MemoryReadonlyRecoveryState = {
closed: boolean;
db: DatabaseSync;
vectorReady: Promise<boolean> | null;
vector: {
enabled: boolean;
available: boolean | null;
extensionPath?: string;
loadError?: string;
dims?: number;
};
readonlyRecoveryAttempts: number;
readonlyRecoverySuccesses: number;
readonlyRecoveryFailures: number;
readonlyRecoveryLastError?: string;
runSync: (params?: {
reason?: string;
force?: boolean;
sessionFiles?: string[];
progress?: (update: MemorySyncProgressUpdate) => void;
}) => Promise<void>;
openDatabase: () => DatabaseSync;
ensureSchema: () => void;
readMeta: () => { vectorDims?: number } | undefined;
};
export function isMemoryReadonlyDbError(err: unknown): boolean {
const readonlyPattern =
/attempt to write a readonly database|database is read-only|SQLITE_READONLY/i;
const messages = new Set<string>();
const pushValue = (value: unknown): void => {
if (typeof value !== "string") {
return;
}
const normalized = value.trim();
if (!normalized) {
return;
}
messages.add(normalized);
};
pushValue(err instanceof Error ? err.message : String(err));
if (err && typeof err === "object") {
const record = err as Record<string, unknown>;
pushValue(record.message);
pushValue(record.code);
pushValue(record.name);
if (record.cause && typeof record.cause === "object") {
const cause = record.cause as Record<string, unknown>;
pushValue(cause.message);
pushValue(cause.code);
pushValue(cause.name);
}
}
return [...messages].some((value) => readonlyPattern.test(value));
}
export function extractMemoryErrorReason(err: unknown): string {
if (err instanceof Error && err.message.trim()) {
return err.message;
}
if (err && typeof err === "object") {
const record = err as Record<string, unknown>;
if (typeof record.message === "string" && record.message.trim()) {
return record.message;
}
if (typeof record.code === "string" && record.code.trim()) {
return record.code;
}
}
return String(err);
}
export async function runMemorySyncWithReadonlyRecovery(
state: MemoryReadonlyRecoveryState,
params?: {
reason?: string;
force?: boolean;
sessionFiles?: string[];
progress?: (update: MemorySyncProgressUpdate) => void;
},
): Promise<void> {
try {
await state.runSync(params);
return;
} catch (err) {
if (!isMemoryReadonlyDbError(err) || state.closed) {
throw err;
}
const reason = extractMemoryErrorReason(err);
state.readonlyRecoveryAttempts += 1;
state.readonlyRecoveryLastError = reason;
log.warn(`memory sync readonly handle detected; reopening sqlite connection`, { reason });
try {
state.db.close();
} catch {}
state.db = state.openDatabase();
state.vectorReady = null;
state.vector.available = null;
state.vector.loadError = undefined;
state.ensureSchema();
const meta = state.readMeta();
state.vector.dims = meta?.vectorDims;
try {
await state.runSync(params);
state.readonlyRecoverySuccesses += 1;
} catch (retryErr) {
state.readonlyRecoveryFailures += 1;
throw retryErr;
}
}
}
export async function closeAllMemoryIndexManagers(): Promise<void> {
await closeManagedCacheEntries({
cache: INDEX_CACHE,
@@ -649,33 +540,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
}
private enqueueTargetedSessionSync(sessionFiles?: string[]): Promise<void> {
for (const sessionFile of sessionFiles ?? []) {
const trimmed = sessionFile.trim();
if (trimmed) {
this.queuedSessionFiles.add(trimmed);
}
}
if (this.queuedSessionFiles.size === 0) {
return this.syncing ?? Promise.resolve();
}
if (!this.queuedSessionSync) {
this.queuedSessionSync = (async () => {
try {
await this.syncing?.catch(() => undefined);
while (!this.closed && this.queuedSessionFiles.size > 0) {
const queuedSessionFiles = Array.from(this.queuedSessionFiles);
this.queuedSessionFiles.clear();
await this.sync({
reason: "queued-session-files",
sessionFiles: queuedSessionFiles,
});
}
} finally {
this.queuedSessionSync = null;
}
})();
}
return this.queuedSessionSync;
return enqueueMemoryTargetedSessionSync(this, sessionFiles);
}
private isReadonlyDbError(err: unknown): boolean {
@@ -782,43 +647,12 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
status(): MemoryProviderStatus {
const sourceFilter = this.buildSourceFilter();
const aggregateRows = this.db
.prepare(
`SELECT 'files' AS kind, source, COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql} GROUP BY source\n` +
`UNION ALL\n` +
`SELECT 'chunks' AS kind, source, COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql} GROUP BY source`,
)
.all(...sourceFilter.params, ...sourceFilter.params) as Array<{
kind: "files" | "chunks";
source: MemorySource;
c: number;
}>;
const aggregateState = (() => {
const sources = Array.from(this.sources);
const bySource = new Map<MemorySource, { files: number; chunks: number }>();
for (const source of sources) {
bySource.set(source, { files: 0, chunks: 0 });
}
let files = 0;
let chunks = 0;
for (const row of aggregateRows) {
const count = row.c ?? 0;
const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 };
if (row.kind === "files") {
entry.files = count;
files += count;
} else {
entry.chunks = count;
chunks += count;
}
bySource.set(row.source, entry);
}
return {
files,
chunks,
sourceCounts: sources.map((source) => Object.assign({ source }, bySource.get(source)!)),
};
})();
const aggregateState = collectMemoryStatusAggregate({
db: this.db,
sources: this.sources,
sourceFilterSql: sourceFilter.sql,
sourceFilterParams: sourceFilter.params,
});
const providerInfo = resolveStatusProviderInfo({
provider: this.provider,