From a1b656705992e31df99147c786217fcc517c68b3 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 26 Apr 2026 00:29:06 -0700 Subject: [PATCH] fix(agents): fallback subagent completion delivery --- src/agents/subagent-announce-delivery.test.ts | 128 ++++++++++++++++++ src/agents/subagent-announce-delivery.ts | 68 +++++++--- src/agents/subagent-announce.ts | 3 + .../subagent-registry-lifecycle.test.ts | 76 +++++++++++ src/agents/subagent-registry-lifecycle.ts | 39 ++++++ src/agents/subagent-registry.types.ts | 1 + src/tasks/detached-task-runtime-contract.ts | 1 + src/tasks/task-executor.ts | 1 + src/tasks/task-registry.ts | 12 +- 9 files changed, 309 insertions(+), 20 deletions(-) diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index f36513a6048..f5519e639d6 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -115,6 +115,48 @@ async function deliverDiscordDirectMessageCompletion(params: { }); } +async function deliverTelegramDirectMessageCompletion(params: { + callGateway: typeof runtimeCallGateway; + sendMessage?: typeof runtimeSendMessage; + internalEvents?: AgentInternalEvent[]; + isActive?: boolean; + queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean; +}) { + const origin = { + channel: "telegram", + to: "123456789", + accountId: "bot-1", + }; + __testing.setDepsForTest({ + callGateway: params.callGateway, + getRequesterSessionActivity: () => ({ + sessionId: "requester-session-telegram", + isActive: params.isActive === true, + }), + loadConfig: () => ({}) as never, + ...(params.queueEmbeddedPiMessage + ? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage } + : {}), + ...(params.sendMessage ? { sendMessage: params.sendMessage } : {}), + }); + + return deliverSubagentAnnouncement({ + requesterSessionKey: "agent:main:telegram:123456789", + targetRequesterSessionKey: "agent:main:telegram:123456789", + triggerMessage: "child done", + steerMessage: "child done", + requesterOrigin: origin, + requesterSessionOrigin: origin, + completionDirectOrigin: origin, + directOrigin: origin, + requesterIsSubagent: false, + expectsCompletionMessage: true, + bestEffortDeliver: true, + directIdempotencyKey: "announce-telegram-dm-fallback", + internalEvents: params.internalEvents, + }); +} + async function deliverSlackChannelAnnouncement(params: { callGateway: typeof runtimeCallGateway; isActive: boolean; @@ -510,6 +552,92 @@ describe("deliverSubagentAnnouncement completion delivery", () => { ); }); + it("uses direct fallback for Telegram DMs when announce-agent delivery fails", async () => { + const callGateway = vi.fn(async () => { + throw new Error("UNAVAILABLE: requester wake failed"); + }) as unknown as typeof runtimeCallGateway; + const sendMessage = createSendMessageMock(); + const result = await deliverTelegramDirectMessageCompletion({ + callGateway, + sendMessage, + internalEvents: [ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + childSessionId: "child-session-id", + announceType: "subagent task", + taskLabel: "telegram completion smoke", + status: "ok", + statusLabel: "completed successfully", + result: "child completion output", + replyInstruction: "Summarize the result.", + }, + ], + }); + + expect(result).toEqual( + expect.objectContaining({ + delivered: true, + path: "direct-fallback", + }), + ); + expect(sendMessage).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + accountId: "bot-1", + to: "123456789", + threadId: undefined, + content: "child completion output", + requesterSessionKey: "agent:main:telegram:123456789", + bestEffort: true, + idempotencyKey: "announce-telegram-dm-fallback", + }), + ); + }); + + it("uses direct fallback when an active Telegram requester cannot be woken", async () => { + const callGateway = createGatewayMock(); + const sendMessage = createSendMessageMock(); + const queueEmbeddedPiMessage = vi.fn(() => false); + const result = await deliverTelegramDirectMessageCompletion({ + callGateway, + sendMessage, + isActive: true, + queueEmbeddedPiMessage, + internalEvents: [ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + childSessionId: "child-session-id", + announceType: "subagent task", + taskLabel: "telegram wake smoke", + status: "ok", + statusLabel: "completed successfully", + result: "child completion output", + replyInstruction: "Summarize the result.", + }, + ], + }); + + expect(result).toEqual( + expect.objectContaining({ + delivered: true, + path: "direct-fallback", + }), + ); + expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-telegram", "child done"); + expect(callGateway).not.toHaveBeenCalled(); + expect(sendMessage).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "telegram", + to: "123456789", + content: "child completion output", + }), + ); + }); + it("uses a direct thread fallback when announce-agent returns no visible output", async () => { const callGateway = createGatewayMock({ result: { diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 8202777faf6..5f8ad5d73ec 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -681,6 +681,10 @@ async function sendSubagentAnnounceDirectly(params: { isGatewayMessageChannel(normalizedSessionOnlyOriginChannel) ? normalizedSessionOnlyOriginChannel : undefined; + const completionFallbackText = + params.expectsCompletionMessage && deliveryTarget.deliver + ? extractThreadCompletionFallbackText(params.internalEvents) + : ""; const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey); if (params.expectsCompletionMessage && requesterActivity.sessionId) { const woke = requesterActivity.sessionId @@ -696,6 +700,32 @@ async function sendSubagentAnnounceDirectly(params: { }; } if (requesterActivity.isActive) { + try { + const didFallback = await sendCompletionFallback({ + cfg, + channel: deliveryTarget.channel, + to: deliveryTarget.to, + accountId: deliveryTarget.accountId, + threadId: deliveryTarget.threadId, + content: completionFallbackText, + requesterSessionKey: canonicalRequesterSessionKey, + bestEffortDeliver: params.bestEffortDeliver, + idempotencyKey: params.directIdempotencyKey, + signal: params.signal, + }); + if (didFallback) { + return { + delivered: true, + path: resolveCompletionFallbackPath(deliveryTarget.threadId), + }; + } + } catch (err) { + return { + delivered: false, + path: "direct", + error: `active requester session could not be woken; fallback send failed: ${summarizeDeliveryError(err)}`, + }; + } return { delivered: false, path: "direct", @@ -709,10 +739,6 @@ async function sendSubagentAnnounceDirectly(params: { path: "none", }; } - const completionFallbackText = - params.expectsCompletionMessage && deliveryTarget.deliver - ? extractThreadCompletionFallbackText(params.internalEvents) - : ""; let directAnnounceResponse: unknown; try { directAnnounceResponse = await runAnnounceDeliveryWithRetry({ @@ -758,22 +784,30 @@ async function sendSubagentAnnounceDirectly(params: { }), }); } catch (err) { - const didFallback = await sendCompletionFallback({ - cfg, - channel: deliveryTarget.channel, - to: deliveryTarget.to, - accountId: deliveryTarget.accountId, - threadId: deliveryTarget.threadId, - content: deliveryTarget.threadId ? completionFallbackText : "", - requesterSessionKey: canonicalRequesterSessionKey, - bestEffortDeliver: params.bestEffortDeliver, - idempotencyKey: params.directIdempotencyKey, - signal: params.signal, - }); + let didFallback = false; + try { + didFallback = await sendCompletionFallback({ + cfg, + channel: deliveryTarget.channel, + to: deliveryTarget.to, + accountId: deliveryTarget.accountId, + threadId: deliveryTarget.threadId, + content: completionFallbackText, + requesterSessionKey: canonicalRequesterSessionKey, + bestEffortDeliver: params.bestEffortDeliver, + idempotencyKey: params.directIdempotencyKey, + signal: params.signal, + }); + } catch (fallbackErr) { + throw new Error( + `${summarizeDeliveryError(err)}; fallback send failed: ${summarizeDeliveryError(fallbackErr)}`, + { cause: fallbackErr }, + ); + } if (didFallback) { return { delivered: true, - path: "direct-thread-fallback", + path: resolveCompletionFallbackPath(deliveryTarget.threadId), }; } throw err; diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index a6302402da6..4b0c5b82cbd 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -23,6 +23,7 @@ import { resolveSubagentAnnounceTimeoutMs, resolveSubagentCompletionOrigin, } from "./subagent-announce-delivery.js"; +import type { SubagentAnnounceDeliveryResult } from "./subagent-announce-dispatch.js"; import { resolveAnnounceOrigin } from "./subagent-announce-origin.js"; import { applySubagentWaitOutcome, @@ -244,6 +245,7 @@ export async function runSubagentAnnounceFlow(params: { wakeOnDescendantSettle?: boolean; signal?: AbortSignal; bestEffortDeliver?: boolean; + onDeliveryResult?: (delivery: SubagentAnnounceDeliveryResult) => void; }): Promise { let didAnnounce = false; const expectsCompletionMessage = params.expectsCompletionMessage === true; @@ -562,6 +564,7 @@ export async function runSubagentAnnounceFlow(params: { directIdempotencyKey, signal: params.signal, }); + params.onDeliveryResult?.(delivery); didAnnounce = delivery.delivered; if (!delivery.delivered && delivery.path === "direct" && delivery.error) { defaultRuntime.error?.( diff --git a/src/agents/subagent-registry-lifecycle.test.ts b/src/agents/subagent-registry-lifecycle.test.ts index 07951c68dd0..35ee9df2338 100644 --- a/src/agents/subagent-registry-lifecycle.test.ts +++ b/src/agents/subagent-registry-lifecycle.test.ts @@ -569,6 +569,82 @@ describe("subagent registry lifecycle hardening", () => { expect(persist).toHaveBeenCalled(); }); + it("persists the concrete announce delivery error when cleanup gives up", async () => { + const persist = vi.fn(); + const entry = createRunEntry({ + endedAt: 4_000, + expectsCompletionMessage: true, + retainAttachmentsOnKeep: true, + }); + const runSubagentAnnounceFlow = vi.fn( + async (announceParams: { + onDeliveryResult?: (delivery: { + delivered: false; + path: "direct"; + error: string; + phases: Array<{ + phase: "direct-primary" | "queue-fallback"; + delivered: boolean; + path: "direct" | "none"; + error?: string; + }>; + }) => void; + }) => { + announceParams.onDeliveryResult?.({ + delivered: false, + path: "direct", + error: "UNAVAILABLE: requester wake failed", + phases: [ + { + phase: "direct-primary", + delivered: false, + path: "direct", + error: "UNAVAILABLE: requester wake failed", + }, + { + phase: "queue-fallback", + delivered: false, + path: "none", + }, + ], + }); + return false; + }, + ); + + const controller = createLifecycleController({ + entry, + persist, + runSubagentAnnounceFlow, + }); + + await expect( + controller.completeSubagentRun({ + runId: entry.runId, + endedAt: 4_000, + outcome: { status: "ok" }, + reason: SUBAGENT_ENDED_REASON_COMPLETE, + triggerCleanup: true, + }), + ).resolves.toBeUndefined(); + + expect(taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith( + expect.objectContaining({ + runId: entry.runId, + runtime: "subagent", + sessionKey: entry.childSessionKey, + deliveryStatus: "failed", + error: + "UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed", + }), + ); + expect(entry.lastAnnounceDeliveryError).toBe( + "UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed", + ); + expect(entry.cleanupCompletedAt).toBeTypeOf("number"); + expect(persist).toHaveBeenCalled(); + }); + it("skips browser cleanup when steer restart suppresses cleanup flow", async () => { const entry = createRunEntry({ expectsCompletionMessage: false, diff --git a/src/agents/subagent-registry-lifecycle.ts b/src/agents/subagent-registry-lifecycle.ts index 83777211798..ab6959b0757 100644 --- a/src/agents/subagent-registry-lifecycle.ts +++ b/src/agents/subagent-registry-lifecycle.ts @@ -10,6 +10,7 @@ import { } from "../tasks/detached-task-runtime.js"; import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js"; import { retireSessionMcpRuntimeForSessionKey } from "./pi-bundle-mcp-tools.js"; +import type { SubagentAnnounceDeliveryResult } from "./subagent-announce-dispatch.js"; import { type SubagentRunOutcome, withSubagentOutcomeTiming } from "./subagent-announce-output.js"; import { SUBAGENT_ENDED_REASON_COMPLETE, @@ -126,10 +127,25 @@ export function createSubagentRegistryLifecycleController(params: { return name ? { name, message } : { message }; }; + const formatAnnounceDeliveryError = (delivery: SubagentAnnounceDeliveryResult): string => { + const errors = [ + delivery.error, + ...(delivery.phases ?? []).map((phase) => + phase.error ? `${phase.phase}: ${phase.error}` : undefined, + ), + ] + .map((value) => value?.trim()) + .filter((value): value is string => Boolean(value)); + return errors.length > 0 + ? [...new Set(errors)].join("; ") + : `delivery path ${delivery.path} did not complete`; + }; + const safeSetSubagentTaskDeliveryStatus = (args: { runId: string; childSessionKey: string; deliveryStatus: "delivered" | "failed"; + deliveryError?: string; }) => { try { setDetachedTaskDeliveryStatusByRunId({ @@ -137,6 +153,7 @@ export function createSubagentRegistryLifecycleController(params: { runtime: "subagent", sessionKey: args.childSessionKey, deliveryStatus: args.deliveryStatus, + error: args.deliveryStatus === "failed" ? args.deliveryError : undefined, }); } catch (err) { params.warn("failed to update subagent background task delivery state", { @@ -301,6 +318,7 @@ export function createSubagentRegistryLifecycleController(params: { runId: giveUpParams.runId, childSessionKey: giveUpParams.entry.childSessionKey, deliveryStatus: "failed", + deliveryError: giveUpParams.entry.lastAnnounceDeliveryError, }); giveUpParams.entry.wakeOnDescendantSettle = undefined; giveUpParams.entry.fallbackFrozenResultText = undefined; @@ -464,6 +482,7 @@ export function createSubagentRegistryLifecycleController(params: { childSessionKey: entry.childSessionKey, deliveryStatus: "delivered", }); + entry.lastAnnounceDeliveryError = undefined; entry.wakeOnDescendantSettle = undefined; entry.fallbackFrozenResultText = undefined; entry.fallbackFrozenResultCapturedAt = undefined; @@ -518,6 +537,7 @@ export function createSubagentRegistryLifecycleController(params: { runId, childSessionKey: entry.childSessionKey, deliveryStatus: "failed", + deliveryError: entry.lastAnnounceDeliveryError, }); entry.wakeOnDescendantSettle = undefined; entry.fallbackFrozenResultText = undefined; @@ -571,7 +591,11 @@ export function createSubagentRegistryLifecycleController(params: { return false; } const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin); + let latestDeliveryError = entry.lastAnnounceDeliveryError; const finalizeAnnounceCleanup = (didAnnounce: boolean) => { + if (!didAnnounce && latestDeliveryError) { + entry.lastAnnounceDeliveryError = latestDeliveryError; + } void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce).catch((err) => { defaultRuntime.log(`[warn] subagent cleanup finalize failed (${runId}): ${String(err)}`); const current = params.runs.get(runId); @@ -603,6 +627,21 @@ export function createSubagentRegistryLifecycleController(params: { spawnMode: entry.spawnMode, expectsCompletionMessage: entry.expectsCompletionMessage, wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true, + onDeliveryResult: (delivery) => { + if (delivery.delivered) { + if (entry.lastAnnounceDeliveryError !== undefined) { + entry.lastAnnounceDeliveryError = undefined; + params.persist(); + } + latestDeliveryError = undefined; + return; + } + latestDeliveryError = formatAnnounceDeliveryError(delivery); + if (entry.lastAnnounceDeliveryError !== latestDeliveryError) { + entry.lastAnnounceDeliveryError = latestDeliveryError; + params.persist(); + } + }, }) .then((didAnnounce) => { finalizeAnnounceCleanup(didAnnounce); diff --git a/src/agents/subagent-registry.types.ts b/src/agents/subagent-registry.types.ts index bec68b9389a..3274b91fbbb 100644 --- a/src/agents/subagent-registry.types.ts +++ b/src/agents/subagent-registry.types.ts @@ -30,6 +30,7 @@ export type SubagentRunRecord = { expectsCompletionMessage?: boolean; announceRetryCount?: number; lastAnnounceRetryAt?: number; + lastAnnounceDeliveryError?: string; endedReason?: SubagentLifecycleEndedReason; wakeOnDescendantSettle?: boolean; frozenResultText?: string | null; diff --git a/src/tasks/detached-task-runtime-contract.ts b/src/tasks/detached-task-runtime-contract.ts index 82455277e3e..07112a234ce 100644 --- a/src/tasks/detached-task-runtime-contract.ts +++ b/src/tasks/detached-task-runtime-contract.ts @@ -96,6 +96,7 @@ export type DetachedTaskDeliveryStatusParams = { runtime?: TaskRuntime; sessionKey?: string; deliveryStatus: TaskDeliveryStatus; + error?: string; }; export type DetachedTaskCancelParams = { diff --git a/src/tasks/task-executor.ts b/src/tasks/task-executor.ts index ce09f698eda..f8058189745 100644 --- a/src/tasks/task-executor.ts +++ b/src/tasks/task-executor.ts @@ -211,6 +211,7 @@ export function setDetachedTaskDeliveryStatusByRunId(params: { runtime?: TaskRuntime; sessionKey?: string; deliveryStatus: TaskDeliveryStatus; + error?: string; }) { return setTaskRunDeliveryStatusByRunId(params); } diff --git a/src/tasks/task-registry.ts b/src/tasks/task-registry.ts index f2a08464bcb..a571bf9827c 100644 --- a/src/tasks/task-registry.ts +++ b/src/tasks/task-registry.ts @@ -1672,15 +1672,20 @@ function updateTaskDeliveryByRunId(params: { runtime?: TaskRuntime; sessionKey?: string; deliveryStatus: TaskDeliveryStatus; + error?: string; }) { ensureTaskRegistryReady(); + const patch: Partial = { + deliveryStatus: params.deliveryStatus, + }; + if (params.error !== undefined) { + patch.error = params.error; + } return updateTasksByRunId({ runId: params.runId, runtime: params.runtime, sessionKey: params.sessionKey, - patch: { - deliveryStatus: params.deliveryStatus, - }, + patch, }); } @@ -1772,6 +1777,7 @@ export function setTaskRunDeliveryStatusByRunId(params: { runtime?: TaskRuntime; sessionKey?: string; deliveryStatus: TaskDeliveryStatus; + error?: string; }) { return updateTaskDeliveryByRunId(params); }