Feishu: harden streaming merge semantics and final reply dedupe (#33245)

* Feishu: close duplicate final gap and cover routing precedence

* Feishu: resolve reviewer duplicate-final and routing feedback

* Feishu: tighten streaming send-mode option typing

* Feishu: fix reverse-overlap streaming merge ordering

* Feishu: align streaming final dedupe test expectation

* Feishu: allow distinct streaming finals while deduping repeats

---------

Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
This commit is contained in:
rexl2018
2026-03-05 11:32:35 +08:00
committed by GitHub
parent 8b8167d547
commit 3bf6ed181e
5 changed files with 99 additions and 86 deletions

View File

@@ -300,7 +300,6 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
expect(sendMessageFeishuMock).not.toHaveBeenCalled();
expect(sendMarkdownCardFeishuMock).not.toHaveBeenCalled();
});
it("suppresses duplicate final text while still sending media", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
@@ -341,6 +340,40 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
);
});
it("keeps distinct non-streaming final payloads", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "auto",
streaming: false,
},
});
createFeishuReplyDispatcher({
cfg: {} as never,
agentId: "agent",
runtime: { log: vi.fn(), error: vi.fn() } as never,
chatId: "oc_chat",
});
const options = createReplyDispatcherWithTypingMock.mock.calls[0]?.[0];
await options.deliver({ text: "notice header" }, { kind: "final" });
await options.deliver({ text: "actual answer body" }, { kind: "final" });
expect(sendMessageFeishuMock).toHaveBeenCalledTimes(2);
expect(sendMessageFeishuMock).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ text: "notice header" }),
);
expect(sendMessageFeishuMock).toHaveBeenNthCalledWith(
2,
expect.objectContaining({ text: "actual answer body" }),
);
});
it("treats block updates as delta chunks", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",

View File

@@ -143,7 +143,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
let streaming: FeishuStreamingSession | null = null;
let streamText = "";
let lastPartial = "";
let lastFinalText: string | null = null;
const deliveredFinalTexts = new Set<string>();
let partialUpdateQueue: Promise<void> = Promise.resolve();
let streamingStartPromise: Promise<void> | null = null;
type StreamTextUpdateMode = "snapshot" | "delta";
@@ -230,7 +230,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
responsePrefixContextProvider: prefixContext.responsePrefixContextProvider,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, agentId),
onReplyStart: () => {
lastFinalText = null;
deliveredFinalTexts.clear();
if (streamingEnabled && renderMode === "card") {
startStreaming();
}
@@ -246,10 +246,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
: [];
const hasText = Boolean(text.trim());
const hasMedia = mediaList.length > 0;
// Suppress only exact duplicate final text payloads to avoid
// dropping legitimate multi-part final replies.
const skipTextForDuplicateFinal =
info?.kind === "final" && hasText && lastFinalText === text;
info?.kind === "final" && hasText && deliveredFinalTexts.has(text);
const shouldDeliverText = hasText && !skipTextForDuplicateFinal;
if (!shouldDeliverText && !hasMedia) {
@@ -287,7 +285,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (info?.kind === "final") {
streamText = mergeStreamingText(streamText, text);
await closeStreaming();
lastFinalText = text;
deliveredFinalTexts.add(text);
}
// Send media even when streaming handled the text
if (hasMedia) {
@@ -324,7 +322,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
first = false;
}
if (info?.kind === "final") {
lastFinalText = text;
deliveredFinalTexts.add(text);
}
} else {
const converted = core.channel.text.convertMarkdownTables(text, tableMode);
@@ -345,7 +343,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
first = false;
}
if (info?.kind === "final") {
lastFinalText = text;
deliveredFinalTexts.add(text);
}
}
}

View File

