From 364c67bcb5b7f26a6d41024eb07b6a709de782bb Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 29 Apr 2026 06:20:58 +0100 Subject: [PATCH] refactor(discord): share channel run queue --- CHANGELOG.md | 2 +- docs/.generated/config-baseline.sha256 | 4 +- .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/channels/discord.md | 7 -- docs/concepts/messages.md | 8 ++ docs/plugins/sdk-subpaths.md | 2 +- extensions/discord/src/config-ui-hints.ts | 4 - .../discord/src/monitor/message-run-queue.ts | 42 +++---- .../discord/src/monitor/provider.test.ts | 2 +- ...ndled-channel-config-metadata.generated.ts | 4 - src/config/types.discord.ts | 8 +- src/plugin-sdk/channel-lifecycle.core.ts | 60 ++++++++++ .../channel-lifecycle.queue.test.ts | 110 ++++++++++++++++++ .../contracts/plugin-sdk-subpaths.test.ts | 2 + 14 files changed, 205 insertions(+), 54 deletions(-) create mode 100644 src/plugin-sdk/channel-lifecycle.queue.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c47643d99fd..3b406b5371d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ Docs: https://docs.openclaw.ai ### Fixes -- Channels/Discord: remove Discord-owned queued-run timeout replies while preserving message ordering and compatibility timeout constants, so long Discord turns stay governed by session/tool/runtime lifecycle instead of channel fallback errors. Thanks @codexGW. +- Channels/Discord: remove Discord-owned queued-run timeout replies through the shared channel lifecycle queue while preserving message ordering and compatibility timeout constants, so long Discord turns stay governed by session/tool/runtime lifecycle instead of channel fallback errors. Thanks @codexGW. - Agents/tools: clamp `process.poll` waits to 30 seconds and honor abort signals while waiting, so long command polls cannot pin agent responsiveness after cancellation. Thanks @vincentkoc. - Plugin SDK: add tracked Discord component-message helpers and a Telegram account-resolution compatibility facade, so existing plugins using those subpaths resolve while new plugins stay on generic channel SDK contracts. Thanks @vincentkoc. - Shared labels: preserve Unicode combining marks and NFC-equivalent accented text in group/channel slug normalization so non-Latin labels no longer lose meaningful characters. Fixes #58932; carries forward #58942 and #58995. Thanks @fengqing-git, @Starhappysh, and @koen666. diff --git a/docs/.generated/config-baseline.sha256 b/docs/.generated/config-baseline.sha256 index 6112e2eaf2d..2478297bb74 100644 --- a/docs/.generated/config-baseline.sha256 +++ b/docs/.generated/config-baseline.sha256 @@ -1,4 +1,4 @@ -ab654d17b4d3520c81de45dbcf96a8ecef35254cfd6df21af170dd2ebe550799 config-baseline.json +6a67688ac174403c996027d90fa16eabb9aeff6a8af890b17d4628910c3b440f config-baseline.json 8bc9fda7c1096472beaa416a61043ce51d691d4dcad9ed3e0be46e68bb70b0ce config-baseline.core.json -56db8ae09c5573a453b8fb01ac579c5b9d8a69fa3fffff2ba2956e5e2ccb2f99 config-baseline.channel.json +9f5fad66a49fa618d64a963470aa69fed9fe4b4639cc4321f9ec04bfb2f8aa50 config-baseline.channel.json 0dd6583fafae6c9134e46c4cf9bddee9822d6436436dcb1a6dcba6d012962e51 config-baseline.plugin.json diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index be24b66064b..dd0fd585289 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -427eb476f48ad368fd7061297727a7634be75612aedef5de91e351ac446553ce plugin-sdk-api-baseline.json -6065b278792b4664d31c07ec46f852c3d99c8882adb4b37db3d4f2fe78a74af8 plugin-sdk-api-baseline.jsonl +eedcf9070e222077f618d68510c909b571dc51fbb030284ff3b30728719f7ae0 plugin-sdk-api-baseline.json +02043e1f48a15625580ed1e1ec569ccd1c7c9ad393be2aa54a1fa36afeeca7b5 plugin-sdk-api-baseline.jsonl diff --git a/docs/channels/discord.md b/docs/channels/discord.md index 7c4de9bf8a4..6f4181ce1a4 100644 --- a/docs/channels/discord.md +++ b/docs/channels/discord.md @@ -1105,12 +1105,6 @@ openclaw logs --follow Discord does not apply a channel-owned timeout to queued agent turns. Message listeners hand off immediately, and queued Discord runs preserve per-session ordering until the session/tool/runtime lifecycle completes or aborts the work. - Deprecated compatibility setting: - - - `channels.discord.inboundWorker.runTimeoutMs` - - `channels.discord.accounts..inboundWorker.runTimeoutMs` - - ignored by current Discord message handling - ```json5 { channels: { @@ -1187,7 +1181,6 @@ Primary reference: [Configuration reference - Discord](/gateway/config-channels# - policy: `groupPolicy`, `dm.*`, `guilds.*`, `guilds.*.channels.*` - command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*` - event queue: `eventQueue.listenerTimeout` (listener budget), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency` -- deprecated compatibility: `inboundWorker.runTimeoutMs` (ignored) - gateway metadata: `gatewayInfoTimeoutMs` - reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit` - delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage` diff --git a/docs/concepts/messages.md b/docs/concepts/messages.md index 98fa9e73589..4ad3e6b6473 100644 --- a/docs/concepts/messages.md +++ b/docs/concepts/messages.md @@ -128,6 +128,14 @@ current run, or collected for a followup turn. Details: [Queueing](/concepts/queue). +## Channel run ownership + +Channel plugins may preserve ordering, debounce input, and apply transport +backpressure before a message enters the session queue. They should not impose a +separate timeout around the agent turn itself. Once a message is routed to a +session, long-running work is governed by the session, tool, and runtime +lifecycle so all channels report and recover from slow turns consistently. + ## Streaming, chunking, and batching Block streaming sends partial replies as the model produces text blocks. diff --git a/docs/plugins/sdk-subpaths.md b/docs/plugins/sdk-subpaths.md index 52ec6856b79..23fad984a6f 100644 --- a/docs/plugins/sdk-subpaths.md +++ b/docs/plugins/sdk-subpaths.md @@ -64,7 +64,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview) | `plugin-sdk/telegram-command-config` | Telegram custom-command normalization/validation helpers with bundled-contract fallback | | `plugin-sdk/command-gating` | Narrow command authorization gate helpers | | `plugin-sdk/channel-policy` | `resolveChannelGroupRequireMention` | - | `plugin-sdk/channel-lifecycle` | `createAccountStatusSink`, draft stream lifecycle/finalization helpers | + | `plugin-sdk/channel-lifecycle` | `createAccountStatusSink`, `createChannelRunQueue`, draft stream lifecycle/finalization helpers | | `plugin-sdk/inbound-envelope` | Shared inbound route + envelope builder helpers | | `plugin-sdk/inbound-reply-dispatch` | Shared inbound record-and-dispatch helpers | | `plugin-sdk/messaging-targets` | Target parsing/matching helpers | diff --git a/extensions/discord/src/config-ui-hints.ts b/extensions/discord/src/config-ui-hints.ts index 68b8902ce90..bb6055f3a62 100644 --- a/extensions/discord/src/config-ui-hints.ts +++ b/extensions/discord/src/config-ui-hints.ts @@ -89,10 +89,6 @@ export const discordChannelConfigUiHints = { label: "Discord Thread Parent Inheritance", help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).", }, - "inboundWorker.runTimeoutMs": { - label: "Deprecated Discord Inbound Worker Timeout", - help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.", - }, "eventQueue.listenerTimeout": { label: "Discord EventQueue Listener Timeout (ms)", help: "Canonical Discord listener timeout control in ms for gateway normalization/enqueue handlers. Default is 120000 in OpenClaw; set per account via channels.discord.accounts..eventQueue.listenerTimeout.", diff --git a/extensions/discord/src/monitor/message-run-queue.ts b/extensions/discord/src/monitor/message-run-queue.ts index c44037e44e3..7794e301a35 100644 --- a/extensions/discord/src/monitor/message-run-queue.ts +++ b/extensions/discord/src/monitor/message-run-queue.ts @@ -1,5 +1,4 @@ -import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle"; -import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue"; +import { createChannelRunQueue } from "openclaw/plugin-sdk/channel-lifecycle"; import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { danger } from "openclaw/plugin-sdk/runtime-env"; import { @@ -77,39 +76,26 @@ async function processDiscordQueuedMessage(params: { export function createDiscordMessageRunQueue( params: DiscordMessageRunQueueParams, ): DiscordMessageRunQueue { - const runQueue = new KeyedAsyncQueue(); - const runState = createRunStateMachine({ + const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard(); + const runQueue = createChannelRunQueue({ setStatus: params.setStatus, abortSignal: params.abortSignal, + onError: (error) => { + params.runtime.error?.(danger(`discord message run failed: ${String(error)}`)); + }, }); - const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard(); return { enqueue(job) { - void runQueue - .enqueue(job.queueKey, async () => { - if (!runState.isActive()) { - return; - } - runState.onRunStart(); - try { - if (!runState.isActive()) { - return; - } - await processDiscordQueuedMessage({ - job, - lifecycleSignal: params.abortSignal, - replayGuard, - testing: params.__testing, - }); - } finally { - runState.onRunEnd(); - } - }) - .catch((error) => { - params.runtime.error?.(danger(`discord message run failed: ${String(error)}`)); + runQueue.enqueue(job.queueKey, async ({ lifecycleSignal }) => { + await processDiscordQueuedMessage({ + job, + lifecycleSignal, + replayGuard, + testing: params.__testing, }); + }); }, - deactivate: runState.deactivate, + deactivate: runQueue.deactivate, }; } diff --git a/extensions/discord/src/monitor/provider.test.ts b/extensions/discord/src/monitor/provider.test.ts index 9cd3b944da8..9ccb67ee159 100644 --- a/extensions/discord/src/monitor/provider.test.ts +++ b/extensions/discord/src/monitor/provider.test.ts @@ -653,7 +653,7 @@ describe("monitorDiscordProvider", () => { expect("listenerTimeoutMs" in (params ?? {})).toBe(false); }); - it("ignores deprecated inbound worker timeout config", async () => { + it("ignores legacy inbound worker timeout config", async () => { resolveDiscordAccountMock.mockReturnValue({ accountId: "default", token: "MTIz.abc.def", diff --git a/src/config/bundled-channel-config-metadata.generated.ts b/src/config/bundled-channel-config-metadata.generated.ts index 68043e02c2f..68576795da7 100644 --- a/src/config/bundled-channel-config-metadata.generated.ts +++ b/src/config/bundled-channel-config-metadata.generated.ts @@ -3489,10 +3489,6 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [ label: "Discord Thread Parent Inheritance", help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).", }, - "inboundWorker.runTimeoutMs": { - label: "Deprecated Discord Inbound Worker Timeout", - help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.", - }, "eventQueue.listenerTimeout": { label: "Discord EventQueue Listener Timeout (ms)", help: "Canonical Discord listener timeout control in ms for gateway normalization/enqueue handlers. Default is 120000 in OpenClaw; set per account via channels.discord.accounts..eventQueue.listenerTimeout.", diff --git a/src/config/types.discord.ts b/src/config/types.discord.ts index e97b5f559e4..f6e0a011c08 100644 --- a/src/config/types.discord.ts +++ b/src/config/types.discord.ts @@ -340,13 +340,13 @@ export type DiscordAccountConfig = { /** Streaming URL (Twitch/YouTube). Required when activityType=1. */ activityUrl?: string; /** - * @deprecated Kept for config compatibility. Discord no longer enforces - * channel-owned timeouts for queued inbound agent runs. + * Legacy compatibility block. Discord no longer enforces channel-owned + * timeouts for queued inbound agent runs. */ inboundWorker?: { /** - * @deprecated Ignored. Queued Discord agent runs are governed by the - * session/tool/runtime lifecycle, not by Discord channel config. + * Ignored. Queued Discord agent runs are governed by the session/tool/runtime + * lifecycle, not by Discord channel config. */ runTimeoutMs?: number; }; diff --git a/src/plugin-sdk/channel-lifecycle.core.ts b/src/plugin-sdk/channel-lifecycle.core.ts index 28045aeb058..ef3ae7c4e0c 100644 --- a/src/plugin-sdk/channel-lifecycle.core.ts +++ b/src/plugin-sdk/channel-lifecycle.core.ts @@ -1,4 +1,6 @@ import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js"; +import { createRunStateMachine, type RunStateStatusSink } from "../channels/run-state-machine.js"; +import { KeyedAsyncQueue } from "./keyed-async-queue.js"; type CloseAwareServer = { once: (event: "close", listener: () => void) => unknown; @@ -11,6 +13,21 @@ type PassiveAccountLifecycleParams = { onStop?: () => void | Promise; }; +export type ChannelRunQueueTaskContext = { + lifecycleSignal?: AbortSignal; +}; + +export type ChannelRunQueue = { + enqueue: (key: string, task: (context: ChannelRunQueueTaskContext) => Promise) => void; + deactivate: () => void; +}; + +export type ChannelRunQueueParams = { + setStatus?: RunStateStatusSink; + abortSignal?: AbortSignal; + onError?: (error: unknown) => void; +}; + /** Bind a fixed account id into a status writer so lifecycle code can emit partial snapshots. */ export function createAccountStatusSink(params: { accountId: string; @@ -21,6 +38,49 @@ export function createAccountStatusSink(params: { }; } +/** + * Serialize channel work per key while keeping lifecycle/busy accounting out of + * channel-specific message handlers. The queue does not impose run timeouts; + * callers should rely on session/tool/runtime lifecycle for long-running work. + */ +export function createChannelRunQueue(params: ChannelRunQueueParams): ChannelRunQueue { + const queue = new KeyedAsyncQueue(); + const runState = createRunStateMachine({ + setStatus: params.setStatus, + abortSignal: params.abortSignal, + }); + const reportError = (error: unknown) => { + try { + params.onError?.(error); + } catch { + // Keep queue error handling best-effort; callers should not create a + // secondary unhandled rejection from their reporting hook. + } + }; + + return { + enqueue(key, task) { + void queue + .enqueue(key, async () => { + if (!runState.isActive()) { + return; + } + runState.onRunStart(); + try { + if (!runState.isActive()) { + return; + } + await task({ lifecycleSignal: params.abortSignal }); + } finally { + runState.onRunEnd(); + } + }) + .catch(reportError); + }, + deactivate: runState.deactivate, + }; +} + /** * Return a promise that resolves when the signal is aborted. * diff --git a/src/plugin-sdk/channel-lifecycle.queue.test.ts b/src/plugin-sdk/channel-lifecycle.queue.test.ts new file mode 100644 index 00000000000..d4afb4aa717 --- /dev/null +++ b/src/plugin-sdk/channel-lifecycle.queue.test.ts @@ -0,0 +1,110 @@ +import { describe, expect, it, vi } from "vitest"; +import { createChannelRunQueue } from "./channel-lifecycle.core.js"; + +function createDeferred() { + let resolve: (() => void) | undefined; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + return { promise, resolve }; +} + +async function flushAsyncWork() { + for (let i = 0; i < 20; i += 1) { + await Promise.resolve(); + } +} + +describe("createChannelRunQueue", () => { + it("serializes work per key while allowing unrelated keys to run", async () => { + const first = createDeferred(); + const second = createDeferred(); + const third = createDeferred(); + const order: string[] = []; + const queue = createChannelRunQueue({}); + + queue.enqueue("same", async () => { + order.push("start:first"); + await first.promise; + order.push("end:first"); + }); + queue.enqueue("same", async () => { + order.push("start:second"); + await second.promise; + order.push("end:second"); + }); + queue.enqueue("other", async () => { + order.push("start:third"); + await third.promise; + order.push("end:third"); + }); + + await flushAsyncWork(); + expect(order).toEqual(["start:first", "start:third"]); + + third.resolve?.(); + await third.promise; + await flushAsyncWork(); + expect(order).toEqual(["start:first", "start:third", "end:third"]); + + first.resolve?.(); + await first.promise; + await flushAsyncWork(); + expect(order).toEqual(["start:first", "start:third", "end:third", "end:first", "start:second"]); + + second.resolve?.(); + await second.promise; + }); + + it("updates run status and routes async errors", async () => { + const setStatus = vi.fn(); + const onError = vi.fn(); + const queue = createChannelRunQueue({ setStatus, onError }); + + queue.enqueue("key", async () => { + throw new Error("boom"); + }); + + await flushAsyncWork(); + + expect(setStatus).toHaveBeenCalledWith({ activeRuns: 0, busy: false }); + expect(setStatus).toHaveBeenCalledWith(expect.objectContaining({ activeRuns: 1, busy: true })); + expect(setStatus).toHaveBeenLastCalledWith( + expect.objectContaining({ activeRuns: 0, busy: false }), + ); + expect(onError).toHaveBeenCalledWith(expect.any(Error)); + }); + + it("contains reporting hook errors", async () => { + const queue = createChannelRunQueue({ + onError: () => { + throw new Error("report failed"); + }, + }); + + queue.enqueue("key", async () => { + throw new Error("boom"); + }); + + await flushAsyncWork(); + }); + + it("skips queued work after deactivation", async () => { + const first = createDeferred(); + const task = vi.fn(); + const queue = createChannelRunQueue({}); + + queue.enqueue("key", async () => { + await first.promise; + }); + queue.enqueue("key", task); + await flushAsyncWork(); + + queue.deactivate(); + first.resolve?.(); + await first.promise; + await flushAsyncWork(); + + expect(task).not.toHaveBeenCalled(); + }); +}); diff --git a/src/plugins/contracts/plugin-sdk-subpaths.test.ts b/src/plugins/contracts/plugin-sdk-subpaths.test.ts index 416410a6ba1..1a6dd47ba50 100644 --- a/src/plugins/contracts/plugin-sdk-subpaths.test.ts +++ b/src/plugins/contracts/plugin-sdk-subpaths.test.ts @@ -848,6 +848,7 @@ describe("plugin-sdk subpath exports", () => { "createDraftStreamLoop", "createLoggedPairingApprovalNotifier", "createPairingPrefixStripper", + "createChannelRunQueue", "createRunStateMachine", "createRuntimeDirectoryLiveAdapter", "createRuntimeOutboundDelegates", @@ -1298,6 +1299,7 @@ describe("plugin-sdk subpath exports", () => { expect(typeof channelLifecycleSdk.createDraftStreamLoop).toBe("function"); expect(typeof channelLifecycleSdk.createFinalizableDraftLifecycle).toBe("function"); + expect(typeof channelLifecycleSdk.createChannelRunQueue).toBe("function"); expect(typeof channelLifecycleSdk.runPassiveAccountLifecycle).toBe("function"); expect(typeof channelLifecycleSdk.createRunStateMachine).toBe("function"); expect(typeof channelLifecycleSdk.createArmableStallWatchdog).toBe("function");