fix(agents): fallback subagent completion delivery

This commit is contained in:
Vincent Koc
2026-04-26 00:29:06 -07:00
parent 8741a86f93
commit a1b6567059
9 changed files with 309 additions and 20 deletions

View File

@@ -115,6 +115,48 @@ async function deliverDiscordDirectMessageCompletion(params: {
});
}
async function deliverTelegramDirectMessageCompletion(params: {
callGateway: typeof runtimeCallGateway;
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
isActive?: boolean;
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
}) {
const origin = {
channel: "telegram",
to: "123456789",
accountId: "bot-1",
};
__testing.setDepsForTest({
callGateway: params.callGateway,
getRequesterSessionActivity: () => ({
sessionId: "requester-session-telegram",
isActive: params.isActive === true,
}),
loadConfig: () => ({}) as never,
...(params.queueEmbeddedPiMessage
? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage }
: {}),
...(params.sendMessage ? { sendMessage: params.sendMessage } : {}),
});
return deliverSubagentAnnouncement({
requesterSessionKey: "agent:main:telegram:123456789",
targetRequesterSessionKey: "agent:main:telegram:123456789",
triggerMessage: "child done",
steerMessage: "child done",
requesterOrigin: origin,
requesterSessionOrigin: origin,
completionDirectOrigin: origin,
directOrigin: origin,
requesterIsSubagent: false,
expectsCompletionMessage: true,
bestEffortDeliver: true,
directIdempotencyKey: "announce-telegram-dm-fallback",
internalEvents: params.internalEvents,
});
}
async function deliverSlackChannelAnnouncement(params: {
callGateway: typeof runtimeCallGateway;
isActive: boolean;
@@ -510,6 +552,92 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
);
});
it("uses direct fallback for Telegram DMs when announce-agent delivery fails", async () => {
const callGateway = vi.fn(async () => {
throw new Error("UNAVAILABLE: requester wake failed");
}) as unknown as typeof runtimeCallGateway;
const sendMessage = createSendMessageMock();
const result = await deliverTelegramDirectMessageCompletion({
callGateway,
sendMessage,
internalEvents: [
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
childSessionId: "child-session-id",
announceType: "subagent task",
taskLabel: "telegram completion smoke",
status: "ok",
statusLabel: "completed successfully",
result: "child completion output",
replyInstruction: "Summarize the result.",
},
],
});
expect(result).toEqual(
expect.objectContaining({
delivered: true,
path: "direct-fallback",
}),
);
expect(sendMessage).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
accountId: "bot-1",
to: "123456789",
threadId: undefined,
content: "child completion output",
requesterSessionKey: "agent:main:telegram:123456789",
bestEffort: true,
idempotencyKey: "announce-telegram-dm-fallback",
}),
);
});
it("uses direct fallback when an active Telegram requester cannot be woken", async () => {
const callGateway = createGatewayMock();
const sendMessage = createSendMessageMock();
const queueEmbeddedPiMessage = vi.fn(() => false);
const result = await deliverTelegramDirectMessageCompletion({
callGateway,
sendMessage,
isActive: true,
queueEmbeddedPiMessage,
internalEvents: [
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
childSessionId: "child-session-id",
announceType: "subagent task",
taskLabel: "telegram wake smoke",
status: "ok",
statusLabel: "completed successfully",
result: "child completion output",
replyInstruction: "Summarize the result.",
},
],
});
expect(result).toEqual(
expect.objectContaining({
delivered: true,
path: "direct-fallback",
}),
);
expect(queueEmbeddedPiMessage).toHaveBeenCalledWith("requester-session-telegram", "child done");
expect(callGateway).not.toHaveBeenCalled();
expect(sendMessage).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
to: "123456789",
content: "child completion output",
}),
);
});
it("uses a direct thread fallback when announce-agent returns no visible output", async () => {
const callGateway = createGatewayMock({
result: {

View File

@@ -681,6 +681,10 @@ async function sendSubagentAnnounceDirectly(params: {
isGatewayMessageChannel(normalizedSessionOnlyOriginChannel)
? normalizedSessionOnlyOriginChannel
: undefined;
const completionFallbackText =
params.expectsCompletionMessage && deliveryTarget.deliver
? extractThreadCompletionFallbackText(params.internalEvents)
: "";
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
if (params.expectsCompletionMessage && requesterActivity.sessionId) {
const woke = requesterActivity.sessionId
@@ -696,6 +700,32 @@ async function sendSubagentAnnounceDirectly(params: {
};
}
if (requesterActivity.isActive) {
try {
const didFallback = await sendCompletionFallback({
cfg,
channel: deliveryTarget.channel,
to: deliveryTarget.to,
accountId: deliveryTarget.accountId,
threadId: deliveryTarget.threadId,
content: completionFallbackText,
requesterSessionKey: canonicalRequesterSessionKey,
bestEffortDeliver: params.bestEffortDeliver,
idempotencyKey: params.directIdempotencyKey,
signal: params.signal,
});
if (didFallback) {
return {
delivered: true,
path: resolveCompletionFallbackPath(deliveryTarget.threadId),
};
}
} catch (err) {
return {
delivered: false,
path: "direct",
error: `active requester session could not be woken; fallback send failed: ${summarizeDeliveryError(err)}`,
};
}
return {
delivered: false,
path: "direct",
@@ -709,10 +739,6 @@ async function sendSubagentAnnounceDirectly(params: {
path: "none",
};
}
const completionFallbackText =
params.expectsCompletionMessage && deliveryTarget.deliver
? extractThreadCompletionFallbackText(params.internalEvents)
: "";
let directAnnounceResponse: unknown;
try {
directAnnounceResponse = await runAnnounceDeliveryWithRetry({
@@ -758,22 +784,30 @@ async function sendSubagentAnnounceDirectly(params: {
}),
});
} catch (err) {
const didFallback = await sendCompletionFallback({
cfg,
channel: deliveryTarget.channel,
to: deliveryTarget.to,
accountId: deliveryTarget.accountId,
threadId: deliveryTarget.threadId,
content: deliveryTarget.threadId ? completionFallbackText : "",
requesterSessionKey: canonicalRequesterSessionKey,
bestEffortDeliver: params.bestEffortDeliver,
idempotencyKey: params.directIdempotencyKey,
signal: params.signal,
});
let didFallback = false;
try {
didFallback = await sendCompletionFallback({
cfg,
channel: deliveryTarget.channel,
to: deliveryTarget.to,
accountId: deliveryTarget.accountId,
threadId: deliveryTarget.threadId,
content: completionFallbackText,
requesterSessionKey: canonicalRequesterSessionKey,
bestEffortDeliver: params.bestEffortDeliver,
idempotencyKey: params.directIdempotencyKey,
signal: params.signal,
});
} catch (fallbackErr) {
throw new Error(
`${summarizeDeliveryError(err)}; fallback send failed: ${summarizeDeliveryError(fallbackErr)}`,
{ cause: fallbackErr },
);
}
if (didFallback) {
return {
delivered: true,
path: "direct-thread-fallback",
path: resolveCompletionFallbackPath(deliveryTarget.threadId),
};
}
throw err;

View File

@@ -23,6 +23,7 @@ import {
resolveSubagentAnnounceTimeoutMs,
resolveSubagentCompletionOrigin,
} from "./subagent-announce-delivery.js";
import type { SubagentAnnounceDeliveryResult } from "./subagent-announce-dispatch.js";
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
import {
applySubagentWaitOutcome,
@@ -244,6 +245,7 @@ export async function runSubagentAnnounceFlow(params: {
wakeOnDescendantSettle?: boolean;
signal?: AbortSignal;
bestEffortDeliver?: boolean;
onDeliveryResult?: (delivery: SubagentAnnounceDeliveryResult) => void;
}): Promise<boolean> {
let didAnnounce = false;
const expectsCompletionMessage = params.expectsCompletionMessage === true;
@@ -562,6 +564,7 @@ export async function runSubagentAnnounceFlow(params: {
directIdempotencyKey,
signal: params.signal,
});
params.onDeliveryResult?.(delivery);
didAnnounce = delivery.delivered;
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
defaultRuntime.error?.(

View File

@@ -569,6 +569,82 @@ describe("subagent registry lifecycle hardening", () => {
expect(persist).toHaveBeenCalled();
});
it("persists the concrete announce delivery error when cleanup gives up", async () => {
const persist = vi.fn();
const entry = createRunEntry({
endedAt: 4_000,
expectsCompletionMessage: true,
retainAttachmentsOnKeep: true,
});
const runSubagentAnnounceFlow = vi.fn(
async (announceParams: {
onDeliveryResult?: (delivery: {
delivered: false;
path: "direct";
error: string;
phases: Array<{
phase: "direct-primary" | "queue-fallback";
delivered: boolean;
path: "direct" | "none";
error?: string;
}>;
}) => void;
}) => {
announceParams.onDeliveryResult?.({
delivered: false,
path: "direct",
error: "UNAVAILABLE: requester wake failed",
phases: [
{
phase: "direct-primary",
delivered: false,
path: "direct",
error: "UNAVAILABLE: requester wake failed",
},
{
phase: "queue-fallback",
delivered: false,
path: "none",
},
],
});
return false;
},
);
const controller = createLifecycleController({
entry,
persist,
runSubagentAnnounceFlow,
});
await expect(
controller.completeSubagentRun({
runId: entry.runId,
endedAt: 4_000,
outcome: { status: "ok" },
reason: SUBAGENT_ENDED_REASON_COMPLETE,
triggerCleanup: true,
}),
).resolves.toBeUndefined();
expect(taskExecutorMocks.setDetachedTaskDeliveryStatusByRunId).toHaveBeenCalledWith(
expect.objectContaining({
runId: entry.runId,
runtime: "subagent",
sessionKey: entry.childSessionKey,
deliveryStatus: "failed",
error:
"UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed",
}),
);
expect(entry.lastAnnounceDeliveryError).toBe(
"UNAVAILABLE: requester wake failed; direct-primary: UNAVAILABLE: requester wake failed",
);
expect(entry.cleanupCompletedAt).toBeTypeOf("number");
expect(persist).toHaveBeenCalled();
});
it("skips browser cleanup when steer restart suppresses cleanup flow", async () => {
const entry = createRunEntry({
expectsCompletionMessage: false,

View File

@@ -10,6 +10,7 @@ import {
} from "../tasks/detached-task-runtime.js";
import { normalizeDeliveryContext } from "../utils/delivery-context.shared.js";
import { retireSessionMcpRuntimeForSessionKey } from "./pi-bundle-mcp-tools.js";
import type { SubagentAnnounceDeliveryResult } from "./subagent-announce-dispatch.js";
import { type SubagentRunOutcome, withSubagentOutcomeTiming } from "./subagent-announce-output.js";
import {
SUBAGENT_ENDED_REASON_COMPLETE,
@@ -126,10 +127,25 @@ export function createSubagentRegistryLifecycleController(params: {
return name ? { name, message } : { message };
};
const formatAnnounceDeliveryError = (delivery: SubagentAnnounceDeliveryResult): string => {
const errors = [
delivery.error,
...(delivery.phases ?? []).map((phase) =>
phase.error ? `${phase.phase}: ${phase.error}` : undefined,
),
]
.map((value) => value?.trim())
.filter((value): value is string => Boolean(value));
return errors.length > 0
? [...new Set(errors)].join("; ")
: `delivery path ${delivery.path} did not complete`;
};
const safeSetSubagentTaskDeliveryStatus = (args: {
runId: string;
childSessionKey: string;
deliveryStatus: "delivered" | "failed";
deliveryError?: string;
}) => {
try {
setDetachedTaskDeliveryStatusByRunId({
@@ -137,6 +153,7 @@ export function createSubagentRegistryLifecycleController(params: {
runtime: "subagent",
sessionKey: args.childSessionKey,
deliveryStatus: args.deliveryStatus,
error: args.deliveryStatus === "failed" ? args.deliveryError : undefined,
});
} catch (err) {
params.warn("failed to update subagent background task delivery state", {
@@ -301,6 +318,7 @@ export function createSubagentRegistryLifecycleController(params: {
runId: giveUpParams.runId,
childSessionKey: giveUpParams.entry.childSessionKey,
deliveryStatus: "failed",
deliveryError: giveUpParams.entry.lastAnnounceDeliveryError,
});
giveUpParams.entry.wakeOnDescendantSettle = undefined;
giveUpParams.entry.fallbackFrozenResultText = undefined;
@@ -464,6 +482,7 @@ export function createSubagentRegistryLifecycleController(params: {
childSessionKey: entry.childSessionKey,
deliveryStatus: "delivered",
});
entry.lastAnnounceDeliveryError = undefined;
entry.wakeOnDescendantSettle = undefined;
entry.fallbackFrozenResultText = undefined;
entry.fallbackFrozenResultCapturedAt = undefined;
@@ -518,6 +537,7 @@ export function createSubagentRegistryLifecycleController(params: {
runId,
childSessionKey: entry.childSessionKey,
deliveryStatus: "failed",
deliveryError: entry.lastAnnounceDeliveryError,
});
entry.wakeOnDescendantSettle = undefined;
entry.fallbackFrozenResultText = undefined;
@@ -571,7 +591,11 @@ export function createSubagentRegistryLifecycleController(params: {
return false;
}
const requesterOrigin = normalizeDeliveryContext(entry.requesterOrigin);
let latestDeliveryError = entry.lastAnnounceDeliveryError;
const finalizeAnnounceCleanup = (didAnnounce: boolean) => {
if (!didAnnounce && latestDeliveryError) {
entry.lastAnnounceDeliveryError = latestDeliveryError;
}
void finalizeSubagentCleanup(runId, entry.cleanup, didAnnounce).catch((err) => {
defaultRuntime.log(`[warn] subagent cleanup finalize failed (${runId}): ${String(err)}`);
const current = params.runs.get(runId);
@@ -603,6 +627,21 @@ export function createSubagentRegistryLifecycleController(params: {
spawnMode: entry.spawnMode,
expectsCompletionMessage: entry.expectsCompletionMessage,
wakeOnDescendantSettle: entry.wakeOnDescendantSettle === true,
onDeliveryResult: (delivery) => {
if (delivery.delivered) {
if (entry.lastAnnounceDeliveryError !== undefined) {
entry.lastAnnounceDeliveryError = undefined;
params.persist();
}
latestDeliveryError = undefined;
return;
}
latestDeliveryError = formatAnnounceDeliveryError(delivery);
if (entry.lastAnnounceDeliveryError !== latestDeliveryError) {
entry.lastAnnounceDeliveryError = latestDeliveryError;
params.persist();
}
},
})
.then((didAnnounce) => {
finalizeAnnounceCleanup(didAnnounce);

View File

@@ -30,6 +30,7 @@ export type SubagentRunRecord = {
expectsCompletionMessage?: boolean;
announceRetryCount?: number;
lastAnnounceRetryAt?: number;
lastAnnounceDeliveryError?: string;
endedReason?: SubagentLifecycleEndedReason;
wakeOnDescendantSettle?: boolean;
frozenResultText?: string | null;

View File

@@ -96,6 +96,7 @@ export type DetachedTaskDeliveryStatusParams = {
runtime?: TaskRuntime;
sessionKey?: string;
deliveryStatus: TaskDeliveryStatus;
error?: string;
};
export type DetachedTaskCancelParams = {

View File

@@ -211,6 +211,7 @@ export function setDetachedTaskDeliveryStatusByRunId(params: {
runtime?: TaskRuntime;
sessionKey?: string;
deliveryStatus: TaskDeliveryStatus;
error?: string;
}) {
return setTaskRunDeliveryStatusByRunId(params);
}

View File

@@ -1672,15 +1672,20 @@ function updateTaskDeliveryByRunId(params: {
runtime?: TaskRuntime;
sessionKey?: string;
deliveryStatus: TaskDeliveryStatus;
error?: string;
}) {
ensureTaskRegistryReady();
const patch: Partial<TaskRecord> = {
deliveryStatus: params.deliveryStatus,
};
if (params.error !== undefined) {
patch.error = params.error;
}
return updateTasksByRunId({
runId: params.runId,
runtime: params.runtime,
sessionKey: params.sessionKey,
patch: {
deliveryStatus: params.deliveryStatus,
},
patch,
});
}
@@ -1772,6 +1777,7 @@ export function setTaskRunDeliveryStatusByRunId(params: {
runtime?: TaskRuntime;
sessionKey?: string;
deliveryStatus: TaskDeliveryStatus;
error?: string;
}) {
return updateTaskDeliveryByRunId(params);
}