@@ -1,12 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const fetchWithSsrFGuardMock = vi.hoisted(() => vi.fn());
vi.mock("openclaw/plugin-sdk/feishu", () => ({
fetchWithSsrFGuard: fetchWithSsrFGuardMock,
}));
import { FeishuStreamingSession, mergeStreamingText } from "./streaming-card.js";
import { describe, expect, it } from "vitest";
import { mergeStreamingText, resolveStreamingCardSendMode } from "./streaming-card.js";
describe("mergeStreamingText", () => {
it("prefers the latest full text when it already includes prior text", () => {
@@ -28,59 +21,34 @@ describe("mergeStreamingText", () => {
expect(mergeStreamingText("revision_id: 552", "2一点变化都没有")).toBe(
"revision_id: 552一点变化都没有",
);
expect(mergeStreamingText("abc", "cabc")).toBe("cabc");
});
});
describe("FeishuStreamingSession routing", () => {
beforeEach(() => {
vi.clearAllMocks();
fetchWithSsrFGuardMock.mockReset();
});
it("prefers message.reply when reply target and root id both exist", async () => {
fetchWithSsrFGuardMock
.mockResolvedValueOnce({
response: { json: async () => ({ code: 0, msg: "ok", tenant_access_token: "token" }) },
release: async () => {},
})
.mockResolvedValueOnce({
response: { json: async () => ({ code: 0, msg: "ok", data: { card_id: "card_1" } }) },
release: async () => {},
});
const replyMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_reply" } }));
const createMock = vi.fn(async () => ({ code: 0, data: { message_id: "msg_create" } }));
const session = new FeishuStreamingSession(
{
im: {
message: {
reply: replyMock,
create: createMock,
},
},
} as never,
{
appId: "app",
appSecret: "secret",
domain: "feishu",
},
);
await session.start("oc_chat", "chat_id", {
replyToMessageId: "om_parent",
replyInThread: true,
rootId: "om_topic_root",
});
expect(replyMock).toHaveBeenCalledTimes(1);
expect(replyMock).toHaveBeenCalledWith({
path: { message_id: "om_parent" },
data: expect.objectContaining({
msg_type: "interactive",
reply_in_thread: true,
describe("resolveStreamingCardSendMode", () => {
it("prefers message.reply when reply target and root id both exist", () => {
expect(
resolveStreamingCardSendMode({
replyToMessageId: "om_parent",
rootId: "om_topic_root",
}),
});
expect(createMock).not.toHaveBeenCalled();
).toBe("reply");
});
it("falls back to root create when reply target is absent", () => {
expect(
resolveStreamingCardSendMode({
rootId: "om_topic_root",
}),
).toBe("root_create");
});
it("uses create mode when no reply routing fields are provided", () => {
expect(resolveStreamingCardSendMode()).toBe("create");
expect(
resolveStreamingCardSendMode({
replyInThread: true,
}),
).toBe("create");
});
});

View File

@@ -16,6 +16,13 @@ export type StreamingCardHeader = {
template?: string;
};
type StreamingStartOptions = {
replyToMessageId?: string;
replyInThread?: boolean;
rootId?: string;
header?: StreamingCardHeader;
};
// Token cache (keyed by domain + appId)
const tokenCache = new Map<string, { token: string; expiresAt: number }>();
@@ -103,6 +110,12 @@ export function mergeStreamingText(
if (previous.startsWith(next)) {
return previous;
}
if (next.includes(previous)) {
return next;
}
if (previous.includes(next)) {
return previous;
}
// Merge partial overlaps, e.g. "这" + "这是" => "这是".
const maxOverlap = Math.min(previous.length, next.length);
@@ -111,17 +124,20 @@ export function mergeStreamingText(
return `${previous}${next.slice(overlap)}`;
}
}
if (next.includes(previous)) {
return next;
}
if (previous.includes(next)) {
return previous;
}
// Fallback for fragmented partial chunks: append as-is to avoid losing tokens.
return `${previous}${next}`;
}
export function resolveStreamingCardSendMode(options?: StreamingStartOptions) {
if (options?.replyToMessageId) {
return "reply";
}
if (options?.rootId) {
return "root_create";
}
return "create";
}
/** Streaming card session manager */
export class FeishuStreamingSession {
private client: Client;
@@ -143,12 +159,7 @@ export class FeishuStreamingSession {
async start(
receiveId: string,
receiveIdType: "open_id" | "user_id" | "union_id" | "email" | "chat_id" = "chat_id",
options?: {
replyToMessageId?: string;
replyInThread?: boolean;
rootId?: string;
header?: StreamingCardHeader;
},
options?: StreamingStartOptions,
): Promise<void> {
if (this.state) {
return;
@@ -204,22 +215,24 @@ export class FeishuStreamingSession {
// message.create with root_id may silently ignore root_id for card
// references (card_id format).
let sendRes;
if (options?.replyToMessageId) {
const sendOptions = options ?? {};
const sendMode = resolveStreamingCardSendMode(sendOptions);
if (sendMode === "reply") {
sendRes = await this.client.im.message.reply({
path: { message_id: options.replyToMessageId },
path: { message_id: sendOptions.replyToMessageId! },
data: {
msg_type: "interactive",
content: cardContent,
...(options.replyInThread ? { reply_in_thread: true } : {}),
...(sendOptions.replyInThread ? { reply_in_thread: true } : {}),
},
});
} else if (options?.rootId) {
} else if (sendMode === "root_create") {
// root_id is undeclared in the SDK types but accepted at runtime
sendRes = await this.client.im.message.create({
params: { receive_id_type: receiveIdType },
data: Object.assign(
{ receive_id: receiveId, msg_type: "interactive", content: cardContent },
{ root_id: options.rootId },
{ root_id: sendOptions.rootId },
),
});
} else {