feat(diagnostics): emit model call events

Emit structured diagnostic events for embedded run and model-call lifecycle with trace context, duration, and safe error categories.
This commit is contained in:
Vincent Koc
2026-04-24 02:17:07 -07:00
committed by GitHub
parent e5f55dd024
commit 0e7250f37b
8 changed files with 535 additions and 20 deletions

View File

@@ -14,6 +14,7 @@ Docs: https://docs.openclaw.ai
- Diagnostics/OTEL: pass immutable per-run diagnostic trace context through agent and tool hook contexts, and parent exported diagnostic spans from validated context without retaining global trace state. Thanks @vincentkoc.
- Diagnostics/OTEL: make exporter startup restart-safe so config reloads do not retain stale SDKs, log transports, or diagnostic event listeners. Thanks @vincentkoc.
- Diagnostics: emit structured tool execution diagnostic events with trace context, timing, and redacted error metadata. Thanks @vincentkoc.
- Diagnostics: emit structured run and model-call diagnostic events with trace context, duration, and non-message error metadata. Thanks @vincentkoc.
- Control UI/chat: add a Steer action on queued messages so a browser follow-up can be injected into the active run without retyping it.
- Control UI/Talk: add browser WebRTC realtime voice sessions backed by OpenAI Realtime, with Gateway-minted ephemeral client secrets and `openclaw_agent_consult` handoff to the full OpenClaw agent.
- Agents/tools: add optional per-call `timeoutMs` support for image, video, music, and TTS generation tools so agents can extend provider request timeouts only when a specific generation needs it.

View File

@@ -1,2 +1,2 @@
8ca22ea6125fb198641c676d73b4df5a3bc49079be68bef8ed0718a54c1bb53a plugin-sdk-api-baseline.json
197d9743128020062fc457228fa9139d0bd465d9e1775101bfc39137f4a10896 plugin-sdk-api-baseline.jsonl
b125289f628c19afb6087dcd58b674fa8acc8899545f99db81c264c4c964d17f plugin-sdk-api-baseline.json
2a2e9959cd35a375ec97682ec5d5108d94d4e77a82085929c58e9a994313d5e6 plugin-sdk-api-baseline.jsonl

View File

@@ -815,6 +815,11 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
case "tool.execution.started":
case "tool.execution.completed":
case "tool.execution.error":
case "run.started":
case "run.completed":
case "model.call.started":
case "model.call.completed":
case "model.call.error":
case "diagnostic.memory.sample":
case "diagnostic.memory.pressure":
case "payload.large":

View File

