feat(diagnostics): capture model call size timing

This commit is contained in:
Vincent Koc
2026-04-26 13:04:24 -07:00
parent 8e1755928c
commit c6e9849351
9 changed files with 302 additions and 18 deletions

View File

@@ -1147,6 +1147,9 @@ describe("diagnostics-otel service", () => {
api: "completions",
transport: "http",
durationMs: 80,
requestPayloadBytes: 1234,
responseStreamBytes: 567,
timeToFirstByteMs: 45,
trace: {
traceId: TRACE_ID,
spanId: CHILD_SPAN_ID,
@@ -1309,6 +1312,41 @@ describe("diagnostics-otel service", () => {
"openclaw.model": "gpt-5.4",
}),
);
expect(
telemetryState.histograms.get("openclaw.model_call.request_bytes")?.record,
).toHaveBeenCalledWith(
1234,
expect.objectContaining({
"openclaw.provider": "openai",
"openclaw.model": "gpt-5.4",
}),
);
expect(
telemetryState.histograms.get("openclaw.model_call.response_bytes")?.record,
).toHaveBeenCalledWith(
567,
expect.objectContaining({
"openclaw.provider": "openai",
"openclaw.model": "gpt-5.4",
}),
);
expect(
telemetryState.histograms.get("openclaw.model_call.time_to_first_byte_ms")?.record,
).toHaveBeenCalledWith(
45,
expect.objectContaining({
"openclaw.provider": "openai",
"openclaw.model": "gpt-5.4",
}),
);
const modelCallSpan = telemetryState.spans.find((span) => span.name === "openclaw.model.call");
expect(modelCallSpan?.setAttributes).toHaveBeenCalledWith(
expect.objectContaining({
"openclaw.model_call.request_bytes": 1234,
"openclaw.model_call.response_bytes": 567,
"openclaw.model_call.time_to_first_byte_ms": 45,
}),
);
expect(telemetryState.histograms.get("openclaw.run.duration_ms")?.record).toHaveBeenCalledWith(
100,
expect.not.objectContaining({

View File

@@ -217,7 +217,7 @@ function positiveFiniteNumber(value: number | undefined): number | undefined {
}
function assignPositiveNumberAttr(
attrs: Record<string, string | number>,
attrs: Record<string, string | number | boolean>,
key: string,
value: number | undefined,
): void {
@@ -227,6 +227,23 @@ function assignPositiveNumberAttr(
}
}
function assignModelCallSizeTimingAttrs(
attrs: Record<string, string | number | boolean>,
evt: {
requestPayloadBytes?: number;
responseStreamBytes?: number;
timeToFirstByteMs?: number;
},
): void {
assignPositiveNumberAttr(attrs, "openclaw.model_call.request_bytes", evt.requestPayloadBytes);
assignPositiveNumberAttr(attrs, "openclaw.model_call.response_bytes", evt.responseStreamBytes);
assignPositiveNumberAttr(
attrs,
"openclaw.model_call.time_to_first_byte_ms",
evt.timeToFirstByteMs,
);
}
function assignGenAiSpanIdentityAttrs(
attrs: Record<string, string | number | boolean>,
input: { api?: string; model?: string; provider?: string },
@@ -812,6 +829,27 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
unit: "ms",
description: "Model call duration",
});
const modelCallRequestBytesHistogram = meter.createHistogram(
"openclaw.model_call.request_bytes",
{
unit: "By",
description: "UTF-8 byte size of sanitized model request payloads",
},
);
const modelCallResponseBytesHistogram = meter.createHistogram(
"openclaw.model_call.response_bytes",
{
unit: "By",
description: "UTF-8 byte size of streamed model response events",
},
);
const modelCallTimeToFirstByteHistogram = meter.createHistogram(
"openclaw.model_call.time_to_first_byte_ms",
{
unit: "ms",
description: "Elapsed time before the first streamed model response event",
},
);
const toolExecutionDurationHistogram = meter.createHistogram(
"openclaw.tool.execution.duration_ms",
{
@@ -1700,6 +1738,23 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
"gen_ai.request.model": lowCardinalityAttr(evt.model),
...(errorType ? { "error.type": errorType } : {}),
});
const recordModelCallSizeTimingMetrics = (
evt: Extract<DiagnosticEventPayload, { type: "model.call.completed" | "model.call.error" }>,
attrs: ReturnType<typeof modelCallMetricAttrs>,
) => {
const requestPayloadBytes = positiveFiniteNumber(evt.requestPayloadBytes);
if (requestPayloadBytes !== undefined) {
modelCallRequestBytesHistogram.record(requestPayloadBytes, attrs);
}
const responseStreamBytes = positiveFiniteNumber(evt.responseStreamBytes);
if (responseStreamBytes !== undefined) {
modelCallResponseBytesHistogram.record(responseStreamBytes, attrs);
}
const timeToFirstByteMs = positiveFiniteNumber(evt.timeToFirstByteMs);
if (timeToFirstByteMs !== undefined) {
modelCallTimeToFirstByteHistogram.record(timeToFirstByteMs, attrs);
}
};
const recordModelCallStarted = (
evt: Extract<DiagnosticEventPayload, { type: "model.call.started" }>,
@@ -1733,7 +1788,9 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
evt: Extract<DiagnosticEventPayload, { type: "model.call.completed" }>,
metadata: DiagnosticEventMetadata,
) => {
modelCallDurationHistogram.record(evt.durationMs, modelCallMetricAttrs(evt));
const metricAttrs = modelCallMetricAttrs(evt);
modelCallDurationHistogram.record(evt.durationMs, metricAttrs);
recordModelCallSizeTimingMetrics(evt, metricAttrs);
genAiOperationDurationHistogram.record(
evt.durationMs / 1000,
genAiModelCallMetricAttrs(evt),
@@ -1752,6 +1809,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
if (evt.transport) {
spanAttrs["openclaw.transport"] = evt.transport;
}
assignModelCallSizeTimingAttrs(spanAttrs, evt);
assignOtelModelContentAttributes(
spanAttrs,
evt as unknown as Record<string, unknown>,
@@ -1773,10 +1831,12 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
metadata: DiagnosticEventMetadata,
) => {
const errorType = lowCardinalityAttr(evt.errorCategory, "other");
modelCallDurationHistogram.record(evt.durationMs, {
const metricAttrs = {
...modelCallMetricAttrs(evt),
"openclaw.errorCategory": errorType,
});
};
modelCallDurationHistogram.record(evt.durationMs, metricAttrs);
recordModelCallSizeTimingMetrics(evt, metricAttrs);
genAiOperationDurationHistogram.record(
evt.durationMs / 1000,
genAiModelCallMetricAttrs(evt, errorType),
@@ -1797,6 +1857,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
if (evt.transport) {
spanAttrs["openclaw.transport"] = evt.transport;
}
assignModelCallSizeTimingAttrs(spanAttrs, evt);
assignOtelModelContentAttributes(
spanAttrs,
evt as unknown as Record<string, unknown>,

View File

@@ -53,8 +53,19 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
result: () => Promise<string>;
};
originalStream.result = async () => "kept";
const requestPayload = {
input: [{ role: "user", content: "secret prompt sk-test-secret-value" }],
model: "gpt-5.4",
};
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
(() => originalStream) as unknown as StreamFn,
((
model: Parameters<StreamFn>[0],
_context: Parameters<StreamFn>[1],
options: Parameters<StreamFn>[2],
) => {
options?.onPayload?.(requestPayload, model);
return originalStream;
}) as unknown as StreamFn,
{
runId: "run-1",
sessionKey: "session-key",
@@ -102,7 +113,52 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
type: "model.call.completed",
callId: "call-1",
durationMs: expect.any(Number),
requestPayloadBytes: Buffer.byteLength(JSON.stringify(requestPayload), "utf8"),
responseStreamBytes: expect.any(Number),
timeToFirstByteMs: expect.any(Number),
});
expect(JSON.stringify(events)).not.toContain("sk-test-secret-value");
});
it("counts async onPayload replacements instead of raw payload content", async () => {
async function* stream() {
yield { type: "text_delta", delta: "safe" };
}
const originalPayload = { input: "secret sk-original-secret" };
const replacementPayload = { input: "redacted" };
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
(async (
model: Parameters<StreamFn>[0],
_context: Parameters<StreamFn>[1],
options: Parameters<StreamFn>[2],
) => {
await options?.onPayload?.(originalPayload, model);
return stream();
}) as unknown as StreamFn,
{
runId: "run-1",
provider: "openai",
model: "gpt-5.4",
trace: createDiagnosticTraceContext(),
nextCallId: () => "call-payload",
},
);
const events = await collectModelCallEvents(async () => {
const streamResult = await wrapped({} as never, {} as never, {
onPayload: async () => replacementPayload,
});
await drain(streamResult as unknown as AsyncIterable<unknown>);
});
expect(events[1]).toMatchObject({
type: "model.call.completed",
callId: "call-payload",
requestPayloadBytes: Buffer.byteLength(JSON.stringify(replacementPayload), "utf8"),
responseStreamBytes: expect.any(Number),
timeToFirstByteMs: expect.any(Number),
});
expect(JSON.stringify(events)).not.toContain("sk-original-secret");
});
it("propagates the trusted model-call traceparent without mutating caller headers", async () => {
@@ -296,6 +352,8 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
callId: "call-hook",
outcome: "completed",
durationMs: expect.any(Number),
responseStreamBytes: expect.any(Number),
timeToFirstByteMs: expect.any(Number),
}),
expect.objectContaining({ runId: "run-1" }),
);

View File

@@ -45,13 +45,67 @@ type ModelCallErrorFields = Pick<
>;
type ModelCallEndedHookFields = Pick<
PluginHookModelCallEndedEvent,
"durationMs" | "outcome" | "errorCategory" | "upstreamRequestIdHash"
| "durationMs"
| "outcome"
| "errorCategory"
| "requestPayloadBytes"
| "responseStreamBytes"
| "timeToFirstByteMs"
| "upstreamRequestIdHash"
>;
type ModelCallSizeTimingFields = Pick<
Extract<DiagnosticEventInput, { type: "model.call.completed" }>,
"requestPayloadBytes" | "responseStreamBytes" | "timeToFirstByteMs"
>;
type ModelCallObservationState = {
requestPayloadBytes?: number;
responseStreamBytes: number;
timeToFirstByteMs?: number;
};
const MODEL_CALL_STREAM_RETURN_TIMEOUT_MS = 1000;
const TRACEPARENT_HEADER_NAME = "traceparent";
type ModelCallStreamOptions = Parameters<StreamFn>[2];
function utf8JsonByteLength(value: unknown): number | undefined {
try {
return Buffer.byteLength(JSON.stringify(value), "utf8");
} catch {
return undefined;
}
}
function assignRequestPayloadBytes(state: ModelCallObservationState, payload: unknown): void {
const bytes = utf8JsonByteLength(payload);
if (bytes !== undefined) {
state.requestPayloadBytes = bytes;
}
}
function observeResponseChunk(
state: ModelCallObservationState,
startedAt: number,
chunk: unknown,
): void {
state.timeToFirstByteMs ??= Math.max(0, Date.now() - startedAt);
const bytes = utf8JsonByteLength(chunk);
if (bytes !== undefined) {
state.responseStreamBytes += bytes;
}
}
function modelCallSizeTimingFields(state: ModelCallObservationState): ModelCallSizeTimingFields {
return {
...(state.requestPayloadBytes !== undefined
? { requestPayloadBytes: state.requestPayloadBytes }
: {}),
...(state.responseStreamBytes > 0 ? { responseStreamBytes: state.responseStreamBytes } : {}),
...(state.timeToFirstByteMs !== undefined
? { timeToFirstByteMs: state.timeToFirstByteMs }
: {}),
};
}
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
if (value === null || (typeof value !== "object" && typeof value !== "function")) {
return false;
@@ -168,34 +222,45 @@ function emitModelCallStarted(eventBase: ModelCallEventBase): void {
dispatchModelCallStartedHook(eventBase);
}
function emitModelCallCompleted(eventBase: ModelCallEventBase, startedAt: number): void {
function emitModelCallCompleted(
eventBase: ModelCallEventBase,
startedAt: number,
state: ModelCallObservationState,
): void {
const durationMs = Date.now() - startedAt;
const sizeTimingFields = modelCallSizeTimingFields(state);
emitTrustedDiagnosticEvent({
type: "model.call.completed",
...eventBase,
durationMs,
...sizeTimingFields,
});
dispatchModelCallEndedHook(eventBase, {
durationMs,
outcome: "completed",
...sizeTimingFields,
});
}
function emitModelCallError(
eventBase: ModelCallEventBase,
startedAt: number,
state: ModelCallObservationState,
fields: ModelCallErrorFields,
): void {
const durationMs = Date.now() - startedAt;
const sizeTimingFields = modelCallSizeTimingFields(state);
emitTrustedDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs,
...sizeTimingFields,
...fields,
});
dispatchModelCallEndedHook(eventBase, {
durationMs,
outcome: "error",
...sizeTimingFields,
...fields,
});
}
@@ -203,10 +268,31 @@ function emitModelCallError(
function withDiagnosticTraceparentHeader(
options: ModelCallStreamOptions,
trace: DiagnosticTraceContext,
state: ModelCallObservationState,
): ModelCallStreamOptions {
const traceparent = formatDiagnosticTraceparent(trace);
const originalOnPayload = options?.onPayload;
const onPayload: NonNullable<ModelCallStreamOptions>["onPayload"] = (payload, model) => {
if (!originalOnPayload) {
assignRequestPayloadBytes(state, payload);
return undefined;
}
const result = originalOnPayload(payload, model);
if (isPromiseLike(result)) {
return result.then((replacement) => {
assignRequestPayloadBytes(state, replacement ?? payload);
return replacement;
});
}
assignRequestPayloadBytes(state, result ?? payload);
return result;
};
if (!traceparent) {
return options;
return {
...options,
onPayload,
};
}
const headers: Record<string, string> = {};
@@ -220,6 +306,7 @@ function withDiagnosticTraceparentHeader(
return {
...options,
headers,
onPayload,
};
}
@@ -259,6 +346,7 @@ async function* observeModelCallIterator<T>(
iterator: AsyncIterator<T>,
eventBase: ModelCallEventBase,
startedAt: number,
state: ModelCallObservationState,
): AsyncIterable<T> {
let terminalEmitted = false;
try {
@@ -267,18 +355,19 @@ async function* observeModelCallIterator<T>(
if (next.done) {
break;
}
observeResponseChunk(state, startedAt, next.value);
yield next.value;
}
terminalEmitted = true;
emitModelCallCompleted(eventBase, startedAt);
emitModelCallCompleted(eventBase, startedAt, state);
} catch (err) {
terminalEmitted = true;
emitModelCallError(eventBase, startedAt, modelCallErrorFields(err));
emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err));
throw err;
} finally {
if (!terminalEmitted) {
await safeReturnIterator(iterator);
emitModelCallCompleted(eventBase, startedAt);
emitModelCallCompleted(eventBase, startedAt, state);
}
}
}
@@ -288,9 +377,10 @@ function observeModelCallStream<T extends AsyncIterable<unknown>>(
createIterator: () => AsyncIterator<unknown>,
eventBase: ModelCallEventBase,
startedAt: number,
state: ModelCallObservationState,
): T {
const observedIterator = () =>
observeModelCallIterator(createIterator(), eventBase, startedAt)[Symbol.asyncIterator]();
observeModelCallIterator(createIterator(), eventBase, startedAt, state)[Symbol.asyncIterator]();
let hasNonConfigurableIterator = false;
try {
hasNonConfigurableIterator =
@@ -318,6 +408,7 @@ function observeModelCallResult(
result: unknown,
eventBase: ModelCallEventBase,
startedAt: number,
state: ModelCallObservationState,
): unknown {
const createIterator = asyncIteratorFactory(result);
if (createIterator) {
@@ -326,9 +417,10 @@ function observeModelCallResult(
createIterator,
eventBase,
startedAt,
state,
);
}
emitModelCallCompleted(eventBase, startedAt);
emitModelCallCompleted(eventBase, startedAt, state);
return result;
}
@@ -342,22 +434,23 @@ export function wrapStreamFnWithDiagnosticModelCallEvents(
const eventBase = baseModelCallEvent(ctx, callId, trace);
emitModelCallStarted(eventBase);
const startedAt = Date.now();
const propagatedOptions = withDiagnosticTraceparentHeader(options, trace);
const state: ModelCallObservationState = { responseStreamBytes: 0 };
const propagatedOptions = withDiagnosticTraceparentHeader(options, trace, state);
try {
const result = streamFn(model, streamContext, propagatedOptions);
if (isPromiseLike(result)) {
return result.then(
(resolved) => observeModelCallResult(resolved, eventBase, startedAt),
(resolved) => observeModelCallResult(resolved, eventBase, startedAt, state),
(err) => {
emitModelCallError(eventBase, startedAt, modelCallErrorFields(err));
emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err));
throw err;
},
);
}
return observeModelCallResult(result, eventBase, startedAt);
return observeModelCallResult(result, eventBase, startedAt, state);
} catch (err) {
emitModelCallError(eventBase, startedAt, modelCallErrorFields(err));
emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err));
throw err;
}
}) as StreamFn;

