From 8fcd0384fa0e1779bd2e8a20ae36394bfb6e1a74 Mon Sep 17 00:00:00 2001 From: Gustavo Madeira Santana Date: Fri, 20 Mar 2026 11:27:58 -0700 Subject: [PATCH] Matrix: drain decrypt retries before shutdown persist --- .../matrix/src/matrix/monitor/index.test.ts | 10 +++- extensions/matrix/src/matrix/monitor/index.ts | 1 + extensions/matrix/src/matrix/sdk.test.ts | 46 +++++++++++++++++++ extensions/matrix/src/matrix/sdk.ts | 6 ++- .../matrix/src/matrix/sdk/decrypt-bridge.ts | 45 ++++++++++++++++++ 5 files changed, 106 insertions(+), 2 deletions(-) diff --git a/extensions/matrix/src/matrix/monitor/index.test.ts b/extensions/matrix/src/matrix/monitor/index.test.ts index 0ba003e42e1..31332f13178 100644 --- a/extensions/matrix/src/matrix/monitor/index.test.ts +++ b/extensions/matrix/src/matrix/monitor/index.test.ts @@ -16,6 +16,7 @@ const hoisted = vi.hoisted(() => { id: "matrix-client", hasPersistedSyncState: vi.fn(() => false), stopSyncWithoutPersist: vi.fn(), + drainPendingDecryptions: vi.fn(async () => undefined), }; const createMatrixRoomMessageHandler = vi.fn(() => vi.fn()); const resolveTextChunkLimit = vi.fn< @@ -236,6 +237,7 @@ describe("monitorMatrixProvider", () => { hoisted.stopThreadBindingManager.mockReset(); hoisted.client.hasPersistedSyncState.mockReset().mockReturnValue(false); hoisted.client.stopSyncWithoutPersist.mockReset(); + hoisted.client.drainPendingDecryptions.mockReset().mockResolvedValue(undefined); hoisted.inboundDeduper.claimEvent.mockReset().mockReturnValue(true); hoisted.inboundDeduper.commitEvent.mockReset().mockResolvedValue(undefined); hoisted.inboundDeduper.releaseEvent.mockReset(); @@ -303,7 +305,7 @@ describe("monitorMatrixProvider", () => { ); }); - it("stops sync before waiting for in-flight handlers, then flushes dedupe before persisting", async () => { + it("stops sync, drains decryptions, then waits for in-flight handlers before persisting", async () => { const { monitorMatrixProvider } = await import("./index.js"); const abortController = new AbortController(); let resolveHandler: (() => void) | null = null; @@ -322,6 +324,9 @@ describe("monitorMatrixProvider", () => { hoisted.client.stopSyncWithoutPersist.mockImplementation(() => { hoisted.callOrder.push("pause-client"); }); + hoisted.client.drainPendingDecryptions.mockImplementation(async () => { + hoisted.callOrder.push("drain-decrypts"); + }); hoisted.releaseSharedClientInstance.mockImplementation(async () => { hoisted.callOrder.push("release-client"); return true; @@ -354,6 +359,9 @@ describe("monitorMatrixProvider", () => { await monitorPromise; expect(hoisted.callOrder.indexOf("pause-client")).toBeLessThan( + hoisted.callOrder.indexOf("drain-decrypts"), + ); + expect(hoisted.callOrder.indexOf("drain-decrypts")).toBeLessThan( hoisted.callOrder.indexOf("handler-done"), ); expect(hoisted.callOrder.indexOf("handler-done")).toBeLessThan( diff --git a/extensions/matrix/src/matrix/monitor/index.ts b/extensions/matrix/src/matrix/monitor/index.ts index b223d6706d4..10588375abf 100644 --- a/extensions/matrix/src/matrix/monitor/index.ts +++ b/extensions/matrix/src/matrix/monitor/index.ts @@ -155,6 +155,7 @@ export async function monitorMatrixProvider(opts: MonitorMatrixOpts = {}): Promi try { threadBindingManager?.stop(); client.stopSyncWithoutPersist(); + await client.drainPendingDecryptions("matrix monitor shutdown"); await waitForInFlightRoomMessages(); await inboundDeduper.stop(); await releaseSharedClientInstance(client, "persist"); diff --git a/extensions/matrix/src/matrix/sdk.test.ts b/extensions/matrix/src/matrix/sdk.test.ts index 8b7330294e6..dd84a7f6eb2 100644 --- a/extensions/matrix/src/matrix/sdk.test.ts +++ b/extensions/matrix/src/matrix/sdk.test.ts @@ -684,6 +684,52 @@ describe("MatrixClient event bridge", () => { expect(delivered).toEqual(["m.room.message"]); }); + it("can drain pending decrypt retries after sync stops", async () => { + vi.useFakeTimers(); + const client = new MatrixClient("https://matrix.example.org", "token"); + const delivered: string[] = []; + + client.on("room.message", (_roomId, event) => { + delivered.push(event.type); + }); + + const encrypted = new FakeMatrixEvent({ + roomId: "!room:example.org", + eventId: "$event", + sender: "@alice:example.org", + type: "m.room.encrypted", + ts: Date.now(), + content: {}, + decryptionFailure: true, + }); + const decrypted = new FakeMatrixEvent({ + roomId: "!room:example.org", + eventId: "$event", + sender: "@alice:example.org", + type: "m.room.message", + ts: Date.now(), + content: { + msgtype: "m.text", + body: "hello", + }, + }); + + matrixJsClient.decryptEventIfNeeded = vi.fn(async () => { + encrypted.emit("decrypted", decrypted); + }); + + await client.start(); + matrixJsClient.emit("event", encrypted); + encrypted.emit("decrypted", encrypted, new Error("missing room key")); + + client.stopSyncWithoutPersist(); + await client.drainPendingDecryptions("test shutdown"); + + expect(matrixJsClient.stopClient).toHaveBeenCalledTimes(1); + expect(matrixJsClient.decryptEventIfNeeded).toHaveBeenCalledTimes(1); + expect(delivered).toEqual(["m.room.message"]); + }); + it("retries failed decryptions immediately on crypto key update signals", async () => { vi.useFakeTimers(); const client = new MatrixClient("https://matrix.example.org", "token", undefined, undefined, { diff --git a/extensions/matrix/src/matrix/sdk.ts b/extensions/matrix/src/matrix/sdk.ts index 4212b1a8958..4fb0b53389c 100644 --- a/extensions/matrix/src/matrix/sdk.ts +++ b/extensions/matrix/src/matrix/sdk.ts @@ -370,13 +370,17 @@ export class MatrixClient { clearInterval(this.idbPersistTimer); this.idbPersistTimer = null; } - this.decryptBridge.stop(); this.client.stopClient(); this.started = false; } + async drainPendingDecryptions(reason = "matrix client shutdown"): Promise { + await this.decryptBridge.drainPendingDecryptions(reason); + } + stop(): void { this.stopSyncWithoutPersist(); + this.decryptBridge.stop(); // Final persist on shutdown this.syncStore?.markCleanShutdown(); this.stopPersistPromise = Promise.all([ diff --git a/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts b/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts index 1df9e8748bd..68a746c7b7a 100644 --- a/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts +++ b/extensions/matrix/src/matrix/sdk/decrypt-bridge.ts @@ -51,6 +51,8 @@ export class MatrixDecryptBridge { private readonly decryptedMessageDedupe = new Map(); private readonly decryptRetries = new Map(); private readonly failedDecryptionsNotified = new Set(); + private activeRetryRuns = 0; + private readonly retryIdleResolvers = new Set<() => void>(); private cryptoRetrySignalsBound = false; constructor( @@ -139,6 +141,22 @@ export class MatrixDecryptBridge { } } + async drainPendingDecryptions(reason: string): Promise { + for (let attempts = 0; attempts < MATRIX_DECRYPT_RETRY_MAX_ATTEMPTS; attempts += 1) { + if (this.decryptRetries.size === 0) { + return; + } + this.retryPendingNow(reason); + await this.waitForActiveRetryRunsToFinish(); + const hasPendingRetryTimers = Array.from(this.decryptRetries.values()).some( + (state) => state.timer || state.inFlight, + ); + if (!hasPendingRetryTimers) { + return; + } + } + } + private handleEncryptedEventDecrypted(params: { roomId: string; encryptedEvent: MatrixEvent; @@ -246,9 +264,12 @@ export class MatrixDecryptBridge { state.inFlight = true; state.timer = null; + this.activeRetryRuns += 1; const canDecrypt = typeof this.deps.client.decryptEventIfNeeded === "function"; if (!canDecrypt) { this.clearDecryptRetry(retryKey); + this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1); + this.resolveRetryIdleIfNeeded(); return; } @@ -260,8 +281,13 @@ export class MatrixDecryptBridge { // Retry with backoff until we hit the configured retry cap. } finally { state.inFlight = false; + this.activeRetryRuns = Math.max(0, this.activeRetryRuns - 1); + this.resolveRetryIdleIfNeeded(); } + if (this.decryptRetries.get(retryKey) !== state) { + return; + } if (isDecryptionFailure(state.event)) { this.scheduleDecryptRetry(state); return; @@ -304,4 +330,23 @@ export class MatrixDecryptBridge { this.decryptedMessageDedupe.delete(oldest); } } + + private async waitForActiveRetryRunsToFinish(): Promise { + if (this.activeRetryRuns === 0) { + return; + } + await new Promise((resolve) => { + this.retryIdleResolvers.add(resolve); + }); + } + + private resolveRetryIdleIfNeeded(): void { + if (this.activeRetryRuns !== 0) { + return; + } + for (const resolve of this.retryIdleResolvers) { + resolve(); + } + this.retryIdleResolvers.clear(); + } }