diff --git a/CHANGELOG.md b/CHANGELOG.md index e0ae84b59d4..a3049314900 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - Gateway/responses: emit every client tool call from `/v1/responses` JSON and SSE responses when the agent invokes multiple client tools in a single turn, so multi-tool plans, graph orchestration calls, and similar batched flows no longer drop every call but the last. Fixes #52288. Thanks @CharZhou and @bonelli. - Gateway/agent: enforce `session.sendPolicy=deny` on gateway agent requests only when `deliver: true`, so non-delivery smoke checks and internal agent runs are no longer rejected with `send blocked by session policy` while outbound delivery remains gated. Fixes #73381. Thanks @wenxu007. - Slack/reactions: treat missing no_reaction remove responses as idempotent success and route own-reaction cleanup through the remove helper, so concurrent cleanup no longer surfaces Slack race errors. Fixes #50733. (#76304) Thanks @martingarramon and @Hollychou924. +- Feishu: include media `file_key` and `image_key` values in inbound dedupe so reused message IDs still process distinct media attachments while true retries stay suppressed. Fixes #75057. Thanks @SymbolStar. - Control UI/Gateway: avoid full session-list reloads for locally applied message-phase session updates, carry known session keys through transcript-file update events, and defer media provider listing when explicit generation model config is present. Refs #76236, #76203, #76188, #76107, and #76166. Thanks @BunsDev. - Install/update: prune the obsolete `plugin-runtime-deps` state directory during packaged postinstall so upgrades from pre-2026.5.2 releases reclaim old bundled-plugin dependency caches without touching external plugin installs. - Auto-reply/queue: treat reset-triggered `/new` and `/reset` turns as interrupt runs across active-run queue handling, so steer/followup modes cannot delay a fresh session behind existing work. Fixes #74093. (#74144) Thanks @ruji9527 and @yelog. diff --git a/extensions/feishu/src/bot.test.ts b/extensions/feishu/src/bot.test.ts index c60377942d1..e195bf0a51e 100644 --- a/extensions/feishu/src/bot.test.ts +++ b/extensions/feishu/src/bot.test.ts @@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import type { ClawdbotConfig, PluginRuntime } from "../runtime-api.js"; import type { FeishuMessageEvent } from "./bot.js"; import { handleFeishuMessage } from "./bot.js"; +import { createFeishuMessageReceiveHandler } from "./monitor.message-handler.js"; import { setFeishuRuntime } from "./runtime.js"; type ConfiguredBindingRoute = ReturnType; @@ -3000,6 +3001,58 @@ describe("handleFeishuMessage command authorization", () => { expect(mockDispatchReplyFromConfig).toHaveBeenCalledTimes(1); }); + it("dedupes Feishu media by message_id plus file_key", async () => { + mockShouldComputeCommandAuthorized.mockReturnValue(false); + + const cfg: ClawdbotConfig = { + channels: { + feishu: { + dmPolicy: "open", + }, + }, + } as ClawdbotConfig; + const createAudioEvent = (fileKey: string): FeishuMessageEvent => ({ + sender: { + sender_id: { + open_id: "ou-audio-dedup", + }, + }, + message: { + message_id: "msg-audio-reused-id", + chat_id: "oc-dm", + chat_type: "p2p", + message_type: "audio", + content: JSON.stringify({ + file_key: fileKey, + duration: 1200, + }), + }, + }); + + await dispatchMessage({ cfg, event: createAudioEvent("file_audio_first") }); + await dispatchMessage({ cfg, event: createAudioEvent("file_audio_second") }); + await dispatchMessage({ cfg, event: createAudioEvent("file_audio_first") }); + + expect(mockDispatchReplyFromConfig).toHaveBeenCalledTimes(2); + expect(mockDownloadMessageResourceFeishu).toHaveBeenCalledTimes(2); + expect(mockDownloadMessageResourceFeishu).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + messageId: "msg-audio-reused-id", + fileKey: "file_audio_first", + type: "file", + }), + ); + expect(mockDownloadMessageResourceFeishu).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + messageId: "msg-audio-reused-id", + fileKey: "file_audio_second", + type: "file", + }), + ); + }); + it("skips empty-text messages with no media to prevent blank user turns in session (#74634)", async () => { // Feishu can deliver { "text": "" } events (empty-text or media-stripped // messages). Writing blank user content to the session causes downstream @@ -3038,3 +3091,73 @@ describe("handleFeishuMessage command authorization", () => { expect(mockDispatchReplyFromConfig).not.toHaveBeenCalled(); }); }); + +describe("createFeishuMessageReceiveHandler media dedupe", () => { + it("keeps same-id media variants distinct at receive time", async () => { + const handleMessage = vi.fn(async () => undefined); + const core = { + channel: { + debounce: { + resolveInboundDebounceMs: vi.fn(() => 0), + createInboundDebouncer: vi.fn( + (options: { onFlush: (entries: FeishuMessageEvent[]) => Promise | void }) => ({ + enqueue: async (event: FeishuMessageEvent) => { + await options.onFlush([event]); + }, + }), + ), + }, + text: { + hasControlCommand: vi.fn(() => false), + }, + }, + } as unknown as PluginRuntime; + const createAudioEvent = (fileKey: string): FeishuMessageEvent => ({ + sender: { + sender_id: { + open_id: "ou-audio-receive-dedup", + }, + }, + message: { + message_id: "msg-audio-receive-reused-id", + chat_id: "oc-dm", + chat_type: "p2p", + message_type: "audio", + content: JSON.stringify({ + file_key: fileKey, + duration: 1200, + }), + }, + }); + const handler = createFeishuMessageReceiveHandler({ + cfg: { channels: { feishu: { dmPolicy: "open" } } } as ClawdbotConfig, + core, + accountId: "receive-media-dedupe", + chatHistories: new Map(), + handleMessage, + resolveDebounceText: () => "", + hasProcessedMessage: vi.fn(async () => false), + recordProcessedMessage: vi.fn(async () => true), + }); + + await handler(createAudioEvent("file_audio_receive_first")); + await handler(createAudioEvent("file_audio_receive_second")); + await handler(createAudioEvent("file_audio_receive_first")); + + expect(handleMessage).toHaveBeenCalledTimes(2); + expect(handleMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + event: createAudioEvent("file_audio_receive_first"), + processingClaimHeld: true, + }), + ); + expect(handleMessage).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + event: createAudioEvent("file_audio_receive_second"), + processingClaimHeld: true, + }), + ); + }); +}); diff --git a/extensions/feishu/src/bot.ts b/extensions/feishu/src/bot.ts index 4f89b128837..06fe777ba57 100644 --- a/extensions/feishu/src/bot.ts +++ b/extensions/feishu/src/bot.ts @@ -42,6 +42,7 @@ import { type FeishuPermissionError, resolveFeishuSenderName } from "./bot-sende import { getChatInfo } from "./chat.js"; import { createFeishuClient } from "./client.js"; import { finalizeFeishuMessageProcessing, tryRecordMessagePersistent } from "./dedup.js"; +import { resolveFeishuMessageDedupeKey } from "./dedupe-key.js"; import { maybeCreateDynamicAgent } from "./dynamic-agent.js"; import { extractMentionTargets, isMentionForwardRequest } from "./mention.js"; import { @@ -409,9 +410,10 @@ export async function handleFeishuMessage(params: { const error = runtime?.error ?? console.error; const messageId = event.message.message_id; + const messageDedupeKey = resolveFeishuMessageDedupeKey(event); if ( !(await finalizeFeishuMessageProcessing({ - messageId, + messageId: messageDedupeKey, namespace: account.accountId, log, claimHeld: processingClaimHeld, @@ -1251,7 +1253,9 @@ export async function handleFeishuMessage(params: { // broadcast dispatch to avoid duplicate agent sessions and race conditions. // Uses a shared "broadcast" namespace (not per-account) so the first handler // to reach this point claims the message; subsequent accounts skip. - if (!(await tryRecordMessagePersistent(ctx.messageId, "broadcast", log))) { + if ( + !(await tryRecordMessagePersistent(messageDedupeKey ?? ctx.messageId, "broadcast", log)) + ) { log( `feishu[${account.accountId}]: broadcast already claimed by another account for message ${ctx.messageId}; skipping`, ); diff --git a/extensions/feishu/src/dedupe-key.ts b/extensions/feishu/src/dedupe-key.ts new file mode 100644 index 00000000000..96f3e86d4bd --- /dev/null +++ b/extensions/feishu/src/dedupe-key.ts @@ -0,0 +1,72 @@ +import type { FeishuMessageEvent } from "./event-types.js"; +import { normalizeFeishuExternalKey } from "./external-keys.js"; +import { parsePostContent } from "./post.js"; + +type FeishuMessageDedupeInput = Pick; + +function readRecord(value: unknown): Record | null { + return typeof value === "object" && value !== null && !Array.isArray(value) + ? (value as Record) + : null; +} + +function readExternalKey(value: unknown): string | undefined { + return normalizeFeishuExternalKey(typeof value === "string" ? value : ""); +} + +function parseContentRecord(content: string): Record | null { + try { + return readRecord(JSON.parse(content)); + } catch { + return null; + } +} + +function buildMediaDedupeKey(messageId: string, mediaParts: string[]): string { + return JSON.stringify([messageId, ...mediaParts]); +} + +function resolvePostMediaParts(content: string): string[] { + const parsed = parsePostContent(content); + return [ + ...parsed.imageKeys.map((imageKey) => `image_key:${imageKey}`), + ...parsed.mediaKeys.map((media) => `file_key:${media.fileKey}`), + ]; +} + +function resolveMessageMediaParts(messageType: string, content: string): string[] { + if (messageType === "post") { + return resolvePostMediaParts(content); + } + + const parsed = parseContentRecord(content); + if (!parsed) { + return []; + } + + const imageKey = readExternalKey(parsed.image_key); + const fileKey = readExternalKey(parsed.file_key); + switch (messageType) { + case "image": + return imageKey ? [`image_key:${imageKey}`] : []; + case "file": + case "audio": + case "sticker": + return fileKey ? [`file_key:${fileKey}`] : []; + case "video": + case "media": + return fileKey ? [`file_key:${fileKey}`] : imageKey ? [`image_key:${imageKey}`] : []; + default: + return fileKey ? [`file_key:${fileKey}`] : imageKey ? [`image_key:${imageKey}`] : []; + } +} + +export function resolveFeishuMessageDedupeKey(event: FeishuMessageDedupeInput): string | undefined { + const messageId = event.message.message_id?.trim(); + if (!messageId) { + return undefined; + } + const messageType = event.message.message_type.trim(); + const mediaParts = resolveMessageMediaParts(messageType, event.message.content); + return mediaParts.length > 0 ? buildMediaDedupeKey(messageId, mediaParts) : messageId; +} diff --git a/extensions/feishu/src/monitor.message-handler.ts b/extensions/feishu/src/monitor.message-handler.ts index 363c42eea18..616ac596efa 100644 --- a/extensions/feishu/src/monitor.message-handler.ts +++ b/extensions/feishu/src/monitor.message-handler.ts @@ -1,4 +1,5 @@ import type { ClawdbotConfig, HistoryEntry, PluginRuntime, RuntimeEnv } from "../runtime-api.js"; +import { resolveFeishuMessageDedupeKey } from "./dedupe-key.js"; import type { FeishuMessageEvent } from "./event-types.js"; import { isMentionForwardRequest } from "./mention.js"; import { @@ -110,21 +111,21 @@ function mergeFeishuDebounceMentions( return merged.size > 0 ? Array.from(merged.values()) : undefined; } -function dedupeFeishuDebounceEntriesByMessageId( +function dedupeFeishuDebounceEntriesByDedupeKey( entries: FeishuMessageEvent[], ): FeishuMessageEvent[] { const seen = new Set(); const deduped: FeishuMessageEvent[] = []; for (const entry of entries) { - const messageId = entry.message.message_id?.trim(); - if (!messageId) { + const dedupeKey = resolveFeishuMessageDedupeKey(entry); + if (!dedupeKey) { deduped.push(entry); continue; } - if (seen.has(messageId)) { + if (seen.has(dedupeKey)) { continue; } - seen.add(messageId); + seen.add(dedupeKey); deduped.push(entry); } return deduped; @@ -219,13 +220,13 @@ export function createFeishuMessageReceiveHandler({ const recordSuppressedMessageIds = async ( entries: FeishuMessageEvent[], - dispatchMessageId?: string, + dispatchDedupeKey?: string, ) => { - const keepMessageId = dispatchMessageId?.trim(); + const keepDedupeKey = dispatchDedupeKey?.trim(); const suppressedIds = new Set( entries - .map((entry) => entry.message.message_id?.trim()) - .filter((id): id is string => Boolean(id) && (!keepMessageId || id !== keepMessageId)), + .map((entry) => resolveFeishuMessageDedupeKey(entry)) + .filter((id): id is string => Boolean(id) && (!keepDedupeKey || id !== keepDedupeKey)), ); for (const messageId of suppressedIds) { try { @@ -266,10 +267,10 @@ export function createFeishuMessageReceiveHandler({ await dispatchFeishuMessage(last); return; } - const dedupedEntries = dedupeFeishuDebounceEntriesByMessageId(entries); + const dedupedEntries = dedupeFeishuDebounceEntriesByDedupeKey(entries); const freshEntries: FeishuMessageEvent[] = []; for (const entry of dedupedEntries) { - if (!(await hasProcessedMessage(entry.message.message_id, accountId, log))) { + if (!(await hasProcessedMessage(resolveFeishuMessageDedupeKey(entry), accountId, log))) { freshEntries.push(entry); } } @@ -277,7 +278,10 @@ export function createFeishuMessageReceiveHandler({ if (!dispatchEntry) { return; } - await recordSuppressedMessageIds(dedupedEntries, dispatchEntry.message.message_id); + await recordSuppressedMessageIds( + dedupedEntries, + resolveFeishuMessageDedupeKey(dispatchEntry), + ); const combinedText = freshEntries .map((entry) => resolveDebounceText(entry)) .filter(Boolean) @@ -302,7 +306,7 @@ export function createFeishuMessageReceiveHandler({ }, onError: (err, entries) => { for (const entry of entries) { - releaseFeishuMessageProcessing(entry.message.message_id, accountId); + releaseFeishuMessageProcessing(resolveFeishuMessageDedupeKey(entry), accountId); } error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`); }, @@ -315,7 +319,8 @@ export function createFeishuMessageReceiveHandler({ return; } const messageId = event.message?.message_id?.trim(); - if (!tryBeginFeishuMessageProcessing(messageId, accountId)) { + const messageDedupeKey = resolveFeishuMessageDedupeKey(event); + if (!tryBeginFeishuMessageProcessing(messageDedupeKey, accountId)) { log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`); return; } @@ -324,7 +329,7 @@ export function createFeishuMessageReceiveHandler({ }; if (fireAndForget) { void processMessage().catch((err) => { - releaseFeishuMessageProcessing(messageId, accountId); + releaseFeishuMessageProcessing(messageDedupeKey, accountId); error(`feishu[${accountId}]: error handling message: ${String(err)}`); }); return; @@ -332,7 +337,7 @@ export function createFeishuMessageReceiveHandler({ try { await processMessage(); } catch (err) { - releaseFeishuMessageProcessing(messageId, accountId); + releaseFeishuMessageProcessing(messageDedupeKey, accountId); error(`feishu[${accountId}]: error handling message: ${String(err)}`); } };