refactor: consolidate message delivery API

This commit is contained in:
Peter Steinberger
2026-05-09 07:02:33 +01:00
parent 195db88d71
commit a4b17d65a8
74 changed files with 1561 additions and 340 deletions

View File

@@ -30,6 +30,7 @@ Docs: https://docs.openclaw.ai
- Plugins/doctor: avoid full-array sorting while selecting ClawHub search/archive results and bounded dreaming doctor entries. Thanks @shakkernerd.
- Agents/compaction: keep contributor diagnostics to a bounded top-three selection without sorting the full history. Thanks @shakkernerd.
- Sessions/UI: avoid full-array sorting while selecting ACPX leases, Google Meet calendar events, and latest chat sessions. Thanks @shakkernerd.
- Plugin SDK: mark direct `deliverOutboundPayloads` and legacy reply-dispatch bridges as deprecated compatibility substrate, enrich `sendDurableMessageBatch` with explicit durable send outcomes, migrate bundled send/turn paths off deprecated APIs, and enforce the split with `check:deprecated-api-usage`.
- Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus.
- Telegram/streaming: continue over-limit draft previews in a new message instead of stopping when rendered preview text crosses Telegram's message limit. (#74508) Thanks @anagnorisis2peripeteia.
- Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip.

View File

@@ -1,2 +1,2 @@
9f7ea91407a66fee6bcdebaf64bd15d0a6ae8b48cf100753c96f2ad29ad86390 plugin-sdk-api-baseline.json
4d516f6ac681cf55e916f712601abb8f5a7ddcd92a7710e8947f89c38e4054e7 plugin-sdk-api-baseline.jsonl
779d6cd6c35fb4685cb5a31c40748ab71184b89632982920d2aeee9d703cdb3a plugin-sdk-api-baseline.json
ecb5ea5dc7b4573c61070b2cbdfdcbc3436032bbe39c3045dd8d5213d18ec858 plugin-sdk-api-baseline.jsonl

View File

@@ -369,6 +369,12 @@ Decision rules:
- `message_sending` with `cancel: false` is treated as no decision.
- Rewritten `content` continues to lower-priority hooks unless a later hook
cancels delivery.
- `message_sending` can return `cancelReason` and bounded `metadata` with a
cancellation. New message lifecycle APIs expose this as a suppressed delivery
outcome with reason `cancelled_by_message_sending_hook`; legacy direct
delivery keeps returning an empty result array for compatibility.
- `message_sent` is observation-only. Handler failures are logged and do not
change the delivery result.
## Install hooks

View File

@@ -31,6 +31,36 @@ Runtime delivery helpers are available from
`openclaw/plugin-sdk/channel-message-runtime` for monitor/send code paths that
are already doing asynchronous message I/O.
New channel and plugin send code should use the message lifecycle helpers from
`openclaw/plugin-sdk/channel-message-runtime`: `sendDurableMessageBatch`,
`withDurableMessageSendContext`, or `deliverInboundReplyWithMessageSendContext`.
The older
`deliverOutboundPayloads(...)` helper in `openclaw/plugin-sdk/outbound-runtime`
is deprecated compatibility/runtime substrate for outbound internals, recovery,
and legacy adapters. Do not use it for new channel or plugin send paths.
`sendDurableMessageBatch(...)` returns an explicit lifecycle outcome:
- `sent` - at least one visible platform message was delivered.
- `suppressed` - no platform message should be treated as missing. Stable
reasons include `cancelled_by_message_sending_hook`,
`empty_after_message_sending_hook`, `no_visible_payload`,
`adapter_returned_no_identity`, and legacy `no_visible_result`.
- `partial_failed` - at least one platform message was delivered before a later
payload or side effect failed. The result includes the delivered receipt prefix
plus the failure.
- `failed` - no platform receipt was produced.
Use `payloadOutcomes` when a batch mixes sent, suppressed, and failed payloads.
Do not infer hook cancellation by checking whether the old direct-delivery array
is empty.
Compatibility dispatchers that still need the buffered reply dispatcher should
build reply-prefix options with `createChannelMessageReplyPipeline(...)` from
`openclaw/plugin-sdk/channel-message`, then call the runtime's
`channel.turn.runPrepared(...)`. That keeps session recording and dispatch
ordering on the shared turn lifecycle without adding another public turn wrapper.
## Minimal adapter
Most new channel plugins can start with a small adapter:
@@ -124,8 +154,10 @@ tool send, implement `actions.prepareSendPayload(...)` instead of sending from
`prepareSendPayload(...)` receives the normalized core `ReplyPayload` plus the
full action context. Return a payload with channel-specific data in
`payload.channelData.<channel>` and let core call `sendMessage(...)`,
`deliverOutboundPayloads(...)`, the write-ahead queue, message-sending hooks,
retry, recovery, and ack cleanup.
the message lifecycle runtime, the write-ahead queue, message-sending hooks,
retry, recovery, and ack cleanup. The lifecycle runtime may call
`deliverOutboundPayloads(...)` internally as compatibility substrate, but channel
plugins should not call it directly for new send behavior.
Return `null` only when the send cannot be represented as a durable payload, for
example because it contains a non-serializable component factory. Core will keep
@@ -390,17 +422,21 @@ surface.
These APIs remain importable for third-party compatibility. Do not use them for
new channel code.
| Deprecated API | Replacement |
| -------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
| `openclaw/plugin-sdk/channel-reply-pipeline` | `openclaw/plugin-sdk/channel-message` |
| `createChannelTurnReplyPipeline(...)` | `createChannelMessageReplyPipeline(...)` for compatibility dispatchers, or a `message` adapter for new channel code |
| `deliverDurableInboundReplyPayload(...)` | `deliverInboundReplyWithMessageSendContext(...)` from `openclaw/plugin-sdk/channel-message-runtime` |
| `dispatchInboundReplyWithBase(...)` | `dispatchChannelMessageReplyWithBase(...)` only for compatibility dispatchers |
| `recordInboundSessionAndDispatchReply(...)` | `recordChannelMessageReplyDispatch(...)` only for compatibility dispatchers |
| `resolveChannelSourceReplyDeliveryMode(...)` | `resolveChannelMessageSourceReplyDeliveryMode(...)` |
| `deliverFinalizableDraftPreview(...)` | `defineFinalizableLivePreviewAdapter(...)` plus `deliverWithFinalizableLivePreviewAdapter(...)` |
| `DraftPreviewFinalizerDraft` | `LivePreviewFinalizerDraft` |
| `DraftPreviewFinalizerResult` | `LivePreviewFinalizerResult` |
| Deprecated API | Replacement |
| -------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- |
| `openclaw/plugin-sdk/channel-reply-pipeline` | `openclaw/plugin-sdk/channel-message` |
| `createChannelTurnReplyPipeline(...)` | `createChannelMessageReplyPipeline(...)` for compatibility dispatchers, or a `message` adapter for new channel code |
| `buildChannelMessageReplyDispatchBase(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code |
| `dispatchChannelMessageReplyWithBase(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code |
| `recordChannelMessageReplyDispatch(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code |
| `deliverOutboundPayloads(...)` | `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)` from `channel-message-runtime` |
| `deliverDurableInboundReplyPayload(...)` | `deliverInboundReplyWithMessageSendContext(...)` from `openclaw/plugin-sdk/channel-message-runtime` |
| `dispatchInboundReplyWithBase(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code |
| `recordInboundSessionAndDispatchReply(...)` | `createChannelMessageReplyPipeline(...)` plus `channel.turn.runPrepared(...)`, or a `message` adapter for new channel code |
| `resolveChannelSourceReplyDeliveryMode(...)` | `resolveChannelMessageSourceReplyDeliveryMode(...)` |
| `deliverFinalizableDraftPreview(...)` | `defineFinalizableLivePreviewAdapter(...)` plus `deliverWithFinalizableLivePreviewAdapter(...)` |
| `DraftPreviewFinalizerDraft` | `LivePreviewFinalizerDraft` |
| `DraftPreviewFinalizerResult` | `LivePreviewFinalizerResult` |
Compatibility dispatchers can still use `createReplyPrefixContext(...)`,
`createReplyPrefixOptions(...)`, and `createTypingCallbacks(...)` through the

View File

@@ -250,7 +250,7 @@ releases.
helpers for external plugins during the migration window and warn once with
the `runtime-config-load-write` compatibility code. Bundled plugins and repo
runtime code are protected by scanner guardrails in
`pnpm check:deprecated-internal-config-api` and
`pnpm check:deprecated-api-usage` and
`pnpm check:no-runtime-action-load-config`: new production plugin usage
fails outright, direct config writes fail, gateway server methods must use
the request runtime snapshot, runtime channel send/action/client helpers

View File

@@ -66,14 +66,14 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview)
| `plugin-sdk/command-gating` | Narrow command authorization gate helpers |
| `plugin-sdk/channel-policy` | `resolveChannelGroupRequireMention` |
| `plugin-sdk/channel-lifecycle` | `createAccountStatusSink`, `createChannelRunQueue`, and legacy draft stream lifecycle helpers. New preview finalization code should use `plugin-sdk/channel-message`. |
| `plugin-sdk/channel-message` | Cheap message lifecycle contract helpers such as `defineChannelMessageAdapter`, `createChannelMessageAdapterFromOutbound`, `createReplyPrefixContext`, `resolveChannelMessageSourceReplyDeliveryMode`, compatibility facades, durable-final capability derivation, capability proof helpers for send/receipt/side-effect capabilities, `MessageReceiveContext`, receive ack policy proofs, `defineFinalizableLivePreviewAdapter`, `deliverWithFinalizableLivePreviewAdapter`, live-preview and live-finalizer capability proofs, durable recovery state, `RenderedMessageBatch`, message receipt types, and receipt id helpers. See [Channel message API](/plugins/sdk-channel-message). Legacy `createChannelTurnReplyPipeline` remains only for compatibility dispatchers. |
| `plugin-sdk/channel-message-runtime` | Runtime delivery helpers that may load outbound delivery, including `deliverInboundReplyWithMessageSendContext`, `sendDurableMessageBatch`, `withDurableMessageSendContext`, `dispatchChannelMessageReplyWithBase`, and `recordChannelMessageReplyDispatch`. Use from monitor/send runtime modules, not hot plugin bootstrap files. |
| `plugin-sdk/channel-message` | Cheap message lifecycle contract helpers such as `defineChannelMessageAdapter`, `createChannelMessageAdapterFromOutbound`, `createChannelMessageReplyPipeline`, `createReplyPrefixContext`, `resolveChannelMessageSourceReplyDeliveryMode`, durable-final capability derivation, capability proof helpers for send/receipt/side-effect capabilities, `MessageReceiveContext`, receive ack policy proofs, `defineFinalizableLivePreviewAdapter`, `deliverWithFinalizableLivePreviewAdapter`, live-preview and live-finalizer capability proofs, durable recovery state, `RenderedMessageBatch`, message receipt types, and receipt id helpers. See [Channel message API](/plugins/sdk-channel-message). Legacy reply-dispatch facades are deprecated compatibility only. |
| `plugin-sdk/channel-message-runtime` | Runtime delivery helpers that may load outbound delivery, including `deliverInboundReplyWithMessageSendContext`, `sendDurableMessageBatch`, and `withDurableMessageSendContext`. Deprecated reply-dispatch bridges remain importable for compatibility dispatchers only. Use from monitor/send runtime modules, not hot plugin bootstrap files. |
| `plugin-sdk/inbound-envelope` | Shared inbound route + envelope builder helpers |
| `plugin-sdk/inbound-reply-dispatch` | Legacy shared inbound record-and-dispatch helpers, visible/final dispatch predicates, and deprecated `deliverDurableInboundReplyPayload` compatibility for prepared channel dispatchers. New channel receive/dispatch code should import runtime lifecycle helpers from `plugin-sdk/channel-message-runtime`. |
| `plugin-sdk/messaging-targets` | Target parsing/matching helpers |
| `plugin-sdk/outbound-media` | Shared outbound media loading helpers |
| `plugin-sdk/outbound-send-deps` | Lightweight outbound send dependency lookup for channel adapters |
| `plugin-sdk/outbound-runtime` | Outbound delivery, identity, send delegate, session, formatting, and payload planning helpers |
| `plugin-sdk/outbound-runtime` | Outbound identity, send delegate, session, formatting, and payload planning helpers. Direct delivery helpers such as `deliverOutboundPayloads` are deprecated compatibility substrate; use `plugin-sdk/channel-message-runtime` for new send paths. |
| `plugin-sdk/poll-runtime` | Narrow poll normalization helpers |
| `plugin-sdk/thread-bindings-runtime` | Thread-binding lifecycle and adapter helpers |
| `plugin-sdk/agent-media-payload` | Legacy agent media payload builder |

View File

