mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-21 00:34:47 +00:00
fix(clawhub): cancel stalled archive body reads
This commit is contained in:
committed by
Peter Steinberger
parent
817dca5ae9
commit
86b0a7ddda
@@ -152,6 +152,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Security/Windows ACL audit: classify Anonymous Logon, Guests, Interactive, Local, and Network SIDs as world-equivalent principals so broadly writable paths stay critical instead of being downgraded to group-writable. Fixes #74350. (#74383) Thanks @dwc1997.
|
||||
- Media-understanding: retry transient remote attachment fetch failures before audio or vision processing, so Discord voice notes are not lost after one network/CDN blip. Fixes #74316. Thanks @vyctorbrzezowski and @gabrielexito-stack.
|
||||
- Control UI: order timestamped live stream and tool items before untimestamped history fallbacks, keeping chat history in visible time order. Fixes #80759. (#81016) Thanks @akrimm702.
|
||||
- ClawHub: cancel stalled archive body reads for skill, package, and ClawPack downloads instead of leaving installs hanging after headers arrive. Fixes #52073. Refs #80006. Thanks @xinhuagu and @stainlu.
|
||||
- iMessage: stop sending visible `<media:image>` placeholder text for media-only native image sends while preserving the internal echo key that prevents self-echo duplicate replies. (#81209) Thanks @homer-byte.
|
||||
- Agents/sessions: create configured agent main sessions before first `sessions_send` or gateway send, so agent-to-agent messages no longer fail when the target agent has not started yet.
|
||||
- Control UI/config: discard stale redacted placeholders from form-mode config saves while preserving restorable saved secrets, so unrelated settings changes no longer submit `__OPENCLAW_REDACTED__` as real data. Fixes #60917. Thanks @giodl73-repo and @BunsDev.
|
||||
|
||||
@@ -33,6 +33,28 @@ async function expectPathMissing(targetPath: string): Promise<void> {
|
||||
expect((statError as { code?: unknown }).code).toBe("ENOENT");
|
||||
}
|
||||
|
||||
function createStalledBodyResponse(params: { headers: HeadersInit; firstChunk: Uint8Array }): {
|
||||
response: Response;
|
||||
cancel: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
const cancel = vi.fn();
|
||||
const body = new ReadableStream<Uint8Array>({
|
||||
start(controller) {
|
||||
controller.enqueue(params.firstChunk);
|
||||
},
|
||||
cancel(reason) {
|
||||
cancel(reason);
|
||||
},
|
||||
});
|
||||
return {
|
||||
response: new Response(body, {
|
||||
status: 200,
|
||||
headers: params.headers,
|
||||
}),
|
||||
cancel,
|
||||
};
|
||||
}
|
||||
|
||||
describe("clawhub helpers", () => {
|
||||
const originalHome = process.env.HOME;
|
||||
|
||||
@@ -493,6 +515,63 @@ describe("clawhub helpers", () => {
|
||||
).rejects.toThrow(/Rate limit exceeded$/);
|
||||
});
|
||||
|
||||
it("times out and cancels stalled skill archive body reads", async () => {
|
||||
const stalled = createStalledBodyResponse({
|
||||
firstChunk: new Uint8Array([4]),
|
||||
headers: { "content-type": "application/zip" },
|
||||
});
|
||||
|
||||
await expect(
|
||||
downloadClawHubSkillArchive({
|
||||
slug: "agentreceipt",
|
||||
version: "1.0.0",
|
||||
timeoutMs: 5,
|
||||
fetchImpl: async () => stalled.response,
|
||||
}),
|
||||
).rejects.toThrow(/skill archive download for agentreceipt body stalled after 5ms/i);
|
||||
expect(stalled.cancel).toHaveBeenCalledTimes(1);
|
||||
expect(stalled.cancel.mock.calls[0]?.[0]).toBeInstanceOf(Error);
|
||||
});
|
||||
|
||||
it("times out and cancels stalled package archive body reads", async () => {
|
||||
const stalled = createStalledBodyResponse({
|
||||
firstChunk: new Uint8Array([1]),
|
||||
headers: { "content-type": "application/zip" },
|
||||
});
|
||||
|
||||
await expect(
|
||||
downloadClawHubPackageArchive({
|
||||
name: "@hyf/zai-external-alpha",
|
||||
version: "0.0.1",
|
||||
timeoutMs: 5,
|
||||
fetchImpl: async () => stalled.response,
|
||||
}),
|
||||
).rejects.toThrow(
|
||||
/package archive download for @hyf\/zai-external-alpha body stalled after 5ms/i,
|
||||
);
|
||||
expect(stalled.cancel).toHaveBeenCalledTimes(1);
|
||||
expect(stalled.cancel.mock.calls[0]?.[0]).toBeInstanceOf(Error);
|
||||
});
|
||||
|
||||
it("times out and cancels stalled ClawPack artifact body reads", async () => {
|
||||
const stalled = createStalledBodyResponse({
|
||||
firstChunk: new Uint8Array([7]),
|
||||
headers: { "content-type": "application/octet-stream" },
|
||||
});
|
||||
|
||||
await expect(
|
||||
downloadClawHubPackageArchive({
|
||||
name: "demo",
|
||||
version: "1.2.3",
|
||||
artifact: "clawpack",
|
||||
timeoutMs: 5,
|
||||
fetchImpl: async () => stalled.response,
|
||||
}),
|
||||
).rejects.toThrow(/ClawPack download for demo@1.2.3 body stalled after 5ms/i);
|
||||
expect(stalled.cancel).toHaveBeenCalledTimes(1);
|
||||
expect(stalled.cancel.mock.calls[0]?.[0]).toBeInstanceOf(Error);
|
||||
});
|
||||
|
||||
it("downloads skill archives to sanitized temp paths and cleans them up", async () => {
|
||||
const archive = await downloadClawHubSkillArchive({
|
||||
slug: "agentreceipt",
|
||||
|
||||
@@ -636,6 +636,87 @@ async function fetchJson<T>(params: ClawHubRequestParams): Promise<T> {
|
||||
return (await response.json()) as T;
|
||||
}
|
||||
|
||||
function buildClawHubBodyTimeoutError(resourceLabel: string, timeoutMs: number): Error {
|
||||
return new Error(`ClawHub ${resourceLabel} body stalled after ${timeoutMs}ms`);
|
||||
}
|
||||
|
||||
async function readClawHubBodyChunkWithTimeout(params: {
|
||||
reader: ReadableStreamDefaultReader<Uint8Array>;
|
||||
timeoutMs: number;
|
||||
resourceLabel: string;
|
||||
}): Promise<ReadableStreamReadResult<Uint8Array>> {
|
||||
return await new Promise((resolve, reject) => {
|
||||
let timeout: ReturnType<typeof setTimeout> | undefined;
|
||||
let settled = false;
|
||||
const settle = (fn: () => void) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
if (timeout !== undefined) {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
fn();
|
||||
};
|
||||
|
||||
timeout = setTimeout(() => {
|
||||
const error = buildClawHubBodyTimeoutError(params.resourceLabel, params.timeoutMs);
|
||||
void params.reader.cancel(error).catch(() => undefined);
|
||||
settle(() => reject(error));
|
||||
}, params.timeoutMs);
|
||||
|
||||
void params.reader.read().then(
|
||||
(result) => settle(() => resolve(result)),
|
||||
(error: unknown) => settle(() => reject(error)),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
async function readClawHubResponseBytes(params: {
|
||||
response: Response;
|
||||
timeoutMs?: number;
|
||||
resourceLabel: string;
|
||||
}): Promise<Uint8Array> {
|
||||
const timeoutMs = params.timeoutMs ?? DEFAULT_FETCH_TIMEOUT_MS;
|
||||
const body = params.response.body;
|
||||
if (!body || typeof body.getReader !== "function") {
|
||||
return new Uint8Array(await params.response.arrayBuffer());
|
||||
}
|
||||
|
||||
const reader = body.getReader();
|
||||
const chunks: Uint8Array[] = [];
|
||||
let total = 0;
|
||||
try {
|
||||
while (true) {
|
||||
const { done, value } = await readClawHubBodyChunkWithTimeout({
|
||||
reader,
|
||||
timeoutMs,
|
||||
resourceLabel: params.resourceLabel,
|
||||
});
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
if (!value?.length) {
|
||||
continue;
|
||||
}
|
||||
chunks.push(value);
|
||||
total += value.length;
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
reader.releaseLock();
|
||||
} catch {}
|
||||
}
|
||||
|
||||
const bytes = new Uint8Array(total);
|
||||
let offset = 0;
|
||||
for (const chunk of chunks) {
|
||||
bytes.set(chunk, offset);
|
||||
offset += chunk.length;
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
export function resolveClawHubBaseUrl(baseUrl?: string): string {
|
||||
return normalizeBaseUrl(baseUrl);
|
||||
}
|
||||
@@ -902,7 +983,11 @@ export async function downloadClawHubPackageArchive(params: {
|
||||
if (!response.ok) {
|
||||
throw await buildClawHubError(response, url, hasToken);
|
||||
}
|
||||
const bytes = new Uint8Array(await response.arrayBuffer());
|
||||
const bytes = await readClawHubResponseBytes({
|
||||
response,
|
||||
timeoutMs: params.timeoutMs,
|
||||
resourceLabel: `ClawPack download for ${params.name}@${params.version}`,
|
||||
});
|
||||
const sha256Hex = formatSha256Hex(bytes);
|
||||
const npmIntegrity = formatSha512Integrity(bytes);
|
||||
const npmShasum = formatSha1Hex(bytes);
|
||||
@@ -977,7 +1062,11 @@ export async function downloadClawHubPackageArchive(params: {
|
||||
if (!response.ok) {
|
||||
throw await buildClawHubError(response, url, hasToken);
|
||||
}
|
||||
const bytes = new Uint8Array(await response.arrayBuffer());
|
||||
const bytes = await readClawHubResponseBytes({
|
||||
response,
|
||||
timeoutMs: params.timeoutMs,
|
||||
resourceLabel: `package archive download for ${params.name}`,
|
||||
});
|
||||
const sha256Hex = formatSha256Hex(bytes);
|
||||
const target = await createTempDownloadTarget({
|
||||
prefix: "openclaw-clawhub-package",
|
||||
@@ -1018,7 +1107,11 @@ export async function downloadClawHubSkillArchive(params: {
|
||||
if (!response.ok) {
|
||||
throw await buildClawHubError(response, url, hasToken);
|
||||
}
|
||||
const bytes = new Uint8Array(await response.arrayBuffer());
|
||||
const bytes = await readClawHubResponseBytes({
|
||||
response,
|
||||
timeoutMs: params.timeoutMs,
|
||||
resourceLabel: `skill archive download for ${params.slug}`,
|
||||
});
|
||||
const sha256Hex = formatSha256Hex(bytes);
|
||||
const target = await createTempDownloadTarget({
|
||||
prefix: "openclaw-clawhub-skill",
|
||||
|
||||
Reference in New Issue
Block a user