mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-28 01:23:31 +00:00
fix(reply-queue): remove the drained item by reference instead of front index
drainNextQueueItem captured items[0], awaited the run, then shift()-ed index 0 assuming it still held the item it ran. Concurrent inbound messages mutate the same shared items array, and at or over cap applyQueueDropPolicy splices items off the front, so a burst arriving while item[0] is in flight can shift a different, still-undelivered survivor into index 0. shift() then deletes that survivor: it is never run and is not counted in the overflow summary, so the agent silently ignores a message it should have answered. Remove the item that actually ran by identity via a new removeQueuedItemsByRef helper, and apply the same reference-based removal to the collect path in drain.ts, which had the same positional splice(0, groupItems.length) assumption after an awaited group run.
This commit is contained in:
@@ -9,6 +9,7 @@ import {
|
||||
drainCollectQueueStep,
|
||||
drainNextQueueItem,
|
||||
hasCrossChannelItems,
|
||||
removeQueuedItemsByRef,
|
||||
previewQueueSummaryPrompt,
|
||||
waitForQueueDebounce,
|
||||
} from "../../../utils/queue-helpers.js";
|
||||
@@ -497,7 +498,7 @@ export function scheduleFollowupDrain(
|
||||
} else {
|
||||
await drainGroup();
|
||||
}
|
||||
queue.items.splice(0, groupItems.length);
|
||||
removeQueuedItemsByRef(queue.items, groupItems);
|
||||
if (pendingSummary) {
|
||||
clearFollowupQueueSummaryState(queue);
|
||||
pendingSummary = undefined;
|
||||
|
||||
@@ -166,6 +166,15 @@ export function beginQueueDrain<T extends { draining: boolean }>(
|
||||
return queue;
|
||||
}
|
||||
|
||||
export function removeQueuedItemsByRef<T>(items: T[], processed: readonly T[]): void {
|
||||
for (const item of processed) {
|
||||
const idx = items.indexOf(item);
|
||||
if (idx !== -1) {
|
||||
items.splice(idx, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Run and remove the next queued item, returning false when empty. */
|
||||
export async function drainNextQueueItem<T>(
|
||||
items: T[],
|
||||
@@ -176,7 +185,7 @@ export async function drainNextQueueItem<T>(
|
||||
return false;
|
||||
}
|
||||
await run(next);
|
||||
items.shift();
|
||||
removeQueuedItemsByRef(items, [next]);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user