@@ -0,0 +1,167 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import { beforeEach, describe, expect, it } from "vitest";
import {
onDiagnosticEvent,
resetDiagnosticEventsForTest,
type DiagnosticEventPayload,
} from "../../../infra/diagnostic-events.js";
import { createDiagnosticTraceContext } from "../../../infra/diagnostic-trace-context.js";
import { wrapStreamFnWithDiagnosticModelCallEvents } from "./attempt.model-diagnostic-events.js";
async function collectModelCallEvents(run: () => Promise<void>): Promise<DiagnosticEventPayload[]> {
const events: DiagnosticEventPayload[] = [];
const stop = onDiagnosticEvent((event) => {
if (event.type.startsWith("model.call.")) {
events.push(event);
}
});
try {
await run();
return events;
} finally {
stop();
}
}
async function drain(stream: AsyncIterable<unknown>): Promise<void> {
for await (const _ of stream) {
// drain
}
}
describe("wrapStreamFnWithDiagnosticModelCallEvents", () => {
beforeEach(() => {
resetDiagnosticEventsForTest();
});
it("emits started and completed events for async streams", async () => {
async function* stream() {
yield { type: "text", text: "ok" };
}
const originalStream = stream() as unknown as AsyncIterable<unknown> & {
result: () => Promise<string>;
};
originalStream.result = async () => "kept";
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
(() => originalStream) as unknown as StreamFn,
{
runId: "run-1",
sessionKey: "session-key",
sessionId: "session-id",
provider: "openai",
model: "gpt-5.4",
api: "openai-responses",
transport: "http",
trace: createDiagnosticTraceContext({
traceId: "4bf92f3577b34da6a3ce929d0e0e4736",
spanId: "00f067aa0ba902b7",
}),
nextCallId: () => "call-1",
},
);
const events = await collectModelCallEvents(async () => {
const returned = wrapped(
{} as never,
{} as never,
{} as never,
) as unknown as typeof originalStream;
expect(returned).toBe(originalStream);
expect(await returned.result()).toBe("kept");
await drain(returned);
});
expect(events.map((event) => event.type)).toEqual([
"model.call.started",
"model.call.completed",
]);
expect(events[0]).toMatchObject({
type: "model.call.started",
runId: "run-1",
callId: "call-1",
sessionKey: "session-key",
sessionId: "session-id",
provider: "openai",
model: "gpt-5.4",
api: "openai-responses",
transport: "http",
});
expect(events[0]?.trace?.parentSpanId).toBe("00f067aa0ba902b7");
expect(events[1]).toMatchObject({
type: "model.call.completed",
callId: "call-1",
durationMs: expect.any(Number),
});
});
it("emits error events when stream iteration fails", async () => {
const stream = {
[Symbol.asyncIterator]() {
return {
async next(): Promise<IteratorResult<unknown>> {
throw new TypeError("provider failed");
},
};
},
};
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
(() => stream) as unknown as StreamFn,
{
runId: "run-1",
provider: "anthropic",
model: "sonnet-4.6",
trace: createDiagnosticTraceContext(),
nextCallId: () => "call-err",
},
);
const events = await collectModelCallEvents(async () => {
await expect(
drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable<unknown>),
).rejects.toThrow("provider failed");
});
expect(events.map((event) => event.type)).toEqual(["model.call.started", "model.call.error"]);
expect(events[1]).toMatchObject({
type: "model.call.error",
callId: "call-err",
errorCategory: "TypeError",
durationMs: expect.any(Number),
});
});
it("emits error events when stream consumption stops early", async () => {
async function* stream() {
yield { type: "text", text: "first" };
yield { type: "text", text: "second" };
}
const wrapped = wrapStreamFnWithDiagnosticModelCallEvents(
(() => stream()) as unknown as StreamFn,
{
runId: "run-1",
provider: "openai",
model: "gpt-5.4",
trace: createDiagnosticTraceContext(),
nextCallId: () => "call-abandoned",
},
);
const events = await collectModelCallEvents(async () => {
for await (const _ of wrapped(
{} as never,
{} as never,
{} as never,
) as AsyncIterable<unknown>) {
break;
}
});
expect(events.map((event) => event.type)).toEqual(["model.call.started", "model.call.error"]);
expect(events[1]).toMatchObject({
type: "model.call.error",
callId: "call-abandoned",
errorCategory: "StreamAbandoned",
durationMs: expect.any(Number),
});
});
});

View File

