From 0028e6040a73dbe1b010748737852619b9227186 Mon Sep 17 00:00:00 2001 From: Martin Garramon Date: Sun, 3 May 2026 09:43:55 -0300 Subject: [PATCH] fix(feishu): cap per-chat queue task wait so a single hang doesn't starve later messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-chat sequential queue had no timeout: if a single dispatch hung (e.g. an agent call that never resolved), every subsequent message in the same chat stayed `queued` until the gateway was restarted. Add an optional `taskTimeoutMs` (default 5 min) to `createSequentialQueue`. After the cap, the in-flight task is evicted from the blocking chain so newer same-key tasks can proceed. The original task is NOT aborted — it continues running in the background; we just stop starving the queue. A warning log surfaces the eviction with the offending key. `taskTimeoutMs: 0` restores legacy unbounded behavior. Same-chat FIFO ordering for normal-cadence messages is preserved (see #64324) — only pathologically slow tasks get evicted. Fixes #70133. --- CHANGELOG.md | 1 + .../feishu/src/monitor.message-handler.ts | 8 ++- .../feishu/src/sequential-queue.test.ts | 63 ++++++++++++++++ extensions/feishu/src/sequential-queue.ts | 72 ++++++++++++++++++- 4 files changed, 141 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c699c948d9c..09ff2001a08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai - CLI/plugins: warn when npm plugin installs remain shadowed by a failing config-selected source and surface the repair path in `plugins doctor`. Thanks @LindalyX-Lee. - Agents/Telegram: preserve explicit reply and quote context in embedded model prompts without letting quoted text drive prompt-local image loading. Fixes #76419. (#76659) Thanks @cheechnd. 
- Active Memory: apply `setupGraceTimeoutMs` to the embedded recall runner as well as the outer prompt-build watchdog, so very-cold first recalls keep the configured setup grace end-to-end. (#74480) Thanks @volcano303. +- Channels/Feishu: cap how long the per-chat sequential queue blocks subsequent same-key tasks behind a single in-flight task (5 min default), so a single hung dispatch no longer leaves later same-chat messages in `queued` state until gateway restart; the stuck task continues running but is evicted from the blocking chain and a warning is logged. Fixes #70133. Thanks @bek91 for the report. - CLI/config: keep JSON dry-run patches validating touched channel configuration against bundled channel schemas even when the patch only contains SecretRef objects. - Plugins/tools: keep disabled bundled tool plugins out of explicit runtime allowlist ownership and fall back from loaded-but-empty channel registries to tool-bearing plugin registries, so Active Memory can use bundled `memory-core` search/get tools even when `memory-lancedb` is disabled. Fixes #76603. Thanks @jwong-art. - Plugins/install: run `npm install` from the managed npm-root manifest so installing one `@openclaw/*` plugin preserves already installed sibling plugins instead of pruning them. Fixes #76571. (#76602) Thanks @byungskers and @crpol. diff --git a/extensions/feishu/src/monitor.message-handler.ts b/extensions/feishu/src/monitor.message-handler.ts index 616ac596efa..3df9cbbed0d 100644 --- a/extensions/feishu/src/monitor.message-handler.ts +++ b/extensions/feishu/src/monitor.message-handler.ts @@ -181,7 +181,13 @@ export function createFeishuMessageReceiveHandler({ }); const log = runtime?.log ?? console.log; const error = runtime?.error ?? 
console.error; - const enqueue = createSequentialQueue(); + const enqueue = createSequentialQueue({ + onTaskTimeout: (key, timeoutMs) => { + log( + `feishu[${accountId}]: per-chat task exceeded ${timeoutMs}ms cap (key=${key}); evicting from queue so later same-key messages can proceed (#70133)`, + ); + }, + }); const dispatchFeishuMessage = async (event: FeishuMessageEvent) => { const sequentialKey = resolveSequentialKey({ diff --git a/extensions/feishu/src/sequential-queue.test.ts b/extensions/feishu/src/sequential-queue.test.ts index fa2cbaf2bd8..a0ea3dfd4d6 100644 --- a/extensions/feishu/src/sequential-queue.test.ts +++ b/extensions/feishu/src/sequential-queue.test.ts @@ -89,4 +89,67 @@ describe("createSequentialQueue", () => { process.off("unhandledRejection", onUnhandledRejection); } }); + + it("evicts a stuck task after taskTimeoutMs so newer same-key work proceeds", async () => { + const timeouts: Array<{ key: string; timeoutMs: number }> = []; + const enqueue = createSequentialQueue({ + taskTimeoutMs: 25, + onTaskTimeout: (key, timeoutMs) => { + timeouts.push({ key, timeoutMs }); + }, + }); + const order: string[] = []; + + // Stuck task — never resolves until the test cleans up. + const stuckGate = createDeferred(); + const stuck = enqueue("feishu:default:chat-stuck", async () => { + order.push("stuck:start"); + await stuckGate.promise; + order.push("stuck:end"); + }); + + // Second same-key task — would be starved indefinitely without the cap. + const followUp = enqueue("feishu:default:chat-stuck", async () => { + order.push("follow-up:ran"); + }); + + await followUp; + + expect(order).toEqual(["stuck:start", "follow-up:ran"]); + expect(timeouts).toEqual([{ key: "feishu:default:chat-stuck", timeoutMs: 25 }]); + + // Drain the leaked stuck task so it doesn't trip the unhandled-rejection guard. 
+ stuckGate.resolve(); + await stuck; + }); + + it("disables the timeout cap when taskTimeoutMs is 0 (legacy behavior)", async () => { + const timeouts: Array<{ key: string; timeoutMs: number }> = []; + const enqueue = createSequentialQueue({ + taskTimeoutMs: 0, + onTaskTimeout: (key, timeoutMs) => { + timeouts.push({ key, timeoutMs }); + }, + }); + const gate = createDeferred(); + const order: string[] = []; + + const first = enqueue("feishu:default:chat-1", async () => { + order.push("first:start"); + await gate.promise; + order.push("first:end"); + }); + const second = enqueue("feishu:default:chat-1", async () => { + order.push("second:ran"); + }); + + // Wait long enough that a timeout would have fired if it were active. + await new Promise((resolve) => setTimeout(resolve, 30)); + expect(order).toEqual(["first:start"]); + expect(timeouts).toEqual([]); + + gate.resolve(); + await Promise.all([first, second]); + expect(order).toEqual(["first:start", "first:end", "second:ran"]); + }); }); diff --git a/extensions/feishu/src/sequential-queue.ts b/extensions/feishu/src/sequential-queue.ts index edaf2bf398c..2280e7a27fe 100644 --- a/extensions/feishu/src/sequential-queue.ts +++ b/extensions/feishu/src/sequential-queue.ts @@ -1,9 +1,50 @@ -export function createSequentialQueue() { +/** + * Per-key serial task queue for Feishu inbound message handling. + * + * Tasks enqueued under the same key run in FIFO order. Different keys run + * concurrently. This preserves the channel's same-chat ordering contract + * (see #64324) while letting cross-chat work proceed in parallel. + * + * `taskTimeoutMs` bounds how long the queue will block subsequent same-key + * tasks behind a single in-flight task. After the cap, the in-flight task + * is evicted from the blocking chain so newer messages for the same key + * can proceed. The original task is NOT aborted — it continues running in + * the background; it just stops starving the queue. 
+ * + * Without this cap, a single hung dispatch (e.g. an agent call that never + * resolves) keeps later same-chat messages in `queued` state until the + * gateway is restarted. See #70133. + */ + +const DEFAULT_TASK_TIMEOUT_MS = 5 * 60 * 1000; + +export interface SequentialQueueOptions { + /** + * Maximum time (ms) to block subsequent same-key tasks behind a single + * in-flight task. Pass 0 (or a non-finite value) to disable the cap and + * restore unbounded legacy behavior. + * + * Default: 5 minutes. + */ + taskTimeoutMs?: number; + + /** + * Optional callback fired when a task exceeds `taskTimeoutMs`. The task + * itself is not awaited further; this callback is the only signal the + * caller gets that the queue moved on without it. + */ + onTaskTimeout?: (key: string, timeoutMs: number) => void; +} + +export function createSequentialQueue(options: SequentialQueueOptions = {}) { const queues = new Map<string, Promise<void>>(); + const taskTimeoutMs = options.taskTimeoutMs ?? DEFAULT_TASK_TIMEOUT_MS; + const onTaskTimeout = options.onTaskTimeout; return (key: string, task: () => Promise<void>): Promise<void> => { const previous = queues.get(key) ?? Promise.resolve(); - const next = previous.then(task, task); + const wrapped = () => boundedRun(key, task, taskTimeoutMs, onTaskTimeout); + const next = previous.then(wrapped, wrapped); queues.set(key, next); const cleanup = () => { if (queues.get(key) === next) { @@ -14,3 +55,30 @@ export function createSequentialQueue() { return next; }; } + +async function boundedRun( + key: string, + task: () => Promise<void>, + timeoutMs: number, + onTaskTimeout: ((key: string, timeoutMs: number) => void) | undefined, +): Promise<void> { + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + return task(); + } + let timeoutHandle: ReturnType<typeof setTimeout> | undefined; + const timeoutPromise = new Promise<void>((resolve) => { + timeoutHandle = setTimeout(() => { + try { + onTaskTimeout?.(key, timeoutMs); + } catch { + // Swallow logging errors so they cannot poison the queue chain. 
+ } + resolve(); + }, timeoutMs); + }); + try { + await Promise.race([task(), timeoutPromise]); + } finally { + if (timeoutHandle) clearTimeout(timeoutHandle); + } +}