diff --git a/src/agents/subagent-announce-queue.test.ts b/src/agents/subagent-announce-queue.test.ts index 2ed26841ef9..7f73f8343cb 100644 --- a/src/agents/subagent-announce-queue.test.ts +++ b/src/agents/subagent-announce-queue.test.ts @@ -29,6 +29,14 @@ function createRetryingSend() { return { send, prompts, waitForSecondAttempt }; } +function createCollectSendRecorder() { + const calls: AnnounceQueueItem[] = []; + const send = vi.fn(async (item: AnnounceQueueItem) => { + calls.push(item); + }); + return { calls, send }; +} + describe("subagent-announce-queue", () => { afterEach(() => { vi.useRealTimers(); @@ -122,6 +130,82 @@ describe("subagent-announce-queue", () => { expect(sender.prompts[1]).toContain("queued item two"); }); + it("splits collect-mode batches when target authorization context changes", async () => { + const sender = createCollectSendRecorder(); + const settings = { mode: "collect", debounceMs: 0 } as const; + const origin = { channel: "slack", to: "channel:C123", accountId: "acct-1" }; + + enqueueAnnounce({ + key: "announce:test:collect-auth-split", + item: { + prompt: "first child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:slack:thread:a", + origin, + }, + settings, + send: sender.send, + }); + enqueueAnnounce({ + key: "announce:test:collect-auth-split", + item: { + prompt: "second child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:slack:thread:b", + origin, + }, + settings, + send: sender.send, + }); + + await vi.waitFor(() => { + expect(sender.send).toHaveBeenCalledTimes(2); + }); + expect(sender.calls.map((call) => call.sessionKey)).toEqual([ + "agent:main:slack:thread:a", + "agent:main:slack:thread:b", + ]); + expect(sender.calls[0]?.prompt).toContain("first child completed"); + expect(sender.calls[0]?.prompt).not.toContain("second child completed"); + expect(sender.calls[1]?.prompt).toContain("second child completed"); + }); + + it("keeps one collect-mode batch when target authorization context matches", async () => { + const sender = createCollectSendRecorder(); + const settings = { mode: "collect", debounceMs: 0 } as const; + const origin = { channel: "slack", to: "channel:C123", accountId: "acct-1" }; + + enqueueAnnounce({ + key: "announce:test:collect-auth-match", + item: { + prompt: "first child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:slack:thread:a", + origin, + }, + settings, + send: sender.send, + }); + enqueueAnnounce({ + key: "announce:test:collect-auth-match", + item: { + prompt: "second child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:slack:thread:a", + origin, + }, + settings, + send: sender.send, + }); + + await vi.waitFor(() => { + expect(sender.send).toHaveBeenCalledTimes(1); + }); + expect(sender.calls[0]?.sessionKey).toBe("agent:main:slack:thread:a"); + expect(sender.calls[0]?.prompt).toContain("first child completed"); + expect(sender.calls[0]?.prompt).toContain("second child completed"); + }); + it("waits until a busy parent session becomes idle before draining", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 4d0d1d19d61..a1b5b653749 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -111,6 +111,39 @@ function getAnnounceQueue( return created; } +function resolveAnnounceAuthorizationKey(item: AnnounceQueueItem): string { + return JSON.stringify([item.sessionKey, item.originKey ?? ""]); +} + +function splitCollectItemsByAuthorization(items: AnnounceQueueItem[]): AnnounceQueueItem[][] { + if (items.length <= 1) { + return items.length === 0 ? [] : [items]; + } + + const groups: AnnounceQueueItem[][] = []; + let currentGroup: AnnounceQueueItem[] = []; + let currentKey: string | undefined; + + for (const item of items) { + const itemKey = resolveAnnounceAuthorizationKey(item); + if (currentGroup.length === 0 || itemKey === currentKey) { + currentGroup.push(item); + currentKey = itemKey; + continue; + } + + groups.push(currentGroup); + currentGroup = [item]; + currentKey = itemKey; + } + + if (currentGroup.length > 0) { + groups.push(currentGroup); + } + + return groups; +} + function hasAnnounceCrossChannelItems(items: AnnounceQueueItem[]): boolean { return hasCrossChannelItems(items, (item) => { if (!item.origin) { @@ -171,25 +204,34 @@ function scheduleAnnounceDrain(key: string) { } const items = queue.items.slice(); const summary = previewQueueSummaryPrompt({ state: queue, noun: "announce" }); - const prompt = buildCollectPrompt({ - title: "[Queued announce messages while agent was busy]", - items, - summary, - renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(), - }); - const internalEvents = items.flatMap((item) => item.internalEvents ?? []); - const last = items.at(-1); - if (!last) { + const authGroups = splitCollectItemsByAuthorization(items); + if (authGroups.length === 0) { break; } - await queue.send({ - ...last, - prompt, - internalEvents: internalEvents.length > 0 ? internalEvents : last.internalEvents, - }); - queue.items.splice(0, items.length); - if (summary) { - clearQueueSummaryState(queue); + + let pendingSummary = summary; + for (const groupItems of authGroups) { + const prompt = buildCollectPrompt({ + title: "[Queued announce messages while agent was busy]", + items: groupItems, + summary: pendingSummary, + renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(), + }); + const internalEvents = groupItems.flatMap((item) => item.internalEvents ?? []); + const last = groupItems.at(-1); + if (!last) { + break; + } + await queue.send({ + ...last, + prompt, + internalEvents: internalEvents.length > 0 ? internalEvents : last.internalEvents, + }); + queue.items.splice(0, groupItems.length); + if (pendingSummary) { + clearQueueSummaryState(queue); + pendingSummary = undefined; + } } continue; }