mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 00:20:22 +00:00
feat(telegram): default streaming preview to partial
This commit is contained in:
@@ -6,6 +6,9 @@ const TELEGRAM_STREAM_MAX_CHARS = 4096;
|
||||
const DEFAULT_THROTTLE_MS = 1000;
|
||||
const TELEGRAM_DRAFT_ID_MAX = 2_147_483_647;
|
||||
const THREAD_NOT_FOUND_RE = /400:\s*Bad Request:\s*message thread not found/i;
|
||||
const DRAFT_METHOD_UNAVAILABLE_RE =
|
||||
/(unknown method|method .*not (found|available|supported)|unsupported)/i;
|
||||
const DRAFT_CHAT_UNSUPPORTED_RE = /(can't be used|can be used only)/i;
|
||||
|
||||
type TelegramSendMessageDraft = (
|
||||
chatId: number,
|
||||
@@ -33,6 +36,23 @@ function resolveSendMessageDraftApi(api: Bot["api"]): TelegramSendMessageDraft |
|
||||
return sendMessageDraft.bind(api as object);
|
||||
}
|
||||
|
||||
function shouldFallbackFromDraftTransport(err: unknown): boolean {
|
||||
const text =
|
||||
typeof err === "string"
|
||||
? err
|
||||
: err instanceof Error
|
||||
? err.message
|
||||
: typeof err === "object" && err && "description" in err
|
||||
? typeof err.description === "string"
|
||||
? err.description
|
||||
: ""
|
||||
: "";
|
||||
if (!/sendMessageDraft/i.test(text)) {
|
||||
return false;
|
||||
}
|
||||
return DRAFT_METHOD_UNAVAILABLE_RE.test(text) || DRAFT_CHAT_UNSUPPORTED_RE.test(text);
|
||||
}
|
||||
|
||||
export type TelegramDraftStream = {
|
||||
update: (text: string) => void;
|
||||
flush: () => Promise<void>;
|
||||
@@ -105,101 +125,98 @@ export function createTelegramDraftStream(params: {
|
||||
const streamState = { stopped: false, final: false };
|
||||
let streamMessageId: number | undefined;
|
||||
let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined;
|
||||
let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message";
|
||||
let lastSentText = "";
|
||||
let lastSentParseMode: "HTML" | undefined;
|
||||
let previewRevision = 0;
|
||||
let generation = 0;
|
||||
const sendStreamPreview = usesDraftTransport
|
||||
? async ({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
}: {
|
||||
renderedText: string;
|
||||
renderedParseMode: "HTML" | undefined;
|
||||
sendGeneration: number;
|
||||
}): Promise<boolean> => {
|
||||
const draftId = streamDraftId ?? allocateTelegramDraftId();
|
||||
streamDraftId = draftId;
|
||||
const draftParams = {
|
||||
...(threadParams?.message_thread_id != null
|
||||
? { message_thread_id: threadParams.message_thread_id }
|
||||
: {}),
|
||||
...(renderedParseMode ? { parse_mode: renderedParseMode } : {}),
|
||||
};
|
||||
await resolvedDraftApi!(
|
||||
chatId,
|
||||
draftId,
|
||||
renderedText,
|
||||
Object.keys(draftParams).length > 0 ? draftParams : undefined,
|
||||
);
|
||||
return true;
|
||||
type PreviewSendParams = {
|
||||
renderedText: string;
|
||||
renderedParseMode: "HTML" | undefined;
|
||||
sendGeneration: number;
|
||||
};
|
||||
const sendMessageTransportPreview = async ({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
sendGeneration,
|
||||
}: PreviewSendParams): Promise<boolean> => {
|
||||
if (typeof streamMessageId === "number") {
|
||||
if (renderedParseMode) {
|
||||
await params.api.editMessageText(chatId, streamMessageId, renderedText, {
|
||||
parse_mode: renderedParseMode,
|
||||
});
|
||||
} else {
|
||||
await params.api.editMessageText(chatId, streamMessageId, renderedText);
|
||||
}
|
||||
: async ({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
sendGeneration,
|
||||
}: {
|
||||
renderedText: string;
|
||||
renderedParseMode: "HTML" | undefined;
|
||||
sendGeneration: number;
|
||||
}): Promise<boolean> => {
|
||||
if (typeof streamMessageId === "number") {
|
||||
if (renderedParseMode) {
|
||||
await params.api.editMessageText(chatId, streamMessageId, renderedText, {
|
||||
parse_mode: renderedParseMode,
|
||||
});
|
||||
} else {
|
||||
await params.api.editMessageText(chatId, streamMessageId, renderedText);
|
||||
}
|
||||
return true;
|
||||
return true;
|
||||
}
|
||||
const sendParams = renderedParseMode
|
||||
? {
|
||||
...replyParams,
|
||||
parse_mode: renderedParseMode,
|
||||
}
|
||||
const sendParams = renderedParseMode
|
||||
? {
|
||||
...replyParams,
|
||||
parse_mode: renderedParseMode,
|
||||
}
|
||||
: replyParams;
|
||||
let sent;
|
||||
try {
|
||||
sent = await params.api.sendMessage(chatId, renderedText, sendParams);
|
||||
} catch (err) {
|
||||
const hasThreadParam =
|
||||
"message_thread_id" in (sendParams ?? {}) &&
|
||||
typeof (sendParams as { message_thread_id?: unknown }).message_thread_id === "number";
|
||||
if (!hasThreadParam || !THREAD_NOT_FOUND_RE.test(String(err))) {
|
||||
throw err;
|
||||
}
|
||||
const threadlessParams = {
|
||||
...(sendParams as Record<string, unknown>),
|
||||
};
|
||||
delete threadlessParams.message_thread_id;
|
||||
params.warn?.(
|
||||
"telegram stream preview send failed with message_thread_id, retrying without thread",
|
||||
);
|
||||
sent = await params.api.sendMessage(
|
||||
chatId,
|
||||
renderedText,
|
||||
Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined,
|
||||
);
|
||||
}
|
||||
const sentMessageId = sent?.message_id;
|
||||
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
|
||||
streamState.stopped = true;
|
||||
params.warn?.("telegram stream preview stopped (missing message id from sendMessage)");
|
||||
return false;
|
||||
}
|
||||
const normalizedMessageId = Math.trunc(sentMessageId);
|
||||
if (sendGeneration !== generation) {
|
||||
params.onSupersededPreview?.({
|
||||
messageId: normalizedMessageId,
|
||||
textSnapshot: renderedText,
|
||||
parseMode: renderedParseMode,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
streamMessageId = normalizedMessageId;
|
||||
return true;
|
||||
: replyParams;
|
||||
let sent;
|
||||
try {
|
||||
sent = await params.api.sendMessage(chatId, renderedText, sendParams);
|
||||
} catch (err) {
|
||||
const hasThreadParam =
|
||||
"message_thread_id" in (sendParams ?? {}) &&
|
||||
typeof (sendParams as { message_thread_id?: unknown }).message_thread_id === "number";
|
||||
if (!hasThreadParam || !THREAD_NOT_FOUND_RE.test(String(err))) {
|
||||
throw err;
|
||||
}
|
||||
const threadlessParams = {
|
||||
...(sendParams as Record<string, unknown>),
|
||||
};
|
||||
delete threadlessParams.message_thread_id;
|
||||
params.warn?.(
|
||||
"telegram stream preview send failed with message_thread_id, retrying without thread",
|
||||
);
|
||||
sent = await params.api.sendMessage(
|
||||
chatId,
|
||||
renderedText,
|
||||
Object.keys(threadlessParams).length > 0 ? threadlessParams : undefined,
|
||||
);
|
||||
}
|
||||
const sentMessageId = sent?.message_id;
|
||||
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
|
||||
streamState.stopped = true;
|
||||
params.warn?.("telegram stream preview stopped (missing message id from sendMessage)");
|
||||
return false;
|
||||
}
|
||||
const normalizedMessageId = Math.trunc(sentMessageId);
|
||||
if (sendGeneration !== generation) {
|
||||
params.onSupersededPreview?.({
|
||||
messageId: normalizedMessageId,
|
||||
textSnapshot: renderedText,
|
||||
parseMode: renderedParseMode,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
streamMessageId = normalizedMessageId;
|
||||
return true;
|
||||
};
|
||||
const sendDraftTransportPreview = async ({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
}: PreviewSendParams): Promise<boolean> => {
|
||||
const draftId = streamDraftId ?? allocateTelegramDraftId();
|
||||
streamDraftId = draftId;
|
||||
const draftParams = {
|
||||
...(threadParams?.message_thread_id != null
|
||||
? { message_thread_id: threadParams.message_thread_id }
|
||||
: {}),
|
||||
...(renderedParseMode ? { parse_mode: renderedParseMode } : {}),
|
||||
};
|
||||
await resolvedDraftApi!(
|
||||
chatId,
|
||||
draftId,
|
||||
renderedText,
|
||||
Object.keys(draftParams).length > 0 ? draftParams : undefined,
|
||||
);
|
||||
return true;
|
||||
};
|
||||
|
||||
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
|
||||
// Allow final flush even if stopped (e.g., after clear()).
|
||||
@@ -240,11 +257,36 @@ export function createTelegramDraftStream(params: {
|
||||
lastSentText = renderedText;
|
||||
lastSentParseMode = renderedParseMode;
|
||||
try {
|
||||
const sent = await sendStreamPreview({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
sendGeneration,
|
||||
});
|
||||
let sent = false;
|
||||
if (previewTransport === "draft") {
|
||||
try {
|
||||
sent = await sendDraftTransportPreview({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
sendGeneration,
|
||||
});
|
||||
} catch (err) {
|
||||
if (!shouldFallbackFromDraftTransport(err)) {
|
||||
throw err;
|
||||
}
|
||||
previewTransport = "message";
|
||||
streamDraftId = undefined;
|
||||
params.warn?.(
|
||||
"telegram stream preview: sendMessageDraft rejected by API; falling back to sendMessage/editMessageText",
|
||||
);
|
||||
sent = await sendMessageTransportPreview({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
sendGeneration,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
sent = await sendMessageTransportPreview({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
sendGeneration,
|
||||
});
|
||||
}
|
||||
if (sent) {
|
||||
previewRevision += 1;
|
||||
}
|
||||
@@ -281,7 +323,7 @@ export function createTelegramDraftStream(params: {
|
||||
const forceNewMessage = () => {
|
||||
generation += 1;
|
||||
streamMessageId = undefined;
|
||||
if (usesDraftTransport) {
|
||||
if (previewTransport === "draft") {
|
||||
streamDraftId = allocateTelegramDraftId();
|
||||
}
|
||||
lastSentText = "";
|
||||
@@ -296,7 +338,7 @@ export function createTelegramDraftStream(params: {
|
||||
update,
|
||||
flush: loop.flush,
|
||||
messageId: () => streamMessageId,
|
||||
previewMode: () => (usesDraftTransport ? "draft" : "message"),
|
||||
previewMode: () => previewTransport,
|
||||
previewRevision: () => previewRevision,
|
||||
clear,
|
||||
stop,
|
||||
|
||||
Reference in New Issue
Block a user