fix: group collect queue deliveries

This commit is contained in:
jesse-merhi
2026-04-29 13:09:20 +10:00
parent 2b8c20c8a3
commit 0015d34fda
2 changed files with 143 additions and 17 deletions

View File

@@ -29,6 +29,14 @@ function createRetryingSend() {
return { send, prompts, waitForSecondAttempt };
}
function createCollectSendRecorder() {
const calls: AnnounceQueueItem[] = [];
const send = vi.fn(async (item: AnnounceQueueItem) => {
calls.push(item);
});
return { calls, send };
}
describe("subagent-announce-queue", () => {
afterEach(() => {
vi.useRealTimers();
@@ -122,6 +130,82 @@ describe("subagent-announce-queue", () => {
expect(sender.prompts[1]).toContain("queued item two");
});
it("splits collect-mode batches when target authorization context changes", async () => {
const sender = createCollectSendRecorder();
const settings = { mode: "collect", debounceMs: 0 } as const;
const origin = { channel: "slack", to: "channel:C123", accountId: "acct-1" };
enqueueAnnounce({
key: "announce:test:collect-auth-split",
item: {
prompt: "first child completed",
enqueuedAt: Date.now(),
sessionKey: "agent:main:slack:thread:a",
origin,
},
settings,
send: sender.send,
});
enqueueAnnounce({
key: "announce:test:collect-auth-split",
item: {
prompt: "second child completed",
enqueuedAt: Date.now(),
sessionKey: "agent:main:slack:thread:b",
origin,
},
settings,
send: sender.send,
});
await vi.waitFor(() => {
expect(sender.send).toHaveBeenCalledTimes(2);
});
expect(sender.calls.map((call) => call.sessionKey)).toEqual([
"agent:main:slack:thread:a",
"agent:main:slack:thread:b",
]);
expect(sender.calls[0]?.prompt).toContain("first child completed");
expect(sender.calls[0]?.prompt).not.toContain("second child completed");
expect(sender.calls[1]?.prompt).toContain("second child completed");
});
it("keeps one collect-mode batch when target authorization context matches", async () => {
const sender = createCollectSendRecorder();
const settings = { mode: "collect", debounceMs: 0 } as const;
const origin = { channel: "slack", to: "channel:C123", accountId: "acct-1" };
enqueueAnnounce({
key: "announce:test:collect-auth-match",
item: {
prompt: "first child completed",
enqueuedAt: Date.now(),
sessionKey: "agent:main:slack:thread:a",
origin,
},
settings,
send: sender.send,
});
enqueueAnnounce({
key: "announce:test:collect-auth-match",
item: {
prompt: "second child completed",
enqueuedAt: Date.now(),
sessionKey: "agent:main:slack:thread:a",
origin,
},
settings,
send: sender.send,
});
await vi.waitFor(() => {
expect(sender.send).toHaveBeenCalledTimes(1);
});
expect(sender.calls[0]?.sessionKey).toBe("agent:main:slack:thread:a");
expect(sender.calls[0]?.prompt).toContain("first child completed");
expect(sender.calls[0]?.prompt).toContain("second child completed");
});
it("waits until a busy parent session becomes idle before draining", async () => {
vi.useFakeTimers();
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));

View File

@@ -111,6 +111,39 @@ function getAnnounceQueue(
return created;
}
function resolveAnnounceAuthorizationKey(item: AnnounceQueueItem): string {
return JSON.stringify([item.sessionKey, item.originKey ?? ""]);
}
function splitCollectItemsByAuthorization(items: AnnounceQueueItem[]): AnnounceQueueItem[][] {
if (items.length <= 1) {
return items.length === 0 ? [] : [items];
}
const groups: AnnounceQueueItem[][] = [];
let currentGroup: AnnounceQueueItem[] = [];
let currentKey: string | undefined;
for (const item of items) {
const itemKey = resolveAnnounceAuthorizationKey(item);
if (currentGroup.length === 0 || itemKey === currentKey) {
currentGroup.push(item);
currentKey = itemKey;
continue;
}
groups.push(currentGroup);
currentGroup = [item];
currentKey = itemKey;
}
if (currentGroup.length > 0) {
groups.push(currentGroup);
}
return groups;
}
function hasAnnounceCrossChannelItems(items: AnnounceQueueItem[]): boolean {
return hasCrossChannelItems(items, (item) => {
if (!item.origin) {
@@ -171,25 +204,34 @@ function scheduleAnnounceDrain(key: string) {
}
const items = queue.items.slice();
const summary = previewQueueSummaryPrompt({ state: queue, noun: "announce" });
const prompt = buildCollectPrompt({
title: "[Queued announce messages while agent was busy]",
items,
summary,
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
});
const internalEvents = items.flatMap((item) => item.internalEvents ?? []);
const last = items.at(-1);
if (!last) {
const authGroups = splitCollectItemsByAuthorization(items);
if (authGroups.length === 0) {
break;
}
await queue.send({
...last,
prompt,
internalEvents: internalEvents.length > 0 ? internalEvents : last.internalEvents,
});
queue.items.splice(0, items.length);
if (summary) {
clearQueueSummaryState(queue);
let pendingSummary = summary;
for (const groupItems of authGroups) {
const prompt = buildCollectPrompt({
title: "[Queued announce messages while agent was busy]",
items: groupItems,
summary: pendingSummary,
renderItem: (item, idx) => `---\nQueued #${idx + 1}\n${item.prompt}`.trim(),
});
const internalEvents = groupItems.flatMap((item) => item.internalEvents ?? []);
const last = groupItems.at(-1);
if (!last) {
break;
}
await queue.send({
...last,
prompt,
internalEvents: internalEvents.length > 0 ? internalEvents : last.internalEvents,
});
queue.items.splice(0, groupItems.length);
if (pendingSummary) {
clearQueueSummaryState(queue);
pendingSummary = undefined;
}
}
continue;
}