fix(feishu): keep finals after streaming close errors

This commit is contained in:
Ted Li
2026-04-26 11:16:27 -07:00
committed by Mason Huang
parent bd2edcdba4
commit aa74f91078
2 changed files with 90 additions and 3 deletions

View File

@@ -958,6 +958,86 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
});
it("does not suppress a later final after error closeout", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
sendMediaFeishuMock.mockRejectedValueOnce(new Error("media failed"));
const { options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await expect(
options.deliver(
{ text: "First answer", mediaUrl: "https://example.com/a.png" },
{ kind: "final" },
),
).rejects.toThrow("media failed");
await Promise.all([
options.onError?.(new Error("media failed"), { kind: "final" }),
options.onIdle?.(),
]);
await options.deliver({ text: "Second answer" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(2);
expect(streamingInstances[0].close).toHaveBeenCalledWith("First answer", {
note: "Agent: agent",
});
expect(streamingInstances[1].close).toHaveBeenCalledWith("Second answer", {
note: "Agent: agent",
});
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
});
it("does not suppress a recovery final after late media failure", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const { options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.deliver({ text: "First answer" }, { kind: "final" });
await options.onIdle?.();
sendMediaFeishuMock.mockRejectedValueOnce(new Error("media failed"));
await expect(
options.deliver(
{ text: "Late attachment", mediaUrl: "https://example.com/a.png" },
{ kind: "final" },
),
).rejects.toThrow("media failed");
await options.onError?.(new Error("media failed"), { kind: "final" });
await options.deliver({ text: "Recovered answer" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(2);
expect(streamingInstances[0].close).toHaveBeenCalledWith("First answer", {
note: "Agent: agent",
});
expect(streamingInstances[1].close).toHaveBeenCalledWith("Recovered answer", {
note: "Agent: agent",
});
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
});
it("cleans streaming state even when close throws", async () => {
const origPush = streamingInstances.push.bind(streamingInstances);
streamingInstances.push = (...args: StreamingSessionStub[]) => {

View File

@@ -229,6 +229,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
let partialUpdateQueue: Promise<void> = Promise.resolve();
let streamingStartPromise: Promise<void> | null = null;
let streamingClosedForReply = false;
let streamingCloseErroredForReply = false;
type StreamTextUpdateMode = "snapshot" | "delta";
const formatReasoningPrefix = (thinking: string): string => {
@@ -360,7 +361,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
})();
};
const closeStreaming = async () => {
const closeStreaming = async (options?: { markClosedForReply?: boolean }) => {
try {
if (streamingStartPromise) {
await streamingStartPromise;
@@ -379,7 +380,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
// the streaming card.
if (streamText) {
deliveredFinalTexts.add(streamText);
streamingClosedForReply = true;
if (options?.markClosedForReply !== false && !streamingCloseErroredForReply) {
streamingClosedForReply = true;
}
}
}
} finally {
@@ -454,6 +457,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
onReplyStart: async () => {
deliveredFinalTexts.clear();
streamingClosedForReply = false;
streamingCloseErroredForReply = false;
if (streamingEnabled && renderMode === "card") {
startStreaming();
}
@@ -472,6 +476,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
info?.kind === "final" &&
hasText &&
streamingClosedForReply &&
!streamingCloseErroredForReply &&
streamingEnabled &&
useCard;
const shouldDeliverText =
@@ -566,10 +571,12 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
}
},
onError: async (error, info) => {
streamingCloseErroredForReply = true;
streamingClosedForReply = false;
params.runtime.error?.(
`feishu[${account.accountId}] ${info.kind} reply failed: ${String(error)}`,
);
await closeStreaming();
await closeStreaming({ markClosedForReply: false });
typingCallbacks?.onIdle?.();
},
onIdle: async () => {