test: share delivery queue reconnect fixtures

This commit is contained in:
Peter Steinberger
2026-04-20 18:42:43 +01:00
parent 9430113fe5
commit aa36c077fc

View File

@@ -1,7 +1,6 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/config.js";
import {
type DeliverFn,
@@ -12,14 +11,10 @@ import {
type RecoveryLogger,
recoverPendingDeliveries,
} from "./delivery-queue.js";
function createMockLogger(): RecoveryLogger {
return {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
}
import {
createRecoveryLog,
installDeliveryQueueTmpDirHooks,
} from "./delivery-queue.test-helpers.js";
const stubCfg = {} as OpenClawConfig;
const NO_LISTENER_ERROR = "No active WhatsApp Web listener";
@@ -52,30 +47,35 @@ async function drainWhatsAppReconnectPending(opts: {
});
}
describe("drainPendingDeliveries for WhatsApp reconnect", () => {
let fixtureRoot = "";
let tmpDir: string;
let fixtureCount = 0;
beforeAll(() => {
fixtureRoot = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-drain-"));
function createTransientFailureDeliver(): DeliverFn {
return vi.fn<DeliverFn>(async () => {
throw new Error("transient failure");
});
}
async function enqueueFailedWhatsAppDelivery(params: {
accountId: string;
stateDir: string;
error?: string;
}): Promise<string> {
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: params.accountId },
params.stateDir,
);
await failDelivery(id, params.error ?? NO_LISTENER_ERROR, params.stateDir);
return id;
}
describe("drainPendingDeliveries for WhatsApp reconnect", () => {
let tmpDir: string;
const fixtures = installDeliveryQueueTmpDirHooks();
beforeEach(() => {
tmpDir = path.join(fixtureRoot, `case-${fixtureCount++}`);
fs.mkdirSync(tmpDir, { recursive: true });
});
afterAll(() => {
if (!fixtureRoot) {
return;
}
fs.rmSync(fixtureRoot, { recursive: true, force: true });
fixtureRoot = "";
tmpDir = fixtures.tmpDir();
});
it("drains entries that failed with 'no listener' error", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
@@ -98,7 +98,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("skips entries from other accounts", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
@@ -119,16 +119,10 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("retries immediately without resetting retry history", async () => {
const log = createMockLogger();
const deliver = vi.fn<DeliverFn>(async () => {
throw new Error("transient failure");
});
const log = createRecoveryLog();
const deliver = createTransientFailureDeliver();
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
tmpDir,
);
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
const id = await enqueueFailedWhatsAppDelivery({ accountId: "acct1", stateDir: tmpDir });
const queueDir = path.join(tmpDir, "delivery-queue");
const filePath = path.join(queueDir, `${id}.json`);
const before = JSON.parse(fs.readFileSync(filePath, "utf-8")) as {
@@ -158,16 +152,10 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("does not throw if delivery fails during drain", async () => {
const log = createMockLogger();
const deliver = vi.fn<DeliverFn>(async () => {
throw new Error("transient failure");
});
const log = createRecoveryLog();
const deliver = createTransientFailureDeliver();
const id = await enqueueDelivery(
{ channel: "whatsapp", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
tmpDir,
);
await failDelivery(id, "No active WhatsApp Web listener", tmpDir);
await enqueueFailedWhatsAppDelivery({ accountId: "acct1", stateDir: tmpDir });
// Should not throw
await expect(
@@ -181,7 +169,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("skips entries where retryCount >= MAX_RETRIES", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
@@ -209,7 +197,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("second concurrent call is skipped (concurrency guard)", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
let resolveDeliver: () => void;
const deliverPromise = new Promise<void>((resolve) => {
resolveDeliver = resolve;
@@ -251,8 +239,8 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("does not re-deliver an entry already being recovered at startup", async () => {
const log = createMockLogger();
const startupLog = createMockLogger();
const log = createRecoveryLog();
const startupLog = createRecoveryLog();
let resolveDeliver: () => void;
const deliverPromise = new Promise<void>((resolve) => {
resolveDeliver = resolve;
@@ -307,8 +295,8 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("does not re-deliver a stale startup snapshot after reconnect already acked it", async () => {
const log = createMockLogger();
const startupLog = createMockLogger();
const log = createRecoveryLog();
const startupLog = createRecoveryLog();
let releaseBlocker: () => void;
const blocker = new Promise<void>((resolve) => {
releaseBlocker = resolve;
@@ -373,7 +361,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
);
});
it("drains fresh pending WhatsApp entries for the reconnecting account", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
await enqueueDelivery(
@@ -395,7 +383,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("drains backoff-eligible WhatsApp retries on reconnect", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
@@ -421,7 +409,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("does not bypass backoff for ordinary transient errors on reconnect", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
@@ -442,7 +430,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("still bypasses backoff for no-listener failures on reconnect", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
@@ -462,7 +450,7 @@ describe("drainPendingDeliveries for WhatsApp reconnect", () => {
});
it("ignores non-WhatsApp entries even when reconnect drain runs", async () => {
const log = createMockLogger();
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
await enqueueDelivery(