refactor(matrix): store sync cache in sqlite

Move Matrix sync cache state into plugin SQLite storage, with startup and doctor migrations for readable legacy bot-storage.json files.\n\nVerification: focused Matrix and QA tests passed locally; focused touched-file oxlint and git diff --check passed; autoreview clean. CI failures are current main/unrelated: lint/type/madge/gateway-watch issues outside the Matrix diff.
This commit is contained in:
Peter Steinberger
2026-06-06 22:17:41 -07:00
committed by GitHub
parent 690a04f81e
commit e8348c0dc8
15 changed files with 1434 additions and 234 deletions

View File

@@ -0,0 +1,124 @@
// Matrix tests cover doctor contract state migrations.
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-contracts";
import type {
OpenKeyedStoreOptions,
PluginStateKeyedStore,
} from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import type { PluginDoctorStateMigrationContext } from "openclaw/plugin-sdk/runtime-doctor";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { stateMigrations } from "./doctor-contract-api.js";
import { SqliteBackedMatrixSyncStore } from "./src/matrix/client/file-sync-store.js";
import { installMatrixTestRuntime } from "./src/test-runtime.js";
function createContext(): PluginDoctorStateMigrationContext {
return {
openPluginStateKeyedStore: <T>(options: OpenKeyedStoreOptions): PluginStateKeyedStore<T> =>
createPluginStateKeyedStoreForTests<T>("matrix", options),
};
}
function createMigrationParams(stateDir: string) {
return {
config: {} as OpenClawConfig,
env: { OPENCLAW_STATE_DIR: stateDir },
stateDir,
oauthDir: path.join(stateDir, "oauth"),
context: createContext(),
};
}
describe("matrix doctor contract state migrations", () => {
const tempDirs: string[] = [];
beforeEach(() => {
resetPluginStateStoreForTests();
installMatrixTestRuntime();
});
afterEach(() => {
resetPluginStateStoreForTests();
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { recursive: true, force: true });
}
});
it("migrates legacy sync cache JSON to SQLite plugin state", async () => {
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-doctor-"));
tempDirs.push(stateDir);
const storageRootDir = path.join(
stateDir,
"matrix",
"accounts",
"default",
"matrix.example.org__bot",
"token-hash",
);
fs.mkdirSync(storageRootDir, { recursive: true });
fs.writeFileSync(
path.join(storageRootDir, "bot-storage.json"),
JSON.stringify({
version: 1,
savedSync: {
nextBatch: "legacy-token",
accountData: [],
roomsData: {
join: {},
invite: {},
leave: {},
knock: {},
},
},
cleanShutdown: true,
}),
);
const migration = stateMigrations[0];
await expect(migration.detectLegacyState(createMigrationParams(stateDir))).resolves.toEqual({
preview: [`Matrix sync cache JSON can migrate to SQLite: ${storageRootDir}`],
});
await expect(migration.migrateLegacyState(createMigrationParams(stateDir))).resolves.toEqual({
changes: [
`Migrated Matrix sync cache JSON to SQLite for ${storageRootDir}`,
`Archived Matrix sync cache legacy source -> ${path.join(storageRootDir, "bot-storage.json")}.migrated`,
],
warnings: [],
});
const store = new SqliteBackedMatrixSyncStore(storageRootDir);
expect(store.hasSavedSync()).toBe(true);
expect(store.hasSavedSyncFromCleanShutdown()).toBe(true);
await expect(store.getSavedSyncToken()).resolves.toBe("legacy-token");
expect(fs.existsSync(path.join(storageRootDir, "bot-storage.json"))).toBe(false);
});
it("does not archive the legacy flat sync cache into an unread SQLite root", async () => {
const stateDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-doctor-"));
tempDirs.push(stateDir);
const flatRoot = path.join(stateDir, "matrix");
fs.mkdirSync(flatRoot, { recursive: true });
fs.writeFileSync(
path.join(flatRoot, "bot-storage.json"),
JSON.stringify({
next_batch: "flat-token",
rooms: { join: {} },
account_data: { events: [] },
}),
);
const migration = stateMigrations[0];
await expect(migration.detectLegacyState(createMigrationParams(stateDir))).resolves.toBeNull();
await expect(migration.migrateLegacyState(createMigrationParams(stateDir))).resolves.toEqual({
changes: [],
warnings: [],
});
expect(fs.existsSync(path.join(flatRoot, "bot-storage.json"))).toBe(true);
});
});

View File

