fix(telegram): block watermark advancement past failed updates

This commit is contained in:
Vincent Koc
2026-04-13 16:51:57 +01:00
parent 95517edaeb
commit 54eaf85ea2
2 changed files with 88 additions and 3 deletions

View File

@@ -1049,6 +1049,72 @@ describe("createTelegramBot", () => {
}
});
it("does not persist failed updates into the watermark", async () => {
sequentializeSpy.mockImplementationOnce(
() => async (_ctx: unknown, next: () => Promise<void>) => {
await next();
},
);
const onUpdateId = vi.fn();
createTelegramBot({
token: "tok",
updateOffset: {
lastUpdateId: 200,
onUpdateId,
},
});
type Middleware = (
ctx: Record<string, unknown>,
next: () => Promise<void>,
) => Promise<void> | void;
const middlewares = middlewareUseSpy.mock.calls
.map((call) => call[0])
.filter((fn): fn is Middleware => typeof fn === "function");
const runMiddlewareChain = async (
ctx: Record<string, unknown>,
finalNext: () => Promise<void>,
) => {
let idx = -1;
const dispatch = async (i: number): Promise<void> => {
if (i <= idx) {
throw new Error("middleware dispatch called multiple times");
}
idx = i;
const fn = middlewares[i];
if (!fn) {
await finalNext();
return;
}
await fn(ctx, async () => dispatch(i + 1));
};
await dispatch(0);
};
await expect(
runMiddlewareChain({ update: { update_id: 201 } }, async () => {
throw new Error("middleware boom");
}),
).rejects.toThrow("middleware boom");
await runMiddlewareChain({ update: { update_id: 202 } }, async () => {});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(onUpdateId).not.toHaveBeenCalled();
expect(onUpdateId).not.toHaveBeenCalledWith(201);
expect(onUpdateId).not.toHaveBeenCalledWith(202);
await runMiddlewareChain({ update: { update_id: 201 } }, async () => {});
await vi.waitFor(() => {
expect(onUpdateId).toHaveBeenCalledWith(202);
});
});
it("allows distinct callback_query ids without update_id", async () => {
loadConfig.mockReturnValue({
channels: {

View File

@@ -264,6 +264,7 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance
// We only persist a watermark that is strictly less than the smallest pending update_id,
// so we never write an offset that would skip an update still waiting to run.
const pendingUpdateIds = new Set<number>();
const failedUpdateIds = new Set<number>();
let highestCompletedUpdateId: number | null = initialUpdateId;
let highestPersistedUpdateId: number | null = initialUpdateId;
const maybePersistSafeWatermark = () => {
@@ -285,6 +286,17 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance
safe = Math.min(safe, minPending - 1);
}
}
if (failedUpdateIds.size > 0) {
let minFailed: number | null = null;
for (const id of failedUpdateIds) {
if (minFailed === null || id < minFailed) {
minFailed = id;
}
}
if (minFailed !== null) {
safe = Math.min(safe, minFailed - 1);
}
}
if (highestPersistedUpdateId !== null && safe <= highestPersistedUpdateId) {
return;
}
@@ -312,18 +324,25 @@ export function createTelegramBot(opts: TelegramBotOptions): TelegramBotInstance
bot.use(async (ctx, next) => {
const updateId = resolveTelegramUpdateId(ctx);
let completed = false;
if (typeof updateId === "number") {
failedUpdateIds.delete(updateId);
pendingUpdateIds.add(updateId);
}
try {
await next();
completed = true;
} finally {
if (typeof updateId === "number") {
pendingUpdateIds.delete(updateId);
if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) {
highestCompletedUpdateId = updateId;
if (completed) {
if (highestCompletedUpdateId === null || updateId > highestCompletedUpdateId) {
highestCompletedUpdateId = updateId;
}
maybePersistSafeWatermark();
} else {
failedUpdateIds.add(updateId);
}
maybePersistSafeWatermark();
}
}
});