fix(telegram): harden polling transport liveness (#69476)

* fix(telegram): release undici dispatchers via TelegramTransport.close()

TelegramTransport now exposes an explicit close() that destroys every
owned undici dispatcher (default Agent plus lazily-created IPv4 and
IP-pinned fallback Agents) and the TCP sockets they hold. Dispatcher
constructors are also given bounded keep-alive defaults
(keepAliveTimeout, keepAliveMaxTimeout, connections, pipelining) as a
defence-in-depth layer so the pool cannot grow unbounded even if a
caller forgets to call close().

Without this, every transport that went through a fallback retry left
its fallback Agents anchored forever in a closure; long-running polling
sessions accumulated hundreds of ESTABLISHED keep-alive sockets to
api.telegram.org, saturating the per-IP quota on upstream forward
proxies and making the currently-active outbound node time out while
every other node still tested healthy.

Mock dispatchers in fetch.test.ts gain destroy() spies so the close()
chain is assertable. Call sites that built caller-owned transports from
globalThis.fetch (delivery.resolve-media, test helpers) return an async
no-op close(), matching the new required surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(telegram): dispose polling transport on shutdown and dirty rebuild

Every recoverable network error and stall-watchdog trip sets
TelegramPollingTransportState.#transportDirty so the next polling
cycle rebuilds the transport inside acquireForNextCycle(). Previously
the rebuild simply overwrote the field, leaving the old transport's
keep-alive sockets anchored in the now-unreferenced dispatcher — the
polling loop has no natural GC point for these resources, and Node's
object GC never touches OS-level sockets.

acquireForNextCycle() now closes the previous transport (fire-and-
forget so the polling cycle is not blocked by a slow destroy) before
swapping in the rebuilt one. dispose() is a new method that the owning
TelegramPollingSession calls from the finally block of runUntilAbort(),
so a single transport is always tied to a single polling session
lifetime. After dispose(), acquireForNextCycle() returns undefined to
prevent zombie rebuilds.

Under high sustained polling traffic over long-lived sessions, this is
what stops the per-gateway connection count to api.telegram.org from
growing indefinitely and saturating upstream proxy quotas.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(changelog): note Telegram undici dispatcher lifecycle fix

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(telegram): disable HTTP/2 for all Telegram polling dispatchers

Undici 8 enables HTTP/2 ALPN by default, but Telegram's long-polling
connections stall on Windows due to IPv6 + H2 multiplexing issues. The
core fetch-guard already sets allowH2:false for guarded paths, but the
Telegram extension creates its own Agent/ProxyAgent/EnvHttpProxyAgent
instances directly from undici without this flag.

Apply allowH2:false to all dispatcher constructors in the Telegram
transport layer, matching the approach used in src/infra/net/undici-runtime.ts.

Fixes #66885

* fix: avoid false telegram polling stall restarts

* fix(telegram): publish polling health liveness

---------

Co-authored-by: Ethan Chen <ethanbit@qq.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Magicray1217 <magicray1217@users.noreply.github.com>
Co-authored-by: aoao <aoao@openclaw>
This commit is contained in:
Peter Steinberger
2026-04-20 23:03:57 +01:00
committed by GitHub
parent 250c756fb4
commit 60fea81cf1
18 changed files with 823 additions and 75 deletions

View File

@@ -111,6 +111,8 @@ Docs: https://docs.openclaw.ai
- Plugins/discovery: reuse bundled and global plugin discovery results across workspace cache misses so Windows multi-workspace startup stops redoing the shared synchronous scan. (#67940) Thanks @obviyus.
- Bundled plugins/install: keep staged bundled plugin runtime imports resolving through the packaged Plugin SDK while omitting checkout-only aliases from the dist inventory, so published installs do not fail on repo-local paths.
- Plugins/webhooks: enforce synchronous plugin registration with full rollback of failed plugin side effects, and cache SecretRef-backed webhook auth per route so plugin startup and inbound webhook auth stay deterministic. (#67941) Thanks @obviyus.
- Telegram/polling transport: give the Telegram undici dispatcher pool bounded keep-alive defaults and an explicit lifecycle. Previously every recoverable network error and stall watchdog trip silently replaced the transport, abandoning the old dispatcher pool and its sockets; long-running gateway processes accumulated hundreds of ESTABLISHED connections to `api.telegram.org`, saturating per-IP upstream proxy quotas and causing the actively-used outbound proxy node to time out while every other node still tested healthy. Transports now expose `close()`, `TelegramPollingTransportState` destroys the stale transport on dirty-rebuild, and `TelegramPollingSession` disposes the transport when polling exits — backed by a strict per-origin pool cap on every constructed `Agent`, `ProxyAgent`, and `EnvHttpProxyAgent` as defence in depth.
- Telegram/polling: publish successful `getUpdates` calls as account health liveness, avoid false stall restarts after recoverable `getUpdates` errors, and force Telegram API dispatchers to HTTP/1.1 so stalled polling recovers instead of sitting connected-but-dead.
- Telegram/ACP bindings: drop persisted DM bindings that still point at missing or failed ACP sessions on restart, while preserving plugin-owned bindings and uncertain store reads. (#67822) Thanks @chinar-amrutkar.
- Telegram/streaming: keep a transient preview on the same Telegram message when auto-compaction retries an in-flight answer, so streamed replies no longer appear duplicated after compaction. (#66939) Thanks @rubencu.
- Memory/sqlite-vec: emit the degraded sqlite-vec warning once per degraded episode instead of repeating it for every file write, while preserving the latch across safe-reindex rollback and resetting it when vector state is genuinely rebuilt. (#67898) Thanks @rubencu.

View File

@@ -917,6 +917,7 @@ Per-account, per-group, and per-topic overrides are supported (same inheritance
- Node 22+ + custom fetch/proxy can trigger immediate abort behavior if AbortSignal types mismatch.
- Some hosts resolve `api.telegram.org` to IPv6 first; broken IPv6 egress can cause intermittent Telegram API failures.
- If logs include `TypeError: fetch failed` or `Network request for 'getUpdates' failed!`, OpenClaw now retries these as recoverable network errors.
- If logs include `Polling stall detected`, OpenClaw restarts polling and rebuilds the Telegram transport. Persistent stalls usually point to proxy, DNS, IPv6, or TLS egress issues between the host and `api.telegram.org`.
- On VPS hosts with unstable direct egress/TLS, route Telegram API calls through `channels.telegram.proxy`:
```yaml

View File

@@ -45,13 +45,14 @@ Full troubleshooting: [/channels/whatsapp#troubleshooting](/channels/whatsapp#tr
### Telegram failure signatures
| Symptom | Fastest check | Fix |
| ----------------------------------- | ----------------------------------------------- | --------------------------------------------------------------------------- |
| `/start` but no usable reply flow | `openclaw pairing list telegram` | Approve pairing or change DM policy. |
| Bot online but group stays silent | Verify mention requirement and bot privacy mode | Disable privacy mode for group visibility or mention bot. |
| Send failures with network errors | Inspect logs for Telegram API call failures | Fix DNS/IPv6/proxy routing to `api.telegram.org`. |
| `setMyCommands` rejected at startup | Inspect logs for `BOT_COMMANDS_TOO_MUCH` | Reduce plugin/skill/custom Telegram commands or disable native menus. |
| Upgraded and allowlist blocks you | `openclaw security audit` and config allowlists | Run `openclaw doctor --fix` or replace `@username` with numeric sender IDs. |
| Symptom | Fastest check | Fix |
| ----------------------------------- | ------------------------------------------------ | --------------------------------------------------------------------------- |
| `/start` but no usable reply flow | `openclaw pairing list telegram` | Approve pairing or change DM policy. |
| Bot online but group stays silent | Verify mention requirement and bot privacy mode | Disable privacy mode for group visibility or mention bot. |
| Send failures with network errors | Inspect logs for Telegram API call failures | Fix DNS/IPv6/proxy routing to `api.telegram.org`. |
| Polling stalls or reconnects slowly | `openclaw logs --follow` for polling diagnostics | Upgrade, then check proxy/DNS/IPv6 if `getUpdates` keeps timing out. |
| `setMyCommands` rejected at startup | Inspect logs for `BOT_COMMANDS_TOO_MUCH` | Reduce plugin/skill/custom Telegram commands or disable native menus. |
| Upgraded and allowlist blocks you | `openclaw security audit` and config allowlists | Run `openclaw doctor --fix` or replace `@username` with numeric sender IDs. |
Full troubleshooting: [/channels/telegram#troubleshooting](/channels/telegram#troubleshooting)

View File

@@ -1334,6 +1334,7 @@ describe("createTelegramBot", () => {
telegramTransport: {
fetch: mediaFetch as typeof fetch,
sourceFetch: mediaFetch as typeof fetch,
close: async () => {},
},
});
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;
@@ -1447,6 +1448,7 @@ describe("createTelegramBot", () => {
telegramTransport: {
fetch: mediaFetch as typeof fetch,
sourceFetch: mediaFetch as typeof fetch,
close: async () => {},
},
});
const handler = getOnHandler("message") as (ctx: Record<string, unknown>) => Promise<void>;

View File

@@ -345,6 +345,7 @@ describe("resolveMedia getFile retry", () => {
fetch: callerFetch,
sourceFetch: callerFetch,
dispatcherAttempts,
close: async () => {},
};
fetchRemoteMedia.mockResolvedValueOnce({
buffer: Buffer.from("pdf-data"),
@@ -379,7 +380,7 @@ describe("resolveMedia getFile retry", () => {
it("uses caller-provided fetch impl for sticker downloads", async () => {
const getFile = vi.fn().mockResolvedValue({ file_path: "stickers/file_0.webp" });
const callerFetch = vi.fn() as unknown as typeof fetch;
const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch };
const callerTransport = { fetch: callerFetch, sourceFetch: callerFetch, close: async () => {} };
fetchRemoteMedia.mockResolvedValueOnce({
buffer: Buffer.from("sticker-data"),
contentType: "image/webp",

View File

@@ -151,6 +151,9 @@ function resolveRequiredTelegramTransport(transport?: TelegramTransport): Telegr
return {
fetch: resolvedFetch,
sourceFetch: resolvedFetch,
// Caller-owned transport constructed from the globalThis fetch — it owns
// no dispatcher lifecycle of its own, so close() is a no-op.
close: async () => {},
};
}

View File

@@ -5,6 +5,7 @@ import {
} from "openclaw/plugin-sdk/allowlist-config-edit";
import type { ChannelMessageActionAdapter } from "openclaw/plugin-sdk/channel-contract";
import { clearAccountEntryFields, createChatChannelPlugin } from "openclaw/plugin-sdk/channel-core";
import { createAccountStatusSink } from "openclaw/plugin-sdk/channel-lifecycle";
import { createPairingPrefixStripper } from "openclaw/plugin-sdk/channel-pairing";
import { createAllowlistProviderRouteAllowlistWarningCollector } from "openclaw/plugin-sdk/channel-policy";
import { attachChannelToResult } from "openclaw/plugin-sdk/channel-send-result";
@@ -766,7 +767,6 @@ export const telegramPlugin = createChatChannelPlugin({
actions: telegramMessageActions,
status: createComputedAccountStatusAdapter<ResolvedTelegramAccount, TelegramProbe>({
defaultRuntime: createDefaultChannelRuntimeState(DEFAULT_ACCOUNT_ID),
skipStaleSocketHealthCheck: true,
collectStatusIssues: collectTelegramStatusIssues,
buildChannelSummary: ({ snapshot }) => buildTokenChannelStatusSummary(snapshot),
probeAccount: async ({ account, timeoutMs }) =>
@@ -902,6 +902,10 @@ export const telegramPlugin = createChatChannelPlugin({
}
}
ctx.log?.info(`[${account.accountId}] starting provider${telegramBotLabel}`);
const setStatus = createAccountStatusSink({
accountId: account.accountId,
setStatus: ctx.setStatus,
});
return resolveTelegramMonitor()({
token,
accountId: account.accountId,
@@ -916,6 +920,7 @@ export const telegramPlugin = createChatChannelPlugin({
webhookHost: account.config.webhookHost,
webhookPort: account.config.webhookPort,
webhookCertPath: account.config.webhookCertPath,
setStatus,
});
},
logoutAccount: async ({ accountId, cfg }) => {

View File

@@ -7,28 +7,37 @@ const loggerDebug = vi.hoisted(() => vi.fn());
const undiciFetch = vi.hoisted(() => vi.fn());
const setGlobalDispatcher = vi.hoisted(() => vi.fn());
type MockDispatcherInstance = {
options?: Record<string, unknown> | string;
destroy: ReturnType<typeof vi.fn>;
close: ReturnType<typeof vi.fn>;
};
const AgentCtor = vi.hoisted(() =>
vi.fn(function MockAgent(
this: { options?: Record<string, unknown> },
options?: Record<string, unknown>,
) {
vi.fn(function MockAgent(this: MockDispatcherInstance, options?: Record<string, unknown>) {
this.options = options;
this.destroy = vi.fn(async () => undefined);
this.close = vi.fn(async () => undefined);
}),
);
const EnvHttpProxyAgentCtor = vi.hoisted(() =>
vi.fn(function MockEnvHttpProxyAgent(
this: { options?: Record<string, unknown> },
this: MockDispatcherInstance,
options?: Record<string, unknown>,
) {
this.options = options;
this.destroy = vi.fn(async () => undefined);
this.close = vi.fn(async () => undefined);
}),
);
const ProxyAgentCtor = vi.hoisted(() =>
vi.fn(function MockProxyAgent(
this: { options?: Record<string, unknown> | string },
this: MockDispatcherInstance,
options?: Record<string, unknown> | string,
) {
this.options = options;
this.destroy = vi.fn(async () => undefined);
this.close = vi.fn(async () => undefined);
}),
);
@@ -140,6 +149,7 @@ function getDispatcherFromUndiciCall(nth: number) {
return init?.dispatcher as
| {
options?: {
allowH2?: boolean;
connect?: Record<string, unknown>;
proxyTls?: Record<string, unknown>;
requestTls?: Record<string, unknown>;
@@ -186,6 +196,7 @@ function expectStickyAutoSelectDispatcher(
dispatcher:
| {
options?: {
allowH2?: boolean;
connect?: Record<string, unknown>;
proxyTls?: Record<string, unknown>;
requestTls?: Record<string, unknown>;
@@ -202,6 +213,22 @@ function expectStickyAutoSelectDispatcher(
);
}
function expectHttp1OnlyDispatcher(
dispatcher:
| {
options?: {
allowH2?: boolean;
};
}
| undefined,
): void {
expect(dispatcher?.options).toEqual(
expect.objectContaining({
allowH2: false,
}),
);
}
function expectPinnedIpv4ConnectDispatcher(args: {
pinnedCall: number;
firstCall?: number;
@@ -319,6 +346,7 @@ describe("resolveTelegramFetch", () => {
const dispatcher = getDispatcherFromUndiciCall(1);
expect(dispatcher).toBeDefined();
expectHttp1OnlyDispatcher(dispatcher);
expect(dispatcher?.options?.connect).toEqual(
expect.objectContaining({
autoSelectFamily: true,
@@ -354,6 +382,7 @@ describe("resolveTelegramFetch", () => {
expect(AgentCtor).not.toHaveBeenCalled();
const dispatcher = getDispatcherFromUndiciCall(1);
expectHttp1OnlyDispatcher(dispatcher);
expect(dispatcher?.options?.connect).toEqual(
expect.objectContaining({
autoSelectFamily: false,
@@ -379,6 +408,7 @@ describe("resolveTelegramFetch", () => {
expect(ProxyAgentCtor).toHaveBeenCalledTimes(1);
expect(ProxyAgentCtor).toHaveBeenCalledWith(
expect.objectContaining({
allowH2: false,
uri: "http://127.0.0.1:7777",
}),
);
@@ -398,6 +428,7 @@ describe("resolveTelegramFetch", () => {
await resolved("https://api.telegram.org/botx/getMe");
const dispatcher = getDispatcherFromUndiciCall(1);
expectHttp1OnlyDispatcher(dispatcher);
expect(dispatcher?.options?.connect).toEqual(
expect.objectContaining({
autoSelectFamily: true,
@@ -431,6 +462,7 @@ describe("resolveTelegramFetch", () => {
expect(EnvHttpProxyAgentCtor).not.toHaveBeenCalled();
expect(AgentCtor).not.toHaveBeenCalled();
const dispatcher = getDispatcherFromUndiciCall(1);
expectHttp1OnlyDispatcher(dispatcher);
expect(dispatcher?.options).toEqual(
expect.objectContaining({
uri: "http://127.0.0.1:7890",
@@ -828,4 +860,88 @@ describe("resolveTelegramFetch", () => {
expect(setDefaultResultOrder).not.toHaveBeenCalled();
expect(setDefaultAutoSelectFamily).not.toHaveBeenCalled();
});
describe("transport lifecycle", () => {
it("passes a bounded keep-alive pool configuration to every constructed dispatcher", () => {
resolveTelegramTransport(undefined, {
network: {
autoSelectFamily: true,
dnsResultOrder: "ipv4first",
},
});
// One direct Agent for the default dispatcher plus two lazy fallbacks not yet touched.
expect(AgentCtor).toHaveBeenCalledTimes(1);
const defaultAgent = AgentCtor.mock.instances[0]?.options;
expect(defaultAgent).toEqual(
expect.objectContaining({
allowH2: false,
keepAliveTimeout: expect.any(Number),
keepAliveMaxTimeout: expect.any(Number),
connections: expect.any(Number),
pipelining: expect.any(Number),
}),
);
const connections = (defaultAgent as { connections?: number }).connections;
expect(connections).toBeGreaterThan(0);
expect(connections).toBeLessThan(100);
});
it("close() destroys the default dispatcher and all lazily-created fallback dispatchers", async () => {
undiciFetch
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"))
.mockRejectedValueOnce(buildFetchFallbackError("EHOSTUNREACH"))
.mockResolvedValueOnce({ ok: true } as Response);
const transport = resolveTelegramTransport(undefined, {
network: {
autoSelectFamily: true,
dnsResultOrder: "ipv4first",
},
});
// Trigger fallback chain so the two lazy fallback dispatchers are instantiated.
await transport.fetch("https://api.telegram.org/botx/getMe");
// Three Agents total: default + IPv4 fallback + pinned-IP fallback.
expect(AgentCtor).toHaveBeenCalledTimes(3);
const instances = AgentCtor.mock.instances;
expect(instances).toHaveLength(3);
await transport.close();
for (const instance of instances) {
expect(instance.destroy).toHaveBeenCalledTimes(1);
}
});
it("close() is idempotent", async () => {
const transport = resolveTelegramTransport(undefined, {
network: {
autoSelectFamily: true,
dnsResultOrder: "ipv4first",
},
});
const instance = AgentCtor.mock.instances[0];
await transport.close();
await transport.close();
await transport.close();
expect(instance.destroy).toHaveBeenCalledTimes(1);
});
it("close() swallows dispatcher destroy failures so callers can safely fire-and-forget", async () => {
const transport = resolveTelegramTransport(undefined, {
network: {
autoSelectFamily: true,
dnsResultOrder: "ipv4first",
},
});
const instance = AgentCtor.mock.instances[0];
instance.destroy.mockRejectedValueOnce(new Error("already destroyed"));
await expect(transport.close()).resolves.toBeUndefined();
});
});
});

View File

@@ -28,6 +28,36 @@ const TELEGRAM_AUTO_SELECT_FAMILY_ATTEMPT_TIMEOUT_MS = 300;
const TELEGRAM_API_HOSTNAME = "api.telegram.org";
const TELEGRAM_FALLBACK_IPS: readonly string[] = ["149.154.167.220"];
// Dispatcher defaults that bound the per-origin connection pool. Telegram long
// polling keeps a handful of connections hot for hours, so the defaults must be
// strict enough that (a) idle sockets are closed even when the pool is still
// actively used and (b) the pool itself cannot grow unbounded under transient
// concurrency spikes. These values are a defence-in-depth layer; the primary
// fix for the leak observed in openclaw#68128 is the transport lifecycle that
// calls `close()` on abandoned dispatchers.
const TELEGRAM_DISPATCHER_KEEP_ALIVE_TIMEOUT_MS = 30_000;
const TELEGRAM_DISPATCHER_KEEP_ALIVE_MAX_TIMEOUT_MS = 600_000;
const TELEGRAM_DISPATCHER_CONNECTIONS_PER_ORIGIN = 10;
const TELEGRAM_DISPATCHER_PIPELINING = 1;
type TelegramAgentPoolOptions = {
allowH2: false;
keepAliveTimeout: number;
keepAliveMaxTimeout: number;
connections: number;
pipelining: number;
};
function telegramAgentPoolOptions(): TelegramAgentPoolOptions {
return {
allowH2: false,
keepAliveTimeout: TELEGRAM_DISPATCHER_KEEP_ALIVE_TIMEOUT_MS,
keepAliveMaxTimeout: TELEGRAM_DISPATCHER_KEEP_ALIVE_MAX_TIMEOUT_MS,
connections: TELEGRAM_DISPATCHER_CONNECTIONS_PER_ORIGIN,
pipelining: TELEGRAM_DISPATCHER_PIPELINING,
};
}
type RequestInitWithDispatcher = RequestInit & {
dispatcher?: unknown;
};
@@ -265,14 +295,18 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
mode: TelegramDispatcherMode;
effectivePolicy: PinnedDispatcherPolicy;
} {
// Telegram polling uses long-lived connections. Undici 8 enables HTTP/2 ALPN
// by default, which can stall Telegram long-polling on Windows/IPv6 networks.
// Force HTTP/1.1 for every dispatcher while keeping bounded pool defaults.
const poolOptions = telegramAgentPoolOptions();
if (policy.mode === "explicit-proxy") {
const requestTlsOptions = withPinnedLookup(policy.proxyTls, policy.pinnedHostname);
const proxyOptions = requestTlsOptions
? ({
uri: policy.proxyUrl,
requestTls: requestTlsOptions,
} satisfies ConstructorParameters<typeof ProxyAgent>[0])
: policy.proxyUrl;
const proxyOptions = {
uri: policy.proxyUrl,
...poolOptions,
...(requestTlsOptions ? { requestTls: requestTlsOptions } : {}),
} satisfies ConstructorParameters<typeof ProxyAgent>[0];
try {
return {
dispatcher: new ProxyAgent(proxyOptions),
@@ -288,13 +322,11 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
if (policy.mode === "env-proxy") {
const connectOptions = withPinnedLookup(policy.connect, policy.pinnedHostname);
const proxyTlsOptions = withPinnedLookup(policy.proxyTls, policy.pinnedHostname);
const proxyOptions =
connectOptions || proxyTlsOptions
? ({
...(connectOptions ? { connect: connectOptions } : {}),
...(proxyTlsOptions ? { proxyTls: proxyTlsOptions } : {}),
} satisfies ConstructorParameters<typeof EnvHttpProxyAgent>[0])
: undefined;
const proxyOptions = {
...poolOptions,
...(connectOptions ? { connect: connectOptions } : {}),
...(proxyTlsOptions ? { proxyTls: proxyTlsOptions } : {}),
} satisfies ConstructorParameters<typeof EnvHttpProxyAgent>[0];
try {
return {
dispatcher: new EnvHttpProxyAgent(proxyOptions),
@@ -310,11 +342,10 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
...(connectOptions ? { connect: connectOptions } : {}),
};
return {
dispatcher: new Agent(
directPolicy.connect
? ({ connect: directPolicy.connect } satisfies ConstructorParameters<typeof Agent>[0])
: undefined,
),
dispatcher: new Agent({
...poolOptions,
...(directPolicy.connect ? { connect: directPolicy.connect } : {}),
} satisfies ConstructorParameters<typeof Agent>[0]),
mode: "direct",
effectivePolicy: directPolicy,
};
@@ -323,13 +354,10 @@ function createTelegramDispatcher(policy: PinnedDispatcherPolicy): {
const connectOptions = withPinnedLookup(policy.connect, policy.pinnedHostname);
return {
dispatcher: new Agent(
connectOptions
? ({
connect: connectOptions,
} satisfies ConstructorParameters<typeof Agent>[0])
: undefined,
),
dispatcher: new Agent({
...poolOptions,
...(connectOptions ? { connect: connectOptions } : {}),
} satisfies ConstructorParameters<typeof Agent>[0]),
mode: "direct",
effectivePolicy: policy,
};
@@ -429,13 +457,29 @@ export type TelegramTransport = {
fetch: typeof fetch;
sourceFetch: typeof fetch;
dispatcherAttempts?: TelegramDispatcherAttempt[];
/**
* Release all dispatchers owned by this transport and the TCP sockets they
* hold. Safe to call multiple times; subsequent calls resolve immediately.
*
* Callers that pass their own `proxyFetch` own the underlying dispatcher
* lifecycle themselves and this is effectively a no-op. Callers that let
* this module construct the transport MUST invoke `close()` when the
* transport is no longer needed (e.g. on polling session dispose or when
* swapping transports after a network stall); otherwise undici keeps the
* keep-alive sockets open indefinitely, leaking hundreds of connections
* to api.telegram.org over long-running sessions.
*/
close(): Promise<void>;
};
function createTelegramTransportAttempts(params: {
defaultDispatcher: ReturnType<typeof createTelegramDispatcher>;
allowFallback: boolean;
fallbackPolicy?: PinnedDispatcherPolicy;
ownedDispatchers: Set<TelegramDispatcher>;
}): TelegramTransportAttempt[] {
params.ownedDispatchers.add(params.defaultDispatcher.dispatcher);
const attempts: TelegramTransportAttempt[] = [
{
createDispatcher: () => params.defaultDispatcher.dispatcher,
@@ -447,12 +491,14 @@ function createTelegramTransportAttempts(params: {
return attempts;
}
const fallbackPolicy = params.fallbackPolicy;
const ownedDispatchers = params.ownedDispatchers;
let ipv4Dispatcher: TelegramDispatcher | null = null;
attempts.push({
createDispatcher: () => {
if (!ipv4Dispatcher) {
ipv4Dispatcher = createTelegramDispatcher(fallbackPolicy).dispatcher;
ownedDispatchers.add(ipv4Dispatcher);
}
return ipv4Dispatcher;
},
@@ -476,6 +522,7 @@ function createTelegramTransportAttempts(params: {
createDispatcher: () => {
if (!fallbackIpDispatcher) {
fallbackIpDispatcher = createTelegramDispatcher(fallbackIpPolicy).dispatcher;
ownedDispatchers.add(fallbackIpDispatcher);
}
return fallbackIpDispatcher;
},
@@ -486,6 +533,23 @@ function createTelegramTransportAttempts(params: {
return attempts;
}
async function destroyOwnedDispatchers(dispatchers: Iterable<TelegramDispatcher>): Promise<void> {
// Use destroy() rather than close() so abandoned sockets are released
// immediately without waiting for in-flight requests that the caller has
// already decided to abandon (session aborted, or stale transport being
// replaced after a stall). The per-dispatcher try/catch isolates failures
// (already-destroyed dispatchers throw) so Promise.all never rejects.
await Promise.all(
[...dispatchers].map(async (dispatcher) => {
try {
await dispatcher.destroy();
} catch {
// Intentionally ignored: dispatcher may already be destroyed.
}
}),
);
}
export function resolveTelegramTransport(
proxyFetch?: typeof fetch,
options?: { network?: TelegramNetworkConfig },
@@ -518,7 +582,8 @@ export function resolveTelegramTransport(
: undiciSourceFetch;
const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value);
if (effectiveProxyFetch && !explicitProxyUrl) {
return { fetch: sourceFetch, sourceFetch };
// The caller owns the underlying dispatcher lifecycle; nothing to close here.
return { fetch: sourceFetch, sourceFetch, close: async () => {} };
}
const useEnvProxy = !explicitProxyUrl && hasEnvHttpProxyForTelegramApi();
@@ -543,10 +608,12 @@ export function resolveTelegramTransport(
proxyUrl: explicitProxyUrl,
}).policy
: undefined;
const ownedDispatchers = new Set<TelegramDispatcher>();
const transportAttempts = createTelegramTransportAttempts({
defaultDispatcher,
allowFallback: allowStickyFallback,
fallbackPolicy: fallbackDispatcherPolicy,
ownedDispatchers,
});
let stickyAttemptIndex = 0;
@@ -615,10 +682,22 @@ export function resolveTelegramTransport(
throw err;
}) as typeof fetch;
let closed = false;
const close = async (): Promise<void> => {
if (closed) {
return;
}
closed = true;
const toDestroy = [...ownedDispatchers];
ownedDispatchers.clear();
await destroyOwnedDispatchers(toDestroy);
};
return {
fetch: resolvedFetch,
sourceFetch,
dispatcherAttempts: transportAttempts.map((attempt) => attempt.exportAttempt),
close,
};
}

View File

@@ -87,6 +87,7 @@ const { resolveTelegramTransportSpy } = vi.hoisted(() => ({
resolveTelegramTransportSpy: vi.fn(() => ({
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
})),
}));
@@ -358,6 +359,7 @@ describe("monitorTelegramProvider (grammY)", () => {
resolveTelegramTransportSpy.mockReset().mockImplementation(() => ({
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
}));
registerUnhandledRejectionHandlerMock.mockClear();
resetUnhandledRejection();
@@ -565,10 +567,12 @@ describe("monitorTelegramProvider (grammY)", () => {
const telegramTransport = {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
const rebuiltTransport = {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
resolveTelegramTransportSpy
.mockReturnValueOnce(telegramTransport)
@@ -600,10 +604,12 @@ describe("monitorTelegramProvider (grammY)", () => {
const telegramTransport = {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
const rebuiltTransport = {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
resolveTelegramTransportSpy
.mockReturnValueOnce(telegramTransport)
@@ -760,6 +766,7 @@ describe("monitorTelegramProvider (grammY)", () => {
const telegramTransport = {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
resolveTelegramTransportSpy.mockReturnValueOnce(telegramTransport);

View File

@@ -227,6 +227,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
log,
telegramTransport,
createTelegramTransport: createTelegramTransportForPolling,
setStatus: opts.setStatus,
});
await pollingSession.runUntilAbort();
} finally {

View File

@@ -1,4 +1,7 @@
import type { ChannelRuntimeSurface } from "openclaw/plugin-sdk/channel-contract";
import type {
ChannelAccountSnapshot,
ChannelRuntimeSurface,
} from "openclaw/plugin-sdk/channel-contract";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
@@ -17,6 +20,7 @@ export type MonitorTelegramOpts = {
proxyFetch?: typeof fetch;
webhookUrl?: string;
webhookCertPath?: string;
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
};
export type TelegramMonitorFn = (opts?: MonitorTelegramOpts) => Promise<void>;

View File

@@ -1,3 +1,4 @@
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const runMock = vi.hoisted(() => vi.fn());
@@ -97,7 +98,11 @@ function expectTelegramBotTransportSequence(firstTransport: unknown, secondTrans
}
function makeTelegramTransport() {
return { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
return {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
}
function mockRestartAfterPollingError(error: unknown, abort: AbortController) {
@@ -136,6 +141,7 @@ function createPollingSession(params: {
log?: (message: string) => void;
telegramTransport?: ReturnType<typeof makeTelegramTransport>;
createTelegramTransport?: () => ReturnType<typeof makeTelegramTransport>;
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
}) {
return new TelegramPollingSession({
token: "tok",
@@ -149,6 +155,7 @@ function createPollingSession(params: {
persistUpdateId: async () => undefined,
log: params.log ?? (() => undefined),
telegramTransport: params.telegramTransport,
setStatus: params.setStatus,
...(params.createTelegramTransport
? { createTelegramTransport: params.createTelegramTransport }
: {}),
@@ -188,6 +195,19 @@ function mockLongRunningPollingCycle(runnerStop: AsyncVoidFn) {
return () => firstTaskResolve?.();
}
async function waitForApiMiddleware(
getApiMiddleware: () => TelegramApiMiddleware | undefined,
): Promise<TelegramApiMiddleware> {
for (let attempt = 0; attempt < 20; attempt += 1) {
const apiMiddleware = getApiMiddleware();
if (apiMiddleware) {
return apiMiddleware;
}
await Promise.resolve();
}
throw new Error("Telegram API middleware was not installed");
}
describe("TelegramPollingSession", () => {
beforeAll(async () => {
({ TelegramPollingSession } = await import("./polling-session.js"));
@@ -366,8 +386,16 @@ describe("TelegramPollingSession", () => {
const watchdogHarness = installPollingStallWatchdogHarness();
const transport1 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
const transport2 = { fetch: globalThis.fetch, sourceFetch: globalThis.fetch };
const transport1 = {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
const transport2 = {
fetch: globalThis.fetch,
sourceFetch: globalThis.fetch,
close: vi.fn(async () => undefined),
};
const createTelegramTransport = vi.fn(() => transport2);
try {
@@ -420,6 +448,171 @@ describe("TelegramPollingSession", () => {
expect(createTelegramTransport).toHaveBeenCalledTimes(1);
});
it("does not trigger stall restart shortly after a getUpdates error", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
const getApiMiddleware = mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
const watchdogHarness = installPollingStallWatchdogHarness([0, 0, 1, 30_000], 119_999);
const log = vi.fn();
const session = createPollingSession({
abortSignal: abort.signal,
log,
});
try {
const runPromise = session.runUntilAbort();
const watchdog = await watchdogHarness.waitForWatchdog();
const apiMiddleware = getApiMiddleware();
if (apiMiddleware) {
const failedGetUpdates = vi.fn(async () => {
throw new Error("Network request for 'getUpdates' failed!");
});
await expect(apiMiddleware(failedGetUpdates, "getUpdates", { offset: 1 })).rejects.toThrow(
"Network request for 'getUpdates' failed!",
);
}
watchdog?.();
expect(runnerStop).not.toHaveBeenCalled();
expect(botStop).not.toHaveBeenCalled();
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
abort.abort();
resolveFirstTask();
await runPromise;
} finally {
watchdogHarness.restore();
}
});
it("publishes polling liveness after getUpdates succeeds", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
const runnerStop = vi.fn(async () => undefined);
const setStatus = vi.fn();
const getApiMiddleware = mockBotCapturingApiMiddleware(botStop);
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
const session = createPollingSession({
abortSignal: abort.signal,
setStatus,
});
const runPromise = session.runUntilAbort();
const apiMiddleware = await waitForApiMiddleware(getApiMiddleware);
const fakeGetUpdates = vi.fn(async () => []);
await apiMiddleware(fakeGetUpdates, "getUpdates", { offset: 1 });
expect(setStatus).toHaveBeenCalledWith({
mode: "polling",
connected: false,
lastConnectedAt: null,
lastEventAt: null,
});
const connectedPatch = setStatus.mock.calls.find(
([patch]) => (patch as Record<string, unknown>).connected === true,
)?.[0] as Record<string, unknown> | undefined;
expect(connectedPatch).toMatchObject({
connected: true,
mode: "polling",
lastConnectedAt: expect.any(Number),
lastEventAt: expect.any(Number),
lastError: null,
});
expect(connectedPatch?.lastConnectedAt).toBe(connectedPatch?.lastEventAt);
abort.abort();
resolveFirstTask();
await runPromise;
expect(setStatus).toHaveBeenLastCalledWith({
mode: "polling",
connected: false,
});
});
it("keeps polling marked connected across recoverable restart cycles", async () => {
const abort = new AbortController();
const recoverableError = new Error("recoverable polling error");
const setStatus = vi.fn();
let apiMiddleware: TelegramApiMiddleware | undefined;
const bot = {
api: {
deleteWebhook: vi.fn(async () => true),
getUpdates: vi.fn(async () => []),
config: {
use: vi.fn((fn: TelegramApiMiddleware) => {
apiMiddleware = fn;
}),
},
},
stop: vi.fn(async () => undefined),
};
createTelegramBotMock.mockReturnValue(bot);
let cycle = 0;
runMock.mockImplementation(() => {
cycle += 1;
if (cycle === 1) {
return {
task: async () => {
const middleware = apiMiddleware;
if (!middleware) {
throw new Error("Telegram API middleware was not installed");
}
await middleware(
vi.fn(async () => []),
"getUpdates",
{ offset: 1 },
);
throw recoverableError;
},
stop: vi.fn(async () => undefined),
isRunning: () => false,
};
}
return {
task: async () => {
abort.abort();
},
stop: vi.fn(async () => undefined),
isRunning: () => false,
};
});
const session = createPollingSession({
abortSignal: abort.signal,
setStatus,
});
await session.runUntilAbort();
expect(runMock).toHaveBeenCalledTimes(2);
expect(setStatus).toHaveBeenCalledWith(
expect.objectContaining({ connected: true, mode: "polling" }),
);
const disconnectedPatches = setStatus.mock.calls.filter(
([patch]) => (patch as Record<string, unknown>).connected === false,
);
expect(disconnectedPatches).toHaveLength(2);
expect(disconnectedPatches[0]?.[0]).toMatchObject({
mode: "polling",
lastConnectedAt: null,
lastEventAt: null,
});
expect(disconnectedPatches[1]?.[0]).toEqual({
mode: "polling",
connected: false,
});
});
it("does not trigger stall restart when non-getUpdates API calls are active", async () => {
const abort = new AbortController();
const botStop = vi.fn(async () => undefined);
@@ -668,4 +861,51 @@ describe("TelegramPollingSession", () => {
expectTelegramBotTransportSequence(transport1, transport1);
expect(createTelegramTransport).not.toHaveBeenCalled();
});
it("closes the transport once when runUntilAbort exits normally", async () => {
const abort = new AbortController();
const transport = makeTelegramTransport();
createTelegramBotMock.mockReturnValueOnce(makeBot());
runMock.mockReturnValueOnce({
task: async () => {
abort.abort();
},
stop: vi.fn(async () => undefined),
isRunning: () => false,
});
const session = createPollingSession({
abortSignal: abort.signal,
telegramTransport: transport,
});
await session.runUntilAbort();
expect(transport.close).toHaveBeenCalledTimes(1);
});
it("closes the stale transport when a rebuild replaces it", async () => {
const abort = new AbortController();
const recoverableError = new Error("recoverable polling error");
const transport1 = makeTelegramTransport();
const transport2 = makeTelegramTransport();
const createTelegramTransport = vi
.fn<() => ReturnType<typeof makeTelegramTransport>>()
.mockReturnValueOnce(transport2);
createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot());
mockRestartAfterPollingError(recoverableError, abort);
const session = createPollingSessionWithTransportRestart({
abortSignal: abort.signal,
telegramTransport: transport1,
createTelegramTransport,
});
await session.runUntilAbort();
// Dirty-rebuild closes transport1 (fire-and-forget via #closeTransportAsync).
// dispose() closes transport2 since it becomes the held transport after the rebuild.
expect(transport1.close).toHaveBeenCalled();
expect(transport2.close).toHaveBeenCalled();
});
});

View File

@@ -1,4 +1,6 @@
import { type RunOptions, run } from "@grammyjs/runner";
import type { ChannelAccountSnapshot } from "openclaw/plugin-sdk/channel-contract";
import { createConnectedChannelStatusPatch } from "openclaw/plugin-sdk/gateway-runtime";
import {
computeBackoff,
formatDurationPrecise,
@@ -57,6 +59,7 @@ type TelegramPollingSessionOpts = {
telegramTransport?: TelegramTransport;
/** Rebuild Telegram transport after stall/network recovery when marked dirty. */
createTelegramTransport?: () => TelegramTransport;
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
};
export class TelegramPollingSession {
@@ -92,24 +95,41 @@ export class TelegramPollingSession {
}
async runUntilAbort(): Promise<void> {
while (!this.opts.abortSignal?.aborted) {
const bot = await this.#createPollingBot();
if (!bot) {
continue;
}
this.opts.setStatus?.({
mode: "polling",
connected: false,
lastConnectedAt: null,
lastEventAt: null,
});
try {
while (!this.opts.abortSignal?.aborted) {
const bot = await this.#createPollingBot();
if (!bot) {
continue;
}
const cleanupState = await this.#ensureWebhookCleanup(bot);
if (cleanupState === "retry") {
continue;
}
if (cleanupState === "exit") {
return;
}
const cleanupState = await this.#ensureWebhookCleanup(bot);
if (cleanupState === "retry") {
continue;
}
if (cleanupState === "exit") {
return;
}
const state = await this.#runPollingCycle(bot);
if (state === "exit") {
return;
const state = await this.#runPollingCycle(bot);
if (state === "exit") {
return;
}
}
} finally {
// Release the transport's dispatchers on session shutdown. Without
// this, the undici keep-alive sockets survive beyond the session and
// leak to api.telegram.org; see openclaw#68128.
await this.#transportState.dispose();
this.opts.setStatus?.({
mode: "polling",
connected: false,
});
}
}
@@ -265,6 +285,12 @@ export class TelegramPollingSession {
lastGetUpdatesFinishedAt = finishedAt;
lastGetUpdatesDurationMs = finishedAt - startedAt;
lastGetUpdatesOutcome = Array.isArray(result) ? `ok:${result.length}` : "ok";
lastApiActivityAt = finishedAt;
this.opts.setStatus?.({
...createConnectedChannelStatusPatch(finishedAt),
mode: "polling",
lastError: null,
});
return result;
} catch (err) {
const finishedAt = Date.now();
@@ -272,6 +298,7 @@ export class TelegramPollingSession {
lastGetUpdatesDurationMs = finishedAt - startedAt;
lastGetUpdatesOutcome = "error";
lastGetUpdatesError = formatErrorMessage(err);
lastApiActivityAt = finishedAt;
throw err;
} finally {
inFlightGetUpdates = Math.max(0, inFlightGetUpdates - 1);

View File

@@ -0,0 +1,163 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import type { TelegramTransport } from "./fetch.js";
import { TelegramPollingTransportState } from "./polling-transport-state.js";
type LogFn = (line: string) => void;
type LogSpy = ReturnType<typeof vi.fn<LogFn>>;
function makeMockTransport(label = "transport"): TelegramTransport & {
close: ReturnType<typeof vi.fn<() => Promise<void>>>;
} {
return {
fetch: (async () => new Response(`ok-${label}`)) as typeof fetch,
sourceFetch: (async () => new Response(`ok-${label}`)) as typeof fetch,
close: vi.fn<() => Promise<void>>(async () => undefined),
};
}
function anyLogMatches(log: LogSpy, fragment: string): boolean {
return log.mock.calls.some((call) => {
const first = call[0];
return typeof first === "string" && first.includes(fragment);
});
}
async function flushMicrotasks() {
// The dirty-rebuild path fires close() without awaiting it so the polling
// cycle is not blocked; tests must flush microtasks before asserting on it.
await Promise.resolve();
await Promise.resolve();
}
describe("TelegramPollingTransportState", () => {
let log: LogSpy;
beforeEach(() => {
log = vi.fn<LogFn>();
});
it("returns the initial transport when not dirty", () => {
const initial = makeMockTransport("initial");
const state = new TelegramPollingTransportState({
log,
initialTransport: initial,
});
const acquired = state.acquireForNextCycle();
expect(acquired).toBe(initial);
expect(initial.close).not.toHaveBeenCalled();
});
it("closes the stale transport when a dirty rebuild replaces it", async () => {
const initial = makeMockTransport("initial");
const rebuilt = makeMockTransport("rebuilt");
const createTelegramTransport = vi.fn(() => rebuilt);
const state = new TelegramPollingTransportState({
log,
initialTransport: initial,
createTelegramTransport,
});
state.markDirty();
const acquired = state.acquireForNextCycle();
expect(acquired).toBe(rebuilt);
await flushMicrotasks();
expect(initial.close).toHaveBeenCalledTimes(1);
expect(rebuilt.close).not.toHaveBeenCalled();
expect(anyLogMatches(log, "closing stale transport")).toBe(true);
});
it("does not close when dirty rebuild keeps the same transport instance", async () => {
const initial = makeMockTransport("initial");
// createTelegramTransport returns the same instance — e.g., factory returned null → fallback to previous
const createTelegramTransport = vi.fn(() => initial);
const state = new TelegramPollingTransportState({
log,
initialTransport: initial,
createTelegramTransport,
});
state.markDirty();
state.acquireForNextCycle();
await flushMicrotasks();
expect(initial.close).not.toHaveBeenCalled();
});
it("dispose() closes the currently-held transport and blocks until close resolves", async () => {
const initial = makeMockTransport("initial");
const state = new TelegramPollingTransportState({
log,
initialTransport: initial,
});
// Force the state to promote the initial transport into the held slot.
state.acquireForNextCycle();
let closeResolved = false;
initial.close.mockImplementationOnce(async () => {
await Promise.resolve();
closeResolved = true;
});
await state.dispose();
expect(initial.close).toHaveBeenCalledTimes(1);
expect(closeResolved).toBe(true);
});
it("dispose() is idempotent and safe with no transport", async () => {
const state = new TelegramPollingTransportState({ log });
await expect(state.dispose()).resolves.toBeUndefined();
await expect(state.dispose()).resolves.toBeUndefined();
});
it("dispose() swallows errors thrown by transport.close()", async () => {
const initial = makeMockTransport("initial");
initial.close.mockRejectedValueOnce(new Error("boom"));
const state = new TelegramPollingTransportState({
log,
initialTransport: initial,
});
state.acquireForNextCycle();
await expect(state.dispose()).resolves.toBeUndefined();
expect(anyLogMatches(log, "failed to close transport during dispose")).toBe(true);
});
it("acquireForNextCycle() returns undefined after dispose()", async () => {
const initial = makeMockTransport("initial");
const createTelegramTransport = vi.fn(() => makeMockTransport("rebuilt"));
const state = new TelegramPollingTransportState({
log,
initialTransport: initial,
createTelegramTransport,
});
await state.dispose();
const acquired = state.acquireForNextCycle();
expect(acquired).toBeUndefined();
expect(createTelegramTransport).not.toHaveBeenCalled();
});
it("clears the dirty flag even when no factory is configured", () => {
const initial = makeMockTransport("initial");
const state = new TelegramPollingTransportState({
log,
initialTransport: initial,
});
state.markDirty();
const acquired = state.acquireForNextCycle();
expect(acquired).toBe(initial);
// Next cycle without markDirty should not trigger another rebuild log.
state.acquireForNextCycle();
const rebuildLogs = log.mock.calls.filter((call) => {
const line = call[0];
return typeof line === "string" && line.includes("rebuilding transport");
});
expect(rebuildLogs.length).toBeLessThanOrEqual(1);
});
});

View File

@@ -9,6 +9,7 @@ type TelegramPollingTransportStateOpts = {
export class TelegramPollingTransportState {
#telegramTransport: TelegramTransport | undefined;
#transportDirty = false;
#disposed = false;
constructor(private readonly opts: TelegramPollingTransportStateOpts) {
this.#telegramTransport = opts.initialTransport;
@@ -19,10 +20,23 @@ export class TelegramPollingTransportState {
}
acquireForNextCycle(): TelegramTransport | undefined {
const shouldCreateTransport = this.#transportDirty || !this.#telegramTransport;
if (this.#disposed) {
return undefined;
}
const previous = this.#telegramTransport;
const shouldCreateTransport = this.#transportDirty || !previous;
const nextTransport = shouldCreateTransport
? (this.opts.createTelegramTransport?.() ?? this.#telegramTransport)
: this.#telegramTransport;
? (this.opts.createTelegramTransport?.() ?? previous)
: previous;
// When the dirty flag triggered a rebuild, release the old transport's
// dispatchers. Without this, each network stall / recoverable error
// leaves a full pool of keep-alive sockets to api.telegram.org dangling
// forever — which over long-running sessions accumulates into the
// hundreds of ESTABLISHED connections that choke per-IP upstream quotas.
if (this.#transportDirty && previous && nextTransport !== previous) {
this.opts.log("[telegram][diag] closing stale transport before rebuild");
this.#closeTransportAsync(previous, "stale-transport rebuild");
}
if (this.#transportDirty && nextTransport) {
this.opts.log("[telegram][diag] rebuilding transport for next polling cycle");
}
@@ -30,4 +44,40 @@ export class TelegramPollingTransportState {
this.#transportDirty = false;
return nextTransport;
}
async dispose(): Promise<void> {
if (this.#disposed) {
return;
}
this.#disposed = true;
const transport = this.#telegramTransport;
this.#telegramTransport = undefined;
if (!transport) {
return;
}
try {
await transport.close();
} catch (err) {
this.opts.log(
`[telegram][diag] failed to close transport during dispose: ${formatCloseError(err)}`,
);
}
}
// Fire-and-forget close used on the rebuild path so the polling cycle is not
// blocked by a slow destroy. The error path is logged but never rethrown.
#closeTransportAsync(transport: TelegramTransport, context: string) {
void transport.close().catch((err) => {
this.opts.log(
`[telegram][diag] failed to close transport (${context}): ${formatCloseError(err)}`,
);
});
}
}
function formatCloseError(err: unknown): string {
if (err instanceof Error) {
return err.message;
}
return String(err);
}

View File

@@ -136,7 +136,7 @@ describe("evaluateChannelHealth", () => {
expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" });
});
it("skips stale-socket detection for telegram long-polling channels", () => {
it("flags stale sockets for telegram polling channels", () => {
const evaluation = evaluateChannelHealth(
{
running: true,
@@ -144,7 +144,49 @@ describe("evaluateChannelHealth", () => {
enabled: true,
configured: true,
lastStartAt: 0,
lastEventAt: null,
lastEventAt: 0,
mode: "polling",
},
{
channelId: "telegram",
now: 100_000,
channelConnectGraceMs: 10_000,
staleEventThresholdMs: 30_000,
},
);
expect(evaluation).toEqual({ healthy: false, reason: "stale-socket" });
});
it("skips stale-socket detection for telegram accounts without explicit polling mode", () => {
const evaluation = evaluateChannelHealth(
{
running: true,
connected: true,
enabled: true,
configured: true,
lastStartAt: 0,
lastEventAt: 0,
},
{
channelId: "telegram",
now: 100_000,
channelConnectGraceMs: 10_000,
staleEventThresholdMs: 30_000,
},
);
expect(evaluation).toEqual({ healthy: true, reason: "healthy" });
});
it("skips stale-socket detection for telegram accounts with malformed mode", () => {
const evaluation = evaluateChannelHealth(
{
running: true,
connected: true,
enabled: true,
configured: true,
lastStartAt: 0,
lastEventAt: 0,
mode: { polling: true } as unknown as string,
},
{
channelId: "telegram",

View File

@@ -59,6 +59,7 @@ export function evaluateChannelHealth(
snapshot: ChannelHealthSnapshot,
policy: ChannelHealthPolicy,
): ChannelHealthEvaluation {
const mode = typeof snapshot.mode === "string" ? snapshot.mode.trim().toLowerCase() : undefined;
if (!isManagedAccount(snapshot)) {
return { healthy: true, reason: "unmanaged" };
}
@@ -78,6 +79,10 @@ export function evaluateChannelHealth(
typeof snapshot.lastRunActivityAt === "number" && Number.isFinite(snapshot.lastRunActivityAt)
? snapshot.lastRunActivityAt
: null;
const lastEventAt =
typeof snapshot.lastEventAt === "number" && Number.isFinite(snapshot.lastEventAt)
? snapshot.lastEventAt
: null;
const busyStateInitializedForLifecycle =
lastStartAt == null || (lastRunActivityAt != null && lastRunActivityAt >= lastStartAt);
@@ -107,24 +112,23 @@ export function evaluateChannelHealth(
if (snapshot.connected === false) {
return { healthy: false, reason: "disconnected" };
}
// Skip stale-socket checks for channels that declare this health policy and
// any channel explicitly operating in webhook mode. In these cases, there is
// no persistent outgoing socket that can go half-dead, so the lack of
// incoming events does not necessarily indicate a connection failure.
if (
// Telegram only has reliable stale-socket liveness in explicit polling mode.
// Webhook accounts and malformed legacy mode values do not have a persistent
// outgoing socket to age-check.
const shouldCheckStaleSocket =
policy.skipStaleSocketCheck !== true &&
snapshot.mode !== "webhook" &&
snapshot.connected === true &&
snapshot.lastEventAt != null
) {
if (lastStartAt != null && snapshot.lastEventAt < lastStartAt) {
lastEventAt != null &&
(policy.channelId === "telegram" ? mode === "polling" : mode !== "webhook");
if (shouldCheckStaleSocket) {
if (lastStartAt != null && lastEventAt < lastStartAt) {
const lifecycleEventGap = Math.max(0, policy.now - lastStartAt);
if (lifecycleEventGap <= policy.staleEventThresholdMs) {
return { healthy: true, reason: "healthy" };
}
return { healthy: false, reason: "stale-socket" };
}
const eventAge = policy.now - snapshot.lastEventAt;
const eventAge = policy.now - lastEventAt;
if (eventAge > policy.staleEventThresholdMs) {
return { healthy: false, reason: "stale-socket" };
}