// File: openclaw/extensions/telegram/src/bot-message.ts
// 175 lines, 5.5 KiB, TypeScript

import type { ReplyToMode } from "openclaw/plugin-sdk/config-types";
import type { TelegramAccountConfig } from "openclaw/plugin-sdk/config-types";
import {
createSubsystemLogger,
danger,
logVerbose,
shouldLogVerbose,
} from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import type { TelegramBotDeps } from "./bot-deps.js";
import {
buildTelegramMessageContext,
type BuildTelegramMessageContextParams,
type TelegramMediaRef,
} from "./bot-message-context.js";
import type { TelegramMessageContextOptions } from "./bot-message-context.types.js";
import { dispatchTelegramMessage } from "./bot-message-dispatch.js";
import type { TelegramBotOptions } from "./bot.types.js";
import { buildTelegramThreadParams } from "./bot/helpers.js";
import type { TelegramContext, TelegramStreamMode } from "./bot/types.js";
import type { TelegramReplyChainEntry } from "./message-cache.js";
// Logger used for the one-line "Inbound message ..." summary emitted per processed message:
// the "inbound" child of the "gateway/channels/telegram" subsystem logger.
const telegramInboundLog = createSubsystemLogger("gateway/channels/telegram").child("inbound");
/**
 * Renders the single-line summary that is logged for an inbound Telegram message.
 *
 * The message body itself is never included; only its length (in UTF-16 code
 * units, as reported by `String.prototype.length`) appears in the output.
 *
 * @param params.from      Sender label, printed verbatim.
 * @param params.to        Recipient label, printed verbatim.
 * @param params.chatType  Chat type label, first entry inside the parentheses.
 * @param params.body      Message text; contributes only its length.
 * @param params.mediaType Optional media type; omitted from the line when absent or empty.
 * @returns A string of the form `Inbound message <from> -> <to> (<chatType>[, <mediaType>], <n> chars)`.
 */
export function formatTelegramInboundLogLine(params: {
  from: string;
  to: string;
  chatType: string;
  body: string;
  mediaType?: string;
}): string {
  const { from, to, chatType, body, mediaType } = params;

  // Collect the comma-separated details shown inside the parentheses.
  const details: string[] = [chatType];
  if (mediaType) {
    details.push(mediaType);
  }
  details.push(`${body.length} chars`);

  return `Inbound message ${from} -> ${to} (${details.join(", ")})`;
}
/**
 * Context-builder parameters that are supplied on every processor invocation
 * (as positional arguments of the returned handler) instead of being captured
 * once in the processor's dependencies.
 */
type TelegramPerMessageContextKeys = "primaryCtx" | "allMedia" | "storeAllowFrom" | "options";

/**
 * Dependencies captured once by `createTelegramMessageProcessor`.
 *
 * Consists of every `BuildTelegramMessageContextParams` field except the
 * per-message ones, plus the extra values the processor forwards to
 * `dispatchTelegramMessage`.
 */
type TelegramMessageProcessorDeps = Omit<
  BuildTelegramMessageContextParams,
  TelegramPerMessageContextKeys
> & {
  /** Bot options; only the `token` field is exposed to the processor. */
  opts: Pick<TelegramBotOptions, "token">;
  /** Account-level Telegram configuration, forwarded to dispatch. */
  telegramCfg: TelegramAccountConfig;
  /** Provides `upsertChannelPairingRequest` for context building; also forwarded to dispatch. */
  telegramDeps: TelegramBotDeps;
  /** Runtime environment; its optional `error` hook reports dispatch failures. */
  runtime: RuntimeEnv;
  /** Reply-to mode, forwarded to dispatch. */
  replyToMode: ReplyToMode;
  /** Stream mode, forwarded to dispatch. */
  streamMode: TelegramStreamMode;
  /** Text limit, forwarded to dispatch. */
  textLimit: number;
};
/**
 * Creates the handler that processes one inbound Telegram message.
 *
 * The returned async function performs, in order:
 *  1. Builds a message context with `buildTelegramMessageContext`; when no
 *     context is produced the message is dropped and the handler returns.
 *  2. Fires a typing cue without awaiting it (failures are only logged).
 *  3. Logs a one-line inbound summary.
 *  4. Dispatches via `dispatchTelegramMessage`. If dispatch throws, the error is
 *     reported through `runtime.error` and a generic failure notice is sent to
 *     the chat; a failure to send that notice is logged, never rethrown.
 *
 * Ingress timing lines are emitted through `logVerbose` when verbose logging is
 * on or `OPENCLAW_DEBUG_TELEGRAM_INGRESS` is `"1"`, and only if the caller
 * supplied a finite `options.receivedAtMs`.
 * NOTE(review): whether `logVerbose` actually prints when only the env flag is
 * set depends on its implementation — confirm.
 *
 * @param deps Values captured once and reused for every message.
 * @returns Async handler `(primaryCtx, allMedia, storeAllowFrom, options?, replyMedia?, replyChain?)`.
 *          Errors thrown while building the context propagate to the caller;
 *          errors thrown by dispatch are handled internally.
 */
