Files
openclaw/extensions/google-meet/src/node-host.ts
2026-05-04 02:53:02 +01:00

514 lines
15 KiB
TypeScript

import { spawn, spawnSync, type ChildProcess } from "node:child_process";
import { randomUUID } from "node:crypto";
import { setTimeout as sleep } from "node:timers/promises";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import {
DEFAULT_GOOGLE_MEET_AUDIO_INPUT_COMMAND,
DEFAULT_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND,
} from "./config.js";
import {
GOOGLE_MEET_SYSTEM_PROFILER_COMMAND,
outputMentionsBlackHole2ch,
} from "./transports/chrome.js";
type NodeBridgeSession = {
id: string;
url?: string;
mode?: string;
outputCommand: { command: string; args: string[] };
input?: ChildProcess;
output?: ChildProcess;
chunks: Buffer[];
waiters: Array<() => void>;
closed: boolean;
createdAt: string;
lastInputAt?: string;
lastOutputAt?: string;
lastClearAt?: string;
lastInputBytes: number;
lastOutputBytes: number;
closedAt?: string;
clearCount: number;
};
const sessions = new Map<string, NodeBridgeSession>();
function asRecord(value: unknown): Record<string, unknown> {
return value && typeof value === "object" && !Array.isArray(value)
? (value as Record<string, unknown>)
: {};
}
function readString(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function readStringArray(value: unknown): string[] | undefined {
if (!Array.isArray(value)) {
return undefined;
}
const result = value.filter(
(entry): entry is string => typeof entry === "string" && entry.length > 0,
);
return result.length > 0 ? result : undefined;
}
function readNumber(value: unknown, fallback: number): number {
return typeof value === "number" && Number.isFinite(value) && value > 0 ? value : fallback;
}
function runCommandWithTimeout(argv: string[], timeoutMs: number) {
const [command, ...args] = argv;
if (!command) {
throw new Error("command must not be empty");
}
const result = spawnSync(command, args, {
encoding: "utf8",
timeout: timeoutMs,
});
return {
code: typeof result.status === "number" ? result.status : result.error ? 1 : 0,
stdout: result.stdout ?? "",
stderr: result.stderr ?? (result.error ? formatErrorMessage(result.error) : ""),
};
}
function assertBlackHoleAvailable(timeoutMs: number) {
if (process.platform !== "darwin") {
throw new Error("Chrome Meet transport with blackhole-2ch audio is currently macOS-only");
}
const result = runCommandWithTimeout(
[GOOGLE_MEET_SYSTEM_PROFILER_COMMAND, "SPAudioDataType"],
timeoutMs,
);
const output = `${result.stdout}\n${result.stderr}`;
if (result.code !== 0 || !outputMentionsBlackHole2ch(output)) {
throw new Error("BlackHole 2ch audio device not found on the node.");
}
}
function splitCommand(argv: string[]): { command: string; args: string[] } {
const [command, ...args] = argv;
if (!command) {
throw new Error("audio command must not be empty");
}
return { command, args };
}
function wake(session: NodeBridgeSession) {
const waiters = session.waiters.splice(0);
for (const waiter of waiters) {
waiter();
}
}
function stopSession(session: NodeBridgeSession) {
const wasClosed = session.closed;
session.closed = true;
session.closedAt ??= new Date().toISOString();
terminateChild(session.input);
terminateChild(session.output);
if (!wasClosed) {
wake(session);
}
}
function attachOutputProcessHandlers(session: NodeBridgeSession, outputProcess: ChildProcess) {
outputProcess.on("exit", () => {
if (session.output === outputProcess) {
stopSession(session);
}
});
outputProcess.on("error", () => {
if (session.output === outputProcess) {
stopSession(session);
}
});
outputProcess.stdin?.on?.("error", () => {
if (session.output === outputProcess) {
stopSession(session);
}
});
}
function startOutputProcess(command: { command: string; args: string[] }) {
return spawn(command.command, command.args, {
stdio: ["pipe", "ignore", "pipe"],
});
}
function startCommandPair(params: {
inputCommand: string[];
outputCommand: string[];
url?: string;
mode?: string;
}): NodeBridgeSession {
const input = splitCommand(params.inputCommand);
const output = splitCommand(params.outputCommand);
const session: NodeBridgeSession = {
id: `meet_node_${randomUUID()}`,
url: params.url,
mode: params.mode,
outputCommand: output,
chunks: [],
waiters: [],
closed: false,
createdAt: new Date().toISOString(),
lastInputBytes: 0,
lastOutputBytes: 0,
clearCount: 0,
};
const outputProcess = startOutputProcess(output);
const inputProcess = spawn(input.command, input.args, {
stdio: ["ignore", "pipe", "pipe"],
});
session.input = inputProcess;
session.output = outputProcess;
inputProcess.stdout?.on("data", (chunk) => {
const audio = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
session.lastInputAt = new Date().toISOString();
session.lastInputBytes += audio.byteLength;
session.chunks.push(audio);
if (session.chunks.length > 200) {
session.chunks.splice(0, session.chunks.length - 200);
}
wake(session);
});
inputProcess.on("exit", () => stopSession(session));
attachOutputProcessHandlers(session, outputProcess);
inputProcess.on("error", () => stopSession(session));
sessions.set(session.id, session);
return session;
}
function terminateChild(child?: ChildProcess) {
if (!child) {
return;
}
let exited = child.exitCode !== null || child.signalCode !== null;
child.once?.("exit", () => {
exited = true;
});
try {
child.kill("SIGTERM");
} catch {
// Best-effort cleanup for node-host child processes.
}
const timer = setTimeout(() => {
if (exited) {
return;
}
try {
child.kill("SIGKILL");
} catch {
// Process may have exited after the grace check.
}
}, 2_000);
timer.unref?.();
}
async function pullAudio(params: Record<string, unknown>) {
const bridgeId = readString(params.bridgeId);
if (!bridgeId) {
throw new Error("bridgeId required");
}
const session = sessions.get(bridgeId);
if (!session) {
throw new Error(`unknown bridgeId: ${bridgeId}`);
}
const timeoutMs = Math.min(readNumber(params.timeoutMs, 250), 2_000);
if (session.chunks.length === 0 && !session.closed) {
await Promise.race([
sleep(timeoutMs),
new Promise<void>((resolve) => {
session.waiters.push(resolve);
}),
]);
}
const chunk = session.chunks.shift();
return {
bridgeId,
closed: session.closed,
base64: chunk ? chunk.toString("base64") : undefined,
};
}
function pushAudio(params: Record<string, unknown>) {
const bridgeId = readString(params.bridgeId);
const base64 = readString(params.base64);
if (!bridgeId || !base64) {
throw new Error("bridgeId and base64 required");
}
const session = sessions.get(bridgeId);
if (!session || session.closed) {
throw new Error(`bridge is not open: ${bridgeId}`);
}
const audio = Buffer.from(base64, "base64");
session.lastOutputAt = new Date().toISOString();
session.lastOutputBytes += audio.byteLength;
try {
session.output?.stdin?.write(audio);
} catch {
stopSession(session);
throw new Error(`bridge is not open: ${bridgeId}`);
}
return { bridgeId, ok: true };
}
function clearAudio(params: Record<string, unknown>) {
const bridgeId = readString(params.bridgeId);
if (!bridgeId) {
throw new Error("bridgeId required");
}
const session = sessions.get(bridgeId);
if (!session || session.closed) {
throw new Error(`bridge is not open: ${bridgeId}`);
}
const previousOutput = session.output;
const outputProcess = startOutputProcess(session.outputCommand);
session.output = outputProcess;
attachOutputProcessHandlers(session, outputProcess);
session.clearCount += 1;
session.lastClearAt = new Date().toISOString();
terminateChild(previousOutput);
return { bridgeId, ok: true, clearCount: session.clearCount };
}
function startChrome(params: Record<string, unknown>) {
const url = readString(params.url);
if (!url) {
throw new Error("url required");
}
const timeoutMs = readNumber(params.joinTimeoutMs, 30_000);
const mode = readString(params.mode);
let bridgeId: string | undefined;
let audioBridge: { type: "external-command" | "node-command-pair" } | undefined;
if (mode === "agent" || mode === "bidi" || mode === "realtime") {
assertBlackHoleAvailable(Math.min(timeoutMs, 10_000));
const healthCommand = readStringArray(params.audioBridgeHealthCommand);
if (healthCommand) {
const health = runCommandWithTimeout(healthCommand, timeoutMs);
if (health.code !== 0) {
throw new Error(
`Chrome audio bridge health check failed: ${health.stderr || health.stdout || health.code}`,
);
}
}
const bridgeCommand = readStringArray(params.audioBridgeCommand);
if (bridgeCommand) {
if (mode === "agent") {
throw new Error(
"Chrome agent mode requires audioInputCommand and audioOutputCommand so OpenClaw can run STT and regular TTS directly.",
);
}
const bridge = runCommandWithTimeout(bridgeCommand, timeoutMs);
if (bridge.code !== 0) {
throw new Error(
`failed to start Chrome audio bridge: ${bridge.stderr || bridge.stdout || bridge.code}`,
);
}
audioBridge = { type: "external-command" };
} else {
const session = startCommandPair({
inputCommand: readStringArray(params.audioInputCommand) ?? [
...DEFAULT_GOOGLE_MEET_AUDIO_INPUT_COMMAND,
],
outputCommand: readStringArray(params.audioOutputCommand) ?? [
...DEFAULT_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND,
],
url,
mode,
});
bridgeId = session.id;
audioBridge = { type: "node-command-pair" };
}
}
if (params.launch !== false) {
const argv = ["open", "-a", "Google Chrome"];
const browserProfile = readString(params.browserProfile);
if (browserProfile) {
argv.push("--args", `--profile-directory=${browserProfile}`);
}
argv.push(url);
const result = runCommandWithTimeout(argv, timeoutMs);
if (result.code !== 0) {
if (bridgeId) {
const session = sessions.get(bridgeId);
if (session) {
stopSession(session);
}
}
throw new Error(
`failed to launch Chrome for Meet: ${result.stderr || result.stdout || result.code}`,
);
}
}
return {
launched: params.launch !== false,
bridgeId,
audioBridge,
browser:
params.launch !== false
? {
status: "chrome-opened",
browserUrl: url,
notes: [
"Browser page control is handled by OpenClaw browser automation when using chrome-node.",
],
}
: undefined,
};
}
function bridgeStatus(params: Record<string, unknown>) {
const bridgeId = readString(params.bridgeId);
const session = bridgeId ? sessions.get(bridgeId) : undefined;
return {
bridge: session
? {
bridgeId,
closed: session.closed,
createdAt: session.createdAt,
lastInputAt: session.lastInputAt,
lastOutputAt: session.lastOutputAt,
lastClearAt: session.lastClearAt,
lastInputBytes: session.lastInputBytes,
lastOutputBytes: session.lastOutputBytes,
clearCount: session.clearCount,
queuedInputChunks: session.chunks.length,
}
: bridgeId
? { bridgeId, closed: true }
: undefined,
};
}
function normalizeMeetKey(value?: string): string | undefined {
if (!value) {
return undefined;
}
try {
const url = new URL(value);
if (url.hostname.toLowerCase() !== "meet.google.com") {
return value;
}
const match = /^\/([a-z]{3}-[a-z]{4}-[a-z]{3})(?:$|[/?#])/i.exec(url.pathname);
return match?.[1]?.toLowerCase() ?? value;
} catch {
return value;
}
}
function summarizeSession(session: NodeBridgeSession) {
return {
bridgeId: session.id,
url: session.url,
mode: session.mode,
closed: session.closed,
createdAt: session.createdAt,
closedAt: session.closedAt,
lastInputAt: session.lastInputAt,
lastOutputAt: session.lastOutputAt,
lastInputBytes: session.lastInputBytes,
lastOutputBytes: session.lastOutputBytes,
};
}
function listSessions(params: Record<string, unknown>) {
const urlKey = normalizeMeetKey(readString(params.url));
const mode = readString(params.mode);
const bridges = [...sessions.values()]
.filter((session) => !session.closed)
.filter((session) => !urlKey || normalizeMeetKey(session.url) === urlKey)
.filter((session) => !mode || session.mode === mode)
.map(summarizeSession);
return { bridges };
}
function stopSessionsByUrl(params: Record<string, unknown>) {
const urlKey = normalizeMeetKey(readString(params.url));
if (!urlKey) {
throw new Error("url required");
}
const mode = readString(params.mode);
const exceptBridgeId = readString(params.exceptBridgeId);
let stopped = 0;
for (const [bridgeId, session] of sessions) {
if (exceptBridgeId && bridgeId === exceptBridgeId) {
continue;
}
if (normalizeMeetKey(session.url) !== urlKey) {
continue;
}
if (mode && session.mode !== mode) {
continue;
}
const wasClosed = session.closed;
stopSession(session);
sessions.delete(bridgeId);
if (!wasClosed) {
stopped += 1;
}
}
return { ok: true, stopped };
}
function stopChrome(params: Record<string, unknown>) {
const bridgeId = readString(params.bridgeId);
if (!bridgeId) {
return { ok: true, stopped: false };
}
const session = sessions.get(bridgeId);
if (!session) {
return { ok: true, stopped: false };
}
stopSession(session);
sessions.delete(bridgeId);
return { ok: true, stopped: true };
}
export async function handleGoogleMeetNodeHostCommand(paramsJSON?: string | null): Promise<string> {
const raw = paramsJSON ? JSON.parse(paramsJSON) : {};
const params = asRecord(raw);
const action = readString(params.action);
let result: unknown;
switch (action) {
case "setup":
assertBlackHoleAvailable(10_000);
result = { ok: true };
break;
case "start":
result = startChrome(params);
break;
case "status":
result = bridgeStatus(params);
break;
case "list":
result = listSessions(params);
break;
case "stopByUrl":
result = stopSessionsByUrl(params);
break;
case "pullAudio":
result = await pullAudio(params);
break;
case "pushAudio":
result = pushAudio(params);
break;
case "clearAudio":
result = clearAudio(params);
break;
case "stop":
result = stopChrome(params);
break;
default:
throw new Error("unsupported googlemeet.chrome action");
}
return JSON.stringify(result);
}