diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 0b53cf63bd9..3a7715e45c0 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -40,6 +40,9 @@ const subagentRegistryMock = { listSubagentRunsForRequester: vi.fn( (_sessionKey: string, _scope?: { requesterRunId?: string }) => [], ), + replaceSubagentRunAfterSteer: vi.fn( + (_params: { previousRunId: string; nextRunId: string }) => true, + ), resolveRequesterForChildSession: vi.fn((_sessionKey: string): RequesterResolution => null), }; const subagentDeliveryTargetHookMock = vi.fn( @@ -202,6 +205,7 @@ describe("subagent announce formatting", () => { subagentRegistryMock.countPendingDescendantRuns(sessionKey), ); subagentRegistryMock.listSubagentRunsForRequester.mockClear().mockReturnValue([]); + subagentRegistryMock.replaceSubagentRunAfterSteer.mockClear().mockReturnValue(true); subagentRegistryMock.resolveRequesterForChildSession.mockClear().mockReturnValue(null); hasSubagentDeliveryTargetHook = false; hookRunnerMock.hasHooks.mockClear(); @@ -1991,6 +1995,84 @@ describe("subagent announce formatting", () => { expect(msg).not.toContain("placeholder waiting text that should be ignored"); }); + it("wakes an ended orchestrator run with settled child results before any upward announce", async () => { + sessionStore = { + "agent:main:subagent:parent": { + sessionId: "session-parent", + }, + }; + + subagentRegistryMock.countPendingDescendantRuns.mockReturnValue(0); + subagentRegistryMock.listSubagentRunsForRequester.mockImplementation( + (sessionKey: string, scope?: { requesterRunId?: string }) => { + if (sessionKey !== "agent:main:subagent:parent") { + return []; + } + if (scope?.requesterRunId !== "run-parent-phase-1") { + return []; + } + return [ + { + runId: "run-child-a", + childSessionKey: "agent:main:subagent:parent:subagent:a", + requesterSessionKey: "agent:main:subagent:parent", + requesterDisplayKey: "parent", + task: "child task a", + label: "child-a", + cleanup: "keep", + createdAt: 10, + endedAt: 20, + cleanupCompletedAt: 21, + frozenResultText: "result from child a", + outcome: { status: "ok" }, + }, + { + runId: "run-child-b", + childSessionKey: "agent:main:subagent:parent:subagent:b", + requesterSessionKey: "agent:main:subagent:parent", + requesterDisplayKey: "parent", + task: "child task b", + label: "child-b", + cleanup: "keep", + createdAt: 11, + endedAt: 21, + cleanupCompletedAt: 22, + frozenResultText: "result from child b", + outcome: { status: "ok" }, + }, + ]; + }, + ); + + agentSpy.mockResolvedValueOnce({ runId: "run-parent-phase-2", status: "ok" }); + + const didAnnounce = await runSubagentAnnounceFlow({ + childSessionKey: "agent:main:subagent:parent", + childRunId: "run-parent-phase-1", + requesterSessionKey: "agent:main:main", + requesterDisplayKey: "main", + ...defaultOutcomeAnnounce, + expectsCompletionMessage: true, + wakeOnDescendantSettle: true, + roundOneReply: "waiting for children", + }); + + expect(didAnnounce).toBe(true); + expect(agentSpy).toHaveBeenCalledTimes(1); + const call = agentSpy.mock.calls[0]?.[0] as { + params?: { sessionKey?: string; message?: string }; + }; + expect(call?.params?.sessionKey).toBe("agent:main:subagent:parent"); + const message = call?.params?.message ?? ""; + expect(message).toContain("All pending descendants for that run have now settled"); + expect(message).toContain("result from child a"); + expect(message).toContain("result from child b"); + expect(subagentRegistryMock.replaceSubagentRunAfterSteer).toHaveBeenCalledWith({ + previousRunId: "run-parent-phase-1", + nextRunId: "run-parent-phase-2", + }); + }); + it("nested completion chains re-check child then parent deterministically", async () => { const parentSessionKey = "agent:main:subagent:parent"; const childSessionKey = "agent:main:subagent:parent:subagent:child"; diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index f97eafec16e..775fb257e69 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -1066,6 +1066,92 @@ function buildAnnounceSteerMessage(events: AgentInternalEvent[]): string { return rendered; } +function hasUsableSessionEntry(entry: unknown): boolean { + if (!entry || typeof entry !== "object") { + return false; + } + const sessionId = (entry as { sessionId?: unknown }).sessionId; + if (typeof sessionId === "string" && sessionId.trim() === "") { + return false; + } + return true; +} + +function buildDescendantWakeMessage(params: { findings: string; taskLabel: string }): string { + return [ + "[Subagent Context] Your prior run ended while waiting for descendant subagent completions.", + "[Subagent Context] All pending descendants for that run have now settled.", + "[Subagent Context] Continue your workflow using these results. Spawn more subagents if needed, otherwise send your final answer.", + "", + `Task: ${params.taskLabel}`, + "", + params.findings, + ].join("\n"); +} + +async function wakeSubagentRunAfterDescendants(params: { + runId: string; + childSessionKey: string; + taskLabel: string; + findings: string; + announceId: string; + signal?: AbortSignal; +}): Promise { + if (params.signal?.aborted) { + return false; + } + + const childEntry = loadSessionEntryByKey(params.childSessionKey); + if (!hasUsableSessionEntry(childEntry)) { + return false; + } + + const cfg = loadConfig(); + const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg); + const wakeMessage = buildDescendantWakeMessage({ + findings: params.findings, + taskLabel: params.taskLabel, + }); + + let wakeRunId = ""; + try { + const wakeResponse = await runAnnounceDeliveryWithRetry<{ runId?: string }>({ + operation: "descendant wake agent call", + signal: params.signal, + run: async () => + await callGateway({ + method: "agent", + params: { + sessionKey: params.childSessionKey, + message: wakeMessage, + deliver: false, + inputProvenance: { + kind: "inter_session", + sourceSessionKey: params.childSessionKey, + sourceChannel: INTERNAL_MESSAGE_CHANNEL, + sourceTool: "subagent_announce", + }, + idempotencyKey: buildAnnounceIdempotencyKey(`${params.announceId}:wake`), + }, + timeoutMs: announceTimeoutMs, + }), + }); + wakeRunId = typeof wakeResponse?.runId === "string" ? wakeResponse.runId.trim() : ""; + } catch { + return false; + } + + if (!wakeRunId) { + return false; + } + + const { replaceSubagentRunAfterSteer } = await loadSubagentRegistryRuntime(); + return replaceSubagentRunAfterSteer({ + previousRunId: params.runId, + nextRunId: wakeRunId, + }); +} + export async function runSubagentAnnounceFlow(params: { childSessionKey: string; childRunId: string; @@ -1084,6 +1170,7 @@ export async function runSubagentAnnounceFlow(params: { announceType?: SubagentAnnounceType; expectsCompletionMessage?: boolean; spawnMode?: SpawnSubagentMode; + wakeOnDescendantSettle?: boolean; signal?: AbortSignal; bestEffortDeliver?: boolean; }): Promise { @@ -1193,6 +1280,26 @@ export async function runSubagentAnnounceFlow(params: { // Best-effort only; fall back to current-run reply extraction. } + const announceId = buildAnnounceIdFromChildRun({ + childSessionKey: params.childSessionKey, + childRunId: params.childRunId, + }); + + if (params.wakeOnDescendantSettle === true && childCompletionFindings?.trim()) { + const woke = await wakeSubagentRunAfterDescendants({ + runId: params.childRunId, + childSessionKey: params.childSessionKey, + taskLabel: params.label || params.task || "task", + findings: childCompletionFindings, + announceId, + signal: params.signal, + }); + if (woke) { + shouldDeleteChildSession = false; + return true; + } + } + if (!childCompletionFindings) { if (!reply) { reply = await readLatestSubagentOutput(params.childSessionKey); @@ -1262,11 +1369,7 @@ export async function runSubagentAnnounceFlow(params: { // Parent run has ended. Check if parent SESSION still exists. // If it does, the parent may be waiting for child results — inject there. const parentSessionEntry = loadSessionEntryByKey(targetRequesterSessionKey); - const parentSessionId = - typeof parentSessionEntry?.sessionId === "string" - ? parentSessionEntry.sessionId.trim() - : ""; - const parentSessionAlive = Boolean(parentSessionId); + const parentSessionAlive = hasUsableSessionEntry(parentSessionEntry); if (!parentSessionAlive) { // Parent session is truly gone — fallback to grandparent @@ -1317,10 +1420,6 @@ export async function runSubagentAnnounceFlow(params: { triggerMessage = buildAnnounceSteerMessage(internalEvents); steerMessage = triggerMessage; - const announceId = buildAnnounceIdFromChildRun({ - childSessionKey: params.childSessionKey, - childRunId: params.childRunId, - }); // Send to the requester session. For nested subagents this is an internal // follow-up injection (deliver=false) so the orchestrator receives it. let directOrigin = targetRequesterOrigin; diff --git a/src/agents/subagent-registry-runtime.ts b/src/agents/subagent-registry-runtime.ts index 7c2718c5bef..567c0321543 100644 --- a/src/agents/subagent-registry-runtime.ts +++ b/src/agents/subagent-registry-runtime.ts @@ -4,6 +4,7 @@ export { countPendingDescendantRunsExcludingRun, isSubagentSessionRunActive, listSubagentRunsForRequester, + replaceSubagentRunAfterSteer, resolveRequesterForChildSession, shouldIgnorePostCompletionAnnounceForSession, } from "./subagent-registry.js"; diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 01e6d391f15..28783e2301b 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -431,6 +431,7 @@ function startSubagentAnnounceCleanupFlow(runId: string, entry: SubagentRunRecor outcome: entry.outcome, spawnMode: entry.spawnMode, expectsCompletionMessage: entry.expectsCompletionMessage, + wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true, }) .then((didAnnounce) => { void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce); @@ -725,6 +726,7 @@ async function finalizeSubagentCleanup( return; } if (didAnnounce) { + entry.wakeOnDescendantSettle = undefined; const completionReason = resolveCleanupCompletionReason(entry); await emitCompletionEndedHookIfNeeded(entry, completionReason); // Clean up attachments before the run record is removed. @@ -756,6 +758,7 @@ async function finalizeSubagentCleanup( if (deferredDecision.kind === "defer-descendants") { entry.lastAnnounceRetryAt = now; + entry.wakeOnDescendantSettle = true; entry.cleanupHandled = false; resumedRuns.delete(runId); persistSubagentRuns(); @@ -771,6 +774,7 @@ async function finalizeSubagentCleanup( } if (deferredDecision.kind === "give-up") { + entry.wakeOnDescendantSettle = undefined; const shouldDeleteAttachments = cleanup === "delete" || !entry.retainAttachmentsOnKeep; if (shouldDeleteAttachments) { await safeRemoveAttachmentsDir(entry); @@ -964,6 +968,7 @@ export function replaceSubagentRunAfterSteer(params: { endedAt: undefined, endedReason: undefined, endedHookEmittedAt: undefined, + wakeOnDescendantSettle: undefined, outcome: undefined, frozenResultText: undefined, frozenResultCapturedAt: undefined, @@ -1030,6 +1035,7 @@ export function registerSubagentRun(params: { startedAt: now, archiveAtMs, cleanupHandled: false, + wakeOnDescendantSettle: undefined, attachmentsDir: params.attachmentsDir, attachmentsRootDir: params.attachmentsRootDir, retainAttachmentsOnKeep: params.retainAttachmentsOnKeep, diff --git a/src/agents/subagent-registry.types.ts b/src/agents/subagent-registry.types.ts index 9100667600b..a3e9aaa7728 100644 --- a/src/agents/subagent-registry.types.ts +++ b/src/agents/subagent-registry.types.ts @@ -30,6 +30,8 @@ export type SubagentRunRecord = { lastAnnounceRetryAt?: number; /** Terminal lifecycle reason recorded when the run finishes. */ endedReason?: SubagentLifecycleEndedReason; + /** Run ended while descendants were still pending and should be re-invoked once they settle. */ + wakeOnDescendantSettle?: boolean; /** Frozen completion output captured when the run first transitions to ended state. */ frozenResultText?: string | null; /** Timestamp when frozenResultText was captured and locked. */