fix(agents): mediate async media completions

This commit is contained in:
Peter Steinberger
2026-05-05 01:29:59 +01:00
parent 48ff390953
commit 2d8fa23447
16 changed files with 272 additions and 142 deletions

View File

@@ -60,6 +60,7 @@ async function deliverSlackThreadAnnouncement(params: {
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
sourceTool?: string;
}) {
__testing.setDepsForTest({
callGateway: params.callGateway,
@@ -88,6 +89,7 @@ async function deliverSlackThreadAnnouncement(params: {
bestEffortDeliver: true,
directIdempotencyKey: params.directIdempotencyKey,
internalEvents: params.internalEvents,
sourceTool: params.sourceTool,
});
}
@@ -95,6 +97,7 @@ async function deliverDiscordDirectMessageCompletion(params: {
callGateway: typeof runtimeCallGateway;
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
sourceTool?: string;
}) {
const origin = {
channel: "discord",
@@ -125,6 +128,7 @@ async function deliverDiscordDirectMessageCompletion(params: {
bestEffortDeliver: true,
directIdempotencyKey: "announce-dm-fallback-empty",
internalEvents: params.internalEvents,
sourceTool: params.sourceTool,
});
}
@@ -185,6 +189,7 @@ async function deliverSlackChannelAnnouncement(params: {
queueEmbeddedPiMessage?: (sessionId: string, message: string) => boolean;
sendMessage?: typeof runtimeSendMessage;
internalEvents?: AgentInternalEvent[];
sourceTool?: string;
}) {
const origin = {
channel: "slack",
@@ -219,6 +224,7 @@ async function deliverSlackChannelAnnouncement(params: {
bestEffortDeliver: true,
directIdempotencyKey: params.directIdempotencyKey,
internalEvents: params.internalEvents,
sourceTool: params.sourceTool,
});
}
@@ -1141,6 +1147,119 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
expect(sendMessage).not.toHaveBeenCalled();
});
it("delivers generated media completions through the announce agent in automatic DMs", async () => {
const callGateway = createGatewayMock({
result: {
payloads: [
{
text: "The track is ready.",
mediaUrls: ["/tmp/generated-night-drive.mp3"],
},
],
},
});
const sendMessage = createSendMessageMock();
const result = await deliverDiscordDirectMessageCompletion({
callGateway,
sendMessage,
sourceTool: "music_generate",
internalEvents: [
{
type: "task_completion",
source: "music_generation",
childSessionKey: "music_generate:task-123",
childSessionId: "task-123",
announceType: "music generation task",
taskLabel: "night-drive synthwave",
status: "ok",
statusLabel: "completed successfully",
result: "Generated 1 track.\nMEDIA:/tmp/generated-night-drive.mp3",
mediaUrls: ["/tmp/generated-night-drive.mp3"],
replyInstruction:
"Tell the user the music is ready. If visible source delivery requires the message tool, send it there with the generated media attached.",
},
],
});
expect(result).toEqual(
expect.objectContaining({
delivered: true,
path: "direct",
}),
);
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "agent",
params: expect.objectContaining({
deliver: true,
channel: "discord",
accountId: "acct-1",
to: "dm:U123",
threadId: undefined,
}),
}),
);
expect(sendMessage).not.toHaveBeenCalled();
});
it("requires message-tool delivery for generated media completions in default group routes", async () => {
const callGateway = createGatewayMock({
result: {
payloads: [
{
text: "The track is ready.",
},
],
},
});
const sendMessage = createSendMessageMock();
const result = await deliverSlackChannelAnnouncement({
callGateway,
sendMessage,
sessionId: "requester-session-channel",
isActive: false,
expectsCompletionMessage: true,
directIdempotencyKey: "announce-channel-media-message-tool",
sourceTool: "music_generate",
internalEvents: [
{
type: "task_completion",
source: "music_generation",
childSessionKey: "music_generate:task-123",
childSessionId: "task-123",
announceType: "music generation task",
taskLabel: "night-drive synthwave",
status: "ok",
statusLabel: "completed successfully",
result: "Generated 1 track.\nMEDIA:/tmp/generated-night-drive.mp3",
mediaUrls: ["/tmp/generated-night-drive.mp3"],
replyInstruction:
"Tell the user the music is ready. If visible source delivery requires the message tool, send it there with the generated media attached.",
},
],
});
expect(result).toEqual(
expect.objectContaining({
delivered: false,
path: "direct",
}),
);
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "agent",
params: expect.objectContaining({
deliver: false,
channel: "slack",
accountId: "acct-1",
to: "channel:C123",
threadId: undefined,
}),
}),
);
expect(sendMessage).not.toHaveBeenCalled();
});
it("uses a direct channel fallback when announce-agent returns no visible output", async () => {
const callGateway = createGatewayMock({
result: {

View File

@@ -1,8 +1,10 @@
import { normalizeChatType } from "../channels/chat-type.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { ConversationRef } from "../infra/outbound/session-binding-service.js";
import { stringifyRouteThreadId } from "../plugin-sdk/channel-route.js";
import { normalizeAccountId } from "../routing/session-key.js";
import { defaultRuntime } from "../runtime.js";
import { deriveSessionChatTypeFromKey } from "../sessions/session-chat-type-shared.js";
import { isCronSessionKey } from "../sessions/session-key-utils.js";
import { normalizeOptionalLowercaseString } from "../shared/string-coerce.js";
import {
@@ -57,6 +59,7 @@ const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000;
const MIN_COMPLETION_INTEGRITY_RESULT_LENGTH = 120;
const MIN_COMPLETION_INTEGRITY_PREFIX_LENGTH = 24;
const MAX_COMPLETION_INTEGRITY_PREFIX_RATIO = 0.8;
const AGENT_MEDIATED_COMPLETION_TOOLS = new Set(["music_generate", "video_generate"]);
type SubagentAnnounceDeliveryDeps = {
callGateway: typeof callGateway;
@@ -671,6 +674,76 @@ function shouldSendCompletionFallback(response: unknown, completionFallbackText:
return hasIncompleteCompletionPrefix(response, completionFallbackText);
}
function requiresAgentMediatedCompletionDelivery(params: {
expectsCompletionMessage: boolean;
sourceTool?: string;
}): boolean {
return (
params.expectsCompletionMessage &&
AGENT_MEDIATED_COMPLETION_TOOLS.has(normalizeOptionalLowercaseString(params.sourceTool) ?? "")
);
}
function hasGatewayAgentMessagingToolDelivery(response: unknown): boolean {
const result = getGatewayAgentResult(response);
return Boolean(result && hasMessagingToolDeliveryEvidence(result));
}
function inferCompletionChatType(params: {
requesterSessionKey: string;
targetRequesterSessionKey: string;
requesterEntry?: {
chatType?: string | null;
origin?: { chatType?: string | null };
};
directOrigin?: DeliveryContext;
requesterSessionOrigin?: DeliveryContext;
}): "direct" | "group" | "channel" | "unknown" {
const explicit = normalizeChatType(
params.requesterEntry?.chatType ?? params.requesterEntry?.origin?.chatType ?? undefined,
);
if (explicit) {
return explicit;
}
for (const key of [params.targetRequesterSessionKey, params.requesterSessionKey]) {
const derived = deriveSessionChatTypeFromKey(key);
if (derived !== "unknown") {
return derived;
}
}
const target = params.directOrigin?.to ?? params.requesterSessionOrigin?.to;
if (target?.startsWith("group:")) {
return "group";
}
if (target?.startsWith("channel:")) {
return "channel";
}
if (target?.startsWith("dm:")) {
return "direct";
}
return "unknown";
}
function completionRequiresMessageToolDelivery(params: {
cfg: OpenClawConfig;
requesterSessionKey: string;
targetRequesterSessionKey: string;
requesterEntry?: {
chatType?: string | null;
origin?: { chatType?: string | null };
};
directOrigin?: DeliveryContext;
requesterSessionOrigin?: DeliveryContext;
}): boolean {
const chatType = inferCompletionChatType(params);
if (chatType === "group" || chatType === "channel") {
const configuredMode =
params.cfg.messages?.groupChat?.visibleReplies ?? params.cfg.messages?.visibleReplies;
return configuredMode !== "automatic";
}
return params.cfg.messages?.visibleReplies === "message_tool";
}
async function sendCompletionFallback(params: {
cfg: OpenClawConfig;
channel?: string;
@@ -731,6 +804,7 @@ function stripNonDeliverableChannelForCompletionOrigin(
}
async function sendSubagentAnnounceDirectly(params: {
requesterSessionKey: string;
targetRequesterSessionKey: string;
triggerMessage: string;
internalEvents?: AgentInternalEvent[];
@@ -778,6 +852,7 @@ async function sendSubagentAnnounceDirectly(params: {
const sessionOnlyOrigin = effectiveDirectOrigin?.channel
? effectiveDirectOrigin
: requesterSessionOrigin;
const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry;
const deliveryTarget = !params.requesterIsSubagent
? resolveExternalBestEffortDeliveryTarget({
channel: effectiveDirectOrigin?.channel,
@@ -794,12 +869,26 @@ async function sendSubagentAnnounceDirectly(params: {
isGatewayMessageChannel(normalizedSessionOnlyOriginChannel)
? normalizedSessionOnlyOriginChannel
: undefined;
const agentMediatedCompletion = requiresAgentMediatedCompletionDelivery({
expectsCompletionMessage: params.expectsCompletionMessage,
sourceTool: params.sourceTool,
});
const requiresMessageToolDelivery =
agentMediatedCompletion &&
completionRequiresMessageToolDelivery({
cfg,
requesterSessionKey: params.requesterSessionKey,
targetRequesterSessionKey: params.targetRequesterSessionKey,
requesterEntry,
directOrigin: effectiveDirectOrigin,
requesterSessionOrigin,
});
const shouldDeliverAgentFinal = deliveryTarget.deliver && !requiresMessageToolDelivery;
const completionFallbackText =
params.expectsCompletionMessage && deliveryTarget.deliver
params.expectsCompletionMessage && shouldDeliverAgentFinal && !agentMediatedCompletion
? extractThreadCompletionFallbackText(params.internalEvents)
: "";
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
const requesterEntry = loadRequesterSessionEntry(params.targetRequesterSessionKey).entry;
const requesterQueueSettings = resolveQueueSettings({
cfg,
channel:
@@ -830,6 +919,13 @@ async function sendSubagentAnnounceDirectly(params: {
};
}
if (requesterActivity.isActive) {
if (agentMediatedCompletion) {
return {
delivered: false,
path: "direct",
error: "active requester session could not be woken",
};
}
try {
const didFallback = await sendCompletionFallback({
cfg,
@@ -882,21 +978,21 @@ async function sendSubagentAnnounceDirectly(params: {
params: {
sessionKey: canonicalRequesterSessionKey,
message: params.triggerMessage,
deliver: deliveryTarget.deliver,
deliver: shouldDeliverAgentFinal,
bestEffortDeliver: params.bestEffortDeliver,
internalEvents: params.internalEvents,
channel: deliveryTarget.deliver ? deliveryTarget.channel : sessionOnlyOriginChannel,
accountId: deliveryTarget.deliver
channel: shouldDeliverAgentFinal ? deliveryTarget.channel : sessionOnlyOriginChannel,
accountId: shouldDeliverAgentFinal
? deliveryTarget.accountId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.accountId
: undefined,
to: deliveryTarget.deliver
to: shouldDeliverAgentFinal
? deliveryTarget.to
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.to
: undefined,
threadId: deliveryTarget.deliver
threadId: shouldDeliverAgentFinal
? deliveryTarget.threadId
: sessionOnlyOriginChannel
? sessionOnlyOrigin?.threadId
@@ -917,6 +1013,9 @@ async function sendSubagentAnnounceDirectly(params: {
if (isPermanentAnnounceDeliveryError(err)) {
throw err;
}
if (agentMediatedCompletion) {
throw err;
}
let didFallback = false;
try {
didFallback = await sendCompletionFallback({
@@ -967,6 +1066,28 @@ async function sendSubagentAnnounceDirectly(params: {
}
}
if (
requiresMessageToolDelivery &&
!hasGatewayAgentMessagingToolDelivery(directAnnounceResponse)
) {
return {
delivered: false,
path: "direct",
error: "completion agent did not deliver through the message tool",
};
}
if (
agentMediatedCompletion &&
shouldDeliverAgentFinal &&
!hasVisibleGatewayAgentPayload(directAnnounceResponse)
) {
return {
delivered: false,
path: "direct",
error: "completion agent did not produce a visible reply",
};
}
return {
delivered: true,
path: "direct",
@@ -1020,6 +1141,7 @@ export async function deliverSubagentAnnouncement(params: {
}),
direct: async () =>
await sendSubagentAnnounceDirectly({
requesterSessionKey: params.requesterSessionKey,
targetRequesterSessionKey: params.targetRequesterSessionKey,
triggerMessage: params.triggerMessage,
internalEvents: params.internalEvents,

View File

@@ -1,18 +1,14 @@
import crypto from "node:crypto";
import { parseReplyDirectives } from "../../auto-reply/reply/reply-directives.js";
import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { clearAgentRunContext, registerAgentRunContext } from "../../infra/agent-events.js";
import { formatErrorMessage } from "../../infra/errors.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
import {
completeTaskRunByRunId,
createRunningTaskRun,
failTaskRunByRunId,
recordTaskRunProgressByRunId,
} from "../../tasks/detached-task-runtime.js";
import { sendMessage } from "../../tasks/task-registry-delivery-runtime.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
import { INTERNAL_MESSAGE_CHANNEL } from "../../utils/message-channel.js";
import { formatAgentInternalEventsForPrompt, type AgentInternalEvent } from "../internal-events.js";
@@ -65,8 +61,6 @@ type WakeMediaGenerationTaskCompletionParams = {
statsLine?: string;
};
type MediaGenerationDirectCompletionDelivery = "config" | "disabled";
function touchMediaGenerationTaskRunContext(handle: MediaGenerationTaskHandle) {
registerAgentRunContext(handle.runId, {
sessionKey: handle.requesterSessionKey,
@@ -230,12 +224,7 @@ function buildMediaGenerationReplyInstruction(params: {
completionLabel: string;
}) {
if (params.status === "ok") {
return [
`A completed ${params.completionLabel} generation task is ready for user delivery.`,
`Prefer the message tool for delivery: use action="send" to the current/original chat, put your user-facing caption in message, attach each generated file with path/filePath using the exact path from the result, then reply ONLY: ${SILENT_REPLY_TOKEN}.`,
`If you cannot use the message tool, reply in your normal assistant voice and include the exact MEDIA: lines from the result so OpenClaw attaches the finished ${params.completionLabel}.`,
"Keep internal task/session details private and do not copy the internal event text verbatim.",
].join(" ");
return `Tell the user the ${params.completionLabel} is ready. If visible source delivery requires the message tool, send it there with the generated media attached.`;
}
return [
`${params.completionLabel[0]?.toUpperCase() ?? "T"}${params.completionLabel.slice(1)} generation task failed.`,
@@ -244,54 +233,6 @@ function buildMediaGenerationReplyInstruction(params: {
].join(" ");
}
function isAsyncMediaDirectSendEnabled(params: {
config: OpenClawConfig | undefined;
directCompletionDelivery: MediaGenerationDirectCompletionDelivery;
}): boolean {
if (params.directCompletionDelivery === "disabled") {
return false;
}
return params.config?.tools?.media?.asyncCompletion?.directSend === true;
}
async function maybeDeliverMediaGenerationResultDirectly(params: {
handle: MediaGenerationTaskHandle;
status: "ok" | "error";
result: string;
idempotencyKey: string;
}): Promise<boolean> {
const origin = params.handle.requesterOrigin;
const channel = origin?.channel?.trim();
const to = origin?.to?.trim();
if (!channel || !to) {
return false;
}
const parsed = parseReplyDirectives(params.result);
const content = parsed.text.trim();
const mediaUrls = parsed.mediaUrls?.filter((entry) => entry.trim().length > 0);
const requesterAgentId = parseAgentSessionKey(params.handle.requesterSessionKey)?.agentId;
await sendMessage({
channel,
to,
accountId: origin?.accountId,
threadId: origin?.threadId,
content:
content ||
(params.status === "ok"
? `Finished ${params.handle.taskLabel}.`
: "Background media generation failed."),
...(mediaUrls?.length ? { mediaUrls } : {}),
agentId: requesterAgentId,
idempotencyKey: params.idempotencyKey,
mirror: {
sessionKey: params.handle.requesterSessionKey,
agentId: requesterAgentId,
idempotencyKey: params.idempotencyKey,
},
});
return true;
}
async function wakeMediaGenerationTaskCompletion(params: {
config?: OpenClawConfig;
handle: MediaGenerationTaskHandle | null;
@@ -304,37 +245,11 @@ async function wakeMediaGenerationTaskCompletion(params: {
announceType: string;
toolName: string;
completionLabel: string;
directCompletionDelivery: MediaGenerationDirectCompletionDelivery;
}) {
if (!params.handle) {
return;
}
const announceId = `${params.toolName}:${params.handle.taskId}:${params.status}`;
if (
isAsyncMediaDirectSendEnabled({
config: params.config,
directCompletionDelivery: params.directCompletionDelivery,
})
) {
try {
const deliveredDirect = await maybeDeliverMediaGenerationResultDirectly({
handle: params.handle,
status: params.status,
result: params.result,
idempotencyKey: announceId,
});
if (deliveredDirect) {
return;
}
} catch (error) {
log.warn("Media generation direct completion delivery failed; falling back to announce", {
taskId: params.handle.taskId,
runId: params.handle.runId,
toolName: params.toolName,
error,
});
}
}
const internalEvents: AgentInternalEvent[] = [
{
type: "task_completion",
@@ -397,7 +312,6 @@ export function createMediaGenerationTaskLifecycle(params: {
eventSource: AgentInternalEvent["source"];
announceType: string;
completionLabel: string;
directCompletionDelivery?: MediaGenerationDirectCompletionDelivery;
}) {
return {
createTaskRun(runParams: CreateMediaGenerationTaskRunParams): MediaGenerationTaskHandle | null {
@@ -435,7 +349,6 @@ export function createMediaGenerationTaskLifecycle(params: {
announceType: params.announceType,
toolName: params.toolName,
completionLabel: params.completionLabel,
directCompletionDelivery: params.directCompletionDelivery ?? "config",
});
},
};

View File

@@ -191,7 +191,7 @@ export function expectFallbackMediaAnnouncement({
status: "ok",
result: expect.stringContaining(resultMediaPath),
mediaUrls,
replyInstruction: expect.stringContaining("Prefer the message tool for delivery"),
replyInstruction: expect.stringContaining("Tell the user"),
}),
]),
}),

View File

@@ -17,7 +17,6 @@ const musicGenerationTaskLifecycle = createMediaGenerationTaskLifecycle({
eventSource: "music_generation",
announceType: "music generation task",
completionLabel: "music",
directCompletionDelivery: "disabled",
});
export const createMusicGenerationTaskRun = (

View File

@@ -4,7 +4,6 @@ import { VIDEO_GENERATION_TASK_KIND } from "../video-generation-task-status.js";
import {
announceDeliveryMocks,
createMediaCompletionFixture,
expectDirectMediaSend,
expectFallbackMediaAnnouncement,
expectQueuedTaskRun,
expectRecordedTaskProgress,
@@ -175,34 +174,7 @@ describe("video generate background helpers", () => {
expect(announceDeliveryMocks.deliverSubagentAnnouncement).toHaveBeenCalled();
});
it("delivers completed video directly to the requester channel when enabled", async () => {
taskDeliveryRuntimeMocks.sendMessage.mockResolvedValue({
channel: "discord",
messageId: "msg-1",
});
await wakeVideoGenerationTaskCompletion({
...createMediaCompletionFixture({
directSend: true,
runId: "tool:video_generate:abc",
taskLabel: "friendly lobster surfing",
result: "Generated 1 video.\nMEDIA:/tmp/generated-lobster.mp4",
}),
});
expectDirectMediaSend({
sendMessageMock: taskDeliveryRuntimeMocks.sendMessage,
channel: "discord",
to: "channel:1",
threadId: "thread-1",
content: "Generated 1 video.",
mediaUrls: ["/tmp/generated-lobster.mp4"],
});
expect(announceDeliveryMocks.deliverSubagentAnnouncement).not.toHaveBeenCalled();
});
it("falls back to a video-generation completion event when direct delivery fails", async () => {
taskDeliveryRuntimeMocks.sendMessage.mockRejectedValue(new Error("discord upload failed"));
it("keeps completed video agent-mediated even when direct send is enabled", async () => {
announceDeliveryMocks.deliverSubagentAnnouncement.mockResolvedValue({
delivered: true,
path: "direct",
@@ -218,6 +190,7 @@ describe("video generate background helpers", () => {
}),
});
expect(taskDeliveryRuntimeMocks.sendMessage).not.toHaveBeenCalled();
expectFallbackMediaAnnouncement({
deliverAnnouncementMock: announceDeliveryMocks.deliverSubagentAnnouncement,
requesterSessionKey: "agent:main:discord:direct:123",

View File

@@ -10202,7 +10202,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
type: "boolean",
title: "Async Media Completion Direct Send",
description:
"Enable direct channel sends for completed async media generation tasks that support direct completion delivery. Currently this applies to video generation; music generation always stays requester-session mediated. Default off so detached media completion uses the requester session wake path.",
"Deprecated compatibility flag. Async media generation completions are requester-session mediated so the agent can decide how to tell the user and use the message tool when source delivery requires it.",
},
},
additionalProperties: false,
@@ -25350,7 +25350,7 @@ export const GENERATED_BASE_CONFIG_SCHEMA: BaseConfigSchemaResponse = {
},
"tools.media.asyncCompletion.directSend": {
label: "Async Media Completion Direct Send",
help: "Enable direct channel sends for completed async media generation tasks that support direct completion delivery. Currently this applies to video generation; music generation always stays requester-session mediated. Default off so detached media completion uses the requester session wake path.",
help: "Deprecated compatibility flag. Async media generation completions are requester-session mediated so the agent can decide how to tell the user and use the message tool when source delivery requires it.",
tags: ["storage", "media", "tools"],
},
"tools.media.audio.enabled": {

View File

@@ -700,7 +700,7 @@ export const FIELD_HELP: Record<string, string> = {
"tools.media.concurrency":
"Maximum number of concurrent media understanding operations per turn across image, audio, and video tasks. Lower this in resource-constrained deployments to prevent CPU/network saturation.",
"tools.media.asyncCompletion.directSend":
"Enable direct channel sends for completed async media generation tasks that support direct completion delivery. Currently this applies to video generation; music generation always stays requester-session mediated. Default off so detached media completion uses the requester session wake path.",
"Deprecated compatibility flag. Async media generation completions are requester-session mediated so the agent can decide how to tell the user and use the message tool when source delivery requires it.",
"tools.media.image.enabled":
"Enable image understanding so attached or referenced images can be interpreted into textual context. Disable if you need text-only operation or want to avoid image-processing cost.",
"tools.media.image.maxBytes":

View File

@@ -145,9 +145,8 @@ export type MediaToolsConfig = {
concurrency?: number;
asyncCompletion?: {
/**
* Enable direct channel sends for async media generation tasks that support
* direct completion delivery. Music generation stays requester-session mediated.
* Default: false.
* Deprecated compatibility flag. Async media generation completions stay
* requester-session mediated so source delivery policy remains agent-owned.
*/
directSend?: boolean;
};