fix(telegram): remove native draft preview transport

This commit is contained in:
Ayaan Zaidi
2026-04-30 17:49:02 +05:30
parent 4aa08e9d79
commit 2a4dd89253
3 changed files with 8 additions and 266 deletions

View File

@@ -409,8 +409,6 @@ export const dispatchTelegramMessage = async ({
? (replyQuoteMessageId ?? msg.message_id)
: undefined;
const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS;
// DM draft previews still duplicate briefly at materialize time.
const useMessagePreviewTransportForDm = threadSpec?.scope === "dm" && canStreamAnswerDraft;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
const archivedAnswerPreviews: ArchivedPreview[] = [];
const archivedReasoningPreviewIds: number[] = [];
@@ -421,7 +419,6 @@ export const dispatchTelegramMessage = async ({
chatId,
maxChars: draftMaxChars,
thread: threadSpec,
previewTransport: useMessagePreviewTransportForDm ? "message" : "auto",
replyToMessageId: draftReplyToMessageId,
minInitialChars: draftMinInitialChars,
renderText: renderDraftPreview,

View File

@@ -10,21 +10,7 @@ import { normalizeTelegramReplyToMessageId } from "./outbound-params.js";
const TELEGRAM_STREAM_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 1000;
const TELEGRAM_DRAFT_ID_MAX = 2_147_483_647;
const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i;
const DRAFT_METHOD_UNAVAILABLE_RE =
/(unknown method|method .*not (found|available|supported)|unsupported)/i;
const DRAFT_CHAT_UNSUPPORTED_RE = /(can't be used|can be used only)/i;
type TelegramSendMessageDraft = (
chatId: Parameters<Bot["api"]["sendMessage"]>[0],
draftId: number,
text: string,
params?: {
message_thread_id?: number;
parse_mode?: "HTML";
},
) => Promise<unknown>;
type TelegramSendMessageParams = Parameters<Bot["api"]["sendMessage"]>[2];
@@ -38,71 +24,18 @@ function hasNumericMessageThreadId(
);
}
/**
* Keep draft-id allocation shared across bundled chunks so concurrent preview
* lanes do not accidentally reuse draft ids when code-split entries coexist.
*/
const TELEGRAM_DRAFT_STREAM_STATE_KEY = Symbol.for("openclaw.telegramDraftStreamState");
let draftStreamState: { nextDraftId: number } | undefined;
function getDraftStreamState(): { nextDraftId: number } {
if (!draftStreamState) {
const globalStore = globalThis as Record<PropertyKey, unknown>;
draftStreamState = (globalStore[TELEGRAM_DRAFT_STREAM_STATE_KEY] as
| { nextDraftId: number }
| undefined) ?? {
nextDraftId: 0,
};
globalStore[TELEGRAM_DRAFT_STREAM_STATE_KEY] = draftStreamState;
}
return draftStreamState;
}
function allocateTelegramDraftId(): number {
const state = getDraftStreamState();
state.nextDraftId = state.nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : state.nextDraftId + 1;
return state.nextDraftId;
}
function resolveSendMessageDraftApi(api: Bot["api"]): TelegramSendMessageDraft | undefined {
const sendMessageDraft = (api as Bot["api"] & { sendMessageDraft?: TelegramSendMessageDraft })
.sendMessageDraft;
if (typeof sendMessageDraft !== "function") {
return undefined;
}
return sendMessageDraft.bind(api as object);
}
function shouldFallbackFromDraftTransport(err: unknown): boolean {
const text =
typeof err === "string"
? err
: err instanceof Error
? err.message
: typeof err === "object" && err && "description" in err
? typeof err.description === "string"
? err.description
: ""
: "";
if (!/sendMessageDraft/i.test(text)) {
return false;
}
return DRAFT_METHOD_UNAVAILABLE_RE.test(text) || DRAFT_CHAT_UNSUPPORTED_RE.test(text);
}
export type TelegramDraftStream = {
update: (text: string) => void;
flush: () => Promise<void>;
messageId: () => number | undefined;
visibleSinceMs?: () => number | undefined;
previewMode?: () => "message" | "draft";
previewRevision?: () => number;
lastDeliveredText?: () => string;
clear: () => Promise<void>;
stop: () => Promise<void>;
/** Stop without a final flush or delete. */
discard?: () => Promise<void>;
/** Convert the current draft preview into a permanent message (sendMessage). */
/** Return the current preview message id after pending updates settle. */
materialize?: () => Promise<number | undefined>;
/** Reset internal state so the next update creates a new message instead of editing. */
forceNewMessage: () => void;
@@ -127,7 +60,6 @@ export function createTelegramDraftStream(params: {
chatId: Parameters<Bot["api"]["sendMessage"]>[0];
maxChars?: number;
thread?: TelegramThreadSpec | null;
previewTransport?: "auto" | "message" | "draft";
replyToMessageId?: number;
throttleMs?: number;
/** Minimum chars before sending first message (debounce for push notifications) */
@@ -146,13 +78,6 @@ export function createTelegramDraftStream(params: {
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const minInitialChars = params.minInitialChars;
const chatId = params.chatId;
const requestedPreviewTransport = params.previewTransport ?? "auto";
const prefersDraftTransport =
requestedPreviewTransport === "draft"
? true
: requestedPreviewTransport === "message"
? false
: params.thread?.scope === "dm";
const threadParams = buildTelegramThreadParams(params.thread);
const replyToMessageId = normalizeTelegramReplyToMessageId(params.replyToMessageId);
const replyParams =
@@ -163,22 +88,11 @@ export function createTelegramDraftStream(params: {
allow_sending_without_reply: true,
}
: threadParams;
const resolvedDraftApi = prefersDraftTransport
? resolveSendMessageDraftApi(params.api)
: undefined;
const usesDraftTransport = Boolean(prefersDraftTransport && resolvedDraftApi);
if (prefersDraftTransport && !usesDraftTransport) {
params.warn?.(
"telegram stream preview: sendMessageDraft unavailable; falling back to sendMessage/editMessageText",
);
}
const streamState = { stopped: false, final: false };
let messageSendAttempted = false;
let streamMessageId: number | undefined;
let streamVisibleSinceMs: number | undefined;
let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined;
let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message";
let lastSentText = "";
let lastDeliveredText = "";
let lastSentParseMode: "HTML" | undefined;
@@ -275,26 +189,6 @@ export function createTelegramDraftStream(params: {
streamVisibleSinceMs = visibleSinceMs;
return true;
};
const sendDraftTransportPreview = async ({
renderedText,
renderedParseMode,
}: PreviewSendParams): Promise<boolean> => {
const draftId = streamDraftId ?? allocateTelegramDraftId();
streamDraftId = draftId;
const draftParams = {
...(threadParams?.message_thread_id != null
? { message_thread_id: threadParams.message_thread_id }
: {}),
...(renderedParseMode ? { parse_mode: renderedParseMode } : {}),
};
await resolvedDraftApi!(
chatId,
draftId,
renderedText,
Object.keys(draftParams).length > 0 ? draftParams : undefined,
);
return true;
};
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
if (streamState.stopped && !streamState.final) {
@@ -331,36 +225,11 @@ export function createTelegramDraftStream(params: {
lastSentText = renderedText;
lastSentParseMode = renderedParseMode;
try {
let sent = false;
if (previewTransport === "draft") {
try {
sent = await sendDraftTransportPreview({
renderedText,
renderedParseMode,
sendGeneration,
});
} catch (err) {
if (!shouldFallbackFromDraftTransport(err)) {
throw err;
}
previewTransport = "message";
streamDraftId = undefined;
params.warn?.(
"telegram stream preview: sendMessageDraft rejected by API; falling back to sendMessage/editMessageText",
);
sent = await sendMessageTransportPreview({
renderedText,
renderedParseMode,
sendGeneration,
});
}
} else {
sent = await sendMessageTransportPreview({
renderedText,
renderedParseMode,
sendGeneration,
});
}
const sent = await sendMessageTransportPreview({
renderedText,
renderedParseMode,
sendGeneration,
});
if (sent) {
previewRevision += 1;
lastDeliveredText = trimmed;
@@ -396,16 +265,6 @@ export function createTelegramDraftStream(params: {
}
return;
}
if (previewTransport !== "draft" || resolvedDraftApi == null || streamDraftId == null) {
return;
}
const clearDraftId = streamDraftId;
streamDraftId = undefined;
try {
await resolvedDraftApi(chatId, clearDraftId, "", threadParams);
} catch (err) {
params.warn?.(`telegram stream preview cleanup failed: ${formatErrorMessage(err)}`);
}
};
const discard = async () => {
@@ -419,9 +278,6 @@ export function createTelegramDraftStream(params: {
messageSendAttempted = false;
streamMessageId = undefined;
streamVisibleSinceMs = undefined;
if (previewTransport === "draft") {
streamDraftId = allocateTelegramDraftId();
}
lastSentText = "";
lastSentParseMode = undefined;
loop.resetPending();
@@ -430,41 +286,7 @@ export function createTelegramDraftStream(params: {
const materialize = async (): Promise<number | undefined> => {
await stop();
if (previewTransport === "message" && typeof streamMessageId === "number") {
return streamMessageId;
}
const renderedText = lastSentText || lastDeliveredText;
if (!renderedText) {
return undefined;
}
const renderedParseMode = lastSentText ? lastSentParseMode : undefined;
try {
const { sent, usedThreadParams } = await sendRenderedMessageWithThreadFallback({
renderedText,
renderedParseMode,
fallbackWarnMessage:
"telegram stream preview materialize send failed with message_thread_id, retrying without thread",
});
const sentId = sent?.message_id;
if (typeof sentId === "number" && Number.isFinite(sentId)) {
streamMessageId = Math.trunc(sentId);
streamVisibleSinceMs = Date.now();
if (resolvedDraftApi != null && streamDraftId != null) {
const clearDraftId = streamDraftId;
const clearThreadParams =
usedThreadParams && threadParams?.message_thread_id != null
? { message_thread_id: threadParams.message_thread_id }
: undefined;
try {
await resolvedDraftApi(chatId, clearDraftId, "", clearThreadParams);
} catch {}
}
return streamMessageId;
}
} catch (err) {
params.warn?.(`telegram stream preview materialize failed: ${formatErrorMessage(err)}`);
}
return undefined;
return streamMessageId;
};
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
@@ -474,7 +296,6 @@ export function createTelegramDraftStream(params: {
flush: loop.flush,
messageId: () => streamMessageId,
visibleSinceMs: () => streamVisibleSinceMs,
previewMode: () => previewTransport,
previewRevision: () => previewRevision,
lastDeliveredText: () => lastDeliveredText,
clear,
@@ -485,9 +306,3 @@ export function createTelegramDraftStream(params: {
sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number",
};
}
export const __testing = {
resetTelegramDraftStreamForTests() {
getDraftStreamState().nextDraftId = 0;
},
};

View File

@@ -203,8 +203,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
params.activePreviewLifecycleByLane[laneName] = "complete";
params.retainPreviewOnCleanupByLane[laneName] = true;
};
const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft";
const isMessagePreviewLane = (lane: DraftLaneState) => !isDraftPreviewLane(lane);
const isMessagePreviewLane = (lane: DraftLaneState) => lane.stream != null;
const shouldUseFreshFinalForLane = (lane: DraftLaneState) =>
isMessagePreviewLane(lane) && isLongLivedPreview(lane.stream?.visibleSinceMs?.(), readNow());
const shouldUseFreshFinalForPreview = (lane: DraftLaneState, visibleSinceMs?: number) =>
@@ -219,43 +218,6 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
lane.hasStreamedMessage = false;
lane.stream?.forceNewMessage();
};
const canMaterializeDraftFinal = (
lane: DraftLaneState,
previewButtons?: TelegramInlineButtons,
) => {
const hasPreviewButtons = Boolean(previewButtons && previewButtons.length > 0);
return (
lane.hasStreamedMessage &&
isDraftPreviewLane(lane) &&
!hasPreviewButtons &&
typeof lane.stream?.materialize === "function"
);
};
const tryMaterializeDraftPreviewForFinal = async (args: {
lane: DraftLaneState;
laneName: LaneName;
text: string;
}): Promise<number | undefined> => {
const stream = args.lane.stream;
if (!stream || !isDraftPreviewLane(args.lane)) {
return undefined;
}
// Draft previews have no message_id to edit; materialize the final text
// into a real message and treat that as the finalized delivery.
stream.update(args.text);
const materializedMessageId = await stream.materialize?.();
if (typeof materializedMessageId !== "number") {
params.log(
`telegram: ${args.laneName} draft preview materialize produced no message id; falling back to standard send`,
);
return undefined;
}
args.lane.lastPartialText = args.text;
params.markDelivered();
return materializedMessageId;
};
const tryEditPreviewMessage = async (args: {
laneName: LaneName;
messageId: number;
@@ -578,20 +540,6 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
return archivedResultAfterFlush;
}
}
if (canMaterializeDraftFinal(lane, previewButtons)) {
const materializedMessageId = await tryMaterializeDraftPreviewForFinal({
lane,
laneName,
text,
});
if (typeof materializedMessageId === "number") {
markActivePreviewComplete(laneName);
return result("preview-finalized", {
content: text,
messageId: materializedMessageId,
});
}
}
if (shouldUseFreshFinalForLane(lane)) {
await params.stopDraftLane(lane);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
@@ -639,24 +587,6 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
if (allowPreviewUpdateForNonFinal && canEditViaPreview) {
if (isDraftPreviewLane(lane)) {
// DM draft flow has no message_id to edit; updates are sent via sendMessageDraft.
// Only mark as updated when the draft flush actually emits an update.
const previewRevisionBeforeFlush = lane.stream?.previewRevision?.() ?? 0;
lane.stream?.update(text);
await params.flushDraftLane(lane);
const previewUpdated = (lane.stream?.previewRevision?.() ?? 0) > previewRevisionBeforeFlush;
if (!previewUpdated) {
params.log(
`telegram: ${laneName} draft preview update not emitted; falling back to standard send`,
);
const delivered = await params.sendPayload(params.applyTextToPayload(payload, text));
return delivered ? result("sent") : result("skipped");
}
lane.lastPartialText = text;
params.markDelivered();
return result("preview-updated");
}
const updated = await tryUpdatePreviewForLane({
lane,
laneName,