From 7a2cc4b8d65c00eaab66771b9b5be33c3f1fe647 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Sat, 9 May 2026 09:05:15 +0530 Subject: [PATCH] fix(telegram): stop DM topic threadless fallback (#78575) (thanks @tmimmanuel) --- CHANGELOG.md | 1 + extensions/telegram/src/bot/delivery.send.ts | 39 +------------ extensions/telegram/src/bot/delivery.test.ts | 58 ++++++++------------ extensions/telegram/src/draft-stream.test.ts | 15 ++--- extensions/telegram/src/draft-stream.ts | 3 +- extensions/telegram/src/send.test.ts | 24 +++++++- extensions/telegram/src/send.ts | 11 +++- 7 files changed, 68 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 847e6f09ed1..c6ee9c824b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -302,6 +302,7 @@ Docs: https://docs.openclaw.ai - CLI/completion: guard the shell-profile source line written by `openclaw completion --install` with a file existence check (`[ -f ... ] && source ...` for bash/zsh, `test -f ...; and source ...` for fish) so uninstalling OpenClaw no longer makes new login shells error on a missing completion cache. (#78659) Thanks @sjf. - Cron/doctor: repair persisted cron jobs whose `payload.model` was stored as `"default"`, `"null"`, blank, or JSON `null` by removing the bad override during `openclaw doctor --fix` while keeping cron runtime model validation strict. Fixes #78549. Thanks @bizzle12368239. - Telegram: honor `accessGroup:*` sender allowlists for DMs, groups, native commands, and callback authorization before applying Telegram's numeric sender-ID checks. Fixes #78660. Thanks @manugc. +- Telegram: fail private-topic sends instead of retrying them as plain DMs when Telegram rejects the topic id, keeping private-topic `message_thread_id` routing intact. Fixes #79455. (#78575) Thanks @tmimmanuel. - Agent delivery: report `deliverySucceeded=false` when outbound delivery returns no adapter result, so claimed/empty delivery paths no longer masquerade as successful sends. Fixes #78532. Thanks @joeyfrasier. - Cron/isolated runs: fail implicit announce delivery before model execution when `delivery.channel=last` has no previous route, so recurring jobs do not spend tokens before hitting a permanent delivery-target error. Fixes #78608. Thanks @sallyom. - Gateway/sessions: persist a new generated transcript file when daily gateway-agent session rollover changes the session id, while preserving custom transcript paths. Fixes #78607. Thanks @nailujac, @zerone0x, and @sallyom. diff --git a/extensions/telegram/src/bot/delivery.send.ts b/extensions/telegram/src/bot/delivery.send.ts index 34deb6e6641..daf7926c63b 100644 --- a/extensions/telegram/src/bot/delivery.send.ts +++ b/extensions/telegram/src/bot/delivery.send.ts @@ -17,18 +17,10 @@ export { buildTelegramSendParams } from "../reply-parameters.js"; const PARSE_ERR_RE = /can't parse entities|parse entities|find end of the entity/i; const EMPTY_TEXT_ERR_RE = /message text is empty/i; -const THREAD_NOT_FOUND_RE = /message thread not found/i; const QUOTE_PARAM_RE = /\bquote not found\b|\bQUOTE_TEXT_INVALID\b|\bquote text invalid\b/i; const GrammyErrorCtor: typeof GrammyError | undefined = typeof GrammyError === "function" ? GrammyError : undefined; -function isTelegramThreadNotFoundError(err: unknown): boolean { - if (GrammyErrorCtor && err instanceof GrammyErrorCtor) { - return THREAD_NOT_FOUND_RE.test(err.description); - } - return THREAD_NOT_FOUND_RE.test(formatErrorMessage(err)); -} - function isTelegramQuoteParamError(err: unknown): boolean { if (GrammyErrorCtor && err instanceof GrammyErrorCtor) { return QUOTE_PARAM_RE.test(err.description); @@ -36,23 +28,6 @@ function isTelegramQuoteParamError(err: unknown): boolean { return QUOTE_PARAM_RE.test(formatErrorMessage(err)); } -function hasMessageThreadIdParam(params: Record | undefined): boolean { - if (!params) { - return false; - } - return typeof params.message_thread_id === "number"; -} - -function removeMessageThreadIdParam( - params: Record | undefined, -): Record { - if (!params) { - return {}; - } - const { message_thread_id: _ignored, ...rest } = params; - return rest; -} - function createTelegramDeliverySendRetry() { return createTelegramRetryRunner({ shouldRetry: (err) => isSafeToRetrySendError(err) || isTelegramRateLimitError(err), @@ -68,12 +43,9 @@ export async function sendTelegramWithThreadFallback(params: { send: (effectiveParams: Record) => Promise; shouldLog?: (err: unknown) => boolean; }): Promise { - const allowThreadlessRetry = params.thread?.scope === "dm"; - const hasThreadId = hasMessageThreadIdParam(params.requestParams); const hasNativeQuote = getTelegramNativeQuoteReplyMessageId(params.requestParams) != null; const shouldSuppressFirstErrorLog = (err: unknown) => - (allowThreadlessRetry && hasThreadId && isTelegramThreadNotFoundError(err)) || - (hasNativeQuote && isTelegramQuoteParamError(err)); + hasNativeQuote && isTelegramQuoteParamError(err); const mergedShouldLog = params.shouldLog ? (err: unknown) => params.shouldLog!(err) && !shouldSuppressFirstErrorLog(err) : (err: unknown) => !shouldSuppressFirstErrorLog(err); @@ -103,14 +75,7 @@ export async function sendTelegramWithThreadFallback(params: { requestParams: removeTelegramNativeQuoteParam(params.requestParams), }); } - if (!allowThreadlessRetry || !hasThreadId || !isTelegramThreadNotFoundError(err)) { - throw err; - } - const retryParams = removeMessageThreadIdParam(params.requestParams); - params.runtime.log?.( - `telegram ${params.operation}: message thread not found; retrying without message_thread_id`, - ); - return await runLoggedSend(`${params.operation} (threadless retry)`, retryParams); + throw err; } } diff --git a/extensions/telegram/src/bot/delivery.test.ts b/extensions/telegram/src/bot/delivery.test.ts index 20028fe6e4e..1f7a36d39f7 100644 --- a/extensions/telegram/src/bot/delivery.test.ts +++ b/extensions/telegram/src/bot/delivery.test.ts @@ -730,32 +730,27 @@ describe("deliverReplies", () => { ); }); - it("retries DM topic sends without message_thread_id when thread is missing", async () => { + it("does not retry DM topic sends without the topic id when the topic is missing", async () => { const runtime = createRuntime(); - const sendMessage = vi - .fn() - .mockRejectedValueOnce(createThreadNotFoundError("sendMessage")) - .mockResolvedValueOnce({ - message_id: 7, - chat: { id: "123" }, - }); + const sendMessage = vi.fn().mockRejectedValueOnce(createThreadNotFoundError("sendMessage")); const bot = createBot({ sendMessage }); - await deliverWith({ - replies: [{ text: "hello" }], - runtime, - bot, - thread: { id: 42, scope: "dm" }, - }); + await expect( + deliverWith({ + replies: [{ text: "hello" }], + runtime, + bot, + thread: { id: 42, scope: "dm" }, + }), + ).rejects.toThrow("message thread not found"); - expect(sendMessage).toHaveBeenCalledTimes(2); + expect(sendMessage).toHaveBeenCalledTimes(1); expect(sendMessage.mock.calls[0]?.[2]).toEqual( expect.objectContaining({ message_thread_id: 42, }), ); - expect(sendMessage.mock.calls[1]?.[2]).not.toHaveProperty("message_thread_id"); - expect(runtime.error).not.toHaveBeenCalled(); + expect(runtime.error).toHaveBeenCalledTimes(1); }); it("does not retry forum sends without message_thread_id", async () => { @@ -818,34 +813,29 @@ describe("deliverReplies", () => { expect(runtime.error).toHaveBeenCalledTimes(1); }); - it("retries media sends without message_thread_id for DM topics", async () => { + it("does not retry DM topic media sends without the topic id", async () => { const runtime = createRuntime(); - const sendPhoto = vi - .fn() - .mockRejectedValueOnce(createThreadNotFoundError("sendPhoto")) - .mockResolvedValueOnce({ - message_id: 8, - chat: { id: "123" }, - }); + const sendPhoto = vi.fn().mockRejectedValueOnce(createThreadNotFoundError("sendPhoto")); const bot = createBot({ sendPhoto }); mockMediaLoad("photo.jpg", "image/jpeg", "image"); - await deliverWith({ - replies: [{ mediaUrl: "https://example.com/photo.jpg", text: "caption" }], - runtime, - bot, - thread: { id: 42, scope: "dm" }, - }); + await expect( + deliverWith({ + replies: [{ mediaUrl: "https://example.com/photo.jpg", text: "caption" }], + runtime, + bot, + thread: { id: 42, scope: "dm" }, + }), + ).rejects.toThrow("message thread not found"); - expect(sendPhoto).toHaveBeenCalledTimes(2); + expect(sendPhoto).toHaveBeenCalledTimes(1); expect(sendPhoto.mock.calls[0]?.[2]).toEqual( expect.objectContaining({ message_thread_id: 42, }), ); - expect(sendPhoto.mock.calls[1]?.[2]).not.toHaveProperty("message_thread_id"); - expect(runtime.error).not.toHaveBeenCalled(); + expect(runtime.error).toHaveBeenCalledTimes(1); }); it("does not include link_preview_options when linkPreview is true", async () => { diff --git a/extensions/telegram/src/draft-stream.test.ts b/extensions/telegram/src/draft-stream.test.ts index cad4fc4ab16..fa775e818f0 100644 --- a/extensions/telegram/src/draft-stream.test.ts +++ b/extensions/telegram/src/draft-stream.test.ts @@ -145,11 +145,9 @@ describe("createTelegramDraftStream", () => { } }); - it("retries DM message preview send without thread when thread is not found", async () => { + it("does not retry DM message preview sends without the topic id", async () => { const api = createMockDraftApi(); - api.sendMessage - .mockRejectedValueOnce(new Error("400: Bad Request: message thread not found")) - .mockResolvedValueOnce({ message_id: 17 }); + api.sendMessage.mockRejectedValueOnce(new Error("400: Bad Request: message thread not found")); const warn = vi.fn(); const stream = createDraftStream(api, { thread: { id: 42, scope: "dm" }, @@ -159,11 +157,10 @@ describe("createTelegramDraftStream", () => { stream.update("Hello"); await stream.flush(); - expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "Hello", { message_thread_id: 42 }); - expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Hello", undefined); - expect(warn).toHaveBeenCalledWith( - "telegram stream preview send failed with message_thread_id, retrying without thread", - ); + expect(api.sendMessage).toHaveBeenCalledTimes(1); + expect(api.sendMessage).toHaveBeenCalledWith(123, "Hello", { message_thread_id: 42 }); + expect(warn).toHaveBeenCalledWith(expect.stringContaining("message thread not found")); + expect(warn).not.toHaveBeenCalledWith(expect.stringContaining("retrying without thread")); }); it("keeps allow_sending_without_reply on message previews that target a reply", async () => { diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index 9a532fc5255..b93134196b8 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -109,6 +109,7 @@ export function createTelegramDraftStream(params: { const minInitialChars = params.minInitialChars; const chatId = params.chatId; const threadParams = buildTelegramThreadParams(params.thread); + const allowThreadlessRetry = params.thread?.scope !== "dm"; const replyToMessageId = normalizeTelegramReplyToMessageId(params.replyToMessageId); const replyParams = replyToMessageId != null @@ -153,7 +154,7 @@ export function createTelegramDraftStream(params: { usedThreadParams, }; } catch (err) { - if (!usedThreadParams || !THREAD_NOT_FOUND_RE.test(String(err))) { + if (!allowThreadlessRetry || !usedThreadParams || !THREAD_NOT_FOUND_RE.test(String(err))) { throw err; } const threadlessParams: TelegramSendMessageParams = { ...sendParams }; diff --git a/extensions/telegram/src/send.test.ts b/extensions/telegram/src/send.test.ts index b0ec5a7f773..d3962d139c1 100644 --- a/extensions/telegram/src/send.test.ts +++ b/extensions/telegram/src/send.test.ts @@ -1651,7 +1651,6 @@ describe("sendMessageTelegram", () => { it("retries sends without message_thread_id on thread-not-found", async () => { const cases = [ { name: "forum", chatId: "-100123", text: "hello forum", messageId: 58 }, - { name: "private", chatId: "123456789", text: "hello private", messageId: 59 }, ] as const; const threadErr = new Error("400: Bad Request: message thread not found"); @@ -1695,6 +1694,29 @@ describe("sendMessageTelegram", () => { } }); + it("does not retry private DM topic sends without the topic id", async () => { + const threadErr = new Error("400: Bad Request: message thread not found"); + const sendMessage = vi.fn().mockRejectedValueOnce(threadErr); + const api = { sendMessage } as unknown as { + sendMessage: typeof sendMessage; + }; + + await expect( + sendMessageTelegram("123456789", "hello private", { + cfg: TELEGRAM_TEST_CFG, + token: "tok", + api, + messageThreadId: 271, + }), + ).rejects.toThrow("message thread not found"); + + expect(sendMessage).toHaveBeenCalledTimes(1); + expect(sendMessage).toHaveBeenCalledWith("123456789", "hello private", { + parse_mode: "HTML", + message_thread_id: 271, + }); + }); + it("does not retry on non-retriable thread/chat errors", async () => { const cases: Array<{ chatId: string; diff --git a/extensions/telegram/src/send.ts b/extensions/telegram/src/send.ts index 025ae0d10f1..597af4c6d88 100644 --- a/extensions/telegram/src/send.ts +++ b/extensions/telegram/src/send.ts @@ -538,6 +538,7 @@ async function withTelegramThreadFallback< params: TParams, label: string, verbose: boolean | undefined, + allowThreadlessRetry: boolean, attempt: (effectiveParams: TParams, effectiveLabel: string) => Promise, ): Promise { try { @@ -545,7 +546,11 @@ async function withTelegramThreadFallback< } catch (err) { // Do not widen this fallback to cover "chat not found". // chat-not-found is routing/auth/membership/token; stripping thread IDs hides root cause. - if (!hasMessageThreadIdParam(params) || !isTelegramThreadNotFoundError(err)) { + if ( + !allowThreadlessRetry || + !hasMessageThreadIdParam(params) || + !isTelegramThreadNotFoundError(err) + ) { throw err; } if (verbose) { @@ -659,6 +664,7 @@ export async function sendMessageTelegram( params, "message", opts.verbose, + target.chatType !== "direct", async (effectiveParams, label) => { const baseParams = effectiveParams ? { ...effectiveParams } : {}; if (linkPreviewOptions) { @@ -855,6 +861,7 @@ export async function sendMessageTelegram( mediaParams, label, opts.verbose, + target.chatType !== "direct", async (effectiveParams, retryLabel) => requestWithChatNotFound(() => sender(effectiveParams), retryLabel), ); @@ -1508,6 +1515,7 @@ export async function sendStickerTelegram( stickerParams, "sticker", opts.verbose, + target.chatType !== "direct", async (effectiveParams, label) => requestWithChatNotFound(() => api.sendSticker(chatId, fileId.trim(), effectiveParams), label), ); @@ -1615,6 +1623,7 @@ export async function sendPollTelegram( pollParams, "poll", opts.verbose, + target.chatType !== "direct", async (effectiveParams, label) => requestWithChatNotFound( () => api.sendPoll(chatId, normalizedPoll.question, pollOptions, effectiveParams),