fix(ui): stream tool events live in control chat (#39104)

Land #39104 by @jakepresent.

Co-authored-by: Jake Present <jakepresent@microsoft.com>
This commit is contained in:
Peter Steinberger
2026-03-07 19:26:42 +00:00
parent 499c1ee6e3
commit de2ccffec1
9 changed files with 94 additions and 19 deletions

View File

@@ -258,22 +258,31 @@ function handleTerminalChatEvent(
host: GatewayHost,
payload: ChatEventPayload | undefined,
state: ReturnType<typeof handleChatEvent>,
) {
): boolean {
if (state !== "final" && state !== "error" && state !== "aborted") {
return;
return false;
}
resetToolStream(host as unknown as Parameters<typeof resetToolStream>[0]);
// Check if tool events were seen before resetting (resetToolStream clears toolStreamOrder).
const toolHost = host as unknown as Parameters<typeof resetToolStream>[0];
const hadToolEvents = toolHost.toolStreamOrder.length > 0;
resetToolStream(toolHost);
void flushChatQueueForEvent(host as unknown as Parameters<typeof flushChatQueueForEvent>[0]);
const runId = payload?.runId;
if (!runId || !host.refreshSessionsAfterChat.has(runId)) {
return;
if (runId && host.refreshSessionsAfterChat.has(runId)) {
host.refreshSessionsAfterChat.delete(runId);
if (state === "final") {
void loadSessions(host as unknown as OpenClawApp, {
activeMinutes: CHAT_SESSIONS_ACTIVE_MINUTES,
});
}
}
host.refreshSessionsAfterChat.delete(runId);
if (state === "final") {
void loadSessions(host as unknown as OpenClawApp, {
activeMinutes: CHAT_SESSIONS_ACTIVE_MINUTES,
});
// Reload history when tools were used so the persisted tool results
// replace the now-cleared streaming state.
if (hadToolEvents && state === "final") {
void loadChatHistory(host as unknown as OpenClawApp);
return true;
}
return false;
}
function handleChatGatewayEvent(host: GatewayHost, payload: ChatEventPayload | undefined) {
@@ -284,8 +293,8 @@ function handleChatGatewayEvent(host: GatewayHost, payload: ChatEventPayload | u
);
}
const state = handleChatEvent(host as unknown as OpenClawApp, payload);
handleTerminalChatEvent(host, payload, state);
if (state === "final" && shouldReloadHistoryForFinalEvent(payload)) {
const historyReloaded = handleTerminalChatEvent(host, payload, state);
if (state === "final" && !historyReloaded && shouldReloadHistoryForFinalEvent(payload)) {
void loadChatHistory(host as unknown as OpenClawApp);
}
}
@@ -307,6 +316,17 @@ function handleGatewayEventUnsafe(host: GatewayHost, evt: GatewayEventFrame) {
host as unknown as Parameters<typeof handleAgentEvent>[0],
evt.payload as AgentEventPayload | undefined,
);
// Reload history after each tool result so the persisted text + tool
// output replaces any truncated streaming fragments.
const agentPayload = evt.payload as AgentEventPayload | undefined;
const toolData = agentPayload?.data;
if (
agentPayload?.stream === "tool" &&
typeof toolData?.phase === "string" &&
toolData.phase === "result"
) {
void loadChatHistory(host as unknown as OpenClawApp);
}
return;
}

View File

@@ -1029,6 +1029,7 @@ export function renderApp(state: AppViewState) {
assistantAvatarUrl: chatAvatarUrl,
messages: state.chatMessages,
toolMessages: state.chatToolMessages,
streamSegments: state.chatStreamSegments,
stream: state.chatStream,
streamStartedAt: state.chatStreamStartedAt,
draft: state.chatMessage,

View File

@@ -13,6 +13,9 @@ function createHost(overrides?: Partial<MutableHost>): MutableHost {
return {
sessionKey: "main",
chatRunId: null,
chatStream: null,
chatStreamStartedAt: null,
chatStreamSegments: [],
toolStreamById: new Map<string, ToolStreamEntry>(),
toolStreamOrder: [],
chatToolMessages: [],

View File

@@ -28,6 +28,9 @@ export type ToolStreamEntry = {
type ToolStreamHost = {
sessionKey: string;
chatRunId: string | null;
chatStream: string | null;
chatStreamStartedAt: number | null;
chatStreamSegments: Array<{ text: string; ts: number }>;
toolStreamById: Map<string, ToolStreamEntry>;
toolStreamOrder: string[];
chatToolMessages: Record<string, unknown>[];
@@ -231,10 +234,14 @@ export function scheduleToolStreamSync(host: ToolStreamHost, force = false) {
}
export function resetToolStream(host: ToolStreamHost) {
if (host.toolStreamSyncTimer != null) {
clearTimeout(host.toolStreamSyncTimer);
host.toolStreamSyncTimer = null;
}
host.toolStreamById.clear();
host.toolStreamOrder = [];
host.chatToolMessages = [];
flushToolStreamSync(host);
host.chatStreamSegments = [];
}
export type CompactionStatus = {
@@ -401,11 +408,14 @@ export function handleAgentEvent(host: ToolStreamHost, payload?: AgentEventPaylo
if (payload.stream !== "tool") {
return;
}
const accepted = resolveAcceptedSession(host, payload);
if (!accepted.accepted) {
// Filter by session only. Don't check chatRunId because the client sets it
// to a client-generated UUID (via generateUUID in sendChatMessage), while
// tool events arrive with the server's engine runId — they can never match.
const sessionKey = typeof payload.sessionKey === "string" ? payload.sessionKey : undefined;
if (sessionKey && sessionKey !== host.sessionKey) {
return;
}
const sessionKey = accepted.sessionKey;
const data = payload.data ?? {};
const toolCallId = typeof data.toolCallId === "string" ? data.toolCallId : "";
@@ -425,6 +435,13 @@ export function handleAgentEvent(host: ToolStreamHost, payload?: AgentEventPaylo
const now = Date.now();
let entry = host.toolStreamById.get(toolCallId);
if (!entry) {
// Commit any in-progress streaming text as a segment so it renders
// above the tool card instead of below it.
if (host.chatStream && host.chatStream.trim().length > 0) {
host.chatStreamSegments = [...host.chatStreamSegments, { text: host.chatStream, ts: now }];
host.chatStream = null;
host.chatStreamStartedAt = null;
}
entry = {
toolCallId,
runId: payload.runId,

View File

@@ -144,6 +144,7 @@ export class OpenClawApp extends LitElement {
@state() chatMessage = "";
@state() chatMessages: unknown[] = [];
@state() chatToolMessages: unknown[] = [];
@state() chatStreamSegments: Array<{ text: string; ts: number }> = [];
@state() chatStream: string | null = null;
@state() chatStreamStartedAt: number | null = null;
@state() chatRunId: string | null = null;

View File

@@ -1,3 +1,4 @@
import { resetToolStream } from "../app-tool-stream.ts";
import { extractText } from "../chat/message-extract.ts";
import type { GatewayBrowserClient } from "../gateway.ts";
import type { ChatAttachment } from "../ui-types.ts";
@@ -50,6 +51,18 @@ export type ChatEventPayload = {
errorMessage?: string;
};
function maybeResetToolStream(state: ChatState) {
const toolHost = state as ChatState & Partial<Parameters<typeof resetToolStream>[0]>;
if (
toolHost.toolStreamById instanceof Map &&
Array.isArray(toolHost.toolStreamOrder) &&
Array.isArray(toolHost.chatToolMessages) &&
Array.isArray(toolHost.chatStreamSegments)
) {
resetToolStream(toolHost as Parameters<typeof resetToolStream>[0]);
}
}
export async function loadChatHistory(state: ChatState) {
if (!state.client || !state.connected) {
return;
@@ -67,6 +80,11 @@ export async function loadChatHistory(state: ChatState) {
const messages = Array.isArray(res.messages) ? res.messages : [];
state.chatMessages = messages.filter((message) => !isAssistantSilentReply(message));
state.chatThinkingLevel = res.thinkingLevel ?? null;
// Clear all streaming state — history includes tool results and text
// inline, so keeping streaming artifacts would cause duplicates.
maybeResetToolStream(state);
state.chatStream = null;
state.chatStreamStartedAt = null;
} catch (err) {
state.lastError = String(err);
} finally {

View File

@@ -241,7 +241,7 @@ export class GatewayBrowserClient {
role,
scopes,
device,
caps: [],
caps: ["tool-events"],
auth,
userAgent: navigator.userAgent,
locale: navigator.language,

View File

@@ -43,6 +43,7 @@ export type ChatProps = {
fallbackStatus?: FallbackIndicatorStatus | null;
messages: unknown[];
toolMessages: unknown[];
streamSegments: Array<{ text: string; ts: number }>;
stream: string | null;
streamStartedAt: number | null;
assistantAvatarUrl?: string | null;
@@ -566,8 +567,21 @@ function buildChatItems(props: ChatProps): Array<ChatItem | MessageGroup> {
message: msg,
});
}
if (props.showThinking) {
for (let i = 0; i < tools.length; i++) {
// Interleave stream segments and tool cards in order. Each segment
// contains text that was streaming before the corresponding tool started.
// This ensures correct visual ordering: text → tool → text → tool → ...
const segments = props.streamSegments ?? [];
const maxLen = Math.max(segments.length, tools.length);
for (let i = 0; i < maxLen; i++) {
if (i < segments.length && segments[i].text.trim().length > 0) {
items.push({
kind: "stream" as const,
key: `stream-seg:${props.sessionKey}:${i}`,
text: segments[i].text,
startedAt: segments[i].ts,
});
}
if (i < tools.length) {
items.push({
kind: "message",
key: messageKey(tools[i], i + history.length),