From 7c712559486704f71fd2c70f528febde789b6807 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 17:03:09 +0100 Subject: [PATCH] fix(telegram): defer replay commit until update succeeds --- .../src/bot.create-telegram-bot.test.ts | 66 +++++++++++++++++++ extensions/telegram/src/bot.ts | 43 +++++++++++- 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index b6a82e2a69e..01fec728459 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -2680,4 +2680,70 @@ describe("createTelegramBot", () => { expect(replySpy).toHaveBeenCalledTimes(1); }); + + it("retries native command updates after a bubbled handler failure", async () => { + loadConfig.mockReturnValue({ + commands: { native: true }, + channels: { + telegram: { + dmPolicy: "open", + allowFrom: ["*"], + }, + }, + }); + + createTelegramBot({ token: "tok" }); + const verboseHandler = commandSpy.mock.calls.find((call) => call[0] === "verbose")?.[1] as + | ((ctx: Record) => Promise) + | undefined; + if (!verboseHandler) { + throw new Error("verbose command handler missing"); + } + + const middlewares = middlewareUseSpy.mock.calls + .map((call) => call[0]) + .filter( + (fn): fn is (ctx: Record, next: () => Promise) => Promise => + typeof fn === "function", + ); + const runMiddlewareChain = async (ctx: Record) => { + let idx = -1; + const dispatch = async (i: number): Promise => { + if (i <= idx) { + throw new Error("middleware dispatch called multiple times"); + } + idx = i; + const fn = middlewares[i]; + if (!fn) { + await verboseHandler(ctx); + return; + } + await fn(ctx, async () => dispatch(i + 1)); + }; + await dispatch(0); + }; + + const ctx = { + update: { update_id: 333 }, + message: { + chat: { id: 12345, type: "private" }, + from: { id: 12345, username: "testuser" }, + text: "/verbose on", + date: 1736380800, + message_id: 42, + }, + match: "on", + }; + + const loadConfigCallsBeforeRetry = loadConfig.mock.calls.length; + loadConfig.mockImplementationOnce(() => { + throw new Error("cfg boom"); + }); + await expect(runMiddlewareChain(ctx)).rejects.toThrow("cfg boom"); + const loadConfigCallsAfterFailure = loadConfig.mock.calls.length; + await runMiddlewareChain(ctx); + + expect(loadConfigCallsAfterFailure).toBe(loadConfigCallsBeforeRetry + 1); + expect(loadConfig.mock.calls.length).toBeGreaterThan(loadConfigCallsAfterFailure); + }); }); diff --git a/extensions/telegram/src/bot.ts b/extensions/telegram/src/bot.ts index ae39458acea..773f39e647b 100644 --- a/extensions/telegram/src/bot.ts +++ b/extensions/telegram/src/bot.ts @@ -256,6 +256,8 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance }); const recentUpdates = createTelegramUpdateDedupe(); + const pendingUpdateKeys = new Set(); + const activeHandledUpdateKeys = new Map(); const initialUpdateId = typeof opts.updateOffset?.lastUpdateId === "number" ? opts.updateOffset.lastUpdateId : null; @@ -308,6 +310,12 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance }); }; + const logSkippedUpdate = (key: string) => { + if (shouldLogVerbose()) { + logVerbose(`telegram dedupe: skipped ${key}`); + } + }; + const shouldSkipUpdate = (ctx: TelegramUpdateKeyContext) => { const updateId = resolveTelegramUpdateId(ctx); const skipCutoff = highestPersistedUpdateId ?? initialUpdateId; @@ -315,24 +323,55 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance return true; } const key = buildTelegramUpdateKey(ctx); + if (!key) { + return false; + } + const handled = activeHandledUpdateKeys.get(key); + if (handled != null) { + if (handled) { + logSkippedUpdate(key); + return true; + } + activeHandledUpdateKeys.set(key, true); + return false; + } const skipped = recentUpdates.check(key); - if (skipped && key && shouldLogVerbose()) { - logVerbose(`telegram dedupe: skipped ${key}`); + if (skipped) { + logSkippedUpdate(key); } return skipped; }; bot.use(async (ctx, next) => { const updateId = resolveTelegramUpdateId(ctx); + const updateKey = buildTelegramUpdateKey(ctx); let completed = false; if (typeof updateId === "number") { failedUpdateIds.delete(updateId); pendingUpdateIds.add(updateId); } + if (updateKey) { + if (pendingUpdateKeys.has(updateKey) || recentUpdates.peek(updateKey)) { + logSkippedUpdate(updateKey); + if (typeof updateId === "number") { + pendingUpdateIds.delete(updateId); + } + return; + } + pendingUpdateKeys.add(updateKey); + activeHandledUpdateKeys.set(updateKey, false); + } try { await next(); completed = true; } finally { + if (updateKey) { + activeHandledUpdateKeys.delete(updateKey); + if (completed) { + recentUpdates.check(updateKey); + } + pendingUpdateKeys.delete(updateKey); + } if (typeof updateId === "number") { pendingUpdateIds.delete(updateId); if (completed) {