fix(telegram): finalize streamed text in place

This commit is contained in:
Ayaan Zaidi
2026-05-05 22:13:03 +05:30
parent d7bd9fe049
commit bca16d0f00
4 changed files with 195 additions and 928 deletions

View File

@@ -118,6 +118,7 @@ Docs: https://docs.openclaw.ai
- Agents/generated media: treat attachment-style message tool actions as completed chat sends, preventing duplicate fallback media posts when generated files were already uploaded.
- Control UI/sessions: show each session's agent runtime in the Sessions table and allow filtering by runtime labels, matching the Agents panel runtime wording. Thanks @vincentkoc.
- Discord/streaming: show live reasoning text in progress drafts instead of a bare `Reasoning` status line.
- Telegram/streaming: finalize text replies by stopping the edited stream message instead of sending a second answer bubble, so Telegram turns cannot duplicate the streamed final response.
- Gateway/status: avoid marking fast repeated health/status samples as event-loop degraded from CPU/utilization alone until the Gateway has accumulated a sustained sampling window. Thanks @shakkernerd.
- Plugins/update: keep installed official npm and ClawHub plugins such as Codex, Discord, WhatsApp, and diagnostics plugins synced during host updates even when disabled or previously exact-pinned, while preserving third-party plugin pins. Thanks @vincentkoc.
- Doctor/status: warn when `OPENCLAW_GATEWAY_TOKEN` would shadow a different active `gateway.auth.token` source for local CLI commands, while avoiding false positives when config points at the same env token. Fixes #74271. Thanks @yelog.

View File

