diff --git a/extensions/telegram/src/webhook.test.ts b/extensions/telegram/src/webhook.test.ts index 05403bcd5a7..73feb502501 100644 --- a/extensions/telegram/src/webhook.test.ts +++ b/extensions/telegram/src/webhook.test.ts @@ -2,6 +2,7 @@ import { createHash } from "node:crypto"; import { once } from "node:events"; import { request, type IncomingMessage } from "node:http"; import { setTimeout as sleep } from "node:timers/promises"; +import { WEBHOOK_RATE_LIMIT_DEFAULTS } from "openclaw/plugin-sdk/webhook-ingress"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; const handlerSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined)); @@ -24,6 +25,7 @@ const TELEGRAM_SECRET = "secret"; const TELEGRAM_WEBHOOK_PATH = "/hook"; const WEBHOOK_TEST_YIELD_MS = 0; const WEBHOOK_DRAIN_GUARD_MS = 5; +const TELEGRAM_WEBHOOK_RATE_LIMIT_BURST = WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests + 10; function collectResponseBody( res: IncomingMessage, @@ -558,6 +560,148 @@ describe("startTelegramWebhook", () => { ); }); + it("rate limits repeated invalid secret guesses before authentication succeeds", async () => { + handlerSpy.mockClear(); + await withStartedWebhook( + { + secret: TELEGRAM_SECRET, + path: TELEGRAM_WEBHOOK_PATH, + }, + async ({ port }) => { + let saw429 = false; + + for (let i = 0; i < TELEGRAM_WEBHOOK_RATE_LIMIT_BURST; i += 1) { + const response = await postWebhookJson({ + url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH), + payload: JSON.stringify({ update_id: i, message: { text: `guess ${i}` } }), + secret: `wrong-secret-${String(i).padStart(3, "0")}`, + }); + + if (response.status === 429) { + saw429 = true; + expect(await response.text()).toBe("Too Many Requests"); + break; + } + + expect(response.status).toBe(401); + expect(await response.text()).toBe("unauthorized"); + } + + expect(saw429).toBe(true); + + const validResponse = await postWebhookJson({ + url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH), + payload: JSON.stringify({ update_id: 999, message: { text: "hello" } }), + secret: TELEGRAM_SECRET, + }); + expect(validResponse.status).toBe(429); + expect(await validResponse.text()).toBe("Too Many Requests"); + expect(handlerSpy).not.toHaveBeenCalled(); + }, + ); + }); + + it("uses the forwarded client ip when trusted proxies are configured", async () => { + handlerSpy.mockClear(); + await withStartedWebhook( + { + secret: TELEGRAM_SECRET, + path: TELEGRAM_WEBHOOK_PATH, + config: { + gateway: { + trustedProxies: ["127.0.0.1"], + }, + }, + }, + async ({ port }) => { + for (let i = 0; i < TELEGRAM_WEBHOOK_RATE_LIMIT_BURST; i += 1) { + const response = await fetchWithTimeout( + webhookUrl(port, TELEGRAM_WEBHOOK_PATH), + { + method: "POST", + headers: { + "content-type": "application/json", + "x-forwarded-for": "198.51.100.10", + "x-telegram-bot-api-secret-token": `wrong-secret-${String(i).padStart(3, "0")}`, + }, + body: JSON.stringify({ update_id: i, message: { text: `guess ${i}` } }), + }, + 5_000, + ); + if (response.status === 429) { + break; + } + expect(response.status).toBe(401); + } + + const isolatedClient = await fetchWithTimeout( + webhookUrl(port, TELEGRAM_WEBHOOK_PATH), + { + method: "POST", + headers: { + "content-type": "application/json", + "x-forwarded-for": "203.0.113.20", + "x-telegram-bot-api-secret-token": TELEGRAM_SECRET, + }, + body: JSON.stringify({ update_id: 201, message: { text: "hello" } }), + }, + 5_000, + ); + + expect(isolatedClient.status).toBe(200); + expect(handlerSpy).toHaveBeenCalledTimes(1); + }, + ); + }); + + it("keeps rate-limit state isolated per webhook listener", async () => { + handlerSpy.mockClear(); + const firstAbort = new AbortController(); + const secondAbort = new AbortController(); + const first = await startTelegramWebhook({ + token: TELEGRAM_TOKEN, + port: 0, + abortSignal: firstAbort.signal, + secret: TELEGRAM_SECRET, + path: TELEGRAM_WEBHOOK_PATH, + }); + const second = await startTelegramWebhook({ + token: TELEGRAM_TOKEN, + port: 0, + abortSignal: secondAbort.signal, + secret: TELEGRAM_SECRET, + path: TELEGRAM_WEBHOOK_PATH, + }); + + try { + const firstPort = getServerPort(first.server); + const secondPort = getServerPort(second.server); + + for (let i = 0; i < TELEGRAM_WEBHOOK_RATE_LIMIT_BURST; i += 1) { + const response = await postWebhookJson({ + url: webhookUrl(firstPort, TELEGRAM_WEBHOOK_PATH), + payload: JSON.stringify({ update_id: i, message: { text: `guess ${i}` } }), + secret: `wrong-secret-${String(i).padStart(3, "0")}`, + }); + if (response.status === 429) { + break; + } + } + + const secondResponse = await postWebhookJson({ + url: webhookUrl(secondPort, TELEGRAM_WEBHOOK_PATH), + payload: JSON.stringify({ update_id: 301, message: { text: "hello" } }), + secret: TELEGRAM_SECRET, + }); + + expect(secondResponse.status).toBe(200); + expect(handlerSpy).toHaveBeenCalledTimes(1); + } finally { + firstAbort.abort(); + secondAbort.abort(); + } + }); + it("rejects startup when webhook secret is missing", async () => { await expect( startTelegramWebhook({ diff --git a/extensions/telegram/src/webhook.ts b/extensions/telegram/src/webhook.ts index ab82866c35b..d0b76d41712 100644 --- a/extensions/telegram/src/webhook.ts +++ b/extensions/telegram/src/webhook.ts @@ -1,5 +1,7 @@ import { timingSafeEqual } from "node:crypto"; import { createServer } from "node:http"; +import type { IncomingMessage } from "node:http"; +import net from "node:net"; import * as grammy from "grammy"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { isDiagnosticsEnabled } from "openclaw/plugin-sdk/infra-runtime"; @@ -14,6 +16,11 @@ import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat, } from "openclaw/plugin-sdk/text-runtime"; +import { + applyBasicWebhookRequestGuards, + createFixedWindowRateLimiter, + WEBHOOK_RATE_LIMIT_DEFAULTS, +} from "openclaw/plugin-sdk/webhook-ingress"; import { resolveTelegramAllowedUpdates } from "./allowed-updates.js"; import { withTelegramApiErrorLogging } from "./api-logging.js"; import { createTelegramBot } from "./bot.js"; @@ -103,6 +110,132 @@ function hasValidTelegramWebhookSecret( return actual.length === expected.length && timingSafeEqual(actual, expected); } +function parseIpLiteral(value: string | undefined): string | undefined { + const trimmed = value?.trim(); + if (!trimmed) { + return undefined; + } + if (trimmed.startsWith("[")) { + const end = trimmed.indexOf("]"); + if (end !== -1) { + const candidate = trimmed.slice(1, end); + return net.isIP(candidate) === 0 ? undefined : candidate; + } + } + if (net.isIP(trimmed) !== 0) { + return trimmed; + } + const lastColon = trimmed.lastIndexOf(":"); + if (lastColon > -1 && trimmed.includes(".") && trimmed.indexOf(":") === lastColon) { + const candidate = trimmed.slice(0, lastColon); + return net.isIP(candidate) === 4 ? candidate : undefined; + } + return undefined; +} + +function isTrustedProxyAddress( + ip: string | undefined, + trustedProxies?: readonly string[], +): boolean { + const candidate = parseIpLiteral(ip); + if (!candidate || !trustedProxies?.length) { + return false; + } + const blockList = new net.BlockList(); + for (const proxy of trustedProxies) { + const trimmed = proxy.trim(); + if (!trimmed) { + continue; + } + if (trimmed.includes("/")) { + const [address, prefix] = trimmed.split("/", 2); + const parsedPrefix = Number.parseInt(prefix ?? "", 10); + const family = net.isIP(address); + if ( + family === 4 && + Number.isInteger(parsedPrefix) && + parsedPrefix >= 0 && + parsedPrefix <= 32 + ) { + blockList.addSubnet(address, parsedPrefix, "ipv4"); + } + if ( + family === 6 && + Number.isInteger(parsedPrefix) && + parsedPrefix >= 0 && + parsedPrefix <= 128 + ) { + blockList.addSubnet(address, parsedPrefix, "ipv6"); + } + continue; + } + if (net.isIP(trimmed) === 4) { + blockList.addAddress(trimmed, "ipv4"); + continue; + } + if (net.isIP(trimmed) === 6) { + blockList.addAddress(trimmed, "ipv6"); + } + } + return blockList.check(candidate, net.isIP(candidate) === 6 ? "ipv6" : "ipv4"); +} + +function resolveForwardedClientIp( + forwardedFor: string | undefined, + trustedProxies?: readonly string[], +): string | undefined { + if (!trustedProxies?.length) { + return undefined; + } + const forwardedChain = forwardedFor + ?.split(",") + .map((entry) => parseIpLiteral(entry)) + .filter((entry): entry is string => Boolean(entry)); + if (!forwardedChain?.length) { + return undefined; + } + for (let index = forwardedChain.length - 1; index >= 0; index -= 1) { + const hop = forwardedChain[index]; + if (!isTrustedProxyAddress(hop, trustedProxies)) { + return hop; + } + } + return undefined; +} + +function resolveTelegramWebhookClientIp(req: IncomingMessage, config?: OpenClawConfig): string { + const remoteAddress = parseIpLiteral(req.socket.remoteAddress); + const trustedProxies = config?.gateway?.trustedProxies; + if (!remoteAddress) { + return "unknown"; + } + if (!isTrustedProxyAddress(remoteAddress, trustedProxies)) { + return remoteAddress; + } + const forwardedFor = Array.isArray(req.headers["x-forwarded-for"]) + ? req.headers["x-forwarded-for"][0] + : req.headers["x-forwarded-for"]; + const forwardedClientIp = resolveForwardedClientIp(forwardedFor, trustedProxies); + if (forwardedClientIp) { + return forwardedClientIp; + } + if (config?.gateway?.allowRealIpFallback === true) { + const realIp = Array.isArray(req.headers["x-real-ip"]) + ? req.headers["x-real-ip"][0] + : req.headers["x-real-ip"]; + return parseIpLiteral(realIp) ?? "unknown"; + } + return "unknown"; +} + +function resolveTelegramWebhookRateLimitKey( + req: IncomingMessage, + path: string, + config?: OpenClawConfig, +): string { + return `${path}:${resolveTelegramWebhookClientIp(req, config)}`; +} + export async function startTelegramWebhook(opts: { token: string; accountId?: string; @@ -143,6 +276,11 @@ export async function startTelegramWebhook(opts: { runtime, abortSignal: opts.abortSignal, }); + const telegramWebhookRateLimiter = createFixedWindowRateLimiter({ + windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs, + maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests, + maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys, + }); const handler = grammy.webhookCallback(bot, "callback", { secretToken: secret, onTimeout: "return", @@ -172,6 +310,18 @@ export async function startTelegramWebhook(opts: { res.end(); return; } + // Apply the per-source limit before auth so invalid secret guesses consume budget + // in the same window as any later request from that source. + if ( + !applyBasicWebhookRequestGuards({ + req, + res, + rateLimiter: telegramWebhookRateLimiter, + rateLimitKey: resolveTelegramWebhookRateLimitKey(req, path, opts.config), + }) + ) { + return; + } const startTime = Date.now(); if (diagnosticsEnabled) { logWebhookReceived({ channel: "telegram", updateType: "telegram-post" }); diff --git a/src/config/doc-baseline.ts b/src/config/doc-baseline.ts index b590be8607e..d937159e44f 100644 --- a/src/config/doc-baseline.ts +++ b/src/config/doc-baseline.ts @@ -33,6 +33,13 @@ type ChannelSurfaceMetadata = { configUiHints?: ConfigSchemaResponse["uiHints"]; }; +function compareChannelSurfaceMetadata( + left: ChannelSurfaceMetadata, + right: ChannelSurfaceMetadata, +): number { + return left.id.localeCompare(right.id); +} + export type ConfigDocBaselineKind = "core" | "channel" | "plugin"; export type ConfigDocBaselineEntry = { @@ -372,9 +379,9 @@ async function loadBundledConfigSchemaResponse(): Promise }).map((entry) => [entry.id, entry.meta] as const), ); logConfigDocBaselineDebug(`loaded ${manifestRegistry.plugins.length} bundled plugin manifests`); - const bundledChannelPlugins = manifestRegistry.plugins.filter( - (plugin) => plugin.origin === "bundled" && plugin.channels.length > 0, - ); + const bundledChannelPlugins = manifestRegistry.plugins + .filter((plugin) => plugin.origin === "bundled" && plugin.channels.length > 0) + .toSorted((left, right) => left.id.localeCompare(right.id)); const channelPlugins = process.env.OPENCLAW_CONFIG_DOC_BASELINE_DEBUG === "1" ? await bundledChannelPlugins.reduce>( @@ -432,6 +439,7 @@ async function loadBundledConfigSchemaResponse(): Promise cache: false, plugins: manifestRegistry.plugins .filter((plugin) => plugin.origin === "bundled") + .toSorted((left, right) => left.id.localeCompare(right.id)) .map((plugin) => ({ id: plugin.id, name: plugin.name, @@ -439,7 +447,7 @@ async function loadBundledConfigSchemaResponse(): Promise configUiHints: plugin.configUiHints, configSchema: plugin.configSchema, })), - channels: channelPlugins.map((entry) => ({ + channels: channelPlugins.toSorted(compareChannelSurfaceMetadata).map((entry) => ({ id: entry.id, label: entry.label, description: entry.description,