From 09d8a0c4b02e39b4ac41422221944a7f4db874b8 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Mon, 9 Mar 2026 03:48:31 -0400 Subject: [PATCH] Matrix: register thread bindings before client sync --- .../matrix/src/matrix/monitor/index.test.ts | 208 ++++++++++++++++++ extensions/matrix/src/matrix/monitor/index.ts | 39 ++-- 2 files changed, 229 insertions(+), 18 deletions(-) create mode 100644 extensions/matrix/src/matrix/monitor/index.test.ts diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts new file mode 100644 index 00000000000..fa884a92dde --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -0,0 +1,208 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const hoisted = vi.hoisted(() => { + const callOrder: string[] = []; + const client = { id: "matrix-client" }; + const logger = { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }; + const stopThreadBindingManager = vi.fn(); + return { + callOrder, + client, + logger, + stopThreadBindingManager, + }; +}); + +vi.mock("openclaw/plugin-sdk/matrix", () => ({ + GROUP_POLICY_BLOCKED_LABEL: { + room: "room", + }, + mergeAllowlist: ({ existing, additions }: { existing: string[]; additions: string[] }) => [ + ...existing, + ...additions, + ], + resolveThreadBindingIdleTimeoutMsForChannel: () => 24 * 60 * 60 * 1000, + resolveThreadBindingMaxAgeMsForChannel: () => 0, + resolveAllowlistProviderRuntimeGroupPolicy: () => ({ + groupPolicy: "allowlist", + providerMissingFallbackApplied: false, + }), + resolveDefaultGroupPolicy: () => "allowlist", + summarizeMapping: vi.fn(), + warnMissingProviderGroupPolicyFallbackOnce: vi.fn(), +})); + +vi.mock("../../resolve-targets.js", () => ({ + resolveMatrixTargets: vi.fn(async () => []), +})); + +vi.mock("../../runtime.js", () => ({ + getMatrixRuntime: () => ({ + config: { + loadConfig: () => ({ + channels: { + matrix: {}, + }, + }), + writeConfigFile: vi.fn(), + }, + logging: { + getChildLogger: () => hoisted.logger, + shouldLogVerbose: () => false, + }, + channel: { + mentions: { + buildMentionRegexes: () => [], + }, + text: { + resolveTextChunkLimit: () => 4000, + }, + }, + system: { + formatNativeDependencyHint: () => "", + }, + media: { + loadWebMedia: vi.fn(), + }, + }), +})); + +vi.mock("../accounts.js", () => ({ + resolveMatrixAccount: () => ({ + accountId: "default", + config: { + dm: {}, + }, + }), +})); + +vi.mock("../active-client.js", () => ({ + setActiveMatrixClient: vi.fn(), +})); + +vi.mock("../client.js", () => ({ + isBunRuntime: () => false, + resolveMatrixAuth: vi.fn(async () => ({ + accountId: "default", + homeserver: "https://matrix.example.org", + userId: "@bot:example.org", + accessToken: "token", + initialSyncLimit: 20, + encryption: false, + })), + resolveMatrixAuthContext: vi.fn(() => ({ + accountId: "default", + })), + resolveSharedMatrixClient: vi.fn(async (params: { startClient?: boolean }) => { + if (params.startClient === false) { + hoisted.callOrder.push("prepare-client"); + return hoisted.client; + } + if (!hoisted.callOrder.includes("create-manager")) { + throw new Error("Matrix client started before thread bindings were registered"); + } + hoisted.callOrder.push("start-client"); + return hoisted.client; + }), + stopSharedClientForAccount: vi.fn(), +})); + +vi.mock("../config-update.js", () => ({ + updateMatrixAccountConfig: vi.fn((cfg: unknown) => cfg), +})); + +vi.mock("../device-health.js", () => ({ + summarizeMatrixDeviceHealth: vi.fn(() => ({ + staleOpenClawDevices: [], + })), +})); + +vi.mock("../profile.js", () => ({ + syncMatrixOwnProfile: vi.fn(async () => ({ + displayNameUpdated: false, + avatarUpdated: false, + convertedAvatarFromHttp: false, + resolvedAvatarUrl: undefined, + })), +})); + +vi.mock("../thread-bindings.js", () => ({ + createMatrixThreadBindingManager: vi.fn(async () => { + hoisted.callOrder.push("create-manager"); + return { + accountId: "default", + stop: hoisted.stopThreadBindingManager, + }; + }), +})); + +vi.mock("./allowlist.js", () => ({ + normalizeMatrixUserId: (value: string) => value, +})); + +vi.mock("./auto-join.js", () => ({ + registerMatrixAutoJoin: vi.fn(), +})); + +vi.mock("./direct.js", () => ({ + createDirectRoomTracker: vi.fn(() => ({ + isDirectMessage: vi.fn(async () => false), + })), +})); + +vi.mock("./events.js", () => ({ + registerMatrixMonitorEvents: vi.fn(() => { + hoisted.callOrder.push("register-events"); + }), +})); + +vi.mock("./handler.js", () => ({ + createMatrixRoomMessageHandler: vi.fn(() => vi.fn()), +})); + +vi.mock("./legacy-crypto-restore.js", () => ({ + maybeRestoreLegacyMatrixBackup: vi.fn(), +})); + +vi.mock("./room-info.js", () => ({ + createMatrixRoomInfoResolver: vi.fn(() => ({ + getRoomInfo: vi.fn(async () => ({ + altAliases: [], + })), + getMemberDisplayName: vi.fn(async () => "Bot"), + })), +})); + +vi.mock("./startup-verification.js", () => ({ + ensureMatrixStartupVerification: vi.fn(), +})); + +describe("monitorMatrixProvider", () => { + beforeEach(() => { + vi.resetModules(); + hoisted.callOrder.length = 0; + hoisted.stopThreadBindingManager.mockReset(); + Object.values(hoisted.logger).forEach((mock) => mock.mockReset()); + }); + + it("registers Matrix thread bindings before starting the client", async () => { + const { monitorMatrixProvider } = await import("./index.js"); + const abortController = new AbortController(); + abortController.abort(); + + await monitorMatrixProvider({ abortSignal: abortController.signal }); + + expect(hoisted.callOrder).toEqual([ + "prepare-client", + "create-manager", + "register-events", + "start-client", + ]); + expect(hoisted.stopThreadBindingManager).toHaveBeenCalledTimes(1); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index c8d148e28fb..d19b056164f 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -332,24 +332,6 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi getMemberDisplayName, }); - registerMatrixMonitorEvents({ - client, - auth, - logVerboseMessage, - warnedEncryptedRooms, - warnedCryptoMissingRooms, - logger, - formatNativeDependencyHint: core.system.formatNativeDependencyHint, - onRoomMessage: handleRoomMessage, - }); - - logVerboseMessage("matrix: starting client"); - await resolveSharedMatrixClient({ - cfg, - auth: authWithLimit, - accountId: auth.accountId, - }); - logVerboseMessage("matrix: client started"); const threadBindingManager = await createMatrixThreadBindingManager({ accountId: account.accountId, auth, @@ -363,6 +345,27 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi `matrix: thread bindings ready account=${threadBindingManager.accountId} idleMs=${threadBindingIdleTimeoutMs} maxAgeMs=${threadBindingMaxAgeMs}`, ); + registerMatrixMonitorEvents({ + client, + auth, + logVerboseMessage, + warnedEncryptedRooms, + warnedCryptoMissingRooms, + logger, + formatNativeDependencyHint: core.system.formatNativeDependencyHint, + onRoomMessage: handleRoomMessage, + }); + + // Register Matrix thread bindings before the client starts syncing so threaded + // commands during startup never observe Matrix as "unavailable". + logVerboseMessage("matrix: starting client"); + await resolveSharedMatrixClient({ + cfg, + auth: authWithLimit, + accountId: auth.accountId, + }); + logVerboseMessage("matrix: client started"); + // Shared client is already started via resolveSharedMatrixClient. logger.info(`matrix: logged in as ${auth.userId}`);