perf(ui): trace chat first output latency

Add chat-send first visible assistant output telemetry in the Control UI, plus Gateway diagnostics correlation attributes for chat.send dispatch spans. Verified with focused UI/Gateway tests, tsgo, oxlint, autoreview, PR checks, and Testbox-through-Crabbox check:changed.
This commit is contained in:
Vincent Koc
2026-06-01 09:47:45 +01:00
committed by GitHub
parent 4094c94a8f
commit b13af38f99
6 changed files with 406 additions and 34 deletions

View File

@@ -2943,6 +2943,7 @@ export const chatHandlers: GatewayRequestHandlers = {
}
const rawSessionKey = p.sessionKey;
const agentIdOverride = normalizeOptionalText(p.agentId);
const clientRunId = p.idempotencyKey;
const requestedAgentId = resolveRequestedChatAgentId({
cfg: (context as { getRuntimeConfig?: () => OpenClawConfig }).getRuntimeConfig?.(),
requestedSessionKey: rawSessionKey,
@@ -2959,6 +2960,7 @@ export const chatHandlers: GatewayRequestHandlers = {
{
phase: "agent-turn",
attributes: {
runId: clientRunId,
hasAttachments: normalizedAttachments.length > 0,
hasExplicitOrigin: explicitOriginResult.value !== undefined,
},
@@ -3013,7 +3015,6 @@ export const chatHandlers: GatewayRequestHandlers = {
overrideMs: p.timeoutMs,
});
const now = Date.now();
const clientRunId = p.idempotencyKey;
const sendPolicy = resolveSendPolicy({
cfg,
@@ -3347,6 +3348,11 @@ export const chatHandlers: GatewayRequestHandlers = {
channel: INTERNAL_MESSAGE_CHANNEL,
});
const chatSendTraceAttributes = {
runId: clientRunId,
sessionKey,
agentId: selectedAgent.agentId ?? agentId,
provider: resolvedSessionModel.provider,
model: resolvedSessionModel.model,
hasAttachments: normalizedAttachments.length > 0,
hasExplicitOrigin: explicitOriginResult.value !== undefined,
hasConnectedClient: client?.connect !== undefined,

View File

@@ -116,6 +116,15 @@ async function writeMainSessionTranscript(sessionDir: string, lines: string[]) {
await fs.writeFile(path.join(sessionDir, "sess-main.jsonl"), `${lines.join("\n")}\n`, "utf-8");
}
async function readTimelineEvents(filePath: string): Promise<Array<Record<string, unknown>>> {
const raw = await fs.readFile(filePath, "utf-8");
return raw
.trim()
.split("\n")
.filter(Boolean)
.map((line) => JSON.parse(line) as Record<string, unknown>);
}
async function fetchHistoryMessages(
ws: GatewaySocket,
params?: {
@@ -1178,6 +1187,60 @@ describe("gateway server chat", () => {
});
});
test("chat.send diagnostics timeline carries run correlation attributes", async () => {
const timelineDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-chat-timeline-"));
const timelinePath = path.join(timelineDir, "timeline.jsonl");
const previousDiagnostics = process.env.OPENCLAW_DIAGNOSTICS;
const previousTimelinePath = process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH;
process.env.OPENCLAW_DIAGNOSTICS = "timeline";
process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH = timelinePath;
try {
await withGatewayChatHarness(async ({ ws, createSessionDir }) => {
const spy = getReplyFromConfig;
await connectOk(ws);
await createSessionDir();
await writeMainSessionStore();
mockGetReplyFromConfigOnce(async () => undefined);
const sendRes = await rpcReq(ws, "chat.send", {
sessionKey: "main",
message: "hello",
idempotencyKey: "idem-timeline",
});
expect(sendRes.ok).toBe(true);
await vi.waitFor(() => {
expect(spy.mock.calls.length).toBeGreaterThan(0);
}, FAST_WAIT_OPTS);
await vi.waitFor(async () => {
const events = await readTimelineEvents(timelinePath);
expect(
events.some(
(event) =>
event.type === "span.end" &&
event.name === "gateway.chat_send.dispatch_inbound" &&
(event.attributes as Record<string, unknown> | undefined)?.runId ===
"idem-timeline",
),
).toBe(true);
}, FAST_WAIT_OPTS);
});
} finally {
if (previousDiagnostics === undefined) {
delete process.env.OPENCLAW_DIAGNOSTICS;
} else {
process.env.OPENCLAW_DIAGNOSTICS = previousDiagnostics;
}
if (previousTimelinePath === undefined) {
delete process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH;
} else {
process.env.OPENCLAW_DIAGNOSTICS_TIMELINE_PATH = previousTimelinePath;
}
await fs.rm(timelineDir, { recursive: true, force: true, maxRetries: 5, retryDelay: 50 });
}
});
test("chat.history hard-caps single oversized nested payloads", async () => {
await withGatewayChatHarness(async ({ ws, createSessionDir }) => {
const historyMaxBytes = 64 * 1024;

View File

@@ -27,13 +27,13 @@ import type { ChatSideResult } from "./chat/side-result.ts";
import { executeSlashCommand } from "./chat/slash-command-executor.ts";
import { parseSlashCommand, refreshSlashCommands } from "./chat/slash-commands.ts";
import { formatConnectError } from "./connect-error.ts";
import { resolveControlUiAuthHeader } from "./control-ui-auth.ts";
import {
controlUiNowMs,
recordControlUiPerformanceEvent,
roundedControlUiDurationMs,
scheduleControlUiAfterPaint,
} from "./control-ui-performance.ts";
import { resolveControlUiAuthHeader } from "./control-ui-auth.ts";
import {
abortChatRun,
appendUserChatMessage,
@@ -41,7 +41,9 @@ import {
requestChatSend,
sendDetachedChatMessage,
sendSteerChatMessage,
type ChatEventPayload,
type ChatHistoryResult,
type ChatSendAck,
type ChatState,
} from "./controllers/chat.ts";
import { loadModels } from "./controllers/models.ts";
@@ -105,6 +107,7 @@ export type ChatHost = ChatInputHistoryState & {
refreshSessionsAfterChat: Map<string, ChatSessionRefreshTarget>;
pendingAbort?: { runId?: string | null; sessionKey: string; agentId?: string } | null;
chatSubmitGuards?: Map<string, Promise<void>>;
chatSendTimingsByRun?: Map<string, ChatSendTimingEntry>;
assistantAgentId?: string | null;
agentsList?: { defaultId?: string | null; mainKey?: string | null } | null;
eventLogBuffer?: unknown[];
@@ -395,8 +398,9 @@ function enqueuePendingSendMessage(
attachments?: ChatAttachment[],
refreshSessions?: boolean,
submittedAtMs = controlUiNowMs(),
sendState: ChatQueueItem["sendState"] =
host.connected && host.client ? "sending" : "waiting-reconnect",
sendState: ChatQueueItem["sendState"] = host.connected && host.client
? "sending"
: "waiting-reconnect",
): ChatQueueItem | null {
const trimmed = text.trim();
const hasAttachments = Boolean(attachments && attachments.length > 0);
@@ -541,9 +545,9 @@ function cancelPendingSendBeforeRequest(
restoreComposer && opts.previousDraft != null && !host.chatMessage.trim();
const willRestoreAttachments = Boolean(
restoreComposer &&
opts.previousAttachments?.length &&
host.chatAttachments.length === 0 &&
(willRestoreDraft || !host.chatMessage.trim()),
opts.previousAttachments?.length &&
host.chatAttachments.length === 0 &&
(willRestoreDraft || !host.chatMessage.trim()),
);
if (restoreComposer) {
if (willRestoreDraft) {
@@ -568,11 +572,26 @@ type ChatSendTimingPhase =
| "pending-painted"
| "request-start"
| "ack"
| "first-assistant-visible"
| "terminal-before-delta"
| "queued-busy"
| "waiting-model"
| "waiting-reconnect"
| "failed";
type ChatSendTimingEntry = {
runId: string;
sessionKey?: string;
agentId?: string;
sendAttempts: number;
sendState?: ChatQueueItem["sendState"];
submittedAtMs: number;
requestStartedAtMs?: number;
ackAtMs?: number;
ackStatus?: ChatSendAck["status"];
firstAssistantVisibleRecorded?: boolean;
};
function recordChatSendTiming(
host: ChatHost,
item: Pick<
@@ -603,6 +622,156 @@ function recordChatSendTiming(
);
}
function ensureChatSendTimingEntries(host: ChatHost): Map<string, ChatSendTimingEntry> {
if (host.chatSendTimingsByRun) {
return host.chatSendTimingsByRun;
}
const entries = new Map<string, ChatSendTimingEntry>();
host.chatSendTimingsByRun = entries;
return entries;
}
function registerChatSendTiming(
host: ChatHost,
item: Pick<
ChatQueueItem,
"sendRunId" | "sessionKey" | "agentId" | "sendAttempts" | "sendState" | "sendSubmittedAtMs"
>,
runId: string,
requestStartedAtMs: number,
) {
ensureChatSendTimingEntries(host).set(runId, {
runId,
sessionKey: item.sessionKey,
agentId: item.agentId,
sendAttempts: item.sendAttempts ?? 0,
sendState: item.sendState,
submittedAtMs: item.sendSubmittedAtMs ?? requestStartedAtMs,
requestStartedAtMs,
});
}
function updateChatSendAckTiming(
host: ChatHost,
requestedRunId: string,
ack: ChatSendAck,
item: Pick<
ChatQueueItem,
"sessionKey" | "agentId" | "sendAttempts" | "sendState" | "sendSubmittedAtMs"
>,
requestStartedAtMs: number,
) {
const entries = ensureChatSendTimingEntries(host);
const existing = entries.get(requestedRunId);
const submittedAtMs = existing?.submittedAtMs ?? item.sendSubmittedAtMs ?? requestStartedAtMs;
const next: ChatSendTimingEntry = {
...(existing ?? {
runId: ack.runId,
sessionKey: item.sessionKey,
agentId: item.agentId,
sendAttempts: item.sendAttempts ?? 0,
sendState: item.sendState,
submittedAtMs,
requestStartedAtMs,
}),
runId: ack.runId,
sessionKey: existing?.sessionKey ?? item.sessionKey,
agentId: existing?.agentId ?? item.agentId,
ackAtMs: controlUiNowMs(),
ackStatus: ack.status,
};
if (ack.runId !== requestedRunId) {
entries.delete(requestedRunId);
}
entries.set(ack.runId, next);
}
function chatEventHasVisibleTerminalPayload(payload: ChatEventPayload): boolean {
if (payload.state === "error" && payload.errorMessage?.trim()) {
return true;
}
return Boolean(payload.message && typeof payload.message === "object");
}
function resolveFirstAssistantTimingPhase(
host: ChatHost,
payload: ChatEventPayload,
entry: ChatSendTimingEntry,
): Extract<ChatSendTimingPhase, "first-assistant-visible" | "terminal-before-delta"> | null {
if (entry.firstAssistantVisibleRecorded) {
return null;
}
if (payload.state === "delta") {
return typeof host.chatStream === "string" && host.chatStream.trim()
? "first-assistant-visible"
: null;
}
if (payload.state === "final" || payload.state === "aborted" || payload.state === "error") {
return chatEventHasVisibleTerminalPayload(payload) ? "terminal-before-delta" : null;
}
return null;
}
export function recordFirstAssistantChatTiming(
host: ChatHost,
payload: ChatEventPayload | undefined,
handledState: ChatEventPayload["state"] | null,
) {
if (!payload || !handledState || typeof payload.runId !== "string") {
return;
}
const runId = payload.runId.trim();
const entry = runId ? host.chatSendTimingsByRun?.get(runId) : undefined;
if (!entry) {
return;
}
const phase = resolveFirstAssistantTimingPhase(host, payload, entry);
if (!phase) {
if (payload.state === "final" || payload.state === "aborted" || payload.state === "error") {
host.chatSendTimingsByRun?.delete(runId);
}
return;
}
const eventAtMs = controlUiNowMs();
entry.firstAssistantVisibleRecorded = true;
scheduleControlUiAfterPaint(host, () => {
const paintedAtMs = controlUiNowMs();
recordControlUiPerformanceEvent(
host as Parameters<typeof recordControlUiPerformanceEvent>[0],
"control-ui.chat.send",
{
phase,
durationMs: roundedControlUiDurationMs(paintedAtMs - entry.submittedAtMs),
runId,
sessionKey: entry.sessionKey ?? payload.sessionKey,
agentId: entry.agentId ?? payload.agentId,
sendAttempts: entry.sendAttempts,
sendState: entry.sendState,
ackStatus: entry.ackStatus,
eventState: payload.state,
firstAssistantPaintMs: roundedControlUiDurationMs(paintedAtMs - eventAtMs),
...(entry.requestStartedAtMs != null
? {
requestToFirstAssistantEventMs: roundedControlUiDurationMs(
eventAtMs - entry.requestStartedAtMs,
),
}
: {}),
...(entry.ackAtMs != null
? {
ackToFirstAssistantEventMs: roundedControlUiDurationMs(eventAtMs - entry.ackAtMs),
}
: {}),
},
{ console: false, maxBufferedEventsForType: 40 },
);
if (phase === "terminal-before-delta") {
host.chatSendTimingsByRun?.delete(runId);
}
});
}
function shouldRecordPendingSendPaint(item: ChatQueueItem): boolean {
return (
typeof item.sendSubmittedAtMs === "number" &&
@@ -622,21 +791,18 @@ function schedulePendingSendPaintTiming(
if (!sendRunId || startedAtMs == null) {
return;
}
scheduleControlUiAfterPaint(
host as Parameters<typeof scheduleControlUiAfterPaint>[0],
() => {
if (!visibleSessionMatches(host, sessionKey, item.agentId)) {
return;
}
const queued = readChatQueueForSession(host, sessionKey).find(
(entry) => entry.id === item.id && entry.sendRunId === sendRunId,
);
if (!queued || !shouldRecordPendingSendPaint(queued)) {
return;
}
recordChatSendTiming(host, queued, "pending-painted", startedAtMs);
},
);
scheduleControlUiAfterPaint(host as Parameters<typeof scheduleControlUiAfterPaint>[0], () => {
if (!visibleSessionMatches(host, sessionKey, item.agentId)) {
return;
}
const queued = readChatQueueForSession(host, sessionKey).find(
(entry) => entry.id === item.id && entry.sendRunId === sendRunId,
);
if (!queued || !shouldRecordPendingSendPaint(queued)) {
return;
}
recordChatSendTiming(host, queued, "pending-painted", startedAtMs);
});
}
function ensureQueuedSendState(
@@ -695,17 +861,19 @@ async function sendQueuedChatMessage(
const runId = prepared.sendRunId ?? generateUUID();
const startedAt = Date.now();
const requestStartedAtMs = controlUiNowMs();
updateQueuedMessageForSession(host, sessionKey, id, (item) => ({
...item,
sendAttempts: (item.sendAttempts ?? 0) + 1,
sendError: undefined,
sendRunId: runId,
sendState: "sending",
sendRequestStartedAtMs: requestStartedAtMs,
sessionKey,
agentId: prepared.agentId,
}));
recordChatSendTiming(host, prepared, "request-start", prepared.sendSubmittedAtMs);
const sendingItem =
updateQueuedMessageForSession(host, sessionKey, id, (item) => ({
...item,
sendAttempts: (item.sendAttempts ?? 0) + 1,
sendError: undefined,
sendRunId: runId,
sendState: "sending",
sendRequestStartedAtMs: requestStartedAtMs,
sessionKey,
agentId: prepared.agentId,
})) ?? prepared;
registerChatSendTiming(host, sendingItem, runId, requestStartedAtMs);
recordChatSendTiming(host, sendingItem, "request-start", sendingItem.sendSubmittedAtMs);
host.chatSending = true;
const isVisibleSession = () => visibleSessionMatches(host, sessionKey, prepared.agentId);
if (isVisibleSession()) {
@@ -723,7 +891,8 @@ async function sendQueuedChatMessage(
sessionKey,
agentId: prepared.agentId,
});
recordChatSendTiming(host, prepared, "ack", prepared.sendSubmittedAtMs, {
updateChatSendAckTiming(host, runId, ack, sendingItem, requestStartedAtMs);
recordChatSendTiming(host, sendingItem, "ack", sendingItem.sendSubmittedAtMs, {
ackStatus: ack.status,
requestDurationMs: roundedControlUiDurationMs(controlUiNowMs() - requestStartedAtMs),
});

View File

@@ -125,6 +125,7 @@ type TestGatewayHost = Parameters<typeof connectGateway>[0] & {
chatSideResult: unknown;
chatSideResultTerminalRuns: Set<string>;
chatStream: string | null;
updateComplete?: Promise<unknown>;
chatToolMessages: Record<string, unknown>[];
activityEntries: ActivityEntry[];
toolStreamById: Map<string, unknown>;
@@ -209,6 +210,18 @@ function connectHostGateway() {
return { host, client };
}
function eventPayloads(host: TestGatewayHost, event: string): Array<Record<string, unknown>> {
const payloads: Array<Record<string, unknown>> = [];
for (const entry of host.eventLogBuffer) {
const candidate = entry as { event?: unknown; payload?: unknown };
if (candidate.event !== event || !candidate.payload || typeof candidate.payload !== "object") {
continue;
}
payloads.push(candidate.payload as Record<string, unknown>);
}
return payloads;
}
function emitToolResultEvent(client: GatewayClientMock) {
client.emitEvent({
event: "agent",
@@ -1104,6 +1117,67 @@ describe("connectGateway", () => {
expect(loadChatHistoryMock).not.toHaveBeenCalled();
});
it("records first assistant paint timing for tracked chat sends", async () => {
const { host, client } = connectHostGateway();
host.updateComplete = Promise.resolve();
host.chatRunId = "run-first-visible";
host.chatStream = "";
(
host as TestGatewayHost & {
chatSendTimingsByRun: Map<string, Record<string, unknown>>;
}
).chatSendTimingsByRun = new Map([
[
"run-first-visible",
{
runId: "run-first-visible",
sessionKey: "main",
sendAttempts: 1,
sendState: "sending",
submittedAtMs: 100,
requestStartedAtMs: 125,
ackAtMs: 150,
ackStatus: "started",
},
],
]);
client.emitEvent({
event: "chat",
payload: {
runId: "run-first-visible",
sessionKey: "main",
state: "delta",
deltaText: "Hello",
message: {
role: "assistant",
content: [{ type: "text", text: "Hello" }],
timestamp: Date.now(),
},
},
});
await vi.waitFor(() =>
expect(
eventPayloads(host, "control-ui.chat.send").some(
(payload) => payload.phase === "first-assistant-visible",
),
).toBe(true),
);
const firstVisible = eventPayloads(host, "control-ui.chat.send").find(
(payload) => payload.phase === "first-assistant-visible",
);
expect(firstVisible).toMatchObject({
runId: "run-first-visible",
sessionKey: "main",
ackStatus: "started",
eventState: "delta",
sendState: "sending",
});
expect(firstVisible?.ackToFirstAssistantEventMs).toEqual(expect.any(Number));
expect(host.chatStream).toBe("Hello");
});
it("renders session-scoped tool events for externally started runs", () => {
const { host, client } = connectHostGateway();

View File

@@ -9,6 +9,7 @@ import {
flushChatQueueForEvent,
hasReconnectableQueuedChatSends,
markQueuedChatSendsWaitingForReconnect,
recordFirstAssistantChatTiming,
refreshChatAvatar,
scopedAgentListParamsForRefreshTarget,
retryReconnectableQueuedChatSends,
@@ -895,6 +896,11 @@ function handleChatGatewayEvent(host: GatewayHost, payload: ChatEventPayload | u
}
const activeRunIdBeforeEvent = host.chatRunId;
const state = handleChatEvent(host as unknown as ChatState, payload);
recordFirstAssistantChatTiming(
host as unknown as Parameters<typeof recordFirstAssistantChatTiming>[0],
payload,
state,
);
const terminalEventIsForDifferentActiveRun = isEventForDifferentActiveRun(
payload,
activeRunIdBeforeEvent,

View File

@@ -56,6 +56,26 @@ async function chatThreadDistanceFromBottom(page: Page): Promise<number> {
});
}
async function controlUiEventPayloads(
page: Page,
event: string,
): Promise<Array<Record<string, unknown>>> {
return page.evaluate((eventName) => {
const app = document.querySelector("openclaw-app") as
| (Element & { eventLogBuffer?: unknown[] })
| null;
return (app?.eventLogBuffer ?? [])
.filter((entry): entry is { event: string; payload: Record<string, unknown> } => {
const candidate = entry as { event?: unknown; payload?: unknown };
return (
candidate.event === eventName &&
Boolean(candidate.payload && typeof candidate.payload === "object")
);
})
.map((entry) => entry.payload);
}, event);
}
function chatSessionListResponse() {
return {
count: 2,
@@ -280,6 +300,40 @@ describeControlUiE2e("Control UI mocked Gateway E2E", () => {
const runId = requireString(params.idempotencyKey, "chat send idempotency key");
await page.locator(".chat-thread").getByText(prompt).waitFor({ timeout: 10_000 });
await gateway.emitGatewayEvent("chat", {
deltaText: "First token visible.",
message: {
content: [{ text: "First token visible.", type: "text" }],
role: "assistant",
timestamp: Date.now(),
},
runId,
sessionKey: "main",
state: "delta",
});
await page.getByText("First token visible.").waitFor({ timeout: 10_000 });
await page.waitForFunction((expectedRunId) => {
const app = document.querySelector("openclaw-app") as
| (Element & { eventLogBuffer?: unknown[] })
| null;
return (app?.eventLogBuffer ?? []).some((entry) => {
const candidate = entry as {
event?: unknown;
payload?: { phase?: unknown; runId?: unknown };
};
return (
candidate.event === "control-ui.chat.send" &&
candidate.payload?.phase === "first-assistant-visible" &&
candidate.payload.runId === expectedRunId
);
});
}, runId);
const firstOutputEvents = await controlUiEventPayloads(page, "control-ui.chat.send");
expect(
firstOutputEvents.some(
(payload) => payload.phase === "first-assistant-visible" && payload.runId === runId,
),
).toBe(true);
await gateway.resolveDeferred("chat.history", {
messages: [],
sessionId: "control-ui-e2e-session",