fix telegram topic bottleneck

This commit is contained in:
VACInc
2026-05-09 19:46:44 -04:00
committed by Ayaan Zaidi
parent e349a237d5
commit f5ad8e5b53
7 changed files with 174 additions and 1 deletions

View File

@@ -932,6 +932,7 @@ Docs: https://docs.openclaw.ai
- QA/Matrix: steer the live tool-progress preview check away from `HEARTBEAT.md` and report final preview candidates when the live marker reply misses the exact token. Thanks @vincentkoc.
- QA/Matrix: let the live tool-progress preview check verify progress replacement events without depending on the preview saying `Working`. Thanks @vincentkoc.
- Tlon: expose `groupInviteAllowlist` in the channel config schema and clarify that group invite auto-accept fails closed without an invite allowlist. Thanks @vincentkoc.
- Telegram: let forum-topic messages that omit `chat.is_forum` use per-topic processing lanes when Telegram still marks them as topic messages, and coalesce duplicate group typing cues so cosmetic Telegram API calls do not pile up ahead of real replies during topic bursts.
- Control UI/WebChat: collapse duplicate in-flight internal text sends onto the active Gateway run so rapid repeat submits do not start fresh `agent:main:main` dispatches. Fixes #75737. Thanks @dsdsddd1 and @BunsDev.
- Mattermost: accept the documented `channels.mattermost.streaming` config and honor `streaming: "off"` by disabling draft preview posts. Thanks @vincentkoc.
- Mattermost: expose streaming progress config labels and help text in generated channel config metadata so Control UI/docs can explain the new `channels.mattermost.streaming.progress.*` fields. Thanks @vincentkoc.

View File