@@ -19,6 +19,17 @@ vi.mock("openclaw/plugin-sdk/outbound-runtime", async () => {
};
});
vi.mock("../../../../src/infra/outbound/deliver.js", async () => {
const actual = await vi.importActual<typeof import("../../../../src/infra/outbound/deliver.js")>(
"../../../../src/infra/outbound/deliver.js",
);
return {
...actual,
deliverOutboundPayloads: deliverOutboundPayloadsMock,
deliverOutboundPayloadsInternal: deliverOutboundPayloadsMock,
};
});
vi.mock("../send.js", async () => {
const actual = await vi.importActual<typeof import("../send.js")>("../send.js");
return {

View File

@@ -1,4 +1,5 @@
import { resolveAgentAvatar } from "openclaw/plugin-sdk/agent-runtime";
import { sendDurableMessageBatch } from "openclaw/plugin-sdk/channel-message";
import type {
MarkdownTableMode,
OpenClawConfig,
@@ -7,7 +8,6 @@ import type {
import type { OutboundMediaAccess } from "openclaw/plugin-sdk/media-runtime";
import {
buildOutboundSessionContext,
deliverOutboundPayloads,
type OutboundDeliveryFormattingOptions,
type OutboundIdentity,
type OutboundSendDeps,
@@ -181,7 +181,7 @@ export async function deliverDiscordReply(params: {
return;
}
const results = await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg: params.cfg,
channel: "discord",
to: delivery.to,
@@ -205,6 +205,10 @@ export async function deliverDiscordReply(params: {
requesterAccountId: params.accountId,
}),
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
const results = send.status === "sent" ? send.results : [];
if (results.length === 0) {
throw new Error(`discord final reply produced no delivered message for ${delivery.to}`);
}

View File

@@ -1,3 +1,4 @@
import { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message";
import { createChannelPairingController } from "openclaw/plugin-sdk/channel-pairing";
import {
readStoreAllowFromForDmPolicy,
@@ -333,38 +334,53 @@ export async function handleIrcInbound(params: {
CommandAuthorized: commandAuthorized,
});
const { dispatchChannelMessageReplyWithBase } =
await import("openclaw/plugin-sdk/channel-message");
await dispatchChannelMessageReplyWithBase({
const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({
cfg: config as OpenClawConfig,
agentId: route.agentId,
channel: CHANNEL_ID,
accountId: account.accountId,
route,
});
await core.channel.turn.runPrepared({
channel: CHANNEL_ID,
accountId: account.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
core,
deliver: async (payload) => {
await deliverIrcReply({
payload,
cfg: config,
target: peerId,
accountId: account.accountId,
sendReply: params.sendReply,
statusSink,
});
},
onRecordError: (err) => {
runtime.error?.(`irc: failed updating session meta: ${String(err)}`);
},
onDispatchError: (err, info) => {
runtime.error?.(`irc ${info.kind} reply failed: ${String(err)}`);
},
replyOptions: {
skillFilter: groupMatch.groupConfig?.skills,
disableBlockStreaming:
typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
recordInboundSession: core.channel.session.recordInboundSession,
runDispatch: async () =>
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: config as OpenClawConfig,
dispatcherOptions: {
...replyPipeline,
deliver: async (payload) => {
await deliverIrcReply({
payload,
cfg: config,
target: peerId,
accountId: account.accountId,
sendReply: params.sendReply,
statusSink,
});
},
onError: (err, info) => {
runtime.error?.(`irc ${info.kind} reply failed: ${String(err)}`);
},
},
replyOptions: {
onModelSelected,
skillFilter: groupMatch.groupConfig?.skills,
disableBlockStreaming:
typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
},
}),
record: {
onRecordError: (err) => {
runtime.error?.(`irc: failed updating session meta: ${String(err)}`);
},
},
});
}

View File

@@ -29,7 +29,7 @@ export {
resolveEffectiveAllowFromLists,
} from "openclaw/plugin-sdk/channel-policy";
export { resolveControlCommandGate } from "openclaw/plugin-sdk/command-auth";
export { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message";
export { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message";
export { chunkTextForOutbound } from "openclaw/plugin-sdk/text-chunking";
export {
deliverFormattedTextWithAttachments,

View File

@@ -64,7 +64,7 @@ vi.mock("openclaw/plugin-sdk/runtime-env", async () => {
vi.mock("openclaw/plugin-sdk/channel-message", () => ({
createChannelMessageReplyPipeline: vi.fn(() => ({})),
hasFinalChannelMessageReplyDispatch: vi.fn(() => false),
hasFinalChannelTurnDispatch: vi.fn(() => false),
}));
vi.mock("openclaw/plugin-sdk/webhook-ingress", async () => {

View File

@@ -1,5 +1,5 @@
import type { webhook } from "@line/bot-sdk";
import { hasFinalChannelMessageReplyDispatch } from "openclaw/plugin-sdk/channel-message";
import { hasFinalChannelTurnDispatch } from "openclaw/plugin-sdk/channel-message";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { chunkMarkdownText } from "openclaw/plugin-sdk/reply-runtime";
import {
@@ -313,7 +313,7 @@ export async function monitorLineProvider(
},
});
const dispatchResult = turnResult.dispatched ? turnResult.dispatchResult : undefined;
if (!hasFinalChannelMessageReplyDispatch(dispatchResult)) {
if (!hasFinalChannelTurnDispatch(dispatchResult)) {
logVerbose(`line: no response generated for message from ${ctxPayload.From}`);
}
} catch (err) {

View File

@@ -23,7 +23,7 @@ export {
resolveDefaultGroupPolicy,
warnMissingProviderGroupPolicyFallbackOnce,
} from "openclaw/plugin-sdk/runtime-group-policy";
export { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message";
export { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message";
export type { OutboundReplyPayload } from "openclaw/plugin-sdk/reply-payload";
export { deliverFormattedTextWithAttachments } from "openclaw/plugin-sdk/reply-payload";
export type { PluginRuntime } from "openclaw/plugin-sdk/runtime-store";

View File

@@ -7,7 +7,6 @@ import type { CoreConfig, NextcloudTalkInboundMessage } from "./types.js";
const {
createChannelPairingControllerMock,
dispatchChannelMessageReplyWithBaseMock,
readStoreAllowFromForDmPolicyMock,
resolveDmGroupAccessWithCommandGateMock,
resolveAllowlistProviderRuntimeGroupPolicyMock,
@@ -16,7 +15,6 @@ const {
} = vi.hoisted(() => {
return {
createChannelPairingControllerMock: vi.fn(),
dispatchChannelMessageReplyWithBaseMock: vi.fn(),
readStoreAllowFromForDmPolicyMock: vi.fn(),
resolveDmGroupAccessWithCommandGateMock: vi.fn(),
resolveAllowlistProviderRuntimeGroupPolicyMock: vi.fn(),
@@ -33,7 +31,6 @@ vi.mock("../runtime-api.js", async () => {
return {
...actual,
createChannelPairingController: createChannelPairingControllerMock,
dispatchChannelMessageReplyWithBase: dispatchChannelMessageReplyWithBaseMock,
readStoreAllowFromForDmPolicy: readStoreAllowFromForDmPolicyMock,
resolveDmGroupAccessWithCommandGate: resolveDmGroupAccessWithCommandGateMock,
resolveAllowlistProviderRuntimeGroupPolicy: resolveAllowlistProviderRuntimeGroupPolicyMock,
@@ -183,7 +180,7 @@ describe("nextcloud-talk inbound behavior", () => {
.find((status) => status.lastOutboundAt !== undefined);
expect(typeof outboundStatus?.lastOutboundAt).toBe("number");
expect(outboundStatus?.lastOutboundAt).toBeGreaterThanOrEqual(1_736_380_800_000);
expect(dispatchChannelMessageReplyWithBaseMock).not.toHaveBeenCalled();
expect(sendMessageNextcloudTalkMock).toHaveBeenCalledTimes(1);
});
it("drops unmentioned group traffic before dispatch", async () => {
@@ -222,7 +219,7 @@ describe("nextcloud-talk inbound behavior", () => {
runtime,
});
expect(dispatchChannelMessageReplyWithBaseMock).not.toHaveBeenCalled();
expect(sendMessageNextcloudTalkMock).not.toHaveBeenCalled();
expect(runtime.log).toHaveBeenCalledWith("nextcloud-talk: drop room room-group (no mention)");
});
});

View File

@@ -1,9 +1,9 @@
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import {
GROUP_POLICY_BLOCKED_LABEL,
createChannelMessageReplyPipeline,
createChannelPairingController,
deliverFormattedTextWithAttachments,
dispatchChannelMessageReplyWithBase,
logInboundDrop,
readStoreAllowFromForDmPolicy,
resolveAllowlistProviderRuntimeGroupPolicy,
@@ -286,35 +286,52 @@ export async function handleNextcloudTalkInbound(params: {
CommandAuthorized: commandAuthorized,
});
await dispatchChannelMessageReplyWithBase({
const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({
cfg: config as OpenClawConfig,
agentId: route.agentId,
channel: CHANNEL_ID,
accountId: account.accountId,
route,
});
await core.channel.turn.runPrepared({
channel: CHANNEL_ID,
accountId: account.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
core,
deliver: async (payload) => {
await deliverNextcloudTalkReply({
cfg: config,
payload,
roomToken,
accountId: account.accountId,
statusSink,
});
},
onRecordError: (err) => {
runtime.error?.(`nextcloud-talk: failed updating session meta: ${String(err)}`);
},
onDispatchError: (err, info) => {
runtime.error?.(`nextcloud-talk ${info.kind} reply failed: ${String(err)}`);
},
replyOptions: {
skillFilter: roomConfig?.skills,
disableBlockStreaming:
typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
recordInboundSession: core.channel.session.recordInboundSession,
runDispatch: async () =>
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: config as OpenClawConfig,
dispatcherOptions: {
...replyPipeline,
deliver: async (payload) => {
await deliverNextcloudTalkReply({
cfg: config,
payload,
roomToken,
accountId: account.accountId,
statusSink,
});
},
onError: (err, info) => {
runtime.error?.(`nextcloud-talk ${info.kind} reply failed: ${String(err)}`);
},
},
replyOptions: {
onModelSelected,
skillFilter: roomConfig?.skills,
disableBlockStreaming:
typeof account.config.blockStreaming === "boolean"
? !account.config.blockStreaming
: undefined,
},
}),
record: {
onRecordError: (err) => {
runtime.error?.(`nextcloud-talk: failed updating session meta: ${String(err)}`);
},
},
});
}

View File

@@ -10,7 +10,6 @@ export {
createDefaultChannelRuntimeState,
createPluginRuntimeStore,
defineChannelPluginEntry,
dispatchChannelMessageReplyWithBase,
getChatChannelMeta,
jsonResult,
type OpenClawConfig,

View File

@@ -1,22 +1,8 @@
import { createPluginRuntimeMock } from "openclaw/plugin-sdk/channel-test-helpers";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { setQaChannelRuntime } from "../api.js";
import { handleQaInbound, isHttpMediaUrl } from "./inbound.js";
const dispatchChannelMessageReplyWithBaseMock = vi.hoisted(() => vi.fn());
vi.mock("openclaw/plugin-sdk/channel-message", async (importOriginal) => {
const actual = await importOriginal<typeof import("openclaw/plugin-sdk/channel-message")>();
return {
...actual,
dispatchChannelMessageReplyWithBase: dispatchChannelMessageReplyWithBaseMock,
};
});
beforeEach(() => {
dispatchChannelMessageReplyWithBaseMock.mockReset();
});
describe("isHttpMediaUrl", () => {
it("accepts only http and https urls", () => {
expect(isHttpMediaUrl("https://example.com/image.png")).toBe(true);
@@ -64,9 +50,9 @@ describe("handleQaInbound", () => {
},
});
expect(dispatchChannelMessageReplyWithBaseMock).toHaveBeenCalledTimes(1);
expect(dispatchChannelMessageReplyWithBaseMock.mock.calls[0]?.[0].ctxPayload.WasMentioned).toBe(
true,
);
expect(runtime.channel.turn.runPrepared).toHaveBeenCalledTimes(1);
expect(
vi.mocked(runtime.channel.turn.runPrepared).mock.calls[0]?.[0].ctxPayload.WasMentioned,
).toBe(true);
});
});

View File

@@ -1,4 +1,4 @@
import { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message";
import { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import {
buildAgentMediaPayload,
@@ -151,42 +151,59 @@ export async function handleQaInbound(params: {
...mediaPayload,
});
await dispatchChannelMessageReplyWithBase({
const { onModelSelected, ...replyPipeline } = createChannelMessageReplyPipeline({
cfg: params.config as OpenClawConfig,
agentId: route.agentId,
channel: params.channelId,
accountId: params.account.accountId,
route,
});
await runtime.channel.turn.runPrepared({
channel: params.channelId,
accountId: params.account.accountId,
routeSessionKey: route.sessionKey,
storePath,
ctxPayload,
core: runtime,
deliver: async (payload) => {
const text =
payload && typeof payload === "object" && "text" in payload
? ((payload as { text?: string }).text ?? "")
: "";
if (!text.trim()) {
return;
}
await sendQaBusMessage({
baseUrl: params.account.baseUrl,
accountId: params.account.accountId,
to: target,
text,
senderId: params.account.botUserId,
senderName: params.account.botDisplayName,
threadId: inbound.threadId,
replyToId: inbound.id,
});
},
onRecordError: (error) => {
throw error instanceof Error
? error
: new Error(`qa-channel session record failed: ${String(error)}`);
},
onDispatchError: (error) => {
throw error instanceof Error
? error
: new Error(`qa-channel dispatch failed: ${String(error)}`);
recordInboundSession: runtime.channel.session.recordInboundSession,
runDispatch: async () =>
await runtime.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg: params.config as OpenClawConfig,
dispatcherOptions: {
...replyPipeline,
deliver: async (payload) => {
const text =
payload && typeof payload === "object" && "text" in payload
? ((payload as { text?: string }).text ?? "")
: "";
if (!text.trim()) {
return;
}
await sendQaBusMessage({
baseUrl: params.account.baseUrl,
accountId: params.account.accountId,
to: target,
text,
senderId: params.account.botUserId,
senderName: params.account.botDisplayName,
threadId: inbound.threadId,
replyToId: inbound.id,
});
},
onError: (error) => {
throw error instanceof Error
? error
: new Error(`qa-channel dispatch failed: ${String(error)}`);
},
},
replyOptions: { onModelSelected },
}),
record: {
onRecordError: (error) => {
throw error instanceof Error
? error
: new Error(`qa-channel session record failed: ${String(error)}`);
},
},
});
}

View File

@@ -20,4 +20,4 @@ export {
createDefaultChannelRuntimeState,
} from "openclaw/plugin-sdk/status-helpers";
export { createPluginRuntimeStore } from "openclaw/plugin-sdk/runtime-store";
export { dispatchChannelMessageReplyWithBase } from "openclaw/plugin-sdk/channel-message";
export { createChannelMessageReplyPipeline } from "openclaw/plugin-sdk/channel-message";

View File

@@ -1316,12 +1316,12 @@
"canvas:a2ui:bundle": "node scripts/bundle-a2ui.mjs",
"changed:lanes": "node scripts/changed-lanes.mjs",
"check": "node scripts/check.mjs",
"check:architecture": "pnpm check:import-cycles && pnpm check:madge-import-cycles && pnpm check:deprecated-internal-config-api && pnpm check:deprecated-jsdoc",
"check:architecture": "pnpm check:import-cycles && pnpm check:madge-import-cycles && pnpm check:deprecated-api-usage && pnpm check:deprecated-jsdoc",
"check:base-config-schema": "node --import tsx scripts/generate-base-config-schema.ts --check",
"check:bundled-channel-config-metadata": "node --import tsx scripts/generate-bundled-channel-config-metadata.ts --check",
"check:changed": "node scripts/check-changed.mjs",
"check:changelog-attributions": "node scripts/check-changelog-attributions.mjs",
"check:deprecated-internal-config-api": "node scripts/check-deprecated-internal-config-api.mjs",
"check:deprecated-api-usage": "node scripts/check-deprecated-api-usage.mjs",
"check:deprecated-jsdoc": "node scripts/check-deprecated-jsdoc.mjs",
"check:docs": "pnpm format:docs:check && pnpm lint:docs && pnpm docs:check-mdx && pnpm docs:check-i18n-glossary && pnpm docs:check-links",
"check:host-env-policy:swift": "node scripts/generate-host-env-security-policy-swift.mjs --check",

View File

@@ -0,0 +1,210 @@
#!/usr/bin/env node
import fs from "node:fs";
import path from "node:path";
import { collectDeprecatedInternalConfigApiViolations } from "./lib/deprecated-config-api-guard.mjs";
const repoRoot = process.cwd();
const sourceExtensions = new Set([".ts", ".tsx", ".js", ".mjs", ".mts"]);
const skippedSegments = new Set(["node_modules", "dist", "build", "coverage", ".turbo"]);
const skippedFilePatterns = [
/\.test\.[cm]?[jt]sx?$/u,
/\.spec\.[cm]?[jt]sx?$/u,
/\.e2e\.[cm]?[jt]sx?$/u,
/\.d\.ts$/u,
];
function toRepoPath(filePath) {
return path.relative(repoRoot, filePath).split(path.sep).join("/");
}
function shouldSkipFile(filePath, rule) {
const repoPath = toRepoPath(filePath);
return (rule.skippedFilePatterns ?? skippedFilePatterns).some((pattern) =>
pattern.test(repoPath),
);
}
function* walk(dir, rule) {
if (!fs.existsSync(dir)) {
return;
}
for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
if (skippedSegments.has(entry.name)) {
continue;
}
const entryPath = path.join(dir, entry.name);
if (entry.isDirectory()) {
yield* walk(entryPath, rule);
continue;
}
if (!entry.isFile() || !sourceExtensions.has(path.extname(entry.name))) {
continue;
}
if (!shouldSkipFile(entryPath, rule)) {
yield entryPath;
}
}
}
function escapeRegExp(value) {
return value.replace(/[.*+?^${}()|[\]\\]/gu, "\\$&");
}
function collectIdentifierRuleViolations(rule) {
const allowedFiles = new Set(rule.allowedFiles ?? []);
const pattern = new RegExp(
`\\b(?:${rule.names.map((name) => escapeRegExp(name)).join("|")})\\b`,
"gu",
);
const violations = [];
for (const root of rule.roots) {
for (const filePath of walk(path.join(repoRoot, root), rule)) {
const repoPath = toRepoPath(filePath);
if (allowedFiles.has(repoPath)) {
continue;
}
const source = fs.readFileSync(filePath, "utf8");
for (const match of source.matchAll(pattern)) {
const line = source.slice(0, match.index).split("\n").length;
violations.push(`${repoPath}:${line}: ${match[0]} (${rule.message})`);
}
}
}
return violations;
}
function collectModuleSpecifierRuleViolations(rule) {
const allowedFiles = new Set(rule.allowedFiles ?? []);
const specifierPattern = rule.moduleSpecifiers
.map((specifier) => escapeRegExp(specifier))
.join("|");
const patterns = [
new RegExp(
`\\bimport\\s+(?:type\\s+)?(?:[^"']+?\\s+from\\s+)?["'](${specifierPattern})["']`,
"gu",
),
new RegExp(
`\\bexport\\s+(?:type\\s+)?(?:\\*\\s+from\\s+|[^"']+?\\s+from\\s+)["'](${specifierPattern})["']`,
"gu",
),
new RegExp(`\\bimport\\(\\s*["'](${specifierPattern})["']\\s*\\)`, "gu"),
];
const violations = [];
for (const root of rule.roots) {
for (const filePath of walk(path.join(repoRoot, root), rule)) {
const repoPath = toRepoPath(filePath);
if (allowedFiles.has(repoPath)) {
continue;
}
const source = fs.readFileSync(filePath, "utf8");
for (const pattern of patterns) {
for (const match of source.matchAll(pattern)) {
const line = source.slice(0, match.index).split("\n").length;
violations.push(`${repoPath}:${line}: ${match[1]} (${rule.message})`);
}
}
}
}
return violations;
}
function collectRuleViolations(rule) {
if (rule.collect) {
return rule.collect();
}
if (rule.moduleSpecifiers) {
return collectModuleSpecifierRuleViolations(rule);
}
return collectIdentifierRuleViolations(rule);
}
const rules = [
{
id: "internal-config-api",
collect: () => collectDeprecatedInternalConfigApiViolations(),
},
{
id: "plugin-sdk-compat-subpaths",
roots: ["src", "extensions", "packages"],
moduleSpecifiers: [
"openclaw/plugin-sdk/agent-dir-compat",
"openclaw/plugin-sdk/channel-config-schema-legacy",
"openclaw/plugin-sdk/channel-reply-pipeline",
"openclaw/plugin-sdk/channel-runtime",
"openclaw/plugin-sdk/compat",
"openclaw/plugin-sdk/discord",
"openclaw/plugin-sdk/infra-runtime",
"openclaw/plugin-sdk/mattermost",
"openclaw/plugin-sdk/matrix",
"openclaw/plugin-sdk/telegram-account",
"openclaw/plugin-sdk/testing",
"openclaw/plugin-sdk/test-utils",
"openclaw/plugin-sdk/zalouser",
],
message: "use focused non-deprecated plugin SDK subpaths",
},
{
id: "message-api",
roots: ["src", "extensions", "packages"],
names: [
"deliverOutboundPayloads",
"dispatchChannelMessageReplyWithBase",
"recordChannelMessageReplyDispatch",
"buildChannelMessageReplyDispatchBase",
"hasFinalChannelMessageReplyDispatch",
"hasVisibleChannelMessageReplyDispatch",
"resolveChannelMessageReplyDispatchCounts",
"createChannelTurnReplyPipeline",
"deliverDurableInboundReplyPayload",
],
allowedFiles: [
"src/channels/turn/durable-delivery.ts",
"src/channels/turn/kernel.ts",
"src/infra/outbound/deliver-runtime.ts",
"src/infra/outbound/deliver.ts",
"src/plugin-sdk/channel-message-runtime.ts",
"src/plugin-sdk/channel-message.ts",
"src/plugin-sdk/channel-test-helpers.ts",
"src/plugin-sdk/inbound-reply-dispatch.ts",
"src/plugin-sdk/outbound-runtime.ts",
"src/plugin-sdk/test-helpers/outbound-delivery.ts",
"src/plugin-sdk/testing.ts",
],
message: "use sendDurableMessageBatch or deliverInboundReplyWithMessageSendContext",
},
];
const selectedRuleIds = new Set(
process.argv
.slice(2)
.filter((arg) => arg.startsWith("--rule="))
.map((arg) => arg.slice("--rule=".length)),
);
const selectedRules =
selectedRuleIds.size === 0 ? rules : rules.filter((rule) => selectedRuleIds.has(rule.id));
const unknownRuleIds = [...selectedRuleIds].filter((id) => !rules.some((rule) => rule.id === id));
if (unknownRuleIds.length > 0) {
console.error(`Unknown deprecated API usage rule(s): ${unknownRuleIds.join(", ")}`);
process.exit(1);
}
const violations = selectedRules.flatMap((rule) =>
collectRuleViolations(rule).map((violation) => `${rule.id}: ${violation}`),
);
if (violations.length > 0) {
console.error("Deprecated API usage guard failed:");
for (const violation of violations) {
console.error(`- ${violation}`);
}
process.exit(1);
}
console.log("deprecated API usage guard passed");

View File

@@ -1,20 +0,0 @@
#!/usr/bin/env node
import { collectDeprecatedInternalConfigApiViolations } from "./lib/deprecated-config-api-guard.mjs";
export function main() {
const violations = collectDeprecatedInternalConfigApiViolations();
if (violations.length === 0) {
console.log("deprecated internal config API guard passed");
return 0;
}
console.error("Deprecated internal config API guard failed:");
for (const violation of violations) {
console.error(`- ${violation}`);
}
return 1;
}
if (import.meta.url === `file://${process.argv[1]}`) {
process.exitCode = main();
}

View File

@@ -12,8 +12,8 @@ export async function main(argv = process.argv.slice(2)) {
{ name: "runtime action config guard", args: ["check:no-runtime-action-load-config"] },
!includeArchitecture
? {
name: "deprecated internal config API guard",
args: ["check:deprecated-internal-config-api"],
name: "deprecated API usage guard",
args: ["check:deprecated-api-usage"],
}
: null,
{ name: "temp path guard", args: ["check:temp-path-guardrails"] },

View File

@@ -13,6 +13,7 @@ const deliverOutboundPayloadsMock = vi.hoisted(() =>
);
vi.mock("../../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: deliverOutboundPayloadsMock,
deliverOutboundPayloadsInternal: deliverOutboundPayloadsMock,
}));
const createReplyMediaPathNormalizerMock = vi.hoisted(() =>
@@ -228,7 +229,35 @@ describe("normalizeAgentCommandReplyPayloads", () => {
it("does not report success when best-effort delivery records an error", async () => {
deliverOutboundPayloadsMock.mockImplementationOnce(async (params: unknown) => {
(params as { onError?: (err: unknown) => void }).onError?.(new Error("send failed"));
(
params as {
onError?: (err: unknown, payload: ReplyPayload) => void;
onPayloadDeliveryOutcome?: (outcome: {
index: number;
payload: ReplyPayload;
status: "failed";
error: Error;
stage: "send";
}) => void;
}
).onError?.(new Error("send failed"), { text: "here you go" });
(
params as {
onPayloadDeliveryOutcome?: (outcome: {
index: number;
payload: ReplyPayload;
status: "failed";
error: Error;
stage: "send";
}) => void;
}
).onPayloadDeliveryOutcome?.({
index: 0,
payload: { text: "here you go" },
status: "failed",
error: new Error("send failed"),
stage: "send",
});
return [];
});
@@ -259,6 +288,12 @@ describe("normalizeAgentCommandReplyPayloads", () => {
expect(delivered.deliverySucceeded).toBe(false);
expect(runtime.error).toHaveBeenCalledWith(expect.stringContaining("send failed"));
expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith(
expect.objectContaining({
bestEffort: true,
queuePolicy: "best_effort",
}),
);
});
it("threads agentId into the normalizer when sessionKey is unresolved", async () => {

View File

@@ -3,6 +3,7 @@ import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import { normalizeReplyPayload } from "../../auto-reply/reply/normalize-reply.js";
import { createReplyMediaPathNormalizer } from "../../auto-reply/reply/reply-media-paths.runtime.js";
import { sendDurableMessageBatch } from "../../channels/message/runtime.js";
import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js";
import { createReplyPrefixContext } from "../../channels/reply-prefix.js";
import { createOutboundSendDeps, type CliDeps } from "../../cli/outbound-send-deps.js";
@@ -13,7 +14,6 @@ import {
resolveAgentOutboundTarget,
} from "../../infra/outbound/agent-delivery.js";
import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import { buildOutboundResultEnvelope } from "../../infra/outbound/envelope.js";
import {
createOutboundPayloadPlan,
@@ -381,7 +381,7 @@ export async function deliverAgentCommandResult(params: {
}
if (deliver && deliveryChannel && !isInternalMessageChannel(deliveryChannel)) {
if (deliveryTarget) {
await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg,
channel: deliveryChannel,
to: deliveryTarget,
@@ -391,10 +391,17 @@ export async function deliverAgentCommandResult(params: {
replyToId: resolvedReplyToId ?? null,
threadId: resolvedThreadTarget ?? null,
bestEffort: bestEffortDeliver,
durability: bestEffortDeliver ? "best_effort" : "required",
onError: markDeliveryError,
onPayload: logPayload,
deps: createOutboundSendDeps(deps),
});
if (!bestEffortDeliver && (send.status === "failed" || send.status === "partial_failed")) {
throw send.error;
}
if (send.status === "failed" || send.status === "partial_failed") {
deliveryHadError = true;
}
deliverySucceeded = !deliveryHadError;
}
}

View File

@@ -18,6 +18,12 @@ const mocks = vi.hoisted(() => ({
vi.mock("../../infra/outbound/deliver-runtime.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
vi.mock("../../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
const { routeReply } = await import("./route-reply.js");

View File

@@ -30,12 +30,12 @@ import {
shouldSuppressReasoningPayload,
} from "./reply-payloads.js";
const deliverRuntimeLoader = createLazyImportLoader(
() => import("../../infra/outbound/deliver-runtime.js"),
const messageRuntimeLoader = createLazyImportLoader(
() => import("../../channels/message/runtime.js"),
);
function loadDeliverRuntime() {
return deliverRuntimeLoader.load();
return messageRuntimeLoader.load();
}
export type RouteReplyParams = {
@@ -215,7 +215,7 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
try {
// Provider docking: this is an execution boundary (we're about to send).
// Keep the module cheap to import by loading outbound plumbing lazily.
const { deliverOutboundPayloads } = await loadDeliverRuntime();
const { sendDurableMessageBatch } = await loadDeliverRuntime();
const outboundSession = buildOutboundSessionContext({
cfg,
agentId: resolvedAgentId,
@@ -229,7 +229,7 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
requesterSenderUsername: params.requesterSenderUsername,
requesterSenderE164: params.requesterSenderE164,
});
const results = await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg,
channel: channelId,
to,
@@ -238,7 +238,7 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
replyToId: resolvedReplyToId ?? null,
threadId: resolvedThreadId,
session: outboundSession,
abortSignal,
signal: abortSignal,
mirror:
params.mirror !== false && params.sessionKey
? {
@@ -251,6 +251,10 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
}
: undefined,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
const results = send.status === "sent" ? send.results : [];
const last = results.at(-1);
return { ok: true, messageId: last?.messageId };

View File

@@ -1,17 +1,21 @@
import { describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { OutboundDeliveryError } from "../../infra/outbound/deliver-types.js";
import type { OutboundPayloadDeliveryOutcome } from "../../infra/outbound/deliver-types.js";
import type { OutboundDeliveryIntent } from "../../infra/outbound/deliver.js";
const deliverOutboundPayloads = vi.hoisted(() => vi.fn());
vi.mock("../../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads,
deliverOutboundPayloadsInternal: deliverOutboundPayloads,
}));
import { sendDurableMessageBatch, withDurableMessageSendContext } from "./send.js";
type DeliveryIntentCallbackParams = {
onDeliveryIntent?: (intent: OutboundDeliveryIntent) => void;
onPayloadDeliveryOutcome?: (outcome: OutboundPayloadDeliveryOutcome) => void;
};
const cfg = {} as OpenClawConfig;
@@ -367,6 +371,211 @@ describe("withDurableMessageSendContext", () => {
);
});
it("reports hook-cancelled deliveries as explicit suppressed sends", async () => {
deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => {
params.onPayloadDeliveryOutcome?.({
index: 0,
status: "suppressed",
reason: "cancelled_by_message_sending_hook",
hookEffect: { cancelReason: "owned-by-other-agent" },
});
return [];
});
const onCommitReceipt = vi.fn();
const result = await sendDurableMessageBatch({
cfg,
channel: "slack",
to: "C123",
payloads: [{ text: "claimed elsewhere" }],
onCommitReceipt,
});
expect(result).toEqual(
expect.objectContaining({
status: "suppressed",
reason: "cancelled_by_message_sending_hook",
payloadOutcomes: [
expect.objectContaining({
status: "suppressed",
reason: "cancelled_by_message_sending_hook",
hookEffect: { cancelReason: "owned-by-other-agent" },
}),
],
}),
);
expect(onCommitReceipt).toHaveBeenCalledWith(
expect.objectContaining({ platformMessageIds: [] }),
);
});
it("forwards payload delivery outcomes to callers while collecting durable outcomes", async () => {
const onPayloadDeliveryOutcome = vi.fn();
deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => {
params.onPayloadDeliveryOutcome?.({
index: 0,
status: "suppressed",
reason: "cancelled_by_message_sending_hook",
});
return [];
});
const result = await sendDurableMessageBatch({
cfg,
channel: "slack",
to: "C123",
payloads: [{ text: "claimed elsewhere" }],
onPayloadDeliveryOutcome,
});
expect(result).toEqual(
expect.objectContaining({
status: "suppressed",
payloadOutcomes: [
expect.objectContaining({
index: 0,
status: "suppressed",
reason: "cancelled_by_message_sending_hook",
}),
],
}),
);
expect(onPayloadDeliveryOutcome).toHaveBeenCalledWith(
expect.objectContaining({
index: 0,
status: "suppressed",
reason: "cancelled_by_message_sending_hook",
}),
);
});
it("reports zero-result failed best-effort payloads as failed sends", async () => {
const error = new Error("send failed");
deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => {
params.onPayloadDeliveryOutcome?.({
index: 0,
status: "failed",
error,
sentBeforeError: false,
stage: "platform_send",
});
return [];
});
const onCommitReceipt = vi.fn();
const onSendFailure = vi.fn();
const result = await sendDurableMessageBatch({
cfg,
channel: "telegram",
to: "chat-1",
payloads: [{ text: "hello" }],
bestEffort: true,
onCommitReceipt,
onSendFailure,
});
expect(result).toEqual(
expect.objectContaining({
status: "failed",
error,
stage: "platform_send",
payloadOutcomes: [
expect.objectContaining({
index: 0,
status: "failed",
error,
stage: "platform_send",
}),
],
}),
);
expect(onCommitReceipt).not.toHaveBeenCalled();
expect(onSendFailure).toHaveBeenCalledWith(error);
});
it("reports best-effort partial failures with the delivered receipt prefix", async () => {
const error = new Error("second payload failed");
deliverOutboundPayloads.mockImplementationOnce(async (params: DeliveryIntentCallbackParams) => {
params.onPayloadDeliveryOutcome?.({
index: 0,
status: "sent",
results: [{ channel: "telegram", messageId: "msg-1" }],
});
params.onPayloadDeliveryOutcome?.({
index: 1,
status: "failed",
error,
sentBeforeError: true,
stage: "platform_send",
});
return [{ channel: "telegram", messageId: "msg-1" }];
});
const onSendFailure = vi.fn();
const result = await sendDurableMessageBatch({
cfg,
channel: "telegram",
to: "chat-1",
payloads: [{ text: "first" }, { text: "second" }],
onSendFailure,
});
expect(result).toEqual(
expect.objectContaining({
status: "partial_failed",
results: [{ channel: "telegram", messageId: "msg-1" }],
receipt: expect.objectContaining({ platformMessageIds: ["msg-1"] }),
error,
sentBeforeError: true,
}),
);
expect(onSendFailure).toHaveBeenCalledWith(error);
});
it("maps thrown outbound partial delivery errors to partial_failed", async () => {
const cause = new Error("network reset");
const error = new OutboundDeliveryError("network reset", {
cause,
results: [{ channel: "telegram", messageId: "msg-1" }],
payloadOutcomes: [
{
index: 0,
status: "sent",
results: [{ channel: "telegram", messageId: "msg-1" }],
},
{
index: 1,
status: "failed",
error: cause,
sentBeforeError: true,
stage: "platform_send",
},
],
stage: "platform_send",
});
deliverOutboundPayloads.mockRejectedValueOnce(error);
const onSendFailure = vi.fn();
const result = await sendDurableMessageBatch({
cfg,
channel: "telegram",
to: "chat-1",
payloads: [{ text: "first" }, { text: "second" }],
onSendFailure,
});
expect(result).toEqual(
expect.objectContaining({
status: "partial_failed",
results: [{ channel: "telegram", messageId: "msg-1" }],
receipt: expect.objectContaining({ platformMessageIds: ["msg-1"] }),
error,
sentBeforeError: true,
}),
);
expect(onSendFailure).toHaveBeenCalledWith(error);
});
it("runs the failure hook when send-context orchestration throws", async () => {
const onSendFailure = vi.fn();
const error = new Error("boom");

View File

@@ -2,7 +2,11 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import { formatErrorMessage } from "../../infra/errors.js";
import type { OutboundDeliveryResult } from "../../infra/outbound/deliver-types.js";
import {
deliverOutboundPayloads,
isOutboundDeliveryError,
type OutboundPayloadDeliveryOutcome,
} from "../../infra/outbound/deliver-types.js";
import {
deliverOutboundPayloadsInternal,
type DeliverOutboundPayloadsParams,
type OutboundDeliveryIntent,
} from "../../infra/outbound/deliver.js";
@@ -33,21 +37,71 @@ export type DurableMessageBatchSendParams = Omit<
previousReceipt?: MessageReceipt;
};
export type DurableMessageSuppressionReason =
| "cancelled_by_message_sending_hook"
| "empty_after_message_sending_hook"
| "no_visible_payload"
| "adapter_returned_no_identity"
| "no_visible_result";
export type DurableMessageFailureStage = "platform_send" | "queue" | "unknown";
export type DurableMessagePayloadDeliveryOutcome =
| {
index: number;
status: "sent";
results: OutboundDeliveryResult[];
}
| {
index: number;
status: "suppressed";
reason: DurableMessageSuppressionReason;
hookEffect?: {
cancelReason?: string;
metadata?: Record<string, unknown>;
};
}
| {
index: number;
status: "failed";
error: unknown;
sentBeforeError: boolean;
stage: DurableMessageFailureStage;
};
export type DurableMessageBatchSendResult =
| {
status: "sent";
results: OutboundDeliveryResult[];
receipt: MessageReceipt;
deliveryIntent?: OutboundDeliveryIntent;
payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[];
}
| {
status: "suppressed";
results: [];
receipt: MessageReceipt;
deliveryIntent?: OutboundDeliveryIntent;
reason: "no_visible_result";
reason: DurableMessageSuppressionReason;
payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[];
}
| { status: "failed"; error: unknown };
| {
status: "partial_failed";
results: OutboundDeliveryResult[];
receipt: MessageReceipt;
error: unknown;
sentBeforeError: true;
deliveryIntent?: OutboundDeliveryIntent;
payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[];
}
| {
status: "failed";
error: unknown;
stage?: DurableMessageFailureStage;
payloadOutcomes?: DurableMessagePayloadDeliveryOutcome[];
};
export type DurableMessageDeliveryOutcome = DurableMessageBatchSendResult;
const neverAbortedSignal = new AbortController().signal;
@@ -65,6 +119,18 @@ function toDurableMessageIntent(
};
}
function toDurablePayloadOutcome(
outcome: OutboundPayloadDeliveryOutcome,
): DurableMessagePayloadDeliveryOutcome {
return outcome;
}
function toDurablePayloadOutcomes(
outcomes: readonly OutboundPayloadDeliveryOutcome[],
): DurableMessagePayloadDeliveryOutcome[] {
return outcomes.map((outcome) => toDurablePayloadOutcome(outcome));
}
export type DurableMessageSendContextParams = DurableMessageBatchSendParams & {
durability?: Exclude<MessageDurabilityPolicy, "disabled">;
preview?: LiveMessageState<ReplyPayload>;
@@ -99,6 +165,7 @@ export async function withDurableMessageSendContext<T>(
onCommitReceipt,
onPreviewUpdate,
onSendFailure,
onPayloadDeliveryOutcome,
payloads,
preview,
previousReceipt,
@@ -129,13 +196,20 @@ export async function withDurableMessageSendContext<T>(
return liveState;
},
send: async (rendered): Promise<DurableMessageBatchSendResult> => {
const payloadOutcomes: OutboundPayloadDeliveryOutcome[] = [];
const durablePayloadOutcomes = (): DurableMessagePayloadDeliveryOutcome[] =>
toDurablePayloadOutcomes(payloadOutcomes);
try {
const results = await deliverOutboundPayloads({
const results = await deliverOutboundPayloadsInternal({
...deliveryParams,
payloads: rendered.payloads,
renderedBatchPlan: rendered.plan,
queuePolicy,
...(effectiveSignal ? { abortSignal: effectiveSignal } : {}),
onPayloadDeliveryOutcome: (outcome) => {
payloadOutcomes.push(outcome);
onPayloadDeliveryOutcome?.(outcome);
},
onDeliveryIntent: (intent) => {
deliveryIntent = intent;
ctx.intent = toDurableMessageIntent(intent, rendered);
@@ -146,13 +220,36 @@ export async function withDurableMessageSendContext<T>(
threadId: params.threadId == null ? undefined : String(params.threadId),
replyToId: params.replyToId ?? undefined,
});
const failedOutcome = payloadOutcomes.find((outcome) => outcome.status === "failed");
if (failedOutcome) {
if (results.length > 0) {
return {
status: "partial_failed",
results,
receipt,
error: failedOutcome.error,
sentBeforeError: true,
...(deliveryIntent ? { deliveryIntent } : {}),
...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}),
};
}
return {
status: "failed",
error: failedOutcome.error,
stage: failedOutcome.stage,
...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}),
};
}
if (results.length === 0) {
return {
status: "suppressed",
results: [],
receipt,
...(deliveryIntent ? { deliveryIntent } : {}),
reason: "no_visible_result",
reason:
payloadOutcomes.find((outcome) => outcome.status === "suppressed")?.reason ??
"no_visible_result",
...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}),
};
}
return {
@@ -160,8 +257,37 @@ export async function withDurableMessageSendContext<T>(
results,
receipt,
...(deliveryIntent ? { deliveryIntent } : {}),
...(payloadOutcomes.length > 0 ? { payloadOutcomes: durablePayloadOutcomes() } : {}),
};
} catch (error: unknown) {
if (isOutboundDeliveryError(error)) {
if (error.results.length > 0) {
const receipt = createMessageReceiptFromOutboundResults({
results: error.results,
threadId: params.threadId == null ? undefined : String(params.threadId),
replyToId: params.replyToId ?? undefined,
});
return {
status: "partial_failed",
results: error.results,
receipt,
error,
sentBeforeError: true,
...(deliveryIntent ? { deliveryIntent } : {}),
...(error.payloadOutcomes.length > 0
? { payloadOutcomes: toDurablePayloadOutcomes(error.payloadOutcomes) }
: {}),
};
}
return {
status: "failed",
error,
stage: error.stage,
...(error.payloadOutcomes.length > 0
? { payloadOutcomes: toDurablePayloadOutcomes(error.payloadOutcomes) }
: {}),
};
}
return { status: "failed", error };
}
},
@@ -213,7 +339,7 @@ export async function sendDurableMessageBatch(
return await withDurableMessageSendContext(params, async (ctx) => {
const rendered = await ctx.render();
const result = await ctx.send(rendered);
if (result.status !== "failed") {
if (result.status === "sent" || result.status === "suppressed") {
await ctx.commit(result.receipt);
} else {
await ctx.fail(result.error);

View File

@@ -165,4 +165,34 @@ describe("durable inbound reply delivery", () => {
}),
);
});
it("reports durable partial send failures as failed delivery", async () => {
const error = new Error("second chunk failed");
mocks.sendDurableMessageBatch.mockResolvedValueOnce({
status: "partial_failed",
results: [{ channel: "telegram", messageId: "m1" }],
receipt: {
primaryPlatformMessageId: "m1",
platformMessageIds: ["m1"],
parts: [{ platformMessageId: "m1", kind: "text", index: 0 }],
sentAt: 1,
},
error,
sentBeforeError: true,
});
const result = await deliverInboundReplyWithMessageSendContext({
cfg: {},
channel: "telegram",
agentId: "main",
info: { kind: "final" },
payload: { text: "final" },
ctxPayload: {
CommandAuthorized: true,
OriginatingTo: "chat-1",
},
});
expect(result).toEqual({ status: "failed", error });
});
});

View File

@@ -191,6 +191,9 @@ export async function deliverInboundReplyWithMessageSendContext(
if (send.status === "failed") {
return { status: "failed" as const, error: send.error };
}
if (send.status === "partial_failed") {
return { status: "failed" as const, error: send.error };
}
const delivery = createChannelDeliveryResultFromReceipt({
receipt: send.receipt,

View File

@@ -20,6 +20,7 @@ vi.mock("../channels/plugins/index.js", () => ({
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
vi.mock("../infra/outbound/targets.js", async () => {

View File

@@ -15,6 +15,7 @@ vi.mock("./isolated-agent/delivery-target.js", () => ({
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
vi.mock("../infra/outbound/identity.js", () => ({

View File

@@ -1,8 +1,8 @@
import { sendDurableMessageBatch } from "../channels/message/runtime.js";
import type { CliDeps } from "../cli/deps.types.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import type { OpenClawConfig } from "../config/types.js";
import { formatErrorMessage } from "../infra/errors.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { getChildLogger } from "../logging.js";
@@ -94,7 +94,7 @@ async function deliverCronAnnouncePayload(params: {
message: string;
abortSignal: AbortSignal;
}): Promise<void> {
await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg: params.cfg,
channel: params.delivery.resolvedTarget.channel,
to: params.delivery.resolvedTarget.to,
@@ -105,8 +105,11 @@ async function deliverCronAnnouncePayload(params: {
identity: params.delivery.identity,
bestEffort: false,
deps: createOutboundSendDeps(params.deps),
abortSignal: params.abortSignal,
signal: params.abortSignal,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
}
export async function sendCronAnnouncePayloadStrict(params: {

View File

@@ -15,12 +15,17 @@ import { SILENT_REPLY_TOKEN } from "../../auto-reply/tokens.js";
// --- Module mocks (must be hoisted before imports) ---
const { countActiveDescendantRunsMock, maybeApplyTtsToPayloadMock, retireSessionMcpRuntimeMock } =
vi.hoisted(() => ({
countActiveDescendantRunsMock: vi.fn().mockReturnValue(0),
maybeApplyTtsToPayloadMock: vi.fn(async (params: { payload: unknown }) => params.payload),
retireSessionMcpRuntimeMock: vi.fn().mockResolvedValue(true),
}));
const {
countActiveDescendantRunsMock,
deliverOutboundPayloadsMock,
maybeApplyTtsToPayloadMock,
retireSessionMcpRuntimeMock,
} = vi.hoisted(() => ({
countActiveDescendantRunsMock: vi.fn().mockReturnValue(0),
deliverOutboundPayloadsMock: vi.fn().mockResolvedValue([{ ok: true }]),
maybeApplyTtsToPayloadMock: vi.fn(async (params: { payload: unknown }) => params.payload),
retireSessionMcpRuntimeMock: vi.fn().mockResolvedValue(true),
}));
vi.mock("../../config/sessions/main-session.js", () => ({
resolveAgentMainSessionKey: vi.fn(({ agentId }: { agentId: string }) => `agent:${agentId}:main`),
@@ -40,7 +45,8 @@ vi.mock("./delivery-subagent-registry.runtime.js", () => ({
}));
vi.mock("../../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: vi.fn().mockResolvedValue([{ ok: true }]),
deliverOutboundPayloads: deliverOutboundPayloadsMock,
deliverOutboundPayloadsInternal: deliverOutboundPayloadsMock,
}));
vi.mock("../../infra/outbound/identity.js", () => ({

View File

@@ -556,8 +556,8 @@ export async function dispatchCronDelivery(
const {
buildOutboundSessionContext,
createOutboundSendDeps,
deliverOutboundPayloads,
resolveAgentOutboundIdentity,
sendDurableMessageBatch,
} = await loadDeliveryOutboundRuntime();
const identity = resolveAgentOutboundIdentity(params.cfgWithAgentDefaults, params.agentId);
const deliveryIdempotencyKey = buildDirectCronDeliveryIdempotencyKey({
@@ -660,8 +660,8 @@ export async function dispatchCronDelivery(
}
: undefined;
const runDelivery = async () =>
await deliverOutboundPayloads({
const runDelivery = async () => {
const send = await sendDurableMessageBatch({
cfg: params.cfgWithAgentDefaults,
channel: delivery.channel,
to: delivery.to,
@@ -671,8 +671,9 @@ export async function dispatchCronDelivery(
session: deliverySession,
identity,
bestEffort: params.deliveryBestEffort,
durability: params.deliveryBestEffort ? "best_effort" : "required",
deps: createOutboundSendDeps(params.deps),
abortSignal: params.abortSignal,
signal: params.abortSignal,
onError,
// Isolated cron direct delivery uses its own transient retry loop.
// Keep all attempts out of the write-ahead delivery queue so a
@@ -681,6 +682,17 @@ export async function dispatchCronDelivery(
// See: https://github.com/openclaw/openclaw/issues/40545
skipQueue: true,
});
if (
send.status === "failed" ||
(!params.deliveryBestEffort && send.status === "partial_failed")
) {
throw send.error;
}
if (send.status === "partial_failed") {
hadPartialFailure = true;
}
return send.status === "sent" || send.status === "partial_failed" ? send.results : [];
};
const deliveryResults = options?.retryTransient
? await retryTransientDirectCronDelivery({
jobId: params.job.id,

View File

@@ -1,8 +1,6 @@
export { createOutboundSendDeps } from "../../cli/outbound-send-deps.js";
export {
deliverOutboundPayloads,
type OutboundDeliveryResult,
} from "../../infra/outbound/deliver.js";
export { sendDurableMessageBatch } from "../../channels/message/runtime.js";
export { type OutboundDeliveryResult } from "../../infra/outbound/deliver.js";
export { resolveAgentOutboundIdentity } from "../../infra/outbound/identity.js";
export { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
export { enqueueSystemEvent } from "../../infra/system-events.js";

View File

@@ -99,6 +99,7 @@ vi.mock("../../infra/outbound/channel-selection.js", () => ({
vi.mock("../../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
vi.mock("../../config/sessions.js", async () => {

View File

@@ -1,4 +1,5 @@
import { resolveSessionAgentId } from "../../agents/agent-scope.js";
import { sendDurableMessageBatch } from "../../channels/message/runtime.js";
import { normalizeChannelId } from "../../channels/plugins/index.js";
import { dispatchChannelMessageAction } from "../../channels/plugins/message-action-dispatch.js";
import { createOutboundSendDeps } from "../../cli/deps.js";
@@ -6,7 +7,6 @@ import { applyPluginAutoEnable } from "../../config/plugin-auto-enable.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { resolveOutboundChannelPlugin } from "../../infra/outbound/channel-resolution.js";
import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js";
import { deliverOutboundPayloads } from "../../infra/outbound/deliver.js";
import {
ensureOutboundSessionEntry,
resolveOutboundSessionRoute,
@@ -538,7 +538,7 @@ export const sendHandlers: GatewayRequestHandlers = {
sessionKey: outboundSessionKey,
conversationType: outboundRoute?.chatType,
});
const results = await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg,
channel: outboundChannel,
to: deliveryTarget,
@@ -560,6 +560,10 @@ export const sendHandlers: GatewayRequestHandlers = {
}
: undefined,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
const results = send.status === "sent" ? send.results : [];
const result = results.at(-1);
if (!result) {

View File

@@ -1,13 +1,13 @@
export { resolveSessionAgentId } from "../agents/agent-scope.js";
export { sanitizeInboundSystemTags } from "../auto-reply/reply/inbound-text.js";
export { normalizeChannelId } from "../channels/plugins/index.js";
export { sendDurableMessageBatch } from "../channels/message/runtime.js";
export { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
export { agentCommandFromIngress } from "../commands/agent.js";
export { getRuntimeConfig } from "../config/io.js";
export { updateSessionStore } from "../config/sessions.js";
export { loadOrCreateDeviceIdentity } from "../infra/device-identity.js";
export { requestHeartbeat } from "../infra/heartbeat-wake.js";
export { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
export { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
export { resolveOutboundTarget } from "../infra/outbound/targets.js";
export { registerApnsRegistration } from "../infra/push-apns.js";

View File

@@ -19,7 +19,6 @@ import {
createOutboundSendDeps,
defaultRuntime,
deleteMediaBuffer,
deliverOutboundPayloads,
enqueueSystemEvent,
formatForLog,
getRuntimeConfig,
@@ -39,6 +38,7 @@ import {
resolveSessionModelRef,
sanitizeInboundSystemTags,
scopedHeartbeatWakeOptions,
sendDurableMessageBatch,
updateSessionStore,
} from "./server-node-events.runtime.js";
@@ -345,15 +345,19 @@ async function sendReceiptAck(params: {
cfg: params.cfg,
sessionKey: params.sessionKey,
});
await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg: params.cfg,
channel: params.channel,
to: resolved.to,
payloads: [{ text: params.text }],
session,
bestEffort: true,
durability: "best_effort",
deps: createOutboundSendDeps(params.deps),
});
if (send.status === "failed") {
throw send.error;
}
}
export const handleNodeEvent = async (

View File

@@ -110,6 +110,16 @@ const mocks = vi.hoisted(() => {
skippedMaxRetries: 0,
deferredBackoff: 0,
})),
resolveAgentConfig: vi.fn(() => undefined),
resolveAgentWorkspaceDir: vi.fn(() => "/tmp/openclaw-test-workspace"),
resolveDefaultAgentId: vi.fn(() => "main"),
normalizeSessionDeliveryFields: vi.fn((source?: Record<string, unknown>) => ({
deliveryContext: source?.deliveryContext,
lastChannel: source?.lastChannel ?? source?.channel,
lastTo: source?.lastTo,
lastAccountId: source?.lastAccountId,
lastThreadId: source?.lastThreadId,
})),
injectTimestamp: vi.fn((message: string) => `stamped:${message}`),
timestampOptsFromConfig: vi.fn(() => ({})),
recordInboundSessionAndDispatchReply: vi.fn(
@@ -124,9 +134,18 @@ const mocks = vi.hoisted(() => {
vi.unmock("./server-restart-sentinel.js");
vi.resetModules();
vi.mock("../agents/agent-scope.js", () => ({
resolveSessionAgentId: mocks.resolveSessionAgentId,
}));
vi.mock("../agents/agent-scope.js", async () => {
const actual = await vi.importActual<typeof import("../agents/agent-scope.js")>(
"../agents/agent-scope.js",
);
return {
...actual,
resolveAgentConfig: mocks.resolveAgentConfig,
resolveAgentWorkspaceDir: mocks.resolveAgentWorkspaceDir,
resolveDefaultAgentId: mocks.resolveDefaultAgentId,
resolveSessionAgentId: mocks.resolveSessionAgentId,
};
});
vi.mock("../infra/restart-sentinel.js", () => ({
readRestartSentinel: mocks.readRestartSentinel,
@@ -147,6 +166,7 @@ vi.mock("../config/sessions.js", () => ({
}));
vi.mock("../config/sessions/thread-info.js", () => ({
parseSessionThreadInfoFast: mocks.parseSessionThreadInfo,
parseSessionThreadInfo: mocks.parseSessionThreadInfo,
}));
@@ -157,6 +177,7 @@ vi.mock("./session-utils.js", () => ({
vi.mock("../utils/delivery-context.shared.js", () => ({
deliveryContextFromSession: mocks.deliveryContextFromSession,
mergeDeliveryContext: mocks.mergeDeliveryContext,
normalizeSessionDeliveryFields: mocks.normalizeSessionDeliveryFields,
}));
vi.mock("../channels/plugins/index.js", async () => {
@@ -176,12 +197,34 @@ vi.mock("../channels/plugins/index.js", async () => {
};
});
vi.mock("../channels/turn/kernel.js", () => ({
dispatchAssembledChannelTurn: async (params: {
delivery: {
preparePayload?: (payload: { text?: string; replyToId?: string | null }) => {
text?: string;
replyToId?: string | null;
};
deliver: (payload: { text?: string; replyToId?: string | null }) => Promise<void>;
onError?: (err: unknown, info: { kind: string }) => void;
};
}) => {
await mocks.recordInboundSessionAndDispatchReply({
...params,
deliver: async (payload: { text?: string; replyToId?: string | null }) =>
params.delivery.deliver(params.delivery.preparePayload?.(payload) ?? payload),
onDispatchError: (err: unknown, info: { kind: string }) =>
params.delivery.onError?.(err, info),
} as unknown as RecordInboundSessionAndDispatchReplyParams);
},
}));
vi.mock("../infra/outbound/targets.js", () => ({
resolveOutboundTarget: mocks.resolveOutboundTarget,
}));
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
vi.mock("../infra/outbound/delivery-queue.js", () => ({
@@ -213,6 +256,7 @@ vi.mock("../logging/subsystem.js", () => {
info: mocks.logInfo,
warn: mocks.logWarn,
error: mocks.logError,
isEnabled: vi.fn(() => false),
child: vi.fn(),
};
logger.child.mockReturnValue(logger);

View File

@@ -2,14 +2,15 @@ import { resolveSessionAgentId } from "../agents/agent-scope.js";
import { finalizeInboundContext } from "../auto-reply/reply/inbound-context.js";
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
import type { ChatType } from "../channels/chat-type.js";
import { sendDurableMessageBatch } from "../channels/message/runtime.js";
import { getChannelPlugin, normalizeChannelId } from "../channels/plugins/index.js";
import { recordInboundSession } from "../channels/session.js";
import { dispatchAssembledChannelTurn } from "../channels/turn/kernel.js";
import type { CliDeps } from "../cli/deps.types.js";
import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
import { parseSessionThreadInfo } from "../config/sessions/thread-info.js";
import { formatErrorMessage } from "../infra/errors.js";
import { requestHeartbeat } from "../infra/heartbeat-wake.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { ackDelivery, enqueueDelivery, failDelivery } from "../infra/outbound/delivery-queue.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { resolveOutboundTarget } from "../infra/outbound/targets.js";
@@ -34,7 +35,6 @@ import {
} from "../infra/session-delivery-queue.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { recordChannelMessageReplyDispatch } from "../plugin-sdk/channel-message.js";
import { stringifyRouteThreadId } from "../plugin-sdk/channel-route.js";
import type { OutboundReplyPayload } from "../plugin-sdk/reply-payload.js";
import {
@@ -115,7 +115,7 @@ async function deliverRestartSentinelNotice(params: {
}).catch(() => null);
for (let attempt = 1; attempt <= OUTBOUND_MAX_ATTEMPTS; attempt += 1) {
try {
const results = await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg: params.cfg,
channel: params.channel,
to: params.to,
@@ -128,6 +128,10 @@ async function deliverRestartSentinelNotice(params: {
bestEffort: false,
skipQueue: true,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
const results = send.status === "sent" ? send.results : [];
if (results.length > 0) {
if (queueId) {
await ackDelivery(queueId).catch(() => {});
@@ -272,73 +276,95 @@ async function deliverQueuedSessionDelivery(params: {
config: cfg,
});
let dispatchError: unknown;
await recordChannelMessageReplyDispatch({
const ctxPayload = finalizeInboundContext(
{
Body: userMessage,
BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(cfg)),
BodyForCommands: "",
RawBody: userMessage,
CommandBody: "",
SessionKey: canonicalKey,
AccountId: route.accountId,
MessageSid: messageId,
Timestamp: Date.now(),
Provider: route.channel,
Surface: route.channel,
ChatType: route.chatType,
CommandAuthorized: false,
ReplyToId: route.replyToId,
OriginatingChannel: route.channel,
OriginatingTo: route.to,
ExplicitDeliverRoute: true,
MessageThreadId: route.threadId,
},
{
forceBodyForCommands: true,
forceChatType: true,
},
);
await dispatchAssembledChannelTurn({
cfg,
channel: route.channel,
accountId: route.accountId,
agentId,
routeSessionKey: canonicalKey,
storePath,
ctxPayload: finalizeInboundContext(
{
Body: userMessage,
BodyForAgent: injectTimestamp(userMessage, timestampOptsFromConfig(cfg)),
BodyForCommands: "",
RawBody: userMessage,
CommandBody: "",
SessionKey: canonicalKey,
AccountId: route.accountId,
MessageSid: messageId,
Timestamp: Date.now(),
Provider: route.channel,
Surface: route.channel,
ChatType: route.chatType,
CommandAuthorized: false,
ReplyToId: route.replyToId,
OriginatingChannel: route.channel,
OriginatingTo: route.to,
ExplicitDeliverRoute: true,
MessageThreadId: route.threadId,
},
{
forceBodyForCommands: true,
forceChatType: true,
},
),
ctxPayload,
recordInboundSession,
dispatchReplyWithBufferedBlockDispatcher,
deliver: async (payload) => {
const outboundPayload = resolveRestartContinuationOutboundPayload({
payload,
messageId,
replyToId: route.replyToId,
});
const results = await deliverOutboundPayloads({
cfg,
channel: route.channel,
to: route.to,
accountId: route.accountId,
replyToId: route.replyToId,
threadId: route.threadId,
payloads: [outboundPayload],
session: buildOutboundSessionContext({
cfg,
sessionKey: canonicalKey,
delivery: {
preparePayload: (payload) =>
resolveRestartContinuationOutboundPayload({
payload,
messageId,
replyToId: route.replyToId,
}),
deps: params.deps,
bestEffort: false,
});
if (results.length === 0) {
throw new Error("restart continuation delivery returned no results");
}
durable: (_payload, info) =>
info.kind === "final"
? {
to: route.to,
replyToId: route.replyToId,
threadId: route.threadId,
deps: params.deps,
}
: false,
deliver: async (payload) => {
const send = await sendDurableMessageBatch({
cfg,
channel: route.channel,
to: route.to,
accountId: route.accountId,
replyToId: route.replyToId,
threadId: route.threadId,
payloads: [payload],
session: buildOutboundSessionContext({
cfg,
sessionKey: canonicalKey,
}),
deps: params.deps,
bestEffort: false,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
const results = send.status === "sent" ? send.results : [];
if (results.length === 0) {
throw new Error("restart continuation delivery returned no results");
}
},
onError: (err, info) => {
dispatchError ??= err;
log.warn(`restart continuation dispatch failed during ${info.kind}: ${String(err)}`, {
sessionKey: canonicalKey,
});
},
},
onRecordError: (err) => {
log.warn(`restart continuation failed to record inbound session metadata: ${String(err)}`, {
sessionKey: canonicalKey,
});
},
onDispatchError: (err) => {
dispatchError ??= err;
record: {
onRecordError: (err) => {
log.warn(`restart continuation failed to record inbound session metadata: ${String(err)}`, {
sessionKey: canonicalKey,
});
},
},
});
if (dispatchError) {

View File

@@ -30,6 +30,7 @@ vi.mock("../infra/env.js", () => ({
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: hoisted.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: hoisted.deliverOutboundPayloads,
}));
vi.mock("../infra/outbound/delivery-queue.js", () => ({

View File

@@ -159,10 +159,10 @@ function recoverPendingOutboundDeliveries(params: {
}): void {
void (async () => {
const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js");
const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js");
const { deliverOutboundPayloadsInternal } = await import("../infra/outbound/deliver.js");
const logRecovery = params.log.child("delivery-recovery");
await recoverPendingDeliveries({
deliver: deliverOutboundPayloads,
deliver: deliverOutboundPayloadsInternal,
log: logRecovery,
cfg: params.cfg,
});

View File

@@ -1,2 +1,2 @@
export { resolveExecApprovalSessionTarget } from "./exec-approval-session-target.js";
export { deliverOutboundPayloads } from "./outbound/deliver.js";
export { sendDurableMessageBatch } from "../channels/message/runtime.js";

View File

@@ -44,7 +44,8 @@ import {
} from "./plugin-approvals.js";
const log = createSubsystemLogger("gateway/exec-approvals");
type DeliverOutboundPayloads = typeof import("./outbound/deliver.js").deliverOutboundPayloads;
type DeliverApprovalPayloads =
typeof import("../channels/message/runtime.js").sendDurableMessageBatch;
type MaybePromise<T> = T | Promise<T>;
type ResolveSessionTargetFn = (params: {
cfg: OpenClawConfig;
@@ -130,7 +131,7 @@ export type ExecApprovalForwarder = {
type ExecApprovalForwarderDeps = {
getConfig?: () => OpenClawConfig;
deliver?: DeliverOutboundPayloads;
deliver?: DeliverApprovalPayloads;
nowMs?: () => number;
resolveSessionTarget?: ResolveSessionTargetFn;
};
@@ -360,7 +361,7 @@ async function deliverToTargets(params: {
cfg: OpenClawConfig;
targets: ForwardTarget[];
buildPayload: (target: ForwardTarget) => ReplyPayload;
deliver: DeliverOutboundPayloads;
deliver: DeliverApprovalPayloads;
beforeDeliver?: (target: ForwardTarget, payload: ReplyPayload) => Promise<void> | void;
shouldSend?: () => boolean;
}) {
@@ -375,7 +376,7 @@ async function deliverToTargets(params: {
try {
const payload = params.buildPayload(target);
await params.beforeDeliver?.(target, payload);
await params.deliver({
const send = await params.deliver({
cfg: params.cfg,
channel,
to: target.to,
@@ -383,6 +384,9 @@ async function deliverToTargets(params: {
threadId: target.threadId,
payloads: [payload],
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
} catch (err) {
log.error(`exec approvals: failed to deliver to ${channel}:${target.to}: ${String(err)}`);
}
@@ -529,7 +533,7 @@ function createApprovalHandlers<
>(params: {
strategy: ApprovalStrategy<TRequest, TResolved, TRouteRequest>;
getConfig: () => OpenClawConfig;
deliver: DeliverOutboundPayloads;
deliver: DeliverApprovalPayloads;
nowMs: () => number;
resolveSessionTarget: ResolveSessionTargetFn;
}) {
@@ -772,8 +776,8 @@ export function createExecApprovalForwarder(
const deliver =
deps.deliver ??
(async (params) => {
const { deliverOutboundPayloads } = await loadExecApprovalForwarderRuntime();
return deliverOutboundPayloads(params);
const { sendDurableMessageBatch } = await loadExecApprovalForwarderRuntime();
return sendDurableMessageBatch(params);
});
const nowMs = deps.nowMs ?? Date.now;
const resolveSessionTarget = deps.resolveSessionTarget ?? defaultResolveSessionTarget;

View File

@@ -12,7 +12,8 @@ import {
} from "./system-events.js";
vi.mock("./outbound/deliver.js", () => ({
deliverOutboundPayloads: vi.fn().mockResolvedValue(undefined),
deliverOutboundPayloads: vi.fn().mockResolvedValue([]),
deliverOutboundPayloadsInternal: vi.fn().mockResolvedValue([]),
}));
afterEach(() => {

View File

@@ -9,7 +9,8 @@ import {
} from "./heartbeat-runner.test-utils.js";
vi.mock("./outbound/deliver.js", () => ({
deliverOutboundPayloads: vi.fn().mockResolvedValue(undefined),
deliverOutboundPayloads: vi.fn().mockResolvedValue([]),
deliverOutboundPayloadsInternal: vi.fn().mockResolvedValue([]),
}));
type SeedSessionInput = {

View File

@@ -38,6 +38,7 @@ import { resolveDefaultModel } from "../auto-reply/reply/directive-handling.defa
import { resolveResponsePrefixTemplate } from "../auto-reply/reply/response-prefix-template.js";
import { HEARTBEAT_TOKEN } from "../auto-reply/tokens.js";
import type { ReplyPayload } from "../auto-reply/types.js";
import { sendDurableMessageBatch } from "../channels/message/runtime.js";
import { getChannelPlugin } from "../channels/plugins/index.js";
import type {
ChannelHeartbeatDeps,
@@ -129,7 +130,6 @@ import {
setHeartbeatWakeHandler,
} from "./heartbeat-wake.js";
import type { OutboundSendDeps } from "./outbound/deliver.js";
import { deliverOutboundPayloads } from "./outbound/deliver.js";
import { buildOutboundSessionContext } from "./outbound/session-context.js";
import {
resolveHeartbeatDeliveryTarget,
@@ -1594,7 +1594,7 @@ export async function runHeartbeatOnce(opts: {
return false;
}
}
await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg,
channel: delivery.channel,
to: delivery.to,
@@ -1604,6 +1604,9 @@ export async function runHeartbeatOnce(opts: {
session: outboundSession,
deps: opts.deps,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
return true;
};
@@ -1863,7 +1866,7 @@ export async function runHeartbeatOnce(opts: {
}
}
await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg,
channel: delivery.channel,
to: delivery.to,
@@ -1883,6 +1886,9 @@ export async function runHeartbeatOnce(opts: {
],
deps: opts.deps,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
await markCommitmentsStatus({
cfg,
ids: dueCommitmentIds,

View File

@@ -1 +1,3 @@
/** @deprecated Use `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)`. */
export { deliverOutboundPayloads } from "./deliver.js";
export { deliverOutboundPayloadsInternal } from "./deliver.js";

View File

@@ -15,3 +15,62 @@ export type OutboundDeliveryResult = {
// Channel docking: stash channel-specific fields here to avoid core type churn.
meta?: Record<string, unknown>;
};
export type OutboundPayloadDeliverySuppressionReason =
| "cancelled_by_message_sending_hook"
| "empty_after_message_sending_hook"
| "no_visible_payload"
| "adapter_returned_no_identity";
export type OutboundDeliveryFailureStage = "platform_send" | "queue" | "unknown";
export type OutboundPayloadDeliveryOutcome =
| {
index: number;
status: "sent";
results: OutboundDeliveryResult[];
}
| {
index: number;
status: "suppressed";
reason: OutboundPayloadDeliverySuppressionReason;
hookEffect?: {
cancelReason?: string;
metadata?: Record<string, unknown>;
};
}
| {
index: number;
status: "failed";
error: unknown;
sentBeforeError: boolean;
stage: OutboundDeliveryFailureStage;
};
export class OutboundDeliveryError extends Error {
readonly results: OutboundDeliveryResult[];
readonly payloadOutcomes: OutboundPayloadDeliveryOutcome[];
readonly sentBeforeError: boolean;
readonly stage: OutboundDeliveryFailureStage;
constructor(
message: string,
options: {
cause: unknown;
results?: readonly OutboundDeliveryResult[];
payloadOutcomes?: readonly OutboundPayloadDeliveryOutcome[];
stage?: OutboundDeliveryFailureStage;
},
) {
super(message, { cause: options.cause });
this.name = "OutboundDeliveryError";
this.results = [...(options.results ?? [])];
this.payloadOutcomes = [...(options.payloadOutcomes ?? [])];
this.sentBeforeError = this.results.length > 0;
this.stage = options.stage ?? "unknown";
}
}
export function isOutboundDeliveryError(error: unknown): error is OutboundDeliveryError {
return error instanceof OutboundDeliveryError;
}

View File

@@ -1447,6 +1447,38 @@ describe("deliverOutboundPayloads", () => {
expect(sendText).not.toHaveBeenCalled();
});
it("keeps payload outcome indexes tied to original input payload positions", async () => {
const sendMatrix = vi.fn().mockResolvedValue({
messageId: "visible",
roomId: "!room:example",
});
const payloadOutcomes: unknown[] = [];
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room:example",
payloads: [{ text: "NO_REPLY" }, { text: "visible reply" }],
deps: { matrix: sendMatrix },
onPayloadDeliveryOutcome: (outcome) => {
payloadOutcomes.push(outcome);
},
});
expect(results).toEqual([
expect.objectContaining({
channel: "matrix",
messageId: "visible",
}),
]);
expect(payloadOutcomes).toEqual([
expect.objectContaining({
index: 1,
status: "sent",
}),
]);
});
it("strips internal runtime scaffolding added by message_sending hooks before delivery", async () => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",

View File

@@ -42,7 +42,13 @@ import { emitDiagnosticEvent, type DiagnosticMessageDeliveryKind } from "../diag
import { formatErrorMessage } from "../errors.js";
import { throwIfAborted } from "./abort.js";
import { resolveOutboundChannelMessageAdapter } from "./channel-resolution.js";
import type { OutboundDeliveryResult } from "./deliver-types.js";
import {
OutboundDeliveryError,
type OutboundDeliveryFailureStage,
type OutboundDeliveryResult,
type OutboundPayloadDeliveryOutcome,
type OutboundPayloadDeliverySuppressionReason,
} from "./deliver-types.js";
import {
attachOutboundDeliveryCommitHook,
runOutboundDeliveryCommitHooks,
@@ -67,7 +73,6 @@ import {
import type { DeliveryMirror } from "./mirror.js";
import {
createOutboundPayloadPlan,
projectOutboundPayloadPlanForDelivery,
summarizeOutboundPayloadForTransport,
type NormalizedOutboundPayload,
type OutboundPayloadPlan,
@@ -562,6 +567,11 @@ function createChannelOutboundContextBase(
const isAbortError = (err: unknown): boolean => err instanceof Error && err.name === "AbortError";
const isDeliveryAbortError = (err: unknown): boolean =>
isAbortError(err) ||
(err instanceof OutboundDeliveryError &&
isAbortError((err as Error & { cause?: unknown }).cause));
async function markQueuedPlatformSendAttemptStarted(params: {
queueId: string;
queuePolicy: OutboundDeliveryQueuePolicy;
@@ -615,6 +625,7 @@ type DeliverOutboundPayloadsCoreParams = {
bestEffort?: boolean;
onError?: (err: unknown, payload: NormalizedOutboundPayload) => void;
onPayload?: (payload: NormalizedOutboundPayload) => void;
onPayloadDeliveryOutcome?: (outcome: OutboundPayloadDeliveryOutcome) => void;
/** Session/agent context used for hooks and media local-root scoping. */
session?: OutboundSessionContext;
mirror?: DeliveryMirror;
@@ -630,6 +641,13 @@ function collectPayloadMediaSources(plan: readonly OutboundPayloadPlan[]): strin
return plan.flatMap((entry) => entry.parts.mediaUrls);
}
/**
* @deprecated Direct outbound delivery is compatibility/runtime substrate.
* New message lifecycle code should use `sendDurableMessageBatch` from
* `src/channels/message/send.ts` or `deliverInboundReplyWithMessageSendContext`
* from `src/channels/turn/durable-delivery.ts`. Keep direct use only for
* outbound substrate, recovery, and compatibility paths.
*/
export type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & {
/** @internal Skip write-ahead queue (used by crash-recovery to avoid re-enqueueing). */
skipQueue?: boolean;
@@ -730,13 +748,18 @@ function normalizeEmptyPayloadForDelivery(payload: ReplyPayload): ReplyPayload |
return payload;
}
type NormalizedPayloadForChannelDelivery = {
index: number;
payload: ReplyPayload;
};
function normalizePayloadsForChannelDelivery(
plan: readonly OutboundPayloadPlan[],
handler: ChannelHandler,
): ReplyPayload[] {
const normalizedPayloads: ReplyPayload[] = [];
for (const payload of projectOutboundPayloadPlanForDelivery(plan)) {
let sanitizedPayload = stripInternalRuntimeScaffoldingFromPayload(payload);
): NormalizedPayloadForChannelDelivery[] {
const normalizedPayloads: NormalizedPayloadForChannelDelivery[] = [];
for (const entry of plan) {
let sanitizedPayload = stripInternalRuntimeScaffoldingFromPayload(entry.payload);
if (handler.sanitizeText && sanitizedPayload.text) {
if (!handler.shouldSkipPlainTextSanitization?.(sanitizedPayload)) {
sanitizedPayload = {
@@ -754,7 +777,7 @@ function normalizePayloadsForChannelDelivery(
)
: null;
if (normalized) {
normalizedPayloads.push(normalized);
normalizedPayloads.push({ index: entry.sourceIndex, payload: normalized });
}
}
return normalizedPayloads;
@@ -1002,12 +1025,16 @@ async function applyMessageSendingHook(params: {
threadId?: string | number | null;
}): Promise<{
cancelled: boolean;
cancelReason?: string;
hookMetadata?: Record<string, unknown>;
contentRewritten: boolean;
payload: ReplyPayload;
payloadSummary: NormalizedOutboundPayload;
}> {
if (!params.enabled) {
return {
cancelled: false,
contentRewritten: false,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
@@ -1034,6 +1061,9 @@ async function applyMessageSendingHook(params: {
if (sendingResult?.cancel) {
return {
cancelled: true,
...(sendingResult.cancelReason ? { cancelReason: sendingResult.cancelReason } : {}),
...(sendingResult.metadata ? { hookMetadata: sendingResult.metadata } : {}),
contentRewritten: false,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
@@ -1041,6 +1071,7 @@ async function applyMessageSendingHook(params: {
if (sendingResult?.content == null) {
return {
cancelled: false,
contentRewritten: false,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
@@ -1049,6 +1080,7 @@ async function applyMessageSendingHook(params: {
const spokenText = sendingResult.content;
return {
cancelled: false,
contentRewritten: true,
payload: {
...params.payload,
spokenText,
@@ -1065,6 +1097,7 @@ async function applyMessageSendingHook(params: {
};
return {
cancelled: false,
contentRewritten: true,
payload,
payloadSummary: {
...params.payloadSummary,
@@ -1075,14 +1108,61 @@ async function applyMessageSendingHook(params: {
// Don't block delivery on hook failure.
return {
cancelled: false,
contentRewritten: false,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
}
}
function toOutboundDeliveryError(params: {
error: unknown;
results: readonly OutboundDeliveryResult[];
payloadOutcomes: readonly OutboundPayloadDeliveryOutcome[];
stage: OutboundDeliveryFailureStage;
}): OutboundDeliveryError {
if (params.error instanceof OutboundDeliveryError) {
return params.error;
}
return new OutboundDeliveryError(formatErrorMessage(params.error), {
cause: params.error,
results: params.results,
payloadOutcomes: params.payloadOutcomes,
stage: params.stage,
});
}
function suppressedPayloadOutcome(params: {
index: number;
reason: OutboundPayloadDeliverySuppressionReason;
hookEffect?: {
cancelReason?: string;
metadata?: Record<string, unknown>;
};
}): OutboundPayloadDeliveryOutcome {
return {
index: params.index,
status: "suppressed",
reason: params.reason,
...(params.hookEffect ? { hookEffect: params.hookEffect } : {}),
};
}
/**
* @deprecated Direct outbound delivery is compatibility/runtime substrate.
* New message lifecycle code should use `sendDurableMessageBatch` from
* `src/channels/message/send.ts` or `deliverInboundReplyWithMessageSendContext`
* from `src/channels/turn/durable-delivery.ts`. Keep direct use only for
* outbound substrate, recovery, and compatibility paths.
*/
export async function deliverOutboundPayloads(
params: DeliverOutboundPayloadsParams,
): Promise<OutboundDeliveryResult[]> {
return await deliverOutboundPayloadsInternal(params);
}
export async function deliverOutboundPayloadsInternal(
params: DeliverOutboundPayloadsParams,
): Promise<OutboundDeliveryResult[]> {
const { channel, to, payloads } = params;
const queuePolicy = params.queuePolicy ?? "best_effort";
@@ -1220,7 +1300,7 @@ async function deliverOutboundPayloadsWithQueueCleanup(
return results;
} catch (err) {
if (queueId) {
if (isAbortError(err)) {
if (isDeliveryAbortError(err)) {
await ackDelivery(queueId).catch(() => {});
} else if (!platformResultsReturned) {
await failDelivery(queueId, formatErrorMessage(err)).catch(() => {});
@@ -1323,6 +1403,16 @@ async function deliverOutboundPayloadsCore(
}
};
const normalizedPayloads = normalizePayloadsForChannelDelivery(outboundPayloadPlan, handler);
const payloadOutcomes: OutboundPayloadDeliveryOutcome[] = [];
const recordPayloadOutcome = (outcome: OutboundPayloadDeliveryOutcome): void => {
payloadOutcomes.push(outcome);
params.onPayloadDeliveryOutcome?.(outcome);
};
if (normalizedPayloads.length === 0 && payloads.length > 0) {
payloads.forEach((_payload, index) => {
recordPayloadOutcome(suppressedPayloadOutcome({ index, reason: "no_visible_payload" }));
});
}
const hookRunner = getGlobalHookRunner();
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
const mirrorIsGroup = params.mirror?.isGroup;
@@ -1348,7 +1438,7 @@ async function deliverOutboundPayloadsCore(
},
);
}
for (const payload of normalizedPayloads) {
for (const { index: payloadIndex, payload } of normalizedPayloads) {
let payloadSummary = buildPayloadSummary(payload);
let deliveryKind: DiagnosticMessageDeliveryKind = "other";
let deliveryStartedAt = 0;
@@ -1407,6 +1497,20 @@ async function deliverOutboundPayloadsCore(
threadId: params.threadId,
});
if (hookResult.cancelled) {
const hookEffect =
hookResult.cancelReason || hookResult.hookMetadata
? {
...(hookResult.cancelReason ? { cancelReason: hookResult.cancelReason } : {}),
...(hookResult.hookMetadata ? { metadata: hookResult.hookMetadata } : {}),
}
: undefined;
recordPayloadOutcome(
suppressedPayloadOutcome({
index: payloadIndex,
reason: "cancelled_by_message_sending_hook",
...(hookEffect ? { hookEffect } : {}),
}),
);
continue;
}
const renderedPayload = stripInternalRuntimeScaffoldingFromPayload(
@@ -1421,6 +1525,14 @@ async function deliverOutboundPayloadsCore(
)
: null;
if (!effectivePayload) {
recordPayloadOutcome(
suppressedPayloadOutcome({
index: payloadIndex,
reason: hookResult.contentRewritten
? "empty_after_message_sending_hook"
: "no_visible_payload",
}),
);
continue;
}
payloadSummary = buildPayloadSummary(effectivePayload);
@@ -1458,9 +1570,16 @@ async function deliverOutboundPayloadsCore(
);
if (!hasDeliveryResultIdentity(delivery)) {
completeDeliveryDiagnostics(0);
recordPayloadOutcome(
suppressedPayloadOutcome({
index: payloadIndex,
reason: "adapter_returned_no_identity",
}),
);
continue;
}
results.push(delivery);
recordPayloadOutcome({ index: payloadIndex, status: "sent", results: [delivery] });
await maybePinDeliveredMessage({
handler,
payload: effectivePayload,
@@ -1494,6 +1613,20 @@ async function deliverOutboundPayloadsCore(
await sendTextChunks(payloadSummary.text, sendOverrides);
}
const deliveredResults = results.slice(beforeCount);
if (deliveredResults.length > 0) {
recordPayloadOutcome({
index: payloadIndex,
status: "sent",
results: deliveredResults,
});
} else {
recordPayloadOutcome(
suppressedPayloadOutcome({
index: payloadIndex,
reason: "adapter_returned_no_identity",
}),
);
}
const messageId = results.at(-1)?.messageId;
const pinMessageId = deliveredResults.find((entry) => entry.messageId)?.messageId;
await maybePinDeliveredMessage({
@@ -1535,6 +1668,20 @@ async function deliverOutboundPayloadsCore(
const beforeCount = results.length;
await sendTextChunks(fallbackText, sendOverrides);
const deliveredResults = results.slice(beforeCount);
if (deliveredResults.length > 0) {
recordPayloadOutcome({
index: payloadIndex,
status: "sent",
results: deliveredResults,
});
} else {
recordPayloadOutcome(
suppressedPayloadOutcome({
index: payloadIndex,
reason: "adapter_returned_no_identity",
}),
);
}
const messageId = results.at(-1)?.messageId;
const pinMessageId = deliveredResults.find((entry) => entry.messageId)?.messageId;
await maybePinDeliveredMessage({
@@ -1591,6 +1738,21 @@ async function deliverOutboundPayloadsCore(
target: deliveryTarget,
results: results.slice(beforeCount),
});
const deliveredResults = results.slice(beforeCount);
if (deliveredResults.length > 0) {
recordPayloadOutcome({
index: payloadIndex,
status: "sent",
results: deliveredResults,
});
} else {
recordPayloadOutcome(
suppressedPayloadOutcome({
index: payloadIndex,
reason: "adapter_returned_no_identity",
}),
);
}
completeDeliveryDiagnostics(results.length - beforeCount);
emitMessageSent({
success: true,
@@ -1598,6 +1760,13 @@ async function deliverOutboundPayloadsCore(
messageId: lastMessageId,
});
} catch (err) {
recordPayloadOutcome({
index: payloadIndex,
status: "failed",
error: err,
sentBeforeError: results.length > 0,
stage: "platform_send",
});
errorDeliveryDiagnostics(err);
emitMessageSent({
success: false,
@@ -1605,7 +1774,12 @@ async function deliverOutboundPayloadsCore(
error: formatErrorMessage(err),
});
if (!params.bestEffort) {
throw err;
throw toOutboundDeliveryError({
error: err,
results,
payloadOutcomes,
stage: "platform_send",
});
}
params.onError?.(err, payloadSummary);
}

View File

@@ -44,6 +44,7 @@ vi.mock("./targets.js", () => ({
vi.mock("./deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
resolveOutboundDurableFinalDeliverySupport: mocks.resolveOutboundDurableFinalDeliverySupport,
}));
@@ -418,4 +419,49 @@ describe("sendMessage", () => {
expect(mocks.resolveRuntimePluginRegistry).not.toHaveBeenCalled();
});
it("does not throw best-effort direct send failures", async () => {
mocks.deliverOutboundPayloads.mockImplementationOnce(async (params: unknown) => {
(
params as {
onPayloadDeliveryOutcome?: (outcome: {
index: number;
payload: { text: string };
status: "failed";
error: Error;
stage: "send";
}) => void;
}
).onPayloadDeliveryOutcome?.({
index: 0,
payload: { text: "hi" },
status: "failed",
error: new Error("transport unavailable"),
stage: "send",
});
return [];
});
await expect(
sendMessage({
cfg: {},
channel: "forum",
to: "123456",
content: "hi",
bestEffort: true,
}),
).resolves.toMatchObject({
channel: "forum",
to: "123456",
via: "direct",
result: undefined,
});
expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
bestEffort: true,
queuePolicy: "best_effort",
}),
);
});
});

View File

@@ -1,5 +1,6 @@
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import { deriveDurableFinalDeliveryRequirements } from "../../channels/message/capabilities.js";
import { sendDurableMessageBatch } from "../../channels/message/runtime.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { OutboundMediaAccess } from "../../media/load-options.js";
import type { PollInput } from "../../polls.js";
@@ -13,7 +14,6 @@ import {
import { resolveOutboundChannelPlugin } from "./channel-resolution.js";
import { resolveMessageChannelSelection } from "./channel-selection.js";
import {
deliverOutboundPayloads,
resolveOutboundDurableFinalDeliverySupport,
type DurableFinalDeliveryRequirements,
type OutboundDeliveryResult,
@@ -379,7 +379,7 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
silent: params.silent,
});
}
const results = await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg,
channel: outboundChannel,
to: resolvedTarget.to,
@@ -392,8 +392,9 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
forceDocument: params.forceDocument,
deps: params.deps,
bestEffort: params.bestEffort,
queuePolicy: params.queuePolicy,
abortSignal: params.abortSignal,
durability:
params.bestEffort || params.queuePolicy === "best_effort" ? "best_effort" : "required",
signal: params.abortSignal,
silent: params.silent,
mediaAccess: params.mediaAccess,
mirror: params.mirror
@@ -405,6 +406,10 @@ export async function sendMessage(params: MessageSendParams): Promise<MessageSen
}
: undefined,
});
if (!params.bestEffort && (send.status === "failed" || send.status === "partial_failed")) {
throw send.error;
}
const results = send.status === "sent" || send.status === "partial_failed" ? send.results : [];
return {
channel,

View File

@@ -47,6 +47,7 @@ export type OutboundPayloadJson = {
};
export type OutboundPayloadPlan = {
sourceIndex: number;
payload: ReplyPayload;
parts: ReturnType<typeof resolveSendableOutboundReplyParts>;
hasPresentation: boolean;
@@ -130,6 +131,10 @@ type PreparedOutboundPayloadPlanEntry = {
isSilent: boolean;
};
type IndexedPreparedOutboundPayloadPlanEntry = PreparedOutboundPayloadPlanEntry & {
sourceIndex: number;
};
function createOutboundPayloadPlanEntry(
payload: ReplyPayload,
context: Pick<OutboundPayloadPlanContext, "extractMarkdownImages"> = {},
@@ -195,15 +200,15 @@ export function createOutboundPayloadPlan(
});
const hasPendingSpawnedChildren =
context.hasPendingSpawnedChildren ?? resolvePendingSpawnedChildren(context.sessionKey);
const prepared: PreparedOutboundPayloadPlanEntry[] = [];
for (const payload of payloads) {
const prepared: IndexedPreparedOutboundPayloadPlanEntry[] = [];
for (const [sourceIndex, payload] of payloads.entries()) {
const entry = createOutboundPayloadPlanEntry(payload, {
extractMarkdownImages: context.extractMarkdownImages,
});
if (!entry) {
continue;
}
prepared.push(entry);
prepared.push({ ...entry, sourceIndex });
}
const hasVisibleNonSilentContent = prepared.some((entry) => {
if (entry.isSilent) {
@@ -219,6 +224,7 @@ export function createOutboundPayloadPlan(
for (const entry of prepared) {
if (!entry.isSilent) {
plan.push({
sourceIndex: entry.sourceIndex,
payload: entry.payload,
parts: resolveSendableOutboundReplyParts(entry.payload),
hasPresentation: entry.hasPresentation,
@@ -243,6 +249,7 @@ export function createOutboundPayloadPlan(
continue;
}
plan.push({
sourceIndex: entry.sourceIndex,
payload: visibleSilentPayload,
parts: resolveSendableOutboundReplyParts(visibleSilentPayload),
hasPresentation: entry.hasPresentation,
@@ -261,6 +268,7 @@ export function createOutboundPayloadPlan(
continue;
}
plan.push({
sourceIndex: entry.sourceIndex,
payload: visibleSilentPayload,
parts: resolveSendableOutboundReplyParts(visibleSilentPayload),
hasPresentation: entry.hasPresentation,

View File

@@ -15,6 +15,11 @@ const mocks = vi.hoisted(() => ({
enqueueSystemEvent: vi.fn(),
}));
vi.mock("./outbound/deliver.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
type SessionMaintenanceWarningModule = typeof import("./session-maintenance-warning.js");
let deliverSessionMaintenanceWarning: SessionMaintenanceWarningModule["deliverSessionMaintenanceWarning"];

View File

@@ -16,11 +16,11 @@ type WarningParams = {
const warnedContexts = new Map<string, string>();
const log = createSubsystemLogger("session-maintenance-warning");
let deliverRuntimePromise: Promise<typeof import("./outbound/deliver-runtime.js")> | null = null;
let messageRuntimePromise: Promise<typeof import("../channels/message/runtime.js")> | null = null;
function resetSessionMaintenanceWarningForTests() {
warnedContexts.clear();
deliverRuntimePromise = null;
messageRuntimePromise = null;
}
export const __testing = {
@@ -28,8 +28,8 @@ export const __testing = {
} as const;
function loadDeliverRuntime() {
deliverRuntimePromise ??= import("./outbound/deliver-runtime.js");
return deliverRuntimePromise;
messageRuntimePromise ??= import("../channels/message/runtime.js");
return messageRuntimePromise;
}
function shouldSendWarning(): boolean {
@@ -126,12 +126,12 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P
}
try {
const { deliverOutboundPayloads } = await loadDeliverRuntime();
const { sendDurableMessageBatch } = await loadDeliverRuntime();
const outboundSession = buildOutboundSessionContext({
cfg: params.cfg,
sessionKey: params.sessionKey,
});
await deliverOutboundPayloads({
const send = await sendDurableMessageBatch({
cfg: params.cfg,
channel,
to: target.to,
@@ -140,6 +140,9 @@ export async function deliverSessionMaintenanceWarning(params: WarningParams): P
payloads: [{ text }],
session: outboundSession,
});
if (send.status === "failed" || send.status === "partial_failed") {
throw send.error;
}
} catch (err) {
log.warn(`Failed to deliver session maintenance warning: ${String(err)}`);
enqueueSystemEvent(text, { sessionKey: params.sessionKey });

View File

@@ -6,6 +6,12 @@ const mockDeliverOutboundPayloads = vi.hoisted(() => vi.fn());
vi.mock("../infra/outbound/deliver-runtime.js", () => ({
deliverOutboundPayloads: (...args: unknown[]) => mockDeliverOutboundPayloads(...args),
deliverOutboundPayloadsInternal: (...args: unknown[]) => mockDeliverOutboundPayloads(...args),
}));
vi.mock("../infra/outbound/deliver.js", () => ({
deliverOutboundPayloads: (...args: unknown[]) => mockDeliverOutboundPayloads(...args),
deliverOutboundPayloadsInternal: (...args: unknown[]) => mockDeliverOutboundPayloads(...args),
}));
vi.mock("../utils/message-channel.js", () => ({
@@ -38,15 +44,18 @@ describe("sendTranscriptEcho", () => {
});
expect(mockDeliverOutboundPayloads).toHaveBeenCalledOnce();
expect(mockDeliverOutboundPayloads).toHaveBeenCalledWith({
cfg: {},
channel: "voicechat",
to: "+10000000001",
accountId: "acc1",
threadId: undefined,
payloads: [{ text: DEFAULT_ECHO_TRANSCRIPT_FORMAT.replace("{transcript}", "hello world") }],
bestEffort: true,
});
expect(mockDeliverOutboundPayloads).toHaveBeenCalledWith(
expect.objectContaining({
cfg: {},
channel: "voicechat",
to: "+10000000001",
accountId: "acc1",
threadId: undefined,
payloads: [{ text: DEFAULT_ECHO_TRANSCRIPT_FORMAT.replace("{transcript}", "hello world") }],
bestEffort: true,
queuePolicy: "best_effort",
}),
);
});
it("uses a custom format when provided", async () => {

View File

@@ -4,12 +4,11 @@ import { logVerbose, shouldLogVerbose } from "../globals.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { isDeliverableMessageChannel } from "../utils/message-channel.js";
let deliverRuntimePromise: Promise<typeof import("../infra/outbound/deliver-runtime.js")> | null =
null;
let messageRuntimePromise: Promise<typeof import("../channels/message/runtime.js")> | null = null;
function loadDeliverRuntime() {
deliverRuntimePromise ??= import("../infra/outbound/deliver-runtime.js");
return deliverRuntimePromise;
function loadMessageRuntime() {
messageRuntimePromise ??= import("../channels/message/runtime.js");
return messageRuntimePromise;
}
export const DEFAULT_ECHO_TRANSCRIPT_FORMAT = '📝 "{transcript}"';
@@ -52,8 +51,8 @@ export async function sendTranscriptEcho(params: {
const text = formatEchoTranscript(transcript, params.format ?? DEFAULT_ECHO_TRANSCRIPT_FORMAT);
try {
const { deliverOutboundPayloads } = await loadDeliverRuntime();
await deliverOutboundPayloads({
const { sendDurableMessageBatch } = await loadMessageRuntime();
const send = await sendDurableMessageBatch({
cfg,
channel: normalizedChannel,
to,
@@ -61,7 +60,11 @@ export async function sendTranscriptEcho(params: {
threadId: ctx.MessageThreadId ?? undefined,
payloads: [{ text }],
bestEffort: true,
durability: "best_effort",
});
if (send.status === "failed") {
throw send.error;
}
if (shouldLogVerbose()) {
logVerbose(`media: echo-transcript sent to ${normalizedChannel}/${to}`);
}

View File

@@ -1,16 +1,20 @@
export {
buildChannelMessageReplyDispatchBase,
dispatchChannelMessageReplyWithBase,
hasFinalChannelMessageReplyDispatch,
hasVisibleChannelMessageReplyDispatch,
recordChannelMessageReplyDispatch,
resolveChannelMessageReplyDispatchCounts,
} from "./inbound-reply-dispatch.js";
export {
createChannelTurnReplyPipeline,
deliverDurableInboundReplyPayload,
deliverInboundReplyWithMessageSendContext,
} from "../channels/turn/kernel.js";
/** @deprecated Compatibility helper for legacy reply dispatch bridges. */
export { buildChannelMessageReplyDispatchBase } from "./inbound-reply-dispatch.js";
/** @deprecated Compatibility reply-dispatch bridge. Use `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)`. */
export { dispatchChannelMessageReplyWithBase } from "./inbound-reply-dispatch.js";
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export { hasFinalChannelMessageReplyDispatch } from "./inbound-reply-dispatch.js";
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export { hasVisibleChannelMessageReplyDispatch } from "./inbound-reply-dispatch.js";
/** @deprecated Compatibility reply-dispatch bridge. Use `sendDurableMessageBatch(...)` or `deliverInboundReplyWithMessageSendContext(...)`. */
export { recordChannelMessageReplyDispatch } from "./inbound-reply-dispatch.js";
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export { resolveChannelMessageReplyDispatchCounts } from "./inbound-reply-dispatch.js";
/** @deprecated Compatibility assembly for legacy buffered reply dispatchers. */
export { createChannelTurnReplyPipeline } from "../channels/turn/kernel.js";
/** @deprecated Use `deliverInboundReplyWithMessageSendContext(...)`. */
export { deliverDurableInboundReplyPayload } from "../channels/turn/kernel.js";
export { deliverInboundReplyWithMessageSendContext } from "../channels/turn/kernel.js";
export type {
DurableInboundReplyDeliveryOptions,
DurableInboundReplyDeliveryParams,

View File

@@ -138,17 +138,28 @@ export type {
RenderedMessageBatchPlanKind,
} from "../channels/message/index.js";
export {
hasFinalChannelTurnDispatch,
hasVisibleChannelTurnDispatch,
resolveChannelTurnDispatchCounts,
};
type ChannelTurnKernelModule = typeof import("../channels/turn/kernel.js");
type InboundReplyDispatchModule = typeof import("./inbound-reply-dispatch.js");
/** @deprecated Use `createChannelMessageReplyPipeline(...)` for compatibility dispatchers. */
export function createChannelTurnReplyPipeline(params: CreateChannelReplyPipelineParams) {
return createChannelReplyPipeline(params);
}
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts;
/** @deprecated Compatibility helper for legacy reply dispatch bridges. */
export const buildChannelMessageReplyDispatchBase: InboundReplyDispatchModule["buildChannelMessageReplyDispatchBase"] =
((params) => ({
cfg: params.cfg,
@@ -163,12 +174,24 @@ export const buildChannelMessageReplyDispatchBase: InboundReplyDispatchModule["b
params.core.channel.reply.dispatchReplyWithBufferedBlockDispatcher,
})) as InboundReplyDispatchModule["buildChannelMessageReplyDispatchBase"];
/**
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter and route sends through
* `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export const dispatchChannelMessageReplyWithBase: InboundReplyDispatchModule["dispatchChannelMessageReplyWithBase"] =
async (...args) => {
const mod = await import("./inbound-reply-dispatch.js");
return await mod.dispatchChannelMessageReplyWithBase(...args);
};
/**
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter and route sends through
* `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export const recordChannelMessageReplyDispatch: InboundReplyDispatchModule["recordChannelMessageReplyDispatch"] =
async (...args) => {
const mod = await import("./inbound-reply-dispatch.js");

View File

@@ -12,13 +12,14 @@ export {
createEmptyPluginRegistry,
createOutboundTestPlugin,
createTestRegistry,
deliverOutboundPayloads,
initializeGlobalHookRunner,
releasePinnedPluginChannelRegistry,
resetGlobalHookRunner,
setActivePluginRegistry,
type PluginHookRegistration,
} from "./test-helpers/outbound-delivery.js";
/** @deprecated Direct outbound delivery is runtime substrate; use channel message runtime helpers. */
export { deliverOutboundPayloads } from "./test-helpers/outbound-delivery.js";
export { createPluginRuntimeMock } from "./test-helpers/plugin-runtime-mock.js";
export {
createSendCfgThreadingRuntime,

View File

@@ -11,6 +11,7 @@ vi.mock("../infra/outbound/delivery-queue.js", () => ({
vi.mock("../infra/outbound/deliver-runtime.js", () => ({
deliverOutboundPayloads: mocks.deliverOutboundPayloads,
deliverOutboundPayloadsInternal: mocks.deliverOutboundPayloads,
}));
type DeliveryQueueRuntimeModule = typeof import("./delivery-queue-runtime.js");

View File

@@ -19,7 +19,8 @@ async function loadOutboundDeliverRuntime(): Promise<OutboundDeliverRuntimeModul
}
export async function drainPendingDeliveries(opts: DrainPendingDeliveriesOptions): Promise<void> {
const deliver = opts.deliver ?? (await loadOutboundDeliverRuntime()).deliverOutboundPayloads;
const deliver =
opts.deliver ?? (await loadOutboundDeliverRuntime()).deliverOutboundPayloadsInternal;
await coreDrainPendingDeliveries({
...opts,
deliver,

View File

@@ -138,6 +138,11 @@ type RecordChannelMessageReplyDispatchParams = {
/**
* Resolve the shared dispatch base and immediately record + dispatch one inbound reply turn.
*
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export async function dispatchChannelMessageReplyWithBase(
params: BuildInboundReplyDispatchBaseParams &
@@ -171,7 +176,14 @@ export async function dispatchInboundReplyWithBase(
await dispatchChannelMessageReplyWithBase(params);
}
/** Record the inbound session first, then dispatch the reply using normalized outbound delivery. */
/**
* Record the inbound session first, then dispatch the reply using normalized outbound delivery.
*
* @deprecated Compatibility reply-dispatch bridge. New channel plugins should
* expose a `message` adapter via `defineChannelMessageAdapter(...)` and route
* sends through `deliverInboundReplyWithMessageSendContext(...)` or
* `sendDurableMessageBatch(...)`.
*/
export async function recordChannelMessageReplyDispatch(
params: RecordChannelMessageReplyDispatchParams,
): Promise<void> {
@@ -246,7 +258,11 @@ export async function recordInboundSessionAndDispatchReply(
await recordChannelMessageReplyDispatch(params);
}
/** @deprecated Compatibility helper for legacy reply dispatch bridges. */
export const buildChannelMessageReplyDispatchBase = buildInboundReplyDispatchBase;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasFinalChannelMessageReplyDispatch = hasFinalChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const hasVisibleChannelMessageReplyDispatch = hasVisibleChannelTurnDispatch;
/** @deprecated Compatibility helper for legacy reply dispatch results. */
export const resolveChannelMessageReplyDispatchCounts = resolveChannelTurnDispatchCounts;

View File

@@ -3,11 +3,21 @@ export { resolveOutboundSendDep, type OutboundSendDeps } from "../infra/outbound
export { resolveAgentOutboundIdentity, type OutboundIdentity } from "../infra/outbound/identity.js";
export type { OutboundDeliveryFormattingOptions } from "../infra/outbound/formatting.js";
export { createReplyToFanout, type ReplyToResolution } from "../infra/outbound/reply-policy.js";
export {
deliverOutboundPayloads,
type DeliverOutboundPayloadsParams,
type OutboundDeliveryResult,
} from "../infra/outbound/deliver.js";
/**
* @deprecated Direct outbound delivery is compatibility/runtime substrate. New
* channel and plugin send paths should use
* `openclaw/plugin-sdk/channel-message-runtime` helpers:
* `sendDurableMessageBatch`, `withDurableMessageSendContext`, or
* `deliverInboundReplyWithMessageSendContext`.
*/
export { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
/**
* @deprecated Direct outbound delivery params are compatibility/runtime
* substrate. New channel and plugin send paths should use
* `openclaw/plugin-sdk/channel-message-runtime` helpers.
*/
export type { DeliverOutboundPayloadsParams } from "../infra/outbound/deliver.js";
export { type OutboundDeliveryResult } from "../infra/outbound/deliver.js";
export { sanitizeForPlainText } from "../infra/outbound/sanitize-text.js";
export {
buildOutboundSessionContext,

View File

@@ -3,10 +3,11 @@ export {
createEmptyPluginRegistry,
createOutboundTestPlugin,
createTestRegistry,
deliverOutboundPayloads,
initializeGlobalHookRunner,
releasePinnedPluginChannelRegistry,
resetGlobalHookRunner,
setActivePluginRegistry,
type PluginHookRegistration,
} from "../testing.js";
/** @deprecated Direct outbound delivery is runtime substrate; use channel message runtime helpers. */
export { deliverOutboundPayloads } from "../testing.js";

View File

@@ -35,6 +35,7 @@ export type { ChannelGatewayContext } from "../channels/plugins/types.adapters.j
export type { OpenClawConfig } from "../config/config.js";
export { isAtLeast, parseSemver } from "../infra/runtime-guard.js";
export { callGateway } from "../gateway/call.js";
/** @deprecated Direct outbound delivery is runtime substrate; use channel message runtime helpers. */
export { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
export {
createEmptyPluginRegistry,

View File

@@ -758,7 +758,7 @@ export const PLUGIN_COMPAT_RECORDS = [
surfaces: ["api.runtime.config.loadConfig", "api.runtime.config.writeConfigFile"],
diagnostics: [
"plugin runtime compatibility warning",
"deprecated internal config API guard",
"deprecated API usage guard",
"runtime channel config boundary guard",
],
tests: [

View File

@@ -77,6 +77,8 @@ export type PluginHookMessageSendingEvent = {
export type PluginHookMessageSendingResult = {
content?: string;
cancel?: boolean;
cancelReason?: string;
metadata?: Record<string, unknown>;
};
export type PluginHookMessageSentEvent = {

View File

@@ -1045,6 +1045,8 @@ export function createHookRunner(
return {
content: lastDefined(acc?.content, next.content),
cancel: stickyTrue(acc?.cancel, next.cancel),
cancelReason: lastDefined(acc?.cancelReason, next.cancelReason),
metadata: next.metadata ?? acc?.metadata,
};
},
shouldStop: (result) => result.cancel === true,

View File

@@ -47,8 +47,8 @@ describe("message_sending hook runner", () => {
{
name: "runMessageSending can cancel message delivery",
event: { to: "user-123", content: "blocked" },
hookResult: { cancel: true },
expected: { cancel: true },
hookResult: { cancel: true, cancelReason: "policy", metadata: { owner: "agent-2" } },
expected: { cancel: true, cancelReason: "policy", metadata: { owner: "agent-2" } },
},
] as const)("$name", async ({ event, hookResult, expected }) => {
await expectMessageHookCall({