fix(stream): tighten voice stream ingress guards (#66027)

* fix(stream): tighten voice stream ingress guards

* fix(stream): address review follow-ups

* fix(stream): normalize trusted proxy ip matching

* changelog: note voice-call media-stream ingress guard tightening (#66027)

* fix(stream): require non-empty trusted proxy list before honoring forwarding headers

Without an explicit trusted proxy list, the prior gate treated every
remote as 'from a trusted proxy', so enabling trustForwardingHeaders
let any direct caller spoof X-Forwarded-For / X-Real-IP and rotate the
resolved IP per request to evade maxPendingConnectionsPerIp. Require
trustedProxyIPs to be non-empty AND match the remote before trusting
forwarding headers.

---------

Co-authored-by: Devin Robison <drobison@nvidia.com>
This commit is contained in:
Agustin Rivera
2026-04-13 15:51:16 -07:00
committed by GitHub
parent 955270fb73
commit 692438cbb2
5 changed files with 426 additions and 4 deletions

View File

@@ -36,6 +36,7 @@ Docs: https://docs.openclaw.ai
- Memory/active-memory: move recalled memory onto the hidden untrusted prompt-prefix path instead of system prompt injection, label the visible Active Memory status line fields, and include the resolved recall provider/model in gateway debug logs so trace/debug output matches what the model actually saw.
- Memory/QMD: stop treating legacy lowercase `memory.md` as a second default root collection, so QMD recall no longer searches phantom `memory-alt-*` collections and builtin/QMD root-memory fallback stays aligned. (#66141) Thanks @mbelinky.
- Agents/OpenAI: map `minimal` thinking to OpenAI's supported `low` reasoning effort for GPT-5.4 requests, so embedded runs stop failing request validation.
- Voice-call/media-stream: resolve the source IP from trusted forwarding headers for per-IP pending-connection limits when `webhookSecurity.trustForwardingHeaders` and `trustedProxyIPs` are configured, and reserve `maxConnections` capacity for in-flight WebSocket upgrades so concurrent handshakes can no longer momentarily exceed the operator-set cap. (#66027) Thanks @eleqtrizit.
## 2026.4.12

View File

@@ -1,3 +1,5 @@
import type { IncomingMessage } from "node:http";
import net from "node:net";
import type {
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
@@ -257,6 +259,42 @@ describe("MediaStreamHandler security hardening", () => {
}
});
it("uses resolved client IPs for per-IP pending limits", async () => {
  // The per-IP pending cap is 1, so two sockets can only both open if the
  // handler keys the limit on the resolver's answer rather than on the shared
  // loopback peer address.
  const handler = new MediaStreamHandler({
    transcriptionProvider: createStubSttProvider(),
    providerConfig: {},
    preStartTimeoutMs: 5_000,
    maxPendingConnections: 10,
    maxPendingConnectionsPerIp: 1,
    resolveClientIp: (request) => String(request.headers["x-forwarded-for"] ?? ""),
  });
  const server = await startWsServer(handler);

  // Opens one client that announces the given forwarded address and waits for
  // its handshake to finish before returning.
  const openClient = async (forwardedFor: string): Promise<WebSocket> => {
    const client = new WebSocket(server.url, {
      headers: { "x-forwarded-for": forwardedFor },
    });
    await withTimeout(new Promise((resolve) => client.once("open", resolve)));
    return client;
  };

  try {
    const clientA = await openClient("198.51.100.10");
    const clientB = await openClient("203.0.113.20");

    expect(clientA.readyState).toBe(WebSocket.OPEN);
    expect(clientB.readyState).toBe(WebSocket.OPEN);

    // Register the close waiters before closing so no close event is missed.
    const clientAClosed = waitForClose(clientA);
    const clientBClosed = waitForClose(clientB);
    clientA.close();
    clientB.close();
    await clientAClosed;
    await clientBClosed;
  } finally {
    await server.close();
  }
});
it("rejects upgrades when max connection cap is reached", async () => {
const handler = new MediaStreamHandler({
transcriptionProvider: createStubSttProvider(),
@@ -286,6 +324,128 @@ describe("MediaStreamHandler security hardening", () => {
}
});
// Verifies that an upgrade which has been handed to the WebSocket server but
// has not completed yet still occupies a slot under `maxConnections`.
it("counts in-flight upgrades against the max connection cap", () => {
const handler = new MediaStreamHandler({
transcriptionProvider: createStubSttProvider(),
providerConfig: {},
maxConnections: 2,
maxPendingConnections: 10,
maxPendingConnectionsPerIp: 10,
});
// Stand-in for the WebSocket server: one already-established client, so with
// maxConnections: 2 there is room for exactly one more connection.
const fakeWss = {
clients: new Set([{}]),
handleUpgrade: vi.fn(),
emit: vi.fn(),
on: vi.fn(),
};
let upgradeCallback: ((ws: WebSocket) => void) | null = null;
// Capture the completion callback without invoking it, which leaves the first
// upgrade "in flight" for the remainder of the test.
fakeWss.handleUpgrade.mockImplementation(
(
_request: IncomingMessage,
_socket: unknown,
_head: Buffer,
callback: (ws: WebSocket) => void,
) => {
upgradeCallback = callback;
},
);
// Inject the fake into the handler's private `wss` field.
(
handler as unknown as {
wss: typeof fakeWss;
}
).wss = fakeWss;
const firstSocket = {
once: vi.fn(),
removeListener: vi.fn(),
write: vi.fn(),
destroy: vi.fn(),
};
// First upgrade: 1 established client + 0 in flight is below the cap, so it is
// forwarded to the (fake) WebSocket server.
handler.handleUpgrade(
{ socket: { remoteAddress: "127.0.0.1" } } as IncomingMessage,
firstSocket as never,
Buffer.alloc(0),
);
const secondSocket = {
once: vi.fn(),
removeListener: vi.fn(),
write: vi.fn(),
destroy: vi.fn(),
};
// Second upgrade arrives while the first is still in flight: 1 established +
// 1 in flight reaches the cap of 2.
handler.handleUpgrade(
{ socket: { remoteAddress: "127.0.0.1" } } as IncomingMessage,
secondSocket as never,
Buffer.alloc(0),
);
// Only the first request reached the WebSocket server; the second socket is
// expected to have been written to once and destroyed once (the rejection).
expect(fakeWss.handleUpgrade).toHaveBeenCalledTimes(1);
expect(secondSocket.write).toHaveBeenCalledOnce();
expect(secondSocket.destroy).toHaveBeenCalledOnce();
expect(upgradeCallback).not.toBeNull();
// The assignment happens inside the mock closure, so the variable is re-typed
// here before being narrowed with an explicit guard.
const completeUpgrade = upgradeCallback as ((ws: WebSocket) => void) | null;
if (!completeUpgrade) {
throw new Error("Expected upgrade callback to be registered");
}
// Completing the held upgrade must still emit "connection" with the original
// request object.
completeUpgrade({} as WebSocket);
expect(fakeWss.emit).toHaveBeenCalledWith(
"connection",
expect.anything(),
expect.objectContaining({ socket: { remoteAddress: "127.0.0.1" } }),
);
});
// Verifies that a reservation taken for an upgrade is given back when the
// upgrade never reaches the completion callback. With maxConnections: 1, a
// leaked reservation would make the follow-up connection below fail.
it("releases in-flight reservations when ws rejects a malformed upgrade before the callback", async () => {
const handler = new MediaStreamHandler({
transcriptionProvider: createStubSttProvider(),
providerConfig: {},
preStartTimeoutMs: 5_000,
maxConnections: 1,
maxPendingConnections: 10,
maxPendingConnectionsPerIp: 10,
});
const server = await startWsServer(handler);
const serverUrl = new URL(server.url);
try {
await withTimeout(
new Promise<void>((resolve, reject) => {
// Speak raw TCP so a deliberately incomplete handshake can be sent.
const socket = net.createConnection(
{ host: serverUrl.hostname, port: Number(serverUrl.port) },
() => {
// Upgrade request with no Sec-WebSocket-Key header — presumably the reason
// the WebSocket server rejects it (see the test title).
socket.write(
[
"GET /voice/stream HTTP/1.1",
`Host: ${serverUrl.host}`,
"Upgrade: websocket",
"Connection: Upgrade",
"Sec-WebSocket-Version: 13",
"",
"",
].join("\r\n"),
);
},
);
socket.once("error", reject);
// Any response bytes are enough; end our side and wait for the close.
socket.once("data", () => {
socket.end();
});
socket.once("close", () => resolve());
}),
);
// The single allowed slot must be free again for a well-formed client.
const ws = await connectWs(server.url);
expect(ws.readyState).toBe(WebSocket.OPEN);
ws.close();
await waitForClose(ws);
} finally {
await server.close();
}
});
it("clears pending state after valid start", async () => {
const handler = new MediaStreamHandler({
transcriptionProvider: createStubSttProvider(),

View File

@@ -32,6 +32,8 @@ export interface MediaStreamConfig {
maxPendingConnectionsPerIp?: number;
/** Max total open sockets (pending + active sessions). */
maxConnections?: number;
/** Optional trusted resolver for the source IP used by pending-connection guards. */
resolveClientIp?: (request: IncomingMessage) => string | undefined;
/** Validate whether to accept a media stream for the given call ID */
shouldAcceptStream?: (params: { callId: string; streamSid: string; token?: string }) => boolean;
/** Callback when transcript is received */
@@ -119,6 +121,7 @@ export class MediaStreamHandler {
private maxPendingConnections: number;
private maxPendingConnectionsPerIp: number;
private maxConnections: number;
private inflightUpgrades = 0;
/** TTS playback queues per stream (serialize audio to prevent overlap) */
private ttsQueues = new Map<string, TtsQueueEntry[]>();
/** Whether TTS is currently playing per stream */
@@ -148,15 +151,42 @@ export class MediaStreamHandler {
this.wss.on("connection", (ws, req) => this.handleConnection(ws, req));
}
const currentConnections = this.wss.clients.size;
const currentConnections = this.getCurrentConnectionCount();
if (currentConnections >= this.maxConnections) {
this.rejectUpgrade(socket, 503, "Too many media stream connections");
return;
}
this.wss.handleUpgrade(request, socket, head, (ws) => {
this.wss?.emit("connection", ws, request);
});
this.inflightUpgrades += 1;
let released = false;
const releaseUpgradeReservation = () => {
if (released) {
return;
}
released = true;
this.inflightUpgrades = Math.max(0, this.inflightUpgrades - 1);
};
const handleUpgradeAbort = () => {
socket.removeListener("error", handleUpgradeAbort);
socket.removeListener("close", handleUpgradeAbort);
releaseUpgradeReservation();
};
socket.once("error", handleUpgradeAbort);
socket.once("close", handleUpgradeAbort);
try {
this.wss.handleUpgrade(request, socket, head, (ws) => {
socket.removeListener("error", handleUpgradeAbort);
socket.removeListener("close", handleUpgradeAbort);
releaseUpgradeReservation();
this.wss?.emit("connection", ws, request);
});
} catch (error) {
socket.removeListener("error", handleUpgradeAbort);
socket.removeListener("close", handleUpgradeAbort);
releaseUpgradeReservation();
throw error;
}
}
/**
@@ -318,9 +348,17 @@ export class MediaStreamHandler {
}
/**
 * Returns the address used to key per-IP pending-connection limits.
 *
 * Prefers the trimmed result of the optional `resolveClientIp` config hook and
 * falls back to the socket's remote address, or the literal "unknown" when
 * neither yields a non-empty value.
 */
private getClientIp(request: IncomingMessage): string {
  const fromResolver = this.config.resolveClientIp?.(request)?.trim();
  return fromResolver ? fromResolver : request.socket.remoteAddress || "unknown";
}
/**
 * Counts connections for the `maxConnections` check: sockets already tracked
 * by the WebSocket server (zero if it has not been created) plus upgrades that
 * have been reserved but not yet completed.
 */
private getCurrentConnectionCount(): number {
  const establishedClients = this.wss?.clients.size ?? 0;
  return establishedClients + this.inflightUpgrades;
}
private registerPendingConnection(ws: WebSocket, ip: string): boolean {
if (this.pendingConnections.size >= this.maxPendingConnections) {
console.warn("[MediaStream] Rejecting connection: pending connection limit reached");

View File

@@ -200,6 +200,155 @@ describe("VoiceCallWebhookServer realtime transcription provider selection", ()
});
});
// Exercises VoiceCallWebhookServer's private `resolveMediaStreamClientIp`
// against different `webhookSecurity` settings, forwarding headers, and peer
// addresses.
describe("VoiceCallWebhookServer media stream client IP resolution", () => {
// Minimal request shape the resolver is called with in these tests: headers
// plus the socket's remote address.
type MediaStreamRequestDouble = {
headers: Record<string, string>;
socket: { remoteAddress?: string };
};
// Builds a server with the given config overrides and calls the private
// resolver with a request double. The default request has no headers and a
// peer of 127.0.0.1; `requestOverrides` replaces `headers` and/or `socket`
// wholesale (shallow spread).
const resolveMediaStreamClientIp = (
configOverrides: Partial<VoiceCallConfig>,
requestOverrides: Partial<MediaStreamRequestDouble> = {},
): string | undefined => {
const { manager } = createManager([]);
const server = new VoiceCallWebhookServer(
createConfig(configOverrides),
manager,
createTwilioStreamingProvider(),
);
const request = {
headers: {},
socket: { remoteAddress: "127.0.0.1" },
...requestOverrides,
};
// The method is private, so it is reached through a structural cast.
return (
server as unknown as {
resolveMediaStreamClientIp: (request: MediaStreamRequestDouble) => string | undefined;
}
).resolveMediaStreamClientIp(request as never);
};
// Peer 127.0.0.1 is a trusted proxy and trust is enabled; the right-most
// forwarded hop is not in the trusted list, so it is the resolved client.
it("uses forwarded IPs only when forwarding trust is explicitly enabled", () => {
const ip = resolveMediaStreamClientIp(
{
webhookSecurity: {
allowedHosts: [],
trustForwardingHeaders: true,
trustedProxyIPs: ["127.0.0.1"],
},
},
{
headers: {
"x-forwarded-for": "198.51.100.10, 203.0.113.10",
},
},
);
expect(ip).toBe("203.0.113.10");
});
// NOTE: trustedProxyIPs is configured here too and matches the peer, so this
// case specifically covers the `trustForwardingHeaders: false` gate: both
// forwarding headers are ignored and the peer address is returned.
it("does not trust forwarded IPs when only allowedHosts is configured", () => {
const ip = resolveMediaStreamClientIp(
{
webhookSecurity: {
allowedHosts: ["voice.example.com"],
trustForwardingHeaders: false,
trustedProxyIPs: ["127.0.0.1"],
},
},
{
headers: {
"x-forwarded-for": "198.51.100.10",
"x-real-ip": "198.51.100.11",
},
},
);
expect(ip).toBe("127.0.0.1");
});
// Trust is enabled, but the peer (127.0.0.1) is not one of the trusted
// proxies, so its X-Forwarded-For value is not honored.
it("ignores spoofed forwarded IPs from untrusted remotes", () => {
const ip = resolveMediaStreamClientIp(
{
webhookSecurity: {
allowedHosts: [],
trustForwardingHeaders: true,
trustedProxyIPs: ["203.0.113.10"],
},
},
{
headers: {
"x-forwarded-for": "198.51.100.10",
},
socket: { remoteAddress: "127.0.0.1" },
},
);
expect(ip).toBe("127.0.0.1");
});
// The right-most hop (203.0.113.10) is itself a trusted proxy, so it is
// skipped and the next hop to its left is returned.
it("walks the forwarded chain from the right to support trusted multi-proxy deployments", () => {
const ip = resolveMediaStreamClientIp(
{
webhookSecurity: {
allowedHosts: [],
trustForwardingHeaders: true,
trustedProxyIPs: ["127.0.0.1", "203.0.113.10"],
},
},
{
headers: {
"x-forwarded-for": "198.51.100.10, 203.0.113.10",
},
},
);
expect(ip).toBe("198.51.100.10");
});
// An empty trusted-proxy list means no peer can qualify as a trusted proxy,
// so forwarding headers are ignored even with trust enabled.
it("ignores forwarded IPs when no trusted proxy is configured", () => {
const ip = resolveMediaStreamClientIp(
{
webhookSecurity: {
allowedHosts: [],
trustForwardingHeaders: true,
trustedProxyIPs: [],
},
},
{
headers: {
"x-forwarded-for": "198.51.100.10",
"x-real-ip": "198.51.100.11",
},
socket: { remoteAddress: "127.0.0.1" },
},
);
expect(ip).toBe("127.0.0.1");
});
// The peer is reported as "::ffff:127.0.0.1"; it must still match the
// trusted entry "127.0.0.1" so that the forwarded chain is honored.
it("matches trusted proxies when the remote uses an IPv4-mapped form", () => {
const ip = resolveMediaStreamClientIp(
{
webhookSecurity: {
allowedHosts: [],
trustForwardingHeaders: true,
trustedProxyIPs: ["127.0.0.1", "203.0.113.10"],
},
},
{
headers: {
"x-forwarded-for": "198.51.100.10, 203.0.113.10",
},
socket: { remoteAddress: "::ffff:127.0.0.1" },
},
);
expect(ip).toBe("198.51.100.10");
});
});
async function runStaleCallReaperCase(params: {
callAgeMs: number;
staleCallReaperSeconds: number;

View File

@@ -57,6 +57,55 @@ function buildRequestUrl(
return new URL(requestUrl ?? "/", `http://${requestHost ?? fallbackHost}`);
}
/**
 * Canonicalizes an IP string so proxy addresses can be compared by equality.
 *
 * Steps: trim surrounding whitespace, strip one pair of enclosing square
 * brackets, lower-case, and collapse an IPv4-mapped IPv6 address
 * ("::ffff:a.b.c.d") to its dotted IPv4 part.
 *
 * @param value Raw address, e.g. a socket remote address or a header hop.
 * @returns The canonical form, or `undefined` when the input is missing or
 *   contains only whitespace.
 */
function normalizeProxyIp(value: string | undefined): string | undefined {
  const candidate = value?.trim();
  if (candidate === undefined || candidate === "") {
    return undefined;
  }

  let address = candidate;
  if (address.startsWith("[") && address.endsWith("]")) {
    address = address.slice(1, -1);
  }
  address = address.toLowerCase();

  const mappedPrefix = "::ffff:";
  if (!address.startsWith(mappedPrefix)) {
    return address;
  }

  // Only collapse the mapped form when the remainder looks like dotted IPv4.
  const embedded = address.slice(mappedPrefix.length);
  return /^\d{1,3}(?:\.\d{1,3}){3}$/.test(embedded) ? embedded : address;
}
/**
 * Derives the client IP from proxy forwarding headers.
 *
 * `X-Forwarded-For` is scanned from the right (the hop nearest to this
 * server) and the first hop that is not a trusted proxy is returned. If every
 * hop is a trusted proxy, the left-most hop is returned. When the header is
 * absent or empty, `X-Real-IP` is used instead.
 *
 * This function does not inspect the connecting peer; it only reads headers.
 * It therefore fails closed when no usable trusted proxy is supplied: without
 * a trusted hop there is nothing to anchor the chain, and every header value
 * could have been written by the client itself.
 *
 * @param request Incoming HTTP request whose headers are read.
 * @param trustedProxyIPs Addresses of proxies whose forwarding headers are
 *   honored. Entries are normalized before comparison.
 * @returns The resolved client address exactly as it appeared in the header,
 *   or `undefined` when no trusted proxy is configured or no forwarding
 *   header yields a value.
 */
function resolveForwardedClientIp(
  request: http.IncomingMessage,
  trustedProxyIPs: readonly string[],
): string | undefined {
  const normalizedTrustedProxyIps = new Set(
    trustedProxyIPs.map((ip) => normalizeProxyIp(ip)).filter((ip): ip is string => Boolean(ip)),
  );

  // Fail closed: with no trusted proxy, the left-most X-Forwarded-For entry
  // and X-Real-IP are both client-controlled and must not be believed.
  if (normalizedTrustedProxyIps.size === 0) {
    return undefined;
  }

  const forwardedFor = getHeader(request.headers, "x-forwarded-for");
  if (forwardedFor) {
    const forwardedIps = forwardedFor
      .split(",")
      .map((part) => part.trim())
      .filter(Boolean);
    if (forwardedIps.length > 0) {
      // Right-to-left: skip hops appended by our own proxies and stop at the
      // first address that a trusted proxy reported but that is not one.
      for (let index = forwardedIps.length - 1; index >= 0; index -= 1) {
        const hop = forwardedIps[index];
        if (!normalizedTrustedProxyIps.has(normalizeProxyIp(hop) ?? "")) {
          return hop;
        }
      }
      // Every hop is a trusted proxy; fall back to the farthest one.
      return forwardedIps[0];
    }
  }

  const realIp = getHeader(request.headers, "x-real-ip")?.trim();
  return realIp || undefined;
}
function normalizeWebhookResponse(parsed: {
statusCode?: number;
providerResponseHeaders?: Record<string, string>;
@@ -132,6 +181,30 @@ export class VoiceCallWebhookServer {
this.pendingDisconnectHangups.delete(providerCallId);
}
/**
 * Picks the source IP used for media-stream connection limits.
 *
 * Forwarding headers are consulted only when `trustForwardingHeaders` is on
 * AND the directly connected peer matches a non-empty `trustedProxyIPs` list
 * (compared in normalized form). In every other case, or when the headers
 * yield nothing, the socket's remote address is returned as-is.
 */
private resolveMediaStreamClientIp(request: http.IncomingMessage): string | undefined {
  const peerAddress = request.socket.remoteAddress ?? undefined;
  const security = this.config.webhookSecurity;
  const proxyAllowlist = security.trustedProxyIPs.filter(Boolean);

  const allowedProxies = new Set<string>();
  for (const entry of proxyAllowlist) {
    const canonical = normalizeProxyIp(entry);
    if (canonical) {
      allowedProxies.add(canonical);
    }
  }

  const peerKey = normalizeProxyIp(peerAddress);
  const peerIsTrustedProxy =
    allowedProxies.size > 0 && peerKey !== undefined && allowedProxies.has(peerKey);

  if (!security.trustForwardingHeaders || !peerIsTrustedProxy) {
    return peerAddress;
  }

  return resolveForwardedClientIp(request, proxyAllowlist) || peerAddress;
}
private shouldSuppressBargeInForInitialMessage(call: CallRecord | undefined): boolean {
if (!call || call.direction !== "outbound") {
return false;
@@ -202,6 +275,7 @@ export class VoiceCallWebhookServer {
maxPendingConnections: streaming.maxPendingConnections,
maxPendingConnectionsPerIp: streaming.maxPendingConnectionsPerIp,
maxConnections: streaming.maxConnections,
resolveClientIp: (request) => this.resolveMediaStreamClientIp(request),
shouldAcceptStream: ({ callId, token }) => {
const call = this.manager.getCallByProviderCallId(callId);
if (!call) {