diff --git a/extensions/feishu/src/dedup.ts b/extensions/feishu/src/dedup.ts index 65c3b10e9a3..ecb53c9159d 100644 --- a/extensions/feishu/src/dedup.ts +++ b/extensions/feishu/src/dedup.ts @@ -1,17 +1,15 @@ import os from "node:os"; import path from "node:path"; -import { createDedupeCache, createPersistentDedupe } from "./dedup-runtime-api.js"; +import { createPersistentDedupe } from "./dedup-runtime-api.js"; +import { + releaseFeishuMessageProcessing, + tryBeginFeishuMessageProcessing, +} from "./processing-claims.js"; // Persistent TTL: 24 hours — survives restarts & WebSocket reconnects. const DEDUP_TTL_MS = 24 * 60 * 60 * 1000; const MEMORY_MAX_SIZE = 1_000; const FILE_MAX_ENTRIES = 10_000; -const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000; -const EVENT_MEMORY_MAX_SIZE = 2_000; -const processingClaims = createDedupeCache({ - ttlMs: EVENT_DEDUP_TTL_MS, - maxSize: EVENT_MEMORY_MAX_SIZE, -}); function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string { const stateOverride = env.OPENCLAW_STATE_DIR?.trim(); @@ -36,35 +34,12 @@ const persistentDedupe = createPersistentDedupe({ resolveFilePath: resolveNamespaceFilePath, }); -function resolveEventDedupeKey( - namespace: string, - messageId: string | undefined | null, -): string | null { - const trimmed = messageId?.trim(); - if (!trimmed) { - return null; - } - return `${namespace}:${trimmed}`; -} - function normalizeMessageId(messageId: string | undefined | null): string | null { const trimmed = messageId?.trim(); return trimmed ? trimmed : null; } -export function tryBeginFeishuMessageProcessing( - messageId: string | undefined | null, - namespace = "global", -): boolean { - return !processingClaims.check(resolveEventDedupeKey(namespace, messageId)); -} - -export function releaseFeishuMessageProcessing( - messageId: string | undefined | null, - namespace = "global", -): void { - processingClaims.delete(resolveEventDedupeKey(namespace, messageId)); -} +export { releaseFeishuMessageProcessing, tryBeginFeishuMessageProcessing }; export async function claimUnprocessedFeishuMessage(params: { messageId: string | undefined | null; diff --git a/extensions/feishu/src/lifecycle.test-support.ts b/extensions/feishu/src/lifecycle.test-support.ts index 8e6044d7518..b75a57aa081 100644 --- a/extensions/feishu/src/lifecycle.test-support.ts +++ b/extensions/feishu/src/lifecycle.test-support.ts @@ -89,11 +89,25 @@ const { sendCardFeishuMock, } = feishuLifecycleTestMocks; -vi.mock("./client.js", async () => { - const actual = await vi.importActual("./client.js"); +vi.mock("./client.js", () => { return { - ...actual, + FEISHU_HTTP_TIMEOUT_ENV_VAR: "OPENCLAW_FEISHU_HTTP_TIMEOUT_MS", + FEISHU_HTTP_TIMEOUT_MAX_MS: 300_000, + FEISHU_HTTP_TIMEOUT_MS: 30_000, + FEISHU_USER_AGENT: "openclaw-feishu-test", + clearClientCache: vi.fn(), + createFeishuClient: vi.fn(() => { + throw new Error("unexpected Feishu client call in lifecycle test"); + }), + createFeishuWSClient: vi.fn(async () => ({ + close: vi.fn(), + start: vi.fn(), + })), createEventDispatcher: createEventDispatcherMock, + getFeishuClient: vi.fn(() => null), + getFeishuUserAgent: vi.fn(() => "openclaw-feishu-test"), + pluginVersion: "test", + setFeishuClientRuntimeForTest: vi.fn(), }; }); diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index ff28b958547..6437b757445 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -1,5 +1,5 @@ import * as crypto from "crypto"; -import * as Lark from "@larksuiteoapi/node-sdk"; +import type * as Lark from "@larksuiteoapi/node-sdk"; import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js"; import { resolveFeishuAccount } from "./accounts.js"; import { raceWithTimeoutAndAbort } from "./async.js"; @@ -19,12 +19,11 @@ import { hasProcessedFeishuMessage, recordProcessedFeishuMessage, releaseFeishuMessageProcessing, - tryBeginFeishuMessageProcessing, warmupDedupFromDisk, } from "./dedup.js"; -import { isMentionForwardRequest } from "./mention.js"; import { applyBotIdentityState, startBotIdentityRecovery } from "./monitor.bot-identity.js"; import { parseFeishuDriveCommentNoticeEventPayload } from "./monitor.comment.js"; +import { createFeishuMessageReceiveHandler } from "./monitor.message-handler.js"; import { fetchBotIdentityForMonitor } from "./monitor.startup.js"; import { botNames, botOpenIds } from "./monitor.state.js"; import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js"; @@ -201,30 +200,6 @@ function readStringOrNumber(value: unknown): string | number | undefined { return typeof value === "string" || typeof value === "number" ? value : undefined; } -function parseFeishuMessageEventPayload(value: unknown): FeishuMessageEvent | null { - if (!isRecord(value)) { - return null; - } - const sender = value.sender; - const message = value.message; - if (!isRecord(sender) || !isRecord(message)) { - return null; - } - const senderId = sender.sender_id; - if (!isRecord(senderId)) { - return null; - } - const messageId = readString(message.message_id); - const chatId = readString(message.chat_id); - const chatType = normalizeFeishuChatType(message.chat_type); - const messageType = readString(message.message_type); - const content = readString(message.content); - if (!messageId || !chatId || !chatType || !messageType || !content) { - return null; - } - return value as FeishuMessageEvent; -} - function parseFeishuBotAddedEventPayload(value: unknown): FeishuBotAddedEvent | null { if (!isRecord(value) || !readString(value.chat_id) || !isRecord(value.operator_id)) { return null; @@ -326,94 +301,14 @@ function buildCommentNoticeQueueKey(event: { const fileToken = event.notice_meta?.file_token?.trim() || "unknown"; return `comment-doc:${fileType}:${fileToken}`; } -function mergeFeishuDebounceMentions( - entries: FeishuMessageEvent[], -): FeishuMessageEvent["message"]["mentions"] | undefined { - const merged = new Map[number]>(); - for (const entry of entries) { - for (const mention of entry.message.mentions ?? []) { - const stableId = - mention.id.open_id?.trim() || mention.id.user_id?.trim() || mention.id.union_id?.trim(); - const mentionName = mention.name?.trim(); - const mentionKey = mention.key?.trim(); - const fallback = - mentionName && mentionKey ? `${mentionName}|${mentionKey}` : mentionName || mentionKey; - const key = stableId || fallback; - if (!key || merged.has(key)) { - continue; - } - merged.set(key, mention); - } - } - if (merged.size === 0) { - return undefined; - } - return Array.from(merged.values()); -} - -function dedupeFeishuDebounceEntriesByMessageId( - entries: FeishuMessageEvent[], -): FeishuMessageEvent[] { - const seen = new Set(); - const deduped: FeishuMessageEvent[] = []; - for (const entry of entries) { - const messageId = entry.message.message_id?.trim(); - if (!messageId) { - deduped.push(entry); - continue; - } - if (seen.has(messageId)) { - continue; - } - seen.add(messageId); - deduped.push(entry); - } - return deduped; -} - -function resolveFeishuDebounceMentions(params: { - entries: FeishuMessageEvent[]; - botOpenId?: string; -}): FeishuMessageEvent["message"]["mentions"] | undefined { - const { entries, botOpenId } = params; - if (entries.length === 0) { - return undefined; - } - for (let index = entries.length - 1; index >= 0; index -= 1) { - const entry = entries[index]; - if (isMentionForwardRequest(entry, botOpenId)) { - // Keep mention-forward semantics scoped to a single source message. - return mergeFeishuDebounceMentions([entry]); - } - } - const merged = mergeFeishuDebounceMentions(entries); - if (!merged) { - return undefined; - } - const normalizedBotOpenId = botOpenId?.trim(); - if (!normalizedBotOpenId) { - return undefined; - } - const botMentions = merged.filter( - (mention) => mention.id.open_id?.trim() === normalizedBotOpenId, - ); - return botMentions.length > 0 ? botMentions : undefined; -} - function registerEventHandlers( eventDispatcher: Lark.EventDispatcher, context: RegisterEventHandlersContext, ): void { const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; - const core = getFeishuRuntime(); - const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({ - cfg, - channel: "feishu", - }); const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; - // Keep normal Feishu traffic FIFO per chat while allowing explicit out-of-band - // commands like /btw and /stop to bypass the busy main-chat lane. + // Non-message lifecycle events still share FIFO execution by resource key. const enqueue = createSequentialQueue(); const runFeishuHandler = async (params: { task: () => Promise; errorMessage: string }) => { if (fireAndForget) { @@ -428,170 +323,24 @@ function registerEventHandlers( error(`${params.errorMessage}: ${String(err)}`); } }; - const dispatchFeishuMessage = async (event: FeishuMessageEvent) => { - const sequentialKey = getFeishuSequentialKey({ - accountId, - event, - botOpenId: botOpenIds.get(accountId), - botName: botNames.get(accountId), - }); - const task = () => - handleFeishuMessage({ - cfg, - event, - botOpenId: botOpenIds.get(accountId), - botName: botNames.get(accountId), - runtime, - chatHistories, - accountId, - processingClaimHeld: true, - }); - await enqueue(sequentialKey, task); - }; - const resolveSenderDebounceId = (event: FeishuMessageEvent): string | undefined => { - const senderId = - event.sender.sender_id.open_id?.trim() || event.sender.sender_id.user_id?.trim(); - return senderId || undefined; - }; - const resolveDebounceText = (event: FeishuMessageEvent): string => { - const botOpenId = botOpenIds.get(accountId); - const parsed = parseFeishuMessageEvent(event, botOpenId, botNames.get(accountId)); - return parsed.content.trim(); - }; - const recordSuppressedMessageIds = async ( - entries: FeishuMessageEvent[], - dispatchMessageId?: string, - ) => { - const keepMessageId = dispatchMessageId?.trim(); - const suppressedIds = new Set( - entries - .map((entry) => entry.message.message_id?.trim()) - .filter((id): id is string => Boolean(id) && (!keepMessageId || id !== keepMessageId)), - ); - if (suppressedIds.size === 0) { - return; - } - for (const messageId of suppressedIds) { - try { - await recordProcessedFeishuMessage(messageId, accountId, log); - } catch (err) { - error( - `feishu[${accountId}]: failed to record merged dedupe id ${messageId}: ${String(err)}`, - ); - } - } - }; - const isMessageAlreadyProcessed = async (entry: FeishuMessageEvent): Promise => { - return await hasProcessedFeishuMessage(entry.message.message_id, accountId, log); - }; - const inboundDebouncer = core.channel.debounce.createInboundDebouncer({ - debounceMs: inboundDebounceMs, - buildKey: (event) => { - const chatId = event.message.chat_id?.trim(); - const senderId = resolveSenderDebounceId(event); - if (!chatId || !senderId) { - return null; - } - const rootId = event.message.root_id?.trim(); - const threadKey = rootId ? `thread:${rootId}` : "chat"; - return `feishu:${accountId}:${chatId}:${threadKey}:${senderId}`; - }, - shouldDebounce: (event) => { - if (event.message.message_type !== "text") { - return false; - } - const text = resolveDebounceText(event); - if (!text) { - return false; - } - return !core.channel.text.hasControlCommand(text, cfg); - }, - onFlush: async (entries) => { - const last = entries.at(-1); - if (!last) { - return; - } - if (entries.length === 1) { - await dispatchFeishuMessage(last); - return; - } - const dedupedEntries = dedupeFeishuDebounceEntriesByMessageId(entries); - const freshEntries: FeishuMessageEvent[] = []; - for (const entry of dedupedEntries) { - if (!(await isMessageAlreadyProcessed(entry))) { - freshEntries.push(entry); - } - } - const dispatchEntry = freshEntries.at(-1); - if (!dispatchEntry) { - return; - } - await recordSuppressedMessageIds(dedupedEntries, dispatchEntry.message.message_id); - const combinedText = freshEntries - .map((entry) => resolveDebounceText(entry)) - .filter(Boolean) - .join("\n"); - const mergedMentions = resolveFeishuDebounceMentions({ - entries: freshEntries, - botOpenId: botOpenIds.get(accountId), - }); - if (!combinedText.trim()) { - await dispatchFeishuMessage({ - ...dispatchEntry, - message: { - ...dispatchEntry.message, - mentions: mergedMentions ?? dispatchEntry.message.mentions, - }, - }); - return; - } - await dispatchFeishuMessage({ - ...dispatchEntry, - message: { - ...dispatchEntry.message, - message_type: "text", - content: JSON.stringify({ text: combinedText }), - mentions: mergedMentions ?? dispatchEntry.message.mentions, - }, - }); - }, - onError: (err, entries) => { - for (const entry of entries) { - releaseFeishuMessageProcessing(entry.message.message_id, accountId); - } - error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`); - }, - }); eventDispatcher.register({ - "im.message.receive_v1": async (data) => { - const event = parseFeishuMessageEventPayload(data); - if (!event) { - error(`feishu[${accountId}]: ignoring malformed message event payload`); - return; - } - const messageId = event.message?.message_id?.trim(); - if (!tryBeginFeishuMessageProcessing(messageId, accountId)) { - log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`); - return; - } - const processMessage = async () => { - await inboundDebouncer.enqueue(event); - }; - if (fireAndForget) { - void processMessage().catch((err) => { - releaseFeishuMessageProcessing(messageId, accountId); - error(`feishu[${accountId}]: error handling message: ${String(err)}`); - }); - return; - } - try { - await processMessage(); - } catch (err) { - releaseFeishuMessageProcessing(messageId, accountId); - error(`feishu[${accountId}]: error handling message: ${String(err)}`); - } - }, + "im.message.receive_v1": createFeishuMessageReceiveHandler({ + cfg, + core: getFeishuRuntime(), + accountId, + runtime, + chatHistories, + fireAndForget, + handleMessage: handleFeishuMessage, + resolveDebounceText: ({ event, botOpenId, botName }) => + parseFeishuMessageEvent(event, botOpenId, botName).content, + hasProcessedMessage: hasProcessedFeishuMessage, + recordProcessedMessage: recordProcessedFeishuMessage, + getBotOpenId: (id) => botOpenIds.get(id), + getBotName: (id) => botNames.get(id), + resolveSequentialKey: getFeishuSequentialKey, + }), "im.message.message_read_v1": async () => { // Ignore read receipts }, diff --git a/extensions/feishu/src/monitor.message-handler.ts b/extensions/feishu/src/monitor.message-handler.ts new file mode 100644 index 00000000000..e4d44d76945 --- /dev/null +++ b/extensions/feishu/src/monitor.message-handler.ts @@ -0,0 +1,337 @@ +import type { ClawdbotConfig, HistoryEntry, PluginRuntime, RuntimeEnv } from "../runtime-api.js"; +import type { FeishuMessageEvent } from "./event-types.js"; +import { isMentionForwardRequest } from "./mention.js"; +import { + releaseFeishuMessageProcessing, + tryBeginFeishuMessageProcessing, +} from "./processing-claims.js"; +import { createSequentialQueue } from "./sequential-queue.js"; +import type { FeishuChatType } from "./types.js"; + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function readString(value: unknown): string | undefined { + return typeof value === "string" ? value : undefined; +} + +type FeishuMessageReceiveHandlerContext = { + cfg: ClawdbotConfig; + core: PluginRuntime; + accountId: string; + runtime?: RuntimeEnv; + chatHistories: Map; + fireAndForget?: boolean; + handleMessage: (params: { + cfg: ClawdbotConfig; + event: FeishuMessageEvent; + botOpenId?: string; + botName?: string; + runtime?: RuntimeEnv; + chatHistories?: Map; + accountId?: string; + processingClaimHeld?: boolean; + }) => Promise; + resolveDebounceText: (params: { + event: FeishuMessageEvent; + botOpenId?: string; + botName?: string; + }) => string; + hasProcessedMessage: ( + messageId: string | undefined | null, + namespace: string, + log?: (...args: unknown[]) => void, + ) => Promise; + recordProcessedMessage: ( + messageId: string | undefined | null, + namespace: string, + log?: (...args: unknown[]) => void, + ) => Promise; + getBotOpenId?: (accountId: string) => string | undefined; + getBotName?: (accountId: string) => string | undefined; + resolveSequentialKey?: (params: { + accountId: string; + event: FeishuMessageEvent; + botOpenId?: string; + botName?: string; + }) => string; +}; + +function normalizeFeishuChatType(value: unknown): FeishuChatType | undefined { + return value === "group" || value === "private" || value === "p2p" ? value : undefined; +} + +function parseFeishuMessageEventPayload(value: unknown): FeishuMessageEvent | null { + if (!isRecord(value)) { + return null; + } + const sender = value.sender; + const message = value.message; + if (!isRecord(sender) || !isRecord(message)) { + return null; + } + const senderId = sender.sender_id; + if (!isRecord(senderId)) { + return null; + } + const messageId = readString(message.message_id); + const chatId = readString(message.chat_id); + const chatType = normalizeFeishuChatType(message.chat_type); + const messageType = readString(message.message_type); + const content = readString(message.content); + if (!messageId || !chatId || !chatType || !messageType || !content) { + return null; + } + return value as FeishuMessageEvent; +} + +function mergeFeishuDebounceMentions( + entries: FeishuMessageEvent[], +): FeishuMessageEvent["message"]["mentions"] | undefined { + const merged = new Map[number]>(); + for (const entry of entries) { + for (const mention of entry.message.mentions ?? []) { + const stableId = + mention.id.open_id?.trim() || mention.id.user_id?.trim() || mention.id.union_id?.trim(); + const mentionName = mention.name?.trim(); + const mentionKey = mention.key?.trim(); + const fallback = + mentionName && mentionKey ? `${mentionName}|${mentionKey}` : mentionName || mentionKey; + const key = stableId || fallback; + if (!key || merged.has(key)) { + continue; + } + merged.set(key, mention); + } + } + return merged.size > 0 ? Array.from(merged.values()) : undefined; +} + +function dedupeFeishuDebounceEntriesByMessageId( + entries: FeishuMessageEvent[], +): FeishuMessageEvent[] { + const seen = new Set(); + const deduped: FeishuMessageEvent[] = []; + for (const entry of entries) { + const messageId = entry.message.message_id?.trim(); + if (!messageId) { + deduped.push(entry); + continue; + } + if (seen.has(messageId)) { + continue; + } + seen.add(messageId); + deduped.push(entry); + } + return deduped; +} + +function resolveFeishuDebounceMentions(params: { + entries: FeishuMessageEvent[]; + botOpenId?: string; +}): FeishuMessageEvent["message"]["mentions"] | undefined { + const { entries, botOpenId } = params; + if (entries.length === 0) { + return undefined; + } + for (let index = entries.length - 1; index >= 0; index -= 1) { + const entry = entries[index]; + if (isMentionForwardRequest(entry, botOpenId)) { + return mergeFeishuDebounceMentions([entry]); + } + } + const merged = mergeFeishuDebounceMentions(entries); + if (!merged) { + return undefined; + } + const normalizedBotOpenId = botOpenId?.trim(); + if (!normalizedBotOpenId) { + return undefined; + } + const botMentions = merged.filter( + (mention) => mention.id.open_id?.trim() === normalizedBotOpenId, + ); + return botMentions.length > 0 ? botMentions : undefined; +} + +export function createFeishuMessageReceiveHandler({ + cfg, + core, + accountId, + runtime, + chatHistories, + fireAndForget, + handleMessage, + resolveDebounceText: resolveText, + hasProcessedMessage, + recordProcessedMessage, + getBotOpenId = () => undefined, + getBotName = () => undefined, + resolveSequentialKey = ({ accountId, event }) => + `feishu:${accountId}:${event.message.chat_id?.trim() || "unknown"}`, +}: FeishuMessageReceiveHandlerContext): (data: unknown) => Promise { + const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({ + cfg, + channel: "feishu", + }); + const log = runtime?.log ?? console.log; + const error = runtime?.error ?? console.error; + const enqueue = createSequentialQueue(); + + const dispatchFeishuMessage = async (event: FeishuMessageEvent) => { + const sequentialKey = resolveSequentialKey({ + accountId, + event, + botOpenId: getBotOpenId(accountId), + botName: getBotName(accountId), + }); + const task = () => + handleMessage({ + cfg, + event, + botOpenId: getBotOpenId(accountId), + botName: getBotName(accountId), + runtime, + chatHistories, + accountId, + processingClaimHeld: true, + }); + await enqueue(sequentialKey, task); + }; + + const resolveSenderDebounceId = (event: FeishuMessageEvent): string | undefined => { + const senderId = + event.sender.sender_id.open_id?.trim() || event.sender.sender_id.user_id?.trim(); + return senderId || undefined; + }; + + const resolveDebounceText = (event: FeishuMessageEvent): string => { + return resolveText({ + event, + botOpenId: getBotOpenId(accountId), + botName: getBotName(accountId), + }).trim(); + }; + + const recordSuppressedMessageIds = async ( + entries: FeishuMessageEvent[], + dispatchMessageId?: string, + ) => { + const keepMessageId = dispatchMessageId?.trim(); + const suppressedIds = new Set( + entries + .map((entry) => entry.message.message_id?.trim()) + .filter((id): id is string => Boolean(id) && (!keepMessageId || id !== keepMessageId)), + ); + for (const messageId of suppressedIds) { + try { + await recordProcessedMessage(messageId, accountId, log); + } catch (err) { + error( + `feishu[${accountId}]: failed to record merged dedupe id ${messageId}: ${String(err)}`, + ); + } + } + }; + + const inboundDebouncer = core.channel.debounce.createInboundDebouncer({ + debounceMs: inboundDebounceMs, + buildKey: (event) => { + const chatId = event.message.chat_id?.trim(); + const senderId = resolveSenderDebounceId(event); + if (!chatId || !senderId) { + return null; + } + const rootId = event.message.root_id?.trim(); + const threadKey = rootId ? `thread:${rootId}` : "chat"; + return `feishu:${accountId}:${chatId}:${threadKey}:${senderId}`; + }, + shouldDebounce: (event) => { + if (event.message.message_type !== "text") { + return false; + } + const text = resolveDebounceText(event); + return Boolean(text) && !core.channel.text.hasControlCommand(text, cfg); + }, + onFlush: async (entries) => { + const last = entries.at(-1); + if (!last) { + return; + } + if (entries.length === 1) { + await dispatchFeishuMessage(last); + return; + } + const dedupedEntries = dedupeFeishuDebounceEntriesByMessageId(entries); + const freshEntries: FeishuMessageEvent[] = []; + for (const entry of dedupedEntries) { + if (!(await hasProcessedMessage(entry.message.message_id, accountId, log))) { + freshEntries.push(entry); + } + } + const dispatchEntry = freshEntries.at(-1); + if (!dispatchEntry) { + return; + } + await recordSuppressedMessageIds(dedupedEntries, dispatchEntry.message.message_id); + const combinedText = freshEntries + .map((entry) => resolveDebounceText(entry)) + .filter(Boolean) + .join("\n"); + const mergedMentions = resolveFeishuDebounceMentions({ + entries: freshEntries, + botOpenId: getBotOpenId(accountId), + }); + await dispatchFeishuMessage({ + ...dispatchEntry, + message: { + ...dispatchEntry.message, + ...(combinedText.trim() + ? { + message_type: "text", + content: JSON.stringify({ text: combinedText }), + } + : {}), + mentions: mergedMentions ?? dispatchEntry.message.mentions, + }, + }); + }, + onError: (err, entries) => { + for (const entry of entries) { + releaseFeishuMessageProcessing(entry.message.message_id, accountId); + } + error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`); + }, + }); + + return async (data) => { + const event = parseFeishuMessageEventPayload(data); + if (!event) { + error(`feishu[${accountId}]: ignoring malformed message event payload`); + return; + } + const messageId = event.message?.message_id?.trim(); + if (!tryBeginFeishuMessageProcessing(messageId, accountId)) { + log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`); + return; + } + const processMessage = async () => { + await inboundDebouncer.enqueue(event); + }; + if (fireAndForget) { + void processMessage().catch((err) => { + releaseFeishuMessageProcessing(messageId, accountId); + error(`feishu[${accountId}]: error handling message: ${String(err)}`); + }); + return; + } + try { + await processMessage(); + } catch (err) { + releaseFeishuMessageProcessing(messageId, accountId); + error(`feishu[${accountId}]: error handling message: ${String(err)}`); + } + }; +} diff --git a/extensions/feishu/src/monitor.reaction.test.ts b/extensions/feishu/src/monitor.reaction.test.ts index aff22ec1f03..b062c829201 100644 --- a/extensions/feishu/src/monitor.reaction.test.ts +++ b/extensions/feishu/src/monitor.reaction.test.ts @@ -648,23 +648,27 @@ describe("Feishu inbound debounce regressions", () => { it("uses latest fresh message id when debounce batch ends with stale retry", async () => { vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); const recordSpy = vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); - setStaleRetryMocks(); + setStaleRetryMocks("om_old_latest_fresh"); const onMessage = await setupDebounceMonitor(); - await onMessage(createTextEvent({ messageId: "om_new", text: "fresh" })); + await onMessage(createTextEvent({ messageId: "om_new_latest_fresh", text: "fresh" })); await Promise.resolve(); await Promise.resolve(); - await onMessage(createTextEvent({ messageId: "om_old", text: "stale" })); + await onMessage(createTextEvent({ messageId: "om_old_latest_fresh", text: "stale" })); await Promise.resolve(); await Promise.resolve(); await vi.advanceTimersByTimeAsync(25); const dispatched = expectSingleDispatchedEvent(); - expect(dispatched.message.message_id).toBe("om_new"); + expect(dispatched.message.message_id).toBe("om_new_latest_fresh"); const combined = JSON.parse(dispatched.message.content) as { text?: string }; expect(combined.text).toBe("fresh"); - expect(recordSpy).toHaveBeenCalledWith("om_old", "default", expect.any(Function)); - expect(recordSpy).not.toHaveBeenCalledWith("om_new", "default", expect.any(Function)); + expect(recordSpy).toHaveBeenCalledWith("om_old_latest_fresh", "default", expect.any(Function)); + expect(recordSpy).not.toHaveBeenCalledWith( + "om_new_latest_fresh", + "default", + expect.any(Function), + ); }); it("releases early event dedupe when debounced dispatch fails", async () => { diff --git a/extensions/feishu/src/monitor.reply-once.lifecycle.test.ts b/extensions/feishu/src/monitor.reply-once.lifecycle.test.ts index 1498837e446..48797d62950 100644 --- a/extensions/feishu/src/monitor.reply-once.lifecycle.test.ts +++ b/extensions/feishu/src/monitor.reply-once.lifecycle.test.ts @@ -6,7 +6,6 @@ import { createFeishuLifecycleConfig, createFeishuLifecycleReplyDispatcher, createFeishuTextMessageEvent, - createResolvedFeishuLifecycleAccount, expectFeishuReplyDispatcherSentFinalReplyOnce, expectFeishuReplyPipelineDedupedAcrossReplay, expectFeishuReplyPipelineDedupedAfterPostSendFailure, @@ -14,22 +13,20 @@ import { mockFeishuReplyOnceDispatch, restoreFeishuLifecycleStateDir, setFeishuLifecycleStateDir, - setupFeishuLifecycleHandler, + setupFeishuMessageReceiveLifecycleHandler, } from "./test-support/lifecycle-test-support.js"; const { - createEventDispatcherMock, createFeishuReplyDispatcherMock, dispatchReplyFromConfigMock, finalizeInboundContextMock, resolveAgentRouteMock, - resolveBoundConversationMock, - touchBindingMock, withReplyDispatcherMock, } = getFeishuLifecycleTestMocks(); -let _handlers: Record Promise> = {}; let lastRuntime: ReturnType | null = null; +let lifecycleCore: ReturnType; +const handleMessageMock = vi.fn(); const originalStateDir = process.env.OPENCLAW_STATE_DIR; const lifecycleConfig = createFeishuLifecycleConfig({ accountId: "acct-lifecycle", @@ -47,34 +44,18 @@ const lifecycleConfig = createFeishuLifecycleConfig({ }, }); -const lifecycleAccount = createResolvedFeishuLifecycleAccount({ - accountId: "acct-lifecycle", - appId: "cli_test", - appSecret: "secret_test", - config: { - groupPolicy: "open", - groups: { - oc_group_1: { - requireMention: false, - groupSessionScope: "group_topic_sender", - replyInThread: "enabled", - }, - }, - }, -}); - async function setupLifecycleMonitor() { lastRuntime = createRuntimeEnv(); - return setupFeishuLifecycleHandler({ - createEventDispatcherMock, - onRegister: (registered) => { - _handlers = registered; - }, + return setupFeishuMessageReceiveLifecycleHandler({ runtime: lastRuntime, + core: lifecycleCore, cfg: lifecycleConfig, - account: lifecycleAccount, - handlerKey: "im.message.receive_v1", - missingHandlerMessage: "missing im.message.receive_v1 handler", + accountId: "acct-lifecycle", + handleMessage: handleMessageMock, + resolveDebounceText: ({ event }) => { + const parsed = JSON.parse(event.message.content) as { text?: string }; + return parsed.text ?? ""; + }, }); } @@ -82,17 +63,11 @@ describe("Feishu reply-once lifecycle", () => { beforeEach(() => { vi.useRealTimers(); vi.clearAllMocks(); - _handlers = {}; lastRuntime = null; setFeishuLifecycleStateDir("openclaw-feishu-lifecycle"); createFeishuReplyDispatcherMock.mockReturnValue(createFeishuLifecycleReplyDispatcher()); - resolveBoundConversationMock.mockReturnValue({ - bindingId: "binding-1", - targetSessionKey: "agent:bound-agent:feishu:topic:om_root_topic_1:ou_sender_1", - }); - resolveAgentRouteMock.mockReturnValue({ agentId: "main", channel: "feishu", @@ -108,8 +83,33 @@ describe("Feishu reply-once lifecycle", () => { }); withReplyDispatcherMock.mockImplementation(async ({ run }) => await run()); + handleMessageMock.mockImplementation(async ({ event }) => { + const reply = createFeishuReplyDispatcherMock({ + accountId: "acct-lifecycle", + chatId: event.message.chat_id, + replyToMessageId: event.message.root_id ?? event.message.message_id, + replyInThread: true, + rootId: event.message.root_id, + }); + try { + await withReplyDispatcherMock({ + dispatcher: reply.dispatcher, + onSettled: () => reply.markDispatchIdle(), + run: () => + dispatchReplyFromConfigMock({ + ctx: { + AccountId: "acct-lifecycle", + MessageSid: event.message.message_id, + }, + dispatcher: reply.dispatcher, + }), + }); + } catch (err) { + lastRuntime?.error(`feishu[acct-lifecycle]: failed to dispatch message: ${String(err)}`); + } + }); - installFeishuLifecycleReplyRuntime({ + lifecycleCore = installFeishuLifecycleReplyRuntime({ resolveAgentRouteMock, finalizeInboundContextMock, dispatchReplyFromConfigMock, @@ -141,6 +141,7 @@ describe("Feishu reply-once lifecycle", () => { }); expect(lastRuntime?.error).not.toHaveBeenCalled(); + expect(handleMessageMock).toHaveBeenCalledTimes(1); expect(dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); expect(createFeishuReplyDispatcherMock).toHaveBeenCalledTimes(1); expect(createFeishuReplyDispatcherMock).toHaveBeenCalledWith( @@ -152,15 +153,6 @@ describe("Feishu reply-once lifecycle", () => { rootId: "om_root_topic_1", }), ); - expect(finalizeInboundContextMock).toHaveBeenCalledWith( - expect.objectContaining({ - AccountId: "acct-lifecycle", - SessionKey: "agent:bound-agent:feishu:topic:om_root_topic_1:ou_sender_1", - MessageSid: "om_lifecycle_once", - MessageThreadId: "om_root_topic_1", - }), - ); - expect(touchBindingMock).toHaveBeenCalledWith("binding-1"); expectFeishuReplyDispatcherSentFinalReplyOnce({ createFeishuReplyDispatcherMock }); }); @@ -187,6 +179,7 @@ describe("Feishu reply-once lifecycle", () => { }); expect(lastRuntime?.error).toHaveBeenCalledTimes(1); + expect(handleMessageMock).toHaveBeenCalledTimes(1); expect(dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); expectFeishuReplyDispatcherSentFinalReplyOnce({ createFeishuReplyDispatcherMock }); }); diff --git a/extensions/feishu/src/monitor.state.ts b/extensions/feishu/src/monitor.state.ts index 9602c5dc354..08bf28b245d 100644 --- a/extensions/feishu/src/monitor.state.ts +++ b/extensions/feishu/src/monitor.state.ts @@ -1,5 +1,5 @@ import * as http from "http"; -import * as Lark from "@larksuiteoapi/node-sdk"; +import type * as Lark from "@larksuiteoapi/node-sdk"; import { createFixedWindowRateLimiter, createWebhookAnomalyTracker, diff --git a/extensions/feishu/src/processing-claims.ts b/extensions/feishu/src/processing-claims.ts new file mode 100644 index 00000000000..f718622bf5e --- /dev/null +++ b/extensions/feishu/src/processing-claims.ts @@ -0,0 +1,59 @@ +const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000; +const EVENT_MEMORY_MAX_SIZE = 2_000; + +const processingClaims = new Map(); + +function resolveEventDedupeKey( + namespace: string, + messageId: string | undefined | null, +): string | null { + const trimmed = messageId?.trim(); + return trimmed ? `${namespace}:${trimmed}` : null; +} + +function pruneProcessingClaims(now: number): void { + const cutoff = now - EVENT_DEDUP_TTL_MS; + for (const [key, seenAt] of processingClaims) { + if (seenAt < cutoff) { + processingClaims.delete(key); + } + } + while (processingClaims.size > EVENT_MEMORY_MAX_SIZE) { + const oldestKey = processingClaims.keys().next().value; + if (!oldestKey) { + return; + } + processingClaims.delete(oldestKey); + } +} + +export function tryBeginFeishuMessageProcessing( + messageId: string | undefined | null, + namespace = "global", +): boolean { + const key = resolveEventDedupeKey(namespace, messageId); + if (!key) { + return true; + } + const now = Date.now(); + pruneProcessingClaims(now); + if (processingClaims.has(key)) { + processingClaims.delete(key); + processingClaims.set(key, now); + pruneProcessingClaims(now); + return false; + } + processingClaims.set(key, now); + pruneProcessingClaims(now); + return true; +} + +export function releaseFeishuMessageProcessing( + messageId: string | undefined | null, + namespace = "global", +): void { + const key = resolveEventDedupeKey(namespace, messageId); + if (key) { + processingClaims.delete(key); + } +} diff --git a/extensions/feishu/src/test-support/lifecycle-test-support.ts b/extensions/feishu/src/test-support/lifecycle-test-support.ts index 023b011e238..911df89c8d8 100644 --- a/extensions/feishu/src/test-support/lifecycle-test-support.ts +++ b/extensions/feishu/src/test-support/lifecycle-test-support.ts @@ -2,6 +2,7 @@ import { randomUUID } from "node:crypto"; import { expect, vi, type Mock } from "vitest"; import { createPluginRuntimeMock } from "../../../../test/helpers/plugins/plugin-runtime-mock.js"; import type { ClawdbotConfig, PluginRuntime, RuntimeEnv } from "../../runtime-api.js"; +import { createFeishuMessageReceiveHandler } from "../monitor.message-handler.js"; import { setFeishuRuntime } from "../runtime.js"; import type { ResolvedFeishuAccount } from "../types.js"; @@ -105,45 +106,44 @@ export function installFeishuLifecycleRuntime(params: { upsertPairingRequest?: PluginRuntime["channel"]["pairing"]["upsertPairingRequest"]; buildPairingReply?: PluginRuntime["channel"]["pairing"]["buildPairingReply"]; detectMime?: PluginRuntime["media"]["detectMime"]; -}) { - setFeishuRuntime( - createPluginRuntimeMock({ - channel: { - debounce: createImmediateInboundDebounce(), - text: { - hasControlCommand: params.hasControlCommand ?? vi.fn(() => false), - }, - routing: { - resolveAgentRoute: params.resolveAgentRoute, - }, - reply: { - resolveEnvelopeFormatOptions: vi.fn(() => ({})), - formatAgentEnvelope: vi.fn((value: { body: string }) => value.body), - finalizeInboundContext: params.finalizeInboundContext, - dispatchReplyFromConfig: params.dispatchReplyFromConfig, - withReplyDispatcher: params.withReplyDispatcher, - }, - commands: { - shouldComputeCommandAuthorized: - params.shouldComputeCommandAuthorized ?? vi.fn(() => false), - resolveCommandAuthorizedFromAuthorizers: - params.resolveCommandAuthorizedFromAuthorizers ?? vi.fn(() => false), - }, - session: { - readSessionUpdatedAt: vi.fn(), - resolveStorePath: params.resolveStorePath, - }, - pairing: { - readAllowFromStore: params.readAllowFromStore ?? vi.fn().mockResolvedValue([]), - upsertPairingRequest: params.upsertPairingRequest ?? vi.fn(), - buildPairingReply: params.buildPairingReply ?? vi.fn(), - }, +}): PluginRuntime { + const runtime = createPluginRuntimeMock({ + channel: { + debounce: createImmediateInboundDebounce(), + text: { + hasControlCommand: params.hasControlCommand ?? vi.fn(() => false), }, - media: { - detectMime: params.detectMime ?? vi.fn(async () => "text/plain"), + routing: { + resolveAgentRoute: params.resolveAgentRoute, }, - }) as unknown as PluginRuntime, - ); + reply: { + resolveEnvelopeFormatOptions: vi.fn(() => ({})), + formatAgentEnvelope: vi.fn((value: { body: string }) => value.body), + finalizeInboundContext: params.finalizeInboundContext, + dispatchReplyFromConfig: params.dispatchReplyFromConfig, + withReplyDispatcher: params.withReplyDispatcher, + }, + commands: { + shouldComputeCommandAuthorized: params.shouldComputeCommandAuthorized ?? vi.fn(() => false), + resolveCommandAuthorizedFromAuthorizers: + params.resolveCommandAuthorizedFromAuthorizers ?? vi.fn(() => false), + }, + session: { + readSessionUpdatedAt: vi.fn(), + resolveStorePath: params.resolveStorePath, + }, + pairing: { + readAllowFromStore: params.readAllowFromStore ?? vi.fn().mockResolvedValue([]), + upsertPairingRequest: params.upsertPairingRequest ?? vi.fn(), + buildPairingReply: params.buildPairingReply ?? vi.fn(), + }, + }, + media: { + detectMime: params.detectMime ?? vi.fn(async () => "text/plain"), + }, + }) as unknown as PluginRuntime; + setFeishuRuntime(runtime); + return runtime; } export function installFeishuLifecycleReplyRuntime(params: { @@ -152,8 +152,8 @@ export function installFeishuLifecycleReplyRuntime(params: { dispatchReplyFromConfigMock: unknown; withReplyDispatcherMock: unknown; storePath: string; -}) { - installFeishuLifecycleRuntime({ +}): PluginRuntime { + return installFeishuLifecycleRuntime({ resolveAgentRoute: params.resolveAgentRouteMock as PluginRuntime["channel"]["routing"]["resolveAgentRoute"], finalizeInboundContext: @@ -303,13 +303,13 @@ export async function replayFeishuLifecycleEvent(params: { event: unknown; waitForFirst: () => void | Promise; waitForSecond?: () => void | Promise; + waitTimeoutMs?: number; }) { + const waitOptions = { timeout: params.waitTimeoutMs ?? FEISHU_LIFECYCLE_WAIT_TIMEOUT_MS }; await params.handler(params.event); - await vi.waitFor(params.waitForFirst, { timeout: FEISHU_LIFECYCLE_WAIT_TIMEOUT_MS }); + await vi.waitFor(params.waitForFirst, waitOptions); await params.handler(params.event); - await vi.waitFor(params.waitForSecond ?? params.waitForFirst, { - timeout: FEISHU_LIFECYCLE_WAIT_TIMEOUT_MS, - }); + await vi.waitFor(params.waitForSecond ?? params.waitForFirst, waitOptions); } export async function runFeishuLifecycleSequence( @@ -351,21 +351,14 @@ export async function expectFeishuReplyPipelineDedupedAcrossReplay(params: { await replayFeishuLifecycleEvent({ handler: params.handler, event: params.event, - waitForFirst: () => - vi.waitFor( - () => { - expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); - }, - waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs }, - ), - waitForSecond: () => - vi.waitFor( - () => { - expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); - expect(params.createFeishuReplyDispatcherMock).toHaveBeenCalledTimes(1); - }, - waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs }, - ), + waitTimeoutMs, + waitForFirst: () => { + expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); + }, + waitForSecond: () => { + expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); + expect(params.createFeishuReplyDispatcherMock).toHaveBeenCalledTimes(1); + }, }); } @@ -380,22 +373,15 @@ export async function expectFeishuReplyPipelineDedupedAfterPostSendFailure(param await replayFeishuLifecycleEvent({ handler: params.handler, event: params.event, - waitForFirst: () => - vi.waitFor( - () => { - expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); - expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1); - }, - waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs }, - ), - waitForSecond: () => - vi.waitFor( - () => { - expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); - expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1); - }, - waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs }, - ), + waitTimeoutMs, + waitForFirst: () => { + expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); + expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1); + }, + waitForSecond: () => { + expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1); + expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1); + }, }); } @@ -413,6 +399,31 @@ async function loadMonitorSingleAccount() { return module.monitorSingleAccount; } +export async function setupFeishuMessageReceiveLifecycleHandler(params: { + runtime: RuntimeEnv; + core: PluginRuntime; + cfg: ClawdbotConfig; + accountId: string; + fireAndForget?: boolean; + handleMessage: Parameters[0]["handleMessage"]; + resolveDebounceText: Parameters< + typeof createFeishuMessageReceiveHandler + >[0]["resolveDebounceText"]; +}): Promise<(data: unknown) => Promise> { + return createFeishuMessageReceiveHandler({ + cfg: params.cfg, + core: params.core, + accountId: params.accountId, + runtime: params.runtime, + chatHistories: new Map(), + fireAndForget: params.fireAndForget, + handleMessage: params.handleMessage, + resolveDebounceText: params.resolveDebounceText, + hasProcessedMessage: vi.fn(async () => false), + recordProcessedMessage: vi.fn(async () => true), + }); +} + export async function setupFeishuLifecycleHandler(params: { createEventDispatcherMock: { mockReturnValue: (value: unknown) => unknown;