// File: openclaw/extensions/telegram/src/draft-stream.ts
// Snapshot: 2026-06-13 21:45:22 +05:30 (438 lines, 15 KiB, TypeScript)

// Telegram plugin module that implements draft-stream behavior: a preview
// message that is sent once and then edited in place as new text arrives.
import type { Bot } from "grammy";
import {
createFinalizableDraftStreamControlsForState,
takeMessageIdAfterStop,
} from "openclaw/plugin-sdk/channel-outbound";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
import {
isRecoverableTelegramNetworkError,
isSafeToRetrySendError,
isTelegramClientRejection,
isTelegramMessageNotModifiedError,
isTelegramRateLimitError,
readTelegramRetryAfterMs,
} from "./network-errors.js";
import { normalizeTelegramReplyToMessageId } from "./outbound-params.js";
import {
buildTelegramRichMarkdown,
TELEGRAM_RICH_TEXT_LIMIT,
getTelegramRichRawApi,
type TelegramInputRichMessage,
type TelegramSendRichMessageParams,
} from "./rich-message.js";
// Throttle interval (ms) used when the caller does not pass `throttleMs`.
const DEFAULT_THROTTLE_MS = 1_000;
// Upper bound on the rendered length of one preview message; identical to the
// rich-text limit exported by ./rich-message.js.
const TELEGRAM_STREAM_MAX_CHARS = TELEGRAM_RICH_TEXT_LIMIT;
// A retryable preview failure leaves the newest text pending for the next
// throttle tick. Consecutive misses are capped so that a persistent outage
// stops the preview rather than emitting warnings for the rest of the run.
const MAX_CONSECUTIVE_PREVIEW_FAILURES = 3;
// Longest flood-control suspension honored, in ms. A larger retry_after is
// clamped to this value so the preview is never frozen past the point where
// it is still useful (or past the run's lifetime).
const MAX_PREVIEW_FLOOD_SUSPEND_MS = 60 * 1_000;
/**
 * Handle returned by the draft-stream factory in this module. Required members
 * come first; optional members may be absent on other implementations.
 */
export type TelegramDraftStream = {
  /** Submit the latest full text to show in the preview. */
  update: (text: string) => void;
  /** Flush the pending preview update. */
  flush: () => Promise<void>;
  /** Stop streaming after a final delivery attempt of the latest text. */
  stop: () => Promise<void>;
  /** Stop streaming and delete the preview message if one exists. */
  clear: () => Promise<void>;
  /** Id of the current preview message, or undefined when none has been sent. */
  messageId: () => number | undefined;
  /** Reset internal state so the next update creates a new message instead of editing. */
  forceNewMessage: () => void;
  /** Stop without a final flush or delete. */
  discard?: () => Promise<void>;
  /** Return the current preview message id after pending updates settle. */
  materialize?: () => Promise<number | undefined>;
  /** Epoch milliseconds recorded for the current preview message, if any. */
  visibleSinceMs?: () => number | undefined;
  /** Counter that grows with every successful preview send or edit. */
  previewRevision?: () => number;
  /** Text most recently recorded as delivered to Telegram. */
  lastDeliveredText?: () => string;
  /** True when a preview sendMessage was attempted but the response was lost. */
  sendMayHaveLanded?: () => boolean;
};
/**
 * A rendered preview. `richMessage` is the payload handed to the rich
 * send/edit calls; `text` is the rendered text whose length is checked against
 * the per-message limit.
 */
type TelegramDraftPreview = {
  richMessage: TelegramInputRichMessage;
  text: string;
};
/**
 * Description of a preview message that the stream no longer tracks, passed to
 * the `onSupersededPreview` callback.
 */
type SupersededTelegramPreview = {
  /** Telegram id of the superseded preview message. */
  messageId: number;
  /** Text that the superseded message was showing. */
  textSnapshot: string;
  /** This module always passes `true`; presumably "keep the message" — confirm with the consumer. */
  retain?: boolean;
  /** Epoch milliseconds recorded for that message, when known. */
  visibleSinceMs?: number;
};
/**
 * Renders draft text into a preview.
 *
 * Trailing whitespace is removed first. If `renderText` is supplied and returns
 * a non-nullish value, that value is used as-is; otherwise the trimmed text is
 * wrapped with `buildTelegramRichMarkdown`.
 *
 * @param text Raw draft text.
 * @param renderText Optional custom renderer, called with the trimmed text.
 * @returns The preview to send.
 */
