fix(feishu): stream plain replies as cards

Feishu `channels.feishu.streaming=true` now streams ordinary assistant replies through CardKit in auto mode, while keeping tool-summary delivery on the existing message path.

Also discards stale partial previews when final delivery intentionally suppresses text for voice media or duplicate final text, and preserves streamed partial text for regular media-only finals.

Verification:
- `node scripts/run-vitest.mjs run extensions/feishu/src/reply-dispatcher.test.ts`
- `pnpm tsgo:extensions`
- `pnpm test:extensions:package-boundary:compile`
- `pnpm exec oxfmt --check extensions/feishu/src/reply-dispatcher.ts extensions/feishu/src/reply-dispatcher.test.ts extensions/feishu/src/streaming-card.ts`
- `git diff --check`
- `.agents/skills/autoreview/scripts/autoreview --mode branch --base origin/main`
- GitHub PR checks on run 26689677607 passed except repeated unrelated broad Vitest no-output timeouts in `checks-node-agentic-commands-doctor` and `checks-node-core-runtime-infra-state`.

Co-authored-by: 传妈 <chuanmother@chuanMac-Mini.local>
This commit is contained in:
chuanchuan
2026-05-31 01:47:03 +08:00
committed by GitHub
parent ca4a12381a
commit 3b8ab4e112
3 changed files with 247 additions and 19 deletions

View File

