mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-08 19:50:44 +00:00
409 lines
14 KiB
TypeScript
409 lines
14 KiB
TypeScript
import { type RunOptions, run } from "@grammyjs/runner";
|
|
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
|
|
import {
|
|
computeBackoff,
|
|
formatDurationPrecise,
|
|
sleepWithAbort,
|
|
} from "openclaw/plugin-sdk/runtime-env";
|
|
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
|
|
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
|
|
import { withTelegramApiErrorLogging } from "./api-logging.js";
|
|
import { createTelegramBot } from "./bot.js";
|
|
import { type TelegramTransport } from "./fetch.js";
|
|
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
|
import { TelegramPollingLivenessTracker } from "./polling-liveness.js";
|
|
import { createTelegramPollingStatusPublisher } from "./polling-status.js";
|
|
import { TelegramPollingTransportState } from "./polling-transport-state.js";
|
|
|
|
/**
 * Backoff policy handed to `computeBackoff` whenever the polling loop has to
 * wait before restarting.
 *
 * Field semantics are defined by `computeBackoff`; presumably `initialMs` is
 * the first delay, `maxMs` the delay cap, `factor` the growth multiplier and
 * `jitter` a random spread fraction (confirm against runtime-env).
 *
 * Declared `as const` so the shared policy cannot be mutated by accident.
 */
const TELEGRAM_POLL_RESTART_POLICY = {
  initialMs: 2000,
  maxMs: 30_000,
  factor: 1.8,
  jitter: 0.25,
} as const;
|
|
|
|
/** Stall threshold used when the caller supplies no (or a non-finite) value. */
const DEFAULT_POLL_STALL_THRESHOLD_MS = 120_000;
/** Lower clamp applied to a caller-supplied stall threshold. */
const MIN_POLL_STALL_THRESHOLD_MS = 30_000;
/** Upper clamp applied to a caller-supplied stall threshold. */
const MAX_POLL_STALL_THRESHOLD_MS = 600_000;
/** Period of the watchdog interval that checks the polling cycle for stalls. */
const POLL_WATCHDOG_INTERVAL_MS = 30_000;
/** How long to wait for a runner/bot stop before giving up on it. */
const POLL_STOP_GRACE_MS = 15_000;
|
|
|
|
/** The bot instance type produced by `createTelegramBot`. */
type TelegramBot = ReturnType<typeof createTelegramBot>;
|
|
|
|
/**
 * Invokes `stop()` and waits for it, but never longer than the grace period.
 *
 * @param stop - Starts the shutdown and returns a promise for its completion.
 * @param graceMs - Maximum time to wait in milliseconds. Defaults to
 *   `POLL_STOP_GRACE_MS`.
 * @returns Resolves once `stop()` resolves or the grace period elapses,
 *   whichever happens first. Rejects if `stop()` rejects before the grace
 *   period elapses.
 */
const waitForGracefulStop = async (
  stop: () => Promise<void>,
  graceMs: number = POLL_STOP_GRACE_MS,
): Promise<void> => {
  let timer: ReturnType<typeof setTimeout> | undefined;
  try {
    await Promise.race([
      stop(),
      new Promise<void>((resolve) => {
        timer = setTimeout(resolve, graceMs);
        // The grace timer alone must not keep the process alive.
        timer.unref?.();
      }),
    ]);
  } finally {
    // Always drop the grace timer, whether stop() won the race or threw.
    if (timer) {
      clearTimeout(timer);
    }
  }
};
|
|
|
|
/**
 * Normalizes a caller-supplied stall threshold.
 *
 * @param value - Requested threshold in milliseconds, if any.
 * @returns `DEFAULT_POLL_STALL_THRESHOLD_MS` when `value` is missing or not a
 *   finite number; otherwise `value` rounded down to an integer and clamped to
 *   the `[MIN_POLL_STALL_THRESHOLD_MS, MAX_POLL_STALL_THRESHOLD_MS]` range.
 */
