refactor(telegram): unify outbound delivery adapter

This commit is contained in:
Ayaan Zaidi
2026-05-07 08:55:27 +05:30
parent a966303216
commit 6554e85ad6
4 changed files with 211 additions and 448 deletions

View File

@@ -11,14 +11,8 @@ import {
createChatChannelPlugin,
} from "openclaw/plugin-sdk/channel-core";
import { createAccountStatusSink } from "openclaw/plugin-sdk/channel-lifecycle";
import {
createMessageReceiptFromOutboundResults,
defineChannelMessageAdapter,
type ChannelMessageSendResult,
type MessageReceiptPartKind,
} from "openclaw/plugin-sdk/channel-message";
import { createChannelMessageAdapterFromOutbound } from "openclaw/plugin-sdk/channel-message";
import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing";
import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result";
import {
PAIRING_APPROVED_MESSAGE,
buildTokenChannelStatusSummary,
@@ -62,9 +56,8 @@ import {
import { resolveTelegramInlineButtonsScope } from "./inline-buttons.js";
import * as monitorModule from "./monitor.js";
import { looksLikeTelegramTargetId, normalizeTelegramMessagingTarget } from "./normalize.js";
import { sendTelegramPayloadMessages } from "./outbound-adapter.js";
import { telegramOutboundBaseAdapter } from "./outbound-base.js";
import { parseTelegramReplyToMessageId, parseTelegramThreadId } from "./outbound-params.js";
import { createTelegramOutboundAdapter } from "./outbound-adapter.js";
import { parseTelegramThreadId } from "./outbound-params.js";
import type { TelegramProbe } from "./probe.js";
import * as probeModule from "./probe.js";
import { resolveTelegramReactionLevel } from "./reaction-level.js";
@@ -108,8 +101,6 @@ async function loadTelegramUpdateOffsetRuntime() {
return await telegramUpdateOffsetRuntimePromise;
}
type TelegramSendOptions = NonNullable<Parameters<TelegramSendFn>[2]>;
function resolveTelegramProbe() {
return (
getOptionalTelegramRuntime()?.channel?.telegram?.probeTelegram ?? probeModule.probeTelegram
@@ -169,115 +160,39 @@ function resolveTelegramTokenHelper() {
);
}
function buildTelegramSendOptions(params: {
cfg: OpenClawConfig;
mediaUrl?: string | null;
mediaLocalRoots?: readonly string[] | null;
mediaReadFile?: ((filePath: string) => Promise<Buffer>) | null;
accountId?: string | null;
replyToId?: string | null;
threadId?: string | number | null;
silent?: boolean | null;
forceDocument?: boolean | null;
gatewayClientScopes?: readonly string[] | null;
}): TelegramSendOptions {
return {
verbose: false,
cfg: params.cfg,
...(params.mediaUrl ? { mediaUrl: params.mediaUrl } : {}),
...(params.mediaLocalRoots?.length ? { mediaLocalRoots: params.mediaLocalRoots } : {}),
...(params.mediaReadFile ? { mediaReadFile: params.mediaReadFile } : {}),
messageThreadId: parseTelegramThreadId(params.threadId),
replyToMessageId: parseTelegramReplyToMessageId(params.replyToId),
accountId: params.accountId ?? undefined,
silent: params.silent ?? undefined,
forceDocument: params.forceDocument ?? undefined,
...(Array.isArray(params.gatewayClientScopes)
? { gatewayClientScopes: [...params.gatewayClientScopes] }
: {}),
};
}
async function sendTelegramOutbound(params: {
cfg: OpenClawConfig;
to: string;
text: string;
mediaUrl?: string | null;
mediaLocalRoots?: readonly string[] | null;
mediaReadFile?: ((filePath: string) => Promise<Buffer>) | null;
accountId?: string | null;
deps?: OutboundSendDeps;
replyToId?: string | null;
threadId?: string | number | null;
silent?: boolean | null;
forceDocument?: boolean | null;
gatewayClientScopes?: readonly string[] | null;
}) {
const send = await resolveTelegramSend(params.deps);
return await send(
params.to,
params.text,
buildTelegramSendOptions({
cfg: params.cfg,
mediaUrl: params.mediaUrl,
mediaLocalRoots: params.mediaLocalRoots,
mediaReadFile: params.mediaReadFile,
accountId: params.accountId,
replyToId: params.replyToId,
threadId: params.threadId,
silent: params.silent,
forceDocument: params.forceDocument,
gatewayClientScopes: params.gatewayClientScopes,
const telegramChannelOutbound = createTelegramOutboundAdapter({
resolveSend: resolveTelegramSend,
loadSendModule: loadTelegramSendModule,
shouldSuppressLocalPayloadPrompt: ({ cfg, accountId, payload }) =>
shouldSuppressLocalTelegramExecApprovalPrompt({
cfg,
accountId,
payload,
}),
);
}
type TelegramMessageSendSourceResult = {
messageId?: string;
chatId?: string;
receipt?: ChannelMessageSendResult["receipt"];
};
function toTelegramMessageSendResult(
result: TelegramMessageSendSourceResult,
kind: MessageReceiptPartKind,
replyToId?: string | null,
): ChannelMessageSendResult {
const receipt =
result.receipt ??
createMessageReceiptFromOutboundResults({
results: result.messageId
? [
{
channel: "telegram",
messageId: result.messageId,
chatId: result.chatId,
},
]
: [],
kind,
...(replyToId ? { replyToId } : {}),
});
return {
messageId: result.messageId || receipt.primaryPlatformMessageId,
receipt,
};
}
const telegramMessageAdapter = defineChannelMessageAdapter({
id: "telegram",
durableFinal: {
capabilities: {
text: true,
media: true,
payload: true,
silent: true,
replyTo: true,
thread: true,
messageSendingHooks: true,
batch: true,
},
beforeDeliverPayload: async ({ cfg, target, hint }) => {
if (hint?.kind !== "approval-pending" || hint.approvalKind !== "exec") {
return;
}
const threadId =
typeof target.threadId === "number"
? target.threadId
: typeof target.threadId === "string"
? Number.parseInt(target.threadId, 10)
: undefined;
const { sendTypingTelegram } = await loadTelegramSendModule();
await sendTypingTelegram(target.to, {
cfg,
accountId: target.accountId ?? undefined,
...(Number.isFinite(threadId) ? { messageThreadId: threadId } : {}),
}).catch(() => {});
},
shouldTreatDeliveredTextAsVisible: shouldTreatTelegramDeliveredTextAsVisible,
targetsMatchForReplySuppression: targetsMatchTelegramReplySuppression,
preferFinalAssistantVisibleText: true,
});
const telegramMessageAdapter = createChannelMessageAdapterFromOutbound<OpenClawConfig>({
id: "telegram",
live: {
capabilities: {
draftPreview: true,
@@ -297,70 +212,7 @@ const telegramMessageAdapter = defineChannelMessageAdapter({
defaultAckPolicy: "after_agent_dispatch",
supportedAckPolicies: ["after_receive_record", "after_agent_dispatch"],
},
send: {
text: async (ctx) =>
toTelegramMessageSendResult(
await sendTelegramOutbound({
cfg: ctx.cfg,
to: ctx.to,
text: ctx.text,
accountId: ctx.accountId,
deps: ctx.deps,
replyToId: ctx.replyToId,
threadId: ctx.threadId,
silent: ctx.silent,
gatewayClientScopes: ctx.gatewayClientScopes,
}),
"text",
ctx.replyToId,
),
media: async (ctx) =>
toTelegramMessageSendResult(
await sendTelegramOutbound({
cfg: ctx.cfg,
to: ctx.to,
text: ctx.text,
mediaUrl: ctx.mediaUrl,
mediaLocalRoots: ctx.mediaLocalRoots,
mediaReadFile: ctx.mediaReadFile,
accountId: ctx.accountId,
deps: ctx.deps,
replyToId: ctx.replyToId,
threadId: ctx.threadId,
silent: ctx.silent,
forceDocument: ctx.forceDocument,
gatewayClientScopes: ctx.gatewayClientScopes,
}),
"media",
ctx.replyToId,
),
payload: async (ctx) => {
const send = await resolveTelegramSend(ctx.deps);
const result = attachChannelToResult(
"telegram",
await sendTelegramPayloadMessages({
send,
to: ctx.to,
payload: ctx.payload,
baseOpts: {
...buildTelegramSendOptions({
cfg: ctx.cfg,
mediaUrl: ctx.mediaUrl,
mediaLocalRoots: ctx.mediaLocalRoots,
accountId: ctx.accountId,
replyToId: ctx.replyToId,
threadId: ctx.threadId,
silent: ctx.silent,
forceDocument: ctx.forceDocument,
gatewayClientScopes: ctx.gatewayClientScopes,
}),
...(ctx.mediaReadFile ? { mediaReadFile: ctx.mediaReadFile } : {}),
},
}),
);
return toTelegramMessageSendResult(result, "unknown", ctx.replyToId);
},
},
outbound: telegramChannelOutbound,
});
const telegramMessageActions: ChannelMessageActionAdapter = {
@@ -1179,142 +1031,5 @@ export const telegramPlugin = createChatChannelPlugin({
return to.includes(":topic:") ? to : `${to}:topic:${threadId}`;
},
},
outbound: {
base: {
...telegramOutboundBaseAdapter,
shouldSuppressLocalPayloadPrompt: ({ cfg, accountId, payload }) =>
shouldSuppressLocalTelegramExecApprovalPrompt({
cfg,
accountId,
payload,
}),
beforeDeliverPayload: async ({ cfg, target, hint }) => {
if (hint?.kind !== "approval-pending" || hint.approvalKind !== "exec") {
return;
}
const threadId =
typeof target.threadId === "number"
? target.threadId
: typeof target.threadId === "string"
? Number.parseInt(target.threadId, 10)
: undefined;
const { sendTypingTelegram } = await loadTelegramSendModule();
await sendTypingTelegram(target.to, {
cfg,
accountId: target.accountId ?? undefined,
...(Number.isFinite(threadId) ? { messageThreadId: threadId } : {}),
}).catch(() => {});
},
shouldSkipPlainTextSanitization: ({ payload }) => Boolean(payload.channelData),
shouldTreatDeliveredTextAsVisible: shouldTreatTelegramDeliveredTextAsVisible,
preferFinalAssistantVisibleText: true,
targetsMatchForReplySuppression: targetsMatchTelegramReplySuppression,
resolveEffectiveTextChunkLimit: ({ fallbackLimit }) =>
typeof fallbackLimit === "number" ? Math.min(fallbackLimit, 4096) : 4096,
supportsPollDurationSeconds: true,
supportsAnonymousPolls: true,
sendPayload: async ({
cfg,
to,
payload,
mediaLocalRoots,
accountId,
deps,
replyToId,
threadId,
silent,
forceDocument,
gatewayClientScopes,
}) => {
const send = await resolveTelegramSend(deps);
const result = await sendTelegramPayloadMessages({
send,
to,
payload,
baseOpts: buildTelegramSendOptions({
cfg,
mediaLocalRoots,
accountId,
replyToId,
threadId,
silent,
forceDocument,
gatewayClientScopes,
}),
});
return attachChannelToResult("telegram", result);
},
},
attachedResults: {
channel: "telegram",
sendText: async ({
cfg,
to,
text,
accountId,
deps,
replyToId,
threadId,
silent,
gatewayClientScopes,
}) =>
await sendTelegramOutbound({
cfg,
to,
text,
accountId,
deps,
replyToId,
threadId,
silent,
gatewayClientScopes,
}),
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
threadId,
silent,
gatewayClientScopes,
}) =>
await sendTelegramOutbound({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
accountId,
deps,
replyToId,
threadId,
silent,
gatewayClientScopes,
}),
sendPoll: async ({
cfg,
to,
poll,
accountId,
threadId,
silent,
isAnonymous,
gatewayClientScopes,
}) => {
const { sendPollTelegram } = await loadTelegramSendModule();
return await sendPollTelegram(to, poll, {
cfg,
accountId: accountId ?? undefined,
messageThreadId: parseTelegramThreadId(threadId),
silent: silent ?? undefined,
isAnonymous: isAnonymous ?? undefined,
gatewayClientScopes,
});
},
},
},
outbound: telegramChannelOutbound,
});

View File

@@ -22,20 +22,29 @@ import { resolveTelegramInlineButtons } from "./button-types.js";
import { markdownToTelegramHtmlChunks } from "./format.js";
import { resolveTelegramInteractiveTextFallback } from "./interactive-fallback.js";
import { parseTelegramReplyToMessageId, parseTelegramThreadId } from "./outbound-params.js";
import { pinMessageTelegram } from "./send.js";
export const TELEGRAM_TEXT_CHUNK_LIMIT = 4000;
type TelegramSendFn = typeof import("./send.js").sendMessageTelegram;
type TelegramSendModule = typeof import("./send.js");
type TelegramSendOpts = Parameters<TelegramSendFn>[2];
type ResolveTelegramSendFn = (deps?: OutboundSendDeps) => Promise<TelegramSendFn>;
type LoadTelegramSendModuleFn = () => Promise<TelegramSendModule>;
let telegramSendModulePromise: Promise<typeof import("./send.js")> | undefined;
async function loadTelegramSendModule() {
async function loadTelegramSendModule(): Promise<TelegramSendModule> {
telegramSendModulePromise ??= import("./send.js");
return await telegramSendModulePromise;
}
async function resolveDefaultTelegramSend(deps?: OutboundSendDeps): Promise<TelegramSendFn> {
return (
resolveOutboundSendDep<TelegramSendFn>(deps, "telegram") ??
(await loadTelegramSendModule()).sendMessageTelegram
);
}
async function resolveTelegramSendContext(params: {
cfg: NonNullable<TelegramSendOpts>["cfg"];
deps?: OutboundSendDeps;
@@ -44,6 +53,7 @@ async function resolveTelegramSendContext(params: {
threadId?: string | number | null;
silent?: boolean;
gatewayClientScopes?: readonly string[];
resolveSend: ResolveTelegramSendFn;
}): Promise<{
send: TelegramSendFn;
baseOpts: {
@@ -57,9 +67,7 @@ async function resolveTelegramSendContext(params: {
gatewayClientScopes?: readonly string[];
};
}> {
const send =
resolveOutboundSendDep<TelegramSendFn>(params.deps, "telegram") ??
(await loadTelegramSendModule()).sendMessageTelegram;
const send = await params.resolveSend(params.deps);
return {
send,
baseOpts: {
@@ -75,6 +83,16 @@ async function resolveTelegramSendContext(params: {
};
}
export type CreateTelegramOutboundAdapterOptions = {
resolveSend?: ResolveTelegramSendFn;
loadSendModule?: LoadTelegramSendModuleFn;
beforeDeliverPayload?: ChannelOutboundAdapter["beforeDeliverPayload"];
shouldSuppressLocalPayloadPrompt?: ChannelOutboundAdapter["shouldSuppressLocalPayloadPrompt"];
shouldTreatDeliveredTextAsVisible?: ChannelOutboundAdapter["shouldTreatDeliveredTextAsVisible"];
targetsMatchForReplySuppression?: ChannelOutboundAdapter["targetsMatchForReplySuppression"];
preferFinalAssistantVisibleText?: boolean;
};
export async function sendTelegramPayloadMessages(params: {
send: TelegramSendFn;
to: string;
@@ -121,81 +139,129 @@ export async function sendTelegramPayloadMessages(params: {
});
}
export const telegramOutbound: ChannelOutboundAdapter = {
deliveryMode: "direct",
chunker: markdownToTelegramHtmlChunks,
chunkerMode: "markdown",
extractMarkdownImages: true,
textChunkLimit: TELEGRAM_TEXT_CHUNK_LIMIT,
sanitizeText: ({ text }) => sanitizeForPlainText(text),
shouldSkipPlainTextSanitization: ({ payload }) => Boolean(payload.channelData),
presentationCapabilities: {
supported: true,
buttons: true,
selects: true,
context: true,
divider: false,
},
deliveryCapabilities: {
pin: true,
durableFinal: {
text: true,
media: true,
payload: true,
silent: true,
replyTo: true,
thread: true,
nativeQuote: false,
messageSendingHooks: true,
batch: true,
export function createTelegramOutboundAdapter(
options: CreateTelegramOutboundAdapterOptions = {},
): ChannelOutboundAdapter {
const resolveSend = options.resolveSend ?? resolveDefaultTelegramSend;
const loadSendModule = options.loadSendModule ?? loadTelegramSendModule;
return {
deliveryMode: "direct",
chunker: markdownToTelegramHtmlChunks,
chunkerMode: "markdown",
extractMarkdownImages: true,
textChunkLimit: TELEGRAM_TEXT_CHUNK_LIMIT,
sanitizeText: ({ text }) => sanitizeForPlainText(text),
shouldSkipPlainTextSanitization: ({ payload }) => Boolean(payload.channelData),
shouldSuppressLocalPayloadPrompt: options.shouldSuppressLocalPayloadPrompt,
beforeDeliverPayload: options.beforeDeliverPayload,
shouldTreatDeliveredTextAsVisible: options.shouldTreatDeliveredTextAsVisible,
targetsMatchForReplySuppression: options.targetsMatchForReplySuppression,
preferFinalAssistantVisibleText: options.preferFinalAssistantVisibleText,
presentationCapabilities: {
supported: true,
buttons: true,
selects: true,
context: true,
divider: false,
},
},
renderPresentation: ({ payload, presentation }) => ({
...payload,
text: renderMessagePresentationFallbackText({ text: payload.text, presentation }),
interactive: presentationToInteractiveReply(presentation),
}),
pinDeliveredMessage: async ({ cfg, target, messageId, pin }) => {
await pinMessageTelegram(target.to, messageId, {
cfg,
accountId: target.accountId ?? undefined,
notify: pin.notify,
verbose: false,
});
},
resolveEffectiveTextChunkLimit: ({ fallbackLimit }) =>
typeof fallbackLimit === "number" ? Math.min(fallbackLimit, 4096) : 4096,
...createAttachedChannelResultAdapter({
channel: "telegram",
sendText: async ({
cfg,
to,
text,
accountId,
deps,
replyToId,
threadId,
silent,
gatewayClientScopes,
}) => {
const { send, baseOpts } = await resolveTelegramSendContext({
deliveryCapabilities: {
pin: true,
durableFinal: {
text: true,
media: true,
payload: true,
silent: true,
replyTo: true,
thread: true,
nativeQuote: false,
messageSendingHooks: true,
batch: true,
},
},
renderPresentation: ({ payload, presentation }) => ({
...payload,
text: renderMessagePresentationFallbackText({ text: payload.text, presentation }),
interactive: presentationToInteractiveReply(presentation),
}),
pinDeliveredMessage: async ({ cfg, target, messageId, pin }) => {
const { pinMessageTelegram } = await loadSendModule();
await pinMessageTelegram(target.to, messageId, {
cfg,
deps,
accountId: target.accountId ?? undefined,
notify: pin.notify,
verbose: false,
});
},
resolveEffectiveTextChunkLimit: ({ fallbackLimit }) =>
typeof fallbackLimit === "number" ? Math.min(fallbackLimit, 4096) : 4096,
supportsPollDurationSeconds: true,
supportsAnonymousPolls: true,
...createAttachedChannelResultAdapter({
channel: "telegram",
sendText: async ({
cfg,
to,
text,
accountId,
deps,
replyToId,
threadId,
silent,
gatewayClientScopes,
});
return await send(to, text, {
...baseOpts,
});
},
sendMedia: async ({
}) => {
const { send, baseOpts } = await resolveTelegramSendContext({
cfg,
deps,
accountId,
replyToId,
threadId,
silent,
gatewayClientScopes,
resolveSend,
});
return await send(to, text, {
...baseOpts,
});
},
sendMedia: async ({
cfg,
to,
text,
mediaUrl,
mediaLocalRoots,
mediaReadFile,
accountId,
deps,
replyToId,
threadId,
forceDocument,
silent,
gatewayClientScopes,
}) => {
const { send, baseOpts } = await resolveTelegramSendContext({
cfg,
deps,
accountId,
replyToId,
threadId,
silent,
gatewayClientScopes,
resolveSend,
});
return await send(to, text, {
...baseOpts,
mediaUrl,
mediaLocalRoots,
mediaReadFile,
forceDocument: forceDocument ?? false,
});
},
}),
sendPayload: async ({
cfg,
to,
text,
mediaUrl,
payload,
mediaLocalRoots,
mediaReadFile,
accountId,
@@ -214,50 +280,42 @@ export const telegramOutbound: ChannelOutboundAdapter = {
threadId,
silent,
gatewayClientScopes,
resolveSend,
});
return await send(to, text, {
...baseOpts,
mediaUrl,
mediaLocalRoots,
mediaReadFile,
forceDocument: forceDocument ?? false,
const result = await sendTelegramPayloadMessages({
send,
to,
payload,
baseOpts: {
...baseOpts,
mediaLocalRoots,
mediaReadFile,
forceDocument: forceDocument ?? false,
},
});
return attachChannelToResult("telegram", result);
},
}),
sendPayload: async ({
cfg,
to,
payload,
mediaLocalRoots,
mediaReadFile,
accountId,
deps,
replyToId,
threadId,
forceDocument,
silent,
gatewayClientScopes,
}) => {
const { send, baseOpts } = await resolveTelegramSendContext({
sendPoll: async ({
cfg,
deps,
to,
poll,
accountId,
replyToId,
threadId,
silent,
isAnonymous,
gatewayClientScopes,
});
const result = await sendTelegramPayloadMessages({
send,
to,
payload,
baseOpts: {
...baseOpts,
mediaLocalRoots,
mediaReadFile,
forceDocument: forceDocument ?? false,
},
});
return attachChannelToResult("telegram", result);
},
};
}) => {
const { sendPollTelegram } = await loadSendModule();
return await sendPollTelegram(to, poll, {
cfg,
accountId: accountId ?? undefined,
messageThreadId: parseTelegramThreadId(threadId),
silent: silent ?? undefined,
isAnonymous: isAnonymous ?? undefined,
gatewayClientScopes,
});
},
};
}
export const telegramOutbound: ChannelOutboundAdapter = createTelegramOutboundAdapter();

View File

@@ -1,10 +0,0 @@
import { chunkMarkdownText } from "openclaw/plugin-sdk/reply-runtime";
export const telegramOutboundBaseAdapter = {
deliveryMode: "direct" as const,
chunker: chunkMarkdownText,
chunkerMode: "markdown" as const,
extractMarkdownImages: true,
textChunkLimit: 4000,
pollMaxOptions: 10,
};

View File

@@ -1,17 +1,17 @@
import { chunkMarkdownText } from "openclaw/plugin-sdk/reply-runtime";
import { describe, expect, it } from "vitest";
import { telegramOutboundBaseAdapter } from "./outbound-base.js";
import { markdownToTelegramHtmlChunks } from "./format.js";
import { telegramOutbound } from "./outbound-adapter.js";
import { clearTelegramRuntime } from "./runtime.js";
describe("telegramPlugin outbound", () => {
it("uses static chunking when Telegram runtime is uninitialized", () => {
clearTelegramRuntime();
const text = `${"hello\n".repeat(1200)}tail`;
const expected = chunkMarkdownText(text, 4000);
const expected = markdownToTelegramHtmlChunks(text, 4000);
expect(telegramOutboundBaseAdapter.chunker(text, 4000)).toEqual(expected);
expect(telegramOutboundBaseAdapter.deliveryMode).toBe("direct");
expect(telegramOutboundBaseAdapter.chunkerMode).toBe("markdown");
expect(telegramOutboundBaseAdapter.textChunkLimit).toBe(4000);
expect(telegramOutbound.chunker?.(text, 4000)).toEqual(expected);
expect(telegramOutbound.deliveryMode).toBe("direct");
expect(telegramOutbound.chunkerMode).toBe("markdown");
expect(telegramOutbound.textChunkLimit).toBe(4000);
});
});