From 3bf6ed181e03ed150f762cf7628075e452038a83 Mon Sep 17 00:00:00 2001 From: rexl2018 <38375107+rexl2018@users.noreply.github.com> Date: Thu, 5 Mar 2026 11:32:35 +0800 Subject: [PATCH] Feishu: harden streaming merge semantics and final reply dedupe (#33245) * Feishu: close duplicate final gap and cover routing precedence * Feishu: resolve reviewer duplicate-final and routing feedback * Feishu: tighten streaming send-mode option typing * Feishu: fix reverse-overlap streaming merge ordering * Feishu: align streaming final dedupe test expectation * Feishu: allow distinct streaming finals while deduping repeats --------- Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> --- CHANGELOG.md | 1 + .../feishu/src/reply-dispatcher.test.ts | 35 +++++++- extensions/feishu/src/reply-dispatcher.ts | 14 ++- extensions/feishu/src/streaming-card.test.ts | 86 ++++++------------- extensions/feishu/src/streaming-card.ts | 49 +++++++---- 5 files changed, 99 insertions(+), 86 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b29cdaebcf2..4736be5d8ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Feishu/streaming card delivery synthesis: unify snapshot and delta streaming merge semantics, apply overlap-aware final merge, suppress duplicate final text delivery (including text+media final packets), prefer topic-thread `message.reply` routing when a reply target exists, and tune card print cadence to avoid duplicate incremental rendering. (from #33245, #32896, #33840) Thanks @rexl2018, @kcinzgg, and @aerelune. - Security/dependency audit: patch transitive Hono vulnerabilities by pinning `hono` to `4.12.5` and `@hono/node-server` to `1.19.10` in production resolution paths. Thanks @shakkernerd. - Security/dependency audit: bump `tar` to `7.5.10` (from `7.5.9`) to address the high-severity hardlink path traversal advisory (`GHSA-qffp-2rhf-9h96`). Thanks @shakkernerd. - Cron/announce delivery robustness: bypass pending-descendant announce guards for cron completion sends, ensure named-agent announce routes have outbound session entries, and fall back to direct delivery only when an announce send was actually attempted and failed. (from #35185, #32443, #34987) Thanks @Sid-Qin, @scoootscooob, and @bmendonca3. diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index 7f25db5e417..3f464a88318 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -300,7 +300,6 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { expect(sendMessageFeishuMock).not.toHaveBeenCalled(); expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); }); - it("suppresses duplicate final text while still sending media", async () => { resolveFeishuAccountMock.mockReturnValue({ accountId: "main", @@ -341,6 +340,40 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { ); }); + it("keeps distinct non-streaming final payloads", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "auto", + streaming: false, + }, + }); + + createFeishuReplyDispatcher({ + cfg: {} as never, + agentId: "agent", + runtime: { log: vi.fn(), error: vi.fn() } as never, + chatId: "oc_chat", + }); + + const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0]; + await options.deliver({ text: "notice header" }, { kind: "final" }); + await options.deliver({ text: "actual answer body" }, { kind: "final" }); + + expect(sendMessageFeishuMock).toHaveBeenCalledTimes(2); + expect(sendMessageFeishuMock).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ text: "notice header" }), + ); + expect(sendMessageFeishuMock).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ text: "actual answer body" }), + ); + }); + it("treats block updates as delta chunks", async () => { resolveFeishuAccountMock.mockReturnValue({ accountId: "main", diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 58ca55eef28..c754bce5c16 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -143,7 +143,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP let streaming: FeishuStreamingSession | null = null; let streamText = ""; let lastPartial = ""; - let lastFinalText: string | null = null; + const deliveredFinalTexts = new Set(); let partialUpdateQueue: Promise = Promise.resolve(); let streamingStartPromise: Promise | null = null; type StreamTextUpdateMode = "snapshot" | "delta"; @@ -230,7 +230,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP responsePrefixContextProvider: prefixContext.responsePrefixContextProvider, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId), onReplyStart: () => { - lastFinalText = null; + deliveredFinalTexts.clear(); if (streamingEnabled && renderMode === "card") { startStreaming(); } @@ -246,10 +246,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP : []; const hasText = Boolean(text.trim()); const hasMedia = mediaList.length > 0; - // Suppress only exact duplicate final text payloads to avoid - // dropping legitimate multi-part final replies. const skipTextForDuplicateFinal = - info?.kind === "final" && hasText && lastFinalText === text; + info?.kind === "final" && hasText && deliveredFinalTexts.has(text); const shouldDeliverText = hasText && !skipTextForDuplicateFinal; if (!shouldDeliverText && !hasMedia) { @@ -287,7 +285,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (info?.kind === "final") { streamText = mergeStreamingText(streamText, text); await closeStreaming(); - lastFinalText = text; + deliveredFinalTexts.add(text); } // Send media even when streaming handled the text if (hasMedia) { @@ -324,7 +322,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP first = false; } if (info?.kind === "final") { - lastFinalText = text; + deliveredFinalTexts.add(text); } } else { const converted = core.channel.text.convertMarkdownTables(text, tableMode); @@ -345,7 +343,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP first = false; } if (info?.kind === "final") { - lastFinalText = text; + deliveredFinalTexts.add(text); } } } diff --git a/extensions/feishu/src/streaming-card.test.ts b/extensions/feishu/src/streaming-card.test.ts index f0276c0a91f..bb12feab613 100644 --- a/extensions/feishu/src/streaming-card.test.ts +++ b/extensions/feishu/src/streaming-card.test.ts @@ -1,12 +1,5 @@ -import { beforeEach, describe, expect, it, vi } from "vitest"; - -const fetchWithSsrFGuardMock = vi.hoisted(() => vi.fn()); - -vi.mock("openclaw/plugin-sdk/feishu", () => ({ - fetchWithSsrFGuard: fetchWithSsrFGuardMock, -})); - -import { FeishuStreamingSession, mergeStreamingText } from "./streaming-card.js"; +import { describe, expect, it } from "vitest"; +import { mergeStreamingText, resolveStreamingCardSendMode } from "./streaming-card.js"; describe("mergeStreamingText", () => { it("prefers the latest full text when it already includes prior text", () => { @@ -28,59 +21,34 @@ describe("mergeStreamingText", () => { expect(mergeStreamingText("revision_id: 552", "2,一点变化都没有")).toBe( "revision_id: 552,一点变化都没有", ); + expect(mergeStreamingText("abc", "cabc")).toBe("cabc"); }); }); -describe("FeishuStreamingSession routing", () => { - beforeEach(() => { - vi.clearAllMocks(); - fetchWithSsrFGuardMock.mockReset(); - }); - - it("prefers message.reply when reply target and root id both exist", async () => { - fetchWithSsrFGuardMock - .mockResolvedValueOnce({ - response: { json: async () => ({ code: 0, msg: "ok", tenant_access_token: "token" }) }, - release: async () => {}, - }) - .mockResolvedValueOnce({ - response: { json: async () => ({ code: 0, msg: "ok", data: { card_id: "card_1" } }) }, - release: async () => {}, - }); - - const replyMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_reply" } })); - const createMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_create" } })); - - const session = new FeishuStreamingSession( - { - im: { - message: { - reply: replyMock, - create: createMock, - }, - }, - } as never, - { - appId: "app", - appSecret: "secret", - domain: "feishu", - }, - ); - - await session.start("oc_chat", "chat_id", { - replyToMessageId: "om_parent", - replyInThread: true, - rootId: "om_topic_root", - }); - - expect(replyMock).toHaveBeenCalledTimes(1); - expect(replyMock).toHaveBeenCalledWith({ - path: { message_id: "om_parent" }, - data: expect.objectContaining({ - msg_type: "interactive", - reply_in_thread: true, +describe("resolveStreamingCardSendMode", () => { + it("prefers message.reply when reply target and root id both exist", () => { + expect( + resolveStreamingCardSendMode({ + replyToMessageId: "om_parent", + rootId: "om_topic_root", }), - }); - expect(createMock).not.toHaveBeenCalled(); + ).toBe("reply"); + }); + + it("falls back to root create when reply target is absent", () => { + expect( + resolveStreamingCardSendMode({ + rootId: "om_topic_root", + }), + ).toBe("root_create"); + }); + + it("uses create mode when no reply routing fields are provided", () => { + expect(resolveStreamingCardSendMode()).toBe("create"); + expect( + resolveStreamingCardSendMode({ + replyInThread: true, + }), + ).toBe("create"); }); }); diff --git a/extensions/feishu/src/streaming-card.ts b/extensions/feishu/src/streaming-card.ts index a254182614f..45db480d360 100644 --- a/extensions/feishu/src/streaming-card.ts +++ b/extensions/feishu/src/streaming-card.ts @@ -16,6 +16,13 @@ export type StreamingCardHeader = { template?: string; }; +type StreamingStartOptions = { + replyToMessageId?: string; + replyInThread?: boolean; + rootId?: string; + header?: StreamingCardHeader; +}; + // Token cache (keyed by domain + appId) const tokenCache = new Map(); @@ -103,6 +110,12 @@ export function mergeStreamingText( if (previous.startsWith(next)) { return previous; } + if (next.includes(previous)) { + return next; + } + if (previous.includes(next)) { + return previous; + } // Merge partial overlaps, e.g. "这" + "这是" => "这是". const maxOverlap = Math.min(previous.length, next.length); @@ -111,17 +124,20 @@ export function mergeStreamingText( return `${previous}${next.slice(overlap)}`; } } - - if (next.includes(previous)) { - return next; - } - if (previous.includes(next)) { - return previous; - } // Fallback for fragmented partial chunks: append as-is to avoid losing tokens. return `${previous}${next}`; } +export function resolveStreamingCardSendMode(options?: StreamingStartOptions) { + if (options?.replyToMessageId) { + return "reply"; + } + if (options?.rootId) { + return "root_create"; + } + return "create"; +} + /** Streaming card session manager */ export class FeishuStreamingSession { private client: Client; @@ -143,12 +159,7 @@ export class FeishuStreamingSession { async start( receiveId: string, receiveIdType: "open_id" | "user_id" | "union_id" | "email" | "chat_id" = "chat_id", - options?: { - replyToMessageId?: string; - replyInThread?: boolean; - rootId?: string; - header?: StreamingCardHeader; - }, + options?: StreamingStartOptions, ): Promise { if (this.state) { return; @@ -204,22 +215,24 @@ export class FeishuStreamingSession { // message.create with root_id may silently ignore root_id for card // references (card_id format). let sendRes; - if (options?.replyToMessageId) { + const sendOptions = options ?? {}; + const sendMode = resolveStreamingCardSendMode(sendOptions); + if (sendMode === "reply") { sendRes = await this.client.im.message.reply({ - path: { message_id: options.replyToMessageId }, + path: { message_id: sendOptions.replyToMessageId! }, data: { msg_type: "interactive", content: cardContent, - ...(options.replyInThread ? { reply_in_thread: true } : {}), + ...(sendOptions.replyInThread ? { reply_in_thread: true } : {}), }, }); - } else if (options?.rootId) { + } else if (sendMode === "root_create") { // root_id is undeclared in the SDK types but accepted at runtime sendRes = await this.client.im.message.create({ params: { receive_id_type: receiveIdType }, data: Object.assign( { receive_id: receiveId, msg_type: "interactive", content: cardContent }, - { root_id: options.rootId }, + { root_id: sendOptions.rootId }, ), }); } else {