diff --git a/src/telegram/bot.create-telegram-bot.test.ts b/src/telegram/bot.create-telegram-bot.test.ts index 378c1eb1065..07edc4f5432 100644 --- a/src/telegram/bot.create-telegram-bot.test.ts +++ b/src/telegram/bot.create-telegram-bot.test.ts @@ -75,6 +75,27 @@ describe("createTelegramBot", () => { globalThis.fetch = originalFetch; } }); + it("aborts wrapped client fetch when fetchAbortSignal aborts", async () => { + const originalFetch = globalThis.fetch; + const fetchSpy = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => init?.signal); + const shutdown = new AbortController(); + globalThis.fetch = fetchSpy as unknown as typeof fetch; + try { + createTelegramBot({ token: "tok", fetchAbortSignal: shutdown.signal }); + const clientFetch = (botCtorSpy.mock.calls[0]?.[1] as { client?: { fetch?: unknown } }) + ?.client?.fetch as ((input: RequestInfo | URL, init?: RequestInit) => Promise); + expect(clientFetch).toBeTypeOf("function"); + + const observedSignal = (await clientFetch("https://example.test")) as AbortSignal; + expect(observedSignal).toBeInstanceOf(AbortSignal); + expect(observedSignal.aborted).toBe(false); + + shutdown.abort(new Error("shutdown")); + expect(observedSignal.aborted).toBe(true); + } finally { + globalThis.fetch = originalFetch; + } + }); it("applies global and per-account timeoutSeconds", () => { loadConfig.mockReturnValue({ channels: { diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 9549fe71986..b7bb8c34e60 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -54,6 +54,8 @@ export type TelegramBotOptions = { replyToMode?: ReplyToMode; proxyFetch?: typeof fetch; config?: OpenClawConfig; + /** Signal to abort in-flight Telegram API fetch requests (e.g. getUpdates) on shutdown. */ + fetchAbortSignal?: AbortSignal; updateOffset?: { lastUpdateId?: number | null; onUpdateId?: (updateId: number) => void | Promise; @@ -103,14 +105,55 @@ export function createTelegramBot(opts: TelegramBotOptions) { // grammY's ApiClientOptions types still track `node-fetch` types; Node 22+ global fetch // (undici) is structurally compatible at runtime but not assignable in TS. const fetchForClient = fetchImpl as unknown as NonNullable; + + // When a shutdown abort signal is provided, wrap fetch so every Telegram API request + // (especially long-polling getUpdates) aborts immediately on shutdown. Without this, + // the in-flight getUpdates hangs for up to 30s, and a new gateway instance starting + // its own poll triggers a 409 Conflict from Telegram. + let finalFetch: NonNullable | undefined = + shouldProvideFetch && fetchImpl ? fetchForClient : undefined; + if (opts.fetchAbortSignal) { + const baseFetch = + finalFetch ?? (globalThis.fetch as unknown as NonNullable); + const shutdownSignal = opts.fetchAbortSignal; + // Cast baseFetch to global fetch to avoid node-fetch ↔ global-fetch type divergence; + // they are runtime-compatible (the codebase already casts at every fetch boundary). + const callFetch = baseFetch as unknown as typeof globalThis.fetch; + finalFetch = ((input: RequestInfo | URL, init?: RequestInit) => { + const controller = new AbortController(); + const abortWith = (signal: AbortSignal) => controller.abort(signal.reason); + const onShutdown = () => abortWith(shutdownSignal); + let onRequestAbort: (() => void) | undefined; + if (shutdownSignal.aborted) { + abortWith(shutdownSignal); + } else { + shutdownSignal.addEventListener("abort", onShutdown, { once: true }); + } + if (init?.signal) { + if (init.signal.aborted) { + abortWith(init.signal); + } else { + onRequestAbort = () => abortWith(init.signal as AbortSignal); + init.signal.addEventListener("abort", onRequestAbort, { once: true }); + } + } + return callFetch(input, { ...init, signal: controller.signal }).finally(() => { + shutdownSignal.removeEventListener("abort", onShutdown); + if (init?.signal && onRequestAbort) { + init.signal.removeEventListener("abort", onRequestAbort); + } + }); + }) as unknown as NonNullable; + } + const timeoutSeconds = typeof telegramCfg?.timeoutSeconds === "number" && Number.isFinite(telegramCfg.timeoutSeconds) ? Math.max(1, Math.floor(telegramCfg.timeoutSeconds)) : undefined; const client: ApiClientOptions | undefined = - shouldProvideFetch || timeoutSeconds + finalFetch || timeoutSeconds ? { - ...(shouldProvideFetch && fetchImpl ? { fetch: fetchForClient } : {}), + ...(finalFetch ? { fetch: finalFetch } : {}), ...(timeoutSeconds ? { timeoutSeconds } : {}), } : undefined; diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index d5dc43c5335..bd9a35fc97c 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -63,6 +63,10 @@ const { createTelegramBotErrors } = vi.hoisted(() => ({ createTelegramBotErrors: [] as unknown[], })); +const { createTelegramBotCalls } = vi.hoisted(() => ({ + createTelegramBotCalls: [] as Array>, +})); + const { createdBotStops } = vi.hoisted(() => ({ createdBotStops: [] as Array void>>>, })); @@ -142,7 +146,8 @@ vi.mock("../config/config.js", async (importOriginal) => { }); vi.mock("./bot.js", () => ({ - createTelegramBot: () => { + createTelegramBot: (opts: Record) => { + createTelegramBotCalls.push(opts); const nextError = createTelegramBotErrors.shift(); if (nextError) { throw nextError; @@ -217,6 +222,7 @@ describe("monitorTelegramProvider (grammY)", () => { task: () => Promise.reject(new Error("runSpy called without explicit test stub")), }), ); + createTelegramBotCalls.length = 0; computeBackoff.mockClear(); sleepWithAbort.mockClear(); startTelegramWebhookSpy.mockClear(); @@ -442,6 +448,47 @@ describe("monitorTelegramProvider (grammY)", () => { expect(runSpy).toHaveBeenCalledTimes(2); }); + it("aborts the active Telegram fetch when unhandled network rejection forces restart", async () => { + const abort = new AbortController(); + let running = true; + let releaseTask: (() => void) | undefined; + const stop = vi.fn(async () => { + running = false; + releaseTask?.(); + }); + + runSpy + .mockImplementationOnce(() => + makeRunnerStub({ + task: () => + new Promise((resolve) => { + releaseTask = resolve; + }), + stop, + isRunning: () => running, + }), + ) + .mockImplementationOnce(() => + makeRunnerStub({ + task: async () => { + abort.abort(); + }, + }), + ); + + const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await vi.waitFor(() => expect(createTelegramBotCalls.length).toBeGreaterThanOrEqual(1)); + const firstSignal = createTelegramBotCalls[0]?.fetchAbortSignal; + expect(firstSignal).toBeInstanceOf(AbortSignal); + expect((firstSignal as AbortSignal).aborted).toBe(false); + + expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true); + await monitor; + + expect((firstSignal as AbortSignal).aborted).toBe(true); + expect(stop).toHaveBeenCalled(); + }); + it("passes configured webhookHost to webhook listener", async () => { await monitorTelegramProvider({ token: "tok", diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 6325670f298..29be5e05fea 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -113,6 +113,7 @@ const isGrammyHttpError = (err: unknown): boolean => { export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const log = opts.runtime?.error ?? console.error; let activeRunner: ReturnType | undefined; + let activeFetchAbort: AbortController | undefined; let forceRestarted = false; // Register handler for Grammy HttpError unhandled rejections. @@ -129,6 +130,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { // polling stuck; force-stop the active runner so the loop can recover. if (isNetworkError && activeRunner && activeRunner.isRunning()) { forceRestarted = true; + activeFetchAbort?.abort(); void activeRunner.stop().catch(() => {}); log( `[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`, @@ -241,7 +243,9 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { ); }; - const createPollingBot = async (): Promise => { + const createPollingBot = async ( + fetchAbortController: AbortController, + ): Promise => { try { return createTelegramBot({ token, @@ -249,6 +253,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { proxyFetch, config: cfg, accountId: account.accountId, + fetchAbortSignal: fetchAbortController.signal, updateOffset: { lastUpdateId, onUpdateId: persistUpdateId, @@ -298,7 +303,10 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } }; - const runPollingCycle = async (bot: TelegramBot): Promise<"continue" | "exit"> => { + const runPollingCycle = async ( + bot: TelegramBot, + fetchAbortController: AbortController, + ): Promise<"continue" | "exit"> => { // Confirm the persisted offset with Telegram so the runner (which starts // at offset 0) does not re-fetch already-processed updates on restart. await confirmPersistedOffset(bot); @@ -317,6 +325,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { let stopPromise: Promise | undefined; let stalledRestart = false; const stopRunner = () => { + fetchAbortController.abort(); stopPromise ??= Promise.resolve(runner.stop()) .then(() => undefined) .catch(() => { @@ -393,12 +402,20 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { opts.abortSignal?.removeEventListener("abort", stopOnAbort); await stopRunner(); await stopBot(); + if (activeFetchAbort === fetchAbortController) { + activeFetchAbort = undefined; + } } }; while (!opts.abortSignal?.aborted) { - const bot = await createPollingBot(); + const fetchAbortController = new AbortController(); + activeFetchAbort = fetchAbortController; + const bot = await createPollingBot(fetchAbortController); if (!bot) { + if (activeFetchAbort === fetchAbortController) { + activeFetchAbort = undefined; + } continue; } @@ -410,7 +427,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { return; } - const state = await runPollingCycle(bot); + const state = await runPollingCycle(bot, fetchAbortController); if (state === "exit") { return; }