diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 49f7446fd17..7f1aeb8323e 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -1,10 +1,4 @@ -import { sendMediaWithLeadingCaption } from "openclaw/plugin-sdk/reply-payload"; -import { - chunkByParagraph, - chunkMarkdownTextWithMode, - resolveChunkMode, - resolveTextChunkLimit, -} from "../../auto-reply/chunk.js"; +import { resolveChunkMode, resolveTextChunkLimit } from "../../auto-reply/chunk.js"; import type { ReplyPayload } from "../../auto-reply/types.js"; import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js"; import type { @@ -45,6 +39,11 @@ import { } from "./delivery-queue.js"; import type { OutboundDeliveryFormattingOptions } from "./formatting.js"; import type { OutboundIdentity } from "./identity.js"; +import { + planOutboundMediaMessageUnits, + planOutboundTextMessageUnits, + type OutboundMessageSendOverrides, +} from "./message-plan.js"; import type { DeliveryMirror } from "./mirror.js"; import { createOutboundPayloadPlan, @@ -82,14 +81,8 @@ async function loadChannelBootstrapRuntime() { return await channelBootstrapRuntimePromise; } -type Chunker = ( - text: string, - limit: number, - ctx?: { formatting?: OutboundDeliveryFormattingOptions }, -) => string[]; - type ChannelHandler = { - chunker: Chunker | null; + chunker: ChannelOutboundAdapter["chunker"] | null; chunkerMode?: "text" | "markdown"; textChunkLimit?: number; supportsMedia: boolean; @@ -111,45 +104,25 @@ type ChannelHandler = { resolveEffectiveTextChunkLimit?: (fallbackLimit?: number) => number | undefined; sendPayload?: ( payload: ReplyPayload, - overrides?: { - replyToId?: string | null; - threadId?: string | number | null; - audioAsVoice?: boolean; - }, + overrides?: OutboundMessageSendOverrides, ) => Promise; sendFormattedText?: ( text: string, - overrides?: { - replyToId?: string | null; - threadId?: string | number | null; - audioAsVoice?: boolean; - }, + overrides?: OutboundMessageSendOverrides, ) => Promise; sendFormattedMedia?: ( caption: string, mediaUrl: string, - overrides?: { - replyToId?: string | null; - threadId?: string | number | null; - audioAsVoice?: boolean; - }, + overrides?: OutboundMessageSendOverrides, ) => Promise; sendText: ( text: string, - overrides?: { - replyToId?: string | null; - threadId?: string | number | null; - audioAsVoice?: boolean; - }, + overrides?: OutboundMessageSendOverrides, ) => Promise; sendMedia: ( caption: string, mediaUrl: string, - overrides?: { - replyToId?: string | null; - threadId?: string | number | null; - audioAsVoice?: boolean; - }, + overrides?: OutboundMessageSendOverrides, ) => Promise; }; @@ -203,11 +176,16 @@ function createPluginHandler( const chunkerMode = outbound.chunkerMode; const resolveCtx = (overrides?: { replyToId?: string | null; + replyToIdSource?: "explicit" | "implicit"; threadId?: string | number | null; audioAsVoice?: boolean; }): Omit => ({ ...baseCtx, replyToId: overrides && "replyToId" in overrides ? overrides.replyToId : baseCtx.replyToId, + replyToIdSource: + overrides && "replyToIdSource" in overrides + ? overrides.replyToIdSource + : baseCtx.replyToIdSource, threadId: overrides && "threadId" in overrides ? overrides.threadId : baseCtx.threadId, audioAsVoice: overrides?.audioAsVoice, }); @@ -841,55 +819,27 @@ async function deliverOutboundPayloadsCore( replyToId: params.replyToId, replyToMode: params.replyToMode, }); - const chunkTextForDelivery = (text: string, limit: number): string[] => - params.formatting - ? handler.chunker!(text, limit, { formatting: params.formatting }) - : handler.chunker!(text, limit); - const sendTextChunks = async ( - text: string, - overrides?: { - replyToId?: string | null; - replyToIdSource?: "explicit" | "implicit"; - threadId?: string | number | null; - audioAsVoice?: boolean; - }, - ) => { - const consumeReplyTo = >(value: T): T => - applyReplyToConsumption(value, { - consumeImplicitReply: value.replyToIdSource === "implicit", - }); - throwIfAborted(abortSignal); - if (!handler.chunker || textLimit === undefined) { - results.push(await handler.sendText(text, consumeReplyTo(overrides ?? {}))); - return; - } - if (chunkMode === "newline") { - const mode = handler.chunkerMode ?? "text"; - const blockChunks = - mode === "markdown" - ? chunkMarkdownTextWithMode(text, textLimit, "newline") - : chunkByParagraph(text, textLimit); - - if (!blockChunks.length && text) { - blockChunks.push(text); + const sendTextChunks = async (text: string, overrides: OutboundMessageSendOverrides = {}) => { + const units = planOutboundTextMessageUnits({ + text, + overrides, + chunker: handler.chunker, + chunkerMode: handler.chunkerMode, + textLimit, + chunkMode, + formatting: params.formatting, + consumeReplyTo: (value) => + applyReplyToConsumption(value, { + consumeImplicitReply: value.replyToIdSource === "implicit", + }), + }); + for (const unit of units) { + if (unit.kind !== "text") { + continue; } - for (const blockChunk of blockChunks) { - const chunks = chunkTextForDelivery(blockChunk, textLimit); - if (!chunks.length && blockChunk) { - chunks.push(blockChunk); - } - for (const chunk of chunks) { - throwIfAborted(abortSignal); - results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {}))); - } - } - return; - } - const chunks = chunkTextForDelivery(text, textLimit); - for (const chunk of chunks) { throwIfAborted(abortSignal); - results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {}))); + results.push(await handler.sendText(unit.text, unit.overrides)); } }; const normalizedPayloads = normalizePayloadsForChannelDelivery(outboundPayloadPlan, handler); @@ -951,14 +901,16 @@ async function deliverOutboundPayloadsCore( params.onPayload?.(payloadSummary); const replyToResolution = resolveCurrentReplyTo(effectivePayload); - const sendOverrides = { + const sendOverrides: OutboundMessageSendOverrides = { replyToId: replyToResolution.replyToId, replyToIdSource: replyToResolution.source, - threadId: params.threadId ?? undefined, - audioAsVoice: effectivePayload.audioAsVoice === true ? true : undefined, - forceDocument: params.forceDocument, + ...(params.threadId !== undefined ? { threadId: params.threadId } : {}), + ...(effectivePayload.audioAsVoice === true ? { audioAsVoice: true } : {}), + ...(params.forceDocument !== undefined ? { forceDocument: params.forceDocument } : {}), }; - const applySendReplyToConsumption = (overrides: T): T => + const applySendReplyToConsumption = ( + overrides: T, + ): T => applyReplyToConsumption(overrides, { consumeImplicitReply: replyToResolution.source === "implicit", }); @@ -1074,32 +1026,24 @@ async function deliverOutboundPayloadsCore( let firstMessageId: string | undefined; let lastMessageId: string | undefined; const beforeCount = results.length; - await sendMediaWithLeadingCaption({ + const mediaUnits = planOutboundMediaMessageUnits({ mediaUrls: payloadSummary.mediaUrls, caption: payloadSummary.text, - send: async ({ mediaUrl, caption }) => { - throwIfAborted(abortSignal); - if (handler.sendFormattedMedia) { - const delivery = await handler.sendFormattedMedia( - caption ?? "", - mediaUrl, - applySendReplyToConsumption(sendOverrides), - ); - results.push(delivery); - firstMessageId ??= delivery.messageId; - lastMessageId = delivery.messageId; - return; - } - const delivery = await handler.sendMedia( - caption ?? "", - mediaUrl, - applySendReplyToConsumption(sendOverrides), - ); - results.push(delivery); - firstMessageId ??= delivery.messageId; - lastMessageId = delivery.messageId; - }, + overrides: sendOverrides, + consumeReplyTo: applySendReplyToConsumption, }); + for (const unit of mediaUnits) { + if (unit.kind !== "media") { + continue; + } + throwIfAborted(abortSignal); + const delivery = handler.sendFormattedMedia + ? await handler.sendFormattedMedia(unit.caption ?? "", unit.mediaUrl, unit.overrides) + : await handler.sendMedia(unit.caption ?? "", unit.mediaUrl, unit.overrides); + results.push(delivery); + firstMessageId ??= delivery.messageId; + lastMessageId = delivery.messageId; + } await maybePinDeliveredMessage({ handler, payload: effectivePayload, diff --git a/src/infra/outbound/message-plan.test.ts b/src/infra/outbound/message-plan.test.ts new file mode 100644 index 00000000000..19a001ad578 --- /dev/null +++ b/src/infra/outbound/message-plan.test.ts @@ -0,0 +1,88 @@ +import { describe, expect, it } from "vitest"; +import { planOutboundMediaMessageUnits, planOutboundTextMessageUnits } from "./message-plan.js"; +import { createReplyToDeliveryPolicy } from "./reply-policy.js"; + +describe("outbound message planning", () => { + it("plans text chunks with one implicit reply in single-use modes", () => { + const policy = createReplyToDeliveryPolicy({ + replyToId: "reply-1", + replyToMode: "first", + }); + const reply = policy.resolveCurrentReplyTo({}); + const units = planOutboundTextMessageUnits({ + text: "abcd", + textLimit: 2, + chunker: (text, limit) => [text.slice(0, limit), text.slice(limit)], + overrides: { replyToId: reply.replyToId, replyToIdSource: reply.source }, + consumeReplyTo: (overrides) => + policy.applyReplyToConsumption(overrides, { + consumeImplicitReply: overrides.replyToIdSource === "implicit", + }), + }); + + expect( + units.map((unit) => + unit.kind === "text" ? [unit.kind, unit.text, unit.overrides.replyToId] : [unit.kind], + ), + ).toEqual([ + ["text", "ab", "reply-1"], + ["text", "cd", undefined], + ]); + }); + + it("keeps explicit text replies from consuming the implicit slot", () => { + const policy = createReplyToDeliveryPolicy({ + replyToId: "implicit-reply", + replyToMode: "first", + }); + const explicit = policy.resolveCurrentReplyTo({ replyToId: "explicit-reply" }); + const firstUnits = planOutboundTextMessageUnits({ + text: "explicit", + overrides: { replyToId: explicit.replyToId, replyToIdSource: explicit.source }, + consumeReplyTo: (overrides) => + policy.applyReplyToConsumption(overrides, { + consumeImplicitReply: overrides.replyToIdSource === "implicit", + }), + }); + const implicit = policy.resolveCurrentReplyTo({}); + const secondUnits = planOutboundTextMessageUnits({ + text: "implicit", + overrides: { replyToId: implicit.replyToId, replyToIdSource: implicit.source }, + consumeReplyTo: (overrides) => + policy.applyReplyToConsumption(overrides, { + consumeImplicitReply: overrides.replyToIdSource === "implicit", + }), + }); + + expect(firstUnits[0]?.overrides.replyToId).toBe("explicit-reply"); + expect(secondUnits[0]?.overrides.replyToId).toBe("implicit-reply"); + }); + + it("plans media sends with one implicit reply and a leading caption", () => { + const policy = createReplyToDeliveryPolicy({ + replyToId: "reply-1", + replyToMode: "batched", + }); + const reply = policy.resolveCurrentReplyTo({}); + const units = planOutboundMediaMessageUnits({ + caption: "caption", + mediaUrls: ["https://example.com/1.png", "https://example.com/2.png"], + overrides: { replyToId: reply.replyToId, replyToIdSource: reply.source }, + consumeReplyTo: (overrides) => + policy.applyReplyToConsumption(overrides, { + consumeImplicitReply: overrides.replyToIdSource === "implicit", + }), + }); + + expect( + units.map((unit) => + unit.kind === "media" + ? [unit.kind, unit.caption, unit.mediaUrl, unit.overrides.replyToId] + : [unit.kind], + ), + ).toEqual([ + ["media", "caption", "https://example.com/1.png", "reply-1"], + ["media", undefined, "https://example.com/2.png", undefined], + ]); + }); +}); diff --git a/src/infra/outbound/message-plan.ts b/src/infra/outbound/message-plan.ts new file mode 100644 index 00000000000..1f95747a5aa --- /dev/null +++ b/src/infra/outbound/message-plan.ts @@ -0,0 +1,122 @@ +import { + chunkByParagraph, + chunkMarkdownTextWithMode, + type ChunkMode, +} from "../../auto-reply/chunk.js"; +import type { OutboundDeliveryFormattingOptions } from "./formatting.js"; +import type { ReplyToOverride } from "./reply-policy.js"; + +export type OutboundMessageSendOverrides = ReplyToOverride & { + threadId?: string | number | null; + audioAsVoice?: boolean; + forceDocument?: boolean; +}; + +export type OutboundMessageUnit = + | { + kind: "text"; + text: string; + overrides: OutboundMessageSendOverrides; + } + | { + kind: "media"; + caption?: string; + mediaUrl: string; + overrides: OutboundMessageSendOverrides; + }; + +export type OutboundMessageChunker = ( + text: string, + limit: number, + ctx?: { formatting?: OutboundDeliveryFormattingOptions }, +) => string[]; + +type PlanReplyToConsumption = (overrides: T) => T; + +function withPlannedReplyTo( + overrides: OutboundMessageSendOverrides, + consumeReplyTo?: PlanReplyToConsumption, +): OutboundMessageSendOverrides { + return consumeReplyTo ? consumeReplyTo({ ...overrides }) : { ...overrides }; +} + +function chunkTextForPlan(params: { + text: string; + limit: number; + chunker: OutboundMessageChunker; + formatting?: OutboundDeliveryFormattingOptions; +}): string[] { + return params.formatting + ? params.chunker(params.text, params.limit, { formatting: params.formatting }) + : params.chunker(params.text, params.limit); +} + +export function planOutboundTextMessageUnits(params: { + text: string; + overrides: OutboundMessageSendOverrides; + chunker?: OutboundMessageChunker | null; + chunkerMode?: "text" | "markdown"; + textLimit?: number; + chunkMode?: ChunkMode; + formatting?: OutboundDeliveryFormattingOptions; + consumeReplyTo?: PlanReplyToConsumption; +}): OutboundMessageUnit[] { + const planTextUnit = (text: string): OutboundMessageUnit => ({ + kind: "text", + text, + overrides: withPlannedReplyTo(params.overrides, params.consumeReplyTo), + }); + + if (!params.chunker || params.textLimit === undefined) { + return [planTextUnit(params.text)]; + } + + if (params.chunkMode === "newline") { + const blockChunks = + (params.chunkerMode ?? "text") === "markdown" + ? chunkMarkdownTextWithMode(params.text, params.textLimit, "newline") + : chunkByParagraph(params.text, params.textLimit); + + if (!blockChunks.length && params.text) { + blockChunks.push(params.text); + } + + const units: OutboundMessageUnit[] = []; + for (const blockChunk of blockChunks) { + const chunks = chunkTextForPlan({ + text: blockChunk, + limit: params.textLimit, + chunker: params.chunker, + formatting: params.formatting, + }); + if (!chunks.length && blockChunk) { + chunks.push(blockChunk); + } + for (const chunk of chunks) { + units.push(planTextUnit(chunk)); + } + } + return units; + } + + return chunkTextForPlan({ + text: params.text, + limit: params.textLimit, + chunker: params.chunker, + formatting: params.formatting, + }).map(planTextUnit); +} + +export function planOutboundMediaMessageUnits(params: { + caption: string; + mediaUrls: readonly string[]; + overrides: OutboundMessageSendOverrides; + consumeReplyTo?: PlanReplyToConsumption; +}): OutboundMessageUnit[] { + return params.mediaUrls.map((mediaUrl, index) => ({ + kind: "media" as const, + mediaUrl, + ...(index === 0 ? { caption: params.caption } : {}), + overrides: withPlannedReplyTo(params.overrides, params.consumeReplyTo), + })); +} diff --git a/src/infra/outbound/reply-policy.ts b/src/infra/outbound/reply-policy.ts index 11889333c8c..85583143fd8 100644 --- a/src/infra/outbound/reply-policy.ts +++ b/src/infra/outbound/reply-policy.ts @@ -3,8 +3,8 @@ import type { ReplyPayload } from "../../auto-reply/types.js"; import type { ReplyToMode } from "../../config/types.js"; export type ReplyToOverride = { - replyToId?: string | null; - replyToIdSource?: ReplyToResolution["source"]; + replyToId?: string | null | undefined; + replyToIdSource?: ReplyToResolution["source"] | undefined; }; export type ReplyToResolution = {