diff --git a/CHANGELOG.md b/CHANGELOG.md index 2db4805cee0..2e2e65653c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ Docs: https://docs.openclaw.ai - Cron/state errors: record `lastErrorReason` in cron job state and keep the gateway schema aligned with the full failover-reason set, including regression coverage for protocol conformance. (#14382) thanks @futuremind2026. - Tools/web search: recover OpenRouter Perplexity citation extraction from `message.annotations` when chat-completions responses omit top-level citations. (#40881) Thanks @laurieluo. - Security/external content: treat whitespace-delimited `EXTERNAL UNTRUSTED CONTENT` boundary markers like underscore-delimited variants so prompt wrappers cannot bypass marker sanitization. (#35983) Thanks @urianpaul94. +- Telegram/network env-proxy: apply configured transport policy to proxied HTTPS dispatchers as well as direct `NO_PROXY` bypasses, so resolver-scoped IPv4 fallback and network settings work consistently for env-proxied Telegram traffic. (#40740) Thanks @sircrumpet. ## 2026.3.8 diff --git a/extensions/telegram/src/channel.test.ts b/extensions/telegram/src/channel.test.ts index c1912db56f0..2bf1b681497 100644 --- a/extensions/telegram/src/channel.test.ts +++ b/extensions/telegram/src/channel.test.ts @@ -57,18 +57,38 @@ function installGatewayRuntime(params?: { probeOk?: boolean; botUsername?: strin const probeTelegram = vi.fn(async () => params?.probeOk ? { ok: true, bot: { username: params.botUsername ?? "bot" } } : { ok: false }, ); + const collectUnmentionedGroupIds = vi.fn(() => ({ + groupIds: [] as string[], + unresolvedGroups: 0, + hasWildcardUnmentionedGroups: false, + })); + const auditGroupMembership = vi.fn(async () => ({ + ok: true, + checkedGroups: 0, + unresolvedGroups: 0, + hasWildcardUnmentionedGroups: false, + groups: [], + elapsedMs: 0, + })); setTelegramRuntime({ channel: { telegram: { monitorTelegramProvider, probeTelegram, + collectUnmentionedGroupIds, + auditGroupMembership, }, }, logging: { shouldLogVerbose: () => false, }, } as unknown as PluginRuntime); - return { monitorTelegramProvider, probeTelegram }; + return { + monitorTelegramProvider, + probeTelegram, + collectUnmentionedGroupIds, + auditGroupMembership, + }; } describe("telegramPlugin duplicate token guard", () => { @@ -149,6 +169,85 @@ describe("telegramPlugin duplicate token guard", () => { ); }); + it("passes account proxy and network settings into Telegram probes", async () => { + const { probeTelegram } = installGatewayRuntime({ + probeOk: true, + botUsername: "opsbot", + }); + + const cfg = createCfg(); + cfg.channels!.telegram!.accounts!.ops = { + ...cfg.channels!.telegram!.accounts!.ops, + proxy: "http://127.0.0.1:8888", + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }; + const account = telegramPlugin.config.resolveAccount(cfg, "ops"); + + await telegramPlugin.status!.probeAccount!({ + account, + timeoutMs: 5000, + cfg, + }); + + expect(probeTelegram).toHaveBeenCalledWith("token-ops", 5000, { + accountId: "ops", + proxyUrl: "http://127.0.0.1:8888", + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }); + }); + + it("passes account proxy and network settings into Telegram membership audits", async () => { + const { collectUnmentionedGroupIds, auditGroupMembership } = installGatewayRuntime({ + probeOk: true, + botUsername: "opsbot", + }); + + collectUnmentionedGroupIds.mockReturnValue({ + groupIds: ["-100123"], + unresolvedGroups: 0, + hasWildcardUnmentionedGroups: false, + }); + + const cfg = createCfg(); + cfg.channels!.telegram!.accounts!.ops = { + ...cfg.channels!.telegram!.accounts!.ops, + proxy: "http://127.0.0.1:8888", + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + groups: { + "-100123": { requireMention: false }, + }, + }; + const account = telegramPlugin.config.resolveAccount(cfg, "ops"); + + await telegramPlugin.status!.auditAccount!({ + account, + timeoutMs: 5000, + probe: { ok: true, bot: { id: 123 }, elapsedMs: 1 }, + cfg, + }); + + expect(auditGroupMembership).toHaveBeenCalledWith({ + token: "token-ops", + botId: 123, + groupIds: ["-100123"], + proxyUrl: "http://127.0.0.1:8888", + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + timeoutMs: 5000, + }); + }); + it("forwards mediaLocalRoots to sendMessageTelegram for outbound media sends", async () => { const sendMessageTelegram = vi.fn(async () => ({ messageId: "tg-1" })); setTelegramRuntime({ diff --git a/extensions/telegram/src/channel.ts b/extensions/telegram/src/channel.ts index 7ea0a7a6525..5893f4e0a2e 100644 --- a/extensions/telegram/src/channel.ts +++ b/extensions/telegram/src/channel.ts @@ -438,11 +438,11 @@ export const telegramPlugin: ChannelPlugin buildTokenChannelStatusSummary(snapshot), probeAccount: async ({ account, timeoutMs }) => - getTelegramRuntime().channel.telegram.probeTelegram( - account.token, - timeoutMs, - account.config.proxy, - ), + getTelegramRuntime().channel.telegram.probeTelegram(account.token, timeoutMs, { + accountId: account.accountId, + proxyUrl: account.config.proxy, + network: account.config.network, + }), auditAccount: async ({ account, timeoutMs, probe, cfg }) => { const groups = cfg.channels?.telegram?.accounts?.[account.accountId]?.groups ?? @@ -468,6 +468,7 @@ export const telegramPlugin: ChannelPlugin { undiciFetch.mockResolvedValue({ ok: true }); const proxyFetch = makeProxyFetch(proxyUrl); + expect(proxyAgentSpy).not.toHaveBeenCalled(); await proxyFetch("https://api.example.com/v1/audio"); expect(proxyAgentSpy).toHaveBeenCalledWith(proxyUrl); diff --git a/src/infra/net/proxy-fetch.ts b/src/infra/net/proxy-fetch.ts index e6c11813959..391387f3cca 100644 --- a/src/infra/net/proxy-fetch.ts +++ b/src/infra/net/proxy-fetch.ts @@ -1,19 +1,46 @@ import { EnvHttpProxyAgent, ProxyAgent, fetch as undiciFetch } from "undici"; import { logWarn } from "../../logger.js"; +export const PROXY_FETCH_PROXY_URL = Symbol.for("openclaw.proxyFetch.proxyUrl"); +type ProxyFetchWithMetadata = typeof fetch & { + [PROXY_FETCH_PROXY_URL]?: string; +}; + /** * Create a fetch function that routes requests through the given HTTP proxy. * Uses undici's ProxyAgent under the hood. */ export function makeProxyFetch(proxyUrl: string): typeof fetch { - const agent = new ProxyAgent(proxyUrl); + let agent: ProxyAgent | null = null; + const resolveAgent = (): ProxyAgent => { + if (!agent) { + agent = new ProxyAgent(proxyUrl); + } + return agent; + }; // undici's fetch is runtime-compatible with global fetch but the types diverge // on stream/body internals. Single cast at the boundary keeps the rest type-safe. - return ((input: RequestInfo | URL, init?: RequestInit) => + const proxyFetch = ((input: RequestInfo | URL, init?: RequestInit) => undiciFetch(input as string | URL, { ...(init as Record), - dispatcher: agent, - }) as unknown as Promise) as typeof fetch; + dispatcher: resolveAgent(), + }) as unknown as Promise) as ProxyFetchWithMetadata; + Object.defineProperty(proxyFetch, PROXY_FETCH_PROXY_URL, { + value: proxyUrl, + enumerable: false, + configurable: false, + writable: false, + }); + return proxyFetch; +} + +export function getProxyUrlFromFetch(fetchImpl?: typeof fetch): string | undefined { + const proxyUrl = (fetchImpl as ProxyFetchWithMetadata | undefined)?.[PROXY_FETCH_PROXY_URL]; + if (typeof proxyUrl !== "string") { + return undefined; + } + const trimmed = proxyUrl.trim(); + return trimmed ? trimmed : undefined; } /** diff --git a/src/telegram/audit-membership-runtime.ts b/src/telegram/audit-membership-runtime.ts index 4f2c5a43710..c710fb92aa7 100644 --- a/src/telegram/audit-membership-runtime.ts +++ b/src/telegram/audit-membership-runtime.ts @@ -5,6 +5,7 @@ import type { TelegramGroupMembershipAudit, TelegramGroupMembershipAuditEntry, } from "./audit.js"; +import { resolveTelegramFetch } from "./fetch.js"; import { makeProxyFetch } from "./proxy.js"; const TELEGRAM_API_BASE = "https://api.telegram.org"; @@ -16,7 +17,8 @@ type TelegramGroupMembershipAuditData = Omit { - const fetcher = params.proxyUrl ? makeProxyFetch(params.proxyUrl) : fetch; + const proxyFetch = params.proxyUrl ? makeProxyFetch(params.proxyUrl) : undefined; + const fetcher = resolveTelegramFetch(proxyFetch, { network: params.network }); const base = `${TELEGRAM_API_BASE}/bot${params.token}`; const groups: TelegramGroupMembershipAuditEntry[] = []; diff --git a/src/telegram/audit.test.ts b/src/telegram/audit.test.ts index c7524c6ca05..e5cc4490e08 100644 --- a/src/telegram/audit.test.ts +++ b/src/telegram/audit.test.ts @@ -2,16 +2,22 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; let collectTelegramUnmentionedGroupIds: typeof import("./audit.js").collectTelegramUnmentionedGroupIds; let auditTelegramGroupMembership: typeof import("./audit.js").auditTelegramGroupMembership; +const undiciFetch = vi.hoisted(() => vi.fn()); + +vi.mock("undici", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + fetch: undiciFetch, + }; +}); function mockGetChatMemberStatus(status: string) { - vi.stubGlobal( - "fetch", - vi.fn().mockResolvedValueOnce( - new Response(JSON.stringify({ ok: true, result: { status } }), { - status: 200, - headers: { "Content-Type": "application/json" }, - }), - ), + undiciFetch.mockResolvedValueOnce( + new Response(JSON.stringify({ ok: true, result: { status } }), { + status: 200, + headers: { "Content-Type": "application/json" }, + }), ); } @@ -31,7 +37,7 @@ describe("telegram audit", () => { }); beforeEach(() => { - vi.unstubAllGlobals(); + undiciFetch.mockReset(); }); it("collects unmentioned numeric group ids and flags wildcard", async () => { diff --git a/src/telegram/audit.ts b/src/telegram/audit.ts index 24e5f58957a..6b667c37581 100644 --- a/src/telegram/audit.ts +++ b/src/telegram/audit.ts @@ -1,4 +1,5 @@ import type { TelegramGroupConfig } from "../config/types.js"; +import type { TelegramNetworkConfig } from "../config/types.telegram.js"; export type TelegramGroupMembershipAuditEntry = { chatId: string; @@ -64,6 +65,7 @@ export type AuditTelegramGroupMembershipParams = { botId: number; groupIds: string[]; proxyUrl?: string; + network?: TelegramNetworkConfig; timeoutMs: number; }; diff --git a/src/telegram/bot-handlers.ts b/src/telegram/bot-handlers.ts index 78290f342ad..2d1327bcd5f 100644 --- a/src/telegram/bot-handlers.ts +++ b/src/telegram/bot-handlers.ts @@ -123,6 +123,7 @@ export const registerTelegramHandlers = ({ accountId, bot, opts, + telegramFetchImpl, runtime, mediaMaxBytes, telegramCfg, @@ -371,7 +372,7 @@ export const registerTelegramHandlers = ({ for (const { ctx } of entry.messages) { let media; try { - media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch); + media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramFetchImpl); } catch (mediaErr) { if (!isRecoverableMediaGroupError(mediaErr)) { throw mediaErr; @@ -475,7 +476,7 @@ export const registerTelegramHandlers = ({ }, mediaMaxBytes, opts.token, - opts.proxyFetch, + telegramFetchImpl, ); if (!media) { return []; @@ -986,7 +987,7 @@ export const registerTelegramHandlers = ({ let media: Awaited> = null; try { - media = await resolveMedia(ctx, mediaMaxBytes, opts.token, opts.proxyFetch); + media = await resolveMedia(ctx, mediaMaxBytes, opts.token, telegramFetchImpl); } catch (mediaErr) { if (isMediaSizeLimitError(mediaErr)) { if (sendOversizeWarning) { diff --git a/src/telegram/bot-native-commands.ts b/src/telegram/bot-native-commands.ts index aa37c98e9b9..06148b17b33 100644 --- a/src/telegram/bot-native-commands.ts +++ b/src/telegram/bot-native-commands.ts @@ -94,6 +94,7 @@ export type RegisterTelegramHandlerParams = { bot: Bot; mediaMaxBytes: number; opts: TelegramBotOptions; + telegramFetchImpl?: typeof fetch; runtime: RuntimeEnv; telegramCfg: TelegramAccountConfig; allowFrom?: Array; diff --git a/src/telegram/bot.media.e2e-harness.ts b/src/telegram/bot.media.e2e-harness.ts index 58628df522b..d26eff44fb6 100644 --- a/src/telegram/bot.media.e2e-harness.ts +++ b/src/telegram/bot.media.e2e-harness.ts @@ -6,6 +6,9 @@ export const middlewareUseSpy: Mock = vi.fn(); export const onSpy: Mock = vi.fn(); export const stopSpy: Mock = vi.fn(); export const sendChatActionSpy: Mock = vi.fn(); +export const undiciFetchSpy: Mock = vi.fn((input: RequestInfo | URL, init?: RequestInit) => + globalThis.fetch(input, init), +); async function defaultSaveMediaBuffer(buffer: Buffer, contentType?: string) { return { @@ -81,6 +84,14 @@ vi.mock("@grammyjs/transformer-throttler", () => ({ apiThrottler: () => throttlerSpy(), })); +vi.mock("undici", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + fetch: (...args: Parameters) => undiciFetchSpy(...args), + }; +}); + vi.mock("../media/store.js", async (importOriginal) => { const actual = await importOriginal(); const mockModule = Object.create(null) as Record; diff --git a/src/telegram/bot.ts b/src/telegram/bot.ts index 8bfa0b8ac0c..48d0c745b42 100644 --- a/src/telegram/bot.ts +++ b/src/telegram/bot.ts @@ -439,6 +439,7 @@ export function createTelegramBot(opts: TelegramBotOptions) { accountId: account.accountId, bot, opts, + telegramFetchImpl: fetchImpl as unknown as typeof fetch | undefined, runtime, mediaMaxBytes, telegramCfg, diff --git a/src/telegram/bot/delivery.resolve-media-retry.test.ts b/src/telegram/bot/delivery.resolve-media-retry.test.ts index ce8f50abbbe..df6124343fd 100644 --- a/src/telegram/bot/delivery.resolve-media-retry.test.ts +++ b/src/telegram/bot/delivery.resolve-media-retry.test.ts @@ -293,6 +293,62 @@ describe("resolveMedia getFile retry", () => { expect(getFile).toHaveBeenCalledTimes(3); expect(result).toBeNull(); }); + + it("uses caller-provided fetch impl for file downloads", async () => { + const getFile = vi.fn().mockResolvedValue({ file_path: "documents/file_42.pdf" }); + const callerFetch = vi.fn() as unknown as typeof fetch; + fetchRemoteMedia.mockResolvedValueOnce({ + buffer: Buffer.from("pdf-data"), + contentType: "application/pdf", + fileName: "file_42.pdf", + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/file_42---uuid.pdf", + contentType: "application/pdf", + }); + + const result = await resolveMedia( + makeCtx("document", getFile), + MAX_MEDIA_BYTES, + BOT_TOKEN, + callerFetch, + ); + + expect(result).not.toBeNull(); + expect(fetchRemoteMedia).toHaveBeenCalledWith( + expect.objectContaining({ + fetchImpl: callerFetch, + }), + ); + }); + + it("uses caller-provided fetch impl for sticker downloads", async () => { + const getFile = vi.fn().mockResolvedValue({ file_path: "stickers/file_0.webp" }); + const callerFetch = vi.fn() as unknown as typeof fetch; + fetchRemoteMedia.mockResolvedValueOnce({ + buffer: Buffer.from("sticker-data"), + contentType: "image/webp", + fileName: "file_0.webp", + }); + saveMediaBuffer.mockResolvedValueOnce({ + path: "/tmp/file_0.webp", + contentType: "image/webp", + }); + + const result = await resolveMedia( + makeCtx("sticker", getFile), + MAX_MEDIA_BYTES, + BOT_TOKEN, + callerFetch, + ); + + expect(result).not.toBeNull(); + expect(fetchRemoteMedia).toHaveBeenCalledWith( + expect.objectContaining({ + fetchImpl: callerFetch, + }), + ); + }); }); describe("resolveMedia original filename preservation", () => { diff --git a/src/telegram/bot/delivery.resolve-media.ts b/src/telegram/bot/delivery.resolve-media.ts index 14df1d6e2a8..9f560116a5d 100644 --- a/src/telegram/bot/delivery.resolve-media.ts +++ b/src/telegram/bot/delivery.resolve-media.ts @@ -92,12 +92,20 @@ async function resolveTelegramFileWithRetry( } } -function resolveRequiredFetchImpl(proxyFetch?: typeof fetch): typeof fetch { - const fetchImpl = proxyFetch ?? globalThis.fetch; - if (!fetchImpl) { +function resolveRequiredFetchImpl(fetchImpl?: typeof fetch): typeof fetch { + const resolved = fetchImpl ?? globalThis.fetch; + if (!resolved) { throw new Error("fetch is not available; set channels.telegram.proxy in config"); } - return fetchImpl; + return resolved; +} + +function resolveOptionalFetchImpl(fetchImpl?: typeof fetch): typeof fetch | null { + try { + return resolveRequiredFetchImpl(fetchImpl); + } catch { + return null; + } } /** Default idle timeout for Telegram media downloads (30 seconds). */ @@ -134,7 +142,7 @@ async function resolveStickerMedia(params: { ctx: TelegramContext; maxBytes: number; token: string; - proxyFetch?: typeof fetch; + fetchImpl?: typeof fetch; }): Promise< | { path: string; @@ -145,7 +153,7 @@ async function resolveStickerMedia(params: { | null | undefined > { - const { msg, ctx, maxBytes, token, proxyFetch } = params; + const { msg, ctx, maxBytes, token, fetchImpl } = params; if (!msg.sticker) { return undefined; } @@ -165,15 +173,15 @@ async function resolveStickerMedia(params: { logVerbose("telegram: getFile returned no file_path for sticker"); return null; } - const fetchImpl = proxyFetch ?? globalThis.fetch; - if (!fetchImpl) { + const resolvedFetchImpl = resolveOptionalFetchImpl(fetchImpl); + if (!resolvedFetchImpl) { logVerbose("telegram: fetch not available for sticker download"); return null; } const saved = await downloadAndSaveTelegramFile({ filePath: file.file_path, token, - fetchImpl, + fetchImpl: resolvedFetchImpl, maxBytes, }); @@ -229,7 +237,7 @@ export async function resolveMedia( ctx: TelegramContext, maxBytes: number, token: string, - proxyFetch?: typeof fetch, + fetchImpl?: typeof fetch, ): Promise<{ path: string; contentType?: string; @@ -242,7 +250,7 @@ export async function resolveMedia( ctx, maxBytes, token, - proxyFetch, + fetchImpl, }); if (stickerResolved !== undefined) { return stickerResolved; @@ -263,7 +271,7 @@ export async function resolveMedia( const saved = await downloadAndSaveTelegramFile({ filePath: file.file_path, token, - fetchImpl: resolveRequiredFetchImpl(proxyFetch), + fetchImpl: resolveRequiredFetchImpl(fetchImpl), maxBytes, telegramFileName: resolveTelegramFileName(msg), }); diff --git a/src/telegram/fetch.env-proxy-runtime.test.ts b/src/telegram/fetch.env-proxy-runtime.test.ts new file mode 100644 index 00000000000..0292f465747 --- /dev/null +++ b/src/telegram/fetch.env-proxy-runtime.test.ts @@ -0,0 +1,58 @@ +import { createRequire } from "node:module"; +import { afterEach, describe, expect, it, vi } from "vitest"; + +const require = createRequire(import.meta.url); +const EnvHttpProxyAgent = require("undici/lib/dispatcher/env-http-proxy-agent.js") as { + new (opts?: Record): Record; +}; +const { kHttpsProxyAgent, kNoProxyAgent } = require("undici/lib/core/symbols.js") as { + kHttpsProxyAgent: symbol; + kNoProxyAgent: symbol; +}; + +function getOwnSymbolValue( + target: Record, + description: string, +): Record | undefined { + const symbol = Object.getOwnPropertySymbols(target).find( + (entry) => entry.description === description, + ); + const value = symbol ? target[symbol] : undefined; + return value && typeof value === "object" ? (value as Record) : undefined; +} + +afterEach(() => { + vi.unstubAllEnvs(); +}); + +describe("undici env proxy semantics", () => { + it("uses proxyTls rather than connect for proxied HTTPS transport settings", () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + const connect = { + family: 4, + autoSelectFamily: false, + }; + + const withoutProxyTls = new EnvHttpProxyAgent({ connect }); + const noProxyAgent = withoutProxyTls[kNoProxyAgent] as Record; + const httpsProxyAgent = withoutProxyTls[kHttpsProxyAgent] as Record; + + expect(getOwnSymbolValue(noProxyAgent, "options")?.connect).toEqual( + expect.objectContaining(connect), + ); + expect(getOwnSymbolValue(httpsProxyAgent, "proxy tls settings")).toBeUndefined(); + + const withProxyTls = new EnvHttpProxyAgent({ + connect, + proxyTls: connect, + }); + const httpsProxyAgentWithProxyTls = withProxyTls[kHttpsProxyAgent] as Record< + PropertyKey, + unknown + >; + + expect(getOwnSymbolValue(httpsProxyAgentWithProxyTls, "proxy tls settings")).toEqual( + expect.objectContaining(connect), + ); + }); +}); diff --git a/src/telegram/fetch.test.ts b/src/telegram/fetch.test.ts index 95b26d931cb..dc4c7a5145a 100644 --- a/src/telegram/fetch.test.ts +++ b/src/telegram/fetch.test.ts @@ -1,25 +1,36 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { resolveFetch } from "../infra/fetch.js"; -import { resetTelegramFetchStateForTests, resolveTelegramFetch } from "./fetch.js"; +import { resolveTelegramFetch } from "./fetch.js"; -const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn()); const setDefaultResultOrder = vi.hoisted(() => vi.fn()); +const setDefaultAutoSelectFamily = vi.hoisted(() => vi.fn()); + +const undiciFetch = vi.hoisted(() => vi.fn()); const setGlobalDispatcher = vi.hoisted(() => vi.fn()); -const getGlobalDispatcherState = vi.hoisted(() => ({ value: undefined as unknown })); -const getGlobalDispatcher = vi.hoisted(() => vi.fn(() => getGlobalDispatcherState.value)); -const EnvHttpProxyAgentCtor = vi.hoisted(() => - vi.fn(function MockEnvHttpProxyAgent(this: { options: unknown }, options: unknown) { +const AgentCtor = vi.hoisted(() => + vi.fn(function MockAgent( + this: { options?: Record }, + options?: Record, + ) { + this.options = options; + }), +); +const EnvHttpProxyAgentCtor = vi.hoisted(() => + vi.fn(function MockEnvHttpProxyAgent( + this: { options?: Record }, + options?: Record, + ) { + this.options = options; + }), +); +const ProxyAgentCtor = vi.hoisted(() => + vi.fn(function MockProxyAgent( + this: { options?: Record | string }, + options?: Record | string, + ) { this.options = options; }), ); - -vi.mock("node:net", async () => { - const actual = await vi.importActual("node:net"); - return { - ...actual, - setDefaultAutoSelectFamily, - }; -}); vi.mock("node:dns", async () => { const actual = await vi.importActual("node:dns"); @@ -29,266 +40,655 @@ vi.mock("node:dns", async () => { }; }); +vi.mock("node:net", async () => { + const actual = await vi.importActual("node:net"); + return { + ...actual, + setDefaultAutoSelectFamily, + }; +}); + vi.mock("undici", () => ({ + Agent: AgentCtor, EnvHttpProxyAgent: EnvHttpProxyAgentCtor, - getGlobalDispatcher, + ProxyAgent: ProxyAgentCtor, + fetch: undiciFetch, setGlobalDispatcher, })); -const originalFetch = globalThis.fetch; - -function expectEnvProxyAgentConstructorCall(params: { nth: number; autoSelectFamily: boolean }) { - expect(EnvHttpProxyAgentCtor).toHaveBeenNthCalledWith(params.nth, { - connect: { - autoSelectFamily: params.autoSelectFamily, - autoSelectFamilyAttemptTimeout: 300, - }, - }); +function resolveTelegramFetchOrThrow( + proxyFetch?: typeof fetch, + options?: { network?: { autoSelectFamily?: boolean; dnsResultOrder?: "ipv4first" | "verbatim" } }, +) { + return resolveTelegramFetch(proxyFetch, options); } -function resolveTelegramFetchOrThrow() { - const resolved = resolveTelegramFetch(); - if (!resolved) { - throw new Error("expected resolved fetch"); +function getDispatcherFromUndiciCall(nth: number) { + const call = undiciFetch.mock.calls[nth - 1] as [RequestInfo | URL, RequestInit?] | undefined; + if (!call) { + throw new Error(`missing undici fetch call #${nth}`); } - return resolved; + const init = call[1] as (RequestInit & { dispatcher?: unknown }) | undefined; + return init?.dispatcher as + | { + options?: { + connect?: Record; + proxyTls?: Record; + }; + } + | undefined; +} + +function buildFetchFallbackError(code: string) { + const connectErr = Object.assign(new Error(`connect ${code} api.telegram.org:443`), { + code, + }); + return Object.assign(new TypeError("fetch failed"), { + cause: connectErr, + }); } afterEach(() => { - resetTelegramFetchStateForTests(); - setDefaultAutoSelectFamily.mockReset(); - setDefaultResultOrder.mockReset(); + undiciFetch.mockReset(); setGlobalDispatcher.mockReset(); - getGlobalDispatcher.mockClear(); - getGlobalDispatcherState.value = undefined; + AgentCtor.mockClear(); EnvHttpProxyAgentCtor.mockClear(); + ProxyAgentCtor.mockClear(); + setDefaultResultOrder.mockReset(); + setDefaultAutoSelectFamily.mockReset(); vi.unstubAllEnvs(); vi.clearAllMocks(); - if (originalFetch) { - globalThis.fetch = originalFetch; - } else { - delete (globalThis as { fetch?: typeof fetch }).fetch; - } }); describe("resolveTelegramFetch", () => { - it("returns wrapped global fetch when available", async () => { - const fetchMock = vi.fn(async () => ({})); - globalThis.fetch = fetchMock as unknown as typeof fetch; + it("wraps proxy fetches and leaves retry policy to caller-provided fetch", async () => { + const proxyFetch = vi.fn(async () => ({ ok: true }) as Response) as unknown as typeof fetch; - const resolved = resolveTelegramFetch(); + const resolved = resolveTelegramFetchOrThrow(proxyFetch); - expect(resolved).toBeTypeOf("function"); - expect(resolved).not.toBe(fetchMock); - }); + await resolved("https://api.telegram.org/botx/getMe"); - it("wraps proxy fetches and normalizes foreign signals once", async () => { - let seenSignal: AbortSignal | undefined; - const proxyFetch = vi.fn(async (_input: RequestInfo | URL, init?: RequestInit) => { - seenSignal = init?.signal as AbortSignal | undefined; - return {} as Response; - }); - - const resolved = resolveTelegramFetch(proxyFetch as unknown as typeof fetch); - expect(resolved).toBeTypeOf("function"); - - let abortHandler: (() => void) | null = null; - const addEventListener = vi.fn((event: string, handler: () => void) => { - if (event === "abort") { - abortHandler = handler; - } - }); - const removeEventListener = vi.fn((event: string, handler: () => void) => { - if (event === "abort" && abortHandler === handler) { - abortHandler = null; - } - }); - const fakeSignal = { - aborted: false, - addEventListener, - removeEventListener, - } as unknown as AbortSignal; - - if (!resolved) { - throw new Error("expected resolved proxy fetch"); - } - await resolved("https://example.com", { signal: fakeSignal }); - - expect(proxyFetch).toHaveBeenCalledOnce(); - expect(seenSignal).toBeInstanceOf(AbortSignal); - expect(seenSignal).not.toBe(fakeSignal); - expect(addEventListener).toHaveBeenCalledTimes(1); - expect(removeEventListener).toHaveBeenCalledTimes(1); + expect(proxyFetch).toHaveBeenCalledTimes(1); + expect(undiciFetch).not.toHaveBeenCalled(); }); it("does not double-wrap an already wrapped proxy fetch", async () => { const proxyFetch = vi.fn(async () => ({ ok: true }) as Response) as unknown as typeof fetch; - const alreadyWrapped = resolveFetch(proxyFetch); + const wrapped = resolveFetch(proxyFetch); - const resolved = resolveTelegramFetch(alreadyWrapped); + const resolved = resolveTelegramFetch(wrapped); - expect(resolved).toBe(alreadyWrapped); + expect(resolved).toBe(wrapped); }); - it("honors env enable override", async () => { - vi.stubEnv("OPENCLAW_TELEGRAM_ENABLE_AUTO_SELECT_FAMILY", "1"); - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - resolveTelegramFetch(); - expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(true); - }); + it("uses resolver-scoped Agent dispatcher with configured transport policy", async () => { + undiciFetch.mockResolvedValue({ ok: true } as Response); - it("uses config override when provided", async () => { - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(true); - }); - - it("env disable override wins over config", async () => { - vi.stubEnv("OPENCLAW_TELEGRAM_ENABLE_AUTO_SELECT_FAMILY", "0"); - vi.stubEnv("OPENCLAW_TELEGRAM_DISABLE_AUTO_SELECT_FAMILY", "1"); - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - expect(setDefaultAutoSelectFamily).toHaveBeenCalledWith(false); - }); - - it("applies dns result order from config", async () => { - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - resolveTelegramFetch(undefined, { network: { dnsResultOrder: "verbatim" } }); - expect(setDefaultResultOrder).toHaveBeenCalledWith("verbatim"); - }); - - it("retries dns setter on next call when previous attempt threw", async () => { - setDefaultResultOrder.mockImplementationOnce(() => { - throw new Error("dns setter failed once"); - }); - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - - resolveTelegramFetch(undefined, { network: { dnsResultOrder: "ipv4first" } }); - resolveTelegramFetch(undefined, { network: { dnsResultOrder: "ipv4first" } }); - - expect(setDefaultResultOrder).toHaveBeenCalledTimes(2); - }); - - it("replaces global undici dispatcher with proxy-aware EnvHttpProxyAgent", async () => { - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - - expect(setGlobalDispatcher).toHaveBeenCalledTimes(1); - expectEnvProxyAgentConstructorCall({ nth: 1, autoSelectFamily: true }); - }); - - it("keeps an existing proxy-like global dispatcher", async () => { - getGlobalDispatcherState.value = { - constructor: { name: "ProxyAgent" }, - }; - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - - expect(setGlobalDispatcher).not.toHaveBeenCalled(); - expect(EnvHttpProxyAgentCtor).not.toHaveBeenCalled(); - }); - - it("updates proxy-like dispatcher when proxy env is configured", async () => { - vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); - getGlobalDispatcherState.value = { - constructor: { name: "ProxyAgent" }, - }; - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - - expect(setGlobalDispatcher).toHaveBeenCalledTimes(1); - expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(1); - }); - - it("sets global dispatcher only once across repeated equal decisions", async () => { - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - - expect(setGlobalDispatcher).toHaveBeenCalledTimes(1); - }); - - it("updates global dispatcher when autoSelectFamily decision changes", async () => { - globalThis.fetch = vi.fn(async () => ({})) as unknown as typeof fetch; - resolveTelegramFetch(undefined, { network: { autoSelectFamily: true } }); - resolveTelegramFetch(undefined, { network: { autoSelectFamily: false } }); - - expect(setGlobalDispatcher).toHaveBeenCalledTimes(2); - expectEnvProxyAgentConstructorCall({ nth: 1, autoSelectFamily: true }); - expectEnvProxyAgentConstructorCall({ nth: 2, autoSelectFamily: false }); - }); - - it("retries once with ipv4 fallback when fetch fails with network timeout/unreachable", async () => { - const timeoutErr = Object.assign(new Error("connect ETIMEDOUT 149.154.166.110:443"), { - code: "ETIMEDOUT", - }); - const unreachableErr = Object.assign( - new Error("connect ENETUNREACH 2001:67c:4e8:f004::9:443"), - { - code: "ENETUNREACH", + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "verbatim", }, - ); - const fetchError = Object.assign(new TypeError("fetch failed"), { - cause: Object.assign(new Error("aggregate"), { - errors: [timeoutErr, unreachableErr], - }), }); - const fetchMock = vi - .fn() - .mockRejectedValueOnce(fetchError) - .mockResolvedValueOnce({ ok: true } as Response); - globalThis.fetch = fetchMock as unknown as typeof fetch; - const resolved = resolveTelegramFetchOrThrow(); + await resolved("https://api.telegram.org/botx/getMe"); - await resolved("https://api.telegram.org/file/botx/photos/file_1.jpg"); + expect(AgentCtor).toHaveBeenCalledTimes(1); + expect(EnvHttpProxyAgentCtor).not.toHaveBeenCalled(); - expect(fetchMock).toHaveBeenCalledTimes(2); - expect(setGlobalDispatcher).toHaveBeenCalledTimes(2); - expectEnvProxyAgentConstructorCall({ nth: 1, autoSelectFamily: true }); - expectEnvProxyAgentConstructorCall({ nth: 2, autoSelectFamily: false }); + const dispatcher = getDispatcherFromUndiciCall(1); + expect(dispatcher).toBeDefined(); + expect(dispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + expect(typeof dispatcher?.options?.connect?.lookup).toBe("function"); }); - it("retries with ipv4 fallback once per request, not once per process", async () => { - const timeoutErr = Object.assign(new Error("connect ETIMEDOUT 149.154.166.110:443"), { - code: "ETIMEDOUT", + it("uses EnvHttpProxyAgent dispatcher when proxy env is configured", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + undiciFetch.mockResolvedValue({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, }); - const fetchError = Object.assign(new TypeError("fetch failed"), { - cause: timeoutErr, + + await resolved("https://api.telegram.org/botx/getMe"); + + expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(1); + expect(AgentCtor).not.toHaveBeenCalled(); + + const dispatcher = getDispatcherFromUndiciCall(1); + expect(dispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: false, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + expect(dispatcher?.options?.proxyTls).toEqual( + expect.objectContaining({ + autoSelectFamily: false, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + }); + + it("pins env-proxy transport policy onto proxyTls for proxied HTTPS requests", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + undiciFetch.mockResolvedValue({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, }); - const fetchMock = vi - .fn() + + await resolved("https://api.telegram.org/botx/getMe"); + + const dispatcher = getDispatcherFromUndiciCall(1); + expect(dispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + expect(dispatcher?.options?.proxyTls).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + }); + + it("keeps resolver-scoped transport policy for OpenClaw proxy fetches", async () => { + const { makeProxyFetch } = await import("./proxy.js"); + const proxyFetch = makeProxyFetch("http://127.0.0.1:7890"); + ProxyAgentCtor.mockClear(); + undiciFetch.mockResolvedValue({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(proxyFetch, { + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }); + + await resolved("https://api.telegram.org/botx/getMe"); + + expect(ProxyAgentCtor).toHaveBeenCalledTimes(1); + expect(EnvHttpProxyAgentCtor).not.toHaveBeenCalled(); + expect(AgentCtor).not.toHaveBeenCalled(); + const dispatcher = getDispatcherFromUndiciCall(1); + expect(dispatcher?.options).toEqual( + expect.objectContaining({ + uri: "http://127.0.0.1:7890", + }), + ); + expect(dispatcher?.options?.proxyTls).toEqual( + expect.objectContaining({ + autoSelectFamily: false, + }), + ); + }); + + it("does not blind-retry when sticky IPv4 fallback is disallowed for explicit proxy paths", async () => { + const { makeProxyFetch } = await import("./proxy.js"); + const proxyFetch = makeProxyFetch("http://127.0.0.1:7890"); + ProxyAgentCtor.mockClear(); + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch.mockRejectedValueOnce(fetchError).mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(proxyFetch, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + await expect(resolved("https://api.telegram.org/botx/sendMessage")).rejects.toThrow( + "fetch failed", + ); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(undiciFetch).toHaveBeenCalledTimes(2); + expect(ProxyAgentCtor).toHaveBeenCalledTimes(1); + + const firstDispatcher = getDispatcherFromUndiciCall(1); + const secondDispatcher = getDispatcherFromUndiciCall(2); + + expect(firstDispatcher).toBe(secondDispatcher); + expect(firstDispatcher?.options?.proxyTls).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + expect(firstDispatcher?.options?.proxyTls?.family).not.toBe(4); + }); + + it("does not blind-retry when sticky IPv4 fallback is disallowed for env proxy paths", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch.mockRejectedValueOnce(fetchError).mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + await expect(resolved("https://api.telegram.org/botx/sendMessage")).rejects.toThrow( + "fetch failed", + ); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(undiciFetch).toHaveBeenCalledTimes(2); + expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(1); + + const firstDispatcher = getDispatcherFromUndiciCall(1); + const secondDispatcher = getDispatcherFromUndiciCall(2); + + expect(firstDispatcher).toBe(secondDispatcher); + expect(firstDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + expect(firstDispatcher?.options?.connect?.family).not.toBe(4); + }); + + it("treats ALL_PROXY-only env as direct transport and arms sticky IPv4 fallback", async () => { + vi.stubEnv("ALL_PROXY", "socks5://127.0.0.1:1080"); + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch .mockRejectedValueOnce(fetchError) .mockResolvedValueOnce({ ok: true } as Response) - .mockRejectedValueOnce(fetchError) .mockResolvedValueOnce({ ok: true } as Response); - globalThis.fetch = fetchMock as unknown as typeof fetch; - const resolved = resolveTelegramFetchOrThrow(); + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); - await resolved("https://api.telegram.org/file/botx/photos/file_1.jpg"); - await resolved("https://api.telegram.org/file/botx/photos/file_2.jpg"); + await resolved("https://api.telegram.org/botx/sendMessage"); + await resolved("https://api.telegram.org/botx/sendChatAction"); - expect(fetchMock).toHaveBeenCalledTimes(4); + expect(EnvHttpProxyAgentCtor).not.toHaveBeenCalled(); + expect(AgentCtor).toHaveBeenCalledTimes(2); + + const firstDispatcher = getDispatcherFromUndiciCall(1); + const secondDispatcher = getDispatcherFromUndiciCall(2); + const thirdDispatcher = getDispatcherFromUndiciCall(3); + + expect(firstDispatcher).not.toBe(secondDispatcher); + expect(secondDispatcher).toBe(thirdDispatcher); + expect(secondDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + family: 4, + autoSelectFamily: false, + }), + ); }); - it("does not retry when fetch fails without fallback network error codes", async () => { - const fetchError = Object.assign(new TypeError("fetch failed"), { - cause: Object.assign(new Error("connect ECONNRESET"), { - code: "ECONNRESET", - }), + it("arms sticky IPv4 fallback when env proxy init falls back to direct Agent", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + EnvHttpProxyAgentCtor.mockImplementationOnce(function ThrowingEnvProxyAgent() { + throw new Error("invalid proxy config"); }); - const fetchMock = vi.fn().mockRejectedValue(fetchError); - globalThis.fetch = fetchMock as unknown as typeof fetch; + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch + .mockRejectedValueOnce(fetchError) + .mockResolvedValueOnce({ ok: true } as Response) + .mockResolvedValueOnce({ ok: true } as Response); - const resolved = resolveTelegramFetchOrThrow(); + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); - await expect(resolved("https://api.telegram.org/file/botx/photos/file_3.jpg")).rejects.toThrow( + await resolved("https://api.telegram.org/botx/sendMessage"); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(undiciFetch).toHaveBeenCalledTimes(3); + expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(1); + expect(AgentCtor).toHaveBeenCalledTimes(2); + + const firstDispatcher = getDispatcherFromUndiciCall(1); + const secondDispatcher = getDispatcherFromUndiciCall(2); + const thirdDispatcher = getDispatcherFromUndiciCall(3); + + expect(firstDispatcher).not.toBe(secondDispatcher); + expect(secondDispatcher).toBe(thirdDispatcher); + expect(secondDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + family: 4, + autoSelectFamily: false, + }), + ); + }); + + it("arms sticky IPv4 fallback when NO_PROXY bypasses telegram under env proxy", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + vi.stubEnv("NO_PROXY", "api.telegram.org"); + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch + .mockRejectedValueOnce(fetchError) + .mockResolvedValueOnce({ ok: true } as Response) + .mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + await resolved("https://api.telegram.org/botx/sendMessage"); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(undiciFetch).toHaveBeenCalledTimes(3); + expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(2); + expect(AgentCtor).not.toHaveBeenCalled(); + + const firstDispatcher = getDispatcherFromUndiciCall(1); + const secondDispatcher = getDispatcherFromUndiciCall(2); + const thirdDispatcher = getDispatcherFromUndiciCall(3); + + expect(firstDispatcher).not.toBe(secondDispatcher); + expect(secondDispatcher).toBe(thirdDispatcher); + expect(secondDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + family: 4, + autoSelectFamily: false, + }), + ); + }); + + it("uses no_proxy over NO_PROXY when deciding env-proxy bypass", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + vi.stubEnv("NO_PROXY", ""); + vi.stubEnv("no_proxy", "api.telegram.org"); + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch + .mockRejectedValueOnce(fetchError) + .mockResolvedValueOnce({ ok: true } as Response) + .mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + await resolved("https://api.telegram.org/botx/sendMessage"); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(2); + const secondDispatcher = getDispatcherFromUndiciCall(2); + expect(secondDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + family: 4, + autoSelectFamily: false, + }), + ); + }); + + it("matches whitespace and wildcard no_proxy entries like EnvHttpProxyAgent", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + vi.stubEnv("no_proxy", "localhost *.telegram.org"); + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch + .mockRejectedValueOnce(fetchError) + .mockResolvedValueOnce({ ok: true } as Response) + .mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + await resolved("https://api.telegram.org/botx/sendMessage"); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(2); + const secondDispatcher = getDispatcherFromUndiciCall(2); + expect(secondDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + family: 4, + autoSelectFamily: false, + }), + ); + }); + + it("fails closed when explicit proxy dispatcher initialization fails", async () => { + const { makeProxyFetch } = await import("./proxy.js"); + const proxyFetch = makeProxyFetch("http://127.0.0.1:7890"); + ProxyAgentCtor.mockClear(); + ProxyAgentCtor.mockImplementationOnce(function ThrowingProxyAgent() { + throw new Error("invalid proxy config"); + }); + + expect(() => + resolveTelegramFetchOrThrow(proxyFetch, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }), + ).toThrow("explicit proxy dispatcher init failed: invalid proxy config"); + }); + + it("falls back to Agent when env proxy dispatcher initialization fails", async () => { + vi.stubEnv("HTTPS_PROXY", "http://127.0.0.1:7890"); + EnvHttpProxyAgentCtor.mockImplementationOnce(function ThrowingEnvProxyAgent() { + throw new Error("invalid proxy config"); + }); + undiciFetch.mockResolvedValue({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: false, + }, + }); + + await resolved("https://api.telegram.org/botx/getMe"); + + expect(EnvHttpProxyAgentCtor).toHaveBeenCalledTimes(1); + expect(AgentCtor).toHaveBeenCalledTimes(1); + + const dispatcher = getDispatcherFromUndiciCall(1); + expect(dispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: false, + }), + ); + }); + + it("retries once and then keeps sticky IPv4 dispatcher for subsequent requests", async () => { + const fetchError = buildFetchFallbackError("ETIMEDOUT"); + undiciFetch + .mockRejectedValueOnce(fetchError) + .mockResolvedValueOnce({ ok: true } as Response) + .mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + }, + }); + + await resolved("https://api.telegram.org/botx/sendMessage"); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(undiciFetch).toHaveBeenCalledTimes(3); + + const firstDispatcher = getDispatcherFromUndiciCall(1); + const secondDispatcher = getDispatcherFromUndiciCall(2); + const thirdDispatcher = getDispatcherFromUndiciCall(3); + + expect(firstDispatcher).toBeDefined(); + expect(secondDispatcher).toBeDefined(); + expect(thirdDispatcher).toBeDefined(); + + expect(firstDispatcher).not.toBe(secondDispatcher); + expect(secondDispatcher).toBe(thirdDispatcher); + + expect(firstDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + expect(secondDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + family: 4, + autoSelectFamily: false, + }), + ); + }); + + it("preserves caller-provided dispatcher across fallback retry", async () => { + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch.mockRejectedValueOnce(fetchError).mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + }, + }); + + const callerDispatcher = { name: "caller" }; + + await resolved("https://api.telegram.org/botx/sendMessage", { + dispatcher: callerDispatcher, + } as RequestInit); + + expect(undiciFetch).toHaveBeenCalledTimes(2); + + const firstCallInit = undiciFetch.mock.calls[0]?.[1] as + | (RequestInit & { dispatcher?: unknown }) + | undefined; + const secondCallInit = undiciFetch.mock.calls[1]?.[1] as + | (RequestInit & { dispatcher?: unknown }) + | undefined; + + expect(firstCallInit?.dispatcher).toBe(callerDispatcher); + expect(secondCallInit?.dispatcher).toBe(callerDispatcher); + }); + + it("does not arm sticky fallback from caller-provided dispatcher failures", async () => { + const fetchError = buildFetchFallbackError("EHOSTUNREACH"); + undiciFetch + .mockRejectedValueOnce(fetchError) + .mockResolvedValueOnce({ ok: true } as Response) + .mockResolvedValueOnce({ ok: true } as Response); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + }, + }); + + const callerDispatcher = { name: "caller" }; + + await resolved("https://api.telegram.org/botx/sendMessage", { + dispatcher: callerDispatcher, + } as RequestInit); + await resolved("https://api.telegram.org/botx/sendChatAction"); + + expect(undiciFetch).toHaveBeenCalledTimes(3); + + const firstCallInit = undiciFetch.mock.calls[0]?.[1] as + | (RequestInit & { dispatcher?: unknown }) + | undefined; + const secondCallInit = undiciFetch.mock.calls[1]?.[1] as + | (RequestInit & { dispatcher?: unknown }) + | undefined; + const thirdDispatcher = getDispatcherFromUndiciCall(3); + + expect(firstCallInit?.dispatcher).toBe(callerDispatcher); + expect(secondCallInit?.dispatcher).toBe(callerDispatcher); + expect(thirdDispatcher?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + autoSelectFamilyAttemptTimeout: 300, + }), + ); + expect(thirdDispatcher?.options?.connect?.family).not.toBe(4); + }); + + it("does not retry when error codes do not match fallback rules", async () => { + const fetchError = buildFetchFallbackError("ECONNRESET"); + undiciFetch.mockRejectedValue(fetchError); + + const resolved = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + }, + }); + + await expect(resolved("https://api.telegram.org/botx/sendMessage")).rejects.toThrow( "fetch failed", ); - expect(fetchMock).toHaveBeenCalledTimes(1); + expect(undiciFetch).toHaveBeenCalledTimes(1); + }); + + it("keeps per-resolver transport policy isolated across multiple accounts", async () => { + undiciFetch.mockResolvedValue({ ok: true } as Response); + + const resolverA = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }); + const resolverB = resolveTelegramFetchOrThrow(undefined, { + network: { + autoSelectFamily: true, + dnsResultOrder: "verbatim", + }, + }); + + await resolverA("https://api.telegram.org/botA/getMe"); + await resolverB("https://api.telegram.org/botB/getMe"); + + const dispatcherA = getDispatcherFromUndiciCall(1); + const dispatcherB = getDispatcherFromUndiciCall(2); + + expect(dispatcherA).toBeDefined(); + expect(dispatcherB).toBeDefined(); + expect(dispatcherA).not.toBe(dispatcherB); + + expect(dispatcherA?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: false, + }), + ); + expect(dispatcherB?.options?.connect).toEqual( + expect.objectContaining({ + autoSelectFamily: true, + }), + ); + + // Core guarantee: Telegram transport no longer mutates process-global defaults. + expect(setGlobalDispatcher).not.toHaveBeenCalled(); + expect(setDefaultResultOrder).not.toHaveBeenCalled(); + expect(setDefaultAutoSelectFamily).not.toHaveBeenCalled(); }); }); diff --git a/src/telegram/fetch.ts b/src/telegram/fetch.ts index f1e50021e92..3934c10c391 100644 --- a/src/telegram/fetch.ts +++ b/src/telegram/fetch.ts @@ -1,23 +1,43 @@ import * as dns from "node:dns"; -import * as net from "node:net"; -import { EnvHttpProxyAgent, getGlobalDispatcher, setGlobalDispatcher } from "undici"; +import { Agent, EnvHttpProxyAgent, ProxyAgent, fetch as undiciFetch } from "undici"; import type { TelegramNetworkConfig } from "../config/types.telegram.js"; import { resolveFetch } from "../infra/fetch.js"; -import { hasProxyEnvConfigured } from "../infra/net/proxy-env.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { resolveTelegramAutoSelectFamilyDecision, resolveTelegramDnsResultOrderDecision, } from "./network-config.js"; +import { getProxyUrlFromFetch } from "./proxy.js"; -let appliedAutoSelectFamily: boolean | null = null; -let appliedDnsResultOrder: string | null = null; -let appliedGlobalDispatcherAutoSelectFamily: boolean | null = null; const log = createSubsystemLogger("telegram/network"); -function isProxyLikeDispatcher(dispatcher: unknown): boolean { - const ctorName = (dispatcher as { constructor?: { name?: string } })?.constructor?.name; - return typeof ctorName === "string" && ctorName.includes("ProxyAgent"); -} + +const TELEGRAM_AUTO_SELECT_FAMILY_ATTEMPT_TIMEOUT_MS = 300; +const TELEGRAM_API_HOSTNAME = "api.telegram.org"; + +type RequestInitWithDispatcher = RequestInit & { + dispatcher?: unknown; +}; + +type TelegramDispatcher = Agent | EnvHttpProxyAgent | ProxyAgent; + +type TelegramDispatcherMode = "direct" | "env-proxy" | "explicit-proxy"; + +type TelegramDnsResultOrder = "ipv4first" | "verbatim"; + +type LookupCallback = + | ((err: NodeJS.ErrnoException | null, address: string, family: number) => void) + | ((err: NodeJS.ErrnoException | null, addresses: dns.LookupAddress[]) => void); + +type LookupOptions = (dns.LookupOneOptions | dns.LookupAllOptions) & { + order?: TelegramDnsResultOrder; + verbatim?: boolean; +}; + +type LookupFunction = ( + hostname: string, + options: number | dns.LookupOneOptions | dns.LookupAllOptions | undefined, + callback: LookupCallback, +) => void; const FALLBACK_RETRY_ERROR_CODES = [ "ETIMEDOUT", @@ -48,72 +68,215 @@ const IPV4_FALLBACK_RULES: readonly Ipv4FallbackRule[] = [ }, ]; -// Node 22 workaround: enable autoSelectFamily to allow IPv4 fallback on broken IPv6 networks. -// Many networks have IPv6 configured but not routed, causing "Network is unreachable" errors. -// See: https://github.com/nodejs/node/issues/54359 -function applyTelegramNetworkWorkarounds(network?: TelegramNetworkConfig): void { - // Apply autoSelectFamily workaround - const autoSelectDecision = resolveTelegramAutoSelectFamilyDecision({ network }); - if (autoSelectDecision.value !== null && autoSelectDecision.value !== appliedAutoSelectFamily) { - if (typeof net.setDefaultAutoSelectFamily === "function") { - try { - net.setDefaultAutoSelectFamily(autoSelectDecision.value); - appliedAutoSelectFamily = autoSelectDecision.value; - const label = autoSelectDecision.source ? ` (${autoSelectDecision.source})` : ""; - log.info(`autoSelectFamily=${autoSelectDecision.value}${label}`); - } catch { - // ignore if unsupported by the runtime - } - } +function normalizeDnsResultOrder(value: string | null): TelegramDnsResultOrder | null { + if (value === "ipv4first" || value === "verbatim") { + return value; + } + return null; +} + +function createDnsResultOrderLookup( + order: TelegramDnsResultOrder | null, +): LookupFunction | undefined { + if (!order) { + return undefined; + } + const lookup = dns.lookup as unknown as ( + hostname: string, + options: LookupOptions, + callback: LookupCallback, + ) => void; + return (hostname, options, callback) => { + const baseOptions: LookupOptions = + typeof options === "number" + ? { family: options } + : options + ? { ...(options as LookupOptions) } + : {}; + const lookupOptions: LookupOptions = { + ...baseOptions, + order, + // Keep `verbatim` for compatibility with Node runtimes that ignore `order`. + verbatim: order === "verbatim", + }; + lookup(hostname, lookupOptions, callback); + }; +} + +function buildTelegramConnectOptions(params: { + autoSelectFamily: boolean | null; + dnsResultOrder: TelegramDnsResultOrder | null; + forceIpv4: boolean; +}): { + autoSelectFamily?: boolean; + autoSelectFamilyAttemptTimeout?: number; + family?: number; + lookup?: LookupFunction; +} | null { + const connect: { + autoSelectFamily?: boolean; + autoSelectFamilyAttemptTimeout?: number; + family?: number; + lookup?: LookupFunction; + } = {}; + + if (params.forceIpv4) { + connect.family = 4; + connect.autoSelectFamily = false; + } else if (typeof params.autoSelectFamily === "boolean") { + connect.autoSelectFamily = params.autoSelectFamily; + connect.autoSelectFamilyAttemptTimeout = TELEGRAM_AUTO_SELECT_FAMILY_ATTEMPT_TIMEOUT_MS; } - // Node 22's built-in globalThis.fetch uses undici's internal Agent whose - // connect options are frozen at construction time. Calling - // net.setDefaultAutoSelectFamily() after that agent is created has no - // effect on it. Replace the global dispatcher with one that carries the - // current autoSelectFamily setting so subsequent globalThis.fetch calls - // inherit the same decision. - // See: https://github.com/openclaw/openclaw/issues/25676 - if ( - autoSelectDecision.value !== null && - autoSelectDecision.value !== appliedGlobalDispatcherAutoSelectFamily - ) { - const existingGlobalDispatcher = getGlobalDispatcher(); - const shouldPreserveExistingProxy = - isProxyLikeDispatcher(existingGlobalDispatcher) && !hasProxyEnvConfigured(); - if (!shouldPreserveExistingProxy) { - try { - setGlobalDispatcher( - new EnvHttpProxyAgent({ - connect: { - autoSelectFamily: autoSelectDecision.value, - autoSelectFamilyAttemptTimeout: 300, - }, - }), - ); - appliedGlobalDispatcherAutoSelectFamily = autoSelectDecision.value; - log.info(`global undici dispatcher autoSelectFamily=${autoSelectDecision.value}`); - } catch { - // ignore if setGlobalDispatcher is unavailable - } - } + const lookup = createDnsResultOrderLookup(params.dnsResultOrder); + if (lookup) { + connect.lookup = lookup; } - // Apply DNS result order workaround for IPv4/IPv6 issues. - // Some APIs (including Telegram) may fail with IPv6 on certain networks. - // See: https://github.com/openclaw/openclaw/issues/5311 - const dnsDecision = resolveTelegramDnsResultOrderDecision({ network }); - if (dnsDecision.value !== null && dnsDecision.value !== appliedDnsResultOrder) { - if (typeof dns.setDefaultResultOrder === "function") { - try { - dns.setDefaultResultOrder(dnsDecision.value as "ipv4first" | "verbatim"); - appliedDnsResultOrder = dnsDecision.value; - const label = dnsDecision.source ? ` (${dnsDecision.source})` : ""; - log.info(`dnsResultOrder=${dnsDecision.value}${label}`); - } catch { - // ignore if unsupported by the runtime - } + return Object.keys(connect).length > 0 ? connect : null; +} + +function shouldBypassEnvProxyForTelegramApi(env: NodeJS.ProcessEnv = process.env): boolean { + // We need this classification before dispatch to decide whether sticky IPv4 fallback + // can safely arm. EnvHttpProxyAgent does not expose route decisions (proxy vs direct + // NO_PROXY bypass), so we mirror undici's parsing/matching behavior for this host. + // Match EnvHttpProxyAgent behavior (undici): + // - lower-case no_proxy takes precedence over NO_PROXY + // - entries split by comma or whitespace + // - wildcard handling is exact-string "*" only + // - leading "." and "*." are normalized the same way + const noProxyValue = env.no_proxy ?? env.NO_PROXY ?? ""; + if (!noProxyValue) { + return false; + } + if (noProxyValue === "*") { + return true; + } + const targetHostname = TELEGRAM_API_HOSTNAME.toLowerCase(); + const targetPort = 443; + const noProxyEntries = noProxyValue.split(/[,\s]/); + for (let i = 0; i < noProxyEntries.length; i++) { + const entry = noProxyEntries[i]; + if (!entry) { + continue; } + const parsed = entry.match(/^(.+):(\d+)$/); + const entryHostname = (parsed ? parsed[1] : entry).replace(/^\*?\./, "").toLowerCase(); + const entryPort = parsed ? Number.parseInt(parsed[2], 10) : 0; + if (entryPort && entryPort !== targetPort) { + continue; + } + if ( + targetHostname === entryHostname || + targetHostname.slice(-(entryHostname.length + 1)) === `.${entryHostname}` + ) { + return true; + } + } + return false; +} + +function hasEnvHttpProxyForTelegramApi(env: NodeJS.ProcessEnv = process.env): boolean { + // Match EnvHttpProxyAgent behavior (undici) for HTTPS requests: + // - lower-case env vars take precedence over upper-case + // - HTTPS requests use https_proxy/HTTPS_PROXY first, then fall back to http_proxy/HTTP_PROXY + // - ALL_PROXY is ignored by EnvHttpProxyAgent + const httpProxy = env.http_proxy ?? env.HTTP_PROXY; + const httpsProxy = env.https_proxy ?? env.HTTPS_PROXY; + return Boolean(httpProxy) || Boolean(httpsProxy); +} + +function createTelegramDispatcher(params: { + autoSelectFamily: boolean | null; + dnsResultOrder: TelegramDnsResultOrder | null; + useEnvProxy: boolean; + forceIpv4: boolean; + proxyUrl?: string; +}): { dispatcher: TelegramDispatcher; mode: TelegramDispatcherMode } { + const connect = buildTelegramConnectOptions({ + autoSelectFamily: params.autoSelectFamily, + dnsResultOrder: params.dnsResultOrder, + forceIpv4: params.forceIpv4, + }); + const explicitProxyUrl = params.proxyUrl?.trim(); + if (explicitProxyUrl) { + const proxyOptions = connect + ? ({ + uri: explicitProxyUrl, + proxyTls: connect, + } satisfies ConstructorParameters[0]) + : explicitProxyUrl; + try { + return { + dispatcher: new ProxyAgent(proxyOptions), + mode: "explicit-proxy", + }; + } catch (err) { + const reason = err instanceof Error ? err.message : String(err); + throw new Error(`explicit proxy dispatcher init failed: ${reason}`, { cause: err }); + } + } + if (params.useEnvProxy) { + const proxyOptions = connect + ? ({ + connect, + // undici's EnvHttpProxyAgent passes `connect` only to the no-proxy Agent. + // Real proxied HTTPS traffic reads transport settings from ProxyAgent.proxyTls. + proxyTls: connect, + } satisfies ConstructorParameters[0]) + : undefined; + try { + return { + dispatcher: new EnvHttpProxyAgent(proxyOptions), + mode: "env-proxy", + }; + } catch (err) { + log.warn( + `env proxy dispatcher init failed; falling back to direct dispatcher: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + const agentOptions = connect + ? ({ + connect, + } satisfies ConstructorParameters[0]) + : undefined; + return { + dispatcher: new Agent(agentOptions), + mode: "direct", + }; +} + +function withDispatcherIfMissing( + init: RequestInit | undefined, + dispatcher: TelegramDispatcher, +): RequestInitWithDispatcher { + const withDispatcher = init as RequestInitWithDispatcher | undefined; + if (withDispatcher?.dispatcher) { + return init ?? {}; + } + return init ? { ...init, dispatcher } : { dispatcher }; +} + +function resolveWrappedFetch(fetchImpl: typeof fetch): typeof fetch { + return resolveFetch(fetchImpl) ?? fetchImpl; +} + +function logResolverNetworkDecisions(params: { + autoSelectDecision: ReturnType; + dnsDecision: ReturnType; +}): void { + if (params.autoSelectDecision.value !== null) { + const sourceLabel = params.autoSelectDecision.source + ? ` (${params.autoSelectDecision.source})` + : ""; + log.info(`autoSelectFamily=${params.autoSelectDecision.value}${sourceLabel}`); + } + if (params.dnsDecision.value !== null) { + const sourceLabel = params.dnsDecision.source ? ` (${params.dnsDecision.source})` : ""; + log.info(`dnsResultOrder=${params.dnsDecision.value}${sourceLabel}`); } } @@ -151,6 +314,11 @@ function collectErrorCodes(err: unknown): Set { return codes; } +function formatErrorCodes(err: unknown): string { + const codes = [...collectErrorCodes(err)]; + return codes.length > 0 ? codes.join(",") : "none"; +} + function shouldRetryWithIpv4Fallback(err: unknown): boolean { const ctx: Ipv4FallbackContext = { message: @@ -165,44 +333,97 @@ function shouldRetryWithIpv4Fallback(err: unknown): boolean { return true; } -function applyTelegramIpv4Fallback(): void { - applyTelegramNetworkWorkarounds({ - autoSelectFamily: false, - dnsResultOrder: "ipv4first", - }); - log.warn("fetch fallback: forcing autoSelectFamily=false + dnsResultOrder=ipv4first"); -} - // Prefer wrapped fetch when available to normalize AbortSignal across runtimes. export function resolveTelegramFetch( proxyFetch?: typeof fetch, options?: { network?: TelegramNetworkConfig }, -): typeof fetch | undefined { - applyTelegramNetworkWorkarounds(options?.network); - const sourceFetch = proxyFetch ? resolveFetch(proxyFetch) : resolveFetch(); - if (!sourceFetch) { - throw new Error("fetch is not available; set channels.telegram.proxy in config"); - } - // When Telegram media fetch hits dual-stack edge cases (ENETUNREACH/ETIMEDOUT), - // switch to IPv4-safe network mode and retry once. - if (proxyFetch) { +): typeof fetch { + const autoSelectDecision = resolveTelegramAutoSelectFamilyDecision({ + network: options?.network, + }); + const dnsDecision = resolveTelegramDnsResultOrderDecision({ + network: options?.network, + }); + logResolverNetworkDecisions({ + autoSelectDecision, + dnsDecision, + }); + + const explicitProxyUrl = proxyFetch ? getProxyUrlFromFetch(proxyFetch) : undefined; + const undiciSourceFetch = resolveWrappedFetch(undiciFetch as unknown as typeof fetch); + const sourceFetch = explicitProxyUrl + ? undiciSourceFetch + : proxyFetch + ? resolveWrappedFetch(proxyFetch) + : undiciSourceFetch; + + // Preserve fully caller-owned custom fetch implementations. + // OpenClaw proxy fetches are metadata-tagged and continue into resolver-scoped policy. + if (proxyFetch && !explicitProxyUrl) { return sourceFetch; } + + const dnsResultOrder = normalizeDnsResultOrder(dnsDecision.value); + const useEnvProxy = !explicitProxyUrl && hasEnvHttpProxyForTelegramApi(); + const defaultDispatcherResolution = createTelegramDispatcher({ + autoSelectFamily: autoSelectDecision.value, + dnsResultOrder, + useEnvProxy, + forceIpv4: false, + proxyUrl: explicitProxyUrl, + }); + const defaultDispatcher = defaultDispatcherResolution.dispatcher; + const shouldBypassEnvProxy = shouldBypassEnvProxyForTelegramApi(); + const allowStickyIpv4Fallback = + defaultDispatcherResolution.mode === "direct" || + (defaultDispatcherResolution.mode === "env-proxy" && shouldBypassEnvProxy); + const stickyShouldUseEnvProxy = defaultDispatcherResolution.mode === "env-proxy"; + + let stickyIpv4FallbackEnabled = false; + let stickyIpv4Dispatcher: TelegramDispatcher | null = null; + const resolveStickyIpv4Dispatcher = () => { + if (!stickyIpv4Dispatcher) { + stickyIpv4Dispatcher = createTelegramDispatcher({ + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + useEnvProxy: stickyShouldUseEnvProxy, + forceIpv4: true, + proxyUrl: explicitProxyUrl, + }).dispatcher; + } + return stickyIpv4Dispatcher; + }; + return (async (input: RequestInfo | URL, init?: RequestInit) => { + const callerProvidedDispatcher = Boolean( + (init as RequestInitWithDispatcher | undefined)?.dispatcher, + ); + const initialInit = withDispatcherIfMissing( + init, + stickyIpv4FallbackEnabled ? resolveStickyIpv4Dispatcher() : defaultDispatcher, + ); try { - return await sourceFetch(input, init); + return await sourceFetch(input, initialInit); } catch (err) { if (shouldRetryWithIpv4Fallback(err)) { - applyTelegramIpv4Fallback(); - return sourceFetch(input, init); + // Preserve caller-owned dispatchers on retry. + if (callerProvidedDispatcher) { + return sourceFetch(input, init ?? {}); + } + // Proxy routes should not arm sticky IPv4 mode; `family=4` would constrain + // proxy-connect behavior instead of Telegram endpoint selection. + if (!allowStickyIpv4Fallback) { + throw err; + } + if (!stickyIpv4FallbackEnabled) { + stickyIpv4FallbackEnabled = true; + log.warn( + `fetch fallback: enabling sticky IPv4-only dispatcher (codes=${formatErrorCodes(err)})`, + ); + } + return sourceFetch(input, withDispatcherIfMissing(init, resolveStickyIpv4Dispatcher())); } throw err; } }) as typeof fetch; } - -export function resetTelegramFetchStateForTests(): void { - appliedAutoSelectFamily = null; - appliedDnsResultOrder = null; - appliedGlobalDispatcherAutoSelectFamily = null; -} diff --git a/src/telegram/probe.test.ts b/src/telegram/probe.test.ts index 11b0b317eec..7006d14a2f7 100644 --- a/src/telegram/probe.test.ts +++ b/src/telegram/probe.test.ts @@ -1,14 +1,28 @@ -import { type Mock, describe, expect, it, vi } from "vitest"; +import { afterEach, type Mock, describe, expect, it, vi } from "vitest"; import { withFetchPreconnect } from "../test-utils/fetch-mock.js"; -import { probeTelegram } from "./probe.js"; +import { probeTelegram, resetTelegramProbeFetcherCacheForTests } from "./probe.js"; + +const resolveTelegramFetch = vi.hoisted(() => vi.fn()); +const makeProxyFetch = vi.hoisted(() => vi.fn()); + +vi.mock("./fetch.js", () => ({ + resolveTelegramFetch, +})); + +vi.mock("./proxy.js", () => ({ + makeProxyFetch, +})); describe("probeTelegram retry logic", () => { const token = "test-token"; const timeoutMs = 5000; + const originalFetch = global.fetch; const installFetchMock = (): Mock => { const fetchMock = vi.fn(); global.fetch = withFetchPreconnect(fetchMock); + resolveTelegramFetch.mockImplementation((proxyFetch?: typeof fetch) => proxyFetch ?? fetch); + makeProxyFetch.mockImplementation(() => fetchMock as unknown as typeof fetch); return fetchMock; }; @@ -41,6 +55,19 @@ describe("probeTelegram retry logic", () => { expect(result.bot?.username).toBe("test_bot"); } + afterEach(() => { + resetTelegramProbeFetcherCacheForTests(); + resolveTelegramFetch.mockReset(); + makeProxyFetch.mockReset(); + vi.unstubAllEnvs(); + vi.clearAllMocks(); + if (originalFetch) { + global.fetch = originalFetch; + } else { + delete (globalThis as { fetch?: typeof fetch }).fetch; + } + }); + it.each([ { errors: [], @@ -95,6 +122,35 @@ describe("probeTelegram retry logic", () => { } }); + it("respects timeout budget across retries", async () => { + const fetchMock = vi.fn((_input: RequestInfo | URL, init?: RequestInit) => { + return new Promise((_resolve, reject) => { + const signal = init?.signal; + if (signal?.aborted) { + reject(new Error("Request aborted")); + return; + } + signal?.addEventListener("abort", () => reject(new Error("Request aborted")), { + once: true, + }); + }); + }); + global.fetch = withFetchPreconnect(fetchMock as unknown as typeof fetch); + resolveTelegramFetch.mockImplementation((proxyFetch?: typeof fetch) => proxyFetch ?? fetch); + makeProxyFetch.mockImplementation(() => fetchMock as unknown as typeof fetch); + vi.useFakeTimers(); + try { + const probePromise = probeTelegram(`${token}-budget`, 500); + await vi.advanceTimersByTimeAsync(600); + const result = await probePromise; + + expect(result.ok).toBe(false); + expect(fetchMock).toHaveBeenCalledTimes(1); + } finally { + vi.useRealTimers(); + } + }); + it("should NOT retry if getMe returns a 401 Unauthorized", async () => { const fetchMock = installFetchMock(); const mockResponse = { @@ -114,4 +170,106 @@ describe("probeTelegram retry logic", () => { expect(result.error).toBe("Unauthorized"); expect(fetchMock).toHaveBeenCalledTimes(1); // Should not retry }); + + it("uses resolver-scoped Telegram fetch with probe network options", async () => { + const fetchMock = installFetchMock(); + mockGetMeSuccess(fetchMock); + mockGetWebhookInfoSuccess(fetchMock); + + await probeTelegram(token, timeoutMs, { + proxyUrl: "http://127.0.0.1:8888", + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }); + + expect(makeProxyFetch).toHaveBeenCalledWith("http://127.0.0.1:8888"); + expect(resolveTelegramFetch).toHaveBeenCalledWith(fetchMock, { + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }); + }); + + it("reuses probe fetcher across repeated probes for the same account transport settings", async () => { + const fetchMock = installFetchMock(); + vi.stubEnv("VITEST", ""); + vi.stubEnv("NODE_ENV", "production"); + + mockGetMeSuccess(fetchMock); + mockGetWebhookInfoSuccess(fetchMock); + await probeTelegram(`${token}-cache`, timeoutMs, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + mockGetMeSuccess(fetchMock); + mockGetWebhookInfoSuccess(fetchMock); + await probeTelegram(`${token}-cache`, timeoutMs, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + expect(resolveTelegramFetch).toHaveBeenCalledTimes(1); + }); + + it("does not reuse probe fetcher cache when network settings differ", async () => { + const fetchMock = installFetchMock(); + vi.stubEnv("VITEST", ""); + vi.stubEnv("NODE_ENV", "production"); + + mockGetMeSuccess(fetchMock); + mockGetWebhookInfoSuccess(fetchMock); + await probeTelegram(`${token}-cache-variant`, timeoutMs, { + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + mockGetMeSuccess(fetchMock); + mockGetWebhookInfoSuccess(fetchMock); + await probeTelegram(`${token}-cache-variant`, timeoutMs, { + network: { + autoSelectFamily: false, + dnsResultOrder: "ipv4first", + }, + }); + + expect(resolveTelegramFetch).toHaveBeenCalledTimes(2); + }); + + it("reuses probe fetcher cache across token rotation when accountId is stable", async () => { + const fetchMock = installFetchMock(); + vi.stubEnv("VITEST", ""); + vi.stubEnv("NODE_ENV", "production"); + + mockGetMeSuccess(fetchMock); + mockGetWebhookInfoSuccess(fetchMock); + await probeTelegram(`${token}-old`, timeoutMs, { + accountId: "main", + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + mockGetMeSuccess(fetchMock); + mockGetWebhookInfoSuccess(fetchMock); + await probeTelegram(`${token}-new`, timeoutMs, { + accountId: "main", + network: { + autoSelectFamily: true, + dnsResultOrder: "ipv4first", + }, + }); + + expect(resolveTelegramFetch).toHaveBeenCalledTimes(1); + }); }); diff --git a/src/telegram/probe.ts b/src/telegram/probe.ts index f988733f0ee..8311506e455 100644 --- a/src/telegram/probe.ts +++ b/src/telegram/probe.ts @@ -1,5 +1,7 @@ import type { BaseProbeResult } from "../channels/plugins/types.js"; +import type { TelegramNetworkConfig } from "../config/types.telegram.js"; import { fetchWithTimeout } from "../utils/fetch-timeout.js"; +import { resolveTelegramFetch } from "./fetch.js"; import { makeProxyFetch } from "./proxy.js"; const TELEGRAM_API_BASE = "https://api.telegram.org"; @@ -17,15 +19,90 @@ export type TelegramProbe = BaseProbeResult & { webhook?: { url?: string | null; hasCustomCert?: boolean | null }; }; +export type TelegramProbeOptions = { + proxyUrl?: string; + network?: TelegramNetworkConfig; + accountId?: string; +}; + +const probeFetcherCache = new Map(); +const MAX_PROBE_FETCHER_CACHE_SIZE = 64; + +export function resetTelegramProbeFetcherCacheForTests(): void { + probeFetcherCache.clear(); +} + +function resolveProbeOptions( + proxyOrOptions?: string | TelegramProbeOptions, +): TelegramProbeOptions | undefined { + if (!proxyOrOptions) { + return undefined; + } + if (typeof proxyOrOptions === "string") { + return { proxyUrl: proxyOrOptions }; + } + return proxyOrOptions; +} + +function shouldUseProbeFetcherCache(): boolean { + return !process.env.VITEST && process.env.NODE_ENV !== "test"; +} + +function buildProbeFetcherCacheKey(token: string, options?: TelegramProbeOptions): string { + const cacheIdentity = options?.accountId?.trim() || token; + const cacheIdentityKind = options?.accountId?.trim() ? "account" : "token"; + const proxyKey = options?.proxyUrl?.trim() ?? ""; + const autoSelectFamily = options?.network?.autoSelectFamily; + const autoSelectFamilyKey = + typeof autoSelectFamily === "boolean" ? String(autoSelectFamily) : "default"; + const dnsResultOrderKey = options?.network?.dnsResultOrder ?? "default"; + return `${cacheIdentityKind}:${cacheIdentity}::${proxyKey}::${autoSelectFamilyKey}::${dnsResultOrderKey}`; +} + +function setCachedProbeFetcher(cacheKey: string, fetcher: typeof fetch): typeof fetch { + probeFetcherCache.set(cacheKey, fetcher); + if (probeFetcherCache.size > MAX_PROBE_FETCHER_CACHE_SIZE) { + const oldestKey = probeFetcherCache.keys().next().value; + if (oldestKey !== undefined) { + probeFetcherCache.delete(oldestKey); + } + } + return fetcher; +} + +function resolveProbeFetcher(token: string, options?: TelegramProbeOptions): typeof fetch { + const cacheEnabled = shouldUseProbeFetcherCache(); + const cacheKey = cacheEnabled ? buildProbeFetcherCacheKey(token, options) : null; + if (cacheKey) { + const cachedFetcher = probeFetcherCache.get(cacheKey); + if (cachedFetcher) { + return cachedFetcher; + } + } + + const proxyUrl = options?.proxyUrl?.trim(); + const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined; + const resolved = resolveTelegramFetch(proxyFetch, { network: options?.network }); + + if (cacheKey) { + return setCachedProbeFetcher(cacheKey, resolved); + } + return resolved; +} + export async function probeTelegram( token: string, timeoutMs: number, - proxyUrl?: string, + proxyOrOptions?: string | TelegramProbeOptions, ): Promise { const started = Date.now(); - const fetcher = proxyUrl ? makeProxyFetch(proxyUrl) : fetch; + const timeoutBudgetMs = Math.max(1, Math.floor(timeoutMs)); + const deadlineMs = started + timeoutBudgetMs; + const options = resolveProbeOptions(proxyOrOptions); + const fetcher = resolveProbeFetcher(token, options); const base = `${TELEGRAM_API_BASE}/bot${token}`; - const retryDelayMs = Math.max(50, Math.min(1000, timeoutMs)); + const retryDelayMs = Math.max(50, Math.min(1000, Math.floor(timeoutBudgetMs / 5))); + const resolveRemainingBudgetMs = () => Math.max(0, deadlineMs - Date.now()); const result: TelegramProbe = { ok: false, @@ -40,19 +117,35 @@ export async function probeTelegram( // Retry loop for initial connection (handles network/DNS startup races) for (let i = 0; i < 3; i++) { + const remainingBudgetMs = resolveRemainingBudgetMs(); + if (remainingBudgetMs <= 0) { + break; + } try { - meRes = await fetchWithTimeout(`${base}/getMe`, {}, timeoutMs, fetcher); + meRes = await fetchWithTimeout( + `${base}/getMe`, + {}, + Math.max(1, Math.min(timeoutBudgetMs, remainingBudgetMs)), + fetcher, + ); break; } catch (err) { fetchError = err; if (i < 2) { - await new Promise((resolve) => setTimeout(resolve, retryDelayMs)); + const remainingAfterAttemptMs = resolveRemainingBudgetMs(); + if (remainingAfterAttemptMs <= 0) { + break; + } + const delayMs = Math.min(retryDelayMs, remainingAfterAttemptMs); + if (delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } } } } if (!meRes) { - throw fetchError; + throw fetchError ?? new Error(`probe timed out after ${timeoutBudgetMs}ms`); } const meJson = (await meRes.json()) as { @@ -89,16 +182,24 @@ export async function probeTelegram( // Try to fetch webhook info, but don't fail health if it errors. try { - const webhookRes = await fetchWithTimeout(`${base}/getWebhookInfo`, {}, timeoutMs, fetcher); - const webhookJson = (await webhookRes.json()) as { - ok?: boolean; - result?: { url?: string; has_custom_certificate?: boolean }; - }; - if (webhookRes.ok && webhookJson?.ok) { - result.webhook = { - url: webhookJson.result?.url ?? null, - hasCustomCert: webhookJson.result?.has_custom_certificate ?? null, + const webhookRemainingBudgetMs = resolveRemainingBudgetMs(); + if (webhookRemainingBudgetMs > 0) { + const webhookRes = await fetchWithTimeout( + `${base}/getWebhookInfo`, + {}, + Math.max(1, Math.min(timeoutBudgetMs, webhookRemainingBudgetMs)), + fetcher, + ); + const webhookJson = (await webhookRes.json()) as { + ok?: boolean; + result?: { url?: string; has_custom_certificate?: boolean }; }; + if (webhookRes.ok && webhookJson?.ok) { + result.webhook = { + url: webhookJson.result?.url ?? null, + hasCustomCert: webhookJson.result?.has_custom_certificate ?? null, + }; + } } } catch { // ignore webhook errors for probe diff --git a/src/telegram/proxy.test.ts b/src/telegram/proxy.test.ts index 27065d5c50c..4f2ca8f62e6 100644 --- a/src/telegram/proxy.test.ts +++ b/src/telegram/proxy.test.ts @@ -29,7 +29,7 @@ vi.mock("undici", () => ({ setGlobalDispatcher: mocks.setGlobalDispatcher, })); -import { makeProxyFetch } from "./proxy.js"; +import { getProxyUrlFromFetch, makeProxyFetch } from "./proxy.js"; describe("makeProxyFetch", () => { it("uses undici fetch with ProxyAgent dispatcher", async () => { @@ -46,4 +46,11 @@ describe("makeProxyFetch", () => { ); expect(mocks.setGlobalDispatcher).not.toHaveBeenCalled(); }); + + it("attaches proxy metadata for resolver transport handling", () => { + const proxyUrl = "http://proxy.test:8080"; + const proxyFetch = makeProxyFetch(proxyUrl); + + expect(getProxyUrlFromFetch(proxyFetch)).toBe(proxyUrl); + }); }); diff --git a/src/telegram/proxy.ts b/src/telegram/proxy.ts index c4cb7129a17..3ac2bb10159 100644 --- a/src/telegram/proxy.ts +++ b/src/telegram/proxy.ts @@ -1 +1 @@ -export { makeProxyFetch } from "../infra/net/proxy-fetch.js"; +export { getProxyUrlFromFetch, makeProxyFetch } from "../infra/net/proxy-fetch.js"; diff --git a/src/telegram/send.proxy.test.ts b/src/telegram/send.proxy.test.ts index ee47ec765c4..8e16078a67c 100644 --- a/src/telegram/send.proxy.test.ts +++ b/src/telegram/send.proxy.test.ts @@ -51,7 +51,12 @@ vi.mock("grammy", () => ({ InputFile: class {}, })); -import { deleteMessageTelegram, reactMessageTelegram, sendMessageTelegram } from "./send.js"; +import { + deleteMessageTelegram, + reactMessageTelegram, + resetTelegramClientOptionsCacheForTests, + sendMessageTelegram, +} from "./send.js"; describe("telegram proxy client", () => { const proxyUrl = "http://proxy.test:8080"; @@ -76,6 +81,8 @@ describe("telegram proxy client", () => { }; beforeEach(() => { + resetTelegramClientOptionsCacheForTests(); + vi.unstubAllEnvs(); botApi.sendMessage.mockResolvedValue({ message_id: 1, chat: { id: "123" } }); botApi.setMessageReaction.mockResolvedValue(undefined); botApi.deleteMessage.mockResolvedValue(true); @@ -87,6 +94,33 @@ describe("telegram proxy client", () => { resolveTelegramFetch.mockClear(); }); + it("reuses cached Telegram client options for repeated sends with same account transport settings", async () => { + const { fetchImpl } = prepareProxyFetch(); + vi.stubEnv("VITEST", ""); + vi.stubEnv("NODE_ENV", "production"); + + await sendMessageTelegram("123", "first", { token: "tok", accountId: "foo" }); + await sendMessageTelegram("123", "second", { token: "tok", accountId: "foo" }); + + expect(makeProxyFetch).toHaveBeenCalledTimes(1); + expect(resolveTelegramFetch).toHaveBeenCalledTimes(1); + expect(botCtorSpy).toHaveBeenCalledTimes(2); + expect(botCtorSpy).toHaveBeenNthCalledWith( + 1, + "tok", + expect.objectContaining({ + client: expect.objectContaining({ fetch: fetchImpl }), + }), + ); + expect(botCtorSpy).toHaveBeenNthCalledWith( + 2, + "tok", + expect.objectContaining({ + client: expect.objectContaining({ fetch: fetchImpl }), + }), + ); + }); + it.each([ { name: "sendMessage", diff --git a/src/telegram/send.ts b/src/telegram/send.ts index e1b352a0a61..313abf361e8 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -115,6 +115,12 @@ const MESSAGE_NOT_MODIFIED_RE = const CHAT_NOT_FOUND_RE = /400: Bad Request: chat not found/i; const sendLogger = createSubsystemLogger("telegram/send"); const diagLogger = createSubsystemLogger("telegram/diagnostic"); +const telegramClientOptionsCache = new Map(); +const MAX_TELEGRAM_CLIENT_OPTIONS_CACHE_SIZE = 64; + +export function resetTelegramClientOptionsCacheForTests(): void { + telegramClientOptionsCache.clear(); +} function createTelegramHttpLogger(cfg: ReturnType) { const enabled = isDiagnosticFlagEnabled("telegram.http", cfg); @@ -130,25 +136,74 @@ function createTelegramHttpLogger(cfg: ReturnType) { }; } +function shouldUseTelegramClientOptionsCache(): boolean { + return !process.env.VITEST && process.env.NODE_ENV !== "test"; +} + +function buildTelegramClientOptionsCacheKey(params: { + account: ResolvedTelegramAccount; + timeoutSeconds?: number; +}): string { + const proxyKey = params.account.config.proxy?.trim() ?? ""; + const autoSelectFamily = params.account.config.network?.autoSelectFamily; + const autoSelectFamilyKey = + typeof autoSelectFamily === "boolean" ? String(autoSelectFamily) : "default"; + const dnsResultOrderKey = params.account.config.network?.dnsResultOrder ?? "default"; + const timeoutSecondsKey = + typeof params.timeoutSeconds === "number" ? String(params.timeoutSeconds) : "default"; + return `${params.account.accountId}::${proxyKey}::${autoSelectFamilyKey}::${dnsResultOrderKey}::${timeoutSecondsKey}`; +} + +function setCachedTelegramClientOptions( + cacheKey: string, + clientOptions: ApiClientOptions | undefined, +): ApiClientOptions | undefined { + telegramClientOptionsCache.set(cacheKey, clientOptions); + if (telegramClientOptionsCache.size > MAX_TELEGRAM_CLIENT_OPTIONS_CACHE_SIZE) { + const oldestKey = telegramClientOptionsCache.keys().next().value; + if (oldestKey !== undefined) { + telegramClientOptionsCache.delete(oldestKey); + } + } + return clientOptions; +} + function resolveTelegramClientOptions( account: ResolvedTelegramAccount, ): ApiClientOptions | undefined { - const proxyUrl = account.config.proxy?.trim(); - const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined; - const fetchImpl = resolveTelegramFetch(proxyFetch, { - network: account.config.network, - }); const timeoutSeconds = typeof account.config.timeoutSeconds === "number" && Number.isFinite(account.config.timeoutSeconds) ? Math.max(1, Math.floor(account.config.timeoutSeconds)) : undefined; - return fetchImpl || timeoutSeconds - ? { - ...(fetchImpl ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } : {}), - ...(timeoutSeconds ? { timeoutSeconds } : {}), - } - : undefined; + + const cacheEnabled = shouldUseTelegramClientOptionsCache(); + const cacheKey = cacheEnabled + ? buildTelegramClientOptionsCacheKey({ + account, + timeoutSeconds, + }) + : null; + if (cacheKey && telegramClientOptionsCache.has(cacheKey)) { + return telegramClientOptionsCache.get(cacheKey); + } + + const proxyUrl = account.config.proxy?.trim(); + const proxyFetch = proxyUrl ? makeProxyFetch(proxyUrl) : undefined; + const fetchImpl = resolveTelegramFetch(proxyFetch, { + network: account.config.network, + }); + const clientOptions = + fetchImpl || timeoutSeconds + ? { + ...(fetchImpl ? { fetch: fetchImpl as unknown as ApiClientOptions["fetch"] } : {}), + ...(timeoutSeconds ? { timeoutSeconds } : {}), + } + : undefined; + if (cacheKey) { + return setCachedTelegramClientOptions(cacheKey, clientOptions); + } + return clientOptions; } function resolveToken(explicit: string | undefined, params: { accountId: string; token: string }) {