diff --git a/CHANGELOG.md b/CHANGELOG.md index 04f0af64a26..5cf4f09dea7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - Plugins/compat: mark `OPENCLAW_DISABLE_PERSISTED_PLUGIN_REGISTRY` as a deprecated break-glass switch and point operators at registry repair instead. Thanks @vincentkoc. - Plugins/registry: ignore stale persisted registry reads when plugin policy no longer matches current config, and stamp generated registry files with a do-not-edit warning. Thanks @vincentkoc. - Diagnostics/OTEL: surface provider request identifiers as bounded hashes on model-call diagnostics and span events, without exporting raw request IDs or metric labels. Thanks @Lidang-Jiang and @vincentkoc. +- Plugins/diagnostics: add metadata-only `model_call_started` and `model_call_ended` hooks for provider/model call telemetry without exposing prompts, responses, headers, request bodies, or raw provider request IDs. Thanks @vincentkoc. - Diagnostics/OTEL: add bounded outbound message delivery lifecycle diagnostics and export them as low-cardinality delivery spans/metrics without message body, recipient, room, or media-path data. (#71471) Thanks @vincentkoc and @jlapenna. - Diagnostics/OTEL: emit bounded exec-process diagnostics and export them as `openclaw.exec` spans without exposing command text, working directories, or container identifiers. (#71451) Thanks @vincentkoc and @jlapenna. - Diagnostics/OTEL: support `OPENCLAW_OTEL_PRELOADED=1` so the plugin can reuse an already-registered OpenTelemetry SDK while keeping OpenClaw diagnostic listeners wired. (#71450) Thanks @vincentkoc and @jlapenna. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 55890d1576f..877d530c333 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -f6d9588737310773031e744b6726ba80a9ca742205db335aae95fbd1e2925dc8 plugin-sdk-api-baseline.json -a4c86fe92b7bea538f33139e9b57cfada766b7d504323c2e20a7ca205994be44 plugin-sdk-api-baseline.jsonl +fae367b052828a57feab3bfcd10a58ebeacd6c858b337d0aab72726863952946 plugin-sdk-api-baseline.json +1bb8995e1486f7d900928aaace87421a8297fe41264197f9bf849f07c65c8f2b plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/hooks.md b/docs/plugins/hooks.md index 8475f42211d..44c252554b7 100644 --- a/docs/plugins/hooks.md +++ b/docs/plugins/hooks.md @@ -68,6 +68,7 @@ observation-only. **Conversation observation** +- `model_call_started` / `model_call_ended` — observe sanitized provider/model call metadata, timing, outcome, and bounded request-id hashes without prompt or response content - `llm_input` — observe provider input (system prompt, prompt, history) - `llm_output` — observe provider output @@ -162,6 +163,13 @@ so your plugin does not depend on a legacy combined phase. `before_agent_start` and `agent_end` include `event.runId` when OpenClaw can identify the active run. The same value is also available on `ctx.runId`. +Use `model_call_started` and `model_call_ended` for provider-call telemetry +that should not receive raw prompts, history, responses, headers, request +bodies, or provider request IDs. These hooks include stable metadata such as +`runId`, `callId`, `provider`, `model`, optional `api`/`transport`, terminal +`durationMs`/`outcome`, and `upstreamRequestIdHash` when OpenClaw can derive a +bounded provider request-id hash. + Non-bundled plugins that need `llm_input`, `llm_output`, or `agent_end` must set: ```json diff --git a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts index 8fc0c66d0b6..a432724c059 100644 --- a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts +++ b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.test.ts @@ -1,11 +1,16 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; -import { beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { onInternalDiagnosticEvent, resetDiagnosticEventsForTest, type DiagnosticEventPayload, } from "../../../infra/diagnostic-events.js"; import { createDiagnosticTraceContext } from "../../../infra/diagnostic-trace-context.js"; +import { + initializeGlobalHookRunner, + resetGlobalHookRunner, +} from "../../../plugins/hook-runner-global.js"; +import { createHookRunnerWithRegistry } from "../../../plugins/hooks.test-helpers.js"; import { wrapStreamFnWithDiagnosticModelCallEvents } from "./attempt.model-diagnostic-events.js"; async function collectModelCallEvents(run: () => Promise): Promise { @@ -33,6 +38,11 @@ async function drain(stream: AsyncIterable): Promise { describe("wrapStreamFnWithDiagnosticModelCallEvents", () => { beforeEach(() => { resetDiagnosticEventsForTest(); + resetGlobalHookRunner(); + }); + + afterEach(() => { + resetGlobalHookRunner(); }); it("emits started and completed events for async streams", async () => { @@ -170,6 +180,79 @@ describe("wrapStreamFnWithDiagnosticModelCallEvents", () => { ]); }); + it("fires frozen sanitized model-call plugin hooks", async () => { + const started = vi.fn(); + const ended = vi.fn(); + const { registry } = createHookRunnerWithRegistry([ + { hookName: "model_call_started", handler: started }, + { hookName: "model_call_ended", handler: ended }, + ]); + initializeGlobalHookRunner(registry); + const secretChunk = "secret response with Bearer sk-test-secret-value"; + + async function* stream() { + yield { type: "text", text: secretChunk }; + } + const wrapped = wrapStreamFnWithDiagnosticModelCallEvents( + (() => stream()) as unknown as StreamFn, + { + runId: "run-1", + sessionKey: "session-key", + sessionId: "session-id", + provider: "openai", + model: "gpt-5.4", + api: "openai-responses", + transport: "http", + trace: createDiagnosticTraceContext(), + nextCallId: () => "call-hook", + }, + ); + + const events = await collectModelCallEvents(async () => { + await drain(wrapped({} as never, {} as never, {} as never) as AsyncIterable); + }); + await new Promise((resolve) => setImmediate(resolve)); + + expect(events.map((event) => event.type)).toEqual([ + "model.call.started", + "model.call.completed", + ]); + expect(started).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-1", + callId: "call-hook", + sessionKey: "session-key", + sessionId: "session-id", + provider: "openai", + model: "gpt-5.4", + api: "openai-responses", + transport: "http", + }), + expect.objectContaining({ + runId: "run-1", + sessionKey: "session-key", + sessionId: "session-id", + modelProviderId: "openai", + modelId: "gpt-5.4", + }), + ); + expect(ended).toHaveBeenCalledWith( + expect.objectContaining({ + runId: "run-1", + callId: "call-hook", + outcome: "completed", + durationMs: expect.any(Number), + }), + expect.objectContaining({ runId: "run-1" }), + ); + const startedEvent = started.mock.calls[0]?.[0]; + const startedCtx = started.mock.calls[0]?.[1]; + expect(Object.isFrozen(startedEvent)).toBe(true); + expect(Object.isFrozen(startedCtx)).toBe(true); + expect(Object.isFrozen((startedCtx as { trace?: unknown } | undefined)?.trace)).toBe(true); + expect(JSON.stringify([started.mock.calls, ended.mock.calls])).not.toContain(secretChunk); + }); + it("emits error events when stream consumption stops early", async () => { async function* stream() { yield { type: "text", text: "first" }; diff --git a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts index 6f3c02800ff..5062ef555cc 100644 --- a/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts +++ b/src/agents/pi-embedded-runner/run/attempt.model-diagnostic-events.ts @@ -1,4 +1,5 @@ import type { StreamFn } from "@mariozechner/pi-agent-core"; +import { fireAndForgetBoundedHook } from "../../../hooks/fire-and-forget.js"; import { diagnosticErrorCategory, diagnosticProviderRequestIdHash, @@ -12,6 +13,12 @@ import { freezeDiagnosticTraceContext, type DiagnosticTraceContext, } from "../../../infra/diagnostic-trace-context.js"; +import { getGlobalHookRunner } from "../../../plugins/hook-runner-global.js"; +import type { + PluginHookAgentContext, + PluginHookModelCallEndedEvent, + PluginHookModelCallStartedEvent, +} from "../../../plugins/hook-types.js"; export { diagnosticErrorCategory }; @@ -35,6 +42,10 @@ type ModelCallErrorFields = Pick< Extract, "errorCategory" | "upstreamRequestIdHash" >; +type ModelCallEndedHookFields = Pick< + PluginHookModelCallEndedEvent, + "durationMs" | "outcome" | "errorCategory" | "upstreamRequestIdHash" +>; const MODEL_CALL_STREAM_RETURN_TIMEOUT_MS = 1000; @@ -90,6 +101,102 @@ function modelCallErrorFields(err: unknown): ModelCallErrorFields { }; } +function modelCallHookEventBase(eventBase: ModelCallEventBase): PluginHookModelCallStartedEvent { + return { + runId: eventBase.runId, + callId: eventBase.callId, + ...(eventBase.sessionKey ? { sessionKey: eventBase.sessionKey } : {}), + ...(eventBase.sessionId ? { sessionId: eventBase.sessionId } : {}), + provider: eventBase.provider, + model: eventBase.model, + ...(eventBase.api ? { api: eventBase.api } : {}), + ...(eventBase.transport ? { transport: eventBase.transport } : {}), + }; +} + +function modelCallHookContext(eventBase: ModelCallEventBase): PluginHookAgentContext { + return Object.freeze({ + runId: eventBase.runId, + trace: eventBase.trace, + ...(eventBase.sessionKey ? { sessionKey: eventBase.sessionKey } : {}), + ...(eventBase.sessionId ? { sessionId: eventBase.sessionId } : {}), + modelProviderId: eventBase.provider, + modelId: eventBase.model, + }) as PluginHookAgentContext; +} + +function dispatchModelCallStartedHook(eventBase: ModelCallEventBase): void { + const hookRunner = getGlobalHookRunner(); + if (!hookRunner?.hasHooks("model_call_started")) { + return; + } + const event = Object.freeze(modelCallHookEventBase(eventBase)) as PluginHookModelCallStartedEvent; + const hookCtx = modelCallHookContext(eventBase); + fireAndForgetBoundedHook( + () => hookRunner.runModelCallStarted(event, hookCtx), + "model_call_started plugin hook failed", + ); +} + +function dispatchModelCallEndedHook( + eventBase: ModelCallEventBase, + fields: ModelCallEndedHookFields, +): void { + const hookRunner = getGlobalHookRunner(); + if (!hookRunner?.hasHooks("model_call_ended")) { + return; + } + const event = Object.freeze({ + ...modelCallHookEventBase(eventBase), + ...fields, + }) as PluginHookModelCallEndedEvent; + const hookCtx = modelCallHookContext(eventBase); + fireAndForgetBoundedHook( + () => hookRunner.runModelCallEnded(event, hookCtx), + "model_call_ended plugin hook failed", + ); +} + +function emitModelCallStarted(eventBase: ModelCallEventBase): void { + emitTrustedDiagnosticEvent({ + type: "model.call.started", + ...eventBase, + }); + dispatchModelCallStartedHook(eventBase); +} + +function emitModelCallCompleted(eventBase: ModelCallEventBase, startedAt: number): void { + const durationMs = Date.now() - startedAt; + emitTrustedDiagnosticEvent({ + type: "model.call.completed", + ...eventBase, + durationMs, + }); + dispatchModelCallEndedHook(eventBase, { + durationMs, + outcome: "completed", + }); +} + +function emitModelCallError( + eventBase: ModelCallEventBase, + startedAt: number, + fields: ModelCallErrorFields, +): void { + const durationMs = Date.now() - startedAt; + emitTrustedDiagnosticEvent({ + type: "model.call.error", + ...eventBase, + durationMs, + ...fields, + }); + dispatchModelCallEndedHook(eventBase, { + durationMs, + outcome: "error", + ...fields, + }); +} + async function safeReturnIterator(iterator: AsyncIterator): Promise { let returnResult: unknown; try { @@ -137,29 +244,15 @@ async function* observeModelCallIterator( yield next.value; } terminalEmitted = true; - emitTrustedDiagnosticEvent({ - type: "model.call.completed", - ...eventBase, - durationMs: Date.now() - startedAt, - }); + emitModelCallCompleted(eventBase, startedAt); } catch (err) { terminalEmitted = true; - emitTrustedDiagnosticEvent({ - type: "model.call.error", - ...eventBase, - durationMs: Date.now() - startedAt, - ...modelCallErrorFields(err), - }); + emitModelCallError(eventBase, startedAt, modelCallErrorFields(err)); throw err; } finally { if (!terminalEmitted) { await safeReturnIterator(iterator); - emitTrustedDiagnosticEvent({ - type: "model.call.error", - ...eventBase, - durationMs: Date.now() - startedAt, - errorCategory: "StreamAbandoned", - }); + emitModelCallError(eventBase, startedAt, { errorCategory: "StreamAbandoned" }); } } } @@ -209,11 +302,7 @@ function observeModelCallResult( startedAt, ); } - emitTrustedDiagnosticEvent({ - type: "model.call.completed", - ...eventBase, - durationMs: Date.now() - startedAt, - }); + emitModelCallCompleted(eventBase, startedAt); return result; } @@ -225,10 +314,7 @@ export function wrapStreamFnWithDiagnosticModelCallEvents( const callId = ctx.nextCallId(); const trace = freezeDiagnosticTraceContext(createChildDiagnosticTraceContext(ctx.trace)); const eventBase = baseModelCallEvent(ctx, callId, trace); - emitTrustedDiagnosticEvent({ - type: "model.call.started", - ...eventBase, - }); + emitModelCallStarted(eventBase); const startedAt = Date.now(); try { @@ -237,24 +323,14 @@ export function wrapStreamFnWithDiagnosticModelCallEvents( return result.then( (resolved) => observeModelCallResult(resolved, eventBase, startedAt), (err) => { - emitTrustedDiagnosticEvent({ - type: "model.call.error", - ...eventBase, - durationMs: Date.now() - startedAt, - ...modelCallErrorFields(err), - }); + emitModelCallError(eventBase, startedAt, modelCallErrorFields(err)); throw err; }, ); } return observeModelCallResult(result, eventBase, startedAt); } catch (err) { - emitTrustedDiagnosticEvent({ - type: "model.call.error", - ...eventBase, - durationMs: Date.now() - startedAt, - ...modelCallErrorFields(err), - }); + emitModelCallError(eventBase, startedAt, modelCallErrorFields(err)); throw err; } }) as StreamFn; diff --git a/src/plugins/hook-types.ts b/src/plugins/hook-types.ts index de0cf3ce592..d590f5f3063 100644 --- a/src/plugins/hook-types.ts +++ b/src/plugins/hook-types.ts @@ -59,6 +59,8 @@ export type PluginHookName = | "before_prompt_build" | "before_agent_start" | "before_agent_reply" + | "model_call_started" + | "model_call_ended" | "llm_input" | "llm_output" | "agent_end" @@ -90,6 +92,8 @@ export const PLUGIN_HOOK_NAMES = [ "before_prompt_build", "before_agent_start", "before_agent_reply", + "model_call_started", + "model_call_ended", "llm_input", "llm_output", "agent_end", @@ -187,6 +191,26 @@ export type PluginHookLlmInputEvent = { imagesCount: number; }; +export type PluginHookModelCallBaseEvent = { + runId: string; + callId: string; + sessionKey?: string; + sessionId?: string; + provider: string; + model: string; + api?: string; + transport?: string; +}; + +export type PluginHookModelCallStartedEvent = PluginHookModelCallBaseEvent; + +export type PluginHookModelCallEndedEvent = PluginHookModelCallBaseEvent & { + durationMs: number; + outcome: "completed" | "error"; + errorCategory?: string; + upstreamRequestIdHash?: string; +}; + export type PluginHookLlmOutputEvent = { runId: string; sessionId: string; @@ -676,6 +700,14 @@ export type PluginHookHandlerMap = { event: PluginHookBeforeAgentReplyEvent, ctx: PluginHookAgentContext, ) => Promise | PluginHookBeforeAgentReplyResult | void; + model_call_started: ( + event: PluginHookModelCallStartedEvent, + ctx: PluginHookAgentContext, + ) => Promise | void; + model_call_ended: ( + event: PluginHookModelCallEndedEvent, + ctx: PluginHookAgentContext, + ) => Promise | void; llm_input: (event: PluginHookLlmInputEvent, ctx: PluginHookAgentContext) => Promise | void; llm_output: ( event: PluginHookLlmOutputEvent, diff --git a/src/plugins/hooks.ts b/src/plugins/hooks.ts index be0ed93b3d1..30ff84a0600 100644 --- a/src/plugins/hooks.ts +++ b/src/plugins/hooks.ts @@ -29,6 +29,8 @@ import type { PluginHookBeforePromptBuildEvent, PluginHookBeforePromptBuildResult, PluginHookBeforeCompactionEvent, + PluginHookModelCallEndedEvent, + PluginHookModelCallStartedEvent, PluginHookInboundClaimContext, PluginHookInboundClaimEvent, PluginHookInboundClaimResult, @@ -85,6 +87,8 @@ export type { PluginHookBeforeModelResolveResult, PluginHookBeforePromptBuildEvent, PluginHookBeforePromptBuildResult, + PluginHookModelCallEndedEvent, + PluginHookModelCallStartedEvent, PluginHookLlmInputEvent, PluginHookLlmOutputEvent, PluginHookAgentEndEvent, @@ -588,6 +592,30 @@ export function createHookRunner( ); } + /** + * Run model_call_started hook. + * Allows plugins to observe sanitized model-call metadata. + * Runs in parallel (fire-and-forget). + */ + async function runModelCallStarted( + event: PluginHookModelCallStartedEvent, + ctx: PluginHookAgentContext, + ): Promise { + return runVoidHook("model_call_started", event, ctx); + } + + /** + * Run model_call_ended hook. + * Allows plugins to observe sanitized terminal model-call metadata. + * Runs in parallel (fire-and-forget). + */ + async function runModelCallEnded( + event: PluginHookModelCallEndedEvent, + ctx: PluginHookAgentContext, + ): Promise { + return runVoidHook("model_call_ended", event, ctx); + } + /** * Run agent_end hook. * Allows plugins to analyze completed conversations. @@ -1124,6 +1152,8 @@ export function createHookRunner( runBeforePromptBuild, runBeforeAgentStart, runBeforeAgentReply, + runModelCallStarted, + runModelCallEnded, runLlmInput, runLlmOutput, runAgentEnd, diff --git a/src/plugins/wired-hooks-llm.test.ts b/src/plugins/wired-hooks-llm.test.ts index 133acb5ddfa..90ce6b40280 100644 --- a/src/plugins/wired-hooks-llm.test.ts +++ b/src/plugins/wired-hooks-llm.test.ts @@ -7,14 +7,24 @@ const hookCtx = { }; async function expectLlmHookCall(params: { - hookName: "llm_input" | "llm_output"; + hookName: "model_call_started" | "model_call_ended" | "llm_input" | "llm_output"; event: Record; expectedEvent: Record; }) { const handler = vi.fn(); const { runner } = createHookRunnerWithRegistry([{ hookName: params.hookName, handler }]); - if (params.hookName === "llm_input") { + if (params.hookName === "model_call_started") { + await runner.runModelCallStarted( + params.event as Parameters[0], + hookCtx, + ); + } else if (params.hookName === "model_call_ended") { + await runner.runModelCallEnded( + params.event as Parameters[0], + hookCtx, + ); + } else if (params.hookName === "llm_input") { await runner.runLlmInput( { ...params.event, @@ -40,6 +50,38 @@ async function expectLlmHookCall(params: { describe("llm hook runner methods", () => { it.each([ + { + name: "runModelCallStarted invokes registered model_call_started hooks", + hookName: "model_call_started" as const, + methodName: "runModelCallStarted" as const, + event: { + runId: "run-1", + callId: "call-1", + sessionId: "session-1", + provider: "openai", + model: "gpt-5", + api: "openai-responses", + transport: "http", + }, + expectedEvent: { runId: "run-1", callId: "call-1", provider: "openai" }, + }, + { + name: "runModelCallEnded invokes registered model_call_ended hooks", + hookName: "model_call_ended" as const, + methodName: "runModelCallEnded" as const, + event: { + runId: "run-1", + callId: "call-1", + sessionId: "session-1", + provider: "openai", + model: "gpt-5", + durationMs: 42, + outcome: "error", + errorCategory: "TimeoutError", + upstreamRequestIdHash: "sha256:abcdef123456", + }, + expectedEvent: { runId: "run-1", callId: "call-1", outcome: "error" }, + }, { name: "runLlmInput invokes registered llm_input hooks", hookName: "llm_input" as const, @@ -80,8 +122,13 @@ describe("llm hook runner methods", () => { }); it("hasHooks returns true for registered llm hooks", () => { - const { runner } = createHookRunnerWithRegistry([{ hookName: "llm_input", handler: vi.fn() }]); + const { runner } = createHookRunnerWithRegistry([ + { hookName: "model_call_started", handler: vi.fn() }, + { hookName: "llm_input", handler: vi.fn() }, + ]); + expect(runner.hasHooks("model_call_started")).toBe(true); + expect(runner.hasHooks("model_call_ended")).toBe(false); expect(runner.hasHooks("llm_input")).toBe(true); expect(runner.hasHooks("llm_output")).toBe(false); });