diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 1658ca83c8d..62b51be56f9 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -475,6 +475,7 @@ async function maybeQueueSubagentAnnounce(params: { }, settings: queueSettings, send: sendAnnounce, + shouldDefer: (item) => resolveRequesterSessionActivity(item.sessionKey).isActive, }); return didQueue ? "queued" : "dropped"; } diff --git a/src/agents/subagent-announce-queue.test.ts b/src/agents/subagent-announce-queue.test.ts index b638b2fad3f..2ed26841ef9 100644 --- a/src/agents/subagent-announce-queue.test.ts +++ b/src/agents/subagent-announce-queue.test.ts @@ -1,5 +1,9 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { enqueueAnnounce, resetAnnounceQueuesForTests } from "./subagent-announce-queue.js"; +import { + type AnnounceQueueItem, + enqueueAnnounce, + resetAnnounceQueuesForTests, +} from "./subagent-announce-queue.js"; function createRetryingSend() { const prompts: string[] = []; @@ -118,6 +122,131 @@ describe("subagent-announce-queue", () => { expect(sender.prompts[1]).toContain("queued item two"); }); + it("waits until a busy parent session becomes idle before draining", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + let parentBusy = true; + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:busy-parent", + item: { + prompt: "child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + shouldDefer: () => parentBusy, + }); + + await vi.advanceTimersByTimeAsync(249); + expect(send).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(send).not.toHaveBeenCalled(); + + parentBusy = false; + await vi.advanceTimersByTimeAsync(250); + expect(send).toHaveBeenCalledTimes(1); + expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed"); + }); + + it("preserves an existing defer hook when the same queue is reused without one", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + let parentBusy = true; + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:reuse-keeps-defer", + item: { + prompt: "first child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + shouldDefer: () => parentBusy, + }); + + enqueueAnnounce({ + key: "announce:test:reuse-keeps-defer", + item: { + prompt: "second child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + }); + + await vi.advanceTimersByTimeAsync(250); + expect(send).not.toHaveBeenCalled(); + + parentBusy = false; + await vi.advanceTimersByTimeAsync(250); + expect(send).toHaveBeenCalledTimes(2); + }); + + it("polls deferred items at the configured cadence after the first debounce", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + let parentBusy = true; + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:defer-cadence", + item: { + prompt: "child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 1_000 }, + send, + shouldDefer: () => parentBusy, + }); + + await vi.advanceTimersByTimeAsync(1_000); + expect(send).not.toHaveBeenCalled(); + + parentBusy = false; + await vi.advanceTimersByTimeAsync(999); + expect(send).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(send).toHaveBeenCalledTimes(1); + }); + + it("falls back to delivery when busy-parent deferral exceeds the safety cap", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:busy-parent-timeout", + item: { + prompt: "child completed after stale busy state", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + shouldDefer: () => true, + }); + + await vi.advanceTimersByTimeAsync(14_999); + expect(send).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(send).toHaveBeenCalledTimes(1); + expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed after stale busy state"); + }); + it("uses debounce floor for retries when debounce exceeds backoff", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 556011540d2..4d0d1d19d61 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -50,11 +50,14 @@ type AnnounceQueueState = { droppedCount: number; summaryLines: string[]; send: (item: AnnounceQueueItem) => Promise; + /** Return true while the target parent session is still busy and delivery should wait. */ + shouldDefer?: (item: AnnounceQueueItem) => boolean; /** Consecutive drain failures — drives exponential backoff on errors. */ consecutiveFailures: number; }; const ANNOUNCE_QUEUES = new Map(); +const MAX_DEFER_WHILE_BUSY_MS = 15_000; export function resetAnnounceQueuesForTests() { // Test isolation: other suites may leave a draining queue behind in the worker. @@ -72,6 +75,7 @@ function getAnnounceQueue( key: string, settings: AnnounceQueueSettings, send: (item: AnnounceQueueItem) => Promise, + shouldDefer?: (item: AnnounceQueueItem) => boolean, ) { const existing = ANNOUNCE_QUEUES.get(key); if (existing) { @@ -80,6 +84,9 @@ function getAnnounceQueue( settings, }); existing.send = send; + if (shouldDefer !== undefined) { + existing.shouldDefer = shouldDefer; + } return existing; } const created: AnnounceQueueState = { @@ -93,6 +100,7 @@ function getAnnounceQueue( droppedCount: 0, summaryLines: [], send, + shouldDefer, consecutiveFailures: 0, }; applyQueueRuntimeSettings({ @@ -115,6 +123,20 @@ function hasAnnounceCrossChannelItems(items: AnnounceQueueItem[]): boolean { }); } +function shouldDeferAnnounceQueueItem(queue: AnnounceQueueState, item: AnnounceQueueItem): boolean { + if (!queue.shouldDefer?.(item)) { + return false; + } + return Date.now() - item.enqueuedAt < MAX_DEFER_WHILE_BUSY_MS; +} + +function waitBeforeDeferredAnnounceRetry(queue: AnnounceQueueState): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, Math.max(250, queue.debounceMs)); + timer.unref?.(); + }); +} + function scheduleAnnounceDrain(key: string) { const queue = beginQueueDrain(ANNOUNCE_QUEUES, key); if (!queue) { @@ -128,6 +150,12 @@ function scheduleAnnounceDrain(key: string) { break; } await waitForQueueDebounce(queue); + const nextItem = queue.items[0]; + if (nextItem && shouldDeferAnnounceQueueItem(queue, nextItem)) { + await waitBeforeDeferredAnnounceRetry(queue); + queue.lastEnqueuedAt = Date.now() - queue.debounceMs; + continue; + } if (queue.mode === "collect") { const collectDrainResult = await drainCollectQueueStep({ collectState, @@ -211,8 +239,9 @@ export function enqueueAnnounce(params: { item: AnnounceQueueItem; settings: AnnounceQueueSettings; send: (item: AnnounceQueueItem) => Promise; + shouldDefer?: (item: AnnounceQueueItem) => boolean; }): boolean { - const queue = getAnnounceQueue(params.key, params.settings, params.send); + const queue = getAnnounceQueue(params.key, params.settings, params.send, params.shouldDefer); // Preserve any retry backoff marker already encoded in lastEnqueuedAt. queue.lastEnqueuedAt = Math.max(queue.lastEnqueuedAt, Date.now());