From 81384daeb4c3bb519b9189ef40966bc67e62649e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Feb 2026 17:47:12 +0100 Subject: [PATCH] fix(telegram): harden polling retry setup and teardown order Co-authored-by: Cklee <99405438+liebertar@users.noreply.github.com> Co-authored-by: Ho Lim <166576253+HOYALIM@users.noreply.github.com> --- CHANGELOG.md | 1 + src/telegram/monitor.test.ts | 67 +++++++++++++++++++++++++++++++++++- src/telegram/monitor.ts | 56 +++++++++++++++++++++++------- 3 files changed, 110 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 132d62ee495..6bcec28cefd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Telegram/Webhook: keep webhook monitors alive until gateway abort signals fire, preventing false channel exits and immediate webhook auto-restart loops. +- Telegram/Polling: retry recoverable setup-time network failures in monitor startup and await runner teardown before retry to avoid overlapping polling sessions. - Signal/RPC: guard malformed Signal RPC JSON responses with a clear status-scoped error and add regression coverage for invalid JSON responses. (#22995) Thanks @adhitShet. - Gateway/Subagents: guard gateway and subagent session-key/message trim paths against undefined inputs to prevent early `Cannot read properties of undefined (reading 'trim')` crashes during subagent spawn and wait flows. - Agents/Workspace: guard `resolveUserPath` against undefined/null input to prevent `Cannot read properties of undefined (reading 'trim')` crashes when workspace paths are missing in embedded runner flows. diff --git a/src/telegram/monitor.test.ts b/src/telegram/monitor.test.ts index ac0e834b32b..299b91bc415 100644 --- a/src/telegram/monitor.test.ts +++ b/src/telegram/monitor.test.ts @@ -55,6 +55,10 @@ const { registerUnhandledRejectionHandlerMock, emitUnhandledRejection, resetUnha }; }); +const { createTelegramBotErrors } = vi.hoisted(() => ({ + createTelegramBotErrors: [] as unknown[], +})); + const { computeBackoff, sleepWithAbort } = vi.hoisted(() => ({ computeBackoff: vi.fn(() => 0), sleepWithAbort: vi.fn(async () => undefined), @@ -73,6 +77,10 @@ vi.mock("../config/config.js", async (importOriginal) => { vi.mock("./bot.js", () => ({ createTelegramBot: () => { + const nextError = createTelegramBotErrors.shift(); + if (nextError) { + throw nextError; + } handlers.message = async (ctx: MockCtx) => { const chatId = ctx.message.chat.id; const isGroup = ctx.message.chat.type !== "private"; @@ -134,6 +142,7 @@ describe("monitorTelegramProvider (grammY)", () => { startTelegramWebhookSpy.mockClear(); registerUnhandledRejectionHandlerMock.mockClear(); resetUnhandledRejection(); + createTelegramBotErrors.length = 0; }); it("processes a DM and sends reply", async () => { @@ -218,6 +227,62 @@ describe("monitorTelegramProvider (grammY)", () => { expect(runSpy).toHaveBeenCalledTimes(2); }); + it("retries setup-time recoverable errors before starting polling", async () => { + const setupError = Object.assign(new TypeError("fetch failed"), { + cause: Object.assign(new Error("connect timeout"), { + code: "UND_ERR_CONNECT_TIMEOUT", + }), + }); + createTelegramBotErrors.push(setupError); + + runSpy.mockImplementationOnce(() => ({ + task: () => Promise.resolve(), + stop: vi.fn(), + isRunning: () => false, + })); + + await monitorTelegramProvider({ token: "tok" }); + + expect(computeBackoff).toHaveBeenCalled(); + expect(sleepWithAbort).toHaveBeenCalled(); + expect(runSpy).toHaveBeenCalledTimes(1); + }); + + it("awaits runner.stop before retrying after recoverable polling error", async () => { + const recoverableError = Object.assign(new TypeError("fetch failed"), { + cause: Object.assign(new Error("connect timeout"), { + code: "UND_ERR_CONNECT_TIMEOUT", + }), + }); + let firstStopped = false; + const firstStop = vi.fn(async () => { + await Promise.resolve(); + firstStopped = true; + }); + + runSpy + .mockImplementationOnce(() => ({ + task: () => Promise.reject(recoverableError), + stop: firstStop, + isRunning: () => false, + })) + .mockImplementationOnce(() => { + expect(firstStopped).toBe(true); + return { + task: () => Promise.resolve(), + stop: vi.fn(), + isRunning: () => false, + }; + }); + + await monitorTelegramProvider({ token: "tok" }); + + expect(firstStop).toHaveBeenCalled(); + expect(computeBackoff).toHaveBeenCalled(); + expect(sleepWithAbort).toHaveBeenCalled(); + expect(runSpy).toHaveBeenCalledTimes(2); + }); + it("surfaces non-recoverable errors", async () => { runSpy.mockImplementationOnce(() => ({ task: () => Promise.reject(new Error("bad token")), @@ -256,7 +321,7 @@ describe("monitorTelegramProvider (grammY)", () => { expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true); await monitor; - expect(stop).toHaveBeenCalledTimes(1); + expect(stop.mock.calls.length).toBeGreaterThanOrEqual(1); expect(computeBackoff).toHaveBeenCalled(); expect(sleepWithAbort).toHaveBeenCalled(); expect(runSpy).toHaveBeenCalledTimes(2); diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index e90db5fde57..a720d1be647 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -152,18 +152,6 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } }; - const bot = createTelegramBot({ - token, - runtime: opts.runtime, - proxyFetch, - config: cfg, - accountId: account.accountId, - updateOffset: { - lastUpdateId, - onUpdateId: persistUpdateId, - }, - }); - if (opts.useWebhook) { await startTelegramWebhook({ token, @@ -192,9 +180,46 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { // Use grammyjs/runner for concurrent update processing let restartAttempts = 0; + const runnerOptions = createTelegramRunnerOptions(cfg); while (!opts.abortSignal?.aborted) { - const runner = run(bot, createTelegramRunnerOptions(cfg)); + let bot; + try { + bot = createTelegramBot({ + token, + runtime: opts.runtime, + proxyFetch, + config: cfg, + accountId: account.accountId, + updateOffset: { + lastUpdateId, + onUpdateId: persistUpdateId, + }, + }); + } catch (err) { + if (opts.abortSignal?.aborted) { + return; + } + if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) { + throw err; + } + restartAttempts += 1; + const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); + (opts.runtime?.error ?? console.error)( + `Telegram setup network error: ${formatErrorMessage(err)}; retrying in ${formatDurationPrecise(delayMs)}.`, + ); + try { + await sleepWithAbort(delayMs, opts.abortSignal); + } catch (sleepErr) { + if (opts.abortSignal?.aborted) { + return; + } + throw sleepErr; + } + continue; + } + + const runner = run(bot, runnerOptions); activeRunner = runner; const stopOnAbort = () => { if (opts.abortSignal?.aborted) { @@ -243,6 +268,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { } } finally { opts.abortSignal?.removeEventListener("abort", stopOnAbort); + try { + await runner.stop(); + } catch { + // Runner may already be stopped by abort/retry paths. + } } } } finally {