mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-16 12:30:49 +00:00
Matrix: persist sync state across restarts
This commit is contained in:
@@ -57,6 +57,7 @@ export async function createMatrixClient(params: {
|
||||
encryption: params.encryption,
|
||||
localTimeoutMs: params.localTimeoutMs,
|
||||
initialSyncLimit: params.initialSyncLimit,
|
||||
storagePath: storagePaths.storagePath,
|
||||
recoveryKeyPath: storagePaths.recoveryKeyPath,
|
||||
idbSnapshotPath: storagePaths.idbSnapshotPath,
|
||||
cryptoDatabasePrefix,
|
||||
|
||||
197
extensions/matrix/src/matrix/client/file-sync-store.test.ts
Normal file
197
extensions/matrix/src/matrix/client/file-sync-store.test.ts
Normal file
@@ -0,0 +1,197 @@
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import type { ISyncResponse } from "matrix-js-sdk";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import * as jsonFiles from "../../../../../src/infra/json-files.js";
|
||||
import { FileBackedMatrixSyncStore } from "./file-sync-store.js";
|
||||
|
||||
function createSyncResponse(nextBatch: string): ISyncResponse {
|
||||
return {
|
||||
next_batch: nextBatch,
|
||||
rooms: {
|
||||
join: {
|
||||
"!room:example.org": {
|
||||
summary: {},
|
||||
state: { events: [] },
|
||||
timeline: {
|
||||
events: [
|
||||
{
|
||||
content: {
|
||||
body: "hello",
|
||||
msgtype: "m.text",
|
||||
},
|
||||
event_id: "$message",
|
||||
origin_server_ts: 1,
|
||||
sender: "@user:example.org",
|
||||
type: "m.room.message",
|
||||
},
|
||||
],
|
||||
prev_batch: "t0",
|
||||
},
|
||||
ephemeral: { events: [] },
|
||||
account_data: { events: [] },
|
||||
unread_notifications: {},
|
||||
},
|
||||
},
|
||||
},
|
||||
account_data: {
|
||||
events: [
|
||||
{
|
||||
content: { theme: "dark" },
|
||||
type: "com.openclaw.test",
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function createDeferred() {
|
||||
let resolve!: () => void;
|
||||
const promise = new Promise<void>((resolvePromise) => {
|
||||
resolve = resolvePromise;
|
||||
});
|
||||
return { promise, resolve };
|
||||
}
|
||||
|
||||
describe("FileBackedMatrixSyncStore", () => {
|
||||
const tempDirs: string[] = [];
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
vi.useRealTimers();
|
||||
for (const dir of tempDirs.splice(0)) {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it("persists sync data so restart resumes from the saved cursor", async () => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
|
||||
tempDirs.push(tempDir);
|
||||
const storagePath = path.join(tempDir, "bot-storage.json");
|
||||
|
||||
const firstStore = new FileBackedMatrixSyncStore(storagePath);
|
||||
expect(firstStore.hasSavedSync()).toBe(false);
|
||||
await firstStore.setSyncData(createSyncResponse("s123"));
|
||||
await firstStore.flush();
|
||||
|
||||
const secondStore = new FileBackedMatrixSyncStore(storagePath);
|
||||
expect(secondStore.hasSavedSync()).toBe(true);
|
||||
await expect(secondStore.getSavedSyncToken()).resolves.toBe("s123");
|
||||
|
||||
const savedSync = await secondStore.getSavedSync();
|
||||
expect(savedSync?.nextBatch).toBe("s123");
|
||||
expect(savedSync?.accountData).toEqual([
|
||||
{
|
||||
content: { theme: "dark" },
|
||||
type: "com.openclaw.test",
|
||||
},
|
||||
]);
|
||||
expect(savedSync?.roomsData.join?.["!room:example.org"]).toBeTruthy();
|
||||
});
|
||||
|
||||
it("coalesces background persistence until the debounce window elapses", async () => {
|
||||
vi.useFakeTimers();
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
|
||||
tempDirs.push(tempDir);
|
||||
const storagePath = path.join(tempDir, "bot-storage.json");
|
||||
const writeSpy = vi.spyOn(jsonFiles, "writeJsonAtomic").mockResolvedValue();
|
||||
|
||||
const store = new FileBackedMatrixSyncStore(storagePath);
|
||||
await store.setSyncData(createSyncResponse("s111"));
|
||||
await store.setSyncData(createSyncResponse("s222"));
|
||||
await store.storeClientOptions({ lazyLoadMembers: true });
|
||||
|
||||
expect(writeSpy).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(249);
|
||||
expect(writeSpy).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(writeSpy).toHaveBeenCalledTimes(1);
|
||||
expect(writeSpy).toHaveBeenCalledWith(
|
||||
storagePath,
|
||||
expect.objectContaining({
|
||||
savedSync: expect.objectContaining({
|
||||
nextBatch: "s222",
|
||||
}),
|
||||
clientOptions: {
|
||||
lazyLoadMembers: true,
|
||||
},
|
||||
}),
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("waits for an in-flight persist when shutdown flush runs", async () => {
|
||||
vi.useFakeTimers();
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
|
||||
tempDirs.push(tempDir);
|
||||
const storagePath = path.join(tempDir, "bot-storage.json");
|
||||
const writeDeferred = createDeferred();
|
||||
const writeSpy = vi
|
||||
.spyOn(jsonFiles, "writeJsonAtomic")
|
||||
.mockImplementation(async () => writeDeferred.promise);
|
||||
|
||||
const store = new FileBackedMatrixSyncStore(storagePath);
|
||||
await store.setSyncData(createSyncResponse("s777"));
|
||||
await vi.advanceTimersByTimeAsync(250);
|
||||
|
||||
let flushCompleted = false;
|
||||
const flushPromise = store.flush().then(() => {
|
||||
flushCompleted = true;
|
||||
});
|
||||
|
||||
await Promise.resolve();
|
||||
expect(writeSpy).toHaveBeenCalledTimes(1);
|
||||
expect(flushCompleted).toBe(false);
|
||||
|
||||
writeDeferred.resolve();
|
||||
await flushPromise;
|
||||
expect(flushCompleted).toBe(true);
|
||||
});
|
||||
|
||||
it("persists client options alongside sync state", async () => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
|
||||
tempDirs.push(tempDir);
|
||||
const storagePath = path.join(tempDir, "bot-storage.json");
|
||||
|
||||
const firstStore = new FileBackedMatrixSyncStore(storagePath);
|
||||
await firstStore.storeClientOptions({ lazyLoadMembers: true });
|
||||
await firstStore.flush();
|
||||
|
||||
const secondStore = new FileBackedMatrixSyncStore(storagePath);
|
||||
await expect(secondStore.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true });
|
||||
});
|
||||
|
||||
it("loads legacy raw sync payloads from bot-storage.json", async () => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
|
||||
tempDirs.push(tempDir);
|
||||
const storagePath = path.join(tempDir, "bot-storage.json");
|
||||
|
||||
fs.writeFileSync(
|
||||
storagePath,
|
||||
JSON.stringify({
|
||||
next_batch: "legacy-token",
|
||||
rooms: {
|
||||
join: {},
|
||||
},
|
||||
account_data: {
|
||||
events: [],
|
||||
},
|
||||
}),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const store = new FileBackedMatrixSyncStore(storagePath);
|
||||
expect(store.hasSavedSync()).toBe(true);
|
||||
await expect(store.getSavedSyncToken()).resolves.toBe("legacy-token");
|
||||
await expect(store.getSavedSync()).resolves.toMatchObject({
|
||||
nextBatch: "legacy-token",
|
||||
roomsData: {
|
||||
join: {},
|
||||
},
|
||||
accountData: [],
|
||||
});
|
||||
});
|
||||
});
|
||||
243
extensions/matrix/src/matrix/client/file-sync-store.ts
Normal file
243
extensions/matrix/src/matrix/client/file-sync-store.ts
Normal file
@@ -0,0 +1,243 @@
|
||||
import { readFileSync } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
import {
|
||||
MemoryStore,
|
||||
SyncAccumulator,
|
||||
type ISyncData,
|
||||
type ISyncResponse,
|
||||
type IStoredClientOpts,
|
||||
} from "matrix-js-sdk";
|
||||
import { createAsyncLock, writeJsonAtomic } from "../../../../../src/infra/json-files.js";
|
||||
import { LogService } from "../sdk/logger.js";
|
||||
|
||||
const STORE_VERSION = 1;
|
||||
const PERSIST_DEBOUNCE_MS = 250;
|
||||
|
||||
type PersistedMatrixSyncStore = {
|
||||
version: number;
|
||||
savedSync: ISyncData | null;
|
||||
clientOptions?: IStoredClientOpts;
|
||||
};
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === "object" && value !== null;
|
||||
}
|
||||
|
||||
function toPersistedSyncData(value: unknown): ISyncData | null {
|
||||
if (!isRecord(value)) {
|
||||
return null;
|
||||
}
|
||||
if (typeof value.nextBatch === "string" && value.nextBatch.trim()) {
|
||||
if (!Array.isArray(value.accountData) || !isRecord(value.roomsData)) {
|
||||
return null;
|
||||
}
|
||||
return {
|
||||
nextBatch: value.nextBatch,
|
||||
accountData: value.accountData,
|
||||
roomsData: value.roomsData,
|
||||
} as ISyncData;
|
||||
}
|
||||
|
||||
// Older Matrix state files stored the raw /sync-shaped payload directly.
|
||||
if (typeof value.next_batch === "string" && value.next_batch.trim()) {
|
||||
return {
|
||||
nextBatch: value.next_batch,
|
||||
accountData:
|
||||
isRecord(value.account_data) && Array.isArray(value.account_data.events)
|
||||
? value.account_data.events
|
||||
: [],
|
||||
roomsData: isRecord(value.rooms) ? value.rooms : {},
|
||||
} as ISyncData;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
function readPersistedStore(raw: string): PersistedMatrixSyncStore | null {
|
||||
try {
|
||||
const parsed = JSON.parse(raw) as {
|
||||
version?: unknown;
|
||||
savedSync?: unknown;
|
||||
clientOptions?: unknown;
|
||||
};
|
||||
const savedSync = toPersistedSyncData(parsed.savedSync);
|
||||
if (parsed.version === STORE_VERSION) {
|
||||
return {
|
||||
version: STORE_VERSION,
|
||||
savedSync,
|
||||
clientOptions: isRecord(parsed.clientOptions)
|
||||
? (parsed.clientOptions as IStoredClientOpts)
|
||||
: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
// Backward-compat: prior Matrix state files stored the raw sync blob at the
|
||||
// top level without versioning or wrapped metadata.
|
||||
return {
|
||||
version: STORE_VERSION,
|
||||
savedSync: toPersistedSyncData(parsed),
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function cloneJson<T>(value: T): T {
|
||||
return structuredClone(value);
|
||||
}
|
||||
|
||||
function syncDataToSyncResponse(syncData: ISyncData): ISyncResponse {
|
||||
return {
|
||||
next_batch: syncData.nextBatch,
|
||||
rooms: syncData.roomsData,
|
||||
account_data: {
|
||||
events: syncData.accountData,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export class FileBackedMatrixSyncStore extends MemoryStore {
|
||||
private readonly persistLock = createAsyncLock();
|
||||
private readonly accumulator = new SyncAccumulator();
|
||||
private savedSync: ISyncData | null = null;
|
||||
private savedClientOptions: IStoredClientOpts | undefined;
|
||||
private readonly hadSavedSyncOnLoad: boolean;
|
||||
private dirty = false;
|
||||
private persistTimer: NodeJS.Timeout | null = null;
|
||||
private persistPromise: Promise<void> | null = null;
|
||||
|
||||
constructor(private readonly storagePath: string) {
|
||||
super();
|
||||
|
||||
let restoredSavedSync: ISyncData | null = null;
|
||||
let restoredClientOptions: IStoredClientOpts | undefined;
|
||||
try {
|
||||
const raw = readFileSync(this.storagePath, "utf8");
|
||||
const persisted = readPersistedStore(raw);
|
||||
restoredSavedSync = persisted?.savedSync ?? null;
|
||||
restoredClientOptions = persisted?.clientOptions;
|
||||
} catch {
|
||||
// Missing or unreadable sync cache should not block startup.
|
||||
}
|
||||
|
||||
this.savedSync = restoredSavedSync;
|
||||
this.savedClientOptions = restoredClientOptions;
|
||||
this.hadSavedSyncOnLoad = restoredSavedSync !== null;
|
||||
|
||||
if (this.savedSync) {
|
||||
this.accumulator.accumulate(syncDataToSyncResponse(this.savedSync), true);
|
||||
super.setSyncToken(this.savedSync.nextBatch);
|
||||
}
|
||||
if (this.savedClientOptions) {
|
||||
void super.storeClientOptions(this.savedClientOptions);
|
||||
}
|
||||
}
|
||||
|
||||
hasSavedSync(): boolean {
|
||||
return this.hadSavedSyncOnLoad;
|
||||
}
|
||||
|
||||
override getSavedSync(): Promise<ISyncData | null> {
|
||||
return Promise.resolve(this.savedSync ? cloneJson(this.savedSync) : null);
|
||||
}
|
||||
|
||||
override getSavedSyncToken(): Promise<string | null> {
|
||||
return Promise.resolve(this.savedSync?.nextBatch ?? null);
|
||||
}
|
||||
|
||||
override setSyncData(syncData: ISyncResponse): Promise<void> {
|
||||
this.accumulator.accumulate(syncData);
|
||||
this.savedSync = this.accumulator.getJSON();
|
||||
this.markDirtyAndSchedulePersist();
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
override getClientOptions() {
|
||||
return Promise.resolve(
|
||||
this.savedClientOptions ? cloneJson(this.savedClientOptions) : undefined,
|
||||
);
|
||||
}
|
||||
|
||||
override storeClientOptions(options: IStoredClientOpts) {
|
||||
this.savedClientOptions = cloneJson(options);
|
||||
void super.storeClientOptions(options);
|
||||
this.markDirtyAndSchedulePersist();
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
override save(force = false) {
|
||||
if (force) {
|
||||
return this.flush();
|
||||
}
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
override wantsSave(): boolean {
|
||||
// We persist directly from setSyncData/storeClientOptions so the SDK's
|
||||
// periodic save hook stays disabled. Shutdown uses flush() for a final sync.
|
||||
return false;
|
||||
}
|
||||
|
||||
override async deleteAllData(): Promise<void> {
|
||||
if (this.persistTimer) {
|
||||
clearTimeout(this.persistTimer);
|
||||
this.persistTimer = null;
|
||||
}
|
||||
this.dirty = false;
|
||||
await this.persistPromise?.catch(() => undefined);
|
||||
await super.deleteAllData();
|
||||
this.savedSync = null;
|
||||
this.savedClientOptions = undefined;
|
||||
await fs.rm(this.storagePath, { force: true }).catch(() => undefined);
|
||||
}
|
||||
|
||||
async flush(): Promise<void> {
|
||||
if (this.persistTimer) {
|
||||
clearTimeout(this.persistTimer);
|
||||
this.persistTimer = null;
|
||||
}
|
||||
while (this.dirty || this.persistPromise) {
|
||||
if (this.dirty && !this.persistPromise) {
|
||||
this.persistPromise = this.persist().finally(() => {
|
||||
this.persistPromise = null;
|
||||
});
|
||||
}
|
||||
await this.persistPromise;
|
||||
}
|
||||
}
|
||||
|
||||
private markDirtyAndSchedulePersist(): void {
|
||||
this.dirty = true;
|
||||
if (this.persistTimer) {
|
||||
return;
|
||||
}
|
||||
this.persistTimer = setTimeout(() => {
|
||||
this.persistTimer = null;
|
||||
void this.flush().catch((err) => {
|
||||
LogService.warn("MatrixFileSyncStore", "Failed to persist Matrix sync store:", err);
|
||||
});
|
||||
}, PERSIST_DEBOUNCE_MS);
|
||||
this.persistTimer.unref?.();
|
||||
}
|
||||
|
||||
private async persist(): Promise<void> {
|
||||
this.dirty = false;
|
||||
const payload: PersistedMatrixSyncStore = {
|
||||
version: STORE_VERSION,
|
||||
savedSync: this.savedSync ? cloneJson(this.savedSync) : null,
|
||||
...(this.savedClientOptions ? { clientOptions: cloneJson(this.savedClientOptions) } : {}),
|
||||
};
|
||||
try {
|
||||
await this.persistLock(async () => {
|
||||
await writeJsonAtomic(this.storagePath, payload, {
|
||||
mode: 0o600,
|
||||
trailingNewline: true,
|
||||
ensureDirMode: 0o700,
|
||||
});
|
||||
});
|
||||
} catch (err) {
|
||||
this.dirty = true;
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -34,6 +34,7 @@ type MatrixHandlerTestHarnessOptions = {
|
||||
mediaMaxBytes?: number;
|
||||
startupMs?: number;
|
||||
startupGraceMs?: number;
|
||||
dropPreStartupMessages?: boolean;
|
||||
isDirectMessage?: boolean;
|
||||
readAllowFromStore?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["readAllowFromStore"];
|
||||
upsertPairingRequest?: MatrixMonitorHandlerParams["core"]["channel"]["pairing"]["upsertPairingRequest"];
|
||||
@@ -172,6 +173,7 @@ export function createMatrixHandlerTestHarness(
|
||||
mediaMaxBytes: options.mediaMaxBytes ?? 10_000_000,
|
||||
startupMs: options.startupMs ?? 0,
|
||||
startupGraceMs: options.startupGraceMs ?? 0,
|
||||
dropPreStartupMessages: options.dropPreStartupMessages ?? true,
|
||||
directTracker: {
|
||||
isDirectMessage: async () => options.isDirectMessage ?? true,
|
||||
},
|
||||
|
||||
@@ -635,4 +635,62 @@ describe("matrix monitor handler pairing account scope", () => {
|
||||
|
||||
expect(enqueueSystemEvent).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("drops pre-startup dm messages on cold start", async () => {
|
||||
const resolveAgentRoute = vi.fn(() => ({
|
||||
agentId: "ops",
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
sessionKey: "agent:ops:main",
|
||||
mainSessionKey: "agent:ops:main",
|
||||
matchedBy: "binding.account" as const,
|
||||
}));
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
resolveAgentRoute,
|
||||
isDirectMessage: true,
|
||||
startupMs: 1_000,
|
||||
startupGraceMs: 0,
|
||||
dropPreStartupMessages: true,
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$old-cold-start",
|
||||
body: "hello",
|
||||
originServerTs: 999,
|
||||
}),
|
||||
);
|
||||
|
||||
expect(resolveAgentRoute).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("replays pre-startup dm messages when persisted sync state exists", async () => {
|
||||
const resolveAgentRoute = vi.fn(() => ({
|
||||
agentId: "ops",
|
||||
channel: "matrix",
|
||||
accountId: "ops",
|
||||
sessionKey: "agent:ops:main",
|
||||
mainSessionKey: "agent:ops:main",
|
||||
matchedBy: "binding.account" as const,
|
||||
}));
|
||||
const { handler } = createMatrixHandlerTestHarness({
|
||||
resolveAgentRoute,
|
||||
isDirectMessage: true,
|
||||
startupMs: 1_000,
|
||||
startupGraceMs: 0,
|
||||
dropPreStartupMessages: false,
|
||||
});
|
||||
|
||||
await handler(
|
||||
"!room:example.org",
|
||||
createMatrixTextMessageEvent({
|
||||
eventId: "$old-resume",
|
||||
body: "hello",
|
||||
originServerTs: 999,
|
||||
}),
|
||||
);
|
||||
|
||||
expect(resolveAgentRoute).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -68,6 +68,7 @@ export type MatrixMonitorHandlerParams = {
|
||||
mediaMaxBytes: number;
|
||||
startupMs: number;
|
||||
startupGraceMs: number;
|
||||
dropPreStartupMessages: boolean;
|
||||
directTracker: {
|
||||
isDirectMessage: (params: {
|
||||
roomId: string;
|
||||
@@ -146,6 +147,7 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
mediaMaxBytes,
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
dropPreStartupMessages,
|
||||
directTracker,
|
||||
getRoomInfo,
|
||||
getMemberDisplayName,
|
||||
@@ -239,15 +241,17 @@ export function createMatrixRoomMessageHandler(params: MatrixMonitorHandlerParam
|
||||
}
|
||||
const eventTs = event.origin_server_ts;
|
||||
const eventAge = event.unsigned?.age;
|
||||
if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
typeof eventTs !== "number" &&
|
||||
typeof eventAge === "number" &&
|
||||
eventAge > startupGraceMs
|
||||
) {
|
||||
return;
|
||||
if (dropPreStartupMessages) {
|
||||
if (typeof eventTs === "number" && eventTs < startupMs - startupGraceMs) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
typeof eventTs !== "number" &&
|
||||
typeof eventAge === "number" &&
|
||||
eventAge > startupGraceMs
|
||||
) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let content = event.content as RoomMessageEventContent;
|
||||
|
||||
@@ -2,7 +2,11 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const hoisted = vi.hoisted(() => {
|
||||
const callOrder: string[] = [];
|
||||
const client = { id: "matrix-client" };
|
||||
const client = {
|
||||
id: "matrix-client",
|
||||
hasPersistedSyncState: vi.fn(() => false),
|
||||
};
|
||||
const createMatrixRoomMessageHandler = vi.fn(() => vi.fn());
|
||||
let startClientError: Error | null = null;
|
||||
const resolveTextChunkLimit = vi.fn<
|
||||
(cfg: unknown, channel: unknown, accountId?: unknown) => number
|
||||
@@ -19,6 +23,7 @@ const hoisted = vi.hoisted(() => {
|
||||
return {
|
||||
callOrder,
|
||||
client,
|
||||
createMatrixRoomMessageHandler,
|
||||
logger,
|
||||
resolveTextChunkLimit,
|
||||
setActiveMatrixClient,
|
||||
@@ -176,7 +181,7 @@ vi.mock("./events.js", () => ({
|
||||
}));
|
||||
|
||||
vi.mock("./handler.js", () => ({
|
||||
createMatrixRoomMessageHandler: vi.fn(() => vi.fn()),
|
||||
createMatrixRoomMessageHandler: hoisted.createMatrixRoomMessageHandler,
|
||||
}));
|
||||
|
||||
vi.mock("./legacy-crypto-restore.js", () => ({
|
||||
@@ -205,6 +210,8 @@ describe("monitorMatrixProvider", () => {
|
||||
hoisted.setActiveMatrixClient.mockReset();
|
||||
hoisted.stopSharedClientInstance.mockReset();
|
||||
hoisted.stopThreadBindingManager.mockReset();
|
||||
hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false);
|
||||
hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn());
|
||||
Object.values(hoisted.logger).forEach((mock) => mock.mockReset());
|
||||
});
|
||||
|
||||
@@ -249,4 +256,19 @@ describe("monitorMatrixProvider", () => {
|
||||
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(1, hoisted.client, "default");
|
||||
expect(hoisted.setActiveMatrixClient).toHaveBeenNthCalledWith(2, null, "default");
|
||||
});
|
||||
|
||||
it("disables cold-start backlog dropping when sync state already exists", async () => {
|
||||
hoisted.client.hasPersistedSyncState.mockReturnValue(true);
|
||||
const { monitorMatrixProvider } = await import("./index.js");
|
||||
const abortController = new AbortController();
|
||||
abortController.abort();
|
||||
|
||||
await monitorMatrixProvider({ abortSignal: abortController.signal });
|
||||
|
||||
expect(hoisted.createMatrixRoomMessageHandler).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
dropPreStartupMessages: false,
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -181,6 +181,9 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
const mediaMaxBytes = Math.max(1, mediaMaxMb) * 1024 * 1024;
|
||||
const startupMs = Date.now();
|
||||
const startupGraceMs = 0;
|
||||
// Cold starts should ignore old room history, but once we have a persisted
|
||||
// /sync cursor we want restart backlogs to replay just like other channels.
|
||||
const dropPreStartupMessages = !client.hasPersistedSyncState();
|
||||
const directTracker = createDirectRoomTracker(client, { log: logVerboseMessage });
|
||||
registerMatrixAutoJoin({ client, accountConfig, runtime });
|
||||
const warnedEncryptedRooms = new Set<string>();
|
||||
@@ -208,6 +211,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi
|
||||
mediaMaxBytes,
|
||||
startupMs,
|
||||
startupGraceMs,
|
||||
dropPreStartupMessages,
|
||||
directTracker,
|
||||
getRoomInfo,
|
||||
getMemberDisplayName,
|
||||
|
||||
@@ -176,14 +176,18 @@ function createMatrixJsClientStub(): MatrixJsClientStub {
|
||||
let matrixJsClient = createMatrixJsClientStub();
|
||||
let lastCreateClientOpts: Record<string, unknown> | null = null;
|
||||
|
||||
vi.mock("matrix-js-sdk", () => ({
|
||||
ClientEvent: { Event: "event", Room: "Room" },
|
||||
MatrixEventEvent: { Decrypted: "decrypted" },
|
||||
createClient: vi.fn((opts: Record<string, unknown>) => {
|
||||
lastCreateClientOpts = opts;
|
||||
return matrixJsClient;
|
||||
}),
|
||||
}));
|
||||
vi.mock("matrix-js-sdk", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("matrix-js-sdk")>();
|
||||
return {
|
||||
...actual,
|
||||
ClientEvent: { Event: "event", Room: "Room" },
|
||||
MatrixEventEvent: { Decrypted: "decrypted" },
|
||||
createClient: vi.fn((opts: Record<string, unknown>) => {
|
||||
lastCreateClientOpts = opts;
|
||||
return matrixJsClient;
|
||||
}),
|
||||
};
|
||||
});
|
||||
|
||||
import { MatrixClient } from "./sdk.js";
|
||||
|
||||
@@ -485,6 +489,28 @@ describe("MatrixClient request hardening", () => {
|
||||
|
||||
await assertion;
|
||||
});
|
||||
|
||||
it("wires the sync store into the SDK and flushes it on shutdown", async () => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sdk-store-"));
|
||||
const storagePath = path.join(tempDir, "bot-storage.json");
|
||||
|
||||
try {
|
||||
const client = new MatrixClient("https://matrix.example.org", "token", undefined, undefined, {
|
||||
storagePath,
|
||||
});
|
||||
|
||||
const store = lastCreateClientOpts?.store as { flush: () => Promise<void> } | undefined;
|
||||
expect(store).toBeTruthy();
|
||||
const flushSpy = vi.spyOn(store!, "flush").mockResolvedValue();
|
||||
|
||||
await client.stopAndPersist();
|
||||
|
||||
expect(flushSpy).toHaveBeenCalledTimes(1);
|
||||
expect(matrixJsClient.stopClient).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
fs.rmSync(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("MatrixClient event bridge", () => {
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
import { VerificationMethod } from "matrix-js-sdk/lib/types.js";
|
||||
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
|
||||
import { resolveMatrixRoomKeyBackupReadinessError } from "./backup-health.js";
|
||||
import { FileBackedMatrixSyncStore } from "./client/file-sync-store.js";
|
||||
import { createMatrixJsSdkClientLogger } from "./client/logging.js";
|
||||
import { MatrixCryptoBootstrapper } from "./sdk/crypto-bootstrap.js";
|
||||
import type { MatrixCryptoBootstrapResult } from "./sdk/crypto-bootstrap.js";
|
||||
@@ -174,6 +175,7 @@ export class MatrixClient {
|
||||
private readonly initialSyncLimit?: number;
|
||||
private readonly encryptionEnabled: boolean;
|
||||
private readonly password?: string;
|
||||
private readonly syncStore?: FileBackedMatrixSyncStore;
|
||||
private readonly idbSnapshotPath?: string;
|
||||
private readonly cryptoDatabasePrefix?: string;
|
||||
private bridgeRegistered = false;
|
||||
@@ -211,6 +213,7 @@ export class MatrixClient {
|
||||
localTimeoutMs?: number;
|
||||
encryption?: boolean;
|
||||
initialSyncLimit?: number;
|
||||
storagePath?: string;
|
||||
recoveryKeyPath?: string;
|
||||
idbSnapshotPath?: string;
|
||||
cryptoDatabasePrefix?: string;
|
||||
@@ -222,6 +225,7 @@ export class MatrixClient {
|
||||
this.initialSyncLimit = opts.initialSyncLimit;
|
||||
this.encryptionEnabled = opts.encryption === true;
|
||||
this.password = opts.password;
|
||||
this.syncStore = opts.storagePath ? new FileBackedMatrixSyncStore(opts.storagePath) : undefined;
|
||||
this.idbSnapshotPath = opts.idbSnapshotPath;
|
||||
this.cryptoDatabasePrefix = opts.cryptoDatabasePrefix;
|
||||
this.selfUserId = opts.userId?.trim() || null;
|
||||
@@ -237,6 +241,7 @@ export class MatrixClient {
|
||||
deviceId: opts.deviceId,
|
||||
logger: createMatrixJsSdkClientLogger("MatrixClient"),
|
||||
localTimeoutMs: this.localTimeoutMs,
|
||||
store: this.syncStore,
|
||||
cryptoCallbacks: cryptoCallbacks as never,
|
||||
verificationMethods: [
|
||||
VerificationMethod.Sas,
|
||||
@@ -343,6 +348,10 @@ export class MatrixClient {
|
||||
}
|
||||
}
|
||||
|
||||
hasPersistedSyncState(): boolean {
|
||||
return this.syncStore?.hasSavedSync() === true;
|
||||
}
|
||||
|
||||
private async ensureStartedForCryptoControlPlane(): Promise<void> {
|
||||
if (this.started) {
|
||||
return;
|
||||
@@ -357,10 +366,13 @@ export class MatrixClient {
|
||||
}
|
||||
this.decryptBridge.stop();
|
||||
// Final persist on shutdown
|
||||
this.stopPersistPromise = persistIdbToDisk({
|
||||
snapshotPath: this.idbSnapshotPath,
|
||||
databasePrefix: this.cryptoDatabasePrefix,
|
||||
}).catch(noop);
|
||||
this.stopPersistPromise = Promise.all([
|
||||
persistIdbToDisk({
|
||||
snapshotPath: this.idbSnapshotPath,
|
||||
databasePrefix: this.cryptoDatabasePrefix,
|
||||
}).catch(noop),
|
||||
this.syncStore?.flush().catch(noop),
|
||||
]).then(() => undefined);
|
||||
this.client.stopClient();
|
||||
this.started = false;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user