test: dedupe reconnect drain fixtures

This commit is contained in:
Peter Steinberger
2026-04-21 00:19:42 +01:00
parent 897a7b794f
commit a95b61560a

View File

@@ -47,6 +47,19 @@ async function drainDirectChatReconnectPending(opts: {
});
}
async function drainAcct1DirectChatReconnect(params: {
deliver: DeliverFn;
log: RecoveryLogger;
stateDir: string;
}) {
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver: params.deliver,
log: params.log,
stateDir: params.stateDir,
});
}
function createTransientFailureDeliver(): DeliverFn {
return vi.fn<DeliverFn>(async () => {
throw new Error("transient failure");
@@ -83,18 +96,9 @@ describe("drainPendingDeliveries for reconnect", () => {
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
tmpDir,
);
await failDelivery(id, NO_LISTENER_ERROR, tmpDir);
await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir });
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).toHaveBeenCalledTimes(1);
expect(deliver).toHaveBeenCalledWith(
@@ -106,18 +110,9 @@ describe("drainPendingDeliveries for reconnect", () => {
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "other" },
tmpDir,
);
await failDelivery(id, NO_LISTENER_ERROR, tmpDir);
await enqueueFailedDirectChatDelivery({ accountId: "other", stateDir: tmpDir });
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
// deliver should not be called since no eligible entries for acct1
expect(deliver).not.toHaveBeenCalled();
@@ -136,12 +131,7 @@ describe("drainPendingDeliveries for reconnect", () => {
lastError?: string;
};
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).toHaveBeenCalledTimes(1);
@@ -164,12 +154,7 @@ describe("drainPendingDeliveries for reconnect", () => {
// Should not throw
await expect(
drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
}),
drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir }),
).resolves.toBeUndefined();
});
@@ -187,12 +172,7 @@ describe("drainPendingDeliveries for reconnect", () => {
await failDelivery(id, NO_LISTENER_ERROR, tmpDir);
}
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
// Should have moved to failed, not delivered
expect(deliver).not.toHaveBeenCalled();
@@ -283,12 +263,7 @@ describe("drainPendingDeliveries for reconnect", () => {
expect(deliver).toHaveBeenCalledTimes(1);
});
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).toHaveBeenCalledTimes(1);
expect(log.info).toHaveBeenCalledWith(
@@ -349,12 +324,7 @@ describe("drainPendingDeliveries for reconnect", () => {
);
});
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
releaseBlocker!();
await startupRecovery;
@@ -374,12 +344,7 @@ describe("drainPendingDeliveries for reconnect", () => {
tmpDir,
);
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).toHaveBeenCalledTimes(1);
expect(
@@ -403,12 +368,7 @@ describe("drainPendingDeliveries for reconnect", () => {
entry.lastAttemptAt = Date.now() - 30_000;
fs.writeFileSync(entryPath, JSON.stringify(entry, null, 2));
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).toHaveBeenCalledTimes(1);
});
@@ -423,12 +383,7 @@ describe("drainPendingDeliveries for reconnect", () => {
);
await failDelivery(id, "network down", tmpDir);
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).not.toHaveBeenCalled();
expect(log.info).toHaveBeenCalledWith(expect.stringContaining("not ready for retry yet"));
@@ -438,18 +393,9 @@ describe("drainPendingDeliveries for reconnect", () => {
const log = createRecoveryLog();
const deliver = vi.fn<DeliverFn>(async () => {});
const id = await enqueueDelivery(
{ channel: "directchat", to: "+1555", payloads: [{ text: "hi" }], accountId: "acct1" },
tmpDir,
);
await failDelivery(id, NO_LISTENER_ERROR, tmpDir);
await enqueueFailedDirectChatDelivery({ accountId: "acct1", stateDir: tmpDir });
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).toHaveBeenCalledTimes(1);
});
@@ -463,12 +409,7 @@ describe("drainPendingDeliveries for reconnect", () => {
tmpDir,
);
await drainDirectChatReconnectPending({
accountId: "acct1",
deliver,
log,
stateDir: tmpDir,
});
await drainAcct1DirectChatReconnect({ deliver, log, stateDir: tmpDir });
expect(deliver).not.toHaveBeenCalled();
});