diff --git a/src/auto-reply/reply/reply-flow.test.ts b/src/auto-reply/reply/reply-flow.test.ts index 575ac7f1780..d0fd692c2e1 100644 --- a/src/auto-reply/reply/reply-flow.test.ts +++ b/src/auto-reply/reply/reply-flow.test.ts @@ -1,4 +1,5 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { importFreshModule } from "../../../test/helpers/import-fresh.js"; import { expectInboundContextContract } from "../../../test/helpers/inbound-contract.js"; import type { OpenClawConfig } from "../../config/config.js"; import { defaultRuntime } from "../../runtime.js"; @@ -743,6 +744,71 @@ describe("followup queue deduplication", () => { expect(calls).toHaveLength(1); }); + it("deduplicates same message_id across distinct enqueue module instances", async () => { + const enqueueA = await importFreshModule( + import.meta.url, + "./queue/enqueue.js?scope=dedupe-a", + ); + const enqueueB = await importFreshModule( + import.meta.url, + "./queue/enqueue.js?scope=dedupe-b", + ); + const { clearSessionQueues } = await import("./queue.js"); + const key = `test-dedup-cross-module-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + enqueueA.resetRecentQueuedMessageIdDedupe(); + enqueueB.resetRecentQueuedMessageIdDedupe(); + + try { + expect( + enqueueA.enqueueFollowupRun( + key, + createRun({ + prompt: "first", + messageId: "same-id", + originatingChannel: "signal", + originatingTo: "+10000000000", + }), + settings, + ), + ).toBe(true); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + await new Promise((resolve) => setImmediate(resolve)); + + expect( + enqueueB.enqueueFollowupRun( + key, + createRun({ + prompt: "first-redelivery", + messageId: "same-id", + originatingChannel: "signal", + originatingTo: "+10000000000", + }), + settings, + ), + ).toBe(false); + expect(calls).toHaveLength(1); + } finally { + clearSessionQueues([key]); + enqueueA.resetRecentQueuedMessageIdDedupe(); + enqueueB.resetRecentQueuedMessageIdDedupe(); + } + }); + it("does not collide recent message-id keys when routing contains delimiters", async () => { const key = `test-dedup-key-collision-${Date.now()}`; const calls: FollowupRun[] = []; @@ -1264,6 +1330,55 @@ describe("followup queue drain restart after idle window", () => { expect(calls[1]?.prompt).toBe("after-idle"); }); + it("restarts an idle drain across distinct enqueue and drain module instances", async () => { + const drainA = await importFreshModule( + import.meta.url, + "./queue/drain.js?scope=restart-a", + ); + const enqueueB = await importFreshModule( + import.meta.url, + "./queue/enqueue.js?scope=restart-b", + ); + const { clearSessionQueues } = await import("./queue.js"); + const key = `test-idle-window-cross-module-${Date.now()}`; + const calls: FollowupRun[] = []; + const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 }; + const firstProcessed = createDeferred(); + + enqueueB.resetRecentQueuedMessageIdDedupe(); + + try { + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + if (calls.length === 1) { + firstProcessed.resolve(); + } + }; + + enqueueB.enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings); + drainA.scheduleFollowupDrain(key, runFollowup); + await firstProcessed.promise; + + await new Promise((resolve) => setImmediate(resolve)); + + enqueueB.enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings); + + await vi.waitFor( + () => { + expect(calls).toHaveLength(2); + }, + { timeout: 1_000 }, + ); + + expect(calls[0]?.prompt).toBe("before-idle"); + expect(calls[1]?.prompt).toBe("after-idle"); + } finally { + clearSessionQueues([key]); + drainA.clearFollowupDrainCallback(key); + enqueueB.resetRecentQueuedMessageIdDedupe(); + } + }); + it("does not double-drain when a message arrives while drain is still running", async () => { const key = `test-no-double-drain-${Date.now()}`; const calls: FollowupRun[] = [];