fix(feishu): cap per-chat queue task wait so a single hang doesn't starve later messages

Per-chat sequential queue had no timeout: if a single dispatch hung
(e.g. an agent call that never resolved), every subsequent message in
the same chat stayed `queued` until the gateway was restarted.

Add an optional `taskTimeoutMs` (default 5 min) to `createSequentialQueue`.
After the cap, the in-flight task is evicted from the blocking chain so
newer same-key tasks can proceed. The original task is NOT aborted —
it continues running in the background; we just stop starving the queue.
A warning log surfaces the eviction with the offending key.

`taskTimeoutMs: 0` restores legacy unbounded behavior.

Same-chat FIFO ordering for normal-cadence messages is preserved
(see #64324) — only pathologically slow tasks get evicted.

Fixes #70133.
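The eviction idea described above can be sketched in isolation (illustrative only, not the shipped implementation; `boundedWait` is a hypothetical name): race the in-flight task against a timer, and if the timer wins, let the chain move on while the task keeps running in the background.

```typescript
// Minimal sketch of the eviction race. If `capMs` elapses first, the
// returned promise resolves and the queue chain can advance; the task
// itself is NOT aborted and keeps running in the background.
async function boundedWait(task: Promise<void>, capMs: number): Promise<void> {
  let handle: ReturnType<typeof setTimeout> | undefined;
  const cap = new Promise<void>((resolve) => {
    handle = setTimeout(resolve, capMs);
  });
  try {
    await Promise.race([task, cap]);
  } finally {
    // Don't leak the timer when the task finishes before the cap.
    if (handle) clearTimeout(handle);
  }
}
```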
Author: Martin Garramon
Date: 2026-05-03 09:43:55 -03:00
Committed by: Peter Steinberger
Parent: 0bf06e953f
Commit: 0028e6040a
4 changed files with 141 additions and 3 deletions


@@ -31,6 +31,7 @@ Docs: https://docs.openclaw.ai
- CLI/plugins: warn when npm plugin installs remain shadowed by a failing config-selected source and surface the repair path in `plugins doctor`. Thanks @LindalyX-Lee.
- Agents/Telegram: preserve explicit reply and quote context in embedded model prompts without letting quoted text drive prompt-local image loading. Fixes #76419. (#76659) Thanks @cheechnd.
- Active Memory: apply `setupGraceTimeoutMs` to the embedded recall runner as well as the outer prompt-build watchdog, so very-cold first recalls keep the configured setup grace end-to-end. (#74480) Thanks @volcano303.
- Channels/Feishu: cap how long the per-chat sequential queue blocks subsequent same-key tasks behind a single in-flight task (5 min default), so a single hung dispatch no longer leaves later same-chat messages in `queued` state until gateway restart; the stuck task continues running but is evicted from the blocking chain and a warning is logged. Fixes #70133. Thanks @bek91 for the report.
- CLI/config: keep JSON dry-run patches validating touched channel configuration against bundled channel schemas even when the patch only contains SecretRef objects.
- Plugins/tools: keep disabled bundled tool plugins out of explicit runtime allowlist ownership and fall back from loaded-but-empty channel registries to tool-bearing plugin registries, so Active Memory can use bundled `memory-core` search/get tools even when `memory-lancedb` is disabled. Fixes #76603. Thanks @jwong-art.
- Plugins/install: run `npm install` from the managed npm-root manifest so installing one `@openclaw/*` plugin preserves already installed sibling plugins instead of pruning them. Fixes #76571. (#76602) Thanks @byungskers and @crpol.


@@ -181,7 +181,13 @@ export function createFeishuMessageReceiveHandler({
});
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
-const enqueue = createSequentialQueue();
+const enqueue = createSequentialQueue({
+	onTaskTimeout: (key, timeoutMs) => {
+		log(
+			`feishu[${accountId}]: per-chat task exceeded ${timeoutMs}ms cap (key=${key}); evicting from queue so later same-key messages can proceed (#70133)`,
+		);
+	},
+});
const dispatchFeishuMessage = async (event: FeishuMessageEvent) => {
const sequentialKey = resolveSequentialKey({


@@ -89,4 +89,67 @@ describe("createSequentialQueue", () => {
process.off("unhandledRejection", onUnhandledRejection);
}
});
it("evicts a stuck task after taskTimeoutMs so newer same-key work proceeds", async () => {
const timeouts: Array<{ key: string; timeoutMs: number }> = [];
const enqueue = createSequentialQueue({
taskTimeoutMs: 25,
onTaskTimeout: (key, timeoutMs) => {
timeouts.push({ key, timeoutMs });
},
});
const order: string[] = [];
// Stuck task — never resolves until the test cleans up.
const stuckGate = createDeferred();
const stuck = enqueue("feishu:default:chat-stuck", async () => {
order.push("stuck:start");
await stuckGate.promise;
order.push("stuck:end");
});
// Second same-key task — would be starved indefinitely without the cap.
const followUp = enqueue("feishu:default:chat-stuck", async () => {
order.push("follow-up:ran");
});
await followUp;
expect(order).toEqual(["stuck:start", "follow-up:ran"]);
expect(timeouts).toEqual([{ key: "feishu:default:chat-stuck", timeoutMs: 25 }]);
// Drain the leaked stuck task so it doesn't trip the unhandled-rejection guard.
stuckGate.resolve();
await stuck;
});
it("disables the timeout cap when taskTimeoutMs is 0 (legacy behavior)", async () => {
const timeouts: Array<{ key: string; timeoutMs: number }> = [];
const enqueue = createSequentialQueue({
taskTimeoutMs: 0,
onTaskTimeout: (key, timeoutMs) => {
timeouts.push({ key, timeoutMs });
},
});
const gate = createDeferred();
const order: string[] = [];
const first = enqueue("feishu:default:chat-1", async () => {
order.push("first:start");
await gate.promise;
order.push("first:end");
});
const second = enqueue("feishu:default:chat-1", async () => {
order.push("second:ran");
});
// Wait long enough that a timeout would have fired if it were active.
await new Promise((resolve) => setTimeout(resolve, 30));
expect(order).toEqual(["first:start"]);
expect(timeouts).toEqual([]);
gate.resolve();
await Promise.all([first, second]);
expect(order).toEqual(["first:start", "first:end", "second:ran"]);
});
});
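The tests above lean on a `createDeferred` helper that is defined elsewhere in the test suite; a minimal version of such a helper (a sketch, not necessarily the repo's actual implementation) could look like:

```typescript
// Hypothetical sketch of the createDeferred test helper: exposes a
// promise together with its resolve/reject handles so tests can gate
// task completion from the outside.
function createDeferred<T = void>() {
  let resolve!: (value: T) => void;
  let reject!: (reason?: unknown) => void;
  const promise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}
```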


@@ -1,9 +1,50 @@
-export function createSequentialQueue() {
/**
* Per-key serial task queue for Feishu inbound message handling.
*
* Tasks enqueued under the same key run in FIFO order. Different keys run
* concurrently. This preserves the channel's same-chat ordering contract
* (see #64324) while letting cross-chat work proceed in parallel.
*
* `taskTimeoutMs` bounds how long the queue will block subsequent same-key
* tasks behind a single in-flight task. After the cap, the in-flight task
* is evicted from the blocking chain so newer messages for the same key
* can proceed. The original task is NOT aborted — it continues running in
* the background; it just stops starving the queue.
*
* Without this cap, a single hung dispatch (e.g. an agent call that never
* resolves) keeps later same-chat messages in `queued` state until the
* gateway is restarted. See #70133.
*/
const DEFAULT_TASK_TIMEOUT_MS = 5 * 60 * 1000;
export interface SequentialQueueOptions {
/**
* Maximum time (ms) to block subsequent same-key tasks behind a single
* in-flight task. Pass 0 (or a non-finite value) to disable the cap and
* restore unbounded legacy behavior.
*
* Default: 5 minutes.
*/
taskTimeoutMs?: number;
/**
* Optional callback fired when a task exceeds `taskTimeoutMs`. The task
* itself is not awaited further; this callback is the only signal the
* caller gets that the queue moved on without it.
*/
onTaskTimeout?: (key: string, timeoutMs: number) => void;
}
+export function createSequentialQueue(options: SequentialQueueOptions = {}) {
const queues = new Map<string, Promise<void>>();
const taskTimeoutMs = options.taskTimeoutMs ?? DEFAULT_TASK_TIMEOUT_MS;
const onTaskTimeout = options.onTaskTimeout;
return (key: string, task: () => Promise<void>): Promise<void> => {
const previous = queues.get(key) ?? Promise.resolve();
-		const next = previous.then(task, task);
+		const wrapped = () => boundedRun(key, task, taskTimeoutMs, onTaskTimeout);
+		const next = previous.then(wrapped, wrapped);
queues.set(key, next);
const cleanup = () => {
if (queues.get(key) === next) {
@@ -14,3 +55,30 @@ export function createSequentialQueue() {
return next;
};
}
async function boundedRun(
key: string,
task: () => Promise<void>,
timeoutMs: number,
onTaskTimeout: ((key: string, timeoutMs: number) => void) | undefined,
): Promise<void> {
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
return task();
}
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = new Promise<void>((resolve) => {
timeoutHandle = setTimeout(() => {
try {
onTaskTimeout?.(key, timeoutMs);
} catch {
// Swallow logging errors so they cannot poison the queue chain.
}
resolve();
}, timeoutMs);
});
try {
// Keep a handle on the task so a rejection after eviction is not
// surfaced as an unhandled rejection once the queue has moved on.
const taskPromise = task();
taskPromise.catch(() => {});
await Promise.race([taskPromise, timeoutPromise]);
} finally {
if (timeoutHandle) clearTimeout(timeoutHandle);
}
}
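Putting the pieces together, the capped queue's externally visible behavior can be demonstrated with a condensed, self-contained restatement of the module above (`createCappedQueue` and `demo` are illustrative names, not the shipped exports):

```typescript
// Condensed re-statement of the capped per-key queue, for illustration.
type Options = {
  taskTimeoutMs?: number;
  onTaskTimeout?: (key: string, timeoutMs: number) => void;
};

function createCappedQueue(options: Options = {}) {
  const queues = new Map<string, Promise<void>>();
  const cap = options.taskTimeoutMs ?? 5 * 60 * 1000;
  return (key: string, task: () => Promise<void>): Promise<void> => {
    const run = async () => {
      if (!Number.isFinite(cap) || cap <= 0) return task(); // legacy unbounded mode
      let handle: ReturnType<typeof setTimeout> | undefined;
      const timer = new Promise<void>((resolve) => {
        handle = setTimeout(() => {
          options.onTaskTimeout?.(key, cap);
          resolve();
        }, cap);
      });
      try {
        // Timer winning the race evicts the task from the blocking chain;
        // the task itself keeps running in the background.
        await Promise.race([task(), timer]);
      } finally {
        if (handle) clearTimeout(handle);
      }
    };
    const previous = queues.get(key) ?? Promise.resolve();
    const next = previous.then(run, run);
    queues.set(key, next);
    return next;
  };
}

// Usage: the hung task is evicted after 25 ms, so the follow-up runs.
async function demo(): Promise<string[]> {
  const order: string[] = [];
  const enqueue = createCappedQueue({ taskTimeoutMs: 25 });
  enqueue("chat-1", async () => {
    order.push("stuck:start");
    await new Promise<void>(() => {}); // never resolves
  });
  await enqueue("chat-1", async () => {
    order.push("follow-up:ran");
  });
  return order;
}
```

Note the design choice this demo makes visible: eviction changes only who the queue waits for, not what runs, so same-key FIFO ordering is preserved for well-behaved tasks.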