import { describe, expect, it, vi } from "vitest"; import { enqueueKeyedTask, KeyedAsyncQueue } from "./keyed-async-queue.js"; function deferred() { const { promise, resolve, reject } = Promise.withResolvers(); return { promise, resolve, reject }; } describe("enqueueKeyedTask", () => { it("serializes tasks per key and keeps different keys independent", async () => { const tails = new Map>(); const gate = deferred(); const order: string[] = []; const first = enqueueKeyedTask({ tails, key: "a", task: async () => { order.push("a1:start"); await gate.promise; order.push("a1:end"); }, }); const second = enqueueKeyedTask({ tails, key: "a", task: async () => { order.push("a2:start"); order.push("a2:end"); }, }); const third = enqueueKeyedTask({ tails, key: "b", task: async () => { order.push("b1:start"); order.push("b1:end"); }, }); await vi.waitFor(() => { expect(order).toContain("a1:start"); expect(order).toContain("b1:start"); }); expect(order).not.toContain("a2:start"); gate.resolve(); await Promise.all([first, second, third]); expect(order).toEqual(["a1:start", "b1:start", "b1:end", "a1:end", "a2:start", "a2:end"]); expect(tails.size).toBe(0); }); it("keeps queue alive after task failures", async () => { const tails = new Map>(); await expect( enqueueKeyedTask({ tails, key: "a", task: async () => { throw new Error("boom"); }, }), ).rejects.toThrow("boom"); await expect( enqueueKeyedTask({ tails, key: "a", task: async () => "ok", }), ).resolves.toBe("ok"); }); it("runs enqueue/settle hooks once per task", async () => { const tails = new Map>(); const onEnqueue = vi.fn(); const onSettle = vi.fn(); await enqueueKeyedTask({ tails, key: "a", task: async () => undefined, hooks: { onEnqueue, onSettle }, }); expect(onEnqueue).toHaveBeenCalledTimes(1); expect(onSettle).toHaveBeenCalledTimes(1); }); }); describe("KeyedAsyncQueue", () => { it("exposes tail map for observability", async () => { const queue = new KeyedAsyncQueue(); const gate = deferred(); const run = queue.enqueue("actor", async () => { await gate.promise; return 1; }); expect(queue.getTailMapForTesting().has("actor")).toBe(true); gate.resolve(); await run; await Promise.resolve(); expect(queue.getTailMapForTesting().has("actor")).toBe(false); }); });