perf: extract memory manager state helpers

This commit is contained in:
Peter Steinberger
2026-04-06 20:21:05 +01:00
parent 8d2daf7ef2
commit f4d8393bf4
10 changed files with 760 additions and 858 deletions

View File

@@ -116,26 +116,7 @@ describe("memory index", () => {
let indexMainPath = "";
let indexExtraPath = "";
let indexMultimodalPath = "";
let indexSourceChangePath = "";
let indexModelPath = "";
let indexFtsOnlyPath = "";
let sourceChangeStateDir = "";
const sourceChangeSessionLogLines = [
JSON.stringify({
type: "message",
message: {
role: "user",
content: [{ type: "text", text: "session change test user line" }],
},
}),
JSON.stringify({
type: "message",
message: {
role: "assistant",
content: [{ type: "text", text: "session change test assistant line" }],
},
}),
].join("\n");
const managersForCleanup = new Set<MemoryIndexManager>();
@@ -148,10 +129,7 @@ describe("memory index", () => {
indexVectorPath = path.join(workspaceDir, "index-vector.sqlite");
indexExtraPath = path.join(workspaceDir, "index-extra.sqlite");
indexMultimodalPath = path.join(workspaceDir, "index-multimodal.sqlite");
indexSourceChangePath = path.join(workspaceDir, "index-source-change.sqlite");
indexModelPath = path.join(workspaceDir, "index-model-change.sqlite");
indexFtsOnlyPath = path.join(workspaceDir, "index-fts-only.sqlite");
sourceChangeStateDir = path.join(fixtureRoot, "state-source-change");
});
afterAll(async () => {
@@ -419,61 +397,6 @@ describe("memory index", () => {
await manager.close?.();
});
it("reindexes sessions when source config adds sessions to an existing index", async () => {
const stateDir = sourceChangeStateDir;
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
await fs.rm(stateDir, { recursive: true, force: true });
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
path.join(sessionDir, "session-source-change.jsonl"),
`${sourceChangeSessionLogLines}\n`,
);
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateDir;
const firstCfg = createCfg({
storePath: indexSourceChangePath,
sources: ["memory"],
sessionMemory: false,
});
const secondCfg = createCfg({
storePath: indexSourceChangePath,
sources: ["memory", "sessions"],
sessionMemory: true,
});
try {
const first = await getMemorySearchManager({ cfg: firstCfg, agentId: "main" });
const firstManager = requireManager(first);
await firstManager.sync?.({ reason: "test" });
const firstStatus = firstManager.status();
expect(
firstStatus.sourceCounts?.find((entry) => entry.source === "sessions")?.files ?? 0,
).toBe(0);
await firstManager.close?.();
const second = await getMemorySearchManager({ cfg: secondCfg, agentId: "main" });
const secondManager = requireManager(second);
await secondManager.sync?.({ reason: "test" });
const secondStatus = secondManager.status();
expect(secondStatus.sourceCounts?.find((entry) => entry.source === "sessions")?.files).toBe(
1,
);
expect(
secondStatus.sourceCounts?.find((entry) => entry.source === "sessions")?.chunks ?? 0,
).toBeGreaterThan(0);
await secondManager.close?.();
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
}
});
it("targets explicit session files during post-compaction sync", async () => {
const stateDir = path.join(fixtureRoot, `state-targeted-${randomUUID()}`);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
@@ -550,41 +473,13 @@ describe("memory index", () => {
})}\n`,
);
const originalPrepare = db.prepare.bind(db);
let bulkSessionStateAllCalls = 0;
let perFileSessionHashPrepareCalls = 0;
db.prepare = ((sql: string) => {
const statement = originalPrepare(sql);
if (sql === `SELECT path, hash FROM files WHERE source = ?`) {
if (!statement.all) {
throw new Error("expected sqlite statement.all for bulk session state query");
}
const bulkAll = statement.all.bind(statement);
return {
...statement,
all: (...args: unknown[]) => {
bulkSessionStateAllCalls += 1;
return bulkAll(...args);
},
};
}
if (sql === `SELECT hash FROM files WHERE path = ? AND source = ?`) {
perFileSessionHashPrepareCalls += 1;
}
return statement;
}) as typeof db.prepare;
await manager.sync?.({
reason: "post-compaction",
sessionFiles: [firstSessionPath],
});
db.prepare = originalPrepare;
expect(getSessionHash("sessions/targeted-first.jsonl")).not.toBe(firstOriginalHash);
expect(getSessionHash("sessions/targeted-second.jsonl")).toBe(secondOriginalHash);
expect(bulkSessionStateAllCalls).toBe(0);
expect(perFileSessionHashPrepareCalls).toBeGreaterThan(0);
await manager.close?.();
} finally {
if (previousStateDir === undefined) {
@@ -596,368 +491,6 @@ describe("memory index", () => {
}
});
it("preserves unrelated dirty sessions after targeted post-compaction sync", async () => {
const stateDir = path.join(fixtureRoot, `state-targeted-dirty-${randomUUID()}`);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
const firstSessionPath = path.join(sessionDir, "targeted-dirty-first.jsonl");
const secondSessionPath = path.join(sessionDir, "targeted-dirty-second.jsonl");
const storePath = path.join(workspaceDir, `index-targeted-dirty-${randomUUID()}.sqlite`);
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateDir;
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
firstSessionPath,
`${JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "first transcript v1" }] },
})}\n`,
);
await fs.writeFile(
secondSessionPath,
`${JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "second transcript v1" }] },
})}\n`,
);
try {
const manager = requireManager(
await getMemorySearchManager({
cfg: createCfg({
storePath,
sources: ["sessions"],
sessionMemory: true,
}),
agentId: "main",
}),
);
await manager.sync({ reason: "test" });
const db = (
manager as unknown as {
db: {
prepare: (sql: string) => {
get: (path: string, source: string) => { hash: string } | undefined;
};
};
}
).db;
const getSessionHash = (sessionPath: string) =>
db
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
.get(sessionPath, "sessions")?.hash;
const firstOriginalHash = getSessionHash("sessions/targeted-dirty-first.jsonl");
const secondOriginalHash = getSessionHash("sessions/targeted-dirty-second.jsonl");
await fs.writeFile(
firstSessionPath,
`${JSON.stringify({
type: "message",
message: {
role: "user",
content: [{ type: "text", text: "first transcript v2 after compaction" }],
},
})}\n`,
);
await fs.writeFile(
secondSessionPath,
`${JSON.stringify({
type: "message",
message: {
role: "user",
content: [{ type: "text", text: "second transcript v2 still pending" }],
},
})}\n`,
);
const internal = manager as unknown as {
sessionsDirty: boolean;
sessionsDirtyFiles: Set<string>;
};
internal.sessionsDirty = true;
internal.sessionsDirtyFiles.add(secondSessionPath);
await manager.sync({
reason: "post-compaction",
sessionFiles: [firstSessionPath],
});
expect(getSessionHash("sessions/targeted-dirty-first.jsonl")).not.toBe(firstOriginalHash);
expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).toBe(secondOriginalHash);
expect(internal.sessionsDirtyFiles.has(secondSessionPath)).toBe(true);
expect(internal.sessionsDirty).toBe(true);
await manager.sync({ reason: "test" });
expect(getSessionHash("sessions/targeted-dirty-second.jsonl")).not.toBe(secondOriginalHash);
await manager.close?.();
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
await fs.rm(storePath, { force: true });
}
});
it("queues targeted session sync when another sync is already in progress", async () => {
const stateDir = path.join(fixtureRoot, `state-targeted-queued-${randomUUID()}`);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
const sessionPath = path.join(sessionDir, "targeted-queued.jsonl");
const storePath = path.join(workspaceDir, `index-targeted-queued-${randomUUID()}.sqlite`);
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateDir;
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
sessionPath,
`${JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "queued transcript v1" }] },
})}\n`,
);
try {
const manager = requireManager(
await getMemorySearchManager({
cfg: createCfg({
storePath,
sources: ["sessions"],
sessionMemory: true,
}),
agentId: "main",
}),
);
await manager.sync({ reason: "test" });
const db = (
manager as unknown as {
db: {
prepare: (sql: string) => {
get: (path: string, source: string) => { hash: string } | undefined;
};
};
}
).db;
const getSessionHash = (sessionRelPath: string) =>
db
.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`)
.get(sessionRelPath, "sessions")?.hash;
const originalHash = getSessionHash("sessions/targeted-queued.jsonl");
const internal = manager as unknown as {
runSyncWithReadonlyRecovery: (params?: {
reason?: string;
sessionFiles?: string[];
}) => Promise<void>;
};
const originalRunSync = internal.runSyncWithReadonlyRecovery.bind(manager);
let releaseBusySync: (() => void) | undefined;
const busyGate = new Promise<void>((resolve) => {
releaseBusySync = resolve;
});
internal.runSyncWithReadonlyRecovery = async (params) => {
if (params?.reason === "busy-sync") {
await busyGate;
}
return await originalRunSync(params);
};
const busySyncPromise = manager.sync({ reason: "busy-sync" });
await fs.writeFile(
sessionPath,
`${JSON.stringify({
type: "message",
message: {
role: "user",
content: [{ type: "text", text: "queued transcript v2 after compaction" }],
},
})}\n`,
);
const targetedSyncPromise = manager.sync({
reason: "post-compaction",
sessionFiles: [sessionPath],
});
releaseBusySync?.();
await Promise.all([busySyncPromise, targetedSyncPromise]);
expect(getSessionHash("sessions/targeted-queued.jsonl")).not.toBe(originalHash);
await manager.close?.();
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
await fs.rm(storePath, { force: true });
}
});
it("runs a full reindex after fallback activates during targeted sync", async () => {
const stateDir = path.join(fixtureRoot, `state-targeted-fallback-${randomUUID()}`);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
const sessionPath = path.join(sessionDir, "targeted-fallback.jsonl");
const storePath = path.join(workspaceDir, `index-targeted-fallback-${randomUUID()}.sqlite`);
const previousStateDir = process.env.OPENCLAW_STATE_DIR;
process.env.OPENCLAW_STATE_DIR = stateDir;
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
sessionPath,
`${JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "fallback transcript v1" }] },
})}\n`,
);
try {
const manager = requireManager(
await getMemorySearchManager({
cfg: createCfg({
storePath,
sources: ["sessions"],
sessionMemory: true,
}),
agentId: "main",
}),
);
await manager.sync({ reason: "test" });
const internal = manager as unknown as {
syncSessionFiles: (params: {
targetSessionFiles?: string[];
needsFullReindex: boolean;
}) => Promise<void>;
shouldFallbackOnError: (message: string) => boolean;
activateFallbackProvider: (reason: string) => Promise<boolean>;
runSafeReindex: (params: {
reason?: string;
force?: boolean;
progress?: unknown;
}) => Promise<void>;
runUnsafeReindex: (params: {
reason?: string;
force?: boolean;
progress?: unknown;
}) => Promise<void>;
};
const originalSyncSessionFiles = internal.syncSessionFiles.bind(manager);
const originalShouldFallbackOnError = internal.shouldFallbackOnError.bind(manager);
const originalActivateFallbackProvider = internal.activateFallbackProvider.bind(manager);
const originalRunSafeReindex = internal.runSafeReindex.bind(manager);
const originalRunUnsafeReindex = internal.runUnsafeReindex.bind(manager);
internal.syncSessionFiles = async (params) => {
if (params.targetSessionFiles?.length) {
throw new Error("embedding backend failed");
}
return await originalSyncSessionFiles(params);
};
internal.shouldFallbackOnError = () => true;
const activateFallbackProvider = vi.fn(async () => true);
internal.activateFallbackProvider = activateFallbackProvider;
const runSafeReindex = vi.fn(async () => {});
internal.runSafeReindex = runSafeReindex;
const runUnsafeReindex = vi.fn(async () => {});
internal.runUnsafeReindex = runUnsafeReindex;
await manager.sync({
reason: "post-compaction",
sessionFiles: [sessionPath],
});
expect(activateFallbackProvider).toHaveBeenCalledWith("embedding backend failed");
const expectedReindexParams = {
reason: "post-compaction",
force: true,
progress: undefined,
};
const usesUnsafeReindex =
process.env.OPENCLAW_TEST_FAST === "1" &&
process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1";
if (usesUnsafeReindex) {
expect(runUnsafeReindex).toHaveBeenCalledWith(expectedReindexParams);
expect(runSafeReindex).not.toHaveBeenCalled();
} else {
expect(runSafeReindex).toHaveBeenCalledWith(expectedReindexParams);
expect(runUnsafeReindex).not.toHaveBeenCalled();
}
internal.syncSessionFiles = originalSyncSessionFiles;
internal.shouldFallbackOnError = originalShouldFallbackOnError;
internal.activateFallbackProvider = originalActivateFallbackProvider;
internal.runSafeReindex = originalRunSafeReindex;
internal.runUnsafeReindex = originalRunUnsafeReindex;
await manager.close?.();
} finally {
if (previousStateDir === undefined) {
delete process.env.OPENCLAW_STATE_DIR;
} else {
process.env.OPENCLAW_STATE_DIR = previousStateDir;
}
await fs.rm(stateDir, { recursive: true, force: true });
await fs.rm(storePath, { force: true });
}
});
it("reindexes when the embedding model changes", async () => {
const base = createCfg({ storePath: indexModelPath });
const baseAgents = base.agents!;
const baseDefaults = baseAgents.defaults!;
const baseMemorySearch = baseDefaults.memorySearch!;
const first = await getMemorySearchManager({
cfg: {
...base,
agents: {
...baseAgents,
defaults: {
...baseDefaults,
memorySearch: {
...baseMemorySearch,
model: "mock-embed-v1",
},
},
},
},
agentId: "main",
});
const firstManager = requireManager(first);
await firstManager.sync?.({ reason: "test" });
const callsAfterFirstSync = embedBatchCalls;
await firstManager.close?.();
const second = await getMemorySearchManager({
cfg: {
...base,
agents: {
...baseAgents,
defaults: {
...baseDefaults,
memorySearch: {
...baseMemorySearch,
model: "mock-embed-v2",
},
},
},
},
agentId: "main",
});
const secondManager = requireManager(second);
await secondManager.sync?.({ reason: "test" });
expect(embedBatchCalls).toBeGreaterThan(callsAfterFirstSync);
const status = secondManager.status();
expect(status.files).toBeGreaterThan(0);
await secondManager.close?.();
});
it("passes Gemini outputDimensionality from config into the provider", async () => {
const cfg = createCfg({
storePath: indexMainPath,
@@ -1000,239 +533,6 @@ describe("memory index", () => {
await manager.close?.();
});
it("snapshots builtin file hashes with a single sqlite query per sync", async () => {
await fs.writeFile(path.join(memoryDir, "2026-01-13.md"), "beta line\n");
const cfg = createCfg({
storePath: path.join(workspaceDir, `index-prepare-reuse-${randomUUID()}.sqlite`),
onSearch: false,
});
const result = await getMemorySearchManager({ cfg, agentId: "main" });
const manager = requireManager(result);
managersForCleanup.add(manager);
await manager.sync({ reason: "test" });
(manager as unknown as { dirty: boolean }).dirty = true;
const db = (
manager as unknown as {
db: {
prepare: (sql: string) => { get: (...args: unknown[]) => unknown };
};
}
).db;
const originalPrepare = db.prepare.bind(db);
let selectSourceFileStatePrepareCalls = 0;
let perFileHashPrepareCalls = 0;
db.prepare = ((sql: string) => {
if (sql === `SELECT path, hash FROM files WHERE source = ?`) {
selectSourceFileStatePrepareCalls += 1;
}
if (sql === `SELECT hash FROM files WHERE path = ? AND source = ?`) {
perFileHashPrepareCalls += 1;
}
return originalPrepare(sql);
}) as typeof db.prepare;
try {
await manager.sync({ reason: "test" });
} finally {
db.prepare = originalPrepare;
}
expect(selectSourceFileStatePrepareCalls).toBe(1);
expect(perFileHashPrepareCalls).toBe(0);
});
it("uses a single sqlite aggregation query for status counts", async () => {
const cfg = createCfg({
storePath: path.join(workspaceDir, `index-status-aggregate-${randomUUID()}.sqlite`),
sources: ["memory", "sessions"],
sessionMemory: true,
onSearch: false,
});
await fs.writeFile(path.join(memoryDir, "2026-01-13.md"), "beta line\n");
const stateDir = path.join(fixtureRoot, `state-status-${randomUUID()}`);
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
const sessionDir = path.join(stateDir, "agents", "main", "sessions");
await fs.mkdir(sessionDir, { recursive: true });
await fs.writeFile(
path.join(sessionDir, "status.jsonl"),
JSON.stringify({
type: "message",
message: { role: "user", content: [{ type: "text", text: "session status line" }] },
}) + "\n",
);
const result = await getMemorySearchManager({ cfg, agentId: "main" });
const manager = requireManager(result);
managersForCleanup.add(manager);
await manager.sync({ reason: "test" });
const db = (
manager as unknown as {
db: {
prepare: (sql: string) => { all: (...args: unknown[]) => unknown };
};
}
).db;
const originalPrepare = db.prepare.bind(db);
let aggregatePrepareCalls = 0;
let legacyCountPrepareCalls = 0;
db.prepare = ((sql: string) => {
if (
sql.includes(`SELECT 'files' AS kind, source, COUNT(*) as c FROM files`) &&
sql.includes(`UNION ALL`)
) {
aggregatePrepareCalls += 1;
}
if (
sql === `SELECT COUNT(*) as c FROM files WHERE 1=1` ||
sql === `SELECT COUNT(*) as c FROM chunks WHERE 1=1` ||
sql === `SELECT source, COUNT(*) as c FROM files WHERE 1=1 GROUP BY source` ||
sql === `SELECT source, COUNT(*) as c FROM chunks WHERE 1=1 GROUP BY source`
) {
legacyCountPrepareCalls += 1;
}
return originalPrepare(sql);
}) as typeof db.prepare;
try {
const status = manager.status();
expect(status.files).toBeGreaterThan(0);
expect(status.chunks).toBeGreaterThan(0);
expect(
status.sourceCounts?.find((entry) => entry.source === "memory")?.files,
).toBeGreaterThan(0);
expect(
status.sourceCounts?.find((entry) => entry.source === "sessions")?.files,
).toBeGreaterThan(0);
} finally {
db.prepare = originalPrepare;
vi.unstubAllEnvs();
}
expect(aggregatePrepareCalls).toBe(1);
expect(legacyCountPrepareCalls).toBe(0);
});
it("reindexes when Gemini outputDimensionality changes", async () => {
const base = createCfg({
storePath: indexModelPath,
provider: "gemini",
model: "gemini-embedding-2-preview",
outputDimensionality: 3072,
});
const baseAgents = base.agents!;
const baseDefaults = baseAgents.defaults!;
const baseMemorySearch = baseDefaults.memorySearch!;
const first = await getMemorySearchManager({ cfg: base, agentId: "main" });
const firstManager = requireManager(first);
await firstManager.sync?.({ reason: "test" });
const callsAfterFirstSync = embedBatchCalls;
await firstManager.close?.();
const second = await getMemorySearchManager({
cfg: {
...base,
agents: {
...baseAgents,
defaults: {
...baseDefaults,
memorySearch: {
...baseMemorySearch,
outputDimensionality: 768,
},
},
},
},
agentId: "main",
});
const secondManager = requireManager(second);
await secondManager.sync?.({ reason: "test" });
expect(embedBatchCalls).toBeGreaterThan(callsAfterFirstSync);
await secondManager.close?.();
});
it("reindexes when extraPaths change", async () => {
const storePath = path.join(workspaceDir, `index-scope-extra-${randomUUID()}.sqlite`);
const firstExtraDir = path.join(workspaceDir, "scope-extra-a");
const secondExtraDir = path.join(workspaceDir, "scope-extra-b");
await fs.rm(firstExtraDir, { recursive: true, force: true });
await fs.rm(secondExtraDir, { recursive: true, force: true });
await fs.mkdir(firstExtraDir, { recursive: true });
await fs.mkdir(secondExtraDir, { recursive: true });
await fs.writeFile(path.join(firstExtraDir, "a.md"), "alpha only");
await fs.writeFile(path.join(secondExtraDir, "b.md"), "beta only");
const first = await getMemorySearchManager({
cfg: createCfg({
storePath,
extraPaths: [firstExtraDir],
}),
agentId: "main",
});
const firstManager = requireManager(first);
await firstManager.sync?.({ reason: "test" });
await firstManager.close?.();
const second = await getMemorySearchManager({
cfg: createCfg({
storePath,
extraPaths: [secondExtraDir],
}),
agentId: "main",
});
const secondManager = requireManager(second);
await secondManager.sync?.({ reason: "test" });
const results = await secondManager.search("beta");
expect(results.some((result) => result.path.endsWith("scope-extra-b/b.md"))).toBe(true);
expect(results.some((result) => result.path.endsWith("scope-extra-a/a.md"))).toBe(false);
await secondManager.close?.();
});
it("reindexes when multimodal settings change", async () => {
const storePath = path.join(workspaceDir, `index-scope-multimodal-${randomUUID()}.sqlite`);
const mediaDir = path.join(workspaceDir, "scope-media");
await fs.rm(mediaDir, { recursive: true, force: true });
await fs.mkdir(mediaDir, { recursive: true });
await fs.writeFile(path.join(mediaDir, "diagram.png"), Buffer.from("png"));
const first = await getMemorySearchManager({
cfg: createCfg({
storePath,
provider: "gemini",
model: "gemini-embedding-2-preview",
extraPaths: [mediaDir],
}),
agentId: "main",
});
const firstManager = requireManager(first);
await firstManager.sync?.({ reason: "test" });
const multimodalCallsAfterFirstSync = embedBatchInputCalls;
await firstManager.close?.();
const second = await getMemorySearchManager({
cfg: createCfg({
storePath,
provider: "gemini",
model: "gemini-embedding-2-preview",
extraPaths: [mediaDir],
multimodal: { enabled: true, modalities: ["image"] },
}),
agentId: "main",
});
const secondManager = requireManager(second);
await secondManager.sync?.({ reason: "test" });
expect(embedBatchInputCalls).toBeGreaterThan(multimodalCallsAfterFirstSync);
const results = await secondManager.search("image");
expect(results.some((result) => result.path.endsWith("scope-media/diagram.png"))).toBe(true);
await secondManager.close?.();
});
it("reuses cached embeddings on forced reindex", async () => {
const cfg = createCfg({ storePath: indexMainPath, cacheEnabled: true });
const manager = await getPersistentManager(cfg);

View File

@@ -0,0 +1,160 @@
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import { describe, expect, it } from "vitest";
import {
resolveConfiguredScopeHash,
resolveConfiguredSourcesForMeta,
shouldRunFullMemoryReindex,
type MemoryIndexMeta,
} from "./manager-reindex-state.js";
/**
 * Builds a baseline persisted-index meta record for tests; individual
 * fields can be replaced through `overrides`.
 */
function createMeta(overrides: Partial<MemoryIndexMeta> = {}): MemoryIndexMeta {
  const baseline: MemoryIndexMeta = {
    model: "mock-embed-v1",
    provider: "openai",
    providerKey: "provider-key-v1",
    sources: ["memory"],
    scopeHash: "scope-v1",
    chunkTokens: 4000,
    chunkOverlap: 0,
    ftsTokenizer: "unicode61",
  };
  return { ...baseline, ...overrides };
}
/**
 * Assembles a default argument set for shouldRunFullMemoryReindex that
 * matches createMeta()'s baseline (so the baseline never triggers a
 * reindex); tests override one field at a time.
 */
function createFullReindexParams(
  overrides: {
    meta?: MemoryIndexMeta | null;
    provider?: { id: string; model: string } | null;
    providerKey?: string;
    configuredSources?: MemorySource[];
    configuredScopeHash?: string;
    chunkTokens?: number;
    chunkOverlap?: number;
    vectorReady?: boolean;
    ftsTokenizer?: string;
  } = {},
) {
  const baseline = {
    meta: createMeta(),
    provider: { id: "openai", model: "mock-embed-v1" },
    providerKey: "provider-key-v1",
    configuredSources: ["memory"] as MemorySource[],
    configuredScopeHash: "scope-v1",
    chunkTokens: 4000,
    chunkOverlap: 0,
    vectorReady: false,
    ftsTokenizer: "unicode61",
  };
  return { ...baseline, ...overrides };
}
// Unit tests for the reindex-decision helpers extracted from the memory
// index manager. Each case takes the baseline parameter set (which matches
// the persisted meta, so it alone would NOT reindex), changes one aspect of
// the configuration, and checks whether a full rebuild is demanded.
describe("memory reindex state", () => {
// A different embedding model invalidates every stored vector.
it("requires a full reindex when the embedding model changes", () => {
expect(
shouldRunFullMemoryReindex(
createFullReindexParams({
provider: { id: "openai", model: "mock-embed-v2" },
}),
),
).toBe(true);
});
// Same provider/model but a different provider cache key (here: different
// output dimensionality) must also rebuild.
it("requires a full reindex when the provider cache key changes", () => {
expect(
shouldRunFullMemoryReindex(
createFullReindexParams({
provider: { id: "gemini", model: "gemini-embedding-2-preview" },
providerKey: "provider-key-dims-768",
meta: createMeta({
provider: "gemini",
model: "gemini-embedding-2-preview",
providerKey: "provider-key-dims-3072",
}),
}),
),
).toBe(true);
});
// extraPaths feed into the scope hash; two different path sets must produce
// different hashes and therefore force a reindex.
it("requires a full reindex when extraPaths change", () => {
const workspaceDir = "/tmp/workspace";
const firstScopeHash = resolveConfiguredScopeHash({
workspaceDir,
extraPaths: ["/tmp/workspace/a"],
multimodal: {
enabled: false,
modalities: [],
maxFileBytes: 20 * 1024 * 1024,
},
});
const secondScopeHash = resolveConfiguredScopeHash({
workspaceDir,
extraPaths: ["/tmp/workspace/b"],
multimodal: {
enabled: false,
modalities: [],
maxFileBytes: 20 * 1024 * 1024,
},
});
expect(
shouldRunFullMemoryReindex(
createFullReindexParams({
meta: createMeta({ scopeHash: firstScopeHash }),
configuredScopeHash: secondScopeHash,
}),
),
).toBe(true);
});
// Adding the "sessions" source means previously unindexed content must be
// picked up, so the source list mismatch triggers a rebuild.
it("requires a full reindex when configured sources add sessions", () => {
expect(
shouldRunFullMemoryReindex(
createFullReindexParams({
configuredSources: ["memory", "sessions"],
}),
),
).toBe(true);
});
// Multimodal settings are also part of the scope hash; toggling them with
// identical extraPaths must still change the hash.
it("requires a full reindex when multimodal settings change", () => {
const workspaceDir = "/tmp/workspace";
const firstScopeHash = resolveConfiguredScopeHash({
workspaceDir,
extraPaths: ["/tmp/workspace/media"],
multimodal: {
enabled: false,
modalities: [],
maxFileBytes: 20 * 1024 * 1024,
},
});
const secondScopeHash = resolveConfiguredScopeHash({
workspaceDir,
extraPaths: ["/tmp/workspace/media"],
multimodal: {
enabled: true,
modalities: ["image"],
maxFileBytes: 20 * 1024 * 1024,
},
});
expect(
shouldRunFullMemoryReindex(
createFullReindexParams({
meta: createMeta({ scopeHash: firstScopeHash }),
configuredScopeHash: secondScopeHash,
}),
),
).toBe(true);
});
// Backward compatibility: pre-sources indexes (meta.sources undefined) are
// normalized to ["memory"], so a memory-only config must NOT force a rebuild.
it("keeps older indexes with missing sources compatible with memory-only config", () => {
expect(
shouldRunFullMemoryReindex(
createFullReindexParams({
meta: createMeta({ sources: undefined }),
configuredSources: resolveConfiguredSourcesForMeta(new Set(["memory"])),
}),
),
).toBe(false);
});
});

View File

@@ -0,0 +1,103 @@
import {
hashText,
normalizeExtraMemoryPaths,
type MemorySource,
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
/**
 * Persisted metadata describing how a memory index was built. Any mismatch
 * between this record and the current configuration forces a full reindex.
 */
export type MemoryIndexMeta = {
  model: string;
  provider: string;
  providerKey?: string;
  sources?: MemorySource[];
  scopeHash?: string;
  chunkTokens: number;
  chunkOverlap: number;
  vectorDims?: number;
  ftsTokenizer?: string;
};

/**
 * Normalizes the configured source list into the canonical form stored in
 * index metadata: known sources only, deduplicated, sorted, defaulting to
 * ["memory"] when nothing valid remains.
 *
 * Deduplication matches normalizeMetaSources — without it, a duplicated
 * configured source would make configuredMetaSourcesDiffer report a spurious
 * length mismatch and force a needless full reindex.
 */
export function resolveConfiguredSourcesForMeta(sources: Iterable<MemorySource>): MemorySource[] {
  const normalized = Array.from(
    new Set(
      Array.from(sources).filter(
        (source): source is MemorySource => source === "memory" || source === "sessions",
      ),
    ),
  ).toSorted();
  return normalized.length > 0 ? normalized : ["memory"];
}

/**
 * Normalizes the sources persisted in index metadata to the same canonical
 * form as resolveConfiguredSourcesForMeta so the two lists can be compared
 * element-wise.
 */
export function normalizeMetaSources(meta: MemoryIndexMeta): MemorySource[] {
  if (!Array.isArray(meta.sources)) {
    // Backward compatibility for older indexes that did not persist sources.
    return ["memory"];
  }
  const normalized = Array.from(
    new Set(
      meta.sources.filter(
        (source): source is MemorySource => source === "memory" || source === "sessions",
      ),
    ),
  ).toSorted();
  return normalized.length > 0 ? normalized : ["memory"];
}

/** Returns true when the persisted sources differ from the configured ones. */
export function configuredMetaSourcesDiffer(params: {
  meta: MemoryIndexMeta;
  configuredSources: MemorySource[];
}): boolean {
  const metaSources = normalizeMetaSources(params.meta);
  if (metaSources.length !== params.configuredSources.length) {
    return true;
  }
  return metaSources.some((source, index) => source !== params.configuredSources[index]);
}

/**
 * Computes a stable hash of the index scope (extra paths + multimodal
 * settings). Paths are slash-normalized and sorted so the hash does not
 * depend on platform separators or configuration order.
 */
export function resolveConfiguredScopeHash(params: {
  workspaceDir: string;
  extraPaths?: string[];
  multimodal: {
    enabled: boolean;
    modalities: string[];
    maxFileBytes: number;
  };
}): string {
  const extraPaths = normalizeExtraMemoryPaths(params.workspaceDir, params.extraPaths)
    .map((value) => value.replace(/\\/g, "/"))
    .toSorted();
  return hashText(
    JSON.stringify({
      extraPaths,
      multimodal: {
        enabled: params.multimodal.enabled,
        modalities: [...params.multimodal.modalities].toSorted(),
        maxFileBytes: params.multimodal.maxFileBytes,
      },
    }),
  );
}

/**
 * Decides whether the index must be rebuilt from scratch. A rebuild is
 * required when there is no meta record at all, or when any of the model,
 * provider, provider cache key, configured sources, scope hash, chunking
 * parameters, or FTS tokenizer changed — or when the vector store became
 * ready but the meta never recorded vector dimensions.
 */
export function shouldRunFullMemoryReindex(params: {
  meta: MemoryIndexMeta | null;
  provider: { id: string; model: string } | null;
  providerKey?: string;
  configuredSources: MemorySource[];
  configuredScopeHash: string;
  chunkTokens: number;
  chunkOverlap: number;
  vectorReady: boolean;
  ftsTokenizer: string;
}): boolean {
  const { meta } = params;
  if (!meta) {
    return true;
  }
  // Without an embedding provider the index runs in FTS-only mode and the
  // meta record stores sentinel values instead of a real model/provider.
  const expectedModel = params.provider ? params.provider.model : "fts-only";
  const expectedProvider = params.provider ? params.provider.id : "none";
  return (
    meta.model !== expectedModel ||
    meta.provider !== expectedProvider ||
    meta.providerKey !== params.providerKey ||
    configuredMetaSourcesDiffer({
      meta,
      configuredSources: params.configuredSources,
    }) ||
    meta.scopeHash !== params.configuredScopeHash ||
    meta.chunkTokens !== params.chunkTokens ||
    meta.chunkOverlap !== params.chunkOverlap ||
    (params.vectorReady && !meta.vectorDims) ||
    // Older indexes predate the tokenizer field; they used unicode61.
    (meta.ftsTokenizer ?? "unicode61") !== params.ftsTokenizer
  );
}

View File

@@ -0,0 +1,87 @@
import { describe, expect, it } from "vitest";
import {
loadMemorySourceFileState,
MEMORY_SOURCE_FILE_HASH_SQL,
MEMORY_SOURCE_FILE_STATE_SQL,
resolveMemorySourceExistingHash,
} from "./manager-source-state.js";
// Unit tests for the per-source file-state helpers. A stub db records which
// SQL each helper prepares, so the tests can assert both the returned data
// and the query pattern (bulk snapshot vs per-file lookup).
describe("memory source state", () => {
// The snapshot loader must issue exactly one bulk SELECT and expose the
// result both as raw rows and as a path -> hash map.
it("loads source hashes with one bulk query", () => {
const calls: Array<{ sql: string; args: unknown[] }> = [];
const state = loadMemorySourceFileState({
db: {
prepare: (sql) => ({
all: (...args) => {
calls.push({ sql, args });
return [
{ path: "memory/one.md", hash: "hash-1" },
{ path: "memory/two.md", hash: "hash-2" },
];
},
get: () => undefined,
}),
},
source: "memory",
});
expect(calls).toEqual([{ sql: MEMORY_SOURCE_FILE_STATE_SQL, args: ["memory"] }]);
expect(state.rows).toEqual([
{ path: "memory/one.md", hash: "hash-1" },
{ path: "memory/two.md", hash: "hash-2" },
]);
expect(state.hashes).toEqual(
new Map([
["memory/one.md", "hash-1"],
["memory/two.md", "hash-2"],
]),
);
});
// When a bulk snapshot map is supplied, the resolver must answer from it
// and never touch the database (calls stays empty; the stub would return
// "unexpected" if queried).
it("uses bulk snapshot hashes when present", () => {
const calls: Array<{ sql: string; args: unknown[] }> = [];
const hash = resolveMemorySourceExistingHash({
db: {
prepare: (sql) => ({
all: () => [],
get: (...args) => {
calls.push({ sql, args });
return { hash: "unexpected" };
},
}),
},
source: "sessions",
path: "sessions/thread.jsonl",
existingHashes: new Map([["sessions/thread.jsonl", "hash-from-snapshot"]]),
});
expect(hash).toBe("hash-from-snapshot");
expect(calls).toEqual([]);
});
// Without a snapshot (existingHashes null) the resolver must fall back to a
// single per-file hash query with (path, source) parameters.
it("falls back to per-file lookups without a bulk snapshot", () => {
const calls: Array<{ sql: string; args: unknown[] }> = [];
const hash = resolveMemorySourceExistingHash({
db: {
prepare: (sql) => ({
all: () => [],
get: (...args) => {
calls.push({ sql, args });
return { hash: "hash-from-row" };
},
}),
},
source: "sessions",
path: "sessions/thread.jsonl",
existingHashes: null,
});
expect(hash).toBe("hash-from-row");
expect(calls).toEqual([
{
sql: MEMORY_SOURCE_FILE_HASH_SQL,
args: ["sessions/thread.jsonl", "sessions"],
},
]);
});
});

View File

@@ -0,0 +1,49 @@
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
/** One indexed file's stored path and content hash. */
export type MemorySourceFileStateRow = {
  path: string;
  hash: string;
};

/** Minimal sqlite-like handle: prepared statements expose all()/get(). */
type MemorySourceStateDb = {
  prepare: (sql: string) => {
    all: (...args: unknown[]) => unknown;
    get: (...args: unknown[]) => unknown;
  };
};

/** Bulk query: every (path, hash) pair recorded for one source. */
export const MEMORY_SOURCE_FILE_STATE_SQL = `SELECT path, hash FROM files WHERE source = ?`;
/** Per-file query: the stored hash for a single (path, source) pair. */
export const MEMORY_SOURCE_FILE_HASH_SQL = `SELECT hash FROM files WHERE path = ? AND source = ?`;

/**
 * Loads the full file-state snapshot for a source with a single bulk query
 * and returns both the raw rows and a path -> hash lookup map.
 */
export function loadMemorySourceFileState(params: {
  db: MemorySourceStateDb;
  source: MemorySource;
}): {
  rows: MemorySourceFileStateRow[];
  hashes: Map<string, string>;
} {
  const queried = params.db.prepare(MEMORY_SOURCE_FILE_STATE_SQL).all(params.source);
  const rows = (queried ?? []) as MemorySourceFileStateRow[];
  const hashes = new Map<string, string>();
  for (const row of rows) {
    hashes.set(row.path, row.hash);
  }
  return { rows, hashes };
}

/**
 * Resolves a file's previously stored hash. Answers from the bulk snapshot
 * map when one was loaded; otherwise falls back to a per-file sqlite lookup.
 */
export function resolveMemorySourceExistingHash(params: {
  db: MemorySourceStateDb;
  source: MemorySource;
  path: string;
  existingHashes?: Map<string, string> | null;
}): string | undefined {
  if (params.existingHashes) {
    return params.existingHashes.get(params.path);
  }
  const row = params.db.prepare(MEMORY_SOURCE_FILE_HASH_SQL).get(params.path, params.source) as
    | { hash: string }
    | undefined;
  return row?.hash;
}

View File

@@ -1,5 +1,11 @@
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import { describe, expect, it } from "vitest";
import { resolveInitialMemoryDirty, resolveStatusProviderInfo } from "./manager-status-state.js";
import {
collectMemoryStatusAggregate,
MEMORY_STATUS_AGGREGATE_SQL,
resolveInitialMemoryDirty,
resolveStatusProviderInfo,
} from "./manager-status-state.js";
describe("memory manager status state", () => {
it("keeps memory clean for status-only managers after prior indexing", () => {
@@ -51,4 +57,41 @@ describe("memory manager status state", () => {
searchMode: "fts-only",
});
});
it("uses one aggregation query for status counts and source breakdowns", () => {
const calls: Array<{ sql: string; params: MemorySource[] }> = [];
const aggregate = collectMemoryStatusAggregate({
db: {
prepare: (sql) => ({
all: (...params) => {
calls.push({ sql, params });
return [
{ kind: "files" as const, source: "memory" as const, c: 2 },
{ kind: "chunks" as const, source: "memory" as const, c: 5 },
{ kind: "files" as const, source: "sessions" as const, c: 1 },
{ kind: "chunks" as const, source: "sessions" as const, c: 3 },
];
},
}),
},
sources: ["memory", "sessions"],
sourceFilterSql: " AND source IN (?, ?)",
sourceFilterParams: ["memory", "sessions"],
});
expect(calls).toEqual([
{
sql: MEMORY_STATUS_AGGREGATE_SQL.replaceAll("__FILTER__", " AND source IN (?, ?)"),
params: ["memory", "sessions", "memory", "sessions"],
},
]);
expect(aggregate).toEqual({
files: 3,
chunks: 8,
sourceCounts: [
{ source: "memory", files: 2, chunks: 5 },
{ source: "sessions", files: 1, chunks: 3 },
],
});
});
});

View File

@@ -1,8 +1,27 @@
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
type StatusProvider = {
id: string;
model: string;
};
// One row of the combined status aggregation: a files-or-chunks count for a source.
type StatusAggregateRow = {
  kind: "files" | "chunks";
  source: MemorySource;
  c: number;
};
// Minimal synchronous DB surface required by collectMemoryStatusAggregate.
type StatusAggregateDb = {
  prepare: (sql: string) => {
    all: (...args: MemorySource[]) => StatusAggregateRow[];
  };
};
// Counts files and chunks per source in a single round-trip. `__FILTER__` is
// replaced in BOTH UNION ALL branches with an optional source filter clause,
// so any bound filter params must be supplied twice by the caller.
export const MEMORY_STATUS_AGGREGATE_SQL =
  `SELECT 'files' AS kind, source, COUNT(*) as c FROM files WHERE 1=1__FILTER__ GROUP BY source\n` +
  `UNION ALL\n` +
  `SELECT 'chunks' AS kind, source, COUNT(*) as c FROM chunks WHERE 1=1__FILTER__ GROUP BY source`;
export function resolveInitialMemoryDirty(params: {
hasMemorySource: boolean;
statusOnly: boolean;
@@ -41,3 +60,44 @@ export function resolveStatusProviderInfo(params: {
searchMode: "hybrid",
};
}
/**
 * Runs the combined files/chunks status aggregation in a single query and
 * folds the rows into overall totals plus a per-source breakdown.
 *
 * `sourceFilterSql` is substituted for both `__FILTER__` placeholders, which
 * is why `sourceFilterParams` is bound twice — once per UNION ALL branch.
 */
export function collectMemoryStatusAggregate(params: {
  db: StatusAggregateDb;
  sources: Iterable<MemorySource>;
  sourceFilterSql?: string;
  sourceFilterParams?: MemorySource[];
}): {
  files: number;
  chunks: number;
  sourceCounts: Array<{ source: MemorySource; files: number; chunks: number }>;
} {
  const orderedSources = Array.from(params.sources);
  // Seed every configured source so it appears in the breakdown even at zero.
  const perSource = new Map<MemorySource, { files: number; chunks: number }>(
    orderedSources.map((source) => [source, { files: 0, chunks: 0 }]),
  );
  const filterSql = params.sourceFilterSql ?? "";
  const filterParams = params.sourceFilterParams ?? [];
  const rows = params.db
    .prepare(MEMORY_STATUS_AGGREGATE_SQL.replaceAll("__FILTER__", filterSql))
    .all(...filterParams, ...filterParams);
  let totalFiles = 0;
  let totalChunks = 0;
  for (const row of rows) {
    const count = row.c ?? 0;
    let entry = perSource.get(row.source);
    if (!entry) {
      entry = { files: 0, chunks: 0 };
      perSource.set(row.source, entry);
    }
    if (row.kind === "files") {
      entry.files = count;
      totalFiles += count;
    } else {
      entry.chunks = count;
      totalChunks += count;
    }
  }
  return {
    files: totalFiles,
    chunks: totalChunks,
    sourceCounts: orderedSources.map((source) => ({ source, ...perSource.get(source)! })),
  };
}

View File

@@ -26,14 +26,11 @@ import {
} from "openclaw/plugin-sdk/memory-core-host-engine-qmd";
import {
buildFileEntry,
ensureDir,
ensureMemoryIndexSchema,
hashText,
isFileMissingError,
listMemoryFiles,
loadSqliteVecExtension,
normalizeExtraMemoryPaths,
requireNodeSqlite,
runWithConcurrency,
type MemoryFileEntry,
type MemorySource,
@@ -46,19 +43,19 @@ import {
type EmbeddingProviderRuntime,
resolveEmbeddingProviderFallbackModel,
} from "./embeddings.js";
import { openMemoryDatabaseAtPath } from "./manager-db.js";
import {
resolveConfiguredScopeHash,
resolveConfiguredSourcesForMeta,
shouldRunFullMemoryReindex,
type MemoryIndexMeta,
} from "./manager-reindex-state.js";
import { shouldSyncSessionsForReindex } from "./manager-session-reindex.js";
type MemoryIndexMeta = {
model: string;
provider: string;
providerKey?: string;
sources?: MemorySource[];
scopeHash?: string;
chunkTokens: number;
chunkOverlap: number;
vectorDims?: number;
ftsTokenizer?: string;
};
import {
loadMemorySourceFileState,
resolveMemorySourceExistingHash,
} from "./manager-source-state.js";
import { runMemoryTargetedSessionSync } from "./manager-targeted-sync.js";
type MemorySyncProgressState = {
completed: number;
@@ -86,18 +83,6 @@ const IGNORED_MEMORY_WATCH_DIR_NAMES = new Set([
const log = createSubsystemLogger("memory");
/**
 * Opens the memory index database at `dbPath`, creating the parent directory
 * first and optionally allowing SQLite extensions to be loaded.
 */
export function openMemoryDatabaseAtPath(dbPath: string, allowExtension: boolean): DatabaseSync {
  ensureDir(path.dirname(dbPath));
  const { DatabaseSync } = requireNodeSqlite();
  const database = new DatabaseSync(dbPath, { allowExtension });
  // busy_timeout is per-connection and resets to 0 on restart. Set it on every
  // open so concurrent processes retry instead of failing immediately with
  // SQLITE_BUSY.
  database.exec("PRAGMA busy_timeout = 5000");
  return database;
}
function shouldIgnoreMemoryWatchPath(watchPath: string): boolean {
const normalized = path.normalize(watchPath);
const parts = normalized.split(path.sep).map((segment) => segment.trim().toLowerCase());
@@ -633,17 +618,6 @@ export abstract class MemoryManagerSyncOps {
return normalized.size > 0 ? normalized : null;
}
// Drops the given session files from the dirty set (or clears it entirely when
// no explicit targets are passed), then recomputes the aggregate dirty flag.
private clearSyncedSessionFiles(targetSessionFiles?: Iterable<string> | null) {
  if (targetSessionFiles) {
    for (const syncedFile of targetSessionFiles) {
      this.sessionsDirtyFiles.delete(syncedFile);
    }
  } else {
    this.sessionsDirtyFiles.clear();
  }
  this.sessionsDirty = this.sessionsDirtyFiles.size > 0;
}
protected ensureIntervalSync() {
const minutes = this.settings.sync.intervalMinutes;
if (!minutes || minutes <= 0 || this.intervalTimer) {
@@ -685,7 +659,6 @@ export abstract class MemoryManagerSyncOps {
needsFullReindex: boolean;
progress?: MemorySyncProgressState;
}) {
const selectSourceFileState = this.db.prepare(`SELECT path, hash FROM files WHERE source = ?`);
const deleteFileByPathAndSource = this.db.prepare(
`DELETE FROM files WHERE path = ? AND source = ?`,
);
@@ -723,11 +696,12 @@ export abstract class MemoryManagerSyncOps {
batch: this.batch.enabled,
concurrency: this.getIndexConcurrency(),
});
const existingRows = selectSourceFileState.all("memory") as Array<{
path: string;
hash: string;
}>;
const existingHashes = new Map(existingRows.map((row) => [row.path, row.hash]));
const existingState = loadMemorySourceFileState({
db: this.db,
source: "memory",
});
const existingRows = existingState.rows;
const existingHashes = existingState.hashes;
const activePaths = new Set(fileEntries.map((entry) => entry.path));
if (params.progress) {
params.progress.total += fileEntries.length;
@@ -784,8 +758,6 @@ export abstract class MemoryManagerSyncOps {
targetSessionFiles?: string[];
progress?: MemorySyncProgressState;
}) {
const selectFileHash = this.db.prepare(`SELECT hash FROM files WHERE path = ? AND source = ?`);
const selectSourceFileState = this.db.prepare(`SELECT path, hash FROM files WHERE source = ?`);
const deleteFileByPathAndSource = this.db.prepare(
`DELETE FROM files WHERE path = ? AND source = ?`,
);
@@ -815,10 +787,10 @@ export abstract class MemoryManagerSyncOps {
const existingRows =
activePaths === null
? null
: (selectSourceFileState.all("sessions") as Array<{
path: string;
hash: string;
}>);
: loadMemorySourceFileState({
db: this.db,
source: "sessions",
}).rows;
const existingHashes =
existingRows === null ? null : new Map(existingRows.map((row) => [row.path, row.hash]));
const indexAll =
@@ -862,15 +834,12 @@ export abstract class MemoryManagerSyncOps {
}
return;
}
const existingHash =
existingHashes?.get(entry.path) ??
(
selectFileHash.get(entry.path, "sessions") as
| {
hash: string;
}
| undefined
)?.hash;
const existingHash = resolveMemorySourceExistingHash({
db: this.db,
source: "sessions",
path: entry.path,
existingHashes,
});
if (!params.needsFullReindex && existingHash === entry.hash) {
if (params.progress) {
params.progress.completed += 1;
@@ -964,60 +933,57 @@ export abstract class MemoryManagerSyncOps {
}
const vectorReady = await this.ensureVectorReady();
const meta = this.readMeta();
const configuredSources = this.resolveConfiguredSourcesForMeta();
const configuredScopeHash = this.resolveConfiguredScopeHash();
const configuredSources = resolveConfiguredSourcesForMeta(this.sources);
const configuredScopeHash = resolveConfiguredScopeHash({
workspaceDir: this.workspaceDir,
extraPaths: this.settings.extraPaths,
multimodal: {
enabled: this.settings.multimodal.enabled,
modalities: this.settings.multimodal.modalities,
maxFileBytes: this.settings.multimodal.maxFileBytes,
},
});
const targetSessionFiles = this.normalizeTargetSessionFiles(params?.sessionFiles);
const hasTargetSessionFiles = targetSessionFiles !== null;
if (hasTargetSessionFiles && targetSessionFiles && this.sources.has("sessions")) {
// Post-compaction refreshes should only update the explicit transcript files and
// leave broader reindex/dirty-work decisions to the regular sync path.
try {
await this.syncSessionFiles({
needsFullReindex: false,
targetSessionFiles: Array.from(targetSessionFiles),
progress: progress ?? undefined,
});
this.clearSyncedSessionFiles(targetSessionFiles);
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
const activated =
this.shouldFallbackOnError(reason) && (await this.activateFallbackProvider(reason));
if (activated) {
if (
process.env.OPENCLAW_TEST_FAST === "1" &&
process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1"
) {
await this.runUnsafeReindex({
reason: params?.reason,
force: true,
progress: progress ?? undefined,
});
} else {
await this.runSafeReindex({
reason: params?.reason,
force: true,
progress: progress ?? undefined,
});
}
return;
}
throw err;
}
const targetedSessionSync = await runMemoryTargetedSessionSync({
hasSessionSource: this.sources.has("sessions"),
targetSessionFiles,
reason: params?.reason,
progress: progress ?? undefined,
useUnsafeReindex:
process.env.OPENCLAW_TEST_FAST === "1" &&
process.env.OPENCLAW_TEST_MEMORY_UNSAFE_REINDEX === "1",
sessionsDirtyFiles: this.sessionsDirtyFiles,
syncSessionFiles: async (targetedParams) => {
await this.syncSessionFiles(targetedParams);
},
shouldFallbackOnError: (message) => this.shouldFallbackOnError(message),
activateFallbackProvider: async (reason) => await this.activateFallbackProvider(reason),
runSafeReindex: async (reindexParams) => {
await this.runSafeReindex(reindexParams);
},
runUnsafeReindex: async (reindexParams) => {
await this.runUnsafeReindex(reindexParams);
},
});
if (targetedSessionSync.handled) {
this.sessionsDirty = targetedSessionSync.sessionsDirty;
return;
}
const needsFullReindex =
(params?.force && !hasTargetSessionFiles) ||
!meta ||
// Also detects provider→FTS-only transitions so orphaned old-model FTS rows are cleaned up.
(this.provider ? meta.model !== this.provider.model : meta.model !== "fts-only") ||
(this.provider ? meta.provider !== this.provider.id : meta.provider !== "none") ||
meta.providerKey !== this.providerKey ||
this.metaSourcesDiffer(meta, configuredSources) ||
meta.scopeHash !== configuredScopeHash ||
meta.chunkTokens !== this.settings.chunking.tokens ||
meta.chunkOverlap !== this.settings.chunking.overlap ||
(vectorReady && !meta?.vectorDims) ||
(meta.ftsTokenizer ?? "unicode61") !== this.settings.store.fts.tokenizer;
shouldRunFullMemoryReindex({
meta,
// Also detects provider→FTS-only transitions so orphaned old-model FTS rows are cleaned up.
provider: this.provider ? { id: this.provider.id, model: this.provider.model } : null,
providerKey: this.providerKey,
configuredSources,
configuredScopeHash,
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
vectorReady,
ftsTokenizer: this.settings.store.fts.tokenizer,
});
try {
if (needsFullReindex) {
if (
@@ -1209,8 +1175,16 @@ export abstract class MemoryManagerSyncOps {
model: this.provider?.model ?? "fts-only",
provider: this.provider?.id ?? "none",
providerKey: this.providerKey!,
sources: this.resolveConfiguredSourcesForMeta(),
scopeHash: this.resolveConfiguredScopeHash(),
sources: resolveConfiguredSourcesForMeta(this.sources),
scopeHash: resolveConfiguredScopeHash({
workspaceDir: this.workspaceDir,
extraPaths: this.settings.extraPaths,
multimodal: {
enabled: this.settings.multimodal.enabled,
modalities: this.settings.multimodal.modalities,
maxFileBytes: this.settings.multimodal.maxFileBytes,
},
}),
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
ftsTokenizer: this.settings.store.fts.tokenizer,
@@ -1282,8 +1256,16 @@ export abstract class MemoryManagerSyncOps {
model: this.provider?.model ?? "fts-only",
provider: this.provider?.id ?? "none",
providerKey: this.providerKey!,
sources: this.resolveConfiguredSourcesForMeta(),
scopeHash: this.resolveConfiguredScopeHash(),
sources: resolveConfiguredSourcesForMeta(this.sources),
scopeHash: resolveConfiguredScopeHash({
workspaceDir: this.workspaceDir,
extraPaths: this.settings.extraPaths,
multimodal: {
enabled: this.settings.multimodal.enabled,
modalities: this.settings.multimodal.modalities,
maxFileBytes: this.settings.multimodal.maxFileBytes,
},
}),
chunkTokens: this.settings.chunking.tokens,
chunkOverlap: this.settings.chunking.overlap,
ftsTokenizer: this.settings.store.fts.tokenizer,
@@ -1340,50 +1322,4 @@ export abstract class MemoryManagerSyncOps {
.run(META_KEY, value);
this.lastMetaSerialized = value;
}
// Canonicalizes the configured sources (known values only, sorted) for
// persistence in the index meta; defaults to ["memory"] when none remain.
private resolveConfiguredSourcesForMeta(): MemorySource[] {
  const known: MemorySource[] = [];
  for (const source of this.sources) {
    if (source === "memory" || source === "sessions") {
      known.push(source);
    }
  }
  if (known.length === 0) {
    return ["memory"];
  }
  return known.toSorted();
}
// Normalizes the sources persisted in index meta to a sorted, de-duplicated
// list of known values; defaults to ["memory"] when none remain.
private normalizeMetaSources(meta: MemoryIndexMeta): MemorySource[] {
  if (!Array.isArray(meta.sources)) {
    // Backward compatibility for older indexes that did not persist sources.
    return ["memory"];
  }
  const unique = new Set<MemorySource>();
  for (const source of meta.sources) {
    if (source === "memory" || source === "sessions") {
      unique.add(source);
    }
  }
  const normalized = [...unique].toSorted();
  return normalized.length > 0 ? normalized : ["memory"];
}
// Hashes the scope-defining configuration (extra memory paths plus multimodal
// settings) so the sync path can detect scope changes via the persisted
// meta.scopeHash. Inputs are normalized (forward slashes, sorted) so the hash
// is stable across platforms; the JSON key order below is part of the hash,
// so do not reorder the object literal.
private resolveConfiguredScopeHash(): string {
  const extraPaths = normalizeExtraMemoryPaths(this.workspaceDir, this.settings.extraPaths)
    .map((value) => value.replace(/\\/g, "/"))
    .toSorted();
  return hashText(
    JSON.stringify({
      extraPaths,
      multimodal: {
        enabled: this.settings.multimodal.enabled,
        modalities: [...this.settings.multimodal.modalities].toSorted(),
        maxFileBytes: this.settings.multimodal.maxFileBytes,
      },
    }),
  );
}
// True when the sources recorded in meta differ from the configured sources.
// Both lists are already normalized and sorted, so positional comparison works.
private metaSourcesDiffer(meta: MemoryIndexMeta, configuredSources: MemorySource[]): boolean {
  const metaSources = this.normalizeMetaSources(meta);
  return (
    metaSources.length !== configuredSources.length ||
    !metaSources.every((source, index) => source === configuredSources[index])
  );
}
}

View File

@@ -0,0 +1,78 @@
import { describe, expect, it, vi } from "vitest";
import {
clearMemorySyncedSessionFiles,
runMemoryTargetedSessionSync,
} from "./manager-targeted-sync.js";
describe("memory targeted session sync", () => {
it("preserves unrelated dirty sessions after targeted cleanup", () => {
const secondSessionPath = "/tmp/targeted-dirty-second.jsonl";
const sessionsDirtyFiles = new Set(["/tmp/targeted-dirty-first.jsonl", secondSessionPath]);
const sessionsDirty = clearMemorySyncedSessionFiles({
sessionsDirtyFiles,
targetSessionFiles: ["/tmp/targeted-dirty-first.jsonl"],
});
expect(sessionsDirtyFiles.has(secondSessionPath)).toBe(true);
expect(sessionsDirty).toBe(true);
});
it("runs a full reindex after fallback activates during targeted sync", async () => {
const activateFallbackProvider = vi.fn(async () => true);
const runSafeReindex = vi.fn(async () => {});
const runUnsafeReindex = vi.fn(async () => {});
await runMemoryTargetedSessionSync({
hasSessionSource: true,
targetSessionFiles: new Set(["/tmp/targeted-fallback.jsonl"]),
reason: "post-compaction",
progress: undefined,
useUnsafeReindex: false,
sessionsDirtyFiles: new Set(),
syncSessionFiles: async () => {
throw new Error("embedding backend failed");
},
shouldFallbackOnError: () => true,
activateFallbackProvider,
runSafeReindex,
runUnsafeReindex,
});
expect(activateFallbackProvider).toHaveBeenCalledWith("embedding backend failed");
expect(runSafeReindex).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
progress: undefined,
});
expect(runUnsafeReindex).not.toHaveBeenCalled();
});
it("uses the unsafe reindex path when enabled", async () => {
const runSafeReindex = vi.fn(async () => {});
const runUnsafeReindex = vi.fn(async () => {});
await runMemoryTargetedSessionSync({
hasSessionSource: true,
targetSessionFiles: new Set(["/tmp/targeted-fallback.jsonl"]),
reason: "post-compaction",
progress: undefined,
useUnsafeReindex: true,
sessionsDirtyFiles: new Set(),
syncSessionFiles: async () => {
throw new Error("embedding backend failed");
},
shouldFallbackOnError: () => true,
activateFallbackProvider: async () => true,
runSafeReindex,
runUnsafeReindex,
});
expect(runUnsafeReindex).toHaveBeenCalledWith({
reason: "post-compaction",
force: true,
progress: undefined,
});
expect(runSafeReindex).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,86 @@
import type { MemorySyncProgressUpdate } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
type TargetedSyncProgress = (update: MemorySyncProgressUpdate) => void;
/**
 * Removes just-synced session files from the dirty set; with no explicit
 * targets the whole set is cleared. Returns whether any dirty files remain.
 */
export function clearMemorySyncedSessionFiles(params: {
  sessionsDirtyFiles: Set<string>;
  targetSessionFiles?: Iterable<string> | null;
}): boolean {
  const { sessionsDirtyFiles, targetSessionFiles } = params;
  if (targetSessionFiles) {
    for (const syncedFile of targetSessionFiles) {
      sessionsDirtyFiles.delete(syncedFile);
    }
  } else {
    sessionsDirtyFiles.clear();
  }
  return sessionsDirtyFiles.size > 0;
}
/**
 * Syncs only the explicitly targeted session transcript files (the
 * post-compaction refresh path).
 *
 * Returns `handled: false` when the sessions source is disabled or no targets
 * were given, leaving broader sync decisions to the regular path. When the
 * targeted sync fails and a fallback provider can be activated, a forced full
 * reindex runs instead (the unsafe variant only when `useUnsafeReindex` is
 * set); otherwise the original error is rethrown.
 */
export async function runMemoryTargetedSessionSync(params: {
  hasSessionSource: boolean;
  targetSessionFiles: Set<string> | null;
  reason?: string;
  progress?: TargetedSyncProgress;
  useUnsafeReindex: boolean;
  sessionsDirtyFiles: Set<string>;
  syncSessionFiles: (params: {
    needsFullReindex: boolean;
    targetSessionFiles?: string[];
    progress?: TargetedSyncProgress;
  }) => Promise<void>;
  shouldFallbackOnError: (message: string) => boolean;
  activateFallbackProvider: (reason: string) => Promise<boolean>;
  runSafeReindex: (params: {
    reason?: string;
    force?: boolean;
    progress?: TargetedSyncProgress;
  }) => Promise<void>;
  runUnsafeReindex: (params: {
    reason?: string;
    force?: boolean;
    progress?: TargetedSyncProgress;
  }) => Promise<void>;
}): Promise<{ handled: boolean; sessionsDirty: boolean }> {
  if (!params.hasSessionSource || !params.targetSessionFiles) {
    return { handled: false, sessionsDirty: params.sessionsDirtyFiles.size > 0 };
  }
  try {
    await params.syncSessionFiles({
      needsFullReindex: false,
      targetSessionFiles: [...params.targetSessionFiles],
      progress: params.progress,
    });
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    const activated =
      params.shouldFallbackOnError(reason) && (await params.activateFallbackProvider(reason));
    if (!activated) {
      throw err;
    }
    // Fallback provider is live: force a full reindex through the configured path.
    const reindex = params.useUnsafeReindex ? params.runUnsafeReindex : params.runSafeReindex;
    await reindex({ reason: params.reason, force: true, progress: params.progress });
    return { handled: true, sessionsDirty: params.sessionsDirtyFiles.size > 0 };
  }
  return {
    handled: true,
    sessionsDirty: clearMemorySyncedSessionFiles({
      sessionsDirtyFiles: params.sessionsDirtyFiles,
      targetSessionFiles: params.targetSessionFiles,
    }),
  };
}