From f5ad8e5b53d418e17390a75b004fcccfb5fc064a Mon Sep 17 00:00:00 2001 From: VACInc <3279061+VACInc@users.noreply.github.com> Date: Sat, 9 May 2026 19:46:44 -0400 Subject: [PATCH] fix telegram topic bottleneck --- CHANGELOG.md | 1 + extensions/telegram/src/bot-core.ts | 2 + .../src/bot.create-telegram-bot.test.ts | 65 +++++++++++++++++++ .../src/sendchataction-401-backoff.test.ts | 47 ++++++++++++++ .../src/sendchataction-401-backoff.ts | 37 +++++++++++ .../telegram/src/sequential-key.test.ts | 19 ++++++ extensions/telegram/src/sequential-key.ts | 4 +- 7 files changed, 174 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4146eb85fe0..a19ece40351 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -932,6 +932,7 @@ Docs: https://docs.openclaw.ai - QA/Matrix: steer the live tool-progress preview check away from `HEARTBEAT.md` and report final preview candidates when the live marker reply misses the exact token. Thanks @vincentkoc. - QA/Matrix: let the live tool-progress preview check verify progress replacement events without depending on the preview saying `Working`. Thanks @vincentkoc. - Tlon: expose `groupInviteAllowlist` in the channel config schema and clarify that group invite auto-accept fails closed without an invite allowlist. Thanks @vincentkoc. +- Telegram: let forum-topic messages that omit `chat.is_forum` use per-topic processing lanes when Telegram still marks them as topic messages, and coalesce duplicate group typing cues so cosmetic Telegram API calls do not pile up ahead of real replies during topic bursts. - Control UI/WebChat: collapse duplicate in-flight internal text sends onto the active Gateway run so rapid repeat submits do not start fresh `agent:main:main` dispatches. Fixes #75737. Thanks @dsdsddd1 and @BunsDev. - Mattermost: accept the documented `channels.mattermost.streaming` config and honor `streaming: "off"` by disabling draft preview posts. Thanks @vincentkoc. - Mattermost: expose streaming progress config labels and help text in generated channel config metadata so Control UI/docs can explain the new `channels.mattermost.streaming.progress.*` fields. Thanks @vincentkoc. diff --git a/extensions/telegram/src/bot-core.ts b/extensions/telegram/src/bot-core.ts index 64606f32eef..45a308c9239 100644 --- a/extensions/telegram/src/bot-core.ts +++ b/extensions/telegram/src/bot-core.ts @@ -59,6 +59,7 @@ const DEFAULT_TELEGRAM_BOT_RUNTIME: TelegramBotRuntime = { sequentialize, apiThrottler, }; +const TELEGRAM_TYPING_COALESCE_MS = 4_000; let telegramBotRuntimeForTest: TelegramBotRuntime | undefined; @@ -562,6 +563,7 @@ export function createTelegramBotCore( sendChatActionFn: (chatId, action, threadParams) => bot.api.sendChatAction(chatId, action, threadParams), logger: (message) => logVerbose(`telegram: ${message}`), + minIntervalMs: TELEGRAM_TYPING_COALESCE_MS, }); const processMessage = createTelegramMessageProcessor({ diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index 6fd0ff2bffa..c06fe37b537 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -415,6 +415,71 @@ describe("createTelegramBot", () => { expect(events).toEqual(["busy:start", "status", "busy:end"]); }); + it("lets Telegram topic messages without chat forum metadata use separate lanes", async () => { + installPerKeySequentializer(); + loadConfig.mockReturnValue({ + channels: { + telegram: { + dmPolicy: "open", + allowFrom: ["*"], + groups: { "*": { requireMention: false } }, + }, + }, + }); + + const events: string[] = []; + let releaseFirstTopic!: () => void; + const firstTopicGate = new Promise((resolve) => { + releaseFirstTopic = resolve; + }); + + createTelegramBot({ token: "tok" }); + const sequentializer = sequentializeSpy.mock.results[0]?.value as + | TelegramMiddleware + | undefined; + expect(sequentializer).toBeDefined(); + if (!sequentializer) { + return; + } + + const topicCtx = (threadId: number, updateId: number) => { + const base = makeForumGroupMessageCtx({ threadId, text: `topic ${threadId}` }); + return { + ...base, + message: { + ...base.message, + message_id: updateId, + is_topic_message: true, + chat: { + id: -1001234567890, + type: "supergroup", + title: "Forum Group", + }, + }, + update: { update_id: updateId }, + }; + }; + + const firstPromise = sequentializer(topicCtx(10, 301), async () => { + events.push("first:start"); + await firstTopicGate; + events.push("first:end"); + }); + + await flushTelegramTestMicrotasks(); + expect(events).toEqual(["first:start"]); + + await sequentializer(topicCtx(20, 302), async () => { + events.push("second"); + }); + + expect(events).toEqual(["first:start", "second"]); + + releaseFirstTopic(); + await firstPromise; + expect(events).toEqual(["first:start", "second", "first:end"]); + }); + it("keeps ordinary Telegram messages serialized within the same topic", async () => { installPerKeySequentializer(); loadConfig.mockReturnValue({ diff --git a/extensions/telegram/src/sendchataction-401-backoff.test.ts b/extensions/telegram/src/sendchataction-401-backoff.test.ts index e070066fb7e..f18a5d34b1f 100644 --- a/extensions/telegram/src/sendchataction-401-backoff.test.ts +++ b/extensions/telegram/src/sendchataction-401-backoff.test.ts @@ -33,6 +33,53 @@ describe("createTelegramSendChatActionHandler", () => { expect(handler.isSuspended()).toBe(false); }); + it("coalesces duplicate chat actions while one for the chat is pending", async () => { + let resolveSend: ((value: true) => void) | undefined; + const send = new Promise((resolve) => { + resolveSend = resolve; + }); + const fn = vi.fn(() => send); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + minIntervalMs: 4000, + }); + + const first = handler.sendChatAction(-100, "typing", { message_thread_id: 1 }); + await handler.sendChatAction(-100, "typing", { message_thread_id: 2 }); + + expect(fn).toHaveBeenCalledTimes(1); + expect(fn).toHaveBeenCalledWith(-100, "typing", { message_thread_id: 1 }); + + resolveSend?.(true); + await first; + }); + + it("coalesces recent same-chat actions after the pending send resolves", async () => { + let now = 1000; + const fn = vi.fn().mockResolvedValue(true); + const logger = vi.fn(); + const handler = createTelegramSendChatActionHandler({ + sendChatActionFn: fn, + logger, + minIntervalMs: 4000, + now: () => now, + }); + + await handler.sendChatAction(-100, "typing"); + now = 4999; + await handler.sendChatAction(-100, "typing"); + expect(fn).toHaveBeenCalledTimes(1); + + await handler.sendChatAction(-100, "upload_photo"); + expect(fn).toHaveBeenCalledTimes(2); + + now = 5000; + await handler.sendChatAction(-100, "typing"); + expect(fn).toHaveBeenCalledTimes(3); + }); + it("applies exponential backoff on consecutive 401 errors", async () => { const fn = vi.fn().mockRejectedValue(make401Error()); const logger = vi.fn(); diff --git a/extensions/telegram/src/sendchataction-401-backoff.ts b/extensions/telegram/src/sendchataction-401-backoff.ts index 48418f69942..21b3a3ea6b7 100644 --- a/extensions/telegram/src/sendchataction-401-backoff.ts +++ b/extensions/telegram/src/sendchataction-401-backoff.ts @@ -47,6 +47,12 @@ export type CreateTelegramSendChatActionHandlerParams = { sendChatActionFn: SendChatActionFn; logger: TelegramSendChatActionLogger; maxConsecutive401?: number; + /** + * Best-effort per-chat/action coalescing window. Kept opt-in so tests and + * non-typing callers can preserve exact sendChatAction semantics. + */ + minIntervalMs?: number; + now?: () => number; }; const BACKOFF_POLICY: BackoffPolicy = { @@ -79,15 +85,27 @@ export function createTelegramSendChatActionHandler({ sendChatActionFn, logger, maxConsecutive401 = 10, + minIntervalMs = 0, + now = () => Date.now(), }: CreateTelegramSendChatActionHandlerParams): TelegramSendChatActionHandler { let consecutive401Failures = 0; let suspended = false; + const pendingKeys = new Set(); + const lastAttemptAtByKey = new Map(); const reset = () => { consecutive401Failures = 0; suspended = false; + pendingKeys.clear(); + lastAttemptAtByKey.clear(); }; + const coalesceKey = (chatId: number | string, action: ChatAction) => + // The Telegram API throttler keys group traffic by chat_id, not thread ID. + // Coalescing at the same level keeps topic typing cues from filling the + // shared outbound lane ahead of real replies. + `${String(chatId)}:${action}`; + const sendChatAction = async ( chatId: number | string, action: ChatAction, @@ -97,6 +115,21 @@ export function createTelegramSendChatActionHandler({ return; } + const shouldCoalesce = Number.isFinite(minIntervalMs) && minIntervalMs > 0; + const key = shouldCoalesce ? coalesceKey(chatId, action) : undefined; + if (key) { + if (pendingKeys.has(key)) { + return; + } + const currentTime = now(); + const lastAttemptAt = lastAttemptAtByKey.get(key); + if (lastAttemptAt !== undefined && currentTime - lastAttemptAt < minIntervalMs) { + return; + } + pendingKeys.add(key); + lastAttemptAtByKey.set(key, currentTime); + } + if (consecutive401Failures > 0) { const backoffMs = computeBackoff(BACKOFF_POLICY, consecutive401Failures); logger( @@ -132,6 +165,10 @@ export function createTelegramSendChatActionHandler({ } } throw error; + } finally { + if (key) { + pendingKeys.delete(key); + } } }; diff --git a/extensions/telegram/src/sequential-key.test.ts b/extensions/telegram/src/sequential-key.test.ts index b24f59d6cd9..a89aae578db 100644 --- a/extensions/telegram/src/sequential-key.test.ts +++ b/extensions/telegram/src/sequential-key.test.ts @@ -32,6 +32,25 @@ describe("getTelegramSequentialKey", () => { }, "telegram:123", ], + [ + { + message: mockMessage({ + chat: mockChat({ id: 123, type: "supergroup" }), + message_thread_id: 9, + is_topic_message: true, + }), + }, + "telegram:123:topic:9", + ], + [ + { + message: mockMessage({ + chat: mockChat({ id: 123, type: "supergroup" }), + is_topic_message: true, + }), + }, + "telegram:123:topic:1", + ], [ { message: mockMessage({ diff --git a/extensions/telegram/src/sequential-key.ts b/extensions/telegram/src/sequential-key.ts index fa15b733ad9..830a2001e63 100644 --- a/extensions/telegram/src/sequential-key.ts +++ b/extensions/telegram/src/sequential-key.ts @@ -96,7 +96,9 @@ export function getTelegramSequentialKey(ctx: TelegramSequentialKeyContext): str } const isGroup = msg?.chat?.type === "group" || msg?.chat?.type === "supergroup"; const messageThreadId = msg?.message_thread_id; - const isForum = msg?.chat?.is_forum; + const isForum = + msg?.chat?.is_forum ?? + (msg?.chat?.type === "supergroup" && msg.is_topic_message === true ? true : undefined); const threadId = isGroup ? resolveTelegramForumThreadId({ isForum, messageThreadId }) : messageThreadId;