diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 44660d2373f..6112e2eaf2d 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -8903fd57a58acb9a7c949efc6b4197b249220dcd965420ceb7d884cb45fbc48d config-baseline.json -86ad0927d992bc873affb3e20a31c6e3c95b2185a91f46cc8e6262a723a78f7d config-baseline.core.json -bb7234c52b0bbf12de2a87fa553ec4e89e13aaba9d0d81cf1370621292da13e9 config-baseline.channel.json -1f5592bfd141ba1e982ce31763a253c10afb080ab4ea2b6538299b114e29cee1 config-baseline.plugin.json +ab654d17b4d3520c81de45dbcf96a8ecef35254cfd6df21af170dd2ebe550799 config-baseline.json +8bc9fda7c1096472beaa416a61043ce51d691d4dcad9ed3e0be46e68bb70b0ce config-baseline.core.json +56db8ae09c5573a453b8fb01ac579c5b9d8a69fa3fffff2ba2956e5e2ccb2f99 config-baseline.channel.json +0dd6583fafae6c9134e46c4cf9bddee9822d6436436dcb1a6dcba6d012962e51 config-baseline.plugin.json diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index dc4ecfcac87..be24b66064b 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -c65b1aa1fb4cf402b90bedd3614eb5d7c3903ab860856392d1ee2481818a7a22 plugin-sdk-api-baseline.json -da172742470204044c1542a3bba7f183161e90e742f6865c1c7f822dbdc7a7d6 plugin-sdk-api-baseline.jsonl +427eb476f48ad368fd7061297727a7634be75612aedef5de91e351ac446553ce plugin-sdk-api-baseline.json +6065b278792b4664d31c07ec46f852c3d99c8882adb4b37db3d4f2fe78a74af8 plugin-sdk-api-baseline.jsonl diff --git a/docs/channels/discord.md b/docs/channels/discord.md index 7381b969ccd..7c4de9bf8a4 100644 --- a/docs/channels/discord.md +++ b/docs/channels/discord.md @@ -1090,26 +1090,26 @@ openclaw logs --follow - + Typical logs: - - `Listener DiscordMessageListener timed out after 30000ms for event MESSAGE_CREATE` - `Slow listener detected ...` - - `discord inbound worker timed out after ...` + - `stuck session: sessionKey=agent:...:discord:... state=processing ...` - Listener budget knob: + Carbon gateway queue knobs: - single-account: `channels.discord.eventQueue.listenerTimeout` - multi-account: `channels.discord.accounts..eventQueue.listenerTimeout` + - this only controls Carbon gateway listener work, not agent turn lifetime - Worker run timeout knob: + Discord does not apply a channel-owned timeout to queued agent turns. Message listeners hand off immediately, and queued Discord runs preserve per-session ordering until the session/tool/runtime lifecycle completes or aborts the work. - - single-account: `channels.discord.inboundWorker.runTimeoutMs` - - multi-account: `channels.discord.accounts..inboundWorker.runTimeoutMs` - - default: `1800000` (30 minutes); set `0` to disable + Deprecated compatibility setting: - Recommended baseline: + - `channels.discord.inboundWorker.runTimeoutMs` + - `channels.discord.accounts..inboundWorker.runTimeoutMs` + - ignored by current Discord message handling ```json5 { @@ -1120,9 +1120,6 @@ openclaw logs --follow eventQueue: { listenerTimeout: 120000, }, - inboundWorker: { - runTimeoutMs: 1800000, - }, }, }, }, @@ -1130,9 +1127,6 @@ openclaw logs --follow } ``` - Use `eventQueue.listenerTimeout` for slow listener setup and `inboundWorker.runTimeoutMs` - only if you want a separate safety valve for queued agent turns. - @@ -1193,7 +1187,7 @@ Primary reference: [Configuration reference - Discord](/gateway/config-channels# - policy: `groupPolicy`, `dm.*`, `guilds.*`, `guilds.*.channels.*` - command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*` - event queue: `eventQueue.listenerTimeout` (listener budget), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency` -- inbound worker: `inboundWorker.runTimeoutMs` +- deprecated compatibility: `inboundWorker.runTimeoutMs` (ignored) - gateway metadata: `gatewayInfoTimeoutMs` - reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit` - delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage` diff --git a/extensions/discord/api.ts b/extensions/discord/api.ts index 26f43fe3fc7..698eceacaea 100644 --- a/extensions/discord/api.ts +++ b/extensions/discord/api.ts @@ -111,6 +111,8 @@ export { export { collectDiscordSecurityAuditFindings } from "./src/security-audit.js"; export { resolveDiscordRuntimeGroupPolicy } from "./src/runtime-group-policy.js"; export { + DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, + DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, } from "./src/monitor/timeouts.js"; diff --git a/extensions/discord/runtime-api.ts b/extensions/discord/runtime-api.ts index 775127a620c..9d7a9aa4c14 100644 --- a/extensions/discord/runtime-api.ts +++ b/extensions/discord/runtime-api.ts @@ -96,11 +96,7 @@ export { DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, - isAbortError, mergeAbortSignals, - normalizeDiscordInboundWorkerTimeoutMs, - normalizeDiscordListenerTimeoutMs, - runDiscordTaskWithTimeout, } from "./src/monitor/timeouts.js"; export { fetchDiscordApplicationId, diff --git a/extensions/discord/src/config-ui-hints.ts b/extensions/discord/src/config-ui-hints.ts index 410478e1e36..68b8902ce90 100644 --- a/extensions/discord/src/config-ui-hints.ts +++ b/extensions/discord/src/config-ui-hints.ts @@ -90,8 +90,8 @@ export const discordChannelConfigUiHints = { help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).", }, "inboundWorker.runTimeoutMs": { - label: "Discord Inbound Worker Timeout (ms)", - help: "Optional queued Discord inbound worker timeout in ms. This is separate from Carbon listener timeouts; defaults to 1800000 and can be disabled with 0. Set per account via channels.discord.accounts..inboundWorker.runTimeoutMs.", + label: "Deprecated Discord Inbound Worker Timeout", + help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.", }, "eventQueue.listenerTimeout": { label: "Discord EventQueue Listener Timeout (ms)", diff --git a/extensions/discord/src/monitor.test.ts b/extensions/discord/src/monitor.test.ts index e58eda2f55c..9b93dab6523 100644 --- a/extensions/discord/src/monitor.test.ts +++ b/extensions/discord/src/monitor.test.ts @@ -192,7 +192,7 @@ describe("DiscordMessageListener", () => { expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("discord handler failed")); }); - it("does not apply its own slow-listener logging (owned by inbound worker)", async () => { + it("does not apply its own slow-listener logging", async () => { const deferred = createDeferred(); const handler = vi.fn(() => deferred.promise); const logger = { @@ -212,8 +212,7 @@ describe("DiscordMessageListener", () => { deferred.resolve(); await flushAsyncWork(); expect(handler).toHaveBeenCalledOnce(); - // The listener no longer wraps handlers with slow-listener logging; - // that responsibility moved to the inbound worker. + // The listener no longer wraps message handlers with slow-listener logging. expect(logger.warn).not.toHaveBeenCalled(); }); }); diff --git a/extensions/discord/src/monitor/inbound-worker.ts b/extensions/discord/src/monitor/inbound-worker.ts deleted file mode 100644 index 03be2c234ea..00000000000 --- a/extensions/discord/src/monitor/inbound-worker.ts +++ /dev/null @@ -1,247 +0,0 @@ -import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle"; -import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; -import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; -import { danger, formatDurationSeconds } from "openclaw/plugin-sdk/runtime-env"; -import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; -import { - commitDiscordInboundReplay, - createDiscordInboundReplayGuard, - DiscordRetryableInboundError, - releaseDiscordInboundReplay, -} from "./inbound-dedupe.js"; -import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js"; -import type { RuntimeEnv } from "./message-handler.preflight.types.js"; -import type { DiscordMonitorStatusSink } from "./status.js"; -import { resolveDiscordReplyDeliveryPlan } from "./threading.js"; -import { normalizeDiscordInboundWorkerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js"; - -type ProcessDiscordMessage = typeof import("./message-handler.process.js").processDiscordMessage; -type DeliverDiscordReply = typeof import("./reply-delivery.js").deliverDiscordReply; - -type DiscordInboundWorkerParams = { - runtime: RuntimeEnv; - setStatus?: DiscordMonitorStatusSink; - abortSignal?: AbortSignal; - runTimeoutMs?: number; - replayGuard?: ClaimableDedupe; - __testing?: DiscordInboundWorkerTestingHooks; -}; - -export type DiscordInboundWorker = { - enqueue: (job: DiscordInboundJob) => void; - deactivate: () => void; -}; - -export type DiscordInboundWorkerTestingHooks = { - processDiscordMessage?: ProcessDiscordMessage; - deliverDiscordReply?: DeliverDiscordReply; -}; - -let messageProcessRuntimePromise: - | Promise - | undefined; -let replyDeliveryRuntimePromise: Promise | undefined; - -async function loadMessageProcessRuntime() { - messageProcessRuntimePromise ??= import("./message-handler.process.js"); - return await messageProcessRuntimePromise; -} - -async function loadReplyDeliveryRuntime() { - replyDeliveryRuntimePromise ??= import("./reply-delivery.js"); - return await replyDeliveryRuntimePromise; -} - -function formatDiscordRunContextSuffix(job: DiscordInboundJob): string { - const channelId = job.payload.messageChannelId?.trim(); - const messageId = job.payload.data?.message?.id?.trim(); - const details = [ - channelId ? `channelId=${channelId}` : null, - messageId ? `messageId=${messageId}` : null, - ].filter((entry): entry is string => Boolean(entry)); - if (details.length === 0) { - return ""; - } - return ` (${details.join(", ")})`; -} - -async function processDiscordInboundJob(params: { - job: DiscordInboundJob; - runtime: RuntimeEnv; - lifecycleSignal?: AbortSignal; - runTimeoutMs?: number; - replayGuard: ClaimableDedupe; - testing?: DiscordInboundWorkerTestingHooks; -}) { - const timeoutMs = normalizeDiscordInboundWorkerTimeoutMs(params.runTimeoutMs); - const contextSuffix = formatDiscordRunContextSuffix(params.job); - let finalReplyStarted = false; - let createdThreadId: string | undefined; - let sessionKey: string | undefined; - const processDiscordMessageImpl = - params.testing?.processDiscordMessage ?? - (await loadMessageProcessRuntime()).processDiscordMessage; - try { - await runDiscordTaskWithTimeout({ - run: async (abortSignal) => { - await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal), { - onFinalReplyStart: () => { - finalReplyStarted = true; - }, - onFinalReplyDelivered: () => { - finalReplyStarted = true; - }, - onReplyPlanResolved: (resolved) => { - createdThreadId = normalizeOptionalString(resolved.createdThreadId); - sessionKey = normalizeOptionalString(resolved.sessionKey); - }, - }); - }, - timeoutMs, - abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal], - onTimeout: async (resolvedTimeoutMs) => { - params.runtime.error?.( - danger( - `discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, { - decimals: 1, - unit: "seconds", - })}${contextSuffix}`, - ), - ); - if (finalReplyStarted) { - return; - } - await sendDiscordInboundWorkerTimeoutReply({ - job: params.job, - runtime: params.runtime, - contextSuffix, - createdThreadId, - sessionKey, - deliverDiscordReplyImpl: params.testing?.deliverDiscordReply, - }); - }, - onErrorAfterTimeout: (error) => { - params.runtime.error?.( - danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`), - ); - }, - }); - await commitDiscordInboundReplay({ - replayKeys: params.job.replayKeys, - replayGuard: params.replayGuard, - }); - } catch (error) { - if (error instanceof DiscordRetryableInboundError) { - releaseDiscordInboundReplay({ - replayKeys: params.job.replayKeys, - error, - replayGuard: params.replayGuard, - }); - } else { - await commitDiscordInboundReplay({ - replayKeys: params.job.replayKeys, - replayGuard: params.replayGuard, - }); - } - throw error; - } -} - -async function sendDiscordInboundWorkerTimeoutReply(params: { - job: DiscordInboundJob; - runtime: RuntimeEnv; - contextSuffix: string; - createdThreadId?: string; - sessionKey?: string; - deliverDiscordReplyImpl?: DeliverDiscordReply; -}) { - const messageChannelId = params.job.payload.messageChannelId?.trim(); - const messageId = params.job.payload.message?.id?.trim(); - const token = params.job.payload.token?.trim(); - if (!messageChannelId || !messageId || !token) { - params.runtime.error?.( - danger( - `discord inbound worker timeout reply skipped: missing reply target${params.contextSuffix}`, - ), - ); - return; - } - - const deliveryPlan = resolveDiscordReplyDeliveryPlan({ - replyTarget: `channel:${params.job.payload.threadChannel?.id ?? messageChannelId}`, - replyToMode: params.job.payload.replyToMode, - messageId, - threadChannel: params.job.payload.threadChannel, - createdThreadId: params.createdThreadId, - }); - - try { - const deliverDiscordReplyImpl = - params.deliverDiscordReplyImpl ?? (await loadReplyDeliveryRuntime()).deliverDiscordReply; - await deliverDiscordReplyImpl({ - cfg: params.job.payload.cfg, - replies: [{ text: "Discord inbound worker timed out.", isError: true }], - target: deliveryPlan.deliverTarget, - token, - accountId: params.job.payload.accountId, - runtime: params.runtime, - textLimit: params.job.payload.textLimit, - maxLinesPerMessage: params.job.payload.discordConfig?.maxLinesPerMessage, - replyToId: deliveryPlan.replyReference.use(), - replyToMode: params.job.payload.replyToMode, - sessionKey: - params.sessionKey ?? - params.job.payload.route.sessionKey ?? - params.job.payload.baseSessionKey, - threadBindings: params.job.runtime.threadBindings, - }); - } catch (error) { - params.runtime.error?.( - danger( - `discord inbound worker timeout reply failed: ${String(error)}${params.contextSuffix}`, - ), - ); - } -} - -export function createDiscordInboundWorker( - params: DiscordInboundWorkerParams, -): DiscordInboundWorker { - const runQueue = new KeyedAsyncQueue(); - const runState = createRunStateMachine({ - setStatus: params.setStatus, - abortSignal: params.abortSignal, - }); - const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard(); - - return { - enqueue(job) { - void runQueue - .enqueue(job.queueKey, async () => { - if (!runState.isActive()) { - return; - } - runState.onRunStart(); - try { - if (!runState.isActive()) { - return; - } - await processDiscordInboundJob({ - job, - runtime: params.runtime, - lifecycleSignal: params.abortSignal, - runTimeoutMs: params.runTimeoutMs, - replayGuard, - testing: params.__testing, - }); - } finally { - runState.onRunEnd(); - } - }) - .catch((error) => { - params.runtime.error?.(danger(`discord inbound worker failed: ${String(error)}`)); - }); - }, - deactivate: runState.deactivate, - }; -} diff --git a/extensions/discord/src/monitor/listeners.ts b/extensions/discord/src/monitor/listeners.ts index b26b7e53526..950bfe2f6d2 100644 --- a/extensions/discord/src/monitor/listeners.ts +++ b/extensions/discord/src/monitor/listeners.ts @@ -38,7 +38,6 @@ import { setPresence } from "./presence-cache.js"; import { isThreadArchived } from "./thread-bindings.discord-api.js"; import { resolveFetchedDiscordThreadLikeChannelContext } from "./thread-channel-context.js"; import { closeDiscordThreadSessions } from "./thread-session-close.js"; -import { normalizeDiscordListenerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js"; type LoadedConfig = OpenClawConfig; type RuntimeEnv = import("openclaw/plugin-sdk/runtime-env").RuntimeEnv; @@ -141,46 +140,13 @@ async function runDiscordListenerWithSlowLog(params: { logger: Logger | undefined; listener: string; event: string; - run: (abortSignal: AbortSignal | undefined) => Promise; - timeoutMs?: number; + run: () => Promise; context?: Record; onError?: (err: unknown) => void; }) { const startedAt = Date.now(); - const timeoutMs = normalizeDiscordListenerTimeoutMs(params.timeoutMs); - const logger = params.logger ?? discordEventQueueLog; - let timedOut = false; - try { - timedOut = await runDiscordTaskWithTimeout({ - run: params.run, - timeoutMs, - onTimeout: (resolvedTimeoutMs) => { - logger.error( - danger( - `discord handler timed out after ${formatDurationSeconds(resolvedTimeoutMs, { - decimals: 1, - unit: "seconds", - })}${formatListenerContextSuffix(params.context)}`, - ), - ); - }, - onAbortAfterTimeout: () => { - logger.warn( - `discord handler canceled after timeout${formatListenerContextSuffix(params.context)}`, - ); - }, - onErrorAfterTimeout: (err) => { - logger.error( - danger( - `discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`, - ), - ); - }, - }); - if (timedOut) { - return; - } + await params.run(); } catch (err) { if (params.onError) { params.onError(err); @@ -188,15 +154,13 @@ async function runDiscordListenerWithSlowLog(params: { } throw err; } finally { - if (!timedOut) { - logSlowDiscordListener({ - logger: params.logger, - listener: params.listener, - event: params.event, - durationMs: Date.now() - startedAt, - context: params.context, - }); - } + logSlowDiscordListener({ + logger: params.logger, + listener: params.listener, + event: params.event, + durationMs: Date.now() - startedAt, + context: params.context, + }); } } @@ -213,7 +177,6 @@ export class DiscordMessageListener extends MessageCreateListener { private handler: DiscordMessageHandler, private logger?: Logger, private onEvent?: () => void, - _options?: { timeoutMs?: number }, ) { super(); } @@ -221,9 +184,8 @@ export class DiscordMessageListener extends MessageCreateListener { async handle(data: DiscordMessageEvent, client: Client) { this.onEvent?.(); // Fire-and-forget: hand off to the handler without blocking the - // Carbon listener. Per-session ordering and run timeouts are owned - // by the inbound worker queue, so the listener no longer serializes - // or applies its own timeout. + // Carbon listener. Per-session ordering is owned by the message run queue, + // so the listener no longer serializes or applies its own timeout. void Promise.resolve() .then(() => this.handler(data, client)) .catch((err) => { diff --git a/extensions/discord/src/monitor/message-handler.module-test-helpers.ts b/extensions/discord/src/monitor/message-handler.module-test-helpers.ts index 499bc5d4efc..74e2ae1acab 100644 --- a/extensions/discord/src/monitor/message-handler.module-test-helpers.ts +++ b/extensions/discord/src/monitor/message-handler.module-test-helpers.ts @@ -1,10 +1,9 @@ import type { MockFn } from "openclaw/plugin-sdk/plugin-test-runtime"; import { vi } from "vitest"; -import type { DiscordInboundWorkerTestingHooks } from "./inbound-worker.js"; +import type { DiscordMessageRunQueueTestingHooks } from "./message-run-queue.js"; export const preflightDiscordMessageMock: MockFn = vi.fn(); export const processDiscordMessageMock: MockFn = vi.fn(); -export const deliverDiscordReplyMock: MockFn = vi.fn(async () => undefined); const { createDiscordMessageHandler: createRealDiscordMessageHandler } = await import("./message-handler.js"); @@ -14,9 +13,8 @@ type PreflightDiscordMessageHook = NonNullable< DiscordMessageHandlerTestingHooks["preflightDiscordMessage"] >; type ProcessDiscordMessageHook = NonNullable< - DiscordInboundWorkerTestingHooks["processDiscordMessage"] + DiscordMessageRunQueueTestingHooks["processDiscordMessage"] >; -type DeliverDiscordReplyHook = NonNullable; export function createDiscordMessageHandler( ...args: Parameters @@ -28,7 +26,6 @@ export function createDiscordMessageHandler( ...params.__testing, preflightDiscordMessage: preflightDiscordMessageMock as PreflightDiscordMessageHook, processDiscordMessage: processDiscordMessageMock as ProcessDiscordMessageHook, - deliverDiscordReply: deliverDiscordReplyMock as DeliverDiscordReplyHook, }, }); } diff --git a/extensions/discord/src/monitor/message-handler.queue.test.ts b/extensions/discord/src/monitor/message-handler.queue.test.ts index 4f6e141a259..8bd1bb2fe18 100644 --- a/extensions/discord/src/monitor/message-handler.queue.test.ts +++ b/extensions/discord/src/monitor/message-handler.queue.test.ts @@ -4,14 +4,12 @@ import { createDiscordMessageHandler, preflightDiscordMessageMock, processDiscordMessageMock, - deliverDiscordReplyMock, } from "./message-handler.module-test-helpers.js"; import { createDiscordHandlerParams, createDiscordPreflightContext, } from "./message-handler.test-helpers.js"; -const eventualReplyDeliveredMock = vi.hoisted(() => vi.fn()); type SetStatusFn = (patch: Record) => void; function createDeferred() { let resolve: (value: T | PromiseLike) => void = () => {}; @@ -56,10 +54,7 @@ function createPreflightContext(channelId = "ch-1") { }; } -function createHandlerWithDefaultPreflight(overrides?: { - setStatus?: SetStatusFn; - workerRunTimeoutMs?: number; -}) { +function createHandlerWithDefaultPreflight(overrides?: { setStatus?: SetStatusFn }) { preflightDiscordMessageMock.mockImplementation(async (params: { data: { channel_id: string } }) => createPreflightContext(params.data.channel_id), ); @@ -72,69 +67,6 @@ function installDefaultDiscordPreflight() { ); } -function createAbortOnTimeoutProcessImplementation() { - return async (ctx: { abortSignal?: AbortSignal }) => { - await new Promise((resolve) => { - if (ctx.abortSignal?.aborted) { - resolve(); - return; - } - ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); - }); - }; -} - -async function queueTimedMessages(params?: { - workerRunTimeoutMs?: number; - beforeCreateHandler?: () => void; -}) { - preflightDiscordMessageMock.mockReset(); - processDiscordMessageMock.mockReset(); - deliverDiscordReplyMock.mockClear(); - - processDiscordMessageMock - .mockImplementationOnce(createAbortOnTimeoutProcessImplementation()) - .mockImplementationOnce(async () => undefined); - installDefaultDiscordPreflight(); - params?.beforeCreateHandler?.(); - - const handlerParams = createDiscordHandlerParams({ - workerRunTimeoutMs: params?.workerRunTimeoutMs ?? 50, - }); - const handler = createDiscordMessageHandler(handlerParams); - - await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); - await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined(); - - return { handlerParams }; -} - -async function runSingleMessageTimeout(params: { - processImpl: Parameters[0]; - workerRunTimeoutMs?: number; -}) { - preflightDiscordMessageMock.mockReset(); - processDiscordMessageMock.mockReset(); - deliverDiscordReplyMock.mockClear(); - processDiscordMessageMock.mockImplementationOnce(params.processImpl); - installDefaultDiscordPreflight(); - - const handlerParams = createDiscordHandlerParams({ - workerRunTimeoutMs: params.workerRunTimeoutMs ?? 50, - }); - const handler = createDiscordMessageHandler(handlerParams); - - await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined(); - await vi.advanceTimersByTimeAsync(60); - await Promise.resolve(); - - expect(handlerParams.runtime.error).toHaveBeenCalledWith( - expect.stringContaining("discord inbound worker timed out after"), - ); - - return handlerParams; -} - async function createLifecycleStopScenario(params: { createHandler: (status: SetStatusFn) => { handler: (data: never, opts: never) => Promise; @@ -269,9 +201,7 @@ describe("createDiscordMessageHandler queue behavior", () => { await flushQueueWork(); expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); expect(params.runtime.error).toHaveBeenCalledWith( - expect.stringContaining( - "discord inbound worker failed: DiscordRetryableInboundError: retry me", - ), + expect.stringContaining("discord message run failed: DiscordRetryableInboundError: retry me"), ); await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined(); @@ -298,7 +228,7 @@ describe("createDiscordMessageHandler queue behavior", () => { await flushQueueWork(); expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); expect(params.runtime.error).toHaveBeenCalledWith( - expect.stringContaining("discord inbound worker failed: Error: post-send failure"), + expect.stringContaining("discord message run failed: Error: post-send failure"), ); await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined(); @@ -309,226 +239,56 @@ describe("createDiscordMessageHandler queue behavior", () => { expect(visibleSideEffect).toHaveBeenCalledTimes(1); }); - it("applies explicit inbound worker timeout to queued runs so stalled runs do not block the queue", async () => { - vi.useFakeTimers(); - try { - const { handlerParams } = await queueTimedMessages(); - - await vi.advanceTimersByTimeAsync(60); - await flushQueueWork(); - expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); - - const firstCtx = processDiscordMessageMock.mock.calls[0]?.[0] as - | { abortSignal?: AbortSignal } - | undefined; - expect(firstCtx?.abortSignal?.aborted).toBe(true); - expect(handlerParams.runtime.error).toHaveBeenCalledWith( - expect.stringContaining("discord inbound worker timed out after"), - ); - expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1); - expect(deliverDiscordReplyMock).toHaveBeenCalledWith( - expect.objectContaining({ - target: "channel:ch-1", - token: "test-token", - replies: [ - expect.objectContaining({ - isError: true, - text: "Discord inbound worker timed out.", - }), - ], - }), - ); - } finally { - vi.useRealTimers(); - } - }); - - it("waits for the timeout fallback reply before starting the next queued run", async () => { - vi.useFakeTimers(); - try { - const deliverTimeoutReply = createDeferred(); - const { handlerParams } = await queueTimedMessages({ - beforeCreateHandler: () => { - deliverDiscordReplyMock.mockReset(); - deliverDiscordReplyMock.mockImplementationOnce(async () => { - await deliverTimeoutReply.promise; - }); - }, - }); - - await vi.advanceTimersByTimeAsync(60); - await flushQueueWork(); - expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1); - - expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); - expect(handlerParams.runtime.error).toHaveBeenCalledWith( - expect.stringContaining("discord inbound worker timed out after"), - ); - - deliverTimeoutReply.resolve(); - await deliverTimeoutReply.promise; - - await flushQueueWork(); - expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); - } finally { - vi.useRealTimers(); - } - }); - - it("does not send the timeout fallback when a final reply already went out", async () => { - vi.useFakeTimers(); - try { - await runSingleMessageTimeout({ - processImpl: async ( - ctx: { abortSignal?: AbortSignal }, - observer?: { onFinalReplyStart?: () => void; onFinalReplyDelivered?: () => void }, - ) => { - observer?.onFinalReplyStart?.(); - observer?.onFinalReplyDelivered?.(); - await new Promise((resolve) => { - if (ctx.abortSignal?.aborted) { - resolve(); - return; - } - ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); - }); - }, - }); - - expect(deliverDiscordReplyMock).not.toHaveBeenCalled(); - } finally { - vi.useRealTimers(); - } - }); - - it("routes the timeout fallback to the created auto-thread target", async () => { - vi.useFakeTimers(); - try { - await runSingleMessageTimeout({ - processImpl: async ( - ctx: { abortSignal?: AbortSignal }, - observer?: { - onReplyPlanResolved?: (params: { - createdThreadId?: string; - sessionKey?: string; - }) => void; - }, - ) => { - observer?.onReplyPlanResolved?.({ - createdThreadId: "thread-1", - sessionKey: "agent:main:discord:channel:thread-1", - }); - await new Promise((resolve) => { - if (ctx.abortSignal?.aborted) { - resolve(); - return; - } - ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true }); - }); - }, - }); - - expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1); - expect(deliverDiscordReplyMock).toHaveBeenCalledWith( - expect.objectContaining({ - target: "channel:thread-1", - sessionKey: "agent:main:discord:channel:thread-1", - replies: [ - expect.objectContaining({ - isError: true, - text: "Discord inbound worker timed out.", - }), - ], - }), - ); - } finally { - vi.useRealTimers(); - } - }); - - it("does not send the timeout fallback when final reply delivery is already in flight", async () => { + it("does not abort long queued runs with a Discord-owned channel timeout", async () => { vi.useFakeTimers(); try { preflightDiscordMessageMock.mockReset(); processDiscordMessageMock.mockReset(); - deliverDiscordReplyMock.mockClear(); - const finishFinalReply = createDeferred(); + const firstRun = createDeferred(); + const secondRun = createDeferred(); + const capturedAbortSignals: Array = []; processDiscordMessageMock.mockImplementationOnce( - async ( - _ctx: { abortSignal?: AbortSignal }, - observer?: { onFinalReplyStart?: () => void; onFinalReplyDelivered?: () => void }, - ) => { - observer?.onFinalReplyStart?.(); - await finishFinalReply.promise; - observer?.onFinalReplyDelivered?.(); + async (ctx: { abortSignal?: AbortSignal }) => { + capturedAbortSignals.push(ctx.abortSignal); + await firstRun.promise; }, ); - preflightDiscordMessageMock.mockImplementation( - async (params: { data: { channel_id: string } }) => - createPreflightContext(params.data.channel_id), + processDiscordMessageMock.mockImplementationOnce( + async (ctx: { abortSignal?: AbortSignal }) => { + capturedAbortSignals.push(ctx.abortSignal); + await secondRun.promise; + }, ); - - const params = createDiscordHandlerParams({ workerRunTimeoutMs: 50 }); + installDefaultDiscordPreflight(); + const params = createDiscordHandlerParams(); const handler = createDiscordMessageHandler(params); await expect( handler(createMessageData("m-1") as never, {} as never), ).resolves.toBeUndefined(); + await expect( + handler(createMessageData("m-2") as never, {} as never), + ).resolves.toBeUndefined(); await flushQueueWork(); expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); - await vi.advanceTimersByTimeAsync(60); - await Promise.resolve(); + await vi.advanceTimersByTimeAsync(60_000); + await flushQueueWork(); - expect(params.runtime.error).toHaveBeenCalledWith( - expect.stringContaining("discord inbound worker timed out after"), - ); - expect(deliverDiscordReplyMock).not.toHaveBeenCalled(); + expect(processDiscordMessageMock).toHaveBeenCalledTimes(1); + expect(capturedAbortSignals[0]?.aborted).not.toBe(true); + expect(params.runtime.error).not.toHaveBeenCalledWith(expect.stringContaining("timed out")); - finishFinalReply.resolve(); - await finishFinalReply.promise; - await Promise.resolve(); + firstRun.resolve(); + await firstRun.promise; + await flushQueueWork(); - expect(deliverDiscordReplyMock).not.toHaveBeenCalled(); - } finally { - vi.useRealTimers(); - } - }); + expect(processDiscordMessageMock).toHaveBeenCalledTimes(2); + expect(capturedAbortSignals[1]?.aborted).not.toBe(true); - it("does not time out queued runs when the inbound worker timeout is disabled", async () => { - vi.useFakeTimers(); - try { - preflightDiscordMessageMock.mockReset(); - processDiscordMessageMock.mockReset(); - eventualReplyDeliveredMock.mockReset(); - - processDiscordMessageMock.mockImplementationOnce( - async (ctx: { abortSignal?: AbortSignal }) => { - await new Promise((resolve) => { - setTimeout(() => { - if (!ctx.abortSignal?.aborted) { - eventualReplyDeliveredMock(); - } - resolve(); - }, 80); - }); - }, - ); - const params = createDiscordHandlerParams({ workerRunTimeoutMs: 0 }); - const handler = createHandlerWithDefaultPreflight({ workerRunTimeoutMs: 0 }); - - await expect( - handler(createMessageData("m-1") as never, {} as never), - ).resolves.toBeUndefined(); - - await vi.advanceTimersByTimeAsync(80); - await Promise.resolve(); - - expect(eventualReplyDeliveredMock).toHaveBeenCalledTimes(1); - expect(params.runtime.error).not.toHaveBeenCalledWith( - expect.stringContaining("discord inbound worker timed out after"), - ); + secondRun.resolve(); + await secondRun.promise; } finally { vi.useRealTimers(); } diff --git a/extensions/discord/src/monitor/message-handler.test-helpers.ts b/extensions/discord/src/monitor/message-handler.test-helpers.ts index 1c166c17d1e..fd52186b247 100644 --- a/extensions/discord/src/monitor/message-handler.test-helpers.ts +++ b/extensions/discord/src/monitor/message-handler.test-helpers.ts @@ -9,7 +9,6 @@ export function createDiscordHandlerParams(overrides?: { botUserId?: string; setStatus?: (patch: Record) => void; abortSignal?: AbortSignal; - workerRunTimeoutMs?: number; }): Parameters[0] { const cfg: OpenClawConfig = { channels: { @@ -48,7 +47,6 @@ export function createDiscordHandlerParams(overrides?: { threadBindings: createNoopThreadBindingManager("default"), setStatus: overrides?.setStatus, abortSignal: overrides?.abortSignal, - workerRunTimeoutMs: overrides?.workerRunTimeoutMs, }; } diff --git a/extensions/discord/src/monitor/message-handler.ts b/extensions/discord/src/monitor/message-handler.ts index b63cf304a01..1bf89ecbdf9 100644 --- a/extensions/discord/src/monitor/message-handler.ts +++ b/extensions/discord/src/monitor/message-handler.ts @@ -14,13 +14,13 @@ import { releaseDiscordInboundReplay, } from "./inbound-dedupe.js"; import { buildDiscordInboundJob } from "./inbound-job.js"; -import { - createDiscordInboundWorker, - type DiscordInboundWorkerTestingHooks, -} from "./inbound-worker.js"; import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js"; import { applyImplicitReplyBatchGate } from "./message-handler.batch-gate.js"; import type { DiscordMessagePreflightParams } from "./message-handler.preflight.types.js"; +import { + createDiscordMessageRunQueue, + type DiscordMessageRunQueueTestingHooks, +} from "./message-run-queue.js"; import { hasDiscordMessageStickers, resolveDiscordMessageChannelId, @@ -37,11 +37,10 @@ type DiscordMessageHandlerParams = Omit< > & { setStatus?: DiscordMonitorStatusSink; abortSignal?: AbortSignal; - workerRunTimeoutMs?: number; __testing?: DiscordMessageHandlerTestingHooks; }; -type DiscordMessageHandlerTestingHooks = DiscordInboundWorkerTestingHooks & { +type DiscordMessageHandlerTestingHooks = DiscordMessageRunQueueTestingHooks & { preflightDiscordMessage?: PreflightDiscordMessage; }; @@ -76,11 +75,10 @@ export function createDiscordMessageHandler( "group-mentions"; const preflightDiscordMessageImpl = params.__testing?.preflightDiscordMessage; const replayGuard = createDiscordInboundReplayGuard(); - const inboundWorker = createDiscordInboundWorker({ + const messageRunQueue = createDiscordMessageRunQueue({ runtime: params.runtime, setStatus: params.setStatus, abortSignal: params.abortSignal, - runTimeoutMs: params.workerRunTimeoutMs, replayGuard, __testing: params.__testing, }); @@ -156,7 +154,7 @@ export function createDiscordMessageHandler( return; } applyImplicitReplyBatchGate(ctx, params.replyToMode, false); - inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); + messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); return; } const combinedBaseText = entries @@ -209,7 +207,7 @@ export function createDiscordMessageHandler( ctxBatch.MessageSidLast = ids[ids.length - 1]; } } - inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); + messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys })); } catch (error) { if (error instanceof DiscordRetryableInboundError) { releaseDiscordInboundReplay({ replayKeys, error, replayGuard }); @@ -262,7 +260,7 @@ export function createDiscordMessageHandler( } }; - handler.deactivate = inboundWorker.deactivate; + handler.deactivate = messageRunQueue.deactivate; return handler; } diff --git a/extensions/discord/src/monitor/message-run-queue.ts b/extensions/discord/src/monitor/message-run-queue.ts new file mode 100644 index 00000000000..c44037e44e3 --- /dev/null +++ b/extensions/discord/src/monitor/message-run-queue.ts @@ -0,0 +1,115 @@ +import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle"; +import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; +import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; +import { danger } from "openclaw/plugin-sdk/runtime-env"; +import { + commitDiscordInboundReplay, + createDiscordInboundReplayGuard, + DiscordRetryableInboundError, + releaseDiscordInboundReplay, +} from "./inbound-dedupe.js"; +import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js"; +import type { RuntimeEnv } from "./message-handler.preflight.types.js"; +import type { DiscordMonitorStatusSink } from "./status.js"; +import { mergeAbortSignals } from "./timeouts.js"; + +type ProcessDiscordMessage = typeof import("./message-handler.process.js").processDiscordMessage; + +type DiscordMessageRunQueueParams = { + runtime: RuntimeEnv; + setStatus?: DiscordMonitorStatusSink; + abortSignal?: AbortSignal; + replayGuard?: ClaimableDedupe; + __testing?: DiscordMessageRunQueueTestingHooks; +}; + +export type DiscordMessageRunQueue = { + enqueue: (job: DiscordInboundJob) => void; + deactivate: () => void; +}; + +export type DiscordMessageRunQueueTestingHooks = { + processDiscordMessage?: ProcessDiscordMessage; +}; + +let messageProcessRuntimePromise: + | Promise + | undefined; + +async function loadMessageProcessRuntime() { + messageProcessRuntimePromise ??= import("./message-handler.process.js"); + return await messageProcessRuntimePromise; +} + +async function processDiscordQueuedMessage(params: { + job: DiscordInboundJob; + lifecycleSignal?: AbortSignal; + replayGuard: ClaimableDedupe; + testing?: DiscordMessageRunQueueTestingHooks; +}) { + const processDiscordMessageImpl = + params.testing?.processDiscordMessage ?? + (await loadMessageProcessRuntime()).processDiscordMessage; + const abortSignal = mergeAbortSignals([params.job.runtime.abortSignal, params.lifecycleSignal]); + try { + await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal)); + await commitDiscordInboundReplay({ + replayKeys: params.job.replayKeys, + replayGuard: params.replayGuard, + }); + } catch (error) { + if (error instanceof DiscordRetryableInboundError) { + releaseDiscordInboundReplay({ + replayKeys: params.job.replayKeys, + error, + replayGuard: params.replayGuard, + }); + } else { + await commitDiscordInboundReplay({ + replayKeys: params.job.replayKeys, + replayGuard: params.replayGuard, + }); + } + throw error; + } +} + +export function createDiscordMessageRunQueue( + params: DiscordMessageRunQueueParams, +): DiscordMessageRunQueue { + const runQueue = new KeyedAsyncQueue(); + const runState = createRunStateMachine({ + setStatus: params.setStatus, + abortSignal: params.abortSignal, + }); + const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard(); + + return { + enqueue(job) { + void runQueue + .enqueue(job.queueKey, async () => { + if (!runState.isActive()) { + return; + } + runState.onRunStart(); + try { + if (!runState.isActive()) { + return; + } + await processDiscordQueuedMessage({ + job, + lifecycleSignal: params.abortSignal, + replayGuard, + testing: params.__testing, + }); + } finally { + runState.onRunEnd(); + } + }) + .catch((error) => { + params.runtime.error?.(danger(`discord message run failed: ${String(error)}`)); + }); + }, + deactivate: runState.deactivate, + }; +} diff --git a/extensions/discord/src/monitor/message-utils.ts b/extensions/discord/src/monitor/message-utils.ts index c047720e2a9..322f3cd619b 100644 --- a/extensions/discord/src/monitor/message-utils.ts +++ b/extensions/discord/src/monitor/message-utils.ts @@ -343,7 +343,7 @@ async function fetchDiscordMedia(params: { abortSignal?: AbortSignal; }) { // `totalTimeoutMs` is enforced per individual attachment or sticker fetch. - // The inbound worker's abort signal remains the outer bound for the message. + // The caller abort signal remains the outer bound for the message. const timeoutAbortController = params.totalTimeoutMs ? new AbortController() : undefined; const signal = mergeAbortSignals([params.abortSignal, timeoutAbortController?.signal]); let timedOut = false; diff --git a/extensions/discord/src/monitor/provider.startup.ts b/extensions/discord/src/monitor/provider.startup.ts index cb4cc81e720..fade36e7b3f 100644 --- a/extensions/discord/src/monitor/provider.startup.ts +++ b/extensions/discord/src/monitor/provider.startup.ts @@ -253,7 +253,6 @@ export function registerDiscordMonitorListeners(params: { logger: NonNullable[1]>; messageHandler: ConstructorParameters[0]; trackInboundEvent?: () => void; - eventQueueListenerTimeoutMs?: number; }) { registerDiscordListener( params.client.listeners, @@ -261,9 +260,7 @@ export function registerDiscordMonitorListeners(params: { ); registerDiscordListener( params.client.listeners, - new DiscordMessageListener(params.messageHandler, params.logger, params.trackInboundEvent, { - timeoutMs: params.eventQueueListenerTimeoutMs, - }), + new DiscordMessageListener(params.messageHandler, params.logger, params.trackInboundEvent), ); const reactionListenerOptions: ConstructorParameters[0] = { diff --git a/extensions/discord/src/monitor/provider.test.ts b/extensions/discord/src/monitor/provider.test.ts index 54a5757652d..9cd3b944da8 100644 --- a/extensions/discord/src/monitor/provider.test.ts +++ b/extensions/discord/src/monitor/provider.test.ts @@ -637,7 +637,7 @@ describe("monitorDiscordProvider", () => { expect(eventQueue?.listenerTimeout).toBe(300_000); }); - it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => { + it("does not pass eventQueue.listenerTimeout into the message run queue", async () => { await monitorDiscordProvider({ config: createConfigWithDiscordAccount({ eventQueue: { listenerTimeout: 50_000 }, @@ -653,7 +653,7 @@ describe("monitorDiscordProvider", () => { expect("listenerTimeoutMs" in (params ?? {})).toBe(false); }); - it("forwards inbound worker timeout config to the Discord message handler", async () => { + it("ignores deprecated inbound worker timeout config", async () => { resolveDiscordAccountMock.mockReturnValue({ accountId: "default", token: "MTIz.abc.def", @@ -674,7 +674,7 @@ describe("monitorDiscordProvider", () => { const params = getFirstDiscordMessageHandlerParams<{ workerRunTimeoutMs?: number; }>(); - expect(params?.workerRunTimeoutMs).toBe(300_000); + expect(params?.workerRunTimeoutMs).toBeUndefined(); }); it("continues startup when Discord daily slash-command create quota is exhausted", async () => { diff --git a/extensions/discord/src/monitor/provider.ts b/extensions/discord/src/monitor/provider.ts index b360d62d5a7..29bfc1c9479 100644 --- a/extensions/discord/src/monitor/provider.ts +++ b/extensions/discord/src/monitor/provider.ts @@ -958,7 +958,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { gateway, gatewaySupervisor: createdGatewaySupervisor, autoPresenceController: createdAutoPresenceController, - eventQueueOpts, } = await createDiscordMonitorClient({ accountId: account.accountId, applicationId, @@ -1079,7 +1078,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { runtime, setStatus: opts.setStatus, abortSignal: opts.abortSignal, - workerRunTimeoutMs: discordCfg.inboundWorker?.runTimeoutMs, botUserId, guildHistories, historyLimit, @@ -1120,7 +1118,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) { logger, messageHandler, trackInboundEvent, - eventQueueListenerTimeoutMs: eventQueueOpts.listenerTimeout, }); logDiscordStartupPhase({ diff --git a/extensions/discord/src/monitor/timeouts.ts b/extensions/discord/src/monitor/timeouts.ts index f0f682114b2..c02072de4de 100644 --- a/extensions/discord/src/monitor/timeouts.ts +++ b/extensions/discord/src/monitor/timeouts.ts @@ -1,40 +1,11 @@ -const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647; - +// Compatibility constants for existing imports. Discord no longer enforces +// channel-owned listener or inbound run timeouts. export const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000; export const DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS = 30 * 60_000; + export const DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS = 60_000; export const DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS = 120_000; -function clampDiscordTimeoutMs(timeoutMs: number, minimumMs: number): number { - return Math.max(minimumMs, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS)); -} - -export function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number { - if (!Number.isFinite(raw) || (raw ?? 0) <= 0) { - return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS; - } - return clampDiscordTimeoutMs(raw!, 1_000); -} - -export function normalizeDiscordInboundWorkerTimeoutMs( - raw: number | undefined, -): number | undefined { - if (raw === 0) { - return undefined; - } - if (typeof raw !== "number" || !Number.isFinite(raw) || raw < 0) { - return DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS; - } - return clampDiscordTimeoutMs(raw, 1); -} - -export function isAbortError(error: unknown): boolean { - if (typeof error !== "object" || error === null) { - return false; - } - return "name" in error && String((error as { name?: unknown }).name) === "AbortError"; -} - export function mergeAbortSignals( signals: Array, ): AbortSignal | undefined { @@ -66,57 +37,3 @@ export function mergeAbortSignals( } return fallbackController.signal; } - -export async function runDiscordTaskWithTimeout(params: { - run: (abortSignal: AbortSignal | undefined) => Promise; - timeoutMs?: number; - abortSignals?: Array; - onTimeout: (timeoutMs: number) => void | Promise; - onAbortAfterTimeout?: () => void; - onErrorAfterTimeout?: (error: unknown) => void; -}): Promise { - const timeoutAbortController = params.timeoutMs ? new AbortController() : undefined; - const mergedAbortSignal = mergeAbortSignals([ - ...(params.abortSignals ?? []), - timeoutAbortController?.signal, - ]); - - let timedOut = false; - let timeoutHandle: ReturnType | null = null; - const runPromise = params.run(mergedAbortSignal).catch((error) => { - if (!timedOut) { - throw error; - } - if (timeoutAbortController?.signal.aborted && isAbortError(error)) { - params.onAbortAfterTimeout?.(); - return; - } - params.onErrorAfterTimeout?.(error); - }); - - try { - if (!params.timeoutMs) { - await runPromise; - return false; - } - const timeoutPromise = new Promise<"timeout">((resolve) => { - timeoutHandle = setTimeout(() => resolve("timeout"), params.timeoutMs); - timeoutHandle.unref?.(); - }); - const result = await Promise.race([ - runPromise.then(() => "completed" as const), - timeoutPromise, - ]); - if (result === "timeout") { - timedOut = true; - timeoutAbortController?.abort(); - await params.onTimeout(params.timeoutMs); - return true; - } - return false; - } finally { - if (timeoutHandle) { - clearTimeout(timeoutHandle); - } - } -} diff --git a/extensions/discord/timeouts.ts b/extensions/discord/timeouts.ts index d609cd20391..b2521b56800 100644 --- a/extensions/discord/timeouts.ts +++ b/extensions/discord/timeouts.ts @@ -1,4 +1,6 @@ export { + DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, + DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, } from "./src/monitor/timeouts.js"; diff --git a/src/config/bundled-channel-config-metadata.generated.ts b/src/config/bundled-channel-config-metadata.generated.ts index 14af30b4518..68043e02c2f 100644 --- a/src/config/bundled-channel-config-metadata.generated.ts +++ b/src/config/bundled-channel-config-metadata.generated.ts @@ -3490,8 +3490,8 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).", }, "inboundWorker.runTimeoutMs": { - label: "Discord Inbound Worker Timeout (ms)", - help: "Optional queued Discord inbound worker timeout in ms. This is separate from Carbon listener timeouts; defaults to 1800000 and can be disabled with 0. Set per account via channels.discord.accounts..inboundWorker.runTimeoutMs.", + label: "Deprecated Discord Inbound Worker Timeout", + help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.", }, "eventQueue.listenerTimeout": { label: "Discord EventQueue Listener Timeout (ms)", diff --git a/src/config/types.discord.ts b/src/config/types.discord.ts index 4ba03d9d192..e97b5f559e4 100644 --- a/src/config/types.discord.ts +++ b/src/config/types.discord.ts @@ -340,13 +340,13 @@ export type DiscordAccountConfig = { /** Streaming URL (Twitch/YouTube). Required when activityType=1. */ activityUrl?: string; /** - * In-process worker settings for queued inbound Discord runs. - * This is separate from Carbon's eventQueue listener budget. + * @deprecated Kept for config compatibility. Discord no longer enforces + * channel-owned timeouts for queued inbound agent runs. */ inboundWorker?: { /** - * Max time (ms) a queued inbound run may execute before OpenClaw aborts it. - * Defaults to 1800000 (30 minutes). Set 0 to disable the worker-owned timeout. + * @deprecated Ignored. Queued Discord agent runs are governed by the + * session/tool/runtime lifecycle, not by Discord channel config. */ runTimeoutMs?: number; }; diff --git a/src/plugins/contracts/plugin-sdk-runtime-api-guardrails.test.ts b/src/plugins/contracts/plugin-sdk-runtime-api-guardrails.test.ts index 0131633c670..c2c08fcd303 100644 --- a/src/plugins/contracts/plugin-sdk-runtime-api-guardrails.test.ts +++ b/src/plugins/contracts/plugin-sdk-runtime-api-guardrails.test.ts @@ -20,7 +20,7 @@ const RUNTIME_API_EXPORT_GUARDS: Record = { 'export { clearGateways, getGateway, registerGateway, unregisterGateway } from "./src/monitor/gateway-registry.js";', 'export { clearPresences, getPresence, presenceCacheSize, setPresence } from "./src/monitor/presence-cache.js";', 'export { __testing, autoBindSpawnedDiscordSubagent, createNoopThreadBindingManager, createThreadBindingManager, formatThreadBindingDurationLabel, getThreadBindingManager, isRecentlyUnboundThreadWebhookMessage, listThreadBindingsBySessionKey, listThreadBindingsForAccount, reconcileAcpThreadBindingsOnStartup, resolveDiscordThreadBindingIdleTimeoutMs, resolveDiscordThreadBindingMaxAgeMs, resolveThreadBindingIdleTimeoutMs, resolveThreadBindingInactivityExpiresAt, resolveThreadBindingIntroText, resolveThreadBindingMaxAgeExpiresAt, resolveThreadBindingMaxAgeMs, resolveThreadBindingPersona, resolveThreadBindingPersonaFromRecord, resolveThreadBindingsEnabled, resolveThreadBindingThreadName, setThreadBindingIdleTimeoutBySessionKey, setThreadBindingMaxAgeBySessionKey, unbindThreadBindingsBySessionKey, type AcpThreadBindingReconciliationResult, type ThreadBindingManager, type ThreadBindingRecord, type ThreadBindingTargetKind } from "./src/monitor/thread-bindings.js";', - 'export { DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, isAbortError, mergeAbortSignals, normalizeDiscordInboundWorkerTimeoutMs, normalizeDiscordListenerTimeoutMs, runDiscordTaskWithTimeout } from "./src/monitor/timeouts.js";', + 'export { DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, mergeAbortSignals } from "./src/monitor/timeouts.js";', 'export { fetchDiscordApplicationId, fetchDiscordApplicationSummary, parseApplicationIdFromToken, probeDiscord, resolveDiscordPrivilegedIntentsFromFlags, type DiscordApplicationSummary, type DiscordPrivilegedIntentsSummary, type DiscordPrivilegedIntentStatus, type DiscordProbe } from "./src/probe.js";', 'export { resolveDiscordChannelAllowlist, type DiscordChannelResolution } from "./src/resolve-channels.js";', 'export { resolveDiscordUserAllowlist, type DiscordUserResolution } from "./src/resolve-users.js";',