From 0e7250f37bf347c72790abf2a6692d5ec84e5ea1 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Fri, 24 Apr 2026 02:17:07 -0700 Subject: [PATCH] feat(diagnostics): emit model call events Emit structured diagnostic events for embedded run and model-call lifecycle with trace context, duration, and safe error categories. --- CHANGELOG.md | 1 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- extensions/diagnostics-otel/src/service.ts | 5 + .../attempt.model-diagnostic-events.test.ts | 167 ++++++++++++++++ .../run/attempt.model-diagnostic-events.ts | 183 ++++++++++++++++++ src/agents/pi-embedded-runner/run/attempt.ts | 114 +++++++++-- src/infra/diagnostic-events.ts | 53 +++++ src/logging/diagnostic-stability.ts | 28 +++ 8 files changed, 535 insertions(+), 20 deletions(-) create mode 100644 src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts create mode 100644 src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index b9c10facbc7..3aa9365a818 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai - Diagnostics/OTEL: pass immutable per-run diagnostic trace context through agent and tool hook contexts, and parent exported diagnostic spans from validated context without retaining global trace state. Thanks @vincentkoc. - Diagnostics/OTEL: make exporter startup restart-safe so config reloads do not retain stale SDKs, log transports, or diagnostic event listeners. Thanks @vincentkoc. - Diagnostics: emit structured tool execution diagnostic events with trace context, timing, and redacted error metadata. Thanks @vincentkoc. +- Diagnostics: emit structured run and model-call diagnostic events with trace context, duration, and non-message error metadata. Thanks @vincentkoc. - Control UI/chat: add a Steer action on queued messages so a browser follow-up can be injected into the active run without retyping it. - Control UI/Talk: add browser WebRTC realtime voice sessions backed by OpenAI Realtime, with Gateway-minted ephemeral client secrets and `openclaw_agent_consult` handoff to the full OpenClaw agent. - Agents/tools: add optional per-call `timeoutMs` support for image, video, music, and TTS generation tools so agents can extend provider request timeouts only when a specific generation needs it. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index cd321edcfa1..2864a9a3fbe 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -8ca22ea6125fb198641c676d73b4df5a3bc49079be68bef8ed0718a54c1bb53a plugin-sdk-api-baseline.json -197d9743128020062fc457228fa9139d0bd465d9e1775101bfc39137f4a10896 plugin-sdk-api-baseline.jsonl +b125289f628c19afb6087dcd58b674fa8acc8899545f99db81c264c4c964d17f plugin-sdk-api-baseline.json +2a2e9959cd35a375ec97682ec5d5108d94d4e77a82085929c58e9a994313d5e6 plugin-sdk-api-baseline.jsonl diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index c61f7dcc424..bd215fc62bd 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -815,6 +815,11 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { case "tool.execution.started": case "tool.execution.completed": case "tool.execution.error": + case "run.started": + case "run.completed": + case "model.call.started": + case "model.call.completed": + case "model.call.error": case "diagnostic.memory.sample": case "diagnostic.memory.pressure": case "payload.large": diff --git a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts new file mode 100644 index 00000000000..41991b073ab --- /dev/null +++ b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts @@ -0,0 +1,167 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { beforeEach, describe, expect, it } from "vitest"; +import { + onDiagnosticEvent, + resetDiagnosticEventsForTest, + type DiagnosticEventPayload, +} from "../../../infra/diagnostic-events.js"; +import { createDiagnosticTraceContext } from "../../../infra/diagnostic-trace-context.js"; +import { wrapStreamFnWithDiagnosticModelCallEvents } from "./attempt.model-diagnostic-events.js"; + +async function collectModelCallEvents(run: () => Promise): Promise { + const events: DiagnosticEventPayload[] = []; + const stop = onDiagnosticEvent((event) => { + if (event.type.startsWith("model.call.")) { + events.push(event); + } + }); + try { + await run(); + return events; + } finally { + stop(); + } +} + +async function drain(stream: AsyncIterable): Promise { + for await (const _ of stream) { + // drain + } +} + +describe("wrapStreamFnWithDiagnosticModelCallEvents", () => { + beforeEach(() => { + resetDiagnosticEventsForTest(); + }); + + it("emits started and completed events for async streams", async () => { + async function* stream() { + yield { type: "text", text: "ok" }; + } + const originalStream = stream() as unknown as AsyncIterable & { + result: () => Promise; + }; + originalStream.result = async () => "kept"; + const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( + (() => originalStream) as unknown as StreamFn, + { + runId: "run-1", + sessionKey: "session-key", + sessionId: "session-id", + provider: "openai", + model: "gpt-5.4", + api: "openai-responses", + transport: "http", + trace: createDiagnosticTraceContext({ + traceId: "4bf92f3577b34da6a3ce929d0e0e4736", + spanId: "00f067aa0ba902b7", + }), + nextCallId: () => "call-1", + }, + ); + + const events = await collectModelCallEvents(async () => { + const returned = wrapped( + {} as never, + {} as never, + {} as never, + ) as unknown as typeof originalStream; + expect(returned).toBe(originalStream); + expect(await returned.result()).toBe("kept"); + await drain(returned); + }); + + expect(events.map((event) => event.type)).toEqual([ + "model.call.started", + "model.call.completed", + ]); + expect(events[0]).toMatchObject({ + type: "model.call.started", + runId: "run-1", + callId: "call-1", + sessionKey: "session-key", + sessionId: "session-id", + provider: "openai", + model: "gpt-5.4", + api: "openai-responses", + transport: "http", + }); + expect(events[0]?.trace?.parentSpanId).toBe("00f067aa0ba902b7"); + expect(events[1]).toMatchObject({ + type: "model.call.completed", + callId: "call-1", + durationMs: expect.any(Number), + }); + }); + + it("emits error events when stream iteration fails", async () => { + const stream = { + [Symbol.asyncIterator]() { + return { + async next(): Promise> { + throw new TypeError("provider failed"); + }, + }; + }, + }; + const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( + (() => stream) as unknown as StreamFn, + { + runId: "run-1", + provider: "anthropic", + model: "sonnet-4.6", + trace: createDiagnosticTraceContext(), + nextCallId: () => "call-err", + }, + ); + + const events = await collectModelCallEvents(async () => { + await expect( + drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable), + ).rejects.toThrow("provider failed"); + }); + + expect(events.map((event) => event.type)).toEqual(["model.call.started", "model.call.error"]); + expect(events[1]).toMatchObject({ + type: "model.call.error", + callId: "call-err", + errorCategory: "TypeError", + durationMs: expect.any(Number), + }); + }); + + it("emits error events when stream consumption stops early", async () => { + async function* stream() { + yield { type: "text", text: "first" }; + yield { type: "text", text: "second" }; + } + const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( + (() => stream()) as unknown as StreamFn, + { + runId: "run-1", + provider: "openai", + model: "gpt-5.4", + trace: createDiagnosticTraceContext(), + nextCallId: () => "call-abandoned", + }, + ); + + const events = await collectModelCallEvents(async () => { + for await (const _ of wrapped( + {} as never, + {} as never, + {} as never, + ) as AsyncIterable) { + break; + } + }); + + expect(events.map((event) => event.type)).toEqual(["model.call.started", "model.call.error"]); + expect(events[1]).toMatchObject({ + type: "model.call.error", + callId: "call-abandoned", + errorCategory: "StreamAbandoned", + durationMs: expect.any(Number), + }); + }); +}); diff --git a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts new file mode 100644 index 00000000000..a477ed6a5b7 --- /dev/null +++ b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts @@ -0,0 +1,183 @@ +import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { + emitDiagnosticEvent, + type DiagnosticEventInput, +} from "../../../infra/diagnostic-events.js"; +import { + createChildDiagnosticTraceContext, + freezeDiagnosticTraceContext, + type DiagnosticTraceContext, +} from "../../../infra/diagnostic-trace-context.js"; + +type ModelCallDiagnosticContext = { + runId: string; + sessionKey?: string; + sessionId?: string; + provider: string; + model: string; + api?: string; + transport?: string; + trace: DiagnosticTraceContext; + nextCallId: () => string; +}; + +type ModelCallEventBase = Omit< + Extract, + "type" +>; + +export function diagnosticErrorCategory(err: unknown): string { + if (err instanceof Error && err.name.trim()) { + return err.name; + } + return typeof err; +} + +function isPromiseLike(value: unknown): value is PromiseLike { + return ( + value !== null && + (typeof value === "object" || typeof value === "function") && + typeof (value as { then?: unknown }).then === "function" + ); +} + +function isAsyncIterable(value: unknown): value is AsyncIterable { + return ( + value !== null && + typeof value === "object" && + typeof (value as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator] === "function" + ); +} + +function baseModelCallEvent( + ctx: ModelCallDiagnosticContext, + callId: string, + trace: DiagnosticTraceContext, +): ModelCallEventBase { + return { + runId: ctx.runId, + callId, + ...(ctx.sessionKey && { sessionKey: ctx.sessionKey }), + ...(ctx.sessionId && { sessionId: ctx.sessionId }), + provider: ctx.provider, + model: ctx.model, + ...(ctx.api && { api: ctx.api }), + ...(ctx.transport && { transport: ctx.transport }), + trace, + }; +} + +async function* observeModelCallIterator( + iterator: AsyncIterator, + eventBase: ModelCallEventBase, + startedAt: number, +): AsyncIterable { + let terminalEmitted = false; + try { + for (;;) { + const next = await iterator.next(); + if (next.done) { + break; + } + yield next.value; + } + terminalEmitted = true; + emitDiagnosticEvent({ + type: "model.call.completed", + ...eventBase, + durationMs: Date.now() - startedAt, + }); + } catch (err) { + terminalEmitted = true; + emitDiagnosticEvent({ + type: "model.call.error", + ...eventBase, + durationMs: Date.now() - startedAt, + errorCategory: diagnosticErrorCategory(err), + }); + throw err; + } finally { + if (!terminalEmitted) { + await iterator.return?.(); + emitDiagnosticEvent({ + type: "model.call.error", + ...eventBase, + durationMs: Date.now() - startedAt, + errorCategory: "StreamAbandoned", + }); + } + } +} + +function observeModelCallStream>( + stream: T, + eventBase: ModelCallEventBase, + startedAt: number, +): T { + const createIterator = stream[Symbol.asyncIterator].bind(stream); + Object.defineProperty(stream, Symbol.asyncIterator, { + configurable: true, + value: () => + observeModelCallIterator(createIterator(), eventBase, startedAt)[Symbol.asyncIterator](), + }); + return stream; +} + +function observeModelCallResult( + result: unknown, + eventBase: ModelCallEventBase, + startedAt: number, +): unknown { + if (isAsyncIterable(result)) { + return observeModelCallStream(result, eventBase, startedAt); + } + emitDiagnosticEvent({ + type: "model.call.completed", + ...eventBase, + durationMs: Date.now() - startedAt, + }); + return result; +} + +export function wrapStreamFnWithDiagnosticModelCallEvents( + streamFn: StreamFn, + ctx: ModelCallDiagnosticContext, +): StreamFn { + return ((model, streamContext, options) => { + const callId = ctx.nextCallId(); + const trace = freezeDiagnosticTraceContext(createChildDiagnosticTraceContext(ctx.trace)); + const eventBase = baseModelCallEvent(ctx, callId, trace); + emitDiagnosticEvent({ + type: "model.call.started", + ...eventBase, + }); + const startedAt = Date.now(); + + try { + const result = streamFn(model, streamContext, options); + if (isPromiseLike(result)) { + return result.then( + (resolved) => observeModelCallResult(resolved, eventBase, startedAt), + (err) => { + emitDiagnosticEvent({ + type: "model.call.error", + ...eventBase, + durationMs: Date.now() - startedAt, + errorCategory: diagnosticErrorCategory(err), + }); + throw err; + }, + ); + } + return observeModelCallResult(result, eventBase, startedAt); + } catch (err) { + emitDiagnosticEvent({ + type: "model.call.error", + ...eventBase, + durationMs: Date.now() - startedAt, + errorCategory: diagnosticErrorCategory(err), + }); + throw err; + } + }) as StreamFn; +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index c02a39a6b5f..aa6b16e4329 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -9,8 +9,10 @@ import { } from "@mariozechner/pi-coding-agent"; import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js"; import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js"; +import { emitDiagnosticEvent } from "../../../infra/diagnostic-events.js"; import { createDiagnosticTraceContext, + createChildDiagnosticTraceContext, freezeDiagnosticTraceContext, } from "../../../infra/diagnostic-trace-context.js"; import { isEmbeddedMode } from "../../../infra/embedded-mode.js"; @@ -225,6 +227,10 @@ import { resolveAttemptBootstrapContext, runAttemptContextEngineBootstrap, } from "./attempt.context-engine-helpers.js"; +import { + diagnosticErrorCategory, + wrapStreamFnWithDiagnosticModelCallEvents, +} from "./attempt.model-diagnostic-events.js"; import { buildAfterTurnRuntimeContext, buildAfterTurnRuntimeContextFromUsage, @@ -477,6 +483,15 @@ export async function runEmbeddedAttempt( }); let restoreSkillEnv: (() => void) | undefined; + let aborted = Boolean(params.abortSignal?.aborted); + let externalAbort = false; + let timedOut = false; + let idleTimedOut = false; + let timedOutDuringCompaction = false; + let promptError: unknown = null; + let emitDiagnosticRunCompleted: + | ((outcome: "completed" | "aborted" | "error", err?: unknown) => void) + | undefined; try { const { shouldLoadSkillEntries, skillEntries } = resolveEmbeddedRunSkillEntries({ workspaceDir: effectiveWorkspace, @@ -516,6 +531,40 @@ export async function runEmbeddedAttempt( const contextInjectionMode = resolveContextInjectionMode(params.config); const agentDir = params.agentDir ?? resolveOpenClawAgentDir(); const diagnosticTrace = freezeDiagnosticTraceContext(createDiagnosticTraceContext()); + const runTrace = freezeDiagnosticTraceContext( + createChildDiagnosticTraceContext(diagnosticTrace), + ); + const diagnosticRunBase = { + runId: params.runId, + ...(params.sessionKey && { sessionKey: params.sessionKey }), + ...(params.sessionId && { sessionId: params.sessionId }), + provider: params.provider, + model: params.modelId, + trigger: params.trigger, + ...((params.messageChannel ?? params.messageProvider) + ? { channel: params.messageChannel ?? params.messageProvider } + : {}), + trace: runTrace, + }; + emitDiagnosticEvent({ + type: "run.started", + ...diagnosticRunBase, + }); + const diagnosticRunStartedAt = Date.now(); + let diagnosticRunCompleted = false; + emitDiagnosticRunCompleted = (outcome, err) => { + if (diagnosticRunCompleted) { + return; + } + diagnosticRunCompleted = true; + emitDiagnosticEvent({ + type: "run.completed", + ...diagnosticRunBase, + durationMs: Date.now() - diagnosticRunStartedAt, + outcome, + ...(err ? { errorCategory: diagnosticErrorCategory(err) } : {}), + }); + }; const toolsRaw = params.disableTools ? [] : (() => { @@ -986,12 +1035,6 @@ export async function runEmbeddedAttempt( let removeToolResultContextGuard: (() => void) | undefined; let trajectoryRecorder: ReturnType | null = null; let trajectoryEndRecorded = false; - let aborted = Boolean(params.abortSignal?.aborted); - let externalAbort = false; - let timedOut = false; - let idleTimedOut = false; - let timedOutDuringCompaction = false; - let promptError: unknown = null; try { await repairSessionFileIfNeeded({ sessionFile: params.sessionFile, @@ -1590,6 +1633,21 @@ export async function runEmbeddedAttempt( (error) => idleTimeoutTrigger?.(error), ); } + let diagnosticModelCallSeq = 0; + activeSession.agent.streamFn = wrapStreamFnWithDiagnosticModelCallEvents( + activeSession.agent.streamFn, + { + runId: params.runId, + ...(params.sessionKey && { sessionKey: params.sessionKey }), + ...(params.sessionId && { sessionId: params.sessionId }), + provider: params.provider, + model: params.modelId, + api: params.model.api, + transport: effectiveAgentTransport, + trace: runTrace, + nextCallId: () => `${params.runId}:model:${(diagnosticModelCallSeq += 1)}`, + }, + ); try { const prior = await sanitizeSessionHistory({ @@ -2846,20 +2904,40 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. // See: https://github.com/openclaw/openclaw/issues/8643 - await cleanupEmbeddedAttemptResources({ - removeToolResultContextGuard, - flushPendingToolResultsAfterIdle, - session, - sessionManager, - releaseWsSession, - allowWsSessionPool: - !promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction, - sessionId: params.sessionId, - bundleLspRuntime, - sessionLock, - }); + let cleanupError: unknown; + try { + await cleanupEmbeddedAttemptResources({ + removeToolResultContextGuard, + flushPendingToolResultsAfterIdle, + session, + sessionManager, + releaseWsSession, + allowWsSessionPool: + !promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction, + sessionId: params.sessionId, + bundleLspRuntime, + sessionLock, + }); + } catch (err) { + cleanupError = err; + } + emitDiagnosticRunCompleted?.( + cleanupError || promptError + ? "error" + : aborted || timedOut || idleTimedOut || timedOutDuringCompaction + ? "aborted" + : "completed", + cleanupError ?? promptError, + ); + if (cleanupError) { + await Promise.reject(cleanupError); + } } } finally { + emitDiagnosticRunCompleted?.( + aborted ? "aborted" : "error", + promptError ?? new Error("run exited before diagnostic completion"), + ); restoreSkillEnv?.(); } } diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index c046935c6a9..d4ba8c59102 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -185,6 +185,54 @@ export type DiagnosticToolExecutionErrorEvent = DiagnosticToolExecutionBaseEvent errorCode?: string; }; +type DiagnosticRunBaseEvent = DiagnosticBaseEvent & { + runId: string; + sessionKey?: string; + sessionId?: string; + provider?: string; + model?: string; + trigger?: string; + channel?: string; +}; + +export type DiagnosticRunStartedEvent = DiagnosticRunBaseEvent & { + type: "run.started"; +}; + +export type DiagnosticRunCompletedEvent = DiagnosticRunBaseEvent & { + type: "run.completed"; + durationMs: number; + outcome: "completed" | "aborted" | "error"; + errorCategory?: string; +}; + +type DiagnosticModelCallBaseEvent = DiagnosticBaseEvent & { + type: "model.call.started" | "model.call.completed" | "model.call.error"; + runId: string; + callId: string; + sessionKey?: string; + sessionId?: string; + provider: string; + model: string; + api?: string; + transport?: string; +}; + +export type DiagnosticModelCallStartedEvent = DiagnosticModelCallBaseEvent & { + type: "model.call.started"; +}; + +export type DiagnosticModelCallCompletedEvent = DiagnosticModelCallBaseEvent & { + type: "model.call.completed"; + durationMs: number; +}; + +export type DiagnosticModelCallErrorEvent = DiagnosticModelCallBaseEvent & { + type: "model.call.error"; + durationMs: number; + errorCategory: string; +}; + export type DiagnosticMemoryUsage = { rssBytes: number; heapTotalBytes: number; @@ -238,6 +286,11 @@ export type DiagnosticEventPayload = | DiagnosticToolExecutionStartedEvent | DiagnosticToolExecutionCompletedEvent | DiagnosticToolExecutionErrorEvent + | DiagnosticRunStartedEvent + | DiagnosticRunCompletedEvent + | DiagnosticModelCallStartedEvent + | DiagnosticModelCallCompletedEvent + | DiagnosticModelCallErrorEvent | DiagnosticMemorySampleEvent | DiagnosticMemoryPressureEvent | DiagnosticPayloadLargeEvent; diff --git a/src/logging/diagnostic-stability.ts b/src/logging/diagnostic-stability.ts index 959819a00ec..0893a4c5bec 100644 --- a/src/logging/diagnostic-stability.ts +++ b/src/logging/diagnostic-stability.ts @@ -247,6 +247,34 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi record.durationMs = event.durationMs; record.reason = event.errorCategory; break; + case "run.started": + record.provider = event.provider; + record.model = event.model; + record.channel = event.channel; + break; + case "run.completed": + record.provider = event.provider; + record.model = event.model; + record.channel = event.channel; + record.durationMs = event.durationMs; + record.outcome = event.outcome; + assignReasonCode(record, event.errorCategory); + break; + case "model.call.started": + record.provider = event.provider; + record.model = event.model; + break; + case "model.call.completed": + record.provider = event.provider; + record.model = event.model; + record.durationMs = event.durationMs; + break; + case "model.call.error": + record.provider = event.provider; + record.model = event.model; + record.durationMs = event.durationMs; + record.reason = event.errorCategory; + break; case "diagnostic.memory.sample": record.memory = copyMemory(event.memory); break;