From 84a22a64bef084ff63e58d09124bad899a8f386e Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sat, 25 Apr 2026 04:04:03 -0700 Subject: [PATCH] fix(feishu): finish streaming card closeout --- CHANGELOG.md | 1 + .../feishu/src/reply-dispatcher.test.ts | 178 +++++++++++++++++- extensions/feishu/src/reply-dispatcher.ts | 131 ++++++++++--- 3 files changed, 278 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff7ef3fedb..1e63a778bbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Docs: https://docs.openclaw.ai - Cron: tolerate malformed legacy job rows in startup, main-session system-event payloads, and human-readable `cron list` output so missing `state`, `payload.text`, or display fields no longer crash the scheduler or CLI. Fixes #66016, #65916, #64137, #57872, #59968, #63813, #52804, and #43163. (#71509) Thanks @vincentkoc. - CLI/models: make `openclaw models scan` fall back to public OpenRouter free-model metadata when no `OPENROUTER_API_KEY` is configured, avoid config secret resolution for explicit `--no-probe` scans, and apply the scan timeout to the OpenRouter catalog request. - Feishu: keep streaming cards to one live card per turn, flush throttled card edits after meaningful text boundaries, and skip exact block/partial repeats so tool-heavy replies do not duplicate card output. Thanks @allan0509. +- Feishu: finish the streaming-card duplicate closeout by stripping leaked reasoning tags, preserving cross-block partial snapshots, enabling topic-thread streaming cards, omitting the generic `main` card header, surfacing transient tool/compaction status, and cleaning streaming state after close failures. Thanks @sesame437, @Vicky-v7, @maoku-family, @Pengxiao-Wang, and @Maple778. - Heartbeat: clamp oversized scheduler delays through the shared safe timer helper, preventing `every` values over Node's timeout cap from becoming a 1 ms crash loop. Fixes #71414. (#71478) Thanks @hclsys. - Control UI/chat: collapse assistant token/model context details behind an explicit Context disclosure and show full dates in message footers, making historical transcript timing clear without noisy default metadata. (#71337) Thanks @BunsDev. - OpenAI/Codex OAuth: explain `unsupported_country_region_territory` token-exchange failures with a proxy/region hint instead of surfacing a generic OAuth error. Fixes #51175. (#71501) Thanks @vincentkoc and @wulala-xjj. diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index c3b9a43d6e9..fb0b7342b4e 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -491,6 +491,64 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); + it("preserves previous generation blocks when partial snapshots reset after tools", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + await options.onReplyStart?.(); + result.replyOptions.onPartialReply?.({ + text: "Preparing the lookup plan with enough text to count as one block.", + }); + result.replyOptions.onPartialReply?.({ text: "Found" }); + result.replyOptions.onPartialReply?.({ text: "Found the answer." }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledWith( + "Preparing the lookup plan with enough text to count as one block.Found the answer.", + { + note: "Agent: agent", + }, + ); + }); + + it("strips reasoning tags from streamed partial snapshots", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + await options.onReplyStart?.(); + result.replyOptions.onPartialReply?.({ + text: "private chain of thought\nvisible answer", + }); + await options.onIdle?.(); + + expect(streamingInstances[0].close).toHaveBeenCalledWith("visible answer", { + note: "Agent: agent", + }); + }); + it("sends media-only payloads as attachments", async () => { const { options } = createDispatcherHarness(); await options.deliver({ mediaUrl: "https://example.com/a.png" }, { kind: "final" }); @@ -757,7 +815,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { ); }); - it("disables streaming for thread replies and keeps reply metadata", async () => { + it("uses streaming cards for thread replies and keeps topic metadata", async () => { const { options } = createDispatcherHarness({ runtime: createRuntimeLogger(), replyToMessageId: "om_msg", @@ -767,13 +825,127 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" }); - expect(streamingInstances).toHaveLength(0); - expect(sendStructuredCardFeishuMock).toHaveBeenCalledWith( + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].start).toHaveBeenCalledWith( + "oc_chat", + "chat_id", expect.objectContaining({ replyToMessageId: "om_msg", replyInThread: true, + rootId: "om_root_topic", }), ); + expect(sendStructuredCardFeishuMock).not.toHaveBeenCalled(); + }); + + it("omits the generic main header from streaming and static cards", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { options } = createDispatcherHarness({ + agentId: "main", + runtime: createRuntimeLogger(), + }); + await options.deliver({ text: "streamed card" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances[0].start).toHaveBeenCalledWith( + "oc_chat", + "chat_id", + expect.objectContaining({ + header: undefined, + }), + ); + + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: false, + }, + }); + + const { options: staticOptions } = createDispatcherHarness({ + agentId: "main", + runtime: createRuntimeLogger(), + }); + await staticOptions.deliver({ text: "static card" }, { kind: "final" }); + + expect(sendStructuredCardFeishuMock).toHaveBeenCalledWith( + expect.objectContaining({ + header: undefined, + }), + ); + }); + + it("shows transient tool status on streaming cards but omits it from the final close", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + await options.onReplyStart?.(); + result.replyOptions.onToolStart?.({ name: "web_search" }); + result.replyOptions.onPartialReply?.({ text: "final answer" }); + await options.onIdle?.(); + + const updateTexts = streamingInstances[0].update.mock.calls.map((call: unknown[]) => + typeof call[0] === "string" ? call[0] : "", + ); + expect(updateTexts.some((text) => text.includes("Using: web_search"))).toBe(true); + expect(streamingInstances[0].close).toHaveBeenCalledWith("final answer", { + note: "Agent: agent", + }); + }); + + it("cleans streaming state even when close throws", async () => { + const origPush = streamingInstances.push.bind(streamingInstances); + streamingInstances.push = (...args: StreamingSessionStub[]) => { + if (args.length > 0 && streamingInstances.length === 0) { + args[0].close = vi.fn(async () => { + args[0].active = false; + throw new Error("close failed"); + }); + } + return origPush(...args); + }; + + try { + const { options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + await options.deliver({ text: "```md\nfirst\n```" }, { kind: "final" }); + await expect(options.onIdle?.()).rejects.toThrow("close failed"); + await options.deliver({ text: "```md\nsecond\n```" }, { kind: "final" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(2); + expect(streamingInstances[1].close).toHaveBeenCalledWith("```md\nsecond\n```", { + note: "Agent: agent", + }); + } finally { + streamingInstances.push = origPush; + } }); it("passes replyInThread to media attachments", async () => { diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 4804be0e72d..8fa6e09df67 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -5,6 +5,7 @@ import { resolveTextChunksWithFallback, sendMediaWithLeadingCaption, } from "openclaw/plugin-sdk/reply-payload"; +import { stripReasoningTagsFromText } from "openclaw/plugin-sdk/text-runtime"; import { resolveFeishuRuntimeAccount } from "./accounts.js"; import { createFeishuClient } from "./client.js"; import { sendMediaFeishu } from "./media.js"; @@ -70,11 +71,15 @@ function normalizeEpochMs(timestamp: number | undefined): number | undefined { function resolveCardHeader( agentId: string, identity: OutboundIdentity | undefined, -): CardHeaderConfig { - const name = identity?.name?.trim() || agentId; +): CardHeaderConfig | undefined { + const name = identity?.name?.trim() || (agentId === "main" ? "" : agentId); const emoji = identity?.emoji?.trim(); + const title = (emoji ? `${emoji} ${name}` : name).trim(); + if (!title) { + return undefined; + } return { - title: emoji ? `${emoji} ${name}` : name, + title, template: identity?.theme ?? "blue", }; } @@ -210,15 +215,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP const chunkMode = core.channel.text.resolveChunkMode(cfg, "feishu"); const tableMode = core.channel.text.resolveMarkdownTableMode({ cfg, channel: "feishu" }); const renderMode = account.config?.renderMode ?? "auto"; - // Card streaming may miss thread affinity in topic contexts; use direct replies there. - const streamingEnabled = - !threadReplyMode && account.config?.streaming !== false && renderMode !== "raw"; + const streamingEnabled = account.config?.streaming !== false && renderMode !== "raw"; const reasoningPreviewEnabled = streamingEnabled && params.allowReasoningPreview === true; let streaming: FeishuStreamingSession | null = null; let streamText = ""; let lastPartial = ""; let reasoningText = ""; + let statusLine = ""; + let snapshotBaseText = ""; + let lastSnapshotTextLength = 0; const deliveredFinalTexts = new Set(); let partialUpdateQueue: Promise = Promise.resolve(); let streamingStartPromise: Promise | null = null; @@ -245,6 +251,9 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (answer) { parts.push(answer); } + if (statusLine) { + parts.push(parts.length > 0 ? `\n\n${statusLine}` : statusLine); + } return parts.join(""); }; @@ -276,8 +285,24 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP lastPartial = nextText; } const mode = options?.mode ?? "snapshot"; - streamText = - mode === "delta" ? `${streamText}${nextText}` : mergeStreamingText(streamText, nextText); + if (mode === "delta") { + streamText = `${streamText}${nextText}`; + } else { + const currentSnapshotText = snapshotBaseText + ? streamText.slice(snapshotBaseText.length) + : streamText; + const startsNewSnapshotBlock = + lastSnapshotTextLength >= 20 && + nextText.length < lastSnapshotTextLength * 0.5 && + !currentSnapshotText.includes(nextText); + if (startsNewSnapshotBlock) { + snapshotBaseText = streamText; + streamText = `${snapshotBaseText}${nextText}`; + } else { + streamText = `${snapshotBaseText}${mergeStreamingText(currentSnapshotText, nextText)}`; + } + lastSnapshotTextLength = nextText.length; + } flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); }; @@ -335,29 +360,46 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP }; const closeStreaming = async () => { - if (streamingStartPromise) { - await streamingStartPromise; - } - await partialUpdateQueue; - if (streaming?.isActive()) { - let text = buildCombinedStreamText(reasoningText, streamText); - if (mentionTargets?.length) { - text = buildMentionedCardContent(mentionTargets, text); + try { + if (streamingStartPromise) { + await streamingStartPromise; } - const finalNote = resolveCardNote(agentId, identity, prefixContext.prefixContext); - await streaming.close(text, { note: finalNote }); - // Track the raw streamed text so the duplicate-final check in deliver() - // can skip the redundant text delivery that arrives after onIdle closes - // the streaming card. - if (streamText) { - deliveredFinalTexts.add(streamText); + await partialUpdateQueue; + if (streaming?.isActive()) { + statusLine = ""; + let text = buildCombinedStreamText(reasoningText, streamText); + if (mentionTargets?.length) { + text = buildMentionedCardContent(mentionTargets, text); + } + const finalNote = resolveCardNote(agentId, identity, prefixContext.prefixContext); + await streaming.close(text, { note: finalNote }); + // Track the raw streamed text so the duplicate-final check in deliver() + // can skip the redundant text delivery that arrives after onIdle closes + // the streaming card. + if (streamText) { + deliveredFinalTexts.add(streamText); + } } + } finally { + streaming = null; + streamingStartPromise = null; + partialUpdateQueue = Promise.resolve(); + streamText = ""; + lastPartial = ""; + reasoningText = ""; + statusLine = ""; + snapshotBaseText = ""; + lastSnapshotTextLength = 0; } - streaming = null; - streamingStartPromise = null; - streamText = ""; - lastPartial = ""; - reasoningText = ""; + }; + + const updateStreamingStatusLine = (nextStatusLine: string) => { + statusLine = nextStatusLine; + if (!streaming?.isActive() && !streamingStartPromise && renderMode !== "card") { + return; + } + startStreaming(); + flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); }; const sendChunkedTextReply = async (params: { @@ -457,6 +499,8 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } if (info?.kind === "final") { streamText = text; + snapshotBaseText = ""; + lastSnapshotTextLength = text.length; flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); } // Send media even when streaming handled the text @@ -538,7 +582,14 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (!payload.text) { return; } - queueStreamingUpdate(payload.text, { + const cleaned = stripReasoningTagsFromText(payload.text, { + mode: "strict", + trim: "both", + }); + if (!cleaned) { + return; + } + queueStreamingUpdate(cleaned, { dedupeWithLastPartial: true, mode: "snapshot", }); @@ -554,6 +605,28 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } : undefined, onReasoningEnd: reasoningPreviewEnabled ? () => {} : undefined, + onToolStart: streamingEnabled + ? (payload: { name?: string; phase?: string }) => { + updateStreamingStatusLine( + `🔧 **Using: ${payload.name ?? payload.phase ?? "tool"}...**`, + ); + } + : undefined, + onAssistantMessageStart: streamingEnabled + ? () => { + updateStreamingStatusLine(""); + } + : undefined, + onCompactionStart: streamingEnabled + ? () => { + updateStreamingStatusLine("📦 **Compacting context...**"); + } + : undefined, + onCompactionEnd: streamingEnabled + ? () => { + updateStreamingStatusLine(""); + } + : undefined, }, markDispatchIdle, };