@@ -0,0 +1,183 @@
import type { StreamFn } from "@mariozechner/pi-agent-core";
import {
emitDiagnosticEvent,
type DiagnosticEventInput,
} from "../../../infra/diagnostic-events.js";
import {
createChildDiagnosticTraceContext,
freezeDiagnosticTraceContext,
type DiagnosticTraceContext,
} from "../../../infra/diagnostic-trace-context.js";
type ModelCallDiagnosticContext = {
runId: string;
sessionKey?: string;
sessionId?: string;
provider: string;
model: string;
api?: string;
transport?: string;
trace: DiagnosticTraceContext;
nextCallId: () => string;
};
type ModelCallEventBase = Omit<
Extract<DiagnosticEventInput, { type: "model.call.started" }>,
"type"
>;
export function diagnosticErrorCategory(err: unknown): string {
if (err instanceof Error && err.name.trim()) {
return err.name;
}
return typeof err;
}
function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
return (
value !== null &&
(typeof value === "object" || typeof value === "function") &&
typeof (value as { then?: unknown }).then === "function"
);
}
function isAsyncIterable(value: unknown): value is AsyncIterable<unknown> {
return (
value !== null &&
typeof value === "object" &&
typeof (value as { [Symbol.asyncIterator]?: unknown })[Symbol.asyncIterator] === "function"
);
}
function baseModelCallEvent(
ctx: ModelCallDiagnosticContext,
callId: string,
trace: DiagnosticTraceContext,
): ModelCallEventBase {
return {
runId: ctx.runId,
callId,
...(ctx.sessionKey && { sessionKey: ctx.sessionKey }),
...(ctx.sessionId && { sessionId: ctx.sessionId }),
provider: ctx.provider,
model: ctx.model,
...(ctx.api && { api: ctx.api }),
...(ctx.transport && { transport: ctx.transport }),
trace,
};
}
async function* observeModelCallIterator<T>(
iterator: AsyncIterator<T>,
eventBase: ModelCallEventBase,
startedAt: number,
): AsyncIterable<T> {
let terminalEmitted = false;
try {
for (;;) {
const next = await iterator.next();
if (next.done) {
break;
}
yield next.value;
}
terminalEmitted = true;
emitDiagnosticEvent({
type: "model.call.completed",
...eventBase,
durationMs: Date.now() - startedAt,
});
} catch (err) {
terminalEmitted = true;
emitDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,
errorCategory: diagnosticErrorCategory(err),
});
throw err;
} finally {
if (!terminalEmitted) {
await iterator.return?.();
emitDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,
errorCategory: "StreamAbandoned",
});
}
}
}
function observeModelCallStream<T extends AsyncIterable<unknown>>(
stream: T,
eventBase: ModelCallEventBase,
startedAt: number,
): T {
const createIterator = stream[Symbol.asyncIterator].bind(stream);
Object.defineProperty(stream, Symbol.asyncIterator, {
configurable: true,
value: () =>
observeModelCallIterator(createIterator(), eventBase, startedAt)[Symbol.asyncIterator](),
});
return stream;
}
function observeModelCallResult(
result: unknown,
eventBase: ModelCallEventBase,
startedAt: number,
): unknown {
if (isAsyncIterable(result)) {
return observeModelCallStream(result, eventBase, startedAt);
}
emitDiagnosticEvent({
type: "model.call.completed",
...eventBase,
durationMs: Date.now() - startedAt,
});
return result;
}
export function wrapStreamFnWithDiagnosticModelCallEvents(
streamFn: StreamFn,
ctx: ModelCallDiagnosticContext,
): StreamFn {
return ((model, streamContext, options) => {
const callId = ctx.nextCallId();
const trace = freezeDiagnosticTraceContext(createChildDiagnosticTraceContext(ctx.trace));
const eventBase = baseModelCallEvent(ctx, callId, trace);
emitDiagnosticEvent({
type: "model.call.started",
...eventBase,
});
const startedAt = Date.now();
try {
const result = streamFn(model, streamContext, options);
if (isPromiseLike(result)) {
return result.then(
(resolved) => observeModelCallResult(resolved, eventBase, startedAt),
(err) => {
emitDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,
errorCategory: diagnosticErrorCategory(err),
});
throw err;
},
);
}
return observeModelCallResult(result, eventBase, startedAt);
} catch (err) {
emitDiagnosticEvent({
type: "model.call.error",
...eventBase,
durationMs: Date.now() - startedAt,
errorCategory: diagnosticErrorCategory(err),
});
throw err;
}
}) as StreamFn;
}

View File

