fix: preserve media completion message-tool delivery (#82206)

* fix: preserve message-tool media completion delivery

* chore: update generated protocol models
This commit is contained in:
Peter Steinberger
2026-05-15 16:49:52 +01:00
committed by GitHub
parent 29b5563ccd
commit c6ddb1afb7
13 changed files with 165 additions and 4 deletions

View File

@@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Agents/media: preserve message-tool-only delivery for generated music and video completion handoffs, so group/channel completions do not finish without posting the generated attachment.
- LINE: acknowledge signed webhook events before agent processing so slow model replies do not cause LINE `request_timeout` delivery failures. Fixes #65375. Thanks @myericho.
- TTS: preserve channel-derived voice-note delivery for `/tts audio` replies even when the provider output is not natively voice-compatible. (#82174) Thanks @xuruiray.
- Codex/Lossless: keep Codex explicit compaction on native app-server threads while allowing Lossless through the context-engine slot; `openclaw doctor --fix` now migrates legacy `compaction.provider: "lossless-claw"` config to `plugins.slots.contextEngine`.

View File

@@ -751,6 +751,7 @@ public struct AgentParams: Codable, Sendable {
public let internalruntimehandoffid: String?
public let internalevents: [[String: AnyCodable]]?
public let inputprovenance: [String: AnyCodable]?
public let sourcereplydeliverymode: AnyCodable?
public let voicewaketrigger: String?
public let idempotencykey: String
public let label: String?
@@ -788,6 +789,7 @@ public struct AgentParams: Codable, Sendable {
internalruntimehandoffid: String?,
internalevents: [[String: AnyCodable]]?,
inputprovenance: [String: AnyCodable]?,
sourcereplydeliverymode: AnyCodable?,
voicewaketrigger: String?,
idempotencykey: String,
label: String?)
@@ -824,6 +826,7 @@ public struct AgentParams: Codable, Sendable {
self.internalruntimehandoffid = internalruntimehandoffid
self.internalevents = internalevents
self.inputprovenance = inputprovenance
self.sourcereplydeliverymode = sourcereplydeliverymode
self.voicewaketrigger = voicewaketrigger
self.idempotencykey = idempotencykey
self.label = label
@@ -862,6 +865,7 @@ public struct AgentParams: Codable, Sendable {
case internalruntimehandoffid = "internalRuntimeHandoffId"
case internalevents = "internalEvents"
case inputprovenance = "inputProvenance"
case sourcereplydeliverymode = "sourceReplyDeliveryMode"
case voicewaketrigger = "voiceWakeTrigger"
case idempotencykey = "idempotencyKey"
case label

View File

@@ -630,6 +630,7 @@ export function runAgentAttempt(params: {
toolsAllow: params.opts.toolsAllow,
internalEvents: params.opts.internalEvents,
inputProvenance: params.opts.inputProvenance,
sourceReplyDeliveryMode: params.opts.sourceReplyDeliveryMode,
streamParams: params.opts.streamParams,
agentDir: params.agentDir,
allowTransientCooldownProbe: params.allowTransientCooldownProbe,

View File

@@ -1,6 +1,7 @@
import type { AgentInternalEvent } from "../../agents/internal-events.js";
import type { SpawnedRunMetadata } from "../../agents/spawned-context.js";
import type { PromptMode } from "../../agents/system-prompt.types.js";
import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js";
import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.public.js";
import type { PromptImageOrderEntry } from "../../media/prompt-image-order.js";
import type { InputProvenance } from "../../sessions/input-provenance.js";
@@ -105,6 +106,8 @@ export type AgentCommandOpts = {
bootstrapContextRunKind?: "default" | "heartbeat" | "cron";
internalEvents?: AgentInternalEvent[];
inputProvenance?: InputProvenance;
/** Visible source replies must be sent through the message tool when set. */
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
/** Per-call stream param overrides (best-effort). */
streamParams?: AgentStreamParams;
/** Explicit workspace directory override (for subagents to inherit parent workspace). */

View File

@@ -1,3 +1,4 @@
import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js";
import {
getActiveReplyRunCount,
listActiveReplyRunSessionIds,
@@ -11,11 +12,13 @@ export type EmbeddedPiQueueHandle = {
isCompacting: () => boolean;
cancel?: (reason?: "user_abort" | "restart" | "superseded") => void;
abort: () => void;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
};
export type EmbeddedPiQueueMessageOptions = {
steeringMode?: "all";
debounceMs?: number;
sourceReplyDeliveryMode?: SourceReplyDeliveryMode;
};
export type ActiveEmbeddedRunSnapshot = {

View File

@@ -2902,6 +2902,7 @@ export async function runEmbeddedAttempt(
},
isStreaming: () => activeSession.isStreaming,
isCompacting: () => subscription.isCompacting(),
sourceReplyDeliveryMode: params.sourceReplyDeliveryMode,
cancel: () => {
abortRun();
},

View File

@@ -73,16 +73,46 @@ describe("pi-embedded runner run registry", () => {
const queueMessage = vi.fn(async () => {});
setActiveEmbeddedRun("session-steer", {
...createRunHandle(),
sourceReplyDeliveryMode: "message_tool_only",
queueMessage,
});
expect(
queueEmbeddedPiMessageWithOutcome("session-steer", "continue", {
steeringMode: "all",
sourceReplyDeliveryMode: "message_tool_only",
}).queued,
).toBe(true);
expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "all" });
expect(queueMessage).toHaveBeenCalledWith("continue", {
steeringMode: "all",
sourceReplyDeliveryMode: "message_tool_only",
});
});
it("rejects message-tool-only steering for active runs created without that mode", () => {
const queueMessage = vi.fn(async () => {});
setActiveEmbeddedRun("session-automatic-source-reply", {
...createRunHandle(),
queueMessage,
});
const outcome = queueEmbeddedPiMessageWithOutcome(
"session-automatic-source-reply",
"continue",
{
steeringMode: "all",
sourceReplyDeliveryMode: "message_tool_only",
},
);
expect(outcome).toEqual({
queued: false,
sessionId: "session-automatic-source-reply",
reason: "source_reply_delivery_mode_mismatch",
gatewayHealth: "live",
});
expect(queueMessage).not.toHaveBeenCalled();
});
it("defaults active embedded steering to all pending messages", () => {

View File

@@ -44,6 +44,7 @@ export type EmbeddedPiQueueFailureReason =
| "no_active_run"
| "not_streaming"
| "compacting"
| "source_reply_delivery_mode_mismatch"
| "runtime_rejected";
export type EmbeddedPiQueueMessageOutcome =
@@ -140,7 +141,7 @@ export function queueEmbeddedPiMessageWithOutcome(
text: string,
options?: EmbeddedPiQueueMessageOptions,
): EmbeddedPiQueueMessageOutcome {
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text);
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text, options);
if (prepared.kind === "complete") {
return prepared.outcome;
}
@@ -169,7 +170,7 @@ export async function queueEmbeddedPiMessageWithOutcomeAsync(
text: string,
options?: EmbeddedPiQueueMessageOptions,
): Promise<EmbeddedPiQueueMessageOutcome> {
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text);
const prepared = prepareEmbeddedPiQueueMessage(sessionId, text, options);
if (prepared.kind === "complete") {
return prepared.outcome;
}
@@ -192,6 +193,7 @@ export async function queueEmbeddedPiMessageWithOutcomeAsync(
function prepareEmbeddedPiQueueMessage(
sessionId: string,
text: string,
options?: EmbeddedPiQueueMessageOptions,
): PreparedEmbeddedPiQueueMessage {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
@@ -219,6 +221,18 @@ function prepareEmbeddedPiQueueMessage(
diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`);
return { kind: "complete", outcome: createQueueFailureOutcome(sessionId, "compacting") };
}
if (
options?.sourceReplyDeliveryMode === "message_tool_only" &&
handle.sourceReplyDeliveryMode !== "message_tool_only"
) {
diag.debug(
`queue message failed: sessionId=${sessionId} reason=source_reply_delivery_mode_mismatch`,
);
return {
kind: "complete",
outcome: createQueueFailureOutcome(sessionId, "source_reply_delivery_mode_mismatch"),
};
}
return { kind: "embedded_run", handle };
}

View File

@@ -1411,6 +1411,80 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
accountId: "acct-1",
to: "channel:C123",
threadId: undefined,
sourceReplyDeliveryMode: "message_tool_only",
});
expect(sendMessage).not.toHaveBeenCalled();
});
it("falls back to a forced message-tool handoff when the active requester run cannot accept one", async () => {
const callGateway = createGatewayMock({
result: {
payloads: [],
messagingToolSentTargets: [
{
tool: "message",
provider: "slack",
accountId: "acct-1",
to: "channel:C123",
text: "The track is ready.",
mediaUrls: ["/tmp/generated-night-drive.mp3"],
},
],
},
});
const queueEmbeddedPiMessageWithOutcome = vi.fn((sessionId: string) => ({
queued: false as const,
sessionId,
reason: "source_reply_delivery_mode_mismatch" as const,
gatewayHealth: "live" as const,
}));
const sendMessage = createSendMessageMock();
const result = await deliverSlackChannelAnnouncement({
callGateway,
sendMessage,
sessionId: "requester-session-channel",
isActive: true,
expectsCompletionMessage: true,
directIdempotencyKey: "announce-channel-media-message-tool-active-mismatch",
sourceTool: "music_generate",
queueEmbeddedPiMessageWithOutcome,
internalEvents: [
{
type: "task_completion",
source: "music_generation",
childSessionKey: "music_generate:task-123",
childSessionId: "task-123",
announceType: "music generation task",
taskLabel: "night-drive synthwave",
status: "ok",
statusLabel: "completed successfully",
result: "Generated 1 track.\nMEDIA:/tmp/generated-night-drive.mp3",
mediaUrls: ["/tmp/generated-night-drive.mp3"],
replyInstruction:
"Tell the user the music is ready. If visible source delivery requires the message tool, send it there with the generated media attached.",
},
],
});
expectRecordFields(result, {
delivered: true,
path: "direct",
});
expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith(
"requester-session-channel",
"child done",
expect.objectContaining({
steeringMode: "all",
sourceReplyDeliveryMode: "message_tool_only",
}),
);
expectGatewayAgentParams(callGateway, {
deliver: false,
channel: "slack",
accountId: "acct-1",
to: "channel:C123",
threadId: undefined,
sourceReplyDeliveryMode: "message_tool_only",
});
expect(sendMessage).not.toHaveBeenCalled();
});
@@ -1478,6 +1552,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => {
accountId: "acct-1",
to: origin.to,
threadId: undefined,
sourceReplyDeliveryMode: "message_tool_only",
});
expect(sendMessage).not.toHaveBeenCalled();
},

View File

@@ -640,6 +640,9 @@ async function sendSubagentAnnounceDirectly(params: {
directOrigin: effectiveDirectOrigin,
requesterSessionOrigin,
});
const completionSourceReplyDeliveryMode = requiresMessageToolDelivery
? "message_tool_only"
: undefined;
const shouldDeliverAgentFinal = deliveryTarget.deliver && !requiresMessageToolDelivery;
const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey);
const requesterQueueSettings = resolveQueueSettings({
@@ -658,6 +661,9 @@ async function sendSubagentAnnounceDirectly(params: {
params.triggerMessage,
{
steeringMode: "all",
...(completionSourceReplyDeliveryMode
? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode }
: {}),
...(requesterQueueSettings.debounceMs !== undefined
? { debounceMs: requesterQueueSettings.debounceMs }
: {}),
@@ -669,7 +675,9 @@ async function sendSubagentAnnounceDirectly(params: {
path: "steered",
};
}
if (requesterActivity.isActive) {
const shouldFallbackToForcedAgentHandoff =
requiresMessageToolDelivery && wakeOutcome.reason === "source_reply_delivery_mode_mismatch";
if (requesterActivity.isActive && !shouldFallbackToForcedAgentHandoff) {
// Active requester sessions should receive completion data through their
// running agent turn. If wake fails, let the dispatch layer steer/retry;
// do not bypass the requester agent with raw child output.
@@ -717,6 +725,9 @@ async function sendSubagentAnnounceDirectly(params: {
sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL,
sourceTool: params.sourceTool ?? "subagent_announce",
},
...(completionSourceReplyDeliveryMode
? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode }
: {}),
idempotencyKey: params.directIdempotencyKey,
};
let directAnnounceResponse: unknown;

View File

@@ -179,6 +179,9 @@ export const AgentParamsSchema = Type.Object(
internalRuntimeHandoffId: Type.Optional(NonEmptyString),
internalEvents: Type.Optional(Type.Array(AgentInternalEventSchema)),
inputProvenance: Type.Optional(InputProvenanceSchema),
sourceReplyDeliveryMode: Type.Optional(
Type.Union([Type.Literal("automatic"), Type.Literal("message_tool_only")]),
),
voiceWakeTrigger: Type.Optional(Type.String()),
idempotencyKey: NonEmptyString,
label: Type.Optional(SessionLabelString),

View File

@@ -599,6 +599,7 @@ export const agentHandlers: GatewayRequestHandlers = {
internalRuntimeHandoffId?: string;
internalEvents?: AgentInternalEvent[];
idempotencyKey: string;
sourceReplyDeliveryMode?: "automatic" | "message_tool_only";
timeout?: number;
bestEffortDeliver?: boolean;
cleanupBundleMcpOnRunEnd?: boolean;
@@ -1490,6 +1491,7 @@ export const agentHandlers: GatewayRequestHandlers = {
acpTurnSource: request.acpTurnSource,
internalEvents: request.internalEvents,
inputProvenance,
sourceReplyDeliveryMode: request.sourceReplyDeliveryMode,
suppressPromptPersistence: shouldSuppressAgentPromptPersistence({
inputProvenance,
internalEvents: request.internalEvents,

View File

@@ -221,6 +221,19 @@ describe("gateway server agent", () => {
expect(call.to).toBeUndefined();
});
test("agent forwards sourceReplyDeliveryMode to agentCommand", async () => {
const res = await rpcReq(ws, "agent", {
message: "hi",
sessionKey: "main",
sourceReplyDeliveryMode: "message_tool_only",
idempotencyKey: "idem-agent-source-reply-mode",
});
expect(res.ok).toBe(true);
const call = await waitForAgentCommandCall("idem-agent-source-reply-mode");
expect(call.sourceReplyDeliveryMode).toBe("message_tool_only");
});
test("agent preserves spawnDepth on subagent sessions", async () => {
await setTestSessionStore({
entries: {