From 1d301f74a6a9262e5ec36e4725dffd4fbb49a3c1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 9 Mar 2026 06:13:32 +0000 Subject: [PATCH] refactor: extract telegram polling session --- src/telegram/monitor.ts | 303 +++----------------------------- src/telegram/polling-session.ts | 283 +++++++++++++++++++++++++++++ 2 files changed, 307 insertions(+), 279 deletions(-) create mode 100644 src/telegram/polling-session.ts diff --git a/src/telegram/monitor.ts b/src/telegram/monitor.ts index 29be5e05fea..ed1e1a8744a 100644 --- a/src/telegram/monitor.ts +++ b/src/telegram/monitor.ts @@ -1,18 +1,15 @@ -import { type RunOptions, run } from "@grammyjs/runner"; +import type { RunOptions } from "@grammyjs/runner"; import { resolveAgentMaxConcurrent } from "../config/agent-limits.js"; import type { OpenClawConfig } from "../config/config.js"; import { loadConfig } from "../config/config.js"; import { waitForAbortSignal } from "../infra/abort-signal.js"; -import { computeBackoff, sleepWithAbort } from "../infra/backoff.js"; import { formatErrorMessage } from "../infra/errors.js"; -import { formatDurationPrecise } from "../infra/format-time/format-duration.ts"; import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js"; import type { RuntimeEnv } from "../runtime.js"; import { resolveTelegramAccount } from "./accounts.js"; import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; -import { withTelegramApiErrorLogging } from "./api-logging.js"; -import { createTelegramBot } from "./bot.js"; import { isRecoverableTelegramNetworkError } from "./network-errors.js"; +import { TelegramPollingSession } from "./polling-session.js"; import { makeProxyFetch } from "./proxy.js"; import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js"; import { startTelegramWebhook } from "./webhook.js"; @@ -55,21 +52,6 @@ export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions; - function normalizePersistedUpdateId(value: number | null): number | null { if (value === null) { return null; @@ -80,28 +62,6 @@ function normalizePersistedUpdateId(value: number | null): number | null { return value; } -const isGetUpdatesConflict = (err: unknown) => { - if (!err || typeof err !== "object") { - return false; - } - const typed = err as { - error_code?: number; - errorCode?: number; - description?: string; - method?: string; - message?: string; - }; - const errorCode = typed.error_code ?? typed.errorCode; - if (errorCode !== 409) { - return false; - } - const haystack = [typed.method, typed.description, typed.message] - .filter((value): value is string => typeof value === "string") - .join(" ") - .toLowerCase(); - return haystack.includes("getupdates"); -}; - /** Check if error is a Grammy HttpError (used to scope unhandled rejection handling) */ const isGrammyHttpError = (err: unknown): boolean => { if (!err || typeof err !== "object") { @@ -112,31 +72,26 @@ const isGrammyHttpError = (err: unknown): boolean => { export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { const log = opts.runtime?.error ?? console.error; - let activeRunner: ReturnType | undefined; - let activeFetchAbort: AbortController | undefined; - let forceRestarted = false; + let pollingSession: TelegramPollingSession | undefined; - // Register handler for Grammy HttpError unhandled rejections. - // This catches network errors that escape the polling loop's try-catch - // (e.g., from setMyCommands during bot setup). - // We gate on isGrammyHttpError to avoid suppressing non-Telegram errors. const unregisterHandler = registerUnhandledRejectionHandler((err) => { const isNetworkError = isRecoverableTelegramNetworkError(err, { context: "polling" }); if (isGrammyHttpError(err) && isNetworkError) { log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`); - return true; // handled - don't crash + return true; } - // Network failures can surface outside the runner task promise and leave - // polling stuck; force-stop the active runner so the loop can recover. + + const activeRunner = pollingSession?.activeRunner; if (isNetworkError && activeRunner && activeRunner.isRunning()) { - forceRestarted = true; - activeFetchAbort?.abort(); + pollingSession?.markForceRestarted(); + pollingSession?.abortActiveFetch(); void activeRunner.stop().catch(() => {}); log( `[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`, ); - return true; // handled + return true; } + return false; }); @@ -166,6 +121,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { `[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`, ); } + const persistUpdateId = async (updateId: number) => { const normalizedUpdateId = normalizePersistedUpdateId(updateId); if (normalizedUpdateId === null) { @@ -208,230 +164,19 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { return; } - // Use grammyjs/runner for concurrent update processing - let restartAttempts = 0; - let webhookCleared = false; - const runnerOptions = createTelegramRunnerOptions(cfg); - const waitBeforeRestart = async (buildLine: (delay: string) => string): Promise => { - restartAttempts += 1; - const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts); - const delay = formatDurationPrecise(delayMs); - log(buildLine(delay)); - try { - await sleepWithAbort(delayMs, opts.abortSignal); - } catch (sleepErr) { - if (opts.abortSignal?.aborted) { - return false; - } - throw sleepErr; - } - return true; - }; - - const waitBeforeRetryOnRecoverableSetupError = async ( - err: unknown, - logPrefix: string, - ): Promise => { - if (opts.abortSignal?.aborted) { - return false; - } - if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) { - throw err; - } - return waitBeforeRestart( - (delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`, - ); - }; - - const createPollingBot = async ( - fetchAbortController: AbortController, - ): Promise => { - try { - return createTelegramBot({ - token, - runtime: opts.runtime, - proxyFetch, - config: cfg, - accountId: account.accountId, - fetchAbortSignal: fetchAbortController.signal, - updateOffset: { - lastUpdateId, - onUpdateId: persistUpdateId, - }, - }); - } catch (err) { - const shouldRetry = await waitBeforeRetryOnRecoverableSetupError( - err, - "Telegram setup network error", - ); - if (!shouldRetry) { - return undefined; - } - return undefined; - } - }; - - const ensureWebhookCleanup = async (bot: TelegramBot): Promise<"ready" | "retry" | "exit"> => { - if (webhookCleared) { - return "ready"; - } - try { - await withTelegramApiErrorLogging({ - operation: "deleteWebhook", - runtime: opts.runtime, - fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }), - }); - webhookCleared = true; - return "ready"; - } catch (err) { - const shouldRetry = await waitBeforeRetryOnRecoverableSetupError( - err, - "Telegram webhook cleanup failed", - ); - return shouldRetry ? "retry" : "exit"; - } - }; - - const confirmPersistedOffset = async (bot: TelegramBot): Promise => { - if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) { - return; - } - try { - await bot.api.getUpdates({ offset: lastUpdateId + 1, limit: 1, timeout: 0 }); - } catch { - // Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate. - } - }; - - const runPollingCycle = async ( - bot: TelegramBot, - fetchAbortController: AbortController, - ): Promise<"continue" | "exit"> => { - // Confirm the persisted offset with Telegram so the runner (which starts - // at offset 0) does not re-fetch already-processed updates on restart. - await confirmPersistedOffset(bot); - - // Track getUpdates calls to detect polling stalls. - let lastGetUpdatesAt = Date.now(); - bot.api.config.use((prev, method, payload, signal) => { - if (method === "getUpdates") { - lastGetUpdatesAt = Date.now(); - } - return prev(method, payload, signal); - }); - - const runner = run(bot, runnerOptions); - activeRunner = runner; - let stopPromise: Promise | undefined; - let stalledRestart = false; - const stopRunner = () => { - fetchAbortController.abort(); - stopPromise ??= Promise.resolve(runner.stop()) - .then(() => undefined) - .catch(() => { - // Runner may already be stopped by abort/retry paths. - }); - return stopPromise; - }; - const stopBot = () => { - return Promise.resolve(bot.stop()) - .then(() => undefined) - .catch(() => { - // Bot may already be stopped by runner stop/abort paths. - }); - }; - const stopOnAbort = () => { - if (opts.abortSignal?.aborted) { - void stopRunner(); - } - }; - - // Watchdog: detect when getUpdates calls have stalled and force-restart. - const watchdog = setInterval(() => { - if (opts.abortSignal?.aborted) { - return; - } - const elapsed = Date.now() - lastGetUpdatesAt; - if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) { - stalledRestart = true; - log( - `[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`, - ); - void stopRunner(); - } - }, POLL_WATCHDOG_INTERVAL_MS); - - opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); - try { - // runner.task() returns a promise that resolves when the runner stops - await runner.task(); - if (opts.abortSignal?.aborted) { - return "exit"; - } - const reason = stalledRestart - ? "polling stall detected" - : forceRestarted - ? "unhandled network error" - : "runner stopped (maxRetryTime exceeded or graceful stop)"; - forceRestarted = false; - const shouldRestart = await waitBeforeRestart( - (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`, - ); - return shouldRestart ? "continue" : "exit"; - } catch (err) { - forceRestarted = false; - if (opts.abortSignal?.aborted) { - throw err; - } - const isConflict = isGetUpdatesConflict(err); - if (isConflict) { - webhookCleared = false; - } - const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); - if (!isConflict && !isRecoverable) { - throw err; - } - const reason = isConflict ? "getUpdates conflict" : "network error"; - const errMsg = formatErrorMessage(err); - const shouldRestart = await waitBeforeRestart( - (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`, - ); - return shouldRestart ? "continue" : "exit"; - } finally { - clearInterval(watchdog); - opts.abortSignal?.removeEventListener("abort", stopOnAbort); - await stopRunner(); - await stopBot(); - if (activeFetchAbort === fetchAbortController) { - activeFetchAbort = undefined; - } - } - }; - - while (!opts.abortSignal?.aborted) { - const fetchAbortController = new AbortController(); - activeFetchAbort = fetchAbortController; - const bot = await createPollingBot(fetchAbortController); - if (!bot) { - if (activeFetchAbort === fetchAbortController) { - activeFetchAbort = undefined; - } - continue; - } - - const cleanupState = await ensureWebhookCleanup(bot); - if (cleanupState === "retry") { - continue; - } - if (cleanupState === "exit") { - return; - } - - const state = await runPollingCycle(bot, fetchAbortController); - if (state === "exit") { - return; - } - } + pollingSession = new TelegramPollingSession({ + token, + config: cfg, + accountId: account.accountId, + runtime: opts.runtime, + proxyFetch, + abortSignal: opts.abortSignal, + runnerOptions: createTelegramRunnerOptions(cfg), + getLastUpdateId: () => lastUpdateId, + persistUpdateId, + log, + }); + await pollingSession.runUntilAbort(); } finally { unregisterHandler(); } diff --git a/src/telegram/polling-session.ts b/src/telegram/polling-session.ts new file mode 100644 index 00000000000..784c8b2d759 --- /dev/null +++ b/src/telegram/polling-session.ts @@ -0,0 +1,283 @@ +import { type RunOptions, run } from "@grammyjs/runner"; +import { computeBackoff, sleepWithAbort } from "../infra/backoff.js"; +import { formatErrorMessage } from "../infra/errors.js"; +import { formatDurationPrecise } from "../infra/format-time/format-duration.ts"; +import { withTelegramApiErrorLogging } from "./api-logging.js"; +import { createTelegramBot } from "./bot.js"; +import { isRecoverableTelegramNetworkError } from "./network-errors.js"; + +const TELEGRAM_POLL_RESTART_POLICY = { + initialMs: 2000, + maxMs: 30_000, + factor: 1.8, + jitter: 0.25, +}; + +const POLL_STALL_THRESHOLD_MS = 90_000; +const POLL_WATCHDOG_INTERVAL_MS = 30_000; + +type TelegramBot = ReturnType; + +type TelegramPollingSessionOpts = { + token: string; + config: Parameters[0]["config"]; + accountId: string; + runtime: Parameters[0]["runtime"]; + proxyFetch: Parameters[0]["proxyFetch"]; + abortSignal?: AbortSignal; + runnerOptions: RunOptions; + getLastUpdateId: () => number | null; + persistUpdateId: (updateId: number) => Promise; + log: (line: string) => void; +}; + +export class TelegramPollingSession { + #restartAttempts = 0; + #webhookCleared = false; + #forceRestarted = false; + #activeRunner: ReturnType | undefined; + #activeFetchAbort: AbortController | undefined; + + constructor(private readonly opts: TelegramPollingSessionOpts) {} + + get activeRunner() { + return this.#activeRunner; + } + + markForceRestarted() { + this.#forceRestarted = true; + } + + abortActiveFetch() { + this.#activeFetchAbort?.abort(); + } + + async runUntilAbort(): Promise { + while (!this.opts.abortSignal?.aborted) { + const bot = await this.#createPollingBot(); + if (!bot) { + continue; + } + + const cleanupState = await this.#ensureWebhookCleanup(bot); + if (cleanupState === "retry") { + continue; + } + if (cleanupState === "exit") { + return; + } + + const state = await this.#runPollingCycle(bot); + if (state === "exit") { + return; + } + } + } + + async #waitBeforeRestart(buildLine: (delay: string) => string): Promise { + this.#restartAttempts += 1; + const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, this.#restartAttempts); + const delay = formatDurationPrecise(delayMs); + this.opts.log(buildLine(delay)); + try { + await sleepWithAbort(delayMs, this.opts.abortSignal); + } catch (sleepErr) { + if (this.opts.abortSignal?.aborted) { + return false; + } + throw sleepErr; + } + return true; + } + + async #waitBeforeRetryOnRecoverableSetupError(err: unknown, logPrefix: string): Promise { + if (this.opts.abortSignal?.aborted) { + return false; + } + if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) { + throw err; + } + return this.#waitBeforeRestart( + (delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`, + ); + } + + async #createPollingBot(): Promise { + const fetchAbortController = new AbortController(); + this.#activeFetchAbort = fetchAbortController; + try { + return createTelegramBot({ + token: this.opts.token, + runtime: this.opts.runtime, + proxyFetch: this.opts.proxyFetch, + config: this.opts.config, + accountId: this.opts.accountId, + fetchAbortSignal: fetchAbortController.signal, + updateOffset: { + lastUpdateId: this.opts.getLastUpdateId(), + onUpdateId: this.opts.persistUpdateId, + }, + }); + } catch (err) { + await this.#waitBeforeRetryOnRecoverableSetupError(err, "Telegram setup network error"); + if (this.#activeFetchAbort === fetchAbortController) { + this.#activeFetchAbort = undefined; + } + return undefined; + } + } + + async #ensureWebhookCleanup(bot: TelegramBot): Promise<"ready" | "retry" | "exit"> { + if (this.#webhookCleared) { + return "ready"; + } + try { + await withTelegramApiErrorLogging({ + operation: "deleteWebhook", + runtime: this.opts.runtime, + fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }), + }); + this.#webhookCleared = true; + return "ready"; + } catch (err) { + const shouldRetry = await this.#waitBeforeRetryOnRecoverableSetupError( + err, + "Telegram webhook cleanup failed", + ); + return shouldRetry ? "retry" : "exit"; + } + } + + async #confirmPersistedOffset(bot: TelegramBot): Promise { + const lastUpdateId = this.opts.getLastUpdateId(); + if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) { + return; + } + try { + await bot.api.getUpdates({ offset: lastUpdateId + 1, limit: 1, timeout: 0 }); + } catch { + // Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate. + } + } + + async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> { + await this.#confirmPersistedOffset(bot); + + let lastGetUpdatesAt = Date.now(); + bot.api.config.use((prev, method, payload, signal) => { + if (method === "getUpdates") { + lastGetUpdatesAt = Date.now(); + } + return prev(method, payload, signal); + }); + + const runner = run(bot, this.opts.runnerOptions); + this.#activeRunner = runner; + const fetchAbortController = this.#activeFetchAbort; + let stopPromise: Promise | undefined; + let stalledRestart = false; + const stopRunner = () => { + fetchAbortController?.abort(); + stopPromise ??= Promise.resolve(runner.stop()) + .then(() => undefined) + .catch(() => { + // Runner may already be stopped by abort/retry paths. + }); + return stopPromise; + }; + const stopBot = () => { + return Promise.resolve(bot.stop()) + .then(() => undefined) + .catch(() => { + // Bot may already be stopped by runner stop/abort paths. + }); + }; + const stopOnAbort = () => { + if (this.opts.abortSignal?.aborted) { + void stopRunner(); + } + }; + + const watchdog = setInterval(() => { + if (this.opts.abortSignal?.aborted) { + return; + } + const elapsed = Date.now() - lastGetUpdatesAt; + if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) { + stalledRestart = true; + this.opts.log( + `[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`, + ); + void stopRunner(); + } + }, POLL_WATCHDOG_INTERVAL_MS); + + this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); + try { + await runner.task(); + if (this.opts.abortSignal?.aborted) { + return "exit"; + } + const reason = stalledRestart + ? "polling stall detected" + : this.#forceRestarted + ? "unhandled network error" + : "runner stopped (maxRetryTime exceeded or graceful stop)"; + this.#forceRestarted = false; + const shouldRestart = await this.#waitBeforeRestart( + (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`, + ); + return shouldRestart ? "continue" : "exit"; + } catch (err) { + this.#forceRestarted = false; + if (this.opts.abortSignal?.aborted) { + throw err; + } + const isConflict = isGetUpdatesConflict(err); + if (isConflict) { + this.#webhookCleared = false; + } + const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); + if (!isConflict && !isRecoverable) { + throw err; + } + const reason = isConflict ? "getUpdates conflict" : "network error"; + const errMsg = formatErrorMessage(err); + const shouldRestart = await this.#waitBeforeRestart( + (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`, + ); + return shouldRestart ? "continue" : "exit"; + } finally { + clearInterval(watchdog); + this.opts.abortSignal?.removeEventListener("abort", stopOnAbort); + await stopRunner(); + await stopBot(); + this.#activeRunner = undefined; + if (this.#activeFetchAbort === fetchAbortController) { + this.#activeFetchAbort = undefined; + } + } + } +} + +const isGetUpdatesConflict = (err: unknown) => { + if (!err || typeof err !== "object") { + return false; + } + const typed = err as { + error_code?: number; + errorCode?: number; + description?: string; + method?: string; + message?: string; + }; + const errorCode = typed.error_code ?? typed.errorCode; + if (errorCode !== 409) { + return false; + } + const haystack = [typed.method, typed.description, typed.message] + .filter((value): value is string => typeof value === "string") + .join(" ") + .toLowerCase(); + return haystack.includes("getupdates"); +};