@@ -5,6 +5,7 @@ type StreamingSessionStub = {
start: ReturnType<typeof vi.fn>;
update: ReturnType<typeof vi.fn>;
close: ReturnType<typeof vi.fn>;
discard: ReturnType<typeof vi.fn>;
isActive: ReturnType<typeof vi.fn>;
};
@@ -84,6 +85,9 @@ vi.mock("./streaming-card.js", () => {
close = vi.fn(async () => {
this.active = false;
});
discard = vi.fn(async () => {
this.active = false;
});
isActive = vi.fn(() => this.active);
constructor() {
@@ -163,7 +167,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
});
function setupNonStreamingAutoDispatcher() {
function useNonStreamingAutoAccount() {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
@@ -174,6 +178,10 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
streaming: false,
},
});
}
function setupNonStreamingAutoDispatcher() {
useNonStreamingAutoAccount();
createFeishuReplyDispatcher({
cfg: {} as never,
@@ -378,16 +386,66 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
});
it("keeps auto mode plain text on non-streaming send path", async () => {
it("streams auto mode plain final text when streaming is enabled", async () => {
const { options } = createDispatcherHarness();
await options.deliver({ text: "plain text" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("plain text", {
note: "Agent: agent",
});
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
it("keeps auto mode plain tool text on the message path when streaming is enabled", async () => {
const { options } = createDispatcherHarness();
await options.deliver({ text: "tool summary" }, { kind: "tool" });
expect(streamingInstances).toHaveLength(0);
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
expectMockArgFields(sendMessageFeishuMock, "message send params", {
text: "tool summary",
});
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
it("keeps active auto mode streaming sessions from swallowing tool text", async () => {
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.onReplyStart?.();
result.replyOptions.onAssistantMessageStart?.();
await options.deliver({ text: "tool summary" }, { kind: "tool" });
await options.deliver({ text: "plain final answer" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].start).toHaveBeenCalledTimes(1);
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
expectMockArgFields(sendMessageFeishuMock, "message send params", {
text: "tool summary",
});
expect(streamingInstances[0].close).toHaveBeenCalledWith("plain final answer", {
note: "Agent: agent",
});
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
it("keeps auto mode plain text on the message path when streaming is disabled", async () => {
const options = setupNonStreamingAutoDispatcher();
await options.deliver({ text: "plain text" }, { kind: "final" });
expect(streamingInstances).toHaveLength(0);
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(1);
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
it("does not attach automatic mentions to plain text replies", async () => {
it("does not attach automatic mentions to non-streaming plain text replies", async () => {
useNonStreamingAutoAccount();
const { options } = createDispatcherHarness({
replyToMessageId: "om_msg",
});
@@ -611,6 +669,55 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
});
it("waits for deliverable text before starting a card after assistant message start", async () => {
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.onReplyStart?.();
result.replyOptions.onAssistantMessageStart?.();
await options.deliver({ text: "plain final answer" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].start).toHaveBeenCalledTimes(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("plain final answer", {
note: "Agent: agent",
});
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
});
it("does not create an empty card when assistant message start has no deliverable final", async () => {
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.onReplyStart?.();
result.replyOptions.onAssistantMessageStart?.();
await options.onIdle?.();
expect(streamingInstances).toHaveLength(0);
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
});
it("starts a streaming card from partial snapshots in auto mode", async () => {
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
result.replyOptions.onPartialReply?.({ text: "plain" });
result.replyOptions.onPartialReply?.({ text: "plain streamed answer" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("plain streamed answer", {
note: "Agent: agent",
});
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
});
it("skips distinct late final text after streaming card close", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
@@ -844,6 +951,61 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
});
it("discards partial streaming text when final replies send voice media", async () => {
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
result.replyOptions.onPartialReply?.({ text: "spoken reply" });
await options.deliver(
{
text: "spoken reply",
mediaUrl: "https://example.com/reply.mp3",
audioAsVoice: true,
},
{ kind: "final" },
);
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].discard).toHaveBeenCalledTimes(1);
expect(streamingInstances[0].close).not.toHaveBeenCalled();
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
expectMockArgFields(sendMediaFeishuMock, "media send params", {
mediaUrl: "https://example.com/reply.mp3",
audioAsVoice: true,
});
});
it("keeps partial streaming text when final replies send regular media only", async () => {
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
result.replyOptions.onPartialReply?.({ text: "caption from stream" });
await options.deliver(
{
mediaUrl: "https://example.com/image.png",
},
{ kind: "final" },
);
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].discard).not.toHaveBeenCalled();
expect(streamingInstances[0].close).toHaveBeenCalledWith("caption from stream", {
note: "Agent: agent",
});
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
expect(sendMediaFeishuMock).toHaveBeenCalledTimes(1);
expectMockArgFields(sendMediaFeishuMock, "media send params", {
mediaUrl: "https://example.com/image.png",
});
});
it("sends skipped voice text when final voice media degrades to a file attachment", async () => {
sendMediaFeishuMock.mockResolvedValueOnce({
messageId: "file_msg",
@@ -889,6 +1051,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
it("preserves captions for regular audio attachments", async () => {
useNonStreamingAutoAccount();
const { options } = createDispatcherHarness();
await options.deliver(
{
@@ -928,6 +1091,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
it("falls back to legacy mediaUrl when mediaUrls is an empty array", async () => {
useNonStreamingAutoAccount();
const { options } = createDispatcherHarness();
await options.deliver(
{ text: "caption", mediaUrl: "https://example.com/a.png", mediaUrls: [] },
@@ -961,6 +1125,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
it("passes replyInThread to sendMessageFeishu for plain text", async () => {
useNonStreamingAutoAccount();
const { options } = createDispatcherHarness({
replyToMessageId: "om_msg",
replyInThread: true,
@@ -974,6 +1139,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
it("allows top-level fallback for normal group quoted replies", async () => {
useNonStreamingAutoAccount();
const { options } = createDispatcherHarness({
replyToMessageId: "om_quote_reply",
replyInThread: true,
@@ -990,6 +1156,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
it("keeps native topic replies opted out of top-level fallback", async () => {
useNonStreamingAutoAccount();
const { options } = createDispatcherHarness({
replyToMessageId: "om_topic_root",
replyInThread: true,

View File

@@ -375,6 +375,18 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
})();
};
const resetStreamingState = () => {
streaming = null;
streamingStartPromise = null;
partialUpdateQueue = Promise.resolve();
streamText = "";
lastPartial = "";
reasoningText = "";
statusLine = "";
snapshotBaseText = "";
lastSnapshotTextLength = 0;
};
const closeStreaming = async (options?: { markClosedForReply?: boolean }) => {
try {
if (streamingStartPromise) {
@@ -397,21 +409,31 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
}
}
} finally {
streaming = null;
streamingStartPromise = null;
partialUpdateQueue = Promise.resolve();
streamText = "";
lastPartial = "";
reasoningText = "";
statusLine = "";
snapshotBaseText = "";
lastSnapshotTextLength = 0;
resetStreamingState();
}
};
const updateStreamingStatusLine = (nextStatusLine: string) => {
const discardStreamingPreview = async () => {
try {
if (streamingStartPromise) {
await streamingStartPromise;
}
await partialUpdateQueue;
if (streaming?.isActive()) {
await streaming.discard();
}
} finally {
resetStreamingState();
}
};
const updateStreamingStatusLine = (
nextStatusLine: string,
options?: { startIfNeeded?: boolean },
) => {
statusLine = nextStatusLine;
if (!streaming?.isActive() && !streamingStartPromise && renderMode !== "card") {
const hasStreamingSession = Boolean(streaming?.isActive() || streamingStartPromise);
if (!hasStreamingSession && (options?.startIfNeeded === false || renderMode !== "card")) {
return;
}
startStreaming();
@@ -536,9 +558,11 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
...(payload.audioAsVoice === true ? { audioAsVoice: true } : {}),
}),
);
const streamingCardEnabledForReplyKind = streamingEnabled && info?.kind === "final";
const useCard =
hasText &&
(renderMode === "card" ||
(streamingCardEnabledForReplyKind ||
renderMode === "card" ||
(info?.kind === "block" && coreBlockStreamingEnabled && renderMode !== "raw") ||
(renderMode === "auto" && shouldUseCard(text)));
const skipTextForDuplicateFinal =
@@ -555,11 +579,19 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
!hasVoiceMedia &&
!skipTextForDuplicateFinal &&
!skipTextForClosedStreamingFinal;
const shouldDiscardStreamingPreview =
info?.kind === "final" &&
hasMedia &&
((hasVoiceMedia && !shouldDeliverText) || skipTextForDuplicateFinal);
if (!shouldDeliverText && !hasMedia) {
return;
}
if (shouldDiscardStreamingPreview) {
await discardStreamingPreview();
}
if (shouldDeliverText) {
if (info?.kind === "block") {
// Drop internal block chunks unless we can safely consume them as
@@ -580,7 +612,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
}
}
if (streaming?.isActive()) {
const shouldStreamText = info?.kind === "block" || info?.kind === "final";
if (streaming?.isActive() && shouldStreamText) {
if (info?.kind === "block") {
// Some runtimes emit block payloads without onPartial/final callbacks.
// Mirror block text into streamText so onIdle close still sends content.
@@ -684,6 +717,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (!cleaned) {
return;
}
startStreaming();
queueStreamingUpdate(cleaned, {
dedupeWithLastPartial: true,
mode: "snapshot",
@@ -729,7 +763,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
: undefined,
onAssistantMessageStart: streamingEnabled
? () => {
updateStreamingStatusLine("");
updateStreamingStatusLine("", { startIfNeeded: false });
}
: undefined,
onCompactionStart: streamingEnabled

View File

@@ -535,8 +535,9 @@ export class FeishuStreamingSession {
const text = finalText ?? pendingMerged;
const apiBase = resolveApiBase(this.creds.domain);
// Only send final update if content differs from what's already displayed
if (text && text !== this.state.sentText) {
// Only send final update if content differs from what's already displayed.
// An explicit empty final text clears a transient preview before closeout.
if ((text || finalText !== undefined) && text !== this.state.sentText) {
const sent = text.startsWith(this.state.sentText)
? await this.updateCardContent(
resolveStreamingCardAppendContent(this.state.sentText, text),
@@ -589,6 +590,32 @@ export class FeishuStreamingSession {
this.log?.(`Closed streaming: cardId=${finalState.cardId}`);
}
async discard(): Promise<void> {
if (!this.state || this.closed) {
return;
}
this.closed = true;
this.clearFlushTimer();
await this.queue;
const currentState = this.state;
try {
const response = await this.client.im.message.delete({
path: { message_id: currentState.messageId },
});
if (response.code !== undefined && response.code !== 0) {
throw new Error(`Delete streaming card message failed: ${response.msg ?? response.code}`);
}
this.state = null;
this.pendingText = null;
this.log?.(`Discarded streaming card: cardId=${currentState.cardId}`);
} catch (error) {
this.log?.(`Discard failed: ${String(error)}`);
this.closed = false;
await this.close("");
}
}
isActive(): boolean {
return this.state !== null && !this.closed;
}