const resolvePollingStallThresholdMs = (value: number | undefined): number => {
  if (typeof value !== "number" || !Number.isFinite(value)) {
    return DEFAULT_POLL_STALL_THRESHOLD_MS;
  }
  return Math.min(
    MAX_POLL_STALL_THRESHOLD_MS,
    Math.max(MIN_POLL_STALL_THRESHOLD_MS, Math.floor(value)),
  );
};
|
|
|
|
/** Construction options for {@link TelegramPollingSession}. */
type TelegramPollingSessionOpts = {
  /** Bot token forwarded to `createTelegramBot`. */
  token: string;
  /** Configuration forwarded to `createTelegramBot`. */
  config: Parameters<typeof createTelegramBot>[0]["config"];
  /** Account identifier forwarded to `createTelegramBot`. */
  accountId: string;
  /** Runtime forwarded to `createTelegramBot` and to Telegram API error logging. */
  runtime: Parameters<typeof createTelegramBot>[0]["runtime"];
  /** Proxy-aware fetch forwarded to `createTelegramBot`. */
  proxyFetch: Parameters<typeof createTelegramBot>[0]["proxyFetch"];
  /** When aborted, the polling loop stops the active runner and exits. */
  abortSignal?: AbortSignal;
  /** Options passed to the grammY runner's `run(bot, ...)` on every cycle. */
  runnerOptions: RunOptions<unknown>;
  /** Supplies the last processed update id used to seed each new bot's update offset. */
  getLastUpdateId: () => number | null;
  /** Passed to each new bot as the update offset's `onUpdateId` callback. */
  persistUpdateId: (updateId: number) => Promise<void>;
  /** Sink for human-readable log and diagnostic lines. */
  log: (line: string) => void;
  /** Pre-resolved Telegram transport to reuse across bot instances */
  telegramTransport?: TelegramTransport;
  /** Rebuild Telegram transport after stall/network recovery when marked dirty. */
  createTelegramTransport?: () => TelegramTransport;
  /** Stall detection threshold in ms. Defaults to 120_000 (2 min). */
  stallThresholdMs?: number;
  /** Receives status patches from the polling status publisher. */
  setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
};
|
|
|
|
/**
 * Drives Telegram long polling for one account until the caller aborts.
 *
 * Each loop iteration builds a fresh bot, clears any registered webhook (once,
 * and again after a getUpdates 409 conflict), runs a grammY runner, and then
 * waits for a `computeBackoff` delay before restarting when the runner stops,
 * stalls, or fails with a recoverable network error.
 */
export class TelegramPollingSession {
  /**
   * Number of restart waits performed so far; input to `computeBackoff`.
   * NOTE(review): never reset in this class, so delays stay at the policy cap
   * once reached. Confirm that is intended.
   */
  #restartAttempts = 0;
  /** True once `deleteWebhook` succeeded; cleared again on a getUpdates conflict. */
  #webhookCleared = false;
  /** Set by `markForceRestarted()`; only influences the logged restart reason. */
  #forceRestarted = false;
  /** Runner of the polling cycle currently in flight, if any. */
  #activeRunner: ReturnType<typeof run> | undefined;
  /** Abort controller wired into the current bot's fetch calls, if any. */
  #activeFetchAbort: AbortController | undefined;
  #transportState: TelegramPollingTransportState;
  #status: ReturnType<typeof createTelegramPollingStatusPublisher>;
  /** Effective (defaulted and clamped) stall threshold in milliseconds. */
  #stallThresholdMs: number;

