From 5e640b93dae6a273286c66d93d20a82ae56ed533 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 25 Apr 2026 02:54:29 +0100 Subject: [PATCH] fix(discord): preserve outbound reply threading --- CHANGELOG.md | 1 + docs/plugins/sdk-channel-plugins.md | 8 + docs/plugins/sdk-migration.md | 2 +- docs/plugins/sdk-subpaths.md | 2 +- extensions/discord/src/channel.ts | 110 +- extensions/discord/src/delivery-retry.ts | 52 + .../src/monitor/reply-delivery.test.ts | 938 +++--------------- .../discord/src/monitor/reply-delivery.ts | 624 +++--------- .../src/outbound-adapter.test-harness.ts | 16 + .../discord/src/outbound-adapter.test.ts | 331 +++++- extensions/discord/src/outbound-adapter.ts | 329 +++--- .../src/outbound-payload.contract.test.ts | 2 +- extensions/discord/src/outbound-payload.ts | 241 +++++ .../discord/src/outbound-send-context.ts | 112 +++ extensions/discord/src/send.components.ts | 15 +- extensions/discord/src/send.outbound.ts | 42 +- extensions/discord/src/send.shared.ts | 9 +- src/channels/plugins/outbound.types.ts | 17 +- src/channels/plugins/types.adapters.ts | 1 + src/channels/plugins/types.ts | 1 + src/infra/outbound/deliver.test.ts | 297 ++++++ src/infra/outbound/deliver.ts | 173 +++- src/infra/outbound/delivery-queue-recovery.ts | 2 + src/infra/outbound/delivery-queue-storage.ts | 6 + .../outbound/delivery-queue.recovery.test.ts | 16 + src/infra/outbound/formatting.ts | 9 + src/infra/outbound/reply-policy.ts | 57 ++ src/plugin-sdk/outbound-runtime.ts | 10 + src/plugin-sdk/reply-payload.test.ts | 78 ++ src/plugin-sdk/reply-payload.ts | 34 +- 30 files changed, 1910 insertions(+), 1625 deletions(-) create mode 100644 extensions/discord/src/delivery-retry.ts create mode 100644 extensions/discord/src/outbound-payload.ts create mode 100644 extensions/discord/src/outbound-send-context.ts create mode 100644 src/infra/outbound/formatting.ts create mode 100644 src/infra/outbound/reply-policy.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b7f3a8dfad..604e0a710cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,7 @@ Docs: https://docs.openclaw.ai - Providers/Google: map `/think adaptive` to Gemini dynamic thinking instead of a fixed medium/high budget, using Gemini 3's provider default and Gemini 2.5's `thinkingBudget: -1`. Fixes #71316. Thanks @steipete. - Providers/MiniMax: keep M2.7 chat model metadata text-only so image tool requests route through `MiniMax-VL-01` instead of the Anthropic-compatible chat endpoint. Fixes #71296. Thanks @ilker-cevikkaya. - Discord/replies: run `message_sending` plugin hooks for Discord reply delivery, including DM targets, so plugins can transform or cancel outbound Discord replies consistently with other channels. Fixes #59350. (#71094) Thanks @wei840222. +- Discord/replies: preserve single-use native reply semantics across shared payload fallback, component, voice, and queued delivery paths, so explicit reply tags no longer consume implicit reply slots and chunked fallback sends reply only once. Thanks @steipete. - Control UI/commands: carry provider-owned thinking option ids/labels in session rows and defaults so fresh sessions show and accept dynamic modes such as `adaptive`, `xhigh`, and `max`. Fixes #71269. Thanks @Young-Khalil. - Image generation: make explicit `model=` overrides exact-only so failed `openai/gpt-image-2` requests no longer fall through to Gemini or other configured providers, and update `image_generate list` to mention OpenAI Codex OAuth as valid auth for `openai/gpt-image-2`. Fixes #71290 and #71231. Thanks @Young-Khalil and @steipete. - Providers/GitHub Copilot: keep the plugin stream wrapper from claiming transport selection before OpenClaw picks a boundary-aware stream path, avoiding Pi's stale fallback Copilot headers on normal model turns. Thanks @steipete. diff --git a/docs/plugins/sdk-channel-plugins.md b/docs/plugins/sdk-channel-plugins.md index b4939dd20b5..bc468b37922 100644 --- a/docs/plugins/sdk-channel-plugins.md +++ b/docs/plugins/sdk-channel-plugins.md @@ -466,6 +466,14 @@ should use `resolveInboundMentionDecision({ facts, policy })`. You can also pass raw adapter objects instead of the declarative options if you need full control. + + Raw outbound adapters may define a `chunker(text, limit, ctx)` function. + The optional `ctx.formatting` carries delivery-time formatting decisions + such as `maxLinesPerMessage`; apply it before sending so reply threading + and chunk boundaries are resolved once by shared outbound delivery. + Send contexts also include `replyToIdSource` (`implicit` or `explicit`) + when a native reply target was resolved, so payload helpers can preserve + explicit reply tags without consuming an implicit single-use reply slot. diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index ac700234903..fd91c9f996f 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -262,7 +262,7 @@ releases. | `plugin-sdk/inbound-reply-dispatch` | Inbound reply helpers | Shared record-and-dispatch helpers | | `plugin-sdk/messaging-targets` | Messaging target parsing | Target parsing/matching helpers | | `plugin-sdk/outbound-media` | Outbound media helpers | Shared outbound media loading | - | `plugin-sdk/outbound-runtime` | Outbound runtime helpers | Outbound identity/send delegate and payload planning helpers | + | `plugin-sdk/outbound-runtime` | Outbound runtime helpers | Outbound delivery, identity/send delegate, session, formatting, and payload planning helpers | | `plugin-sdk/thread-bindings-runtime` | Thread-binding helpers | Thread-binding lifecycle and adapter helpers | | `plugin-sdk/agent-media-payload` | Legacy media payload helpers | Agent media payload builder for legacy field layouts | | `plugin-sdk/channel-runtime` | Deprecated compatibility shim | Legacy channel runtime utilities only | diff --git a/docs/plugins/sdk-subpaths.md b/docs/plugins/sdk-subpaths.md index 6469d96512e..8726e1c3f55 100644 --- a/docs/plugins/sdk-subpaths.md +++ b/docs/plugins/sdk-subpaths.md @@ -50,7 +50,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview) | `plugin-sdk/inbound-reply-dispatch` | Shared inbound record-and-dispatch helpers | | `plugin-sdk/messaging-targets` | Target parsing/matching helpers | | `plugin-sdk/outbound-media` | Shared outbound media loading helpers | - | `plugin-sdk/outbound-runtime` | Outbound identity, send delegate, and payload planning helpers | + | `plugin-sdk/outbound-runtime` | Outbound delivery, identity, send delegate, session, formatting, and payload planning helpers | | `plugin-sdk/poll-runtime` | Narrow poll normalization helpers | | `plugin-sdk/thread-bindings-runtime` | Thread-binding lifecycle and adapter helpers | | `plugin-sdk/agent-media-payload` | Legacy agent media payload builder | diff --git a/extensions/discord/src/channel.ts b/extensions/discord/src/channel.ts index 1d510ab1638..3ccf4632f81 100644 --- a/extensions/discord/src/channel.ts +++ b/extensions/discord/src/channel.ts @@ -15,7 +15,6 @@ import { } from "openclaw/plugin-sdk/directory-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { createLazyRuntimeModule } from "openclaw/plugin-sdk/lazy-runtime"; -import { resolveOutboundSendDep } from "openclaw/plugin-sdk/outbound-runtime"; import { sleepWithAbort } from "openclaw/plugin-sdk/runtime-env"; import { createComputedAccountStatusAdapter, @@ -49,16 +48,12 @@ import { resolveDiscordGroupRequireMention, resolveDiscordGroupToolPolicy, } from "./group-policy.js"; -import { isLikelyDiscordVideoMedia } from "./media-detection.js"; import { setThreadBindingIdleTimeoutBySessionKey, setThreadBindingMaxAgeBySessionKey, } from "./monitor/thread-bindings.session-updates.js"; -import { - looksLikeDiscordTargetId, - normalizeDiscordMessagingTarget, - normalizeDiscordOutboundTarget, -} from "./normalize.js"; +import { looksLikeDiscordTargetId, normalizeDiscordMessagingTarget } from "./normalize.js"; +import { discordOutbound } from "./outbound-adapter.js"; import { resolveDiscordOutboundSessionRoute } from "./outbound-session-route.js"; import type { DiscordProbe } from "./probe.js"; import { getDiscordRuntime } from "./runtime.js"; @@ -69,8 +64,6 @@ import { createDiscordPluginBase, discordConfigAdapter } from "./shared.js"; import { collectDiscordStatusIssues } from "./status-issues.js"; import { parseDiscordTarget } from "./target-parsing.js"; -type DiscordSendFn = typeof import("./send.js").sendMessageDiscord; - let discordProviderRuntimePromise: | Promise | undefined; @@ -145,22 +138,6 @@ function resolveRuntimeDiscordMessageActions() { } } -function resolveOptionalDiscordRuntime() { - try { - return getDiscordRuntime(); - } catch { - return null; - } -} - -async function resolveDiscordSend(deps?: { [channelId: string]: unknown }): Promise { - return ( - resolveOutboundSendDep(deps, "discord") ?? - resolveOptionalDiscordRuntime()?.channel?.discord?.sendMessageDiscord ?? - (await loadDiscordSendModule()).sendMessageDiscord - ); -} - const discordMessageActions = { describeMessageTool: ( ctx: Parameters>[0], @@ -811,84 +788,13 @@ export const discordPlugin: ChannelPlugin }, }, outbound: { - base: { - deliveryMode: "direct", - chunker: null, - textChunkLimit: 2000, - pollMaxOptions: 10, - shouldTreatDeliveredTextAsVisible: shouldTreatDiscordDeliveredTextAsVisible, - shouldSuppressLocalPayloadPrompt: ({ cfg, accountId, payload }) => - shouldSuppressLocalDiscordExecApprovalPrompt({ - cfg, - accountId, - payload, - }), - resolveTarget: ({ to }) => normalizeDiscordOutboundTarget(to), - }, - attachedResults: { - channel: "discord", - sendText: async ({ cfg, to, text, accountId, deps, replyToId, threadId, silent }) => { - const send = await resolveDiscordSend(deps); - return await send(resolveDiscordAttachedOutboundTarget({ to, threadId }), text, { - verbose: false, - cfg, - replyTo: replyToId ?? undefined, - accountId: accountId ?? undefined, - silent: silent ?? undefined, - }); - }, - sendMedia: async ({ + ...discordOutbound, + shouldTreatDeliveredTextAsVisible: shouldTreatDiscordDeliveredTextAsVisible, + shouldSuppressLocalPayloadPrompt: ({ cfg, accountId, payload }) => + shouldSuppressLocalDiscordExecApprovalPrompt({ cfg, - to, - text, - mediaUrl, - mediaLocalRoots, - mediaReadFile, accountId, - deps, - replyToId, - threadId, - silent, - }) => { - const send = await resolveDiscordSend(deps); - const target = resolveDiscordAttachedOutboundTarget({ to, threadId }); - if (text.trim() && mediaUrl && isLikelyDiscordVideoMedia(mediaUrl)) { - await send(target, text, { - verbose: false, - cfg, - replyTo: replyToId ?? undefined, - accountId: accountId ?? undefined, - silent: silent ?? undefined, - }); - return await send(target, "", { - verbose: false, - cfg, - mediaUrl, - mediaLocalRoots, - mediaReadFile, - accountId: accountId ?? undefined, - silent: silent ?? undefined, - }); - } - return await send(target, text, { - verbose: false, - cfg, - mediaUrl, - mediaLocalRoots, - mediaReadFile, - replyTo: replyToId ?? undefined, - accountId: accountId ?? undefined, - silent: silent ?? undefined, - }); - }, - sendPoll: async ({ cfg, to, poll, accountId, threadId, silent }) => - await ( - await loadDiscordSendModule() - ).sendPollDiscord(resolveDiscordAttachedOutboundTarget({ to, threadId }), poll, { - cfg, - accountId: accountId ?? undefined, - silent: silent ?? undefined, - }), - }, + payload, + }), }, }); diff --git a/extensions/discord/src/delivery-retry.ts b/extensions/discord/src/delivery-retry.ts new file mode 100644 index 00000000000..cc0c45efa35 --- /dev/null +++ b/extensions/discord/src/delivery-retry.ts @@ -0,0 +1,52 @@ +import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; +import { + resolveRetryConfig, + retryAsync, + type RetryConfig, +} from "openclaw/plugin-sdk/retry-runtime"; +import { resolveDiscordAccount } from "./accounts.js"; + +const DISCORD_DELIVERY_RETRY_DEFAULTS = { + attempts: 3, + minDelayMs: 1000, + maxDelayMs: 30_000, + jitter: 0, +} satisfies Required; + +function isRetryableDiscordDeliveryError(err: unknown): boolean { + const status = (err as { status?: number }).status ?? (err as { statusCode?: number }).statusCode; + return status === 429 || (status !== undefined && status >= 500); +} + +function getDiscordDeliveryRetryAfterMs(err: unknown): number | undefined { + if (!err || typeof err !== "object") { + return undefined; + } + if ( + "retryAfter" in err && + typeof err.retryAfter === "number" && + Number.isFinite(err.retryAfter) + ) { + return err.retryAfter * 1000; + } + const retryAfterRaw = (err as { headers?: Record }).headers?.["retry-after"]; + if (!retryAfterRaw) { + return undefined; + } + const retryAfterMs = Number(retryAfterRaw) * 1000; + return Number.isFinite(retryAfterMs) ? retryAfterMs : undefined; +} + +export async function withDiscordDeliveryRetry(params: { + cfg: OpenClawConfig; + accountId?: string | null; + fn: () => Promise; +}): Promise { + const account = resolveDiscordAccount({ cfg: params.cfg, accountId: params.accountId }); + const retryConfig = resolveRetryConfig(DISCORD_DELIVERY_RETRY_DEFAULTS, account.config.retry); + return await retryAsync(params.fn, { + ...retryConfig, + shouldRetry: (err) => isRetryableDiscordDeliveryError(err), + retryAfterMs: getDiscordDeliveryRetryAfterMs, + }); +} diff --git a/extensions/discord/src/monitor/reply-delivery.test.ts b/extensions/discord/src/monitor/reply-delivery.test.ts index 074ce9b10ec..3b27e432b65 100644 --- a/extensions/discord/src/monitor/reply-delivery.test.ts +++ b/extensions/discord/src/monitor/reply-delivery.test.ts @@ -1,48 +1,21 @@ +import type { RequestClient } from "@buape/carbon"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { EMPTY_DISCORD_TEST_CONFIG } from "../test-support/config.js"; -import { - __testing as threadBindingTesting, - createThreadBindingManager, -} from "./thread-bindings.js"; +const deliverOutboundPayloadsMock = vi.hoisted(() => vi.fn(async () => [])); const sendMessageDiscordMock = vi.hoisted(() => vi.fn()); const sendVoiceMessageDiscordMock = vi.hoisted(() => vi.fn()); -const sendWebhookMessageDiscordMock = vi.hoisted(() => vi.fn()); -const sendDiscordTextMock = vi.hoisted(() => vi.fn()); -const messageHookRunner = vi.hoisted(() => ({ - hasHooks: vi.fn<(name: string) => boolean>(() => false), - runMessageSending: vi.fn(), -})); -const buildDiscordSendErrorMock = vi.hoisted(() => - vi.fn<(err: unknown, ctx?: unknown) => Promise>(async (err: unknown) => err), -); -const retryAsyncMock = vi.hoisted(() => - vi.fn( - async ( - fn: () => Promise, - opts?: { - attempts?: number; - shouldRetry?: (err: unknown) => boolean; - }, - ) => { - const attempts = Math.max(1, opts?.attempts ?? 1); - let lastError: unknown; - for (let attempt = 1; attempt <= attempts; attempt++) { - try { - return await fn(); - } catch (error) { - lastError = error; - if (attempt >= attempts || opts?.shouldRetry?.(error) === false) { - throw error; - } - } - } - throw lastError; - }, - ), -); + +vi.mock("openclaw/plugin-sdk/outbound-runtime", async () => { + const actual = await vi.importActual( + "openclaw/plugin-sdk/outbound-runtime", + ); + return { + ...actual, + deliverOutboundPayloads: deliverOutboundPayloadsMock, + }; +}); vi.mock("../send.js", async () => { const actual = await vi.importActual("../send.js"); @@ -50,536 +23,203 @@ vi.mock("../send.js", async () => { ...actual, sendMessageDiscord: (...args: unknown[]) => sendMessageDiscordMock(...args), sendVoiceMessageDiscord: (...args: unknown[]) => sendVoiceMessageDiscordMock(...args), - sendWebhookMessageDiscord: (...args: unknown[]) => sendWebhookMessageDiscordMock(...args), }; }); -vi.mock("../send.shared.js", () => ({ - buildDiscordSendError: (err: unknown, ctx: unknown) => buildDiscordSendErrorMock(err, ctx), - sendDiscordText: (...args: unknown[]) => sendDiscordTextMock(...args), -})); - -vi.mock("openclaw/plugin-sdk/retry-runtime", async () => { - const actual = await vi.importActual( - "openclaw/plugin-sdk/retry-runtime", - ); - return { - ...actual, - retryAsync: retryAsyncMock, - }; -}); - -vi.mock("openclaw/plugin-sdk/plugin-runtime", () => ({ - getGlobalHookRunner: () => messageHookRunner, -})); - let deliverDiscordReply: typeof import("./reply-delivery.js").deliverDiscordReply; +function firstDeliverParams() { + const calls = deliverOutboundPayloadsMock.mock.calls as unknown as Array< + [ + { + cfg?: OpenClawConfig; + formatting?: unknown; + deps?: Record Promise>; + }, + ] + >; + const params = calls[0]?.[0]; + if (!params) { + throw new Error("deliverOutboundPayloads was not called"); + } + return params; +} + describe("deliverDiscordReply", () => { const runtime = {} as RuntimeEnv; const cfg = { channels: { discord: { token: "test-token" } }, } as OpenClawConfig; - const expectBotSendRetrySuccess = async (status: number, message: string) => { - sendMessageDiscordMock - .mockRejectedValueOnce(Object.assign(new Error(message), { status })) - .mockResolvedValueOnce({ messageId: "msg-1", channelId: "channel-1" }); - - await deliverDiscordReply({ - replies: [{ text: "retry me" }], - target: "channel:123", - token: "token", - runtime, - cfg, - textLimit: 2000, - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2); - }; - const createBoundThreadBindings = async ( - overrides: Partial<{ - threadId: string; - channelId: string; - targetSessionKey: string; - agentId: string; - label: string; - webhookId: string; - webhookToken: string; - introText: string; - }> = {}, - ) => { - const threadBindings = createThreadBindingManager({ - cfg: EMPTY_DISCORD_TEST_CONFIG, - accountId: "default", - persist: false, - enableSweeper: false, - }); - await threadBindings.bindTarget({ - threadId: "thread-1", - channelId: "parent-1", - targetKind: "subagent", - targetSessionKey: "agent:main:subagent:child", - agentId: "main", - webhookId: "wh_1", - webhookToken: "tok_1", - introText: "", - ...overrides, - }); - return threadBindings; - }; beforeAll(async () => { ({ deliverDiscordReply } = await import("./reply-delivery.js")); }); beforeEach(() => { - sendMessageDiscordMock.mockClear().mockResolvedValue({ + deliverOutboundPayloadsMock.mockClear(); + deliverOutboundPayloadsMock.mockResolvedValue([]); + sendMessageDiscordMock.mockReset().mockResolvedValue({ messageId: "msg-1", channelId: "channel-1", }); - sendVoiceMessageDiscordMock.mockClear().mockResolvedValue({ + sendVoiceMessageDiscordMock.mockReset().mockResolvedValue({ messageId: "voice-1", channelId: "channel-1", }); - sendWebhookMessageDiscordMock.mockClear().mockResolvedValue({ - messageId: "webhook-1", - channelId: "thread-1", - }); - sendDiscordTextMock.mockClear().mockResolvedValue({ - id: "msg-direct-1", - channel_id: "channel-1", - }); - buildDiscordSendErrorMock.mockClear().mockImplementation(async (err: unknown) => err); - retryAsyncMock.mockClear(); - messageHookRunner.hasHooks.mockReset().mockReturnValue(false); - messageHookRunner.runMessageSending.mockReset(); - threadBindingTesting.resetThreadBindingsForTests(); }); - it("routes audioAsVoice payloads through the voice API and sends text separately", async () => { + it("bridges regular replies to shared outbound with Discord runtime deps", async () => { + const rest = {} as RequestClient; + const replies = [{ text: "shared path" }]; + await deliverDiscordReply({ - replies: [ - { - text: "Hello there", - mediaUrls: ["https://example.com/voice.ogg", "https://example.com/extra.mp3"], - audioAsVoice: true, - }, - ], - target: "channel:123", - token: "token", - runtime, - cfg, - textLimit: 2000, - replyToId: "reply-1", - }); - - expect(sendVoiceMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendVoiceMessageDiscordMock).toHaveBeenCalledWith( - "channel:123", - "https://example.com/voice.ogg", - expect.objectContaining({ token: "token", replyTo: "reply-1" }), - ); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2); - expect(sendMessageDiscordMock).toHaveBeenNthCalledWith( - 1, - "channel:123", - "Hello there", - expect.objectContaining({ token: "token", replyTo: "reply-1" }), - ); - expect(sendMessageDiscordMock).toHaveBeenNthCalledWith( - 2, - "channel:123", - "", - expect.objectContaining({ - token: "token", - mediaUrl: "https://example.com/extra.mp3", - replyTo: "reply-1", - }), - ); - }); - - it("skips follow-up text when the voice payload text is blank", async () => { - await deliverDiscordReply({ - replies: [ - { - text: " ", - mediaUrl: "https://example.com/voice.ogg", - audioAsVoice: true, - }, - ], - target: "channel:456", - token: "token", - runtime, - cfg, - textLimit: 2000, - }); - - expect(sendVoiceMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendMessageDiscordMock).not.toHaveBeenCalled(); - }); - - it("passes mediaLocalRoots through media sends", async () => { - const mediaLocalRoots = ["/tmp/workspace-agent"] as const; - await deliverDiscordReply({ - replies: [ - { - text: "Media reply", - mediaUrls: ["https://example.com/first.png", "https://example.com/second.png"], - }, - ], - target: "channel:654", - token: "token", - runtime, - cfg, - textLimit: 2000, - mediaLocalRoots, - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2); - expect(sendMessageDiscordMock).toHaveBeenNthCalledWith( - 1, - "channel:654", - "Media reply", - expect.objectContaining({ - token: "token", - mediaUrl: "https://example.com/first.png", - mediaLocalRoots, - }), - ); - expect(sendMessageDiscordMock).toHaveBeenNthCalledWith( - 2, - "channel:654", - "", - expect.objectContaining({ - token: "token", - mediaUrl: "https://example.com/second.png", - mediaLocalRoots, - }), - ); - }); - - it("sends text first and videos as a separate media-only follow-up", async () => { - await deliverDiscordReply({ - replies: [ - { - text: "done — i kicked off a 5s Molty clip", - mediaUrls: ["/tmp/molty.mp4"], - }, - ], - target: "channel:654", - token: "token", - runtime, - cfg, - textLimit: 2000, - replyToId: "reply-1", - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2); - expect(sendMessageDiscordMock).toHaveBeenNthCalledWith( - 1, - "channel:654", - "done — i kicked off a 5s Molty clip", - expect.objectContaining({ - token: "token", - replyTo: "reply-1", - }), - ); - expect(sendMessageDiscordMock).toHaveBeenNthCalledWith( - 2, - "channel:654", - "", - expect.objectContaining({ - token: "token", - mediaUrl: "/tmp/molty.mp4", - replyTo: "reply-1", - }), - ); - }); - - it("forwards cfg to Discord send helpers", async () => { - await deliverDiscordReply({ - replies: [{ text: "cfg path" }], + replies, target: "channel:101", token: "token", + accountId: "default", + rest, runtime, cfg, textLimit: 2000, + replyToId: "reply-1", + replyToMode: "all", }); + expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "discord", + to: "channel:101", + accountId: "default", + payloads: replies, + replyToId: "reply-1", + replyToMode: "all", + }), + ); + + const deps = firstDeliverParams().deps!; + await deps.discord("channel:101", "probe", { verbose: false }); expect(sendMessageDiscordMock).toHaveBeenCalledWith( "channel:101", - "cfg path", - expect.objectContaining({ cfg }), + "probe", + expect.objectContaining({ cfg: firstDeliverParams().cfg, token: "token", rest }), ); }); - it("honors payload reply targets even when replyToMode is off", async () => { - await deliverDiscordReply({ - replies: [ - { - text: "explicit reply", - replyToId: "reply-explicit-1", - replyToTag: true, - replyToCurrent: true, + it("passes resolved Discord formatting options as explicit delivery options", async () => { + const baseCfg = { + channels: { + discord: { + token: "test-token", + markdown: { tables: "code" }, + accounts: { + default: { + token: "account-token", + maxLinesPerMessage: 99, + streaming: { chunkMode: "length" }, + }, + }, }, - ], + }, + } as OpenClawConfig; + + await deliverDiscordReply({ + replies: [{ text: "formatted" }], + target: "channel:101", + token: "token", + accountId: "default", + runtime, + cfg: baseCfg, + textLimit: 1234, + maxLinesPerMessage: 7, + tableMode: "off", + chunkMode: "newline", + }); + + expect(firstDeliverParams().cfg).toBe(baseCfg); + expect(firstDeliverParams().formatting).toEqual({ + textLimit: 1234, + maxLinesPerMessage: 7, + tableMode: "off", + chunkMode: "newline", + }); + }); + + it("passes media roots and explicit off-mode payload reply tags to shared outbound", async () => { + const replies = [ + { + text: "explicit reply", + replyToId: "reply-explicit-1", + replyToTag: true, + }, + ]; + + await deliverDiscordReply({ + replies, target: "channel:202", token: "token", runtime, cfg, textLimit: 2000, replyToMode: "off", + mediaLocalRoots: ["/tmp/openclaw-media"], }); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "channel:202", - "explicit reply", - expect.objectContaining({ replyTo: "reply-explicit-1" }), + expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith( + expect.objectContaining({ + payloads: replies, + replyToId: undefined, + replyToMode: "off", + mediaAccess: { localRoots: ["/tmp/openclaw-media"] }, + }), ); }); - it.each(["first", "batched"] as const)( - "uses replyToId only for the first chunk when replyToMode is %s", - async (replyToMode) => { - await deliverDiscordReply({ - replies: [ - { - text: "1234567890", - }, - ], - target: "channel:789", - token: "token", - runtime, - cfg, - textLimit: 5, - replyToId: "reply-1", - replyToMode, - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(2); - expect(sendMessageDiscordMock.mock.calls).toEqual([ - expect.arrayContaining([ - "channel:789", - "12345", - expect.objectContaining({ replyTo: "reply-1" }), - ]), - expect.arrayContaining([ - "channel:789", - "67890", - expect.not.objectContaining({ replyTo: expect.anything() }), - ]), - ]); - }, - ); - - it("does not consume replyToId for replyToMode=first on whitespace-only payloads", async () => { + it("bridges Discord voice sends through the outbound dependency bag", async () => { await deliverDiscordReply({ - replies: [{ text: " " }, { text: "actual reply" }], - target: "channel:789", + replies: [{ text: "voice", mediaUrl: "https://example.com/voice.ogg", audioAsVoice: true }], + target: "channel:123", token: "token", runtime, cfg, textLimit: 2000, replyToId: "reply-1", - replyToMode: "first", }); - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "channel:789", - "actual reply", - expect.objectContaining({ token: "token", replyTo: "reply-1" }), + const deps = firstDeliverParams().deps!; + await deps.discordVoice("channel:123", "https://example.com/voice.ogg", { + cfg, + replyTo: "reply-1", + }); + + expect(sendVoiceMessageDiscordMock).toHaveBeenCalledWith( + "channel:123", + "https://example.com/voice.ogg", + expect.objectContaining({ cfg, token: "token", replyTo: "reply-1" }), ); }); - it("preserves leading whitespace in delivered text chunks", async () => { - await deliverDiscordReply({ - replies: [{ text: " leading text" }], - target: "channel:789", - token: "token", - runtime, - cfg, - textLimit: 2000, - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "channel:789", - " leading text", - expect.objectContaining({ token: "token" }), - ); - }); - - it("sends text chunks in order via sendDiscordText when rest is provided", async () => { - const fakeRest = {} as import("@buape/carbon").RequestClient; - const callOrder: string[] = []; - sendDiscordTextMock.mockImplementation( - async (_rest: unknown, _channelId: unknown, text: string) => { - callOrder.push(text); - return { id: `msg-${callOrder.length}`, channel_id: "789" }; - }, - ); - - await deliverDiscordReply({ - replies: [{ text: "1234567890" }], - target: "channel:789", - token: "token", - rest: fakeRest, - runtime, - cfg, - textLimit: 5, - }); - - expect(sendMessageDiscordMock).not.toHaveBeenCalled(); - expect(sendDiscordTextMock).toHaveBeenCalledTimes(2); - expect(callOrder).toEqual(["12345", "67890"]); - expect(sendDiscordTextMock.mock.calls[0]?.[1]).toBe("789"); - expect(sendDiscordTextMock.mock.calls[1]?.[1]).toBe("789"); - }); - - it("passes maxLinesPerMessage and chunkMode through the fast path", async () => { - const fakeRest = {} as import("@buape/carbon").RequestClient; - - await deliverDiscordReply({ - replies: [{ text: Array.from({ length: 18 }, (_, index) => `line ${index + 1}`).join("\n") }], - target: "channel:789", - token: "token", - rest: fakeRest, - runtime, - cfg, - textLimit: 2000, - maxLinesPerMessage: 120, - chunkMode: "newline", - }); - - expect(sendMessageDiscordMock).not.toHaveBeenCalled(); - expect(sendDiscordTextMock).toHaveBeenCalledTimes(1); - const firstSendDiscordTextCall = sendDiscordTextMock.mock.calls[0]; - const [, , , , , maxLinesPerMessageArg, , , chunkModeArg] = firstSendDiscordTextCall ?? []; - - expect(maxLinesPerMessageArg).toBe(120); - expect(chunkModeArg).toBe("newline"); - }); - - it("falls back to sendMessageDiscord when rest is not provided", async () => { - await deliverDiscordReply({ - replies: [{ text: "single chunk" }], - target: "channel:789", - token: "token", - runtime, - cfg, - textLimit: 2000, - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendDiscordTextMock).not.toHaveBeenCalled(); - }); - - it("retries bot send on 429 rate limit then succeeds", async () => { - await expectBotSendRetrySuccess(429, "rate limited"); - }); - - it("retries bot send on 500 server error then succeeds", async () => { - await expectBotSendRetrySuccess(500, "internal"); - }); - - it("does not retry on 4xx client errors", async () => { - const clientErr = Object.assign(new Error("bad request"), { status: 400 }); - sendMessageDiscordMock.mockRejectedValueOnce(clientErr); - - await expect( - deliverDiscordReply({ - replies: [{ text: "fail" }], - target: "channel:123", - token: "token", - runtime, - cfg, - textLimit: 2000, - }), - ).rejects.toThrow("bad request"); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - }); - - it("wraps direct REST permission errors with channel context", async () => { - const apiErr = Object.assign(new Error("Missing Permissions"), { - code: 50013, - status: 403, - }); - const wrappedErr = new Error( - "discord missing permissions in channel 789; permission probe did not identify missing ViewChannel/SendMessages (code=50013 status=403)", - ); - sendDiscordTextMock.mockRejectedValueOnce(apiErr); - buildDiscordSendErrorMock.mockResolvedValueOnce(wrappedErr); - - const fakeRest = { - post: vi.fn(), - get: vi.fn(), - } as unknown as import("@buape/carbon").RequestClient; - - await expect( - deliverDiscordReply({ - replies: [{ text: "fail" }], - target: "channel:789", - token: "token", - rest: fakeRest, - runtime, - cfg, - textLimit: 2000, - }), - ).rejects.toThrow("discord missing permissions in channel 789"); - - expect(buildDiscordSendErrorMock).toHaveBeenCalledWith( - apiErr, - expect.objectContaining({ channelId: "789", hasMedia: false }), - ); - }); - - it("throws after exhausting retry attempts", async () => { - const rateLimitErr = Object.assign(new Error("rate limited"), { status: 429 }); - sendMessageDiscordMock.mockRejectedValue(rateLimitErr); - - await expect( - deliverDiscordReply({ - replies: [{ text: "persistent failure" }], - target: "channel:123", - token: "token", - runtime, - cfg, - textLimit: 2000, - }), - ).rejects.toThrow("rate limited"); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(3); - }); - - it("delivers remaining chunks after a mid-sequence retry", async () => { - sendMessageDiscordMock - .mockResolvedValueOnce({ messageId: "c1" }) - .mockRejectedValueOnce(Object.assign(new Error("rate limited"), { status: 429 })) - .mockResolvedValueOnce({ messageId: "c2-retry" }) - .mockResolvedValueOnce({ messageId: "c3" }); - - await deliverDiscordReply({ - replies: [{ text: "A".repeat(6) }], - target: "channel:123", - token: "token", - runtime, - cfg, - textLimit: 2, - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(4); - }); - - it("sends bound-session text replies through webhook delivery", async () => { - const threadBindings = await createBoundThreadBindings({ label: "codex-refactor" }); + it("rewrites bound thread replies to parent target plus thread id and persona", async () => { + const threadBindings = { + listBySessionKey: vi.fn(() => [ + { + accountId: "default", + channelId: "parent-1", + threadId: "thread-1", + targetSessionKey: "agent:main:subagent:child", + agentId: "main", + label: "child", + webhookId: "wh_1", + webhookToken: "tok_1", + }, + ]), + touchThread: vi.fn(), + }; await deliverDiscordReply({ replies: [{ text: "Hello from subagent" }], target: "channel:thread-1", token: "token", + accountId: "default", runtime, cfg, textLimit: 2000, @@ -588,291 +228,17 @@ describe("deliverDiscordReply", () => { threadBindings, }); - expect(sendWebhookMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendWebhookMessageDiscordMock).toHaveBeenCalledWith( - "Hello from subagent", + expect(deliverOutboundPayloadsMock).toHaveBeenCalledWith( expect.objectContaining({ - cfg, - webhookId: "wh_1", - webhookToken: "tok_1", - accountId: "default", + to: "channel:parent-1", threadId: "thread-1", - replyTo: "reply-1", + replyToId: "reply-1", + identity: expect.objectContaining({ name: "🤖 child" }), + session: expect.objectContaining({ + key: "agent:main:subagent:child", + agentId: "main", + }), }), ); - expect(sendMessageDiscordMock).not.toHaveBeenCalled(); - }); - - it("touches bound-thread activity after outbound delivery", async () => { - vi.useFakeTimers(); - try { - vi.setSystemTime(new Date("2026-02-20T00:00:00.000Z")); - const threadBindings = await createBoundThreadBindings(); - vi.setSystemTime(new Date("2026-02-20T00:02:00.000Z")); - - await deliverDiscordReply({ - replies: [{ text: "Activity ping" }], - target: "channel:thread-1", - token: "token", - runtime, - cfg, - textLimit: 2000, - sessionKey: "agent:main:subagent:child", - threadBindings, - }); - - expect(threadBindings.getByThreadId("thread-1")?.lastActivityAt).toBe( - new Date("2026-02-20T00:02:00.000Z").getTime(), - ); - } finally { - vi.useRealTimers(); - } - }); - - it("falls back to bot send when webhook delivery fails", async () => { - const threadBindings = await createBoundThreadBindings(); - sendWebhookMessageDiscordMock.mockRejectedValueOnce(new Error("rate limited")); - - await deliverDiscordReply({ - replies: [{ text: "Fallback path" }], - target: "channel:thread-1", - token: "token", - accountId: "default", - runtime, - cfg, - textLimit: 2000, - sessionKey: "agent:main:subagent:child", - threadBindings, - }); - - expect(sendWebhookMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendWebhookMessageDiscordMock.mock.calls[0]?.[1]?.cfg).toBe(cfg); - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "channel:thread-1", - "Fallback path", - expect.objectContaining({ token: "token", accountId: "default" }), - ); - }); - - it("does not use thread webhook when outbound target is not a bound thread", async () => { - const threadBindings = await createBoundThreadBindings(); - - await deliverDiscordReply({ - replies: [{ text: "Parent channel delivery" }], - target: "channel:parent-1", - token: "token", - accountId: "default", - runtime, - cfg, - textLimit: 2000, - sessionKey: "agent:main:subagent:child", - threadBindings, - }); - - expect(sendWebhookMessageDiscordMock).not.toHaveBeenCalled(); - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "channel:parent-1", - "Parent channel delivery", - expect.objectContaining({ token: "token", accountId: "default" }), - ); - }); - - it("replaces reply text with message_sending hook content", async () => { - messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending"); - messageHookRunner.runMessageSending.mockResolvedValue({ content: "filtered text" }); - - await deliverDiscordReply({ - replies: [{ text: "raw secret" }], - target: "channel:hook-1", - token: "token", - accountId: "acc-1", - runtime, - cfg, - textLimit: 2000, - }); - - expect(messageHookRunner.runMessageSending).toHaveBeenCalledWith( - expect.objectContaining({ - to: "hook-1", - content: "raw secret", - metadata: expect.objectContaining({ channel: "discord" }), - }), - expect.objectContaining({ - channelId: "discord", - accountId: "acc-1", - conversationId: "hook-1", - }), - ); - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "channel:hook-1", - "filtered text", - expect.anything(), - ); - }); - - it("uses the raw Discord target as hook destination for DMs", async () => { - messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending"); - messageHookRunner.runMessageSending.mockResolvedValue({ content: "dm filtered" }); - - await deliverDiscordReply({ - replies: [{ text: "dm raw" }], - target: "user:U123", - token: "token", - accountId: "acc-1", - runtime, - cfg, - textLimit: 2000, - }); - - expect(messageHookRunner.runMessageSending).toHaveBeenCalledWith( - expect.objectContaining({ - to: "user:U123", - content: "dm raw", - }), - expect.objectContaining({ - channelId: "discord", - accountId: "acc-1", - conversationId: "user:U123", - }), - ); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "user:U123", - "dm filtered", - expect.anything(), - ); - }); - - it("reports replyToId to hooks only while a single-use fallback reply is still available", async () => { - messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending"); - messageHookRunner.runMessageSending.mockResolvedValue({}); - - await deliverDiscordReply({ - replies: [{ text: "first" }, { text: "second" }], - target: "channel:hook-thread", - token: "token", - runtime, - cfg, - textLimit: 2000, - replyToId: "reply-1", - replyToMode: "first", - }); - - expect(messageHookRunner.runMessageSending).toHaveBeenCalledTimes(2); - expect(messageHookRunner.runMessageSending.mock.calls[0]?.[0]).toEqual( - expect.objectContaining({ replyToId: "reply-1" }), - ); - expect(messageHookRunner.runMessageSending.mock.calls[1]?.[0]).toEqual( - expect.not.objectContaining({ replyToId: expect.anything() }), - ); - expect(sendMessageDiscordMock.mock.calls[0]?.[2]).toEqual( - expect.objectContaining({ replyTo: "reply-1" }), - ); - expect(sendMessageDiscordMock.mock.calls[1]?.[2]).toEqual( - expect.not.objectContaining({ replyTo: expect.anything() }), - ); - }); - - it("reports explicit payload reply targets to hooks when replyToMode is off", async () => { - messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending"); - messageHookRunner.runMessageSending.mockResolvedValue({}); - - await deliverDiscordReply({ - replies: [ - { - text: "explicit", - replyToId: "reply-explicit-1", - replyToTag: true, - }, - ], - target: "channel:hook-explicit", - token: "token", - runtime, - cfg, - textLimit: 2000, - replyToMode: "off", - }); - - expect(messageHookRunner.runMessageSending.mock.calls[0]?.[0]).toEqual( - expect.objectContaining({ replyToId: "reply-explicit-1" }), - ); - expect(sendMessageDiscordMock.mock.calls[0]?.[2]).toEqual( - expect.objectContaining({ replyTo: "reply-explicit-1" }), - ); - }); - - it("skips delivery when message_sending hook cancels the payload", async () => { - messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending"); - messageHookRunner.runMessageSending.mockResolvedValue({ cancel: true }); - - await deliverDiscordReply({ - replies: [{ text: "should not send" }], - target: "channel:hook-2", - token: "token", - runtime, - cfg, - textLimit: 2000, - }); - - expect(sendMessageDiscordMock).not.toHaveBeenCalled(); - }); - - it("skips delivery when hook blanks out a text-only reply", async () => { - messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending"); - messageHookRunner.runMessageSending.mockResolvedValue({ content: " " }); - - await deliverDiscordReply({ - replies: [{ text: "hello" }], - target: "channel:hook-3", - token: "token", - runtime, - cfg, - textLimit: 2000, - }); - - expect(sendMessageDiscordMock).not.toHaveBeenCalled(); - }); - - it("continues delivery when message_sending hook throws", async () => { - messageHookRunner.hasHooks.mockImplementation((name: string) => name === "message_sending"); - messageHookRunner.runMessageSending.mockRejectedValue(new Error("plugin exploded")); - const errorRuntime = { error: vi.fn() } as unknown as RuntimeEnv; - - await deliverDiscordReply({ - replies: [{ text: "should still send" }], - target: "channel:hook-4", - token: "token", - runtime: errorRuntime, - cfg, - textLimit: 2000, - }); - - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); - expect(sendMessageDiscordMock).toHaveBeenCalledWith( - "channel:hook-4", - "should still send", - expect.anything(), - ); - expect((errorRuntime.error as ReturnType).mock.calls[0]?.[0]).toMatch( - /plugin exploded/, - ); - }); - - it("skips hook resolution when no message_sending hooks are registered", async () => { - messageHookRunner.hasHooks.mockReturnValue(false); - - await deliverDiscordReply({ - replies: [{ text: "no hook path" }], - target: "channel:hook-5", - token: "token", - runtime, - cfg, - textLimit: 2000, - }); - - expect(messageHookRunner.runMessageSending).not.toHaveBeenCalled(); - expect(sendMessageDiscordMock).toHaveBeenCalledTimes(1); }); }); diff --git a/extensions/discord/src/monitor/reply-delivery.ts b/extensions/discord/src/monitor/reply-delivery.ts index c2dadca7a8c..588262cbae8 100644 --- a/extensions/discord/src/monitor/reply-delivery.ts +++ b/extensions/discord/src/monitor/reply-delivery.ts @@ -1,33 +1,27 @@ import type { RequestClient } from "@buape/carbon"; import { resolveAgentAvatar } from "openclaw/plugin-sdk/agent-runtime"; -import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; -import type { MarkdownTableMode, ReplyToMode } from "openclaw/plugin-sdk/config-runtime"; -import { getGlobalHookRunner } from "openclaw/plugin-sdk/plugin-runtime"; +import type { + MarkdownTableMode, + OpenClawConfig, + ReplyToMode, +} from "openclaw/plugin-sdk/config-runtime"; +import type { OutboundMediaAccess } from "openclaw/plugin-sdk/media-runtime"; +import { + buildOutboundSessionContext, + deliverOutboundPayloads, + type OutboundDeliveryFormattingOptions, + type OutboundIdentity, + type OutboundSendDeps, +} from "openclaw/plugin-sdk/outbound-runtime"; import type { ChunkMode } from "openclaw/plugin-sdk/reply-chunking"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-dispatch-runtime"; -import { - resolveSendableOutboundReplyParts, - resolveTextChunksWithFallback, - sendMediaWithLeadingCaption, -} from "openclaw/plugin-sdk/reply-payload"; -import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference"; -import { - resolveRetryConfig, - retryAsync, - type RetryConfig, - type RetryRunner, -} from "openclaw/plugin-sdk/retry-runtime"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; -import { convertMarkdownTables, normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; -import { resolveDiscordAccount } from "../accounts.js"; -import { chunkDiscordTextWithMode } from "../chunk.js"; -import { isLikelyDiscordVideoMedia } from "../media-detection.js"; -import { createDiscordRetryRunner } from "../retry.js"; -import { sendMessageDiscord, sendVoiceMessageDiscord, sendWebhookMessageDiscord } from "../send.js"; -import { buildDiscordSendError, sendDiscordText } from "../send.shared.js"; +import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; +import { sendMessageDiscord, sendVoiceMessageDiscord } from "../send.js"; export type DiscordThreadBindingLookupRecord = { accountId: string; + channelId: string; threadId: string; agentId: string; label?: string; @@ -40,162 +34,6 @@ export type DiscordThreadBindingLookup = { touchThread?: (params: { threadId: string; at?: number; persist?: boolean }) => unknown; }; -type ResolvedRetryConfig = Required; - -const DISCORD_DELIVERY_RETRY_DEFAULTS: ResolvedRetryConfig = { - attempts: 3, - minDelayMs: 1000, - maxDelayMs: 30_000, - jitter: 0, -}; - -function isRetryableDiscordError(err: unknown): boolean { - const status = (err as { status?: number }).status ?? (err as { statusCode?: number }).statusCode; - return status === 429 || (status !== undefined && status >= 500); -} - -function getDiscordRetryAfterMs(err: unknown): number | undefined { - if (!err || typeof err !== "object") { - return undefined; - } - if ( - "retryAfter" in err && - typeof err.retryAfter === "number" && - Number.isFinite(err.retryAfter) - ) { - return err.retryAfter * 1000; - } - const retryAfterRaw = (err as { headers?: Record }).headers?.["retry-after"]; - if (!retryAfterRaw) { - return undefined; - } - const retryAfterMs = Number(retryAfterRaw) * 1000; - return Number.isFinite(retryAfterMs) ? retryAfterMs : undefined; -} - -function resolveDeliveryRetryConfig(retry?: RetryConfig): ResolvedRetryConfig { - return resolveRetryConfig(DISCORD_DELIVERY_RETRY_DEFAULTS, retry); -} - -async function sendWithRetry( - fn: () => Promise, - retryConfig: ResolvedRetryConfig, -): Promise { - await retryAsync(fn, { - ...retryConfig, - shouldRetry: (err) => isRetryableDiscordError(err), - retryAfterMs: getDiscordRetryAfterMs, - }); -} - -async function sendDiscordMediaOnly(params: { - target: string; - cfg: OpenClawConfig; - token: string; - rest?: RequestClient; - mediaUrl: string; - accountId?: string; - mediaLocalRoots?: readonly string[]; - replyTo?: string; - retryConfig: ResolvedRetryConfig; -}): Promise { - await sendWithRetry( - () => - sendMessageDiscord(params.target, "", { - cfg: params.cfg, - token: params.token, - rest: params.rest, - mediaUrl: params.mediaUrl, - accountId: params.accountId, - mediaLocalRoots: params.mediaLocalRoots, - replyTo: params.replyTo, - }), - params.retryConfig, - ); -} - -async function sendDiscordMediaBatch(params: { - target: string; - cfg: OpenClawConfig; - token: string; - rest?: RequestClient; - mediaUrls: string[]; - accountId?: string; - mediaLocalRoots?: readonly string[]; - replyTo: () => string | undefined; - retryConfig: ResolvedRetryConfig; -}): Promise { - await sendMediaWithLeadingCaption({ - mediaUrls: params.mediaUrls, - caption: "", - send: async ({ mediaUrl }) => { - await sendDiscordMediaOnly({ - target: params.target, - cfg: params.cfg, - token: params.token, - rest: params.rest, - mediaUrl, - accountId: params.accountId, - mediaLocalRoots: params.mediaLocalRoots, - replyTo: params.replyTo(), - retryConfig: params.retryConfig, - }); - }, - }); -} - -async function sendDiscordPayloadText(params: { - cfg: OpenClawConfig; - target: string; - text: string; - token: string; - rest?: RequestClient; - accountId?: string; - textLimit?: number; - maxLinesPerMessage?: number; - binding?: DiscordThreadBindingLookupRecord; - chunkMode?: ChunkMode; - username?: string; - avatarUrl?: string; - channelId?: string; - request?: RetryRunner; - retryConfig: ResolvedRetryConfig; - resolveReplyTo: () => string | undefined; -}): Promise { - const mode = params.chunkMode ?? "length"; - const chunkLimit = Math.min(params.textLimit ?? 2000, 2000); - const chunks = resolveTextChunksWithFallback( - params.text, - chunkDiscordTextWithMode(params.text, { - maxChars: chunkLimit, - maxLines: params.maxLinesPerMessage, - chunkMode: mode, - }), - ); - for (const chunk of chunks) { - if (!chunk.trim()) { - continue; - } - await sendDiscordChunkWithFallback({ - cfg: params.cfg, - target: params.target, - text: chunk, - token: params.token, - rest: params.rest, - accountId: params.accountId, - maxLinesPerMessage: params.maxLinesPerMessage, - replyTo: params.resolveReplyTo(), - binding: params.binding, - chunkMode: params.chunkMode, - username: params.username, - avatarUrl: params.avatarUrl, - channelId: params.channelId, - request: params.request, - retryConfig: params.retryConfig, - }); - } -} - function resolveTargetChannelId(target: string): string | undefined { if (!target.startsWith("channel:")) { return undefined; @@ -213,176 +51,107 @@ function resolveBoundThreadBinding(params: { if (!params.threadBindings || !sessionKey) { return undefined; } - const bindings = params.threadBindings.listBySessionKey(sessionKey); - if (bindings.length === 0) { - return undefined; - } const targetChannelId = resolveTargetChannelId(params.target); if (!targetChannelId) { return undefined; } - return bindings.find((entry) => entry.threadId === targetChannelId); + return params.threadBindings + .listBySessionKey(sessionKey) + .find((entry) => entry.threadId === targetChannelId); } -function createPayloadReplyToResolver(params: { - payload: ReplyPayload; - replyToMode: ReplyToMode; - resolveFallbackReplyTo: () => string | undefined; -}): () => string | undefined { - const payloadReplyTo = normalizeOptionalString(params.payload.replyToId); - const allowExplicitReplyWhenOff = Boolean( - payloadReplyTo && (params.payload.replyToTag || params.payload.replyToCurrent), - ); - - if (!payloadReplyTo || (params.replyToMode === "off" && !allowExplicitReplyWhenOff)) { - return params.resolveFallbackReplyTo; - } - - let payloadReplyUsed = false; - return () => { - if (params.replyToMode === "all") { - return payloadReplyTo; - } - if (payloadReplyUsed) { - return undefined; - } - payloadReplyUsed = true; - return payloadReplyTo; - }; -} - -function resolveMessageSendingHookReplyToId(params: { - payload: ReplyPayload; - replyToMode: ReplyToMode; - fallbackReplyTo: string | undefined; - fallbackReplyUsed: boolean; -}): string | undefined { - const payloadReplyTo = normalizeOptionalString(params.payload.replyToId); - const allowExplicitReplyWhenOff = Boolean( - payloadReplyTo && (params.payload.replyToTag || params.payload.replyToCurrent), - ); - if (payloadReplyTo && (params.replyToMode !== "off" || allowExplicitReplyWhenOff)) { - return payloadReplyTo; - } - if (!params.fallbackReplyTo) { - return undefined; - } - if (!isSingleUseReplyToMode(params.replyToMode)) { - return params.fallbackReplyTo; - } - return params.fallbackReplyUsed ? undefined : params.fallbackReplyTo; -} - -function resolveBindingPersona( +function resolveBindingIdentity( cfg: OpenClawConfig, binding: DiscordThreadBindingLookupRecord | undefined, -): { - username?: string; - avatarUrl?: string; -} { +): OutboundIdentity | undefined { if (!binding) { - return {}; + return undefined; } const baseLabel = binding.label?.trim() || binding.agentId; - const username = (`🤖 ${baseLabel}`.trim() || "🤖 agent").slice(0, 80); - - let avatarUrl: string | undefined; + const identity: OutboundIdentity = { + name: (`🤖 ${baseLabel}`.trim() || "🤖 agent").slice(0, 80), + }; try { const avatar = resolveAgentAvatar(cfg, binding.agentId); if (avatar.kind === "remote") { - avatarUrl = avatar.url; + identity.avatarUrl = avatar.url; } } catch { - avatarUrl = undefined; + // Avatar is cosmetic; delivery should not depend on local identity config. } - return { username, avatarUrl }; + return identity; } -async function sendDiscordChunkWithFallback(params: { +function createDiscordDeliveryDeps(params: { cfg: OpenClawConfig; - target: string; - text: string; token: string; - accountId?: string; - maxLinesPerMessage?: number; rest?: RequestClient; - replyTo?: string; - binding?: DiscordThreadBindingLookupRecord; - chunkMode?: ChunkMode; - username?: string; - avatarUrl?: string; - /** Pre-resolved channel ID to bypass redundant resolution per chunk. */ - channelId?: string; - /** Pre-created retry runner to avoid creating one per chunk. */ - request?: RetryRunner; - /** Pre-resolved retry config (account-level). */ - retryConfig: ResolvedRetryConfig; -}) { - if (!params.text.trim()) { - return; - } - const text = params.text; - const binding = params.binding; - if (binding?.webhookId && binding?.webhookToken) { - try { - await sendWebhookMessageDiscord(text, { - cfg: params.cfg, - webhookId: binding.webhookId, - webhookToken: binding.webhookToken, - accountId: binding.accountId, - threadId: binding.threadId, - replyTo: params.replyTo, - username: params.username, - avatarUrl: params.avatarUrl, - }); - return; - } catch { - // Fall through to the standard bot sender path. - } - } - // When channelId and request are pre-resolved, send directly via sendDiscordText - // to avoid per-chunk overhead (channel-type GET, re-chunking, client creation) - // that can cause ordering issues under queue contention or rate limiting. - if (params.channelId && params.request && params.rest) { - const { channelId, request, rest } = params; - try { - await sendWithRetry( - () => - sendDiscordText( - rest, - channelId, - text, - params.replyTo, - request, - params.maxLinesPerMessage, - undefined, - undefined, - params.chunkMode, - ), - params.retryConfig, - ); - } catch (err) { - throw await buildDiscordSendError(err, { - channelId, - cfg: params.cfg, - rest, - token: params.token, - hasMedia: false, - }); - } - return; - } - await sendWithRetry( - () => - sendMessageDiscord(params.target, text, { - cfg: params.cfg, +}): OutboundSendDeps { + return { + discord: (to: string, text: string, opts?: Parameters[2]) => + sendMessageDiscord(to, text, { + ...opts, + cfg: opts?.cfg ?? params.cfg, token: params.token, rest: params.rest, - accountId: params.accountId, - replyTo: params.replyTo, }), - params.retryConfig, - ); + discordVoice: ( + to: string, + audioPath: string, + opts?: Parameters[2], + ) => + sendVoiceMessageDiscord(to, audioPath, { + ...opts, + cfg: opts?.cfg ?? params.cfg, + token: params.token, + rest: params.rest, + }), + }; +} + +type DiscordDeliveryOptions = { + to: string; + threadId?: string; + agentId?: string; + identity?: OutboundIdentity; + mediaAccess?: OutboundMediaAccess; + replyToMode: ReplyToMode; + formatting: OutboundDeliveryFormattingOptions; +}; + +function resolveDiscordDeliveryOptions(params: { + cfg: OpenClawConfig; + target: string; + sessionKey?: string; + threadBindings?: DiscordThreadBindingLookup; + textLimit: number; + maxLinesPerMessage?: number; + tableMode?: MarkdownTableMode; + chunkMode?: ChunkMode; + replyToMode?: ReplyToMode; + mediaLocalRoots?: readonly string[]; +}): DiscordDeliveryOptions { + const binding = resolveBoundThreadBinding({ + threadBindings: params.threadBindings, + sessionKey: params.sessionKey, + target: params.target, + }); + return { + to: binding ? `channel:${binding.channelId}` : params.target, + threadId: binding?.threadId, + agentId: binding?.agentId, + identity: resolveBindingIdentity(params.cfg, binding), + mediaAccess: params.mediaLocalRoots?.length + ? { localRoots: params.mediaLocalRoots } + : undefined, + replyToMode: params.replyToMode ?? "all", + formatting: { + textLimit: params.textLimit, + maxLinesPerMessage: params.maxLinesPerMessage, + tableMode: params.tableMode, + chunkMode: params.chunkMode, + }, + }; } export async function deliverDiscordReply(params: { @@ -403,187 +172,32 @@ export async function deliverDiscordReply(params: { threadBindings?: DiscordThreadBindingLookup; mediaLocalRoots?: readonly string[]; }) { - const replyTo = normalizeOptionalString(params.replyToId); - const replyToMode = params.replyToMode ?? "all"; - const replyOnce = isSingleUseReplyToMode(replyToMode); - let replyUsed = false; - const resolveReplyTo = () => { - if (!replyTo) { - return undefined; - } - if (!replyOnce) { - return replyTo; - } - if (replyUsed) { - return undefined; - } - replyUsed = true; - return replyTo; - }; - const binding = resolveBoundThreadBinding({ - threadBindings: params.threadBindings, - sessionKey: params.sessionKey, - target: params.target, + void params.runtime; + + const delivery = resolveDiscordDeliveryOptions(params); + + await deliverOutboundPayloads({ + cfg: params.cfg, + channel: "discord", + to: delivery.to, + accountId: params.accountId, + payloads: params.replies, + replyToId: normalizeOptionalString(params.replyToId), + replyToMode: delivery.replyToMode, + formatting: delivery.formatting, + threadId: delivery.threadId, + identity: delivery.identity, + deps: createDiscordDeliveryDeps({ + cfg: params.cfg, + token: params.token, + rest: params.rest, + }), + mediaAccess: delivery.mediaAccess, + session: buildOutboundSessionContext({ + cfg: params.cfg, + sessionKey: params.sessionKey, + agentId: delivery.agentId, + requesterAccountId: params.accountId, + }), }); - const persona = resolveBindingPersona(params.cfg, binding); - // Pre-resolve channel ID and retry runner once to avoid per-chunk overhead. - // This eliminates redundant channel-type GET requests and client creation that - // can cause ordering issues when multiple chunks share the RequestClient queue. - const channelId = resolveTargetChannelId(params.target); - const account = resolveDiscordAccount({ cfg: params.cfg, accountId: params.accountId }); - const retryConfig = resolveDeliveryRetryConfig(account.config.retry); - const request: RetryRunner | undefined = channelId - ? createDiscordRetryRunner({ configRetry: account.config.retry }) - : undefined; - const hookRunner = getGlobalHookRunner(); - const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false; - const hookConversationId = channelId ?? params.target; - let deliveredAny = false; - for (const payload of params.replies) { - const resolvePayloadReplyTo = createPayloadReplyToResolver({ - payload, - replyToMode, - resolveFallbackReplyTo: resolveReplyTo, - }); - const tableMode = params.tableMode ?? "code"; - let effectiveText = payload.text ?? ""; - if (hasMessageSendingHooks) { - try { - const hookResult = await hookRunner?.runMessageSending( - { - to: hookConversationId, - content: effectiveText, - replyToId: resolveMessageSendingHookReplyToId({ - payload, - replyToMode, - fallbackReplyTo: replyTo, - fallbackReplyUsed: replyUsed, - }), - metadata: { - channel: "discord", - mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined), - }, - }, - { - channelId: "discord", - accountId: params.accountId, - conversationId: hookConversationId, - }, - ); - if (hookResult?.cancel) { - continue; - } - if (typeof hookResult?.content === "string") { - effectiveText = hookResult.content; - } - } catch (error) { - params.runtime.error?.( - `discord: message_sending hook failed: ${error instanceof Error ? error.message : String(error)}`, - ); - } - } - const reply = resolveSendableOutboundReplyParts( - { ...payload, text: effectiveText }, - { text: convertMarkdownTables(effectiveText, tableMode) }, - ); - if (!reply.hasContent) { - continue; - } - const sendReplyText = async () => - sendDiscordPayloadText({ - cfg: params.cfg, - target: params.target, - text: reply.text, - token: params.token, - rest: params.rest, - accountId: params.accountId, - textLimit: params.textLimit, - maxLinesPerMessage: params.maxLinesPerMessage, - resolveReplyTo: resolvePayloadReplyTo, - binding, - chunkMode: params.chunkMode, - username: persona.username, - avatarUrl: persona.avatarUrl, - channelId, - request, - retryConfig, - }); - const sendReplyMediaBatch = async (mediaUrls: string[]) => - sendDiscordMediaBatch({ - target: params.target, - cfg: params.cfg, - token: params.token, - rest: params.rest, - mediaUrls, - accountId: params.accountId, - mediaLocalRoots: params.mediaLocalRoots, - replyTo: resolvePayloadReplyTo, - retryConfig, - }); - if (!reply.hasMedia) { - await sendReplyText(); - if (reply.text.trim()) { - deliveredAny = true; - } - continue; - } - - const firstMedia = reply.mediaUrls[0]; - if (!firstMedia) { - continue; - } - // Voice message path: audioAsVoice flag routes through sendVoiceMessageDiscord. - if (payload.audioAsVoice) { - const replyTo = resolvePayloadReplyTo(); - await sendVoiceMessageDiscord(params.target, firstMedia, { - cfg: params.cfg, - token: params.token, - rest: params.rest, - accountId: params.accountId, - replyTo, - }); - deliveredAny = true; - // Voice messages cannot include text; send remaining text separately if present. - await sendReplyText(); - // Additional media items are sent as regular attachments (voice is single-file only). - await sendReplyMediaBatch(reply.mediaUrls.slice(1)); - continue; - } - - const shouldSplitVideoMediaReply = - reply.text.trim().length > 0 && - reply.mediaUrls.some((mediaUrl) => isLikelyDiscordVideoMedia(mediaUrl)); - if (shouldSplitVideoMediaReply) { - await sendReplyText(); - await sendReplyMediaBatch(reply.mediaUrls); - deliveredAny = true; - continue; - } - - await sendMediaWithLeadingCaption({ - mediaUrls: reply.mediaUrls, - caption: reply.text, - send: async ({ mediaUrl, caption }) => { - const replyTo = resolvePayloadReplyTo(); - await sendWithRetry( - () => - sendMessageDiscord(params.target, caption ?? "", { - cfg: params.cfg, - token: params.token, - rest: params.rest, - mediaUrl, - accountId: params.accountId, - mediaLocalRoots: params.mediaLocalRoots, - replyTo, - }), - retryConfig, - ); - }, - }); - deliveredAny = true; - } - - if (binding && deliveredAny) { - params.threadBindings?.touchThread?.({ threadId: binding.threadId }); - } } diff --git a/extensions/discord/src/outbound-adapter.test-harness.ts b/extensions/discord/src/outbound-adapter.test-harness.ts index 338644fe7bb..97cb07b895c 100644 --- a/extensions/discord/src/outbound-adapter.test-harness.ts +++ b/extensions/discord/src/outbound-adapter.test-harness.ts @@ -8,6 +8,7 @@ type DiscordOutboundHoisted = { sendDiscordComponentMessageMock: AsyncUnknownMock; sendPollDiscordMock: AsyncUnknownMock; sendWebhookMessageDiscordMock: AsyncUnknownMock; + sendVoiceMessageDiscordMock: AsyncUnknownMock; getThreadBindingManagerMock: UnknownMock; }; @@ -28,12 +29,14 @@ export function createDiscordOutboundHoisted(): DiscordOutboundHoisted { const sendDiscordComponentMessageMock = vi.fn(); const sendPollDiscordMock = vi.fn(); const sendWebhookMessageDiscordMock = vi.fn(); + const sendVoiceMessageDiscordMock = vi.fn(); const getThreadBindingManagerMock = vi.fn(); return { sendMessageDiscordMock, sendDiscordComponentMessageMock, sendPollDiscordMock, sendWebhookMessageDiscordMock, + sendVoiceMessageDiscordMock, getThreadBindingManagerMock, }; } @@ -68,6 +71,11 @@ export async function createDiscordSendModuleMock( Parameters, ReturnType >(hoisted.sendWebhookMessageDiscordMock, ...args), + sendVoiceMessageDiscord: (...args: Parameters) => + invokeMock< + Parameters, + ReturnType + >(hoisted.sendVoiceMessageDiscordMock, ...args), }; } @@ -115,6 +123,9 @@ export async function installDiscordOutboundModuleSpies(hoisted: DiscordOutbound vi.spyOn(sendModule, "sendWebhookMessageDiscord").mockImplementation( mockedSendModule.sendWebhookMessageDiscord, ); + vi.spyOn(sendModule, "sendVoiceMessageDiscord").mockImplementation( + mockedSendModule.sendVoiceMessageDiscord, + ); const sendComponentsModule = await import("./send.components.js"); const mockedSendComponentsModule = await createDiscordSendComponentsModuleMock( @@ -152,6 +163,10 @@ export function resetDiscordOutboundMocks(hoisted: DiscordOutboundHoisted) { messageId: "msg-webhook-1", channelId: "thread-1", }); + hoisted.sendVoiceMessageDiscordMock.mockReset().mockResolvedValue({ + messageId: "voice-1", + channelId: "ch-1", + }); hoisted.getThreadBindingManagerMock.mockReset().mockReturnValue(null); } @@ -187,5 +202,6 @@ export function mockDiscordBoundThreadManager(hoisted: DiscordOutboundHoisted) { boundBy: "system", boundAt: Date.now(), }), + touchThread: vi.fn(), }); } diff --git a/extensions/discord/src/outbound-adapter.test.ts b/extensions/discord/src/outbound-adapter.test.ts index 86749f8f9b6..f4dfca39357 100644 --- a/extensions/discord/src/outbound-adapter.test.ts +++ b/extensions/discord/src/outbound-adapter.test.ts @@ -1,4 +1,4 @@ -import { beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { createDiscordOutboundHoisted, expectDiscordThreadBotSend, @@ -72,6 +72,62 @@ describe("discordOutbound", () => { }); }); + it("forwards explicit formatting options to Discord text sends", async () => { + await discordOutbound.sendText?.({ + cfg: {}, + to: "channel:123456", + text: "formatted", + accountId: "default", + formatting: { + textLimit: 1234, + maxLinesPerMessage: 7, + tableMode: "off", + chunkMode: "newline", + }, + }); + + expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith( + "channel:123456", + "formatted", + expect.objectContaining({ + textLimit: 1234, + maxLinesPerMessage: 7, + tableMode: "off", + chunkMode: "newline", + }), + ); + }); + + it.each([500, 429])("retries transient Discord text send status %i", async (status) => { + hoisted.sendMessageDiscordMock + .mockRejectedValueOnce(Object.assign(new Error(`discord ${status}`), { status })) + .mockResolvedValueOnce({ + messageId: "msg-retry-ok", + channelId: "ch-1", + }); + + const result = await discordOutbound.sendText?.({ + cfg: { + channels: { + discord: { + token: "test-token", + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + }, + }, + }, + to: "channel:123456", + text: "retry me", + accountId: "default", + }); + + expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledTimes(2); + expect(result).toEqual({ + channel: "discord", + messageId: "msg-retry-ok", + channelId: "ch-1", + }); + }); + it("uses webhook persona delivery for bound thread text replies", async () => { mockDiscordBoundThreadManager(hoisted); const cfg = { @@ -189,6 +245,155 @@ describe("discordOutbound", () => { }); }); + it("routes audioAsVoice payloads through the Discord voice send helper", async () => { + const result = await discordOutbound.sendPayload?.({ + cfg: {}, + to: "channel:123456", + text: "", + payload: { + text: "voice note", + mediaUrls: ["https://example.com/voice.ogg", "https://example.com/extra.png"], + audioAsVoice: true, + }, + accountId: "default", + replyToId: "reply-1", + replyToMode: "first", + }); + + expect(hoisted.sendVoiceMessageDiscordMock).toHaveBeenCalledWith( + "channel:123456", + "https://example.com/voice.ogg", + expect.objectContaining({ + accountId: "default", + replyTo: "reply-1", + }), + ); + expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith( + "channel:123456", + "voice note", + expect.objectContaining({ + accountId: "default", + replyTo: undefined, + }), + ); + expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith( + "channel:123456", + "", + expect.objectContaining({ + accountId: "default", + mediaUrl: "https://example.com/extra.png", + replyTo: undefined, + }), + ); + expect(result).toEqual({ + channel: "discord", + messageId: "msg-1", + channelId: "ch-1", + }); + }); + + it("keeps replyToId on every internal audioAsVoice send when replyToMode is all", async () => { + await discordOutbound.sendPayload?.({ + cfg: {}, + to: "channel:123456", + text: "", + payload: { + text: "voice note", + mediaUrls: ["https://example.com/voice.ogg", "https://example.com/extra.png"], + audioAsVoice: true, + }, + accountId: "default", + replyToId: "reply-1", + replyToMode: "all", + }); + + expect( + (hoisted.sendVoiceMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined) + ?.replyTo, + ).toBe("reply-1"); + expect( + hoisted.sendMessageDiscordMock.mock.calls.map( + (call) => (call[2] as { replyTo?: unknown } | undefined)?.replyTo, + ), + ).toEqual(["reply-1", "reply-1"]); + }); + + it("preserves explicit audioAsVoice payload replies when replyToMode is off", async () => { + await discordOutbound.sendPayload?.({ + cfg: {}, + to: "channel:123456", + text: "", + payload: { + text: "voice note", + mediaUrls: ["https://example.com/voice.ogg", "https://example.com/extra.png"], + audioAsVoice: true, + }, + accountId: "default", + replyToId: "explicit-reply-1", + replyToMode: "off", + }); + + expect( + (hoisted.sendVoiceMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined) + ?.replyTo, + ).toBe("explicit-reply-1"); + expect( + hoisted.sendMessageDiscordMock.mock.calls.map( + (call) => (call[2] as { replyTo?: unknown } | undefined)?.replyTo, + ), + ).toEqual(["explicit-reply-1", "explicit-reply-1"]); + }); + + it("sends video captions as text before a media-only video follow-up", async () => { + await discordOutbound.sendMedia?.({ + cfg: {}, + to: "channel:123456", + text: "rendered clip", + mediaUrl: "/tmp/render.mp4", + accountId: "default", + replyToId: "reply-1", + }); + + expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith( + "channel:123456", + "rendered clip", + expect.objectContaining({ + accountId: "default", + replyTo: "reply-1", + }), + ); + expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith( + "channel:123456", + "", + expect.objectContaining({ + accountId: "default", + mediaUrl: "/tmp/render.mp4", + }), + ); + }); + + it("touches bound thread activity after shared outbound delivery succeeds", async () => { + const touchThread = vi.fn(); + hoisted.getThreadBindingManagerMock.mockReturnValue({ + getByThreadId: () => ({ threadId: "thread-1" }), + touchThread, + }); + + await discordOutbound.afterDeliverPayload?.({ + cfg: {}, + target: { + channel: "discord", + to: "channel:parent-1", + accountId: "default", + threadId: "thread-1", + }, + payload: { text: "delivered" }, + results: [{ channel: "discord", messageId: "msg-1" }], + }); + + expect(touchThread).toHaveBeenCalledWith({ threadId: "thread-1" }); + }); + it("sends component payload media sequences with the component message first", async () => { hoisted.sendDiscordComponentMessageMock.mockResolvedValueOnce({ messageId: "component-1", @@ -224,6 +429,8 @@ describe("discordOutbound", () => { payload, accountId: "default", mediaLocalRoots: ["/tmp/media"], + replyToId: "reply-1", + replyToMode: "first", }); expect(hoisted.sendDiscordComponentMessageMock).toHaveBeenCalledWith( @@ -233,6 +440,7 @@ describe("discordOutbound", () => { mediaUrl: "https://example.com/1.png", mediaLocalRoots: ["/tmp/media"], accountId: "default", + replyTo: "reply-1", }), ); expect(hoisted.sendMessageDiscordMock).toHaveBeenCalledWith( @@ -242,6 +450,7 @@ describe("discordOutbound", () => { mediaUrl: "https://example.com/2.png", mediaLocalRoots: ["/tmp/media"], accountId: "default", + replyTo: undefined, }), ); expect(result).toEqual({ @@ -251,6 +460,98 @@ describe("discordOutbound", () => { }); }); + it("keeps replyToId on every internal component media send when replyToMode is all", async () => { + const payload = await discordOutbound.renderPresentation?.({ + payload: { + text: "hello", + mediaUrls: ["https://example.com/1.png", "https://example.com/2.png"], + }, + presentation: { + blocks: [{ type: "buttons", buttons: [{ label: "Open", value: "open" }] }], + }, + ctx: { + cfg: {}, + to: "channel:123456", + }, + } as never); + + if (!payload) { + throw new Error("expected Discord presentation payload"); + } + + await discordOutbound.sendPayload?.({ + cfg: {}, + to: "channel:123456", + text: "", + payload, + accountId: "default", + replyToId: "reply-1", + replyToMode: "all", + }); + + expect( + ( + hoisted.sendDiscordComponentMessageMock.mock.calls[0]?.[2] as + | { replyTo?: unknown } + | undefined + )?.replyTo, + ).toBe("reply-1"); + expect( + (hoisted.sendMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined) + ?.replyTo, + ).toBe("reply-1"); + }); + + it("preserves explicit component payload replies when replyToMode is off", async () => { + const payload = await discordOutbound.renderPresentation?.({ + payload: { + text: "hello", + mediaUrls: ["https://example.com/1.png", "https://example.com/2.png"], + }, + presentation: { + blocks: [{ type: "buttons", buttons: [{ label: "Open", value: "open" }] }], + }, + ctx: { + cfg: {}, + to: "channel:123456", + }, + } as never); + + if (!payload) { + throw new Error("expected Discord presentation payload"); + } + + await discordOutbound.sendPayload?.({ + cfg: {}, + to: "channel:123456", + text: "", + payload, + accountId: "default", + replyToId: "explicit-reply-1", + replyToMode: "off", + }); + + expect( + ( + hoisted.sendDiscordComponentMessageMock.mock.calls[0]?.[2] as + | { replyTo?: unknown } + | undefined + )?.replyTo, + ).toBe("explicit-reply-1"); + expect( + (hoisted.sendMessageDiscordMock.mock.calls[0]?.[2] as { replyTo?: unknown } | undefined) + ?.replyTo, + ).toBe("explicit-reply-1"); + }); + + it("uses explicit maxLinesPerMessage in its adapter chunker", () => { + expect( + discordOutbound.chunker?.("line one\nline two\nline three", 2000, { + formatting: { maxLinesPerMessage: 1 }, + }), + ).toEqual(["line one", "line two", "line three"]); + }); + it("renders channelData Discord components on payload sends", async () => { await discordOutbound.sendPayload?.({ cfg: {}, @@ -306,6 +607,34 @@ describe("discordOutbound", () => { ); }); + it("uses a single implicit reply for chunked approval payload fallbacks", async () => { + await discordOutbound.sendPayload?.({ + cfg: {}, + to: "channel:123456", + text: "", + payload: { + text: "line one\nline two", + channelData: { + execApproval: { + approvalId: "req-1", + approvalSlug: "req-1", + }, + }, + }, + accountId: "default", + replyToId: "reply-1", + replyToIdSource: "implicit", + replyToMode: "first", + formatting: { maxLinesPerMessage: 1 }, + }); + + expect( + hoisted.sendMessageDiscordMock.mock.calls.map( + (call) => (call[2] as { replyTo?: unknown } | undefined)?.replyTo, + ), + ).toEqual(["reply-1", undefined]); + }); + it("leaves non-approval mentions unchanged", async () => { await discordOutbound.sendPayload?.({ cfg: {}, diff --git a/extensions/discord/src/outbound-adapter.ts b/extensions/discord/src/outbound-adapter.ts index 1ee70643ce1..bbf36e641cd 100644 --- a/extensions/discord/src/outbound-adapter.ts +++ b/extensions/discord/src/outbound-adapter.ts @@ -1,5 +1,4 @@ import { - attachChannelToResult, type ChannelOutboundAdapter, createAttachedChannelResultAdapter, } from "openclaw/plugin-sdk/channel-send-result"; @@ -8,99 +7,39 @@ import { resolveOutboundSendDep, type OutboundIdentity, } from "openclaw/plugin-sdk/outbound-runtime"; -import { - resolvePayloadMediaUrls, - sendPayloadMediaSequenceOrFallback, - sendTextMediaPayload, -} from "openclaw/plugin-sdk/reply-payload"; import { normalizeOptionalString, normalizeOptionalStringifiedId, } from "openclaw/plugin-sdk/text-runtime"; -import { readDiscordComponentSpec, type DiscordComponentMessageSpec } from "./components.js"; +import { chunkDiscordTextWithMode } from "./chunk.js"; +import { withDiscordDeliveryRetry } from "./delivery-retry.js"; +import { isLikelyDiscordVideoMedia } from "./media-detection.js"; import type { ThreadBindingRecord } from "./monitor/thread-bindings.js"; import { normalizeDiscordOutboundTarget } from "./normalize.js"; +import { + buildDiscordPresentationPayload, + normalizeDiscordApprovalPayload, + sendDiscordOutboundPayload, +} from "./outbound-payload.js"; +import { + loadDiscordSendRuntime, + resolveDiscordFormattingOptions, + resolveDiscordOutboundTarget, + type DiscordSendFn, + type DiscordVoiceSendFn, +} from "./outbound-send-context.js"; export const DISCORD_TEXT_CHUNK_LIMIT = 2000; -type DiscordSendRuntime = typeof import("./send.js"); -type DiscordSendFn = DiscordSendRuntime["sendMessageDiscord"]; -type DiscordComponentSendFn = typeof import("./send.components.js").sendDiscordComponentMessage; -type DiscordSharedInteractiveModule = typeof import("./shared-interactive.js"); type DiscordThreadBindingsModule = typeof import("./monitor/thread-bindings.js"); -let discordSendRuntimePromise: Promise | undefined; -let discordComponentSendPromise: Promise | undefined; -let discordSharedInteractivePromise: Promise | undefined; let discordThreadBindingsPromise: Promise | undefined; -async function loadDiscordSendRuntime(): Promise { - discordSendRuntimePromise ??= import("./send.js"); - return await discordSendRuntimePromise; -} - -async function sendDiscordComponentMessageLazy( - ...args: Parameters -): ReturnType { - discordComponentSendPromise ??= import("./send.components.js").then( - (module) => module.sendDiscordComponentMessage, - ); - return await ( - await discordComponentSendPromise - )(...args); -} - -function loadDiscordSharedInteractive(): Promise { - discordSharedInteractivePromise ??= import("./shared-interactive.js"); - return discordSharedInteractivePromise; -} - function loadDiscordThreadBindings(): Promise { discordThreadBindingsPromise ??= import("./monitor/thread-bindings.js"); return discordThreadBindingsPromise; } -function hasApprovalChannelData(payload: { channelData?: unknown }): boolean { - const channelData = payload.channelData; - if (!channelData || typeof channelData !== "object" || Array.isArray(channelData)) { - return false; - } - return Boolean((channelData as { execApproval?: unknown }).execApproval); -} - -function neutralizeDiscordApprovalMentions(value: string): string { - return value - .replace(/@everyone/gi, "@\u200beveryone") - .replace(/@here/gi, "@\u200bhere") - .replace(/<@/g, "<@\u200b") - .replace(/<#/g, "<#\u200b"); -} - -function normalizeDiscordApprovalPayload( - payload: T, -): T { - return hasApprovalChannelData(payload) && payload.text - ? { - ...payload, - text: neutralizeDiscordApprovalMentions(payload.text), - } - : payload; -} - -function resolveDiscordOutboundTarget(params: { - to: string; - threadId?: string | number | null; -}): string { - if (params.threadId == null) { - return params.to; - } - const threadId = normalizeOptionalStringifiedId(params.threadId) ?? ""; - if (!threadId) { - return params.to; - } - return `channel:${threadId}`; -} - function resolveDiscordWebhookIdentity(params: { identity?: OutboundIdentity; binding: ThreadBindingRecord; @@ -156,7 +95,11 @@ async function maybeSendDiscordWebhookText(params: { export const discordOutbound: ChannelOutboundAdapter = { deliveryMode: "direct", - chunker: null, + chunker: (text, limit, ctx) => + chunkDiscordTextWithMode(text, { + maxChars: limit, + maxLines: ctx?.formatting?.maxLinesPerMessage, + }), textChunkLimit: DISCORD_TEXT_CHUNK_LIMIT, pollMaxOptions: 10, normalizePayload: ({ payload }) => normalizeDiscordApprovalPayload(payload), @@ -168,105 +111,31 @@ export const discordOutbound: ChannelOutboundAdapter = { divider: true, }, renderPresentation: async ({ payload, presentation }) => { - const componentSpec = (await loadDiscordSharedInteractive()).buildDiscordPresentationComponents( + return await buildDiscordPresentationPayload({ + payload, presentation, - ); - if (!componentSpec) { - return null; - } - return { - ...payload, - channelData: { - ...payload.channelData, - discord: { - ...(payload.channelData?.discord as Record | undefined), - presentationComponents: componentSpec, - }, - }, - }; + }); }, resolveTarget: ({ to }) => normalizeDiscordOutboundTarget(to), - sendPayload: async (ctx) => { - const payload = normalizeDiscordApprovalPayload({ - ...ctx.payload, - text: ctx.payload.text ?? "", - }); - const discordData = payload.channelData?.discord as - | { components?: unknown; presentationComponents?: DiscordComponentMessageSpec } - | undefined; - const rawComponentSpec = - discordData?.presentationComponents ?? - readDiscordComponentSpec(discordData?.components) ?? - (payload.interactive - ? (await loadDiscordSharedInteractive()).buildDiscordInteractiveComponents( - payload.interactive, - ) - : undefined); - const componentSpec = rawComponentSpec - ? rawComponentSpec.text - ? rawComponentSpec - : { - ...rawComponentSpec, - text: payload.text?.trim() ? payload.text : undefined, - } - : undefined; - if (!componentSpec) { - return await sendTextMediaPayload({ - channel: "discord", - ctx: { - ...ctx, - payload, - }, - adapter: discordOutbound, - }); - } - const send = - resolveOutboundSendDep(ctx.deps, "discord") ?? - (await loadDiscordSendRuntime()).sendMessageDiscord; - const target = resolveDiscordOutboundTarget({ to: ctx.to, threadId: ctx.threadId }); - const mediaUrls = resolvePayloadMediaUrls(payload); - const result = await sendPayloadMediaSequenceOrFallback({ - text: payload.text ?? "", - mediaUrls, - fallbackResult: { messageId: "", channelId: target }, - sendNoMedia: async () => - await sendDiscordComponentMessageLazy(target, componentSpec, { - replyTo: ctx.replyToId ?? undefined, - accountId: ctx.accountId ?? undefined, - silent: ctx.silent ?? undefined, - cfg: ctx.cfg, - }), - send: async ({ text, mediaUrl, isFirst }) => { - if (isFirst) { - return await sendDiscordComponentMessageLazy(target, componentSpec, { - mediaUrl, - mediaAccess: ctx.mediaAccess, - mediaLocalRoots: ctx.mediaLocalRoots, - mediaReadFile: ctx.mediaReadFile, - replyTo: ctx.replyToId ?? undefined, - accountId: ctx.accountId ?? undefined, - silent: ctx.silent ?? undefined, - cfg: ctx.cfg, - }); - } - return await send(target, text, { - verbose: false, - mediaUrl, - mediaAccess: ctx.mediaAccess, - mediaLocalRoots: ctx.mediaLocalRoots, - mediaReadFile: ctx.mediaReadFile, - replyTo: ctx.replyToId ?? undefined, - accountId: ctx.accountId ?? undefined, - silent: ctx.silent ?? undefined, - cfg: ctx.cfg, - }); - }, - }); - return attachChannelToResult("discord", result); - }, + sendPayload: async (ctx) => + await sendDiscordOutboundPayload({ + ctx, + fallbackAdapter: discordOutbound, + }), ...createAttachedChannelResultAdapter({ channel: "discord", - sendText: async ({ cfg, to, text, accountId, deps, replyToId, threadId, identity, silent }) => { + sendText: async ({ + cfg, + to, + text, + accountId, + deps, + replyToId, + threadId, + identity, + silent, + formatting, + }) => { if (!silent) { const webhookResult = await maybeSendDiscordWebhookText({ cfg, @@ -283,12 +152,18 @@ export const discordOutbound: ChannelOutboundAdapter = { const send = resolveOutboundSendDep(deps, "discord") ?? (await loadDiscordSendRuntime()).sendMessageDiscord; - return await send(resolveDiscordOutboundTarget({ to, threadId }), text, { - verbose: false, - replyTo: replyToId ?? undefined, - accountId: accountId ?? undefined, - silent: silent ?? undefined, + return await withDiscordDeliveryRetry({ cfg, + accountId, + fn: async () => + await send(resolveDiscordOutboundTarget({ to, threadId }), text, { + verbose: false, + replyTo: replyToId ?? undefined, + accountId: accountId ?? undefined, + silent: silent ?? undefined, + cfg, + ...resolveDiscordFormattingOptions({ formatting }), + }), }); }, sendMedia: async ({ @@ -296,6 +171,8 @@ export const discordOutbound: ChannelOutboundAdapter = { to, text, mediaUrl, + audioAsVoice, + mediaAccess, mediaLocalRoots, mediaReadFile, accountId, @@ -303,28 +180,102 @@ export const discordOutbound: ChannelOutboundAdapter = { replyToId, threadId, silent, + formatting, }) => { const send = resolveOutboundSendDep(deps, "discord") ?? (await loadDiscordSendRuntime()).sendMessageDiscord; - return await send(resolveDiscordOutboundTarget({ to, threadId }), text, { - verbose: false, - mediaUrl, - mediaLocalRoots, - mediaReadFile, - replyTo: replyToId ?? undefined, - accountId: accountId ?? undefined, - silent: silent ?? undefined, + const target = resolveDiscordOutboundTarget({ to, threadId }); + const formattingOptions = resolveDiscordFormattingOptions({ formatting }); + if (audioAsVoice && mediaUrl) { + const sendVoice = + resolveOutboundSendDep(deps, "discordVoice") ?? + (await loadDiscordSendRuntime()).sendVoiceMessageDiscord; + return await withDiscordDeliveryRetry({ + cfg, + accountId, + fn: async () => + await sendVoice(target, mediaUrl, { + cfg, + replyTo: replyToId ?? undefined, + accountId: accountId ?? undefined, + silent: silent ?? undefined, + }), + }); + } + if (text.trim() && mediaUrl && isLikelyDiscordVideoMedia(mediaUrl)) { + await withDiscordDeliveryRetry({ + cfg, + accountId, + fn: async () => + await send(target, text, { + verbose: false, + replyTo: replyToId ?? undefined, + accountId: accountId ?? undefined, + silent: silent ?? undefined, + cfg, + ...formattingOptions, + }), + }); + return await withDiscordDeliveryRetry({ + cfg, + accountId, + fn: async () => + await send(target, "", { + verbose: false, + mediaUrl, + mediaAccess, + mediaLocalRoots, + mediaReadFile, + accountId: accountId ?? undefined, + silent: silent ?? undefined, + cfg, + ...formattingOptions, + }), + }); + } + return await withDiscordDeliveryRetry({ cfg, + accountId, + fn: async () => + await send(target, text, { + verbose: false, + mediaUrl, + mediaAccess, + mediaLocalRoots, + mediaReadFile, + replyTo: replyToId ?? undefined, + accountId: accountId ?? undefined, + silent: silent ?? undefined, + cfg, + ...formattingOptions, + }), }); }, sendPoll: async ({ cfg, to, poll, accountId, threadId, silent }) => - await ( - await loadDiscordSendRuntime() - ).sendPollDiscord(resolveDiscordOutboundTarget({ to, threadId }), poll, { - accountId: accountId ?? undefined, - silent: silent ?? undefined, + await withDiscordDeliveryRetry({ cfg, + accountId, + fn: async () => + await ( + await loadDiscordSendRuntime() + ).sendPollDiscord(resolveDiscordOutboundTarget({ to, threadId }), poll, { + accountId: accountId ?? undefined, + silent: silent ?? undefined, + cfg, + }), }), }), + afterDeliverPayload: async ({ target }) => { + const threadId = normalizeOptionalStringifiedId(target.threadId); + if (!threadId) { + return; + } + const { getThreadBindingManager } = await loadDiscordThreadBindings(); + const manager = getThreadBindingManager(target.accountId ?? undefined); + if (!manager?.getByThreadId(threadId)) { + return; + } + manager.touchThread({ threadId }); + }, }; diff --git a/extensions/discord/src/outbound-payload.contract.test.ts b/extensions/discord/src/outbound-payload.contract.test.ts index f954036f4f3..39157a61a33 100644 --- a/extensions/discord/src/outbound-payload.contract.test.ts +++ b/extensions/discord/src/outbound-payload.contract.test.ts @@ -32,7 +32,7 @@ function createDiscordHarness(params: OutboundPayloadHarnessParams) { describe("Discord outbound payload contract", () => { installChannelOutboundPayloadContractSuite({ channel: "discord", - chunking: { mode: "passthrough", longTextLength: 3000 }, + chunking: { mode: "split", longTextLength: 3000, maxChunkLength: 2000 }, createHarness: createDiscordHarness, }); }); diff --git a/extensions/discord/src/outbound-payload.ts b/extensions/discord/src/outbound-payload.ts new file mode 100644 index 00000000000..6ee40dee0b8 --- /dev/null +++ b/extensions/discord/src/outbound-payload.ts @@ -0,0 +1,241 @@ +import { + attachChannelToResult, + type ChannelOutboundAdapter, +} from "openclaw/plugin-sdk/channel-send-result"; +import { + resolvePayloadMediaUrls, + sendPayloadMediaSequenceOrFallback, + sendTextMediaPayload, +} from "openclaw/plugin-sdk/reply-payload"; +import { readDiscordComponentSpec, type DiscordComponentMessageSpec } from "./components.js"; +import { createDiscordPayloadSendContext } from "./outbound-send-context.js"; + +type DiscordComponentSendFn = typeof import("./send.components.js").sendDiscordComponentMessage; +type DiscordSharedInteractiveModule = typeof import("./shared-interactive.js"); + +let discordComponentSendPromise: Promise | undefined; +let discordSharedInteractivePromise: Promise | undefined; + +async function sendDiscordComponentMessageLazy( + ...args: Parameters +): ReturnType { + discordComponentSendPromise ??= import("./send.components.js").then( + (module) => module.sendDiscordComponentMessage, + ); + return await ( + await discordComponentSendPromise + )(...args); +} + +function loadDiscordSharedInteractive(): Promise { + discordSharedInteractivePromise ??= import("./shared-interactive.js"); + return discordSharedInteractivePromise; +} + +function hasApprovalChannelData(payload: { channelData?: unknown }): boolean { + const channelData = payload.channelData; + if (!channelData || typeof channelData !== "object" || Array.isArray(channelData)) { + return false; + } + return Boolean((channelData as { execApproval?: unknown }).execApproval); +} + +function neutralizeDiscordApprovalMentions(value: string): string { + return value + .replace(/@everyone/gi, "@\u200beveryone") + .replace(/@here/gi, "@\u200bhere") + .replace(/<@/g, "<@\u200b") + .replace(/<#/g, "<#\u200b"); +} + +export function normalizeDiscordApprovalPayload< + T extends { + text?: string; + channelData?: unknown; + }, +>(payload: T): T { + return hasApprovalChannelData(payload) && payload.text + ? { + ...payload, + text: neutralizeDiscordApprovalMentions(payload.text), + } + : payload; +} + +export async function buildDiscordPresentationPayload(params: { + payload: Parameters>[0]["payload"]; + presentation: Parameters< + NonNullable + >[0]["presentation"]; +}): Promise { + const componentSpec = (await loadDiscordSharedInteractive()).buildDiscordPresentationComponents( + params.presentation, + ); + if (!componentSpec) { + return null; + } + return { + ...params.payload, + channelData: { + ...params.payload.channelData, + discord: { + ...(params.payload.channelData?.discord as Record | undefined), + presentationComponents: componentSpec, + }, + }, + }; +} + +function resolveDiscordComponentSpec( + payload: Parameters>[0]["payload"], +): Promise { + const discordData = payload.channelData?.discord as + | { components?: unknown; presentationComponents?: DiscordComponentMessageSpec } + | undefined; + const rawComponentSpec = + discordData?.presentationComponents ?? readDiscordComponentSpec(discordData?.components); + if (rawComponentSpec) { + return Promise.resolve( + rawComponentSpec.text + ? rawComponentSpec + : { + ...rawComponentSpec, + text: payload.text?.trim() ? payload.text : undefined, + }, + ); + } + if (!payload.interactive) { + return Promise.resolve(undefined); + } + return loadDiscordSharedInteractive().then((module) => { + const interactiveSpec = module.buildDiscordInteractiveComponents(payload.interactive); + if (!interactiveSpec) { + return undefined; + } + return interactiveSpec.text + ? interactiveSpec + : { + ...interactiveSpec, + text: payload.text?.trim() ? payload.text : undefined, + }; + }); +} + +export async function sendDiscordOutboundPayload(params: { + ctx: Parameters>[0]; + fallbackAdapter: ChannelOutboundAdapter; +}): Promise>>> { + const ctx = params.ctx; + const payload = normalizeDiscordApprovalPayload({ + ...ctx.payload, + text: ctx.payload.text ?? "", + }); + const mediaUrls = resolvePayloadMediaUrls(payload); + const sendContext = await createDiscordPayloadSendContext(ctx); + + if (payload.audioAsVoice && mediaUrls.length > 0) { + let lastResult = await sendContext.withRetry( + async () => + await sendContext.sendVoice(sendContext.target, mediaUrls[0], { + cfg: ctx.cfg, + replyTo: sendContext.resolveReplyTo(), + accountId: ctx.accountId ?? undefined, + silent: ctx.silent ?? undefined, + }), + ); + if (payload.text?.trim()) { + lastResult = await sendContext.withRetry( + async () => + await sendContext.send(sendContext.target, payload.text, { + verbose: false, + replyTo: sendContext.resolveReplyTo(), + accountId: ctx.accountId ?? undefined, + silent: ctx.silent ?? undefined, + cfg: ctx.cfg, + ...sendContext.formatting, + }), + ); + } + for (const mediaUrl of mediaUrls.slice(1)) { + lastResult = await sendContext.withRetry( + async () => + await sendContext.send(sendContext.target, "", { + verbose: false, + mediaUrl, + mediaAccess: ctx.mediaAccess, + mediaLocalRoots: ctx.mediaLocalRoots, + mediaReadFile: ctx.mediaReadFile, + replyTo: sendContext.resolveReplyTo(), + accountId: ctx.accountId ?? undefined, + silent: ctx.silent ?? undefined, + cfg: ctx.cfg, + ...sendContext.formatting, + }), + ); + } + return attachChannelToResult("discord", lastResult); + } + + const componentSpec = await resolveDiscordComponentSpec(payload); + if (!componentSpec) { + return await sendTextMediaPayload({ + channel: "discord", + ctx: { + ...ctx, + payload, + }, + adapter: params.fallbackAdapter, + }); + } + + const result = await sendPayloadMediaSequenceOrFallback({ + text: payload.text ?? "", + mediaUrls, + fallbackResult: { messageId: "", channelId: sendContext.target }, + sendNoMedia: async () => + await sendContext.withRetry( + async () => + await sendDiscordComponentMessageLazy(sendContext.target, componentSpec, { + replyTo: sendContext.resolveReplyTo(), + accountId: ctx.accountId ?? undefined, + silent: ctx.silent ?? undefined, + cfg: ctx.cfg, + ...sendContext.formatting, + }), + ), + send: async ({ text, mediaUrl, isFirst }) => { + if (isFirst) { + return await sendContext.withRetry( + async () => + await sendDiscordComponentMessageLazy(sendContext.target, componentSpec, { + mediaUrl, + mediaAccess: ctx.mediaAccess, + mediaLocalRoots: ctx.mediaLocalRoots, + mediaReadFile: ctx.mediaReadFile, + replyTo: sendContext.resolveReplyTo(), + accountId: ctx.accountId ?? undefined, + silent: ctx.silent ?? undefined, + cfg: ctx.cfg, + ...sendContext.formatting, + }), + ); + } + return await sendContext.withRetry( + async () => + await sendContext.send(sendContext.target, text, { + verbose: false, + mediaUrl, + mediaAccess: ctx.mediaAccess, + mediaLocalRoots: ctx.mediaLocalRoots, + mediaReadFile: ctx.mediaReadFile, + replyTo: sendContext.resolveReplyTo(), + accountId: ctx.accountId ?? undefined, + silent: ctx.silent ?? undefined, + cfg: ctx.cfg, + ...sendContext.formatting, + }), + ); + }, + }); + return attachChannelToResult("discord", result); +} diff --git a/extensions/discord/src/outbound-send-context.ts b/extensions/discord/src/outbound-send-context.ts new file mode 100644 index 00000000000..481c52ec8f7 --- /dev/null +++ b/extensions/discord/src/outbound-send-context.ts @@ -0,0 +1,112 @@ +import type { OpenClawConfig, ReplyToMode } from "openclaw/plugin-sdk/config-runtime"; +import { + resolveOutboundSendDep, + type OutboundSendDeps, +} from "openclaw/plugin-sdk/outbound-runtime"; +import { isSingleUseReplyToMode } from "openclaw/plugin-sdk/reply-reference"; +import { + normalizeOptionalString, + normalizeOptionalStringifiedId, +} from "openclaw/plugin-sdk/text-runtime"; +import { withDiscordDeliveryRetry } from "./delivery-retry.js"; + +type DiscordSendRuntime = typeof import("./send.js"); + +export type DiscordSendFn = DiscordSendRuntime["sendMessageDiscord"]; +export type DiscordVoiceSendFn = DiscordSendRuntime["sendVoiceMessageDiscord"]; +export type DiscordFormattingOptions = { + textLimit?: number; + maxLinesPerMessage?: number; + tableMode?: NonNullable[2]>["tableMode"]; + chunkMode?: NonNullable[2]>["chunkMode"]; +}; + +let discordSendRuntimePromise: Promise | undefined; + +export async function loadDiscordSendRuntime(): Promise { + discordSendRuntimePromise ??= import("./send.js"); + return await discordSendRuntimePromise; +} + +export function resolveDiscordOutboundTarget(params: { + to: string; + threadId?: string | number | null; +}): string { + if (params.threadId == null) { + return params.to; + } + const threadId = normalizeOptionalStringifiedId(params.threadId) ?? ""; + if (!threadId) { + return params.to; + } + return `channel:${threadId}`; +} + +export function resolveDiscordFormattingOptions(ctx: { + formatting?: DiscordFormattingOptions; +}): DiscordFormattingOptions { + const formatting = ctx.formatting; + return { + textLimit: formatting?.textLimit, + maxLinesPerMessage: formatting?.maxLinesPerMessage, + tableMode: formatting?.tableMode, + chunkMode: formatting?.chunkMode, + }; +} + +export function createResolvedReplyToFanout(params: { + replyToId?: string | null; + replyToMode?: ReplyToMode; +}): () => string | undefined { + const replyToId = normalizeOptionalString(params.replyToId); + if (!replyToId) { + return () => undefined; + } + if (!params.replyToMode || !isSingleUseReplyToMode(params.replyToMode)) { + return () => replyToId; + } + let current: string | undefined = replyToId; + return () => { + const value = current; + current = undefined; + return value; + }; +} + +export async function createDiscordPayloadSendContext(ctx: { + cfg: OpenClawConfig; + to: string; + accountId?: string | null; + deps?: OutboundSendDeps; + replyToId?: string | null; + replyToMode?: ReplyToMode; + formatting?: DiscordFormattingOptions; + threadId?: string | number | null; +}): Promise<{ + target: string; + formatting: DiscordFormattingOptions; + resolveReplyTo: () => string | undefined; + send: DiscordSendFn; + sendVoice: DiscordVoiceSendFn; + withRetry: (fn: () => Promise) => Promise; +}> { + const runtime = await loadDiscordSendRuntime(); + return { + target: resolveDiscordOutboundTarget({ to: ctx.to, threadId: ctx.threadId }), + formatting: resolveDiscordFormattingOptions(ctx), + resolveReplyTo: createResolvedReplyToFanout({ + replyToId: ctx.replyToId, + replyToMode: ctx.replyToMode, + }), + send: resolveOutboundSendDep(ctx.deps, "discord") ?? runtime.sendMessageDiscord, + sendVoice: + resolveOutboundSendDep(ctx.deps, "discordVoice") ?? + runtime.sendVoiceMessageDiscord, + withRetry: async (fn) => + await withDiscordDeliveryRetry({ + cfg: ctx.cfg, + accountId: ctx.accountId, + fn, + }), + }; +} diff --git a/extensions/discord/src/send.components.ts b/extensions/discord/src/send.components.ts index 59c32f0ca6e..5347f3c8ed6 100644 --- a/extensions/discord/src/send.components.ts +++ b/extensions/discord/src/send.components.ts @@ -5,8 +5,13 @@ import { type RequestClient, } from "@buape/carbon"; import { ChannelType, Routes } from "discord-api-types/v10"; -import { requireRuntimeConfig, type OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; +import { + requireRuntimeConfig, + type MarkdownTableMode, + type OpenClawConfig, +} from "openclaw/plugin-sdk/config-runtime"; import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime"; +import type { ChunkMode } from "openclaw/plugin-sdk/reply-chunking"; import { resolveDiscordAccount } from "./accounts.js"; import { registerDiscordComponentEntries } from "./components-registry.js"; import { @@ -157,6 +162,10 @@ type DiscordComponentSendOpts = { mediaLocalRoots?: readonly string[]; mediaReadFile?: (filePath: string) => Promise; filename?: string; + textLimit?: number; + maxLinesPerMessage?: number; + tableMode?: MarkdownTableMode; + chunkMode?: ChunkMode; }; export function registerBuiltDiscordComponentMessage(params: { @@ -260,6 +269,10 @@ export async function sendDiscordComponentMessage( mediaAccess: opts.mediaAccess, replyTo: opts.replyTo, silent: opts.silent, + textLimit: opts.textLimit, + maxLinesPerMessage: opts.maxLinesPerMessage, + tableMode: opts.tableMode, + chunkMode: opts.chunkMode, }); } diff --git a/extensions/discord/src/send.outbound.ts b/extensions/discord/src/send.outbound.ts index cb7ab8d3962..8d43d6b0242 100644 --- a/extensions/discord/src/send.outbound.ts +++ b/extensions/discord/src/send.outbound.ts @@ -3,14 +3,18 @@ import fs from "node:fs/promises"; import path from "node:path"; import { serializePayload, type MessagePayloadObject, type RequestClient } from "@buape/carbon"; import { ChannelType, Routes } from "discord-api-types/v10"; -import { requireRuntimeConfig, type OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; +import { + requireRuntimeConfig, + type MarkdownTableMode, + type OpenClawConfig, +} from "openclaw/plugin-sdk/config-runtime"; import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/config-runtime"; import { recordChannelActivity } from "openclaw/plugin-sdk/infra-runtime"; import { maxBytesForKind } from "openclaw/plugin-sdk/media-runtime"; import { extensionForMime } from "openclaw/plugin-sdk/media-runtime"; import { unlinkIfExists } from "openclaw/plugin-sdk/media-runtime"; import type { PollInput } from "openclaw/plugin-sdk/media-runtime"; -import { resolveChunkMode } from "openclaw/plugin-sdk/reply-chunking"; +import { resolveChunkMode, type ChunkMode } from "openclaw/plugin-sdk/reply-chunking"; import type { RetryConfig } from "openclaw/plugin-sdk/retry-runtime"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import { convertMarkdownTables, normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; @@ -60,6 +64,10 @@ type DiscordSendOpts = { rest?: RequestClient; replyTo?: string; retry?: RetryConfig; + textLimit?: number; + maxLinesPerMessage?: number; + tableMode?: MarkdownTableMode; + chunkMode?: ChunkMode; components?: DiscordSendComponents; embeds?: DiscordSendEmbeds; silent?: boolean; @@ -81,6 +89,7 @@ async function sendDiscordThreadTextChunks(params: { request: DiscordClientRequest; maxLinesPerMessage?: number; chunkMode: ReturnType; + maxChars?: number; silent?: boolean; }): Promise { for (const chunk of params.chunks) { @@ -95,6 +104,7 @@ async function sendDiscordThreadTextChunks(params: { undefined, params.chunkMode, params.silent, + params.maxChars, ); } } @@ -150,12 +160,18 @@ export async function sendMessageDiscord( channel: "discord", accountId: accountInfo.accountId, }); - const chunkMode = resolveChunkMode(cfg, "discord", accountInfo.accountId); + const effectiveTableMode = opts.tableMode ?? tableMode; + const chunkMode = opts.chunkMode ?? resolveChunkMode(cfg, "discord", accountInfo.accountId); + const maxLinesPerMessage = opts.maxLinesPerMessage ?? accountInfo.config.maxLinesPerMessage; + const textLimit = + typeof opts.textLimit === "number" && Number.isFinite(opts.textLimit) + ? Math.max(1, Math.min(Math.floor(opts.textLimit), 2000)) + : undefined; const mediaMaxBytes = typeof accountInfo.config.mediaMaxMb === "number" ? accountInfo.config.mediaMaxMb * 1024 * 1024 : DEFAULT_DISCORD_MEDIA_MAX_MB * 1024 * 1024; - const textWithTables = convertMarkdownTables(text ?? "", tableMode); + const textWithTables = convertMarkdownTables(text ?? "", effectiveTableMode); const textWithMentions = rewriteDiscordKnownMentions(textWithTables, { accountId: accountInfo.accountId, }); @@ -169,8 +185,9 @@ export async function sendMessageDiscord( if (isForumLikeType(channelType)) { const threadName = deriveForumThreadName(textWithTables); const chunks = buildDiscordTextChunks(textWithMentions, { - maxLinesPerMessage: accountInfo.config.maxLinesPerMessage, + maxLinesPerMessage, chunkMode, + maxChars: textLimit, }); const starterContent = chunks[0]?.trim() ? chunks[0] : threadName; const starterComponents = resolveDiscordSendComponents({ @@ -227,19 +244,21 @@ export async function sendMessageDiscord( mediaMaxBytes, undefined, request, - accountInfo.config.maxLinesPerMessage, + maxLinesPerMessage, undefined, undefined, chunkMode, opts.silent, + textLimit, ); await sendDiscordThreadTextChunks({ rest, threadId, chunks: afterMediaChunks, request, - maxLinesPerMessage: accountInfo.config.maxLinesPerMessage, + maxLinesPerMessage, chunkMode, + maxChars: textLimit, silent: opts.silent, }); } else { @@ -248,8 +267,9 @@ export async function sendMessageDiscord( threadId, chunks: remainingChunks, request, - maxLinesPerMessage: accountInfo.config.maxLinesPerMessage, + maxLinesPerMessage, chunkMode, + maxChars: textLimit, silent: opts.silent, }); } @@ -291,11 +311,12 @@ export async function sendMessageDiscord( mediaMaxBytes, opts.replyTo, request, - accountInfo.config.maxLinesPerMessage, + maxLinesPerMessage, opts.components, opts.embeds, chunkMode, opts.silent, + textLimit, ); } else { result = await sendDiscordText( @@ -304,11 +325,12 @@ export async function sendMessageDiscord( textWithMentions, opts.replyTo, request, - accountInfo.config.maxLinesPerMessage, + maxLinesPerMessage, opts.components, opts.embeds, chunkMode, opts.silent, + textLimit, ); } } catch (err) { diff --git a/extensions/discord/src/send.shared.ts b/extensions/discord/src/send.shared.ts index f54cabb7f45..45097ae3a06 100644 --- a/extensions/discord/src/send.shared.ts +++ b/extensions/discord/src/send.shared.ts @@ -357,13 +357,14 @@ async function sendDiscordText( embeds?: DiscordSendEmbeds, chunkMode?: ChunkMode, silent?: boolean, + maxChars?: number, ) { if (!text.trim()) { throw new Error("Message must be non-empty for Discord sends"); } const messageReference = replyTo ? { message_id: replyTo, fail_if_not_exists: false } : undefined; const flags = silent ? SUPPRESS_NOTIFICATIONS_FLAG : undefined; - const chunks = buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode }); + const chunks = buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode, maxChars }); const sendChunk = async (chunk: string, isFirst: boolean) => { const chunkComponents = resolveDiscordSendComponents({ components, @@ -418,6 +419,7 @@ async function sendDiscordMedia( embeds?: DiscordSendEmbeds, chunkMode?: ChunkMode, silent?: boolean, + maxChars?: number, ) { const media = await loadWebMedia( mediaUrl, @@ -429,7 +431,9 @@ async function sendDiscordMedia( media.fileName || (media.contentType ? `upload${extensionForMime(media.contentType) ?? ""}` : "") || "upload"; - const chunks = text ? buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode }) : []; + const chunks = text + ? buildDiscordTextChunks(text, { maxLinesPerMessage, chunkMode, maxChars }) + : []; const caption = chunks[0] ?? ""; const messageReference = replyTo ? { message_id: replyTo, fail_if_not_exists: false } : undefined; const flags = silent ? SUPPRESS_NOTIFICATIONS_FLAG : undefined; @@ -477,6 +481,7 @@ async function sendDiscordMedia( undefined, chunkMode, silent, + maxChars, ); } return res; diff --git a/src/channels/plugins/outbound.types.ts b/src/channels/plugins/outbound.types.ts index 938998cfa05..acc8425f2a8 100644 --- a/src/channels/plugins/outbound.types.ts +++ b/src/channels/plugins/outbound.types.ts @@ -1,6 +1,8 @@ import type { ReplyPayload } from "../../auto-reply/reply-payload.js"; +import type { ReplyToMode } from "../../config/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import type { OutboundDeliveryResult } from "../../infra/outbound/deliver-types.js"; +import type { OutboundDeliveryFormattingOptions } from "../../infra/outbound/formatting.js"; import type { OutboundIdentity } from "../../infra/outbound/identity-types.js"; import type { OutboundSendDeps } from "../../infra/outbound/send-deps.js"; import type { MessagePresentation, ReplyPayloadDeliveryPin } from "../../interactive/payload.js"; @@ -24,6 +26,9 @@ export type ChannelOutboundContext = { /** Send image as document to avoid Telegram compression. */ forceDocument?: boolean; replyToId?: string | null; + replyToIdSource?: "explicit" | "implicit"; + replyToMode?: ReplyToMode; + formatting?: OutboundDeliveryFormattingOptions; threadId?: string | number | null; accountId?: string | null; identity?: OutboundIdentity; @@ -63,9 +68,13 @@ export type ChannelOutboundFormattedContext = ChannelOutboundContext & { abortSignal?: AbortSignal; }; +export type ChannelOutboundChunkContext = { + formatting?: OutboundDeliveryFormattingOptions; +}; + export type ChannelOutboundAdapter = { deliveryMode: "direct" | "gateway" | "hybrid"; - chunker?: ((text: string, limit: number) => string[]) | null; + chunker?: ((text: string, limit: number, ctx?: ChannelOutboundChunkContext) => string[]) | null; chunkerMode?: "text" | "markdown"; textChunkLimit?: number; sanitizeText?: (params: { text: string; payload: ReplyPayload }) => string; @@ -91,6 +100,12 @@ export type ChannelOutboundAdapter = { payload: ReplyPayload; hint?: ChannelOutboundPayloadHint; }) => Promise | void; + afterDeliverPayload?: (params: { + cfg: OpenClawConfig; + target: ChannelOutboundTargetRef; + payload: ReplyPayload; + results: readonly OutboundDeliveryResult[]; + }) => Promise | void; presentationCapabilities?: ChannelPresentationCapabilities; deliveryCapabilities?: ChannelDeliveryCapabilities; renderPresentation?: (params: { diff --git a/src/channels/plugins/types.adapters.ts b/src/channels/plugins/types.adapters.ts index d057c538665..88421ea0eb4 100644 --- a/src/channels/plugins/types.adapters.ts +++ b/src/channels/plugins/types.adapters.ts @@ -18,6 +18,7 @@ import type { ChannelRuntimeSurface } from "./channel-runtime-surface.types.js"; import type { ConfigWriteTarget } from "./config-writes.js"; export type { ChannelOutboundAdapter, + ChannelOutboundChunkContext, ChannelOutboundContext, ChannelOutboundFormattedContext, ChannelOutboundPayloadContext, diff --git a/src/channels/plugins/types.ts b/src/channels/plugins/types.ts index 48af67955a3..2b1d49ba868 100644 --- a/src/channels/plugins/types.ts +++ b/src/channels/plugins/types.ts @@ -33,6 +33,7 @@ export type { ChannelLogoutContext, ChannelLogoutResult, ChannelOutboundAdapter, + ChannelOutboundChunkContext, ChannelOutboundContext, ChannelOutboundPayloadHint, ChannelOutboundTargetRef, diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 68ea95c4989..0110441bddc 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -448,6 +448,213 @@ describe("deliverOutboundPayloads", () => { expect(results.map((entry) => entry.messageId)).toEqual(["ab", "cd"]); }); + it("uses replyToId only on the first low-level send for single-use reply modes", async () => { + const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({ + channel: "matrix" as const, + messageId: text, + roomId: "!room", + })); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + textChunkLimit: 2, + chunker: (text, limit) => { + const chunks: string[] = []; + for (let i = 0; i < text.length; i += limit) { + chunks.push(text.slice(i, i + limit)); + } + return chunks; + }, + sendText, + }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: { channels: { matrix: { textChunkLimit: 2 } } } as OpenClawConfig, + channel: "matrix", + to: "!room", + payloads: [{ text: "abcd" }], + replyToId: "777", + replyToMode: "first", + }); + + expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual(["777", undefined]); + }); + + it("suppresses fallback replyToId when replyToMode is off but preserves explicit payload replies", async () => { + hookMocks.runner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "message_sending", + ); + const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({ + channel: "matrix" as const, + messageId: text, + roomId: "!room", + })); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText, + }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room", + payloads: [{ text: "fallback" }, { text: "explicit", replyToId: "payload-reply" }], + replyToId: "fallback-reply", + replyToMode: "off", + }); + + expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual([ + undefined, + "payload-reply", + ]); + expect( + hookMocks.runner.runMessageSending.mock.calls.map( + ([event]) => (event as { replyToId?: string }).replyToId, + ), + ).toEqual([undefined, "payload-reply"]); + }); + + it("does not let explicit payload replies consume the implicit single-use reply slot", async () => { + hookMocks.runner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "message_sending", + ); + const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({ + channel: "matrix" as const, + messageId: text, + roomId: "!room", + })); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText, + }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room", + payloads: [{ text: "explicit", replyToId: "payload-reply" }, { text: "fallback" }], + replyToId: "fallback-reply", + replyToMode: "first", + }); + + expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual([ + "payload-reply", + "fallback-reply", + ]); + expect( + hookMocks.runner.runMessageSending.mock.calls.map( + ([event]) => (event as { replyToId?: string }).replyToId, + ), + ).toEqual(["payload-reply", "fallback-reply"]); + }); + + it("skips text-only payloads blanked by message_sending hooks", async () => { + hookMocks.runner.hasHooks.mockImplementation( + (hookName?: string) => hookName === "message_sending", + ); + hookMocks.runner.runMessageSending.mockResolvedValue({ content: " " }); + const sendText = vi.fn().mockResolvedValue({ + channel: "matrix" as const, + messageId: "should-not-send", + roomId: "!room", + }); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText, + }, + }), + }, + ]), + ); + + const results = await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room", + payloads: [{ text: "redact me" }], + }); + + expect(results).toEqual([]); + expect(sendText).not.toHaveBeenCalled(); + }); + + it("runs adapter after-delivery hooks with the payload delivery results", async () => { + const afterDeliverPayload = vi.fn(); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + sendText: async ({ text }) => ({ + channel: "matrix" as const, + messageId: text, + }), + afterDeliverPayload, + }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: {}, + channel: "matrix", + to: "!room", + payloads: [{ text: "hello" }], + }); + + expect(afterDeliverPayload).toHaveBeenCalledWith( + expect.objectContaining({ + target: expect.objectContaining({ channel: "matrix", to: "!room" }), + payload: expect.objectContaining({ text: "hello" }), + results: [{ channel: "matrix", messageId: "hello" }], + }), + ); + }); + it("uses adapter-provided formatted senders and scoped media roots when available", async () => { const sendText = vi.fn(async ({ text }: { text: string }) => ({ channel: "line" as const, @@ -681,6 +888,96 @@ describe("deliverOutboundPayloads", () => { ); }); + it("lets explicit formatting options override configured chunking", async () => { + const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({ + channel: "matrix" as const, + messageId: text, + roomId: "!room", + })); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + chunker: (text, limit) => { + const chunks: string[] = []; + for (let i = 0; i < text.length; i += limit) { + chunks.push(text.slice(i, i + limit)); + } + return chunks; + }, + textChunkLimit: 4000, + sendText, + }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: { channels: { matrix: { textChunkLimit: 4000 } } } as OpenClawConfig, + channel: "matrix", + to: "!room", + payloads: [{ text: "abcd" }], + formatting: { textLimit: 2, chunkMode: "length" }, + }); + + expect(sendText.mock.calls.map((call) => call[0]?.text)).toEqual(["ab", "cd"]); + }); + + it("passes formatting options to adapter chunkers before consuming single-use replies", async () => { + const sendText = vi.fn().mockImplementation(async ({ text }: { text: string }) => ({ + channel: "matrix" as const, + messageId: text, + roomId: "!room", + })); + setActivePluginRegistry( + createTestRegistry([ + { + pluginId: "matrix", + source: "test", + plugin: createOutboundTestPlugin({ + id: "matrix", + outbound: { + deliveryMode: "direct", + chunker: (text, _limit, ctx) => + text.split("\n").reduce((chunks, line) => { + const maxLines = ctx?.formatting?.maxLinesPerMessage; + if (maxLines === 1) { + chunks.push(line); + return chunks; + } + chunks[chunks.length - 1] = chunks.length + ? `${chunks[chunks.length - 1]}\n${line}` + : line; + return chunks; + }, []), + textChunkLimit: 4000, + sendText, + }, + }), + }, + ]), + ); + + await deliverOutboundPayloads({ + cfg: { channels: { matrix: { textChunkLimit: 4000 } } } as OpenClawConfig, + channel: "matrix", + to: "!room", + payloads: [{ text: "line one\nline two" }], + replyToId: "reply-1", + replyToMode: "first", + formatting: { maxLinesPerMessage: 1 }, + }); + + expect(sendText.mock.calls.map((call) => call[0]?.text)).toEqual(["line one", "line two"]); + expect(sendText.mock.calls.map((call) => call[0]?.replyToId)).toEqual(["reply-1", undefined]); + }); + it("drops text payloads after adapter sanitization removes all content", async () => { const sendMatrix = vi.fn().mockResolvedValue({ messageId: "m1", roomId: "!room:example" }); const results = await deliverMatrixPayload({ diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 0ae718ca2de..49f7446fd17 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -14,6 +14,7 @@ import type { ChannelOutboundTargetRef, } from "../../channels/plugins/types.adapters.js"; import { resolveMirroredTranscriptText } from "../../config/sessions/transcript-mirror.js"; +import type { ReplyToMode } from "../../config/types.js"; import type { OpenClawConfig } from "../../config/types.openclaw.js"; import { fireAndForgetHook } from "../../hooks/fire-and-forget.js"; import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; @@ -42,6 +43,7 @@ import { failDelivery, withActiveDeliveryClaim, } from "./delivery-queue.js"; +import type { OutboundDeliveryFormattingOptions } from "./formatting.js"; import type { OutboundIdentity } from "./identity.js"; import type { DeliveryMirror } from "./mirror.js"; import { @@ -51,6 +53,7 @@ import { type NormalizedOutboundPayload, type OutboundPayloadPlan, } from "./payloads.js"; +import { createReplyToDeliveryPolicy } from "./reply-policy.js"; import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js"; import type { OutboundSessionContext } from "./session-context.js"; import type { OutboundChannel } from "./targets.js"; @@ -79,7 +82,11 @@ async function loadChannelBootstrapRuntime() { return await channelBootstrapRuntimePromise; } -type Chunker = (text: string, limit: number) => string[]; +type Chunker = ( + text: string, + limit: number, + ctx?: { formatting?: OutboundDeliveryFormattingOptions }, +) => string[]; type ChannelHandler = { chunker: Chunker | null; @@ -94,6 +101,11 @@ type ChannelHandler = { messageId: string; pin: ReplyPayloadDeliveryPin; }) => Promise; + afterDeliverPayload?: (params: { + target: ChannelOutboundTargetRef; + payload: ReplyPayload; + results: readonly OutboundDeliveryResult[]; + }) => Promise; buildTargetRef: (overrides?: { threadId?: string | number | null }) => ChannelOutboundTargetRef; shouldSkipPlainTextSanitization?: (payload: ReplyPayload) => boolean; resolveEffectiveTextChunkLimit?: (fallbackLimit?: number) => number | undefined; @@ -147,6 +159,8 @@ type ChannelHandlerParams = { to: string; accountId?: string; replyToId?: string | null; + replyToMode?: ReplyToMode; + formatting?: OutboundDeliveryFormattingOptions; threadId?: string | number | null; identity?: OutboundIdentity; deps?: OutboundSendDeps; @@ -193,8 +207,8 @@ function createPluginHandler( audioAsVoice?: boolean; }): Omit => ({ ...baseCtx, - replyToId: overrides?.replyToId ?? baseCtx.replyToId, - threadId: overrides?.threadId ?? baseCtx.threadId, + replyToId: overrides && "replyToId" in overrides ? overrides.replyToId : baseCtx.replyToId, + threadId: overrides && "threadId" in overrides ? overrides.threadId : baseCtx.threadId, audioAsVoice: overrides?.audioAsVoice, }); const buildTargetRef = (overrides?: { @@ -244,6 +258,15 @@ function createPluginHandler( pin, }) : undefined, + afterDeliverPayload: outbound.afterDeliverPayload + ? async ({ target, payload, results }) => + outbound.afterDeliverPayload!({ + cfg: params.cfg, + target, + payload, + results, + }) + : undefined, shouldSkipPlainTextSanitization: outbound.shouldSkipPlainTextSanitization ? (payload) => outbound.shouldSkipPlainTextSanitization!({ payload }) : undefined, @@ -309,6 +332,8 @@ function createChannelOutboundContextBase( to: params.to, accountId: params.accountId, replyToId: params.replyToId, + replyToMode: params.replyToMode, + formatting: params.formatting, threadId: params.threadId, identity: params.identity, gifPlayback: params.gifPlayback, @@ -331,9 +356,12 @@ type DeliverOutboundPayloadsCoreParams = { accountId?: string; payloads: ReplyPayload[]; replyToId?: string | null; + replyToMode?: ReplyToMode; + formatting?: OutboundDeliveryFormattingOptions; threadId?: string | number | null; identity?: OutboundIdentity; deps?: OutboundSendDeps; + mediaAccess?: OutboundMediaAccess; gifPlayback?: boolean; forceDocument?: boolean; abortSignal?: AbortSignal; @@ -481,6 +509,30 @@ async function maybePinDeliveredMessage(params: { } } +async function maybeNotifyAfterDeliveredPayload(params: { + handler: ChannelHandler; + payload: ReplyPayload; + target: ChannelOutboundTargetRef; + results: readonly OutboundDeliveryResult[]; +}): Promise { + if (!params.handler.afterDeliverPayload || params.results.length === 0) { + return; + } + try { + await params.handler.afterDeliverPayload({ + target: params.target, + payload: params.payload, + results: params.results, + }); + } catch (err) { + log.warn("Plugin outbound adapter after-delivery hook failed.", { + channel: params.target.channel, + to: params.target.to, + error: formatErrorMessage(err), + }); + } +} + async function renderPresentationForDelivery( handler: ChannelHandler, payload: ReplyPayload, @@ -591,7 +643,7 @@ async function applyMessageSendingHook(params: { { to: params.to, content: params.payloadSummary.text, - replyToId: params.payload.replyToId ?? params.replyToId ?? undefined, + replyToId: params.replyToId ?? undefined, threadId: params.threadId ?? undefined, metadata: { channel: params.channel, @@ -656,6 +708,8 @@ export async function deliverOutboundPayloads( payloads, threadId: params.threadId, replyToId: params.replyToId, + replyToMode: params.replyToMode, + formatting: params.formatting, bestEffort: params.bestEffort, gifPlayback: params.gifPlayback, forceDocument: params.forceDocument, @@ -742,6 +796,7 @@ async function deliverOutboundPayloadsCore( cfg, agentId: params.session?.agentId ?? params.mirror?.agentId, mediaSources, + mediaAccess: params.mediaAccess, sessionKey: params.session?.key, messageProvider: params.session?.key ? undefined : channel, accountId: params.session?.requesterAccountId ?? accountId, @@ -750,7 +805,7 @@ async function deliverOutboundPayloadsCore( requesterSenderUsername: params.session?.requesterSenderUsername, requesterSenderE164: params.session?.requesterSenderE164, }) - : {}; + : (params.mediaAccess ?? {}); const results: OutboundDeliveryResult[] = []; const handler = await createChannelHandler({ cfg, @@ -759,6 +814,8 @@ async function deliverOutboundPayloadsCore( deps, accountId, replyToId: params.replyToId, + replyToMode: params.replyToMode, + formatting: params.formatting, threadId: params.threadId, identity: params.identity, gifPlayback: params.gifPlayback, @@ -772,22 +829,39 @@ async function deliverOutboundPayloadsCore( fallbackLimit: handler.textChunkLimit, }) : undefined; - const textLimit = handler.resolveEffectiveTextChunkLimit - ? handler.resolveEffectiveTextChunkLimit(configuredTextLimit) - : configuredTextLimit; - const chunkMode = handler.chunker ? resolveChunkMode(cfg, channel, accountId) : "length"; + const textLimit = + params.formatting?.textLimit ?? + (handler.resolveEffectiveTextChunkLimit + ? handler.resolveEffectiveTextChunkLimit(configuredTextLimit) + : configuredTextLimit); + const chunkMode = handler.chunker + ? (params.formatting?.chunkMode ?? resolveChunkMode(cfg, channel, accountId)) + : "length"; + const { resolveCurrentReplyTo, applyReplyToConsumption } = createReplyToDeliveryPolicy({ + replyToId: params.replyToId, + replyToMode: params.replyToMode, + }); + const chunkTextForDelivery = (text: string, limit: number): string[] => + params.formatting + ? handler.chunker!(text, limit, { formatting: params.formatting }) + : handler.chunker!(text, limit); const sendTextChunks = async ( text: string, overrides?: { replyToId?: string | null; + replyToIdSource?: "explicit" | "implicit"; threadId?: string | number | null; audioAsVoice?: boolean; }, ) => { + const consumeReplyTo = >(value: T): T => + applyReplyToConsumption(value, { + consumeImplicitReply: value.replyToIdSource === "implicit", + }); throwIfAborted(abortSignal); if (!handler.chunker || textLimit === undefined) { - results.push(await handler.sendText(text, overrides)); + results.push(await handler.sendText(text, consumeReplyTo(overrides ?? {}))); return; } if (chunkMode === "newline") { @@ -801,21 +875,21 @@ async function deliverOutboundPayloadsCore( blockChunks.push(text); } for (const blockChunk of blockChunks) { - const chunks = handler.chunker(blockChunk, textLimit); + const chunks = chunkTextForDelivery(blockChunk, textLimit); if (!chunks.length && blockChunk) { chunks.push(blockChunk); } for (const chunk of chunks) { throwIfAborted(abortSignal); - results.push(await handler.sendText(chunk, overrides)); + results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {}))); } } return; } - const chunks = handler.chunker(text, textLimit); + const chunks = chunkTextForDelivery(text, textLimit); for (const chunk of chunks) { throwIfAborted(abortSignal); - results.push(await handler.sendText(chunk, overrides)); + results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {}))); } }; const normalizedPayloads = normalizePayloadsForChannelDelivery(outboundPayloadPlan, handler); @@ -857,32 +931,51 @@ async function deliverOutboundPayloadsCore( to, channel, accountId, - replyToId: params.replyToId, + replyToId: resolveCurrentReplyTo(payload).replyToId, threadId: params.threadId, }); if (hookResult.cancelled) { continue; } - const effectivePayload = await renderPresentationForDelivery(handler, hookResult.payload); + const renderedPayload = await renderPresentationForDelivery(handler, hookResult.payload); + const normalizedEffectivePayload = handler.normalizePayload + ? handler.normalizePayload(renderedPayload) + : renderedPayload; + const effectivePayload = normalizedEffectivePayload + ? normalizeEmptyPayloadForDelivery(normalizedEffectivePayload) + : null; + if (!effectivePayload) { + continue; + } payloadSummary = buildPayloadSummary(effectivePayload); params.onPayload?.(payloadSummary); + const replyToResolution = resolveCurrentReplyTo(effectivePayload); const sendOverrides = { - replyToId: effectivePayload.replyToId ?? params.replyToId ?? undefined, + replyToId: replyToResolution.replyToId, + replyToIdSource: replyToResolution.source, threadId: params.threadId ?? undefined, audioAsVoice: effectivePayload.audioAsVoice === true ? true : undefined, forceDocument: params.forceDocument, }; + const applySendReplyToConsumption = (overrides: T): T => + applyReplyToConsumption(overrides, { + consumeImplicitReply: replyToResolution.source === "implicit", + }); const deliveryTarget = handler.buildTargetRef({ threadId: sendOverrides.threadId }); if ( handler.sendPayload && - hasReplyPayloadContent({ + (hasReplyPayloadContent({ presentation: effectivePayload.presentation, interactive: effectivePayload.interactive, channelData: effectivePayload.channelData, - }) + }) || + effectivePayload.audioAsVoice === true) ) { - const delivery = await handler.sendPayload(effectivePayload, sendOverrides); + const delivery = await handler.sendPayload( + effectivePayload, + applySendReplyToConsumption(sendOverrides), + ); results.push(delivery); await maybePinDeliveredMessage({ handler, @@ -890,6 +983,12 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, messageId: delivery.messageId, }); + await maybeNotifyAfterDeliveredPayload({ + handler, + payload: effectivePayload, + target: deliveryTarget, + results: [delivery], + }); emitMessageSent({ success: true, content: payloadSummary.text, @@ -900,7 +999,12 @@ async function deliverOutboundPayloadsCore( if (payloadSummary.mediaUrls.length === 0) { const beforeCount = results.length; if (handler.sendFormattedText) { - results.push(...(await handler.sendFormattedText(payloadSummary.text, sendOverrides))); + results.push( + ...(await handler.sendFormattedText( + payloadSummary.text, + applySendReplyToConsumption(sendOverrides), + )), + ); } else { await sendTextChunks(payloadSummary.text, sendOverrides); } @@ -913,6 +1017,12 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, messageId: pinMessageId, }); + await maybeNotifyAfterDeliveredPayload({ + handler, + payload: effectivePayload, + target: deliveryTarget, + results: deliveredResults, + }); emitMessageSent({ success: results.length > beforeCount, content: payloadSummary.text, @@ -947,6 +1057,12 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, messageId: pinMessageId, }); + await maybeNotifyAfterDeliveredPayload({ + handler, + payload: effectivePayload, + target: deliveryTarget, + results: deliveredResults, + }); emitMessageSent({ success: results.length > beforeCount, content: payloadSummary.text, @@ -957,6 +1073,7 @@ async function deliverOutboundPayloadsCore( let firstMessageId: string | undefined; let lastMessageId: string | undefined; + const beforeCount = results.length; await sendMediaWithLeadingCaption({ mediaUrls: payloadSummary.mediaUrls, caption: payloadSummary.text, @@ -966,14 +1083,18 @@ async function deliverOutboundPayloadsCore( const delivery = await handler.sendFormattedMedia( caption ?? "", mediaUrl, - sendOverrides, + applySendReplyToConsumption(sendOverrides), ); results.push(delivery); firstMessageId ??= delivery.messageId; lastMessageId = delivery.messageId; return; } - const delivery = await handler.sendMedia(caption ?? "", mediaUrl, sendOverrides); + const delivery = await handler.sendMedia( + caption ?? "", + mediaUrl, + applySendReplyToConsumption(sendOverrides), + ); results.push(delivery); firstMessageId ??= delivery.messageId; lastMessageId = delivery.messageId; @@ -985,6 +1106,12 @@ async function deliverOutboundPayloadsCore( target: deliveryTarget, messageId: firstMessageId, }); + await maybeNotifyAfterDeliveredPayload({ + handler, + payload: effectivePayload, + target: deliveryTarget, + results: results.slice(beforeCount), + }); emitMessageSent({ success: true, content: payloadSummary.text, diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts index 4bfd20cd4fc..1c99408ab03 100644 --- a/src/infra/outbound/delivery-queue-recovery.ts +++ b/src/infra/outbound/delivery-queue-recovery.ts @@ -118,6 +118,8 @@ function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) payloads: entry.payloads, threadId: entry.threadId, replyToId: entry.replyToId, + replyToMode: entry.replyToMode, + formatting: entry.formatting, bestEffort: entry.bestEffort, gifPlayback: entry.gifPlayback, forceDocument: entry.forceDocument, diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts index 83d23b173be..1e1a9a68021 100644 --- a/src/infra/outbound/delivery-queue-storage.ts +++ b/src/infra/outbound/delivery-queue-storage.ts @@ -2,7 +2,9 @@ import fs from "node:fs"; import path from "node:path"; import type { ReplyPayload } from "../../auto-reply/types.js"; import { resolveStateDir } from "../../config/paths.js"; +import type { ReplyToMode } from "../../config/types.js"; import { generateSecureUuid } from "../secure-random.js"; +import type { OutboundDeliveryFormattingOptions } from "./formatting.js"; import type { OutboundMirror } from "./mirror.js"; import type { OutboundSessionContext } from "./session-context.js"; import type { OutboundChannel } from "./targets.js"; @@ -22,6 +24,8 @@ export type QueuedDeliveryPayload = { payloads: ReplyPayload[]; threadId?: string | number | null; replyToId?: string | null; + replyToMode?: ReplyToMode; + formatting?: OutboundDeliveryFormattingOptions; bestEffort?: boolean; gifPlayback?: boolean; forceDocument?: boolean; @@ -142,6 +146,8 @@ export async function enqueueDelivery( payloads: params.payloads, threadId: params.threadId, replyToId: params.replyToId, + replyToMode: params.replyToMode, + formatting: params.formatting, bestEffort: params.bestEffort, gifPlayback: params.gifPlayback, forceDocument: params.forceDocument, diff --git a/src/infra/outbound/delivery-queue.recovery.test.ts b/src/infra/outbound/delivery-queue.recovery.test.ts index 592935cb80c..21e15736e40 100644 --- a/src/infra/outbound/delivery-queue.recovery.test.ts +++ b/src/infra/outbound/delivery-queue.recovery.test.ts @@ -156,6 +156,14 @@ describe("delivery-queue recovery", () => { channel: "demo-channel-a", to: "+1", payloads: [{ text: "a" }], + replyToId: "root-message", + replyToMode: "first", + formatting: { + textLimit: 1234, + maxLinesPerMessage: 7, + tableMode: "off", + chunkMode: "newline", + }, bestEffort: true, gifPlayback: true, silent: true, @@ -186,6 +194,14 @@ describe("delivery-queue recovery", () => { bestEffort: true, gifPlayback: true, silent: true, + replyToId: "root-message", + replyToMode: "first", + formatting: { + textLimit: 1234, + maxLinesPerMessage: 7, + tableMode: "off", + chunkMode: "newline", + }, gatewayClientScopes: ["operator.write"], mirror: { sessionKey: "agent:main:main", diff --git a/src/infra/outbound/formatting.ts b/src/infra/outbound/formatting.ts new file mode 100644 index 00000000000..6f035e40336 --- /dev/null +++ b/src/infra/outbound/formatting.ts @@ -0,0 +1,9 @@ +import type { ChunkMode } from "../../auto-reply/chunk.js"; +import type { MarkdownTableMode } from "../../config/types.js"; + +export type OutboundDeliveryFormattingOptions = { + textLimit?: number; + maxLinesPerMessage?: number; + tableMode?: MarkdownTableMode; + chunkMode?: ChunkMode; +}; diff --git a/src/infra/outbound/reply-policy.ts b/src/infra/outbound/reply-policy.ts new file mode 100644 index 00000000000..f8a53bb727a --- /dev/null +++ b/src/infra/outbound/reply-policy.ts @@ -0,0 +1,57 @@ +import { isSingleUseReplyToMode } from "../../auto-reply/reply/reply-reference.js"; +import type { ReplyPayload } from "../../auto-reply/types.js"; +import type { ReplyToMode } from "../../config/types.js"; + +export type ReplyToOverride = { + replyToId?: string | null; + replyToIdSource?: ReplyToResolution["source"]; +}; + +export type ReplyToResolution = { + replyToId?: string; + source?: "explicit" | "implicit"; +}; + +export function createReplyToDeliveryPolicy(params: { + replyToId?: string | null; + replyToMode?: ReplyToMode; +}): { + resolveCurrentReplyTo: (payload: ReplyPayload) => ReplyToResolution; + applyReplyToConsumption: ( + overrides: T, + options?: { consumeImplicitReply?: boolean }, + ) => T; +} { + const singleUseReplyTo = params.replyToMode ? isSingleUseReplyToMode(params.replyToMode) : false; + let replyToConsumed = false; + + const resolveCurrentReplyTo = (payload: ReplyPayload): ReplyToResolution => { + if (payload.replyToId != null) { + return payload.replyToId ? { replyToId: payload.replyToId, source: "explicit" } : {}; + } + const replyToId = (params.replyToMode === "off" ? undefined : params.replyToId) ?? undefined; + if (!replyToId) { + return {}; + } + if (!singleUseReplyTo) { + return { replyToId, source: "implicit" }; + } + return replyToConsumed ? {} : { replyToId, source: "implicit" }; + }; + + const applyReplyToConsumption = ( + overrides: T, + options?: { consumeImplicitReply?: boolean }, + ): T => { + if (!options?.consumeImplicitReply || !overrides.replyToId || !singleUseReplyTo) { + return overrides; + } + if (replyToConsumed) { + return { ...overrides, replyToId: undefined }; + } + replyToConsumed = true; + return overrides; + }; + + return { resolveCurrentReplyTo, applyReplyToConsumption }; +} diff --git a/src/plugin-sdk/outbound-runtime.ts b/src/plugin-sdk/outbound-runtime.ts index 7b1aec40769..4b2913aa200 100644 --- a/src/plugin-sdk/outbound-runtime.ts +++ b/src/plugin-sdk/outbound-runtime.ts @@ -1,7 +1,17 @@ export { createRuntimeOutboundDelegates } from "../channels/plugins/runtime-forwarders.js"; export { resolveOutboundSendDep, type OutboundSendDeps } from "../infra/outbound/send-deps.js"; export { resolveAgentOutboundIdentity, type OutboundIdentity } from "../infra/outbound/identity.js"; +export type { OutboundDeliveryFormattingOptions } from "../infra/outbound/formatting.js"; +export { + deliverOutboundPayloads, + type DeliverOutboundPayloadsParams, + type OutboundDeliveryResult, +} from "../infra/outbound/deliver.js"; export { sanitizeForPlainText } from "../infra/outbound/sanitize-text.js"; +export { + buildOutboundSessionContext, + type OutboundSessionContext, +} from "../infra/outbound/session-context.js"; export { createOutboundPayloadPlan, projectOutboundPayloadPlanForDelivery, diff --git a/src/plugin-sdk/reply-payload.test.ts b/src/plugin-sdk/reply-payload.test.ts index 66338a5f839..ed8a469a982 100644 --- a/src/plugin-sdk/reply-payload.test.ts +++ b/src/plugin-sdk/reply-payload.test.ts @@ -13,6 +13,7 @@ import { resolveOutboundMediaUrls, resolveSendableOutboundReplyParts, resolveTextChunksWithFallback, + sendTextMediaPayload, sendMediaWithLeadingCaption, sendPayloadWithChunkedTextAndMedia, } from "./reply-payload.js"; @@ -89,6 +90,83 @@ describe("sendPayloadWithChunkedTextAndMedia", () => { }); }); +describe("sendTextMediaPayload", () => { + it("uses an implicit single-use reply only for the first text chunk", async () => { + const sendText = vi.fn(async ({ text }) => ({ channel: "test", messageId: text })); + + await sendTextMediaPayload({ + channel: "test", + ctx: { + cfg: {}, + to: "target", + text: "", + payload: { text: "abcdef" }, + replyToId: "reply-1", + replyToIdSource: "implicit", + replyToMode: "first", + }, + adapter: { + textChunkLimit: 2, + chunker: (text) => ["ab", "cd", text.slice(4)], + sendText, + }, + }); + + expect(sendText.mock.calls.map((call) => call[0].replyToId)).toEqual([ + "reply-1", + undefined, + undefined, + ]); + }); + + it("uses an implicit single-use reply only for the first media fallback send", async () => { + const sendMedia = vi.fn(async ({ mediaUrl }) => ({ channel: "test", messageId: mediaUrl })); + + await sendTextMediaPayload({ + channel: "test", + ctx: { + cfg: {}, + to: "target", + text: "", + payload: { text: "caption", mediaUrls: ["https://example.com/1", "https://example.com/2"] }, + replyToId: "reply-1", + replyToIdSource: "implicit", + replyToMode: "batched", + }, + adapter: { sendMedia }, + }); + + expect(sendMedia.mock.calls.map((call) => call[0].replyToId)).toEqual(["reply-1", undefined]); + }); + + it("keeps explicit reply tags independent from single-use implicit reply modes", async () => { + const sendText = vi.fn(async ({ text }) => ({ channel: "test", messageId: text })); + + await sendTextMediaPayload({ + channel: "test", + ctx: { + cfg: {}, + to: "target", + text: "", + payload: { text: "abcd" }, + replyToId: "explicit-reply", + replyToIdSource: "explicit", + replyToMode: "first", + }, + adapter: { + textChunkLimit: 2, + chunker: () => ["ab", "cd"], + sendText, + }, + }); + + expect(sendText.mock.calls.map((call) => call[0].replyToId)).toEqual([ + "explicit-reply", + "explicit-reply", + ]); + }); +}); + describe("normalizeOutboundReplyPayload", () => { it("strips internal-only local media trust flags from loose payload objects", () => { expect( diff --git a/src/plugin-sdk/reply-payload.ts b/src/plugin-sdk/reply-payload.ts index 231e34f8f5d..fc93dcbb1bd 100644 --- a/src/plugin-sdk/reply-payload.ts +++ b/src/plugin-sdk/reply-payload.ts @@ -1,4 +1,5 @@ import type { ReplyPayload as InternalReplyPayload } from "../auto-reply/reply-payload.js"; +import { isSingleUseReplyToMode } from "../auto-reply/reply/reply-reference.js"; import type { ChannelOutboundAdapter } from "../channels/plugins/outbound.types.js"; import { normalizeLowercaseStringOrEmpty, readStringValue } from "../shared/string-coerce.js"; @@ -38,6 +39,26 @@ type SendPayloadAdapter = Pick< const REASONING_PREFIX = "reasoning:"; +function createSendPayloadReplyToFanout(ctx: SendPayloadContext): () => string | undefined { + const replyToId = ctx.replyToId ?? undefined; + if (!replyToId) { + return () => undefined; + } + const singleUse = + ctx.replyToIdSource !== "explicit" && + ctx.replyToMode !== undefined && + isSingleUseReplyToMode(ctx.replyToMode); + if (!singleUse) { + return () => replyToId; + } + let current: string | undefined = replyToId; + return () => { + const value = current; + current = undefined; + return value; + }; +} + function trimLeadingMarkdownQuoteMarkers(text: string): string { let candidate = text.trimStart(); while (candidate.startsWith(">")) { @@ -289,6 +310,7 @@ export async function sendTextMediaPayload(params: { if (!text && urls.length === 0) { return { channel: params.channel, messageId: "" }; } + const nextReplyToId = createSendPayloadReplyToFanout(params.ctx); if (urls.length > 0) { const lastResult = await sendPayloadMediaSequence({ text, @@ -298,15 +320,23 @@ export async function sendTextMediaPayload(params: { ...params.ctx, text, mediaUrl, + replyToId: nextReplyToId(), }), }); return lastResult ?? { channel: params.channel, messageId: "" }; } const limit = params.adapter.textChunkLimit; - const chunks = limit && params.adapter.chunker ? params.adapter.chunker(text, limit) : [text]; + const chunks = + limit && params.adapter.chunker + ? params.adapter.chunker(text, limit, { formatting: params.ctx.formatting }) + : [text]; let lastResult: Awaited>>; for (const chunk of chunks) { - lastResult = await params.adapter.sendText!({ ...params.ctx, text: chunk }); + lastResult = await params.adapter.sendText!({ + ...params.ctx, + text: chunk, + replyToId: nextReplyToId(), + }); } return lastResult!; }