From aca844014f2e6df3563629a1660bf651e040d869 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 6 May 2026 00:27:41 -0700 Subject: [PATCH] fix(diagnostics): export talk and recovery metrics --- CHANGELOG.md | 1 + docs/gateway/opentelemetry.md | 22 +++- docs/gateway/prometheus.md | 7 ++ .../diagnostics-otel/src/service.test.ts | 112 ++++++++++++++++++ extensions/diagnostics-otel/src/service.ts | 38 ++++++ .../src/service.test.ts | 74 ++++++++++++ .../diagnostics-prometheus/src/service.ts | 69 +++++++++++ extensions/google-meet/src/realtime-node.ts | 18 +-- extensions/google-meet/src/realtime.ts | 37 +++--- extensions/voice-call/src/media-stream.ts | 20 ++-- .../src/webhook/realtime-handler.ts | 18 +-- src/gateway/talk-handoff.ts | 18 +-- src/gateway/talk-realtime-relay.ts | 18 +-- src/gateway/talk-transcription-relay.ts | 18 +-- src/infra/diagnostic-events.ts | 18 +++ src/plugin-sdk/realtime-voice.ts | 2 + src/talk/diagnostics.test.ts | 72 +++++++++++ src/talk/diagnostics.ts | 51 ++++++++ src/talk/talk-session-controller.test.ts | 22 ++++ src/talk/talk-session-controller.ts | 13 +- 20 files changed, 584 insertions(+), 64 deletions(-) create mode 100644 src/talk/diagnostics.test.ts create mode 100644 src/talk/diagnostics.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index db8875d6fe1..8e3449bc7d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai - PR triage: mark external pull requests with `proof: supplied` when Barnacle finds structured real behavior proof, keep stale negative proof labels in sync across CRLF-edited PR bodies, and let ClawSweeper own the stronger `proof: sufficient` judgement. - Sessions CLI: show the selected agent runtime in the `openclaw sessions` table so terminal output matches the runtime visibility already present in JSON/status surfaces. Thanks @vincentkoc. - Talk/voice: unify realtime relay, transcription relay, managed-room handoff, Voice Call, Google Meet, VoiceClaw, and native clients around a shared Talk session controller and add the Gateway-managed `talk.session.*` RPC surface. +- Diagnostics/Talk: export bounded Talk lifecycle/audio metrics and session recovery metrics through OpenTelemetry and Prometheus without exposing transcripts, audio payloads, room ids, turn ids, or session ids. - Google Meet/Voice Call: make Twilio dial-in joins speak through the realtime Gemini voice bridge with paced audio streaming, backpressure-aware buffering, barge-in queue clearing, same-session agent consult routing, duplicate-consult coalescing, and no TwiML fallback during realtime speech, giving Meet participants a much snappier OpenClaw voice agent. (#77064) Thanks @scoootscooob. - Voice Call/realtime: add opt-in OpenClaw agent voice context capsules and consult-cadence guidance so Gemini/OpenAI realtime calls can sound like the configured agent without consulting the full agent on every ordinary turn. Thanks @scoootscooob. - Docker/Gateway: harden the gateway container by dropping `NET_RAW` and `NET_ADMIN` capabilities and enabling `no-new-privileges` in the bundled `docker-compose.yml`. Thanks @VintageAyu. diff --git a/docs/gateway/opentelemetry.md b/docs/gateway/opentelemetry.md index 13731697ee6..0952269f684 100644 --- a/docs/gateway/opentelemetry.md +++ b/docs/gateway/opentelemetry.md @@ -70,11 +70,11 @@ openclaw plugins enable diagnostics-otel ## Signals exported -| Signal | What goes in it | -| ----------- | ------------------------------------------------------------------------------------------------------------------------------------------ | -| **Metrics** | Counters and histograms for token usage, cost, run duration, message flow, queue lanes, session state, exec, and memory pressure. | -| **Traces** | Spans for model usage, model calls, harness lifecycle, tool execution, exec, webhook/message processing, context assembly, and tool loops. | -| **Logs** | Structured `logging.file` records exported over OTLP when `diagnostics.otel.logs` is enabled. | +| Signal | What goes in it | +| ----------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- | +| **Metrics** | Counters and histograms for token usage, cost, run duration, message flow, Talk events, queue lanes, session state/recovery, exec, and memory pressure. | +| **Traces** | Spans for model usage, model calls, harness lifecycle, tool execution, exec, webhook/message processing, context assembly, and tool loops. | +| **Logs** | Structured `logging.file` records exported over OTLP when `diagnostics.otel.logs` is enabled. | Toggle `traces`, `metrics`, and `logs` independently. All three default to on when `diagnostics.otel.enabled` is true. @@ -129,6 +129,9 @@ Raw model/tool content is **not** exported by default. Spans carry bounded identifiers (channel, provider, model, error category, hash-only request ids) and never include prompt text, response text, tool inputs, tool outputs, or session keys. +Talk metrics export only bounded event metadata such as mode, transport, +provider, and event type. They do not include transcripts, audio payloads, +session ids, turn ids, call ids, room ids, or handoff tokens. Outbound model requests may include a W3C `traceparent` header. That header is generated only from OpenClaw-owned diagnostic trace context for the active model @@ -191,6 +194,12 @@ When any subkey is enabled, model and tool spans get bounded, redacted - `openclaw.message.delivery.started` (counter, attrs: `openclaw.channel`, `openclaw.delivery.kind`) - `openclaw.message.delivery.duration_ms` (histogram, attrs: `openclaw.channel`, `openclaw.delivery.kind`, `openclaw.outcome`, `openclaw.errorCategory`) +### Talk + +- `openclaw.talk.event` (counter, attrs: `openclaw.talk.event_type`, `openclaw.talk.mode`, `openclaw.talk.transport`, `openclaw.talk.brain`, `openclaw.talk.provider`) +- `openclaw.talk.event.duration_ms` (histogram, attrs: same as `openclaw.talk.event`; emitted when a Talk event reports duration) +- `openclaw.talk.audio.bytes` (histogram, attrs: same as `openclaw.talk.event`; emitted for Talk audio frame events that report byte length) + ### Queues and sessions - `openclaw.queue.lane.enqueue` (counter, attrs: `openclaw.lane`) @@ -200,6 +209,9 @@ When any subkey is enabled, model and tool spans get bounded, redacted - `openclaw.session.state` (counter, attrs: `openclaw.state`, `openclaw.reason`) - `openclaw.session.stuck` (counter, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work) - `openclaw.session.stuck_age_ms` (histogram, attrs: `openclaw.state`; emitted only for stale session bookkeeping with no active work) +- `openclaw.session.recovery.requested` (counter, attrs: `openclaw.state`, `openclaw.action`, `openclaw.active_work_kind`, `openclaw.reason`) +- `openclaw.session.recovery.completed` (counter, attrs: `openclaw.state`, `openclaw.action`, `openclaw.status`, `openclaw.active_work_kind`, `openclaw.reason`) +- `openclaw.session.recovery.age_ms` (histogram, attrs: same as the matching recovery counter) - `openclaw.run.attempt` (counter, attrs: `openclaw.attempt`) ### Session liveness telemetry diff --git a/docs/gateway/prometheus.md b/docs/gateway/prometheus.md index b256c1683bb..c7de5dd486f 100644 --- a/docs/gateway/prometheus.md +++ b/docs/gateway/prometheus.md @@ -102,12 +102,18 @@ For traces, logs, OTLP push, and OpenTelemetry GenAI semantic attributes, see [O | `openclaw_harness_run_duration_seconds` | histogram | `channel`, `error_category`, `harness`, `model`, `outcome`, `phase`, `plugin`, `provider` | | `openclaw_message_processed_total` | counter | `channel`, `outcome`, `reason` | | `openclaw_message_processed_duration_seconds` | histogram | `channel`, `outcome`, `reason` | +| `openclaw_message_delivery_started_total` | counter | `channel`, `delivery_kind` | | `openclaw_message_delivery_total` | counter | `channel`, `delivery_kind`, `error_category`, `outcome` | | `openclaw_message_delivery_duration_seconds` | histogram | `channel`, `delivery_kind`, `error_category`, `outcome` | +| `openclaw_talk_event_total` | counter | `brain`, `event_type`, `mode`, `provider`, `transport` | +| `openclaw_talk_event_duration_seconds` | histogram | `brain`, `event_type`, `mode`, `provider`, `transport` | +| `openclaw_talk_audio_bytes` | histogram | `brain`, `event_type`, `mode`, `provider`, `transport` | | `openclaw_queue_lane_size` | gauge | `lane` | | `openclaw_queue_lane_wait_seconds` | histogram | `lane` | | `openclaw_session_state_total` | counter | `reason`, `state` | | `openclaw_session_queue_depth` | gauge | `state` | +| `openclaw_session_recovery_total` | counter | `action`, `active_work_kind`, `state`, `status` | +| `openclaw_session_recovery_age_seconds` | histogram | `action`, `active_work_kind`, `state`, `status` | | `openclaw_memory_bytes` | gauge | `kind` | | `openclaw_memory_rss_bytes` | histogram | none | | `openclaw_memory_pressure_total` | counter | `level`, `reason` | @@ -131,6 +137,7 @@ For traces, logs, OTLP push, and OpenTelemetry GenAI semantic attributes, see [O - prompt text, response text, tool inputs, tool outputs, system prompts + - Talk transcripts, audio payloads, call ids, room ids, handoff tokens, turn ids, and raw session ids - raw provider request IDs (only bounded hashes, where applicable, on spans — never on metrics) - session keys and session IDs - hostnames, file paths, secret values diff --git a/extensions/diagnostics-otel/src/service.test.ts b/extensions/diagnostics-otel/src/service.test.ts index 7b486a06886..d77783a899c 100644 --- a/extensions/diagnostics-otel/src/service.test.ts +++ b/extensions/diagnostics-otel/src/service.test.ts @@ -2478,6 +2478,118 @@ describe("diagnostics-otel service", () => { await service.stop?.(ctx); }); + test("exports session recovery and talk metrics with bounded attributes", async () => { + const service = createDiagnosticsOtelService(); + const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { metrics: true }); + await service.start(ctx); + + emitTrustedDiagnosticEvent({ + type: "session.recovery.requested", + sessionId: "session-should-not-export", + sessionKey: "key-should-not-export", + state: "processing", + ageMs: 12_000, + reason: "startup-sweep", + activeWorkKind: "tool_call", + allowActiveAbort: true, + }); + emitTrustedDiagnosticEvent({ + type: "session.recovery.completed", + sessionId: "session-should-not-export", + sessionKey: "key-should-not-export", + state: "processing", + ageMs: 13_000, + reason: "startup-sweep", + activeWorkKind: "tool_call", + status: "released", + action: "abort-active-run", + }); + emitTrustedDiagnosticEvent({ + type: "talk.event", + sessionId: "talk-session-should-not-export", + turnId: "turn-should-not-export", + talkEventType: "input.audio.delta", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + byteLength: 320, + }); + emitTrustedDiagnosticEvent({ + type: "talk.event", + sessionId: "talk-session-should-not-export", + talkEventType: "latency.metrics", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + durationMs: 45, + }); + await flushDiagnosticEvents(); + + expect( + telemetryState.counters.get("openclaw.session.recovery.requested")?.add, + ).toHaveBeenCalledWith( + 1, + expect.objectContaining({ + "openclaw.state": "processing", + "openclaw.action": "abort", + "openclaw.active_work_kind": "tool_call", + }), + ); + expect( + telemetryState.counters.get("openclaw.session.recovery.completed")?.add, + ).toHaveBeenCalledWith( + 1, + expect.objectContaining({ + "openclaw.state": "processing", + "openclaw.status": "released", + "openclaw.action": "abort-active-run", + }), + ); + expect( + telemetryState.histograms.get("openclaw.session.recovery.age_ms")?.record, + ).toHaveBeenCalledWith( + 13_000, + expect.objectContaining({ + "openclaw.status": "released", + }), + ); + expect(telemetryState.counters.get("openclaw.talk.event")?.add).toHaveBeenCalledWith(1, { + "openclaw.talk.brain": "agent-consult", + "openclaw.talk.event_type": "input.audio.delta", + "openclaw.talk.mode": "realtime", + "openclaw.talk.provider": "openai", + "openclaw.talk.transport": "gateway-relay", + }); + expect(telemetryState.histograms.get("openclaw.talk.audio.bytes")?.record).toHaveBeenCalledWith( + 320, + { + "openclaw.talk.brain": "agent-consult", + "openclaw.talk.event_type": "input.audio.delta", + "openclaw.talk.mode": "realtime", + "openclaw.talk.provider": "openai", + "openclaw.talk.transport": "gateway-relay", + }, + ); + expect( + telemetryState.histograms.get("openclaw.talk.event.duration_ms")?.record, + ).toHaveBeenCalledWith(45, { + "openclaw.talk.brain": "agent-consult", + "openclaw.talk.event_type": "latency.metrics", + "openclaw.talk.mode": "realtime", + "openclaw.talk.provider": "openai", + "openclaw.talk.transport": "gateway-relay", + }); + + const talkCounterCalls = JSON.stringify( + telemetryState.counters.get("openclaw.talk.event")?.add.mock.calls, + ); + expect(talkCounterCalls).not.toContain("talk-session-should-not-export"); + expect(talkCounterCalls).not.toContain("turn-should-not-export"); + await service.stop?.(ctx); + }); + test("does not export model or tool content unless capture is explicitly enabled", async () => { const service = createDiagnosticsOtelService(); const ctx = createOtelContext(OTEL_TEST_ENDPOINT, { traces: true, metrics: true }); diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index 43f975f35f4..7a6bc84f062 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -95,6 +95,7 @@ type SessionRecoveryDiagnosticEvent = Extract< DiagnosticEventPayload, { type: "session.recovery.requested" | "session.recovery.completed" } >; +type TalkDiagnosticEvent = Extract; const NO_CONTENT_CAPTURE: OtelContentCapturePolicy = { inputMessages: false, @@ -844,6 +845,18 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { description: "Age of sessions selected for recovery", }, ); + const talkEventCounter = meter.createCounter("openclaw.talk.event", { + unit: "1", + description: "Talk events emitted by type", + }); + const talkEventDurationHistogram = meter.createHistogram("openclaw.talk.event.duration_ms", { + unit: "ms", + description: "Talk event duration when reported", + }); + const talkAudioBytesHistogram = meter.createHistogram("openclaw.talk.audio.bytes", { + unit: "By", + description: "Talk audio frame byte lengths", + }); const runAttemptCounter = meter.createCounter("openclaw.run.attempt", { unit: "1", description: "Run attempts", @@ -1526,6 +1539,28 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { sessionRecoveryAgeHistogram.record(evt.ageMs, attrs); }; + const talkEventAttrs = (evt: TalkDiagnosticEvent): Record => ({ + "openclaw.talk.brain": lowCardinalityAttr(evt.brain), + "openclaw.talk.event_type": lowCardinalityAttr(evt.talkEventType), + "openclaw.talk.mode": lowCardinalityAttr(evt.mode), + "openclaw.talk.provider": lowCardinalityAttr(evt.provider), + "openclaw.talk.transport": lowCardinalityAttr(evt.transport), + }); + + const recordTalkEvent = (evt: TalkDiagnosticEvent, metadata: DiagnosticEventMetadata) => { + if (!metadata.trusted) { + return; + } + const attrs = talkEventAttrs(evt); + talkEventCounter.add(1, attrs); + if (typeof evt.durationMs === "number") { + talkEventDurationHistogram.record(evt.durationMs, attrs); + } + if (typeof evt.byteLength === "number") { + talkAudioBytesHistogram.record(evt.byteLength, attrs); + } + }; + const recordRunAttempt = (evt: Extract) => { runAttemptCounter.add(1, { "openclaw.attempt": evt.attempt }); }; @@ -2283,6 +2318,9 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { case "message.delivery.error": recordMessageDeliveryError(evt); return; + case "talk.event": + recordTalkEvent(evt, metadata); + return; case "queue.lane.enqueue": recordLaneEnqueue(evt); return; diff --git a/extensions/diagnostics-prometheus/src/service.test.ts b/extensions/diagnostics-prometheus/src/service.test.ts index f1530a95ead..1bd10ddee1c 100644 --- a/extensions/diagnostics-prometheus/src/service.test.ts +++ b/extensions/diagnostics-prometheus/src/service.test.ts @@ -90,6 +90,17 @@ describe("diagnostics-prometheus service", () => { it("bounds messaging labels without exporting raw chat identifiers", () => { const store = __test__.createPrometheusMetricStore(); + __test__.recordDiagnosticEvent( + store, + { + ...baseEvent(), + type: "message.delivery.started", + channel: "matrix", + deliveryKind: "text", + sessionKey: "session-should-not-export", + }, + trusted, + ); __test__.recordDiagnosticEvent( store, { @@ -119,6 +130,9 @@ describe("diagnostics-prometheus service", () => { const rendered = __test__.renderPrometheusMetrics(store); + expect(rendered).toContain( + 'openclaw_message_delivery_started_total{channel="matrix",delivery_kind="text"} 1', + ); expect(rendered).toContain( 'openclaw_message_processed_total{channel="unknown",outcome="completed",reason="none"} 1', ); @@ -127,9 +141,69 @@ describe("diagnostics-prometheus service", () => { ); expect(rendered).not.toContain("chat-should-not-export"); expect(rendered).not.toContain("message-should-not-export"); + expect(rendered).not.toContain("session-should-not-export"); expect(rendered).not.toContain("progress draft"); }); + it("records session recovery and talk metrics without exporting raw ids or content", () => { + const store = __test__.createPrometheusMetricStore(); + + __test__.recordDiagnosticEvent( + store, + { + ...baseEvent(), + type: "session.recovery.completed", + sessionId: "session-should-not-export", + sessionKey: "key-should-not-export", + state: "processing", + stateGeneration: 2, + ageMs: 12_000, + queueDepth: 1, + reason: "startup-sweep", + activeWorkKind: "tool_call", + allowActiveAbort: true, + status: "released", + action: "abort-active-run", + }, + trusted, + ); + __test__.recordDiagnosticEvent( + store, + { + ...baseEvent(), + type: "talk.event", + sessionId: "talk-session-should-not-export", + turnId: "turn-should-not-export", + talkEventType: "input.audio.delta", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + byteLength: 320, + }, + trusted, + ); + + const rendered = __test__.renderPrometheusMetrics(store); + + expect(rendered).toContain( + 'openclaw_session_recovery_total{action="abort-active-run",active_work_kind="tool_call",state="processing",status="released"} 1', + ); + expect(rendered).toContain( + 'openclaw_session_recovery_age_seconds_sum{action="abort-active-run",active_work_kind="tool_call",state="processing",status="released"} 12', + ); + expect(rendered).toContain( + 'openclaw_talk_event_total{brain="agent-consult",event_type="input.audio.delta",mode="realtime",provider="openai",transport="gateway-relay"} 1', + ); + expect(rendered).toContain( + 'openclaw_talk_audio_bytes_sum{brain="agent-consult",event_type="input.audio.delta",mode="realtime",provider="openai",transport="gateway-relay"} 320', + ); + expect(rendered).not.toContain("session-should-not-export"); + expect(rendered).not.toContain("key-should-not-export"); + expect(rendered).not.toContain("talk-session-should-not-export"); + expect(rendered).not.toContain("turn-should-not-export"); + }); + it("caps metric series growth and reports dropped series", () => { const store = __test__.createPrometheusMetricStore(); diff --git a/extensions/diagnostics-prometheus/src/service.ts b/extensions/diagnostics-prometheus/src/service.ts index fea4dd6e1fd..1fb75e171d7 100644 --- a/extensions/diagnostics-prometheus/src/service.ts +++ b/extensions/diagnostics-prometheus/src/service.ts @@ -351,6 +351,35 @@ function harnessLabels(evt: { }; } +function sessionRecoveryLabels( + evt: Extract< + DiagnosticEventPayload, + { type: "session.recovery.requested" | "session.recovery.completed" } + >, +): LabelSet { + return { + action: + evt.type === "session.recovery.completed" + ? lowCardinalityLabel(evt.action, "unknown") + : evt.allowActiveAbort + ? "abort" + : "recover", + active_work_kind: lowCardinalityLabel(evt.activeWorkKind, "none"), + state: evt.state, + status: evt.type === "session.recovery.completed" ? evt.status : "requested", + }; +} + +function talkLabels(evt: Extract): LabelSet { + return { + brain: lowCardinalityLabel(evt.brain), + event_type: lowCardinalityLabel(evt.talkEventType), + mode: lowCardinalityLabel(evt.mode), + provider: lowCardinalityLabel(evt.provider), + transport: lowCardinalityLabel(evt.transport), + }; +} + function recordModelUsage( store: PrometheusMetricStore, evt: Extract, @@ -497,6 +526,16 @@ function recordDiagnosticEvent( seconds(evt.durationMs), ); return; + case "message.delivery.started": + store.counter( + "openclaw_message_delivery_started_total", + "Outbound message delivery attempts started.", + { + channel: lowCardinalityLabel(evt.channel), + delivery_kind: lowCardinalityLabel(evt.deliveryKind, "other"), + }, + ); + return; case "message.delivery.completed": case "message.delivery.error": store.counter( @@ -527,6 +566,36 @@ function recordDiagnosticEvent( seconds(evt.durationMs), ); return; + case "talk.event": + store.counter("openclaw_talk_event_total", "Talk events emitted by type.", talkLabels(evt)); + store.histogram( + "openclaw_talk_event_duration_seconds", + "Talk event duration in seconds when reported.", + talkLabels(evt), + seconds(evt.durationMs), + ); + store.histogram( + "openclaw_talk_audio_bytes", + "Talk audio frame byte lengths.", + talkLabels(evt), + numericValue(evt.byteLength), + BYTE_BUCKETS, + ); + return; + case "session.recovery.requested": + case "session.recovery.completed": + store.counter( + "openclaw_session_recovery_total", + "Session recovery observations by status and action.", + sessionRecoveryLabels(evt), + ); + store.histogram( + "openclaw_session_recovery_age_seconds", + "Age of sessions selected for recovery in seconds.", + sessionRecoveryLabels(evt), + seconds(evt.ageMs), + ); + return; case "queue.lane.enqueue": case "queue.lane.dequeue": store.gauge( diff --git a/extensions/google-meet/src/realtime-node.ts b/extensions/google-meet/src/realtime-node.ts index c1ddb092313..e433ae405a4 100644 --- a/extensions/google-meet/src/realtime-node.ts +++ b/extensions/google-meet/src/realtime-node.ts @@ -9,6 +9,7 @@ import { createRealtimeVoiceAgentTalkbackQueue, createTalkSessionController, createRealtimeVoiceBridgeSession, + recordTalkDiagnosticEvent, type RealtimeVoiceAgentTalkbackQueue, type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderPlugin, @@ -359,13 +360,16 @@ export async function startNodeRealtimeAudioBridge(params: { const transcript: GoogleMeetRealtimeTranscriptEntry[] = []; const realtimeEvents: GoogleMeetRealtimeEventEntry[] = []; const strategy = params.config.realtime.strategy; - const talk: TalkSessionController = createTalkSessionController({ - sessionId: `google-meet:${params.meetingSessionId}:${params.bridgeId}:node-realtime`, - mode: "realtime", - transport: "gateway-relay", - brain: strategy === "bidi" ? "direct-tools" : "agent-consult", - provider: resolved.provider.id, - }); + const talk: TalkSessionController = createTalkSessionController( + { + sessionId: `google-meet:${params.meetingSessionId}:${params.bridgeId}:node-realtime`, + mode: "realtime", + transport: "gateway-relay", + brain: strategy === "bidi" ? "direct-tools" : "agent-consult", + provider: resolved.provider.id, + }, + { onEvent: recordTalkDiagnosticEvent }, + ); const recentTalkEvents: TalkEvent[] = []; const rememberTalkEvent = (event: TalkEvent | undefined): void => { if (event) { diff --git a/extensions/google-meet/src/realtime.ts b/extensions/google-meet/src/realtime.ts index 61110534817..a210d682bc3 100644 --- a/extensions/google-meet/src/realtime.ts +++ b/extensions/google-meet/src/realtime.ts @@ -23,6 +23,7 @@ import { REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ, REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, recordRealtimeVoiceBridgeEvent, + recordTalkDiagnosticEvent, recordRealtimeVoiceTranscript, resamplePcm, resolveConfiguredRealtimeVoiceProvider, @@ -485,14 +486,17 @@ export async function startCommandAgentAudioBridge(params: { fullConfig: params.fullConfig, providers: params.providers, }); - const talk = createTalkSessionController({ - sessionId: `google-meet:${params.meetingSessionId}:agent`, - mode: "stt-tts", - transport: "gateway-relay", - brain: "agent-consult", - provider: resolved.provider.id, - turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`, - }); + const talk = createTalkSessionController( + { + sessionId: `google-meet:${params.meetingSessionId}:agent`, + mode: "stt-tts", + transport: "gateway-relay", + brain: "agent-consult", + provider: resolved.provider.id, + turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`, + }, + { onEvent: recordTalkDiagnosticEvent }, + ); const recentTalkEvents: TalkEvent[] = []; const emitTalkEvent = (input: TalkEventInput) => pushGoogleMeetTalkEvent(recentTalkEvents, talk.emit(input)); @@ -1034,13 +1038,16 @@ export async function startCommandRealtimeAudioBridge(params: { ); const transcript: GoogleMeetRealtimeTranscriptEntry[] = []; const realtimeEvents: GoogleMeetRealtimeEventEntry[] = []; - const talk: TalkSessionController = createTalkSessionController({ - sessionId: `google-meet:${params.meetingSessionId}:command-realtime`, - mode: "realtime", - transport: "gateway-relay", - brain: strategy === "bidi" ? "direct-tools" : "agent-consult", - provider: resolved.provider.id, - }); + const talk: TalkSessionController = createTalkSessionController( + { + sessionId: `google-meet:${params.meetingSessionId}:command-realtime`, + mode: "realtime", + transport: "gateway-relay", + brain: strategy === "bidi" ? "direct-tools" : "agent-consult", + provider: resolved.provider.id, + }, + { onEvent: recordTalkDiagnosticEvent }, + ); const recentTalkEvents: TalkEvent[] = []; const rememberTalkEvent = (event: TalkEvent | undefined): void => { if (event) { diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index 193b1abde22..215c121305e 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -16,6 +16,7 @@ import type { } from "openclaw/plugin-sdk/realtime-transcription"; import { createTalkSessionController, + recordTalkDiagnosticEvent, type TalkEvent, type TalkEventInput, type TalkSessionController, @@ -784,14 +785,17 @@ export class MediaStreamHandler { } private createTalkEvents(callId: string, streamSid: string): TalkSessionController { - return createTalkSessionController({ - sessionId: `voice-call:${callId}:${streamSid}`, - mode: "stt-tts", - transport: "gateway-relay", - brain: "agent-consult", - provider: this.config.transcriptionProvider.id, - turnIdPrefix: `${streamSid}:turn`, - }); + return createTalkSessionController( + { + sessionId: `voice-call:${callId}:${streamSid}`, + mode: "stt-tts", + transport: "gateway-relay", + brain: "agent-consult", + provider: this.config.transcriptionProvider.id, + turnIdPrefix: `${streamSid}:turn`, + }, + { onEvent: recordTalkDiagnosticEvent }, + ); } private emitTalkEvent(session: StreamSession, input: TalkEventInput): void { diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts index ecac126dd94..b9dcfd697e2 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -7,6 +7,7 @@ import { createTalkSessionController, createRealtimeVoiceBridgeSession, REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + recordTalkDiagnosticEvent, type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderConfig, type RealtimeVoiceProviderPlugin, @@ -507,13 +508,16 @@ export class RealtimeCallHandler { const { callId, initialGreetingInstructions } = registration; const callRecord = this.manager.getCallByProviderCallId(callSid); - const talk: TalkSessionController = createTalkSessionController({ - sessionId: `voice-call:${callId}:realtime`, - mode: "realtime", - transport: "gateway-relay", - brain: "agent-consult", - provider: this.realtimeProvider.id, - }); + const talk: TalkSessionController = createTalkSessionController( + { + sessionId: `voice-call:${callId}:realtime`, + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: this.realtimeProvider.id, + }, + { onEvent: recordTalkDiagnosticEvent }, + ); const rememberTalkEvent = (event: TalkEvent | undefined): TalkEvent | undefined => { if (event) { appendRecentTalkEventMetadata(callRecord, event); diff --git a/src/gateway/talk-handoff.ts b/src/gateway/talk-handoff.ts index 66a33276646..96629050fd1 100644 --- a/src/gateway/talk-handoff.ts +++ b/src/gateway/talk-handoff.ts @@ -1,4 +1,5 @@ import { createHash, randomBytes, randomUUID } from "node:crypto"; +import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; import { createTalkSessionController, type TalkBrain, @@ -319,13 +320,16 @@ function createTalkHandoffRoom(params: { provider?: string; }): TalkHandoffRoomState { return { - talk: createTalkSessionController({ - sessionId: params.roomId, - mode: params.mode, - transport: params.transport, - brain: params.brain, - provider: params.provider, - }), + talk: createTalkSessionController( + { + sessionId: params.roomId, + mode: params.mode, + transport: params.transport, + brain: params.brain, + provider: params.provider, + }, + { onEvent: recordTalkDiagnosticEvent }, + ), }; } diff --git a/src/gateway/talk-realtime-relay.ts b/src/gateway/talk-realtime-relay.ts index f90a784d53d..401d9ed068c 100644 --- a/src/gateway/talk-realtime-relay.ts +++ b/src/gateway/talk-realtime-relay.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js"; +import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; import { REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, type RealtimeVoiceBrowserAudioContract, @@ -160,13 +161,16 @@ export function createTalkRealtimeRelaySession( enforceRelaySessionLimits(params.connId); const relaySessionId = randomUUID(); const expiresAtMs = Date.now() + RELAY_SESSION_TTL_MS; - const talk = createTalkSessionController({ - sessionId: relaySessionId, - mode: "realtime", - transport: "gateway-relay", - brain: "agent-consult", - provider: params.provider.id, - }); + const talk = createTalkSessionController( + { + sessionId: relaySessionId, + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: params.provider.id, + }, + { onEvent: recordTalkDiagnosticEvent }, + ); let relay: RelaySession | undefined; const emit = (event: TalkRealtimeRelayEventPayload, talkEvent?: TalkEventInput) => broadcastToOwner(params.context, params.connId, { diff --git a/src/gateway/talk-transcription-relay.ts b/src/gateway/talk-transcription-relay.ts index 0d195b6f397..d90863b9b5c 100644 --- a/src/gateway/talk-transcription-relay.ts +++ b/src/gateway/talk-transcription-relay.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; import type { RealtimeTranscriptionProviderPlugin } from "../plugins/types.js"; import type { RealtimeTranscriptionProviderConfig } from "../realtime-transcription/provider-types.js"; +import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; import { type TalkEvent, type TalkEventInput, @@ -138,13 +139,16 @@ export function createTalkTranscriptionRelaySession( enforceTranscriptionSessionLimits(params.connId); const transcriptionSessionId = randomUUID(); const expiresAtMs = Date.now() + TRANSCRIPTION_SESSION_TTL_MS; - const talk = createTalkSessionController({ - sessionId: transcriptionSessionId, - mode: "transcription", - transport: "gateway-relay", - brain: "none", - provider: params.provider.id, - }); + const talk = createTalkSessionController( + { + sessionId: transcriptionSessionId, + mode: "transcription", + transport: "gateway-relay", + brain: "none", + provider: params.provider.id, + }, + { onEvent: recordTalkDiagnosticEvent }, + ); let relay: TranscriptionRelaySession | undefined; const emit = (event: TalkTranscriptionRelayEventPayload, talkEvent?: TalkEventInput): void => { broadcastToOwner(params.context, params.connId, { diff --git a/src/infra/diagnostic-events.ts b/src/infra/diagnostic-events.ts index 113670d2879..5653525244f 100644 --- a/src/infra/diagnostic-events.ts +++ b/src/infra/diagnostic-events.ts @@ -1,4 +1,5 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { TalkBrain, TalkEventType, TalkMode, TalkTransport } from "../talk/talk-events.js"; import { formatDiagnosticTraceparent, getActiveDiagnosticTraceContext, @@ -114,6 +115,21 @@ export type DiagnosticMessageDeliveryErrorEvent = DiagnosticMessageDeliveryBaseE errorCategory: string; }; +export type DiagnosticTalkEvent = DiagnosticBaseEvent & { + type: "talk.event"; + sessionId?: string; + turnId?: string; + captureId?: string; + talkEventType: TalkEventType; + mode: TalkMode; + transport: TalkTransport; + brain: TalkBrain; + provider?: string; + final?: boolean; + durationMs?: number; + byteLength?: number; +}; + export type DiagnosticSessionStateEvent = DiagnosticBaseEvent & { type: "session.state"; sessionKey?: string; @@ -548,6 +564,7 @@ export type DiagnosticEventPayload = | DiagnosticMessageDeliveryStartedEvent | DiagnosticMessageDeliveryCompletedEvent | DiagnosticMessageDeliveryErrorEvent + | DiagnosticTalkEvent | DiagnosticSessionStateEvent | DiagnosticSessionLongRunningEvent | DiagnosticSessionStalledEvent @@ -623,6 +640,7 @@ const ASYNC_DIAGNOSTIC_EVENT_TYPES = new Set([ "message.delivery.started", "message.delivery.completed", "message.delivery.error", + "talk.event", "model.call.started", "model.call.completed", "model.call.error", diff --git a/src/plugin-sdk/realtime-voice.ts b/src/plugin-sdk/realtime-voice.ts index 0e563c39e5a..efaef026cd9 100644 --- a/src/plugin-sdk/realtime-voice.ts +++ b/src/plugin-sdk/realtime-voice.ts @@ -35,10 +35,12 @@ export { type TalkMode, type TalkTransport, } from "../talk/talk-events.js"; +export { createTalkDiagnosticEvent, recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; export { createTalkSessionController, normalizeTalkTransport, type TalkEnsureTurnResult, + type TalkSessionControllerOptions, type TalkSessionController, type TalkSessionControllerParams, type TalkTurnFailure, diff --git a/src/talk/diagnostics.test.ts b/src/talk/diagnostics.test.ts new file mode 100644 index 00000000000..9a175e99f1f --- /dev/null +++ b/src/talk/diagnostics.test.ts @@ -0,0 +1,72 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + onInternalDiagnosticEvent, + resetDiagnosticEventsForTest, + type DiagnosticEventPayload, +} from "../infra/diagnostic-events.js"; +import { createTalkDiagnosticEvent, recordTalkDiagnosticEvent } from "./diagnostics.js"; +import { createTalkEventSequencer } from "./talk-events.js"; + +describe("talk diagnostics", () => { + beforeEach(() => { + resetDiagnosticEventsForTest(); + }); + + afterEach(() => { + resetDiagnosticEventsForTest(); + }); + + it("maps talk events to bounded diagnostic events without payload content", async () => { + const diagnostics: Array<{ event: DiagnosticEventPayload; trusted: boolean }> = []; + onInternalDiagnosticEvent((event, metadata) => { + diagnostics.push({ event, trusted: metadata.trusted }); + }); + const events = createTalkEventSequencer({ + sessionId: "talk-session", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + }); + + const talkEvent = events.next({ + type: "input.audio.delta", + turnId: "turn-1", + payload: { + byteLength: 320, + text: "private transcript should not export", + }, + }); + + expect(createTalkDiagnosticEvent(talkEvent)).toEqual({ + type: "talk.event", + sessionId: "talk-session", + turnId: "turn-1", + captureId: undefined, + talkEventType: "input.audio.delta", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + final: undefined, + durationMs: undefined, + byteLength: 320, + }); + + recordTalkDiagnosticEvent(talkEvent); + await new Promise((resolve) => setImmediate(resolve)); + + expect(diagnostics).toHaveLength(1); + expect(diagnostics[0]).toMatchObject({ + trusted: true, + event: { + type: "talk.event", + talkEventType: "input.audio.delta", + sessionId: "talk-session", + turnId: "turn-1", + byteLength: 320, + }, + }); + expect(JSON.stringify(diagnostics[0]?.event)).not.toContain("private transcript"); + }); +}); diff --git a/src/talk/diagnostics.ts b/src/talk/diagnostics.ts new file mode 100644 index 00000000000..17a004c5ba4 --- /dev/null +++ b/src/talk/diagnostics.ts @@ -0,0 +1,51 @@ +import { + emitTrustedDiagnosticEvent, + type DiagnosticEventInput, +} from "../infra/diagnostic-events.js"; +import type { TalkEvent } from "./talk-events.js"; + +type TalkDiagnosticEventInput = Extract; + +export function createTalkDiagnosticEvent(event: TalkEvent): TalkDiagnosticEventInput { + const payload = asRecord(event.payload); + return { + type: "talk.event", + sessionId: event.sessionId, + turnId: event.turnId, + captureId: event.captureId, + talkEventType: event.type, + mode: event.mode, + transport: event.transport, + brain: event.brain, + provider: event.provider, + final: event.final, + durationMs: firstFiniteNumber(payload, ["durationMs", "latencyMs", "elapsedMs"]), + byteLength: firstFiniteNumber(payload, ["byteLength", "audioBytes"]), + }; +} + +export function recordTalkDiagnosticEvent(event: TalkEvent): void { + emitTrustedDiagnosticEvent(createTalkDiagnosticEvent(event)); +} + +function asRecord(value: unknown): Record | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : undefined; +} + +function firstFiniteNumber( + record: Record | undefined, + keys: readonly string[], +): number | undefined { + if (!record) { + return undefined; + } + for (const key of keys) { + const value = record[key]; + if (typeof value === "number" && Number.isFinite(value) && value >= 0) { + return value; + } + } + return undefined; +} diff --git a/src/talk/talk-session-controller.test.ts b/src/talk/talk-session-controller.test.ts index 34e1693f440..c6e7cb01c46 100644 --- a/src/talk/talk-session-controller.test.ts +++ b/src/talk/talk-session-controller.test.ts @@ -97,6 +97,28 @@ describe("createTalkSessionController", () => { expect(talk.outputAudioActive).toBe(false); }); + it("notifies an event hook for emitted and controller-created events", () => { + const events: string[] = []; + const talk = createTalkSessionController( + { + sessionId: "talk-session", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + }, + { + now: () => "2026-05-05T00:00:00.000Z", + onEvent: (event) => events.push(event.type), + }, + ); + + talk.emit({ type: "session.started", payload: {} }); + const turn = talk.ensureTurn(); + talk.endTurn({ turnId: turn.turnId }); + + expect(events).toEqual(["session.started", "turn.started", "turn.ended"]); + }); + it("clears stale output audio state when a replacement turn starts", () => { const talk = createController(); diff --git a/src/talk/talk-session-controller.ts b/src/talk/talk-session-controller.ts index 3643853704c..bcddad03eef 100644 --- a/src/talk/talk-session-controller.ts +++ b/src/talk/talk-session-controller.ts @@ -49,9 +49,15 @@ export type TalkSessionControllerParams = TalkEventContext & { turnIdPrefix?: string; }; +export type TalkSessionControllerOptions = { + now?: () => Date | string; + onEvent?: (event: TalkEvent) => void; + sequencer?: TalkEventSequencer; +}; + export function createTalkSessionController( params: TalkSessionControllerParams, - options: { now?: () => Date | string; sequencer?: TalkEventSequencer } = {}, + options: TalkSessionControllerOptions = {}, ): TalkSessionController { const { maxRecentEvents = 20, turnIdPrefix = "turn", ...context } = params; const sequencer = options.sequencer ?? createTalkEventSequencer(context, { now: options.now }); @@ -65,6 +71,11 @@ export function createTalkSessionController( if (recentEvents.length > maxRecentEvents) { recentEvents.splice(0, recentEvents.length - maxRecentEvents); } + try { + options.onEvent?.(event as TalkEvent); + } catch { + // Diagnostics hooks must not break Talk delivery. + } return event; };