From 60fea81cf19f7fbe485a9c6b4c4045532f8b0f88 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 20 Apr 2026 23:03:57 +0100 Subject: [PATCH] fix(telegram): harden polling transport liveness (#69476) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(telegram): release undici dispatchers via TelegramTransport.close() TelegramTransport now exposes an explicit close() that destroys every owned undici dispatcher (default Agent plus lazily-created IPv4 and IP-pinned fallback Agents) and the TCP sockets they hold. Dispatcher constructors are also given bounded keep-alive defaults (keepAliveTimeout, keepAliveMaxTimeout, connections, pipelining) as a defence-in-depth layer so the pool cannot grow unbounded even if a caller forgets to call close(). Without this, every transport that went through a fallback retry left its fallback Agents anchored forever in a closure; long-running polling sessions accumulated hundreds of ESTABLISHED keep-alive sockets to api.telegram.org, saturating the per-IP quota on upstream forward proxies and making the currently-active outbound node time out while every other node still tested healthy. Mock dispatchers in fetch.test.ts gain destroy() spies so the close() chain is assertable. Call sites that built caller-owned transports from globalThis.fetch (delivery.resolve-media, test helpers) return an async no-op close(), matching the new required surface. Co-Authored-By: Claude Opus 4.7 (1M context) * fix(telegram): dispose polling transport on shutdown and dirty rebuild Every recoverable network error and stall-watchdog trip sets TelegramPollingTransportState.#transportDirty so the next polling cycle rebuilds the transport inside acquireForNextCycle(). 
Previously the rebuild simply overwrote the field, leaving the old transport's keep-alive sockets anchored in the now-unreferenced dispatcher — the polling loop has no natural GC point for these resources, and Node's object GC never touches OS-level sockets. acquireForNextCycle() now closes the previous transport (fire-and-forget so the polling cycle is not blocked by a slow destroy) before swapping in the rebuilt one. dispose() is a new method that the owning TelegramPollingSession calls from the finally block of runUntilAbort(), so a single transport is always tied to a single polling session lifetime. After dispose(), acquireForNextCycle() returns undefined to prevent zombie rebuilds. Under high sustained polling traffic over long-lived sessions, this is what stops the per-gateway connection count to api.telegram.org from growing indefinitely and saturating upstream proxy quotas. Co-Authored-By: Claude Opus 4.7 (1M context) * docs(changelog): note Telegram undici dispatcher lifecycle fix Co-Authored-By: Claude Opus 4.7 (1M context) * fix(telegram): disable HTTP/2 for all Telegram polling dispatchers Undici 8 enables HTTP/2 ALPN by default, but Telegram's long-polling connections stall on Windows due to IPv6 + H2 multiplexing issues. The core fetch-guard already sets allowH2:false for guarded paths, but the Telegram extension creates its own Agent/ProxyAgent/EnvHttpProxyAgent instances directly from undici without this flag. Apply allowH2:false to all dispatcher constructors in the Telegram transport layer, matching the approach used in src/infra/net/undici-runtime.ts. 
Fixes #66885 * fix: avoid false telegram polling stall restarts * fix(telegram): publish polling health liveness --------- Co-authored-by: Ethan Chen Co-authored-by: Claude Opus 4.7 (1M context) Co-authored-by: Magicray1217 Co-authored-by: aoao --- CHANGELOG.md | 2 + docs/channels/telegram.md | 1 + docs/channels/troubleshooting.md | 15 +- extensions/telegram/src/bot.test.ts | 2 + .../bot/delivery.resolve-media-retry.test.ts | 3 +- .../src/bot/delivery.resolve-media.ts | 3 + extensions/telegram/src/channel.ts | 7 +- extensions/telegram/src/fetch.test.ts | 128 ++++++++- extensions/telegram/src/fetch.ts | 131 ++++++++-- extensions/telegram/src/monitor.test.ts | 7 + extensions/telegram/src/monitor.ts | 1 + extensions/telegram/src/monitor.types.ts | 6 +- .../telegram/src/polling-session.test.ts | 246 +++++++++++++++++- extensions/telegram/src/polling-session.ts | 57 ++-- .../src/polling-transport-state.test.ts | 163 ++++++++++++ .../telegram/src/polling-transport-state.ts | 56 +++- src/gateway/channel-health-policy.test.ts | 46 +++- src/gateway/channel-health-policy.ts | 24 +- 18 files changed, 823 insertions(+), 75 deletions(-) create mode 100644 extensions/telegram/src/polling-transport-state.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c36fa96551..ad1d2d6bcb5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -111,6 +111,8 @@ Docs: https://docs.openclaw.ai - Plugins/discovery: reuse bundled and global plugin discovery results across workspace cache misses so Windows multi-workspace startup stops redoing the shared synchronous scan. (#67940) Thanks @obviyus. - Bundled plugins/install: keep staged bundled plugin runtime imports resolving through the packaged Plugin SDK while omitting checkout-only aliases from the dist inventory, so published installs do not fail on repo-local paths. 
- Plugins/webhooks: enforce synchronous plugin registration with full rollback of failed plugin side effects, and cache SecretRef-backed webhook auth per route so plugin startup and inbound webhook auth stay deterministic. (#67941) Thanks @obviyus. +- Telegram/polling transport: give the Telegram undici dispatcher pool bounded keep-alive defaults and an explicit lifecycle. Previously every recoverable network error and stall watchdog trip silently replaced the transport, abandoning the old dispatcher pool and its sockets; long-running gateway processes accumulated hundreds of ESTABLISHED connections to `api.telegram.org`, saturating per-IP upstream proxy quotas and causing the actively-used outbound proxy node to time out while every other node still tested healthy. Transports now expose `close()`, `TelegramPollingTransportState` destroys the stale transport on dirty-rebuild, and `TelegramPollingSession` disposes the transport when polling exits — backed by a strict per-origin pool cap on every constructed `Agent`, `ProxyAgent`, and `EnvHttpProxyAgent` as defence in depth. +- Telegram/polling: publish successful `getUpdates` calls as account health liveness, avoid false stall restarts after recoverable `getUpdates` errors, and force Telegram API dispatchers to HTTP/1.1 so stalled polling recovers instead of sitting connected-but-dead. - Telegram/ACP bindings: drop persisted DM bindings that still point at missing or failed ACP sessions on restart, while preserving plugin-owned bindings and uncertain store reads. (#67822) Thanks @chinar-amrutkar. - Telegram/streaming: keep a transient preview on the same Telegram message when auto-compaction retries an in-flight answer, so streamed replies no longer appear duplicated after compaction. (#66939) Thanks @rubencu. 
- Memory/sqlite-vec: emit the degraded sqlite-vec warning once per degraded episode instead of repeating it for every file write, while preserving the latch across safe-reindex rollback and resetting it when vector state is genuinely rebuilt. (#67898) Thanks @rubencu. diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index b39a0b6b909..20522e517c2 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -917,6 +917,7 @@ Per-account, per-group, and per-topic overrides are supported (same inheritance - Node 22+ + custom fetch/proxy can trigger immediate abort behavior if AbortSignal types mismatch. - Some hosts resolve `api.telegram.org` to IPv6 first; broken IPv6 egress can cause intermittent Telegram API failures. - If logs include `TypeError: fetch failed` or `Network request for 'getUpdates' failed!`, OpenClaw now retries these as recoverable network errors. + - If logs include `Polling stall detected`, OpenClaw restarts polling and rebuilds the Telegram transport. Persistent stalls usually point to proxy, DNS, IPv6, or TLS egress issues between the host and `api.telegram.org`. - On VPS hosts with unstable direct egress/TLS, route Telegram API calls through `channels.telegram.proxy`: ```yaml diff --git a/docs/channels/troubleshooting.md b/docs/channels/troubleshooting.md index 889c0d01f67..72033c8e8bd 100644 --- a/docs/channels/troubleshooting.md +++ b/docs/channels/troubleshooting.md @@ -45,13 +45,14 @@ Full troubleshooting: [/channels/whatsapp#troubleshooting](/channels/whatsapp#tr ### Telegram failure signatures -| Symptom | Fastest check | Fix | -| ----------------------------------- | ----------------------------------------------- | --------------------------------------------------------------------------- | -| `/start` but no usable reply flow | `openclaw pairing list telegram` | Approve pairing or change DM policy. 
| -| Bot online but group stays silent | Verify mention requirement and bot privacy mode | Disable privacy mode for group visibility or mention bot. | -| Send failures with network errors | Inspect logs for Telegram API call failures | Fix DNS/IPv6/proxy routing to `api.telegram.org`. | -| `setMyCommands` rejected at startup | Inspect logs for `BOT_COMMANDS_TOO_MUCH` | Reduce plugin/skill/custom Telegram commands or disable native menus. | -| Upgraded and allowlist blocks you | `openclaw security audit` and config allowlists | Run `openclaw doctor --fix` or replace `@username` with numeric sender IDs. | +| Symptom | Fastest check | Fix | +| ----------------------------------- | ------------------------------------------------ | --------------------------------------------------------------------------- | +| `/start` but no usable reply flow | `openclaw pairing list telegram` | Approve pairing or change DM policy. | +| Bot online but group stays silent | Verify mention requirement and bot privacy mode | Disable privacy mode for group visibility or mention bot. | +| Send failures with network errors | Inspect logs for Telegram API call failures | Fix DNS/IPv6/proxy routing to `api.telegram.org`. | +| Polling stalls or reconnects slowly | `openclaw logs --follow` for polling diagnostics | Upgrade, then check proxy/DNS/IPv6 if `getUpdates` keeps timing out. | +| `setMyCommands` rejected at startup | Inspect logs for `BOT_COMMANDS_TOO_MUCH` | Reduce plugin/skill/custom Telegram commands or disable native menus. | +| Upgraded and allowlist blocks you | `openclaw security audit` and config allowlists | Run `openclaw doctor --fix` or replace `@username` with numeric sender IDs. 
| Full troubleshooting: [/channels/telegram#troubleshooting](/channels/telegram#troubleshooting) diff --git a/extensions/telegram/src/bot.test.ts b/extensions/telegram/src/bot.test.ts index 86a06f3a861..46c228dd02d 100644 --- a/extensions/telegram/src/bot.test.ts +++ b/extensions/telegram/src/bot.test.ts @@ -1334,6 +1334,7 @@ describe("createTelegramBot", () => { telegramTransport: { fetch: mediaFetch as typeof fetch, sourceFetch: mediaFetch as typeof fetch, + close: async () => {}, }, }); const handler = getOnHandler("message") as (ctx: Record) => Promise; @@ -1447,6 +1448,7 @@ describe("createTelegramBot", () => { telegramTransport: { fetch: mediaFetch as typeof fetch, sourceFetch: mediaFetch as typeof fetch, + close: async () => {}, }, }); const handler = getOnHandler("message") as (ctx: Record) => Promise; diff --git a/extensions/telegram/src/bot/delivery.resolve-media-retry.test.ts b/extensions/telegram/src/bot/delivery.resolve-media-retry.test.ts index d73ef198026..2d6ba93c66d 100644 --- a/extensions/telegram/src/bot/delivery.resolve-media-retry.test.ts +++ b/extensions/telegram/src/bot/delivery.resolve-media-retry.test.ts @@ -345,6 +345,7 @@ describe("resolveMedia getFile retry", () => { fetch: callerFetch, sourceFetch: callerFetch, dispatcherAttempts, + close: async () => {}, }; fetchRemoteMedia.mockResolvedValueOnce({ buffer: Buffer.from("pdf-data"), @@ -379,7 +380,7 @@ describe("resolveMedia getFile retry", () => { it("uses caller-provided fetch impl for sticker downloads", async () => { const getFile = vi.fn().mockResolvedValue({ file_path: "stickers/file_0.webp" }); const callerFetch = vi.fn() as unknown as typeof fetch; - const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch }; + const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch, close: async () => {} }; fetchRemoteMedia.mockResolvedValueOnce({ buffer: Buffer.from("sticker-data"), contentType: "image/webp", diff --git 
a/extensions/telegram/src/bot/delivery.resolve-media.ts b/extensions/telegram/src/bot/delivery.resolve-media.ts index 3674bd12830..174831bd392 100644 --- a/extensions/telegram/src/bot/delivery.resolve-media.ts +++ b/extensions/telegram/src/bot/delivery.resolve-media.ts @@ -151,6 +151,9 @@ function resolveRequiredTelegramTransport(transport?: TelegramTransport): Telegr return { fetch: resolvedFetch, sourceFetch: resolvedFetch, + // Caller-owned transport constructed from the globalThis fetch — it owns + // no dispatcher lifecycle of its own, so close() is a no-op. + close: async () => {}, }; } diff --git a/extensions/telegram/src/channel.ts b/extensions/telegram/src/channel.ts index 3437f3db2f4..0610815bd72 100644 --- a/extensions/telegram/src/channel.ts +++ b/extensions/telegram/src/channel.ts @@ -5,6 +5,7 @@ import { } from "openclaw/plugin-sdk/allowlist-config-edit"; import type { ChannelMessageActionAdapter } from "openclaw/plugin-sdk/channel-contract"; import { clearAccountEntryFields, createChatChannelPlugin } from "openclaw/plugin-sdk/channel-core"; +import { createAccountStatusSink } from "openclaw/plugin-sdk/channel-lifecycle"; import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing"; import { createAllowlistProviderRouteAllowlistWarningCollector } from "openclaw/plugin-sdk/channel-policy"; import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result"; @@ -766,7 +767,6 @@ export const telegramPlugin = createChatChannelPlugin({ actions: telegramMessageActions, status: createComputedAccountStatusAdapter({ defaultRuntime: createDefaultChannelRuntimeState(DEFAULT_ACCOUNT_ID), - skipStaleSocketHealthCheck: true, collectStatusIssues: collectTelegramStatusIssues, buildChannelSummary: ({ snapshot }) => buildTokenChannelStatusSummary(snapshot), probeAccount: async ({ account, timeoutMs }) => @@ -902,6 +902,10 @@ export const telegramPlugin = createChatChannelPlugin({ } } ctx.log?.info(`[${account.accountId}] starting 
provider${telegramBotLabel}`); + const setStatus = createAccountStatusSink({ + accountId: account.accountId, + setStatus: ctx.setStatus, + }); return resolveTelegramMonitor()({ token, accountId: account.accountId, @@ -916,6 +920,7 @@ export const telegramPlugin = createChatChannelPlugin({ webhookHost: account.config.webhookHost, webhookPort: account.config.webhookPort, webhookCertPath: account.config.webhookCertPath, + setStatus, }); }, logoutAccount: async ({ accountId, cfg }) => { diff --git a/extensions/telegram/src/fetch.test.ts b/extensions/telegram/src/fetch.test.ts index 595ac2c5933..ace61c184cd 100644 --- a/extensions/telegram/src/fetch.test.ts +++ b/extensions/telegram/src/fetch.test.ts @@ -7,28 +7,37 @@ const loggerDebug = vi.hoisted(() => vi.fn()); const undiciFetch = vi.hoisted(() => vi.fn()); const setGlobalDispatcher = vi.hoisted(() => vi.fn()); +type MockDispatcherInstance = { + options?: Record | string; + destroy: ReturnType; + close: ReturnType; +}; + const AgentCtor = vi.hoisted(() => - vi.fn(function MockAgent( - this: { options?: Record }, - options?: Record, - ) { + vi.fn(function MockAgent(this: MockDispatcherInstance, options?: Record) { this.options = options; + this.destroy = vi.fn(async () => undefined); + this.close = vi.fn(async () => undefined); }), ); const EnvHttpProxyAgentCtor = vi.hoisted(() => vi.fn(function MockEnvHttpProxyAgent( - this: { options?: Record }, + this: MockDispatcherInstance, options?: Record, ) { this.options = options; + this.destroy = vi.fn(async () => undefined); + this.close = vi.fn(async () => undefined); }), ); const ProxyAgentCtor = vi.hoisted(() => vi.fn(function MockProxyAgent( - this: { options?: Record | string }, + this: MockDispatcherInstance, options?: Record | string, ) { this.options = options; + this.destroy = vi.fn(async () => undefined); + this.close = vi.fn(async () => undefined); }), ); @@ -140,6 +149,7 @@ function getDispatcherFromUndiciCall(nth: number) { return init?.dispatcher as | { 
options?: { + allowH2?: boolean; connect?: Record; proxyTls?: Record; requestTls?: Record; @@ -186,6 +196,7 @@ function expectStickyAutoSelectDispatcher( dispatcher: | { options?: { + allowH2?: boolean; connect?: Record; proxyTls?: Record; requestTls?: Record; @@ -202,6 +213,22 @@ function expectStickyAutoSelectDispatcher( ); } +function expectHttp1OnlyDispatcher( + dispatcher: + | { + options?: { + allowH2?: boolean; + }; + } + | undefined, +): void { + expect(dispatcher?.options).toEqual( + expect.objectContaining({ + allowH2: false, + }), + ); +} + function expectPinnedIpv4ConnectDispatcher(args: { pinnedCall: number; firstCall?: number; @@ -319,6 +346,7 @@ describe("resolveTelegramFetch", () => { const dispatcher = getDispatcherFromUndiciCall(1); expect(dispatcher).toBeDefined(); + expectHttp1OnlyDispatcher(dispatcher); expect(dispatcher?.options?.connect).toEqual( expect.objectContaining({ autoSelectFamily: true, @@ -354,6 +382,7 @@ describe("resolveTelegramFetch", () => { expect(AgentCtor).not.toHaveBeenCalled(); const dispatcher = getDispatcherFromUndiciCall(1); + expectHttp1OnlyDispatcher(dispatcher); expect(dispatcher?.options?.connect).toEqual( expect.objectContaining({ autoSelectFamily: false, @@ -379,6 +408,7 @@ describe("resolveTelegramFetch", () => { expect(ProxyAgentCtor).toHaveBeenCalledTimes(1); expect(ProxyAgentCtor).toHaveBeenCalledWith( expect.objectContaining({ + allowH2: false, uri: "http://127.0.0.1:7777", }), ); @@ -398,6 +428,7 @@ describe("resolveTelegramFetch", () => { await resolved("https://api.telegram.org/botx/getMe"); const dispatcher = getDispatcherFromUndiciCall(1); + expectHttp1OnlyDispatcher(dispatcher); expect(dispatcher?.options?.connect).toEqual( expect.objectContaining({ autoSelectFamily: true, @@ -431,6 +462,7 @@ describe("resolveTelegramFetch", () => { expect(EnvHttpProxyAgentCtor).not.toHaveBeenCalled(); expect(AgentCtor).not.toHaveBeenCalled(); const dispatcher = getDispatcherFromUndiciCall(1); + 
expectHttp1OnlyDispatcher(dispatcher); expect(dispatcher?.options).toEqual( expect.objectContaining({ uri: "http://127.0.0.1:7890", @@ -828,4 +860,88 @@ describe("resolveTelegramFetch", () => { expect(setDefaultResultOrder).not.toHaveBeenCalled(); expect(setDefaultAutoSelectFamily).not.toHaveBeenCalled(); }); + + describe("transport lifecycle", () => { + it("passes a bounded keep-alive pool configuration to every constructed dispatcher", () => { + resolveTelegramTransport(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + // One direct Agent for the default dispatcher plus two lazy fallbacks not yet touched. + expect(AgentCtor).toHaveBeenCalledTimes(1); + const defaultAgent = AgentCtor.mock.instances[0]?.options; + expect(defaultAgent).toEqual( + expect.objectContaining({ + allowH2: false, + keepAliveTimeout: expect.any(Number), + keepAliveMaxTimeout: expect.any(Number), + connections: expect.any(Number), + pipelining: expect.any(Number), + }), + ); + const connections = (defaultAgent as { connections?: number }).connections; + expect(connections).toBeGreaterThan(0); + expect(connections).toBeLessThan(100); + }); + + it("close() destroys the default dispatcher and all lazily-created fallback dispatchers", async () => { + undiciFetch + .mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH")) + .mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH")) + .mockResolvedValueOnce({ ok: true } as Response); + + const transport = resolveTelegramTransport(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + // Trigger fallback chain so the two lazy fallback dispatchers are instantiated. + await transport.fetch("https://api.telegram.org/botx/getMe"); + + // Three Agents total: default + IPv4 fallback + pinned-IP fallback. 
+ expect(AgentCtor).toHaveBeenCalledTimes(3); + const instances = AgentCtor.mock.instances; + expect(instances).toHaveLength(3); + + await transport.close(); + + for (const instance of instances) { + expect(instance.destroy).toHaveBeenCalledTimes(1); + } + }); + + it("close() is idempotent", async () => { + const transport = resolveTelegramTransport(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + const instance = AgentCtor.mock.instances[0]; + + await transport.close(); + await transport.close(); + await transport.close(); + + expect(instance.destroy).toHaveBeenCalledTimes(1); + }); + + it("close() swallows dispatcher destroy failures so callers can safely fire-and-forget", async () => { + const transport = resolveTelegramTransport(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + const instance = AgentCtor.mock.instances[0]; + instance.destroy.mockRejectedValueOnce(new Error("already destroyed")); + + await expect(transport.close()).resolves.toBeUndefined(); + }); + }); }); diff --git a/extensions/telegram/src/fetch.ts b/extensions/telegram/src/fetch.ts index 5f7f495d392..5827c598bee 100644 --- a/extensions/telegram/src/fetch.ts +++ b/extensions/telegram/src/fetch.ts @@ -28,6 +28,36 @@ const TELEGRAM_AUTO_SELECT_FAMILY_ATTEMPT_TIMEOUT_MS = 300; const TELEGRAM_API_HOSTNAME = "api.telegram.org"; const TELEGRAM_FALLBACK_IPS: readonly string[] = ["149.154.167.220"]; +// Dispatcher defaults that bound the per-origin connection pool. Telegram long +// polling keeps a handful of connections hot for hours, so the defaults must be +// strict enough that (a) idle sockets are closed even when the pool is still +// actively used and (b) the pool itself cannot grow unbounded under transient +// concurrency spikes. 
These values are a defence-in-depth layer; the primary +// fix for the leak observed in openclaw#68128 is the transport lifecycle that +// calls `close()` on abandoned dispatchers. +const TELEGRAM_DISPATCHER_KEEP_ALIVE_TIMEOUT_MS = 30_000; +const TELEGRAM_DISPATCHER_KEEP_ALIVE_MAX_TIMEOUT_MS = 600_000; +const TELEGRAM_DISPATCHER_CONNECTIONS_PER_ORIGIN = 10; +const TELEGRAM_DISPATCHER_PIPELINING = 1; + +type TelegramAgentPoolOptions = { + allowH2: false; + keepAliveTimeout: number; + keepAliveMaxTimeout: number; + connections: number; + pipelining: number; +}; + +function telegramAgentPoolOptions(): TelegramAgentPoolOptions { + return { + allowH2: false, + keepAliveTimeout: TELEGRAM_DISPATCHER_KEEP_ALIVE_TIMEOUT_MS, + keepAliveMaxTimeout: TELEGRAM_DISPATCHER_KEEP_ALIVE_MAX_TIMEOUT_MS, + connections: TELEGRAM_DISPATCHER_CONNECTIONS_PER_ORIGIN, + pipelining: TELEGRAM_DISPATCHER_PIPELINING, + }; +} + type RequestInitWithDispatcher = RequestInit & { dispatcher?: unknown; }; @@ -265,14 +295,18 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): { mode: TelegramDispatcherMode; effectivePolicy: PinnedDispatcherPolicy; } { + // Telegram polling uses long-lived connections. Undici 8 enables HTTP/2 ALPN + // by default, which can stall Telegram long-polling on Windows/IPv6 networks. + // Force HTTP/1.1 for every dispatcher while keeping bounded pool defaults. + const poolOptions = telegramAgentPoolOptions(); + if (policy.mode === "explicit-proxy") { const requestTlsOptions = withPinnedLookup(policy.proxyTls, policy.pinnedHostname); - const proxyOptions = requestTlsOptions - ? ({ - uri: policy.proxyUrl, - requestTls: requestTlsOptions, - } satisfies ConstructorParameters[0]) - : policy.proxyUrl; + const proxyOptions = { + uri: policy.proxyUrl, + ...poolOptions, + ...(requestTlsOptions ? 
{ requestTls: requestTlsOptions } : {}), + } satisfies ConstructorParameters[0]; try { return { dispatcher: new ProxyAgent(proxyOptions), @@ -288,13 +322,11 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): { if (policy.mode === "env-proxy") { const connectOptions = withPinnedLookup(policy.connect, policy.pinnedHostname); const proxyTlsOptions = withPinnedLookup(policy.proxyTls, policy.pinnedHostname); - const proxyOptions = - connectOptions || proxyTlsOptions - ? ({ - ...(connectOptions ? { connect: connectOptions } : {}), - ...(proxyTlsOptions ? { proxyTls: proxyTlsOptions } : {}), - } satisfies ConstructorParameters[0]) - : undefined; + const proxyOptions = { + ...poolOptions, + ...(connectOptions ? { connect: connectOptions } : {}), + ...(proxyTlsOptions ? { proxyTls: proxyTlsOptions } : {}), + } satisfies ConstructorParameters[0]; try { return { dispatcher: new EnvHttpProxyAgent(proxyOptions), @@ -310,11 +342,10 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): { ...(connectOptions ? { connect: connectOptions } : {}), }; return { - dispatcher: new Agent( - directPolicy.connect - ? ({ connect: directPolicy.connect } satisfies ConstructorParameters[0]) - : undefined, - ), + dispatcher: new Agent({ + ...poolOptions, + ...(directPolicy.connect ? { connect: directPolicy.connect } : {}), + } satisfies ConstructorParameters[0]), mode: "direct", effectivePolicy: directPolicy, }; @@ -323,13 +354,10 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): { const connectOptions = withPinnedLookup(policy.connect, policy.pinnedHostname); return { - dispatcher: new Agent( - connectOptions - ? ({ - connect: connectOptions, - } satisfies ConstructorParameters[0]) - : undefined, - ), + dispatcher: new Agent({ + ...poolOptions, + ...(connectOptions ? 
{ connect: connectOptions } : {}), + } satisfies ConstructorParameters[0]), mode: "direct", effectivePolicy: policy, }; @@ -429,13 +457,29 @@ export type TelegramTransport = { fetch: typeof fetch; sourceFetch: typeof fetch; dispatcherAttempts?: TelegramDispatcherAttempt[]; + /** + * Release all dispatchers owned by this transport and the TCP sockets they + * hold. Safe to call multiple times; subsequent calls resolve immediately. + * + * Callers that pass their own `proxyFetch` own the underlying dispatcher + * lifecycle themselves and this is effectively a no-op. Callers that let + * this module construct the transport MUST invoke `close()` when the + * transport is no longer needed (e.g. on polling session dispose or when + * swapping transports after a network stall); otherwise undici keeps the + * keep-alive sockets open indefinitely, leaking hundreds of connections + * to api.telegram.org over long-running sessions. + */ + close(): Promise; }; function createTelegramTransportAttempts(params: { defaultDispatcher: ReturnType; allowFallback: boolean; fallbackPolicy?: PinnedDispatcherPolicy; + ownedDispatchers: Set; }): TelegramTransportAttempt[] { + params.ownedDispatchers.add(params.defaultDispatcher.dispatcher); + const attempts: TelegramTransportAttempt[] = [ { createDispatcher: () => params.defaultDispatcher.dispatcher, @@ -447,12 +491,14 @@ function createTelegramTransportAttempts(params: { return attempts; } const fallbackPolicy = params.fallbackPolicy; + const ownedDispatchers = params.ownedDispatchers; let ipv4Dispatcher: TelegramDispatcher | null = null; attempts.push({ createDispatcher: () => { if (!ipv4Dispatcher) { ipv4Dispatcher = createTelegramDispatcher(fallbackPolicy).dispatcher; + ownedDispatchers.add(ipv4Dispatcher); } return ipv4Dispatcher; }, @@ -476,6 +522,7 @@ function createTelegramTransportAttempts(params: { createDispatcher: () => { if (!fallbackIpDispatcher) { fallbackIpDispatcher = createTelegramDispatcher(fallbackIpPolicy).dispatcher; 
+ ownedDispatchers.add(fallbackIpDispatcher); } return fallbackIpDispatcher; }, @@ -486,6 +533,23 @@ function createTelegramTransportAttempts(params: { return attempts; } +async function destroyOwnedDispatchers(dispatchers: Iterable): Promise { + // Use destroy() rather than close() so abandoned sockets are released + // immediately without waiting for in-flight requests that the caller has + // already decided to abandon (session aborted, or stale transport being + // replaced after a stall). The per-dispatcher try/catch isolates failures + // (already-destroyed dispatchers throw) so Promise.all never rejects. + await Promise.all( + [...dispatchers].map(async (dispatcher) => { + try { + await dispatcher.destroy(); + } catch { + // Intentionally ignored: dispatcher may already be destroyed. + } + }), + ); +} + export function resolveTelegramTransport( proxyFetch?: typeof fetch, options?: { network?: TelegramNetworkConfig }, @@ -518,7 +582,8 @@ export function resolveTelegramTransport( : undiciSourceFetch; const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value); if (effectiveProxyFetch && !explicitProxyUrl) { - return { fetch: sourceFetch, sourceFetch }; + // The caller owns the underlying dispatcher lifecycle; nothing to close here. 
+ return { fetch: sourceFetch, sourceFetch, close: async () => {} }; } const useEnvProxy = !explicitProxyUrl && hasEnvHttpProxyForTelegramApi(); @@ -543,10 +608,12 @@ export function resolveTelegramTransport( proxyUrl: explicitProxyUrl, }).policy : undefined; + const ownedDispatchers = new Set(); const transportAttempts = createTelegramTransportAttempts({ defaultDispatcher, allowFallback: allowStickyFallback, fallbackPolicy: fallbackDispatcherPolicy, + ownedDispatchers, }); let stickyAttemptIndex = 0; @@ -615,10 +682,22 @@ export function resolveTelegramTransport( throw err; }) as typeof fetch; + let closed = false; + const close = async (): Promise => { + if (closed) { + return; + } + closed = true; + const toDestroy = [...ownedDispatchers]; + ownedDispatchers.clear(); + await destroyOwnedDispatchers(toDestroy); + }; + return { fetch: resolvedFetch, sourceFetch, dispatcherAttempts: transportAttempts.map((attempt) => attempt.exportAttempt), + close, }; } diff --git a/extensions/telegram/src/monitor.test.ts b/extensions/telegram/src/monitor.test.ts index a00aff43cfc..9e25ca7c4f2 100644 --- a/extensions/telegram/src/monitor.test.ts +++ b/extensions/telegram/src/monitor.test.ts @@ -87,6 +87,7 @@ const { resolveTelegramTransportSpy } = vi.hoisted(() => ({ resolveTelegramTransportSpy: vi.fn(() => ({ fetch: globalThis.fetch, sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), })), })); @@ -358,6 +359,7 @@ describe("monitorTelegramProvider (grammY)", () => { resolveTelegramTransportSpy.mockReset().mockImplementation(() => ({ fetch: globalThis.fetch, sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), })); registerUnhandledRejectionHandlerMock.mockClear(); resetUnhandledRejection(); @@ -565,10 +567,12 @@ describe("monitorTelegramProvider (grammY)", () => { const telegramTransport = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), }; const rebuiltTransport = { fetch: globalThis.fetch, 
sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), }; resolveTelegramTransportSpy .mockReturnValueOnce(telegramTransport) @@ -600,10 +604,12 @@ describe("monitorTelegramProvider (grammY)", () => { const telegramTransport = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), }; const rebuiltTransport = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), }; resolveTelegramTransportSpy .mockReturnValueOnce(telegramTransport) @@ -760,6 +766,7 @@ describe("monitorTelegramProvider (grammY)", () => { const telegramTransport = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), }; resolveTelegramTransportSpy.mockReturnValueOnce(telegramTransport); diff --git a/extensions/telegram/src/monitor.ts b/extensions/telegram/src/monitor.ts index 7b1427ac9cb..40a4cd06586 100644 --- a/extensions/telegram/src/monitor.ts +++ b/extensions/telegram/src/monitor.ts @@ -227,6 +227,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { log, telegramTransport, createTelegramTransport: createTelegramTransportForPolling, + setStatus: opts.setStatus, }); await pollingSession.runUntilAbort(); } finally { diff --git a/extensions/telegram/src/monitor.types.ts b/extensions/telegram/src/monitor.types.ts index 7eeb46dfb3a..0a9834e333a 100644 --- a/extensions/telegram/src/monitor.types.ts +++ b/extensions/telegram/src/monitor.types.ts @@ -1,4 +1,7 @@ -import type { ChannelRuntimeSurface } from "openclaw/plugin-sdk/channel-contract"; +import type { + ChannelAccountSnapshot, + ChannelRuntimeSurface, +} from "openclaw/plugin-sdk/channel-contract"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; @@ -17,6 +20,7 @@ export type MonitorTelegramOpts = { proxyFetch?: typeof fetch; webhookUrl?: string; webhookCertPath?: string; + setStatus?: 
(patch: Omit) => void; }; export type TelegramMonitorFn = (opts?: MonitorTelegramOpts) => Promise; diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index 11826723390..01cec86c0f5 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -1,3 +1,4 @@ +import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; const runMock = vi.hoisted(() => vi.fn()); @@ -97,7 +98,11 @@ function expectTelegramBotTransportSequence(firstTransport: unknown, secondTrans } function makeTelegramTransport() { - return { fetch: globalThis.fetch, sourceFetch: globalThis.fetch }; + return { + fetch: globalThis.fetch, + sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), + }; } function mockRestartAfterPollingError(error: unknown, abort: AbortController) { @@ -136,6 +141,7 @@ function createPollingSession(params: { log?: (message: string) => void; telegramTransport?: ReturnType; createTelegramTransport?: () => ReturnType; + setStatus?: (patch: Omit) => void; }) { return new TelegramPollingSession({ token: "tok", @@ -149,6 +155,7 @@ function createPollingSession(params: { persistUpdateId: async () => undefined, log: params.log ?? (() => undefined), telegramTransport: params.telegramTransport, + setStatus: params.setStatus, ...(params.createTelegramTransport ? 
{ createTelegramTransport: params.createTelegramTransport } : {}), @@ -188,6 +195,19 @@ function mockLongRunningPollingCycle(runnerStop: AsyncVoidFn) { return () => firstTaskResolve?.(); } +async function waitForApiMiddleware( + getApiMiddleware: () => TelegramApiMiddleware | undefined, +): Promise { + for (let attempt = 0; attempt < 20; attempt += 1) { + const apiMiddleware = getApiMiddleware(); + if (apiMiddleware) { + return apiMiddleware; + } + await Promise.resolve(); + } + throw new Error("Telegram API middleware was not installed"); +} + describe("TelegramPollingSession", () => { beforeAll(async () => { ({ TelegramPollingSession } = await import("./polling-session.js")); @@ -366,8 +386,16 @@ describe("TelegramPollingSession", () => { const watchdogHarness = installPollingStallWatchdogHarness(); - const transport1 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch }; - const transport2 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch }; + const transport1 = { + fetch: globalThis.fetch, + sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), + }; + const transport2 = { + fetch: globalThis.fetch, + sourceFetch: globalThis.fetch, + close: vi.fn(async () => undefined), + }; const createTelegramTransport = vi.fn(() => transport2); try { @@ -420,6 +448,171 @@ describe("TelegramPollingSession", () => { expect(createTelegramTransport).toHaveBeenCalledTimes(1); }); + it("does not trigger stall restart shortly after a getUpdates error", async () => { + const abort = new AbortController(); + const botStop = vi.fn(async () => undefined); + const runnerStop = vi.fn(async () => undefined); + const getApiMiddleware = mockBotCapturingApiMiddleware(botStop); + const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); + + const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 30_000], 119_999); + + const log = vi.fn(); + const session = createPollingSession({ + abortSignal: abort.signal, + log, + }); + + try { + const 
runPromise = session.runUntilAbort(); + const watchdog = await watchdogHarness.waitForWatchdog(); + + const apiMiddleware = getApiMiddleware(); + if (apiMiddleware) { + const failedGetUpdates = vi.fn(async () => { + throw new Error("Network request for 'getUpdates' failed!"); + }); + await expect(apiMiddleware(failedGetUpdates, "getUpdates", { offset: 1 })).rejects.toThrow( + "Network request for 'getUpdates' failed!", + ); + } + + watchdog?.(); + + expect(runnerStop).not.toHaveBeenCalled(); + expect(botStop).not.toHaveBeenCalled(); + expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected")); + + abort.abort(); + resolveFirstTask(); + await runPromise; + } finally { + watchdogHarness.restore(); + } + }); + + it("publishes polling liveness after getUpdates succeeds", async () => { + const abort = new AbortController(); + const botStop = vi.fn(async () => undefined); + const runnerStop = vi.fn(async () => undefined); + const setStatus = vi.fn(); + const getApiMiddleware = mockBotCapturingApiMiddleware(botStop); + const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); + + const session = createPollingSession({ + abortSignal: abort.signal, + setStatus, + }); + + const runPromise = session.runUntilAbort(); + + const apiMiddleware = await waitForApiMiddleware(getApiMiddleware); + const fakeGetUpdates = vi.fn(async () => []); + await apiMiddleware(fakeGetUpdates, "getUpdates", { offset: 1 }); + + expect(setStatus).toHaveBeenCalledWith({ + mode: "polling", + connected: false, + lastConnectedAt: null, + lastEventAt: null, + }); + const connectedPatch = setStatus.mock.calls.find( + ([patch]) => (patch as Record).connected === true, + )?.[0] as Record | undefined; + expect(connectedPatch).toMatchObject({ + connected: true, + mode: "polling", + lastConnectedAt: expect.any(Number), + lastEventAt: expect.any(Number), + lastError: null, + }); + expect(connectedPatch?.lastConnectedAt).toBe(connectedPatch?.lastEventAt); + + abort.abort(); + 
resolveFirstTask(); + await runPromise; + + expect(setStatus).toHaveBeenLastCalledWith({ + mode: "polling", + connected: false, + }); + }); + + it("keeps polling marked connected across recoverable restart cycles", async () => { + const abort = new AbortController(); + const recoverableError = new Error("recoverable polling error"); + const setStatus = vi.fn(); + let apiMiddleware: TelegramApiMiddleware | undefined; + const bot = { + api: { + deleteWebhook: vi.fn(async () => true), + getUpdates: vi.fn(async () => []), + config: { + use: vi.fn((fn: TelegramApiMiddleware) => { + apiMiddleware = fn; + }), + }, + }, + stop: vi.fn(async () => undefined), + }; + createTelegramBotMock.mockReturnValue(bot); + + let cycle = 0; + runMock.mockImplementation(() => { + cycle += 1; + if (cycle === 1) { + return { + task: async () => { + const middleware = apiMiddleware; + if (!middleware) { + throw new Error("Telegram API middleware was not installed"); + } + await middleware( + vi.fn(async () => []), + "getUpdates", + { offset: 1 }, + ); + throw recoverableError; + }, + stop: vi.fn(async () => undefined), + isRunning: () => false, + }; + } + return { + task: async () => { + abort.abort(); + }, + stop: vi.fn(async () => undefined), + isRunning: () => false, + }; + }); + + const session = createPollingSession({ + abortSignal: abort.signal, + setStatus, + }); + + await session.runUntilAbort(); + + expect(runMock).toHaveBeenCalledTimes(2); + expect(setStatus).toHaveBeenCalledWith( + expect.objectContaining({ connected: true, mode: "polling" }), + ); + const disconnectedPatches = setStatus.mock.calls.filter( + ([patch]) => (patch as Record).connected === false, + ); + expect(disconnectedPatches).toHaveLength(2); + expect(disconnectedPatches[0]?.[0]).toMatchObject({ + mode: "polling", + lastConnectedAt: null, + lastEventAt: null, + }); + expect(disconnectedPatches[1]?.[0]).toEqual({ + mode: "polling", + connected: false, + }); + }); + it("does not trigger stall restart when 
non-getUpdates API calls are active", async () => { const abort = new AbortController(); const botStop = vi.fn(async () => undefined); @@ -668,4 +861,51 @@ describe("TelegramPollingSession", () => { expectTelegramBotTransportSequence(transport1, transport1); expect(createTelegramTransport).not.toHaveBeenCalled(); }); + + it("closes the transport once when runUntilAbort exits normally", async () => { + const abort = new AbortController(); + const transport = makeTelegramTransport(); + createTelegramBotMock.mockReturnValueOnce(makeBot()); + runMock.mockReturnValueOnce({ + task: async () => { + abort.abort(); + }, + stop: vi.fn(async () => undefined), + isRunning: () => false, + }); + + const session = createPollingSession({ + abortSignal: abort.signal, + telegramTransport: transport, + }); + + await session.runUntilAbort(); + + expect(transport.close).toHaveBeenCalledTimes(1); + }); + + it("closes the stale transport when a rebuild replaces it", async () => { + const abort = new AbortController(); + const recoverableError = new Error("recoverable polling error"); + const transport1 = makeTelegramTransport(); + const transport2 = makeTelegramTransport(); + const createTelegramTransport = vi + .fn<() => ReturnType>() + .mockReturnValueOnce(transport2); + createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot()); + mockRestartAfterPollingError(recoverableError, abort); + + const session = createPollingSessionWithTransportRestart({ + abortSignal: abort.signal, + telegramTransport: transport1, + createTelegramTransport, + }); + + await session.runUntilAbort(); + + // Dirty-rebuild closes transport1 (fire-and-forget via #closeTransportAsync). + // dispose() closes transport2 since it becomes the held transport after the rebuild. 
+ expect(transport1.close).toHaveBeenCalled(); + expect(transport2.close).toHaveBeenCalled(); + }); }); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index a629e7e7f64..c89921dedb3 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -1,4 +1,6 @@ import { type RunOptions, run } from "@grammyjs/runner"; +import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract"; +import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime"; import { computeBackoff, formatDurationPrecise, @@ -57,6 +59,7 @@ type TelegramPollingSessionOpts = { telegramTransport?: TelegramTransport; /** Rebuild Telegram transport after stall/network recovery when marked dirty. */ createTelegramTransport?: () => TelegramTransport; + setStatus?: (patch: Omit) => void; }; export class TelegramPollingSession { @@ -92,24 +95,41 @@ export class TelegramPollingSession { } async runUntilAbort(): Promise { - while (!this.opts.abortSignal?.aborted) { - const bot = await this.#createPollingBot(); - if (!bot) { - continue; - } + this.opts.setStatus?.({ + mode: "polling", + connected: false, + lastConnectedAt: null, + lastEventAt: null, + }); + try { + while (!this.opts.abortSignal?.aborted) { + const bot = await this.#createPollingBot(); + if (!bot) { + continue; + } - const cleanupState = await this.#ensureWebhookCleanup(bot); - if (cleanupState === "retry") { - continue; - } - if (cleanupState === "exit") { - return; - } + const cleanupState = await this.#ensureWebhookCleanup(bot); + if (cleanupState === "retry") { + continue; + } + if (cleanupState === "exit") { + return; + } - const state = await this.#runPollingCycle(bot); - if (state === "exit") { - return; + const state = await this.#runPollingCycle(bot); + if (state === "exit") { + return; + } } + } finally { + // Release the transport's dispatchers on session shutdown. 
Without + // this, the undici keep-alive sockets survive beyond the session and + // leak to api.telegram.org; see openclaw#68128. + await this.#transportState.dispose(); + this.opts.setStatus?.({ + mode: "polling", + connected: false, + }); } } @@ -265,6 +285,12 @@ export class TelegramPollingSession { lastGetUpdatesFinishedAt = finishedAt; lastGetUpdatesDurationMs = finishedAt - startedAt; lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok"; + lastApiActivityAt = finishedAt; + this.opts.setStatus?.({ + ...createConnectedChannelStatusPatch(finishedAt), + mode: "polling", + lastError: null, + }); return result; } catch (err) { const finishedAt = Date.now(); @@ -272,6 +298,7 @@ export class TelegramPollingSession { lastGetUpdatesDurationMs = finishedAt - startedAt; lastGetUpdatesOutcome = "error"; lastGetUpdatesError = formatErrorMessage(err); + lastApiActivityAt = finishedAt; throw err; } finally { inFlightGetUpdates = Math.max(0, inFlightGetUpdates - 1); diff --git a/extensions/telegram/src/polling-transport-state.test.ts b/extensions/telegram/src/polling-transport-state.test.ts new file mode 100644 index 00000000000..df53b5ce60c --- /dev/null +++ b/extensions/telegram/src/polling-transport-state.test.ts @@ -0,0 +1,163 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { TelegramTransport } from "./fetch.js"; +import { TelegramPollingTransportState } from "./polling-transport-state.js"; + +type LogFn = (line: string) => void; +type LogSpy = ReturnType>; + +function makeMockTransport(label = "transport"): TelegramTransport & { + close: ReturnType Promise>>; +} { + return { + fetch: (async () => new Response(`ok-${label}`)) as typeof fetch, + sourceFetch: (async () => new Response(`ok-${label}`)) as typeof fetch, + close: vi.fn<() => Promise>(async () => undefined), + }; +} + +function anyLogMatches(log: LogSpy, fragment: string): boolean { + return log.mock.calls.some((call) => { + const first = call[0]; + 
return typeof first === "string" && first.includes(fragment); + }); +} + +async function flushMicrotasks() { + // The dirty-rebuild path fires close() without awaiting it so the polling + // cycle is not blocked; tests must flush microtasks before asserting on it. + await Promise.resolve(); + await Promise.resolve(); +} + +describe("TelegramPollingTransportState", () => { + let log: LogSpy; + beforeEach(() => { + log = vi.fn(); + }); + + it("returns the initial transport when not dirty", () => { + const initial = makeMockTransport("initial"); + const state = new TelegramPollingTransportState({ + log, + initialTransport: initial, + }); + + const acquired = state.acquireForNextCycle(); + + expect(acquired).toBe(initial); + expect(initial.close).not.toHaveBeenCalled(); + }); + + it("closes the stale transport when a dirty rebuild replaces it", async () => { + const initial = makeMockTransport("initial"); + const rebuilt = makeMockTransport("rebuilt"); + const createTelegramTransport = vi.fn(() => rebuilt); + const state = new TelegramPollingTransportState({ + log, + initialTransport: initial, + createTelegramTransport, + }); + + state.markDirty(); + const acquired = state.acquireForNextCycle(); + + expect(acquired).toBe(rebuilt); + await flushMicrotasks(); + expect(initial.close).toHaveBeenCalledTimes(1); + expect(rebuilt.close).not.toHaveBeenCalled(); + expect(anyLogMatches(log, "closing stale transport")).toBe(true); + }); + + it("does not close when dirty rebuild keeps the same transport instance", async () => { + const initial = makeMockTransport("initial"); + // createTelegramTransport returns the same instance — e.g., factory returned null → fallback to previous + const createTelegramTransport = vi.fn(() => initial); + const state = new TelegramPollingTransportState({ + log, + initialTransport: initial, + createTelegramTransport, + }); + + state.markDirty(); + state.acquireForNextCycle(); + + await flushMicrotasks(); + 
expect(initial.close).not.toHaveBeenCalled(); + }); + + it("dispose() closes the currently-held transport and blocks until close resolves", async () => { + const initial = makeMockTransport("initial"); + const state = new TelegramPollingTransportState({ + log, + initialTransport: initial, + }); + // Force the state to promote the initial transport into the held slot. + state.acquireForNextCycle(); + + let closeResolved = false; + initial.close.mockImplementationOnce(async () => { + await Promise.resolve(); + closeResolved = true; + }); + + await state.dispose(); + + expect(initial.close).toHaveBeenCalledTimes(1); + expect(closeResolved).toBe(true); + }); + + it("dispose() is idempotent and safe with no transport", async () => { + const state = new TelegramPollingTransportState({ log }); + await expect(state.dispose()).resolves.toBeUndefined(); + await expect(state.dispose()).resolves.toBeUndefined(); + }); + + it("dispose() swallows errors thrown by transport.close()", async () => { + const initial = makeMockTransport("initial"); + initial.close.mockRejectedValueOnce(new Error("boom")); + const state = new TelegramPollingTransportState({ + log, + initialTransport: initial, + }); + state.acquireForNextCycle(); + + await expect(state.dispose()).resolves.toBeUndefined(); + expect(anyLogMatches(log, "failed to close transport during dispose")).toBe(true); + }); + + it("acquireForNextCycle() returns undefined after dispose()", async () => { + const initial = makeMockTransport("initial"); + const createTelegramTransport = vi.fn(() => makeMockTransport("rebuilt")); + const state = new TelegramPollingTransportState({ + log, + initialTransport: initial, + createTelegramTransport, + }); + + await state.dispose(); + + const acquired = state.acquireForNextCycle(); + expect(acquired).toBeUndefined(); + expect(createTelegramTransport).not.toHaveBeenCalled(); + }); + + it("clears the dirty flag even when no factory is configured", () => { + const initial = 
makeMockTransport("initial"); + const state = new TelegramPollingTransportState({ + log, + initialTransport: initial, + }); + state.markDirty(); + + const acquired = state.acquireForNextCycle(); + + expect(acquired).toBe(initial); + // Next cycle without markDirty should not trigger another rebuild log. + state.acquireForNextCycle(); + const rebuildLogs = log.mock.calls.filter((call) => { + const line = call[0]; + return typeof line === "string" && line.includes("rebuilding transport"); + }); + expect(rebuildLogs.length).toBeLessThanOrEqual(1); + }); +}); diff --git a/extensions/telegram/src/polling-transport-state.ts b/extensions/telegram/src/polling-transport-state.ts index 1f87556038d..8857df3ca39 100644 --- a/extensions/telegram/src/polling-transport-state.ts +++ b/extensions/telegram/src/polling-transport-state.ts @@ -9,6 +9,7 @@ type TelegramPollingTransportStateOpts = { export class TelegramPollingTransportState { #telegramTransport: TelegramTransport | undefined; #transportDirty = false; + #disposed = false; constructor(private readonly opts: TelegramPollingTransportStateOpts) { this.#telegramTransport = opts.initialTransport; @@ -19,10 +20,23 @@ export class TelegramPollingTransportState { } acquireForNextCycle(): TelegramTransport | undefined { - const shouldCreateTransport = this.#transportDirty || !this.#telegramTransport; + if (this.#disposed) { + return undefined; + } + const previous = this.#telegramTransport; + const shouldCreateTransport = this.#transportDirty || !previous; const nextTransport = shouldCreateTransport - ? (this.opts.createTelegramTransport?.() ?? this.#telegramTransport) - : this.#telegramTransport; + ? (this.opts.createTelegramTransport?.() ?? previous) + : previous; + // When the dirty flag triggered a rebuild, release the old transport's + // dispatchers. 
Without this, each network stall / recoverable error + // leaves a full pool of keep-alive sockets to api.telegram.org dangling + // forever — which over long-running sessions accumulates into the + // hundreds of ESTABLISHED connections that choke per-IP upstream quotas. + if (this.#transportDirty && previous && nextTransport !== previous) { + this.opts.log("[telegram][diag] closing stale transport before rebuild"); + this.#closeTransportAsync(previous, "stale-transport rebuild"); + } if (this.#transportDirty && nextTransport) { this.opts.log("[telegram][diag] rebuilding transport for next polling cycle"); } @@ -30,4 +44,40 @@ export class TelegramPollingTransportState { this.#transportDirty = false; return nextTransport; } + + async dispose(): Promise { + if (this.#disposed) { + return; + } + this.#disposed = true; + const transport = this.#telegramTransport; + this.#telegramTransport = undefined; + if (!transport) { + return; + } + try { + await transport.close(); + } catch (err) { + this.opts.log( + `[telegram][diag] failed to close transport during dispose: ${formatCloseError(err)}`, + ); + } + } + + // Fire-and-forget close used on the rebuild path so the polling cycle is not + // blocked by a slow destroy. The error path is logged but never rethrown. 
+ #closeTransportAsync(transport: TelegramTransport, context: string) { + void transport.close().catch((err) => { + this.opts.log( + `[telegram][diag] failed to close transport (${context}): ${formatCloseError(err)}`, + ); + }); + } +} + +function formatCloseError(err: unknown): string { + if (err instanceof Error) { + return err.message; + } + return String(err); } diff --git a/src/gateway/channel-health-policy.test.ts b/src/gateway/channel-health-policy.test.ts index 359190cd8c6..1656bb8c74e 100644 --- a/src/gateway/channel-health-policy.test.ts +++ b/src/gateway/channel-health-policy.test.ts @@ -136,7 +136,7 @@ describe("evaluateChannelHealth", () => { expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" }); }); - it("skips stale-socket detection for telegram long-polling channels", () => { + it("flags stale sockets for telegram polling channels", () => { const evaluation = evaluateChannelHealth( { running: true, @@ -144,7 +144,49 @@ describe("evaluateChannelHealth", () => { enabled: true, configured: true, lastStartAt: 0, - lastEventAt: null, + lastEventAt: 0, + mode: "polling", + }, + { + channelId: "telegram", + now: 100_000, + channelConnectGraceMs: 10_000, + staleEventThresholdMs: 30_000, + }, + ); + expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" }); + }); + + it("skips stale-socket detection for telegram accounts without explicit polling mode", () => { + const evaluation = evaluateChannelHealth( + { + running: true, + connected: true, + enabled: true, + configured: true, + lastStartAt: 0, + lastEventAt: 0, + }, + { + channelId: "telegram", + now: 100_000, + channelConnectGraceMs: 10_000, + staleEventThresholdMs: 30_000, + }, + ); + expect(evaluation).toEqual({ healthy: true, reason: "healthy" }); + }); + + it("skips stale-socket detection for telegram accounts with malformed mode", () => { + const evaluation = evaluateChannelHealth( + { + running: true, + connected: true, + enabled: true, + configured: true, + 
lastStartAt: 0, + lastEventAt: 0, + mode: { polling: true } as unknown as string, }, { channelId: "telegram", diff --git a/src/gateway/channel-health-policy.ts b/src/gateway/channel-health-policy.ts index f88b00e746f..58233036260 100644 --- a/src/gateway/channel-health-policy.ts +++ b/src/gateway/channel-health-policy.ts @@ -59,6 +59,7 @@ export function evaluateChannelHealth( snapshot: ChannelHealthSnapshot, policy: ChannelHealthPolicy, ): ChannelHealthEvaluation { + const mode = typeof snapshot.mode === "string" ? snapshot.mode.trim().toLowerCase() : undefined; if (!isManagedAccount(snapshot)) { return { healthy: true, reason: "unmanaged" }; } @@ -78,6 +79,10 @@ export function evaluateChannelHealth( typeof snapshot.lastRunActivityAt === "number" && Number.isFinite(snapshot.lastRunActivityAt) ? snapshot.lastRunActivityAt : null; + const lastEventAt = + typeof snapshot.lastEventAt === "number" && Number.isFinite(snapshot.lastEventAt) + ? snapshot.lastEventAt + : null; const busyStateInitializedForLifecycle = lastStartAt == null || (lastRunActivityAt != null && lastRunActivityAt >= lastStartAt); @@ -107,24 +112,23 @@ export function evaluateChannelHealth( if (snapshot.connected === false) { return { healthy: false, reason: "disconnected" }; } - // Skip stale-socket checks for channels that declare this health policy and - // any channel explicitly operating in webhook mode. In these cases, there is - // no persistent outgoing socket that can go half-dead, so the lack of - // incoming events does not necessarily indicate a connection failure. - if ( + // Telegram only has reliable stale-socket liveness in explicit polling mode. + // Webhook accounts and malformed legacy mode values do not have a persistent + // outgoing socket to age-check. 
+ const shouldCheckStaleSocket = policy.skipStaleSocketCheck !== true && - snapshot.mode !== "webhook" && snapshot.connected === true && - snapshot.lastEventAt != null - ) { - if (lastStartAt != null && snapshot.lastEventAt < lastStartAt) { + lastEventAt != null && + (policy.channelId === "telegram" ? mode === "polling" : mode !== "webhook"); + if (shouldCheckStaleSocket) { + if (lastStartAt != null && lastEventAt < lastStartAt) { const lifecycleEventGap = Math.max(0, policy.now - lastStartAt); if (lifecycleEventGap <= policy.staleEventThresholdMs) { return { healthy: true, reason: "healthy" }; } return { healthy: false, reason: "stale-socket" }; } - const eventAge = policy.now - snapshot.lastEventAt; + const eventAge = policy.now - lastEventAt; if (eventAge > policy.staleEventThresholdMs) { return { healthy: false, reason: "stale-socket" }; }