diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 24be0903df5..13a0b2384b7 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -409,8 +409,6 @@ export const dispatchTelegramMessage = async ({ ? (replyQuoteMessageId ?? msg.message_id) : undefined; const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS; - // DM draft previews still duplicate briefly at materialize time. - const useMessagePreviewTransportForDm = threadSpec?.scope === "dm" && canStreamAnswerDraft; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); const archivedAnswerPreviews: ArchivedPreview[] = []; const archivedReasoningPreviewIds: number[] = []; @@ -421,7 +419,6 @@ export const dispatchTelegramMessage = async ({ chatId, maxChars: draftMaxChars, thread: threadSpec, - previewTransport: useMessagePreviewTransportForDm ? "message" : "auto", replyToMessageId: draftReplyToMessageId, minInitialChars: draftMinInitialChars, renderText: renderDraftPreview, diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index 19511a224a3..f9ab22c88d7 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -10,21 +10,7 @@ import { normalizeTelegramReplyToMessageId } from "./outbound-params.js"; const TELEGRAM_STREAM_MAX_CHARS = 4096; const DEFAULT_THROTTLE_MS = 1000; -const TELEGRAM_DRAFT_ID_MAX = 2_147_483_647; const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i; -const DRAFT_METHOD_UNAVAILABLE_RE = - /(unknown method|method .*not (found|available|supported)|unsupported)/i; -const DRAFT_CHAT_UNSUPPORTED_RE = /(can't be used|can be used only)/i; - -type TelegramSendMessageDraft = ( - chatId: Parameters[0], - draftId: number, - text: string, - params?: { - message_thread_id?: number; - parse_mode?: "HTML"; - }, -) => Promise; type TelegramSendMessageParams = Parameters[2]; @@ -38,71 +24,18 @@ function hasNumericMessageThreadId( ); } -/** - * Keep draft-id allocation shared across bundled chunks so concurrent preview - * lanes do not accidentally reuse draft ids when code-split entries coexist. - */ -const TELEGRAM_DRAFT_STREAM_STATE_KEY = Symbol.for("openclaw.telegramDraftStreamState"); -let draftStreamState: { nextDraftId: number } | undefined; - -function getDraftStreamState(): { nextDraftId: number } { - if (!draftStreamState) { - const globalStore = globalThis as Record; - draftStreamState = (globalStore[TELEGRAM_DRAFT_STREAM_STATE_KEY] as - | { nextDraftId: number } - | undefined) ?? { - nextDraftId: 0, - }; - globalStore[TELEGRAM_DRAFT_STREAM_STATE_KEY] = draftStreamState; - } - return draftStreamState; -} - -function allocateTelegramDraftId(): number { - const state = getDraftStreamState(); - state.nextDraftId = state.nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : state.nextDraftId + 1; - return state.nextDraftId; -} - -function resolveSendMessageDraftApi(api: Bot["api"]): TelegramSendMessageDraft | undefined { - const sendMessageDraft = (api as Bot["api"] & { sendMessageDraft?: TelegramSendMessageDraft }) - .sendMessageDraft; - if (typeof sendMessageDraft !== "function") { - return undefined; - } - return sendMessageDraft.bind(api as object); -} - -function shouldFallbackFromDraftTransport(err: unknown): boolean { - const text = - typeof err === "string" - ? err - : err instanceof Error - ? err.message - : typeof err === "object" && err && "description" in err - ? typeof err.description === "string" - ? err.description - : "" - : ""; - if (!/sendMessageDraft/i.test(text)) { - return false; - } - return DRAFT_METHOD_UNAVAILABLE_RE.test(text) || DRAFT_CHAT_UNSUPPORTED_RE.test(text); -} - export type TelegramDraftStream = { update: (text: string) => void; flush: () => Promise; messageId: () => number | undefined; visibleSinceMs?: () => number | undefined; - previewMode?: () => "message" | "draft"; previewRevision?: () => number; lastDeliveredText?: () => string; clear: () => Promise; stop: () => Promise; /** Stop without a final flush or delete. */ discard?: () => Promise; - /** Convert the current draft preview into a permanent message (sendMessage). */ + /** Return the current preview message id after pending updates settle. */ materialize?: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ forceNewMessage: () => void; @@ -127,7 +60,6 @@ export function createTelegramDraftStream(params: { chatId: Parameters[0]; maxChars?: number; thread?: TelegramThreadSpec | null; - previewTransport?: "auto" | "message" | "draft"; replyToMessageId?: number; throttleMs?: number; /** Minimum chars before sending first message (debounce for push notifications) */ @@ -146,13 +78,6 @@ export function createTelegramDraftStream(params: { const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS); const minInitialChars = params.minInitialChars; const chatId = params.chatId; - const requestedPreviewTransport = params.previewTransport ?? "auto"; - const prefersDraftTransport = - requestedPreviewTransport === "draft" - ? true - : requestedPreviewTransport === "message" - ? false - : params.thread?.scope === "dm"; const threadParams = buildTelegramThreadParams(params.thread); const replyToMessageId = normalizeTelegramReplyToMessageId(params.replyToMessageId); const replyParams = @@ -163,22 +88,11 @@ export function createTelegramDraftStream(params: { allow_sending_without_reply: true, } : threadParams; - const resolvedDraftApi = prefersDraftTransport - ? resolveSendMessageDraftApi(params.api) - : undefined; - const usesDraftTransport = Boolean(prefersDraftTransport && resolvedDraftApi); - if (prefersDraftTransport && !usesDraftTransport) { - params.warn?.( - "telegram stream preview: sendMessageDraft unavailable; falling back to sendMessage/editMessageText", - ); - } const streamState = { stopped: false, final: false }; let messageSendAttempted = false; let streamMessageId: number | undefined; let streamVisibleSinceMs: number | undefined; - let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined; - let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message"; let lastSentText = ""; let lastDeliveredText = ""; let lastSentParseMode: "HTML" | undefined; @@ -275,26 +189,6 @@ export function createTelegramDraftStream(params: { streamVisibleSinceMs = visibleSinceMs; return true; }; - const sendDraftTransportPreview = async ({ - renderedText, - renderedParseMode, - }: PreviewSendParams): Promise => { - const draftId = streamDraftId ?? allocateTelegramDraftId(); - streamDraftId = draftId; - const draftParams = { - ...(threadParams?.message_thread_id != null - ? { message_thread_id: threadParams.message_thread_id } - : {}), - ...(renderedParseMode ? { parse_mode: renderedParseMode } : {}), - }; - await resolvedDraftApi!( - chatId, - draftId, - renderedText, - Object.keys(draftParams).length > 0 ? draftParams : undefined, - ); - return true; - }; const sendOrEditStreamMessage = async (text: string): Promise => { if (streamState.stopped && !streamState.final) { @@ -331,36 +225,11 @@ export function createTelegramDraftStream(params: { lastSentText = renderedText; lastSentParseMode = renderedParseMode; try { - let sent = false; - if (previewTransport === "draft") { - try { - sent = await sendDraftTransportPreview({ - renderedText, - renderedParseMode, - sendGeneration, - }); - } catch (err) { - if (!shouldFallbackFromDraftTransport(err)) { - throw err; - } - previewTransport = "message"; - streamDraftId = undefined; - params.warn?.( - "telegram stream preview: sendMessageDraft rejected by API; falling back to sendMessage/editMessageText", - ); - sent = await sendMessageTransportPreview({ - renderedText, - renderedParseMode, - sendGeneration, - }); - } - } else { - sent = await sendMessageTransportPreview({ - renderedText, - renderedParseMode, - sendGeneration, - }); - } + const sent = await sendMessageTransportPreview({ + renderedText, + renderedParseMode, + sendGeneration, + }); if (sent) { previewRevision += 1; lastDeliveredText = trimmed; @@ -396,16 +265,6 @@ export function createTelegramDraftStream(params: { } return; } - if (previewTransport !== "draft" || resolvedDraftApi == null || streamDraftId == null) { - return; - } - const clearDraftId = streamDraftId; - streamDraftId = undefined; - try { - await resolvedDraftApi(chatId, clearDraftId, "", threadParams); - } catch (err) { - params.warn?.(`telegram stream preview cleanup failed: ${formatErrorMessage(err)}`); - } }; const discard = async () => { @@ -419,9 +278,6 @@ export function createTelegramDraftStream(params: { messageSendAttempted = false; streamMessageId = undefined; streamVisibleSinceMs = undefined; - if (previewTransport === "draft") { - streamDraftId = allocateTelegramDraftId(); - } lastSentText = ""; lastSentParseMode = undefined; loop.resetPending(); @@ -430,41 +286,7 @@ export function createTelegramDraftStream(params: { const materialize = async (): Promise => { await stop(); - if (previewTransport === "message" && typeof streamMessageId === "number") { - return streamMessageId; - } - const renderedText = lastSentText || lastDeliveredText; - if (!renderedText) { - return undefined; - } - const renderedParseMode = lastSentText ? lastSentParseMode : undefined; - try { - const { sent, usedThreadParams } = await sendRenderedMessageWithThreadFallback({ - renderedText, - renderedParseMode, - fallbackWarnMessage: - "telegram stream preview materialize send failed with message_thread_id, retrying without thread", - }); - const sentId = sent?.message_id; - if (typeof sentId === "number" && Number.isFinite(sentId)) { - streamMessageId = Math.trunc(sentId); - streamVisibleSinceMs = Date.now(); - if (resolvedDraftApi != null && streamDraftId != null) { - const clearDraftId = streamDraftId; - const clearThreadParams = - usedThreadParams && threadParams?.message_thread_id != null - ? { message_thread_id: threadParams.message_thread_id } - : undefined; - try { - await resolvedDraftApi(chatId, clearDraftId, "", clearThreadParams); - } catch {} - } - return streamMessageId; - } - } catch (err) { - params.warn?.(`telegram stream preview materialize failed: ${formatErrorMessage(err)}`); - } - return undefined; + return streamMessageId; }; params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`); @@ -474,7 +296,6 @@ export function createTelegramDraftStream(params: { flush: loop.flush, messageId: () => streamMessageId, visibleSinceMs: () => streamVisibleSinceMs, - previewMode: () => previewTransport, previewRevision: () => previewRevision, lastDeliveredText: () => lastDeliveredText, clear, @@ -485,9 +306,3 @@ export function createTelegramDraftStream(params: { sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number", }; } - -export const __testing = { - resetTelegramDraftStreamForTests() { - getDraftStreamState().nextDraftId = 0; - }, -}; diff --git a/extensions/telegram/src/lane-delivery-text-deliverer.ts b/extensions/telegram/src/lane-delivery-text-deliverer.ts index 8fb0a42f411..0930e738cfd 100644 --- a/extensions/telegram/src/lane-delivery-text-deliverer.ts +++ b/extensions/telegram/src/lane-delivery-text-deliverer.ts @@ -203,8 +203,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { params.activePreviewLifecycleByLane[laneName] = "complete"; params.retainPreviewOnCleanupByLane[laneName] = true; }; - const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft"; - const isMessagePreviewLane = (lane: DraftLaneState) => !isDraftPreviewLane(lane); + const isMessagePreviewLane = (lane: DraftLaneState) => lane.stream != null; const shouldUseFreshFinalForLane = (lane: DraftLaneState) => isMessagePreviewLane(lane) && isLongLivedPreview(lane.stream?.visibleSinceMs?.(), readNow()); const shouldUseFreshFinalForPreview = (lane: DraftLaneState, visibleSinceMs?: number) => @@ -219,43 +218,6 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { lane.hasStreamedMessage = false; lane.stream?.forceNewMessage(); }; - const canMaterializeDraftFinal = ( - lane: DraftLaneState, - previewButtons?: TelegramInlineButtons, - ) => { - const hasPreviewButtons = Boolean(previewButtons && previewButtons.length > 0); - return ( - lane.hasStreamedMessage && - isDraftPreviewLane(lane) && - !hasPreviewButtons && - typeof lane.stream?.materialize === "function" - ); - }; - - const tryMaterializeDraftPreviewForFinal = async (args: { - lane: DraftLaneState; - laneName: LaneName; - text: string; - }): Promise => { - const stream = args.lane.stream; - if (!stream || !isDraftPreviewLane(args.lane)) { - return undefined; - } - // Draft previews have no message_id to edit; materialize the final text - // into a real message and treat that as the finalized delivery. - stream.update(args.text); - const materializedMessageId = await stream.materialize?.(); - if (typeof materializedMessageId !== "number") { - params.log( - `telegram: ${args.laneName} draft preview materialize produced no message id; falling back to standard send`, - ); - return undefined; - } - args.lane.lastPartialText = args.text; - params.markDelivered(); - return materializedMessageId; - }; - const tryEditPreviewMessage = async (args: { laneName: LaneName; messageId: number; @@ -578,20 +540,6 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { return archivedResultAfterFlush; } } - if (canMaterializeDraftFinal(lane, previewButtons)) { - const materializedMessageId = await tryMaterializeDraftPreviewForFinal({ - lane, - laneName, - text, - }); - if (typeof materializedMessageId === "number") { - markActivePreviewComplete(laneName); - return result("preview-finalized", { - content: text, - messageId: materializedMessageId, - }); - } - } if (shouldUseFreshFinalForLane(lane)) { await params.stopDraftLane(lane); const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); @@ -639,24 +587,6 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } if (allowPreviewUpdateForNonFinal && canEditViaPreview) { - if (isDraftPreviewLane(lane)) { - // DM draft flow has no message_id to edit; updates are sent via sendMessageDraft. - // Only mark as updated when the draft flush actually emits an update. - const previewRevisionBeforeFlush = lane.stream?.previewRevision?.() ?? 0; - lane.stream?.update(text); - await params.flushDraftLane(lane); - const previewUpdated = (lane.stream?.previewRevision?.() ?? 0) > previewRevisionBeforeFlush; - if (!previewUpdated) { - params.log( - `telegram: ${laneName} draft preview update not emitted; falling back to standard send`, - ); - const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); - return delivered ? result("sent") : result("skipped"); - } - lane.lastPartialText = text; - params.markDelivered(); - return result("preview-updated"); - } const updated = await tryUpdatePreviewForLane({ lane, laneName,