mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-28 06:39:35 +00:00
fix: route matrix sync blobs through selected sqlite state
This commit is contained in:
@@ -129,6 +129,29 @@ describe("SqliteBackedMatrixSyncStore", () => {
|
||||
expect(secondStore.hasSavedSyncFromCleanShutdown()).toBe(false);
|
||||
});
|
||||
|
||||
it("routes explicit storage roots through their owning state database", async () => {
|
||||
const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-matrix-sync-store-"));
|
||||
tempDirs.push(tempDir);
|
||||
const storageRoot = path.join(
|
||||
tempDir,
|
||||
"actual-state",
|
||||
"matrix",
|
||||
"accounts",
|
||||
"default",
|
||||
"matrix.example.org__bot",
|
||||
"token-hash",
|
||||
);
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", path.join(tempDir, "ambient-state-a"));
|
||||
|
||||
const firstStore = new SqliteBackedMatrixSyncStore(storageRoot);
|
||||
await firstStore.setSyncData(createSyncResponse("owned-state"));
|
||||
await firstStore.flush();
|
||||
|
||||
vi.stubEnv("OPENCLAW_STATE_DIR", path.join(tempDir, "ambient-state-b"));
|
||||
const secondStore = new SqliteBackedMatrixSyncStore(storageRoot);
|
||||
await expect(secondStore.getSavedSyncToken()).resolves.toBe("owned-state");
|
||||
});
|
||||
|
||||
it("claims current-token storage ownership when sync state is persisted", async () => {
|
||||
const storageRoot = createStorageRoot();
|
||||
writeMatrixStorageMetadata(storageRoot, {
|
||||
|
||||
@@ -13,6 +13,7 @@ import { createPluginBlobSyncStore } from "openclaw/plugin-sdk/plugin-state-runt
|
||||
import { isRecord } from "../../record-shared.js";
|
||||
import { createAsyncLock } from "../async-lock.js";
|
||||
import { LogService } from "../sdk/logger.js";
|
||||
import { resolveStateDirFromMatrixStorageRoot } from "./storage-meta-state.js";
|
||||
import { claimCurrentTokenStorageState } from "./storage.js";
|
||||
|
||||
const STORE_VERSION = 1;
|
||||
@@ -34,10 +35,14 @@ type PersistedMatrixSyncStoreMetadata = {
|
||||
payloadBytes: number;
|
||||
};
|
||||
|
||||
const SYNC_STORE = createPluginBlobSyncStore<PersistedMatrixSyncStoreMetadata>("matrix", {
|
||||
namespace: MATRIX_SYNC_STORE_NAMESPACE,
|
||||
maxEntries: 1000,
|
||||
});
|
||||
function createMatrixSyncStore(rootDir: string) {
|
||||
const stateDir = resolveStateDirFromMatrixStorageRoot(rootDir);
|
||||
return createPluginBlobSyncStore<PersistedMatrixSyncStoreMetadata>("matrix", {
|
||||
namespace: MATRIX_SYNC_STORE_NAMESPACE,
|
||||
maxEntries: 1000,
|
||||
...(stateDir ? { env: { ...process.env, OPENCLAW_STATE_DIR: stateDir } } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
function normalizeRoomsData(value: unknown): IRooms | null {
|
||||
if (!isRecord(value)) {
|
||||
@@ -157,14 +162,16 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore {
|
||||
private dirty = false;
|
||||
private persistTimer: NodeJS.Timeout | null = null;
|
||||
private persistPromise: Promise<void> | null = null;
|
||||
private readonly syncStore: ReturnType<typeof createMatrixSyncStore>;
|
||||
|
||||
constructor(private readonly rootDir: string) {
|
||||
super();
|
||||
this.syncStore = createMatrixSyncStore(rootDir);
|
||||
|
||||
let restoredSavedSync: ISyncData | null = null;
|
||||
let restoredClientOptions: IStoredClientOpts | undefined;
|
||||
let restoredCleanShutdown = false;
|
||||
const persisted = SYNC_STORE.lookup(resolveMatrixSyncStoreKey(this.rootDir));
|
||||
const persisted = this.syncStore.lookup(resolveMatrixSyncStoreKey(this.rootDir));
|
||||
const persistedPayload = persisted
|
||||
? parsePersistedMatrixSyncStore(persisted.blob.toString("utf8"))
|
||||
: null;
|
||||
@@ -247,7 +254,7 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore {
|
||||
this.savedSync = null;
|
||||
this.savedClientOptions = undefined;
|
||||
this.cleanShutdown = false;
|
||||
SYNC_STORE.delete(resolveMatrixSyncStoreKey(this.rootDir));
|
||||
this.syncStore.delete(resolveMatrixSyncStoreKey(this.rootDir));
|
||||
}
|
||||
|
||||
markCleanShutdown(): void {
|
||||
@@ -296,7 +303,7 @@ export class SqliteBackedMatrixSyncStore extends MemoryStore {
|
||||
const blob = Buffer.from(JSON.stringify(payload));
|
||||
try {
|
||||
await this.persistLock(async () => {
|
||||
SYNC_STORE.register(
|
||||
this.syncStore.register(
|
||||
resolveMatrixSyncStoreKey(this.rootDir),
|
||||
{
|
||||
version: STORE_VERSION,
|
||||
|
||||
@@ -25,7 +25,7 @@ export function resolveMatrixStorageMetaKey(rootDir: string): string {
|
||||
return createHash("sha256").update(path.resolve(rootDir), "utf8").digest("hex").slice(0, 32);
|
||||
}
|
||||
|
||||
function resolveStateDirFromMatrixStorageRoot(rootDir: string): string | undefined {
|
||||
export function resolveStateDirFromMatrixStorageRoot(rootDir: string): string | undefined {
|
||||
const parts = path.resolve(rootDir).split(path.sep);
|
||||
const matrixIndex = parts.lastIndexOf("matrix");
|
||||
if (matrixIndex <= 0) {
|
||||
|
||||
Reference in New Issue
Block a user