diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b7dbb06018..bf2e3358822 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/media: preserve message-tool-only delivery for generated music and video completion handoffs, so group/channel completions do not finish without posting the generated attachment. - LINE: acknowledge signed webhook events before agent processing so slow model replies do not cause LINE `request_timeout` delivery failures. Fixes #65375. Thanks @myericho. - TTS: preserve channel-derived voice-note delivery for `/tts audio` replies even when the provider output is not natively voice-compatible. (#82174) Thanks @xuruiray. - Codex/Lossless: keep Codex explicit compaction on native app-server threads while allowing Lossless through the context-engine slot; `openclaw doctor --fix` now migrates legacy `compaction.provider: "lossless-claw"` config to `plugins.slots.contextEngine`. diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index 04bdc6cf076..4c9b0da8bfc 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -751,6 +751,7 @@ public struct AgentParams: Codable, Sendable { public let internalruntimehandoffid: String? public let internalevents: [[String: AnyCodable]]? public let inputprovenance: [String: AnyCodable]? + public let sourcereplydeliverymode: AnyCodable? public let voicewaketrigger: String? public let idempotencykey: String public let label: String? @@ -788,6 +789,7 @@ public struct AgentParams: Codable, Sendable { internalruntimehandoffid: String?, internalevents: [[String: AnyCodable]]?, inputprovenance: [String: AnyCodable]?, + sourcereplydeliverymode: AnyCodable?, voicewaketrigger: String?, idempotencykey: String, label: String?) @@ -824,6 +826,7 @@ public struct AgentParams: Codable, Sendable { self.internalruntimehandoffid = internalruntimehandoffid self.internalevents = internalevents self.inputprovenance = inputprovenance + self.sourcereplydeliverymode = sourcereplydeliverymode self.voicewaketrigger = voicewaketrigger self.idempotencykey = idempotencykey self.label = label @@ -862,6 +865,7 @@ public struct AgentParams: Codable, Sendable { case internalruntimehandoffid = "internalRuntimeHandoffId" case internalevents = "internalEvents" case inputprovenance = "inputProvenance" + case sourcereplydeliverymode = "sourceReplyDeliveryMode" case voicewaketrigger = "voiceWakeTrigger" case idempotencykey = "idempotencyKey" case label diff --git a/src/agents/command/attempt-execution.ts b/src/agents/command/attempt-execution.ts index 4551ff46c0c..d437682e4fb 100644 --- a/src/agents/command/attempt-execution.ts +++ b/src/agents/command/attempt-execution.ts @@ -630,6 +630,7 @@ export function runAgentAttempt(params: { toolsAllow: params.opts.toolsAllow, internalEvents: params.opts.internalEvents, inputProvenance: params.opts.inputProvenance, + sourceReplyDeliveryMode: params.opts.sourceReplyDeliveryMode, streamParams: params.opts.streamParams, agentDir: params.agentDir, allowTransientCooldownProbe: params.allowTransientCooldownProbe, diff --git a/src/agents/command/types.ts b/src/agents/command/types.ts index 50b4fc7acad..6af743b79b1 100644 --- a/src/agents/command/types.ts +++ b/src/agents/command/types.ts @@ -1,6 +1,7 @@ import type { AgentInternalEvent } from "../../agents/internal-events.js"; import type { SpawnedRunMetadata } from "../../agents/spawned-context.js"; import type { PromptMode } from "../../agents/system-prompt.types.js"; +import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js"; import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.public.js"; import type { PromptImageOrderEntry } from "../../media/prompt-image-order.js"; import type { InputProvenance } from "../../sessions/input-provenance.js"; @@ -105,6 +106,8 @@ export type AgentCommandOpts = { bootstrapContextRunKind?: "default" | "heartbeat" | "cron"; internalEvents?: AgentInternalEvent[]; inputProvenance?: InputProvenance; + /** Visible source replies must be sent through the message tool when set. */ + sourceReplyDeliveryMode?: SourceReplyDeliveryMode; /** Per-call stream param overrides (best-effort). */ streamParams?: AgentStreamParams; /** Explicit workspace directory override (for subagents to inherit parent workspace). */ diff --git a/src/agents/pi-embedded-runner/run-state.ts b/src/agents/pi-embedded-runner/run-state.ts index 4b1305bf2f8..994a18e304f 100644 --- a/src/agents/pi-embedded-runner/run-state.ts +++ b/src/agents/pi-embedded-runner/run-state.ts @@ -1,3 +1,4 @@ +import type { SourceReplyDeliveryMode } from "../../auto-reply/get-reply-options.types.js"; import { getActiveReplyRunCount, listActiveReplyRunSessionIds, @@ -11,11 +12,13 @@ export type EmbeddedPiQueueHandle = { isCompacting: () => boolean; cancel?: (reason?: "user_abort" | "restart" | "superseded") => void; abort: () => void; + sourceReplyDeliveryMode?: SourceReplyDeliveryMode; }; export type EmbeddedPiQueueMessageOptions = { steeringMode?: "all"; debounceMs?: number; + sourceReplyDeliveryMode?: SourceReplyDeliveryMode; }; export type ActiveEmbeddedRunSnapshot = { diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index c70ecc2120b..370d11f7fdf 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -2902,6 +2902,7 @@ export async function runEmbeddedAttempt( }, isStreaming: () => activeSession.isStreaming, isCompacting: () => subscription.isCompacting(), + sourceReplyDeliveryMode: params.sourceReplyDeliveryMode, cancel: () => { abortRun(); }, diff --git a/src/agents/pi-embedded-runner/runs.test.ts b/src/agents/pi-embedded-runner/runs.test.ts index 4989c54811f..2401a583218 100644 --- a/src/agents/pi-embedded-runner/runs.test.ts +++ b/src/agents/pi-embedded-runner/runs.test.ts @@ -73,16 +73,46 @@ describe("pi-embedded runner run registry", () => { const queueMessage = vi.fn(async () => {}); setActiveEmbeddedRun("session-steer", { ...createRunHandle(), + sourceReplyDeliveryMode: "message_tool_only", queueMessage, }); expect( queueEmbeddedPiMessageWithOutcome("session-steer", "continue", { steeringMode: "all", + sourceReplyDeliveryMode: "message_tool_only", }).queued, ).toBe(true); - expect(queueMessage).toHaveBeenCalledWith("continue", { steeringMode: "all" }); + expect(queueMessage).toHaveBeenCalledWith("continue", { + steeringMode: "all", + sourceReplyDeliveryMode: "message_tool_only", + }); + }); + + it("rejects message-tool-only steering for active runs created without that mode", () => { + const queueMessage = vi.fn(async () => {}); + setActiveEmbeddedRun("session-automatic-source-reply", { + ...createRunHandle(), + queueMessage, + }); + + const outcome = queueEmbeddedPiMessageWithOutcome( + "session-automatic-source-reply", + "continue", + { + steeringMode: "all", + sourceReplyDeliveryMode: "message_tool_only", + }, + ); + + expect(outcome).toEqual({ + queued: false, + sessionId: "session-automatic-source-reply", + reason: "source_reply_delivery_mode_mismatch", + gatewayHealth: "live", + }); + expect(queueMessage).not.toHaveBeenCalled(); }); it("defaults active embedded steering to all pending messages", () => { diff --git a/src/agents/pi-embedded-runner/runs.ts b/src/agents/pi-embedded-runner/runs.ts index fbc2494e9fa..57c0b837964 100644 --- a/src/agents/pi-embedded-runner/runs.ts +++ b/src/agents/pi-embedded-runner/runs.ts @@ -44,6 +44,7 @@ export type EmbeddedPiQueueFailureReason = | "no_active_run" | "not_streaming" | "compacting" + | "source_reply_delivery_mode_mismatch" | "runtime_rejected"; export type EmbeddedPiQueueMessageOutcome = @@ -140,7 +141,7 @@ export function queueEmbeddedPiMessageWithOutcome( text: string, options?: EmbeddedPiQueueMessageOptions, ): EmbeddedPiQueueMessageOutcome { - const prepared = prepareEmbeddedPiQueueMessage(sessionId, text); + const prepared = prepareEmbeddedPiQueueMessage(sessionId, text, options); if (prepared.kind === "complete") { return prepared.outcome; } @@ -169,7 +170,7 @@ export async function queueEmbeddedPiMessageWithOutcomeAsync( text: string, options?: EmbeddedPiQueueMessageOptions, ): Promise { - const prepared = prepareEmbeddedPiQueueMessage(sessionId, text); + const prepared = prepareEmbeddedPiQueueMessage(sessionId, text, options); if (prepared.kind === "complete") { return prepared.outcome; } @@ -192,6 +193,7 @@ export async function queueEmbeddedPiMessageWithOutcomeAsync( function prepareEmbeddedPiQueueMessage( sessionId: string, text: string, + options?: EmbeddedPiQueueMessageOptions, ): PreparedEmbeddedPiQueueMessage { const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId); if (!handle) { @@ -219,6 +221,18 @@ function prepareEmbeddedPiQueueMessage( diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`); return { kind: "complete", outcome: createQueueFailureOutcome(sessionId, "compacting") }; } + if ( + options?.sourceReplyDeliveryMode === "message_tool_only" && + handle.sourceReplyDeliveryMode !== "message_tool_only" + ) { + diag.debug( + `queue message failed: sessionId=${sessionId} reason=source_reply_delivery_mode_mismatch`, + ); + return { + kind: "complete", + outcome: createQueueFailureOutcome(sessionId, "source_reply_delivery_mode_mismatch"), + }; + } return { kind: "embedded_run", handle }; } diff --git a/src/agents/subagent-announce-delivery.test.ts b/src/agents/subagent-announce-delivery.test.ts index 8f5ace4b3f8..fc7690df568 100644 --- a/src/agents/subagent-announce-delivery.test.ts +++ b/src/agents/subagent-announce-delivery.test.ts @@ -1411,6 +1411,80 @@ describe("deliverSubagentAnnouncement completion delivery", () => { accountId: "acct-1", to: "channel:C123", threadId: undefined, + sourceReplyDeliveryMode: "message_tool_only", + }); + expect(sendMessage).not.toHaveBeenCalled(); + }); + + it("falls back to a forced message-tool handoff when the active requester run cannot accept one", async () => { + const callGateway = createGatewayMock({ + result: { + payloads: [], + messagingToolSentTargets: [ + { + tool: "message", + provider: "slack", + accountId: "acct-1", + to: "channel:C123", + text: "The track is ready.", + mediaUrls: ["/tmp/generated-night-drive.mp3"], + }, + ], + }, + }); + const queueEmbeddedPiMessageWithOutcome = vi.fn((sessionId: string) => ({ + queued: false as const, + sessionId, + reason: "source_reply_delivery_mode_mismatch" as const, + gatewayHealth: "live" as const, + })); + const sendMessage = createSendMessageMock(); + const result = await deliverSlackChannelAnnouncement({ + callGateway, + sendMessage, + sessionId: "requester-session-channel", + isActive: true, + expectsCompletionMessage: true, + directIdempotencyKey: "announce-channel-media-message-tool-active-mismatch", + sourceTool: "music_generate", + queueEmbeddedPiMessageWithOutcome, + internalEvents: [ + { + type: "task_completion", + source: "music_generation", + childSessionKey: "music_generate:task-123", + childSessionId: "task-123", + announceType: "music generation task", + taskLabel: "night-drive synthwave", + status: "ok", + statusLabel: "completed successfully", + result: "Generated 1 track.\nMEDIA:/tmp/generated-night-drive.mp3", + mediaUrls: ["/tmp/generated-night-drive.mp3"], + replyInstruction: + "Tell the user the music is ready. If visible source delivery requires the message tool, send it there with the generated media attached.", + }, + ], + }); + + expectRecordFields(result, { + delivered: true, + path: "direct", + }); + expect(queueEmbeddedPiMessageWithOutcome).toHaveBeenCalledWith( + "requester-session-channel", + "child done", + expect.objectContaining({ + steeringMode: "all", + sourceReplyDeliveryMode: "message_tool_only", + }), + ); + expectGatewayAgentParams(callGateway, { + deliver: false, + channel: "slack", + accountId: "acct-1", + to: "channel:C123", + threadId: undefined, + sourceReplyDeliveryMode: "message_tool_only", }); expect(sendMessage).not.toHaveBeenCalled(); }); @@ -1478,6 +1552,7 @@ describe("deliverSubagentAnnouncement completion delivery", () => { accountId: "acct-1", to: origin.to, threadId: undefined, + sourceReplyDeliveryMode: "message_tool_only", }); expect(sendMessage).not.toHaveBeenCalled(); }, diff --git a/src/agents/subagent-announce-delivery.ts b/src/agents/subagent-announce-delivery.ts index 2ab6d952ee8..374e6cec72b 100644 --- a/src/agents/subagent-announce-delivery.ts +++ b/src/agents/subagent-announce-delivery.ts @@ -640,6 +640,9 @@ async function sendSubagentAnnounceDirectly(params: { directOrigin: effectiveDirectOrigin, requesterSessionOrigin, }); + const completionSourceReplyDeliveryMode = requiresMessageToolDelivery + ? "message_tool_only" + : undefined; const shouldDeliverAgentFinal = deliveryTarget.deliver && !requiresMessageToolDelivery; const requesterActivity = resolveRequesterSessionActivity(canonicalRequesterSessionKey); const requesterQueueSettings = resolveQueueSettings({ @@ -658,6 +661,9 @@ async function sendSubagentAnnounceDirectly(params: { params.triggerMessage, { steeringMode: "all", + ...(completionSourceReplyDeliveryMode + ? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode } + : {}), ...(requesterQueueSettings.debounceMs !== undefined ? { debounceMs: requesterQueueSettings.debounceMs } : {}), @@ -669,7 +675,9 @@ async function sendSubagentAnnounceDirectly(params: { path: "steered", }; } - if (requesterActivity.isActive) { + const shouldFallbackToForcedAgentHandoff = + requiresMessageToolDelivery && wakeOutcome.reason === "source_reply_delivery_mode_mismatch"; + if (requesterActivity.isActive && !shouldFallbackToForcedAgentHandoff) { // Active requester sessions should receive completion data through their // running agent turn. If wake fails, let the dispatch layer steer/retry; // do not bypass the requester agent with raw child output. @@ -717,6 +725,9 @@ async function sendSubagentAnnounceDirectly(params: { sourceChannel: params.sourceChannel ?? INTERNAL_MESSAGE_CHANNEL, sourceTool: params.sourceTool ?? "subagent_announce", }, + ...(completionSourceReplyDeliveryMode + ? { sourceReplyDeliveryMode: completionSourceReplyDeliveryMode } + : {}), idempotencyKey: params.directIdempotencyKey, }; let directAnnounceResponse: unknown; diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index 143392f313a..e59375a1f05 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -179,6 +179,9 @@ export const AgentParamsSchema = Type.Object( internalRuntimeHandoffId: Type.Optional(NonEmptyString), internalEvents: Type.Optional(Type.Array(AgentInternalEventSchema)), inputProvenance: Type.Optional(InputProvenanceSchema), + sourceReplyDeliveryMode: Type.Optional( + Type.Union([Type.Literal("automatic"), Type.Literal("message_tool_only")]), + ), voiceWakeTrigger: Type.Optional(Type.String()), idempotencyKey: NonEmptyString, label: Type.Optional(SessionLabelString), diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 63cb9745925..b6627a993c0 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -599,6 +599,7 @@ export const agentHandlers: GatewayRequestHandlers = { internalRuntimeHandoffId?: string; internalEvents?: AgentInternalEvent[]; idempotencyKey: string; + sourceReplyDeliveryMode?: "automatic" | "message_tool_only"; timeout?: number; bestEffortDeliver?: boolean; cleanupBundleMcpOnRunEnd?: boolean; @@ -1490,6 +1491,7 @@ export const agentHandlers: GatewayRequestHandlers = { acpTurnSource: request.acpTurnSource, internalEvents: request.internalEvents, inputProvenance, + sourceReplyDeliveryMode: request.sourceReplyDeliveryMode, suppressPromptPersistence: shouldSuppressAgentPromptPersistence({ inputProvenance, internalEvents: request.internalEvents, diff --git a/src/gateway/server.agent.gateway-server-agent-a.test.ts b/src/gateway/server.agent.gateway-server-agent-a.test.ts index 214a050d309..41afcbbd5af 100644 --- a/src/gateway/server.agent.gateway-server-agent-a.test.ts +++ b/src/gateway/server.agent.gateway-server-agent-a.test.ts @@ -221,6 +221,19 @@ describe("gateway server agent", () => { expect(call.to).toBeUndefined(); }); + test("agent forwards sourceReplyDeliveryMode to agentCommand", async () => { + const res = await rpcReq(ws, "agent", { + message: "hi", + sessionKey: "main", + sourceReplyDeliveryMode: "message_tool_only", + idempotencyKey: "idem-agent-source-reply-mode", + }); + expect(res.ok).toBe(true); + + const call = await waitForAgentCommandCall("idem-agent-source-reply-mode"); + expect(call.sourceReplyDeliveryMode).toBe("message_tool_only"); + }); + test("agent preserves spawnDepth on subagent sessions", async () => { await setTestSessionStore({ entries: {