diff --git a/CHANGELOG.md b/CHANGELOG.md index b820465190c..6d7b81dac31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -264,6 +264,7 @@ Docs: https://docs.openclaw.ai - Agents/OpenAI WS compat store flag: omit `store` from `response.create` payloads when model compat sets `supportsStore: false`, preventing strict OpenAI-compatible providers from rejecting websocket requests with unknown-field errors. (#39113) Thanks @scoootscooob. - Config/validation log sanitization: sanitize config-validation issue paths/messages before logging so control characters and ANSI escape sequences cannot inject misleading terminal output from crafted config content. (#39116) Thanks @powermaster888. - Agents/compaction counter accuracy: count successful overflow-triggered auto-compactions (`willRetry=true`) in the compaction counter while still excluding aborted/no-result events, so `/status` reflects actual safeguard compaction activity. (#39123) Thanks @MumuTW. +- Gateway/chat delta ordering: flush buffered assistant deltas before emitting tool `start` events so pre-tool text is delivered to Control UI before tool cards, avoiding transient text/tool ordering artifacts in streaming. (#39128) Thanks @0xtangping. ## 2026.3.2 diff --git a/src/gateway/server-chat.agent-events.test.ts b/src/gateway/server-chat.agent-events.test.ts index b89e2462c51..6d705fc4a8c 100644 --- a/src/gateway/server-chat.agent-events.test.ts +++ b/src/gateway/server-chat.agent-events.test.ts @@ -470,6 +470,74 @@ describe("agent event handler", () => { nowSpy?.mockRestore(); }); + it("flushes buffered chat delta before tool start events", () => { + let now = 12_000; + const nowSpy = vi.spyOn(Date, "now").mockImplementation(() => now); + const { + broadcast, + broadcastToConnIds, + nodeSendToSession, + chatRunState, + toolEventRecipients, + handler, + } = createHarness({ + resolveSessionKeyForRun: () => "session-tool-flush", + }); + + chatRunState.registry.add("run-tool-flush", { + sessionKey: "session-tool-flush", + clientRunId: "client-tool-flush", + }); + registerAgentRunContext("run-tool-flush", { + sessionKey: "session-tool-flush", + verboseLevel: "off", + }); + toolEventRecipients.add("run-tool-flush", "conn-1"); + + handler({ + runId: "run-tool-flush", + seq: 1, + stream: "assistant", + ts: Date.now(), + data: { text: "Before tool" }, + }); + + // Throttled assistant update (within 150ms window). + now = 12_050; + handler({ + runId: "run-tool-flush", + seq: 2, + stream: "assistant", + ts: Date.now(), + data: { text: "Before tool expanded" }, + }); + + handler({ + runId: "run-tool-flush", + seq: 3, + stream: "tool", + ts: Date.now(), + data: { phase: "start", name: "read", toolCallId: "tool-flush-1" }, + }); + + const chatCalls = chatBroadcastCalls(broadcast); + expect(chatCalls).toHaveLength(2); + const flushedPayload = chatCalls[1]?.[1] as { + state?: string; + message?: { content?: Array<{ text?: string }> }; + }; + expect(flushedPayload.state).toBe("delta"); + expect(flushedPayload.message?.content?.[0]?.text).toBe("Before tool expanded"); + expect(sessionChatCalls(nodeSendToSession)).toHaveLength(2); + + expect(broadcastToConnIds).toHaveBeenCalledTimes(1); + const flushCallOrder = broadcast.mock.invocationCallOrder[1] ?? 0; + const toolCallOrder = broadcastToConnIds.mock.invocationCallOrder[0] ?? Number.MAX_SAFE_INTEGER; + expect(flushCallOrder).toBeLessThan(toolCallOrder); + nowSpy.mockRestore(); + resetAgentRunContextForTest(); + }); + it("routes tool events only to registered recipients when verbose is enabled", () => { const { broadcast, broadcastToConnIds, toolEventRecipients, handler } = createHarness({ resolveSessionKeyForRun: () => "session-1", diff --git a/src/gateway/server-chat.ts b/src/gateway/server-chat.ts index 5ce6e8471f5..b1a065684f8 100644 --- a/src/gateway/server-chat.ts +++ b/src/gateway/server-chat.ts @@ -390,6 +390,60 @@ export function createAgentEventHandler({ nodeSendToSession(sessionKey, "chat", payload); }; + const flushBufferedChatDeltaIfNeeded = ( + sessionKey: string, + clientRunId: string, + sourceRunId: string, + seq: number, + ) => { + const bufferedText = stripInlineDirectiveTagsForDisplay( + chatRunState.buffers.get(clientRunId) ?? "", + ).text.trim(); + const normalizedHeartbeatText = normalizeHeartbeatChatFinalText({ + runId: clientRunId, + sourceRunId, + text: bufferedText, + }); + const text = normalizedHeartbeatText.text.trim(); + const shouldSuppressSilent = + normalizedHeartbeatText.suppress || isSilentReplyText(text, SILENT_REPLY_TOKEN); + const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text); + const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput( + clientRunId, + sourceRunId, + ); + if ( + !text || + shouldSuppressSilent || + shouldSuppressSilentLeadFragment || + shouldSuppressHeartbeatStreaming + ) { + return; + } + + const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0; + if (text.length <= lastBroadcastLen) { + return; + } + + const now = Date.now(); + const flushPayload = { + runId: clientRunId, + sessionKey, + seq, + state: "delta" as const, + message: { + role: "assistant", + content: [{ type: "text", text }], + timestamp: now, + }, + }; + broadcast("chat", flushPayload, { dropIfSlow: true }); + nodeSendToSession(sessionKey, "chat", flushPayload); + chatRunState.deltaLastBroadcastLen.set(clientRunId, text.length); + chatRunState.deltaSentAt.set(clientRunId, now); + }; + const emitChatFinal = ( sessionKey: string, clientRunId: string, @@ -410,38 +464,11 @@ export function createAgentEventHandler({ const text = normalizedHeartbeatText.text.trim(); const shouldSuppressSilent = normalizedHeartbeatText.suppress || isSilentReplyText(text, SILENT_REPLY_TOKEN); - const shouldSuppressSilentLeadFragment = isSilentReplyLeadFragment(text); - const shouldSuppressHeartbeatStreaming = shouldHideHeartbeatChatOutput( - clientRunId, - sourceRunId, - ); // Flush any throttled delta so streaming clients receive the complete text - // before the final event. The 150 ms throttle in emitChatDelta may have + // before the final event. The 150 ms throttle in emitChatDelta may have // suppressed the most recent chunk, leaving the client with stale text. // Only flush if the buffer has grown since the last broadcast to avoid duplicates. - if ( - text && - !shouldSuppressSilent && - !shouldSuppressSilentLeadFragment && - !shouldSuppressHeartbeatStreaming - ) { - const lastBroadcastLen = chatRunState.deltaLastBroadcastLen.get(clientRunId) ?? 0; - if (text.length > lastBroadcastLen) { - const flushPayload = { - runId: clientRunId, - sessionKey, - seq, - state: "delta" as const, - message: { - role: "assistant", - content: [{ type: "text", text }], - timestamp: Date.now(), - }, - }; - broadcast("chat", flushPayload, { dropIfSlow: true }); - nodeSendToSession(sessionKey, "chat", flushPayload); - } - } + flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, sourceRunId, seq); chatRunState.deltaLastBroadcastLen.delete(clientRunId); chatRunState.buffers.delete(clientRunId); chatRunState.deltaSentAt.delete(clientRunId); @@ -542,6 +569,12 @@ export function createAgentEventHandler({ } agentRunSeq.set(evt.runId, evt.seq); if (isToolEvent) { + const toolPhase = typeof evt.data?.phase === "string" ? evt.data.phase : ""; + // Flush pending assistant text before tool-start events so clients can + // render complete pre-tool text above tool cards (not truncated by delta throttle). + if (toolPhase === "start" && isControlUiVisible && sessionKey && !isAborted) { + flushBufferedChatDeltaIfNeeded(sessionKey, clientRunId, evt.runId, evt.seq); + } // Always broadcast tool events to registered WS recipients with // tool-events capability, regardless of verboseLevel. The verbose // setting only controls whether tool details are sent as channel