fix(talk): add bounded lifecycle logging

This commit is contained in:
Vincent Koc
2026-05-06 02:16:51 -07:00
parent 28e27ca5d1
commit 16321a27b6
12 changed files with 348 additions and 26 deletions

View File

@@ -9,7 +9,7 @@ import {
createRealtimeVoiceAgentTalkbackQueue,
createTalkSessionController,
createRealtimeVoiceBridgeSession,
recordTalkDiagnosticEvent,
recordTalkObservabilityEvent,
type RealtimeVoiceAgentTalkbackQueue,
type RealtimeVoiceBridgeSession,
type RealtimeVoiceProviderPlugin,
@@ -41,6 +41,7 @@ import {
convertGoogleMeetTtsAudioForBridge,
formatGoogleMeetAgentAudioModelLog,
formatGoogleMeetAgentTtsResultLog,
formatGoogleMeetTranscriptSummaryLog,
formatGoogleMeetRealtimeVoiceModelLog,
type GoogleMeetRealtimeEventEntry,
type GoogleMeetRealtimeTranscriptEntry,
@@ -181,7 +182,9 @@ export async function startNodeAgentAudioBridge(params: {
return;
}
recordGoogleMeetRealtimeTranscript(transcript, "assistant", normalized);
params.logger.info(`[google-meet] node agent assistant: ${normalized}`);
params.logger.info(
formatGoogleMeetTranscriptSummaryLog("node agent assistant", normalized),
);
const result = await params.runtime.tts.textToSpeechTelephony({
text: normalized,
cfg: params.fullConfig,
@@ -233,10 +236,13 @@ export async function startNodeAgentAudioBridge(params: {
return;
}
recordGoogleMeetRealtimeTranscript(transcript, "user", trimmed);
params.logger.info(`[google-meet] node agent user: ${trimmed}`);
params.logger.info(formatGoogleMeetTranscriptSummaryLog("node agent user", trimmed));
if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text: trimmed })) {
params.logger.info(
`[google-meet] node agent ignored assistant echo transcript: ${trimmed}`,
formatGoogleMeetTranscriptSummaryLog(
"node agent ignored assistant echo transcript",
trimmed,
),
);
return;
}
@@ -368,7 +374,7 @@ export async function startNodeRealtimeAudioBridge(params: {
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
provider: resolved.provider.id,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
);
const recentTalkEvents: TalkEvent[] = [];
const rememberTalkEvent = (event: TalkEvent | undefined): void => {
@@ -577,11 +583,14 @@ export async function startNodeRealtimeAudioBridge(params: {
}
if (isFinal) {
recordGoogleMeetRealtimeTranscript(transcript, role, text);
params.logger.info(`[google-meet] node realtime ${role}: ${text}`);
params.logger.info(formatGoogleMeetTranscriptSummaryLog(`node realtime ${role}`, text));
if (role === "user" && strategy === "agent") {
if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text })) {
params.logger.info(
`[google-meet] node realtime ignored assistant echo transcript: ${text}`,
formatGoogleMeetTranscriptSummaryLog(
"node realtime ignored assistant echo transcript",
text,
),
);
return;
}

View File

@@ -23,7 +23,7 @@ import {
REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ,
REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ,
recordRealtimeVoiceBridgeEvent,
recordTalkDiagnosticEvent,
recordTalkObservabilityEvent,
recordRealtimeVoiceTranscript,
resamplePcm,
resolveConfiguredRealtimeVoiceProvider,
@@ -407,6 +407,10 @@ export function formatGoogleMeetAgentTtsResultLog(
].join(" ");
}
/**
 * Builds a Google Meet log line that summarizes a transcript without echoing it.
 *
 * Only the length of `text` is written to the line; the text itself never is.
 * The length is `String.prototype.length`, i.e. UTF-16 code units.
 *
 * @param prefix - Label placed after the `[google-meet]` tag (for example `agent user`).
 * @param text - Transcript text whose length is reported.
 * @returns A line of the form `[google-meet] <prefix>: chars=<length>`.
 */
export function formatGoogleMeetTranscriptSummaryLog(prefix: string, text: string): string {
  const charCount = text.length;
  const label = `[google-meet] ${prefix}`;
  return `${label}: chars=${charCount}`;
}
function normalizeGoogleMeetTtsPromptText(text: string | undefined): string | undefined {
const trimmed = text?.trim();
if (!trimmed) {
@@ -495,7 +499,7 @@ export async function startCommandAgentAudioBridge(params: {
provider: resolved.provider.id,
turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
);
const recentTalkEvents: TalkEvent[] = [];
const emitTalkEvent = (input: TalkEventInput) =>
@@ -636,7 +640,7 @@ export async function startCommandAgentAudioBridge(params: {
return;
}
recordGoogleMeetRealtimeTranscript(transcript, "assistant", normalized);
params.logger.info(`[google-meet] agent assistant: ${normalized}`);
params.logger.info(formatGoogleMeetTranscriptSummaryLog("agent assistant", normalized));
const turnId = ensureTalkTurn();
emitTalkEvent({
type: "output.text.done",
@@ -720,9 +724,11 @@ export async function startCommandAgentAudioBridge(params: {
payload: { meetingSessionId: params.meetingSessionId, text: trimmed, role: "user" },
});
recordGoogleMeetRealtimeTranscript(transcript, "user", trimmed);
params.logger.info(`[google-meet] agent user: ${trimmed}`);
params.logger.info(formatGoogleMeetTranscriptSummaryLog("agent user", trimmed));
if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text: trimmed })) {
params.logger.info(`[google-meet] agent ignored assistant echo transcript: ${trimmed}`);
params.logger.info(
formatGoogleMeetTranscriptSummaryLog("agent ignored assistant echo transcript", trimmed),
);
return;
}
agentTalkback?.enqueue(trimmed);
@@ -1046,7 +1052,7 @@ export async function startCommandRealtimeAudioBridge(params: {
brain: strategy === "bidi" ? "direct-tools" : "agent-consult",
provider: resolved.provider.id,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
);
const recentTalkEvents: TalkEvent[] = [];
const rememberTalkEvent = (event: TalkEvent | undefined): void => {
@@ -1171,10 +1177,15 @@ export async function startCommandRealtimeAudioBridge(params: {
}
if (isFinal) {
recordGoogleMeetRealtimeTranscript(transcript, role, text);
params.logger.info(`[google-meet] realtime ${role}: ${text}`);
params.logger.info(formatGoogleMeetTranscriptSummaryLog(`realtime ${role}`, text));
if (role === "user" && strategy === "agent") {
if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text })) {
params.logger.info(`[google-meet] realtime ignored assistant echo transcript: ${text}`);
params.logger.info(
formatGoogleMeetTranscriptSummaryLog(
"realtime ignored assistant echo transcript",
text,
),
);
return;
}
agentTalkback?.enqueue(text);

View File

@@ -16,7 +16,7 @@ import type {
} from "openclaw/plugin-sdk/realtime-transcription";
import {
createTalkSessionController,
recordTalkDiagnosticEvent,
recordTalkObservabilityEvent,
type TalkEvent,
type TalkEventInput,
type TalkSessionController,
@@ -794,7 +794,7 @@ export class MediaStreamHandler {
provider: this.config.transcriptionProvider.id,
turnIdPrefix: `${streamSid}:turn`,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
);
}

View File

@@ -7,7 +7,7 @@ import {
createTalkSessionController,
createRealtimeVoiceBridgeSession,
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
recordTalkDiagnosticEvent,
recordTalkObservabilityEvent,
type RealtimeVoiceBridgeSession,
type RealtimeVoiceProviderConfig,
type RealtimeVoiceProviderPlugin,
@@ -516,7 +516,7 @@ export class RealtimeCallHandler {
brain: "agent-consult",
provider: this.realtimeProvider.id,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
);
const rememberTalkEvent = (event: TalkEvent | undefined): TalkEvent | undefined => {
if (event) {

View File

@@ -1,5 +1,5 @@
import { createHash, randomBytes, randomUUID } from "node:crypto";
import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
import { recordTalkObservabilityEvent } from "../talk/observability.js";
import {
createTalkSessionController,
type TalkBrain,
@@ -328,7 +328,7 @@ function createTalkHandoffRoom(params: {
brain: params.brain,
provider: params.provider,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
),
};
}

View File

@@ -1,6 +1,6 @@
import { randomUUID } from "node:crypto";
import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js";
import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
import { recordTalkObservabilityEvent } from "../talk/observability.js";
import {
REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ,
type RealtimeVoiceBrowserAudioContract,
@@ -169,7 +169,7 @@ export function createTalkRealtimeRelaySession(
brain: "agent-consult",
provider: params.provider.id,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
);
let relay: RelaySession | undefined;
const emit = (event: TalkRealtimeRelayEventPayload, talkEvent?: TalkEventInput) =>

View File

@@ -1,7 +1,7 @@
import { randomUUID } from "node:crypto";
import type { RealtimeTranscriptionProviderPlugin } from "../plugins/types.js";
import type { RealtimeTranscriptionProviderConfig } from "../realtime-transcription/provider-types.js";
import { recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
import { recordTalkObservabilityEvent } from "../talk/observability.js";
import {
type TalkEvent,
type TalkEventInput,
@@ -147,7 +147,7 @@ export function createTalkTranscriptionRelaySession(
brain: "none",
provider: params.provider.id,
},
{ onEvent: recordTalkDiagnosticEvent },
{ onEvent: recordTalkObservabilityEvent },
);
let relay: TranscriptionRelaySession | undefined;
const emit = (event: TalkTranscriptionRelayEventPayload, talkEvent?: TalkEventInput): void => {

View File

@@ -36,6 +36,8 @@ export {
type TalkTransport,
} from "../talk/talk-events.js";
export { createTalkDiagnosticEvent, recordTalkDiagnosticEvent } from "../talk/diagnostics.js";
export { createTalkLogRecord, recordTalkLogEvent } from "../talk/logging.js";
export { recordTalkObservabilityEvent } from "../talk/observability.js";
export {
createTalkSessionController,
normalizeTalkTransport,

View File

@@ -59,7 +59,7 @@ export function createRealtimeVoiceAgentTalkbackQueue(
}
const currentQuestion = nextQuestion;
pendingQuestion = undefined;
params.logger.info(`${params.logPrefix} consult: ${currentQuestion}`);
params.logger.info(`${params.logPrefix} consult: chars=${currentQuestion.length}`);
activeAbortController = new AbortController();
const result = await params.consult({
question: currentQuestion,

195
src/talk/logging.test.ts Normal file
View File

@@ -0,0 +1,195 @@
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
onInternalDiagnosticEvent,
resetDiagnosticEventsForTest,
type DiagnosticEventPayload,
} from "../infra/diagnostic-events.js";
import { resetLogger, setLoggerOverride } from "../logging/logger.js";
import { createTalkLogRecord, recordTalkLogEvent } from "./logging.js";
import { recordTalkObservabilityEvent } from "./observability.js";
import { createTalkEventSequencer } from "./talk-events.js";
/**
 * Returns a promise that resolves from a `setImmediate` callback.
 *
 * The tests await this after emitting events and before asserting on what the
 * diagnostic listener collected.
 */
function flushDiagnosticEvents(): Promise<void> {
  return new Promise<void>((done) => {
    setImmediate(done);
  });
}
// Test suite for Talk lifecycle logging. It covers three things:
//   1. the record shape produced by createTalkLogRecord and what recordTalkLogEvent emits,
//   2. that delta-type events produce no log records,
//   3. that recordTalkObservabilityEvent yields both a diagnostic event and a log record.
describe("talk logging", () => {
// Per-test scratch directory and the log file inside it.
let tmpDir: string;
let logFile: string;
// Each test gets a fresh temp directory, cleared diagnostic-event state, and a logger
// overridden to level "info" writing to the temp log file.
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-talk-logs-"));
logFile = path.join(tmpDir, "openclaw.log");
resetDiagnosticEventsForTest();
resetLogger();
setLoggerOverride({ level: "info", file: logFile });
});
// Undo the logger override, clear diagnostic-event state, and delete the temp directory.
afterEach(() => {
resetDiagnosticEventsForTest();
setLoggerOverride(null);
resetLogger();
fs.rmSync(tmpDir, { recursive: true, force: true });
});
it("emits bounded lifecycle log records without transcript text or scoped ids", async () => {
// Collect only "log.record" diagnostic events; everything else is ignored here.
const logs: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = [];
const unsubscribe = onInternalDiagnosticEvent((event) => {
if (event.type === "log.record") {
logs.push(event);
}
});
const events = createTalkEventSequencer({
sessionId: "talk-session",
mode: "realtime",
transport: "gateway-relay",
brain: "agent-consult",
provider: "openai",
});
// The event carries transcript text plus turn/call/item ids; none of these may reach the logs.
const talkEvent = events.next({
type: "output.text.done",
turnId: "turn-1",
callId: "call-1",
itemId: "item-1",
final: true,
payload: {
text: "private transcript should not be logged",
durationMs: 42,
},
});
// toEqual pins the full attribute set: session/mode/transport/brain/provider, the final flag,
// and the numeric duration taken from the payload. No text and no turn/call/item ids.
expect(createTalkLogRecord(talkEvent)).toEqual({
level: "info",
message: "talk event output.text.done",
attributes: {
sessionId: "talk-session",
talkEventType: "output.text.done",
talkMode: "realtime",
talkTransport: "gateway-relay",
talkBrain: "agent-consult",
talkProvider: "openai",
talkFinal: true,
talkDurationMs: 42,
},
});
recordTalkLogEvent(talkEvent);
// Wait one setImmediate tick before unsubscribing and asserting on the collected events.
await flushDiagnosticEvents();
unsubscribe();
expect(logs).toHaveLength(1);
// The emitted diagnostic event is expected to report the level in upper case and to carry
// the `subsystem: "talk"` attribute in addition to the record's own attributes.
expect(logs[0]).toMatchObject({
type: "log.record",
level: "INFO",
message: "talk event output.text.done",
attributes: {
subsystem: "talk",
sessionId: "talk-session",
talkEventType: "output.text.done",
talkMode: "realtime",
talkTransport: "gateway-relay",
talkBrain: "agent-consult",
talkProvider: "openai",
talkFinal: true,
talkDurationMs: 42,
},
});
// Serialize the whole diagnostic event so leaks in any field are caught, not just attributes.
const serialized = JSON.stringify(logs[0]);
expect(serialized).not.toContain("private transcript");
expect(serialized).not.toContain("turn-1");
expect(serialized).not.toContain("call-1");
expect(serialized).not.toContain("item-1");
// The same guarantees are checked against the file log. The file is expected to contain the
// session id under a `session_id` key.
// NOTE(review): the snake_case key is presumably produced by the file logger - confirm there.
const fileLog = fs.readFileSync(logFile, "utf8");
expect(fileLog).toContain("talk event output.text.done");
expect(fileLog).toContain('"session_id":"talk-session"');
expect(fileLog).not.toContain("private transcript");
expect(fileLog).not.toContain("turn-1");
expect(fileLog).not.toContain("call-1");
expect(fileLog).not.toContain("item-1");
});
it("drops high-volume delta records from file and OTLP logs", async () => {
const logs: Array<Extract<DiagnosticEventPayload, { type: "log.record" }>> = [];
const unsubscribe = onInternalDiagnosticEvent((event) => {
if (event.type === "log.record") {
logs.push(event);
}
});
const events = createTalkEventSequencer({
sessionId: "talk-session",
mode: "realtime",
transport: "gateway-relay",
brain: "agent-consult",
provider: "openai",
});
// Two delta-type events: a transcript delta and an output audio delta.
recordTalkLogEvent(
events.next({
type: "transcript.delta",
turnId: "turn-1",
payload: { text: "private partial transcript" },
}),
);
recordTalkLogEvent(
events.next({
type: "output.audio.delta",
turnId: "turn-1",
payload: { byteLength: 320 },
}),
);
await flushDiagnosticEvents();
unsubscribe();
// Neither delta may produce a "log.record" diagnostic event.
// NOTE(review): only the diagnostic event stream is asserted; the log file is not read in
// this test, so the "file" half of the test title is not directly checked here.
expect(logs).toHaveLength(0);
});
it("records diagnostics and logs through the combined observability hook", async () => {
// Collect every diagnostic event together with the `trusted` flag from its metadata.
const observed: Array<{ event: DiagnosticEventPayload; trusted: boolean }> = [];
const unsubscribe = onInternalDiagnosticEvent((event, metadata) => {
observed.push({ event, trusted: metadata.trusted });
});
const events = createTalkEventSequencer({
sessionId: "talk-session",
mode: "realtime",
transport: "gateway-relay",
brain: "agent-consult",
provider: "openai",
});
// The error message in the payload stands in for provider detail that must not be emitted.
recordTalkObservabilityEvent(
events.next({
type: "session.error",
payload: { message: "provider failure with private detail" },
final: true,
}),
);
await flushDiagnosticEvents();
unsubscribe();
// arrayContaining: both entries must be present, but order and extra events are not constrained.
// The "talk.event" diagnostic is expected with trusted=true; the "log.record" is expected with
// trusted=false and level WARN for a session.error event.
expect(observed).toEqual(
expect.arrayContaining([
expect.objectContaining({
trusted: true,
event: expect.objectContaining({
type: "talk.event",
talkEventType: "session.error",
sessionId: "talk-session",
}),
}),
expect.objectContaining({
trusted: false,
event: expect.objectContaining({
type: "log.record",
level: "WARN",
message: "talk event session.error",
}),
}),
]),
);
// The payload's error message must not appear in anything that was observed.
expect(JSON.stringify(observed)).not.toContain("private detail");
});
});

97
src/talk/logging.ts Normal file
View File

@@ -0,0 +1,97 @@
import { getChildLogger } from "../logging/logger.js";
import type { TalkEvent, TalkEventType } from "./talk-events.js";
/** Severity levels that a Talk log record can carry. */
type TalkLogLevel = "info" | "warn";
/** Log-ready projection of a Talk event, as returned by `createTalkLogRecord`. */
type TalkLogRecord = {
/** Severity the record is written at. */
level: TalkLogLevel;
/** Log message text. */
message: string;
/** Flat attributes; values are restricted to string, number, or boolean. */
attributes: Record<string, string | number | boolean>;
};
/**
 * Talk event types for which `createTalkLogRecord` returns no record:
 * the audio/text/transcript delta events and tool progress events.
 */
const OMITTED_TALK_LOG_EVENT_TYPES = new Set<TalkEventType>()
  .add("input.audio.delta")
  .add("output.audio.delta")
  .add("output.text.delta")
  .add("transcript.delta")
  .add("tool.progress");

/** Frozen bindings passed to `getChildLogger` for every Talk log record. */
const TALK_LOGGER_BINDINGS = Object.freeze({ subsystem: "talk" });
/**
 * Projects a Talk event onto a bounded log record.
 *
 * Only a fixed set of fields is copied: session id, event type, mode, transport,
 * brain, and, when present, provider, the `final` flag, one duration value and one
 * byte-length value. Nothing else from the event or its payload is copied into
 * the record.
 *
 * @param event - Talk event to describe.
 * @returns The log record, or `undefined` when the event type is in
 *   `OMITTED_TALK_LOG_EVENT_TYPES`.
 */
export function createTalkLogRecord(event: TalkEvent): TalkLogRecord | undefined {
  const eventType = event.type;
  if (OMITTED_TALK_LOG_EVENT_TYPES.has(eventType)) {
    return undefined;
  }

  // Only numeric measurements are read from the payload; the first usable key wins.
  const payloadFields = asRecord(event.payload);
  const elapsedMs = firstFiniteNumber(payloadFields, ["durationMs", "latencyMs", "elapsedMs"]);
  const byteCount = firstFiniteNumber(payloadFields, ["byteLength", "audioBytes"]);

  // Optional attributes are spread in the same order the keys have always been added.
  const attributes: Record<string, string | number | boolean> = {
    sessionId: event.sessionId,
    talkEventType: eventType,
    talkMode: event.mode,
    talkTransport: event.transport,
    talkBrain: event.brain,
    ...(event.provider ? { talkProvider: event.provider } : {}),
    ...(typeof event.final === "boolean" ? { talkFinal: event.final } : {}),
    ...(elapsedMs !== undefined ? { talkDurationMs: elapsedMs } : {}),
    ...(byteCount !== undefined ? { talkByteLength: byteCount } : {}),
  };

  // Session and tool errors are logged at "warn"; every other event at "info".
  const isErrorEvent = eventType === "session.error" || eventType === "tool.error";
  return {
    level: isErrorEvent ? "warn" : "info",
    message: `talk event ${eventType}`,
    attributes,
  };
}
/**
 * Writes the log record for a Talk event through the `talk` child logger.
 *
 * Events for which `createTalkLogRecord` returns no record are skipped. Any
 * exception thrown while obtaining the logger or writing is swallowed on purpose.
 *
 * @param event - Talk event to log.
 */
export function recordTalkLogEvent(event: TalkEvent): void {
  const record = createTalkLogRecord(event);
  if (record === undefined) {
    return;
  }

  const { level, attributes, message } = record;
  try {
    const talkLogger = getChildLogger(TALK_LOGGER_BINDINGS);
    if (level === "warn") {
      talkLogger.warn(attributes, message);
    } else {
      talkLogger.info(attributes, message);
    }
  } catch {
    // logging must never block the realtime Talk path
  }
}
/**
 * Narrows an unknown value to a string-keyed record.
 *
 * @param value - Value to inspect.
 * @returns `value` itself when it is a non-null, non-array object; otherwise `undefined`.
 */
function asRecord(value: unknown): Record<string, unknown> | undefined {
  if (value === null || typeof value !== "object") {
    return undefined;
  }
  if (Array.isArray(value)) {
    return undefined;
  }
  return value as Record<string, unknown>;
}
/** True when `candidate` is a finite number greater than or equal to zero. */
function isNonNegativeFiniteNumber(candidate: unknown): candidate is number {
  return typeof candidate === "number" && Number.isFinite(candidate) && candidate >= 0;
}

/**
 * Looks up `keys` on `record` in order and returns the first usable number.
 *
 * A value is usable when it is a finite number that is not negative; strings,
 * `NaN`, infinities and negative numbers are skipped.
 *
 * @param record - Object to read from; `undefined` yields `undefined`.
 * @param keys - Property names to try, in priority order.
 * @returns The first usable value, or `undefined` when none of the keys has one.
 */
function firstFiniteNumber(
  record: Record<string, unknown> | undefined,
  keys: readonly string[],
): number | undefined {
  if (!record) {
    return undefined;
  }
  for (const key of keys) {
    const candidate = record[key];
    if (isNonNegativeFiniteNumber(candidate)) {
      return candidate;
    }
  }
  return undefined;
}

View File

@@ -0,0 +1,8 @@
import { recordTalkDiagnosticEvent } from "./diagnostics.js";
import { recordTalkLogEvent } from "./logging.js";
import type { TalkEvent } from "./talk-events.js";
/**
 * Forwards one Talk event to both observability recorders.
 *
 * The event is passed to `recordTalkDiagnosticEvent` first and to
 * `recordTalkLogEvent` second, in that order.
 *
 * NOTE(review): the two calls are not isolated from each other here, so an
 * exception thrown by the diagnostics recorder would skip the log recorder and
 * propagate to the caller. Confirm the diagnostics recorder cannot throw.
 *
 * @param event - Talk event to record.
 */
export function recordTalkObservabilityEvent(event: TalkEvent): void {
recordTalkDiagnosticEvent(event);
recordTalkLogEvent(event);
}