From 16321a27b64c9be1e609eda76e05f1c9d7e3cf58 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Wed, 6 May 2026 02:16:51 -0700 Subject: [PATCH] fix(talk): add bounded lifecycle logging --- extensions/google-meet/src/realtime-node.ts | 23 ++- extensions/google-meet/src/realtime.ts | 27 ++- extensions/voice-call/src/media-stream.ts | 4 +- .../src/webhook/realtime-handler.ts | 4 +- src/gateway/talk-handoff.ts | 4 +- src/gateway/talk-realtime-relay.ts | 4 +- src/gateway/talk-transcription-relay.ts | 4 +- src/plugin-sdk/realtime-voice.ts | 2 + src/talk/agent-talkback-runtime.ts | 2 +- src/talk/logging.test.ts | 195 ++++++++++++++++++ src/talk/logging.ts | 97 +++++++++ src/talk/observability.ts | 8 + 12 files changed, 348 insertions(+), 26 deletions(-) create mode 100644 src/talk/logging.test.ts create mode 100644 src/talk/logging.ts create mode 100644 src/talk/observability.ts diff --git a/extensions/google-meet/src/realtime-node.ts b/extensions/google-meet/src/realtime-node.ts index e433ae405a4..8504263ff3b 100644 --- a/extensions/google-meet/src/realtime-node.ts +++ b/extensions/google-meet/src/realtime-node.ts @@ -9,7 +9,7 @@ import { createRealtimeVoiceAgentTalkbackQueue, createTalkSessionController, createRealtimeVoiceBridgeSession, - recordTalkDiagnosticEvent, + recordTalkObservabilityEvent, type RealtimeVoiceAgentTalkbackQueue, type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderPlugin, @@ -41,6 +41,7 @@ import { convertGoogleMeetTtsAudioForBridge, formatGoogleMeetAgentAudioModelLog, formatGoogleMeetAgentTtsResultLog, + formatGoogleMeetTranscriptSummaryLog, formatGoogleMeetRealtimeVoiceModelLog, type GoogleMeetRealtimeEventEntry, type GoogleMeetRealtimeTranscriptEntry, @@ -181,7 +182,9 @@ export async function startNodeAgentAudioBridge(params: { return; } recordGoogleMeetRealtimeTranscript(transcript, "assistant", normalized); - params.logger.info(`[google-meet] node agent assistant: ${normalized}`); + params.logger.info( + 
formatGoogleMeetTranscriptSummaryLog("node agent assistant", normalized), + ); const result = await params.runtime.tts.textToSpeechTelephony({ text: normalized, cfg: params.fullConfig, @@ -233,10 +236,13 @@ export async function startNodeAgentAudioBridge(params: { return; } recordGoogleMeetRealtimeTranscript(transcript, "user", trimmed); - params.logger.info(`[google-meet] node agent user: ${trimmed}`); + params.logger.info(formatGoogleMeetTranscriptSummaryLog("node agent user", trimmed)); if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text: trimmed })) { params.logger.info( - `[google-meet] node agent ignored assistant echo transcript: ${trimmed}`, + formatGoogleMeetTranscriptSummaryLog( + "node agent ignored assistant echo transcript", + trimmed, + ), ); return; } @@ -368,7 +374,7 @@ export async function startNodeRealtimeAudioBridge(params: { brain: strategy === "bidi" ? "direct-tools" : "agent-consult", provider: resolved.provider.id, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ); const recentTalkEvents: TalkEvent[] = []; const rememberTalkEvent = (event: TalkEvent | undefined): void => { @@ -577,11 +583,14 @@ export async function startNodeRealtimeAudioBridge(params: { } if (isFinal) { recordGoogleMeetRealtimeTranscript(transcript, role, text); - params.logger.info(`[google-meet] node realtime ${role}: ${text}`); + params.logger.info(formatGoogleMeetTranscriptSummaryLog(`node realtime ${role}`, text)); if (role === "user" && strategy === "agent") { if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text })) { params.logger.info( - `[google-meet] node realtime ignored assistant echo transcript: ${text}`, + formatGoogleMeetTranscriptSummaryLog( + "node realtime ignored assistant echo transcript", + text, + ), ); return; } diff --git a/extensions/google-meet/src/realtime.ts b/extensions/google-meet/src/realtime.ts index a210d682bc3..20055b86e21 100644 --- a/extensions/google-meet/src/realtime.ts 
+++ b/extensions/google-meet/src/realtime.ts @@ -23,7 +23,7 @@ import { REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ, REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, recordRealtimeVoiceBridgeEvent, - recordTalkDiagnosticEvent, + recordTalkObservabilityEvent, recordRealtimeVoiceTranscript, resamplePcm, resolveConfiguredRealtimeVoiceProvider, @@ -407,6 +407,10 @@ export function formatGoogleMeetAgentTtsResultLog( ].join(" "); } +export function formatGoogleMeetTranscriptSummaryLog(prefix: string, text: string): string { + return `[google-meet] ${prefix}: chars=${text.length}`; +} + function normalizeGoogleMeetTtsPromptText(text: string | undefined): string | undefined { const trimmed = text?.trim(); if (!trimmed) { @@ -495,7 +499,7 @@ export async function startCommandAgentAudioBridge(params: { provider: resolved.provider.id, turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ); const recentTalkEvents: TalkEvent[] = []; const emitTalkEvent = (input: TalkEventInput) => @@ -636,7 +640,7 @@ export async function startCommandAgentAudioBridge(params: { return; } recordGoogleMeetRealtimeTranscript(transcript, "assistant", normalized); - params.logger.info(`[google-meet] agent assistant: ${normalized}`); + params.logger.info(formatGoogleMeetTranscriptSummaryLog("agent assistant", normalized)); const turnId = ensureTalkTurn(); emitTalkEvent({ type: "output.text.done", @@ -720,9 +724,11 @@ export async function startCommandAgentAudioBridge(params: { payload: { meetingSessionId: params.meetingSessionId, text: trimmed, role: "user" }, }); recordGoogleMeetRealtimeTranscript(transcript, "user", trimmed); - params.logger.info(`[google-meet] agent user: ${trimmed}`); + params.logger.info(formatGoogleMeetTranscriptSummaryLog("agent user", trimmed)); if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text: trimmed })) { - params.logger.info(`[google-meet] agent ignored assistant echo 
transcript: ${trimmed}`); + params.logger.info( + formatGoogleMeetTranscriptSummaryLog("agent ignored assistant echo transcript", trimmed), + ); return; } agentTalkback?.enqueue(trimmed); @@ -1046,7 +1052,7 @@ export async function startCommandRealtimeAudioBridge(params: { brain: strategy === "bidi" ? "direct-tools" : "agent-consult", provider: resolved.provider.id, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ); const recentTalkEvents: TalkEvent[] = []; const rememberTalkEvent = (event: TalkEvent | undefined): void => { @@ -1171,10 +1177,15 @@ export async function startCommandRealtimeAudioBridge(params: { } if (isFinal) { recordGoogleMeetRealtimeTranscript(transcript, role, text); - params.logger.info(`[google-meet] realtime ${role}: ${text}`); + params.logger.info(formatGoogleMeetTranscriptSummaryLog(`realtime ${role}`, text)); if (role === "user" && strategy === "agent") { if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text })) { - params.logger.info(`[google-meet] realtime ignored assistant echo transcript: ${text}`); + params.logger.info( + formatGoogleMeetTranscriptSummaryLog( + "realtime ignored assistant echo transcript", + text, + ), + ); return; } agentTalkback?.enqueue(text); diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index 215c121305e..509dc64c361 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -16,7 +16,7 @@ import type { } from "openclaw/plugin-sdk/realtime-transcription"; import { createTalkSessionController, - recordTalkDiagnosticEvent, + recordTalkObservabilityEvent, type TalkEvent, type TalkEventInput, type TalkSessionController, @@ -794,7 +794,7 @@ export class MediaStreamHandler { provider: this.config.transcriptionProvider.id, turnIdPrefix: `${streamSid}:turn`, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ); } diff --git 
a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts index b9dcfd697e2..13c56adde44 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -7,7 +7,7 @@ import { createTalkSessionController, createRealtimeVoiceBridgeSession, REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, - recordTalkDiagnosticEvent, + recordTalkObservabilityEvent, type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderConfig, type RealtimeVoiceProviderPlugin, @@ -516,7 +516,7 @@ export class RealtimeCallHandler { brain: "agent-consult", provider: this.realtimeProvider.id, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ); const rememberTalkEvent = (event: TalkEvent | undefined): TalkEvent | undefined => { if (event) { diff --git a/src/gateway/talk-handoff.ts b/src/gateway/talk-handoff.ts index 96629050fd1..5356d40c249 100644 --- a/src/gateway/talk-handoff.ts +++ b/src/gateway/talk-handoff.ts @@ -1,5 +1,5 @@ import { createHash, randomBytes, randomUUID } from "node:crypto"; -import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; +import { recordTalkObservabilityEvent } from "../talk/observability.js"; import { createTalkSessionController, type TalkBrain, @@ -328,7 +328,7 @@ function createTalkHandoffRoom(params: { brain: params.brain, provider: params.provider, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ), }; } diff --git a/src/gateway/talk-realtime-relay.ts b/src/gateway/talk-realtime-relay.ts index 401d9ed068c..33fb7bef313 100644 --- a/src/gateway/talk-realtime-relay.ts +++ b/src/gateway/talk-realtime-relay.ts @@ -1,6 +1,6 @@ import { randomUUID } from "node:crypto"; import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js"; -import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; +import { recordTalkObservabilityEvent } from 
"../talk/observability.js"; import { REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, type RealtimeVoiceBrowserAudioContract, @@ -169,7 +169,7 @@ export function createTalkRealtimeRelaySession( brain: "agent-consult", provider: params.provider.id, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ); let relay: RelaySession | undefined; const emit = (event: TalkRealtimeRelayEventPayload, talkEvent?: TalkEventInput) => diff --git a/src/gateway/talk-transcription-relay.ts b/src/gateway/talk-transcription-relay.ts index d90863b9b5c..02ce2341efc 100644 --- a/src/gateway/talk-transcription-relay.ts +++ b/src/gateway/talk-transcription-relay.ts @@ -1,7 +1,7 @@ import { randomUUID } from "node:crypto"; import type { RealtimeTranscriptionProviderPlugin } from "../plugins/types.js"; import type { RealtimeTranscriptionProviderConfig } from "../realtime-transcription/provider-types.js"; -import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; +import { recordTalkObservabilityEvent } from "../talk/observability.js"; import { type TalkEvent, type TalkEventInput, @@ -147,7 +147,7 @@ export function createTalkTranscriptionRelaySession( brain: "none", provider: params.provider.id, }, - { onEvent: recordTalkDiagnosticEvent }, + { onEvent: recordTalkObservabilityEvent }, ); let relay: TranscriptionRelaySession | undefined; const emit = (event: TalkTranscriptionRelayEventPayload, talkEvent?: TalkEventInput): void => { diff --git a/src/plugin-sdk/realtime-voice.ts b/src/plugin-sdk/realtime-voice.ts index efaef026cd9..f74feb68287 100644 --- a/src/plugin-sdk/realtime-voice.ts +++ b/src/plugin-sdk/realtime-voice.ts @@ -36,6 +36,8 @@ export { type TalkTransport, } from "../talk/talk-events.js"; export { createTalkDiagnosticEvent, recordTalkDiagnosticEvent } from "../talk/diagnostics.js"; +export { createTalkLogRecord, recordTalkLogEvent } from "../talk/logging.js"; +export { recordTalkObservabilityEvent } from "../talk/observability.js"; export { 
createTalkSessionController, normalizeTalkTransport, diff --git a/src/talk/agent-talkback-runtime.ts b/src/talk/agent-talkback-runtime.ts index 638c5fbb3ae..2d2f1399459 100644 --- a/src/talk/agent-talkback-runtime.ts +++ b/src/talk/agent-talkback-runtime.ts @@ -59,7 +59,7 @@ export function createRealtimeVoiceAgentTalkbackQueue( } const currentQuestion = nextQuestion; pendingQuestion = undefined; - params.logger.info(`${params.logPrefix} consult: ${currentQuestion}`); + params.logger.info(`${params.logPrefix} consult: chars=${currentQuestion.length}`); activeAbortController = new AbortController(); const result = await params.consult({ question: currentQuestion, diff --git a/src/talk/logging.test.ts b/src/talk/logging.test.ts new file mode 100644 index 00000000000..eb0421a61ee --- /dev/null +++ b/src/talk/logging.test.ts @@ -0,0 +1,195 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { + onInternalDiagnosticEvent, + resetDiagnosticEventsForTest, + type DiagnosticEventPayload, +} from "../infra/diagnostic-events.js"; +import { resetLogger, setLoggerOverride } from "../logging/logger.js"; +import { createTalkLogRecord, recordTalkLogEvent } from "./logging.js"; +import { recordTalkObservabilityEvent } from "./observability.js"; +import { createTalkEventSequencer } from "./talk-events.js"; + +function flushDiagnosticEvents() { + return new Promise((resolve) => setImmediate(resolve)); +} + +describe("talk logging", () => { + let tmpDir: string; + let logFile: string; + + beforeEach(() => { + tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-talk-logs-")); + logFile = path.join(tmpDir, "openclaw.log"); + resetDiagnosticEventsForTest(); + resetLogger(); + setLoggerOverride({ level: "info", file: logFile }); + }); + + afterEach(() => { + resetDiagnosticEventsForTest(); + setLoggerOverride(null); + resetLogger(); + fs.rmSync(tmpDir, { recursive: true, 
force: true }); + }); + + it("emits bounded lifecycle log records without transcript text or scoped ids", async () => { + const logs: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = []; + const unsubscribe = onInternalDiagnosticEvent((event) => { + if (event.type === "log.record") { + logs.push(event); + } + }); + const events = createTalkEventSequencer({ + sessionId: "talk-session", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + }); + const talkEvent = events.next({ + type: "output.text.done", + turnId: "turn-1", + callId: "call-1", + itemId: "item-1", + final: true, + payload: { + text: "private transcript should not be logged", + durationMs: 42, + }, + }); + + expect(createTalkLogRecord(talkEvent)).toEqual({ + level: "info", + message: "talk event output.text.done", + attributes: { + sessionId: "talk-session", + talkEventType: "output.text.done", + talkMode: "realtime", + talkTransport: "gateway-relay", + talkBrain: "agent-consult", + talkProvider: "openai", + talkFinal: true, + talkDurationMs: 42, + }, + }); + + recordTalkLogEvent(talkEvent); + await flushDiagnosticEvents(); + unsubscribe(); + + expect(logs).toHaveLength(1); + expect(logs[0]).toMatchObject({ + type: "log.record", + level: "INFO", + message: "talk event output.text.done", + attributes: { + subsystem: "talk", + sessionId: "talk-session", + talkEventType: "output.text.done", + talkMode: "realtime", + talkTransport: "gateway-relay", + talkBrain: "agent-consult", + talkProvider: "openai", + talkFinal: true, + talkDurationMs: 42, + }, + }); + const serialized = JSON.stringify(logs[0]); + expect(serialized).not.toContain("private transcript"); + expect(serialized).not.toContain("turn-1"); + expect(serialized).not.toContain("call-1"); + expect(serialized).not.toContain("item-1"); + + const fileLog = fs.readFileSync(logFile, "utf8"); + expect(fileLog).toContain("talk event output.text.done"); + expect(fileLog).toContain('"session_id":"talk-session"'); +
expect(fileLog).not.toContain("private transcript"); + expect(fileLog).not.toContain("turn-1"); + expect(fileLog).not.toContain("call-1"); + expect(fileLog).not.toContain("item-1"); + }); + + it("drops high-volume delta records from file and OTLP logs", async () => { + const logs: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = []; + const unsubscribe = onInternalDiagnosticEvent((event) => { + if (event.type === "log.record") { + logs.push(event); + } + }); + const events = createTalkEventSequencer({ + sessionId: "talk-session", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + }); + + recordTalkLogEvent( + events.next({ + type: "transcript.delta", + turnId: "turn-1", + payload: { text: "private partial transcript" }, + }), + ); + recordTalkLogEvent( + events.next({ + type: "output.audio.delta", + turnId: "turn-1", + payload: { byteLength: 320 }, + }), + ); + await flushDiagnosticEvents(); + unsubscribe(); + + expect(logs).toHaveLength(0); + }); + + it("records diagnostics and logs through the combined observability hook", async () => { + const observed: Array<{ event: DiagnosticEventPayload; trusted: boolean }> = []; + const unsubscribe = onInternalDiagnosticEvent((event, metadata) => { + observed.push({ event, trusted: metadata.trusted }); + }); + const events = createTalkEventSequencer({ + sessionId: "talk-session", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + }); + + recordTalkObservabilityEvent( + events.next({ + type: "session.error", + payload: { message: "provider failure with private detail" }, + final: true, + }), + ); + await flushDiagnosticEvents(); + unsubscribe(); + + expect(observed).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + trusted: true, + event: expect.objectContaining({ + type: "talk.event", + talkEventType: "session.error", + sessionId: "talk-session", + }), + }), + expect.objectContaining({ + trusted: false, + event: expect.objectContaining({ + type:
"log.record", + level: "WARN", + message: "talk event session.error", + }), + }), + ]), + ); + expect(JSON.stringify(observed)).not.toContain("private detail"); + }); +}); diff --git a/src/talk/logging.ts b/src/talk/logging.ts new file mode 100644 index 00000000000..5de39a6026d --- /dev/null +++ b/src/talk/logging.ts @@ -0,0 +1,97 @@ +import { getChildLogger } from "../logging/logger.js"; +import type { TalkEvent, TalkEventType } from "./talk-events.js"; + +type TalkLogLevel = "info" | "warn"; + +type TalkLogRecord = { + level: TalkLogLevel; + message: string; + attributes: Record<string, string | number | boolean>; +}; + +const OMITTED_TALK_LOG_EVENT_TYPES = new Set<TalkEventType>([ + "input.audio.delta", + "output.audio.delta", + "output.text.delta", + "transcript.delta", + "tool.progress", +]); + +const TALK_LOGGER_BINDINGS = Object.freeze({ subsystem: "talk" }); + +export function createTalkLogRecord(event: TalkEvent): TalkLogRecord | undefined { + if (OMITTED_TALK_LOG_EVENT_TYPES.has(event.type)) { + return undefined; + } + + const payload = asRecord(event.payload); + const attributes: Record<string, string | number | boolean> = { + sessionId: event.sessionId, + talkEventType: event.type, + talkMode: event.mode, + talkTransport: event.transport, + talkBrain: event.brain, + }; + + if (event.provider) { + attributes.talkProvider = event.provider; + } + if (typeof event.final === "boolean") { + attributes.talkFinal = event.final; + } + + const durationMs = firstFiniteNumber(payload, ["durationMs", "latencyMs", "elapsedMs"]); + if (durationMs !== undefined) { + attributes.talkDurationMs = durationMs; + } + const byteLength = firstFiniteNumber(payload, ["byteLength", "audioBytes"]); + if (byteLength !== undefined) { + attributes.talkByteLength = byteLength; + } + + return { + level: event.type === "session.error" || event.type === "tool.error" ?
"warn" : "info", + message: `talk event ${event.type}`, + attributes, + }; +} + +export function recordTalkLogEvent(event: TalkEvent): void { + const record = createTalkLogRecord(event); + if (!record) { + return; + } + + try { + const logger = getChildLogger(TALK_LOGGER_BINDINGS); + if (record.level === "warn") { + logger.warn(record.attributes, record.message); + return; + } + logger.info(record.attributes, record.message); + } catch { + // logging must never block the realtime Talk path + } +} + +function asRecord(value: unknown): Record<string, unknown> | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record<string, unknown>) + : undefined; +} + +function firstFiniteNumber( + record: Record<string, unknown> | undefined, + keys: readonly string[], +): number | undefined { + if (!record) { + return undefined; + } + for (const key of keys) { + const value = record[key]; + if (typeof value === "number" && Number.isFinite(value) && value >= 0) { + return value; + } + } + return undefined; +} diff --git a/src/talk/observability.ts b/src/talk/observability.ts new file mode 100644 index 00000000000..2d1c309c6f3 --- /dev/null +++ b/src/talk/observability.ts @@ -0,0 +1,8 @@ +import { recordTalkDiagnosticEvent } from "./diagnostics.js"; +import { recordTalkLogEvent } from "./logging.js"; +import type { TalkEvent } from "./talk-events.js"; + +export function recordTalkObservabilityEvent(event: TalkEvent): void { + recordTalkDiagnosticEvent(event); + recordTalkLogEvent(event); +}