From aa74f9107856e2281ddc2e5bfdffb598c58c9455 Mon Sep 17 00:00:00 2001 From: Ted Li Date: Sun, 26 Apr 2026 11:16:27 -0700 Subject: [PATCH] fix(feishu): keep finals after streaming close errors --- .../feishu/src/reply-dispatcher.test.ts | 80 +++++++++++++++++++ extensions/feishu/src/reply-dispatcher.ts | 13 ++- 2 files changed, 90 insertions(+), 3 deletions(-) diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index ca58a9704da..428123da471 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -958,6 +958,86 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); + it("does not suppress a later final after error closeout", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + sendMediaFeishuMock.mockRejectedValueOnce(new Error("media failed")); + + const { options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await expect( + options.deliver( + { text: "First answer", mediaUrl: "https://example.com/a.png" }, + { kind: "final" }, + ), + ).rejects.toThrow("media failed"); + await Promise.all([ + options.onError?.(new Error("media failed"), { kind: "final" }), + options.onIdle?.(), + ]); + await options.deliver({ text: "Second answer" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(2); + expect(streamingInstances[0].close).toHaveBeenCalledWith("First answer", { + note: "Agent: agent", + }); + expect(streamingInstances[1].close).toHaveBeenCalledWith("Second answer", { + note: "Agent: agent", + }); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + }); + + it("does not suppress a recovery final after late media failure", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.deliver({ text: "First answer" }, { kind: "final" }); + await options.onIdle?.(); + sendMediaFeishuMock.mockRejectedValueOnce(new Error("media failed")); + await expect( + options.deliver( + { text: "Late attachment", mediaUrl: "https://example.com/a.png" }, + { kind: "final" }, + ), + ).rejects.toThrow("media failed"); + await options.onError?.(new Error("media failed"), { kind: "final" }); + await options.deliver({ text: "Recovered answer" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(2); + expect(streamingInstances[0].close).toHaveBeenCalledWith("First answer", { + note: "Agent: agent", + }); + expect(streamingInstances[1].close).toHaveBeenCalledWith("Recovered answer", { + note: "Agent: agent", + }); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + }); + it("cleans streaming state even when close throws", async () => { const origPush = streamingInstances.push.bind(streamingInstances); streamingInstances.push = (...args: StreamingSessionStub[]) => { diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index e8659574b63..efaae0bd465 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -229,6 +229,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP let partialUpdateQueue: Promise = Promise.resolve(); let streamingStartPromise: Promise | null = null; let streamingClosedForReply = false; + let streamingCloseErroredForReply = false; type StreamTextUpdateMode = "snapshot" | "delta"; const formatReasoningPrefix = (thinking: string): string => { @@ -360,7 +361,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP })(); }; - const closeStreaming = async () => { + const closeStreaming = async (options?: { markClosedForReply?: boolean }) => { try { if (streamingStartPromise) { await streamingStartPromise; @@ -379,7 +380,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP // the streaming card. if (streamText) { deliveredFinalTexts.add(streamText); - streamingClosedForReply = true; + if (options?.markClosedForReply !== false && !streamingCloseErroredForReply) { + streamingClosedForReply = true; + } } } } finally { @@ -454,6 +457,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP onReplyStart: async () => { deliveredFinalTexts.clear(); streamingClosedForReply = false; + streamingCloseErroredForReply = false; if (streamingEnabled && renderMode === "card") { startStreaming(); } @@ -472,6 +476,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP info?.kind === "final" && hasText && streamingClosedForReply && + !streamingCloseErroredForReply && streamingEnabled && useCard; const shouldDeliverText = @@ -566,10 +571,12 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } }, onError: async (error, info) => { + streamingCloseErroredForReply = true; + streamingClosedForReply = false; params.runtime.error?.( `feishu[${account.accountId}] ${info.kind} reply failed: ${String(error)}`, ); - await closeStreaming(); + await closeStreaming({ markClosedForReply: false }); typingCallbacks?.onIdle?.(); }, onIdle: async () => {