From f4d8393bf4c1f615bbdb38fa3b237f741ea99aa1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 6 Apr 2026 20:21:05 +0100 Subject: [PATCH] perf: extract memory manager state helpers --- .../memory-core/src/memory/index.test.ts | 700 ------------------ .../src/memory/manager-reindex-state.test.ts | 160 ++++ .../src/memory/manager-reindex-state.ts | 103 +++ .../src/memory/manager-source-state.test.ts | 87 +++ .../src/memory/manager-source-state.ts | 49 ++ .../src/memory/manager-status-state.test.ts | 45 +- .../src/memory/manager-status-state.ts | 60 ++ .../src/memory/manager-sync-ops.ts | 250 +++---- .../src/memory/manager-targeted-sync.test.ts | 78 ++ .../src/memory/manager-targeted-sync.ts | 86 +++ 10 files changed, 760 insertions(+), 858 deletions(-) create mode 100644 extensions/memory-core/src/memory/manager-reindex-state.test.ts create mode 100644 extensions/memory-core/src/memory/manager-reindex-state.ts create mode 100644 extensions/memory-core/src/memory/manager-source-state.test.ts create mode 100644 extensions/memory-core/src/memory/manager-source-state.ts create mode 100644 extensions/memory-core/src/memory/manager-targeted-sync.test.ts create mode 100644 extensions/memory-core/src/memory/manager-targeted-sync.ts diff --git a/extensions/memory-core/src/memory/index.test.ts b/extensions/memory-core/src/memory/index.test.ts index fabf16cc755..4782cbf6730 100644 --- a/extensions/memory-core/src/memory/index.test.ts +++ b/extensions/memory-core/src/memory/index.test.ts @@ -116,26 +116,7 @@ describe("memory index", () => { let indexMainPath = ""; let indexExtraPath = ""; let indexMultimodalPath = ""; - let indexSourceChangePath = ""; - let indexModelPath = ""; let indexFtsOnlyPath = ""; - let sourceChangeStateDir = ""; - const sourceChangeSessionLogLines = [ - JSON.stringify({ - type: "message", - message: { - role: "user", - content: [{ type: "text", text: "session change test user line" }], - }, - }), - JSON.stringify({ - type: "message", - 
message: { - role: "assistant", - content: [{ type: "text", text: "session change test assistant line" }], - }, - }), - ].join("\n"); const managersForCleanup = new Set(); @@ -148,10 +129,7 @@ describe("memory index", () => { indexVectorPath = path.join(workspaceDir, "index-vector.sqlite"); indexExtraPath = path.join(workspaceDir, "index-extra.sqlite"); indexMultimodalPath = path.join(workspaceDir, "index-multimodal.sqlite"); - indexSourceChangePath = path.join(workspaceDir, "index-source-change.sqlite"); - indexModelPath = path.join(workspaceDir, "index-model-change.sqlite"); indexFtsOnlyPath = path.join(workspaceDir, "index-fts-only.sqlite"); - sourceChangeStateDir = path.join(fixtureRoot, "state-source-change"); }); afterAll(async () => { @@ -419,61 +397,6 @@ describe("memory index", () => { await manager.close?.(); }); - it("reindexes sessions when source config adds sessions to an existing index", async () => { - const stateDir = sourceChangeStateDir; - const sessionDir = path.join(stateDir, "agents", "main", "sessions"); - await fs.rm(stateDir, { recursive: true, force: true }); - await fs.mkdir(sessionDir, { recursive: true }); - await fs.writeFile( - path.join(sessionDir, "session-source-change.jsonl"), - `${sourceChangeSessionLogLines}\n`, - ); - - const previousStateDir = process.env.OPENCLAW_STATE_DIR; - process.env.OPENCLAW_STATE_DIR = stateDir; - - const firstCfg = createCfg({ - storePath: indexSourceChangePath, - sources: ["memory"], - sessionMemory: false, - }); - const secondCfg = createCfg({ - storePath: indexSourceChangePath, - sources: ["memory", "sessions"], - sessionMemory: true, - }); - - try { - const first = await getMemorySearchManager({ cfg: firstCfg, agentId: "main" }); - const firstManager = requireManager(first); - await firstManager.sync?.({ reason: "test" }); - const firstStatus = firstManager.status(); - expect( - firstStatus.sourceCounts?.find((entry) => entry.source === "sessions")?.files ?? 
0, - ).toBe(0); - await firstManager.close?.(); - - const second = await getMemorySearchManager({ cfg: secondCfg, agentId: "main" }); - const secondManager = requireManager(second); - await secondManager.sync?.({ reason: "test" }); - const secondStatus = secondManager.status(); - expect(secondStatus.sourceCounts?.find((entry) => entry.source === "sessions")?.files).toBe( - 1, - ); - expect( - secondStatus.sourceCounts?.find((entry) => entry.source === "sessions")?.chunks ?? 0, - ).toBeGreaterThan(0); - await secondManager.close?.(); - } finally { - if (previousStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = previousStateDir; - } - await fs.rm(stateDir, { recursive: true, force: true }); - } - }); - it("targets explicit session files during post-compaction sync", async () => { const stateDir = path.join(fixtureRoot, `state-targeted-${randomUUID()}`); const sessionDir = path.join(stateDir, "agents", "main", "sessions"); @@ -550,41 +473,13 @@ describe("memory index", () => { })}\n`, ); - const originalPrepare = db.prepare.bind(db); - let bulkSessionStateAllCalls = 0; - let perFileSessionHashPrepareCalls = 0; - db.prepare = ((sql: string) => { - const statement = originalPrepare(sql); - if (sql === `SELECT path, hash FROM files WHERE source = ?`) { - if (!statement.all) { - throw new Error("expected sqlite statement.all for bulk session state query"); - } - const bulkAll = statement.all.bind(statement); - return { - ...statement, - all: (...args: unknown[]) => { - bulkSessionStateAllCalls += 1; - return bulkAll(...args); - }, - }; - } - if (sql === `SELECT hash FROM files WHERE path = ? 
AND source = ?`) { - perFileSessionHashPrepareCalls += 1; - } - return statement; - }) as typeof db.prepare; - await manager.sync?.({ reason: "post-compaction", sessionFiles: [firstSessionPath], }); - db.prepare = originalPrepare; - expect(getSessionHash("sessions/targeted-first.jsonl")).not.toBe(firstOriginalHash); expect(getSessionHash("sessions/targeted-second.jsonl")).toBe(secondOriginalHash); - expect(bulkSessionStateAllCalls).toBe(0); - expect(perFileSessionHashPrepareCalls).toBeGreaterThan(0); await manager.close?.(); } finally { if (previousStateDir === undefined) { @@ -596,368 +491,6 @@ describe("memory index", () => { } }); - it("preserves unrelated dirty sessions after targeted post-compaction sync", async () => { - const stateDir = path.join(fixtureRoot, `state-targeted-dirty-${randomUUID()}`); - const sessionDir = path.join(stateDir, "agents", "main", "sessions"); - const firstSessionPath = path.join(sessionDir, "targeted-dirty-first.jsonl"); - const secondSessionPath = path.join(sessionDir, "targeted-dirty-second.jsonl"); - const storePath = path.join(workspaceDir, `index-targeted-dirty-${randomUUID()}.sqlite`); - const previousStateDir = process.env.OPENCLAW_STATE_DIR; - process.env.OPENCLAW_STATE_DIR = stateDir; - - await fs.mkdir(sessionDir, { recursive: true }); - await fs.writeFile( - firstSessionPath, - `${JSON.stringify({ - type: "message", - message: { role: "user", content: [{ type: "text", text: "first transcript v1" }] }, - })}\n`, - ); - await fs.writeFile( - secondSessionPath, - `${JSON.stringify({ - type: "message", - message: { role: "user", content: [{ type: "text", text: "second transcript v1" }] }, - })}\n`, - ); - - try { - const manager = requireManager( - await getMemorySearchManager({ - cfg: createCfg({ - storePath, - sources: ["sessions"], - sessionMemory: true, - }), - agentId: "main", - }), - ); - await manager.sync({ reason: "test" }); - - const db = ( - manager as unknown as { - db: { - prepare: (sql: string) => { - get: 
(path: string, source: string) => { hash: string } | undefined; - }; - }; - } - ).db; - const getSessionHash = (sessionPath: string) => - db - .prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`) - .get(sessionPath, "sessions")?.hash; - - const firstOriginalHash = getSessionHash("sessions/targeted-dirty-first.jsonl"); - const secondOriginalHash = getSessionHash("sessions/targeted-dirty-second.jsonl"); - - await fs.writeFile( - firstSessionPath, - `${JSON.stringify({ - type: "message", - message: { - role: "user", - content: [{ type: "text", text: "first transcript v2 after compaction" }], - }, - })}\n`, - ); - await fs.writeFile( - secondSessionPath, - `${JSON.stringify({ - type: "message", - message: { - role: "user", - content: [{ type: "text", text: "second transcript v2 still pending" }], - }, - })}\n`, - ); - - const internal = manager as unknown as { - sessionsDirty: boolean; - sessionsDirtyFiles: Set; - }; - internal.sessionsDirty = true; - internal.sessionsDirtyFiles.add(secondSessionPath); - - await manager.sync({ - reason: "post-compaction", - sessionFiles: [firstSessionPath], - }); - - expect(getSessionHash("sessions/targeted-dirty-first.jsonl")).not.toBe(firstOriginalHash); - expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).toBe(secondOriginalHash); - expect(internal.sessionsDirtyFiles.has(secondSessionPath)).toBe(true); - expect(internal.sessionsDirty).toBe(true); - - await manager.sync({ reason: "test" }); - - expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).not.toBe(secondOriginalHash); - await manager.close?.(); - } finally { - if (previousStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = previousStateDir; - } - await fs.rm(stateDir, { recursive: true, force: true }); - await fs.rm(storePath, { force: true }); - } - }); - - it("queues targeted session sync when another sync is already in progress", async () => { - const stateDir = 
path.join(fixtureRoot, `state-targeted-queued-${randomUUID()}`); - const sessionDir = path.join(stateDir, "agents", "main", "sessions"); - const sessionPath = path.join(sessionDir, "targeted-queued.jsonl"); - const storePath = path.join(workspaceDir, `index-targeted-queued-${randomUUID()}.sqlite`); - const previousStateDir = process.env.OPENCLAW_STATE_DIR; - process.env.OPENCLAW_STATE_DIR = stateDir; - - await fs.mkdir(sessionDir, { recursive: true }); - await fs.writeFile( - sessionPath, - `${JSON.stringify({ - type: "message", - message: { role: "user", content: [{ type: "text", text: "queued transcript v1" }] }, - })}\n`, - ); - - try { - const manager = requireManager( - await getMemorySearchManager({ - cfg: createCfg({ - storePath, - sources: ["sessions"], - sessionMemory: true, - }), - agentId: "main", - }), - ); - await manager.sync({ reason: "test" }); - - const db = ( - manager as unknown as { - db: { - prepare: (sql: string) => { - get: (path: string, source: string) => { hash: string } | undefined; - }; - }; - } - ).db; - const getSessionHash = (sessionRelPath: string) => - db - .prepare(`SELECT hash FROM files WHERE path = ? 
AND source = ?`) - .get(sessionRelPath, "sessions")?.hash; - const originalHash = getSessionHash("sessions/targeted-queued.jsonl"); - - const internal = manager as unknown as { - runSyncWithReadonlyRecovery: (params?: { - reason?: string; - sessionFiles?: string[]; - }) => Promise; - }; - const originalRunSync = internal.runSyncWithReadonlyRecovery.bind(manager); - let releaseBusySync: (() => void) | undefined; - const busyGate = new Promise((resolve) => { - releaseBusySync = resolve; - }); - internal.runSyncWithReadonlyRecovery = async (params) => { - if (params?.reason === "busy-sync") { - await busyGate; - } - return await originalRunSync(params); - }; - - const busySyncPromise = manager.sync({ reason: "busy-sync" }); - await fs.writeFile( - sessionPath, - `${JSON.stringify({ - type: "message", - message: { - role: "user", - content: [{ type: "text", text: "queued transcript v2 after compaction" }], - }, - })}\n`, - ); - - const targetedSyncPromise = manager.sync({ - reason: "post-compaction", - sessionFiles: [sessionPath], - }); - - releaseBusySync?.(); - await Promise.all([busySyncPromise, targetedSyncPromise]); - - expect(getSessionHash("sessions/targeted-queued.jsonl")).not.toBe(originalHash); - await manager.close?.(); - } finally { - if (previousStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = previousStateDir; - } - await fs.rm(stateDir, { recursive: true, force: true }); - await fs.rm(storePath, { force: true }); - } - }); - - it("runs a full reindex after fallback activates during targeted sync", async () => { - const stateDir = path.join(fixtureRoot, `state-targeted-fallback-${randomUUID()}`); - const sessionDir = path.join(stateDir, "agents", "main", "sessions"); - const sessionPath = path.join(sessionDir, "targeted-fallback.jsonl"); - const storePath = path.join(workspaceDir, `index-targeted-fallback-${randomUUID()}.sqlite`); - const previousStateDir = process.env.OPENCLAW_STATE_DIR; - 
process.env.OPENCLAW_STATE_DIR = stateDir; - - await fs.mkdir(sessionDir, { recursive: true }); - await fs.writeFile( - sessionPath, - `${JSON.stringify({ - type: "message", - message: { role: "user", content: [{ type: "text", text: "fallback transcript v1" }] }, - })}\n`, - ); - - try { - const manager = requireManager( - await getMemorySearchManager({ - cfg: createCfg({ - storePath, - sources: ["sessions"], - sessionMemory: true, - }), - agentId: "main", - }), - ); - await manager.sync({ reason: "test" }); - - const internal = manager as unknown as { - syncSessionFiles: (params: { - targetSessionFiles?: string[]; - needsFullReindex: boolean; - }) => Promise; - shouldFallbackOnError: (message: string) => boolean; - activateFallbackProvider: (reason: string) => Promise; - runSafeReindex: (params: { - reason?: string; - force?: boolean; - progress?: unknown; - }) => Promise; - runUnsafeReindex: (params: { - reason?: string; - force?: boolean; - progress?: unknown; - }) => Promise; - }; - const originalSyncSessionFiles = internal.syncSessionFiles.bind(manager); - const originalShouldFallbackOnError = internal.shouldFallbackOnError.bind(manager); - const originalActivateFallbackProvider = internal.activateFallbackProvider.bind(manager); - const originalRunSafeReindex = internal.runSafeReindex.bind(manager); - const originalRunUnsafeReindex = internal.runUnsafeReindex.bind(manager); - - internal.syncSessionFiles = async (params) => { - if (params.targetSessionFiles?.length) { - throw new Error("embedding backend failed"); - } - return await originalSyncSessionFiles(params); - }; - internal.shouldFallbackOnError = () => true; - const activateFallbackProvider = vi.fn(async () => true); - internal.activateFallbackProvider = activateFallbackProvider; - const runSafeReindex = vi.fn(async () => {}); - internal.runSafeReindex = runSafeReindex; - const runUnsafeReindex = vi.fn(async () => {}); - internal.runUnsafeReindex = runUnsafeReindex; - - await manager.sync({ - reason: 
"post-compaction", - sessionFiles: [sessionPath], - }); - - expect(activateFallbackProvider).toHaveBeenCalledWith("embedding backend failed"); - const expectedReindexParams = { - reason: "post-compaction", - force: true, - progress: undefined, - }; - const usesUnsafeReindex = - process.env.OPENCLAW_TEST_FAST === "1" && - process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1"; - if (usesUnsafeReindex) { - expect(runUnsafeReindex).toHaveBeenCalledWith(expectedReindexParams); - expect(runSafeReindex).not.toHaveBeenCalled(); - } else { - expect(runSafeReindex).toHaveBeenCalledWith(expectedReindexParams); - expect(runUnsafeReindex).not.toHaveBeenCalled(); - } - - internal.syncSessionFiles = originalSyncSessionFiles; - internal.shouldFallbackOnError = originalShouldFallbackOnError; - internal.activateFallbackProvider = originalActivateFallbackProvider; - internal.runSafeReindex = originalRunSafeReindex; - internal.runUnsafeReindex = originalRunUnsafeReindex; - await manager.close?.(); - } finally { - if (previousStateDir === undefined) { - delete process.env.OPENCLAW_STATE_DIR; - } else { - process.env.OPENCLAW_STATE_DIR = previousStateDir; - } - await fs.rm(stateDir, { recursive: true, force: true }); - await fs.rm(storePath, { force: true }); - } - }); - - it("reindexes when the embedding model changes", async () => { - const base = createCfg({ storePath: indexModelPath }); - const baseAgents = base.agents!; - const baseDefaults = baseAgents.defaults!; - const baseMemorySearch = baseDefaults.memorySearch!; - - const first = await getMemorySearchManager({ - cfg: { - ...base, - agents: { - ...baseAgents, - defaults: { - ...baseDefaults, - memorySearch: { - ...baseMemorySearch, - model: "mock-embed-v1", - }, - }, - }, - }, - agentId: "main", - }); - const firstManager = requireManager(first); - await firstManager.sync?.({ reason: "test" }); - const callsAfterFirstSync = embedBatchCalls; - await firstManager.close?.(); - - const second = await getMemorySearchManager({ - 
cfg: { - ...base, - agents: { - ...baseAgents, - defaults: { - ...baseDefaults, - memorySearch: { - ...baseMemorySearch, - model: "mock-embed-v2", - }, - }, - }, - }, - agentId: "main", - }); - const secondManager = requireManager(second); - await secondManager.sync?.({ reason: "test" }); - expect(embedBatchCalls).toBeGreaterThan(callsAfterFirstSync); - const status = secondManager.status(); - expect(status.files).toBeGreaterThan(0); - await secondManager.close?.(); - }); - it("passes Gemini outputDimensionality from config into the provider", async () => { const cfg = createCfg({ storePath: indexMainPath, @@ -1000,239 +533,6 @@ describe("memory index", () => { await manager.close?.(); }); - it("snapshots builtin file hashes with a single sqlite query per sync", async () => { - await fs.writeFile(path.join(memoryDir, "2026-01-13.md"), "beta line\n"); - const cfg = createCfg({ - storePath: path.join(workspaceDir, `index-prepare-reuse-${randomUUID()}.sqlite`), - onSearch: false, - }); - - const result = await getMemorySearchManager({ cfg, agentId: "main" }); - const manager = requireManager(result); - managersForCleanup.add(manager); - - await manager.sync({ reason: "test" }); - (manager as unknown as { dirty: boolean }).dirty = true; - - const db = ( - manager as unknown as { - db: { - prepare: (sql: string) => { get: (...args: unknown[]) => unknown }; - }; - } - ).db; - const originalPrepare = db.prepare.bind(db); - let selectSourceFileStatePrepareCalls = 0; - let perFileHashPrepareCalls = 0; - db.prepare = ((sql: string) => { - if (sql === `SELECT path, hash FROM files WHERE source = ?`) { - selectSourceFileStatePrepareCalls += 1; - } - if (sql === `SELECT hash FROM files WHERE path = ? 
AND source = ?`) { - perFileHashPrepareCalls += 1; - } - return originalPrepare(sql); - }) as typeof db.prepare; - - try { - await manager.sync({ reason: "test" }); - } finally { - db.prepare = originalPrepare; - } - - expect(selectSourceFileStatePrepareCalls).toBe(1); - expect(perFileHashPrepareCalls).toBe(0); - }); - - it("uses a single sqlite aggregation query for status counts", async () => { - const cfg = createCfg({ - storePath: path.join(workspaceDir, `index-status-aggregate-${randomUUID()}.sqlite`), - sources: ["memory", "sessions"], - sessionMemory: true, - onSearch: false, - }); - - await fs.writeFile(path.join(memoryDir, "2026-01-13.md"), "beta line\n"); - - const stateDir = path.join(fixtureRoot, `state-status-${randomUUID()}`); - vi.stubEnv("OPENCLAW_STATE_DIR", stateDir); - const sessionDir = path.join(stateDir, "agents", "main", "sessions"); - await fs.mkdir(sessionDir, { recursive: true }); - await fs.writeFile( - path.join(sessionDir, "status.jsonl"), - JSON.stringify({ - type: "message", - message: { role: "user", content: [{ type: "text", text: "session status line" }] }, - }) + "\n", - ); - - const result = await getMemorySearchManager({ cfg, agentId: "main" }); - const manager = requireManager(result); - managersForCleanup.add(manager); - await manager.sync({ reason: "test" }); - - const db = ( - manager as unknown as { - db: { - prepare: (sql: string) => { all: (...args: unknown[]) => unknown }; - }; - } - ).db; - const originalPrepare = db.prepare.bind(db); - let aggregatePrepareCalls = 0; - let legacyCountPrepareCalls = 0; - db.prepare = ((sql: string) => { - if ( - sql.includes(`SELECT 'files' AS kind, source, COUNT(*) as c FROM files`) && - sql.includes(`UNION ALL`) - ) { - aggregatePrepareCalls += 1; - } - if ( - sql === `SELECT COUNT(*) as c FROM files WHERE 1=1` || - sql === `SELECT COUNT(*) as c FROM chunks WHERE 1=1` || - sql === `SELECT source, COUNT(*) as c FROM files WHERE 1=1 GROUP BY source` || - sql === `SELECT source, COUNT(*) 
as c FROM chunks WHERE 1=1 GROUP BY source` - ) { - legacyCountPrepareCalls += 1; - } - return originalPrepare(sql); - }) as typeof db.prepare; - - try { - const status = manager.status(); - expect(status.files).toBeGreaterThan(0); - expect(status.chunks).toBeGreaterThan(0); - expect( - status.sourceCounts?.find((entry) => entry.source === "memory")?.files, - ).toBeGreaterThan(0); - expect( - status.sourceCounts?.find((entry) => entry.source === "sessions")?.files, - ).toBeGreaterThan(0); - } finally { - db.prepare = originalPrepare; - vi.unstubAllEnvs(); - } - - expect(aggregatePrepareCalls).toBe(1); - expect(legacyCountPrepareCalls).toBe(0); - }); - - it("reindexes when Gemini outputDimensionality changes", async () => { - const base = createCfg({ - storePath: indexModelPath, - provider: "gemini", - model: "gemini-embedding-2-preview", - outputDimensionality: 3072, - }); - const baseAgents = base.agents!; - const baseDefaults = baseAgents.defaults!; - const baseMemorySearch = baseDefaults.memorySearch!; - - const first = await getMemorySearchManager({ cfg: base, agentId: "main" }); - const firstManager = requireManager(first); - await firstManager.sync?.({ reason: "test" }); - const callsAfterFirstSync = embedBatchCalls; - await firstManager.close?.(); - - const second = await getMemorySearchManager({ - cfg: { - ...base, - agents: { - ...baseAgents, - defaults: { - ...baseDefaults, - memorySearch: { - ...baseMemorySearch, - outputDimensionality: 768, - }, - }, - }, - }, - agentId: "main", - }); - const secondManager = requireManager(second); - await secondManager.sync?.({ reason: "test" }); - expect(embedBatchCalls).toBeGreaterThan(callsAfterFirstSync); - await secondManager.close?.(); - }); - - it("reindexes when extraPaths change", async () => { - const storePath = path.join(workspaceDir, `index-scope-extra-${randomUUID()}.sqlite`); - const firstExtraDir = path.join(workspaceDir, "scope-extra-a"); - const secondExtraDir = path.join(workspaceDir, 
"scope-extra-b"); - await fs.rm(firstExtraDir, { recursive: true, force: true }); - await fs.rm(secondExtraDir, { recursive: true, force: true }); - await fs.mkdir(firstExtraDir, { recursive: true }); - await fs.mkdir(secondExtraDir, { recursive: true }); - await fs.writeFile(path.join(firstExtraDir, "a.md"), "alpha only"); - await fs.writeFile(path.join(secondExtraDir, "b.md"), "beta only"); - - const first = await getMemorySearchManager({ - cfg: createCfg({ - storePath, - extraPaths: [firstExtraDir], - }), - agentId: "main", - }); - const firstManager = requireManager(first); - await firstManager.sync?.({ reason: "test" }); - await firstManager.close?.(); - - const second = await getMemorySearchManager({ - cfg: createCfg({ - storePath, - extraPaths: [secondExtraDir], - }), - agentId: "main", - }); - const secondManager = requireManager(second); - await secondManager.sync?.({ reason: "test" }); - const results = await secondManager.search("beta"); - expect(results.some((result) => result.path.endsWith("scope-extra-b/b.md"))).toBe(true); - expect(results.some((result) => result.path.endsWith("scope-extra-a/a.md"))).toBe(false); - await secondManager.close?.(); - }); - - it("reindexes when multimodal settings change", async () => { - const storePath = path.join(workspaceDir, `index-scope-multimodal-${randomUUID()}.sqlite`); - const mediaDir = path.join(workspaceDir, "scope-media"); - await fs.rm(mediaDir, { recursive: true, force: true }); - await fs.mkdir(mediaDir, { recursive: true }); - await fs.writeFile(path.join(mediaDir, "diagram.png"), Buffer.from("png")); - - const first = await getMemorySearchManager({ - cfg: createCfg({ - storePath, - provider: "gemini", - model: "gemini-embedding-2-preview", - extraPaths: [mediaDir], - }), - agentId: "main", - }); - const firstManager = requireManager(first); - await firstManager.sync?.({ reason: "test" }); - const multimodalCallsAfterFirstSync = embedBatchInputCalls; - await firstManager.close?.(); - - const second = 
await getMemorySearchManager({ - cfg: createCfg({ - storePath, - provider: "gemini", - model: "gemini-embedding-2-preview", - extraPaths: [mediaDir], - multimodal: { enabled: true, modalities: ["image"] }, - }), - agentId: "main", - }); - const secondManager = requireManager(second); - await secondManager.sync?.({ reason: "test" }); - expect(embedBatchInputCalls).toBeGreaterThan(multimodalCallsAfterFirstSync); - const results = await secondManager.search("image"); - expect(results.some((result) => result.path.endsWith("scope-media/diagram.png"))).toBe(true); - await secondManager.close?.(); - }); - it("reuses cached embeddings on forced reindex", async () => { const cfg = createCfg({ storePath: indexMainPath, cacheEnabled: true }); const manager = await getPersistentManager(cfg); diff --git a/extensions/memory-core/src/memory/manager-reindex-state.test.ts b/extensions/memory-core/src/memory/manager-reindex-state.test.ts new file mode 100644 index 00000000000..68a83724288 --- /dev/null +++ b/extensions/memory-core/src/memory/manager-reindex-state.test.ts @@ -0,0 +1,160 @@ +import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; +import { describe, expect, it } from "vitest"; +import { + resolveConfiguredScopeHash, + resolveConfiguredSourcesForMeta, + shouldRunFullMemoryReindex, + type MemoryIndexMeta, +} from "./manager-reindex-state.js"; + +function createMeta(overrides: Partial = {}): MemoryIndexMeta { + return { + model: "mock-embed-v1", + provider: "openai", + providerKey: "provider-key-v1", + sources: ["memory"], + scopeHash: "scope-v1", + chunkTokens: 4000, + chunkOverlap: 0, + ftsTokenizer: "unicode61", + ...overrides, + }; +} + +function createFullReindexParams( + overrides: { + meta?: MemoryIndexMeta | null; + provider?: { id: string; model: string } | null; + providerKey?: string; + configuredSources?: MemorySource[]; + configuredScopeHash?: string; + chunkTokens?: number; + chunkOverlap?: number; + vectorReady?: boolean; + 
ftsTokenizer?: string; + } = {}, +) { + return { + meta: createMeta(), + provider: { id: "openai", model: "mock-embed-v1" }, + providerKey: "provider-key-v1", + configuredSources: ["memory"] as MemorySource[], + configuredScopeHash: "scope-v1", + chunkTokens: 4000, + chunkOverlap: 0, + vectorReady: false, + ftsTokenizer: "unicode61", + ...overrides, + }; +} + +describe("memory reindex state", () => { + it("requires a full reindex when the embedding model changes", () => { + expect( + shouldRunFullMemoryReindex( + createFullReindexParams({ + provider: { id: "openai", model: "mock-embed-v2" }, + }), + ), + ).toBe(true); + }); + + it("requires a full reindex when the provider cache key changes", () => { + expect( + shouldRunFullMemoryReindex( + createFullReindexParams({ + provider: { id: "gemini", model: "gemini-embedding-2-preview" }, + providerKey: "provider-key-dims-768", + meta: createMeta({ + provider: "gemini", + model: "gemini-embedding-2-preview", + providerKey: "provider-key-dims-3072", + }), + }), + ), + ).toBe(true); + }); + + it("requires a full reindex when extraPaths change", () => { + const workspaceDir = "/tmp/workspace"; + const firstScopeHash = resolveConfiguredScopeHash({ + workspaceDir, + extraPaths: ["/tmp/workspace/a"], + multimodal: { + enabled: false, + modalities: [], + maxFileBytes: 20 * 1024 * 1024, + }, + }); + const secondScopeHash = resolveConfiguredScopeHash({ + workspaceDir, + extraPaths: ["/tmp/workspace/b"], + multimodal: { + enabled: false, + modalities: [], + maxFileBytes: 20 * 1024 * 1024, + }, + }); + + expect( + shouldRunFullMemoryReindex( + createFullReindexParams({ + meta: createMeta({ scopeHash: firstScopeHash }), + configuredScopeHash: secondScopeHash, + }), + ), + ).toBe(true); + }); + + it("requires a full reindex when configured sources add sessions", () => { + expect( + shouldRunFullMemoryReindex( + createFullReindexParams({ + configuredSources: ["memory", "sessions"], + }), + ), + ).toBe(true); + }); + + it("requires a 
/**
 * Settings snapshot persisted alongside a memory index.
 *
 * `shouldRunFullMemoryReindex` compares this record with the live
 * configuration to decide whether the stored index can still be reused.
 */
export type MemoryIndexMeta = {
  model: string;
  provider: string;
  providerKey?: string;
  /** Absent on older indexes; those are treated as memory-only. */
  sources?: MemorySource[];
  scopeHash?: string;
  chunkTokens: number;
  chunkOverlap: number;
  vectorDims?: number;
  /** Treated as "unicode61" when absent. */
  ftsTokenizer?: string;
};

/**
 * Reduces arbitrary values to the recognised source names, de-duplicated and
 * sorted, so two source lists can be compared element by element.
 */
function collectKnownSources(values: Iterable<unknown>): MemorySource[] {
  const known = new Set<MemorySource>();
  for (const value of values) {
    if (value === "memory" || value === "sessions") {
      known.add(value);
    }
  }
  // The array is freshly created here, so sorting it in place mutates no caller data.
  return [...known].sort();
}

/**
 * Canonical form of the configured sources as stored in index metadata.
 *
 * Unknown names and duplicates are dropped; an empty result falls back to
 * `["memory"]`.
 */
export function resolveConfiguredSourcesForMeta(sources: Iterable<string>): MemorySource[] {
  const normalized = collectKnownSources(sources);
  return normalized.length > 0 ? normalized : ["memory"];
}

/**
 * Canonical form of the sources recorded in persisted metadata.
 *
 * Returns `["memory"]` when the metadata has no usable `sources` array.
 */
export function normalizeMetaSources(meta: MemoryIndexMeta): MemorySource[] {
  if (!Array.isArray(meta.sources)) {
    // Backward compatibility for older indexes that did not persist sources.
    return ["memory"];
  }
  const normalized = collectKnownSources(meta.sources);
  return normalized.length > 0 ? normalized : ["memory"];
}

/**
 * Reports whether the persisted sources and the configured sources name
 * different sets.
 *
 * Both sides are canonicalised first, so ordering or duplicates in
 * `configuredSources` cannot produce a false difference. An empty
 * `configuredSources` is still reported as different.
 */
export function configuredMetaSourcesDiffer(params: {
  meta: MemoryIndexMeta;
  configuredSources: MemorySource[];
}): boolean {
  const metaSources = normalizeMetaSources(params.meta);
  const configured = collectKnownSources(params.configuredSources);
  if (metaSources.length !== configured.length) {
    return true;
  }
  return metaSources.some((source, index) => source !== configured[index]);
}

/**
 * Hashes the parts of the configuration that define which files are indexed:
 * the normalised extra paths and the multimodal settings.
 *
 * The shape and key order of the serialised payload must not change, because
 * the hash is compared with values stored by earlier runs.
 */
export function resolveConfiguredScopeHash(params: {
  workspaceDir: string;
  extraPaths?: string[];
  multimodal: {
    enabled: boolean;
    modalities: string[];
    maxFileBytes: number;
  };
}): string {
  const extraPaths = normalizeExtraMemoryPaths(params.workspaceDir, params.extraPaths)
    // Use forward slashes so the hash does not depend on the path separator.
    .map((value) => value.replace(/\\/g, "/"))
    .toSorted();
  return hashText(
    JSON.stringify({
      extraPaths,
      multimodal: {
        enabled: params.multimodal.enabled,
        modalities: [...params.multimodal.modalities].toSorted(),
        maxFileBytes: params.multimodal.maxFileBytes,
      },
    }),
  );
}

/**
 * Decides whether the stored index must be rebuilt from scratch.
 *
 * Returns `true` when there is no metadata, or when any persisted setting
 * (model, provider, provider key, sources, scope hash, chunking, tokenizer)
 * differs from the live configuration, or when vectors are ready but the
 * metadata records no vector dimensions.
 */
export function shouldRunFullMemoryReindex(params: {
  meta: MemoryIndexMeta | null;
  provider: { id: string; model: string } | null;
  providerKey?: string;
  configuredSources: MemorySource[];
  configuredScopeHash: string;
  chunkTokens: number;
  chunkOverlap: number;
  vectorReady: boolean;
  ftsTokenizer: string;
}): boolean {
  const { meta } = params;
  if (!meta) {
    return true;
  }
  // Without a provider the index is expected to carry the FTS-only markers,
  // so a switch from a provider to FTS-only also counts as a change.
  const expectedModel = params.provider ? params.provider.model : "fts-only";
  const expectedProvider = params.provider ? params.provider.id : "none";
  return (
    meta.model !== expectedModel ||
    meta.provider !== expectedProvider ||
    meta.providerKey !== params.providerKey ||
    configuredMetaSourcesDiffer({
      meta,
      configuredSources: params.configuredSources,
    }) ||
    meta.scopeHash !== params.configuredScopeHash ||
    meta.chunkTokens !== params.chunkTokens ||
    meta.chunkOverlap !== params.chunkOverlap ||
    (params.vectorReady && !meta.vectorDims) ||
    (meta.ftsTokenizer ?? "unicode61") !== params.ftsTokenizer
  );
}
/** One `files` table row as read for change detection: the stored path and its content hash. */
export type MemorySourceFileStateRow = {
  path: string;
  hash: string;
};

/** Minimal database surface used here, so tests can supply a plain stub. */
type MemorySourceStateDb = {
  prepare: (sql: string) => {
    all: (...args: unknown[]) => unknown;
    get: (...args: unknown[]) => unknown;
  };
};

/** Bulk query: every stored path and hash for one source. Bound parameter: source. */
export const MEMORY_SOURCE_FILE_STATE_SQL = `SELECT path, hash FROM files WHERE source = ?`;
/** Single-file query: the stored hash of one path. Bound parameters: path, then source. */
export const MEMORY_SOURCE_FILE_HASH_SQL = `SELECT hash FROM files WHERE path = ? AND source = ?`;

/**
 * Loads the stored path and hash of every file of `source` with one query.
 *
 * @returns the rows as returned by the database plus a path-to-hash lookup
 *   built from them. A non-array query result is treated as no rows.
 */
export function loadMemorySourceFileState(params: {
  db: MemorySourceStateDb;
  source: MemorySource;
}): {
  rows: MemorySourceFileStateRow[];
  hashes: Map<string, string>;
} {
  const result = params.db.prepare(MEMORY_SOURCE_FILE_STATE_SQL).all(params.source);
  const rows = Array.isArray(result) ? (result as MemorySourceFileStateRow[]) : [];
  return {
    rows,
    hashes: new Map<string, string>(rows.map((row): [string, string] => [row.path, row.hash])),
  };
}

/**
 * Returns the stored hash for `path`, or `undefined` when none is known.
 *
 * When a bulk snapshot (`existingHashes`) is supplied it is the only source
 * consulted, so a path missing from it yields `undefined` without touching
 * the database. Without a snapshot a single-row lookup is issued.
 */
export function resolveMemorySourceExistingHash(params: {
  db: MemorySourceStateDb;
  source: MemorySource;
  path: string;
  existingHashes?: Map<string, string> | null;
}): string | undefined {
  if (params.existingHashes) {
    return params.existingHashes.get(params.path);
  }
  const row = params.db.prepare(MEMORY_SOURCE_FILE_HASH_SQL).get(params.path, params.source) as
    | { hash: string }
    | undefined;
  return row?.hash;
}
/** One row of the aggregate query: a count of files or chunks for one source. */
type StatusAggregateRow = {
  kind: "files" | "chunks";
  source: MemorySource;
  c: number;
};

/** Minimal database surface used here, so tests can supply a plain stub. */
type StatusAggregateDb = {
  prepare: (sql: string) => {
    all: (...args: MemorySource[]) => StatusAggregateRow[];
  };
};

/**
 * Per-source file and chunk counts in a single statement.
 *
 * `__FILTER__` appears once per SELECT and is replaced with the optional
 * source filter, so the filter's bound parameters must be supplied twice.
 */
export const MEMORY_STATUS_AGGREGATE_SQL =
  `SELECT 'files' AS kind, source, COUNT(*) as c FROM files WHERE 1=1__FILTER__ GROUP BY source\n` +
  `UNION ALL\n` +
  `SELECT 'chunks' AS kind, source, COUNT(*) as c FROM chunks WHERE 1=1__FILTER__ GROUP BY source`;

/**
 * Runs the aggregate status query and folds its rows into totals and a
 * per-source breakdown.
 *
 * @param params.sources sources to list in `sourceCounts`, in this order; a
 *   source with no rows is listed with zero counts.
 * @param params.sourceFilterSql SQL fragment appended to each WHERE clause
 *   (defaults to no filter).
 * @param params.sourceFilterParams bound values for one copy of the filter.
 * @returns overall `files`/`chunks` totals over every returned row, and the
 *   counts of the requested sources. Rows for sources not in `sources` add to
 *   the totals but are not listed.
 */
export function collectMemoryStatusAggregate(params: {
  db: StatusAggregateDb;
  sources: Iterable<MemorySource>;
  sourceFilterSql?: string;
  sourceFilterParams?: MemorySource[];
}): {
  files: number;
  chunks: number;
  sourceCounts: Array<{ source: MemorySource; files: number; chunks: number }>;
} {
  const sources = Array.from(params.sources);
  const bySource = new Map<MemorySource, { files: number; chunks: number }>();
  for (const source of sources) {
    bySource.set(source, { files: 0, chunks: 0 });
  }
  const sourceFilterSql = params.sourceFilterSql ?? "";
  const sourceFilterParams = params.sourceFilterParams ?? [];
  // split/join substitutes the filter literally; a string replacement passed
  // to replaceAll would interpret `$` sequences inside the SQL fragment.
  const sql = MEMORY_STATUS_AGGREGATE_SQL.split("__FILTER__").join(sourceFilterSql);
  const aggregateRows = params.db.prepare(sql).all(...sourceFilterParams, ...sourceFilterParams);
  let files = 0;
  let chunks = 0;
  for (const row of aggregateRows) {
    const count = row.c ?? 0;
    const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 };
    // Accumulate per source the same way as the totals so the two always agree.
    if (row.kind === "files") {
      entry.files += count;
      files += count;
    } else {
      entry.chunks += count;
      chunks += count;
    }
    bySource.set(row.source, entry);
  }
  return {
    files,
    chunks,
    sourceCounts: sources.map((source) => ({
      source,
      ...(bySource.get(source) ?? { files: 0, chunks: 0 }),
    })),
  };
}
resolveEmbeddingProviderFallbackModel, } from "./embeddings.js"; +import { openMemoryDatabaseAtPath } from "./manager-db.js"; +import { + resolveConfiguredScopeHash, + resolveConfiguredSourcesForMeta, + shouldRunFullMemoryReindex, + type MemoryIndexMeta, +} from "./manager-reindex-state.js"; import { shouldSyncSessionsForReindex } from "./manager-session-reindex.js"; - -type MemoryIndexMeta = { - model: string; - provider: string; - providerKey?: string; - sources?: MemorySource[]; - scopeHash?: string; - chunkTokens: number; - chunkOverlap: number; - vectorDims?: number; - ftsTokenizer?: string; -}; +import { + loadMemorySourceFileState, + resolveMemorySourceExistingHash, +} from "./manager-source-state.js"; +import { runMemoryTargetedSessionSync } from "./manager-targeted-sync.js"; type MemorySyncProgressState = { completed: number; @@ -86,18 +83,6 @@ const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([ const log = createSubsystemLogger("memory"); -export function openMemoryDatabaseAtPath(dbPath: string, allowExtension: boolean): DatabaseSync { - const dir = path.dirname(dbPath); - ensureDir(dir); - const { DatabaseSync } = requireNodeSqlite(); - const db = new DatabaseSync(dbPath, { allowExtension }); - // busy_timeout is per-connection and resets to 0 on restart. - // Set it on every open so concurrent processes retry instead of - // failing immediately with SQLITE_BUSY. - db.exec("PRAGMA busy_timeout = 5000"); - return db; -} - function shouldIgnoreMemoryWatchPath(watchPath: string): boolean { const normalized = path.normalize(watchPath); const parts = normalized.split(path.sep).map((segment) => segment.trim().toLowerCase()); @@ -633,17 +618,6 @@ export abstract class MemoryManagerSyncOps { return normalized.size > 0 ? 
normalized : null; } - private clearSyncedSessionFiles(targetSessionFiles?: Iterable | null) { - if (!targetSessionFiles) { - this.sessionsDirtyFiles.clear(); - } else { - for (const targetSessionFile of targetSessionFiles) { - this.sessionsDirtyFiles.delete(targetSessionFile); - } - } - this.sessionsDirty = this.sessionsDirtyFiles.size > 0; - } - protected ensureIntervalSync() { const minutes = this.settings.sync.intervalMinutes; if (!minutes || minutes <= 0 || this.intervalTimer) { @@ -685,7 +659,6 @@ export abstract class MemoryManagerSyncOps { needsFullReindex: boolean; progress?: MemorySyncProgressState; }) { - const selectSourceFileState = this.db.prepare(`SELECT path, hash FROM files WHERE source = ?`); const deleteFileByPathAndSource = this.db.prepare( `DELETE FROM files WHERE path = ? AND source = ?`, ); @@ -723,11 +696,12 @@ export abstract class MemoryManagerSyncOps { batch: this.batch.enabled, concurrency: this.getIndexConcurrency(), }); - const existingRows = selectSourceFileState.all("memory") as Array<{ - path: string; - hash: string; - }>; - const existingHashes = new Map(existingRows.map((row) => [row.path, row.hash])); + const existingState = loadMemorySourceFileState({ + db: this.db, + source: "memory", + }); + const existingRows = existingState.rows; + const existingHashes = existingState.hashes; const activePaths = new Set(fileEntries.map((entry) => entry.path)); if (params.progress) { params.progress.total += fileEntries.length; @@ -784,8 +758,6 @@ export abstract class MemoryManagerSyncOps { targetSessionFiles?: string[]; progress?: MemorySyncProgressState; }) { - const selectFileHash = this.db.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`); - const selectSourceFileState = this.db.prepare(`SELECT path, hash FROM files WHERE source = ?`); const deleteFileByPathAndSource = this.db.prepare( `DELETE FROM files WHERE path = ? 
AND source = ?`, ); @@ -815,10 +787,10 @@ export abstract class MemoryManagerSyncOps { const existingRows = activePaths === null ? null - : (selectSourceFileState.all("sessions") as Array<{ - path: string; - hash: string; - }>); + : loadMemorySourceFileState({ + db: this.db, + source: "sessions", + }).rows; const existingHashes = existingRows === null ? null : new Map(existingRows.map((row) => [row.path, row.hash])); const indexAll = @@ -862,15 +834,12 @@ export abstract class MemoryManagerSyncOps { } return; } - const existingHash = - existingHashes?.get(entry.path) ?? - ( - selectFileHash.get(entry.path, "sessions") as - | { - hash: string; - } - | undefined - )?.hash; + const existingHash = resolveMemorySourceExistingHash({ + db: this.db, + source: "sessions", + path: entry.path, + existingHashes, + }); if (!params.needsFullReindex && existingHash === entry.hash) { if (params.progress) { params.progress.completed += 1; @@ -964,60 +933,57 @@ export abstract class MemoryManagerSyncOps { } const vectorReady = await this.ensureVectorReady(); const meta = this.readMeta(); - const configuredSources = this.resolveConfiguredSourcesForMeta(); - const configuredScopeHash = this.resolveConfiguredScopeHash(); + const configuredSources = resolveConfiguredSourcesForMeta(this.sources); + const configuredScopeHash = resolveConfiguredScopeHash({ + workspaceDir: this.workspaceDir, + extraPaths: this.settings.extraPaths, + multimodal: { + enabled: this.settings.multimodal.enabled, + modalities: this.settings.multimodal.modalities, + maxFileBytes: this.settings.multimodal.maxFileBytes, + }, + }); const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles); const hasTargetSessionFiles = targetSessionFiles !== null; - if (hasTargetSessionFiles && targetSessionFiles && this.sources.has("sessions")) { - // Post-compaction refreshes should only update the explicit transcript files and - // leave broader reindex/dirty-work decisions to the regular sync path. 
- try { - await this.syncSessionFiles({ - needsFullReindex: false, - targetSessionFiles: Array.from(targetSessionFiles), - progress: progress ?? undefined, - }); - this.clearSyncedSessionFiles(targetSessionFiles); - } catch (err) { - const reason = err instanceof Error ? err.message : String(err); - const activated = - this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason)); - if (activated) { - if ( - process.env.OPENCLAW_TEST_FAST === "1" && - process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1" - ) { - await this.runUnsafeReindex({ - reason: params?.reason, - force: true, - progress: progress ?? undefined, - }); - } else { - await this.runSafeReindex({ - reason: params?.reason, - force: true, - progress: progress ?? undefined, - }); - } - return; - } - throw err; - } + const targetedSessionSync = await runMemoryTargetedSessionSync({ + hasSessionSource: this.sources.has("sessions"), + targetSessionFiles, + reason: params?.reason, + progress: progress ?? undefined, + useUnsafeReindex: + process.env.OPENCLAW_TEST_FAST === "1" && + process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1", + sessionsDirtyFiles: this.sessionsDirtyFiles, + syncSessionFiles: async (targetedParams) => { + await this.syncSessionFiles(targetedParams); + }, + shouldFallbackOnError: (message) => this.shouldFallbackOnError(message), + activateFallbackProvider: async (reason) => await this.activateFallbackProvider(reason), + runSafeReindex: async (reindexParams) => { + await this.runSafeReindex(reindexParams); + }, + runUnsafeReindex: async (reindexParams) => { + await this.runUnsafeReindex(reindexParams); + }, + }); + if (targetedSessionSync.handled) { + this.sessionsDirty = targetedSessionSync.sessionsDirty; return; } const needsFullReindex = (params?.force && !hasTargetSessionFiles) || - !meta || - // Also detects provider→FTS-only transitions so orphaned old-model FTS rows are cleaned up. - (this.provider ? 
meta.model !== this.provider.model : meta.model !== "fts-only") || - (this.provider ? meta.provider !== this.provider.id : meta.provider !== "none") || - meta.providerKey !== this.providerKey || - this.metaSourcesDiffer(meta, configuredSources) || - meta.scopeHash !== configuredScopeHash || - meta.chunkTokens !== this.settings.chunking.tokens || - meta.chunkOverlap !== this.settings.chunking.overlap || - (vectorReady && !meta?.vectorDims) || - (meta.ftsTokenizer ?? "unicode61") !== this.settings.store.fts.tokenizer; + shouldRunFullMemoryReindex({ + meta, + // Also detects provider→FTS-only transitions so orphaned old-model FTS rows are cleaned up. + provider: this.provider ? { id: this.provider.id, model: this.provider.model } : null, + providerKey: this.providerKey, + configuredSources, + configuredScopeHash, + chunkTokens: this.settings.chunking.tokens, + chunkOverlap: this.settings.chunking.overlap, + vectorReady, + ftsTokenizer: this.settings.store.fts.tokenizer, + }); try { if (needsFullReindex) { if ( @@ -1209,8 +1175,16 @@ export abstract class MemoryManagerSyncOps { model: this.provider?.model ?? "fts-only", provider: this.provider?.id ?? "none", providerKey: this.providerKey!, - sources: this.resolveConfiguredSourcesForMeta(), - scopeHash: this.resolveConfiguredScopeHash(), + sources: resolveConfiguredSourcesForMeta(this.sources), + scopeHash: resolveConfiguredScopeHash({ + workspaceDir: this.workspaceDir, + extraPaths: this.settings.extraPaths, + multimodal: { + enabled: this.settings.multimodal.enabled, + modalities: this.settings.multimodal.modalities, + maxFileBytes: this.settings.multimodal.maxFileBytes, + }, + }), chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, ftsTokenizer: this.settings.store.fts.tokenizer, @@ -1282,8 +1256,16 @@ export abstract class MemoryManagerSyncOps { model: this.provider?.model ?? "fts-only", provider: this.provider?.id ?? 
"none", providerKey: this.providerKey!, - sources: this.resolveConfiguredSourcesForMeta(), - scopeHash: this.resolveConfiguredScopeHash(), + sources: resolveConfiguredSourcesForMeta(this.sources), + scopeHash: resolveConfiguredScopeHash({ + workspaceDir: this.workspaceDir, + extraPaths: this.settings.extraPaths, + multimodal: { + enabled: this.settings.multimodal.enabled, + modalities: this.settings.multimodal.modalities, + maxFileBytes: this.settings.multimodal.maxFileBytes, + }, + }), chunkTokens: this.settings.chunking.tokens, chunkOverlap: this.settings.chunking.overlap, ftsTokenizer: this.settings.store.fts.tokenizer, @@ -1340,50 +1322,4 @@ export abstract class MemoryManagerSyncOps { .run(META_KEY, value); this.lastMetaSerialized = value; } - - private resolveConfiguredSourcesForMeta(): MemorySource[] { - const normalized = Array.from(this.sources) - .filter((source): source is MemorySource => source === "memory" || source === "sessions") - .toSorted(); - return normalized.length > 0 ? normalized : ["memory"]; - } - - private normalizeMetaSources(meta: MemoryIndexMeta): MemorySource[] { - if (!Array.isArray(meta.sources)) { - // Backward compatibility for older indexes that did not persist sources. - return ["memory"]; - } - const normalized = Array.from( - new Set( - meta.sources.filter( - (source): source is MemorySource => source === "memory" || source === "sessions", - ), - ), - ).toSorted(); - return normalized.length > 0 ? 
normalized : ["memory"]; - } - - private resolveConfiguredScopeHash(): string { - const extraPaths = normalizeExtraMemoryPaths(this.workspaceDir, this.settings.extraPaths) - .map((value) => value.replace(/\\/g, "/")) - .toSorted(); - return hashText( - JSON.stringify({ - extraPaths, - multimodal: { - enabled: this.settings.multimodal.enabled, - modalities: [...this.settings.multimodal.modalities].toSorted(), - maxFileBytes: this.settings.multimodal.maxFileBytes, - }, - }), - ); - } - - private metaSourcesDiffer(meta: MemoryIndexMeta, configuredSources: MemorySource[]): boolean { - const metaSources = this.normalizeMetaSources(meta); - if (metaSources.length !== configuredSources.length) { - return true; - } - return metaSources.some((source, index) => source !== configuredSources[index]); - } } diff --git a/extensions/memory-core/src/memory/manager-targeted-sync.test.ts b/extensions/memory-core/src/memory/manager-targeted-sync.test.ts new file mode 100644 index 00000000000..ecd545bb90a --- /dev/null +++ b/extensions/memory-core/src/memory/manager-targeted-sync.test.ts @@ -0,0 +1,78 @@ +import { describe, expect, it, vi } from "vitest"; +import { + clearMemorySyncedSessionFiles, + runMemoryTargetedSessionSync, +} from "./manager-targeted-sync.js"; + +describe("memory targeted session sync", () => { + it("preserves unrelated dirty sessions after targeted cleanup", () => { + const secondSessionPath = "/tmp/targeted-dirty-second.jsonl"; + const sessionsDirtyFiles = new Set(["/tmp/targeted-dirty-first.jsonl", secondSessionPath]); + + const sessionsDirty = clearMemorySyncedSessionFiles({ + sessionsDirtyFiles, + targetSessionFiles: ["/tmp/targeted-dirty-first.jsonl"], + }); + + expect(sessionsDirtyFiles.has(secondSessionPath)).toBe(true); + expect(sessionsDirty).toBe(true); + }); + + it("runs a full reindex after fallback activates during targeted sync", async () => { + const activateFallbackProvider = vi.fn(async () => true); + const runSafeReindex = vi.fn(async () => 
/** Progress callback forwarded unchanged to the sync and reindex callbacks. */
type TargetedSyncProgress = (update: MemorySyncProgressUpdate) => void;

/**
 * Removes synced session files from the dirty set (mutating it) and reports
 * whether any dirty files remain.
 *
 * Without `targetSessionFiles` the whole set is cleared; otherwise only the
 * listed files are removed.
 */
export function clearMemorySyncedSessionFiles(params: {
  sessionsDirtyFiles: Set<string>;
  targetSessionFiles?: Iterable<string> | null;
}): boolean {
  if (!params.targetSessionFiles) {
    params.sessionsDirtyFiles.clear();
  } else {
    for (const targetSessionFile of params.targetSessionFiles) {
      params.sessionsDirtyFiles.delete(targetSessionFile);
    }
  }
  return params.sessionsDirtyFiles.size > 0;
}

/**
 * Syncs only the explicitly targeted session files, when that applies.
 *
 * - No session source or no targets: nothing runs and `handled` is `false`.
 * - Sync succeeds: the targets are removed from `sessionsDirtyFiles`.
 * - Sync throws and the fallback provider activates: a forced full reindex
 *   runs (unsafe or safe variant per `useUnsafeReindex`); the dirty set is
 *   left as the reindex callback left it.
 * - Sync throws and no fallback activates: the original error is rethrown.
 *
 * @returns `handled` (whether the caller should stop here) and
 *   `sessionsDirty` (whether `sessionsDirtyFiles` is non-empty afterwards).
 */
export async function runMemoryTargetedSessionSync(params: {
  hasSessionSource: boolean;
  targetSessionFiles: Set<string> | null;
  reason?: string;
  progress?: TargetedSyncProgress;
  useUnsafeReindex: boolean;
  sessionsDirtyFiles: Set<string>;
  syncSessionFiles: (params: {
    needsFullReindex: boolean;
    targetSessionFiles?: string[];
    progress?: TargetedSyncProgress;
  }) => Promise<void>;
  shouldFallbackOnError: (message: string) => boolean;
  activateFallbackProvider: (reason: string) => Promise<boolean>;
  runSafeReindex: (params: {
    reason?: string;
    force?: boolean;
    progress?: TargetedSyncProgress;
  }) => Promise<void>;
  runUnsafeReindex: (params: {
    reason?: string;
    force?: boolean;
    progress?: TargetedSyncProgress;
  }) => Promise<void>;
}): Promise<{ handled: boolean; sessionsDirty: boolean }> {
  if (!params.hasSessionSource || !params.targetSessionFiles) {
    return {
      handled: false,
      sessionsDirty: params.sessionsDirtyFiles.size > 0,
    };
  }

  try {
    await params.syncSessionFiles({
      needsFullReindex: false,
      targetSessionFiles: Array.from(params.targetSessionFiles),
      progress: params.progress,
    });
    return {
      handled: true,
      sessionsDirty: clearMemorySyncedSessionFiles({
        sessionsDirtyFiles: params.sessionsDirtyFiles,
        targetSessionFiles: params.targetSessionFiles,
      }),
    };
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    // Activation is attempted only for errors the caller classifies as fallback-worthy.
    const activated =
      params.shouldFallbackOnError(reason) && (await params.activateFallbackProvider(reason));
    if (!activated) {
      throw err;
    }
    const reindexParams = {
      reason: params.reason,
      force: true,
      progress: params.progress,
    };
    if (params.useUnsafeReindex) {
      await params.runUnsafeReindex(reindexParams);
    } else {
      await params.runSafeReindex(reindexParams);
    }
    return {
      handled: true,
      sessionsDirty: params.sessionsDirtyFiles.size > 0,
    };
  }
}