From 1841dd9977cb63780a9d133053edcc911d14ddd7 Mon Sep 17 00:00:00 2001 From: Evgeniy <68610242+Udjin79@users.noreply.github.com> Date: Sun, 26 Apr 2026 00:08:50 +0300 Subject: [PATCH] fix(subagent-announce): defer drain while parent session is busy (#71706) When a subagent finishes while its parent main session is still running (executing tools or awaiting model output), the announce queue would follow the configured debounce and immediately attempt to deliver the completion event back into the parent session via callGateway. The gateway treats the parent as busy and the announce can either get buffered until the next external user message or surface only as a delayed echo, breaking the natural sessions_spawn -> sessions_yield workflow where the parent expects the result to arrive as the next turn. This change adds an optional shouldDefer hook on the announce queue state. The delivery layer wires it to the existing requester session activity probe (resolveRequesterSessionActivity), so while the parent session is still active the drain loop sleeps for max(250ms, debounceMs) and re-checks instead of pushing the announce. As soon as the parent goes idle, the queue drains normally. - Plumbs shouldDefer through getAnnounceQueue / enqueueAnnounce. - Skips drain step in scheduleAnnounceDrain when shouldDefer says the target is still busy, with a bounded re-check sleep. - Updates maybeQueueSubagentAnnounce to pass the activity probe. - Adds a unit test that holds drain while parent is busy and resumes when it goes idle. No behavior change for callers that do not pass shouldDefer. --- src/agents/subagent-announce-delivery.ts | 1 + src/agents/subagent-announce-queue.test.ts | 131 ++++++++++++++++++++- src/agents/subagent-announce-queue.ts | 31 ++++- 3 files changed, 161 insertions(+), 2 deletions(-) diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 1658ca83c8d..62b51be56f9 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -475,6 +475,7 @@ async function maybeQueueSubagentAnnounce(params: { }, settings: queueSettings, send: sendAnnounce, + shouldDefer: (item) => resolveRequesterSessionActivity(item.sessionKey).isActive, }); return didQueue ? "queued" : "dropped"; } diff --git a/src/agents/subagent-announce-queue.test.ts b/src/agents/subagent-announce-queue.test.ts index b638b2fad3f..2ed26841ef9 100644 --- a/src/agents/subagent-announce-queue.test.ts +++ b/src/agents/subagent-announce-queue.test.ts @@ -1,5 +1,9 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { enqueueAnnounce, resetAnnounceQueuesForTests } from "./subagent-announce-queue.js"; +import { + type AnnounceQueueItem, + enqueueAnnounce, + resetAnnounceQueuesForTests, +} from "./subagent-announce-queue.js"; function createRetryingSend() { const prompts: string[] = []; @@ -118,6 +122,131 @@ describe("subagent-announce-queue", () => { expect(sender.prompts[1]).toContain("queued item two"); }); + it("waits until a busy parent session becomes idle before draining", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + let parentBusy = true; + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:busy-parent", + item: { + prompt: "child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + shouldDefer: () => parentBusy, + }); + + await vi.advanceTimersByTimeAsync(249); + expect(send).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(send).not.toHaveBeenCalled(); + + parentBusy = false; + await vi.advanceTimersByTimeAsync(250); + expect(send).toHaveBeenCalledTimes(1); + expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed"); + }); + + it("preserves an existing defer hook when the same queue is reused without one", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + let parentBusy = true; + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:reuse-keeps-defer", + item: { + prompt: "first child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + shouldDefer: () => parentBusy, + }); + + enqueueAnnounce({ + key: "announce:test:reuse-keeps-defer", + item: { + prompt: "second child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + }); + + await vi.advanceTimersByTimeAsync(250); + expect(send).not.toHaveBeenCalled(); + + parentBusy = false; + await vi.advanceTimersByTimeAsync(250); + expect(send).toHaveBeenCalledTimes(2); + }); + + it("polls deferred items at the configured cadence after the first debounce", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + let parentBusy = true; + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:defer-cadence", + item: { + prompt: "child completed", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 1_000 }, + send, + shouldDefer: () => parentBusy, + }); + + await vi.advanceTimersByTimeAsync(1_000); + expect(send).not.toHaveBeenCalled(); + + parentBusy = false; + await vi.advanceTimersByTimeAsync(999); + expect(send).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(send).toHaveBeenCalledTimes(1); + }); + + it("falls back to delivery when busy-parent deferral exceeds the safety cap", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + + const send = vi.fn(async (_item: AnnounceQueueItem) => {}); + + enqueueAnnounce({ + key: "announce:test:busy-parent-timeout", + item: { + prompt: "child completed after stale busy state", + enqueuedAt: Date.now(), + sessionKey: "agent:main:telegram:dm:u1", + }, + settings: { mode: "followup", debounceMs: 0 }, + send, + shouldDefer: () => true, + }); + + await vi.advanceTimersByTimeAsync(14_999); + expect(send).not.toHaveBeenCalled(); + + await vi.advanceTimersByTimeAsync(1); + expect(send).toHaveBeenCalledTimes(1); + expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed after stale busy state"); + }); + it("uses debounce floor for retries when debounce exceeds backoff", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index 556011540d2..4d0d1d19d61 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -50,11 +50,14 @@ type AnnounceQueueState = { droppedCount: number; summaryLines: string[]; send: (item: AnnounceQueueItem) => Promise; + /** Return true while the target parent session is still busy and delivery should wait. */ + shouldDefer?: (item: AnnounceQueueItem) => boolean; /** Consecutive drain failures — drives exponential backoff on errors. */ consecutiveFailures: number; }; const ANNOUNCE_QUEUES = new Map(); +const MAX_DEFER_WHILE_BUSY_MS = 15_000; export function resetAnnounceQueuesForTests() { // Test isolation: other suites may leave a draining queue behind in the worker. @@ -72,6 +75,7 @@ function getAnnounceQueue( key: string, settings: AnnounceQueueSettings, send: (item: AnnounceQueueItem) => Promise, + shouldDefer?: (item: AnnounceQueueItem) => boolean, ) { const existing = ANNOUNCE_QUEUES.get(key); if (existing) { @@ -80,6 +84,9 @@ function getAnnounceQueue( settings, }); existing.send = send; + if (shouldDefer !== undefined) { + existing.shouldDefer = shouldDefer; + } return existing; } const created: AnnounceQueueState = { @@ -93,6 +100,7 @@ function getAnnounceQueue( droppedCount: 0, summaryLines: [], send, + shouldDefer, consecutiveFailures: 0, }; applyQueueRuntimeSettings({ @@ -115,6 +123,20 @@ function hasAnnounceCrossChannelItems(items: AnnounceQueueItem[]): boolean { }); } +function shouldDeferAnnounceQueueItem(queue: AnnounceQueueState, item: AnnounceQueueItem): boolean { + if (!queue.shouldDefer?.(item)) { + return false; + } + return Date.now() - item.enqueuedAt < MAX_DEFER_WHILE_BUSY_MS; +} + +function waitBeforeDeferredAnnounceRetry(queue: AnnounceQueueState): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, Math.max(250, queue.debounceMs)); + timer.unref?.(); + }); +} + function scheduleAnnounceDrain(key: string) { const queue = beginQueueDrain(ANNOUNCE_QUEUES, key); if (!queue) { @@ -128,6 +150,12 @@ function scheduleAnnounceDrain(key: string) { break; } await waitForQueueDebounce(queue); + const nextItem = queue.items[0]; + if (nextItem && shouldDeferAnnounceQueueItem(queue, nextItem)) { + await waitBeforeDeferredAnnounceRetry(queue); + queue.lastEnqueuedAt = Date.now() - queue.debounceMs; + continue; + } if (queue.mode === "collect") { const collectDrainResult = await drainCollectQueueStep({ collectState, @@ -211,8 +239,9 @@ export function enqueueAnnounce(params: { item: AnnounceQueueItem; settings: AnnounceQueueSettings; send: (item: AnnounceQueueItem) => Promise; + shouldDefer?: (item: AnnounceQueueItem) => boolean; }): boolean { - const queue = getAnnounceQueue(params.key, params.settings, params.send); + const queue = getAnnounceQueue(params.key, params.settings, params.send, params.shouldDefer); // Preserve any retry backoff marker already encoded in lastEnqueuedAt. queue.lastEnqueuedAt = Math.max(queue.lastEnqueuedAt, Date.now());