mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 23:32:52 +00:00
fix(qa-matrix): cap fault proxy bodies
This commit is contained in:
@@ -4,7 +4,7 @@ import { startMatrixQaFaultProxy, type MatrixQaFaultProxy } from "./fault-proxy.
|
||||
|
||||
const servers: Array<{ close(): Promise<void> }> = [];
|
||||
|
||||
async function startTargetServer() {
|
||||
async function startTargetServer(params?: { responseBody?: string }) {
|
||||
const requests: Array<{
|
||||
authorization?: string;
|
||||
body: string;
|
||||
@@ -23,7 +23,7 @@ async function startTargetServer() {
|
||||
url: req.url ?? "/",
|
||||
});
|
||||
res.writeHead(200, { "content-type": "application/json" });
|
||||
res.end(JSON.stringify({ forwarded: true }));
|
||||
res.end(params?.responseBody ?? JSON.stringify({ forwarded: true }));
|
||||
});
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.once("error", reject);
|
||||
@@ -169,4 +169,47 @@ describe("Matrix QA fault proxy", () => {
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("rejects oversized forwarded request bodies before contacting the target", async () => {
|
||||
const target = await startTargetServer();
|
||||
proxy = await startMatrixQaFaultProxy({
|
||||
maxRequestBytes: 4,
|
||||
targetBaseUrl: target.baseUrl,
|
||||
rules: [],
|
||||
});
|
||||
|
||||
const rejected = await fetch(`${proxy.baseUrl}/_matrix/client/v3/send`, {
|
||||
body: "12345",
|
||||
method: "POST",
|
||||
});
|
||||
|
||||
expect(rejected.status).toBe(413);
|
||||
await expect(rejected.json()).resolves.toMatchObject({
|
||||
errcode: "MATRIX_QA_FAULT_PROXY_REQUEST_TOO_LARGE",
|
||||
});
|
||||
expect(target.requests).toEqual([]);
|
||||
});
|
||||
|
||||
it("rejects oversized forwarded Matrix responses without buffering the full body", async () => {
|
||||
const target = await startTargetServer({ responseBody: JSON.stringify({ payload: "large" }) });
|
||||
proxy = await startMatrixQaFaultProxy({
|
||||
maxResponseBytes: 8,
|
||||
targetBaseUrl: target.baseUrl,
|
||||
rules: [],
|
||||
});
|
||||
|
||||
const rejected = await fetch(`${proxy.baseUrl}/_matrix/client/v3/sync`);
|
||||
|
||||
expect(rejected.status).toBe(502);
|
||||
await expect(rejected.json()).resolves.toMatchObject({
|
||||
errcode: "MATRIX_QA_FAULT_PROXY_RESPONSE_TOO_LARGE",
|
||||
});
|
||||
expect(target.requests).toEqual([
|
||||
{
|
||||
body: "",
|
||||
method: "GET",
|
||||
url: "/_matrix/client/v3/sync",
|
||||
},
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,8 +4,12 @@ import {
|
||||
type IncomingMessage,
|
||||
type ServerResponse,
|
||||
} from "node:http";
|
||||
import { readResponseWithLimit } from "openclaw/plugin-sdk/response-limit-runtime";
|
||||
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
|
||||
|
||||
const DEFAULT_FAULT_PROXY_REQUEST_MAX_BYTES = 16 * 1024 * 1024;
|
||||
const DEFAULT_FAULT_PROXY_RESPONSE_MAX_BYTES = 16 * 1024 * 1024;
|
||||
|
||||
const HOP_BY_HOP_HEADERS = new Set([
|
||||
"connection",
|
||||
"content-length",
|
||||
@@ -38,6 +42,17 @@ type MatrixQaFaultProxyForwardedResponse = {
|
||||
status: number;
|
||||
};
|
||||
|
||||
class MatrixQaFaultProxyHttpError extends Error {
|
||||
constructor(
|
||||
readonly status: number,
|
||||
readonly code: string,
|
||||
message: string,
|
||||
) {
|
||||
super(message);
|
||||
this.name = "MatrixQaFaultProxyHttpError";
|
||||
}
|
||||
}
|
||||
|
||||
export type MatrixQaFaultProxyRule = {
|
||||
id: string;
|
||||
match(request: MatrixQaFaultProxyRequest): boolean;
|
||||
@@ -87,12 +102,122 @@ function buildFetchHeaders(headers: IncomingHttpHeaders) {
|
||||
return result;
|
||||
}
|
||||
|
||||
async function readRequestBody(req: IncomingMessage) {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req) {
|
||||
chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk);
|
||||
function normalizeByteChunk(chunk: string | Buffer): Buffer {
|
||||
return typeof chunk === "string" ? Buffer.from(chunk) : chunk;
|
||||
}
|
||||
|
||||
function rejectOversizedRequestBody(maxBytes: number, size: number) {
|
||||
return new MatrixQaFaultProxyHttpError(
|
||||
413,
|
||||
"MATRIX_QA_FAULT_PROXY_REQUEST_TOO_LARGE",
|
||||
`Matrix QA fault proxy request body exceeds ${maxBytes} bytes (got at least ${size})`,
|
||||
);
|
||||
}
|
||||
|
||||
function rejectAbortedRequestBody() {
|
||||
return new MatrixQaFaultProxyHttpError(
|
||||
400,
|
||||
"MATRIX_QA_FAULT_PROXY_REQUEST_ABORTED",
|
||||
"Matrix QA fault proxy request body ended before upload completed",
|
||||
);
|
||||
}
|
||||
|
||||
function drainRejectedRequestBody(req: IncomingMessage) {
|
||||
const onError = () => undefined;
|
||||
const onClose = () => {
|
||||
req.off("error", onError);
|
||||
};
|
||||
req.on("error", onError);
|
||||
req.once("close", onClose);
|
||||
req.resume();
|
||||
}
|
||||
|
||||
async function readRequestBody(req: IncomingMessage, maxBytes: number) {
|
||||
const contentLength = normalizeHeaderValue(req.headers["content-length"]);
|
||||
if (contentLength !== undefined) {
|
||||
const size = Number(contentLength);
|
||||
if (Number.isFinite(size) && size > maxBytes) {
|
||||
drainRejectedRequestBody(req);
|
||||
throw rejectOversizedRequestBody(maxBytes, size);
|
||||
}
|
||||
}
|
||||
return Buffer.concat(chunks);
|
||||
|
||||
return await new Promise<Buffer>((resolve, reject) => {
|
||||
const chunks: Buffer[] = [];
|
||||
let total = 0;
|
||||
let settled = false;
|
||||
|
||||
const cleanup = () => {
|
||||
req.off("data", onData);
|
||||
req.off("end", onEnd);
|
||||
req.off("error", onError);
|
||||
req.off("aborted", onAborted);
|
||||
req.off("close", onClose);
|
||||
};
|
||||
const stopReading = () => {
|
||||
req.off("data", onData);
|
||||
req.off("end", onEnd);
|
||||
req.off("aborted", onAborted);
|
||||
};
|
||||
const settleReject = (error: Error, options?: { drain?: boolean }) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
if (options?.drain) {
|
||||
stopReading();
|
||||
req.resume();
|
||||
} else {
|
||||
cleanup();
|
||||
}
|
||||
reject(error);
|
||||
};
|
||||
const onData = (chunk: string | Buffer) => {
|
||||
const buffer = normalizeByteChunk(chunk);
|
||||
const nextTotal = total + buffer.byteLength;
|
||||
if (nextTotal > maxBytes) {
|
||||
settleReject(rejectOversizedRequestBody(maxBytes, nextTotal), { drain: true });
|
||||
return;
|
||||
}
|
||||
chunks.push(buffer);
|
||||
total = nextTotal;
|
||||
};
|
||||
const onEnd = () => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
cleanup();
|
||||
resolve(Buffer.concat(chunks, total));
|
||||
};
|
||||
const onError = (error: Error) => {
|
||||
if (settled) {
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
settleReject(error);
|
||||
};
|
||||
const onAborted = () => {
|
||||
settleReject(rejectAbortedRequestBody());
|
||||
};
|
||||
const onClose = () => {
|
||||
if (settled) {
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
if (!req.complete) {
|
||||
settleReject(rejectAbortedRequestBody());
|
||||
return;
|
||||
}
|
||||
cleanup();
|
||||
};
|
||||
|
||||
req.on("data", onData);
|
||||
req.once("end", onEnd);
|
||||
req.once("error", onError);
|
||||
req.once("aborted", onAborted);
|
||||
req.once("close", onClose);
|
||||
});
|
||||
}
|
||||
|
||||
function bufferToArrayBuffer(buffer: Buffer) {
|
||||
@@ -113,6 +238,7 @@ function writeJsonResponse(res: ServerResponse, response: MatrixQaFaultProxyResp
|
||||
|
||||
async function forwardMatrixQaFaultProxyRequest(params: {
|
||||
body: Buffer;
|
||||
maxResponseBytes: number;
|
||||
req: IncomingMessage;
|
||||
targetUrl: URL;
|
||||
}): Promise<MatrixQaFaultProxyForwardedResponse> {
|
||||
@@ -133,7 +259,14 @@ async function forwardMatrixQaFaultProxyRequest(params: {
|
||||
});
|
||||
try {
|
||||
return {
|
||||
body: Buffer.from(await response.arrayBuffer()),
|
||||
body: await readResponseWithLimit(response, params.maxResponseBytes, {
|
||||
onOverflow: ({ size }) =>
|
||||
new MatrixQaFaultProxyHttpError(
|
||||
502,
|
||||
"MATRIX_QA_FAULT_PROXY_RESPONSE_TOO_LARGE",
|
||||
`Matrix QA fault proxy upstream response exceeds ${params.maxResponseBytes} bytes (got at least ${size})`,
|
||||
),
|
||||
}),
|
||||
headers: response.headers,
|
||||
status: response.status,
|
||||
};
|
||||
@@ -157,10 +290,14 @@ function writeForwardedResponse(
|
||||
}
|
||||
|
||||
export async function startMatrixQaFaultProxy(params: {
|
||||
maxRequestBytes?: number;
|
||||
maxResponseBytes?: number;
|
||||
rules: MatrixQaFaultProxyRule[];
|
||||
targetBaseUrl: string;
|
||||
}): Promise<MatrixQaFaultProxy> {
|
||||
const targetBaseUrl = new URL(params.targetBaseUrl);
|
||||
const maxRequestBytes = params.maxRequestBytes ?? DEFAULT_FAULT_PROXY_REQUEST_MAX_BYTES;
|
||||
const maxResponseBytes = params.maxResponseBytes ?? DEFAULT_FAULT_PROXY_RESPONSE_MAX_BYTES;
|
||||
const hits: MatrixQaFaultProxyHit[] = [];
|
||||
const server = createServer(async (req, res) => {
|
||||
try {
|
||||
@@ -174,7 +311,7 @@ export async function startMatrixQaFaultProxy(params: {
|
||||
path,
|
||||
search: requestUrl.search,
|
||||
};
|
||||
const body = await readRequestBody(req);
|
||||
const body = await readRequestBody(req, maxRequestBytes);
|
||||
const rule = params.rules.find((candidate) => candidate.match(request));
|
||||
if (rule) {
|
||||
hits.push({
|
||||
@@ -189,6 +326,7 @@ export async function startMatrixQaFaultProxy(params: {
|
||||
}
|
||||
const forwarded = await forwardMatrixQaFaultProxyRequest({
|
||||
body,
|
||||
maxResponseBytes,
|
||||
req,
|
||||
targetUrl: requestUrl,
|
||||
});
|
||||
@@ -201,6 +339,17 @@ export async function startMatrixQaFaultProxy(params: {
|
||||
: forwarded;
|
||||
writeForwardedResponse(res, response);
|
||||
} catch (error) {
|
||||
if (error instanceof MatrixQaFaultProxyHttpError) {
|
||||
writeJsonResponse(res, {
|
||||
body: {
|
||||
errcode: error.code,
|
||||
error: error.message,
|
||||
},
|
||||
...(error.status === 413 ? { headers: { connection: "close" } } : {}),
|
||||
status: error.status,
|
||||
});
|
||||
return;
|
||||
}
|
||||
writeJsonResponse(res, {
|
||||
body: {
|
||||
errcode: "MATRIX_QA_FAULT_PROXY_ERROR",
|
||||
|
||||
Reference in New Issue
Block a user