fix(discord): preserve outbound reply threading

This commit is contained in:
Peter Steinberger
2026-04-25 02:54:29 +01:00
parent 6d271762ab
commit 5e640b93da
30 changed files with 1910 additions and 1625 deletions

View File

@@ -98,6 +98,7 @@ Docs: https://docs.openclaw.ai
- Providers/Google: map `/think adaptive` to Gemini dynamic thinking instead of a fixed medium/high budget, using Gemini 3's provider default and Gemini 2.5's `thinkingBudget: -1`. Fixes #71316. Thanks @steipete.
- Providers/MiniMax: keep M2.7 chat model metadata text-only so image tool requests route through `MiniMax-VL-01` instead of the Anthropic-compatible chat endpoint. Fixes #71296. Thanks @ilker-cevikkaya.
- Discord/replies: run `message_sending` plugin hooks for Discord reply delivery, including DM targets, so plugins can transform or cancel outbound Discord replies consistently with other channels. Fixes #59350. (#71094) Thanks @wei840222.
- Discord/replies: preserve single-use native reply semantics across shared payload fallback, component, voice, and queued delivery paths, so explicit reply tags no longer consume implicit reply slots and chunked fallback sends reply only once. Thanks @steipete.
- Control UI/commands: carry provider-owned thinking option ids/labels in session rows and defaults so fresh sessions show and accept dynamic modes such as `adaptive`, `xhigh`, and `max`. Fixes #71269. Thanks @Young-Khalil.
- Image generation: make explicit `model=` overrides exact-only so failed `openai/gpt-image-2` requests no longer fall through to Gemini or other configured providers, and update `image_generate list` to mention OpenAI Codex OAuth as valid auth for `openai/gpt-image-2`. Fixes #71290 and #71231. Thanks @Young-Khalil and @steipete.
- Providers/GitHub Copilot: keep the plugin stream wrapper from claiming transport selection before OpenClaw picks a boundary-aware stream path, avoiding Pi's stale fallback Copilot headers on normal model turns. Thanks @steipete.

View File

@@ -466,6 +466,14 @@ should use `resolveInboundMentionDecision({ facts, policy })`.
You can also pass raw adapter objects instead of the declarative options
if you need full control.
Raw outbound adapters may define a `chunker(text, limit, ctx)` function.
The optional `ctx.formatting` carries delivery-time formatting decisions
such as `maxLinesPerMessage`; apply it before sending so reply threading
and chunk boundaries are resolved once by shared outbound delivery.
Send contexts also include `replyToIdSource` (`implicit` or `explicit`)
when a native reply target was resolved, so payload helpers can preserve
explicit reply tags without consuming an implicit single-use reply slot.
</Accordion>
</Step>

View File

@@ -262,7 +262,7 @@ releases.
| `plugin-sdk/inbound-reply-dispatch` | Inbound reply helpers | Shared record-and-dispatch helpers |
| `plugin-sdk/messaging-targets` | Messaging target parsing | Target parsing/matching helpers |
| `plugin-sdk/outbound-media` | Outbound media helpers | Shared outbound media loading |
| `plugin-sdk/outbound-runtime` | Outbound runtime helpers | Outbound identity/send delegate and payload planning helpers |
| `plugin-sdk/outbound-runtime` | Outbound runtime helpers | Outbound delivery, identity/send delegate, session, formatting, and payload planning helpers |
| `plugin-sdk/thread-bindings-runtime` | Thread-binding helpers | Thread-binding lifecycle and adapter helpers |
| `plugin-sdk/agent-media-payload` | Legacy media payload helpers | Agent media payload builder for legacy field layouts |
| `plugin-sdk/channel-runtime` | Deprecated compatibility shim | Legacy channel runtime utilities only |

View File

@@ -50,7 +50,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview)
| `plugin-sdk/inbound-reply-dispatch` | Shared inbound record-and-dispatch helpers |
| `plugin-sdk/messaging-targets` | Target parsing/matching helpers |
| `plugin-sdk/outbound-media` | Shared outbound media loading helpers |
| `plugin-sdk/outbound-runtime` | Outbound identity, send delegate, and payload planning helpers |
| `plugin-sdk/outbound-runtime` | Outbound delivery, identity, send delegate, session, formatting, and payload planning helpers |
| `plugin-sdk/poll-runtime` | Narrow poll normalization helpers |
| `plugin-sdk/thread-bindings-runtime` | Thread-binding lifecycle and adapter helpers |
| `plugin-sdk/agent-media-payload` | Legacy agent media payload builder |

View File

@@ -15,7 +15,6 @@ import {
} from "openclaw/plugin-sdk/directory-runtime";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { createLazyRuntimeModule } from "openclaw/plugin-sdk/lazy-runtime";
import { resolveOutboundSendDep } from "openclaw/plugin-sdk/outbound-runtime";
import { sleepWithAbort } from "openclaw/plugin-sdk/runtime-env";
import {
createComputedAccountStatusAdapter,
@@ -49,16 +48,12 @@ import {
resolveDiscordGroupRequireMention,
resolveDiscordGroupToolPolicy,
} from "./group-policy.js";
import { isLikelyDiscordVideoMedia } from "./media-detection.js";
import {
setThreadBindingIdleTimeoutBySessionKey,
setThreadBindingMaxAgeBySessionKey,
} from "./monitor/thread-bindings.session-updates.js";
import {
looksLikeDiscordTargetId,
normalizeDiscordMessagingTarget,
normalizeDiscordOutboundTarget,
} from "./normalize.js";
import { looksLikeDiscordTargetId, normalizeDiscordMessagingTarget } from "./normalize.js";
import { discordOutbound } from "./outbound-adapter.js";
import { resolveDiscordOutboundSessionRoute } from "./outbound-session-route.js";
import type { DiscordProbe } from "./probe.js";
import { getDiscordRuntime } from "./runtime.js";
@@ -69,8 +64,6 @@ import { createDiscordPluginBase, discordConfigAdapter } from "./shared.js";
import { collectDiscordStatusIssues } from "./status-issues.js";
import { parseDiscordTarget } from "./target-parsing.js";
type DiscordSendFn = typeof import("./send.js").sendMessageDiscord;
let discordProviderRuntimePromise:
| Promise<typeof import("./monitor/provider.runtime.js")>
| undefined;
@@ -145,22 +138,6 @@ function resolveRuntimeDiscordMessageActions() {
}
}
function resolveOptionalDiscordRuntime() {
try {
return getDiscordRuntime();
} catch {
return null;
}
}
async function resolveDiscordSend(deps?: { [channelId: string]: unknown }): Promise<DiscordSendFn> {
return (
resolveOutboundSendDep<DiscordSendFn>(deps, "discord") ??
resolveOptionalDiscordRuntime()?.channel?.discord?.sendMessageDiscord ??
(await loadDiscordSendModule()).sendMessageDiscord
);
}
const discordMessageActions = {
describeMessageTool: (
ctx: Parameters<NonNullable<ChannelMessageActionAdapter["describeMessageTool"]>>[0],
@@ -811,84 +788,13 @@ export const discordPlugin: ChannelPlugin<ResolvedDiscordAccount, DiscordProbe>
},
},
outbound: {
base: {
deliveryMode: "direct",
chunker: null,
textChunkLimit: 2000,
pollMaxOptions: 10,
shouldTreatDeliveredTextAsVisible: shouldTreatDiscordDeliveredTextAsVisible,
shouldSuppressLocalPayloadPrompt: ({ cfg, accountId, payload }) =>
shouldSuppressLocalDiscordExecApprovalPrompt({
cfg,
accountId,
payload,
}),
resolveTarget: ({ to }) => normalizeDiscordOutboundTarget(to),
},
attachedResults: {
channel: "discord",
sendText: async ({ cfg, to, text, accountId, deps, replyToId, threadId, silent }) => {
const send = await resolveDiscordSend(deps);
return await send(resolveDiscordAttachedOutboundTarget({ to, threadId }), text, {
verbose: false,
cfg,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
});
},
sendMedia: async ({
...discordOutbound,
shouldTreatDeliveredTextAsVisible: shouldTreatDiscordDeliveredTextAsVisible,
shouldSuppressLocalPayloadPrompt: ({ cfg, accountId, payload }) =>
shouldSuppressLocalDiscordExecApprovalPrompt({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
mediaReadFile,
accountId,
deps,
replyToId,
threadId,
silent,
}) => {
const send = await resolveDiscordSend(deps);
const target = resolveDiscordAttachedOutboundTarget({ to, threadId });
if (text.trim() && mediaUrl && isLikelyDiscordVideoMedia(mediaUrl)) {
await send(target, text, {
verbose: false,
cfg,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
});
return await send(target, "", {
verbose: false,
cfg,
mediaUrl,
mediaLocalRoots,
mediaReadFile,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
});
}
return await send(target, text, {
verbose: false,
cfg,
mediaUrl,
mediaLocalRoots,
mediaReadFile,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
});
},
sendPoll: async ({ cfg, to, poll, accountId, threadId, silent }) =>
await (
await loadDiscordSendModule()
).sendPollDiscord(resolveDiscordAttachedOutboundTarget({ to, threadId }), poll, {
cfg,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
}),
},
payload,
}),
},
});

View File

