fix(memory): add QMD sync parity hooks (#57354)

* fix(memory): add qmd sync parity hooks

* fix(memory): avoid blocking qmd session warm searches
This commit is contained in:
Vincent Koc
2026-03-29 17:25:37 -07:00
committed by GitHub
parent 9857d40923
commit d26d7c797b
3 changed files with 256 additions and 0 deletions

View File

@@ -12,6 +12,14 @@ const { logWarnMock, logDebugMock, logInfoMock } = vi.hoisted(() => ({
logDebugMock: vi.fn(),
logInfoMock: vi.fn(),
}));
const { watchMock } = vi.hoisted(() => ({
watchMock: vi.fn(() => {
const watcher = new EventEmitter();
return Object.assign(watcher, {
close: vi.fn(async () => undefined),
});
}),
}));
const MCPORTER_STATE_KEY = Symbol.for("openclaw.mcporterState");
type MockChild = EventEmitter & {
@@ -93,6 +101,11 @@ vi.mock("node:child_process", async (importOriginal) => {
};
});
vi.mock("chokidar", () => ({
default: { watch: watchMock },
watch: watchMock,
}));
import { spawn as mockedSpawn } from "node:child_process";
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import {
@@ -152,6 +165,7 @@ describe("QmdMemoryManager", () => {
beforeEach(async () => {
spawnMock.mockClear();
spawnMock.mockImplementation(() => createMockChild());
watchMock.mockClear();
logWarnMock.mockClear();
logDebugMock.mockClear();
logInfoMock.mockClear();
@@ -229,6 +243,139 @@ describe("QmdMemoryManager", () => {
await manager.close();
});
it("runs a qmd sync once for the first search in a fresh session", async () => {
cfg = {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "mock-embed",
store: { path: path.join(workspaceDir, "index.sqlite"), vector: { enabled: false } },
sync: { watch: false, onSessionStart: true, onSearch: false },
},
},
list: [{ id: agentId, default: true, workspace: workspaceDir }],
},
memory: {
backend: "qmd",
qmd: {
includeDefaultMemory: false,
update: { interval: "0s", debounceMs: 0, onBoot: false },
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
},
},
} as OpenClawConfig;
spawnMock.mockImplementation((_cmd: string, args: string[]) => {
const child = createMockChild({ autoClose: false });
if (args[0] === "search" || args[0] === "query" || args[0] === "vsearch") {
emitAndClose(child, "stdout", "[]");
return child;
}
queueMicrotask(() => child.closeWith(0));
return child;
});
const { manager } = await createManager({ mode: "full" });
await manager.search("hello", { sessionKey: "session-a" });
await manager.search("hello again", { sessionKey: "session-a" });
const updateCalls = spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update");
expect(updateCalls).toHaveLength(1);
});
it("does not block first search on session-start sync completion", async () => {
vi.useFakeTimers();
cfg = {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "mock-embed",
store: { path: path.join(workspaceDir, "index.sqlite"), vector: { enabled: false } },
sync: { watch: false, onSessionStart: true, onSearch: false },
},
},
list: [{ id: agentId, default: true, workspace: workspaceDir }],
},
memory: {
backend: "qmd",
qmd: {
includeDefaultMemory: false,
update: { interval: "0s", debounceMs: 0, onBoot: false },
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
},
},
} as OpenClawConfig;
let releaseUpdate: (() => void) | null = null;
spawnMock.mockImplementation((_cmd: string, args: string[]) => {
if (args[0] === "update") {
const child = createMockChild({ autoClose: false });
releaseUpdate = () => child.closeWith(0);
return child;
}
if (args[0] === "search" || args[0] === "query" || args[0] === "vsearch") {
const child = createMockChild({ autoClose: false });
emitAndClose(child, "stdout", "[]");
return child;
}
return createMockChild();
});
const { manager } = await createManager({ mode: "full" });
const searchPromise = manager.search("hello", { sessionKey: "session-b" });
await vi.advanceTimersByTimeAsync(500);
await expect(searchPromise).resolves.toEqual([]);
releaseUpdate?.();
await manager.close();
});
it("runs qmd sync when watched collection files change", async () => {
vi.useFakeTimers();
cfg = {
agents: {
defaults: {
workspace: workspaceDir,
memorySearch: {
provider: "openai",
model: "mock-embed",
store: { path: path.join(workspaceDir, "index.sqlite"), vector: { enabled: false } },
sync: { watch: true, watchDebounceMs: 25, onSessionStart: false, onSearch: false },
},
},
list: [{ id: agentId, default: true, workspace: workspaceDir }],
},
memory: {
backend: "qmd",
qmd: {
includeDefaultMemory: false,
update: { interval: "0s", debounceMs: 0, onBoot: false },
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
},
},
} as OpenClawConfig;
const { manager } = await createManager({ mode: "full" });
expect(watchMock).toHaveBeenCalledTimes(1);
const watcher = watchMock.mock.results[0]?.value as EventEmitter & { close: Mock };
const initialUpdateCalls = spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update");
expect(initialUpdateCalls).toHaveLength(0);
watcher.emit("change", path.join(workspaceDir, "notes.md"));
await vi.advanceTimersByTimeAsync(25);
const updateCalls = spawnMock.mock.calls.filter((call) => call[1]?.[0] === "update");
expect(updateCalls).toHaveLength(1);
await manager.close();
});
it("runs boot update in background by default", async () => {
cfg = {
...cfg,

View File

@@ -2,13 +2,16 @@ import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import readline from "node:readline";
import chokidar, { type FSWatcher } from "chokidar";
import {
createSubsystemLogger,
resolveMemorySearchConfig,
resolveAgentWorkspaceDir,
resolveGlobalSingleton,
resolveStateDir,
writeFileWithinRoot,
type OpenClawConfig,
type ResolvedMemorySearchConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import {
buildSessionEntry,
@@ -44,6 +47,7 @@ const log = createSubsystemLogger("memory");
const SNIPPET_HEADER_RE = /@@\s*-([0-9]+),([0-9]+)/;
const SEARCH_PENDING_UPDATE_WAIT_MS = 500;
const QMD_WATCH_STABILITY_MS = 200;
const MAX_QMD_OUTPUT_CHARS = 200_000;
const NUL_MARKER_RE = /(?:\^@|\\0|\\x00|\\u0000|null\s*byte|nul\s*byte)/i;
const QMD_EMBED_BACKOFF_BASE_MS = 60_000;
@@ -51,6 +55,15 @@ const QMD_EMBED_BACKOFF_MAX_MS = 60 * 60 * 1000;
const HAN_SCRIPT_RE = /[\u3400-\u9fff]/u;
const QMD_BM25_HAN_KEYWORD_LIMIT = 12;
const MCPORTER_STATE_KEY = Symbol.for("openclaw.mcporterState");
const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([
".git",
"node_modules",
".pnpm-store",
".venv",
"venv",
".tox",
"__pycache__",
]);
type McporterState = {
coldStartWarned: boolean;
@@ -100,6 +113,12 @@ function normalizeHanBm25Query(query: string): string {
return normalizedKeywords.length > 0 ? normalizedKeywords.join(" ") : trimmed;
}
function shouldIgnoreMemoryWatchPath(watchPath: string): boolean {
const normalized = path.normalize(watchPath);
const parts = normalized.split(path.sep).map((segment) => segment.trim().toLowerCase());
return parts.some((segment) => IGNORED_MEMORY_WATCH_DIR_NAMES.has(segment));
}
async function runWithQmdEmbedLock<T>(task: () => Promise<T>): Promise<T> {
const previous = qmdEmbedQueueTail;
let release: (() => void) | undefined;
@@ -166,6 +185,7 @@ export class QmdMemoryManager implements MemorySearchManager {
private readonly xdgCacheHome: string;
private readonly indexPath: string;
private readonly env: NodeJS.ProcessEnv;
private readonly syncSettings: ResolvedMemorySearchConfig | null;
private readonly managedCollectionNames: string[];
private readonly collectionRoots = new Map<string, CollectionRoot>();
private readonly sources = new Set<MemorySource>();
@@ -185,9 +205,12 @@ export class QmdMemoryManager implements MemorySearchManager {
private readonly sessionExporter: SessionExporterConfig | null;
private updateTimer: NodeJS.Timeout | null = null;
private embedTimer: NodeJS.Timeout | null = null;
private watcher: FSWatcher | null = null;
private watchTimer: NodeJS.Timeout | null = null;
private pendingUpdate: Promise<void> | null = null;
private queuedForcedUpdate: Promise<void> | null = null;
private queuedForcedRuns = 0;
private dirty = false;
private closed = false;
private db: SqliteDatabase | null = null;
private lastUpdateAt: number | null = null;
@@ -196,6 +219,7 @@ export class QmdMemoryManager implements MemorySearchManager {
private embedFailureCount = 0;
private attemptedNullByteCollectionRepair = false;
private attemptedDuplicateDocumentRepair = false;
private readonly sessionWarm = new Set<string>();
private constructor(params: {
cfg: OpenClawConfig;
@@ -209,6 +233,7 @@ export class QmdMemoryManager implements MemorySearchManager {
this.stateDir = resolveStateDir(process.env, os.homedir);
this.agentStateDir = path.join(this.stateDir, "agents", this.agentId);
this.qmdDir = path.join(this.agentStateDir, "qmd");
this.syncSettings = resolveMemorySearchConfig(params.cfg, params.agentId);
// QMD uses XDG base dirs for its internal state.
// Collections are managed via `qmd collection add` and stored inside the index DB.
// - config: $XDG_CONFIG_HOME (contexts, etc.)
@@ -271,6 +296,7 @@ export class QmdMemoryManager implements MemorySearchManager {
await this.symlinkSharedModels();
await this.ensureCollections();
this.ensureWatcher();
if (this.qmd.update.onBoot) {
const bootRun = this.runUpdate("boot", true);
@@ -759,6 +785,8 @@ export class QmdMemoryManager implements MemorySearchManager {
if (!trimmed) {
return [];
}
await this.maybeWarmSession(opts?.sessionKey);
await this.maybeSyncDirtySearchState();
await this.waitForPendingUpdateBeforeSearch();
const limit = Math.min(
this.qmd.limits.maxResults,
@@ -994,6 +1022,14 @@ export class QmdMemoryManager implements MemorySearchManager {
clearInterval(this.embedTimer);
this.embedTimer = null;
}
if (this.watchTimer) {
clearTimeout(this.watchTimer);
this.watchTimer = null;
}
if (this.watcher) {
await this.watcher.close().catch(() => undefined);
this.watcher = null;
}
this.queuedForcedRuns = 0;
await this.pendingUpdate?.catch(() => undefined);
await this.queuedForcedUpdate?.catch(() => undefined);
@@ -1031,6 +1067,7 @@ export class QmdMemoryManager implements MemorySearchManager {
await this.exportSessions();
}
await this.runQmdUpdateWithRetry(reason);
this.dirty = false;
if (this.shouldRunEmbed(force)) {
try {
await runWithQmdEmbedLock(async () => {
@@ -1055,6 +1092,77 @@ export class QmdMemoryManager implements MemorySearchManager {
await this.pendingUpdate;
}
private ensureWatcher(): void {
if (!this.syncSettings?.sync.watch || this.watcher || this.closed) {
return;
}
const watchPaths = new Set<string>();
for (const collection of this.qmd.collections) {
if (collection.kind === "sessions") {
continue;
}
watchPaths.add(this.resolveCollectionWatchPath(collection));
}
if (watchPaths.size === 0) {
return;
}
this.watcher = chokidar.watch(Array.from(watchPaths), {
ignoreInitial: true,
ignored: (watchPath) => shouldIgnoreMemoryWatchPath(String(watchPath)),
awaitWriteFinish: {
stabilityThreshold: QMD_WATCH_STABILITY_MS,
pollInterval: 100,
},
});
const markDirty = () => {
this.dirty = true;
this.scheduleWatchSync();
};
this.watcher.on("add", markDirty);
this.watcher.on("change", markDirty);
this.watcher.on("unlink", markDirty);
}
private resolveCollectionWatchPath(collection: ManagedCollection): string {
return path.join(path.normalize(collection.path), collection.pattern);
}
private scheduleWatchSync(): void {
if (!this.syncSettings?.sync.watch) {
return;
}
if (this.watchTimer) {
clearTimeout(this.watchTimer);
}
this.watchTimer = setTimeout(() => {
this.watchTimer = null;
void this.sync({ reason: "watch" }).catch((err) => {
log.warn(`qmd watch sync failed: ${String(err)}`);
});
}, this.syncSettings.sync.watchDebounceMs);
}
private async maybeWarmSession(sessionKey?: string): Promise<void> {
if (!this.syncSettings?.sync.onSessionStart) {
return;
}
const key = sessionKey?.trim() || "";
if (!key || this.sessionWarm.has(key)) {
return;
}
this.sessionWarm.add(key);
void this.sync({ reason: "session-start" }).catch((err) => {
log.warn(`qmd session-start sync failed: ${String(err)}`);
});
}
private async maybeSyncDirtySearchState(): Promise<void> {
if (!this.syncSettings?.sync.onSearch || !this.dirty) {
return;
}
await this.sync({ reason: "search" });
}
private async runQmdUpdateWithRetry(reason: string): Promise<void> {
const isBootRun = reason === "boot" || reason.startsWith("boot:");
const maxAttempts = isBootRun ? 3 : 1;