diff --git a/extensions/telegram/src/bot.create-telegram-bot.test.ts b/extensions/telegram/src/bot.create-telegram-bot.test.ts index c1bea47f7d1..b6a82e2a69e 100644 --- a/extensions/telegram/src/bot.create-telegram-bot.test.ts +++ b/extensions/telegram/src/bot.create-telegram-bot.test.ts @@ -1049,6 +1049,72 @@ describe("createTelegramBot", () => { } }); + it("does not persist failed updates into the watermark", async () => { + sequentializeSpy.mockImplementationOnce( + () => async (_ctx: unknown, next: () => Promise) => { + await next(); + }, + ); + + const onUpdateId = vi.fn(); + + createTelegramBot({ + token: "tok", + updateOffset: { + lastUpdateId: 200, + onUpdateId, + }, + }); + + type Middleware = ( + ctx: Record, + next: () => Promise, + ) => Promise | void; + + const middlewares = middlewareUseSpy.mock.calls + .map((call) => call[0]) + .filter((fn): fn is Middleware => typeof fn === "function"); + + const runMiddlewareChain = async ( + ctx: Record, + finalNext: () => Promise, + ) => { + let idx = -1; + const dispatch = async (i: number): Promise => { + if (i <= idx) { + throw new Error("middleware dispatch called multiple times"); + } + idx = i; + const fn = middlewares[i]; + if (!fn) { + await finalNext(); + return; + } + await fn(ctx, async () => dispatch(i + 1)); + }; + await dispatch(0); + }; + + await expect( + runMiddlewareChain({ update: { update_id: 201 } }, async () => { + throw new Error("middleware boom"); + }), + ).rejects.toThrow("middleware boom"); + + await runMiddlewareChain({ update: { update_id: 202 } }, async () => {}); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(onUpdateId).not.toHaveBeenCalled(); + expect(onUpdateId).not.toHaveBeenCalledWith(201); + expect(onUpdateId).not.toHaveBeenCalledWith(202); + + await runMiddlewareChain({ update: { update_id: 201 } }, async () => {}); + + await vi.waitFor(() => { + expect(onUpdateId).toHaveBeenCalledWith(202); + }); + }); + it("allows distinct callback_query ids without update_id", async () => { loadConfig.mockReturnValue({ channels: { diff --git a/extensions/telegram/src/bot.ts b/extensions/telegram/src/bot.ts index 083654c62c9..ae39458acea 100644 --- a/extensions/telegram/src/bot.ts +++ b/extensions/telegram/src/bot.ts @@ -264,6 +264,7 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance // We only persist a watermark that is strictly less than the smallest pending update_id, // so we never write an offset that would skip an update still waiting to run. const pendingUpdateIds = new Set(); + const failedUpdateIds = new Set(); let highestCompletedUpdateId: number | null = initialUpdateId; let highestPersistedUpdateId: number | null = initialUpdateId; const maybePersistSafeWatermark = () => { @@ -285,6 +286,17 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance safe = Math.min(safe, minPending - 1); } } + if (failedUpdateIds.size > 0) { + let minFailed: number | null = null; + for (const id of failedUpdateIds) { + if (minFailed === null || id < minFailed) { + minFailed = id; + } + } + if (minFailed !== null) { + safe = Math.min(safe, minFailed - 1); + } + } if (highestPersistedUpdateId !== null && safe <= highestPersistedUpdateId) { return; } @@ -312,18 +324,25 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance bot.use(async (ctx, next) => { const updateId = resolveTelegramUpdateId(ctx); + let completed = false; if (typeof updateId === "number") { + failedUpdateIds.delete(updateId); pendingUpdateIds.add(updateId); } try { await next(); + completed = true; } finally { if (typeof updateId === "number") { pendingUpdateIds.delete(updateId); - if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) { - highestCompletedUpdateId = updateId; + if (completed) { + if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) { + highestCompletedUpdateId = updateId; + } + maybePersistSafeWatermark(); + } else { + failedUpdateIds.add(updateId); } - maybePersistSafeWatermark(); } } });