From fd244fd76de3a62ff08c26c8fd9aab2ae250b3a4 Mon Sep 17 00:00:00 2001 From: Josh Avant <830519+joshavant@users.noreply.github.com> Date: Thu, 14 May 2026 03:35:06 -0500 Subject: [PATCH] Fix Telegram polling ingress under event-loop stalls (#81746) * fix telegram polling ingress under event-loop stalls * add changelog for telegram ingress fix --- CHANGELOG.md | 1 + extensions/telegram/src/monitor.test.ts | 126 ++++++++----- extensions/telegram/src/monitor.ts | 7 + extensions/telegram/src/monitor.types.ts | 3 + .../telegram/src/polling-session.test.ts | 71 +++++++ extensions/telegram/src/polling-session.ts | 164 +++++++++++++++- .../src/telegram-ingress-spool.test.ts | 44 +++++ .../telegram/src/telegram-ingress-spool.ts | 133 +++++++++++++ .../src/telegram-ingress-worker.runtime.ts | 175 ++++++++++++++++++ .../telegram/src/telegram-ingress-worker.ts | 93 ++++++++++ .../telegram-ingress-worker.runtime.ts | 1 + ...tic-stuck-session-recovery.runtime.test.ts | 20 ++ ...agnostic-stuck-session-recovery.runtime.ts | 4 +- 13 files changed, 791 insertions(+), 51 deletions(-) create mode 100644 extensions/telegram/src/telegram-ingress-spool.test.ts create mode 100644 extensions/telegram/src/telegram-ingress-spool.ts create mode 100644 extensions/telegram/src/telegram-ingress-worker.runtime.ts create mode 100644 extensions/telegram/src/telegram-ingress-worker.ts create mode 100644 extensions/telegram/telegram-ingress-worker.runtime.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 825972525f6..0f618fe9314 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai - Memory/daily-files: widen the daily-memory file matcher used by Dreaming, rem-backfill, rem-harness, the doctor sweep, and short-term promotion so `memory/YYYY-MM-DD-.md` files written by the bundled session-memory hook (and any future slugged variants) are discovered alongside the date-only `memory/YYYY-MM-DD.md` shape. Date extraction still uses the leading `YYYY-MM-DD` capture group, so per-day ingestion/promotion semantics are unchanged for existing date-only files; slugged files now flow through the same paths instead of being silently skipped. Fixes #69536. Thanks @jack-stormentswe. - Security/sandbox: include Windows `USERPROFILE` in the sandbox blocked home roots so credential-bearing binds (such as `.codex`, `.openclaw`, or `.ssh` under the Windows user profile) are denied even when `HOME` points at a different shell home. (#63074) Thanks @luoyanglang. - Gateway/OpenAI-compatible HTTP: parse shared JSON endpoint paths without trusting malformed Host headers, avoiding 500s before `/v1/chat/completions`, `/v1/responses`, and `/v1/embeddings` request handling. +- Telegram: keep Bot API polling alive during main event-loop stalls by moving ingress to an isolated worker with a durable local spool. Fixes #81132. (#81746) Thanks @joshavant. - Voice-call webhooks: parse webhook and realtime upgrade paths without trusting malformed Host headers, avoiding 500s before provider signature checks or path rejection. - Media store: reject malformed redirect `Location` headers as media-download failures instead of letting URL parsing escape the async response callback. - ClickClack: skip malformed realtime websocket frames instead of stopping the channel monitor on a single bad JSON event. diff --git a/extensions/telegram/src/monitor.test.ts b/extensions/telegram/src/monitor.test.ts index 8cec88d1b65..e58418fb590 100644 --- a/extensions/telegram/src/monitor.test.ts +++ b/extensions/telegram/src/monitor.test.ts @@ -115,6 +115,11 @@ type RunnerStub = { isRunning: () => boolean; }; +const withLegacyPolling = (opts: MonitorTelegramOpts): MonitorTelegramOpts => ({ + ...opts, + isolatedIngress: { enabled: false, ...opts.isolatedIngress }, +}); + const makeRunnerStub = (overrides: Partial = {}): RunnerStub => ({ task: overrides.task ?? (() => Promise.resolve()), stop: overrides.stop ?? vi.fn<() => void | Promise>(), @@ -197,7 +202,7 @@ async function expectOffsetConfirmationSkipped(offset: number | null) { api.deleteWebhook.mockResolvedValueOnce(true); mockRunOnceAndAbort(abort); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expect(api.getUpdates).not.toHaveBeenCalled(); } @@ -225,7 +230,7 @@ async function runMonitorAndCaptureStartupOrder(params?: { persistedOffset?: num return makeAbortRunner(abort); }); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); return { order }; } @@ -305,6 +310,7 @@ async function monitorWithAutoAbort(opts: Omit { ) .mockImplementationOnce(() => makeAbortRunner(abort)); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expectRecoverableRetryState(2); }); @@ -516,7 +522,7 @@ describe("monitorTelegramProvider (grammY)", () => { api.deleteWebhook.mockRejectedValueOnce(cleanupError); mockRunOnceAndAbort(abort); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expect(api.deleteWebhook).toHaveBeenCalledTimes(1); expect(api.getWebhookInfo).not.toHaveBeenCalled(); @@ -531,7 +537,7 @@ describe("monitorTelegramProvider (grammY)", () => { api.deleteWebhook.mockRejectedValueOnce(cleanupError); mockRunOnceAndAbort(abort); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expect(api.deleteWebhook).toHaveBeenCalledTimes(1); expect(api.getWebhookInfo).not.toHaveBeenCalled(); @@ -545,7 +551,7 @@ describe("monitorTelegramProvider (grammY)", () => { createTelegramBotErrors.push(setupError); mockRunOnceAndAbort(abort); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expectRecoverableRetryState(1); }); @@ -571,7 +577,7 @@ describe("monitorTelegramProvider (grammY)", () => { return makeAbortRunner(abort); }); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expect(firstStop).toHaveBeenCalled(); expectRecoverableRetryState(2); @@ -581,7 +587,7 @@ describe("monitorTelegramProvider (grammY)", () => { const abort = new AbortController(); mockRunOnceAndAbort(abort); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expect(createdBotStops.length).toBe(1); expect(createdBotStops[0]).toHaveBeenCalledTimes(1); @@ -591,10 +597,12 @@ describe("monitorTelegramProvider (grammY)", () => { const abort = new AbortController(); const firstCycle = mockRunOnceWithStalledPollingRunner(); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); - await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow( + await expect(monitorTelegramProvider(withLegacyPolling({ token: "tok" }))).rejects.toThrow( "refusing duplicate poller", ); expect(runSpy).toHaveBeenCalledTimes(1); @@ -609,15 +617,19 @@ describe("monitorTelegramProvider (grammY)", () => { const firstCycle = mockRunOnceWithStalledPollingRunner(); const secondCycle = mockRunOnceWithStalledPollingRunner(); - const firstMonitor = monitorTelegramProvider({ - token: "tok-a", - abortSignal: firstAbort.signal, - }); + const firstMonitor = monitorTelegramProvider( + withLegacyPolling({ + token: "tok-a", + abortSignal: firstAbort.signal, + }), + ); await firstCycle.waitForRunStart(); - const secondMonitor = monitorTelegramProvider({ - token: "tok-b", - abortSignal: secondAbort.signal, - }); + const secondMonitor = monitorTelegramProvider( + withLegacyPolling({ + token: "tok-b", + abortSignal: secondAbort.signal, + }), + ); await secondCycle.waitForRunStart(); expect(runSpy).toHaveBeenCalledTimes(2); @@ -632,18 +644,22 @@ describe("monitorTelegramProvider (grammY)", () => { const secondAbort = new AbortController(); const firstCycle = mockRunOnceWithStalledPollingRunner(); - const firstMonitor = monitorTelegramProvider({ - token: "tok", - abortSignal: firstAbort.signal, - }); + const firstMonitor = monitorTelegramProvider( + withLegacyPolling({ + token: "tok", + abortSignal: firstAbort.signal, + }), + ); await firstCycle.waitForRunStart(); firstAbort.abort(); const secondCycle = mockRunOnceAndAbort(secondAbort); - const secondMonitor = monitorTelegramProvider({ - token: "tok", - abortSignal: secondAbort.signal, - }); + const secondMonitor = monitorTelegramProvider( + withLegacyPolling({ + token: "tok", + abortSignal: secondAbort.signal, + }), + ); await secondCycle.waitForRunStart(); await Promise.all([firstMonitor, secondMonitor]); @@ -656,7 +672,7 @@ describe("monitorTelegramProvider (grammY)", () => { const abort = new AbortController(); mockRunOnceAndAbort(abort); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); expect(vi.getTimerCount()).toBe(0); } finally { @@ -671,7 +687,9 @@ describe("monitorTelegramProvider (grammY)", () => { }), ); - await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow("bad token"); + await expect(monitorTelegramProvider(withLegacyPolling({ token: "tok" }))).rejects.toThrow( + "bad token", + ); }); it("force-restarts polling when unhandled network rejection stalls runner", async () => { @@ -679,7 +697,9 @@ describe("monitorTelegramProvider (grammY)", () => { const firstCycle = mockRunOnceWithStalledPollingRunner(); const secondCycle = mockRunOnceWithStalledPollingRunner(); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); expect(emitUnhandledRejection(await makeTaggedPollingFetchError())).toBe(true); @@ -697,7 +717,9 @@ describe("monitorTelegramProvider (grammY)", () => { const firstCycle = mockRunOnceWithStalledPollingRunner(); const secondCycle = mockRunOnceWithStalledPollingRunner(); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); expect(emitUncaughtException(await makeTaggedPollingFetchError())).toBe(true); @@ -713,7 +735,9 @@ describe("monitorTelegramProvider (grammY)", () => { const firstCycle = mockRunOnceWithStalledPollingRunner(); const secondCycle = mockRunOnceWithStalledPollingRunner(); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); expect(emitUncaughtException(await makeTaggedPollingHttpError())).toBe(true); @@ -745,7 +769,9 @@ describe("monitorTelegramProvider (grammY)", () => { const firstCycle = mockRunOnceWithStalledPollingRunner(); const secondCycle = mockRunOnceAndAbort(abort); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); vi.advanceTimersByTime(150_000); @@ -779,7 +805,9 @@ describe("monitorTelegramProvider (grammY)", () => { .mockReturnValueOnce(rebuiltTransport); const secondCycle = mockRunOnceAndAbort(abort); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); expect(emitUnhandledRejection(await makeTaggedPollingFetchError())).toBe(true); @@ -798,7 +826,9 @@ describe("monitorTelegramProvider (grammY)", () => { const { stop, waitForTaskStart } = mockRunOnceWithStalledPollingRunner(); const secondCycle = mockRunOnceAndAbort(abort); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await waitForTaskStart(); const firstSignal = createTelegramBotCalls[0]?.fetchAbortSignal; expect(firstSignal).toBeInstanceOf(AbortSignal); @@ -817,7 +847,9 @@ describe("monitorTelegramProvider (grammY)", () => { const firstCycle = mockRunOnceWithStalledPollingRunner(); const { stop } = firstCycle; - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); const slackDnsError = Object.assign( @@ -891,7 +923,9 @@ describe("monitorTelegramProvider (grammY)", () => { const { stop } = firstCycle; const secondCycle = mockRunOnceAndAbort(abort); - const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ token: "tok", abortSignal: abort.signal }), + ); await firstCycle.waitForRunStart(); // Advance time past the stall threshold (120s) + watchdog interval (30s) @@ -910,14 +944,16 @@ describe("monitorTelegramProvider (grammY)", () => { const firstCycle = mockRunOnceWithStalledPollingRunner(); const secondCycle = mockRunOnceAndAbort(abort); - const monitor = monitorTelegramProvider({ - token: "tok", - abortSignal: abort.signal, - config: { - agents: { defaults: { maxConcurrent: 2 } }, - channels: { telegram: { pollingStallThresholdMs: 30_000 } }, - }, - }); + const monitor = monitorTelegramProvider( + withLegacyPolling({ + token: "tok", + abortSignal: abort.signal, + config: { + agents: { defaults: { maxConcurrent: 2 } }, + channels: { telegram: { pollingStallThresholdMs: 30_000 } }, + }, + }), + ); await firstCycle.waitForRunStart(); vi.advanceTimersByTime(60_000); @@ -996,7 +1032,7 @@ describe("monitorTelegramProvider (grammY)", () => { return makeAbortRunner(abort); }); - await monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); + await monitorTelegramProvider(withLegacyPolling({ token: "tok", abortSignal: abort.signal })); // deleteWebhook should be called twice: once on initial cleanup, once after 409 reset expect(api.deleteWebhook).toHaveBeenCalledTimes(2); diff --git a/extensions/telegram/src/monitor.ts b/extensions/telegram/src/monitor.ts index 2dfd3355199..9aa60db491a 100644 --- a/extensions/telegram/src/monitor.ts +++ b/extensions/telegram/src/monitor.ts @@ -291,6 +291,13 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { createTelegramTransport: createTelegramTransportForPolling, stallThresholdMs: account.config.pollingStallThresholdMs, setStatus: opts.setStatus, + isolatedIngress: { + enabled: opts.isolatedIngress?.enabled ?? true, + apiRoot: account.config.apiRoot, + timeoutSeconds: account.config.timeoutSeconds, + proxy: account.config.proxy, + network: account.config.network, + }, }); await pollingSession.runUntilAbort(); } finally { diff --git a/extensions/telegram/src/monitor.types.ts b/extensions/telegram/src/monitor.types.ts index d205deac5a4..ab3f277824f 100644 --- a/extensions/telegram/src/monitor.types.ts +++ b/extensions/telegram/src/monitor.types.ts @@ -23,6 +23,9 @@ export type MonitorTelegramOpts = { webhookCertPath?: string; botInfo?: TelegramBotInfo; setStatus?: (patch: Omit) => void; + isolatedIngress?: { + enabled?: boolean; + }; }; export type TelegramMonitorFn = (opts?: MonitorTelegramOpts) => Promise; diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 6fec56a3d3e..081f1772384 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -1,3 +1,6 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; @@ -30,6 +33,7 @@ vi.mock("openclaw/plugin-sdk/runtime-env", () => ({ })); let TelegramPollingSession: typeof import("./polling-session.js").TelegramPollingSession; +let writeTelegramSpooledUpdate: typeof import("./telegram-ingress-spool.js").writeTelegramSpooledUpdate; type TelegramApiMiddleware = ( prev: (...args: unknown[]) => Promise, @@ -192,6 +196,7 @@ function createPollingSession(params: { createTelegramTransport?: () => ReturnType; stallThresholdMs?: number; setStatus?: (patch: Omit) => void; + isolatedIngress?: ConstructorParameters[0]["isolatedIngress"]; }) { return new TelegramPollingSession({ token: "tok", @@ -207,6 +212,7 @@ function createPollingSession(params: { telegramTransport: params.telegramTransport, stallThresholdMs: params.stallThresholdMs, setStatus: params.setStatus, + isolatedIngress: params.isolatedIngress, ...(params.createTelegramTransport ? { createTelegramTransport: params.createTelegramTransport } : {}), @@ -262,6 +268,7 @@ async function waitForApiMiddleware( describe("TelegramPollingSession", () => { beforeAll(async () => { ({ TelegramPollingSession } = await import("./polling-session.js")); + ({ writeTelegramSpooledUpdate } = await import("./telegram-ingress-spool.js")); }); beforeEach(() => { @@ -365,6 +372,70 @@ describe("TelegramPollingSession", () => { expect(bot.api.getUpdates).not.toHaveBeenCalled(); }); + it("drains isolated ingress spool through the main-thread bot without offset watermark skipping", async () => { + const abort = new AbortController(); + const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + const handleUpdate = vi.fn(async () => undefined); + const bot = { + api: { + deleteWebhook: vi.fn(async () => true), + config: { use: vi.fn() }, + }, + handleUpdate, + stop: vi.fn(async () => undefined), + }; + createTelegramBotMock.mockReturnValueOnce(bot); + await writeTelegramSpooledUpdate({ + spoolDir: tempDir, + update: { update_id: 42, message: { text: "hello" } }, + }); + let stopWorker: (() => void) | undefined; + const workerDone = new Promise((resolve) => { + stopWorker = resolve; + }); + const createWorker = vi.fn(() => ({ + onMessage: vi.fn(() => () => undefined), + stop: vi.fn(async () => { + stopWorker?.(); + }), + task: vi.fn(async () => { + await workerDone; + }), + })); + + try { + const session = createPollingSession({ + abortSignal: abort.signal, + isolatedIngress: { + enabled: true, + spoolDir: tempDir, + createWorker, + drainIntervalMs: 10, + }, + }); + + const runPromise = session.runUntilAbort(); + await vi.waitFor(() => expect(handleUpdate).toHaveBeenCalledTimes(1)); + abort.abort(); + await runPromise; + + expect(createWorker).toHaveBeenCalledWith( + expect.objectContaining({ + initialUpdateId: null, + spoolDir: tempDir, + token: "tok", + }), + ); + expect( + mockObjectArg(createTelegramBotMock, "createTelegramBot").updateOffset, + ).toBeUndefined(); + expect(handleUpdate).toHaveBeenCalledWith({ update_id: 42, message: { text: "hello" } }); + await expect(fs.readdir(tempDir)).resolves.toEqual([]); + } finally { + await fs.rm(tempDir, { recursive: true, force: true }); + } + }); + it("forces a restart when polling stalls without getUpdates activity", async () => { const abort = new AbortController(); const botStop = vi.fn(async () => undefined); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index 1977d73d231..639320850b8 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -1,5 +1,6 @@ import { type RunOptions, run } from "@grammyjs/runner"; import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; +import type { TelegramNetworkConfig } from "openclaw/plugin-sdk/config-contracts"; import { computeBackoff, formatDurationPrecise, @@ -15,6 +16,15 @@ import { TelegramPollingLivenessTracker } from "./polling-liveness.js"; import { createTelegramPollingStatusPublisher } from "./polling-status.js"; import { TelegramPollingTransportState } from "./polling-transport-state.js"; import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js"; +import { + deleteTelegramSpooledUpdate, + listTelegramSpooledUpdates, + resolveTelegramIngressSpoolDir, +} from "./telegram-ingress-spool.js"; +import { + createTelegramIngressWorker, + type TelegramIngressWorkerFactory, +} from "./telegram-ingress-worker.js"; const TELEGRAM_POLL_RESTART_POLICY = { initialMs: 2000, @@ -80,6 +90,16 @@ type TelegramPollingSessionOpts = { /** Stall detection threshold in ms. Defaults to 120_000 (2 min). */ stallThresholdMs?: number; setStatus?: (patch: Omit) => void; + isolatedIngress?: { + enabled: boolean; + apiRoot?: string; + timeoutSeconds?: number; + proxy?: string; + network?: TelegramNetworkConfig; + spoolDir?: string; + createWorker?: TelegramIngressWorkerFactory; + drainIntervalMs?: number; + }; }; export class TelegramPollingSession { @@ -135,7 +155,9 @@ export class TelegramPollingSession { return; } - const state = await this.#runPollingCycle(bot); + const state = this.opts.isolatedIngress?.enabled + ? await this.#runIsolatedIngressCycle(bot) + : await this.#runPollingCycle(bot); if (state === "exit") { return; } @@ -181,6 +203,12 @@ export class TelegramPollingSession { const fetchAbortController = new AbortController(); this.#activeFetchAbort = fetchAbortController; const telegramTransport = this.#transportState.acquireForNextCycle(); + const updateOffset = this.opts.isolatedIngress?.enabled + ? undefined + : { + lastUpdateId: this.opts.getLastUpdateId(), + onUpdateId: this.opts.persistUpdateId, + }; try { return createTelegramBot({ token: this.opts.token, @@ -191,10 +219,7 @@ export class TelegramPollingSession { botInfo: this.opts.botInfo, fetchAbortSignal: fetchAbortController.signal, minimumClientTimeoutSeconds: TELEGRAM_POLLING_CLIENT_TIMEOUT_FLOOR_SECONDS, - updateOffset: { - lastUpdateId: this.opts.getLastUpdateId(), - onUpdateId: this.opts.persistUpdateId, - }, + ...(updateOffset ? { updateOffset } : {}), telegramTransport, }); } catch (err) { @@ -233,6 +258,135 @@ export class TelegramPollingSession { } } + async #drainSpooledUpdates(params: { bot: TelegramBot; spoolDir: string }): Promise { + let handled = 0; + const updates = await listTelegramSpooledUpdates({ spoolDir: params.spoolDir, limit: 100 }); + for (const update of updates) { + if (this.opts.abortSignal?.aborted) { + break; + } + try { + await params.bot.handleUpdate( + update.update as Parameters[0], + ); + await deleteTelegramSpooledUpdate(update); + handled += 1; + } catch (err) { + this.opts.log( + `[telegram][diag] spooled update ${update.updateId} handler failed; keeping for retry: ${formatErrorMessage(err)}`, + ); + break; + } + } + return handled; + } + + async #runIsolatedIngressCycle(bot: TelegramBot): Promise<"continue" | "exit"> { + const ingress = this.opts.isolatedIngress; + if (!ingress?.enabled) { + return this.#runPollingCycle(bot); + } + const spoolDir = + ingress.spoolDir ?? resolveTelegramIngressSpoolDir({ accountId: this.opts.accountId }); + const workerFactory = ingress.createWorker ?? createTelegramIngressWorker; + const worker = workerFactory({ + token: this.opts.token, + accountId: this.opts.accountId, + initialUpdateId: this.opts.getLastUpdateId(), + spoolDir, + apiRoot: ingress.apiRoot, + timeoutSeconds: ingress.timeoutSeconds, + network: ingress.network, + proxy: ingress.proxy, + }); + this.opts.log(`[telegram][diag] isolated polling ingress started spool=${spoolDir}`); + const pollState: { + startedAt: number | null; + offset: number | null; + outcome: string; + error?: string; + } = { + startedAt: null, + offset: null, + outcome: "not-started", + }; + let consecutiveDrainFailures = 0; + const unsubscribe = worker.onMessage((message) => { + if (message.type === "poll-start") { + pollState.startedAt = message.startedAt; + pollState.offset = message.offset; + pollState.outcome = "started"; + delete pollState.error; + return; + } + if (message.type === "poll-success") { + this.#status.notePollSuccess(message.finishedAt); + pollState.outcome = `ok:${message.count}`; + return; + } + if (message.type === "poll-error") { + pollState.outcome = "error"; + pollState.error = message.message; + } + }); + const stopOnAbort = () => { + void worker.stop(); + }; + this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true }); + const drainIntervalMs = Math.max(100, Math.floor(ingress.drainIntervalMs ?? 500)); + let drainActive = false; + const drainOnce = async () => { + if (drainActive || this.opts.abortSignal?.aborted) { + return; + } + drainActive = true; + try { + await this.#drainSpooledUpdates({ bot, spoolDir }); + consecutiveDrainFailures = 0; + } catch (err) { + consecutiveDrainFailures += 1; + this.opts.log( + `[telegram][diag] isolated polling spool drain failed (${consecutiveDrainFailures}): ${formatErrorMessage(err)}`, + ); + } finally { + drainActive = false; + } + }; + await drainOnce(); + const drainTimer = setInterval(() => { + void drainOnce(); + }, drainIntervalMs); + drainTimer.unref?.(); + const stopBot = () => { + return Promise.resolve(bot.stop()) + .then(() => undefined) + .catch(() => { + // Bot may already be stopped by shutdown paths. + }); + }; + try { + await worker.task(); + if (this.opts.abortSignal?.aborted) { + return "exit"; + } + const errorText = pollState.error ? ` error=${pollState.error}` : ""; + this.opts.log( + `[telegram][diag] isolated polling ingress stopped outcome=${pollState.outcome} startedAt=${pollState.startedAt ?? "n/a"} offset=${pollState.offset ?? "n/a"}${errorText}`, + ); + const shouldRestart = await this.#waitBeforeRestart( + (delay) => `Telegram isolated polling ingress stopped; restarting in ${delay}.`, + ); + return shouldRestart ? "continue" : "exit"; + } finally { + clearInterval(drainTimer); + unsubscribe(); + this.opts.abortSignal?.removeEventListener("abort", stopOnAbort); + await worker.stop(); + await drainOnce(); + await waitForGracefulStop(stopBot); + } + } + async #runPollingCycle(bot: TelegramBot): Promise<"continue" | "exit"> { const liveness = new TelegramPollingLivenessTracker({ onPollSuccess: (finishedAt) => this.#status.notePollSuccess(finishedAt), diff --git a/extensions/telegram/src/telegram-ingress-spool.test.ts b/extensions/telegram/src/telegram-ingress-spool.test.ts new file mode 100644 index 00000000000..78ea3069536 --- /dev/null +++ b/extensions/telegram/src/telegram-ingress-spool.test.ts @@ -0,0 +1,44 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import { + deleteTelegramSpooledUpdate, + listTelegramSpooledUpdates, + writeTelegramSpooledUpdate, +} from "./telegram-ingress-spool.js"; + +describe("Telegram ingress spool", () => { + it("persists updates durably in update_id order and deletes handled entries", async () => { + const spoolDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-telegram-spool-")); + try { + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 11, message: { text: "second" } }, + now: 2, + }); + await writeTelegramSpooledUpdate({ + spoolDir, + update: { update_id: 10, message: { text: "first" } }, + now: 1, + }); + + const updates = await listTelegramSpooledUpdates({ spoolDir }); + + expect(updates.map((update) => update.updateId)).toEqual([10, 11]); + expect(updates.map((update) => update.receivedAt)).toEqual([1, 2]); + expect(updates[0]?.update).toEqual({ update_id: 10, message: { text: "first" } }); + + if (!updates[0]) { + throw new Error("Expected a spooled update"); + } + await deleteTelegramSpooledUpdate(updates[0]); + + expect( + (await listTelegramSpooledUpdates({ spoolDir })).map((update) => update.updateId), + ).toEqual([11]); + } finally { + await fs.rm(spoolDir, { recursive: true, force: true }); + } + }); +}); diff --git a/extensions/telegram/src/telegram-ingress-spool.ts b/extensions/telegram/src/telegram-ingress-spool.ts new file mode 100644 index 00000000000..5fb623d08bd --- /dev/null +++ b/extensions/telegram/src/telegram-ingress-spool.ts @@ -0,0 +1,133 @@ +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store"; +import { resolveStateDir } from "openclaw/plugin-sdk/state-paths"; + +const SPOOL_VERSION = 1; + +type TelegramSpooledUpdatePayload = { + version: number; + updateId: number; + receivedAt: number; + update: unknown; +}; + +export type TelegramSpooledUpdate = { + updateId: number; + path: string; + update: unknown; + receivedAt: number; +}; + +function normalizeAccountId(accountId?: string) { + const trimmed = accountId?.trim(); + if (!trimmed) { + return "default"; + } + return trimmed.replace(/[^a-z0-9._-]+/gi, "_"); +} + +function isValidUpdateId(value: unknown): value is number { + return typeof value === "number" && Number.isSafeInteger(value) && value >= 0; +} + +export function resolveTelegramIngressSpoolDir(params: { + accountId?: string; + env?: NodeJS.ProcessEnv; +}): string { + const stateDir = resolveStateDir(params.env, os.homedir); + return path.join(stateDir, "telegram", `ingress-spool-${normalizeAccountId(params.accountId)}`); +} + +export function resolveTelegramUpdateId(update: unknown): number | null { + if (!update || typeof update !== "object") { + return null; + } + const value = (update as { update_id?: unknown }).update_id; + return isValidUpdateId(value) ? value : null; +} + +function spoolFileName(updateId: number): string { + return `${String(updateId).padStart(16, "0")}.json`; +} + +function parseSpooledUpdate(value: unknown, filePath: string): TelegramSpooledUpdate | null { + if (!value || typeof value !== "object") { + return null; + } + const payload = value as Partial; + if (payload.version !== SPOOL_VERSION || !isValidUpdateId(payload.updateId)) { + return null; + } + return { + updateId: payload.updateId, + path: filePath, + update: payload.update, + receivedAt: typeof payload.receivedAt === "number" ? payload.receivedAt : 0, + }; +} + +export async function writeTelegramSpooledUpdate(params: { + spoolDir: string; + update: unknown; + now?: number; +}): Promise { + const updateId = resolveTelegramUpdateId(params.update); + if (updateId === null) { + throw new Error("Telegram update missing numeric update_id."); + } + await fs.mkdir(params.spoolDir, { recursive: true }); + const targetPath = path.join(params.spoolDir, spoolFileName(updateId)); + const tempPath = path.join(params.spoolDir, `${spoolFileName(updateId)}.${randomUUID()}.tmp`); + const payload: TelegramSpooledUpdatePayload = { + version: SPOOL_VERSION, + updateId, + receivedAt: params.now ?? Date.now(), + update: params.update, + }; + await fs.writeFile(tempPath, `${JSON.stringify(payload)}\n`, { mode: 0o600 }); + await fs.rename(tempPath, targetPath); + return updateId; +} + +export async function listTelegramSpooledUpdates(params: { + spoolDir: string; + limit?: number; +}): Promise { + let entries: string[]; + try { + entries = await fs.readdir(params.spoolDir); + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") { + return []; + } + throw err; + } + const files = entries + .filter((entry) => entry.endsWith(".json")) + .toSorted() + .slice(0, Math.max(1, params.limit ?? 100)); + const updates: TelegramSpooledUpdate[] = []; + for (const file of files) { + const filePath = path.join(params.spoolDir, file); + const { value } = await readJsonFileWithFallback(filePath, null); + const parsed = parseSpooledUpdate(value, filePath); + if (parsed) { + updates.push(parsed); + } + } + return updates; +} + +export async function deleteTelegramSpooledUpdate(update: TelegramSpooledUpdate): Promise { + try { + await fs.unlink(update.path); + } catch (err) { + if ((err as { code?: string }).code === "ENOENT") { + return; + } + throw err; + } +} diff --git a/extensions/telegram/src/telegram-ingress-worker.runtime.ts b/extensions/telegram/src/telegram-ingress-worker.runtime.ts new file mode 100644 index 00000000000..b71cd9ceabf --- /dev/null +++ b/extensions/telegram/src/telegram-ingress-worker.runtime.ts @@ -0,0 +1,175 @@ +import { parentPort, workerData } from "node:worker_threads"; +import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; +import { normalizeTelegramApiRoot } from "./api-root.js"; +import { resolveTelegramTransport } from "./fetch.js"; +import { isRecoverableTelegramNetworkError } from "./network-errors.js"; +import { makeProxyFetch } from "./proxy.js"; +import { TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS } from "./request-timeouts.js"; +import { writeTelegramSpooledUpdate } from "./telegram-ingress-spool.js"; +import type { + TelegramIngressWorkerMessage, + TelegramIngressWorkerOptions, +} from "./telegram-ingress-worker.js"; +import { writeTelegramUpdateOffset } from "./update-offset-store.js"; + +const options = workerData as TelegramIngressWorkerOptions; +const pollLimit = 100; +const retryInitialMs = 1000; +const retryMaxMs = 30_000; +let stopped = false; +let activeController: AbortController | undefined; + +function post(message: TelegramIngressWorkerMessage): void { + // oxlint-disable-next-line unicorn/require-post-message-target-origin -- Node worker_threads ports do not accept a targetOrigin argument. + parentPort?.postMessage(message); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function formatErrorMessage(err: unknown): string { + if (err instanceof Error) { + return err.message || err.name; + } + return String(err); +} + +function resolveBackoff(attempt: number): number { + return Math.min(retryMaxMs, retryInitialMs * 2 ** Math.max(0, attempt - 1)); +} + +parentPort?.on("message", (message: { type?: string }) => { + if (message?.type !== "stop") { + return; + } + stopped = true; + activeController?.abort(new Error("telegram ingress worker stopped")); +}); + +async function fetchJson(params: { + fetch: typeof fetch; + url: string; + body: unknown; +}): Promise { + const controller = new AbortController(); + activeController = controller; + const timeout = setTimeout(() => { + controller.abort(new Error("Telegram getUpdates timed out")); + }, TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS); + timeout.unref?.(); + try { + const response = await params.fetch(params.url, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(params.body), + signal: controller.signal, + }); + const json = (await response.json()) as { + ok?: unknown; + result?: unknown; + description?: unknown; + }; + if (!response.ok || json.ok !== true) { + throw new Error( + typeof json.description === "string" + ? json.description + : `Telegram getUpdates failed with HTTP ${response.status}`, + ); + } + return json.result; + } finally { + clearTimeout(timeout); + if (activeController === controller) { + activeController = undefined; + } + } +} + +async function main(): Promise { + const proxyFetch = options.proxy ? makeProxyFetch(options.proxy) : undefined; + const transport = resolveTelegramTransport(proxyFetch, { network: options.network }); + const fetchImpl = transport.fetch ?? globalThis.fetch; + const apiRoot = normalizeTelegramApiRoot(options.apiRoot ?? "https://api.telegram.org"); + const getUpdatesUrl = `${apiRoot}/bot${options.token}/getUpdates`; + const pollTimeoutSeconds = + typeof options.timeoutSeconds === "number" && Number.isFinite(options.timeoutSeconds) + ? Math.max(1, Math.floor(options.timeoutSeconds)) + : 30; + let lastUpdateId = options.initialUpdateId; + let failures = 0; + + try { + for (;;) { + if (stopped) { + break; + } + const offset = lastUpdateId === null ? null : lastUpdateId + 1; + const startedAt = Date.now(); + post({ type: "poll-start", offset, startedAt }); + try { + const result = await fetchJson({ + fetch: fetchImpl, + url: getUpdatesUrl, + body: { + timeout: pollTimeoutSeconds, + limit: pollLimit, + allowed_updates: resolveTelegramAllowedUpdates(), + ...(offset === null ? {} : { offset }), + }, + }); + if (!Array.isArray(result)) { + throw new Error("Telegram getUpdates returned a non-array result."); + } + for (const update of result) { + if (stopped) { + break; + } + const updateId = await writeTelegramSpooledUpdate({ + spoolDir: options.spoolDir, + update, + }); + if (lastUpdateId === null || updateId > lastUpdateId) { + lastUpdateId = updateId; + await writeTelegramUpdateOffset({ + accountId: options.accountId, + botToken: options.token, + updateId, + }); + } + post({ type: "spooled", updateId, queued: result.length }); + } + failures = 0; + post({ + type: "poll-success", + offset, + count: result.length, + finishedAt: Date.now(), + }); + } catch (err) { + if (stopped) { + break; + } + failures += 1; + post({ + type: "poll-error", + message: formatErrorMessage(err), + finishedAt: Date.now(), + }); + if (!isRecoverableTelegramNetworkError(err, { context: "polling" })) { + throw err; + } + await sleep(resolveBackoff(failures)); + } + } + } finally { + await transport.close(); + } +} + +main() + .then(() => undefined) + .catch((err) => { + post({ type: "poll-error", message: formatErrorMessage(err), finishedAt: Date.now() }); + process.exitCode = 1; + }); diff --git a/extensions/telegram/src/telegram-ingress-worker.ts b/extensions/telegram/src/telegram-ingress-worker.ts new file mode 100644 index 00000000000..abc095ef590 --- /dev/null +++ b/extensions/telegram/src/telegram-ingress-worker.ts @@ -0,0 +1,93 @@ +import { Worker } from "node:worker_threads"; +import type { TelegramNetworkConfig } from "openclaw/plugin-sdk/config-contracts"; + +export type TelegramIngressWorkerMessage = + | { + type: "poll-start"; + offset: number | null; + startedAt: number; + } + | { + type: "poll-success"; + offset: number | null; + count: number; + finishedAt: number; + } + | { + type: "poll-error"; + message: string; + finishedAt: number; + } + | { + type: "spooled"; + updateId: number; + queued: number; + }; + +export type TelegramIngressWorkerOptions = { + token: string; + accountId: string; + initialUpdateId: number | null; + spoolDir: string; + apiRoot?: string; + timeoutSeconds?: number; + network?: TelegramNetworkConfig; + proxy?: string; +}; + +export type TelegramIngressWorkerHandle = { + onMessage(listener: (message: TelegramIngressWorkerMessage) => void): () => void; + stop(): Promise; + task(): Promise; +}; + +export type TelegramIngressWorkerFactory = ( + options: TelegramIngressWorkerOptions, +) => TelegramIngressWorkerHandle; + +export const createTelegramIngressWorker: TelegramIngressWorkerFactory = (options) => { + const listeners = new Set<(message: TelegramIngressWorkerMessage) => void>(); + const worker = new Worker(new URL("./telegram-ingress-worker.runtime.js", import.meta.url), { + workerData: options, + }); + const taskPromise = new Promise((resolve, reject) => { + worker.once("error", reject); + worker.once("exit", (code) => { + if (code === 0) { + resolve(); + return; + } + reject(new Error(`Telegram ingress worker exited with code ${code}`)); + }); + }); + worker.on("message", (message: TelegramIngressWorkerMessage) => { + for (const listener of listeners) { + listener(message); + } + }); + + return { + onMessage(listener) { + listeners.add(listener); + return () => { + listeners.delete(listener); + }; + }, + async stop() { + // oxlint-disable-next-line unicorn/require-post-message-target-origin -- Node worker_threads workers do not accept a targetOrigin argument. + worker.postMessage({ type: "stop" }); + const timeout = setTimeout(() => { + void worker.terminate(); + }, 15_000); + timeout.unref?.(); + try { + await taskPromise.catch(() => undefined); + } finally { + clearTimeout(timeout); + } + }, + task() { + return taskPromise; + }, + }; +}; diff --git a/extensions/telegram/telegram-ingress-worker.runtime.ts b/extensions/telegram/telegram-ingress-worker.runtime.ts new file mode 100644 index 00000000000..7f52885d414 --- /dev/null +++ b/extensions/telegram/telegram-ingress-worker.runtime.ts @@ -0,0 +1 @@ +import "./src/telegram-ingress-worker.runtime.js"; diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts index fc6bd78cc24..fd4c2a4de07 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.test.ts @@ -309,6 +309,26 @@ describe("stuck session recovery", () => { ]); }); + it("clears stale queued processing state even when the lane has no active work", async () => { + mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined); + mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined); + mocks.isEmbeddedPiRunActive.mockReturnValue(false); + mocks.resetCommandLane.mockReturnValue(0); + + await recoverStuckDiagnosticSession({ + sessionId: "stale-session", + sessionKey: "agent:main:main", + ageMs: 180_000, + queueDepth: 2, + }); + + expect(mocks.resetCommandLane).toHaveBeenCalledWith("session:agent:main:main"); + expect(warnLogMessages()).toEqual([ + "stuck session recovery: sessionId=stale-session sessionKey=agent:main:main age=180s action=release_lane aborted=false drained=true released=0", + "stuck session recovery outcome: status=released action=release_lane sessionId=stale-session sessionKey=agent:main:main lane=session:agent:main:main released=0", + ]); + }); + it("releases a stale session-id lane when no session key is available", async () => { mocks.isEmbeddedPiRunHandleActive.mockReturnValue(false); mocks.resetCommandLane.mockReturnValue(1); diff --git a/src/logging/diagnostic-stuck-session-recovery.runtime.ts b/src/logging/diagnostic-stuck-session-recovery.runtime.ts index df554859768..0fbb94cb22c 100644 --- a/src/logging/diagnostic-stuck-session-recovery.runtime.ts +++ b/src/logging/diagnostic-stuck-session-recovery.runtime.ts @@ -165,7 +165,9 @@ export async function recoverStuckDiagnosticSession( const released = sessionLane && (!activeSessionId || !aborted || !drained) ? resetCommandLane(sessionLane) : 0; - if (aborted || released > 0) { + const clearStaleQueuedSession = !aborted && released === 0 && (params.queueDepth ?? 0) > 0; + + if (aborted || released > 0 || clearStaleQueuedSession) { const action = aborted ? "abort_embedded_run" : "release_lane"; const stoppedFields = formatStoppedCronSessionDiagnosticFields( resolveCronSessionDiagnosticContext({ sessionKey: params.sessionKey, activeSessionId }),