fix(queue): restart drain when message enqueued after idle window

After a drain loop empties the queue it deletes the key from
FOLLOWUP_QUEUES.  If a new message arrives at that moment
enqueueFollowupRun creates a fresh queue object with draining:false
but never starts a drain, leaving the message stranded until the
next run completes and calls finalizeWithFollowup.

Fix: persist the most recent runFollowup callback per queue key in
FOLLOWUP_RUN_CALLBACKS (drain.ts).  enqueueFollowupRun now calls
kickFollowupDrainIfIdle after a successful push; if a cached
callback exists and no drain is running it calls scheduleFollowupDrain
to restart immediately.  clearSessionQueues cleans up the callback
cache alongside the queue state.
This commit is contained in:
Jealous
2026-03-02 23:08:23 +08:00
committed by Peter Steinberger
parent c4511df283
commit 60130203e1
4 changed files with 141 additions and 0 deletions

View File

@@ -1,5 +1,6 @@
import { resolveEmbeddedSessionLane } from "../../../agents/pi-embedded.js";
import { clearCommandLane } from "../../../process/command-queue.js";
import { clearFollowupDrainCallback } from "./drain.js";
import { clearFollowupQueue } from "./state.js";
export type ClearSessionQueueResult = {
@@ -22,6 +23,7 @@ export function clearSessionQueues(keys: Array<string | undefined>): ClearSessio
seen.add(cleaned);
clearedKeys.push(cleaned);
followupCleared += clearFollowupQueue(cleaned);
clearFollowupDrainCallback(cleaned);
laneCleared += clearCommandLane(resolveEmbeddedSessionLane(cleaned));
}

View File

