diff --git a/src/auto-reply/reply/queue/cleanup.ts b/src/auto-reply/reply/queue/cleanup.ts index 996f9ed4760..77b623455bf 100644 --- a/src/auto-reply/reply/queue/cleanup.ts +++ b/src/auto-reply/reply/queue/cleanup.ts @@ -1,5 +1,6 @@ import { resolveEmbeddedSessionLane } from "../../../agents/pi-embedded.js"; import { clearCommandLane } from "../../../process/command-queue.js"; +import { clearFollowupDrainCallback } from "./drain.js"; import { clearFollowupQueue } from "./state.js"; export type ClearSessionQueueResult = { @@ -22,6 +23,7 @@ export function clearSessionQueues(keys: Array): ClearSessio seen.add(cleaned); clearedKeys.push(cleaned); followupCleared += clearFollowupQueue(cleaned); + clearFollowupDrainCallback(cleaned); laneCleared += clearCommandLane(resolveEmbeddedSessionLane(cleaned)); } diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index a048a4e8925..3846b2eaa93 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -13,6 +13,23 @@ import { isRoutableChannel } from "../route-reply.js"; import { FOLLOWUP_QUEUES } from "./state.js"; import type { FollowupRun } from "./types.js"; +// Persists the most recent runFollowup callback per queue key so that +// enqueueFollowupRun can restart a drain that finished and deleted the queue. +const FOLLOWUP_RUN_CALLBACKS = new Map Promise>(); + +export function clearFollowupDrainCallback(key: string): void { + FOLLOWUP_RUN_CALLBACKS.delete(key); +} + +/** Restart the drain for `key` if it is currently idle, using the stored callback. */ +export function kickFollowupDrainIfIdle(key: string): void { + const cb = FOLLOWUP_RUN_CALLBACKS.get(key); + if (!cb) { + return; + } + scheduleFollowupDrain(key, cb); +} + type OriginRoutingMetadata = Pick< FollowupRun, "originatingChannel" | "originatingTo" | "originatingAccountId" | "originatingThreadId" @@ -50,6 +67,9 @@ export function scheduleFollowupDrain( key: string, runFollowup: (run: FollowupRun) => Promise, ): void { + // Cache the callback so enqueueFollowupRun can restart drain after the queue + // has been deleted and recreated (the post-drain idle window race condition). + FOLLOWUP_RUN_CALLBACKS.set(key, runFollowup); const queue = beginQueueDrain(FOLLOWUP_QUEUES, key); if (!queue) { return; diff --git a/src/auto-reply/reply/queue/enqueue.ts b/src/auto-reply/reply/queue/enqueue.ts index 09e848dc051..1d58492374d 100644 --- a/src/auto-reply/reply/queue/enqueue.ts +++ b/src/auto-reply/reply/queue/enqueue.ts @@ -1,4 +1,5 @@ import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js"; +import { kickFollowupDrainIfIdle } from "./drain.js"; import { getExistingFollowupQueue, getFollowupQueue } from "./state.js"; import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js"; @@ -53,6 +54,12 @@ export function enqueueFollowupRun( } queue.items.push(run); + // If drain finished and deleted the queue before this item arrived, a new queue + // object was created (draining: false) but nobody scheduled a drain for it. + // Use the cached callback to restart the drain now. + if (!queue.draining) { + kickFollowupDrainIfIdle(key); + } return true; } diff --git a/src/auto-reply/reply/reply-flow.test.ts b/src/auto-reply/reply/reply-flow.test.ts index 3c697b445ec..c854672a978 100644 --- a/src/auto-reply/reply/reply-flow.test.ts +++ b/src/auto-reply/reply/reply-flow.test.ts @@ -1096,6 +1096,118 @@ describe("followup queue collect routing", () => { }); }); +describe("followup queue drain restart after idle window", () => { + it("processes a message enqueued after the drain empties and deletes the queue", async () => { + const key = `test-idle-window-race-${Date.now()}`; + const calls: FollowupRun[] = []; + const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 }; + + const firstProcessed = createDeferred(); + const secondProcessed = createDeferred(); + let callCount = 0; + const runFollowup = async (run: FollowupRun) => { + callCount++; + calls.push(run); + if (callCount === 1) { + firstProcessed.resolve(); + } + if (callCount === 2) { + secondProcessed.resolve(); + } + }; + + // Enqueue first message and start drain. + enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings); + scheduleFollowupDrain(key, runFollowup); + + // Wait for the first message to be processed by the drain. + await firstProcessed.promise; + + // Yield past the drain's finally block so it can set draining:false and + // delete the queue key from FOLLOWUP_QUEUES (the idle-window boundary). + await new Promise((resolve) => setImmediate(resolve)); + + // Simulate the race: a new message arrives AFTER the drain finished and + // deleted the queue, but WITHOUT calling scheduleFollowupDrain again. + enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings); + + // kickFollowupDrainIfIdle should have restarted the drain automatically. + await secondProcessed.promise; + + expect(calls).toHaveLength(2); + expect(calls[0]?.prompt).toBe("before-idle"); + expect(calls[1]?.prompt).toBe("after-idle"); + }); + + it("does not double-drain when a message arrives while drain is still running", async () => { + const key = `test-no-double-drain-${Date.now()}`; + const calls: FollowupRun[] = []; + const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 }; + + const allProcessed = createDeferred(); + // runFollowup resolves only after both items are enqueued so the second + // item is already in the queue when the first drain step finishes. + let runFollowupResolve!: () => void; + const runFollowupGate = new Promise((res) => { + runFollowupResolve = res; + }); + const runFollowup = async (run: FollowupRun) => { + await runFollowupGate; + calls.push(run); + if (calls.length >= 2) { + allProcessed.resolve(); + } + }; + + enqueueFollowupRun(key, createRun({ prompt: "first" }), settings); + scheduleFollowupDrain(key, runFollowup); + + // Enqueue second message while the drain is mid-flight (draining:true). + enqueueFollowupRun(key, createRun({ prompt: "second" }), settings); + + // Release the gate so both items can drain. + runFollowupResolve(); + + await allProcessed.promise; + expect(calls).toHaveLength(2); + expect(calls[0]?.prompt).toBe("first"); + expect(calls[1]?.prompt).toBe("second"); + }); + + it("does not process messages after clearSessionQueues clears the callback", async () => { + const key = `test-clear-callback-${Date.now()}`; + const calls: FollowupRun[] = []; + const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 }; + + const firstProcessed = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + firstProcessed.resolve(); + }; + + enqueueFollowupRun(key, createRun({ prompt: "before-clear" }), settings); + scheduleFollowupDrain(key, runFollowup); + await firstProcessed.promise; + + // Let drain finish and delete the queue. + await new Promise((resolve) => setImmediate(resolve)); + + // Clear queues (simulates session teardown) — should also clear the callback. + const { clearSessionQueues } = await import("./queue.js"); + clearSessionQueues([key]); + + // Enqueue after clear: should NOT auto-start a drain (callback is gone). + enqueueFollowupRun(key, createRun({ prompt: "after-clear" }), settings); + + // Yield a few ticks; no drain should fire. + await new Promise((resolve) => setImmediate(resolve)); + + // Only the first message was processed; the post-clear one is still pending. + expect(calls).toHaveLength(1); + expect(calls[0]?.prompt).toBe("before-clear"); + }); +}); + const emptyCfg = {} as OpenClawConfig; describe("createReplyDispatcher", () => {