From e394e0f9b82746cd5bd6550f590df41e28a4e00d Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Fri, 29 May 2026 16:38:25 +0200 Subject: [PATCH] fix(qa-matrix): cap fault proxy bodies --- .../src/substrate/fault-proxy.test.ts | 47 ++++- .../qa-matrix/src/substrate/fault-proxy.ts | 163 +++++++++++++++++- 2 files changed, 201 insertions(+), 9 deletions(-) diff --git a/extensions/qa-matrix/src/substrate/fault-proxy.test.ts b/extensions/qa-matrix/src/substrate/fault-proxy.test.ts index aa620a6b39e..52a2d533c8e 100644 --- a/extensions/qa-matrix/src/substrate/fault-proxy.test.ts +++ b/extensions/qa-matrix/src/substrate/fault-proxy.test.ts @@ -4,7 +4,7 @@ import { startMatrixQaFaultProxy, type MatrixQaFaultProxy } from "./fault-proxy. const servers: Array<{ close(): Promise }> = []; -async function startTargetServer() { +async function startTargetServer(params?: { responseBody?: string }) { const requests: Array<{ authorization?: string; body: string; @@ -23,7 +23,7 @@ async function startTargetServer() { url: req.url ?? "/", }); res.writeHead(200, { "content-type": "application/json" }); - res.end(JSON.stringify({ forwarded: true })); + res.end(params?.responseBody ?? JSON.stringify({ forwarded: true })); }); await new Promise((resolve, reject) => { server.once("error", reject); @@ -169,4 +169,47 @@ describe("Matrix QA fault proxy", () => { }, ]); }); + + it("rejects oversized forwarded request bodies before contacting the target", async () => { + const target = await startTargetServer(); + proxy = await startMatrixQaFaultProxy({ + maxRequestBytes: 4, + targetBaseUrl: target.baseUrl, + rules: [], + }); + + const rejected = await fetch(`${proxy.baseUrl}/_matrix/client/v3/send`, { + body: "12345", + method: "POST", + }); + + expect(rejected.status).toBe(413); + await expect(rejected.json()).resolves.toMatchObject({ + errcode: "MATRIX_QA_FAULT_PROXY_REQUEST_TOO_LARGE", + }); + expect(target.requests).toEqual([]); + }); + + it("rejects oversized forwarded Matrix responses without buffering the full body", async () => { + const target = await startTargetServer({ responseBody: JSON.stringify({ payload: "large" }) }); + proxy = await startMatrixQaFaultProxy({ + maxResponseBytes: 8, + targetBaseUrl: target.baseUrl, + rules: [], + }); + + const rejected = await fetch(`${proxy.baseUrl}/_matrix/client/v3/sync`); + + expect(rejected.status).toBe(502); + await expect(rejected.json()).resolves.toMatchObject({ + errcode: "MATRIX_QA_FAULT_PROXY_RESPONSE_TOO_LARGE", + }); + expect(target.requests).toEqual([ + { + body: "", + method: "GET", + url: "/_matrix/client/v3/sync", + }, + ]); + }); }); diff --git a/extensions/qa-matrix/src/substrate/fault-proxy.ts b/extensions/qa-matrix/src/substrate/fault-proxy.ts index 16143d0cde9..42792579361 100644 --- a/extensions/qa-matrix/src/substrate/fault-proxy.ts +++ b/extensions/qa-matrix/src/substrate/fault-proxy.ts @@ -4,8 +4,12 @@ import { type IncomingMessage, type ServerResponse, } from "node:http"; +import { readResponseWithLimit } from "openclaw/plugin-sdk/response-limit-runtime"; import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime"; +const DEFAULT_FAULT_PROXY_REQUEST_MAX_BYTES = 16 * 1024 * 1024; +const DEFAULT_FAULT_PROXY_RESPONSE_MAX_BYTES = 16 * 1024 * 1024; + const HOP_BY_HOP_HEADERS = new Set([ "connection", "content-length", @@ -38,6 +42,17 @@ type MatrixQaFaultProxyForwardedResponse = { status: number; }; +class MatrixQaFaultProxyHttpError extends Error { + constructor( + readonly status: number, + readonly code: string, + message: string, + ) { + super(message); + this.name = "MatrixQaFaultProxyHttpError"; + } +} + export type MatrixQaFaultProxyRule = { id: string; match(request: MatrixQaFaultProxyRequest): boolean; @@ -87,12 +102,122 @@ function buildFetchHeaders(headers: IncomingHttpHeaders) { return result; } -async function readRequestBody(req: IncomingMessage) { - const chunks: Buffer[] = []; - for await (const chunk of req) { - chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk); +function normalizeByteChunk(chunk: string | Buffer): Buffer { + return typeof chunk === "string" ? Buffer.from(chunk) : chunk; +} + +function rejectOversizedRequestBody(maxBytes: number, size: number) { + return new MatrixQaFaultProxyHttpError( + 413, + "MATRIX_QA_FAULT_PROXY_REQUEST_TOO_LARGE", + `Matrix QA fault proxy request body exceeds ${maxBytes} bytes (got at least ${size})`, + ); +} + +function rejectAbortedRequestBody() { + return new MatrixQaFaultProxyHttpError( + 400, + "MATRIX_QA_FAULT_PROXY_REQUEST_ABORTED", + "Matrix QA fault proxy request body ended before upload completed", + ); +} + +function drainRejectedRequestBody(req: IncomingMessage) { + const onError = () => undefined; + const onClose = () => { + req.off("error", onError); + }; + req.on("error", onError); + req.once("close", onClose); + req.resume(); +} + +async function readRequestBody(req: IncomingMessage, maxBytes: number) { + const contentLength = normalizeHeaderValue(req.headers["content-length"]); + if (contentLength !== undefined) { + const size = Number(contentLength); + if (Number.isFinite(size) && size > maxBytes) { + drainRejectedRequestBody(req); + throw rejectOversizedRequestBody(maxBytes, size); + } } - return Buffer.concat(chunks); + + return await new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let total = 0; + let settled = false; + + const cleanup = () => { + req.off("data", onData); + req.off("end", onEnd); + req.off("error", onError); + req.off("aborted", onAborted); + req.off("close", onClose); + }; + const stopReading = () => { + req.off("data", onData); + req.off("end", onEnd); + req.off("aborted", onAborted); + }; + const settleReject = (error: Error, options?: { drain?: boolean }) => { + if (settled) { + return; + } + settled = true; + if (options?.drain) { + stopReading(); + req.resume(); + } else { + cleanup(); + } + reject(error); + }; + const onData = (chunk: string | Buffer) => { + const buffer = normalizeByteChunk(chunk); + const nextTotal = total + buffer.byteLength; + if (nextTotal > maxBytes) { + settleReject(rejectOversizedRequestBody(maxBytes, nextTotal), { drain: true }); + return; + } + chunks.push(buffer); + total = nextTotal; + }; + const onEnd = () => { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve(Buffer.concat(chunks, total)); + }; + const onError = (error: Error) => { + if (settled) { + cleanup(); + return; + } + settleReject(error); + }; + const onAborted = () => { + settleReject(rejectAbortedRequestBody()); + }; + const onClose = () => { + if (settled) { + cleanup(); + return; + } + if (!req.complete) { + settleReject(rejectAbortedRequestBody()); + return; + } + cleanup(); + }; + + req.on("data", onData); + req.once("end", onEnd); + req.once("error", onError); + req.once("aborted", onAborted); + req.once("close", onClose); + }); } function bufferToArrayBuffer(buffer: Buffer) { @@ -113,6 +238,7 @@ function writeJsonResponse(res: ServerResponse, response: MatrixQaFaultProxyResp async function forwardMatrixQaFaultProxyRequest(params: { body: Buffer; + maxResponseBytes: number; req: IncomingMessage; targetUrl: URL; }): Promise { @@ -133,7 +259,14 @@ async function forwardMatrixQaFaultProxyRequest(params: { }); try { return { - body: Buffer.from(await response.arrayBuffer()), + body: await readResponseWithLimit(response, params.maxResponseBytes, { + onOverflow: ({ size }) => + new MatrixQaFaultProxyHttpError( + 502, + "MATRIX_QA_FAULT_PROXY_RESPONSE_TOO_LARGE", + `Matrix QA fault proxy upstream response exceeds ${params.maxResponseBytes} bytes (got at least ${size})`, + ), + }), headers: response.headers, status: response.status, }; @@ -157,10 +290,14 @@ function writeForwardedResponse( } export async function startMatrixQaFaultProxy(params: { + maxRequestBytes?: number; + maxResponseBytes?: number; rules: MatrixQaFaultProxyRule[]; targetBaseUrl: string; }): Promise { const targetBaseUrl = new URL(params.targetBaseUrl); + const maxRequestBytes = params.maxRequestBytes ?? DEFAULT_FAULT_PROXY_REQUEST_MAX_BYTES; + const maxResponseBytes = params.maxResponseBytes ?? DEFAULT_FAULT_PROXY_RESPONSE_MAX_BYTES; const hits: MatrixQaFaultProxyHit[] = []; const server = createServer(async (req, res) => { try { @@ -174,7 +311,7 @@ export async function startMatrixQaFaultProxy(params: { path, search: requestUrl.search, }; - const body = await readRequestBody(req); + const body = await readRequestBody(req, maxRequestBytes); const rule = params.rules.find((candidate) => candidate.match(request)); if (rule) { hits.push({ @@ -189,6 +326,7 @@ export async function startMatrixQaFaultProxy(params: { } const forwarded = await forwardMatrixQaFaultProxyRequest({ body, + maxResponseBytes, req, targetUrl: requestUrl, }); @@ -201,6 +339,17 @@ export async function startMatrixQaFaultProxy(params: { : forwarded; writeForwardedResponse(res, response); } catch (error) { + if (error instanceof MatrixQaFaultProxyHttpError) { + writeJsonResponse(res, { + body: { + errcode: error.code, + error: error.message, + }, + ...(error.status === 413 ? { headers: { connection: "close" } } : {}), + status: error.status, + }); + return; + } writeJsonResponse(res, { body: { errcode: "MATRIX_QA_FAULT_PROXY_ERROR",