View File

@@ -317,12 +317,18 @@ export type DiagnosticModelCallStartedEvent = DiagnosticModelCallBaseEvent & {
export type DiagnosticModelCallCompletedEvent = DiagnosticModelCallBaseEvent & {
type: "model.call.completed";
durationMs: number;
requestPayloadBytes?: number;
responseStreamBytes?: number;
timeToFirstByteMs?: number;
};
export type DiagnosticModelCallErrorEvent = DiagnosticModelCallBaseEvent & {
type: "model.call.error";
durationMs: number;
errorCategory: string;
requestPayloadBytes?: number;
responseStreamBytes?: number;
timeToFirstByteMs?: number;
};
export type DiagnosticContextAssembledEvent = DiagnosticBaseEvent & {

View File

@@ -338,6 +338,14 @@ function readStabilityEventRecord(
assignOptionalCodeString(sanitized, "model", record.model, `${label}.model`);
assignOptionalNumber(sanitized, "durationMs", record.durationMs, `${label}.durationMs`);
assignOptionalNumber(sanitized, "requestBytes", record.requestBytes, `${label}.requestBytes`);
assignOptionalNumber(sanitized, "responseBytes", record.responseBytes, `${label}.responseBytes`);
assignOptionalNumber(
sanitized,
"timeToFirstByteMs",
record.timeToFirstByteMs,
`${label}.timeToFirstByteMs`,
);
assignOptionalNumber(sanitized, "costUsd", record.costUsd, `${label}.costUsd`);
assignOptionalNumber(sanitized, "count", record.count, `${label}.count`);
assignOptionalNumber(sanitized, "bytes", record.bytes, `${label}.bytes`);

View File

@@ -152,6 +152,9 @@ describe("diagnostic stability recorder", () => {
provider: "openai",
model: "gpt-5.4",
durationMs: 1,
requestPayloadBytes: 1234,
responseStreamBytes: 567,
timeToFirstByteMs: 89,
errorCategory: "TypeError",
});
await new Promise<void>((resolve) => setImmediate(resolve));
@@ -167,8 +170,13 @@ describe("diagnostic stability recorder", () => {
type: "model.call.error",
provider: "openai",
model: "gpt-5.4",
durationMs: 1,
requestBytes: 1234,
responseBytes: 567,
timeToFirstByteMs: 89,
reason: "TypeError",
});
expect(JSON.stringify(snapshot.events[1])).not.toContain("call-1");
});
it("summarizes memory and large payload events", () => {

View File

@@ -31,6 +31,9 @@ export type DiagnosticStabilityEventRecord = {
provider?: string;
model?: string;
durationMs?: number;
requestBytes?: number;
responseBytes?: number;
timeToFirstByteMs?: number;
resultCount?: number;
commandLength?: number;
exitCode?: number;
@@ -341,11 +344,17 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
record.provider = event.provider;
record.model = event.model;
record.durationMs = event.durationMs;
record.requestBytes = event.requestPayloadBytes;
record.responseBytes = event.responseStreamBytes;
record.timeToFirstByteMs = event.timeToFirstByteMs;
break;
case "model.call.error":
record.provider = event.provider;
record.model = event.model;
record.durationMs = event.durationMs;
record.requestBytes = event.requestPayloadBytes;
record.responseBytes = event.responseStreamBytes;
record.timeToFirstByteMs = event.timeToFirstByteMs;
assignReasonCode(record, event.errorCategory);
break;
case "log.record":

View File

@@ -212,6 +212,9 @@ export type PluginHookModelCallEndedEvent = PluginHookModelCallBaseEvent & {
durationMs: number;
outcome: "completed" | "error";
errorCategory?: string;
requestPayloadBytes?: number;
responseStreamBytes?: number;
timeToFirstByteMs?: number;
upstreamRequestIdHash?: string;
};