From a4b17d65a8ff07bb4fdc38feedbf92f58c372303 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 9 May 2026 07:02:33 +0100 Subject: [PATCH] refactor: consolidate message delivery API --- CHANGELOG.md | 1 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/plugins/hooks.md | 6 + docs/plugins/sdk-channel-message.md | 62 ++++-- docs/plugins/sdk-migration.md | 2 +- docs/plugins/sdk-subpaths.md | 6 +- .../src/monitor/reply-delivery.test.ts | 11 + .../discord/src/monitor/reply-delivery.ts | 8 +- extensions/irc/src/inbound.ts | 70 +++--- extensions/irc/src/runtime-api.ts | 2 +- extensions/line/src/monitor.lifecycle.test.ts | 2 +- extensions/line/src/monitor.ts | 4 +- extensions/nextcloud-talk/runtime-api.ts | 2 +- .../src/inbound.behavior.test.ts | 7 +- extensions/nextcloud-talk/src/inbound.ts | 67 +++--- extensions/qa-channel/runtime-api.ts | 1 - extensions/qa-channel/src/inbound.test.ts | 24 +- extensions/qa-channel/src/inbound.ts | 81 ++++--- extensions/qa-channel/src/runtime-api.ts | 2 +- package.json | 4 +- scripts/check-deprecated-api-usage.mjs | 210 ++++++++++++++++++ .../check-deprecated-internal-config-api.mjs | 20 -- scripts/check.mjs | 4 +- src/agents/command/delivery.test.ts | 37 ++- src/agents/command/delivery.ts | 11 +- src/auto-reply/reply/route-reply.test.ts | 6 + src/auto-reply/reply/route-reply.ts | 16 +- src/channels/message/send.test.ts | 209 +++++++++++++++++ src/channels/message/send.ts | 138 +++++++++++- src/channels/turn/durable-delivery.test.ts | 30 +++ src/channels/turn/durable-delivery.ts | 3 + src/commands/agent.delivery.test.ts | 1 + src/cron/delivery.failure-notify.test.ts | 1 + src/cron/delivery.ts | 9 +- .../delivery-dispatch.double-announce.test.ts | 20 +- src/cron/isolated-agent/delivery-dispatch.ts | 20 +- .../delivery-outbound.runtime.ts | 6 +- src/gateway/server-methods/send.test.ts | 1 + src/gateway/server-methods/send.ts | 8 +- src/gateway/server-node-events.runtime.ts | 2 +- src/gateway/server-node-events.ts | 8 +- src/gateway/server-restart-sentinel.test.ts | 50 ++++- src/gateway/server-restart-sentinel.ts | 146 +++++++----- src/gateway/server-runtime-services.test.ts | 1 + src/gateway/server-runtime-services.ts | 4 +- src/infra/exec-approval-forwarder.runtime.ts | 2 +- src/infra/exec-approval-forwarder.ts | 18 +- ...beat-runner.isolated-key-stability.test.ts | 3 +- .../heartbeat-runner.model-override.test.ts | 3 +- src/infra/heartbeat-runner.ts | 12 +- src/infra/outbound/deliver-runtime.ts | 2 + src/infra/outbound/deliver-types.ts | 59 +++++ src/infra/outbound/deliver.test.ts | 32 +++ src/infra/outbound/deliver.ts | 194 +++++++++++++++- src/infra/outbound/message.test.ts | 46 ++++ src/infra/outbound/message.ts | 13 +- src/infra/outbound/payloads.ts | 14 +- src/infra/session-maintenance-warning.test.ts | 5 + src/infra/session-maintenance-warning.ts | 15 +- .../echo-transcript.test.ts | 27 ++- src/media-understanding/echo-transcript.ts | 17 +- src/plugin-sdk/channel-message-runtime.ts | 30 +-- src/plugin-sdk/channel-message.ts | 23 ++ src/plugin-sdk/channel-test-helpers.ts | 3 +- src/plugin-sdk/delivery-queue-runtime.test.ts | 1 + src/plugin-sdk/delivery-queue-runtime.ts | 3 +- src/plugin-sdk/inbound-reply-dispatch.ts | 18 +- src/plugin-sdk/outbound-runtime.ts | 20 +- .../test-helpers/outbound-delivery.ts | 3 +- src/plugin-sdk/testing.ts | 1 + src/plugins/compat/registry.ts | 2 +- src/plugins/hook-message.types.ts | 2 + src/plugins/hooks.ts | 2 + src/plugins/wired-hooks-message.test.ts | 4 +- 74 files changed, 1561 insertions(+), 340 deletions(-) create mode 100644 scripts/check-deprecated-api-usage.mjs delete mode 100644 scripts/check-deprecated-internal-config-api.mjs diff --git a/CHANGELOG.md b/CHANGELOG.md index 18bfc9fa078..7c0f709261e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai - Plugins/doctor: avoid full-array sorting while selecting ClawHub search/archive results and bounded dreaming doctor entries. Thanks @shakkernerd. - Agents/compaction: keep contributor diagnostics to a bounded top-three selection without sorting the full history. Thanks @shakkernerd. - Sessions/UI: avoid full-array sorting while selecting ACPX leases, Google Meet calendar events, and latest chat sessions. Thanks @shakkernerd. +- Plugin SDK: mark direct `deliverOutboundPayloads` and legacy reply-dispatch bridges as deprecated compatibility substrate, enrich `sendDurableMessageBatch` with explicit durable send outcomes, migrate bundled send/turn paths off deprecated APIs, and enforce the split with `check:deprecated-api-usage`. - Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus. - Telegram/streaming: continue over-limit draft previews in a new message instead of stopping when rendered preview text crosses Telegram's message limit. (#74508) Thanks @anagnorisis2peripeteia. - Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 1d0d9f8899e..b5144de9bfd 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -9f7ea91407a66fee6bcdebaf64bd15d0a6ae8b48cf100753c96f2ad29ad86390 plugin-sdk-api-baseline.json -4d516f6ac681cf55e916f712601abb8f5a7ddcd92a7710e8947f89c38e4054e7 plugin-sdk-api-baseline.jsonl +779d6cd6c35fb4685cb5a31c40748ab71184b89632982920d2aeee9d703cdb3a plugin-sdk-api-baseline.json +ecb5ea5dc7b4573c61070b2cbdfdcbc3436032bbe39c3045dd8d5213d18ec858 plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/hooks.md b/docs/plugins/hooks.md index f72d21ea65c..52e99295715 100644 --- a/docs/plugins/hooks.md +++ b/docs/plugins/hooks.md @@ -369,6 +369,12 @@ Decision rules: - `message_sending` with `cancel: false` is treated as no decision. - Rewritten `content` continues to lower-priority hooks unless a later hook cancels delivery. +- `message_sending` can return `cancelReason` and bounded `metadata` with a + cancellation. New message lifecycle APIs expose this as a suppressed delivery + outcome with reason `cancelled_by_message_sending_hook`; legacy direct + delivery keeps returning an empty result array for compatibility. +- `message_sent` is observation-only. Handler failures are logged and do not + change the delivery result. ## Install hooks diff --git a/docs/plugins/sdk-channel-message.md b/docs/plugins/sdk-channel-message.md index 2502bf17fd2..d4644ebf00b 100644 --- a/docs/plugins/sdk-channel-message.md +++ b/docs/plugins/sdk-channel-message.md @@ -31,6 +31,36 @@ Runtime delivery helpers are available from `openclaw/plugin-sdk/channel-message-runtime` for monitor/send code paths that are already doing asynchronous message I/O. +New channel and plugin send code should use the message lifecycle helpers from +`openclaw/plugin-sdk/channel-message-runtime`: `sendDurableMessageBatch`, +`withDurableMessageSendContext`, or `deliverInboundReplyWithMessageSendContext`. +The older +`deliverOutboundPayloads(...)` helper in `openclaw/plugin-sdk/outbound-runtime` +is deprecated compatibility/runtime substrate for outbound internals, recovery, +and legacy adapters. Do not use it for new channel or plugin send paths. + +`sendDurableMessageBatch(...)` returns an explicit lifecycle outcome: + +- `sent` - at least one visible platform message was delivered. +- `suppressed` - no platform message should be treated as missing. Stable + reasons include `cancelled_by_message_sending_hook`, + `empty_after_message_sending_hook`, `no_visible_payload`, + `adapter_returned_no_identity`, and legacy `no_visible_result`. +- `partial_failed` - at least one platform message was delivered before a later + payload or side effect failed. The result includes the delivered receipt prefix + plus the failure. +- `failed` - no platform receipt was produced. + +Use `payloadOutcomes` when a batch mixes sent, suppressed, and failed payloads. +Do not infer hook cancellation by checking whether the old direct-delivery array +is empty. + +Compatibility dispatchers that still need the buffered reply dispatcher should +build reply-prefix options with `createChannelMessageReplyPipeline(...)` from +`openclaw/plugin-sdk/channel-message`, then call the runtime's +`channel.turn.runPrepared(...)`. That keeps session recording and dispatch +ordering on the shared turn lifecycle without adding another public turn wrapper. + ## Minimal adapter Most new channel plugins can start with a small adapter: @@ -124,8 +154,10 @@ tool send, implement `actions.prepareSendPayload(...)` instead of sending from `prepareSendPayload(...)` receives the normalized core `ReplyPayload` plus the full action context. Return a payload with channel-specific data in `payload.channelData.` and let core call `sendMessage(...)`, -`deliverOutboundPayloads(...)`, the write-ahead queue, message-sending hooks, -retry, recovery, and ack cleanup. +the message lifecycle runtime, the write-ahead queue, message-sending hooks, +retry, recovery, and ack cleanup. The lifecycle runtime may call +`deliverOutboundPayloads(...)` internally as compatibility substrate, but channel +plugins should not call it directly for new send behavior. Return `null` only when the send cannot be represented as a durable payload, for example because it contains a non-serializable component factory. Core will keep @@ -390,17 +422,21 @@ surface. These APIs remain importable for third-party compatibility. Do not use them for new channel code. -| Deprecated API | Replacement | -| -------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- | -| `openclaw/plugin-sdk/channel-reply-pipeline` | `openclaw/plugin-sdk/channel-message` | -| `createChannelTurnReplyPipeline(...)` | `createChannelMessageReplyPipeline(...)` for compatibility dispatchers, or a `message` adapter for new channel code | -| `deliverDurableInboundReplyPayload(...)` | `deliverInboundReplyWithMessageSendContext(...)` from `openclaw/plugin-sdk/channel-message-runtime` | -| `dispatchInboundReplyWithBase(...)` | `dispatchChannelMessageReplyWithBase(...)` only for compatibility dispatchers | -| `recordInboundSessionAndDispatchReply(...)` | `recordChannelMessageReplyDispatch(...)` only for compatibility dispatchers | -| `resolveChannelSourceReplyDeliveryMode(...)` | `resolveChannelMessageSourceReplyDeliveryMode(...)` | -| `deliverFinalizableDraftPreview(...)` | `defineFinalizableLivePreviewAdapter(...)` plus `deliverWithFinalizableLivePreviewAdapter(...)` | -| `DraftPreviewFinalizerDraft` | `LivePreviewFinalizerDraft` | -| `DraftPreviewFinalizerResult` | `LivePreviewFinalizerResult` | +| Deprecated API | Replacement | +| -------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | +| `openclaw/plugin-sdk/channel-reply-pipeline` | `openclaw/plugin-sdk/channel-message` | +| `createChannelTurnReplyPipeline(...)` | `createChannelMessageReplyPipeline(...)` for compatibility dispatchers, or a `message` adapter for new channel code | +| `buildChannelMessageReplyDispatchBase(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code | +| `dispatchChannelMessageReplyWithBase(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code | +| `recordChannelMessageReplyDispatch(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code | +| `deliverOutboundPayloads(...)` | `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)` from `channel-message-runtime` | +| `deliverDurableInboundReplyPayload(...)` | `deliverInboundReplyWithMessageSendContext(...)` from `openclaw/plugin-sdk/channel-message-runtime` | +| `dispatchInboundReplyWithBase(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code | +| `recordInboundSessionAndDispatchReply(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code | +| `resolveChannelSourceReplyDeliveryMode(...)` | `resolveChannelMessageSourceReplyDeliveryMode(...)` | +| `deliverFinalizableDraftPreview(...)` | `defineFinalizableLivePreviewAdapter(...)` plus `deliverWithFinalizableLivePreviewAdapter(...)` | +| `DraftPreviewFinalizerDraft` | `LivePreviewFinalizerDraft` | +| `DraftPreviewFinalizerResult` | `LivePreviewFinalizerResult` | Compatibility dispatchers can still use `createReplyPrefixContext(...)`, `createReplyPrefixOptions(...)`, and `createTypingCallbacks(...)` through the diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index 7f68c6185ef..515ceb0ae7e 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -250,7 +250,7 @@ releases. helpers for external plugins during the migration window and warn once with the `runtime-config-load-write` compatibility code. Bundled plugins and repo runtime code are protected by scanner guardrails in - `pnpm check:deprecated-internal-config-api` and + `pnpm check:deprecated-api-usage` and `pnpm check:no-runtime-action-load-config`: new production plugin usage fails outright, direct config writes fail, gateway server methods must use the request runtime snapshot, runtime channel send/action/client helpers diff --git a/docs/plugins/sdk-subpaths.md b/docs/plugins/sdk-subpaths.md index 4e02d731c57..7294bca03a8 100644 --- a/docs/plugins/sdk-subpaths.md +++ b/docs/plugins/sdk-subpaths.md @@ -66,14 +66,14 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview) | `plugin-sdk/command-gating` | Narrow command authorization gate helpers | | `plugin-sdk/channel-policy` | `resolveChannelGroupRequireMention` | | `plugin-sdk/channel-lifecycle` | `createAccountStatusSink`, `createChannelRunQueue`, and legacy draft stream lifecycle helpers. New preview finalization code should use `plugin-sdk/channel-message`. | - | `plugin-sdk/channel-message` | Cheap message lifecycle contract helpers such as `defineChannelMessageAdapter`, `createChannelMessageAdapterFromOutbound`, `createReplyPrefixContext`, `resolveChannelMessageSourceReplyDeliveryMode`, compatibility facades, durable-final capability derivation, capability proof helpers for send/receipt/side-effect capabilities, `MessageReceiveContext`, receive ack policy proofs, `defineFinalizableLivePreviewAdapter`, `deliverWithFinalizableLivePreviewAdapter`, live-preview and live-finalizer capability proofs, durable recovery state, `RenderedMessageBatch`, message receipt types, and receipt id helpers. See [Channel message API](/plugins/sdk-channel-message). Legacy `createChannelTurnReplyPipeline` remains only for compatibility dispatchers. | - | `plugin-sdk/channel-message-runtime` | Runtime delivery helpers that may load outbound delivery, including `deliverInboundReplyWithMessageSendContext`, `sendDurableMessageBatch`, `withDurableMessageSendContext`, `dispatchChannelMessageReplyWithBase`, and `recordChannelMessageReplyDispatch`. Use from monitor/send runtime modules, not hot plugin bootstrap files. | + | `plugin-sdk/channel-message` | Cheap message lifecycle contract helpers such as `defineChannelMessageAdapter`, `createChannelMessageAdapterFromOutbound`, `createChannelMessageReplyPipeline`, `createReplyPrefixContext`, `resolveChannelMessageSourceReplyDeliveryMode`, durable-final capability derivation, capability proof helpers for send/receipt/side-effect capabilities, `MessageReceiveContext`, receive ack policy proofs, `defineFinalizableLivePreviewAdapter`, `deliverWithFinalizableLivePreviewAdapter`, live-preview and live-finalizer capability proofs, durable recovery state, `RenderedMessageBatch`, message receipt types, and receipt id helpers. See [Channel message API](/plugins/sdk-channel-message). Legacy reply-dispatch facades are deprecated compatibility only. | + | `plugin-sdk/channel-message-runtime` | Runtime delivery helpers that may load outbound delivery, including `deliverInboundReplyWithMessageSendContext`, `sendDurableMessageBatch`, and `withDurableMessageSendContext`. Deprecated reply-dispatch bridges remain importable for compatibility dispatchers only. Use from monitor/send runtime modules, not hot plugin bootstrap files. | | `plugin-sdk/inbound-envelope` | Shared inbound route + envelope builder helpers | | `plugin-sdk/inbound-reply-dispatch` | Legacy shared inbound record-and-dispatch helpers, visible/final dispatch predicates, and deprecated `deliverDurableInboundReplyPayload` compatibility for prepared channel dispatchers. New channel receive/dispatch code should import runtime lifecycle helpers from `plugin-sdk/channel-message-runtime`. | | `plugin-sdk/messaging-targets` | Target parsing/matching helpers | | `plugin-sdk/outbound-media` | Shared outbound media loading helpers | | `plugin-sdk/outbound-send-deps` | Lightweight outbound send dependency lookup for channel adapters | - | `plugin-sdk/outbound-runtime` | Outbound delivery, identity, send delegate, session, formatting, and payload planning helpers | + | `plugin-sdk/outbound-runtime` | Outbound identity, send delegate, session, formatting, and payload planning helpers. Direct delivery helpers such as `deliverOutboundPayloads` are deprecated compatibility substrate; use `plugin-sdk/channel-message-runtime` for new send paths. | | `plugin-sdk/poll-runtime` | Narrow poll normalization helpers | | `plugin-sdk/thread-bindings-runtime` | Thread-binding lifecycle and adapter helpers | | `plugin-sdk/agent-media-payload` | Legacy agent media payload builder | diff --git a/extensions/discord/src/monitor/reply-delivery.test.ts b/extensions/discord/src/monitor/reply-delivery.test.ts index 99a709a7445..9d063b3f9fa 100644 --- a/extensions/discord/src/monitor/reply-delivery.test.ts +++ b/extensions/discord/src/monitor/reply-delivery.test.ts @@ -19,6 +19,17 @@ vi.mock("openclaw/plugin-sdk/outbound-runtime", async () => { }; }); +vi.mock("../../../../src/infra/outbound/deliver.js", async () => { + const actual = await vi.importActual( + "../../../../src/infra/outbound/deliver.js", + ); + return { + ...actual, + deliverOutboundPayloads: deliverOutboundPayloadsMock, + deliverOutboundPayloadsInternal: deliverOutboundPayloadsMock, + }; +}); + vi.mock("../send.js", async () => { const actual = await vi.importActual("../send.js"); return { diff --git a/extensions/discord/src/monitor/reply-delivery.ts b/extensions/discord/src/monitor/reply-delivery.ts index 8384dcf2da5..323798ffae4 100644 --- a/extensions/discord/src/monitor/reply-delivery.ts +++ b/extensions/discord/src/monitor/reply-delivery.ts @@ -1,4 +1,5 @@ import { resolveAgentAvatar } from "openclaw/plugin-sdk/agent-runtime"; +import { sendDurableMessageBatch } from "openclaw/plugin-sdk/channel-message"; import type { MarkdownTableMode, OpenClawConfig, @@ -7,7 +8,6 @@ import type { import type { OutboundMediaAccess } from "openclaw/plugin-sdk/media-runtime"; import { buildOutboundSessionContext, - deliverOutboundPayloads, type OutboundDeliveryFormattingOptions, type OutboundIdentity, type OutboundSendDeps, @@ -181,7 +181,7 @@ export async function deliverDiscordReply(params: { return; } - const results = await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg: params.cfg, channel: "discord", to: delivery.to, @@ -205,6 +205,10 @@ export async function deliverDiscordReply(params: { requesterAccountId: params.accountId, }), }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } + const results = send.status === "sent" ? send.results : []; if (results.length === 0) { throw new Error(`discord final reply produced no delivered message for ${delivery.to}`); } diff --git a/extensions/irc/src/inbound.ts b/extensions/irc/src/inbound.ts index 2954a8c5590..a8e631a404f 100644 --- a/extensions/irc/src/inbound.ts +++ b/extensions/irc/src/inbound.ts @@ -1,3 +1,4 @@ +import { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message"; import { createChannelPairingController } from "openclaw/plugin-sdk/channel-pairing"; import { readStoreAllowFromForDmPolicy, @@ -333,38 +334,53 @@ export async function handleIrcInbound(params: { CommandAuthorized: commandAuthorized, }); - const { dispatchChannelMessageReplyWithBase } = - await import("openclaw/plugin-sdk/channel-message"); - await dispatchChannelMessageReplyWithBase({ + const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({ cfg: config as OpenClawConfig, + agentId: route.agentId, channel: CHANNEL_ID, accountId: account.accountId, - route, + }); + + await core.channel.turn.runPrepared({ + channel: CHANNEL_ID, + accountId: account.accountId, + routeSessionKey: route.sessionKey, storePath, ctxPayload, - core, - deliver: async (payload) => { - await deliverIrcReply({ - payload, - cfg: config, - target: peerId, - accountId: account.accountId, - sendReply: params.sendReply, - statusSink, - }); - }, - onRecordError: (err) => { - runtime.error?.(`irc: failed updating session meta: ${String(err)}`); - }, - onDispatchError: (err, info) => { - runtime.error?.(`irc ${info.kind} reply failed: ${String(err)}`); - }, - replyOptions: { - skillFilter: groupMatch.groupConfig?.skills, - disableBlockStreaming: - typeof account.config.blockStreaming === "boolean" - ? !account.config.blockStreaming - : undefined, + recordInboundSession: core.channel.session.recordInboundSession, + runDispatch: async () => + await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: config as OpenClawConfig, + dispatcherOptions: { + ...replyPipeline, + deliver: async (payload) => { + await deliverIrcReply({ + payload, + cfg: config, + target: peerId, + accountId: account.accountId, + sendReply: params.sendReply, + statusSink, + }); + }, + onError: (err, info) => { + runtime.error?.(`irc ${info.kind} reply failed: ${String(err)}`); + }, + }, + replyOptions: { + onModelSelected, + skillFilter: groupMatch.groupConfig?.skills, + disableBlockStreaming: + typeof account.config.blockStreaming === "boolean" + ? !account.config.blockStreaming + : undefined, + }, + }), + record: { + onRecordError: (err) => { + runtime.error?.(`irc: failed updating session meta: ${String(err)}`); + }, }, }); } diff --git a/extensions/irc/src/runtime-api.ts b/extensions/irc/src/runtime-api.ts index 971204dc12b..5f67497d5b3 100644 --- a/extensions/irc/src/runtime-api.ts +++ b/extensions/irc/src/runtime-api.ts @@ -29,7 +29,7 @@ export { resolveEffectiveAllowFromLists, } from "openclaw/plugin-sdk/channel-policy"; export { resolveControlCommandGate } from "openclaw/plugin-sdk/command-auth"; -export { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message"; +export { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message"; export { chunkTextForOutbound } from "openclaw/plugin-sdk/text-chunking"; export { deliverFormattedTextWithAttachments, diff --git a/extensions/line/src/monitor.lifecycle.test.ts b/extensions/line/src/monitor.lifecycle.test.ts index 6536b3e604b..c627750e65e 100644 --- a/extensions/line/src/monitor.lifecycle.test.ts +++ b/extensions/line/src/monitor.lifecycle.test.ts @@ -64,7 +64,7 @@ vi.mock("openclaw/plugin-sdk/runtime-env", async () => { vi.mock("openclaw/plugin-sdk/channel-message", () => ({ createChannelMessageReplyPipeline: vi.fn(() => ({})), - hasFinalChannelMessageReplyDispatch: vi.fn(() => false), + hasFinalChannelTurnDispatch: vi.fn(() => false), })); vi.mock("openclaw/plugin-sdk/webhook-ingress", async () => { diff --git a/extensions/line/src/monitor.ts b/extensions/line/src/monitor.ts index 70f3c4c0fd9..bb32a8e9358 100644 --- a/extensions/line/src/monitor.ts +++ b/extensions/line/src/monitor.ts @@ -1,5 +1,5 @@ import type { webhook } from "@line/bot-sdk"; -import { hasFinalChannelMessageReplyDispatch } from "openclaw/plugin-sdk/channel-message"; +import { hasFinalChannelTurnDispatch } from "openclaw/plugin-sdk/channel-message"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { chunkMarkdownText } from "openclaw/plugin-sdk/reply-runtime"; import { @@ -313,7 +313,7 @@ export async function monitorLineProvider( }, }); const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined; - if (!hasFinalChannelMessageReplyDispatch(dispatchResult)) { + if (!hasFinalChannelTurnDispatch(dispatchResult)) { logVerbose(`line: no response generated for message from ${ctxPayload.From}`); } } catch (err) { diff --git a/extensions/nextcloud-talk/runtime-api.ts b/extensions/nextcloud-talk/runtime-api.ts index a179ef13bbe..60f9e0c0ea8 100644 --- a/extensions/nextcloud-talk/runtime-api.ts +++ b/extensions/nextcloud-talk/runtime-api.ts @@ -23,7 +23,7 @@ export { resolveDefaultGroupPolicy, warnMissingProviderGroupPolicyFallbackOnce, } from "openclaw/plugin-sdk/runtime-group-policy"; -export { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message"; +export { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message"; export type { OutboundReplyPayload } from "openclaw/plugin-sdk/reply-payload"; export { deliverFormattedTextWithAttachments } from "openclaw/plugin-sdk/reply-payload"; export type { PluginRuntime } from "openclaw/plugin-sdk/runtime-store"; diff --git a/extensions/nextcloud-talk/src/inbound.behavior.test.ts b/extensions/nextcloud-talk/src/inbound.behavior.test.ts index 55824d58364..f9bb8585c50 100644 --- a/extensions/nextcloud-talk/src/inbound.behavior.test.ts +++ b/extensions/nextcloud-talk/src/inbound.behavior.test.ts @@ -7,7 +7,6 @@ import type { CoreConfig, NextcloudTalkInboundMessage } from "./types.js"; const { createChannelPairingControllerMock, - dispatchChannelMessageReplyWithBaseMock, readStoreAllowFromForDmPolicyMock, resolveDmGroupAccessWithCommandGateMock, resolveAllowlistProviderRuntimeGroupPolicyMock, @@ -16,7 +15,6 @@ const { } = vi.hoisted(() => { return { createChannelPairingControllerMock: vi.fn(), - dispatchChannelMessageReplyWithBaseMock: vi.fn(), readStoreAllowFromForDmPolicyMock: vi.fn(), resolveDmGroupAccessWithCommandGateMock: vi.fn(), resolveAllowlistProviderRuntimeGroupPolicyMock: vi.fn(), @@ -33,7 +31,6 @@ vi.mock("../runtime-api.js", async () => { return { ...actual, createChannelPairingController: createChannelPairingControllerMock, - dispatchChannelMessageReplyWithBase: dispatchChannelMessageReplyWithBaseMock, readStoreAllowFromForDmPolicy: readStoreAllowFromForDmPolicyMock, resolveDmGroupAccessWithCommandGate: resolveDmGroupAccessWithCommandGateMock, resolveAllowlistProviderRuntimeGroupPolicy: resolveAllowlistProviderRuntimeGroupPolicyMock, @@ -183,7 +180,7 @@ describe("nextcloud-talk inbound behavior", () => { .find((status) => status.lastOutboundAt !== undefined); expect(typeof outboundStatus?.lastOutboundAt).toBe("number"); expect(outboundStatus?.lastOutboundAt).toBeGreaterThanOrEqual(1_736_380_800_000); - expect(dispatchChannelMessageReplyWithBaseMock).not.toHaveBeenCalled(); + expect(sendMessageNextcloudTalkMock).toHaveBeenCalledTimes(1); }); it("drops unmentioned group traffic before dispatch", async () => { @@ -222,7 +219,7 @@ describe("nextcloud-talk inbound behavior", () => { runtime, }); - expect(dispatchChannelMessageReplyWithBaseMock).not.toHaveBeenCalled(); + expect(sendMessageNextcloudTalkMock).not.toHaveBeenCalled(); expect(runtime.log).toHaveBeenCalledWith("nextcloud-talk: drop room room-group (no mention)"); }); }); diff --git a/extensions/nextcloud-talk/src/inbound.ts b/extensions/nextcloud-talk/src/inbound.ts index 687c72fb082..2c59c787257 100644 --- a/extensions/nextcloud-talk/src/inbound.ts +++ b/extensions/nextcloud-talk/src/inbound.ts @@ -1,9 +1,9 @@ import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; import { GROUP_POLICY_BLOCKED_LABEL, + createChannelMessageReplyPipeline, createChannelPairingController, deliverFormattedTextWithAttachments, - dispatchChannelMessageReplyWithBase, logInboundDrop, readStoreAllowFromForDmPolicy, resolveAllowlistProviderRuntimeGroupPolicy, @@ -286,35 +286,52 @@ export async function handleNextcloudTalkInbound(params: { CommandAuthorized: commandAuthorized, }); - await dispatchChannelMessageReplyWithBase({ + const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({ cfg: config as OpenClawConfig, + agentId: route.agentId, channel: CHANNEL_ID, accountId: account.accountId, - route, + }); + + await core.channel.turn.runPrepared({ + channel: CHANNEL_ID, + accountId: account.accountId, + routeSessionKey: route.sessionKey, storePath, ctxPayload, - core, - deliver: async (payload) => { - await deliverNextcloudTalkReply({ - cfg: config, - payload, - roomToken, - accountId: account.accountId, - statusSink, - }); - }, - onRecordError: (err) => { - runtime.error?.(`nextcloud-talk: failed updating session meta: ${String(err)}`); - }, - onDispatchError: (err, info) => { - runtime.error?.(`nextcloud-talk ${info.kind} reply failed: ${String(err)}`); - }, - replyOptions: { - skillFilter: roomConfig?.skills, - disableBlockStreaming: - typeof account.config.blockStreaming === "boolean" - ? !account.config.blockStreaming - : undefined, + recordInboundSession: core.channel.session.recordInboundSession, + runDispatch: async () => + await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: config as OpenClawConfig, + dispatcherOptions: { + ...replyPipeline, + deliver: async (payload) => { + await deliverNextcloudTalkReply({ + cfg: config, + payload, + roomToken, + accountId: account.accountId, + statusSink, + }); + }, + onError: (err, info) => { + runtime.error?.(`nextcloud-talk ${info.kind} reply failed: ${String(err)}`); + }, + }, + replyOptions: { + onModelSelected, + skillFilter: roomConfig?.skills, + disableBlockStreaming: + typeof account.config.blockStreaming === "boolean" + ? !account.config.blockStreaming + : undefined, + }, + }), + record: { + onRecordError: (err) => { + runtime.error?.(`nextcloud-talk: failed updating session meta: ${String(err)}`); + }, }, }); } diff --git a/extensions/qa-channel/runtime-api.ts b/extensions/qa-channel/runtime-api.ts index 943612b2c1c..81299320cd3 100644 --- a/extensions/qa-channel/runtime-api.ts +++ b/extensions/qa-channel/runtime-api.ts @@ -10,7 +10,6 @@ export { createDefaultChannelRuntimeState, createPluginRuntimeStore, defineChannelPluginEntry, - dispatchChannelMessageReplyWithBase, getChatChannelMeta, jsonResult, type OpenClawConfig, diff --git a/extensions/qa-channel/src/inbound.test.ts b/extensions/qa-channel/src/inbound.test.ts index b55217e48be..5f5a598a89e 100644 --- a/extensions/qa-channel/src/inbound.test.ts +++ b/extensions/qa-channel/src/inbound.test.ts @@ -1,22 +1,8 @@ import { createPluginRuntimeMock } from "openclaw/plugin-sdk/channel-test-helpers"; -import { beforeEach, describe, expect, it, vi } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { setQaChannelRuntime } from "../api.js"; import { handleQaInbound, isHttpMediaUrl } from "./inbound.js"; -const dispatchChannelMessageReplyWithBaseMock = vi.hoisted(() => vi.fn()); - -vi.mock("openclaw/plugin-sdk/channel-message", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - dispatchChannelMessageReplyWithBase: dispatchChannelMessageReplyWithBaseMock, - }; -}); - -beforeEach(() => { - dispatchChannelMessageReplyWithBaseMock.mockReset(); -}); - describe("isHttpMediaUrl", () => { it("accepts only http and https urls", () => { expect(isHttpMediaUrl("https://example.com/image.png")).toBe(true); @@ -64,9 +50,9 @@ describe("handleQaInbound", () => { }, }); - expect(dispatchChannelMessageReplyWithBaseMock).toHaveBeenCalledTimes(1); - expect(dispatchChannelMessageReplyWithBaseMock.mock.calls[0]?.[0].ctxPayload.WasMentioned).toBe( - true, - ); + expect(runtime.channel.turn.runPrepared).toHaveBeenCalledTimes(1); + expect( + vi.mocked(runtime.channel.turn.runPrepared).mock.calls[0]?.[0].ctxPayload.WasMentioned, + ).toBe(true); }); }); diff --git a/extensions/qa-channel/src/inbound.ts b/extensions/qa-channel/src/inbound.ts index 28a2a4bade8..a1500a09b39 100644 --- a/extensions/qa-channel/src/inbound.ts +++ b/extensions/qa-channel/src/inbound.ts @@ -1,4 +1,4 @@ -import { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message"; +import { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { buildAgentMediaPayload, @@ -151,42 +151,59 @@ export async function handleQaInbound(params: { ...mediaPayload, }); - await dispatchChannelMessageReplyWithBase({ + const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({ cfg: params.config as OpenClawConfig, + agentId: route.agentId, channel: params.channelId, accountId: params.account.accountId, - route, + }); + + await runtime.channel.turn.runPrepared({ + channel: params.channelId, + accountId: params.account.accountId, + routeSessionKey: route.sessionKey, storePath, ctxPayload, - core: runtime, - deliver: async (payload) => { - const text = - payload && typeof payload === "object" && "text" in payload - ? ((payload as { text?: string }).text ?? "") - : ""; - if (!text.trim()) { - return; - } - await sendQaBusMessage({ - baseUrl: params.account.baseUrl, - accountId: params.account.accountId, - to: target, - text, - senderId: params.account.botUserId, - senderName: params.account.botDisplayName, - threadId: inbound.threadId, - replyToId: inbound.id, - }); - }, - onRecordError: (error) => { - throw error instanceof Error - ? error - : new Error(`qa-channel session record failed: ${String(error)}`); - }, - onDispatchError: (error) => { - throw error instanceof Error - ? error - : new Error(`qa-channel dispatch failed: ${String(error)}`); + recordInboundSession: runtime.channel.session.recordInboundSession, + runDispatch: async () => + await runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ + ctx: ctxPayload, + cfg: params.config as OpenClawConfig, + dispatcherOptions: { + ...replyPipeline, + deliver: async (payload) => { + const text = + payload && typeof payload === "object" && "text" in payload + ? ((payload as { text?: string }).text ?? "") + : ""; + if (!text.trim()) { + return; + } + await sendQaBusMessage({ + baseUrl: params.account.baseUrl, + accountId: params.account.accountId, + to: target, + text, + senderId: params.account.botUserId, + senderName: params.account.botDisplayName, + threadId: inbound.threadId, + replyToId: inbound.id, + }); + }, + onError: (error) => { + throw error instanceof Error + ? error + : new Error(`qa-channel dispatch failed: ${String(error)}`); + }, + }, + replyOptions: { onModelSelected }, + }), + record: { + onRecordError: (error) => { + throw error instanceof Error + ? error + : new Error(`qa-channel session record failed: ${String(error)}`); + }, }, }); } diff --git a/extensions/qa-channel/src/runtime-api.ts b/extensions/qa-channel/src/runtime-api.ts index ca97c5fd9e7..04c22bc5f6b 100644 --- a/extensions/qa-channel/src/runtime-api.ts +++ b/extensions/qa-channel/src/runtime-api.ts @@ -20,4 +20,4 @@ export { createDefaultChannelRuntimeState, } from "openclaw/plugin-sdk/status-helpers"; export { createPluginRuntimeStore } from "openclaw/plugin-sdk/runtime-store"; -export { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message"; +export { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message"; diff --git a/package.json b/package.json index ed8f22a134c..5c3a7292d4c 100644 --- a/package.json +++ b/package.json @@ -1316,12 +1316,12 @@ "canvas:a2ui:bundle": "node scripts/bundle-a2ui.mjs", "changed:lanes": "node scripts/changed-lanes.mjs", "check": "node scripts/check.mjs", - "check:architecture": "pnpm check:import-cycles && pnpm check:madge-import-cycles && pnpm check:deprecated-internal-config-api && pnpm check:deprecated-jsdoc", + "check:architecture": "pnpm check:import-cycles && pnpm check:madge-import-cycles && pnpm check:deprecated-api-usage && pnpm check:deprecated-jsdoc", "check:base-config-schema": "node --import tsx scripts/generate-base-config-schema.ts --check", "check:bundled-channel-config-metadata": "node --import tsx scripts/generate-bundled-channel-config-metadata.ts --check", "check:changed": "node scripts/check-changed.mjs", "check:changelog-attributions": "node scripts/check-changelog-attributions.mjs", - "check:deprecated-internal-config-api": "node scripts/check-deprecated-internal-config-api.mjs", + "check:deprecated-api-usage": "node scripts/check-deprecated-api-usage.mjs", "check:deprecated-jsdoc": "node scripts/check-deprecated-jsdoc.mjs", "check:docs": "pnpm format:docs:check && pnpm lint:docs && pnpm docs:check-mdx && pnpm docs:check-i18n-glossary && pnpm docs:check-links", "check:host-env-policy:swift": "node scripts/generate-host-env-security-policy-swift.mjs --check", diff --git a/scripts/check-deprecated-api-usage.mjs b/scripts/check-deprecated-api-usage.mjs new file mode 100644 index 00000000000..b04378b6844 --- /dev/null +++ b/scripts/check-deprecated-api-usage.mjs @@ -0,0 +1,210 @@ +#!/usr/bin/env node +import fs from "node:fs"; +import path from "node:path"; +import { collectDeprecatedInternalConfigApiViolations } from "./lib/deprecated-config-api-guard.mjs"; + +const repoRoot = process.cwd(); + +const sourceExtensions = new Set([".ts", ".tsx", ".js", ".mjs", ".mts"]); +const skippedSegments = new Set(["node_modules", "dist", "build", "coverage", ".turbo"]); +const skippedFilePatterns = [ + /\.test\.[cm]?[jt]sx?$/u, + /\.spec\.[cm]?[jt]sx?$/u, + /\.e2e\.[cm]?[jt]sx?$/u, + /\.d\.ts$/u, +]; + +function toRepoPath(filePath) { + return path.relative(repoRoot, filePath).split(path.sep).join("/"); +} + +function shouldSkipFile(filePath, rule) { + const repoPath = toRepoPath(filePath); + return (rule.skippedFilePatterns ?? skippedFilePatterns).some((pattern) => + pattern.test(repoPath), + ); +} + +function* walk(dir, rule) { + if (!fs.existsSync(dir)) { + return; + } + for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { + if (skippedSegments.has(entry.name)) { + continue; + } + const entryPath = path.join(dir, entry.name); + if (entry.isDirectory()) { + yield* walk(entryPath, rule); + continue; + } + if (!entry.isFile() || !sourceExtensions.has(path.extname(entry.name))) { + continue; + } + if (!shouldSkipFile(entryPath, rule)) { + yield entryPath; + } + } +} + +function escapeRegExp(value) { + return value.replace(/[.*+?^${}()|[\]\\]/gu, "\\$&"); +} + +function collectIdentifierRuleViolations(rule) { + const allowedFiles = new Set(rule.allowedFiles ?? []); + const pattern = new RegExp( + `\\b(?:${rule.names.map((name) => escapeRegExp(name)).join("|")})\\b`, + "gu", + ); + const violations = []; + + for (const root of rule.roots) { + for (const filePath of walk(path.join(repoRoot, root), rule)) { + const repoPath = toRepoPath(filePath); + if (allowedFiles.has(repoPath)) { + continue; + } + const source = fs.readFileSync(filePath, "utf8"); + for (const match of source.matchAll(pattern)) { + const line = source.slice(0, match.index).split("\n").length; + violations.push(`${repoPath}:${line}: ${match[0]} (${rule.message})`); + } + } + } + + return violations; +} + +function collectModuleSpecifierRuleViolations(rule) { + const allowedFiles = new Set(rule.allowedFiles ?? []); + const specifierPattern = rule.moduleSpecifiers + .map((specifier) => escapeRegExp(specifier)) + .join("|"); + const patterns = [ + new RegExp( + `\\bimport\\s+(?:type\\s+)?(?:[^"']+?\\s+from\\s+)?["'](${specifierPattern})["']`, + "gu", + ), + new RegExp( + `\\bexport\\s+(?:type\\s+)?(?:\\*\\s+from\\s+|[^"']+?\\s+from\\s+)["'](${specifierPattern})["']`, + "gu", + ), + new RegExp(`\\bimport\\(\\s*["'](${specifierPattern})["']\\s*\\)`, "gu"), + ]; + const violations = []; + + for (const root of rule.roots) { + for (const filePath of walk(path.join(repoRoot, root), rule)) { + const repoPath = toRepoPath(filePath); + if (allowedFiles.has(repoPath)) { + continue; + } + const source = fs.readFileSync(filePath, "utf8"); + for (const pattern of patterns) { + for (const match of source.matchAll(pattern)) { + const line = source.slice(0, match.index).split("\n").length; + violations.push(`${repoPath}:${line}: ${match[1]} (${rule.message})`); + } + } + } + } + + return violations; +} + +function collectRuleViolations(rule) { + if (rule.collect) { + return rule.collect(); + } + if (rule.moduleSpecifiers) { + return collectModuleSpecifierRuleViolations(rule); + } + return collectIdentifierRuleViolations(rule); +} + +const rules = [ + { + id: "internal-config-api", + collect: () => collectDeprecatedInternalConfigApiViolations(), + }, + { + id: "plugin-sdk-compat-subpaths", + roots: ["src", "extensions", "packages"], + moduleSpecifiers: [ + "openclaw/plugin-sdk/agent-dir-compat", + "openclaw/plugin-sdk/channel-config-schema-legacy", + "openclaw/plugin-sdk/channel-reply-pipeline", + "openclaw/plugin-sdk/channel-runtime", + "openclaw/plugin-sdk/compat", + "openclaw/plugin-sdk/discord", + "openclaw/plugin-sdk/infra-runtime", + "openclaw/plugin-sdk/mattermost", + "openclaw/plugin-sdk/matrix", + "openclaw/plugin-sdk/telegram-account", + "openclaw/plugin-sdk/testing", + "openclaw/plugin-sdk/test-utils", + "openclaw/plugin-sdk/zalouser", + ], + message: "use focused non-deprecated plugin SDK subpaths", + }, + { + id: "message-api", + roots: ["src", "extensions", "packages"], + names: [ + "deliverOutboundPayloads", + "dispatchChannelMessageReplyWithBase", + "recordChannelMessageReplyDispatch", + "buildChannelMessageReplyDispatchBase", + "hasFinalChannelMessageReplyDispatch", + "hasVisibleChannelMessageReplyDispatch", + "resolveChannelMessageReplyDispatchCounts", + "createChannelTurnReplyPipeline", + "deliverDurableInboundReplyPayload", + ], + allowedFiles: [ + "src/channels/turn/durable-delivery.ts", + "src/channels/turn/kernel.ts", + "src/infra/outbound/deliver-runtime.ts", + "src/infra/outbound/deliver.ts", + "src/plugin-sdk/channel-message-runtime.ts", + "src/plugin-sdk/channel-message.ts", + "src/plugin-sdk/channel-test-helpers.ts", + "src/plugin-sdk/inbound-reply-dispatch.ts", + "src/plugin-sdk/outbound-runtime.ts", + "src/plugin-sdk/test-helpers/outbound-delivery.ts", + "src/plugin-sdk/testing.ts", + ], + message: "use sendDurableMessageBatch or deliverInboundReplyWithMessageSendContext", + }, +]; + +const selectedRuleIds = new Set( + process.argv + .slice(2) + .filter((arg) => arg.startsWith("--rule=")) + .map((arg) => arg.slice("--rule=".length)), +); + +const selectedRules = + selectedRuleIds.size === 0 ? rules : rules.filter((rule) => selectedRuleIds.has(rule.id)); +const unknownRuleIds = [...selectedRuleIds].filter((id) => !rules.some((rule) => rule.id === id)); + +if (unknownRuleIds.length > 0) { + console.error(`Unknown deprecated API usage rule(s): ${unknownRuleIds.join(", ")}`); + process.exit(1); +} + +const violations = selectedRules.flatMap((rule) => + collectRuleViolations(rule).map((violation) => `${rule.id}: ${violation}`), +); + +if (violations.length > 0) { + console.error("Deprecated API usage guard failed:"); + for (const violation of violations) { + console.error(`- ${violation}`); + } + process.exit(1); +} + +console.log("deprecated API usage guard passed"); diff --git a/scripts/check-deprecated-internal-config-api.mjs b/scripts/check-deprecated-internal-config-api.mjs deleted file mode 100644 index df541b9abd3..00000000000 --- a/scripts/check-deprecated-internal-config-api.mjs +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env node -import { collectDeprecatedInternalConfigApiViolations } from "./lib/deprecated-config-api-guard.mjs"; - -export function main() { - const violations = collectDeprecatedInternalConfigApiViolations(); - if (violations.length === 0) { - console.log("deprecated internal config API guard passed"); - return 0; - } - - console.error("Deprecated internal config API guard failed:"); - for (const violation of violations) { - console.error(`- ${violation}`); - } - return 1; -} - -if (import.meta.url === `file://${process.argv[1]}`) { - process.exitCode = main(); -} diff --git a/scripts/check.mjs b/scripts/check.mjs index b2235bcfa06..651a687a6fe 100644 --- a/scripts/check.mjs +++ b/scripts/check.mjs @@ -12,8 +12,8 @@ export async function main(argv = process.argv.slice(2)) { { name: "runtime action config guard", args: ["check:no-runtime-action-load-config"] }, !includeArchitecture ? { - name: "deprecated internal config API guard", - args: ["check:deprecated-internal-config-api"], + name: "deprecated API usage guard", + args: ["check:deprecated-api-usage"], } : null, { name: "temp path guard", args: ["check:temp-path-guardrails"] }, diff --git a/src/agents/command/delivery.test.ts b/src/agents/command/delivery.test.ts index 2e82a135ead..879fb192b0a 100644 --- a/src/agents/command/delivery.test.ts +++ b/src/agents/command/delivery.test.ts @@ -13,6 +13,7 @@ const deliverOutboundPayloadsMock = vi.hoisted(() => ); vi.mock("../../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads: deliverOutboundPayloadsMock, + deliverOutboundPayloadsInternal: deliverOutboundPayloadsMock, })); const createReplyMediaPathNormalizerMock = vi.hoisted(() => @@ -228,7 +229,35 @@ describe("normalizeAgentCommandReplyPayloads", () => { it("does not report success when best-effort delivery records an error", async () => { deliverOutboundPayloadsMock.mockImplementationOnce(async (params: unknown) => { - (params as { onError?: (err: unknown) => void }).onError?.(new Error("send failed")); + ( + params as { + onError?: (err: unknown, payload: ReplyPayload) => void; + onPayloadDeliveryOutcome?: (outcome: { + index: number; + payload: ReplyPayload; + status: "failed"; + error: Error; + stage: "send"; + }) => void; + } + ).onError?.(new Error("send failed"), { text: "here you go" }); + ( + params as { + onPayloadDeliveryOutcome?: (outcome: { + index: number; + payload: ReplyPayload; + status: "failed"; + error: Error; + stage: "send"; + }) => void; + } + ).onPayloadDeliveryOutcome?.({ + index: 0, + payload: { text: "here you go" }, + status: "failed", + error: new Error("send failed"), + stage: "send", + }); return []; }); @@ -259,6 +288,12 @@ describe("normalizeAgentCommandReplyPayloads", () => { expect(delivered.deliverySucceeded).toBe(false); expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("send failed")); + expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith( + expect.objectContaining({ + bestEffort: true, + queuePolicy: "best_effort", + }), + ); }); it("threads agentId into the normalizer when sessionKey is unresolved", async () => { diff --git a/src/agents/command/delivery.ts b/src/agents/command/delivery.ts index 2cad2a18179..87cbf54e68f 100644 --- a/src/agents/command/delivery.ts +++ b/src/agents/command/delivery.ts @@ -3,6 +3,7 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js"; import { createReplyMediaPathNormalizer } from "../../auto-reply/reply/reply-media-paths.runtime.js"; +import { sendDurableMessageBatch } from "../../channels/message/runtime.js"; import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js"; import { createReplyPrefixContext } from "../../channels/reply-prefix.js"; import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js"; @@ -13,7 +14,6 @@ import { resolveAgentOutboundTarget, } from "../../infra/outbound/agent-delivery.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; -import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { buildOutboundResultEnvelope } from "../../infra/outbound/envelope.js"; import { createOutboundPayloadPlan, @@ -381,7 +381,7 @@ export async function deliverAgentCommandResult(params: { } if (deliver && deliveryChannel && !isInternalMessageChannel(deliveryChannel)) { if (deliveryTarget) { - await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg, channel: deliveryChannel, to: deliveryTarget, @@ -391,10 +391,17 @@ export async function deliverAgentCommandResult(params: { replyToId: resolvedReplyToId ?? null, threadId: resolvedThreadTarget ?? null, bestEffort: bestEffortDeliver, + durability: bestEffortDeliver ? "best_effort" : "required", onError: markDeliveryError, onPayload: logPayload, deps: createOutboundSendDeps(deps), }); + if (!bestEffortDeliver && (send.status === "failed" || send.status === "partial_failed")) { + throw send.error; + } + if (send.status === "failed" || send.status === "partial_failed") { + deliveryHadError = true; + } deliverySucceeded = !deliveryHadError; } } diff --git a/src/auto-reply/reply/route-reply.test.ts b/src/auto-reply/reply/route-reply.test.ts index ffcb93cf3d4..33072741faa 100644 --- a/src/auto-reply/reply/route-reply.test.ts +++ b/src/auto-reply/reply/route-reply.test.ts @@ -18,6 +18,12 @@ const mocks = vi.hoisted(() => ({ vi.mock("../../infra/outbound/deliver-runtime.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, +})); + +vi.mock("../../infra/outbound/deliver.js", () => ({ + deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, })); const { routeReply } = await import("./route-reply.js"); diff --git a/src/auto-reply/reply/route-reply.ts b/src/auto-reply/reply/route-reply.ts index 1c0dd5c7de8..10de1aa49e6 100644 --- a/src/auto-reply/reply/route-reply.ts +++ b/src/auto-reply/reply/route-reply.ts @@ -30,12 +30,12 @@ import { shouldSuppressReasoningPayload, } from "./reply-payloads.js"; -const deliverRuntimeLoader = createLazyImportLoader( - () => import("../../infra/outbound/deliver-runtime.js"), +const messageRuntimeLoader = createLazyImportLoader( + () => import("../../channels/message/runtime.js"), ); function loadDeliverRuntime() { - return deliverRuntimeLoader.load(); + return messageRuntimeLoader.load(); } export type RouteReplyParams = { @@ -215,7 +215,7 @@ export async function routeReply(params: RouteReplyParams): Promise vi.fn()); vi.mock("../../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads, + deliverOutboundPayloadsInternal: deliverOutboundPayloads, })); import { sendDurableMessageBatch, withDurableMessageSendContext } from "./send.js"; type DeliveryIntentCallbackParams = { onDeliveryIntent?: (intent: OutboundDeliveryIntent) => void; + onPayloadDeliveryOutcome?: (outcome: OutboundPayloadDeliveryOutcome) => void; }; const cfg = {} as OpenClawConfig; @@ -367,6 +371,211 @@ describe("withDurableMessageSendContext", () => { ); }); + it("reports hook-cancelled deliveries as explicit suppressed sends", async () => { + deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => { + params.onPayloadDeliveryOutcome?.({ + index: 0, + status: "suppressed", + reason: "cancelled_by_message_sending_hook", + hookEffect: { cancelReason: "owned-by-other-agent" }, + }); + return []; + }); + const onCommitReceipt = vi.fn(); + + const result = await sendDurableMessageBatch({ + cfg, + channel: "slack", + to: "C123", + payloads: [{ text: "claimed elsewhere" }], + onCommitReceipt, + }); + + expect(result).toEqual( + expect.objectContaining({ + status: "suppressed", + reason: "cancelled_by_message_sending_hook", + payloadOutcomes: [ + expect.objectContaining({ + status: "suppressed", + reason: "cancelled_by_message_sending_hook", + hookEffect: { cancelReason: "owned-by-other-agent" }, + }), + ], + }), + ); + expect(onCommitReceipt).toHaveBeenCalledWith( + expect.objectContaining({ platformMessageIds: [] }), + ); + }); + + it("forwards payload delivery outcomes to callers while collecting durable outcomes", async () => { + const onPayloadDeliveryOutcome = vi.fn(); + deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => { + params.onPayloadDeliveryOutcome?.({ + index: 0, + status: "suppressed", + reason: "cancelled_by_message_sending_hook", + }); + return []; + }); + + const result = await sendDurableMessageBatch({ + cfg, + channel: "slack", + to: "C123", + payloads: [{ text: "claimed elsewhere" }], + onPayloadDeliveryOutcome, + }); + + expect(result).toEqual( + expect.objectContaining({ + status: "suppressed", + payloadOutcomes: [ + expect.objectContaining({ + index: 0, + status: "suppressed", + reason: "cancelled_by_message_sending_hook", + }), + ], + }), + ); + expect(onPayloadDeliveryOutcome).toHaveBeenCalledWith( + expect.objectContaining({ + index: 0, + status: "suppressed", + reason: "cancelled_by_message_sending_hook", + }), + ); + }); + + it("reports zero-result failed best-effort payloads as failed sends", async () => { + const error = new Error("send failed"); + deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => { + params.onPayloadDeliveryOutcome?.({ + index: 0, + status: "failed", + error, + sentBeforeError: false, + stage: "platform_send", + }); + return []; + }); + const onCommitReceipt = vi.fn(); + const onSendFailure = vi.fn(); + + const result = await sendDurableMessageBatch({ + cfg, + channel: "telegram", + to: "chat-1", + payloads: [{ text: "hello" }], + bestEffort: true, + onCommitReceipt, + onSendFailure, + }); + + expect(result).toEqual( + expect.objectContaining({ + status: "failed", + error, + stage: "platform_send", + payloadOutcomes: [ + expect.objectContaining({ + index: 0, + status: "failed", + error, + stage: "platform_send", + }), + ], + }), + ); + expect(onCommitReceipt).not.toHaveBeenCalled(); + expect(onSendFailure).toHaveBeenCalledWith(error); + }); + + it("reports best-effort partial failures with the delivered receipt prefix", async () => { + const error = new Error("second payload failed"); + deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => { + params.onPayloadDeliveryOutcome?.({ + index: 0, + status: "sent", + results: [{ channel: "telegram", messageId: "msg-1" }], + }); + params.onPayloadDeliveryOutcome?.({ + index: 1, + status: "failed", + error, + sentBeforeError: true, + stage: "platform_send", + }); + return [{ channel: "telegram", messageId: "msg-1" }]; + }); + const onSendFailure = vi.fn(); + + const result = await sendDurableMessageBatch({ + cfg, + channel: "telegram", + to: "chat-1", + payloads: [{ text: "first" }, { text: "second" }], + onSendFailure, + }); + + expect(result).toEqual( + expect.objectContaining({ + status: "partial_failed", + results: [{ channel: "telegram", messageId: "msg-1" }], + receipt: expect.objectContaining({ platformMessageIds: ["msg-1"] }), + error, + sentBeforeError: true, + }), + ); + expect(onSendFailure).toHaveBeenCalledWith(error); + }); + + it("maps thrown outbound partial delivery errors to partial_failed", async () => { + const cause = new Error("network reset"); + const error = new OutboundDeliveryError("network reset", { + cause, + results: [{ channel: "telegram", messageId: "msg-1" }], + payloadOutcomes: [ + { + index: 0, + status: "sent", + results: [{ channel: "telegram", messageId: "msg-1" }], + }, + { + index: 1, + status: "failed", + error: cause, + sentBeforeError: true, + stage: "platform_send", + }, + ], + stage: "platform_send", + }); + deliverOutboundPayloads.mockRejectedValueOnce(error); + const onSendFailure = vi.fn(); + + const result = await sendDurableMessageBatch({ + cfg, + channel: "telegram", + to: "chat-1", + payloads: [{ text: "first" }, { text: "second" }], + onSendFailure, + }); + + expect(result).toEqual( + expect.objectContaining({ + status: "partial_failed", + results: [{ channel: "telegram", messageId: "msg-1" }], + receipt: expect.objectContaining({ platformMessageIds: ["msg-1"] }), + error, + sentBeforeError: true, + }), + ); + expect(onSendFailure).toHaveBeenCalledWith(error); + }); + it("runs the failure hook when send-context orchestration throws", async () => { const onSendFailure = vi.fn(); const error = new Error("boom"); diff --git a/src/channels/message/send.ts b/src/channels/message/send.ts index 052d56b1f46..0035f6fcf9c 100644 --- a/src/channels/message/send.ts +++ b/src/channels/message/send.ts @@ -2,7 +2,11 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import { formatErrorMessage } from "../../infra/errors.js"; import type { OutboundDeliveryResult } from "../../infra/outbound/deliver-types.js"; import { - deliverOutboundPayloads, + isOutboundDeliveryError, + type OutboundPayloadDeliveryOutcome, +} from "../../infra/outbound/deliver-types.js"; +import { + deliverOutboundPayloadsInternal, type DeliverOutboundPayloadsParams, type OutboundDeliveryIntent, } from "../../infra/outbound/deliver.js"; @@ -33,21 +37,71 @@ export type DurableMessageBatchSendParams = Omit< previousReceipt?: MessageReceipt; }; +export type DurableMessageSuppressionReason = + | "cancelled_by_message_sending_hook" + | "empty_after_message_sending_hook" + | "no_visible_payload" + | "adapter_returned_no_identity" + | "no_visible_result"; + +export type DurableMessageFailureStage = "platform_send" | "queue" | "unknown"; + +export type DurableMessagePayloadDeliveryOutcome = + | { + index: number; + status: "sent"; + results: OutboundDeliveryResult[]; + } + | { + index: number; + status: "suppressed"; + reason: DurableMessageSuppressionReason; + hookEffect?: { + cancelReason?: string; + metadata?: Record; + }; + } + | { + index: number; + status: "failed"; + error: unknown; + sentBeforeError: boolean; + stage: DurableMessageFailureStage; + }; + export type DurableMessageBatchSendResult = | { status: "sent"; results: OutboundDeliveryResult[]; receipt: MessageReceipt; deliveryIntent?: OutboundDeliveryIntent; + payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[]; } | { status: "suppressed"; results: []; receipt: MessageReceipt; deliveryIntent?: OutboundDeliveryIntent; - reason: "no_visible_result"; + reason: DurableMessageSuppressionReason; + payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[]; } - | { status: "failed"; error: unknown }; + | { + status: "partial_failed"; + results: OutboundDeliveryResult[]; + receipt: MessageReceipt; + error: unknown; + sentBeforeError: true; + deliveryIntent?: OutboundDeliveryIntent; + payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[]; + } + | { + status: "failed"; + error: unknown; + stage?: DurableMessageFailureStage; + payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[]; + }; + +export type DurableMessageDeliveryOutcome = DurableMessageBatchSendResult; const neverAbortedSignal = new AbortController().signal; @@ -65,6 +119,18 @@ function toDurableMessageIntent( }; } +function toDurablePayloadOutcome( + outcome: OutboundPayloadDeliveryOutcome, +): DurableMessagePayloadDeliveryOutcome { + return outcome; +} + +function toDurablePayloadOutcomes( + outcomes: readonly OutboundPayloadDeliveryOutcome[], +): DurableMessagePayloadDeliveryOutcome[] { + return outcomes.map((outcome) => toDurablePayloadOutcome(outcome)); +} + export type DurableMessageSendContextParams = DurableMessageBatchSendParams & { durability?: Exclude; preview?: LiveMessageState; @@ -99,6 +165,7 @@ export async function withDurableMessageSendContext( onCommitReceipt, onPreviewUpdate, onSendFailure, + onPayloadDeliveryOutcome, payloads, preview, previousReceipt, @@ -129,13 +196,20 @@ export async function withDurableMessageSendContext( return liveState; }, send: async (rendered): Promise => { + const payloadOutcomes: OutboundPayloadDeliveryOutcome[] = []; + const durablePayloadOutcomes = (): DurableMessagePayloadDeliveryOutcome[] => + toDurablePayloadOutcomes(payloadOutcomes); try { - const results = await deliverOutboundPayloads({ + const results = await deliverOutboundPayloadsInternal({ ...deliveryParams, payloads: rendered.payloads, renderedBatchPlan: rendered.plan, queuePolicy, ...(effectiveSignal ? { abortSignal: effectiveSignal } : {}), + onPayloadDeliveryOutcome: (outcome) => { + payloadOutcomes.push(outcome); + onPayloadDeliveryOutcome?.(outcome); + }, onDeliveryIntent: (intent) => { deliveryIntent = intent; ctx.intent = toDurableMessageIntent(intent, rendered); @@ -146,13 +220,36 @@ export async function withDurableMessageSendContext( threadId: params.threadId == null ? undefined : String(params.threadId), replyToId: params.replyToId ?? undefined, }); + const failedOutcome = payloadOutcomes.find((outcome) => outcome.status === "failed"); + if (failedOutcome) { + if (results.length > 0) { + return { + status: "partial_failed", + results, + receipt, + error: failedOutcome.error, + sentBeforeError: true, + ...(deliveryIntent ? { deliveryIntent } : {}), + ...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}), + }; + } + return { + status: "failed", + error: failedOutcome.error, + stage: failedOutcome.stage, + ...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}), + }; + } if (results.length === 0) { return { status: "suppressed", results: [], receipt, ...(deliveryIntent ? { deliveryIntent } : {}), - reason: "no_visible_result", + reason: + payloadOutcomes.find((outcome) => outcome.status === "suppressed")?.reason ?? + "no_visible_result", + ...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}), }; } return { @@ -160,8 +257,37 @@ export async function withDurableMessageSendContext( results, receipt, ...(deliveryIntent ? { deliveryIntent } : {}), + ...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}), }; } catch (error: unknown) { + if (isOutboundDeliveryError(error)) { + if (error.results.length > 0) { + const receipt = createMessageReceiptFromOutboundResults({ + results: error.results, + threadId: params.threadId == null ? undefined : String(params.threadId), + replyToId: params.replyToId ?? undefined, + }); + return { + status: "partial_failed", + results: error.results, + receipt, + error, + sentBeforeError: true, + ...(deliveryIntent ? { deliveryIntent } : {}), + ...(error.payloadOutcomes.length > 0 + ? { payloadOutcomes: toDurablePayloadOutcomes(error.payloadOutcomes) } + : {}), + }; + } + return { + status: "failed", + error, + stage: error.stage, + ...(error.payloadOutcomes.length > 0 + ? { payloadOutcomes: toDurablePayloadOutcomes(error.payloadOutcomes) } + : {}), + }; + } return { status: "failed", error }; } }, @@ -213,7 +339,7 @@ export async function sendDurableMessageBatch( return await withDurableMessageSendContext(params, async (ctx) => { const rendered = await ctx.render(); const result = await ctx.send(rendered); - if (result.status !== "failed") { + if (result.status === "sent" || result.status === "suppressed") { await ctx.commit(result.receipt); } else { await ctx.fail(result.error); diff --git a/src/channels/turn/durable-delivery.test.ts b/src/channels/turn/durable-delivery.test.ts index 6211dc253ad..c48a5d41f81 100644 --- a/src/channels/turn/durable-delivery.test.ts +++ b/src/channels/turn/durable-delivery.test.ts @@ -165,4 +165,34 @@ describe("durable inbound reply delivery", () => { }), ); }); + + it("reports durable partial send failures as failed delivery", async () => { + const error = new Error("second chunk failed"); + mocks.sendDurableMessageBatch.mockResolvedValueOnce({ + status: "partial_failed", + results: [{ channel: "telegram", messageId: "m1" }], + receipt: { + primaryPlatformMessageId: "m1", + platformMessageIds: ["m1"], + parts: [{ platformMessageId: "m1", kind: "text", index: 0 }], + sentAt: 1, + }, + error, + sentBeforeError: true, + }); + + const result = await deliverInboundReplyWithMessageSendContext({ + cfg: {}, + channel: "telegram", + agentId: "main", + info: { kind: "final" }, + payload: { text: "final" }, + ctxPayload: { + CommandAuthorized: true, + OriginatingTo: "chat-1", + }, + }); + + expect(result).toEqual({ status: "failed", error }); + }); }); diff --git a/src/channels/turn/durable-delivery.ts b/src/channels/turn/durable-delivery.ts index b9501b93b17..bc9d9e7d7af 100644 --- a/src/channels/turn/durable-delivery.ts +++ b/src/channels/turn/durable-delivery.ts @@ -191,6 +191,9 @@ export async function deliverInboundReplyWithMessageSendContext( if (send.status === "failed") { return { status: "failed" as const, error: send.error }; } + if (send.status === "partial_failed") { + return { status: "failed" as const, error: send.error }; + } const delivery = createChannelDeliveryResultFromReceipt({ receipt: send.receipt, diff --git a/src/commands/agent.delivery.test.ts b/src/commands/agent.delivery.test.ts index 56042cf7b62..0f2639fb091 100644 --- a/src/commands/agent.delivery.test.ts +++ b/src/commands/agent.delivery.test.ts @@ -20,6 +20,7 @@ vi.mock("../channels/plugins/index.js", () => ({ vi.mock("../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, })); vi.mock("../infra/outbound/targets.js", async () => { diff --git a/src/cron/delivery.failure-notify.test.ts b/src/cron/delivery.failure-notify.test.ts index 9755895b269..5e217692d7b 100644 --- a/src/cron/delivery.failure-notify.test.ts +++ b/src/cron/delivery.failure-notify.test.ts @@ -15,6 +15,7 @@ vi.mock("./isolated-agent/delivery-target.js", () => ({ vi.mock("../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, })); vi.mock("../infra/outbound/identity.js", () => ({ diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index d59e2f6fe26..2164004cc32 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -1,8 +1,8 @@ +import { sendDurableMessageBatch } from "../channels/message/runtime.js"; import type { CliDeps } from "../cli/deps.types.js"; import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import type { OpenClawConfig } from "../config/types.js"; import { formatErrorMessage } from "../infra/errors.js"; -import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { getChildLogger } from "../logging.js"; @@ -94,7 +94,7 @@ async function deliverCronAnnouncePayload(params: { message: string; abortSignal: AbortSignal; }): Promise { - await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg: params.cfg, channel: params.delivery.resolvedTarget.channel, to: params.delivery.resolvedTarget.to, @@ -105,8 +105,11 @@ async function deliverCronAnnouncePayload(params: { identity: params.delivery.identity, bestEffort: false, deps: createOutboundSendDeps(params.deps), - abortSignal: params.abortSignal, + signal: params.abortSignal, }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } } export async function sendCronAnnouncePayloadStrict(params: { diff --git a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts index 7315d11c055..7664213bb52 100644 --- a/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts +++ b/src/cron/isolated-agent/delivery-dispatch.double-announce.test.ts @@ -15,12 +15,17 @@ import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js"; // --- Module mocks (must be hoisted before imports) --- -const { countActiveDescendantRunsMock, maybeApplyTtsToPayloadMock, retireSessionMcpRuntimeMock } = - vi.hoisted(() => ({ - countActiveDescendantRunsMock: vi.fn().mockReturnValue(0), - maybeApplyTtsToPayloadMock: vi.fn(async (params: { payload: unknown }) => params.payload), - retireSessionMcpRuntimeMock: vi.fn().mockResolvedValue(true), - })); +const { + countActiveDescendantRunsMock, + deliverOutboundPayloadsMock, + maybeApplyTtsToPayloadMock, + retireSessionMcpRuntimeMock, +} = vi.hoisted(() => ({ + countActiveDescendantRunsMock: vi.fn().mockReturnValue(0), + deliverOutboundPayloadsMock: vi.fn().mockResolvedValue([{ ok: true }]), + maybeApplyTtsToPayloadMock: vi.fn(async (params: { payload: unknown }) => params.payload), + retireSessionMcpRuntimeMock: vi.fn().mockResolvedValue(true), +})); vi.mock("../../config/sessions/main-session.js", () => ({ resolveAgentMainSessionKey: vi.fn(({ agentId }: { agentId: string }) => `agent:${agentId}:main`), @@ -40,7 +45,8 @@ vi.mock("./delivery-subagent-registry.runtime.js", () => ({ })); vi.mock("../../infra/outbound/deliver.js", () => ({ - deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]), + deliverOutboundPayloads: deliverOutboundPayloadsMock, + deliverOutboundPayloadsInternal: deliverOutboundPayloadsMock, })); vi.mock("../../infra/outbound/identity.js", () => ({ diff --git a/src/cron/isolated-agent/delivery-dispatch.ts b/src/cron/isolated-agent/delivery-dispatch.ts index e9cee927b3b..ad97c410559 100644 --- a/src/cron/isolated-agent/delivery-dispatch.ts +++ b/src/cron/isolated-agent/delivery-dispatch.ts @@ -556,8 +556,8 @@ export async function dispatchCronDelivery( const { buildOutboundSessionContext, createOutboundSendDeps, - deliverOutboundPayloads, resolveAgentOutboundIdentity, + sendDurableMessageBatch, } = await loadDeliveryOutboundRuntime(); const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId); const deliveryIdempotencyKey = buildDirectCronDeliveryIdempotencyKey({ @@ -660,8 +660,8 @@ export async function dispatchCronDelivery( } : undefined; - const runDelivery = async () => - await deliverOutboundPayloads({ + const runDelivery = async () => { + const send = await sendDurableMessageBatch({ cfg: params.cfgWithAgentDefaults, channel: delivery.channel, to: delivery.to, @@ -671,8 +671,9 @@ export async function dispatchCronDelivery( session: deliverySession, identity, bestEffort: params.deliveryBestEffort, + durability: params.deliveryBestEffort ? "best_effort" : "required", deps: createOutboundSendDeps(params.deps), - abortSignal: params.abortSignal, + signal: params.abortSignal, onError, // Isolated cron direct delivery uses its own transient retry loop. // Keep all attempts out of the write-ahead delivery queue so a @@ -681,6 +682,17 @@ export async function dispatchCronDelivery( // See: https://github.com/openclaw/openclaw/issues/40545 skipQueue: true, }); + if ( + send.status === "failed" || + (!params.deliveryBestEffort && send.status === "partial_failed") + ) { + throw send.error; + } + if (send.status === "partial_failed") { + hadPartialFailure = true; + } + return send.status === "sent" || send.status === "partial_failed" ? send.results : []; + }; const deliveryResults = options?.retryTransient ? await retryTransientDirectCronDelivery({ jobId: params.job.id, diff --git a/src/cron/isolated-agent/delivery-outbound.runtime.ts b/src/cron/isolated-agent/delivery-outbound.runtime.ts index fe8244b13aa..3ef659decdf 100644 --- a/src/cron/isolated-agent/delivery-outbound.runtime.ts +++ b/src/cron/isolated-agent/delivery-outbound.runtime.ts @@ -1,8 +1,6 @@ export { createOutboundSendDeps } from "../../cli/outbound-send-deps.js"; -export { - deliverOutboundPayloads, - type OutboundDeliveryResult, -} from "../../infra/outbound/deliver.js"; +export { sendDurableMessageBatch } from "../../channels/message/runtime.js"; +export { type OutboundDeliveryResult } from "../../infra/outbound/deliver.js"; export { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js"; export { buildOutboundSessionContext } from "../../infra/outbound/session-context.js"; export { enqueueSystemEvent } from "../../infra/system-events.js"; diff --git a/src/gateway/server-methods/send.test.ts b/src/gateway/server-methods/send.test.ts index 3073f6a4e21..7ed72cc4df7 100644 --- a/src/gateway/server-methods/send.test.ts +++ b/src/gateway/server-methods/send.test.ts @@ -99,6 +99,7 @@ vi.mock("../../infra/outbound/channel-selection.js", () => ({ vi.mock("../../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, })); vi.mock("../../config/sessions.js", async () => { diff --git a/src/gateway/server-methods/send.ts b/src/gateway/server-methods/send.ts index 1cd4e1f75c8..196a3877feb 100644 --- a/src/gateway/server-methods/send.ts +++ b/src/gateway/server-methods/send.ts @@ -1,4 +1,5 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js"; +import { sendDurableMessageBatch } from "../../channels/message/runtime.js"; import { normalizeChannelId } from "../../channels/plugins/index.js"; import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js"; import { createOutboundSendDeps } from "../../cli/deps.js"; @@ -6,7 +7,6 @@ import { applyPluginAutoEnable } from "../../config/plugin-auto-enable.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { resolveOutboundChannelPlugin } from "../../infra/outbound/channel-resolution.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; -import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js"; import { ensureOutboundSessionEntry, resolveOutboundSessionRoute, @@ -538,7 +538,7 @@ export const sendHandlers: GatewayRequestHandlers = { sessionKey: outboundSessionKey, conversationType: outboundRoute?.chatType, }); - const results = await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg, channel: outboundChannel, to: deliveryTarget, @@ -560,6 +560,10 @@ export const sendHandlers: GatewayRequestHandlers = { } : undefined, }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } + const results = send.status === "sent" ? send.results : []; const result = results.at(-1); if (!result) { diff --git a/src/gateway/server-node-events.runtime.ts b/src/gateway/server-node-events.runtime.ts index a602e0ac489..1c2f221ef2f 100644 --- a/src/gateway/server-node-events.runtime.ts +++ b/src/gateway/server-node-events.runtime.ts @@ -1,13 +1,13 @@ export { resolveSessionAgentId } from "../agents/agent-scope.js"; export { sanitizeInboundSystemTags } from "../auto-reply/reply/inbound-text.js"; export { normalizeChannelId } from "../channels/plugins/index.js"; +export { sendDurableMessageBatch } from "../channels/message/runtime.js"; export { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; export { agentCommandFromIngress } from "../commands/agent.js"; export { getRuntimeConfig } from "../config/io.js"; export { updateSessionStore } from "../config/sessions.js"; export { loadOrCreateDeviceIdentity } from "../infra/device-identity.js"; export { requestHeartbeat } from "../infra/heartbeat-wake.js"; -export { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; export { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; export { resolveOutboundTarget } from "../infra/outbound/targets.js"; export { registerApnsRegistration } from "../infra/push-apns.js"; diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 499feffa02d..e44ce6b3565 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -19,7 +19,6 @@ import { createOutboundSendDeps, defaultRuntime, deleteMediaBuffer, - deliverOutboundPayloads, enqueueSystemEvent, formatForLog, getRuntimeConfig, @@ -39,6 +38,7 @@ import { resolveSessionModelRef, sanitizeInboundSystemTags, scopedHeartbeatWakeOptions, + sendDurableMessageBatch, updateSessionStore, } from "./server-node-events.runtime.js"; @@ -345,15 +345,19 @@ async function sendReceiptAck(params: { cfg: params.cfg, sessionKey: params.sessionKey, }); - await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg: params.cfg, channel: params.channel, to: resolved.to, payloads: [{ text: params.text }], session, bestEffort: true, + durability: "best_effort", deps: createOutboundSendDeps(params.deps), }); + if (send.status === "failed") { + throw send.error; + } } export const handleNodeEvent = async ( diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index cb6d887e5f7..f4adaddaac9 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -110,6 +110,16 @@ const mocks = vi.hoisted(() => { skippedMaxRetries: 0, deferredBackoff: 0, })), + resolveAgentConfig: vi.fn(() => undefined), + resolveAgentWorkspaceDir: vi.fn(() => "/tmp/openclaw-test-workspace"), + resolveDefaultAgentId: vi.fn(() => "main"), + normalizeSessionDeliveryFields: vi.fn((source?: Record) => ({ + deliveryContext: source?.deliveryContext, + lastChannel: source?.lastChannel ?? source?.channel, + lastTo: source?.lastTo, + lastAccountId: source?.lastAccountId, + lastThreadId: source?.lastThreadId, + })), injectTimestamp: vi.fn((message: string) => `stamped:${message}`), timestampOptsFromConfig: vi.fn(() => ({})), recordInboundSessionAndDispatchReply: vi.fn( @@ -124,9 +134,18 @@ const mocks = vi.hoisted(() => { vi.unmock("./server-restart-sentinel.js"); vi.resetModules(); -vi.mock("../agents/agent-scope.js", () => ({ - resolveSessionAgentId: mocks.resolveSessionAgentId, -})); +vi.mock("../agents/agent-scope.js", async () => { + const actual = await vi.importActual( + "../agents/agent-scope.js", + ); + return { + ...actual, + resolveAgentConfig: mocks.resolveAgentConfig, + resolveAgentWorkspaceDir: mocks.resolveAgentWorkspaceDir, + resolveDefaultAgentId: mocks.resolveDefaultAgentId, + resolveSessionAgentId: mocks.resolveSessionAgentId, + }; +}); vi.mock("../infra/restart-sentinel.js", () => ({ readRestartSentinel: mocks.readRestartSentinel, @@ -147,6 +166,7 @@ vi.mock("../config/sessions.js", () => ({ })); vi.mock("../config/sessions/thread-info.js", () => ({ + parseSessionThreadInfoFast: mocks.parseSessionThreadInfo, parseSessionThreadInfo: mocks.parseSessionThreadInfo, })); @@ -157,6 +177,7 @@ vi.mock("./session-utils.js", () => ({ vi.mock("../utils/delivery-context.shared.js", () => ({ deliveryContextFromSession: mocks.deliveryContextFromSession, mergeDeliveryContext: mocks.mergeDeliveryContext, + normalizeSessionDeliveryFields: mocks.normalizeSessionDeliveryFields, })); vi.mock("../channels/plugins/index.js", async () => { @@ -176,12 +197,34 @@ vi.mock("../channels/plugins/index.js", async () => { }; }); +vi.mock("../channels/turn/kernel.js", () => ({ + dispatchAssembledChannelTurn: async (params: { + delivery: { + preparePayload?: (payload: { text?: string; replyToId?: string | null }) => { + text?: string; + replyToId?: string | null; + }; + deliver: (payload: { text?: string; replyToId?: string | null }) => Promise; + onError?: (err: unknown, info: { kind: string }) => void; + }; + }) => { + await mocks.recordInboundSessionAndDispatchReply({ + ...params, + deliver: async (payload: { text?: string; replyToId?: string | null }) => + params.delivery.deliver(params.delivery.preparePayload?.(payload) ?? payload), + onDispatchError: (err: unknown, info: { kind: string }) => + params.delivery.onError?.(err, info), + } as unknown as RecordInboundSessionAndDispatchReplyParams); + }, +})); + vi.mock("../infra/outbound/targets.js", () => ({ resolveOutboundTarget: mocks.resolveOutboundTarget, })); vi.mock("../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, })); vi.mock("../infra/outbound/delivery-queue.js", () => ({ @@ -213,6 +256,7 @@ vi.mock("../logging/subsystem.js", () => { info: mocks.logInfo, warn: mocks.logWarn, error: mocks.logError, + isEnabled: vi.fn(() => false), child: vi.fn(), }; logger.child.mockReturnValue(logger); diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index cd9a9da0c0c..8eb1f9b0a30 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -2,14 +2,15 @@ import { resolveSessionAgentId } from "../agents/agent-scope.js"; import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js"; import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js"; import type { ChatType } from "../channels/chat-type.js"; +import { sendDurableMessageBatch } from "../channels/message/runtime.js"; import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js"; import { recordInboundSession } from "../channels/session.js"; +import { dispatchAssembledChannelTurn } from "../channels/turn/kernel.js"; import type { CliDeps } from "../cli/deps.types.js"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; import { parseSessionThreadInfo } from "../config/sessions/thread-info.js"; import { formatErrorMessage } from "../infra/errors.js"; import { requestHeartbeat } from "../infra/heartbeat-wake.js"; -import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { ackDelivery, enqueueDelivery, failDelivery } from "../infra/outbound/delivery-queue.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; @@ -34,7 +35,6 @@ import { } from "../infra/session-delivery-queue.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { recordChannelMessageReplyDispatch } from "../plugin-sdk/channel-message.js"; import { stringifyRouteThreadId } from "../plugin-sdk/channel-route.js"; import type { OutboundReplyPayload } from "../plugin-sdk/reply-payload.js"; import { @@ -115,7 +115,7 @@ async function deliverRestartSentinelNotice(params: { }).catch(() => null); for (let attempt = 1; attempt <= OUTBOUND_MAX_ATTEMPTS; attempt += 1) { try { - const results = await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg: params.cfg, channel: params.channel, to: params.to, @@ -128,6 +128,10 @@ async function deliverRestartSentinelNotice(params: { bestEffort: false, skipQueue: true, }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } + const results = send.status === "sent" ? send.results : []; if (results.length > 0) { if (queueId) { await ackDelivery(queueId).catch(() => {}); @@ -272,73 +276,95 @@ async function deliverQueuedSessionDelivery(params: { config: cfg, }); let dispatchError: unknown; - await recordChannelMessageReplyDispatch({ + const ctxPayload = finalizeInboundContext( + { + Body: userMessage, + BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(cfg)), + BodyForCommands: "", + RawBody: userMessage, + CommandBody: "", + SessionKey: canonicalKey, + AccountId: route.accountId, + MessageSid: messageId, + Timestamp: Date.now(), + Provider: route.channel, + Surface: route.channel, + ChatType: route.chatType, + CommandAuthorized: false, + ReplyToId: route.replyToId, + OriginatingChannel: route.channel, + OriginatingTo: route.to, + ExplicitDeliverRoute: true, + MessageThreadId: route.threadId, + }, + { + forceBodyForCommands: true, + forceChatType: true, + }, + ); + await dispatchAssembledChannelTurn({ cfg, channel: route.channel, accountId: route.accountId, agentId, routeSessionKey: canonicalKey, storePath, - ctxPayload: finalizeInboundContext( - { - Body: userMessage, - BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(cfg)), - BodyForCommands: "", - RawBody: userMessage, - CommandBody: "", - SessionKey: canonicalKey, - AccountId: route.accountId, - MessageSid: messageId, - Timestamp: Date.now(), - Provider: route.channel, - Surface: route.channel, - ChatType: route.chatType, - CommandAuthorized: false, - ReplyToId: route.replyToId, - OriginatingChannel: route.channel, - OriginatingTo: route.to, - ExplicitDeliverRoute: true, - MessageThreadId: route.threadId, - }, - { - forceBodyForCommands: true, - forceChatType: true, - }, - ), + ctxPayload, recordInboundSession, dispatchReplyWithBufferedBlockDispatcher, - deliver: async (payload) => { - const outboundPayload = resolveRestartContinuationOutboundPayload({ - payload, - messageId, - replyToId: route.replyToId, - }); - const results = await deliverOutboundPayloads({ - cfg, - channel: route.channel, - to: route.to, - accountId: route.accountId, - replyToId: route.replyToId, - threadId: route.threadId, - payloads: [outboundPayload], - session: buildOutboundSessionContext({ - cfg, - sessionKey: canonicalKey, + delivery: { + preparePayload: (payload) => + resolveRestartContinuationOutboundPayload({ + payload, + messageId, + replyToId: route.replyToId, }), - deps: params.deps, - bestEffort: false, - }); - if (results.length === 0) { - throw new Error("restart continuation delivery returned no results"); - } + durable: (_payload, info) => + info.kind === "final" + ? { + to: route.to, + replyToId: route.replyToId, + threadId: route.threadId, + deps: params.deps, + } + : false, + deliver: async (payload) => { + const send = await sendDurableMessageBatch({ + cfg, + channel: route.channel, + to: route.to, + accountId: route.accountId, + replyToId: route.replyToId, + threadId: route.threadId, + payloads: [payload], + session: buildOutboundSessionContext({ + cfg, + sessionKey: canonicalKey, + }), + deps: params.deps, + bestEffort: false, + }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } + const results = send.status === "sent" ? send.results : []; + if (results.length === 0) { + throw new Error("restart continuation delivery returned no results"); + } + }, + onError: (err, info) => { + dispatchError ??= err; + log.warn(`restart continuation dispatch failed during ${info.kind}: ${String(err)}`, { + sessionKey: canonicalKey, + }); + }, }, - onRecordError: (err) => { - log.warn(`restart continuation failed to record inbound session metadata: ${String(err)}`, { - sessionKey: canonicalKey, - }); - }, - onDispatchError: (err) => { - dispatchError ??= err; + record: { + onRecordError: (err) => { + log.warn(`restart continuation failed to record inbound session metadata: ${String(err)}`, { + sessionKey: canonicalKey, + }); + }, }, }); if (dispatchError) { diff --git a/src/gateway/server-runtime-services.test.ts b/src/gateway/server-runtime-services.test.ts index 52fceb8d9df..9e5a6d8e7e9 100644 --- a/src/gateway/server-runtime-services.test.ts +++ b/src/gateway/server-runtime-services.test.ts @@ -30,6 +30,7 @@ vi.mock("../infra/env.js", () => ({ vi.mock("../infra/outbound/deliver.js", () => ({ deliverOutboundPayloads: hoisted.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: hoisted.deliverOutboundPayloads, })); vi.mock("../infra/outbound/delivery-queue.js", () => ({ diff --git a/src/gateway/server-runtime-services.ts b/src/gateway/server-runtime-services.ts index d5d0847e28f..752fe49fd59 100644 --- a/src/gateway/server-runtime-services.ts +++ b/src/gateway/server-runtime-services.ts @@ -159,10 +159,10 @@ function recoverPendingOutboundDeliveries(params: { }): void { void (async () => { const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js"); - const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js"); + const { deliverOutboundPayloadsInternal } = await import("../infra/outbound/deliver.js"); const logRecovery = params.log.child("delivery-recovery"); await recoverPendingDeliveries({ - deliver: deliverOutboundPayloads, + deliver: deliverOutboundPayloadsInternal, log: logRecovery, cfg: params.cfg, }); diff --git a/src/infra/exec-approval-forwarder.runtime.ts b/src/infra/exec-approval-forwarder.runtime.ts index 16457f5fb81..0efdf8bb159 100644 --- a/src/infra/exec-approval-forwarder.runtime.ts +++ b/src/infra/exec-approval-forwarder.runtime.ts @@ -1,2 +1,2 @@ export { resolveExecApprovalSessionTarget } from "./exec-approval-session-target.js"; -export { deliverOutboundPayloads } from "./outbound/deliver.js"; +export { sendDurableMessageBatch } from "../channels/message/runtime.js"; diff --git a/src/infra/exec-approval-forwarder.ts b/src/infra/exec-approval-forwarder.ts index bad4a857560..4c0ce35a025 100644 --- a/src/infra/exec-approval-forwarder.ts +++ b/src/infra/exec-approval-forwarder.ts @@ -44,7 +44,8 @@ import { } from "./plugin-approvals.js"; const log = createSubsystemLogger("gateway/exec-approvals"); -type DeliverOutboundPayloads = typeof import("./outbound/deliver.js").deliverOutboundPayloads; +type DeliverApprovalPayloads = + typeof import("../channels/message/runtime.js").sendDurableMessageBatch; type MaybePromise = T | Promise; type ResolveSessionTargetFn = (params: { cfg: OpenClawConfig; @@ -130,7 +131,7 @@ export type ExecApprovalForwarder = { type ExecApprovalForwarderDeps = { getConfig?: () => OpenClawConfig; - deliver?: DeliverOutboundPayloads; + deliver?: DeliverApprovalPayloads; nowMs?: () => number; resolveSessionTarget?: ResolveSessionTargetFn; }; @@ -360,7 +361,7 @@ async function deliverToTargets(params: { cfg: OpenClawConfig; targets: ForwardTarget[]; buildPayload: (target: ForwardTarget) => ReplyPayload; - deliver: DeliverOutboundPayloads; + deliver: DeliverApprovalPayloads; beforeDeliver?: (target: ForwardTarget, payload: ReplyPayload) => Promise | void; shouldSend?: () => boolean; }) { @@ -375,7 +376,7 @@ async function deliverToTargets(params: { try { const payload = params.buildPayload(target); await params.beforeDeliver?.(target, payload); - await params.deliver({ + const send = await params.deliver({ cfg: params.cfg, channel, to: target.to, @@ -383,6 +384,9 @@ async function deliverToTargets(params: { threadId: target.threadId, payloads: [payload], }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } } catch (err) { log.error(`exec approvals: failed to deliver to ${channel}:${target.to}: ${String(err)}`); } @@ -529,7 +533,7 @@ function createApprovalHandlers< >(params: { strategy: ApprovalStrategy; getConfig: () => OpenClawConfig; - deliver: DeliverOutboundPayloads; + deliver: DeliverApprovalPayloads; nowMs: () => number; resolveSessionTarget: ResolveSessionTargetFn; }) { @@ -772,8 +776,8 @@ export function createExecApprovalForwarder( const deliver = deps.deliver ?? (async (params) => { - const { deliverOutboundPayloads } = await loadExecApprovalForwarderRuntime(); - return deliverOutboundPayloads(params); + const { sendDurableMessageBatch } = await loadExecApprovalForwarderRuntime(); + return sendDurableMessageBatch(params); }); const nowMs = deps.nowMs ?? Date.now; const resolveSessionTarget = deps.resolveSessionTarget ?? defaultResolveSessionTarget; diff --git a/src/infra/heartbeat-runner.isolated-key-stability.test.ts b/src/infra/heartbeat-runner.isolated-key-stability.test.ts index 2c30097d512..cbe6bc09cfa 100644 --- a/src/infra/heartbeat-runner.isolated-key-stability.test.ts +++ b/src/infra/heartbeat-runner.isolated-key-stability.test.ts @@ -12,7 +12,8 @@ import { } from "./system-events.js"; vi.mock("./outbound/deliver.js", () => ({ - deliverOutboundPayloads: vi.fn().mockResolvedValue(undefined), + deliverOutboundPayloads: vi.fn().mockResolvedValue([]), + deliverOutboundPayloadsInternal: vi.fn().mockResolvedValue([]), })); afterEach(() => { diff --git a/src/infra/heartbeat-runner.model-override.test.ts b/src/infra/heartbeat-runner.model-override.test.ts index 8887e2c81b9..5416ecafef1 100644 --- a/src/infra/heartbeat-runner.model-override.test.ts +++ b/src/infra/heartbeat-runner.model-override.test.ts @@ -9,7 +9,8 @@ import { } from "./heartbeat-runner.test-utils.js"; vi.mock("./outbound/deliver.js", () => ({ - deliverOutboundPayloads: vi.fn().mockResolvedValue(undefined), + deliverOutboundPayloads: vi.fn().mockResolvedValue([]), + deliverOutboundPayloadsInternal: vi.fn().mockResolvedValue([]), })); type SeedSessionInput = { diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index a82138c1f50..3bf1e208b93 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -38,6 +38,7 @@ import { resolveDefaultModel } from "../auto-reply/reply/directive-handling.defa import { resolveResponsePrefixTemplate } from "../auto-reply/reply/response-prefix-template.js"; import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js"; import type { ReplyPayload } from "../auto-reply/types.js"; +import { sendDurableMessageBatch } from "../channels/message/runtime.js"; import { getChannelPlugin } from "../channels/plugins/index.js"; import type { ChannelHeartbeatDeps, @@ -129,7 +130,6 @@ import { setHeartbeatWakeHandler, } from "./heartbeat-wake.js"; import type { OutboundSendDeps } from "./outbound/deliver.js"; -import { deliverOutboundPayloads } from "./outbound/deliver.js"; import { buildOutboundSessionContext } from "./outbound/session-context.js"; import { resolveHeartbeatDeliveryTarget, @@ -1594,7 +1594,7 @@ export async function runHeartbeatOnce(opts: { return false; } } - await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg, channel: delivery.channel, to: delivery.to, @@ -1604,6 +1604,9 @@ export async function runHeartbeatOnce(opts: { session: outboundSession, deps: opts.deps, }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } return true; }; @@ -1863,7 +1866,7 @@ export async function runHeartbeatOnce(opts: { } } - await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg, channel: delivery.channel, to: delivery.to, @@ -1883,6 +1886,9 @@ export async function runHeartbeatOnce(opts: { ], deps: opts.deps, }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } await markCommitmentsStatus({ cfg, ids: dueCommitmentIds, diff --git a/src/infra/outbound/deliver-runtime.ts b/src/infra/outbound/deliver-runtime.ts index a3f51a0272a..30b9feaef56 100644 --- a/src/infra/outbound/deliver-runtime.ts +++ b/src/infra/outbound/deliver-runtime.ts @@ -1 +1,3 @@ +/** @deprecated Use `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)`. */ export { deliverOutboundPayloads } from "./deliver.js"; +export { deliverOutboundPayloadsInternal } from "./deliver.js"; diff --git a/src/infra/outbound/deliver-types.ts b/src/infra/outbound/deliver-types.ts index f3d8efaba86..8697f8e4c16 100644 --- a/src/infra/outbound/deliver-types.ts +++ b/src/infra/outbound/deliver-types.ts @@ -15,3 +15,62 @@ export type OutboundDeliveryResult = { // Channel docking: stash channel-specific fields here to avoid core type churn. meta?: Record; }; + +export type OutboundPayloadDeliverySuppressionReason = + | "cancelled_by_message_sending_hook" + | "empty_after_message_sending_hook" + | "no_visible_payload" + | "adapter_returned_no_identity"; + +export type OutboundDeliveryFailureStage = "platform_send" | "queue" | "unknown"; + +export type OutboundPayloadDeliveryOutcome = + | { + index: number; + status: "sent"; + results: OutboundDeliveryResult[]; + } + | { + index: number; + status: "suppressed"; + reason: OutboundPayloadDeliverySuppressionReason; + hookEffect?: { + cancelReason?: string; + metadata?: Record; + }; + } + | { + index: number; + status: "failed"; + error: unknown; + sentBeforeError: boolean; + stage: OutboundDeliveryFailureStage; + }; + +export class OutboundDeliveryError extends Error { + readonly results: OutboundDeliveryResult[]; + readonly payloadOutcomes: OutboundPayloadDeliveryOutcome[]; + readonly sentBeforeError: boolean; + readonly stage: OutboundDeliveryFailureStage; + + constructor( + message: string, + options: { + cause: unknown; + results?: readonly OutboundDeliveryResult[]; + payloadOutcomes?: readonly OutboundPayloadDeliveryOutcome[]; + stage?: OutboundDeliveryFailureStage; + }, + ) { + super(message, { cause: options.cause }); + this.name = "OutboundDeliveryError"; + this.results = [...(options.results ?? [])]; + this.payloadOutcomes = [...(options.payloadOutcomes ?? [])]; + this.sentBeforeError = this.results.length > 0; + this.stage = options.stage ?? "unknown"; + } +} + +export function isOutboundDeliveryError(error: unknown): error is OutboundDeliveryError { + return error instanceof OutboundDeliveryError; +} diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index bcd578c8d21..068ee9bf1ce 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -1447,6 +1447,38 @@ describe("deliverOutboundPayloads", () => { expect(sendText).not.toHaveBeenCalled(); }); + it("keeps payload outcome indexes tied to original input payload positions", async () => { + const sendMatrix = vi.fn().mockResolvedValue({ + messageId: "visible", + roomId: "!room:example", + }); + const payloadOutcomes: unknown[] = []; + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room:example", + payloads: [{ text: "NO_REPLY" }, { text: "visible reply" }], + deps: { matrix: sendMatrix }, + onPayloadDeliveryOutcome: (outcome) => { + payloadOutcomes.push(outcome); + }, + }); + + expect(results).toEqual([ + expect.objectContaining({ + channel: "matrix", + messageId: "visible", + }), + ]); + expect(payloadOutcomes).toEqual([ + expect.objectContaining({ + index: 1, + status: "sent", + }), + ]); + }); + it("strips internal runtime scaffolding added by message_sending hooks before delivery", async () => { hookMocks.runner.hasHooks.mockImplementation( (hookName?: string) => hookName === "message_sending", diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 5b394479eb9..5dbb736367d 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -42,7 +42,13 @@ import { emitDiagnosticEvent, type DiagnosticMessageDeliveryKind } from "../diag import { formatErrorMessage } from "../errors.js"; import { throwIfAborted } from "./abort.js"; import { resolveOutboundChannelMessageAdapter } from "./channel-resolution.js"; -import type { OutboundDeliveryResult } from "./deliver-types.js"; +import { + OutboundDeliveryError, + type OutboundDeliveryFailureStage, + type OutboundDeliveryResult, + type OutboundPayloadDeliveryOutcome, + type OutboundPayloadDeliverySuppressionReason, +} from "./deliver-types.js"; import { attachOutboundDeliveryCommitHook, runOutboundDeliveryCommitHooks, @@ -67,7 +73,6 @@ import { import type { DeliveryMirror } from "./mirror.js"; import { createOutboundPayloadPlan, - projectOutboundPayloadPlanForDelivery, summarizeOutboundPayloadForTransport, type NormalizedOutboundPayload, type OutboundPayloadPlan, @@ -562,6 +567,11 @@ function createChannelOutboundContextBase( const isAbortError = (err: unknown): boolean => err instanceof Error && err.name === "AbortError"; +const isDeliveryAbortError = (err: unknown): boolean => + isAbortError(err) || + (err instanceof OutboundDeliveryError && + isAbortError((err as Error & { cause?: unknown }).cause)); + async function markQueuedPlatformSendAttemptStarted(params: { queueId: string; queuePolicy: OutboundDeliveryQueuePolicy; @@ -615,6 +625,7 @@ type DeliverOutboundPayloadsCoreParams = { bestEffort?: boolean; onError?: (err: unknown, payload: NormalizedOutboundPayload) => void; onPayload?: (payload: NormalizedOutboundPayload) => void; + onPayloadDeliveryOutcome?: (outcome: OutboundPayloadDeliveryOutcome) => void; /** Session/agent context used for hooks and media local-root scoping. */ session?: OutboundSessionContext; mirror?: DeliveryMirror; @@ -630,6 +641,13 @@ function collectPayloadMediaSources(plan: readonly OutboundPayloadPlan[]): strin return plan.flatMap((entry) => entry.parts.mediaUrls); } +/** + * @deprecated Direct outbound delivery is compatibility/runtime substrate. + * New message lifecycle code should use `sendDurableMessageBatch` from + * `src/channels/message/send.ts` or `deliverInboundReplyWithMessageSendContext` + * from `src/channels/turn/durable-delivery.ts`. Keep direct use only for + * outbound substrate, recovery, and compatibility paths. + */ export type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & { /** @internal Skip write-ahead queue (used by crash-recovery to avoid re-enqueueing). */ skipQueue?: boolean; @@ -730,13 +748,18 @@ function normalizeEmptyPayloadForDelivery(payload: ReplyPayload): ReplyPayload | return payload; } +type NormalizedPayloadForChannelDelivery = { + index: number; + payload: ReplyPayload; +}; + function normalizePayloadsForChannelDelivery( plan: readonly OutboundPayloadPlan[], handler: ChannelHandler, -): ReplyPayload[] { - const normalizedPayloads: ReplyPayload[] = []; - for (const payload of projectOutboundPayloadPlanForDelivery(plan)) { - let sanitizedPayload = stripInternalRuntimeScaffoldingFromPayload(payload); +): NormalizedPayloadForChannelDelivery[] { + const normalizedPayloads: NormalizedPayloadForChannelDelivery[] = []; + for (const entry of plan) { + let sanitizedPayload = stripInternalRuntimeScaffoldingFromPayload(entry.payload); if (handler.sanitizeText && sanitizedPayload.text) { if (!handler.shouldSkipPlainTextSanitization?.(sanitizedPayload)) { sanitizedPayload = { @@ -754,7 +777,7 @@ function normalizePayloadsForChannelDelivery( ) : null; if (normalized) { - normalizedPayloads.push(normalized); + normalizedPayloads.push({ index: entry.sourceIndex, payload: normalized }); } } return normalizedPayloads; @@ -1002,12 +1025,16 @@ async function applyMessageSendingHook(params: { threadId?: string | number | null; }): Promise<{ cancelled: boolean; + cancelReason?: string; + hookMetadata?: Record; + contentRewritten: boolean; payload: ReplyPayload; payloadSummary: NormalizedOutboundPayload; }> { if (!params.enabled) { return { cancelled: false, + contentRewritten: false, payload: params.payload, payloadSummary: params.payloadSummary, }; @@ -1034,6 +1061,9 @@ async function applyMessageSendingHook(params: { if (sendingResult?.cancel) { return { cancelled: true, + ...(sendingResult.cancelReason ? { cancelReason: sendingResult.cancelReason } : {}), + ...(sendingResult.metadata ? { hookMetadata: sendingResult.metadata } : {}), + contentRewritten: false, payload: params.payload, payloadSummary: params.payloadSummary, }; @@ -1041,6 +1071,7 @@ async function applyMessageSendingHook(params: { if (sendingResult?.content == null) { return { cancelled: false, + contentRewritten: false, payload: params.payload, payloadSummary: params.payloadSummary, }; @@ -1049,6 +1080,7 @@ async function applyMessageSendingHook(params: { const spokenText = sendingResult.content; return { cancelled: false, + contentRewritten: true, payload: { ...params.payload, spokenText, @@ -1065,6 +1097,7 @@ async function applyMessageSendingHook(params: { }; return { cancelled: false, + contentRewritten: true, payload, payloadSummary: { ...params.payloadSummary, @@ -1075,14 +1108,61 @@ async function applyMessageSendingHook(params: { // Don't block delivery on hook failure. return { cancelled: false, + contentRewritten: false, payload: params.payload, payloadSummary: params.payloadSummary, }; } } +function toOutboundDeliveryError(params: { + error: unknown; + results: readonly OutboundDeliveryResult[]; + payloadOutcomes: readonly OutboundPayloadDeliveryOutcome[]; + stage: OutboundDeliveryFailureStage; +}): OutboundDeliveryError { + if (params.error instanceof OutboundDeliveryError) { + return params.error; + } + return new OutboundDeliveryError(formatErrorMessage(params.error), { + cause: params.error, + results: params.results, + payloadOutcomes: params.payloadOutcomes, + stage: params.stage, + }); +} + +function suppressedPayloadOutcome(params: { + index: number; + reason: OutboundPayloadDeliverySuppressionReason; + hookEffect?: { + cancelReason?: string; + metadata?: Record; + }; +}): OutboundPayloadDeliveryOutcome { + return { + index: params.index, + status: "suppressed", + reason: params.reason, + ...(params.hookEffect ? { hookEffect: params.hookEffect } : {}), + }; +} + +/** + * @deprecated Direct outbound delivery is compatibility/runtime substrate. + * New message lifecycle code should use `sendDurableMessageBatch` from + * `src/channels/message/send.ts` or `deliverInboundReplyWithMessageSendContext` + * from `src/channels/turn/durable-delivery.ts`. Keep direct use only for + * outbound substrate, recovery, and compatibility paths. + */ export async function deliverOutboundPayloads( params: DeliverOutboundPayloadsParams, +): Promise { + return await deliverOutboundPayloadsInternal(params); +} + +export async function deliverOutboundPayloadsInternal( + params: DeliverOutboundPayloadsParams, ): Promise { const { channel, to, payloads } = params; const queuePolicy = params.queuePolicy ?? "best_effort"; @@ -1220,7 +1300,7 @@ async function deliverOutboundPayloadsWithQueueCleanup( return results; } catch (err) { if (queueId) { - if (isAbortError(err)) { + if (isDeliveryAbortError(err)) { await ackDelivery(queueId).catch(() => {}); } else if (!platformResultsReturned) { await failDelivery(queueId, formatErrorMessage(err)).catch(() => {}); @@ -1323,6 +1403,16 @@ async function deliverOutboundPayloadsCore( } }; const normalizedPayloads = normalizePayloadsForChannelDelivery(outboundPayloadPlan, handler); + const payloadOutcomes: OutboundPayloadDeliveryOutcome[] = []; + const recordPayloadOutcome = (outcome: OutboundPayloadDeliveryOutcome): void => { + payloadOutcomes.push(outcome); + params.onPayloadDeliveryOutcome?.(outcome); + }; + if (normalizedPayloads.length === 0 && payloads.length > 0) { + payloads.forEach((_payload, index) => { + recordPayloadOutcome(suppressedPayloadOutcome({ index, reason: "no_visible_payload" })); + }); + } const hookRunner = getGlobalHookRunner(); const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key; const mirrorIsGroup = params.mirror?.isGroup; @@ -1348,7 +1438,7 @@ async function deliverOutboundPayloadsCore( }, ); } - for (const payload of normalizedPayloads) { + for (const { index: payloadIndex, payload } of normalizedPayloads) { let payloadSummary = buildPayloadSummary(payload); let deliveryKind: DiagnosticMessageDeliveryKind = "other"; let deliveryStartedAt = 0; @@ -1407,6 +1497,20 @@ async function deliverOutboundPayloadsCore( threadId: params.threadId, }); if (hookResult.cancelled) { + const hookEffect = + hookResult.cancelReason || hookResult.hookMetadata + ? { + ...(hookResult.cancelReason ? { cancelReason: hookResult.cancelReason } : {}), + ...(hookResult.hookMetadata ? { metadata: hookResult.hookMetadata } : {}), + } + : undefined; + recordPayloadOutcome( + suppressedPayloadOutcome({ + index: payloadIndex, + reason: "cancelled_by_message_sending_hook", + ...(hookEffect ? { hookEffect } : {}), + }), + ); continue; } const renderedPayload = stripInternalRuntimeScaffoldingFromPayload( @@ -1421,6 +1525,14 @@ async function deliverOutboundPayloadsCore( ) : null; if (!effectivePayload) { + recordPayloadOutcome( + suppressedPayloadOutcome({ + index: payloadIndex, + reason: hookResult.contentRewritten + ? "empty_after_message_sending_hook" + : "no_visible_payload", + }), + ); continue; } payloadSummary = buildPayloadSummary(effectivePayload); @@ -1458,9 +1570,16 @@ async function deliverOutboundPayloadsCore( ); if (!hasDeliveryResultIdentity(delivery)) { completeDeliveryDiagnostics(0); + recordPayloadOutcome( + suppressedPayloadOutcome({ + index: payloadIndex, + reason: "adapter_returned_no_identity", + }), + ); continue; } results.push(delivery); + recordPayloadOutcome({ index: payloadIndex, status: "sent", results: [delivery] }); await maybePinDeliveredMessage({ handler, payload: effectivePayload, @@ -1494,6 +1613,20 @@ async function deliverOutboundPayloadsCore( await sendTextChunks(payloadSummary.text, sendOverrides); } const deliveredResults = results.slice(beforeCount); + if (deliveredResults.length > 0) { + recordPayloadOutcome({ + index: payloadIndex, + status: "sent", + results: deliveredResults, + }); + } else { + recordPayloadOutcome( + suppressedPayloadOutcome({ + index: payloadIndex, + reason: "adapter_returned_no_identity", + }), + ); + } const messageId = results.at(-1)?.messageId; const pinMessageId = deliveredResults.find((entry) => entry.messageId)?.messageId; await maybePinDeliveredMessage({ @@ -1535,6 +1668,20 @@ async function deliverOutboundPayloadsCore( const beforeCount = results.length; await sendTextChunks(fallbackText, sendOverrides); const deliveredResults = results.slice(beforeCount); + if (deliveredResults.length > 0) { + recordPayloadOutcome({ + index: payloadIndex, + status: "sent", + results: deliveredResults, + }); + } else { + recordPayloadOutcome( + suppressedPayloadOutcome({ + index: payloadIndex, + reason: "adapter_returned_no_identity", + }), + ); + } const messageId = results.at(-1)?.messageId; const pinMessageId = deliveredResults.find((entry) => entry.messageId)?.messageId; await maybePinDeliveredMessage({ @@ -1591,6 +1738,21 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, results: results.slice(beforeCount), }); + const deliveredResults = results.slice(beforeCount); + if (deliveredResults.length > 0) { + recordPayloadOutcome({ + index: payloadIndex, + status: "sent", + results: deliveredResults, + }); + } else { + recordPayloadOutcome( + suppressedPayloadOutcome({ + index: payloadIndex, + reason: "adapter_returned_no_identity", + }), + ); + } completeDeliveryDiagnostics(results.length - beforeCount); emitMessageSent({ success: true, @@ -1598,6 +1760,13 @@ async function deliverOutboundPayloadsCore( messageId: lastMessageId, }); } catch (err) { + recordPayloadOutcome({ + index: payloadIndex, + status: "failed", + error: err, + sentBeforeError: results.length > 0, + stage: "platform_send", + }); errorDeliveryDiagnostics(err); emitMessageSent({ success: false, @@ -1605,7 +1774,12 @@ async function deliverOutboundPayloadsCore( error: formatErrorMessage(err), }); if (!params.bestEffort) { - throw err; + throw toOutboundDeliveryError({ + error: err, + results, + payloadOutcomes, + stage: "platform_send", + }); } params.onError?.(err, payloadSummary); } diff --git a/src/infra/outbound/message.test.ts b/src/infra/outbound/message.test.ts index e573b84f1c5..c440897d080 100644 --- a/src/infra/outbound/message.test.ts +++ b/src/infra/outbound/message.test.ts @@ -44,6 +44,7 @@ vi.mock("./targets.js", () => ({ vi.mock("./deliver.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, resolveOutboundDurableFinalDeliverySupport: mocks.resolveOutboundDurableFinalDeliverySupport, })); @@ -418,4 +419,49 @@ describe("sendMessage", () => { expect(mocks.resolveRuntimePluginRegistry).not.toHaveBeenCalled(); }); + + it("does not throw best-effort direct send failures", async () => { + mocks.deliverOutboundPayloads.mockImplementationOnce(async (params: unknown) => { + ( + params as { + onPayloadDeliveryOutcome?: (outcome: { + index: number; + payload: { text: string }; + status: "failed"; + error: Error; + stage: "send"; + }) => void; + } + ).onPayloadDeliveryOutcome?.({ + index: 0, + payload: { text: "hi" }, + status: "failed", + error: new Error("transport unavailable"), + stage: "send", + }); + return []; + }); + + await expect( + sendMessage({ + cfg: {}, + channel: "forum", + to: "123456", + content: "hi", + bestEffort: true, + }), + ).resolves.toMatchObject({ + channel: "forum", + to: "123456", + via: "direct", + result: undefined, + }); + + expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + bestEffort: true, + queuePolicy: "best_effort", + }), + ); + }); }); diff --git a/src/infra/outbound/message.ts b/src/infra/outbound/message.ts index cca21b01a5e..81a9cd5705d 100644 --- a/src/infra/outbound/message.ts +++ b/src/infra/outbound/message.ts @@ -1,5 +1,6 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; import { deriveDurableFinalDeliveryRequirements } from "../../channels/message/capabilities.js"; +import { sendDurableMessageBatch } from "../../channels/message/runtime.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import type { OutboundMediaAccess } from "../../media/load-options.js"; import type { PollInput } from "../../polls.js"; @@ -13,7 +14,6 @@ import { import { resolveOutboundChannelPlugin } from "./channel-resolution.js"; import { resolveMessageChannelSelection } from "./channel-selection.js"; import { - deliverOutboundPayloads, resolveOutboundDurableFinalDeliverySupport, type DurableFinalDeliveryRequirements, type OutboundDeliveryResult, @@ -379,7 +379,7 @@ export async function sendMessage(params: MessageSendParams): Promise; hasPresentation: boolean; @@ -130,6 +131,10 @@ type PreparedOutboundPayloadPlanEntry = { isSilent: boolean; }; +type IndexedPreparedOutboundPayloadPlanEntry = PreparedOutboundPayloadPlanEntry & { + sourceIndex: number; +}; + function createOutboundPayloadPlanEntry( payload: ReplyPayload, context: Pick = {}, @@ -195,15 +200,15 @@ export function createOutboundPayloadPlan( }); const hasPendingSpawnedChildren = context.hasPendingSpawnedChildren ?? resolvePendingSpawnedChildren(context.sessionKey); - const prepared: PreparedOutboundPayloadPlanEntry[] = []; - for (const payload of payloads) { + const prepared: IndexedPreparedOutboundPayloadPlanEntry[] = []; + for (const [sourceIndex, payload] of payloads.entries()) { const entry = createOutboundPayloadPlanEntry(payload, { extractMarkdownImages: context.extractMarkdownImages, }); if (!entry) { continue; } - prepared.push(entry); + prepared.push({ ...entry, sourceIndex }); } const hasVisibleNonSilentContent = prepared.some((entry) => { if (entry.isSilent) { @@ -219,6 +224,7 @@ export function createOutboundPayloadPlan( for (const entry of prepared) { if (!entry.isSilent) { plan.push({ + sourceIndex: entry.sourceIndex, payload: entry.payload, parts: resolveSendableOutboundReplyParts(entry.payload), hasPresentation: entry.hasPresentation, @@ -243,6 +249,7 @@ export function createOutboundPayloadPlan( continue; } plan.push({ + sourceIndex: entry.sourceIndex, payload: visibleSilentPayload, parts: resolveSendableOutboundReplyParts(visibleSilentPayload), hasPresentation: entry.hasPresentation, @@ -261,6 +268,7 @@ export function createOutboundPayloadPlan( continue; } plan.push({ + sourceIndex: entry.sourceIndex, payload: visibleSilentPayload, parts: resolveSendableOutboundReplyParts(visibleSilentPayload), hasPresentation: entry.hasPresentation, diff --git a/src/infra/session-maintenance-warning.test.ts b/src/infra/session-maintenance-warning.test.ts index ae2bcfc196f..19244655f73 100644 --- a/src/infra/session-maintenance-warning.test.ts +++ b/src/infra/session-maintenance-warning.test.ts @@ -15,6 +15,11 @@ const mocks = vi.hoisted(() => ({ enqueueSystemEvent: vi.fn(), })); +vi.mock("./outbound/deliver.js", () => ({ + deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, +})); + type SessionMaintenanceWarningModule = typeof import("./session-maintenance-warning.js"); let deliverSessionMaintenanceWarning: SessionMaintenanceWarningModule["deliverSessionMaintenanceWarning"]; diff --git a/src/infra/session-maintenance-warning.ts b/src/infra/session-maintenance-warning.ts index 8bbc8ddf264..3b24f455040 100644 --- a/src/infra/session-maintenance-warning.ts +++ b/src/infra/session-maintenance-warning.ts @@ -16,11 +16,11 @@ type WarningParams = { const warnedContexts = new Map(); const log = createSubsystemLogger("session-maintenance-warning"); -let deliverRuntimePromise: Promise | null = null; +let messageRuntimePromise: Promise | null = null; function resetSessionMaintenanceWarningForTests() { warnedContexts.clear(); - deliverRuntimePromise = null; + messageRuntimePromise = null; } export const __testing = { @@ -28,8 +28,8 @@ export const __testing = { } as const; function loadDeliverRuntime() { - deliverRuntimePromise ??= import("./outbound/deliver-runtime.js"); - return deliverRuntimePromise; + messageRuntimePromise ??= import("../channels/message/runtime.js"); + return messageRuntimePromise; } function shouldSendWarning(): boolean { @@ -126,12 +126,12 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P } try { - const { deliverOutboundPayloads } = await loadDeliverRuntime(); + const { sendDurableMessageBatch } = await loadDeliverRuntime(); const outboundSession = buildOutboundSessionContext({ cfg: params.cfg, sessionKey: params.sessionKey, }); - await deliverOutboundPayloads({ + const send = await sendDurableMessageBatch({ cfg: params.cfg, channel, to: target.to, @@ -140,6 +140,9 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P payloads: [{ text }], session: outboundSession, }); + if (send.status === "failed" || send.status === "partial_failed") { + throw send.error; + } } catch (err) { log.warn(`Failed to deliver session maintenance warning: ${String(err)}`); enqueueSystemEvent(text, { sessionKey: params.sessionKey }); diff --git a/src/media-understanding/echo-transcript.test.ts b/src/media-understanding/echo-transcript.test.ts index ca209ee44a5..782a8ee2743 100644 --- a/src/media-understanding/echo-transcript.test.ts +++ b/src/media-understanding/echo-transcript.test.ts @@ -6,6 +6,12 @@ const mockDeliverOutboundPayloads = vi.hoisted(() => vi.fn()); vi.mock("../infra/outbound/deliver-runtime.js", () => ({ deliverOutboundPayloads: (...args: unknown[]) => mockDeliverOutboundPayloads(...args), + deliverOutboundPayloadsInternal: (...args: unknown[]) => mockDeliverOutboundPayloads(...args), +})); + +vi.mock("../infra/outbound/deliver.js", () => ({ + deliverOutboundPayloads: (...args: unknown[]) => mockDeliverOutboundPayloads(...args), + deliverOutboundPayloadsInternal: (...args: unknown[]) => mockDeliverOutboundPayloads(...args), })); vi.mock("../utils/message-channel.js", () => ({ @@ -38,15 +44,18 @@ describe("sendTranscriptEcho", () => { }); expect(mockDeliverOutboundPayloads).toHaveBeenCalledOnce(); - expect(mockDeliverOutboundPayloads).toHaveBeenCalledWith({ - cfg: {}, - channel: "voicechat", - to: "+10000000001", - accountId: "acc1", - threadId: undefined, - payloads: [{ text: DEFAULT_ECHO_TRANSCRIPT_FORMAT.replace("{transcript}", "hello world") }], - bestEffort: true, - }); + expect(mockDeliverOutboundPayloads).toHaveBeenCalledWith( + expect.objectContaining({ + cfg: {}, + channel: "voicechat", + to: "+10000000001", + accountId: "acc1", + threadId: undefined, + payloads: [{ text: DEFAULT_ECHO_TRANSCRIPT_FORMAT.replace("{transcript}", "hello world") }], + bestEffort: true, + queuePolicy: "best_effort", + }), + ); }); it("uses a custom format when provided", async () => { diff --git a/src/media-understanding/echo-transcript.ts b/src/media-understanding/echo-transcript.ts index 7af132d7b11..11ae4f3d40c 100644 --- a/src/media-understanding/echo-transcript.ts +++ b/src/media-understanding/echo-transcript.ts @@ -4,12 +4,11 @@ import { logVerbose, shouldLogVerbose } from "../globals.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { isDeliverableMessageChannel } from "../utils/message-channel.js"; -let deliverRuntimePromise: Promise | null = - null; +let messageRuntimePromise: Promise | null = null; -function loadDeliverRuntime() { - deliverRuntimePromise ??= import("../infra/outbound/deliver-runtime.js"); - return deliverRuntimePromise; +function loadMessageRuntime() { + messageRuntimePromise ??= import("../channels/message/runtime.js"); + return messageRuntimePromise; } export const DEFAULT_ECHO_TRANSCRIPT_FORMAT = '📝 "{transcript}"'; @@ -52,8 +51,8 @@ export async function sendTranscriptEcho(params: { const text = formatEchoTranscript(transcript, params.format ?? DEFAULT_ECHO_TRANSCRIPT_FORMAT); try { - const { deliverOutboundPayloads } = await loadDeliverRuntime(); - await deliverOutboundPayloads({ + const { sendDurableMessageBatch } = await loadMessageRuntime(); + const send = await sendDurableMessageBatch({ cfg, channel: normalizedChannel, to, @@ -61,7 +60,11 @@ export async function sendTranscriptEcho(params: { threadId: ctx.MessageThreadId ?? undefined, payloads: [{ text }], bestEffort: true, + durability: "best_effort", }); + if (send.status === "failed") { + throw send.error; + } if (shouldLogVerbose()) { logVerbose(`media: echo-transcript sent to ${normalizedChannel}/${to}`); } diff --git a/src/plugin-sdk/channel-message-runtime.ts b/src/plugin-sdk/channel-message-runtime.ts index c39854866a9..4a2aa3f40b4 100644 --- a/src/plugin-sdk/channel-message-runtime.ts +++ b/src/plugin-sdk/channel-message-runtime.ts @@ -1,16 +1,20 @@ -export { - buildChannelMessageReplyDispatchBase, - dispatchChannelMessageReplyWithBase, - hasFinalChannelMessageReplyDispatch, - hasVisibleChannelMessageReplyDispatch, - recordChannelMessageReplyDispatch, - resolveChannelMessageReplyDispatchCounts, -} from "./inbound-reply-dispatch.js"; -export { - createChannelTurnReplyPipeline, - deliverDurableInboundReplyPayload, - deliverInboundReplyWithMessageSendContext, -} from "../channels/turn/kernel.js"; +/** @deprecated Compatibility helper for legacy reply dispatch bridges. */ +export { buildChannelMessageReplyDispatchBase } from "./inbound-reply-dispatch.js"; +/** @deprecated Compatibility reply-dispatch bridge. Use `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)`. */ +export { dispatchChannelMessageReplyWithBase } from "./inbound-reply-dispatch.js"; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ +export { hasFinalChannelMessageReplyDispatch } from "./inbound-reply-dispatch.js"; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ +export { hasVisibleChannelMessageReplyDispatch } from "./inbound-reply-dispatch.js"; +/** @deprecated Compatibility reply-dispatch bridge. Use `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)`. */ +export { recordChannelMessageReplyDispatch } from "./inbound-reply-dispatch.js"; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ +export { resolveChannelMessageReplyDispatchCounts } from "./inbound-reply-dispatch.js"; +/** @deprecated Compatibility assembly for legacy buffered reply dispatchers. */ +export { createChannelTurnReplyPipeline } from "../channels/turn/kernel.js"; +/** @deprecated Use `deliverInboundReplyWithMessageSendContext(...)`. */ +export { deliverDurableInboundReplyPayload } from "../channels/turn/kernel.js"; +export { deliverInboundReplyWithMessageSendContext } from "../channels/turn/kernel.js"; export type { DurableInboundReplyDeliveryOptions, DurableInboundReplyDeliveryParams, diff --git a/src/plugin-sdk/channel-message.ts b/src/plugin-sdk/channel-message.ts index b8a7caa3999..e9c91f5195c 100644 --- a/src/plugin-sdk/channel-message.ts +++ b/src/plugin-sdk/channel-message.ts @@ -138,17 +138,28 @@ export type { RenderedMessageBatchPlanKind, } from "../channels/message/index.js"; +export { + hasFinalChannelTurnDispatch, + hasVisibleChannelTurnDispatch, + resolveChannelTurnDispatchCounts, +}; + type ChannelTurnKernelModule = typeof import("../channels/turn/kernel.js"); type InboundReplyDispatchModule = typeof import("./inbound-reply-dispatch.js"); +/** @deprecated Use `createChannelMessageReplyPipeline(...)` for compatibility dispatchers. */ export function createChannelTurnReplyPipeline(params: CreateChannelReplyPipelineParams) { return createChannelReplyPipeline(params); } +/** @deprecated Compatibility helper for legacy reply dispatch results. */ export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts; +/** @deprecated Compatibility helper for legacy reply dispatch bridges. */ export const buildChannelMessageReplyDispatchBase: InboundReplyDispatchModule["buildChannelMessageReplyDispatchBase"] = ((params) => ({ cfg: params.cfg, @@ -163,12 +174,24 @@ export const buildChannelMessageReplyDispatchBase: InboundReplyDispatchModule["b params.core.channel.reply.dispatchReplyWithBufferedBlockDispatcher, })) as InboundReplyDispatchModule["buildChannelMessageReplyDispatchBase"]; +/** + * @deprecated Compatibility reply-dispatch bridge. New channel plugins should + * expose a `message` adapter and route sends through + * `deliverInboundReplyWithMessageSendContext(...)` or + * `sendDurableMessageBatch(...)`. + */ export const dispatchChannelMessageReplyWithBase: InboundReplyDispatchModule["dispatchChannelMessageReplyWithBase"] = async (...args) => { const mod = await import("./inbound-reply-dispatch.js"); return await mod.dispatchChannelMessageReplyWithBase(...args); }; +/** + * @deprecated Compatibility reply-dispatch bridge. New channel plugins should + * expose a `message` adapter and route sends through + * `deliverInboundReplyWithMessageSendContext(...)` or + * `sendDurableMessageBatch(...)`. + */ export const recordChannelMessageReplyDispatch: InboundReplyDispatchModule["recordChannelMessageReplyDispatch"] = async (...args) => { const mod = await import("./inbound-reply-dispatch.js"); diff --git a/src/plugin-sdk/channel-test-helpers.ts b/src/plugin-sdk/channel-test-helpers.ts index 83e361e3e46..b42360daefb 100644 --- a/src/plugin-sdk/channel-test-helpers.ts +++ b/src/plugin-sdk/channel-test-helpers.ts @@ -12,13 +12,14 @@ export { createEmptyPluginRegistry, createOutboundTestPlugin, createTestRegistry, - deliverOutboundPayloads, initializeGlobalHookRunner, releasePinnedPluginChannelRegistry, resetGlobalHookRunner, setActivePluginRegistry, type PluginHookRegistration, } from "./test-helpers/outbound-delivery.js"; +/** @deprecated Direct outbound delivery is runtime substrate; use channel message runtime helpers. */ +export { deliverOutboundPayloads } from "./test-helpers/outbound-delivery.js"; export { createPluginRuntimeMock } from "./test-helpers/plugin-runtime-mock.js"; export { createSendCfgThreadingRuntime, diff --git a/src/plugin-sdk/delivery-queue-runtime.test.ts b/src/plugin-sdk/delivery-queue-runtime.test.ts index 2c8dac0602d..531bb444fef 100644 --- a/src/plugin-sdk/delivery-queue-runtime.test.ts +++ b/src/plugin-sdk/delivery-queue-runtime.test.ts @@ -11,6 +11,7 @@ vi.mock("../infra/outbound/delivery-queue.js", () => ({ vi.mock("../infra/outbound/deliver-runtime.js", () => ({ deliverOutboundPayloads: mocks.deliverOutboundPayloads, + deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads, })); type DeliveryQueueRuntimeModule = typeof import("./delivery-queue-runtime.js"); diff --git a/src/plugin-sdk/delivery-queue-runtime.ts b/src/plugin-sdk/delivery-queue-runtime.ts index c9e5f65d850..fe3fdb04a0a 100644 --- a/src/plugin-sdk/delivery-queue-runtime.ts +++ b/src/plugin-sdk/delivery-queue-runtime.ts @@ -19,7 +19,8 @@ async function loadOutboundDeliverRuntime(): Promise { - const deliver = opts.deliver ?? (await loadOutboundDeliverRuntime()).deliverOutboundPayloads; + const deliver = + opts.deliver ?? (await loadOutboundDeliverRuntime()).deliverOutboundPayloadsInternal; await coreDrainPendingDeliveries({ ...opts, deliver, diff --git a/src/plugin-sdk/inbound-reply-dispatch.ts b/src/plugin-sdk/inbound-reply-dispatch.ts index cd17203600f..c763533a0bb 100644 --- a/src/plugin-sdk/inbound-reply-dispatch.ts +++ b/src/plugin-sdk/inbound-reply-dispatch.ts @@ -138,6 +138,11 @@ type RecordChannelMessageReplyDispatchParams = { /** * Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn. + * + * @deprecated Compatibility reply-dispatch bridge. New channel plugins should + * expose a `message` adapter via `defineChannelMessageAdapter(...)` and route + * sends through `deliverInboundReplyWithMessageSendContext(...)` or + * `sendDurableMessageBatch(...)`. */ export async function dispatchChannelMessageReplyWithBase( params: BuildInboundReplyDispatchBaseParams & @@ -171,7 +176,14 @@ export async function dispatchInboundReplyWithBase( await dispatchChannelMessageReplyWithBase(params); } -/** Record the inbound session first, then dispatch the reply using normalized outbound delivery. */ +/** + * Record the inbound session first, then dispatch the reply using normalized outbound delivery. + * + * @deprecated Compatibility reply-dispatch bridge. New channel plugins should + * expose a `message` adapter via `defineChannelMessageAdapter(...)` and route + * sends through `deliverInboundReplyWithMessageSendContext(...)` or + * `sendDurableMessageBatch(...)`. + */ export async function recordChannelMessageReplyDispatch( params: RecordChannelMessageReplyDispatchParams, ): Promise { @@ -246,7 +258,11 @@ export async function recordInboundSessionAndDispatchReply( await recordChannelMessageReplyDispatch(params); } +/** @deprecated Compatibility helper for legacy reply dispatch bridges. */ export const buildChannelMessageReplyDispatchBase = buildInboundReplyDispatchBase; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch; +/** @deprecated Compatibility helper for legacy reply dispatch results. */ export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts; diff --git a/src/plugin-sdk/outbound-runtime.ts b/src/plugin-sdk/outbound-runtime.ts index 35bdf67aa4f..7f2559d14f8 100644 --- a/src/plugin-sdk/outbound-runtime.ts +++ b/src/plugin-sdk/outbound-runtime.ts @@ -3,11 +3,21 @@ export { resolveOutboundSendDep, type OutboundSendDeps } from "../infra/outbound export { resolveAgentOutboundIdentity, type OutboundIdentity } from "../infra/outbound/identity.js"; export type { OutboundDeliveryFormattingOptions } from "../infra/outbound/formatting.js"; export { createReplyToFanout, type ReplyToResolution } from "../infra/outbound/reply-policy.js"; -export { - deliverOutboundPayloads, - type DeliverOutboundPayloadsParams, - type OutboundDeliveryResult, -} from "../infra/outbound/deliver.js"; +/** + * @deprecated Direct outbound delivery is compatibility/runtime substrate. New + * channel and plugin send paths should use + * `openclaw/plugin-sdk/channel-message-runtime` helpers: + * `sendDurableMessageBatch`, `withDurableMessageSendContext`, or + * `deliverInboundReplyWithMessageSendContext`. + */ +export { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; +/** + * @deprecated Direct outbound delivery params are compatibility/runtime + * substrate. New channel and plugin send paths should use + * `openclaw/plugin-sdk/channel-message-runtime` helpers. + */ +export type { DeliverOutboundPayloadsParams } from "../infra/outbound/deliver.js"; +export { type OutboundDeliveryResult } from "../infra/outbound/deliver.js"; export { sanitizeForPlainText } from "../infra/outbound/sanitize-text.js"; export { buildOutboundSessionContext, diff --git a/src/plugin-sdk/test-helpers/outbound-delivery.ts b/src/plugin-sdk/test-helpers/outbound-delivery.ts index 5ea9fa98825..5386ebd9450 100644 --- a/src/plugin-sdk/test-helpers/outbound-delivery.ts +++ b/src/plugin-sdk/test-helpers/outbound-delivery.ts @@ -3,10 +3,11 @@ export { createEmptyPluginRegistry, createOutboundTestPlugin, createTestRegistry, - deliverOutboundPayloads, initializeGlobalHookRunner, releasePinnedPluginChannelRegistry, resetGlobalHookRunner, setActivePluginRegistry, type PluginHookRegistration, } from "../testing.js"; +/** @deprecated Direct outbound delivery is runtime substrate; use channel message runtime helpers. */ +export { deliverOutboundPayloads } from "../testing.js"; diff --git a/src/plugin-sdk/testing.ts b/src/plugin-sdk/testing.ts index c79bcb1d6d1..734c8dbb4db 100644 --- a/src/plugin-sdk/testing.ts +++ b/src/plugin-sdk/testing.ts @@ -35,6 +35,7 @@ export type { ChannelGatewayContext } from "../channels/plugins/types.adapters.j export type { OpenClawConfig } from "../config/config.js"; export { isAtLeast, parseSemver } from "../infra/runtime-guard.js"; export { callGateway } from "../gateway/call.js"; +/** @deprecated Direct outbound delivery is runtime substrate; use channel message runtime helpers. */ export { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; export { createEmptyPluginRegistry, diff --git a/src/plugins/compat/registry.ts b/src/plugins/compat/registry.ts index 8680af64aea..4161a828b9e 100644 --- a/src/plugins/compat/registry.ts +++ b/src/plugins/compat/registry.ts @@ -758,7 +758,7 @@ export const PLUGIN_COMPAT_RECORDS = [ surfaces: ["api.runtime.config.loadConfig", "api.runtime.config.writeConfigFile"], diagnostics: [ "plugin runtime compatibility warning", - "deprecated internal config API guard", + "deprecated API usage guard", "runtime channel config boundary guard", ], tests: [ diff --git a/src/plugins/hook-message.types.ts b/src/plugins/hook-message.types.ts index f48734f8dd8..9a2d550f6b7 100644 --- a/src/plugins/hook-message.types.ts +++ b/src/plugins/hook-message.types.ts @@ -77,6 +77,8 @@ export type PluginHookMessageSendingEvent = { export type PluginHookMessageSendingResult = { content?: string; cancel?: boolean; + cancelReason?: string; + metadata?: Record; }; export type PluginHookMessageSentEvent = { diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index 2444f1eae92..a850dbfac69 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -1045,6 +1045,8 @@ export function createHookRunner( return { content: lastDefined(acc?.content, next.content), cancel: stickyTrue(acc?.cancel, next.cancel), + cancelReason: lastDefined(acc?.cancelReason, next.cancelReason), + metadata: next.metadata ?? acc?.metadata, }; }, shouldStop: (result) => result.cancel === true, diff --git a/src/plugins/wired-hooks-message.test.ts b/src/plugins/wired-hooks-message.test.ts index 4a5b5724d9e..f180b184eb7 100644 --- a/src/plugins/wired-hooks-message.test.ts +++ b/src/plugins/wired-hooks-message.test.ts @@ -47,8 +47,8 @@ describe("message_sending hook runner", () => { { name: "runMessageSending can cancel message delivery", event: { to: "user-123", content: "blocked" }, - hookResult: { cancel: true }, - expected: { cancel: true }, + hookResult: { cancel: true, cancelReason: "policy", metadata: { owner: "agent-2" } }, + expected: { cancel: true, cancelReason: "policy", metadata: { owner: "agent-2" } }, }, ] as const)("$name", async ({ event, hookResult, expected }) => { await expectMessageHookCall({