From d65bc347eb671c6f2c3993f7e129db763c773f5b Mon Sep 17 00:00:00 2001 From: Tyler Yust Date: Wed, 4 Mar 2026 15:12:06 -0800 Subject: [PATCH] fix(subagents): deterministic descendant completion gating and child-result synthesis --- .../subagent-announce.format.e2e.test.ts | 230 ++++++++++++++++-- src/agents/subagent-announce.timeout.test.ts | 1 + src/agents/subagent-announce.ts | 183 ++++++++------ src/agents/subagent-registry-runtime.ts | 1 + 4 files changed, 319 insertions(+), 96 deletions(-) diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index cdea7e3d876..d3f75b670a9 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -37,6 +37,7 @@ const subagentRegistryMock = { countActiveDescendantRuns: vi.fn((_sessionKey: string) => 0), countPendingDescendantRuns: vi.fn((_sessionKey: string) => 0), countPendingDescendantRunsExcludingRun: vi.fn((_sessionKey: string, _runId: string) => 0), + listSubagentRunsForRequester: vi.fn((_sessionKey: string) => []), resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null), }; const subagentDeliveryTargetHookMock = vi.fn( @@ -198,6 +199,7 @@ describe("subagent announce formatting", () => { .mockImplementation((sessionKey: string, _runId: string) => subagentRegistryMock.countPendingDescendantRuns(sessionKey), ); + subagentRegistryMock.listSubagentRunsForRequester.mockClear().mockReturnValue([]); subagentRegistryMock.resolveRequesterForChildSession.mockClear().mockReturnValue(null); hasSubagentDeliveryTargetHook = false; hookRunnerMock.hasHooks.mockClear(); @@ -1774,9 +1776,8 @@ describe("subagent announce formatting", () => { ); }); - it("defers nested parent announce when active descendants exist even if pending snapshot is stale", async () => { - subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0); - subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) => + it("defers nested parent announce while pending descendants still exist", async () => { + subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => sessionKey === "agent:main:subagent:parent" ? 1 : 0, ); @@ -1809,7 +1810,7 @@ describe("subagent announce formatting", () => { expect(msg).not.toContain("wait for the remaining results"); }); - it("defers announce while finished runs still have active descendants", async () => { + it("defers announce while finished runs still have pending descendants", async () => { const cases = [ { childRunId: "run-parent", @@ -1824,7 +1825,7 @@ describe("subagent announce formatting", () => { for (const testCase of cases) { agentSpy.mockClear(); sendSpy.mockClear(); - subagentRegistryMock.countActiveDescendantRuns.mockImplementation((sessionKey: string) => + subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => sessionKey === "agent:main:subagent:parent" ? 1 : 0, ); @@ -1843,35 +1844,214 @@ describe("subagent announce formatting", () => { } }); - it("waits for updated synthesized output before announcing nested subagent completion", async () => { - let historyReads = 0; - chatHistoryMock.mockImplementation(async () => { - historyReads += 1; - if (historyReads < 3) { - return { - messages: [{ role: "assistant", content: "Waiting for child output..." }], - }; - } - return { - messages: [{ role: "assistant", content: "Final synthesized answer." }], - }; - }); - readLatestAssistantReplyMock.mockResolvedValue(undefined); + it("defers completion announce when one descendant child is still pending", async () => { + subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:subagent:parent" ? 1 : 0, + ); const didAnnounce = await runSubagentAnnounceFlow({ childSessionKey: "agent:main:subagent:parent", - childRunId: "run-parent-synth", - requesterSessionKey: "agent:main:subagent:orchestrator", - requesterDisplayKey: "agent:main:subagent:orchestrator", + childRunId: "run-parent-one-child-pending", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", ...defaultOutcomeAnnounce, - timeoutMs: 100, + expectsCompletionMessage: true, + roundOneReply: "waiting for one child completion", + }); + + expect(didAnnounce).toBe(false); + expect(agentSpy).not.toHaveBeenCalled(); + expect(sendSpy).not.toHaveBeenCalled(); + }); + + it("defers completion announce when two descendant children are still pending", async () => { + subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:subagent:parent" ? 2 : 0, + ); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:parent", + childRunId: "run-parent-two-children-pending", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + roundOneReply: "waiting for both completion events", + }); + + expect(didAnnounce).toBe(false); + expect(agentSpy).not.toHaveBeenCalled(); + expect(sendSpy).not.toHaveBeenCalled(); + }); + + it("announces completion immediately when no descendants are pending", async () => { + subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0); + subagentRegistryMock.countActiveDescendantRuns.mockReturnValue(0); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:leaf", + childRunId: "run-leaf-no-children", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + roundOneReply: "single leaf result", }); expect(didAnnounce).toBe(true); + expect(agentSpy).toHaveBeenCalledTimes(1); + expect(sendSpy).not.toHaveBeenCalled(); const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; const msg = call?.params?.message ?? ""; - expect(msg).toContain("Final synthesized answer."); - expect(msg).not.toContain("Waiting for child output..."); + expect(msg).toContain("single leaf result"); + }); + + it("announces with direct child completion outputs once all descendants are settled", async () => { + subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0); + subagentRegistryMock.listSubagentRunsForRequester.mockImplementation((sessionKey: string) => + sessionKey === "agent:main:subagent:parent" + ? [ + { + runId: "run-child-a", + childSessionKey: "agent:main:subagent:parent:subagent:a", + requesterSessionKey: "agent:main:subagent:parent", + requesterDisplayKey: "parent", + task: "child task a", + label: "child-a", + cleanup: "keep", + createdAt: 10, + endedAt: 20, + cleanupCompletedAt: 21, + frozenResultText: "result from child a", + outcome: { status: "ok" }, + }, + { + runId: "run-child-b", + childSessionKey: "agent:main:subagent:parent:subagent:b", + requesterSessionKey: "agent:main:subagent:parent", + requesterDisplayKey: "parent", + task: "child task b", + label: "child-b", + cleanup: "keep", + createdAt: 11, + endedAt: 21, + cleanupCompletedAt: 22, + frozenResultText: "result from child b", + outcome: { status: "ok" }, + }, + ] + : [], + ); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:parent", + childRunId: "run-parent-settled", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + roundOneReply: "placeholder waiting text that should be ignored", + }); + + expect(didAnnounce).toBe(true); + expect(agentSpy).toHaveBeenCalledTimes(1); + const call = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; + const msg = call?.params?.message ?? ""; + expect(msg).toContain("Child completion results:"); + expect(msg).toContain("result from child a"); + expect(msg).toContain("result from child b"); + expect(msg).not.toContain("placeholder waiting text that should be ignored"); + }); + + it("nested completion chains re-check child then parent deterministically", async () => { + const parentSessionKey = "agent:main:subagent:parent"; + const childSessionKey = "agent:main:subagent:parent:subagent:child"; + let parentPending = 1; + + subagentRegistryMock.countPendingDescendantRuns.mockImplementation((sessionKey: string) => { + if (sessionKey === parentSessionKey) { + return parentPending; + } + return 0; + }); + subagentRegistryMock.listSubagentRunsForRequester.mockImplementation((sessionKey: string) => { + if (sessionKey === childSessionKey) { + return [ + { + runId: "run-grandchild", + childSessionKey: `${childSessionKey}:subagent:grandchild`, + requesterSessionKey: childSessionKey, + requesterDisplayKey: "child", + task: "grandchild task", + label: "grandchild", + cleanup: "keep", + createdAt: 10, + endedAt: 20, + cleanupCompletedAt: 21, + frozenResultText: "grandchild final output", + outcome: { status: "ok" }, + }, + ]; + } + if (sessionKey === parentSessionKey && parentPending === 0) { + return [ + { + runId: "run-child", + childSessionKey, + requesterSessionKey: parentSessionKey, + requesterDisplayKey: "parent", + task: "child task", + label: "child", + cleanup: "keep", + createdAt: 11, + endedAt: 21, + cleanupCompletedAt: 22, + frozenResultText: "child synthesized output from grandchild", + outcome: { status: "ok" }, + }, + ]; + } + return []; + }); + + const parentDeferred = await runSubagentAnnounceFlow({ + childSessionKey: parentSessionKey, + childRunId: "run-parent", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + }); + expect(parentDeferred).toBe(false); + expect(agentSpy).not.toHaveBeenCalled(); + + const childAnnounced = await runSubagentAnnounceFlow({ + childSessionKey, + childRunId: "run-child", + requesterSessionKey: parentSessionKey, + requesterDisplayKey: parentSessionKey, + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + }); + expect(childAnnounced).toBe(true); + + parentPending = 0; + const parentAnnounced = await runSubagentAnnounceFlow({ + childSessionKey: parentSessionKey, + childRunId: "run-parent", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + }); + expect(parentAnnounced).toBe(true); + expect(agentSpy).toHaveBeenCalledTimes(2); + + const childCall = agentSpy.mock.calls[0]?.[0] as { params?: { message?: string } }; + expect(childCall?.params?.message ?? "").toContain("grandchild final output"); + + const parentCall = agentSpy.mock.calls[1]?.[0] as { params?: { message?: string } }; + expect(parentCall?.params?.message ?? "").toContain("child synthesized output from grandchild"); }); it("ignores post-completion announce traffic for completed run-mode requester sessions", async () => { diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 6d18a6acd60..755ab2c1556 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -54,6 +54,7 @@ vi.mock("./pi-embedded.js", () => ({ vi.mock("./subagent-registry.js", () => ({ countActiveDescendantRuns: () => 0, countPendingDescendantRuns: () => 0, + listSubagentRunsForRequester: () => [], isSubagentSessionRunActive: () => true, resolveRequesterForChildSession: () => null, })); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 2db5c8ccd5e..ea502e6a4c5 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -50,7 +50,6 @@ import { isAnnounceSkip } from "./tools/sessions-send-helpers.js"; const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1"; const FAST_TEST_RETRY_INTERVAL_MS = 8; -const FAST_TEST_REPLY_CHANGE_WAIT_MS = 20; const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 60_000; const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000; let subagentRegistryRuntimePromise: Promise< @@ -328,29 +327,63 @@ export async function captureSubagentCompletionReply( }); } -async function waitForSubagentOutputChange(params: { - sessionKey: string; - baselineReply: string; - maxWaitMs: number; -}): Promise { - const baseline = params.baselineReply.trim(); - if (!baseline) { - return params.baselineReply; +function describeSubagentOutcome(outcome?: SubagentRunOutcome): string { + if (!outcome) { + return "unknown"; } - const RETRY_INTERVAL_MS = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100; - const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 5_000)); - let latest = params.baselineReply; - while (Date.now() < deadline) { - const next = await readLatestSubagentOutput(params.sessionKey); - if (next?.trim()) { - latest = next; - if (next.trim() !== baseline) { - return next; - } + if (outcome.status === "ok") { + return "ok"; + } + if (outcome.status === "timeout") { + return "timeout"; + } + if (outcome.status === "error") { + return outcome.error?.trim() ? `error: ${outcome.error.trim()}` : "error"; + } + return "unknown"; +} + +function buildChildCompletionFindings( + children: Array<{ + childSessionKey: string; + task: string; + label?: string; + createdAt: number; + endedAt?: number; + frozenResultText?: string | null; + outcome?: SubagentRunOutcome; + }>, +): string | undefined { + const sorted = [...children].toSorted((a, b) => { + if (a.createdAt !== b.createdAt) { + return a.createdAt - b.createdAt; } - await new Promise((resolve) => setTimeout(resolve, RETRY_INTERVAL_MS)); + const aEnded = typeof a.endedAt === "number" ? a.endedAt : Number.MAX_SAFE_INTEGER; + const bEnded = typeof b.endedAt === "number" ? b.endedAt : Number.MAX_SAFE_INTEGER; + return aEnded - bEnded; + }); + + const sections: string[] = []; + for (const [index, child] of sorted.entries()) { + const title = + child.label?.trim() || + child.task.trim() || + child.childSessionKey.trim() || + `child ${index + 1}`; + const resultText = child.frozenResultText?.trim(); + const outcome = describeSubagentOutcome(child.outcome); + sections.push( + [`${index + 1}. ${title}`, `status: ${outcome}`, "result:", resultText || "(no output)"].join( + "\n", + ), + ); } - return latest; + + if (sections.length === 0) { + return undefined; + } + + return ["Child completion results:", "", ...sections].join("\n\n"); } function formatDurationShort(valueMs?: number) { @@ -1116,36 +1149,6 @@ export async function runSubagentAnnounceFlow(params: { outcome = { status: "timeout" }; } } - reply = await readLatestSubagentOutput(params.childSessionKey); - } - - if (!reply) { - reply = await readLatestSubagentOutput(params.childSessionKey); - } - - if (!reply?.trim()) { - reply = await readLatestSubagentOutputWithRetry({ - sessionKey: params.childSessionKey, - maxWaitMs: params.timeoutMs, - }); - } - - if ( - !expectsCompletionMessage && - !reply?.trim() && - childSessionId && - isEmbeddedPiRunActive(childSessionId) - ) { - // Avoid announcing "(no output)" while the child run is still producing output. - shouldDeleteChildSession = false; - return false; - } - - if (isAnnounceSkip(reply)) { - return true; - } - if (isSilentReplyText(reply, SILENT_REPLY_TOKEN)) { - return true; } if (!outcome) { @@ -1155,30 +1158,68 @@ export async function runSubagentAnnounceFlow(params: { let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey); let pendingChildDescendantRuns = 0; + let childCompletionFindings: string | undefined; try { - const { countPendingDescendantRuns, countActiveDescendantRuns } = + const { countPendingDescendantRuns, listSubagentRunsForRequester } = await loadSubagentRegistryRuntime(); - const pending = Math.max(0, countPendingDescendantRuns(params.childSessionKey)); - const active = Math.max(0, countActiveDescendantRuns(params.childSessionKey)); - pendingChildDescendantRuns = Math.max(pending, active); + pendingChildDescendantRuns = Math.max(0, countPendingDescendantRuns(params.childSessionKey)); + if (pendingChildDescendantRuns > 0) { + // Deterministic nested announce policy: if this run still has unfinished + // descendants, do not announce yet. Wait for descendant cleanup retries + // to re-trigger this announce check once everything is complete. + shouldDeleteChildSession = false; + return false; + } + + if (typeof listSubagentRunsForRequester === "function") { + const directChildren = listSubagentRunsForRequester(params.childSessionKey); + if (Array.isArray(directChildren) && directChildren.length > 0) { + childCompletionFindings = buildChildCompletionFindings( + directChildren.map((child) => ({ + childSessionKey: child.childSessionKey, + task: child.task, + label: child.label, + createdAt: child.createdAt, + endedAt: child.endedAt, + frozenResultText: child.frozenResultText, + outcome: child.outcome, + })), + ); + } + } } catch { - // Best-effort only; fall back to direct announce behavior when unavailable. - } - if (pendingChildDescendantRuns > 0) { - // The finished run still has pending descendant subagents (either active, - // or ended but still finishing their own announce and cleanup flow). Defer - // announcing this run until descendants fully settle. - shouldDeleteChildSession = false; - return false; + // Best-effort only; fall back to current-run reply extraction. } - if (requesterDepth >= 1 && reply?.trim()) { - const minReplyChangeWaitMs = FAST_TEST_MODE ? FAST_TEST_REPLY_CHANGE_WAIT_MS : 250; - reply = await waitForSubagentOutputChange({ - sessionKey: params.childSessionKey, - baselineReply: reply, - maxWaitMs: Math.max(minReplyChangeWaitMs, Math.min(params.timeoutMs, 2_000)), - }); + if (!childCompletionFindings) { + if (!reply) { + reply = await readLatestSubagentOutput(params.childSessionKey); + } + + if (!reply?.trim()) { + reply = await readLatestSubagentOutputWithRetry({ + sessionKey: params.childSessionKey, + maxWaitMs: params.timeoutMs, + }); + } + + if ( + !expectsCompletionMessage && + !reply?.trim() && + childSessionId && + isEmbeddedPiRunActive(childSessionId) + ) { + // Avoid announcing "(no output)" while the child run is still producing output. + shouldDeleteChildSession = false; + return false; + } + + if (isAnnounceSkip(reply)) { + return true; + } + if (isSilentReplyText(reply, SILENT_REPLY_TOKEN)) { + return true; + } } // Build status label @@ -1195,7 +1236,7 @@ export async function runSubagentAnnounceFlow(params: { const announceType = params.announceType ?? "subagent task"; const taskLabel = params.label || params.task || "task"; const announceSessionId = childSessionId || "unknown"; - const findings = reply || "(no output)"; + const findings = childCompletionFindings || reply || "(no output)"; let triggerMessage = ""; let steerMessage = ""; let internalEvents: AgentInternalEvent[] = []; diff --git a/src/agents/subagent-registry-runtime.ts b/src/agents/subagent-registry-runtime.ts index 985bf217560..7c2718c5bef 100644 --- a/src/agents/subagent-registry-runtime.ts +++ b/src/agents/subagent-registry-runtime.ts @@ -3,6 +3,7 @@ export { countPendingDescendantRuns, countPendingDescendantRunsExcludingRun, isSubagentSessionRunActive, + listSubagentRunsForRequester, resolveRequesterForChildSession, shouldIgnorePostCompletionAnnounceForSession, } from "./subagent-registry.js";