refactor(telegram): split polling liveness tracking

This commit is contained in:
Peter Steinberger
2026-04-20 23:16:09 +01:00
parent 431e33b567
commit 3eb48ec3e7
5 changed files with 326 additions and 109 deletions

View File

@@ -0,0 +1,82 @@
import { describe, expect, it, vi } from "vitest";
import { TelegramPollingLivenessTracker } from "./polling-liveness.js";
// Stall threshold handed to detectStall() in every case below.
const POLL_STALL_THRESHOLD_MS = 90_000;
// Unit tests for TelegramPollingLivenessTracker driven by an injected clock.
describe("TelegramPollingLivenessTracker", () => {
it("records successful getUpdates calls and publishes poll success time", () => {
// Scripted clock reads, in order: two in the constructor (0, 0), one in
// noteGetUpdatesStarted (10), one in noteGetUpdatesSuccess (25).
const nowValues = [0, 0, 10, 25];
const now = vi.fn(() => nowValues.shift() ?? 25);
const onPollSuccess = vi.fn();
const tracker = new TelegramPollingLivenessTracker({ now, onPollSuccess });
tracker.noteGetUpdatesStarted({ offset: 42 });
tracker.noteGetUpdatesSuccess([{ update_id: 1 }, { update_id: 2 }]);
tracker.noteGetUpdatesFinished();
// The success hook receives the timestamp recorded for the success.
expect(onPollSuccess).toHaveBeenCalledWith(25);
// No error was recorded, so no error suffix is appended despite the label.
expect(tracker.formatDiagnosticFields("error")).toBe(
"inFlight=0 outcome=ok:2 startedAt=10 finishedAt=25 durationMs=15 offset=42",
);
});
it("does not detect a polling stall while a recent non-polling API call is in flight", () => {
let now = 0;
const tracker = new TelegramPollingLivenessTracker({ now: () => now });
now = 60_000;
const callId = tracker.noteApiCallStarted();
// Polling has been idle for 120_001 ms (over the threshold), but the
// in-flight API call started 60_001 ms ago, which is within the threshold.
now = 120_001;
expect(
tracker.detectStall({
thresholdMs: POLL_STALL_THRESHOLD_MS,
runnerIsRunning: true,
}),
).toBeNull();
tracker.noteApiCallFinished(callId);
});
it("detects and throttles stale polling diagnostics", () => {
let now = 0;
const tracker = new TelegramPollingLivenessTracker({ now: () => now });
// No getUpdates call was ever recorded; 120_001 ms have passed since construction.
now = 120_001;
const stall = tracker.detectStall({
thresholdMs: POLL_STALL_THRESHOLD_MS,
runnerIsRunning: true,
});
expect(stall?.message).toContain("Polling stall detected (no completed getUpdates");
expect(stall?.message).toContain("inFlight=0 outcome=not-started");
// 9_999 ms after the first report is less than half the threshold, so the
// repeated report is suppressed.
now = 130_000;
expect(
tracker.detectStall({
thresholdMs: POLL_STALL_THRESHOLD_MS,
runnerIsRunning: true,
}),
).toBeNull();
});
it("reports active stuck getUpdates calls", () => {
let now = 0;
const tracker = new TelegramPollingLivenessTracker({ now: () => now });
now = 1;
tracker.noteGetUpdatesStarted({ offset: 7 });
// The call started at 1 is still in flight 120_000 ms later.
now = 120_001;
const stall = tracker.detectStall({
thresholdMs: POLL_STALL_THRESHOLD_MS,
runnerIsRunning: true,
});
expect(stall?.message).toContain("active getUpdates stuck");
expect(stall?.message).toContain("inFlight=1 outcome=started startedAt=1");
expect(stall?.message).toContain("offset=7");
// Complete the call so the tracker no longer has an in-flight getUpdates.
tracker.noteGetUpdatesSuccess([]);
tracker.noteGetUpdatesFinished();
});
});

View File

@@ -0,0 +1,159 @@
import { formatDurationPrecise } from "openclaw/plugin-sdk/runtime-env";
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
/** Optional hooks accepted by the TelegramPollingLivenessTracker constructor. */
type TelegramPollingLivenessTrackerOptions = {
/** Clock used for every timestamp the tracker records; Date.now() is used when omitted. */
now?: () => number;
/** Called by noteGetUpdatesSuccess with the timestamp recorded for that successful poll. */
onPollSuccess?: (finishedAt: number) => void;
};
/** Result returned by detectStall() when a polling stall is reported. */
type TelegramPollingStall = {
/** Log-ready description of the stall, including the tracker's diagnostic fields. */
message: string;
};
/**
 * Records the timing and outcome of Telegram `getUpdates` calls and of other
 * API calls, and decides from those records whether polling looks stalled.
 *
 * Every timestamp comes from `options.now` when provided, otherwise from
 * `Date.now()`.
 */
