diff --git a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts index 0302341c46a..03e67083dcb 100644 --- a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts +++ b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts @@ -47,6 +47,19 @@ async function drainDirectChatReconnectPending(opts: { }); } +async function drainAcct1DirectChatReconnect(params: { + deliver: DeliverFn; + log: RecoveryLogger; + stateDir: string; +}) { + await drainDirectChatReconnectPending({ + accountId: "acct1", + deliver: params.deliver, + log: params.log, + stateDir: params.stateDir, + }); +} + function createTransientFailureDeliver(): DeliverFn { return vi.fn(async () => { throw new Error("transient failure"); @@ -83,18 +96,9 @@ describe("drainPendingDeliveries for reconnect", () => { const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); - const id = await enqueueDelivery( - { channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, - tmpDir, - ); - await failDelivery(id, NO_LISTENER_ERROR, tmpDir); + await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir }); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).toHaveBeenCalledTimes(1); expect(deliver).toHaveBeenCalledWith( @@ -106,18 +110,9 @@ describe("drainPendingDeliveries for reconnect", () => { const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); - const id = await enqueueDelivery( - { channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "other" }, - tmpDir, - ); - await failDelivery(id, NO_LISTENER_ERROR, tmpDir); + await enqueueFailedDirectChatDelivery({ accountId: "other", stateDir: tmpDir }); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); // deliver should not be called since no eligible entries for acct1 expect(deliver).not.toHaveBeenCalled(); @@ -136,12 +131,7 @@ describe("drainPendingDeliveries for reconnect", () => { lastError?: string; }; - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).toHaveBeenCalledTimes(1); @@ -164,12 +154,7 @@ describe("drainPendingDeliveries for reconnect", () => { // Should not throw await expect( - drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }), + drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }), ).resolves.toBeUndefined(); }); @@ -187,12 +172,7 @@ describe("drainPendingDeliveries for reconnect", () => { await failDelivery(id, NO_LISTENER_ERROR, tmpDir); } - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); // Should have moved to failed, not delivered expect(deliver).not.toHaveBeenCalled(); @@ -283,12 +263,7 @@ describe("drainPendingDeliveries for reconnect", () => { expect(deliver).toHaveBeenCalledTimes(1); }); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).toHaveBeenCalledTimes(1); expect(log.info).toHaveBeenCalledWith( @@ -349,12 +324,7 @@ describe("drainPendingDeliveries for reconnect", () => { ); }); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); releaseBlocker!(); await startupRecovery; @@ -374,12 +344,7 @@ describe("drainPendingDeliveries for reconnect", () => { tmpDir, ); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).toHaveBeenCalledTimes(1); expect( @@ -403,12 +368,7 @@ describe("drainPendingDeliveries for reconnect", () => { entry.lastAttemptAt = Date.now() - 30_000; fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2)); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).toHaveBeenCalledTimes(1); }); @@ -423,12 +383,7 @@ describe("drainPendingDeliveries for reconnect", () => { ); await failDelivery(id, "network down", tmpDir); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).not.toHaveBeenCalled(); expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet")); @@ -438,18 +393,9 @@ describe("drainPendingDeliveries for reconnect", () => { const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); - const id = await enqueueDelivery( - { channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, - tmpDir, - ); - await failDelivery(id, NO_LISTENER_ERROR, tmpDir); + await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir }); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).toHaveBeenCalledTimes(1); }); @@ -463,12 +409,7 @@ describe("drainPendingDeliveries for reconnect", () => { tmpDir, ); - await drainDirectChatReconnectPending({ - accountId: "acct1", - deliver, - log, - stateDir: tmpDir, - }); + await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }); expect(deliver).not.toHaveBeenCalled(); });