fix: persist matrix sync store as blob

This commit is contained in:
Peter Steinberger
2026-05-16 04:48:29 +01:00
parent c1adf398c3
commit 6cc30c8661
2 changed files with 71 additions and 7 deletions

View File

@@ -2,7 +2,10 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { ISyncResponse } from "matrix-js-sdk/lib/matrix.js";
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
resetPluginBlobStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-runtime";
import { afterEach, describe, expect, it, vi } from "vitest";
import { SqliteBackedMatrixSyncStore, parsePersistedMatrixSyncStore } from "./sqlite-sync-store.js";
import { readMatrixStorageMetadata, writeMatrixStorageMetadata } from "./storage-meta-state.js";
@@ -67,6 +70,7 @@ describe("SqliteBackedMatrixSyncStore", () => {
vi.unstubAllEnvs();
vi.useRealTimers();
resetPluginStateStoreForTests();
resetPluginBlobStoreForTests();
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { recursive: true, force: true });
}
@@ -228,6 +232,44 @@ describe("SqliteBackedMatrixSyncStore", () => {
await expect(secondStore.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true });
});
it("persists large sync payloads outside the keyed state size limit", async () => {
const storageRoot = createStorageRoot();
const syncResponse = createSyncResponse("large-token");
const join = syncResponse.rooms?.join ?? {};
for (let index = 0; index < 900; index += 1) {
join[`!room-${index}:example.org`] = {
summary: { "m.heroes": [`@hero-${index}:example.org`] },
state: { events: [] },
timeline: {
events: [
{
content: {
body: "x".repeat(128),
msgtype: "m.text",
},
event_id: `$message-${index}`,
origin_server_ts: index,
sender: "@user:example.org",
type: "m.room.message",
},
],
prev_batch: `t${index}`,
},
ephemeral: { events: [] },
account_data: { events: [] },
unread_notifications: {},
};
}
const store = new SqliteBackedMatrixSyncStore(storageRoot);
await store.setSyncData(syncResponse);
await store.flush();
const persisted = new SqliteBackedMatrixSyncStore(storageRoot);
await expect(persisted.getSavedSyncToken()).resolves.toBe("large-token");
expect((await persisted.getSavedSync())?.roomsData.join["!room-899:example.org"]).toBeDefined();
});
it("parses legacy raw sync payloads for doctor migration", () => {
const parsed = parsePersistedMatrixSyncStore(
JSON.stringify({

View File

@@ -9,7 +9,7 @@ import {
type ISyncResponse,
type IStoredClientOpts,
} from "matrix-js-sdk/lib/matrix.js";
import { createPluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { createPluginBlobSyncStore } from "openclaw/plugin-sdk/plugin-state-runtime";
import { isRecord } from "../../record-shared.js";
import { createAsyncLock } from "../async-lock.js";
import { LogService } from "../sdk/logger.js";
@@ -26,7 +26,15 @@ type PersistedMatrixSyncStore = {
cleanShutdown?: boolean;
};
const SYNC_STORE = createPluginStateSyncKeyedStore<PersistedMatrixSyncStore>("matrix", {
type PersistedMatrixSyncStoreMetadata = {
version: number;
cleanShutdown?: boolean;
hasSavedSync: boolean;
nextBatch?: string;
payloadBytes: number;
};
const SYNC_STORE = createPluginBlobSyncStore<PersistedMatrixSyncStoreMetadata>("matrix", {
namespace: MATRIX_SYNC_STORE_NAMESPACE,
maxEntries: 1000,
});
@@ -157,9 +165,12 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore {
let restoredClientOptions: IStoredClientOpts | undefined;
let restoredCleanShutdown = false;
const persisted = SYNC_STORE.lookup(resolveMatrixSyncStoreKey(this.rootDir));
restoredSavedSync = persisted?.savedSync ?? null;
restoredClientOptions = persisted?.clientOptions;
restoredCleanShutdown = persisted?.cleanShutdown === true;
const persistedPayload = persisted
? parsePersistedMatrixSyncStore(persisted.blob.toString("utf8"))
: null;
restoredSavedSync = persistedPayload?.savedSync ?? null;
restoredClientOptions = persistedPayload?.clientOptions;
restoredCleanShutdown = persistedPayload?.cleanShutdown === true;
this.savedSync = restoredSavedSync;
this.savedClientOptions = restoredClientOptions;
@@ -282,9 +293,20 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore {
cleanShutdown: this.cleanShutdown,
...(this.savedClientOptions ? { clientOptions: cloneJson(this.savedClientOptions) } : {}),
});
const blob = Buffer.from(JSON.stringify(payload));
try {
await this.persistLock(async () => {
SYNC_STORE.register(resolveMatrixSyncStoreKey(this.rootDir), payload);
SYNC_STORE.register(
resolveMatrixSyncStoreKey(this.rootDir),
{
version: STORE_VERSION,
cleanShutdown: payload.cleanShutdown === true,
hasSavedSync: payload.savedSync !== null,
...(payload.savedSync?.nextBatch ? { nextBatch: payload.savedSync.nextBatch } : {}),
payloadBytes: blob.byteLength,
},
blob,
);
claimCurrentTokenStorageState({
rootDir: this.rootDir,
});