fix: preserve thread-bound subagent completion fallback

Preserve the requester-agent announce path for thread-bound subagent completions, while falling back to direct thread delivery only when the announce fails or produces no visible output.\n\nThanks @DolencLuka.
This commit is contained in:
Luka Dolenc
2026-04-25 03:49:50 +02:00
committed by GitHub
parent 5865197ec1
commit 325e5e921f
5 changed files with 444 additions and 56 deletions

View File

@@ -63,6 +63,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Discord/subagents: preserve thread-bound completion delivery by keeping the requester-agent announce path primary and falling back to direct thread sends only when the announce produces no visible output. (#71064) Thanks @DolencLuka.
- Gateway/sessions: recover main-agent turns interrupted by a gateway restart from stale transcript-lock evidence, avoiding stuck `status: "running"` sessions without broad post-boot transcript scans. Fixes #70555. Thanks @bitloi.
- Codex approvals: keep command approval responses within Codex app-server `availableDecisions`, including deny/cancel fallbacks for prompts that do not offer `decline`. (#71338) Thanks @Lucenx9.
- Plugins/Google Meet: include live Chrome-node readiness in `googlemeet setup` and document the Parallels recovery checks, so stale node tokens or disconnected VM browsers are visible before an agent opens a meeting. Thanks @steipete.

View File

@@ -8,6 +8,7 @@ export {
export { callGateway } from "../gateway/call.js";
export { resolveQueueSettings } from "../auto-reply/reply/queue.js";
export { resolveExternalBestEffortDeliveryTarget } from "../infra/outbound/best-effort-delivery.js";
export { sendMessage } from "../infra/outbound/message.js";
export { createBoundDeliveryRouter } from "../infra/outbound/bound-delivery-router.js";
export { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js";
export { getGlobalHookRunner } from "../plugins/hook-runner-global.js";

View File

@@ -1,6 +1,14 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { __testing, deliverSubagentAnnouncement } from "./subagent-announce-delivery.js";
import { callGateway as runtimeCallGateway } from "./subagent-announce-delivery.runtime.js";
import type { AgentInternalEvent } from "./internal-events.js";
import {
__testing,
deliverSubagentAnnouncement,
extractThreadCompletionFallbackText,
} from "./subagent-announce-delivery.js";
import {
callGateway as runtimeCallGateway,
sendMessage as runtimeSendMessage,
} from "./subagent-announce-delivery.runtime.js";
import { resolveAnnounceOrigin } from "./subagent-announce-origin.js";
afterEach(() => {
@@ -14,8 +22,18 @@ const slackThreadOrigin = {
threadId: "171.222",
} as const;
function createGatewayMock() {
return vi.fn(async () => ({}) as Record<string, unknown>) as unknown as typeof runtimeCallGateway;
function createGatewayMock(response: Record<string, unknown> = {}) {
return vi.fn(async () => response) as unknown as typeof runtimeCallGateway;
}
function createSendMessageMock() {
return vi.fn(async () => ({
channel: "slack",
to: "channel:C123",
via: "direct" as const,
mediaUrl: null,
result: { messageId: "msg-1" },
})) as unknown as typeof runtimeSendMessage;
}
async function deliverSlackThreadAnnouncement(params: {
@@ -25,6 +43,8 @@ async function deliverSlackThreadAnnouncement(params: {
expectsCompletionMessage: boolean;
directIdempotencyKey: string;
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
}) {
__testing.setDepsForTest({
callGateway: params.callGateway,
@@ -36,6 +56,7 @@ async function deliverSlackThreadAnnouncement(params: {
...(params.queueEmbeddedPiMessage
? { queueEmbeddedPiMessage: params.queueEmbeddedPiMessage }
: {}),
...(params.sendMessage ? { sendMessage: params.sendMessage } : {}),
});
return deliverSubagentAnnouncement({
@@ -51,6 +72,7 @@ async function deliverSlackThreadAnnouncement(params: {
expectsCompletionMessage: params.expectsCompletionMessage,
bestEffortDeliver: true,
directIdempotencyKey: params.directIdempotencyKey,
internalEvents: params.internalEvents,
});
}
@@ -163,6 +185,153 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
);
});
it("keeps announce-agent delivery primary for dormant completion events with child output", async () => {
const callGateway = createGatewayMock({
result: {
payloads: [{ text: "requester voice completion" }],
},
});
const sendMessage = createSendMessageMock();
const result = await deliverSlackThreadAnnouncement({
callGateway,
sendMessage,
sessionId: "requester-session-4",
isActive: false,
expectsCompletionMessage: true,
directIdempotencyKey: "announce-thread-fallback-1",
internalEvents: [
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
childSessionId: "child-session-id",
announceType: "subagent task",
taskLabel: "thread completion smoke",
status: "ok",
statusLabel: "completed successfully",
result: "child completion output",
replyInstruction: "Summarize the result.",
},
],
});
expect(result).toEqual(
expect.objectContaining({
delivered: true,
path: "direct",
}),
);
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "agent",
params: expect.objectContaining({
deliver: true,
channel: "slack",
accountId: "acct-1",
to: "channel:C123",
threadId: "171.222",
bestEffortDeliver: true,
internalEvents: expect.any(Array),
}),
}),
);
expect(sendMessage).not.toHaveBeenCalled();
});
it("uses a direct thread fallback when announce-agent delivery fails", async () => {
const callGateway = vi.fn(async () => {
throw new Error("UNAVAILABLE: gateway lost final output");
}) as unknown as typeof runtimeCallGateway;
const sendMessage = createSendMessageMock();
const result = await deliverSlackThreadAnnouncement({
callGateway,
sendMessage,
sessionId: "requester-session-4",
isActive: false,
expectsCompletionMessage: true,
directIdempotencyKey: "announce-thread-fallback-1",
internalEvents: [
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
childSessionId: "child-session-id",
announceType: "subagent task",
taskLabel: "thread completion smoke",
status: "ok",
statusLabel: "completed successfully",
result: "child completion output",
replyInstruction: "Summarize the result.",
},
],
});
expect(result).toEqual(
expect.objectContaining({
delivered: true,
path: "direct-thread-fallback",
}),
);
expect(callGateway).toHaveBeenCalled();
expect(sendMessage).toHaveBeenCalledWith(
expect.objectContaining({
channel: "slack",
accountId: "acct-1",
to: "channel:C123",
threadId: "171.222",
content: "child completion output",
requesterSessionKey: "agent:main:slack:channel:C123:thread:171.222",
bestEffort: true,
idempotencyKey: "announce-thread-fallback-1",
}),
);
});
it("uses a direct thread fallback when announce-agent returns no visible output", async () => {
const callGateway = createGatewayMock({
result: {
payloads: [],
},
});
const sendMessage = createSendMessageMock();
const result = await deliverSlackThreadAnnouncement({
callGateway,
sendMessage,
sessionId: "requester-session-4",
isActive: false,
expectsCompletionMessage: true,
directIdempotencyKey: "announce-thread-fallback-empty",
internalEvents: [
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
childSessionId: "child-session-id",
announceType: "subagent task",
taskLabel: "thread completion smoke",
status: "ok",
statusLabel: "completed successfully",
result: "child completion output",
replyInstruction: "Summarize the result.",
},
],
});
expect(result).toEqual(
expect.objectContaining({
delivered: true,
path: "direct-thread-fallback",
}),
);
expect(callGateway).toHaveBeenCalled();
expect(sendMessage).toHaveBeenCalledWith(
expect.objectContaining({
content: "child completion output",
idempotencyKey: "announce-thread-fallback-empty",
}),
);
});
it("keeps direct external delivery for non-completion announces", async () => {
const callGateway = createGatewayMock();
await deliverSlackThreadAnnouncement({
@@ -188,3 +357,59 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
);
});
});
describe("extractThreadCompletionFallbackText", () => {
it("prefers task completion result text", () => {
expect(
extractThreadCompletionFallbackText([
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
announceType: "subagent task",
taskLabel: "sample task",
status: "ok",
statusLabel: "completed successfully",
result: "final child result",
replyInstruction: "Summarize the result.",
},
]),
).toBe("final child result");
});
it("falls back to task and status labels when result text is empty", () => {
expect(
extractThreadCompletionFallbackText([
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
announceType: "subagent task",
taskLabel: "sample task",
status: "ok",
statusLabel: "completed successfully",
result: " ",
replyInstruction: "Summarize the result.",
},
]),
).toBe("sample task: completed successfully");
});
it("falls back to the task label when result and status label are empty", () => {
expect(
extractThreadCompletionFallbackText([
{
type: "task_completion",
source: "subagent",
childSessionKey: "agent:worker:subagent:child",
announceType: "subagent task",
taskLabel: "sample task",
status: "ok",
statusLabel: " ",
result: " ",
replyInstruction: "Summarize the result.",
},
]),
).toBe("sample task");
});
});

View File

@@ -31,6 +31,7 @@ import {
resolveExternalBestEffortDeliveryTarget,
resolveQueueSettings,
resolveStorePath,
sendMessage,
} from "./subagent-announce-delivery.runtime.js";
import {
runSubagentAnnounceDispatch,
@@ -55,6 +56,7 @@ type SubagentAnnounceDeliveryDeps = {
isActive: boolean;
};
queueEmbeddedPiMessage: typeof queueEmbeddedPiMessage;
sendMessage: typeof sendMessage;
};
const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
@@ -70,6 +72,7 @@ const defaultSubagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps = {
};
},
queueEmbeddedPiMessage,
sendMessage,
};
let subagentAnnounceDeliveryDeps: SubagentAnnounceDeliveryDeps =
@@ -105,6 +108,14 @@ function resolveBoundConversationOrigin(params: {
conversationId,
parentConversationId,
});
const inferredThreadId =
boundTarget.threadId ??
(parentConversationId && parentConversationId !== conversationId
? conversationId
: undefined) ??
(params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== ""
? String(params.requesterOrigin.threadId)
: undefined);
if (
requesterTo &&
conversationId &&
@@ -115,22 +126,14 @@ function resolveBoundConversationOrigin(params: {
channel: conversation.channel,
accountId: conversation.accountId,
to: requesterTo,
threadId:
boundTarget.threadId ??
(params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== ""
? String(params.requesterOrigin.threadId)
: undefined),
threadId: inferredThreadId,
};
}
return {
channel: conversation.channel,
accountId: conversation.accountId,
to: boundTarget.to,
threadId:
boundTarget.threadId ??
(params.requesterOrigin?.threadId != null && params.requesterOrigin.threadId !== ""
? String(params.requesterOrigin.threadId)
: undefined),
threadId: inferredThreadId,
};
}
@@ -478,6 +481,111 @@ async function maybeQueueSubagentAnnounce(params: {
return "none";
}
export function extractThreadCompletionFallbackText(internalEvents?: AgentInternalEvent[]): string {
if (!internalEvents || internalEvents.length === 0) {
return "";
}
for (const event of internalEvents) {
if (event.type !== "task_completion") {
continue;
}
const result = event.result.trim();
if (result) {
return result;
}
const statusLabel = event.statusLabel.trim();
const taskLabel = event.taskLabel.trim();
if (statusLabel && taskLabel) {
return `${taskLabel}: ${statusLabel}`;
}
if (statusLabel) {
return statusLabel;
}
if (taskLabel) {
return taskLabel;
}
}
return "";
}
function hasVisibleGatewayAgentPayload(response: unknown): boolean {
const result =
response && typeof response === "object" && "result" in response
? (response as { result?: unknown }).result
: undefined;
const payloads =
result && typeof result === "object" && "payloads" in result
? (result as { payloads?: unknown }).payloads
: undefined;
if (!Array.isArray(payloads)) {
return false;
}
return payloads.some((payload) => {
if (!payload || typeof payload !== "object") {
return false;
}
const record = payload as {
text?: unknown;
mediaUrl?: unknown;
mediaUrls?: unknown;
presentation?: unknown;
interactive?: unknown;
channelData?: unknown;
};
const text = typeof record.text === "string" ? record.text.trim() : "";
const mediaUrl = typeof record.mediaUrl === "string" ? record.mediaUrl.trim() : "";
const mediaUrls = Array.isArray(record.mediaUrls)
? record.mediaUrls.some((item) => typeof item === "string" && item.trim())
: false;
return Boolean(
text ||
mediaUrl ||
mediaUrls ||
record.presentation ||
record.interactive ||
record.channelData,
);
});
}
async function sendThreadCompletionFallback(params: {
cfg: OpenClawConfig;
channel?: string;
to?: string;
accountId?: string;
threadId?: string;
content: string;
requesterSessionKey: string;
bestEffortDeliver?: boolean;
idempotencyKey: string;
signal?: AbortSignal;
}): Promise<boolean> {
const channel = params.channel?.trim();
const to = params.to?.trim();
const content = params.content.trim();
if (!channel || !to || !params.threadId || !content) {
return false;
}
await runAnnounceDeliveryWithRetry({
operation: "completion direct thread fallback send",
signal: params.signal,
run: async () =>
await subagentAnnounceDeliveryDeps.sendMessage({
cfg: params.cfg,
channel,
to,
accountId: params.accountId,
threadId: params.threadId,
content,
requesterSessionKey: params.requesterSessionKey,
bestEffort: params.bestEffortDeliver,
idempotencyKey: params.idempotencyKey,
abortSignal: params.signal,
}),
});
return true;
}
async function sendSubagentAnnounceDirectly(params: {
targetRequesterSessionKey: string;
triggerMessage: string;
@@ -565,48 +673,96 @@ async function sendSubagentAnnounceDirectly(params: {
path: "none",
};
}
await runAnnounceDeliveryWithRetry({
operation: params.expectsCompletionMessage
? "completion direct announce agent call"
: "direct announce agent call",
signal: params.signal,
run: async () =>
await subagentAnnounceDeliveryDeps.callGateway({
method: "agent",
params: {
sessionKey: canonicalRequesterSessionKey,
message: params.triggerMessage,
deliver: deliveryTarget.deliver,
bestEffortDeliver: params.bestEffortDeliver,
internalEvents: params.internalEvents,
channel: deliveryTarget.deliver ? deliveryTarget.channel : sessionOnlyOriginChannel,
accountId: deliveryTarget.deliver
? deliveryTarget.accountId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.accountId
: undefined,
to: deliveryTarget.deliver
? deliveryTarget.to
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.to
: undefined,
threadId: deliveryTarget.deliver
? deliveryTarget.threadId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.threadId
: undefined,
inputProvenance: {
kind: "inter_session",
sourceSessionKey: params.sourceSessionKey,
sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
sourceTool: params.sourceTool ?? "subagent_announce",
const threadCompletionFallbackText =
params.expectsCompletionMessage && deliveryTarget.deliver && deliveryTarget.threadId
? extractThreadCompletionFallbackText(params.internalEvents)
: "";
let directAnnounceResponse: unknown;
try {
directAnnounceResponse = await runAnnounceDeliveryWithRetry({
operation: params.expectsCompletionMessage
? "completion direct announce agent call"
: "direct announce agent call",
signal: params.signal,
run: async () =>
await subagentAnnounceDeliveryDeps.callGateway({
method: "agent",
params: {
sessionKey: canonicalRequesterSessionKey,
message: params.triggerMessage,
deliver: deliveryTarget.deliver,
bestEffortDeliver: params.bestEffortDeliver,
internalEvents: params.internalEvents,
channel: deliveryTarget.deliver ? deliveryTarget.channel : sessionOnlyOriginChannel,
accountId: deliveryTarget.deliver
? deliveryTarget.accountId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.accountId
: undefined,
to: deliveryTarget.deliver
? deliveryTarget.to
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.to
: undefined,
threadId: deliveryTarget.deliver
? deliveryTarget.threadId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.threadId
: undefined,
inputProvenance: {
kind: "inter_session",
sourceSessionKey: params.sourceSessionKey,
sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
sourceTool: params.sourceTool ?? "subagent_announce",
},
idempotencyKey: params.directIdempotencyKey,
},
idempotencyKey: params.directIdempotencyKey,
},
expectFinal: true,
timeoutMs: announceTimeoutMs,
}),
});
expectFinal: true,
timeoutMs: announceTimeoutMs,
}),
});
} catch (err) {
const didFallback = await sendThreadCompletionFallback({
cfg,
channel: deliveryTarget.channel,
to: deliveryTarget.to,
accountId: deliveryTarget.accountId,
threadId: deliveryTarget.threadId,
content: threadCompletionFallbackText,
requesterSessionKey: canonicalRequesterSessionKey,
bestEffortDeliver: params.bestEffortDeliver,
idempotencyKey: params.directIdempotencyKey,
signal: params.signal,
});
if (didFallback) {
return {
delivered: true,
path: "direct-thread-fallback",
};
}
throw err;
}
if (threadCompletionFallbackText && !hasVisibleGatewayAgentPayload(directAnnounceResponse)) {
const didFallback = await sendThreadCompletionFallback({
cfg,
channel: deliveryTarget.channel,
to: deliveryTarget.to,
accountId: deliveryTarget.accountId,
threadId: deliveryTarget.threadId,
content: threadCompletionFallbackText,
requesterSessionKey: canonicalRequesterSessionKey,
bestEffortDeliver: params.bestEffortDeliver,
idempotencyKey: params.directIdempotencyKey,
signal: params.signal,
});
if (didFallback) {
return {
delivered: true,
path: "direct-thread-fallback",
};
}
}
return {
delivered: true,

View File

@@ -1,4 +1,9 @@
export type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
export type SubagentDeliveryPath =
| "queued"
| "steered"
| "direct"
| "direct-thread-fallback"
| "none";
export type SubagentAnnounceQueueOutcome = "steered" | "queued" | "none" | "dropped";