diff --git a/CHANGELOG.md b/CHANGELOG.md index 1610b81e2ef..dbed37c7643 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ Docs: https://docs.openclaw.ai - Gateway/Control UI: resolve bundled dashboard assets through symlinked global wrappers and auto-detected package roots, while keeping configured and custom roots on the strict hardlink boundary. (#40385) Thanks @LarytheLord. - Docs/Changelog: correct the contributor credit for the bundled Control UI global-install fix to @LarytheLord. (#40420) Thanks @velvet-shark. - Models/openai-codex GPT-5.4 forward-compat: use the GPT-5.4 1,050,000-token context window and 128,000 max tokens for `openai-codex/gpt-5.4` instead of inheriting stale legacy Codex limits in resolver fallbacks and model listing. (#37876) thanks @yuweuii. +- Telegram/media downloads: time out only stalled body reads so polling recovers from hung file downloads without aborting slow downloads that are still streaming data. (#40098) thanks @tysoncung. ## 2026.3.7 diff --git a/src/cli/daemon-cli/lifecycle.test.ts b/src/cli/daemon-cli/lifecycle.test.ts index 3f0ed6d531c..f1e87fc4938 100644 --- a/src/cli/daemon-cli/lifecycle.test.ts +++ b/src/cli/daemon-cli/lifecycle.test.ts @@ -36,17 +36,16 @@ const renderGatewayPortHealthDiagnostics = vi.fn(() => ["diag: unhealthy port"]) const renderRestartDiagnostics = vi.fn(() => ["diag: unhealthy runtime"]); const resolveGatewayPort = vi.fn(() => 18789); const findGatewayPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []); -const probeGateway = - vi.fn< - (opts: { - url: string; - auth?: { token?: string; password?: string }; - timeoutMs: number; - }) => Promise<{ - ok: boolean; - configSnapshot: unknown; - }> - >(); +const probeGateway = vi.fn< + (opts: { + url: string; + auth?: { token?: string; password?: string }; + timeoutMs: number; + }) => Promise<{ + ok: boolean; + configSnapshot: unknown; + }> +>(); const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true); const loadConfig = vi.fn(() => ({})); diff --git a/src/media/fetch.test.ts b/src/media/fetch.test.ts index 4802d6b3019..00966e26a34 100644 --- a/src/media/fetch.test.ts +++ b/src/media/fetch.test.ts @@ -12,6 +12,19 @@ function makeStream(chunks: Uint8Array[]) { }); } +function makeStallingFetch(firstChunk: Uint8Array) { + return vi.fn(async () => { + return new Response( + new ReadableStream({ + start(controller) { + controller.enqueue(firstChunk); + }, + }), + { status: 200 }, + ); + }); +} + describe("fetchRemoteMedia", () => { type LookupFn = NonNullable[0]["lookupFn"]>; @@ -54,6 +67,26 @@ describe("fetchRemoteMedia", () => { ).rejects.toThrow("exceeds maxBytes"); }); + it("aborts stalled body reads when idle timeout expires", async () => { + const lookupFn = vi.fn(async () => [ + { address: "93.184.216.34", family: 4 }, + ]) as unknown as LookupFn; + const fetchImpl = makeStallingFetch(new Uint8Array([1, 2])); + + await expect( + fetchRemoteMedia({ + url: "https://example.com/file.bin", + fetchImpl, + lookupFn, + maxBytes: 1024, + readIdleTimeoutMs: 20, + }), + ).rejects.toMatchObject({ + code: "fetch_failed", + name: "MediaFetchError", + }); + }, 5_000); + it("blocks private IP literals before fetching", async () => { const fetchImpl = vi.fn(); await expect( diff --git a/src/media/fetch.ts b/src/media/fetch.ts index 3f2372c0abf..cdd62e4a044 100644 --- a/src/media/fetch.ts +++ b/src/media/fetch.ts @@ -31,6 +31,8 @@ type FetchMediaOptions = { filePathHint?: string; maxBytes?: number; maxRedirects?: number; + /** Abort if the response body stops yielding data for this long (ms). */ + readIdleTimeoutMs?: number; ssrfPolicy?: SsrFPolicy; lookupFn?: LookupFn; }; @@ -87,6 +89,7 @@ export async function fetchRemoteMedia(options: FetchMediaOptions): Promise - new MediaFetchError( - "max_bytes", - `Failed to fetch media from ${res.url || url}: payload exceeds maxBytes ${maxBytes}`, - ), - }) - : Buffer.from(await res.arrayBuffer()); + let buffer: Buffer; + try { + buffer = maxBytes + ? await readResponseWithLimit(res, maxBytes, { + onOverflow: ({ maxBytes, res }) => + new MediaFetchError( + "max_bytes", + `Failed to fetch media from ${res.url || url}: payload exceeds maxBytes ${maxBytes}`, + ), + chunkTimeoutMs: readIdleTimeoutMs, + }) + : Buffer.from(await res.arrayBuffer()); + } catch (err) { + if (err instanceof MediaFetchError) { + throw err; + } + throw new MediaFetchError( + "fetch_failed", + `Failed to fetch media from ${res.url || url}: ${String(err)}`, + ); + } let fileNameFromUrl: string | undefined; try { const parsed = new URL(finalUrl); diff --git a/src/media/read-response-with-limit.test.ts b/src/media/read-response-with-limit.test.ts new file mode 100644 index 00000000000..c4cdcfc4fb3 --- /dev/null +++ b/src/media/read-response-with-limit.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from "vitest"; +import { readResponseWithLimit } from "./read-response-with-limit.js"; + +function makeStream(chunks: Uint8Array[], delayMs?: number) { + return new ReadableStream({ + async start(controller) { + for (const chunk of chunks) { + if (delayMs) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + controller.enqueue(chunk); + } + controller.close(); + }, + }); +} + +function makeStallingStream(initialChunks: Uint8Array[]) { + return new ReadableStream({ + start(controller) { + for (const chunk of initialChunks) { + controller.enqueue(chunk); + } + }, + }); +} + +describe("readResponseWithLimit", () => { + it("reads all chunks within the limit", async () => { + const body = makeStream([new Uint8Array([1, 2]), new Uint8Array([3, 4])]); + const res = new Response(body); + const buf = await readResponseWithLimit(res, 100); + expect(buf).toEqual(Buffer.from([1, 2, 3, 4])); + }); + + it("throws when total exceeds maxBytes", async () => { + const body = makeStream([new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])]); + const res = new Response(body); + await expect(readResponseWithLimit(res, 4)).rejects.toThrow(/too large/i); + }); + + it("calls custom onOverflow", async () => { + const body = makeStream([new Uint8Array(10)]); + const res = new Response(body); + await expect( + readResponseWithLimit(res, 5, { + onOverflow: ({ size, maxBytes }) => new Error(`custom: ${size} > ${maxBytes}`), + }), + ).rejects.toThrow("custom: 10 > 5"); + }); + + it("times out when no new chunk arrives before idle timeout", async () => { + const body = makeStallingStream([new Uint8Array([1, 2])]); + const res = new Response(body); + await expect(readResponseWithLimit(res, 1024, { chunkTimeoutMs: 50 })).rejects.toThrow( + /stalled/i, + ); + }, 5_000); + + it("does not time out while chunks keep arriving", async () => { + const body = makeStream([new Uint8Array([1]), new Uint8Array([2])], 10); + const res = new Response(body); + const buf = await readResponseWithLimit(res, 100, { chunkTimeoutMs: 500 }); + expect(buf).toEqual(Buffer.from([1, 2])); + }); +}); diff --git a/src/media/read-response-with-limit.ts b/src/media/read-response-with-limit.ts index a9ad353f5ea..c9ac52c8035 100644 --- a/src/media/read-response-with-limit.ts +++ b/src/media/read-response-with-limit.ts @@ -1,14 +1,55 @@ +async function readChunkWithIdleTimeout( + reader: ReadableStreamDefaultReader, + chunkTimeoutMs: number, +): Promise> { + let timeoutId: ReturnType | undefined; + let timedOut = false; + + return await new Promise((resolve, reject) => { + const clear = () => { + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + timeoutId = undefined; + } + }; + + timeoutId = setTimeout(() => { + timedOut = true; + clear(); + void reader.cancel().catch(() => undefined); + reject(new Error(`Media download stalled: no data received for ${chunkTimeoutMs}ms`)); + }, chunkTimeoutMs); + + void reader.read().then( + (result) => { + clear(); + if (!timedOut) { + resolve(result); + } + }, + (err) => { + clear(); + if (!timedOut) { + reject(err); + } + }, + ); + }); +} + export async function readResponseWithLimit( res: Response, maxBytes: number, opts?: { onOverflow?: (params: { size: number; maxBytes: number; res: Response }) => Error; + chunkTimeoutMs?: number; }, ): Promise { const onOverflow = opts?.onOverflow ?? ((params: { size: number; maxBytes: number }) => new Error(`Content too large: ${params.size} bytes (limit: ${params.maxBytes} bytes)`)); + const chunkTimeoutMs = opts?.chunkTimeoutMs; const body = res.body; if (!body || typeof body.getReader !== "function") { @@ -24,7 +65,9 @@ export async function readResponseWithLimit( let total = 0; try { while (true) { - const { done, value } = await reader.read(); + const { done, value } = chunkTimeoutMs + ? await readChunkWithIdleTimeout(reader, chunkTimeoutMs) + : await reader.read(); if (done) { break; } diff --git a/src/telegram/bot/delivery.resolve-media.ts b/src/telegram/bot/delivery.resolve-media.ts index e0f8d46abbd..14df1d6e2a8 100644 --- a/src/telegram/bot/delivery.resolve-media.ts +++ b/src/telegram/bot/delivery.resolve-media.ts @@ -100,6 +100,9 @@ function resolveRequiredFetchImpl(proxyFetch?: typeof fetch): typeof fetch { return fetchImpl; } +/** Default idle timeout for Telegram media downloads (30 seconds). */ +const TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS = 30_000; + async function downloadAndSaveTelegramFile(params: { filePath: string; token: string; @@ -113,6 +116,7 @@ async function downloadAndSaveTelegramFile(params: { fetchImpl: params.fetchImpl, filePathHint: params.filePath, maxBytes: params.maxBytes, + readIdleTimeoutMs: TELEGRAM_DOWNLOAD_IDLE_TIMEOUT_MS, ssrfPolicy: TELEGRAM_MEDIA_SSRF_POLICY, }); const originalName = params.telegramFileName ?? fetched.fileName ?? params.filePath;