diff --git a/CHANGELOG.md b/CHANGELOG.md index 4bd305e417c..520979d2b25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,6 +118,7 @@ Docs: https://docs.openclaw.ai - Agents/generated media: treat attachment-style message tool actions as completed chat sends, preventing duplicate fallback media posts when generated files were already uploaded. - Control UI/sessions: show each session's agent runtime in the Sessions table and allow filtering by runtime labels, matching the Agents panel runtime wording. Thanks @vincentkoc. - Discord/streaming: show live reasoning text in progress drafts instead of a bare `Reasoning` status line. +- Telegram/streaming: finalize text replies by stopping the edited stream message instead of sending a second answer bubble, so Telegram turns cannot duplicate the streamed final response. - Gateway/status: avoid marking fast repeated health/status samples as event-loop degraded from CPU/utilization alone until the Gateway has accumulated a sustained sampling window. Thanks @shakkernerd. - Plugins/update: keep installed official npm and ClawHub plugins such as Codex, Discord, WhatsApp, and diagnostics plugins synced during host updates even when disabled or previously exact-pinned, while preserving third-party plugin pins. Thanks @vincentkoc. - Doctor/status: warn when `OPENCLAW_GATEWAY_TOKEN` would shadow a different active `gateway.auth.token` source for local CLI commands, while avoiding false positives when config points at the same env token. Fixes #74271. Thanks @yelog. diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index a80a10480cc..3d50a01888c 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -81,13 +81,11 @@ import { import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js"; import { markdownToTelegramChunks, renderTelegramHtmlText } from "./format.js"; import { - type ArchivedPreview, createLaneDeliveryStateTracker, createLaneTextDeliverer, type DraftLaneState, type LaneDeliveryResult, type LaneName, - type LanePreviewLifecycle, } from "./lane-delivery.js"; import { createTelegramReasoningStepState, @@ -354,7 +352,7 @@ export const dispatchTelegramMessage = async ({ channel: "telegram", accountId: route.accountId, }); - const renderDraftPreview = (text: string) => ({ + const renderStreamText = (text: string) => ({ text: renderTelegramHtmlText(text, { tableMode }), parseMode: "HTML" as const, }); @@ -369,7 +367,7 @@ export const dispatchTelegramMessage = async ({ }); const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on"; const streamReasoningDraft = resolvedReasoningLevel === "stream"; - const previewStreamingEnabled = streamMode !== "off"; + const streamDeliveryEnabled = streamMode !== "off"; const rawReplyQuoteText = ctxPayload.ReplyToIsQuote && typeof ctxPayload.ReplyToQuoteText === "string" ? ctxPayload.ReplyToQuoteText @@ -418,7 +416,7 @@ export const dispatchTelegramMessage = async ({ } const hasTelegramQuoteReply = replyToMode !== "off" && replyQuoteText != null; const canStreamAnswerDraft = - previewStreamingEnabled && + streamDeliveryEnabled && !hasTelegramQuoteReply && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning; @@ -430,8 +428,6 @@ export const dispatchTelegramMessage = async ({ const draftMinInitialChars = streamMode === "progress" ? 0 : DRAFT_MIN_INITIAL_CHARS; const progressSeed = `${route.accountId}:${chatId}:${threadSpec.id ?? ""}`; const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId); - const archivedAnswerPreviews: ArchivedPreview[] = []; - const archivedReasoningPreviewIds: number[] = []; const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => { const stream = enabled ? (telegramDeps.createTelegramDraftStream ?? createTelegramDraftStream)({ @@ -441,24 +437,14 @@ export const dispatchTelegramMessage = async ({ thread: threadSpec, replyToMessageId: draftReplyToMessageId, minInitialChars: draftMinInitialChars, - renderText: renderDraftPreview, - onSupersededPreview: - laneName === "answer" || laneName === "reasoning" - ? (preview) => { - if (laneName === "reasoning") { - if (!archivedReasoningPreviewIds.includes(preview.messageId)) { - archivedReasoningPreviewIds.push(preview.messageId); - } - return; - } - archivedAnswerPreviews.push({ - messageId: preview.messageId, - textSnapshot: preview.textSnapshot, - visibleSinceMs: preview.visibleSinceMs, - deleteIfUnused: true, - }); - } - : undefined, + renderText: renderStreamText, + onSupersededPreview: (superseded) => { + void bot.api.deleteMessage(chatId, superseded.messageId).catch((err: unknown) => { + logVerbose( + `telegram: superseded ${laneName} stream cleanup failed (${superseded.messageId}): ${String(err)}`, + ); + }); + }, log: logVerbose, warn: logVerbose, }) @@ -467,43 +453,36 @@ export const dispatchTelegramMessage = async ({ stream, lastPartialText: "", hasStreamedMessage: false, + finalized: false, }; }; const lanes: Record = { answer: createDraftLane("answer", canStreamAnswerDraft), reasoning: createDraftLane("reasoning", canStreamReasoningDraft), }; - const activePreviewLifecycleByLane: Record = { - answer: "transient", - reasoning: "transient", - }; - const retainPreviewOnCleanupByLane: Record = { - answer: false, - reasoning: false, - }; const answerLane = lanes.answer; const reasoningLane = lanes.reasoning; - const previewToolProgressEnabled = + const streamToolProgressEnabled = Boolean(answerLane.stream) && resolveChannelStreamingPreviewToolProgress(telegramCfg); - let previewToolProgressSuppressed = false; - let previewToolProgressLines: string[] = []; - let answerLaneHasAssistantContent = false; + let streamToolProgressSuppressed = false; + let streamToolProgressLines: string[] = []; const renderProgressDraft = async (options?: { flush?: boolean }) => { if (!answerLane.stream || streamMode !== "progress") { return; } - const previewText = formatChannelProgressDraftText({ + const streamText = formatChannelProgressDraftText({ entry: telegramCfg, - lines: previewToolProgressLines, + lines: streamToolProgressLines, seed: progressSeed, formatLine: formatProgressAsMarkdownCode, }); - if (!previewText || previewText === answerLane.lastPartialText) { + if (!streamText || streamText === answerLane.lastPartialText) { return; } - answerLane.lastPartialText = previewText; + answerLane.lastPartialText = streamText; answerLane.hasStreamedMessage = true; - answerLane.stream.update(previewText); + answerLane.finalized = false; + answerLane.stream.update(streamText); if (options?.flush) { await answerLane.stream.flush(); } @@ -511,7 +490,7 @@ export const dispatchTelegramMessage = async ({ const progressDraftGate = createChannelProgressDraftGate({ onStart: () => renderProgressDraft({ flush: true }), }); - const pushPreviewToolProgress = async (line?: string, options?: { toolName?: string }) => { + const pushStreamToolProgress = async (line?: string, options?: { toolName?: string }) => { if (!answerLane.stream) { return; } @@ -520,31 +499,32 @@ export const dispatchTelegramMessage = async ({ } const normalized = sanitizeProgressMarkdownText(line?.replace(/\s+/g, " ").trim() ?? ""); if (streamMode !== "progress") { - if (!previewToolProgressEnabled || previewToolProgressSuppressed || !normalized) { + if (!streamToolProgressEnabled || streamToolProgressSuppressed || !normalized) { return; } - const previous = previewToolProgressLines.at(-1); + const previous = streamToolProgressLines.at(-1); if (previous === normalized) { return; } - previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + streamToolProgressLines = [...streamToolProgressLines, normalized].slice( -resolveChannelProgressDraftMaxLines(telegramCfg), ); - const previewText = formatChannelProgressDraftText({ + const streamText = formatChannelProgressDraftText({ entry: telegramCfg, - lines: previewToolProgressLines, + lines: streamToolProgressLines, seed: progressSeed, formatLine: formatProgressAsMarkdownCode, }); - answerLane.lastPartialText = previewText; + answerLane.lastPartialText = streamText; answerLane.hasStreamedMessage = true; - answerLane.stream.update(previewText); + answerLane.finalized = false; + answerLane.stream.update(streamText); return; } - if (previewToolProgressEnabled && !previewToolProgressSuppressed && normalized) { - const previous = previewToolProgressLines.at(-1); + if (streamToolProgressEnabled && !streamToolProgressSuppressed && normalized) { + const previous = streamToolProgressLines.at(-1); if (previous !== normalized) { - previewToolProgressLines = [...previewToolProgressLines, normalized].slice( + streamToolProgressLines = [...streamToolProgressLines, normalized].slice( -resolveChannelProgressDraftMaxLines(telegramCfg), ); } @@ -556,9 +536,6 @@ export const dispatchTelegramMessage = async ({ } }; let splitReasoningOnNextStream = false; - let skipNextAnswerMessageStartRotation = false; - let pendingCompactionReplayBoundary = false; - let discardAnswerPreviewOnNextRotation = false; let draftLaneEventQueue = Promise.resolve(); const reasoningStepState = createTelegramReasoningStepState(); const enqueueDraftLaneEvent = (task: () => Promise): Promise => { @@ -597,35 +574,22 @@ export const dispatchTelegramMessage = async ({ const resetDraftLaneState = (lane: DraftLaneState) => { lane.lastPartialText = ""; lane.hasStreamedMessage = false; + lane.finalized = false; }; - const rotateAnswerLaneForNewAssistantMessage = async () => { - let didForceNewMessage = false; - if (answerLane.hasStreamedMessage) { - const materializedId = await answerLane.stream?.materialize?.(); - const previewMessageId = materializedId ?? answerLane.stream?.messageId(); - if ( - !discardAnswerPreviewOnNextRotation && - typeof previewMessageId === "number" && - activePreviewLifecycleByLane.answer === "transient" - ) { - archivedAnswerPreviews.push({ - messageId: previewMessageId, - textSnapshot: answerLane.lastPartialText, - visibleSinceMs: answerLane.stream?.visibleSinceMs?.(), - deleteIfUnused: !answerLaneHasAssistantContent, - }); - } - answerLane.stream?.forceNewMessage(); - didForceNewMessage = true; + const rotateLaneForNewMessage = async (lane: DraftLaneState) => { + if (!lane.hasStreamedMessage && typeof lane.stream?.messageId() !== "number") { + resetDraftLaneState(lane); + return; } - discardAnswerPreviewOnNextRotation = false; - resetDraftLaneState(answerLane); - answerLaneHasAssistantContent = false; - if (didForceNewMessage) { - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; + await lane.stream?.stop(); + lane.stream?.forceNewMessage(); + resetDraftLaneState(lane); + }; + const prepareAnswerLaneForText = async () => { + if (!answerLane.finalized) { + return; } - return didForceNewMessage; + await rotateLaneForNewMessage(answerLane); }; const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => { const laneStream = lane.stream; @@ -639,11 +603,11 @@ export const dispatchTelegramMessage = async ({ if (streamMode === "progress") { return; } - answerLaneHasAssistantContent = true; - previewToolProgressSuppressed = true; - previewToolProgressLines = []; + streamToolProgressSuppressed = true; + streamToolProgressLines = []; } lane.hasStreamedMessage = true; + lane.finalized = false; if ( lane.lastPartialText && lane.lastPartialText.startsWith(text) && @@ -656,11 +620,10 @@ export const dispatchTelegramMessage = async ({ }; const ingestDraftLaneSegments = async (text: string | undefined) => { const split = splitTextIntoLaneSegments(text); - const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer"); - if (hasAnswerSegment && activePreviewLifecycleByLane.answer !== "transient") { - skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage(); - } for (const segment of split.segments) { + if (segment.lane === "answer") { + await prepareAnswerLaneForText(); + } if (segment.lane === "reasoning") { reasoningStepState.noteReasoningHint(); reasoningStepState.noteReasoningDelivered(); @@ -676,7 +639,7 @@ export const dispatchTelegramMessage = async ({ }; const resolvedBlockStreamingEnabled = resolveChannelStreamingBlockEnabled(telegramCfg); - const disableBlockStreaming = !previewStreamingEnabled + const disableBlockStreaming = !streamDeliveryEnabled ? true : forceBlockStreamingForReasoning ? false @@ -801,7 +764,7 @@ export const dispatchTelegramMessage = async ({ } = next; return followUp; }; - const splitFinalTextForPreview = (text: string): string[] => { + const splitFinalTextForStream = (text: string): string[] => { const markdownChunks = chunkMode === "newline" ? chunkMarkdownTextWithMode(text, draftMaxChars, chunkMode) @@ -830,7 +793,6 @@ export const dispatchTelegramMessage = async ({ } return payload.replyToId != null && replyQuoteByMessageId[payload.replyToId] != null; }; - let lastVisibleNonPreviewDeliveryAtMs: number | undefined; const sendPayload = async ( payload: ReplyPayload, options?: { durable?: boolean; silent?: boolean }, @@ -875,7 +837,6 @@ export const dispatchTelegramMessage = async ({ } if (durable.status === "handled_visible") { deliveryState.markDelivered(); - lastVisibleNonPreviewDeliveryAtMs = Date.now(); return true; } if (durable.status === "handled_no_send") { @@ -891,7 +852,6 @@ export const dispatchTelegramMessage = async ({ }); if (result.delivered) { deliveryState.markDelivered(); - lastVisibleNonPreviewDeliveryAtMs = Date.now(); } return result.delivered; }; @@ -912,19 +872,19 @@ export const dispatchTelegramMessage = async ({ }; const deliverLaneText = createLaneTextDeliverer({ lanes, - archivedAnswerPreviews, - activePreviewLifecycleByLane, - retainPreviewOnCleanupByLane, draftMaxChars, applyTextToPayload, applyTextToFollowUpPayload, - splitFinalTextForPreview, + splitFinalTextForStream: splitFinalTextForStream, sendPayload, flushDraftLane, stopDraftLane: async (lane) => { await lane.stream?.stop(); }, - editPreview: async ({ messageId, text, previewButtons }) => { + clearDraftLane: async (lane) => { + await lane.stream?.clear(); + }, + editStreamMessage: async ({ messageId, text, buttons }) => { if (isDispatchSuperseded()) { return; } @@ -933,20 +893,13 @@ export const dispatchTelegramMessage = async ({ cfg, accountId: route.accountId, linkPreview: telegramCfg.linkPreview, - buttons: previewButtons, + buttons, }); }, - deletePreviewMessage: async (messageId) => { - if (isDispatchSuperseded()) { - return; - } - await bot.api.deleteMessage(chatId, messageId); - }, log: logVerbose, markDelivered: () => { deliveryState.markDelivered(); }, - getLastVisibleNonPreviewDeliveryAtMs: () => lastVisibleNonPreviewDeliveryAtMs, }); if (isDmTopic) { @@ -1026,14 +979,6 @@ export const dispatchTelegramMessage = async ({ if (isDispatchSuperseded()) { return; } - const markVisibleNonPreviewBoundary = (didDeliver: boolean) => { - if (didDeliver && info.kind !== "final") { - pendingCompactionReplayBoundary = false; - if (answerLane.hasStreamedMessage) { - discardAnswerPreviewOnNextRotation = true; - } - } - }; if (payload.isError === true) { hadErrorReplyFailureOrSkip = true; } @@ -1050,7 +995,7 @@ export const dispatchTelegramMessage = async ({ queuedFinal = true; return; } - const previewButtons = ( + const telegramButtons = ( payload.channelData?.telegram as | { buttons?: TelegramInlineButtons } | undefined @@ -1076,7 +1021,7 @@ export const dispatchTelegramMessage = async ({ text: buffered.text, payload: buffered.payload, infoKind: "final", - previewButtons: bufferedButtons, + buttons: bufferedButtons, }); reasoningStepState.resetForNextStep(); }; @@ -1102,13 +1047,10 @@ export const dispatchTelegramMessage = async ({ text: segment.text, payload, infoKind: info.kind, - previewButtons, - allowPreviewUpdateForNonFinal: segment.lane === "reasoning", + buttons: telegramButtons, }); if (info.kind === "final") { emitPreviewFinalizedHook(result); - } else if (segment.lane === "answer" && result.kind === "sent") { - markVisibleNonPreviewBoundary(true); } if (segment.lane === "reasoning") { if (result.kind !== "skipped") { @@ -1122,24 +1064,18 @@ export const dispatchTelegramMessage = async ({ } } if (segments.length > 0) { - if (info.kind === "final") { - pendingCompactionReplayBoundary = false; - } return; } if (split.suppressedReasoningOnly) { if (reply.hasMedia) { const payloadWithoutSuppressedReasoning = typeof payload.text === "string" ? { ...payload, text: "" } : payload; - markVisibleNonPreviewBoundary( - await sendPayload(payloadWithoutSuppressedReasoning, { - durable: info.kind === "final", - }), - ); + await sendPayload(payloadWithoutSuppressedReasoning, { + durable: info.kind === "final", + }); } if (info.kind === "final") { await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; } return; } @@ -1153,16 +1089,12 @@ export const dispatchTelegramMessage = async ({ if (!canSendAsIs) { if (info.kind === "final") { await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; } return; } - markVisibleNonPreviewBoundary( - await sendPayload(payload, { durable: info.kind === "final" }), - ); + await sendPayload(payload, { durable: info.kind === "final" }); if (info.kind === "final") { await flushBufferedFinalAnswer(); - pendingCompactionReplayBoundary = false; } }, onSkip: (payload, info) => { @@ -1225,46 +1157,29 @@ export const dispatchTelegramMessage = async ({ ? () => enqueueDraftLaneEvent(async () => { reasoningStepState.resetForNextStep(); - previewToolProgressSuppressed = false; - previewToolProgressLines = []; - if (skipNextAnswerMessageStartRotation) { - skipNextAnswerMessageStartRotation = false; - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - return; + streamToolProgressSuppressed = false; + streamToolProgressLines = []; + if (answerLane.finalized) { + await rotateLaneForNewMessage(answerLane); } - if (streamMode === "progress") { - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - return; - } - if (pendingCompactionReplayBoundary) { - pendingCompactionReplayBoundary = false; - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; - return; - } - await rotateAnswerLaneForNewAssistantMessage(); - activePreviewLifecycleByLane.answer = "transient"; - retainPreviewOnCleanupByLane.answer = false; }) : undefined, onReasoningEnd: reasoningLane.stream ? () => enqueueDraftLaneEvent(async () => { splitReasoningOnNextStream = reasoningLane.hasStreamedMessage; - previewToolProgressSuppressed = false; - previewToolProgressLines = []; + streamToolProgressSuppressed = false; + streamToolProgressLines = []; }) : undefined, suppressDefaultToolProgressMessages: - !previewStreamingEnabled || Boolean(answerLane.stream), + !streamDeliveryEnabled || Boolean(answerLane.stream), onToolStart: async (payload) => { const toolName = payload.name?.trim(); if (statusReactionController && toolName) { await statusReactionController.setTool(toolName); } - await pushPreviewToolProgress( + await pushStreamToolProgress( formatChannelProgressDraftLineForEntry( telegramCfg, { @@ -1279,7 +1194,7 @@ export const dispatchTelegramMessage = async ({ ); }, onItemEvent: async (payload) => { - await pushPreviewToolProgress( + await pushStreamToolProgress( formatChannelProgressDraftLineForEntry(telegramCfg, { event: "item", itemKind: payload.kind, @@ -1297,7 +1212,7 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "update") { return; } - await pushPreviewToolProgress( + await pushStreamToolProgress( formatChannelProgressDraftLine({ event: "plan", phase: payload.phase, @@ -1311,7 +1226,7 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "requested") { return; } - await pushPreviewToolProgress( + await pushStreamToolProgress( formatChannelProgressDraftLine({ event: "approval", phase: payload.phase, @@ -1326,7 +1241,7 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "end") { return; } - await pushPreviewToolProgress( + await pushStreamToolProgress( formatChannelProgressDraftLine({ event: "command-output", phase: payload.phase, @@ -1341,7 +1256,7 @@ export const dispatchTelegramMessage = async ({ if (payload.phase !== "end") { return; } - await pushPreviewToolProgress( + await pushStreamToolProgress( formatChannelProgressDraftLine({ event: "patch", phase: payload.phase, @@ -1354,20 +1269,11 @@ export const dispatchTelegramMessage = async ({ }), ); }, - onCompactionStart: - statusReactionController || answerLane.stream - ? async () => { - if ( - answerLane.hasStreamedMessage && - activePreviewLifecycleByLane.answer === "transient" - ) { - pendingCompactionReplayBoundary = true; - } - if (statusReactionController) { - await statusReactionController.setCompacting(); - } - } - : undefined, + onCompactionStart: statusReactionController + ? async () => { + await statusReactionController.setCompacting(); + } + : undefined, onCompactionEnd: statusReactionController ? async () => { statusReactionController.cancelPending(); @@ -1392,76 +1298,25 @@ export const dispatchTelegramMessage = async ({ } finally { await draftLaneEventQueue; progressDraftGate.cancel(); - if (isDispatchSuperseded()) { - if (answerLane.hasStreamedMessage || typeof answerLane.stream?.messageId() === "number") { - retainPreviewOnCleanupByLane.answer = true; - } - for (const archivedPreview of archivedAnswerPreviews) { - archivedPreview.deleteIfUnused = false; - } - } - const streamCleanupStates = new Map< - NonNullable, - { shouldClear: boolean } - >(); const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [ { laneName: "answer", lane: answerLane }, { laneName: "reasoning", lane: reasoningLane }, ]; - for (const laneState of lanesToCleanup) { - const stream = laneState.lane.stream; + for (const { lane } of lanesToCleanup) { + const stream = lane.stream; if (!stream) { continue; } - const activePreviewMessageId = stream.messageId(); - const hasBoundaryFinalizedActivePreview = - laneState.laneName === "answer" && - typeof activePreviewMessageId === "number" && - archivedAnswerPreviews.some( - (p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId, - ); - const shouldClear = - !retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview; - const existing = streamCleanupStates.get(stream); - if (!existing) { - streamCleanupStates.set(stream, { shouldClear }); - continue; - } - existing.shouldClear = existing.shouldClear && shouldClear; - } - for (const [stream, cleanupState] of streamCleanupStates) { if (isDispatchSuperseded()) { await (typeof stream.discard === "function" ? stream.discard() : stream.stop()); continue; } - await stream.stop(); - if (cleanupState.shouldClear) { + if (lane.finalized) { + await stream.stop(); + } else { await stream.clear(); } } - if (!isDispatchSuperseded()) { - for (const archivedPreview of archivedAnswerPreviews) { - if (archivedPreview.deleteIfUnused === false) { - continue; - } - try { - await bot.api.deleteMessage(chatId, archivedPreview.messageId); - } catch (err) { - logVerbose( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } - } - for (const messageId of archivedReasoningPreviewIds) { - try { - await bot.api.deleteMessage(chatId, messageId); - } catch (err) { - logVerbose( - `telegram: archived reasoning preview cleanup failed (${messageId}): ${String(err)}`, - ); - } - } - } } } finally { dispatchWasSuperseded = isDispatchSuperseded(); diff --git a/extensions/telegram/src/lane-delivery-text-deliverer.ts b/extensions/telegram/src/lane-delivery-text-deliverer.ts index ec490e123a5..c1187f312ad 100644 --- a/extensions/telegram/src/lane-delivery-text-deliverer.ts +++ b/extensions/telegram/src/lane-delivery-text-deliverer.ts @@ -1,55 +1,11 @@ import { createPreviewMessageReceipt, - defineFinalizableLivePreviewAdapter, - deliverWithFinalizableLivePreviewAdapter, type MessageReceipt, } from "openclaw/plugin-sdk/channel-message"; import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import type { TelegramInlineButtons } from "./button-types.js"; import type { TelegramDraftStream } from "./draft-stream.js"; -import { - isRecoverableTelegramNetworkError, - isSafeToRetrySendError, - isTelegramClientRejection, -} from "./network-errors.js"; - -const MESSAGE_NOT_MODIFIED_RE = - /400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i; -const MESSAGE_NOT_FOUND_RE = - /400:\s*Bad Request:\s*message to edit not found|MESSAGE_ID_INVALID|message can't be edited/i; -const LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS = 60_000; - -function extractErrorText(err: unknown): string { - return typeof err === "string" - ? err - : err instanceof Error - ? err.message - : typeof err === "object" && err && "description" in err - ? typeof err.description === "string" - ? err.description - : "" - : ""; -} - -function isMessageNotModifiedError(err: unknown): boolean { - return MESSAGE_NOT_MODIFIED_RE.test(extractErrorText(err)); -} - -/** - * Returns true when Telegram rejects an edit because the target message can no - * longer be resolved or edited. The caller still needs preview context to - * decide whether to retain a different visible preview or fall back to send. - */ -function isMissingPreviewMessageError(err: unknown): boolean { - return MESSAGE_NOT_FOUND_RE.test(extractErrorText(err)); -} - -function isIncompleteFinalPreviewPrefix(previewText: string, finalText: string): boolean { - const preview = previewText.trimEnd(); - const final = finalText.trimEnd(); - return preview.length > 0 && preview.length < final.length && final.startsWith(preview); -} export type LaneName = "answer" | "reasoning"; @@ -57,26 +13,9 @@ export type DraftLaneState = { stream: TelegramDraftStream | undefined; lastPartialText: string; hasStreamedMessage: boolean; + finalized: boolean; }; -export type ArchivedPreview = { - messageId: number; - textSnapshot: string; - visibleSinceMs?: number; - // Boundary-finalized previews should remain visible even if no matching - // final edit arrives; superseded previews can be safely deleted. - deleteIfUnused?: boolean; -}; - -export type LanePreviewLifecycle = "transient" | "complete"; - -export type LaneDeliveryResult = - | { - kind: "preview-finalized"; - delivery: LanePreviewFinalizedDelivery; - } - | { kind: "preview-retained" | "preview-updated" | "sent" | "skipped" }; - type LanePreviewFinalizedDelivery = { content: string; messageId: number; @@ -87,36 +26,34 @@ type LanePreviewFinalizedDeliveryInput = Omit; - archivedAnswerPreviews: ArchivedPreview[]; - activePreviewLifecycleByLane: Record; - retainPreviewOnCleanupByLane: Record; draftMaxChars: number; applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload; applyTextToFollowUpPayload?: (payload: ReplyPayload, text: string) => ReplyPayload; - splitFinalTextForPreview?: (text: string) => readonly string[]; + splitFinalTextForStream?: (text: string) => readonly string[]; sendPayload: ( payload: ReplyPayload, options?: { durable?: boolean; silent?: boolean }, ) => Promise; flushDraftLane: (lane: DraftLaneState) => Promise; stopDraftLane: (lane: DraftLaneState) => Promise; - editPreview: (params: { + clearDraftLane: (lane: DraftLaneState) => Promise; + editStreamMessage: (params: { laneName: LaneName; messageId: number; text: string; - context: "final" | "update"; - previewButtons?: TelegramInlineButtons; + buttons?: TelegramInlineButtons; }) => Promise; - deletePreviewMessage: (messageId: number) => Promise; log: (message: string) => void; markDelivered: () => void; - now?: () => number; - // Force fresh final when a visible non-preview message has been delivered - // since the active preview was created, even if the preview is younger - // than the long-lived threshold (#76529). - getLastVisibleNonPreviewDeliveryAtMs?: () => number | undefined; }; type DeliverLaneTextParams = { @@ -124,55 +61,7 @@ type DeliverLaneTextParams = { text: string; payload: ReplyPayload; infoKind: string; - previewButtons?: TelegramInlineButtons; - allowPreviewUpdateForNonFinal?: boolean; -}; - -type TryUpdatePreviewParams = { - lane: DraftLaneState; - laneName: LaneName; - text: string; - previewButtons?: TelegramInlineButtons; - stopBeforeEdit?: boolean; - updateLaneSnapshot?: boolean; - skipRegressive: RegressiveSkipMode; - context: "final" | "update"; - previewMessageId?: number; - previewTextSnapshot?: string; -}; - -type PreviewEditResult = "edited" | "retained" | "regressive-skipped" | "fallback"; - -type ConsumeArchivedAnswerPreviewParams = { - lane: DraftLaneState; - text: string; - payload: ReplyPayload; - previewButtons?: TelegramInlineButtons; - canEditViaPreview: boolean; -}; - -type PreviewUpdateContext = "final" | "update"; -type RegressiveSkipMode = "always" | "existingOnly" | "never"; - -type ResolvePreviewTargetParams = { - lane: DraftLaneState; - previewMessageIdOverride?: number; - stopBeforeEdit: boolean; - context: PreviewUpdateContext; -}; - -type PreviewTargetResolution = { - hadPreviewMessage: boolean; - previewMessageId: number | undefined; - stopCreatesFirstPreview: boolean; -}; - -type TelegramPreviewFinalEdit = { - laneName: LaneName; - messageId: number; - text: string; - context: "final" | "update"; - previewButtons?: TelegramInlineButtons; + buttons?: TelegramInlineButtons; }; function result( @@ -192,485 +81,113 @@ function result( return { kind }; } -function shouldSkipRegressivePreviewUpdate(args: { - currentPreviewText: string | undefined; - text: string; - skipRegressive: RegressiveSkipMode; - hadPreviewMessage: boolean; -}): boolean { - const currentPreviewText = args.currentPreviewText; - if (currentPreviewText === undefined) { - return false; - } - if (args.skipRegressive === "never") { - return false; - } - return ( - currentPreviewText.startsWith(args.text) && - args.text.length < currentPreviewText.length && - (args.skipRegressive === "always" || args.hadPreviewMessage) - ); -} - -function isLongLivedPreview(visibleSinceMs: number | undefined, nowMs: number): boolean { - return ( - typeof visibleSinceMs === "number" && - Number.isFinite(visibleSinceMs) && - nowMs - visibleSinceMs >= LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS - ); -} - -function compactPreviewFinalChunks(chunks: readonly string[]): string[] { - const result: string[] = []; - let pendingWhitespace = ""; +function compactChunks(chunks: readonly string[]): string[] { + const out: string[] = []; + let whitespace = ""; for (const chunk of chunks) { if (!chunk) { continue; } if (chunk.trim().length === 0) { - pendingWhitespace += chunk; + whitespace += chunk; continue; } - result.push(`${pendingWhitespace}${chunk}`); - pendingWhitespace = ""; + out.push(`${whitespace}${chunk}`); + whitespace = ""; } - if (pendingWhitespace && result.length > 0) { - result[result.length - 1] = `${result[result.length - 1]}${pendingWhitespace}`; + if (whitespace && out.length > 0) { + out[out.length - 1] = `${out[out.length - 1]}${whitespace}`; } - return result; -} - -function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTargetResolution { - const lanePreviewMessageId = params.lane.stream?.messageId(); - const previewMessageId = - typeof params.previewMessageIdOverride === "number" - ? params.previewMessageIdOverride - : lanePreviewMessageId; - const hadPreviewMessage = - typeof params.previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number"; - return { - hadPreviewMessage, - previewMessageId: typeof previewMessageId === "number" ? previewMessageId : undefined, - stopCreatesFirstPreview: - params.stopBeforeEdit && !hadPreviewMessage && params.context === "final", - }; + return out; } export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { - const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText; - const readNow = () => params.now?.() ?? Date.now(); - const markActivePreviewComplete = (laneName: LaneName) => { - params.activePreviewLifecycleByLane[laneName] = "complete"; - params.retainPreviewOnCleanupByLane[laneName] = true; - }; - const isMessagePreviewLane = (lane: DraftLaneState) => lane.stream != null; - const wasVisiblyOverwrittenSince = (visibleSinceMs: number | undefined): boolean => { - if (typeof visibleSinceMs !== "number") { - return false; - } - const lastNonPreviewAt = params.getLastVisibleNonPreviewDeliveryAtMs?.(); - return typeof lastNonPreviewAt === "number" && lastNonPreviewAt > visibleSinceMs; - }; - const shouldUseFreshFinalForLane = (lane: DraftLaneState) => { - if (!isMessagePreviewLane(lane)) { - return false; - } - const visibleSinceMs = lane.stream?.visibleSinceMs?.(); - return ( - isLongLivedPreview(visibleSinceMs, readNow()) || wasVisiblyOverwrittenSince(visibleSinceMs) - ); - }; - const shouldUseFreshFinalForPreview = (lane: DraftLaneState, visibleSinceMs?: number) => - isMessagePreviewLane(lane) && - (isLongLivedPreview(visibleSinceMs, readNow()) || wasVisiblyOverwrittenSince(visibleSinceMs)); - const buildFollowUpPayload = (payload: ReplyPayload, text: string) => + const followUpPayload = (payload: ReplyPayload, text: string) => params.applyTextToFollowUpPayload ? params.applyTextToFollowUpPayload(payload, text) : params.applyTextToPayload(payload, text); - const clearActivePreviewAfterFreshFinal = async (lane: DraftLaneState, laneName: LaneName) => { - try { - await lane.stream?.clear(); - } catch (err) { - params.log(`telegram: ${laneName} fresh final preview cleanup failed: ${String(err)}`); + + const clearUnfinalizedStream = async (lane: DraftLaneState) => { + if (!lane.stream || lane.finalized) { + return; } + await params.clearDraftLane(lane); lane.lastPartialText = ""; lane.hasStreamedMessage = false; - lane.stream?.forceNewMessage(); }; - const tryEditPreviewMessage = async (args: { - laneName: LaneName; - messageId: number; - text: string; - context: "final" | "update"; - previewButtons?: TelegramInlineButtons; - updateLaneSnapshot: boolean; - lane: DraftLaneState; - finalTextAlreadyLanded: boolean; - retainAlternatePreviewOnMissingTarget: boolean; - targetPreviewText: string; - }): Promise => { - const previewEditState: { result: PreviewEditResult } = { result: "fallback" }; - const adapter = defineFinalizableLivePreviewAdapter< - { text: string }, - number, - TelegramPreviewFinalEdit - >({ - draft: { - flush: async () => {}, - clear: async () => {}, - id: () => args.messageId, - }, - buildFinalEdit: (payload) => ({ - laneName: args.laneName, - messageId: args.messageId, - text: payload.text, - context: args.context, - ...(args.previewButtons ? { previewButtons: args.previewButtons } : {}), - }), - editFinal: async (_messageId, edit) => { - try { - await params.editPreview(edit); - } catch (err) { - if (isMessageNotModifiedError(err)) { - params.log( - `telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`, - ); - return; - } - throw err; - } - }, - createPreviewReceipt: (messageId) => createPreviewMessageReceipt({ id: messageId }), - onPreviewFinalized: () => { - if (args.updateLaneSnapshot) { - args.lane.lastPartialText = args.text; - } - params.markDelivered(); - previewEditState.result = "edited"; - }, - handlePreviewEditError: ({ error: err }) => { - previewEditState.result = "fallback"; - if (isMessageNotModifiedError(err)) { - params.log( - `telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`, - ); - params.markDelivered(); - previewEditState.result = "edited"; - return "retain"; - } - if (args.context === "final") { - if (args.finalTextAlreadyLanded) { - params.log( - `telegram: ${args.laneName} preview final edit failed after stop flush; keeping existing preview (${String(err)})`, - ); - params.markDelivered(); - previewEditState.result = "retained"; - return "retain"; - } - if (isSafeToRetrySendError(err)) { - params.log( - `telegram: ${args.laneName} preview final edit failed before reaching Telegram; falling back to standard send (${String(err)})`, - ); - return "fallback"; - } - if (isMissingPreviewMessageError(err)) { - if (args.retainAlternatePreviewOnMissingTarget) { - params.log( - `telegram: ${args.laneName} preview final edit target missing; keeping alternate preview without fallback (${String(err)})`, - ); - params.markDelivered(); - previewEditState.result = "retained"; - return "retain"; - } - params.log( - `telegram: ${args.laneName} preview final edit target missing with no alternate preview; falling back to standard send (${String(err)})`, - ); - return "fallback"; - } - if (isRecoverableTelegramNetworkError(err, { allowMessageMatch: true })) { - params.log( - `telegram: ${args.laneName} preview final edit may have landed despite network error; keeping existing preview (${String(err)})`, - ); - params.markDelivered(); - previewEditState.result = "retained"; - return "retain"; - } - if (isTelegramClientRejection(err)) { - params.log( - `telegram: ${args.laneName} preview final edit rejected by Telegram (client error); falling back to standard send (${String(err)})`, - ); - return "fallback"; - } - if (isIncompleteFinalPreviewPrefix(args.targetPreviewText, args.text)) { - params.log( - `telegram: ${args.laneName} preview final edit failed and existing preview is an incomplete prefix; falling back to standard send (${String(err)})`, - ); - return "fallback"; - } - // Default: ambiguous error — retain when fallback may duplicate a final - // edit that already landed or when the preview is not known-incomplete. - params.log( - `telegram: ${args.laneName} preview final edit failed with ambiguous error; keeping existing preview to avoid duplicate (${String(err)})`, - ); - params.markDelivered(); - previewEditState.result = "retained"; - return "retain"; - } - params.log( - `telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`, - ); - return "fallback"; - }, - }); - const delivered = await deliverWithFinalizableLivePreviewAdapter({ - kind: "final", - payload: { text: args.text }, - adapter, - deliverNormally: async () => false, - }); - if (delivered.kind === "preview-finalized" || previewEditState.result === "edited") { - return "edited"; + const streamText = async ( + laneName: LaneName, + lane: DraftLaneState, + text: string, + payload: ReplyPayload, + isFinal: boolean, + buttons?: TelegramInlineButtons, + ): Promise => { + const stream = lane.stream; + if (!stream || text.length === 0 || payload.isError) { + return undefined; } - if (delivered.kind === "preview-retained") { - return "retained"; + + const chunks = + text.length > params.draftMaxChars + ? compactChunks(params.splitFinalTextForStream?.(text) ?? []) + : [text]; + const [firstChunk, ...remainingChunks] = chunks; + if (!firstChunk || firstChunk.length > params.draftMaxChars) { + return undefined; } - return "fallback"; - }; - const tryDeliverLongFinalThroughPreview = async (args: { - lane: DraftLaneState; - laneName: LaneName; - text: string; - payload: ReplyPayload; - previewButtons?: TelegramInlineButtons; - }): Promise => { + + lane.lastPartialText = firstChunk; + lane.hasStreamedMessage = true; + lane.finalized = false; + stream.update(firstChunk); + if (isFinal) { + await params.stopDraftLane(lane); + } else { + await params.flushDraftLane(lane); + } + + const messageId = stream.messageId(); + if (typeof messageId !== "number") { + if (isFinal && stream.sendMayHaveLanded?.()) { + lane.finalized = true; + params.markDelivered(); + return result("preview-retained"); + } + return undefined; + } + + const deliveredStreamText = stream.lastDeliveredText?.(); if ( - !args.lane.stream || - args.previewButtons !== undefined || - params.activePreviewLifecycleByLane[args.laneName] !== "transient" + isFinal && + deliveredStreamText !== undefined && + deliveredStreamText !== firstChunk.trimEnd() ) { return undefined; } - const chunks = compactPreviewFinalChunks(params.splitFinalTextForPreview?.(args.text) ?? []); - const [firstChunk, ...remainingChunks] = chunks; - if (!firstChunk || remainingChunks.length === 0 || firstChunk.length > params.draftMaxChars) { - return undefined; - } - await params.flushDraftLane(args.lane); - const previewMessageId = args.lane.stream.messageId(); - if (typeof previewMessageId !== "number") { - return undefined; - } - const finalized = await tryUpdatePreviewForLane({ - lane: args.lane, - laneName: args.laneName, - text: firstChunk, - stopBeforeEdit: true, - updateLaneSnapshot: true, - skipRegressive: "never", - context: "final", - }); - if (finalized === "fallback") { - return undefined; - } - if (finalized === "retained") { - markActivePreviewComplete(args.laneName); - return result("preview-retained"); - } - markActivePreviewComplete(args.laneName); - const remainingText = remainingChunks.join(""); - if (remainingText.trim().length > 0) { - await params.sendPayload(buildFollowUpPayload(args.payload, remainingText)); - } - return result("preview-finalized", { - content: args.text, - messageId: previewMessageId, - }); - }; - const tryUpdatePreviewForLane = async ({ - lane, - laneName, - text, - previewButtons, - stopBeforeEdit = false, - updateLaneSnapshot = false, - skipRegressive, - context, - previewMessageId: previewMessageIdOverride, - previewTextSnapshot, - }: TryUpdatePreviewParams): Promise => { - const editPreview = ( - messageId: number, - finalTextAlreadyLanded: boolean, - retainAlternatePreviewOnMissingTarget: boolean, - targetPreviewText: string, - ) => - tryEditPreviewMessage({ - laneName, - messageId, - text, - context, - previewButtons, - updateLaneSnapshot, - lane, - finalTextAlreadyLanded, - retainAlternatePreviewOnMissingTarget, - targetPreviewText, - }); - const finalizePreview = ( - previewMessageId: number, - finalTextAlreadyLanded: boolean, - hadPreviewMessage: boolean, - retainAlternatePreviewOnMissingTarget = false, - ): PreviewEditResult | Promise => { - const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane); - const shouldSkipRegressive = shouldSkipRegressivePreviewUpdate({ - currentPreviewText, - text, - skipRegressive, - hadPreviewMessage, - }); - if (shouldSkipRegressive) { - params.markDelivered(); - return "regressive-skipped"; - } - return editPreview( - previewMessageId, - finalTextAlreadyLanded, - retainAlternatePreviewOnMissingTarget, - currentPreviewText, - ); - }; - if (!lane.stream) { - return "fallback"; - } - const previewTargetBeforeStop = resolvePreviewTarget({ - lane, - previewMessageIdOverride, - stopBeforeEdit, - context, - }); - if (previewTargetBeforeStop.stopCreatesFirstPreview && lane.hasStreamedMessage) { - // Final stop() can create the first visible preview message. - // Prime pending text so the stop flush sends the final text snapshot. - lane.stream.update(text); - await params.stopDraftLane(lane); - const previewTargetAfterStop = resolvePreviewTarget({ - lane, - stopBeforeEdit: false, - context, - }); - if (typeof previewTargetAfterStop.previewMessageId !== "number") { - return "fallback"; - } - return finalizePreview(previewTargetAfterStop.previewMessageId, true, false); - } - if (stopBeforeEdit) { - await params.stopDraftLane(lane); - } - const previewTargetAfterStop = resolvePreviewTarget({ - lane, - previewMessageIdOverride, - stopBeforeEdit: false, - context, - }); - if (typeof previewTargetAfterStop.previewMessageId !== "number") { - // Only retain for final delivery when a prior preview is already visible - // to the user — otherwise falling back is safer than silence. For updates, - // always fall back so the caller can attempt sendPayload without stale - // markDelivered() state. - if (context === "final" && lane.hasStreamedMessage && lane.stream?.sendMayHaveLanded?.()) { - params.log( - `telegram: ${laneName} preview send may have landed despite missing message id; keeping to avoid duplicate`, - ); - params.markDelivered(); - return "retained"; - } - return "fallback"; - } - const activePreviewMessageId = lane.stream?.messageId(); - return finalizePreview( - previewTargetAfterStop.previewMessageId, - false, - previewTargetAfterStop.hadPreviewMessage, - typeof activePreviewMessageId === "number" && - activePreviewMessageId !== previewTargetAfterStop.previewMessageId, - ); - }; - - const consumeArchivedAnswerPreviewForFinal = async ({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }: ConsumeArchivedAnswerPreviewParams): Promise => { - const archivedPreview = params.archivedAnswerPreviews.shift(); - if (!archivedPreview) { - return undefined; - } - if (canEditViaPreview && shouldUseFreshFinalForPreview(lane, archivedPreview.visibleSinceMs)) { - const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), { - durable: true, - }); - if (delivered) { - try { - await params.deletePreviewMessage(archivedPreview.messageId); - } catch (err) { - params.log( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); - } - return result("sent"); - } - } - if (canEditViaPreview) { - const finalized = await tryUpdatePreviewForLane({ - lane, - laneName: "answer", - text, - previewButtons, - stopBeforeEdit: false, - skipRegressive: "existingOnly", - context: "final", - previewMessageId: archivedPreview.messageId, - previewTextSnapshot: archivedPreview.textSnapshot, - }); - if (finalized === "edited") { - return result("preview-finalized", { - content: text, - messageId: archivedPreview.messageId, - }); - } - if (finalized === "regressive-skipped") { - return result("preview-finalized", { - content: archivedPreview.textSnapshot, - messageId: archivedPreview.messageId, - }); - } - if (finalized === "retained") { - params.retainPreviewOnCleanupByLane.answer = true; - return result("preview-retained"); - } - } - // Send the replacement message first, then clean up the old preview. - // This avoids the visual "disappear then reappear" flash. - const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), { - durable: true, - }); - // Once this archived preview is consumed by a fallback final send, delete it - // regardless of deleteIfUnused. That flag only applies to unconsumed boundaries. - if (delivered || archivedPreview.deleteIfUnused !== false) { + params.markDelivered(); + if (buttons) { try { - await params.deletePreviewMessage(archivedPreview.messageId); + await params.editStreamMessage({ laneName, messageId, text: firstChunk, buttons }); } catch (err) { - params.log( - `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, - ); + params.log(`telegram: ${laneName} stream button edit failed: ${String(err)}`); } } - return delivered ? result("sent") : result("skipped"); + + if (isFinal) { + lane.finalized = true; + for (const chunk of remainingChunks) { + if (chunk.trim().length === 0) { + continue; + } + await params.sendPayload(followUpPayload(payload, chunk)); + } + return result("preview-finalized", { content: text, messageId }); + } + + return result("preview-updated"); }; return async ({ @@ -678,132 +195,28 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { text, payload, infoKind, - previewButtons, - allowPreviewUpdateForNonFinal = false, + buttons, }: DeliverLaneTextParams): Promise => { const lane = params.lanes[laneName]; const reply = resolveSendableOutboundReplyParts(payload, { text }); - const hasMedia = reply.hasMedia; - const canEditViaPreview = - !hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError; - - if (infoKind === "final") { - // Transient previews must decide cleanup retention per final attempt. - // Completed previews intentionally stay retained so later extra payloads - // do not clear the already-finalized message. - if (params.activePreviewLifecycleByLane[laneName] === "transient") { - params.retainPreviewOnCleanupByLane[laneName] = false; - } - if (laneName === "answer") { - const archivedResult = await consumeArchivedAnswerPreviewForFinal({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }); - if (archivedResult) { - return archivedResult; - } - } - if (canEditViaPreview && params.activePreviewLifecycleByLane[laneName] === "transient") { - await params.flushDraftLane(lane); - if (laneName === "answer") { - const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({ - lane, - text, - payload, - previewButtons, - canEditViaPreview, - }); - if (archivedResultAfterFlush) { - return archivedResultAfterFlush; - } - } - if (shouldUseFreshFinalForLane(lane)) { - await params.stopDraftLane(lane); - const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), { - durable: true, - }); - if (delivered) { - await clearActivePreviewAfterFreshFinal(lane, laneName); - return result("sent"); - } - } - const previewMessageId = lane.stream?.messageId(); - const finalized = await tryUpdatePreviewForLane({ - lane, - laneName, - text, - previewButtons, - stopBeforeEdit: true, - skipRegressive: "existingOnly", - context: "final", - }); - const finalizedMessageId = previewMessageId ?? lane.stream?.messageId(); - if (finalized === "edited") { - markActivePreviewComplete(laneName); - if (typeof finalizedMessageId !== "number") { - return result("preview-retained"); - } - return result("preview-finalized", { - content: text, - messageId: finalizedMessageId, - }); - } - if (finalized === "regressive-skipped") { - markActivePreviewComplete(laneName); - if (typeof finalizedMessageId !== "number") { - return result("preview-retained"); - } - return result("preview-finalized", { - content: lane.lastPartialText, - messageId: finalizedMessageId, - }); - } - if (finalized === "retained") { - markActivePreviewComplete(laneName); - return result("preview-retained"); - } - } else if (!hasMedia && !payload.isError && text.length > params.draftMaxChars) { - const longFinalResult = await tryDeliverLongFinalThroughPreview({ - lane, - laneName, - text, - payload, - previewButtons, - }); - if (longFinalResult) { - return longFinalResult; - } - params.log( - `telegram: preview final too long for edit (${text.length} > ${params.draftMaxChars}); falling back to standard send`, - ); - } - await params.stopDraftLane(lane); - const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), { - durable: true, - }); - return delivered ? result("sent") : result("skipped"); + const isFinal = infoKind === "final"; + const streamed = !reply.hasMedia + ? await streamText(laneName, lane, text, payload, isFinal, buttons) + : undefined; + if (streamed) { + return streamed; } - if (allowPreviewUpdateForNonFinal && canEditViaPreview) { - const updated = await tryUpdatePreviewForLane({ - lane, - laneName, - text, - previewButtons, - stopBeforeEdit: false, - updateLaneSnapshot: true, - skipRegressive: "always", - context: "update", - }); - if (updated === "edited" || updated === "regressive-skipped") { - return result("preview-updated"); - } + if (isFinal) { + await clearUnfinalizedStream(lane); } - const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), { + durable: isFinal, + }); + if (delivered && isFinal) { + lane.finalized = true; + } return delivered ? result("sent") : result("skipped"); }; } diff --git a/extensions/telegram/src/lane-delivery.ts b/extensions/telegram/src/lane-delivery.ts index 29357498e2e..d98a7eeb8e8 100644 --- a/extensions/telegram/src/lane-delivery.ts +++ b/extensions/telegram/src/lane-delivery.ts @@ -1,9 +1,7 @@ export { - type ArchivedPreview, createLaneTextDeliverer, type DraftLaneState, type LaneDeliveryResult, type LaneName, - type LanePreviewLifecycle, } from "./lane-delivery-text-deliverer.js"; export { createLaneDeliveryStateTracker } from "./lane-delivery-state.js";