fix(telegram): detect polling stalls from getUpdates

This commit is contained in:
Ayaan Zaidi
2026-05-13 11:35:23 +05:30
parent 7899b70c18
commit 56873b6065
4 changed files with 58 additions and 259 deletions

View File

@@ -5,7 +5,7 @@ const POLL_STALL_THRESHOLD_MS = 90_000;
describe("TelegramPollingLivenessTracker", () => {
it("records successful getUpdates calls and publishes poll success time", () => {
const nowValues = [0, 0, 10, 25];
const nowValues = [0, 10, 25];
const now = vi.fn(() => nowValues.shift() ?? 25);
const onPollSuccess = vi.fn();
const tracker = new TelegramPollingLivenessTracker({ now, onPollSuccess });
@@ -20,21 +20,16 @@ describe("TelegramPollingLivenessTracker", () => {
);
});
it("does not detect a polling stall while a recent non-polling API call is in flight", () => {
it("detects stale polling without considering unrelated API activity", () => {
let now = 0;
const tracker = new TelegramPollingLivenessTracker({ now: () => now });
now = 60_000;
const callId = tracker.noteApiCallStarted();
now = 120_001;
expect(
tracker.detectStall({
thresholdMs: POLL_STALL_THRESHOLD_MS,
}),
).toBeNull();
tracker.noteApiCallFinished(callId);
})?.message,
).toContain("Polling stall detected");
});
it("detects and throttles stale polling diagnostics", () => {

View File

@@ -12,10 +12,6 @@ type TelegramPollingStall = {
export class TelegramPollingLivenessTracker {
#lastGetUpdatesAt: number;
#lastApiActivityAt: number;
#nextInFlightApiCallId = 0;
#latestInFlightApiStartedAt: number | null = null;
#inFlightApiStartedAt = new Map<number, number>();
#lastGetUpdatesStartedAt: number | null = null;
#lastGetUpdatesFinishedAt: number | null = null;
#lastGetUpdatesDurationMs: number | null = null;
@@ -27,37 +23,12 @@ export class TelegramPollingLivenessTracker {
/**
 * Creates a tracker and seeds both liveness timestamps from the tracker clock.
 *
 * @param options Optional tracker configuration; defaults to an empty object.
 */
constructor(private readonly options: TelegramPollingLivenessTrackerOptions = {}) {
// Each field takes its own clock reading, so an injected `now` is invoked twice here.
this.#lastGetUpdatesAt = this.#now();
this.#lastApiActivityAt = this.#now();
}
/** Read-only view of the private `#inFlightGetUpdates` value. */
get inFlightGetUpdates() {
return this.#inFlightGetUpdates;
}
/**
 * Registers the start of an API call and records when it began.
 *
 * @returns The id assigned to this call; pass it to `noteApiCallFinished`
 *   once the call settles.
 */
noteApiCallStarted(): number {
  const beganAt = this.#now();

  // Allocate the next sequential id for this call.
  const id = this.#nextInFlightApiCallId;
  this.#nextInFlightApiCallId = id + 1;
  this.#inFlightApiStartedAt.set(id, beganAt);

  // Keep the cached "newest in-flight start" in step with the map.
  const previousLatest = this.#latestInFlightApiStartedAt;
  this.#latestInFlightApiStartedAt =
    previousLatest == null ? beganAt : Math.max(previousLatest, beganAt);

  return id;
}
/**
 * Records the time of a successful API call as the latest API activity.
 *
 * @param at Timestamp to record; defaults to the tracker clock's current value.
 */
noteApiCallSuccess(at = this.#now()) {
this.#lastApiActivityAt = at;
}
/**
 * Removes a call from the in-flight set once it has settled.
 *
 * @param callId Id previously returned by `noteApiCallStarted`. Ids that are
 *   not currently tracked are ignored.
 */
noteApiCallFinished(callId: number) {
  const inFlight = this.#inFlightApiStartedAt;
  const finishedCallStartedAt = inFlight.get(callId);
  inFlight.delete(callId);

  if (finishedCallStartedAt == null) {
    return;
  }
  if (finishedCallStartedAt !== this.#latestInFlightApiStartedAt) {
    // Some other call holds the newest start time; the cached value is still valid.
    return;
  }
  // The finished call may have been the newest one, so recompute from what remains.
  this.#latestInFlightApiStartedAt = this.#resolveLatestInFlightApiStartedAt();
}
noteGetUpdatesStarted(payload: unknown, at = this.#now()) {
this.#lastGetUpdatesAt = at;
this.#lastGetUpdatesStartedAt = at;
@@ -72,7 +43,6 @@ export class TelegramPollingLivenessTracker {
this.#lastGetUpdatesDurationMs =
this.#lastGetUpdatesStartedAt == null ? null : at - this.#lastGetUpdatesStartedAt;
this.#lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok";
this.#lastApiActivityAt = at;
this.options.onPollSuccess?.(at);
}
@@ -82,7 +52,6 @@ export class TelegramPollingLivenessTracker {
this.#lastGetUpdatesStartedAt == null ? null : at - this.#lastGetUpdatesStartedAt;
this.#lastGetUpdatesOutcome = "error";
this.#lastGetUpdatesError = formatErrorMessage(err);
this.#lastApiActivityAt = at;
}
noteGetUpdatesFinished() {
@@ -100,13 +69,7 @@ export class TelegramPollingLivenessTracker {
? 0
: now - (this.#lastGetUpdatesFinishedAt ?? this.#lastGetUpdatesAt);
const elapsed = this.#inFlightGetUpdates > 0 ? activeElapsed : idleElapsed;
const apiLivenessAt =
this.#latestInFlightApiStartedAt == null
? this.#lastApiActivityAt
: Math.max(this.#lastApiActivityAt, this.#latestInFlightApiStartedAt);
const apiElapsed = now - apiLivenessAt;
if (elapsed <= params.thresholdMs || apiElapsed <= params.thresholdMs) {
if (elapsed <= params.thresholdMs) {
return null;
}
if (this.#stallDiagLoggedAt && now - this.#stallDiagLoggedAt < params.thresholdMs / 2) {
@@ -129,15 +92,6 @@ export class TelegramPollingLivenessTracker {
return `inFlight=${this.#inFlightGetUpdates} outcome=${this.#lastGetUpdatesOutcome} startedAt=${this.#lastGetUpdatesStartedAt ?? "n/a"} finishedAt=${this.#lastGetUpdatesFinishedAt ?? "n/a"} durationMs=${this.#lastGetUpdatesDurationMs ?? "n/a"} offset=${this.#lastGetUpdatesOffset ?? "n/a"}${error}`;
}
/**
 * Scans the in-flight call map for the most recent start time.
 *
 * @returns The largest recorded start timestamp, or `null` when no calls are
 *   in flight.
 */
#resolveLatestInFlightApiStartedAt(): number | null {
  const startTimes = Array.from(this.#inFlightApiStartedAt.values());
  return startTimes.reduce<number | null>(
    (latest, startedAt) => (latest == null ? startedAt : Math.max(latest, startedAt)),
    null,
  );
}
/**
 * Reads the current time from the injected `options.now` clock, falling back
 * to `Date.now()` when no clock is supplied or it yields a nullish value.
 */
#now(): number {
  const injectedNow = this.options.now?.();
  return injectedNow ?? Date.now();
}

View File

@@ -97,10 +97,7 @@ function makeBot() {
};
}
function installPollingStallWatchdogHarness(
dateNowSequence: readonly number[] = [0, 0],
fallbackDateNow = 150_001,
) {
function installPollingStallWatchdogHarness(dateNowSequence: readonly number[] = [0, 0]) {
let watchdog: (() => void) | undefined;
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
watchdog = fn as () => void;
@@ -116,7 +113,7 @@ function installPollingStallWatchdogHarness(
for (const value of dateNowSequence) {
dateNowSpy.mockImplementationOnce(() => value);
}
dateNowSpy.mockImplementation(() => fallbackDateNow);
dateNowSpy.mockImplementation(() => 0);
return {
async waitForWatchdog() {
@@ -129,6 +126,10 @@ function installPollingStallWatchdogHarness(
expect(watchdog).toBeTypeOf("function");
return watchdog;
},
// Pins the mocked Date.now to a fixed value for all subsequent calls.
setNow(now: number) {
// Reset first so the spy's earlier implementations (including any still-queued
// one-shot values from the harness setup) cannot shadow the new fixed value
// — NOTE(review): relies on vitest's mockReset semantics; confirm.
dateNowSpy.mockReset();
dateNowSpy.mockImplementation(() => now);
},
restore() {
setIntervalSpy.mockRestore();
clearIntervalSpy.mockRestore();
@@ -405,7 +406,7 @@ describe("TelegramPollingSession", () => {
};
});
const watchdogHarness = installPollingStallWatchdogHarness();
const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 0, 0, 0]);
const log = vi.fn();
const session = new TelegramPollingSession({
@@ -425,6 +426,7 @@ describe("TelegramPollingSession", () => {
try {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
watchdogHarness.setNow(150_001);
watchdog?.();
await runPromise;
@@ -481,6 +483,7 @@ describe("TelegramPollingSession", () => {
try {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
watchdogHarness.setNow(150_001);
watchdog?.();
await runPromise;
@@ -498,7 +501,7 @@ describe("TelegramPollingSession", () => {
const runnerStop = vi.fn(async () => undefined);
mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
const watchdogHarness = installPollingStallWatchdogHarness([0, 0], 150_001);
const watchdogHarness = installPollingStallWatchdogHarness([0, 0]);
const log = vi.fn();
const session = createPollingSession({
@@ -588,6 +591,7 @@ describe("TelegramPollingSession", () => {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
watchdogHarness.setNow(150_001);
watchdog?.();
await runPromise;
@@ -651,7 +655,7 @@ describe("TelegramPollingSession", () => {
const getApiMiddleware = mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 30_000], 119_999);
const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 30_000]);
const log = vi.fn();
const session = createPollingSession({
@@ -804,17 +808,13 @@ describe("TelegramPollingSession", () => {
});
});
it("does not trigger stall restart when non-getUpdates API calls are active", async () => {
it("triggers stall restart even after a non-getUpdates API call succeeds", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
const getApiMiddleware = mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
// t=0: lastGetUpdatesAt and lastApiActivityAt initialized
// t=150_001: watchdog fires (getUpdates stale for 150s)
// But right before watchdog, a sendMessage succeeds at t=150_001
// All subsequent Date.now calls return the same value, giving apiIdle = 0.
const watchdogHarness = installPollingStallWatchdogHarness();
const log = vi.fn();
@@ -827,196 +827,27 @@ describe("TelegramPollingSession", () => {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
// Simulate a sendMessage call through the middleware before watchdog fires.
// This updates lastApiActivityAt, proving the network is alive.
const apiMiddleware = getApiMiddleware();
if (apiMiddleware) {
watchdogHarness.setNow(0);
await apiMiddleware(
vi.fn(async () => []),
"getUpdates",
{ offset: 1 },
);
watchdogHarness.setNow(150_001);
const fakePrev = vi.fn(async () => ({ ok: true }));
await apiMiddleware(fakePrev, "sendMessage", { chat_id: 123, text: "hello" });
}
// Now fire the watchdog — getUpdates is stale (~150s) but API was just active
watchdogHarness.setNow(150_001);
watchdog?.();
await Promise.resolve();
// The watchdog should NOT have triggered a restart
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expectLogExcludes(log, "Polling stall detected");
// Clean up: abort to end the session
abort.abort();
resolveFirstTask();
await runPromise;
} finally {
watchdogHarness.restore();
}
});
// Scenario: a sendMessage call is started through the captured API middleware and
// left pending while the watchdog fires. The test expects no runner/bot stop and
// no "Polling stall detected" log, then resolves the pending call and aborts the
// session to clean up.
it("does not trigger stall restart while a recent non-getUpdates API call is in-flight", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
const getApiMiddleware = mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
// Date.now is seeded with 0, 0, 60_000 — presumably the third reading is the
// sendMessage start time (see the "started 60s ago" note below); confirm against the harness.
const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 60_000]);
const log = vi.fn();
const session = createPollingSession({
abortSignal: abort.signal,
log,
});
try {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
// Start an in-flight sendMessage that has NOT yet resolved.
// This simulates a slow delivery where the API call is still pending.
let resolveSendMessage: ((v: unknown) => void) | undefined;
const apiMiddleware = getApiMiddleware();
if (apiMiddleware) {
const slowPrev = vi.fn(
() =>
new Promise((resolve) => {
resolveSendMessage = resolve;
}),
);
// Fire-and-forget: the call is in-flight but not awaited yet
const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" });
// Fire the watchdog while sendMessage is still in-flight.
// The in-flight call started 60s ago, so API liveness is still recent.
watchdog?.();
// The watchdog should NOT have triggered a restart
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expectLogExcludes(log, "Polling stall detected");
// Resolve the in-flight call to clean up
resolveSendMessage?.({ ok: true });
await sendPromise;
}
abort.abort();
resolveFirstTask();
await runPromise;
} finally {
// Always undo the timer/Date.now spies, even if an assertion above throws.
watchdogHarness.restore();
}
});
// Scenario: a sendMessage call is started through the captured API middleware and
// never resolves before the watchdog fires. Because that call is itself older than
// the stall threshold, the test expects exactly one runner stop, one bot stop, and
// a "Polling stall detected" log entry.
it("triggers stall restart when a non-getUpdates API call has been in-flight past the threshold", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
const getApiMiddleware = mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
// Date.now is seeded with 0, 0, 1 — presumably the third reading is the
// sendMessage start time (see the "started at t=1" note below); confirm against the harness.
const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1]);
const log = vi.fn();
const session = createPollingSession({
abortSignal: abort.signal,
log,
});
try {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
let resolveSendMessage: ((v: unknown) => void) | undefined;
const apiMiddleware = getApiMiddleware();
if (apiMiddleware) {
// The wrapped transport call stays pending until resolveSendMessage is invoked.
const slowPrev = vi.fn(
() =>
new Promise((resolve) => {
resolveSendMessage = resolve;
}),
);
const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" });
// The in-flight send started at t=1 and is still stuck at t=150_001.
// That is older than the watchdog threshold, so restart should proceed.
watchdog?.();
expect(runnerStop).toHaveBeenCalledTimes(1);
expect(botStop).toHaveBeenCalledTimes(1);
expectLogIncludes(log, "Polling stall detected");
// Let the pending send settle so the test does not leave a dangling promise.
resolveSendMessage?.({ ok: true });
await sendPromise;
}
abort.abort();
resolveFirstTask();
await runPromise;
} finally {
// Always undo the timer/Date.now spies, even if an assertion above throws.
watchdogHarness.restore();
}
});
it("does not trigger stall restart when a newer non-getUpdates API call starts while an older one is still in-flight", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
const getApiMiddleware = mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 120_000]);
const log = vi.fn();
const session = createPollingSession({
abortSignal: abort.signal,
log,
});
try {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
let resolveFirstSend: ((v: unknown) => void) | undefined;
let resolveSecondSend: ((v: unknown) => void) | undefined;
const apiMiddleware = getApiMiddleware();
if (apiMiddleware) {
const firstSendPromise = apiMiddleware(
vi.fn(
() =>
new Promise((resolve) => {
resolveFirstSend = resolve;
}),
),
"sendMessage",
{ chat_id: 123, text: "older" },
);
const secondSendPromise = apiMiddleware(
vi.fn(
() =>
new Promise((resolve) => {
resolveSecondSend = resolve;
}),
),
"sendMessage",
{ chat_id: 123, text: "newer" },
);
// The older send is stale, but the newer send started just now.
// Watchdog liveness must follow the newest active non-getUpdates call.
watchdog?.();
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expectLogExcludes(log, "Polling stall detected");
resolveFirstSend?.({ ok: true });
resolveSecondSend?.({ ok: true });
await firstSendPromise;
await secondSendPromise;
}
expect(runnerStop).toHaveBeenCalledTimes(1);
expect(botStop).toHaveBeenCalledTimes(1);
expectLogIncludes(log, "Polling stall detected");
abort.abort();
resolveFirstTask();
@@ -1089,6 +920,31 @@ describe("TelegramPollingSession", () => {
expectLogIncludes(log, "Another OpenClaw gateway, script, or Telegram poller");
});
// Scenario: the session starts with transport1, hits a recoverable polling error,
// and runs until aborted. The test expects the log to contain both the
// transport-rebuild message and the "polling cycle started" diagnostic.
it("logs polling cycle start after a transport rebuild", async () => {
const abort = new AbortController();
const log = vi.fn();
const recoverableError = new Error("recoverable polling error");
const transport1 = makeTelegramTransport();
const transport2 = makeTelegramTransport();
// The factory hands out transport2 on its first call only — presumably the
// rebuild that follows the recoverable error; verify against the session code.
const createTelegramTransport = vi
.fn<() => ReturnType<typeof makeTelegramTransport>>()
.mockReturnValueOnce(transport2);
// Two bots are queued, one per bot creation — presumably one per polling cycle; TODO confirm.
createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot());
mockRestartAfterPollingError(recoverableError, abort);
const session = createPollingSession({
abortSignal: abort.signal,
log,
telegramTransport: transport1,
createTelegramTransport,
});
await session.runUntilAbort();
expectLogIncludes(log, "rebuilding transport for next polling cycle");
expectLogIncludes(log, "polling cycle started");
});
it("closes the transport once when runUntilAbort exits normally", async () => {
const abort = new AbortController();
const transport = makeTelegramTransport();

View File

@@ -239,14 +239,7 @@ export class TelegramPollingSession {
});
bot.api.config.use(async (prev, method, payload, signal) => {
if (method !== "getUpdates") {
const callId = liveness.noteApiCallStarted();
try {
const result = await prev(method, payload, signal);
liveness.noteApiCallSuccess();
return result;
} finally {
liveness.noteApiCallFinished(callId);
}
return await prev(method, payload, signal);
}
liveness.noteGetUpdatesStarted(payload);
@@ -263,6 +256,7 @@ export class TelegramPollingSession {
});
const runner = run(bot, this.opts.runnerOptions);
this.opts.log(`[telegram][diag] polling cycle started ${liveness.formatDiagnosticFields()}`);
this.#activeRunner = runner;
const fetchAbortController = this.#activeFetchAbort;
const abortFetch = () => {