fix(feishu): coalesce streaming card final delivery

This commit is contained in:
Vincent Koc
2026-04-25 03:06:38 -07:00
committed by GitHub
parent 38703ed9a1
commit 455eba7f94
5 changed files with 240 additions and 27 deletions

View File

@@ -296,6 +296,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
rootId: "om_root_topic",
});
await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].start).toHaveBeenCalledTimes(1);
@@ -330,20 +331,17 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
});
it("delivers distinct final payloads after streaming close", async () => {
it("coalesces distinct final payloads into one streaming card until idle", async () => {
const { options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.deliver({ text: "```md\n完整回复第一段\n```" }, { kind: "final" });
await options.deliver({ text: "```md\n完整回复第一段 + 第二段\n```" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(2);
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\n完整回复第一段\n```", {
note: "Agent: agent",
});
expect(streamingInstances[1].close).toHaveBeenCalledTimes(1);
expect(streamingInstances[1].close).toHaveBeenCalledWith(
expect(streamingInstances[0].close).toHaveBeenCalledWith(
"```md\n完整回复第一段 + 第二段\n```",
{
note: "Agent: agent",
@@ -358,6 +356,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
runtime: createRuntimeLogger(),
});
await options.deliver({ text: "```md\n同一条回复\n```" }, { kind: "final" });
await options.onIdle?.();
await options.deliver({ text: "```md\n同一条回复\n```" }, { kind: "final" });
expect(streamingInstances).toHaveLength(1);
@@ -370,6 +369,17 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
it("skips final text already closed by idle streaming", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
@@ -454,6 +464,33 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
});
it("skips block payloads that exactly repeat the latest partial snapshot", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.onReplyStart?.();
result.replyOptions.onPartialReply?.({ text: "```md\npartial\n```" });
await options.deliver({ text: "```md\npartial\n```" }, { kind: "block" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\npartial\n```", {
note: "Agent: agent",
});
});
it("sends media-only payloads as attachments", async () => {
const { options } = createDispatcherHarness();
await options.deliver({ mediaUrl: "https://example.com/a.png" }, { kind: "final" });
@@ -508,6 +545,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
{ text: "```ts\nconst x = 1\n```", mediaUrls: ["https://example.com/a.png"] },
{ kind: "final" },
);
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].start).toHaveBeenCalledTimes(1);
@@ -576,6 +614,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
result.replyOptions.onPartialReply?.({ text: "answer part" });
result.replyOptions.onReasoningEnd?.();
await options.deliver({ text: "answer part final" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
const updateCalls = streamingInstances[0].update.mock.calls.map((c: unknown[]) =>
@@ -667,6 +706,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
result.replyOptions.onReasoningStream?.({ text: "" });
result.replyOptions.onPartialReply?.({ text: "```ts\ncode\n```" });
await options.deliver({ text: "```ts\ncode\n```" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
const closeArg = streamingInstances[0].close.mock.calls[0][0] as string;
@@ -684,6 +724,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thought_" });
result.replyOptions.onReasoningEnd?.();
await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledTimes(1);
@@ -798,6 +839,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
// or transient Feishu failures recover without a process restart.
nowSpy.mockReturnValue(62_000);
await options.deliver({ text: "```ts\nconst z = 3\n```" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(2);
expect(streamingInstances[1].start).toHaveBeenCalled();

View File

@@ -453,12 +453,11 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (info?.kind === "block") {
// Some runtimes emit block payloads without onPartial/final callbacks.
// Mirror block text into streamText so onIdle close still sends content.
queueStreamingUpdate(text, { mode: "delta" });
queueStreamingUpdate(text, { mode: "delta", dedupeWithLastPartial: true });
}
if (info?.kind === "final") {
streamText = mergeStreamingText(streamText, text);
await closeStreaming();
deliveredFinalTexts.add(text);
streamText = text;
flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText));
}
// Send media even when streaming handled the text
if (hasMedia) {

View File

@@ -1,5 +1,141 @@
import { describe, expect, it } from "vitest";
import { mergeStreamingText, resolveStreamingCardSendMode } from "./streaming-card.js";
import { beforeEach, describe, expect, it, vi } from "vitest";
const fetchWithSsrFGuardMock = vi.hoisted(() => vi.fn());
vi.mock("openclaw/plugin-sdk/ssrf-runtime", () => ({
fetchWithSsrFGuard: fetchWithSsrFGuardMock,
}));
import {
FeishuStreamingSession,
mergeStreamingText,
resolveStreamingCardSendMode,
} from "./streaming-card.js";
type StreamingSessionState = {
cardId: string;
messageId: string;
sequence: number;
currentText: string;
hasNote: boolean;
};
function setStreamingSessionInternals(
session: FeishuStreamingSession,
values: {
state: StreamingSessionState;
lastUpdateTime?: number;
},
) {
const internals = session as unknown as {
state: StreamingSessionState;
lastUpdateTime: number;
};
internals.state = values.state;
if (values.lastUpdateTime !== undefined) {
internals.lastUpdateTime = values.lastUpdateTime;
}
}
describe("FeishuStreamingSession", () => {
beforeEach(() => {
vi.useRealTimers();
fetchWithSsrFGuardMock.mockReset();
});
function mockFetches(updateBodies: string[]) {
fetchWithSsrFGuardMock.mockImplementation(
async ({ url, init }: { url: string; init?: { body?: string } }) => {
const release = vi.fn(async () => {});
if (url.includes("/auth/")) {
return {
response: {
ok: true,
json: async () => ({
code: 0,
msg: "ok",
tenant_access_token: "token",
expire: 7200,
}),
},
release,
};
}
if (url.includes("/elements/content/content")) {
updateBodies.push(init?.body ?? "");
}
return {
response: {
ok: true,
json: async () => ({ code: 0, msg: "ok" }),
},
release,
};
},
);
}
it("flushes throttled pending text after the throttle window", async () => {
vi.useFakeTimers();
vi.setSystemTime(1_000);
const updateBodies: string[] = [];
mockFetches(updateBodies);
const session = new FeishuStreamingSession({} as never, {
appId: "app_pending_flush",
appSecret: "secret",
});
setStreamingSessionInternals(session, {
state: {
cardId: "card_1",
messageId: "om_1",
sequence: 1,
currentText: "hello",
hasNote: false,
},
lastUpdateTime: 1_000,
});
await session.update("hello small");
expect(updateBodies).toHaveLength(0);
await vi.advanceTimersByTimeAsync(160);
expect(updateBodies).toHaveLength(1);
expect(JSON.parse(updateBodies[0] ?? "{}")).toMatchObject({
content: "hello small",
});
});
it("pushes natural-boundary updates immediately inside the throttle window", async () => {
vi.useFakeTimers();
vi.setSystemTime(2_000);
const updateBodies: string[] = [];
mockFetches(updateBodies);
const session = new FeishuStreamingSession({} as never, {
appId: "app_boundary_flush",
appSecret: "secret",
});
setStreamingSessionInternals(session, {
state: {
cardId: "card_2",
messageId: "om_2",
sequence: 1,
currentText: "hello",
hasNote: false,
},
lastUpdateTime: 2_000,
});
await session.update("hello!");
expect(updateBodies).toHaveLength(1);
expect(JSON.parse(updateBodies[0] ?? "{}")).toMatchObject({
content: "hello!",
});
});
});
describe("mergeStreamingText", () => {
it("prefers the latest full text when it already includes prior text", () => {

View File

@@ -39,6 +39,9 @@ type StreamingStartOptions = {
header?: StreamingCardHeader;
};
const STREAMING_UPDATE_THROTTLE_MS = 160;
const STREAMING_SIGNIFICANT_DELTA_CHARS = 18;
// Token cache (keyed by domain + appId)
const tokenCache = new Map<string, { token: string; expiresAt: number }>();
@@ -112,6 +115,20 @@ function truncateSummary(text: string, max = 50): string {
return clean.length <= max ? clean : clean.slice(0, max - 3) + "...";
}
function hasNaturalStreamingBoundary(text: string): boolean {
return /[\n。!?;:]$/.test(text);
}
function shouldPushStreamingUpdate(previousText: string, nextText: string): boolean {
if (!previousText) {
return true;
}
if (hasNaturalStreamingBoundary(nextText)) {
return true;
}
return nextText.length - previousText.length >= STREAMING_SIGNIFICANT_DELTA_CHARS;
}
export function mergeStreamingText(
previousText: string | undefined,
nextText: string | undefined,
@@ -169,7 +186,7 @@ export class FeishuStreamingSession {
private lastUpdateTime = 0;
private pendingText: string | null = null;
private flushTimer: ReturnType<typeof setTimeout> | null = null;
private updateThrottleMs = 100; // Throttle updates to max 10/sec
private updateThrottleMs = STREAMING_UPDATE_THROTTLE_MS;
constructor(client: Client, creds: Credentials, log?: (msg: string) => void) {
this.client = client;
@@ -324,6 +341,28 @@ export class FeishuStreamingSession {
.catch((error) => onError?.(error));
}
private clearFlushTimer(): void {
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
}
private schedulePendingFlush(): void {
if (this.flushTimer || !this.pendingText || this.closed) {
return;
}
const delayMs = Math.max(0, this.updateThrottleMs - (Date.now() - this.lastUpdateTime));
this.flushTimer = setTimeout(() => {
this.flushTimer = null;
const pending = this.pendingText;
if (!pending || this.closed) {
return;
}
void this.update(pending);
}, delayMs);
}
async update(text: string): Promise<void> {
if (!this.state || this.closed) {
return;
@@ -332,28 +371,27 @@ export class FeishuStreamingSession {
if (!mergedInput || mergedInput === this.state.currentText) {
return;
}
this.pendingText = mergedInput;
this.clearFlushTimer();
// Throttle: skip if updated recently, but remember pending text
const shouldForceUpdate = shouldPushStreamingUpdate(this.state.currentText, mergedInput);
const now = Date.now();
if (now - this.lastUpdateTime < this.updateThrottleMs) {
this.pendingText = mergedInput;
if (!shouldForceUpdate && now - this.lastUpdateTime < this.updateThrottleMs) {
this.schedulePendingFlush();
return;
}
this.pendingText = null;
this.lastUpdateTime = now;
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
this.queue = this.queue.then(async () => {
if (!this.state || this.closed) {
return;
}
const mergedText = mergeStreamingText(this.state.currentText, mergedInput);
const nextText = this.pendingText ?? mergedInput;
const mergedText = mergeStreamingText(this.state.currentText, nextText);
if (!mergedText || mergedText === this.state.currentText) {
return;
}
this.pendingText = null;
this.state.currentText = mergedText;
await this.updateCardContent(mergedText, (e) => this.log?.(`Update failed: ${String(e)}`));
});
@@ -395,10 +433,7 @@ export class FeishuStreamingSession {
return;
}
this.closed = true;
if (this.flushTimer) {
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
this.clearFlushTimer();
await this.queue;
const pendingMerged = mergeStreamingText(this.state.currentText, this.pendingText ?? undefined);