@@ -81,13 +81,11 @@ import {
import { shouldSuppressLocalTelegramExecApprovalPrompt } from "./exec-approvals.js";
import { markdownToTelegramChunks, renderTelegramHtmlText } from "./format.js";
import {
type ArchivedPreview,
createLaneDeliveryStateTracker,
createLaneTextDeliverer,
type DraftLaneState,
type LaneDeliveryResult,
type LaneName,
type LanePreviewLifecycle,
} from "./lane-delivery.js";
import {
createTelegramReasoningStepState,
@@ -354,7 +352,7 @@ export const dispatchTelegramMessage = async ({
channel: "telegram",
accountId: route.accountId,
});
const renderDraftPreview = (text: string) => ({
const renderStreamText = (text: string) => ({
text: renderTelegramHtmlText(text, { tableMode }),
parseMode: "HTML" as const,
});
@@ -369,7 +367,7 @@ export const dispatchTelegramMessage = async ({
});
const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on";
const streamReasoningDraft = resolvedReasoningLevel === "stream";
const previewStreamingEnabled = streamMode !== "off";
const streamDeliveryEnabled = streamMode !== "off";
const rawReplyQuoteText =
ctxPayload.ReplyToIsQuote && typeof ctxPayload.ReplyToQuoteText === "string"
? ctxPayload.ReplyToQuoteText
@@ -418,7 +416,7 @@ export const dispatchTelegramMessage = async ({
}
const hasTelegramQuoteReply = replyToMode !== "off" && replyQuoteText != null;
const canStreamAnswerDraft =
previewStreamingEnabled &&
streamDeliveryEnabled &&
!hasTelegramQuoteReply &&
!accountBlockStreamingEnabled &&
!forceBlockStreamingForReasoning;
@@ -430,8 +428,6 @@ export const dispatchTelegramMessage = async ({
const draftMinInitialChars = streamMode === "progress" ? 0 : DRAFT_MIN_INITIAL_CHARS;
const progressSeed = `${route.accountId}:${chatId}:${threadSpec.id ?? ""}`;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
const archivedAnswerPreviews: ArchivedPreview[] = [];
const archivedReasoningPreviewIds: number[] = [];
const createDraftLane = (laneName: LaneName, enabled: boolean): DraftLaneState => {
const stream = enabled
? (telegramDeps.createTelegramDraftStream ?? createTelegramDraftStream)({
@@ -441,24 +437,14 @@ export const dispatchTelegramMessage = async ({
thread: threadSpec,
replyToMessageId: draftReplyToMessageId,
minInitialChars: draftMinInitialChars,
renderText: renderDraftPreview,
onSupersededPreview:
laneName === "answer" || laneName === "reasoning"
? (preview) => {
if (laneName === "reasoning") {
if (!archivedReasoningPreviewIds.includes(preview.messageId)) {
archivedReasoningPreviewIds.push(preview.messageId);
}
return;
}
archivedAnswerPreviews.push({
messageId: preview.messageId,
textSnapshot: preview.textSnapshot,
visibleSinceMs: preview.visibleSinceMs,
deleteIfUnused: true,
});
}
: undefined,
renderText: renderStreamText,
onSupersededPreview: (superseded) => {
void bot.api.deleteMessage(chatId, superseded.messageId).catch((err: unknown) => {
logVerbose(
`telegram: superseded ${laneName} stream cleanup failed (${superseded.messageId}): ${String(err)}`,
);
});
},
log: logVerbose,
warn: logVerbose,
})
@@ -467,43 +453,36 @@ export const dispatchTelegramMessage = async ({
stream,
lastPartialText: "",
hasStreamedMessage: false,
finalized: false,
};
};
const lanes: Record<LaneName, DraftLaneState> = {
answer: createDraftLane("answer", canStreamAnswerDraft),
reasoning: createDraftLane("reasoning", canStreamReasoningDraft),
};
const activePreviewLifecycleByLane: Record<LaneName, LanePreviewLifecycle> = {
answer: "transient",
reasoning: "transient",
};
const retainPreviewOnCleanupByLane: Record<LaneName, boolean> = {
answer: false,
reasoning: false,
};
const answerLane = lanes.answer;
const reasoningLane = lanes.reasoning;
const previewToolProgressEnabled =
const streamToolProgressEnabled =
Boolean(answerLane.stream) && resolveChannelStreamingPreviewToolProgress(telegramCfg);
let previewToolProgressSuppressed = false;
let previewToolProgressLines: string[] = [];
let answerLaneHasAssistantContent = false;
let streamToolProgressSuppressed = false;
let streamToolProgressLines: string[] = [];
const renderProgressDraft = async (options?: { flush?: boolean }) => {
if (!answerLane.stream || streamMode !== "progress") {
return;
}
const previewText = formatChannelProgressDraftText({
const streamText = formatChannelProgressDraftText({
entry: telegramCfg,
lines: previewToolProgressLines,
lines: streamToolProgressLines,
seed: progressSeed,
formatLine: formatProgressAsMarkdownCode,
});
if (!previewText || previewText === answerLane.lastPartialText) {
if (!streamText || streamText === answerLane.lastPartialText) {
return;
}
answerLane.lastPartialText = previewText;
answerLane.lastPartialText = streamText;
answerLane.hasStreamedMessage = true;
answerLane.stream.update(previewText);
answerLane.finalized = false;
answerLane.stream.update(streamText);
if (options?.flush) {
await answerLane.stream.flush();
}
@@ -511,7 +490,7 @@ export const dispatchTelegramMessage = async ({
const progressDraftGate = createChannelProgressDraftGate({
onStart: () => renderProgressDraft({ flush: true }),
});
const pushPreviewToolProgress = async (line?: string, options?: { toolName?: string }) => {
const pushStreamToolProgress = async (line?: string, options?: { toolName?: string }) => {
if (!answerLane.stream) {
return;
}
@@ -520,31 +499,32 @@ export const dispatchTelegramMessage = async ({
}
const normalized = sanitizeProgressMarkdownText(line?.replace(/\s+/g, " ").trim() ?? "");
if (streamMode !== "progress") {
if (!previewToolProgressEnabled || previewToolProgressSuppressed || !normalized) {
if (!streamToolProgressEnabled || streamToolProgressSuppressed || !normalized) {
return;
}
const previous = previewToolProgressLines.at(-1);
const previous = streamToolProgressLines.at(-1);
if (previous === normalized) {
return;
}
previewToolProgressLines = [...previewToolProgressLines, normalized].slice(
streamToolProgressLines = [...streamToolProgressLines, normalized].slice(
-resolveChannelProgressDraftMaxLines(telegramCfg),
);
const previewText = formatChannelProgressDraftText({
const streamText = formatChannelProgressDraftText({
entry: telegramCfg,
lines: previewToolProgressLines,
lines: streamToolProgressLines,
seed: progressSeed,
formatLine: formatProgressAsMarkdownCode,
});
answerLane.lastPartialText = previewText;
answerLane.lastPartialText = streamText;
answerLane.hasStreamedMessage = true;
answerLane.stream.update(previewText);
answerLane.finalized = false;
answerLane.stream.update(streamText);
return;
}
if (previewToolProgressEnabled && !previewToolProgressSuppressed && normalized) {
const previous = previewToolProgressLines.at(-1);
if (streamToolProgressEnabled && !streamToolProgressSuppressed && normalized) {
const previous = streamToolProgressLines.at(-1);
if (previous !== normalized) {
previewToolProgressLines = [...previewToolProgressLines, normalized].slice(
streamToolProgressLines = [...streamToolProgressLines, normalized].slice(
-resolveChannelProgressDraftMaxLines(telegramCfg),
);
}
@@ -556,9 +536,6 @@ export const dispatchTelegramMessage = async ({
}
};
let splitReasoningOnNextStream = false;
let skipNextAnswerMessageStartRotation = false;
let pendingCompactionReplayBoundary = false;
let discardAnswerPreviewOnNextRotation = false;
let draftLaneEventQueue = Promise.resolve();
const reasoningStepState = createTelegramReasoningStepState();
const enqueueDraftLaneEvent = (task: () => Promise<void>): Promise<void> => {
@@ -597,35 +574,22 @@ export const dispatchTelegramMessage = async ({
const resetDraftLaneState = (lane: DraftLaneState) => {
lane.lastPartialText = "";
lane.hasStreamedMessage = false;
lane.finalized = false;
};
const rotateAnswerLaneForNewAssistantMessage = async () => {
let didForceNewMessage = false;
if (answerLane.hasStreamedMessage) {
const materializedId = await answerLane.stream?.materialize?.();
const previewMessageId = materializedId ?? answerLane.stream?.messageId();
if (
!discardAnswerPreviewOnNextRotation &&
typeof previewMessageId === "number" &&
activePreviewLifecycleByLane.answer === "transient"
) {
archivedAnswerPreviews.push({
messageId: previewMessageId,
textSnapshot: answerLane.lastPartialText,
visibleSinceMs: answerLane.stream?.visibleSinceMs?.(),
deleteIfUnused: !answerLaneHasAssistantContent,
});
}
answerLane.stream?.forceNewMessage();
didForceNewMessage = true;
const rotateLaneForNewMessage = async (lane: DraftLaneState) => {
if (!lane.hasStreamedMessage && typeof lane.stream?.messageId() !== "number") {
resetDraftLaneState(lane);
return;
}
discardAnswerPreviewOnNextRotation = false;
resetDraftLaneState(answerLane);
answerLaneHasAssistantContent = false;
if (didForceNewMessage) {
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
await lane.stream?.stop();
lane.stream?.forceNewMessage();
resetDraftLaneState(lane);
};
const prepareAnswerLaneForText = async () => {
if (!answerLane.finalized) {
return;
}
return didForceNewMessage;
await rotateLaneForNewMessage(answerLane);
};
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
const laneStream = lane.stream;
@@ -639,11 +603,11 @@ export const dispatchTelegramMessage = async ({
if (streamMode === "progress") {
return;
}
answerLaneHasAssistantContent = true;
previewToolProgressSuppressed = true;
previewToolProgressLines = [];
streamToolProgressSuppressed = true;
streamToolProgressLines = [];
}
lane.hasStreamedMessage = true;
lane.finalized = false;
if (
lane.lastPartialText &&
lane.lastPartialText.startsWith(text) &&
@@ -656,11 +620,10 @@ export const dispatchTelegramMessage = async ({
};
const ingestDraftLaneSegments = async (text: string | undefined) => {
const split = splitTextIntoLaneSegments(text);
const hasAnswerSegment = split.segments.some((segment) => segment.lane === "answer");
if (hasAnswerSegment && activePreviewLifecycleByLane.answer !== "transient") {
skipNextAnswerMessageStartRotation = await rotateAnswerLaneForNewAssistantMessage();
}
for (const segment of split.segments) {
if (segment.lane === "answer") {
await prepareAnswerLaneForText();
}
if (segment.lane === "reasoning") {
reasoningStepState.noteReasoningHint();
reasoningStepState.noteReasoningDelivered();
@@ -676,7 +639,7 @@ export const dispatchTelegramMessage = async ({
};
const resolvedBlockStreamingEnabled = resolveChannelStreamingBlockEnabled(telegramCfg);
const disableBlockStreaming = !previewStreamingEnabled
const disableBlockStreaming = !streamDeliveryEnabled
? true
: forceBlockStreamingForReasoning
? false
@@ -801,7 +764,7 @@ export const dispatchTelegramMessage = async ({
} = next;
return followUp;
};
const splitFinalTextForPreview = (text: string): string[] => {
const splitFinalTextForStream = (text: string): string[] => {
const markdownChunks =
chunkMode === "newline"
? chunkMarkdownTextWithMode(text, draftMaxChars, chunkMode)
@@ -830,7 +793,6 @@ export const dispatchTelegramMessage = async ({
}
return payload.replyToId != null && replyQuoteByMessageId[payload.replyToId] != null;
};
let lastVisibleNonPreviewDeliveryAtMs: number | undefined;
const sendPayload = async (
payload: ReplyPayload,
options?: { durable?: boolean; silent?: boolean },
@@ -875,7 +837,6 @@ export const dispatchTelegramMessage = async ({
}
if (durable.status === "handled_visible") {
deliveryState.markDelivered();
lastVisibleNonPreviewDeliveryAtMs = Date.now();
return true;
}
if (durable.status === "handled_no_send") {
@@ -891,7 +852,6 @@ export const dispatchTelegramMessage = async ({
});
if (result.delivered) {
deliveryState.markDelivered();
lastVisibleNonPreviewDeliveryAtMs = Date.now();
}
return result.delivered;
};
@@ -912,19 +872,19 @@ export const dispatchTelegramMessage = async ({
};
const deliverLaneText = createLaneTextDeliverer({
lanes,
archivedAnswerPreviews,
activePreviewLifecycleByLane,
retainPreviewOnCleanupByLane,
draftMaxChars,
applyTextToPayload,
applyTextToFollowUpPayload,
splitFinalTextForPreview,
splitFinalTextForStream: splitFinalTextForStream,
sendPayload,
flushDraftLane,
stopDraftLane: async (lane) => {
await lane.stream?.stop();
},
editPreview: async ({ messageId, text, previewButtons }) => {
clearDraftLane: async (lane) => {
await lane.stream?.clear();
},
editStreamMessage: async ({ messageId, text, buttons }) => {
if (isDispatchSuperseded()) {
return;
}
@@ -933,20 +893,13 @@ export const dispatchTelegramMessage = async ({
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
buttons,
});
},
deletePreviewMessage: async (messageId) => {
if (isDispatchSuperseded()) {
return;
}
await bot.api.deleteMessage(chatId, messageId);
},
log: logVerbose,
markDelivered: () => {
deliveryState.markDelivered();
},
getLastVisibleNonPreviewDeliveryAtMs: () => lastVisibleNonPreviewDeliveryAtMs,
});
if (isDmTopic) {
@@ -1026,14 +979,6 @@ export const dispatchTelegramMessage = async ({
if (isDispatchSuperseded()) {
return;
}
const markVisibleNonPreviewBoundary = (didDeliver: boolean) => {
if (didDeliver && info.kind !== "final") {
pendingCompactionReplayBoundary = false;
if (answerLane.hasStreamedMessage) {
discardAnswerPreviewOnNextRotation = true;
}
}
};
if (payload.isError === true) {
hadErrorReplyFailureOrSkip = true;
}
@@ -1050,7 +995,7 @@ export const dispatchTelegramMessage = async ({
queuedFinal = true;
return;
}
const previewButtons = (
const telegramButtons = (
payload.channelData?.telegram as
| { buttons?: TelegramInlineButtons }
| undefined
@@ -1076,7 +1021,7 @@ export const dispatchTelegramMessage = async ({
text: buffered.text,
payload: buffered.payload,
infoKind: "final",
previewButtons: bufferedButtons,
buttons: bufferedButtons,
});
reasoningStepState.resetForNextStep();
};
@@ -1102,13 +1047,10 @@ export const dispatchTelegramMessage = async ({
text: segment.text,
payload,
infoKind: info.kind,
previewButtons,
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
buttons: telegramButtons,
});
if (info.kind === "final") {
emitPreviewFinalizedHook(result);
} else if (segment.lane === "answer" && result.kind === "sent") {
markVisibleNonPreviewBoundary(true);
}
if (segment.lane === "reasoning") {
if (result.kind !== "skipped") {
@@ -1122,24 +1064,18 @@ export const dispatchTelegramMessage = async ({
}
}
if (segments.length > 0) {
if (info.kind === "final") {
pendingCompactionReplayBoundary = false;
}
return;
}
if (split.suppressedReasoningOnly) {
if (reply.hasMedia) {
const payloadWithoutSuppressedReasoning =
typeof payload.text === "string" ? { ...payload, text: "" } : payload;
markVisibleNonPreviewBoundary(
await sendPayload(payloadWithoutSuppressedReasoning, {
durable: info.kind === "final",
}),
);
await sendPayload(payloadWithoutSuppressedReasoning, {
durable: info.kind === "final",
});
}
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
@@ -1153,16 +1089,12 @@ export const dispatchTelegramMessage = async ({
if (!canSendAsIs) {
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
return;
}
markVisibleNonPreviewBoundary(
await sendPayload(payload, { durable: info.kind === "final" }),
);
await sendPayload(payload, { durable: info.kind === "final" });
if (info.kind === "final") {
await flushBufferedFinalAnswer();
pendingCompactionReplayBoundary = false;
}
},
onSkip: (payload, info) => {
@@ -1225,46 +1157,29 @@ export const dispatchTelegramMessage = async ({
? () =>
enqueueDraftLaneEvent(async () => {
reasoningStepState.resetForNextStep();
previewToolProgressSuppressed = false;
previewToolProgressLines = [];
if (skipNextAnswerMessageStartRotation) {
skipNextAnswerMessageStartRotation = false;
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
streamToolProgressSuppressed = false;
streamToolProgressLines = [];
if (answerLane.finalized) {
await rotateLaneForNewMessage(answerLane);
}
if (streamMode === "progress") {
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
}
if (pendingCompactionReplayBoundary) {
pendingCompactionReplayBoundary = false;
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
return;
}
await rotateAnswerLaneForNewAssistantMessage();
activePreviewLifecycleByLane.answer = "transient";
retainPreviewOnCleanupByLane.answer = false;
})
: undefined,
onReasoningEnd: reasoningLane.stream
? () =>
enqueueDraftLaneEvent(async () => {
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
previewToolProgressSuppressed = false;
previewToolProgressLines = [];
streamToolProgressSuppressed = false;
streamToolProgressLines = [];
})
: undefined,
suppressDefaultToolProgressMessages:
!previewStreamingEnabled || Boolean(answerLane.stream),
!streamDeliveryEnabled || Boolean(answerLane.stream),
onToolStart: async (payload) => {
const toolName = payload.name?.trim();
if (statusReactionController && toolName) {
await statusReactionController.setTool(toolName);
}
await pushPreviewToolProgress(
await pushStreamToolProgress(
formatChannelProgressDraftLineForEntry(
telegramCfg,
{
@@ -1279,7 +1194,7 @@ export const dispatchTelegramMessage = async ({
);
},
onItemEvent: async (payload) => {
await pushPreviewToolProgress(
await pushStreamToolProgress(
formatChannelProgressDraftLineForEntry(telegramCfg, {
event: "item",
itemKind: payload.kind,
@@ -1297,7 +1212,7 @@ export const dispatchTelegramMessage = async ({
if (payload.phase !== "update") {
return;
}
await pushPreviewToolProgress(
await pushStreamToolProgress(
formatChannelProgressDraftLine({
event: "plan",
phase: payload.phase,
@@ -1311,7 +1226,7 @@ export const dispatchTelegramMessage = async ({
if (payload.phase !== "requested") {
return;
}
await pushPreviewToolProgress(
await pushStreamToolProgress(
formatChannelProgressDraftLine({
event: "approval",
phase: payload.phase,
@@ -1326,7 +1241,7 @@ export const dispatchTelegramMessage = async ({
if (payload.phase !== "end") {
return;
}
await pushPreviewToolProgress(
await pushStreamToolProgress(
formatChannelProgressDraftLine({
event: "command-output",
phase: payload.phase,
@@ -1341,7 +1256,7 @@ export const dispatchTelegramMessage = async ({
if (payload.phase !== "end") {
return;
}
await pushPreviewToolProgress(
await pushStreamToolProgress(
formatChannelProgressDraftLine({
event: "patch",
phase: payload.phase,
@@ -1354,20 +1269,11 @@ export const dispatchTelegramMessage = async ({
}),
);
},
onCompactionStart:
statusReactionController || answerLane.stream
? async () => {
if (
answerLane.hasStreamedMessage &&
activePreviewLifecycleByLane.answer === "transient"
) {
pendingCompactionReplayBoundary = true;
}
if (statusReactionController) {
await statusReactionController.setCompacting();
}
}
: undefined,
onCompactionStart: statusReactionController
? async () => {
await statusReactionController.setCompacting();
}
: undefined,
onCompactionEnd: statusReactionController
? async () => {
statusReactionController.cancelPending();
@@ -1392,76 +1298,25 @@ export const dispatchTelegramMessage = async ({
} finally {
await draftLaneEventQueue;
progressDraftGate.cancel();
if (isDispatchSuperseded()) {
if (answerLane.hasStreamedMessage || typeof answerLane.stream?.messageId() === "number") {
retainPreviewOnCleanupByLane.answer = true;
}
for (const archivedPreview of archivedAnswerPreviews) {
archivedPreview.deleteIfUnused = false;
}
}
const streamCleanupStates = new Map<
NonNullable<DraftLaneState["stream"]>,
{ shouldClear: boolean }
>();
const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [
{ laneName: "answer", lane: answerLane },
{ laneName: "reasoning", lane: reasoningLane },
];
for (const laneState of lanesToCleanup) {
const stream = laneState.lane.stream;
for (const { lane } of lanesToCleanup) {
const stream = lane.stream;
if (!stream) {
continue;
}
const activePreviewMessageId = stream.messageId();
const hasBoundaryFinalizedActivePreview =
laneState.laneName === "answer" &&
typeof activePreviewMessageId === "number" &&
archivedAnswerPreviews.some(
(p) => p.deleteIfUnused === false && p.messageId === activePreviewMessageId,
);
const shouldClear =
!retainPreviewOnCleanupByLane[laneState.laneName] && !hasBoundaryFinalizedActivePreview;
const existing = streamCleanupStates.get(stream);
if (!existing) {
streamCleanupStates.set(stream, { shouldClear });
continue;
}
existing.shouldClear = existing.shouldClear && shouldClear;
}
for (const [stream, cleanupState] of streamCleanupStates) {
if (isDispatchSuperseded()) {
await (typeof stream.discard === "function" ? stream.discard() : stream.stop());
continue;
}
await stream.stop();
if (cleanupState.shouldClear) {
if (lane.finalized) {
await stream.stop();
} else {
await stream.clear();
}
}
if (!isDispatchSuperseded()) {
for (const archivedPreview of archivedAnswerPreviews) {
if (archivedPreview.deleteIfUnused === false) {
continue;
}
try {
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
} catch (err) {
logVerbose(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
}
for (const messageId of archivedReasoningPreviewIds) {
try {
await bot.api.deleteMessage(chatId, messageId);
} catch (err) {
logVerbose(
`telegram: archived reasoning preview cleanup failed (${messageId}): ${String(err)}`,
);
}
}
}
}
} finally {
dispatchWasSuperseded = isDispatchSuperseded();

View File

@@ -1,55 +1,11 @@
import {
createPreviewMessageReceipt,
defineFinalizableLivePreviewAdapter,
deliverWithFinalizableLivePreviewAdapter,
type MessageReceipt,
} from "openclaw/plugin-sdk/channel-message";
import { resolveSendableOutboundReplyParts } from "openclaw/plugin-sdk/reply-payload";
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
import type { TelegramInlineButtons } from "./button-types.js";
import type { TelegramDraftStream } from "./draft-stream.js";
import {
isRecoverableTelegramNetworkError,
isSafeToRetrySendError,
isTelegramClientRejection,
} from "./network-errors.js";
const MESSAGE_NOT_MODIFIED_RE =
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
const MESSAGE_NOT_FOUND_RE =
/400:\s*Bad Request:\s*message to edit not found|MESSAGE_ID_INVALID|message can't be edited/i;
const LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS = 60_000;
function extractErrorText(err: unknown): string {
return typeof err === "string"
? err
: err instanceof Error
? err.message
: typeof err === "object" && err && "description" in err
? typeof err.description === "string"
? err.description
: ""
: "";
}
function isMessageNotModifiedError(err: unknown): boolean {
return MESSAGE_NOT_MODIFIED_RE.test(extractErrorText(err));
}
/**
* Returns true when Telegram rejects an edit because the target message can no
* longer be resolved or edited. The caller still needs preview context to
* decide whether to retain a different visible preview or fall back to send.
*/
function isMissingPreviewMessageError(err: unknown): boolean {
return MESSAGE_NOT_FOUND_RE.test(extractErrorText(err));
}
function isIncompleteFinalPreviewPrefix(previewText: string, finalText: string): boolean {
const preview = previewText.trimEnd();
const final = finalText.trimEnd();
return preview.length > 0 && preview.length < final.length && final.startsWith(preview);
}
export type LaneName = "answer" | "reasoning";
@@ -57,26 +13,9 @@ export type DraftLaneState = {
stream: TelegramDraftStream | undefined;
lastPartialText: string;
hasStreamedMessage: boolean;
finalized: boolean;
};
export type ArchivedPreview = {
messageId: number;
textSnapshot: string;
visibleSinceMs?: number;
// Boundary-finalized previews should remain visible even if no matching
// final edit arrives; superseded previews can be safely deleted.
deleteIfUnused?: boolean;
};
export type LanePreviewLifecycle = "transient" | "complete";
export type LaneDeliveryResult =
| {
kind: "preview-finalized";
delivery: LanePreviewFinalizedDelivery;
}
| { kind: "preview-retained" | "preview-updated" | "sent" | "skipped" };
type LanePreviewFinalizedDelivery = {
content: string;
messageId: number;
@@ -87,36 +26,34 @@ type LanePreviewFinalizedDeliveryInput = Omit<LanePreviewFinalizedDelivery, "rec
receipt?: MessageReceipt;
};
export type LaneDeliveryResult =
| {
kind: "preview-finalized";
delivery: LanePreviewFinalizedDelivery;
}
| { kind: "preview-retained" | "preview-updated" | "sent" | "skipped" };
type CreateLaneTextDelivererParams = {
lanes: Record<LaneName, DraftLaneState>;
archivedAnswerPreviews: ArchivedPreview[];
activePreviewLifecycleByLane: Record<LaneName, LanePreviewLifecycle>;
retainPreviewOnCleanupByLane: Record<LaneName, boolean>;
draftMaxChars: number;
applyTextToPayload: (payload: ReplyPayload, text: string) => ReplyPayload;
applyTextToFollowUpPayload?: (payload: ReplyPayload, text: string) => ReplyPayload;
splitFinalTextForPreview?: (text: string) => readonly string[];
splitFinalTextForStream?: (text: string) => readonly string[];
sendPayload: (
payload: ReplyPayload,
options?: { durable?: boolean; silent?: boolean },
) => Promise<boolean>;
flushDraftLane: (lane: DraftLaneState) => Promise<void>;
stopDraftLane: (lane: DraftLaneState) => Promise<void>;
editPreview: (params: {
clearDraftLane: (lane: DraftLaneState) => Promise<void>;
editStreamMessage: (params: {
laneName: LaneName;
messageId: number;
text: string;
context: "final" | "update";
previewButtons?: TelegramInlineButtons;
buttons?: TelegramInlineButtons;
}) => Promise<void>;
deletePreviewMessage: (messageId: number) => Promise<void>;
log: (message: string) => void;
markDelivered: () => void;
now?: () => number;
// Force fresh final when a visible non-preview message has been delivered
// since the active preview was created, even if the preview is younger
// than the long-lived threshold (#76529).
getLastVisibleNonPreviewDeliveryAtMs?: () => number | undefined;
};
type DeliverLaneTextParams = {
@@ -124,55 +61,7 @@ type DeliverLaneTextParams = {
text: string;
payload: ReplyPayload;
infoKind: string;
previewButtons?: TelegramInlineButtons;
allowPreviewUpdateForNonFinal?: boolean;
};
type TryUpdatePreviewParams = {
lane: DraftLaneState;
laneName: LaneName;
text: string;
previewButtons?: TelegramInlineButtons;
stopBeforeEdit?: boolean;
updateLaneSnapshot?: boolean;
skipRegressive: RegressiveSkipMode;
context: "final" | "update";
previewMessageId?: number;
previewTextSnapshot?: string;
};
type PreviewEditResult = "edited" | "retained" | "regressive-skipped" | "fallback";
type ConsumeArchivedAnswerPreviewParams = {
lane: DraftLaneState;
text: string;
payload: ReplyPayload;
previewButtons?: TelegramInlineButtons;
canEditViaPreview: boolean;
};
type PreviewUpdateContext = "final" | "update";
type RegressiveSkipMode = "always" | "existingOnly" | "never";
type ResolvePreviewTargetParams = {
lane: DraftLaneState;
previewMessageIdOverride?: number;
stopBeforeEdit: boolean;
context: PreviewUpdateContext;
};
type PreviewTargetResolution = {
hadPreviewMessage: boolean;
previewMessageId: number | undefined;
stopCreatesFirstPreview: boolean;
};
type TelegramPreviewFinalEdit = {
laneName: LaneName;
messageId: number;
text: string;
context: "final" | "update";
previewButtons?: TelegramInlineButtons;
buttons?: TelegramInlineButtons;
};
function result(
@@ -192,485 +81,113 @@ function result(
return { kind };
}
function shouldSkipRegressivePreviewUpdate(args: {
currentPreviewText: string | undefined;
text: string;
skipRegressive: RegressiveSkipMode;
hadPreviewMessage: boolean;
}): boolean {
const currentPreviewText = args.currentPreviewText;
if (currentPreviewText === undefined) {
return false;
}
if (args.skipRegressive === "never") {
return false;
}
return (
currentPreviewText.startsWith(args.text) &&
args.text.length < currentPreviewText.length &&
(args.skipRegressive === "always" || args.hadPreviewMessage)
);
}
function isLongLivedPreview(visibleSinceMs: number | undefined, nowMs: number): boolean {
return (
typeof visibleSinceMs === "number" &&
Number.isFinite(visibleSinceMs) &&
nowMs - visibleSinceMs >= LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS
);
}
function compactPreviewFinalChunks(chunks: readonly string[]): string[] {
const result: string[] = [];
let pendingWhitespace = "";
function compactChunks(chunks: readonly string[]): string[] {
const out: string[] = [];
let whitespace = "";
for (const chunk of chunks) {
if (!chunk) {
continue;
}
if (chunk.trim().length === 0) {
pendingWhitespace += chunk;
whitespace += chunk;
continue;
}
result.push(`${pendingWhitespace}${chunk}`);
pendingWhitespace = "";
out.push(`${whitespace}${chunk}`);
whitespace = "";
}
if (pendingWhitespace && result.length > 0) {
result[result.length - 1] = `${result[result.length - 1]}${pendingWhitespace}`;
if (whitespace && out.length > 0) {
out[out.length - 1] = `${out[out.length - 1]}${whitespace}`;
}
return result;
}
function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTargetResolution {
const lanePreviewMessageId = params.lane.stream?.messageId();
const previewMessageId =
typeof params.previewMessageIdOverride === "number"
? params.previewMessageIdOverride
: lanePreviewMessageId;
const hadPreviewMessage =
typeof params.previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number";
return {
hadPreviewMessage,
previewMessageId: typeof previewMessageId === "number" ? previewMessageId : undefined,
stopCreatesFirstPreview:
params.stopBeforeEdit && !hadPreviewMessage && params.context === "final",
};
return out;
}
export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText;
const readNow = () => params.now?.() ?? Date.now();
const markActivePreviewComplete = (laneName: LaneName) => {
params.activePreviewLifecycleByLane[laneName] = "complete";
params.retainPreviewOnCleanupByLane[laneName] = true;
};
const isMessagePreviewLane = (lane: DraftLaneState) => lane.stream != null;
const wasVisiblyOverwrittenSince = (visibleSinceMs: number | undefined): boolean => {
if (typeof visibleSinceMs !== "number") {
return false;
}
const lastNonPreviewAt = params.getLastVisibleNonPreviewDeliveryAtMs?.();
return typeof lastNonPreviewAt === "number" && lastNonPreviewAt > visibleSinceMs;
};
const shouldUseFreshFinalForLane = (lane: DraftLaneState) => {
if (!isMessagePreviewLane(lane)) {
return false;
}
const visibleSinceMs = lane.stream?.visibleSinceMs?.();
return (
isLongLivedPreview(visibleSinceMs, readNow()) || wasVisiblyOverwrittenSince(visibleSinceMs)
);
};
const shouldUseFreshFinalForPreview = (lane: DraftLaneState, visibleSinceMs?: number) =>
isMessagePreviewLane(lane) &&
(isLongLivedPreview(visibleSinceMs, readNow()) || wasVisiblyOverwrittenSince(visibleSinceMs));
const buildFollowUpPayload = (payload: ReplyPayload, text: string) =>
const followUpPayload = (payload: ReplyPayload, text: string) =>
params.applyTextToFollowUpPayload
? params.applyTextToFollowUpPayload(payload, text)
: params.applyTextToPayload(payload, text);
const clearActivePreviewAfterFreshFinal = async (lane: DraftLaneState, laneName: LaneName) => {
try {
await lane.stream?.clear();
} catch (err) {
params.log(`telegram: ${laneName} fresh final preview cleanup failed: ${String(err)}`);
const clearUnfinalizedStream = async (lane: DraftLaneState) => {
if (!lane.stream || lane.finalized) {
return;
}
await params.clearDraftLane(lane);
lane.lastPartialText = "";
lane.hasStreamedMessage = false;
lane.stream?.forceNewMessage();
};
const tryEditPreviewMessage = async (args: {
laneName: LaneName;
messageId: number;
text: string;
context: "final" | "update";
previewButtons?: TelegramInlineButtons;
updateLaneSnapshot: boolean;
lane: DraftLaneState;
finalTextAlreadyLanded: boolean;
retainAlternatePreviewOnMissingTarget: boolean;
targetPreviewText: string;
}): Promise<PreviewEditResult> => {
const previewEditState: { result: PreviewEditResult } = { result: "fallback" };
const adapter = defineFinalizableLivePreviewAdapter<
{ text: string },
number,
TelegramPreviewFinalEdit
>({
draft: {
flush: async () => {},
clear: async () => {},
id: () => args.messageId,
},
buildFinalEdit: (payload) => ({
laneName: args.laneName,
messageId: args.messageId,
text: payload.text,
context: args.context,
...(args.previewButtons ? { previewButtons: args.previewButtons } : {}),
}),
editFinal: async (_messageId, edit) => {
try {
await params.editPreview(edit);
} catch (err) {
if (isMessageNotModifiedError(err)) {
params.log(
`telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`,
);
return;
}
throw err;
}
},
createPreviewReceipt: (messageId) => createPreviewMessageReceipt({ id: messageId }),
onPreviewFinalized: () => {
if (args.updateLaneSnapshot) {
args.lane.lastPartialText = args.text;
}
params.markDelivered();
previewEditState.result = "edited";
},
handlePreviewEditError: ({ error: err }) => {
previewEditState.result = "fallback";
if (isMessageNotModifiedError(err)) {
params.log(
`telegram: ${args.laneName} preview ${args.context} edit returned "message is not modified"; treating as delivered`,
);
params.markDelivered();
previewEditState.result = "edited";
return "retain";
}
if (args.context === "final") {
if (args.finalTextAlreadyLanded) {
params.log(
`telegram: ${args.laneName} preview final edit failed after stop flush; keeping existing preview (${String(err)})`,
);
params.markDelivered();
previewEditState.result = "retained";
return "retain";
}
if (isSafeToRetrySendError(err)) {
params.log(
`telegram: ${args.laneName} preview final edit failed before reaching Telegram; falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isMissingPreviewMessageError(err)) {
if (args.retainAlternatePreviewOnMissingTarget) {
params.log(
`telegram: ${args.laneName} preview final edit target missing; keeping alternate preview without fallback (${String(err)})`,
);
params.markDelivered();
previewEditState.result = "retained";
return "retain";
}
params.log(
`telegram: ${args.laneName} preview final edit target missing with no alternate preview; falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isRecoverableTelegramNetworkError(err, { allowMessageMatch: true })) {
params.log(
`telegram: ${args.laneName} preview final edit may have landed despite network error; keeping existing preview (${String(err)})`,
);
params.markDelivered();
previewEditState.result = "retained";
return "retain";
}
if (isTelegramClientRejection(err)) {
params.log(
`telegram: ${args.laneName} preview final edit rejected by Telegram (client error); falling back to standard send (${String(err)})`,
);
return "fallback";
}
if (isIncompleteFinalPreviewPrefix(args.targetPreviewText, args.text)) {
params.log(
`telegram: ${args.laneName} preview final edit failed and existing preview is an incomplete prefix; falling back to standard send (${String(err)})`,
);
return "fallback";
}
// Default: ambiguous error — retain when fallback may duplicate a final
// edit that already landed or when the preview is not known-incomplete.
params.log(
`telegram: ${args.laneName} preview final edit failed with ambiguous error; keeping existing preview to avoid duplicate (${String(err)})`,
);
params.markDelivered();
previewEditState.result = "retained";
return "retain";
}
params.log(
`telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`,
);
return "fallback";
},
});
const delivered = await deliverWithFinalizableLivePreviewAdapter({
kind: "final",
payload: { text: args.text },
adapter,
deliverNormally: async () => false,
});
if (delivered.kind === "preview-finalized" || previewEditState.result === "edited") {
return "edited";
const streamText = async (
laneName: LaneName,
lane: DraftLaneState,
text: string,
payload: ReplyPayload,
isFinal: boolean,
buttons?: TelegramInlineButtons,
): Promise<LaneDeliveryResult | undefined> => {
const stream = lane.stream;
if (!stream || text.length === 0 || payload.isError) {
return undefined;
}
if (delivered.kind === "preview-retained") {
return "retained";
const chunks =
text.length > params.draftMaxChars
? compactChunks(params.splitFinalTextForStream?.(text) ?? [])
: [text];
const [firstChunk, ...remainingChunks] = chunks;
if (!firstChunk || firstChunk.length > params.draftMaxChars) {
return undefined;
}
return "fallback";
};
const tryDeliverLongFinalThroughPreview = async (args: {
lane: DraftLaneState;
laneName: LaneName;
text: string;
payload: ReplyPayload;
previewButtons?: TelegramInlineButtons;
}): Promise<LaneDeliveryResult | undefined> => {
lane.lastPartialText = firstChunk;
lane.hasStreamedMessage = true;
lane.finalized = false;
stream.update(firstChunk);
if (isFinal) {
await params.stopDraftLane(lane);
} else {
await params.flushDraftLane(lane);
}
const messageId = stream.messageId();
if (typeof messageId !== "number") {
if (isFinal && stream.sendMayHaveLanded?.()) {
lane.finalized = true;
params.markDelivered();
return result("preview-retained");
}
return undefined;
}
const deliveredStreamText = stream.lastDeliveredText?.();
if (
!args.lane.stream ||
args.previewButtons !== undefined ||
params.activePreviewLifecycleByLane[args.laneName] !== "transient"
isFinal &&
deliveredStreamText !== undefined &&
deliveredStreamText !== firstChunk.trimEnd()
) {
return undefined;
}
const chunks = compactPreviewFinalChunks(params.splitFinalTextForPreview?.(args.text) ?? []);
const [firstChunk, ...remainingChunks] = chunks;
if (!firstChunk || remainingChunks.length === 0 || firstChunk.length > params.draftMaxChars) {
return undefined;
}
await params.flushDraftLane(args.lane);
const previewMessageId = args.lane.stream.messageId();
if (typeof previewMessageId !== "number") {
return undefined;
}
const finalized = await tryUpdatePreviewForLane({
lane: args.lane,
laneName: args.laneName,
text: firstChunk,
stopBeforeEdit: true,
updateLaneSnapshot: true,
skipRegressive: "never",
context: "final",
});
if (finalized === "fallback") {
return undefined;
}
if (finalized === "retained") {
markActivePreviewComplete(args.laneName);
return result("preview-retained");
}
markActivePreviewComplete(args.laneName);
const remainingText = remainingChunks.join("");
if (remainingText.trim().length > 0) {
await params.sendPayload(buildFollowUpPayload(args.payload, remainingText));
}
return result("preview-finalized", {
content: args.text,
messageId: previewMessageId,
});
};
const tryUpdatePreviewForLane = async ({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit = false,
updateLaneSnapshot = false,
skipRegressive,
context,
previewMessageId: previewMessageIdOverride,
previewTextSnapshot,
}: TryUpdatePreviewParams): Promise<PreviewEditResult> => {
const editPreview = (
messageId: number,
finalTextAlreadyLanded: boolean,
retainAlternatePreviewOnMissingTarget: boolean,
targetPreviewText: string,
) =>
tryEditPreviewMessage({
laneName,
messageId,
text,
context,
previewButtons,
updateLaneSnapshot,
lane,
finalTextAlreadyLanded,
retainAlternatePreviewOnMissingTarget,
targetPreviewText,
});
const finalizePreview = (
previewMessageId: number,
finalTextAlreadyLanded: boolean,
hadPreviewMessage: boolean,
retainAlternatePreviewOnMissingTarget = false,
): PreviewEditResult | Promise<PreviewEditResult> => {
const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane);
const shouldSkipRegressive = shouldSkipRegressivePreviewUpdate({
currentPreviewText,
text,
skipRegressive,
hadPreviewMessage,
});
if (shouldSkipRegressive) {
params.markDelivered();
return "regressive-skipped";
}
return editPreview(
previewMessageId,
finalTextAlreadyLanded,
retainAlternatePreviewOnMissingTarget,
currentPreviewText,
);
};
if (!lane.stream) {
return "fallback";
}
const previewTargetBeforeStop = resolvePreviewTarget({
lane,
previewMessageIdOverride,
stopBeforeEdit,
context,
});
if (previewTargetBeforeStop.stopCreatesFirstPreview && lane.hasStreamedMessage) {
// Final stop() can create the first visible preview message.
// Prime pending text so the stop flush sends the final text snapshot.
lane.stream.update(text);
await params.stopDraftLane(lane);
const previewTargetAfterStop = resolvePreviewTarget({
lane,
stopBeforeEdit: false,
context,
});
if (typeof previewTargetAfterStop.previewMessageId !== "number") {
return "fallback";
}
return finalizePreview(previewTargetAfterStop.previewMessageId, true, false);
}
if (stopBeforeEdit) {
await params.stopDraftLane(lane);
}
const previewTargetAfterStop = resolvePreviewTarget({
lane,
previewMessageIdOverride,
stopBeforeEdit: false,
context,
});
if (typeof previewTargetAfterStop.previewMessageId !== "number") {
// Only retain for final delivery when a prior preview is already visible
// to the user — otherwise falling back is safer than silence. For updates,
// always fall back so the caller can attempt sendPayload without stale
// markDelivered() state.
if (context === "final" && lane.hasStreamedMessage && lane.stream?.sendMayHaveLanded?.()) {
params.log(
`telegram: ${laneName} preview send may have landed despite missing message id; keeping to avoid duplicate`,
);
params.markDelivered();
return "retained";
}
return "fallback";
}
const activePreviewMessageId = lane.stream?.messageId();
return finalizePreview(
previewTargetAfterStop.previewMessageId,
false,
previewTargetAfterStop.hadPreviewMessage,
typeof activePreviewMessageId === "number" &&
activePreviewMessageId !== previewTargetAfterStop.previewMessageId,
);
};
const consumeArchivedAnswerPreviewForFinal = async ({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
}: ConsumeArchivedAnswerPreviewParams): Promise<LaneDeliveryResult | undefined> => {
const archivedPreview = params.archivedAnswerPreviews.shift();
if (!archivedPreview) {
return undefined;
}
if (canEditViaPreview && shouldUseFreshFinalForPreview(lane, archivedPreview.visibleSinceMs)) {
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
if (delivered) {
try {
await params.deletePreviewMessage(archivedPreview.messageId);
} catch (err) {
params.log(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
return result("sent");
}
}
if (canEditViaPreview) {
const finalized = await tryUpdatePreviewForLane({
lane,
laneName: "answer",
text,
previewButtons,
stopBeforeEdit: false,
skipRegressive: "existingOnly",
context: "final",
previewMessageId: archivedPreview.messageId,
previewTextSnapshot: archivedPreview.textSnapshot,
});
if (finalized === "edited") {
return result("preview-finalized", {
content: text,
messageId: archivedPreview.messageId,
});
}
if (finalized === "regressive-skipped") {
return result("preview-finalized", {
content: archivedPreview.textSnapshot,
messageId: archivedPreview.messageId,
});
}
if (finalized === "retained") {
params.retainPreviewOnCleanupByLane.answer = true;
return result("preview-retained");
}
}
// Send the replacement message first, then clean up the old preview.
// This avoids the visual "disappear then reappear" flash.
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
// Once this archived preview is consumed by a fallback final send, delete it
// regardless of deleteIfUnused. That flag only applies to unconsumed boundaries.
if (delivered || archivedPreview.deleteIfUnused !== false) {
params.markDelivered();
if (buttons) {
try {
await params.deletePreviewMessage(archivedPreview.messageId);
await params.editStreamMessage({ laneName, messageId, text: firstChunk, buttons });
} catch (err) {
params.log(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
params.log(`telegram: ${laneName} stream button edit failed: ${String(err)}`);
}
}
return delivered ? result("sent") : result("skipped");
if (isFinal) {
lane.finalized = true;
for (const chunk of remainingChunks) {
if (chunk.trim().length === 0) {
continue;
}
await params.sendPayload(followUpPayload(payload, chunk));
}
return result("preview-finalized", { content: text, messageId });
}
return result("preview-updated");
};
return async ({
@@ -678,132 +195,28 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
text,
payload,
infoKind,
previewButtons,
allowPreviewUpdateForNonFinal = false,
buttons,
}: DeliverLaneTextParams): Promise<LaneDeliveryResult> => {
const lane = params.lanes[laneName];
const reply = resolveSendableOutboundReplyParts(payload, { text });
const hasMedia = reply.hasMedia;
const canEditViaPreview =
!hasMedia && text.length > 0 && text.length <= params.draftMaxChars && !payload.isError;
if (infoKind === "final") {
// Transient previews must decide cleanup retention per final attempt.
// Completed previews intentionally stay retained so later extra payloads
// do not clear the already-finalized message.
if (params.activePreviewLifecycleByLane[laneName] === "transient") {
params.retainPreviewOnCleanupByLane[laneName] = false;
}
if (laneName === "answer") {
const archivedResult = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResult) {
return archivedResult;
}
}
if (canEditViaPreview && params.activePreviewLifecycleByLane[laneName] === "transient") {
await params.flushDraftLane(lane);
if (laneName === "answer") {
const archivedResultAfterFlush = await consumeArchivedAnswerPreviewForFinal({
lane,
text,
payload,
previewButtons,
canEditViaPreview,
});
if (archivedResultAfterFlush) {
return archivedResultAfterFlush;
}
}
if (shouldUseFreshFinalForLane(lane)) {
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
if (delivered) {
await clearActivePreviewAfterFreshFinal(lane, laneName);
return result("sent");
}
}
const previewMessageId = lane.stream?.messageId();
const finalized = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: true,
skipRegressive: "existingOnly",
context: "final",
});
const finalizedMessageId = previewMessageId ?? lane.stream?.messageId();
if (finalized === "edited") {
markActivePreviewComplete(laneName);
if (typeof finalizedMessageId !== "number") {
return result("preview-retained");
}
return result("preview-finalized", {
content: text,
messageId: finalizedMessageId,
});
}
if (finalized === "regressive-skipped") {
markActivePreviewComplete(laneName);
if (typeof finalizedMessageId !== "number") {
return result("preview-retained");
}
return result("preview-finalized", {
content: lane.lastPartialText,
messageId: finalizedMessageId,
});
}
if (finalized === "retained") {
markActivePreviewComplete(laneName);
return result("preview-retained");
}
} else if (!hasMedia && !payload.isError && text.length > params.draftMaxChars) {
const longFinalResult = await tryDeliverLongFinalThroughPreview({
lane,
laneName,
text,
payload,
previewButtons,
});
if (longFinalResult) {
return longFinalResult;
}
params.log(
`telegram: preview final too long for edit (${text.length} > ${params.draftMaxChars}); falling back to standard send`,
);
}
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: true,
});
return delivered ? result("sent") : result("skipped");
const isFinal = infoKind === "final";
const streamed = !reply.hasMedia
? await streamText(laneName, lane, text, payload, isFinal, buttons)
: undefined;
if (streamed) {
return streamed;
}
if (allowPreviewUpdateForNonFinal && canEditViaPreview) {
const updated = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: false,
updateLaneSnapshot: true,
skipRegressive: "always",
context: "update",
});
if (updated === "edited" || updated === "regressive-skipped") {
return result("preview-updated");
}
if (isFinal) {
await clearUnfinalizedStream(lane);
}
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text), {
durable: isFinal,
});
if (delivered && isFinal) {
lane.finalized = true;
}
return delivered ? result("sent") : result("skipped");
};
}

View File

@@ -1,9 +1,7 @@
export {
type ArchivedPreview,
createLaneTextDeliverer,
type DraftLaneState,
type LaneDeliveryResult,
type LaneName,
type LanePreviewLifecycle,
} from "./lane-delivery-text-deliverer.js";
export { createLaneDeliveryStateTracker } from "./lane-delivery-state.js";