mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 05:26:16 +00:00
fix: persist matrix sync store as blob
This commit is contained in:
@@ -2,7 +2,10 @@ import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { ISyncResponse } from "matrix-js-sdk/lib/matrix.js";
|
||||
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-runtime";
|
||||
import {
|
||||
resetPluginBlobStoreForTests,
|
||||
resetPluginStateStoreForTests,
|
||||
} from "openclaw/plugin-sdk/plugin-state-runtime";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { SqliteBackedMatrixSyncStore, parsePersistedMatrixSyncStore } from "./sqlite-sync-store.js";
|
||||
import { readMatrixStorageMetadata, writeMatrixStorageMetadata } from "./storage-meta-state.js";
|
||||
@@ -67,6 +70,7 @@ describe("SqliteBackedMatrixSyncStore", () => {
|
||||
vi.unstubAllEnvs();
|
||||
vi.useRealTimers();
|
||||
resetPluginStateStoreForTests();
|
||||
resetPluginBlobStoreForTests();
|
||||
for (const dir of tempDirs.splice(0)) {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
@@ -228,6 +232,44 @@ describe("SqliteBackedMatrixSyncStore", () => {
|
||||
await expect(secondStore.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true });
|
||||
});
|
||||
|
||||
it("persists large sync payloads outside the keyed state size limit", async () => {
|
||||
const storageRoot = createStorageRoot();
|
||||
const syncResponse = createSyncResponse("large-token");
|
||||
const join = syncResponse.rooms?.join ?? {};
|
||||
for (let index = 0; index < 900; index += 1) {
|
||||
join[`!room-${index}:example.org`] = {
|
||||
summary: { "m.heroes": [`@hero-${index}:example.org`] },
|
||||
state: { events: [] },
|
||||
timeline: {
|
||||
events: [
|
||||
{
|
||||
content: {
|
||||
body: "x".repeat(128),
|
||||
msgtype: "m.text",
|
||||
},
|
||||
event_id: `$message-${index}`,
|
||||
origin_server_ts: index,
|
||||
sender: "@user:example.org",
|
||||
type: "m.room.message",
|
||||
},
|
||||
],
|
||||
prev_batch: `t${index}`,
|
||||
},
|
||||
ephemeral: { events: [] },
|
||||
account_data: { events: [] },
|
||||
unread_notifications: {},
|
||||
};
|
||||
}
|
||||
|
||||
const store = new SqliteBackedMatrixSyncStore(storageRoot);
|
||||
await store.setSyncData(syncResponse);
|
||||
await store.flush();
|
||||
|
||||
const persisted = new SqliteBackedMatrixSyncStore(storageRoot);
|
||||
await expect(persisted.getSavedSyncToken()).resolves.toBe("large-token");
|
||||
expect((await persisted.getSavedSync())?.roomsData.join["!room-899:example.org"]).toBeDefined();
|
||||
});
|
||||
|
||||
it("parses legacy raw sync payloads for doctor migration", () => {
|
||||
const parsed = parsePersistedMatrixSyncStore(
|
||||
JSON.stringify({
|
||||
|
||||
@@ -9,7 +9,7 @@ import {
|
||||
type ISyncResponse,
|
||||
type IStoredClientOpts,
|
||||
} from "matrix-js-sdk/lib/matrix.js";
|
||||
import { createPluginStateSyncKeyedStore } from "openclaw/plugin-sdk/plugin-state-runtime";
|
||||
import { createPluginBlobSyncStore } from "openclaw/plugin-sdk/plugin-state-runtime";
|
||||
import { isRecord } from "../../record-shared.js";
|
||||
import { createAsyncLock } from "../async-lock.js";
|
||||
import { LogService } from "../sdk/logger.js";
|
||||
@@ -26,7 +26,15 @@ type PersistedMatrixSyncStore = {
|
||||
cleanShutdown?: boolean;
|
||||
};
|
||||
|
||||
const SYNC_STORE = createPluginStateSyncKeyedStore<PersistedMatrixSyncStore>("matrix", {
|
||||
type PersistedMatrixSyncStoreMetadata = {
|
||||
version: number;
|
||||
cleanShutdown?: boolean;
|
||||
hasSavedSync: boolean;
|
||||
nextBatch?: string;
|
||||
payloadBytes: number;
|
||||
};
|
||||
|
||||
const SYNC_STORE = createPluginBlobSyncStore<PersistedMatrixSyncStoreMetadata>("matrix", {
|
||||
namespace: MATRIX_SYNC_STORE_NAMESPACE,
|
||||
maxEntries: 1000,
|
||||
});
|
||||
@@ -157,9 +165,12 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore {
|
||||
let restoredClientOptions: IStoredClientOpts | undefined;
|
||||
let restoredCleanShutdown = false;
|
||||
const persisted = SYNC_STORE.lookup(resolveMatrixSyncStoreKey(this.rootDir));
|
||||
restoredSavedSync = persisted?.savedSync ?? null;
|
||||
restoredClientOptions = persisted?.clientOptions;
|
||||
restoredCleanShutdown = persisted?.cleanShutdown === true;
|
||||
const persistedPayload = persisted
|
||||
? parsePersistedMatrixSyncStore(persisted.blob.toString("utf8"))
|
||||
: null;
|
||||
restoredSavedSync = persistedPayload?.savedSync ?? null;
|
||||
restoredClientOptions = persistedPayload?.clientOptions;
|
||||
restoredCleanShutdown = persistedPayload?.cleanShutdown === true;
|
||||
|
||||
this.savedSync = restoredSavedSync;
|
||||
this.savedClientOptions = restoredClientOptions;
|
||||
@@ -282,9 +293,20 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore {
|
||||
cleanShutdown: this.cleanShutdown,
|
||||
...(this.savedClientOptions ? { clientOptions: cloneJson(this.savedClientOptions) } : {}),
|
||||
});
|
||||
const blob = Buffer.from(JSON.stringify(payload));
|
||||
try {
|
||||
await this.persistLock(async () => {
|
||||
SYNC_STORE.register(resolveMatrixSyncStoreKey(this.rootDir), payload);
|
||||
SYNC_STORE.register(
|
||||
resolveMatrixSyncStoreKey(this.rootDir),
|
||||
{
|
||||
version: STORE_VERSION,
|
||||
cleanShutdown: payload.cleanShutdown === true,
|
||||
hasSavedSync: payload.savedSync !== null,
|
||||
...(payload.savedSync?.nextBatch ? { nextBatch: payload.savedSync.nextBatch } : {}),
|
||||
payloadBytes: blob.byteLength,
|
||||
},
|
||||
blob,
|
||||
);
|
||||
claimCurrentTokenStorageState({
|
||||
rootDir: this.rootDir,
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user