From dfe0e49c8a3f4c860231fc2bee7a5736c72985c4 Mon Sep 17 00:00:00 2001 From: Bek <66288351+bek91@users.noreply.github.com> Date: Tue, 21 Apr 2026 18:22:21 -0400 Subject: [PATCH] fix(qmd): Dedup in-flight manager creation so only one full QMD manager arms per agent/config at a time, eliminating the concurrent exportSessions() collisions that triggered path changed during write errors (#65226) Fixes concurrent manager creation races that caused SafeOpenErrors during session export. Deduplicates in-flight manager creation so only one full QMD manager arms per agent/config at a time, eliminating the concurrent exportSessions() collisions that triggered path changed during write errors Resolves and snapshots runtime inputs before cache reuse, replacing stale managers atomically when workspace/config changes, and aborting queued export work promptly on close() --- .github/workflows/ci.yml | 13 +- .../src/memory/qmd-manager.test.ts | 136 +++++++ .../memory-core/src/memory/qmd-manager.ts | 110 ++++- .../src/memory/search-manager.test.ts | 381 +++++++++++++++++- .../memory-core/src/memory/search-manager.ts | 268 +++++++++--- ...sessions.gateway-server-sessions-a.test.ts | 8 +- 6 files changed, 829 insertions(+), 87 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5920630242d..b42dc54b856 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -346,10 +346,21 @@ jobs: if: github.event_name == 'pull_request' env: BASE_SHA: ${{ github.event.pull_request.base.sha }} + BASE_REF: ${{ github.event.pull_request.base.ref }} run: | set -euo pipefail trusted_config="$RUNNER_TEMP/pre-commit-base.yaml" - git show "${BASE_SHA}:.pre-commit-config.yaml" > "$trusted_config" + if git cat-file -e "${BASE_SHA}^{commit}" 2>/dev/null && + git cat-file -e "${BASE_SHA}:.pre-commit-config.yaml" 2>/dev/null; then + git show "${BASE_SHA}:.pre-commit-config.yaml" > "$trusted_config" + elif git show "refs/remotes/origin/${BASE_REF}:.pre-commit-config.yaml" \ + > "$trusted_config" 2>/dev/null; then + echo "Base SHA ${BASE_SHA} does not expose .pre-commit-config.yaml; using origin/${BASE_REF} instead." + else + echo "::warning title=trusted pre-commit config unavailable::Could not read .pre-commit-config.yaml from ${BASE_SHA} or origin/${BASE_REF}; falling back to the checked-out config." + rm -f "$trusted_config" + exit 0 + fi echo "PRE_COMMIT_CONFIG_PATH=$trusted_config" >> "$GITHUB_ENV" - name: Setup Python diff --git a/extensions/memory-core/src/memory/qmd-manager.test.ts b/extensions/memory-core/src/memory/qmd-manager.test.ts index 7fbff7cddaa..cde98b6e4bd 100644 --- a/extensions/memory-core/src/memory/qmd-manager.test.ts +++ b/extensions/memory-core/src/memory/qmd-manager.test.ts @@ -3386,6 +3386,142 @@ describe("QmdMemoryManager", () => { await second.manager.close(); }); + it("serializes session exports across managers for the same agent", async () => { + cfg = { + ...cfg, + memory: { + backend: "qmd", + qmd: { + includeDefaultMemory: false, + update: { interval: "0s", debounceMs: 0, onBoot: false }, + sessions: { enabled: true }, + paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }], + }, + }, + } as OpenClawConfig; + + const sessionsDir = path.join(stateDir, "agents", agentId, "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + await fs.writeFile( + path.join(sessionsDir, "session-1.jsonl"), + '{"type":"message","message":{"role":"user","content":"hello"}}\n', + "utf-8", + ); + + const firstEntered = createDeferred(); + const releaseFirst = createDeferred(); + let activeExports = 0; + let overlapped = false; + const exportSpy = vi + .spyOn( + QmdMemoryManager.prototype as unknown as { + exportSessions: () => Promise; + }, + "exportSessions", + ) + .mockImplementation(async () => { + activeExports += 1; + if (activeExports > 1) { + overlapped = true; + } + if (activeExports === 1) { + firstEntered.resolve(); + await releaseFirst.promise; + } + activeExports -= 1; + }); + + const first = await createManager({ mode: "status" }); + const second = await createManager({ mode: "status" }); + + try { + const firstSync = first.manager.sync({ reason: "manual", force: true }); + await firstEntered.promise; + + const secondSync = second.manager.sync({ reason: "manual", force: true }); + await Promise.resolve(); + + expect(exportSpy).toHaveBeenCalledTimes(1); + expect(overlapped).toBe(false); + + releaseFirst.resolve(); + await Promise.all([firstSync, secondSync]); + + expect(exportSpy).toHaveBeenCalledTimes(2); + expect(overlapped).toBe(false); + } finally { + exportSpy.mockRestore(); + await first.manager.close(); + await second.manager.close(); + } + }); + + it("skips queued session export work after close while waiting on the shared update queue", async () => { + cfg = { + ...cfg, + memory: { + backend: "qmd", + qmd: { + includeDefaultMemory: false, + update: { interval: "0s", debounceMs: 0, onBoot: false }, + sessions: { enabled: true }, + paths: [{ path: workspaceDir, pattern: "**/*.md", name: "workspace" }], + }, + }, + } as OpenClawConfig; + + const sessionsDir = path.join(stateDir, "agents", agentId, "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + await fs.writeFile( + path.join(sessionsDir, "session-1.jsonl"), + '{"type":"message","message":{"role":"user","content":"hello"}}\n', + "utf-8", + ); + + const firstEntered = createDeferred(); + const releaseFirst = createDeferred(); + const exportSpy = vi + .spyOn( + QmdMemoryManager.prototype as unknown as { + exportSessions: () => Promise; + }, + "exportSessions", + ) + .mockImplementation(async () => { + if (exportSpy.mock.calls.length === 1) { + firstEntered.resolve(); + await releaseFirst.promise; + } + }); + + const first = await createManager({ mode: "status" }); + const second = await createManager({ mode: "status" }); + + try { + const firstSync = first.manager.sync({ reason: "manual", force: true }); + await firstEntered.promise; + + const secondSync = second.manager.sync({ reason: "manual", force: true }); + await Promise.resolve(); + + const closeSecond = second.manager.close(); + await expect(closeSecond).resolves.toBeUndefined(); + + releaseFirst.resolve(); + await Promise.all([firstSync, secondSync]); + + expect(exportSpy).toHaveBeenCalledTimes(1); + const updateCalls = spawnMock.mock.calls + .map((call: unknown[]) => call[1] as string[]) + .filter((args: string[]) => args[0] === "update"); + expect(updateCalls).toHaveLength(1); + } finally { + exportSpy.mockRestore(); + await first.manager.close(); + await second.manager.close(); + } + }); + it("runs qmd embed in search mode for forced sync", async () => { cfg = { ...cfg, diff --git a/extensions/memory-core/src/memory/qmd-manager.ts b/extensions/memory-core/src/memory/qmd-manager.ts index ebed89f00ec..44e7d012971 100644 --- a/extensions/memory-core/src/memory/qmd-manager.ts +++ b/extensions/memory-core/src/memory/qmd-manager.ts @@ -9,6 +9,7 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { withFileLock } from "openclaw/plugin-sdk/file-lock"; import { createSubsystemLogger, + resolveAgentContextLimits, resolveMemorySearchSyncConfig, resolveAgentWorkspaceDir, resolveGlobalSingleton, @@ -16,7 +17,6 @@ import { writeFileWithinRoot, type OpenClawConfig, } from "openclaw/plugin-sdk/memory-core-host-engine-foundation"; -import { resolveAgentContextLimits } from "openclaw/plugin-sdk/memory-core-host-engine-foundation"; import { buildSessionEntry, deriveQmdScopeChannel, @@ -76,6 +76,7 @@ const QMD_EMBED_LOCK_RETRY_TEMPLATE = { } as const; const MCPORTER_STATE_KEY = Symbol.for("openclaw.mcporterState"); const QMD_EMBED_QUEUE_KEY = Symbol.for("openclaw.qmdEmbedQueueTail"); +const QMD_UPDATE_QUEUE_KEY = Symbol.for("openclaw.qmdUpdateQueueState"); const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([ ".git", "node_modules", @@ -120,6 +121,10 @@ type QmdEmbedQueueState = { tail: Promise; }; +type QmdUpdateQueueState = { + tails: Map>; +}; + function getMcporterState(): McporterState { return resolveGlobalSingleton(MCPORTER_STATE_KEY, () => ({ coldStartWarned: false, @@ -133,6 +138,12 @@ function getQmdEmbedQueueState(): QmdEmbedQueueState { })); } +function getQmdUpdateQueueState(): QmdUpdateQueueState { + return resolveGlobalSingleton(QMD_UPDATE_QUEUE_KEY, () => ({ + tails: new Map>(), + })); +} + function _hasHanScript(value: string): boolean { return HAN_SCRIPT_RE.test(value); } @@ -205,6 +216,11 @@ type ManagedCollection = { }; type QmdManagerMode = "full" | "status"; +type QmdManagerRuntimeConfig = { + workspaceDir: string; + syncSettings: ReturnType; + contextLimits: ReturnType; +}; type BuiltinQmdMcpTool = "query" | "search" | "vector_search" | "deep_search"; type QmdMcporterSearchParams = | { @@ -255,20 +271,27 @@ export class QmdMemoryManager implements MemorySearchManager { agentId: string; resolved: ResolvedMemoryBackendConfig; mode?: QmdManagerMode; + runtimeConfig?: QmdManagerRuntimeConfig; }): Promise { const resolved = params.resolved.qmd; if (!resolved) { return null; } - const manager = new QmdMemoryManager({ cfg: params.cfg, agentId: params.agentId, resolved }); + const runtimeConfig = + params.runtimeConfig ?? resolveQmdManagerRuntimeConfig(params.cfg, params.agentId); + const manager = new QmdMemoryManager({ + agentId: params.agentId, + resolved, + runtimeConfig, + }); await manager.initialize(params.mode ?? "full"); return manager; } - private readonly cfg: OpenClawConfig; private readonly agentId: string; private readonly qmd: ResolvedQmdConfig; private readonly workspaceDir: string; + private readonly contextLimits: ReturnType; private readonly stateDir: string; private readonly agentStateDir: string; private readonly qmdDir: string; @@ -303,6 +326,8 @@ export class QmdMemoryManager implements MemorySearchManager { private queuedForcedRuns = 0; private dirty = false; private closed = false; + private readonly closeSignal: Promise; + private resolveCloseSignal!: () => void; private db: SqliteDatabase | null = null; private lastUpdateAt: number | null = null; private lastEmbedAt: number | null = null; @@ -316,18 +341,18 @@ export class QmdMemoryManager implements MemorySearchManager { private collectionPatternFlag: QmdCollectionPatternFlag | null = "--glob"; private constructor(params: { - cfg: OpenClawConfig; agentId: string; resolved: ResolvedQmdConfig; + runtimeConfig: QmdManagerRuntimeConfig; }) { - this.cfg = params.cfg; this.agentId = params.agentId; this.qmd = params.resolved; - this.workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId); + this.workspaceDir = params.runtimeConfig.workspaceDir; + this.contextLimits = params.runtimeConfig.contextLimits; this.stateDir = resolveStateDir(process.env, os.homedir); this.agentStateDir = path.join(this.stateDir, "agents", this.agentId); this.qmdDir = path.join(this.agentStateDir, "qmd"); - this.syncSettings = resolveMemorySearchSyncConfig(params.cfg, params.agentId); + this.syncSettings = params.runtimeConfig.syncSettings; // QMD uses XDG base dirs for its internal state. // Collections are managed via `qmd collection add` and stored inside the index DB. // - config: $XDG_CONFIG_HOME (contexts, etc.) @@ -346,6 +371,9 @@ export class QmdMemoryManager implements MemorySearchManager { XDG_CACHE_HOME: this.xdgCacheHome, NO_COLOR: "1", }; + this.closeSignal = new Promise((resolve) => { + this.resolveCloseSignal = resolve; + }); this.sessionExporter = this.qmd.sessions.enabled ? { dir: this.qmd.sessions.exportDir ?? path.join(this.qmdDir, "sessions"), @@ -1198,7 +1226,7 @@ export class QmdMemoryManager implements MemorySearchManager { if (statResult.missing) { return { text: "", path: relPath }; } - const contextLimits = resolveAgentContextLimits(this.cfg, this.agentId); + const contextLimits = this.contextLimits; if (params.from !== undefined || params.lines !== undefined) { const requestedCount = Math.max( 1, @@ -1307,6 +1335,7 @@ export class QmdMemoryManager implements MemorySearchManager { return; } this.closed = true; + this.resolveCloseSignal(); if (this.updateTimer) { clearInterval(this.updateTimer); this.updateTimer = null; @@ -1356,11 +1385,19 @@ export class QmdMemoryManager implements MemorySearchManager { return; } const run = async () => { - if (this.sessionExporter) { - await this.exportSessions(); + await this.withQmdUpdateQueue(async () => { + if (this.closed) { + return; + } + if (this.sessionExporter) { + await this.exportSessions(); + } + await this.runQmdUpdateWithRetry(reason); + this.dirty = false; + }); + if (this.closed) { + return; } - await this.runQmdUpdateWithRetry(reason); - this.dirty = false; if (this.shouldRunEmbed(force)) { try { await this.withQmdEmbedLock(async () => { @@ -1376,6 +1413,9 @@ export class QmdMemoryManager implements MemorySearchManager { this.noteEmbedFailure(reason, err); } } + if (this.closed) { + return; + } this.lastUpdateAt = Date.now(); this.docPathCache.clear(); }; @@ -1573,6 +1613,41 @@ export class QmdMemoryManager implements MemorySearchManager { } } + private async withQmdUpdateQueue(task: () => Promise): Promise { + const queue = getQmdUpdateQueueState(); + const key = this.qmdDir; + const previous = queue.tails.get(key) ?? Promise.resolve(); + let releaseCurrent!: () => void; + const current = new Promise((resolve) => { + releaseCurrent = resolve; + }); + const next = previous.then( + () => current, + () => current, + ); + queue.tails.set(key, next); + try { + const waitResult = await Promise.race([ + previous.then( + () => "ready" as const, + () => "ready" as const, + ), + this.closeSignal.then(() => "closed" as const), + ]); + if (waitResult === "closed") { + return undefined as T; + } + return await task(); + } finally { + releaseCurrent(); + void next.finally(() => { + if (queue.tails.get(key) === next) { + queue.tails.delete(key); + } + }); + } + } + private noteEmbedFailure(reason: string, err: unknown): void { this.embedFailureCount += 1; const delayMs = Math.min( @@ -2896,3 +2971,14 @@ export class QmdMemoryManager implements MemorySearchManager { return [command, normalizedQuery, "--json", "-n", String(limit)]; } } + +function resolveQmdManagerRuntimeConfig( + cfg: OpenClawConfig, + agentId: string, +): QmdManagerRuntimeConfig { + return { + workspaceDir: resolveAgentWorkspaceDir(cfg, agentId), + syncSettings: resolveMemorySearchSyncConfig(cfg, agentId), + contextLimits: resolveAgentContextLimits(cfg, agentId), + }; +} diff --git a/extensions/memory-core/src/memory/search-manager.test.ts b/extensions/memory-core/src/memory/search-manager.test.ts index d86e96e73ab..034cce1a3f2 100644 --- a/extensions/memory-core/src/memory/search-manager.test.ts +++ b/extensions/memory-core/src/memory/search-manager.test.ts @@ -126,13 +126,18 @@ import { QmdMemoryManager } from "./qmd-manager.js"; import { closeAllMemorySearchManagers, getMemorySearchManager } from "./search-manager.js"; const createQmdManagerMock = vi.mocked(QmdMemoryManager.create); +type QmdManagerInstance = Awaited>; type SearchManagerResult = Awaited>; type SearchManager = NonNullable; -function createQmdCfg(agentId: string): OpenClawConfig { +function createQmdCfg( + agentId: string, + workspace: string = "/tmp/workspace", + qmd: Record = {}, +): OpenClawConfig { return { - memory: { backend: "qmd", qmd: {} }, - agents: { list: [{ id: agentId, default: true, workspace: "/tmp/workspace" }] }, + memory: { backend: "qmd", qmd }, + agents: { list: [{ id: agentId, default: true, workspace }] }, }; } @@ -167,6 +172,16 @@ function requireManager(result: SearchManagerResult): SearchManager { return result.manager; } +function createDeferred() { + let resolve!: (value: T) => void; + let reject!: (reason?: unknown) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { promise, resolve, reject }; +} + async function createFailedQmdSearchHarness(params: { agentId: string; errorMessage: string }) { const cfg = createQmdCfg(params.agentId); mockPrimary.search.mockRejectedValueOnce(new Error(params.errorMessage)); @@ -296,6 +311,296 @@ describe("getMemorySearchManager caching", () => { expect(checkQmdBinaryAvailability).toHaveBeenCalledTimes(1); }); + it("reuses cached full qmd manager across normalized agent ids", async () => { + const cfg = createQmdCfg("Main-Agent"); + + const first = await getMemorySearchManager({ cfg, agentId: "Main-Agent" }); + const second = await getMemorySearchManager({ cfg, agentId: "main-agent" }); + + requireManager(first); + requireManager(second); + expect(first.manager).toBe(second.manager); + expect(createQmdManagerMock).toHaveBeenCalledTimes(1); + expect(createQmdManagerMock.mock.calls[0]?.[0]).toEqual( + expect.objectContaining({ agentId: "main-agent" }), + ); + }); + + it("replaces cached full qmd manager across different workspaces", async () => { + const agentId = "cached-qmd-workspace-reload"; + const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a"); + const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b"); + const firstPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + const secondPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + createQmdManagerMock + .mockImplementationOnce(async () => firstPrimary as unknown as QmdManagerInstance) + .mockImplementationOnce(async () => secondPrimary as unknown as QmdManagerInstance); + + const first = await getMemorySearchManager({ cfg: firstCfg, agentId }); + const firstManager = requireManager(first); + const second = await getMemorySearchManager({ cfg: secondCfg, agentId }); + const secondManager = requireManager(second); + + expect(firstManager).not.toBe(secondManager); + expect(createQmdManagerMock).toHaveBeenCalledTimes(2); + expect(firstPrimary.close).toHaveBeenCalledTimes(1); + await expect(firstManager.search("hello")).rejects.toThrow("replaced by a newer qmd manager"); + expect(() => firstManager.status()).toThrow("replaced by a newer qmd manager"); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, { + command: "qmd", + env: process.env, + cwd: "/tmp/workspace-a", + }); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, { + command: "qmd", + env: process.env, + cwd: "/tmp/workspace-b", + }); + }); + + it("replaces cached full qmd manager when context limits change", async () => { + const agentId = "cached-qmd-context-limits-reload"; + const firstCfg = createQmdCfg(agentId, "/tmp/workspace"); + const secondCfg = { + ...createQmdCfg(agentId, "/tmp/workspace"), + agents: { + list: [ + { + id: agentId, + default: true, + workspace: "/tmp/workspace", + contextLimits: { + memoryGetMaxChars: 24_000, + memoryGetDefaultLines: 180, + }, + }, + ], + }, + } as OpenClawConfig; + const firstPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + const secondPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + createQmdManagerMock + .mockImplementationOnce(async () => firstPrimary as unknown as QmdManagerInstance) + .mockImplementationOnce(async () => secondPrimary as unknown as QmdManagerInstance); + + const first = await getMemorySearchManager({ cfg: firstCfg, agentId }); + const second = await getMemorySearchManager({ cfg: secondCfg, agentId }); + + requireManager(first); + requireManager(second); + expect(first.manager).not.toBe(second.manager); + expect(createQmdManagerMock).toHaveBeenCalledTimes(2); + expect(firstPrimary.close).toHaveBeenCalledTimes(1); + }); + + it("keeps the existing cached full qmd manager when replacement creation fails", async () => { + const agentId = "cached-qmd-failed-replacement"; + const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a"); + const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b"); + const firstPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + createQmdManagerMock.mockImplementationOnce( + async () => firstPrimary as unknown as QmdManagerInstance, + ); + checkQmdBinaryAvailability + .mockResolvedValueOnce({ available: true }) + .mockResolvedValueOnce({ available: false, error: "spawn qmd ENOENT" }); + + const first = await getMemorySearchManager({ cfg: firstCfg, agentId }); + const firstManager = requireManager(first); + const replacementAttempt = await getMemorySearchManager({ cfg: secondCfg, agentId }); + + expect(replacementAttempt.manager).toBe(fallbackManager); + expect(firstPrimary.close).not.toHaveBeenCalled(); + await expect(firstManager.search("hello")).resolves.toEqual([]); + + const firstAgain = await getMemorySearchManager({ cfg: firstCfg, agentId }); + expect(firstAgain.manager).toBe(firstManager); + expect(createQmdManagerMock).toHaveBeenCalledTimes(1); + }); + + it("dedupes concurrent full qmd manager creation for the same agent", async () => { + const agentId = "pending-qmd"; + const cfg = createQmdCfg(agentId); + const createGate = createDeferred(); + createQmdManagerMock.mockImplementationOnce(async () => await createGate.promise); + + const firstPromise = getMemorySearchManager({ cfg, agentId }); + const secondPromise = getMemorySearchManager({ cfg, agentId }); + + createGate.resolve(mockPrimary as unknown as QmdManagerInstance); + const [first, second] = await Promise.all([firstPromise, secondPromise]); + + requireManager(first); + requireManager(second); + expect(first.manager).toBe(second.manager); + expect(createQmdManagerMock).toHaveBeenCalledTimes(1); + expect(checkQmdBinaryAvailability).toHaveBeenCalledTimes(1); + }); + + it("serializes pending full qmd creation before replacing it for a different workspace", async () => { + const agentId = "pending-qmd-workspace-reload"; + const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a"); + const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b"); + const firstPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + const secondPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + const firstGate = createDeferred(); + const secondGate = createDeferred(); + createQmdManagerMock + .mockImplementationOnce(async () => await firstGate.promise) + .mockImplementationOnce(async () => await secondGate.promise); + + const firstPromise = getMemorySearchManager({ cfg: firstCfg, agentId }); + await Promise.resolve(); + const secondPromise = getMemorySearchManager({ cfg: secondCfg, agentId }); + await vi.waitFor(() => { + expect(createQmdManagerMock).toHaveBeenCalledTimes(1); + }); + + firstGate.resolve(firstPrimary as unknown as QmdManagerInstance); + await vi.waitFor(() => { + expect(createQmdManagerMock).toHaveBeenCalledTimes(2); + }); + + secondGate.resolve(secondPrimary as unknown as QmdManagerInstance); + const [first, second] = await Promise.all([firstPromise, secondPromise]); + + requireManager(first); + requireManager(second); + expect(first.manager).not.toBe(second.manager); + expect(firstPrimary.close).toHaveBeenCalledTimes(1); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, { + command: "qmd", + env: process.env, + cwd: "/tmp/workspace-a", + }); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, { + command: "qmd", + env: process.env, + cwd: "/tmp/workspace-b", + }); + }); + + it("serializes pending full qmd creation before replacing it for a different qmd config", async () => { + const agentId = "pending-qmd-config-reload"; + const firstCfg = createQmdCfg(agentId, "/tmp/workspace", { command: "qmd" }); + const secondCfg = createQmdCfg(agentId, "/tmp/workspace", { command: "qmd-alt" }); + const firstPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + const secondPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + const firstGate = createDeferred(); + const secondGate = createDeferred(); + createQmdManagerMock + .mockImplementationOnce(async () => await firstGate.promise) + .mockImplementationOnce(async () => await secondGate.promise); + + const firstPromise = getMemorySearchManager({ cfg: firstCfg, agentId }); + await Promise.resolve(); + const secondPromise = getMemorySearchManager({ cfg: secondCfg, agentId }); + await vi.waitFor(() => { + expect(createQmdManagerMock).toHaveBeenCalledTimes(1); + }); + + firstGate.resolve(firstPrimary as unknown as QmdManagerInstance); + await vi.waitFor(() => { + expect(createQmdManagerMock).toHaveBeenCalledTimes(2); + }); + + secondGate.resolve(secondPrimary as unknown as QmdManagerInstance); + const [first, second] = await Promise.all([firstPromise, secondPromise]); + + requireManager(first); + requireManager(second); + expect(first.manager).not.toBe(second.manager); + expect(firstPrimary.close).toHaveBeenCalledTimes(1); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, { + command: "qmd", + env: process.env, + cwd: "/tmp/workspace", + }); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, { + command: "qmd-alt", + env: process.env, + cwd: "/tmp/workspace", + }); + }); + + it("reuses pending full qmd creation when raw cfg differs but qmd inputs match", async () => { + const agentId = "pending-qmd-unrelated-config"; + const firstCfg = createQmdCfg(agentId); + const secondCfg = { + ...createQmdCfg(agentId), + session: { store: "/tmp/alternate-session-store.json" }, + } as OpenClawConfig; + const createGate = createDeferred(); + createQmdManagerMock.mockImplementationOnce(async () => await createGate.promise); + + const firstPromise = getMemorySearchManager({ cfg: firstCfg, agentId }); + await Promise.resolve(); + const secondPromise = getMemorySearchManager({ cfg: secondCfg, agentId }); + + createGate.resolve(mockPrimary as unknown as QmdManagerInstance); + const [first, second] = await Promise.all([firstPromise, secondPromise]); + + requireManager(first); + requireManager(second); + expect(createQmdManagerMock).toHaveBeenCalledTimes(1); + expect(first.manager).toBe(second.manager); + expect(checkQmdBinaryAvailability).toHaveBeenCalledTimes(1); + }); + it("does not cache qmd managers for status-only requests", async () => { const agentId = "status-agent"; const cfg = createQmdCfg(agentId); @@ -400,6 +705,51 @@ describe("getMemorySearchManager caching", () => { expect(fullAgain.manager).toBe(full.manager); }); + it("does not borrow a cached full qmd manager for status across different workspaces", async () => { + const agentId = "status-workspace-reload"; + const firstCfg = createQmdCfg(agentId, "/tmp/workspace-a"); + const secondCfg = createQmdCfg(agentId, "/tmp/workspace-b"); + const firstPrimary = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + const secondStatusManager = createManagerMock({ + backend: "qmd", + provider: "qmd", + model: "qmd", + requestedProvider: "qmd", + withMemorySourceCounts: true, + }); + createQmdManagerMock + .mockImplementationOnce(async () => firstPrimary as unknown as QmdManagerInstance) + .mockImplementationOnce(async () => secondStatusManager as unknown as QmdManagerInstance); + + const full = await getMemorySearchManager({ cfg: firstCfg, agentId }); + const fullManager = requireManager(full); + const status = await getMemorySearchManager({ cfg: secondCfg, agentId, purpose: "status" }); + + requireManager(status); + expect(status.manager).toBe(secondStatusManager); + expect(createQmdManagerMock.mock.calls).toHaveLength(2); + expect(firstPrimary.close).not.toHaveBeenCalled(); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(1, { + command: "qmd", + env: process.env, + cwd: "/tmp/workspace-a", + }); + expect(checkQmdBinaryAvailability).toHaveBeenNthCalledWith(2, { + command: "qmd", + env: process.env, + cwd: "/tmp/workspace-b", + }); + + const fullAgain = await getMemorySearchManager({ cfg: firstCfg, agentId }); + expect(fullAgain.manager).toBe(fullManager); + }); + it("gets a fresh qmd manager for later status requests after close", async () => { const agentId = "status-eviction-agent"; const cfg = createQmdCfg(agentId); @@ -478,6 +828,31 @@ describe("getMemorySearchManager caching", () => { expect(createQmdManagerMock.mock.calls).toHaveLength(2); }); + it("waits for pending full qmd manager creation during global teardown", async () => { + const agentId = "teardown-pending-qmd"; + const cfg = createQmdCfg(agentId); + const createGate = createDeferred(); + createQmdManagerMock.mockImplementationOnce(async () => await createGate.promise); + + const firstPromise = getMemorySearchManager({ cfg, agentId }); + await Promise.resolve(); + + const closePromise = closeAllMemorySearchManagers(); + await Promise.resolve(); + + createGate.resolve(mockPrimary as unknown as QmdManagerInstance); + + const first = await firstPromise; + const firstManager = requireManager(first); + await closePromise; + + expect(mockPrimary.close).toHaveBeenCalledTimes(1); + + const second = await getMemorySearchManager({ cfg, agentId }); + expect(second.manager).not.toBe(firstManager); + expect(createQmdManagerMock.mock.calls).toHaveLength(2); + }); + it("closes builtin index managers on teardown after runtime is loaded", async () => { const retryAgentId = "teardown-with-fallback"; const { manager } = await createFailedQmdSearchHarness({ diff --git a/extensions/memory-core/src/memory/search-manager.ts b/extensions/memory-core/src/memory/search-manager.ts index fc8d6ed7ed9..365d7c847dd 100644 --- a/extensions/memory-core/src/memory/search-manager.ts +++ b/extensions/memory-core/src/memory/search-manager.ts @@ -2,8 +2,10 @@ import fs from "node:fs/promises"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { createSubsystemLogger, + resolveAgentContextLimits, resolveAgentWorkspaceDir, resolveGlobalSingleton, + resolveMemorySearchSyncConfig, type OpenClawConfig, } from "openclaw/plugin-sdk/memory-core-host-engine-foundation"; import { checkQmdBinaryAvailability } from "openclaw/plugin-sdk/memory-core-host-engine-qmd"; @@ -15,10 +17,29 @@ import { type MemorySyncProgressUpdate, type ResolvedQmdConfig, } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; +import { normalizeAgentId } from "openclaw/plugin-sdk/routing"; const MEMORY_SEARCH_MANAGER_CACHE_KEY = Symbol.for("openclaw.memorySearchManagerCache"); +type Maybe = T | null; +type QmdManagerRuntimeConfig = { + workspaceDir: string; + syncSettings: ReturnType; + contextLimits: ReturnType; +}; + +type CachedQmdManagerEntry = { + identityKey: string; + manager: MemorySearchManager; +}; + +type PendingQmdManagerCreate = { + identityKey: string; + promise: Promise>; +}; + type MemorySearchManagerCacheStore = { - qmdManagerCache: Map; + qmdManagerCache: Map; + pendingQmdManagerCreates: Map; }; function getMemorySearchManagerCacheStore(): MemorySearchManagerCacheStore { @@ -26,13 +47,17 @@ function getMemorySearchManagerCacheStore(): MemorySearchManagerCacheStore { return resolveGlobalSingleton( MEMORY_SEARCH_MANAGER_CACHE_KEY, () => ({ - qmdManagerCache: new Map(), + qmdManagerCache: new Map(), + pendingQmdManagerCreates: new Map(), }), ); } const log = createSubsystemLogger("memory"); -const { qmdManagerCache: QMD_MANAGER_CACHE } = getMemorySearchManagerCacheStore(); +const { + qmdManagerCache: QMD_MANAGER_CACHE, + pendingQmdManagerCreates: PENDING_QMD_MANAGER_CREATES, +} = getMemorySearchManagerCacheStore(); let managerRuntimePromise: Promise | null = null; let qmdManagerModulePromise: Promise | null = null; @@ -47,7 +72,7 @@ function loadQmdManagerModule() { } export type MemorySearchManagerResult = { - manager: MemorySearchManager | null; + manager: Maybe; error?: string; }; @@ -58,74 +83,134 @@ export async function getMemorySearchManager(params: { }): Promise { const resolved = resolveMemoryBackendConfig(params); if (resolved.backend === "qmd" && resolved.qmd) { + const qmdResolved = resolved.qmd; + const normalizedAgentId = normalizeAgentId(params.agentId); + const runtimeConfig = resolveQmdManagerRuntimeConfig(params.cfg, normalizedAgentId); + const { workspaceDir } = runtimeConfig; const statusOnly = params.purpose === "status"; - const baseCacheKey = buildQmdCacheKey(params.agentId, resolved.qmd); - const cacheKey = `${baseCacheKey}:${statusOnly ? "status" : "full"}`; - const cached = QMD_MANAGER_CACHE.get(cacheKey); - if (cached) { - return { manager: cached }; - } - if (statusOnly) { - const fullCached = QMD_MANAGER_CACHE.get(`${baseCacheKey}:full`); - if (fullCached) { - // Status callers often close the manager they receive. Wrap the live - // full manager with a no-op close so health/status probes do not tear - // down the active QMD manager for the process. - return { manager: new BorrowedMemoryManager(fullCached) }; + const scopeKey = buildQmdManagerScopeKey(normalizedAgentId); + const identityKey = buildQmdManagerIdentityKey(normalizedAgentId, qmdResolved, runtimeConfig); + + const createPrimaryQmdManager = async ( + mode: "full" | "status", + ): Promise> => { + try { + await fs.mkdir(workspaceDir, { recursive: true }); + } catch (err) { + log.warn( + `qmd workspace unavailable (${workspaceDir}); falling back to builtin: ${formatErrorMessage(err)}`, + ); + return null; } - } - const workspaceDir = resolveAgentWorkspaceDir(params.cfg, params.agentId); - try { - await fs.mkdir(workspaceDir, { recursive: true }); - } catch (err) { - log.warn( - `qmd workspace unavailable (${workspaceDir}); falling back to builtin: ${formatErrorMessage(err)}`, - ); - return await getBuiltinMemorySearchManager(params); - } - - const qmdBinary = await checkQmdBinaryAvailability({ - command: resolved.qmd.command, - env: process.env, - cwd: workspaceDir, - }); - if (!qmdBinary.available) { - log.warn( - `qmd binary unavailable (${resolved.qmd.command}); falling back to builtin: ${qmdBinary.error ?? "unknown error"}`, - ); - } else { + const qmdBinary = await checkQmdBinaryAvailability({ + command: qmdResolved.command, + env: process.env, + cwd: workspaceDir, + }); + if (!qmdBinary.available) { + log.warn( + `qmd binary unavailable (${qmdResolved.command}); falling back to builtin: ${qmdBinary.error ?? "unknown error"}`, + ); + return null; + } try { const { QmdMemoryManager } = await loadQmdManagerModule(); const primary = await QmdMemoryManager.create({ cfg: params.cfg, - agentId: params.agentId, - resolved, - mode: statusOnly ? "status" : "full", + agentId: normalizedAgentId, + resolved: { ...resolved, qmd: qmdResolved }, + mode, + runtimeConfig, }); if (primary) { - if (statusOnly) { - return { manager: primary }; - } - const wrapper = new FallbackMemoryManager( - { - primary, - fallbackFactory: async () => { - const { MemoryIndexManager } = await loadManagerRuntime(); - return await MemoryIndexManager.get(params); - }, - }, - () => { - QMD_MANAGER_CACHE.delete(cacheKey); - }, - ); - QMD_MANAGER_CACHE.set(cacheKey, wrapper); - return { manager: wrapper }; + return primary; } } catch (err) { const message = formatErrorMessage(err); log.warn(`qmd memory unavailable; falling back to builtin: ${message}`); } + return null; + }; + + const createFullQmdManager = async ( + expectedIdentityKey: string, + ): Promise> => { + const primary = await createPrimaryQmdManager("full"); + if (!primary) { + return null; + } + let cacheEntry!: CachedQmdManagerEntry; + const wrapper = new FallbackMemoryManager( + { + primary, + fallbackFactory: async () => { + const { MemoryIndexManager } = await loadManagerRuntime(); + return await MemoryIndexManager.get(params); + }, + }, + () => { + const current = QMD_MANAGER_CACHE.get(scopeKey); + if (current === cacheEntry) { + QMD_MANAGER_CACHE.delete(scopeKey); + } + }, + ); + cacheEntry = { + identityKey: expectedIdentityKey, + manager: wrapper, + }; + return cacheEntry; + }; + + while (true) { + const cached = QMD_MANAGER_CACHE.get(scopeKey); + const cachedMatchesIdentity = cached?.identityKey === identityKey; + if (cachedMatchesIdentity) { + if (statusOnly) { + // Status callers often close the manager they receive. Wrap the live + // full manager with a no-op close so health/status probes do not tear + // down the active QMD manager for the process. + return { manager: new BorrowedMemoryManager(cached.manager) }; + } + return { manager: cached.manager }; + } + + if (statusOnly) { + const manager = await createPrimaryQmdManager("status"); + return manager ? { manager } : await getBuiltinMemorySearchManager(params); + } + + const pending = PENDING_QMD_MANAGER_CREATES.get(scopeKey); + if (pending) { + await pending.promise; + continue; + } + + const pendingCreate: PendingQmdManagerCreate = { + identityKey, + promise: (async () => { + const created = await createFullQmdManager(identityKey); + if (!created) { + return null; + } + QMD_MANAGER_CACHE.set(scopeKey, created); + if (cached) { + await closeQmdManagerForReplacement(cached.manager).catch((err) => { + log.warn(`failed to retire replaced qmd memory manager: ${formatErrorMessage(err)}`); + }); + } + return created.manager; + })().finally(() => { + const currentPending = PENDING_QMD_MANAGER_CREATES.get(scopeKey); + if (currentPending === pendingCreate) { + PENDING_QMD_MANAGER_CREATES.delete(scopeKey); + } + }), + }; + PENDING_QMD_MANAGER_CREATES.set(scopeKey, pendingCreate); + const manager = await pendingCreate.promise; + return manager ? { manager } : await getBuiltinMemorySearchManager(params); } } @@ -192,7 +277,10 @@ class BorrowedMemoryManager implements MemorySearchManager { } export async function closeAllMemorySearchManagers(): Promise { - const managers = Array.from(QMD_MANAGER_CACHE.values()); + const pendingCreates = Array.from(PENDING_QMD_MANAGER_CREATES.values(), (entry) => entry.promise); + await Promise.allSettled(pendingCreates); + const managers = Array.from(QMD_MANAGER_CACHE.values(), (entry) => entry.manager); + PENDING_QMD_MANAGER_CREATES.clear(); QMD_MANAGER_CACHE.clear(); for (const manager of managers) { try { @@ -208,15 +296,17 @@ export async function closeAllMemorySearchManagers(): Promise { } class FallbackMemoryManager implements MemorySearchManager { - private fallback: MemorySearchManager | null = null; + private fallback: Maybe = null; private primaryFailed = false; private lastError?: string; private cacheEvicted = false; + private closed = false; + private closeReason = "memory search manager is closed"; constructor( private readonly deps: { primary: MemorySearchManager; - fallbackFactory: () => Promise; + fallbackFactory: () => Promise>; }, private readonly onClose?: () => void, ) {} @@ -231,6 +321,7 @@ class FallbackMemoryManager implements MemorySearchManager { onDebug?: (debug: MemorySearchRuntimeDebug) => void; }, ) { + this.ensureOpen(); if (!this.primaryFailed) { try { return await this.deps.primary.search(query, opts); @@ -251,6 +342,7 @@ class FallbackMemoryManager implements MemorySearchManager { } async readFile(params: { relPath: string; from?: number; lines?: number }) { + this.ensureOpen(); if (!this.primaryFailed) { return await this.deps.primary.readFile(params); } @@ -262,6 +354,7 @@ class FallbackMemoryManager implements MemorySearchManager { } status() { + this.ensureOpen(); if (!this.primaryFailed) { return this.deps.primary.status(); } @@ -296,6 +389,7 @@ class FallbackMemoryManager implements MemorySearchManager { sessionFiles?: string[]; progress?: (update: MemorySyncProgressUpdate) => void; }) { + this.ensureOpen(); if (!this.primaryFailed) { await this.deps.primary.sync?.(params); return; @@ -305,6 +399,7 @@ class FallbackMemoryManager implements MemorySearchManager { } async probeEmbeddingAvailability(): Promise { + this.ensureOpen(); if (!this.primaryFailed) { return await this.deps.primary.probeEmbeddingAvailability(); } @@ -316,6 +411,7 @@ class FallbackMemoryManager implements MemorySearchManager { } async probeVectorAvailability() { + this.ensureOpen(); if (!this.primaryFailed) { return await this.deps.primary.probeVectorAvailability(); } @@ -324,16 +420,25 @@ class FallbackMemoryManager implements MemorySearchManager { } async close() { + if (this.closed) { + return; + } + this.closed = true; await this.deps.primary.close?.(); await this.fallback?.close?.(); this.evictCacheEntry(); } - private async ensureFallback(): Promise { + async invalidate(reason: string) { + this.closeReason = reason; + await this.close(); + } + + private async ensureFallback(): Promise> { if (this.fallback) { return this.fallback; } - let fallback: MemorySearchManager | null; + let fallback: Maybe; try { fallback = await this.deps.fallbackFactory(); if (!fallback) { @@ -349,6 +454,12 @@ class FallbackMemoryManager implements MemorySearchManager { return this.fallback; } + private ensureOpen(): void { + if (this.closed) { + throw new Error(this.closeReason); + } + } + private evictCacheEntry(): void { if (this.cacheEvicted) { return; @@ -358,8 +469,35 @@ class FallbackMemoryManager implements MemorySearchManager { } } -function buildQmdCacheKey(agentId: string, config: ResolvedQmdConfig): string { +async function closeQmdManagerForReplacement(manager: MemorySearchManager): Promise { + if (manager instanceof FallbackMemoryManager) { + await manager.invalidate("memory search manager was replaced by a newer qmd manager"); + return; + } + await manager.close?.(); +} + +function buildQmdManagerScopeKey(agentId: string): string { + return agentId; +} + +function buildQmdManagerIdentityKey( + agentId: string, + config: ResolvedQmdConfig, + runtimeConfig: QmdManagerRuntimeConfig, +): string { // ResolvedQmdConfig is assembled in a stable field order in resolveMemoryBackendConfig. // Fast stringify avoids deep key-sorting overhead on this hot path. - return `${agentId}:${JSON.stringify(config)}`; + return `${agentId}:${JSON.stringify(config)}:${JSON.stringify(runtimeConfig.syncSettings ?? null)}:${JSON.stringify(runtimeConfig.contextLimits ?? null)}:${runtimeConfig.workspaceDir}`; +} + +function resolveQmdManagerRuntimeConfig( + cfg: OpenClawConfig, + agentId: string, +): QmdManagerRuntimeConfig { + return { + workspaceDir: resolveAgentWorkspaceDir(cfg, agentId), + syncSettings: resolveMemorySearchSyncConfig(cfg, agentId), + contextLimits: resolveAgentContextLimits(cfg, agentId), + }; } diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index 5d6d9f190ae..1b356e2919d 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -2551,9 +2551,7 @@ describe("gateway server sessions", () => { expect(deleted.ok).toBe(true); expect(deleted.payload?.deleted).toBe(true); expect(subagentLifecycleHookMocks.runSubagentEnded).toHaveBeenCalledTimes(1); - const event = ( - subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][] - )[0]?.[0] as + const event = (subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][])[0]?.[0] as | { targetKind?: string; targetSessionKey?: string; reason?: string; outcome?: string } | undefined; expect(event).toMatchObject({ @@ -2869,9 +2867,7 @@ describe("gateway server sessions", () => { expect(reset.payload?.key).toBe("agent:main:subagent:worker"); expect(reset.payload?.entry.sessionId).not.toBe("sess-subagent"); expect(subagentLifecycleHookMocks.runSubagentEnded).toHaveBeenCalledTimes(1); - const event = ( - subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][] - )[0]?.[0] as + const event = (subagentLifecycleHookMocks.runSubagentEnded.mock.calls as unknown[][])[0]?.[0] as | { targetKind?: string; targetSessionKey?: string; reason?: string; outcome?: string } | undefined; expect(event).toMatchObject({