From 5730f69ca1959097142ffa7877a3ea6ecdf94f70 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Sat, 14 Mar 2026 20:10:36 +0000 Subject: [PATCH] Matrix: persist sync state across restarts --- .../matrix/src/matrix/client/create-client.ts | 1 + .../src/matrix/client/file-sync-store.test.ts | 197 ++++++++++++++ .../src/matrix/client/file-sync-store.ts | 243 ++++++++++++++++++ .../matrix/monitor/handler.test-helpers.ts | 2 + .../matrix/src/matrix/monitor/handler.test.ts | 58 +++++ .../matrix/src/matrix/monitor/handler.ts | 22 +- .../matrix/src/matrix/monitor/index.test.ts | 26 +- extensions/matrix/src/matrix/monitor/index.ts | 4 + extensions/matrix/src/matrix/sdk.test.ts | 42 ++- extensions/matrix/src/matrix/sdk.ts | 20 +- 10 files changed, 592 insertions(+), 23 deletions(-) create mode 100644 extensions/matrix/src/matrix/client/file-sync-store.test.ts create mode 100644 extensions/matrix/src/matrix/client/file-sync-store.ts diff --git a/extensions/matrix/src/matrix/client/create-client.ts b/extensions/matrix/src/matrix/client/create-client.ts index 37fd64e5e87..5f5cb9d9db6 100644 --- a/extensions/matrix/src/matrix/client/create-client.ts +++ b/extensions/matrix/src/matrix/client/create-client.ts @@ -57,6 +57,7 @@ export async function createMatrixClient(params: { encryption: params.encryption, localTimeoutMs: params.localTimeoutMs, initialSyncLimit: params.initialSyncLimit, + storagePath: storagePaths.storagePath, recoveryKeyPath: storagePaths.recoveryKeyPath, idbSnapshotPath: storagePaths.idbSnapshotPath, cryptoDatabasePrefix, diff --git a/extensions/matrix/src/matrix/client/file-sync-store.test.ts b/extensions/matrix/src/matrix/client/file-sync-store.test.ts new file mode 100644 index 00000000000..85d61580a17 --- /dev/null +++ b/extensions/matrix/src/matrix/client/file-sync-store.test.ts @@ -0,0 +1,197 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import type { ISyncResponse } from "matrix-js-sdk"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import * as jsonFiles from "../../../../../src/infra/json-files.js"; +import { FileBackedMatrixSyncStore } from "./file-sync-store.js"; + +function createSyncResponse(nextBatch: string): ISyncResponse { + return { + next_batch: nextBatch, + rooms: { + join: { + "!room:example.org": { + summary: {}, + state: { events: [] }, + timeline: { + events: [ + { + content: { + body: "hello", + msgtype: "m.text", + }, + event_id: "$message", + origin_server_ts: 1, + sender: "@user:example.org", + type: "m.room.message", + }, + ], + prev_batch: "t0", + }, + ephemeral: { events: [] }, + account_data: { events: [] }, + unread_notifications: {}, + }, + }, + }, + account_data: { + events: [ + { + content: { theme: "dark" }, + type: "com.openclaw.test", + }, + ], + }, + }; +} + +function createDeferred() { + let resolve!: () => void; + const promise = new Promise((resolvePromise) => { + resolve = resolvePromise; + }); + return { promise, resolve }; +} + +describe("FileBackedMatrixSyncStore", () => { + const tempDirs: string[] = []; + + afterEach(() => { + vi.restoreAllMocks(); + vi.useRealTimers(); + for (const dir of tempDirs.splice(0)) { + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + it("persists sync data so restart resumes from the saved cursor", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); + tempDirs.push(tempDir); + const storagePath = path.join(tempDir, "bot-storage.json"); + + const firstStore = new FileBackedMatrixSyncStore(storagePath); + expect(firstStore.hasSavedSync()).toBe(false); + await firstStore.setSyncData(createSyncResponse("s123")); + await firstStore.flush(); + + const secondStore = new FileBackedMatrixSyncStore(storagePath); + expect(secondStore.hasSavedSync()).toBe(true); + await expect(secondStore.getSavedSyncToken()).resolves.toBe("s123"); + + const savedSync = await secondStore.getSavedSync(); + expect(savedSync?.nextBatch).toBe("s123"); + expect(savedSync?.accountData).toEqual([ + { + content: { theme: "dark" }, + type: "com.openclaw.test", + }, + ]); + expect(savedSync?.roomsData.join?.["!room:example.org"]).toBeTruthy(); + }); + + it("coalesces background persistence until the debounce window elapses", async () => { + vi.useFakeTimers(); + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); + tempDirs.push(tempDir); + const storagePath = path.join(tempDir, "bot-storage.json"); + const writeSpy = vi.spyOn(jsonFiles, "writeJsonAtomic").mockResolvedValue(); + + const store = new FileBackedMatrixSyncStore(storagePath); + await store.setSyncData(createSyncResponse("s111")); + await store.setSyncData(createSyncResponse("s222")); + await store.storeClientOptions({ lazyLoadMembers: true }); + + expect(writeSpy).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(249); + expect(writeSpy).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(writeSpy).toHaveBeenCalledTimes(1); + expect(writeSpy).toHaveBeenCalledWith( + storagePath, + expect.objectContaining({ + savedSync: expect.objectContaining({ + nextBatch: "s222", + }), + clientOptions: { + lazyLoadMembers: true, + }, + }), + expect.any(Object), + ); + }); + + it("waits for an in-flight persist when shutdown flush runs", async () => { + vi.useFakeTimers(); + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); + tempDirs.push(tempDir); + const storagePath = path.join(tempDir, "bot-storage.json"); + const writeDeferred = createDeferred(); + const writeSpy = vi + .spyOn(jsonFiles, "writeJsonAtomic") + .mockImplementation(async () => writeDeferred.promise); + + const store = new FileBackedMatrixSyncStore(storagePath); + await store.setSyncData(createSyncResponse("s777")); + await vi.advanceTimersByTimeAsync(250); + + let flushCompleted = false; + const flushPromise = store.flush().then(() => { + flushCompleted = true; + }); + + await Promise.resolve(); + expect(writeSpy).toHaveBeenCalledTimes(1); + expect(flushCompleted).toBe(false); + + writeDeferred.resolve(); + await flushPromise; + expect(flushCompleted).toBe(true); + }); + + it("persists client options alongside sync state", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); + tempDirs.push(tempDir); + const storagePath = path.join(tempDir, "bot-storage.json"); + + const firstStore = new FileBackedMatrixSyncStore(storagePath); + await firstStore.storeClientOptions({ lazyLoadMembers: true }); + await firstStore.flush(); + + const secondStore = new FileBackedMatrixSyncStore(storagePath); + await expect(secondStore.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true }); + }); + + it("loads legacy raw sync payloads from bot-storage.json", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-")); + tempDirs.push(tempDir); + const storagePath = path.join(tempDir, "bot-storage.json"); + + fs.writeFileSync( + storagePath, + JSON.stringify({ + next_batch: "legacy-token", + rooms: { + join: {}, + }, + account_data: { + events: [], + }, + }), + "utf8", + ); + + const store = new FileBackedMatrixSyncStore(storagePath); + expect(store.hasSavedSync()).toBe(true); + await expect(store.getSavedSyncToken()).resolves.toBe("legacy-token"); + await expect(store.getSavedSync()).resolves.toMatchObject({ + nextBatch: "legacy-token", + roomsData: { + join: {}, + }, + accountData: [], + }); + }); +}); diff --git a/extensions/matrix/src/matrix/client/file-sync-store.ts b/extensions/matrix/src/matrix/client/file-sync-store.ts new file mode 100644 index 00000000000..25a2d4e07f8 --- /dev/null +++ b/extensions/matrix/src/matrix/client/file-sync-store.ts @@ -0,0 +1,243 @@ +import { readFileSync } from "node:fs"; +import fs from "node:fs/promises"; +import { + MemoryStore, + SyncAccumulator, + type ISyncData, + type ISyncResponse, + type IStoredClientOpts, +} from "matrix-js-sdk"; +import { createAsyncLock, writeJsonAtomic } from "../../../../../src/infra/json-files.js"; +import { LogService } from "../sdk/logger.js"; + +const STORE_VERSION = 1; +const PERSIST_DEBOUNCE_MS = 250; + +type PersistedMatrixSyncStore = { + version: number; + savedSync: ISyncData | null; + clientOptions?: IStoredClientOpts; +}; + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null; +} + +function toPersistedSyncData(value: unknown): ISyncData | null { + if (!isRecord(value)) { + return null; + } + if (typeof value.nextBatch === "string" && value.nextBatch.trim()) { + if (!Array.isArray(value.accountData) || !isRecord(value.roomsData)) { + return null; + } + return { + nextBatch: value.nextBatch, + accountData: value.accountData, + roomsData: value.roomsData, + } as ISyncData; + } + + // Older Matrix state files stored the raw /sync-shaped payload directly. + if (typeof value.next_batch === "string" && value.next_batch.trim()) { + return { + nextBatch: value.next_batch, + accountData: + isRecord(value.account_data) && Array.isArray(value.account_data.events) + ? value.account_data.events + : [], + roomsData: isRecord(value.rooms) ? value.rooms : {}, + } as ISyncData; + } + + return null; +} + +function readPersistedStore(raw: string): PersistedMatrixSyncStore | null { + try { + const parsed = JSON.parse(raw) as { + version?: unknown; + savedSync?: unknown; + clientOptions?: unknown; + }; + const savedSync = toPersistedSyncData(parsed.savedSync); + if (parsed.version === STORE_VERSION) { + return { + version: STORE_VERSION, + savedSync, + clientOptions: isRecord(parsed.clientOptions) + ? (parsed.clientOptions as IStoredClientOpts) + : undefined, + }; + } + + // Backward-compat: prior Matrix state files stored the raw sync blob at the + // top level without versioning or wrapped metadata. + return { + version: STORE_VERSION, + savedSync: toPersistedSyncData(parsed), + }; + } catch { + return null; + } +} + +function cloneJson(value: T): T { + return structuredClone(value); +} + +function syncDataToSyncResponse(syncData: ISyncData): ISyncResponse { + return { + next_batch: syncData.nextBatch, + rooms: syncData.roomsData, + account_data: { + events: syncData.accountData, + }, + }; +} + +export class FileBackedMatrixSyncStore extends MemoryStore { + private readonly persistLock = createAsyncLock(); + private readonly accumulator = new SyncAccumulator(); + private savedSync: ISyncData | null = null; + private savedClientOptions: IStoredClientOpts | undefined; + private readonly hadSavedSyncOnLoad: boolean; + private dirty = false; + private persistTimer: NodeJS.Timeout | null = null; + private persistPromise: Promise | null = null; + + constructor(private readonly storagePath: string) { + super(); + + let restoredSavedSync: ISyncData | null = null; + let restoredClientOptions: IStoredClientOpts | undefined; + try { + const raw = readFileSync(this.storagePath, "utf8"); + const persisted = readPersistedStore(raw); + restoredSavedSync = persisted?.savedSync ?? null; + restoredClientOptions = persisted?.clientOptions; + } catch { + // Missing or unreadable sync cache should not block startup. + } + + this.savedSync = restoredSavedSync; + this.savedClientOptions = restoredClientOptions; + this.hadSavedSyncOnLoad = restoredSavedSync !== null; + + if (this.savedSync) { + this.accumulator.accumulate(syncDataToSyncResponse(this.savedSync), true); + super.setSyncToken(this.savedSync.nextBatch); + } + if (this.savedClientOptions) { + void super.storeClientOptions(this.savedClientOptions); + } + } + + hasSavedSync(): boolean { + return this.hadSavedSyncOnLoad; + } + + override getSavedSync(): Promise { + return Promise.resolve(this.savedSync ? cloneJson(this.savedSync) : null); + } + + override getSavedSyncToken(): Promise { + return Promise.resolve(this.savedSync?.nextBatch ?? null); + } + + override setSyncData(syncData: ISyncResponse): Promise { + this.accumulator.accumulate(syncData); + this.savedSync = this.accumulator.getJSON(); + this.markDirtyAndSchedulePersist(); + return Promise.resolve(); + } + + override getClientOptions() { + return Promise.resolve( + this.savedClientOptions ? cloneJson(this.savedClientOptions) : undefined, + ); + } + + override storeClientOptions(options: IStoredClientOpts) { + this.savedClientOptions = cloneJson(options); + void super.storeClientOptions(options); + this.markDirtyAndSchedulePersist(); + return Promise.resolve(); + } + + override save(force = false) { + if (force) { + return this.flush(); + } + return Promise.resolve(); + } + + override wantsSave(): boolean { + // We persist directly from setSyncData/storeClientOptions so the SDK's + // periodic save hook stays disabled. Shutdown uses flush() for a final sync. + return false; + } + + override async deleteAllData(): Promise { + if (this.persistTimer) { + clearTimeout(this.persistTimer); + this.persistTimer = null; + } + this.dirty = false; + await this.persistPromise?.catch(() => undefined); + await super.deleteAllData(); + this.savedSync = null; + this.savedClientOptions = undefined; + await fs.rm(this.storagePath, { force: true }).catch(() => undefined); + } + + async flush(): Promise { + if (this.persistTimer) { + clearTimeout(this.persistTimer); + this.persistTimer = null; + } + while (this.dirty || this.persistPromise) { + if (this.dirty && !this.persistPromise) { + this.persistPromise = this.persist().finally(() => { + this.persistPromise = null; + }); + } + await this.persistPromise; + } + } + + private markDirtyAndSchedulePersist(): void { + this.dirty = true; + if (this.persistTimer) { + return; + } + this.persistTimer = setTimeout(() => { + this.persistTimer = null; + void this.flush().catch((err) => { + LogService.warn("MatrixFileSyncStore", "Failed to persist Matrix sync store:", err); + }); + }, PERSIST_DEBOUNCE_MS); + this.persistTimer.unref?.(); + } + + private async persist(): Promise { + this.dirty = false; + const payload: PersistedMatrixSyncStore = { + version: STORE_VERSION, + savedSync: this.savedSync ? cloneJson(this.savedSync) : null, + ...(this.savedClientOptions ? { clientOptions: cloneJson(this.savedClientOptions) } : {}), + }; + try { + await this.persistLock(async () => { + await writeJsonAtomic(this.storagePath, payload, { + mode: 0o600, + trailingNewline: true, + ensureDirMode: 0o700, + }); + }); + } catch (err) { + this.dirty = true; + throw err; + } + } +} diff --git a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts index 3517f00ac33..834b7e110a7 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test-helpers.ts @@ -34,6 +34,7 @@ type MatrixHandlerTestHarnessOptions = { mediaMaxBytes?: number; startupMs?: number; startupGraceMs?: number; + dropPreStartupMessages?: boolean; isDirectMessage?: boolean; readAllowFromStore?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["readAllowFromStore"]; upsertPairingRequest?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["upsertPairingRequest"]; @@ -172,6 +173,7 @@ export function createMatrixHandlerTestHarness( mediaMaxBytes: options.mediaMaxBytes ?? 10_000_000, startupMs: options.startupMs ?? 0, startupGraceMs: options.startupGraceMs ?? 0, + dropPreStartupMessages: options.dropPreStartupMessages ?? true, directTracker: { isDirectMessage: async () => options.isDirectMessage ?? true, }, diff --git a/extensions/matrix/src/matrix/monitor/handler.test.ts b/extensions/matrix/src/matrix/monitor/handler.test.ts index b4d5595e01d..16b2a53d7cd 100644 --- a/extensions/matrix/src/matrix/monitor/handler.test.ts +++ b/extensions/matrix/src/matrix/monitor/handler.test.ts @@ -635,4 +635,62 @@ describe("matrix monitor handler pairing account scope", () => { expect(enqueueSystemEvent).not.toHaveBeenCalled(); }); + + it("drops pre-startup dm messages on cold start", async () => { + const resolveAgentRoute = vi.fn(() => ({ + agentId: "ops", + channel: "matrix", + accountId: "ops", + sessionKey: "agent:ops:main", + mainSessionKey: "agent:ops:main", + matchedBy: "binding.account" as const, + })); + const { handler } = createMatrixHandlerTestHarness({ + resolveAgentRoute, + isDirectMessage: true, + startupMs: 1_000, + startupGraceMs: 0, + dropPreStartupMessages: true, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$old-cold-start", + body: "hello", + originServerTs: 999, + }), + ); + + expect(resolveAgentRoute).not.toHaveBeenCalled(); + }); + + it("replays pre-startup dm messages when persisted sync state exists", async () => { + const resolveAgentRoute = vi.fn(() => ({ + agentId: "ops", + channel: "matrix", + accountId: "ops", + sessionKey: "agent:ops:main", + mainSessionKey: "agent:ops:main", + matchedBy: "binding.account" as const, + })); + const { handler } = createMatrixHandlerTestHarness({ + resolveAgentRoute, + isDirectMessage: true, + startupMs: 1_000, + startupGraceMs: 0, + dropPreStartupMessages: false, + }); + + await handler( + "!room:example.org", + createMatrixTextMessageEvent({ + eventId: "$old-resume", + body: "hello", + originServerTs: 999, + }), + ); + + expect(resolveAgentRoute).toHaveBeenCalledTimes(1); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/handler.ts b/extensions/matrix/src/matrix/monitor/handler.ts index d2ea00b53ac..69241463530 100644 --- a/extensions/matrix/src/matrix/monitor/handler.ts +++ b/extensions/matrix/src/matrix/monitor/handler.ts @@ -68,6 +68,7 @@ export type MatrixMonitorHandlerParams = { mediaMaxBytes: number; startupMs: number; startupGraceMs: number; + dropPreStartupMessages: boolean; directTracker: { isDirectMessage: (params: { roomId: string; @@ -146,6 +147,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam mediaMaxBytes, startupMs, startupGraceMs, + dropPreStartupMessages, directTracker, getRoomInfo, getMemberDisplayName, @@ -239,15 +241,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam } const eventTs = event.origin_server_ts; const eventAge = event.unsigned?.age; - if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) { - return; - } - if ( - typeof eventTs !== "number" && - typeof eventAge === "number" && - eventAge > startupGraceMs - ) { - return; + if (dropPreStartupMessages) { + if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) { + return; + } + if ( + typeof eventTs !== "number" && + typeof eventAge === "number" && + eventAge > startupGraceMs + ) { + return; + } } let content = event.content as RoomMessageEventContent; diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index eba37159cc4..30d7a6d4890 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -2,7 +2,11 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const hoisted = vi.hoisted(() => { const callOrder: string[] = []; - const client = { id: "matrix-client" }; + const client = { + id: "matrix-client", + hasPersistedSyncState: vi.fn(() => false), + }; + const createMatrixRoomMessageHandler = vi.fn(() => vi.fn()); let startClientError: Error | null = null; const resolveTextChunkLimit = vi.fn< (cfg: unknown, channel: unknown, accountId?: unknown) => number @@ -19,6 +23,7 @@ const hoisted = vi.hoisted(() => { return { callOrder, client, + createMatrixRoomMessageHandler, logger, resolveTextChunkLimit, setActiveMatrixClient, @@ -176,7 +181,7 @@ vi.mock("./events.js", () => ({ })); vi.mock("./handler.js", () => ({ - createMatrixRoomMessageHandler: vi.fn(() => vi.fn()), + createMatrixRoomMessageHandler: hoisted.createMatrixRoomMessageHandler, })); vi.mock("./legacy-crypto-restore.js", () => ({ @@ -205,6 +210,8 @@ describe("monitorMatrixProvider", () => { hoisted.setActiveMatrixClient.mockReset(); hoisted.stopSharedClientInstance.mockReset(); hoisted.stopThreadBindingManager.mockReset(); + hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false); + hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn()); Object.values(hoisted.logger).forEach((mock) => mock.mockReset()); }); @@ -249,4 +256,19 @@ describe("monitorMatrixProvider", () => { expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default"); expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default"); }); + + it("disables cold-start backlog dropping when sync state already exists", async () => { + hoisted.client.hasPersistedSyncState.mockReturnValue(true); + const { monitorMatrixProvider } = await import("./index.js"); + const abortController = new AbortController(); + abortController.abort(); + + await monitorMatrixProvider({ abortSignal: abortController.signal }); + + expect(hoisted.createMatrixRoomMessageHandler).toHaveBeenCalledWith( + expect.objectContaining({ + dropPreStartupMessages: false, + }), + ); + }); }); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index e87aef44702..8eff9f740f6 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -181,6 +181,9 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024; const startupMs = Date.now(); const startupGraceMs = 0; + // Cold starts should ignore old room history, but once we have a persisted + // /sync cursor we want restart backlogs to replay just like other channels. + const dropPreStartupMessages = !client.hasPersistedSyncState(); const directTracker = createDirectRoomTracker(client, { log: logVerboseMessage }); registerMatrixAutoJoin({ client, accountConfig, runtime }); const warnedEncryptedRooms = new Set(); @@ -208,6 +211,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi mediaMaxBytes, startupMs, startupGraceMs, + dropPreStartupMessages, directTracker, getRoomInfo, getMemberDisplayName, diff --git a/extensions/matrix/src/matrix/sdk.test.ts b/extensions/matrix/src/matrix/sdk.test.ts index c70741076bb..3467f12711c 100644 --- a/extensions/matrix/src/matrix/sdk.test.ts +++ b/extensions/matrix/src/matrix/sdk.test.ts @@ -176,14 +176,18 @@ function createMatrixJsClientStub(): MatrixJsClientStub { let matrixJsClient = createMatrixJsClientStub(); let lastCreateClientOpts: Record | null = null; -vi.mock("matrix-js-sdk", () => ({ - ClientEvent: { Event: "event", Room: "Room" }, - MatrixEventEvent: { Decrypted: "decrypted" }, - createClient: vi.fn((opts: Record) => { - lastCreateClientOpts = opts; - return matrixJsClient; - }), -})); +vi.mock("matrix-js-sdk", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + ClientEvent: { Event: "event", Room: "Room" }, + MatrixEventEvent: { Decrypted: "decrypted" }, + createClient: vi.fn((opts: Record) => { + lastCreateClientOpts = opts; + return matrixJsClient; + }), + }; +}); import { MatrixClient } from "./sdk.js"; @@ -485,6 +489,28 @@ describe("MatrixClient request hardening", () => { await assertion; }); + + it("wires the sync store into the SDK and flushes it on shutdown", async () => { + const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sdk-store-")); + const storagePath = path.join(tempDir, "bot-storage.json"); + + try { + const client = new MatrixClient("https://matrix.example.org", "token", undefined, undefined, { + storagePath, + }); + + const store = lastCreateClientOpts?.store as { flush: () => Promise } | undefined; + expect(store).toBeTruthy(); + const flushSpy = vi.spyOn(store!, "flush").mockResolvedValue(); + + await client.stopAndPersist(); + + expect(flushSpy).toHaveBeenCalledTimes(1); + expect(matrixJsClient.stopClient).toHaveBeenCalledTimes(1); + } finally { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); }); describe("MatrixClient event bridge", () => { diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index 1d9679ba9c2..94ac1990096 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -11,6 +11,7 @@ import { import { VerificationMethod } from "matrix-js-sdk/lib/types.js"; import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; import { resolveMatrixRoomKeyBackupReadinessError } from "./backup-health.js"; +import { FileBackedMatrixSyncStore } from "./client/file-sync-store.js"; import { createMatrixJsSdkClientLogger } from "./client/logging.js"; import { MatrixCryptoBootstrapper } from "./sdk/crypto-bootstrap.js"; import type { MatrixCryptoBootstrapResult } from "./sdk/crypto-bootstrap.js"; @@ -174,6 +175,7 @@ export class MatrixClient { private readonly initialSyncLimit?: number; private readonly encryptionEnabled: boolean; private readonly password?: string; + private readonly syncStore?: FileBackedMatrixSyncStore; private readonly idbSnapshotPath?: string; private readonly cryptoDatabasePrefix?: string; private bridgeRegistered = false; @@ -211,6 +213,7 @@ export class MatrixClient { localTimeoutMs?: number; encryption?: boolean; initialSyncLimit?: number; + storagePath?: string; recoveryKeyPath?: string; idbSnapshotPath?: string; cryptoDatabasePrefix?: string; @@ -222,6 +225,7 @@ export class MatrixClient { this.initialSyncLimit = opts.initialSyncLimit; this.encryptionEnabled = opts.encryption === true; this.password = opts.password; + this.syncStore = opts.storagePath ? new FileBackedMatrixSyncStore(opts.storagePath) : undefined; this.idbSnapshotPath = opts.idbSnapshotPath; this.cryptoDatabasePrefix = opts.cryptoDatabasePrefix; this.selfUserId = opts.userId?.trim() || null; @@ -237,6 +241,7 @@ export class MatrixClient { deviceId: opts.deviceId, logger: createMatrixJsSdkClientLogger("MatrixClient"), localTimeoutMs: this.localTimeoutMs, + store: this.syncStore, cryptoCallbacks: cryptoCallbacks as never, verificationMethods: [ VerificationMethod.Sas, @@ -343,6 +348,10 @@ export class MatrixClient { } } + hasPersistedSyncState(): boolean { + return this.syncStore?.hasSavedSync() === true; + } + private async ensureStartedForCryptoControlPlane(): Promise { if (this.started) { return; @@ -357,10 +366,13 @@ export class MatrixClient { } this.decryptBridge.stop(); // Final persist on shutdown - this.stopPersistPromise = persistIdbToDisk({ - snapshotPath: this.idbSnapshotPath, - databasePrefix: this.cryptoDatabasePrefix, - }).catch(noop); + this.stopPersistPromise = Promise.all([ + persistIdbToDisk({ + snapshotPath: this.idbSnapshotPath, + databasePrefix: this.cryptoDatabasePrefix, + }).catch(noop), + this.syncStore?.flush().catch(noop), + ]).then(() => undefined); this.client.stopClient(); this.started = false; }