mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-27 09:02:15 +00:00
perf(memory): builtin sqlite hot-path follow-ups (#53939)
* chore(perf): start builtin sqlite hotpath workstream
* perf(memory): reuse sqlite statements during sync
* perf(memory): snapshot file state during sync
* perf(memory): consolidate status sqlite reads
* docs(changelog): note builtin sqlite perf work
* perf(memory): avoid session table scans on targeted sync
This commit is contained in:
@@ -554,6 +554,7 @@ describe("memory index", () => {
|
||||
db: {
|
||||
prepare: (sql: string) => {
|
||||
get: (path: string, source: string) => { hash: string } | undefined;
|
||||
all?: (...args: unknown[]) => unknown;
|
||||
};
|
||||
};
|
||||
}
|
||||
@@ -587,13 +588,41 @@ describe("memory index", () => {
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
const originalPrepare = db.prepare.bind(db);
|
||||
let bulkSessionStateAllCalls = 0;
|
||||
let perFileSessionHashPrepareCalls = 0;
|
||||
db.prepare = ((sql: string) => {
|
||||
const statement = originalPrepare(sql);
|
||||
if (sql === `SELECT path, hash FROM files WHERE source = ?`) {
|
||||
if (!statement.all) {
|
||||
throw new Error("expected sqlite statement.all for bulk session state query");
|
||||
}
|
||||
const bulkAll = statement.all.bind(statement);
|
||||
return {
|
||||
...statement,
|
||||
all: (...args: unknown[]) => {
|
||||
bulkSessionStateAllCalls += 1;
|
||||
return bulkAll(...args);
|
||||
},
|
||||
};
|
||||
}
|
||||
if (sql === `SELECT hash FROM files WHERE path = ? AND source = ?`) {
|
||||
perFileSessionHashPrepareCalls += 1;
|
||||
}
|
||||
return statement;
|
||||
}) as typeof db.prepare;
|
||||
|
||||
await manager.sync?.({
|
||||
reason: "post-compaction",
|
||||
sessionFiles: [firstSessionPath],
|
||||
});
|
||||
|
||||
db.prepare = originalPrepare;
|
||||
|
||||
expect(getSessionHash("sessions/targeted-first.jsonl")).not.toBe(firstOriginalHash);
|
||||
expect(getSessionHash("sessions/targeted-second.jsonl")).toBe(secondOriginalHash);
|
||||
expect(bulkSessionStateAllCalls).toBe(0);
|
||||
expect(perFileSessionHashPrepareCalls).toBeGreaterThan(0);
|
||||
await manager.close?.();
|
||||
} finally {
|
||||
if (previousStateDir === undefined) {
|
||||
@@ -990,6 +1019,121 @@ describe("memory index", () => {
|
||||
await manager.close?.();
|
||||
});
|
||||
|
||||
it("snapshots builtin file hashes with a single sqlite query per sync", async () => {
  // Statement texts tracked by this test: the bulk snapshot must be prepared
  // exactly once, the per-file hash lookup must never be prepared.
  const bulkSnapshotSql = "SELECT path, hash FROM files WHERE source = ?";
  const perFileLookupSql = "SELECT hash FROM files WHERE path = ? AND source = ?";

  await fs.writeFile(path.join(memoryDir, "2026-01-13.md"), "beta line\n");
  const storePath = path.join(workspaceDir, `index-prepare-reuse-${randomUUID()}.sqlite`);
  const manager = requireManager(
    await getMemorySearchManager({
      cfg: createCfg({ storePath, onSearch: false }),
      agentId: "main",
    }),
  );
  managersForCleanup.add(manager);

  // Reach into the manager's sqlite handle so prepare() calls can be observed.
  type PrepareHost = {
    prepare: (sql: string) => { get: (...args: unknown[]) => unknown };
  };
  const sqlite = (manager as unknown as { db: PrepareHost }).db;
  const realPrepare = sqlite.prepare.bind(sqlite);

  // Tally prepare() calls for the two tracked statements; every call is
  // still forwarded to the real implementation untouched.
  const prepareTally = new Map<string, number>([
    [bulkSnapshotSql, 0],
    [perFileLookupSql, 0],
  ]);
  sqlite.prepare = ((sql: string) => {
    const seenSoFar = prepareTally.get(sql);
    if (seenSoFar !== undefined) {
      prepareTally.set(sql, seenSoFar + 1);
    }
    return realPrepare(sql);
  }) as typeof sqlite.prepare;

  try {
    await manager.sync({ reason: "test" });
  } finally {
    // Always remove the spy, even when sync rejects.
    sqlite.prepare = realPrepare;
  }

  expect(prepareTally.get(bulkSnapshotSql)).toBe(1);
  expect(prepareTally.get(perFileLookupSql)).toBe(0);
});
|
||||
|
||||
it("uses a single sqlite aggregation query for status counts", async () => {
  // Verifies that manager.status() prepares the combined
  // files/chunks UNION ALL aggregation exactly once and none of the four
  // legacy per-table COUNT(*) statements.
  const cfg = createCfg({
    storePath: path.join(workspaceDir, `index-status-aggregate-${randomUUID()}.sqlite`),
    sources: ["memory", "sessions"],
    sessionMemory: true,
    onSearch: false,
  });

  await fs.writeFile(path.join(memoryDir, "2026-01-13.md"), "beta line\n");

  const stateDir = path.join(fixtureRoot, `state-status-${randomUUID()}`);
  vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);

  let aggregatePrepareCalls = 0;
  let legacyCountPrepareCalls = 0;

  // Everything after stubEnv runs inside this try so the env stub is removed
  // even when fixture setup, manager creation, or the initial sync throws.
  try {
    const sessionDir = path.join(stateDir, "agents", "main", "sessions");
    await fs.mkdir(sessionDir, { recursive: true });
    await fs.writeFile(
      path.join(sessionDir, "status.jsonl"),
      JSON.stringify({
        type: "message",
        message: { role: "user", content: [{ type: "text", text: "session status line" }] },
      }) + "\n",
    );

    const result = await getMemorySearchManager({ cfg, agentId: "main" });
    const manager = requireManager(result);
    managersForCleanup.add(manager);
    await manager.sync({ reason: "test" });

    const db = (
      manager as unknown as {
        db: {
          prepare: (sql: string) => { all: (...args: unknown[]) => unknown };
        };
      }
    ).db;
    const originalPrepare = db.prepare.bind(db);
    // Spy installed only after the sync above, so only status() is measured.
    db.prepare = ((sql: string) => {
      if (
        sql.includes(`SELECT 'files' AS kind, source, COUNT(*) as c FROM files`) &&
        sql.includes(`UNION ALL`)
      ) {
        aggregatePrepareCalls += 1;
      }
      if (
        sql === `SELECT COUNT(*) as c FROM files WHERE 1=1` ||
        sql === `SELECT COUNT(*) as c FROM chunks WHERE 1=1` ||
        sql === `SELECT source, COUNT(*) as c FROM files WHERE 1=1 GROUP BY source` ||
        sql === `SELECT source, COUNT(*) as c FROM chunks WHERE 1=1 GROUP BY source`
      ) {
        legacyCountPrepareCalls += 1;
      }
      return originalPrepare(sql);
    }) as typeof db.prepare;

    try {
      const status = manager.status();
      expect(status.files).toBeGreaterThan(0);
      expect(status.chunks).toBeGreaterThan(0);
      expect(
        status.sourceCounts?.find((entry) => entry.source === "memory")?.files,
      ).toBeGreaterThan(0);
      expect(
        status.sourceCounts?.find((entry) => entry.source === "sessions")?.files,
      ).toBeGreaterThan(0);
    } finally {
      // Remove the spy before the env stub is cleared, matching the original order.
      db.prepare = originalPrepare;
    }
  } finally {
    vi.unstubAllEnvs();
  }

  expect(aggregatePrepareCalls).toBe(1);
  expect(legacyCountPrepareCalls).toBe(0);
});
|
||||
|
||||
it("reindexes when Gemini outputDimensionality changes", async () => {
|
||||
const base = createCfg({
|
||||
storePath: indexModelPath,
|
||||
|
||||
@@ -705,6 +705,23 @@ export abstract class MemoryManagerSyncOps {
|
||||
log.debug("Skipping memory file sync in FTS-only mode (no embedding provider)");
|
||||
return;
|
||||
}
|
||||
const selectSourceFileState = this.db.prepare(`SELECT path, hash FROM files WHERE source = ?`);
|
||||
const deleteFileByPathAndSource = this.db.prepare(
|
||||
`DELETE FROM files WHERE path = ? AND source = ?`,
|
||||
);
|
||||
const deleteChunksByPathAndSource = this.db.prepare(
|
||||
`DELETE FROM chunks WHERE path = ? AND source = ?`,
|
||||
);
|
||||
const deleteVectorRowsByPathAndSource =
|
||||
this.vector.enabled && this.vector.available
|
||||
? this.db.prepare(
|
||||
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
|
||||
)
|
||||
: null;
|
||||
const deleteFtsRowsByPathSourceAndModel =
|
||||
this.fts.enabled && this.fts.available
|
||||
? this.db.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
|
||||
: null;
|
||||
|
||||
const files = await listMemoryFiles(
|
||||
this.workspaceDir,
|
||||
@@ -726,6 +743,11 @@ export abstract class MemoryManagerSyncOps {
|
||||
batch: this.batch.enabled,
|
||||
concurrency: this.getIndexConcurrency(),
|
||||
});
|
||||
const existingRows = selectSourceFileState.all("memory") as Array<{
|
||||
path: string;
|
||||
hash: string;
|
||||
}>;
|
||||
const existingHashes = new Map(existingRows.map((row) => [row.path, row.hash]));
|
||||
const activePaths = new Set(fileEntries.map((entry) => entry.path));
|
||||
if (params.progress) {
|
||||
params.progress.total += fileEntries.length;
|
||||
@@ -737,10 +759,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
|
||||
const tasks = fileEntries.map((entry) => async () => {
|
||||
const record = this.db
|
||||
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
|
||||
.get(entry.path, "memory") as { hash: string } | undefined;
|
||||
if (!params.needsFullReindex && record?.hash === entry.hash) {
|
||||
if (!params.needsFullReindex && existingHashes.get(entry.path) === entry.hash) {
|
||||
if (params.progress) {
|
||||
params.progress.completed += 1;
|
||||
params.progress.report({
|
||||
@@ -761,27 +780,20 @@ export abstract class MemoryManagerSyncOps {
|
||||
});
|
||||
await runWithConcurrency(tasks, this.getIndexConcurrency());
|
||||
|
||||
const staleRows = this.db
|
||||
.prepare(`SELECT path FROM files WHERE source = ?`)
|
||||
.all("memory") as Array<{ path: string }>;
|
||||
for (const stale of staleRows) {
|
||||
for (const stale of existingRows) {
|
||||
if (activePaths.has(stale.path)) {
|
||||
continue;
|
||||
}
|
||||
this.db.prepare(`DELETE FROM files WHERE path = ? AND source = ?`).run(stale.path, "memory");
|
||||
try {
|
||||
this.db
|
||||
.prepare(
|
||||
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
|
||||
)
|
||||
.run(stale.path, "memory");
|
||||
} catch {}
|
||||
this.db.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`).run(stale.path, "memory");
|
||||
if (this.fts.enabled && this.fts.available) {
|
||||
deleteFileByPathAndSource.run(stale.path, "memory");
|
||||
if (deleteVectorRowsByPathAndSource) {
|
||||
try {
|
||||
this.db
|
||||
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
|
||||
.run(stale.path, "memory", this.provider.model);
|
||||
deleteVectorRowsByPathAndSource.run(stale.path, "memory");
|
||||
} catch {}
|
||||
}
|
||||
deleteChunksByPathAndSource.run(stale.path, "memory");
|
||||
if (deleteFtsRowsByPathSourceAndModel) {
|
||||
try {
|
||||
deleteFtsRowsByPathSourceAndModel.run(stale.path, "memory", this.provider.model);
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
@@ -797,6 +809,24 @@ export abstract class MemoryManagerSyncOps {
|
||||
log.debug("Skipping session file sync in FTS-only mode (no embedding provider)");
|
||||
return;
|
||||
}
|
||||
const selectFileHash = this.db.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`);
|
||||
const selectSourceFileState = this.db.prepare(`SELECT path, hash FROM files WHERE source = ?`);
|
||||
const deleteFileByPathAndSource = this.db.prepare(
|
||||
`DELETE FROM files WHERE path = ? AND source = ?`,
|
||||
);
|
||||
const deleteChunksByPathAndSource = this.db.prepare(
|
||||
`DELETE FROM chunks WHERE path = ? AND source = ?`,
|
||||
);
|
||||
const deleteVectorRowsByPathAndSource =
|
||||
this.vector.enabled && this.vector.available
|
||||
? this.db.prepare(
|
||||
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
|
||||
)
|
||||
: null;
|
||||
const deleteFtsRowsByPathSourceAndModel =
|
||||
this.fts.enabled && this.fts.available
|
||||
? this.db.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
|
||||
: null;
|
||||
|
||||
const targetSessionFiles = params.needsFullReindex
|
||||
? null
|
||||
@@ -807,6 +837,15 @@ export abstract class MemoryManagerSyncOps {
|
||||
const activePaths = targetSessionFiles
|
||||
? null
|
||||
: new Set(files.map((file) => sessionPathForFile(file)));
|
||||
const existingRows =
|
||||
activePaths === null
|
||||
? null
|
||||
: (selectSourceFileState.all("sessions") as Array<{
|
||||
path: string;
|
||||
hash: string;
|
||||
}>);
|
||||
const existingHashes =
|
||||
existingRows === null ? null : new Map(existingRows.map((row) => [row.path, row.hash]));
|
||||
const indexAll =
|
||||
params.needsFullReindex || Boolean(targetSessionFiles) || this.sessionsDirtyFiles.size === 0;
|
||||
log.debug("memory sync: indexing session files", {
|
||||
@@ -848,10 +887,16 @@ export abstract class MemoryManagerSyncOps {
|
||||
}
|
||||
return;
|
||||
}
|
||||
const record = this.db
|
||||
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
|
||||
.get(entry.path, "sessions") as { hash: string } | undefined;
|
||||
if (!params.needsFullReindex && record?.hash === entry.hash) {
|
||||
const existingHash =
|
||||
existingHashes?.get(entry.path) ??
|
||||
(
|
||||
selectFileHash.get(entry.path, "sessions") as
|
||||
| {
|
||||
hash: string;
|
||||
}
|
||||
| undefined
|
||||
)?.hash;
|
||||
if (!params.needsFullReindex && existingHash === entry.hash) {
|
||||
if (params.progress) {
|
||||
params.progress.completed += 1;
|
||||
params.progress.report({
|
||||
@@ -880,31 +925,20 @@ export abstract class MemoryManagerSyncOps {
|
||||
return;
|
||||
}
|
||||
|
||||
const staleRows = this.db
|
||||
.prepare(`SELECT path FROM files WHERE source = ?`)
|
||||
.all("sessions") as Array<{ path: string }>;
|
||||
for (const stale of staleRows) {
|
||||
for (const stale of existingRows ?? []) {
|
||||
if (activePaths.has(stale.path)) {
|
||||
continue;
|
||||
}
|
||||
this.db
|
||||
.prepare(`DELETE FROM files WHERE path = ? AND source = ?`)
|
||||
.run(stale.path, "sessions");
|
||||
try {
|
||||
this.db
|
||||
.prepare(
|
||||
`DELETE FROM ${VECTOR_TABLE} WHERE id IN (SELECT id FROM chunks WHERE path = ? AND source = ?)`,
|
||||
)
|
||||
.run(stale.path, "sessions");
|
||||
} catch {}
|
||||
this.db
|
||||
.prepare(`DELETE FROM chunks WHERE path = ? AND source = ?`)
|
||||
.run(stale.path, "sessions");
|
||||
if (this.fts.enabled && this.fts.available) {
|
||||
deleteFileByPathAndSource.run(stale.path, "sessions");
|
||||
if (deleteVectorRowsByPathAndSource) {
|
||||
try {
|
||||
this.db
|
||||
.prepare(`DELETE FROM ${FTS_TABLE} WHERE path = ? AND source = ? AND model = ?`)
|
||||
.run(stale.path, "sessions", this.provider.model);
|
||||
deleteVectorRowsByPathAndSource.run(stale.path, "sessions");
|
||||
} catch {}
|
||||
}
|
||||
deleteChunksByPathAndSource.run(stale.path, "sessions");
|
||||
if (deleteFtsRowsByPathSourceAndModel) {
|
||||
try {
|
||||
deleteFtsRowsByPathSourceAndModel.run(stale.path, "sessions", this.provider.model);
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -694,46 +694,42 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
|
||||
status(): MemoryProviderStatus {
|
||||
const sourceFilter = this.buildSourceFilter();
|
||||
const files = this.db
|
||||
.prepare(`SELECT COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql}`)
|
||||
.get(...sourceFilter.params) as {
|
||||
const aggregateRows = this.db
|
||||
.prepare(
|
||||
`SELECT 'files' AS kind, source, COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql} GROUP BY source\n` +
|
||||
`UNION ALL\n` +
|
||||
`SELECT 'chunks' AS kind, source, COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql} GROUP BY source`,
|
||||
)
|
||||
.all(...sourceFilter.params, ...sourceFilter.params) as Array<{
|
||||
kind: "files" | "chunks";
|
||||
source: MemorySource;
|
||||
c: number;
|
||||
};
|
||||
const chunks = this.db
|
||||
.prepare(`SELECT COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql}`)
|
||||
.get(...sourceFilter.params) as {
|
||||
c: number;
|
||||
};
|
||||
const sourceCounts = (() => {
|
||||
}>;
|
||||
const aggregateState = (() => {
|
||||
const sources = Array.from(this.sources);
|
||||
if (sources.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const bySource = new Map<MemorySource, { files: number; chunks: number }>();
|
||||
for (const source of sources) {
|
||||
bySource.set(source, { files: 0, chunks: 0 });
|
||||
}
|
||||
const fileRows = this.db
|
||||
.prepare(
|
||||
`SELECT source, COUNT(*) as c FROM files WHERE 1=1${sourceFilter.sql} GROUP BY source`,
|
||||
)
|
||||
.all(...sourceFilter.params) as Array<{ source: MemorySource; c: number }>;
|
||||
for (const row of fileRows) {
|
||||
let files = 0;
|
||||
let chunks = 0;
|
||||
for (const row of aggregateRows) {
|
||||
const count = row.c ?? 0;
|
||||
const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 };
|
||||
entry.files = row.c ?? 0;
|
||||
if (row.kind === "files") {
|
||||
entry.files = count;
|
||||
files += count;
|
||||
} else {
|
||||
entry.chunks = count;
|
||||
chunks += count;
|
||||
}
|
||||
bySource.set(row.source, entry);
|
||||
}
|
||||
const chunkRows = this.db
|
||||
.prepare(
|
||||
`SELECT source, COUNT(*) as c FROM chunks WHERE 1=1${sourceFilter.sql} GROUP BY source`,
|
||||
)
|
||||
.all(...sourceFilter.params) as Array<{ source: MemorySource; c: number }>;
|
||||
for (const row of chunkRows) {
|
||||
const entry = bySource.get(row.source) ?? { files: 0, chunks: 0 };
|
||||
entry.chunks = row.c ?? 0;
|
||||
bySource.set(row.source, entry);
|
||||
}
|
||||
return sources.map((source) => Object.assign({ source }, bySource.get(source)!));
|
||||
return {
|
||||
files,
|
||||
chunks,
|
||||
sourceCounts: sources.map((source) => Object.assign({ source }, bySource.get(source)!)),
|
||||
};
|
||||
})();
|
||||
|
||||
const searchMode = this.provider || !this.providerInitialized ? "hybrid" : "fts-only";
|
||||
@@ -745,8 +741,8 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
|
||||
return {
|
||||
backend: "builtin",
|
||||
files: files?.c ?? 0,
|
||||
chunks: chunks?.c ?? 0,
|
||||
files: aggregateState.files,
|
||||
chunks: aggregateState.chunks,
|
||||
dirty: this.dirty || this.sessionsDirty,
|
||||
workspaceDir: this.workspaceDir,
|
||||
dbPath: this.settings.store.path,
|
||||
@@ -755,7 +751,7 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
requestedProvider: this.requestedProvider,
|
||||
sources: Array.from(this.sources),
|
||||
extraPaths: this.settings.extraPaths,
|
||||
sourceCounts,
|
||||
sourceCounts: aggregateState.sourceCounts,
|
||||
cache: this.cache.enabled
|
||||
? {
|
||||
enabled: true,
|
||||
|
||||
Reference in New Issue
Block a user