mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-25 16:12:13 +00:00
refactor: move Telegram channel implementation to extensions/ (#45635)
* refactor: move Telegram channel implementation to extensions/telegram/src/ Move all Telegram channel code (123 files + 10 bot/ files + 8 channel plugin files) from src/telegram/ and src/channels/plugins/*/telegram.ts to extensions/telegram/src/. Leave thin re-export shims at original locations so cross-cutting src/ imports continue to resolve. - Fix all relative import paths in moved files (../X/ -> ../../../src/X/) - Fix vi.mock paths in 60 test files - Fix inline typeof import() expressions - Update tsconfig.plugin-sdk.dts.json rootDir to "." for cross-directory DTS - Update write-plugin-sdk-entry-dts.ts for new rootDir structure - Move channel plugin files with correct path remapping * fix: support keyed telegram send deps * fix: sync telegram extension copies with latest main * fix: correct import paths and remove misplaced files in telegram extension * fix: sync outbound-adapter with main (add sendTelegramPayloadMessages) and fix delivery.test import path
This commit is contained in:
@@ -1,849 +1 @@
|
||||
import type { Bot } from "grammy";
|
||||
import { resolveAgentDir } from "../agents/agent-scope.js";
|
||||
import {
|
||||
findModelInCatalog,
|
||||
loadModelCatalog,
|
||||
modelSupportsVision,
|
||||
} from "../agents/model-catalog.js";
|
||||
import { resolveDefaultModelForAgent } from "../agents/model-selection.js";
|
||||
import { resolveChunkMode } from "../auto-reply/chunk.js";
|
||||
import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js";
|
||||
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
import { removeAckReactionAfterReply } from "../channels/ack-reactions.js";
|
||||
import { logAckFailure, logTypingFailure } from "../channels/logging.js";
|
||||
import { createReplyPrefixOptions } from "../channels/reply-prefix.js";
|
||||
import { createTypingCallbacks } from "../channels/typing.js";
|
||||
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
|
||||
import {
|
||||
loadSessionStore,
|
||||
resolveSessionStoreEntry,
|
||||
resolveStorePath,
|
||||
} from "../config/sessions.js";
|
||||
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
|
||||
import { danger, logVerbose } from "../globals.js";
|
||||
import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { TelegramMessageContext } from "./bot-message-context.js";
|
||||
import type { TelegramBotOptions } from "./bot.js";
|
||||
import { deliverReplies } from "./bot/delivery.js";
|
||||
import type { TelegramStreamMode } from "./bot/types.js";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { createTelegramDraftStream } from "./draft-stream.js";
|
||||
import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js";
|
||||
import { renderTelegramHtmlText } from "./format.js";
|
||||
import {
|
||||
type ArchivedPreview,
|
||||
createLaneDeliveryStateTracker,
|
||||
createLaneTextDeliverer,
|
||||
type DraftLaneState,
|
||||
type LaneName,
|
||||
type LanePreviewLifecycle,
|
||||
} from "./lane-delivery.js";
|
||||
import {
|
||||
createTelegramReasoningStepState,
|
||||
splitTelegramReasoningText,
|
||||
} from "./reasoning-lane-coordinator.js";
|
||||
import { editMessageTelegram } from "./send.js";
|
||||
import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
|
||||
|
||||
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
|
||||
|
||||
/** Minimum chars before sending first streaming message (improves push notification UX) */
|
||||
const DRAFT_MIN_INITIAL_CHARS = 30;
|
||||
|
||||
async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) {
|
||||
try {
|
||||
const catalog = await loadModelCatalog({ config: cfg });
|
||||
const defaultModel = resolveDefaultModelForAgent({ cfg, agentId });
|
||||
const entry = findModelInCatalog(catalog, defaultModel.provider, defaultModel.model);
|
||||
if (!entry) {
|
||||
return false;
|
||||
}
|
||||
return modelSupportsVision(entry);
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
export function pruneStickerMediaFromContext(
|
||||
ctxPayload: {
|
||||
MediaPath?: string;
|
||||
MediaUrl?: string;
|
||||
MediaType?: string;
|
||||
MediaPaths?: string[];
|
||||
MediaUrls?: string[];
|
||||
MediaTypes?: string[];
|
||||
},
|
||||
opts?: { stickerMediaIncluded?: boolean },
|
||||
) {
|
||||
if (opts?.stickerMediaIncluded === false) {
|
||||
return;
|
||||
}
|
||||
const nextMediaPaths = Array.isArray(ctxPayload.MediaPaths)
|
||||
? ctxPayload.MediaPaths.slice(1)
|
||||
: undefined;
|
||||
const nextMediaUrls = Array.isArray(ctxPayload.MediaUrls)
|
||||
? ctxPayload.MediaUrls.slice(1)
|
||||
: undefined;
|
||||
const nextMediaTypes = Array.isArray(ctxPayload.MediaTypes)
|
||||
? ctxPayload.MediaTypes.slice(1)
|
||||
: undefined;
|
||||
ctxPayload.MediaPaths = nextMediaPaths && nextMediaPaths.length > 0 ? nextMediaPaths : undefined;
|
||||
ctxPayload.MediaUrls = nextMediaUrls && nextMediaUrls.length > 0 ? nextMediaUrls : undefined;
|
||||
ctxPayload.MediaTypes = nextMediaTypes && nextMediaTypes.length > 0 ? nextMediaTypes : undefined;
|
||||
ctxPayload.MediaPath = ctxPayload.MediaPaths?.[0];
|
||||
ctxPayload.MediaUrl = ctxPayload.MediaUrls?.[0] ?? ctxPayload.MediaPath;
|
||||
ctxPayload.MediaType = ctxPayload.MediaTypes?.[0];
|
||||
}
|
||||
|
||||
type DispatchTelegramMessageParams = {
|
||||
context: TelegramMessageContext;
|
||||
bot: Bot;
|
||||
cfg: OpenClawConfig;
|
||||
runtime: RuntimeEnv;
|
||||
replyToMode: ReplyToMode;
|
||||
streamMode: TelegramStreamMode;
|
||||
textLimit: number;
|
||||
telegramCfg: TelegramAccountConfig;
|
||||
opts: Pick<TelegramBotOptions, "token">;
|
||||
};
|
||||
|
||||
type TelegramReasoningLevel = "off" | "on" | "stream";
|
||||
|
||||
function resolveTelegramReasoningLevel(params: {
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey?: string;
|
||||
agentId: string;
|
||||
}): TelegramReasoningLevel {
|
||||
const { cfg, sessionKey, agentId } = params;
|
||||
if (!sessionKey) {
|
||||
return "off";
|
||||
}
|
||||
try {
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const entry = resolveSessionStoreEntry({ store, sessionKey }).existing;
|
||||
const level = entry?.reasoningLevel;
|
||||
if (level === "on" || level === "stream") {
|
||||
return level;
|
||||
}
|
||||
} catch {
|
||||
// Fall through to default.
|
||||
}
|
||||
return "off";
|
||||
}
|
||||
|
||||
export const dispatchTelegramMessage = async ({
|
||||
context,
|
||||
bot,
|
||||
cfg,
|
||||
runtime,
|
||||
replyToMode,
|
||||
streamMode,
|
||||
textLimit,
|
||||
telegramCfg,
|
||||
opts,
|
||||
}: DispatchTelegramMessageParams) => {
|
||||
const {
|
||||
ctxPayload,
|
||||
msg,
|
||||
chatId,
|
||||
isGroup,
|
||||
threadSpec,
|
||||
historyKey,
|
||||
historyLimit,
|
||||
groupHistories,
|
||||
route,
|
||||
skillFilter,
|
||||
sendTyping,
|
||||
sendRecordVoice,
|
||||
ackReactionPromise,
|
||||
reactionApi,
|
||||
removeAckAfterReply,
|
||||
statusReactionController,
|
||||
} = context;
|
||||
|
||||
const draftMaxChars = Math.min(textLimit, 4096);
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const renderDraftPreview = (text: string) => ({
|
||||
text: renderTelegramHtmlText(text, { tableMode }),
|
||||
parseMode: "HTML" as const,
|
||||
});
|
||||
const accountBlockStreamingEnabled =
|
||||
typeof telegramCfg.blockStreaming === "boolean"
|
||||
? telegramCfg.blockStreaming
|
||||
: cfg.agents?.defaults?.blockStreamingDefault === "on";
|
||||
const resolvedReasoningLevel = resolveTelegramReasoningLevel({
|
||||
cfg,
|
||||
sessionKey: ctxPayload.SessionKey,
|
||||
agentId: route.agentId,
|
||||
});
|
||||
const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on";
|
||||
const streamReasoningDraft = resolvedReasoningLevel === "stream";
|
||||
const previewStreamingEnabled = streamMode !== "off";
|
||||
const canStreamAnswerDraft =
|
||||
previewStreamingEnabled && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning;
|
||||
const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft;
|
||||
const draftReplyToMessageId =
|
||||
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
|
||||
const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS;
|
||||
// Keep DM preview lanes on real message transport. Native draft previews still
|
||||
// require a draft->message materialize hop, and that overlap keeps reintroducing
|
||||
// a visible duplicate flash at finalize time.
|
||||
const useMessagePreviewTransportForDm = threadSpec?.scope === "dm" && canStreamAnswerDraft;
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
const archivedAnswerPreviews: ArchivedPreview[] = [];
|
||||
const archivedReasoningPreviewIds: number[] = [];
|
||||
const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => {
|
||||
const stream = enabled
|
||||
? createTelegramDraftStream({
|
||||
api: bot.api,
|
||||
chatId,
|
||||
maxChars: draftMaxChars,
|
||||
thread: threadSpec,
|
||||
previewTransport: useMessagePreviewTransportForDm ? "message" : "auto",
|
||||
replyToMessageId: draftReplyToMessageId,
|
||||
minInitialChars: draftMinInitialChars,
|
||||
renderText: renderDraftPreview,
|
||||
onSupersededPreview:
|
||||
laneName === "answer" || laneName === "reasoning"
|
||||
? (preview) => {
|
||||
if (laneName === "reasoning") {
|
||||
if (!archivedReasoningPreviewIds.includes(preview.messageId)) {
|
||||
archivedReasoningPreviewIds.push(preview.messageId);
|
||||
}
|
||||
return;
|
||||
}
|
||||
archivedAnswerPreviews.push({
|
||||
messageId: preview.messageId,
|
||||
textSnapshot: preview.textSnapshot,
|
||||
deleteIfUnused: true,
|
||||
});
|
||||
}
|
||||
: undefined,
|
||||
log: logVerbose,
|
||||
warn: logVerbose,
|
||||
})
|
||||
: undefined;
|
||||
return {
|
||||
stream,
|
||||
lastPartialText: "",
|
||||
hasStreamedMessage: false,
|
||||
};
|
||||
};
|
||||
const lanes: Record<LaneName, DraftLaneState> = {
|
||||
answer: createDraftLane("answer", canStreamAnswerDraft),
|
||||
reasoning: createDraftLane("reasoning", canStreamReasoningDraft),
|
||||
};
|
||||
// Active preview lifecycle answers "can this current preview still be
|
||||
// finalized?" Cleanup retention is separate so archived-preview decisions do
|
||||
// not poison the active lane.
|
||||
const activePreviewLifecycleByLane: Record<LaneName, LanePreviewLifecycle> = {
|
||||
answer: "transient",
|
||||
reasoning: "transient",
|
||||
};
|
||||
const retainPreviewOnCleanupByLane: Record<LaneName, boolean> = {
|
||||
answer: false,
|
||||
reasoning: false,
|
||||
};
|
||||
const answerLane = lanes.answer;
|
||||
const reasoningLane = lanes.reasoning;
|
||||
let splitReasoningOnNextStream = false;
|
||||
let skipNextAnswerMessageStartRotation = false;
|
||||
let draftLaneEventQueue = Promise.resolve();
|
||||
const reasoningStepState = createTelegramReasoningStepState();
|
||||
const enqueueDraftLaneEvent = (task: () => Promise<void>): Promise<void> => {
|
||||
const next = draftLaneEventQueue.then(task);
|
||||
draftLaneEventQueue = next.catch((err) => {
|
||||
logVerbose(`telegram: draft lane callback failed: ${String(err)}`);
|
||||
});
|
||||
return draftLaneEventQueue;
|
||||
};
|
||||
type SplitLaneSegment = { lane: LaneName; text: string };
|
||||
type SplitLaneSegmentsResult = {
|
||||
segments: SplitLaneSegment[];
|
||||
suppressedReasoningOnly: boolean;
|
||||
};
|
||||
const splitTextIntoLaneSegments = (text?: string): SplitLaneSegmentsResult => {
|
||||
const split = splitTelegramReasoningText(text);
|
||||
const segments: SplitLaneSegment[] = [];
|
||||
const suppressReasoning = resolvedReasoningLevel === "off";
|
||||
if (split.reasoningText && !suppressReasoning) {
|
||||
segments.push({ lane: "reasoning", text: split.reasoningText });
|
||||
}
|
||||
if (split.answerText) {
|
||||
segments.push({ lane: "answer", text: split.answerText });
|
||||
}
|
||||
return {
|
||||
segments,
|
||||
suppressedReasoningOnly:
|
||||
Boolean(split.reasoningText) && suppressReasoning && !split.answerText,
|
||||
};
|
||||
};
|
||||
const resetDraftLaneState = (lane: DraftLaneState) => {
|
||||
lane.lastPartialText = "";
|
||||
lane.hasStreamedMessage = false;
|
||||
};
|
||||
const rotateAnswerLaneForNewAssistantMessage = async () => {
|
||||
let didForceNewMessage = false;
|
||||
if (answerLane.hasStreamedMessage) {
|
||||
// Materialize the current streamed draft into a permanent message
|
||||
// so it remains visible across tool boundaries.
|
||||
const materializedId = await answerLane.stream?.materialize?.();
|
||||
const previewMessageId = materializedId ?? answerLane.stream?.messageId();
|
||||
if (
|
||||
typeof previewMessageId === "number" &&
|
||||
activePreviewLifecycleByLane.answer === "transient"
|
||||
) {
|
||||
archivedAnswerPreviews.push({
|
||||
messageId: previewMessageId,
|
||||
textSnapshot: answerLane.lastPartialText,
|
||||
deleteIfUnused: false,
|
||||
});
|
||||
}
|
||||
answerLane.stream?.forceNewMessage();
|
||||
didForceNewMessage = true;
|
||||
}
|
||||
resetDraftLaneState(answerLane);
|
||||
if (didForceNewMessage) {
|
||||
// New assistant message boundary: this lane now tracks a fresh preview lifecycle.
|
||||
activePreviewLifecycleByLane.answer = "transient";
|
||||
retainPreviewOnCleanupByLane.answer = false;
|
||||
}
|
||||
return didForceNewMessage;
|
||||
};
|
||||
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
|
||||
const laneStream = lane.stream;
|
||||
if (!laneStream || !text) {
|
||||
return;
|
||||
}
|
||||
if (text === lane.lastPartialText) {
|
||||
return;
|
||||
}
|
||||
// Mark that we've received streaming content (for forceNewMessage decision).
|
||||
lane.hasStreamedMessage = true;
|
||||
// Some providers briefly emit a shorter prefix snapshot (for example
|
||||
// "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid
|
||||
// visible punctuation flicker.
|
||||
if (
|
||||
lane.lastPartialText &&
|
||||
lane.lastPartialText.startsWith(text) &&
|
||||
text.length < lane.lastPartialText.length
|
||||
) {
|
||||
return;
|
||||
}
|
||||
lane.lastPartialText = text;
|
||||
laneStream.update(text);
|
||||
};
|
||||
const ingestDraftLaneSegments = async (text: string | undefined) => {
|
||||
const split = splitTextIntoLaneSegments(text);
|
||||
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
|
||||
if (hasAnswerSegment && activePreviewLifecycleByLane.answer !== "transient") {
|
||||
// Some providers can emit the first partial of a new assistant message before
|
||||
// onAssistantMessageStart() arrives. Rotate preemptively so we do not edit
|
||||
// the previously finalized preview message with the next message's text.
|
||||
skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage();
|
||||
}
|
||||
for (const segment of split.segments) {
|
||||
if (segment.lane === "reasoning") {
|
||||
reasoningStepState.noteReasoningHint();
|
||||
reasoningStepState.noteReasoningDelivered();
|
||||
}
|
||||
updateDraftFromPartial(lanes[segment.lane], segment.text);
|
||||
}
|
||||
};
|
||||
const flushDraftLane = async (lane: DraftLaneState) => {
|
||||
if (!lane.stream) {
|
||||
return;
|
||||
}
|
||||
await lane.stream.flush();
|
||||
};
|
||||
|
||||
const disableBlockStreaming = !previewStreamingEnabled
|
||||
? true
|
||||
: forceBlockStreamingForReasoning
|
||||
? false
|
||||
: typeof telegramCfg.blockStreaming === "boolean"
|
||||
? !telegramCfg.blockStreaming
|
||||
: canStreamAnswerDraft
|
||||
? true
|
||||
: undefined;
|
||||
|
||||
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
|
||||
cfg,
|
||||
agentId: route.agentId,
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
|
||||
|
||||
// Handle uncached stickers: get a dedicated vision description before dispatch
|
||||
// This ensures we cache a raw description rather than a conversational response
|
||||
const sticker = ctxPayload.Sticker;
|
||||
if (sticker?.fileId && sticker.fileUniqueId && ctxPayload.MediaPath) {
|
||||
const agentDir = resolveAgentDir(cfg, route.agentId);
|
||||
const stickerSupportsVision = await resolveStickerVisionSupport(cfg, route.agentId);
|
||||
let description = sticker.cachedDescription ?? null;
|
||||
if (!description) {
|
||||
description = await describeStickerImage({
|
||||
imagePath: ctxPayload.MediaPath,
|
||||
cfg,
|
||||
agentDir,
|
||||
agentId: route.agentId,
|
||||
});
|
||||
}
|
||||
if (description) {
|
||||
// Format the description with sticker context
|
||||
const stickerContext = [sticker.emoji, sticker.setName ? `from "${sticker.setName}"` : null]
|
||||
.filter(Boolean)
|
||||
.join(" ");
|
||||
const formattedDesc = `[Sticker${stickerContext ? ` ${stickerContext}` : ""}] ${description}`;
|
||||
|
||||
sticker.cachedDescription = description;
|
||||
if (!stickerSupportsVision) {
|
||||
// Update context to use description instead of image
|
||||
ctxPayload.Body = formattedDesc;
|
||||
ctxPayload.BodyForAgent = formattedDesc;
|
||||
// Drop only the sticker attachment; keep replied media context if present.
|
||||
pruneStickerMediaFromContext(ctxPayload, {
|
||||
stickerMediaIncluded: ctxPayload.StickerMediaIncluded,
|
||||
});
|
||||
}
|
||||
|
||||
// Cache the description for future encounters
|
||||
if (sticker.fileId) {
|
||||
cacheSticker({
|
||||
fileId: sticker.fileId,
|
||||
fileUniqueId: sticker.fileUniqueId,
|
||||
emoji: sticker.emoji,
|
||||
setName: sticker.setName,
|
||||
description,
|
||||
cachedAt: new Date().toISOString(),
|
||||
receivedFrom: ctxPayload.From,
|
||||
});
|
||||
logVerbose(`telegram: cached sticker description for ${sticker.fileUniqueId}`);
|
||||
} else {
|
||||
logVerbose(`telegram: skipped sticker cache (missing fileId)`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const replyQuoteText =
|
||||
ctxPayload.ReplyToIsQuote && ctxPayload.ReplyToBody
|
||||
? ctxPayload.ReplyToBody.trim() || undefined
|
||||
: undefined;
|
||||
const deliveryState = createLaneDeliveryStateTracker();
|
||||
const clearGroupHistory = () => {
|
||||
if (isGroup && historyKey) {
|
||||
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
|
||||
}
|
||||
};
|
||||
const deliveryBaseOptions = {
|
||||
chatId: String(chatId),
|
||||
accountId: route.accountId,
|
||||
sessionKeyForInternalHooks: ctxPayload.SessionKey,
|
||||
mirrorIsGroup: isGroup,
|
||||
mirrorGroupId: isGroup ? String(chatId) : undefined,
|
||||
token: opts.token,
|
||||
runtime,
|
||||
bot,
|
||||
mediaLocalRoots,
|
||||
replyToMode,
|
||||
textLimit,
|
||||
thread: threadSpec,
|
||||
tableMode,
|
||||
chunkMode,
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
replyQuoteText,
|
||||
};
|
||||
const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => {
|
||||
if (payload.text === text) {
|
||||
return payload;
|
||||
}
|
||||
return { ...payload, text };
|
||||
};
|
||||
const sendPayload = async (payload: ReplyPayload) => {
|
||||
const result = await deliverReplies({
|
||||
...deliveryBaseOptions,
|
||||
replies: [payload],
|
||||
onVoiceRecording: sendRecordVoice,
|
||||
});
|
||||
if (result.delivered) {
|
||||
deliveryState.markDelivered();
|
||||
}
|
||||
return result.delivered;
|
||||
};
|
||||
const deliverLaneText = createLaneTextDeliverer({
|
||||
lanes,
|
||||
archivedAnswerPreviews,
|
||||
activePreviewLifecycleByLane,
|
||||
retainPreviewOnCleanupByLane,
|
||||
draftMaxChars,
|
||||
applyTextToPayload,
|
||||
sendPayload,
|
||||
flushDraftLane,
|
||||
stopDraftLane: async (lane) => {
|
||||
await lane.stream?.stop();
|
||||
},
|
||||
editPreview: async ({ messageId, text, previewButtons }) => {
|
||||
await editMessageTelegram(chatId, messageId, text, {
|
||||
api: bot.api,
|
||||
cfg,
|
||||
accountId: route.accountId,
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
buttons: previewButtons,
|
||||
});
|
||||
},
|
||||
deletePreviewMessage: async (messageId) => {
|
||||
await bot.api.deleteMessage(chatId, messageId);
|
||||
},
|
||||
log: logVerbose,
|
||||
markDelivered: () => {
|
||||
deliveryState.markDelivered();
|
||||
},
|
||||
});
|
||||
|
||||
let queuedFinal = false;
|
||||
|
||||
if (statusReactionController) {
|
||||
void statusReactionController.setThinking();
|
||||
}
|
||||
|
||||
const typingCallbacks = createTypingCallbacks({
|
||||
start: sendTyping,
|
||||
onStartError: (err) => {
|
||||
logTypingFailure({
|
||||
log: logVerbose,
|
||||
channel: "telegram",
|
||||
target: String(chatId),
|
||||
error: err,
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
let dispatchError: unknown;
|
||||
try {
|
||||
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: ctxPayload,
|
||||
cfg,
|
||||
dispatcherOptions: {
|
||||
...prefixOptions,
|
||||
typingCallbacks,
|
||||
deliver: async (payload, info) => {
|
||||
if (info.kind === "final") {
|
||||
// Assistant callbacks are fire-and-forget; ensure queued boundary
|
||||
// rotations/partials are applied before final delivery mapping.
|
||||
await enqueueDraftLaneEvent(async () => {});
|
||||
}
|
||||
if (
|
||||
shouldSuppressLocalTelegramExecApprovalPrompt({
|
||||
cfg,
|
||||
accountId: route.accountId,
|
||||
payload,
|
||||
})
|
||||
) {
|
||||
queuedFinal = true;
|
||||
return;
|
||||
}
|
||||
const previewButtons = (
|
||||
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
|
||||
)?.buttons;
|
||||
const split = splitTextIntoLaneSegments(payload.text);
|
||||
const segments = split.segments;
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
|
||||
const flushBufferedFinalAnswer = async () => {
|
||||
const buffered = reasoningStepState.takeBufferedFinalAnswer();
|
||||
if (!buffered) {
|
||||
return;
|
||||
}
|
||||
const bufferedButtons = (
|
||||
buffered.payload.channelData?.telegram as
|
||||
| { buttons?: TelegramInlineButtons }
|
||||
| undefined
|
||||
)?.buttons;
|
||||
await deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: buffered.text,
|
||||
payload: buffered.payload,
|
||||
infoKind: "final",
|
||||
previewButtons: bufferedButtons,
|
||||
});
|
||||
reasoningStepState.resetForNextStep();
|
||||
};
|
||||
|
||||
for (const segment of segments) {
|
||||
if (
|
||||
segment.lane === "answer" &&
|
||||
info.kind === "final" &&
|
||||
reasoningStepState.shouldBufferFinalAnswer()
|
||||
) {
|
||||
reasoningStepState.bufferFinalAnswer({
|
||||
payload,
|
||||
text: segment.text,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
if (segment.lane === "reasoning") {
|
||||
reasoningStepState.noteReasoningHint();
|
||||
}
|
||||
const result = await deliverLaneText({
|
||||
laneName: segment.lane,
|
||||
text: segment.text,
|
||||
payload,
|
||||
infoKind: info.kind,
|
||||
previewButtons,
|
||||
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
|
||||
});
|
||||
if (segment.lane === "reasoning") {
|
||||
if (result !== "skipped") {
|
||||
reasoningStepState.noteReasoningDelivered();
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (info.kind === "final") {
|
||||
if (reasoningLane.hasStreamedMessage) {
|
||||
activePreviewLifecycleByLane.reasoning = "complete";
|
||||
retainPreviewOnCleanupByLane.reasoning = true;
|
||||
}
|
||||
reasoningStepState.resetForNextStep();
|
||||
}
|
||||
}
|
||||
if (segments.length > 0) {
|
||||
return;
|
||||
}
|
||||
if (split.suppressedReasoningOnly) {
|
||||
if (hasMedia) {
|
||||
const payloadWithoutSuppressedReasoning =
|
||||
typeof payload.text === "string" ? { ...payload, text: "" } : payload;
|
||||
await sendPayload(payloadWithoutSuppressedReasoning);
|
||||
}
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (info.kind === "final") {
|
||||
await answerLane.stream?.stop();
|
||||
await reasoningLane.stream?.stop();
|
||||
reasoningStepState.resetForNextStep();
|
||||
}
|
||||
const canSendAsIs =
|
||||
hasMedia || (typeof payload.text === "string" && payload.text.length > 0);
|
||||
if (!canSendAsIs) {
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
return;
|
||||
}
|
||||
await sendPayload(payload);
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
},
|
||||
onSkip: (_payload, info) => {
|
||||
if (info.reason !== "silent") {
|
||||
deliveryState.markNonSilentSkip();
|
||||
}
|
||||
},
|
||||
onError: (err, info) => {
|
||||
deliveryState.markNonSilentFailure();
|
||||
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
|
||||
},
|
||||
},
|
||||
replyOptions: {
|
||||
skillFilter,
|
||||
disableBlockStreaming,
|
||||
onPartialReply:
|
||||
answerLane.stream || reasoningLane.stream
|
||||
? (payload) =>
|
||||
enqueueDraftLaneEvent(async () => {
|
||||
await ingestDraftLaneSegments(payload.text);
|
||||
})
|
||||
: undefined,
|
||||
onReasoningStream: reasoningLane.stream
|
||||
? (payload) =>
|
||||
enqueueDraftLaneEvent(async () => {
|
||||
// Split between reasoning blocks only when the next reasoning
|
||||
// stream starts. Splitting at reasoning-end can orphan the active
|
||||
// preview and cause duplicate reasoning sends on reasoning final.
|
||||
if (splitReasoningOnNextStream) {
|
||||
reasoningLane.stream?.forceNewMessage();
|
||||
resetDraftLaneState(reasoningLane);
|
||||
splitReasoningOnNextStream = false;
|
||||
}
|
||||
await ingestDraftLaneSegments(payload.text);
|
||||
})
|
||||
: undefined,
|
||||
onAssistantMessageStart: answerLane.stream
|
||||
? () =>
|
||||
enqueueDraftLaneEvent(async () => {
|
||||
reasoningStepState.resetForNextStep();
|
||||
if (skipNextAnswerMessageStartRotation) {
|
||||
skipNextAnswerMessageStartRotation = false;
|
||||
activePreviewLifecycleByLane.answer = "transient";
|
||||
retainPreviewOnCleanupByLane.answer = false;
|
||||
return;
|
||||
}
|
||||
await rotateAnswerLaneForNewAssistantMessage();
|
||||
// Message-start is an explicit assistant-message boundary.
|
||||
// Even when no forceNewMessage happened (e.g. prior answer had no
|
||||
// streamed partials), the next partial belongs to a fresh lifecycle
|
||||
// and must not trigger late pre-rotation mid-message.
|
||||
activePreviewLifecycleByLane.answer = "transient";
|
||||
retainPreviewOnCleanupByLane.answer = false;
|
||||
})
|
||||
: undefined,
|
||||
onReasoningEnd: reasoningLane.stream
|
||||
? () =>
|
||||
enqueueDraftLaneEvent(async () => {
|
||||
// Split when/if a later reasoning block begins.
|
||||
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
|
||||
})
|
||||
: undefined,
|
||||
onToolStart: statusReactionController
|
||||
? async (payload) => {
|
||||
await statusReactionController.setTool(payload.name);
|
||||
}
|
||||
: undefined,
|
||||
onCompactionStart: statusReactionController
|
||||
? () => statusReactionController.setCompacting()
|
||||
: undefined,
|
||||
onCompactionEnd: statusReactionController
|
||||
? async () => {
|
||||
statusReactionController.cancelPending();
|
||||
await statusReactionController.setThinking();
|
||||
}
|
||||
: undefined,
|
||||
onModelSelected,
|
||||
},
|
||||
}));
|
||||
} catch (err) {
|
||||
dispatchError = err;
|
||||
runtime.error?.(danger(`telegram dispatch failed: ${String(err)}`));
|
||||
} finally {
|
||||
// Upstream assistant callbacks are fire-and-forget; drain queued lane work
|
||||
// before stream cleanup so boundary rotations/materialization complete first.
|
||||
await draftLaneEventQueue;
|
||||
// Must stop() first to flush debounced content before clear() wipes state.
|
||||
const streamCleanupStates = new Map<
|
||||
NonNullable<DraftLaneState["stream"]>,
|
||||
{ shouldClear: boolean }
|
||||
>();
|
||||
const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [
|
||||
{ laneName: "answer", lane: answerLane },
|
||||
{ laneName: "reasoning", lane: reasoningLane },
|
||||
];
|
||||
for (const laneState of lanesToCleanup) {
|
||||
const stream = laneState.lane.stream;
|
||||
if (!stream) {
|
||||
continue;
|
||||
}
|
||||
// Don't clear (delete) the stream if: (a) it was finalized, or
|
||||
// (b) the active stream message is itself a boundary-finalized archive.
|
||||
const activePreviewMessageId = stream.messageId();
|
||||
const hasBoundaryFinalizedActivePreview =
|
||||
laneState.laneName === "answer" &&
|
||||
typeof activePreviewMessageId === "number" &&
|
||||
archivedAnswerPreviews.some(
|
||||
(p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId,
|
||||
);
|
||||
const shouldClear =
|
||||
!retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
|
||||
const existing = streamCleanupStates.get(stream);
|
||||
if (!existing) {
|
||||
streamCleanupStates.set(stream, { shouldClear });
|
||||
continue;
|
||||
}
|
||||
existing.shouldClear = existing.shouldClear && shouldClear;
|
||||
}
|
||||
for (const [stream, cleanupState] of streamCleanupStates) {
|
||||
await stream.stop();
|
||||
if (cleanupState.shouldClear) {
|
||||
await stream.clear();
|
||||
}
|
||||
}
|
||||
for (const archivedPreview of archivedAnswerPreviews) {
|
||||
if (archivedPreview.deleteIfUnused === false) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
for (const messageId of archivedReasoningPreviewIds) {
|
||||
try {
|
||||
await bot.api.deleteMessage(chatId, messageId);
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`telegram: archived reasoning preview cleanup failed (${messageId}): ${String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
let sentFallback = false;
|
||||
const deliverySummary = deliveryState.snapshot();
|
||||
if (
|
||||
dispatchError ||
|
||||
(!deliverySummary.delivered &&
|
||||
(deliverySummary.skippedNonSilent > 0 || deliverySummary.failedNonSilent > 0))
|
||||
) {
|
||||
const fallbackText = dispatchError
|
||||
? "Something went wrong while processing your request. Please try again."
|
||||
: EMPTY_RESPONSE_FALLBACK;
|
||||
const result = await deliverReplies({
|
||||
replies: [{ text: fallbackText }],
|
||||
...deliveryBaseOptions,
|
||||
});
|
||||
sentFallback = result.delivered;
|
||||
}
|
||||
|
||||
const hasFinalResponse = queuedFinal || sentFallback;
|
||||
|
||||
if (statusReactionController && !hasFinalResponse) {
|
||||
void statusReactionController.setError().catch((err) => {
|
||||
logVerbose(`telegram: status reaction error finalize failed: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
|
||||
if (!hasFinalResponse) {
|
||||
clearGroupHistory();
|
||||
return;
|
||||
}
|
||||
|
||||
if (statusReactionController) {
|
||||
void statusReactionController.setDone().catch((err) => {
|
||||
logVerbose(`telegram: status reaction finalize failed: ${String(err)}`);
|
||||
});
|
||||
} else {
|
||||
removeAckReactionAfterReply({
|
||||
removeAfterReply: removeAckAfterReply,
|
||||
ackReactionPromise,
|
||||
ackReactionValue: ackReactionPromise ? "ack" : null,
|
||||
remove: () => reactionApi?.(chatId, msg.message_id ?? 0, []) ?? Promise.resolve(),
|
||||
onError: (err) => {
|
||||
if (!msg.message_id) {
|
||||
return;
|
||||
}
|
||||
logAckFailure({
|
||||
log: logVerbose,
|
||||
channel: "telegram",
|
||||
target: `${chatId}/${msg.message_id}`,
|
||||
error: err,
|
||||
});
|
||||
},
|
||||
});
|
||||
}
|
||||
clearGroupHistory();
|
||||
};
|
||||
export * from "../../extensions/telegram/src/bot-message-dispatch.js";
|
||||
|
||||
Reference in New Issue
Block a user