export class TelegramPollingLivenessTracker {
  /** Start time of the latest getUpdates call; seeded with the construction time. */
  #lastGetUpdatesAt: number;
  /** Time of the latest completed API activity; seeded with the construction time. */
  #lastApiActivityAt: number;
  /** Next id handed out by noteApiCallStarted(). */
  #nextInFlightApiCallId = 0;
  /** Newest start time among in-flight non-getUpdates calls, or null when none are in flight. */
  #latestInFlightApiStartedAt: number | null = null;
  /** Start time of each in-flight non-getUpdates call, keyed by call id. */
  #inFlightApiStartedAt = new Map<number, number>();
  #lastGetUpdatesStartedAt: number | null = null;
  #lastGetUpdatesFinishedAt: number | null = null;
  #lastGetUpdatesDurationMs: number | null = null;
  #lastGetUpdatesOutcome = "not-started";
  #lastGetUpdatesError: string | null = null;
  #lastGetUpdatesOffset: number | null = null;
  #inFlightGetUpdates = 0;
  /**
   * Time of the last reported stall, or null when none has been reported.
   * null (not 0) marks "never reported" so that a stall reported at
   * timestamp 0 is still throttled.
   */
  #stallDiagLoggedAt: number | null = null;

  /**
   * @param options Optional clock and poll-success hook.
   */
  constructor(private readonly options: TelegramPollingLivenessTrackerOptions = {}) {
    // Each field takes its own clock reading, so a scripted clock sees two reads here.
    this.#lastGetUpdatesAt = this.#now();
    this.#lastApiActivityAt = this.#now();
  }

  /** Number of getUpdates calls currently counted as in flight. */
  get inFlightGetUpdates() {
    return this.#inFlightGetUpdates;
  }

