diff --git a/extensions/matrix/doctor-contract-api.test.ts b/extensions/matrix/doctor-contract-api.test.ts new file mode 100644 index 00000000000..7c7eb63f27c --- /dev/null +++ b/extensions/matrix/doctor-contract-api.test.ts @@ -0,0 +1,124 @@ +// Matrix tests cover doctor contract state migrations. +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts"; +import type { + OpenKeyedStoreOptions, + PluginStateKeyedStore, +} from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + createPluginStateKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import type { PluginDoctorStateMigrationContext } from "openclaw/plugin-sdk/runtime-doctor"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { stateMigrations } from "./doctor-contract-api.js"; +import { SqliteBackedMatrixSyncStore } from "./src/matrix/client/file-sync-store.js"; +import { installMatrixTestRuntime } from "./src/test-runtime.js"; + +function createContext(): PluginDoctorStateMigrationContext { + return { + openPluginStateKeyedStore: (options: OpenKeyedStoreOptions): PluginStateKeyedStore => + createPluginStateKeyedStoreForTests("matrix", options), + }; +} + +function createMigrationParams(stateDir: string) { + return { + config: {} as OpenClawConfig, + env: { OPENCLAW_STATE_DIR: stateDir }, + stateDir, + oauthDir: path.join(stateDir, "oauth"), + context: createContext(), + }; +} + +describe("matrix doctor contract state migrations", () => { + const tempDirs: string[] = []; + + beforeEach(() => { + resetPluginStateStoreForTests(); + installMatrixTestRuntime(); + }); + + afterEach(() => { + resetPluginStateStoreForTests(); + for (const dir of tempDirs.splice(0)) { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + it("migrates legacy sync cache JSON to SQLite plugin state", async () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-doctor-")); + tempDirs.push(stateDir); + const storageRootDir = path.join( + stateDir, + "matrix", + "accounts", + "default", + "matrix.example.org__bot", + "token-hash", + ); + fs.mkdirSync(storageRootDir, { recursive: true }); + fs.writeFileSync( + path.join(storageRootDir, "bot-storage.json"), + JSON.stringify({ + version: 1, + savedSync: { + nextBatch: "legacy-token", + accountData: [], + roomsData: { + join: {}, + invite: {}, + leave: {}, + knock: {}, + }, + }, + cleanShutdown: true, + }), + ); + + const migration = stateMigrations[0]; + await expect(migration.detectLegacyState(createMigrationParams(stateDir))).resolves.toEqual({ + preview: [`Matrix sync cache JSON can migrate to SQLite: ${storageRootDir}`], + }); + + await expect(migration.migrateLegacyState(createMigrationParams(stateDir))).resolves.toEqual({ + changes: [ + `Migrated Matrix sync cache JSON to SQLite for ${storageRootDir}`, + `Archived Matrix sync cache legacy source -> ${path.join(storageRootDir, "bot-storage.json")}.migrated`, + ], + warnings: [], + }); + + const store = new SqliteBackedMatrixSyncStore(storageRootDir); + expect(store.hasSavedSync()).toBe(true); + expect(store.hasSavedSyncFromCleanShutdown()).toBe(true); + await expect(store.getSavedSyncToken()).resolves.toBe("legacy-token"); + expect(fs.existsSync(path.join(storageRootDir, "bot-storage.json"))).toBe(false); + }); + + it("does not archive the legacy flat sync cache into an unread SQLite root", async () => { + const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-doctor-")); + tempDirs.push(stateDir); + const flatRoot = path.join(stateDir, "matrix"); + fs.mkdirSync(flatRoot, { recursive: true }); + fs.writeFileSync( + path.join(flatRoot, "bot-storage.json"), + JSON.stringify({ + next_batch: "flat-token", + rooms: { join: {} }, + account_data: { events: [] }, + }), + ); + + const migration = stateMigrations[0]; + await expect(migration.detectLegacyState(createMigrationParams(stateDir))).resolves.toBeNull(); + await expect(migration.migrateLegacyState(createMigrationParams(stateDir))).resolves.toEqual({ + changes: [], + warnings: [], + }); + expect(fs.existsSync(path.join(flatRoot, "bot-storage.json"))).toBe(true); + }); +}); diff --git a/extensions/matrix/doctor-contract-api.ts b/extensions/matrix/doctor-contract-api.ts index 394ddd9b827..0d8b7673567 100644 --- a/extensions/matrix/doctor-contract-api.ts +++ b/extensions/matrix/doctor-contract-api.ts @@ -1,2 +1,117 @@ +import type { Dirent } from "node:fs"; // Matrix API module exposes the plugin public contract. +import fs from "node:fs/promises"; +import path from "node:path"; +import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + hasMatrixSyncCacheStateInStore, + openMatrixSyncCacheStoreOptions, + readLegacyMatrixSyncCacheState, + writeMatrixSyncCacheStateToStore, + type MatrixSyncCacheRecord, +} from "./src/matrix/client/file-sync-store.js"; + export { normalizeCompatibilityConfig, legacyConfigRules } from "./src/doctor-contract.js"; + +const MATRIX_SYNC_CACHE_FILENAME = "bot-storage.json"; + +async function fileExists(filePath: string): Promise { + try { + const stat = await fs.stat(filePath); + return stat.isFile(); + } catch { + return false; + } +} + +async function collectLegacySyncCacheRoots(stateDir: string): Promise { + const matrixRoot = path.join(stateDir, "matrix"); + const roots: string[] = []; + async function visit(dir: string): Promise { + let entries: Dirent[]; + try { + entries = await fs.readdir(dir, { withFileTypes: true }); + } catch { + return; + } + for (const entry of entries) { + const entryPath = path.join(dir, entry.name); + if (entry.isFile() && entry.name === MATRIX_SYNC_CACHE_FILENAME) { + roots.push(dir); + continue; + } + if (entry.isDirectory()) { + await visit(entryPath); + } + } + } + await visit(matrixRoot); + return roots.filter((root) => path.resolve(root) !== path.resolve(matrixRoot)).toSorted(); +} + +async function archiveLegacySyncCache(params: { + storageRootDir: string; + changes: string[]; + warnings: string[]; +}): Promise { + const sourcePath = path.join(params.storageRootDir, MATRIX_SYNC_CACHE_FILENAME); + const archivedPath = `${sourcePath}.migrated`; + if (await fileExists(archivedPath)) { + params.warnings.push( + `Left migrated Matrix sync cache in place because ${archivedPath} already exists`, + ); + return; + } + try { + await fs.rename(sourcePath, archivedPath); + params.changes.push(`Archived Matrix sync cache legacy source -> ${archivedPath}`); + } catch (err) { + params.warnings.push(`Failed archiving Matrix sync cache legacy source: ${String(err)}`); + } +} + +export const stateMigrations: PluginDoctorStateMigration[] = [ + { + id: "matrix-sync-cache-json-to-plugin-state", + label: "Matrix sync cache", + async detectLegacyState(params) { + const previews: string[] = []; + for (const storageRootDir of await collectLegacySyncCacheRoots(params.stateDir)) { + const persisted = await readLegacyMatrixSyncCacheState(storageRootDir); + if (!persisted) { + continue; + } + previews.push(`Matrix sync cache JSON can migrate to SQLite: ${storageRootDir}`); + } + return previews.length > 0 ? { preview: previews } : null; + }, + async migrateLegacyState(params) { + const changes: string[] = []; + const warnings: string[] = []; + for (const storageRootDir of await collectLegacySyncCacheRoots(params.stateDir)) { + const persisted = await readLegacyMatrixSyncCacheState(storageRootDir); + if (!persisted) { + continue; + } + const store = params.context.openPluginStateKeyedStore( + openMatrixSyncCacheStoreOptions(storageRootDir), + ); + if (await hasMatrixSyncCacheStateInStore({ storageRootDir, store })) { + warnings.push( + `Skipped Matrix sync cache import for ${storageRootDir} because SQLite already has sync cache state`, + ); + await archiveLegacySyncCache({ storageRootDir, changes, warnings }); + continue; + } + await writeMatrixSyncCacheStateToStore({ + storageRootDir, + payload: persisted, + store, + }); + changes.push(`Migrated Matrix sync cache JSON to SQLite for ${storageRootDir}`); + await archiveLegacySyncCache({ storageRootDir, changes, warnings }); + } + return { changes, warnings }; + }, + }, +]; diff --git a/extensions/matrix/src/matrix/client/create-client.test.ts b/extensions/matrix/src/matrix/client/create-client.test.ts index 25245c4f5ef..3a2460c459f 100644 --- a/extensions/matrix/src/matrix/client/create-client.test.ts +++ b/extensions/matrix/src/matrix/client/create-client.test.ts @@ -77,7 +77,7 @@ describe("createMatrixClient", () => { encryption: undefined, localTimeoutMs: undefined, initialSyncLimit: undefined, - storagePath: storagePaths.storagePath, + storageRootDir: storagePaths.rootDir, recoveryKeyPath: storagePaths.recoveryKeyPath, idbSnapshotPath: storagePaths.idbSnapshotPath, cryptoDatabasePrefix: "openclaw-matrix-default-token-hash", @@ -103,7 +103,7 @@ describe("createMatrixClient", () => { encryption: undefined, localTimeoutMs: undefined, initialSyncLimit: undefined, - storagePath: undefined, + storageRootDir: undefined, recoveryKeyPath: undefined, idbSnapshotPath: undefined, cryptoDatabasePrefix: undefined, @@ -131,7 +131,7 @@ describe("createMatrixClient", () => { encryption: undefined, localTimeoutMs: undefined, initialSyncLimit: undefined, - storagePath: undefined, + storageRootDir: undefined, recoveryKeyPath: undefined, idbSnapshotPath: undefined, cryptoDatabasePrefix: undefined, @@ -156,7 +156,7 @@ describe("createMatrixClient", () => { encryption: undefined, localTimeoutMs: undefined, initialSyncLimit: undefined, - storagePath: undefined, + storageRootDir: undefined, recoveryKeyPath: undefined, idbSnapshotPath: undefined, cryptoDatabasePrefix: undefined, @@ -183,7 +183,7 @@ describe("createMatrixClient", () => { encryption: undefined, localTimeoutMs: undefined, initialSyncLimit: undefined, - storagePath: undefined, + storageRootDir: undefined, recoveryKeyPath: undefined, idbSnapshotPath: undefined, cryptoDatabasePrefix: undefined, diff --git a/extensions/matrix/src/matrix/client/create-client.ts b/extensions/matrix/src/matrix/client/create-client.ts index 70da38eedc8..3f3d1bc85c2 100644 --- a/extensions/matrix/src/matrix/client/create-client.ts +++ b/extensions/matrix/src/matrix/client/create-client.ts @@ -94,7 +94,7 @@ export async function createMatrixClient(params: { encryption: params.encryption, localTimeoutMs: params.localTimeoutMs, initialSyncLimit: params.initialSyncLimit, - storagePath: storagePaths?.storagePath, + storageRootDir: storagePaths?.rootDir, recoveryKeyPath: storagePaths?.recoveryKeyPath, idbSnapshotPath: storagePaths?.idbSnapshotPath, cryptoDatabasePrefix, diff --git a/extensions/matrix/src/matrix/client/file-sync-store.test.ts b/extensions/matrix/src/matrix/client/file-sync-store.test.ts index b6553334a46..268b703d46c 100644 --- a/extensions/matrix/src/matrix/client/file-sync-store.test.ts +++ b/extensions/matrix/src/matrix/client/file-sync-store.test.ts @@ -1,11 +1,20 @@ -// Matrix tests cover file sync store plugin behavior. +// Matrix tests cover sync cache plugin behavior. import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import type { ISyncResponse } from "matrix-js-sdk/lib/matrix.js"; -import * as jsonStore from "openclaw/plugin-sdk/json-store"; -import { afterEach, describe, expect, it, vi } from "vitest"; -import { FileBackedMatrixSyncStore } from "./file-sync-store.js"; +import { + createPluginStateSyncKeyedStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { getMatrixRuntime } from "../../runtime.js"; +import { installMatrixTestRuntime } from "../../test-runtime.js"; +import { + openMatrixSyncCacheStoreOptions, + SqliteBackedMatrixSyncStore, + type MatrixSyncCacheRecord, +} from "./file-sync-store.js"; function createSyncResponse(nextBatch: string): ISyncResponse { return { @@ -52,44 +61,40 @@ function createSyncResponse(nextBatch: string): ISyncResponse { }; } -function createDeferred() { - let resolve: (() => void) | undefined; - const promise = new Promise((resolvePromise) => { - resolve = resolvePromise; - }); - if (!resolve) { - throw new Error("Expected deferred resolver to be initialized"); - } - return { promise, resolve }; -} - -describe("FileBackedMatrixSyncStore", () => { +describe("SqliteBackedMatrixSyncStore", () => { const tempDirs: string[] = []; - function createStoragePath(): string { + function createStorageRoot(): string { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); tempDirs.push(tempDir); - return path.join(tempDir, "bot-storage.json"); + return tempDir; } + beforeEach(() => { + resetPluginStateStoreForTests(); + installMatrixTestRuntime(); + }); + afterEach(() => { vi.restoreAllMocks(); vi.useRealTimers(); for (const dir of tempDirs.splice(0)) { fs.rmSync(dir, { recursive: true, force: true }); } + resetPluginStateStoreForTests(); }); it("persists sync data so restart resumes from the saved cursor", async () => { - const storagePath = createStoragePath(); + const storageRoot = createStorageRoot(); const syncResponse = createSyncResponse("s123"); - const firstStore = new FileBackedMatrixSyncStore(storagePath); + const firstStore = new SqliteBackedMatrixSyncStore(storageRoot); expect(firstStore.hasSavedSync()).toBe(false); await firstStore.setSyncData(syncResponse); await firstStore.flush(); + expect(fs.existsSync(path.join(storageRoot, "bot-storage.json"))).toBe(false); - const secondStore = new FileBackedMatrixSyncStore(storagePath); + const secondStore = new SqliteBackedMatrixSyncStore(storageRoot); expect(secondStore.hasSavedSync()).toBe(true); await expect(secondStore.getSavedSyncToken()).resolves.toBe("s123"); @@ -133,11 +138,58 @@ describe("FileBackedMatrixSyncStore", () => { expect(secondStore.hasSavedSyncFromCleanShutdown()).toBe(false); }); + it("restores the sync cache after the storage root moves", async () => { + const storageRoot = createStorageRoot(); + const movedStorageRoot = `${storageRoot}-moved`; + + const firstStore = new SqliteBackedMatrixSyncStore(storageRoot); + await firstStore.setSyncData(createSyncResponse("portable-token")); + await firstStore.flush(); + resetPluginStateStoreForTests(); + fs.renameSync(storageRoot, movedStorageRoot); + tempDirs.push(movedStorageRoot); + + const secondStore = new SqliteBackedMatrixSyncStore(movedStorageRoot); + expect(secondStore.hasSavedSync()).toBe(true); + await expect(secondStore.getSavedSyncToken()).resolves.toBe("portable-token"); + }); + + it("ignores metadata with impossible chunk counts", async () => { + const storageRoot = createStorageRoot(); + const store = createPluginStateSyncKeyedStoreForTests( + "matrix", + openMatrixSyncCacheStoreOptions(storageRoot), + ); + store.register("current:meta", { + kind: "meta", + version: 1, + generation: "corrupt", + chunkCount: 20_000, + cleanShutdown: true, + }); + + const syncStore = new SqliteBackedMatrixSyncStore(storageRoot); + expect(syncStore.hasSavedSync()).toBe(false); + await expect(syncStore.getSavedSyncToken()).resolves.toBe(null); + }); + + it("fails persistence instead of silently dropping sync data when sqlite is unavailable", async () => { + const storageRoot = createStorageRoot(); + const runtime = getMatrixRuntime(); + vi.spyOn(runtime.state, "openSyncKeyedStore").mockImplementation(() => { + throw new Error("sqlite unavailable"); + }); + + const syncStore = new SqliteBackedMatrixSyncStore(storageRoot); + await syncStore.setSyncData(createSyncResponse("unavailable-token")); + + await expect(syncStore.flush()).rejects.toThrow(/sqlite store is unavailable/i); + }); + it("claims current-token storage ownership when sync state is persisted", async () => { - const storagePath = createStoragePath(); - const rootDir = path.dirname(storagePath); + const storageRoot = createStorageRoot(); fs.writeFileSync( - path.join(rootDir, "storage-meta.json"), + path.join(storageRoot, "storage-meta.json"), JSON.stringify({ homeserver: "https://matrix.example.org", userId: "@bot:example.org", @@ -148,50 +200,50 @@ describe("FileBackedMatrixSyncStore", () => { "utf8", ); - const store = new FileBackedMatrixSyncStore(storagePath); + const store = new SqliteBackedMatrixSyncStore(storageRoot); await store.setSyncData(createSyncResponse("claimed-token")); await store.flush(); - const meta = JSON.parse(fs.readFileSync(path.join(rootDir, "storage-meta.json"), "utf8")) as { - currentTokenStateClaimed?: boolean; - }; + const meta = JSON.parse( + fs.readFileSync(path.join(storageRoot, "storage-meta.json"), "utf8"), + ) as { currentTokenStateClaimed?: boolean }; expect(meta.currentTokenStateClaimed).toBe(true); }); it("only treats sync state as restart-safe after a clean shutdown persist", async () => { - const storagePath = createStoragePath(); + const storageRoot = createStorageRoot(); - const firstStore = new FileBackedMatrixSyncStore(storagePath); + const firstStore = new SqliteBackedMatrixSyncStore(storageRoot); await firstStore.setSyncData(createSyncResponse("s123")); await firstStore.flush(); - const afterDirtyPersist = new FileBackedMatrixSyncStore(storagePath); + const afterDirtyPersist = new SqliteBackedMatrixSyncStore(storageRoot); expect(afterDirtyPersist.hasSavedSync()).toBe(true); expect(afterDirtyPersist.hasSavedSyncFromCleanShutdown()).toBe(false); firstStore.markCleanShutdown(); await firstStore.flush(); - const afterCleanShutdown = new FileBackedMatrixSyncStore(storagePath); + const afterCleanShutdown = new SqliteBackedMatrixSyncStore(storageRoot); expect(afterCleanShutdown.hasSavedSync()).toBe(true); expect(afterCleanShutdown.hasSavedSyncFromCleanShutdown()).toBe(true); }); it("clears the clean-shutdown marker once fresh sync data arrives", async () => { - const storagePath = createStoragePath(); + const storageRoot = createStorageRoot(); - const firstStore = new FileBackedMatrixSyncStore(storagePath); + const firstStore = new SqliteBackedMatrixSyncStore(storageRoot); await firstStore.setSyncData(createSyncResponse("s123")); firstStore.markCleanShutdown(); await firstStore.flush(); - const restartedStore = new FileBackedMatrixSyncStore(storagePath); + const restartedStore = new SqliteBackedMatrixSyncStore(storageRoot); expect(restartedStore.hasSavedSyncFromCleanShutdown()).toBe(true); await restartedStore.setSyncData(createSyncResponse("s456")); await restartedStore.flush(); - const afterNewSync = new FileBackedMatrixSyncStore(storagePath); + const afterNewSync = new SqliteBackedMatrixSyncStore(storageRoot); expect(afterNewSync.hasSavedSync()).toBe(true); expect(afterNewSync.hasSavedSyncFromCleanShutdown()).toBe(false); await expect(afterNewSync.getSavedSyncToken()).resolves.toBe("s456"); @@ -199,128 +251,46 @@ describe("FileBackedMatrixSyncStore", () => { it("coalesces background persistence until the debounce window elapses", async () => { vi.useFakeTimers(); - const storagePath = createStoragePath(); - const writeSpy = vi.spyOn(jsonStore, "writeJsonFileAtomically").mockResolvedValue(); + const storageRoot = createStorageRoot(); - const store = new FileBackedMatrixSyncStore(storagePath); + const store = new SqliteBackedMatrixSyncStore(storageRoot); await store.setSyncData(createSyncResponse("s111")); await store.setSyncData(createSyncResponse("s222")); await store.storeClientOptions({ lazyLoadMembers: true }); - expect(writeSpy).not.toHaveBeenCalled(); + const beforeDebounce = new SqliteBackedMatrixSyncStore(storageRoot); + expect(beforeDebounce.hasSavedSync()).toBe(false); await vi.advanceTimersByTimeAsync(249); - expect(writeSpy).not.toHaveBeenCalled(); + const beforeElapsed = new SqliteBackedMatrixSyncStore(storageRoot); + expect(beforeElapsed.hasSavedSync()).toBe(false); await vi.advanceTimersByTimeAsync(1); await Promise.resolve(); - expect(writeSpy).toHaveBeenCalledTimes(1); - expect(writeSpy.mock.calls.at(0)).toEqual([ - storagePath, - { - version: 1, - savedSync: { - nextBatch: "s222", - accountData: createSyncResponse("s222").account_data.events, - roomsData: { - join: { - "!room:example.org": { - summary: { - "m.heroes": [], - "m.invited_member_count": undefined, - "m.joined_member_count": undefined, - }, - state: { events: [] }, - "org.matrix.msc4222.state_after": { events: [] }, - timeline: { - events: [ - { - content: { - body: "hello", - msgtype: "m.text", - }, - event_id: "$message", - origin_server_ts: 1, - sender: "@user:example.org", - type: "m.room.message", - }, - { - content: { - body: "hello", - msgtype: "m.text", - }, - event_id: "$message", - origin_server_ts: 1, - sender: "@user:example.org", - type: "m.room.message", - }, - ], - prev_batch: "t0", - }, - ephemeral: { events: [] }, - account_data: { events: [] }, - unread_notifications: {}, - unread_thread_notifications: undefined, - msc4354_sticky: undefined, - }, - }, - invite: {}, - leave: {}, - knock: {}, - }, - }, - cleanShutdown: false, - clientOptions: { - lazyLoadMembers: true, - }, - }, - ]); - await store.flush(); - }); - it("waits for an in-flight persist when shutdown flush runs", async () => { - vi.useFakeTimers(); - const storagePath = createStoragePath(); - const writeDeferred = createDeferred(); - const writeSpy = vi - .spyOn(jsonStore, "writeJsonFileAtomically") - .mockImplementation(async () => writeDeferred.promise); - - const store = new FileBackedMatrixSyncStore(storagePath); - await store.setSyncData(createSyncResponse("s777")); - await vi.advanceTimersByTimeAsync(250); - - let flushCompleted = false; - const flushPromise = store.flush().then(() => { - flushCompleted = true; - }); - - await Promise.resolve(); - expect(writeSpy).toHaveBeenCalledTimes(1); - expect(flushCompleted).toBe(false); - - writeDeferred.resolve(); - await flushPromise; - expect(flushCompleted).toBe(true); + const persisted = new SqliteBackedMatrixSyncStore(storageRoot); + expect(persisted.hasSavedSync()).toBe(true); + await expect(persisted.getSavedSyncToken()).resolves.toBe("s222"); + await expect(persisted.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true }); }); it("persists client options alongside sync state", async () => { - const storagePath = createStoragePath(); + const storageRoot = createStorageRoot(); - const firstStore = new FileBackedMatrixSyncStore(storagePath); + const firstStore = new SqliteBackedMatrixSyncStore(storageRoot); await firstStore.storeClientOptions({ lazyLoadMembers: true }); await firstStore.flush(); - const secondStore = new FileBackedMatrixSyncStore(storagePath); + const secondStore = new SqliteBackedMatrixSyncStore(storageRoot); await expect(secondStore.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true }); }); - it("loads legacy raw sync payloads from bot-storage.json", async () => { - const storagePath = createStoragePath(); + it("ignores legacy raw sync cache files", async () => { + const storageRoot = createStorageRoot(); fs.writeFileSync( - storagePath, + path.join(storageRoot, "bot-storage.json"), JSON.stringify({ next_batch: "legacy-token", rooms: { @@ -333,18 +303,8 @@ describe("FileBackedMatrixSyncStore", () => { "utf8", ); - const store = new FileBackedMatrixSyncStore(storagePath); - expect(store.hasSavedSync()).toBe(true); - await expect(store.getSavedSyncToken()).resolves.toBe("legacy-token"); - await expect(store.getSavedSync()).resolves.toEqual({ - nextBatch: "legacy-token", - roomsData: { - join: {}, - invite: {}, - leave: {}, - knock: {}, - }, - accountData: [], - }); + const store = new SqliteBackedMatrixSyncStore(storageRoot); + expect(store.hasSavedSync()).toBe(false); + await expect(store.getSavedSyncToken()).resolves.toBe(null); }); }); diff --git a/extensions/matrix/src/matrix/client/file-sync-store.ts b/extensions/matrix/src/matrix/client/file-sync-store.ts index 348b64dca36..78847476236 100644 --- a/extensions/matrix/src/matrix/client/file-sync-store.ts +++ b/extensions/matrix/src/matrix/client/file-sync-store.ts @@ -1,5 +1,5 @@ -// Matrix plugin module implements file sync store behavior. -import { readFileSync } from "node:fs"; +// Matrix plugin module implements SQLite sync cache behavior. +import { createHash, randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import path from "node:path"; import { @@ -11,22 +11,56 @@ import { type ISyncResponse, type IStoredClientOpts, } from "matrix-js-sdk/lib/matrix.js"; -import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store"; +import type { + PluginStateKeyedStore, + PluginStateSyncKeyedStore, +} from "openclaw/plugin-sdk/plugin-state-runtime"; import { isRecord } from "../../record-shared.js"; +import { getMatrixRuntime } from "../../runtime.js"; import { createAsyncLock } from "../async-lock.js"; import { LogService } from "../sdk/logger.js"; +import { resolveMatrixSqliteStateEnv } from "../sqlite-state.js"; import { claimCurrentTokenStorageState } from "./storage.js"; const STORE_VERSION = 1; const PERSIST_DEBOUNCE_MS = 250; +const SYNC_CACHE_NAMESPACE = "sync-cache"; +const SYNC_CACHE_MAX_ENTRIES = 20_000; +const SYNC_CACHE_MAX_CHUNKS = Math.floor((SYNC_CACHE_MAX_ENTRIES - 1) / 2); +const SYNC_CACHE_STATE_KEY = "current"; +// PluginState serializes this string inside a row object; 24KB leaves room for JSON escaping. +const SYNC_CACHE_CHUNK_BYTES = 24_000; -type PersistedMatrixSyncStore = { +export type PersistedMatrixSyncStore = { version: number; savedSync: ISyncData | null; clientOptions?: IStoredClientOpts; cleanShutdown?: boolean; }; +type MatrixSyncCacheMeta = { + kind: "meta"; + version: number; + generation: string; + chunkCount: number; + syncDigest?: string; + clientOptions?: IStoredClientOpts; + cleanShutdown?: boolean; +}; + +type MatrixSyncCacheChunk = { + kind: "sync-chunk"; + index: number; + data: string; +}; + +export type MatrixSyncCacheRecord = MatrixSyncCacheMeta | MatrixSyncCacheChunk; + +type MatrixSyncCacheAsyncStore = Pick< + PluginStateKeyedStore, + "delete" | "entries" | "lookup" | "register" +>; + function normalizeRoomsData(value: unknown): IRooms | null { if (!isRecord(value)) { return null; @@ -80,36 +114,30 @@ function toPersistedSyncData(value: unknown): ISyncData | null { return null; } -function readPersistedStore(raw: string): PersistedMatrixSyncStore | null { - try { - const parsed = JSON.parse(raw) as { - version?: unknown; - savedSync?: unknown; - clientOptions?: unknown; - cleanShutdown?: unknown; - }; - const savedSync = toPersistedSyncData(parsed.savedSync); - if (parsed.version === STORE_VERSION) { - return { - version: STORE_VERSION, - savedSync, - clientOptions: isRecord(parsed.clientOptions) - ? (parsed.clientOptions as IStoredClientOpts) - : undefined, - cleanShutdown: parsed.cleanShutdown === true, - }; - } - - // Backward-compat: prior Matrix state files stored the raw sync blob at the - // top level without versioning or wrapped metadata. - return { - version: STORE_VERSION, - savedSync: toPersistedSyncData(parsed), - cleanShutdown: false, - }; - } catch { +function normalizePersistedStore(value: unknown): PersistedMatrixSyncStore | null { + if (!isRecord(value) || value.version !== STORE_VERSION) { return null; } + return { + version: STORE_VERSION, + savedSync: toPersistedSyncData(value.savedSync), + clientOptions: isRecord(value.clientOptions) + ? (value.clientOptions as IStoredClientOpts) + : undefined, + cleanShutdown: value.cleanShutdown === true, + }; +} + +function normalizeLegacyPersistedStore(value: unknown): PersistedMatrixSyncStore | null { + const persisted = normalizePersistedStore(value); + if (persisted) { + return persisted; + } + return { + version: STORE_VERSION, + savedSync: toPersistedSyncData(value), + cleanShutdown: false, + }; } function cloneJson(value: T): T { @@ -126,9 +154,12 @@ function syncDataToSyncResponse(syncData: ISyncData): ISyncResponse { }; } -export class FileBackedMatrixSyncStore extends MemoryStore { +export class SqliteBackedMatrixSyncStore extends MemoryStore { private readonly persistLock = createAsyncLock(); private readonly accumulator = new SyncAccumulator(); + private readonly stateKey: string; + private readonly store: PluginStateSyncKeyedStore; + private readonly storeUnavailableError: unknown; private savedSync: ISyncData | null = null; private savedClientOptions: IStoredClientOpts | undefined; private readonly hadSavedSyncOnLoad: boolean; @@ -138,21 +169,29 @@ export class FileBackedMatrixSyncStore extends MemoryStore { private persistTimer: NodeJS.Timeout | null = null; private persistPromise: Promise | null = null; - constructor(private readonly storagePath: string) { + constructor(private readonly storageRootDir: string) { super(); + this.stateKey = resolveSyncCacheStateKey(storageRootDir); let restoredSavedSync: ISyncData | null = null; let restoredClientOptions: IStoredClientOpts | undefined; let restoredCleanShutdown = false; + let syncCacheStore = createNoopMatrixSyncCacheStore(); + let syncCacheStoreUnavailableError: unknown; try { - const raw = readFileSync(this.storagePath, "utf8"); - const persisted = readPersistedStore(raw); - restoredSavedSync = persisted?.savedSync ?? null; - restoredClientOptions = persisted?.clientOptions; - restoredCleanShutdown = persisted?.cleanShutdown === true; - } catch { - // Missing or unreadable sync cache should not block startup. + syncCacheStore = openMatrixSyncCacheStore(storageRootDir); + const persisted = readPersistedStoreFromSyncStore(syncCacheStore, this.stateKey); + if (persisted) { + restoredSavedSync = persisted.savedSync; + restoredClientOptions = persisted.clientOptions; + restoredCleanShutdown = persisted.cleanShutdown === true; + } + } catch (err) { + syncCacheStoreUnavailableError = err; + LogService.warn("MatrixSyncCacheStore", "Failed to load Matrix sync cache:", err); } + this.store = syncCacheStore; + this.storeUnavailableError = syncCacheStoreUnavailableError; this.savedSync = restoredSavedSync; this.savedClientOptions = restoredClientOptions; @@ -219,6 +258,7 @@ export class FileBackedMatrixSyncStore extends MemoryStore { } override async deleteAllData(): Promise { + this.assertStoreAvailable(); if (this.persistTimer) { clearTimeout(this.persistTimer); this.persistTimer = null; @@ -229,7 +269,15 @@ export class FileBackedMatrixSyncStore extends MemoryStore { this.savedSync = null; this.savedClientOptions = undefined; this.cleanShutdown = false; - await fs.rm(this.storagePath, { force: true }).catch(() => undefined); + this.store.delete(metaKey(this.stateKey)); + for (const row of this.store.entries()) { + if (row.key.startsWith(chunkKeyPrefix(this.stateKey))) { + this.store.delete(row.key); + } + } + await fs + .rm(resolveLegacySyncCachePath(this.storageRootDir), { force: true }) + .catch(() => undefined); } markCleanShutdown(): void { @@ -261,13 +309,14 @@ export class FileBackedMatrixSyncStore extends MemoryStore { this.persistTimer = setTimeout(() => { this.persistTimer = null; void this.flush().catch((err: unknown) => { - LogService.warn("MatrixFileSyncStore", "Failed to persist Matrix sync store:", err); + LogService.warn("MatrixSyncCacheStore", "Failed to persist Matrix sync store:", err); }); }, PERSIST_DEBOUNCE_MS); this.persistTimer.unref?.(); } private async persist(): Promise { + this.assertStoreAvailable(); this.dirty = false; const payload: PersistedMatrixSyncStore = { version: STORE_VERSION, @@ -277,9 +326,9 @@ export class FileBackedMatrixSyncStore extends MemoryStore { }; try { await this.persistLock(async () => { - await writeJsonFileAtomically(this.storagePath, payload); + this.writePersistedStore(payload); claimCurrentTokenStorageState({ - rootDir: path.dirname(this.storagePath), + rootDir: this.storageRootDir, }); }); } catch (err) { @@ -287,4 +336,273 @@ export class FileBackedMatrixSyncStore extends MemoryStore { throw err; } } + + private writePersistedStore(payload: PersistedMatrixSyncStore): void { + const rows = buildSyncCacheRows(this.stateKey, payload); + for (const row of rows.chunks) { + this.store.register(row.key, row.value); + } + this.store.register(rows.meta.key, rows.meta.value); + for (const row of this.store.entries()) { + if (row.key.startsWith(chunkKeyPrefix(this.stateKey)) && !rows.nextChunkKeys.has(row.key)) { + this.store.delete(row.key); + } + } + } + + private assertStoreAvailable(): void { + if (this.storeUnavailableError == null) { + return; + } + throw new Error("Matrix sync cache SQLite store is unavailable; cannot persist sync state", { + cause: this.storeUnavailableError, + }); + } +} + +function createNoopMatrixSyncCacheStore(): PluginStateSyncKeyedStore { + return { + register: () => {}, + registerIfAbsent: () => false, + lookup: () => undefined, + consume: () => undefined, + delete: () => false, + entries: () => [], + clear: () => {}, + }; +} + +function readPersistedStoreFromSyncStore( + store: PluginStateSyncKeyedStore, + stateKey: string, +): PersistedMatrixSyncStore | null { + const meta = store.lookup(metaKey(stateKey)); + if (!isSyncCacheMeta(meta)) { + return null; + } + const chunks: string[] = []; + for (let index = 0; index < meta.chunkCount; index += 1) { + const chunk = store.lookup(chunkKey(stateKey, meta.generation, index)); + if (!isSyncCacheChunk(chunk) || chunk.index !== index) { + return normalizePersistedStore({ + version: STORE_VERSION, + savedSync: null, + clientOptions: meta.clientOptions, + cleanShutdown: false, + }); + } + chunks.push(chunk.data); + } + let savedSync: ISyncData | null = null; + if (chunks.length > 0) { + const syncJson = chunks.join(""); + if (meta.syncDigest !== digestText(syncJson)) { + return normalizePersistedStore({ + version: STORE_VERSION, + savedSync: null, + clientOptions: meta.clientOptions, + cleanShutdown: false, + }); + } + try { + savedSync = toPersistedSyncData(JSON.parse(syncJson)); + } catch { + savedSync = null; + } + } + return normalizePersistedStore({ + version: STORE_VERSION, + savedSync, + clientOptions: meta.clientOptions, + cleanShutdown: meta.cleanShutdown, + }); +} + +function openMatrixSyncCacheStore( + storageRootDir: string, +): PluginStateSyncKeyedStore { + return getMatrixRuntime().state.openSyncKeyedStore( + openMatrixSyncCacheStoreOptions(storageRootDir), + ); +} + +function resolveSyncCacheStateKey(_storageRootDir: string): string { + return SYNC_CACHE_STATE_KEY; +} + +function metaKey(stateKey: string): string { + return `${stateKey}:meta`; +} + +function chunkKeyPrefix(stateKey: string): string { + return `${stateKey}:sync:`; +} + +function chunkKey(stateKey: string, generation: string, index: number): string { + return `${chunkKeyPrefix(stateKey)}${generation}:${index}`; +} + +function resolveLegacySyncCachePath(storageRootDir: string): string { + return path.join(storageRootDir, "bot-storage.json"); +} + +function digestText(value: string): string { + return createHash("sha256").update(value, "utf8").digest("hex"); +} + +function isSyncCacheMeta(value: unknown): value is MatrixSyncCacheMeta { + return ( + isRecord(value) && + value.kind === "meta" && + value.version === STORE_VERSION && + typeof value.generation === "string" && + value.generation.trim() !== "" && + typeof value.chunkCount === "number" && + Number.isSafeInteger(value.chunkCount) && + value.chunkCount >= 0 && + value.chunkCount <= SYNC_CACHE_MAX_CHUNKS + ); +} + +function isSyncCacheChunk(value: unknown): value is MatrixSyncCacheChunk { + return ( + isRecord(value) && + value.kind === "sync-chunk" && + typeof value.index === "number" && + Number.isSafeInteger(value.index) && + value.index >= 0 && + typeof value.data === "string" + ); +} + +function chunkSyncCacheJson(value: string): string[] { + const chunks: string[] = []; + const pushChunk = (chunk: string) => { + if (chunks.length >= SYNC_CACHE_MAX_CHUNKS) { + throw new Error("Matrix sync cache exceeds SQLite chunk limit"); + } + chunks.push(chunk); + }; + let current = ""; + let currentBytes = 0; + for (const char of value) { + const charBytes = Buffer.byteLength(char, "utf8"); + if (current && currentBytes + charBytes > SYNC_CACHE_CHUNK_BYTES) { + pushChunk(current); + current = ""; + currentBytes = 0; + } + current += char; + currentBytes += charBytes; + } + if (current) { + pushChunk(current); + } + return chunks; +} + +function buildSyncCacheRows( + stateKey: string, + payload: PersistedMatrixSyncStore, +): { + meta: { key: string; value: MatrixSyncCacheMeta }; + chunks: { key: string; value: MatrixSyncCacheChunk }[]; + nextChunkKeys: Set; +} { + const generation = randomUUID().replaceAll("-", ""); + const syncJson = payload.savedSync ? JSON.stringify(payload.savedSync) : ""; + const chunkValues = syncJson ? chunkSyncCacheJson(syncJson) : []; + const chunks = chunkValues.map((data, index) => ({ + key: chunkKey(stateKey, generation, index), + value: { + kind: "sync-chunk" as const, + index, + data, + }, + })); + return { + chunks, + nextChunkKeys: new Set(chunks.map((chunk) => chunk.key)), + meta: { + key: metaKey(stateKey), + value: { + kind: "meta", + version: STORE_VERSION, + generation, + chunkCount: chunks.length, + ...(syncJson ? { syncDigest: digestText(syncJson) } : {}), + ...(payload.clientOptions ? { clientOptions: payload.clientOptions } : {}), + cleanShutdown: payload.cleanShutdown === true, + }, + }, + }; +} + +export async function readLegacyMatrixSyncCacheState( + storageRootDir: string, +): Promise { + try { + const raw = await fs.readFile(resolveLegacySyncCachePath(storageRootDir), "utf8"); + const persisted = normalizeLegacyPersistedStore(JSON.parse(raw)); + if (!persisted?.savedSync && !persisted?.clientOptions) { + return null; + } + return persisted; + } catch { + return null; + } +} + +export async function hasMatrixSyncCacheStateInStore(params: { + storageRootDir: string; + store: Pick, "lookup">; +}): Promise { + const stateKey = resolveSyncCacheStateKey(params.storageRootDir); + const meta = await params.store.lookup(metaKey(stateKey)); + if (!isSyncCacheMeta(meta) || meta.chunkCount <= 0) { + return false; + } + const chunks: string[] = []; + for (let index = 0; index < meta.chunkCount; index += 1) { + const chunk = await params.store.lookup(chunkKey(stateKey, meta.generation, index)); + if (!isSyncCacheChunk(chunk) || chunk.index !== index) { + return false; + } + chunks.push(chunk.data); + } + const syncJson = chunks.join(""); + if (meta.syncDigest !== digestText(syncJson)) { + return false; + } + try { + return toPersistedSyncData(JSON.parse(syncJson)) !== null; + } catch { + return false; + } +} + +export async function writeMatrixSyncCacheStateToStore(params: { + storageRootDir: string; + payload: PersistedMatrixSyncStore; + store: MatrixSyncCacheAsyncStore; +}): Promise { + const stateKey = resolveSyncCacheStateKey(params.storageRootDir); + const rows = buildSyncCacheRows(stateKey, params.payload); + for (const row of rows.chunks) { + await params.store.register(row.key, row.value); + } + await params.store.register(rows.meta.key, rows.meta.value); + for (const row of await params.store.entries()) { + if (row.key.startsWith(chunkKeyPrefix(stateKey)) && !rows.nextChunkKeys.has(row.key)) { + await params.store.delete(row.key); + } + } +} + +export function openMatrixSyncCacheStoreOptions(storageRootDir: string) { + return { + namespace: SYNC_CACHE_NAMESPACE, + maxEntries: SYNC_CACHE_MAX_ENTRIES, + env: resolveMatrixSqliteStateEnv({ stateDir: storageRootDir }), + }; } diff --git a/extensions/matrix/src/matrix/client/storage.test.ts b/extensions/matrix/src/matrix/client/storage.test.ts index 127a1d22747..a8e9e2adc50 100644 --- a/extensions/matrix/src/matrix/client/storage.test.ts +++ b/extensions/matrix/src/matrix/client/storage.test.ts @@ -5,6 +5,7 @@ import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; import { resolveMatrixAccountStorageRoot } from "../../storage-paths.js"; import { installMatrixTestRuntime } from "../../test-runtime.js"; +import { SqliteBackedMatrixSyncStore } from "./file-sync-store.js"; import { claimCurrentTokenStorageState, maybeMigrateLegacyStorage, @@ -257,6 +258,23 @@ describe("matrix client storage paths", () => { return legacyRoot; } + function legacySyncCacheBody(nextBatch = "legacy-token"): string { + return JSON.stringify({ + version: 1, + savedSync: { + nextBatch, + accountData: [], + roomsData: { + join: {}, + invite: {}, + leave: {}, + knock: {}, + }, + }, + cleanShutdown: true, + }); + } + function writeJson(rootDir: string, filename: string, value: Record) { fs.writeFileSync(path.join(rootDir, filename), JSON.stringify(value, null, 2)); } @@ -385,7 +403,9 @@ describe("matrix client storage paths", () => { it("falls back to migrating the older flat matrix storage layout", async () => { const stateDir = setupStateDir(); const storagePaths = resolveDefaultStoragePaths(); - const legacyRoot = writeLegacyMatrixStorage(stateDir, { storageBody: '{"legacy":true}' }); + const legacyRoot = writeLegacyMatrixStorage(stateDir, { + storageBody: legacySyncCacheBody("legacy-token"), + }); const env = createMigrationEnv(stateDir); await maybeMigrateLegacyStorage({ @@ -395,8 +415,48 @@ describe("matrix client storage paths", () => { expectFallbackMigrationSnapshot(env); expect(fs.existsSync(path.join(legacyRoot, "bot-storage.json"))).toBe(false); - expect(fs.readFileSync(storagePaths.storagePath, "utf8")).toBe('{"legacy":true}'); + expect(fs.existsSync(path.join(legacyRoot, "bot-storage.json.migrated"))).toBe(true); + expect(fs.existsSync(storagePaths.storagePath)).toBe(false); expect(fs.existsSync(storagePaths.cryptoPath)).toBe(true); + const syncStore = new SqliteBackedMatrixSyncStore(storagePaths.rootDir); + expect(syncStore.hasSavedSync()).toBe(true); + await expect(syncStore.getSavedSyncToken()).resolves.toBe("legacy-token"); + }); + + it("migrates the previous account-scoped sync cache into sqlite before startup", async () => { + const stateDir = setupStateDir(); + const storagePaths = resolveDefaultStoragePaths(); + fs.mkdirSync(storagePaths.rootDir, { recursive: true }); + fs.writeFileSync(storagePaths.storagePath, legacySyncCacheBody("account-token")); + const env = createMigrationEnv(stateDir); + + await maybeMigrateLegacyStorage({ + storagePaths, + env, + }); + + expectFallbackMigrationSnapshot(env); + expect(fs.existsSync(storagePaths.storagePath)).toBe(false); + expect(fs.existsSync(`${storagePaths.storagePath}.migrated`)).toBe(true); + const syncStore = new SqliteBackedMatrixSyncStore(storagePaths.rootDir); + expect(syncStore.hasSavedSync()).toBe(true); + await expect(syncStore.getSavedSyncToken()).resolves.toBe("account-token"); + }); + + it("ignores unrecognized account-scoped sync cache files without a migration snapshot", async () => { + const stateDir = setupStateDir(); + const storagePaths = resolveDefaultStoragePaths(); + fs.mkdirSync(storagePaths.rootDir, { recursive: true }); + fs.writeFileSync(storagePaths.storagePath, '{"new":true}'); + const env = createMigrationEnv(stateDir); + + await maybeMigrateLegacyStorage({ + storagePaths, + env, + }); + + expect(maybeCreateMatrixMigrationSnapshotMock).not.toHaveBeenCalled(); + expect(fs.readFileSync(storagePaths.storagePath, "utf8")).toBe('{"new":true}'); }); it("continues migrating whichever legacy artifact is still missing", async () => { @@ -414,6 +474,7 @@ describe("matrix client storage paths", () => { expectFallbackMigrationSnapshot(env); expect(fs.readFileSync(storagePaths.storagePath, "utf8")).toBe('{"new":true}'); + expect(fs.existsSync(path.join(legacyRoot, "bot-storage.json"))).toBe(false); expect(fs.existsSync(path.join(legacyRoot, "crypto"))).toBe(false); expect(fs.existsSync(storagePaths.cryptoPath)).toBe(true); }); @@ -565,6 +626,41 @@ describe("matrix client storage paths", () => { expect(rotatedStoragePaths.storagePath).toBe(oldStoragePaths.storagePath); }); + it("prefers claimed current-token state over an empty new-token metadata root", () => { + const stateDir = setupStateDir(); + const oldStoragePaths = seedCanonicalStorageRoot({ + stateDir, + accessToken: "secret-token-old", + storageMeta: { + homeserver: defaultStorageAuth.homeserver, + userId: defaultStorageAuth.userId, + accountId: "default", + accessTokenHash: resolveDefaultStoragePaths({ accessToken: "secret-token-old" }).tokenHash, + currentTokenStateClaimed: true, + deviceId: "DEVICE123", + }, + }); + seedCanonicalStorageRoot({ + stateDir, + accessToken: "secret-token-new", + storageMeta: { + homeserver: defaultStorageAuth.homeserver, + userId: defaultStorageAuth.userId, + accountId: "default", + accessTokenHash: resolveDefaultStoragePaths({ accessToken: "secret-token-new" }).tokenHash, + deviceId: "DEVICE123", + }, + }); + + const rotatedStoragePaths = resolveDefaultStoragePaths({ + accessToken: "secret-token-new", + deviceId: "DEVICE123", + }); + + expect(rotatedStoragePaths.rootDir).toBe(oldStoragePaths.rootDir); + expect(rotatedStoragePaths.tokenHash).toBe(oldStoragePaths.tokenHash); + }); + it("does not reuse a populated older token-hash root while deviceId is unknown", () => { const stateDir = setupStateDir(); const oldStoragePaths = seedExistingStorageRoot({ diff --git a/extensions/matrix/src/matrix/client/storage.ts b/extensions/matrix/src/matrix/client/storage.ts index 2716e2d4bb1..b8e7f3edc82 100644 --- a/extensions/matrix/src/matrix/client/storage.ts +++ b/extensions/matrix/src/matrix/client/storage.ts @@ -30,6 +30,11 @@ type LegacyMoveRecord = { label: string; }; +type LegacyArchiveRecord = { + sourcePath: string; + label: string; +}; + type StoredRootMetadata = { homeserver?: string; userId?: string; @@ -41,12 +46,12 @@ type StoredRootMetadata = { }; function resolveLegacyStoragePaths(env: NodeJS.ProcessEnv = process.env): { + rootDir: string; storagePath: string; cryptoPath: string; } { const stateDir = getMatrixRuntime().state.resolveStateDir(env, os.homedir); - const legacy = resolveMatrixLegacyFlatStoragePaths(stateDir); - return { storagePath: legacy.storagePath, cryptoPath: legacy.cryptoPath }; + return resolveMatrixLegacyFlatStoragePaths(stateDir); } function assertLegacyMigrationAccountSelection(params: { accountKey: string }): void { @@ -71,7 +76,7 @@ function assertLegacyMigrationAccountSelection(params: { accountKey: string }): function scoreStorageRoot(rootDir: string): number { let score = 0; - if (fs.existsSync(path.join(rootDir, "bot-storage.json"))) { + if (readStoredRootMetadata(rootDir).currentTokenStateClaimed === true) { score += 8; } if (fs.existsSync(path.join(rootDir, "crypto"))) { @@ -336,23 +341,33 @@ export async function maybeMigrateLegacyStorage(params: { env?: NodeJS.ProcessEnv; }): Promise { const legacy = resolveLegacyStoragePaths(params.env); - const hasLegacyStorage = fs.existsSync(legacy.storagePath); + const hasFlatLegacyStorageFile = fs.existsSync(legacy.storagePath); + const hasAccountScopedLegacyStorageFile = fs.existsSync(params.storagePaths.storagePath); + const syncCache = + hasFlatLegacyStorageFile || hasAccountScopedLegacyStorageFile + ? await import("./file-sync-store.js") + : null; + const hasFlatLegacyStorage = + hasFlatLegacyStorageFile && + (await syncCache?.readLegacyMatrixSyncCacheState(legacy.rootDir)) !== null; + const hasAccountScopedLegacyStorage = + hasAccountScopedLegacyStorageFile && + (await syncCache?.readLegacyMatrixSyncCacheState(params.storagePaths.rootDir)) !== null; const hasLegacyCrypto = fs.existsSync(legacy.cryptoPath); - if (!hasLegacyStorage && !hasLegacyCrypto) { + if (!hasFlatLegacyStorage && !hasAccountScopedLegacyStorage && !hasLegacyCrypto) { return; } - const hasTargetStorage = fs.existsSync(params.storagePaths.storagePath); const hasTargetCrypto = fs.existsSync(params.storagePaths.cryptoPath); - // Continue partial migrations one artifact at a time; only skip items whose targets already exist. - const shouldMigrateStorage = hasLegacyStorage && !hasTargetStorage; const shouldMigrateCrypto = hasLegacyCrypto && !hasTargetCrypto; - if (!shouldMigrateStorage && !shouldMigrateCrypto) { + if (!hasFlatLegacyStorage && !hasAccountScopedLegacyStorage && !shouldMigrateCrypto) { return; } - assertLegacyMigrationAccountSelection({ - accountKey: params.storagePaths.accountKey, - }); + if (hasFlatLegacyStorage || hasLegacyCrypto) { + assertLegacyMigrationAccountSelection({ + accountKey: params.storagePaths.accountKey, + }); + } const logger = getMatrixRuntime().logging.getChildLogger({ module: "matrix-storage" }); const { maybeCreateMatrixMigrationSnapshot } = await import("./migration-snapshot.runtime.js"); @@ -363,19 +378,28 @@ export async function maybeMigrateLegacyStorage(params: { }); fs.mkdirSync(params.storagePaths.rootDir, { recursive: true }); const moved: LegacyMoveRecord[] = []; + const pendingArchives: LegacyArchiveRecord[] = []; const skippedExistingTargets: string[] = []; try { - if (shouldMigrateStorage) { - moveLegacyStoragePathOrThrow({ - sourcePath: legacy.storagePath, - targetPath: params.storagePaths.storagePath, - label: "sync store", + if (hasAccountScopedLegacyStorage) { + await migrateLegacySyncCacheToSqlite({ + sourceRootDir: params.storagePaths.rootDir, + sourcePath: params.storagePaths.storagePath, + targetRootDir: params.storagePaths.rootDir, + label: "account sync cache", moved, + pendingArchives, + }); + } + if (hasFlatLegacyStorage) { + await migrateLegacySyncCacheToSqlite({ + sourceRootDir: legacy.rootDir, + sourcePath: legacy.storagePath, + targetRootDir: params.storagePaths.rootDir, + label: "flat sync cache", + moved, + pendingArchives, }); - } else if (hasLegacyStorage) { - skippedExistingTargets.push( - `- sync store remains at ${legacy.storagePath} because ${params.storagePaths.storagePath} already exists`, - ); } if (shouldMigrateCrypto) { moveLegacyStoragePathOrThrow({ @@ -398,6 +422,12 @@ export async function maybeMigrateLegacyStorage(params: { { cause: err }, ); } + for (const archive of pendingArchives) { + archiveLegacyStoragePath({ + ...archive, + skippedExistingTargets, + }); + } if (moved.length > 0) { logger.info( `matrix: migrated legacy client storage into ${params.storagePaths.rootDir}\n${moved @@ -412,6 +442,63 @@ export async function maybeMigrateLegacyStorage(params: { } } +async function migrateLegacySyncCacheToSqlite(params: { + sourceRootDir: string; + sourcePath: string; + targetRootDir: string; + label: string; + moved: LegacyMoveRecord[]; + pendingArchives: LegacyArchiveRecord[]; +}): Promise { + const syncCache = await import("./file-sync-store.js"); + const persisted = await syncCache.readLegacyMatrixSyncCacheState(params.sourceRootDir); + if (!persisted) { + return; + } + const store = getMatrixRuntime().state.openKeyedStore< + import("./file-sync-store.js").MatrixSyncCacheRecord + >(syncCache.openMatrixSyncCacheStoreOptions(params.targetRootDir)); + if ( + !(await syncCache.hasMatrixSyncCacheStateInStore({ + storageRootDir: params.targetRootDir, + store, + })) + ) { + await syncCache.writeMatrixSyncCacheStateToStore({ + storageRootDir: params.targetRootDir, + payload: persisted, + store, + }); + claimCurrentTokenStorageState({ + rootDir: params.targetRootDir, + }); + params.moved.push({ + sourcePath: params.sourcePath, + targetPath: `${params.targetRootDir} SQLite sync cache`, + label: params.label, + }); + } + params.pendingArchives.push({ + sourcePath: params.sourcePath, + label: params.label, + }); +} + +function archiveLegacyStoragePath(params: { + sourcePath: string; + label: string; + skippedExistingTargets: string[]; +}): void { + const archivedLegacyStoragePath = `${params.sourcePath}.migrated`; + if (fs.existsSync(archivedLegacyStoragePath)) { + params.skippedExistingTargets.push( + `- ${params.label} remains at ${params.sourcePath} because ${archivedLegacyStoragePath} already exists`, + ); + return; + } + fs.renameSync(params.sourcePath, archivedLegacyStoragePath); +} + function moveLegacyStoragePathOrThrow(params: { sourcePath: string; targetPath: string; diff --git a/extensions/matrix/src/matrix/sdk.test.ts b/extensions/matrix/src/matrix/sdk.test.ts index 23f65f37db2..10cbb072647 100644 --- a/extensions/matrix/src/matrix/sdk.test.ts +++ b/extensions/matrix/src/matrix/sdk.test.ts @@ -4,7 +4,9 @@ import { EventEmitter } from "node:events"; import fs from "node:fs"; import os from "node:os"; import path from "node:path"; +import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { installMatrixTestRuntime } from "../test-runtime.js"; function requestUrl(input: RequestInfo | URL | undefined): string { if (!input) { @@ -301,6 +303,8 @@ const { MatrixClient } = await import("./sdk.js"); describe("MatrixClient request hardening", () => { beforeEach(() => { + resetPluginStateStoreForTests(); + installMatrixTestRuntime(); matrixJsClient = createMatrixJsClientStub(); lastCreateClientOpts = null; vi.useRealTimers(); @@ -312,6 +316,7 @@ describe("MatrixClient request hardening", () => { vi.useRealTimers(); vi.unstubAllGlobals(); clearTestUndiciRuntimeDepsOverride(); + resetPluginStateStoreForTests(); }); it("blocks absolute endpoints unless explicitly allowed", async () => { @@ -659,11 +664,10 @@ describe("MatrixClient request hardening", () => { it("wires the sync store into the SDK and flushes it on shutdown", async () => { const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sdk-store-")); - const storagePath = path.join(tempDir, "bot-storage.json"); try { const client = new MatrixClient("https://matrix.example.org", "token", { - storagePath, + storageRootDir: tempDir, }); const store = lastCreateClientOpts?.store as { flush: () => Promise } | undefined; diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index 2063c56b357..0a9dccd3d4f 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -21,7 +21,7 @@ import { } from "openclaw/plugin-sdk/string-coerce-runtime"; import type { SsrFPolicy } from "../runtime-api.js"; import { resolveMatrixRoomKeyBackupReadinessError } from "./backup-health.js"; -import { FileBackedMatrixSyncStore } from "./client/file-sync-store.js"; +import { SqliteBackedMatrixSyncStore } from "./client/file-sync-store.js"; import { createMatrixJsSdkClientLogger } from "./client/logging.js"; import { formatMatrixErrorMessage, @@ -329,7 +329,7 @@ export class MatrixClient { private readonly syncFilter?: IFilterDefinition; private readonly encryptionEnabled: boolean; private readonly password?: string; - private readonly syncStore?: FileBackedMatrixSyncStore; + private readonly syncStore?: SqliteBackedMatrixSyncStore; private readonly idbSnapshotPath?: string; private readonly cryptoDatabasePrefix?: string; private bridgeRegistered = false; @@ -370,7 +370,7 @@ export class MatrixClient { encryption?: boolean; initialSyncLimit?: number; syncFilter?: IFilterDefinition; - storagePath?: string; + storageRootDir?: string; recoveryKeyPath?: string; idbSnapshotPath?: string; cryptoDatabasePrefix?: string; @@ -390,7 +390,9 @@ export class MatrixClient { this.syncFilter = opts.syncFilter; this.encryptionEnabled = opts.encryption === true; this.password = opts.password; - this.syncStore = opts.storagePath ? new FileBackedMatrixSyncStore(opts.storagePath) : undefined; + this.syncStore = opts.storageRootDir + ? new SqliteBackedMatrixSyncStore(opts.storageRootDir) + : undefined; this.idbSnapshotPath = opts.idbSnapshotPath; this.cryptoDatabasePrefix = opts.cryptoDatabasePrefix; this.selfUserId = opts.userId?.trim() || null; diff --git a/extensions/matrix/src/test-runtime.ts b/extensions/matrix/src/test-runtime.ts index 37c9ed79772..e08687f1370 100644 --- a/extensions/matrix/src/test-runtime.ts +++ b/extensions/matrix/src/test-runtime.ts @@ -3,6 +3,11 @@ import { implicitMentionKindWhen, resolveInboundMentionDecision, } from "openclaw/plugin-sdk/channel-mention-gating"; +import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + createPluginStateKeyedStoreForTests, + createPluginStateSyncKeyedStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-test-runtime"; import { vi } from "vitest"; import type { PluginRuntime } from "./runtime-api.js"; import { setMatrixRuntime } from "./runtime.js"; @@ -18,7 +23,10 @@ type MatrixRuntimeStub = { config: Pick; channel?: PluginRuntime["channel"]; logging?: PluginRuntime["logging"]; - state: Pick, "resolveStateDir">; + state: Pick< + NonNullable, + "openKeyedStore" | "openSyncKeyedStore" | "resolveStateDir" + >; }; function createMatrixRuntimeMediaMock( @@ -47,10 +55,17 @@ function createMatrixRuntimeMediaMock( } export function installMatrixTestRuntime(options: MatrixTestRuntimeOptions = {}): void { + const osHomedirForTest = () => "/tmp"; const defaultStateDirResolver: NonNullable["resolveStateDir"] = ( _env, homeDir, ) => options.stateDir ?? (homeDir ?? (() => "/tmp"))(); + const resolvePluginStateEnv = (storeOptions: OpenKeyedStoreOptions): NodeJS.ProcessEnv => ({ + ...(storeOptions.env ?? process.env), + OPENCLAW_STATE_DIR: + storeOptions.env?.OPENCLAW_STATE_DIR?.trim() || + defaultStateDirResolver(storeOptions.env, osHomedirForTest), + }); const getRuntimeConfig = () => options.cfg ?? {}; const logging: PluginRuntime["logging"] | undefined = options.logging ? ({ @@ -74,6 +89,16 @@ export function installMatrixTestRuntime(options: MatrixTestRuntimeOptions = {}) ...(logging ? { logging } : {}), state: { resolveStateDir: defaultStateDirResolver, + openKeyedStore: ((storeOptions: OpenKeyedStoreOptions) => + createPluginStateKeyedStoreForTests("matrix", { + ...storeOptions, + env: resolvePluginStateEnv(storeOptions), + })) as PluginRuntime["state"]["openKeyedStore"], + openSyncKeyedStore: ((storeOptions: OpenKeyedStoreOptions) => + createPluginStateSyncKeyedStoreForTests("matrix", { + ...storeOptions, + env: resolvePluginStateEnv(storeOptions), + })) as PluginRuntime["state"]["openSyncKeyedStore"], }, }; diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-e2ee-destructive.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-e2ee-destructive.ts index 44bd1fdb6c7..bdce9ea5c2a 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-e2ee-destructive.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-e2ee-destructive.ts @@ -33,7 +33,10 @@ import { isMatrixQaExactMarkerReply, type MatrixQaScenarioContext, } from "./scenario-runtime-shared.js"; -import { waitForMatrixSyncStoreWithCursor } from "./scenario-runtime-state-files.js"; +import { + deleteMatrixSyncStoreCursor, + waitForMatrixSyncStoreWithCursor, +} from "./scenario-runtime-state-files.js"; import type { MatrixQaScenarioExecution } from "./scenario-types.js"; type MatrixQaCliRuntime = Awaited>; @@ -1417,7 +1420,7 @@ export async function runMatrixQaE2eeSyncStateLossCryptoIntactScenario( }); await context.restartGatewayAfterStateMutation( async () => { - await rm(syncStore.pathname, { force: true }); + await deleteMatrixSyncStoreCursor(syncStore); }, { timeoutMs: context.timeoutMs, diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts index 7e70bb880d1..667ffb0ce05 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-restart.ts @@ -374,6 +374,8 @@ export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenario await rewriteMatrixSyncStoreCursor({ cursor: staleCursor, pathname: syncStore.pathname, + source: syncStore.source, + stateKey: syncStore.stateKey, }); }); diff --git a/extensions/qa-matrix/src/runners/contract/scenario-runtime-state-files.ts b/extensions/qa-matrix/src/runners/contract/scenario-runtime-state-files.ts index 5cc930cf8d3..ff6ac6b83bf 100644 --- a/extensions/qa-matrix/src/runners/contract/scenario-runtime-state-files.ts +++ b/extensions/qa-matrix/src/runners/contract/scenario-runtime-state-files.ts @@ -1,5 +1,5 @@ // Qa Matrix plugin module implements scenario runtime state files behavior. -import { createHash } from "node:crypto"; +import { createHash, randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import path from "node:path"; import { setTimeout as sleep } from "node:timers/promises"; @@ -9,8 +9,20 @@ import type { MatrixQaScenarioContext } from "./scenario-runtime-shared.js"; const MATRIX_SYNC_STORE_FILENAME = "bot-storage.json"; const MATRIX_INBOUND_DEDUPE_FILENAME = "inbound-dedupe.json"; const MATRIX_PLUGIN_ID = "matrix"; +const MATRIX_SYNC_CACHE_NAMESPACE = "sync-cache"; const MATRIX_INBOUND_DEDUPE_NAMESPACE = "inbound-dedupe"; const MATRIX_STATE_POLL_INTERVAL_MS = 100; +const MATRIX_SYNC_CACHE_MAX_ENTRIES = 20_000; +const MATRIX_SYNC_CACHE_MAX_CHUNKS = Math.floor((MATRIX_SYNC_CACHE_MAX_ENTRIES - 1) / 2); +// PluginState serializes this string inside a row object; 24KB leaves room for JSON escaping. +const MATRIX_SYNC_CACHE_CHUNK_BYTES = 24_000; + +type MatrixSyncStoreCursor = { + cursor: string; + pathname: string; + source: "json" | "sqlite"; + stateKey?: string; +}; async function readJsonFile(pathname: string): Promise { return JSON.parse(await fs.readFile(pathname, "utf8")) as unknown; @@ -80,6 +92,12 @@ function writePersistedMatrixSyncCursor(parsed: unknown, cursor: string): unknow }, }; } + if (typeof parsed.nextBatch === "string") { + return { + ...parsed, + nextBatch: cursor, + }; + } if (typeof parsed.next_batch === "string") { return { ...parsed, @@ -93,11 +111,290 @@ async function readMatrixSyncStoreCursor(pathname: string): Promise, +): MatrixSyncStoreCursor[] { + const rowsByKey = new Map(); + for (const row of rows) { + if (typeof row.entryKey === "string") { + rowsByKey.set(row.entryKey, parsePluginStateJson(row.valueJson)); + } + } + const cursors: MatrixSyncStoreCursor[] = []; + for (const [entryKey, rawMeta] of rowsByKey) { + if (!entryKey.endsWith(":meta") || !isRecord(rawMeta) || rawMeta.kind !== "meta") { + continue; + } + const stateKey = entryKey.slice(0, -":meta".length); + const generation = typeof rawMeta.generation === "string" ? rawMeta.generation : ""; + const chunkCount = + typeof rawMeta.chunkCount === "number" && + Number.isSafeInteger(rawMeta.chunkCount) && + rawMeta.chunkCount <= MATRIX_SYNC_CACHE_MAX_CHUNKS + ? rawMeta.chunkCount + : 0; + const chunks: string[] = []; + for (let index = 0; index < chunkCount; index += 1) { + const chunk = rowsByKey.get(`${stateKey}:sync:${generation}:${index}`); + if (!isRecord(chunk) || typeof chunk.data !== "string") { + chunks.length = 0; + break; + } + chunks.push(chunk.data); + } + if (chunks.length === 0) { + continue; + } + try { + const cursor = readPersistedMatrixSyncCursor({ + savedSync: JSON.parse(chunks.join("")) as unknown, + }); + if (cursor) { + cursors.push({ cursor, pathname: "", source: "sqlite", stateKey }); + } + } catch { + continue; + } + } + return cursors; +} + +async function readMatrixSyncCacheCursorsFromSqlite(params: { + accountId?: string; + context: MatrixQaScenarioContext; + stateDir: string; + userId?: string; +}): Promise { + const databasePaths = await findFilesByName({ + filename: "openclaw.sqlite", + rootDir: params.stateDir, + maxDepth: 10, + }); + const cursors: Array = []; + try { + const sqlite = await import("node:sqlite"); + for (const databasePath of databasePaths) { + try { + const db = new sqlite.DatabaseSync(databasePath, { readOnly: true }); + try { + const rows = db + .prepare( + `SELECT entry_key AS entryKey, value_json AS valueJson + FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND (expires_at IS NULL OR expires_at > ?)`, + ) + .all(MATRIX_PLUGIN_ID, MATRIX_SYNC_CACHE_NAMESPACE, Date.now()) as Array<{ + entryKey?: unknown; + valueJson?: unknown; + }>; + for (const cursor of readMatrixSyncCacheCursorFromRows(rows)) { + const storageRootDir = path.dirname(path.dirname(databasePath)); + cursors.push({ + ...cursor, + pathname: databasePath, + score: await scoreMatrixStateFile({ + context: params.context, + pathname: path.join(storageRootDir, MATRIX_SYNC_STORE_FILENAME), + ...(params.accountId ? { accountId: params.accountId } : {}), + ...(params.userId ? { userId: params.userId } : {}), + }), + }); + } + } finally { + db.close(); + } + } catch { + continue; + } + } + } catch { + return []; + } + return cursors + .toSorted((a, b) => b.score - a.score || a.pathname.localeCompare(b.pathname)) + .map(({ score: _score, ...cursor }) => cursor); +} + +function chunkMatrixSyncCacheJson(value: string): string[] { + const chunks: string[] = []; + let current = ""; + let currentBytes = 0; + for (const char of value) { + const charBytes = Buffer.byteLength(char, "utf8"); + if (current && currentBytes + charBytes > MATRIX_SYNC_CACHE_CHUNK_BYTES) { + chunks.push(current); + current = ""; + currentBytes = 0; + } + current += char; + currentBytes += charBytes; + } + if (current) { + chunks.push(current); + } + return chunks; +} + +function digestText(value: string): string { + return createHash("sha256").update(value, "utf8").digest("hex"); +} + +async function rewriteMatrixSyncCacheRows(params: { + cursor: string; + pathname: string; + stateKey: string; +}) { + const sqlite = await import("node:sqlite"); + const db = new sqlite.DatabaseSync(params.pathname); + try { + const rows = db + .prepare( + `SELECT entry_key AS entryKey, value_json AS valueJson + FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND entry_key LIKE ?`, + ) + .all(MATRIX_PLUGIN_ID, MATRIX_SYNC_CACHE_NAMESPACE, `${params.stateKey}:%`) as Array<{ + entryKey?: unknown; + valueJson?: unknown; + }>; + const meta = parsePluginStateJson( + rows.find((row) => row.entryKey === `${params.stateKey}:meta`)?.valueJson, + ); + if (!isRecord(meta)) { + throw new Error("Matrix sync cache metadata row was missing"); + } + const cursorEntry = readMatrixSyncCacheCursorFromRows(rows)[0]; + if (!cursorEntry) { + throw new Error("Matrix sync cache did not contain a persisted sync cursor"); + } + const generation = typeof meta.generation === "string" ? meta.generation : ""; + const chunkCount = + typeof meta.chunkCount === "number" && + Number.isSafeInteger(meta.chunkCount) && + meta.chunkCount <= MATRIX_SYNC_CACHE_MAX_CHUNKS + ? meta.chunkCount + : 0; + const chunks: string[] = []; + for (let index = 0; index < chunkCount; index += 1) { + const chunk = parsePluginStateJson( + rows.find((row) => row.entryKey === `${params.stateKey}:sync:${generation}:${index}`) + ?.valueJson, + ); + if (!isRecord(chunk) || typeof chunk.data !== "string") { + throw new Error("Matrix sync cache chunk row was missing"); + } + chunks.push(chunk.data); + } + const syncJson = JSON.stringify( + writePersistedMatrixSyncCursor(JSON.parse(chunks.join("")), params.cursor), + ); + const nextGeneration = randomUUID().replaceAll("-", ""); + const nextChunks = chunkMatrixSyncCacheJson(syncJson); + const now = Date.now(); + const upsert = db.prepare( + `INSERT INTO plugin_state_entries (plugin_id, namespace, entry_key, value_json, created_at, expires_at) + VALUES (?, ?, ?, ?, ?, NULL) + ON CONFLICT(plugin_id, namespace, entry_key) + DO UPDATE SET value_json = excluded.value_json, created_at = excluded.created_at, expires_at = NULL`, + ); + for (const [index, data] of nextChunks.entries()) { + upsert.run( + MATRIX_PLUGIN_ID, + MATRIX_SYNC_CACHE_NAMESPACE, + `${params.stateKey}:sync:${nextGeneration}:${index}`, + JSON.stringify({ kind: "sync-chunk", index, data }), + now, + ); + } + upsert.run( + MATRIX_PLUGIN_ID, + MATRIX_SYNC_CACHE_NAMESPACE, + `${params.stateKey}:meta`, + JSON.stringify({ + ...meta, + generation: nextGeneration, + chunkCount: nextChunks.length, + syncDigest: digestText(syncJson), + }), + now, + ); + db.prepare( + `DELETE FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND entry_key LIKE ? + AND entry_key NOT LIKE ?`, + ).run( + MATRIX_PLUGIN_ID, + MATRIX_SYNC_CACHE_NAMESPACE, + `${params.stateKey}:sync:%`, + `${params.stateKey}:sync:${nextGeneration}:%`, + ); + } finally { + db.close(); + } +} + +export async function rewriteMatrixSyncStoreCursor(params: { + cursor: string; + pathname: string; + source?: "json" | "sqlite"; + stateKey?: string; +}) { + if (params.source === "sqlite" || params.stateKey) { + if (!params.stateKey) { + throw new Error("Matrix sync cache rewrite requires a state key"); + } + await rewriteMatrixSyncCacheRows({ + cursor: params.cursor, + pathname: params.pathname, + stateKey: params.stateKey, + }); + return; + } const parsed = await readJsonFile(params.pathname); await writeJsonFile(params.pathname, writePersistedMatrixSyncCursor(parsed, params.cursor)); } +export async function deleteMatrixSyncStoreCursor(params: MatrixSyncStoreCursor) { + if (params.source !== "sqlite" || !params.stateKey) { + await fs.rm(params.pathname, { force: true }); + return; + } + const sqlite = await import("node:sqlite"); + const db = new sqlite.DatabaseSync(params.pathname); + try { + db.prepare( + `DELETE FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND (entry_key = ? OR entry_key LIKE ?)`, + ).run( + MATRIX_PLUGIN_ID, + MATRIX_SYNC_CACHE_NAMESPACE, + `${params.stateKey}:meta`, + `${params.stateKey}:sync:%`, + ); + } finally { + db.close(); + } +} + async function scoreMatrixStateFile(params: { accountId?: string; context: MatrixQaScenarioContext; @@ -162,6 +459,15 @@ export async function waitForMatrixSyncStoreWithCursor(params: { const startedAt = Date.now(); let lastPath: string | null = null; while (Date.now() - startedAt < params.timeoutMs) { + const sqliteCursors = await readMatrixSyncCacheCursorsFromSqlite({ + context: params.context, + stateDir: params.stateDir, + ...(params.accountId ? { accountId: params.accountId } : {}), + ...(params.userId ? { userId: params.userId } : {}), + }); + if (sqliteCursors.length > 0) { + return sqliteCursors[0]; + } const pathname = await resolveBestMatrixStateFile({ context: params.context, filename: MATRIX_SYNC_STORE_FILENAME, @@ -173,7 +479,7 @@ export async function waitForMatrixSyncStoreWithCursor(params: { if (pathname) { const cursor = await readMatrixSyncStoreCursor(pathname); if (cursor) { - return { cursor, pathname }; + return { cursor, pathname, source: "json" as const }; } } await sleep(MATRIX_STATE_POLL_INTERVAL_MS); diff --git a/extensions/qa-matrix/src/substrate/e2ee-client.ts b/extensions/qa-matrix/src/substrate/e2ee-client.ts index 5e61354d4aa..cf9e1285b76 100644 --- a/extensions/qa-matrix/src/substrate/e2ee-client.ts +++ b/extensions/qa-matrix/src/substrate/e2ee-client.ts @@ -17,6 +17,12 @@ import type { MatrixVerificationSummary, MessageEventContent, } from "@openclaw/matrix/test-api.js"; +import type { + OpenKeyedStoreOptions, + PluginStateEntry, + PluginStateKeyedStore, + PluginStateSyncKeyedStore, +} from "openclaw/plugin-sdk/plugin-state-runtime"; import { buildMatrixQaMessageContent } from "./client.js"; import { findMatrixQaObservedEventMatch, normalizeMatrixQaObservedEvent } from "./events.js"; import type { MatrixQaObservedEvent } from "./events.js"; @@ -44,6 +50,144 @@ const MATRIX_QA_E2EE_SYNC_FILTER = { }, }; +type MatrixQaPluginStateValue = { + createdAt: number; + expiresAt?: number; + value: unknown; +}; + +const matrixQaPluginStateNamespaces = new Map>(); + +function resolveMatrixQaPluginStateNamespaceKey(options: OpenKeyedStoreOptions): string { + return `${options.env?.OPENCLAW_STATE_DIR ?? ""}\0${options.namespace}`; +} + +function resolveMatrixQaPluginStateRows( + options: OpenKeyedStoreOptions, +): Map { + const namespaceKey = resolveMatrixQaPluginStateNamespaceKey(options); + let rows = matrixQaPluginStateNamespaces.get(namespaceKey); + if (!rows) { + rows = new Map(); + matrixQaPluginStateNamespaces.set(namespaceKey, rows); + } + return rows; +} + +function pruneMatrixQaExpiredPluginState(rows: Map): void { + const now = Date.now(); + for (const [key, row] of rows) { + if (row.expiresAt !== undefined && row.expiresAt <= now) { + rows.delete(key); + } + } +} + +function enforceMatrixQaPluginStateLimit( + rows: Map, + maxEntries: number, + nextKey: string, +): void { + if (rows.has(nextKey)) { + return; + } + while (rows.size >= maxEntries) { + const oldest = [...rows.entries()].toSorted( + (a, b) => a[1].createdAt - b[1].createdAt || a[0].localeCompare(b[0]), + )[0]?.[0]; + if (!oldest) { + return; + } + rows.delete(oldest); + } +} + +function createMatrixQaPluginStateSyncKeyedStore( + options: OpenKeyedStoreOptions, +): PluginStateSyncKeyedStore { + const rows = resolveMatrixQaPluginStateRows(options); + const resolveExpiresAt = (ttlMs?: number) => { + const effectiveTtlMs = ttlMs ?? options.defaultTtlMs; + return effectiveTtlMs === undefined ? undefined : Date.now() + effectiveTtlMs; + }; + const register = (key: string, value: T, opts?: { ttlMs?: number }) => { + pruneMatrixQaExpiredPluginState(rows); + enforceMatrixQaPluginStateLimit(rows, options.maxEntries, key); + rows.set(key, { + createdAt: rows.get(key)?.createdAt ?? Date.now(), + expiresAt: resolveExpiresAt(opts?.ttlMs), + value, + }); + }; + return { + register, + registerIfAbsent(key, value, opts) { + pruneMatrixQaExpiredPluginState(rows); + if (rows.has(key)) { + return false; + } + register(key, value, opts); + return true; + }, + update(key, updateValue, opts) { + pruneMatrixQaExpiredPluginState(rows); + const next = updateValue(rows.get(key)?.value as T | undefined); + if (next === undefined) { + return false; + } + register(key, next, opts); + return true; + }, + lookup(key) { + pruneMatrixQaExpiredPluginState(rows); + return rows.get(key)?.value as T | undefined; + }, + consume(key) { + pruneMatrixQaExpiredPluginState(rows); + const value = rows.get(key)?.value as T | undefined; + rows.delete(key); + return value; + }, + delete(key) { + pruneMatrixQaExpiredPluginState(rows); + return rows.delete(key); + }, + entries() { + pruneMatrixQaExpiredPluginState(rows); + return [...rows.entries()].map(([key, row]): PluginStateEntry => { + const entry: PluginStateEntry = { + key, + value: row.value as T, + createdAt: row.createdAt, + }; + if (row.expiresAt !== undefined) { + entry.expiresAt = row.expiresAt; + } + return entry; + }); + }, + clear() { + rows.clear(); + }, + }; +} + +function createMatrixQaPluginStateKeyedStore( + options: OpenKeyedStoreOptions, +): PluginStateKeyedStore { + const syncStore = createMatrixQaPluginStateSyncKeyedStore(options); + return { + register: async (...args) => syncStore.register(...args), + registerIfAbsent: async (...args) => syncStore.registerIfAbsent(...args), + update: async (...args) => syncStore.update?.(...args) ?? false, + lookup: async (...args) => syncStore.lookup(...args), + consume: async (...args) => syncStore.consume(...args), + delete: async (...args) => syncStore.delete(...args), + entries: async () => syncStore.entries(), + clear: async () => syncStore.clear(), + }; +} + function shouldRecordMatrixQaObservedEventUpdate(params: { next: MatrixQaObservedEvent; previous: MatrixQaObservedEvent | undefined; @@ -193,6 +337,20 @@ async function createMatrixQaE2eeMatrixClient(params: MatrixQaE2eeClientParams) outputDir: params.outputDir, scenarioId: params.scenarioId, }); + runtime.setMatrixRuntime({ + config: { + current: () => ({}), + mutateConfigFile: async () => ({}), + replaceConfigFile: async () => ({}), + }, + state: { + resolveStateDir: () => params.outputDir, + openKeyedStore: (options: OpenKeyedStoreOptions) => + createMatrixQaPluginStateKeyedStore(options), + openSyncKeyedStore: (options: OpenKeyedStoreOptions) => + createMatrixQaPluginStateSyncKeyedStore(options), + }, + } as never); return new runtime.MatrixClient(params.baseUrl, params.accessToken, { autoBootstrapCrypto: false, cryptoDatabasePrefix: storage.cryptoDatabasePrefix, @@ -203,7 +361,7 @@ async function createMatrixQaE2eeMatrixClient(params: MatrixQaE2eeClientParams) password: params.password, recoveryKeyPath: storage.recoveryKeyPath, ssrfPolicy: { allowPrivateNetwork: true }, - storagePath: storage.storagePath, + storageRootDir: path.dirname(storage.storagePath), syncFilter: MATRIX_QA_E2EE_SYNC_FILTER, userId: params.userId, });