Fix Google Meet chrome-node bridge cleanup

This commit is contained in:
BSnizND
2026-04-26 14:37:51 -07:00
committed by Peter Steinberger
parent edf43dfc88
commit 3536018db0
5 changed files with 304 additions and 6 deletions

View File

@@ -21,6 +21,7 @@ import {
fetchGoogleMeetSpace,
normalizeGoogleMeetSpaceName,
} from "./src/meet.js";
import { handleGoogleMeetNodeHostCommand } from "./src/node-host.js";
import { startNodeRealtimeAudioBridge } from "./src/realtime-node.js";
import { startCommandRealtimeAudioBridge } from "./src/realtime.js";
import { normalizeMeetUrl } from "./src/runtime.js";
@@ -1326,6 +1327,17 @@ describe("google-meet plugin", () => {
expect(respond.mock.calls[0]?.[0]).toBe(true);
expect(nodesList.mock.calls[0]).toEqual([]);
expect(nodesInvoke).toHaveBeenCalledWith(
expect.objectContaining({
nodeId: "node-1",
command: "googlemeet.chrome",
params: expect.objectContaining({
action: "stopByUrl",
url: "https://meet.google.com/abc-defg-hij",
mode: "transcribe",
}),
}),
);
expect(nodesInvoke).toHaveBeenCalledWith(
expect.objectContaining({
nodeId: "node-1",
@@ -1394,7 +1406,7 @@ describe("google-meet plugin", () => {
expect(
nodesInvoke.mock.calls.filter(([call]) => call.command === "googlemeet.chrome"),
).toHaveLength(1);
).toHaveLength(2);
expect(second.mock.calls[0]?.[1]).toMatchObject({
session: {
chrome: { health: { inCall: true, micMuted: false } },
@@ -1438,7 +1450,7 @@ describe("google-meet plugin", () => {
expect(
nodesInvoke.mock.calls.filter(([call]) => call.command === "googlemeet.chrome"),
).toHaveLength(1);
).toHaveLength(2);
expect(second.mock.calls[0]?.[1]).toMatchObject({
session: {
notes: expect.arrayContaining(["Reused existing active Meet session."]),
@@ -2168,4 +2180,147 @@ describe("google-meet plugin", () => {
}),
);
});
it("keeps paired-node realtime audio alive after transient input pull failures", async () => {
const sendAudio = vi.fn();
const bridge = {
connect: vi.fn(async () => {}),
sendAudio,
setMediaTimestamp: vi.fn(),
submitToolResult: vi.fn(),
acknowledgeMark: vi.fn(),
close: vi.fn(),
triggerGreeting: vi.fn(),
isConnected: vi.fn(() => true),
};
const provider: RealtimeVoiceProviderPlugin = {
id: "openai",
label: "OpenAI",
autoSelectOrder: 1,
resolveConfig: ({ rawConfig }) => rawConfig,
isConfigured: () => true,
createBridge: () => bridge,
};
let pullCount = 0;
const runtime = {
nodes: {
invoke: vi.fn(async ({ params }: { params?: { action?: string } }) => {
if (params?.action === "pullAudio") {
pullCount += 1;
if (pullCount === 1) {
throw new Error("transient node timeout");
}
if (pullCount === 2) {
return { bridgeId: "bridge-1", base64: Buffer.from([5, 4, 3]).toString("base64") };
}
await new Promise((resolve) => setTimeout(resolve, 1_000));
return { bridgeId: "bridge-1" };
}
return { ok: true };
}),
},
};
const handle = await startNodeRealtimeAudioBridge({
config: resolveGoogleMeetConfig({
realtime: { provider: "openai", model: "gpt-realtime" },
}),
fullConfig: {} as never,
runtime: runtime as never,
meetingSessionId: "meet-1",
nodeId: "node-1",
bridgeId: "bridge-1",
logger: noopLogger,
providers: [provider],
});
await vi.waitFor(() => {
expect(sendAudio).toHaveBeenCalledWith(Buffer.from([5, 4, 3]));
});
expect(bridge.close).not.toHaveBeenCalled();
expect(handle.getHealth()).toMatchObject({
audioInputActive: true,
lastInputBytes: 3,
consecutiveInputErrors: 0,
});
await handle.stop();
});
it("stops paired-node realtime audio after repeated input pull failures", async () => {
const bridge = {
connect: vi.fn(async () => {}),
sendAudio: vi.fn(),
setMediaTimestamp: vi.fn(),
submitToolResult: vi.fn(),
acknowledgeMark: vi.fn(),
close: vi.fn(),
triggerGreeting: vi.fn(),
isConnected: vi.fn(() => true),
};
const provider: RealtimeVoiceProviderPlugin = {
id: "openai",
label: "OpenAI",
autoSelectOrder: 1,
resolveConfig: ({ rawConfig }) => rawConfig,
isConfigured: () => true,
createBridge: () => bridge,
};
const runtime = {
nodes: {
invoke: vi.fn(async ({ params }: { params?: { action?: string } }) => {
if (params?.action === "pullAudio") {
throw new Error("node invoke timeout");
}
return { ok: true };
}),
},
};
const handle = await startNodeRealtimeAudioBridge({
config: resolveGoogleMeetConfig({
realtime: { provider: "openai", model: "gpt-realtime" },
}),
fullConfig: {} as never,
runtime: runtime as never,
meetingSessionId: "meet-1",
nodeId: "node-1",
bridgeId: "bridge-1",
logger: noopLogger,
providers: [provider],
});
await vi.waitFor(
() => {
expect(bridge.close).toHaveBeenCalled();
},
{ timeout: 3_000 },
);
expect(handle.getHealth()).toMatchObject({
bridgeClosed: true,
consecutiveInputErrors: 5,
lastInputError: "node invoke timeout",
});
expect(runtime.nodes.invoke).toHaveBeenCalledWith(
expect.objectContaining({
nodeId: "node-1",
command: "googlemeet.chrome",
params: { action: "stop", bridgeId: "bridge-1" },
timeoutMs: 5_000,
}),
);
});
it("exposes node-host list and stop-by-url bridge actions", async () => {
const listed = JSON.parse(
await handleGoogleMeetNodeHostCommand(
JSON.stringify({ action: "list", url: "https://meet.google.com/abc-defg-hij" }),
),
);
expect(listed).toEqual({ bridges: [] });
await expect(
handleGoogleMeetNodeHostCommand(JSON.stringify({ action: "stopByUrl" })),
).rejects.toThrow("url required");
});
});

View File

@@ -13,6 +13,8 @@ import {
type NodeBridgeSession = {
id: string;
url?: string;
mode?: string;
input?: ChildProcess;
output?: ChildProcess;
chunks: Buffer[];
@@ -23,6 +25,7 @@ type NodeBridgeSession = {
lastOutputAt?: string;
lastInputBytes: number;
lastOutputBytes: number;
closedAt?: string;
};
const sessions = new Map<string, NodeBridgeSession>();
@@ -101,19 +104,24 @@ function stopSession(session: NodeBridgeSession) {
return;
}
session.closed = true;
session.input?.kill("SIGTERM");
session.output?.kill("SIGTERM");
session.closedAt = new Date().toISOString();
terminateChild(session.input);
terminateChild(session.output);
wake(session);
}
function startCommandPair(params: {
inputCommand: string[];
outputCommand: string[];
url?: string;
mode?: string;
}): NodeBridgeSession {
const input = splitCommand(params.inputCommand);
const output = splitCommand(params.outputCommand);
const session: NodeBridgeSession = {
id: `meet_node_${randomUUID()}`,
url: params.url,
mode: params.mode,
chunks: [],
waiters: [],
closed: false,
@@ -147,6 +155,32 @@ function startCommandPair(params: {
return session;
}
function terminateChild(child?: ChildProcess) {
if (!child) {
return;
}
let exited = child.exitCode !== null || child.signalCode !== null;
child.once?.("exit", () => {
exited = true;
});
try {
child.kill("SIGTERM");
} catch {
// Best-effort cleanup for node-host child processes.
}
const timer = setTimeout(() => {
if (exited) {
return;
}
try {
child.kill("SIGKILL");
} catch {
// Process may have exited after the grace check.
}
}, 2_000);
timer.unref?.();
}
async function pullAudio(params: Record<string, unknown>) {
const bridgeId = readString(params.bridgeId);
if (!bridgeId) {
@@ -227,6 +261,8 @@ function startChrome(params: Record<string, unknown>) {
outputCommand: readStringArray(params.audioOutputCommand) ?? [
...DEFAULT_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND,
],
url,
mode: readString(params.mode),
});
bridgeId = session.id;
audioBridge = { type: "node-command-pair" };
@@ -290,6 +326,72 @@ function bridgeStatus(params: Record<string, unknown>) {
};
}
function normalizeMeetKey(value?: string): string | undefined {
if (!value) {
return undefined;
}
try {
const url = new URL(value);
if (url.hostname.toLowerCase() !== "meet.google.com") {
return value;
}
const match = /^\/([a-z]{3}-[a-z]{4}-[a-z]{3})(?:$|[/?#])/i.exec(url.pathname);
return match?.[1]?.toLowerCase() ?? value;
} catch {
return value;
}
}
function summarizeSession(session: NodeBridgeSession) {
return {
bridgeId: session.id,
url: session.url,
mode: session.mode,
closed: session.closed,
createdAt: session.createdAt,
closedAt: session.closedAt,
lastInputAt: session.lastInputAt,
lastOutputAt: session.lastOutputAt,
lastInputBytes: session.lastInputBytes,
lastOutputBytes: session.lastOutputBytes,
};
}
function listSessions(params: Record<string, unknown>) {
const urlKey = normalizeMeetKey(readString(params.url));
const mode = readString(params.mode);
const bridges = [...sessions.values()]
.filter((session) => !urlKey || normalizeMeetKey(session.url) === urlKey)
.filter((session) => !mode || session.mode === mode)
.map(summarizeSession);
return { bridges };
}
function stopSessionsByUrl(params: Record<string, unknown>) {
const urlKey = normalizeMeetKey(readString(params.url));
if (!urlKey) {
throw new Error("url required");
}
const mode = readString(params.mode);
const exceptBridgeId = readString(params.exceptBridgeId);
let stopped = 0;
for (const [bridgeId, session] of sessions) {
if (exceptBridgeId && bridgeId === exceptBridgeId) {
continue;
}
if (normalizeMeetKey(session.url) !== urlKey) {
continue;
}
if (mode && session.mode !== mode) {
continue;
}
stopSession(session);
sessions.delete(bridgeId);
stopped += 1;
}
return { ok: true, stopped };
}
function stopChrome(params: Record<string, unknown>) {
const bridgeId = readString(params.bridgeId);
if (!bridgeId) {
@@ -320,6 +422,12 @@ export async function handleGoogleMeetNodeHostCommand(paramsJSON?: string | null
case "status":
result = bridgeStatus(params);
break;
case "list":
result = listSessions(params);
break;
case "stopByUrl":
result = stopSessionsByUrl(params);
break;
case "pullAudio":
result = await pullAudio(params);
break;

View File

@@ -52,6 +52,8 @@ export async function startNodeRealtimeAudioBridge(params: {
let lastOutputAt: string | undefined;
let lastInputBytes = 0;
let lastOutputBytes = 0;
let consecutiveInputErrors = 0;
let lastInputError: string | undefined;
const resolved = resolveGoogleMeetRealtimeProvider({
config: params.config,
fullConfig: params.fullConfig,
@@ -183,6 +185,8 @@ export async function startNodeRealtimeAudioBridge(params: {
timeoutMs: 2_000,
});
const result = asRecord(asRecord(raw).payload ?? raw);
consecutiveInputErrors = 0;
lastInputError = undefined;
const base64 = readString(result.base64);
if (base64) {
const audio = Buffer.from(base64, "base64");
@@ -195,8 +199,17 @@ export async function startNodeRealtimeAudioBridge(params: {
}
} catch (error) {
if (!stopped) {
params.logger.warn(`[google-meet] node audio input failed: ${formatErrorMessage(error)}`);
await stop();
const message = formatErrorMessage(error);
consecutiveInputErrors += 1;
lastInputError = message;
params.logger.warn(
`[google-meet] node audio input failed (${consecutiveInputErrors}/5): ${message}`,
);
if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) {
await stop();
} else {
await new Promise((resolve) => setTimeout(resolve, 250));
}
}
}
}
@@ -219,6 +232,8 @@ export async function startNodeRealtimeAudioBridge(params: {
lastOutputAt,
lastInputBytes,
lastOutputBytes,
consecutiveInputErrors,
lastInputError,
bridgeClosed: stopped,
}),
stop,

View File

@@ -620,6 +620,24 @@ export async function launchChromeMeetOnNode(params: {
runtime: params.runtime,
requestedNode: params.config.chromeNode.node,
});
try {
await params.runtime.nodes.invoke({
nodeId,
command: "googlemeet.chrome",
params: {
action: "stopByUrl",
url: params.url,
mode: params.mode,
},
timeoutMs: 5_000,
});
} catch (error) {
params.logger.debug?.(
`[google-meet] node bridge cleanup before join ignored: ${
error instanceof Error ? error.message : String(error)
}`,
);
}
const browserControl = await openMeetWithBrowserProxy({
runtime: params.runtime,
nodeId,

View File

@@ -33,6 +33,8 @@ export type GoogleMeetChromeHealth = {
lastOutputAt?: string;
lastInputBytes?: number;
lastOutputBytes?: number;
consecutiveInputErrors?: number;
lastInputError?: string;
browserUrl?: string;
browserTitle?: string;
bridgeClosed?: boolean;