From 975340fbd55158b83eb7c4b9b71def1e9dd5d36b Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Fri, 19 Jun 2026 06:09:30 +0200 Subject: [PATCH] fix(audit): cancel stalled advisory body reads --- scripts/pre-commit/pnpm-audit-prod.mjs | 105 +++++++++++-------------- test/scripts/pnpm-audit-prod.test.ts | 40 ++++++++++ 2 files changed, 88 insertions(+), 57 deletions(-) diff --git a/scripts/pre-commit/pnpm-audit-prod.mjs b/scripts/pre-commit/pnpm-audit-prod.mjs index d68b6c9078a..bce1f9208af 100644 --- a/scripts/pre-commit/pnpm-audit-prod.mjs +++ b/scripts/pre-commit/pnpm-audit-prod.mjs @@ -5,6 +5,7 @@ import { readFile } from "node:fs/promises"; import path from "node:path"; import process from "node:process"; import { pathToFileURL } from "node:url"; +import { readBoundedResponseText as readBoundedResponseTextWithLimit } from "../lib/bounded-response.mjs"; const DEFAULT_REGISTRY = "https://registry.npmjs.org"; const BULK_ADVISORY_PATH = "/-/npm/v1/security/advisories/bulk"; @@ -710,17 +711,15 @@ function resolveBulkAdvisoryResponseBodyMaxBytes() { async function withBulkAdvisoryTimeout({ label, timeoutMs, run }) { const controller = new AbortController(); let timeout; + const timeoutPromise = new Promise((_resolve, reject) => { + timeout = setTimeout(() => { + const error = new Error(`${label} exceeded timeout of ${timeoutMs}ms`); + controller.abort(error); + reject(error); + }, timeoutMs); + }); try { - return await Promise.race([ - run(controller.signal), - new Promise((_resolve, reject) => { - timeout = setTimeout(() => { - const error = new Error(`${label} exceeded timeout of ${timeoutMs}ms`); - controller.abort(error); - reject(error); - }, timeoutMs); - }), - ]); + return await Promise.race([run({ signal: controller.signal, timeoutPromise }), timeoutPromise]); } finally { if (timeout) { clearTimeout(timeout); @@ -728,51 +727,19 @@ async function withBulkAdvisoryTimeout({ label, timeoutMs, run }) { } } -async function readBoundedResponseText(response, maxBytes, label) { - const rawContentLength = response.headers?.get?.("content-length"); - const contentLength = - rawContentLength && /^\d+$/u.test(rawContentLength) ? Number(rawContentLength) : undefined; - if (Number.isSafeInteger(contentLength) && contentLength > maxBytes) { - await response.body?.cancel().catch(() => undefined); - throw Object.assign(new Error(`${label} exceeded ${maxBytes} bytes`), { code: "ETOOBIG" }); - } - - if (!response.body) { - return ""; - } - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - const chunks = []; - let totalBytes = 0; - try { - for (;;) { - const { done, value } = await reader.read(); - if (done) { - const tail = decoder.decode(); - if (tail) { - chunks.push(tail); - } - break; - } - - totalBytes += value.byteLength; - if (totalBytes > maxBytes) { - await reader.cancel().catch(() => undefined); - throw Object.assign(new Error(`${label} exceeded ${maxBytes} bytes`), { code: "ETOOBIG" }); - } - chunks.push(decoder.decode(value, { stream: true })); - } - } finally { - reader.releaseLock(); - } - - return chunks.join(""); +async function readBoundedResponseText(response, maxBytes, label, options = {}) { + return await readBoundedResponseTextWithLimit(response, label, maxBytes, { + signal: options.signal, + timeoutPromise: options.timeoutPromise, + formatTooLargeMessage: (messageLabel, bytes) => `${messageLabel} exceeded ${bytes} bytes`, + createTooLargeError: (message) => Object.assign(new Error(message), { code: "ETOOBIG" }), + }); } export async function readBoundedBulkAdvisoryErrorText( response, maxChars = BULK_ADVISORY_ERROR_BODY_MAX_CHARS, + options = {}, ) { if (!response.body) { return ""; @@ -782,10 +749,24 @@ export async function readBoundedBulkAdvisoryErrorText( const decoder = new TextDecoder(); let text = ""; let truncated = false; + let canceled = false; try { while (text.length <= maxChars) { - const { done, value } = await reader.read(); + const read = reader.read(); + const readWithTimeout = options.timeoutPromise + ? Promise.race([ + read, + options.timeoutPromise.catch((error) => { + canceled = true; + void Promise.resolve() + .then(() => reader.cancel()) + .catch(() => undefined); + throw error; + }), + ]) + : read; + const { done, value } = await readWithTimeout; if (done) { text += decoder.decode(); break; @@ -801,7 +782,7 @@ export async function readBoundedBulkAdvisoryErrorText( } finally { if (truncated) { await reader.cancel().catch(() => undefined); - } else { + } else if (!canceled) { reader.releaseLock(); } } @@ -809,8 +790,13 @@ export async function readBoundedBulkAdvisoryErrorText( return truncated ? `${text}\n[truncated]` : text; } -async function readBulkAdvisoryJson(response, maxBytes) { - const text = await readBoundedResponseText(response, maxBytes, "Bulk advisory response body"); +async function readBulkAdvisoryJson(response, maxBytes, options = {}) { + const text = await readBoundedResponseText( + response, + maxBytes, + "Bulk advisory response body", + options, + ); if (!text.trim()) { throw new Error("Bulk advisory response body was empty"); } @@ -828,7 +814,7 @@ export async function fetchBulkAdvisories({ return await withBulkAdvisoryTimeout({ label: "Bulk advisory request", timeoutMs, - run: async (signal) => { + run: async ({ signal, timeoutPromise }) => { const response = await fetchImpl(url, { method: "POST", headers: { @@ -840,13 +826,18 @@ export async function fetchBulkAdvisories({ }); if (!response.ok) { - const bodyText = await readBoundedBulkAdvisoryErrorText(response); + const bodyText = await readBoundedBulkAdvisoryErrorText(response, undefined, { + timeoutPromise, + }); throw new Error( `Bulk advisory request failed (${response.status} ${response.statusText}): ${bodyText}`, ); } - return await readBulkAdvisoryJson(response, responseBodyMaxBytes); + return await readBulkAdvisoryJson(response, responseBodyMaxBytes, { + signal, + timeoutPromise, + }); }, }); } diff --git a/test/scripts/pnpm-audit-prod.test.ts b/test/scripts/pnpm-audit-prod.test.ts index ce681cc53b0..a36991bcff8 100644 --- a/test/scripts/pnpm-audit-prod.test.ts +++ b/test/scripts/pnpm-audit-prod.test.ts @@ -273,6 +273,46 @@ snapshots: expect(signal?.aborted).toBe(true); }); + it("cancels stalled successful bulk advisory response bodies on request timeout", async () => { + let cancelled = false; + const body = new ReadableStream({ + pull() { + return new Promise(() => {}); + }, + cancel() { + cancelled = true; + }, + }); + const request = fetchBulkAdvisories({ + payload: { axios: ["1.0.0"] }, + timeoutMs: 5, + fetchImpl: async () => new Response(body, { status: 200 }), + }); + + await expect(request).rejects.toThrow(/Bulk advisory request exceeded timeout/u); + expect(cancelled).toBe(true); + }); + + it("cancels stalled failed bulk advisory response bodies on request timeout", async () => { + let cancelled = false; + const body = new ReadableStream({ + pull() { + return new Promise(() => {}); + }, + cancel() { + cancelled = true; + }, + }); + const request = fetchBulkAdvisories({ + payload: { axios: ["1.0.0"] }, + timeoutMs: 5, + fetchImpl: async () => new Response(body, { status: 500, statusText: "Internal Error" }), + }); + + await expect(request).rejects.toThrow(/Bulk advisory request exceeded timeout/u); + expect(cancelled).toBe(true); + }); + it("bounds successful bulk advisory response bodies", async () => { let cancelled = false; const body = new ReadableStream({