diff --git a/extensions/telegram/src/polling-liveness.test.ts b/extensions/telegram/src/polling-liveness.test.ts new file mode 100644 index 00000000000..f41413838a2 --- /dev/null +++ b/extensions/telegram/src/polling-liveness.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, it, vi } from "vitest"; +import { TelegramPollingLivenessTracker } from "./polling-liveness.js"; + +const POLL_STALL_THRESHOLD_MS = 90_000; + +describe("TelegramPollingLivenessTracker", () => { + it("records successful getUpdates calls and publishes poll success time", () => { + const nowValues = [0, 0, 10, 25]; + const now = vi.fn(() => nowValues.shift() ?? 25); + const onPollSuccess = vi.fn(); + const tracker = new TelegramPollingLivenessTracker({ now, onPollSuccess }); + + tracker.noteGetUpdatesStarted({ offset: 42 }); + tracker.noteGetUpdatesSuccess([{ update_id: 1 }, { update_id: 2 }]); + tracker.noteGetUpdatesFinished(); + + expect(onPollSuccess).toHaveBeenCalledWith(25); + expect(tracker.formatDiagnosticFields("error")).toBe( + "inFlight=0 outcome=ok:2 startedAt=10 finishedAt=25 durationMs=15 offset=42", + ); + }); + + it("does not detect a polling stall while a recent non-polling API call is in flight", () => { + let now = 0; + const tracker = new TelegramPollingLivenessTracker({ now: () => now }); + + now = 60_000; + const callId = tracker.noteApiCallStarted(); + + now = 120_001; + expect( + tracker.detectStall({ + thresholdMs: POLL_STALL_THRESHOLD_MS, + runnerIsRunning: true, + }), + ).toBeNull(); + + tracker.noteApiCallFinished(callId); + }); + + it("detects and throttles stale polling diagnostics", () => { + let now = 0; + const tracker = new TelegramPollingLivenessTracker({ now: () => now }); + + now = 120_001; + const stall = tracker.detectStall({ + thresholdMs: POLL_STALL_THRESHOLD_MS, + runnerIsRunning: true, + }); + expect(stall?.message).toContain("Polling stall detected (no completed getUpdates"); + expect(stall?.message).toContain("inFlight=0 outcome=not-started"); + + 
/** Options accepted by {@link TelegramPollingLivenessTracker}. */
type TelegramPollingLivenessTrackerOptions = {
  /** Clock used for every timestamp the tracker records; falls back to `Date.now`. */
  now?: () => number;
  /** Called with the finish timestamp each time a getUpdates call succeeds. */
  onPollSuccess?: (finishedAt: number) => void;
};

/** Result of a positive stall check. */
type TelegramPollingStall = {
  /** Diagnostic text without any log prefix; the caller decides how to log it. */
  message: string;
};

/**
 * Book-keeps Telegram API activity for one polling cycle and decides when
 * long polling should be considered stalled.
 *
 * The owner reports the lifecycle of each `getUpdates` call
 * (`noteGetUpdatesStarted` → `noteGetUpdatesSuccess` | `noteGetUpdatesError`
 * → `noteGetUpdatesFinished`) and of every other API call
 * (`noteApiCallStarted` → `noteApiCallSuccess` → `noteApiCallFinished`), then
 * polls {@link detectStall} from its watchdog.
 */
export class TelegramPollingLivenessTracker {
  /** Start time of the latest getUpdates call, or construction time if none was made. */
  #lastGetUpdatesAt: number;
  /** Time of the latest completed API activity (any method), seeded at construction. */
  #lastApiActivityAt: number;
  #nextInFlightApiCallId = 0;
  /** Newest start time among non-getUpdates calls still in flight, or `null` when idle. */
  #latestInFlightApiStartedAt: number | null = null;
  /** Call id → start time for every non-getUpdates call still in flight. */
  #inFlightApiStartedAt = new Map<number, number>();
  #lastGetUpdatesStartedAt: number | null = null;
  #lastGetUpdatesFinishedAt: number | null = null;
  #lastGetUpdatesDurationMs: number | null = null;
  #lastGetUpdatesOutcome = "not-started";
  #lastGetUpdatesError: string | null = null;
  #lastGetUpdatesOffset: number | null = null;
  #inFlightGetUpdates = 0;
  /**
   * When the last stall diagnostic was emitted, or `null` if never.
   * `null` (not `0`) is the sentinel so that a diagnostic emitted at timestamp
   * `0` of an injected clock is still throttled.
   */
  #stallDiagLoggedAt: number | null = null;

  /**
   * @param options Optional clock override and poll-success callback.
   */
  constructor(private readonly options: TelegramPollingLivenessTrackerOptions = {}) {
    // Intentionally two clock reads (one per field): injected clocks that hand
    // out a fixed sequence of values rely on this call count.
    this.#lastGetUpdatesAt = this.#now();
    this.#lastApiActivityAt = this.#now();
  }

  /** Number of getUpdates calls currently in flight. */
  get inFlightGetUpdates(): number {
    return this.#inFlightGetUpdates;
  }

  /**
   * Registers the start of a non-getUpdates API call.
   *
   * @returns An id to hand back to {@link noteApiCallFinished}.
   */
  noteApiCallStarted(): number {
    const startedAt = this.#now();
    const callId = this.#nextInFlightApiCallId;
    this.#nextInFlightApiCallId += 1;
    this.#inFlightApiStartedAt.set(callId, startedAt);
    this.#latestInFlightApiStartedAt =
      this.#latestInFlightApiStartedAt == null
        ? startedAt
        : Math.max(this.#latestInFlightApiStartedAt, startedAt);
    return callId;
  }

  /**
   * Records that a non-getUpdates API call completed successfully.
   *
   * @param at Completion timestamp; defaults to the tracker clock.
   */
  noteApiCallSuccess(at = this.#now()): void {
    this.#lastApiActivityAt = at;
  }

  /**
   * Removes a call registered by {@link noteApiCallStarted}, whether it
   * succeeded or failed. Unknown ids are ignored.
   *
   * @param callId Id returned by {@link noteApiCallStarted}.
   */
  noteApiCallFinished(callId: number): void {
    const startedAt = this.#inFlightApiStartedAt.get(callId);
    this.#inFlightApiStartedAt.delete(callId);
    // Only rescan when the call that just ended could have been the newest one.
    if (startedAt != null && this.#latestInFlightApiStartedAt === startedAt) {
      this.#latestInFlightApiStartedAt = this.#resolveLatestInFlightApiStartedAt();
    }
  }

  /**
   * Records the start of a getUpdates call.
   *
   * @param payload Request payload; a numeric `offset` property is captured for diagnostics.
   * @param at Start timestamp; defaults to the tracker clock.
   */
  noteGetUpdatesStarted(payload: unknown, at = this.#now()): void {
    this.#lastGetUpdatesAt = at;
    this.#lastGetUpdatesStartedAt = at;
    this.#lastGetUpdatesOffset = resolveGetUpdatesOffset(payload);
    this.#inFlightGetUpdates += 1;
    this.#lastGetUpdatesOutcome = "started";
    this.#lastGetUpdatesError = null;
  }

  /**
   * Records a successful getUpdates call and notifies `onPollSuccess`.
   *
   * @param result Call result; when it is an array its length is included in the outcome.
   * @param at Finish timestamp; defaults to the tracker clock.
   */
  noteGetUpdatesSuccess(result: unknown, at = this.#now()): void {
    this.#recordGetUpdatesFinish(at, Array.isArray(result) ? `ok:${result.length}` : "ok");
    this.options.onPollSuccess?.(at);
  }

  /**
   * Records a failed getUpdates call and keeps the formatted error for diagnostics.
   *
   * @param err The thrown value.
   * @param at Finish timestamp; defaults to the tracker clock.
   */
  noteGetUpdatesError(err: unknown, at = this.#now()): void {
    this.#recordGetUpdatesFinish(at, "error");
    this.#lastGetUpdatesError = formatErrorMessage(err);
  }

  /** Marks one getUpdates call as no longer in flight; the counter never drops below zero. */
  noteGetUpdatesFinished(): void {
    this.#inFlightGetUpdates = Math.max(0, this.#inFlightGetUpdates - 1);
  }

  /**
   * Checks whether polling looks stalled.
   *
   * Returns `null` when the runner is not running, when either the getUpdates
   * gap or the general API gap is within `thresholdMs`, or when a stall
   * diagnostic was already produced less than `thresholdMs / 2` ago.
   *
   * @param params.thresholdMs Gap (ms) that must be exceeded before a stall is reported.
   * @param params.runnerIsRunning Whether the polling runner is currently running.
   * @param params.now Timestamp to evaluate at; defaults to the tracker clock.
   * @returns A stall description, or `null` when nothing should be reported.
   */
  detectStall(params: {
    thresholdMs: number;
    runnerIsRunning: boolean;
    now?: number;
  }): TelegramPollingStall | null {
    if (!params.runnerIsRunning) {
      return null;
    }
    const now = params.now ?? this.#now();
    const pollInFlight = this.#inFlightGetUpdates > 0;

    // In flight: time since that call started. Idle: time since the last call
    // finished (or, if none finished yet, since the last start/construction).
    let elapsed: number;
    if (pollInFlight) {
      elapsed = this.#lastGetUpdatesStartedAt == null ? 0 : now - this.#lastGetUpdatesStartedAt;
    } else {
      elapsed = now - (this.#lastGetUpdatesFinishedAt ?? this.#lastGetUpdatesAt);
    }

    // A recently started non-getUpdates call counts as the same liveness signal
    // as a recent success, so slow deliveries suppress the watchdog only for the
    // same bounded window.
    const apiLivenessAt =
      this.#latestInFlightApiStartedAt == null
        ? this.#lastApiActivityAt
        : Math.max(this.#lastApiActivityAt, this.#latestInFlightApiStartedAt);
    const apiElapsed = now - apiLivenessAt;

    if (elapsed <= params.thresholdMs || apiElapsed <= params.thresholdMs) {
      return null;
    }
    if (
      this.#stallDiagLoggedAt != null &&
      now - this.#stallDiagLoggedAt < params.thresholdMs / 2
    ) {
      return null;
    }
    this.#stallDiagLoggedAt = now;

    const elapsedLabel = pollInFlight
      ? `active getUpdates stuck for ${formatDurationPrecise(elapsed)}`
      : `no completed getUpdates for ${formatDurationPrecise(elapsed)}`;
    return {
      message: `Polling stall detected (${elapsedLabel}); forcing restart. [diag ${this.formatDiagnosticFields("error")}]`,
    };
  }

  /**
   * Renders the getUpdates bookkeeping as space-separated `key=value` fields.
   *
   * @param errorLabel Key under which the last getUpdates error is appended;
   *   when omitted, or when no error is recorded, no error field is emitted.
   */
  formatDiagnosticFields(errorLabel?: "error" | "lastGetUpdatesError"): string {
    const error =
      this.#lastGetUpdatesError && errorLabel ? ` ${errorLabel}=${this.#lastGetUpdatesError}` : "";
    return `inFlight=${this.#inFlightGetUpdates} outcome=${this.#lastGetUpdatesOutcome} startedAt=${this.#lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${this.#lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${this.#lastGetUpdatesDurationMs ?? "n/a"} offset=${this.#lastGetUpdatesOffset ?? "n/a"}${error}`;
  }

  /** Shared finish bookkeeping for the success and error paths of getUpdates. */
  #recordGetUpdatesFinish(at: number, outcome: string): void {
    this.#lastGetUpdatesFinishedAt = at;
    this.#lastGetUpdatesDurationMs =
      this.#lastGetUpdatesStartedAt == null ? null : at - this.#lastGetUpdatesStartedAt;
    this.#lastGetUpdatesOutcome = outcome;
    this.#lastApiActivityAt = at;
  }

  /** Newest start time among the calls still in flight, or `null` when there are none. */
  #resolveLatestInFlightApiStartedAt(): number | null {
    let newestStartedAt: number | null = null;
    for (const activeStartedAt of this.#inFlightApiStartedAt.values()) {
      newestStartedAt =
        newestStartedAt == null ? activeStartedAt : Math.max(newestStartedAt, activeStartedAt);
    }
    return newestStartedAt;
  }

  /** Reads the injected clock, falling back to `Date.now()`. */
  #now(): number {
    return this.options.now?.() ?? Date.now();
  }
}

/** Extracts a numeric `offset` property from a getUpdates payload, or `null` if absent or non-numeric. */
function resolveGetUpdatesOffset(payload: unknown): number | null {
  if (!payload || typeof payload !== "object" || !("offset" in payload)) {
    return null;
  }
  const offset = (payload as { offset?: unknown }).offset;
  return typeof offset === "number" ? offset : null;
}
#activeFetchAbort: AbortController | undefined; #transportState: TelegramPollingTransportState; + #status: ReturnType; constructor(private readonly opts: TelegramPollingSessionOpts) { this.#transportState = new TelegramPollingTransportState({ @@ -76,6 +78,7 @@ export class TelegramPollingSession { initialTransport: opts.telegramTransport, createTelegramTransport: opts.createTelegramTransport, }); + this.#status = createTelegramPollingStatusPublisher(opts.setStatus); } get activeRunner() { @@ -95,12 +98,7 @@ export class TelegramPollingSession { } async runUntilAbort(): Promise { - this.opts.setStatus?.({ - mode: "polling", - connected: false, - lastConnectedAt: null, - lastEventAt: null, - }); + this.#status.notePollingStart(); try { while (!this.opts.abortSignal?.aborted) { const bot = await this.#createPollingBot(); @@ -126,10 +124,7 @@ export class TelegramPollingSession { // this, the undici keep-alive sockets survive beyond the session and // leak to api.telegram.org; see openclaw#68128. 
await this.#transportState.dispose(); - this.opts.setStatus?.({ - mode: "polling", - connected: false, - }); + this.#status.notePollingStop(); } } @@ -224,84 +219,31 @@ export class TelegramPollingSession { async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> { await this.#confirmPersistedOffset(bot); - let lastGetUpdatesAt = Date.now(); - let lastApiActivityAt = Date.now(); - let nextInFlightApiCallId = 0; - let latestInFlightApiStartedAt: number | null = null; - const inFlightApiStartedAt = new Map(); - let lastGetUpdatesStartedAt: number | null = null; - let lastGetUpdatesFinishedAt: number | null = null; - let lastGetUpdatesDurationMs: number | null = null; - let lastGetUpdatesOutcome = "not-started"; - let lastGetUpdatesError: string | null = null; - let lastGetUpdatesOffset: number | null = null; - let inFlightGetUpdates = 0; - let _stopSequenceLogged = false; - let stallDiagLoggedAt = 0; - + const liveness = new TelegramPollingLivenessTracker({ + onPollSuccess: (finishedAt) => this.#status.notePollSuccess(finishedAt), + }); bot.api.config.use(async (prev, method, payload, signal) => { if (method !== "getUpdates") { - const startedAt = Date.now(); - const callId = nextInFlightApiCallId; - nextInFlightApiCallId += 1; - inFlightApiStartedAt.set(callId, startedAt); - latestInFlightApiStartedAt = - latestInFlightApiStartedAt == null - ? startedAt - : Math.max(latestInFlightApiStartedAt, startedAt); + const callId = liveness.noteApiCallStarted(); try { const result = await prev(method, payload, signal); - lastApiActivityAt = Date.now(); + liveness.noteApiCallSuccess(); return result; } finally { - inFlightApiStartedAt.delete(callId); - if (latestInFlightApiStartedAt === startedAt) { - let newestStartedAt: number | null = null; - for (const activeStartedAt of inFlightApiStartedAt.values()) { - newestStartedAt = - newestStartedAt == null - ? 
activeStartedAt - : Math.max(newestStartedAt, activeStartedAt); - } - latestInFlightApiStartedAt = newestStartedAt; - } + liveness.noteApiCallFinished(callId); } } - const startedAt = Date.now(); - lastGetUpdatesAt = startedAt; - lastGetUpdatesStartedAt = startedAt; - lastGetUpdatesOffset = - payload && typeof payload === "object" && "offset" in payload - ? ((payload as { offset?: number }).offset ?? null) - : null; - inFlightGetUpdates += 1; - lastGetUpdatesOutcome = "started"; - lastGetUpdatesError = null; - + liveness.noteGetUpdatesStarted(payload); try { const result = await prev(method, payload, signal); - const finishedAt = Date.now(); - lastGetUpdatesFinishedAt = finishedAt; - lastGetUpdatesDurationMs = finishedAt - startedAt; - lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok"; - lastApiActivityAt = finishedAt; - this.opts.setStatus?.({ - ...createConnectedChannelStatusPatch(finishedAt), - mode: "polling", - lastError: null, - }); + liveness.noteGetUpdatesSuccess(result); return result; } catch (err) { - const finishedAt = Date.now(); - lastGetUpdatesFinishedAt = finishedAt; - lastGetUpdatesDurationMs = finishedAt - startedAt; - lastGetUpdatesOutcome = "error"; - lastGetUpdatesError = formatErrorMessage(err); - lastApiActivityAt = finishedAt; + liveness.noteGetUpdatesError(err); throw err; } finally { - inFlightGetUpdates = Math.max(0, inFlightGetUpdates - 1); + liveness.noteGetUpdatesFinished(); } }); @@ -349,41 +291,14 @@ export class TelegramPollingSession { return; } - const now = Date.now(); - const activeElapsed = - inFlightGetUpdates > 0 && lastGetUpdatesStartedAt != null - ? now - lastGetUpdatesStartedAt - : 0; - const idleElapsed = - inFlightGetUpdates > 0 ? 0 : now - (lastGetUpdatesFinishedAt ?? lastGetUpdatesAt); - const elapsed = inFlightGetUpdates > 0 ? activeElapsed : idleElapsed; - const apiLivenessAt = - latestInFlightApiStartedAt == null - ? 
lastApiActivityAt - : Math.max(lastApiActivityAt, latestInFlightApiStartedAt); - const apiElapsed = now - apiLivenessAt; - - // Treat recent non-getUpdates success and recent non-getUpdates start as - // the same liveness signal. Slow delivery should suppress the watchdog, - // but only for the same bounded window as recent successful API traffic. - if ( - elapsed > POLL_STALL_THRESHOLD_MS && - apiElapsed > POLL_STALL_THRESHOLD_MS && - runner.isRunning() - ) { - if (stallDiagLoggedAt && now - stallDiagLoggedAt < POLL_STALL_THRESHOLD_MS / 2) { - return; - } - stallDiagLoggedAt = now; + const stall = liveness.detectStall({ + thresholdMs: POLL_STALL_THRESHOLD_MS, + runnerIsRunning: runner.isRunning(), + }); + if (stall) { this.#transportState.markDirty(); stalledRestart = true; - const elapsedLabel = - inFlightGetUpdates > 0 - ? `active getUpdates stuck for ${formatDurationPrecise(elapsed)}` - : `no completed getUpdates for ${formatDurationPrecise(elapsed)}`; - this.opts.log( - `[telegram] Polling stall detected (${elapsedLabel}); forcing restart. [diag inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}]`, - ); + this.opts.log(`[telegram] ${stall.message}`); void stopRunner(); void stopBot(); if (!forceCycleTimer) { @@ -413,7 +328,7 @@ export class TelegramPollingSession { : "runner stopped (maxRetryTime exceeded or graceful stop)"; this.#forceRestarted = false; this.opts.log( - `[telegram][diag] polling cycle finished reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? 
` error=${String(lastGetUpdatesError)}` : ""}`, + `[telegram][diag] polling cycle finished reason=${reason} ${liveness.formatDiagnosticFields("error")}`, ); const shouldRestart = await this.#waitBeforeRestart( (delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`, @@ -438,7 +353,7 @@ export class TelegramPollingSession { const reason = isConflict ? "getUpdates conflict" : "network error"; const errMsg = formatErrorMessage(err); this.opts.log( - `[telegram][diag] polling cycle error reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"} err=${errMsg}${lastGetUpdatesError ? ` lastGetUpdatesError=${String(lastGetUpdatesError)}` : ""}`, + `[telegram][diag] polling cycle error reason=${reason} ${liveness.formatDiagnosticFields("lastGetUpdatesError")} err=${errMsg}`, ); const shouldRestart = await this.#waitBeforeRestart( (delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`, diff --git a/extensions/telegram/src/polling-status.test.ts b/extensions/telegram/src/polling-status.test.ts new file mode 100644 index 00000000000..d56b5fd4b44 --- /dev/null +++ b/extensions/telegram/src/polling-status.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, it, vi } from "vitest"; +import { createTelegramPollingStatusPublisher } from "./polling-status.js"; + +describe("createTelegramPollingStatusPublisher", () => { + it("publishes start, successful poll, and stop status patches", () => { + const setStatus = vi.fn(); + const status = createTelegramPollingStatusPublisher(setStatus); + + status.notePollingStart(); + status.notePollSuccess(1234); + status.notePollingStop(); + + expect(setStatus).toHaveBeenNthCalledWith(1, { + mode: "polling", + connected: false, + lastConnectedAt: null, + lastEventAt: null, + }); + 
/**
 * Callback that receives partial channel-status patches.
 *
 * NOTE(review): the type arguments of `Omit` were missing in the reviewed copy
 * of this file; `<ChannelAccountSnapshot, "accountId">` is a reconstruction —
 * confirm it matches the `setStatus` option passed in by the polling session.
 */
type TelegramPollingStatusSink = (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;

/** Status notifications emitted over the lifetime of a polling session. */
type TelegramPollingStatusPublisher = {
  /** Polling is starting: disconnected, with connection/event timestamps cleared. */
  notePollingStart(): void;
  /** A poll succeeded at `at` (defaults to `Date.now()`): connected, last error cleared. */
  notePollSuccess(at?: number): void;
  /** Polling stopped: disconnected; timestamps are left out of the patch. */
  notePollingStop(): void;
};

/**
 * Wraps an optional status sink in the three status transitions used by
 * Telegram polling. Every method is a no-op when `setStatus` is not provided.
 *
 * @param setStatus Sink that receives each status patch.
 * @returns The publisher object.
 */
export function createTelegramPollingStatusPublisher(
  setStatus?: TelegramPollingStatusSink,
): TelegramPollingStatusPublisher {
  return {
    notePollingStart() {
      setStatus?.({
        mode: "polling",
        connected: false,
        lastConnectedAt: null,
        lastEventAt: null,
      });
    },
    notePollSuccess(at = Date.now()) {
      setStatus?.({
        // Connected-state fields come from the shared helper; mode and
        // lastError are layered on top of them.
        ...createConnectedChannelStatusPatch(at),
        mode: "polling",
        lastError: null,
      });
    },
    notePollingStop() {
      setStatus?.({
        mode: "polling",
        connected: false,
      });
    },
  };
}