import { beforeEach, describe, expect, it, vi } from "vitest"; const runMock = vi.hoisted(() => vi.fn()); const createTelegramBotMock = vi.hoisted(() => vi.fn()); const isRecoverableTelegramNetworkErrorMock = vi.hoisted(() => vi.fn(() => true)); const computeBackoffMock = vi.hoisted(() => vi.fn(() => 0)); const sleepWithAbortMock = vi.hoisted(() => vi.fn(async () => undefined)); vi.mock("@grammyjs/runner", () => ({ run: runMock, })); vi.mock("./bot.js", () => ({ createTelegramBot: createTelegramBotMock, })); vi.mock("./network-errors.js", () => ({ isRecoverableTelegramNetworkError: isRecoverableTelegramNetworkErrorMock, })); vi.mock("./api-logging.js", () => ({ withTelegramApiErrorLogging: async ({ fn }: { fn: () => Promise }) => await fn(), })); vi.mock("openclaw/plugin-sdk/runtime-env", async (importOriginal) => { const actual = await importOriginal(); return { ...actual, computeBackoff: computeBackoffMock, sleepWithAbort: sleepWithAbortMock, }; }); let TelegramPollingSession: typeof import("./polling-session.js").TelegramPollingSession; function makeBot() { return { api: { deleteWebhook: vi.fn(async () => true), getUpdates: vi.fn(async () => []), config: { use: vi.fn() }, }, stop: vi.fn(async () => undefined), }; } function installPollingStallWatchdogHarness() { let watchdog: (() => void) | undefined; const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => { watchdog = fn as () => void; return 1 as unknown as ReturnType; }); const clearIntervalSpy = vi.spyOn(globalThis, "clearInterval").mockImplementation(() => {}); const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout").mockImplementation((fn) => { void Promise.resolve().then(() => (fn as () => void)()); return 1 as unknown as ReturnType; }); const clearTimeoutSpy = vi.spyOn(globalThis, "clearTimeout").mockImplementation(() => {}); const dateNowSpy = vi .spyOn(Date, "now") .mockImplementationOnce(() => 0) .mockImplementation(() => 120_001); return { async waitForWatchdog() { for (let attempt = 0; attempt < 20 && !watchdog; attempt += 1) { await Promise.resolve(); } expect(watchdog).toBeTypeOf("function"); return watchdog; }, restore() { setIntervalSpy.mockRestore(); clearIntervalSpy.mockRestore(); setTimeoutSpy.mockRestore(); clearTimeoutSpy.mockRestore(); dateNowSpy.mockRestore(); }, }; } function expectTelegramBotTransportSequence(firstTransport: unknown, secondTransport: unknown) { expect(createTelegramBotMock).toHaveBeenCalledTimes(2); expect(createTelegramBotMock.mock.calls[0]?.[0]?.telegramTransport).toBe(firstTransport); expect(createTelegramBotMock.mock.calls[1]?.[0]?.telegramTransport).toBe(secondTransport); } function makeTelegramTransport() { return { fetch: globalThis.fetch, sourceFetch: globalThis.fetch }; } function mockRestartAfterPollingError(error: unknown, abort: AbortController) { let firstCycle = true; runMock.mockImplementation(() => { if (firstCycle) { firstCycle = false; return { task: async () => { throw error; }, stop: vi.fn(async () => undefined), isRunning: () => false, }; } return { task: async () => { abort.abort(); }, stop: vi.fn(async () => undefined), isRunning: () => false, }; }); } function createPollingSessionWithTransportRestart(params: { abortSignal: AbortSignal; telegramTransport: ReturnType; createTelegramTransport: () => ReturnType; }) { return new TelegramPollingSession({ token: "tok", config: {}, accountId: "default", runtime: undefined, proxyFetch: undefined, abortSignal: params.abortSignal, runnerOptions: {}, getLastUpdateId: () => null, persistUpdateId: async () => undefined, log: () => undefined, telegramTransport: params.telegramTransport, createTelegramTransport: params.createTelegramTransport, }); } describe("TelegramPollingSession", () => { beforeEach(async () => { vi.resetModules(); runMock.mockReset(); createTelegramBotMock.mockReset(); isRecoverableTelegramNetworkErrorMock.mockReset().mockReturnValue(true); computeBackoffMock.mockReset().mockReturnValue(0); sleepWithAbortMock.mockReset().mockResolvedValue(undefined); ({ TelegramPollingSession } = await import("./polling-session.js")); }); it("uses backoff helpers for recoverable polling retries", async () => { const abort = new AbortController(); const recoverableError = new Error("recoverable polling error"); const botStop = vi.fn(async () => undefined); const runnerStop = vi.fn(async () => undefined); const bot = { api: { deleteWebhook: vi.fn(async () => true), getUpdates: vi.fn(async () => []), config: { use: vi.fn() }, }, stop: botStop, }; createTelegramBotMock.mockReturnValue(bot); let firstCycle = true; runMock.mockImplementation(() => { if (firstCycle) { firstCycle = false; return { task: async () => { throw recoverableError; }, stop: runnerStop, isRunning: () => false, }; } return { task: async () => { abort.abort(); }, stop: runnerStop, isRunning: () => false, }; }); const session = new TelegramPollingSession({ token: "tok", config: {}, accountId: "default", runtime: undefined, proxyFetch: undefined, abortSignal: abort.signal, runnerOptions: {}, getLastUpdateId: () => null, persistUpdateId: async () => undefined, log: () => undefined, telegramTransport: undefined, }); await session.runUntilAbort(); expect(runMock).toHaveBeenCalledTimes(2); expect(computeBackoffMock).toHaveBeenCalledTimes(1); expect(sleepWithAbortMock).toHaveBeenCalledTimes(1); }); it("forces a restart when polling stalls without getUpdates activity", async () => { const abort = new AbortController(); const botStop = vi.fn(async () => undefined); const firstRunnerStop = vi.fn(async () => undefined); const secondRunnerStop = vi.fn(async () => undefined); const bot = { api: { deleteWebhook: vi.fn(async () => true), getUpdates: vi.fn(async () => []), config: { use: vi.fn() }, }, stop: botStop, }; createTelegramBotMock.mockReturnValue(bot); let firstTaskResolve: (() => void) | undefined; const firstTask = new Promise((resolve) => { firstTaskResolve = resolve; }); let cycle = 0; runMock.mockImplementation(() => { cycle += 1; if (cycle === 1) { return { task: () => firstTask, stop: async () => { await firstRunnerStop(); firstTaskResolve?.(); }, isRunning: () => true, }; } return { task: async () => { abort.abort(); }, stop: secondRunnerStop, isRunning: () => false, }; }); const watchdogHarness = installPollingStallWatchdogHarness(); const log = vi.fn(); const session = new TelegramPollingSession({ token: "tok", config: {}, accountId: "default", runtime: undefined, proxyFetch: undefined, abortSignal: abort.signal, runnerOptions: {}, getLastUpdateId: () => null, persistUpdateId: async () => undefined, log, telegramTransport: undefined, }); try { const runPromise = session.runUntilAbort(); const watchdog = await watchdogHarness.waitForWatchdog(); watchdog?.(); await runPromise; expect(runMock).toHaveBeenCalledTimes(2); expect(firstRunnerStop).toHaveBeenCalledTimes(1); expect(botStop).toHaveBeenCalled(); expect(log).toHaveBeenCalledWith(expect.stringContaining("Polling stall detected")); expect(log).toHaveBeenCalledWith(expect.stringContaining("polling stall detected")); } finally { watchdogHarness.restore(); } }); it("rebuilds the transport after a stalled polling cycle", async () => { vi.useFakeTimers({ shouldAdvanceTime: true }); const abort = new AbortController(); const firstBot = makeBot(); const secondBot = makeBot(); createTelegramBotMock.mockReturnValueOnce(firstBot).mockReturnValueOnce(secondBot); let firstTaskResolve: (() => void) | undefined; const firstTask = new Promise((resolve) => { firstTaskResolve = resolve; }); let cycle = 0; runMock.mockImplementation(() => { cycle += 1; if (cycle === 1) { return { task: () => firstTask, stop: async () => { firstTaskResolve?.(); }, isRunning: () => true, }; } return { task: async () => { abort.abort(); }, stop: vi.fn(async () => undefined), isRunning: () => false, }; }); const watchdogHarness = installPollingStallWatchdogHarness(); const transport1 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch }; const transport2 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch }; const createTelegramTransport = vi.fn(() => transport2); try { const session = new TelegramPollingSession({ token: "tok", config: {}, accountId: "default", runtime: undefined, proxyFetch: undefined, abortSignal: abort.signal, runnerOptions: {}, getLastUpdateId: () => null, persistUpdateId: async () => undefined, log: () => undefined, telegramTransport: transport1, createTelegramTransport, }); const runPromise = session.runUntilAbort(); const watchdog = await watchdogHarness.waitForWatchdog(); watchdog?.(); await runPromise; expectTelegramBotTransportSequence(transport1, transport2); expect(createTelegramTransport).toHaveBeenCalledTimes(1); } finally { watchdogHarness.restore(); vi.useRealTimers(); } }); it("rebuilds the transport after a recoverable polling error", async () => { const abort = new AbortController(); const recoverableError = new Error("recoverable polling error"); const transport1 = makeTelegramTransport(); const transport2 = makeTelegramTransport(); const createTelegramTransport = vi.fn(() => transport2); createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot()); mockRestartAfterPollingError(recoverableError, abort); const session = createPollingSessionWithTransportRestart({ abortSignal: abort.signal, telegramTransport: transport1, createTelegramTransport, }); await session.runUntilAbort(); expectTelegramBotTransportSequence(transport1, transport2); expect(createTelegramTransport).toHaveBeenCalledTimes(1); }); it("reuses the transport after a getUpdates conflict", async () => { const abort = new AbortController(); const conflictError = Object.assign( new Error("Conflict: terminated by other getUpdates request"), { error_code: 409, method: "getUpdates", }, ); const transport1 = makeTelegramTransport(); const createTelegramTransport = vi.fn(() => makeTelegramTransport()); createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot()); isRecoverableTelegramNetworkErrorMock.mockReturnValue(false); mockRestartAfterPollingError(conflictError, abort); const session = createPollingSessionWithTransportRestart({ abortSignal: abort.signal, telegramTransport: transport1, createTelegramTransport, }); await session.runUntilAbort(); expectTelegramBotTransportSequence(transport1, transport1); expect(createTelegramTransport).not.toHaveBeenCalled(); }); });