From 86b0a7ddda8a7461367fcbf6098d2d9b5a8ef0cd Mon Sep 17 00:00:00 2001 From: stainlu Date: Thu, 14 May 2026 09:43:43 +0800 Subject: [PATCH] fix(clawhub): cancel stalled archive body reads --- CHANGELOG.md | 1 + src/infra/clawhub.test.ts | 79 +++++++++++++++++++++++++++++++ src/infra/clawhub.ts | 99 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 176 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6499c9994f2..0a514a95c0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -152,6 +152,7 @@ Docs: https://docs.openclaw.ai - Security/Windows ACL audit: classify Anonymous Logon, Guests, Interactive, Local, and Network SIDs as world-equivalent principals so broadly writable paths stay critical instead of being downgraded to group-writable. Fixes #74350. (#74383) Thanks @dwc1997. - Media-understanding: retry transient remote attachment fetch failures before audio or vision processing, so Discord voice notes are not lost after one network/CDN blip. Fixes #74316. Thanks @vyctorbrzezowski and @gabrielexito-stack. - Control UI: order timestamped live stream and tool items before untimestamped history fallbacks, keeping chat history in visible time order. Fixes #80759. (#81016) Thanks @akrimm702. +- ClawHub: cancel stalled archive body reads for skill, package, and ClawPack downloads instead of leaving installs hanging after headers arrive. Fixes #52073. Refs #80006. Thanks @xinhuagu and @stainlu. - iMessage: stop sending visible `` placeholder text for media-only native image sends while preserving the internal echo key that prevents self-echo duplicate replies. (#81209) Thanks @homer-byte. - Agents/sessions: create configured agent main sessions before first `sessions_send` or gateway send, so agent-to-agent messages no longer fail when the target agent has not started yet. - Control UI/config: discard stale redacted placeholders from form-mode config saves while preserving restorable saved secrets, so unrelated settings changes no longer submit `__OPENCLAW_REDACTED__` as real data. Fixes #60917. Thanks @giodl73-repo and @BunsDev. diff --git a/src/infra/clawhub.test.ts b/src/infra/clawhub.test.ts index 9906968ce20..0e6f4144fed 100644 --- a/src/infra/clawhub.test.ts +++ b/src/infra/clawhub.test.ts @@ -33,6 +33,28 @@ async function expectPathMissing(targetPath: string): Promise { expect((statError as { code?: unknown }).code).toBe("ENOENT"); } +function createStalledBodyResponse(params: { headers: HeadersInit; firstChunk: Uint8Array }): { + response: Response; + cancel: ReturnType; +} { + const cancel = vi.fn(); + const body = new ReadableStream({ + start(controller) { + controller.enqueue(params.firstChunk); + }, + cancel(reason) { + cancel(reason); + }, + }); + return { + response: new Response(body, { + status: 200, + headers: params.headers, + }), + cancel, + }; +} + describe("clawhub helpers", () => { const originalHome = process.env.HOME; @@ -493,6 +515,63 @@ describe("clawhub helpers", () => { ).rejects.toThrow(/Rate limit exceeded$/); }); + it("times out and cancels stalled skill archive body reads", async () => { + const stalled = createStalledBodyResponse({ + firstChunk: new Uint8Array([4]), + headers: { "content-type": "application/zip" }, + }); + + await expect( + downloadClawHubSkillArchive({ + slug: "agentreceipt", + version: "1.0.0", + timeoutMs: 5, + fetchImpl: async () => stalled.response, + }), + ).rejects.toThrow(/skill archive download for agentreceipt body stalled after 5ms/i); + expect(stalled.cancel).toHaveBeenCalledTimes(1); + expect(stalled.cancel.mock.calls[0]?.[0]).toBeInstanceOf(Error); + }); + + it("times out and cancels stalled package archive body reads", async () => { + const stalled = createStalledBodyResponse({ + firstChunk: new Uint8Array([1]), + headers: { "content-type": "application/zip" }, + }); + + await expect( + downloadClawHubPackageArchive({ + name: "@hyf/zai-external-alpha", + version: "0.0.1", + timeoutMs: 5, + fetchImpl: async () => stalled.response, + }), + ).rejects.toThrow( + /package archive download for @hyf\/zai-external-alpha body stalled after 5ms/i, + ); + expect(stalled.cancel).toHaveBeenCalledTimes(1); + expect(stalled.cancel.mock.calls[0]?.[0]).toBeInstanceOf(Error); + }); + + it("times out and cancels stalled ClawPack artifact body reads", async () => { + const stalled = createStalledBodyResponse({ + firstChunk: new Uint8Array([7]), + headers: { "content-type": "application/octet-stream" }, + }); + + await expect( + downloadClawHubPackageArchive({ + name: "demo", + version: "1.2.3", + artifact: "clawpack", + timeoutMs: 5, + fetchImpl: async () => stalled.response, + }), + ).rejects.toThrow(/ClawPack download for demo@1.2.3 body stalled after 5ms/i); + expect(stalled.cancel).toHaveBeenCalledTimes(1); + expect(stalled.cancel.mock.calls[0]?.[0]).toBeInstanceOf(Error); + }); + it("downloads skill archives to sanitized temp paths and cleans them up", async () => { const archive = await downloadClawHubSkillArchive({ slug: "agentreceipt", diff --git a/src/infra/clawhub.ts b/src/infra/clawhub.ts index d8f4a4df59c..ab0b0809710 100644 --- a/src/infra/clawhub.ts +++ b/src/infra/clawhub.ts @@ -636,6 +636,87 @@ async function fetchJson(params: ClawHubRequestParams): Promise { return (await response.json()) as T; } +function buildClawHubBodyTimeoutError(resourceLabel: string, timeoutMs: number): Error { + return new Error(`ClawHub ${resourceLabel} body stalled after ${timeoutMs}ms`); +} + +async function readClawHubBodyChunkWithTimeout(params: { + reader: ReadableStreamDefaultReader; + timeoutMs: number; + resourceLabel: string; +}): Promise> { + return await new Promise((resolve, reject) => { + let timeout: ReturnType | undefined; + let settled = false; + const settle = (fn: () => void) => { + if (settled) { + return; + } + settled = true; + if (timeout !== undefined) { + clearTimeout(timeout); + } + fn(); + }; + + timeout = setTimeout(() => { + const error = buildClawHubBodyTimeoutError(params.resourceLabel, params.timeoutMs); + void params.reader.cancel(error).catch(() => undefined); + settle(() => reject(error)); + }, params.timeoutMs); + + void params.reader.read().then( + (result) => settle(() => resolve(result)), + (error: unknown) => settle(() => reject(error)), + ); + }); +} + +async function readClawHubResponseBytes(params: { + response: Response; + timeoutMs?: number; + resourceLabel: string; +}): Promise { + const timeoutMs = params.timeoutMs ?? DEFAULT_FETCH_TIMEOUT_MS; + const body = params.response.body; + if (!body || typeof body.getReader !== "function") { + return new Uint8Array(await params.response.arrayBuffer()); + } + + const reader = body.getReader(); + const chunks: Uint8Array[] = []; + let total = 0; + try { + while (true) { + const { done, value } = await readClawHubBodyChunkWithTimeout({ + reader, + timeoutMs, + resourceLabel: params.resourceLabel, + }); + if (done) { + break; + } + if (!value?.length) { + continue; + } + chunks.push(value); + total += value.length; + } + } finally { + try { + reader.releaseLock(); + } catch {} + } + + const bytes = new Uint8Array(total); + let offset = 0; + for (const chunk of chunks) { + bytes.set(chunk, offset); + offset += chunk.length; + } + return bytes; +} + export function resolveClawHubBaseUrl(baseUrl?: string): string { return normalizeBaseUrl(baseUrl); } @@ -902,7 +983,11 @@ export async function downloadClawHubPackageArchive(params: { if (!response.ok) { throw await buildClawHubError(response, url, hasToken); } - const bytes = new Uint8Array(await response.arrayBuffer()); + const bytes = await readClawHubResponseBytes({ + response, + timeoutMs: params.timeoutMs, + resourceLabel: `ClawPack download for ${params.name}@${params.version}`, + }); const sha256Hex = formatSha256Hex(bytes); const npmIntegrity = formatSha512Integrity(bytes); const npmShasum = formatSha1Hex(bytes); @@ -977,7 +1062,11 @@ export async function downloadClawHubPackageArchive(params: { if (!response.ok) { throw await buildClawHubError(response, url, hasToken); } - const bytes = new Uint8Array(await response.arrayBuffer()); + const bytes = await readClawHubResponseBytes({ + response, + timeoutMs: params.timeoutMs, + resourceLabel: `package archive download for ${params.name}`, + }); const sha256Hex = formatSha256Hex(bytes); const target = await createTempDownloadTarget({ prefix: "openclaw-clawhub-package", @@ -1018,7 +1107,11 @@ export async function downloadClawHubSkillArchive(params: { if (!response.ok) { throw await buildClawHubError(response, url, hasToken); } - const bytes = new Uint8Array(await response.arrayBuffer()); + const bytes = await readClawHubResponseBytes({ + response, + timeoutMs: params.timeoutMs, + resourceLabel: `skill archive download for ${params.slug}`, + }); const sha256Hex = formatSha256Hex(bytes); const target = await createTempDownloadTarget({ prefix: "openclaw-clawhub-skill",