perf(test): narrow feishu reply lifecycle

This commit is contained in:
Peter Steinberger
2026-04-20 17:57:24 +01:00
parent 78cf0e95ad
commit 164f0feddf
9 changed files with 573 additions and 431 deletions

View File

@@ -1,17 +1,15 @@
import os from "node:os";
import path from "node:path";
import { createDedupeCache, createPersistentDedupe } from "./dedup-runtime-api.js";
import { createPersistentDedupe } from "./dedup-runtime-api.js";
import {
releaseFeishuMessageProcessing,
tryBeginFeishuMessageProcessing,
} from "./processing-claims.js";
// Persistent TTL: 24 hours — survives restarts & WebSocket reconnects.
const DEDUP_TTL_MS = 24 * 60 * 60 * 1000;
const MEMORY_MAX_SIZE = 1_000;
const FILE_MAX_ENTRIES = 10_000;
const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000;
const EVENT_MEMORY_MAX_SIZE = 2_000;
const processingClaims = createDedupeCache({
ttlMs: EVENT_DEDUP_TTL_MS,
maxSize: EVENT_MEMORY_MAX_SIZE,
});
function resolveStateDirFromEnv(env: NodeJS.ProcessEnv = process.env): string {
const stateOverride = env.OPENCLAW_STATE_DIR?.trim();
@@ -36,35 +34,12 @@ const persistentDedupe = createPersistentDedupe({
resolveFilePath: resolveNamespaceFilePath,
});
function resolveEventDedupeKey(
namespace: string,
messageId: string | undefined | null,
): string | null {
const trimmed = messageId?.trim();
if (!trimmed) {
return null;
}
return `${namespace}:${trimmed}`;
}
function normalizeMessageId(messageId: string | undefined | null): string | null {
const trimmed = messageId?.trim();
return trimmed ? trimmed : null;
}
export function tryBeginFeishuMessageProcessing(
messageId: string | undefined | null,
namespace = "global",
): boolean {
return !processingClaims.check(resolveEventDedupeKey(namespace, messageId));
}
export function releaseFeishuMessageProcessing(
messageId: string | undefined | null,
namespace = "global",
): void {
processingClaims.delete(resolveEventDedupeKey(namespace, messageId));
}
export { releaseFeishuMessageProcessing, tryBeginFeishuMessageProcessing };
export async function claimUnprocessedFeishuMessage(params: {
messageId: string | undefined | null;

View File

@@ -89,11 +89,25 @@ const {
sendCardFeishuMock,
} = feishuLifecycleTestMocks;
vi.mock("./client.js", async () => {
const actual = await vi.importActual<typeof import("./client.js")>("./client.js");
vi.mock("./client.js", () => {
return {
...actual,
FEISHU_HTTP_TIMEOUT_ENV_VAR: "OPENCLAW_FEISHU_HTTP_TIMEOUT_MS",
FEISHU_HTTP_TIMEOUT_MAX_MS: 300_000,
FEISHU_HTTP_TIMEOUT_MS: 30_000,
FEISHU_USER_AGENT: "openclaw-feishu-test",
clearClientCache: vi.fn(),
createFeishuClient: vi.fn(() => {
throw new Error("unexpected Feishu client call in lifecycle test");
}),
createFeishuWSClient: vi.fn(async () => ({
close: vi.fn(),
start: vi.fn(),
})),
createEventDispatcher: createEventDispatcherMock,
getFeishuClient: vi.fn(() => null),
getFeishuUserAgent: vi.fn(() => "openclaw-feishu-test"),
pluginVersion: "test",
setFeishuClientRuntimeForTest: vi.fn(),
};
});

View File

@@ -1,5 +1,5 @@
import * as crypto from "crypto";
import * as Lark from "@larksuiteoapi/node-sdk";
import type * as Lark from "@larksuiteoapi/node-sdk";
import type { ClawdbotConfig, RuntimeEnv, HistoryEntry } from "../runtime-api.js";
import { resolveFeishuAccount } from "./accounts.js";
import { raceWithTimeoutAndAbort } from "./async.js";
@@ -19,12 +19,11 @@ import {
hasProcessedFeishuMessage,
recordProcessedFeishuMessage,
releaseFeishuMessageProcessing,
tryBeginFeishuMessageProcessing,
warmupDedupFromDisk,
} from "./dedup.js";
import { isMentionForwardRequest } from "./mention.js";
import { applyBotIdentityState, startBotIdentityRecovery } from "./monitor.bot-identity.js";
import { parseFeishuDriveCommentNoticeEventPayload } from "./monitor.comment.js";
import { createFeishuMessageReceiveHandler } from "./monitor.message-handler.js";
import { fetchBotIdentityForMonitor } from "./monitor.startup.js";
import { botNames, botOpenIds } from "./monitor.state.js";
import { monitorWebhook, monitorWebSocket } from "./monitor.transport.js";
@@ -201,30 +200,6 @@ function readStringOrNumber(value: unknown): string | number | undefined {
return typeof value === "string" || typeof value === "number" ? value : undefined;
}
function parseFeishuMessageEventPayload(value: unknown): FeishuMessageEvent | null {
if (!isRecord(value)) {
return null;
}
const sender = value.sender;
const message = value.message;
if (!isRecord(sender) || !isRecord(message)) {
return null;
}
const senderId = sender.sender_id;
if (!isRecord(senderId)) {
return null;
}
const messageId = readString(message.message_id);
const chatId = readString(message.chat_id);
const chatType = normalizeFeishuChatType(message.chat_type);
const messageType = readString(message.message_type);
const content = readString(message.content);
if (!messageId || !chatId || !chatType || !messageType || !content) {
return null;
}
return value as FeishuMessageEvent;
}
function parseFeishuBotAddedEventPayload(value: unknown): FeishuBotAddedEvent | null {
if (!isRecord(value) || !readString(value.chat_id) || !isRecord(value.operator_id)) {
return null;
@@ -326,94 +301,14 @@ function buildCommentNoticeQueueKey(event: {
const fileToken = event.notice_meta?.file_token?.trim() || "unknown";
return `comment-doc:${fileType}:${fileToken}`;
}
function mergeFeishuDebounceMentions(
entries: FeishuMessageEvent[],
): FeishuMessageEvent["message"]["mentions"] | undefined {
const merged = new Map<string, NonNullable<FeishuMessageEvent["message"]["mentions"]>[number]>();
for (const entry of entries) {
for (const mention of entry.message.mentions ?? []) {
const stableId =
mention.id.open_id?.trim() || mention.id.user_id?.trim() || mention.id.union_id?.trim();
const mentionName = mention.name?.trim();
const mentionKey = mention.key?.trim();
const fallback =
mentionName && mentionKey ? `${mentionName}|${mentionKey}` : mentionName || mentionKey;
const key = stableId || fallback;
if (!key || merged.has(key)) {
continue;
}
merged.set(key, mention);
}
}
if (merged.size === 0) {
return undefined;
}
return Array.from(merged.values());
}
function dedupeFeishuDebounceEntriesByMessageId(
entries: FeishuMessageEvent[],
): FeishuMessageEvent[] {
const seen = new Set<string>();
const deduped: FeishuMessageEvent[] = [];
for (const entry of entries) {
const messageId = entry.message.message_id?.trim();
if (!messageId) {
deduped.push(entry);
continue;
}
if (seen.has(messageId)) {
continue;
}
seen.add(messageId);
deduped.push(entry);
}
return deduped;
}
function resolveFeishuDebounceMentions(params: {
entries: FeishuMessageEvent[];
botOpenId?: string;
}): FeishuMessageEvent["message"]["mentions"] | undefined {
const { entries, botOpenId } = params;
if (entries.length === 0) {
return undefined;
}
for (let index = entries.length - 1; index >= 0; index -= 1) {
const entry = entries[index];
if (isMentionForwardRequest(entry, botOpenId)) {
// Keep mention-forward semantics scoped to a single source message.
return mergeFeishuDebounceMentions([entry]);
}
}
const merged = mergeFeishuDebounceMentions(entries);
if (!merged) {
return undefined;
}
const normalizedBotOpenId = botOpenId?.trim();
if (!normalizedBotOpenId) {
return undefined;
}
const botMentions = merged.filter(
(mention) => mention.id.open_id?.trim() === normalizedBotOpenId,
);
return botMentions.length > 0 ? botMentions : undefined;
}
function registerEventHandlers(
eventDispatcher: Lark.EventDispatcher,
context: RegisterEventHandlersContext,
): void {
const { cfg, accountId, runtime, chatHistories, fireAndForget } = context;
const core = getFeishuRuntime();
const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({
cfg,
channel: "feishu",
});
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
// Keep normal Feishu traffic FIFO per chat while allowing explicit out-of-band
// commands like /btw and /stop to bypass the busy main-chat lane.
// Non-message lifecycle events still share FIFO execution by resource key.
const enqueue = createSequentialQueue();
const runFeishuHandler = async (params: { task: () => Promise<void>; errorMessage: string }) => {
if (fireAndForget) {
@@ -428,170 +323,24 @@ function registerEventHandlers(
error(`${params.errorMessage}: ${String(err)}`);
}
};
const dispatchFeishuMessage = async (event: FeishuMessageEvent) => {
const sequentialKey = getFeishuSequentialKey({
accountId,
event,
botOpenId: botOpenIds.get(accountId),
botName: botNames.get(accountId),
});
const task = () =>
handleFeishuMessage({
cfg,
event,
botOpenId: botOpenIds.get(accountId),
botName: botNames.get(accountId),
runtime,
chatHistories,
accountId,
processingClaimHeld: true,
});
await enqueue(sequentialKey, task);
};
const resolveSenderDebounceId = (event: FeishuMessageEvent): string | undefined => {
const senderId =
event.sender.sender_id.open_id?.trim() || event.sender.sender_id.user_id?.trim();
return senderId || undefined;
};
const resolveDebounceText = (event: FeishuMessageEvent): string => {
const botOpenId = botOpenIds.get(accountId);
const parsed = parseFeishuMessageEvent(event, botOpenId, botNames.get(accountId));
return parsed.content.trim();
};
const recordSuppressedMessageIds = async (
entries: FeishuMessageEvent[],
dispatchMessageId?: string,
) => {
const keepMessageId = dispatchMessageId?.trim();
const suppressedIds = new Set(
entries
.map((entry) => entry.message.message_id?.trim())
.filter((id): id is string => Boolean(id) && (!keepMessageId || id !== keepMessageId)),
);
if (suppressedIds.size === 0) {
return;
}
for (const messageId of suppressedIds) {
try {
await recordProcessedFeishuMessage(messageId, accountId, log);
} catch (err) {
error(
`feishu[${accountId}]: failed to record merged dedupe id ${messageId}: ${String(err)}`,
);
}
}
};
const isMessageAlreadyProcessed = async (entry: FeishuMessageEvent): Promise<boolean> => {
return await hasProcessedFeishuMessage(entry.message.message_id, accountId, log);
};
const inboundDebouncer = core.channel.debounce.createInboundDebouncer<FeishuMessageEvent>({
debounceMs: inboundDebounceMs,
buildKey: (event) => {
const chatId = event.message.chat_id?.trim();
const senderId = resolveSenderDebounceId(event);
if (!chatId || !senderId) {
return null;
}
const rootId = event.message.root_id?.trim();
const threadKey = rootId ? `thread:${rootId}` : "chat";
return `feishu:${accountId}:${chatId}:${threadKey}:${senderId}`;
},
shouldDebounce: (event) => {
if (event.message.message_type !== "text") {
return false;
}
const text = resolveDebounceText(event);
if (!text) {
return false;
}
return !core.channel.text.hasControlCommand(text, cfg);
},
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) {
return;
}
if (entries.length === 1) {
await dispatchFeishuMessage(last);
return;
}
const dedupedEntries = dedupeFeishuDebounceEntriesByMessageId(entries);
const freshEntries: FeishuMessageEvent[] = [];
for (const entry of dedupedEntries) {
if (!(await isMessageAlreadyProcessed(entry))) {
freshEntries.push(entry);
}
}
const dispatchEntry = freshEntries.at(-1);
if (!dispatchEntry) {
return;
}
await recordSuppressedMessageIds(dedupedEntries, dispatchEntry.message.message_id);
const combinedText = freshEntries
.map((entry) => resolveDebounceText(entry))
.filter(Boolean)
.join("\n");
const mergedMentions = resolveFeishuDebounceMentions({
entries: freshEntries,
botOpenId: botOpenIds.get(accountId),
});
if (!combinedText.trim()) {
await dispatchFeishuMessage({
...dispatchEntry,
message: {
...dispatchEntry.message,
mentions: mergedMentions ?? dispatchEntry.message.mentions,
},
});
return;
}
await dispatchFeishuMessage({
...dispatchEntry,
message: {
...dispatchEntry.message,
message_type: "text",
content: JSON.stringify({ text: combinedText }),
mentions: mergedMentions ?? dispatchEntry.message.mentions,
},
});
},
onError: (err, entries) => {
for (const entry of entries) {
releaseFeishuMessageProcessing(entry.message.message_id, accountId);
}
error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`);
},
});
eventDispatcher.register({
"im.message.receive_v1": async (data) => {
const event = parseFeishuMessageEventPayload(data);
if (!event) {
error(`feishu[${accountId}]: ignoring malformed message event payload`);
return;
}
const messageId = event.message?.message_id?.trim();
if (!tryBeginFeishuMessageProcessing(messageId, accountId)) {
log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`);
return;
}
const processMessage = async () => {
await inboundDebouncer.enqueue(event);
};
if (fireAndForget) {
void processMessage().catch((err) => {
releaseFeishuMessageProcessing(messageId, accountId);
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
});
return;
}
try {
await processMessage();
} catch (err) {
releaseFeishuMessageProcessing(messageId, accountId);
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
}
},
"im.message.receive_v1": createFeishuMessageReceiveHandler({
cfg,
core: getFeishuRuntime(),
accountId,
runtime,
chatHistories,
fireAndForget,
handleMessage: handleFeishuMessage,
resolveDebounceText: ({ event, botOpenId, botName }) =>
parseFeishuMessageEvent(event, botOpenId, botName).content,
hasProcessedMessage: hasProcessedFeishuMessage,
recordProcessedMessage: recordProcessedFeishuMessage,
getBotOpenId: (id) => botOpenIds.get(id),
getBotName: (id) => botNames.get(id),
resolveSequentialKey: getFeishuSequentialKey,
}),
"im.message.message_read_v1": async () => {
// Ignore read receipts
},

View File

@@ -0,0 +1,337 @@
import type { ClawdbotConfig, HistoryEntry, PluginRuntime, RuntimeEnv } from "../runtime-api.js";
import type { FeishuMessageEvent } from "./event-types.js";
import { isMentionForwardRequest } from "./mention.js";
import {
releaseFeishuMessageProcessing,
tryBeginFeishuMessageProcessing,
} from "./processing-claims.js";
import { createSequentialQueue } from "./sequential-queue.js";
import type { FeishuChatType } from "./types.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function readString(value: unknown): string | undefined {
return typeof value === "string" ? value : undefined;
}
type FeishuMessageReceiveHandlerContext = {
cfg: ClawdbotConfig;
core: PluginRuntime;
accountId: string;
runtime?: RuntimeEnv;
chatHistories: Map<string, HistoryEntry[]>;
fireAndForget?: boolean;
handleMessage: (params: {
cfg: ClawdbotConfig;
event: FeishuMessageEvent;
botOpenId?: string;
botName?: string;
runtime?: RuntimeEnv;
chatHistories?: Map<string, HistoryEntry[]>;
accountId?: string;
processingClaimHeld?: boolean;
}) => Promise<void>;
resolveDebounceText: (params: {
event: FeishuMessageEvent;
botOpenId?: string;
botName?: string;
}) => string;
hasProcessedMessage: (
messageId: string | undefined | null,
namespace: string,
log?: (...args: unknown[]) => void,
) => Promise<boolean>;
recordProcessedMessage: (
messageId: string | undefined | null,
namespace: string,
log?: (...args: unknown[]) => void,
) => Promise<boolean>;
getBotOpenId?: (accountId: string) => string | undefined;
getBotName?: (accountId: string) => string | undefined;
resolveSequentialKey?: (params: {
accountId: string;
event: FeishuMessageEvent;
botOpenId?: string;
botName?: string;
}) => string;
};
function normalizeFeishuChatType(value: unknown): FeishuChatType | undefined {
return value === "group" || value === "private" || value === "p2p" ? value : undefined;
}
function parseFeishuMessageEventPayload(value: unknown): FeishuMessageEvent | null {
if (!isRecord(value)) {
return null;
}
const sender = value.sender;
const message = value.message;
if (!isRecord(sender) || !isRecord(message)) {
return null;
}
const senderId = sender.sender_id;
if (!isRecord(senderId)) {
return null;
}
const messageId = readString(message.message_id);
const chatId = readString(message.chat_id);
const chatType = normalizeFeishuChatType(message.chat_type);
const messageType = readString(message.message_type);
const content = readString(message.content);
if (!messageId || !chatId || !chatType || !messageType || !content) {
return null;
}
return value as FeishuMessageEvent;
}
function mergeFeishuDebounceMentions(
entries: FeishuMessageEvent[],
): FeishuMessageEvent["message"]["mentions"] | undefined {
const merged = new Map<string, NonNullable<FeishuMessageEvent["message"]["mentions"]>[number]>();
for (const entry of entries) {
for (const mention of entry.message.mentions ?? []) {
const stableId =
mention.id.open_id?.trim() || mention.id.user_id?.trim() || mention.id.union_id?.trim();
const mentionName = mention.name?.trim();
const mentionKey = mention.key?.trim();
const fallback =
mentionName && mentionKey ? `${mentionName}|${mentionKey}` : mentionName || mentionKey;
const key = stableId || fallback;
if (!key || merged.has(key)) {
continue;
}
merged.set(key, mention);
}
}
return merged.size > 0 ? Array.from(merged.values()) : undefined;
}
function dedupeFeishuDebounceEntriesByMessageId(
entries: FeishuMessageEvent[],
): FeishuMessageEvent[] {
const seen = new Set<string>();
const deduped: FeishuMessageEvent[] = [];
for (const entry of entries) {
const messageId = entry.message.message_id?.trim();
if (!messageId) {
deduped.push(entry);
continue;
}
if (seen.has(messageId)) {
continue;
}
seen.add(messageId);
deduped.push(entry);
}
return deduped;
}
function resolveFeishuDebounceMentions(params: {
entries: FeishuMessageEvent[];
botOpenId?: string;
}): FeishuMessageEvent["message"]["mentions"] | undefined {
const { entries, botOpenId } = params;
if (entries.length === 0) {
return undefined;
}
for (let index = entries.length - 1; index >= 0; index -= 1) {
const entry = entries[index];
if (isMentionForwardRequest(entry, botOpenId)) {
return mergeFeishuDebounceMentions([entry]);
}
}
const merged = mergeFeishuDebounceMentions(entries);
if (!merged) {
return undefined;
}
const normalizedBotOpenId = botOpenId?.trim();
if (!normalizedBotOpenId) {
return undefined;
}
const botMentions = merged.filter(
(mention) => mention.id.open_id?.trim() === normalizedBotOpenId,
);
return botMentions.length > 0 ? botMentions : undefined;
}
export function createFeishuMessageReceiveHandler({
cfg,
core,
accountId,
runtime,
chatHistories,
fireAndForget,
handleMessage,
resolveDebounceText: resolveText,
hasProcessedMessage,
recordProcessedMessage,
getBotOpenId = () => undefined,
getBotName = () => undefined,
resolveSequentialKey = ({ accountId, event }) =>
`feishu:${accountId}:${event.message.chat_id?.trim() || "unknown"}`,
}: FeishuMessageReceiveHandlerContext): (data: unknown) => Promise<void> {
const inboundDebounceMs = core.channel.debounce.resolveInboundDebounceMs({
cfg,
channel: "feishu",
});
const log = runtime?.log ?? console.log;
const error = runtime?.error ?? console.error;
const enqueue = createSequentialQueue();
const dispatchFeishuMessage = async (event: FeishuMessageEvent) => {
const sequentialKey = resolveSequentialKey({
accountId,
event,
botOpenId: getBotOpenId(accountId),
botName: getBotName(accountId),
});
const task = () =>
handleMessage({
cfg,
event,
botOpenId: getBotOpenId(accountId),
botName: getBotName(accountId),
runtime,
chatHistories,
accountId,
processingClaimHeld: true,
});
await enqueue(sequentialKey, task);
};
const resolveSenderDebounceId = (event: FeishuMessageEvent): string | undefined => {
const senderId =
event.sender.sender_id.open_id?.trim() || event.sender.sender_id.user_id?.trim();
return senderId || undefined;
};
const resolveDebounceText = (event: FeishuMessageEvent): string => {
return resolveText({
event,
botOpenId: getBotOpenId(accountId),
botName: getBotName(accountId),
}).trim();
};
const recordSuppressedMessageIds = async (
entries: FeishuMessageEvent[],
dispatchMessageId?: string,
) => {
const keepMessageId = dispatchMessageId?.trim();
const suppressedIds = new Set(
entries
.map((entry) => entry.message.message_id?.trim())
.filter((id): id is string => Boolean(id) && (!keepMessageId || id !== keepMessageId)),
);
for (const messageId of suppressedIds) {
try {
await recordProcessedMessage(messageId, accountId, log);
} catch (err) {
error(
`feishu[${accountId}]: failed to record merged dedupe id ${messageId}: ${String(err)}`,
);
}
}
};
const inboundDebouncer = core.channel.debounce.createInboundDebouncer<FeishuMessageEvent>({
debounceMs: inboundDebounceMs,
buildKey: (event) => {
const chatId = event.message.chat_id?.trim();
const senderId = resolveSenderDebounceId(event);
if (!chatId || !senderId) {
return null;
}
const rootId = event.message.root_id?.trim();
const threadKey = rootId ? `thread:${rootId}` : "chat";
return `feishu:${accountId}:${chatId}:${threadKey}:${senderId}`;
},
shouldDebounce: (event) => {
if (event.message.message_type !== "text") {
return false;
}
const text = resolveDebounceText(event);
return Boolean(text) && !core.channel.text.hasControlCommand(text, cfg);
},
onFlush: async (entries) => {
const last = entries.at(-1);
if (!last) {
return;
}
if (entries.length === 1) {
await dispatchFeishuMessage(last);
return;
}
const dedupedEntries = dedupeFeishuDebounceEntriesByMessageId(entries);
const freshEntries: FeishuMessageEvent[] = [];
for (const entry of dedupedEntries) {
if (!(await hasProcessedMessage(entry.message.message_id, accountId, log))) {
freshEntries.push(entry);
}
}
const dispatchEntry = freshEntries.at(-1);
if (!dispatchEntry) {
return;
}
await recordSuppressedMessageIds(dedupedEntries, dispatchEntry.message.message_id);
const combinedText = freshEntries
.map((entry) => resolveDebounceText(entry))
.filter(Boolean)
.join("\n");
const mergedMentions = resolveFeishuDebounceMentions({
entries: freshEntries,
botOpenId: getBotOpenId(accountId),
});
await dispatchFeishuMessage({
...dispatchEntry,
message: {
...dispatchEntry.message,
...(combinedText.trim()
? {
message_type: "text",
content: JSON.stringify({ text: combinedText }),
}
: {}),
mentions: mergedMentions ?? dispatchEntry.message.mentions,
},
});
},
onError: (err, entries) => {
for (const entry of entries) {
releaseFeishuMessageProcessing(entry.message.message_id, accountId);
}
error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`);
},
});
return async (data) => {
const event = parseFeishuMessageEventPayload(data);
if (!event) {
error(`feishu[${accountId}]: ignoring malformed message event payload`);
return;
}
const messageId = event.message?.message_id?.trim();
if (!tryBeginFeishuMessageProcessing(messageId, accountId)) {
log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`);
return;
}
const processMessage = async () => {
await inboundDebouncer.enqueue(event);
};
if (fireAndForget) {
void processMessage().catch((err) => {
releaseFeishuMessageProcessing(messageId, accountId);
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
});
return;
}
try {
await processMessage();
} catch (err) {
releaseFeishuMessageProcessing(messageId, accountId);
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
}
};
}

View File

@@ -648,23 +648,27 @@ describe("Feishu inbound debounce regressions", () => {
it("uses latest fresh message id when debounce batch ends with stale retry", async () => {
vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
const recordSpy = vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
setStaleRetryMocks();
setStaleRetryMocks("om_old_latest_fresh");
const onMessage = await setupDebounceMonitor();
await onMessage(createTextEvent({ messageId: "om_new", text: "fresh" }));
await onMessage(createTextEvent({ messageId: "om_new_latest_fresh", text: "fresh" }));
await Promise.resolve();
await Promise.resolve();
await onMessage(createTextEvent({ messageId: "om_old", text: "stale" }));
await onMessage(createTextEvent({ messageId: "om_old_latest_fresh", text: "stale" }));
await Promise.resolve();
await Promise.resolve();
await vi.advanceTimersByTimeAsync(25);
const dispatched = expectSingleDispatchedEvent();
expect(dispatched.message.message_id).toBe("om_new");
expect(dispatched.message.message_id).toBe("om_new_latest_fresh");
const combined = JSON.parse(dispatched.message.content) as { text?: string };
expect(combined.text).toBe("fresh");
expect(recordSpy).toHaveBeenCalledWith("om_old", "default", expect.any(Function));
expect(recordSpy).not.toHaveBeenCalledWith("om_new", "default", expect.any(Function));
expect(recordSpy).toHaveBeenCalledWith("om_old_latest_fresh", "default", expect.any(Function));
expect(recordSpy).not.toHaveBeenCalledWith(
"om_new_latest_fresh",
"default",
expect.any(Function),
);
});
it("releases early event dedupe when debounced dispatch fails", async () => {

View File

@@ -6,7 +6,6 @@ import {
createFeishuLifecycleConfig,
createFeishuLifecycleReplyDispatcher,
createFeishuTextMessageEvent,
createResolvedFeishuLifecycleAccount,
expectFeishuReplyDispatcherSentFinalReplyOnce,
expectFeishuReplyPipelineDedupedAcrossReplay,
expectFeishuReplyPipelineDedupedAfterPostSendFailure,
@@ -14,22 +13,20 @@ import {
mockFeishuReplyOnceDispatch,
restoreFeishuLifecycleStateDir,
setFeishuLifecycleStateDir,
setupFeishuLifecycleHandler,
setupFeishuMessageReceiveLifecycleHandler,
} from "./test-support/lifecycle-test-support.js";
const {
createEventDispatcherMock,
createFeishuReplyDispatcherMock,
dispatchReplyFromConfigMock,
finalizeInboundContextMock,
resolveAgentRouteMock,
resolveBoundConversationMock,
touchBindingMock,
withReplyDispatcherMock,
} = getFeishuLifecycleTestMocks();
let _handlers: Record<string, (data: unknown) => Promise<void>> = {};
let lastRuntime: ReturnType<typeof createRuntimeEnv> | null = null;
let lifecycleCore: ReturnType<typeof installFeishuLifecycleReplyRuntime>;
const handleMessageMock = vi.fn();
const originalStateDir = process.env.OPENCLAW_STATE_DIR;
const lifecycleConfig = createFeishuLifecycleConfig({
accountId: "acct-lifecycle",
@@ -47,34 +44,18 @@ const lifecycleConfig = createFeishuLifecycleConfig({
},
});
const lifecycleAccount = createResolvedFeishuLifecycleAccount({
accountId: "acct-lifecycle",
appId: "cli_test",
appSecret: "secret_test",
config: {
groupPolicy: "open",
groups: {
oc_group_1: {
requireMention: false,
groupSessionScope: "group_topic_sender",
replyInThread: "enabled",
},
},
},
});
async function setupLifecycleMonitor() {
lastRuntime = createRuntimeEnv();
return setupFeishuLifecycleHandler({
createEventDispatcherMock,
onRegister: (registered) => {
_handlers = registered;
},
return setupFeishuMessageReceiveLifecycleHandler({
runtime: lastRuntime,
core: lifecycleCore,
cfg: lifecycleConfig,
account: lifecycleAccount,
handlerKey: "im.message.receive_v1",
missingHandlerMessage: "missing im.message.receive_v1 handler",
accountId: "acct-lifecycle",
handleMessage: handleMessageMock,
resolveDebounceText: ({ event }) => {
const parsed = JSON.parse(event.message.content) as { text?: string };
return parsed.text ?? "";
},
});
}
@@ -82,17 +63,11 @@ describe("Feishu reply-once lifecycle", () => {
beforeEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
_handlers = {};
lastRuntime = null;
setFeishuLifecycleStateDir("openclaw-feishu-lifecycle");
createFeishuReplyDispatcherMock.mockReturnValue(createFeishuLifecycleReplyDispatcher());
resolveBoundConversationMock.mockReturnValue({
bindingId: "binding-1",
targetSessionKey: "agent:bound-agent:feishu:topic:om_root_topic_1:ou_sender_1",
});
resolveAgentRouteMock.mockReturnValue({
agentId: "main",
channel: "feishu",
@@ -108,8 +83,33 @@ describe("Feishu reply-once lifecycle", () => {
});
withReplyDispatcherMock.mockImplementation(async ({ run }) => await run());
handleMessageMock.mockImplementation(async ({ event }) => {
const reply = createFeishuReplyDispatcherMock({
accountId: "acct-lifecycle",
chatId: event.message.chat_id,
replyToMessageId: event.message.root_id ?? event.message.message_id,
replyInThread: true,
rootId: event.message.root_id,
});
try {
await withReplyDispatcherMock({
dispatcher: reply.dispatcher,
onSettled: () => reply.markDispatchIdle(),
run: () =>
dispatchReplyFromConfigMock({
ctx: {
AccountId: "acct-lifecycle",
MessageSid: event.message.message_id,
},
dispatcher: reply.dispatcher,
}),
});
} catch (err) {
lastRuntime?.error(`feishu[acct-lifecycle]: failed to dispatch message: ${String(err)}`);
}
});
installFeishuLifecycleReplyRuntime({
lifecycleCore = installFeishuLifecycleReplyRuntime({
resolveAgentRouteMock,
finalizeInboundContextMock,
dispatchReplyFromConfigMock,
@@ -141,6 +141,7 @@ describe("Feishu reply-once lifecycle", () => {
});
expect(lastRuntime?.error).not.toHaveBeenCalled();
expect(handleMessageMock).toHaveBeenCalledTimes(1);
expect(dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expect(createFeishuReplyDispatcherMock).toHaveBeenCalledTimes(1);
expect(createFeishuReplyDispatcherMock).toHaveBeenCalledWith(
@@ -152,15 +153,6 @@ describe("Feishu reply-once lifecycle", () => {
rootId: "om_root_topic_1",
}),
);
expect(finalizeInboundContextMock).toHaveBeenCalledWith(
expect.objectContaining({
AccountId: "acct-lifecycle",
SessionKey: "agent:bound-agent:feishu:topic:om_root_topic_1:ou_sender_1",
MessageSid: "om_lifecycle_once",
MessageThreadId: "om_root_topic_1",
}),
);
expect(touchBindingMock).toHaveBeenCalledWith("binding-1");
expectFeishuReplyDispatcherSentFinalReplyOnce({ createFeishuReplyDispatcherMock });
});
@@ -187,6 +179,7 @@ describe("Feishu reply-once lifecycle", () => {
});
expect(lastRuntime?.error).toHaveBeenCalledTimes(1);
expect(handleMessageMock).toHaveBeenCalledTimes(1);
expect(dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expectFeishuReplyDispatcherSentFinalReplyOnce({ createFeishuReplyDispatcherMock });
});

View File

@@ -1,5 +1,5 @@
import * as http from "http";
import * as Lark from "@larksuiteoapi/node-sdk";
import type * as Lark from "@larksuiteoapi/node-sdk";
import {
createFixedWindowRateLimiter,
createWebhookAnomalyTracker,

View File

@@ -0,0 +1,59 @@
const EVENT_DEDUP_TTL_MS = 5 * 60 * 1000;
const EVENT_MEMORY_MAX_SIZE = 2_000;
const processingClaims = new Map<string, number>();
function resolveEventDedupeKey(
namespace: string,
messageId: string | undefined | null,
): string | null {
const trimmed = messageId?.trim();
return trimmed ? `${namespace}:${trimmed}` : null;
}
function pruneProcessingClaims(now: number): void {
const cutoff = now - EVENT_DEDUP_TTL_MS;
for (const [key, seenAt] of processingClaims) {
if (seenAt < cutoff) {
processingClaims.delete(key);
}
}
while (processingClaims.size > EVENT_MEMORY_MAX_SIZE) {
const oldestKey = processingClaims.keys().next().value;
if (!oldestKey) {
return;
}
processingClaims.delete(oldestKey);
}
}
export function tryBeginFeishuMessageProcessing(
messageId: string | undefined | null,
namespace = "global",
): boolean {
const key = resolveEventDedupeKey(namespace, messageId);
if (!key) {
return true;
}
const now = Date.now();
pruneProcessingClaims(now);
if (processingClaims.has(key)) {
processingClaims.delete(key);
processingClaims.set(key, now);
pruneProcessingClaims(now);
return false;
}
processingClaims.set(key, now);
pruneProcessingClaims(now);
return true;
}
export function releaseFeishuMessageProcessing(
messageId: string | undefined | null,
namespace = "global",
): void {
const key = resolveEventDedupeKey(namespace, messageId);
if (key) {
processingClaims.delete(key);
}
}

View File

@@ -2,6 +2,7 @@ import { randomUUID } from "node:crypto";
import { expect, vi, type Mock } from "vitest";
import { createPluginRuntimeMock } from "../../../../test/helpers/plugins/plugin-runtime-mock.js";
import type { ClawdbotConfig, PluginRuntime, RuntimeEnv } from "../../runtime-api.js";
import { createFeishuMessageReceiveHandler } from "../monitor.message-handler.js";
import { setFeishuRuntime } from "../runtime.js";
import type { ResolvedFeishuAccount } from "../types.js";
@@ -105,45 +106,44 @@ export function installFeishuLifecycleRuntime(params: {
upsertPairingRequest?: PluginRuntime["channel"]["pairing"]["upsertPairingRequest"];
buildPairingReply?: PluginRuntime["channel"]["pairing"]["buildPairingReply"];
detectMime?: PluginRuntime["media"]["detectMime"];
}) {
setFeishuRuntime(
createPluginRuntimeMock({
channel: {
debounce: createImmediateInboundDebounce(),
text: {
hasControlCommand: params.hasControlCommand ?? vi.fn(() => false),
},
routing: {
resolveAgentRoute: params.resolveAgentRoute,
},
reply: {
resolveEnvelopeFormatOptions: vi.fn(() => ({})),
formatAgentEnvelope: vi.fn((value: { body: string }) => value.body),
finalizeInboundContext: params.finalizeInboundContext,
dispatchReplyFromConfig: params.dispatchReplyFromConfig,
withReplyDispatcher: params.withReplyDispatcher,
},
commands: {
shouldComputeCommandAuthorized:
params.shouldComputeCommandAuthorized ?? vi.fn(() => false),
resolveCommandAuthorizedFromAuthorizers:
params.resolveCommandAuthorizedFromAuthorizers ?? vi.fn(() => false),
},
session: {
readSessionUpdatedAt: vi.fn(),
resolveStorePath: params.resolveStorePath,
},
pairing: {
readAllowFromStore: params.readAllowFromStore ?? vi.fn().mockResolvedValue([]),
upsertPairingRequest: params.upsertPairingRequest ?? vi.fn(),
buildPairingReply: params.buildPairingReply ?? vi.fn(),
},
}): PluginRuntime {
const runtime = createPluginRuntimeMock({
channel: {
debounce: createImmediateInboundDebounce(),
text: {
hasControlCommand: params.hasControlCommand ?? vi.fn(() => false),
},
media: {
detectMime: params.detectMime ?? vi.fn(async () => "text/plain"),
routing: {
resolveAgentRoute: params.resolveAgentRoute,
},
}) as unknown as PluginRuntime,
);
reply: {
resolveEnvelopeFormatOptions: vi.fn(() => ({})),
formatAgentEnvelope: vi.fn((value: { body: string }) => value.body),
finalizeInboundContext: params.finalizeInboundContext,
dispatchReplyFromConfig: params.dispatchReplyFromConfig,
withReplyDispatcher: params.withReplyDispatcher,
},
commands: {
shouldComputeCommandAuthorized: params.shouldComputeCommandAuthorized ?? vi.fn(() => false),
resolveCommandAuthorizedFromAuthorizers:
params.resolveCommandAuthorizedFromAuthorizers ?? vi.fn(() => false),
},
session: {
readSessionUpdatedAt: vi.fn(),
resolveStorePath: params.resolveStorePath,
},
pairing: {
readAllowFromStore: params.readAllowFromStore ?? vi.fn().mockResolvedValue([]),
upsertPairingRequest: params.upsertPairingRequest ?? vi.fn(),
buildPairingReply: params.buildPairingReply ?? vi.fn(),
},
},
media: {
detectMime: params.detectMime ?? vi.fn(async () => "text/plain"),
},
}) as unknown as PluginRuntime;
setFeishuRuntime(runtime);
return runtime;
}
export function installFeishuLifecycleReplyRuntime(params: {
@@ -152,8 +152,8 @@ export function installFeishuLifecycleReplyRuntime(params: {
dispatchReplyFromConfigMock: unknown;
withReplyDispatcherMock: unknown;
storePath: string;
}) {
installFeishuLifecycleRuntime({
}): PluginRuntime {
return installFeishuLifecycleRuntime({
resolveAgentRoute:
params.resolveAgentRouteMock as PluginRuntime["channel"]["routing"]["resolveAgentRoute"],
finalizeInboundContext:
@@ -303,13 +303,13 @@ export async function replayFeishuLifecycleEvent(params: {
event: unknown;
waitForFirst: () => void | Promise<void>;
waitForSecond?: () => void | Promise<void>;
waitTimeoutMs?: number;
}) {
const waitOptions = { timeout: params.waitTimeoutMs ?? FEISHU_LIFECYCLE_WAIT_TIMEOUT_MS };
await params.handler(params.event);
await vi.waitFor(params.waitForFirst, { timeout: FEISHU_LIFECYCLE_WAIT_TIMEOUT_MS });
await vi.waitFor(params.waitForFirst, waitOptions);
await params.handler(params.event);
await vi.waitFor(params.waitForSecond ?? params.waitForFirst, {
timeout: FEISHU_LIFECYCLE_WAIT_TIMEOUT_MS,
});
await vi.waitFor(params.waitForSecond ?? params.waitForFirst, waitOptions);
}
export async function runFeishuLifecycleSequence(
@@ -351,21 +351,14 @@ export async function expectFeishuReplyPipelineDedupedAcrossReplay(params: {
await replayFeishuLifecycleEvent({
handler: params.handler,
event: params.event,
waitForFirst: () =>
vi.waitFor(
() => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
},
waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs },
),
waitForSecond: () =>
vi.waitFor(
() => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expect(params.createFeishuReplyDispatcherMock).toHaveBeenCalledTimes(1);
},
waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs },
),
waitTimeoutMs,
waitForFirst: () => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
},
waitForSecond: () => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expect(params.createFeishuReplyDispatcherMock).toHaveBeenCalledTimes(1);
},
});
}
@@ -380,22 +373,15 @@ export async function expectFeishuReplyPipelineDedupedAfterPostSendFailure(param
await replayFeishuLifecycleEvent({
handler: params.handler,
event: params.event,
waitForFirst: () =>
vi.waitFor(
() => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1);
},
waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs },
),
waitForSecond: () =>
vi.waitFor(
() => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1);
},
waitTimeoutMs == null ? undefined : { timeout: waitTimeoutMs },
),
waitTimeoutMs,
waitForFirst: () => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1);
},
waitForSecond: () => {
expect(params.dispatchReplyFromConfigMock).toHaveBeenCalledTimes(1);
expect(params.runtimeErrorMock).toHaveBeenCalledTimes(1);
},
});
}
@@ -413,6 +399,31 @@ async function loadMonitorSingleAccount() {
return module.monitorSingleAccount;
}
export async function setupFeishuMessageReceiveLifecycleHandler(params: {
runtime: RuntimeEnv;
core: PluginRuntime;
cfg: ClawdbotConfig;
accountId: string;
fireAndForget?: boolean;
handleMessage: Parameters<typeof createFeishuMessageReceiveHandler>[0]["handleMessage"];
resolveDebounceText: Parameters<
typeof createFeishuMessageReceiveHandler
>[0]["resolveDebounceText"];
}): Promise<(data: unknown) => Promise<void>> {
return createFeishuMessageReceiveHandler({
cfg: params.cfg,
core: params.core,
accountId: params.accountId,
runtime: params.runtime,
chatHistories: new Map(),
fireAndForget: params.fireAndForget,
handleMessage: params.handleMessage,
resolveDebounceText: params.resolveDebounceText,
hasProcessedMessage: vi.fn(async () => false),
recordProcessedMessage: vi.fn(async () => true),
});
}
export async function setupFeishuLifecycleHandler(params: {
createEventDispatcherMock: {
mockReturnValue: (value: unknown) => unknown;