From 9b19c0b87f3216d0c15fd8cb3ed9c096e9c96d15 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Wed, 8 Apr 2026 15:19:34 -0400 Subject: [PATCH] Matrix: report startup failures as errors --- .../matrix/src/matrix/monitor/index.test.ts | 54 ++++- extensions/matrix/src/matrix/monitor/index.ts | 189 +++++++++--------- 2 files changed, 153 insertions(+), 90 deletions(-) diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index 887f021a2c0..093a5102441 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -54,6 +54,7 @@ const hoisted = vi.hoisted(() => { flush: vi.fn(async () => undefined), stop: vi.fn(async () => undefined), }; + const createMatrixInboundEventDeduper = vi.fn(async () => inboundDeduper); const client = Object.assign(createEmitter(), { id: "matrix-client", hasPersistedSyncState: vi.fn(() => false), @@ -110,6 +111,7 @@ const hoisted = vi.hoisted(() => { accountConfig, client, createDirectRoomTracker, + createMatrixInboundEventDeduper, createMatrixRoomMessageHandler, getMemberDisplayName, getRoomInfo, @@ -356,7 +358,7 @@ vi.mock("./handler.js", () => ({ })); vi.mock("./inbound-dedupe.js", () => ({ - createMatrixInboundEventDeduper: vi.fn(async () => hoisted.inboundDeduper), + createMatrixInboundEventDeduper: hoisted.createMatrixInboundEventDeduper, })); vi.mock("./legacy-crypto-restore.js", () => ({ @@ -438,6 +440,7 @@ describe("monitorMatrixProvider", () => { hoisted.inboundDeduper.releaseEvent.mockReset(); hoisted.inboundDeduper.flush.mockReset().mockResolvedValue(undefined); hoisted.inboundDeduper.stop.mockReset().mockResolvedValue(undefined); + hoisted.createMatrixInboundEventDeduper.mockReset().mockResolvedValue(hoisted.inboundDeduper); hoisted.backfillMatrixAuthDeviceIdAfterStartup.mockReset().mockResolvedValue(undefined); hoisted.runMatrixStartupMaintenance.mockReset().mockResolvedValue(undefined); hoisted.createMatrixRoomMessageHandler.mockReset().mockReturnValue(vi.fn()); @@ -563,6 +566,55 @@ describe("monitorMatrixProvider", () => { ); }); + it("marks early startup failures as error before the monitor loop starts", async () => { + hoisted.resolveSharedMatrixClient.mockImplementation( + async (params: { startClient?: boolean }) => { + if (params.startClient === false) { + throw new Error("prepare failed"); + } + hoisted.callOrder.push("start-client"); + return hoisted.client; + }, + ); + + await expect( + monitorMatrixProvider({ + setStatus: hoisted.setStatus, + }), + ).rejects.toThrow("prepare failed"); + + expect(hoisted.releaseSharedClientInstance).not.toHaveBeenCalled(); + expect(hoisted.setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + accountId: "default", + connected: false, + healthState: "error", + lastError: "prepare failed", + }), + ); + }); + + it("releases the prepared client when startup fails before later resources exist", async () => { + hoisted.createMatrixInboundEventDeduper.mockRejectedValue(new Error("deduper failed")); + + await expect( + monitorMatrixProvider({ + setStatus: hoisted.setStatus, + }), + ).rejects.toThrow("deduper failed"); + + expect(hoisted.releaseSharedClientInstance).toHaveBeenCalledWith(hoisted.client, "persist"); + expect(hoisted.inboundDeduper.stop).not.toHaveBeenCalled(); + expect(hoisted.setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + accountId: "default", + connected: false, + healthState: "error", + lastError: "deduper failed", + }), + ); + }); + it("aborts stalled startup promptly and releases the shared client without persist", async () => { const abortController = new AbortController(); hoisted.resolveSharedMatrixClient.mockImplementation( diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index e15174e10a6..6e1e97ab8a0 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -24,6 +24,7 @@ import { resolveSharedMatrixClient, } from "../client.js"; import { releaseSharedClientInstance } from "../client/shared.js"; +import type { MatrixClient } from "../sdk.js"; import { isMatrixStartupAbortError } from "../startup-abort.js"; import { createMatrixThreadBindingManager } from "../thread-bindings.js"; import { registerMatrixAutoJoin } from "./auto-join.js"; @@ -31,7 +32,10 @@ import { resolveMatrixMonitorConfig } from "./config.js"; import { createDirectRoomTracker } from "./direct.js"; import { registerMatrixMonitorEvents } from "./events.js"; import { createMatrixRoomMessageHandler } from "./handler.js"; -import { createMatrixInboundEventDeduper } from "./inbound-dedupe.js"; +import { + createMatrixInboundEventDeduper, + type MatrixInboundEventDeduper, +} from "./inbound-dedupe.js"; import { shouldPromoteRecentInviteRoom } from "./recent-invite.js"; import { createMatrixRoomInfoResolver } from "./room-info.js"; import { runMatrixStartupMaintenance } from "./startup.js"; @@ -151,44 +155,35 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi baseUrl: auth.homeserver, statusSink: opts.setStatus, }); - const client = await resolveSharedMatrixClient({ - cfg, - auth: authWithLimit, - startClient: false, - accountId: auth.accountId, - }); - setActiveMatrixClient(client, auth.accountId); let cleanedUp = false; + let client: MatrixClient | null = null; let threadBindingManager: { accountId: string; stop: () => void } | null = null; - const inboundDeduper = await createMatrixInboundEventDeduper({ - auth, - env: process.env, - }); + let inboundDeduper: MatrixInboundEventDeduper | null = null; const monitorTaskRunner = createMatrixMonitorTaskRunner({ logger, logVerboseMessage, }); - const syncLifecycle = createMatrixMonitorSyncLifecycle({ - client, - statusController, - isStopping: () => cleanedUp || opts.abortSignal?.aborted === true, - }); + let syncLifecycle: ReturnType | null = null; const cleanup = async (mode: "persist" | "stop" = "persist") => { if (cleanedUp) { return; } cleanedUp = true; try { - client.stopSyncWithoutPersist(); - if (mode === "persist") { + client?.stopSyncWithoutPersist(); + if (client && mode === "persist") { await client.drainPendingDecryptions("matrix monitor shutdown"); + } + if (mode === "persist") { await monitorTaskRunner.waitForIdle(); } threadBindingManager?.stop(); - await inboundDeduper.stop(); - await releaseSharedClientInstance(client, mode); + await inboundDeduper?.stop(); + if (client) { + await releaseSharedClientInstance(client, mode); + } } finally { - syncLifecycle.dispose(); + syncLifecycle?.dispose(); statusController.markStopped(); setActiveMatrixClient(null, auth.accountId); } @@ -243,77 +238,92 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi const blockStreamingEnabled = accountConfig.blockStreaming === true; const startupMs = Date.now(); const startupGraceMs = 0; - // Cold starts should ignore old room history, but once we have a persisted - // /sync cursor we want restart backlogs to replay just like other channels. - const dropPreStartupMessages = !client.hasPersistedSyncState(); - const { getRoomInfo, getMemberDisplayName } = createMatrixRoomInfoResolver(client); - const directTracker = createDirectRoomTracker(client, { - log: logVerboseMessage, - canPromoteRecentInvite: async (roomId) => - shouldPromoteRecentInviteRoom({ - roomId, - roomInfo: await getRoomInfo(roomId, { includeAliases: true }), - rooms: roomsConfig, - }), - shouldKeepLocallyPromotedDirectRoom: async (roomId) => { - try { - const roomInfo = await getRoomInfo(roomId, { includeAliases: true }); - if (!roomInfo.nameResolved || !roomInfo.aliasesResolved) { - return undefined; - } - return shouldPromoteRecentInviteRoom({ - roomId, - roomInfo, - rooms: roomsConfig, - }); - } catch (err) { - logVerboseMessage( - `matrix: local promotion revalidation failed room=${roomId} (${String(err)})`, - ); - return undefined; - } - }, - }); - registerMatrixAutoJoin({ client, accountConfig, runtime }); const warnedEncryptedRooms = new Set(); const warnedCryptoMissingRooms = new Set(); - const handleRoomMessage = createMatrixRoomMessageHandler({ - client, - core, - cfg, - accountId: effectiveAccountId, - runtime, - logger, - logVerboseMessage, - allowFrom, - groupAllowFrom, - roomsConfig, - accountAllowBots, - configuredBotUserIds, - groupPolicy, - replyToMode, - threadReplies, - dmThreadReplies, - dmSessionScope, - streaming, - blockStreamingEnabled, - dmEnabled, - dmPolicy, - textLimit, - mediaMaxBytes, - historyLimit, - startupMs, - startupGraceMs, - dropPreStartupMessages, - inboundDeduper, - directTracker, - getRoomInfo, - getMemberDisplayName, - needsRoomAliasesForConfig, - }); - try { + client = await resolveSharedMatrixClient({ + cfg, + auth: authWithLimit, + startClient: false, + accountId: auth.accountId, + }); + setActiveMatrixClient(client, auth.accountId); + inboundDeduper = await createMatrixInboundEventDeduper({ + auth, + env: process.env, + }); + syncLifecycle = createMatrixMonitorSyncLifecycle({ + client, + statusController, + isStopping: () => cleanedUp || opts.abortSignal?.aborted === true, + }); + // Cold starts should ignore old room history, but once we have a persisted + // /sync cursor we want restart backlogs to replay just like other channels. + const dropPreStartupMessages = !client.hasPersistedSyncState(); + const { getRoomInfo, getMemberDisplayName } = createMatrixRoomInfoResolver(client); + const directTracker = createDirectRoomTracker(client, { + log: logVerboseMessage, + canPromoteRecentInvite: async (roomId) => + shouldPromoteRecentInviteRoom({ + roomId, + roomInfo: await getRoomInfo(roomId, { includeAliases: true }), + rooms: roomsConfig, + }), + shouldKeepLocallyPromotedDirectRoom: async (roomId) => { + try { + const roomInfo = await getRoomInfo(roomId, { includeAliases: true }); + if (!roomInfo.nameResolved || !roomInfo.aliasesResolved) { + return undefined; + } + return shouldPromoteRecentInviteRoom({ + roomId, + roomInfo, + rooms: roomsConfig, + }); + } catch (err) { + logVerboseMessage( + `matrix: local promotion revalidation failed room=${roomId} (${String(err)})`, + ); + return undefined; + } + }, + }); + registerMatrixAutoJoin({ client, accountConfig, runtime }); + const handleRoomMessage = createMatrixRoomMessageHandler({ + client, + core, + cfg, + accountId: effectiveAccountId, + runtime, + logger, + logVerboseMessage, + allowFrom, + groupAllowFrom, + roomsConfig, + accountAllowBots, + configuredBotUserIds, + groupPolicy, + replyToMode, + threadReplies, + dmThreadReplies, + dmSessionScope, + streaming, + blockStreamingEnabled, + dmEnabled, + dmPolicy, + textLimit, + mediaMaxBytes, + historyLimit, + startupMs, + startupGraceMs, + dropPreStartupMessages, + inboundDeduper, + directTracker, + getRoomInfo, + getMemberDisplayName, + needsRoomAliasesForConfig, + }); threadBindingManager = await createMatrixThreadBindingManager({ accountId: effectiveAccountId, auth, @@ -417,6 +427,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi await cleanup("stop"); return; } + statusController.noteUnexpectedError(err); await cleanup(); throw err; }