From 325e5e921fd64e3b4d8d56e396270cb2552a3d67 Mon Sep 17 00:00:00 2001 From: Luka Dolenc Date: Sat, 25 Apr 2026 03:49:50 +0200 Subject: [PATCH] fix: preserve thread-bound subagent completion fallback Preserve the requester-agent announce path for thread-bound subagent completions, while falling back to direct thread delivery only when the announce fails or produces no visible output.\n\nThanks @DolencLuka. --- CHANGELOG.md | 1 + .../subagent-announce-delivery.runtime.ts | 1 + src/agents/subagent-announce-delivery.test.ts | 233 +++++++++++++++- src/agents/subagent-announce-delivery.ts | 258 ++++++++++++++---- src/agents/subagent-announce-dispatch.ts | 7 +- 5 files changed, 444 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41375167489..6e951cfc8ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Discord/subagents: preserve thread-bound completion delivery by keeping the requester-agent announce path primary and falling back to direct thread sends only when the announce produces no visible output. (#71064) Thanks @DolencLuka. - Gateway/sessions: recover main-agent turns interrupted by a gateway restart from stale transcript-lock evidence, avoiding stuck `status: "running"` sessions without broad post-boot transcript scans. Fixes #70555. Thanks @bitloi. - Codex approvals: keep command approval responses within Codex app-server `availableDecisions`, including deny/cancel fallbacks for prompts that do not offer `decline`. (#71338) Thanks @Lucenx9. - Plugins/Google Meet: include live Chrome-node readiness in `googlemeet setup` and document the Parallels recovery checks, so stale node tokens or disconnected VM browsers are visible before an agent opens a meeting. Thanks @steipete. diff --git a/src/agents/subagent-announce-delivery.runtime.ts b/src/agents/subagent-announce-delivery.runtime.ts index cc99f692215..36848420600 100644 --- a/src/agents/subagent-announce-delivery.runtime.ts +++ b/src/agents/subagent-announce-delivery.runtime.ts @@ -8,6 +8,7 @@ export { export { callGateway } from "../gateway/call.js"; export { resolveQueueSettings } from "../auto-reply/reply/queue.js"; export { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best-effort-delivery.js"; +export { sendMessage } from "../infra/outbound/message.js"; export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js"; export { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js"; export { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index 38525a8d1cf..09f12ac55bc 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -1,6 +1,14 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { __testing, deliverSubagentAnnouncement } from "./subagent-announce-delivery.js"; -import { callGateway as runtimeCallGateway } from "./subagent-announce-delivery.runtime.js"; +import type { AgentInternalEvent } from "./internal-events.js"; +import { + __testing, + deliverSubagentAnnouncement, + extractThreadCompletionFallbackText, +} from "./subagent-announce-delivery.js"; +import { + callGateway as runtimeCallGateway, + sendMessage as runtimeSendMessage, +} from "./subagent-announce-delivery.runtime.js"; import { resolveAnnounceOrigin } from "./subagent-announce-origin.js"; afterEach(() => { @@ -14,8 +22,18 @@ const slackThreadOrigin = { threadId: "171.222", } as const; -function createGatewayMock() { - return vi.fn(async () => ({}) as Record) as unknown as typeof runtimeCallGateway; +function createGatewayMock(response: Record = {}) { + return vi.fn(async () => response) as unknown as typeof runtimeCallGateway; +} + +function createSendMessageMock() { + return vi.fn(async () => ({ + channel: "slack", + to: "channel:C123", + via: "direct" as const, + mediaUrl: null, + result: { messageId: "msg-1" }, + })) as unknown as typeof runtimeSendMessage; } async function deliverSlackThreadAnnouncement(params: { @@ -25,6 +43,8 @@ async function deliverSlackThreadAnnouncement(params: { expectsCompletionMessage: boolean; directIdempotencyKey: string; queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean; + sendMessage?: typeof runtimeSendMessage; + internalEvents?: AgentInternalEvent[]; }) { __testing.setDepsForTest({ callGateway: params.callGateway, @@ -36,6 +56,7 @@ async function deliverSlackThreadAnnouncement(params: { ...(params.queueEmbeddedPiMessage ? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage } : {}), + ...(params.sendMessage ? { sendMessage: params.sendMessage } : {}), }); return deliverSubagentAnnouncement({ @@ -51,6 +72,7 @@ async function deliverSlackThreadAnnouncement(params: { expectsCompletionMessage: params.expectsCompletionMessage, bestEffortDeliver: true, directIdempotencyKey: params.directIdempotencyKey, + internalEvents: params.internalEvents, }); } @@ -163,6 +185,153 @@ describe("deliverSubagentAnnouncement completion delivery", () => { ); }); + it("keeps announce-agent delivery primary for dormant completion events with child output", async () => { + const callGateway = createGatewayMock({ + result: { + payloads: [{ text: "requester voice completion" }], + }, + }); + const sendMessage = createSendMessageMock(); + const result = await deliverSlackThreadAnnouncement({ + callGateway, + sendMessage, + sessionId: "requester-session-4", + isActive: false, + expectsCompletionMessage: true, + directIdempotencyKey: "announce-thread-fallback-1", + internalEvents: [ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + childSessionId: "child-session-id", + announceType: "subagent task", + taskLabel: "thread completion smoke", + status: "ok", + statusLabel: "completed successfully", + result: "child completion output", + replyInstruction: "Summarize the result.", + }, + ], + }); + + expect(result).toEqual( + expect.objectContaining({ + delivered: true, + path: "direct", + }), + ); + expect(callGateway).toHaveBeenCalledWith( + expect.objectContaining({ + method: "agent", + params: expect.objectContaining({ + deliver: true, + channel: "slack", + accountId: "acct-1", + to: "channel:C123", + threadId: "171.222", + bestEffortDeliver: true, + internalEvents: expect.any(Array), + }), + }), + ); + expect(sendMessage).not.toHaveBeenCalled(); + }); + + it("uses a direct thread fallback when announce-agent delivery fails", async () => { + const callGateway = vi.fn(async () => { + throw new Error("UNAVAILABLE: gateway lost final output"); + }) as unknown as typeof runtimeCallGateway; + const sendMessage = createSendMessageMock(); + const result = await deliverSlackThreadAnnouncement({ + callGateway, + sendMessage, + sessionId: "requester-session-4", + isActive: false, + expectsCompletionMessage: true, + directIdempotencyKey: "announce-thread-fallback-1", + internalEvents: [ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + childSessionId: "child-session-id", + announceType: "subagent task", + taskLabel: "thread completion smoke", + status: "ok", + statusLabel: "completed successfully", + result: "child completion output", + replyInstruction: "Summarize the result.", + }, + ], + }); + + expect(result).toEqual( + expect.objectContaining({ + delivered: true, + path: "direct-thread-fallback", + }), + ); + expect(callGateway).toHaveBeenCalled(); + expect(sendMessage).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "slack", + accountId: "acct-1", + to: "channel:C123", + threadId: "171.222", + content: "child completion output", + requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222", + bestEffort: true, + idempotencyKey: "announce-thread-fallback-1", + }), + ); + }); + + it("uses a direct thread fallback when announce-agent returns no visible output", async () => { + const callGateway = createGatewayMock({ + result: { + payloads: [], + }, + }); + const sendMessage = createSendMessageMock(); + const result = await deliverSlackThreadAnnouncement({ + callGateway, + sendMessage, + sessionId: "requester-session-4", + isActive: false, + expectsCompletionMessage: true, + directIdempotencyKey: "announce-thread-fallback-empty", + internalEvents: [ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + childSessionId: "child-session-id", + announceType: "subagent task", + taskLabel: "thread completion smoke", + status: "ok", + statusLabel: "completed successfully", + result: "child completion output", + replyInstruction: "Summarize the result.", + }, + ], + }); + + expect(result).toEqual( + expect.objectContaining({ + delivered: true, + path: "direct-thread-fallback", + }), + ); + expect(callGateway).toHaveBeenCalled(); + expect(sendMessage).toHaveBeenCalledWith( + expect.objectContaining({ + content: "child completion output", + idempotencyKey: "announce-thread-fallback-empty", + }), + ); + }); + it("keeps direct external delivery for non-completion announces", async () => { const callGateway = createGatewayMock(); await deliverSlackThreadAnnouncement({ @@ -188,3 +357,59 @@ describe("deliverSubagentAnnouncement completion delivery", () => { ); }); }); + +describe("extractThreadCompletionFallbackText", () => { + it("prefers task completion result text", () => { + expect( + extractThreadCompletionFallbackText([ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + announceType: "subagent task", + taskLabel: "sample task", + status: "ok", + statusLabel: "completed successfully", + result: "final child result", + replyInstruction: "Summarize the result.", + }, + ]), + ).toBe("final child result"); + }); + + it("falls back to task and status labels when result text is empty", () => { + expect( + extractThreadCompletionFallbackText([ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + announceType: "subagent task", + taskLabel: "sample task", + status: "ok", + statusLabel: "completed successfully", + result: " ", + replyInstruction: "Summarize the result.", + }, + ]), + ).toBe("sample task: completed successfully"); + }); + + it("falls back to the task label when result and status label are empty", () => { + expect( + extractThreadCompletionFallbackText([ + { + type: "task_completion", + source: "subagent", + childSessionKey: "agent:worker:subagent:child", + announceType: "subagent task", + taskLabel: "sample task", + status: "ok", + statusLabel: " ", + result: " ", + replyInstruction: "Summarize the result.", + }, + ]), + ).toBe("sample task"); + }); +}); diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 6855a0e4be4..8df48e7b7cc 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -31,6 +31,7 @@ import { resolveExternalBestEffortDeliveryTarget, resolveQueueSettings, resolveStorePath, + sendMessage, } from "./subagent-announce-delivery.runtime.js"; import { runSubagentAnnounceDispatch, @@ -55,6 +56,7 @@ type SubagentAnnounceDeliveryDeps = { isActive: boolean; }; queueEmbeddedPiMessage: typeof queueEmbeddedPiMessage; + sendMessage: typeof sendMessage; }; const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = { @@ -70,6 +72,7 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = { }; }, queueEmbeddedPiMessage, + sendMessage, }; let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = @@ -105,6 +108,14 @@ function resolveBoundConversationOrigin(params: { conversationId, parentConversationId, }); + const inferredThreadId = + boundTarget.threadId ?? + (parentConversationId && parentConversationId !== conversationId + ? conversationId + : undefined) ?? + (params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== "" + ? String(params.requesterOrigin.threadId) + : undefined); if ( requesterTo && conversationId && @@ -115,22 +126,14 @@ function resolveBoundConversationOrigin(params: { channel: conversation.channel, accountId: conversation.accountId, to: requesterTo, - threadId: - boundTarget.threadId ?? - (params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== "" - ? String(params.requesterOrigin.threadId) - : undefined), + threadId: inferredThreadId, }; } return { channel: conversation.channel, accountId: conversation.accountId, to: boundTarget.to, - threadId: - boundTarget.threadId ?? - (params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== "" - ? String(params.requesterOrigin.threadId) - : undefined), + threadId: inferredThreadId, }; } @@ -478,6 +481,111 @@ async function maybeQueueSubagentAnnounce(params: { return "none"; } +export function extractThreadCompletionFallbackText(internalEvents?: AgentInternalEvent[]): string { + if (!internalEvents || internalEvents.length === 0) { + return ""; + } + for (const event of internalEvents) { + if (event.type !== "task_completion") { + continue; + } + const result = event.result.trim(); + if (result) { + return result; + } + const statusLabel = event.statusLabel.trim(); + const taskLabel = event.taskLabel.trim(); + if (statusLabel && taskLabel) { + return `${taskLabel}: ${statusLabel}`; + } + if (statusLabel) { + return statusLabel; + } + if (taskLabel) { + return taskLabel; + } + } + return ""; +} + +function hasVisibleGatewayAgentPayload(response: unknown): boolean { + const result = + response && typeof response === "object" && "result" in response + ? (response as { result?: unknown }).result + : undefined; + const payloads = + result && typeof result === "object" && "payloads" in result + ? (result as { payloads?: unknown }).payloads + : undefined; + if (!Array.isArray(payloads)) { + return false; + } + return payloads.some((payload) => { + if (!payload || typeof payload !== "object") { + return false; + } + const record = payload as { + text?: unknown; + mediaUrl?: unknown; + mediaUrls?: unknown; + presentation?: unknown; + interactive?: unknown; + channelData?: unknown; + }; + const text = typeof record.text === "string" ? record.text.trim() : ""; + const mediaUrl = typeof record.mediaUrl === "string" ? record.mediaUrl.trim() : ""; + const mediaUrls = Array.isArray(record.mediaUrls) + ? record.mediaUrls.some((item) => typeof item === "string" && item.trim()) + : false; + return Boolean( + text || + mediaUrl || + mediaUrls || + record.presentation || + record.interactive || + record.channelData, + ); + }); +} + +async function sendThreadCompletionFallback(params: { + cfg: OpenClawConfig; + channel?: string; + to?: string; + accountId?: string; + threadId?: string; + content: string; + requesterSessionKey: string; + bestEffortDeliver?: boolean; + idempotencyKey: string; + signal?: AbortSignal; +}): Promise { + const channel = params.channel?.trim(); + const to = params.to?.trim(); + const content = params.content.trim(); + if (!channel || !to || !params.threadId || !content) { + return false; + } + await runAnnounceDeliveryWithRetry({ + operation: "completion direct thread fallback send", + signal: params.signal, + run: async () => + await subagentAnnounceDeliveryDeps.sendMessage({ + cfg: params.cfg, + channel, + to, + accountId: params.accountId, + threadId: params.threadId, + content, + requesterSessionKey: params.requesterSessionKey, + bestEffort: params.bestEffortDeliver, + idempotencyKey: params.idempotencyKey, + abortSignal: params.signal, + }), + }); + return true; +} + async function sendSubagentAnnounceDirectly(params: { targetRequesterSessionKey: string; triggerMessage: string; @@ -565,48 +673,96 @@ async function sendSubagentAnnounceDirectly(params: { path: "none", }; } - await runAnnounceDeliveryWithRetry({ - operation: params.expectsCompletionMessage - ? "completion direct announce agent call" - : "direct announce agent call", - signal: params.signal, - run: async () => - await subagentAnnounceDeliveryDeps.callGateway({ - method: "agent", - params: { - sessionKey: canonicalRequesterSessionKey, - message: params.triggerMessage, - deliver: deliveryTarget.deliver, - bestEffortDeliver: params.bestEffortDeliver, - internalEvents: params.internalEvents, - channel: deliveryTarget.deliver ? deliveryTarget.channel : sessionOnlyOriginChannel, - accountId: deliveryTarget.deliver - ? deliveryTarget.accountId - : sessionOnlyOriginChannel - ? sessionOnlyOrigin?.accountId - : undefined, - to: deliveryTarget.deliver - ? deliveryTarget.to - : sessionOnlyOriginChannel - ? sessionOnlyOrigin?.to - : undefined, - threadId: deliveryTarget.deliver - ? deliveryTarget.threadId - : sessionOnlyOriginChannel - ? sessionOnlyOrigin?.threadId - : undefined, - inputProvenance: { - kind: "inter_session", - sourceSessionKey: params.sourceSessionKey, - sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL, - sourceTool: params.sourceTool ?? "subagent_announce", + const threadCompletionFallbackText = + params.expectsCompletionMessage && deliveryTarget.deliver && deliveryTarget.threadId + ? extractThreadCompletionFallbackText(params.internalEvents) + : ""; + let directAnnounceResponse: unknown; + try { + directAnnounceResponse = await runAnnounceDeliveryWithRetry({ + operation: params.expectsCompletionMessage + ? "completion direct announce agent call" + : "direct announce agent call", + signal: params.signal, + run: async () => + await subagentAnnounceDeliveryDeps.callGateway({ + method: "agent", + params: { + sessionKey: canonicalRequesterSessionKey, + message: params.triggerMessage, + deliver: deliveryTarget.deliver, + bestEffortDeliver: params.bestEffortDeliver, + internalEvents: params.internalEvents, + channel: deliveryTarget.deliver ? deliveryTarget.channel : sessionOnlyOriginChannel, + accountId: deliveryTarget.deliver + ? deliveryTarget.accountId + : sessionOnlyOriginChannel + ? sessionOnlyOrigin?.accountId + : undefined, + to: deliveryTarget.deliver + ? deliveryTarget.to + : sessionOnlyOriginChannel + ? sessionOnlyOrigin?.to + : undefined, + threadId: deliveryTarget.deliver + ? deliveryTarget.threadId + : sessionOnlyOriginChannel + ? sessionOnlyOrigin?.threadId + : undefined, + inputProvenance: { + kind: "inter_session", + sourceSessionKey: params.sourceSessionKey, + sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL, + sourceTool: params.sourceTool ?? "subagent_announce", + }, + idempotencyKey: params.directIdempotencyKey, }, - idempotencyKey: params.directIdempotencyKey, - }, - expectFinal: true, - timeoutMs: announceTimeoutMs, - }), - }); + expectFinal: true, + timeoutMs: announceTimeoutMs, + }), + }); + } catch (err) { + const didFallback = await sendThreadCompletionFallback({ + cfg, + channel: deliveryTarget.channel, + to: deliveryTarget.to, + accountId: deliveryTarget.accountId, + threadId: deliveryTarget.threadId, + content: threadCompletionFallbackText, + requesterSessionKey: canonicalRequesterSessionKey, + bestEffortDeliver: params.bestEffortDeliver, + idempotencyKey: params.directIdempotencyKey, + signal: params.signal, + }); + if (didFallback) { + return { + delivered: true, + path: "direct-thread-fallback", + }; + } + throw err; + } + + if (threadCompletionFallbackText && !hasVisibleGatewayAgentPayload(directAnnounceResponse)) { + const didFallback = await sendThreadCompletionFallback({ + cfg, + channel: deliveryTarget.channel, + to: deliveryTarget.to, + accountId: deliveryTarget.accountId, + threadId: deliveryTarget.threadId, + content: threadCompletionFallbackText, + requesterSessionKey: canonicalRequesterSessionKey, + bestEffortDeliver: params.bestEffortDeliver, + idempotencyKey: params.directIdempotencyKey, + signal: params.signal, + }); + if (didFallback) { + return { + delivered: true, + path: "direct-thread-fallback", + }; + } + } return { delivered: true, diff --git a/src/agents/subagent-announce-dispatch.ts b/src/agents/subagent-announce-dispatch.ts index 48ce0b835da..66377a145e9 100644 --- a/src/agents/subagent-announce-dispatch.ts +++ b/src/agents/subagent-announce-dispatch.ts @@ -1,4 +1,9 @@ -export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none"; +export type SubagentDeliveryPath = + | "queued" + | "steered" + | "direct" + | "direct-thread-fallback" + | "none"; export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none" | "dropped";