From 56873b606578edb4473125b8b2eb6a2a81372be3 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 13 May 2026 11:35:23 +0530 Subject: [PATCH] fix(telegram): detect polling stalls from getUpdates --- .../telegram/src/polling-liveness.test.ts | 13 +- extensions/telegram/src/polling-liveness.ts | 48 +--- .../telegram/src/polling-session.test.ts | 246 ++++-------------- extensions/telegram/src/polling-session.ts | 10 +- 4 files changed, 58 insertions(+), 259 deletions(-) diff --git a/extensions/telegram/src/polling-liveness.test.ts b/extensions/telegram/src/polling-liveness.test.ts index b38b1bb886d..f4d83a68ec5 100644 --- a/extensions/telegram/src/polling-liveness.test.ts +++ b/extensions/telegram/src/polling-liveness.test.ts @@ -5,7 +5,7 @@ const POLL_STALL_THRESHOLD_MS = 90_000; describe("TelegramPollingLivenessTracker", () => { it("records successful getUpdates calls and publishes poll success time", () => { - const nowValues = [0, 0, 10, 25]; + const nowValues = [0, 10, 25]; const now = vi.fn(() => nowValues.shift() ?? 25); const onPollSuccess = vi.fn(); const tracker = new TelegramPollingLivenessTracker({ now, onPollSuccess }); @@ -20,21 +20,16 @@ describe("TelegramPollingLivenessTracker", () => { ); }); - it("does not detect a polling stall while a recent non-polling API call is in flight", () => { + it("detects stale polling without considering unrelated API activity", () => { let now = 0; const tracker = new TelegramPollingLivenessTracker({ now: () => now }); - now = 60_000; - const callId = tracker.noteApiCallStarted(); - now = 120_001; expect( tracker.detectStall({ thresholdMs: POLL_STALL_THRESHOLD_MS, - }), - ).toBeNull(); - - tracker.noteApiCallFinished(callId); + })?.message, + ).toContain("Polling stall detected"); }); it("detects and throttles stale polling diagnostics", () => { diff --git a/extensions/telegram/src/polling-liveness.ts b/extensions/telegram/src/polling-liveness.ts index b57237de020..a1729bf2dd0 100644 --- a/extensions/telegram/src/polling-liveness.ts +++ b/extensions/telegram/src/polling-liveness.ts @@ -12,10 +12,6 @@ type TelegramPollingStall = { export class TelegramPollingLivenessTracker { #lastGetUpdatesAt: number; - #lastApiActivityAt: number; - #nextInFlightApiCallId = 0; - #latestInFlightApiStartedAt: number | null = null; - #inFlightApiStartedAt = new Map(); #lastGetUpdatesStartedAt: number | null = null; #lastGetUpdatesFinishedAt: number | null = null; #lastGetUpdatesDurationMs: number | null = null; @@ -27,37 +23,12 @@ export class TelegramPollingLivenessTracker { constructor(private readonly options: TelegramPollingLivenessTrackerOptions = {}) { this.#lastGetUpdatesAt = this.#now(); - this.#lastApiActivityAt = this.#now(); } get inFlightGetUpdates() { return this.#inFlightGetUpdates; } - noteApiCallStarted(): number { - const startedAt = this.#now(); - const callId = this.#nextInFlightApiCallId; - this.#nextInFlightApiCallId += 1; - this.#inFlightApiStartedAt.set(callId, startedAt); - this.#latestInFlightApiStartedAt = - this.#latestInFlightApiStartedAt == null - ? startedAt - : Math.max(this.#latestInFlightApiStartedAt, startedAt); - return callId; - } - - noteApiCallSuccess(at = this.#now()) { - this.#lastApiActivityAt = at; - } - - noteApiCallFinished(callId: number) { - const startedAt = this.#inFlightApiStartedAt.get(callId); - this.#inFlightApiStartedAt.delete(callId); - if (startedAt != null && this.#latestInFlightApiStartedAt === startedAt) { - this.#latestInFlightApiStartedAt = this.#resolveLatestInFlightApiStartedAt(); - } - } - noteGetUpdatesStarted(payload: unknown, at = this.#now()) { this.#lastGetUpdatesAt = at; this.#lastGetUpdatesStartedAt = at; @@ -72,7 +43,6 @@ export class TelegramPollingLivenessTracker { this.#lastGetUpdatesDurationMs = this.#lastGetUpdatesStartedAt == null ? null : at - this.#lastGetUpdatesStartedAt; this.#lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok"; - this.#lastApiActivityAt = at; this.options.onPollSuccess?.(at); } @@ -82,7 +52,6 @@ export class TelegramPollingLivenessTracker { this.#lastGetUpdatesStartedAt == null ? null : at - this.#lastGetUpdatesStartedAt; this.#lastGetUpdatesOutcome = "error"; this.#lastGetUpdatesError = formatErrorMessage(err); - this.#lastApiActivityAt = at; } noteGetUpdatesFinished() { @@ -100,13 +69,7 @@ export class TelegramPollingLivenessTracker { ? 0 : now - (this.#lastGetUpdatesFinishedAt ?? this.#lastGetUpdatesAt); const elapsed = this.#inFlightGetUpdates > 0 ? activeElapsed : idleElapsed; - const apiLivenessAt = - this.#latestInFlightApiStartedAt == null - ? this.#lastApiActivityAt - : Math.max(this.#lastApiActivityAt, this.#latestInFlightApiStartedAt); - const apiElapsed = now - apiLivenessAt; - - if (elapsed <= params.thresholdMs || apiElapsed <= params.thresholdMs) { + if (elapsed <= params.thresholdMs) { return null; } if (this.#stallDiagLoggedAt && now - this.#stallDiagLoggedAt < params.thresholdMs / 2) { @@ -129,15 +92,6 @@ export class TelegramPollingLivenessTracker { return `inFlight=${this.#inFlightGetUpdates} outcome=${this.#lastGetUpdatesOutcome} startedAt=${this.#lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${this.#lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${this.#lastGetUpdatesDurationMs ?? "n/a"} offset=${this.#lastGetUpdatesOffset ?? "n/a"}${error}`; } - #resolveLatestInFlightApiStartedAt(): number | null { - let newestStartedAt: number | null = null; - for (const activeStartedAt of this.#inFlightApiStartedAt.values()) { - newestStartedAt = - newestStartedAt == null ? activeStartedAt : Math.max(newestStartedAt, activeStartedAt); - } - return newestStartedAt; - } - #now(): number { return this.options.now?.() ?? Date.now(); } diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index d9d3262d0bd..6fec56a3d3e 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -97,10 +97,7 @@ function makeBot() { }; } -function installPollingStallWatchdogHarness( - dateNowSequence: readonly number[] = [0, 0], - fallbackDateNow = 150_001, -) { +function installPollingStallWatchdogHarness(dateNowSequence: readonly number[] = [0, 0]) { let watchdog: (() => void) | undefined; const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => { watchdog = fn as () => void; @@ -116,7 +113,7 @@ function installPollingStallWatchdogHarness( for (const value of dateNowSequence) { dateNowSpy.mockImplementationOnce(() => value); } - dateNowSpy.mockImplementation(() => fallbackDateNow); + dateNowSpy.mockImplementation(() => 0); return { async waitForWatchdog() { @@ -129,6 +126,10 @@ function installPollingStallWatchdogHarness( expect(watchdog).toBeTypeOf("function"); return watchdog; }, + setNow(now: number) { + dateNowSpy.mockReset(); + dateNowSpy.mockImplementation(() => now); + }, restore() { setIntervalSpy.mockRestore(); clearIntervalSpy.mockRestore(); @@ -405,7 +406,7 @@ describe("TelegramPollingSession", () => { }; }); - const watchdogHarness = installPollingStallWatchdogHarness(); + const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 0, 0, 0]); const log = vi.fn(); const session = new TelegramPollingSession({ @@ -425,6 +426,7 @@ describe("TelegramPollingSession", () => { try { const runPromise = session.runUntilAbort(); const watchdog = await watchdogHarness.waitForWatchdog(); + watchdogHarness.setNow(150_001); watchdog?.(); await runPromise; @@ -481,6 +483,7 @@ describe("TelegramPollingSession", () => { try { const runPromise = session.runUntilAbort(); const watchdog = await watchdogHarness.waitForWatchdog(); + watchdogHarness.setNow(150_001); watchdog?.(); await runPromise; @@ -498,7 +501,7 @@ describe("TelegramPollingSession", () => { const runnerStop = vi.fn(async () => undefined); mockBotCapturingApiMiddleware(botStop); const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); - const watchdogHarness = installPollingStallWatchdogHarness([0, 0], 150_001); + const watchdogHarness = installPollingStallWatchdogHarness([0, 0]); const log = vi.fn(); const session = createPollingSession({ @@ -588,6 +591,7 @@ describe("TelegramPollingSession", () => { const runPromise = session.runUntilAbort(); const watchdog = await watchdogHarness.waitForWatchdog(); + watchdogHarness.setNow(150_001); watchdog?.(); await runPromise; @@ -651,7 +655,7 @@ describe("TelegramPollingSession", () => { const getApiMiddleware = mockBotCapturingApiMiddleware(botStop); const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); - const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 30_000], 119_999); + const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 30_000]); const log = vi.fn(); const session = createPollingSession({ @@ -804,17 +808,13 @@ describe("TelegramPollingSession", () => { }); }); - it("does not trigger stall restart when non-getUpdates API calls are active", async () => { + it("triggers stall restart even after a non-getUpdates API call succeeds", async () => { const abort = new AbortController(); const botStop = vi.fn(async () => undefined); const runnerStop = vi.fn(async () => undefined); const getApiMiddleware = mockBotCapturingApiMiddleware(botStop); const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); - // t=0: lastGetUpdatesAt and lastApiActivityAt initialized - // t=150_001: watchdog fires (getUpdates stale for 150s) - // But right before watchdog, a sendMessage succeeds at t=150_001 - // All subsequent Date.now calls return the same value, giving apiIdle = 0. const watchdogHarness = installPollingStallWatchdogHarness(); const log = vi.fn(); @@ -827,196 +827,27 @@ describe("TelegramPollingSession", () => { const runPromise = session.runUntilAbort(); const watchdog = await watchdogHarness.waitForWatchdog(); - // Simulate a sendMessage call through the middleware before watchdog fires. - // This updates lastApiActivityAt, proving the network is alive. const apiMiddleware = getApiMiddleware(); if (apiMiddleware) { + watchdogHarness.setNow(0); + await apiMiddleware( + vi.fn(async () => []), + "getUpdates", + { offset: 1 }, + ); + + watchdogHarness.setNow(150_001); const fakePrev = vi.fn(async () => ({ ok: true })); await apiMiddleware(fakePrev, "sendMessage", { chat_id: 123, text: "hello" }); } - // Now fire the watchdog — getUpdates is stale (120s) but API was just active + watchdogHarness.setNow(150_001); watchdog?.(); + await Promise.resolve(); - // The watchdog should NOT have triggered a restart - expect(runnerStop).not.toHaveBeenCalled(); - expect(botStop).not.toHaveBeenCalled(); - expectLogExcludes(log, "Polling stall detected"); - - // Clean up: abort to end the session - abort.abort(); - resolveFirstTask(); - await runPromise; - } finally { - watchdogHarness.restore(); - } - }); - - it("does not trigger stall restart while a recent non-getUpdates API call is in-flight", async () => { - const abort = new AbortController(); - const botStop = vi.fn(async () => undefined); - const runnerStop = vi.fn(async () => undefined); - const getApiMiddleware = mockBotCapturingApiMiddleware(botStop); - const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); - - const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 60_000]); - - const log = vi.fn(); - const session = createPollingSession({ - abortSignal: abort.signal, - log, - }); - - try { - const runPromise = session.runUntilAbort(); - - const watchdog = await watchdogHarness.waitForWatchdog(); - - // Start an in-flight sendMessage that has NOT yet resolved. - // This simulates a slow delivery where the API call is still pending. - let resolveSendMessage: ((v: unknown) => void) | undefined; - const apiMiddleware = getApiMiddleware(); - if (apiMiddleware) { - const slowPrev = vi.fn( - () => - new Promise((resolve) => { - resolveSendMessage = resolve; - }), - ); - // Fire-and-forget: the call is in-flight but not awaited yet - const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" }); - - // Fire the watchdog while sendMessage is still in-flight. - // The in-flight call started 60s ago, so API liveness is still recent. - watchdog?.(); - - // The watchdog should NOT have triggered a restart - expect(runnerStop).not.toHaveBeenCalled(); - expect(botStop).not.toHaveBeenCalled(); - expectLogExcludes(log, "Polling stall detected"); - - // Resolve the in-flight call to clean up - resolveSendMessage?.({ ok: true }); - await sendPromise; - } - - abort.abort(); - resolveFirstTask(); - await runPromise; - } finally { - watchdogHarness.restore(); - } - }); - - it("triggers stall restart when a non-getUpdates API call has been in-flight past the threshold", async () => { - const abort = new AbortController(); - const botStop = vi.fn(async () => undefined); - const runnerStop = vi.fn(async () => undefined); - const getApiMiddleware = mockBotCapturingApiMiddleware(botStop); - const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); - - const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1]); - - const log = vi.fn(); - const session = createPollingSession({ - abortSignal: abort.signal, - log, - }); - - try { - const runPromise = session.runUntilAbort(); - - const watchdog = await watchdogHarness.waitForWatchdog(); - - let resolveSendMessage: ((v: unknown) => void) | undefined; - const apiMiddleware = getApiMiddleware(); - if (apiMiddleware) { - const slowPrev = vi.fn( - () => - new Promise((resolve) => { - resolveSendMessage = resolve; - }), - ); - const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" }); - - // The in-flight send started at t=1 and is still stuck at t=150_001. - // That is older than the watchdog threshold, so restart should proceed. - watchdog?.(); - - expect(runnerStop).toHaveBeenCalledTimes(1); - expect(botStop).toHaveBeenCalledTimes(1); - expectLogIncludes(log, "Polling stall detected"); - - resolveSendMessage?.({ ok: true }); - await sendPromise; - } - - abort.abort(); - resolveFirstTask(); - await runPromise; - } finally { - watchdogHarness.restore(); - } - }); - - it("does not trigger stall restart when a newer non-getUpdates API call starts while an older one is still in-flight", async () => { - const abort = new AbortController(); - const botStop = vi.fn(async () => undefined); - const runnerStop = vi.fn(async () => undefined); - const getApiMiddleware = mockBotCapturingApiMiddleware(botStop); - const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); - - const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 120_000]); - - const log = vi.fn(); - const session = createPollingSession({ - abortSignal: abort.signal, - log, - }); - - try { - const runPromise = session.runUntilAbort(); - - const watchdog = await watchdogHarness.waitForWatchdog(); - - let resolveFirstSend: ((v: unknown) => void) | undefined; - let resolveSecondSend: ((v: unknown) => void) | undefined; - const apiMiddleware = getApiMiddleware(); - if (apiMiddleware) { - const firstSendPromise = apiMiddleware( - vi.fn( - () => - new Promise((resolve) => { - resolveFirstSend = resolve; - }), - ), - "sendMessage", - { chat_id: 123, text: "older" }, - ); - const secondSendPromise = apiMiddleware( - vi.fn( - () => - new Promise((resolve) => { - resolveSecondSend = resolve; - }), - ), - "sendMessage", - { chat_id: 123, text: "newer" }, - ); - - // The older send is stale, but the newer send started just now. - // Watchdog liveness must follow the newest active non-getUpdates call. - watchdog?.(); - - expect(runnerStop).not.toHaveBeenCalled(); - expect(botStop).not.toHaveBeenCalled(); - expectLogExcludes(log, "Polling stall detected"); - - resolveFirstSend?.({ ok: true }); - resolveSecondSend?.({ ok: true }); - await firstSendPromise; - await secondSendPromise; - } + expect(runnerStop).toHaveBeenCalledTimes(1); + expect(botStop).toHaveBeenCalledTimes(1); + expectLogIncludes(log, "Polling stall detected"); abort.abort(); resolveFirstTask(); @@ -1089,6 +920,31 @@ describe("TelegramPollingSession", () => { expectLogIncludes(log, "Another OpenClaw gateway, script, or Telegram poller"); }); + it("logs polling cycle start after a transport rebuild", async () => { + const abort = new AbortController(); + const log = vi.fn(); + const recoverableError = new Error("recoverable polling error"); + const transport1 = makeTelegramTransport(); + const transport2 = makeTelegramTransport(); + const createTelegramTransport = vi + .fn<() => ReturnType>() + .mockReturnValueOnce(transport2); + createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot()); + mockRestartAfterPollingError(recoverableError, abort); + + const session = createPollingSession({ + abortSignal: abort.signal, + log, + telegramTransport: transport1, + createTelegramTransport, + }); + + await session.runUntilAbort(); + + expectLogIncludes(log, "rebuilding transport for next polling cycle"); + expectLogIncludes(log, "polling cycle started"); + }); + it("closes the transport once when runUntilAbort exits normally", async () => { const abort = new AbortController(); const transport = makeTelegramTransport(); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index f99c302be56..1977d73d231 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -239,14 +239,7 @@ export class TelegramPollingSession { }); bot.api.config.use(async (prev, method, payload, signal) => { if (method !== "getUpdates") { - const callId = liveness.noteApiCallStarted(); - try { - const result = await prev(method, payload, signal); - liveness.noteApiCallSuccess(); - return result; - } finally { - liveness.noteApiCallFinished(callId); - } + return await prev(method, payload, signal); } liveness.noteGetUpdatesStarted(payload); @@ -263,6 +256,7 @@ export class TelegramPollingSession { }); const runner = run(bot, this.opts.runnerOptions); + this.opts.log(`[telegram][diag] polling cycle started ${liveness.formatDiagnosticFields()}`); this.#activeRunner = runner; const fetchAbortController = this.#activeFetchAbort; const abortFetch = () => {