From 68b9ad4205aa4f931afa9f2beb0353af5dc74999 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 24 Apr 2026 12:36:50 +0100 Subject: [PATCH] test: slim feishu monitor handler imports --- extensions/feishu/src/dedup-runtime-api.ts | 8 +- extensions/feishu/src/monitor.account.ts | 246 ++---------------- .../feishu/src/monitor.bot-menu-handler.ts | 164 ++++++++++++ .../feishu/src/monitor.bot-menu.test.ts | 83 +----- .../src/monitor.comment-notice-handler.ts | 105 ++++++++ extensions/feishu/src/monitor.comment.test.ts | 89 +------ extensions/feishu/src/monitor.comment.ts | 12 +- .../feishu/src/monitor.synthetic-error.ts | 18 ++ 8 files changed, 331 insertions(+), 394 deletions(-) create mode 100644 extensions/feishu/src/monitor.bot-menu-handler.ts create mode 100644 extensions/feishu/src/monitor.comment-notice-handler.ts create mode 100644 extensions/feishu/src/monitor.synthetic-error.ts diff --git a/extensions/feishu/src/dedup-runtime-api.ts b/extensions/feishu/src/dedup-runtime-api.ts index 7b3920668fb..ca9b5cec8d4 100644 --- a/extensions/feishu/src/dedup-runtime-api.ts +++ b/extensions/feishu/src/dedup-runtime-api.ts @@ -1,5 +1,3 @@ -export { - createDedupeCache, - createPersistentDedupe, - readJsonFileWithFallback, -} from "../runtime-api.js"; +export { createDedupeCache } from "openclaw/plugin-sdk/core"; +export { createPersistentDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; +export { readJsonFileWithFallback } from "openclaw/plugin-sdk/json-store"; diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index b636e89d24d..9f60900e8c7 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -1,7 +1,6 @@ import * as crypto from "node:crypto"; import type * as Lark from "@larksuiteoapi/node-sdk"; import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js"; -import { resolveFeishuAccount } from "./accounts.js"; import { raceWithTimeoutAndAbort } from "./async.js"; import { handleFeishuMessage, @@ -10,50 +9,30 @@ import { type FeishuBotAddedEvent, } from "./bot.js"; import { handleFeishuCardAction, type FeishuCardActionEvent } from "./card-action.js"; -import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js"; import { createEventDispatcher } from "./client.js"; -import { handleFeishuCommentEvent } from "./comment-handler.js"; import { isRecord, readString } from "./comment-shared.js"; import { - claimUnprocessedFeishuMessage, hasProcessedFeishuMessage, recordProcessedFeishuMessage, - releaseFeishuMessageProcessing, warmupDedupFromDisk, } from "./dedup.js"; import { applyBotIdentityState, startBotIdentityRecovery } from "./monitor.bot-identity.js"; -import { parseFeishuDriveCommentNoticeEventPayload } from "./monitor.comment.js"; +import { createFeishuBotMenuHandler } from "./monitor.bot-menu-handler.js"; +import { createFeishuDriveCommentNoticeHandler } from "./monitor.comment-notice-handler.js"; import { createFeishuMessageReceiveHandler } from "./monitor.message-handler.js"; import { fetchBotIdentityForMonitor } from "./monitor.startup.js"; import { botNames, botOpenIds } from "./monitor.state.js"; +import { FeishuRetryableSyntheticEventError } from "./monitor.synthetic-error.js"; import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js"; import { getFeishuRuntime } from "./runtime.js"; import { getMessageFeishu } from "./send.js"; import { getFeishuSequentialKey } from "./sequential-key.js"; -import { createSequentialQueue } from "./sequential-queue.js"; import { createFeishuThreadBindingManager } from "./thread-bindings.js"; import type { FeishuChatType, ResolvedFeishuAccount } from "./types.js"; const FEISHU_REACTION_VERIFY_TIMEOUT_MS = 1_500; -export class FeishuRetryableSyntheticEventError extends Error { - constructor(message: string, options?: ErrorOptions) { - super(message, options); - this.name = "FeishuRetryableSyntheticEventError"; - } -} - -function isFeishuRetryableSyntheticEventError( - error: unknown, -): error is FeishuRetryableSyntheticEventError { - return ( - error instanceof FeishuRetryableSyntheticEventError || - (typeof error === "object" && - error !== null && - "name" in error && - error.name === "FeishuRetryableSyntheticEventError") - ); -} +export { FeishuRetryableSyntheticEventError }; export type FeishuReactionCreatedEvent = { message_id: string; @@ -104,6 +83,7 @@ export async function resolveReactionSyntheticEvent( return null; } + const { resolveFeishuAccount } = await import("./accounts.js"); const account = resolveFeishuAccount({ cfg, accountId }); const reactionNotifications = account.config.reactionNotifications ?? "own"; if (reactionNotifications === "off") { @@ -187,19 +167,6 @@ type RegisterEventHandlersContext = { fireAndForget?: boolean; }; -type FeishuBotMenuEvent = { - event_key?: string; - timestamp?: string | number; - operator?: { - operator_name?: string; - operator_id?: { open_id?: string; user_id?: string; union_id?: string }; - }; -}; - -function readStringOrNumber(value: unknown): string | number | undefined { - return typeof value === "string" || typeof value === "number" ? value : undefined; -} - function parseFeishuBotAddedEventPayload(value: unknown): FeishuBotAddedEvent | null { if (!isRecord(value) || !readString(value.chat_id) || !isRecord(value.operator_id)) { return null; @@ -214,32 +181,6 @@ function parseFeishuBotRemovedChatId(value: unknown): string | null { return readString(value.chat_id) ?? null; } -function parseFeishuBotMenuEvent(value: unknown): FeishuBotMenuEvent | null { - if (!isRecord(value)) { - return null; - } - const operator = value.operator; - if (operator !== undefined && !isRecord(operator)) { - return null; - } - return { - event_key: readString(value.event_key), - timestamp: readStringOrNumber(value.timestamp), - operator: operator - ? { - operator_name: readString(operator.operator_name), - operator_id: isRecord(operator.operator_id) - ? { - open_id: readString(operator.operator_id.open_id), - user_id: readString(operator.operator_id.user_id), - union_id: readString(operator.operator_id.union_id), - } - : undefined, - } - : undefined, - }; -} - function parseFeishuCardActionEventPayload(value: unknown): FeishuCardActionEvent | null { if (!isRecord(value)) { return null; @@ -291,16 +232,6 @@ function parseFeishuCardActionEventPayload(value: unknown): FeishuCardActionEven }; } -function buildCommentNoticeQueueKey(event: { - notice_meta?: { - file_type?: string; - file_token?: string; - }; -}): string { - const fileType = event.notice_meta?.file_type?.trim() || "unknown"; - const fileToken = event.notice_meta?.file_token?.trim() || "unknown"; - return `comment-doc:${fileType}:${fileToken}`; -} function registerEventHandlers( eventDispatcher: Lark.EventDispatcher, context: RegisterEventHandlersContext, @@ -308,8 +239,6 @@ function registerEventHandlers( const { cfg, accountId, runtime, chatHistories, fireAndForget } = context; const log = runtime?.log ?? console.log; const error = runtime?.error ?? console.error; - // Non-message lifecycle events still share FIFO execution by resource key. - const enqueue = createSequentialQueue(); const runFeishuHandler = async (params: { task: () => Promise; errorMessage: string }) => { if (fireAndForget) { void params.task().catch((err) => { @@ -366,68 +295,12 @@ function registerEventHandlers( error(`feishu[${accountId}]: error handling bot removed event: ${String(err)}`); } }, - "drive.notice.comment_add_v1": async (data: unknown) => { - await runFeishuHandler({ - errorMessage: `feishu[${accountId}]: error handling drive comment notice`, - task: async () => { - const event = parseFeishuDriveCommentNoticeEventPayload(data); - if (!event) { - error(`feishu[${accountId}]: ignoring malformed drive comment notice payload`); - return; - } - const eventId = event.event_id?.trim(); - const syntheticMessageId = eventId ? `drive-comment:${eventId}` : undefined; - if (syntheticMessageId) { - const claim = await claimUnprocessedFeishuMessage({ - messageId: syntheticMessageId, - namespace: accountId, - log, - }); - if (claim === "duplicate") { - log(`feishu[${accountId}]: dropping duplicate comment event ${syntheticMessageId}`); - return; - } - if (claim === "inflight") { - log(`feishu[${accountId}]: dropping in-flight comment event ${syntheticMessageId}`); - return; - } - } - log( - `feishu[${accountId}]: received drive comment notice ` + - `event=${event.event_id ?? "unknown"} ` + - `type=${event.notice_meta?.notice_type ?? "unknown"} ` + - `file=${event.notice_meta?.file_type ?? "unknown"}:${event.notice_meta?.file_token ?? "unknown"} ` + - `comment=${event.comment_id ?? "unknown"} ` + - `reply=${event.reply_id ?? "none"} ` + - `from=${event.notice_meta?.from_user_id?.open_id ?? "unknown"} ` + - `mentioned=${event.is_mentioned === true ? "yes" : "no"}`, - ); - try { - await enqueue(buildCommentNoticeQueueKey(event), async () => { - await handleFeishuCommentEvent({ - cfg, - accountId, - event, - botOpenId: botOpenIds.get(accountId), - runtime, - }); - }); - if (syntheticMessageId) { - await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); - } - } catch (err) { - if (syntheticMessageId && !isFeishuRetryableSyntheticEventError(err)) { - await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); - } - throw err; - } finally { - if (syntheticMessageId) { - releaseFeishuMessageProcessing(syntheticMessageId, accountId); - } - } - }, - }); - }, + "drive.notice.comment_add_v1": createFeishuDriveCommentNoticeHandler({ + cfg, + accountId, + runtime, + fireAndForget, + }), "im.message.reaction.created_v1": async (data) => { await runFeishuHandler({ errorMessage: `feishu[${accountId}]: error handling reaction event`, @@ -487,96 +360,13 @@ function registerEventHandlers( }, }); }, - "application.bot.menu_v6": async (data) => { - try { - const event = parseFeishuBotMenuEvent(data); - if (!event) { - return; - } - const operatorOpenId = event.operator?.operator_id?.open_id?.trim(); - const eventKey = event.event_key?.trim(); - if (!operatorOpenId || !eventKey) { - return; - } - const syntheticEvent: FeishuMessageEvent = { - sender: { - sender_id: { - open_id: operatorOpenId, - user_id: event.operator?.operator_id?.user_id, - union_id: event.operator?.operator_id?.union_id, - }, - sender_type: "user", - }, - message: { - message_id: `bot-menu:${eventKey}:${event.timestamp ?? Date.now()}`, - chat_id: `p2p:${operatorOpenId}`, - chat_type: "p2p", - message_type: "text", - content: JSON.stringify({ - text: `/menu ${eventKey}`, - }), - }, - }; - const syntheticMessageId = syntheticEvent.message.message_id; - const claim = await claimUnprocessedFeishuMessage({ - messageId: syntheticMessageId, - namespace: accountId, - log, - }); - if (claim === "duplicate") { - log(`feishu[${accountId}]: dropping duplicate bot-menu event for ${syntheticMessageId}`); - return; - } - if (claim === "inflight") { - log(`feishu[${accountId}]: dropping in-flight bot-menu event for ${syntheticMessageId}`); - return; - } - const handleLegacyMenu = () => - handleFeishuMessage({ - cfg, - event: syntheticEvent, - botOpenId: botOpenIds.get(accountId), - botName: botNames.get(accountId), - runtime, - chatHistories, - accountId, - processingClaimHeld: true, - }); - - const promise = maybeHandleFeishuQuickActionMenu({ - cfg, - eventKey, - operatorOpenId, - runtime, - accountId, - }) - .then(async (handledMenu) => { - if (handledMenu) { - await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); - releaseFeishuMessageProcessing(syntheticMessageId, accountId); - return; - } - return await handleLegacyMenu(); - }) - .catch(async (err) => { - if (isFeishuRetryableSyntheticEventError(err)) { - releaseFeishuMessageProcessing(syntheticMessageId, accountId); - } else { - await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); - } - throw err; - }); - if (fireAndForget) { - promise.catch((err) => { - error(`feishu[${accountId}]: error handling bot menu event: ${String(err)}`); - }); - return; - } - await promise; - } catch (err) { - error(`feishu[${accountId}]: error handling bot menu event: ${String(err)}`); - } - }, + "application.bot.menu_v6": createFeishuBotMenuHandler({ + cfg, + accountId, + runtime, + chatHistories, + fireAndForget, + }), "card.action.trigger": async (data: unknown) => { try { const event = parseFeishuCardActionEventPayload(data); diff --git a/extensions/feishu/src/monitor.bot-menu-handler.ts b/extensions/feishu/src/monitor.bot-menu-handler.ts new file mode 100644 index 00000000000..ce536beb230 --- /dev/null +++ b/extensions/feishu/src/monitor.bot-menu-handler.ts @@ -0,0 +1,164 @@ +import type { ClawdbotConfig, HistoryEntry, RuntimeEnv } from "../runtime-api.js"; +import { handleFeishuMessage, type FeishuMessageEvent } from "./bot.js"; +import { maybeHandleFeishuQuickActionMenu } from "./card-ux-launcher.js"; +import { + claimUnprocessedFeishuMessage, + recordProcessedFeishuMessage, + releaseFeishuMessageProcessing, +} from "./dedup.js"; +import { botNames, botOpenIds } from "./monitor.state.js"; +import { isFeishuRetryableSyntheticEventError } from "./monitor.synthetic-error.js"; + +type FeishuBotMenuEvent = { + event_key?: string; + timestamp?: string | number; + operator?: { + operator_name?: string; + operator_id?: { open_id?: string; user_id?: string; union_id?: string }; + }; +}; + +function readString(value: unknown): string | undefined { + return typeof value === "string" ? value : undefined; +} + +function readStringOrNumber(value: unknown): string | number | undefined { + return typeof value === "string" || typeof value === "number" ? value : undefined; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function parseFeishuBotMenuEvent(value: unknown): FeishuBotMenuEvent | null { + if (!isRecord(value)) { + return null; + } + const operator = value.operator; + if (operator !== undefined && !isRecord(operator)) { + return null; + } + return { + event_key: readString(value.event_key), + timestamp: readStringOrNumber(value.timestamp), + operator: operator + ? { + operator_name: readString(operator.operator_name), + operator_id: isRecord(operator.operator_id) + ? { + open_id: readString(operator.operator_id.open_id), + user_id: readString(operator.operator_id.user_id), + union_id: readString(operator.operator_id.union_id), + } + : undefined, + } + : undefined, + }; +} + +export function createFeishuBotMenuHandler(params: { + cfg: ClawdbotConfig; + accountId: string; + runtime?: RuntimeEnv; + chatHistories: Map; + fireAndForget?: boolean; + getBotOpenId?: (accountId: string) => string | undefined; + getBotName?: (accountId: string) => string | undefined; +}): (data: unknown) => Promise { + const { cfg, accountId, runtime, chatHistories, fireAndForget } = params; + const log = runtime?.log ?? console.log; + const error = runtime?.error ?? console.error; + const getBotOpenId = params.getBotOpenId ?? ((id) => botOpenIds.get(id)); + const getBotName = params.getBotName ?? ((id) => botNames.get(id)); + + return async (data) => { + try { + const event = parseFeishuBotMenuEvent(data); + if (!event) { + return; + } + const operatorOpenId = event.operator?.operator_id?.open_id?.trim(); + const eventKey = event.event_key?.trim(); + if (!operatorOpenId || !eventKey) { + return; + } + const syntheticEvent: FeishuMessageEvent = { + sender: { + sender_id: { + open_id: operatorOpenId, + user_id: event.operator?.operator_id?.user_id, + union_id: event.operator?.operator_id?.union_id, + }, + sender_type: "user", + }, + message: { + message_id: `bot-menu:${eventKey}:${event.timestamp ?? Date.now()}`, + chat_id: `p2p:${operatorOpenId}`, + chat_type: "p2p", + message_type: "text", + content: JSON.stringify({ + text: `/menu ${eventKey}`, + }), + }, + }; + const syntheticMessageId = syntheticEvent.message.message_id; + const claim = await claimUnprocessedFeishuMessage({ + messageId: syntheticMessageId, + namespace: accountId, + log, + }); + if (claim === "duplicate") { + log(`feishu[${accountId}]: dropping duplicate bot-menu event for ${syntheticMessageId}`); + return; + } + if (claim === "inflight") { + log(`feishu[${accountId}]: dropping in-flight bot-menu event for ${syntheticMessageId}`); + return; + } + const handleLegacyMenu = () => + handleFeishuMessage({ + cfg, + event: syntheticEvent, + botOpenId: getBotOpenId(accountId), + botName: getBotName(accountId), + runtime, + chatHistories, + accountId, + processingClaimHeld: true, + }); + + const promise = maybeHandleFeishuQuickActionMenu({ + cfg, + eventKey, + operatorOpenId, + runtime, + accountId, + }) + .then(async (handledMenu) => { + if (handledMenu) { + await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); + releaseFeishuMessageProcessing(syntheticMessageId, accountId); + return; + } + return await handleLegacyMenu(); + }) + .catch(async (err) => { + if (isFeishuRetryableSyntheticEventError(err)) { + releaseFeishuMessageProcessing(syntheticMessageId, accountId); + } else { + await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); + } + throw err; + }); + if (fireAndForget) { + promise.catch((err) => { + error(`feishu[${accountId}]: error handling bot menu event: ${String(err)}`); + }); + return; + } + await promise; + } catch (err) { + error(`feishu[${accountId}]: error handling bot menu event: ${String(err)}`); + } + }; +} diff --git a/extensions/feishu/src/monitor.bot-menu.test.ts b/extensions/feishu/src/monitor.bot-menu.test.ts index 770fe26a3cb..ce2ea8411f9 100644 --- a/extensions/feishu/src/monitor.bot-menu.test.ts +++ b/extensions/feishu/src/monitor.bot-menu.test.ts @@ -1,49 +1,15 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { ClawdbotConfig, RuntimeEnv } from "../runtime-api.js"; +import type { ClawdbotConfig } from "../runtime-api.js"; import { expectFirstSentCardUsesFillWidthOnly } from "./card-test-helpers.js"; -import { monitorSingleAccount } from "./monitor.account.js"; -import { setFeishuRuntime } from "./runtime.js"; -import type { ResolvedFeishuAccount } from "./types.js"; +import { createFeishuBotMenuHandler } from "./monitor.bot-menu-handler.js"; -const createEventDispatcherMock = vi.hoisted(() => vi.fn()); -const monitorWebSocketMock = vi.hoisted(() => vi.fn(async () => {})); -const monitorWebhookMock = vi.hoisted(() => vi.fn(async () => {})); const handleFeishuMessageMock = vi.hoisted(() => vi.fn(async () => {})); const parseFeishuMessageEventMock = vi.hoisted(() => vi.fn()); const sendCardFeishuMock = vi.hoisted(() => vi.fn(async () => ({ messageId: "m1", chatId: "c1" }))); const getMessageFeishuMock = vi.hoisted(() => vi.fn()); -const createFeishuThreadBindingManagerMock = vi.hoisted(() => vi.fn(() => ({ stop: vi.fn() }))); -let handlers: Record Promise> = {}; const originalStateDir = process.env.OPENCLAW_STATE_DIR; -const hasControlCommand = () => false; -const resolveInboundDebounceMs = () => 0; -const createInboundDebouncer = () => ({ - run: async (fn: () => Promise) => await fn(), -}); -const createMonitorRuntime = () => - ({ - channel: { - debounce: { - createInboundDebouncer, - resolveInboundDebounceMs, - }, - text: { - hasControlCommand, - }, - }, - }) as never; - -vi.mock("./client.js", () => ({ - createEventDispatcher: createEventDispatcherMock, -})); - -vi.mock("./monitor.transport.js", () => ({ - monitorWebSocket: monitorWebSocketMock, - monitorWebhook: monitorWebhookMock, -})); - vi.mock("./bot.js", () => { return { handleFeishuMessage: handleFeishuMessageMock, @@ -58,25 +24,6 @@ vi.mock("./send.js", () => { }; }); -vi.mock("./thread-bindings.js", () => ({ - createFeishuThreadBindingManager: createFeishuThreadBindingManagerMock, -})); - -function buildAccount(): ResolvedFeishuAccount { - return { - accountId: "default", - enabled: true, - configured: true, - appId: "cli_test", - appSecret: "secret_test", // pragma: allowlist secret - domain: "feishu", - config: { - enabled: true, - connectionMode: "websocket", - }, - } as ResolvedFeishuAccount; -} - function createBotMenuEvent(params: { eventKey: string; timestamp: string }) { return { event_key: params.eventKey, @@ -92,37 +39,23 @@ function createBotMenuEvent(params: { eventKey: string; timestamp: string }) { } async function registerHandlers() { - setFeishuRuntime(createMonitorRuntime()); - const register = vi.fn((registered: Record Promise>) => { - handlers = registered; - }); - createEventDispatcherMock.mockReturnValue({ register }); - - await monitorSingleAccount({ + return createFeishuBotMenuHandler({ cfg: {} as ClawdbotConfig, - account: buildAccount(), + accountId: "default", runtime: { log: vi.fn(), error: vi.fn(), exit: vi.fn(), - } as RuntimeEnv, - botOpenIdSource: { - kind: "prefetched", - botOpenId: "ou_bot", - botName: "Bot", }, + chatHistories: new Map(), + fireAndForget: true, + getBotOpenId: () => "ou_bot", + getBotName: () => "Bot", }); - - const onBotMenu = handlers["application.bot.menu_v6"]; - if (!onBotMenu) { - throw new Error("missing application.bot.menu_v6 handler"); - } - return onBotMenu; } describe("Feishu bot menu handler", () => { beforeEach(() => { - handlers = {}; vi.clearAllMocks(); process.env.OPENCLAW_STATE_DIR = `/tmp/openclaw-feishu-bot-menu-test-${Date.now()}-${Math.random().toString(36).slice(2)}`; }); diff --git a/extensions/feishu/src/monitor.comment-notice-handler.ts b/extensions/feishu/src/monitor.comment-notice-handler.ts new file mode 100644 index 00000000000..bb359f81533 --- /dev/null +++ b/extensions/feishu/src/monitor.comment-notice-handler.ts @@ -0,0 +1,105 @@ +import type { ClawdbotConfig, RuntimeEnv } from "../runtime-api.js"; +import { handleFeishuCommentEvent } from "./comment-handler.js"; +import { + claimUnprocessedFeishuMessage, + recordProcessedFeishuMessage, + releaseFeishuMessageProcessing, +} from "./dedup.js"; +import { parseFeishuDriveCommentNoticeEventPayload } from "./monitor.comment.js"; +import { botOpenIds } from "./monitor.state.js"; +import { isFeishuRetryableSyntheticEventError } from "./monitor.synthetic-error.js"; +import { createSequentialQueue } from "./sequential-queue.js"; + +function buildCommentNoticeQueueKey(event: { + notice_meta?: { + file_type?: string; + file_token?: string; + }; +}): string { + const fileType = event.notice_meta?.file_type?.trim() || "unknown"; + const fileToken = event.notice_meta?.file_token?.trim() || "unknown"; + return `comment-doc:${fileType}:${fileToken}`; +} + +export function createFeishuDriveCommentNoticeHandler(params: { + cfg: ClawdbotConfig; + accountId: string; + runtime?: RuntimeEnv; + fireAndForget?: boolean; + getBotOpenId?: (accountId: string) => string | undefined; +}): (data: unknown) => Promise { + const { cfg, accountId, runtime, fireAndForget } = params; + const log = runtime?.log ?? console.log; + const error = runtime?.error ?? console.error; + const enqueue = createSequentialQueue(); + const getBotOpenId = params.getBotOpenId ?? ((id) => botOpenIds.get(id)); + + const runFeishuHandler = async (task: () => Promise) => { + const promise = task().catch((err) => { + error(`feishu[${accountId}]: error handling drive comment notice: ${String(err)}`); + }); + if (!fireAndForget) { + await promise; + } + }; + + return async (data: unknown) => { + await runFeishuHandler(async () => { + const event = parseFeishuDriveCommentNoticeEventPayload(data); + if (!event) { + error(`feishu[${accountId}]: ignoring malformed drive comment notice payload`); + return; + } + const eventId = event.event_id?.trim(); + const syntheticMessageId = eventId ? `drive-comment:${eventId}` : undefined; + if (syntheticMessageId) { + const claim = await claimUnprocessedFeishuMessage({ + messageId: syntheticMessageId, + namespace: accountId, + log, + }); + if (claim === "duplicate") { + log(`feishu[${accountId}]: dropping duplicate comment event ${syntheticMessageId}`); + return; + } + if (claim === "inflight") { + log(`feishu[${accountId}]: dropping in-flight comment event ${syntheticMessageId}`); + return; + } + } + log( + `feishu[${accountId}]: received drive comment notice ` + + `event=${event.event_id ?? "unknown"} ` + + `type=${event.notice_meta?.notice_type ?? "unknown"} ` + + `file=${event.notice_meta?.file_type ?? "unknown"}:${event.notice_meta?.file_token ?? "unknown"} ` + + `comment=${event.comment_id ?? "unknown"} ` + + `reply=${event.reply_id ?? "none"} ` + + `from=${event.notice_meta?.from_user_id?.open_id ?? "unknown"} ` + + `mentioned=${event.is_mentioned === true ? "yes" : "no"}`, + ); + try { + await enqueue(buildCommentNoticeQueueKey(event), async () => { + await handleFeishuCommentEvent({ + cfg, + accountId, + event, + botOpenId: getBotOpenId(accountId), + runtime, + }); + }); + if (syntheticMessageId) { + await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); + } + } catch (err) { + if (syntheticMessageId && !isFeishuRetryableSyntheticEventError(err)) { + await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); + } + throw err; + } finally { + if (syntheticMessageId) { + releaseFeishuMessageProcessing(syntheticMessageId, accountId); + } + } + }); + }; +} diff --git a/extensions/feishu/src/monitor.comment.test.ts b/extensions/feishu/src/monitor.comment.test.ts index 1c825322409..f3237c99c18 100644 --- a/extensions/feishu/src/monitor.comment.test.ts +++ b/extensions/feishu/src/monitor.comment.test.ts @@ -1,34 +1,21 @@ -import { - createInboundDebouncer, - resolveInboundDebounceMs, -} from "openclaw/plugin-sdk/channel-inbound-debounce"; -import { hasControlCommand } from "openclaw/plugin-sdk/command-detection"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createNonExitingRuntimeEnv } from "../../../test/helpers/plugins/runtime-env.js"; -import type { ClawdbotConfig, PluginRuntime, RuntimeEnv } from "../runtime-api.js"; +import type { ClawdbotConfig } from "../runtime-api.js"; import * as dedup from "./dedup.js"; -import { monitorSingleAccount } from "./monitor.account.js"; +import { createFeishuDriveCommentNoticeHandler } from "./monitor.comment-notice-handler.js"; import { resolveDriveCommentEventTurn, type FeishuDriveCommentNoticeEvent, } from "./monitor.comment.js"; -import { setFeishuRuntime } from "./runtime.js"; -import type { ResolvedFeishuAccount } from "./types.js"; const handleFeishuCommentEventMock = vi.hoisted(() => vi.fn(async () => {})); -const createEventDispatcherMock = vi.hoisted(() => vi.fn()); const createFeishuClientMock = vi.hoisted(() => vi.fn()); -const monitorWebSocketMock = vi.hoisted(() => vi.fn(async () => {})); -const monitorWebhookMock = vi.hoisted(() => vi.fn(async () => {})); -const createFeishuThreadBindingManagerMock = vi.hoisted(() => vi.fn(() => ({ stop: vi.fn() }))); -let handlers: Record Promise> = {}; let lastRuntime: ReturnType | null = null; const TEST_DOC_TOKEN = "ZsJfdxrBFo0RwuxteOLc1Ekvneb"; const TEST_WIKI_TOKEN = "OtYpd5pKOoMeQzxrzkocv9KIn4H"; vi.mock("./client.js", () => ({ - createEventDispatcher: createEventDispatcherMock, createFeishuClient: createFeishuClientMock, })); @@ -36,15 +23,6 @@ vi.mock("./comment-handler.js", () => ({ handleFeishuCommentEvent: handleFeishuCommentEventMock, })); -vi.mock("./monitor.transport.js", () => ({ - monitorWebSocket: monitorWebSocketMock, - monitorWebhook: monitorWebhookMock, -})); - -vi.mock("./thread-bindings.js", () => ({ - createFeishuThreadBindingManager: createFeishuThreadBindingManagerMock, -})); - function buildMonitorConfig(): ClawdbotConfig { return { channels: { @@ -55,39 +33,6 @@ function buildMonitorConfig(): ClawdbotConfig { } as ClawdbotConfig; } -function buildMonitorAccount(): ResolvedFeishuAccount { - return { - accountId: "default", - enabled: true, - configured: true, - appId: "cli_test", - appSecret: "secret_test", // pragma: allowlist secret - domain: "feishu", - config: { - enabled: true, - connectionMode: "websocket", - }, - } as ResolvedFeishuAccount; -} - -function createFeishuMonitorRuntime(params?: { - createInboundDebouncer?: PluginRuntime["channel"]["debounce"]["createInboundDebouncer"]; - resolveInboundDebounceMs?: PluginRuntime["channel"]["debounce"]["resolveInboundDebounceMs"]; - hasControlCommand?: PluginRuntime["channel"]["text"]["hasControlCommand"]; -}): PluginRuntime { - return { - channel: { - debounce: { - createInboundDebouncer: params?.createInboundDebouncer ?? createInboundDebouncer, - resolveInboundDebounceMs: params?.resolveInboundDebounceMs ?? resolveInboundDebounceMs, - }, - text: { - hasControlCommand: params?.hasControlCommand ?? hasControlCommand, - }, - }, - } as unknown as PluginRuntime; -} - function makeDriveCommentEvent( overrides: Partial = {}, ): FeishuDriveCommentNoticeEvent { @@ -250,27 +195,15 @@ function makeOpenApiClient(params: { } async function setupCommentMonitorHandler(): Promise<(data: unknown) => Promise> { - const register = vi.fn((registered: Record Promise>) => { - handlers = registered; - }); - createEventDispatcherMock.mockReturnValue({ register }); lastRuntime = createNonExitingRuntimeEnv(); - await monitorSingleAccount({ + return createFeishuDriveCommentNoticeHandler({ cfg: buildMonitorConfig(), - account: buildMonitorAccount(), - runtime: lastRuntime as RuntimeEnv, - botOpenIdSource: { - kind: "prefetched", - botOpenId: "ou_bot", - }, + accountId: "default", + runtime: lastRuntime, + fireAndForget: true, + getBotOpenId: () => "ou_bot", }); - - const handler = handlers["drive.notice.comment_add_v1"]; - if (!handler) { - throw new Error("missing drive.notice.comment_add_v1 handler"); - } - return handler; } describe("resolveDriveCommentEventTurn", () => { @@ -871,20 +804,12 @@ describe("resolveDriveCommentEventTurn", () => { describe("drive.notice.comment_add_v1 monitor handler", () => { beforeEach(() => { - handlers = {}; lastRuntime = null; handleFeishuCommentEventMock.mockClear(); - createEventDispatcherMock.mockReset(); createFeishuClientMock.mockReset().mockReturnValue(makeOpenApiClient({}) as never); - createFeishuThreadBindingManagerMock.mockReset().mockImplementation(() => ({ - stop: vi.fn(), - })); vi.spyOn(dedup, "claimUnprocessedFeishuMessage").mockResolvedValue("claimed"); - vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); vi.spyOn(dedup, "releaseFeishuMessageProcessing").mockImplementation(() => {}); - vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false); - setFeishuRuntime(createFeishuMonitorRuntime()); }); afterEach(() => { diff --git a/extensions/feishu/src/monitor.comment.ts b/extensions/feishu/src/monitor.comment.ts index 76da23c4ea0..3d80972ec34 100644 --- a/extensions/feishu/src/monitor.comment.ts +++ b/extensions/feishu/src/monitor.comment.ts @@ -1,6 +1,5 @@ import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import type { ClawdbotConfig } from "../runtime-api.js"; -import { resolveFeishuAccount } from "./accounts.js"; import { raceWithTimeoutAndAbort } from "./async.js"; import { createFeishuClient } from "./client.js"; import { @@ -53,6 +52,7 @@ type ResolveDriveCommentEventParams = { cfg: ClawdbotConfig; accountId: string; event: FeishuDriveCommentNoticeEvent; + account?: ResolvedFeishuAccount; botOpenId?: string; createClient?: (account: ResolvedFeishuAccount) => FeishuRequestClient; verificationTimeoutMs?: number; @@ -1224,8 +1224,9 @@ async function resolveDriveCommentEventCore(params: ResolveDriveCommentEventPara cfg, accountId, event, + account, botOpenId, - createClient = (account) => createFeishuClient(account) as FeishuRequestClient, + createClient, verificationTimeoutMs = FEISHU_COMMENT_VERIFY_TIMEOUT_MS, logger, waitMs = delayMs, @@ -1262,8 +1263,11 @@ async function resolveDriveCommentEventCore(params: ResolveDriveCommentEventPara return null; } - const account = resolveFeishuAccount({ cfg, accountId }); - const client = createClient(account); + const client = createClient + ? createClient(account ?? ({ accountId } as ResolvedFeishuAccount)) + : (createFeishuClient( + (await import("./accounts.js")).resolveFeishuAccount({ cfg, accountId }), + ) as FeishuRequestClient); const context = await fetchDriveCommentContext({ client, fileToken, diff --git a/extensions/feishu/src/monitor.synthetic-error.ts b/extensions/feishu/src/monitor.synthetic-error.ts new file mode 100644 index 00000000000..873adb14cfd --- /dev/null +++ b/extensions/feishu/src/monitor.synthetic-error.ts @@ -0,0 +1,18 @@ +export class FeishuRetryableSyntheticEventError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "FeishuRetryableSyntheticEventError"; + } +} + +export function isFeishuRetryableSyntheticEventError( + error: unknown, +): error is FeishuRetryableSyntheticEventError { + return ( + error instanceof FeishuRetryableSyntheticEventError || + (typeof error === "object" && + error !== null && + "name" in error && + error.name === "FeishuRetryableSyntheticEventError") + ); +}