fix(qmd): Dedup in-flight manager creation so only one full QMD manager arms per agent/config at a time, eliminating the concurrent exportSessions() collisions that triggered path changed during write errors (#65226)

Fixes concurrent manager creation races that caused SafeOpenErrors during session export.

Deduplicates in-flight manager creation so only one full QMD manager arms per agent/config at a time, eliminating the concurrent exportSessions() collisions that triggered path changed during write errors
Resolves and snapshots runtime inputs before cache reuse, replacing stale managers atomically when workspace/config changes, and aborting queued export work promptly on close()
This commit is contained in:
Bek
2026-04-21 18:22:21 -04:00
committed by GitHub
parent 1acb094579
commit dfe0e49c8a
6 changed files with 829 additions and 87 deletions

View File

@@ -346,10 +346,21 @@ jobs:
if: github.event_name == 'pull_request'
env:
BASE_SHA: ${{ github.event.pull_request.base.sha }}
BASE_REF: ${{ github.event.pull_request.base.ref }}
run: |
set -euo pipefail
trusted_config="$RUNNER_TEMP/pre-commit-base.yaml"
git show "${BASE_SHA}:.pre-commit-config.yaml" > "$trusted_config"
if git cat-file -e "${BASE_SHA}^{commit}" 2>/dev/null &&
git cat-file -e "${BASE_SHA}:.pre-commit-config.yaml" 2>/dev/null; then
git show "${BASE_SHA}:.pre-commit-config.yaml" > "$trusted_config"
elif git show "refs/remotes/origin/${BASE_REF}:.pre-commit-config.yaml" \
> "$trusted_config" 2>/dev/null; then
echo "Base SHA ${BASE_SHA} does not expose .pre-commit-config.yaml; using origin/${BASE_REF} instead."
else
echo "::warning title=trusted pre-commit config unavailable::Could not read .pre-commit-config.yaml from ${BASE_SHA} or origin/${BASE_REF}; falling back to the checked-out config."
rm -f "$trusted_config"
exit 0
fi
echo "PRE_COMMIT_CONFIG_PATH=$trusted_config" >> "$GITHUB_ENV"
- name: Setup Python

View File

@@ -3386,6 +3386,142 @@ describe("QmdMemoryManager", () => {
await second.manager.close();
});
it("serializes session exports across managers for the same agent", async () => {
cfg = {
...cfg,
memory: {
backend: "qmd",
qmd: {
includeDefaultMemory: false,
update: { interval: "0s", debounceMs: 0, onBoot: false },
sessions: { enabled: true },
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
},
},
} as OpenClawConfig;
const sessionsDir = path.join(stateDir, "agents", agentId, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
await fs.writeFile(
path.join(sessionsDir, "session-1.jsonl"),
'{"type":"message","message":{"role":"user","content":"hello"}}\n',
"utf-8",
);
const firstEntered = createDeferred<void>();
const releaseFirst = createDeferred<void>();
let activeExports = 0;
let overlapped = false;
const exportSpy = vi
.spyOn(
QmdMemoryManager.prototype as unknown as {
exportSessions: () => Promise<void>;
},
"exportSessions",
)
.mockImplementation(async () => {
activeExports += 1;
if (activeExports > 1) {
overlapped = true;
}
if (activeExports === 1) {
firstEntered.resolve();
await releaseFirst.promise;
}
activeExports -= 1;
});
const first = await createManager({ mode: "status" });
const second = await createManager({ mode: "status" });
try {
const firstSync = first.manager.sync({ reason: "manual", force: true });
await firstEntered.promise;
const secondSync = second.manager.sync({ reason: "manual", force: true });
await Promise.resolve();
expect(exportSpy).toHaveBeenCalledTimes(1);
expect(overlapped).toBe(false);
releaseFirst.resolve();
await Promise.all([firstSync, secondSync]);
expect(exportSpy).toHaveBeenCalledTimes(2);
expect(overlapped).toBe(false);
} finally {
exportSpy.mockRestore();
await first.manager.close();
await second.manager.close();
}
});
it("skips queued session export work after close while waiting on the shared update queue", async () => {
cfg = {
...cfg,
memory: {
backend: "qmd",
qmd: {
includeDefaultMemory: false,
update: { interval: "0s", debounceMs: 0, onBoot: false },
sessions: { enabled: true },
paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }],
},
},
} as OpenClawConfig;
const sessionsDir = path.join(stateDir, "agents", agentId, "sessions");
await fs.mkdir(sessionsDir, { recursive: true });
await fs.writeFile(
path.join(sessionsDir, "session-1.jsonl"),
'{"type":"message","message":{"role":"user","content":"hello"}}\n',
"utf-8",
);
const firstEntered = createDeferred<void>();
const releaseFirst = createDeferred<void>();
const exportSpy = vi
.spyOn(
QmdMemoryManager.prototype as unknown as {
exportSessions: () => Promise<void>;
},
"exportSessions",
)
.mockImplementation(async () => {
if (exportSpy.mock.calls.length === 1) {
firstEntered.resolve();
await releaseFirst.promise;
}
});
const first = await createManager({ mode: "status" });
const second = await createManager({ mode: "status" });
try {
const firstSync = first.manager.sync({ reason: "manual", force: true });
await firstEntered.promise;
const secondSync = second.manager.sync({ reason: "manual", force: true });
await Promise.resolve();
const closeSecond = second.manager.close();
await expect(closeSecond).resolves.toBeUndefined();
releaseFirst.resolve();
await Promise.all([firstSync, secondSync]);
expect(exportSpy).toHaveBeenCalledTimes(1);
const updateCalls = spawnMock.mock.calls
.map((call: unknown[]) => call[1] as string[])
.filter((args: string[]) => args[0] === "update");
expect(updateCalls).toHaveLength(1);
} finally {
exportSpy.mockRestore();
await first.manager.close();
await second.manager.close();
}
});
it("runs qmd embed in search mode for forced sync", async () => {
cfg = {
...cfg,

View File

@@ -9,6 +9,7 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { withFileLock } from "openclaw/plugin-sdk/file-lock";
import {
createSubsystemLogger,
resolveAgentContextLimits,
resolveMemorySearchSyncConfig,
resolveAgentWorkspaceDir,
resolveGlobalSingleton,
@@ -16,7 +17,6 @@ import {
writeFileWithinRoot,
type OpenClawConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import { resolveAgentContextLimits } from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import {
buildSessionEntry,
deriveQmdScopeChannel,
@@ -76,6 +76,7 @@ const QMD_EMBED_LOCK_RETRY_TEMPLATE = {
} as const;
const MCPORTER_STATE_KEY = Symbol.for("openclaw.mcporterState");
const QMD_EMBED_QUEUE_KEY = Symbol.for("openclaw.qmdEmbedQueueTail");
const QMD_UPDATE_QUEUE_KEY = Symbol.for("openclaw.qmdUpdateQueueState");
const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([
".git",
"node_modules",
@@ -120,6 +121,10 @@ type QmdEmbedQueueState = {
tail: Promise<void>;
};
type QmdUpdateQueueState = {
tails: Map<string, Promise<void>>;
};
function getMcporterState(): McporterState {
return resolveGlobalSingleton<McporterState>(MCPORTER_STATE_KEY, () => ({
coldStartWarned: false,
@@ -133,6 +138,12 @@ function getQmdEmbedQueueState(): QmdEmbedQueueState {
}));
}
function getQmdUpdateQueueState(): QmdUpdateQueueState {
return resolveGlobalSingleton<QmdUpdateQueueState>(QMD_UPDATE_QUEUE_KEY, () => ({
tails: new Map<string, Promise<void>>(),
}));
}
function _hasHanScript(value: string): boolean {
return HAN_SCRIPT_RE.test(value);
}
@@ -205,6 +216,11 @@ type ManagedCollection = {
};
type QmdManagerMode = "full" | "status";
type QmdManagerRuntimeConfig = {
workspaceDir: string;
syncSettings: ReturnType<typeof resolveMemorySearchSyncConfig>;
contextLimits: ReturnType<typeof resolveAgentContextLimits>;
};
type BuiltinQmdMcpTool = "query" | "search" | "vector_search" | "deep_search";
type QmdMcporterSearchParams =
| {
@@ -255,20 +271,27 @@ export class QmdMemoryManager implements MemorySearchManager {
agentId: string;
resolved: ResolvedMemoryBackendConfig;
mode?: QmdManagerMode;
runtimeConfig?: QmdManagerRuntimeConfig;
}): Promise<QmdMemoryManager | null> {
const resolved = params.resolved.qmd;
if (!resolved) {
return null;
}
const manager = new QmdMemoryManager({ cfg: params.cfg, agentId: params.agentId, resolved });
const runtimeConfig =
params.runtimeConfig ?? resolveQmdManagerRuntimeConfig(params.cfg, params.agentId);
const manager = new QmdMemoryManager({
agentId: params.agentId,
resolved,
runtimeConfig,
});
await manager.initialize(params.mode ?? "full");
return manager;
}
private readonly cfg: OpenClawConfig;
private readonly agentId: string;
private readonly qmd: ResolvedQmdConfig;
private readonly workspaceDir: string;
private readonly contextLimits: ReturnType<typeof resolveAgentContextLimits>;
private readonly stateDir: string;
private readonly agentStateDir: string;
private readonly qmdDir: string;
@@ -303,6 +326,8 @@ export class QmdMemoryManager implements MemorySearchManager {
private queuedForcedRuns = 0;
private dirty = false;
private closed = false;
private readonly closeSignal: Promise<void>;
private resolveCloseSignal!: () => void;
private db: SqliteDatabase | null = null;
private lastUpdateAt: number | null = null;
private lastEmbedAt: number | null = null;
@@ -316,18 +341,18 @@ export class QmdMemoryManager implements MemorySearchManager {
private collectionPatternFlag: QmdCollectionPatternFlag | null = "--glob";
private constructor(params: {
cfg: OpenClawConfig;
agentId: string;
resolved: ResolvedQmdConfig;
runtimeConfig: QmdManagerRuntimeConfig;
}) {
this.cfg = params.cfg;
this.agentId = params.agentId;
this.qmd = params.resolved;
this.workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId);
this.workspaceDir = params.runtimeConfig.workspaceDir;
this.contextLimits = params.runtimeConfig.contextLimits;
this.stateDir = resolveStateDir(process.env, os.homedir);
this.agentStateDir = path.join(this.stateDir, "agents", this.agentId);
this.qmdDir = path.join(this.agentStateDir, "qmd");
this.syncSettings = resolveMemorySearchSyncConfig(params.cfg, params.agentId);
this.syncSettings = params.runtimeConfig.syncSettings;
// QMD uses XDG base dirs for its internal state.
// Collections are managed via `qmd collection add` and stored inside the index DB.
// - config: $XDG_CONFIG_HOME (contexts, etc.)
@@ -346,6 +371,9 @@ export class QmdMemoryManager implements MemorySearchManager {
XDG_CACHE_HOME: this.xdgCacheHome,
NO_COLOR: "1",
};
this.closeSignal = new Promise<void>((resolve) => {
this.resolveCloseSignal = resolve;
});
this.sessionExporter = this.qmd.sessions.enabled
? {
dir: this.qmd.sessions.exportDir ?? path.join(this.qmdDir, "sessions"),
@@ -1198,7 +1226,7 @@ export class QmdMemoryManager implements MemorySearchManager {
if (statResult.missing) {
return { text: "", path: relPath };
}
const contextLimits = resolveAgentContextLimits(this.cfg, this.agentId);
const contextLimits = this.contextLimits;
if (params.from !== undefined || params.lines !== undefined) {
const requestedCount = Math.max(
1,
@@ -1307,6 +1335,7 @@ export class QmdMemoryManager implements MemorySearchManager {
return;
}
this.closed = true;
this.resolveCloseSignal();
if (this.updateTimer) {
clearInterval(this.updateTimer);
this.updateTimer = null;
@@ -1356,11 +1385,19 @@ export class QmdMemoryManager implements MemorySearchManager {
return;
}
const run = async () => {
if (this.sessionExporter) {
await this.exportSessions();
await this.withQmdUpdateQueue(async () => {
if (this.closed) {
return;
}
if (this.sessionExporter) {
await this.exportSessions();
}
await this.runQmdUpdateWithRetry(reason);
this.dirty = false;
});
if (this.closed) {
return;
}
await this.runQmdUpdateWithRetry(reason);
this.dirty = false;
if (this.shouldRunEmbed(force)) {
try {
await this.withQmdEmbedLock(async () => {
@@ -1376,6 +1413,9 @@ export class QmdMemoryManager implements MemorySearchManager {
this.noteEmbedFailure(reason, err);
}
}
if (this.closed) {
return;
}
this.lastUpdateAt = Date.now();
this.docPathCache.clear();
};
@@ -1573,6 +1613,41 @@ export class QmdMemoryManager implements MemorySearchManager {
}
}
private async withQmdUpdateQueue<T>(task: () => Promise<T>): Promise<T> {
const queue = getQmdUpdateQueueState();
const key = this.qmdDir;
const previous = queue.tails.get(key) ?? Promise.resolve();
let releaseCurrent!: () => void;
const current = new Promise<void>((resolve) => {
releaseCurrent = resolve;
});
const next = previous.then(
() => current,
() => current,
);
queue.tails.set(key, next);
try {
const waitResult = await Promise.race([
previous.then(
() => "ready" as const,
() => "ready" as const,
),
this.closeSignal.then(() => "closed" as const),
]);
if (waitResult === "closed") {
return undefined as T;
}
return await task();
} finally {
releaseCurrent();
void next.finally(() => {
if (queue.tails.get(key) === next) {
queue.tails.delete(key);
}
});
}
}
private noteEmbedFailure(reason: string, err: unknown): void {
this.embedFailureCount += 1;
const delayMs = Math.min(
@@ -2896,3 +2971,14 @@ export class QmdMemoryManager implements MemorySearchManager {
return [command, normalizedQuery, "--json", "-n", String(limit)];
}
}
function resolveQmdManagerRuntimeConfig(
cfg: OpenClawConfig,
agentId: string,
): QmdManagerRuntimeConfig {
return {
workspaceDir: resolveAgentWorkspaceDir(cfg, agentId),
syncSettings: resolveMemorySearchSyncConfig(cfg, agentId),
contextLimits: resolveAgentContextLimits(cfg, agentId),
};
}

View File

@@ -126,13 +126,18 @@ import { QmdMemoryManager } from "./qmd-manager.js";
import { closeAllMemorySearchManagers, getMemorySearchManager } from "./search-manager.js";
const createQmdManagerMock = vi.mocked(QmdMemoryManager.create);
type QmdManagerInstance = Awaited<ReturnType<typeof QmdMemoryManager.create>>;
type SearchManagerResult = Awaited<ReturnType<typeof getMemorySearchManager>>;
type SearchManager = NonNullable<SearchManagerResult["manager"]>;
function createQmdCfg(agentId: string): OpenClawConfig {
function createQmdCfg(
agentId: string,
workspace: string = "/tmp/workspace",
qmd: Record<string, unknown> = {},
): OpenClawConfig {
return {
memory: { backend: "qmd", qmd: {} },
agents: { list: [{ id: agentId, default: true, workspace: "/tmp/workspace" }] },
memory: { backend: "qmd", qmd },
agents: { list: [{ id: agentId, default: true, workspace }] },
};
}
@@ -167,6 +172,16 @@ function requireManager(result: SearchManagerResult): SearchManager {
return result.manager;
}
function createDeferred<T>() {
let resolve!: (value: T) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve, reject };
}
async function createFailedQmdSearchHarness(params: { agentId: string; errorMessage: string }) {
const cfg = createQmdCfg(params.agentId);
mockPrimary.search.mockRejectedValueOnce(new Error(params.errorMessage));
@@ -296,6 +311,296 @@ describe("getMemorySearchManager caching", () => {
expect(checkQmdBinaryAvailability).toHaveBeenCalledTimes(1);
});
it("reuses cached full qmd manager across normalized agent ids", async () => {
const cfg = createQmdCfg("Main-Agent");
const first = await getMemorySearchManager({ cfg, agentId: "Main-Agent" });
const second = await getMemorySearchManager({ cfg, agentId: "main-agent" });
requireManager(first);
requireManager(second);
expect(first.manager).toBe(second.manager);
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
expect(createQmdManagerMock.mock.calls[0]?.[0]).toEqual(
expect.objectContaining({ agentId: "main-agent" }),
);
});
it("replaces cached full qmd manager across different workspaces", async () => {
const agentId = "cached-qmd-workspace-reload";
const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a");
const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b");
const firstPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
const secondPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
createQmdManagerMock
.mockImplementationOnce(async () => firstPrimary as unknown as QmdManagerInstance)
.mockImplementationOnce(async () => secondPrimary as unknown as QmdManagerInstance);
const first = await getMemorySearchManager({ cfg: firstCfg, agentId });
const firstManager = requireManager(first);
const second = await getMemorySearchManager({ cfg: secondCfg, agentId });
const secondManager = requireManager(second);
expect(firstManager).not.toBe(secondManager);
expect(createQmdManagerMock).toHaveBeenCalledTimes(2);
expect(firstPrimary.close).toHaveBeenCalledTimes(1);
await expect(firstManager.search("hello")).rejects.toThrow("replaced by a newer qmd manager");
expect(() => firstManager.status()).toThrow("replaced by a newer qmd manager");
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, {
command: "qmd",
env: process.env,
cwd: "/tmp/workspace-a",
});
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, {
command: "qmd",
env: process.env,
cwd: "/tmp/workspace-b",
});
});
it("replaces cached full qmd manager when context limits change", async () => {
const agentId = "cached-qmd-context-limits-reload";
const firstCfg = createQmdCfg(agentId, "/tmp/workspace");
const secondCfg = {
...createQmdCfg(agentId, "/tmp/workspace"),
agents: {
list: [
{
id: agentId,
default: true,
workspace: "/tmp/workspace",
contextLimits: {
memoryGetMaxChars: 24_000,
memoryGetDefaultLines: 180,
},
},
],
},
} as OpenClawConfig;
const firstPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
const secondPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
createQmdManagerMock
.mockImplementationOnce(async () => firstPrimary as unknown as QmdManagerInstance)
.mockImplementationOnce(async () => secondPrimary as unknown as QmdManagerInstance);
const first = await getMemorySearchManager({ cfg: firstCfg, agentId });
const second = await getMemorySearchManager({ cfg: secondCfg, agentId });
requireManager(first);
requireManager(second);
expect(first.manager).not.toBe(second.manager);
expect(createQmdManagerMock).toHaveBeenCalledTimes(2);
expect(firstPrimary.close).toHaveBeenCalledTimes(1);
});
it("keeps the existing cached full qmd manager when replacement creation fails", async () => {
const agentId = "cached-qmd-failed-replacement";
const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a");
const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b");
const firstPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
createQmdManagerMock.mockImplementationOnce(
async () => firstPrimary as unknown as QmdManagerInstance,
);
checkQmdBinaryAvailability
.mockResolvedValueOnce({ available: true })
.mockResolvedValueOnce({ available: false, error: "spawn qmd ENOENT" });
const first = await getMemorySearchManager({ cfg: firstCfg, agentId });
const firstManager = requireManager(first);
const replacementAttempt = await getMemorySearchManager({ cfg: secondCfg, agentId });
expect(replacementAttempt.manager).toBe(fallbackManager);
expect(firstPrimary.close).not.toHaveBeenCalled();
await expect(firstManager.search("hello")).resolves.toEqual([]);
const firstAgain = await getMemorySearchManager({ cfg: firstCfg, agentId });
expect(firstAgain.manager).toBe(firstManager);
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
});
it("dedupes concurrent full qmd manager creation for the same agent", async () => {
const agentId = "pending-qmd";
const cfg = createQmdCfg(agentId);
const createGate = createDeferred<QmdManagerInstance>();
createQmdManagerMock.mockImplementationOnce(async () => await createGate.promise);
const firstPromise = getMemorySearchManager({ cfg, agentId });
const secondPromise = getMemorySearchManager({ cfg, agentId });
createGate.resolve(mockPrimary as unknown as QmdManagerInstance);
const [first, second] = await Promise.all([firstPromise, secondPromise]);
requireManager(first);
requireManager(second);
expect(first.manager).toBe(second.manager);
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
expect(checkQmdBinaryAvailability).toHaveBeenCalledTimes(1);
});
it("serializes pending full qmd creation before replacing it for a different workspace", async () => {
const agentId = "pending-qmd-workspace-reload";
const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a");
const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b");
const firstPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
const secondPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
const firstGate = createDeferred<QmdManagerInstance>();
const secondGate = createDeferred<QmdManagerInstance>();
createQmdManagerMock
.mockImplementationOnce(async () => await firstGate.promise)
.mockImplementationOnce(async () => await secondGate.promise);
const firstPromise = getMemorySearchManager({ cfg: firstCfg, agentId });
await Promise.resolve();
const secondPromise = getMemorySearchManager({ cfg: secondCfg, agentId });
await vi.waitFor(() => {
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
});
firstGate.resolve(firstPrimary as unknown as QmdManagerInstance);
await vi.waitFor(() => {
expect(createQmdManagerMock).toHaveBeenCalledTimes(2);
});
secondGate.resolve(secondPrimary as unknown as QmdManagerInstance);
const [first, second] = await Promise.all([firstPromise, secondPromise]);
requireManager(first);
requireManager(second);
expect(first.manager).not.toBe(second.manager);
expect(firstPrimary.close).toHaveBeenCalledTimes(1);
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, {
command: "qmd",
env: process.env,
cwd: "/tmp/workspace-a",
});
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, {
command: "qmd",
env: process.env,
cwd: "/tmp/workspace-b",
});
});
it("serializes pending full qmd creation before replacing it for a different qmd config", async () => {
const agentId = "pending-qmd-config-reload";
const firstCfg = createQmdCfg(agentId, "/tmp/workspace", { command: "qmd" });
const secondCfg = createQmdCfg(agentId, "/tmp/workspace", { command: "qmd-alt" });
const firstPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
const secondPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
const firstGate = createDeferred<QmdManagerInstance>();
const secondGate = createDeferred<QmdManagerInstance>();
createQmdManagerMock
.mockImplementationOnce(async () => await firstGate.promise)
.mockImplementationOnce(async () => await secondGate.promise);
const firstPromise = getMemorySearchManager({ cfg: firstCfg, agentId });
await Promise.resolve();
const secondPromise = getMemorySearchManager({ cfg: secondCfg, agentId });
await vi.waitFor(() => {
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
});
firstGate.resolve(firstPrimary as unknown as QmdManagerInstance);
await vi.waitFor(() => {
expect(createQmdManagerMock).toHaveBeenCalledTimes(2);
});
secondGate.resolve(secondPrimary as unknown as QmdManagerInstance);
const [first, second] = await Promise.all([firstPromise, secondPromise]);
requireManager(first);
requireManager(second);
expect(first.manager).not.toBe(second.manager);
expect(firstPrimary.close).toHaveBeenCalledTimes(1);
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, {
command: "qmd",
env: process.env,
cwd: "/tmp/workspace",
});
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, {
command: "qmd-alt",
env: process.env,
cwd: "/tmp/workspace",
});
});
it("reuses pending full qmd creation when raw cfg differs but qmd inputs match", async () => {
const agentId = "pending-qmd-unrelated-config";
const firstCfg = createQmdCfg(agentId);
const secondCfg = {
...createQmdCfg(agentId),
session: { store: "/tmp/alternate-session-store.json" },
} as OpenClawConfig;
const createGate = createDeferred<QmdManagerInstance>();
createQmdManagerMock.mockImplementationOnce(async () => await createGate.promise);
const firstPromise = getMemorySearchManager({ cfg: firstCfg, agentId });
await Promise.resolve();
const secondPromise = getMemorySearchManager({ cfg: secondCfg, agentId });
createGate.resolve(mockPrimary as unknown as QmdManagerInstance);
const [first, second] = await Promise.all([firstPromise, secondPromise]);
requireManager(first);
requireManager(second);
expect(createQmdManagerMock).toHaveBeenCalledTimes(1);
expect(first.manager).toBe(second.manager);
expect(checkQmdBinaryAvailability).toHaveBeenCalledTimes(1);
});
it("does not cache qmd managers for status-only requests", async () => {
const agentId = "status-agent";
const cfg = createQmdCfg(agentId);
@@ -400,6 +705,51 @@ describe("getMemorySearchManager caching", () => {
expect(fullAgain.manager).toBe(full.manager);
});
it("does not borrow a cached full qmd manager for status across different workspaces", async () => {
const agentId = "status-workspace-reload";
const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a");
const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b");
const firstPrimary = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
const secondStatusManager = createManagerMock({
backend: "qmd",
provider: "qmd",
model: "qmd",
requestedProvider: "qmd",
withMemorySourceCounts: true,
});
createQmdManagerMock
.mockImplementationOnce(async () => firstPrimary as unknown as QmdManagerInstance)
.mockImplementationOnce(async () => secondStatusManager as unknown as QmdManagerInstance);
const full = await getMemorySearchManager({ cfg: firstCfg, agentId });
const fullManager = requireManager(full);
const status = await getMemorySearchManager({ cfg: secondCfg, agentId, purpose: "status" });
requireManager(status);
expect(status.manager).toBe(secondStatusManager);
expect(createQmdManagerMock.mock.calls).toHaveLength(2);
expect(firstPrimary.close).not.toHaveBeenCalled();
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, {
command: "qmd",
env: process.env,
cwd: "/tmp/workspace-a",
});
expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, {
command: "qmd",
env: process.env,
cwd: "/tmp/workspace-b",
});
const fullAgain = await getMemorySearchManager({ cfg: firstCfg, agentId });
expect(fullAgain.manager).toBe(fullManager);
});
it("gets a fresh qmd manager for later status requests after close", async () => {
const agentId = "status-eviction-agent";
const cfg = createQmdCfg(agentId);
@@ -478,6 +828,31 @@ describe("getMemorySearchManager caching", () => {
expect(createQmdManagerMock.mock.calls).toHaveLength(2);
});
it("waits for pending full qmd manager creation during global teardown", async () => {
const agentId = "teardown-pending-qmd";
const cfg = createQmdCfg(agentId);
const createGate = createDeferred<QmdManagerInstance>();
createQmdManagerMock.mockImplementationOnce(async () => await createGate.promise);
const firstPromise = getMemorySearchManager({ cfg, agentId });
await Promise.resolve();
const closePromise = closeAllMemorySearchManagers();
await Promise.resolve();
createGate.resolve(mockPrimary as unknown as QmdManagerInstance);
const first = await firstPromise;
const firstManager = requireManager(first);
await closePromise;
expect(mockPrimary.close).toHaveBeenCalledTimes(1);
const second = await getMemorySearchManager({ cfg, agentId });
expect(second.manager).not.toBe(firstManager);
expect(createQmdManagerMock.mock.calls).toHaveLength(2);
});
it("closes builtin index managers on teardown after runtime is loaded", async () => {
const retryAgentId = "teardown-with-fallback";
const { manager } = await createFailedQmdSearchHarness({

View File

@@ -2,8 +2,10 @@ import fs from "node:fs/promises";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
createSubsystemLogger,
resolveAgentContextLimits,
resolveAgentWorkspaceDir,
resolveGlobalSingleton,
resolveMemorySearchSyncConfig,
type OpenClawConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import { checkQmdBinaryAvailability } from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
@@ -15,10 +17,29 @@ import {
type MemorySyncProgressUpdate,
type ResolvedQmdConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import { normalizeAgentId } from "openclaw/plugin-sdk/routing";
const MEMORY_SEARCH_MANAGER_CACHE_KEY = Symbol.for("openclaw.memorySearchManagerCache");
type Maybe<T> = T | null;
type QmdManagerRuntimeConfig = {
workspaceDir: string;
syncSettings: ReturnType<typeof resolveMemorySearchSyncConfig>;
contextLimits: ReturnType<typeof resolveAgentContextLimits>;
};
type CachedQmdManagerEntry = {
identityKey: string;
manager: MemorySearchManager;
};
type PendingQmdManagerCreate = {
identityKey: string;
promise: Promise<Maybe<MemorySearchManager>>;
};
type MemorySearchManagerCacheStore = {
qmdManagerCache: Map<string, MemorySearchManager>;
qmdManagerCache: Map<string, CachedQmdManagerEntry>;
pendingQmdManagerCreates: Map<string, PendingQmdManagerCreate>;
};
function getMemorySearchManagerCacheStore(): MemorySearchManagerCacheStore {
@@ -26,13 +47,17 @@ function getMemorySearchManagerCacheStore(): MemorySearchManagerCacheStore {
return resolveGlobalSingleton<MemorySearchManagerCacheStore>(
MEMORY_SEARCH_MANAGER_CACHE_KEY,
() => ({
qmdManagerCache: new Map<string, MemorySearchManager>(),
qmdManagerCache: new Map<string, CachedQmdManagerEntry>(),
pendingQmdManagerCreates: new Map<string, PendingQmdManagerCreate>(),
}),
);
}
const log = createSubsystemLogger("memory");
const { qmdManagerCache: QMD_MANAGER_CACHE } = getMemorySearchManagerCacheStore();
const {
qmdManagerCache: QMD_MANAGER_CACHE,
pendingQmdManagerCreates: PENDING_QMD_MANAGER_CREATES,
} = getMemorySearchManagerCacheStore();
let managerRuntimePromise: Promise<typeof import("../../manager-runtime.js")> | null = null;
let qmdManagerModulePromise: Promise<typeof import("./qmd-manager.js")> | null = null;
@@ -47,7 +72,7 @@ function loadQmdManagerModule() {
}
export type MemorySearchManagerResult = {
manager: MemorySearchManager | null;
manager: Maybe<MemorySearchManager>;
error?: string;
};
@@ -58,74 +83,134 @@ export async function getMemorySearchManager(params: {
}): Promise<MemorySearchManagerResult> {
const resolved = resolveMemoryBackendConfig(params);
if (resolved.backend === "qmd" && resolved.qmd) {
const qmdResolved = resolved.qmd;
const normalizedAgentId = normalizeAgentId(params.agentId);
const runtimeConfig = resolveQmdManagerRuntimeConfig(params.cfg, normalizedAgentId);
const { workspaceDir } = runtimeConfig;
const statusOnly = params.purpose === "status";
const baseCacheKey = buildQmdCacheKey(params.agentId, resolved.qmd);
const cacheKey = `${baseCacheKey}:${statusOnly ? "status" : "full"}`;
const cached = QMD_MANAGER_CACHE.get(cacheKey);
if (cached) {
return { manager: cached };
}
if (statusOnly) {
const fullCached = QMD_MANAGER_CACHE.get(`${baseCacheKey}:full`);
if (fullCached) {
// Status callers often close the manager they receive. Wrap the live
// full manager with a no-op close so health/status probes do not tear
// down the active QMD manager for the process.
return { manager: new BorrowedMemoryManager(fullCached) };
const scopeKey = buildQmdManagerScopeKey(normalizedAgentId);
const identityKey = buildQmdManagerIdentityKey(normalizedAgentId, qmdResolved, runtimeConfig);
const createPrimaryQmdManager = async (
mode: "full" | "status",
): Promise<Maybe<MemorySearchManager>> => {
try {
await fs.mkdir(workspaceDir, { recursive: true });
} catch (err) {
log.warn(
`qmd workspace unavailable (${workspaceDir}); falling back to builtin: ${formatErrorMessage(err)}`,
);
return null;
}
}
const workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId);
try {
await fs.mkdir(workspaceDir, { recursive: true });
} catch (err) {
log.warn(
`qmd workspace unavailable (${workspaceDir}); falling back to builtin: ${formatErrorMessage(err)}`,
);
return await getBuiltinMemorySearchManager(params);
}
const qmdBinary = await checkQmdBinaryAvailability({
command: resolved.qmd.command,
env: process.env,
cwd: workspaceDir,
});
if (!qmdBinary.available) {
log.warn(
`qmd binary unavailable (${resolved.qmd.command}); falling back to builtin: ${qmdBinary.error ?? "unknown error"}`,
);
} else {
const qmdBinary = await checkQmdBinaryAvailability({
command: qmdResolved.command,
env: process.env,
cwd: workspaceDir,
});
if (!qmdBinary.available) {
log.warn(
`qmd binary unavailable (${qmdResolved.command}); falling back to builtin: ${qmdBinary.error ?? "unknown error"}`,
);
return null;
}
try {
const { QmdMemoryManager } = await loadQmdManagerModule();
const primary = await QmdMemoryManager.create({
cfg: params.cfg,
agentId: params.agentId,
resolved,
mode: statusOnly ? "status" : "full",
agentId: normalizedAgentId,
resolved: { ...resolved, qmd: qmdResolved },
mode,
runtimeConfig,
});
if (primary) {
if (statusOnly) {
return { manager: primary };
}
const wrapper = new FallbackMemoryManager(
{
primary,
fallbackFactory: async () => {
const { MemoryIndexManager } = await loadManagerRuntime();
return await MemoryIndexManager.get(params);
},
},
() => {
QMD_MANAGER_CACHE.delete(cacheKey);
},
);
QMD_MANAGER_CACHE.set(cacheKey, wrapper);
return { manager: wrapper };
return primary;
}
} catch (err) {
const message = formatErrorMessage(err);
log.warn(`qmd memory unavailable; falling back to builtin: ${message}`);
}
return null;
};
const createFullQmdManager = async (
expectedIdentityKey: string,
): Promise<Maybe<CachedQmdManagerEntry>> => {
const primary = await createPrimaryQmdManager("full");
if (!primary) {
return null;
}
let cacheEntry!: CachedQmdManagerEntry;
const wrapper = new FallbackMemoryManager(
{
primary,
fallbackFactory: async () => {
const { MemoryIndexManager } = await loadManagerRuntime();
return await MemoryIndexManager.get(params);
},
},
() => {
const current = QMD_MANAGER_CACHE.get(scopeKey);
if (current === cacheEntry) {
QMD_MANAGER_CACHE.delete(scopeKey);
}
},
);
cacheEntry = {
identityKey: expectedIdentityKey,
manager: wrapper,
};
return cacheEntry;
};
while (true) {
const cached = QMD_MANAGER_CACHE.get(scopeKey);
const cachedMatchesIdentity = cached?.identityKey === identityKey;
if (cachedMatchesIdentity) {
if (statusOnly) {
// Status callers often close the manager they receive. Wrap the live
// full manager with a no-op close so health/status probes do not tear
// down the active QMD manager for the process.
return { manager: new BorrowedMemoryManager(cached.manager) };
}
return { manager: cached.manager };
}
if (statusOnly) {
const manager = await createPrimaryQmdManager("status");
return manager ? { manager } : await getBuiltinMemorySearchManager(params);
}
const pending = PENDING_QMD_MANAGER_CREATES.get(scopeKey);
if (pending) {
await pending.promise;
continue;
}
const pendingCreate: PendingQmdManagerCreate = {
identityKey,
promise: (async () => {
const created = await createFullQmdManager(identityKey);
if (!created) {
return null;
}
QMD_MANAGER_CACHE.set(scopeKey, created);
if (cached) {
await closeQmdManagerForReplacement(cached.manager).catch((err) => {
log.warn(`failed to retire replaced qmd memory manager: ${formatErrorMessage(err)}`);
});
}
return created.manager;
})().finally(() => {
const currentPending = PENDING_QMD_MANAGER_CREATES.get(scopeKey);
if (currentPending === pendingCreate) {
PENDING_QMD_MANAGER_CREATES.delete(scopeKey);
}
}),
};
PENDING_QMD_MANAGER_CREATES.set(scopeKey, pendingCreate);
const manager = await pendingCreate.promise;
return manager ? { manager } : await getBuiltinMemorySearchManager(params);
}
}
@@ -192,7 +277,10 @@ class BorrowedMemoryManager implements MemorySearchManager {
}
export async function closeAllMemorySearchManagers(): Promise<void> {
const managers = Array.from(QMD_MANAGER_CACHE.values());
const pendingCreates = Array.from(PENDING_QMD_MANAGER_CREATES.values(), (entry) => entry.promise);
await Promise.allSettled(pendingCreates);
const managers = Array.from(QMD_MANAGER_CACHE.values(), (entry) => entry.manager);
PENDING_QMD_MANAGER_CREATES.clear();
QMD_MANAGER_CACHE.clear();
for (const manager of managers) {
try {
@@ -208,15 +296,17 @@ export async function closeAllMemorySearchManagers(): Promise<void> {
}
class FallbackMemoryManager implements MemorySearchManager {
private fallback: MemorySearchManager | null = null;
private fallback: Maybe<MemorySearchManager> = null;
private primaryFailed = false;
private lastError?: string;
private cacheEvicted = false;
private closed = false;
private closeReason = "memory search manager is closed";
constructor(
private readonly deps: {
primary: MemorySearchManager;
fallbackFactory: () => Promise<MemorySearchManager | null>;
fallbackFactory: () => Promise<Maybe<MemorySearchManager>>;
},
private readonly onClose?: () => void,
) {}
@@ -231,6 +321,7 @@ class FallbackMemoryManager implements MemorySearchManager {
onDebug?: (debug: MemorySearchRuntimeDebug) => void;
},
) {
this.ensureOpen();
if (!this.primaryFailed) {
try {
return await this.deps.primary.search(query, opts);
@@ -251,6 +342,7 @@ class FallbackMemoryManager implements MemorySearchManager {
}
async readFile(params: { relPath: string; from?: number; lines?: number }) {
this.ensureOpen();
if (!this.primaryFailed) {
return await this.deps.primary.readFile(params);
}
@@ -262,6 +354,7 @@ class FallbackMemoryManager implements MemorySearchManager {
}
status() {
this.ensureOpen();
if (!this.primaryFailed) {
return this.deps.primary.status();
}
@@ -296,6 +389,7 @@ class FallbackMemoryManager implements MemorySearchManager {
sessionFiles?: string[];
progress?: (update: MemorySyncProgressUpdate) => void;
}) {
this.ensureOpen();
if (!this.primaryFailed) {
await this.deps.primary.sync?.(params);
return;
@@ -305,6 +399,7 @@ class FallbackMemoryManager implements MemorySearchManager {
}
async probeEmbeddingAvailability(): Promise<MemoryEmbeddingProbeResult> {
this.ensureOpen();
if (!this.primaryFailed) {
return await this.deps.primary.probeEmbeddingAvailability();
}
@@ -316,6 +411,7 @@ class FallbackMemoryManager implements MemorySearchManager {
}
async probeVectorAvailability() {
this.ensureOpen();
if (!this.primaryFailed) {
return await this.deps.primary.probeVectorAvailability();
}
@@ -324,16 +420,25 @@ class FallbackMemoryManager implements MemorySearchManager {
}
async close() {
if (this.closed) {
return;
}
this.closed = true;
await this.deps.primary.close?.();
await this.fallback?.close?.();
this.evictCacheEntry();
}
private async ensureFallback(): Promise<MemorySearchManager | null> {
async invalidate(reason: string) {
this.closeReason = reason;
await this.close();
}
private async ensureFallback(): Promise<Maybe<MemorySearchManager>> {
if (this.fallback) {
return this.fallback;
}
let fallback: MemorySearchManager | null;
let fallback: Maybe<MemorySearchManager>;
try {
fallback = await this.deps.fallbackFactory();
if (!fallback) {
@@ -349,6 +454,12 @@ class FallbackMemoryManager implements MemorySearchManager {
return this.fallback;
}
private ensureOpen(): void {
if (this.closed) {
throw new Error(this.closeReason);
}
}
private evictCacheEntry(): void {
if (this.cacheEvicted) {
return;
@@ -358,8 +469,35 @@ class FallbackMemoryManager implements MemorySearchManager {
}
}
function buildQmdCacheKey(agentId: string, config: ResolvedQmdConfig): string {
async function closeQmdManagerForReplacement(manager: MemorySearchManager): Promise<void> {
if (manager instanceof FallbackMemoryManager) {
await manager.invalidate("memory search manager was replaced by a newer qmd manager");
return;
}
await manager.close?.();
}
function buildQmdManagerScopeKey(agentId: string): string {
return agentId;
}
function buildQmdManagerIdentityKey(
agentId: string,
config: ResolvedQmdConfig,
runtimeConfig: QmdManagerRuntimeConfig,
): string {
// ResolvedQmdConfig is assembled in a stable field order in resolveMemoryBackendConfig.
// Fast stringify avoids deep key-sorting overhead on this hot path.
return `${agentId}:${JSON.stringify(config)}`;
return `${agentId}:${JSON.stringify(config)}:${JSON.stringify(runtimeConfig.syncSettings ?? null)}:${JSON.stringify(runtimeConfig.contextLimits ?? null)}:${runtimeConfig.workspaceDir}`;
}
function resolveQmdManagerRuntimeConfig(
cfg: OpenClawConfig,
agentId: string,
): QmdManagerRuntimeConfig {
return {
workspaceDir: resolveAgentWorkspaceDir(cfg, agentId),
syncSettings: resolveMemorySearchSyncConfig(cfg, agentId),
contextLimits: resolveAgentContextLimits(cfg, agentId),
};
}

View File

@@ -2551,9 +2551,7 @@ describe("gateway server sessions", () => {
expect(deleted.ok).toBe(true);
expect(deleted.payload?.deleted).toBe(true);
expect(subagentLifecycleHookMocks.runSubagentEnded).toHaveBeenCalledTimes(1);
const event = (
subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][]
)[0]?.[0] as
const event = (subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][])[0]?.[0] as
| { targetKind?: string; targetSessionKey?: string; reason?: string; outcome?: string }
| undefined;
expect(event).toMatchObject({
@@ -2869,9 +2867,7 @@ describe("gateway server sessions", () => {
expect(reset.payload?.key).toBe("agent:main:subagent:worker");
expect(reset.payload?.entry.sessionId).not.toBe("sess-subagent");
expect(subagentLifecycleHookMocks.runSubagentEnded).toHaveBeenCalledTimes(1);
const event = (
subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][]
)[0]?.[0] as
const event = (subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][])[0]?.[0] as
| { targetKind?: string; targetSessionKey?: string; reason?: string; outcome?: string }
| undefined;
expect(event).toMatchObject({