@@ -13,6 +13,23 @@ import { isRoutableChannel } from "../route-reply.js";
import { FOLLOWUP_QUEUES } from "./state.js";
import type { FollowupRun } from "./types.js";
// Persists the most recent runFollowup callback per queue key so that
// enqueueFollowupRun can restart a drain that finished and deleted the queue.
const FOLLOWUP_RUN_CALLBACKS = new Map<string, (run: FollowupRun) => Promise<void>>();
export function clearFollowupDrainCallback(key: string): void {
FOLLOWUP_RUN_CALLBACKS.delete(key);
}
/** Restart the drain for `key` if it is currently idle, using the stored callback. */
export function kickFollowupDrainIfIdle(key: string): void {
const cb = FOLLOWUP_RUN_CALLBACKS.get(key);
if (!cb) {
return;
}
scheduleFollowupDrain(key, cb);
}
type OriginRoutingMetadata = Pick<
FollowupRun,
"originatingChannel" | "originatingTo" | "originatingAccountId" | "originatingThreadId"
@@ -50,6 +67,9 @@ export function scheduleFollowupDrain(
key: string,
runFollowup: (run: FollowupRun) => Promise<void>,
): void {
// Cache the callback so enqueueFollowupRun can restart drain after the queue
// has been deleted and recreated (the post-drain idle window race condition).
FOLLOWUP_RUN_CALLBACKS.set(key, runFollowup);
const queue = beginQueueDrain(FOLLOWUP_QUEUES, key);
if (!queue) {
return;

View File

@@ -1,4 +1,5 @@
import { applyQueueDropPolicy, shouldSkipQueueItem } from "../../../utils/queue-helpers.js";
import { kickFollowupDrainIfIdle } from "./drain.js";
import { getExistingFollowupQueue, getFollowupQueue } from "./state.js";
import type { FollowupRun, QueueDedupeMode, QueueSettings } from "./types.js";
@@ -53,6 +54,12 @@ export function enqueueFollowupRun(
}
queue.items.push(run);
// If drain finished and deleted the queue before this item arrived, a new queue
// object was created (draining: false) but nobody scheduled a drain for it.
// Use the cached callback to restart the drain now.
if (!queue.draining) {
kickFollowupDrainIfIdle(key);
}
return true;
}

View File

@@ -1096,6 +1096,118 @@ describe("followup queue collect routing", () => {
});
});
describe("followup queue drain restart after idle window", () => {
it("processes a message enqueued after the drain empties and deletes the queue", async () => {
const key = `test-idle-window-race-${Date.now()}`;
const calls: FollowupRun[] = [];
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
const firstProcessed = createDeferred<void>();
const secondProcessed = createDeferred<void>();
let callCount = 0;
const runFollowup = async (run: FollowupRun) => {
callCount++;
calls.push(run);
if (callCount === 1) {
firstProcessed.resolve();
}
if (callCount === 2) {
secondProcessed.resolve();
}
};
// Enqueue first message and start drain.
enqueueFollowupRun(key, createRun({ prompt: "before-idle" }), settings);
scheduleFollowupDrain(key, runFollowup);
// Wait for the first message to be processed by the drain.
await firstProcessed.promise;
// Yield past the drain's finally block so it can set draining:false and
// delete the queue key from FOLLOWUP_QUEUES (the idle-window boundary).
await new Promise<void>((resolve) => setImmediate(resolve));
// Simulate the race: a new message arrives AFTER the drain finished and
// deleted the queue, but WITHOUT calling scheduleFollowupDrain again.
enqueueFollowupRun(key, createRun({ prompt: "after-idle" }), settings);
// kickFollowupDrainIfIdle should have restarted the drain automatically.
await secondProcessed.promise;
expect(calls).toHaveLength(2);
expect(calls[0]?.prompt).toBe("before-idle");
expect(calls[1]?.prompt).toBe("after-idle");
});
it("does not double-drain when a message arrives while drain is still running", async () => {
const key = `test-no-double-drain-${Date.now()}`;
const calls: FollowupRun[] = [];
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
const allProcessed = createDeferred<void>();
// runFollowup resolves only after both items are enqueued so the second
// item is already in the queue when the first drain step finishes.
let runFollowupResolve!: () => void;
const runFollowupGate = new Promise<void>((res) => {
runFollowupResolve = res;
});
const runFollowup = async (run: FollowupRun) => {
await runFollowupGate;
calls.push(run);
if (calls.length >= 2) {
allProcessed.resolve();
}
};
enqueueFollowupRun(key, createRun({ prompt: "first" }), settings);
scheduleFollowupDrain(key, runFollowup);
// Enqueue second message while the drain is mid-flight (draining:true).
enqueueFollowupRun(key, createRun({ prompt: "second" }), settings);
// Release the gate so both items can drain.
runFollowupResolve();
await allProcessed.promise;
expect(calls).toHaveLength(2);
expect(calls[0]?.prompt).toBe("first");
expect(calls[1]?.prompt).toBe("second");
});
it("does not process messages after clearSessionQueues clears the callback", async () => {
const key = `test-clear-callback-${Date.now()}`;
const calls: FollowupRun[] = [];
const settings: QueueSettings = { mode: "followup", debounceMs: 0, cap: 50 };
const firstProcessed = createDeferred<void>();
const runFollowup = async (run: FollowupRun) => {
calls.push(run);
firstProcessed.resolve();
};
enqueueFollowupRun(key, createRun({ prompt: "before-clear" }), settings);
scheduleFollowupDrain(key, runFollowup);
await firstProcessed.promise;
// Let drain finish and delete the queue.
await new Promise<void>((resolve) => setImmediate(resolve));
// Clear queues (simulates session teardown) — should also clear the callback.
const { clearSessionQueues } = await import("./queue.js");
clearSessionQueues([key]);
// Enqueue after clear: should NOT auto-start a drain (callback is gone).
enqueueFollowupRun(key, createRun({ prompt: "after-clear" }), settings);
// Yield a few ticks; no drain should fire.
await new Promise<void>((resolve) => setImmediate(resolve));
// Only the first message was processed; the post-clear one is still pending.
expect(calls).toHaveLength(1);
expect(calls[0]?.prompt).toBe("before-clear");
});
});
const emptyCfg = {} as OpenClawConfig;
describe("createReplyDispatcher", () => {