diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index cad463827ed..c8894d688fa 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -5,6 +5,7 @@ type StreamingSessionStub = { start: ReturnType; update: ReturnType; close: ReturnType; + discard: ReturnType; isActive: ReturnType; }; @@ -84,6 +85,9 @@ vi.mock("./streaming-card.js", () => { close = vi.fn(async () => { this.active = false; }); + discard = vi.fn(async () => { + this.active = false; + }); isActive = vi.fn(() => this.active); constructor() { @@ -163,7 +167,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); - function setupNonStreamingAutoDispatcher() { + function useNonStreamingAutoAccount() { resolveFeishuAccountMock.mockReturnValue({ accountId: "main", appId: "app_id", @@ -174,6 +178,10 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { streaming: false, }, }); + } + + function setupNonStreamingAutoDispatcher() { + useNonStreamingAutoAccount(); createFeishuReplyDispatcher({ cfg: {} as never, @@ -378,16 +386,66 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); - it("keeps auto mode plain text on non-streaming send path", async () => { + it("streams auto mode plain final text when streaming is enabled", async () => { const { options } = createDispatcherHarness(); await options.deliver({ text: "plain text" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledWith("plain text", { + note: "Agent: agent", + }); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); + }); + + it("keeps auto mode plain tool text on the message path when streaming is enabled", async () => { + const { options } = createDispatcherHarness(); + await options.deliver({ text: "tool summary" }, { kind: "tool" }); + + expect(streamingInstances).toHaveLength(0); + expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1); + expectMockArgFields(sendMessageFeishuMock, "message send params", { + text: "tool summary", + }); + expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); + }); + + it("keeps active auto mode streaming sessions from swallowing tool text", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onAssistantMessageStart?.(); + await options.deliver({ text: "tool summary" }, { kind: "tool" }); + await options.deliver({ text: "plain final answer" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].start).toHaveBeenCalledTimes(1); + expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1); + expectMockArgFields(sendMessageFeishuMock, "message send params", { + text: "tool summary", + }); + expect(streamingInstances[0].close).toHaveBeenCalledWith("plain final answer", { + note: "Agent: agent", + }); + expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); + }); + + it("keeps auto mode plain text on the message path when streaming is disabled", async () => { + const options = setupNonStreamingAutoDispatcher(); + await options.deliver({ text: "plain text" }, { kind: "final" }); expect(streamingInstances).toHaveLength(0); expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1); expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); }); - it("does not attach automatic mentions to plain text replies", async () => { + it("does not attach automatic mentions to non-streaming plain text replies", async () => { + useNonStreamingAutoAccount(); + const { options } = createDispatcherHarness({ replyToMessageId: "om_msg", }); @@ -611,6 +669,55 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); }); + it("waits for deliverable text before starting a card after assistant message start", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onAssistantMessageStart?.(); + await options.deliver({ text: "plain final answer" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].start).toHaveBeenCalledTimes(1); + expect(streamingInstances[0].close).toHaveBeenCalledWith("plain final answer", { + note: "Agent: agent", + }); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + }); + + it("does not create an empty card when assistant message start has no deliverable final", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onAssistantMessageStart?.(); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(0); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled(); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + }); + + it("starts a streaming card from partial snapshots in auto mode", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + result.replyOptions.onPartialReply?.({ text: "plain" }); + result.replyOptions.onPartialReply?.({ text: "plain streamed answer" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledWith("plain streamed answer", { + note: "Agent: agent", + }); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + }); + it("skips distinct late final text after streaming card close", async () => { resolveFeishuAccountMock.mockReturnValue({ accountId: "main", @@ -844,6 +951,61 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); + it("discards partial streaming text when final replies send voice media", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + result.replyOptions.onPartialReply?.({ text: "spoken reply" }); + await options.deliver( + { + text: "spoken reply", + mediaUrl: "https://example.com/reply.mp3", + audioAsVoice: true, + }, + { kind: "final" }, + ); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].discard).toHaveBeenCalledTimes(1); + expect(streamingInstances[0].close).not.toHaveBeenCalled(); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1); + expectMockArgFields(sendMediaFeishuMock, "media send params", { + mediaUrl: "https://example.com/reply.mp3", + audioAsVoice: true, + }); + }); + + it("keeps partial streaming text when final replies send regular media only", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + result.replyOptions.onPartialReply?.({ text: "caption from stream" }); + await options.deliver( + { + mediaUrl: "https://example.com/image.png", + }, + { kind: "final" }, + ); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].discard).not.toHaveBeenCalled(); + expect(streamingInstances[0].close).toHaveBeenCalledWith("caption from stream", { + note: "Agent: agent", + }); + expect(sendMessageFeishuMock).not.toHaveBeenCalled(); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1); + expectMockArgFields(sendMediaFeishuMock, "media send params", { + mediaUrl: "https://example.com/image.png", + }); + }); + it("sends skipped voice text when final voice media degrades to a file attachment", async () => { sendMediaFeishuMock.mockResolvedValueOnce({ messageId: "file_msg", @@ -889,6 +1051,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); it("preserves captions for regular audio attachments", async () => { + useNonStreamingAutoAccount(); const { options } = createDispatcherHarness(); await options.deliver( { @@ -928,6 +1091,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); it("falls back to legacy mediaUrl when mediaUrls is an empty array", async () => { + useNonStreamingAutoAccount(); const { options } = createDispatcherHarness(); await options.deliver( { text: "caption", mediaUrl: "https://example.com/a.png", mediaUrls: [] }, @@ -961,6 +1125,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); it("passes replyInThread to sendMessageFeishu for plain text", async () => { + useNonStreamingAutoAccount(); const { options } = createDispatcherHarness({ replyToMessageId: "om_msg", replyInThread: true, @@ -974,6 +1139,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); it("allows top-level fallback for normal group quoted replies", async () => { + useNonStreamingAutoAccount(); const { options } = createDispatcherHarness({ replyToMessageId: "om_quote_reply", replyInThread: true, @@ -990,6 +1156,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); it("keeps native topic replies opted out of top-level fallback", async () => { + useNonStreamingAutoAccount(); const { options } = createDispatcherHarness({ replyToMessageId: "om_topic_root", replyInThread: true, diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 519cc7c71ff..6b2fd4f0e70 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -375,6 +375,18 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP })(); }; + const resetStreamingState = () => { + streaming = null; + streamingStartPromise = null; + partialUpdateQueue = Promise.resolve(); + streamText = ""; + lastPartial = ""; + reasoningText = ""; + statusLine = ""; + snapshotBaseText = ""; + lastSnapshotTextLength = 0; + }; + const closeStreaming = async (options?: { markClosedForReply?: boolean }) => { try { if (streamingStartPromise) { @@ -397,21 +409,31 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } } } finally { - streaming = null; - streamingStartPromise = null; - partialUpdateQueue = Promise.resolve(); - streamText = ""; - lastPartial = ""; - reasoningText = ""; - statusLine = ""; - snapshotBaseText = ""; - lastSnapshotTextLength = 0; + resetStreamingState(); } }; - const updateStreamingStatusLine = (nextStatusLine: string) => { + const discardStreamingPreview = async () => { + try { + if (streamingStartPromise) { + await streamingStartPromise; + } + await partialUpdateQueue; + if (streaming?.isActive()) { + await streaming.discard(); + } + } finally { + resetStreamingState(); + } + }; + + const updateStreamingStatusLine = ( + nextStatusLine: string, + options?: { startIfNeeded?: boolean }, + ) => { statusLine = nextStatusLine; - if (!streaming?.isActive() && !streamingStartPromise && renderMode !== "card") { + const hasStreamingSession = Boolean(streaming?.isActive() || streamingStartPromise); + if (!hasStreamingSession && (options?.startIfNeeded === false || renderMode !== "card")) { return; } startStreaming(); @@ -536,9 +558,11 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP ...(payload.audioAsVoice === true ? { audioAsVoice: true } : {}), }), ); + const streamingCardEnabledForReplyKind = streamingEnabled && info?.kind === "final"; const useCard = hasText && - (renderMode === "card" || + (streamingCardEnabledForReplyKind || + renderMode === "card" || (info?.kind === "block" && coreBlockStreamingEnabled && renderMode !== "raw") || (renderMode === "auto" && shouldUseCard(text))); const skipTextForDuplicateFinal = @@ -555,11 +579,19 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP !hasVoiceMedia && !skipTextForDuplicateFinal && !skipTextForClosedStreamingFinal; + const shouldDiscardStreamingPreview = + info?.kind === "final" && + hasMedia && + ((hasVoiceMedia && !shouldDeliverText) || skipTextForDuplicateFinal); if (!shouldDeliverText && !hasMedia) { return; } + if (shouldDiscardStreamingPreview) { + await discardStreamingPreview(); + } + if (shouldDeliverText) { if (info?.kind === "block") { // Drop internal block chunks unless we can safely consume them as @@ -580,7 +612,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } } - if (streaming?.isActive()) { + const shouldStreamText = info?.kind === "block" || info?.kind === "final"; + if (streaming?.isActive() && shouldStreamText) { if (info?.kind === "block") { // Some runtimes emit block payloads without onPartial/final callbacks. // Mirror block text into streamText so onIdle close still sends content. @@ -684,6 +717,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (!cleaned) { return; } + startStreaming(); queueStreamingUpdate(cleaned, { dedupeWithLastPartial: true, mode: "snapshot", @@ -729,7 +763,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP : undefined, onAssistantMessageStart: streamingEnabled ? () => { - updateStreamingStatusLine(""); + updateStreamingStatusLine("", { startIfNeeded: false }); } : undefined, onCompactionStart: streamingEnabled diff --git a/extensions/feishu/src/streaming-card.ts b/extensions/feishu/src/streaming-card.ts index 3dff22f3e7f..6d139fddd30 100644 --- a/extensions/feishu/src/streaming-card.ts +++ b/extensions/feishu/src/streaming-card.ts @@ -535,8 +535,9 @@ export class FeishuStreamingSession { const text = finalText ?? pendingMerged; const apiBase = resolveApiBase(this.creds.domain); - // Only send final update if content differs from what's already displayed - if (text && text !== this.state.sentText) { + // Only send final update if content differs from what's already displayed. + // An explicit empty final text clears a transient preview before closeout. + if ((text || finalText !== undefined) && text !== this.state.sentText) { const sent = text.startsWith(this.state.sentText) ? await this.updateCardContent( resolveStreamingCardAppendContent(this.state.sentText, text), @@ -589,6 +590,32 @@ export class FeishuStreamingSession { this.log?.(`Closed streaming: cardId=${finalState.cardId}`); } + async discard(): Promise { + if (!this.state || this.closed) { + return; + } + this.closed = true; + this.clearFlushTimer(); + await this.queue; + + const currentState = this.state; + try { + const response = await this.client.im.message.delete({ + path: { message_id: currentState.messageId }, + }); + if (response.code !== undefined && response.code !== 0) { + throw new Error(`Delete streaming card message failed: ${response.msg ?? response.code}`); + } + this.state = null; + this.pendingText = null; + this.log?.(`Discarded streaming card: cardId=${currentState.cardId}`); + } catch (error) { + this.log?.(`Discard failed: ${String(error)}`); + this.closed = false; + await this.close(""); + } + } + isActive(): boolean { return this.state !== null && !this.closed; }