diff --git a/src/config/types.discord.ts b/src/config/types.discord.ts index 531fc1106cd..9045ecb6af1 100644 --- a/src/config/types.discord.ts +++ b/src/config/types.discord.ts @@ -302,6 +302,20 @@ export type DiscordAccountConfig = { activityType?: 0 | 1 | 2 | 3 | 4 | 5; /** Streaming URL (Twitch/YouTube). Required when activityType=1. */ activityUrl?: string; + /** + * Carbon EventQueue configuration. Controls how Discord gateway events are processed. + * The most important option is `listenerTimeout` which defaults to 30s in Carbon -- + * too short for LLM calls with extended thinking. Set a higher value (e.g. 120000) + * to prevent the event queue from killing long-running message handlers. + */ + eventQueue?: { + /** Max time (ms) a single listener can run before being killed. Default: 120000. */ + listenerTimeout?: number; + /** Max events queued before backpressure is applied. Default: 10000. */ + maxQueueSize?: number; + /** Max concurrent event processing operations. Default: 50. */ + maxConcurrency?: number; + }; }; export type DiscordConfig = { diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index f0a3a8a1f95..260e202bc04 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -517,6 +517,14 @@ export const DiscordAccountSchema = z .union([z.literal(0), z.literal(1), z.literal(2), z.literal(3), z.literal(4), z.literal(5)]) .optional(), activityUrl: z.string().url().optional(), + eventQueue: z + .object({ + listenerTimeout: z.number().int().positive().optional(), + maxQueueSize: z.number().int().positive().optional(), + maxConcurrency: z.number().int().positive().optional(), + }) + .strict() + .optional(), }) .strict() .superRefine((value, ctx) => { diff --git a/src/discord/monitor/provider.test.ts b/src/discord/monitor/provider.test.ts index 731f38c32ea..e41fa45ae76 100644 --- a/src/discord/monitor/provider.test.ts +++ b/src/discord/monitor/provider.test.ts @@ -6,6 +6,7 @@ import type { RuntimeEnv } from "../../runtime.js"; const { clientFetchUserMock, clientGetPluginMock, + clientConstructorOptionsMock, createDiscordNativeCommandMock, createNoopThreadBindingManagerMock, createThreadBindingManagerMock, @@ -21,6 +22,7 @@ const { } = vi.hoisted(() => { const createdBindingManagers: Array<{ stop: ReturnType }> = []; return { + clientConstructorOptionsMock: vi.fn(), clientFetchUserMock: vi.fn(async (_target: string) => ({ id: "bot-1" })), clientGetPluginMock: vi.fn<(_name: string) => unknown>(() => undefined), createDiscordNativeCommandMock: vi.fn(() => ({ name: "mock-command" })), @@ -69,9 +71,12 @@ vi.mock("@buape/carbon", () => { class Client { listeners: unknown[]; rest: { put: ReturnType }; - constructor(_options: unknown, handlers: { listeners?: unknown[] }) { + options: unknown; + constructor(options: unknown, handlers: { listeners?: unknown[] }) { + this.options = options; this.listeners = handlers.listeners ?? []; this.rest = { put: vi.fn(async () => undefined) }; + clientConstructorOptionsMock(options); } async handleDeployRequest() { return undefined; @@ -254,6 +259,7 @@ describe("monitorDiscordProvider", () => { }) as OpenClawConfig; beforeEach(() => { + clientConstructorOptionsMock.mockClear(); clientFetchUserMock.mockClear().mockResolvedValue({ id: "bot-1" }); clientGetPluginMock.mockClear().mockReturnValue(undefined); createDiscordNativeCommandMock.mockClear().mockReturnValue({ name: "mock-command" }); @@ -334,4 +340,47 @@ describe("monitorDiscordProvider", () => { expect(lifecycleArgs.pendingGatewayErrors).toHaveLength(1); expect(String(lifecycleArgs.pendingGatewayErrors?.[0])).toContain("4014"); }); + + it("passes default eventQueue.listenerTimeout of 120s to Carbon Client", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1); + const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as { + eventQueue?: { listenerTimeout?: number }; + }; + expect(opts.eventQueue).toBeDefined(); + expect(opts.eventQueue?.listenerTimeout).toBe(120_000); + }); + + it("forwards custom eventQueue config from discord config to Carbon Client", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + + resolveDiscordAccountMock.mockImplementation(() => ({ + accountId: "default", + token: "cfg-token", + config: { + commands: { native: true, nativeSkills: false }, + voice: { enabled: false }, + agentComponents: { enabled: false }, + execApprovals: { enabled: false }, + eventQueue: { listenerTimeout: 300_000 }, + }, + })); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + expect(clientConstructorOptionsMock).toHaveBeenCalledTimes(1); + const opts = clientConstructorOptionsMock.mock.calls[0]?.[0] as { + eventQueue?: { listenerTimeout?: number }; + }; + expect(opts.eventQueue?.listenerTimeout).toBe(300_000); + }); }); diff --git a/src/discord/monitor/provider.ts b/src/discord/monitor/provider.ts index f8614419c5c..977995ab68e 100644 --- a/src/discord/monitor/provider.ts +++ b/src/discord/monitor/provider.ts @@ -517,6 +517,12 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { if (voiceEnabled) { clientPlugins.push(new VoicePlugin()); } + // Pass eventQueue config to Carbon so the listener timeout can be tuned. + // Default listenerTimeout is 120s (Carbon defaults to 30s which is too short for LLM calls). + const eventQueueOpts = { + listenerTimeout: 120_000, + ...discordCfg.eventQueue, + }; const client = new Client( { baseUrl: "http://localhost", @@ -525,6 +531,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { publicKey: "a", token, autoDeploy: false, + eventQueue: eventQueueOpts, }, { commands,