mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-12 17:51:22 +00:00
fix(memory-core): repair sync helper typing drift
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import type { SQLInputValue } from "node:sqlite";
|
||||
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
|
||||
export type MemorySourceFileStateRow = {
|
||||
@@ -7,8 +8,8 @@ export type MemorySourceFileStateRow = {
|
||||
|
||||
type MemorySourceStateDb = {
|
||||
prepare: (sql: string) => {
|
||||
all: (...args: unknown[]) => unknown;
|
||||
get: (...args: unknown[]) => unknown;
|
||||
all: (...args: SQLInputValue[]) => unknown;
|
||||
get: (...args: SQLInputValue[]) => unknown;
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
import type { SQLInputValue } from "node:sqlite";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import {
|
||||
collectMemoryStatusAggregate,
|
||||
@@ -59,7 +59,7 @@ describe("memory manager status state", () => {
|
||||
});
|
||||
|
||||
it("uses one aggregation query for status counts and source breakdowns", () => {
|
||||
const calls: Array<{ sql: string; params: MemorySource[] }> = [];
|
||||
const calls: Array<{ sql: string; params: SQLInputValue[] }> = [];
|
||||
const aggregate = collectMemoryStatusAggregate({
|
||||
db: {
|
||||
prepare: (sql) => ({
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import type { SQLInputValue } from "node:sqlite";
|
||||
import type { MemorySource } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
|
||||
type StatusProvider = {
|
||||
@@ -13,7 +14,7 @@ type StatusAggregateRow = {
|
||||
|
||||
type StatusAggregateDb = {
|
||||
prepare: (sql: string) => {
|
||||
all: (...args: MemorySource[]) => StatusAggregateRow[];
|
||||
all: (...args: SQLInputValue[]) => StatusAggregateRow[];
|
||||
};
|
||||
};
|
||||
|
||||
|
||||
@@ -124,10 +124,11 @@ export async function runMemorySyncWithReadonlyRecovery(
|
||||
|
||||
export function enqueueMemoryTargetedSessionSync(
|
||||
state: {
|
||||
closed: boolean;
|
||||
syncing: Promise<void> | null;
|
||||
queuedSessionFiles: Set<string>;
|
||||
queuedSessionSync: Promise<void> | null;
|
||||
isClosed: () => boolean;
|
||||
getSyncing: () => Promise<void> | null;
|
||||
getQueuedSessionFiles: () => Set<string>;
|
||||
getQueuedSessionSync: () => Promise<void> | null;
|
||||
setQueuedSessionSync: (value: Promise<void> | null) => void;
|
||||
sync: (params?: {
|
||||
reason?: string;
|
||||
force?: boolean;
|
||||
@@ -137,33 +138,36 @@ export function enqueueMemoryTargetedSessionSync(
|
||||
},
|
||||
sessionFiles?: string[],
|
||||
): Promise<void> {
|
||||
const queuedSessionFiles = state.getQueuedSessionFiles();
|
||||
for (const sessionFile of sessionFiles ?? []) {
|
||||
const trimmed = sessionFile.trim();
|
||||
if (trimmed) {
|
||||
state.queuedSessionFiles.add(trimmed);
|
||||
queuedSessionFiles.add(trimmed);
|
||||
}
|
||||
}
|
||||
if (state.queuedSessionFiles.size === 0) {
|
||||
return state.syncing ?? Promise.resolve();
|
||||
if (queuedSessionFiles.size === 0) {
|
||||
return state.getSyncing() ?? Promise.resolve();
|
||||
}
|
||||
if (!state.queuedSessionSync) {
|
||||
state.queuedSessionSync = (async () => {
|
||||
try {
|
||||
await state.syncing?.catch(() => undefined);
|
||||
while (!state.closed && state.queuedSessionFiles.size > 0) {
|
||||
const queuedSessionFiles = Array.from(state.queuedSessionFiles);
|
||||
state.queuedSessionFiles.clear();
|
||||
await state.sync({
|
||||
reason: "queued-session-files",
|
||||
sessionFiles: queuedSessionFiles,
|
||||
});
|
||||
if (!state.getQueuedSessionSync()) {
|
||||
state.setQueuedSessionSync(
|
||||
(async () => {
|
||||
try {
|
||||
await state.getSyncing()?.catch(() => undefined);
|
||||
while (!state.isClosed() && state.getQueuedSessionFiles().size > 0) {
|
||||
const pendingSessionFiles = Array.from(state.getQueuedSessionFiles());
|
||||
state.getQueuedSessionFiles().clear();
|
||||
await state.sync({
|
||||
reason: "queued-session-files",
|
||||
sessionFiles: pendingSessionFiles,
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
state.setQueuedSessionSync(null);
|
||||
}
|
||||
} finally {
|
||||
state.queuedSessionSync = null;
|
||||
}
|
||||
})();
|
||||
})(),
|
||||
);
|
||||
}
|
||||
return state.queuedSessionSync;
|
||||
return state.getQueuedSessionSync() ?? Promise.resolve();
|
||||
}
|
||||
|
||||
export function _createMemorySyncControlConfigForTests(
|
||||
|
||||
@@ -979,7 +979,7 @@ export abstract class MemoryManagerSyncOps {
|
||||
meta,
|
||||
// Also detects provider→FTS-only transitions so orphaned old-model FTS rows are cleaned up.
|
||||
provider: this.provider ? { id: this.provider.id, model: this.provider.model } : null,
|
||||
providerKey: this.providerKey,
|
||||
providerKey: this.providerKey ?? undefined,
|
||||
configuredSources,
|
||||
configuredScopeHash,
|
||||
chunkTokens: this.settings.chunking.tokens,
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
import type { MemorySyncProgressUpdate } from "openclaw/plugin-sdk/memory-core-host-engine-storage";
|
||||
|
||||
type TargetedSyncProgress = (update: MemorySyncProgressUpdate) => void;
|
||||
type TargetedSyncProgress = {
|
||||
completed: number;
|
||||
total: number;
|
||||
label?: string;
|
||||
report: (update: MemorySyncProgressUpdate) => void;
|
||||
};
|
||||
|
||||
export function clearMemorySyncedSessionFiles(params: {
|
||||
sessionsDirtyFiles: Set<string>;
|
||||
|
||||
@@ -28,6 +28,29 @@ describe("memory manager readonly recovery", () => {
|
||||
let workspaceDir = "";
|
||||
let indexPath = "";
|
||||
|
||||
function createQueuedSyncHarness(syncing: Promise<void>) {
|
||||
const queuedSessionFiles = new Set<string>();
|
||||
let queuedSessionSync: Promise<void> | null = null;
|
||||
const sync = vi.fn(async () => {});
|
||||
return {
|
||||
queuedSessionFiles,
|
||||
get queuedSessionSync() {
|
||||
return queuedSessionSync;
|
||||
},
|
||||
sync,
|
||||
state: {
|
||||
isClosed: () => false,
|
||||
getSyncing: () => syncing,
|
||||
getQueuedSessionFiles: () => queuedSessionFiles,
|
||||
getQueuedSessionSync: () => queuedSessionSync,
|
||||
setQueuedSessionSync: (value: Promise<void> | null) => {
|
||||
queuedSessionSync = value;
|
||||
},
|
||||
sync,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function _createMemoryConfig(): OpenClawConfig {
|
||||
return _createMemorySyncControlConfigForTests(workspaceDir, indexPath);
|
||||
}
|
||||
@@ -56,9 +79,9 @@ describe("memory manager readonly recovery", () => {
|
||||
readonlyRecoveryLastError: undefined,
|
||||
ensureProviderInitialized: vi.fn(async () => {}),
|
||||
enqueueTargetedSessionSync: vi.fn(async () => {}),
|
||||
runSync: vi.fn(),
|
||||
runSync: vi.fn(async (_params) => undefined) as ReadonlyRecoveryHarness["runSync"],
|
||||
openDatabase: vi.fn(() => reopenedDb),
|
||||
ensureSchema: vi.fn(),
|
||||
ensureSchema: vi.fn(() => undefined) as ReadonlyRecoveryHarness["ensureSchema"],
|
||||
readMeta: vi.fn(() => undefined),
|
||||
};
|
||||
return {
|
||||
@@ -168,15 +191,9 @@ describe("memory manager readonly recovery", () => {
|
||||
const pendingSync = new Promise<void>((resolve) => {
|
||||
releaseSync = () => resolve();
|
||||
});
|
||||
const harness = {
|
||||
closed: false,
|
||||
syncing: pendingSync,
|
||||
queuedSessionFiles: new Set<string>(),
|
||||
queuedSessionSync: null as Promise<void> | null,
|
||||
sync: vi.fn(async () => {}),
|
||||
};
|
||||
const harness = createQueuedSyncHarness(pendingSync);
|
||||
|
||||
const queued = enqueueMemoryTargetedSessionSync(harness, [
|
||||
const queued = enqueueMemoryTargetedSessionSync(harness.state, [
|
||||
" /tmp/first.jsonl ",
|
||||
"",
|
||||
"/tmp/second.jsonl",
|
||||
@@ -200,19 +217,13 @@ describe("memory manager readonly recovery", () => {
|
||||
const pendingSync = new Promise<void>((resolve) => {
|
||||
releaseSync = () => resolve();
|
||||
});
|
||||
const harness = {
|
||||
closed: false,
|
||||
syncing: pendingSync,
|
||||
queuedSessionFiles: new Set<string>(),
|
||||
queuedSessionSync: null as Promise<void> | null,
|
||||
sync: vi.fn(async () => {}),
|
||||
};
|
||||
const harness = createQueuedSyncHarness(pendingSync);
|
||||
|
||||
const first = enqueueMemoryTargetedSessionSync(harness, [
|
||||
const first = enqueueMemoryTargetedSessionSync(harness.state, [
|
||||
"/tmp/first.jsonl",
|
||||
"/tmp/second.jsonl",
|
||||
]);
|
||||
const second = enqueueMemoryTargetedSessionSync(harness, [
|
||||
const second = enqueueMemoryTargetedSessionSync(harness.state, [
|
||||
"/tmp/second.jsonl",
|
||||
"/tmp/third.jsonl",
|
||||
]);
|
||||
@@ -234,15 +245,9 @@ describe("memory manager readonly recovery", () => {
|
||||
const pendingSync = new Promise<void>((resolve) => {
|
||||
releaseSync = () => resolve();
|
||||
});
|
||||
const harness = {
|
||||
closed: false,
|
||||
syncing: pendingSync,
|
||||
queuedSessionFiles: new Set<string>(),
|
||||
queuedSessionSync: null as Promise<void> | null,
|
||||
sync: vi.fn(async () => {}),
|
||||
};
|
||||
const harness = createQueuedSyncHarness(pendingSync);
|
||||
|
||||
const queued = enqueueMemoryTargetedSessionSync(harness, ["", " "]);
|
||||
const queued = enqueueMemoryTargetedSessionSync(harness.state, ["", " "]);
|
||||
|
||||
expect(queued).toBe(pendingSync);
|
||||
releaseSync();
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import type { DatabaseSync } from "node:sqlite";
|
||||
import { type FSWatcher } from "chokidar";
|
||||
import {
|
||||
createSubsystemLogger,
|
||||
resolveAgentDir,
|
||||
resolveAgentWorkspaceDir,
|
||||
resolveMemorySearchConfig,
|
||||
@@ -53,6 +55,7 @@ const EMBEDDING_CACHE_TABLE = "embedding_cache";
|
||||
const BATCH_FAILURE_LIMIT = 2;
|
||||
|
||||
const MEMORY_INDEX_MANAGER_CACHE_KEY = Symbol.for("openclaw.memoryIndexManagerCache");
|
||||
const log = createSubsystemLogger("memory");
|
||||
|
||||
const { cache: INDEX_CACHE, pending: INDEX_CACHE_PENDING } =
|
||||
resolveSingletonManagedCache<MemoryIndexManager>(MEMORY_INDEX_MANAGER_CACHE_KEY);
|
||||
@@ -542,7 +545,19 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
}
|
||||
|
||||
private enqueueTargetedSessionSync(sessionFiles?: string[]): Promise<void> {
|
||||
return enqueueMemoryTargetedSessionSync(this, sessionFiles);
|
||||
return enqueueMemoryTargetedSessionSync(
|
||||
{
|
||||
isClosed: () => this.closed,
|
||||
getSyncing: () => this.syncing,
|
||||
getQueuedSessionFiles: () => this.queuedSessionFiles,
|
||||
getQueuedSessionSync: () => this.queuedSessionSync,
|
||||
setQueuedSessionSync: (value) => {
|
||||
this.queuedSessionSync = value;
|
||||
},
|
||||
sync: async (params) => await this.sync(params),
|
||||
},
|
||||
sessionFiles,
|
||||
);
|
||||
}
|
||||
|
||||
private isReadonlyDbError(err: unknown): boolean {
|
||||
@@ -650,7 +665,16 @@ export class MemoryIndexManager extends MemoryManagerEmbeddingOps implements Mem
|
||||
status(): MemoryProviderStatus {
|
||||
const sourceFilter = this.buildSourceFilter();
|
||||
const aggregateState = collectMemoryStatusAggregate({
|
||||
db: this.db,
|
||||
db: {
|
||||
prepare: (sql) => ({
|
||||
all: (...args) =>
|
||||
this.db.prepare(sql).all(...args) as Array<{
|
||||
kind: "files" | "chunks";
|
||||
source: MemorySource;
|
||||
c: number;
|
||||
}>,
|
||||
}),
|
||||
},
|
||||
sources: this.sources,
|
||||
sourceFilterSql: sourceFilter.sql,
|
||||
sourceFilterParams: sourceFilter.params,
|
||||
|
||||
Reference in New Issue
Block a user