From fa6436eaf3e836a45880a2e5ba8b779a544d2aed Mon Sep 17 00:00:00 2001 From: George Kalogirou Date: Mon, 23 Feb 2026 00:14:13 +0200 Subject: [PATCH] fix(telegram): abort in-flight getUpdates fetch on shutdown When the gateway receives SIGTERM, runner.stop() stops the grammY polling loop but does not abort the in-flight getUpdates HTTP request. That request hangs for up to 30 seconds (the Telegram API timeout). If a new gateway instance starts polling during that window, Telegram returns a 409 Conflict error, causing message loss and requiring exponential backoff recovery. This is especially problematic with service managers (launchd, systemd) that restart the process immediately after SIGTERM. Wire an AbortController into the fetch layer so every Telegram API request (especially the long-polling getUpdates) aborts immediately on shutdown: - bot.ts: Accept optional fetchAbortSignal in TelegramBotOptions; wrap the grammY fetch so each request runs under a per-request AbortController that aborts when either the shutdown signal or the request's own signal aborts (listeners are removed once the request settles). - monitor.ts: Create a per-iteration AbortController, pass its signal to createTelegramBot, and abort it from the SIGTERM handler, force-restart path, and finally block. 
Co-Authored-By: Claude Opus 4.6 --- src/telegram/bot.create-telegram-bot.test.ts | 21 +++++++++ src/telegram/bot.ts | 47 ++++++++++++++++++- src/telegram/monitor.test.ts | 49 +++++++++++++++++++- src/telegram/monitor.ts | 25 ++++++++-- 4 files changed, 135 insertions(+), 7 deletions(-) diff --git a/src/telegram/bot.create-telegram-bot.test.ts b/src/telegram/bot.create-telegram-bot.test.ts index 378c1eb1065..07edc4f5432 100644 --- a/src/telegram/bot.create-telegram-bot.test.ts +++ b/src/telegram/bot.create-telegram-bot.test.ts @@ -75,6 +75,27 @@ describe("createTelegramBot", () => { globalThis.fetch = originalFetch; } }); + it("aborts wrapped client fetch when fetchAbortSignal aborts", async () => { + const originalFetch = globalThis.fetch; + const fetchSpy = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => init?.signal); + const shutdown = new AbortController(); + globalThis.fetch = fetchSpy as unknown as typeof fetch; + try { + createTelegramBot({ token: "tok", fetchAbortSignal: shutdown.signal }); + const clientFetch = (botCtorSpy.mock.calls[0]?.[1] as { client?: { fetch?: unknown } }) + ?.client?.fetch as ((input: RequestInfo | URL, init?: RequestInit) => Promise); + expect(clientFetch).toBeTypeOf("function"); + + const observedSignal = (await clientFetch("https://example.test")) as AbortSignal; + expect(observedSignal).toBeInstanceOf(AbortSignal); + expect(observedSignal.aborted).toBe(false); + + shutdown.abort(new Error("shutdown")); + expect(observedSignal.aborted).toBe(true); + } finally { + globalThis.fetch = originalFetch; + } + }); it("applies global and per-account timeoutSeconds", () => { loadConfig.mockReturnValue({ channels: { diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 9549fe71986..b7bb8c34e60 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -54,6 +54,8 @@ export type TelegramBotOptions = { replyToMode?: ReplyToMode; proxyFetch?: typeof fetch; config?: OpenClawConfig; + /** Signal to abort in-flight 
Telegram API fetch requests (e.g. getUpdates) on shutdown. */ + fetchAbortSignal?: AbortSignal; updateOffset?: { lastUpdateId?: number | null; onUpdateId?: (updateId: number) => void | Promise; @@ -103,14 +105,55 @@ export function createTelegramBot(opts: TelegramBotOptions) { // grammY's ApiClientOptions types still track `node-fetch` types; Node 22+ global fetch // (undici) is structurally compatible at runtime but not assignable in TS. const fetchForClient = fetchImpl as unknown as NonNullable; + + // When a shutdown abort signal is provided, wrap fetch so every Telegram API request + // (especially long-polling getUpdates) aborts immediately on shutdown. Without this, + // the in-flight getUpdates hangs for up to 30s, and a new gateway instance starting + // its own poll triggers a 409 Conflict from Telegram. + let finalFetch: NonNullable | undefined = + shouldProvideFetch && fetchImpl ? fetchForClient : undefined; + if (opts.fetchAbortSignal) { + const baseFetch = + finalFetch ?? (globalThis.fetch as unknown as NonNullable); + const shutdownSignal = opts.fetchAbortSignal; + // Cast baseFetch to global fetch to avoid node-fetch ↔ global-fetch type divergence; + // they are runtime-compatible (the codebase already casts at every fetch boundary). 
+ const callFetch = baseFetch as unknown as typeof globalThis.fetch; + finalFetch = ((input: RequestInfo | URL, init?: RequestInit) => { + const controller = new AbortController(); + const abortWith = (signal: AbortSignal) => controller.abort(signal.reason); + const onShutdown = () => abortWith(shutdownSignal); + let onRequestAbort: (() => void) | undefined; + if (shutdownSignal.aborted) { + abortWith(shutdownSignal); + } else { + shutdownSignal.addEventListener("abort", onShutdown, { once: true }); + } + if (init?.signal) { + if (init.signal.aborted) { + abortWith(init.signal); + } else { + onRequestAbort = () => abortWith(init.signal as AbortSignal); + init.signal.addEventListener("abort", onRequestAbort, { once: true }); + } + } + return callFetch(input, { ...init, signal: controller.signal }).finally(() => { + shutdownSignal.removeEventListener("abort", onShutdown); + if (init?.signal && onRequestAbort) { + init.signal.removeEventListener("abort", onRequestAbort); + } + }); + }) as unknown as NonNullable; + } + const timeoutSeconds = typeof telegramCfg?.timeoutSeconds === "number" && Number.isFinite(telegramCfg.timeoutSeconds) ? Math.max(1, Math.floor(telegramCfg.timeoutSeconds)) : undefined; const client: ApiClientOptions | undefined = - shouldProvideFetch || timeoutSeconds + finalFetch || timeoutSeconds ? { - ...(shouldProvideFetch && fetchImpl ? { fetch: fetchForClient } : {}), + ...(finalFetch ? { fetch: finalFetch } : {}), ...(timeoutSeconds ? 
{ timeoutSeconds } : {}), } : undefined; diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index d5dc43c5335..bd9a35fc97c 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -63,6 +63,10 @@ const { createTelegramBotErrors } = vi.hoisted(() => ({ createTelegramBotErrors: [] as unknown[], })); +const { createTelegramBotCalls } = vi.hoisted(() => ({ + createTelegramBotCalls: [] as Array>, +})); + const { createdBotStops } = vi.hoisted(() => ({ createdBotStops: [] as Array void>>>, })); @@ -142,7 +146,8 @@ vi.mock("../config/config.js", async (importOriginal) => { }); vi.mock("./bot.js", () => ({ - createTelegramBot: () => { + createTelegramBot: (opts: Record) => { + createTelegramBotCalls.push(opts); const nextError = createTelegramBotErrors.shift(); if (nextError) { throw nextError; @@ -217,6 +222,7 @@ describe("monitorTelegramProvider (grammY)", () => { task: () => Promise.reject(new Error("runSpy called without explicit test stub")), }), ); + createTelegramBotCalls.length = 0; computeBackoff.mockClear(); sleepWithAbort.mockClear(); startTelegramWebhookSpy.mockClear(); @@ -442,6 +448,47 @@ describe("monitorTelegramProvider (grammY)", () => { expect(runSpy).toHaveBeenCalledTimes(2); }); + it("aborts the active Telegram fetch when unhandled network rejection forces restart", async () => { + const abort = new AbortController(); + let running = true; + let releaseTask: (() => void) | undefined; + const stop = vi.fn(async () => { + running = false; + releaseTask?.(); + }); + + runSpy + .mockImplementationOnce(() => + makeRunnerStub({ + task: () => + new Promise((resolve) => { + releaseTask = resolve; + }), + stop, + isRunning: () => running, + }), + ) + .mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); + + const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await vi.waitFor(() => 
expect(createTelegramBotCalls.length).toBeGreaterThanOrEqual(1)); + const firstSignal = createTelegramBotCalls[0]?.fetchAbortSignal; + expect(firstSignal).toBeInstanceOf(AbortSignal); + expect((firstSignal as AbortSignal).aborted).toBe(false); + + expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true); + await monitor; + + expect((firstSignal as AbortSignal).aborted).toBe(true); + expect(stop).toHaveBeenCalled(); + }); + it("passes configured webhookHost to webhook listener", async () => { await monitorTelegramProvider({ token: "tok", diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 6325670f298..29be5e05fea 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -113,6 +113,7 @@ const isGrammyHttpError = (err: unknown): boolean => { export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const log = opts.runtime?.error ?? console.error; let activeRunner: ReturnType | undefined; + let activeFetchAbort: AbortController | undefined; let forceRestarted = false; // Register handler for Grammy HttpError unhandled rejections. @@ -129,6 +130,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { // polling stuck; force-stop the active runner so the loop can recover. 
if (isNetworkError && activeRunner && activeRunner.isRunning()) { forceRestarted = true; + activeFetchAbort?.abort(); void activeRunner.stop().catch(() => {}); log( `[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`, @@ -241,7 +243,9 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { ); }; - const createPollingBot = async (): Promise => { + const createPollingBot = async ( + fetchAbortController: AbortController, + ): Promise => { try { return createTelegramBot({ token, @@ -249,6 +253,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { proxyFetch, config: cfg, accountId: account.accountId, + fetchAbortSignal: fetchAbortController.signal, updateOffset: { lastUpdateId, onUpdateId: persistUpdateId, @@ -298,7 +303,10 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } }; - const runPollingCycle = async (bot: TelegramBot): Promise<"continue" | "exit"> => { + const runPollingCycle = async ( + bot: TelegramBot, + fetchAbortController: AbortController, + ): Promise<"continue" | "exit"> => { // Confirm the persisted offset with Telegram so the runner (which starts // at offset 0) does not re-fetch already-processed updates on restart. 
await confirmPersistedOffset(bot); @@ -317,6 +325,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { let stopPromise: Promise | undefined; let stalledRestart = false; const stopRunner = () => { + fetchAbortController.abort(); stopPromise ??= Promise.resolve(runner.stop()) .then(() => undefined) .catch(() => { @@ -393,12 +402,20 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { opts.abortSignal?.removeEventListener("abort", stopOnAbort); await stopRunner(); await stopBot(); + if (activeFetchAbort === fetchAbortController) { + activeFetchAbort = undefined; + } } }; while (!opts.abortSignal?.aborted) { - const bot = await createPollingBot(); + const fetchAbortController = new AbortController(); + activeFetchAbort = fetchAbortController; + const bot = await createPollingBot(fetchAbortController); if (!bot) { + if (activeFetchAbort === fetchAbortController) { + activeFetchAbort = undefined; + } continue; } @@ -410,7 +427,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { return; } - const state = await runPollingCycle(bot); + const state = await runPollingCycle(bot, fetchAbortController); if (state === "exit") { return; }