import { spawn } from "node:child_process"; import type { Writable } from "node:stream"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import type { PluginRuntime, RuntimeLogger } from "openclaw/plugin-sdk/plugin-runtime"; import { createRealtimeVoiceBridgeSession, REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ, REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, resolveConfiguredRealtimeVoiceProvider, type RealtimeVoiceBridgeSession, type RealtimeVoiceProviderConfig, type RealtimeVoiceProviderPlugin, } from "openclaw/plugin-sdk/realtime-voice"; import { consultOpenClawAgentForGoogleMeet, GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME, resolveGoogleMeetRealtimeTools, submitGoogleMeetConsultWorkingResponse, } from "./agent-consult.js"; import type { GoogleMeetConfig } from "./config.js"; import type { GoogleMeetChromeHealth } from "./transports/types.js"; type BridgeProcess = { pid?: number; killed?: boolean; stdin?: Writable | null; stdout?: { on(event: "data", listener: (chunk: Buffer | string) => void): unknown } | null; stderr?: { on(event: "data", listener: (chunk: Buffer | string) => void): unknown } | null; kill(signal?: NodeJS.Signals): boolean; on( event: "exit", listener: (code: number | null, signal: NodeJS.Signals | null) => void, ): unknown; on(event: "error", listener: (error: Error) => void): unknown; }; type SpawnFn = ( command: string, args: string[], options: { stdio: ["pipe" | "ignore", "pipe" | "ignore", "pipe" | "ignore"] }, ) => BridgeProcess; export type ChromeRealtimeAudioBridgeHandle = { providerId: string; inputCommand: string[]; outputCommand: string[]; speak: (instructions?: string) => void; getHealth: () => GoogleMeetChromeHealth; stop: () => Promise; }; type ResolvedRealtimeProvider = { provider: RealtimeVoiceProviderPlugin; providerConfig: RealtimeVoiceProviderConfig; }; function splitCommand(argv: string[]): { command: string; args: string[] } { const [command, ...args] = argv; if (!command) { throw new Error("audio bridge command must not be empty"); } return { command, args }; } export function resolveGoogleMeetRealtimeAudioFormat(config: GoogleMeetConfig) { return config.chrome.audioFormat === "g711-ulaw-8khz" ? REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ : REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ; } export function resolveGoogleMeetRealtimeProvider(params: { config: GoogleMeetConfig; fullConfig: OpenClawConfig; providers?: RealtimeVoiceProviderPlugin[]; }): ResolvedRealtimeProvider { return resolveConfiguredRealtimeVoiceProvider({ configuredProviderId: params.config.realtime.provider, providerConfigs: params.config.realtime.providers, cfg: params.fullConfig, providers: params.providers, defaultModel: params.config.realtime.model, noRegisteredProviderMessage: "No configured realtime voice provider registered", }); } export async function startCommandRealtimeAudioBridge(params: { config: GoogleMeetConfig; fullConfig: OpenClawConfig; runtime: PluginRuntime; meetingSessionId: string; inputCommand: string[]; outputCommand: string[]; logger: RuntimeLogger; providers?: RealtimeVoiceProviderPlugin[]; spawn?: SpawnFn; }): Promise { const input = splitCommand(params.inputCommand); const output = splitCommand(params.outputCommand); const spawnFn: SpawnFn = params.spawn ?? ((command, args, options) => spawn(command, args, options) as unknown as BridgeProcess); const spawnOutputProcess = () => spawnFn(output.command, output.args, { stdio: ["pipe", "ignore", "pipe"], }); let outputProcess = spawnOutputProcess(); const inputProcess = spawnFn(input.command, input.args, { stdio: ["ignore", "pipe", "pipe"], }); let stopped = false; let bridge: RealtimeVoiceBridgeSession | null = null; let realtimeReady = false; let lastInputAt: string | undefined; let lastOutputAt: string | undefined; let lastInputBytes = 0; let lastOutputBytes = 0; let lastClearAt: string | undefined; let clearCount = 0; const stop = async () => { if (stopped) { return; } stopped = true; try { bridge?.close(); } catch (error) { params.logger.debug?.( `[google-meet] realtime voice bridge close ignored: ${formatErrorMessage(error)}`, ); } inputProcess.kill("SIGTERM"); outputProcess.kill("SIGTERM"); }; const fail = (label: string) => (error: Error) => { params.logger.warn(`[google-meet] ${label} failed: ${formatErrorMessage(error)}`); void stop(); }; const attachOutputProcessHandlers = (proc: BridgeProcess) => { proc.on("error", (error) => { if (proc !== outputProcess) { return; } fail("audio output command")(error); }); proc.on("exit", (code, signal) => { if (proc !== outputProcess) { return; } if (!stopped) { params.logger.warn( `[google-meet] audio output command exited (${code ?? signal ?? "done"})`, ); void stop(); } }); proc.stderr?.on("data", (chunk) => { params.logger.debug?.(`[google-meet] audio output: ${String(chunk).trim()}`); }); }; const clearOutputPlayback = () => { if (stopped) { return; } const previousOutput = outputProcess; outputProcess = spawnOutputProcess(); attachOutputProcessHandlers(outputProcess); clearCount += 1; lastClearAt = new Date().toISOString(); params.logger.debug?.( `[google-meet] cleared realtime audio output buffer by restarting playback command`, ); previousOutput.kill("SIGTERM"); }; inputProcess.on("error", fail("audio input command")); inputProcess.on("exit", (code, signal) => { if (!stopped) { params.logger.warn(`[google-meet] audio input command exited (${code ?? signal ?? "done"})`); void stop(); } }); attachOutputProcessHandlers(outputProcess); inputProcess.stderr?.on("data", (chunk) => { params.logger.debug?.(`[google-meet] audio input: ${String(chunk).trim()}`); }); const resolved = resolveGoogleMeetRealtimeProvider({ config: params.config, fullConfig: params.fullConfig, providers: params.providers, }); const transcript: Array<{ role: "user" | "assistant"; text: string }> = []; bridge = createRealtimeVoiceBridgeSession({ provider: resolved.provider, providerConfig: resolved.providerConfig, audioFormat: resolveGoogleMeetRealtimeAudioFormat(params.config), instructions: params.config.realtime.instructions, initialGreetingInstructions: params.config.realtime.introMessage, triggerGreetingOnReady: false, markStrategy: "ack-immediately", tools: resolveGoogleMeetRealtimeTools(params.config.realtime.toolPolicy), audioSink: { isOpen: () => !stopped, sendAudio: (audio) => { lastOutputAt = new Date().toISOString(); lastOutputBytes += audio.byteLength; outputProcess.stdin?.write(audio); }, clearAudio: clearOutputPlayback, }, onTranscript: (role, text, isFinal) => { if (isFinal) { transcript.push({ role, text }); if (transcript.length > 40) { transcript.splice(0, transcript.length - 40); } params.logger.debug?.(`[google-meet] ${role}: ${text}`); } }, onToolCall: (event, session) => { if (event.name !== GOOGLE_MEET_AGENT_CONSULT_TOOL_NAME) { session.submitToolResult(event.callId || event.itemId, { error: `Tool "${event.name}" not available`, }); return; } submitGoogleMeetConsultWorkingResponse(session, event.callId || event.itemId); void consultOpenClawAgentForGoogleMeet({ config: params.config, fullConfig: params.fullConfig, runtime: params.runtime, logger: params.logger, meetingSessionId: params.meetingSessionId, args: event.args, transcript, }) .then((result) => { session.submitToolResult(event.callId || event.itemId, result); }) .catch((error: Error) => { session.submitToolResult(event.callId || event.itemId, { error: formatErrorMessage(error), }); }); }, onError: fail("realtime voice bridge"), onClose: (reason) => { realtimeReady = false; if (reason === "error") { void stop(); } }, onReady: () => { realtimeReady = true; }, }); inputProcess.stdout?.on("data", (chunk) => { const audio = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); if (!stopped && audio.byteLength > 0) { lastInputAt = new Date().toISOString(); lastInputBytes += audio.byteLength; bridge?.sendAudio(Buffer.from(audio)); } }); await bridge.connect(); return { providerId: resolved.provider.id, inputCommand: params.inputCommand, outputCommand: params.outputCommand, speak: (instructions) => { bridge?.triggerGreeting(instructions); }, getHealth: () => ({ providerConnected: bridge?.bridge.isConnected() ?? false, realtimeReady, audioInputActive: lastInputBytes > 0, audioOutputActive: lastOutputBytes > 0, lastInputAt, lastOutputAt, lastInputBytes, lastOutputBytes, lastClearAt, clearCount, bridgeClosed: stopped, }), stop, }; }