perf(ui): trace chat send server milestones

Add operator-only Control UI chat send timing milestones across gateway dispatch, model selection, agent-run start, dispatch completion, and post-dispatch completion. The Control UI records these server phases into the existing chat send timing buffer, and the gateway broadcast guard now scopes the new timing event with other read-visible chat events.
This commit is contained in:
Vincent Koc
2026-06-03 05:02:06 -07:00
committed by GitHub
parent 03ccdb9fbc
commit 98ff56d70e
8 changed files with 446 additions and 21 deletions

View File

@@ -376,6 +376,7 @@ function broadcastChatClassEvents(
) {
broadcast("chat", chatPayload());
broadcast("agent", { type: "status", sessionKey: "agent:main:main" });
broadcast("chat.send_timing", { phase: "dispatch-started", runId: "run-1" });
broadcast("chat.side_result", chatSideResultPayload());
}
@@ -427,10 +428,10 @@ describe("gateway broadcaster", () => {
expect(pairingSocket.send).not.toHaveBeenCalled();
expect(nodeSocket.send).not.toHaveBeenCalled();
expect(readSocket.send).toHaveBeenCalledTimes(3);
expect(writeSocket.send).toHaveBeenCalledTimes(3);
expect(adminSocket.send).toHaveBeenCalledTimes(3);
const expectedEvents = ["chat", "agent", "chat.side_result"];
expect(readSocket.send).toHaveBeenCalledTimes(4);
expect(writeSocket.send).toHaveBeenCalledTimes(4);
expect(adminSocket.send).toHaveBeenCalledTimes(4);
const expectedEvents = ["chat", "agent", "chat.send_timing", "chat.side_result"];
expectSentEvents(readSocket, expectedEvents);
expectSentEvents(writeSocket, expectedEvents);
expectSentEvents(adminSocket, expectedEvents);

View File

@@ -21,6 +21,7 @@ import { logWs, shouldLogWs, summarizeAgentEventForWsLog } from "./ws-log.js";
const EVENT_SCOPE_GUARDS: Record<string, string[]> = {
agent: [READ_SCOPE],
chat: [READ_SCOPE],
"chat.send_timing": [READ_SCOPE],
"chat.side_result": [READ_SCOPE],
cron: [READ_SCOPE],
health: [],

View File

@@ -166,6 +166,7 @@ import {
import { loadOptionalServerMethodModelCatalog } from "./optional-model-catalog.js";
import { hasTrackedActiveSessionRun } from "./session-active-runs.js";
import type {
GatewayClient,
GatewayRequestContext,
GatewayRequestHandlerOptions,
GatewayRequestHandlers,
@@ -219,6 +220,13 @@ type ChatSendAckServerTiming = {
prepareAttachmentsMs?: number;
};
// Server-side milestones of a chat send; each value is used as the `phase`
// field of a `chat.send_timing` event emitted by the gateway.
type ChatSendServerTimingPhase =
| "dispatch-started"
| "model-selected"
| "agent-run-started"
| "dispatch-completed"
| "post-dispatch-completed";
/**
 * Rounds a millisecond duration to three decimal places and clamps negative
 * results to zero.
 */
function roundedChatSendTimingMs(value: number): number {
  const thousandths = Math.round(value * 1000);
  const roundedMs = thousandths / 1000;
  return Math.max(0, roundedMs);
}
@@ -245,6 +253,47 @@ function shouldIncludeChatSendAckServerTiming(client?: {
return isOperatorUiClient(client);
}
/**
 * Sends one `chat.send_timing` milestone to the connection that issued the
 * chat send.
 *
 * Nothing is emitted unless the client carries a non-blank `connId` and
 * `isOperatorUiClient` accepts its connect metadata. The event is addressed
 * to that single connection and sent with `dropIfSlow`.
 *
 * `receivedAtMs`, `ackedAtMs` and `dispatchStartedAtMs` are subtracted from
 * `performance.now()`, so they must be taken from the same clock.
 *
 * `extra` carries phase-specific fields. It is spread before the canonical
 * fields so a colliding key (for example `phase` or `runId`) can never
 * overwrite them.
 */
function emitOperatorChatSendServerTiming(params: {
  context: Pick<GatewayRequestContext, "broadcastToConnIds">;
  client?: GatewayClient | null;
  phase: ChatSendServerTimingPhase;
  runId: string;
  sessionKey: string;
  agentId?: string;
  receivedAtMs: number;
  ackedAtMs: number;
  dispatchStartedAtMs?: number;
  extra?: Record<string, string | number>;
}) {
  const connId =
    typeof params.client?.connId === "string" && params.client.connId.trim()
      ? params.client.connId.trim()
      : undefined;
  if (!connId || !isOperatorUiClient(params.client?.connect?.client)) {
    return;
  }
  const nowMs = performance.now();
  params.context.broadcastToConnIds(
    "chat.send_timing",
    {
      // Extras first: the canonical fields below always win on key collision.
      ...params.extra,
      phase: params.phase,
      runId: params.runId,
      sessionKey: params.sessionKey,
      ...(params.agentId ? { agentId: params.agentId } : {}),
      ackToPhaseMs: roundedChatSendTimingMs(nowMs - params.ackedAtMs),
      receivedToPhaseMs: roundedChatSendTimingMs(nowMs - params.receivedAtMs),
      ...(params.dispatchStartedAtMs !== undefined
        ? {
            dispatchStartedToPhaseMs: roundedChatSendTimingMs(nowMs - params.dispatchStartedAtMs),
          }
        : {}),
    },
    // Only the requesting connection receives the milestone.
    new Set([connId]),
    { dropIfSlow: true },
  );
}
async function handleChatMetadataRequest({
params,
respond,
@@ -3392,6 +3441,7 @@ export const chatHandlers: GatewayRequestHandlers = {
{ config: cfg },
);
respond(true, ackPayload, undefined, { runId: clientRunId });
const chatSendAckedAtMs = performance.now();
const persistedImagesPromise = persistChatSendImages({
images: parsedImages,
imageOrder,
@@ -3693,6 +3743,26 @@ export const chatHandlers: GatewayRequestHandlers = {
},
});
// Binds the per-request identifiers and timestamps once, so each milestone
// call site only supplies the phase, optional extra fields, and (when known)
// the dispatch start time.
const emitServerTiming = (
  phase: ChatSendServerTimingPhase,
  extra?: Record<string, string | number>,
  dispatchStartedAtMs?: number,
): void =>
  emitOperatorChatSendServerTiming({
    context,
    client,
    phase,
    runId: clientRunId,
    sessionKey,
    agentId,
    receivedAtMs: chatSendReceivedAtMs,
    ackedAtMs: chatSendAckedAtMs,
    dispatchStartedAtMs,
    extra,
  });
const dispatchStartedAtMs = performance.now();
emitServerTiming("dispatch-started");
void measureDiagnosticsTimelineSpan(
"gateway.chat_send.dispatch_inbound",
async () => {
@@ -3721,6 +3791,11 @@ export const chatHandlers: GatewayRequestHandlers = {
userTurnTranscriptRecorder: userTurnRecorder,
onAgentRunStart: (runId) => {
agentRunStarted = true;
emitServerTiming(
"agent-run-started",
runId !== clientRunId ? { agentRunId: runId } : undefined,
dispatchStartedAtMs,
);
const connId = typeof client?.connId === "string" ? client.connId : undefined;
const wantsToolEvents = hasGatewayClientCap(
client?.connect?.caps,
@@ -3761,6 +3836,14 @@ export const chatHandlers: GatewayRequestHandlers = {
}),
});
onModelSelected(modelSelection);
emitServerTiming(
"model-selected",
{
provider: modelSelection.provider,
model: modelSelection.model,
},
dispatchStartedAtMs,
);
},
},
});
@@ -3776,6 +3859,8 @@ export const chatHandlers: GatewayRequestHandlers = {
},
)
.then(async () => {
emitServerTiming("dispatch-completed", undefined, dispatchStartedAtMs);
const postDispatchStartedAtMs = performance.now();
await measureDiagnosticsTimelineSpan(
"gateway.chat_send.post_dispatch",
async () => {
@@ -4362,6 +4447,13 @@ export const chatHandlers: GatewayRequestHandlers = {
attributes: chatSendTraceAttributes,
},
);
emitServerTiming(
"post-dispatch-completed",
{
postDispatchMs: roundedChatSendTimingMs(performance.now() - postDispatchStartedAtMs),
},
dispatchStartedAtMs,
);
})
.catch(async (err: unknown) => {
const emitAfterError =

View File

@@ -861,23 +861,26 @@ describe("gateway server chat", () => {
});
const first = Promise.resolve(callSend("first", "idem-active-a"));
await vi.waitFor(() => {
expect(responses).toEqual([
{
id: "first",
ok: true,
payload: expect.objectContaining({
runId: "idem-active-a",
status: "started",
serverTiming: {
receivedToAckMs: expect.any(Number),
loadSessionMs: expect.any(Number),
},
}),
error: undefined,
},
]);
}, FAST_WAIT_OPTS);
await vi.waitFor(
() => {
expect(responses).toEqual([
{
id: "first",
ok: true,
payload: expect.objectContaining({
runId: "idem-active-a",
status: "started",
serverTiming: {
receivedToAckMs: expect.any(Number),
loadSessionMs: expect.any(Number),
},
}),
error: undefined,
},
]);
},
{ timeout: 2_000, interval: 5 },
);
await callSend("duplicate", "idem-active-b");
@@ -1053,6 +1056,157 @@ describe("gateway server chat", () => {
}
});
// Calls the chat.send handler directly as a Control UI client and checks that
// every server timing milestone is sent, via broadcastToConnIds, to the
// caller's connection only.
test("chat.send emits operator-only post-ACK server timing milestones", async () => {
const sessionDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gw-"));
try {
testState.sessionStorePath = path.join(sessionDir, "sessions.json");
await writeSessionStore({
entries: {
main: {
sessionId: "sess-main",
updatedAt: Date.now(),
},
},
});
const responses: Array<{ ok: boolean; payload?: unknown; error?: unknown }> = [];
// Spy that captures the targeted broadcasts inspected by the assertions below.
const broadcastToConnIds = vi.fn();
// Hand-built request context made of mocks and empty maps.
const context = {
loadGatewayModelCatalog: vi.fn<GatewayRequestContext["loadGatewayModelCatalog"]>(),
logGateway: {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
},
agentRunSeq: new Map<string, number>(),
chatAbortControllers: new Map(),
chatAbortedRuns: new Map(),
chatRunBuffers: new Map(),
chatDeltaSentAt: new Map(),
chatDeltaLastBroadcastLen: new Map(),
chatDeltaLastBroadcastText: new Map(),
agentDeltaSentAt: new Map(),
bufferedAgentEvents: new Map(),
clearChatRunState: vi.fn(),
addChatRun: vi.fn(),
removeChatRun: vi.fn(),
broadcast: vi.fn(),
broadcastToConnIds,
nodeSendToSession: vi.fn(),
registerToolEventRecipient: vi.fn(),
dedupe: new Map(),
} as unknown as GatewayRequestContext;
// Simulated dispatch: invoke the model-selected and agent-run-start callbacks
// from the reply options, then resolve.
dispatchInboundMessageMock.mockImplementationOnce(async (args: unknown) => {
const replyOptions = (args as { replyOptions?: GetReplyOptions }).replyOptions;
replyOptions?.onModelSelected?.({
provider: "openai",
model: "gpt-5.5",
thinkLevel: undefined,
});
// The agent run id differs from the idempotency key used as the chat run id,
// so the milestone is expected to carry `agentRunId`.
replyOptions?.onAgentRunStart?.("agent-run-1");
return {};
});
const { chatHandlers } = await import("./server-methods/chat.js");
await chatHandlers["chat.send"]({
req: {
type: "req",
id: "operator-timing",
method: "chat.send",
params: {
sessionKey: "main",
message: "measure",
idempotencyKey: "idem-server-timing",
},
},
params: {
sessionKey: "main",
message: "measure",
idempotencyKey: "idem-server-timing",
},
// Control UI client identity with a connection id to target.
client: {
connId: "conn-control-ui",
connect: {
client: {
id: GATEWAY_CLIENT_NAMES.CONTROL_UI,
mode: GATEWAY_CLIENT_MODES.WEBCHAT,
},
scopes: ["operator.write"],
},
} as never,
isWebchatConnect: () => true,
respond: ((ok, payload, error) => {
responses.push({ ok, payload, error });
}) as RespondFn,
context,
});
// Exactly one response (the "started" ACK) is expected once the handler resolves.
expect(responses).toEqual([
{
ok: true,
payload: expect.objectContaining({
runId: "idem-server-timing",
status: "started",
serverTiming: {
receivedToAckMs: expect.any(Number),
loadSessionMs: expect.any(Number),
},
}),
error: undefined,
},
]);
// Poll until all five phases have been broadcast; arrayContaining does not
// assert their order.
await vi.waitFor(
() => {
const phases = broadcastToConnIds.mock.calls
.filter(([event]) => event === "chat.send_timing")
.map(([, payload]) => (payload as { phase?: unknown }).phase);
expect(phases).toEqual(
expect.arrayContaining([
"dispatch-started",
"model-selected",
"agent-run-started",
"dispatch-completed",
"post-dispatch-completed",
]),
);
},
{ timeout: 2_000, interval: 5 },
);
// Every targeted broadcast in this run must be a timing event addressed only
// to the calling connection, sent with dropIfSlow, and carrying the shared fields.
for (const [event, payload, connIds, opts] of broadcastToConnIds.mock.calls) {
expect(event).toBe("chat.send_timing");
expect(connIds).toEqual(new Set(["conn-control-ui"]));
expect(opts).toEqual({ dropIfSlow: true });
expect(payload).toMatchObject({
runId: "idem-server-timing",
sessionKey: "agent:main:main",
ackToPhaseMs: expect.any(Number),
receivedToPhaseMs: expect.any(Number),
});
}
// Phase-specific extras: provider/model on model-selected, agentRunId and the
// dispatch-relative duration on agent-run-started.
const timingPayloads = broadcastToConnIds.mock.calls.map(([, payload]) => payload);
expect(timingPayloads).toEqual(
expect.arrayContaining([
expect.objectContaining({
phase: "model-selected",
provider: "openai",
model: "gpt-5.5",
}),
expect.objectContaining({
phase: "agent-run-started",
agentRunId: "agent-run-1",
dispatchStartedToPhaseMs: expect.any(Number),
}),
]),
);
} finally {
// Reset the mock and state set up above, and remove the temp session directory.
dispatchInboundMessageMock.mockReset();
testState.sessionStorePath = undefined;
clearConfigCache();
await fs.rm(sessionDir, { recursive: true, force: true });
}
});
test("chat.history backfills claude-cli sessions from Claude project files", async () => {
await withGatewayChatHarness(async ({ ws, createSessionDir }) => {
await connectOk(ws);

View File

@@ -52,6 +52,7 @@ let clearPendingQueueItemsForRun: typeof import("./app-chat.ts").clearPendingQue
let removeQueuedMessage: typeof import("./app-chat.ts").removeQueuedMessage;
let markQueuedChatSendsWaitingForReconnect: typeof import("./app-chat.ts").markQueuedChatSendsWaitingForReconnect;
let retryReconnectableQueuedChatSends: typeof import("./app-chat.ts").retryReconnectableQueuedChatSends;
let recordChatSendServerTiming: typeof import("./app-chat.ts").recordChatSendServerTiming;
async function loadChatHelpers(): Promise<void> {
({
@@ -66,6 +67,7 @@ async function loadChatHelpers(): Promise<void> {
removeQueuedMessage,
markQueuedChatSendsWaitingForReconnect,
retryReconnectableQueuedChatSends,
recordChatSendServerTiming,
} = await import("./app-chat.ts"));
}
@@ -1374,6 +1376,57 @@ describe("handleSendChat", () => {
});
});
// Sends a chat message, then feeds a server timing payload for the same run
// into recordChatSendServerTiming and checks the resulting
// "control-ui.chat.send" event.
it("records Gateway post-ACK server timing milestones for a chat send", async () => {
// Stub client: only chat.send is answered; any other method fails the test.
const request = vi.fn(async (method: string) => {
if (method === "chat.send") {
return { status: "started" };
}
throw new Error(`Unexpected request: ${method}`);
});
const host = makeHost({
client: { request } as unknown as ChatHost["client"],
chatMessage: "measure server milestone",
eventLogBuffer: [],
tab: "debug",
});
await handleSendChat(host);
// Recover the run id from the recorded "ack" timing event so the server
// payload below refers to the same run.
const ack = eventPayloads(host, "control-ui.chat.send").find(
(payload) => payload.phase === "ack",
);
const runId = typeof ack?.runId === "string" ? ack.runId : "";
expect(runId).toMatch(uuidPattern);
recordChatSendServerTiming(host, {
phase: "agent-run-started",
runId,
sessionKey: "agent:main",
agentId: "main",
ackToPhaseMs: 12,
receivedToPhaseMs: 25,
dispatchStartedToPhaseMs: 8,
agentRunId: "agent-run-1",
});
// The recorded event prefixes the phase with "server-", renames the server
// durations with a "server" prefix, and keeps the ackStatus of the tracked send.
expect(eventPayloads(host, "control-ui.chat.send")).toEqual(
expect.arrayContaining([
expect.objectContaining({
phase: "server-agent-run-started",
runId,
sessionKey: "agent:main",
agentId: "main",
ackStatus: "started",
serverPhase: "agent-run-started",
serverAckToPhaseMs: 12,
serverReceivedToPhaseMs: 25,
serverDispatchStartedToPhaseMs: 8,
agentRunId: "agent-run-1",
}),
]),
);
});
it("records pending send paint timing before a delayed chat.send ACK", async () => {
vi.spyOn(window, "requestAnimationFrame").mockImplementation((callback) => {
queueMicrotask(() => callback(0));

View File

@@ -597,6 +597,11 @@ type ChatSendTimingPhase =
| "pending-painted"
| "request-start"
| "ack"
| "server-dispatch-started"
| "server-model-selected"
| "server-agent-run-started"
| "server-dispatch-completed"
| "server-post-dispatch-completed"
| "first-assistant-visible"
| "terminal-before-delta"
| "queued-busy"
@@ -617,6 +622,21 @@ type ChatSendTimingEntry = {
firstAssistantVisibleRecorded?: boolean;
};
// Single source of truth for the server phases accepted from
// `chat.send_timing` payloads. The union type and the runtime lookup set are
// both derived from this tuple so they cannot drift apart.
const CHAT_SEND_SERVER_TIMING_PHASE_LIST = [
  "dispatch-started",
  "model-selected",
  "agent-run-started",
  "dispatch-completed",
  "post-dispatch-completed",
] as const;

// Union of the phase names listed above.
type ChatSendServerTimingPhase = (typeof CHAT_SEND_SERVER_TIMING_PHASE_LIST)[number];

// Runtime membership set used to validate an incoming `phase` value.
const CHAT_SEND_SERVER_TIMING_PHASES = new Set<ChatSendServerTimingPhase>(
  CHAT_SEND_SERVER_TIMING_PHASE_LIST,
);
function recordChatSendTiming(
host: ChatHost,
item: Pick<
@@ -647,6 +667,79 @@ function recordChatSendTiming(
);
}
/**
 * Narrows an untrusted value to a known server timing phase.
 * Returns `null` for non-strings and for strings that are not in
 * `CHAT_SEND_SERVER_TIMING_PHASES`.
 */
function readChatSendServerTimingPhase(value: unknown): ChatSendServerTimingPhase | null {
  if (typeof value !== "string") {
    return null;
  }
  const knownPhases = CHAT_SEND_SERVER_TIMING_PHASES as ReadonlySet<string>;
  if (!knownPhases.has(value)) {
    return null;
  }
  return value as ChatSendServerTimingPhase;
}
/**
 * Accepts a duration only when it is a finite, non-negative number;
 * anything else yields `undefined`.
 */
function readChatSendTimingNumber(value: unknown): number | undefined {
  if (typeof value !== "number") {
    return undefined;
  }
  if (!Number.isFinite(value) || value < 0) {
    return undefined;
  }
  return value;
}
/** Returns the trimmed text when `value` is a string with non-whitespace content. */
function trimmedChatSendTimingText(value: unknown): string | undefined {
  if (typeof value !== "string") {
    return undefined;
  }
  const trimmed = value.trim();
  return trimmed ? trimmed : undefined;
}

/**
 * Records a Gateway `chat.send_timing` payload as a `control-ui.chat.send`
 * performance event whose phase is the server phase prefixed with `server-`.
 *
 * The payload is ignored when it is not an object, when its phase is not a
 * known server phase, or when it has no non-blank `runId`.
 *
 * `durationMs` is measured from the tracked send's `submittedAtMs` when this
 * host has a timing entry for the run. Otherwise it falls back to the
 * server's `ackToPhaseMs`; if neither is available, nothing is recorded.
 * Session key and agent id come from the tracked entry first and the payload
 * second.
 */
export function recordChatSendServerTiming(host: ChatHost, payload: unknown) {
  if (typeof payload !== "object" || !payload) {
    return;
  }
  const fields = payload as Record<string, unknown>;
  const serverPhase = readChatSendServerTimingPhase(fields.phase);
  const runId = trimmedChatSendTimingText(fields.runId);
  if (!serverPhase || !runId) {
    return;
  }

  const tracked = host.chatSendTimingsByRun?.get(runId);
  const observedAtMs = controlUiNowMs();

  const serverAckToPhaseMs = readChatSendTimingNumber(fields.ackToPhaseMs);
  const serverReceivedToPhaseMs = readChatSendTimingNumber(fields.receivedToPhaseMs);
  const serverDispatchStartedToPhaseMs = readChatSendTimingNumber(fields.dispatchStartedToPhaseMs);
  const serverPostDispatchMs = readChatSendTimingNumber(fields.postDispatchMs);

  // Prefer the locally measured time since submit; fall back to the server's
  // ACK-relative duration when this host is not tracking the run.
  let durationMs = serverAckToPhaseMs;
  if (tracked?.submittedAtMs !== undefined) {
    durationMs = roundedControlUiDurationMs(observedAtMs - tracked.submittedAtMs);
  }
  if (durationMs === undefined) {
    return;
  }

  const provider = trimmedChatSendTimingText(fields.provider);
  const model = trimmedChatSendTimingText(fields.model);
  const agentRunId = trimmedChatSendTimingText(fields.agentRunId);

  recordControlUiPerformanceEvent(
    host as Parameters<typeof recordControlUiPerformanceEvent>[0],
    "control-ui.chat.send",
    {
      phase: `server-${serverPhase}`,
      durationMs,
      runId,
      sessionKey: tracked?.sessionKey ?? trimmedChatSendTimingText(fields.sessionKey),
      agentId: tracked?.agentId ?? trimmedChatSendTimingText(fields.agentId),
      sendAttempts: tracked?.sendAttempts ?? 0,
      sendState: tracked?.sendState,
      ackStatus: tracked?.ackStatus,
      serverPhase,
      // Optional fields are omitted entirely when the payload did not supply a valid value.
      ...(serverAckToPhaseMs !== undefined ? { serverAckToPhaseMs } : {}),
      ...(serverReceivedToPhaseMs !== undefined ? { serverReceivedToPhaseMs } : {}),
      ...(serverDispatchStartedToPhaseMs !== undefined ? { serverDispatchStartedToPhaseMs } : {}),
      ...(serverPostDispatchMs !== undefined ? { serverPostDispatchMs } : {}),
      ...(provider !== undefined ? { provider } : {}),
      ...(model !== undefined ? { model } : {}),
      ...(agentRunId !== undefined ? { agentRunId } : {}),
    },
    { console: false, maxBufferedEventsForType: 40 },
  );
}
function ensureChatSendTimingEntries(host: ChatHost): Map<string, ChatSendTimingEntry> {
if (host.chatSendTimingsByRun) {
return host.chatSendTimingsByRun;

View File

@@ -9,6 +9,7 @@ import {
flushChatQueueForEvent,
hasReconnectableQueuedChatSends,
markQueuedChatSendsWaitingForReconnect,
recordChatSendServerTiming,
recordFirstAssistantChatTiming,
refreshChatAvatar,
scopedAgentListParamsForRefreshTarget,
@@ -1222,6 +1223,14 @@ function handleGatewayEventUnsafe(host: GatewayHost, evt: GatewayEventFrame) {
return;
}
if (evt.event === "chat.send_timing") {
recordChatSendServerTiming(
host as unknown as Parameters<typeof recordChatSendServerTiming>[0],
evt.payload,
);
return;
}
if (evt.event === "chat.side_result") {
const sideResult = parseChatSideResult(evt.payload);
if (

View File

@@ -374,6 +374,17 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
const runId = requireString(params.idempotencyKey, "chat send idempotency key");
await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 });
await waitForControlUiChatSendPhases(page, runId, ["ack"]);
await gateway.emitGatewayEvent("chat.send_timing", {
phase: "agent-run-started",
runId,
agentId: "ops",
sessionKey: "global",
ackToPhaseMs: 11,
receivedToPhaseMs: 20,
dispatchStartedToPhaseMs: 7,
agentRunId: "agent-run-e2e",
});
await waitForControlUiChatSendPhases(page, runId, ["server-agent-run-started"]);
await gateway.emitGatewayEvent("chat", {
deltaText: "First token visible.",
message: {
@@ -391,6 +402,7 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
"pending-visible",
"request-start",
"ack",
"server-agent-run-started",
"first-assistant-visible",
]);
const sendTimingEvents = (await controlUiEventPayloads(page, "control-ui.chat.send")).filter(
@@ -415,6 +427,16 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
sessionKey: "global",
});
expect(ackTiming?.requestDurationMs).toEqual(expect.any(Number));
expect(sendTimingByPhase.get("server-agent-run-started")).toMatchObject({
agentRunId: "agent-run-e2e",
agentId: "ops",
runId,
serverAckToPhaseMs: 11,
serverDispatchStartedToPhaseMs: 7,
serverPhase: "agent-run-started",
serverReceivedToPhaseMs: 20,
sessionKey: "global",
});
const firstVisibleTiming = sendTimingByPhase.get("first-assistant-visible");
expect(firstVisibleTiming).toMatchObject({
ackStatus: "started",