function renderTelegramDraftPreview(
  text: string,
  renderText: ((text: string) => TelegramDraftPreview) | undefined,
): TelegramDraftPreview {
  const source = text.trimEnd();
  const custom = renderText?.(source);
  if (custom != null) {
    return custom;
  }
  return { text: source, richMessage: buildTelegramRichMarkdown(source) };
}
/**
 * Builds the dedupe key for a preview: the JSON serialization of its rich
 * payload. The plain `text` field does not take part in the key.
 */
function telegramDraftPreviewKey(preview: TelegramDraftPreview): string {
  const { richMessage } = preview;
  return JSON.stringify(richMessage);
}
/**
 * Finds the longest prefix of `text` (counted in UTF-16 code units) whose
 * rendered, right-trimmed form is non-empty and at most `maxChars` long.
 *
 * The search is a binary search, so it assumes that rendering a longer prefix
 * never yields a shorter result than rendering a shorter prefix.
 *
 * The returned cut never falls between the two halves of a surrogate pair:
 * callers slice `text` at this index, and a cut inside a pair would leave a
 * lone high surrogate at the end of one message and a lone low surrogate at
 * the start of the next.
 *
 * @param text Draft text to cut.
 * @param maxChars Maximum rendered length allowed for one message.
 * @param renderText Optional custom renderer forwarded to the preview renderer.
 * @returns The prefix length to use, or 0 when no usable prefix exists.
 */
function findTelegramDraftChunkLength(
  text: string,
  maxChars: number,
  renderText: ((text: string) => TelegramDraftPreview) | undefined,
): number {
  let best = 0;
  let low = 1;
  let high = text.length;
  while (low <= high) {
    const mid = Math.floor((low + high) / 2);
    const renderedText = renderTelegramDraftPreview(text.slice(0, mid), renderText).text.trimEnd();
    if (renderedText && renderedText.length <= maxChars) {
      best = mid;
      low = mid + 1;
    } else {
      high = mid - 1;
    }
  }
  if (best > 0 && best < text.length) {
    const before = text.charCodeAt(best - 1);
    const after = text.charCodeAt(best);
    const splitsSurrogatePair =
      before >= 0xd800 && before <= 0xdbff && after >= 0xdc00 && after <= 0xdfff;
    if (splitsSurrogatePair) {
      // Drop the dangling high surrogate; the shorter prefix still fits.
      best -= 1;
    }
  }
  return best;
}
/**
 * Creates a streamed "draft" preview for one Telegram chat.
 *
 * The first accepted update is sent as a new rich message (`sendRichMessage`);
 * later updates edit that same message (`editMessageText`). All state lives in
 * this closure. The throttling loop itself comes from
 * `createFinalizableDraftStreamControlsForState`, which is handed
 * `sendOrEditStreamMessage` as its delivery callback.
 *
 * Failure handling, as implemented below:
 * - Rate-limit errors, recoverable network errors on edits, and safe-to-retry
 *   errors on the first send are retried on a later tick, up to
 *   MAX_CONSECUTIVE_PREVIEW_FAILURES consecutive failures.
 * - A retry_after hint suspends non-final sends, clamped to
 *   MAX_PREVIEW_FLOOD_SUSPEND_MS.
 * - Any other failure marks the stream as stopped.
 *
 * @param params Stream configuration; see the per-field comments.
 * @returns The stream handle. Every optional member of TelegramDraftStream is provided.
 */