@@ -1,2 +1,117 @@
import type { Dirent } from "node:fs";
// Matrix API module exposes the plugin public contract.
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
hasMatrixSyncCacheStateInStore,
openMatrixSyncCacheStoreOptions,
readLegacyMatrixSyncCacheState,
writeMatrixSyncCacheStateToStore,
type MatrixSyncCacheRecord,
} from "./src/matrix/client/file-sync-store.js";
export { normalizeCompatibilityConfig, legacyConfigRules } from "./src/doctor-contract.js";
const MATRIX_SYNC_CACHE_FILENAME = "bot-storage.json";
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function collectLegacySyncCacheRoots(stateDir: string): Promise<string[]> {
const matrixRoot = path.join(stateDir, "matrix");
const roots: string[] = [];
async function visit(dir: string): Promise<void> {
let entries: Dirent[];
try {
entries = await fs.readdir(dir, { withFileTypes: true });
} catch {
return;
}
for (const entry of entries) {
const entryPath = path.join(dir, entry.name);
if (entry.isFile() && entry.name === MATRIX_SYNC_CACHE_FILENAME) {
roots.push(dir);
continue;
}
if (entry.isDirectory()) {
await visit(entryPath);
}
}
}
await visit(matrixRoot);
return roots.filter((root) => path.resolve(root) !== path.resolve(matrixRoot)).toSorted();
}
async function archiveLegacySyncCache(params: {
storageRootDir: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const sourcePath = path.join(params.storageRootDir, MATRIX_SYNC_CACHE_FILENAME);
const archivedPath = `${sourcePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated Matrix sync cache in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(sourcePath, archivedPath);
params.changes.push(`Archived Matrix sync cache legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving Matrix sync cache legacy source: ${String(err)}`);
}
}
export const stateMigrations: PluginDoctorStateMigration[] = [
{
id: "matrix-sync-cache-json-to-plugin-state",
label: "Matrix sync cache",
async detectLegacyState(params) {
const previews: string[] = [];
for (const storageRootDir of await collectLegacySyncCacheRoots(params.stateDir)) {
const persisted = await readLegacyMatrixSyncCacheState(storageRootDir);
if (!persisted) {
continue;
}
previews.push(`Matrix sync cache JSON can migrate to SQLite: ${storageRootDir}`);
}
return previews.length > 0 ? { preview: previews } : null;
},
async migrateLegacyState(params) {
const changes: string[] = [];
const warnings: string[] = [];
for (const storageRootDir of await collectLegacySyncCacheRoots(params.stateDir)) {
const persisted = await readLegacyMatrixSyncCacheState(storageRootDir);
if (!persisted) {
continue;
}
const store = params.context.openPluginStateKeyedStore<MatrixSyncCacheRecord>(
openMatrixSyncCacheStoreOptions(storageRootDir),
);
if (await hasMatrixSyncCacheStateInStore({ storageRootDir, store })) {
warnings.push(
`Skipped Matrix sync cache import for ${storageRootDir} because SQLite already has sync cache state`,
);
await archiveLegacySyncCache({ storageRootDir, changes, warnings });
continue;
}
await writeMatrixSyncCacheStateToStore({
storageRootDir,
payload: persisted,
store,
});
changes.push(`Migrated Matrix sync cache JSON to SQLite for ${storageRootDir}`);
await archiveLegacySyncCache({ storageRootDir, changes, warnings });
}
return { changes, warnings };
},
},
];

View File

@@ -77,7 +77,7 @@ describe("createMatrixClient", () => {
encryption: undefined,
localTimeoutMs: undefined,
initialSyncLimit: undefined,
storagePath: storagePaths.storagePath,
storageRootDir: storagePaths.rootDir,
recoveryKeyPath: storagePaths.recoveryKeyPath,
idbSnapshotPath: storagePaths.idbSnapshotPath,
cryptoDatabasePrefix: "openclaw-matrix-default-token-hash",
@@ -103,7 +103,7 @@ describe("createMatrixClient", () => {
encryption: undefined,
localTimeoutMs: undefined,
initialSyncLimit: undefined,
storagePath: undefined,
storageRootDir: undefined,
recoveryKeyPath: undefined,
idbSnapshotPath: undefined,
cryptoDatabasePrefix: undefined,
@@ -131,7 +131,7 @@ describe("createMatrixClient", () => {
encryption: undefined,
localTimeoutMs: undefined,
initialSyncLimit: undefined,
storagePath: undefined,
storageRootDir: undefined,
recoveryKeyPath: undefined,
idbSnapshotPath: undefined,
cryptoDatabasePrefix: undefined,
@@ -156,7 +156,7 @@ describe("createMatrixClient", () => {
encryption: undefined,
localTimeoutMs: undefined,
initialSyncLimit: undefined,
storagePath: undefined,
storageRootDir: undefined,
recoveryKeyPath: undefined,
idbSnapshotPath: undefined,
cryptoDatabasePrefix: undefined,
@@ -183,7 +183,7 @@ describe("createMatrixClient", () => {
encryption: undefined,
localTimeoutMs: undefined,
initialSyncLimit: undefined,
storagePath: undefined,
storageRootDir: undefined,
recoveryKeyPath: undefined,
idbSnapshotPath: undefined,
cryptoDatabasePrefix: undefined,

View File

@@ -94,7 +94,7 @@ export async function createMatrixClient(params: {
encryption: params.encryption,
localTimeoutMs: params.localTimeoutMs,
initialSyncLimit: params.initialSyncLimit,
storagePath: storagePaths?.storagePath,
storageRootDir: storagePaths?.rootDir,
recoveryKeyPath: storagePaths?.recoveryKeyPath,
idbSnapshotPath: storagePaths?.idbSnapshotPath,
cryptoDatabasePrefix,

View File

@@ -1,11 +1,20 @@
// Matrix tests cover file sync store plugin behavior.
// Matrix tests cover sync cache plugin behavior.
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import type { ISyncResponse } from "matrix-js-sdk/lib/matrix.js";
import * as jsonStore from "openclaw/plugin-sdk/json-store";
import { afterEach, describe, expect, it, vi } from "vitest";
import { FileBackedMatrixSyncStore } from "./file-sync-store.js";
import {
createPluginStateSyncKeyedStoreForTests,
resetPluginStateStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { getMatrixRuntime } from "../../runtime.js";
import { installMatrixTestRuntime } from "../../test-runtime.js";
import {
openMatrixSyncCacheStoreOptions,
SqliteBackedMatrixSyncStore,
type MatrixSyncCacheRecord,
} from "./file-sync-store.js";
function createSyncResponse(nextBatch: string): ISyncResponse {
return {
@@ -52,44 +61,40 @@ function createSyncResponse(nextBatch: string): ISyncResponse {
};
}
function createDeferred() {
let resolve: (() => void) | undefined;
const promise = new Promise<void>((resolvePromise) => {
resolve = resolvePromise;
});
if (!resolve) {
throw new Error("Expected deferred resolver to be initialized");
}
return { promise, resolve };
}
describe("FileBackedMatrixSyncStore", () => {
describe("SqliteBackedMatrixSyncStore", () => {
const tempDirs: string[] = [];
function createStoragePath(): string {
function createStorageRoot(): string {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
tempDirs.push(tempDir);
return path.join(tempDir, "bot-storage.json");
return tempDir;
}
beforeEach(() => {
resetPluginStateStoreForTests();
installMatrixTestRuntime();
});
afterEach(() => {
vi.restoreAllMocks();
vi.useRealTimers();
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { recursive: true, force: true });
}
resetPluginStateStoreForTests();
});
it("persists sync data so restart resumes from the saved cursor", async () => {
const storagePath = createStoragePath();
const storageRoot = createStorageRoot();
const syncResponse = createSyncResponse("s123");
const firstStore = new FileBackedMatrixSyncStore(storagePath);
const firstStore = new SqliteBackedMatrixSyncStore(storageRoot);
expect(firstStore.hasSavedSync()).toBe(false);
await firstStore.setSyncData(syncResponse);
await firstStore.flush();
expect(fs.existsSync(path.join(storageRoot, "bot-storage.json"))).toBe(false);
const secondStore = new FileBackedMatrixSyncStore(storagePath);
const secondStore = new SqliteBackedMatrixSyncStore(storageRoot);
expect(secondStore.hasSavedSync()).toBe(true);
await expect(secondStore.getSavedSyncToken()).resolves.toBe("s123");
@@ -133,11 +138,58 @@ describe("FileBackedMatrixSyncStore", () => {
expect(secondStore.hasSavedSyncFromCleanShutdown()).toBe(false);
});
it("restores the sync cache after the storage root moves", async () => {
const storageRoot = createStorageRoot();
const movedStorageRoot = `${storageRoot}-moved`;
const firstStore = new SqliteBackedMatrixSyncStore(storageRoot);
await firstStore.setSyncData(createSyncResponse("portable-token"));
await firstStore.flush();
resetPluginStateStoreForTests();
fs.renameSync(storageRoot, movedStorageRoot);
tempDirs.push(movedStorageRoot);
const secondStore = new SqliteBackedMatrixSyncStore(movedStorageRoot);
expect(secondStore.hasSavedSync()).toBe(true);
await expect(secondStore.getSavedSyncToken()).resolves.toBe("portable-token");
});
it("ignores metadata with impossible chunk counts", async () => {
const storageRoot = createStorageRoot();
const store = createPluginStateSyncKeyedStoreForTests<MatrixSyncCacheRecord>(
"matrix",
openMatrixSyncCacheStoreOptions(storageRoot),
);
store.register("current:meta", {
kind: "meta",
version: 1,
generation: "corrupt",
chunkCount: 20_000,
cleanShutdown: true,
});
const syncStore = new SqliteBackedMatrixSyncStore(storageRoot);
expect(syncStore.hasSavedSync()).toBe(false);
await expect(syncStore.getSavedSyncToken()).resolves.toBe(null);
});
it("fails persistence instead of silently dropping sync data when sqlite is unavailable", async () => {
const storageRoot = createStorageRoot();
const runtime = getMatrixRuntime();
vi.spyOn(runtime.state, "openSyncKeyedStore").mockImplementation(() => {
throw new Error("sqlite unavailable");
});
const syncStore = new SqliteBackedMatrixSyncStore(storageRoot);
await syncStore.setSyncData(createSyncResponse("unavailable-token"));
await expect(syncStore.flush()).rejects.toThrow(/sqlite store is unavailable/i);
});
it("claims current-token storage ownership when sync state is persisted", async () => {
const storagePath = createStoragePath();
const rootDir = path.dirname(storagePath);
const storageRoot = createStorageRoot();
fs.writeFileSync(
path.join(rootDir, "storage-meta.json"),
path.join(storageRoot, "storage-meta.json"),
JSON.stringify({
homeserver: "https://matrix.example.org",
userId: "@bot:example.org",
@@ -148,50 +200,50 @@ describe("FileBackedMatrixSyncStore", () => {
"utf8",
);
const store = new FileBackedMatrixSyncStore(storagePath);
const store = new SqliteBackedMatrixSyncStore(storageRoot);
await store.setSyncData(createSyncResponse("claimed-token"));
await store.flush();
const meta = JSON.parse(fs.readFileSync(path.join(rootDir, "storage-meta.json"), "utf8")) as {
currentTokenStateClaimed?: boolean;
};
const meta = JSON.parse(
fs.readFileSync(path.join(storageRoot, "storage-meta.json"), "utf8"),
) as { currentTokenStateClaimed?: boolean };
expect(meta.currentTokenStateClaimed).toBe(true);
});
it("only treats sync state as restart-safe after a clean shutdown persist", async () => {
const storagePath = createStoragePath();
const storageRoot = createStorageRoot();
const firstStore = new FileBackedMatrixSyncStore(storagePath);
const firstStore = new SqliteBackedMatrixSyncStore(storageRoot);
await firstStore.setSyncData(createSyncResponse("s123"));
await firstStore.flush();
const afterDirtyPersist = new FileBackedMatrixSyncStore(storagePath);
const afterDirtyPersist = new SqliteBackedMatrixSyncStore(storageRoot);
expect(afterDirtyPersist.hasSavedSync()).toBe(true);
expect(afterDirtyPersist.hasSavedSyncFromCleanShutdown()).toBe(false);
firstStore.markCleanShutdown();
await firstStore.flush();
const afterCleanShutdown = new FileBackedMatrixSyncStore(storagePath);
const afterCleanShutdown = new SqliteBackedMatrixSyncStore(storageRoot);
expect(afterCleanShutdown.hasSavedSync()).toBe(true);
expect(afterCleanShutdown.hasSavedSyncFromCleanShutdown()).toBe(true);
});
it("clears the clean-shutdown marker once fresh sync data arrives", async () => {
const storagePath = createStoragePath();
const storageRoot = createStorageRoot();
const firstStore = new FileBackedMatrixSyncStore(storagePath);
const firstStore = new SqliteBackedMatrixSyncStore(storageRoot);
await firstStore.setSyncData(createSyncResponse("s123"));
firstStore.markCleanShutdown();
await firstStore.flush();
const restartedStore = new FileBackedMatrixSyncStore(storagePath);
const restartedStore = new SqliteBackedMatrixSyncStore(storageRoot);
expect(restartedStore.hasSavedSyncFromCleanShutdown()).toBe(true);
await restartedStore.setSyncData(createSyncResponse("s456"));
await restartedStore.flush();
const afterNewSync = new FileBackedMatrixSyncStore(storagePath);
const afterNewSync = new SqliteBackedMatrixSyncStore(storageRoot);
expect(afterNewSync.hasSavedSync()).toBe(true);
expect(afterNewSync.hasSavedSyncFromCleanShutdown()).toBe(false);
await expect(afterNewSync.getSavedSyncToken()).resolves.toBe("s456");
@@ -199,128 +251,46 @@ describe("FileBackedMatrixSyncStore", () => {
it("coalesces background persistence until the debounce window elapses", async () => {
vi.useFakeTimers();
const storagePath = createStoragePath();
const writeSpy = vi.spyOn(jsonStore, "writeJsonFileAtomically").mockResolvedValue();
const storageRoot = createStorageRoot();
const store = new FileBackedMatrixSyncStore(storagePath);
const store = new SqliteBackedMatrixSyncStore(storageRoot);
await store.setSyncData(createSyncResponse("s111"));
await store.setSyncData(createSyncResponse("s222"));
await store.storeClientOptions({ lazyLoadMembers: true });
expect(writeSpy).not.toHaveBeenCalled();
const beforeDebounce = new SqliteBackedMatrixSyncStore(storageRoot);
expect(beforeDebounce.hasSavedSync()).toBe(false);
await vi.advanceTimersByTimeAsync(249);
expect(writeSpy).not.toHaveBeenCalled();
const beforeElapsed = new SqliteBackedMatrixSyncStore(storageRoot);
expect(beforeElapsed.hasSavedSync()).toBe(false);
await vi.advanceTimersByTimeAsync(1);
await Promise.resolve();
expect(writeSpy).toHaveBeenCalledTimes(1);
expect(writeSpy.mock.calls.at(0)).toEqual([
storagePath,
{
version: 1,
savedSync: {
nextBatch: "s222",
accountData: createSyncResponse("s222").account_data.events,
roomsData: {
join: {
"!room:example.org": {
summary: {
"m.heroes": [],
"m.invited_member_count": undefined,
"m.joined_member_count": undefined,
},
state: { events: [] },
"org.matrix.msc4222.state_after": { events: [] },
timeline: {
events: [
{
content: {
body: "hello",
msgtype: "m.text",
},
event_id: "$message",
origin_server_ts: 1,
sender: "@user:example.org",
type: "m.room.message",
},
{
content: {
body: "hello",
msgtype: "m.text",
},
event_id: "$message",
origin_server_ts: 1,
sender: "@user:example.org",
type: "m.room.message",
},
],
prev_batch: "t0",
},
ephemeral: { events: [] },
account_data: { events: [] },
unread_notifications: {},
unread_thread_notifications: undefined,
msc4354_sticky: undefined,
},
},
invite: {},
leave: {},
knock: {},
},
},
cleanShutdown: false,
clientOptions: {
lazyLoadMembers: true,
},
},
]);
await store.flush();
});
it("waits for an in-flight persist when shutdown flush runs", async () => {
vi.useFakeTimers();
const storagePath = createStoragePath();
const writeDeferred = createDeferred();
const writeSpy = vi
.spyOn(jsonStore, "writeJsonFileAtomically")
.mockImplementation(async () => writeDeferred.promise);
const store = new FileBackedMatrixSyncStore(storagePath);
await store.setSyncData(createSyncResponse("s777"));
await vi.advanceTimersByTimeAsync(250);
let flushCompleted = false;
const flushPromise = store.flush().then(() => {
flushCompleted = true;
});
await Promise.resolve();
expect(writeSpy).toHaveBeenCalledTimes(1);
expect(flushCompleted).toBe(false);
writeDeferred.resolve();
await flushPromise;
expect(flushCompleted).toBe(true);
const persisted = new SqliteBackedMatrixSyncStore(storageRoot);
expect(persisted.hasSavedSync()).toBe(true);
await expect(persisted.getSavedSyncToken()).resolves.toBe("s222");
await expect(persisted.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true });
});
it("persists client options alongside sync state", async () => {
const storagePath = createStoragePath();
const storageRoot = createStorageRoot();
const firstStore = new FileBackedMatrixSyncStore(storagePath);
const firstStore = new SqliteBackedMatrixSyncStore(storageRoot);
await firstStore.storeClientOptions({ lazyLoadMembers: true });
await firstStore.flush();
const secondStore = new FileBackedMatrixSyncStore(storagePath);
const secondStore = new SqliteBackedMatrixSyncStore(storageRoot);
await expect(secondStore.getClientOptions()).resolves.toEqual({ lazyLoadMembers: true });
});
it("loads legacy raw sync payloads from bot-storage.json", async () => {
const storagePath = createStoragePath();
it("ignores legacy raw sync cache files", async () => {
const storageRoot = createStorageRoot();
fs.writeFileSync(
storagePath,
path.join(storageRoot, "bot-storage.json"),
JSON.stringify({
next_batch: "legacy-token",
rooms: {
@@ -333,18 +303,8 @@ describe("FileBackedMatrixSyncStore", () => {
"utf8",
);
const store = new FileBackedMatrixSyncStore(storagePath);
expect(store.hasSavedSync()).toBe(true);
await expect(store.getSavedSyncToken()).resolves.toBe("legacy-token");
await expect(store.getSavedSync()).resolves.toEqual({
nextBatch: "legacy-token",
roomsData: {
join: {},
invite: {},
leave: {},
knock: {},
},
accountData: [],
});
const store = new SqliteBackedMatrixSyncStore(storageRoot);
expect(store.hasSavedSync()).toBe(false);
await expect(store.getSavedSyncToken()).resolves.toBe(null);
});
});

View File

@@ -1,5 +1,5 @@
// Matrix plugin module implements file sync store behavior.
import { readFileSync } from "node:fs";
// Matrix plugin module implements SQLite sync cache behavior.
import { createHash, randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import {
@@ -11,22 +11,56 @@ import {
type ISyncResponse,
type IStoredClientOpts,
} from "matrix-js-sdk/lib/matrix.js";
import { writeJsonFileAtomically } from "openclaw/plugin-sdk/json-store";
import type {
PluginStateKeyedStore,
PluginStateSyncKeyedStore,
} from "openclaw/plugin-sdk/plugin-state-runtime";
import { isRecord } from "../../record-shared.js";
import { getMatrixRuntime } from "../../runtime.js";
import { createAsyncLock } from "../async-lock.js";
import { LogService } from "../sdk/logger.js";
import { resolveMatrixSqliteStateEnv } from "../sqlite-state.js";
import { claimCurrentTokenStorageState } from "./storage.js";
const STORE_VERSION = 1;
const PERSIST_DEBOUNCE_MS = 250;
const SYNC_CACHE_NAMESPACE = "sync-cache";
const SYNC_CACHE_MAX_ENTRIES = 20_000;
const SYNC_CACHE_MAX_CHUNKS = Math.floor((SYNC_CACHE_MAX_ENTRIES - 1) / 2);
const SYNC_CACHE_STATE_KEY = "current";
// PluginState serializes this string inside a row object; 24KB leaves room for JSON escaping.
const SYNC_CACHE_CHUNK_BYTES = 24_000;
type PersistedMatrixSyncStore = {
export type PersistedMatrixSyncStore = {
version: number;
savedSync: ISyncData | null;
clientOptions?: IStoredClientOpts;
cleanShutdown?: boolean;
};
type MatrixSyncCacheMeta = {
kind: "meta";
version: number;
generation: string;
chunkCount: number;
syncDigest?: string;
clientOptions?: IStoredClientOpts;
cleanShutdown?: boolean;
};
type MatrixSyncCacheChunk = {
kind: "sync-chunk";
index: number;
data: string;
};
export type MatrixSyncCacheRecord = MatrixSyncCacheMeta | MatrixSyncCacheChunk;
type MatrixSyncCacheAsyncStore = Pick<
PluginStateKeyedStore<MatrixSyncCacheRecord>,
"delete" | "entries" | "lookup" | "register"
>;
function normalizeRoomsData(value: unknown): IRooms | null {
if (!isRecord(value)) {
return null;
@@ -80,36 +114,30 @@ function toPersistedSyncData(value: unknown): ISyncData | null {
return null;
}
function readPersistedStore(raw: string): PersistedMatrixSyncStore | null {
try {
const parsed = JSON.parse(raw) as {
version?: unknown;
savedSync?: unknown;
clientOptions?: unknown;
cleanShutdown?: unknown;
};
const savedSync = toPersistedSyncData(parsed.savedSync);
if (parsed.version === STORE_VERSION) {
return {
version: STORE_VERSION,
savedSync,
clientOptions: isRecord(parsed.clientOptions)
? (parsed.clientOptions as IStoredClientOpts)
: undefined,
cleanShutdown: parsed.cleanShutdown === true,
};
}
// Backward-compat: prior Matrix state files stored the raw sync blob at the
// top level without versioning or wrapped metadata.
return {
version: STORE_VERSION,
savedSync: toPersistedSyncData(parsed),
cleanShutdown: false,
};
} catch {
function normalizePersistedStore(value: unknown): PersistedMatrixSyncStore | null {
if (!isRecord(value) || value.version !== STORE_VERSION) {
return null;
}
return {
version: STORE_VERSION,
savedSync: toPersistedSyncData(value.savedSync),
clientOptions: isRecord(value.clientOptions)
? (value.clientOptions as IStoredClientOpts)
: undefined,
cleanShutdown: value.cleanShutdown === true,
};
}
function normalizeLegacyPersistedStore(value: unknown): PersistedMatrixSyncStore | null {
const persisted = normalizePersistedStore(value);
if (persisted) {
return persisted;
}
return {
version: STORE_VERSION,
savedSync: toPersistedSyncData(value),
cleanShutdown: false,
};
}
function cloneJson<T>(value: T): T {
@@ -126,9 +154,12 @@ function syncDataToSyncResponse(syncData: ISyncData): ISyncResponse {
};
}
export class FileBackedMatrixSyncStore extends MemoryStore {
export class SqliteBackedMatrixSyncStore extends MemoryStore {
private readonly persistLock = createAsyncLock();
private readonly accumulator = new SyncAccumulator();
private readonly stateKey: string;
private readonly store: PluginStateSyncKeyedStore<MatrixSyncCacheRecord>;
private readonly storeUnavailableError: unknown;
private savedSync: ISyncData | null = null;
private savedClientOptions: IStoredClientOpts | undefined;
private readonly hadSavedSyncOnLoad: boolean;
@@ -138,21 +169,29 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
private persistTimer: NodeJS.Timeout | null = null;
private persistPromise: Promise<void> | null = null;
constructor(private readonly storagePath: string) {
constructor(private readonly storageRootDir: string) {
super();
this.stateKey = resolveSyncCacheStateKey(storageRootDir);
let restoredSavedSync: ISyncData | null = null;
let restoredClientOptions: IStoredClientOpts | undefined;
let restoredCleanShutdown = false;
let syncCacheStore = createNoopMatrixSyncCacheStore();
let syncCacheStoreUnavailableError: unknown;
try {
const raw = readFileSync(this.storagePath, "utf8");
const persisted = readPersistedStore(raw);
restoredSavedSync = persisted?.savedSync ?? null;
restoredClientOptions = persisted?.clientOptions;
restoredCleanShutdown = persisted?.cleanShutdown === true;
} catch {
// Missing or unreadable sync cache should not block startup.
syncCacheStore = openMatrixSyncCacheStore(storageRootDir);
const persisted = readPersistedStoreFromSyncStore(syncCacheStore, this.stateKey);
if (persisted) {
restoredSavedSync = persisted.savedSync;
restoredClientOptions = persisted.clientOptions;
restoredCleanShutdown = persisted.cleanShutdown === true;
}
} catch (err) {
syncCacheStoreUnavailableError = err;
LogService.warn("MatrixSyncCacheStore", "Failed to load Matrix sync cache:", err);
}
this.store = syncCacheStore;
this.storeUnavailableError = syncCacheStoreUnavailableError;
this.savedSync = restoredSavedSync;
this.savedClientOptions = restoredClientOptions;
@@ -219,6 +258,7 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
}
override async deleteAllData(): Promise<void> {
this.assertStoreAvailable();
if (this.persistTimer) {
clearTimeout(this.persistTimer);
this.persistTimer = null;
@@ -229,7 +269,15 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
this.savedSync = null;
this.savedClientOptions = undefined;
this.cleanShutdown = false;
await fs.rm(this.storagePath, { force: true }).catch(() => undefined);
this.store.delete(metaKey(this.stateKey));
for (const row of this.store.entries()) {
if (row.key.startsWith(chunkKeyPrefix(this.stateKey))) {
this.store.delete(row.key);
}
}
await fs
.rm(resolveLegacySyncCachePath(this.storageRootDir), { force: true })
.catch(() => undefined);
}
markCleanShutdown(): void {
@@ -261,13 +309,14 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
this.persistTimer = setTimeout(() => {
this.persistTimer = null;
void this.flush().catch((err: unknown) => {
LogService.warn("MatrixFileSyncStore", "Failed to persist Matrix sync store:", err);
LogService.warn("MatrixSyncCacheStore", "Failed to persist Matrix sync store:", err);
});
}, PERSIST_DEBOUNCE_MS);
this.persistTimer.unref?.();
}
private async persist(): Promise<void> {
this.assertStoreAvailable();
this.dirty = false;
const payload: PersistedMatrixSyncStore = {
version: STORE_VERSION,
@@ -277,9 +326,9 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
};
try {
await this.persistLock(async () => {
await writeJsonFileAtomically(this.storagePath, payload);
this.writePersistedStore(payload);
claimCurrentTokenStorageState({
rootDir: path.dirname(this.storagePath),
rootDir: this.storageRootDir,
});
});
} catch (err) {
@@ -287,4 +336,273 @@ export class FileBackedMatrixSyncStore extends MemoryStore {
throw err;
}
}
private writePersistedStore(payload: PersistedMatrixSyncStore): void {
const rows = buildSyncCacheRows(this.stateKey, payload);
for (const row of rows.chunks) {
this.store.register(row.key, row.value);
}
this.store.register(rows.meta.key, rows.meta.value);
for (const row of this.store.entries()) {
if (row.key.startsWith(chunkKeyPrefix(this.stateKey)) && !rows.nextChunkKeys.has(row.key)) {
this.store.delete(row.key);
}
}
}
private assertStoreAvailable(): void {
if (this.storeUnavailableError == null) {
return;
}
throw new Error("Matrix sync cache SQLite store is unavailable; cannot persist sync state", {
cause: this.storeUnavailableError,
});
}
}
function createNoopMatrixSyncCacheStore(): PluginStateSyncKeyedStore<MatrixSyncCacheRecord> {
return {
register: () => {},
registerIfAbsent: () => false,
lookup: () => undefined,
consume: () => undefined,
delete: () => false,
entries: () => [],
clear: () => {},
};
}
function readPersistedStoreFromSyncStore(
store: PluginStateSyncKeyedStore<MatrixSyncCacheRecord>,
stateKey: string,
): PersistedMatrixSyncStore | null {
const meta = store.lookup(metaKey(stateKey));
if (!isSyncCacheMeta(meta)) {
return null;
}
const chunks: string[] = [];
for (let index = 0; index < meta.chunkCount; index += 1) {
const chunk = store.lookup(chunkKey(stateKey, meta.generation, index));
if (!isSyncCacheChunk(chunk) || chunk.index !== index) {
return normalizePersistedStore({
version: STORE_VERSION,
savedSync: null,
clientOptions: meta.clientOptions,
cleanShutdown: false,
});
}
chunks.push(chunk.data);
}
let savedSync: ISyncData | null = null;
if (chunks.length > 0) {
const syncJson = chunks.join("");
if (meta.syncDigest !== digestText(syncJson)) {
return normalizePersistedStore({
version: STORE_VERSION,
savedSync: null,
clientOptions: meta.clientOptions,
cleanShutdown: false,
});
}
try {
savedSync = toPersistedSyncData(JSON.parse(syncJson));
} catch {
savedSync = null;
}
}
return normalizePersistedStore({
version: STORE_VERSION,
savedSync,
clientOptions: meta.clientOptions,
cleanShutdown: meta.cleanShutdown,
});
}
function openMatrixSyncCacheStore(
storageRootDir: string,
): PluginStateSyncKeyedStore<MatrixSyncCacheRecord> {
return getMatrixRuntime().state.openSyncKeyedStore<MatrixSyncCacheRecord>(
openMatrixSyncCacheStoreOptions(storageRootDir),
);
}
function resolveSyncCacheStateKey(_storageRootDir: string): string {
return SYNC_CACHE_STATE_KEY;
}
function metaKey(stateKey: string): string {
return `${stateKey}:meta`;
}
function chunkKeyPrefix(stateKey: string): string {
return `${stateKey}:sync:`;
}
function chunkKey(stateKey: string, generation: string, index: number): string {
return `${chunkKeyPrefix(stateKey)}${generation}:${index}`;
}
function resolveLegacySyncCachePath(storageRootDir: string): string {
return path.join(storageRootDir, "bot-storage.json");
}
function digestText(value: string): string {
return createHash("sha256").update(value, "utf8").digest("hex");
}
function isSyncCacheMeta(value: unknown): value is MatrixSyncCacheMeta {
return (
isRecord(value) &&
value.kind === "meta" &&
value.version === STORE_VERSION &&
typeof value.generation === "string" &&
value.generation.trim() !== "" &&
typeof value.chunkCount === "number" &&
Number.isSafeInteger(value.chunkCount) &&
value.chunkCount >= 0 &&
value.chunkCount <= SYNC_CACHE_MAX_CHUNKS
);
}
function isSyncCacheChunk(value: unknown): value is MatrixSyncCacheChunk {
return (
isRecord(value) &&
value.kind === "sync-chunk" &&
typeof value.index === "number" &&
Number.isSafeInteger(value.index) &&
value.index >= 0 &&
typeof value.data === "string"
);
}
function chunkSyncCacheJson(value: string): string[] {
const chunks: string[] = [];
const pushChunk = (chunk: string) => {
if (chunks.length >= SYNC_CACHE_MAX_CHUNKS) {
throw new Error("Matrix sync cache exceeds SQLite chunk limit");
}
chunks.push(chunk);
};
let current = "";
let currentBytes = 0;
for (const char of value) {
const charBytes = Buffer.byteLength(char, "utf8");
if (current && currentBytes + charBytes > SYNC_CACHE_CHUNK_BYTES) {
pushChunk(current);
current = "";
currentBytes = 0;
}
current += char;
currentBytes += charBytes;
}
if (current) {
pushChunk(current);
}
return chunks;
}
function buildSyncCacheRows(
stateKey: string,
payload: PersistedMatrixSyncStore,
): {
meta: { key: string; value: MatrixSyncCacheMeta };
chunks: { key: string; value: MatrixSyncCacheChunk }[];
nextChunkKeys: Set<string>;
} {
const generation = randomUUID().replaceAll("-", "");
const syncJson = payload.savedSync ? JSON.stringify(payload.savedSync) : "";
const chunkValues = syncJson ? chunkSyncCacheJson(syncJson) : [];
const chunks = chunkValues.map((data, index) => ({
key: chunkKey(stateKey, generation, index),
value: {
kind: "sync-chunk" as const,
index,
data,
},
}));
return {
chunks,
nextChunkKeys: new Set(chunks.map((chunk) => chunk.key)),
meta: {
key: metaKey(stateKey),
value: {
kind: "meta",
version: STORE_VERSION,
generation,
chunkCount: chunks.length,
...(syncJson ? { syncDigest: digestText(syncJson) } : {}),
...(payload.clientOptions ? { clientOptions: payload.clientOptions } : {}),
cleanShutdown: payload.cleanShutdown === true,
},
},
};
}
export async function readLegacyMatrixSyncCacheState(
storageRootDir: string,
): Promise<PersistedMatrixSyncStore | null> {
try {
const raw = await fs.readFile(resolveLegacySyncCachePath(storageRootDir), "utf8");
const persisted = normalizeLegacyPersistedStore(JSON.parse(raw));
if (!persisted?.savedSync && !persisted?.clientOptions) {
return null;
}
return persisted;
} catch {
return null;
}
}
export async function hasMatrixSyncCacheStateInStore(params: {
storageRootDir: string;
store: Pick<PluginStateKeyedStore<MatrixSyncCacheRecord>, "lookup">;
}): Promise<boolean> {
const stateKey = resolveSyncCacheStateKey(params.storageRootDir);
const meta = await params.store.lookup(metaKey(stateKey));
if (!isSyncCacheMeta(meta) || meta.chunkCount <= 0) {
return false;
}
const chunks: string[] = [];
for (let index = 0; index < meta.chunkCount; index += 1) {
const chunk = await params.store.lookup(chunkKey(stateKey, meta.generation, index));
if (!isSyncCacheChunk(chunk) || chunk.index !== index) {
return false;
}
chunks.push(chunk.data);
}
const syncJson = chunks.join("");
if (meta.syncDigest !== digestText(syncJson)) {
return false;
}
try {
return toPersistedSyncData(JSON.parse(syncJson)) !== null;
} catch {
return false;
}
}
export async function writeMatrixSyncCacheStateToStore(params: {
storageRootDir: string;
payload: PersistedMatrixSyncStore;
store: MatrixSyncCacheAsyncStore;
}): Promise<void> {
const stateKey = resolveSyncCacheStateKey(params.storageRootDir);
const rows = buildSyncCacheRows(stateKey, params.payload);
for (const row of rows.chunks) {
await params.store.register(row.key, row.value);
}
await params.store.register(rows.meta.key, rows.meta.value);
for (const row of await params.store.entries()) {
if (row.key.startsWith(chunkKeyPrefix(stateKey)) && !rows.nextChunkKeys.has(row.key)) {
await params.store.delete(row.key);
}
}
}
export function openMatrixSyncCacheStoreOptions(storageRootDir: string) {
return {
namespace: SYNC_CACHE_NAMESPACE,
maxEntries: SYNC_CACHE_MAX_ENTRIES,
env: resolveMatrixSqliteStateEnv({ stateDir: storageRootDir }),
};
}

View File

@@ -5,6 +5,7 @@ import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { resolveMatrixAccountStorageRoot } from "../../storage-paths.js";
import { installMatrixTestRuntime } from "../../test-runtime.js";
import { SqliteBackedMatrixSyncStore } from "./file-sync-store.js";
import {
claimCurrentTokenStorageState,
maybeMigrateLegacyStorage,
@@ -257,6 +258,23 @@ describe("matrix client storage paths", () => {
return legacyRoot;
}
function legacySyncCacheBody(nextBatch = "legacy-token"): string {
return JSON.stringify({
version: 1,
savedSync: {
nextBatch,
accountData: [],
roomsData: {
join: {},
invite: {},
leave: {},
knock: {},
},
},
cleanShutdown: true,
});
}
function writeJson(rootDir: string, filename: string, value: Record<string, unknown>) {
fs.writeFileSync(path.join(rootDir, filename), JSON.stringify(value, null, 2));
}
@@ -385,7 +403,9 @@ describe("matrix client storage paths", () => {
it("falls back to migrating the older flat matrix storage layout", async () => {
const stateDir = setupStateDir();
const storagePaths = resolveDefaultStoragePaths();
const legacyRoot = writeLegacyMatrixStorage(stateDir, { storageBody: '{"legacy":true}' });
const legacyRoot = writeLegacyMatrixStorage(stateDir, {
storageBody: legacySyncCacheBody("legacy-token"),
});
const env = createMigrationEnv(stateDir);
await maybeMigrateLegacyStorage({
@@ -395,8 +415,48 @@ describe("matrix client storage paths", () => {
expectFallbackMigrationSnapshot(env);
expect(fs.existsSync(path.join(legacyRoot, "bot-storage.json"))).toBe(false);
expect(fs.readFileSync(storagePaths.storagePath, "utf8")).toBe('{"legacy":true}');
expect(fs.existsSync(path.join(legacyRoot, "bot-storage.json.migrated"))).toBe(true);
expect(fs.existsSync(storagePaths.storagePath)).toBe(false);
expect(fs.existsSync(storagePaths.cryptoPath)).toBe(true);
const syncStore = new SqliteBackedMatrixSyncStore(storagePaths.rootDir);
expect(syncStore.hasSavedSync()).toBe(true);
await expect(syncStore.getSavedSyncToken()).resolves.toBe("legacy-token");
});
it("migrates the previous account-scoped sync cache into sqlite before startup", async () => {
const stateDir = setupStateDir();
const storagePaths = resolveDefaultStoragePaths();
fs.mkdirSync(storagePaths.rootDir, { recursive: true });
fs.writeFileSync(storagePaths.storagePath, legacySyncCacheBody("account-token"));
const env = createMigrationEnv(stateDir);
await maybeMigrateLegacyStorage({
storagePaths,
env,
});
expectFallbackMigrationSnapshot(env);
expect(fs.existsSync(storagePaths.storagePath)).toBe(false);
expect(fs.existsSync(`${storagePaths.storagePath}.migrated`)).toBe(true);
const syncStore = new SqliteBackedMatrixSyncStore(storagePaths.rootDir);
expect(syncStore.hasSavedSync()).toBe(true);
await expect(syncStore.getSavedSyncToken()).resolves.toBe("account-token");
});
it("ignores unrecognized account-scoped sync cache files without a migration snapshot", async () => {
const stateDir = setupStateDir();
const storagePaths = resolveDefaultStoragePaths();
fs.mkdirSync(storagePaths.rootDir, { recursive: true });
fs.writeFileSync(storagePaths.storagePath, '{"new":true}');
const env = createMigrationEnv(stateDir);
await maybeMigrateLegacyStorage({
storagePaths,
env,
});
expect(maybeCreateMatrixMigrationSnapshotMock).not.toHaveBeenCalled();
expect(fs.readFileSync(storagePaths.storagePath, "utf8")).toBe('{"new":true}');
});
it("continues migrating whichever legacy artifact is still missing", async () => {
@@ -414,6 +474,7 @@ describe("matrix client storage paths", () => {
expectFallbackMigrationSnapshot(env);
expect(fs.readFileSync(storagePaths.storagePath, "utf8")).toBe('{"new":true}');
expect(fs.existsSync(path.join(legacyRoot, "bot-storage.json"))).toBe(false);
expect(fs.existsSync(path.join(legacyRoot, "crypto"))).toBe(false);
expect(fs.existsSync(storagePaths.cryptoPath)).toBe(true);
});
@@ -565,6 +626,41 @@ describe("matrix client storage paths", () => {
expect(rotatedStoragePaths.storagePath).toBe(oldStoragePaths.storagePath);
});
it("prefers claimed current-token state over an empty new-token metadata root", () => {
const stateDir = setupStateDir();
const oldStoragePaths = seedCanonicalStorageRoot({
stateDir,
accessToken: "secret-token-old",
storageMeta: {
homeserver: defaultStorageAuth.homeserver,
userId: defaultStorageAuth.userId,
accountId: "default",
accessTokenHash: resolveDefaultStoragePaths({ accessToken: "secret-token-old" }).tokenHash,
currentTokenStateClaimed: true,
deviceId: "DEVICE123",
},
});
seedCanonicalStorageRoot({
stateDir,
accessToken: "secret-token-new",
storageMeta: {
homeserver: defaultStorageAuth.homeserver,
userId: defaultStorageAuth.userId,
accountId: "default",
accessTokenHash: resolveDefaultStoragePaths({ accessToken: "secret-token-new" }).tokenHash,
deviceId: "DEVICE123",
},
});
const rotatedStoragePaths = resolveDefaultStoragePaths({
accessToken: "secret-token-new",
deviceId: "DEVICE123",
});
expect(rotatedStoragePaths.rootDir).toBe(oldStoragePaths.rootDir);
expect(rotatedStoragePaths.tokenHash).toBe(oldStoragePaths.tokenHash);
});
it("does not reuse a populated older token-hash root while deviceId is unknown", () => {
const stateDir = setupStateDir();
const oldStoragePaths = seedExistingStorageRoot({

View File

@@ -30,6 +30,11 @@ type LegacyMoveRecord = {
label: string;
};
type LegacyArchiveRecord = {
sourcePath: string;
label: string;
};
type StoredRootMetadata = {
homeserver?: string;
userId?: string;
@@ -41,12 +46,12 @@ type StoredRootMetadata = {
};
function resolveLegacyStoragePaths(env: NodeJS.ProcessEnv = process.env): {
rootDir: string;
storagePath: string;
cryptoPath: string;
} {
const stateDir = getMatrixRuntime().state.resolveStateDir(env, os.homedir);
const legacy = resolveMatrixLegacyFlatStoragePaths(stateDir);
return { storagePath: legacy.storagePath, cryptoPath: legacy.cryptoPath };
return resolveMatrixLegacyFlatStoragePaths(stateDir);
}
function assertLegacyMigrationAccountSelection(params: { accountKey: string }): void {
@@ -71,7 +76,7 @@ function assertLegacyMigrationAccountSelection(params: { accountKey: string }):
function scoreStorageRoot(rootDir: string): number {
let score = 0;
if (fs.existsSync(path.join(rootDir, "bot-storage.json"))) {
if (readStoredRootMetadata(rootDir).currentTokenStateClaimed === true) {
score += 8;
}
if (fs.existsSync(path.join(rootDir, "crypto"))) {
@@ -336,23 +341,33 @@ export async function maybeMigrateLegacyStorage(params: {
env?: NodeJS.ProcessEnv;
}): Promise<void> {
const legacy = resolveLegacyStoragePaths(params.env);
const hasLegacyStorage = fs.existsSync(legacy.storagePath);
const hasFlatLegacyStorageFile = fs.existsSync(legacy.storagePath);
const hasAccountScopedLegacyStorageFile = fs.existsSync(params.storagePaths.storagePath);
const syncCache =
hasFlatLegacyStorageFile || hasAccountScopedLegacyStorageFile
? await import("./file-sync-store.js")
: null;
const hasFlatLegacyStorage =
hasFlatLegacyStorageFile &&
(await syncCache?.readLegacyMatrixSyncCacheState(legacy.rootDir)) !== null;
const hasAccountScopedLegacyStorage =
hasAccountScopedLegacyStorageFile &&
(await syncCache?.readLegacyMatrixSyncCacheState(params.storagePaths.rootDir)) !== null;
const hasLegacyCrypto = fs.existsSync(legacy.cryptoPath);
if (!hasLegacyStorage && !hasLegacyCrypto) {
if (!hasFlatLegacyStorage && !hasAccountScopedLegacyStorage && !hasLegacyCrypto) {
return;
}
const hasTargetStorage = fs.existsSync(params.storagePaths.storagePath);
const hasTargetCrypto = fs.existsSync(params.storagePaths.cryptoPath);
// Continue partial migrations one artifact at a time; only skip items whose targets already exist.
const shouldMigrateStorage = hasLegacyStorage && !hasTargetStorage;
const shouldMigrateCrypto = hasLegacyCrypto && !hasTargetCrypto;
if (!shouldMigrateStorage && !shouldMigrateCrypto) {
if (!hasFlatLegacyStorage && !hasAccountScopedLegacyStorage && !shouldMigrateCrypto) {
return;
}
assertLegacyMigrationAccountSelection({
accountKey: params.storagePaths.accountKey,
});
if (hasFlatLegacyStorage || hasLegacyCrypto) {
assertLegacyMigrationAccountSelection({
accountKey: params.storagePaths.accountKey,
});
}
const logger = getMatrixRuntime().logging.getChildLogger({ module: "matrix-storage" });
const { maybeCreateMatrixMigrationSnapshot } = await import("./migration-snapshot.runtime.js");
@@ -363,19 +378,28 @@ export async function maybeMigrateLegacyStorage(params: {
});
fs.mkdirSync(params.storagePaths.rootDir, { recursive: true });
const moved: LegacyMoveRecord[] = [];
const pendingArchives: LegacyArchiveRecord[] = [];
const skippedExistingTargets: string[] = [];
try {
if (shouldMigrateStorage) {
moveLegacyStoragePathOrThrow({
sourcePath: legacy.storagePath,
targetPath: params.storagePaths.storagePath,
label: "sync store",
if (hasAccountScopedLegacyStorage) {
await migrateLegacySyncCacheToSqlite({
sourceRootDir: params.storagePaths.rootDir,
sourcePath: params.storagePaths.storagePath,
targetRootDir: params.storagePaths.rootDir,
label: "account sync cache",
moved,
pendingArchives,
});
}
if (hasFlatLegacyStorage) {
await migrateLegacySyncCacheToSqlite({
sourceRootDir: legacy.rootDir,
sourcePath: legacy.storagePath,
targetRootDir: params.storagePaths.rootDir,
label: "flat sync cache",
moved,
pendingArchives,
});
} else if (hasLegacyStorage) {
skippedExistingTargets.push(
`- sync store remains at ${legacy.storagePath} because ${params.storagePaths.storagePath} already exists`,
);
}
if (shouldMigrateCrypto) {
moveLegacyStoragePathOrThrow({
@@ -398,6 +422,12 @@ export async function maybeMigrateLegacyStorage(params: {
{ cause: err },
);
}
for (const archive of pendingArchives) {
archiveLegacyStoragePath({
...archive,
skippedExistingTargets,
});
}
if (moved.length > 0) {
logger.info(
`matrix: migrated legacy client storage into ${params.storagePaths.rootDir}\n${moved
@@ -412,6 +442,63 @@ export async function maybeMigrateLegacyStorage(params: {
}
}
async function migrateLegacySyncCacheToSqlite(params: {
sourceRootDir: string;
sourcePath: string;
targetRootDir: string;
label: string;
moved: LegacyMoveRecord[];
pendingArchives: LegacyArchiveRecord[];
}): Promise<void> {
const syncCache = await import("./file-sync-store.js");
const persisted = await syncCache.readLegacyMatrixSyncCacheState(params.sourceRootDir);
if (!persisted) {
return;
}
const store = getMatrixRuntime().state.openKeyedStore<
import("./file-sync-store.js").MatrixSyncCacheRecord
>(syncCache.openMatrixSyncCacheStoreOptions(params.targetRootDir));
if (
!(await syncCache.hasMatrixSyncCacheStateInStore({
storageRootDir: params.targetRootDir,
store,
}))
) {
await syncCache.writeMatrixSyncCacheStateToStore({
storageRootDir: params.targetRootDir,
payload: persisted,
store,
});
claimCurrentTokenStorageState({
rootDir: params.targetRootDir,
});
params.moved.push({
sourcePath: params.sourcePath,
targetPath: `${params.targetRootDir} SQLite sync cache`,
label: params.label,
});
}
params.pendingArchives.push({
sourcePath: params.sourcePath,
label: params.label,
});
}
function archiveLegacyStoragePath(params: {
sourcePath: string;
label: string;
skippedExistingTargets: string[];
}): void {
const archivedLegacyStoragePath = `${params.sourcePath}.migrated`;
if (fs.existsSync(archivedLegacyStoragePath)) {
params.skippedExistingTargets.push(
`- ${params.label} remains at ${params.sourcePath} because ${archivedLegacyStoragePath} already exists`,
);
return;
}
fs.renameSync(params.sourcePath, archivedLegacyStoragePath);
}
function moveLegacyStoragePathOrThrow(params: {
sourcePath: string;
targetPath: string;

View File

@@ -4,7 +4,9 @@ import { EventEmitter } from "node:events";
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { resetPluginStateStoreForTests } from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { installMatrixTestRuntime } from "../test-runtime.js";
function requestUrl(input: RequestInfo | URL | undefined): string {
if (!input) {
@@ -301,6 +303,8 @@ const { MatrixClient } = await import("./sdk.js");
describe("MatrixClient request hardening", () => {
beforeEach(() => {
resetPluginStateStoreForTests();
installMatrixTestRuntime();
matrixJsClient = createMatrixJsClientStub();
lastCreateClientOpts = null;
vi.useRealTimers();
@@ -312,6 +316,7 @@ describe("MatrixClient request hardening", () => {
vi.useRealTimers();
vi.unstubAllGlobals();
clearTestUndiciRuntimeDepsOverride();
resetPluginStateStoreForTests();
});
it("blocks absolute endpoints unless explicitly allowed", async () => {
@@ -659,11 +664,10 @@ describe("MatrixClient request hardening", () => {
it("wires the sync store into the SDK and flushes it on shutdown", async () => {
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sdk-store-"));
const storagePath = path.join(tempDir, "bot-storage.json");
try {
const client = new MatrixClient("https://matrix.example.org", "token", {
storagePath,
storageRootDir: tempDir,
});
const store = lastCreateClientOpts?.store as { flush: () => Promise<void> } | undefined;

View File

@@ -21,7 +21,7 @@ import {
} from "openclaw/plugin-sdk/string-coerce-runtime";
import type { SsrFPolicy } from "../runtime-api.js";
import { resolveMatrixRoomKeyBackupReadinessError } from "./backup-health.js";
import { FileBackedMatrixSyncStore } from "./client/file-sync-store.js";
import { SqliteBackedMatrixSyncStore } from "./client/file-sync-store.js";
import { createMatrixJsSdkClientLogger } from "./client/logging.js";
import {
formatMatrixErrorMessage,
@@ -329,7 +329,7 @@ export class MatrixClient {
private readonly syncFilter?: IFilterDefinition;
private readonly encryptionEnabled: boolean;
private readonly password?: string;
private readonly syncStore?: FileBackedMatrixSyncStore;
private readonly syncStore?: SqliteBackedMatrixSyncStore;
private readonly idbSnapshotPath?: string;
private readonly cryptoDatabasePrefix?: string;
private bridgeRegistered = false;
@@ -370,7 +370,7 @@ export class MatrixClient {
encryption?: boolean;
initialSyncLimit?: number;
syncFilter?: IFilterDefinition;
storagePath?: string;
storageRootDir?: string;
recoveryKeyPath?: string;
idbSnapshotPath?: string;
cryptoDatabasePrefix?: string;
@@ -390,7 +390,9 @@ export class MatrixClient {
this.syncFilter = opts.syncFilter;
this.encryptionEnabled = opts.encryption === true;
this.password = opts.password;
this.syncStore = opts.storagePath ? new FileBackedMatrixSyncStore(opts.storagePath) : undefined;
this.syncStore = opts.storageRootDir
? new SqliteBackedMatrixSyncStore(opts.storageRootDir)
: undefined;
this.idbSnapshotPath = opts.idbSnapshotPath;
this.cryptoDatabasePrefix = opts.cryptoDatabasePrefix;
this.selfUserId = opts.userId?.trim() || null;

View File

@@ -3,6 +3,11 @@ import {
implicitMentionKindWhen,
resolveInboundMentionDecision,
} from "openclaw/plugin-sdk/channel-mention-gating";
import type { OpenKeyedStoreOptions } from "openclaw/plugin-sdk/plugin-state-runtime";
import {
createPluginStateKeyedStoreForTests,
createPluginStateSyncKeyedStoreForTests,
} from "openclaw/plugin-sdk/plugin-state-test-runtime";
import { vi } from "vitest";
import type { PluginRuntime } from "./runtime-api.js";
import { setMatrixRuntime } from "./runtime.js";
@@ -18,7 +23,10 @@ type MatrixRuntimeStub = {
config: Pick<PluginRuntime["config"], "current" | "mutateConfigFile" | "replaceConfigFile">;
channel?: PluginRuntime["channel"];
logging?: PluginRuntime["logging"];
state: Pick<NonNullable<PluginRuntime["state"]>, "resolveStateDir">;
state: Pick<
NonNullable<PluginRuntime["state"]>,
"openKeyedStore" | "openSyncKeyedStore" | "resolveStateDir"
>;
};
function createMatrixRuntimeMediaMock(
@@ -47,10 +55,17 @@ function createMatrixRuntimeMediaMock(
}
export function installMatrixTestRuntime(options: MatrixTestRuntimeOptions = {}): void {
const osHomedirForTest = () => "/tmp";
const defaultStateDirResolver: NonNullable<PluginRuntime["state"]>["resolveStateDir"] = (
_env,
homeDir,
) => options.stateDir ?? (homeDir ?? (() => "/tmp"))();
const resolvePluginStateEnv = (storeOptions: OpenKeyedStoreOptions): NodeJS.ProcessEnv => ({
...(storeOptions.env ?? process.env),
OPENCLAW_STATE_DIR:
storeOptions.env?.OPENCLAW_STATE_DIR?.trim() ||
defaultStateDirResolver(storeOptions.env, osHomedirForTest),
});
const getRuntimeConfig = () => options.cfg ?? {};
const logging: PluginRuntime["logging"] | undefined = options.logging
? ({
@@ -74,6 +89,16 @@ export function installMatrixTestRuntime(options: MatrixTestRuntimeOptions = {})
...(logging ? { logging } : {}),
state: {
resolveStateDir: defaultStateDirResolver,
openKeyedStore: (<T>(storeOptions: OpenKeyedStoreOptions) =>
createPluginStateKeyedStoreForTests<T>("matrix", {
...storeOptions,
env: resolvePluginStateEnv(storeOptions),
})) as PluginRuntime["state"]["openKeyedStore"],
openSyncKeyedStore: (<T>(storeOptions: OpenKeyedStoreOptions) =>
createPluginStateSyncKeyedStoreForTests<T>("matrix", {
...storeOptions,
env: resolvePluginStateEnv(storeOptions),
})) as PluginRuntime["state"]["openSyncKeyedStore"],
},
};

View File

@@ -33,7 +33,10 @@ import {
isMatrixQaExactMarkerReply,
type MatrixQaScenarioContext,
} from "./scenario-runtime-shared.js";
import { waitForMatrixSyncStoreWithCursor } from "./scenario-runtime-state-files.js";
import {
deleteMatrixSyncStoreCursor,
waitForMatrixSyncStoreWithCursor,
} from "./scenario-runtime-state-files.js";
import type { MatrixQaScenarioExecution } from "./scenario-types.js";
type MatrixQaCliRuntime = Awaited<ReturnType<typeof createMatrixQaOpenClawCliRuntime>>;
@@ -1417,7 +1420,7 @@ export async function runMatrixQaE2eeSyncStateLossCryptoIntactScenario(
});
await context.restartGatewayAfterStateMutation(
async () => {
await rm(syncStore.pathname, { force: true });
await deleteMatrixSyncStoreCursor(syncStore);
},
{
timeoutMs: context.timeoutMs,

View File

@@ -374,6 +374,8 @@ export async function runStaleSyncReplayDedupeScenario(context: MatrixQaScenario
await rewriteMatrixSyncStoreCursor({
cursor: staleCursor,
pathname: syncStore.pathname,
source: syncStore.source,
stateKey: syncStore.stateKey,
});
});

View File

@@ -1,5 +1,5 @@
// Qa Matrix plugin module implements scenario runtime state files behavior.
import { createHash } from "node:crypto";
import { createHash, randomUUID } from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import { setTimeout as sleep } from "node:timers/promises";
@@ -9,8 +9,20 @@ import type { MatrixQaScenarioContext } from "./scenario-runtime-shared.js";
const MATRIX_SYNC_STORE_FILENAME = "bot-storage.json";
const MATRIX_INBOUND_DEDUPE_FILENAME = "inbound-dedupe.json";
const MATRIX_PLUGIN_ID = "matrix";
const MATRIX_SYNC_CACHE_NAMESPACE = "sync-cache";
const MATRIX_INBOUND_DEDUPE_NAMESPACE = "inbound-dedupe";
const MATRIX_STATE_POLL_INTERVAL_MS = 100;
const MATRIX_SYNC_CACHE_MAX_ENTRIES = 20_000;
const MATRIX_SYNC_CACHE_MAX_CHUNKS = Math.floor((MATRIX_SYNC_CACHE_MAX_ENTRIES - 1) / 2);
// PluginState serializes this string inside a row object; 24KB leaves room for JSON escaping.
const MATRIX_SYNC_CACHE_CHUNK_BYTES = 24_000;
type MatrixSyncStoreCursor = {
cursor: string;
pathname: string;
source: "json" | "sqlite";
stateKey?: string;
};
async function readJsonFile(pathname: string): Promise<unknown> {
return JSON.parse(await fs.readFile(pathname, "utf8")) as unknown;
@@ -80,6 +92,12 @@ function writePersistedMatrixSyncCursor(parsed: unknown, cursor: string): unknow
},
};
}
if (typeof parsed.nextBatch === "string") {
return {
...parsed,
nextBatch: cursor,
};
}
if (typeof parsed.next_batch === "string") {
return {
...parsed,
@@ -93,11 +111,290 @@ async function readMatrixSyncStoreCursor(pathname: string): Promise<string | nul
return readPersistedMatrixSyncCursor(await readJsonFile(pathname));
}
export async function rewriteMatrixSyncStoreCursor(params: { cursor: string; pathname: string }) {
function parsePluginStateJson(raw: unknown): unknown {
if (typeof raw !== "string") {
return undefined;
}
try {
return JSON.parse(raw) as unknown;
} catch {
return undefined;
}
}
function readMatrixSyncCacheCursorFromRows(
rows: Array<{ entryKey?: unknown; valueJson?: unknown }>,
): MatrixSyncStoreCursor[] {
const rowsByKey = new Map<string, unknown>();
for (const row of rows) {
if (typeof row.entryKey === "string") {
rowsByKey.set(row.entryKey, parsePluginStateJson(row.valueJson));
}
}
const cursors: MatrixSyncStoreCursor[] = [];
for (const [entryKey, rawMeta] of rowsByKey) {
if (!entryKey.endsWith(":meta") || !isRecord(rawMeta) || rawMeta.kind !== "meta") {
continue;
}
const stateKey = entryKey.slice(0, -":meta".length);
const generation = typeof rawMeta.generation === "string" ? rawMeta.generation : "";
const chunkCount =
typeof rawMeta.chunkCount === "number" &&
Number.isSafeInteger(rawMeta.chunkCount) &&
rawMeta.chunkCount <= MATRIX_SYNC_CACHE_MAX_CHUNKS
? rawMeta.chunkCount
: 0;
const chunks: string[] = [];
for (let index = 0; index < chunkCount; index += 1) {
const chunk = rowsByKey.get(`${stateKey}:sync:${generation}:${index}`);
if (!isRecord(chunk) || typeof chunk.data !== "string") {
chunks.length = 0;
break;
}
chunks.push(chunk.data);
}
if (chunks.length === 0) {
continue;
}
try {
const cursor = readPersistedMatrixSyncCursor({
savedSync: JSON.parse(chunks.join("")) as unknown,
});
if (cursor) {
cursors.push({ cursor, pathname: "", source: "sqlite", stateKey });
}
} catch {
continue;
}
}
return cursors;
}
async function readMatrixSyncCacheCursorsFromSqlite(params: {
accountId?: string;
context: MatrixQaScenarioContext;
stateDir: string;
userId?: string;
}): Promise<MatrixSyncStoreCursor[]> {
const databasePaths = await findFilesByName({
filename: "openclaw.sqlite",
rootDir: params.stateDir,
maxDepth: 10,
});
const cursors: Array<MatrixSyncStoreCursor & { score: number }> = [];
try {
const sqlite = await import("node:sqlite");
for (const databasePath of databasePaths) {
try {
const db = new sqlite.DatabaseSync(databasePath, { readOnly: true });
try {
const rows = db
.prepare(
`SELECT entry_key AS entryKey, value_json AS valueJson
FROM plugin_state_entries
WHERE plugin_id = ?
AND namespace = ?
AND (expires_at IS NULL OR expires_at > ?)`,
)
.all(MATRIX_PLUGIN_ID, MATRIX_SYNC_CACHE_NAMESPACE, Date.now()) as Array<{
entryKey?: unknown;
valueJson?: unknown;
}>;
for (const cursor of readMatrixSyncCacheCursorFromRows(rows)) {
const storageRootDir = path.dirname(path.dirname(databasePath));
cursors.push({
...cursor,
pathname: databasePath,
score: await scoreMatrixStateFile({
context: params.context,
pathname: path.join(storageRootDir, MATRIX_SYNC_STORE_FILENAME),
...(params.accountId ? { accountId: params.accountId } : {}),
...(params.userId ? { userId: params.userId } : {}),
}),
});
}
} finally {
db.close();
}
} catch {
continue;
}
}
} catch {
return [];
}
return cursors
.toSorted((a, b) => b.score - a.score || a.pathname.localeCompare(b.pathname))
.map(({ score: _score, ...cursor }) => cursor);
}
function chunkMatrixSyncCacheJson(value: string): string[] {
const chunks: string[] = [];
let current = "";
let currentBytes = 0;
for (const char of value) {
const charBytes = Buffer.byteLength(char, "utf8");
if (current && currentBytes + charBytes > MATRIX_SYNC_CACHE_CHUNK_BYTES) {
chunks.push(current);
current = "";
currentBytes = 0;
}
current += char;
currentBytes += charBytes;
}
if (current) {
chunks.push(current);
}
return chunks;
}
function digestText(value: string): string {
return createHash("sha256").update(value, "utf8").digest("hex");
}
async function rewriteMatrixSyncCacheRows(params: {
cursor: string;
pathname: string;
stateKey: string;
}) {
const sqlite = await import("node:sqlite");
const db = new sqlite.DatabaseSync(params.pathname);
try {
const rows = db
.prepare(
`SELECT entry_key AS entryKey, value_json AS valueJson
FROM plugin_state_entries
WHERE plugin_id = ?
AND namespace = ?
AND entry_key LIKE ?`,
)
.all(MATRIX_PLUGIN_ID, MATRIX_SYNC_CACHE_NAMESPACE, `${params.stateKey}:%`) as Array<{
entryKey?: unknown;
valueJson?: unknown;
}>;
const meta = parsePluginStateJson(
rows.find((row) => row.entryKey === `${params.stateKey}:meta`)?.valueJson,
);
if (!isRecord(meta)) {
throw new Error("Matrix sync cache metadata row was missing");
}
const cursorEntry = readMatrixSyncCacheCursorFromRows(rows)[0];
if (!cursorEntry) {
throw new Error("Matrix sync cache did not contain a persisted sync cursor");
}
const generation = typeof meta.generation === "string" ? meta.generation : "";
const chunkCount =
typeof meta.chunkCount === "number" &&
Number.isSafeInteger(meta.chunkCount) &&
meta.chunkCount <= MATRIX_SYNC_CACHE_MAX_CHUNKS
? meta.chunkCount
: 0;
const chunks: string[] = [];
for (let index = 0; index < chunkCount; index += 1) {
const chunk = parsePluginStateJson(
rows.find((row) => row.entryKey === `${params.stateKey}:sync:${generation}:${index}`)
?.valueJson,
);
if (!isRecord(chunk) || typeof chunk.data !== "string") {
throw new Error("Matrix sync cache chunk row was missing");
}
chunks.push(chunk.data);
}
const syncJson = JSON.stringify(
writePersistedMatrixSyncCursor(JSON.parse(chunks.join("")), params.cursor),
);
const nextGeneration = randomUUID().replaceAll("-", "");
const nextChunks = chunkMatrixSyncCacheJson(syncJson);
const now = Date.now();
const upsert = db.prepare(
`INSERT INTO plugin_state_entries (plugin_id, namespace, entry_key, value_json, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, NULL)
ON CONFLICT(plugin_id, namespace, entry_key)
DO UPDATE SET value_json = excluded.value_json, created_at = excluded.created_at, expires_at = NULL`,
);
for (const [index, data] of nextChunks.entries()) {
upsert.run(
MATRIX_PLUGIN_ID,
MATRIX_SYNC_CACHE_NAMESPACE,
`${params.stateKey}:sync:${nextGeneration}:${index}`,
JSON.stringify({ kind: "sync-chunk", index, data }),
now,
);
}
upsert.run(
MATRIX_PLUGIN_ID,
MATRIX_SYNC_CACHE_NAMESPACE,
`${params.stateKey}:meta`,
JSON.stringify({
...meta,
generation: nextGeneration,
chunkCount: nextChunks.length,
syncDigest: digestText(syncJson),
}),
now,
);
db.prepare(
`DELETE FROM plugin_state_entries
WHERE plugin_id = ?
AND namespace = ?
AND entry_key LIKE ?
AND entry_key NOT LIKE ?`,
).run(
MATRIX_PLUGIN_ID,
MATRIX_SYNC_CACHE_NAMESPACE,
`${params.stateKey}:sync:%`,
`${params.stateKey}:sync:${nextGeneration}:%`,
);
} finally {
db.close();
}
}
export async function rewriteMatrixSyncStoreCursor(params: {
cursor: string;
pathname: string;
source?: "json" | "sqlite";
stateKey?: string;
}) {
if (params.source === "sqlite" || params.stateKey) {
if (!params.stateKey) {
throw new Error("Matrix sync cache rewrite requires a state key");
}
await rewriteMatrixSyncCacheRows({
cursor: params.cursor,
pathname: params.pathname,
stateKey: params.stateKey,
});
return;
}
const parsed = await readJsonFile(params.pathname);
await writeJsonFile(params.pathname, writePersistedMatrixSyncCursor(parsed, params.cursor));
}
export async function deleteMatrixSyncStoreCursor(params: MatrixSyncStoreCursor) {
if (params.source !== "sqlite" || !params.stateKey) {
await fs.rm(params.pathname, { force: true });
return;
}
const sqlite = await import("node:sqlite");
const db = new sqlite.DatabaseSync(params.pathname);
try {
db.prepare(
`DELETE FROM plugin_state_entries
WHERE plugin_id = ?
AND namespace = ?
AND (entry_key = ? OR entry_key LIKE ?)`,
).run(
MATRIX_PLUGIN_ID,
MATRIX_SYNC_CACHE_NAMESPACE,
`${params.stateKey}:meta`,
`${params.stateKey}:sync:%`,
);
} finally {
db.close();
}
}
async function scoreMatrixStateFile(params: {
accountId?: string;
context: MatrixQaScenarioContext;
@@ -162,6 +459,15 @@ export async function waitForMatrixSyncStoreWithCursor(params: {
const startedAt = Date.now();
let lastPath: string | null = null;
while (Date.now() - startedAt < params.timeoutMs) {
const sqliteCursors = await readMatrixSyncCacheCursorsFromSqlite({
context: params.context,
stateDir: params.stateDir,
...(params.accountId ? { accountId: params.accountId } : {}),
...(params.userId ? { userId: params.userId } : {}),
});
if (sqliteCursors.length > 0) {
return sqliteCursors[0];
}
const pathname = await resolveBestMatrixStateFile({
context: params.context,
filename: MATRIX_SYNC_STORE_FILENAME,
@@ -173,7 +479,7 @@ export async function waitForMatrixSyncStoreWithCursor(params: {
if (pathname) {
const cursor = await readMatrixSyncStoreCursor(pathname);
if (cursor) {
return { cursor, pathname };
return { cursor, pathname, source: "json" as const };
}
}
await sleep(MATRIX_STATE_POLL_INTERVAL_MS);

View File

@@ -17,6 +17,12 @@ import type {
MatrixVerificationSummary,
MessageEventContent,
} from "@openclaw/matrix/test-api.js";
import type {
OpenKeyedStoreOptions,
PluginStateEntry,
PluginStateKeyedStore,
PluginStateSyncKeyedStore,
} from "openclaw/plugin-sdk/plugin-state-runtime";
import { buildMatrixQaMessageContent } from "./client.js";
import { findMatrixQaObservedEventMatch, normalizeMatrixQaObservedEvent } from "./events.js";
import type { MatrixQaObservedEvent } from "./events.js";
@@ -44,6 +50,144 @@ const MATRIX_QA_E2EE_SYNC_FILTER = {
},
};
type MatrixQaPluginStateValue = {
createdAt: number;
expiresAt?: number;
value: unknown;
};
const matrixQaPluginStateNamespaces = new Map<string, Map<string, MatrixQaPluginStateValue>>();
function resolveMatrixQaPluginStateNamespaceKey(options: OpenKeyedStoreOptions): string {
return `${options.env?.OPENCLAW_STATE_DIR ?? ""}\0${options.namespace}`;
}
function resolveMatrixQaPluginStateRows(
options: OpenKeyedStoreOptions,
): Map<string, MatrixQaPluginStateValue> {
const namespaceKey = resolveMatrixQaPluginStateNamespaceKey(options);
let rows = matrixQaPluginStateNamespaces.get(namespaceKey);
if (!rows) {
rows = new Map();
matrixQaPluginStateNamespaces.set(namespaceKey, rows);
}
return rows;
}
function pruneMatrixQaExpiredPluginState(rows: Map<string, MatrixQaPluginStateValue>): void {
const now = Date.now();
for (const [key, row] of rows) {
if (row.expiresAt !== undefined && row.expiresAt <= now) {
rows.delete(key);
}
}
}
function enforceMatrixQaPluginStateLimit(
rows: Map<string, MatrixQaPluginStateValue>,
maxEntries: number,
nextKey: string,
): void {
if (rows.has(nextKey)) {
return;
}
while (rows.size >= maxEntries) {
const oldest = [...rows.entries()].toSorted(
(a, b) => a[1].createdAt - b[1].createdAt || a[0].localeCompare(b[0]),
)[0]?.[0];
if (!oldest) {
return;
}
rows.delete(oldest);
}
}
function createMatrixQaPluginStateSyncKeyedStore<T>(
options: OpenKeyedStoreOptions,
): PluginStateSyncKeyedStore<T> {
const rows = resolveMatrixQaPluginStateRows(options);
const resolveExpiresAt = (ttlMs?: number) => {
const effectiveTtlMs = ttlMs ?? options.defaultTtlMs;
return effectiveTtlMs === undefined ? undefined : Date.now() + effectiveTtlMs;
};
const register = (key: string, value: T, opts?: { ttlMs?: number }) => {
pruneMatrixQaExpiredPluginState(rows);
enforceMatrixQaPluginStateLimit(rows, options.maxEntries, key);
rows.set(key, {
createdAt: rows.get(key)?.createdAt ?? Date.now(),
expiresAt: resolveExpiresAt(opts?.ttlMs),
value,
});
};
return {
register,
registerIfAbsent(key, value, opts) {
pruneMatrixQaExpiredPluginState(rows);
if (rows.has(key)) {
return false;
}
register(key, value, opts);
return true;
},
update(key, updateValue, opts) {
pruneMatrixQaExpiredPluginState(rows);
const next = updateValue(rows.get(key)?.value as T | undefined);
if (next === undefined) {
return false;
}
register(key, next, opts);
return true;
},
lookup(key) {
pruneMatrixQaExpiredPluginState(rows);
return rows.get(key)?.value as T | undefined;
},
consume(key) {
pruneMatrixQaExpiredPluginState(rows);
const value = rows.get(key)?.value as T | undefined;
rows.delete(key);
return value;
},
delete(key) {
pruneMatrixQaExpiredPluginState(rows);
return rows.delete(key);
},
entries() {
pruneMatrixQaExpiredPluginState(rows);
return [...rows.entries()].map(([key, row]): PluginStateEntry<T> => {
const entry: PluginStateEntry<T> = {
key,
value: row.value as T,
createdAt: row.createdAt,
};
if (row.expiresAt !== undefined) {
entry.expiresAt = row.expiresAt;
}
return entry;
});
},
clear() {
rows.clear();
},
};
}
function createMatrixQaPluginStateKeyedStore<T>(
options: OpenKeyedStoreOptions,
): PluginStateKeyedStore<T> {
const syncStore = createMatrixQaPluginStateSyncKeyedStore<T>(options);
return {
register: async (...args) => syncStore.register(...args),
registerIfAbsent: async (...args) => syncStore.registerIfAbsent(...args),
update: async (...args) => syncStore.update?.(...args) ?? false,
lookup: async (...args) => syncStore.lookup(...args),
consume: async (...args) => syncStore.consume(...args),
delete: async (...args) => syncStore.delete(...args),
entries: async () => syncStore.entries(),
clear: async () => syncStore.clear(),
};
}
function shouldRecordMatrixQaObservedEventUpdate(params: {
next: MatrixQaObservedEvent;
previous: MatrixQaObservedEvent | undefined;
@@ -193,6 +337,20 @@ async function createMatrixQaE2eeMatrixClient(params: MatrixQaE2eeClientParams)
outputDir: params.outputDir,
scenarioId: params.scenarioId,
});
runtime.setMatrixRuntime({
config: {
current: () => ({}),
mutateConfigFile: async () => ({}),
replaceConfigFile: async () => ({}),
},
state: {
resolveStateDir: () => params.outputDir,
openKeyedStore: <T>(options: OpenKeyedStoreOptions) =>
createMatrixQaPluginStateKeyedStore<T>(options),
openSyncKeyedStore: <T>(options: OpenKeyedStoreOptions) =>
createMatrixQaPluginStateSyncKeyedStore<T>(options),
},
} as never);
return new runtime.MatrixClient(params.baseUrl, params.accessToken, {
autoBootstrapCrypto: false,
cryptoDatabasePrefix: storage.cryptoDatabasePrefix,
@@ -203,7 +361,7 @@ async function createMatrixQaE2eeMatrixClient(params: MatrixQaE2eeClientParams)
password: params.password,
recoveryKeyPath: storage.recoveryKeyPath,
ssrfPolicy: { allowPrivateNetwork: true },
storagePath: storage.storagePath,
storageRootDir: path.dirname(storage.storagePath),
syncFilter: MATRIX_QA_E2EE_SYNC_FILTER,
userId: params.userId,
});