mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:00:43 +00:00
fix(feishu): repair media-aware message dedupe (#76408)
Summary: - The PR adds a Feishu-local media-aware dedupe-key helper, wires it through inbound receive/debounce/persistent/broadcast dedupe paths, adds Feishu regression coverage, and adds an Unreleased changelog entry. - Reproducibility: yes. Source inspection shows a high-confidence path: two Feishu audio events for the same a ... _key` values collide in current-main receive and persistent dedupe before media parsing distinguishes them. Automerge notes: - No ClawSweeper repair was needed after automerge opt-in. Validation: - ClawSweeper review passed for headc0229f8a48. - Required merge gates passed before the squash merge. Prepared head SHA:c0229f8a48Review: https://github.com/openclaw/openclaw/pull/76408#issuecomment-4365292618 Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: SymbolStar <24540119+SymbolStar@users.noreply.github.com>
This commit is contained in:
@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/responses: emit every client tool call from `/v1/responses` JSON and SSE responses when the agent invokes multiple client tools in a single turn, so multi-tool plans, graph orchestration calls, and similar batched flows no longer drop every call but the last. Fixes #52288. Thanks @CharZhou and @bonelli.
|
||||
- Gateway/agent: enforce `session.sendPolicy=deny` on gateway agent requests only when `deliver: true`, so non-delivery smoke checks and internal agent runs are no longer rejected with `send blocked by session policy` while outbound delivery remains gated. Fixes #73381. Thanks @wenxu007.
|
||||
- Slack/reactions: treat missing no_reaction remove responses as idempotent success and route own-reaction cleanup through the remove helper, so concurrent cleanup no longer surfaces Slack race errors. Fixes #50733. (#76304) Thanks @martingarramon and @Hollychou924.
|
||||
- Feishu: include media `file_key` and `image_key` values in inbound dedupe so reused message IDs still process distinct media attachments while true retries stay suppressed. Fixes #75057. Thanks @SymbolStar.
|
||||
- Control UI/Gateway: avoid full session-list reloads for locally applied message-phase session updates, carry known session keys through transcript-file update events, and defer media provider listing when explicit generation model config is present. Refs #76236, #76203, #76188, #76107, and #76166. Thanks @BunsDev.
|
||||
- Install/update: prune the obsolete `plugin-runtime-deps` state directory during packaged postinstall so upgrades from pre-2026.5.2 releases reclaim old bundled-plugin dependency caches without touching external plugin installs.
|
||||
- Auto-reply/queue: treat reset-triggered `/new` and `/reset` turns as interrupt runs across active-run queue handling, so steer/followup modes cannot delay a fresh session behind existing work. Fixes #74093. (#74144) Thanks @ruji9527 and @yelog.
|
||||
|
||||
@@ -5,6 +5,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { ClawdbotConfig, PluginRuntime } from "../runtime-api.js";
|
||||
import type { FeishuMessageEvent } from "./bot.js";
|
||||
import { handleFeishuMessage } from "./bot.js";
|
||||
import { createFeishuMessageReceiveHandler } from "./monitor.message-handler.js";
|
||||
import { setFeishuRuntime } from "./runtime.js";
|
||||
|
||||
type ConfiguredBindingRoute = ReturnType<typeof ConversationRuntime.resolveConfiguredBindingRoute>;
|
||||
@@ -3000,6 +3001,58 @@ describe("handleFeishuMessage command authorization", () => {
|
||||
expect(mockDispatchReplyFromConfig).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("dedupes Feishu media by message_id plus file_key", async () => {
|
||||
mockShouldComputeCommandAuthorized.mockReturnValue(false);
|
||||
|
||||
const cfg: ClawdbotConfig = {
|
||||
channels: {
|
||||
feishu: {
|
||||
dmPolicy: "open",
|
||||
},
|
||||
},
|
||||
} as ClawdbotConfig;
|
||||
const createAudioEvent = (fileKey: string): FeishuMessageEvent => ({
|
||||
sender: {
|
||||
sender_id: {
|
||||
open_id: "ou-audio-dedup",
|
||||
},
|
||||
},
|
||||
message: {
|
||||
message_id: "msg-audio-reused-id",
|
||||
chat_id: "oc-dm",
|
||||
chat_type: "p2p",
|
||||
message_type: "audio",
|
||||
content: JSON.stringify({
|
||||
file_key: fileKey,
|
||||
duration: 1200,
|
||||
}),
|
||||
},
|
||||
});
|
||||
|
||||
await dispatchMessage({ cfg, event: createAudioEvent("file_audio_first") });
|
||||
await dispatchMessage({ cfg, event: createAudioEvent("file_audio_second") });
|
||||
await dispatchMessage({ cfg, event: createAudioEvent("file_audio_first") });
|
||||
|
||||
expect(mockDispatchReplyFromConfig).toHaveBeenCalledTimes(2);
|
||||
expect(mockDownloadMessageResourceFeishu).toHaveBeenCalledTimes(2);
|
||||
expect(mockDownloadMessageResourceFeishu).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
messageId: "msg-audio-reused-id",
|
||||
fileKey: "file_audio_first",
|
||||
type: "file",
|
||||
}),
|
||||
);
|
||||
expect(mockDownloadMessageResourceFeishu).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect.objectContaining({
|
||||
messageId: "msg-audio-reused-id",
|
||||
fileKey: "file_audio_second",
|
||||
type: "file",
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it("skips empty-text messages with no media to prevent blank user turns in session (#74634)", async () => {
|
||||
// Feishu can deliver { "text": "" } events (empty-text or media-stripped
|
||||
// messages). Writing blank user content to the session causes downstream
|
||||
@@ -3038,3 +3091,73 @@ describe("handleFeishuMessage command authorization", () => {
|
||||
expect(mockDispatchReplyFromConfig).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("createFeishuMessageReceiveHandler media dedupe", () => {
|
||||
it("keeps same-id media variants distinct at receive time", async () => {
|
||||
const handleMessage = vi.fn(async () => undefined);
|
||||
const core = {
|
||||
channel: {
|
||||
debounce: {
|
||||
resolveInboundDebounceMs: vi.fn(() => 0),
|
||||
createInboundDebouncer: vi.fn(
|
||||
(options: { onFlush: (entries: FeishuMessageEvent[]) => Promise<void> | void }) => ({
|
||||
enqueue: async (event: FeishuMessageEvent) => {
|
||||
await options.onFlush([event]);
|
||||
},
|
||||
}),
|
||||
),
|
||||
},
|
||||
text: {
|
||||
hasControlCommand: vi.fn(() => false),
|
||||
},
|
||||
},
|
||||
} as unknown as PluginRuntime;
|
||||
const createAudioEvent = (fileKey: string): FeishuMessageEvent => ({
|
||||
sender: {
|
||||
sender_id: {
|
||||
open_id: "ou-audio-receive-dedup",
|
||||
},
|
||||
},
|
||||
message: {
|
||||
message_id: "msg-audio-receive-reused-id",
|
||||
chat_id: "oc-dm",
|
||||
chat_type: "p2p",
|
||||
message_type: "audio",
|
||||
content: JSON.stringify({
|
||||
file_key: fileKey,
|
||||
duration: 1200,
|
||||
}),
|
||||
},
|
||||
});
|
||||
const handler = createFeishuMessageReceiveHandler({
|
||||
cfg: { channels: { feishu: { dmPolicy: "open" } } } as ClawdbotConfig,
|
||||
core,
|
||||
accountId: "receive-media-dedupe",
|
||||
chatHistories: new Map(),
|
||||
handleMessage,
|
||||
resolveDebounceText: () => "",
|
||||
hasProcessedMessage: vi.fn(async () => false),
|
||||
recordProcessedMessage: vi.fn(async () => true),
|
||||
});
|
||||
|
||||
await handler(createAudioEvent("file_audio_receive_first"));
|
||||
await handler(createAudioEvent("file_audio_receive_second"));
|
||||
await handler(createAudioEvent("file_audio_receive_first"));
|
||||
|
||||
expect(handleMessage).toHaveBeenCalledTimes(2);
|
||||
expect(handleMessage).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
expect.objectContaining({
|
||||
event: createAudioEvent("file_audio_receive_first"),
|
||||
processingClaimHeld: true,
|
||||
}),
|
||||
);
|
||||
expect(handleMessage).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
expect.objectContaining({
|
||||
event: createAudioEvent("file_audio_receive_second"),
|
||||
processingClaimHeld: true,
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -42,6 +42,7 @@ import { type FeishuPermissionError, resolveFeishuSenderName } from "./bot-sende
|
||||
import { getChatInfo } from "./chat.js";
|
||||
import { createFeishuClient } from "./client.js";
|
||||
import { finalizeFeishuMessageProcessing, tryRecordMessagePersistent } from "./dedup.js";
|
||||
import { resolveFeishuMessageDedupeKey } from "./dedupe-key.js";
|
||||
import { maybeCreateDynamicAgent } from "./dynamic-agent.js";
|
||||
import { extractMentionTargets, isMentionForwardRequest } from "./mention.js";
|
||||
import {
|
||||
@@ -409,9 +410,10 @@ export async function handleFeishuMessage(params: {
|
||||
const error = runtime?.error ?? console.error;
|
||||
|
||||
const messageId = event.message.message_id;
|
||||
const messageDedupeKey = resolveFeishuMessageDedupeKey(event);
|
||||
if (
|
||||
!(await finalizeFeishuMessageProcessing({
|
||||
messageId,
|
||||
messageId: messageDedupeKey,
|
||||
namespace: account.accountId,
|
||||
log,
|
||||
claimHeld: processingClaimHeld,
|
||||
@@ -1251,7 +1253,9 @@ export async function handleFeishuMessage(params: {
|
||||
// broadcast dispatch to avoid duplicate agent sessions and race conditions.
|
||||
// Uses a shared "broadcast" namespace (not per-account) so the first handler
|
||||
// to reach this point claims the message; subsequent accounts skip.
|
||||
if (!(await tryRecordMessagePersistent(ctx.messageId, "broadcast", log))) {
|
||||
if (
|
||||
!(await tryRecordMessagePersistent(messageDedupeKey ?? ctx.messageId, "broadcast", log))
|
||||
) {
|
||||
log(
|
||||
`feishu[${account.accountId}]: broadcast already claimed by another account for message ${ctx.messageId}; skipping`,
|
||||
);
|
||||
|
||||
72
extensions/feishu/src/dedupe-key.ts
Normal file
72
extensions/feishu/src/dedupe-key.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import type { FeishuMessageEvent } from "./event-types.js";
|
||||
import { normalizeFeishuExternalKey } from "./external-keys.js";
|
||||
import { parsePostContent } from "./post.js";
|
||||
|
||||
type FeishuMessageDedupeInput = Pick<FeishuMessageEvent, "message">;
|
||||
|
||||
function readRecord(value: unknown): Record<string, unknown> | null {
|
||||
return typeof value === "object" && value !== null && !Array.isArray(value)
|
||||
? (value as Record<string, unknown>)
|
||||
: null;
|
||||
}
|
||||
|
||||
function readExternalKey(value: unknown): string | undefined {
|
||||
return normalizeFeishuExternalKey(typeof value === "string" ? value : "");
|
||||
}
|
||||
|
||||
function parseContentRecord(content: string): Record<string, unknown> | null {
|
||||
try {
|
||||
return readRecord(JSON.parse(content));
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function buildMediaDedupeKey(messageId: string, mediaParts: string[]): string {
|
||||
return JSON.stringify([messageId, ...mediaParts]);
|
||||
}
|
||||
|
||||
function resolvePostMediaParts(content: string): string[] {
|
||||
const parsed = parsePostContent(content);
|
||||
return [
|
||||
...parsed.imageKeys.map((imageKey) => `image_key:${imageKey}`),
|
||||
...parsed.mediaKeys.map((media) => `file_key:${media.fileKey}`),
|
||||
];
|
||||
}
|
||||
|
||||
function resolveMessageMediaParts(messageType: string, content: string): string[] {
|
||||
if (messageType === "post") {
|
||||
return resolvePostMediaParts(content);
|
||||
}
|
||||
|
||||
const parsed = parseContentRecord(content);
|
||||
if (!parsed) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const imageKey = readExternalKey(parsed.image_key);
|
||||
const fileKey = readExternalKey(parsed.file_key);
|
||||
switch (messageType) {
|
||||
case "image":
|
||||
return imageKey ? [`image_key:${imageKey}`] : [];
|
||||
case "file":
|
||||
case "audio":
|
||||
case "sticker":
|
||||
return fileKey ? [`file_key:${fileKey}`] : [];
|
||||
case "video":
|
||||
case "media":
|
||||
return fileKey ? [`file_key:${fileKey}`] : imageKey ? [`image_key:${imageKey}`] : [];
|
||||
default:
|
||||
return fileKey ? [`file_key:${fileKey}`] : imageKey ? [`image_key:${imageKey}`] : [];
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveFeishuMessageDedupeKey(event: FeishuMessageDedupeInput): string | undefined {
|
||||
const messageId = event.message.message_id?.trim();
|
||||
if (!messageId) {
|
||||
return undefined;
|
||||
}
|
||||
const messageType = event.message.message_type.trim();
|
||||
const mediaParts = resolveMessageMediaParts(messageType, event.message.content);
|
||||
return mediaParts.length > 0 ? buildMediaDedupeKey(messageId, mediaParts) : messageId;
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { ClawdbotConfig, HistoryEntry, PluginRuntime, RuntimeEnv } from "../runtime-api.js";
|
||||
import { resolveFeishuMessageDedupeKey } from "./dedupe-key.js";
|
||||
import type { FeishuMessageEvent } from "./event-types.js";
|
||||
import { isMentionForwardRequest } from "./mention.js";
|
||||
import {
|
||||
@@ -110,21 +111,21 @@ function mergeFeishuDebounceMentions(
|
||||
return merged.size > 0 ? Array.from(merged.values()) : undefined;
|
||||
}
|
||||
|
||||
function dedupeFeishuDebounceEntriesByMessageId(
|
||||
function dedupeFeishuDebounceEntriesByDedupeKey(
|
||||
entries: FeishuMessageEvent[],
|
||||
): FeishuMessageEvent[] {
|
||||
const seen = new Set<string>();
|
||||
const deduped: FeishuMessageEvent[] = [];
|
||||
for (const entry of entries) {
|
||||
const messageId = entry.message.message_id?.trim();
|
||||
if (!messageId) {
|
||||
const dedupeKey = resolveFeishuMessageDedupeKey(entry);
|
||||
if (!dedupeKey) {
|
||||
deduped.push(entry);
|
||||
continue;
|
||||
}
|
||||
if (seen.has(messageId)) {
|
||||
if (seen.has(dedupeKey)) {
|
||||
continue;
|
||||
}
|
||||
seen.add(messageId);
|
||||
seen.add(dedupeKey);
|
||||
deduped.push(entry);
|
||||
}
|
||||
return deduped;
|
||||
@@ -219,13 +220,13 @@ export function createFeishuMessageReceiveHandler({
|
||||
|
||||
const recordSuppressedMessageIds = async (
|
||||
entries: FeishuMessageEvent[],
|
||||
dispatchMessageId?: string,
|
||||
dispatchDedupeKey?: string,
|
||||
) => {
|
||||
const keepMessageId = dispatchMessageId?.trim();
|
||||
const keepDedupeKey = dispatchDedupeKey?.trim();
|
||||
const suppressedIds = new Set(
|
||||
entries
|
||||
.map((entry) => entry.message.message_id?.trim())
|
||||
.filter((id): id is string => Boolean(id) && (!keepMessageId || id !== keepMessageId)),
|
||||
.map((entry) => resolveFeishuMessageDedupeKey(entry))
|
||||
.filter((id): id is string => Boolean(id) && (!keepDedupeKey || id !== keepDedupeKey)),
|
||||
);
|
||||
for (const messageId of suppressedIds) {
|
||||
try {
|
||||
@@ -266,10 +267,10 @@ export function createFeishuMessageReceiveHandler({
|
||||
await dispatchFeishuMessage(last);
|
||||
return;
|
||||
}
|
||||
const dedupedEntries = dedupeFeishuDebounceEntriesByMessageId(entries);
|
||||
const dedupedEntries = dedupeFeishuDebounceEntriesByDedupeKey(entries);
|
||||
const freshEntries: FeishuMessageEvent[] = [];
|
||||
for (const entry of dedupedEntries) {
|
||||
if (!(await hasProcessedMessage(entry.message.message_id, accountId, log))) {
|
||||
if (!(await hasProcessedMessage(resolveFeishuMessageDedupeKey(entry), accountId, log))) {
|
||||
freshEntries.push(entry);
|
||||
}
|
||||
}
|
||||
@@ -277,7 +278,10 @@ export function createFeishuMessageReceiveHandler({
|
||||
if (!dispatchEntry) {
|
||||
return;
|
||||
}
|
||||
await recordSuppressedMessageIds(dedupedEntries, dispatchEntry.message.message_id);
|
||||
await recordSuppressedMessageIds(
|
||||
dedupedEntries,
|
||||
resolveFeishuMessageDedupeKey(dispatchEntry),
|
||||
);
|
||||
const combinedText = freshEntries
|
||||
.map((entry) => resolveDebounceText(entry))
|
||||
.filter(Boolean)
|
||||
@@ -302,7 +306,7 @@ export function createFeishuMessageReceiveHandler({
|
||||
},
|
||||
onError: (err, entries) => {
|
||||
for (const entry of entries) {
|
||||
releaseFeishuMessageProcessing(entry.message.message_id, accountId);
|
||||
releaseFeishuMessageProcessing(resolveFeishuMessageDedupeKey(entry), accountId);
|
||||
}
|
||||
error(`feishu[${accountId}]: inbound debounce flush failed: ${String(err)}`);
|
||||
},
|
||||
@@ -315,7 +319,8 @@ export function createFeishuMessageReceiveHandler({
|
||||
return;
|
||||
}
|
||||
const messageId = event.message?.message_id?.trim();
|
||||
if (!tryBeginFeishuMessageProcessing(messageId, accountId)) {
|
||||
const messageDedupeKey = resolveFeishuMessageDedupeKey(event);
|
||||
if (!tryBeginFeishuMessageProcessing(messageDedupeKey, accountId)) {
|
||||
log(`feishu[${accountId}]: dropping duplicate event for message ${messageId}`);
|
||||
return;
|
||||
}
|
||||
@@ -324,7 +329,7 @@ export function createFeishuMessageReceiveHandler({
|
||||
};
|
||||
if (fireAndForget) {
|
||||
void processMessage().catch((err) => {
|
||||
releaseFeishuMessageProcessing(messageId, accountId);
|
||||
releaseFeishuMessageProcessing(messageDedupeKey, accountId);
|
||||
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
|
||||
});
|
||||
return;
|
||||
@@ -332,7 +337,7 @@ export function createFeishuMessageReceiveHandler({
|
||||
try {
|
||||
await processMessage();
|
||||
} catch (err) {
|
||||
releaseFeishuMessageProcessing(messageId, accountId);
|
||||
releaseFeishuMessageProcessing(messageDedupeKey, accountId);
|
||||
error(`feishu[${accountId}]: error handling message: ${String(err)}`);
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user