  /**
   * Registers the start of a non-getUpdates API call.
   *
   * @returns An id to pass to noteApiCallFinished() when the call settles.
   */
  noteApiCallStarted(): number {
    const startedAt = this.#now();
    const callId = this.#nextInFlightApiCallId;
    this.#nextInFlightApiCallId += 1;
    this.#inFlightApiStartedAt.set(callId, startedAt);
    this.#latestInFlightApiStartedAt =
      this.#latestInFlightApiStartedAt == null
        ? startedAt
        : Math.max(this.#latestInFlightApiStartedAt, startedAt);
    return callId;
  }

  /**
   * Records that a non-getUpdates API call succeeded.
   *
   * @param at Timestamp of the success; defaults to the current clock reading.
   */
  noteApiCallSuccess(at = this.#now()) {
    this.#lastApiActivityAt = at;
  }

  /**
   * Removes a call registered by noteApiCallStarted() from the in-flight set.
   * Unknown ids are ignored.
   *
   * @param callId Id returned by noteApiCallStarted().
   */
  noteApiCallFinished(callId: number) {
    const startedAt = this.#inFlightApiStartedAt.get(callId);
    this.#inFlightApiStartedAt.delete(callId);
    // Only rescan when the removed call could have been the newest one.
    if (startedAt != null && this.#latestInFlightApiStartedAt === startedAt) {
      this.#latestInFlightApiStartedAt = this.#resolveLatestInFlightApiStartedAt();
    }
  }

  /**
   * Records the start of a getUpdates call and resets the last outcome/error.
   *
   * @param payload Request payload; its numeric `offset`, if any, is kept for diagnostics.
   * @param at Start timestamp; defaults to the current clock reading.
   */
  noteGetUpdatesStarted(payload: unknown, at = this.#now()) {
    this.#lastGetUpdatesAt = at;
    this.#lastGetUpdatesStartedAt = at;
    this.#lastGetUpdatesOffset = resolveGetUpdatesOffset(payload);
    this.#inFlightGetUpdates += 1;
    this.#lastGetUpdatesOutcome = "started";
    this.#lastGetUpdatesError = null;
  }

  /**
   * Records a successful getUpdates call and notifies `options.onPollSuccess`.
   *
   * @param result Call result; an array is summarized as `ok:<length>`, anything else as `ok`.
   * @param at Finish timestamp; defaults to the current clock reading.
   */
  noteGetUpdatesSuccess(result: unknown, at = this.#now()) {
    this.#lastGetUpdatesFinishedAt = at;
    this.#lastGetUpdatesDurationMs =
      this.#lastGetUpdatesStartedAt == null ? null : at - this.#lastGetUpdatesStartedAt;
    this.#lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok";
    this.#lastApiActivityAt = at;
    this.options.onPollSuccess?.(at);
  }

  /**
   * Records a failed getUpdates call. A failure still counts as API activity.
   *
   * @param err The thrown value; stored as a formatted message for diagnostics.
   * @param at Finish timestamp; defaults to the current clock reading.
   */
  noteGetUpdatesError(err: unknown, at = this.#now()) {
    this.#lastGetUpdatesFinishedAt = at;
    this.#lastGetUpdatesDurationMs =
      this.#lastGetUpdatesStartedAt == null ? null : at - this.#lastGetUpdatesStartedAt;
    this.#lastGetUpdatesOutcome = "error";
    this.#lastGetUpdatesError = formatErrorMessage(err);
    this.#lastApiActivityAt = at;
  }

  /** Decrements the in-flight getUpdates counter, never going below zero. */
  noteGetUpdatesFinished() {
    this.#inFlightGetUpdates = Math.max(0, this.#inFlightGetUpdates - 1);
  }

  /**
   * Checks whether polling has stalled.
   *
   * A stall is reported only when the runner is running and both the
   * getUpdates signal and the general API signal are older than
   * `thresholdMs`. Once a stall has been reported, further reports are
   * suppressed until half of `thresholdMs` has passed.
   *
   * @param params.thresholdMs Age beyond which a signal counts as stale.
   * @param params.runnerIsRunning Whether the polling runner is currently running.
   * @param params.now Timestamp to evaluate at; defaults to the current clock reading.
   * @returns A stall description, or null when no stall is reported.
   */
  detectStall(params: {
    thresholdMs: number;
    runnerIsRunning: boolean;
    now?: number;
  }): TelegramPollingStall | null {
    if (!params.runnerIsRunning) {
      return null;
    }
    const now = params.now ?? this.#now();
    const activeElapsed =
      this.#inFlightGetUpdates > 0 && this.#lastGetUpdatesStartedAt != null
        ? now - this.#lastGetUpdatesStartedAt
        : 0;
    const idleElapsed =
      this.#inFlightGetUpdates > 0
        ? 0
        : now - (this.#lastGetUpdatesFinishedAt ?? this.#lastGetUpdatesAt);
    const elapsed = this.#inFlightGetUpdates > 0 ? activeElapsed : idleElapsed;
    // A started-but-unfinished API call counts as liveness from its start time,
    // the same as a completed one counts from its completion time.
    const apiLivenessAt =
      this.#latestInFlightApiStartedAt == null
        ? this.#lastApiActivityAt
        : Math.max(this.#lastApiActivityAt, this.#latestInFlightApiStartedAt);
    const apiElapsed = now - apiLivenessAt;
    if (elapsed <= params.thresholdMs || apiElapsed <= params.thresholdMs) {
      return null;
    }
    // Explicit null check: a previous report at timestamp 0 must still throttle.
    if (
      this.#stallDiagLoggedAt != null &&
      now - this.#stallDiagLoggedAt < params.thresholdMs / 2
    ) {
      return null;
    }
    this.#stallDiagLoggedAt = now;
    const elapsedLabel =
      this.#inFlightGetUpdates > 0
        ? `active getUpdates stuck for ${formatDurationPrecise(elapsed)}`
        : `no completed getUpdates for ${formatDurationPrecise(elapsed)}`;
    return {
      message: `Polling stall detected (${elapsedLabel}); forcing restart. [diag ${this.formatDiagnosticFields("error")}]`,
    };
  }

  /**
   * Formats the getUpdates bookkeeping as space-separated `key=value` fields.
   *
   * @param errorLabel Key under which the last getUpdates error is appended;
   *   the error is omitted when no label is given or no error is recorded.
   * @returns The diagnostic string; unset values are rendered as `n/a`.
   */
  formatDiagnosticFields(errorLabel?: "error" | "lastGetUpdatesError"): string {
    const error =
      this.#lastGetUpdatesError && errorLabel ? ` ${errorLabel}=${this.#lastGetUpdatesError}` : "";
    return `inFlight=${this.#inFlightGetUpdates} outcome=${this.#lastGetUpdatesOutcome} startedAt=${this.#lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${this.#lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${this.#lastGetUpdatesDurationMs ?? "n/a"} offset=${this.#lastGetUpdatesOffset ?? "n/a"}${error}`;
  }

  /** Returns the newest start time among the in-flight API calls, or null when there are none. */
  #resolveLatestInFlightApiStartedAt(): number | null {
    let newestStartedAt: number | null = null;
    for (const activeStartedAt of this.#inFlightApiStartedAt.values()) {
      newestStartedAt =
        newestStartedAt == null ? activeStartedAt : Math.max(newestStartedAt, activeStartedAt);
    }
    return newestStartedAt;
  }

  /** Reads the injected clock, falling back to Date.now(). */
  #now(): number {
    return this.options.now?.() ?? Date.now();
  }
}
/**
 * Extracts the `offset` field from a getUpdates payload.
 *
 * @param payload Arbitrary request payload.
 * @returns The offset when the payload is an object carrying a numeric
 *   `offset`; otherwise null.
 */
function resolveGetUpdatesOffset(payload: unknown): number | null {
  const carriesOffset =
    typeof payload === "object" && payload !== null && "offset" in payload;
  if (carriesOffset) {
    const candidate = (payload as { offset?: unknown }).offset;
    if (typeof candidate === "number") {
      return candidate;
    }
  }
  return null;
}

View File

@@ -1,6 +1,5 @@
import { type RunOptions, run } from "@grammyjs/runner";
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import {
computeBackoff,
formatDurationPrecise,
@@ -12,6 +11,8 @@ import { withTelegramApiErrorLogging } from "./api-logging.js";
import { createTelegramBot } from "./bot.js";
import { type TelegramTransport } from "./fetch.js";
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
import { TelegramPollingLivenessTracker } from "./polling-liveness.js";
import { createTelegramPollingStatusPublisher } from "./polling-status.js";
import { TelegramPollingTransportState } from "./polling-transport-state.js";
const TELEGRAM_POLL_RESTART_POLICY = {
@@ -69,6 +70,7 @@ export class TelegramPollingSession {
#activeRunner: ReturnType<typeof run> | undefined;
#activeFetchAbort: AbortController | undefined;
#transportState: TelegramPollingTransportState;
#status: ReturnType<typeof createTelegramPollingStatusPublisher>;
constructor(private readonly opts: TelegramPollingSessionOpts) {
this.#transportState = new TelegramPollingTransportState({
@@ -76,6 +78,7 @@ export class TelegramPollingSession {
initialTransport: opts.telegramTransport,
createTelegramTransport: opts.createTelegramTransport,
});
this.#status = createTelegramPollingStatusPublisher(opts.setStatus);
}
get activeRunner() {
@@ -95,12 +98,7 @@ export class TelegramPollingSession {
}
async runUntilAbort(): Promise<void> {
this.opts.setStatus?.({
mode: "polling",
connected: false,
lastConnectedAt: null,
lastEventAt: null,
});
this.#status.notePollingStart();
try {
while (!this.opts.abortSignal?.aborted) {
const bot = await this.#createPollingBot();
@@ -126,10 +124,7 @@ export class TelegramPollingSession {
// this, the undici keep-alive sockets survive beyond the session and
// leak to api.telegram.org; see openclaw#68128.
await this.#transportState.dispose();
this.opts.setStatus?.({
mode: "polling",
connected: false,
});
this.#status.notePollingStop();
}
}
@@ -224,84 +219,31 @@ export class TelegramPollingSession {
async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> {
await this.#confirmPersistedOffset(bot);
let lastGetUpdatesAt = Date.now();
let lastApiActivityAt = Date.now();
let nextInFlightApiCallId = 0;
let latestInFlightApiStartedAt: number | null = null;
const inFlightApiStartedAt = new Map<number, number>();
let lastGetUpdatesStartedAt: number | null = null;
let lastGetUpdatesFinishedAt: number | null = null;
let lastGetUpdatesDurationMs: number | null = null;
let lastGetUpdatesOutcome = "not-started";
let lastGetUpdatesError: string | null = null;
let lastGetUpdatesOffset: number | null = null;
let inFlightGetUpdates = 0;
let _stopSequenceLogged = false;
let stallDiagLoggedAt = 0;
const liveness = new TelegramPollingLivenessTracker({
onPollSuccess: (finishedAt) => this.#status.notePollSuccess(finishedAt),
});
bot.api.config.use(async (prev, method, payload, signal) => {
if (method !== "getUpdates") {
const startedAt = Date.now();
const callId = nextInFlightApiCallId;
nextInFlightApiCallId += 1;
inFlightApiStartedAt.set(callId, startedAt);
latestInFlightApiStartedAt =
latestInFlightApiStartedAt == null
? startedAt
: Math.max(latestInFlightApiStartedAt, startedAt);
const callId = liveness.noteApiCallStarted();
try {
const result = await prev(method, payload, signal);
lastApiActivityAt = Date.now();
liveness.noteApiCallSuccess();
return result;
} finally {
inFlightApiStartedAt.delete(callId);
if (latestInFlightApiStartedAt === startedAt) {
let newestStartedAt: number | null = null;
for (const activeStartedAt of inFlightApiStartedAt.values()) {
newestStartedAt =
newestStartedAt == null
? activeStartedAt
: Math.max(newestStartedAt, activeStartedAt);
}
latestInFlightApiStartedAt = newestStartedAt;
}
liveness.noteApiCallFinished(callId);
}
}
const startedAt = Date.now();
lastGetUpdatesAt = startedAt;
lastGetUpdatesStartedAt = startedAt;
lastGetUpdatesOffset =
payload && typeof payload === "object" && "offset" in payload
? ((payload as { offset?: number }).offset ?? null)
: null;
inFlightGetUpdates += 1;
lastGetUpdatesOutcome = "started";
lastGetUpdatesError = null;
liveness.noteGetUpdatesStarted(payload);
try {
const result = await prev(method, payload, signal);
const finishedAt = Date.now();
lastGetUpdatesFinishedAt = finishedAt;
lastGetUpdatesDurationMs = finishedAt - startedAt;
lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok";
lastApiActivityAt = finishedAt;
this.opts.setStatus?.({
...createConnectedChannelStatusPatch(finishedAt),
mode: "polling",
lastError: null,
});
liveness.noteGetUpdatesSuccess(result);
return result;
} catch (err) {
const finishedAt = Date.now();
lastGetUpdatesFinishedAt = finishedAt;
lastGetUpdatesDurationMs = finishedAt - startedAt;
lastGetUpdatesOutcome = "error";
lastGetUpdatesError = formatErrorMessage(err);
lastApiActivityAt = finishedAt;
liveness.noteGetUpdatesError(err);
throw err;
} finally {
inFlightGetUpdates = Math.max(0, inFlightGetUpdates - 1);
liveness.noteGetUpdatesFinished();
}
});
@@ -349,41 +291,14 @@ export class TelegramPollingSession {
return;
}
const now = Date.now();
const activeElapsed =
inFlightGetUpdates > 0 && lastGetUpdatesStartedAt != null
? now - lastGetUpdatesStartedAt
: 0;
const idleElapsed =
inFlightGetUpdates > 0 ? 0 : now - (lastGetUpdatesFinishedAt ?? lastGetUpdatesAt);
const elapsed = inFlightGetUpdates > 0 ? activeElapsed : idleElapsed;
const apiLivenessAt =
latestInFlightApiStartedAt == null
? lastApiActivityAt
: Math.max(lastApiActivityAt, latestInFlightApiStartedAt);
const apiElapsed = now - apiLivenessAt;
// Treat recent non-getUpdates success and recent non-getUpdates start as
// the same liveness signal. Slow delivery should suppress the watchdog,
// but only for the same bounded window as recent successful API traffic.
if (
elapsed > POLL_STALL_THRESHOLD_MS &&
apiElapsed > POLL_STALL_THRESHOLD_MS &&
runner.isRunning()
) {
if (stallDiagLoggedAt && now - stallDiagLoggedAt < POLL_STALL_THRESHOLD_MS / 2) {
return;
}
stallDiagLoggedAt = now;
const stall = liveness.detectStall({
thresholdMs: POLL_STALL_THRESHOLD_MS,
runnerIsRunning: runner.isRunning(),
});
if (stall) {
this.#transportState.markDirty();
stalledRestart = true;
const elapsedLabel =
inFlightGetUpdates > 0
? `active getUpdates stuck for ${formatDurationPrecise(elapsed)}`
: `no completed getUpdates for ${formatDurationPrecise(elapsed)}`;
this.opts.log(
`[telegram] Polling stall detected (${elapsedLabel}); forcing restart. [diag inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${lastGetUpdatesError}` : ""}]`,
);
this.opts.log(`[telegram] ${stall.message}`);
void stopRunner();
void stopBot();
if (!forceCycleTimer) {
@@ -413,7 +328,7 @@ export class TelegramPollingSession {
: "runner stopped (maxRetryTime exceeded or graceful stop)";
this.#forceRestarted = false;
this.opts.log(
`[telegram][diag] polling cycle finished reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"}${lastGetUpdatesError ? ` error=${String(lastGetUpdatesError)}` : ""}`,
`[telegram][diag] polling cycle finished reason=${reason} ${liveness.formatDiagnosticFields("error")}`,
);
const shouldRestart = await this.#waitBeforeRestart(
(delay) => `Telegram polling runner stopped (${reason}); restarting in ${delay}.`,
@@ -438,7 +353,7 @@ export class TelegramPollingSession {
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
this.opts.log(
`[telegram][diag] polling cycle error reason=${reason} inFlight=${inFlightGetUpdates} outcome=${lastGetUpdatesOutcome} startedAt=${lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${lastGetUpdatesDurationMs ?? "n/a"} offset=${lastGetUpdatesOffset ?? "n/a"} err=${errMsg}${lastGetUpdatesError ? ` lastGetUpdatesError=${String(lastGetUpdatesError)}` : ""}`,
`[telegram][diag] polling cycle error reason=${reason} ${liveness.formatDiagnosticFields("lastGetUpdatesError")} err=${errMsg}`,
);
const shouldRestart = await this.#waitBeforeRestart(
(delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,

View File

@@ -0,0 +1,31 @@
import { describe, expect, it, vi } from "vitest";
import { createTelegramPollingStatusPublisher } from "./polling-status.js";
// Unit test for the status publisher: each note* method should forward one
// patch to the sink, in call order.
describe("createTelegramPollingStatusPublisher", () => {
it("publishes start, successful poll, and stop status patches", () => {
const setStatus = vi.fn();
const status = createTelegramPollingStatusPublisher(setStatus);
status.notePollingStart();
status.notePollSuccess(1234);
status.notePollingStop();
// Start: disconnected, with the connection/event timestamps cleared.
expect(setStatus).toHaveBeenNthCalledWith(1, {
mode: "polling",
connected: false,
lastConnectedAt: null,
lastEventAt: null,
});
// Poll success: connected, both timestamps set to the supplied time, error cleared.
expect(setStatus).toHaveBeenNthCalledWith(2, {
mode: "polling",
connected: true,
lastConnectedAt: 1234,
lastEventAt: 1234,
lastError: null,
});
// Stop: disconnected; no timestamp fields are included in this patch.
expect(setStatus).toHaveBeenNthCalledWith(3, {
mode: "polling",
connected: false,
});
});
});

View File

@@ -0,0 +1,30 @@
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
/** Callback that receives a channel account status patch (every snapshot field except `accountId`). */
type TelegramPollingStatusSink = (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
/**
 * Builds a small publisher that turns polling lifecycle events into status
 * patches and forwards them to `setStatus`.
 *
 * @param setStatus Sink for the patches. When omitted, every method is a
 *   no-op and no patch is built.
 * @returns An object with `notePollingStart`, `notePollSuccess` and
 *   `notePollingStop`.
 */
export function createTelegramPollingStatusPublisher(setStatus?: TelegramPollingStatusSink) {
  return {
    /** Publishes the initial "polling, not connected" state with cleared timestamps. */
    notePollingStart() {
      if (setStatus == null) {
        return;
      }
      const startPatch: Parameters<TelegramPollingStatusSink>[0] = {
        mode: "polling",
        connected: false,
        lastConnectedAt: null,
        lastEventAt: null,
      };
      setStatus(startPatch);
    },
    /**
     * Publishes a connected state for a successful poll and clears the last error.
     *
     * @param at Timestamp of the successful poll; defaults to Date.now().
     */
    notePollSuccess(at = Date.now()) {
      if (setStatus == null) {
        return;
      }
      const connectedPatch = createConnectedChannelStatusPatch(at);
      setStatus({
        ...connectedPatch,
        mode: "polling",
        lastError: null,
      });
    },
    /** Publishes the "polling, not connected" state without touching timestamps. */
    notePollingStop() {
      if (setStatus == null) {
        return;
      }
      const stopPatch: Parameters<TelegramPollingStatusSink>[0] = {
        mode: "polling",
        connected: false,
      };
      setStatus(stopPatch);
    },
  };
}