From 51dbc2c60fc0fed7b9372dd752f9d8cf2f2e868f Mon Sep 17 00:00:00 2001 From: yetval Date: Mon, 8 Jun 2026 14:58:05 +0000 Subject: [PATCH] fix(reply-queue): remove the drained item by reference instead of front index drainNextQueueItem captured items[0], awaited the run, then shift()-ed index 0 assuming it still held the item it ran. Concurrent inbound messages mutate the same shared items array, and at or over cap applyQueueDropPolicy splices items off the front, so a burst arriving while item[0] is in flight can shift a different, still-undelivered survivor into index 0. shift() then deletes that survivor: it is never run and is not counted in the overflow summary, so the agent silently ignores a message it should have answered. Remove the item that actually ran by identity via a new removeQueuedItemsByRef helper, and apply the same reference-based removal to the collect path in drain.ts, which had the same positional splice(0, groupItems.length) assumption after an awaited group run. --- src/auto-reply/reply/queue/drain.ts | 3 ++- src/utils/queue-helpers.ts | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index cabba31273b..e6cfeee2de7 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -9,6 +9,7 @@ import { drainCollectQueueStep, drainNextQueueItem, hasCrossChannelItems, + removeQueuedItemsByRef, previewQueueSummaryPrompt, waitForQueueDebounce, } from "../../../utils/queue-helpers.js"; @@ -497,7 +498,7 @@ export function scheduleFollowupDrain( } else { await drainGroup(); } - queue.items.splice(0, groupItems.length); + removeQueuedItemsByRef(queue.items, groupItems); if (pendingSummary) { clearFollowupQueueSummaryState(queue); pendingSummary = undefined; diff --git a/src/utils/queue-helpers.ts b/src/utils/queue-helpers.ts index 89b9f2c7c7a..b1933d8b927 100644 --- a/src/utils/queue-helpers.ts +++ b/src/utils/queue-helpers.ts @@ -166,6 +166,15 @@ export function beginQueueDrain( return queue; } +export function removeQueuedItemsByRef(items: T[], processed: readonly T[]): void { + for (const item of processed) { + const idx = items.indexOf(item); + if (idx !== -1) { + items.splice(idx, 1); + } + } +} + /** Run and remove the next queued item, returning false when empty. */ export async function drainNextQueueItem( items: T[], @@ -176,7 +185,7 @@ export async function drainNextQueueItem( return false; } await run(next); - items.shift(); + removeQueuedItemsByRef(items, [next]); return true; }