refactor: extract telegram polling session

This commit is contained in:
Peter Steinberger
2026-03-09 06:13:32 +00:00
parent 2e79d82198
commit 1d301f74a6
2 changed files with 307 additions and 279 deletions

View File

@@ -1,18 +1,15 @@
import { type RunOptions, run } from "@grammyjs/runner";
import type { RunOptions } from "@grammyjs/runner";
import { resolveAgentMaxConcurrent } from "../config/agent-limits.js";
import type { OpenClawConfig } from "../config/config.js";
import { loadConfig } from "../config/config.js";
import { waitForAbortSignal } from "../infra/abort-signal.js";
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
import { formatErrorMessage } from "../infra/errors.js";
import { formatDurationPrecise } from "../infra/format-time/format-duration.ts";
import { registerUnhandledRejectionHandler } from "../infra/unhandled-rejections.js";
import type { RuntimeEnv } from "../runtime.js";
import { resolveTelegramAccount } from "./accounts.js";
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { createTelegramBot } from "./bot.js";
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
import { TelegramPollingSession } from "./polling-session.js";
import { makeProxyFetch } from "./proxy.js";
import { readTelegramUpdateOffset, writeTelegramUpdateOffset } from "./update-offset-store.js";
import { startTelegramWebhook } from "./webhook.js";
@@ -55,21 +52,6 @@ export function createTelegramRunnerOptions(cfg: OpenClawConfig): RunOptions<unk
};
}
// Backoff settings passed to computeBackoff() together with the running
// restart-attempt count whenever polling has to wait before restarting.
// The *Ms fields are millisecond delays (2s initial, 30s cap).
const TELEGRAM_POLL_RESTART_POLICY = {
initialMs: 2000,
maxMs: 30_000,
factor: 1.8,
jitter: 0.25,
};
// Polling stall detection: if no getUpdates call is seen for this long,
// assume the runner is stuck and force-restart it.
// Default fetch timeout is 30s, so 3x gives ample margin for slow responses.
const POLL_STALL_THRESHOLD_MS = 90_000;
const POLL_WATCHDOG_INTERVAL_MS = 30_000;
type TelegramBot = ReturnType<typeof createTelegramBot>;
function normalizePersistedUpdateId(value: number | null): number | null {
if (value === null) {
return null;
@@ -80,28 +62,6 @@ function normalizePersistedUpdateId(value: number | null): number | null {
return value;
}
/**
 * Returns true when `err` is an object carrying error code 409 (read from
 * `error_code`, falling back to `errorCode`) whose `method`, `description`
 * or `message` text mentions "getUpdates" (case-insensitive).
 */
const isGetUpdatesConflict = (err: unknown) => {
// Only objects can carry the fields inspected below.
if (!err || typeof err !== "object") {
return false;
}
const typed = err as {
error_code?: number;
errorCode?: number;
description?: string;
method?: string;
message?: string;
};
// Accept either spelling of the error-code field.
const errorCode = typed.error_code ?? typed.errorCode;
if (errorCode !== 409) {
return false;
}
// Join every string-valued field so one substring check covers them all.
const haystack = [typed.method, typed.description, typed.message]
.filter((value): value is string => typeof value === "string")
.join(" ")
.toLowerCase();
return haystack.includes("getupdates");
};
/** Check if error is a Grammy HttpError (used to scope unhandled rejection handling) */
const isGrammyHttpError = (err: unknown): boolean => {
if (!err || typeof err !== "object") {
@@ -112,31 +72,26 @@ const isGrammyHttpError = (err: unknown): boolean => {
export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const log = opts.runtime?.error ?? console.error;
let activeRunner: ReturnType<typeof run> | undefined;
let activeFetchAbort: AbortController | undefined;
let forceRestarted = false;
let pollingSession: TelegramPollingSession | undefined;
// Register handler for Grammy HttpError unhandled rejections.
// This catches network errors that escape the polling loop's try-catch
// (e.g., from setMyCommands during bot setup).
// We gate on isGrammyHttpError to avoid suppressing non-Telegram errors.
const unregisterHandler = registerUnhandledRejectionHandler((err) => {
const isNetworkError = isRecoverableTelegramNetworkError(err, { context: "polling" });
if (isGrammyHttpError(err) && isNetworkError) {
log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
return true; // handled - don't crash
return true;
}
// Network failures can surface outside the runner task promise and leave
// polling stuck; force-stop the active runner so the loop can recover.
const activeRunner = pollingSession?.activeRunner;
if (isNetworkError && activeRunner && activeRunner.isRunning()) {
forceRestarted = true;
activeFetchAbort?.abort();
pollingSession?.markForceRestarted();
pollingSession?.abortActiveFetch();
void activeRunner.stop().catch(() => {});
log(
`[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`,
);
return true; // handled
return true;
}
return false;
});
@@ -166,6 +121,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
`[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`,
);
}
const persistUpdateId = async (updateId: number) => {
const normalizedUpdateId = normalizePersistedUpdateId(updateId);
if (normalizedUpdateId === null) {
@@ -208,230 +164,19 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
return;
}
// Use grammyjs/runner for concurrent update processing
let restartAttempts = 0;
let webhookCleared = false;
const runnerOptions = createTelegramRunnerOptions(cfg);
const waitBeforeRestart = async (buildLine: (delay: string) => string): Promise<boolean> => {
restartAttempts += 1;
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
const delay = formatDurationPrecise(delayMs);
log(buildLine(delay));
try {
await sleepWithAbort(delayMs, opts.abortSignal);
} catch (sleepErr) {
if (opts.abortSignal?.aborted) {
return false;
}
throw sleepErr;
}
return true;
};
const waitBeforeRetryOnRecoverableSetupError = async (
err: unknown,
logPrefix: string,
): Promise<boolean> => {
if (opts.abortSignal?.aborted) {
return false;
}
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
throw err;
}
return waitBeforeRestart(
(delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`,
);
};
const createPollingBot = async (
fetchAbortController: AbortController,
): Promise<TelegramBot | undefined> => {
try {
return createTelegramBot({
token,
runtime: opts.runtime,
proxyFetch,
config: cfg,
accountId: account.accountId,
fetchAbortSignal: fetchAbortController.signal,
updateOffset: {
lastUpdateId,
onUpdateId: persistUpdateId,
},
});
} catch (err) {
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
err,
"Telegram setup network error",
);
if (!shouldRetry) {
return undefined;
}
return undefined;
}
};
const ensureWebhookCleanup = async (bot: TelegramBot): Promise<"ready" | "retry" | "exit"> => {
if (webhookCleared) {
return "ready";
}
try {
await withTelegramApiErrorLogging({
operation: "deleteWebhook",
runtime: opts.runtime,
fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }),
});
webhookCleared = true;
return "ready";
} catch (err) {
const shouldRetry = await waitBeforeRetryOnRecoverableSetupError(
err,
"Telegram webhook cleanup failed",
);
return shouldRetry ? "retry" : "exit";
}
};
const confirmPersistedOffset = async (bot: TelegramBot): Promise<void> => {
if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) {
return;
}
try {
await bot.api.getUpdates({ offset: lastUpdateId + 1, limit: 1, timeout: 0 });
} catch {
// Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate.
}
};
const runPollingCycle = async (
bot: TelegramBot,
fetchAbortController: AbortController,
): Promise<"continue" | "exit"> => {
// Confirm the persisted offset with Telegram so the runner (which starts
// at offset 0) does not re-fetch already-processed updates on restart.
await confirmPersistedOffset(bot);
// Track getUpdates calls to detect polling stalls.
let lastGetUpdatesAt = Date.now();
bot.api.config.use((prev, method, payload, signal) => {
if (method === "getUpdates") {
lastGetUpdatesAt = Date.now();
}
return prev(method, payload, signal);
});
const runner = run(bot, runnerOptions);
activeRunner = runner;
let stopPromise: Promise<void> | undefined;
let stalledRestart = false;
const stopRunner = () => {
fetchAbortController.abort();
stopPromise ??= Promise.resolve(runner.stop())
.then(() => undefined)
.catch(() => {
// Runner may already be stopped by abort/retry paths.
});
return stopPromise;
};
const stopBot = () => {
return Promise.resolve(bot.stop())
.then(() => undefined)
.catch(() => {
// Bot may already be stopped by runner stop/abort paths.
});
};
const stopOnAbort = () => {
if (opts.abortSignal?.aborted) {
void stopRunner();
}
};
// Watchdog: detect when getUpdates calls have stalled and force-restart.
const watchdog = setInterval(() => {
if (opts.abortSignal?.aborted) {
return;
}
const elapsed = Date.now() - lastGetUpdatesAt;
if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) {
stalledRestart = true;
log(
`[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`,
);
void stopRunner();
}
}, POLL_WATCHDOG_INTERVAL_MS);
opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
try {
// runner.task() returns a promise that resolves when the runner stops
await runner.task();
if (opts.abortSignal?.aborted) {
return "exit";
}
const reason = stalledRestart
? "polling stall detected"
: forceRestarted
? "unhandled network error"
: "runner stopped (maxRetryTime exceeded or graceful stop)";
forceRestarted = false;
const shouldRestart = await waitBeforeRestart(
(delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
);
return shouldRestart ? "continue" : "exit";
} catch (err) {
forceRestarted = false;
if (opts.abortSignal?.aborted) {
throw err;
}
const isConflict = isGetUpdatesConflict(err);
if (isConflict) {
webhookCleared = false;
}
const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
if (!isConflict && !isRecoverable) {
throw err;
}
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
const shouldRestart = await waitBeforeRestart(
(delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,
);
return shouldRestart ? "continue" : "exit";
} finally {
clearInterval(watchdog);
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
await stopRunner();
await stopBot();
if (activeFetchAbort === fetchAbortController) {
activeFetchAbort = undefined;
}
}
};
while (!opts.abortSignal?.aborted) {
const fetchAbortController = new AbortController();
activeFetchAbort = fetchAbortController;
const bot = await createPollingBot(fetchAbortController);
if (!bot) {
if (activeFetchAbort === fetchAbortController) {
activeFetchAbort = undefined;
}
continue;
}
const cleanupState = await ensureWebhookCleanup(bot);
if (cleanupState === "retry") {
continue;
}
if (cleanupState === "exit") {
return;
}
const state = await runPollingCycle(bot, fetchAbortController);
if (state === "exit") {
return;
}
}
pollingSession = new TelegramPollingSession({
token,
config: cfg,
accountId: account.accountId,
runtime: opts.runtime,
proxyFetch,
abortSignal: opts.abortSignal,
runnerOptions: createTelegramRunnerOptions(cfg),
getLastUpdateId: () => lastUpdateId,
persistUpdateId,
log,
});
await pollingSession.runUntilAbort();
} finally {
unregisterHandler();
}

View File

@@ -0,0 +1,283 @@
import { type RunOptions, run } from "@grammyjs/runner";
import { computeBackoff, sleepWithAbort } from "../infra/backoff.js";
import { formatErrorMessage } from "../infra/errors.js";
import { formatDurationPrecise } from "../infra/format-time/format-duration.ts";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { createTelegramBot } from "./bot.js";
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
// Backoff settings passed to computeBackoff() together with the running
// restart-attempt count whenever polling has to wait before restarting.
// The *Ms fields are millisecond delays (2s initial, 30s cap).
const TELEGRAM_POLL_RESTART_POLICY = {
initialMs: 2000,
maxMs: 30_000,
factor: 1.8,
jitter: 0.25,
};
// Polling stall detection: if no getUpdates call is seen for this long,
// the watchdog assumes the runner is stuck and force-stops it so it restarts.
const POLL_STALL_THRESHOLD_MS = 90_000;
// Interval at which the watchdog timer compares the time since the last
// getUpdates call against POLL_STALL_THRESHOLD_MS.
const POLL_WATCHDOG_INTERVAL_MS = 30_000;
// Whatever createTelegramBot() returns; derived so this module does not have
// to name the bot type directly.
type TelegramBot = ReturnType<typeof createTelegramBot>;
// Construction options for TelegramPollingSession.
type TelegramPollingSessionOpts = {
// token, config, accountId, runtime and proxyFetch are forwarded unchanged
// to createTelegramBot() for every bot the session builds.
token: string;
config: Parameters<typeof createTelegramBot>[0]["config"];
accountId: string;
// Also handed to withTelegramApiErrorLogging() for the deleteWebhook call.
runtime: Parameters<typeof createTelegramBot>[0]["runtime"];
proxyFetch: Parameters<typeof createTelegramBot>[0]["proxyFetch"];
// When aborted: ends the polling loop, interrupts backoff sleeps and stops
// the active runner. Optional; without it the loop has no abort-based exit.
abortSignal?: AbortSignal;
// Passed as the options argument to run() from @grammyjs/runner.
runnerOptions: RunOptions<unknown>;
// Read each time a bot is created and again before each polling cycle to
// confirm the offset with Telegram; null means there is no offset to confirm.
getLastUpdateId: () => number | null;
// Given to the bot as updateOffset.onUpdateId.
persistUpdateId: (updateId: number) => Promise<void>;
// Sink for restart/backoff and stall-detection log lines.
log: (line: string) => void;
};
/**
 * Runs Telegram long polling through @grammyjs/runner until the configured
 * abort signal fires.
 *
 * Every loop iteration creates a fresh bot, makes sure the webhook has been
 * deleted, and then runs one polling cycle. Recoverable network errors and
 * getUpdates conflicts (HTTP 409) are retried after a backoff delay; any other
 * error is rethrown to the caller of {@link runUntilAbort}.
 */
export class TelegramPollingSession {
  /** Count of restart waits so far; input to the backoff computation. Never reset here. */
  #restartAttempts = 0;
  /** True once deleteWebhook succeeded; cleared again after a getUpdates conflict. */
  #webhookCleared = false;
  /** Set through markForceRestarted(); only changes the reason shown in the restart log line. */
  #forceRestarted = false;
  /** Runner of the polling cycle currently in progress, if any. */
  #activeRunner: ReturnType<typeof run> | undefined;
  /** Abort controller whose signal was given to the most recently created bot. */
  #activeFetchAbort: AbortController | undefined;

  constructor(private readonly opts: TelegramPollingSessionOpts) {}

  /** Runner of the polling cycle in progress, or undefined between cycles. */
  get activeRunner() {
    return this.#activeRunner;
  }

  /** Records that an outside party forced the runner to stop (used for the restart log reason). */
  markForceRestarted() {
    this.#forceRestarted = true;
  }

  /** Aborts the fetch signal handed to the current bot, if there is one. */
  abortActiveFetch() {
    this.#activeFetchAbort?.abort();
  }

  /**
   * Polls until the abort signal is aborted.
   *
   * Resolves when the signal aborts or when a step reports "exit"; rejects
   * with any error that is neither a recoverable network error nor a
   * getUpdates conflict.
   */
  async runUntilAbort(): Promise<void> {
    while (!this.opts.abortSignal?.aborted) {
      const bot = await this.#createPollingBot();
      if (!bot) {
        // Bot creation failed recoverably; the backoff wait already happened.
        continue;
      }

      const cleanupState = await this.#ensureWebhookCleanup(bot);
      if (cleanupState === "retry") {
        continue;
      }
      if (cleanupState === "exit") {
        return;
      }

      const state = await this.#runPollingCycle(bot);
      if (state === "exit") {
        return;
      }
    }
  }

  /**
   * Logs a line built from the upcoming delay, then sleeps for that delay.
   *
   * @param buildLine Receives the formatted delay and returns the log line.
   * @returns false when the sleep was interrupted by the abort signal, true otherwise.
   * @throws The sleep error when it failed for a reason other than abort.
   */
  async #waitBeforeRestart(buildLine: (delay: string) => string): Promise<boolean> {
    this.#restartAttempts += 1;
    const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, this.#restartAttempts);
    const delay = formatDurationPrecise(delayMs);
    this.opts.log(buildLine(delay));
    try {
      await sleepWithAbort(delayMs, this.opts.abortSignal);
    } catch (sleepErr) {
      if (this.opts.abortSignal?.aborted) {
        return false;
      }
      throw sleepErr;
    }
    return true;
  }

  /**
   * Decides how to react to an error raised during setup (bot creation or
   * webhook cleanup).
   *
   * @returns false when already aborted or aborted during the wait; true after
   *   a completed backoff wait.
   * @throws `err` itself when it is not a recoverable Telegram network error.
   */
  async #waitBeforeRetryOnRecoverableSetupError(err: unknown, logPrefix: string): Promise<boolean> {
    if (this.opts.abortSignal?.aborted) {
      return false;
    }
    if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
      throw err;
    }
    return this.#waitBeforeRestart(
      (delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`,
    );
  }

  /**
   * Creates a bot wired to a fresh fetch abort controller.
   *
   * @returns The bot, or undefined when creation failed with a recoverable
   *   error (after the backoff wait) or the session is aborted.
   * @throws The creation error when it is not recoverable.
   */
  async #createPollingBot(): Promise<TelegramBot | undefined> {
    const fetchAbortController = new AbortController();
    this.#activeFetchAbort = fetchAbortController;
    try {
      return createTelegramBot({
        token: this.opts.token,
        runtime: this.opts.runtime,
        proxyFetch: this.opts.proxyFetch,
        config: this.opts.config,
        accountId: this.opts.accountId,
        fetchAbortSignal: fetchAbortController.signal,
        updateOffset: {
          lastUpdateId: this.opts.getLastUpdateId(),
          onUpdateId: this.opts.persistUpdateId,
        },
      });
    } catch (err) {
      try {
        await this.#waitBeforeRetryOnRecoverableSetupError(err, "Telegram setup network error");
      } finally {
        // No bot owns this controller, so drop it even when the helper above
        // rethrows a non-recoverable error; otherwise a stale controller
        // would stay registered as the "active" one.
        if (this.#activeFetchAbort === fetchAbortController) {
          this.#activeFetchAbort = undefined;
        }
      }
      return undefined;
    }
  }

  /**
   * Deletes the webhook once per session (and again after a getUpdates
   * conflict resets the flag), keeping pending updates.
   *
   * @returns "ready" when the webhook is known to be cleared, "retry" after a
   *   recoverable failure and completed backoff, "exit" when aborted.
   * @throws The deleteWebhook error when it is not recoverable.
   */
  async #ensureWebhookCleanup(bot: TelegramBot): Promise<"ready" | "retry" | "exit"> {
    if (this.#webhookCleared) {
      return "ready";
    }
    try {
      await withTelegramApiErrorLogging({
        operation: "deleteWebhook",
        runtime: this.opts.runtime,
        fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }),
      });
      this.#webhookCleared = true;
      return "ready";
    } catch (err) {
      const shouldRetry = await this.#waitBeforeRetryOnRecoverableSetupError(
        err,
        "Telegram webhook cleanup failed",
      );
      return shouldRetry ? "retry" : "exit";
    }
  }

  /**
   * Issues a single zero-timeout getUpdates at `lastUpdateId + 1` so Telegram
   * treats everything up to the persisted id as confirmed before the runner
   * starts. Skipped when there is no persisted id or incrementing it would
   * leave the safe-integer range.
   */
  async #confirmPersistedOffset(bot: TelegramBot): Promise<void> {
    const lastUpdateId = this.opts.getLastUpdateId();
    if (lastUpdateId === null || lastUpdateId >= Number.MAX_SAFE_INTEGER) {
      return;
    }
    try {
      await bot.api.getUpdates({ offset: lastUpdateId + 1, limit: 1, timeout: 0 });
    } catch {
      // Non-fatal: runner middleware still skips duplicates via shouldSkipUpdate.
    }
  }

  /**
   * Runs one runner lifetime for `bot` and reports whether the outer loop
   * should start another cycle.
   *
   * @returns "continue" after a completed backoff wait, "exit" when aborted.
   * @throws Any runner error that is neither a getUpdates conflict nor a
   *   recoverable network error, and any runner error raised after abort.
   */
  async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> {
    await this.#confirmPersistedOffset(bot);

    // Record the time of every getUpdates call so the watchdog can detect stalls.
    let lastGetUpdatesAt = Date.now();
    bot.api.config.use((prev, method, payload, signal) => {
      if (method === "getUpdates") {
        lastGetUpdatesAt = Date.now();
      }
      return prev(method, payload, signal);
    });

    const runner = run(bot, this.opts.runnerOptions);
    this.#activeRunner = runner;
    const fetchAbortController = this.#activeFetchAbort;
    let stopPromise: Promise<void> | undefined;
    let stalledRestart = false;

    // Aborts in-flight fetches and stops the runner; memoized so that repeated
    // callers share a single runner.stop() call.
    const stopRunner = () => {
      fetchAbortController?.abort();
      stopPromise ??= Promise.resolve(runner.stop())
        .then(() => undefined)
        .catch(() => {
          // Runner may already be stopped by abort/retry paths.
        });
      return stopPromise;
    };
    const stopBot = () => {
      return Promise.resolve(bot.stop())
        .then(() => undefined)
        .catch(() => {
          // Bot may already be stopped by runner stop/abort paths.
        });
    };
    const stopOnAbort = () => {
      if (this.opts.abortSignal?.aborted) {
        void stopRunner();
      }
    };

    // Watchdog: force a restart when getUpdates calls have stopped arriving.
    const watchdog = setInterval(() => {
      if (this.opts.abortSignal?.aborted) {
        return;
      }
      const elapsed = Date.now() - lastGetUpdatesAt;
      if (elapsed > POLL_STALL_THRESHOLD_MS && runner.isRunning()) {
        stalledRestart = true;
        this.opts.log(
          `[telegram] Polling stall detected (no getUpdates for ${formatDurationPrecise(elapsed)}); forcing restart.`,
        );
        void stopRunner();
      }
    }, POLL_WATCHDOG_INTERVAL_MS);

    this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
    // The signal may have aborted during the awaited setup above. An
    // already-aborted signal never dispatches "abort" again, so run the check
    // once by hand; otherwise the runner would keep going and the await below
    // could wait indefinitely (the watchdog also stands down once aborted).
    stopOnAbort();

    try {
      // runner.task() settles once the runner has stopped.
      await runner.task();
      if (this.opts.abortSignal?.aborted) {
        return "exit";
      }
      const reason = stalledRestart
        ? "polling stall detected"
        : this.#forceRestarted
          ? "unhandled network error"
          : "runner stopped (maxRetryTime exceeded or graceful stop)";
      this.#forceRestarted = false;
      const shouldRestart = await this.#waitBeforeRestart(
        (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
      );
      return shouldRestart ? "continue" : "exit";
    } catch (err) {
      this.#forceRestarted = false;
      if (this.opts.abortSignal?.aborted) {
        throw err;
      }
      const isConflict = isGetUpdatesConflict(err);
      if (isConflict) {
        // Make the next iteration run deleteWebhook again before polling resumes.
        this.#webhookCleared = false;
      }
      const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
      if (!isConflict && !isRecoverable) {
        throw err;
      }
      const reason = isConflict ? "getUpdates conflict" : "network error";
      const errMsg = formatErrorMessage(err);
      const shouldRestart = await this.#waitBeforeRestart(
        (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,
      );
      return shouldRestart ? "continue" : "exit";
    } finally {
      clearInterval(watchdog);
      this.opts.abortSignal?.removeEventListener("abort", stopOnAbort);
      await stopRunner();
      await stopBot();
      this.#activeRunner = undefined;
      if (this.#activeFetchAbort === fetchAbortController) {
        this.#activeFetchAbort = undefined;
      }
    }
  }
}
/**
 * Detects the "another poller is active" failure: an error object with code
 * 409 (taken from `error_code`, else `errorCode`) whose `method`,
 * `description` or `message` text contains "getUpdates", ignoring case.
 * Non-object and null inputs are never a conflict.
 */
const isGetUpdatesConflict = (err: unknown) => {
  if (err === null || typeof err !== "object") {
    return false;
  }

  const candidate = err as {
    error_code?: number;
    errorCode?: number;
    description?: string;
    method?: string;
    message?: string;
  };

  const code = candidate.error_code ?? candidate.errorCode;
  if (code === 409) {
    // Collect only the string-valued fields; anything else is ignored.
    const parts: string[] = [];
    for (const field of [candidate.method, candidate.description, candidate.message]) {
      if (typeof field === "string") {
        parts.push(field);
      }
    }
    return parts.join(" ").toLowerCase().includes("getupdates");
  }
  return false;
};