fix: reconcile sqlite rebase state seams

This commit is contained in:
Peter Steinberger
2026-05-17 04:08:41 +01:00
parent 3bb0c560cb
commit a73fa405ca
5 changed files with 202 additions and 83 deletions

View File

@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import {
resolveMemorySessionStartupDirtyFiles,
resolveMemorySessionStartupDirtyTranscripts,
resolveMemorySessionSyncPlan,
} from "./manager-session-sync-state.js";
@@ -76,45 +76,59 @@ describe("memory session sync state", () => {
expect(plan.activeSourceKeys).toEqual(new Set(["session:incremental"]));
});
it("marks missing and changed startup session files dirty", () => {
const dirtyFiles = resolveMemorySessionStartupDirtyFiles({
files: [
it("marks missing and changed startup session transcripts dirty", () => {
const dirtyTranscripts = resolveMemorySessionStartupDirtyTranscripts({
transcripts: [
{
absPath: "/tmp/sessions/unchanged.jsonl",
path: "sessions/unchanged.jsonl",
mtimeMs: 100,
scopeKey: "main\0unchanged",
sourceKey: "session:unchanged",
updatedAt: 100,
size: 10,
},
{
absPath: "/tmp/sessions/newer.jsonl",
path: "sessions/newer.jsonl",
mtimeMs: 250,
scopeKey: "main\0newer",
sourceKey: "session:newer",
updatedAt: 250,
size: 20,
},
{
absPath: "/tmp/sessions/resized.jsonl",
path: "sessions/resized.jsonl",
mtimeMs: 300,
scopeKey: "main\0resized",
sourceKey: "session:resized",
updatedAt: 300,
size: 31,
},
{
absPath: "/tmp/sessions/missing.jsonl",
path: "sessions/missing.jsonl",
mtimeMs: 400,
scopeKey: "main\0missing",
sourceKey: "session:missing",
updatedAt: 400,
size: 40,
},
],
existingRows: [
{ path: "sessions/unchanged.jsonl", hash: "hash-unchanged", mtime: 100, size: 10 },
{ path: "sessions/newer.jsonl", hash: "hash-newer", mtime: 200, size: 20 },
{ path: "sessions/resized.jsonl", hash: "hash-resized", mtime: 300, size: 30 },
{
sourceKey: "session:unchanged",
path: "transcript:main:unchanged",
hash: "hash-unchanged",
mtime: 100,
size: 10,
},
{
sourceKey: "session:newer",
path: "transcript:main:newer",
hash: "hash-newer",
mtime: 200,
size: 20,
},
{
sourceKey: "session:resized",
path: "transcript:main:resized",
hash: "hash-resized",
mtime: 300,
size: 30,
},
],
});
expect(dirtyFiles).toEqual([
"/tmp/sessions/newer.jsonl",
"/tmp/sessions/resized.jsonl",
"/tmp/sessions/missing.jsonl",
]);
expect(dirtyTranscripts).toEqual(["main\0newer", "main\0resized", "main\0missing"]);
});
});

View File

@@ -5,6 +5,38 @@ export type MemorySessionSyncScope = {
sessionId: string;
};
export type MemorySessionStartupTranscriptState = {
scopeKey: string;
sourceKey: string;
updatedAt: number;
size: number;
};
export function resolveMemorySessionStartupDirtyTranscripts(params: {
transcripts: MemorySessionStartupTranscriptState[];
existingRows?: MemorySourceFileStateRow[] | null;
}): string[] {
const indexedRows = new Map((params.existingRows ?? []).map((row) => [row.sourceKey, row]));
const dirtyTranscripts: string[] = [];
for (const transcript of params.transcripts) {
const existing = indexedRows.get(transcript.sourceKey);
if (!existing) {
dirtyTranscripts.push(transcript.scopeKey);
continue;
}
const indexedMtimeMs = Number(existing.mtime);
const indexedSize = Number(existing.size);
if (!Number.isFinite(indexedMtimeMs) || !Number.isFinite(indexedSize)) {
dirtyTranscripts.push(transcript.scopeKey);
continue;
}
if (transcript.size !== indexedSize || transcript.updatedAt > indexedMtimeMs) {
dirtyTranscripts.push(transcript.scopeKey);
}
}
return dirtyTranscripts;
}
export function resolveMemorySessionSyncPlan(params: {
needsFullReindex: boolean;
transcripts: MemorySessionSyncScope[];

View File

@@ -1,11 +1,7 @@
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import {
resolveSessionTranscriptsDirForAgent,
type OpenClawConfig,
type ResolvedMemorySearchConfig,
import type {
OpenClawConfig,
ResolvedMemorySearchConfig,
} from "openclaw/plugin-sdk/memory-core-host-engine-foundation";
import type {
MemorySource,
@@ -14,6 +10,24 @@ import type {
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { MemoryManagerSyncOps } from "./manager-sync-ops.js";
const { listSessionTranscriptScopesForAgentMock, readSessionTranscriptDeltaStatsMock } =
vi.hoisted(() => ({
listSessionTranscriptScopesForAgentMock: vi.fn(),
readSessionTranscriptDeltaStatsMock: vi.fn(),
}));
vi.mock("openclaw/plugin-sdk/memory-core-host-engine-session-transcripts", async (importOriginal) => {
const actual =
await importOriginal<
typeof import("openclaw/plugin-sdk/memory-core-host-engine-session-transcripts")
>();
return {
...actual,
listSessionTranscriptScopesForAgent: listSessionTranscriptScopesForAgentMock,
readSessionTranscriptDeltaStats: readSessionTranscriptDeltaStatsMock,
};
});
type MemoryIndexEntry = {
path: string;
absPath: string;
@@ -26,11 +40,17 @@ type MemoryIndexEntry = {
type SyncParams = {
reason?: string;
force?: boolean;
sessionFiles?: string[];
sessionTranscriptScopes?: Array<{ agentId: string; sessionId: string }>;
progress?: (update: MemorySyncProgressUpdate) => void;
};
type SourceStateRow = { path: string; hash: string; mtime: number; size: number };
type SourceStateRow = {
sourceKey: string;
path: string | null;
hash: string;
mtime?: number;
size?: number;
};
class SessionStartupCatchupHarness extends MemoryManagerSyncOps {
protected readonly cfg = {} as OpenClawConfig;
@@ -76,12 +96,12 @@ class SessionStartupCatchupHarness extends MemoryManagerSyncOps {
return await this.runSessionStartupCatchup();
}
async markStartupDirtyFiles(): Promise<string[]> {
return await this.markSessionStartupCatchupDirtyFiles();
async markStartupDirtyTranscripts(): Promise<string[]> {
return await this.markSessionStartupCatchupDirtyTranscripts();
}
getDirtySessionFiles(): string[] {
return Array.from(this.sessionsDirtyFiles);
getDirtySessionTranscripts(): string[] {
return Array.from(this.dirtySessionTranscripts);
}
isSessionsDirty(): boolean {
@@ -119,81 +139,68 @@ class SessionStartupCatchupHarness extends MemoryManagerSyncOps {
}
describe("session startup catch-up", () => {
let stateDir = "";
beforeEach(async () => {
stateDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-startup-"));
vi.stubEnv("OPENCLAW_STATE_DIR", stateDir);
beforeEach(() => {
listSessionTranscriptScopesForAgentMock.mockResolvedValue([
{ agentId: "main", sessionId: "thread" },
]);
readSessionTranscriptDeltaStatsMock.mockReturnValue({
size: 128,
messageCount: 3,
updatedAt: 200,
});
});
afterEach(async () => {
vi.unstubAllEnvs();
await fs.rm(stateDir, { recursive: true, force: true });
afterEach(() => {
vi.resetAllMocks();
});
async function writeSessionFile(
name: string,
): Promise<{ filePath: string; size: number; mtimeMs: number }> {
const sessionsDir = resolveSessionTranscriptsDirForAgent("main");
await fs.mkdir(sessionsDir, { recursive: true });
const filePath = path.join(sessionsDir, name);
await fs.writeFile(
filePath,
JSON.stringify({ type: "message", message: { role: "user", content: "startup catchup" } }) +
"\n",
"utf-8",
);
const stat = await fs.stat(filePath);
return { filePath, size: stat.size, mtimeMs: stat.mtimeMs };
}
it("marks stale indexed session files dirty and schedules catch-up sync", async () => {
const session = await writeSessionFile("thread.jsonl");
it("marks stale indexed session transcripts dirty and schedules catch-up sync", async () => {
const harness = new SessionStartupCatchupHarness([
{
path: "sessions/main/thread.jsonl",
sourceKey: "session:thread",
path: "transcript:main:thread",
hash: "old-hash",
mtime: session.mtimeMs - 1000,
size: session.size,
mtime: 100,
size: 128,
},
]);
await expect(harness.catchUp()).resolves.toEqual([session.filePath]);
expect(harness.getDirtySessionFiles()).toEqual([session.filePath]);
await expect(harness.catchUp()).resolves.toEqual(["main\0thread"]);
expect(harness.getDirtySessionTranscripts()).toEqual(["main\0thread"]);
expect(harness.isSessionsDirty()).toBe(true);
expect(harness.syncCalls).toEqual([{ reason: "session-startup-catchup" }]);
});
it("can mark startup catch-up files without scheduling background sync", async () => {
const session = await writeSessionFile("thread.jsonl");
it("can mark startup catch-up transcripts without scheduling background sync", async () => {
const harness = new SessionStartupCatchupHarness([
{
path: "sessions/main/thread.jsonl",
sourceKey: "session:thread",
path: "transcript:main:thread",
hash: "old-hash",
mtime: session.mtimeMs - 1000,
size: session.size,
mtime: 100,
size: 128,
},
]);
await expect(harness.markStartupDirtyFiles()).resolves.toEqual([session.filePath]);
expect(harness.getDirtySessionFiles()).toEqual([session.filePath]);
await expect(harness.markStartupDirtyTranscripts()).resolves.toEqual(["main\0thread"]);
expect(harness.getDirtySessionTranscripts()).toEqual(["main\0thread"]);
expect(harness.isSessionsDirty()).toBe(true);
expect(harness.syncCalls).toEqual([]);
});
it("leaves unchanged indexed session files clean", async () => {
const session = await writeSessionFile("thread.jsonl");
it("leaves unchanged indexed session transcripts clean", async () => {
const harness = new SessionStartupCatchupHarness([
{
path: "sessions/main/thread.jsonl",
sourceKey: "session:thread",
path: "transcript:main:thread",
hash: "current-hash",
mtime: session.mtimeMs,
size: session.size,
mtime: 200,
size: 128,
},
]);
await expect(harness.catchUp()).resolves.toEqual([]);
expect(harness.getDirtySessionFiles()).toEqual([]);
expect(harness.getDirtySessionTranscripts()).toEqual([]);
expect(harness.isSessionsDirty()).toBe(false);
expect(harness.syncCalls).toEqual([]);
});

View File

@@ -53,7 +53,11 @@ import {
type MemoryIndexMeta,
} from "./manager-reindex-state.js";
import { shouldSyncSessionsForReindex } from "./manager-session-reindex.js";
import { resolveMemorySessionSyncPlan } from "./manager-session-sync-state.js";
import {
resolveMemorySessionStartupDirtyTranscripts,
resolveMemorySessionSyncPlan,
type MemorySessionStartupTranscriptState,
} from "./manager-session-sync-state.js";
import {
loadMemorySourceFileState,
resolveMemorySourceExistingHash,
@@ -749,6 +753,65 @@ export abstract class MemoryManagerSyncOps {
});
}
protected ensureSessionStartupCatchup(): void {
if (!this.sources.has("sessions")) {
return;
}
void this.runSessionStartupCatchup().catch((err) => {
log.warn("memory session startup catch-up failed: " + String(err));
});
}
protected async markSessionStartupCatchupDirtyTranscripts(): Promise<string[]> {
if (!this.sources.has("sessions") || this.closed) {
return [];
}
const scopes = await listSessionTranscriptScopesForAgent(this.agentId);
if (scopes.length === 0 || this.closed) {
return [];
}
const existingRows = loadMemorySourceFileState({
db: this.db,
source: "sessions",
}).rows;
const transcripts: MemorySessionStartupTranscriptState[] = [];
for (const scope of scopes) {
const stats = readSessionTranscriptDeltaStats(scope);
if (!stats) {
continue;
}
transcripts.push({
scopeKey: sessionTranscriptScopeKey(scope),
sourceKey: sessionTranscriptSourceKeyForScope(scope),
updatedAt: stats.updatedAt,
size: stats.size,
});
}
const dirtyTranscripts = resolveMemorySessionStartupDirtyTranscripts({
transcripts,
existingRows,
});
if (dirtyTranscripts.length === 0 || this.closed) {
return dirtyTranscripts;
}
for (const transcript of dirtyTranscripts) {
this.dirtySessionTranscripts.add(transcript);
}
this.sessionsDirty = true;
return dirtyTranscripts;
}
protected async runSessionStartupCatchup(): Promise<string[]> {
const dirtyTranscripts = await this.markSessionStartupCatchupDirtyTranscripts();
if (dirtyTranscripts.length === 0 || this.closed) {
return dirtyTranscripts;
}
void this.sync({ reason: "session-startup-catchup" }).catch((err) => {
log.warn("memory sync failed (session-startup-catchup): " + String(err));
});
return dirtyTranscripts;
}
private scheduleSessionDirty(sessionTranscript: string) {
this.pendingSessionTranscripts.add(sessionTranscript);
if (this.sessionWatchTimer) {
@@ -1275,6 +1338,9 @@ export abstract class MemoryManagerSyncOps {
});
const targetSessionTranscriptKeys = this.normalizeTargetSessionTranscripts(params);
const hasTargetSessionTranscripts = targetSessionTranscriptKeys !== null;
if (params?.reason === "cli" && !params.force && !hasTargetSessionTranscripts) {
await this.markSessionStartupCatchupDirtyTranscripts();
}
const targetedSessionSync = await runMemoryTargetedSessionSync({
hasSessionSource: this.sources.has("sessions"),
targetSessionTranscriptKeys,

View File

@@ -440,7 +440,7 @@ const CORE_SECRET_TARGET_REGISTRY: SecretTargetRegistryEntry[] = [
{
id: "tools.web.fetch.firecrawl.apiKey",
targetType: "tools.web.fetch.firecrawl.apiKey",
configFile: "openclaw.json",
store: "openclaw.json",
pathPattern: "tools.web.fetch.firecrawl.apiKey",
secretShape: SECRET_INPUT_SHAPE,
expectedResolvedValue: "string",