@@ -9,8 +9,10 @@ import {
} from "@mariozechner/pi-coding-agent";
import { filterHeartbeatPairs } from "../../../auto-reply/heartbeat-filter.js";
import { resolveChannelCapabilities } from "../../../config/channel-capabilities.js";
import { emitDiagnosticEvent } from "../../../infra/diagnostic-events.js";
import {
createDiagnosticTraceContext,
createChildDiagnosticTraceContext,
freezeDiagnosticTraceContext,
} from "../../../infra/diagnostic-trace-context.js";
import { isEmbeddedMode } from "../../../infra/embedded-mode.js";
@@ -225,6 +227,10 @@ import {
resolveAttemptBootstrapContext,
runAttemptContextEngineBootstrap,
} from "./attempt.context-engine-helpers.js";
import {
diagnosticErrorCategory,
wrapStreamFnWithDiagnosticModelCallEvents,
} from "./attempt.model-diagnostic-events.js";
import {
buildAfterTurnRuntimeContext,
buildAfterTurnRuntimeContextFromUsage,
@@ -477,6 +483,15 @@ export async function runEmbeddedAttempt(
});
let restoreSkillEnv: (() => void) | undefined;
let aborted = Boolean(params.abortSignal?.aborted);
let externalAbort = false;
let timedOut = false;
let idleTimedOut = false;
let timedOutDuringCompaction = false;
let promptError: unknown = null;
let emitDiagnosticRunCompleted:
| ((outcome: "completed" | "aborted" | "error", err?: unknown) => void)
| undefined;
try {
const { shouldLoadSkillEntries, skillEntries } = resolveEmbeddedRunSkillEntries({
workspaceDir: effectiveWorkspace,
@@ -516,6 +531,40 @@ export async function runEmbeddedAttempt(
const contextInjectionMode = resolveContextInjectionMode(params.config);
const agentDir = params.agentDir ?? resolveOpenClawAgentDir();
const diagnosticTrace = freezeDiagnosticTraceContext(createDiagnosticTraceContext());
const runTrace = freezeDiagnosticTraceContext(
createChildDiagnosticTraceContext(diagnosticTrace),
);
const diagnosticRunBase = {
runId: params.runId,
...(params.sessionKey && { sessionKey: params.sessionKey }),
...(params.sessionId && { sessionId: params.sessionId }),
provider: params.provider,
model: params.modelId,
trigger: params.trigger,
...((params.messageChannel ?? params.messageProvider)
? { channel: params.messageChannel ?? params.messageProvider }
: {}),
trace: runTrace,
};
emitDiagnosticEvent({
type: "run.started",
...diagnosticRunBase,
});
const diagnosticRunStartedAt = Date.now();
let diagnosticRunCompleted = false;
emitDiagnosticRunCompleted = (outcome, err) => {
if (diagnosticRunCompleted) {
return;
}
diagnosticRunCompleted = true;
emitDiagnosticEvent({
type: "run.completed",
...diagnosticRunBase,
durationMs: Date.now() - diagnosticRunStartedAt,
outcome,
...(err ? { errorCategory: diagnosticErrorCategory(err) } : {}),
});
};
const toolsRaw = params.disableTools
? []
: (() => {
@@ -986,12 +1035,6 @@ export async function runEmbeddedAttempt(
let removeToolResultContextGuard: (() => void) | undefined;
let trajectoryRecorder: ReturnType<typeof createTrajectoryRuntimeRecorder> | null = null;
let trajectoryEndRecorded = false;
let aborted = Boolean(params.abortSignal?.aborted);
let externalAbort = false;
let timedOut = false;
let idleTimedOut = false;
let timedOutDuringCompaction = false;
let promptError: unknown = null;
try {
await repairSessionFileIfNeeded({
sessionFile: params.sessionFile,
@@ -1590,6 +1633,21 @@ export async function runEmbeddedAttempt(
(error) => idleTimeoutTrigger?.(error),
);
}
let diagnosticModelCallSeq = 0;
activeSession.agent.streamFn = wrapStreamFnWithDiagnosticModelCallEvents(
activeSession.agent.streamFn,
{
runId: params.runId,
...(params.sessionKey && { sessionKey: params.sessionKey }),
...(params.sessionId && { sessionId: params.sessionId }),
provider: params.provider,
model: params.modelId,
api: params.model.api,
transport: effectiveAgentTransport,
trace: runTrace,
nextCallId: () => `${params.runId}:model:${(diagnosticModelCallSeq += 1)}`,
},
);
try {
const prior = await sanitizeSessionHistory({
@@ -2846,20 +2904,40 @@ export async function runEmbeddedAttempt(
// flushPendingToolResults() fires while tools are still executing, inserting
// synthetic "missing tool result" errors and causing silent agent failures.
// See: https://github.com/openclaw/openclaw/issues/8643
await cleanupEmbeddedAttemptResources({
removeToolResultContextGuard,
flushPendingToolResultsAfterIdle,
session,
sessionManager,
releaseWsSession,
allowWsSessionPool:
!promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction,
sessionId: params.sessionId,
bundleLspRuntime,
sessionLock,
});
let cleanupError: unknown;
try {
await cleanupEmbeddedAttemptResources({
removeToolResultContextGuard,
flushPendingToolResultsAfterIdle,
session,
sessionManager,
releaseWsSession,
allowWsSessionPool:
!promptError && !aborted && !timedOut && !idleTimedOut && !timedOutDuringCompaction,
sessionId: params.sessionId,
bundleLspRuntime,
sessionLock,
});
} catch (err) {
cleanupError = err;
}
emitDiagnosticRunCompleted?.(
cleanupError || promptError
? "error"
: aborted || timedOut || idleTimedOut || timedOutDuringCompaction
? "aborted"
: "completed",
cleanupError ?? promptError,
);
if (cleanupError) {
await Promise.reject(cleanupError);
}
}
} finally {
emitDiagnosticRunCompleted?.(
aborted ? "aborted" : "error",
promptError ?? new Error("run exited before diagnostic completion"),
);
restoreSkillEnv?.();
}
}

View File

@@ -185,6 +185,54 @@ export type DiagnosticToolExecutionErrorEvent = DiagnosticToolExecutionBaseEvent
errorCode?: string;
};
type DiagnosticRunBaseEvent = DiagnosticBaseEvent & {
runId: string;
sessionKey?: string;
sessionId?: string;
provider?: string;
model?: string;
trigger?: string;
channel?: string;
};
export type DiagnosticRunStartedEvent = DiagnosticRunBaseEvent & {
type: "run.started";
};
export type DiagnosticRunCompletedEvent = DiagnosticRunBaseEvent & {
type: "run.completed";
durationMs: number;
outcome: "completed" | "aborted" | "error";
errorCategory?: string;
};
type DiagnosticModelCallBaseEvent = DiagnosticBaseEvent & {
type: "model.call.started" | "model.call.completed" | "model.call.error";
runId: string;
callId: string;
sessionKey?: string;
sessionId?: string;
provider: string;
model: string;
api?: string;
transport?: string;
};
export type DiagnosticModelCallStartedEvent = DiagnosticModelCallBaseEvent & {
type: "model.call.started";
};
export type DiagnosticModelCallCompletedEvent = DiagnosticModelCallBaseEvent & {
type: "model.call.completed";
durationMs: number;
};
export type DiagnosticModelCallErrorEvent = DiagnosticModelCallBaseEvent & {
type: "model.call.error";
durationMs: number;
errorCategory: string;
};
export type DiagnosticMemoryUsage = {
rssBytes: number;
heapTotalBytes: number;
@@ -238,6 +286,11 @@ export type DiagnosticEventPayload =
| DiagnosticToolExecutionStartedEvent
| DiagnosticToolExecutionCompletedEvent
| DiagnosticToolExecutionErrorEvent
| DiagnosticRunStartedEvent
| DiagnosticRunCompletedEvent
| DiagnosticModelCallStartedEvent
| DiagnosticModelCallCompletedEvent
| DiagnosticModelCallErrorEvent
| DiagnosticMemorySampleEvent
| DiagnosticMemoryPressureEvent
| DiagnosticPayloadLargeEvent;

View File

@@ -247,6 +247,34 @@ function sanitizeDiagnosticEvent(event: DiagnosticEventPayload): DiagnosticStabi
record.durationMs = event.durationMs;
record.reason = event.errorCategory;
break;
case "run.started":
record.provider = event.provider;
record.model = event.model;
record.channel = event.channel;
break;
case "run.completed":
record.provider = event.provider;
record.model = event.model;
record.channel = event.channel;
record.durationMs = event.durationMs;
record.outcome = event.outcome;
assignReasonCode(record, event.errorCategory);
break;
case "model.call.started":
record.provider = event.provider;
record.model = event.model;
break;
case "model.call.completed":
record.provider = event.provider;
record.model = event.model;
record.durationMs = event.durationMs;
break;
case "model.call.error":
record.provider = event.provider;
record.model = event.model;
record.durationMs = event.durationMs;
record.reason = event.errorCategory;
break;
case "diagnostic.memory.sample":
record.memory = copyMemory(event.memory);
break;