diff --git a/extensions/diagnostics-otel/src/service.test.ts b/extensions/diagnostics-otel/src/service.test.ts index e96524982e6..c4ac82b7d3f 100644 --- a/extensions/diagnostics-otel/src/service.test.ts +++ b/extensions/diagnostics-otel/src/service.test.ts @@ -1147,6 +1147,9 @@ describe("diagnostics-otel service", () => { api: "completions", transport: "http", durationMs: 80, + requestPayloadBytes: 1234, + responseStreamBytes: 567, + timeToFirstByteMs: 45, trace: { traceId: TRACE_ID, spanId: CHILD_SPAN_ID, @@ -1309,6 +1312,41 @@ describe("diagnostics-otel service", () => { "openclaw.model": "gpt-5.4", }), ); + expect( + telemetryState.histograms.get("openclaw.model_call.request_bytes")?.record, + ).toHaveBeenCalledWith( + 1234, + expect.objectContaining({ + "openclaw.provider": "openai", + "openclaw.model": "gpt-5.4", + }), + ); + expect( + telemetryState.histograms.get("openclaw.model_call.response_bytes")?.record, + ).toHaveBeenCalledWith( + 567, + expect.objectContaining({ + "openclaw.provider": "openai", + "openclaw.model": "gpt-5.4", + }), + ); + expect( + telemetryState.histograms.get("openclaw.model_call.time_to_first_byte_ms")?.record, + ).toHaveBeenCalledWith( + 45, + expect.objectContaining({ + "openclaw.provider": "openai", + "openclaw.model": "gpt-5.4", + }), + ); + const modelCallSpan = telemetryState.spans.find((span) => span.name === "openclaw.model.call"); + expect(modelCallSpan?.setAttributes).toHaveBeenCalledWith( + expect.objectContaining({ + "openclaw.model_call.request_bytes": 1234, + "openclaw.model_call.response_bytes": 567, + "openclaw.model_call.time_to_first_byte_ms": 45, + }), + ); expect(telemetryState.histograms.get("openclaw.run.duration_ms")?.record).toHaveBeenCalledWith( 100, expect.not.objectContaining({ diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index 5742de215c8..3ec59b05238 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -217,7 +217,7 @@ function positiveFiniteNumber(value: number | undefined): number | undefined { } function assignPositiveNumberAttr( - attrs: Record, + attrs: Record, key: string, value: number | undefined, ): void { @@ -227,6 +227,23 @@ function assignPositiveNumberAttr( } } +function assignModelCallSizeTimingAttrs( + attrs: Record, + evt: { + requestPayloadBytes?: number; + responseStreamBytes?: number; + timeToFirstByteMs?: number; + }, +): void { + assignPositiveNumberAttr(attrs, "openclaw.model_call.request_bytes", evt.requestPayloadBytes); + assignPositiveNumberAttr(attrs, "openclaw.model_call.response_bytes", evt.responseStreamBytes); + assignPositiveNumberAttr( + attrs, + "openclaw.model_call.time_to_first_byte_ms", + evt.timeToFirstByteMs, + ); +} + function assignGenAiSpanIdentityAttrs( attrs: Record, input: { api?: string; model?: string; provider?: string }, @@ -812,6 +829,27 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { unit: "ms", description: "Model call duration", }); + const modelCallRequestBytesHistogram = meter.createHistogram( + "openclaw.model_call.request_bytes", + { + unit: "By", + description: "UTF-8 byte size of sanitized model request payloads", + }, + ); + const modelCallResponseBytesHistogram = meter.createHistogram( + "openclaw.model_call.response_bytes", + { + unit: "By", + description: "UTF-8 byte size of streamed model response events", + }, + ); + const modelCallTimeToFirstByteHistogram = meter.createHistogram( + "openclaw.model_call.time_to_first_byte_ms", + { + unit: "ms", + description: "Elapsed time before the first streamed model response event", + }, + ); const toolExecutionDurationHistogram = meter.createHistogram( "openclaw.tool.execution.duration_ms", { @@ -1700,6 +1738,23 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { "gen_ai.request.model": lowCardinalityAttr(evt.model), ...(errorType ? { "error.type": errorType } : {}), }); + const recordModelCallSizeTimingMetrics = ( + evt: Extract, + attrs: ReturnType, + ) => { + const requestPayloadBytes = positiveFiniteNumber(evt.requestPayloadBytes); + if (requestPayloadBytes !== undefined) { + modelCallRequestBytesHistogram.record(requestPayloadBytes, attrs); + } + const responseStreamBytes = positiveFiniteNumber(evt.responseStreamBytes); + if (responseStreamBytes !== undefined) { + modelCallResponseBytesHistogram.record(responseStreamBytes, attrs); + } + const timeToFirstByteMs = positiveFiniteNumber(evt.timeToFirstByteMs); + if (timeToFirstByteMs !== undefined) { + modelCallTimeToFirstByteHistogram.record(timeToFirstByteMs, attrs); + } + }; const recordModelCallStarted = ( evt: Extract, @@ -1733,7 +1788,9 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { evt: Extract, metadata: DiagnosticEventMetadata, ) => { - modelCallDurationHistogram.record(evt.durationMs, modelCallMetricAttrs(evt)); + const metricAttrs = modelCallMetricAttrs(evt); + modelCallDurationHistogram.record(evt.durationMs, metricAttrs); + recordModelCallSizeTimingMetrics(evt, metricAttrs); genAiOperationDurationHistogram.record( evt.durationMs / 1000, genAiModelCallMetricAttrs(evt), @@ -1752,6 +1809,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { if (evt.transport) { spanAttrs["openclaw.transport"] = evt.transport; } + assignModelCallSizeTimingAttrs(spanAttrs, evt); assignOtelModelContentAttributes( spanAttrs, evt as unknown as Record, @@ -1773,10 +1831,12 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { metadata: DiagnosticEventMetadata, ) => { const errorType = lowCardinalityAttr(evt.errorCategory, "other"); - modelCallDurationHistogram.record(evt.durationMs, { + const metricAttrs = { ...modelCallMetricAttrs(evt), "openclaw.errorCategory": errorType, - }); + }; + modelCallDurationHistogram.record(evt.durationMs, metricAttrs); + recordModelCallSizeTimingMetrics(evt, metricAttrs); genAiOperationDurationHistogram.record( evt.durationMs / 1000, genAiModelCallMetricAttrs(evt, errorType), @@ -1797,6 +1857,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { if (evt.transport) { spanAttrs["openclaw.transport"] = evt.transport; } + assignModelCallSizeTimingAttrs(spanAttrs, evt); assignOtelModelContentAttributes( spanAttrs, evt as unknown as Record, diff --git a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts index 20ca3888d57..92dcdd4df21 100644 --- a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts @@ -53,8 +53,19 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => { result: () => Promise; }; originalStream.result = async () => "kept"; + const requestPayload = { + input: [{ role: "user", content: "secret prompt sk-test-secret-value" }], + model: "gpt-5.4", + }; const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( - (() => originalStream) as unknown as StreamFn, + (( + model: Parameters[0], + _context: Parameters[1], + options: Parameters[2], + ) => { + options?.onPayload?.(requestPayload, model); + return originalStream; + }) as unknown as StreamFn, { runId: "run-1", sessionKey: "session-key", @@ -102,7 +113,52 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => { type: "model.call.completed", callId: "call-1", durationMs: expect.any(Number), + requestPayloadBytes: Buffer.byteLength(JSON.stringify(requestPayload), "utf8"), + responseStreamBytes: expect.any(Number), + timeToFirstByteMs: expect.any(Number), }); + expect(JSON.stringify(events)).not.toContain("sk-test-secret-value"); + }); + + it("counts async onPayload replacements instead of raw payload content", async () => { + async function* stream() { + yield { type: "text_delta", delta: "safe" }; + } + const originalPayload = { input: "secret sk-original-secret" }; + const replacementPayload = { input: "redacted" }; + const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( + (async ( + model: Parameters[0], + _context: Parameters[1], + options: Parameters[2], + ) => { + await options?.onPayload?.(originalPayload, model); + return stream(); + }) as unknown as StreamFn, + { + runId: "run-1", + provider: "openai", + model: "gpt-5.4", + trace: createDiagnosticTraceContext(), + nextCallId: () => "call-payload", + }, + ); + + const events = await collectModelCallEvents(async () => { + const streamResult = await wrapped({} as never, {} as never, { + onPayload: async () => replacementPayload, + }); + await drain(streamResult as unknown as AsyncIterable); + }); + + expect(events[1]).toMatchObject({ + type: "model.call.completed", + callId: "call-payload", + requestPayloadBytes: Buffer.byteLength(JSON.stringify(replacementPayload), "utf8"), + responseStreamBytes: expect.any(Number), + timeToFirstByteMs: expect.any(Number), + }); + expect(JSON.stringify(events)).not.toContain("sk-original-secret"); }); it("propagates the trusted model-call traceparent without mutating caller headers", async () => { @@ -296,6 +352,8 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => { callId: "call-hook", outcome: "completed", durationMs: expect.any(Number), + responseStreamBytes: expect.any(Number), + timeToFirstByteMs: expect.any(Number), }), expect.objectContaining({ runId: "run-1" }), ); diff --git a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts index a85aa64489d..cf10564d934 100644 --- a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts +++ b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts @@ -45,13 +45,67 @@ type ModelCallErrorFields = Pick< >; type ModelCallEndedHookFields = Pick< PluginHookModelCallEndedEvent, - "durationMs" | "outcome" | "errorCategory" | "upstreamRequestIdHash" + | "durationMs" + | "outcome" + | "errorCategory" + | "requestPayloadBytes" + | "responseStreamBytes" + | "timeToFirstByteMs" + | "upstreamRequestIdHash" >; +type ModelCallSizeTimingFields = Pick< + Extract, + "requestPayloadBytes" | "responseStreamBytes" | "timeToFirstByteMs" +>; +type ModelCallObservationState = { + requestPayloadBytes?: number; + responseStreamBytes: number; + timeToFirstByteMs?: number; +}; const MODEL_CALL_STREAM_RETURN_TIMEOUT_MS = 1000; const TRACEPARENT_HEADER_NAME = "traceparent"; type ModelCallStreamOptions = Parameters[2]; +function utf8JsonByteLength(value: unknown): number | undefined { + try { + return Buffer.byteLength(JSON.stringify(value), "utf8"); + } catch { + return undefined; + } +} + +function assignRequestPayloadBytes(state: ModelCallObservationState, payload: unknown): void { + const bytes = utf8JsonByteLength(payload); + if (bytes !== undefined) { + state.requestPayloadBytes = bytes; + } +} + +function observeResponseChunk( + state: ModelCallObservationState, + startedAt: number, + chunk: unknown, +): void { + state.timeToFirstByteMs ??= Math.max(0, Date.now() - startedAt); + const bytes = utf8JsonByteLength(chunk); + if (bytes !== undefined) { + state.responseStreamBytes += bytes; + } +} + +function modelCallSizeTimingFields(state: ModelCallObservationState): ModelCallSizeTimingFields { + return { + ...(state.requestPayloadBytes !== undefined + ? { requestPayloadBytes: state.requestPayloadBytes } + : {}), + ...(state.responseStreamBytes > 0 ? { responseStreamBytes: state.responseStreamBytes } : {}), + ...(state.timeToFirstByteMs !== undefined + ? { timeToFirstByteMs: state.timeToFirstByteMs } + : {}), + }; +} + function isPromiseLike(value: unknown): value is PromiseLike { if (value === null || (typeof value !== "object" && typeof value !== "function")) { return false; @@ -168,34 +222,45 @@ function emitModelCallStarted(eventBase: ModelCallEventBase): void { dispatchModelCallStartedHook(eventBase); } -function emitModelCallCompleted(eventBase: ModelCallEventBase, startedAt: number): void { +function emitModelCallCompleted( + eventBase: ModelCallEventBase, + startedAt: number, + state: ModelCallObservationState, +): void { const durationMs = Date.now() - startedAt; + const sizeTimingFields = modelCallSizeTimingFields(state); emitTrustedDiagnosticEvent({ type: "model.call.completed", ...eventBase, durationMs, + ...sizeTimingFields, }); dispatchModelCallEndedHook(eventBase, { durationMs, outcome: "completed", + ...sizeTimingFields, }); } function emitModelCallError( eventBase: ModelCallEventBase, startedAt: number, + state: ModelCallObservationState, fields: ModelCallErrorFields, ): void { const durationMs = Date.now() - startedAt; + const sizeTimingFields = modelCallSizeTimingFields(state); emitTrustedDiagnosticEvent({ type: "model.call.error", ...eventBase, durationMs, + ...sizeTimingFields, ...fields, }); dispatchModelCallEndedHook(eventBase, { durationMs, outcome: "error", + ...sizeTimingFields, ...fields, }); } @@ -203,10 +268,31 @@ function emitModelCallError( function withDiagnosticTraceparentHeader( options: ModelCallStreamOptions, trace: DiagnosticTraceContext, + state: ModelCallObservationState, ): ModelCallStreamOptions { const traceparent = formatDiagnosticTraceparent(trace); + const originalOnPayload = options?.onPayload; + const onPayload: NonNullable["onPayload"] = (payload, model) => { + if (!originalOnPayload) { + assignRequestPayloadBytes(state, payload); + return undefined; + } + const result = originalOnPayload(payload, model); + if (isPromiseLike(result)) { + return result.then((replacement) => { + assignRequestPayloadBytes(state, replacement ?? payload); + return replacement; + }); + } + assignRequestPayloadBytes(state, result ?? payload); + return result; + }; + if (!traceparent) { - return options; + return { + ...options, + onPayload, + }; } const headers: Record = {}; @@ -220,6 +306,7 @@ function withDiagnosticTraceparentHeader( return { ...options, headers, + onPayload, }; } @@ -259,6 +346,7 @@ async function* observeModelCallIterator( iterator: AsyncIterator, eventBase: ModelCallEventBase, startedAt: number, + state: ModelCallObservationState, ): AsyncIterable { let terminalEmitted = false; try { @@ -267,18 +355,19 @@ async function* observeModelCallIterator( if (next.done) { break; } + observeResponseChunk(state, startedAt, next.value); yield next.value; } terminalEmitted = true; - emitModelCallCompleted(eventBase, startedAt); + emitModelCallCompleted(eventBase, startedAt, state); } catch (err) { terminalEmitted = true; - emitModelCallError(eventBase, startedAt, modelCallErrorFields(err)); + emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err)); throw err; } finally { if (!terminalEmitted) { await safeReturnIterator(iterator); - emitModelCallCompleted(eventBase, startedAt); + emitModelCallCompleted(eventBase, startedAt, state); } } } @@ -288,9 +377,10 @@ function observeModelCallStream>( createIterator: () => AsyncIterator, eventBase: ModelCallEventBase, startedAt: number, + state: ModelCallObservationState, ): T { const observedIterator = () => - observeModelCallIterator(createIterator(), eventBase, startedAt)[Symbol.asyncIterator](); + observeModelCallIterator(createIterator(), eventBase, startedAt, state)[Symbol.asyncIterator](); let hasNonConfigurableIterator = false; try { hasNonConfigurableIterator = @@ -318,6 +408,7 @@ function observeModelCallResult( result: unknown, eventBase: ModelCallEventBase, startedAt: number, + state: ModelCallObservationState, ): unknown { const createIterator = asyncIteratorFactory(result); if (createIterator) { @@ -326,9 +417,10 @@ function observeModelCallResult( createIterator, eventBase, startedAt, + state, ); } - emitModelCallCompleted(eventBase, startedAt); + emitModelCallCompleted(eventBase, startedAt, state); return result; } @@ -342,22 +434,23 @@ export function wrapStreamFnWithDiagnosticModelCallEvents( const eventBase = baseModelCallEvent(ctx, callId, trace); emitModelCallStarted(eventBase); const startedAt = Date.now(); - const propagatedOptions = withDiagnosticTraceparentHeader(options, trace); + const state: ModelCallObservationState = { responseStreamBytes: 0 }; + const propagatedOptions = withDiagnosticTraceparentHeader(options, trace, state); try { const result = streamFn(model, streamContext, propagatedOptions); if (isPromiseLike(result)) { return result.then( - (resolved) => observeModelCallResult(resolved, eventBase, startedAt), + (resolved) => observeModelCallResult(resolved, eventBase, startedAt, state), (err) => { - emitModelCallError(eventBase, startedAt, modelCallErrorFields(err)); + emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err)); throw err; }, ); } - return observeModelCallResult(result, eventBase, startedAt); + return observeModelCallResult(result, eventBase, startedAt, state); } catch (err) { - emitModelCallError(eventBase, startedAt, modelCallErrorFields(err)); + emitModelCallError(eventBase, startedAt, state, modelCallErrorFields(err)); throw err; } }) as StreamFn; diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index d6ed9ebd091..6e29e2c8990 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -317,12 +317,18 @@ export type DiagnosticModelCallStartedEvent = DiagnosticModelCallBaseEvent & { export type DiagnosticModelCallCompletedEvent = DiagnosticModelCallBaseEvent & { type: "model.call.completed"; durationMs: number; + requestPayloadBytes?: number; + responseStreamBytes?: number; + timeToFirstByteMs?: number; }; export type DiagnosticModelCallErrorEvent = DiagnosticModelCallBaseEvent & { type: "model.call.error"; durationMs: number; errorCategory: string; + requestPayloadBytes?: number; + responseStreamBytes?: number; + timeToFirstByteMs?: number; }; export type DiagnosticContextAssembledEvent = DiagnosticBaseEvent & { diff --git a/src/logging/diagnostic-stability-bundle.ts b/src/logging/diagnostic-stability-bundle.ts index 664db1fdde5..1ca1623dc8e 100644 --- a/src/logging/diagnostic-stability-bundle.ts +++ b/src/logging/diagnostic-stability-bundle.ts @@ -338,6 +338,14 @@ function readStabilityEventRecord( assignOptionalCodeString(sanitized, "model", record.model, `${label}.model`); assignOptionalNumber(sanitized, "durationMs", record.durationMs, `${label}.durationMs`); + assignOptionalNumber(sanitized, "requestBytes", record.requestBytes, `${label}.requestBytes`); + assignOptionalNumber(sanitized, "responseBytes", record.responseBytes, `${label}.responseBytes`); + assignOptionalNumber( + sanitized, + "timeToFirstByteMs", + record.timeToFirstByteMs, + `${label}.timeToFirstByteMs`, + ); assignOptionalNumber(sanitized, "costUsd", record.costUsd, `${label}.costUsd`); assignOptionalNumber(sanitized, "count", record.count, `${label}.count`); assignOptionalNumber(sanitized, "bytes", record.bytes, `${label}.bytes`); diff --git a/src/logging/diagnostic-stability.test.ts b/src/logging/diagnostic-stability.test.ts index ba2e4d22e8b..98c5cd0b66d 100644 --- a/src/logging/diagnostic-stability.test.ts +++ b/src/logging/diagnostic-stability.test.ts @@ -152,6 +152,9 @@ describe("diagnostic stability recorder", () => { provider: "openai", model: "gpt-5.4", durationMs: 1, + requestPayloadBytes: 1234, + responseStreamBytes: 567, + timeToFirstByteMs: 89, errorCategory: "TypeError", }); await new Promise((resolve) => setImmediate(resolve)); @@ -167,8 +170,13 @@ describe("diagnostic stability recorder", () => { type: "model.call.error", provider: "openai", model: "gpt-5.4", + durationMs: 1, + requestBytes: 1234, + responseBytes: 567, + timeToFirstByteMs: 89, reason: "TypeError", }); + expect(JSON.stringify(snapshot.events[1])).not.toContain("call-1"); }); it("summarizes memory and large payload events", () => { diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index 086a353fa37..310478f8102 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -31,6 +31,9 @@ export type DiagnosticStabilityEventRecord = { provider?: string; model?: string; durationMs?: number; + requestBytes?: number; + responseBytes?: number; + timeToFirstByteMs?: number; resultCount?: number; commandLength?: number; exitCode?: number; @@ -341,11 +344,17 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi record.provider = event.provider; record.model = event.model; record.durationMs = event.durationMs; + record.requestBytes = event.requestPayloadBytes; + record.responseBytes = event.responseStreamBytes; + record.timeToFirstByteMs = event.timeToFirstByteMs; break; case "model.call.error": record.provider = event.provider; record.model = event.model; record.durationMs = event.durationMs; + record.requestBytes = event.requestPayloadBytes; + record.responseBytes = event.responseStreamBytes; + record.timeToFirstByteMs = event.timeToFirstByteMs; assignReasonCode(record, event.errorCategory); break; case "log.record": diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index 98d242b0083..334d69e7f2e 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -212,6 +212,9 @@ export type PluginHookModelCallEndedEvent = PluginHookModelCallBaseEvent & { durationMs: number; outcome: "completed" | "error"; errorCategory?: string; + requestPayloadBytes?: number; + responseStreamBytes?: number; + timeToFirstByteMs?: number; upstreamRequestIdHash?: string; };