mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:10:45 +00:00
fix(subagent-announce): defer drain while parent session is busy (#71706)
When a subagent finishes while its parent main session is still running (executing tools or awaiting model output), the announce queue would follow the configured debounce and immediately attempt to deliver the completion event back into the parent session via callGateway. The gateway treats the parent as busy and the announce can either get buffered until the next external user message or surface only as a delayed echo, breaking the natural sessions_spawn -> sessions_yield workflow where the parent expects the result to arrive as the next turn. This change adds an optional shouldDefer hook on the announce queue state. The delivery layer wires it to the existing requester session activity probe (resolveRequesterSessionActivity), so while the parent session is still active the drain loop sleeps for max(250ms, debounceMs) and re-checks instead of pushing the announce. As soon as the parent goes idle, the queue drains normally. - Plumbs shouldDefer through getAnnounceQueue / enqueueAnnounce. - Skips drain step in scheduleAnnounceDrain when shouldDefer says the target is still busy, with a bounded re-check sleep. - Updates maybeQueueSubagentAnnounce to pass the activity probe. - Adds a unit test that holds drain while parent is busy and resumes when it goes idle. No behavior change for callers that do not pass shouldDefer.
This commit is contained in:
@@ -475,6 +475,7 @@ async function maybeQueueSubagentAnnounce(params: {
|
||||
},
|
||||
settings: queueSettings,
|
||||
send: sendAnnounce,
|
||||
shouldDefer: (item) => resolveRequesterSessionActivity(item.sessionKey).isActive,
|
||||
});
|
||||
return didQueue ? "queued" : "dropped";
|
||||
}
|
||||
|
||||
@@ -1,5 +1,9 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { enqueueAnnounce, resetAnnounceQueuesForTests } from "./subagent-announce-queue.js";
|
||||
import {
|
||||
type AnnounceQueueItem,
|
||||
enqueueAnnounce,
|
||||
resetAnnounceQueuesForTests,
|
||||
} from "./subagent-announce-queue.js";
|
||||
|
||||
function createRetryingSend() {
|
||||
const prompts: string[] = [];
|
||||
@@ -118,6 +122,131 @@ describe("subagent-announce-queue", () => {
|
||||
expect(sender.prompts[1]).toContain("queued item two");
|
||||
});
|
||||
|
||||
it("waits until a busy parent session becomes idle before draining", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
|
||||
|
||||
let parentBusy = true;
|
||||
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "announce:test:busy-parent",
|
||||
item: {
|
||||
prompt: "child completed",
|
||||
enqueuedAt: Date.now(),
|
||||
sessionKey: "agent:main:telegram:dm:u1",
|
||||
},
|
||||
settings: { mode: "followup", debounceMs: 0 },
|
||||
send,
|
||||
shouldDefer: () => parentBusy,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(249);
|
||||
expect(send).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(send).not.toHaveBeenCalled();
|
||||
|
||||
parentBusy = false;
|
||||
await vi.advanceTimersByTimeAsync(250);
|
||||
expect(send).toHaveBeenCalledTimes(1);
|
||||
expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed");
|
||||
});
|
||||
|
||||
it("preserves an existing defer hook when the same queue is reused without one", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
|
||||
|
||||
let parentBusy = true;
|
||||
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "announce:test:reuse-keeps-defer",
|
||||
item: {
|
||||
prompt: "first child completed",
|
||||
enqueuedAt: Date.now(),
|
||||
sessionKey: "agent:main:telegram:dm:u1",
|
||||
},
|
||||
settings: { mode: "followup", debounceMs: 0 },
|
||||
send,
|
||||
shouldDefer: () => parentBusy,
|
||||
});
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "announce:test:reuse-keeps-defer",
|
||||
item: {
|
||||
prompt: "second child completed",
|
||||
enqueuedAt: Date.now(),
|
||||
sessionKey: "agent:main:telegram:dm:u1",
|
||||
},
|
||||
settings: { mode: "followup", debounceMs: 0 },
|
||||
send,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(250);
|
||||
expect(send).not.toHaveBeenCalled();
|
||||
|
||||
parentBusy = false;
|
||||
await vi.advanceTimersByTimeAsync(250);
|
||||
expect(send).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("polls deferred items at the configured cadence after the first debounce", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
|
||||
|
||||
let parentBusy = true;
|
||||
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "announce:test:defer-cadence",
|
||||
item: {
|
||||
prompt: "child completed",
|
||||
enqueuedAt: Date.now(),
|
||||
sessionKey: "agent:main:telegram:dm:u1",
|
||||
},
|
||||
settings: { mode: "followup", debounceMs: 1_000 },
|
||||
send,
|
||||
shouldDefer: () => parentBusy,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
expect(send).not.toHaveBeenCalled();
|
||||
|
||||
parentBusy = false;
|
||||
await vi.advanceTimersByTimeAsync(999);
|
||||
expect(send).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(send).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("falls back to delivery when busy-parent deferral exceeds the safety cap", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
|
||||
|
||||
const send = vi.fn(async (_item: AnnounceQueueItem) => {});
|
||||
|
||||
enqueueAnnounce({
|
||||
key: "announce:test:busy-parent-timeout",
|
||||
item: {
|
||||
prompt: "child completed after stale busy state",
|
||||
enqueuedAt: Date.now(),
|
||||
sessionKey: "agent:main:telegram:dm:u1",
|
||||
},
|
||||
settings: { mode: "followup", debounceMs: 0 },
|
||||
send,
|
||||
shouldDefer: () => true,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(14_999);
|
||||
expect(send).not.toHaveBeenCalled();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(send).toHaveBeenCalledTimes(1);
|
||||
expect(send.mock.calls[0]?.[0]?.prompt).toBe("child completed after stale busy state");
|
||||
});
|
||||
|
||||
it("uses debounce floor for retries when debounce exceeds backoff", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z"));
|
||||
|
||||
@@ -50,11 +50,14 @@ type AnnounceQueueState = {
|
||||
droppedCount: number;
|
||||
summaryLines: string[];
|
||||
send: (item: AnnounceQueueItem) => Promise<void>;
|
||||
/** Return true while the target parent session is still busy and delivery should wait. */
|
||||
shouldDefer?: (item: AnnounceQueueItem) => boolean;
|
||||
/** Consecutive drain failures — drives exponential backoff on errors. */
|
||||
consecutiveFailures: number;
|
||||
};
|
||||
|
||||
const ANNOUNCE_QUEUES = new Map<string, AnnounceQueueState>();
|
||||
const MAX_DEFER_WHILE_BUSY_MS = 15_000;
|
||||
|
||||
export function resetAnnounceQueuesForTests() {
|
||||
// Test isolation: other suites may leave a draining queue behind in the worker.
|
||||
@@ -72,6 +75,7 @@ function getAnnounceQueue(
|
||||
key: string,
|
||||
settings: AnnounceQueueSettings,
|
||||
send: (item: AnnounceQueueItem) => Promise<void>,
|
||||
shouldDefer?: (item: AnnounceQueueItem) => boolean,
|
||||
) {
|
||||
const existing = ANNOUNCE_QUEUES.get(key);
|
||||
if (existing) {
|
||||
@@ -80,6 +84,9 @@ function getAnnounceQueue(
|
||||
settings,
|
||||
});
|
||||
existing.send = send;
|
||||
if (shouldDefer !== undefined) {
|
||||
existing.shouldDefer = shouldDefer;
|
||||
}
|
||||
return existing;
|
||||
}
|
||||
const created: AnnounceQueueState = {
|
||||
@@ -93,6 +100,7 @@ function getAnnounceQueue(
|
||||
droppedCount: 0,
|
||||
summaryLines: [],
|
||||
send,
|
||||
shouldDefer,
|
||||
consecutiveFailures: 0,
|
||||
};
|
||||
applyQueueRuntimeSettings({
|
||||
@@ -115,6 +123,20 @@ function hasAnnounceCrossChannelItems(items: AnnounceQueueItem[]): boolean {
|
||||
});
|
||||
}
|
||||
|
||||
function shouldDeferAnnounceQueueItem(queue: AnnounceQueueState, item: AnnounceQueueItem): boolean {
|
||||
if (!queue.shouldDefer?.(item)) {
|
||||
return false;
|
||||
}
|
||||
return Date.now() - item.enqueuedAt < MAX_DEFER_WHILE_BUSY_MS;
|
||||
}
|
||||
|
||||
function waitBeforeDeferredAnnounceRetry(queue: AnnounceQueueState): Promise<void> {
|
||||
return new Promise<void>((resolve) => {
|
||||
const timer = setTimeout(resolve, Math.max(250, queue.debounceMs));
|
||||
timer.unref?.();
|
||||
});
|
||||
}
|
||||
|
||||
function scheduleAnnounceDrain(key: string) {
|
||||
const queue = beginQueueDrain(ANNOUNCE_QUEUES, key);
|
||||
if (!queue) {
|
||||
@@ -128,6 +150,12 @@ function scheduleAnnounceDrain(key: string) {
|
||||
break;
|
||||
}
|
||||
await waitForQueueDebounce(queue);
|
||||
const nextItem = queue.items[0];
|
||||
if (nextItem && shouldDeferAnnounceQueueItem(queue, nextItem)) {
|
||||
await waitBeforeDeferredAnnounceRetry(queue);
|
||||
queue.lastEnqueuedAt = Date.now() - queue.debounceMs;
|
||||
continue;
|
||||
}
|
||||
if (queue.mode === "collect") {
|
||||
const collectDrainResult = await drainCollectQueueStep({
|
||||
collectState,
|
||||
@@ -211,8 +239,9 @@ export function enqueueAnnounce(params: {
|
||||
item: AnnounceQueueItem;
|
||||
settings: AnnounceQueueSettings;
|
||||
send: (item: AnnounceQueueItem) => Promise<void>;
|
||||
shouldDefer?: (item: AnnounceQueueItem) => boolean;
|
||||
}): boolean {
|
||||
const queue = getAnnounceQueue(params.key, params.settings, params.send);
|
||||
const queue = getAnnounceQueue(params.key, params.settings, params.send, params.shouldDefer);
|
||||
// Preserve any retry backoff marker already encoded in lastEnqueuedAt.
|
||||
queue.lastEnqueuedAt = Math.max(queue.lastEnqueuedAt, Date.now());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user