import { spawn, spawnSync, type ChildProcess } from "node:child_process"; import { randomUUID } from "node:crypto"; import { setTimeout as sleep } from "node:timers/promises"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { DEFAULT_GOOGLE_MEET_AUDIO_INPUT_COMMAND, DEFAULT_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND, } from "./config.js"; import { GOOGLE_MEET_SYSTEM_PROFILER_COMMAND, outputMentionsBlackHole2ch, } from "./transports/chrome.js"; type NodeBridgeSession = { id: string; url?: string; mode?: string; outputCommand: { command: string; args: string[] }; input?: ChildProcess; output?: ChildProcess; chunks: Buffer[]; waiters: Array<() => void>; closed: boolean; createdAt: string; lastInputAt?: string; lastOutputAt?: string; lastClearAt?: string; lastInputBytes: number; lastOutputBytes: number; closedAt?: string; clearCount: number; }; const sessions = new Map(); function asRecord(value: unknown): Record { return value && typeof value === "object" && !Array.isArray(value) ? (value as Record) : {}; } function readString(value: unknown): string | undefined { return typeof value === "string" && value.trim() ? value.trim() : undefined; } function readStringArray(value: unknown): string[] | undefined { if (!Array.isArray(value)) { return undefined; } const result = value.filter( (entry): entry is string => typeof entry === "string" && entry.length > 0, ); return result.length > 0 ? result : undefined; } function readNumber(value: unknown, fallback: number): number { return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : fallback; } function runCommandWithTimeout(argv: string[], timeoutMs: number) { const [command, ...args] = argv; if (!command) { throw new Error("command must not be empty"); } const result = spawnSync(command, args, { encoding: "utf8", timeout: timeoutMs, }); return { code: typeof result.status === "number" ? result.status : result.error ? 1 : 0, stdout: result.stdout ?? "", stderr: result.stderr ?? (result.error ? formatErrorMessage(result.error) : ""), }; } function assertBlackHoleAvailable(timeoutMs: number) { if (process.platform !== "darwin") { throw new Error("Chrome Meet transport with blackhole-2ch audio is currently macOS-only"); } const result = runCommandWithTimeout( [GOOGLE_MEET_SYSTEM_PROFILER_COMMAND, "SPAudioDataType"], timeoutMs, ); const output = `${result.stdout}\n${result.stderr}`; if (result.code !== 0 || !outputMentionsBlackHole2ch(output)) { throw new Error("BlackHole 2ch audio device not found on the node."); } } function splitCommand(argv: string[]): { command: string; args: string[] } { const [command, ...args] = argv; if (!command) { throw new Error("audio command must not be empty"); } return { command, args }; } function wake(session: NodeBridgeSession) { const waiters = session.waiters.splice(0); for (const waiter of waiters) { waiter(); } } function stopSession(session: NodeBridgeSession) { const wasClosed = session.closed; session.closed = true; session.closedAt ??= new Date().toISOString(); terminateChild(session.input); terminateChild(session.output); if (!wasClosed) { wake(session); } } function attachOutputProcessHandlers(session: NodeBridgeSession, outputProcess: ChildProcess) { outputProcess.on("exit", () => { if (session.output === outputProcess) { stopSession(session); } }); outputProcess.on("error", () => { if (session.output === outputProcess) { stopSession(session); } }); outputProcess.stdin?.on?.("error", () => { if (session.output === outputProcess) { stopSession(session); } }); } function startOutputProcess(command: { command: string; args: string[] }) { return spawn(command.command, command.args, { stdio: ["pipe", "ignore", "pipe"], }); } function startCommandPair(params: { inputCommand: string[]; outputCommand: string[]; url?: string; mode?: string; }): NodeBridgeSession { const input = splitCommand(params.inputCommand); const output = splitCommand(params.outputCommand); const session: NodeBridgeSession = { id: `meet_node_${randomUUID()}`, url: params.url, mode: params.mode, outputCommand: output, chunks: [], waiters: [], closed: false, createdAt: new Date().toISOString(), lastInputBytes: 0, lastOutputBytes: 0, clearCount: 0, }; const outputProcess = startOutputProcess(output); const inputProcess = spawn(input.command, input.args, { stdio: ["ignore", "pipe", "pipe"], }); session.input = inputProcess; session.output = outputProcess; inputProcess.stdout?.on("data", (chunk) => { const audio = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); session.lastInputAt = new Date().toISOString(); session.lastInputBytes += audio.byteLength; session.chunks.push(audio); if (session.chunks.length > 200) { session.chunks.splice(0, session.chunks.length - 200); } wake(session); }); inputProcess.on("exit", () => stopSession(session)); attachOutputProcessHandlers(session, outputProcess); inputProcess.on("error", () => stopSession(session)); sessions.set(session.id, session); return session; } function terminateChild(child?: ChildProcess) { if (!child) { return; } let exited = child.exitCode !== null || child.signalCode !== null; child.once?.("exit", () => { exited = true; }); try { child.kill("SIGTERM"); } catch { // Best-effort cleanup for node-host child processes. } const timer = setTimeout(() => { if (exited) { return; } try { child.kill("SIGKILL"); } catch { // Process may have exited after the grace check. } }, 2_000); timer.unref?.(); } async function pullAudio(params: Record) { const bridgeId = readString(params.bridgeId); if (!bridgeId) { throw new Error("bridgeId required"); } const session = sessions.get(bridgeId); if (!session) { throw new Error(`unknown bridgeId: ${bridgeId}`); } const timeoutMs = Math.min(readNumber(params.timeoutMs, 250), 2_000); if (session.chunks.length === 0 && !session.closed) { await Promise.race([ sleep(timeoutMs), new Promise((resolve) => { session.waiters.push(resolve); }), ]); } const chunk = session.chunks.shift(); return { bridgeId, closed: session.closed, base64: chunk ? chunk.toString("base64") : undefined, }; } function pushAudio(params: Record) { const bridgeId = readString(params.bridgeId); const base64 = readString(params.base64); if (!bridgeId || !base64) { throw new Error("bridgeId and base64 required"); } const session = sessions.get(bridgeId); if (!session || session.closed) { throw new Error(`bridge is not open: ${bridgeId}`); } const audio = Buffer.from(base64, "base64"); session.lastOutputAt = new Date().toISOString(); session.lastOutputBytes += audio.byteLength; try { session.output?.stdin?.write(audio); } catch { stopSession(session); throw new Error(`bridge is not open: ${bridgeId}`); } return { bridgeId, ok: true }; } function clearAudio(params: Record) { const bridgeId = readString(params.bridgeId); if (!bridgeId) { throw new Error("bridgeId required"); } const session = sessions.get(bridgeId); if (!session || session.closed) { throw new Error(`bridge is not open: ${bridgeId}`); } const previousOutput = session.output; const outputProcess = startOutputProcess(session.outputCommand); session.output = outputProcess; attachOutputProcessHandlers(session, outputProcess); session.clearCount += 1; session.lastClearAt = new Date().toISOString(); terminateChild(previousOutput); return { bridgeId, ok: true, clearCount: session.clearCount }; } function startChrome(params: Record) { const url = readString(params.url); if (!url) { throw new Error("url required"); } const timeoutMs = readNumber(params.joinTimeoutMs, 30_000); const mode = readString(params.mode); let bridgeId: string | undefined; let audioBridge: { type: "external-command" | "node-command-pair" } | undefined; if (mode === "agent" || mode === "bidi" || mode === "realtime") { assertBlackHoleAvailable(Math.min(timeoutMs, 10_000)); const healthCommand = readStringArray(params.audioBridgeHealthCommand); if (healthCommand) { const health = runCommandWithTimeout(healthCommand, timeoutMs); if (health.code !== 0) { throw new Error( `Chrome audio bridge health check failed: ${health.stderr || health.stdout || health.code}`, ); } } const bridgeCommand = readStringArray(params.audioBridgeCommand); if (bridgeCommand) { if (mode === "agent") { throw new Error( "Chrome agent mode requires audioInputCommand and audioOutputCommand so OpenClaw can run STT and regular TTS directly.", ); } const bridge = runCommandWithTimeout(bridgeCommand, timeoutMs); if (bridge.code !== 0) { throw new Error( `failed to start Chrome audio bridge: ${bridge.stderr || bridge.stdout || bridge.code}`, ); } audioBridge = { type: "external-command" }; } else { const session = startCommandPair({ inputCommand: readStringArray(params.audioInputCommand) ?? [ ...DEFAULT_GOOGLE_MEET_AUDIO_INPUT_COMMAND, ], outputCommand: readStringArray(params.audioOutputCommand) ?? [ ...DEFAULT_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND, ], url, mode, }); bridgeId = session.id; audioBridge = { type: "node-command-pair" }; } } if (params.launch !== false) { const argv = ["open", "-a", "Google Chrome"]; const browserProfile = readString(params.browserProfile); if (browserProfile) { argv.push("--args", `--profile-directory=${browserProfile}`); } argv.push(url); const result = runCommandWithTimeout(argv, timeoutMs); if (result.code !== 0) { if (bridgeId) { const session = sessions.get(bridgeId); if (session) { stopSession(session); } } throw new Error( `failed to launch Chrome for Meet: ${result.stderr || result.stdout || result.code}`, ); } } return { launched: params.launch !== false, bridgeId, audioBridge, browser: params.launch !== false ? { status: "chrome-opened", browserUrl: url, notes: [ "Browser page control is handled by OpenClaw browser automation when using chrome-node.", ], } : undefined, }; } function bridgeStatus(params: Record) { const bridgeId = readString(params.bridgeId); const session = bridgeId ? sessions.get(bridgeId) : undefined; return { bridge: session ? { bridgeId, closed: session.closed, createdAt: session.createdAt, lastInputAt: session.lastInputAt, lastOutputAt: session.lastOutputAt, lastClearAt: session.lastClearAt, lastInputBytes: session.lastInputBytes, lastOutputBytes: session.lastOutputBytes, clearCount: session.clearCount, queuedInputChunks: session.chunks.length, } : bridgeId ? { bridgeId, closed: true } : undefined, }; } function normalizeMeetKey(value?: string): string | undefined { if (!value) { return undefined; } try { const url = new URL(value); if (url.hostname.toLowerCase() !== "meet.google.com") { return value; } const match = /^\/([a-z]{3}-[a-z]{4}-[a-z]{3})(?:$|[/?#])/i.exec(url.pathname); return match?.[1]?.toLowerCase() ?? value; } catch { return value; } } function summarizeSession(session: NodeBridgeSession) { return { bridgeId: session.id, url: session.url, mode: session.mode, closed: session.closed, createdAt: session.createdAt, closedAt: session.closedAt, lastInputAt: session.lastInputAt, lastOutputAt: session.lastOutputAt, lastInputBytes: session.lastInputBytes, lastOutputBytes: session.lastOutputBytes, }; } function listSessions(params: Record) { const urlKey = normalizeMeetKey(readString(params.url)); const mode = readString(params.mode); const bridges = [...sessions.values()] .filter((session) => !session.closed) .filter((session) => !urlKey || normalizeMeetKey(session.url) === urlKey) .filter((session) => !mode || session.mode === mode) .map(summarizeSession); return { bridges }; } function stopSessionsByUrl(params: Record) { const urlKey = normalizeMeetKey(readString(params.url)); if (!urlKey) { throw new Error("url required"); } const mode = readString(params.mode); const exceptBridgeId = readString(params.exceptBridgeId); let stopped = 0; for (const [bridgeId, session] of sessions) { if (exceptBridgeId && bridgeId === exceptBridgeId) { continue; } if (normalizeMeetKey(session.url) !== urlKey) { continue; } if (mode && session.mode !== mode) { continue; } const wasClosed = session.closed; stopSession(session); sessions.delete(bridgeId); if (!wasClosed) { stopped += 1; } } return { ok: true, stopped }; } function stopChrome(params: Record) { const bridgeId = readString(params.bridgeId); if (!bridgeId) { return { ok: true, stopped: false }; } const session = sessions.get(bridgeId); if (!session) { return { ok: true, stopped: false }; } stopSession(session); sessions.delete(bridgeId); return { ok: true, stopped: true }; } export async function handleGoogleMeetNodeHostCommand(paramsJSON?: string | null): Promise { const raw = paramsJSON ? JSON.parse(paramsJSON) : {}; const params = asRecord(raw); const action = readString(params.action); let result: unknown; switch (action) { case "setup": assertBlackHoleAvailable(10_000); result = { ok: true }; break; case "start": result = startChrome(params); break; case "status": result = bridgeStatus(params); break; case "list": result = listSessions(params); break; case "stopByUrl": result = stopSessionsByUrl(params); break; case "pullAudio": result = await pullAudio(params); break; case "pushAudio": result = pushAudio(params); break; case "clearAudio": result = clearAudio(params); break; case "stop": result = stopChrome(params); break; default: throw new Error("unsupported googlemeet.chrome action"); } return JSON.stringify(result); }