telegram: throttle repeated webhook auth guesses (#55142)

* telegram: throttle repeated webhook auth guesses

* telegram: use per-listener webhook rate limits

* config: stabilize doc baseline ordering
This commit is contained in:
Jacob Tomlinson
2026-03-26 09:19:31 -07:00
committed by GitHub
parent a92fbf7d40
commit c2c136ae95
3 changed files with 306 additions and 4 deletions

View File

@@ -2,6 +2,7 @@ import { createHash } from "node:crypto";
import { once } from "node:events";
import { request, type IncomingMessage } from "node:http";
import { setTimeout as sleep } from "node:timers/promises";
import { WEBHOOK_RATE_LIMIT_DEFAULTS } from "openclaw/plugin-sdk/webhook-ingress";
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
const handlerSpy = vi.hoisted(() => vi.fn((..._args: unknown[]): unknown => undefined));
@@ -24,6 +25,7 @@ const TELEGRAM_SECRET = "secret";
const TELEGRAM_WEBHOOK_PATH = "/hook";
const WEBHOOK_TEST_YIELD_MS = 0;
const WEBHOOK_DRAIN_GUARD_MS = 5;
const TELEGRAM_WEBHOOK_RATE_LIMIT_BURST = WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests + 10;
function collectResponseBody(
res: IncomingMessage,
@@ -558,6 +560,148 @@ describe("startTelegramWebhook", () => {
);
});
it("rate limits repeated invalid secret guesses before authentication succeeds", async () => {
handlerSpy.mockClear();
await withStartedWebhook(
{
secret: TELEGRAM_SECRET,
path: TELEGRAM_WEBHOOK_PATH,
},
async ({ port }) => {
let saw429 = false;
for (let i = 0; i < TELEGRAM_WEBHOOK_RATE_LIMIT_BURST; i += 1) {
const response = await postWebhookJson({
url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH),
payload: JSON.stringify({ update_id: i, message: { text: `guess ${i}` } }),
secret: `wrong-secret-${String(i).padStart(3, "0")}`,
});
if (response.status === 429) {
saw429 = true;
expect(await response.text()).toBe("Too Many Requests");
break;
}
expect(response.status).toBe(401);
expect(await response.text()).toBe("unauthorized");
}
expect(saw429).toBe(true);
const validResponse = await postWebhookJson({
url: webhookUrl(port, TELEGRAM_WEBHOOK_PATH),
payload: JSON.stringify({ update_id: 999, message: { text: "hello" } }),
secret: TELEGRAM_SECRET,
});
expect(validResponse.status).toBe(429);
expect(await validResponse.text()).toBe("Too Many Requests");
expect(handlerSpy).not.toHaveBeenCalled();
},
);
});
it("uses the forwarded client ip when trusted proxies are configured", async () => {
handlerSpy.mockClear();
await withStartedWebhook(
{
secret: TELEGRAM_SECRET,
path: TELEGRAM_WEBHOOK_PATH,
config: {
gateway: {
trustedProxies: ["127.0.0.1"],
},
},
},
async ({ port }) => {
for (let i = 0; i < TELEGRAM_WEBHOOK_RATE_LIMIT_BURST; i += 1) {
const response = await fetchWithTimeout(
webhookUrl(port, TELEGRAM_WEBHOOK_PATH),
{
method: "POST",
headers: {
"content-type": "application/json",
"x-forwarded-for": "198.51.100.10",
"x-telegram-bot-api-secret-token": `wrong-secret-${String(i).padStart(3, "0")}`,
},
body: JSON.stringify({ update_id: i, message: { text: `guess ${i}` } }),
},
5_000,
);
if (response.status === 429) {
break;
}
expect(response.status).toBe(401);
}
const isolatedClient = await fetchWithTimeout(
webhookUrl(port, TELEGRAM_WEBHOOK_PATH),
{
method: "POST",
headers: {
"content-type": "application/json",
"x-forwarded-for": "203.0.113.20",
"x-telegram-bot-api-secret-token": TELEGRAM_SECRET,
},
body: JSON.stringify({ update_id: 201, message: { text: "hello" } }),
},
5_000,
);
expect(isolatedClient.status).toBe(200);
expect(handlerSpy).toHaveBeenCalledTimes(1);
},
);
});
it("keeps rate-limit state isolated per webhook listener", async () => {
handlerSpy.mockClear();
const firstAbort = new AbortController();
const secondAbort = new AbortController();
const first = await startTelegramWebhook({
token: TELEGRAM_TOKEN,
port: 0,
abortSignal: firstAbort.signal,
secret: TELEGRAM_SECRET,
path: TELEGRAM_WEBHOOK_PATH,
});
const second = await startTelegramWebhook({
token: TELEGRAM_TOKEN,
port: 0,
abortSignal: secondAbort.signal,
secret: TELEGRAM_SECRET,
path: TELEGRAM_WEBHOOK_PATH,
});
try {
const firstPort = getServerPort(first.server);
const secondPort = getServerPort(second.server);
for (let i = 0; i < TELEGRAM_WEBHOOK_RATE_LIMIT_BURST; i += 1) {
const response = await postWebhookJson({
url: webhookUrl(firstPort, TELEGRAM_WEBHOOK_PATH),
payload: JSON.stringify({ update_id: i, message: { text: `guess ${i}` } }),
secret: `wrong-secret-${String(i).padStart(3, "0")}`,
});
if (response.status === 429) {
break;
}
}
const secondResponse = await postWebhookJson({
url: webhookUrl(secondPort, TELEGRAM_WEBHOOK_PATH),
payload: JSON.stringify({ update_id: 301, message: { text: "hello" } }),
secret: TELEGRAM_SECRET,
});
expect(secondResponse.status).toBe(200);
expect(handlerSpy).toHaveBeenCalledTimes(1);
} finally {
firstAbort.abort();
secondAbort.abort();
}
});
it("rejects startup when webhook secret is missing", async () => {
await expect(
startTelegramWebhook({

View File

@@ -1,5 +1,7 @@
import { timingSafeEqual } from "node:crypto";
import { createServer } from "node:http";
import type { IncomingMessage } from "node:http";
import net from "node:net";
import * as grammy from "grammy";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { isDiagnosticsEnabled } from "openclaw/plugin-sdk/infra-runtime";
@@ -14,6 +16,11 @@ import {
startDiagnosticHeartbeat,
stopDiagnosticHeartbeat,
} from "openclaw/plugin-sdk/text-runtime";
import {
applyBasicWebhookRequestGuards,
createFixedWindowRateLimiter,
WEBHOOK_RATE_LIMIT_DEFAULTS,
} from "openclaw/plugin-sdk/webhook-ingress";
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { createTelegramBot } from "./bot.js";
@@ -103,6 +110,132 @@ function hasValidTelegramWebhookSecret(
return actual.length === expected.length && timingSafeEqual(actual, expected);
}
function parseIpLiteral(value: string | undefined): string | undefined {
const trimmed = value?.trim();
if (!trimmed) {
return undefined;
}
if (trimmed.startsWith("[")) {
const end = trimmed.indexOf("]");
if (end !== -1) {
const candidate = trimmed.slice(1, end);
return net.isIP(candidate) === 0 ? undefined : candidate;
}
}
if (net.isIP(trimmed) !== 0) {
return trimmed;
}
const lastColon = trimmed.lastIndexOf(":");
if (lastColon > -1 && trimmed.includes(".") && trimmed.indexOf(":") === lastColon) {
const candidate = trimmed.slice(0, lastColon);
return net.isIP(candidate) === 4 ? candidate : undefined;
}
return undefined;
}
function isTrustedProxyAddress(
ip: string | undefined,
trustedProxies?: readonly string[],
): boolean {
const candidate = parseIpLiteral(ip);
if (!candidate || !trustedProxies?.length) {
return false;
}
const blockList = new net.BlockList();
for (const proxy of trustedProxies) {
const trimmed = proxy.trim();
if (!trimmed) {
continue;
}
if (trimmed.includes("/")) {
const [address, prefix] = trimmed.split("/", 2);
const parsedPrefix = Number.parseInt(prefix ?? "", 10);
const family = net.isIP(address);
if (
family === 4 &&
Number.isInteger(parsedPrefix) &&
parsedPrefix >= 0 &&
parsedPrefix <= 32
) {
blockList.addSubnet(address, parsedPrefix, "ipv4");
}
if (
family === 6 &&
Number.isInteger(parsedPrefix) &&
parsedPrefix >= 0 &&
parsedPrefix <= 128
) {
blockList.addSubnet(address, parsedPrefix, "ipv6");
}
continue;
}
if (net.isIP(trimmed) === 4) {
blockList.addAddress(trimmed, "ipv4");
continue;
}
if (net.isIP(trimmed) === 6) {
blockList.addAddress(trimmed, "ipv6");
}
}
return blockList.check(candidate, net.isIP(candidate) === 6 ? "ipv6" : "ipv4");
}
function resolveForwardedClientIp(
forwardedFor: string | undefined,
trustedProxies?: readonly string[],
): string | undefined {
if (!trustedProxies?.length) {
return undefined;
}
const forwardedChain = forwardedFor
?.split(",")
.map((entry) => parseIpLiteral(entry))
.filter((entry): entry is string => Boolean(entry));
if (!forwardedChain?.length) {
return undefined;
}
for (let index = forwardedChain.length - 1; index >= 0; index -= 1) {
const hop = forwardedChain[index];
if (!isTrustedProxyAddress(hop, trustedProxies)) {
return hop;
}
}
return undefined;
}
function resolveTelegramWebhookClientIp(req: IncomingMessage, config?: OpenClawConfig): string {
const remoteAddress = parseIpLiteral(req.socket.remoteAddress);
const trustedProxies = config?.gateway?.trustedProxies;
if (!remoteAddress) {
return "unknown";
}
if (!isTrustedProxyAddress(remoteAddress, trustedProxies)) {
return remoteAddress;
}
const forwardedFor = Array.isArray(req.headers["x-forwarded-for"])
? req.headers["x-forwarded-for"][0]
: req.headers["x-forwarded-for"];
const forwardedClientIp = resolveForwardedClientIp(forwardedFor, trustedProxies);
if (forwardedClientIp) {
return forwardedClientIp;
}
if (config?.gateway?.allowRealIpFallback === true) {
const realIp = Array.isArray(req.headers["x-real-ip"])
? req.headers["x-real-ip"][0]
: req.headers["x-real-ip"];
return parseIpLiteral(realIp) ?? "unknown";
}
return "unknown";
}
function resolveTelegramWebhookRateLimitKey(
req: IncomingMessage,
path: string,
config?: OpenClawConfig,
): string {
return `${path}:${resolveTelegramWebhookClientIp(req, config)}`;
}
export async function startTelegramWebhook(opts: {
token: string;
accountId?: string;
@@ -143,6 +276,11 @@ export async function startTelegramWebhook(opts: {
runtime,
abortSignal: opts.abortSignal,
});
const telegramWebhookRateLimiter = createFixedWindowRateLimiter({
windowMs: WEBHOOK_RATE_LIMIT_DEFAULTS.windowMs,
maxRequests: WEBHOOK_RATE_LIMIT_DEFAULTS.maxRequests,
maxTrackedKeys: WEBHOOK_RATE_LIMIT_DEFAULTS.maxTrackedKeys,
});
const handler = grammy.webhookCallback(bot, "callback", {
secretToken: secret,
onTimeout: "return",
@@ -172,6 +310,18 @@ export async function startTelegramWebhook(opts: {
res.end();
return;
}
// Apply the per-source limit before auth so invalid secret guesses consume budget
// in the same window as any later request from that source.
if (
!applyBasicWebhookRequestGuards({
req,
res,
rateLimiter: telegramWebhookRateLimiter,
rateLimitKey: resolveTelegramWebhookRateLimitKey(req, path, opts.config),
})
) {
return;
}
const startTime = Date.now();
if (diagnosticsEnabled) {
logWebhookReceived({ channel: "telegram", updateType: "telegram-post" });

View File

@@ -33,6 +33,13 @@ type ChannelSurfaceMetadata = {
configUiHints?: ConfigSchemaResponse["uiHints"];
};
function compareChannelSurfaceMetadata(
left: ChannelSurfaceMetadata,
right: ChannelSurfaceMetadata,
): number {
return left.id.localeCompare(right.id);
}
export type ConfigDocBaselineKind = "core" | "channel" | "plugin";
export type ConfigDocBaselineEntry = {
@@ -372,9 +379,9 @@ async function loadBundledConfigSchemaResponse(): Promise<ConfigSchemaResponse>
}).map((entry) => [entry.id, entry.meta] as const),
);
logConfigDocBaselineDebug(`loaded ${manifestRegistry.plugins.length} bundled plugin manifests`);
const bundledChannelPlugins = manifestRegistry.plugins.filter(
(plugin) => plugin.origin === "bundled" && plugin.channels.length > 0,
);
const bundledChannelPlugins = manifestRegistry.plugins
.filter((plugin) => plugin.origin === "bundled" && plugin.channels.length > 0)
.toSorted((left, right) => left.id.localeCompare(right.id));
const channelPlugins =
process.env.OPENCLAW_CONFIG_DOC_BASELINE_DEBUG === "1"
? await bundledChannelPlugins.reduce<Promise<ChannelSurfaceMetadata[]>>(
@@ -432,6 +439,7 @@ async function loadBundledConfigSchemaResponse(): Promise<ConfigSchemaResponse>
cache: false,
plugins: manifestRegistry.plugins
.filter((plugin) => plugin.origin === "bundled")
.toSorted((left, right) => left.id.localeCompare(right.id))
.map((plugin) => ({
id: plugin.id,
name: plugin.name,
@@ -439,7 +447,7 @@ async function loadBundledConfigSchemaResponse(): Promise<ConfigSchemaResponse>
configUiHints: plugin.configUiHints,
configSchema: plugin.configSchema,
})),
channels: channelPlugins.map((entry) => ({
channels: channelPlugins.toSorted(compareChannelSurfaceMetadata).map((entry) => ({
id: entry.id,
label: entry.label,
description: entry.description,