mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 16:01:01 +00:00
fix(memory-core): stream embedding cache seed during reindex
- stream safe-reindex embedding-cache seeding with SQLite iterate() - avoid no-op empty-cache transactions and keep regression coverage explicit - supersedes #73067 Thanks @parkertoddbrooks.
This commit is contained in:
committed by
GitHub
parent
2057713af5
commit
983fd775e2
@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Plugin SDK: fall back from partial bundled plugin directory overrides to package source public surfaces while preserving `OPENCLAW_DISABLE_BUNDLED_PLUGINS` as a hard disable. (#72817) Thanks @serkonyc.
|
||||
- Agents/ACPX: stop forwarding Codex ACP timeout config controls that Codex rejects while preserving OpenClaw's run-timeout watchdog for ACP subagents. Fixes #73052. Thanks @pfrederiksen and @richa65.
|
||||
- Memory Core: stream fallback vector search scoring with a bounded top-K result set so large indexes do not materialize every chunk embedding when sqlite-vec is unavailable. (#73069) Thanks @parkertoddbrooks.
|
||||
- Memory Core: stream embedding-cache seeding during safe reindex so large local caches do not materialize every row into the V8 heap before the atomic rebuild. (#73067) Thanks @parkertoddbrooks.
|
||||
- Memory/Ollama: add `memorySearch.remote.nonBatchConcurrency` for inline embedding indexing, default Ollama non-batch indexing to one request at a time, and keep batch concurrency separate from non-batch concurrency so local embedding backfills avoid timeout storms on smaller hosts. Carries forward #57733. Thanks @itilys.
|
||||
- macOS app: update Peekaboo, ElevenLabsKit, and MLX TTS helper dependencies, make canvas file watching and config/exec-approval state writes reliable under concurrent app/test activity, and keep the app plus helper builds warning-free. Thanks @Blaizzy.
|
||||
- iOS app: refresh SwiftPM/XcodeGen source hygiene, make app, extension, watch, and curated shared Swift files pass the prebuild SwiftFormat and SwiftLint checks, move relay registration off deprecated StoreKit receipt APIs, and keep simulator builds and logic tests warning-free. Thanks @ngutman.
|
||||
|
||||
@@ -445,6 +445,72 @@ describe("memory index", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("streams embedding cache rows during safe reindex", async () => {
|
||||
vi.stubEnv("OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX", "0");
|
||||
type EmbeddingCacheRow = {
|
||||
provider: string;
|
||||
model: string;
|
||||
provider_key: string;
|
||||
hash: string;
|
||||
embedding: string;
|
||||
dims: number | null;
|
||||
updated_at: number;
|
||||
};
|
||||
type StatementWithAll = {
|
||||
all: () => EmbeddingCacheRow[];
|
||||
};
|
||||
|
||||
const cfg = createCfg({
|
||||
storePath: path.join(workspaceDir, "index-cache-seed-stream.sqlite"),
|
||||
cacheEnabled: true,
|
||||
});
|
||||
const manager = await getPersistentManager(cfg);
|
||||
await manager.sync({ reason: "test" });
|
||||
|
||||
// Safe reindex streams cache rows from the original database and writes
|
||||
// them into a temporary database, so the SELECT spy belongs on this handle.
|
||||
const sourceDb = (
|
||||
manager as unknown as {
|
||||
db: {
|
||||
prepare: (sql: string) => unknown;
|
||||
};
|
||||
}
|
||||
).db;
|
||||
const originalPrepare = sourceDb.prepare.bind(sourceDb);
|
||||
const cachedRows = (
|
||||
originalPrepare(
|
||||
"SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM embedding_cache",
|
||||
) as StatementWithAll
|
||||
).all();
|
||||
expect(cachedRows.length).toBeGreaterThan(0);
|
||||
|
||||
const beforeCalls = embedBatchCalls;
|
||||
const prepareSpy = vi.spyOn(sourceDb, "prepare").mockImplementation((sql: string) => {
|
||||
if (
|
||||
sql.includes(
|
||||
"SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM embedding_cache",
|
||||
)
|
||||
) {
|
||||
return {
|
||||
all: () => {
|
||||
throw new Error("embedding cache seed must stream rows via iterate()");
|
||||
},
|
||||
iterate: () => cachedRows[Symbol.iterator](),
|
||||
};
|
||||
}
|
||||
return originalPrepare(sql);
|
||||
});
|
||||
|
||||
try {
|
||||
(manager as unknown as { dirty: boolean }).dirty = true;
|
||||
await manager.sync({ reason: "test", force: true });
|
||||
} finally {
|
||||
prepareSpy.mockRestore();
|
||||
}
|
||||
|
||||
expect(embedBatchCalls).toBe(beforeCalls);
|
||||
});
|
||||
|
||||
it("builds FTS index and returns search results when no embedding provider is available", async () => {
|
||||
forceNoProvider = true;
|
||||
|
||||
|
||||
@@ -308,16 +308,17 @@ export abstract class MemoryManagerSyncOps {
|
||||
return openMemoryDatabaseAtPath(dbPath, this.settings.store.vector.enabled);
|
||||
}
|
||||
|
||||
private seedEmbeddingCache(sourceDb: DatabaseSync): void {
|
||||
private async seedEmbeddingCache(sourceDb: DatabaseSync): Promise<void> {
|
||||
if (!this.cache.enabled) {
|
||||
return;
|
||||
}
|
||||
let transactionStarted = false;
|
||||
try {
|
||||
const rows = sourceDb
|
||||
.prepare(
|
||||
`SELECT provider, model, provider_key, hash, embedding, dims, updated_at FROM ${EMBEDDING_CACHE_TABLE}`,
|
||||
)
|
||||
.all() as Array<{
|
||||
.iterate() as IterableIterator<{
|
||||
provider: string;
|
||||
model: string;
|
||||
provider_key: string;
|
||||
@@ -326,19 +327,23 @@ export abstract class MemoryManagerSyncOps {
|
||||
dims: number | null;
|
||||
updated_at: number;
|
||||
}>;
|
||||
if (!rows.length) {
|
||||
return;
|
||||
}
|
||||
const insert = this.db.prepare(
|
||||
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET
|
||||
embedding=excluded.embedding,
|
||||
dims=excluded.dims,
|
||||
updated_at=excluded.updated_at`,
|
||||
);
|
||||
this.db.exec("BEGIN");
|
||||
// Keep gateway health probes responsive while rebuilding large caches.
|
||||
const SEED_EMBEDDING_YIELD_EVERY = 1000;
|
||||
let rowCount = 0;
|
||||
let insert: ReturnType<DatabaseSync["prepare"]> | null = null;
|
||||
for (const row of rows) {
|
||||
if (!insert) {
|
||||
insert = this.db.prepare(
|
||||
`INSERT INTO ${EMBEDDING_CACHE_TABLE} (provider, model, provider_key, hash, embedding, dims, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(provider, model, provider_key, hash) DO UPDATE SET
|
||||
embedding=excluded.embedding,
|
||||
dims=excluded.dims,
|
||||
updated_at=excluded.updated_at`,
|
||||
);
|
||||
this.db.exec("BEGIN");
|
||||
transactionStarted = true;
|
||||
}
|
||||
insert.run(
|
||||
row.provider,
|
||||
row.model,
|
||||
@@ -348,12 +353,22 @@ export abstract class MemoryManagerSyncOps {
|
||||
row.dims,
|
||||
row.updated_at,
|
||||
);
|
||||
rowCount += 1;
|
||||
if (rowCount % SEED_EMBEDDING_YIELD_EVERY === 0) {
|
||||
await new Promise<void>((resolve) => {
|
||||
setImmediate(resolve);
|
||||
});
|
||||
}
|
||||
}
|
||||
if (transactionStarted) {
|
||||
this.db.exec("COMMIT");
|
||||
}
|
||||
this.db.exec("COMMIT");
|
||||
} catch (err) {
|
||||
try {
|
||||
this.db.exec("ROLLBACK");
|
||||
} catch {}
|
||||
if (transactionStarted) {
|
||||
try {
|
||||
this.db.exec("ROLLBACK");
|
||||
} catch {}
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
@@ -1167,7 +1182,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
targetPath: dbPath,
|
||||
tempPath: tempDbPath,
|
||||
build: async () => {
|
||||
this.seedEmbeddingCache(originalDb);
|
||||
await this.seedEmbeddingCache(originalDb);
|
||||
const shouldSyncMemory = this.sources.has("memory");
|
||||
const shouldSyncSessions = this.shouldSyncSessions(
|
||||
{ reason: params.reason, force: params.force },
|
||||
|
||||
Reference in New Issue
Block a user