@@ -0,0 +1,52 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import {
resolveRetryConfig,
retryAsync,
type RetryConfig,
} from "openclaw/plugin-sdk/retry-runtime";
import { resolveDiscordAccount } from "./accounts.js";
const DISCORD_DELIVERY_RETRY_DEFAULTS = {
attempts: 3,
minDelayMs: 1000,
maxDelayMs: 30_000,
jitter: 0,
} satisfies Required<RetryConfig>;
function isRetryableDiscordDeliveryError(err: unknown): boolean {
const status = (err as { status?: number }).status ?? (err as { statusCode?: number }).statusCode;
return status === 429 || (status !== undefined && status >= 500);
}
function getDiscordDeliveryRetryAfterMs(err: unknown): number | undefined {
if (!err || typeof err !== "object") {
return undefined;
}
if (
"retryAfter" in err &&
typeof err.retryAfter === "number" &&
Number.isFinite(err.retryAfter)
) {
return err.retryAfter * 1000;
}
const retryAfterRaw = (err as { headers?: Record<string, string> }).headers?.["retry-after"];
if (!retryAfterRaw) {
return undefined;
}
const retryAfterMs = Number(retryAfterRaw) * 1000;
return Number.isFinite(retryAfterMs) ? retryAfterMs : undefined;
}
export async function withDiscordDeliveryRetry<T>(params: {
cfg: OpenClawConfig;
accountId?: string | null;
fn: () => Promise<T>;
}): Promise<T> {
const account = resolveDiscordAccount({ cfg: params.cfg, accountId: params.accountId });
const retryConfig = resolveRetryConfig(DISCORD_DELIVERY_RETRY_DEFAULTS, account.config.retry);
return await retryAsync(params.fn, {
...retryConfig,
shouldRetry: (err) => isRetryableDiscordDeliveryError(err),
retryAfterMs: getDiscordDeliveryRetryAfterMs,
});
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,33 +1,27 @@
import type { RequestClient } from "@buape/carbon";
import { resolveAgentAvatar } from "openclaw/plugin-sdk/agent-runtime";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import type { MarkdownTableMode, ReplyToMode } from "openclaw/plugin-sdk/config-runtime";
import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime";
import type {
MarkdownTableMode,
OpenClawConfig,
ReplyToMode,
} from "openclaw/plugin-sdk/config-runtime";
import type { OutboundMediaAccess } from "openclaw/plugin-sdk/media-runtime";
import {
buildOutboundSessionContext,
deliverOutboundPayloads,
type OutboundDeliveryFormattingOptions,
type OutboundIdentity,
type OutboundSendDeps,
} from "openclaw/plugin-sdk/outbound-runtime";
import type { ChunkMode } from "openclaw/plugin-sdk/reply-chunking";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-dispatch-runtime";
import {
resolveSendableOutboundReplyParts,
resolveTextChunksWithFallback,
sendMediaWithLeadingCaption,
} from "openclaw/plugin-sdk/reply-payload";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
import {
resolveRetryConfig,
retryAsync,
type RetryConfig,
type RetryRunner,
} from "openclaw/plugin-sdk/retry-runtime";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { convertMarkdownTables, normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import { resolveDiscordAccount } from "../accounts.js";
import { chunkDiscordTextWithMode } from "../chunk.js";
import { isLikelyDiscordVideoMedia } from "../media-detection.js";
import { createDiscordRetryRunner } from "../retry.js";
import { sendMessageDiscord, sendVoiceMessageDiscord, sendWebhookMessageDiscord } from "../send.js";
import { buildDiscordSendError, sendDiscordText } from "../send.shared.js";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import { sendMessageDiscord, sendVoiceMessageDiscord } from "../send.js";
export type DiscordThreadBindingLookupRecord = {
accountId: string;
channelId: string;
threadId: string;
agentId: string;
label?: string;
@@ -40,162 +34,6 @@ export type DiscordThreadBindingLookup = {
touchThread?: (params: { threadId: string; at?: number; persist?: boolean }) => unknown;
};
type ResolvedRetryConfig = Required<RetryConfig>;
const DISCORD_DELIVERY_RETRY_DEFAULTS: ResolvedRetryConfig = {
attempts: 3,
minDelayMs: 1000,
maxDelayMs: 30_000,
jitter: 0,
};
function isRetryableDiscordError(err: unknown): boolean {
const status = (err as { status?: number }).status ?? (err as { statusCode?: number }).statusCode;
return status === 429 || (status !== undefined && status >= 500);
}
function getDiscordRetryAfterMs(err: unknown): number | undefined {
if (!err || typeof err !== "object") {
return undefined;
}
if (
"retryAfter" in err &&
typeof err.retryAfter === "number" &&
Number.isFinite(err.retryAfter)
) {
return err.retryAfter * 1000;
}
const retryAfterRaw = (err as { headers?: Record<string, string> }).headers?.["retry-after"];
if (!retryAfterRaw) {
return undefined;
}
const retryAfterMs = Number(retryAfterRaw) * 1000;
return Number.isFinite(retryAfterMs) ? retryAfterMs : undefined;
}
function resolveDeliveryRetryConfig(retry?: RetryConfig): ResolvedRetryConfig {
return resolveRetryConfig(DISCORD_DELIVERY_RETRY_DEFAULTS, retry);
}
async function sendWithRetry(
fn: () => Promise<unknown>,
retryConfig: ResolvedRetryConfig,
): Promise<void> {
await retryAsync(fn, {
...retryConfig,
shouldRetry: (err) => isRetryableDiscordError(err),
retryAfterMs: getDiscordRetryAfterMs,
});
}
async function sendDiscordMediaOnly(params: {
target: string;
cfg: OpenClawConfig;
token: string;
rest?: RequestClient;
mediaUrl: string;
accountId?: string;
mediaLocalRoots?: readonly string[];
replyTo?: string;
retryConfig: ResolvedRetryConfig;
}): Promise<void> {
await sendWithRetry(
() =>
sendMessageDiscord(params.target, "", {
cfg: params.cfg,
token: params.token,
rest: params.rest,
mediaUrl: params.mediaUrl,
accountId: params.accountId,
mediaLocalRoots: params.mediaLocalRoots,
replyTo: params.replyTo,
}),
params.retryConfig,
);
}
async function sendDiscordMediaBatch(params: {
target: string;
cfg: OpenClawConfig;
token: string;
rest?: RequestClient;
mediaUrls: string[];
accountId?: string;
mediaLocalRoots?: readonly string[];
replyTo: () => string | undefined;
retryConfig: ResolvedRetryConfig;
}): Promise<void> {
await sendMediaWithLeadingCaption({
mediaUrls: params.mediaUrls,
caption: "",
send: async ({ mediaUrl }) => {
await sendDiscordMediaOnly({
target: params.target,
cfg: params.cfg,
token: params.token,
rest: params.rest,
mediaUrl,
accountId: params.accountId,
mediaLocalRoots: params.mediaLocalRoots,
replyTo: params.replyTo(),
retryConfig: params.retryConfig,
});
},
});
}
async function sendDiscordPayloadText(params: {
cfg: OpenClawConfig;
target: string;
text: string;
token: string;
rest?: RequestClient;
accountId?: string;
textLimit?: number;
maxLinesPerMessage?: number;
binding?: DiscordThreadBindingLookupRecord;
chunkMode?: ChunkMode;
username?: string;
avatarUrl?: string;
channelId?: string;
request?: RetryRunner;
retryConfig: ResolvedRetryConfig;
resolveReplyTo: () => string | undefined;
}): Promise<void> {
const mode = params.chunkMode ?? "length";
const chunkLimit = Math.min(params.textLimit ?? 2000, 2000);
const chunks = resolveTextChunksWithFallback(
params.text,
chunkDiscordTextWithMode(params.text, {
maxChars: chunkLimit,
maxLines: params.maxLinesPerMessage,
chunkMode: mode,
}),
);
for (const chunk of chunks) {
if (!chunk.trim()) {
continue;
}
await sendDiscordChunkWithFallback({
cfg: params.cfg,
target: params.target,
text: chunk,
token: params.token,
rest: params.rest,
accountId: params.accountId,
maxLinesPerMessage: params.maxLinesPerMessage,
replyTo: params.resolveReplyTo(),
binding: params.binding,
chunkMode: params.chunkMode,
username: params.username,
avatarUrl: params.avatarUrl,
channelId: params.channelId,
request: params.request,
retryConfig: params.retryConfig,
});
}
}
function resolveTargetChannelId(target: string): string | undefined {
if (!target.startsWith("channel:")) {
return undefined;
@@ -213,176 +51,107 @@ function resolveBoundThreadBinding(params: {
if (!params.threadBindings || !sessionKey) {
return undefined;
}
const bindings = params.threadBindings.listBySessionKey(sessionKey);
if (bindings.length === 0) {
return undefined;
}
const targetChannelId = resolveTargetChannelId(params.target);
if (!targetChannelId) {
return undefined;
}
return bindings.find((entry) => entry.threadId === targetChannelId);
return params.threadBindings
.listBySessionKey(sessionKey)
.find((entry) => entry.threadId === targetChannelId);
}
function createPayloadReplyToResolver(params: {
payload: ReplyPayload;
replyToMode: ReplyToMode;
resolveFallbackReplyTo: () => string | undefined;
}): () => string | undefined {
const payloadReplyTo = normalizeOptionalString(params.payload.replyToId);
const allowExplicitReplyWhenOff = Boolean(
payloadReplyTo && (params.payload.replyToTag || params.payload.replyToCurrent),
);
if (!payloadReplyTo || (params.replyToMode === "off" && !allowExplicitReplyWhenOff)) {
return params.resolveFallbackReplyTo;
}
let payloadReplyUsed = false;
return () => {
if (params.replyToMode === "all") {
return payloadReplyTo;
}
if (payloadReplyUsed) {
return undefined;
}
payloadReplyUsed = true;
return payloadReplyTo;
};
}
function resolveMessageSendingHookReplyToId(params: {
payload: ReplyPayload;
replyToMode: ReplyToMode;
fallbackReplyTo: string | undefined;
fallbackReplyUsed: boolean;
}): string | undefined {
const payloadReplyTo = normalizeOptionalString(params.payload.replyToId);
const allowExplicitReplyWhenOff = Boolean(
payloadReplyTo && (params.payload.replyToTag || params.payload.replyToCurrent),
);
if (payloadReplyTo && (params.replyToMode !== "off" || allowExplicitReplyWhenOff)) {
return payloadReplyTo;
}
if (!params.fallbackReplyTo) {
return undefined;
}
if (!isSingleUseReplyToMode(params.replyToMode)) {
return params.fallbackReplyTo;
}
return params.fallbackReplyUsed ? undefined : params.fallbackReplyTo;
}
function resolveBindingPersona(
function resolveBindingIdentity(
cfg: OpenClawConfig,
binding: DiscordThreadBindingLookupRecord | undefined,
): {
username?: string;
avatarUrl?: string;
} {
): OutboundIdentity | undefined {
if (!binding) {
return {};
return undefined;
}
const baseLabel = binding.label?.trim() || binding.agentId;
const username = (`🤖 ${baseLabel}`.trim() || "🤖 agent").slice(0, 80);
let avatarUrl: string | undefined;
const identity: OutboundIdentity = {
name: (`🤖 ${baseLabel}`.trim() || "🤖 agent").slice(0, 80),
};
try {
const avatar = resolveAgentAvatar(cfg, binding.agentId);
if (avatar.kind === "remote") {
avatarUrl = avatar.url;
identity.avatarUrl = avatar.url;
}
} catch {
avatarUrl = undefined;
// Avatar is cosmetic; delivery should not depend on local identity config.
}
return { username, avatarUrl };
return identity;
}
async function sendDiscordChunkWithFallback(params: {
function createDiscordDeliveryDeps(params: {
cfg: OpenClawConfig;
target: string;
text: string;
token: string;
accountId?: string;
maxLinesPerMessage?: number;
rest?: RequestClient;
replyTo?: string;
binding?: DiscordThreadBindingLookupRecord;
chunkMode?: ChunkMode;
username?: string;
avatarUrl?: string;
/** Pre-resolved channel ID to bypass redundant resolution per chunk. */
channelId?: string;
/** Pre-created retry runner to avoid creating one per chunk. */
request?: RetryRunner;
/** Pre-resolved retry config (account-level). */
retryConfig: ResolvedRetryConfig;
}) {
if (!params.text.trim()) {
return;
}
const text = params.text;
const binding = params.binding;
if (binding?.webhookId && binding?.webhookToken) {
try {
await sendWebhookMessageDiscord(text, {
cfg: params.cfg,
webhookId: binding.webhookId,
webhookToken: binding.webhookToken,
accountId: binding.accountId,
threadId: binding.threadId,
replyTo: params.replyTo,
username: params.username,
avatarUrl: params.avatarUrl,
});
return;
} catch {
// Fall through to the standard bot sender path.
}
}
// When channelId and request are pre-resolved, send directly via sendDiscordText
// to avoid per-chunk overhead (channel-type GET, re-chunking, client creation)
// that can cause ordering issues under queue contention or rate limiting.
if (params.channelId && params.request && params.rest) {
const { channelId, request, rest } = params;
try {
await sendWithRetry(
() =>
sendDiscordText(
rest,
channelId,
text,
params.replyTo,
request,
params.maxLinesPerMessage,
undefined,
undefined,
params.chunkMode,
),
params.retryConfig,
);
} catch (err) {
throw await buildDiscordSendError(err, {
channelId,
cfg: params.cfg,
rest,
token: params.token,
hasMedia: false,
});
}
return;
}
await sendWithRetry(
() =>
sendMessageDiscord(params.target, text, {
cfg: params.cfg,
}): OutboundSendDeps {
return {
discord: (to: string, text: string, opts?: Parameters<typeof sendMessageDiscord>[2]) =>
sendMessageDiscord(to, text, {
...opts,
cfg: opts?.cfg ?? params.cfg,
token: params.token,
rest: params.rest,
accountId: params.accountId,
replyTo: params.replyTo,
}),
params.retryConfig,
);
discordVoice: (
to: string,
audioPath: string,
opts?: Parameters<typeof sendVoiceMessageDiscord>[2],
) =>
sendVoiceMessageDiscord(to, audioPath, {
...opts,
cfg: opts?.cfg ?? params.cfg,
token: params.token,
rest: params.rest,
}),
};
}
type DiscordDeliveryOptions = {
to: string;
threadId?: string;
agentId?: string;
identity?: OutboundIdentity;
mediaAccess?: OutboundMediaAccess;
replyToMode: ReplyToMode;
formatting: OutboundDeliveryFormattingOptions;
};
function resolveDiscordDeliveryOptions(params: {
cfg: OpenClawConfig;
target: string;
sessionKey?: string;
threadBindings?: DiscordThreadBindingLookup;
textLimit: number;
maxLinesPerMessage?: number;
tableMode?: MarkdownTableMode;
chunkMode?: ChunkMode;
replyToMode?: ReplyToMode;
mediaLocalRoots?: readonly string[];
}): DiscordDeliveryOptions {
const binding = resolveBoundThreadBinding({
threadBindings: params.threadBindings,
sessionKey: params.sessionKey,
target: params.target,
});
return {
to: binding ? `channel:${binding.channelId}` : params.target,
threadId: binding?.threadId,
agentId: binding?.agentId,
identity: resolveBindingIdentity(params.cfg, binding),
mediaAccess: params.mediaLocalRoots?.length
? { localRoots: params.mediaLocalRoots }
: undefined,
replyToMode: params.replyToMode ?? "all",
formatting: {
textLimit: params.textLimit,
maxLinesPerMessage: params.maxLinesPerMessage,
tableMode: params.tableMode,
chunkMode: params.chunkMode,
},
};
}
export async function deliverDiscordReply(params: {
@@ -403,187 +172,32 @@ export async function deliverDiscordReply(params: {
threadBindings?: DiscordThreadBindingLookup;
mediaLocalRoots?: readonly string[];
}) {
const replyTo = normalizeOptionalString(params.replyToId);
const replyToMode = params.replyToMode ?? "all";
const replyOnce = isSingleUseReplyToMode(replyToMode);
let replyUsed = false;
const resolveReplyTo = () => {
if (!replyTo) {
return undefined;
}
if (!replyOnce) {
return replyTo;
}
if (replyUsed) {
return undefined;
}
replyUsed = true;
return replyTo;
};
const binding = resolveBoundThreadBinding({
threadBindings: params.threadBindings,
sessionKey: params.sessionKey,
target: params.target,
void params.runtime;
const delivery = resolveDiscordDeliveryOptions(params);
await deliverOutboundPayloads({
cfg: params.cfg,
channel: "discord",
to: delivery.to,
accountId: params.accountId,
payloads: params.replies,
replyToId: normalizeOptionalString(params.replyToId),
replyToMode: delivery.replyToMode,
formatting: delivery.formatting,
threadId: delivery.threadId,
identity: delivery.identity,
deps: createDiscordDeliveryDeps({
cfg: params.cfg,
token: params.token,
rest: params.rest,
}),
mediaAccess: delivery.mediaAccess,
session: buildOutboundSessionContext({
cfg: params.cfg,
sessionKey: params.sessionKey,
agentId: delivery.agentId,
requesterAccountId: params.accountId,
}),
});
const persona = resolveBindingPersona(params.cfg, binding);
// Pre-resolve channel ID and retry runner once to avoid per-chunk overhead.
// This eliminates redundant channel-type GET requests and client creation that
// can cause ordering issues when multiple chunks share the RequestClient queue.
const channelId = resolveTargetChannelId(params.target);
const account = resolveDiscordAccount({ cfg: params.cfg, accountId: params.accountId });
const retryConfig = resolveDeliveryRetryConfig(account.config.retry);
const request: RetryRunner | undefined = channelId
? createDiscordRetryRunner({ configRetry: account.config.retry })
: undefined;
const hookRunner = getGlobalHookRunner();
const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false;
const hookConversationId = channelId ?? params.target;
let deliveredAny = false;
for (const payload of params.replies) {
const resolvePayloadReplyTo = createPayloadReplyToResolver({
payload,
replyToMode,
resolveFallbackReplyTo: resolveReplyTo,
});
const tableMode = params.tableMode ?? "code";
let effectiveText = payload.text ?? "";
if (hasMessageSendingHooks) {
try {
const hookResult = await hookRunner?.runMessageSending(
{
to: hookConversationId,
content: effectiveText,
replyToId: resolveMessageSendingHookReplyToId({
payload,
replyToMode,
fallbackReplyTo: replyTo,
fallbackReplyUsed: replyUsed,
}),
metadata: {
channel: "discord",
mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined),
},
},
{
channelId: "discord",
accountId: params.accountId,
conversationId: hookConversationId,
},
);
if (hookResult?.cancel) {
continue;
}
if (typeof hookResult?.content === "string") {
effectiveText = hookResult.content;
}
} catch (error) {
params.runtime.error?.(
`discord: message_sending hook failed: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
const reply = resolveSendableOutboundReplyParts(
{ ...payload, text: effectiveText },
{ text: convertMarkdownTables(effectiveText, tableMode) },
);
if (!reply.hasContent) {
continue;
}
const sendReplyText = async () =>
sendDiscordPayloadText({
cfg: params.cfg,
target: params.target,
text: reply.text,
token: params.token,
rest: params.rest,
accountId: params.accountId,
textLimit: params.textLimit,
maxLinesPerMessage: params.maxLinesPerMessage,
resolveReplyTo: resolvePayloadReplyTo,
binding,
chunkMode: params.chunkMode,
username: persona.username,
avatarUrl: persona.avatarUrl,
channelId,
request,
retryConfig,
});
const sendReplyMediaBatch = async (mediaUrls: string[]) =>
sendDiscordMediaBatch({
target: params.target,
cfg: params.cfg,
token: params.token,
rest: params.rest,
mediaUrls,
accountId: params.accountId,
mediaLocalRoots: params.mediaLocalRoots,
replyTo: resolvePayloadReplyTo,
retryConfig,
});
if (!reply.hasMedia) {
await sendReplyText();
if (reply.text.trim()) {
deliveredAny = true;
}
continue;
}
const firstMedia = reply.mediaUrls[0];
if (!firstMedia) {
continue;
}
// Voice message path: audioAsVoice flag routes through sendVoiceMessageDiscord.
if (payload.audioAsVoice) {
const replyTo = resolvePayloadReplyTo();
await sendVoiceMessageDiscord(params.target, firstMedia, {
cfg: params.cfg,
token: params.token,
rest: params.rest,
accountId: params.accountId,
replyTo,
});
deliveredAny = true;
// Voice messages cannot include text; send remaining text separately if present.
await sendReplyText();
// Additional media items are sent as regular attachments (voice is single-file only).
await sendReplyMediaBatch(reply.mediaUrls.slice(1));
continue;
}
const shouldSplitVideoMediaReply =
reply.text.trim().length > 0 &&
reply.mediaUrls.some((mediaUrl) => isLikelyDiscordVideoMedia(mediaUrl));
if (shouldSplitVideoMediaReply) {
await sendReplyText();
await sendReplyMediaBatch(reply.mediaUrls);
deliveredAny = true;
continue;
}
await sendMediaWithLeadingCaption({
mediaUrls: reply.mediaUrls,
caption: reply.text,
send: async ({ mediaUrl, caption }) => {
const replyTo = resolvePayloadReplyTo();
await sendWithRetry(
() =>
sendMessageDiscord(params.target, caption ?? "", {
cfg: params.cfg,
token: params.token,
rest: params.rest,
mediaUrl,
accountId: params.accountId,
mediaLocalRoots: params.mediaLocalRoots,
replyTo,
}),
retryConfig,
);
},
});
deliveredAny = true;
}
if (binding && deliveredAny) {
params.threadBindings?.touchThread?.({ threadId: binding.threadId });
}
}

View File

@@ -8,6 +8,7 @@ type DiscordOutboundHoisted = {
sendDiscordComponentMessageMock: AsyncUnknownMock;
sendPollDiscordMock: AsyncUnknownMock;
sendWebhookMessageDiscordMock: AsyncUnknownMock;
sendVoiceMessageDiscordMock: AsyncUnknownMock;
getThreadBindingManagerMock: UnknownMock;
};
@@ -28,12 +29,14 @@ export function createDiscordOutboundHoisted(): DiscordOutboundHoisted {
const sendDiscordComponentMessageMock = vi.fn();
const sendPollDiscordMock = vi.fn();
const sendWebhookMessageDiscordMock = vi.fn();
const sendVoiceMessageDiscordMock = vi.fn();
const getThreadBindingManagerMock = vi.fn();
return {
sendMessageDiscordMock,
sendDiscordComponentMessageMock,
sendPollDiscordMock,
sendWebhookMessageDiscordMock,
sendVoiceMessageDiscordMock,
getThreadBindingManagerMock,
};
}
@@ -68,6 +71,11 @@ export async function createDiscordSendModuleMock(
Parameters<DiscordSendModule["sendWebhookMessageDiscord"]>,
ReturnType<DiscordSendModule["sendWebhookMessageDiscord"]>
>(hoisted.sendWebhookMessageDiscordMock, ...args),
sendVoiceMessageDiscord: (...args: Parameters<DiscordSendModule["sendVoiceMessageDiscord"]>) =>
invokeMock<
Parameters<DiscordSendModule["sendVoiceMessageDiscord"]>,
ReturnType<DiscordSendModule["sendVoiceMessageDiscord"]>
>(hoisted.sendVoiceMessageDiscordMock, ...args),
};
}
@@ -115,6 +123,9 @@ export async function installDiscordOutboundModuleSpies(hoisted: DiscordOutbound
vi.spyOn(sendModule, "sendWebhookMessageDiscord").mockImplementation(
mockedSendModule.sendWebhookMessageDiscord,
);
vi.spyOn(sendModule, "sendVoiceMessageDiscord").mockImplementation(
mockedSendModule.sendVoiceMessageDiscord,
);
const sendComponentsModule = await import("./send.components.js");
const mockedSendComponentsModule = await createDiscordSendComponentsModuleMock(
@@ -152,6 +163,10 @@ export function resetDiscordOutboundMocks(hoisted: DiscordOutboundHoisted) {
messageId: "msg-webhook-1",
channelId: "thread-1",
});
hoisted.sendVoiceMessageDiscordMock.mockReset().mockResolvedValue({
messageId: "voice-1",
channelId: "ch-1",
});
hoisted.getThreadBindingManagerMock.mockReset().mockReturnValue(null);
}
@@ -187,5 +202,6 @@ export function mockDiscordBoundThreadManager(hoisted: DiscordOutboundHoisted) {
boundBy: "system",
boundAt: Date.now(),
}),
touchThread: vi.fn(),
});
}

View File

@@ -1,4 +1,4 @@
import { beforeAll, beforeEach, describe, expect, it } from "vitest";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
import {
createDiscordOutboundHoisted,
expectDiscordThreadBotSend,
@@ -72,6 +72,62 @@ describe("discordOutbound", () => {
});
});
it("forwards explicit formatting options to Discord text sends", async () => {
await discordOutbound.sendText?.({
cfg: {},
to: "channel:123456",
text: "formatted",
accountId: "default",
formatting: {
textLimit: 1234,
maxLinesPerMessage: 7,
tableMode: "off",
chunkMode: "newline",
},
});
expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith(
"channel:123456",
"formatted",
expect.objectContaining({
textLimit: 1234,
maxLinesPerMessage: 7,
tableMode: "off",
chunkMode: "newline",
}),
);
});
it.each([500, 429])("retries transient Discord text send status %i", async (status) => {
hoisted.sendMessageDiscordMock
.mockRejectedValueOnce(Object.assign(new Error(`discord ${status}`), { status }))
.mockResolvedValueOnce({
messageId: "msg-retry-ok",
channelId: "ch-1",
});
const result = await discordOutbound.sendText?.({
cfg: {
channels: {
discord: {
token: "test-token",
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
},
},
},
to: "channel:123456",
text: "retry me",
accountId: "default",
});
expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledTimes(2);
expect(result).toEqual({
channel: "discord",
messageId: "msg-retry-ok",
channelId: "ch-1",
});
});
it("uses webhook persona delivery for bound thread text replies", async () => {
mockDiscordBoundThreadManager(hoisted);
const cfg = {
@@ -189,6 +245,155 @@ describe("discordOutbound", () => {
});
});
it("routes audioAsVoice payloads through the Discord voice send helper", async () => {
const result = await discordOutbound.sendPayload?.({
cfg: {},
to: "channel:123456",
text: "",
payload: {
text: "voice note",
mediaUrls: ["https://example.com/voice.ogg", "https://example.com/extra.png"],
audioAsVoice: true,
},
accountId: "default",
replyToId: "reply-1",
replyToMode: "first",
});
expect(hoisted.sendVoiceMessageDiscordMock).toHaveBeenCalledWith(
"channel:123456",
"https://example.com/voice.ogg",
expect.objectContaining({
accountId: "default",
replyTo: "reply-1",
}),
);
expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith(
"channel:123456",
"voice note",
expect.objectContaining({
accountId: "default",
replyTo: undefined,
}),
);
expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith(
"channel:123456",
"",
expect.objectContaining({
accountId: "default",
mediaUrl: "https://example.com/extra.png",
replyTo: undefined,
}),
);
expect(result).toEqual({
channel: "discord",
messageId: "msg-1",
channelId: "ch-1",
});
});
it("keeps replyToId on every internal audioAsVoice send when replyToMode is all", async () => {
await discordOutbound.sendPayload?.({
cfg: {},
to: "channel:123456",
text: "",
payload: {
text: "voice note",
mediaUrls: ["https://example.com/voice.ogg", "https://example.com/extra.png"],
audioAsVoice: true,
},
accountId: "default",
replyToId: "reply-1",
replyToMode: "all",
});
expect(
(hoisted.sendVoiceMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined)
?.replyTo,
).toBe("reply-1");
expect(
hoisted.sendMessageDiscordMock.mock.calls.map(
(call) => (call[2] as { replyTo?: unknown } | undefined)?.replyTo,
),
).toEqual(["reply-1", "reply-1"]);
});
it("preserves explicit audioAsVoice payload replies when replyToMode is off", async () => {
await discordOutbound.sendPayload?.({
cfg: {},
to: "channel:123456",
text: "",
payload: {
text: "voice note",
mediaUrls: ["https://example.com/voice.ogg", "https://example.com/extra.png"],
audioAsVoice: true,
},
accountId: "default",
replyToId: "explicit-reply-1",
replyToMode: "off",
});
expect(
(hoisted.sendVoiceMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined)
?.replyTo,
).toBe("explicit-reply-1");
expect(
hoisted.sendMessageDiscordMock.mock.calls.map(
(call) => (call[2] as { replyTo?: unknown } | undefined)?.replyTo,
),
).toEqual(["explicit-reply-1", "explicit-reply-1"]);
});
it("sends video captions as text before a media-only video follow-up", async () => {
await discordOutbound.sendMedia?.({
cfg: {},
to: "channel:123456",
text: "rendered clip",
mediaUrl: "/tmp/render.mp4",
accountId: "default",
replyToId: "reply-1",
});
expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith(
"channel:123456",
"rendered clip",
expect.objectContaining({
accountId: "default",
replyTo: "reply-1",
}),
);
expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith(
"channel:123456",
"",
expect.objectContaining({
accountId: "default",
mediaUrl: "/tmp/render.mp4",
}),
);
});
it("touches bound thread activity after shared outbound delivery succeeds", async () => {
const touchThread = vi.fn();
hoisted.getThreadBindingManagerMock.mockReturnValue({
getByThreadId: () => ({ threadId: "thread-1" }),
touchThread,
});
await discordOutbound.afterDeliverPayload?.({
cfg: {},
target: {
channel: "discord",
to: "channel:parent-1",
accountId: "default",
threadId: "thread-1",
},
payload: { text: "delivered" },
results: [{ channel: "discord", messageId: "msg-1" }],
});
expect(touchThread).toHaveBeenCalledWith({ threadId: "thread-1" });
});
it("sends component payload media sequences with the component message first", async () => {
hoisted.sendDiscordComponentMessageMock.mockResolvedValueOnce({
messageId: "component-1",
@@ -224,6 +429,8 @@ describe("discordOutbound", () => {
payload,
accountId: "default",
mediaLocalRoots: ["/tmp/media"],
replyToId: "reply-1",
replyToMode: "first",
});
expect(hoisted.sendDiscordComponentMessageMock).toHaveBeenCalledWith(
@@ -233,6 +440,7 @@ describe("discordOutbound", () => {
mediaUrl: "https://example.com/1.png",
mediaLocalRoots: ["/tmp/media"],
accountId: "default",
replyTo: "reply-1",
}),
);
expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith(
@@ -242,6 +450,7 @@ describe("discordOutbound", () => {
mediaUrl: "https://example.com/2.png",
mediaLocalRoots: ["/tmp/media"],
accountId: "default",
replyTo: undefined,
}),
);
expect(result).toEqual({
@@ -251,6 +460,98 @@ describe("discordOutbound", () => {
});
});
it("keeps replyToId on every internal component media send when replyToMode is all", async () => {
const payload = await discordOutbound.renderPresentation?.({
payload: {
text: "hello",
mediaUrls: ["https://example.com/1.png", "https://example.com/2.png"],
},
presentation: {
blocks: [{ type: "buttons", buttons: [{ label: "Open", value: "open" }] }],
},
ctx: {
cfg: {},
to: "channel:123456",
},
} as never);
if (!payload) {
throw new Error("expected Discord presentation payload");
}
await discordOutbound.sendPayload?.({
cfg: {},
to: "channel:123456",
text: "",
payload,
accountId: "default",
replyToId: "reply-1",
replyToMode: "all",
});
expect(
(
hoisted.sendDiscordComponentMessageMock.mock.calls[0]?.[2] as
| { replyTo?: unknown }
| undefined
)?.replyTo,
).toBe("reply-1");
expect(
(hoisted.sendMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined)
?.replyTo,
).toBe("reply-1");
});
it("preserves explicit component payload replies when replyToMode is off", async () => {
const payload = await discordOutbound.renderPresentation?.({
payload: {
text: "hello",
mediaUrls: ["https://example.com/1.png", "https://example.com/2.png"],
},
presentation: {
blocks: [{ type: "buttons", buttons: [{ label: "Open", value: "open" }] }],
},
ctx: {
cfg: {},
to: "channel:123456",
},
} as never);
if (!payload) {
throw new Error("expected Discord presentation payload");
}
await discordOutbound.sendPayload?.({
cfg: {},
to: "channel:123456",
text: "",
payload,
accountId: "default",
replyToId: "explicit-reply-1",
replyToMode: "off",
});
expect(
(
hoisted.sendDiscordComponentMessageMock.mock.calls[0]?.[2] as
| { replyTo?: unknown }
| undefined
)?.replyTo,
).toBe("explicit-reply-1");
expect(
(hoisted.sendMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined)
?.replyTo,
).toBe("explicit-reply-1");
});
it("uses explicit maxLinesPerMessage in its adapter chunker", () => {
expect(
discordOutbound.chunker?.("line one\nline two\nline three", 2000, {
formatting: { maxLinesPerMessage: 1 },
}),
).toEqual(["line one", "line two", "line three"]);
});
it("renders channelData Discord components on payload sends", async () => {
await discordOutbound.sendPayload?.({
cfg: {},
@@ -306,6 +607,34 @@ describe("discordOutbound", () => {
);
});
it("uses a single implicit reply for chunked approval payload fallbacks", async () => {
await discordOutbound.sendPayload?.({
cfg: {},
to: "channel:123456",
text: "",
payload: {
text: "line one\nline two",
channelData: {
execApproval: {
approvalId: "req-1",
approvalSlug: "req-1",
},
},
},
accountId: "default",
replyToId: "reply-1",
replyToIdSource: "implicit",
replyToMode: "first",
formatting: { maxLinesPerMessage: 1 },
});
expect(
hoisted.sendMessageDiscordMock.mock.calls.map(
(call) => (call[2] as { replyTo?: unknown } | undefined)?.replyTo,
),
).toEqual(["reply-1", undefined]);
});
it("leaves non-approval mentions unchanged", async () => {
await discordOutbound.sendPayload?.({
cfg: {},

View File

@@ -1,5 +1,4 @@
import {
attachChannelToResult,
type ChannelOutboundAdapter,
createAttachedChannelResultAdapter,
} from "openclaw/plugin-sdk/channel-send-result";
@@ -8,99 +7,39 @@ import {
resolveOutboundSendDep,
type OutboundIdentity,
} from "openclaw/plugin-sdk/outbound-runtime";
import {
resolvePayloadMediaUrls,
sendPayloadMediaSequenceOrFallback,
sendTextMediaPayload,
} from "openclaw/plugin-sdk/reply-payload";
import {
normalizeOptionalString,
normalizeOptionalStringifiedId,
} from "openclaw/plugin-sdk/text-runtime";
import { readDiscordComponentSpec, type DiscordComponentMessageSpec } from "./components.js";
import { chunkDiscordTextWithMode } from "./chunk.js";
import { withDiscordDeliveryRetry } from "./delivery-retry.js";
import { isLikelyDiscordVideoMedia } from "./media-detection.js";
import type { ThreadBindingRecord } from "./monitor/thread-bindings.js";
import { normalizeDiscordOutboundTarget } from "./normalize.js";
import {
buildDiscordPresentationPayload,
normalizeDiscordApprovalPayload,
sendDiscordOutboundPayload,
} from "./outbound-payload.js";
import {
loadDiscordSendRuntime,
resolveDiscordFormattingOptions,
resolveDiscordOutboundTarget,
type DiscordSendFn,
type DiscordVoiceSendFn,
} from "./outbound-send-context.js";
export const DISCORD_TEXT_CHUNK_LIMIT = 2000;
type DiscordSendRuntime = typeof import("./send.js");
type DiscordSendFn = DiscordSendRuntime["sendMessageDiscord"];
type DiscordComponentSendFn = typeof import("./send.components.js").sendDiscordComponentMessage;
type DiscordSharedInteractiveModule = typeof import("./shared-interactive.js");
type DiscordThreadBindingsModule = typeof import("./monitor/thread-bindings.js");
let discordSendRuntimePromise: Promise<DiscordSendRuntime> | undefined;
let discordComponentSendPromise: Promise<DiscordComponentSendFn> | undefined;
let discordSharedInteractivePromise: Promise<DiscordSharedInteractiveModule> | undefined;
let discordThreadBindingsPromise: Promise<DiscordThreadBindingsModule> | undefined;
async function loadDiscordSendRuntime(): Promise<DiscordSendRuntime> {
discordSendRuntimePromise ??= import("./send.js");
return await discordSendRuntimePromise;
}
async function sendDiscordComponentMessageLazy(
...args: Parameters<DiscordComponentSendFn>
): ReturnType<DiscordComponentSendFn> {
discordComponentSendPromise ??= import("./send.components.js").then(
(module) => module.sendDiscordComponentMessage,
);
return await (
await discordComponentSendPromise
)(...args);
}
function loadDiscordSharedInteractive(): Promise<DiscordSharedInteractiveModule> {
discordSharedInteractivePromise ??= import("./shared-interactive.js");
return discordSharedInteractivePromise;
}
function loadDiscordThreadBindings(): Promise<DiscordThreadBindingsModule> {
discordThreadBindingsPromise ??= import("./monitor/thread-bindings.js");
return discordThreadBindingsPromise;
}
function hasApprovalChannelData(payload: { channelData?: unknown }): boolean {
const channelData = payload.channelData;
if (!channelData || typeof channelData !== "object" || Array.isArray(channelData)) {
return false;
}
return Boolean((channelData as { execApproval?: unknown }).execApproval);
}
function neutralizeDiscordApprovalMentions(value: string): string {
return value
.replace(/@everyone/gi, "@\u200beveryone")
.replace(/@here/gi, "@\u200bhere")
.replace(/<@/g, "<@\u200b")
.replace(/<#/g, "<#\u200b");
}
function normalizeDiscordApprovalPayload<T extends { text?: string; channelData?: unknown }>(
payload: T,
): T {
return hasApprovalChannelData(payload) && payload.text
? {
...payload,
text: neutralizeDiscordApprovalMentions(payload.text),
}
: payload;
}
function resolveDiscordOutboundTarget(params: {
to: string;
threadId?: string | number | null;
}): string {
if (params.threadId == null) {
return params.to;
}
const threadId = normalizeOptionalStringifiedId(params.threadId) ?? "";
if (!threadId) {
return params.to;
}
return `channel:${threadId}`;
}
function resolveDiscordWebhookIdentity(params: {
identity?: OutboundIdentity;
binding: ThreadBindingRecord;
@@ -156,7 +95,11 @@ async function maybeSendDiscordWebhookText(params: {
export const discordOutbound: ChannelOutboundAdapter = {
deliveryMode: "direct",
chunker: null,
chunker: (text, limit, ctx) =>
chunkDiscordTextWithMode(text, {
maxChars: limit,
maxLines: ctx?.formatting?.maxLinesPerMessage,
}),
textChunkLimit: DISCORD_TEXT_CHUNK_LIMIT,
pollMaxOptions: 10,
normalizePayload: ({ payload }) => normalizeDiscordApprovalPayload(payload),
@@ -168,105 +111,31 @@ export const discordOutbound: ChannelOutboundAdapter = {
divider: true,
},
renderPresentation: async ({ payload, presentation }) => {
const componentSpec = (await loadDiscordSharedInteractive()).buildDiscordPresentationComponents(
return await buildDiscordPresentationPayload({
payload,
presentation,
);
if (!componentSpec) {
return null;
}
return {
...payload,
channelData: {
...payload.channelData,
discord: {
...(payload.channelData?.discord as Record<string, unknown> | undefined),
presentationComponents: componentSpec,
},
},
};
});
},
resolveTarget: ({ to }) => normalizeDiscordOutboundTarget(to),
sendPayload: async (ctx) => {
const payload = normalizeDiscordApprovalPayload({
...ctx.payload,
text: ctx.payload.text ?? "",
});
const discordData = payload.channelData?.discord as
| { components?: unknown; presentationComponents?: DiscordComponentMessageSpec }
| undefined;
const rawComponentSpec =
discordData?.presentationComponents ??
readDiscordComponentSpec(discordData?.components) ??
(payload.interactive
? (await loadDiscordSharedInteractive()).buildDiscordInteractiveComponents(
payload.interactive,
)
: undefined);
const componentSpec = rawComponentSpec
? rawComponentSpec.text
? rawComponentSpec
: {
...rawComponentSpec,
text: payload.text?.trim() ? payload.text : undefined,
}
: undefined;
if (!componentSpec) {
return await sendTextMediaPayload({
channel: "discord",
ctx: {
...ctx,
payload,
},
adapter: discordOutbound,
});
}
const send =
resolveOutboundSendDep<DiscordSendFn>(ctx.deps, "discord") ??
(await loadDiscordSendRuntime()).sendMessageDiscord;
const target = resolveDiscordOutboundTarget({ to: ctx.to, threadId: ctx.threadId });
const mediaUrls = resolvePayloadMediaUrls(payload);
const result = await sendPayloadMediaSequenceOrFallback({
text: payload.text ?? "",
mediaUrls,
fallbackResult: { messageId: "", channelId: target },
sendNoMedia: async () =>
await sendDiscordComponentMessageLazy(target, componentSpec, {
replyTo: ctx.replyToId ?? undefined,
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
}),
send: async ({ text, mediaUrl, isFirst }) => {
if (isFirst) {
return await sendDiscordComponentMessageLazy(target, componentSpec, {
mediaUrl,
mediaAccess: ctx.mediaAccess,
mediaLocalRoots: ctx.mediaLocalRoots,
mediaReadFile: ctx.mediaReadFile,
replyTo: ctx.replyToId ?? undefined,
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
});
}
return await send(target, text, {
verbose: false,
mediaUrl,
mediaAccess: ctx.mediaAccess,
mediaLocalRoots: ctx.mediaLocalRoots,
mediaReadFile: ctx.mediaReadFile,
replyTo: ctx.replyToId ?? undefined,
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
});
},
});
return attachChannelToResult("discord", result);
},
sendPayload: async (ctx) =>
await sendDiscordOutboundPayload({
ctx,
fallbackAdapter: discordOutbound,
}),
...createAttachedChannelResultAdapter({
channel: "discord",
sendText: async ({ cfg, to, text, accountId, deps, replyToId, threadId, identity, silent }) => {
sendText: async ({
cfg,
to,
text,
accountId,
deps,
replyToId,
threadId,
identity,
silent,
formatting,
}) => {
if (!silent) {
const webhookResult = await maybeSendDiscordWebhookText({
cfg,
@@ -283,12 +152,18 @@ export const discordOutbound: ChannelOutboundAdapter = {
const send =
resolveOutboundSendDep<DiscordSendFn>(deps, "discord") ??
(await loadDiscordSendRuntime()).sendMessageDiscord;
return await send(resolveDiscordOutboundTarget({ to, threadId }), text, {
verbose: false,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
return await withDiscordDeliveryRetry({
cfg,
accountId,
fn: async () =>
await send(resolveDiscordOutboundTarget({ to, threadId }), text, {
verbose: false,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
cfg,
...resolveDiscordFormattingOptions({ formatting }),
}),
});
},
sendMedia: async ({
@@ -296,6 +171,8 @@ export const discordOutbound: ChannelOutboundAdapter = {
to,
text,
mediaUrl,
audioAsVoice,
mediaAccess,
mediaLocalRoots,
mediaReadFile,
accountId,
@@ -303,28 +180,102 @@ export const discordOutbound: ChannelOutboundAdapter = {
replyToId,
threadId,
silent,
formatting,
}) => {
const send =
resolveOutboundSendDep<DiscordSendFn>(deps, "discord") ??
(await loadDiscordSendRuntime()).sendMessageDiscord;
return await send(resolveDiscordOutboundTarget({ to, threadId }), text, {
verbose: false,
mediaUrl,
mediaLocalRoots,
mediaReadFile,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
const target = resolveDiscordOutboundTarget({ to, threadId });
const formattingOptions = resolveDiscordFormattingOptions({ formatting });
if (audioAsVoice && mediaUrl) {
const sendVoice =
resolveOutboundSendDep<DiscordVoiceSendFn>(deps, "discordVoice") ??
(await loadDiscordSendRuntime()).sendVoiceMessageDiscord;
return await withDiscordDeliveryRetry({
cfg,
accountId,
fn: async () =>
await sendVoice(target, mediaUrl, {
cfg,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
}),
});
}
if (text.trim() && mediaUrl && isLikelyDiscordVideoMedia(mediaUrl)) {
await withDiscordDeliveryRetry({
cfg,
accountId,
fn: async () =>
await send(target, text, {
verbose: false,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
cfg,
...formattingOptions,
}),
});
return await withDiscordDeliveryRetry({
cfg,
accountId,
fn: async () =>
await send(target, "", {
verbose: false,
mediaUrl,
mediaAccess,
mediaLocalRoots,
mediaReadFile,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
cfg,
...formattingOptions,
}),
});
}
return await withDiscordDeliveryRetry({
cfg,
accountId,
fn: async () =>
await send(target, text, {
verbose: false,
mediaUrl,
mediaAccess,
mediaLocalRoots,
mediaReadFile,
replyTo: replyToId ?? undefined,
accountId: accountId ?? undefined,
silent: silent ?? undefined,
cfg,
...formattingOptions,
}),
});
},
sendPoll: async ({ cfg, to, poll, accountId, threadId, silent }) =>
await (
await loadDiscordSendRuntime()
).sendPollDiscord(resolveDiscordOutboundTarget({ to, threadId }), poll, {
accountId: accountId ?? undefined,
silent: silent ?? undefined,
await withDiscordDeliveryRetry({
cfg,
accountId,
fn: async () =>
await (
await loadDiscordSendRuntime()
).sendPollDiscord(resolveDiscordOutboundTarget({ to, threadId }), poll, {
accountId: accountId ?? undefined,
silent: silent ?? undefined,
cfg,
}),
}),
}),
afterDeliverPayload: async ({ target }) => {
const threadId = normalizeOptionalStringifiedId(target.threadId);
if (!threadId) {
return;
}
const { getThreadBindingManager } = await loadDiscordThreadBindings();
const manager = getThreadBindingManager(target.accountId ?? undefined);
if (!manager?.getByThreadId(threadId)) {
return;
}
manager.touchThread({ threadId });
},
};

View File

@@ -32,7 +32,7 @@ function createDiscordHarness(params: OutboundPayloadHarnessParams) {
describe("Discord outbound payload contract", () => {
installChannelOutboundPayloadContractSuite({
channel: "discord",
chunking: { mode: "passthrough", longTextLength: 3000 },
chunking: { mode: "split", longTextLength: 3000, maxChunkLength: 2000 },
createHarness: createDiscordHarness,
});
});

View File

@@ -0,0 +1,241 @@
import {
attachChannelToResult,
type ChannelOutboundAdapter,
} from "openclaw/plugin-sdk/channel-send-result";
import {
resolvePayloadMediaUrls,
sendPayloadMediaSequenceOrFallback,
sendTextMediaPayload,
} from "openclaw/plugin-sdk/reply-payload";
import { readDiscordComponentSpec, type DiscordComponentMessageSpec } from "./components.js";
import { createDiscordPayloadSendContext } from "./outbound-send-context.js";
type DiscordComponentSendFn = typeof import("./send.components.js").sendDiscordComponentMessage;
type DiscordSharedInteractiveModule = typeof import("./shared-interactive.js");
let discordComponentSendPromise: Promise<DiscordComponentSendFn> | undefined;
let discordSharedInteractivePromise: Promise<DiscordSharedInteractiveModule> | undefined;
async function sendDiscordComponentMessageLazy(
...args: Parameters<DiscordComponentSendFn>
): ReturnType<DiscordComponentSendFn> {
discordComponentSendPromise ??= import("./send.components.js").then(
(module) => module.sendDiscordComponentMessage,
);
return await (
await discordComponentSendPromise
)(...args);
}
function loadDiscordSharedInteractive(): Promise<DiscordSharedInteractiveModule> {
discordSharedInteractivePromise ??= import("./shared-interactive.js");
return discordSharedInteractivePromise;
}
function hasApprovalChannelData(payload: { channelData?: unknown }): boolean {
const channelData = payload.channelData;
if (!channelData || typeof channelData !== "object" || Array.isArray(channelData)) {
return false;
}
return Boolean((channelData as { execApproval?: unknown }).execApproval);
}
function neutralizeDiscordApprovalMentions(value: string): string {
return value
.replace(/@everyone/gi, "@\u200beveryone")
.replace(/@here/gi, "@\u200bhere")
.replace(/<@/g, "<@\u200b")
.replace(/<#/g, "<#\u200b");
}
export function normalizeDiscordApprovalPayload<
T extends {
text?: string;
channelData?: unknown;
},
>(payload: T): T {
return hasApprovalChannelData(payload) && payload.text
? {
...payload,
text: neutralizeDiscordApprovalMentions(payload.text),
}
: payload;
}
export async function buildDiscordPresentationPayload(params: {
payload: Parameters<NonNullable<ChannelOutboundAdapter["renderPresentation"]>>[0]["payload"];
presentation: Parameters<
NonNullable<ChannelOutboundAdapter["renderPresentation"]>
>[0]["presentation"];
}): Promise<typeof params.payload | null> {
const componentSpec = (await loadDiscordSharedInteractive()).buildDiscordPresentationComponents(
params.presentation,
);
if (!componentSpec) {
return null;
}
return {
...params.payload,
channelData: {
...params.payload.channelData,
discord: {
...(params.payload.channelData?.discord as Record<string, unknown> | undefined),
presentationComponents: componentSpec,
},
},
};
}
function resolveDiscordComponentSpec(
payload: Parameters<NonNullable<ChannelOutboundAdapter["sendPayload"]>>[0]["payload"],
): Promise<DiscordComponentMessageSpec | undefined> {
const discordData = payload.channelData?.discord as
| { components?: unknown; presentationComponents?: DiscordComponentMessageSpec }
| undefined;
const rawComponentSpec =
discordData?.presentationComponents ?? readDiscordComponentSpec(discordData?.components);
if (rawComponentSpec) {
return Promise.resolve(
rawComponentSpec.text
? rawComponentSpec
: {
...rawComponentSpec,
text: payload.text?.trim() ? payload.text : undefined,
},
);
}
if (!payload.interactive) {
return Promise.resolve(undefined);
}
return loadDiscordSharedInteractive().then((module) => {
const interactiveSpec = module.buildDiscordInteractiveComponents(payload.interactive);
if (!interactiveSpec) {
return undefined;
}
return interactiveSpec.text
? interactiveSpec
: {
...interactiveSpec,
text: payload.text?.trim() ? payload.text : undefined,
};
});
}
export async function sendDiscordOutboundPayload(params: {
ctx: Parameters<NonNullable<ChannelOutboundAdapter["sendPayload"]>>[0];
fallbackAdapter: ChannelOutboundAdapter;
}): Promise<Awaited<ReturnType<NonNullable<ChannelOutboundAdapter["sendPayload"]>>>> {
const ctx = params.ctx;
const payload = normalizeDiscordApprovalPayload({
...ctx.payload,
text: ctx.payload.text ?? "",
});
const mediaUrls = resolvePayloadMediaUrls(payload);
const sendContext = await createDiscordPayloadSendContext(ctx);
if (payload.audioAsVoice && mediaUrls.length > 0) {
let lastResult = await sendContext.withRetry(
async () =>
await sendContext.sendVoice(sendContext.target, mediaUrls[0], {
cfg: ctx.cfg,
replyTo: sendContext.resolveReplyTo(),
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
}),
);
if (payload.text?.trim()) {
lastResult = await sendContext.withRetry(
async () =>
await sendContext.send(sendContext.target, payload.text, {
verbose: false,
replyTo: sendContext.resolveReplyTo(),
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
...sendContext.formatting,
}),
);
}
for (const mediaUrl of mediaUrls.slice(1)) {
lastResult = await sendContext.withRetry(
async () =>
await sendContext.send(sendContext.target, "", {
verbose: false,
mediaUrl,
mediaAccess: ctx.mediaAccess,
mediaLocalRoots: ctx.mediaLocalRoots,
mediaReadFile: ctx.mediaReadFile,
replyTo: sendContext.resolveReplyTo(),
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
...sendContext.formatting,
}),
);
}
return attachChannelToResult("discord", lastResult);
}
const componentSpec = await resolveDiscordComponentSpec(payload);
if (!componentSpec) {
return await sendTextMediaPayload({
channel: "discord",
ctx: {
...ctx,
payload,
},
adapter: params.fallbackAdapter,
});
}
const result = await sendPayloadMediaSequenceOrFallback({
text: payload.text ?? "",
mediaUrls,
fallbackResult: { messageId: "", channelId: sendContext.target },
sendNoMedia: async () =>
await sendContext.withRetry(
async () =>
await sendDiscordComponentMessageLazy(sendContext.target, componentSpec, {
replyTo: sendContext.resolveReplyTo(),
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
...sendContext.formatting,
}),
),
send: async ({ text, mediaUrl, isFirst }) => {
if (isFirst) {
return await sendContext.withRetry(
async () =>
await sendDiscordComponentMessageLazy(sendContext.target, componentSpec, {
mediaUrl,
mediaAccess: ctx.mediaAccess,
mediaLocalRoots: ctx.mediaLocalRoots,
mediaReadFile: ctx.mediaReadFile,
replyTo: sendContext.resolveReplyTo(),
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
...sendContext.formatting,
}),
);
}
return await sendContext.withRetry(
async () =>
await sendContext.send(sendContext.target, text, {
verbose: false,
mediaUrl,
mediaAccess: ctx.mediaAccess,
mediaLocalRoots: ctx.mediaLocalRoots,
mediaReadFile: ctx.mediaReadFile,
replyTo: sendContext.resolveReplyTo(),
accountId: ctx.accountId ?? undefined,
silent: ctx.silent ?? undefined,
cfg: ctx.cfg,
...sendContext.formatting,
}),
);
},
});
return attachChannelToResult("discord", result);
}

View File

@@ -0,0 +1,112 @@
import type { OpenClawConfig, ReplyToMode } from "openclaw/plugin-sdk/config-runtime";
import {
resolveOutboundSendDep,
type OutboundSendDeps,
} from "openclaw/plugin-sdk/outbound-runtime";
import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference";
import {
normalizeOptionalString,
normalizeOptionalStringifiedId,
} from "openclaw/plugin-sdk/text-runtime";
import { withDiscordDeliveryRetry } from "./delivery-retry.js";
type DiscordSendRuntime = typeof import("./send.js");
export type DiscordSendFn = DiscordSendRuntime["sendMessageDiscord"];
export type DiscordVoiceSendFn = DiscordSendRuntime["sendVoiceMessageDiscord"];
export type DiscordFormattingOptions = {
textLimit?: number;
maxLinesPerMessage?: number;
tableMode?: NonNullable<Parameters<DiscordSendFn>[2]>["tableMode"];
chunkMode?: NonNullable<Parameters<DiscordSendFn>[2]>["chunkMode"];
};
let discordSendRuntimePromise: Promise<DiscordSendRuntime> | undefined;
export async function loadDiscordSendRuntime(): Promise<DiscordSendRuntime> {
discordSendRuntimePromise ??= import("./send.js");
return await discordSendRuntimePromise;
}
export function resolveDiscordOutboundTarget(params: {
to: string;
threadId?: string | number | null;
}): string {
if (params.threadId == null) {
return params.to;
}
const threadId = normalizeOptionalStringifiedId(params.threadId) ?? "";
if (!threadId) {
return params.to;
}
return `channel:${threadId}`;
}
export function resolveDiscordFormattingOptions(ctx: {
formatting?: DiscordFormattingOptions;
}): DiscordFormattingOptions {
const formatting = ctx.formatting;
return {
textLimit: formatting?.textLimit,
maxLinesPerMessage: formatting?.maxLinesPerMessage,
tableMode: formatting?.tableMode,
chunkMode: formatting?.chunkMode,
};
}
export function createResolvedReplyToFanout(params: {
replyToId?: string | null;
replyToMode?: ReplyToMode;
}): () => string | undefined {
const replyToId = normalizeOptionalString(params.replyToId);
if (!replyToId) {
return () => undefined;
}
if (!params.replyToMode || !isSingleUseReplyToMode(params.replyToMode)) {
return () => replyToId;
}
let current: string | undefined = replyToId;
return () => {
const value = current;
current = undefined;
return value;
};
}
export async function createDiscordPayloadSendContext(ctx: {
cfg: OpenClawConfig;
to: string;
accountId?: string | null;
deps?: OutboundSendDeps;
replyToId?: string | null;
replyToMode?: ReplyToMode;
formatting?: DiscordFormattingOptions;
threadId?: string | number | null;
}): Promise<{
target: string;
formatting: DiscordFormattingOptions;
resolveReplyTo: () => string | undefined;
send: DiscordSendFn;
sendVoice: DiscordVoiceSendFn;
withRetry: <T>(fn: () => Promise<T>) => Promise<T>;
}> {
const runtime = await loadDiscordSendRuntime();
return {
target: resolveDiscordOutboundTarget({ to: ctx.to, threadId: ctx.threadId }),
formatting: resolveDiscordFormattingOptions(ctx),
resolveReplyTo: createResolvedReplyToFanout({
replyToId: ctx.replyToId,
replyToMode: ctx.replyToMode,
}),
send: resolveOutboundSendDep<DiscordSendFn>(ctx.deps, "discord") ?? runtime.sendMessageDiscord,
sendVoice:
resolveOutboundSendDep<DiscordVoiceSendFn>(ctx.deps, "discordVoice") ??
runtime.sendVoiceMessageDiscord,
withRetry: async (fn) =>
await withDiscordDeliveryRetry({
cfg: ctx.cfg,
accountId: ctx.accountId,
fn,
}),
};
}

View File

@@ -5,8 +5,13 @@ import {
type RequestClient,
} from "@buape/carbon";
import { ChannelType, Routes } from "discord-api-types/v10";
import { requireRuntimeConfig, type OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import {
requireRuntimeConfig,
type MarkdownTableMode,
type OpenClawConfig,
} from "openclaw/plugin-sdk/config-runtime";
import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime";
import type { ChunkMode } from "openclaw/plugin-sdk/reply-chunking";
import { resolveDiscordAccount } from "./accounts.js";
import { registerDiscordComponentEntries } from "./components-registry.js";
import {
@@ -157,6 +162,10 @@ type DiscordComponentSendOpts = {
mediaLocalRoots?: readonly string[];
mediaReadFile?: (filePath: string) => Promise<Buffer>;
filename?: string;
textLimit?: number;
maxLinesPerMessage?: number;
tableMode?: MarkdownTableMode;
chunkMode?: ChunkMode;
};
export function registerBuiltDiscordComponentMessage(params: {
@@ -260,6 +269,10 @@ export async function sendDiscordComponentMessage(
mediaAccess: opts.mediaAccess,
replyTo: opts.replyTo,
silent: opts.silent,
textLimit: opts.textLimit,
maxLinesPerMessage: opts.maxLinesPerMessage,
tableMode: opts.tableMode,
chunkMode: opts.chunkMode,
});
}

View File

@@ -3,14 +3,18 @@ import fs from "node:fs/promises";
import path from "node:path";
import { serializePayload, type MessagePayloadObject, type RequestClient } from "@buape/carbon";
import { ChannelType, Routes } from "discord-api-types/v10";
import { requireRuntimeConfig, type OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import {
requireRuntimeConfig,
type MarkdownTableMode,
type OpenClawConfig,
} from "openclaw/plugin-sdk/config-runtime";
import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/config-runtime";
import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime";
import { maxBytesForKind } from "openclaw/plugin-sdk/media-runtime";
import { extensionForMime } from "openclaw/plugin-sdk/media-runtime";
import { unlinkIfExists } from "openclaw/plugin-sdk/media-runtime";
import type { PollInput } from "openclaw/plugin-sdk/media-runtime";
import { resolveChunkMode } from "openclaw/plugin-sdk/reply-chunking";
import { resolveChunkMode, type ChunkMode } from "openclaw/plugin-sdk/reply-chunking";
import type { RetryConfig } from "openclaw/plugin-sdk/retry-runtime";
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
import { convertMarkdownTables, normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
@@ -60,6 +64,10 @@ type DiscordSendOpts = {
rest?: RequestClient;
replyTo?: string;
retry?: RetryConfig;
textLimit?: number;
maxLinesPerMessage?: number;
tableMode?: MarkdownTableMode;
chunkMode?: ChunkMode;
components?: DiscordSendComponents;
embeds?: DiscordSendEmbeds;
silent?: boolean;
@@ -81,6 +89,7 @@ async function sendDiscordThreadTextChunks(params: {
request: DiscordClientRequest;
maxLinesPerMessage?: number;
chunkMode: ReturnType<typeof resolveChunkMode>;
maxChars?: number;
silent?: boolean;
}): Promise<void> {
for (const chunk of params.chunks) {
@@ -95,6 +104,7 @@ async function sendDiscordThreadTextChunks(params: {
undefined,
params.chunkMode,
params.silent,
params.maxChars,
);
}
}
@@ -150,12 +160,18 @@ export async function sendMessageDiscord(
channel: "discord",
accountId: accountInfo.accountId,
});
const chunkMode = resolveChunkMode(cfg, "discord", accountInfo.accountId);
const effectiveTableMode = opts.tableMode ?? tableMode;
const chunkMode = opts.chunkMode ?? resolveChunkMode(cfg, "discord", accountInfo.accountId);
const maxLinesPerMessage = opts.maxLinesPerMessage ?? accountInfo.config.maxLinesPerMessage;
const textLimit =
typeof opts.textLimit === "number" && Number.isFinite(opts.textLimit)
? Math.max(1, Math.min(Math.floor(opts.textLimit), 2000))
: undefined;
const mediaMaxBytes =
typeof accountInfo.config.mediaMaxMb === "number"
? accountInfo.config.mediaMaxMb * 1024 * 1024
: DEFAULT_DISCORD_MEDIA_MAX_MB * 1024 * 1024;
const textWithTables = convertMarkdownTables(text ?? "", tableMode);
const textWithTables = convertMarkdownTables(text ?? "", effectiveTableMode);
const textWithMentions = rewriteDiscordKnownMentions(textWithTables, {
accountId: accountInfo.accountId,
});
@@ -169,8 +185,9 @@ export async function sendMessageDiscord(
if (isForumLikeType(channelType)) {
const threadName = deriveForumThreadName(textWithTables);
const chunks = buildDiscordTextChunks(textWithMentions, {
maxLinesPerMessage: accountInfo.config.maxLinesPerMessage,
maxLinesPerMessage,
chunkMode,
maxChars: textLimit,
});
const starterContent = chunks[0]?.trim() ? chunks[0] : threadName;
const starterComponents = resolveDiscordSendComponents({
@@ -227,19 +244,21 @@ export async function sendMessageDiscord(
mediaMaxBytes,
undefined,
request,
accountInfo.config.maxLinesPerMessage,
maxLinesPerMessage,
undefined,
undefined,
chunkMode,
opts.silent,
textLimit,
);
await sendDiscordThreadTextChunks({
rest,
threadId,
chunks: afterMediaChunks,
request,
maxLinesPerMessage: accountInfo.config.maxLinesPerMessage,
maxLinesPerMessage,
chunkMode,
maxChars: textLimit,
silent: opts.silent,
});
} else {
@@ -248,8 +267,9 @@ export async function sendMessageDiscord(
threadId,
chunks: remainingChunks,
request,
maxLinesPerMessage: accountInfo.config.maxLinesPerMessage,
maxLinesPerMessage,
chunkMode,
maxChars: textLimit,
silent: opts.silent,
});
}
@@ -291,11 +311,12 @@ export async function sendMessageDiscord(
mediaMaxBytes,
opts.replyTo,
request,
accountInfo.config.maxLinesPerMessage,
maxLinesPerMessage,
opts.components,
opts.embeds,
chunkMode,
opts.silent,
textLimit,
);
} else {
result = await sendDiscordText(
@@ -304,11 +325,12 @@ export async function sendMessageDiscord(
textWithMentions,
opts.replyTo,
request,
accountInfo.config.maxLinesPerMessage,
maxLinesPerMessage,
opts.components,
opts.embeds,
chunkMode,
opts.silent,
textLimit,
);
}
} catch (err) {

View File

@@ -357,13 +357,14 @@ async function sendDiscordText(
embeds?: DiscordSendEmbeds,
chunkMode?: ChunkMode,
silent?: boolean,
maxChars?: number,
) {
if (!text.trim()) {
throw new Error("Message must be non-empty for Discord sends");
}
const messageReference = replyTo ? { message_id: replyTo, fail_if_not_exists: false } : undefined;
const flags = silent ? SUPPRESS_NOTIFICATIONS_FLAG : undefined;
const chunks = buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode });
const chunks = buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode, maxChars });
const sendChunk = async (chunk: string, isFirst: boolean) => {
const chunkComponents = resolveDiscordSendComponents({
components,
@@ -418,6 +419,7 @@ async function sendDiscordMedia(
embeds?: DiscordSendEmbeds,
chunkMode?: ChunkMode,
silent?: boolean,
maxChars?: number,
) {
const media = await loadWebMedia(
mediaUrl,
@@ -429,7 +431,9 @@ async function sendDiscordMedia(
media.fileName ||
(media.contentType ? `upload${extensionForMime(media.contentType) ?? ""}` : "") ||
"upload";
const chunks = text ? buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode }) : [];
const chunks = text
? buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode, maxChars })
: [];
const caption = chunks[0] ?? "";
const messageReference = replyTo ? { message_id: replyTo, fail_if_not_exists: false } : undefined;
const flags = silent ? SUPPRESS_NOTIFICATIONS_FLAG : undefined;
@@ -477,6 +481,7 @@ async function sendDiscordMedia(
undefined,
chunkMode,
silent,
maxChars,
);
}
return res;

View File

@@ -1,6 +1,8 @@
import type { ReplyPayload } from "../../auto-reply/reply-payload.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import type { OutboundDeliveryResult } from "../../infra/outbound/deliver-types.js";
import type { OutboundDeliveryFormattingOptions } from "../../infra/outbound/formatting.js";
import type { OutboundIdentity } from "../../infra/outbound/identity-types.js";
import type { OutboundSendDeps } from "../../infra/outbound/send-deps.js";
import type { MessagePresentation, ReplyPayloadDeliveryPin } from "../../interactive/payload.js";
@@ -24,6 +26,9 @@ export type ChannelOutboundContext = {
/** Send image as document to avoid Telegram compression. */
forceDocument?: boolean;
replyToId?: string | null;
replyToIdSource?: "explicit" | "implicit";
replyToMode?: ReplyToMode;
formatting?: OutboundDeliveryFormattingOptions;
threadId?: string | number | null;
accountId?: string | null;
identity?: OutboundIdentity;
@@ -63,9 +68,13 @@ export type ChannelOutboundFormattedContext = ChannelOutboundContext & {
abortSignal?: AbortSignal;
};
export type ChannelOutboundChunkContext = {
formatting?: OutboundDeliveryFormattingOptions;
};
export type ChannelOutboundAdapter = {
deliveryMode: "direct" | "gateway" | "hybrid";
chunker?: ((text: string, limit: number) => string[]) | null;
chunker?: ((text: string, limit: number, ctx?: ChannelOutboundChunkContext) => string[]) | null;
chunkerMode?: "text" | "markdown";
textChunkLimit?: number;
sanitizeText?: (params: { text: string; payload: ReplyPayload }) => string;
@@ -91,6 +100,12 @@ export type ChannelOutboundAdapter = {
payload: ReplyPayload;
hint?: ChannelOutboundPayloadHint;
}) => Promise<void> | void;
afterDeliverPayload?: (params: {
cfg: OpenClawConfig;
target: ChannelOutboundTargetRef;
payload: ReplyPayload;
results: readonly OutboundDeliveryResult[];
}) => Promise<void> | void;
presentationCapabilities?: ChannelPresentationCapabilities;
deliveryCapabilities?: ChannelDeliveryCapabilities;
renderPresentation?: (params: {

View File

@@ -18,6 +18,7 @@ import type { ChannelRuntimeSurface } from "./channel-runtime-surface.types.js";
import type { ConfigWriteTarget } from "./config-writes.js";
export type {
ChannelOutboundAdapter,
ChannelOutboundChunkContext,
ChannelOutboundContext,
ChannelOutboundFormattedContext,
ChannelOutboundPayloadContext,

View File

@@ -33,6 +33,7 @@ export type {
ChannelLogoutContext,
ChannelLogoutResult,
ChannelOutboundAdapter,
ChannelOutboundChunkContext,
ChannelOutboundContext,
ChannelOutboundPayloadHint,
ChannelOutboundTargetRef,

View File

@@ -448,6 +448,213 @@ describe("deliverOutboundPayloads", () => {
expect(results.map((entry) => entry.messageId)).toEqual(["ab", "cd"]);
});
it("uses replyToId only on the first low-level send for single-use reply modes", async () => {
const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({
channel: "matrix" as const,
messageId: text,
roomId: "!room",
}));
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
textChunkLimit: 2,
chunker: (text, limit) => {
const chunks: string[] = [];
for (let i = 0; i < text.length; i += limit) {
chunks.push(text.slice(i, i + limit));
}
return chunks;
},
sendText,
},
}),
},
]),
);
await deliverOutboundPayloads({
cfg: { channels: { matrix: { textChunkLimit: 2 } } } as OpenClawConfig,
channel: "matrix",
to: "!room",
payloads: [{ text: "abcd" }],
replyToId: "777",
replyToMode: "first",
});
expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual(["777", undefined]);
});
it("suppresses fallback replyToId when replyToMode is off but preserves explicit payload replies", async () => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",
);
const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({
channel: "matrix" as const,
messageId: text,
roomId: "!room",
}));
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText,
},
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "fallback" }, { text: "explicit", replyToId: "payload-reply" }],
replyToId: "fallback-reply",
replyToMode: "off",
});
expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual([
undefined,
"payload-reply",
]);
expect(
hookMocks.runner.runMessageSending.mock.calls.map(
([event]) => (event as { replyToId?: string }).replyToId,
),
).toEqual([undefined, "payload-reply"]);
});
it("does not let explicit payload replies consume the implicit single-use reply slot", async () => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",
);
const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({
channel: "matrix" as const,
messageId: text,
roomId: "!room",
}));
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText,
},
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "explicit", replyToId: "payload-reply" }, { text: "fallback" }],
replyToId: "fallback-reply",
replyToMode: "first",
});
expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual([
"payload-reply",
"fallback-reply",
]);
expect(
hookMocks.runner.runMessageSending.mock.calls.map(
([event]) => (event as { replyToId?: string }).replyToId,
),
).toEqual(["payload-reply", "fallback-reply"]);
});
it("skips text-only payloads blanked by message_sending hooks", async () => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "message_sending",
);
hookMocks.runner.runMessageSending.mockResolvedValue({ content: " " });
const sendText = vi.fn().mockResolvedValue({
channel: "matrix" as const,
messageId: "should-not-send",
roomId: "!room",
});
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText,
},
}),
},
]),
);
const results = await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "redact me" }],
});
expect(results).toEqual([]);
expect(sendText).not.toHaveBeenCalled();
});
it("runs adapter after-delivery hooks with the payload delivery results", async () => {
const afterDeliverPayload = vi.fn();
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
sendText: async ({ text }) => ({
channel: "matrix" as const,
messageId: text,
}),
afterDeliverPayload,
},
}),
},
]),
);
await deliverOutboundPayloads({
cfg: {},
channel: "matrix",
to: "!room",
payloads: [{ text: "hello" }],
});
expect(afterDeliverPayload).toHaveBeenCalledWith(
expect.objectContaining({
target: expect.objectContaining({ channel: "matrix", to: "!room" }),
payload: expect.objectContaining({ text: "hello" }),
results: [{ channel: "matrix", messageId: "hello" }],
}),
);
});
it("uses adapter-provided formatted senders and scoped media roots when available", async () => {
const sendText = vi.fn(async ({ text }: { text: string }) => ({
channel: "line" as const,
@@ -681,6 +888,96 @@ describe("deliverOutboundPayloads", () => {
);
});
it("lets explicit formatting options override configured chunking", async () => {
const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({
channel: "matrix" as const,
messageId: text,
roomId: "!room",
}));
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
chunker: (text, limit) => {
const chunks: string[] = [];
for (let i = 0; i < text.length; i += limit) {
chunks.push(text.slice(i, i + limit));
}
return chunks;
},
textChunkLimit: 4000,
sendText,
},
}),
},
]),
);
await deliverOutboundPayloads({
cfg: { channels: { matrix: { textChunkLimit: 4000 } } } as OpenClawConfig,
channel: "matrix",
to: "!room",
payloads: [{ text: "abcd" }],
formatting: { textLimit: 2, chunkMode: "length" },
});
expect(sendText.mock.calls.map((call) => call[0]?.text)).toEqual(["ab", "cd"]);
});
it("passes formatting options to adapter chunkers before consuming single-use replies", async () => {
const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({
channel: "matrix" as const,
messageId: text,
roomId: "!room",
}));
setActivePluginRegistry(
createTestRegistry([
{
pluginId: "matrix",
source: "test",
plugin: createOutboundTestPlugin({
id: "matrix",
outbound: {
deliveryMode: "direct",
chunker: (text, _limit, ctx) =>
text.split("\n").reduce<string[]>((chunks, line) => {
const maxLines = ctx?.formatting?.maxLinesPerMessage;
if (maxLines === 1) {
chunks.push(line);
return chunks;
}
chunks[chunks.length - 1] = chunks.length
? `${chunks[chunks.length - 1]}\n${line}`
: line;
return chunks;
}, []),
textChunkLimit: 4000,
sendText,
},
}),
},
]),
);
await deliverOutboundPayloads({
cfg: { channels: { matrix: { textChunkLimit: 4000 } } } as OpenClawConfig,
channel: "matrix",
to: "!room",
payloads: [{ text: "line one\nline two" }],
replyToId: "reply-1",
replyToMode: "first",
formatting: { maxLinesPerMessage: 1 },
});
expect(sendText.mock.calls.map((call) => call[0]?.text)).toEqual(["line one", "line two"]);
expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual(["reply-1", undefined]);
});
it("drops text payloads after adapter sanitization removes all content", async () => {
const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" });
const results = await deliverMatrixPayload({

View File

@@ -14,6 +14,7 @@ import type {
ChannelOutboundTargetRef,
} from "../../channels/plugins/types.adapters.js";
import { resolveMirroredTranscriptText } from "../../config/sessions/transcript-mirror.js";
import type { ReplyToMode } from "../../config/types.js";
import type { OpenClawConfig } from "../../config/types.openclaw.js";
import { fireAndForgetHook } from "../../hooks/fire-and-forget.js";
import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js";
@@ -42,6 +43,7 @@ import {
failDelivery,
withActiveDeliveryClaim,
} from "./delivery-queue.js";
import type { OutboundDeliveryFormattingOptions } from "./formatting.js";
import type { OutboundIdentity } from "./identity.js";
import type { DeliveryMirror } from "./mirror.js";
import {
@@ -51,6 +53,7 @@ import {
type NormalizedOutboundPayload,
type OutboundPayloadPlan,
} from "./payloads.js";
import { createReplyToDeliveryPolicy } from "./reply-policy.js";
import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js";
import type { OutboundSessionContext } from "./session-context.js";
import type { OutboundChannel } from "./targets.js";
@@ -79,7 +82,11 @@ async function loadChannelBootstrapRuntime() {
return await channelBootstrapRuntimePromise;
}
type Chunker = (text: string, limit: number) => string[];
type Chunker = (
text: string,
limit: number,
ctx?: { formatting?: OutboundDeliveryFormattingOptions },
) => string[];
type ChannelHandler = {
chunker: Chunker | null;
@@ -94,6 +101,11 @@ type ChannelHandler = {
messageId: string;
pin: ReplyPayloadDeliveryPin;
}) => Promise<void>;
afterDeliverPayload?: (params: {
target: ChannelOutboundTargetRef;
payload: ReplyPayload;
results: readonly OutboundDeliveryResult[];
}) => Promise<void>;
buildTargetRef: (overrides?: { threadId?: string | number | null }) => ChannelOutboundTargetRef;
shouldSkipPlainTextSanitization?: (payload: ReplyPayload) => boolean;
resolveEffectiveTextChunkLimit?: (fallbackLimit?: number) => number | undefined;
@@ -147,6 +159,8 @@ type ChannelHandlerParams = {
to: string;
accountId?: string;
replyToId?: string | null;
replyToMode?: ReplyToMode;
formatting?: OutboundDeliveryFormattingOptions;
threadId?: string | number | null;
identity?: OutboundIdentity;
deps?: OutboundSendDeps;
@@ -193,8 +207,8 @@ function createPluginHandler(
audioAsVoice?: boolean;
}): Omit<ChannelOutboundContext, "text" | "mediaUrl"> => ({
...baseCtx,
replyToId: overrides?.replyToId ?? baseCtx.replyToId,
threadId: overrides?.threadId ?? baseCtx.threadId,
replyToId: overrides && "replyToId" in overrides ? overrides.replyToId : baseCtx.replyToId,
threadId: overrides && "threadId" in overrides ? overrides.threadId : baseCtx.threadId,
audioAsVoice: overrides?.audioAsVoice,
});
const buildTargetRef = (overrides?: {
@@ -244,6 +258,15 @@ function createPluginHandler(
pin,
})
: undefined,
afterDeliverPayload: outbound.afterDeliverPayload
? async ({ target, payload, results }) =>
outbound.afterDeliverPayload!({
cfg: params.cfg,
target,
payload,
results,
})
: undefined,
shouldSkipPlainTextSanitization: outbound.shouldSkipPlainTextSanitization
? (payload) => outbound.shouldSkipPlainTextSanitization!({ payload })
: undefined,
@@ -309,6 +332,8 @@ function createChannelOutboundContextBase(
to: params.to,
accountId: params.accountId,
replyToId: params.replyToId,
replyToMode: params.replyToMode,
formatting: params.formatting,
threadId: params.threadId,
identity: params.identity,
gifPlayback: params.gifPlayback,
@@ -331,9 +356,12 @@ type DeliverOutboundPayloadsCoreParams = {
accountId?: string;
payloads: ReplyPayload[];
replyToId?: string | null;
replyToMode?: ReplyToMode;
formatting?: OutboundDeliveryFormattingOptions;
threadId?: string | number | null;
identity?: OutboundIdentity;
deps?: OutboundSendDeps;
mediaAccess?: OutboundMediaAccess;
gifPlayback?: boolean;
forceDocument?: boolean;
abortSignal?: AbortSignal;
@@ -481,6 +509,30 @@ async function maybePinDeliveredMessage(params: {
}
}
async function maybeNotifyAfterDeliveredPayload(params: {
handler: ChannelHandler;
payload: ReplyPayload;
target: ChannelOutboundTargetRef;
results: readonly OutboundDeliveryResult[];
}): Promise<void> {
if (!params.handler.afterDeliverPayload || params.results.length === 0) {
return;
}
try {
await params.handler.afterDeliverPayload({
target: params.target,
payload: params.payload,
results: params.results,
});
} catch (err) {
log.warn("Plugin outbound adapter after-delivery hook failed.", {
channel: params.target.channel,
to: params.target.to,
error: formatErrorMessage(err),
});
}
}
async function renderPresentationForDelivery(
handler: ChannelHandler,
payload: ReplyPayload,
@@ -591,7 +643,7 @@ async function applyMessageSendingHook(params: {
{
to: params.to,
content: params.payloadSummary.text,
replyToId: params.payload.replyToId ?? params.replyToId ?? undefined,
replyToId: params.replyToId ?? undefined,
threadId: params.threadId ?? undefined,
metadata: {
channel: params.channel,
@@ -656,6 +708,8 @@ export async function deliverOutboundPayloads(
payloads,
threadId: params.threadId,
replyToId: params.replyToId,
replyToMode: params.replyToMode,
formatting: params.formatting,
bestEffort: params.bestEffort,
gifPlayback: params.gifPlayback,
forceDocument: params.forceDocument,
@@ -742,6 +796,7 @@ async function deliverOutboundPayloadsCore(
cfg,
agentId: params.session?.agentId ?? params.mirror?.agentId,
mediaSources,
mediaAccess: params.mediaAccess,
sessionKey: params.session?.key,
messageProvider: params.session?.key ? undefined : channel,
accountId: params.session?.requesterAccountId ?? accountId,
@@ -750,7 +805,7 @@ async function deliverOutboundPayloadsCore(
requesterSenderUsername: params.session?.requesterSenderUsername,
requesterSenderE164: params.session?.requesterSenderE164,
})
: {};
: (params.mediaAccess ?? {});
const results: OutboundDeliveryResult[] = [];
const handler = await createChannelHandler({
cfg,
@@ -759,6 +814,8 @@ async function deliverOutboundPayloadsCore(
deps,
accountId,
replyToId: params.replyToId,
replyToMode: params.replyToMode,
formatting: params.formatting,
threadId: params.threadId,
identity: params.identity,
gifPlayback: params.gifPlayback,
@@ -772,22 +829,39 @@ async function deliverOutboundPayloadsCore(
fallbackLimit: handler.textChunkLimit,
})
: undefined;
const textLimit = handler.resolveEffectiveTextChunkLimit
? handler.resolveEffectiveTextChunkLimit(configuredTextLimit)
: configuredTextLimit;
const chunkMode = handler.chunker ? resolveChunkMode(cfg, channel, accountId) : "length";
const textLimit =
params.formatting?.textLimit ??
(handler.resolveEffectiveTextChunkLimit
? handler.resolveEffectiveTextChunkLimit(configuredTextLimit)
: configuredTextLimit);
const chunkMode = handler.chunker
? (params.formatting?.chunkMode ?? resolveChunkMode(cfg, channel, accountId))
: "length";
const { resolveCurrentReplyTo, applyReplyToConsumption } = createReplyToDeliveryPolicy({
replyToId: params.replyToId,
replyToMode: params.replyToMode,
});
const chunkTextForDelivery = (text: string, limit: number): string[] =>
params.formatting
? handler.chunker!(text, limit, { formatting: params.formatting })
: handler.chunker!(text, limit);
const sendTextChunks = async (
text: string,
overrides?: {
replyToId?: string | null;
replyToIdSource?: "explicit" | "implicit";
threadId?: string | number | null;
audioAsVoice?: boolean;
},
) => {
const consumeReplyTo = <T extends NonNullable<typeof overrides>>(value: T): T =>
applyReplyToConsumption(value, {
consumeImplicitReply: value.replyToIdSource === "implicit",
});
throwIfAborted(abortSignal);
if (!handler.chunker || textLimit === undefined) {
results.push(await handler.sendText(text, overrides));
results.push(await handler.sendText(text, consumeReplyTo(overrides ?? {})));
return;
}
if (chunkMode === "newline") {
@@ -801,21 +875,21 @@ async function deliverOutboundPayloadsCore(
blockChunks.push(text);
}
for (const blockChunk of blockChunks) {
const chunks = handler.chunker(blockChunk, textLimit);
const chunks = chunkTextForDelivery(blockChunk, textLimit);
if (!chunks.length && blockChunk) {
chunks.push(blockChunk);
}
for (const chunk of chunks) {
throwIfAborted(abortSignal);
results.push(await handler.sendText(chunk, overrides));
results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {})));
}
}
return;
}
const chunks = handler.chunker(text, textLimit);
const chunks = chunkTextForDelivery(text, textLimit);
for (const chunk of chunks) {
throwIfAborted(abortSignal);
results.push(await handler.sendText(chunk, overrides));
results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {})));
}
};
const normalizedPayloads = normalizePayloadsForChannelDelivery(outboundPayloadPlan, handler);
@@ -857,32 +931,51 @@ async function deliverOutboundPayloadsCore(
to,
channel,
accountId,
replyToId: params.replyToId,
replyToId: resolveCurrentReplyTo(payload).replyToId,
threadId: params.threadId,
});
if (hookResult.cancelled) {
continue;
}
const effectivePayload = await renderPresentationForDelivery(handler, hookResult.payload);
const renderedPayload = await renderPresentationForDelivery(handler, hookResult.payload);
const normalizedEffectivePayload = handler.normalizePayload
? handler.normalizePayload(renderedPayload)
: renderedPayload;
const effectivePayload = normalizedEffectivePayload
? normalizeEmptyPayloadForDelivery(normalizedEffectivePayload)
: null;
if (!effectivePayload) {
continue;
}
payloadSummary = buildPayloadSummary(effectivePayload);
params.onPayload?.(payloadSummary);
const replyToResolution = resolveCurrentReplyTo(effectivePayload);
const sendOverrides = {
replyToId: effectivePayload.replyToId ?? params.replyToId ?? undefined,
replyToId: replyToResolution.replyToId,
replyToIdSource: replyToResolution.source,
threadId: params.threadId ?? undefined,
audioAsVoice: effectivePayload.audioAsVoice === true ? true : undefined,
forceDocument: params.forceDocument,
};
const applySendReplyToConsumption = <T extends typeof sendOverrides>(overrides: T): T =>
applyReplyToConsumption(overrides, {
consumeImplicitReply: replyToResolution.source === "implicit",
});
const deliveryTarget = handler.buildTargetRef({ threadId: sendOverrides.threadId });
if (
handler.sendPayload &&
hasReplyPayloadContent({
(hasReplyPayloadContent({
presentation: effectivePayload.presentation,
interactive: effectivePayload.interactive,
channelData: effectivePayload.channelData,
})
}) ||
effectivePayload.audioAsVoice === true)
) {
const delivery = await handler.sendPayload(effectivePayload, sendOverrides);
const delivery = await handler.sendPayload(
effectivePayload,
applySendReplyToConsumption(sendOverrides),
);
results.push(delivery);
await maybePinDeliveredMessage({
handler,
@@ -890,6 +983,12 @@ async function deliverOutboundPayloadsCore(
target: deliveryTarget,
messageId: delivery.messageId,
});
await maybeNotifyAfterDeliveredPayload({
handler,
payload: effectivePayload,
target: deliveryTarget,
results: [delivery],
});
emitMessageSent({
success: true,
content: payloadSummary.text,
@@ -900,7 +999,12 @@ async function deliverOutboundPayloadsCore(
if (payloadSummary.mediaUrls.length === 0) {
const beforeCount = results.length;
if (handler.sendFormattedText) {
results.push(...(await handler.sendFormattedText(payloadSummary.text, sendOverrides)));
results.push(
...(await handler.sendFormattedText(
payloadSummary.text,
applySendReplyToConsumption(sendOverrides),
)),
);
} else {
await sendTextChunks(payloadSummary.text, sendOverrides);
}
@@ -913,6 +1017,12 @@ async function deliverOutboundPayloadsCore(
target: deliveryTarget,
messageId: pinMessageId,
});
await maybeNotifyAfterDeliveredPayload({
handler,
payload: effectivePayload,
target: deliveryTarget,
results: deliveredResults,
});
emitMessageSent({
success: results.length > beforeCount,
content: payloadSummary.text,
@@ -947,6 +1057,12 @@ async function deliverOutboundPayloadsCore(
target: deliveryTarget,
messageId: pinMessageId,
});
await maybeNotifyAfterDeliveredPayload({
handler,
payload: effectivePayload,
target: deliveryTarget,
results: deliveredResults,
});
emitMessageSent({
success: results.length > beforeCount,
content: payloadSummary.text,
@@ -957,6 +1073,7 @@ async function deliverOutboundPayloadsCore(
let firstMessageId: string | undefined;
let lastMessageId: string | undefined;
const beforeCount = results.length;
await sendMediaWithLeadingCaption({
mediaUrls: payloadSummary.mediaUrls,
caption: payloadSummary.text,
@@ -966,14 +1083,18 @@ async function deliverOutboundPayloadsCore(
const delivery = await handler.sendFormattedMedia(
caption ?? "",
mediaUrl,
sendOverrides,
applySendReplyToConsumption(sendOverrides),
);
results.push(delivery);
firstMessageId ??= delivery.messageId;
lastMessageId = delivery.messageId;
return;
}
const delivery = await handler.sendMedia(caption ?? "", mediaUrl, sendOverrides);
const delivery = await handler.sendMedia(
caption ?? "",
mediaUrl,
applySendReplyToConsumption(sendOverrides),
);
results.push(delivery);
firstMessageId ??= delivery.messageId;
lastMessageId = delivery.messageId;
@@ -985,6 +1106,12 @@ async function deliverOutboundPayloadsCore(
target: deliveryTarget,
messageId: firstMessageId,
});
await maybeNotifyAfterDeliveredPayload({
handler,
payload: effectivePayload,
target: deliveryTarget,
results: results.slice(beforeCount),
});
emitMessageSent({
success: true,
content: payloadSummary.text,

View File

@@ -118,6 +118,8 @@ function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig)
payloads: entry.payloads,
threadId: entry.threadId,
replyToId: entry.replyToId,
replyToMode: entry.replyToMode,
formatting: entry.formatting,
bestEffort: entry.bestEffort,
gifPlayback: entry.gifPlayback,
forceDocument: entry.forceDocument,

View File

@@ -2,7 +2,9 @@ import fs from "node:fs";
import path from "node:path";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { resolveStateDir } from "../../config/paths.js";
import type { ReplyToMode } from "../../config/types.js";
import { generateSecureUuid } from "../secure-random.js";
import type { OutboundDeliveryFormattingOptions } from "./formatting.js";
import type { OutboundMirror } from "./mirror.js";
import type { OutboundSessionContext } from "./session-context.js";
import type { OutboundChannel } from "./targets.js";
@@ -22,6 +24,8 @@ export type QueuedDeliveryPayload = {
payloads: ReplyPayload[];
threadId?: string | number | null;
replyToId?: string | null;
replyToMode?: ReplyToMode;
formatting?: OutboundDeliveryFormattingOptions;
bestEffort?: boolean;
gifPlayback?: boolean;
forceDocument?: boolean;
@@ -142,6 +146,8 @@ export async function enqueueDelivery(
payloads: params.payloads,
threadId: params.threadId,
replyToId: params.replyToId,
replyToMode: params.replyToMode,
formatting: params.formatting,
bestEffort: params.bestEffort,
gifPlayback: params.gifPlayback,
forceDocument: params.forceDocument,

View File

@@ -156,6 +156,14 @@ describe("delivery-queue recovery", () => {
channel: "demo-channel-a",
to: "+1",
payloads: [{ text: "a" }],
replyToId: "root-message",
replyToMode: "first",
formatting: {
textLimit: 1234,
maxLinesPerMessage: 7,
tableMode: "off",
chunkMode: "newline",
},
bestEffort: true,
gifPlayback: true,
silent: true,
@@ -186,6 +194,14 @@ describe("delivery-queue recovery", () => {
bestEffort: true,
gifPlayback: true,
silent: true,
replyToId: "root-message",
replyToMode: "first",
formatting: {
textLimit: 1234,
maxLinesPerMessage: 7,
tableMode: "off",
chunkMode: "newline",
},
gatewayClientScopes: ["operator.write"],
mirror: {
sessionKey: "agent:main:main",

View File

@@ -0,0 +1,9 @@
import type { ChunkMode } from "../../auto-reply/chunk.js";
import type { MarkdownTableMode } from "../../config/types.js";
export type OutboundDeliveryFormattingOptions = {
textLimit?: number;
maxLinesPerMessage?: number;
tableMode?: MarkdownTableMode;
chunkMode?: ChunkMode;
};

View File

@@ -0,0 +1,57 @@
import { isSingleUseReplyToMode } from "../../auto-reply/reply/reply-reference.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import type { ReplyToMode } from "../../config/types.js";
export type ReplyToOverride = {
replyToId?: string | null;
replyToIdSource?: ReplyToResolution["source"];
};
export type ReplyToResolution = {
replyToId?: string;
source?: "explicit" | "implicit";
};
export function createReplyToDeliveryPolicy(params: {
replyToId?: string | null;
replyToMode?: ReplyToMode;
}): {
resolveCurrentReplyTo: (payload: ReplyPayload) => ReplyToResolution;
applyReplyToConsumption: <T extends ReplyToOverride>(
overrides: T,
options?: { consumeImplicitReply?: boolean },
) => T;
} {
const singleUseReplyTo = params.replyToMode ? isSingleUseReplyToMode(params.replyToMode) : false;
let replyToConsumed = false;
const resolveCurrentReplyTo = (payload: ReplyPayload): ReplyToResolution => {
if (payload.replyToId != null) {
return payload.replyToId ? { replyToId: payload.replyToId, source: "explicit" } : {};
}
const replyToId = (params.replyToMode === "off" ? undefined : params.replyToId) ?? undefined;
if (!replyToId) {
return {};
}
if (!singleUseReplyTo) {
return { replyToId, source: "implicit" };
}
return replyToConsumed ? {} : { replyToId, source: "implicit" };
};
const applyReplyToConsumption = <T extends ReplyToOverride>(
overrides: T,
options?: { consumeImplicitReply?: boolean },
): T => {
if (!options?.consumeImplicitReply || !overrides.replyToId || !singleUseReplyTo) {
return overrides;
}
if (replyToConsumed) {
return { ...overrides, replyToId: undefined };
}
replyToConsumed = true;
return overrides;
};
return { resolveCurrentReplyTo, applyReplyToConsumption };
}

View File

@@ -1,7 +1,17 @@
export { createRuntimeOutboundDelegates } from "../channels/plugins/runtime-forwarders.js";
export { resolveOutboundSendDep, type OutboundSendDeps } from "../infra/outbound/send-deps.js";
export { resolveAgentOutboundIdentity, type OutboundIdentity } from "../infra/outbound/identity.js";
export type { OutboundDeliveryFormattingOptions } from "../infra/outbound/formatting.js";
export {
deliverOutboundPayloads,
type DeliverOutboundPayloadsParams,
type OutboundDeliveryResult,
} from "../infra/outbound/deliver.js";
export { sanitizeForPlainText } from "../infra/outbound/sanitize-text.js";
export {
buildOutboundSessionContext,
type OutboundSessionContext,
} from "../infra/outbound/session-context.js";
export {
createOutboundPayloadPlan,
projectOutboundPayloadPlanForDelivery,

View File

@@ -13,6 +13,7 @@ import {
resolveOutboundMediaUrls,
resolveSendableOutboundReplyParts,
resolveTextChunksWithFallback,
sendTextMediaPayload,
sendMediaWithLeadingCaption,
sendPayloadWithChunkedTextAndMedia,
} from "./reply-payload.js";
@@ -89,6 +90,83 @@ describe("sendPayloadWithChunkedTextAndMedia", () => {
});
});
describe("sendTextMediaPayload", () => {
it("uses an implicit single-use reply only for the first text chunk", async () => {
const sendText = vi.fn(async ({ text }) => ({ channel: "test", messageId: text }));
await sendTextMediaPayload({
channel: "test",
ctx: {
cfg: {},
to: "target",
text: "",
payload: { text: "abcdef" },
replyToId: "reply-1",
replyToIdSource: "implicit",
replyToMode: "first",
},
adapter: {
textChunkLimit: 2,
chunker: (text) => ["ab", "cd", text.slice(4)],
sendText,
},
});
expect(sendText.mock.calls.map((call) => call[0].replyToId)).toEqual([
"reply-1",
undefined,
undefined,
]);
});
it("uses an implicit single-use reply only for the first media fallback send", async () => {
const sendMedia = vi.fn(async ({ mediaUrl }) => ({ channel: "test", messageId: mediaUrl }));
await sendTextMediaPayload({
channel: "test",
ctx: {
cfg: {},
to: "target",
text: "",
payload: { text: "caption", mediaUrls: ["https://example.com/1", "https://example.com/2"] },
replyToId: "reply-1",
replyToIdSource: "implicit",
replyToMode: "batched",
},
adapter: { sendMedia },
});
expect(sendMedia.mock.calls.map((call) => call[0].replyToId)).toEqual(["reply-1", undefined]);
});
it("keeps explicit reply tags independent from single-use implicit reply modes", async () => {
const sendText = vi.fn(async ({ text }) => ({ channel: "test", messageId: text }));
await sendTextMediaPayload({
channel: "test",
ctx: {
cfg: {},
to: "target",
text: "",
payload: { text: "abcd" },
replyToId: "explicit-reply",
replyToIdSource: "explicit",
replyToMode: "first",
},
adapter: {
textChunkLimit: 2,
chunker: () => ["ab", "cd"],
sendText,
},
});
expect(sendText.mock.calls.map((call) => call[0].replyToId)).toEqual([
"explicit-reply",
"explicit-reply",
]);
});
});
describe("normalizeOutboundReplyPayload", () => {
it("strips internal-only local media trust flags from loose payload objects", () => {
expect(

View File

@@ -1,4 +1,5 @@
import type { ReplyPayload as InternalReplyPayload } from "../auto-reply/reply-payload.js";
import { isSingleUseReplyToMode } from "../auto-reply/reply/reply-reference.js";
import type { ChannelOutboundAdapter } from "../channels/plugins/outbound.types.js";
import { normalizeLowercaseStringOrEmpty, readStringValue } from "../shared/string-coerce.js";
@@ -38,6 +39,26 @@ type SendPayloadAdapter = Pick<
const REASONING_PREFIX = "reasoning:";
function createSendPayloadReplyToFanout(ctx: SendPayloadContext): () => string | undefined {
const replyToId = ctx.replyToId ?? undefined;
if (!replyToId) {
return () => undefined;
}
const singleUse =
ctx.replyToIdSource !== "explicit" &&
ctx.replyToMode !== undefined &&
isSingleUseReplyToMode(ctx.replyToMode);
if (!singleUse) {
return () => replyToId;
}
let current: string | undefined = replyToId;
return () => {
const value = current;
current = undefined;
return value;
};
}
function trimLeadingMarkdownQuoteMarkers(text: string): string {
let candidate = text.trimStart();
while (candidate.startsWith(">")) {
@@ -289,6 +310,7 @@ export async function sendTextMediaPayload(params: {
if (!text && urls.length === 0) {
return { channel: params.channel, messageId: "" };
}
const nextReplyToId = createSendPayloadReplyToFanout(params.ctx);
if (urls.length > 0) {
const lastResult = await sendPayloadMediaSequence({
text,
@@ -298,15 +320,23 @@ export async function sendTextMediaPayload(params: {
...params.ctx,
text,
mediaUrl,
replyToId: nextReplyToId(),
}),
});
return lastResult ?? { channel: params.channel, messageId: "" };
}
const limit = params.adapter.textChunkLimit;
const chunks = limit && params.adapter.chunker ? params.adapter.chunker(text, limit) : [text];
const chunks =
limit && params.adapter.chunker
? params.adapter.chunker(text, limit, { formatting: params.ctx.formatting })
: [text];
let lastResult: Awaited<ReturnType<NonNullable<typeof params.adapter.sendText>>>;
for (const chunk of chunks) {
lastResult = await params.adapter.sendText!({ ...params.ctx, text: chunk });
lastResult = await params.adapter.sendText!({
...params.ctx,
text: chunk,
replyToId: nextReplyToId(),
});
}
return lastResult!;
}