From ca26489fe88227bbafc582f3be00680abdbc0803 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 6 Apr 2026 21:10:46 +0100 Subject: [PATCH] fix(memory-core): repair sync helper typing drift --- .../src/memory/manager-source-state.ts | 5 +- .../src/memory/manager-status-state.test.ts | 4 +- .../src/memory/manager-status-state.ts | 3 +- .../src/memory/manager-sync-control.ts | 50 ++++++++-------- .../src/memory/manager-sync-ops.ts | 2 +- .../src/memory/manager-targeted-sync.ts | 7 ++- .../memory/manager.readonly-recovery.test.ts | 59 ++++++++++--------- extensions/memory-core/src/memory/manager.ts | 28 ++++++++- 8 files changed, 99 insertions(+), 59 deletions(-) diff --git a/extensions/memory-core/src/memory/manager-source-state.ts b/extensions/memory-core/src/memory/manager-source-state.ts index 234e3953e6d..2fbfa3bc097 100644 --- a/extensions/memory-core/src/memory/manager-source-state.ts +++ b/extensions/memory-core/src/memory/manager-source-state.ts @@ -1,3 +1,4 @@ +import type { SQLInputValue } from "node:sqlite"; import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; export type MemorySourceFileStateRow = { @@ -7,8 +8,8 @@ export type MemorySourceFileStateRow = { type MemorySourceStateDb = { prepare: (sql: string) => { - all: (...args: unknown[]) => unknown; - get: (...args: unknown[]) => unknown; + all: (...args: SQLInputValue[]) => unknown; + get: (...args: SQLInputValue[]) => unknown; }; }; diff --git a/extensions/memory-core/src/memory/manager-status-state.test.ts b/extensions/memory-core/src/memory/manager-status-state.test.ts index 405180e5a8d..6dde45acecc 100644 --- a/extensions/memory-core/src/memory/manager-status-state.test.ts +++ b/extensions/memory-core/src/memory/manager-status-state.test.ts @@ -1,4 +1,4 @@ -import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; +import type { SQLInputValue } from "node:sqlite"; import { describe, expect, it } from "vitest"; import { collectMemoryStatusAggregate, @@ -59,7 +59,7 @@ describe("memory manager status state", () => { }); it("uses one aggregation query for status counts and source breakdowns", () => { - const calls: Array<{ sql: string; params: MemorySource[] }> = []; + const calls: Array<{ sql: string; params: SQLInputValue[] }> = []; const aggregate = collectMemoryStatusAggregate({ db: { prepare: (sql) => ({ diff --git a/extensions/memory-core/src/memory/manager-status-state.ts b/extensions/memory-core/src/memory/manager-status-state.ts index 4ee3c69ae6c..217a3fb1871 100644 --- a/extensions/memory-core/src/memory/manager-status-state.ts +++ b/extensions/memory-core/src/memory/manager-status-state.ts @@ -1,3 +1,4 @@ +import type { SQLInputValue } from "node:sqlite"; import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; type StatusProvider = { @@ -13,7 +14,7 @@ type StatusAggregateRow = { type StatusAggregateDb = { prepare: (sql: string) => { - all: (...args: MemorySource[]) => StatusAggregateRow[]; + all: (...args: SQLInputValue[]) => StatusAggregateRow[]; }; }; diff --git a/extensions/memory-core/src/memory/manager-sync-control.ts b/extensions/memory-core/src/memory/manager-sync-control.ts index 339a5df1626..32ab8517279 100644 --- a/extensions/memory-core/src/memory/manager-sync-control.ts +++ b/extensions/memory-core/src/memory/manager-sync-control.ts @@ -124,10 +124,11 @@ export async function runMemorySyncWithReadonlyRecovery( export function enqueueMemoryTargetedSessionSync( state: { - closed: boolean; - syncing: Promise | null; - queuedSessionFiles: Set; - queuedSessionSync: Promise | null; + isClosed: () => boolean; + getSyncing: () => Promise | null; + getQueuedSessionFiles: () => Set; + getQueuedSessionSync: () => Promise | null; + setQueuedSessionSync: (value: Promise | null) => void; sync: (params?: { reason?: string; force?: boolean; @@ -137,33 +138,36 @@ export function enqueueMemoryTargetedSessionSync( }, sessionFiles?: string[], ): Promise { + const queuedSessionFiles = state.getQueuedSessionFiles(); for (const sessionFile of sessionFiles ?? []) { const trimmed = sessionFile.trim(); if (trimmed) { - state.queuedSessionFiles.add(trimmed); + queuedSessionFiles.add(trimmed); } } - if (state.queuedSessionFiles.size === 0) { - return state.syncing ?? Promise.resolve(); + if (queuedSessionFiles.size === 0) { + return state.getSyncing() ?? Promise.resolve(); } - if (!state.queuedSessionSync) { - state.queuedSessionSync = (async () => { - try { - await state.syncing?.catch(() => undefined); - while (!state.closed && state.queuedSessionFiles.size > 0) { - const queuedSessionFiles = Array.from(state.queuedSessionFiles); - state.queuedSessionFiles.clear(); - await state.sync({ - reason: "queued-session-files", - sessionFiles: queuedSessionFiles, - }); + if (!state.getQueuedSessionSync()) { + state.setQueuedSessionSync( + (async () => { + try { + await state.getSyncing()?.catch(() => undefined); + while (!state.isClosed() && state.getQueuedSessionFiles().size > 0) { + const pendingSessionFiles = Array.from(state.getQueuedSessionFiles()); + state.getQueuedSessionFiles().clear(); + await state.sync({ + reason: "queued-session-files", + sessionFiles: pendingSessionFiles, + }); + } + } finally { + state.setQueuedSessionSync(null); } - } finally { - state.queuedSessionSync = null; - } - })(); + })(), + ); } - return state.queuedSessionSync; + return state.getQueuedSessionSync() ?? Promise.resolve(); } export function _createMemorySyncControlConfigForTests( diff --git a/extensions/memory-core/src/memory/manager-sync-ops.ts b/extensions/memory-core/src/memory/manager-sync-ops.ts index 31c22b8cda2..16601a0d0d0 100644 --- a/extensions/memory-core/src/memory/manager-sync-ops.ts +++ b/extensions/memory-core/src/memory/manager-sync-ops.ts @@ -979,7 +979,7 @@ export abstract class MemoryManagerSyncOps { meta, // Also detects provider→FTS-only transitions so orphaned old-model FTS rows are cleaned up. provider: this.provider ? { id: this.provider.id, model: this.provider.model } : null, - providerKey: this.providerKey, + providerKey: this.providerKey ?? undefined, configuredSources, configuredScopeHash, chunkTokens: this.settings.chunking.tokens, diff --git a/extensions/memory-core/src/memory/manager-targeted-sync.ts b/extensions/memory-core/src/memory/manager-targeted-sync.ts index a8bbe6f54f1..d7103c130d2 100644 --- a/extensions/memory-core/src/memory/manager-targeted-sync.ts +++ b/extensions/memory-core/src/memory/manager-targeted-sync.ts @@ -1,6 +1,11 @@ import type { MemorySyncProgressUpdate } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; -type TargetedSyncProgress = (update: MemorySyncProgressUpdate) => void; +type TargetedSyncProgress = { + completed: number; + total: number; + label?: string; + report: (update: MemorySyncProgressUpdate) => void; +}; export function clearMemorySyncedSessionFiles(params: { sessionsDirtyFiles: Set; diff --git a/extensions/memory-core/src/memory/manager.readonly-recovery.test.ts b/extensions/memory-core/src/memory/manager.readonly-recovery.test.ts index cfa924511fe..5183c78b926 100644 --- a/extensions/memory-core/src/memory/manager.readonly-recovery.test.ts +++ b/extensions/memory-core/src/memory/manager.readonly-recovery.test.ts @@ -28,6 +28,29 @@ describe("memory manager readonly recovery", () => { let workspaceDir = ""; let indexPath = ""; + function createQueuedSyncHarness(syncing: Promise) { + const queuedSessionFiles = new Set(); + let queuedSessionSync: Promise | null = null; + const sync = vi.fn(async () => {}); + return { + queuedSessionFiles, + get queuedSessionSync() { + return queuedSessionSync; + }, + sync, + state: { + isClosed: () => false, + getSyncing: () => syncing, + getQueuedSessionFiles: () => queuedSessionFiles, + getQueuedSessionSync: () => queuedSessionSync, + setQueuedSessionSync: (value: Promise | null) => { + queuedSessionSync = value; + }, + sync, + }, + }; + } + function _createMemoryConfig(): OpenClawConfig { return _createMemorySyncControlConfigForTests(workspaceDir, indexPath); } @@ -56,9 +79,9 @@ describe("memory manager readonly recovery", () => { readonlyRecoveryLastError: undefined, ensureProviderInitialized: vi.fn(async () => {}), enqueueTargetedSessionSync: vi.fn(async () => {}), - runSync: vi.fn(), + runSync: vi.fn(async (_params) => undefined) as ReadonlyRecoveryHarness["runSync"], openDatabase: vi.fn(() => reopenedDb), - ensureSchema: vi.fn(), + ensureSchema: vi.fn(() => undefined) as ReadonlyRecoveryHarness["ensureSchema"], readMeta: vi.fn(() => undefined), }; return { @@ -168,15 +191,9 @@ describe("memory manager readonly recovery", () => { const pendingSync = new Promise((resolve) => { releaseSync = () => resolve(); }); - const harness = { - closed: false, - syncing: pendingSync, - queuedSessionFiles: new Set(), - queuedSessionSync: null as Promise | null, - sync: vi.fn(async () => {}), - }; + const harness = createQueuedSyncHarness(pendingSync); - const queued = enqueueMemoryTargetedSessionSync(harness, [ + const queued = enqueueMemoryTargetedSessionSync(harness.state, [ " /tmp/first.jsonl ", "", "/tmp/second.jsonl", @@ -200,19 +217,13 @@ describe("memory manager readonly recovery", () => { const pendingSync = new Promise((resolve) => { releaseSync = () => resolve(); }); - const harness = { - closed: false, - syncing: pendingSync, - queuedSessionFiles: new Set(), - queuedSessionSync: null as Promise | null, - sync: vi.fn(async () => {}), - }; + const harness = createQueuedSyncHarness(pendingSync); - const first = enqueueMemoryTargetedSessionSync(harness, [ + const first = enqueueMemoryTargetedSessionSync(harness.state, [ "/tmp/first.jsonl", "/tmp/second.jsonl", ]); - const second = enqueueMemoryTargetedSessionSync(harness, [ + const second = enqueueMemoryTargetedSessionSync(harness.state, [ "/tmp/second.jsonl", "/tmp/third.jsonl", ]); @@ -234,15 +245,9 @@ describe("memory manager readonly recovery", () => { const pendingSync = new Promise((resolve) => { releaseSync = () => resolve(); }); - const harness = { - closed: false, - syncing: pendingSync, - queuedSessionFiles: new Set(), - queuedSessionSync: null as Promise | null, - sync: vi.fn(async () => {}), - }; + const harness = createQueuedSyncHarness(pendingSync); - const queued = enqueueMemoryTargetedSessionSync(harness, ["", " "]); + const queued = enqueueMemoryTargetedSessionSync(harness.state, ["", " "]); expect(queued).toBe(pendingSync); releaseSync(); diff --git a/extensions/memory-core/src/memory/manager.ts b/extensions/memory-core/src/memory/manager.ts index dc60c122805..997a424be56 100644 --- a/extensions/memory-core/src/memory/manager.ts +++ b/extensions/memory-core/src/memory/manager.ts @@ -1,5 +1,7 @@ +import type { DatabaseSync } from "node:sqlite"; import { type FSWatcher } from "chokidar"; import { + createSubsystemLogger, resolveAgentDir, resolveAgentWorkspaceDir, resolveMemorySearchConfig, @@ -53,6 +55,7 @@ const EMBEDDING_CACHE_TABLE = "embedding_cache"; const BATCH_FAILURE_LIMIT = 2; const MEMORY_INDEX_MANAGER_CACHE_KEY = Symbol.for("openclaw.memoryIndexManagerCache"); +const log = createSubsystemLogger("memory"); const { cache: INDEX_CACHE, pending: INDEX_CACHE_PENDING } = resolveSingletonManagedCache(MEMORY_INDEX_MANAGER_CACHE_KEY); @@ -542,7 +545,19 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem } private enqueueTargetedSessionSync(sessionFiles?: string[]): Promise { - return enqueueMemoryTargetedSessionSync(this, sessionFiles); + return enqueueMemoryTargetedSessionSync( + { + isClosed: () => this.closed, + getSyncing: () => this.syncing, + getQueuedSessionFiles: () => this.queuedSessionFiles, + getQueuedSessionSync: () => this.queuedSessionSync, + setQueuedSessionSync: (value) => { + this.queuedSessionSync = value; + }, + sync: async (params) => await this.sync(params), + }, + sessionFiles, + ); } private isReadonlyDbError(err: unknown): boolean { @@ -650,7 +665,16 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem status(): MemoryProviderStatus { const sourceFilter = this.buildSourceFilter(); const aggregateState = collectMemoryStatusAggregate({ - db: this.db, + db: { + prepare: (sql) => ({ + all: (...args) => + this.db.prepare(sql).all(...args) as Array<{ + kind: "files" | "chunks"; + source: MemorySource; + c: number; + }>, + }), + }, sources: this.sources, sourceFilterSql: sourceFilter.sql, sourceFilterParams: sourceFilter.params,