From 6cc30c86613f440cfecf32f511ff2a5ef4e697a5 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 16 May 2026 04:48:29 +0100 Subject: [PATCH] fix: persist matrix sync store as blob --- .../matrix/client/sqlite-sync-store.test.ts | 44 ++++++++++++++++++- .../src/matrix/client/sqlite-sync-store.ts | 34 +++++++++++--- 2 files changed, 71 insertions(+), 7 deletions(-) diff --git a/extensions/matrix/src/matrix/client/sqlite-sync-store.test.ts b/extensions/matrix/src/matrix/client/sqlite-sync-store.test.ts index 39a4909ddbf..89cb11afb71 100644 --- a/extensions/matrix/src/matrix/client/sqlite-sync-store.test.ts +++ b/extensions/matrix/src/matrix/client/sqlite-sync-store.test.ts @@ -2,7 +2,10 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import type { ISyncResponse } from "matrix-js-sdk/lib/matrix.js"; -import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { + resetPluginBlobStoreForTests, + resetPluginStateStoreForTests, +} from "openclaw/plugin-sdk/plugin-state-runtime"; import { afterEach, describe, expect, it, vi } from "vitest"; import { SqliteBackedMatrixSyncStore, parsePersistedMatrixSyncStore } from "./sqlite-sync-store.js"; import { readMatrixStorageMetadata, writeMatrixStorageMetadata } from "./storage-meta-state.js"; @@ -67,6 +70,7 @@ describe("SqliteBackedMatrixSyncStore", () => { vi.unstubAllEnvs(); vi.useRealTimers(); resetPluginStateStoreForTests(); + resetPluginBlobStoreForTests(); for (const dir of tempDirs.splice(0)) { fs.rmSync(dir, { recursive: true, force: true }); } @@ -228,6 +232,44 @@ describe("SqliteBackedMatrixSyncStore", () => { await expect(secondStore.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true }); }); + it("persists large sync payloads outside the keyed state size limit", async () => { + const storageRoot = createStorageRoot(); + const syncResponse = createSyncResponse("large-token"); + const join = syncResponse.rooms?.join ?? {}; + for (let index = 0; index < 900; index += 1) { + join[`!room-${index}:example.org`] = { + summary: { "m.heroes": [`@hero-${index}:example.org`] }, + state: { events: [] }, + timeline: { + events: [ + { + content: { + body: "x".repeat(128), + msgtype: "m.text", + }, + event_id: `$message-${index}`, + origin_server_ts: index, + sender: "@user:example.org", + type: "m.room.message", + }, + ], + prev_batch: `t${index}`, + }, + ephemeral: { events: [] }, + account_data: { events: [] }, + unread_notifications: {}, + }; + } + + const store = new SqliteBackedMatrixSyncStore(storageRoot); + await store.setSyncData(syncResponse); + await store.flush(); + + const persisted = new SqliteBackedMatrixSyncStore(storageRoot); + await expect(persisted.getSavedSyncToken()).resolves.toBe("large-token"); + expect((await persisted.getSavedSync())?.roomsData.join["!room-899:example.org"]).toBeDefined(); + }); + it("parses legacy raw sync payloads for doctor migration", () => { const parsed = parsePersistedMatrixSyncStore( JSON.stringify({ diff --git a/extensions/matrix/src/matrix/client/sqlite-sync-store.ts b/extensions/matrix/src/matrix/client/sqlite-sync-store.ts index 3f2984b5c68..a2db36ae052 100644 --- a/extensions/matrix/src/matrix/client/sqlite-sync-store.ts +++ b/extensions/matrix/src/matrix/client/sqlite-sync-store.ts @@ -9,7 +9,7 @@ import { type ISyncResponse, type IStoredClientOpts, } from "matrix-js-sdk/lib/matrix.js"; -import { createPluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime"; +import { createPluginBlobSyncStore } from "openclaw/plugin-sdk/plugin-state-runtime"; import { isRecord } from "../../record-shared.js"; import { createAsyncLock } from "../async-lock.js"; import { LogService } from "../sdk/logger.js"; @@ -26,7 +26,15 @@ type PersistedMatrixSyncStore = { cleanShutdown?: boolean; }; -const SYNC_STORE = createPluginStateSyncKeyedStore("matrix", { +type PersistedMatrixSyncStoreMetadata = { + version: number; + cleanShutdown?: boolean; + hasSavedSync: boolean; + nextBatch?: string; + payloadBytes: number; +}; + +const SYNC_STORE = createPluginBlobSyncStore("matrix", { namespace: MATRIX_SYNC_STORE_NAMESPACE, maxEntries: 1000, }); @@ -157,9 +165,12 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore { let restoredClientOptions: IStoredClientOpts | undefined; let restoredCleanShutdown = false; const persisted = SYNC_STORE.lookup(resolveMatrixSyncStoreKey(this.rootDir)); - restoredSavedSync = persisted?.savedSync ?? null; - restoredClientOptions = persisted?.clientOptions; - restoredCleanShutdown = persisted?.cleanShutdown === true; + const persistedPayload = persisted + ? parsePersistedMatrixSyncStore(persisted.blob.toString("utf8")) + : null; + restoredSavedSync = persistedPayload?.savedSync ?? null; + restoredClientOptions = persistedPayload?.clientOptions; + restoredCleanShutdown = persistedPayload?.cleanShutdown === true; this.savedSync = restoredSavedSync; this.savedClientOptions = restoredClientOptions; @@ -282,9 +293,20 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore { cleanShutdown: this.cleanShutdown, ...(this.savedClientOptions ? { clientOptions: cloneJson(this.savedClientOptions) } : {}), }); + const blob = Buffer.from(JSON.stringify(payload)); try { await this.persistLock(async () => { - SYNC_STORE.register(resolveMatrixSyncStoreKey(this.rootDir), payload); + SYNC_STORE.register( + resolveMatrixSyncStoreKey(this.rootDir), + { + version: STORE_VERSION, + cleanShutdown: payload.cleanShutdown === true, + hasSavedSync: payload.savedSync !== null, + ...(payload.savedSync?.nextBatch ? { nextBatch: payload.savedSync.nextBatch } : {}), + payloadBytes: blob.byteLength, + }, + blob, + ); claimCurrentTokenStorageState({ rootDir: this.rootDir, });