refactor(discord): share channel run queue

This commit is contained in:
Peter Steinberger
2026-04-29 06:20:58 +01:00
parent 996c9d71e9
commit 364c67bcb5
14 changed files with 205 additions and 54 deletions

View File

@@ -13,7 +13,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Channels/Discord: remove Discord-owned queued-run timeout replies while preserving message ordering and compatibility timeout constants, so long Discord turns stay governed by session/tool/runtime lifecycle instead of channel fallback errors. Thanks @codexGW.
- Channels/Discord: remove Discord-owned queued-run timeout replies through the shared channel lifecycle queue while preserving message ordering and compatibility timeout constants, so long Discord turns stay governed by session/tool/runtime lifecycle instead of channel fallback errors. Thanks @codexGW.
- Agents/tools: clamp `process.poll` waits to 30 seconds and honor abort signals while waiting, so long command polls cannot pin agent responsiveness after cancellation. Thanks @vincentkoc.
- Plugin SDK: add tracked Discord component-message helpers and a Telegram account-resolution compatibility facade, so existing plugins using those subpaths resolve while new plugins stay on generic channel SDK contracts. Thanks @vincentkoc.
- Shared labels: preserve Unicode combining marks and NFC-equivalent accented text in group/channel slug normalization so non-Latin labels no longer lose meaningful characters. Fixes #58932; carries forward #58942 and #58995. Thanks @fengqing-git, @Starhappysh, and @koen666.

View File

@@ -1,4 +1,4 @@
ab654d17b4d3520c81de45dbcf96a8ecef35254cfd6df21af170dd2ebe550799 config-baseline.json
6a67688ac174403c996027d90fa16eabb9aeff6a8af890b17d4628910c3b440f config-baseline.json
8bc9fda7c1096472beaa416a61043ce51d691d4dcad9ed3e0be46e68bb70b0ce config-baseline.core.json
56db8ae09c5573a453b8fb01ac579c5b9d8a69fa3fffff2ba2956e5e2ccb2f99 config-baseline.channel.json
9f5fad66a49fa618d64a963470aa69fed9fe4b4639cc4321f9ec04bfb2f8aa50 config-baseline.channel.json
0dd6583fafae6c9134e46c4cf9bddee9822d6436436dcb1a6dcba6d012962e51 config-baseline.plugin.json

View File

@@ -1,2 +1,2 @@
427eb476f48ad368fd7061297727a7634be75612aedef5de91e351ac446553ce plugin-sdk-api-baseline.json
6065b278792b4664d31c07ec46f852c3d99c8882adb4b37db3d4f2fe78a74af8 plugin-sdk-api-baseline.jsonl
eedcf9070e222077f618d68510c909b571dc51fbb030284ff3b30728719f7ae0 plugin-sdk-api-baseline.json
02043e1f48a15625580ed1e1ec569ccd1c7c9ad393be2aa54a1fa36afeeca7b5 plugin-sdk-api-baseline.jsonl

View File

@@ -1105,12 +1105,6 @@ openclaw logs --follow
Discord does not apply a channel-owned timeout to queued agent turns. Message listeners hand off immediately, and queued Discord runs preserve per-session ordering until the session/tool/runtime lifecycle completes or aborts the work.
Deprecated compatibility setting:
- `channels.discord.inboundWorker.runTimeoutMs`
- `channels.discord.accounts.<accountId>.inboundWorker.runTimeoutMs`
- ignored by current Discord message handling
```json5
{
channels: {
@@ -1187,7 +1181,6 @@ Primary reference: [Configuration reference - Discord](/gateway/config-channels#
- policy: `groupPolicy`, `dm.*`, `guilds.*`, `guilds.*.channels.*`
- command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*`
- event queue: `eventQueue.listenerTimeout` (listener budget), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency`
- deprecated compatibility: `inboundWorker.runTimeoutMs` (ignored)
- gateway metadata: `gatewayInfoTimeoutMs`
- reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit`
- delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage`

View File

@@ -128,6 +128,14 @@ current run, or collected for a followup turn.
Details: [Queueing](/concepts/queue).
## Channel run ownership
Channel plugins may preserve ordering, debounce input, and apply transport
backpressure before a message enters the session queue. They should not impose a
separate timeout around the agent turn itself. Once a message is routed to a
session, long-running work is governed by the session, tool, and runtime
lifecycle so all channels report and recover from slow turns consistently.
## Streaming, chunking, and batching
Block streaming sends partial replies as the model produces text blocks.

View File

@@ -64,7 +64,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview)
| `plugin-sdk/telegram-command-config` | Telegram custom-command normalization/validation helpers with bundled-contract fallback |
| `plugin-sdk/command-gating` | Narrow command authorization gate helpers |
| `plugin-sdk/channel-policy` | `resolveChannelGroupRequireMention` |
| `plugin-sdk/channel-lifecycle` | `createAccountStatusSink`, draft stream lifecycle/finalization helpers |
| `plugin-sdk/channel-lifecycle` | `createAccountStatusSink`, `createChannelRunQueue`, draft stream lifecycle/finalization helpers |
| `plugin-sdk/inbound-envelope` | Shared inbound route + envelope builder helpers |
| `plugin-sdk/inbound-reply-dispatch` | Shared inbound record-and-dispatch helpers |
| `plugin-sdk/messaging-targets` | Target parsing/matching helpers |

View File

@@ -89,10 +89,6 @@ export const discordChannelConfigUiHints = {
label: "Discord Thread Parent Inheritance",
help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).",
},
"inboundWorker.runTimeoutMs": {
label: "Deprecated Discord Inbound Worker Timeout",
help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.",
},
"eventQueue.listenerTimeout": {
label: "Discord EventQueue Listener Timeout (ms)",
help: "Canonical Discord listener timeout control in ms for gateway normalization/enqueue handlers. Default is 120000 in OpenClaw; set per account via channels.discord.accounts.<id>.eventQueue.listenerTimeout.",

View File

@@ -1,5 +1,4 @@
import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle";
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
import { createChannelRunQueue } from "openclaw/plugin-sdk/channel-lifecycle";
import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
import { danger } from "openclaw/plugin-sdk/runtime-env";
import {
@@ -77,39 +76,26 @@ async function processDiscordQueuedMessage(params: {
export function createDiscordMessageRunQueue(
params: DiscordMessageRunQueueParams,
): DiscordMessageRunQueue {
const runQueue = new KeyedAsyncQueue();
const runState = createRunStateMachine({
const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard();
const runQueue = createChannelRunQueue({
setStatus: params.setStatus,
abortSignal: params.abortSignal,
onError: (error) => {
params.runtime.error?.(danger(`discord message run failed: ${String(error)}`));
},
});
const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard();
return {
enqueue(job) {
void runQueue
.enqueue(job.queueKey, async () => {
if (!runState.isActive()) {
return;
}
runState.onRunStart();
try {
if (!runState.isActive()) {
return;
}
await processDiscordQueuedMessage({
job,
lifecycleSignal: params.abortSignal,
replayGuard,
testing: params.__testing,
});
} finally {
runState.onRunEnd();
}
})
.catch((error) => {
params.runtime.error?.(danger(`discord message run failed: ${String(error)}`));
runQueue.enqueue(job.queueKey, async ({ lifecycleSignal }) => {
await processDiscordQueuedMessage({
job,
lifecycleSignal,
replayGuard,
testing: params.__testing,
});
});
},
deactivate: runState.deactivate,
deactivate: runQueue.deactivate,
};
}

View File

@@ -653,7 +653,7 @@ describe("monitorDiscordProvider", () => {
expect("listenerTimeoutMs" in (params ?? {})).toBe(false);
});
it("ignores deprecated inbound worker timeout config", async () => {
it("ignores legacy inbound worker timeout config", async () => {
resolveDiscordAccountMock.mockReturnValue({
accountId: "default",
token: "MTIz.abc.def",

View File

@@ -3489,10 +3489,6 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
label: "Discord Thread Parent Inheritance",
help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).",
},
"inboundWorker.runTimeoutMs": {
label: "Deprecated Discord Inbound Worker Timeout",
help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.",
},
"eventQueue.listenerTimeout": {
label: "Discord EventQueue Listener Timeout (ms)",
help: "Canonical Discord listener timeout control in ms for gateway normalization/enqueue handlers. Default is 120000 in OpenClaw; set per account via channels.discord.accounts.<id>.eventQueue.listenerTimeout.",

View File

@@ -340,13 +340,13 @@ export type DiscordAccountConfig = {
/** Streaming URL (Twitch/YouTube). Required when activityType=1. */
activityUrl?: string;
/**
* @deprecated Kept for config compatibility. Discord no longer enforces
* channel-owned timeouts for queued inbound agent runs.
* Legacy compatibility block. Discord no longer enforces channel-owned
* timeouts for queued inbound agent runs.
*/
inboundWorker?: {
/**
* @deprecated Ignored. Queued Discord agent runs are governed by the
* session/tool/runtime lifecycle, not by Discord channel config.
* Ignored. Queued Discord agent runs are governed by the session/tool/runtime
* lifecycle, not by Discord channel config.
*/
runTimeoutMs?: number;
};

View File

@@ -1,4 +1,6 @@
import type { ChannelAccountSnapshot } from "../channels/plugins/types.core.js";
import { createRunStateMachine, type RunStateStatusSink } from "../channels/run-state-machine.js";
import { KeyedAsyncQueue } from "./keyed-async-queue.js";
type CloseAwareServer = {
once: (event: "close", listener: () => void) => unknown;
@@ -11,6 +13,21 @@ type PassiveAccountLifecycleParams<Handle> = {
onStop?: () => void | Promise<void>;
};
export type ChannelRunQueueTaskContext = {
lifecycleSignal?: AbortSignal;
};
export type ChannelRunQueue = {
enqueue: (key: string, task: (context: ChannelRunQueueTaskContext) => Promise<void>) => void;
deactivate: () => void;
};
export type ChannelRunQueueParams = {
setStatus?: RunStateStatusSink;
abortSignal?: AbortSignal;
onError?: (error: unknown) => void;
};
/** Bind a fixed account id into a status writer so lifecycle code can emit partial snapshots. */
export function createAccountStatusSink(params: {
accountId: string;
@@ -21,6 +38,49 @@ export function createAccountStatusSink(params: {
};
}
/**
* Serialize channel work per key while keeping lifecycle/busy accounting out of
* channel-specific message handlers. The queue does not impose run timeouts;
* callers should rely on session/tool/runtime lifecycle for long-running work.
*/
export function createChannelRunQueue(params: ChannelRunQueueParams): ChannelRunQueue {
const queue = new KeyedAsyncQueue();
const runState = createRunStateMachine({
setStatus: params.setStatus,
abortSignal: params.abortSignal,
});
const reportError = (error: unknown) => {
try {
params.onError?.(error);
} catch {
// Keep queue error handling best-effort; callers should not create a
// secondary unhandled rejection from their reporting hook.
}
};
return {
enqueue(key, task) {
void queue
.enqueue(key, async () => {
if (!runState.isActive()) {
return;
}
runState.onRunStart();
try {
if (!runState.isActive()) {
return;
}
await task({ lifecycleSignal: params.abortSignal });
} finally {
runState.onRunEnd();
}
})
.catch(reportError);
},
deactivate: runState.deactivate,
};
}
/**
* Return a promise that resolves when the signal is aborted.
*

View File

@@ -0,0 +1,110 @@
import { describe, expect, it, vi } from "vitest";
import { createChannelRunQueue } from "./channel-lifecycle.core.js";
function createDeferred() {
let resolve: (() => void) | undefined;
const promise = new Promise<void>((innerResolve) => {
resolve = innerResolve;
});
return { promise, resolve };
}
async function flushAsyncWork() {
for (let i = 0; i < 20; i += 1) {
await Promise.resolve();
}
}
describe("createChannelRunQueue", () => {
it("serializes work per key while allowing unrelated keys to run", async () => {
const first = createDeferred();
const second = createDeferred();
const third = createDeferred();
const order: string[] = [];
const queue = createChannelRunQueue({});
queue.enqueue("same", async () => {
order.push("start:first");
await first.promise;
order.push("end:first");
});
queue.enqueue("same", async () => {
order.push("start:second");
await second.promise;
order.push("end:second");
});
queue.enqueue("other", async () => {
order.push("start:third");
await third.promise;
order.push("end:third");
});
await flushAsyncWork();
expect(order).toEqual(["start:first", "start:third"]);
third.resolve?.();
await third.promise;
await flushAsyncWork();
expect(order).toEqual(["start:first", "start:third", "end:third"]);
first.resolve?.();
await first.promise;
await flushAsyncWork();
expect(order).toEqual(["start:first", "start:third", "end:third", "end:first", "start:second"]);
second.resolve?.();
await second.promise;
});
it("updates run status and routes async errors", async () => {
const setStatus = vi.fn();
const onError = vi.fn();
const queue = createChannelRunQueue({ setStatus, onError });
queue.enqueue("key", async () => {
throw new Error("boom");
});
await flushAsyncWork();
expect(setStatus).toHaveBeenCalledWith({ activeRuns: 0, busy: false });
expect(setStatus).toHaveBeenCalledWith(expect.objectContaining({ activeRuns: 1, busy: true }));
expect(setStatus).toHaveBeenLastCalledWith(
expect.objectContaining({ activeRuns: 0, busy: false }),
);
expect(onError).toHaveBeenCalledWith(expect.any(Error));
});
it("contains reporting hook errors", async () => {
const queue = createChannelRunQueue({
onError: () => {
throw new Error("report failed");
},
});
queue.enqueue("key", async () => {
throw new Error("boom");
});
await flushAsyncWork();
});
it("skips queued work after deactivation", async () => {
const first = createDeferred();
const task = vi.fn();
const queue = createChannelRunQueue({});
queue.enqueue("key", async () => {
await first.promise;
});
queue.enqueue("key", task);
await flushAsyncWork();
queue.deactivate();
first.resolve?.();
await first.promise;
await flushAsyncWork();
expect(task).not.toHaveBeenCalled();
});
});

View File

@@ -848,6 +848,7 @@ describe("plugin-sdk subpath exports", () => {
"createDraftStreamLoop",
"createLoggedPairingApprovalNotifier",
"createPairingPrefixStripper",
"createChannelRunQueue",
"createRunStateMachine",
"createRuntimeDirectoryLiveAdapter",
"createRuntimeOutboundDelegates",
@@ -1298,6 +1299,7 @@ describe("plugin-sdk subpath exports", () => {
expect(typeof channelLifecycleSdk.createDraftStreamLoop).toBe("function");
expect(typeof channelLifecycleSdk.createFinalizableDraftLifecycle).toBe("function");
expect(typeof channelLifecycleSdk.createChannelRunQueue).toBe("function");
expect(typeof channelLifecycleSdk.runPassiveAccountLifecycle).toBe("function");
expect(typeof channelLifecycleSdk.createRunStateMachine).toBe("function");
expect(typeof channelLifecycleSdk.createArmableStallWatchdog).toBe("function");