diff --git a/extensions/diagnostics-otel/src/service.ts b/extensions/diagnostics-otel/src/service.ts index 43f975f35f4..1b28a4e972e 100644 --- a/extensions/diagnostics-otel/src/service.ts +++ b/extensions/diagnostics-otel/src/service.ts @@ -2294,6 +2294,8 @@ export function createDiagnosticsOtelService(): OpenClawPluginService { return; case "session.long_running": case "session.stalled": + case "session.recovery.completed": + case "session.recovery.requested": return; case "session.stuck": recordSessionStuck(evt); diff --git a/extensions/google-meet/index.test.ts b/extensions/google-meet/index.test.ts index 2e3b3b70df7..a80728c738b 100644 --- a/extensions/google-meet/index.test.ts +++ b/extensions/google-meet/index.test.ts @@ -3945,6 +3945,23 @@ describe("google-meet plugin", () => { realtimeTranscriptLines: 2, lastRealtimeTranscriptRole: "assistant", }); + const talkEventTypes = handle.getHealth().recentTalkEvents?.map((event) => event.type) ?? []; + expect(talkEventTypes).toEqual([ + "session.started", + "session.ready", + "turn.started", + "input.audio.delta", + "input.audio.committed", + "transcript.done", + "output.text.done", + "output.audio.started", + "output.audio.delta", + "output.audio.done", + "turn.ended", + ]); + expect(talkEventTypes.indexOf("output.text.done")).toBeLessThan( + talkEventTypes.indexOf("output.audio.started"), + ); await handle.stop(); }); @@ -4167,6 +4184,21 @@ describe("google-meet plugin", () => { undefined, ); }); + expect(handle.getHealth().recentTalkEvents?.map((event) => event.type)).toEqual( + expect.arrayContaining([ + "session.started", + "session.ready", + "input.audio.delta", + "output.audio.delta", + "output.audio.done", + "transcript.done", + "output.text.done", + "tool.call", + "tool.progress", + "tool.result", + "turn.ended", + ]), + ); expect(runtime.agent.runEmbeddedPiAgent).toHaveBeenCalledWith( expect.objectContaining({ messageProvider: "google-meet", @@ -4644,6 +4676,24 @@ describe("google-meet plugin", () => { 
/**
 * Handles one tool call raised by a Google Meet realtime voice session.
 *
 * The call is rejected with an error result unless `strategy` is `"bidi"` and the requested
 * tool is `GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME`. An accepted call reports `tool.progress`,
 * submits the interim working response, and starts the agent consult in the background.
 * The consult outcome is reported through `onTalkEvent` (`tool.result` or `tool.error`, both
 * marked `final`) and submitted to the session under the same call id.
 *
 * The function returns synchronously; it never awaits the consult.
 *
 * @param params.strategy - Realtime strategy name; only `"bidi"` may run tools.
 * @param params.session - Session that receives the tool result for this call.
 * @param params.event - Tool call to handle (`name`, `args`, `callId`, `itemId` are read).
 * @param params.logger - Receives a warning when the outcome cannot be delivered.
 * @param params.transcript - Transcript entries forwarded to the consult.
 * @param params.onTalkEvent - Optional sink for the talk events describing the call.
 */
export function handleGoogleMeetRealtimeConsultToolCall(params: {
  strategy: string;
  session: RealtimeVoiceBridgeSession;
  event: RealtimeVoiceToolCallEvent;
  config: GoogleMeetConfig;
  fullConfig: OpenClawConfig;
  runtime: PluginRuntime;
  logger: RuntimeLogger;
  meetingSessionId: string;
  requesterSessionKey?: string;
  transcript: Array<{ role: "user" | "assistant"; text: string }>;
  onTalkEvent?: (event: TalkEventInput) => void;
}): void {
  const { event, session } = params;
  const toolName = event.name;
  // `||` rather than `??` on purpose: an empty callId also falls back to the item id.
  const callId = event.callId || event.itemId;

  // Reports a failed call as a talk event and as the tool result, using one message for both.
  const failToolCall = (message: string): void => {
    params.onTalkEvent?.({
      type: "tool.error",
      callId,
      payload: { name: toolName, error: message },
      final: true,
    });
    session.submitToolResult(callId, { error: message });
  };

  if (params.strategy !== "bidi") {
    failToolCall(`Tool "${toolName}" is only available in bidi realtime strategy`);
    return;
  }
  if (toolName !== GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME) {
    failToolCall(`Tool "${toolName}" not available`);
    return;
  }

  params.onTalkEvent?.({
    type: "tool.progress",
    callId,
    payload: { name: toolName, status: "working" },
  });
  submitGoogleMeetConsultWorkingResponse(session, callId);

  void consultOpenClawAgentForGoogleMeet({
    config: params.config,
    fullConfig: params.fullConfig,
    runtime: params.runtime,
    logger: params.logger,
    meetingSessionId: params.meetingSessionId,
    requesterSessionKey: params.requesterSessionKey,
    args: event.args,
    transcript: params.transcript,
  })
    // Two-argument `then`: only a failed consult is reported as `tool.error`. A throw while
    // delivering a successful result must not emit a second, contradictory final event.
    .then(
      (result) => {
        params.onTalkEvent?.({
          type: "tool.result",
          callId,
          payload: { name: toolName, result },
          final: true,
        });
        session.submitToolResult(callId, result);
      },
      (error: unknown) => {
        failToolCall(formatErrorMessage(error));
      },
    )
    // The promise is intentionally not awaited, so a delivery failure (a throwing talk-event
    // sink or submitToolResult) is logged here instead of becoming an unhandled rejection.
    .catch((error: unknown) => {
      params.logger.warn(
        `[google-meet] realtime consult tool result delivery failed: ${formatErrorMessage(error)}`,
      );
    });
}
b/extensions/google-meet/src/realtime-node.ts index 64f9e28c199..c1ddb092313 100644 --- a/extensions/google-meet/src/realtime-node.ts +++ b/extensions/google-meet/src/realtime-node.ts @@ -6,15 +6,20 @@ import type { RealtimeTranscriptionSession, } from "openclaw/plugin-sdk/realtime-transcription"; import { + createRealtimeVoiceAgentTalkbackQueue, + createTalkSessionController, createRealtimeVoiceBridgeSession, + type RealtimeVoiceAgentTalkbackQueue, type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderPlugin, + type TalkEvent, + type TalkEventInput, + type TalkSessionController, } from "openclaw/plugin-sdk/realtime-voice"; import { consultOpenClawAgentForGoogleMeet, - GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME, + handleGoogleMeetRealtimeConsultToolCall, resolveGoogleMeetRealtimeTools, - submitGoogleMeetConsultWorkingResponse, } from "./agent-consult.js"; import type { GoogleMeetConfig } from "./config.js"; import { @@ -29,6 +34,8 @@ import { resolveGoogleMeetRealtimeProvider, resolveGoogleMeetRealtimeTranscriptionProvider, isGoogleMeetLikelyAssistantEchoTranscript, + pushGoogleMeetTalkEvent, + summarizeGoogleMeetTalkEvents, convertGoogleMeetBridgeAudioForStt, convertGoogleMeetTtsAudioForBridge, formatGoogleMeetAgentAudioModelLog, @@ -108,9 +115,7 @@ export async function startNodeAgentAudioBridge(params: { }), ); const transcript: GoogleMeetRealtimeTranscriptEntry[] = []; - let agentConsultActive = false; - let pendingAgentQuestion: string | undefined; - let agentConsultDebounceTimer: ReturnType | undefined; + let agentTalkback: RealtimeVoiceAgentTalkbackQueue | undefined; let ttsQueue = Promise.resolve(); const stop = async () => { @@ -118,10 +123,7 @@ export async function startNodeAgentAudioBridge(params: { return; } stopped = true; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - agentConsultDebounceTimer = undefined; - } + agentTalkback?.close(); try { sttSession?.close(); } catch (error) { @@ -201,73 +203,26 @@ export async 
function startNodeAgentAudioBridge(params: { }); }; - const runAgentConsultForUserTranscript = async (question: string): Promise => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - if (agentConsultActive) { - pendingAgentQuestion = trimmed; - return; - } - agentConsultActive = true; - let nextQuestion: string | undefined = trimmed; - try { - while (nextQuestion) { - if (stopped) { - return; - } - const currentQuestion = nextQuestion; - pendingAgentQuestion = undefined; - params.logger.info(`[google-meet] node agent consult: ${currentQuestion}`); - const result = await consultOpenClawAgentForGoogleMeet({ - config: params.config, - fullConfig: params.fullConfig, - runtime: params.runtime, - logger: params.logger, - meetingSessionId: params.meetingSessionId, - requesterSessionKey: params.requesterSessionKey, - args: { - question: currentQuestion, - responseStyle: "Brief, natural spoken answer for a live meeting.", - }, - transcript, - }); - enqueueSpeakText(result.text); - nextQuestion = pendingAgentQuestion; - } - } catch (error) { - params.logger.warn(`[google-meet] node agent consult failed: ${formatErrorMessage(error)}`); - enqueueSpeakText("I hit an error while checking that. Please try again."); - } finally { - agentConsultActive = false; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } - } - }; - - const enqueueAgentConsultForUserTranscript = (question: string): void => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - pendingAgentQuestion = pendingAgentQuestion ? 
`${pendingAgentQuestion}\n${trimmed}` : trimmed; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - } - agentConsultDebounceTimer = setTimeout(() => { - agentConsultDebounceTimer = undefined; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } - }, GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS); - agentConsultDebounceTimer.unref?.(); - }; + agentTalkback = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS, + isStopped: () => stopped, + logger: params.logger, + logPrefix: "[google-meet] node agent", + responseStyle: "Brief, natural spoken answer for a live meeting.", + fallbackText: "I hit an error while checking that. Please try again.", + consult: ({ question, responseStyle }) => + consultOpenClawAgentForGoogleMeet({ + config: params.config, + fullConfig: params.fullConfig, + runtime: params.runtime, + logger: params.logger, + meetingSessionId: params.meetingSessionId, + requesterSessionKey: params.requesterSessionKey, + args: { question, responseStyle }, + transcript, + }), + deliver: enqueueSpeakText, + }); sttSession = resolved.provider.createSession({ providerConfig: resolved.providerConfig, @@ -284,7 +239,7 @@ export async function startNodeAgentAudioBridge(params: { ); return; } - enqueueAgentConsultForUserTranscript(trimmed); + agentTalkback?.enqueue(trimmed); }, onError: (error) => { params.logger.warn( @@ -404,6 +359,54 @@ export async function startNodeRealtimeAudioBridge(params: { const transcript: GoogleMeetRealtimeTranscriptEntry[] = []; const realtimeEvents: GoogleMeetRealtimeEventEntry[] = []; const strategy = params.config.realtime.strategy; + const talk: TalkSessionController = createTalkSessionController({ + sessionId: `google-meet:${params.meetingSessionId}:${params.bridgeId}:node-realtime`, + mode: "realtime", + transport: "gateway-relay", + brain: 
strategy === "bidi" ? "direct-tools" : "agent-consult", + provider: resolved.provider.id, + }); + const recentTalkEvents: TalkEvent[] = []; + const rememberTalkEvent = (event: TalkEvent | undefined): void => { + if (event) { + pushGoogleMeetTalkEvent(recentTalkEvents, event); + } + }; + const emitTalkEvent = (input: TalkEventInput): void => { + rememberTalkEvent(talk.emit(input)); + }; + const ensureTalkTurn = (): string => { + const turn = talk.ensureTurn({ + payload: { bridgeId: params.bridgeId, meetingSessionId: params.meetingSessionId }, + }); + if (turn.event) { + rememberTalkEvent(turn.event); + } + return turn.turnId; + }; + const finishOutputAudio = (reason: string): void => { + rememberTalkEvent( + talk.finishOutputAudio({ + payload: { bridgeId: params.bridgeId, reason }, + }), + ); + }; + const endTalkTurn = (reason = "completed"): void => { + const ended = talk.endTurn({ + payload: { bridgeId: params.bridgeId, reason }, + }); + if (ended.ok) { + rememberTalkEvent(ended.event); + } + }; + emitTalkEvent({ + type: "session.started", + payload: { + bridgeId: params.bridgeId, + meetingSessionId: params.meetingSessionId, + nodeId: params.nodeId, + }, + }); params.logger.info( formatGoogleMeetRealtimeVoiceModelLog({ strategy, @@ -413,95 +416,36 @@ export async function startNodeRealtimeAudioBridge(params: { audioFormat: params.config.chrome.audioFormat, }), ); - let agentConsultActive = false; - let pendingAgentQuestion: string | undefined; - let agentConsultDebounceTimer: ReturnType | undefined; - const enqueueAgentConsultForUserTranscript = (question: string): void => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - pendingAgentQuestion = pendingAgentQuestion ? 
`${pendingAgentQuestion}\n${trimmed}` : trimmed; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - } - agentConsultDebounceTimer = setTimeout(() => { - agentConsultDebounceTimer = undefined; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } - }, GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS); - agentConsultDebounceTimer.unref?.(); - }; - const runAgentConsultForUserTranscript = async (question: string): Promise => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - if (agentConsultActive) { - pendingAgentQuestion = trimmed; - return; - } - agentConsultActive = true; - let nextQuestion: string | undefined = trimmed; - try { - while (nextQuestion) { - if (stopped) { - return; - } - const currentQuestion = nextQuestion; - pendingAgentQuestion = undefined; - params.logger.info(`[google-meet] node realtime agent consult: ${currentQuestion}`); - const result = await consultOpenClawAgentForGoogleMeet({ - config: params.config, - fullConfig: params.fullConfig, - runtime: params.runtime, - logger: params.logger, - meetingSessionId: params.meetingSessionId, - requesterSessionKey: params.requesterSessionKey, - args: { - question: currentQuestion, - responseStyle: "Brief, natural spoken answer for a live meeting.", - }, - transcript, - }); - if (!stopped && result.text.trim()) { - bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(result.text.trim())); - } - nextQuestion = pendingAgentQuestion; - } - } catch (error) { - params.logger.warn( - `[google-meet] node realtime agent consult failed: ${formatErrorMessage(error)}`, - ); - if (!stopped) { - bridge?.sendUserMessage( - buildGoogleMeetSpeakExactUserMessage( - "I hit an error while checking that. 
Please try again.", - ), - ); - } - } finally { - agentConsultActive = false; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } - } - }; + let agentTalkback: RealtimeVoiceAgentTalkbackQueue | undefined; + agentTalkback = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS, + isStopped: () => stopped, + logger: params.logger, + logPrefix: "[google-meet] node realtime agent", + responseStyle: "Brief, natural spoken answer for a live meeting.", + fallbackText: "I hit an error while checking that. Please try again.", + consult: ({ question, responseStyle }) => + consultOpenClawAgentForGoogleMeet({ + config: params.config, + fullConfig: params.fullConfig, + runtime: params.runtime, + logger: params.logger, + meetingSessionId: params.meetingSessionId, + requesterSessionKey: params.requesterSessionKey, + args: { question, responseStyle }, + transcript, + }), + deliver: (text) => { + bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(text)); + }, + }); const stop = async () => { if (stopped) { return; } stopped = true; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - agentConsultDebounceTimer = undefined; - } + agentTalkback?.close(); try { bridge?.close(); } catch (error) { @@ -537,6 +481,18 @@ export async function startNodeRealtimeAudioBridge(params: { audioSink: { isOpen: () => !stopped, sendAudio: (audio) => { + const turnId = ensureTalkTurn(); + rememberTalkEvent( + talk.startOutputAudio({ + turnId, + payload: { bridgeId: params.bridgeId }, + }).event, + ); + emitTalkEvent({ + type: "output.audio.delta", + turnId, + payload: { byteLength: audio.byteLength }, + }); const suppression = extendGoogleMeetOutputEchoSuppression({ audio, audioFormat: params.config.chrome.audioFormat, @@ -569,6 +525,7 @@ export async function startNodeRealtimeAudioBridge(params: { 
clearAudio: () => { lastClearAt = new Date().toISOString(); clearCount += 1; + finishOutputAudio("clear"); suppressInputUntil = 0; lastOutputPlayableUntilMs = 0; void params.runtime.nodes @@ -590,6 +547,30 @@ export async function startNodeRealtimeAudioBridge(params: { }, }, onTranscript: (role, text, isFinal) => { + const turnId = ensureTalkTurn(); + const eventType = + role === "assistant" + ? isFinal + ? "output.text.done" + : "output.text.delta" + : isFinal + ? "transcript.done" + : "transcript.delta"; + const payload = role === "assistant" ? { text } : { role, text }; + emitTalkEvent({ + type: eventType, + turnId, + payload, + final: isFinal, + }); + if (role === "user" && isFinal) { + emitTalkEvent({ + type: "input.audio.committed", + turnId, + payload: { bridgeId: params.bridgeId }, + final: true, + }); + } if (isFinal) { recordGoogleMeetRealtimeTranscript(transcript, role, text); params.logger.info(`[google-meet] node realtime ${role}: ${text}`); @@ -600,12 +581,35 @@ export async function startNodeRealtimeAudioBridge(params: { ); return; } - enqueueAgentConsultForUserTranscript(text); + agentTalkback?.enqueue(text); } } }, onEvent: (event) => { recordGoogleMeetRealtimeEvent(realtimeEvents, event); + if (event.type === "input_audio_buffer.speech_started") { + ensureTalkTurn(); + } else if (event.type === "input_audio_buffer.speech_stopped") { + const turnId = talk.activeTurnId; + if (!turnId) { + return; + } + emitTalkEvent({ + type: "input.audio.committed", + turnId, + payload: { bridgeId: params.bridgeId, source: event.type }, + final: true, + }); + } else if (event.type === "response.done") { + finishOutputAudio("response.done"); + endTalkTurn("response.done"); + } else if (event.type === "error") { + emitTalkEvent({ + type: "session.error", + payload: { message: event.detail ?? 
"Realtime provider error" }, + final: true, + }); + } if ( event.type === "error" || event.type === "response.done" || @@ -619,52 +623,57 @@ export async function startNodeRealtimeAudioBridge(params: { } }, onToolCall: (event, session) => { - if (strategy !== "bidi") { - session.submitToolResult(event.callId || event.itemId, { - error: `Tool "${event.name}" is only available in bidi realtime strategy`, - }); - return; - } - if (event.name !== GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME) { - session.submitToolResult(event.callId || event.itemId, { - error: `Tool "${event.name}" not available`, - }); - return; - } - submitGoogleMeetConsultWorkingResponse(session, event.callId || event.itemId); - void consultOpenClawAgentForGoogleMeet({ + emitTalkEvent({ + type: "tool.call", + turnId: ensureTalkTurn(), + itemId: event.itemId, + callId: event.callId, + payload: { name: event.name, args: event.args }, + }); + const turnId = ensureTalkTurn(); + handleGoogleMeetRealtimeConsultToolCall({ + strategy, + session, + event, config: params.config, fullConfig: params.fullConfig, runtime: params.runtime, logger: params.logger, meetingSessionId: params.meetingSessionId, requesterSessionKey: params.requesterSessionKey, - args: event.args, transcript, - }) - .then((result) => { - session.submitToolResult(event.callId || event.itemId, result); - }) - .catch((error: Error) => { - session.submitToolResult(event.callId || event.itemId, { - error: formatErrorMessage(error), - }); - }); + onTalkEvent: (input) => emitTalkEvent({ ...input, turnId: input.turnId ?? 
turnId }), + }); }, onError: (error) => { params.logger.warn( `[google-meet] node realtime voice bridge failed: ${formatErrorMessage(error)}`, ); + emitTalkEvent({ + type: "session.error", + payload: { message: formatErrorMessage(error) }, + final: true, + }); void stop(); }, onClose: (reason) => { realtimeReady = false; + finishOutputAudio(reason); + emitTalkEvent({ + type: "session.closed", + payload: { reason }, + final: true, + }); if (reason === "error") { void stop(); } }, onReady: () => { realtimeReady = true; + emitTalkEvent({ + type: "session.ready", + payload: { bridgeId: params.bridgeId }, + }); }, }); @@ -695,6 +704,11 @@ export async function startNodeRealtimeAudioBridge(params: { } lastInputAt = new Date().toISOString(); lastInputBytes += audio.byteLength; + emitTalkEvent({ + type: "input.audio.delta", + turnId: ensureTalkTurn(), + payload: { byteLength: audio.byteLength }, + }); bridge?.sendAudio(audio); } if (result.closed === true) { @@ -740,6 +754,7 @@ export async function startNodeRealtimeAudioBridge(params: { suppressedInputBytes, ...getGoogleMeetRealtimeTranscriptHealth(transcript), ...getGoogleMeetRealtimeEventHealth(realtimeEvents), + recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents), consecutiveInputErrors, lastInputError, clearCount, diff --git a/extensions/google-meet/src/realtime.ts b/extensions/google-meet/src/realtime.ts index 929a2eeacd4..61110534817 100644 --- a/extensions/google-meet/src/realtime.ts +++ b/extensions/google-meet/src/realtime.ts @@ -11,23 +11,35 @@ import { type RealtimeTranscriptionSession, } from "openclaw/plugin-sdk/realtime-transcription"; import { + createRealtimeVoiceAgentTalkbackQueue, createRealtimeVoiceBridgeSession, + createTalkSessionController, convertPcmToMulaw8k, + extendRealtimeVoiceOutputEchoSuppression, + getRealtimeVoiceBridgeEventHealth, + getRealtimeVoiceTranscriptHealth, + isLikelyRealtimeVoiceAssistantEchoTranscript, mulawToPcm, REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ, 
REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, + recordRealtimeVoiceBridgeEvent, + recordRealtimeVoiceTranscript, resamplePcm, resolveConfiguredRealtimeVoiceProvider, + type RealtimeVoiceAgentTalkbackQueue, + type RealtimeVoiceBridgeEventLogEntry, type RealtimeVoiceBridgeSession, - type RealtimeVoiceBridgeEvent, type RealtimeVoiceProviderConfig, type RealtimeVoiceProviderPlugin, + type RealtimeVoiceTranscriptEntry, + type TalkEvent, + type TalkEventInput, + type TalkSessionController, } from "openclaw/plugin-sdk/realtime-voice"; import { consultOpenClawAgentForGoogleMeet, - GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME, + handleGoogleMeetRealtimeConsultToolCall, resolveGoogleMeetRealtimeTools, - submitGoogleMeetConsultWorkingResponse, } from "./agent-consult.js"; import type { GoogleMeetConfig } from "./config.js"; import type { GoogleMeetChromeHealth } from "./transports/types.js"; @@ -71,48 +83,16 @@ type ResolvedRealtimeTranscriptionProvider = { providerConfig: RealtimeTranscriptionProviderConfig; }; -export type GoogleMeetRealtimeTranscriptEntry = { - at: string; - role: "user" | "assistant"; - text: string; -}; - -export function recordGoogleMeetRealtimeTranscript( - transcript: GoogleMeetRealtimeTranscriptEntry[], - role: "user" | "assistant", - text: string, -): GoogleMeetRealtimeTranscriptEntry { - const entry = { at: new Date().toISOString(), role, text }; - transcript.push(entry); - if (transcript.length > 40) { - transcript.splice(0, transcript.length - 40); - } - return entry; -} +export type GoogleMeetRealtimeTranscriptEntry = RealtimeVoiceTranscriptEntry; +export const recordGoogleMeetRealtimeTranscript = recordRealtimeVoiceTranscript; export function getGoogleMeetRealtimeTranscriptHealth( transcript: GoogleMeetRealtimeTranscriptEntry[], -): Pick< - GoogleMeetChromeHealth, - | "realtimeTranscriptLines" - | "lastRealtimeTranscriptAt" - | "lastRealtimeTranscriptRole" - | "lastRealtimeTranscriptText" - | "recentRealtimeTranscript" -> { - const last = transcript.at(-1); 
- return { - realtimeTranscriptLines: transcript.length, - lastRealtimeTranscriptAt: last?.at, - lastRealtimeTranscriptRole: last?.role, - lastRealtimeTranscriptText: last?.text, - recentRealtimeTranscript: transcript.slice(-5), - }; +): Pick> { + return getRealtimeVoiceTranscriptHealth(transcript); } -export type GoogleMeetRealtimeEventEntry = RealtimeVoiceBridgeEvent & { - at: string; -}; +export type GoogleMeetRealtimeEventEntry = RealtimeVoiceBridgeEventLogEntry; export const GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS = 900; export const GOOGLE_MEET_OUTPUT_ECHO_SUPPRESSION_TAIL_MS = 3_000; @@ -120,33 +100,15 @@ export const GOOGLE_MEET_TRANSCRIPT_ECHO_LOOKBACK_MS = 45_000; export function recordGoogleMeetRealtimeEvent( events: GoogleMeetRealtimeEventEntry[], - event: RealtimeVoiceBridgeEvent, -) { - if (event.direction === "client" && event.type === "input_audio_buffer.append") { - return; - } - events.push({ at: new Date().toISOString(), ...event }); - if (events.length > 40) { - events.splice(0, events.length - 40); - } + event: Parameters[1], +): void { + recordRealtimeVoiceBridgeEvent(events, event); } export function getGoogleMeetRealtimeEventHealth( events: GoogleMeetRealtimeEventEntry[], -): Pick< - GoogleMeetChromeHealth, - | "lastRealtimeEventAt" - | "lastRealtimeEventType" - | "lastRealtimeEventDetail" - | "recentRealtimeEvents" -> { - const last = events.at(-1); - return { - lastRealtimeEventAt: last?.at, - lastRealtimeEventType: last ? 
/**
 * Google Meet wrapper around `isLikelyRealtimeVoiceAssistantEchoTranscript`.
 *
 * Forwards `params` unchanged, adds `lookbackMs` set to
 * `GOOGLE_MEET_TRANSCRIPT_ECHO_LOOKBACK_MS`, and returns the helper's boolean result.
 *
 * @param params.transcript - Transcript entries handed to the shared helper.
 * @param params.text - Candidate transcript text to check.
 * @param params.nowMs - Optional timestamp forwarded as-is to the shared helper.
 */
export function isGoogleMeetLikelyAssistantEchoTranscript(params: {
  transcript: GoogleMeetRealtimeTranscriptEntry[];
  text: string;
  nowMs?: number;
}): boolean {
  const echoQuery = {
    ...params,
    lookbackMs: GOOGLE_MEET_TRANSCRIPT_ECHO_LOOKBACK_MS,
  };
  return isLikelyRealtimeVoiceAssistantEchoTranscript(echoQuery);
}
/**
 * Appends `event` to `events` and drops the oldest entries so that at most `maxEntries`
 * remain. Mutates `events` in place.
 *
 * @param events - Buffer of recent talk events, oldest first.
 * @param event - Event to append.
 * @param maxEntries - Maximum number of entries kept (default 40). Fractional values are
 *   rounded down, negative values are treated as 0, and `NaN` falls back to the default so
 *   an invalid cap cannot silently disable trimming. `Infinity` keeps every entry.
 */
export function pushGoogleMeetTalkEvent(
  events: TalkEvent[],
  event: TalkEvent,
  maxEntries = 40,
): void {
  const limit = Number.isNaN(maxEntries) ? 40 : Math.max(0, Math.floor(maxEntries));
  events.push(event);
  const overflow = events.length - limit;
  if (overflow > 0) {
    events.splice(0, overflow);
  }
}

/**
 * Builds the health summary of recent talk events: the last 20 entries of `events`, each
 * reduced to its identifying fields. Other fields (for example `payload`) are not copied.
 * Does not mutate `events`.
 *
 * NOTE(review): the original return annotation was unreadable in the reviewed text
 * (`NonNullable` with no type argument); it is expressed here through the fields the
 * function actually copies. Confirm it still matches the health type's `recentTalkEvents`.
 *
 * @param events - Recorded talk events, oldest first.
 * @returns A new array with at most 20 summary objects, oldest first.
 */
export function summarizeGoogleMeetTalkEvents(
  events: TalkEvent[],
): Array<
  Pick<TalkEvent, "id" | "type" | "sessionId" | "turnId" | "seq" | "timestamp" | "final">
> {
  return events.slice(-20).map((event) => ({
    id: event.id,
    type: event.type,
    sessionId: event.sessionId,
    turnId: event.turnId,
    seq: event.seq,
    timestamp: event.timestamp,
    final: event.final,
  }));
}
export async function startCommandAgentAudioBridge(params: { fullConfig: params.fullConfig, providers: params.providers, }); + const talk = createTalkSessionController({ + sessionId: `google-meet:${params.meetingSessionId}:agent`, + mode: "stt-tts", + transport: "gateway-relay", + brain: "agent-consult", + provider: resolved.provider.id, + turnIdPrefix: `google-meet:${params.meetingSessionId}:turn`, + }); + const recentTalkEvents: TalkEvent[] = []; + const emitTalkEvent = (input: TalkEventInput) => + pushGoogleMeetTalkEvent(recentTalkEvents, talk.emit(input)); + const ensureTalkTurn = () => { + const turn = talk.ensureTurn({ + payload: { meetingSessionId: params.meetingSessionId }, + }); + if (turn.event) { + pushGoogleMeetTalkEvent(recentTalkEvents, turn.event); + } + return turn.turnId; + }; + const endTalkTurn = () => { + const ended = talk.endTurn({ + payload: { meetingSessionId: params.meetingSessionId }, + }); + if (ended.ok) { + pushGoogleMeetTalkEvent(recentTalkEvents, ended.event); + } + }; params.logger.info( formatGoogleMeetAgentAudioModelLog({ provider: resolved.provider, @@ -593,10 +554,7 @@ export async function startCommandAgentAudioBridge(params: { return; } stopped = true; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - agentConsultDebounceTimer = undefined; - } + agentTalkback?.close(); try { sttSession?.close(); } catch (error) { @@ -604,6 +562,11 @@ export async function startCommandAgentAudioBridge(params: { `[google-meet] agent transcription bridge close ignored: ${formatErrorMessage(error)}`, ); } + emitTalkEvent({ + type: "session.closed", + final: true, + payload: { meetingSessionId: params.meetingSessionId }, + }); terminateProcess(inputProcess); terminateProcess(outputProcess); }; @@ -646,6 +609,11 @@ export async function startCommandAgentAudioBridge(params: { lastOutputPlayableUntilMs = suppression.lastOutputPlayableUntilMs; lastOutputAt = new Date().toISOString(); lastOutputBytes += audio.byteLength; + 
emitTalkEvent({ + type: "output.audio.delta", + turnId: ensureTalkTurn(), + payload: { meetingSessionId: params.meetingSessionId, bytes: audio.byteLength }, + }); try { outputProcess.stdin?.write(audio); } catch (error) { @@ -665,6 +633,13 @@ export async function startCommandAgentAudioBridge(params: { } recordGoogleMeetRealtimeTranscript(transcript, "assistant", normalized); params.logger.info(`[google-meet] agent assistant: ${normalized}`); + const turnId = ensureTalkTurn(); + emitTalkEvent({ + type: "output.text.done", + turnId, + final: true, + payload: { meetingSessionId: params.meetingSessionId, text: normalized }, + }); const result = await params.runtime.tts.textToSpeechTelephony({ text: normalized, cfg: params.fullConfig, @@ -673,6 +648,11 @@ export async function startCommandAgentAudioBridge(params: { throw new Error(result.error ?? "TTS conversion failed"); } params.logger.info(formatGoogleMeetAgentTtsResultLog("agent", result)); + emitTalkEvent({ + type: "output.audio.started", + turnId, + payload: { meetingSessionId: params.meetingSessionId }, + }); writeOutputAudio( convertGoogleMeetTtsAudioForBridge( result.audioBuffer, @@ -681,79 +661,39 @@ export async function startCommandAgentAudioBridge(params: { result.outputFormat, ), ); + emitTalkEvent({ + type: "output.audio.done", + turnId, + final: true, + payload: { meetingSessionId: params.meetingSessionId }, + }); + endTalkTurn(); }) .catch((error) => { params.logger.warn(`[google-meet] agent TTS failed: ${formatErrorMessage(error)}`); }); }; - const runAgentConsultForUserTranscript = async (question: string): Promise => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - if (agentConsultActive) { - pendingAgentQuestion = trimmed; - return; - } - agentConsultActive = true; - let nextQuestion: string | undefined = trimmed; - try { - while (nextQuestion) { - if (stopped) { - return; - } - const currentQuestion = nextQuestion; - pendingAgentQuestion = undefined; - 
params.logger.info(`[google-meet] agent consult: ${currentQuestion}`); - const result = await consultOpenClawAgentForGoogleMeet({ - config: params.config, - fullConfig: params.fullConfig, - runtime: params.runtime, - logger: params.logger, - meetingSessionId: params.meetingSessionId, - requesterSessionKey: params.requesterSessionKey, - args: { - question: currentQuestion, - responseStyle: "Brief, natural spoken answer for a live meeting.", - }, - transcript, - }); - enqueueSpeakText(result.text); - nextQuestion = pendingAgentQuestion; - } - } catch (error) { - params.logger.warn(`[google-meet] agent consult failed: ${formatErrorMessage(error)}`); - enqueueSpeakText("I hit an error while checking that. Please try again."); - } finally { - agentConsultActive = false; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } - } - }; - - const enqueueAgentConsultForUserTranscript = (question: string): void => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - pendingAgentQuestion = pendingAgentQuestion ? `${pendingAgentQuestion}\n${trimmed}` : trimmed; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - } - agentConsultDebounceTimer = setTimeout(() => { - agentConsultDebounceTimer = undefined; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } - }, GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS); - agentConsultDebounceTimer.unref?.(); - }; + agentTalkback = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS, + isStopped: () => stopped, + logger: params.logger, + logPrefix: "[google-meet] agent", + responseStyle: "Brief, natural spoken answer for a live meeting.", + fallbackText: "I hit an error while checking that. 
Please try again.", + consult: ({ question, responseStyle }) => + consultOpenClawAgentForGoogleMeet({ + config: params.config, + fullConfig: params.fullConfig, + runtime: params.runtime, + logger: params.logger, + meetingSessionId: params.meetingSessionId, + requesterSessionKey: params.requesterSessionKey, + args: { question, responseStyle }, + transcript, + }), + deliver: enqueueSpeakText, + }); sttSession = resolved.provider.createSession({ providerConfig: resolved.providerConfig, @@ -762,24 +702,50 @@ export async function startCommandAgentAudioBridge(params: { if (!trimmed || stopped) { return; } + const turnId = ensureTalkTurn(); + emitTalkEvent({ + type: "input.audio.committed", + turnId, + final: true, + payload: { meetingSessionId: params.meetingSessionId }, + }); + emitTalkEvent({ + type: "transcript.done", + turnId, + final: true, + payload: { meetingSessionId: params.meetingSessionId, text: trimmed, role: "user" }, + }); recordGoogleMeetRealtimeTranscript(transcript, "user", trimmed); params.logger.info(`[google-meet] agent user: ${trimmed}`); if (isGoogleMeetLikelyAssistantEchoTranscript({ transcript, text: trimmed })) { params.logger.info(`[google-meet] agent ignored assistant echo transcript: ${trimmed}`); return; } - enqueueAgentConsultForUserTranscript(trimmed); + agentTalkback?.enqueue(trimmed); }, onError: (error) => { params.logger.warn( `[google-meet] agent transcription bridge failed: ${formatErrorMessage(error)}`, ); + emitTalkEvent({ + type: "session.error", + final: true, + payload: { meetingSessionId: params.meetingSessionId, error: formatErrorMessage(error) }, + }); void stop(); }, }); + emitTalkEvent({ + type: "session.started", + payload: { meetingSessionId: params.meetingSessionId, provider: resolved.provider.id }, + }); await sttSession.connect(); realtimeReady = true; + emitTalkEvent({ + type: "session.ready", + payload: { meetingSessionId: params.meetingSessionId }, + }); inputProcess.stdout?.on("data", (chunk) => { if (stopped) { @@ 
-793,6 +759,11 @@ export async function startCommandAgentAudioBridge(params: { } lastInputAt = new Date().toISOString(); lastInputBytes += audio.byteLength; + emitTalkEvent({ + type: "input.audio.delta", + turnId: ensureTalkTurn(), + payload: { meetingSessionId: params.meetingSessionId, bytes: audio.byteLength }, + }); sttSession?.sendAudio(convertGoogleMeetBridgeAudioForStt(audio, params.config)); }); @@ -813,6 +784,7 @@ export async function startCommandAgentAudioBridge(params: { lastOutputBytes, suppressedInputBytes, ...getGoogleMeetRealtimeTranscriptHealth(transcript), + recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents), bridgeClosed: stopped, }), stop, @@ -859,7 +831,7 @@ export async function startCommandRealtimeAudioBridge(params: { let lastOutputAtMs = 0; let lastOutputPlayableUntilMs = 0; let bargeInInputProcess: BridgeProcess | undefined; - let agentConsultDebounceTimer: ReturnType | undefined; + let agentTalkback: RealtimeVoiceAgentTalkbackQueue | undefined; const suppressInputForOutput = (audio: Buffer) => { const suppression = extendGoogleMeetOutputEchoSuppression({ @@ -906,10 +878,7 @@ export async function startCommandRealtimeAudioBridge(params: { return; } stopped = true; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - agentConsultDebounceTimer = undefined; - } + agentTalkback?.close(); try { bridge?.close(); } catch (error) { @@ -1065,84 +1034,72 @@ export async function startCommandRealtimeAudioBridge(params: { ); const transcript: GoogleMeetRealtimeTranscriptEntry[] = []; const realtimeEvents: GoogleMeetRealtimeEventEntry[] = []; - let agentConsultActive = false; - let pendingAgentQuestion: string | undefined; - const enqueueAgentConsultForUserTranscript = (question: string): void => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - pendingAgentQuestion = pendingAgentQuestion ? 
`${pendingAgentQuestion}\n${trimmed}` : trimmed; - if (agentConsultDebounceTimer) { - clearTimeout(agentConsultDebounceTimer); - } - agentConsultDebounceTimer = setTimeout(() => { - agentConsultDebounceTimer = undefined; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } - }, GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS); - agentConsultDebounceTimer.unref?.(); - }; - const runAgentConsultForUserTranscript = async (question: string): Promise => { - const trimmed = question.trim(); - if (!trimmed || stopped) { - return; - } - if (agentConsultActive) { - pendingAgentQuestion = trimmed; - return; - } - agentConsultActive = true; - let nextQuestion: string | undefined = trimmed; - try { - while (nextQuestion) { - if (stopped) { - return; - } - const currentQuestion = nextQuestion; - pendingAgentQuestion = undefined; - params.logger.info(`[google-meet] realtime agent consult: ${currentQuestion}`); - const result = await consultOpenClawAgentForGoogleMeet({ - config: params.config, - fullConfig: params.fullConfig, - runtime: params.runtime, - logger: params.logger, - meetingSessionId: params.meetingSessionId, - requesterSessionKey: params.requesterSessionKey, - args: { - question: currentQuestion, - responseStyle: "Brief, natural spoken answer for a live meeting.", - }, - transcript, - }); - if (!stopped && result.text.trim()) { - bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(result.text.trim())); - } - nextQuestion = pendingAgentQuestion; - } - } catch (error) { - params.logger.warn( - `[google-meet] realtime agent consult failed: ${formatErrorMessage(error)}`, - ); - if (!stopped) { - bridge?.sendUserMessage( - buildGoogleMeetSpeakExactUserMessage( - "I hit an error while checking that. 
Please try again.", - ), - ); - } - } finally { - agentConsultActive = false; - const queuedQuestion = pendingAgentQuestion; - pendingAgentQuestion = undefined; - if (queuedQuestion && !stopped) { - void runAgentConsultForUserTranscript(queuedQuestion); - } + const talk: TalkSessionController = createTalkSessionController({ + sessionId: `google-meet:${params.meetingSessionId}:command-realtime`, + mode: "realtime", + transport: "gateway-relay", + brain: strategy === "bidi" ? "direct-tools" : "agent-consult", + provider: resolved.provider.id, + }); + const recentTalkEvents: TalkEvent[] = []; + const rememberTalkEvent = (event: TalkEvent | undefined): void => { + if (event) { + pushGoogleMeetTalkEvent(recentTalkEvents, event); } }; + const emitTalkEvent = (input: TalkEventInput): void => { + rememberTalkEvent(talk.emit(input)); + }; + const ensureTalkTurn = (): string => { + const turn = talk.ensureTurn({ + payload: { meetingSessionId: params.meetingSessionId }, + }); + if (turn.event) { + rememberTalkEvent(turn.event); + } + return turn.turnId; + }; + const finishOutputAudio = (reason: string): void => { + rememberTalkEvent( + talk.finishOutputAudio({ + payload: { reason }, + }), + ); + }; + const endTalkTurn = (reason = "completed"): void => { + const ended = talk.endTurn({ + payload: { reason }, + }); + if (ended.ok) { + rememberTalkEvent(ended.event); + } + }; + emitTalkEvent({ + type: "session.started", + payload: { meetingSessionId: params.meetingSessionId }, + }); + agentTalkback = createRealtimeVoiceAgentTalkbackQueue({ + debounceMs: GOOGLE_MEET_AGENT_TRANSCRIPT_DEBOUNCE_MS, + isStopped: () => stopped, + logger: params.logger, + logPrefix: "[google-meet] realtime agent", + responseStyle: "Brief, natural spoken answer for a live meeting.", + fallbackText: "I hit an error while checking that. 
Please try again.", + consult: ({ question, responseStyle }) => + consultOpenClawAgentForGoogleMeet({ + config: params.config, + fullConfig: params.fullConfig, + runtime: params.runtime, + logger: params.logger, + meetingSessionId: params.meetingSessionId, + requesterSessionKey: params.requesterSessionKey, + args: { question, responseStyle }, + transcript, + }), + deliver: (text) => { + bridge?.sendUserMessage(buildGoogleMeetSpeakExactUserMessage(text)); + }, + }); bridge = createRealtimeVoiceBridgeSession({ provider: resolved.provider, providerConfig: resolved.providerConfig, @@ -1157,15 +1114,54 @@ export async function startCommandRealtimeAudioBridge(params: { audioSink: { isOpen: () => !stopped, sendAudio: (audio) => { + const turnId = ensureTalkTurn(); + rememberTalkEvent( + talk.startOutputAudio({ + turnId, + payload: { meetingSessionId: params.meetingSessionId }, + }).event, + ); + emitTalkEvent({ + type: "output.audio.delta", + turnId, + payload: { byteLength: audio.byteLength }, + }); lastOutputAtMs = Date.now(); lastOutputAt = new Date().toISOString(); lastOutputBytes += audio.byteLength; suppressInputForOutput(audio); writeOutputAudio(audio); }, - clearAudio: clearOutputPlayback, + clearAudio: () => { + clearOutputPlayback(); + finishOutputAudio("clear"); + }, }, onTranscript: (role, text, isFinal) => { + const turnId = ensureTalkTurn(); + const eventType = + role === "assistant" + ? isFinal + ? "output.text.done" + : "output.text.delta" + : isFinal + ? "transcript.done" + : "transcript.delta"; + const payload = role === "assistant" ? 
{ text } : { role, text }; + emitTalkEvent({ + type: eventType, + turnId, + payload, + final: isFinal, + }); + if (role === "user" && isFinal) { + emitTalkEvent({ + type: "input.audio.committed", + turnId, + payload: { meetingSessionId: params.meetingSessionId }, + final: true, + }); + } if (isFinal) { recordGoogleMeetRealtimeTranscript(transcript, role, text); params.logger.info(`[google-meet] realtime ${role}: ${text}`); @@ -1174,12 +1170,35 @@ export async function startCommandRealtimeAudioBridge(params: { params.logger.info(`[google-meet] realtime ignored assistant echo transcript: ${text}`); return; } - enqueueAgentConsultForUserTranscript(text); + agentTalkback?.enqueue(text); } } }, onEvent: (event) => { recordGoogleMeetRealtimeEvent(realtimeEvents, event); + if (event.type === "input_audio_buffer.speech_started") { + ensureTalkTurn(); + } else if (event.type === "input_audio_buffer.speech_stopped") { + const turnId = talk.activeTurnId; + if (!turnId) { + return; + } + emitTalkEvent({ + type: "input.audio.committed", + turnId, + payload: { meetingSessionId: params.meetingSessionId, source: event.type }, + final: true, + }); + } else if (event.type === "response.done") { + finishOutputAudio("response.done"); + endTalkTurn("response.done"); + } else if (event.type === "error") { + emitTalkEvent({ + type: "session.error", + payload: { message: event.detail ?? 
"Realtime provider error" }, + final: true, + }); + } if ( event.type === "error" || event.type === "response.done" || @@ -1193,47 +1212,54 @@ export async function startCommandRealtimeAudioBridge(params: { } }, onToolCall: (event, session) => { - if (strategy !== "bidi") { - session.submitToolResult(event.callId || event.itemId, { - error: `Tool "${event.name}" is only available in bidi realtime strategy`, - }); - return; - } - if (event.name !== GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME) { - session.submitToolResult(event.callId || event.itemId, { - error: `Tool "${event.name}" not available`, - }); - return; - } - submitGoogleMeetConsultWorkingResponse(session, event.callId || event.itemId); - void consultOpenClawAgentForGoogleMeet({ + emitTalkEvent({ + type: "tool.call", + turnId: ensureTalkTurn(), + itemId: event.itemId, + callId: event.callId, + payload: { name: event.name, args: event.args }, + }); + const turnId = ensureTalkTurn(); + handleGoogleMeetRealtimeConsultToolCall({ + strategy, + session, + event, config: params.config, fullConfig: params.fullConfig, runtime: params.runtime, logger: params.logger, meetingSessionId: params.meetingSessionId, requesterSessionKey: params.requesterSessionKey, - args: event.args, transcript, - }) - .then((result) => { - session.submitToolResult(event.callId || event.itemId, result); - }) - .catch((error: Error) => { - session.submitToolResult(event.callId || event.itemId, { - error: formatErrorMessage(error), - }); - }); + onTalkEvent: (input) => emitTalkEvent({ ...input, turnId: input.turnId ?? 
turnId }), + }); + }, + onError: (error) => { + emitTalkEvent({ + type: "session.error", + payload: { message: formatErrorMessage(error) }, + final: true, + }); + fail("realtime voice bridge")(error); }, - onError: fail("realtime voice bridge"), onClose: (reason) => { realtimeReady = false; + finishOutputAudio(reason); + emitTalkEvent({ + type: "session.closed", + payload: { reason }, + final: true, + }); if (reason === "error") { void stop(); } }, onReady: () => { realtimeReady = true; + emitTalkEvent({ + type: "session.ready", + payload: { meetingSessionId: params.meetingSessionId }, + }); }, }); startHumanBargeInMonitor(); @@ -1248,6 +1274,11 @@ export async function startCommandRealtimeAudioBridge(params: { } lastInputAt = new Date().toISOString(); lastInputBytes += audio.byteLength; + emitTalkEvent({ + type: "input.audio.delta", + turnId: ensureTalkTurn(), + payload: { byteLength: audio.byteLength }, + }); bridge?.sendAudio(Buffer.from(audio)); } }); @@ -1273,6 +1304,7 @@ export async function startCommandRealtimeAudioBridge(params: { suppressedInputBytes, ...getGoogleMeetRealtimeTranscriptHealth(transcript), ...getGoogleMeetRealtimeEventHealth(realtimeEvents), + recentTalkEvents: summarizeGoogleMeetTalkEvents(recentTalkEvents), lastClearAt, clearCount, bridgeClosed: stopped, diff --git a/extensions/google-meet/src/transports/types.ts b/extensions/google-meet/src/transports/types.ts index cac0298829c..37e7679acb0 100644 --- a/extensions/google-meet/src/transports/types.ts +++ b/extensions/google-meet/src/transports/types.ts @@ -62,6 +62,15 @@ export type GoogleMeetChromeHealth = { type: string; detail?: string; }>; + recentTalkEvents?: Array<{ + id: string; + type: string; + sessionId: string; + turnId?: string; + seq: number; + timestamp: string; + final?: boolean; + }>; manualActionRequired?: boolean; manualActionReason?: GoogleMeetManualActionReason; manualActionMessage?: string; diff --git a/extensions/voice-call/src/media-stream.test.ts 
b/extensions/voice-call/src/media-stream.test.ts index d2b87c149e8..89a33023390 100644 --- a/extensions/voice-call/src/media-stream.test.ts +++ b/extensions/voice-call/src/media-stream.test.ts @@ -3,7 +3,9 @@ import net from "node:net"; import type { RealtimeTranscriptionProviderPlugin, RealtimeTranscriptionSession, + RealtimeTranscriptionSessionCreateRequest, } from "openclaw/plugin-sdk/realtime-transcription"; +import { createTalkSessionController, type TalkEvent } from "openclaw/plugin-sdk/realtime-voice"; import { describe, expect, it, vi } from "vitest"; import { WebSocket } from "ws"; import { MediaStreamHandler, sanitizeLogText } from "./media-stream.js"; @@ -160,6 +162,124 @@ describe("MediaStreamHandler TTS queue", () => { }); describe("MediaStreamHandler security hardening", () => { + it("emits common Talk events for telephony STT/TTS sessions", async () => { + let callbacks: RealtimeTranscriptionSessionCreateRequest | undefined; + const sentAudio: Buffer[] = []; + const session: RealtimeTranscriptionSession = { + connect: async () => {}, + sendAudio: (audio) => { + sentAudio.push(Buffer.from(audio)); + }, + close: () => {}, + isConnected: () => true, + }; + const talkEvents: TalkEvent[] = []; + const handler = new MediaStreamHandler({ + transcriptionProvider: { + createSession: (request) => { + callbacks = request; + return session; + }, + id: "openai", + label: "OpenAI", + isConfigured: () => true, + }, + providerConfig: {}, + shouldAcceptStream: () => true, + onTalkEvent: (_callId, _streamSid, event) => { + talkEvents.push(event); + }, + }); + const server = await startWsServer(handler); + + try { + const ws = await connectWs(server.url); + ws.send( + JSON.stringify({ + event: "start", + streamSid: "MZ-talk", + start: { callSid: "CA-talk" }, + }), + ); + await flush(); + + ws.send( + JSON.stringify({ + event: "media", + streamSid: "MZ-talk", + media: { payload: Buffer.from("hello").toString("base64") }, + }), + ); + await flush(); + 
expect(Buffer.concat(sentAudio).toString()).toBe("hello"); + + callbacks?.onSpeechStart?.(); + callbacks?.onPartial?.("hel"); + callbacks?.onTranscript?.("hello there"); + + await handler.queueTts("MZ-talk", async () => { + handler.sendAudio("MZ-talk", Buffer.alloc(160, 0xff)); + }); + + const activePlayback = handler.queueTts("MZ-talk", async (signal) => { + await waitForAbort(signal); + }); + await flush(); + handler.clearTtsQueue("MZ-talk", "barge-in"); + await activePlayback; + + ws.close(); + await waitForClose(ws); + await vi.waitFor(() => { + expect(talkEvents.some((event) => event.type === "session.closed")).toBe(true); + }); + + expect(talkEvents.map((event) => event.type)).toEqual([ + "session.started", + "session.ready", + "turn.started", + "input.audio.delta", + "transcript.delta", + "input.audio.committed", + "transcript.done", + "output.audio.started", + "output.audio.delta", + "output.audio.done", + "turn.ended", + "turn.started", + "output.audio.started", + "turn.cancelled", + "session.closed", + ]); + expect(talkEvents[0]).toEqual( + expect.objectContaining({ + sessionId: "voice-call:CA-talk:MZ-talk", + mode: "stt-tts", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + seq: 1, + }), + ); + expect(talkEvents.find((event) => event.type === "transcript.done")).toEqual( + expect.objectContaining({ + final: true, + turnId: "MZ-talk:turn-1", + payload: expect.objectContaining({ text: "hello there", role: "user" }), + }), + ); + expect(talkEvents.find((event) => event.type === "turn.cancelled")).toEqual( + expect.objectContaining({ + final: true, + turnId: "MZ-talk:turn-2", + payload: expect.objectContaining({ reason: "barge-in" }), + }), + ); + } finally { + await server.close(); + } + }); + it("fails sends and closes stream when buffered bytes already exceed the cap", () => { const handler = new MediaStreamHandler({ transcriptionProvider: createStubSttProvider(), @@ -180,6 +300,7 @@ describe("MediaStreamHandler security 
hardening", () => { streamSid: string; ws: WebSocket; sttSession: RealtimeTranscriptionSession; + talk: ReturnType; } >; } @@ -188,6 +309,13 @@ describe("MediaStreamHandler security hardening", () => { streamSid: "MZ-backpressure", ws, sttSession: createStubSession(), + talk: createTalkSessionController({ + sessionId: "voice-call:CA-backpressure:MZ-backpressure", + mode: "stt-tts", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + }), }); const result = handler.sendAudio("MZ-backpressure", Buffer.alloc(160, 0xff)); diff --git a/extensions/voice-call/src/media-stream.ts b/extensions/voice-call/src/media-stream.ts index c884e887a69..193b1abde22 100644 --- a/extensions/voice-call/src/media-stream.ts +++ b/extensions/voice-call/src/media-stream.ts @@ -14,6 +14,12 @@ import type { RealtimeTranscriptionProviderPlugin, RealtimeTranscriptionSession, } from "openclaw/plugin-sdk/realtime-transcription"; +import { + createTalkSessionController, + type TalkEvent, + type TalkEventInput, + type TalkSessionController, +} from "openclaw/plugin-sdk/realtime-voice"; import { type RawData, WebSocket, WebSocketServer } from "ws"; /** @@ -48,6 +54,8 @@ export interface MediaStreamConfig { onSpeechStart?: (callId: string) => void; /** Callback when stream disconnects */ onDisconnect?: (callId: string, streamSid: string) => void; + /** Callback for common Talk events emitted by the telephony STT/TTS adapter. 
*/ + onTalkEvent?: (callId: string, streamSid: string, event: TalkEvent) => void; } /** @@ -58,6 +66,7 @@ interface StreamSession { streamSid: string; ws: WebSocket; sttSession: RealtimeTranscriptionSession; + talk: TalkSessionController; } type TtsQueueEntry = { @@ -225,6 +234,16 @@ export class MediaStreamHandler { if (session && message.media?.payload) { // Forward audio to STT const audioBuffer = Buffer.from(message.media.payload, "base64"); + const turnId = this.ensureActiveTurn(session); + this.emitTalkEvent(session, { + type: "input.audio.delta", + turnId, + payload: { + callId: session.callId, + streamSid: session.streamSid, + bytes: audioBuffer.byteLength, + }, + }); session.sttSession.sendAudio(audioBuffer); } break; @@ -296,16 +315,52 @@ export class MediaStreamHandler { const sttSession = this.config.transcriptionProvider.createSession({ providerConfig: this.config.providerConfig, onPartial: (partial) => { + const session = this.sessions.get(streamSid); + if (session) { + this.emitTalkEvent(session, { + type: "transcript.delta", + turnId: this.ensureActiveTurn(session), + payload: { callId: callSid, streamSid, text: partial, role: "user" }, + }); + } this.config.onPartialTranscript?.(callSid, partial); }, onTranscript: (transcript) => { + const session = this.sessions.get(streamSid); + if (session) { + const turnId = this.ensureActiveTurn(session); + this.emitTalkEvent(session, { + type: "input.audio.committed", + turnId, + final: true, + payload: { callId: callSid, streamSid }, + }); + this.emitTalkEvent(session, { + type: "transcript.done", + turnId, + final: true, + payload: { callId: callSid, streamSid, text: transcript, role: "user" }, + }); + } this.config.onTranscript?.(callSid, transcript); }, onSpeechStart: () => { + const session = this.sessions.get(streamSid); + if (session) { + this.ensureActiveTurn(session); + } this.config.onSpeechStart?.(callSid); }, onError: (error) => { console.warn("[MediaStream] Transcription session error:", 
error.message); + const session = this.sessions.get(streamSid); + if (session) { + this.emitTalkEvent(session, { + type: "session.error", + final: true, + payload: { callId: callSid, streamSid, error: error.message }, + }); + } }, }); @@ -314,10 +369,15 @@ export class MediaStreamHandler { streamSid, ws, sttSession, + talk: this.createTalkEvents(callSid, streamSid), }; this.sessions.set(streamSid, session); this.config.onConnect?.(callSid, streamSid); + this.emitTalkEvent(session, { + type: "session.started", + payload: { callId: callSid, streamSid, provider: this.config.transcriptionProvider.id }, + }); void this.connectTranscriptionAndNotify(session); return session; @@ -331,6 +391,15 @@ export class MediaStreamHandler { "[MediaStream] STT connection failed; closing media stream:", error instanceof Error ? error.message : String(error), ); + this.emitTalkEvent(session, { + type: "session.error", + final: true, + payload: { + callId: session.callId, + streamSid: session.streamSid, + error: error instanceof Error ? error.message : String(error), + }, + }); if ( this.sessions.get(session.streamSid) === session && session.ws.readyState === WebSocket.OPEN @@ -350,6 +419,10 @@ export class MediaStreamHandler { return; } + this.emitTalkEvent(session, { + type: "session.ready", + payload: { callId: session.callId, streamSid: session.streamSid }, + }); this.config.onTranscriptionReady?.(session.callId, session.streamSid); } @@ -362,6 +435,11 @@ export class MediaStreamHandler { this.clearTtsState(session.streamSid); session.sttSession.close(); this.sessions.delete(session.streamSid); + this.emitTalkEvent(session, { + type: "session.closed", + final: true, + payload: { callId: session.callId, streamSid: session.streamSid }, + }); this.config.onDisconnect?.(session.callId, session.streamSid); } @@ -530,6 +608,14 @@ export class MediaStreamHandler { * Audio should be mu-law encoded at 8kHz mono. 
*/ sendAudio(streamSid: string, muLawAudio: Buffer): StreamSendResult { + const session = this.getOpenSession(streamSid); + if (session) { + this.emitTalkEvent(session, { + type: "output.audio.delta", + turnId: this.ensureActiveTurn(session), + payload: { callId: session.callId, streamSid, bytes: muLawAudio.byteLength }, + }); + } return this.sendToStream(streamSid, { event: "media", streamSid, @@ -589,6 +675,15 @@ export class MediaStreamHandler { const queue = this.getTtsQueue(streamSid); this.resolveQueuedTtsEntries(queue); this.ttsActiveControllers.get(streamSid)?.abort(); + const session = this.sessions.get(streamSid); + if (session?.talk.activeTurnId) { + const cancelled = session.talk.cancelTurn({ + payload: { callId: session.callId, streamSid, reason: _reason }, + }); + if (cancelled.ok) { + this.config.onTalkEvent?.(session.callId, session.streamSid, cancelled.event); + } + } this.clearAudio(streamSid); } @@ -638,9 +733,40 @@ export class MediaStreamHandler { const entry = queue.shift()!; this.ttsActiveControllers.set(streamSid, entry.controller); + const session = this.sessions.get(streamSid); + let playbackTurnId: string | undefined; try { + if (session) { + playbackTurnId = this.ensureActiveTurn(session); + this.emitTalkEvent(session, { + type: "output.audio.started", + turnId: playbackTurnId, + payload: { callId: session.callId, streamSid }, + }); + } await entry.playFn(entry.controller.signal); + if (entry.controller.signal.aborted) { + entry.resolve(); + continue; + } + if (session) { + const turnId = playbackTurnId ?? 
this.ensureActiveTurn(session); + this.emitTalkEvent(session, { + type: "output.audio.done", + turnId, + final: true, + payload: { callId: session.callId, streamSid }, + }); + if (session.talk.activeTurnId) { + const ended = session.talk.endTurn({ + payload: { callId: session.callId, streamSid }, + }); + if (ended.ok) { + this.config.onTalkEvent?.(session.callId, session.streamSid, ended.event); + } + } + } entry.resolve(); } catch (error) { if (entry.controller.signal.aborted) { @@ -657,6 +783,32 @@ export class MediaStreamHandler { } } + private createTalkEvents(callId: string, streamSid: string): TalkSessionController { + return createTalkSessionController({ + sessionId: `voice-call:${callId}:${streamSid}`, + mode: "stt-tts", + transport: "gateway-relay", + brain: "agent-consult", + provider: this.config.transcriptionProvider.id, + turnIdPrefix: `${streamSid}:turn`, + }); + } + + private emitTalkEvent(session: StreamSession, input: TalkEventInput): void { + const event = session.talk.emit(input); + this.config.onTalkEvent?.(session.callId, session.streamSid, event); + } + + private ensureActiveTurn(session: StreamSession): string { + const turn = session.talk.ensureTurn({ + payload: { callId: session.callId, streamSid: session.streamSid }, + }); + if (turn.event) { + this.config.onTalkEvent?.(session.callId, session.streamSid, turn.event); + } + return turn.turnId; + } + private clearTtsState(streamSid: string): void { const queue = this.ttsQueues.get(streamSid); if (queue) { diff --git a/extensions/voice-call/src/realtime-fast-context.test.ts b/extensions/voice-call/src/realtime-fast-context.test.ts index 597080b3d63..56724b6a9bd 100644 --- a/extensions/voice-call/src/realtime-fast-context.test.ts +++ b/extensions/voice-call/src/realtime-fast-context.test.ts @@ -6,7 +6,7 @@ const mocks = vi.hoisted(() => ({ getActiveMemorySearchManager: vi.fn(), })); -vi.mock("openclaw/plugin-sdk/memory-host-search", () => ({ +vi.mock("../../../src/plugins/memory-runtime.js", 
() => ({ getActiveMemorySearchManager: mocks.getActiveMemorySearchManager, })); diff --git a/extensions/voice-call/src/realtime-fast-context.ts b/extensions/voice-call/src/realtime-fast-context.ts index d5c0fb1cdde..d490efe9763 100644 --- a/extensions/voice-call/src/realtime-fast-context.ts +++ b/extensions/voice-call/src/realtime-fast-context.ts @@ -1,151 +1,27 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; -import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; -import { getActiveMemorySearchManager } from "openclaw/plugin-sdk/memory-host-search"; import { - parseRealtimeVoiceAgentConsultArgs, - type RealtimeVoiceAgentConsultResult, + resolveRealtimeVoiceFastContextConsult, + type RealtimeVoiceFastContextConsultResult, + type RealtimeVoiceFastContextConfig, } from "openclaw/plugin-sdk/realtime-voice"; -import { withTimeout } from "openclaw/plugin-sdk/security-runtime"; -import type { VoiceCallRealtimeFastContextConfig } from "./config.js"; type Logger = { debug?: (message: string) => void; - warn: (message: string) => void; }; -type MemorySearchHit = { - path: string; - startLine: number; - endLine: number; - snippet: string; - source: "memory" | "sessions"; - score: number; -}; - -type FastContextLookupResult = - | { status: "unavailable"; error?: string } - | { status: "hits"; hits: MemorySearchHit[] }; - -type RealtimeFastContextConsultResult = - | { handled: false } - | { handled: true; result: RealtimeVoiceAgentConsultResult }; - -const MAX_SNIPPET_CHARS = 700; - -class RealtimeFastContextTimeoutError extends Error { - constructor(timeoutMs: number) { - super(`fast context lookup timed out after ${timeoutMs}ms`); - this.name = "RealtimeFastContextTimeoutError"; - } -} - -function normalizeSnippet(text: string): string { - const normalized = text.replace(/\s+/g, " ").trim(); - if (normalized.length <= MAX_SNIPPET_CHARS) { - return normalized; - } - return `${normalized.slice(0, MAX_SNIPPET_CHARS - 
1).trimEnd()}...`; -} - -function buildSearchQuery(args: unknown): string { - const parsed = parseRealtimeVoiceAgentConsultArgs(args); - return [parsed.question, parsed.context].filter(Boolean).join("\n\n"); -} - -function buildContextText(params: { query: string; hits: MemorySearchHit[] }): string { - const hits = params.hits - .map((hit, index) => { - const location = `${hit.path}:${hit.startLine}-${hit.endLine}`; - return `${index + 1}. [${hit.source}] ${location}\n${normalizeSnippet(hit.snippet)}`; - }) - .join("\n\n"); - return [ - "Fast OpenClaw memory context found for the live caller.", - "Use this context only if it answers the caller's question. If it is not relevant, say briefly that you do not have that context handy.", - `Question:\n${params.query}`, - `Context:\n${hits}`, - ].join("\n\n"); -} - -function buildMissText(query: string): string { - return [ - "No relevant OpenClaw memory or session context was found quickly for the live caller.", - "Answer briefly that you do not have that context handy. Do not keep checking unless the caller asks you to.", - `Question:\n${query}`, - ].join("\n\n"); -} - -async function lookupFastContext(params: { - cfg: OpenClawConfig; - agentId: string; - sessionKey: string; - config: VoiceCallRealtimeFastContextConfig; - query: string; -}): Promise { - const memory = await getActiveMemorySearchManager({ - cfg: params.cfg, - agentId: params.agentId, - }); - if (!memory.manager) { - return { - status: "unavailable", - error: memory.error ?? 
"no active memory manager", - }; - } - const hits = await memory.manager.search(params.query, { - maxResults: params.config.maxResults, - sessionKey: params.sessionKey, - sources: params.config.sources, - }); - return { status: "hits", hits }; -} - export async function resolveRealtimeFastContextConsult(params: { cfg: OpenClawConfig; agentId: string; sessionKey: string; - config: VoiceCallRealtimeFastContextConfig; + config: RealtimeVoiceFastContextConfig; args: unknown; logger: Logger; -}): Promise { - if (!params.config.enabled) { - return { handled: false }; - } - - const query = buildSearchQuery(params.args); - try { - const lookup = await withTimeout( - lookupFastContext({ - cfg: params.cfg, - agentId: params.agentId, - sessionKey: params.sessionKey, - config: params.config, - query, - }), - params.config.timeoutMs, - { createError: () => new RealtimeFastContextTimeoutError(params.config.timeoutMs) }, - ); - if (lookup.status === "unavailable") { - params.logger.debug?.(`[voice-call] realtime fast context unavailable: ${lookup.error}`); - return params.config.fallbackToConsult - ? { handled: false } - : { handled: true, result: { text: buildMissText(query) } }; - } - const { hits } = lookup; - if (hits.length === 0) { - return params.config.fallbackToConsult - ? { handled: false } - : { handled: true, result: { text: buildMissText(query) } }; - } - return { - handled: true, - result: { text: buildContextText({ query, hits }) }, - }; - } catch (error) { - const message = formatErrorMessage(error); - params.logger.debug?.(`[voice-call] realtime fast context lookup failed: ${message}`); - return params.config.fallbackToConsult - ? 
{ handled: false } - : { handled: true, result: { text: buildMissText(query) } }; - } +}): Promise { + return await resolveRealtimeVoiceFastContextConsult({ + ...params, + labels: { + audienceLabel: "caller", + contextName: "OpenClaw memory or session context", + }, + }); } diff --git a/extensions/voice-call/src/webhook.test.ts b/extensions/voice-call/src/webhook.test.ts index fb929d73d1d..e9f2ab0d3b8 100644 --- a/extensions/voice-call/src/webhook.test.ts +++ b/extensions/voice-call/src/webhook.test.ts @@ -224,6 +224,70 @@ describe("VoiceCallWebhookServer realtime transcription provider selection", () await server.stop(); } }); + + it("records media stream Talk events on the active call metadata", async () => { + const call = createCall(Date.now()); + const manager = { + getActiveCalls: () => [call], + getCallByProviderCallId: (providerCallId: string) => + providerCallId === "provider-call-1" ? call : undefined, + endCall: vi.fn(async () => ({ success: true })), + processEvent: vi.fn(), + speakInitialMessage: vi.fn(async () => {}), + } as unknown as CallManager; + const config = createConfig({ + streaming: { + ...createConfig().streaming, + enabled: true, + providers: { + openai: { + apiKey: "sk-test", // pragma: allowlist secret + }, + }, + }, + }); + + const server = new VoiceCallWebhookServer(config, manager, provider); + try { + await server.start(); + const mediaHandler = server.getMediaStreamHandler() as unknown as { + config: { + onTalkEvent?: NonNullable; + }; + }; + mediaHandler.config.onTalkEvent?.("provider-call-1", "MZ-talk", { + id: "voice-call:provider-call-1:MZ-talk:1", + type: "transcript.done", + sessionId: "voice-call:provider-call-1:MZ-talk", + turnId: "MZ-talk:turn:1", + seq: 1, + timestamp: "2026-05-05T06:00:00.000Z", + mode: "stt-tts", + transport: "gateway-relay", + brain: "agent-consult", + provider: "openai", + final: true, + payload: { text: "hello", role: "user" }, + }); + + expect(call.metadata).toEqual( + expect.objectContaining({ + 
lastTalkEventAt: "2026-05-05T06:00:00.000Z", + lastTalkEventType: "transcript.done", + recentTalkEvents: [ + { + at: "2026-05-05T06:00:00.000Z", + type: "transcript.done", + sessionId: "voice-call:provider-call-1:MZ-talk", + turnId: "MZ-talk:turn:1", + }, + ], + }), + ); + } finally { + await server.stop(); + } + }); }); describe("VoiceCallWebhookServer media stream client IP resolution", () => { diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index 6c99f97516b..3fff86cfcc6 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -2,6 +2,7 @@ import http from "node:http"; import { URL } from "node:url"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { resolveConfiguredCapabilityProvider } from "openclaw/plugin-sdk/provider-selection-runtime"; +import type { TalkEvent } from "openclaw/plugin-sdk/realtime-voice"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; import { createWebhookInFlightLimiter, @@ -77,6 +78,28 @@ function sanitizeTranscriptForLog(value: string): string { return `${sanitized.slice(0, TRANSCRIPT_LOG_MAX_CHARS)}...`; } +function appendRecentTalkEventMetadata(call: CallRecord, event: TalkEvent): void { + const metadata = call.metadata ?? {}; + const recent = Array.isArray(metadata.recentTalkEvents) + ? 
metadata.recentTalkEvents.filter( + (entry): entry is { at: string; type: string; sessionId: string; turnId?: string } => + !!entry && typeof entry === "object" && !Array.isArray(entry), + ) + : []; + recent.push({ + at: event.timestamp, + type: event.type, + sessionId: event.sessionId, + turnId: event.turnId, + }); + call.metadata = { + ...metadata, + lastTalkEventAt: event.timestamp, + lastTalkEventType: event.type, + recentTalkEvents: recent.slice(-10), + }; +} + function buildRequestUrl( requestUrl: string | undefined, requestHost: string | undefined, @@ -400,6 +423,12 @@ export class VoiceCallWebhookServer { const safePartial = sanitizeTranscriptForLog(partial); console.log(`[voice-call] Partial for ${callId}: ${safePartial} (chars=${partial.length})`); }, + onTalkEvent: (providerCallId, _streamSid, event) => { + const call = this.manager.getCallByProviderCallId(providerCallId); + if (call) { + appendRecentTalkEventMetadata(call, event); + } + }, onConnect: (callId, streamSid) => { console.log(`[voice-call] Media stream connected: ${callId} -> ${streamSid}`); this.clearPendingDisconnectHangup(callId); diff --git a/extensions/voice-call/src/webhook/realtime-handler.test.ts b/extensions/voice-call/src/webhook/realtime-handler.test.ts index 840a021d339..b87ab697e44 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.test.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.test.ts @@ -422,6 +422,192 @@ describe("RealtimeCallHandler path routing", () => { } }); + it("records common Talk events for realtime telephony sessions", async () => { + let callbacks: + | { + onAudio?: (audio: Buffer) => void; + onEvent?: (event: { + direction: "client" | "server"; + type: string; + detail?: string; + }) => void; + onReady?: () => void; + onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void; + } + | undefined; + const sendAudio = vi.fn(); + const call: CallRecord = { + callId: "call-1", + providerCallId: "CA-talk-events", + 
provider: "twilio", + direction: "inbound", + state: "ringing", + from: "+15550001234", + to: "+15550009999", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: {}, + }; + const createBridge = vi.fn( + (request: Parameters[0]) => { + callbacks = request; + return makeBridge({ sendAudio }); + }, + ); + const handler = makeHandler(undefined, { + manager: { + getCallByProviderCallId: vi.fn((): CallRecord => call), + }, + realtimeProvider: makeRealtimeProvider(createBridge), + }); + const server = await startRealtimeServer(handler); + + try { + const ws = await connectWs(server.url); + try { + ws.send( + JSON.stringify({ + event: "start", + start: { streamSid: "MZ-talk-events", callSid: "CA-talk-events" }, + }), + ); + await vi.waitFor(() => { + expect(createBridge).toHaveBeenCalled(); + }); + + callbacks?.onReady?.(); + ws.send( + JSON.stringify({ + event: "media", + media: { payload: Buffer.from([0xff, 0xff]).toString("base64") }, + }), + ); + await vi.waitFor(() => { + expect(sendAudio).toHaveBeenCalledWith(Buffer.from([0xff, 0xff])); + }); + callbacks?.onTranscript?.("user", "hello", true); + callbacks?.onAudio?.(Buffer.from([1, 2, 3])); + callbacks?.onTranscript?.("assistant", "hi there", true); + callbacks?.onEvent?.({ direction: "server", type: "response.done" }); + + const recent = call.metadata?.recentTalkEvents as + | Array<{ + brain: string; + provider: string; + sessionId: string; + transport: string; + type: string; + }> + | undefined; + expect(recent?.map((event) => event.type)).toEqual([ + "session.started", + "session.ready", + "turn.started", + "input.audio.delta", + "transcript.done", + "input.audio.committed", + "output.audio.started", + "output.audio.delta", + "output.text.done", + "output.audio.done", + "turn.ended", + ]); + expect(recent?.[0]).toMatchObject({ + provider: "openai", + sessionId: "voice-call:call-1:realtime", + transport: "gateway-relay", + }); + 
expect(call.metadata?.lastTalkEventType).toBe("turn.ended"); + } finally { + if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { + ws.close(); + } + } + } finally { + await server.close(); + } + }); + + it("emits barge-in cancellation with a turn before provider speech_started", async () => { + let callbacks: + | { + onAudio?: (audio: Buffer) => void; + } + | undefined; + const sendAudio = vi.fn(); + const call: CallRecord = { + callId: "call-1", + providerCallId: "CA-barge-in", + provider: "twilio", + direction: "inbound", + state: "ringing", + from: "+15550001234", + to: "+15550009999", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: {}, + }; + const createBridge = vi.fn( + (request: Parameters[0]) => { + callbacks = request; + return makeBridge({ sendAudio }); + }, + ); + const handler = makeHandler(undefined, { + manager: { + getCallByProviderCallId: vi.fn((): CallRecord => call), + }, + realtimeProvider: makeRealtimeProvider(createBridge), + }); + const server = await startRealtimeServer(handler); + + try { + const ws = await connectWs(server.url); + try { + ws.send( + JSON.stringify({ + event: "start", + start: { streamSid: "MZ-barge-in", callSid: "CA-barge-in" }, + }), + ); + await vi.waitFor(() => { + expect(createBridge).toHaveBeenCalled(); + }); + + callbacks?.onAudio?.(Buffer.from([1, 2, 3])); + const speechPayload = Buffer.alloc(160, 0x00).toString("base64"); + ws.send(JSON.stringify({ event: "media", media: { payload: speechPayload } })); + ws.send(JSON.stringify({ event: "media", media: { payload: speechPayload } })); + + await vi.waitFor(() => { + expect(sendAudio).toHaveBeenCalledTimes(2); + }); + + const recent = call.metadata?.recentTalkEvents as + | Array<{ + turnId?: string; + type: string; + }> + | undefined; + const cancelled = recent?.find((event) => event.type === "turn.cancelled"); + expect(cancelled).toMatchObject({ + turnId: expect.stringMatching(/^turn-\d+$/), + }); + 
expect(recent?.findLast((event) => event.type === "input.audio.delta")?.turnId).not.toBe( + cancelled?.turnId, + ); + } finally { + if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { + ws.close(); + } + } + } finally { + await server.close(); + } + }); + it("submits continuing responses only for realtime agent consult calls", async () => { let callbacks: | { diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts index 96aea578af4..ecac126dd94 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -4,11 +4,15 @@ import type { Duplex } from "node:stream"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { buildRealtimeVoiceAgentConsultWorkingResponse, + createTalkSessionController, createRealtimeVoiceBridgeSession, REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderConfig, type RealtimeVoiceProviderPlugin, + type TalkEvent, + type TalkEventInput, + type TalkSessionController, } from "openclaw/plugin-sdk/realtime-voice"; import WebSocket, { WebSocketServer } from "ws"; import type { VoiceCallRealtimeConfig } from "../config.js"; @@ -41,6 +45,7 @@ const CONSULT_TRANSCRIPT_SETTLE_MS = 350; const CONSULT_TRANSCRIPT_SETTLE_MAX_MS = 1_000; const MAX_PARTIAL_USER_TRANSCRIPT_CHARS = 1_200; const RECENT_FINAL_USER_TRANSCRIPT_TTL_MS = 2_000; +const BARGE_IN_REQUIRED_LOUD_CHUNKS = 2; function normalizePath(pathname: string): string { const trimmed = pathname.trim(); @@ -243,6 +248,36 @@ type NativeConsultState = { type TelephonyCloseReason = "completed" | "error"; +function appendRecentTalkEventMetadata( + call: CallRecord | null | undefined, + event: TalkEvent, +): void { + if (!call) { + return; + } + const metadata = call.metadata ?? {}; + const previous = Array.isArray(metadata.recentTalkEvents) ? 
metadata.recentTalkEvents : []; + metadata.lastTalkEventAt = event.timestamp; + metadata.lastTalkEventType = event.type; + metadata.recentTalkEvents = [ + ...previous, + { + id: event.id, + brain: event.brain, + mode: event.mode, + provider: event.provider, + seq: event.seq, + sessionId: event.sessionId, + timestamp: event.timestamp, + transport: event.transport, + type: event.type, + ...(event.turnId ? { turnId: event.turnId } : {}), + ...(event.final !== undefined ? { final: event.final } : {}), + }, + ].slice(-12); + call.metadata = metadata; +} + export class RealtimeCallHandler { private readonly toolHandlers = new Map(); private readonly pendingStreamTokens = new Map(); @@ -471,6 +506,49 @@ export class RealtimeCallHandler { } const { callId, initialGreetingInstructions } = registration; + const callRecord = this.manager.getCallByProviderCallId(callSid); + const talk: TalkSessionController = createTalkSessionController({ + sessionId: `voice-call:${callId}:realtime`, + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + provider: this.realtimeProvider.id, + }); + const rememberTalkEvent = (event: TalkEvent | undefined): TalkEvent | undefined => { + if (event) { + appendRecentTalkEventMetadata(callRecord, event); + } + return event; + }; + const emitTalkEvent = (input: TalkEventInput): TalkEvent => { + return rememberTalkEvent(talk.emit(input)) as TalkEvent; + }; + const ensureTalkTurn = (): string => { + const turn = talk.ensureTurn({ + payload: { callId, providerCallId: callSid }, + }); + rememberTalkEvent(turn.event); + return turn.turnId; + }; + const endTalkTurn = (reason = "completed"): void => { + const ended = talk.endTurn({ + payload: { callId, providerCallId: callSid, reason }, + }); + if (ended.ok) { + rememberTalkEvent(ended.event); + } + }; + const finishOutputAudio = (reason: string): void => { + rememberTalkEvent( + talk.finishOutputAudio({ + payload: { callId, providerCallId: callSid, reason }, + }), + ); + }; + 
emitTalkEvent({ + type: "session.started", + payload: { callId, providerCallId: callSid, streamSid }, + }); console.log( `[voice-call] Realtime bridge starting for call ${callId} (providerCallId=${callSid}, initialGreeting=${initialGreetingInstructions ? "queued" : "absent"})`, ); @@ -516,7 +594,9 @@ export class RealtimeCallHandler { } }, }); - const speechDetector = new RealtimeMulawSpeechStartDetector(); + const speechDetector = new RealtimeMulawSpeechStartDetector({ + requiredLoudChunks: BARGE_IN_REQUIRED_LOUD_CHUNKS, + }); const session = createRealtimeVoiceBridgeSession({ provider: this.realtimeProvider, providerConfig: this.providerConfig, @@ -527,6 +607,18 @@ export class RealtimeCallHandler { audioSink: { isOpen: () => ws.readyState === WebSocket.OPEN, sendAudio: (muLaw) => { + const turnId = ensureTalkTurn(); + rememberTalkEvent( + talk.startOutputAudio({ + turnId, + payload: { callId, providerCallId: callSid }, + }).event, + ); + emitTalkEvent({ + type: "output.audio.delta", + turnId, + payload: { byteLength: muLaw.length }, + }); audioPacer.sendAudio(muLaw); }, clearAudio: () => { @@ -534,12 +626,37 @@ export class RealtimeCallHandler { console.log( `[voice-call] realtime outbound audio clear requested callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`, ); + finishOutputAudio("clear"); }, sendMark: (markName) => { audioPacer.sendMark(markName); }, }, onTranscript: (role, text, isFinal) => { + const turnId = ensureTalkTurn(); + const eventType = + role === "assistant" + ? isFinal + ? "output.text.done" + : "output.text.delta" + : isFinal + ? "transcript.done" + : "transcript.delta"; + const payload = role === "assistant" ? 
{ text } : { role, text }; + emitTalkEvent({ + type: eventType, + turnId, + payload, + final: isFinal, + }); + if (role === "user" && isFinal) { + emitTalkEvent({ + type: "input.audio.committed", + turnId, + payload: { callId, providerCallId: callSid }, + final: true, + }); + } if (!isFinal) { if (role === "user" && text.trim()) { const transcript = this.recordPartialUserTranscript(callId, text); @@ -590,6 +707,14 @@ export class RealtimeCallHandler { }); }, onToolCall: (toolEvent, session) => { + const turnId = ensureTalkTurn(); + emitTalkEvent({ + type: "tool.call", + turnId, + itemId: toolEvent.itemId, + callId: toolEvent.callId, + payload: { name: toolEvent.name, args: toolEvent.args }, + }); console.log( `[voice-call] realtime tool call received callId=${callId} providerCallId=${callSid} tool=${toolEvent.name}`, ); @@ -599,10 +724,54 @@ export class RealtimeCallHandler { toolEvent.callId || toolEvent.itemId, toolEvent.name, toolEvent.args, + turnId, + emitTalkEvent, ); }, + onEvent: (event) => { + if (event.type === "input_audio_buffer.speech_started") { + ensureTalkTurn(); + return; + } + if (event.type === "input_audio_buffer.speech_stopped") { + const turnId = talk.activeTurnId; + if (!turnId) { + return; + } + emitTalkEvent({ + type: "input.audio.committed", + turnId, + payload: { callId, providerCallId: callSid, source: event.type }, + final: true, + }); + return; + } + if (event.type === "response.done") { + finishOutputAudio("response.done"); + endTalkTurn("response.done"); + return; + } + if (event.type === "error") { + emitTalkEvent({ + type: "session.error", + payload: { message: event.detail ?? 
"Realtime provider error" }, + final: true, + }); + } + }, + onReady: () => { + emitTalkEvent({ + type: "session.ready", + payload: { callId, providerCallId: callSid }, + }); + }, onError: (error) => { console.error("[voice-call] realtime voice error:", error.message); + emitTalkEvent({ + type: "session.error", + payload: { message: error.message }, + final: true, + }); }, onClose: (reason) => { this.activeBridgesByCallId.delete(callId); @@ -610,6 +779,12 @@ export class RealtimeCallHandler { this.activeTelephonyClosersByCallId.delete(callId); this.activeTelephonyClosersByCallId.delete(callSid); this.clearUserTranscriptState(callId); + finishOutputAudio(reason); + emitTalkEvent({ + type: "session.closed", + payload: { reason }, + final: true, + }); if (reason !== "error") { return; } @@ -639,11 +814,25 @@ export class RealtimeCallHandler { const sendAudioToSession = session.sendAudio.bind(session); session.sendAudio = (audio) => { if (speechDetector.accept(audio)) { + const interruptedTurnId = ensureTalkTurn(); const clearedBytes = audioPacer.clearAudio(); console.log( `[voice-call] realtime outbound audio cleared by barge-in callId=${callId} providerCallId=${callSid} queuedBytes=${clearedBytes}`, ); + finishOutputAudio("barge-in"); + const cancelled = talk.cancelTurn({ + turnId: interruptedTurnId, + payload: { callId, providerCallId: callSid, reason: "barge-in" }, + }); + if (cancelled.ok) { + rememberTalkEvent(cancelled.event); + } } + emitTalkEvent({ + type: "input.audio.delta", + turnId: ensureTalkTurn(), + payload: { byteLength: audio.length }, + }); sendAudioToSession(audio); }; const closeSession = session.close.bind(session); @@ -961,9 +1150,49 @@ export class RealtimeCallHandler { bridgeCallId: string, name: string, args: unknown, + turnId: string, + emitTalkEvent?: (input: TalkEventInput) => TalkEvent, ): Promise { const handler = this.toolHandlers.get(name); const startedAt = Date.now(); + const hasResultError = (result: unknown): boolean => { + return 
Boolean( + result && typeof result === "object" && !Array.isArray(result) && "error" in result, + ); + }; + const emitFinalToolEvent = (result: unknown): void => { + emitTalkEvent?.({ + type: hasResultError(result) ? "tool.error" : "tool.result", + turnId, + callId: bridgeCallId, + payload: { name, result }, + final: true, + }); + }; + const submitFinalToolResult = (result: unknown): void => { + bridge.submitToolResult(bridgeCallId, result); + emitFinalToolEvent(result); + }; + const submitWorkingResponse = () => { + if ( + handler && + name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME && + bridge.bridge.supportsToolResultContinuation && + !this.config.fastContext.enabled + ) { + emitTalkEvent?.({ + type: "tool.progress", + turnId, + callId: bridgeCallId, + payload: { name, status: "working" }, + }); + bridge.submitToolResult( + bridgeCallId, + buildRealtimeVoiceAgentConsultWorkingResponse("caller"), + { willContinue: true }, + ); + } + }; if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) { this.lastProviderConsultAtByCallId.set(callId, Date.now()); const timer = this.forcedConsultTimersByCallId.get(callId); @@ -974,7 +1203,7 @@ export class RealtimeCallHandler { const forcedConsult = this.forcedConsultsByCallId.get(callId); if (forcedConsult) { if (forcedConsult.completedAt) { - bridge.submitToolResult(bridgeCallId, { + submitFinalToolResult({ status: "already_delivered", message: "OpenClaw already delivered this consult result internally. 
Do not repeat it.", }); @@ -984,31 +1213,17 @@ export class RealtimeCallHandler { const result = await forcedConsult.promise.catch((error: unknown) => ({ error: formatErrorMessage(error), })); - bridge.submitToolResult(bridgeCallId, result); + submitFinalToolResult(result); return; } - const submitWorkingResponse = () => { - if ( - handler && - bridge.bridge.supportsToolResultContinuation && - !this.config.fastContext.enabled - ) { - bridge.submitToolResult( - bridgeCallId, - buildRealtimeVoiceAgentConsultWorkingResponse("caller"), - { willContinue: true }, - ); - } - }; - const existingNativeConsult = this.nativeConsultsInFlightByCallId.get(callId); if (existingNativeConsult) { console.log( `[voice-call] realtime tool call sharing in-flight agent consult callId=${callId} ageMs=${Date.now() - existingNativeConsult.startedAt}`, ); submitWorkingResponse(); - bridge.submitToolResult(bridgeCallId, await existingNativeConsult.promise); + submitFinalToolResult(await existingNativeConsult.promise); return; } @@ -1047,7 +1262,7 @@ export class RealtimeCallHandler { console.log( `[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? ` error=${error}` : ""}`, ); - bridge.submitToolResult(bridgeCallId, result); + submitFinalToolResult(result); if (status === "ok") { this.consumePartialUserTranscript(callId, state.partialUserTranscript); } @@ -1084,7 +1299,7 @@ export class RealtimeCallHandler { console.log( `[voice-call] realtime tool call completed callId=${callId} tool=${name} status=${status} elapsedMs=${Date.now() - startedAt}${error ? ` error=${error}` : ""}`, ); - bridge.submitToolResult(bridgeCallId, result); + submitFinalToolResult(result); if (name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME && status === "ok") { this.consumePartialUserTranscript(callId, context.partialUserTranscript); }