From 43d4be902755c970b3d15608679761877718da69 Mon Sep 17 00:00:00 2001 From: Agustin Rivera <31522568+eleqtrizit@users.noreply.github.com> Date: Mon, 13 Apr 2026 12:35:39 -0700 Subject: [PATCH] fix(queue): split collect batches by auth context (#66024) * fix(queue): split collect batches by auth context Co-authored-by: zsx * fix(queue): keep overflow summary on splits * fix(queue): preserve grouped collect retry semantics * fix(queue): drop USER.md from pr * fix(queue): keep overflow summary in first auth group * fix(queue): clear overflow summary state after first auth group * fix(queue): narrow auth split key * fix(queue): flush collect summary-only drains * changelog: note collect-mode auth-context batch split (#66024) --------- Co-authored-by: zsx Co-authored-by: Devin Robison --- CHANGELOG.md | 1 + src/auto-reply/reply/queue.collect.test.ts | 747 +++++++++++++++++++++ src/auto-reply/reply/queue/drain.ts | 124 +++- 3 files changed, 853 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 493b0062bc0..f4f9c07d20f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai - Cron/scheduler: stop inventing short retries when cron next-run calculation returns no valid future slot, and keep a maintenance wake armed so enabled unscheduled jobs recover without entering a refire loop. (#66019, #66083) Thanks @mbelinky. - Cron/scheduler: preserve the active error-backoff floor when maintenance repair recomputes a missing cron next-run, so recurring errored jobs do not resume early after a transient next-run resolution failure. (#66019, #66083, #66113) Thanks @mbelinky. - Outbound/delivery-queue: persist the originating outbound `session` context on queued delivery entries and replay it during recovery, so write-ahead-queued sends keep their original outbound media policy context after restart instead of evaluating against a missing session. (#66025) Thanks @eleqtrizit. +- Auto-reply/queue: split collect-mode followup drains into contiguous groups by per-message authorization context (sender id, owner status, exec/bash-elevated overrides), so queued items from different senders or exec configs no longer execute under the last queued run's owner-only and exec-approval context. (#66024) Thanks @eleqtrizit. ## 2026.4.12 diff --git a/src/auto-reply/reply/queue.collect.test.ts b/src/auto-reply/reply/queue.collect.test.ts index dd865fba3a2..2fb56721f1e 100644 --- a/src/auto-reply/reply/queue.collect.test.ts +++ b/src/auto-reply/reply/queue.collect.test.ts @@ -6,6 +6,7 @@ import { createQueueTestRun as createRun, installQueueRuntimeErrorSilencer, } from "./queue.test-helpers.js"; +import { resolveFollowupAuthorizationKey } from "./queue/drain.js"; installQueueRuntimeErrorSilencer(); @@ -94,6 +95,440 @@ describe("followup queue collect routing", () => { expect(calls[0]?.originatingTo).toBe("channel:A"); }); + it("splits collect batches when sender authorization changes", async () => { + const key = `test-collect-auth-split-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const expectedCalls = 2; + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + if (calls.length >= expectedCalls) { + done.resolve(); + } + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const nonOwner = createRun({ + prompt: "use the gateway tool", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + enqueueFollowupRun( + key, + { + ...nonOwner, + run: { + ...nonOwner.run, + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + const owner = createRun({ + prompt: "what's the weather?", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + enqueueFollowupRun( + key, + { + ...owner, + run: { + ...owner.run, + senderId: "owner-1", + senderName: "Owner", + senderIsOwner: true, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls.map((call) => call.run.senderIsOwner)).toEqual([false, true]); + expect(calls[0]?.prompt).toContain("use the gateway tool"); + expect(calls[0]?.prompt).not.toContain("what's the weather?"); + expect(calls[1]?.prompt).toContain("what's the weather?"); + expect(calls[1]?.prompt).toContain("(from Owner)"); + }); + + it("keeps one collect batch when authorization context matches", async () => { + const key = `test-collect-auth-match-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = createRun({ + prompt: "first", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + const second = createRun({ + prompt: "second", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + + enqueueFollowupRun( + key, + { + ...first, + run: { + ...first.run, + senderId: "user-1", + senderName: "Guest", + senderUsername: "guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...second, + run: { + ...second.run, + senderId: "user-1", + senderName: "Guest", + senderUsername: "guest", + senderIsOwner: false, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls).toHaveLength(1); + expect(calls[0]?.prompt).toContain("first"); + expect(calls[0]?.prompt).toContain("second"); + expect(calls[0]?.prompt).toContain("(from Guest)"); + }); + + it("keeps one collect batch when only sender display fields drift", async () => { + const key = `test-collect-auth-display-drift-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = createRun({ + prompt: "first", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + const second = createRun({ + prompt: "second", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + + enqueueFollowupRun( + key, + { + ...first, + run: { + ...first.run, + senderId: "user-1", + senderName: "Guest", + senderUsername: "guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...second, + run: { + ...second.run, + senderId: "user-1", + senderName: "Guest User", + senderUsername: "guest-renamed", + senderIsOwner: false, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls).toHaveLength(1); + expect(calls[0]?.prompt).toContain("first"); + expect(calls[0]?.prompt).toContain("second"); + expect(calls[0]?.prompt).toContain("(from Guest)"); + expect(calls[0]?.prompt).toContain("(from Guest User)"); + }); + + it("splits collect batches when exec context changes", async () => { + const key = `test-collect-exec-split-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const expectedCalls = 2; + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + if (calls.length >= expectedCalls) { + done.resolve(); + } + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const base = createRun({ + prompt: "first", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + + enqueueFollowupRun( + key, + { + ...base, + run: { + ...base.run, + senderId: "owner-1", + senderIsOwner: true, + bashElevated: { enabled: false, allowed: true, defaultLevel: "off" }, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...createRun({ + prompt: "second", + originatingChannel: "slack", + originatingTo: "channel:A", + }), + run: { + ...base.run, + senderId: "owner-1", + senderIsOwner: true, + bashElevated: { enabled: true, allowed: true, defaultLevel: "on" }, + execOverrides: { ask: "always" }, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls[0]?.prompt).toContain("first"); + expect(calls[0]?.prompt).not.toContain("second"); + expect(calls[1]?.prompt).toContain("second"); + expect(calls[1]?.run.bashElevated?.enabled).toBe(true); + expect(calls[1]?.run.execOverrides?.ask).toBe("always"); + }); + + it("uses the newest run within a matching authorization batch", async () => { + const key = `test-collect-latest-run-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + done.resolve(); + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = createRun({ prompt: "first", originatingChannel: "slack", originatingTo: "A" }); + const second = createRun({ + prompt: "second", + originatingChannel: "slack", + originatingTo: "A", + }); + + enqueueFollowupRun( + key, + { + ...first, + run: { + ...first.run, + provider: "openai", + model: "gpt-5.4", + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...second, + run: { + ...second.run, + provider: "anthropic", + model: "sonnet-4.6", + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls).toHaveLength(1); + expect(calls[0]?.run.provider).toBe("anthropic"); + expect(calls[0]?.run.model).toBe("sonnet-4.6"); + }); + + it("delivers and clears summary-only collect drains after cross-channel items", async () => { + const key = `test-collect-summary-only-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const expectedCalls = 3; + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + if (calls.length >= expectedCalls) { + done.resolve(); + } + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 2, + dropPolicy: "summarize", + }; + + enqueueFollowupRun( + key, + createRun({ + prompt: "first", + originatingChannel: "slack", + originatingTo: "channel:A", + }), + settings, + ); + enqueueFollowupRun( + key, + createRun({ + prompt: "second", + originatingChannel: "slack", + originatingTo: "channel:B", + }), + settings, + ); + enqueueFollowupRun( + key, + createRun({ + prompt: "third", + originatingChannel: "slack", + originatingTo: "channel:C", + }), + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls).toHaveLength(3); + expect(calls[0]?.prompt).toBe("second"); + expect(calls[1]?.prompt).toBe("third"); + expect(calls[2]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap."); + expect(calls[2]?.prompt).toContain("- first"); + }); + + it("preserves collect order when authorization changes more than once", async () => { + const key = `test-collect-auth-order-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const expectedCalls = 3; + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + if (calls.length >= expectedCalls) { + done.resolve(); + } + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const first = createRun({ prompt: "first", originatingChannel: "slack", originatingTo: "A" }); + const second = createRun({ prompt: "second", originatingChannel: "slack", originatingTo: "A" }); + const third = createRun({ prompt: "third", originatingChannel: "slack", originatingTo: "A" }); + + enqueueFollowupRun( + key, + { + ...first, + run: { ...first.run, senderId: "user-a", senderName: "A", senderIsOwner: false }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...second, + run: { ...second.run, senderId: "owner-1", senderName: "Owner", senderIsOwner: true }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...third, + run: { ...third.run, senderId: "user-a", senderName: "A", senderIsOwner: false }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls.map((call) => call.prompt)).toEqual([ + expect.stringContaining("first"), + expect.stringContaining("second"), + expect.stringContaining("third"), + ]); + }); + it("collects Slack messages in same thread and preserves string thread id", async () => { const key = `test-collect-slack-thread-same-${Date.now()}`; const calls: FollowupRun[] = []; @@ -212,6 +647,83 @@ describe("followup queue collect routing", () => { expect(calls[0]?.prompt).toContain("Queued #2\ntwo"); }); + it("retries only the remaining collect auth groups after a partial failure", async () => { + const key = `test-collect-partial-retry-${Date.now()}`; + const attempts: FollowupRun[] = []; + const successfulCalls: FollowupRun[] = []; + const done = createDeferred(); + let attempt = 0; + const runFollowup = async (run: FollowupRun) => { + attempt += 1; + attempts.push(run); + if (attempt === 2) { + throw new Error("transient failure"); + } + successfulCalls.push(run); + if (attempt >= 3) { + done.resolve(); + } + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 50, + dropPolicy: "summarize", + }; + + const guest = createRun({ + prompt: "guest message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + const owner = createRun({ + prompt: "owner message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + + enqueueFollowupRun( + key, + { + ...guest, + run: { + ...guest.run, + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...owner, + run: { + ...owner.run, + senderId: "owner-1", + senderName: "Owner", + senderIsOwner: true, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + const guestAttempts = attempts.filter((call) => call.prompt.includes("guest message")); + const ownerAttempts = attempts.filter((call) => call.prompt.includes("owner message")); + + expect(attempts).toHaveLength(3); + expect(guestAttempts).toHaveLength(1); + expect(ownerAttempts).toHaveLength(2); + expect(successfulCalls.map((call) => call.prompt)).toEqual([ + expect.stringContaining("guest message"), + expect.stringContaining("owner message"), + ]); + }); + it("retries overflow summary delivery without losing dropped previews", async () => { const key = `test-overflow-summary-retry-${Date.now()}`; const calls: FollowupRun[] = []; @@ -241,6 +753,183 @@ describe("followup queue collect routing", () => { expect(calls[0]?.prompt).toContain("- first"); }); + it("includes the overflow summary only in the first split auth group", async () => { + const key = `test-collect-overflow-summary-once-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + const expectedCalls = 2; + const runFollowup = async (run: FollowupRun) => { + calls.push(run); + if (calls.length >= expectedCalls) { + done.resolve(); + } + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 2, + dropPolicy: "summarize", + }; + + const droppedGuest = createRun({ + prompt: "dropped guest message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + const guest = createRun({ + prompt: "guest message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + const owner = createRun({ + prompt: "owner message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + + enqueueFollowupRun( + key, + { + ...droppedGuest, + run: { + ...droppedGuest.run, + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...guest, + run: { + ...guest.run, + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...owner, + run: { + ...owner.run, + senderId: "owner-1", + senderName: "Owner", + senderIsOwner: true, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls).toHaveLength(2); + expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap."); + expect(calls[0]?.prompt).toContain("- dropped guest message"); + expect(calls[1]?.prompt).not.toContain("[Queue overflow]"); + expect(calls[1]?.prompt).not.toContain("dropped guest message"); + }); + + it("does not re-deliver overflow summary on partial auth group failure retry", async () => { + const key = `test-collect-overflow-partial-retry-${Date.now()}`; + const calls: FollowupRun[] = []; + const done = createDeferred(); + let attempt = 0; + const runFollowup = async (run: FollowupRun) => { + attempt += 1; + // First group succeeds (attempt 1), second group fails (attempt 2), + // then second group succeeds on retry (attempt 3). + if (attempt === 2) { + throw new Error("transient failure"); + } + calls.push(run); + if (calls.length >= 2) { + done.resolve(); + } + }; + const settings: QueueSettings = { + mode: "collect", + debounceMs: 0, + cap: 2, + dropPolicy: "summarize", + }; + + const droppedGuest = createRun({ + prompt: "dropped guest message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + const guest = createRun({ + prompt: "guest message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + const owner = createRun({ + prompt: "owner message", + originatingChannel: "slack", + originatingTo: "channel:A", + }); + + enqueueFollowupRun( + key, + { + ...droppedGuest, + run: { + ...droppedGuest.run, + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...guest, + run: { + ...guest.run, + senderId: "user-1", + senderName: "Guest", + senderIsOwner: false, + }, + }, + settings, + ); + enqueueFollowupRun( + key, + { + ...owner, + run: { + ...owner.run, + senderId: "owner-1", + senderName: "Owner", + senderIsOwner: true, + }, + }, + settings, + ); + + scheduleFollowupDrain(key, runFollowup); + await done.promise; + + expect(calls).toHaveLength(2); + // First group got the overflow summary + expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap."); + expect(calls[0]?.prompt).toContain("- dropped guest message"); + // Second group (retried after failure) must NOT get the overflow summary again + expect(calls[1]?.prompt).not.toContain("[Queue overflow]"); + expect(calls[1]?.prompt).not.toContain("dropped guest message"); + expect(calls[1]?.prompt).toContain("owner message"); + }); + it("preserves routing metadata on overflow summary followups", async () => { const key = `test-overflow-summary-routing-${Date.now()}`; const calls: FollowupRun[] = []; @@ -289,3 +978,61 @@ describe("followup queue collect routing", () => { expect(calls[0]?.prompt).toContain("[Queue overflow] Dropped 1 message due to cap."); }); }); + +describe("resolveFollowupAuthorizationKey", () => { + it("changes when sender ownership changes", () => { + const run = createRun({ prompt: "one" }).run; + expect( + resolveFollowupAuthorizationKey({ + ...run, + senderId: "user-1", + senderIsOwner: false, + }), + ).not.toBe( + resolveFollowupAuthorizationKey({ + ...run, + senderId: "user-1", + senderIsOwner: true, + }), + ); + }); + + it("changes when exec defaults change", () => { + const run = createRun({ prompt: "one" }).run; + expect( + resolveFollowupAuthorizationKey({ + ...run, + senderId: "user-1", + bashElevated: { enabled: false, allowed: true, defaultLevel: "off" }, + }), + ).not.toBe( + resolveFollowupAuthorizationKey({ + ...run, + senderId: "user-1", + bashElevated: { enabled: true, allowed: true, defaultLevel: "on" }, + execOverrides: { ask: "always" }, + }), + ); + }); + + it("does not change when only sender display fields change", () => { + const run = createRun({ prompt: "one" }).run; + expect( + resolveFollowupAuthorizationKey({ + ...run, + senderId: "user-1", + senderName: "Guest", + senderUsername: "guest", + senderIsOwner: false, + }), + ).toBe( + resolveFollowupAuthorizationKey({ + ...run, + senderId: "user-1", + senderName: "Guest User", + senderUsername: "guest-renamed", + senderIsOwner: false, + }), + ); + }); +}); diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index 491062de4fb..30584259040 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -59,6 +59,63 @@ function resolveOriginRoutingMetadata(items: FollowupRun[]): OriginRoutingMetada }; } +// Keep this key aligned with the fields that affect per-message authorization or +// exec-context propagation in collect-mode batching. Display-only sender fields +// stay out of the key so profile/name drift does not force conservative splits. +// Fields like authProfileId, elevatedLevel, ownerNumbers, and config are +// intentionally excluded because they are session-level or not consulted in +// per-message authorization checks. +export function resolveFollowupAuthorizationKey(run: FollowupRun["run"]): string { + return JSON.stringify([ + run.senderId ?? "", + run.senderE164 ?? "", + run.senderIsOwner === true, + run.execOverrides?.host ?? "", + run.execOverrides?.security ?? "", + run.execOverrides?.ask ?? "", + run.execOverrides?.node ?? "", + run.bashElevated?.enabled === true, + run.bashElevated?.allowed === true, + run.bashElevated?.defaultLevel ?? "", + ]); +} + +function splitCollectItemsByAuthorization(items: FollowupRun[]): FollowupRun[][] { + if (items.length <= 1) { + return items.length === 0 ? [] : [items]; + } + + const groups: FollowupRun[][] = []; + let currentGroup: FollowupRun[] = []; + let currentKey: string | undefined; + + for (const item of items) { + const itemKey = resolveFollowupAuthorizationKey(item.run); + if (currentGroup.length === 0 || itemKey === currentKey) { + currentGroup.push(item); + currentKey = itemKey; + continue; + } + + groups.push(currentGroup); + currentGroup = [item]; + currentKey = itemKey; + } + + if (currentGroup.length > 0) { + groups.push(currentGroup); + } + + return groups; +} + +function renderCollectItem(item: FollowupRun, idx: number): string { + const senderLabel = + item.run.senderName ?? item.run.senderUsername ?? item.run.senderId ?? item.run.senderE164; + const senderSuffix = senderLabel ? ` (from ${senderLabel})` : ""; + return `---\nQueued #${idx + 1}${senderSuffix}\n${item.prompt}`.trim(); +} + function resolveCrossChannelKey(item: FollowupRun): { cross?: true; key?: string } { const { originatingChannel: channel, originatingTo: to, originatingAccountId: accountId } = item; const threadId = item.originatingThreadId; @@ -108,6 +165,17 @@ export function scheduleFollowupDrain( run: effectiveRunFollowup, }); if (collectDrainResult === "empty") { + const summaryOnlyPrompt = previewQueueSummaryPrompt({ state: queue, noun: "message" }); + const run = queue.lastRun; + if (summaryOnlyPrompt && run) { + await effectiveRunFollowup({ + prompt: summaryOnlyPrompt, + run, + enqueuedAt: Date.now(), + }); + clearQueueSummaryState(queue); + continue; + } break; } if (collectDrainResult === "drained") { @@ -116,28 +184,46 @@ export function scheduleFollowupDrain( const items = queue.items.slice(); const summary = previewQueueSummaryPrompt({ state: queue, noun: "message" }); - const run = items.at(-1)?.run ?? queue.lastRun; - if (!run) { - break; + const authGroups = splitCollectItemsByAuthorization(items); + if (authGroups.length === 0) { + const run = queue.lastRun; + if (!summary || !run) { + break; + } + await effectiveRunFollowup({ + prompt: summary, + run, + enqueuedAt: Date.now(), + }); + clearQueueSummaryState(queue); + continue; } - const routing = resolveOriginRoutingMetadata(items); + let pendingSummary = summary; + for (const groupItems of authGroups) { + const run = groupItems.at(-1)?.run ?? queue.lastRun; + if (!run) { + break; + } - const prompt = buildCollectPrompt({ - title: "[Queued messages while agent was busy]", - items, - summary, - renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(), - }); - await effectiveRunFollowup({ - prompt, - run, - enqueuedAt: Date.now(), - ...routing, - }); - queue.items.splice(0, items.length); - if (summary) { - clearQueueSummaryState(queue); + const routing = resolveOriginRoutingMetadata(groupItems); + const prompt = buildCollectPrompt({ + title: "[Queued messages while agent was busy]", + items: groupItems, + summary: pendingSummary, + renderItem: renderCollectItem, + }); + await effectiveRunFollowup({ + prompt, + run, + enqueuedAt: Date.now(), + ...routing, + }); + queue.items.splice(0, groupItems.length); + if (pendingSummary) { + clearQueueSummaryState(queue); + pendingSummary = undefined; + } } continue; }