From 855872986ea36a42e91800d7c571bcb4fc2bab26 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 24 Apr 2026 15:20:26 +0100 Subject: [PATCH] fix(release): harden subagent completion delivery --- ...dia-file-path-no-file-download.e2e.test.ts | 2 +- src/agents/subagent-announce-delivery.ts | 93 ++++++++++++++----- src/agents/subagent-announce-output.ts | 16 +--- .../subagent-announce.format.e2e.test.ts | 11 +++ src/agents/subagent-announce.ts | 13 ++- ...registry.lifecycle-retry-grace.e2e.test.ts | 12 +++ src/gateway/server-startup-plugins.ts | 23 +++-- 7 files changed, 122 insertions(+), 48 deletions(-) diff --git a/extensions/telegram/src/bot.media.downloads-media-file-path-no-file-download.e2e.test.ts b/extensions/telegram/src/bot.media.downloads-media-file-path-no-file-download.e2e.test.ts index 39888ecf9bc..16838548096 100644 --- a/extensions/telegram/src/bot.media.downloads-media-file-path-no-file-download.e2e.test.ts +++ b/extensions/telegram/src/bot.media.downloads-media-file-path-no-file-download.e2e.test.ts @@ -207,7 +207,7 @@ describe("telegram inbound media", () => { }, }, assert: (payload: Record) => { - expect(payload.Body).toContain("Eiffel Tower"); + expect(payload.Body).toContain("48.858844"); expect(payload.LocationName).toBe("Eiffel Tower"); expect(payload.LocationAddress).toBe("Champ de Mars, Paris"); expect(payload.LocationSource).toBe("place"); diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index da2946e4900..6855a0e4be4 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -75,6 +75,65 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = { let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = defaultSubagentAnnounceDeliveryDeps; +function resolveBoundConversationOrigin(params: { + bindingConversation: ConversationRef & { parentConversationId?: string }; + requesterConversation?: ConversationRef; + requesterOrigin?: DeliveryContext; +}): DeliveryContext { + const conversation = params.bindingConversation; + const conversationId = conversation.conversationId?.trim() ?? ""; + const parentConversationId = conversation.parentConversationId?.trim() ?? ""; + const requesterConversationId = params.requesterConversation?.conversationId?.trim() ?? ""; + const requesterTo = params.requesterOrigin?.to?.trim(); + if ( + conversation.channel === "matrix" && + parentConversationId && + requesterConversationId && + parentConversationId === requesterConversationId && + requesterTo + ) { + return { + channel: conversation.channel, + accountId: conversation.accountId, + to: requesterTo, + ...(conversationId ? { threadId: conversationId } : {}), + }; + } + + const boundTarget = resolveConversationDeliveryTarget({ + channel: conversation.channel, + conversationId, + parentConversationId, + }); + if ( + requesterTo && + conversationId && + requesterConversationId && + conversationId.toLowerCase() === requesterConversationId.toLowerCase() + ) { + return { + channel: conversation.channel, + accountId: conversation.accountId, + to: requesterTo, + threadId: + boundTarget.threadId ?? + (params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== "" + ? String(params.requesterOrigin.threadId) + : undefined), + }; + } + return { + channel: conversation.channel, + accountId: conversation.accountId, + to: boundTarget.to, + threadId: + boundTarget.threadId ?? + (params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== "" + ? String(params.requesterOrigin.threadId) + : undefined), + }; +} + function resolveRequesterSessionActivity(requesterSessionKey: string) { const activity = subagentAnnounceDeliveryDeps.getRequesterSessionActivity(requesterSessionKey); if (activity.sessionId || activity.isActive) { @@ -243,22 +302,12 @@ export async function resolveSubagentCompletionOrigin(params: { failClosed: false, }); if (route.mode === "bound" && route.binding) { - const boundTarget = resolveConversationDeliveryTarget({ - channel: route.binding.conversation.channel, - conversationId: route.binding.conversation.conversationId, - parentConversationId: route.binding.conversation.parentConversationId, - }); return mergeDeliveryContext( - { - channel: route.binding.conversation.channel, - accountId: route.binding.conversation.accountId, - to: boundTarget.to, - threadId: - boundTarget.threadId ?? - (requesterOrigin?.threadId != null && requesterOrigin.threadId !== "" - ? String(requesterOrigin.threadId) - : undefined), - }, + resolveBoundConversationOrigin({ + bindingConversation: route.binding.conversation, + requesterConversation, + requesterOrigin, + }), requesterOrigin, ); } @@ -489,7 +538,7 @@ async function sendSubagentAnnounceDirectly(params: { ? normalizedSessionOnlyOriginChannel : undefined; const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey); - if (params.expectsCompletionMessage && requesterActivity.isActive) { + if (params.expectsCompletionMessage && requesterActivity.sessionId) { const woke = requesterActivity.sessionId ? subagentAnnounceDeliveryDeps.queueEmbeddedPiMessage( requesterActivity.sessionId, @@ -502,11 +551,13 @@ async function sendSubagentAnnounceDirectly(params: { path: "steered", }; } - return { - delivered: false, - path: "direct", - error: "active requester session could not be woken", - }; + if (requesterActivity.isActive) { + return { + delivered: false, + path: "direct", + error: "active requester session could not be woken", + }; + } } if (params.signal?.aborted) { return { diff --git a/src/agents/subagent-announce-output.ts b/src/agents/subagent-announce-output.ts index 856df046d39..880aac5788f 100644 --- a/src/agents/subagent-announce-output.ts +++ b/src/agents/subagent-announce-output.ts @@ -63,10 +63,6 @@ export type SubagentRunOutcome = { elapsedMs?: number; }; -function isFailedOutcome(outcome?: SubagentRunOutcome): boolean { - return outcome?.status === "error"; -} - function readFiniteNumber(value: number | undefined): number | undefined { return typeof value === "number" && Number.isFinite(value) ? value : undefined; } @@ -156,6 +152,9 @@ function extractSubagentOutputText(message: unknown): string { const role = (message as { role?: unknown }).role; const content = (message as { content?: unknown }).content; if (role === "assistant") { + if (typeof content === "string") { + return sanitizeTextContent(content); + } return extractAssistantText(message) ?? ""; } if (role === "toolResult" || role === "tool") { @@ -257,9 +256,6 @@ function selectSubagentOutputText( snapshot: SubagentOutputSnapshot, outcome?: SubagentRunOutcome, ): string | undefined { - if (isFailedOutcome(outcome)) { - return undefined; - } if (snapshot.latestSilentText) { return snapshot.latestSilentText; } @@ -277,9 +273,6 @@ export async function readSubagentOutput( sessionKey: string, outcome?: SubagentRunOutcome, ): Promise { - if (isFailedOutcome(outcome)) { - return undefined; - } const history = await subagentAnnounceOutputDeps.callGateway({ method: "chat.history", params: { sessionKey, limit: 100 }, @@ -359,9 +352,6 @@ export async function captureSubagentCompletionReply( sessionKey: string, options?: { waitForReply?: boolean; outcome?: SubagentRunOutcome }, ): Promise { - if (isFailedOutcome(options?.outcome)) { - return undefined; - } return await captureSubagentCompletionReplyUsing({ sessionKey, waitForReply: options?.waitForReply, diff --git a/src/agents/subagent-announce.format.e2e.test.ts b/src/agents/subagent-announce.format.e2e.test.ts index 10bca5ccd99..ed619ae02a2 100644 --- a/src/agents/subagent-announce.format.e2e.test.ts +++ b/src/agents/subagent-announce.format.e2e.test.ts @@ -249,6 +249,17 @@ describe("subagent announce formatting", () => { callGateway: async >( req: Parameters[0], ) => (await callGatewaySpy(req)) as T, + loadConfig: () => configOverride, + getRequesterSessionActivity: (requesterSessionKey: string) => { + const entry = loadSessionStoreFixture()[requesterSessionKey]; + const sessionId = entry?.sessionId; + return { + sessionId, + isActive: Boolean(sessionId && embeddedRunMock.isEmbeddedPiRunActive(sessionId)), + }; + }, + queueEmbeddedPiMessage: (sessionId: string, text: string) => + embeddedRunMock.queueEmbeddedPiMessage(sessionId, text), }); loadSessionStoreSpy.mockReset().mockImplementation(() => loadSessionStoreFixture()); resolveAgentIdFromSessionKeySpy.mockReset().mockImplementation(() => "main"); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index b9043c6bf03..a6302402da6 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -286,10 +286,11 @@ export async function runSubagentAnnounceFlow(params: { outcome = { status: "unknown" }; } const failedTerminalOutcome = outcome.status === "error"; + const allowFailedOutputCapture = + !failedTerminalOutcome || (!params.roundOneReply && !params.fallbackReply); if (failedTerminalOutcome) { reply = undefined; } - let requesterDepth = getSubagentDepthFromSessionStore(targetRequesterSessionKey); const requesterIsInternalSession = () => requesterDepth >= 1 || isCronSessionKey(targetRequesterSessionKey); @@ -370,17 +371,19 @@ export async function runSubagentAnnounceFlow(params: { } } - if (!childCompletionFindings && !failedTerminalOutcome) { - const fallbackReply = normalizeOptionalString(params.fallbackReply); + if (!childCompletionFindings) { + const fallbackReply = failedTerminalOutcome + ? undefined + : normalizeOptionalString(params.fallbackReply); const fallbackIsSilent = Boolean(fallbackReply) && (isAnnounceSkip(fallbackReply) || isSilentReplyText(fallbackReply, SILENT_REPLY_TOKEN)); - if (!reply) { + if (!reply && allowFailedOutputCapture) { reply = await readSubagentOutput(params.childSessionKey, outcome); } - if (!reply?.trim()) { + if (!reply?.trim() && allowFailedOutputCapture) { reply = await readLatestSubagentOutputWithRetry({ sessionKey: params.childSessionKey, maxWaitMs: params.timeoutMs, diff --git a/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts b/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts index 9df935b084f..9b6c129d4c5 100644 --- a/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts +++ b/src/agents/subagent-registry.lifecycle-retry-grace.e2e.test.ts @@ -104,6 +104,10 @@ vi.mock("../plugins/hook-runner-global.js", () => ({ getGlobalHookRunner: vi.fn(() => null), })); +vi.mock("../browser-lifecycle-cleanup.js", () => ({ + cleanupBrowserSessionsForLifecycleEnd: vi.fn(async () => {}), +})); + vi.mock("./subagent-depth.js", () => ({ getSubagentDepthFromSessionStore: () => 0, })); @@ -178,6 +182,13 @@ describe("subagent registry lifecycle error grace", () => { subagentAnnounceDeliveryTesting.setDepsForTest({ callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway, loadConfig: loadConfigMock as typeof import("../config/config.js").loadConfig, + getRequesterSessionActivity: (requesterSessionKey: string) => { + const entry = sessionStore[requesterSessionKey]; + return { + sessionId: entry?.sessionId, + isActive: false, + }; + }, }); subagentAnnounceOutputTesting.setDepsForTest({ callGateway: callGatewayMock as typeof import("../gateway/call.js").callGateway, @@ -457,6 +468,7 @@ describe("subagent registry lifecycle error grace", () => { emitLifecycleEvent("run-refresh-silent", { phase: "end", endedAt }); await flushAsync(); await waitForCleanupHandledFalse("run-refresh-silent"); + await waitForFrozenResultText("run-refresh-silent", "All work complete, final summary"); setAssistantOutput("agent:main:subagent:refresh-silent", "NO_REPLY"); emitLifecycleEvent( diff --git a/src/gateway/server-startup-plugins.ts b/src/gateway/server-startup-plugins.ts index 8fc07df1928..7666c6c3026 100644 --- a/src/gateway/server-startup-plugins.ts +++ b/src/gateway/server-startup-plugins.ts @@ -35,19 +35,26 @@ export async function prepareGatewayPluginBootstrap(params: { } : params.cfgAtStart; - if (!params.minimalTestGateway) { - await Promise.all([ + const shouldRunStartupMaintenance = + !params.minimalTestGateway || startupMaintenanceConfig.channels !== undefined; + if (shouldRunStartupMaintenance) { + const startupTasks = [ runChannelPluginStartupMaintenance({ cfg: startupMaintenanceConfig, env: process.env, log: params.log, }), - runStartupSessionMigration({ - cfg: params.cfgAtStart, - env: process.env, - log: params.log, - }), - ]); + ]; + if (!params.minimalTestGateway) { + startupTasks.push( + runStartupSessionMigration({ + cfg: params.cfgAtStart, + env: process.env, + log: params.log, + }), + ); + } + await Promise.all(startupTasks); } initSubagentRegistry();