mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 17:40:44 +00:00
fix: Discord read/search timeout, session-key fallback, and gateway execution mode (#73521)
* fix: Discord read/search timeout, session-key fallback, and gateway execution mode - Add 15s timeout to readMessagesDiscord and searchMessagesDiscord so they fail fast instead of hanging indefinitely (#73431) - Fall back to CommandTargetSessionKey in dispatchReplyFromConfig when SessionKey is empty, so Discord inbound message:received hooks fire reliably (#73431, refs #33038) - Add resolveExecutionMode to Discord channel actions routing read/search through gateway timeout path, matching Telegram's pattern (#73431) * fix: move timeout to fetch layer, drop send.messages wrapper Inject AbortSignal.timeout into the Discord proxy-request-client fetch wrapper so every Discord REST call gets a 15s timeout at the HTTP level. This replaces the Promise.race wrapper in send.messages.ts — cleaner, covers all calls, and actually aborts the TCP connection. * fix: remove unused callerController variable in proxy-request-client test * fix: remove unnecessary mergeAbortSignal helper
This commit is contained in:
@@ -113,6 +113,21 @@ describe("discordMessageActions", () => {
|
|||||||
expect(discovery?.schema).toBeUndefined();
|
expect(discovery?.schema).toBeUndefined();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it.each(["read", "search"])("routes %s actions through gateway execution mode", (action) => {
|
||||||
|
expect(discordMessageActions.resolveExecutionMode?.({ action: action as never })).toBe(
|
||||||
|
"gateway",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it.each(["send", "edit", "delete", "react", "pin", "poll"])(
|
||||||
|
"routes %s actions through local execution mode",
|
||||||
|
(action) => {
|
||||||
|
expect(discordMessageActions.resolveExecutionMode?.({ action: action as never })).toBe(
|
||||||
|
"local",
|
||||||
|
);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
it("extracts send targets for message and thread reply actions", () => {
|
it("extracts send targets for message and thread reply actions", () => {
|
||||||
expect(
|
expect(
|
||||||
discordMessageActions.extractToolSend?.({
|
discordMessageActions.extractToolSend?.({
|
||||||
|
|||||||
@@ -160,6 +160,8 @@ function describeDiscordMessageTool({
|
|||||||
}
|
}
|
||||||
|
|
||||||
export const discordMessageActions: ChannelMessageActionAdapter = {
|
export const discordMessageActions: ChannelMessageActionAdapter = {
|
||||||
|
resolveExecutionMode: ({ action }) =>
|
||||||
|
action === "read" || action === "search" ? "gateway" : "local",
|
||||||
describeMessageTool: describeDiscordMessageTool,
|
describeMessageTool: describeDiscordMessageTool,
|
||||||
extractToolSend: ({ args }) => {
|
extractToolSend: ({ args }) => {
|
||||||
const action = normalizeOptionalString(args.action) ?? "";
|
const action = normalizeOptionalString(args.action) ?? "";
|
||||||
|
|||||||
66
extensions/discord/src/proxy-request-client.test.ts
Normal file
66
extensions/discord/src/proxy-request-client.test.ts
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
import { describe, expect, it, vi } from "vitest";
|
||||||
|
import { createDiscordRequestClient, DISCORD_REST_TIMEOUT_MS } from "./proxy-request-client.js";
|
||||||
|
|
||||||
|
describe("createDiscordRequestClient", () => {
|
||||||
|
it("injects an abort timeout signal into fetch calls", async () => {
|
||||||
|
const fetchSpy = vi.fn(async (_input: string | URL | Request, init?: RequestInit) => {
|
||||||
|
expect(init?.signal).toBeDefined();
|
||||||
|
expect(init!.signal!.aborted).toBe(false);
|
||||||
|
return new Response(JSON.stringify([]), { status: 200 });
|
||||||
|
});
|
||||||
|
|
||||||
|
const client = createDiscordRequestClient("Bot test-token", {
|
||||||
|
fetch: fetchSpy as never,
|
||||||
|
queueRequests: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await client.get("/channels/123/messages");
|
||||||
|
expect(fetchSpy).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it(
|
||||||
|
"aborts hanging requests after the timeout",
|
||||||
|
async () => {
|
||||||
|
const fetchSpy = vi.fn(
|
||||||
|
(_input: string | URL | Request, init?: RequestInit) =>
|
||||||
|
new Promise<Response>((_resolve, reject) => {
|
||||||
|
init?.signal?.addEventListener("abort", () => {
|
||||||
|
reject(new DOMException("The operation was aborted.", "AbortError"));
|
||||||
|
});
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const client = createDiscordRequestClient("Bot test-token", {
|
||||||
|
fetch: fetchSpy as never,
|
||||||
|
queueRequests: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await expect(client.get("/channels/123/messages")).rejects.toThrow();
|
||||||
|
},
|
||||||
|
DISCORD_REST_TIMEOUT_MS + 5_000,
|
||||||
|
);
|
||||||
|
|
||||||
|
it("always injects a timeout signal even without a caller signal", async () => {
|
||||||
|
let receivedSignal: AbortSignal | undefined;
|
||||||
|
|
||||||
|
const fetchSpy = vi.fn(async (_input: string | URL | Request, init?: RequestInit) => {
|
||||||
|
receivedSignal = init?.signal ?? undefined;
|
||||||
|
return new Response(JSON.stringify({}), { status: 200 });
|
||||||
|
});
|
||||||
|
|
||||||
|
const client = createDiscordRequestClient("Bot test-token", {
|
||||||
|
fetch: fetchSpy as never,
|
||||||
|
queueRequests: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
await client.get("/channels/123/messages");
|
||||||
|
|
||||||
|
expect(receivedSignal).toBeDefined();
|
||||||
|
expect(receivedSignal!.aborted).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("exports a reasonable timeout constant", () => {
|
||||||
|
expect(DISCORD_REST_TIMEOUT_MS).toBeGreaterThanOrEqual(5_000);
|
||||||
|
expect(DISCORD_REST_TIMEOUT_MS).toBeLessThanOrEqual(30_000);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -3,6 +3,8 @@ import { FormData as UndiciFormData } from "undici";
|
|||||||
|
|
||||||
export type ProxyRequestClientOptions = RequestClientOptions;
|
export type ProxyRequestClientOptions = RequestClientOptions;
|
||||||
|
|
||||||
|
export const DISCORD_REST_TIMEOUT_MS = 15_000;
|
||||||
|
|
||||||
function toUndiciFormData(body: FormData): UndiciFormData {
|
function toUndiciFormData(body: FormData): UndiciFormData {
|
||||||
const converted = new UndiciFormData();
|
const converted = new UndiciFormData();
|
||||||
for (const [key, value] of body.entries()) {
|
for (const [key, value] of body.entries()) {
|
||||||
@@ -22,15 +24,17 @@ function toUndiciFormData(body: FormData): UndiciFormData {
|
|||||||
|
|
||||||
function wrapDiscordFetch(fetchImpl: NonNullable<RequestClientOptions["fetch"]>) {
|
function wrapDiscordFetch(fetchImpl: NonNullable<RequestClientOptions["fetch"]>) {
|
||||||
return (input: string | URL | Request, init?: RequestInit): Promise<Response> => {
|
return (input: string | URL | Request, init?: RequestInit): Promise<Response> => {
|
||||||
|
const signal = AbortSignal.timeout(DISCORD_REST_TIMEOUT_MS);
|
||||||
if (init?.body instanceof FormData) {
|
if (init?.body instanceof FormData) {
|
||||||
// Carbon builds global FormData; undici-backed proxy fetch needs undici's
|
// Carbon builds global FormData; undici-backed proxy fetch needs undici's
|
||||||
// FormData class to preserve multipart boundaries.
|
// FormData class to preserve multipart boundaries.
|
||||||
return fetchImpl(input, {
|
return fetchImpl(input, {
|
||||||
...init,
|
...init,
|
||||||
|
signal,
|
||||||
body: toUndiciFormData(init.body) as unknown as BodyInit,
|
body: toUndiciFormData(init.body) as unknown as BodyInit,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return fetchImpl(input, init);
|
return fetchImpl(input, { ...init, signal });
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
53
extensions/discord/src/send.messages.test.ts
Normal file
53
extensions/discord/src/send.messages.test.ts
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import { describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
const restMock = {
|
||||||
|
get: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
vi.mock("./send.shared.js", () => ({
|
||||||
|
resolveDiscordRest: () => restMock,
|
||||||
|
}));
|
||||||
|
|
||||||
|
const { readMessagesDiscord, searchMessagesDiscord } = await import("./send.messages.js");
|
||||||
|
|
||||||
|
describe("readMessagesDiscord", () => {
|
||||||
|
it("returns messages from the REST client", async () => {
|
||||||
|
const messages = [{ id: "1", content: "hello" }];
|
||||||
|
restMock.get.mockResolvedValueOnce(messages);
|
||||||
|
|
||||||
|
const result = await readMessagesDiscord("C1", { limit: 5 }, { cfg: {} as never });
|
||||||
|
|
||||||
|
expect(result).toEqual(messages);
|
||||||
|
expect(restMock.get).toHaveBeenCalledWith(expect.stringContaining("C1"), { limit: 5 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("propagates REST errors", async () => {
|
||||||
|
restMock.get.mockRejectedValueOnce(new Error("Discord API error"));
|
||||||
|
|
||||||
|
await expect(readMessagesDiscord("C1", {}, { cfg: {} as never })).rejects.toThrow(
|
||||||
|
"Discord API error",
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("searchMessagesDiscord", () => {
|
||||||
|
it("returns search results from the REST client", async () => {
|
||||||
|
const results = { messages: [[{ id: "1" }]], total_results: 1 };
|
||||||
|
restMock.get.mockResolvedValueOnce(results);
|
||||||
|
|
||||||
|
const result = await searchMessagesDiscord(
|
||||||
|
{ guildId: "G1", content: "test", limit: 1 },
|
||||||
|
{ cfg: {} as never },
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(result).toEqual(results);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("propagates REST errors", async () => {
|
||||||
|
restMock.get.mockRejectedValueOnce(new Error("Discord API error"));
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
searchMessagesDiscord({ guildId: "G1", content: "test" }, { cfg: {} as never }),
|
||||||
|
).rejects.toThrow("Discord API error");
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -2824,6 +2824,34 @@ describe("dispatchReplyFromConfig", () => {
|
|||||||
expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled();
|
expect(internalHookMocks.triggerInternalHook).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("falls back to CommandTargetSessionKey for internal hook when SessionKey is empty", async () => {
|
||||||
|
setNoAbort();
|
||||||
|
const cfg = emptyConfig;
|
||||||
|
const dispatcher = createDispatcher();
|
||||||
|
const ctx = buildTestCtx({
|
||||||
|
Provider: "discord",
|
||||||
|
Surface: "discord",
|
||||||
|
CommandBody: "hello",
|
||||||
|
MessageSid: "msg-99",
|
||||||
|
});
|
||||||
|
(ctx as MsgContext).SessionKey = undefined;
|
||||||
|
(ctx as MsgContext).CommandTargetSessionKey = "agent:main:discord:guild:123";
|
||||||
|
|
||||||
|
const replyResolver = async () => ({ text: "reply" }) satisfies ReplyPayload;
|
||||||
|
await dispatchReplyFromConfig({ ctx, cfg, dispatcher, replyResolver });
|
||||||
|
|
||||||
|
expect(internalHookMocks.createInternalHookEvent).toHaveBeenCalledWith(
|
||||||
|
"message",
|
||||||
|
"received",
|
||||||
|
"agent:main:discord:guild:123",
|
||||||
|
expect.objectContaining({
|
||||||
|
content: "hello",
|
||||||
|
messageId: "msg-99",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
expect(internalHookMocks.triggerInternalHook).toHaveBeenCalledTimes(1);
|
||||||
|
});
|
||||||
|
|
||||||
it("emits diagnostics when enabled", async () => {
|
it("emits diagnostics when enabled", async () => {
|
||||||
setNoAbort();
|
setNoAbort();
|
||||||
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;
|
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;
|
||||||
|
|||||||
@@ -292,7 +292,8 @@ export async function dispatchReplyFromConfig(
|
|||||||
const channel = normalizeLowercaseStringOrEmpty(ctx.Surface ?? ctx.Provider ?? "unknown");
|
const channel = normalizeLowercaseStringOrEmpty(ctx.Surface ?? ctx.Provider ?? "unknown");
|
||||||
const chatId = ctx.To ?? ctx.From;
|
const chatId = ctx.To ?? ctx.From;
|
||||||
const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
|
const messageId = ctx.MessageSid ?? ctx.MessageSidFirst ?? ctx.MessageSidLast;
|
||||||
const sessionKey = ctx.SessionKey;
|
const sessionKey =
|
||||||
|
normalizeOptionalString(ctx.SessionKey) ?? normalizeOptionalString(ctx.CommandTargetSessionKey);
|
||||||
const startTime = diagnosticsEnabled ? Date.now() : 0;
|
const startTime = diagnosticsEnabled ? Date.now() : 0;
|
||||||
const canTrackSession = diagnosticsEnabled && Boolean(sessionKey);
|
const canTrackSession = diagnosticsEnabled && Boolean(sessionKey);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user