From 4d0ca7c31533930748d569c687c95bf2f638c097 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 17:06:59 +0100 Subject: [PATCH] fix(telegram): restart stalled polling after unhandled network errors --- CHANGELOG.md | 1 + src/telegram/monitor.test.ts | 61 ++++++++++++++++++++++++++++++++++++ src/telegram/monitor.ts | 29 +++++++++++++++-- 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7afed7fafe..b457be7ae46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,7 @@ Docs: https://docs.openclaw.ai - Telegram/Streaming: preserve archived draft preview mapping after flush and clean superseded reasoning preview bubbles so multi-message preview finals no longer cross-edit or orphan stale messages under send/rotation races. (#23202) Thanks @obviyus. - Telegram/Replies: extract forwarded-origin context from unified reply targets (`reply_to_message` and `external_reply`) so forward+comment metadata is preserved across partial reply shapes. (#9720) thanks @mcaxtr. - Telegram/Polling: persist a safe update-offset watermark bounded by pending updates so crash/restart cannot skip queued lower `update_id` updates after out-of-order completion. (#23284) thanks @frankekn. +- Telegram/Polling: force-restart stuck runner instances when recoverable unhandled network rejections escape the polling task path, so polling resumes instead of silently stalling. (#19721) Thanks @jg-noncelogic. - Slack/Slash commands: preserve the Bolt app receiver when registering external select options handlers so monitor startup does not crash on runtimes that require bound `app.options` calls. (#23209) Thanks @0xgaia. - Slack/Telegram slash sessions: await session metadata persistence before dispatch so first-turn native slash runs do not race session-origin metadata updates. (#23065) thanks @hydro13. - Agents/Ollama: preserve unsafe integer tool-call arguments as exact strings during NDJSON parsing, preventing large numeric IDs from being rounded before tool execution. (#23170) Thanks @BestJoester. diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index ff12faaa217..3e4b43ce2f7 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -28,6 +28,7 @@ const { initSpy, runSpy, loadConfig } = vi.hoisted(() => ({ runSpy: vi.fn(() => ({ task: () => Promise.resolve(), stop: vi.fn(), + isRunning: () => false, })), loadConfig: vi.fn(() => ({ agents: { defaults: { maxConcurrent: 2 } }, @@ -35,6 +36,25 @@ const { initSpy, runSpy, loadConfig } = vi.hoisted(() => ({ })), })); +const { registerUnhandledRejectionHandlerMock, emitUnhandledRejection, resetUnhandledRejection } = + vi.hoisted(() => { + let handler: ((reason: unknown) => boolean) | undefined; + return { + registerUnhandledRejectionHandlerMock: vi.fn((next: (reason: unknown) => boolean) => { + handler = next; + return () => { + if (handler === next) { + handler = undefined; + } + }; + }), + emitUnhandledRejection: (reason: unknown) => handler?.(reason) ?? false, + resetUnhandledRejection: () => { + handler = undefined; + }, + }; + }); + const { computeBackoff, sleepWithAbort } = vi.hoisted(() => ({ computeBackoff: vi.fn(() => 0), sleepWithAbort: vi.fn(async () => undefined), @@ -87,6 +107,10 @@ vi.mock("../infra/backoff.js", () => ({ sleepWithAbort, })); +vi.mock("../infra/unhandled-rejections.js", () => ({ + registerUnhandledRejectionHandler: registerUnhandledRejectionHandlerMock, +})); + vi.mock("./webhook.js", () => ({ startTelegramWebhook: startTelegramWebhookSpy, })); @@ -108,6 +132,8 @@ describe("monitorTelegramProvider (grammY)", () => { computeBackoff.mockClear(); sleepWithAbort.mockClear(); startTelegramWebhookSpy.mockClear(); + registerUnhandledRejectionHandlerMock.mockClear(); + resetUnhandledRejection(); }); it("processes a DM and sends reply", async () => { @@ -201,6 +227,41 @@ describe("monitorTelegramProvider (grammY)", () => { await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token"); }); + it("force-restarts polling when unhandled network rejection stalls runner", async () => { + let running = true; + let releaseTask: (() => void) | undefined; + const stop = vi.fn(async () => { + running = false; + releaseTask?.(); + }); + + runSpy + .mockImplementationOnce(() => ({ + task: () => + new Promise((resolve) => { + releaseTask = resolve; + }), + stop, + isRunning: () => running, + })) + .mockImplementationOnce(() => ({ + task: () => Promise.resolve(), + stop: vi.fn(), + isRunning: () => false, + })); + + const monitor = monitorTelegramProvider({ token: "tok" }); + await vi.waitFor(() => expect(runSpy).toHaveBeenCalledTimes(1)); + + expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true); + await monitor; + + expect(stop).toHaveBeenCalledTimes(1); + expect(computeBackoff).toHaveBeenCalled(); + expect(sleepWithAbort).toHaveBeenCalled(); + expect(runSpy).toHaveBeenCalledTimes(2); + }); + it("passes configured webhookHost to webhook listener", async () => { await monitorTelegramProvider({ token: "tok", diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 05629e4ace1..86cd6115de2 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -90,16 +90,29 @@ const isGrammyHttpError = (err: unknown): boolean => { export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const log = opts.runtime?.error ?? console.error; + let activeRunner: ReturnType | undefined; + let forceRestarted = false; // Register handler for Grammy HttpError unhandled rejections. // This catches network errors that escape the polling loop's try-catch // (e.g., from setMyCommands during bot setup). // We gate on isGrammyHttpError to avoid suppressing non-Telegram errors. const unregisterHandler = registerUnhandledRejectionHandler((err) => { - if (isGrammyHttpError(err) && isRecoverableTelegramNetworkError(err, { context: "polling" })) { + const isNetworkError = isRecoverableTelegramNetworkError(err, { context: "polling" }); + if (isGrammyHttpError(err) && isNetworkError) { log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`); return true; // handled - don't crash } + // Network failures can surface outside the runner task promise and leave + // polling stuck; force-stop the active runner so the loop can recover. + if (isNetworkError && activeRunner && activeRunner.isRunning()) { + forceRestarted = true; + void activeRunner.stop().catch(() => {}); + log( + `[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`, + ); + return true; // handled + } return false; }); @@ -173,6 +186,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { while (!opts.abortSignal?.aborted) { const runner = run(bot, createTelegramRunnerOptions(cfg)); + activeRunner = runner; const stopOnAbort = () => { if (opts.abortSignal?.aborted) { void runner.stop(); @@ -182,8 +196,19 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { try { // runner.task() returns a promise that resolves when the runner stops await runner.task(); - return; + if (!forceRestarted) { + return; + } + forceRestarted = false; + restartAttempts += 1; + const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); + log( + `Telegram polling runner restarted after unhandled network error; retrying in ${formatDurationPrecise(delayMs)}.`, + ); + await sleepWithAbort(delayMs, opts.abortSignal); + continue; } catch (err) { + forceRestarted = false; if (opts.abortSignal?.aborted) { throw err; }