From aa36c077fc40ee95068debf0b423f25a48de5a23 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 20 Apr 2026 18:42:43 +0100 Subject: [PATCH] test: share delivery queue reconnect fixtures --- .../delivery-queue.reconnect-drain.test.ts | 104 ++++++++---------- 1 file changed, 46 insertions(+), 58 deletions(-) diff --git a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts index 2113583bb2e..ce85a94c020 100644 --- a/src/infra/outbound/delivery-queue.reconnect-drain.test.ts +++ b/src/infra/outbound/delivery-queue.reconnect-drain.test.ts @@ -1,7 +1,6 @@ import fs from "node:fs"; -import os from "node:os"; import path from "node:path"; -import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { beforeEach, describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../config/config.js"; import { type DeliverFn, @@ -12,14 +11,10 @@ import { type RecoveryLogger, recoverPendingDeliveries, } from "./delivery-queue.js"; - -function createMockLogger(): RecoveryLogger { - return { - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - }; -} +import { + createRecoveryLog, + installDeliveryQueueTmpDirHooks, +} from "./delivery-queue.test-helpers.js"; const stubCfg = {} as OpenClawConfig; const NO_LISTENER_ERROR = "No active WhatsApp Web listener"; @@ -52,30 +47,35 @@ async function drainWhatsAppReconnectPending(opts: { }); } -describe("drainPendingDeliveries for WhatsApp reconnect", () => { - let fixtureRoot = ""; - let tmpDir: string; - let fixtureCount = 0; - - beforeAll(() => { - fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-drain-")); +function createTransientFailureDeliver(): DeliverFn { + return vi.fn(async () => { + throw new Error("transient failure"); }); +} + +async function enqueueFailedWhatsAppDelivery(params: { + accountId: string; + stateDir: string; + error?: string; +}): Promise { + const id = await enqueueDelivery( + { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: params.accountId }, + params.stateDir, + ); + await failDelivery(id, params.error ?? NO_LISTENER_ERROR, params.stateDir); + return id; +} + +describe("drainPendingDeliveries for WhatsApp reconnect", () => { + let tmpDir: string; + const fixtures = installDeliveryQueueTmpDirHooks(); beforeEach(() => { - tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`); - fs.mkdirSync(tmpDir, { recursive: true }); - }); - - afterAll(() => { - if (!fixtureRoot) { - return; - } - fs.rmSync(fixtureRoot, { recursive: true, force: true }); - fixtureRoot = ""; + tmpDir = fixtures.tmpDir(); }); it("drains entries that failed with 'no listener' error", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); const id = await enqueueDelivery( @@ -98,7 +98,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("skips entries from other accounts", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); const id = await enqueueDelivery( @@ -119,16 +119,10 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("retries immediately without resetting retry history", async () => { - const log = createMockLogger(); - const deliver = vi.fn(async () => { - throw new Error("transient failure"); - }); + const log = createRecoveryLog(); + const deliver = createTransientFailureDeliver(); - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, - tmpDir, - ); - await failDelivery(id, "No active WhatsApp Web listener", tmpDir); + const id = await enqueueFailedWhatsAppDelivery({ accountId: "acct1", stateDir: tmpDir }); const queueDir = path.join(tmpDir, "delivery-queue"); const filePath = path.join(queueDir, `${id}.json`); const before = JSON.parse(fs.readFileSync(filePath, "utf-8")) as { @@ -158,16 +152,10 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("does not throw if delivery fails during drain", async () => { - const log = createMockLogger(); - const deliver = vi.fn(async () => { - throw new Error("transient failure"); - }); + const log = createRecoveryLog(); + const deliver = createTransientFailureDeliver(); - const id = await enqueueDelivery( - { channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" }, - tmpDir, - ); - await failDelivery(id, "No active WhatsApp Web listener", tmpDir); + await enqueueFailedWhatsAppDelivery({ accountId: "acct1", stateDir: tmpDir }); // Should not throw await expect( @@ -181,7 +169,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("skips entries where retryCount >= MAX_RETRIES", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); const id = await enqueueDelivery( @@ -209,7 +197,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("second concurrent call is skipped (concurrency guard)", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); let resolveDeliver: () => void; const deliverPromise = new Promise((resolve) => { resolveDeliver = resolve; @@ -251,8 +239,8 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("does not re-deliver an entry already being recovered at startup", async () => { - const log = createMockLogger(); - const startupLog = createMockLogger(); + const log = createRecoveryLog(); + const startupLog = createRecoveryLog(); let resolveDeliver: () => void; const deliverPromise = new Promise((resolve) => { resolveDeliver = resolve; @@ -307,8 +295,8 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("does not re-deliver a stale startup snapshot after reconnect already acked it", async () => { - const log = createMockLogger(); - const startupLog = createMockLogger(); + const log = createRecoveryLog(); + const startupLog = createRecoveryLog(); let releaseBlocker: () => void; const blocker = new Promise((resolve) => { releaseBlocker = resolve; @@ -373,7 +361,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { ); }); it("drains fresh pending WhatsApp entries for the reconnecting account", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); await enqueueDelivery( @@ -395,7 +383,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("drains backoff-eligible WhatsApp retries on reconnect", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); const id = await enqueueDelivery( @@ -421,7 +409,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("does not bypass backoff for ordinary transient errors on reconnect", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); const id = await enqueueDelivery( @@ -442,7 +430,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("still bypasses backoff for no-listener failures on reconnect", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); const id = await enqueueDelivery( @@ -462,7 +450,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => { }); it("ignores non-WhatsApp entries even when reconnect drain runs", async () => { - const log = createMockLogger(); + const log = createRecoveryLog(); const deliver = vi.fn(async () => {}); await enqueueDelivery(