import { type RunOptions, run } from "@grammyjs/runner"; import { computeBackoff, sleepWithAbort } from "../../../src/infra/backoff.js"; import { formatErrorMessage } from "../../../src/infra/errors.js"; import { formatDurationPrecise } from "../../../src/infra/format-time/format-duration.ts"; import { withTelegramApiErrorLogging } from "./api-logging.js"; import { createTelegramBot } from "./bot.js"; import { isRecoverableTelegramNetworkError } from "./network-errors.js"; const TELEGRAM_POLL_RESTART_POLICY = { initialMs: 2000, maxMs: 30_000, factor: 1.8, jitter: 0.25, }; const POLL_STALL_THRESHOLD_MS = 90_000; const POLL_WATCHDOG_INTERVAL_MS = 30_000; const POLL_STOP_GRACE_MS = 15_000; const waitForGracefulStop = async (stop: () => Promise) => { let timer: ReturnType | undefined; try { await Promise.race([ stop(), new Promise((resolve) => { timer = setTimeout(resolve, POLL_STOP_GRACE_MS); timer.unref?.(); }), ]); } finally { if (timer) { clearTimeout(timer); } } }; type TelegramBot = ReturnType; type TelegramPollingSessionOpts = { token: string; config: Parameters[0]["config"]; accountId: string; runtime: Parameters[0]["runtime"]; proxyFetch: Parameters[0]["proxyFetch"]; abortSignal?: AbortSignal; runnerOptions: RunOptions; getLastUpdateId: () => number | null; persistUpdateId: (updateId: number) => Promise; log: (line: string) => void; }; export class TelegramPollingSession { #restartAttempts = 0; #webhookCleared = false; #forceRestarted = false; #activeRunner: ReturnType | undefined; #activeFetchAbort: AbortController | undefined; constructor(private readonly opts: TelegramPollingSessionOpts) {} get activeRunner() { return this.#activeRunner; } markForceRestarted() { this.#forceRestarted = true; } abortActiveFetch() { this.#activeFetchAbort?.abort(); } async runUntilAbort(): Promise { while (!this.opts.abortSignal?.aborted) { const bot = await this.#createPollingBot(); if (!bot) { continue; } const cleanupState = await this.#ensureWebhookCleanup(bot); if (cleanupState === "retry") { continue; } if (cleanupState === "exit") { return; } const state = await this.#runPollingCycle(bot); if (state === "exit") { return; } } } async #waitBeforeRestart(buildLine: (delay: string) => string): Promise { this.#restartAttempts += 1; const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, this.#restartAttempts); const delay = formatDurationPrecise(delayMs); this.opts.log(buildLine(delay)); try { await sleepWithAbort(delayMs, this.opts.abortSignal); } catch (sleepErr) { if (this.opts.abortSignal?.aborted) { return false; } throw sleepErr; } return true; } async #waitBeforeRetryOnRecoverableSetupError(err: unknown, logPrefix: string): Promise { if (this.opts.abortSignal?.aborted) { return false; } if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) { throw err; } return this.#waitBeforeRestart( (delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`, ); } async #createPollingBot(): Promise { const fetchAbortController = new AbortController(); this.#activeFetchAbort = fetchAbortController; try { return createTelegramBot({ token: this.opts.token, runtime: this.opts.runtime, proxyFetch: this.opts.proxyFetch, config: this.opts.config, accountId: this.opts.accountId, fetchAbortSignal: fetchAbortController.signal, updateOffset: { lastUpdateId: this.opts.getLastUpdateId(), onUpdateId: this.opts.persistUpdateId, }, }); } catch (err) { await this.#waitBeforeRetryOnRecoverableSetupError(err, "Telegram setup network error"); if (this.#activeFetchAbort === fetchAbortController) { this.#activeFetchAbort = undefined; } return undefined; } } async #ensureWebhookCleanup(bot: TelegramBot): Promise<"ready" | "retry" | "exit"> { if (this.#webhookCleared) { return "ready"; } try { await withTelegramApiErrorLogging({ operation: "deleteWebhook", runtime: this.opts.runtime, fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }), }); this.#webhookCleared = true; return "ready"; } catch (err) { const shouldRetry = await this.#waitBeforeRetryOnRecoverableSetupError( err, "Telegram webhook cleanup failed", ); return shouldRetry ? "retry" : "exit"; } } async #confirmPersistedOffset(bot: TelegramBot): Promise { const lastUpdateId = this.opts.getLastUpdateId(); if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) { return; } try { await bot.api.getUpdates({ offset: lastUpdateId + 1, limit: 1, timeout: 0 }); } catch { // Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate. } } async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> { await this.#confirmPersistedOffset(bot); let lastGetUpdatesAt = Date.now(); bot.api.config.use((prev, method, payload, signal) => { if (method === "getUpdates") { lastGetUpdatesAt = Date.now(); } return prev(method, payload, signal); }); const runner = run(bot, this.opts.runnerOptions); this.#activeRunner = runner; const fetchAbortController = this.#activeFetchAbort; let stopPromise: Promise | undefined; let stalledRestart = false; let forceCycleTimer: ReturnType | undefined; let forceCycleResolve: (() => void) | undefined; const forceCyclePromise = new Promise((resolve) => { forceCycleResolve = resolve; }); const stopRunner = () => { fetchAbortController?.abort(); stopPromise ??= Promise.resolve(runner.stop()) .then(() => undefined) .catch(() => { // Runner may already be stopped by abort/retry paths. }); return stopPromise; }; const stopBot = () => { return Promise.resolve(bot.stop()) .then(() => undefined) .catch(() => { // Bot may already be stopped by runner stop/abort paths. }); }; const stopOnAbort = () => { if (this.opts.abortSignal?.aborted) { void stopRunner(); } }; const watchdog = setInterval(() => { if (this.opts.abortSignal?.aborted) { return; } const elapsed = Date.now() - lastGetUpdatesAt; if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) { stalledRestart = true; this.opts.log( `[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`, ); void stopRunner(); void stopBot(); if (!forceCycleTimer) { forceCycleTimer = setTimeout(() => { if (this.opts.abortSignal?.aborted) { return; } this.opts.log( `[telegram] Polling runner stop timed out after ${formatDurationPrecise(POLL_STOP_GRACE_MS)}; forcing restart cycle.`, ); forceCycleResolve?.(); }, POLL_STOP_GRACE_MS); } } }, POLL_WATCHDOG_INTERVAL_MS); this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); try { await Promise.race([runner.task(), forceCyclePromise]); if (this.opts.abortSignal?.aborted) { return "exit"; } const reason = stalledRestart ? "polling stall detected" : this.#forceRestarted ? "unhandled network error" : "runner stopped (maxRetryTime exceeded or graceful stop)"; this.#forceRestarted = false; const shouldRestart = await this.#waitBeforeRestart( (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`, ); return shouldRestart ? "continue" : "exit"; } catch (err) { this.#forceRestarted = false; if (this.opts.abortSignal?.aborted) { throw err; } const isConflict = isGetUpdatesConflict(err); if (isConflict) { this.#webhookCleared = false; } const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" }); if (!isConflict && !isRecoverable) { throw err; } const reason = isConflict ? "getUpdates conflict" : "network error"; const errMsg = formatErrorMessage(err); const shouldRestart = await this.#waitBeforeRestart( (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`, ); return shouldRestart ? "continue" : "exit"; } finally { clearInterval(watchdog); if (forceCycleTimer) { clearTimeout(forceCycleTimer); } this.opts.abortSignal?.removeEventListener("abort", stopOnAbort); await waitForGracefulStop(stopRunner); await waitForGracefulStop(stopBot); this.#activeRunner = undefined; if (this.#activeFetchAbort === fetchAbortController) { this.#activeFetchAbort = undefined; } } } } const isGetUpdatesConflict = (err: unknown) => { if (!err || typeof err !== "object") { return false; } const typed = err as { error_code?: number; errorCode?: number; description?: string; method?: string; message?: string; }; const errorCode = typed.error_code ?? typed.errorCode; if (errorCode !== 409) { return false; } const haystack = [typed.method, typed.description, typed.message] .filter((value): value is string => typeof value === "string") .join(" ") .toLowerCase(); return haystack.includes("getupdates"); };