diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f4a39519f0..04884bab1eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -95,6 +95,7 @@ Docs: https://docs.openclaw.ai - Agents/BTW: strip replayed tool blocks, hidden reasoning, and malformed image payloads from `/btw` side-question context so Bedrock no-tools side questions keep working after tool-use turns. (#64225) Thanks @ngutman. - Commands/btw: keep tool-less side questions from sending injected empty `tools` arrays on strict OpenAI-compatible providers, so `/btw` continues working after prior tool-call history. (#64219) Thanks @ngutman. - Agents/Bedrock: let `/btw` side questions use `auth: "aws-sdk"` without a static API key so Bedrock IAM and instance-role sessions stop failing before the side question runs. (#64218) Thanks @SnowSky1. +- Feishu: route `/btw` side questions and `/stop` onto bounded out-of-band lanes so BTW no longer waits behind a busy normal chat turn while ordinary same-chat traffic stays FIFO. (#64324) Thanks @ngutman. - Agents/failover: detect llama.cpp slot context overflows as context-overflow errors so compaction can retry self-hosted OpenAI-compatible runs instead of surfacing the raw upstream 400. (#64196) Thanks @alexander-applyinnovations. - Claude CLI/skills: pass eligible OpenClaw skills into CLI runs, including native Claude Code skill resolution via a temporary plugin plus per-run skill env/API key injection. (#62686, #62723) Thanks @zomars. - Discord: keep generated auto-thread names working with reasoning models by giving title generation enough output budget for thinking plus visible title text. (#64172) Thanks @hanamizuki. diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index 06a90681ac0..d24ed1ed03f 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -29,6 +29,8 @@ import { botNames, botOpenIds } from "./monitor.state.js"; import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js"; import { getFeishuRuntime } from "./runtime.js"; import { getMessageFeishu } from "./send.js"; +import { getFeishuSequentialKey } from "./sequential-key.js"; +import { createSequentialQueue } from "./sequential-queue.js"; import { createFeishuThreadBindingManager } from "./thread-bindings.js"; import type { FeishuChatType, ResolvedFeishuAccount } from "./types.js"; @@ -290,25 +292,6 @@ function parseFeishuCardActionEventPayload(value: unknown): FeishuCardActionEven }; } -/** - * Per-chat serial queue that ensures messages from the same chat are processed - * in arrival order while allowing different chats to run concurrently. - */ -function createChatQueue() { - const queues = new Map>(); - return (chatId: string, task: () => Promise): Promise => { - const prev = queues.get(chatId) ?? Promise.resolve(); - const next = prev.then(task, task); - queues.set(chatId, next); - void next.finally(() => { - if (queues.get(chatId) === next) { - queues.delete(chatId); - } - }); - return next; - }; -} - function mergeFeishuDebounceMentions( entries: FeishuMessageEvent[], ): FeishuMessageEvent["message"]["mentions"] | undefined { @@ -395,7 +378,9 @@ function registerEventHandlers( }); const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; - const enqueue = createChatQueue(); + // Keep normal Feishu traffic FIFO per chat while allowing explicit out-of-band + // commands like /btw and /stop to bypass the busy main-chat lane. + const enqueue = createSequentialQueue(); const runFeishuHandler = async (params: { task: () => Promise; errorMessage: string }) => { if (fireAndForget) { void params.task().catch((err) => { @@ -410,7 +395,12 @@ function registerEventHandlers( } }; const dispatchFeishuMessage = async (event: FeishuMessageEvent) => { - const chatId = event.message.chat_id?.trim() || "unknown"; + const sequentialKey = getFeishuSequentialKey({ + accountId, + event, + botOpenId: botOpenIds.get(accountId), + botName: botNames.get(accountId), + }); const task = () => handleFeishuMessage({ cfg, @@ -422,7 +412,7 @@ function registerEventHandlers( accountId, processingClaimHeld: true, }); - await enqueue(chatId, task); + await enqueue(sequentialKey, task); }; const resolveSenderDebounceId = (event: FeishuMessageEvent): string | undefined => { const senderId = diff --git a/extensions/feishu/src/sequential-key.test.ts b/extensions/feishu/src/sequential-key.test.ts new file mode 100644 index 00000000000..542f199c4a8 --- /dev/null +++ b/extensions/feishu/src/sequential-key.test.ts @@ -0,0 +1,72 @@ +import { describe, expect, it } from "vitest"; +import type { FeishuMessageEvent } from "./bot.js"; +import { getFeishuSequentialKey } from "./sequential-key.js"; + +function createTextEvent(params: { + text: string; + messageId?: string; + chatId?: string; +}): FeishuMessageEvent { + return { + sender: { + sender_id: { + open_id: "ou_sender_1", + user_id: "ou_user_1", + }, + sender_type: "user", + }, + message: { + message_id: params.messageId ?? "om_message_1", + chat_id: params.chatId ?? "oc_dm_chat", + chat_type: "p2p", + message_type: "text", + content: JSON.stringify({ text: params.text }), + }, + } as FeishuMessageEvent; +} + +describe("getFeishuSequentialKey", () => { + it.each([ + [createTextEvent({ text: "hello" }), "feishu:default:oc_dm_chat"], + [createTextEvent({ text: "/status" }), "feishu:default:oc_dm_chat"], + [createTextEvent({ text: "/stop" }), "feishu:default:oc_dm_chat:control"], + [createTextEvent({ text: "/btw what changed?" }), "feishu:default:oc_dm_chat:btw"], + ])("resolves sequential key %#", (event, expected) => { + expect( + getFeishuSequentialKey({ + accountId: "default", + event, + }), + ).toBe(expected); + }); + + it("keeps /btw on a stable per-chat lane across different message ids", () => { + const first = createTextEvent({ text: "/btw one", messageId: "om_message_1" }); + const second = createTextEvent({ text: "/btw two", messageId: "om_message_2" }); + + expect( + getFeishuSequentialKey({ + accountId: "default", + event: first, + }), + ).toBe("feishu:default:oc_dm_chat:btw"); + expect( + getFeishuSequentialKey({ + accountId: "default", + event: second, + }), + ).toBe("feishu:default:oc_dm_chat:btw"); + }); + + it("falls back to a stable btw lane when the message id is unavailable", () => { + const event = createTextEvent({ text: "/btw what changed?" }); + delete (event.message as { message_id?: string }).message_id; + + expect( + getFeishuSequentialKey({ + accountId: "default", + event, + }), + ).toBe("feishu:default:oc_dm_chat:btw"); + }); +}); diff --git a/extensions/feishu/src/sequential-key.ts b/extensions/feishu/src/sequential-key.ts new file mode 100644 index 00000000000..4dabde0ad7d --- /dev/null +++ b/extensions/feishu/src/sequential-key.ts @@ -0,0 +1,25 @@ +import { isAbortRequestText, isBtwRequestText } from "openclaw/plugin-sdk/reply-runtime"; +import { parseFeishuMessageEvent, type FeishuMessageEvent } from "./bot.js"; + +export function getFeishuSequentialKey(params: { + accountId: string; + event: FeishuMessageEvent; + botOpenId?: string; + botName?: string; +}): string { + const { accountId, event, botOpenId, botName } = params; + const chatId = event.message.chat_id?.trim() || "unknown"; + const baseKey = `feishu:${accountId}:${chatId}`; + const parsed = parseFeishuMessageEvent(event, botOpenId, botName); + const text = parsed.content.trim(); + + if (isAbortRequestText(text)) { + return `${baseKey}:control`; + } + + if (isBtwRequestText(text)) { + return `${baseKey}:btw`; + } + + return baseKey; +} diff --git a/extensions/feishu/src/sequential-queue.test.ts b/extensions/feishu/src/sequential-queue.test.ts new file mode 100644 index 00000000000..a3dfaec15f0 --- /dev/null +++ b/extensions/feishu/src/sequential-queue.test.ts @@ -0,0 +1,64 @@ +import { describe, expect, it } from "vitest"; +import { createSequentialQueue } from "./sequential-queue.js"; + +function createDeferred() { + let resolve!: () => void; + const promise = new Promise((res) => { + resolve = res; + }); + return { promise, resolve }; +} + +describe("createSequentialQueue", () => { + it("serializes tasks for the same key", async () => { + const enqueue = createSequentialQueue(); + const gate = createDeferred(); + const order: string[] = []; + + const first = enqueue("feishu:default:chat-1", async () => { + order.push("first:start"); + await gate.promise; + order.push("first:end"); + }); + const second = enqueue("feishu:default:chat-1", async () => { + order.push("second:start"); + order.push("second:end"); + }); + + await Promise.resolve(); + expect(order).toEqual(["first:start"]); + + gate.resolve(); + await Promise.all([first, second]); + + expect(order).toEqual(["first:start", "first:end", "second:start", "second:end"]); + }); + + it("allows different keys to run concurrently", async () => { + const enqueue = createSequentialQueue(); + const gateA = createDeferred(); + const gateB = createDeferred(); + const order: string[] = []; + + const first = enqueue("feishu:default:chat-1", async () => { + order.push("chat-1:start"); + await gateA.promise; + order.push("chat-1:end"); + }); + const second = enqueue("feishu:default:chat-1:btw:om_2", async () => { + order.push("btw:start"); + await gateB.promise; + order.push("btw:end"); + }); + + await Promise.resolve(); + expect(order).toEqual(["chat-1:start", "btw:start"]); + + gateA.resolve(); + gateB.resolve(); + await Promise.all([first, second]); + + expect(order).toContain("chat-1:end"); + expect(order).toContain("btw:end"); + }); +}); diff --git a/extensions/feishu/src/sequential-queue.ts b/extensions/feishu/src/sequential-queue.ts new file mode 100644 index 00000000000..42ccadd200e --- /dev/null +++ b/extensions/feishu/src/sequential-queue.ts @@ -0,0 +1,15 @@ +export function createSequentialQueue() { + const queues = new Map>(); + + return (key: string, task: () => Promise): Promise => { + const previous = queues.get(key) ?? Promise.resolve(); + const next = previous.then(task, task); + queues.set(key, next); + void next.finally(() => { + if (queues.get(key) === next) { + queues.delete(key); + } + }); + return next; + }; +}