diff --git a/CHANGELOG.md b/CHANGELOG.md index cc32ac16cd4..afea749285e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -144,6 +144,7 @@ Docs: https://docs.openclaw.ai - Agents/failover service-unavailable handling: stop treating bare proxy/CDN `service unavailable` errors as provider overload while keeping them retryable via the timeout/failover path, so transient outages no longer show false rate-limit warnings or block fallback. (#36646) thanks @jnMetaCode. - Agents/current-time UTC anchor: append a machine-readable UTC suffix alongside local `Current time:` lines in shared cron-style prompt contexts so agents can compare UTC-stamped workspace timestamps without doing timezone math. (#32423) thanks @jriff. - TUI/webchat command-owner scope alignment: treat internal-channel gateway sessions with `operator.admin` as owner-authorized in command auth, restoring cron/gateway/connector tool access for affected TUI/webchat sessions while keeping external channels on identity-based owner checks. (from #35666, #35673, #35704) Thanks @Naylenv, @Octane0411, and @Sid-Qin. +- Discord/inbound timeout isolation: separate inbound worker timeout tracking from listener timeout budgets so queued Discord replies are no longer dropped when listener watchdog windows expire mid-run. (#36602) Thanks @dutifulbob. ## 2026.3.2 diff --git a/docs/channels/discord.md b/docs/channels/discord.md index b69e651eabb..86e80430f7b 100644 --- a/docs/channels/discord.md +++ b/docs/channels/discord.md @@ -1102,12 +1102,19 @@ openclaw logs --follow - `Listener DiscordMessageListener timed out after 30000ms for event MESSAGE_CREATE` - `Slow listener detected ...` + - `discord inbound worker timed out after ...` - Canonical knob: + Listener budget knob: - single-account: `channels.discord.eventQueue.listenerTimeout` - multi-account: `channels.discord.accounts..eventQueue.listenerTimeout` + Worker run timeout knob: + + - single-account: `channels.discord.inboundWorker.runTimeoutMs` + - multi-account: `channels.discord.accounts..inboundWorker.runTimeoutMs` + - default: `1800000` (30 minutes); set `0` to disable + Recommended baseline: ```json5 @@ -1119,6 +1126,9 @@ openclaw logs --follow eventQueue: { listenerTimeout: 120000, }, + inboundWorker: { + runTimeoutMs: 1800000, + }, }, }, }, @@ -1126,7 +1136,8 @@ openclaw logs --follow } ``` - Tune this first before adding alternate timeout controls elsewhere. + Use `eventQueue.listenerTimeout` for slow listener setup and `inboundWorker.runTimeoutMs` + only if you want a separate safety valve for queued agent turns. @@ -1177,7 +1188,8 @@ High-signal Discord fields: - startup/auth: `enabled`, `token`, `accounts.*`, `allowBots` - policy: `groupPolicy`, `dm.*`, `guilds.*`, `guilds.*.channels.*` - command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*` -- event queue: `eventQueue.listenerTimeout` (canonical), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency` +- event queue: `eventQueue.listenerTimeout` (listener budget), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency` +- inbound worker: `inboundWorker.runTimeoutMs` - reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit` - delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage` - streaming: `streaming` (legacy alias: `streamMode`), `draftChunk`, `blockStreaming`, `blockStreamingCoalesce` diff --git a/docs/experiments/plans/discord-async-inbound-worker.md b/docs/experiments/plans/discord-async-inbound-worker.md new file mode 100644 index 00000000000..70397b51338 --- /dev/null +++ b/docs/experiments/plans/discord-async-inbound-worker.md @@ -0,0 +1,337 @@ +--- +summary: "Status and next steps for decoupling Discord gateway listeners from long-running agent turns with a Discord-specific inbound worker" +owner: "openclaw" +status: "in_progress" +last_updated: "2026-03-05" +title: "Discord Async Inbound Worker Plan" +--- + +# Discord Async Inbound Worker Plan + +## Objective + +Remove Discord listener timeout as a user-facing failure mode by making inbound Discord turns asynchronous: + +1. Gateway listener accepts and normalizes inbound events quickly. +2. A Discord run queue stores serialized jobs keyed by the same ordering boundary we use today. +3. A worker executes the actual agent turn outside the Carbon listener lifetime. +4. Replies are delivered back to the originating channel or thread after the run completes. + +This is the long-term fix for queued Discord runs timing out at `channels.discord.eventQueue.listenerTimeout` while the agent run itself is still making progress. + +## Current status + +This plan is partially implemented. + +Already done: + +- Discord listener timeout and Discord run timeout are now separate settings. +- Accepted inbound Discord turns are enqueued into `src/discord/monitor/inbound-worker.ts`. +- The worker now owns the long-running turn instead of the Carbon listener. +- Existing per-route ordering is preserved by queue key. +- Timeout regression coverage exists for the Discord worker path. + +What this means in plain language: + +- the production timeout bug is fixed +- the long-running turn no longer dies just because the Discord listener budget expires +- the worker architecture is not finished yet + +What is still missing: + +- `DiscordInboundJob` is still only partially normalized and still carries live runtime references +- command semantics (`stop`, `new`, `reset`, future session controls) are not yet fully worker-native +- worker observability and operator status are still minimal +- there is still no restart durability + +## Why this exists + +Current behavior ties the full agent turn to the listener lifetime: + +- `src/discord/monitor/listeners.ts` applies the timeout and abort boundary. +- `src/discord/monitor/message-handler.ts` keeps the queued run inside that boundary. +- `src/discord/monitor/message-handler.process.ts` performs media loading, routing, dispatch, typing, draft streaming, and final reply delivery inline. + +That architecture has two bad properties: + +- long but healthy turns can be aborted by the listener watchdog +- users can see no reply even when the downstream runtime would have produced one + +Raising the timeout helps but does not change the failure mode. + +## Non-goals + +- Do not redesign non-Discord channels in this pass. +- Do not broaden this into a generic all-channel worker framework in the first implementation. +- Do not extract a shared cross-channel inbound worker abstraction yet; only share low-level primitives when duplication is obvious. +- Do not add durable crash recovery in the first pass unless needed to land safely. +- Do not change route selection, binding semantics, or ACP policy in this plan. + +## Current constraints + +The current Discord processing path still depends on some live runtime objects that should not stay inside the long-term job payload: + +- Carbon `Client` +- raw Discord event shapes +- in-memory guild history map +- thread binding manager callbacks +- live typing and draft stream state + +We already moved execution onto a worker queue, but the normalization boundary is still incomplete. Right now the worker is "run later in the same process with some of the same live objects," not a fully data-only job boundary. + +## Target architecture + +### 1. Listener stage + +`DiscordMessageListener` remains the ingress point, but its job becomes: + +- run preflight and policy checks +- normalize accepted input into a serializable `DiscordInboundJob` +- enqueue the job into a per-session or per-channel async queue +- return immediately to Carbon once the enqueue succeeds + +The listener should no longer own the end-to-end LLM turn lifetime. + +### 2. Normalized job payload + +Introduce a serializable job descriptor that contains only the data needed to run the turn later. + +Minimum shape: + +- route identity + - `agentId` + - `sessionKey` + - `accountId` + - `channel` +- delivery identity + - destination channel id + - reply target message id + - thread id if present +- sender identity + - sender id, label, username, tag +- channel context + - guild id + - channel name or slug + - thread metadata + - resolved system prompt override +- normalized message body + - base text + - effective message text + - attachment descriptors or resolved media references +- gating decisions + - mention requirement outcome + - command authorization outcome + - bound session or agent metadata if applicable + +The job payload must not contain live Carbon objects or mutable closures. + +Current implementation status: + +- partially done +- `src/discord/monitor/inbound-job.ts` exists and defines the worker handoff +- the payload still contains live Discord runtime context and should be reduced further + +### 3. Worker stage + +Add a Discord-specific worker runner responsible for: + +- reconstructing the turn context from `DiscordInboundJob` +- loading media and any additional channel metadata needed for the run +- dispatching the agent turn +- delivering final reply payloads +- updating status and diagnostics + +Recommended location: + +- `src/discord/monitor/inbound-worker.ts` +- `src/discord/monitor/inbound-job.ts` + +### 4. Ordering model + +Ordering must remain equivalent to today for a given route boundary. + +Recommended key: + +- use the same queue key logic as `resolveDiscordRunQueueKey(...)` + +This preserves existing behavior: + +- one bound agent conversation does not interleave with itself +- different Discord channels can still progress independently + +### 5. Timeout model + +After cutover, there are two separate timeout classes: + +- listener timeout + - only covers normalization and enqueue + - should be short +- run timeout + - optional, worker-owned, explicit, and user-visible + - should not be inherited accidentally from Carbon listener settings + +This removes the current accidental coupling between "Discord gateway listener stayed alive" and "agent run is healthy." + +## Recommended implementation phases + +### Phase 1: normalization boundary + +- Status: partially implemented +- Done: + - extracted `buildDiscordInboundJob(...)` + - added worker handoff tests +- Remaining: + - make `DiscordInboundJob` plain data only + - move live runtime dependencies to worker-owned services instead of per-job payload + - stop rebuilding process context by stitching live listener refs back into the job + +### Phase 2: in-memory worker queue + +- Status: implemented +- Done: + - added `DiscordInboundWorkerQueue` keyed by resolved run queue key + - listener enqueues jobs instead of directly awaiting `processDiscordMessage(...)` + - worker executes jobs in-process, in memory only + +This is the first functional cutover. + +### Phase 3: process split + +- Status: not started +- Move delivery, typing, and draft streaming ownership behind worker-facing adapters. +- Replace direct use of live preflight context with worker context reconstruction. +- Keep `processDiscordMessage(...)` temporarily as a facade if needed, then split it. + +### Phase 4: command semantics + +- Status: not started + Make sure native Discord commands still behave correctly when work is queued: + +- `stop` +- `new` +- `reset` +- any future session-control commands + +The worker queue must expose enough run state for commands to target the active or queued turn. + +### Phase 5: observability and operator UX + +- Status: not started +- emit queue depth and active worker counts into monitor status +- record enqueue time, start time, finish time, and timeout or cancellation reason +- surface worker-owned timeout or delivery failures clearly in logs + +### Phase 6: optional durability follow-up + +- Status: not started + Only after the in-memory version is stable: + +- decide whether queued Discord jobs should survive gateway restart +- if yes, persist job descriptors and delivery checkpoints +- if no, document the explicit in-memory boundary + +This should be a separate follow-up unless restart recovery is required to land. + +## File impact + +Current primary files: + +- `src/discord/monitor/listeners.ts` +- `src/discord/monitor/message-handler.ts` +- `src/discord/monitor/message-handler.preflight.ts` +- `src/discord/monitor/message-handler.process.ts` +- `src/discord/monitor/status.ts` + +Current worker files: + +- `src/discord/monitor/inbound-job.ts` +- `src/discord/monitor/inbound-worker.ts` +- `src/discord/monitor/inbound-job.test.ts` +- `src/discord/monitor/message-handler.queue.test.ts` + +Likely next touch points: + +- `src/auto-reply/dispatch.ts` +- `src/discord/monitor/reply-delivery.ts` +- `src/discord/monitor/thread-bindings.ts` +- `src/discord/monitor/native-command.ts` + +## Next step now + +The next step is to make the worker boundary real instead of partial. + +Do this next: + +1. Move live runtime dependencies out of `DiscordInboundJob` +2. Keep those dependencies on the Discord worker instance instead +3. Reduce queued jobs to plain Discord-specific data: + - route identity + - delivery target + - sender info + - normalized message snapshot + - gating and binding decisions +4. Reconstruct worker execution context from that plain data inside the worker + +In practice, that means: + +- `client` +- `threadBindings` +- `guildHistories` +- `discordRestFetch` +- other mutable runtime-only handles + +should stop living on each queued job and instead live on the worker itself or behind worker-owned adapters. + +After that lands, the next follow-up should be command-state cleanup for `stop`, `new`, and `reset`. + +## Testing plan + +Keep the existing timeout repro coverage in: + +- `src/discord/monitor/message-handler.queue.test.ts` + +Add new tests for: + +1. listener returns after enqueue without awaiting full turn +2. per-route ordering is preserved +3. different channels still run concurrently +4. replies are delivered to the original message destination +5. `stop` cancels the active worker-owned run +6. worker failure produces visible diagnostics without blocking later jobs +7. ACP-bound Discord channels still route correctly under worker execution + +## Risks and mitigations + +- Risk: command semantics drift from current synchronous behavior + Mitigation: land command-state plumbing in the same cutover, not later + +- Risk: reply delivery loses thread or reply-to context + Mitigation: make delivery identity first-class in `DiscordInboundJob` + +- Risk: duplicate sends during retries or queue restarts + Mitigation: keep first pass in-memory only, or add explicit delivery idempotency before persistence + +- Risk: `message-handler.process.ts` becomes harder to reason about during migration + Mitigation: split into normalization, execution, and delivery helpers before or during worker cutover + +## Acceptance criteria + +The plan is complete when: + +1. Discord listener timeout no longer aborts healthy long-running turns. +2. Listener lifetime and agent-turn lifetime are separate concepts in code. +3. Existing per-session ordering is preserved. +4. ACP-bound Discord channels work through the same worker path. +5. `stop` targets the worker-owned run instead of the old listener-owned call stack. +6. Timeout and delivery failures become explicit worker outcomes, not silent listener drops. + +## Remaining landing strategy + +Finish this in follow-up PRs: + +1. make `DiscordInboundJob` plain-data only and move live runtime refs onto the worker +2. clean up command-state ownership for `stop`, `new`, and `reset` +3. add worker observability and operator status +4. decide whether durability is needed or explicitly document the in-memory boundary + +This is still a bounded follow-up if kept Discord-only and if we continue to avoid a premature cross-channel worker abstraction. diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts index 2bcc14f3d4a..b260017362a 100644 --- a/src/config/schema.help.ts +++ b/src/config/schema.help.ts @@ -1,3 +1,7 @@ +import { + DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, + DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, +} from "../discord/monitor/timeouts.js"; import { MEDIA_AUDIO_FIELD_HELP } from "./media-audio-field-metadata.js"; import { IRC_FIELD_HELP } from "./schema.irc.js"; @@ -1451,8 +1455,8 @@ export const FIELD_HELP: Record = { "channels.discord.retry.maxDelayMs": "Maximum retry delay cap in ms for Discord outbound calls.", "channels.discord.retry.jitter": "Jitter factor (0-1) applied to Discord retry delays.", "channels.discord.maxLinesPerMessage": "Soft max line count per Discord message (default: 17).", - "channels.discord.eventQueue.listenerTimeout": - "Canonical Discord listener timeout control in ms for gateway event handlers. Default is 120000 in OpenClaw; set per account via channels.discord.accounts..eventQueue.listenerTimeout.", + "channels.discord.inboundWorker.runTimeoutMs": `Optional queued Discord inbound worker timeout in ms. This is separate from Carbon listener timeouts; defaults to ${DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS} and can be disabled with 0. Set per account via channels.discord.accounts..inboundWorker.runTimeoutMs.`, + "channels.discord.eventQueue.listenerTimeout": `Canonical Discord listener timeout control in ms for gateway normalization/enqueue handlers. Default is ${DISCORD_DEFAULT_LISTENER_TIMEOUT_MS} in OpenClaw; set per account via channels.discord.accounts..eventQueue.listenerTimeout.`, "channels.discord.eventQueue.maxQueueSize": "Optional Discord EventQueue capacity override (max queued events before backpressure). Set per account via channels.discord.accounts..eventQueue.maxQueueSize.", "channels.discord.eventQueue.maxConcurrency": diff --git a/src/config/schema.labels.ts b/src/config/schema.labels.ts index adbe5431e90..5908a370c37 100644 --- a/src/config/schema.labels.ts +++ b/src/config/schema.labels.ts @@ -722,6 +722,7 @@ export const FIELD_LABELS: Record = { "channels.discord.retry.maxDelayMs": "Discord Retry Max Delay (ms)", "channels.discord.retry.jitter": "Discord Retry Jitter", "channels.discord.maxLinesPerMessage": "Discord Max Lines Per Message", + "channels.discord.inboundWorker.runTimeoutMs": "Discord Inbound Worker Timeout (ms)", "channels.discord.eventQueue.listenerTimeout": "Discord EventQueue Listener Timeout (ms)", "channels.discord.eventQueue.maxQueueSize": "Discord EventQueue Max Queue Size", "channels.discord.eventQueue.maxConcurrency": "Discord EventQueue Max Concurrency", diff --git a/src/config/types.discord.ts b/src/config/types.discord.ts index 0473fbf42f1..2d2e674f6b6 100644 --- a/src/config/types.discord.ts +++ b/src/config/types.discord.ts @@ -330,11 +330,21 @@ export type DiscordAccountConfig = { activityType?: 0 | 1 | 2 | 3 | 4 | 5; /** Streaming URL (Twitch/YouTube). Required when activityType=1. */ activityUrl?: string; + /** + * In-process worker settings for queued inbound Discord runs. + * This is separate from Carbon's eventQueue listener budget. + */ + inboundWorker?: { + /** + * Max time (ms) a queued inbound run may execute before OpenClaw aborts it. + * Defaults to 1800000 (30 minutes). Set 0 to disable the worker-owned timeout. + */ + runTimeoutMs?: number; + }; /** * Carbon EventQueue configuration. Controls how Discord gateway events are processed. - * The most important option is `listenerTimeout` which defaults to 30s in Carbon -- - * too short for LLM calls with extended thinking. Set a higher value (e.g. 120000) - * to prevent the event queue from killing long-running message handlers. + * `listenerTimeout` only covers gateway listener work such as normalization and enqueue. + * It does not control the lifetime of queued inbound agent turns. */ eventQueue?: { /** Max time (ms) a single listener can run before being killed. Default: 120000. */ diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 8ad07d39910..55a98c5f827 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -528,6 +528,12 @@ export const DiscordAccountSchema = z .union([z.literal(0), z.literal(1), z.literal(2), z.literal(3), z.literal(4), z.literal(5)]) .optional(), activityUrl: z.string().url().optional(), + inboundWorker: z + .object({ + runTimeoutMs: z.number().int().nonnegative().optional(), + }) + .strict() + .optional(), eventQueue: z .object({ listenerTimeout: z.number().int().positive().optional(), diff --git a/src/discord/monitor/inbound-job.test.ts b/src/discord/monitor/inbound-job.test.ts new file mode 100644 index 00000000000..0fda69821eb --- /dev/null +++ b/src/discord/monitor/inbound-job.test.ts @@ -0,0 +1,148 @@ +import { Message } from "@buape/carbon"; +import { describe, expect, it } from "vitest"; +import { buildDiscordInboundJob, materializeDiscordInboundJob } from "./inbound-job.js"; +import { createBaseDiscordMessageContext } from "./message-handler.test-harness.js"; + +describe("buildDiscordInboundJob", () => { + it("keeps live runtime references out of the payload", async () => { + const ctx = await createBaseDiscordMessageContext({ + message: { + id: "m1", + channelId: "thread-1", + timestamp: new Date().toISOString(), + attachments: [], + channel: { + id: "thread-1", + isThread: () => true, + }, + }, + data: { + guild: { id: "g1", name: "Guild" }, + message: { + id: "m1", + channelId: "thread-1", + timestamp: new Date().toISOString(), + attachments: [], + channel: { + id: "thread-1", + isThread: () => true, + }, + }, + }, + threadChannel: { + id: "thread-1", + name: "codex", + parentId: "forum-1", + parent: { + id: "forum-1", + name: "Forum", + }, + ownerId: "user-1", + }, + }); + + const job = buildDiscordInboundJob(ctx); + + expect("runtime" in job.payload).toBe(false); + expect("client" in job.payload).toBe(false); + expect("threadBindings" in job.payload).toBe(false); + expect("discordRestFetch" in job.payload).toBe(false); + expect("channel" in job.payload.message).toBe(false); + expect("channel" in job.payload.data.message).toBe(false); + expect(job.runtime.client).toBe(ctx.client); + expect(job.runtime.threadBindings).toBe(ctx.threadBindings); + expect(job.payload.threadChannel).toEqual({ + id: "thread-1", + name: "codex", + parentId: "forum-1", + parent: { + id: "forum-1", + name: "Forum", + }, + ownerId: "user-1", + }); + expect(() => JSON.stringify(job.payload)).not.toThrow(); + }); + + it("re-materializes the process context with an overridden abort signal", async () => { + const ctx = await createBaseDiscordMessageContext(); + const job = buildDiscordInboundJob(ctx); + const overrideAbortController = new AbortController(); + + const rematerialized = materializeDiscordInboundJob(job, overrideAbortController.signal); + + expect(rematerialized.runtime).toBe(ctx.runtime); + expect(rematerialized.client).toBe(ctx.client); + expect(rematerialized.threadBindings).toBe(ctx.threadBindings); + expect(rematerialized.abortSignal).toBe(overrideAbortController.signal); + expect(rematerialized.message).toEqual(job.payload.message); + expect(rematerialized.data).toEqual(job.payload.data); + }); + + it("preserves Carbon message getters across queued jobs", async () => { + const ctx = await createBaseDiscordMessageContext(); + const message = new Message( + ctx.client as never, + { + id: "m1", + channel_id: "c1", + content: "hello", + attachments: [{ id: "a1", filename: "note.txt" }], + timestamp: new Date().toISOString(), + author: { + id: "u1", + username: "alice", + discriminator: "0", + avatar: null, + }, + referenced_message: { + id: "m0", + channel_id: "c1", + content: "earlier", + attachments: [], + timestamp: new Date().toISOString(), + author: { + id: "u2", + username: "bob", + discriminator: "0", + avatar: null, + }, + type: 0, + tts: false, + mention_everyone: false, + pinned: false, + flags: 0, + }, + type: 0, + tts: false, + mention_everyone: false, + pinned: false, + flags: 0, + } as ConstructorParameters[1], + ); + const runtimeChannel = { id: "c1", isThread: () => false }; + Object.defineProperty(message, "channel", { + value: runtimeChannel, + configurable: true, + enumerable: true, + writable: true, + }); + + const job = buildDiscordInboundJob({ + ...ctx, + message, + data: { + ...ctx.data, + message, + }, + }); + const rematerialized = materializeDiscordInboundJob(job); + + expect(job.payload.message).toBeInstanceOf(Message); + expect("channel" in job.payload.message).toBe(false); + expect(rematerialized.message.content).toBe("hello"); + expect(rematerialized.message.attachments).toHaveLength(1); + expect(rematerialized.message.timestamp).toBe(message.timestamp); + expect(rematerialized.message.referencedMessage?.content).toBe("earlier"); + }); +}); diff --git a/src/discord/monitor/inbound-job.ts b/src/discord/monitor/inbound-job.ts new file mode 100644 index 00000000000..2f8c9520f12 --- /dev/null +++ b/src/discord/monitor/inbound-job.ts @@ -0,0 +1,111 @@ +import type { DiscordMessagePreflightContext } from "./message-handler.preflight.types.js"; + +type DiscordInboundJobRuntimeField = + | "runtime" + | "abortSignal" + | "guildHistories" + | "client" + | "threadBindings" + | "discordRestFetch"; + +export type DiscordInboundJobRuntime = Pick< + DiscordMessagePreflightContext, + DiscordInboundJobRuntimeField +>; + +export type DiscordInboundJobPayload = Omit< + DiscordMessagePreflightContext, + DiscordInboundJobRuntimeField +>; + +export type DiscordInboundJob = { + queueKey: string; + payload: DiscordInboundJobPayload; + runtime: DiscordInboundJobRuntime; +}; + +export function resolveDiscordInboundJobQueueKey(ctx: DiscordMessagePreflightContext): string { + const sessionKey = ctx.route.sessionKey?.trim(); + if (sessionKey) { + return sessionKey; + } + const baseSessionKey = ctx.baseSessionKey?.trim(); + if (baseSessionKey) { + return baseSessionKey; + } + return ctx.messageChannelId; +} + +export function buildDiscordInboundJob(ctx: DiscordMessagePreflightContext): DiscordInboundJob { + const { + runtime, + abortSignal, + guildHistories, + client, + threadBindings, + discordRestFetch, + message, + data, + threadChannel, + ...payload + } = ctx; + + const sanitizedMessage = sanitizeDiscordInboundMessage(message); + return { + queueKey: resolveDiscordInboundJobQueueKey(ctx), + payload: { + ...payload, + message: sanitizedMessage, + data: { + ...data, + message: sanitizedMessage, + }, + threadChannel: normalizeDiscordThreadChannel(threadChannel), + }, + runtime: { + runtime, + abortSignal, + guildHistories, + client, + threadBindings, + discordRestFetch, + }, + }; +} + +export function materializeDiscordInboundJob( + job: DiscordInboundJob, + abortSignal?: AbortSignal, +): DiscordMessagePreflightContext { + return { + ...job.payload, + ...job.runtime, + abortSignal: abortSignal ?? job.runtime.abortSignal, + }; +} + +function sanitizeDiscordInboundMessage(message: T): T { + const descriptors = Object.getOwnPropertyDescriptors(message); + delete descriptors.channel; + return Object.create(Object.getPrototypeOf(message), descriptors) as T; +} + +function normalizeDiscordThreadChannel( + threadChannel: DiscordMessagePreflightContext["threadChannel"], +): DiscordMessagePreflightContext["threadChannel"] { + if (!threadChannel) { + return null; + } + return { + id: threadChannel.id, + name: threadChannel.name, + parentId: threadChannel.parentId, + parent: threadChannel.parent + ? { + id: threadChannel.parent.id, + name: threadChannel.parent.name, + } + : undefined, + ownerId: threadChannel.ownerId, + }; +} diff --git a/src/discord/monitor/inbound-worker.ts b/src/discord/monitor/inbound-worker.ts new file mode 100644 index 00000000000..eb4337cb913 --- /dev/null +++ b/src/discord/monitor/inbound-worker.ts @@ -0,0 +1,105 @@ +import { createRunStateMachine } from "../../channels/run-state-machine.js"; +import { danger } from "../../globals.js"; +import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts"; +import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; +import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js"; +import type { RuntimeEnv } from "./message-handler.preflight.types.js"; +import { processDiscordMessage } from "./message-handler.process.js"; +import type { DiscordMonitorStatusSink } from "./status.js"; +import { normalizeDiscordInboundWorkerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js"; + +type DiscordInboundWorkerParams = { + runtime: RuntimeEnv; + setStatus?: DiscordMonitorStatusSink; + abortSignal?: AbortSignal; + runTimeoutMs?: number; +}; + +export type DiscordInboundWorker = { + enqueue: (job: DiscordInboundJob) => void; + deactivate: () => void; +}; + +function formatDiscordRunContextSuffix(job: DiscordInboundJob): string { + const channelId = job.payload.messageChannelId?.trim(); + const messageId = job.payload.data?.message?.id?.trim(); + const details = [ + channelId ? `channelId=${channelId}` : null, + messageId ? `messageId=${messageId}` : null, + ].filter((entry): entry is string => Boolean(entry)); + if (details.length === 0) { + return ""; + } + return ` (${details.join(", ")})`; +} + +async function processDiscordInboundJob(params: { + job: DiscordInboundJob; + runtime: RuntimeEnv; + lifecycleSignal?: AbortSignal; + runTimeoutMs?: number; +}) { + const timeoutMs = normalizeDiscordInboundWorkerTimeoutMs(params.runTimeoutMs); + const contextSuffix = formatDiscordRunContextSuffix(params.job); + await runDiscordTaskWithTimeout({ + run: async (abortSignal) => { + await processDiscordMessage(materializeDiscordInboundJob(params.job, abortSignal)); + }, + timeoutMs, + abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal], + onTimeout: (resolvedTimeoutMs) => { + params.runtime.error?.( + danger( + `discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, { + decimals: 1, + unit: "seconds", + })}${contextSuffix}`, + ), + ); + }, + onErrorAfterTimeout: (error) => { + params.runtime.error?.( + danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`), + ); + }, + }); +} + +export function createDiscordInboundWorker( + params: DiscordInboundWorkerParams, +): DiscordInboundWorker { + const runQueue = new KeyedAsyncQueue(); + const runState = createRunStateMachine({ + setStatus: params.setStatus, + abortSignal: params.abortSignal, + }); + + return { + enqueue(job) { + void runQueue + .enqueue(job.queueKey, async () => { + if (!runState.isActive()) { + return; + } + runState.onRunStart(); + try { + if (!runState.isActive()) { + return; + } + await processDiscordInboundJob({ + job, + runtime: params.runtime, + lifecycleSignal: params.abortSignal, + runTimeoutMs: params.runTimeoutMs, + }); + } finally { + runState.onRunEnd(); + } + }) + .catch((error) => { + params.runtime.error?.(danger(`discord inbound worker failed: ${String(error)}`)); + }); + }, + deactivate: runState.deactivate, + }; +} diff --git a/src/discord/monitor/listeners.ts b/src/discord/monitor/listeners.ts index 5297460e228..4ca94de098d 100644 --- a/src/discord/monitor/listeners.ts +++ b/src/discord/monitor/listeners.ts @@ -34,6 +34,7 @@ import { resolveDiscordChannelInfo } from "./message-utils.js"; import { setPresence } from "./presence-cache.js"; import { isThreadArchived } from "./thread-bindings.discord-api.js"; import { closeDiscordThreadSessions } from "./thread-session-close.js"; +import { normalizeDiscordListenerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js"; type LoadedConfig = ReturnType; type RuntimeEnv = import("../../runtime.js").RuntimeEnv; @@ -70,16 +71,8 @@ type DiscordReactionRoutingParams = { }; const DISCORD_SLOW_LISTENER_THRESHOLD_MS = 30_000; -const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000; const discordEventQueueLog = createSubsystemLogger("discord/event-queue"); -function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number { - if (!Number.isFinite(raw) || (raw ?? 0) <= 0) { - return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS; - } - return Math.max(1_000, Math.floor(raw!)); -} - function formatListenerContextValue(value: unknown): string | null { if (value === undefined || value === null) { return null; @@ -138,57 +131,44 @@ async function runDiscordListenerWithSlowLog(params: { logger: Logger | undefined; listener: string; event: string; - run: (abortSignal: AbortSignal) => Promise; + run: (abortSignal: AbortSignal | undefined) => Promise; timeoutMs?: number; context?: Record; onError?: (err: unknown) => void; }) { const startedAt = Date.now(); const timeoutMs = normalizeDiscordListenerTimeoutMs(params.timeoutMs); - let timedOut = false; - let timeoutHandle: ReturnType | null = null; const logger = params.logger ?? discordEventQueueLog; - const abortController = new AbortController(); - const runPromise = params.run(abortController.signal).catch((err) => { - if (timedOut) { - const errorName = - err && typeof err === "object" && "name" in err ? String(err.name) : undefined; - if (abortController.signal.aborted && errorName === "AbortError") { + let timedOut = false; + + try { + timedOut = await runDiscordTaskWithTimeout({ + run: params.run, + timeoutMs, + onTimeout: (resolvedTimeoutMs) => { + logger.error( + danger( + `discord handler timed out after ${formatDurationSeconds(resolvedTimeoutMs, { + decimals: 1, + unit: "seconds", + })}${formatListenerContextSuffix(params.context)}`, + ), + ); + }, + onAbortAfterTimeout: () => { logger.warn( `discord handler canceled after timeout${formatListenerContextSuffix(params.context)}`, ); - return; - } - logger.error( - danger( - `discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`, - ), - ); - return; - } - throw err; - }); - - try { - const timeoutPromise = new Promise<"timeout">((resolve) => { - timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs); - timeoutHandle.unref?.(); + }, + onErrorAfterTimeout: (err) => { + logger.error( + danger( + `discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`, + ), + ); + }, }); - const result = await Promise.race([ - runPromise.then(() => "completed" as const), - timeoutPromise, - ]); - if (result === "timeout") { - timedOut = true; - abortController.abort(); - logger.error( - danger( - `discord handler timed out after ${formatDurationSeconds(timeoutMs, { - decimals: 1, - unit: "seconds", - })}${formatListenerContextSuffix(params.context)}`, - ), - ); + if (timedOut) { return; } } catch (err) { @@ -198,9 +178,6 @@ async function runDiscordListenerWithSlowLog(params: { } throw err; } finally { - if (timeoutHandle) { - clearTimeout(timeoutHandle); - } if (!timedOut) { logSlowDiscordListener({ logger: params.logger, diff --git a/src/discord/monitor/message-handler.process.ts b/src/discord/monitor/message-handler.process.ts index 3b7082dc218..1fb0e8590c1 100644 --- a/src/discord/monitor/message-handler.process.ts +++ b/src/discord/monitor/message-handler.process.ts @@ -1,4 +1,4 @@ -import { ChannelType } from "@buape/carbon"; +import { ChannelType, type RequestClient } from "@buape/carbon"; import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js"; import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js"; import { resolveChunkMode } from "../../auto-reply/chunk.js"; @@ -161,15 +161,17 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext) }), ); const statusReactionsEnabled = shouldAckReaction(); + // Discord outbound helpers expect Carbon's request client shape explicitly. + const discordRest = client.rest as unknown as RequestClient; const discordAdapter: StatusReactionAdapter = { setReaction: async (emoji) => { await reactMessageDiscord(messageChannelId, message.id, emoji, { - rest: client.rest as never, + rest: discordRest, }); }, removeReaction: async (emoji) => { await removeReactionDiscord(messageChannelId, message.id, emoji, { - rest: client.rest as never, + rest: discordRest, }); }, }; diff --git a/src/discord/monitor/message-handler.queue.test.ts b/src/discord/monitor/message-handler.queue.test.ts index 9ab7914adcc..45fbfeee278 100644 --- a/src/discord/monitor/message-handler.queue.test.ts +++ b/src/discord/monitor/message-handler.queue.test.ts @@ -4,6 +4,7 @@ import { createNoopThreadBindingManager } from "./thread-bindings.js"; const preflightDiscordMessageMock = vi.hoisted(() => vi.fn()); const processDiscordMessageMock = vi.hoisted(() => vi.fn()); +const eventualReplyDeliveredMock = vi.hoisted(() => vi.fn()); vi.mock("./message-handler.preflight.js", () => ({ preflightDiscordMessage: preflightDiscordMessageMock, @@ -26,7 +27,7 @@ function createDeferred() { function createHandlerParams(overrides?: { setStatus?: (patch: Record) => void; abortSignal?: AbortSignal; - listenerTimeoutMs?: number; + workerRunTimeoutMs?: number; }) { const cfg: OpenClawConfig = { channels: { @@ -65,7 +66,7 @@ function createHandlerParams(overrides?: { threadBindings: createNoopThreadBindingManager("default"), setStatus: overrides?.setStatus, abortSignal: overrides?.abortSignal, - listenerTimeoutMs: overrides?.listenerTimeoutMs, + workerRunTimeoutMs: overrides?.workerRunTimeoutMs, }; } @@ -85,6 +86,19 @@ function createMessageData(messageId: string, channelId = "ch-1") { function createPreflightContext(channelId = "ch-1") { return { + data: { + channel_id: channelId, + message: { + id: `msg-${channelId}`, + channel_id: channelId, + attachments: [], + }, + }, + message: { + id: `msg-${channelId}`, + channel_id: channelId, + attachments: [], + }, route: { sessionKey: `agent:main:discord:channel:${channelId}`, }, @@ -169,7 +183,7 @@ describe("createDiscordMessageHandler queue behavior", () => { }); }); - it("applies listener timeout to queued runs so stalled runs do not block the queue", async () => { + it("applies explicit inbound worker timeout to queued runs so stalled runs do not block the queue", async () => { vi.useFakeTimers(); try { preflightDiscordMessageMock.mockReset(); @@ -191,7 +205,7 @@ describe("createDiscordMessageHandler queue behavior", () => { createPreflightContext(params.data.channel_id), ); - const params = createHandlerParams({ listenerTimeoutMs: 50 }); + const params = createHandlerParams({ workerRunTimeoutMs: 50 }); const handler = createDiscordMessageHandler(params); await expect( @@ -211,7 +225,50 @@ describe("createDiscordMessageHandler queue behavior", () => { | undefined; expect(firstCtx?.abortSignal?.aborted).toBe(true); expect(params.runtime.error).toHaveBeenCalledWith( - expect.stringContaining("discord queued run timed out after"), + expect.stringContaining("discord inbound worker timed out after"), + ); + } finally { + vi.useRealTimers(); + } + }); + + it("does not time out queued runs when the inbound worker timeout is disabled", async () => { + vi.useFakeTimers(); + try { + preflightDiscordMessageMock.mockReset(); + processDiscordMessageMock.mockReset(); + eventualReplyDeliveredMock.mockReset(); + + processDiscordMessageMock.mockImplementationOnce( + async (ctx: { abortSignal?: AbortSignal }) => { + await new Promise((resolve) => { + setTimeout(() => { + if (!ctx.abortSignal?.aborted) { + eventualReplyDeliveredMock(); + } + resolve(); + }, 80); + }); + }, + ); + preflightDiscordMessageMock.mockImplementation( + async (params: { data: { channel_id: string } }) => + createPreflightContext(params.data.channel_id), + ); + + const params = createHandlerParams({ workerRunTimeoutMs: 0 }); + const handler = createDiscordMessageHandler(params); + + await expect( + handler(createMessageData("m-1") as never, {} as never), + ).resolves.toBeUndefined(); + + await vi.advanceTimersByTimeAsync(80); + await Promise.resolve(); + + expect(eventualReplyDeliveredMock).toHaveBeenCalledTimes(1); + expect(params.runtime.error).not.toHaveBeenCalledWith( + expect.stringContaining("discord inbound worker timed out after"), ); } finally { vi.useRealTimers(); diff --git a/src/discord/monitor/message-handler.ts b/src/discord/monitor/message-handler.ts index 2d8a245c328..02a65041983 100644 --- a/src/discord/monitor/message-handler.ts +++ b/src/discord/monitor/message-handler.ts @@ -3,18 +3,13 @@ import { createChannelInboundDebouncer, shouldDebounceTextInbound, } from "../../channels/inbound-debounce-policy.js"; -import { createRunStateMachine } from "../../channels/run-state-machine.js"; import { resolveOpenProviderRuntimeGroupPolicy } from "../../config/runtime-group-policy.js"; import { danger } from "../../globals.js"; -import { formatDurationSeconds } from "../../infra/format-time/format-duration.ts"; -import { KeyedAsyncQueue } from "../../plugin-sdk/keyed-async-queue.js"; +import { buildDiscordInboundJob } from "./inbound-job.js"; +import { createDiscordInboundWorker } from "./inbound-worker.js"; import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; import { preflightDiscordMessage } from "./message-handler.preflight.js"; -import type { - DiscordMessagePreflightContext, - DiscordMessagePreflightParams, -} from "./message-handler.preflight.types.js"; -import { processDiscordMessage } from "./message-handler.process.js"; +import type { DiscordMessagePreflightParams } from "./message-handler.preflight.types.js"; import { hasDiscordMessageStickers, resolveDiscordMessageChannelId, @@ -28,154 +23,13 @@ type DiscordMessageHandlerParams = Omit< > & { setStatus?: DiscordMonitorStatusSink; abortSignal?: AbortSignal; - listenerTimeoutMs?: number; + workerRunTimeoutMs?: number; }; export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & { deactivate: () => void; }; -const DEFAULT_DISCORD_RUN_TIMEOUT_MS = 120_000; -const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647; - -function normalizeDiscordRunTimeoutMs(timeoutMs?: number): number { - if (typeof timeoutMs !== "number" || !Number.isFinite(timeoutMs) || timeoutMs <= 0) { - return DEFAULT_DISCORD_RUN_TIMEOUT_MS; - } - return Math.max(1, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS)); -} - -function isAbortError(error: unknown): boolean { - if (typeof error !== "object" || error === null) { - return false; - } - return "name" in error && String((error as { name?: unknown }).name) === "AbortError"; -} - -function formatDiscordRunContextSuffix(ctx: DiscordMessagePreflightContext): string { - const eventData = ctx as { - data?: { - channel_id?: string; - message?: { - id?: string; - }; - }; - }; - const channelId = ctx.messageChannelId?.trim() || eventData.data?.channel_id?.trim(); - const messageId = eventData.data?.message?.id?.trim(); - const details = [ - channelId ? `channelId=${channelId}` : null, - messageId ? `messageId=${messageId}` : null, - ].filter((entry): entry is string => Boolean(entry)); - if (details.length === 0) { - return ""; - } - return ` (${details.join(", ")})`; -} - -function mergeAbortSignals(signals: Array): AbortSignal | undefined { - const activeSignals = signals.filter((signal): signal is AbortSignal => Boolean(signal)); - if (activeSignals.length === 0) { - return undefined; - } - if (activeSignals.length === 1) { - return activeSignals[0]; - } - if (typeof AbortSignal.any === "function") { - return AbortSignal.any(activeSignals); - } - const fallbackController = new AbortController(); - for (const signal of activeSignals) { - if (signal.aborted) { - fallbackController.abort(); - return fallbackController.signal; - } - } - const abortFallback = () => { - fallbackController.abort(); - for (const signal of activeSignals) { - signal.removeEventListener("abort", abortFallback); - } - }; - for (const signal of activeSignals) { - signal.addEventListener("abort", abortFallback, { once: true }); - } - return fallbackController.signal; -} - -async function processDiscordRunWithTimeout(params: { - ctx: DiscordMessagePreflightContext; - runtime: DiscordMessagePreflightParams["runtime"]; - lifecycleSignal?: AbortSignal; - timeoutMs?: number; -}) { - const timeoutMs = normalizeDiscordRunTimeoutMs(params.timeoutMs); - const timeoutAbortController = new AbortController(); - const combinedSignal = mergeAbortSignals([ - params.ctx.abortSignal, - params.lifecycleSignal, - timeoutAbortController.signal, - ]); - const processCtx = - combinedSignal && combinedSignal !== params.ctx.abortSignal - ? { ...params.ctx, abortSignal: combinedSignal } - : params.ctx; - const contextSuffix = formatDiscordRunContextSuffix(params.ctx); - let timedOut = false; - let timeoutHandle: ReturnType | null = null; - const processPromise = processDiscordMessage(processCtx).catch((error) => { - if (timedOut) { - if (timeoutAbortController.signal.aborted && isAbortError(error)) { - return; - } - params.runtime.error?.( - danger(`discord queued run failed after timeout: ${String(error)}${contextSuffix}`), - ); - return; - } - throw error; - }); - - try { - const timeoutPromise = new Promise<"timeout">((resolve) => { - timeoutHandle = setTimeout(() => resolve("timeout"), timeoutMs); - timeoutHandle.unref?.(); - }); - const result = await Promise.race([ - processPromise.then(() => "completed" as const), - timeoutPromise, - ]); - if (result === "timeout") { - timedOut = true; - timeoutAbortController.abort(); - params.runtime.error?.( - danger( - `discord queued run timed out after ${formatDurationSeconds(timeoutMs, { - decimals: 1, - unit: "seconds", - })}${contextSuffix}`, - ), - ); - } - } finally { - if (timeoutHandle) { - clearTimeout(timeoutHandle); - } - } -} - -function resolveDiscordRunQueueKey(ctx: DiscordMessagePreflightContext): string { - const sessionKey = ctx.route.sessionKey?.trim(); - if (sessionKey) { - return sessionKey; - } - const baseSessionKey = ctx.baseSessionKey?.trim(); - if (baseSessionKey) { - return baseSessionKey; - } - return ctx.messageChannelId; -} - export function createDiscordMessageHandler( params: DiscordMessageHandlerParams, ): DiscordMessageHandlerWithLifecycle { @@ -188,39 +42,13 @@ export function createDiscordMessageHandler( params.discordConfig?.ackReactionScope ?? params.cfg.messages?.ackReactionScope ?? "group-mentions"; - const runQueue = new KeyedAsyncQueue(); - const runState = createRunStateMachine({ + const inboundWorker = createDiscordInboundWorker({ + runtime: params.runtime, setStatus: params.setStatus, abortSignal: params.abortSignal, + runTimeoutMs: params.workerRunTimeoutMs, }); - const enqueueDiscordRun = (ctx: DiscordMessagePreflightContext) => { - const queueKey = resolveDiscordRunQueueKey(ctx); - void runQueue - .enqueue(queueKey, async () => { - if (!runState.isActive()) { - return; - } - runState.onRunStart(); - try { - if (!runState.isActive()) { - return; - } - await processDiscordRunWithTimeout({ - ctx, - runtime: params.runtime, - lifecycleSignal: params.abortSignal, - timeoutMs: params.listenerTimeoutMs, - }); - } finally { - runState.onRunEnd(); - } - }) - .catch((err) => { - params.runtime.error?.(danger(`discord process failed: ${String(err)}`)); - }); - }; - const { debouncer } = createChannelInboundDebouncer<{ data: DiscordMessageEvent; client: Client; @@ -279,7 +107,7 @@ export function createDiscordMessageHandler( if (!ctx) { return; } - enqueueDiscordRun(ctx); + inboundWorker.enqueue(buildDiscordInboundJob(ctx)); return; } const combinedBaseText = entries @@ -324,7 +152,7 @@ export function createDiscordMessageHandler( ctxBatch.MessageSidLast = ids[ids.length - 1]; } } - enqueueDiscordRun(ctx); + inboundWorker.enqueue(buildDiscordInboundJob(ctx)); }, onError: (err) => { params.runtime.error?.(danger(`discord debounce flush failed: ${String(err)}`)); @@ -352,7 +180,7 @@ export function createDiscordMessageHandler( } }; - handler.deactivate = runState.deactivate; + handler.deactivate = inboundWorker.deactivate; return handler; } diff --git a/src/discord/monitor/provider.test.ts b/src/discord/monitor/provider.test.ts index e3bc0ca36c1..3a52f1eb989 100644 --- a/src/discord/monitor/provider.test.ts +++ b/src/discord/monitor/provider.test.ts @@ -22,6 +22,7 @@ const { clientConstructorOptionsMock, createDiscordAutoPresenceControllerMock, createDiscordNativeCommandMock, + createDiscordMessageHandlerMock, createNoopThreadBindingManagerMock, createThreadBindingManagerMock, reconcileAcpThreadBindingsOnStartupMock, @@ -49,6 +50,14 @@ const { clientFetchUserMock: vi.fn(async (_target: string) => ({ id: "bot-1" })), clientGetPluginMock: vi.fn<(_name: string) => unknown>(() => undefined), createDiscordNativeCommandMock: vi.fn(() => ({ name: "mock-command" })), + createDiscordMessageHandlerMock: vi.fn(() => + Object.assign( + vi.fn(async () => undefined), + { + deactivate: vi.fn(), + }, + ), + ), createNoopThreadBindingManagerMock: vi.fn(() => { const manager = { stop: vi.fn() }; createdBindingManagers.push(manager); @@ -248,7 +257,7 @@ vi.mock("./listeners.js", () => ({ })); vi.mock("./message-handler.js", () => ({ - createDiscordMessageHandler: () => ({ handle: vi.fn() }), + createDiscordMessageHandler: createDiscordMessageHandlerMock, })); vi.mock("./native-command.js", () => ({ @@ -346,6 +355,14 @@ describe("monitorDiscordProvider", () => { refresh: vi.fn(), runNow: vi.fn(), })); + createDiscordMessageHandlerMock.mockClear().mockImplementation(() => + Object.assign( + vi.fn(async () => undefined), + { + deactivate: vi.fn(), + }, + ), + ); clientFetchUserMock.mockClear().mockResolvedValue({ id: "bot-1" }); clientGetPluginMock.mockClear().mockReturnValue(undefined); createDiscordNativeCommandMock.mockClear().mockReturnValue({ name: "mock-command" }); @@ -629,6 +646,63 @@ describe("monitorDiscordProvider", () => { expect(eventQueue?.listenerTimeout).toBe(300_000); }); + it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + + resolveDiscordAccountMock.mockImplementation(() => ({ + accountId: "default", + token: "cfg-token", + config: { + commands: { native: true, nativeSkills: false }, + voice: { enabled: false }, + agentComponents: { enabled: false }, + execApprovals: { enabled: false }, + eventQueue: { listenerTimeout: 50_000 }, + }, + })); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + expect(createDiscordMessageHandlerMock).toHaveBeenCalledTimes(1); + const firstCall = createDiscordMessageHandlerMock.mock.calls.at(0) as + | [{ workerRunTimeoutMs?: number; listenerTimeoutMs?: number }] + | undefined; + const params = firstCall?.[0]; + expect(params?.workerRunTimeoutMs).toBeUndefined(); + expect("listenerTimeoutMs" in (params ?? {})).toBe(false); + }); + + it("forwards inbound worker timeout config to the Discord message handler", async () => { + const { monitorDiscordProvider } = await import("./provider.js"); + + resolveDiscordAccountMock.mockImplementation(() => ({ + accountId: "default", + token: "cfg-token", + config: { + commands: { native: true, nativeSkills: false }, + voice: { enabled: false }, + agentComponents: { enabled: false }, + execApprovals: { enabled: false }, + inboundWorker: { runTimeoutMs: 300_000 }, + }, + })); + + await monitorDiscordProvider({ + config: baseConfig(), + runtime: baseRuntime(), + }); + + expect(createDiscordMessageHandlerMock).toHaveBeenCalledTimes(1); + const firstCall = createDiscordMessageHandlerMock.mock.calls.at(0) as + | [{ workerRunTimeoutMs?: number }] + | undefined; + const params = firstCall?.[0]; + expect(params?.workerRunTimeoutMs).toBe(300_000); + }); + it("registers plugin commands as native Discord commands", async () => { const { monitorDiscordProvider } = await import("./provider.js"); listNativeCommandSpecsForConfigMock.mockReturnValue([ diff --git a/src/discord/monitor/provider.ts b/src/discord/monitor/provider.ts index defa73d5262..fc24e6af1f5 100644 --- a/src/discord/monitor/provider.ts +++ b/src/discord/monitor/provider.ts @@ -600,8 +600,9 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { if (voiceEnabled) { clientPlugins.push(new VoicePlugin()); } - // Pass eventQueue config to Carbon so the listener timeout can be tuned. - // Default listenerTimeout is 120s (Carbon defaults to 30s which is too short for LLM calls). + // Pass eventQueue config to Carbon so the gateway listener budget can be tuned. + // Default listenerTimeout is 120s (Carbon defaults to 30s, which is too short for some + // Discord normalization/enqueue work). const eventQueueOpts = { listenerTimeout: 120_000, ...discordCfg.eventQueue, @@ -683,7 +684,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime, setStatus: opts.setStatus, abortSignal: opts.abortSignal, - listenerTimeoutMs: eventQueueOpts.listenerTimeout, + workerRunTimeoutMs: discordCfg.inboundWorker?.runTimeoutMs, botUserId, guildHistories, historyLimit, diff --git a/src/discord/monitor/timeouts.ts b/src/discord/monitor/timeouts.ts new file mode 100644 index 00000000000..2ca7f4625d4 --- /dev/null +++ b/src/discord/monitor/timeouts.ts @@ -0,0 +1,120 @@ +const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647; + +export const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000; +export const DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS = 30 * 60_000; + +function clampDiscordTimeoutMs(timeoutMs: number, minimumMs: number): number { + return Math.max(minimumMs, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS)); +} + +export function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number { + if (!Number.isFinite(raw) || (raw ?? 0) <= 0) { + return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS; + } + return clampDiscordTimeoutMs(raw!, 1_000); +} + +export function normalizeDiscordInboundWorkerTimeoutMs( + raw: number | undefined, +): number | undefined { + if (raw === 0) { + return undefined; + } + if (typeof raw !== "number" || !Number.isFinite(raw) || raw < 0) { + return DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS; + } + return clampDiscordTimeoutMs(raw, 1); +} + +export function isAbortError(error: unknown): boolean { + if (typeof error !== "object" || error === null) { + return false; + } + return "name" in error && String((error as { name?: unknown }).name) === "AbortError"; +} + +export function mergeAbortSignals( + signals: Array, +): AbortSignal | undefined { + const activeSignals = signals.filter((signal): signal is AbortSignal => Boolean(signal)); + if (activeSignals.length === 0) { + return undefined; + } + if (activeSignals.length === 1) { + return activeSignals[0]; + } + if (typeof AbortSignal.any === "function") { + return AbortSignal.any(activeSignals); + } + const fallbackController = new AbortController(); + for (const signal of activeSignals) { + if (signal.aborted) { + fallbackController.abort(); + return fallbackController.signal; + } + } + const abortFallback = () => { + fallbackController.abort(); + for (const signal of activeSignals) { + signal.removeEventListener("abort", abortFallback); + } + }; + for (const signal of activeSignals) { + signal.addEventListener("abort", abortFallback, { once: true }); + } + return fallbackController.signal; +} + +export async function runDiscordTaskWithTimeout(params: { + run: (abortSignal: AbortSignal | undefined) => Promise; + timeoutMs?: number; + abortSignals?: Array; + onTimeout: (timeoutMs: number) => void; + onAbortAfterTimeout?: () => void; + onErrorAfterTimeout?: (error: unknown) => void; +}): Promise { + const timeoutAbortController = params.timeoutMs ? new AbortController() : undefined; + const mergedAbortSignal = mergeAbortSignals([ + ...(params.abortSignals ?? []), + timeoutAbortController?.signal, + ]); + + let timedOut = false; + let timeoutHandle: ReturnType | null = null; + const runPromise = params.run(mergedAbortSignal).catch((error) => { + if (!timedOut) { + throw error; + } + if (timeoutAbortController?.signal.aborted && isAbortError(error)) { + params.onAbortAfterTimeout?.(); + return; + } + params.onErrorAfterTimeout?.(error); + }); + + try { + if (!params.timeoutMs) { + await runPromise; + return false; + } + const timeoutPromise = new Promise<"timeout">((resolve) => { + timeoutHandle = setTimeout(() => resolve("timeout"), params.timeoutMs); + timeoutHandle.unref?.(); + }); + const result = await Promise.race([ + runPromise.then(() => "completed" as const), + timeoutPromise, + ]); + if (result === "timeout") { + timedOut = true; + timeoutAbortController?.abort(); + params.onTimeout(params.timeoutMs); + return true; + } + return false; + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } +}