mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 11:30:43 +00:00
fix(discord): remove channel run timeouts
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
8903fd57a58acb9a7c949efc6b4197b249220dcd965420ceb7d884cb45fbc48d config-baseline.json
|
||||
86ad0927d992bc873affb3e20a31c6e3c95b2185a91f46cc8e6262a723a78f7d config-baseline.core.json
|
||||
bb7234c52b0bbf12de2a87fa553ec4e89e13aaba9d0d81cf1370621292da13e9 config-baseline.channel.json
|
||||
1f5592bfd141ba1e982ce31763a253c10afb080ab4ea2b6538299b114e29cee1 config-baseline.plugin.json
|
||||
ab654d17b4d3520c81de45dbcf96a8ecef35254cfd6df21af170dd2ebe550799 config-baseline.json
|
||||
8bc9fda7c1096472beaa416a61043ce51d691d4dcad9ed3e0be46e68bb70b0ce config-baseline.core.json
|
||||
56db8ae09c5573a453b8fb01ac579c5b9d8a69fa3fffff2ba2956e5e2ccb2f99 config-baseline.channel.json
|
||||
0dd6583fafae6c9134e46c4cf9bddee9822d6436436dcb1a6dcba6d012962e51 config-baseline.plugin.json
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
c65b1aa1fb4cf402b90bedd3614eb5d7c3903ab860856392d1ee2481818a7a22 plugin-sdk-api-baseline.json
|
||||
da172742470204044c1542a3bba7f183161e90e742f6865c1c7f822dbdc7a7d6 plugin-sdk-api-baseline.jsonl
|
||||
427eb476f48ad368fd7061297727a7634be75612aedef5de91e351ac446553ce plugin-sdk-api-baseline.json
|
||||
6065b278792b4664d31c07ec46f852c3d99c8882adb4b37db3d4f2fe78a74af8 plugin-sdk-api-baseline.jsonl
|
||||
|
||||
@@ -1090,26 +1090,26 @@ openclaw logs --follow
|
||||
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="Long-running handlers time out or duplicate replies">
|
||||
<Accordion title="Long-running Discord turns or duplicate replies">
|
||||
|
||||
Typical logs:
|
||||
|
||||
- `Listener DiscordMessageListener timed out after 30000ms for event MESSAGE_CREATE`
|
||||
- `Slow listener detected ...`
|
||||
- `discord inbound worker timed out after ...`
|
||||
- `stuck session: sessionKey=agent:...:discord:... state=processing ...`
|
||||
|
||||
Listener budget knob:
|
||||
Carbon gateway queue knobs:
|
||||
|
||||
- single-account: `channels.discord.eventQueue.listenerTimeout`
|
||||
- multi-account: `channels.discord.accounts.<accountId>.eventQueue.listenerTimeout`
|
||||
- this only controls Carbon gateway listener work, not agent turn lifetime
|
||||
|
||||
Worker run timeout knob:
|
||||
Discord does not apply a channel-owned timeout to queued agent turns. Message listeners hand off immediately, and queued Discord runs preserve per-session ordering until the session/tool/runtime lifecycle completes or aborts the work.
|
||||
|
||||
- single-account: `channels.discord.inboundWorker.runTimeoutMs`
|
||||
- multi-account: `channels.discord.accounts.<accountId>.inboundWorker.runTimeoutMs`
|
||||
- default: `1800000` (30 minutes); set `0` to disable
|
||||
Deprecated compatibility setting:
|
||||
|
||||
Recommended baseline:
|
||||
- `channels.discord.inboundWorker.runTimeoutMs`
|
||||
- `channels.discord.accounts.<accountId>.inboundWorker.runTimeoutMs`
|
||||
- ignored by current Discord message handling
|
||||
|
||||
```json5
|
||||
{
|
||||
@@ -1120,9 +1120,6 @@ openclaw logs --follow
|
||||
eventQueue: {
|
||||
listenerTimeout: 120000,
|
||||
},
|
||||
inboundWorker: {
|
||||
runTimeoutMs: 1800000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -1130,9 +1127,6 @@ openclaw logs --follow
|
||||
}
|
||||
```
|
||||
|
||||
Use `eventQueue.listenerTimeout` for slow listener setup and `inboundWorker.runTimeoutMs`
|
||||
only if you want a separate safety valve for queued agent turns.
|
||||
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="Gateway metadata lookup timeout warnings">
|
||||
@@ -1193,7 +1187,7 @@ Primary reference: [Configuration reference - Discord](/gateway/config-channels#
|
||||
- policy: `groupPolicy`, `dm.*`, `guilds.*`, `guilds.*.channels.*`
|
||||
- command: `commands.native`, `commands.useAccessGroups`, `configWrites`, `slashCommand.*`
|
||||
- event queue: `eventQueue.listenerTimeout` (listener budget), `eventQueue.maxQueueSize`, `eventQueue.maxConcurrency`
|
||||
- inbound worker: `inboundWorker.runTimeoutMs`
|
||||
- deprecated compatibility: `inboundWorker.runTimeoutMs` (ignored)
|
||||
- gateway metadata: `gatewayInfoTimeoutMs`
|
||||
- reply/history: `replyToMode`, `historyLimit`, `dmHistoryLimit`, `dms.*.historyLimit`
|
||||
- delivery: `textChunkLimit`, `chunkMode`, `maxLinesPerMessage`
|
||||
|
||||
@@ -111,6 +111,8 @@ export {
|
||||
export { collectDiscordSecurityAuditFindings } from "./src/security-audit.js";
|
||||
export { resolveDiscordRuntimeGroupPolicy } from "./src/runtime-group-policy.js";
|
||||
export {
|
||||
DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS,
|
||||
DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS,
|
||||
DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS,
|
||||
DISCORD_DEFAULT_LISTENER_TIMEOUT_MS,
|
||||
} from "./src/monitor/timeouts.js";
|
||||
|
||||
@@ -96,11 +96,7 @@ export {
|
||||
DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS,
|
||||
DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS,
|
||||
DISCORD_DEFAULT_LISTENER_TIMEOUT_MS,
|
||||
isAbortError,
|
||||
mergeAbortSignals,
|
||||
normalizeDiscordInboundWorkerTimeoutMs,
|
||||
normalizeDiscordListenerTimeoutMs,
|
||||
runDiscordTaskWithTimeout,
|
||||
} from "./src/monitor/timeouts.js";
|
||||
export {
|
||||
fetchDiscordApplicationId,
|
||||
|
||||
@@ -90,8 +90,8 @@ export const discordChannelConfigUiHints = {
|
||||
help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).",
|
||||
},
|
||||
"inboundWorker.runTimeoutMs": {
|
||||
label: "Discord Inbound Worker Timeout (ms)",
|
||||
help: "Optional queued Discord inbound worker timeout in ms. This is separate from Carbon listener timeouts; defaults to 1800000 and can be disabled with 0. Set per account via channels.discord.accounts.<id>.inboundWorker.runTimeoutMs.",
|
||||
label: "Deprecated Discord Inbound Worker Timeout",
|
||||
help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.",
|
||||
},
|
||||
"eventQueue.listenerTimeout": {
|
||||
label: "Discord EventQueue Listener Timeout (ms)",
|
||||
|
||||
@@ -192,7 +192,7 @@ describe("DiscordMessageListener", () => {
|
||||
expect(logger.error).toHaveBeenCalledWith(expect.stringContaining("discord handler failed"));
|
||||
});
|
||||
|
||||
it("does not apply its own slow-listener logging (owned by inbound worker)", async () => {
|
||||
it("does not apply its own slow-listener logging", async () => {
|
||||
const deferred = createDeferred();
|
||||
const handler = vi.fn(() => deferred.promise);
|
||||
const logger = {
|
||||
@@ -212,8 +212,7 @@ describe("DiscordMessageListener", () => {
|
||||
deferred.resolve();
|
||||
await flushAsyncWork();
|
||||
expect(handler).toHaveBeenCalledOnce();
|
||||
// The listener no longer wraps handlers with slow-listener logging;
|
||||
// that responsibility moved to the inbound worker.
|
||||
// The listener no longer wraps message handlers with slow-listener logging.
|
||||
expect(logger.warn).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,247 +0,0 @@
|
||||
import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle";
|
||||
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
|
||||
import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
|
||||
import { danger, formatDurationSeconds } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
|
||||
import {
|
||||
commitDiscordInboundReplay,
|
||||
createDiscordInboundReplayGuard,
|
||||
DiscordRetryableInboundError,
|
||||
releaseDiscordInboundReplay,
|
||||
} from "./inbound-dedupe.js";
|
||||
import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js";
|
||||
import type { RuntimeEnv } from "./message-handler.preflight.types.js";
|
||||
import type { DiscordMonitorStatusSink } from "./status.js";
|
||||
import { resolveDiscordReplyDeliveryPlan } from "./threading.js";
|
||||
import { normalizeDiscordInboundWorkerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js";
|
||||
|
||||
type ProcessDiscordMessage = typeof import("./message-handler.process.js").processDiscordMessage;
|
||||
type DeliverDiscordReply = typeof import("./reply-delivery.js").deliverDiscordReply;
|
||||
|
||||
type DiscordInboundWorkerParams = {
|
||||
runtime: RuntimeEnv;
|
||||
setStatus?: DiscordMonitorStatusSink;
|
||||
abortSignal?: AbortSignal;
|
||||
runTimeoutMs?: number;
|
||||
replayGuard?: ClaimableDedupe;
|
||||
__testing?: DiscordInboundWorkerTestingHooks;
|
||||
};
|
||||
|
||||
export type DiscordInboundWorker = {
|
||||
enqueue: (job: DiscordInboundJob) => void;
|
||||
deactivate: () => void;
|
||||
};
|
||||
|
||||
export type DiscordInboundWorkerTestingHooks = {
|
||||
processDiscordMessage?: ProcessDiscordMessage;
|
||||
deliverDiscordReply?: DeliverDiscordReply;
|
||||
};
|
||||
|
||||
let messageProcessRuntimePromise:
|
||||
| Promise<typeof import("./message-handler.process.js")>
|
||||
| undefined;
|
||||
let replyDeliveryRuntimePromise: Promise<typeof import("./reply-delivery.js")> | undefined;
|
||||
|
||||
async function loadMessageProcessRuntime() {
|
||||
messageProcessRuntimePromise ??= import("./message-handler.process.js");
|
||||
return await messageProcessRuntimePromise;
|
||||
}
|
||||
|
||||
async function loadReplyDeliveryRuntime() {
|
||||
replyDeliveryRuntimePromise ??= import("./reply-delivery.js");
|
||||
return await replyDeliveryRuntimePromise;
|
||||
}
|
||||
|
||||
function formatDiscordRunContextSuffix(job: DiscordInboundJob): string {
|
||||
const channelId = job.payload.messageChannelId?.trim();
|
||||
const messageId = job.payload.data?.message?.id?.trim();
|
||||
const details = [
|
||||
channelId ? `channelId=${channelId}` : null,
|
||||
messageId ? `messageId=${messageId}` : null,
|
||||
].filter((entry): entry is string => Boolean(entry));
|
||||
if (details.length === 0) {
|
||||
return "";
|
||||
}
|
||||
return ` (${details.join(", ")})`;
|
||||
}
|
||||
|
||||
async function processDiscordInboundJob(params: {
|
||||
job: DiscordInboundJob;
|
||||
runtime: RuntimeEnv;
|
||||
lifecycleSignal?: AbortSignal;
|
||||
runTimeoutMs?: number;
|
||||
replayGuard: ClaimableDedupe;
|
||||
testing?: DiscordInboundWorkerTestingHooks;
|
||||
}) {
|
||||
const timeoutMs = normalizeDiscordInboundWorkerTimeoutMs(params.runTimeoutMs);
|
||||
const contextSuffix = formatDiscordRunContextSuffix(params.job);
|
||||
let finalReplyStarted = false;
|
||||
let createdThreadId: string | undefined;
|
||||
let sessionKey: string | undefined;
|
||||
const processDiscordMessageImpl =
|
||||
params.testing?.processDiscordMessage ??
|
||||
(await loadMessageProcessRuntime()).processDiscordMessage;
|
||||
try {
|
||||
await runDiscordTaskWithTimeout({
|
||||
run: async (abortSignal) => {
|
||||
await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal), {
|
||||
onFinalReplyStart: () => {
|
||||
finalReplyStarted = true;
|
||||
},
|
||||
onFinalReplyDelivered: () => {
|
||||
finalReplyStarted = true;
|
||||
},
|
||||
onReplyPlanResolved: (resolved) => {
|
||||
createdThreadId = normalizeOptionalString(resolved.createdThreadId);
|
||||
sessionKey = normalizeOptionalString(resolved.sessionKey);
|
||||
},
|
||||
});
|
||||
},
|
||||
timeoutMs,
|
||||
abortSignals: [params.job.runtime.abortSignal, params.lifecycleSignal],
|
||||
onTimeout: async (resolvedTimeoutMs) => {
|
||||
params.runtime.error?.(
|
||||
danger(
|
||||
`discord inbound worker timed out after ${formatDurationSeconds(resolvedTimeoutMs, {
|
||||
decimals: 1,
|
||||
unit: "seconds",
|
||||
})}${contextSuffix}`,
|
||||
),
|
||||
);
|
||||
if (finalReplyStarted) {
|
||||
return;
|
||||
}
|
||||
await sendDiscordInboundWorkerTimeoutReply({
|
||||
job: params.job,
|
||||
runtime: params.runtime,
|
||||
contextSuffix,
|
||||
createdThreadId,
|
||||
sessionKey,
|
||||
deliverDiscordReplyImpl: params.testing?.deliverDiscordReply,
|
||||
});
|
||||
},
|
||||
onErrorAfterTimeout: (error) => {
|
||||
params.runtime.error?.(
|
||||
danger(`discord inbound worker failed after timeout: ${String(error)}${contextSuffix}`),
|
||||
);
|
||||
},
|
||||
});
|
||||
await commitDiscordInboundReplay({
|
||||
replayKeys: params.job.replayKeys,
|
||||
replayGuard: params.replayGuard,
|
||||
});
|
||||
} catch (error) {
|
||||
if (error instanceof DiscordRetryableInboundError) {
|
||||
releaseDiscordInboundReplay({
|
||||
replayKeys: params.job.replayKeys,
|
||||
error,
|
||||
replayGuard: params.replayGuard,
|
||||
});
|
||||
} else {
|
||||
await commitDiscordInboundReplay({
|
||||
replayKeys: params.job.replayKeys,
|
||||
replayGuard: params.replayGuard,
|
||||
});
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function sendDiscordInboundWorkerTimeoutReply(params: {
|
||||
job: DiscordInboundJob;
|
||||
runtime: RuntimeEnv;
|
||||
contextSuffix: string;
|
||||
createdThreadId?: string;
|
||||
sessionKey?: string;
|
||||
deliverDiscordReplyImpl?: DeliverDiscordReply;
|
||||
}) {
|
||||
const messageChannelId = params.job.payload.messageChannelId?.trim();
|
||||
const messageId = params.job.payload.message?.id?.trim();
|
||||
const token = params.job.payload.token?.trim();
|
||||
if (!messageChannelId || !messageId || !token) {
|
||||
params.runtime.error?.(
|
||||
danger(
|
||||
`discord inbound worker timeout reply skipped: missing reply target${params.contextSuffix}`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const deliveryPlan = resolveDiscordReplyDeliveryPlan({
|
||||
replyTarget: `channel:${params.job.payload.threadChannel?.id ?? messageChannelId}`,
|
||||
replyToMode: params.job.payload.replyToMode,
|
||||
messageId,
|
||||
threadChannel: params.job.payload.threadChannel,
|
||||
createdThreadId: params.createdThreadId,
|
||||
});
|
||||
|
||||
try {
|
||||
const deliverDiscordReplyImpl =
|
||||
params.deliverDiscordReplyImpl ?? (await loadReplyDeliveryRuntime()).deliverDiscordReply;
|
||||
await deliverDiscordReplyImpl({
|
||||
cfg: params.job.payload.cfg,
|
||||
replies: [{ text: "Discord inbound worker timed out.", isError: true }],
|
||||
target: deliveryPlan.deliverTarget,
|
||||
token,
|
||||
accountId: params.job.payload.accountId,
|
||||
runtime: params.runtime,
|
||||
textLimit: params.job.payload.textLimit,
|
||||
maxLinesPerMessage: params.job.payload.discordConfig?.maxLinesPerMessage,
|
||||
replyToId: deliveryPlan.replyReference.use(),
|
||||
replyToMode: params.job.payload.replyToMode,
|
||||
sessionKey:
|
||||
params.sessionKey ??
|
||||
params.job.payload.route.sessionKey ??
|
||||
params.job.payload.baseSessionKey,
|
||||
threadBindings: params.job.runtime.threadBindings,
|
||||
});
|
||||
} catch (error) {
|
||||
params.runtime.error?.(
|
||||
danger(
|
||||
`discord inbound worker timeout reply failed: ${String(error)}${params.contextSuffix}`,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export function createDiscordInboundWorker(
|
||||
params: DiscordInboundWorkerParams,
|
||||
): DiscordInboundWorker {
|
||||
const runQueue = new KeyedAsyncQueue();
|
||||
const runState = createRunStateMachine({
|
||||
setStatus: params.setStatus,
|
||||
abortSignal: params.abortSignal,
|
||||
});
|
||||
const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard();
|
||||
|
||||
return {
|
||||
enqueue(job) {
|
||||
void runQueue
|
||||
.enqueue(job.queueKey, async () => {
|
||||
if (!runState.isActive()) {
|
||||
return;
|
||||
}
|
||||
runState.onRunStart();
|
||||
try {
|
||||
if (!runState.isActive()) {
|
||||
return;
|
||||
}
|
||||
await processDiscordInboundJob({
|
||||
job,
|
||||
runtime: params.runtime,
|
||||
lifecycleSignal: params.abortSignal,
|
||||
runTimeoutMs: params.runTimeoutMs,
|
||||
replayGuard,
|
||||
testing: params.__testing,
|
||||
});
|
||||
} finally {
|
||||
runState.onRunEnd();
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
params.runtime.error?.(danger(`discord inbound worker failed: ${String(error)}`));
|
||||
});
|
||||
},
|
||||
deactivate: runState.deactivate,
|
||||
};
|
||||
}
|
||||
@@ -38,7 +38,6 @@ import { setPresence } from "./presence-cache.js";
|
||||
import { isThreadArchived } from "./thread-bindings.discord-api.js";
|
||||
import { resolveFetchedDiscordThreadLikeChannelContext } from "./thread-channel-context.js";
|
||||
import { closeDiscordThreadSessions } from "./thread-session-close.js";
|
||||
import { normalizeDiscordListenerTimeoutMs, runDiscordTaskWithTimeout } from "./timeouts.js";
|
||||
|
||||
type LoadedConfig = OpenClawConfig;
|
||||
type RuntimeEnv = import("openclaw/plugin-sdk/runtime-env").RuntimeEnv;
|
||||
@@ -141,46 +140,13 @@ async function runDiscordListenerWithSlowLog(params: {
|
||||
logger: Logger | undefined;
|
||||
listener: string;
|
||||
event: string;
|
||||
run: (abortSignal: AbortSignal | undefined) => Promise<void>;
|
||||
timeoutMs?: number;
|
||||
run: () => Promise<void>;
|
||||
context?: Record<string, unknown>;
|
||||
onError?: (err: unknown) => void;
|
||||
}) {
|
||||
const startedAt = Date.now();
|
||||
const timeoutMs = normalizeDiscordListenerTimeoutMs(params.timeoutMs);
|
||||
const logger = params.logger ?? discordEventQueueLog;
|
||||
let timedOut = false;
|
||||
|
||||
try {
|
||||
timedOut = await runDiscordTaskWithTimeout({
|
||||
run: params.run,
|
||||
timeoutMs,
|
||||
onTimeout: (resolvedTimeoutMs) => {
|
||||
logger.error(
|
||||
danger(
|
||||
`discord handler timed out after ${formatDurationSeconds(resolvedTimeoutMs, {
|
||||
decimals: 1,
|
||||
unit: "seconds",
|
||||
})}${formatListenerContextSuffix(params.context)}`,
|
||||
),
|
||||
);
|
||||
},
|
||||
onAbortAfterTimeout: () => {
|
||||
logger.warn(
|
||||
`discord handler canceled after timeout${formatListenerContextSuffix(params.context)}`,
|
||||
);
|
||||
},
|
||||
onErrorAfterTimeout: (err) => {
|
||||
logger.error(
|
||||
danger(
|
||||
`discord handler failed after timeout: ${String(err)}${formatListenerContextSuffix(params.context)}`,
|
||||
),
|
||||
);
|
||||
},
|
||||
});
|
||||
if (timedOut) {
|
||||
return;
|
||||
}
|
||||
await params.run();
|
||||
} catch (err) {
|
||||
if (params.onError) {
|
||||
params.onError(err);
|
||||
@@ -188,15 +154,13 @@ async function runDiscordListenerWithSlowLog(params: {
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
if (!timedOut) {
|
||||
logSlowDiscordListener({
|
||||
logger: params.logger,
|
||||
listener: params.listener,
|
||||
event: params.event,
|
||||
durationMs: Date.now() - startedAt,
|
||||
context: params.context,
|
||||
});
|
||||
}
|
||||
logSlowDiscordListener({
|
||||
logger: params.logger,
|
||||
listener: params.listener,
|
||||
event: params.event,
|
||||
durationMs: Date.now() - startedAt,
|
||||
context: params.context,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -213,7 +177,6 @@ export class DiscordMessageListener extends MessageCreateListener {
|
||||
private handler: DiscordMessageHandler,
|
||||
private logger?: Logger,
|
||||
private onEvent?: () => void,
|
||||
_options?: { timeoutMs?: number },
|
||||
) {
|
||||
super();
|
||||
}
|
||||
@@ -221,9 +184,8 @@ export class DiscordMessageListener extends MessageCreateListener {
|
||||
async handle(data: DiscordMessageEvent, client: Client) {
|
||||
this.onEvent?.();
|
||||
// Fire-and-forget: hand off to the handler without blocking the
|
||||
// Carbon listener. Per-session ordering and run timeouts are owned
|
||||
// by the inbound worker queue, so the listener no longer serializes
|
||||
// or applies its own timeout.
|
||||
// Carbon listener. Per-session ordering is owned by the message run queue,
|
||||
// so the listener no longer serializes or applies its own timeout.
|
||||
void Promise.resolve()
|
||||
.then(() => this.handler(data, client))
|
||||
.catch((err) => {
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import type { MockFn } from "openclaw/plugin-sdk/plugin-test-runtime";
|
||||
import { vi } from "vitest";
|
||||
import type { DiscordInboundWorkerTestingHooks } from "./inbound-worker.js";
|
||||
import type { DiscordMessageRunQueueTestingHooks } from "./message-run-queue.js";
|
||||
|
||||
export const preflightDiscordMessageMock: MockFn = vi.fn();
|
||||
export const processDiscordMessageMock: MockFn = vi.fn();
|
||||
export const deliverDiscordReplyMock: MockFn = vi.fn(async () => undefined);
|
||||
|
||||
const { createDiscordMessageHandler: createRealDiscordMessageHandler } =
|
||||
await import("./message-handler.js");
|
||||
@@ -14,9 +13,8 @@ type PreflightDiscordMessageHook = NonNullable<
|
||||
DiscordMessageHandlerTestingHooks["preflightDiscordMessage"]
|
||||
>;
|
||||
type ProcessDiscordMessageHook = NonNullable<
|
||||
DiscordInboundWorkerTestingHooks["processDiscordMessage"]
|
||||
DiscordMessageRunQueueTestingHooks["processDiscordMessage"]
|
||||
>;
|
||||
type DeliverDiscordReplyHook = NonNullable<DiscordInboundWorkerTestingHooks["deliverDiscordReply"]>;
|
||||
|
||||
export function createDiscordMessageHandler(
|
||||
...args: Parameters<typeof createRealDiscordMessageHandler>
|
||||
@@ -28,7 +26,6 @@ export function createDiscordMessageHandler(
|
||||
...params.__testing,
|
||||
preflightDiscordMessage: preflightDiscordMessageMock as PreflightDiscordMessageHook,
|
||||
processDiscordMessage: processDiscordMessageMock as ProcessDiscordMessageHook,
|
||||
deliverDiscordReply: deliverDiscordReplyMock as DeliverDiscordReplyHook,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
@@ -4,14 +4,12 @@ import {
|
||||
createDiscordMessageHandler,
|
||||
preflightDiscordMessageMock,
|
||||
processDiscordMessageMock,
|
||||
deliverDiscordReplyMock,
|
||||
} from "./message-handler.module-test-helpers.js";
|
||||
import {
|
||||
createDiscordHandlerParams,
|
||||
createDiscordPreflightContext,
|
||||
} from "./message-handler.test-helpers.js";
|
||||
|
||||
const eventualReplyDeliveredMock = vi.hoisted(() => vi.fn());
|
||||
type SetStatusFn = (patch: Record<string, unknown>) => void;
|
||||
function createDeferred<T = void>() {
|
||||
let resolve: (value: T | PromiseLike<T>) => void = () => {};
|
||||
@@ -56,10 +54,7 @@ function createPreflightContext(channelId = "ch-1") {
|
||||
};
|
||||
}
|
||||
|
||||
function createHandlerWithDefaultPreflight(overrides?: {
|
||||
setStatus?: SetStatusFn;
|
||||
workerRunTimeoutMs?: number;
|
||||
}) {
|
||||
function createHandlerWithDefaultPreflight(overrides?: { setStatus?: SetStatusFn }) {
|
||||
preflightDiscordMessageMock.mockImplementation(async (params: { data: { channel_id: string } }) =>
|
||||
createPreflightContext(params.data.channel_id),
|
||||
);
|
||||
@@ -72,69 +67,6 @@ function installDefaultDiscordPreflight() {
|
||||
);
|
||||
}
|
||||
|
||||
function createAbortOnTimeoutProcessImplementation() {
|
||||
return async (ctx: { abortSignal?: AbortSignal }) => {
|
||||
await new Promise<void>((resolve) => {
|
||||
if (ctx.abortSignal?.aborted) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
async function queueTimedMessages(params?: {
|
||||
workerRunTimeoutMs?: number;
|
||||
beforeCreateHandler?: () => void;
|
||||
}) {
|
||||
preflightDiscordMessageMock.mockReset();
|
||||
processDiscordMessageMock.mockReset();
|
||||
deliverDiscordReplyMock.mockClear();
|
||||
|
||||
processDiscordMessageMock
|
||||
.mockImplementationOnce(createAbortOnTimeoutProcessImplementation())
|
||||
.mockImplementationOnce(async () => undefined);
|
||||
installDefaultDiscordPreflight();
|
||||
params?.beforeCreateHandler?.();
|
||||
|
||||
const handlerParams = createDiscordHandlerParams({
|
||||
workerRunTimeoutMs: params?.workerRunTimeoutMs ?? 50,
|
||||
});
|
||||
const handler = createDiscordMessageHandler(handlerParams);
|
||||
|
||||
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
|
||||
await expect(handler(createMessageData("m-2") as never, {} as never)).resolves.toBeUndefined();
|
||||
|
||||
return { handlerParams };
|
||||
}
|
||||
|
||||
async function runSingleMessageTimeout(params: {
|
||||
processImpl: Parameters<typeof processDiscordMessageMock.mockImplementationOnce>[0];
|
||||
workerRunTimeoutMs?: number;
|
||||
}) {
|
||||
preflightDiscordMessageMock.mockReset();
|
||||
processDiscordMessageMock.mockReset();
|
||||
deliverDiscordReplyMock.mockClear();
|
||||
processDiscordMessageMock.mockImplementationOnce(params.processImpl);
|
||||
installDefaultDiscordPreflight();
|
||||
|
||||
const handlerParams = createDiscordHandlerParams({
|
||||
workerRunTimeoutMs: params.workerRunTimeoutMs ?? 50,
|
||||
});
|
||||
const handler = createDiscordMessageHandler(handlerParams);
|
||||
|
||||
await expect(handler(createMessageData("m-1") as never, {} as never)).resolves.toBeUndefined();
|
||||
await vi.advanceTimersByTimeAsync(60);
|
||||
await Promise.resolve();
|
||||
|
||||
expect(handlerParams.runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("discord inbound worker timed out after"),
|
||||
);
|
||||
|
||||
return handlerParams;
|
||||
}
|
||||
|
||||
async function createLifecycleStopScenario(params: {
|
||||
createHandler: (status: SetStatusFn) => {
|
||||
handler: (data: never, opts: never) => Promise<void>;
|
||||
@@ -269,9 +201,7 @@ describe("createDiscordMessageHandler queue behavior", () => {
|
||||
await flushQueueWork();
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(params.runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining(
|
||||
"discord inbound worker failed: DiscordRetryableInboundError: retry me",
|
||||
),
|
||||
expect.stringContaining("discord message run failed: DiscordRetryableInboundError: retry me"),
|
||||
);
|
||||
|
||||
await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined();
|
||||
@@ -298,7 +228,7 @@ describe("createDiscordMessageHandler queue behavior", () => {
|
||||
await flushQueueWork();
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(params.runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("discord inbound worker failed: Error: post-send failure"),
|
||||
expect.stringContaining("discord message run failed: Error: post-send failure"),
|
||||
);
|
||||
|
||||
await expect(handler(duplicate as never, {} as never)).resolves.toBeUndefined();
|
||||
@@ -309,226 +239,56 @@ describe("createDiscordMessageHandler queue behavior", () => {
|
||||
expect(visibleSideEffect).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("applies explicit inbound worker timeout to queued runs so stalled runs do not block the queue", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const { handlerParams } = await queueTimedMessages();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(60);
|
||||
await flushQueueWork();
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
|
||||
|
||||
const firstCtx = processDiscordMessageMock.mock.calls[0]?.[0] as
|
||||
| { abortSignal?: AbortSignal }
|
||||
| undefined;
|
||||
expect(firstCtx?.abortSignal?.aborted).toBe(true);
|
||||
expect(handlerParams.runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("discord inbound worker timed out after"),
|
||||
);
|
||||
expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1);
|
||||
expect(deliverDiscordReplyMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
target: "channel:ch-1",
|
||||
token: "test-token",
|
||||
replies: [
|
||||
expect.objectContaining({
|
||||
isError: true,
|
||||
text: "Discord inbound worker timed out.",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("waits for the timeout fallback reply before starting the next queued run", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const deliverTimeoutReply = createDeferred();
|
||||
const { handlerParams } = await queueTimedMessages({
|
||||
beforeCreateHandler: () => {
|
||||
deliverDiscordReplyMock.mockReset();
|
||||
deliverDiscordReplyMock.mockImplementationOnce(async () => {
|
||||
await deliverTimeoutReply.promise;
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(60);
|
||||
await flushQueueWork();
|
||||
expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(handlerParams.runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("discord inbound worker timed out after"),
|
||||
);
|
||||
|
||||
deliverTimeoutReply.resolve();
|
||||
await deliverTimeoutReply.promise;
|
||||
|
||||
await flushQueueWork();
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not send the timeout fallback when a final reply already went out", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
await runSingleMessageTimeout({
|
||||
processImpl: async (
|
||||
ctx: { abortSignal?: AbortSignal },
|
||||
observer?: { onFinalReplyStart?: () => void; onFinalReplyDelivered?: () => void },
|
||||
) => {
|
||||
observer?.onFinalReplyStart?.();
|
||||
observer?.onFinalReplyDelivered?.();
|
||||
await new Promise<void>((resolve) => {
|
||||
if (ctx.abortSignal?.aborted) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
expect(deliverDiscordReplyMock).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("routes the timeout fallback to the created auto-thread target", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
await runSingleMessageTimeout({
|
||||
processImpl: async (
|
||||
ctx: { abortSignal?: AbortSignal },
|
||||
observer?: {
|
||||
onReplyPlanResolved?: (params: {
|
||||
createdThreadId?: string;
|
||||
sessionKey?: string;
|
||||
}) => void;
|
||||
},
|
||||
) => {
|
||||
observer?.onReplyPlanResolved?.({
|
||||
createdThreadId: "thread-1",
|
||||
sessionKey: "agent:main:discord:channel:thread-1",
|
||||
});
|
||||
await new Promise<void>((resolve) => {
|
||||
if (ctx.abortSignal?.aborted) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
ctx.abortSignal?.addEventListener("abort", () => resolve(), { once: true });
|
||||
});
|
||||
},
|
||||
});
|
||||
|
||||
expect(deliverDiscordReplyMock).toHaveBeenCalledTimes(1);
|
||||
expect(deliverDiscordReplyMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
target: "channel:thread-1",
|
||||
sessionKey: "agent:main:discord:channel:thread-1",
|
||||
replies: [
|
||||
expect.objectContaining({
|
||||
isError: true,
|
||||
text: "Discord inbound worker timed out.",
|
||||
}),
|
||||
],
|
||||
}),
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("does not send the timeout fallback when final reply delivery is already in flight", async () => {
|
||||
it("does not abort long queued runs with a Discord-owned channel timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
preflightDiscordMessageMock.mockReset();
|
||||
processDiscordMessageMock.mockReset();
|
||||
deliverDiscordReplyMock.mockClear();
|
||||
|
||||
const finishFinalReply = createDeferred();
|
||||
const firstRun = createDeferred();
|
||||
const secondRun = createDeferred();
|
||||
const capturedAbortSignals: Array<AbortSignal | undefined> = [];
|
||||
processDiscordMessageMock.mockImplementationOnce(
|
||||
async (
|
||||
_ctx: { abortSignal?: AbortSignal },
|
||||
observer?: { onFinalReplyStart?: () => void; onFinalReplyDelivered?: () => void },
|
||||
) => {
|
||||
observer?.onFinalReplyStart?.();
|
||||
await finishFinalReply.promise;
|
||||
observer?.onFinalReplyDelivered?.();
|
||||
async (ctx: { abortSignal?: AbortSignal }) => {
|
||||
capturedAbortSignals.push(ctx.abortSignal);
|
||||
await firstRun.promise;
|
||||
},
|
||||
);
|
||||
preflightDiscordMessageMock.mockImplementation(
|
||||
async (params: { data: { channel_id: string } }) =>
|
||||
createPreflightContext(params.data.channel_id),
|
||||
processDiscordMessageMock.mockImplementationOnce(
|
||||
async (ctx: { abortSignal?: AbortSignal }) => {
|
||||
capturedAbortSignals.push(ctx.abortSignal);
|
||||
await secondRun.promise;
|
||||
},
|
||||
);
|
||||
|
||||
const params = createDiscordHandlerParams({ workerRunTimeoutMs: 50 });
|
||||
installDefaultDiscordPreflight();
|
||||
const params = createDiscordHandlerParams();
|
||||
const handler = createDiscordMessageHandler(params);
|
||||
|
||||
await expect(
|
||||
handler(createMessageData("m-1") as never, {} as never),
|
||||
).resolves.toBeUndefined();
|
||||
await expect(
|
||||
handler(createMessageData("m-2") as never, {} as never),
|
||||
).resolves.toBeUndefined();
|
||||
await flushQueueWork();
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(60);
|
||||
await Promise.resolve();
|
||||
await vi.advanceTimersByTimeAsync(60_000);
|
||||
await flushQueueWork();
|
||||
|
||||
expect(params.runtime.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("discord inbound worker timed out after"),
|
||||
);
|
||||
expect(deliverDiscordReplyMock).not.toHaveBeenCalled();
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(1);
|
||||
expect(capturedAbortSignals[0]?.aborted).not.toBe(true);
|
||||
expect(params.runtime.error).not.toHaveBeenCalledWith(expect.stringContaining("timed out"));
|
||||
|
||||
finishFinalReply.resolve();
|
||||
await finishFinalReply.promise;
|
||||
await Promise.resolve();
|
||||
firstRun.resolve();
|
||||
await firstRun.promise;
|
||||
await flushQueueWork();
|
||||
|
||||
expect(deliverDiscordReplyMock).not.toHaveBeenCalled();
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
expect(processDiscordMessageMock).toHaveBeenCalledTimes(2);
|
||||
expect(capturedAbortSignals[1]?.aborted).not.toBe(true);
|
||||
|
||||
it("does not time out queued runs when the inbound worker timeout is disabled", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
preflightDiscordMessageMock.mockReset();
|
||||
processDiscordMessageMock.mockReset();
|
||||
eventualReplyDeliveredMock.mockReset();
|
||||
|
||||
processDiscordMessageMock.mockImplementationOnce(
|
||||
async (ctx: { abortSignal?: AbortSignal }) => {
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(() => {
|
||||
if (!ctx.abortSignal?.aborted) {
|
||||
eventualReplyDeliveredMock();
|
||||
}
|
||||
resolve();
|
||||
}, 80);
|
||||
});
|
||||
},
|
||||
);
|
||||
const params = createDiscordHandlerParams({ workerRunTimeoutMs: 0 });
|
||||
const handler = createHandlerWithDefaultPreflight({ workerRunTimeoutMs: 0 });
|
||||
|
||||
await expect(
|
||||
handler(createMessageData("m-1") as never, {} as never),
|
||||
).resolves.toBeUndefined();
|
||||
|
||||
await vi.advanceTimersByTimeAsync(80);
|
||||
await Promise.resolve();
|
||||
|
||||
expect(eventualReplyDeliveredMock).toHaveBeenCalledTimes(1);
|
||||
expect(params.runtime.error).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining("discord inbound worker timed out after"),
|
||||
);
|
||||
secondRun.resolve();
|
||||
await secondRun.promise;
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
|
||||
@@ -9,7 +9,6 @@ export function createDiscordHandlerParams(overrides?: {
|
||||
botUserId?: string;
|
||||
setStatus?: (patch: Record<string, unknown>) => void;
|
||||
abortSignal?: AbortSignal;
|
||||
workerRunTimeoutMs?: number;
|
||||
}): Parameters<typeof createDiscordMessageHandler>[0] {
|
||||
const cfg: OpenClawConfig = {
|
||||
channels: {
|
||||
@@ -48,7 +47,6 @@ export function createDiscordHandlerParams(overrides?: {
|
||||
threadBindings: createNoopThreadBindingManager("default"),
|
||||
setStatus: overrides?.setStatus,
|
||||
abortSignal: overrides?.abortSignal,
|
||||
workerRunTimeoutMs: overrides?.workerRunTimeoutMs,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -14,13 +14,13 @@ import {
|
||||
releaseDiscordInboundReplay,
|
||||
} from "./inbound-dedupe.js";
|
||||
import { buildDiscordInboundJob } from "./inbound-job.js";
|
||||
import {
|
||||
createDiscordInboundWorker,
|
||||
type DiscordInboundWorkerTestingHooks,
|
||||
} from "./inbound-worker.js";
|
||||
import type { DiscordMessageEvent, DiscordMessageHandler } from "./listeners.js";
|
||||
import { applyImplicitReplyBatchGate } from "./message-handler.batch-gate.js";
|
||||
import type { DiscordMessagePreflightParams } from "./message-handler.preflight.types.js";
|
||||
import {
|
||||
createDiscordMessageRunQueue,
|
||||
type DiscordMessageRunQueueTestingHooks,
|
||||
} from "./message-run-queue.js";
|
||||
import {
|
||||
hasDiscordMessageStickers,
|
||||
resolveDiscordMessageChannelId,
|
||||
@@ -37,11 +37,10 @@ type DiscordMessageHandlerParams = Omit<
|
||||
> & {
|
||||
setStatus?: DiscordMonitorStatusSink;
|
||||
abortSignal?: AbortSignal;
|
||||
workerRunTimeoutMs?: number;
|
||||
__testing?: DiscordMessageHandlerTestingHooks;
|
||||
};
|
||||
|
||||
type DiscordMessageHandlerTestingHooks = DiscordInboundWorkerTestingHooks & {
|
||||
type DiscordMessageHandlerTestingHooks = DiscordMessageRunQueueTestingHooks & {
|
||||
preflightDiscordMessage?: PreflightDiscordMessage;
|
||||
};
|
||||
|
||||
@@ -76,11 +75,10 @@ export function createDiscordMessageHandler(
|
||||
"group-mentions";
|
||||
const preflightDiscordMessageImpl = params.__testing?.preflightDiscordMessage;
|
||||
const replayGuard = createDiscordInboundReplayGuard();
|
||||
const inboundWorker = createDiscordInboundWorker({
|
||||
const messageRunQueue = createDiscordMessageRunQueue({
|
||||
runtime: params.runtime,
|
||||
setStatus: params.setStatus,
|
||||
abortSignal: params.abortSignal,
|
||||
runTimeoutMs: params.workerRunTimeoutMs,
|
||||
replayGuard,
|
||||
__testing: params.__testing,
|
||||
});
|
||||
@@ -156,7 +154,7 @@ export function createDiscordMessageHandler(
|
||||
return;
|
||||
}
|
||||
applyImplicitReplyBatchGate(ctx, params.replyToMode, false);
|
||||
inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
|
||||
messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
|
||||
return;
|
||||
}
|
||||
const combinedBaseText = entries
|
||||
@@ -209,7 +207,7 @@ export function createDiscordMessageHandler(
|
||||
ctxBatch.MessageSidLast = ids[ids.length - 1];
|
||||
}
|
||||
}
|
||||
inboundWorker.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
|
||||
messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
|
||||
} catch (error) {
|
||||
if (error instanceof DiscordRetryableInboundError) {
|
||||
releaseDiscordInboundReplay({ replayKeys, error, replayGuard });
|
||||
@@ -262,7 +260,7 @@ export function createDiscordMessageHandler(
|
||||
}
|
||||
};
|
||||
|
||||
handler.deactivate = inboundWorker.deactivate;
|
||||
handler.deactivate = messageRunQueue.deactivate;
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
115
extensions/discord/src/monitor/message-run-queue.ts
Normal file
115
extensions/discord/src/monitor/message-run-queue.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import { createRunStateMachine } from "openclaw/plugin-sdk/channel-lifecycle";
|
||||
import { KeyedAsyncQueue } from "openclaw/plugin-sdk/keyed-async-queue";
|
||||
import type { ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe";
|
||||
import { danger } from "openclaw/plugin-sdk/runtime-env";
|
||||
import {
|
||||
commitDiscordInboundReplay,
|
||||
createDiscordInboundReplayGuard,
|
||||
DiscordRetryableInboundError,
|
||||
releaseDiscordInboundReplay,
|
||||
} from "./inbound-dedupe.js";
|
||||
import { materializeDiscordInboundJob, type DiscordInboundJob } from "./inbound-job.js";
|
||||
import type { RuntimeEnv } from "./message-handler.preflight.types.js";
|
||||
import type { DiscordMonitorStatusSink } from "./status.js";
|
||||
import { mergeAbortSignals } from "./timeouts.js";
|
||||
|
||||
type ProcessDiscordMessage = typeof import("./message-handler.process.js").processDiscordMessage;
|
||||
|
||||
type DiscordMessageRunQueueParams = {
|
||||
runtime: RuntimeEnv;
|
||||
setStatus?: DiscordMonitorStatusSink;
|
||||
abortSignal?: AbortSignal;
|
||||
replayGuard?: ClaimableDedupe;
|
||||
__testing?: DiscordMessageRunQueueTestingHooks;
|
||||
};
|
||||
|
||||
export type DiscordMessageRunQueue = {
|
||||
enqueue: (job: DiscordInboundJob) => void;
|
||||
deactivate: () => void;
|
||||
};
|
||||
|
||||
export type DiscordMessageRunQueueTestingHooks = {
|
||||
processDiscordMessage?: ProcessDiscordMessage;
|
||||
};
|
||||
|
||||
let messageProcessRuntimePromise:
|
||||
| Promise<typeof import("./message-handler.process.js")>
|
||||
| undefined;
|
||||
|
||||
async function loadMessageProcessRuntime() {
|
||||
messageProcessRuntimePromise ??= import("./message-handler.process.js");
|
||||
return await messageProcessRuntimePromise;
|
||||
}
|
||||
|
||||
async function processDiscordQueuedMessage(params: {
|
||||
job: DiscordInboundJob;
|
||||
lifecycleSignal?: AbortSignal;
|
||||
replayGuard: ClaimableDedupe;
|
||||
testing?: DiscordMessageRunQueueTestingHooks;
|
||||
}) {
|
||||
const processDiscordMessageImpl =
|
||||
params.testing?.processDiscordMessage ??
|
||||
(await loadMessageProcessRuntime()).processDiscordMessage;
|
||||
const abortSignal = mergeAbortSignals([params.job.runtime.abortSignal, params.lifecycleSignal]);
|
||||
try {
|
||||
await processDiscordMessageImpl(materializeDiscordInboundJob(params.job, abortSignal));
|
||||
await commitDiscordInboundReplay({
|
||||
replayKeys: params.job.replayKeys,
|
||||
replayGuard: params.replayGuard,
|
||||
});
|
||||
} catch (error) {
|
||||
if (error instanceof DiscordRetryableInboundError) {
|
||||
releaseDiscordInboundReplay({
|
||||
replayKeys: params.job.replayKeys,
|
||||
error,
|
||||
replayGuard: params.replayGuard,
|
||||
});
|
||||
} else {
|
||||
await commitDiscordInboundReplay({
|
||||
replayKeys: params.job.replayKeys,
|
||||
replayGuard: params.replayGuard,
|
||||
});
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export function createDiscordMessageRunQueue(
|
||||
params: DiscordMessageRunQueueParams,
|
||||
): DiscordMessageRunQueue {
|
||||
const runQueue = new KeyedAsyncQueue();
|
||||
const runState = createRunStateMachine({
|
||||
setStatus: params.setStatus,
|
||||
abortSignal: params.abortSignal,
|
||||
});
|
||||
const replayGuard = params.replayGuard ?? createDiscordInboundReplayGuard();
|
||||
|
||||
return {
|
||||
enqueue(job) {
|
||||
void runQueue
|
||||
.enqueue(job.queueKey, async () => {
|
||||
if (!runState.isActive()) {
|
||||
return;
|
||||
}
|
||||
runState.onRunStart();
|
||||
try {
|
||||
if (!runState.isActive()) {
|
||||
return;
|
||||
}
|
||||
await processDiscordQueuedMessage({
|
||||
job,
|
||||
lifecycleSignal: params.abortSignal,
|
||||
replayGuard,
|
||||
testing: params.__testing,
|
||||
});
|
||||
} finally {
|
||||
runState.onRunEnd();
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
params.runtime.error?.(danger(`discord message run failed: ${String(error)}`));
|
||||
});
|
||||
},
|
||||
deactivate: runState.deactivate,
|
||||
};
|
||||
}
|
||||
@@ -343,7 +343,7 @@ async function fetchDiscordMedia(params: {
|
||||
abortSignal?: AbortSignal;
|
||||
}) {
|
||||
// `totalTimeoutMs` is enforced per individual attachment or sticker fetch.
|
||||
// The inbound worker's abort signal remains the outer bound for the message.
|
||||
// The caller abort signal remains the outer bound for the message.
|
||||
const timeoutAbortController = params.totalTimeoutMs ? new AbortController() : undefined;
|
||||
const signal = mergeAbortSignals([params.abortSignal, timeoutAbortController?.signal]);
|
||||
let timedOut = false;
|
||||
|
||||
@@ -253,7 +253,6 @@ export function registerDiscordMonitorListeners(params: {
|
||||
logger: NonNullable<ConstructorParameters<typeof DiscordMessageListener>[1]>;
|
||||
messageHandler: ConstructorParameters<typeof DiscordMessageListener>[0];
|
||||
trackInboundEvent?: () => void;
|
||||
eventQueueListenerTimeoutMs?: number;
|
||||
}) {
|
||||
registerDiscordListener(
|
||||
params.client.listeners,
|
||||
@@ -261,9 +260,7 @@ export function registerDiscordMonitorListeners(params: {
|
||||
);
|
||||
registerDiscordListener(
|
||||
params.client.listeners,
|
||||
new DiscordMessageListener(params.messageHandler, params.logger, params.trackInboundEvent, {
|
||||
timeoutMs: params.eventQueueListenerTimeoutMs,
|
||||
}),
|
||||
new DiscordMessageListener(params.messageHandler, params.logger, params.trackInboundEvent),
|
||||
);
|
||||
|
||||
const reactionListenerOptions: ConstructorParameters<typeof DiscordReactionListener>[0] = {
|
||||
|
||||
@@ -637,7 +637,7 @@ describe("monitorDiscordProvider", () => {
|
||||
expect(eventQueue?.listenerTimeout).toBe(300_000);
|
||||
});
|
||||
|
||||
it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => {
|
||||
it("does not pass eventQueue.listenerTimeout into the message run queue", async () => {
|
||||
await monitorDiscordProvider({
|
||||
config: createConfigWithDiscordAccount({
|
||||
eventQueue: { listenerTimeout: 50_000 },
|
||||
@@ -653,7 +653,7 @@ describe("monitorDiscordProvider", () => {
|
||||
expect("listenerTimeoutMs" in (params ?? {})).toBe(false);
|
||||
});
|
||||
|
||||
it("forwards inbound worker timeout config to the Discord message handler", async () => {
|
||||
it("ignores deprecated inbound worker timeout config", async () => {
|
||||
resolveDiscordAccountMock.mockReturnValue({
|
||||
accountId: "default",
|
||||
token: "MTIz.abc.def",
|
||||
@@ -674,7 +674,7 @@ describe("monitorDiscordProvider", () => {
|
||||
const params = getFirstDiscordMessageHandlerParams<{
|
||||
workerRunTimeoutMs?: number;
|
||||
}>();
|
||||
expect(params?.workerRunTimeoutMs).toBe(300_000);
|
||||
expect(params?.workerRunTimeoutMs).toBeUndefined();
|
||||
});
|
||||
|
||||
it("continues startup when Discord daily slash-command create quota is exhausted", async () => {
|
||||
|
||||
@@ -958,7 +958,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
gateway,
|
||||
gatewaySupervisor: createdGatewaySupervisor,
|
||||
autoPresenceController: createdAutoPresenceController,
|
||||
eventQueueOpts,
|
||||
} = await createDiscordMonitorClient({
|
||||
accountId: account.accountId,
|
||||
applicationId,
|
||||
@@ -1079,7 +1078,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
runtime,
|
||||
setStatus: opts.setStatus,
|
||||
abortSignal: opts.abortSignal,
|
||||
workerRunTimeoutMs: discordCfg.inboundWorker?.runTimeoutMs,
|
||||
botUserId,
|
||||
guildHistories,
|
||||
historyLimit,
|
||||
@@ -1120,7 +1118,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
logger,
|
||||
messageHandler,
|
||||
trackInboundEvent,
|
||||
eventQueueListenerTimeoutMs: eventQueueOpts.listenerTimeout,
|
||||
});
|
||||
|
||||
logDiscordStartupPhase({
|
||||
|
||||
@@ -1,40 +1,11 @@
|
||||
const MAX_DISCORD_TIMEOUT_MS = 2_147_483_647;
|
||||
|
||||
// Compatibility constants for existing imports. Discord no longer enforces
|
||||
// channel-owned listener or inbound run timeouts.
|
||||
export const DISCORD_DEFAULT_LISTENER_TIMEOUT_MS = 120_000;
|
||||
export const DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS = 30 * 60_000;
|
||||
|
||||
export const DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS = 60_000;
|
||||
export const DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS = 120_000;
|
||||
|
||||
function clampDiscordTimeoutMs(timeoutMs: number, minimumMs: number): number {
|
||||
return Math.max(minimumMs, Math.min(Math.floor(timeoutMs), MAX_DISCORD_TIMEOUT_MS));
|
||||
}
|
||||
|
||||
export function normalizeDiscordListenerTimeoutMs(raw: number | undefined): number {
|
||||
if (!Number.isFinite(raw) || (raw ?? 0) <= 0) {
|
||||
return DISCORD_DEFAULT_LISTENER_TIMEOUT_MS;
|
||||
}
|
||||
return clampDiscordTimeoutMs(raw!, 1_000);
|
||||
}
|
||||
|
||||
export function normalizeDiscordInboundWorkerTimeoutMs(
|
||||
raw: number | undefined,
|
||||
): number | undefined {
|
||||
if (raw === 0) {
|
||||
return undefined;
|
||||
}
|
||||
if (typeof raw !== "number" || !Number.isFinite(raw) || raw < 0) {
|
||||
return DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS;
|
||||
}
|
||||
return clampDiscordTimeoutMs(raw, 1);
|
||||
}
|
||||
|
||||
export function isAbortError(error: unknown): boolean {
|
||||
if (typeof error !== "object" || error === null) {
|
||||
return false;
|
||||
}
|
||||
return "name" in error && String((error as { name?: unknown }).name) === "AbortError";
|
||||
}
|
||||
|
||||
export function mergeAbortSignals(
|
||||
signals: Array<AbortSignal | undefined>,
|
||||
): AbortSignal | undefined {
|
||||
@@ -66,57 +37,3 @@ export function mergeAbortSignals(
|
||||
}
|
||||
return fallbackController.signal;
|
||||
}
|
||||
|
||||
export async function runDiscordTaskWithTimeout(params: {
|
||||
run: (abortSignal: AbortSignal | undefined) => Promise<void>;
|
||||
timeoutMs?: number;
|
||||
abortSignals?: Array<AbortSignal | undefined>;
|
||||
onTimeout: (timeoutMs: number) => void | Promise<void>;
|
||||
onAbortAfterTimeout?: () => void;
|
||||
onErrorAfterTimeout?: (error: unknown) => void;
|
||||
}): Promise<boolean> {
|
||||
const timeoutAbortController = params.timeoutMs ? new AbortController() : undefined;
|
||||
const mergedAbortSignal = mergeAbortSignals([
|
||||
...(params.abortSignals ?? []),
|
||||
timeoutAbortController?.signal,
|
||||
]);
|
||||
|
||||
let timedOut = false;
|
||||
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;
|
||||
const runPromise = params.run(mergedAbortSignal).catch((error) => {
|
||||
if (!timedOut) {
|
||||
throw error;
|
||||
}
|
||||
if (timeoutAbortController?.signal.aborted && isAbortError(error)) {
|
||||
params.onAbortAfterTimeout?.();
|
||||
return;
|
||||
}
|
||||
params.onErrorAfterTimeout?.(error);
|
||||
});
|
||||
|
||||
try {
|
||||
if (!params.timeoutMs) {
|
||||
await runPromise;
|
||||
return false;
|
||||
}
|
||||
const timeoutPromise = new Promise<"timeout">((resolve) => {
|
||||
timeoutHandle = setTimeout(() => resolve("timeout"), params.timeoutMs);
|
||||
timeoutHandle.unref?.();
|
||||
});
|
||||
const result = await Promise.race([
|
||||
runPromise.then(() => "completed" as const),
|
||||
timeoutPromise,
|
||||
]);
|
||||
if (result === "timeout") {
|
||||
timedOut = true;
|
||||
timeoutAbortController?.abort();
|
||||
await params.onTimeout(params.timeoutMs);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
} finally {
|
||||
if (timeoutHandle) {
|
||||
clearTimeout(timeoutHandle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
export {
|
||||
DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS,
|
||||
DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS,
|
||||
DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS,
|
||||
DISCORD_DEFAULT_LISTENER_TIMEOUT_MS,
|
||||
} from "./src/monitor/timeouts.js";
|
||||
|
||||
@@ -3490,8 +3490,8 @@ export const GENERATED_BUNDLED_CHANNEL_CONFIG_METADATA = [
|
||||
help: "If true, Discord thread sessions inherit the parent channel transcript (default: false).",
|
||||
},
|
||||
"inboundWorker.runTimeoutMs": {
|
||||
label: "Discord Inbound Worker Timeout (ms)",
|
||||
help: "Optional queued Discord inbound worker timeout in ms. This is separate from Carbon listener timeouts; defaults to 1800000 and can be disabled with 0. Set per account via channels.discord.accounts.<id>.inboundWorker.runTimeoutMs.",
|
||||
label: "Deprecated Discord Inbound Worker Timeout",
|
||||
help: "Ignored compatibility setting. Discord no longer aborts queued agent runs at the channel layer; session/tool/runtime lifecycle controls long-running work.",
|
||||
},
|
||||
"eventQueue.listenerTimeout": {
|
||||
label: "Discord EventQueue Listener Timeout (ms)",
|
||||
|
||||
@@ -340,13 +340,13 @@ export type DiscordAccountConfig = {
|
||||
/** Streaming URL (Twitch/YouTube). Required when activityType=1. */
|
||||
activityUrl?: string;
|
||||
/**
|
||||
* In-process worker settings for queued inbound Discord runs.
|
||||
* This is separate from Carbon's eventQueue listener budget.
|
||||
* @deprecated Kept for config compatibility. Discord no longer enforces
|
||||
* channel-owned timeouts for queued inbound agent runs.
|
||||
*/
|
||||
inboundWorker?: {
|
||||
/**
|
||||
* Max time (ms) a queued inbound run may execute before OpenClaw aborts it.
|
||||
* Defaults to 1800000 (30 minutes). Set 0 to disable the worker-owned timeout.
|
||||
* @deprecated Ignored. Queued Discord agent runs are governed by the
|
||||
* session/tool/runtime lifecycle, not by Discord channel config.
|
||||
*/
|
||||
runTimeoutMs?: number;
|
||||
};
|
||||
|
||||
@@ -20,7 +20,7 @@ const RUNTIME_API_EXPORT_GUARDS: Record<string, readonly string[]> = {
|
||||
'export { clearGateways, getGateway, registerGateway, unregisterGateway } from "./src/monitor/gateway-registry.js";',
|
||||
'export { clearPresences, getPresence, presenceCacheSize, setPresence } from "./src/monitor/presence-cache.js";',
|
||||
'export { __testing, autoBindSpawnedDiscordSubagent, createNoopThreadBindingManager, createThreadBindingManager, formatThreadBindingDurationLabel, getThreadBindingManager, isRecentlyUnboundThreadWebhookMessage, listThreadBindingsBySessionKey, listThreadBindingsForAccount, reconcileAcpThreadBindingsOnStartup, resolveDiscordThreadBindingIdleTimeoutMs, resolveDiscordThreadBindingMaxAgeMs, resolveThreadBindingIdleTimeoutMs, resolveThreadBindingInactivityExpiresAt, resolveThreadBindingIntroText, resolveThreadBindingMaxAgeExpiresAt, resolveThreadBindingMaxAgeMs, resolveThreadBindingPersona, resolveThreadBindingPersonaFromRecord, resolveThreadBindingsEnabled, resolveThreadBindingThreadName, setThreadBindingIdleTimeoutBySessionKey, setThreadBindingMaxAgeBySessionKey, unbindThreadBindingsBySessionKey, type AcpThreadBindingReconciliationResult, type ThreadBindingManager, type ThreadBindingRecord, type ThreadBindingTargetKind } from "./src/monitor/thread-bindings.js";',
|
||||
'export { DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, isAbortError, mergeAbortSignals, normalizeDiscordInboundWorkerTimeoutMs, normalizeDiscordListenerTimeoutMs, runDiscordTaskWithTimeout } from "./src/monitor/timeouts.js";',
|
||||
'export { DISCORD_ATTACHMENT_IDLE_TIMEOUT_MS, DISCORD_ATTACHMENT_TOTAL_TIMEOUT_MS, DISCORD_DEFAULT_INBOUND_WORKER_TIMEOUT_MS, DISCORD_DEFAULT_LISTENER_TIMEOUT_MS, mergeAbortSignals } from "./src/monitor/timeouts.js";',
|
||||
'export { fetchDiscordApplicationId, fetchDiscordApplicationSummary, parseApplicationIdFromToken, probeDiscord, resolveDiscordPrivilegedIntentsFromFlags, type DiscordApplicationSummary, type DiscordPrivilegedIntentsSummary, type DiscordPrivilegedIntentStatus, type DiscordProbe } from "./src/probe.js";',
|
||||
'export { resolveDiscordChannelAllowlist, type DiscordChannelResolution } from "./src/resolve-channels.js";',
|
||||
'export { resolveDiscordUserAllowlist, type DiscordUserResolution } from "./src/resolve-users.js";',
|
||||
|
||||
Reference in New Issue
Block a user