From 7d0f094dbfeb79e84cbb1679a64fcccea0ade5cc Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Wed, 8 Apr 2026 13:46:05 -0400 Subject: [PATCH] fix(matrix): harden shared startup and fatal sync latching --- .../matrix/src/matrix/client/shared.test.ts | 42 +++++++++++++++++++ extensions/matrix/src/matrix/client/shared.ts | 16 ++++--- .../src/matrix/monitor/sync-lifecycle.test.ts | 27 ++++++++++++ .../src/matrix/monitor/sync-lifecycle.ts | 4 +- 4 files changed, 82 insertions(+), 7 deletions(-) diff --git a/extensions/matrix/src/matrix/client/shared.test.ts b/extensions/matrix/src/matrix/client/shared.test.ts index e97d3ebaf19..158cba4a8e5 100644 --- a/extensions/matrix/src/matrix/client/shared.test.ts +++ b/extensions/matrix/src/matrix/client/shared.test.ts @@ -271,6 +271,48 @@ describe("resolveSharedMatrixClient", () => { await expect(ownerPromise).resolves.toBe(mainClient); }); + it("keeps the shared startup lock while an aborted waiter exits early", async () => { + const mainAuth = authFor("main"); + let resolveStartup: (() => void) | undefined; + const mainClient = { + ...createMockClient("main"), + start: vi.fn( + async () => + await new Promise((resolve) => { + resolveStartup = resolve; + }), + ), + }; + + resolveMatrixAuthMock.mockResolvedValue(mainAuth); + createMatrixClientMock.mockResolvedValue(mainClient); + + const ownerPromise = resolveSharedMatrixClient({ accountId: "main" }); + await vi.waitFor(() => { + expect(mainClient.start).toHaveBeenCalledTimes(1); + expect(resolveStartup).toEqual(expect.any(Function)); + }); + + const abortController = new AbortController(); + const abortedWaiter = resolveSharedMatrixClient({ + accountId: "main", + abortSignal: abortController.signal, + }); + abortController.abort(); + await expect(abortedWaiter).rejects.toMatchObject({ + message: "Matrix startup aborted", + name: "AbortError", + }); + + const followerPromise = resolveSharedMatrixClient({ accountId: "main" }); + expect(mainClient.start).toHaveBeenCalledTimes(1); + + resolveStartup?.(); + await expect(ownerPromise).resolves.toBe(mainClient); + await expect(followerPromise).resolves.toBe(mainClient); + expect(mainClient.start).toHaveBeenCalledTimes(1); + }); + it("recreates the shared client when dispatcherPolicy changes", async () => { const firstAuth = { ...authFor("main"), diff --git a/extensions/matrix/src/matrix/client/shared.ts b/extensions/matrix/src/matrix/client/shared.ts index 10d7316b677..ff13bb0e6c9 100644 --- a/extensions/matrix/src/matrix/client/shared.ts +++ b/extensions/matrix/src/matrix/client/shared.ts @@ -107,7 +107,7 @@ async function ensureSharedClientStarted(params: { return; } - params.state.startPromise = (async () => { + const startPromise = (async () => { const client = params.state.client; // Initialize crypto if enabled @@ -126,12 +126,16 @@ async function ensureSharedClientStarted(params: { await client.start({ abortSignal: params.abortSignal }); params.state.started = true; })(); + // Keep the shared startup lock until the underlying start fully settles, even + // if one waiter aborts early while another caller still owns the startup. + const guardedStart = startPromise.finally(() => { + if (params.state.startPromise === guardedStart) { + params.state.startPromise = null; + } + }); + params.state.startPromise = guardedStart; - try { - await waitForStart(params.state.startPromise); - } finally { - params.state.startPromise = null; - } + await waitForStart(guardedStart); } async function resolveSharedMatrixClientState( diff --git a/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts b/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts index 6b37975df60..463f5361199 100644 --- a/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts +++ b/extensions/matrix/src/matrix/monitor/sync-lifecycle.test.ts @@ -146,6 +146,33 @@ describe("createMatrixMonitorSyncLifecycle", () => { ); }); + it("ignores follow-up sync states after a fatal sync error", async () => { + const client = createClientEmitter(); + const setStatus = vi.fn(); + const lifecycle = createMatrixMonitorSyncLifecycle({ + client: client as never, + statusController: createMatrixMonitorStatusController({ + accountId: "default", + statusSink: setStatus, + }), + }); + + const waitPromise = lifecycle.waitForFatalStop(); + client.emit("sync.unexpected_error", new Error("sync exploded")); + await expect(waitPromise).rejects.toThrow("sync exploded"); + + client.emit("sync.state", "RECONNECTING", "SYNCING", new Error("late reconnect")); + lifecycle.dispose(); + + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ + accountId: "default", + healthState: "error", + lastError: "sync exploded", + }), + ); + }); + it("rejects a second concurrent fatal-stop waiter", async () => { const client = createClientEmitter(); const lifecycle = createMatrixMonitorSyncLifecycle({ diff --git a/extensions/matrix/src/matrix/monitor/sync-lifecycle.ts b/extensions/matrix/src/matrix/monitor/sync-lifecycle.ts index 329d1a173ae..ae07c09a575 100644 --- a/extensions/matrix/src/matrix/monitor/sync-lifecycle.ts +++ b/extensions/matrix/src/matrix/monitor/sync-lifecycle.ts @@ -42,7 +42,9 @@ export function createMatrixMonitorSyncLifecycle(params: { settleFatal(fatalError); return; } - if (isMatrixTerminalSyncState(state) && fatalError) { + // Fatal sync failures are sticky for telemetry; later SDK state churn during + // cleanup or reconnect should not overwrite the first recorded error. + if (fatalError) { return; } params.statusController.noteSyncState(state, error);