export function createTelegramDraftStream(params: {
/** Bot API object; used directly for deleteMessage and, through getTelegramRichRawApi, for rich send/edit. */
api: Bot["api"];
/** Chat that receives the preview. */
chatId: Parameters<Bot["api"]["sendMessage"]>[0];
/** Limit on the rendered length of one preview message; clamped to TELEGRAM_STREAM_MAX_CHARS. */
maxChars?: number;
/** Thread placement; converted with buildTelegramThreadParams and attached to the first send. */
thread?: TelegramThreadSpec | null;
/** Message the first preview send replies to (normalized before use). */
replyToMessageId?: number;
/** Throttle interval in ms; defaults to DEFAULT_THROTTLE_MS and is raised to at least 250. */
throttleMs?: number;
/** Minimum chars before sending first message (debounce for push notifications) */
minInitialChars?: number;
/** Optional preview renderer (e.g. markdown -> HTML + parse mode). */
renderText?: (text: string) => TelegramDraftPreview;
/** Called when a late send resolves after forceNewMessage() switched generations. */
onSupersededPreview?: (preview: SupersededTelegramPreview) => void;
/** Optional sink for informational messages. */
log?: (message: string) => void;
/** Optional sink for warnings (retries, stops, cleanup failures). */
warn?: (message: string) => void;
}): TelegramDraftStream {
// A caller-supplied limit can lower the cap but never raise it.
const maxChars = Math.min(
params.maxChars ?? TELEGRAM_STREAM_MAX_CHARS,
TELEGRAM_STREAM_MAX_CHARS,
);
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const minInitialChars = params.minInitialChars;
const chatId = params.chatId;
const threadParams = buildTelegramThreadParams(params.thread);
const replyToMessageId = normalizeTelegramReplyToMessageId(params.replyToMessageId);
// Extra parameters for the first send only; edits pass chat_id, message_id and rich_message.
const richReplyParams: Omit<TelegramSendRichMessageParams, "chat_id" | "rich_message"> =
replyToMessageId != null
? {
...threadParams,
reply_parameters: {
message_id: replyToMessageId,
allow_sending_without_reply: true,
},
}
: (threadParams ?? {});
// Shared with the throttle-loop controls. `stopped` ends delivery; `final` marks the closing flush.
const streamState = { stopped: false, final: false };
// Set just before the first send is issued; cleared again when the failure proves nothing was sent.
let messageSendAttempted = false;
// Epoch ms before which non-final sends are skipped (flood-control suspension).
let suspendedUntilMs = 0;
let consecutivePreviewFailures = 0;
// Id of the preview message currently being edited; undefined until a send succeeds.
let streamMessageId: number | undefined;
let streamVisibleSinceMs: number | undefined;
// Dedupe key of the last preview handed to the transport (set optimistically, rolled back on failure).
let lastSentPreviewKey = "";
// Full text (not just the current message's slice) recorded at the last successful delivery.
let lastDeliveredText = "";
// Latest text passed to update(); stop() uses it for the closing delivery.
let lastRequestedText = "";
let previewRevision = 0;
// Bumped by resetStreamToNewMessage so an in-flight send can detect that it was superseded.
let generation = 0;
// Index into the full text where the current preview message starts.
let deliveredTextOffset = 0;
type PreviewSendParams = {
preview: TelegramDraftPreview;
sendGeneration: number;
};
// Sends a brand-new preview message carrying the reply/thread parameters.
const sendRenderedMessage = async (preview: TelegramDraftPreview) => {
const richRawApi = getTelegramRichRawApi(params.api);
return await richRawApi.sendRichMessage({
chat_id: chatId,
rich_message: preview.richMessage,
...richReplyParams,
});
};
// Transport step: edits the existing preview message, or sends the first one.
// Resolves true when the preview was handed to Telegram, false when the stream
// had to stop because the send returned no usable message id. Errors propagate.
const sendMessageTransportPreview = async ({
preview,
sendGeneration,
}: PreviewSendParams): Promise<boolean> => {
if (typeof streamMessageId === "number") {
streamVisibleSinceMs ??= Date.now();
const richRawApi = getTelegramRichRawApi(params.api);
await richRawApi.editMessageText({
chat_id: chatId,
message_id: streamMessageId,
rich_message: preview.richMessage,
});
return true;
}
messageSendAttempted = true;
let sent: Awaited<ReturnType<typeof sendRenderedMessage>>;
try {
sent = await sendRenderedMessage(preview);
} catch (err) {
// These two error classes clear the flag; any other failure leaves it set,
// and sendMayHaveLanded() keeps reporting true.
if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) {
messageSendAttempted = false;
}
throw err;
}
const sentMessageId = sent?.message_id;
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
streamState.stopped = true;
params.warn?.("telegram stream preview stopped (missing message id from sendMessage)");
return false;
}
const normalizedMessageId = Math.trunc(sentMessageId);
const visibleSinceMs = Date.now();
// The stream was reset while this send was in flight: report the message to
// the caller instead of adopting it as the current preview.
if (sendGeneration !== generation) {
params.onSupersededPreview?.({
messageId: normalizedMessageId,
textSnapshot: preview.text,
visibleSinceMs,
retain: true,
});
return true;
}
streamMessageId = normalizedMessageId;
streamVisibleSinceMs = visibleSinceMs;
return true;
};
// Stops the stream because not even a prefix of the text fits into one message.
const stopOversizedPreview = (renderedText: string): false => {
streamState.stopped = true;
params.warn?.(
`telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`,
);
return false;
};
// Delivery callback for the throttle loop; stop() also calls it directly.
// `text` is the full text so far; only the part from deliveredTextOffset on
// belongs to the current preview message. Resolves true when the preview is up
// to date (delivered, or identical to what was last sent) and false otherwise.
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
if (streamState.stopped && !streamState.final) {
return false;
}
// Flood-control suspension: returning false keeps the newest text pending,
// so the first tick after retry_after delivers it. Final flushes still try
// so the last text has a chance to land.
if (!streamState.final && Date.now() < suspendedUntilMs) {
return false;
}
const trimmed = text.trimEnd();
if (!trimmed) {
return false;
}
const currentText = trimmed.slice(deliveredTextOffset).trimStart();
if (!currentText) {
return false;
}
const rendered = renderTelegramDraftPreview(currentText, params.renderText);
const renderedText = rendered.text.trimEnd();
const renderedPreview = { ...rendered, text: renderedText };
const renderedPreviewKey = telegramDraftPreviewKey(renderedPreview);
if (!renderedText) {
return false;
}
if (renderedText.length > maxChars) {
const chunkLength = findTelegramDraftChunkLength(currentText, maxChars, params.renderText);
// While streaming: show the longest prefix that fits and leave the rest for later.
if (!streamState.final) {
if (chunkLength > 0) {
return await sendOrEditStreamMessage(
trimmed.slice(0, deliveredTextOffset) + currentText.slice(0, chunkLength),
);
}
return stopOversizedPreview(renderedText);
}
// Final flush, and the current message already shows part of this text:
// leave that message as it is, move the offset past what it delivered, and
// continue in a fresh message. resetStreamToNewMessage is declared further
// down; it is initialized before this callback can run.
if (lastDeliveredText.length > deliveredTextOffset) {
const supersededMessageId = streamMessageId;
const supersededTextSnapshot = lastDeliveredText.slice(deliveredTextOffset);
const supersededVisibleSinceMs = streamVisibleSinceMs;
deliveredTextOffset = lastDeliveredText.length;
resetStreamToNewMessage({ keepFinal: true, keepPending: true, resetOffset: false });
if (typeof supersededMessageId === "number") {
params.onSupersededPreview?.({
messageId: supersededMessageId,
textSnapshot: supersededTextSnapshot,
visibleSinceMs: supersededVisibleSinceMs,
retain: true,
});
}
return await sendOrEditStreamMessage(trimmed);
}
// Final flush with nothing delivered for this message yet: send the first
// chunk, then recurse so the branch above rolls over to the next message.
if (chunkLength > 0) {
const sent = await sendOrEditStreamMessage(
trimmed.slice(0, deliveredTextOffset) + currentText.slice(0, chunkLength),
);
if (!sent) {
return false;
}
return await sendOrEditStreamMessage(trimmed);
}
return stopOversizedPreview(renderedText);
}
// Same rich payload as the last one handed to the transport: nothing to do.
if (renderedPreviewKey === lastSentPreviewKey) {
return true;
}
const sendGeneration = generation;
// Hold back the very first message until it is long enough; final flushes are exempt.
if (typeof streamMessageId !== "number" && minInitialChars != null && !streamState.final) {
if (renderedText.length < minInitialChars) {
return false;
}
}
// Record the key before awaiting; the catch block restores the previous key on failure.
const previousSentPreviewKey = lastSentPreviewKey;
lastSentPreviewKey = renderedPreviewKey;
try {
const sent = await sendMessageTransportPreview({
preview: renderedPreview,
sendGeneration,
});
if (sent) {
previewRevision += 1;
lastDeliveredText = trimmed;
consecutivePreviewFailures = 0;
suspendedUntilMs = 0;
}
return sent;
} catch (err) {
// Evaluated after the failed call: a number here means the call was an edit.
const isEdit = typeof streamMessageId === "number";
if (isEdit && isTelegramMessageNotModifiedError(err)) {
// Telegram already shows exactly this text; count the edit as delivered.
consecutivePreviewFailures = 0;
lastDeliveredText = trimmed;
return true;
}
// Roll back the dedupe snapshot so the retried tick is not skipped as a no-op.
lastSentPreviewKey = previousSentPreviewKey;
// Flood control is always retryable: Telegram rejected the call outright.
// Beyond that, edits retry on any transient network error (re-editing the
// same content is idempotent) while an unsent first preview retries only
// on provably pre-connect failures — anything ambiguous could duplicate
// the preview message.
const retryable =
isTelegramRateLimitError(err) ||
(isEdit ? isRecoverableTelegramNetworkError(err) : isSafeToRetrySendError(err));
consecutivePreviewFailures += 1;
if (retryable && consecutivePreviewFailures <= MAX_CONSECUTIVE_PREVIEW_FAILURES) {
const retryAfterMs = readTelegramRetryAfterMs(err);
if (retryAfterMs !== undefined) {
suspendedUntilMs = Date.now() + Math.min(retryAfterMs, MAX_PREVIEW_FLOOD_SUSPEND_MS);
}
params.warn?.(
`telegram stream preview ${isEdit ? "edit" : "send"} failed (retrying): ${formatErrorMessage(err)}`,
);
return false;
}
// Not retryable, or the retry budget is used up: stop the stream.
streamState.stopped = true;
params.warn?.(`telegram stream preview failed: ${formatErrorMessage(err)}`);
return false;
}
};
// Throttle-loop controls from the plugin SDK. NOTE(review): presumably the loop
// calls sendOrEditStreamMessage with the newest pending text at most once per
// throttleMs and keeps the text pending on a false result; confirm against the SDK.
const {
loop,
update: updateDraft,
stopForClear,
} = createFinalizableDraftStreamControlsForState({
throttleMs,
state: streamState,
sendOrEditStreamMessage,
});
// Public update(): ignored once the stream is stopped or finalizing.
const update = (text: string) => {
if (streamState.stopped || streamState.final) {
return;
}
lastRequestedText = text;
updateDraft(text);
};
// Public stop(): flush the loop in final mode, then deliver the last requested
// text directly if the flush did not already deliver it.
const stop = async () => {
streamState.final = true;
await loop.flush();
if (streamState.stopped) {
return;
}
const finalText = lastRequestedText.trimEnd();
if (finalText && finalText !== lastDeliveredText.trimEnd()) {
await sendOrEditStreamMessage(finalText);
}
streamState.final = true;
};
// Detaches from the current preview message so the next delivery sends a new
// one. Options: keepFinal keeps the stream in final mode, keepPending leaves the
// loop's pending text alone, and resetOffset === false keeps the delivered
// offset and the last requested text.
const resetStreamToNewMessage: (options?: {
keepFinal?: boolean;
keepPending?: boolean;
resetOffset?: boolean;
}) => void = (options) => {
streamState.stopped = false;
streamState.final = options?.keepFinal === true;
generation += 1;
messageSendAttempted = false;
streamMessageId = undefined;
streamVisibleSinceMs = undefined;
lastSentPreviewKey = "";
if (options?.resetOffset !== false) {
deliveredTextOffset = 0;
lastRequestedText = "";
}
if (!options?.keepPending) {
loop.resetPending();
}
loop.resetThrottleWindow();
};
// Public clear(): stop the stream and delete the preview message. Deletion is
// best effort; a failure is reported through warn and not rethrown.
// NOTE(review): takeMessageIdAfterStop presumably awaits stopForClear before
// reading and clearing the id; confirm against the SDK.
const clear = async () => {
const messageId = await takeMessageIdAfterStop({
stopForClear,
readMessageId: () => streamMessageId,
clearMessageId: () => {
streamMessageId = undefined;
},
});
if (typeof messageId === "number" && Number.isFinite(messageId)) {
try {
await params.api.deleteMessage(chatId, messageId);
params.log?.(`telegram stream preview deleted (chat=${chatId}, message=${messageId})`);
} catch (err) {
params.warn?.(`telegram stream preview cleanup failed: ${formatErrorMessage(err)}`);
}
}
};
// Public discard(): stop via the loop's stopForClear and leave the preview message in place.
const discard = async () => {
await stopForClear();
};
// Public forceNewMessage(): full reset (offset, last requested text and pending text included).
const forceNewMessage = () => {
resetStreamToNewMessage();
};
// Public materialize(): finalize the stream, then report the preview message id (if any).
const materialize = async (): Promise<number | undefined> => {
await stop();
return streamMessageId;
};
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return {
update,
flush: loop.flush,
messageId: () => streamMessageId,
visibleSinceMs: () => streamVisibleSinceMs,
previewRevision: () => previewRevision,
lastDeliveredText: () => lastDeliveredText,
clear,
stop,
discard,
materialize,
forceNewMessage,
// True while a first send was issued and no message id has been adopted.
sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number",
};
}