From bd20f8e07e92038e44a96a7a4c92e5d4f1e296a6 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 1 May 2026 02:49:02 +0100 Subject: [PATCH] fix(discord): harden rate limit retries (#75338) * fix(discord): harden rate limit retries * fix(discord): guard voice upload fetches * fix(discord): avoid stale rate limit requeues --- CHANGELOG.md | 1 + .../discord/src/internal/rest-errors.ts | 39 ++-- .../discord/src/internal/rest-scheduler.ts | 47 ++++- extensions/discord/src/internal/rest.test.ts | 185 ++++++++++++++++++ extensions/discord/src/internal/rest.ts | 3 +- .../discord/src/send.webhook.proxy.test.ts | 59 ++++++ extensions/discord/src/send.webhook.ts | 40 +++- extensions/discord/src/voice-message.test.ts | 153 ++++++++++++++- extensions/discord/src/voice-message.ts | 175 +++++++++++------ 9 files changed, 620 insertions(+), 82 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8871b41745e..0a869a10ef3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai - Agents/commitments: keep inferred follow-ups internal when heartbeat target is none, strip raw source text from stored commitments, disable tools during due-commitment heartbeat turns, bound hidden extraction queue growth, expire stale commitments, and add QA/Docker safety coverage. Thanks @vignesh07. - Plugins/runtime-deps: accept already materialized package-level runtime-deps supersets as converged, so later lazy plugin activation no longer prunes and relaunches `pnpm install` after gateway startup pre-staging, reducing event-loop pressure from repeated runtime-deps repair on packaged installs. Fixes #75283; refs #75297 and #72338. Thanks @brokemac79, @lisandromachado, and @midhunmonachan. +- Discord: retry queued REST 429s against learned bucket/global cooldowns and reacquire fresh voice upload URLs after CDN upload rate limits, so outbound sends recover without reusing stale single-use upload URLs. Thanks @discord. - TTS/providers: keep bundled speech-provider compat fallback available when plugins are globally disabled, so cold gateway and CLI startup can still resolve fallback speech providers instead of leaving explicit TTS provider selection with no registered providers. Refs #75265. Thanks @sliekens. - Discord: collapse repeated native slash-command deploy rate-limit startup logs into one non-fatal warning while keeping per-request REST timing in verbose output. Thanks @discord. - Providers/OpenAI Codex: preserve existing wrapped Codex streams during OpenAI attribution so PI OAuth bearer injection reaches ChatGPT/Codex Responses, and strip native Codex-only unsupported payload fields without touching custom compatible endpoints. (#75111) Thanks @keshavbotagent. diff --git a/extensions/discord/src/internal/rest-errors.ts b/extensions/discord/src/internal/rest-errors.ts index 641ed98b71f..7bf6873e294 100644 --- a/extensions/discord/src/internal/rest-errors.ts +++ b/extensions/discord/src/internal/rest-errors.ts @@ -20,21 +20,36 @@ export function readDiscordMessage(body: unknown, fallback: string): string { return typeof value === "string" && value.trim() ? value : fallback; } -export function readRetryAfter(body: unknown, response: Response): number { +function readRetryAfterHeader(value: string | null, now = Date.now()): number | undefined { + if (!value) { + return undefined; + } + const seconds = Number(value); + if (Number.isFinite(seconds)) { + return seconds; + } + const retryAt = Date.parse(value); + return Number.isFinite(retryAt) ? (retryAt - now) / 1000 : undefined; +} + +function coerceRetryAfterSeconds(value: unknown): number | undefined { + if (typeof value !== "number" && typeof value !== "string") { + return undefined; + } + const seconds = typeof value === "number" ? value : Number(value); + return Number.isFinite(seconds) && seconds >= 0 ? Math.max(0, seconds) : undefined; +} + +export function readRetryAfter(body: unknown, response: Response, fallbackSeconds = 0): number { const bodyValue = body && typeof body === "object" && "retry_after" in body ? (body as { retry_after?: unknown }).retry_after : undefined; - const headerValue = response.headers.get("Retry-After"); - const seconds = - typeof bodyValue === "number" - ? bodyValue - : typeof bodyValue === "string" - ? Number(bodyValue) - : headerValue - ? Number(headerValue) - : 0; - return Number.isFinite(seconds) && seconds > 0 ? seconds : 0; + return ( + coerceRetryAfterSeconds(bodyValue) ?? + coerceRetryAfterSeconds(readRetryAfterHeader(response.headers.get("Retry-After"))) ?? + fallbackSeconds + ); } export class DiscordError extends Error { @@ -66,7 +81,7 @@ export class RateLimitError extends DiscordError { ) { super(response, body); this.name = "RateLimitError"; - this.retryAfter = readRetryAfter(body, response); + this.retryAfter = readRetryAfter(body, response, 1); this.scope = body.global ? "global" : response.headers.get("X-RateLimit-Scope"); this.bucket = response.headers.get("X-RateLimit-Bucket"); } diff --git a/extensions/discord/src/internal/rest-scheduler.ts b/extensions/discord/src/internal/rest-scheduler.ts index 2ca4d395276..23ecc6789c1 100644 --- a/extensions/discord/src/internal/rest-scheduler.ts +++ b/extensions/discord/src/internal/rest-scheduler.ts @@ -1,4 +1,4 @@ -import { readRetryAfter } from "./rest-errors.js"; +import { RateLimitError, readRetryAfter } from "./rest-errors.js"; import { createBucketKey, createRouteKey, readHeaderNumber, readResetAt } from "./rest-routes.js"; export type RequestQuery = Record; @@ -6,8 +6,10 @@ export type ScheduledRequest = { method: string; path: string; data?: TData; + generation: number; query?: RequestQuery; routeKey: string; + retryCount: number; resolve: (value?: unknown) => void; reject: (reason?: unknown) => void; }; @@ -26,6 +28,7 @@ type BucketState = { export type RestSchedulerOptions = { maxConcurrency: number; + maxRateLimitRetries: number; maxQueueSize: number; }; @@ -37,6 +40,7 @@ export class RestScheduler { private drainTimer: NodeJS.Timeout | undefined; private globalRateLimitUntil = 0; private invalidRequestTimestamps: Array<{ at: number; status: number }> = []; + private queueGeneration = 0; private queuedRequests = 0; private routeBuckets = new Map(); @@ -58,7 +62,14 @@ export class RestScheduler { const bucket = this.getBucket(this.routeBuckets.get(routeKey) ?? routeKey); return new Promise((resolve, reject) => { this.queuedRequests += 1; - bucket.pending.push({ ...params, routeKey, resolve, reject }); + bucket.pending.push({ + ...params, + generation: this.queueGeneration, + routeKey, + retryCount: 0, + resolve, + reject, + }); this.drainQueues(); }); } @@ -69,6 +80,7 @@ export class RestScheduler { } clearQueue(): void { + this.queueGeneration += 1; if (this.drainTimer) { clearTimeout(this.drainTimer); this.drainTimer = undefined; @@ -77,6 +89,7 @@ export class RestScheduler { } abortPending(): void { + this.queueGeneration += 1; this.rejectPending(new DOMException("Aborted", "AbortError")); } @@ -119,6 +132,10 @@ export class RestScheduler { return Math.max(1, Math.floor(this.options.maxConcurrency)); } + private get maxRateLimitRetries(): number { + return Math.max(0, Math.floor(this.options.maxRateLimitRetries)); + } + private getBucket(key: string): BucketState { const existing = this.buckets.get(key); if (existing) { @@ -220,7 +237,7 @@ export class RestScheduler { return; } bucket.rateLimitHits += 1; - const retryAfterMs = Math.max(0, readRetryAfter(parsed, response) * 1000); + const retryAfterMs = Math.max(0, readRetryAfter(parsed, response, 1) * 1000); const retryAt = Date.now() + retryAfterMs; if (response.headers.get("X-RateLimit-Global") === "true" || isGlobalRateLimit(parsed)) { this.globalRateLimitUntil = Math.max(this.globalRateLimitUntil, retryAt); @@ -337,14 +354,21 @@ export class RestScheduler { queued: ScheduledRequest, bucket: BucketState, ): Promise { + let requeued = false; try { queued.resolve(await this.executor(queued)); } catch (error) { + if (error instanceof RateLimitError && this.requeueRateLimitedRequest(queued)) { + requeued = true; + return; + } queued.reject(error); } finally { bucket.active = Math.max(0, bucket.active - 1); this.activeWorkers = Math.max(0, this.activeWorkers - 1); - this.queuedRequests = Math.max(0, this.queuedRequests - 1); + if (!requeued) { + this.queuedRequests = Math.max(0, this.queuedRequests - 1); + } if (bucket.active === 0 && bucket.pending.length === 0) { for (const routeKey of bucket.routeKeys) { if (this.routeBuckets.get(routeKey) === routeKey) { @@ -356,6 +380,21 @@ export class RestScheduler { } } + private requeueRateLimitedRequest(queued: ScheduledRequest): boolean { + if ( + queued.generation !== this.queueGeneration || + queued.retryCount >= this.maxRateLimitRetries + ) { + return false; + } + const bucketKey = this.routeBuckets.get(queued.routeKey) ?? queued.routeKey; + this.getBucket(bucketKey).pending.push({ + ...queued, + retryCount: queued.retryCount + 1, + }); + return true; + } + private rejectPending(error: Error | DOMException): void { for (const bucket of this.buckets.values()) { for (const queued of bucket.pending.splice(0)) { diff --git a/extensions/discord/src/internal/rest.test.ts b/extensions/discord/src/internal/rest.test.ts index f2cd5503e5d..f7adc6ea81a 100644 --- a/extensions/discord/src/internal/rest.test.ts +++ b/extensions/discord/src/internal/rest.test.ts @@ -153,6 +153,154 @@ describe("RequestClient", () => { expect(fetchSpy).toHaveBeenCalledTimes(2); }); + it("retries queued rate limit responses after the learned reset", async () => { + vi.useFakeTimers(); + vi.setSystemTime(0); + const responses = [ + Promise.resolve( + createJsonResponse( + { message: "Rate limited", retry_after: 0.1, global: false }, + { + status: 429, + headers: { + "X-RateLimit-Bucket": "channel-messages", + "X-RateLimit-Limit": "1", + "X-RateLimit-Remaining": "0", + }, + }, + ), + ), + Promise.resolve( + createJsonResponse( + { id: "retried" }, + { + headers: { + "X-RateLimit-Bucket": "channel-messages", + "X-RateLimit-Limit": "1", + "X-RateLimit-Remaining": "1", + }, + }, + ), + ), + ]; + const fetchSpy = vi.fn(async () => { + const response = responses.shift(); + if (!response) { + throw new Error("unexpected request"); + } + return await response; + }); + const client = new RequestClient("test-token", { fetch: fetchSpy }); + + const request = client.get("/channels/c1/messages"); + await Promise.resolve(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + expect(client.queueSize).toBe(1); + + await vi.advanceTimersByTimeAsync(99); + expect(fetchSpy).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1); + await expect(request).resolves.toEqual({ id: "retried" }); + expect(fetchSpy).toHaveBeenCalledTimes(2); + expect(client.queueSize).toBe(0); + expect(client.getSchedulerMetrics().buckets).toEqual([]); + }); + + it("honors maxRateLimitRetries for queued requests", async () => { + const fetchSpy = vi.fn(async () => + createJsonResponse( + { message: "Rate limited", retry_after: 0.1, global: false }, + { + status: 429, + headers: { "X-RateLimit-Bucket": "channel-messages" }, + }, + ), + ); + const client = new RequestClient("test-token", { + fetch: fetchSpy, + scheduler: { maxRateLimitRetries: 0 }, + }); + + await expect(client.get("/channels/c1/messages")).rejects.toMatchObject({ + name: "RateLimitError", + retryAfter: 0.1, + }); + expect(fetchSpy).toHaveBeenCalledTimes(1); + expect(client.queueSize).toBe(0); + }); + + it("does not requeue an active rate limit after the queue is cleared", async () => { + const response = createDeferred(); + const fetchSpy = vi.fn(async () => { + if (fetchSpy.mock.calls.length > 1) { + throw new Error("unexpected retry after clearQueue"); + } + return await response.promise; + }); + const client = new RequestClient("test-token", { fetch: fetchSpy }); + + const request = client.get("/channels/c1/messages"); + await vi.waitFor(() => expect(fetchSpy).toHaveBeenCalledTimes(1)); + expect(client.queueSize).toBe(1); + + client.clearQueue(); + expect(client.queueSize).toBe(1); + + response.resolve( + createJsonResponse( + { message: "Rate limited", retry_after: 0, global: false }, + { + status: 429, + headers: { "X-RateLimit-Bucket": "channel-messages" }, + }, + ), + ); + + await expect(request).rejects.toMatchObject({ + name: "RateLimitError", + retryAfter: 0, + }); + expect(fetchSpy).toHaveBeenCalledTimes(1); + expect(client.queueSize).toBe(0); + }); + + it("retries queued global rate limits after Retry-After", async () => { + vi.useFakeTimers(); + vi.setSystemTime(0); + const responses = [ + Promise.resolve( + createJsonResponse( + { message: "Rate limited", retry_after: 0.1, global: true }, + { + status: 429, + headers: { "X-RateLimit-Global": "true" }, + }, + ), + ), + Promise.resolve(createJsonResponse({ id: "after-global" })), + ]; + const fetchSpy = vi.fn(async () => { + const response = responses.shift(); + if (!response) { + throw new Error("unexpected request"); + } + return await response; + }); + const client = new RequestClient("test-token", { fetch: fetchSpy }); + + const request = client.get("/channels/c1/messages"); + await Promise.resolve(); + expect(fetchSpy).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(99); + expect(fetchSpy).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1); + await expect(request).resolves.toEqual({ id: "after-global" }); + expect(fetchSpy).toHaveBeenCalledTimes(2); + }); + it("preserves Discord error codes on rate limit errors", async () => { const client = new RequestClient("test-token", { queueRequests: false, @@ -175,6 +323,43 @@ describe("RequestClient", () => { }); }); + it("parses HTTP-date Retry-After headers on rate limit errors", async () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-05-01T12:00:00.000Z")); + const client = new RequestClient("test-token", { + queueRequests: false, + fetch: async () => + new Response(JSON.stringify({ message: "Slow down", global: false }), { + status: 429, + headers: { "Retry-After": "Fri, 01 May 2026 12:00:05 GMT" }, + }), + }); + + await expect(client.get("/channels/c1/messages")).rejects.toMatchObject({ + name: "RateLimitError", + retryAfter: 5, + }); + }); + + it("falls back to Retry-After when the rate limit body value is malformed", async () => { + const client = new RequestClient("test-token", { + queueRequests: false, + fetch: async () => + new Response( + JSON.stringify({ message: "Slow down", retry_after: "not-a-number", global: false }), + { + status: 429, + headers: { "Retry-After": "7" }, + }, + ), + }); + + await expect(client.get("/channels/c1/messages")).rejects.toMatchObject({ + name: "RateLimitError", + retryAfter: 7, + }); + }); + it("tracks invalid requests and exposes bucket scheduler metrics", async () => { const client = new RequestClient("test-token", { queueRequests: false, diff --git a/extensions/discord/src/internal/rest.ts b/extensions/discord/src/internal/rest.ts index e2b81139145..5908da9ab68 100644 --- a/extensions/discord/src/internal/rest.ts +++ b/extensions/discord/src/internal/rest.ts @@ -88,6 +88,7 @@ export class RequestClient { this.scheduler = new RestScheduler( { maxConcurrency: this.options.scheduler?.maxConcurrency ?? DEFAULT_MAX_CONCURRENT_WORKERS, + maxRateLimitRetries: this.options.scheduler?.maxRateLimitRetries ?? 3, maxQueueSize: this.options.maxQueueSize ?? defaultOptions.maxQueueSize, }, async (request) => @@ -167,7 +168,7 @@ export class RequestClient { const rateLimitBody = isDiscordRateLimitBody(parsed) ? parsed : undefined; throw new RateLimitError(response, { message: readDiscordMessage(rateLimitBody, "Rate limited"), - retry_after: readRetryAfter(rateLimitBody, response), + retry_after: readRetryAfter(rateLimitBody, response, 1), code: readDiscordCode(rateLimitBody), global: Boolean(rateLimitBody?.global), }); diff --git a/extensions/discord/src/send.webhook.proxy.test.ts b/extensions/discord/src/send.webhook.proxy.test.ts index 8fe94acfb19..5b33dd6c2d8 100644 --- a/extensions/discord/src/send.webhook.proxy.test.ts +++ b/extensions/discord/src/send.webhook.proxy.test.ts @@ -129,4 +129,63 @@ describe("sendWebhookMessageDiscord proxy support", () => { expect(globalFetchMock).toHaveBeenCalled(); globalFetchMock.mockRestore(); }); + + it("throws typed rate limit errors for webhook 429 responses", async () => { + const globalFetchMock = vi.spyOn(globalThis, "fetch").mockResolvedValue( + new Response(JSON.stringify({ message: "Slow down", retry_after: 0.25, global: false }), { + status: 429, + }), + ); + + const cfg = { + channels: { + discord: { + token: "Bot test-token", + }, + }, + } as OpenClawConfig; + + await expect( + sendWebhookMessageDiscord("hello", { + cfg, + accountId: "default", + webhookId: "123", + webhookToken: "abc", + wait: true, + }), + ).rejects.toMatchObject({ + name: "RateLimitError", + status: 429, + retryAfter: 0.25, + }); + globalFetchMock.mockRestore(); + }); + + it("throws typed status errors for webhook server failures", async () => { + const globalFetchMock = vi + .spyOn(globalThis, "fetch") + .mockResolvedValue(new Response("upstream unavailable", { status: 503 })); + + const cfg = { + channels: { + discord: { + token: "Bot test-token", + }, + }, + } as OpenClawConfig; + + await expect( + sendWebhookMessageDiscord("hello", { + cfg, + accountId: "default", + webhookId: "123", + webhookToken: "abc", + wait: true, + }), + ).rejects.toMatchObject({ + name: "DiscordError", + status: 503, + }); + globalFetchMock.mockRestore(); + }); }); diff --git a/extensions/discord/src/send.webhook.ts b/extensions/discord/src/send.webhook.ts index bb7ce6d744e..bdd9237b9b7 100644 --- a/extensions/discord/src/send.webhook.ts +++ b/extensions/discord/src/send.webhook.ts @@ -2,6 +2,13 @@ import { recordChannelActivity } from "openclaw/plugin-sdk/channel-activity-runt import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; import { resolveDiscordClientAccountContext } from "./client.js"; +import { + DiscordError, + RateLimitError, + readDiscordCode, + readDiscordMessage, + readRetryAfter, +} from "./internal/rest-errors.js"; import { rewriteDiscordKnownMentions } from "./mentions.js"; import type { DiscordSendResult } from "./send.types.js"; @@ -33,6 +40,34 @@ function resolveWebhookExecutionUrl(params: { return baseUrl.toString(); } +function coerceWebhookErrorBody(raw: string): unknown { + if (!raw) { + return undefined; + } + try { + return JSON.parse(raw); + } catch { + return { message: raw.slice(0, 200) }; + } +} + +async function throwWebhookResponseError(response: Response): Promise { + const raw = await response.text().catch(() => ""); + const parsed = coerceWebhookErrorBody(raw); + if (response.status === 429) { + throw new RateLimitError(response, { + message: readDiscordMessage(parsed, "Rate limited"), + retry_after: readRetryAfter(parsed, response, 1), + code: readDiscordCode(parsed), + global: + parsed && typeof parsed === "object" && "global" in parsed + ? Boolean((parsed as { global?: unknown }).global) + : false, + }); + } + throw new DiscordError(response, parsed); +} + export async function sendWebhookMessageDiscord( text: string, opts: DiscordWebhookSendOpts, @@ -74,10 +109,7 @@ export async function sendWebhookMessageDiscord( }, ); if (!response.ok) { - const raw = await response.text().catch(() => ""); - throw new Error( - `Discord webhook send failed (${response.status}${raw ? `: ${raw.slice(0, 200)}` : ""})`, - ); + await throwWebhookResponseError(response); } const payload = (await response.json().catch(() => ({}))) as { diff --git a/extensions/discord/src/voice-message.test.ts b/extensions/discord/src/voice-message.test.ts index 23098fcf905..1d222db3061 100644 --- a/extensions/discord/src/voice-message.test.ts +++ b/extensions/discord/src/voice-message.test.ts @@ -1,4 +1,6 @@ import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import type { RequestClient } from "./internal/discord.js"; +import type { VoiceMessageMetadata } from "./voice-message.js"; const runFfprobeMock = vi.hoisted(() => vi.fn<(...args: unknown[]) => Promise>()); const runFfmpegMock = vi.hoisted(() => vi.fn<(...args: unknown[]) => Promise>()); @@ -25,11 +27,21 @@ vi.mock("openclaw/plugin-sdk/media-runtime", async () => { }; }); +vi.mock("openclaw/plugin-sdk/ssrf-runtime", async () => { + return { + fetchWithSsrFGuard: async (params: { url: string; init?: RequestInit }) => ({ + response: await globalThis.fetch(params.url, params.init), + release: async () => {}, + }), + }; +}); + let ensureOggOpus: typeof import("./voice-message.js").ensureOggOpus; +let sendDiscordVoiceMessage: typeof import("./voice-message.js").sendDiscordVoiceMessage; describe("ensureOggOpus", () => { beforeAll(async () => { - ({ ensureOggOpus } = await import("./voice-message.js")); + ({ ensureOggOpus, sendDiscordVoiceMessage } = await import("./voice-message.js")); }); beforeEach(() => { @@ -81,3 +93,142 @@ describe("ensureOggOpus", () => { ); }); }); + +describe("sendDiscordVoiceMessage", () => { + const metadata: VoiceMessageMetadata = { + durationSecs: 1, + waveform: "waveform", + }; + + beforeAll(async () => { + ({ sendDiscordVoiceMessage } = await import("./voice-message.js")); + }); + + beforeEach(() => { + vi.restoreAllMocks(); + }); + + function createRest(post = vi.fn(async () => ({ id: "msg-1", channel_id: "channel-1" }))) { + return { + options: { baseUrl: "https://discord.test/api/v10" }, + post, + } as unknown as RequestClient; + } + + async function retryRateLimits(fn: () => Promise): Promise { + let lastError: unknown; + for (let attempt = 0; attempt < 3; attempt += 1) { + try { + return await fn(); + } catch (err) { + lastError = err; + if (!(err instanceof Error) || err.name !== "RateLimitError") { + throw err; + } + } + } + throw lastError; + } + + it("requests a fresh upload URL when the CDN upload is rate limited", async () => { + const post = vi.fn(async () => ({ id: "msg-1", channel_id: "channel-1" })); + const rest = createRest(post); + let uploadUrlRequests = 0; + const fetchMock = vi.spyOn(globalThis, "fetch").mockImplementation(async (input, init) => { + const url = input instanceof Request ? input.url : String(input); + const method = input instanceof Request ? input.method : (init?.method ?? "GET"); + if (method === "POST" && url.endsWith("/channels/channel-1/attachments")) { + uploadUrlRequests += 1; + return new Response( + JSON.stringify({ + attachments: [ + { + id: 0, + upload_url: `https://cdn.test/upload-${uploadUrlRequests}`, + upload_filename: `uploaded-${uploadUrlRequests}.ogg`, + }, + ], + }), + { status: 200 }, + ); + } + if (method === "PUT" && url === "https://cdn.test/upload-1") { + return new Response( + JSON.stringify({ message: "Slow down", retry_after: 0, global: false }), + { status: 429 }, + ); + } + if (method === "PUT" && url === "https://cdn.test/upload-2") { + return new Response(null, { status: 200 }); + } + throw new Error(`unexpected fetch ${method} ${url}`); + }); + + await expect( + sendDiscordVoiceMessage( + rest, + "channel-1", + Buffer.from("ogg"), + metadata, + undefined, + retryRateLimits, + false, + "bot-token", + ), + ).resolves.toEqual({ id: "msg-1", channel_id: "channel-1" }); + + expect(uploadUrlRequests).toBe(2); + expect(fetchMock).toHaveBeenCalledTimes(4); + expect(post).toHaveBeenCalledWith("/channels/channel-1/messages", { + body: expect.objectContaining({ + attachments: [ + expect.objectContaining({ + uploaded_filename: "uploaded-2.ogg", + }), + ], + }), + }); + }); + + it("throws typed CDN upload failures", async () => { + const rest = createRest(); + vi.spyOn(globalThis, "fetch").mockImplementation(async (input, init) => { + const url = input instanceof Request ? input.url : String(input); + const method = input instanceof Request ? input.method : (init?.method ?? "GET"); + if (method === "POST" && url.endsWith("/channels/channel-1/attachments")) { + return new Response( + JSON.stringify({ + attachments: [ + { + id: 0, + upload_url: "https://cdn.test/upload", + upload_filename: "uploaded.ogg", + }, + ], + }), + { status: 200 }, + ); + } + if (method === "PUT" && url === "https://cdn.test/upload") { + return new Response("cdn unavailable", { status: 503 }); + } + throw new Error(`unexpected fetch ${method} ${url}`); + }); + + await expect( + sendDiscordVoiceMessage( + rest, + "channel-1", + Buffer.from("ogg"), + metadata, + undefined, + async (fn) => await fn(), + false, + "bot-token", + ), + ).rejects.toMatchObject({ + name: "DiscordError", + status: 503, + }); + }); +}); diff --git a/extensions/discord/src/voice-message.ts b/extensions/discord/src/voice-message.ts index 0ae40df9df5..03cb24d6119 100644 --- a/extensions/discord/src/voice-message.ts +++ b/extensions/discord/src/voice-message.ts @@ -22,9 +22,11 @@ import { import { MEDIA_FFMPEG_MAX_AUDIO_DURATION_SECS } from "openclaw/plugin-sdk/media-runtime"; import { unlinkIfExists } from "openclaw/plugin-sdk/media-runtime"; import type { RetryRunner } from "openclaw/plugin-sdk/retry-runtime"; +import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime"; import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; -import { RateLimitError, type RequestClient } from "./internal/discord.js"; +import { DiscordError, RateLimitError, type RequestClient } from "./internal/discord.js"; +import { readDiscordMessage, readRetryAfter } from "./internal/rest-errors.js"; const DISCORD_VOICE_MESSAGE_FLAG = 1 << 13; const SUPPRESS_NOTIFICATIONS_FLAG = 1 << 12; @@ -253,6 +255,99 @@ type UploadUrlResponse = { }>; }; +function coerceDiscordErrorBody(raw: string): unknown { + if (!raw) { + return undefined; + } + try { + return JSON.parse(raw); + } catch { + return { message: raw.slice(0, 200) }; + } +} + +async function createVoiceRequestError( + response: Response, + fallbackMessage: string, +): Promise { + const raw = await response.text().catch(() => ""); + const parsed = coerceDiscordErrorBody(raw); + if (response.status === 429) { + throw createRateLimitError(response, { + message: readDiscordMessage(parsed, "You are being rate limited."), + retry_after: readRetryAfter(parsed, response, 1), + global: + parsed && typeof parsed === "object" && "global" in parsed + ? Boolean((parsed as { global?: unknown }).global) + : false, + }); + } + return new DiscordError( + response, + parsed ?? { + message: fallbackMessage, + }, + ); +} + +async function requestVoiceUploadUrl(params: { + rest: RequestClient; + channelId: string; + botToken: string; + filename: string; + fileSize: number; +}): Promise { + const url = `${params.rest.options?.baseUrl ?? "https://discord.com/api"}/channels/${params.channelId}/attachments`; + const uploadUrlInit: RequestInit = { + method: "POST", + headers: { + Authorization: `Bot ${params.botToken}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + files: [{ filename: params.filename, file_size: params.fileSize, id: "0" }], + }), + }; + const { response: res, release } = await fetchWithSsrFGuard({ + url, + init: uploadUrlInit, + auditContext: "discord.voice.upload-url", + }); + try { + if (!res.ok) { + throw await createVoiceRequestError(res, "Upload URL request failed"); + } + return (await res.json()) as UploadUrlResponse; + } finally { + await release(); + } +} + +async function uploadVoiceAttachment(params: { + uploadUrl: string; + audioBuffer: Buffer; +}): Promise { + const { response: uploadResponse, release } = await fetchWithSsrFGuard({ + url: params.uploadUrl, + init: { + method: "PUT", + headers: { + "Content-Type": "audio/ogg", + }, + body: new Uint8Array(params.audioBuffer), + }, + auditContext: "discord.voice.attachment-upload", + }); + + try { + if (!uploadResponse.ok) { + throw await createVoiceRequestError(uploadResponse, "Failed to upload voice message"); + } + } finally { + await release(); + } +} + /** * Send a voice message to Discord * @@ -275,72 +370,32 @@ export async function sendDiscordVoiceMessage( const fileSize = audioBuffer.byteLength; // Step 1: Request upload URL from Discord - // Must use fetch() directly instead of rest.post() because ./internal/discord.js's - // RequestClient auto-converts requests to multipart/form-data when the body - // contains a "files" key. Discord's /attachments endpoint expects JSON, so - // the auto-conversion causes HTTP 400 "Expected Content-Type application/json". + // RequestClient auto-converts "files" bodies to multipart/form-data, but Discord's + // /attachments endpoint expects JSON, so this path uses a guarded raw HTTP call. const botToken = token; if (!botToken) { throw new Error("Discord bot token is required for voice message upload"); } - const uploadUrlResponse = await request(async () => { - const url = `${rest.options?.baseUrl ?? "https://discord.com/api"}/channels/${channelId}/attachments`; - const uploadUrlRequest = new Request(url, { - method: "POST", - headers: { - Authorization: `Bot ${botToken}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ - files: [{ filename, file_size: fileSize, id: "0" }], - }), + const { upload_filename } = await request(async () => { + const uploadUrlResponse = await requestVoiceUploadUrl({ + rest, + channelId, + botToken, + filename, + fileSize, }); - const res = await fetch(uploadUrlRequest); - if (!res.ok) { - if (res.status === 429) { - const retryData = (await res.json().catch(() => ({}))) as { - message?: string; - retry_after?: number; - global?: boolean; - }; - throw createRateLimitError(res, { - message: retryData.message ?? "You are being rate limited.", - retry_after: retryData.retry_after ?? 1, - global: retryData.global ?? false, - }); - } - const errorBody = (await res.json().catch(() => null)) as { - code?: number; - message?: string; - } | null; - const err = new Error(`Upload URL request failed: ${res.status} ${errorBody?.message ?? ""}`); - if (errorBody?.code !== undefined) { - (err as Error & { code: number }).code = errorBody.code; - } - throw err; + + if (!uploadUrlResponse.attachments?.[0]) { + throw new Error("Failed to get upload URL for voice message"); } - return (await res.json()) as UploadUrlResponse; - }, "voice-upload-url"); - if (!uploadUrlResponse.attachments?.[0]) { - throw new Error("Failed to get upload URL for voice message"); - } - - const { upload_url, upload_filename } = uploadUrlResponse.attachments[0]; - - // Step 2: Upload the file to Discord's CDN - // Note: Not wrapped in retry runner - upload URLs are single-use and CDN behavior differs - const uploadResponse = await fetch(upload_url, { - method: "PUT", - headers: { - "Content-Type": "audio/ogg", - }, - body: new Uint8Array(audioBuffer), - }); - - if (!uploadResponse.ok) { - throw new Error(`Failed to upload voice message: ${uploadResponse.status}`); - } + const attachment = uploadUrlResponse.attachments[0]; + await uploadVoiceAttachment({ + uploadUrl: attachment.upload_url, + audioBuffer, + }); + return attachment; + }, "voice-upload"); // Step 3: Send the message with voice message flag and metadata const flags = silent