From 98ff56d70e330e421312cd09c6553f2e09af9fd8 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 3 Jun 2026 05:02:06 -0700 Subject: [PATCH] perf(ui): trace chat send server milestones Add operator-only Control UI chat send timing milestones across gateway dispatch, model selection, agent-run start, dispatch completion, and post-dispatch completion. The Control UI records these server phases into the existing chat send timing buffer, and the gateway broadcast guard now scopes the new timing event with other read-visible chat events. --- src/gateway/gateway-misc.test.ts | 9 +- src/gateway/server-broadcast.ts | 1 + src/gateway/server-methods/chat.ts | 92 +++++++++ .../server.chat.gateway-server-chat-b.test.ts | 188 ++++++++++++++++-- ui/src/ui/app-chat.test.ts | 53 +++++ ui/src/ui/app-chat.ts | 93 +++++++++ ui/src/ui/app-gateway.ts | 9 + ui/src/ui/e2e/chat-flow.e2e.test.ts | 22 ++ 8 files changed, 446 insertions(+), 21 deletions(-) diff --git a/src/gateway/gateway-misc.test.ts b/src/gateway/gateway-misc.test.ts index fc1f5f50bd1..8de06d8e789 100644 --- a/src/gateway/gateway-misc.test.ts +++ b/src/gateway/gateway-misc.test.ts @@ -376,6 +376,7 @@ function broadcastChatClassEvents( ) { broadcast("chat", chatPayload()); broadcast("agent", { type: "status", sessionKey: "agent:main:main" }); + broadcast("chat.send_timing", { phase: "dispatch-started", runId: "run-1" }); broadcast("chat.side_result", chatSideResultPayload()); } @@ -427,10 +428,10 @@ describe("gateway broadcaster", () => { expect(pairingSocket.send).not.toHaveBeenCalled(); expect(nodeSocket.send).not.toHaveBeenCalled(); - expect(readSocket.send).toHaveBeenCalledTimes(3); - expect(writeSocket.send).toHaveBeenCalledTimes(3); - expect(adminSocket.send).toHaveBeenCalledTimes(3); - const expectedEvents = ["chat", "agent", "chat.side_result"]; + expect(readSocket.send).toHaveBeenCalledTimes(4); + expect(writeSocket.send).toHaveBeenCalledTimes(4); + expect(adminSocket.send).toHaveBeenCalledTimes(4); + const expectedEvents = ["chat", "agent", "chat.send_timing", "chat.side_result"]; expectSentEvents(readSocket, expectedEvents); expectSentEvents(writeSocket, expectedEvents); expectSentEvents(adminSocket, expectedEvents); diff --git a/src/gateway/server-broadcast.ts b/src/gateway/server-broadcast.ts index 15a428fe362..2437e028ca3 100644 --- a/src/gateway/server-broadcast.ts +++ b/src/gateway/server-broadcast.ts @@ -21,6 +21,7 @@ import { logWs, shouldLogWs, summarizeAgentEventForWsLog } from "./ws-log.js"; const EVENT_SCOPE_GUARDS: Record = { agent: [READ_SCOPE], chat: [READ_SCOPE], + "chat.send_timing": [READ_SCOPE], "chat.side_result": [READ_SCOPE], cron: [READ_SCOPE], health: [], diff --git a/src/gateway/server-methods/chat.ts b/src/gateway/server-methods/chat.ts index 9c07f26f420..06a3407d98a 100644 --- a/src/gateway/server-methods/chat.ts +++ b/src/gateway/server-methods/chat.ts @@ -166,6 +166,7 @@ import { import { loadOptionalServerMethodModelCatalog } from "./optional-model-catalog.js"; import { hasTrackedActiveSessionRun } from "./session-active-runs.js"; import type { + GatewayClient, GatewayRequestContext, GatewayRequestHandlerOptions, GatewayRequestHandlers, @@ -219,6 +220,13 @@ type ChatSendAckServerTiming = { prepareAttachmentsMs?: number; }; +type ChatSendServerTimingPhase = + | "dispatch-started" + | "model-selected" + | "agent-run-started" + | "dispatch-completed" + | "post-dispatch-completed"; + function roundedChatSendTimingMs(value: number): number { return Math.max(0, Math.round(value * 1000) / 1000); } @@ -245,6 +253,47 @@ function shouldIncludeChatSendAckServerTiming(client?: { return isOperatorUiClient(client); } +function emitOperatorChatSendServerTiming(params: { + context: Pick; + client?: GatewayClient | null; + phase: ChatSendServerTimingPhase; + runId: string; + sessionKey: string; + agentId?: string; + receivedAtMs: number; + ackedAtMs: number; + dispatchStartedAtMs?: number; + extra?: Record; +}) { + const connId = + typeof params.client?.connId === "string" && params.client.connId.trim() + ? params.client.connId.trim() + : undefined; + if (!connId || !isOperatorUiClient(params.client?.connect?.client)) { + return; + } + const nowMs = performance.now(); + params.context.broadcastToConnIds( + "chat.send_timing", + { + phase: params.phase, + runId: params.runId, + sessionKey: params.sessionKey, + ...(params.agentId ? { agentId: params.agentId } : {}), + ackToPhaseMs: roundedChatSendTimingMs(nowMs - params.ackedAtMs), + receivedToPhaseMs: roundedChatSendTimingMs(nowMs - params.receivedAtMs), + ...(params.dispatchStartedAtMs !== undefined + ? { + dispatchStartedToPhaseMs: roundedChatSendTimingMs(nowMs - params.dispatchStartedAtMs), + } + : {}), + ...params.extra, + }, + new Set([connId]), + { dropIfSlow: true }, + ); +} + async function handleChatMetadataRequest({ params, respond, @@ -3392,6 +3441,7 @@ export const chatHandlers: GatewayRequestHandlers = { { config: cfg }, ); respond(true, ackPayload, undefined, { runId: clientRunId }); + const chatSendAckedAtMs = performance.now(); const persistedImagesPromise = persistChatSendImages({ images: parsedImages, imageOrder, @@ -3693,6 +3743,26 @@ export const chatHandlers: GatewayRequestHandlers = { }, }); + const emitServerTiming = ( + phase: ChatSendServerTimingPhase, + extra?: Record, + dispatchStartedAtMs?: number, + ) => { + emitOperatorChatSendServerTiming({ + context, + client, + phase, + runId: clientRunId, + sessionKey, + agentId, + receivedAtMs: chatSendReceivedAtMs, + ackedAtMs: chatSendAckedAtMs, + dispatchStartedAtMs, + extra, + }); + }; + const dispatchStartedAtMs = performance.now(); + emitServerTiming("dispatch-started"); void measureDiagnosticsTimelineSpan( "gateway.chat_send.dispatch_inbound", async () => { @@ -3721,6 +3791,11 @@ export const chatHandlers: GatewayRequestHandlers = { userTurnTranscriptRecorder: userTurnRecorder, onAgentRunStart: (runId) => { agentRunStarted = true; + emitServerTiming( + "agent-run-started", + runId !== clientRunId ? { agentRunId: runId } : undefined, + dispatchStartedAtMs, + ); const connId = typeof client?.connId === "string" ? client.connId : undefined; const wantsToolEvents = hasGatewayClientCap( client?.connect?.caps, @@ -3761,6 +3836,14 @@ export const chatHandlers: GatewayRequestHandlers = { }), }); onModelSelected(modelSelection); + emitServerTiming( + "model-selected", + { + provider: modelSelection.provider, + model: modelSelection.model, + }, + dispatchStartedAtMs, + ); }, }, }); @@ -3776,6 +3859,8 @@ export const chatHandlers: GatewayRequestHandlers = { }, ) .then(async () => { + emitServerTiming("dispatch-completed", undefined, dispatchStartedAtMs); + const postDispatchStartedAtMs = performance.now(); await measureDiagnosticsTimelineSpan( "gateway.chat_send.post_dispatch", async () => { @@ -4362,6 +4447,13 @@ export const chatHandlers: GatewayRequestHandlers = { attributes: chatSendTraceAttributes, }, ); + emitServerTiming( + "post-dispatch-completed", + { + postDispatchMs: roundedChatSendTimingMs(performance.now() - postDispatchStartedAtMs), + }, + dispatchStartedAtMs, + ); }) .catch(async (err: unknown) => { const emitAfterError = diff --git a/src/gateway/server.chat.gateway-server-chat-b.test.ts b/src/gateway/server.chat.gateway-server-chat-b.test.ts index 4b2b1612914..460576fdeac 100644 --- a/src/gateway/server.chat.gateway-server-chat-b.test.ts +++ b/src/gateway/server.chat.gateway-server-chat-b.test.ts @@ -861,23 +861,26 @@ describe("gateway server chat", () => { }); const first = Promise.resolve(callSend("first", "idem-active-a")); - await vi.waitFor(() => { - expect(responses).toEqual([ - { - id: "first", - ok: true, - payload: expect.objectContaining({ - runId: "idem-active-a", - status: "started", - serverTiming: { - receivedToAckMs: expect.any(Number), - loadSessionMs: expect.any(Number), - }, - }), - error: undefined, - }, - ]); - }, FAST_WAIT_OPTS); + await vi.waitFor( + () => { + expect(responses).toEqual([ + { + id: "first", + ok: true, + payload: expect.objectContaining({ + runId: "idem-active-a", + status: "started", + serverTiming: { + receivedToAckMs: expect.any(Number), + loadSessionMs: expect.any(Number), + }, + }), + error: undefined, + }, + ]); + }, + { timeout: 2_000, interval: 5 }, + ); await callSend("duplicate", "idem-active-b"); @@ -1053,6 +1056,157 @@ describe("gateway server chat", () => { } }); + test("chat.send emits operator-only post-ACK server timing milestones", async () => { + const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-")); + try { + testState.sessionStorePath = path.join(sessionDir, "sessions.json"); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + }); + + const responses: Array<{ ok: boolean; payload?: unknown; error?: unknown }> = []; + const broadcastToConnIds = vi.fn(); + const context = { + loadGatewayModelCatalog: vi.fn(), + logGateway: { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }, + agentRunSeq: new Map(), + chatAbortControllers: new Map(), + chatAbortedRuns: new Map(), + chatRunBuffers: new Map(), + chatDeltaSentAt: new Map(), + chatDeltaLastBroadcastLen: new Map(), + chatDeltaLastBroadcastText: new Map(), + agentDeltaSentAt: new Map(), + bufferedAgentEvents: new Map(), + clearChatRunState: vi.fn(), + addChatRun: vi.fn(), + removeChatRun: vi.fn(), + broadcast: vi.fn(), + broadcastToConnIds, + nodeSendToSession: vi.fn(), + registerToolEventRecipient: vi.fn(), + dedupe: new Map(), + } as unknown as GatewayRequestContext; + dispatchInboundMessageMock.mockImplementationOnce(async (args: unknown) => { + const replyOptions = (args as { replyOptions?: GetReplyOptions }).replyOptions; + replyOptions?.onModelSelected?.({ + provider: "openai", + model: "gpt-5.5", + thinkLevel: undefined, + }); + replyOptions?.onAgentRunStart?.("agent-run-1"); + return {}; + }); + + const { chatHandlers } = await import("./server-methods/chat.js"); + await chatHandlers["chat.send"]({ + req: { + type: "req", + id: "operator-timing", + method: "chat.send", + params: { + sessionKey: "main", + message: "measure", + idempotencyKey: "idem-server-timing", + }, + }, + params: { + sessionKey: "main", + message: "measure", + idempotencyKey: "idem-server-timing", + }, + client: { + connId: "conn-control-ui", + connect: { + client: { + id: GATEWAY_CLIENT_NAMES.CONTROL_UI, + mode: GATEWAY_CLIENT_MODES.WEBCHAT, + }, + scopes: ["operator.write"], + }, + } as never, + isWebchatConnect: () => true, + respond: ((ok, payload, error) => { + responses.push({ ok, payload, error }); + }) as RespondFn, + context, + }); + + expect(responses).toEqual([ + { + ok: true, + payload: expect.objectContaining({ + runId: "idem-server-timing", + status: "started", + serverTiming: { + receivedToAckMs: expect.any(Number), + loadSessionMs: expect.any(Number), + }, + }), + error: undefined, + }, + ]); + await vi.waitFor( + () => { + const phases = broadcastToConnIds.mock.calls + .filter(([event]) => event === "chat.send_timing") + .map(([, payload]) => (payload as { phase?: unknown }).phase); + expect(phases).toEqual( + expect.arrayContaining([ + "dispatch-started", + "model-selected", + "agent-run-started", + "dispatch-completed", + "post-dispatch-completed", + ]), + ); + }, + { timeout: 2_000, interval: 5 }, + ); + for (const [event, payload, connIds, opts] of broadcastToConnIds.mock.calls) { + expect(event).toBe("chat.send_timing"); + expect(connIds).toEqual(new Set(["conn-control-ui"])); + expect(opts).toEqual({ dropIfSlow: true }); + expect(payload).toMatchObject({ + runId: "idem-server-timing", + sessionKey: "agent:main:main", + ackToPhaseMs: expect.any(Number), + receivedToPhaseMs: expect.any(Number), + }); + } + const timingPayloads = broadcastToConnIds.mock.calls.map(([, payload]) => payload); + expect(timingPayloads).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + phase: "model-selected", + provider: "openai", + model: "gpt-5.5", + }), + expect.objectContaining({ + phase: "agent-run-started", + agentRunId: "agent-run-1", + dispatchStartedToPhaseMs: expect.any(Number), + }), + ]), + ); + } finally { + dispatchInboundMessageMock.mockReset(); + testState.sessionStorePath = undefined; + clearConfigCache(); + await fs.rm(sessionDir, { recursive: true, force: true }); + } + }); + test("chat.history backfills claude-cli sessions from Claude project files", async () => { await withGatewayChatHarness(async ({ ws, createSessionDir }) => { await connectOk(ws); diff --git a/ui/src/ui/app-chat.test.ts b/ui/src/ui/app-chat.test.ts index 5f7d6e92fae..d11663a04e2 100644 --- a/ui/src/ui/app-chat.test.ts +++ b/ui/src/ui/app-chat.test.ts @@ -52,6 +52,7 @@ let clearPendingQueueItemsForRun: typeof import("./app-chat.ts").clearPendingQue let removeQueuedMessage: typeof import("./app-chat.ts").removeQueuedMessage; let markQueuedChatSendsWaitingForReconnect: typeof import("./app-chat.ts").markQueuedChatSendsWaitingForReconnect; let retryReconnectableQueuedChatSends: typeof import("./app-chat.ts").retryReconnectableQueuedChatSends; +let recordChatSendServerTiming: typeof import("./app-chat.ts").recordChatSendServerTiming; async function loadChatHelpers(): Promise { ({ @@ -66,6 +67,7 @@ async function loadChatHelpers(): Promise { removeQueuedMessage, markQueuedChatSendsWaitingForReconnect, retryReconnectableQueuedChatSends, + recordChatSendServerTiming, } = await import("./app-chat.ts")); } @@ -1374,6 +1376,57 @@ describe("handleSendChat", () => { }); }); + it("records Gateway post-ACK server timing milestones for a chat send", async () => { + const request = vi.fn(async (method: string) => { + if (method === "chat.send") { + return { status: "started" }; + } + throw new Error(`Unexpected request: ${method}`); + }); + const host = makeHost({ + client: { request } as unknown as ChatHost["client"], + chatMessage: "measure server milestone", + eventLogBuffer: [], + tab: "debug", + }); + + await handleSendChat(host); + + const ack = eventPayloads(host, "control-ui.chat.send").find( + (payload) => payload.phase === "ack", + ); + const runId = typeof ack?.runId === "string" ? ack.runId : ""; + expect(runId).toMatch(uuidPattern); + + recordChatSendServerTiming(host, { + phase: "agent-run-started", + runId, + sessionKey: "agent:main", + agentId: "main", + ackToPhaseMs: 12, + receivedToPhaseMs: 25, + dispatchStartedToPhaseMs: 8, + agentRunId: "agent-run-1", + }); + + expect(eventPayloads(host, "control-ui.chat.send")).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + phase: "server-agent-run-started", + runId, + sessionKey: "agent:main", + agentId: "main", + ackStatus: "started", + serverPhase: "agent-run-started", + serverAckToPhaseMs: 12, + serverReceivedToPhaseMs: 25, + serverDispatchStartedToPhaseMs: 8, + agentRunId: "agent-run-1", + }), + ]), + ); + }); + it("records pending send paint timing before a delayed chat.send ACK", async () => { vi.spyOn(window, "requestAnimationFrame").mockImplementation((callback) => { queueMicrotask(() => callback(0)); diff --git a/ui/src/ui/app-chat.ts b/ui/src/ui/app-chat.ts index d78db3f7e88..9dc1b7c9712 100644 --- a/ui/src/ui/app-chat.ts +++ b/ui/src/ui/app-chat.ts @@ -597,6 +597,11 @@ type ChatSendTimingPhase = | "pending-painted" | "request-start" | "ack" + | "server-dispatch-started" + | "server-model-selected" + | "server-agent-run-started" + | "server-dispatch-completed" + | "server-post-dispatch-completed" | "first-assistant-visible" | "terminal-before-delta" | "queued-busy" @@ -617,6 +622,21 @@ type ChatSendTimingEntry = { firstAssistantVisibleRecorded?: boolean; }; +type ChatSendServerTimingPhase = + | "dispatch-started" + | "model-selected" + | "agent-run-started" + | "dispatch-completed" + | "post-dispatch-completed"; + +const CHAT_SEND_SERVER_TIMING_PHASES = new Set([ + "dispatch-started", + "model-selected", + "agent-run-started", + "dispatch-completed", + "post-dispatch-completed", +]); + function recordChatSendTiming( host: ChatHost, item: Pick< @@ -647,6 +667,79 @@ function recordChatSendTiming( ); } +function readChatSendServerTimingPhase(value: unknown): ChatSendServerTimingPhase | null { + return typeof value === "string" && + (CHAT_SEND_SERVER_TIMING_PHASES as ReadonlySet).has(value) + ? (value as ChatSendServerTimingPhase) + : null; +} + +function readChatSendTimingNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) && value >= 0 ? value : undefined; +} + +export function recordChatSendServerTiming(host: ChatHost, payload: unknown) { + if (!payload || typeof payload !== "object") { + return; + } + const record = payload as Record; + const phase = readChatSendServerTimingPhase(record.phase); + const runId = typeof record.runId === "string" && record.runId.trim() ? record.runId.trim() : ""; + if (!phase || !runId) { + return; + } + const entry = host.chatSendTimingsByRun?.get(runId); + const nowMs = controlUiNowMs(); + const serverAckToPhaseMs = readChatSendTimingNumber(record.ackToPhaseMs); + const serverReceivedToPhaseMs = readChatSendTimingNumber(record.receivedToPhaseMs); + const serverDispatchStartedToPhaseMs = readChatSendTimingNumber(record.dispatchStartedToPhaseMs); + const serverPostDispatchMs = readChatSendTimingNumber(record.postDispatchMs); + const durationMs = + entry?.submittedAtMs !== undefined + ? roundedControlUiDurationMs(nowMs - entry.submittedAtMs) + : serverAckToPhaseMs; + if (durationMs === undefined) { + return; + } + recordControlUiPerformanceEvent( + host as Parameters[0], + "control-ui.chat.send", + { + phase: `server-${phase}`, + durationMs, + runId, + sessionKey: + entry?.sessionKey ?? + (typeof record.sessionKey === "string" && record.sessionKey.trim() + ? record.sessionKey.trim() + : undefined), + agentId: + entry?.agentId ?? + (typeof record.agentId === "string" && record.agentId.trim() + ? record.agentId.trim() + : undefined), + sendAttempts: entry?.sendAttempts ?? 0, + sendState: entry?.sendState, + ackStatus: entry?.ackStatus, + serverPhase: phase, + ...(serverAckToPhaseMs !== undefined ? { serverAckToPhaseMs } : {}), + ...(serverReceivedToPhaseMs !== undefined ? { serverReceivedToPhaseMs } : {}), + ...(serverDispatchStartedToPhaseMs !== undefined ? { serverDispatchStartedToPhaseMs } : {}), + ...(serverPostDispatchMs !== undefined ? { serverPostDispatchMs } : {}), + ...(typeof record.provider === "string" && record.provider.trim() + ? { provider: record.provider.trim() } + : {}), + ...(typeof record.model === "string" && record.model.trim() + ? { model: record.model.trim() } + : {}), + ...(typeof record.agentRunId === "string" && record.agentRunId.trim() + ? { agentRunId: record.agentRunId.trim() } + : {}), + }, + { console: false, maxBufferedEventsForType: 40 }, + ); +} + function ensureChatSendTimingEntries(host: ChatHost): Map { if (host.chatSendTimingsByRun) { return host.chatSendTimingsByRun; diff --git a/ui/src/ui/app-gateway.ts b/ui/src/ui/app-gateway.ts index c0f07ad9854..0668ae9db83 100644 --- a/ui/src/ui/app-gateway.ts +++ b/ui/src/ui/app-gateway.ts @@ -9,6 +9,7 @@ import { flushChatQueueForEvent, hasReconnectableQueuedChatSends, markQueuedChatSendsWaitingForReconnect, + recordChatSendServerTiming, recordFirstAssistantChatTiming, refreshChatAvatar, scopedAgentListParamsForRefreshTarget, @@ -1222,6 +1223,14 @@ function handleGatewayEventUnsafe(host: GatewayHost, evt: GatewayEventFrame) { return; } + if (evt.event === "chat.send_timing") { + recordChatSendServerTiming( + host as unknown as Parameters[0], + evt.payload, + ); + return; + } + if (evt.event === "chat.side_result") { const sideResult = parseChatSideResult(evt.payload); if ( diff --git a/ui/src/ui/e2e/chat-flow.e2e.test.ts b/ui/src/ui/e2e/chat-flow.e2e.test.ts index 85cf3eb9f65..7451dba1264 100644 --- a/ui/src/ui/e2e/chat-flow.e2e.test.ts +++ b/ui/src/ui/e2e/chat-flow.e2e.test.ts @@ -374,6 +374,17 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => { const runId = requireString(params.idempotencyKey, "chat send idempotency key"); await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 }); await waitForControlUiChatSendPhases(page, runId, ["ack"]); + await gateway.emitGatewayEvent("chat.send_timing", { + phase: "agent-run-started", + runId, + agentId: "ops", + sessionKey: "global", + ackToPhaseMs: 11, + receivedToPhaseMs: 20, + dispatchStartedToPhaseMs: 7, + agentRunId: "agent-run-e2e", + }); + await waitForControlUiChatSendPhases(page, runId, ["server-agent-run-started"]); await gateway.emitGatewayEvent("chat", { deltaText: "First token visible.", message: { @@ -391,6 +402,7 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => { "pending-visible", "request-start", "ack", + "server-agent-run-started", "first-assistant-visible", ]); const sendTimingEvents = (await controlUiEventPayloads(page, "control-ui.chat.send")).filter( @@ -415,6 +427,16 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => { sessionKey: "global", }); expect(ackTiming?.requestDurationMs).toEqual(expect.any(Number)); + expect(sendTimingByPhase.get("server-agent-run-started")).toMatchObject({ + agentRunId: "agent-run-e2e", + agentId: "ops", + runId, + serverAckToPhaseMs: 11, + serverDispatchStartedToPhaseMs: 7, + serverPhase: "agent-run-started", + serverReceivedToPhaseMs: 20, + sessionKey: "global", + }); const firstVisibleTiming = sendTimingByPhase.get("first-assistant-visible"); expect(firstVisibleTiming).toMatchObject({ ackStatus: "started",