export const createTelegramMessageProcessor = (deps: TelegramMessageProcessorDeps) => {
  const {
    bot,
    cfg,
    account,
    telegramCfg,
    historyLimit,
    groupHistories,
    dmPolicy,
    allowFrom,
    groupAllowFrom,
    ackReactionScope,
    logger,
    resolveGroupActivation,
    resolveGroupRequireMention,
    resolveTelegramGroupConfig,
    loadFreshConfig,
    sendChatActionHandler,
    runtime,
    replyToMode,
    streamMode,
    textLimit,
    telegramDeps,
    opts,
  } = deps;
  return async (
    primaryCtx: TelegramContext,
    allMedia: TelegramMediaRef[],
    storeAllowFrom: string[],
    options?: TelegramMessageContextOptions,
    replyMedia?: TelegramMediaRef[],
    replyChain?: TelegramReplyChainEntry[],
  ) => {
    // Accept the caller's receive timestamp only when it is a finite number.
    const ingressReceivedAtMs =
      typeof options?.receivedAtMs === "number" && Number.isFinite(options.receivedAtMs)
        ? options.receivedAtMs
        : undefined;
    const ingressDebugEnabled =
      shouldLogVerbose() || process.env.OPENCLAW_DEBUG_TELEGRAM_INGRESS === "1";
    // Compare against undefined (not truthiness) so a timestamp of 0 still counts as present.
    const ingressContextStartMs = ingressReceivedAtMs !== undefined ? Date.now() : undefined;
    // Optional " buffer=<value>" suffix shared by all ingress timing lines; evaluated at log time.
    const ingressBufferSuffix = (): string =>
      options?.ingressBuffer ? ` buffer=${options.ingressBuffer}` : "";

    const context = await buildTelegramMessageContext({
      primaryCtx,
      allMedia,
      replyMedia,
      replyChain,
      storeAllowFrom,
      options,
      bot,
      cfg,
      account,
      historyLimit,
      groupHistories,
      dmPolicy,
      allowFrom,
      groupAllowFrom,
      ackReactionScope,
      logger,
      resolveGroupActivation,
      resolveGroupRequireMention,
      resolveTelegramGroupConfig,
      sendChatActionHandler,
      loadFreshConfig,
      upsertPairingRequest: telegramDeps.upsertChannelPairingRequest,
    });

    if (!context) {
      // No context was produced: nothing to dispatch.
      if (ingressDebugEnabled && ingressReceivedAtMs !== undefined) {
        logVerbose(
          `telegram ingress: chatId=${primaryCtx.message.chat.id} dropped after ${Date.now() - ingressReceivedAtMs}ms` +
            ingressBufferSuffix(),
        );
      }
      return;
    }

    if (
      ingressDebugEnabled &&
      ingressReceivedAtMs !== undefined &&
      ingressContextStartMs !== undefined
    ) {
      logVerbose(
        `telegram ingress: chatId=${context.chatId} contextReadyMs=${Date.now() - ingressReceivedAtMs}` +
          ` preDispatchMs=${Date.now() - ingressContextStartMs}` +
          ingressBufferSuffix(),
      );
    }

    // Fire-and-forget typing cue; a failure here must not block dispatch.
    void context.sendTyping().catch((err) => {
      logVerbose(`telegram early typing cue failed for chat ${context.chatId}: ${String(err)}`);
    });

    telegramInboundLog.info(
      formatTelegramInboundLogLine({
        from: context.ctxPayload.From,
        // Prefer the bot's own @username when it is known.
        to: context.primaryCtx.me?.username
          ? `@${context.primaryCtx.me.username}`
          : context.ctxPayload.To,
        chatType: context.ctxPayload.ChatType,
        body: context.ctxPayload.RawBody,
        mediaType: allMedia[0]?.contentType,
      }),
    );

    try {
      await dispatchTelegramMessage({
        context,
        bot,
        cfg,
        runtime,
        replyToMode,
        streamMode,
        textLimit,
        telegramCfg,
        telegramDeps,
        opts,
      });
      if (ingressDebugEnabled && ingressReceivedAtMs !== undefined) {
        logVerbose(
          `telegram ingress: chatId=${context.chatId} dispatchCompleteMs=${Date.now() - ingressReceivedAtMs}` +
            ingressBufferSuffix(),
        );
      }
    } catch (err) {
      runtime.error?.(danger(`telegram message processing failed: ${String(err)}`));
      try {
        await bot.api.sendMessage(
          context.chatId,
          "Something went wrong while processing your request. Please try again.",
          buildTelegramThreadParams(context.threadSpec),
        );
      } catch (notifyErr) {
        // Best effort only: the original failure is already reported above, so
        // record this secondary failure instead of discarding it, and do not rethrow.
        logVerbose(
          `telegram failure notice could not be sent to chat ${context.chatId}: ${String(notifyErr)}`,
        );
      }
    }
  };
};