mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:20:43 +00:00
fix(discord): harden rate limit retries (#75338)
* fix(discord): harden rate limit retries * fix(discord): guard voice upload fetches * fix(discord): avoid stale rate limit requeues
This commit is contained in:
committed by
GitHub
parent
3c4851037b
commit
bd20f8e07e
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
- Agents/commitments: keep inferred follow-ups internal when heartbeat target is none, strip raw source text from stored commitments, disable tools during due-commitment heartbeat turns, bound hidden extraction queue growth, expire stale commitments, and add QA/Docker safety coverage. Thanks @vignesh07.
|
||||
- Plugins/runtime-deps: accept already materialized package-level runtime-deps supersets as converged, so later lazy plugin activation no longer prunes and relaunches `pnpm install` after gateway startup pre-staging, reducing event-loop pressure from repeated runtime-deps repair on packaged installs. Fixes #75283; refs #75297 and #72338. Thanks @brokemac79, @lisandromachado, and @midhunmonachan.
|
||||
- Discord: retry queued REST 429s against learned bucket/global cooldowns and reacquire fresh voice upload URLs after CDN upload rate limits, so outbound sends recover without reusing stale single-use upload URLs. Thanks @discord.
|
||||
- TTS/providers: keep bundled speech-provider compat fallback available when plugins are globally disabled, so cold gateway and CLI startup can still resolve fallback speech providers instead of leaving explicit TTS provider selection with no registered providers. Refs #75265. Thanks @sliekens.
|
||||
- Discord: collapse repeated native slash-command deploy rate-limit startup logs into one non-fatal warning while keeping per-request REST timing in verbose output. Thanks @discord.
|
||||
- Providers/OpenAI Codex: preserve existing wrapped Codex streams during OpenAI attribution so PI OAuth bearer injection reaches ChatGPT/Codex Responses, and strip native Codex-only unsupported payload fields without touching custom compatible endpoints. (#75111) Thanks @keshavbotagent.
|
||||
|
||||
@@ -20,21 +20,36 @@ export function readDiscordMessage(body: unknown, fallback: string): string {
|
||||
return typeof value === "string" && value.trim() ? value : fallback;
|
||||
}
|
||||
|
||||
export function readRetryAfter(body: unknown, response: Response): number {
|
||||
function readRetryAfterHeader(value: string | null, now = Date.now()): number | undefined {
|
||||
if (!value) {
|
||||
return undefined;
|
||||
}
|
||||
const seconds = Number(value);
|
||||
if (Number.isFinite(seconds)) {
|
||||
return seconds;
|
||||
}
|
||||
const retryAt = Date.parse(value);
|
||||
return Number.isFinite(retryAt) ? (retryAt - now) / 1000 : undefined;
|
||||
}
|
||||
|
||||
function coerceRetryAfterSeconds(value: unknown): number | undefined {
|
||||
if (typeof value !== "number" && typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const seconds = typeof value === "number" ? value : Number(value);
|
||||
return Number.isFinite(seconds) && seconds >= 0 ? Math.max(0, seconds) : undefined;
|
||||
}
|
||||
|
||||
export function readRetryAfter(body: unknown, response: Response, fallbackSeconds = 0): number {
|
||||
const bodyValue =
|
||||
body && typeof body === "object" && "retry_after" in body
|
||||
? (body as { retry_after?: unknown }).retry_after
|
||||
: undefined;
|
||||
const headerValue = response.headers.get("Retry-After");
|
||||
const seconds =
|
||||
typeof bodyValue === "number"
|
||||
? bodyValue
|
||||
: typeof bodyValue === "string"
|
||||
? Number(bodyValue)
|
||||
: headerValue
|
||||
? Number(headerValue)
|
||||
: 0;
|
||||
return Number.isFinite(seconds) && seconds > 0 ? seconds : 0;
|
||||
return (
|
||||
coerceRetryAfterSeconds(bodyValue) ??
|
||||
coerceRetryAfterSeconds(readRetryAfterHeader(response.headers.get("Retry-After"))) ??
|
||||
fallbackSeconds
|
||||
);
|
||||
}
|
||||
|
||||
export class DiscordError extends Error {
|
||||
@@ -66,7 +81,7 @@ export class RateLimitError extends DiscordError {
|
||||
) {
|
||||
super(response, body);
|
||||
this.name = "RateLimitError";
|
||||
this.retryAfter = readRetryAfter(body, response);
|
||||
this.retryAfter = readRetryAfter(body, response, 1);
|
||||
this.scope = body.global ? "global" : response.headers.get("X-RateLimit-Scope");
|
||||
this.bucket = response.headers.get("X-RateLimit-Bucket");
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { readRetryAfter } from "./rest-errors.js";
|
||||
import { RateLimitError, readRetryAfter } from "./rest-errors.js";
|
||||
import { createBucketKey, createRouteKey, readHeaderNumber, readResetAt } from "./rest-routes.js";
|
||||
|
||||
export type RequestQuery = Record<string, string | number | boolean>;
|
||||
@@ -6,8 +6,10 @@ export type ScheduledRequest<TData> = {
|
||||
method: string;
|
||||
path: string;
|
||||
data?: TData;
|
||||
generation: number;
|
||||
query?: RequestQuery;
|
||||
routeKey: string;
|
||||
retryCount: number;
|
||||
resolve: (value?: unknown) => void;
|
||||
reject: (reason?: unknown) => void;
|
||||
};
|
||||
@@ -26,6 +28,7 @@ type BucketState<TData> = {
|
||||
|
||||
export type RestSchedulerOptions = {
|
||||
maxConcurrency: number;
|
||||
maxRateLimitRetries: number;
|
||||
maxQueueSize: number;
|
||||
};
|
||||
|
||||
@@ -37,6 +40,7 @@ export class RestScheduler<TData> {
|
||||
private drainTimer: NodeJS.Timeout | undefined;
|
||||
private globalRateLimitUntil = 0;
|
||||
private invalidRequestTimestamps: Array<{ at: number; status: number }> = [];
|
||||
private queueGeneration = 0;
|
||||
private queuedRequests = 0;
|
||||
private routeBuckets = new Map<string, string>();
|
||||
|
||||
@@ -58,7 +62,14 @@ export class RestScheduler<TData> {
|
||||
const bucket = this.getBucket(this.routeBuckets.get(routeKey) ?? routeKey);
|
||||
return new Promise((resolve, reject) => {
|
||||
this.queuedRequests += 1;
|
||||
bucket.pending.push({ ...params, routeKey, resolve, reject });
|
||||
bucket.pending.push({
|
||||
...params,
|
||||
generation: this.queueGeneration,
|
||||
routeKey,
|
||||
retryCount: 0,
|
||||
resolve,
|
||||
reject,
|
||||
});
|
||||
this.drainQueues();
|
||||
});
|
||||
}
|
||||
@@ -69,6 +80,7 @@ export class RestScheduler<TData> {
|
||||
}
|
||||
|
||||
clearQueue(): void {
|
||||
this.queueGeneration += 1;
|
||||
if (this.drainTimer) {
|
||||
clearTimeout(this.drainTimer);
|
||||
this.drainTimer = undefined;
|
||||
@@ -77,6 +89,7 @@ export class RestScheduler<TData> {
|
||||
}
|
||||
|
||||
abortPending(): void {
|
||||
this.queueGeneration += 1;
|
||||
this.rejectPending(new DOMException("Aborted", "AbortError"));
|
||||
}
|
||||
|
||||
@@ -119,6 +132,10 @@ export class RestScheduler<TData> {
|
||||
return Math.max(1, Math.floor(this.options.maxConcurrency));
|
||||
}
|
||||
|
||||
private get maxRateLimitRetries(): number {
|
||||
return Math.max(0, Math.floor(this.options.maxRateLimitRetries));
|
||||
}
|
||||
|
||||
private getBucket(key: string): BucketState<TData> {
|
||||
const existing = this.buckets.get(key);
|
||||
if (existing) {
|
||||
@@ -220,7 +237,7 @@ export class RestScheduler<TData> {
|
||||
return;
|
||||
}
|
||||
bucket.rateLimitHits += 1;
|
||||
const retryAfterMs = Math.max(0, readRetryAfter(parsed, response) * 1000);
|
||||
const retryAfterMs = Math.max(0, readRetryAfter(parsed, response, 1) * 1000);
|
||||
const retryAt = Date.now() + retryAfterMs;
|
||||
if (response.headers.get("X-RateLimit-Global") === "true" || isGlobalRateLimit(parsed)) {
|
||||
this.globalRateLimitUntil = Math.max(this.globalRateLimitUntil, retryAt);
|
||||
@@ -337,14 +354,21 @@ export class RestScheduler<TData> {
|
||||
queued: ScheduledRequest<TData>,
|
||||
bucket: BucketState<TData>,
|
||||
): Promise<void> {
|
||||
let requeued = false;
|
||||
try {
|
||||
queued.resolve(await this.executor(queued));
|
||||
} catch (error) {
|
||||
if (error instanceof RateLimitError && this.requeueRateLimitedRequest(queued)) {
|
||||
requeued = true;
|
||||
return;
|
||||
}
|
||||
queued.reject(error);
|
||||
} finally {
|
||||
bucket.active = Math.max(0, bucket.active - 1);
|
||||
this.activeWorkers = Math.max(0, this.activeWorkers - 1);
|
||||
this.queuedRequests = Math.max(0, this.queuedRequests - 1);
|
||||
if (!requeued) {
|
||||
this.queuedRequests = Math.max(0, this.queuedRequests - 1);
|
||||
}
|
||||
if (bucket.active === 0 && bucket.pending.length === 0) {
|
||||
for (const routeKey of bucket.routeKeys) {
|
||||
if (this.routeBuckets.get(routeKey) === routeKey) {
|
||||
@@ -356,6 +380,21 @@ export class RestScheduler<TData> {
|
||||
}
|
||||
}
|
||||
|
||||
private requeueRateLimitedRequest(queued: ScheduledRequest<TData>): boolean {
|
||||
if (
|
||||
queued.generation !== this.queueGeneration ||
|
||||
queued.retryCount >= this.maxRateLimitRetries
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
const bucketKey = this.routeBuckets.get(queued.routeKey) ?? queued.routeKey;
|
||||
this.getBucket(bucketKey).pending.push({
|
||||
...queued,
|
||||
retryCount: queued.retryCount + 1,
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
private rejectPending(error: Error | DOMException): void {
|
||||
for (const bucket of this.buckets.values()) {
|
||||
for (const queued of bucket.pending.splice(0)) {
|
||||
|
||||
@@ -153,6 +153,154 @@ describe("RequestClient", () => {
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("retries queued rate limit responses after the learned reset", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(0);
|
||||
const responses = [
|
||||
Promise.resolve(
|
||||
createJsonResponse(
|
||||
{ message: "Rate limited", retry_after: 0.1, global: false },
|
||||
{
|
||||
status: 429,
|
||||
headers: {
|
||||
"X-RateLimit-Bucket": "channel-messages",
|
||||
"X-RateLimit-Limit": "1",
|
||||
"X-RateLimit-Remaining": "0",
|
||||
},
|
||||
},
|
||||
),
|
||||
),
|
||||
Promise.resolve(
|
||||
createJsonResponse(
|
||||
{ id: "retried" },
|
||||
{
|
||||
headers: {
|
||||
"X-RateLimit-Bucket": "channel-messages",
|
||||
"X-RateLimit-Limit": "1",
|
||||
"X-RateLimit-Remaining": "1",
|
||||
},
|
||||
},
|
||||
),
|
||||
),
|
||||
];
|
||||
const fetchSpy = vi.fn(async () => {
|
||||
const response = responses.shift();
|
||||
if (!response) {
|
||||
throw new Error("unexpected request");
|
||||
}
|
||||
return await response;
|
||||
});
|
||||
const client = new RequestClient("test-token", { fetch: fetchSpy });
|
||||
|
||||
const request = client.get("/channels/c1/messages");
|
||||
await Promise.resolve();
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
expect(client.queueSize).toBe(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(99);
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
await expect(request).resolves.toEqual({ id: "retried" });
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(2);
|
||||
expect(client.queueSize).toBe(0);
|
||||
expect(client.getSchedulerMetrics().buckets).toEqual([]);
|
||||
});
|
||||
|
||||
it("honors maxRateLimitRetries for queued requests", async () => {
|
||||
const fetchSpy = vi.fn(async () =>
|
||||
createJsonResponse(
|
||||
{ message: "Rate limited", retry_after: 0.1, global: false },
|
||||
{
|
||||
status: 429,
|
||||
headers: { "X-RateLimit-Bucket": "channel-messages" },
|
||||
},
|
||||
),
|
||||
);
|
||||
const client = new RequestClient("test-token", {
|
||||
fetch: fetchSpy,
|
||||
scheduler: { maxRateLimitRetries: 0 },
|
||||
});
|
||||
|
||||
await expect(client.get("/channels/c1/messages")).rejects.toMatchObject({
|
||||
name: "RateLimitError",
|
||||
retryAfter: 0.1,
|
||||
});
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
expect(client.queueSize).toBe(0);
|
||||
});
|
||||
|
||||
it("does not requeue an active rate limit after the queue is cleared", async () => {
|
||||
const response = createDeferred<Response>();
|
||||
const fetchSpy = vi.fn(async () => {
|
||||
if (fetchSpy.mock.calls.length > 1) {
|
||||
throw new Error("unexpected retry after clearQueue");
|
||||
}
|
||||
return await response.promise;
|
||||
});
|
||||
const client = new RequestClient("test-token", { fetch: fetchSpy });
|
||||
|
||||
const request = client.get("/channels/c1/messages");
|
||||
await vi.waitFor(() => expect(fetchSpy).toHaveBeenCalledTimes(1));
|
||||
expect(client.queueSize).toBe(1);
|
||||
|
||||
client.clearQueue();
|
||||
expect(client.queueSize).toBe(1);
|
||||
|
||||
response.resolve(
|
||||
createJsonResponse(
|
||||
{ message: "Rate limited", retry_after: 0, global: false },
|
||||
{
|
||||
status: 429,
|
||||
headers: { "X-RateLimit-Bucket": "channel-messages" },
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
await expect(request).rejects.toMatchObject({
|
||||
name: "RateLimitError",
|
||||
retryAfter: 0,
|
||||
});
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
expect(client.queueSize).toBe(0);
|
||||
});
|
||||
|
||||
it("retries queued global rate limits after Retry-After", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(0);
|
||||
const responses = [
|
||||
Promise.resolve(
|
||||
createJsonResponse(
|
||||
{ message: "Rate limited", retry_after: 0.1, global: true },
|
||||
{
|
||||
status: 429,
|
||||
headers: { "X-RateLimit-Global": "true" },
|
||||
},
|
||||
),
|
||||
),
|
||||
Promise.resolve(createJsonResponse({ id: "after-global" })),
|
||||
];
|
||||
const fetchSpy = vi.fn(async () => {
|
||||
const response = responses.shift();
|
||||
if (!response) {
|
||||
throw new Error("unexpected request");
|
||||
}
|
||||
return await response;
|
||||
});
|
||||
const client = new RequestClient("test-token", { fetch: fetchSpy });
|
||||
|
||||
const request = client.get("/channels/c1/messages");
|
||||
await Promise.resolve();
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(99);
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
await expect(request).resolves.toEqual({ id: "after-global" });
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("preserves Discord error codes on rate limit errors", async () => {
|
||||
const client = new RequestClient("test-token", {
|
||||
queueRequests: false,
|
||||
@@ -175,6 +323,43 @@ describe("RequestClient", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("parses HTTP-date Retry-After headers on rate limit errors", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date("2026-05-01T12:00:00.000Z"));
|
||||
const client = new RequestClient("test-token", {
|
||||
queueRequests: false,
|
||||
fetch: async () =>
|
||||
new Response(JSON.stringify({ message: "Slow down", global: false }), {
|
||||
status: 429,
|
||||
headers: { "Retry-After": "Fri, 01 May 2026 12:00:05 GMT" },
|
||||
}),
|
||||
});
|
||||
|
||||
await expect(client.get("/channels/c1/messages")).rejects.toMatchObject({
|
||||
name: "RateLimitError",
|
||||
retryAfter: 5,
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to Retry-After when the rate limit body value is malformed", async () => {
|
||||
const client = new RequestClient("test-token", {
|
||||
queueRequests: false,
|
||||
fetch: async () =>
|
||||
new Response(
|
||||
JSON.stringify({ message: "Slow down", retry_after: "not-a-number", global: false }),
|
||||
{
|
||||
status: 429,
|
||||
headers: { "Retry-After": "7" },
|
||||
},
|
||||
),
|
||||
});
|
||||
|
||||
await expect(client.get("/channels/c1/messages")).rejects.toMatchObject({
|
||||
name: "RateLimitError",
|
||||
retryAfter: 7,
|
||||
});
|
||||
});
|
||||
|
||||
it("tracks invalid requests and exposes bucket scheduler metrics", async () => {
|
||||
const client = new RequestClient("test-token", {
|
||||
queueRequests: false,
|
||||
|
||||
@@ -88,6 +88,7 @@ export class RequestClient {
|
||||
this.scheduler = new RestScheduler<RequestData>(
|
||||
{
|
||||
maxConcurrency: this.options.scheduler?.maxConcurrency ?? DEFAULT_MAX_CONCURRENT_WORKERS,
|
||||
maxRateLimitRetries: this.options.scheduler?.maxRateLimitRetries ?? 3,
|
||||
maxQueueSize: this.options.maxQueueSize ?? defaultOptions.maxQueueSize,
|
||||
},
|
||||
async (request) =>
|
||||
@@ -167,7 +168,7 @@ export class RequestClient {
|
||||
const rateLimitBody = isDiscordRateLimitBody(parsed) ? parsed : undefined;
|
||||
throw new RateLimitError(response, {
|
||||
message: readDiscordMessage(rateLimitBody, "Rate limited"),
|
||||
retry_after: readRetryAfter(rateLimitBody, response),
|
||||
retry_after: readRetryAfter(rateLimitBody, response, 1),
|
||||
code: readDiscordCode(rateLimitBody),
|
||||
global: Boolean(rateLimitBody?.global),
|
||||
});
|
||||
|
||||
@@ -129,4 +129,63 @@ describe("sendWebhookMessageDiscord proxy support", () => {
|
||||
expect(globalFetchMock).toHaveBeenCalled();
|
||||
globalFetchMock.mockRestore();
|
||||
});
|
||||
|
||||
it("throws typed rate limit errors for webhook 429 responses", async () => {
|
||||
const globalFetchMock = vi.spyOn(globalThis, "fetch").mockResolvedValue(
|
||||
new Response(JSON.stringify({ message: "Slow down", retry_after: 0.25, global: false }), {
|
||||
status: 429,
|
||||
}),
|
||||
);
|
||||
|
||||
const cfg = {
|
||||
channels: {
|
||||
discord: {
|
||||
token: "Bot test-token",
|
||||
},
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
await expect(
|
||||
sendWebhookMessageDiscord("hello", {
|
||||
cfg,
|
||||
accountId: "default",
|
||||
webhookId: "123",
|
||||
webhookToken: "abc",
|
||||
wait: true,
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
name: "RateLimitError",
|
||||
status: 429,
|
||||
retryAfter: 0.25,
|
||||
});
|
||||
globalFetchMock.mockRestore();
|
||||
});
|
||||
|
||||
it("throws typed status errors for webhook server failures", async () => {
|
||||
const globalFetchMock = vi
|
||||
.spyOn(globalThis, "fetch")
|
||||
.mockResolvedValue(new Response("upstream unavailable", { status: 503 }));
|
||||
|
||||
const cfg = {
|
||||
channels: {
|
||||
discord: {
|
||||
token: "Bot test-token",
|
||||
},
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
|
||||
await expect(
|
||||
sendWebhookMessageDiscord("hello", {
|
||||
cfg,
|
||||
accountId: "default",
|
||||
webhookId: "123",
|
||||
webhookToken: "abc",
|
||||
wait: true,
|
||||
}),
|
||||
).rejects.toMatchObject({
|
||||
name: "DiscordError",
|
||||
status: 503,
|
||||
});
|
||||
globalFetchMock.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,6 +2,13 @@ import { recordChannelActivity } from "openclaw/plugin-sdk/channel-activity-runt
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { resolveDiscordClientAccountContext } from "./client.js";
|
||||
import {
|
||||
DiscordError,
|
||||
RateLimitError,
|
||||
readDiscordCode,
|
||||
readDiscordMessage,
|
||||
readRetryAfter,
|
||||
} from "./internal/rest-errors.js";
|
||||
import { rewriteDiscordKnownMentions } from "./mentions.js";
|
||||
import type { DiscordSendResult } from "./send.types.js";
|
||||
|
||||
@@ -33,6 +40,34 @@ function resolveWebhookExecutionUrl(params: {
|
||||
return baseUrl.toString();
|
||||
}
|
||||
|
||||
function coerceWebhookErrorBody(raw: string): unknown {
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return { message: raw.slice(0, 200) };
|
||||
}
|
||||
}
|
||||
|
||||
async function throwWebhookResponseError(response: Response): Promise<never> {
|
||||
const raw = await response.text().catch(() => "");
|
||||
const parsed = coerceWebhookErrorBody(raw);
|
||||
if (response.status === 429) {
|
||||
throw new RateLimitError(response, {
|
||||
message: readDiscordMessage(parsed, "Rate limited"),
|
||||
retry_after: readRetryAfter(parsed, response, 1),
|
||||
code: readDiscordCode(parsed),
|
||||
global:
|
||||
parsed && typeof parsed === "object" && "global" in parsed
|
||||
? Boolean((parsed as { global?: unknown }).global)
|
||||
: false,
|
||||
});
|
||||
}
|
||||
throw new DiscordError(response, parsed);
|
||||
}
|
||||
|
||||
export async function sendWebhookMessageDiscord(
|
||||
text: string,
|
||||
opts: DiscordWebhookSendOpts,
|
||||
@@ -74,10 +109,7 @@ export async function sendWebhookMessageDiscord(
|
||||
},
|
||||
);
|
||||
if (!response.ok) {
|
||||
const raw = await response.text().catch(() => "");
|
||||
throw new Error(
|
||||
`Discord webhook send failed (${response.status}${raw ? `: ${raw.slice(0, 200)}` : ""})`,
|
||||
);
|
||||
await throwWebhookResponseError(response);
|
||||
}
|
||||
|
||||
const payload = (await response.json().catch(() => ({}))) as {
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { RequestClient } from "./internal/discord.js";
|
||||
import type { VoiceMessageMetadata } from "./voice-message.js";
|
||||
|
||||
const runFfprobeMock = vi.hoisted(() => vi.fn<(...args: unknown[]) => Promise<string>>());
|
||||
const runFfmpegMock = vi.hoisted(() => vi.fn<(...args: unknown[]) => Promise<void>>());
|
||||
@@ -25,11 +27,21 @@ vi.mock("openclaw/plugin-sdk/media-runtime", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/ssrf-runtime", async () => {
|
||||
return {
|
||||
fetchWithSsrFGuard: async (params: { url: string; init?: RequestInit }) => ({
|
||||
response: await globalThis.fetch(params.url, params.init),
|
||||
release: async () => {},
|
||||
}),
|
||||
};
|
||||
});
|
||||
|
||||
let ensureOggOpus: typeof import("./voice-message.js").ensureOggOpus;
|
||||
let sendDiscordVoiceMessage: typeof import("./voice-message.js").sendDiscordVoiceMessage;
|
||||
|
||||
describe("ensureOggOpus", () => {
|
||||
beforeAll(async () => {
|
||||
({ ensureOggOpus } = await import("./voice-message.js"));
|
||||
({ ensureOggOpus, sendDiscordVoiceMessage } = await import("./voice-message.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
@@ -81,3 +93,142 @@ describe("ensureOggOpus", () => {
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("sendDiscordVoiceMessage", () => {
|
||||
const metadata: VoiceMessageMetadata = {
|
||||
durationSecs: 1,
|
||||
waveform: "waveform",
|
||||
};
|
||||
|
||||
beforeAll(async () => {
|
||||
({ sendDiscordVoiceMessage } = await import("./voice-message.js"));
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
function createRest(post = vi.fn(async () => ({ id: "msg-1", channel_id: "channel-1" }))) {
|
||||
return {
|
||||
options: { baseUrl: "https://discord.test/api/v10" },
|
||||
post,
|
||||
} as unknown as RequestClient;
|
||||
}
|
||||
|
||||
async function retryRateLimits<T>(fn: () => Promise<T>): Promise<T> {
|
||||
let lastError: unknown;
|
||||
for (let attempt = 0; attempt < 3; attempt += 1) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
lastError = err;
|
||||
if (!(err instanceof Error) || err.name !== "RateLimitError") {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
it("requests a fresh upload URL when the CDN upload is rate limited", async () => {
|
||||
const post = vi.fn(async () => ({ id: "msg-1", channel_id: "channel-1" }));
|
||||
const rest = createRest(post);
|
||||
let uploadUrlRequests = 0;
|
||||
const fetchMock = vi.spyOn(globalThis, "fetch").mockImplementation(async (input, init) => {
|
||||
const url = input instanceof Request ? input.url : String(input);
|
||||
const method = input instanceof Request ? input.method : (init?.method ?? "GET");
|
||||
if (method === "POST" && url.endsWith("/channels/channel-1/attachments")) {
|
||||
uploadUrlRequests += 1;
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
attachments: [
|
||||
{
|
||||
id: 0,
|
||||
upload_url: `https://cdn.test/upload-${uploadUrlRequests}`,
|
||||
upload_filename: `uploaded-${uploadUrlRequests}.ogg`,
|
||||
},
|
||||
],
|
||||
}),
|
||||
{ status: 200 },
|
||||
);
|
||||
}
|
||||
if (method === "PUT" && url === "https://cdn.test/upload-1") {
|
||||
return new Response(
|
||||
JSON.stringify({ message: "Slow down", retry_after: 0, global: false }),
|
||||
{ status: 429 },
|
||||
);
|
||||
}
|
||||
if (method === "PUT" && url === "https://cdn.test/upload-2") {
|
||||
return new Response(null, { status: 200 });
|
||||
}
|
||||
throw new Error(`unexpected fetch ${method} ${url}`);
|
||||
});
|
||||
|
||||
await expect(
|
||||
sendDiscordVoiceMessage(
|
||||
rest,
|
||||
"channel-1",
|
||||
Buffer.from("ogg"),
|
||||
metadata,
|
||||
undefined,
|
||||
retryRateLimits,
|
||||
false,
|
||||
"bot-token",
|
||||
),
|
||||
).resolves.toEqual({ id: "msg-1", channel_id: "channel-1" });
|
||||
|
||||
expect(uploadUrlRequests).toBe(2);
|
||||
expect(fetchMock).toHaveBeenCalledTimes(4);
|
||||
expect(post).toHaveBeenCalledWith("/channels/channel-1/messages", {
|
||||
body: expect.objectContaining({
|
||||
attachments: [
|
||||
expect.objectContaining({
|
||||
uploaded_filename: "uploaded-2.ogg",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
});
|
||||
});
|
||||
|
||||
it("throws typed CDN upload failures", async () => {
|
||||
const rest = createRest();
|
||||
vi.spyOn(globalThis, "fetch").mockImplementation(async (input, init) => {
|
||||
const url = input instanceof Request ? input.url : String(input);
|
||||
const method = input instanceof Request ? input.method : (init?.method ?? "GET");
|
||||
if (method === "POST" && url.endsWith("/channels/channel-1/attachments")) {
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
attachments: [
|
||||
{
|
||||
id: 0,
|
||||
upload_url: "https://cdn.test/upload",
|
||||
upload_filename: "uploaded.ogg",
|
||||
},
|
||||
],
|
||||
}),
|
||||
{ status: 200 },
|
||||
);
|
||||
}
|
||||
if (method === "PUT" && url === "https://cdn.test/upload") {
|
||||
return new Response("cdn unavailable", { status: 503 });
|
||||
}
|
||||
throw new Error(`unexpected fetch ${method} ${url}`);
|
||||
});
|
||||
|
||||
await expect(
|
||||
sendDiscordVoiceMessage(
|
||||
rest,
|
||||
"channel-1",
|
||||
Buffer.from("ogg"),
|
||||
metadata,
|
||||
undefined,
|
||||
async (fn) => await fn(),
|
||||
false,
|
||||
"bot-token",
|
||||
),
|
||||
).rejects.toMatchObject({
|
||||
name: "DiscordError",
|
||||
status: 503,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -22,9 +22,11 @@ import {
|
||||
import { MEDIA_FFMPEG_MAX_AUDIO_DURATION_SECS } from "openclaw/plugin-sdk/media-runtime";
|
||||
import { unlinkIfExists } from "openclaw/plugin-sdk/media-runtime";
|
||||
import type { RetryRunner } from "openclaw/plugin-sdk/retry-runtime";
|
||||
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk/ssrf-runtime";
|
||||
import { resolvePreferredOpenClawTmpDir } from "openclaw/plugin-sdk/temp-path";
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { RateLimitError, type RequestClient } from "./internal/discord.js";
|
||||
import { DiscordError, RateLimitError, type RequestClient } from "./internal/discord.js";
|
||||
import { readDiscordMessage, readRetryAfter } from "./internal/rest-errors.js";
|
||||
|
||||
const DISCORD_VOICE_MESSAGE_FLAG = 1 << 13;
|
||||
const SUPPRESS_NOTIFICATIONS_FLAG = 1 << 12;
|
||||
@@ -253,6 +255,99 @@ type UploadUrlResponse = {
|
||||
}>;
|
||||
};
|
||||
|
||||
function coerceDiscordErrorBody(raw: string): unknown {
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
try {
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return { message: raw.slice(0, 200) };
|
||||
}
|
||||
}
|
||||
|
||||
async function createVoiceRequestError(
|
||||
response: Response,
|
||||
fallbackMessage: string,
|
||||
): Promise<Error> {
|
||||
const raw = await response.text().catch(() => "");
|
||||
const parsed = coerceDiscordErrorBody(raw);
|
||||
if (response.status === 429) {
|
||||
throw createRateLimitError(response, {
|
||||
message: readDiscordMessage(parsed, "You are being rate limited."),
|
||||
retry_after: readRetryAfter(parsed, response, 1),
|
||||
global:
|
||||
parsed && typeof parsed === "object" && "global" in parsed
|
||||
? Boolean((parsed as { global?: unknown }).global)
|
||||
: false,
|
||||
});
|
||||
}
|
||||
return new DiscordError(
|
||||
response,
|
||||
parsed ?? {
|
||||
message: fallbackMessage,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async function requestVoiceUploadUrl(params: {
|
||||
rest: RequestClient;
|
||||
channelId: string;
|
||||
botToken: string;
|
||||
filename: string;
|
||||
fileSize: number;
|
||||
}): Promise<UploadUrlResponse> {
|
||||
const url = `${params.rest.options?.baseUrl ?? "https://discord.com/api"}/channels/${params.channelId}/attachments`;
|
||||
const uploadUrlInit: RequestInit = {
|
||||
method: "POST",
|
||||
headers: {
|
||||
Authorization: `Bot ${params.botToken}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
files: [{ filename: params.filename, file_size: params.fileSize, id: "0" }],
|
||||
}),
|
||||
};
|
||||
const { response: res, release } = await fetchWithSsrFGuard({
|
||||
url,
|
||||
init: uploadUrlInit,
|
||||
auditContext: "discord.voice.upload-url",
|
||||
});
|
||||
try {
|
||||
if (!res.ok) {
|
||||
throw await createVoiceRequestError(res, "Upload URL request failed");
|
||||
}
|
||||
return (await res.json()) as UploadUrlResponse;
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
async function uploadVoiceAttachment(params: {
|
||||
uploadUrl: string;
|
||||
audioBuffer: Buffer;
|
||||
}): Promise<void> {
|
||||
const { response: uploadResponse, release } = await fetchWithSsrFGuard({
|
||||
url: params.uploadUrl,
|
||||
init: {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "audio/ogg",
|
||||
},
|
||||
body: new Uint8Array(params.audioBuffer),
|
||||
},
|
||||
auditContext: "discord.voice.attachment-upload",
|
||||
});
|
||||
|
||||
try {
|
||||
if (!uploadResponse.ok) {
|
||||
throw await createVoiceRequestError(uploadResponse, "Failed to upload voice message");
|
||||
}
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a voice message to Discord
|
||||
*
|
||||
@@ -275,72 +370,32 @@ export async function sendDiscordVoiceMessage(
|
||||
const fileSize = audioBuffer.byteLength;
|
||||
|
||||
// Step 1: Request upload URL from Discord
|
||||
// Must use fetch() directly instead of rest.post() because ./internal/discord.js's
|
||||
// RequestClient auto-converts requests to multipart/form-data when the body
|
||||
// contains a "files" key. Discord's /attachments endpoint expects JSON, so
|
||||
// the auto-conversion causes HTTP 400 "Expected Content-Type application/json".
|
||||
// RequestClient auto-converts "files" bodies to multipart/form-data, but Discord's
|
||||
// /attachments endpoint expects JSON, so this path uses a guarded raw HTTP call.
|
||||
const botToken = token;
|
||||
if (!botToken) {
|
||||
throw new Error("Discord bot token is required for voice message upload");
|
||||
}
|
||||
const uploadUrlResponse = await request(async () => {
|
||||
const url = `${rest.options?.baseUrl ?? "https://discord.com/api"}/channels/${channelId}/attachments`;
|
||||
const uploadUrlRequest = new Request(url, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
Authorization: `Bot ${botToken}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
files: [{ filename, file_size: fileSize, id: "0" }],
|
||||
}),
|
||||
const { upload_filename } = await request(async () => {
|
||||
const uploadUrlResponse = await requestVoiceUploadUrl({
|
||||
rest,
|
||||
channelId,
|
||||
botToken,
|
||||
filename,
|
||||
fileSize,
|
||||
});
|
||||
const res = await fetch(uploadUrlRequest);
|
||||
if (!res.ok) {
|
||||
if (res.status === 429) {
|
||||
const retryData = (await res.json().catch(() => ({}))) as {
|
||||
message?: string;
|
||||
retry_after?: number;
|
||||
global?: boolean;
|
||||
};
|
||||
throw createRateLimitError(res, {
|
||||
message: retryData.message ?? "You are being rate limited.",
|
||||
retry_after: retryData.retry_after ?? 1,
|
||||
global: retryData.global ?? false,
|
||||
});
|
||||
}
|
||||
const errorBody = (await res.json().catch(() => null)) as {
|
||||
code?: number;
|
||||
message?: string;
|
||||
} | null;
|
||||
const err = new Error(`Upload URL request failed: ${res.status} ${errorBody?.message ?? ""}`);
|
||||
if (errorBody?.code !== undefined) {
|
||||
(err as Error & { code: number }).code = errorBody.code;
|
||||
}
|
||||
throw err;
|
||||
|
||||
if (!uploadUrlResponse.attachments?.[0]) {
|
||||
throw new Error("Failed to get upload URL for voice message");
|
||||
}
|
||||
return (await res.json()) as UploadUrlResponse;
|
||||
}, "voice-upload-url");
|
||||
|
||||
if (!uploadUrlResponse.attachments?.[0]) {
|
||||
throw new Error("Failed to get upload URL for voice message");
|
||||
}
|
||||
|
||||
const { upload_url, upload_filename } = uploadUrlResponse.attachments[0];
|
||||
|
||||
// Step 2: Upload the file to Discord's CDN
|
||||
// Note: Not wrapped in retry runner - upload URLs are single-use and CDN behavior differs
|
||||
const uploadResponse = await fetch(upload_url, {
|
||||
method: "PUT",
|
||||
headers: {
|
||||
"Content-Type": "audio/ogg",
|
||||
},
|
||||
body: new Uint8Array(audioBuffer),
|
||||
});
|
||||
|
||||
if (!uploadResponse.ok) {
|
||||
throw new Error(`Failed to upload voice message: ${uploadResponse.status}`);
|
||||
}
|
||||
const attachment = uploadUrlResponse.attachments[0];
|
||||
await uploadVoiceAttachment({
|
||||
uploadUrl: attachment.upload_url,
|
||||
audioBuffer,
|
||||
});
|
||||
return attachment;
|
||||
}, "voice-upload");
|
||||
|
||||
// Step 3: Send the message with voice message flag and metadata
|
||||
const flags = silent
|
||||
|
||||
Reference in New Issue
Block a user