@@ -59,6 +59,7 @@ const DEFAULT_TELEGRAM_BOT_RUNTIME: TelegramBotRuntime = {
sequentialize,
apiThrottler,
};
const TELEGRAM_TYPING_COALESCE_MS = 4_000;
let telegramBotRuntimeForTest: TelegramBotRuntime | undefined;
@@ -562,6 +563,7 @@ export function createTelegramBotCore(
sendChatActionFn: (chatId, action, threadParams) =>
bot.api.sendChatAction(chatId, action, threadParams),
logger: (message) => logVerbose(`telegram: ${message}`),
minIntervalMs: TELEGRAM_TYPING_COALESCE_MS,
});
const processMessage = createTelegramMessageProcessor({

View File

@@ -415,6 +415,71 @@ describe("createTelegramBot", () => {
expect(events).toEqual(["busy:start", "status", "busy:end"]);
});
it("lets Telegram topic messages without chat forum metadata use separate lanes", async () => {
installPerKeySequentializer();
loadConfig.mockReturnValue({
channels: {
telegram: {
dmPolicy: "open",
allowFrom: ["*"],
groups: { "*": { requireMention: false } },
},
},
});
const events: string[] = [];
let releaseFirstTopic!: () => void;
const firstTopicGate = new Promise<void>((resolve) => {
releaseFirstTopic = resolve;
});
createTelegramBot({ token: "tok" });
const sequentializer = sequentializeSpy.mock.results[0]?.value as
| TelegramMiddleware
| undefined;
expect(sequentializer).toBeDefined();
if (!sequentializer) {
return;
}
const topicCtx = (threadId: number, updateId: number) => {
const base = makeForumGroupMessageCtx({ threadId, text: `topic ${threadId}` });
return {
...base,
message: {
...base.message,
message_id: updateId,
is_topic_message: true,
chat: {
id: -1001234567890,
type: "supergroup",
title: "Forum Group",
},
},
update: { update_id: updateId },
};
};
const firstPromise = sequentializer(topicCtx(10, 301), async () => {
events.push("first:start");
await firstTopicGate;
events.push("first:end");
});
await flushTelegramTestMicrotasks();
expect(events).toEqual(["first:start"]);
await sequentializer(topicCtx(20, 302), async () => {
events.push("second");
});
expect(events).toEqual(["first:start", "second"]);
releaseFirstTopic();
await firstPromise;
expect(events).toEqual(["first:start", "second", "first:end"]);
});
it("keeps ordinary Telegram messages serialized within the same topic", async () => {
installPerKeySequentializer();
loadConfig.mockReturnValue({

View File

@@ -33,6 +33,53 @@ describe("createTelegramSendChatActionHandler", () => {
expect(handler.isSuspended()).toBe(false);
});
it("coalesces duplicate chat actions while one for the chat is pending", async () => {
let resolveSend: ((value: true) => void) | undefined;
const send = new Promise<true>((resolve) => {
resolveSend = resolve;
});
const fn = vi.fn(() => send);
const logger = vi.fn();
const handler = createTelegramSendChatActionHandler({
sendChatActionFn: fn,
logger,
minIntervalMs: 4000,
});
const first = handler.sendChatAction(-100, "typing", { message_thread_id: 1 });
await handler.sendChatAction(-100, "typing", { message_thread_id: 2 });
expect(fn).toHaveBeenCalledTimes(1);
expect(fn).toHaveBeenCalledWith(-100, "typing", { message_thread_id: 1 });
resolveSend?.(true);
await first;
});
it("coalesces recent same-chat actions after the pending send resolves", async () => {
let now = 1000;
const fn = vi.fn().mockResolvedValue(true);
const logger = vi.fn();
const handler = createTelegramSendChatActionHandler({
sendChatActionFn: fn,
logger,
minIntervalMs: 4000,
now: () => now,
});
await handler.sendChatAction(-100, "typing");
now = 4999;
await handler.sendChatAction(-100, "typing");
expect(fn).toHaveBeenCalledTimes(1);
await handler.sendChatAction(-100, "upload_photo");
expect(fn).toHaveBeenCalledTimes(2);
now = 5000;
await handler.sendChatAction(-100, "typing");
expect(fn).toHaveBeenCalledTimes(3);
});
it("applies exponential backoff on consecutive 401 errors", async () => {
const fn = vi.fn().mockRejectedValue(make401Error());
const logger = vi.fn();

View File

@@ -47,6 +47,12 @@ export type CreateTelegramSendChatActionHandlerParams = {
sendChatActionFn: SendChatActionFn;
logger: TelegramSendChatActionLogger;
maxConsecutive401?: number;
/**
* Best-effort per-chat/action coalescing window. Kept opt-in so tests and
* non-typing callers can preserve exact sendChatAction semantics.
*/
minIntervalMs?: number;
now?: () => number;
};
const BACKOFF_POLICY: BackoffPolicy = {
@@ -79,15 +85,27 @@ export function createTelegramSendChatActionHandler({
sendChatActionFn,
logger,
maxConsecutive401 = 10,
minIntervalMs = 0,
now = () => Date.now(),
}: CreateTelegramSendChatActionHandlerParams): TelegramSendChatActionHandler {
let consecutive401Failures = 0;
let suspended = false;
const pendingKeys = new Set<string>();
const lastAttemptAtByKey = new Map<string, number>();
const reset = () => {
consecutive401Failures = 0;
suspended = false;
pendingKeys.clear();
lastAttemptAtByKey.clear();
};
const coalesceKey = (chatId: number | string, action: ChatAction) =>
// The Telegram API throttler keys group traffic by chat_id, not thread ID.
// Coalescing at the same level keeps topic typing cues from filling the
// shared outbound lane ahead of real replies.
`${String(chatId)}:${action}`;
const sendChatAction = async (
chatId: number | string,
action: ChatAction,
@@ -97,6 +115,21 @@ export function createTelegramSendChatActionHandler({
return;
}
const shouldCoalesce = Number.isFinite(minIntervalMs) && minIntervalMs > 0;
const key = shouldCoalesce ? coalesceKey(chatId, action) : undefined;
if (key) {
if (pendingKeys.has(key)) {
return;
}
const currentTime = now();
const lastAttemptAt = lastAttemptAtByKey.get(key);
if (lastAttemptAt !== undefined && currentTime - lastAttemptAt < minIntervalMs) {
return;
}
pendingKeys.add(key);
lastAttemptAtByKey.set(key, currentTime);
}
if (consecutive401Failures > 0) {
const backoffMs = computeBackoff(BACKOFF_POLICY, consecutive401Failures);
logger(
@@ -132,6 +165,10 @@ export function createTelegramSendChatActionHandler({
}
}
throw error;
} finally {
if (key) {
pendingKeys.delete(key);
}
}
};

View File

@@ -32,6 +32,25 @@ describe("getTelegramSequentialKey", () => {
},
"telegram:123",
],
[
{
message: mockMessage({
chat: mockChat({ id: 123, type: "supergroup" }),
message_thread_id: 9,
is_topic_message: true,
}),
},
"telegram:123:topic:9",
],
[
{
message: mockMessage({
chat: mockChat({ id: 123, type: "supergroup" }),
is_topic_message: true,
}),
},
"telegram:123:topic:1",
],
[
{
message: mockMessage({

View File

@@ -96,7 +96,9 @@ export function getTelegramSequentialKey(ctx: TelegramSequentialKeyContext): str
}
const isGroup = msg?.chat?.type === "group" || msg?.chat?.type === "supergroup";
const messageThreadId = msg?.message_thread_id;
const isForum = msg?.chat?.is_forum;
const isForum =
msg?.chat?.is_forum ??
(msg?.chat?.type === "supergroup" && msg.is_topic_message === true ? true : undefined);
const threadId = isGroup
? resolveTelegramForumThreadId({ isForum, messageThreadId })
: messageThreadId;