  constructor(private readonly opts: TelegramPollingSessionOpts) {
    this.#transportState = new TelegramPollingTransportState({
      log: opts.log,
      initialTransport: opts.telegramTransport,
      createTelegramTransport: opts.createTelegramTransport,
    });
    this.#status = createTelegramPollingStatusPublisher(opts.setStatus);
    this.#stallThresholdMs = resolvePollingStallThresholdMs(opts.stallThresholdMs);
  }

  /** The runner of the polling cycle currently in flight, or `undefined`. */
  get activeRunner() {
    return this.#activeRunner;
  }

  /** Flags the next finished cycle as caused by an unhandled network error (log reason only). */
  markForceRestarted() {
    this.#forceRestarted = true;
  }

  /** Marks the transport dirty in the transport state. */
  markTransportDirty() {
    this.#transportState.markDirty();
  }

  /** Aborts the fetch signal handed to the current bot, if one exists. */
  abortActiveFetch() {
    this.#activeFetchAbort?.abort();
  }

  /**
   * Runs polling cycles until the abort signal fires or a cycle asks to exit.
   *
   * @throws Re-throws errors that are neither recoverable network errors nor
   *   getUpdates conflicts.
   */
  async runUntilAbort(): Promise<void> {
    this.#status.notePollingStart();
    try {
      while (!this.opts.abortSignal?.aborted) {
        const bot = await this.#createPollingBot();
        if (!bot) {
          // Bot creation failed recoverably and the backoff already elapsed.
          continue;
        }

        const cleanupState = await this.#ensureWebhookCleanup(bot);
        if (cleanupState === "retry") {
          continue;
        }
        if (cleanupState === "exit") {
          return;
        }

        const state = await this.#runPollingCycle(bot);
        if (state === "exit") {
          return;
        }
      }
    } finally {
      try {
        // Release the transport's dispatchers on session shutdown. Without
        // this, the undici keep-alive sockets survive beyond the session and
        // leak to api.telegram.org; see openclaw#68128.
        await this.#transportState.dispose();
      } finally {
        // Publish the stop even when disposing the transport throws.
        this.#status.notePollingStop();
      }
    }
  }

  /**
   * Logs a restart notice and sleeps for the next backoff delay.
   *
   * @param buildLine - Builds the log line from the formatted delay.
   * @returns `true` when the delay elapsed, `false` when the sleep failed and
   *   the session was aborted.
   * @throws Re-throws a sleep failure that was not caused by an abort.
   */
  async #waitBeforeRestart(buildLine: (delay: string) => string): Promise<boolean> {
    this.#restartAttempts += 1;
    const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, this.#restartAttempts);
    const delay = formatDurationPrecise(delayMs);
    this.opts.log(buildLine(delay));
    try {
      await sleepWithAbort(delayMs, this.opts.abortSignal);
    } catch (sleepErr) {
      if (this.opts.abortSignal?.aborted) {
        return false;
      }
      throw sleepErr;
    }
    return true;
  }

  /**
   * Backs off after a setup-phase failure when the error is recoverable.
   *
   * @returns `false` when the session is aborted, otherwise the result of the
   *   backoff wait.
   * @throws Re-throws `err` when it is not a recoverable Telegram network error.
   */
  async #waitBeforeRetryOnRecoverableSetupError(err: unknown, logPrefix: string): Promise<boolean> {
    if (this.opts.abortSignal?.aborted) {
      return false;
    }
    if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
      throw err;
    }
    return this.#waitBeforeRestart(
      (delay) => `${logPrefix}: ${formatErrorMessage(err)}; retrying in ${delay}.`,
    );
  }

  /**
   * Creates the bot for the next cycle with a fresh fetch abort controller.
   *
   * @returns The bot, or `undefined` after a recoverable creation failure (the
   *   caller re-checks the abort signal and retries).
   */
  async #createPollingBot(): Promise<TelegramBot | undefined> {
    const fetchAbortController = new AbortController();
    this.#activeFetchAbort = fetchAbortController;
    const telegramTransport = this.#transportState.acquireForNextCycle();
    try {
      return createTelegramBot({
        token: this.opts.token,
        runtime: this.opts.runtime,
        proxyFetch: this.opts.proxyFetch,
        config: this.opts.config,
        accountId: this.opts.accountId,
        fetchAbortSignal: fetchAbortController.signal,
        updateOffset: {
          lastUpdateId: this.opts.getLastUpdateId(),
          onUpdateId: this.opts.persistUpdateId,
        },
        telegramTransport,
      });
    } catch (err) {
      await this.#waitBeforeRetryOnRecoverableSetupError(err, "Telegram setup network error");
      // Only clear the controller if no newer cycle replaced it meanwhile.
      if (this.#activeFetchAbort === fetchAbortController) {
        this.#activeFetchAbort = undefined;
      }
      return undefined;
    }
  }

  /**
   * Calls `deleteWebhook` (keeping pending updates) unless it already succeeded.
   *
   * @returns `"ready"` when polling may start, `"retry"` after a recoverable
   *   failure whose backoff elapsed, `"exit"` when the session was aborted.
   */
  async #ensureWebhookCleanup(bot: TelegramBot): Promise<"ready" | "retry" | "exit"> {
    if (this.#webhookCleared) {
      return "ready";
    }
    try {
      await withTelegramApiErrorLogging({
        operation: "deleteWebhook",
        runtime: this.opts.runtime,
        fn: () => bot.api.deleteWebhook({ drop_pending_updates: false }),
      });
      this.#webhookCleared = true;
      return "ready";
    } catch (err) {
      const shouldRetry = await this.#waitBeforeRetryOnRecoverableSetupError(
        err,
        "Telegram webhook cleanup failed",
      );
      return shouldRetry ? "retry" : "exit";
    }
  }

  /**
   * Runs one runner lifetime for `bot`, with stall detection and bounded stop.
   *
   * @returns `"continue"` to start another cycle, `"exit"` to end the session.
   * @throws Re-throws runner errors that are neither getUpdates conflicts nor
   *   recoverable network errors, and any runner error raised after an abort.
   */
  async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> {
    const liveness = new TelegramPollingLivenessTracker({
      onPollSuccess: (finishedAt) => this.#status.notePollSuccess(finishedAt),
    });
    // Report every API call to the liveness tracker; getUpdates is tracked
    // separately from all other methods.
    bot.api.config.use(async (prev, method, payload, signal) => {
      if (method !== "getUpdates") {
        const callId = liveness.noteApiCallStarted();
        try {
          const result = await prev(method, payload, signal);
          liveness.noteApiCallSuccess();
          return result;
        } finally {
          liveness.noteApiCallFinished(callId);
        }
      }

      liveness.noteGetUpdatesStarted(payload);
      try {
        const result = await prev(method, payload, signal);
        liveness.noteGetUpdatesSuccess(result);
        return result;
      } catch (err) {
        liveness.noteGetUpdatesError(err);
        throw err;
      } finally {
        liveness.noteGetUpdatesFinished();
      }
    });

    const runner = run(bot, this.opts.runnerOptions);
    this.#activeRunner = runner;
    const fetchAbortController = this.#activeFetchAbort;
    const abortFetch = () => {
      fetchAbortController?.abort();
    };

    if (this.opts.abortSignal && fetchAbortController) {
      this.opts.abortSignal.addEventListener("abort", abortFetch, { once: true });
    }
    let stopPromise: Promise<void> | undefined;
    let stalledRestart = false;
    let forceCycleTimer: ReturnType<typeof setTimeout> | undefined;
    let forceCycleResolve: (() => void) | undefined;
    // Resolved by the watchdog when a stalled runner fails to stop in time.
    const forceCyclePromise = new Promise<void>((resolve) => {
      forceCycleResolve = resolve;
    });
    // Idempotent: the first call starts the stop, later calls share its promise.
    const stopRunner = () => {
      fetchAbortController?.abort();
      stopPromise ??= Promise.resolve(runner.stop())
        .then(() => undefined)
        .catch(() => {
          // Runner may already be stopped by abort/retry paths.
        });
      return stopPromise;
    };
    const stopBot = () => {
      return Promise.resolve(bot.stop())
        .then(() => undefined)
        .catch(() => {
          // Bot may already be stopped by runner stop/abort paths.
        });
    };
    const stopOnAbort = () => {
      if (this.opts.abortSignal?.aborted) {
        void stopRunner();
      }
    };

    const watchdog = setInterval(() => {
      if (this.opts.abortSignal?.aborted) {
        return;
      }

      const stall = liveness.detectStall({
        thresholdMs: this.#stallThresholdMs,
      });
      if (stall) {
        this.#transportState.markDirty();
        stalledRestart = true;
        this.opts.log(`[telegram] ${stall.message}`);
        void stopRunner();
        void stopBot();
        if (!forceCycleTimer) {
          forceCycleTimer = setTimeout(() => {
            // NOTE(review): when aborted, the force-cycle promise is left
            // unresolved; the cycle then ends only once runner.task() settles.
            if (this.opts.abortSignal?.aborted) {
              return;
            }
            this.opts.log(
              `[telegram] Polling runner stop timed out after ${formatDurationPrecise(POLL_STOP_GRACE_MS)}; forcing restart cycle.`,
            );
            forceCycleResolve?.();
          }, POLL_STOP_GRACE_MS);
        }
      }
    }, POLL_WATCHDOG_INTERVAL_MS);

    this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
    // The "abort" event is dispatched only once. If the signal fired before
    // the listeners above were attached (e.g. while deleteWebhook was in
    // flight), they will never run, so stop the runner here instead of
    // polling forever. This is a no-op while the signal is not aborted.
    stopOnAbort();
    try {
      await Promise.race([runner.task(), forceCyclePromise]);
      if (this.opts.abortSignal?.aborted) {
        return "exit";
      }
      const reason = stalledRestart
        ? "polling stall detected"
        : this.#forceRestarted
          ? "unhandled network error"
          : "runner stopped (maxRetryTime exceeded or graceful stop)";
      this.#forceRestarted = false;
      this.opts.log(
        `[telegram][diag] polling cycle finished reason=${reason} ${liveness.formatDiagnosticFields("error")}`,
      );
      const shouldRestart = await this.#waitBeforeRestart(
        (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
      );
      return shouldRestart ? "continue" : "exit";
    } catch (err) {
      this.#forceRestarted = false;
      if (this.opts.abortSignal?.aborted) {
        throw err;
      }
      const isConflict = isGetUpdatesConflict(err);
      if (isConflict) {
        // Force a fresh deleteWebhook before the next cycle.
        this.#webhookCleared = false;
      }
      const isRecoverable = isRecoverableTelegramNetworkError(err, { context: "polling" });
      // Mark transport dirty on 409 conflict as well as recoverable network
      // errors. Without this, Telegram-side session termination returns 409
      // and the retry reuses the same HTTP keep-alive TCP socket, which
      // Telegram treats as the "old" session and keeps terminating, producing
      // a tight 409 retry loop at low but non-zero rate. (#69787)
      if (isRecoverable || isConflict) {
        this.#transportState.markDirty();
      }
      if (!isConflict && !isRecoverable) {
        throw err;
      }
      const reason = isConflict ? "getUpdates conflict" : "network error";
      const errMsg = formatErrorMessage(err);
      const conflictHint = isConflict
        ? " Another OpenClaw gateway, script, or Telegram poller may be using this bot token; stop the duplicate poller or switch this account to webhook mode."
        : "";
      this.opts.log(
        `[telegram][diag] polling cycle error reason=${reason} ${liveness.formatDiagnosticFields("lastGetUpdatesError")} err=${errMsg}${conflictHint}`,
      );
      const shouldRestart = await this.#waitBeforeRestart(
        (delay) => `Telegram ${reason}: ${errMsg};${conflictHint} retrying in ${delay}.`,
      );
      return shouldRestart ? "continue" : "exit";
    } finally {
      clearInterval(watchdog);
      if (forceCycleTimer) {
        clearTimeout(forceCycleTimer);
      }
      this.opts.abortSignal?.removeEventListener("abort", abortFetch);
      this.opts.abortSignal?.removeEventListener("abort", stopOnAbort);
      // Each stop is bounded by the grace period so a hung stop cannot block
      // the next cycle or the shutdown.
      await waitForGracefulStop(stopRunner);
      await waitForGracefulStop(stopBot);
      this.#activeRunner = undefined;
      if (this.#activeFetchAbort === fetchAbortController) {
        this.#activeFetchAbort = undefined;
      }
    }
  }
}
|
|
|
|
/**
 * Reports whether `err` is a 409 error that mentions getUpdates.
 *
 * The error code is read from `error_code` (falling back to `errorCode`), and
 * "getupdates" is searched for, case-insensitively, in the error's `method`,
 * `description` and `message` string fields.
 *
 * @param err - Any thrown value.
 * @returns `true` only for object errors with code 409 that mention getUpdates.
 */
const isGetUpdatesConflict = (err: unknown): boolean => {
  if (!err || typeof err !== "object") {
    return false;
  }
  const typed = err as {
    error_code?: number;
    errorCode?: number;
    description?: string;
    method?: string;
    message?: string;
  };
  const errorCode = typed.error_code ?? typed.errorCode;
  if (errorCode !== 409) {
    return false;
  }
  // Non-string fields are skipped rather than stringified.
  const haystack = [typed.method, typed.description, typed.message]
    .filter((value): value is string => typeof value === "string")
    .join(" ");
  const normalizedHaystack = normalizeLowercaseStringOrEmpty(haystack);
  return normalizedHaystack.includes("getupdates");
};
|