From 8c8f3969856f219318ca30b7a6cecd3e2af7af25 Mon Sep 17 00:00:00 2001 From: Ted Li Date: Wed, 29 Apr 2026 01:47:20 -0700 Subject: [PATCH] fix(feishu): suppress late streaming card finals (#72294) Merged via squash. Prepared head SHA: d18a9ff4c30f29cd00af394662656d22dca3ba1e Co-authored-by: MonkeyLeeT <6754057+MonkeyLeeT@users.noreply.github.com> Co-authored-by: hxy91819 <8814856+hxy91819@users.noreply.github.com> Reviewed-by: @hxy91819 --- CHANGELOG.md | 1 + .../feishu/src/reply-dispatcher.test.ts | 120 ++++++++++++++++++ extensions/feishu/src/reply-dispatcher.ts | 27 +++- 3 files changed, 143 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d5ea92bcc5c..6b368421b22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ Docs: https://docs.openclaw.ai - Plugins/runtime-deps: cache bundled runtime-deps JSON/package files and root chunk import scans by file signature, reducing repeated staged-runtime scanning during bundled channel startup. Refs #73647 and #73705. Thanks @mattmcintyre and @bmilne1981. - CLI/TUI: keep `chat.history` off model-catalog discovery so initial Gateway-backed TUI history loads cannot block behind slow provider/plugin model scans on low-core hosts. Refs #73524. Thanks @harshcatsystems-collab. - Channels/WhatsApp: flag recently reconnected linked accounts in channel status even when the socket is currently healthy, so flapping WhatsApp Web sessions no longer look clean after a brief reconnect. Refs #73602. Thanks @Vksh07. +- Feishu: suppress distinct late `final` text deliveries after a streaming card has already closed, while keeping media attachments deliverable, so late-finals no longer reopen duplicate Feishu cards. Fixes #71977. (#72294) Thanks @MonkeyLeeT. - Gateway: expose `gateway.handshakeTimeoutMs` in config, schema, and docs while preserving `OPENCLAW_HANDSHAKE_TIMEOUT_MS` precedence, so loaded or low-powered hosts can tune local WebSocket pre-auth handshakes without patching dist files. Supersedes #51282; refs #73592 and #73652. Thanks @henry-the-frog. - Gateway/TUI/status: align configured and env-based WebSocket handshake budgets across local clients, probes, and fallback RPCs while preserving explicit status timeouts and paired-device auth fallback, so slow local gateways are not marked unreachable by a shorter client watchdog. Refs #73524, #73535, #73592, and #73602. Thanks @harshcatsystems-collab, @DJBlackhawk, and @Vksh07. - Agents/auth: scope external CLI credential discovery to configured providers during model auth status and startup prewarm, so opencode-only and other single-provider gateways do not block on unrelated Claude CLI Keychain probes. Fixes #73908. Thanks @Ailuras. diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index fb0b7342b4e..428123da471 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -399,6 +399,46 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); }); + it("skips distinct late final text after streaming card close", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.deliver({ text: "First complete answer" }, { kind: "final" }); + await options.onIdle?.(); + await options.deliver( + { text: "Late tool-result final", mediaUrl: "https://example.com/a.png" }, + { kind: "final" }, + ); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + expect(streamingInstances[0].close).toHaveBeenCalledWith("First complete answer", { + note: "Agent: agent", + }); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1); + expect(sendMediaFeishuMock).toHaveBeenCalledWith( + expect.objectContaining({ + mediaUrl: "https://example.com/a.png", + }), + ); + }); + it("suppresses duplicate final text while still sending media", async () => { const options = setupNonStreamingAutoDispatcher(); await options.deliver({ text: "plain final" }, { kind: "final" }); @@ -918,6 +958,86 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); + it("does not suppress a later final after error closeout", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + sendMediaFeishuMock.mockRejectedValueOnce(new Error("media failed")); + + const { options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await expect( + options.deliver( + { text: "First answer", mediaUrl: "https://example.com/a.png" }, + { kind: "final" }, + ), + ).rejects.toThrow("media failed"); + await Promise.all([ + options.onError?.(new Error("media failed"), { kind: "final" }), + options.onIdle?.(), + ]); + await options.deliver({ text: "Second answer" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(2); + expect(streamingInstances[0].close).toHaveBeenCalledWith("First answer", { + note: "Agent: agent", + }); + expect(streamingInstances[1].close).toHaveBeenCalledWith("Second answer", { + note: "Agent: agent", + }); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + }); + + it("does not suppress a recovery final after late media failure", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.deliver({ text: "First answer" }, { kind: "final" }); + await options.onIdle?.(); + sendMediaFeishuMock.mockRejectedValueOnce(new Error("media failed")); + await expect( + options.deliver( + { text: "Late attachment", mediaUrl: "https://example.com/a.png" }, + { kind: "final" }, + ), + ).rejects.toThrow("media failed"); + await options.onError?.(new Error("media failed"), { kind: "final" }); + await options.deliver({ text: "Recovered answer" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(2); + expect(streamingInstances[0].close).toHaveBeenCalledWith("First answer", { + note: "Agent: agent", + }); + expect(streamingInstances[1].close).toHaveBeenCalledWith("Recovered answer", { + note: "Agent: agent", + }); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + }); + it("cleans streaming state even when close throws", async () => { const origPush = streamingInstances.push.bind(streamingInstances); streamingInstances.push = (...args: StreamingSessionStub[]) => { diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 8fa6e09df67..efaae0bd465 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -228,6 +228,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP const deliveredFinalTexts = new Set(); let partialUpdateQueue: Promise = Promise.resolve(); let streamingStartPromise: Promise | null = null; + let streamingClosedForReply = false; + let streamingCloseErroredForReply = false; type StreamTextUpdateMode = "snapshot" | "delta"; const formatReasoningPrefix = (thinking: string): string => { @@ -359,7 +361,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP })(); }; - const closeStreaming = async () => { + const closeStreaming = async (options?: { markClosedForReply?: boolean }) => { try { if (streamingStartPromise) { await streamingStartPromise; @@ -378,6 +380,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP // the streaming card. if (streamText) { deliveredFinalTexts.add(streamText); + if (options?.markClosedForReply !== false && !streamingCloseErroredForReply) { + streamingClosedForReply = true; + } } } } finally { @@ -451,6 +456,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId), onReplyStart: async () => { deliveredFinalTexts.clear(); + streamingClosedForReply = false; + streamingCloseErroredForReply = false; if (streamingEnabled && renderMode === "card") { startStreaming(); } @@ -461,17 +468,25 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP const text = reply.text; const hasText = reply.hasText; const hasMedia = reply.hasMedia; + const useCard = + hasText && (renderMode === "card" || (renderMode === "auto" && shouldUseCard(text))); const skipTextForDuplicateFinal = info?.kind === "final" && hasText && deliveredFinalTexts.has(text); - const shouldDeliverText = hasText && !skipTextForDuplicateFinal; + const skipTextForClosedStreamingFinal = + info?.kind === "final" && + hasText && + streamingClosedForReply && + !streamingCloseErroredForReply && + streamingEnabled && + useCard; + const shouldDeliverText = + hasText && !skipTextForDuplicateFinal && !skipTextForClosedStreamingFinal; if (!shouldDeliverText && !hasMedia) { return; } if (shouldDeliverText) { - const useCard = renderMode === "card" || (renderMode === "auto" && shouldUseCard(text)); - if (info?.kind === "block") { // Drop internal block chunks unless we can safely consume them as // streaming-card fallback content. @@ -556,10 +571,12 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } }, onError: async (error, info) => { + streamingCloseErroredForReply = true; + streamingClosedForReply = false; params.runtime.error?.( `feishu[${account.accountId}] ${info.kind} reply failed: ${String(error)}`, ); - await closeStreaming(); + await closeStreaming({ markClosedForReply: false }); typingCallbacks?.onIdle?.(); }, onIdle: async () => {