mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 14:44:05 +00:00
refactor: share bounded response reader
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import { readBoundedResponseText } from "../lib/bounded-response.ts";
|
||||
|
||||
type JsonObject = Record<string, unknown>;
|
||||
|
||||
type TelegramBotApiOptions = {
|
||||
@@ -32,46 +34,6 @@ function taggedError(message: string, code: string) {
|
||||
return Object.assign(new Error(message), { code });
|
||||
}
|
||||
|
||||
async function readBoundedResponseText(
|
||||
response: Response,
|
||||
label: string,
|
||||
byteLimit: number,
|
||||
timeoutPromise: Promise<never>,
|
||||
) {
|
||||
const contentLength = response.headers.get("content-length");
|
||||
if (contentLength) {
|
||||
const parsedLength = Number(contentLength);
|
||||
if (Number.isSafeInteger(parsedLength) && parsedLength > byteLimit) {
|
||||
await response.body?.cancel().catch(() => {});
|
||||
throw taggedError(`${label} response body exceeded ${byteLimit} bytes`, "ETOOBIG");
|
||||
}
|
||||
}
|
||||
if (!response.body) {
|
||||
return "";
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let byteCount = 0;
|
||||
let text = "";
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await Promise.race([reader.read(), timeoutPromise]);
|
||||
if (done) {
|
||||
return text + decoder.decode();
|
||||
}
|
||||
byteCount += value.byteLength;
|
||||
if (byteCount > byteLimit) {
|
||||
await reader.cancel().catch(() => {});
|
||||
throw taggedError(`${label} response body exceeded ${byteLimit} bytes`, "ETOOBIG");
|
||||
}
|
||||
text += decoder.decode(value, { stream: true });
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
function parseJsonPayload(rawPayload: string, label: string) {
|
||||
try {
|
||||
return JSON.parse(rawPayload) as JsonObject;
|
||||
@@ -111,7 +73,12 @@ export async function telegramBotApi(
|
||||
}),
|
||||
timeoutPromise,
|
||||
]);
|
||||
const rawPayload = await readBoundedResponseText(response, label, maxBodyBytes, timeoutPromise);
|
||||
const rawPayload = await readBoundedResponseText(response, label, maxBodyBytes, {
|
||||
createTooLargeError(message) {
|
||||
return taggedError(message, "ETOOBIG");
|
||||
},
|
||||
timeoutPromise,
|
||||
});
|
||||
const payload = parseJsonPayload(rawPayload, label);
|
||||
if (!response.ok || payload.ok !== true) {
|
||||
throw new Error(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { readBoundedResponseText } from "../lib/bounded-response.ts";
|
||||
|
||||
export type JsonObject = Record<string, unknown>;
|
||||
|
||||
@@ -138,46 +139,6 @@ export function runCommand(
|
||||
});
|
||||
}
|
||||
|
||||
async function readBoundedResponseText(
|
||||
response: Response,
|
||||
label: string,
|
||||
byteLimit: number,
|
||||
timeoutPromise: Promise<never>,
|
||||
) {
|
||||
const contentLength = response.headers.get("content-length");
|
||||
if (contentLength) {
|
||||
const parsedLength = Number(contentLength);
|
||||
if (Number.isSafeInteger(parsedLength) && parsedLength > byteLimit) {
|
||||
await response.body?.cancel().catch(() => {});
|
||||
throw bodyTooLargeError(`${label} response body exceeded ${byteLimit} bytes`);
|
||||
}
|
||||
}
|
||||
if (!response.body) {
|
||||
return "";
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let byteCount = 0;
|
||||
let text = "";
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await Promise.race([reader.read(), timeoutPromise]);
|
||||
if (done) {
|
||||
return text + decoder.decode();
|
||||
}
|
||||
byteCount += value.byteLength;
|
||||
if (byteCount > byteLimit) {
|
||||
await reader.cancel().catch(() => {});
|
||||
throw bodyTooLargeError(`${label} response body exceeded ${byteLimit} bytes`);
|
||||
}
|
||||
text += decoder.decode(value, { stream: true });
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
export async function fetchJsonWithTimeout(params: FetchJsonParams) {
|
||||
const timeoutMs = Math.max(1, params.timeoutMs);
|
||||
const maxBodyBytes = resolveFetchBodyLimit(params.maxBodyBytes);
|
||||
@@ -200,12 +161,10 @@ export async function fetchJsonWithTimeout(params: FetchJsonParams) {
|
||||
}),
|
||||
timeoutPromise,
|
||||
]);
|
||||
const rawPayload = await readBoundedResponseText(
|
||||
response,
|
||||
params.label,
|
||||
maxBodyBytes,
|
||||
const rawPayload = await readBoundedResponseText(response, params.label, maxBodyBytes, {
|
||||
createTooLargeError: bodyTooLargeError,
|
||||
timeoutPromise,
|
||||
);
|
||||
});
|
||||
const payload = JSON.parse(rawPayload) as JsonObject;
|
||||
return { payload, response };
|
||||
} finally {
|
||||
|
||||
@@ -1,11 +1,27 @@
|
||||
type BoundedResponseTextOptions = {
|
||||
createTooLargeError?: (message: string) => Error;
|
||||
formatTooLargeMessage?: (label: string, maxBytes: number) => string;
|
||||
timeoutPromise?: Promise<never>;
|
||||
};
|
||||
|
||||
const defaultTooLargeMessage = (label: string, maxBytes: number) =>
|
||||
`${label} response body exceeded ${maxBytes} bytes`;
|
||||
|
||||
const defaultTooLargeError = (message: string) => new Error(`${message}.`);
|
||||
|
||||
export async function readBoundedResponseText(
|
||||
response: Response,
|
||||
label: string,
|
||||
maxBytes: number,
|
||||
options: BoundedResponseTextOptions = {},
|
||||
): Promise<string> {
|
||||
const contentLength = Number.parseInt(response.headers.get("content-length") ?? "", 10);
|
||||
if (Number.isFinite(contentLength) && contentLength > maxBytes) {
|
||||
throw new Error(`${label} response body exceeded ${maxBytes} bytes.`);
|
||||
const formatTooLargeMessage = options.formatTooLargeMessage ?? defaultTooLargeMessage;
|
||||
const createTooLargeError = options.createTooLargeError ?? defaultTooLargeError;
|
||||
const tooLargeError = () => createTooLargeError(formatTooLargeMessage(label, maxBytes));
|
||||
const contentLength = Number(response.headers.get("content-length") ?? "");
|
||||
if (Number.isSafeInteger(contentLength) && contentLength > maxBytes) {
|
||||
await response.body?.cancel().catch(() => undefined);
|
||||
throw tooLargeError();
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
@@ -16,11 +32,12 @@ export async function readBoundedResponseText(
|
||||
const decoder = new TextDecoder();
|
||||
const chunks: string[] = [];
|
||||
let totalBytes = 0;
|
||||
let canceled = false;
|
||||
|
||||
try {
|
||||
for (;;) {
|
||||
const { done, value } = await reader.read();
|
||||
const { done, value } = await (options.timeoutPromise
|
||||
? Promise.race([reader.read(), options.timeoutPromise])
|
||||
: reader.read());
|
||||
if (done) {
|
||||
const tail = decoder.decode();
|
||||
if (tail) {
|
||||
@@ -31,16 +48,13 @@ export async function readBoundedResponseText(
|
||||
|
||||
totalBytes += value.byteLength;
|
||||
if (totalBytes > maxBytes) {
|
||||
canceled = true;
|
||||
await reader.cancel().catch(() => undefined);
|
||||
throw new Error(`${label} response body exceeded ${maxBytes} bytes.`);
|
||||
throw tooLargeError();
|
||||
}
|
||||
chunks.push(decoder.decode(value, { stream: true }));
|
||||
}
|
||||
} finally {
|
||||
if (!canceled) {
|
||||
reader.releaseLock();
|
||||
}
|
||||
reader.releaseLock();
|
||||
}
|
||||
|
||||
return chunks.join("");
|
||||
|
||||
@@ -9,6 +9,7 @@ import { stageQaMockAuthProfiles } from "../extensions/qa-lab/src/providers/shar
|
||||
import { buildQaGatewayConfig } from "../extensions/qa-lab/src/qa-gateway-config.js";
|
||||
import { resetConfigRuntimeState } from "../src/config/config.js";
|
||||
import { startGatewayServer } from "../src/gateway/server.js";
|
||||
import { readBoundedResponseText } from "./lib/bounded-response.ts";
|
||||
|
||||
type Lane = "normal" | "code";
|
||||
|
||||
@@ -56,50 +57,8 @@ function timeoutError(message: string) {
|
||||
return Object.assign(new Error(message), { code: "ETIMEDOUT" });
|
||||
}
|
||||
|
||||
function bodyTooLargeError(url: string, byteLimit: number) {
|
||||
return Object.assign(new Error(`HTTP response from ${url} exceeded ${byteLimit} bytes`), {
|
||||
code: "ETOOBIG",
|
||||
});
|
||||
}
|
||||
|
||||
async function readBoundedResponseText(
|
||||
response: Response,
|
||||
url: string,
|
||||
byteLimit: number,
|
||||
timeoutPromise: Promise<never>,
|
||||
) {
|
||||
const contentLength = response.headers.get("content-length");
|
||||
if (contentLength) {
|
||||
const parsedLength = Number(contentLength);
|
||||
if (Number.isSafeInteger(parsedLength) && parsedLength > byteLimit) {
|
||||
await response.body?.cancel().catch(() => {});
|
||||
throw bodyTooLargeError(url, byteLimit);
|
||||
}
|
||||
}
|
||||
if (!response.body) {
|
||||
return "";
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let byteCount = 0;
|
||||
let text = "";
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await Promise.race([reader.read(), timeoutPromise]);
|
||||
if (done) {
|
||||
return text + decoder.decode();
|
||||
}
|
||||
byteCount += value.byteLength;
|
||||
if (byteCount > byteLimit) {
|
||||
await reader.cancel().catch(() => {});
|
||||
throw bodyTooLargeError(url, byteLimit);
|
||||
}
|
||||
text += decoder.decode(value, { stream: true });
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
function bodyTooLargeErrorMessage(url: string, byteLimit: number) {
|
||||
return `HTTP response from ${url} exceeded ${byteLimit} bytes`;
|
||||
}
|
||||
|
||||
async function freePort(): Promise<number> {
|
||||
@@ -208,7 +167,13 @@ export async function fetchJson(
|
||||
}),
|
||||
timeoutPromise,
|
||||
]);
|
||||
text = await readBoundedResponseText(response, url, maxBodyBytes, timeoutPromise);
|
||||
text = await readBoundedResponseText(response, url, maxBodyBytes, {
|
||||
createTooLargeError(message) {
|
||||
return Object.assign(new Error(message), { code: "ETOOBIG" });
|
||||
},
|
||||
formatTooLargeMessage: bodyTooLargeErrorMessage,
|
||||
timeoutPromise,
|
||||
});
|
||||
} finally {
|
||||
if (timeout) {
|
||||
clearTimeout(timeout);
|
||||
|
||||
Reference in New Issue
Block a user