fix(feishu): finish streaming card closeout

This commit is contained in:
Vincent Koc
2026-04-25 04:04:03 -07:00
committed by GitHub
parent 935cd34e9f
commit 84a22a64be
3 changed files with 278 additions and 32 deletions

View File

@@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai
- Cron: tolerate malformed legacy job rows during startup, in main-session system-event payloads, and in human-readable `cron list` output so missing `state`, `payload.text`, or display fields no longer crash the scheduler or CLI. Fixes #66016, #65916, #64137, #57872, #59968, #63813, #52804, and #43163. (#71509) Thanks @vincentkoc.
- CLI/models: make `openclaw models scan` fall back to public OpenRouter free-model metadata when no `OPENROUTER_API_KEY` is configured, avoid config secret resolution for explicit `--no-probe` scans, and apply the scan timeout to the OpenRouter catalog request.
- Feishu: keep streaming cards to one live card per turn, flush throttled card edits after meaningful text boundaries, and skip exact block/partial repeats so tool-heavy replies do not duplicate card output. Thanks @allan0509.
- Feishu: finish the streaming-card duplicate closeout by stripping leaked reasoning tags, preserving cross-block partial snapshots, enabling topic-thread streaming cards, omitting the generic `main` card header, surfacing transient tool/compaction status, and cleaning streaming state after close failures. Thanks @sesame437, @Vicky-v7, @maoku-family, @Pengxiao-Wang, and @Maple778.
- Heartbeat: clamp oversized scheduler delays through the shared safe timer helper, preventing `every` values over Node's timeout cap from becoming a 1 ms crash loop. Fixes #71414. (#71478) Thanks @hclsys.
- Control UI/chat: collapse assistant token/model context details behind an explicit Context disclosure and show full dates in message footers, making historical transcript timing clear without noisy default metadata. (#71337) Thanks @BunsDev.
- OpenAI/Codex OAuth: explain `unsupported_country_region_territory` token-exchange failures with a proxy/region hint instead of surfacing a generic OAuth error. Fixes #51175. (#71501) Thanks @vincentkoc and @wulala-xjj.

View File

@@ -491,6 +491,64 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
});
it("preserves previous generation blocks when partial snapshots reset after tools", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.onReplyStart?.();
result.replyOptions.onPartialReply?.({
text: "Preparing the lookup plan with enough text to count as one block.",
});
result.replyOptions.onPartialReply?.({ text: "Found" });
result.replyOptions.onPartialReply?.({ text: "Found the answer." });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].close).toHaveBeenCalledWith(
"Preparing the lookup plan with enough text to count as one block.Found the answer.",
{
note: "Agent: agent",
},
);
});
it("strips reasoning tags from streamed partial snapshots", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.onReplyStart?.();
result.replyOptions.onPartialReply?.({
text: "<thinking>private chain of thought</thinking>\nvisible answer",
});
await options.onIdle?.();
expect(streamingInstances[0].close).toHaveBeenCalledWith("visible answer", {
note: "Agent: agent",
});
});
it("sends media-only payloads as attachments", async () => {
const { options } = createDispatcherHarness();
await options.deliver({ mediaUrl: "https://example.com/a.png" }, { kind: "final" });
@@ -757,7 +815,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
);
});
it("disables streaming for thread replies and keeps reply metadata", async () => {
it("uses streaming cards for thread replies and keeps topic metadata", async () => {
const { options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
replyToMessageId: "om_msg",
@@ -767,13 +825,127 @@ describe("createFeishuReplyDispatcher streaming behavior", () => {
});
await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" });
expect(streamingInstances).toHaveLength(0);
expect(sendStructuredCardFeishuMock).toHaveBeenCalledWith(
expect(streamingInstances).toHaveLength(1);
expect(streamingInstances[0].start).toHaveBeenCalledWith(
"oc_chat",
"chat_id",
expect.objectContaining({
replyToMessageId: "om_msg",
replyInThread: true,
rootId: "om_root_topic",
}),
);
expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled();
});
it("omits the generic main header from streaming and static cards", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const { options } = createDispatcherHarness({
agentId: "main",
runtime: createRuntimeLogger(),
});
await options.deliver({ text: "streamed card" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances[0].start).toHaveBeenCalledWith(
"oc_chat",
"chat_id",
expect.objectContaining({
header: undefined,
}),
);
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: false,
},
});
const { options: staticOptions } = createDispatcherHarness({
agentId: "main",
runtime: createRuntimeLogger(),
});
await staticOptions.deliver({ text: "static card" }, { kind: "final" });
expect(sendStructuredCardFeishuMock).toHaveBeenCalledWith(
expect.objectContaining({
header: undefined,
}),
);
});
it("shows transient tool status on streaming cards but omits it from the final close", async () => {
resolveFeishuAccountMock.mockReturnValue({
accountId: "main",
appId: "app_id",
appSecret: "app_secret",
domain: "feishu",
config: {
renderMode: "card",
streaming: true,
},
});
const { result, options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.onReplyStart?.();
result.replyOptions.onToolStart?.({ name: "web_search" });
result.replyOptions.onPartialReply?.({ text: "final answer" });
await options.onIdle?.();
const updateTexts = streamingInstances[0].update.mock.calls.map((call: unknown[]) =>
typeof call[0] === "string" ? call[0] : "",
);
expect(updateTexts.some((text) => text.includes("Using: web_search"))).toBe(true);
expect(streamingInstances[0].close).toHaveBeenCalledWith("final answer", {
note: "Agent: agent",
});
});
it("cleans streaming state even when close throws", async () => {
const origPush = streamingInstances.push.bind(streamingInstances);
streamingInstances.push = (...args: StreamingSessionStub[]) => {
if (args.length > 0 && streamingInstances.length === 0) {
args[0].close = vi.fn(async () => {
args[0].active = false;
throw new Error("close failed");
});
}
return origPush(...args);
};
try {
const { options } = createDispatcherHarness({
runtime: createRuntimeLogger(),
});
await options.deliver({ text: "```md\nfirst\n```" }, { kind: "final" });
await expect(options.onIdle?.()).rejects.toThrow("close failed");
await options.deliver({ text: "```md\nsecond\n```" }, { kind: "final" });
await options.onIdle?.();
expect(streamingInstances).toHaveLength(2);
expect(streamingInstances[1].close).toHaveBeenCalledWith("```md\nsecond\n```", {
note: "Agent: agent",
});
} finally {
streamingInstances.push = origPush;
}
});
it("passes replyInThread to media attachments", async () => {

View File

@@ -5,6 +5,7 @@ import {
resolveTextChunksWithFallback,
sendMediaWithLeadingCaption,
} from "openclaw/plugin-sdk/reply-payload";
import { stripReasoningTagsFromText } from "openclaw/plugin-sdk/text-runtime";
import { resolveFeishuRuntimeAccount } from "./accounts.js";
import { createFeishuClient } from "./client.js";
import { sendMediaFeishu } from "./media.js";
@@ -70,11 +71,15 @@ function normalizeEpochMs(timestamp: number | undefined): number | undefined {
function resolveCardHeader(
agentId: string,
identity: OutboundIdentity | undefined,
): CardHeaderConfig {
const name = identity?.name?.trim() || agentId;
): CardHeaderConfig | undefined {
const name = identity?.name?.trim() || (agentId === "main" ? "" : agentId);
const emoji = identity?.emoji?.trim();
const title = (emoji ? `${emoji} ${name}` : name).trim();
if (!title) {
return undefined;
}
return {
title: emoji ? `${emoji} ${name}` : name,
title,
template: identity?.theme ?? "blue",
};
}
@@ -210,15 +215,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
const chunkMode = core.channel.text.resolveChunkMode(cfg, "feishu");
const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg, channel: "feishu" });
const renderMode = account.config?.renderMode ?? "auto";
// Card streaming may miss thread affinity in topic contexts; use direct replies there.
const streamingEnabled =
!threadReplyMode && account.config?.streaming !== false && renderMode !== "raw";
const streamingEnabled = account.config?.streaming !== false && renderMode !== "raw";
const reasoningPreviewEnabled = streamingEnabled && params.allowReasoningPreview === true;
let streaming: FeishuStreamingSession | null = null;
let streamText = "";
let lastPartial = "";
let reasoningText = "";
let statusLine = "";
let snapshotBaseText = "";
let lastSnapshotTextLength = 0;
const deliveredFinalTexts = new Set<string>();
let partialUpdateQueue: Promise<void> = Promise.resolve();
let streamingStartPromise: Promise<void> | null = null;
@@ -245,6 +251,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (answer) {
parts.push(answer);
}
if (statusLine) {
parts.push(parts.length > 0 ? `\n\n${statusLine}` : statusLine);
}
return parts.join("");
};
@@ -276,8 +285,24 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
lastPartial = nextText;
}
const mode = options?.mode ?? "snapshot";
streamText =
mode === "delta" ? `${streamText}${nextText}` : mergeStreamingText(streamText, nextText);
if (mode === "delta") {
streamText = `${streamText}${nextText}`;
} else {
const currentSnapshotText = snapshotBaseText
? streamText.slice(snapshotBaseText.length)
: streamText;
const startsNewSnapshotBlock =
lastSnapshotTextLength >= 20 &&
nextText.length < lastSnapshotTextLength * 0.5 &&
!currentSnapshotText.includes(nextText);
if (startsNewSnapshotBlock) {
snapshotBaseText = streamText;
streamText = `${snapshotBaseText}${nextText}`;
} else {
streamText = `${snapshotBaseText}${mergeStreamingText(currentSnapshotText, nextText)}`;
}
lastSnapshotTextLength = nextText.length;
}
flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText));
};
@@ -335,29 +360,46 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
};
const closeStreaming = async () => {
if (streamingStartPromise) {
await streamingStartPromise;
}
await partialUpdateQueue;
if (streaming?.isActive()) {
let text = buildCombinedStreamText(reasoningText, streamText);
if (mentionTargets?.length) {
text = buildMentionedCardContent(mentionTargets, text);
try {
if (streamingStartPromise) {
await streamingStartPromise;
}
const finalNote = resolveCardNote(agentId, identity, prefixContext.prefixContext);
await streaming.close(text, { note: finalNote });
// Track the raw streamed text so the duplicate-final check in deliver()
// can skip the redundant text delivery that arrives after onIdle closes
// the streaming card.
if (streamText) {
deliveredFinalTexts.add(streamText);
await partialUpdateQueue;
if (streaming?.isActive()) {
statusLine = "";
let text = buildCombinedStreamText(reasoningText, streamText);
if (mentionTargets?.length) {
text = buildMentionedCardContent(mentionTargets, text);
}
const finalNote = resolveCardNote(agentId, identity, prefixContext.prefixContext);
await streaming.close(text, { note: finalNote });
// Track the raw streamed text so the duplicate-final check in deliver()
// can skip the redundant text delivery that arrives after onIdle closes
// the streaming card.
if (streamText) {
deliveredFinalTexts.add(streamText);
}
}
} finally {
streaming = null;
streamingStartPromise = null;
partialUpdateQueue = Promise.resolve();
streamText = "";
lastPartial = "";
reasoningText = "";
statusLine = "";
snapshotBaseText = "";
lastSnapshotTextLength = 0;
}
streaming = null;
streamingStartPromise = null;
streamText = "";
lastPartial = "";
reasoningText = "";
};
const updateStreamingStatusLine = (nextStatusLine: string) => {
statusLine = nextStatusLine;
if (!streaming?.isActive() && !streamingStartPromise && renderMode !== "card") {
return;
}
startStreaming();
flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText));
};
const sendChunkedTextReply = async (params: {
@@ -457,6 +499,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
}
if (info?.kind === "final") {
streamText = text;
snapshotBaseText = "";
lastSnapshotTextLength = text.length;
flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText));
}
// Send media even when streaming handled the text
@@ -538,7 +582,14 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
if (!payload.text) {
return;
}
queueStreamingUpdate(payload.text, {
const cleaned = stripReasoningTagsFromText(payload.text, {
mode: "strict",
trim: "both",
});
if (!cleaned) {
return;
}
queueStreamingUpdate(cleaned, {
dedupeWithLastPartial: true,
mode: "snapshot",
});
@@ -554,6 +605,28 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP
}
: undefined,
onReasoningEnd: reasoningPreviewEnabled ? () => {} : undefined,
onToolStart: streamingEnabled
? (payload: { name?: string; phase?: string }) => {
updateStreamingStatusLine(
`🔧 **Using: ${payload.name ?? payload.phase ?? "tool"}...**`,
);
}
: undefined,
onAssistantMessageStart: streamingEnabled
? () => {
updateStreamingStatusLine("");
}
: undefined,
onCompactionStart: streamingEnabled
? () => {
updateStreamingStatusLine("📦 **Compacting context...**");
}
: undefined,
onCompactionEnd: streamingEnabled
? () => {
updateStreamingStatusLine("");
}
: undefined,
},
markDispatchIdle,
};