refactor(outbound): plan text and media sends

This commit is contained in:
Peter Steinberger
2026-04-25 03:12:54 +01:00
parent a3862ffdf1
commit f550aa7622
4 changed files with 269 additions and 115 deletions

View File

@@ -1,10 +1,4 @@
import { sendMediaWithLeadingCaption } from "openclaw/plugin-sdk/reply-payload";
import {
chunkByParagraph,
chunkMarkdownTextWithMode,
resolveChunkMode,
resolveTextChunkLimit,
} from "../../auto-reply/chunk.js";
import { resolveChunkMode, resolveTextChunkLimit } from "../../auto-reply/chunk.js";
import type { ReplyPayload } from "../../auto-reply/types.js";
import { loadChannelOutboundAdapter } from "../../channels/plugins/outbound/load.js";
import type {
@@ -45,6 +39,11 @@ import {
} from "./delivery-queue.js";
import type { OutboundDeliveryFormattingOptions } from "./formatting.js";
import type { OutboundIdentity } from "./identity.js";
import {
planOutboundMediaMessageUnits,
planOutboundTextMessageUnits,
type OutboundMessageSendOverrides,
} from "./message-plan.js";
import type { DeliveryMirror } from "./mirror.js";
import {
createOutboundPayloadPlan,
@@ -82,14 +81,8 @@ async function loadChannelBootstrapRuntime() {
return await channelBootstrapRuntimePromise;
}
type Chunker = (
text: string,
limit: number,
ctx?: { formatting?: OutboundDeliveryFormattingOptions },
) => string[];
type ChannelHandler = {
chunker: Chunker | null;
chunker: ChannelOutboundAdapter["chunker"] | null;
chunkerMode?: "text" | "markdown";
textChunkLimit?: number;
supportsMedia: boolean;
@@ -111,45 +104,25 @@ type ChannelHandler = {
resolveEffectiveTextChunkLimit?: (fallbackLimit?: number) => number | undefined;
sendPayload?: (
payload: ReplyPayload,
overrides?: {
replyToId?: string | null;
threadId?: string | number | null;
audioAsVoice?: boolean;
},
overrides?: OutboundMessageSendOverrides,
) => Promise<OutboundDeliveryResult>;
sendFormattedText?: (
text: string,
overrides?: {
replyToId?: string | null;
threadId?: string | number | null;
audioAsVoice?: boolean;
},
overrides?: OutboundMessageSendOverrides,
) => Promise<OutboundDeliveryResult[]>;
sendFormattedMedia?: (
caption: string,
mediaUrl: string,
overrides?: {
replyToId?: string | null;
threadId?: string | number | null;
audioAsVoice?: boolean;
},
overrides?: OutboundMessageSendOverrides,
) => Promise<OutboundDeliveryResult>;
sendText: (
text: string,
overrides?: {
replyToId?: string | null;
threadId?: string | number | null;
audioAsVoice?: boolean;
},
overrides?: OutboundMessageSendOverrides,
) => Promise<OutboundDeliveryResult>;
sendMedia: (
caption: string,
mediaUrl: string,
overrides?: {
replyToId?: string | null;
threadId?: string | number | null;
audioAsVoice?: boolean;
},
overrides?: OutboundMessageSendOverrides,
) => Promise<OutboundDeliveryResult>;
};
@@ -203,11 +176,16 @@ function createPluginHandler(
const chunkerMode = outbound.chunkerMode;
const resolveCtx = (overrides?: {
replyToId?: string | null;
replyToIdSource?: "explicit" | "implicit";
threadId?: string | number | null;
audioAsVoice?: boolean;
}): Omit<ChannelOutboundContext, "text" | "mediaUrl"> => ({
...baseCtx,
replyToId: overrides && "replyToId" in overrides ? overrides.replyToId : baseCtx.replyToId,
replyToIdSource:
overrides && "replyToIdSource" in overrides
? overrides.replyToIdSource
: baseCtx.replyToIdSource,
threadId: overrides && "threadId" in overrides ? overrides.threadId : baseCtx.threadId,
audioAsVoice: overrides?.audioAsVoice,
});
@@ -841,55 +819,27 @@ async function deliverOutboundPayloadsCore(
replyToId: params.replyToId,
replyToMode: params.replyToMode,
});
const chunkTextForDelivery = (text: string, limit: number): string[] =>
params.formatting
? handler.chunker!(text, limit, { formatting: params.formatting })
: handler.chunker!(text, limit);
const sendTextChunks = async (
text: string,
overrides?: {
replyToId?: string | null;
replyToIdSource?: "explicit" | "implicit";
threadId?: string | number | null;
audioAsVoice?: boolean;
},
) => {
const consumeReplyTo = <T extends NonNullable<typeof overrides>>(value: T): T =>
applyReplyToConsumption(value, {
consumeImplicitReply: value.replyToIdSource === "implicit",
});
throwIfAborted(abortSignal);
if (!handler.chunker || textLimit === undefined) {
results.push(await handler.sendText(text, consumeReplyTo(overrides ?? {})));
return;
}
if (chunkMode === "newline") {
const mode = handler.chunkerMode ?? "text";
const blockChunks =
mode === "markdown"
? chunkMarkdownTextWithMode(text, textLimit, "newline")
: chunkByParagraph(text, textLimit);
if (!blockChunks.length && text) {
blockChunks.push(text);
const sendTextChunks = async (text: string, overrides: OutboundMessageSendOverrides = {}) => {
const units = planOutboundTextMessageUnits({
text,
overrides,
chunker: handler.chunker,
chunkerMode: handler.chunkerMode,
textLimit,
chunkMode,
formatting: params.formatting,
consumeReplyTo: (value) =>
applyReplyToConsumption(value, {
consumeImplicitReply: value.replyToIdSource === "implicit",
}),
});
for (const unit of units) {
if (unit.kind !== "text") {
continue;
}
for (const blockChunk of blockChunks) {
const chunks = chunkTextForDelivery(blockChunk, textLimit);
if (!chunks.length && blockChunk) {
chunks.push(blockChunk);
}
for (const chunk of chunks) {
throwIfAborted(abortSignal);
results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {})));
}
}
return;
}
const chunks = chunkTextForDelivery(text, textLimit);
for (const chunk of chunks) {
throwIfAborted(abortSignal);
results.push(await handler.sendText(chunk, consumeReplyTo(overrides ?? {})));
results.push(await handler.sendText(unit.text, unit.overrides));
}
};
const normalizedPayloads = normalizePayloadsForChannelDelivery(outboundPayloadPlan, handler);
@@ -951,14 +901,16 @@ async function deliverOutboundPayloadsCore(
params.onPayload?.(payloadSummary);
const replyToResolution = resolveCurrentReplyTo(effectivePayload);
const sendOverrides = {
const sendOverrides: OutboundMessageSendOverrides = {
replyToId: replyToResolution.replyToId,
replyToIdSource: replyToResolution.source,
threadId: params.threadId ?? undefined,
audioAsVoice: effectivePayload.audioAsVoice === true ? true : undefined,
forceDocument: params.forceDocument,
...(params.threadId !== undefined ? { threadId: params.threadId } : {}),
...(effectivePayload.audioAsVoice === true ? { audioAsVoice: true } : {}),
...(params.forceDocument !== undefined ? { forceDocument: params.forceDocument } : {}),
};
const applySendReplyToConsumption = <T extends typeof sendOverrides>(overrides: T): T =>
const applySendReplyToConsumption = <T extends OutboundMessageSendOverrides>(
overrides: T,
): T =>
applyReplyToConsumption(overrides, {
consumeImplicitReply: replyToResolution.source === "implicit",
});
@@ -1074,32 +1026,24 @@ async function deliverOutboundPayloadsCore(
let firstMessageId: string | undefined;
let lastMessageId: string | undefined;
const beforeCount = results.length;
await sendMediaWithLeadingCaption({
const mediaUnits = planOutboundMediaMessageUnits({
mediaUrls: payloadSummary.mediaUrls,
caption: payloadSummary.text,
send: async ({ mediaUrl, caption }) => {
throwIfAborted(abortSignal);
if (handler.sendFormattedMedia) {
const delivery = await handler.sendFormattedMedia(
caption ?? "",
mediaUrl,
applySendReplyToConsumption(sendOverrides),
);
results.push(delivery);
firstMessageId ??= delivery.messageId;
lastMessageId = delivery.messageId;
return;
}
const delivery = await handler.sendMedia(
caption ?? "",
mediaUrl,
applySendReplyToConsumption(sendOverrides),
);
results.push(delivery);
firstMessageId ??= delivery.messageId;
lastMessageId = delivery.messageId;
},
overrides: sendOverrides,
consumeReplyTo: applySendReplyToConsumption,
});
for (const unit of mediaUnits) {
if (unit.kind !== "media") {
continue;
}
throwIfAborted(abortSignal);
const delivery = handler.sendFormattedMedia
? await handler.sendFormattedMedia(unit.caption ?? "", unit.mediaUrl, unit.overrides)
: await handler.sendMedia(unit.caption ?? "", unit.mediaUrl, unit.overrides);
results.push(delivery);
firstMessageId ??= delivery.messageId;
lastMessageId = delivery.messageId;
}
await maybePinDeliveredMessage({
handler,
payload: effectivePayload,

View File

@@ -0,0 +1,88 @@
import { describe, expect, it } from "vitest";
import { planOutboundMediaMessageUnits, planOutboundTextMessageUnits } from "./message-plan.js";
import { createReplyToDeliveryPolicy } from "./reply-policy.js";
describe("outbound message planning", () => {
it("plans text chunks with one implicit reply in single-use modes", () => {
const policy = createReplyToDeliveryPolicy({
replyToId: "reply-1",
replyToMode: "first",
});
const reply = policy.resolveCurrentReplyTo({});
const units = planOutboundTextMessageUnits({
text: "abcd",
textLimit: 2,
chunker: (text, limit) => [text.slice(0, limit), text.slice(limit)],
overrides: { replyToId: reply.replyToId, replyToIdSource: reply.source },
consumeReplyTo: (overrides) =>
policy.applyReplyToConsumption(overrides, {
consumeImplicitReply: overrides.replyToIdSource === "implicit",
}),
});
expect(
units.map((unit) =>
unit.kind === "text" ? [unit.kind, unit.text, unit.overrides.replyToId] : [unit.kind],
),
).toEqual([
["text", "ab", "reply-1"],
["text", "cd", undefined],
]);
});
it("keeps explicit text replies from consuming the implicit slot", () => {
const policy = createReplyToDeliveryPolicy({
replyToId: "implicit-reply",
replyToMode: "first",
});
const explicit = policy.resolveCurrentReplyTo({ replyToId: "explicit-reply" });
const firstUnits = planOutboundTextMessageUnits({
text: "explicit",
overrides: { replyToId: explicit.replyToId, replyToIdSource: explicit.source },
consumeReplyTo: (overrides) =>
policy.applyReplyToConsumption(overrides, {
consumeImplicitReply: overrides.replyToIdSource === "implicit",
}),
});
const implicit = policy.resolveCurrentReplyTo({});
const secondUnits = planOutboundTextMessageUnits({
text: "implicit",
overrides: { replyToId: implicit.replyToId, replyToIdSource: implicit.source },
consumeReplyTo: (overrides) =>
policy.applyReplyToConsumption(overrides, {
consumeImplicitReply: overrides.replyToIdSource === "implicit",
}),
});
expect(firstUnits[0]?.overrides.replyToId).toBe("explicit-reply");
expect(secondUnits[0]?.overrides.replyToId).toBe("implicit-reply");
});
it("plans media sends with one implicit reply and a leading caption", () => {
const policy = createReplyToDeliveryPolicy({
replyToId: "reply-1",
replyToMode: "batched",
});
const reply = policy.resolveCurrentReplyTo({});
const units = planOutboundMediaMessageUnits({
caption: "caption",
mediaUrls: ["https://example.com/1.png", "https://example.com/2.png"],
overrides: { replyToId: reply.replyToId, replyToIdSource: reply.source },
consumeReplyTo: (overrides) =>
policy.applyReplyToConsumption(overrides, {
consumeImplicitReply: overrides.replyToIdSource === "implicit",
}),
});
expect(
units.map((unit) =>
unit.kind === "media"
? [unit.kind, unit.caption, unit.mediaUrl, unit.overrides.replyToId]
: [unit.kind],
),
).toEqual([
["media", "caption", "https://example.com/1.png", "reply-1"],
["media", undefined, "https://example.com/2.png", undefined],
]);
});
});

View File

@@ -0,0 +1,122 @@
import {
chunkByParagraph,
chunkMarkdownTextWithMode,
type ChunkMode,
} from "../../auto-reply/chunk.js";
import type { OutboundDeliveryFormattingOptions } from "./formatting.js";
import type { ReplyToOverride } from "./reply-policy.js";
export type OutboundMessageSendOverrides = ReplyToOverride & {
threadId?: string | number | null;
audioAsVoice?: boolean;
forceDocument?: boolean;
};
export type OutboundMessageUnit =
| {
kind: "text";
text: string;
overrides: OutboundMessageSendOverrides;
}
| {
kind: "media";
caption?: string;
mediaUrl: string;
overrides: OutboundMessageSendOverrides;
};
export type OutboundMessageChunker = (
text: string,
limit: number,
ctx?: { formatting?: OutboundDeliveryFormattingOptions },
) => string[];
type PlanReplyToConsumption = <T extends OutboundMessageSendOverrides>(overrides: T) => T;
function withPlannedReplyTo(
overrides: OutboundMessageSendOverrides,
consumeReplyTo?: PlanReplyToConsumption,
): OutboundMessageSendOverrides {
return consumeReplyTo ? consumeReplyTo({ ...overrides }) : { ...overrides };
}
function chunkTextForPlan(params: {
text: string;
limit: number;
chunker: OutboundMessageChunker;
formatting?: OutboundDeliveryFormattingOptions;
}): string[] {
return params.formatting
? params.chunker(params.text, params.limit, { formatting: params.formatting })
: params.chunker(params.text, params.limit);
}
export function planOutboundTextMessageUnits(params: {
text: string;
overrides: OutboundMessageSendOverrides;
chunker?: OutboundMessageChunker | null;
chunkerMode?: "text" | "markdown";
textLimit?: number;
chunkMode?: ChunkMode;
formatting?: OutboundDeliveryFormattingOptions;
consumeReplyTo?: PlanReplyToConsumption;
}): OutboundMessageUnit[] {
const planTextUnit = (text: string): OutboundMessageUnit => ({
kind: "text",
text,
overrides: withPlannedReplyTo(params.overrides, params.consumeReplyTo),
});
if (!params.chunker || params.textLimit === undefined) {
return [planTextUnit(params.text)];
}
if (params.chunkMode === "newline") {
const blockChunks =
(params.chunkerMode ?? "text") === "markdown"
? chunkMarkdownTextWithMode(params.text, params.textLimit, "newline")
: chunkByParagraph(params.text, params.textLimit);
if (!blockChunks.length && params.text) {
blockChunks.push(params.text);
}
const units: OutboundMessageUnit[] = [];
for (const blockChunk of blockChunks) {
const chunks = chunkTextForPlan({
text: blockChunk,
limit: params.textLimit,
chunker: params.chunker,
formatting: params.formatting,
});
if (!chunks.length && blockChunk) {
chunks.push(blockChunk);
}
for (const chunk of chunks) {
units.push(planTextUnit(chunk));
}
}
return units;
}
return chunkTextForPlan({
text: params.text,
limit: params.textLimit,
chunker: params.chunker,
formatting: params.formatting,
}).map(planTextUnit);
}
export function planOutboundMediaMessageUnits(params: {
caption: string;
mediaUrls: readonly string[];
overrides: OutboundMessageSendOverrides;
consumeReplyTo?: PlanReplyToConsumption;
}): OutboundMessageUnit[] {
return params.mediaUrls.map((mediaUrl, index) => ({
kind: "media" as const,
mediaUrl,
...(index === 0 ? { caption: params.caption } : {}),
overrides: withPlannedReplyTo(params.overrides, params.consumeReplyTo),
}));
}

View File

@@ -3,8 +3,8 @@ import type { ReplyPayload } from "../../auto-reply/types.js";
import type { ReplyToMode } from "../../config/types.js";
export type ReplyToOverride = {
replyToId?: string | null;
replyToIdSource?: ReplyToResolution["source"];
replyToId?: string | null | undefined;
replyToIdSource?: ReplyToResolution["source"] | undefined;
};
export type ReplyToResolution = {