From 692438cbb22e3c5c36236adaf556e2768cb1cfbd Mon Sep 17 00:00:00 2001 From: Agustin Rivera <31522568+eleqtrizit@users.noreply.github.com> Date: Mon, 13 Apr 2026 15:51:16 -0700 Subject: [PATCH] fix(stream): tighten voice stream ingress guards (#66027) * fix(stream): tighten voice stream ingress guards * fix(stream): address review follow-ups * fix(stream): normalize trusted proxy ip matching * changelog: note voice-call media-stream ingress guard tightening (#66027) * fix(stream): require non-empty trusted proxy list before honoring forwarding headers Without an explicit trusted proxy list, the prior gate treated every remote as 'from a trusted proxy', so enabling trustForwardingHeaders let any direct caller spoof X-Forwarded-For / X-Real-IP and rotate the resolved IP per request to evade maxPendingConnectionsPerIp. Require trustedProxyIPs to be non-empty AND match the remote before trusting forwarding headers. --------- Co-authored-by: Devin Robison --- CHANGELOG.md | 1 + .../voice-call/src/media-stream.test.ts | 160 ++++++++++++++++++ extensions/voice-call/src/media-stream.ts | 46 ++++- extensions/voice-call/src/webhook.test.ts | 149 ++++++++++++++++ extensions/voice-call/src/webhook.ts | 74 ++++++++ 5 files changed, 426 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b7f309cdf70..38f06436ae9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai - Memory/active-memory: move recalled memory onto the hidden untrusted prompt-prefix path instead of system prompt injection, label the visible Active Memory status line fields, and include the resolved recall provider/model in gateway debug logs so trace/debug output matches what the model actually saw. - Memory/QMD: stop treating legacy lowercase `memory.md` as a second default root collection, so QMD recall no longer searches phantom `memory-alt-*` collections and builtin/QMD root-memory fallback stays aligned. (#66141) Thanks @mbelinky. - Agents/OpenAI: map `minimal` thinking to OpenAI's supported `low` reasoning effort for GPT-5.4 requests, so embedded runs stop failing request validation. +- Voice-call/media-stream: resolve the source IP from trusted forwarding headers for per-IP pending-connection limits when `webhookSecurity.trustForwardingHeaders` and `trustedProxyIPs` are configured, and reserve `maxConnections` capacity for in-flight WebSocket upgrades so concurrent handshakes can no longer momentarily exceed the operator-set cap. (#66027) Thanks @eleqtrizit. ## 2026.4.12 diff --git a/extensions/voice-call/src/media-stream.test.ts b/extensions/voice-call/src/media-stream.test.ts index 03c9540af9e..16f51aeb4f6 100644 --- a/extensions/voice-call/src/media-stream.test.ts +++ b/extensions/voice-call/src/media-stream.test.ts @@ -1,3 +1,5 @@ +import type { IncomingMessage } from "node:http"; +import net from "node:net"; import type { RealtimeTranscriptionProviderPlugin, RealtimeTranscriptionSession, @@ -257,6 +259,42 @@ describe("MediaStreamHandler security hardening", () => { } }); + it("uses resolved client IPs for per-IP pending limits", async () => { + const handler = new MediaStreamHandler({ + transcriptionProvider: createStubSttProvider(), + providerConfig: {}, + preStartTimeoutMs: 5_000, + maxPendingConnections: 10, + maxPendingConnectionsPerIp: 1, + resolveClientIp: (request) => String(request.headers["x-forwarded-for"] ?? ""), + }); + const server = await startWsServer(handler); + + try { + const first = new WebSocket(server.url, { + headers: { "x-forwarded-for": "198.51.100.10" }, + }); + await withTimeout(new Promise((resolve) => first.once("open", resolve))); + + const second = new WebSocket(server.url, { + headers: { "x-forwarded-for": "203.0.113.20" }, + }); + await withTimeout(new Promise((resolve) => second.once("open", resolve))); + + expect(first.readyState).toBe(WebSocket.OPEN); + expect(second.readyState).toBe(WebSocket.OPEN); + + const firstClosed = waitForClose(first); + const secondClosed = waitForClose(second); + first.close(); + second.close(); + await firstClosed; + await secondClosed; + } finally { + await server.close(); + } + }); + it("rejects upgrades when max connection cap is reached", async () => { const handler = new MediaStreamHandler({ transcriptionProvider: createStubSttProvider(), @@ -286,6 +324,128 @@ describe("MediaStreamHandler security hardening", () => { } }); + it("counts in-flight upgrades against the max connection cap", () => { + const handler = new MediaStreamHandler({ + transcriptionProvider: createStubSttProvider(), + providerConfig: {}, + maxConnections: 2, + maxPendingConnections: 10, + maxPendingConnectionsPerIp: 10, + }); + + const fakeWss = { + clients: new Set([{}]), + handleUpgrade: vi.fn(), + emit: vi.fn(), + on: vi.fn(), + }; + let upgradeCallback: ((ws: WebSocket) => void) | null = null; + fakeWss.handleUpgrade.mockImplementation( + ( + _request: IncomingMessage, + _socket: unknown, + _head: Buffer, + callback: (ws: WebSocket) => void, + ) => { + upgradeCallback = callback; + }, + ); + + ( + handler as unknown as { + wss: typeof fakeWss; + } + ).wss = fakeWss; + + const firstSocket = { + once: vi.fn(), + removeListener: vi.fn(), + write: vi.fn(), + destroy: vi.fn(), + }; + handler.handleUpgrade( + { socket: { remoteAddress: "127.0.0.1" } } as IncomingMessage, + firstSocket as never, + Buffer.alloc(0), + ); + + const secondSocket = { + once: vi.fn(), + removeListener: vi.fn(), + write: vi.fn(), + destroy: vi.fn(), + }; + handler.handleUpgrade( + { socket: { remoteAddress: "127.0.0.1" } } as IncomingMessage, + secondSocket as never, + Buffer.alloc(0), + ); + + expect(fakeWss.handleUpgrade).toHaveBeenCalledTimes(1); + expect(secondSocket.write).toHaveBeenCalledOnce(); + expect(secondSocket.destroy).toHaveBeenCalledOnce(); + + expect(upgradeCallback).not.toBeNull(); + const completeUpgrade = upgradeCallback as ((ws: WebSocket) => void) | null; + if (!completeUpgrade) { + throw new Error("Expected upgrade callback to be registered"); + } + completeUpgrade({} as WebSocket); + expect(fakeWss.emit).toHaveBeenCalledWith( + "connection", + expect.anything(), + expect.objectContaining({ socket: { remoteAddress: "127.0.0.1" } }), + ); + }); + + it("releases in-flight reservations when ws rejects a malformed upgrade before the callback", async () => { + const handler = new MediaStreamHandler({ + transcriptionProvider: createStubSttProvider(), + providerConfig: {}, + preStartTimeoutMs: 5_000, + maxConnections: 1, + maxPendingConnections: 10, + maxPendingConnectionsPerIp: 10, + }); + const server = await startWsServer(handler); + const serverUrl = new URL(server.url); + + try { + await withTimeout( + new Promise((resolve, reject) => { + const socket = net.createConnection( + { host: serverUrl.hostname, port: Number(serverUrl.port) }, + () => { + socket.write( + [ + "GET /voice/stream HTTP/1.1", + `Host: ${serverUrl.host}`, + "Upgrade: websocket", + "Connection: Upgrade", + "Sec-WebSocket-Version: 13", + "", + "", + ].join("\r\n"), + ); + }, + ); + socket.once("error", reject); + socket.once("data", () => { + socket.end(); + }); + socket.once("close", () => resolve()); + }), + ); + + const ws = await connectWs(server.url); + expect(ws.readyState).toBe(WebSocket.OPEN); + ws.close(); + await waitForClose(ws); + } finally { + await server.close(); + } + }); + it("clears pending state after valid start", async () => { const handler = new MediaStreamHandler({ transcriptionProvider: createStubSttProvider(), diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index 1b856e44a1f..87320077e1a 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -32,6 +32,8 @@ export interface MediaStreamConfig { maxPendingConnectionsPerIp?: number; /** Max total open sockets (pending + active sessions). */ maxConnections?: number; + /** Optional trusted resolver for the source IP used by pending-connection guards. */ + resolveClientIp?: (request: IncomingMessage) => string | undefined; /** Validate whether to accept a media stream for the given call ID */ shouldAcceptStream?: (params: { callId: string; streamSid: string; token?: string }) => boolean; /** Callback when transcript is received */ @@ -119,6 +121,7 @@ export class MediaStreamHandler { private maxPendingConnections: number; private maxPendingConnectionsPerIp: number; private maxConnections: number; + private inflightUpgrades = 0; /** TTS playback queues per stream (serialize audio to prevent overlap) */ private ttsQueues = new Map(); /** Whether TTS is currently playing per stream */ @@ -148,15 +151,42 @@ export class MediaStreamHandler { this.wss.on("connection", (ws, req) => this.handleConnection(ws, req)); } - const currentConnections = this.wss.clients.size; + const currentConnections = this.getCurrentConnectionCount(); if (currentConnections >= this.maxConnections) { this.rejectUpgrade(socket, 503, "Too many media stream connections"); return; } - this.wss.handleUpgrade(request, socket, head, (ws) => { - this.wss?.emit("connection", ws, request); - }); + this.inflightUpgrades += 1; + let released = false; + const releaseUpgradeReservation = () => { + if (released) { + return; + } + released = true; + this.inflightUpgrades = Math.max(0, this.inflightUpgrades - 1); + }; + const handleUpgradeAbort = () => { + socket.removeListener("error", handleUpgradeAbort); + socket.removeListener("close", handleUpgradeAbort); + releaseUpgradeReservation(); + }; + socket.once("error", handleUpgradeAbort); + socket.once("close", handleUpgradeAbort); + + try { + this.wss.handleUpgrade(request, socket, head, (ws) => { + socket.removeListener("error", handleUpgradeAbort); + socket.removeListener("close", handleUpgradeAbort); + releaseUpgradeReservation(); + this.wss?.emit("connection", ws, request); + }); + } catch (error) { + socket.removeListener("error", handleUpgradeAbort); + socket.removeListener("close", handleUpgradeAbort); + releaseUpgradeReservation(); + throw error; + } } /** @@ -318,9 +348,17 @@ export class MediaStreamHandler { } private getClientIp(request: IncomingMessage): string { + const resolvedIp = this.config.resolveClientIp?.(request)?.trim(); + if (resolvedIp) { + return resolvedIp; + } return request.socket.remoteAddress || "unknown"; } + private getCurrentConnectionCount(): number { + return this.wss ? this.wss.clients.size + this.inflightUpgrades : this.inflightUpgrades; + } + private registerPendingConnection(ws: WebSocket, ip: string): boolean { if (this.pendingConnections.size >= this.maxPendingConnections) { console.warn("[MediaStream] Rejecting connection: pending connection limit reached"); diff --git a/extensions/voice-call/src/webhook.test.ts b/extensions/voice-call/src/webhook.test.ts index 12b146cce67..3210901c0e6 100644 --- a/extensions/voice-call/src/webhook.test.ts +++ b/extensions/voice-call/src/webhook.test.ts @@ -200,6 +200,155 @@ describe("VoiceCallWebhookServer realtime transcription provider selection", () }); }); +describe("VoiceCallWebhookServer media stream client IP resolution", () => { + type MediaStreamRequestDouble = { + headers: Record; + socket: { remoteAddress?: string }; + }; + + const resolveMediaStreamClientIp = ( + configOverrides: Partial, + requestOverrides: Partial = {}, + ): string | undefined => { + const { manager } = createManager([]); + const server = new VoiceCallWebhookServer( + createConfig(configOverrides), + manager, + createTwilioStreamingProvider(), + ); + const request = { + headers: {}, + socket: { remoteAddress: "127.0.0.1" }, + ...requestOverrides, + }; + + return ( + server as unknown as { + resolveMediaStreamClientIp: (request: MediaStreamRequestDouble) => string | undefined; + } + ).resolveMediaStreamClientIp(request as never); + }; + + it("uses forwarded IPs only when forwarding trust is explicitly enabled", () => { + const ip = resolveMediaStreamClientIp( + { + webhookSecurity: { + allowedHosts: [], + trustForwardingHeaders: true, + trustedProxyIPs: ["127.0.0.1"], + }, + }, + { + headers: { + "x-forwarded-for": "198.51.100.10, 203.0.113.10", + }, + }, + ); + + expect(ip).toBe("203.0.113.10"); + }); + + it("does not trust forwarded IPs when only allowedHosts is configured", () => { + const ip = resolveMediaStreamClientIp( + { + webhookSecurity: { + allowedHosts: ["voice.example.com"], + trustForwardingHeaders: false, + trustedProxyIPs: ["127.0.0.1"], + }, + }, + { + headers: { + "x-forwarded-for": "198.51.100.10", + "x-real-ip": "198.51.100.11", + }, + }, + ); + + expect(ip).toBe("127.0.0.1"); + }); + + it("ignores spoofed forwarded IPs from untrusted remotes", () => { + const ip = resolveMediaStreamClientIp( + { + webhookSecurity: { + allowedHosts: [], + trustForwardingHeaders: true, + trustedProxyIPs: ["203.0.113.10"], + }, + }, + { + headers: { + "x-forwarded-for": "198.51.100.10", + }, + socket: { remoteAddress: "127.0.0.1" }, + }, + ); + + expect(ip).toBe("127.0.0.1"); + }); + + it("walks the forwarded chain from the right to support trusted multi-proxy deployments", () => { + const ip = resolveMediaStreamClientIp( + { + webhookSecurity: { + allowedHosts: [], + trustForwardingHeaders: true, + trustedProxyIPs: ["127.0.0.1", "203.0.113.10"], + }, + }, + { + headers: { + "x-forwarded-for": "198.51.100.10, 203.0.113.10", + }, + }, + ); + + expect(ip).toBe("198.51.100.10"); + }); + + it("ignores forwarded IPs when no trusted proxy is configured", () => { + const ip = resolveMediaStreamClientIp( + { + webhookSecurity: { + allowedHosts: [], + trustForwardingHeaders: true, + trustedProxyIPs: [], + }, + }, + { + headers: { + "x-forwarded-for": "198.51.100.10", + "x-real-ip": "198.51.100.11", + }, + socket: { remoteAddress: "127.0.0.1" }, + }, + ); + + expect(ip).toBe("127.0.0.1"); + }); + + it("matches trusted proxies when the remote uses an IPv4-mapped form", () => { + const ip = resolveMediaStreamClientIp( + { + webhookSecurity: { + allowedHosts: [], + trustForwardingHeaders: true, + trustedProxyIPs: ["127.0.0.1", "203.0.113.10"], + }, + }, + { + headers: { + "x-forwarded-for": "198.51.100.10, 203.0.113.10", + }, + socket: { remoteAddress: "::ffff:127.0.0.1" }, + }, + ); + + expect(ip).toBe("198.51.100.10"); + }); +}); + async function runStaleCallReaperCase(params: { callAgeMs: number; staleCallReaperSeconds: number; diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index 3e335d04a5f..441bd23023c 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -57,6 +57,55 @@ function buildRequestUrl( return new URL(requestUrl ?? "/", `http://${requestHost ?? fallbackHost}`); } +function normalizeProxyIp(value: string | undefined): string | undefined { + const trimmed = value?.trim(); + if (!trimmed) { + return undefined; + } + const unwrapped = + trimmed.startsWith("[") && trimmed.endsWith("]") ? trimmed.slice(1, -1) : trimmed; + const normalized = unwrapped.toLowerCase(); + const mappedIpv4Prefix = "::ffff:"; + if (normalized.startsWith(mappedIpv4Prefix)) { + const mappedIpv4 = normalized.slice(mappedIpv4Prefix.length); + if (/^\d{1,3}(?:\.\d{1,3}){3}$/.test(mappedIpv4)) { + return mappedIpv4; + } + } + return normalized; +} + +function resolveForwardedClientIp( + request: http.IncomingMessage, + trustedProxyIPs: readonly string[], +): string | undefined { + const normalizedTrustedProxyIps = new Set( + trustedProxyIPs.map((ip) => normalizeProxyIp(ip)).filter((ip): ip is string => Boolean(ip)), + ); + const forwardedFor = getHeader(request.headers, "x-forwarded-for"); + if (forwardedFor) { + const forwardedIps = forwardedFor + .split(",") + .map((part) => part.trim()) + .filter(Boolean); + if (forwardedIps.length > 0) { + if (normalizedTrustedProxyIps.size === 0) { + return forwardedIps[0]; + } + for (let index = forwardedIps.length - 1; index >= 0; index -= 1) { + const hop = forwardedIps[index]; + if (!normalizedTrustedProxyIps.has(normalizeProxyIp(hop) ?? "")) { + return hop; + } + } + return forwardedIps[0]; + } + } + + const realIp = getHeader(request.headers, "x-real-ip")?.trim(); + return realIp || undefined; +} + function normalizeWebhookResponse(parsed: { statusCode?: number; providerResponseHeaders?: Record; @@ -132,6 +181,30 @@ export class VoiceCallWebhookServer { this.pendingDisconnectHangups.delete(providerCallId); } + private resolveMediaStreamClientIp(request: http.IncomingMessage): string | undefined { + const remoteIp = request.socket.remoteAddress ?? undefined; + const trustedProxyIPs = this.config.webhookSecurity.trustedProxyIPs.filter(Boolean); + const normalizedTrustedProxyIps = new Set( + trustedProxyIPs.map((ip) => normalizeProxyIp(ip)).filter((ip): ip is string => Boolean(ip)), + ); + const normalizedRemoteIp = normalizeProxyIp(remoteIp); + const fromTrustedProxy = + normalizedTrustedProxyIps.size > 0 && + normalizedRemoteIp !== undefined && + normalizedTrustedProxyIps.has(normalizedRemoteIp); + const shouldTrustForwardingHeaders = + this.config.webhookSecurity.trustForwardingHeaders && fromTrustedProxy; + + if (shouldTrustForwardingHeaders) { + const forwardedIp = resolveForwardedClientIp(request, trustedProxyIPs); + if (forwardedIp) { + return forwardedIp; + } + } + + return remoteIp; + } + private shouldSuppressBargeInForInitialMessage(call: CallRecord | undefined): boolean { if (!call || call.direction !== "outbound") { return false; @@ -202,6 +275,7 @@ export class VoiceCallWebhookServer { maxPendingConnections: streaming.maxPendingConnections, maxPendingConnectionsPerIp: streaming.maxPendingConnectionsPerIp, maxConnections: streaming.maxConnections, + resolveClientIp: (request) => this.resolveMediaStreamClientIp(request), shouldAcceptStream: ({ callId, token }) => { const call = this.manager.getCallByProviderCallId(callId); if (!call) {