diff --git a/CHANGELOG.md b/CHANGELOG.md index 8700811c3f9..85ba2162974 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - Google Chat: preserve reply text when a typing indicator message is deleted or can no longer be updated, so media captions and first text chunks are resent instead of silently disappearing. (#71498) Thanks @colin-lgtm. - Cron: tolerate malformed legacy job rows in startup, main-session system-event payloads, and human-readable `cron list` output so missing `state`, `payload.text`, or display fields no longer crash the scheduler or CLI. Fixes #66016, #65916, #64137, #57872, #59968, #63813, #52804, and #43163. (#71509) Thanks @vincentkoc. - CLI/models: make `openclaw models scan` fall back to public OpenRouter free-model metadata when no `OPENROUTER_API_KEY` is configured, avoid config secret resolution for explicit `--no-probe` scans, and apply the scan timeout to the OpenRouter catalog request. +- Feishu: keep streaming cards to one live card per turn, flush throttled card edits after meaningful text boundaries, and skip exact block/partial repeats so tool-heavy replies do not duplicate card output. Thanks @allan0509. - Heartbeat: clamp oversized scheduler delays through the shared safe timer helper, preventing `every` values over Node's timeout cap from becoming a 1 ms crash loop. Fixes #71414. (#71478) Thanks @hclsys. - Control UI/chat: collapse assistant token/model context details behind an explicit Context disclosure and show full dates in message footers, making historical transcript timing clear without noisy default metadata. (#71337) Thanks @BunsDev. - OpenAI/Codex OAuth: explain `unsupported_country_region_territory` token-exchange failures with a proxy/region hint instead of surfacing a generic OAuth error. Fixes #51175. (#71501) Thanks @vincentkoc and @wulala-xjj. diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index 9dbc60b3f6a..c3b9a43d6e9 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -296,6 +296,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { rootId: "om_root_topic", }); await options.deliver({ text: "```ts\nconst x = 1\n```" }, { kind: "final" }); + await options.onIdle?.(); expect(streamingInstances).toHaveLength(1); expect(streamingInstances[0].start).toHaveBeenCalledTimes(1); @@ -330,20 +331,17 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); - it("delivers distinct final payloads after streaming close", async () => { + it("coalesces distinct final payloads into one streaming card until idle", async () => { const { options } = createDispatcherHarness({ runtime: createRuntimeLogger(), }); await options.deliver({ text: "```md\n完整回复第一段\n```" }, { kind: "final" }); await options.deliver({ text: "```md\n完整回复第一段 + 第二段\n```" }, { kind: "final" }); + await options.onIdle?.(); - expect(streamingInstances).toHaveLength(2); + expect(streamingInstances).toHaveLength(1); expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); - expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\n完整回复第一段\n```", { - note: "Agent: agent", - }); - expect(streamingInstances[1].close).toHaveBeenCalledTimes(1); - expect(streamingInstances[1].close).toHaveBeenCalledWith( + expect(streamingInstances[0].close).toHaveBeenCalledWith( "```md\n完整回复第一段 + 第二段\n```", { note: "Agent: agent", @@ -358,6 +356,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { runtime: createRuntimeLogger(), }); await options.deliver({ text: "```md\n同一条回复\n```" }, { kind: "final" }); + await options.onIdle?.(); await options.deliver({ text: "```md\n同一条回复\n```" }, { kind: "final" }); expect(streamingInstances).toHaveLength(1); @@ -370,6 +369,17 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); it("skips final text already closed by idle streaming", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + const { result, options } = createDispatcherHarness({ runtime: createRuntimeLogger(), }); @@ -454,6 +464,33 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { }); }); + it("skips block payloads that exactly repeat the latest partial snapshot", async () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "card", + streaming: true, + }, + }); + + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + await options.onReplyStart?.(); + result.replyOptions.onPartialReply?.({ text: "```md\npartial\n```" }); + await options.deliver({ text: "```md\npartial\n```" }, { kind: "block" }); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + expect(streamingInstances[0].close).toHaveBeenCalledWith("```md\npartial\n```", { + note: "Agent: agent", + }); + }); + it("sends media-only payloads as attachments", async () => { const { options } = createDispatcherHarness(); await options.deliver({ mediaUrl: "https://example.com/a.png" }, { kind: "final" }); @@ -508,6 +545,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { { text: "```ts\nconst x = 1\n```", mediaUrls: ["https://example.com/a.png"] }, { kind: "final" }, ); + await options.onIdle?.(); expect(streamingInstances).toHaveLength(1); expect(streamingInstances[0].start).toHaveBeenCalledTimes(1); @@ -576,6 +614,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { result.replyOptions.onPartialReply?.({ text: "answer part" }); result.replyOptions.onReasoningEnd?.(); await options.deliver({ text: "answer part final" }, { kind: "final" }); + await options.onIdle?.(); expect(streamingInstances).toHaveLength(1); const updateCalls = streamingInstances[0].update.mock.calls.map((c: unknown[]) => @@ -667,6 +706,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { result.replyOptions.onReasoningStream?.({ text: "" }); result.replyOptions.onPartialReply?.({ text: "```ts\ncode\n```" }); await options.deliver({ text: "```ts\ncode\n```" }, { kind: "final" }); + await options.onIdle?.(); expect(streamingInstances).toHaveLength(1); const closeArg = streamingInstances[0].close.mock.calls[0][0] as string; @@ -684,6 +724,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thought_" }); result.replyOptions.onReasoningEnd?.(); await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" }); + await options.onIdle?.(); expect(streamingInstances).toHaveLength(1); expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); @@ -798,6 +839,7 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { // or transient Feishu failures recover without a process restart. nowSpy.mockReturnValue(62_000); await options.deliver({ text: "```ts\nconst z = 3\n```" }, { kind: "final" }); + await options.onIdle?.(); expect(streamingInstances).toHaveLength(2); expect(streamingInstances[1].start).toHaveBeenCalled(); diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index dd4857a9cc2..4804be0e72d 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -453,12 +453,11 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP if (info?.kind === "block") { // Some runtimes emit block payloads without onPartial/final callbacks. // Mirror block text into streamText so onIdle close still sends content. - queueStreamingUpdate(text, { mode: "delta" }); + queueStreamingUpdate(text, { mode: "delta", dedupeWithLastPartial: true }); } if (info?.kind === "final") { - streamText = mergeStreamingText(streamText, text); - await closeStreaming(); - deliveredFinalTexts.add(text); + streamText = text; + flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); } // Send media even when streaming handled the text if (hasMedia) { diff --git a/extensions/feishu/src/streaming-card.test.ts b/extensions/feishu/src/streaming-card.test.ts index bb12feab613..f531e11fd1c 100644 --- a/extensions/feishu/src/streaming-card.test.ts +++ b/extensions/feishu/src/streaming-card.test.ts @@ -1,5 +1,141 @@ -import { describe, expect, it } from "vitest"; -import { mergeStreamingText, resolveStreamingCardSendMode } from "./streaming-card.js"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const fetchWithSsrFGuardMock = vi.hoisted(() => vi.fn()); + +vi.mock("openclaw/plugin-sdk/ssrf-runtime", () => ({ + fetchWithSsrFGuard: fetchWithSsrFGuardMock, +})); + +import { + FeishuStreamingSession, + mergeStreamingText, + resolveStreamingCardSendMode, +} from "./streaming-card.js"; + +type StreamingSessionState = { + cardId: string; + messageId: string; + sequence: number; + currentText: string; + hasNote: boolean; +}; + +function setStreamingSessionInternals( + session: FeishuStreamingSession, + values: { + state: StreamingSessionState; + lastUpdateTime?: number; + }, +) { + const internals = session as unknown as { + state: StreamingSessionState; + lastUpdateTime: number; + }; + internals.state = values.state; + if (values.lastUpdateTime !== undefined) { + internals.lastUpdateTime = values.lastUpdateTime; + } +} + +describe("FeishuStreamingSession", () => { + beforeEach(() => { + vi.useRealTimers(); + fetchWithSsrFGuardMock.mockReset(); + }); + + function mockFetches(updateBodies: string[]) { + fetchWithSsrFGuardMock.mockImplementation( + async ({ url, init }: { url: string; init?: { body?: string } }) => { + const release = vi.fn(async () => {}); + if (url.includes("/auth/")) { + return { + response: { + ok: true, + json: async () => ({ + code: 0, + msg: "ok", + tenant_access_token: "token", + expire: 7200, + }), + }, + release, + }; + } + if (url.includes("/elements/content/content")) { + updateBodies.push(init?.body ?? ""); + } + return { + response: { + ok: true, + json: async () => ({ code: 0, msg: "ok" }), + }, + release, + }; + }, + ); + } + + it("flushes throttled pending text after the throttle window", async () => { + vi.useFakeTimers(); + vi.setSystemTime(1_000); + const updateBodies: string[] = []; + mockFetches(updateBodies); + + const session = new FeishuStreamingSession({} as never, { + appId: "app_pending_flush", + appSecret: "secret", + }); + setStreamingSessionInternals(session, { + state: { + cardId: "card_1", + messageId: "om_1", + sequence: 1, + currentText: "hello", + hasNote: false, + }, + lastUpdateTime: 1_000, + }); + + await session.update("hello small"); + expect(updateBodies).toHaveLength(0); + + await vi.advanceTimersByTimeAsync(160); + + expect(updateBodies).toHaveLength(1); + expect(JSON.parse(updateBodies[0] ?? "{}")).toMatchObject({ + content: "hello small", + }); + }); + + it("pushes natural-boundary updates immediately inside the throttle window", async () => { + vi.useFakeTimers(); + vi.setSystemTime(2_000); + const updateBodies: string[] = []; + mockFetches(updateBodies); + + const session = new FeishuStreamingSession({} as never, { + appId: "app_boundary_flush", + appSecret: "secret", + }); + setStreamingSessionInternals(session, { + state: { + cardId: "card_2", + messageId: "om_2", + sequence: 1, + currentText: "hello", + hasNote: false, + }, + lastUpdateTime: 2_000, + }); + + await session.update("hello!"); + + expect(updateBodies).toHaveLength(1); + expect(JSON.parse(updateBodies[0] ?? "{}")).toMatchObject({ + content: "hello!", + }); + }); +}); describe("mergeStreamingText", () => { it("prefers the latest full text when it already includes prior text", () => { diff --git a/extensions/feishu/src/streaming-card.ts b/extensions/feishu/src/streaming-card.ts index 1aee94990a9..ef1aa1ec7f5 100644 --- a/extensions/feishu/src/streaming-card.ts +++ b/extensions/feishu/src/streaming-card.ts @@ -39,6 +39,9 @@ type StreamingStartOptions = { header?: StreamingCardHeader; }; +const STREAMING_UPDATE_THROTTLE_MS = 160; +const STREAMING_SIGNIFICANT_DELTA_CHARS = 18; + // Token cache (keyed by domain + appId) const tokenCache = new Map(); @@ -112,6 +115,20 @@ function truncateSummary(text: string, max = 50): string { return clean.length <= max ? clean : clean.slice(0, max - 3) + "..."; } +function hasNaturalStreamingBoundary(text: string): boolean { + return /[\n。!?!?;;::]$/.test(text); +} + +function shouldPushStreamingUpdate(previousText: string, nextText: string): boolean { + if (!previousText) { + return true; + } + if (hasNaturalStreamingBoundary(nextText)) { + return true; + } + return nextText.length - previousText.length >= STREAMING_SIGNIFICANT_DELTA_CHARS; +} + export function mergeStreamingText( previousText: string | undefined, nextText: string | undefined, @@ -169,7 +186,7 @@ export class FeishuStreamingSession { private lastUpdateTime = 0; private pendingText: string | null = null; private flushTimer: ReturnType | null = null; - private updateThrottleMs = 100; // Throttle updates to max 10/sec + private updateThrottleMs = STREAMING_UPDATE_THROTTLE_MS; constructor(client: Client, creds: Credentials, log?: (msg: string) => void) { this.client = client; @@ -324,6 +341,28 @@ export class FeishuStreamingSession { .catch((error) => onError?.(error)); } + private clearFlushTimer(): void { + if (this.flushTimer) { + clearTimeout(this.flushTimer); + this.flushTimer = null; + } + } + + private schedulePendingFlush(): void { + if (this.flushTimer || !this.pendingText || this.closed) { + return; + } + const delayMs = Math.max(0, this.updateThrottleMs - (Date.now() - this.lastUpdateTime)); + this.flushTimer = setTimeout(() => { + this.flushTimer = null; + const pending = this.pendingText; + if (!pending || this.closed) { + return; + } + void this.update(pending); + }, delayMs); + } + async update(text: string): Promise { if (!this.state || this.closed) { return; @@ -332,28 +371,27 @@ export class FeishuStreamingSession { if (!mergedInput || mergedInput === this.state.currentText) { return; } + this.pendingText = mergedInput; + this.clearFlushTimer(); - // Throttle: skip if updated recently, but remember pending text + const shouldForceUpdate = shouldPushStreamingUpdate(this.state.currentText, mergedInput); const now = Date.now(); - if (now - this.lastUpdateTime < this.updateThrottleMs) { - this.pendingText = mergedInput; + if (!shouldForceUpdate && now - this.lastUpdateTime < this.updateThrottleMs) { + this.schedulePendingFlush(); return; } - this.pendingText = null; this.lastUpdateTime = now; - if (this.flushTimer) { - clearTimeout(this.flushTimer); - this.flushTimer = null; - } this.queue = this.queue.then(async () => { if (!this.state || this.closed) { return; } - const mergedText = mergeStreamingText(this.state.currentText, mergedInput); + const nextText = this.pendingText ?? mergedInput; + const mergedText = mergeStreamingText(this.state.currentText, nextText); if (!mergedText || mergedText === this.state.currentText) { return; } + this.pendingText = null; this.state.currentText = mergedText; await this.updateCardContent(mergedText, (e) => this.log?.(`Update failed: ${String(e)}`)); }); @@ -395,10 +433,7 @@ export class FeishuStreamingSession { return; } this.closed = true; - if (this.flushTimer) { - clearTimeout(this.flushTimer); - this.flushTimer = null; - } + this.clearFlushTimer(); await this.queue; const pendingMerged = mergeStreamingText(this.state.currentText, this.pendingText ?? undefined);