diff --git a/extensions/google-meet/index.test.ts b/extensions/google-meet/index.test.ts index c83211300fc..e917116a0ff 100644 --- a/extensions/google-meet/index.test.ts +++ b/extensions/google-meet/index.test.ts @@ -21,6 +21,7 @@ import { fetchGoogleMeetSpace, normalizeGoogleMeetSpaceName, } from "./src/meet.js"; +import { handleGoogleMeetNodeHostCommand } from "./src/node-host.js"; import { startNodeRealtimeAudioBridge } from "./src/realtime-node.js"; import { startCommandRealtimeAudioBridge } from "./src/realtime.js"; import { normalizeMeetUrl } from "./src/runtime.js"; @@ -1326,6 +1327,17 @@ describe("google-meet plugin", () => { expect(respond.mock.calls[0]?.[0]).toBe(true); expect(nodesList.mock.calls[0]).toEqual([]); + expect(nodesInvoke).toHaveBeenCalledWith( + expect.objectContaining({ + nodeId: "node-1", + command: "googlemeet.chrome", + params: expect.objectContaining({ + action: "stopByUrl", + url: "https://meet.google.com/abc-defg-hij", + mode: "transcribe", + }), + }), + ); expect(nodesInvoke).toHaveBeenCalledWith( expect.objectContaining({ nodeId: "node-1", @@ -1394,7 +1406,7 @@ describe("google-meet plugin", () => { expect( nodesInvoke.mock.calls.filter(([call]) => call.command === "googlemeet.chrome"), - ).toHaveLength(1); + ).toHaveLength(2); expect(second.mock.calls[0]?.[1]).toMatchObject({ session: { chrome: { health: { inCall: true, micMuted: false } }, @@ -1438,7 +1450,7 @@ describe("google-meet plugin", () => { expect( nodesInvoke.mock.calls.filter(([call]) => call.command === "googlemeet.chrome"), - ).toHaveLength(1); + ).toHaveLength(2); expect(second.mock.calls[0]?.[1]).toMatchObject({ session: { notes: expect.arrayContaining(["Reused existing active Meet session."]), @@ -2168,4 +2180,147 @@ describe("google-meet plugin", () => { }), ); }); + + it("keeps paired-node realtime audio alive after transient input pull failures", async () => { + const sendAudio = vi.fn(); + const bridge = { + connect: vi.fn(async () => {}), + sendAudio, + setMediaTimestamp: vi.fn(), + submitToolResult: vi.fn(), + acknowledgeMark: vi.fn(), + close: vi.fn(), + triggerGreeting: vi.fn(), + isConnected: vi.fn(() => true), + }; + const provider: RealtimeVoiceProviderPlugin = { + id: "openai", + label: "OpenAI", + autoSelectOrder: 1, + resolveConfig: ({ rawConfig }) => rawConfig, + isConfigured: () => true, + createBridge: () => bridge, + }; + let pullCount = 0; + const runtime = { + nodes: { + invoke: vi.fn(async ({ params }: { params?: { action?: string } }) => { + if (params?.action === "pullAudio") { + pullCount += 1; + if (pullCount === 1) { + throw new Error("transient node timeout"); + } + if (pullCount === 2) { + return { bridgeId: "bridge-1", base64: Buffer.from([5, 4, 3]).toString("base64") }; + } + await new Promise((resolve) => setTimeout(resolve, 1_000)); + return { bridgeId: "bridge-1" }; + } + return { ok: true }; + }), + }, + }; + + const handle = await startNodeRealtimeAudioBridge({ + config: resolveGoogleMeetConfig({ + realtime: { provider: "openai", model: "gpt-realtime" }, + }), + fullConfig: {} as never, + runtime: runtime as never, + meetingSessionId: "meet-1", + nodeId: "node-1", + bridgeId: "bridge-1", + logger: noopLogger, + providers: [provider], + }); + + await vi.waitFor(() => { + expect(sendAudio).toHaveBeenCalledWith(Buffer.from([5, 4, 3])); + }); + expect(bridge.close).not.toHaveBeenCalled(); + expect(handle.getHealth()).toMatchObject({ + audioInputActive: true, + lastInputBytes: 3, + consecutiveInputErrors: 0, + }); + + await handle.stop(); + }); + + it("stops paired-node realtime audio after repeated input pull failures", async () => { + const bridge = { + connect: vi.fn(async () => {}), + sendAudio: vi.fn(), + setMediaTimestamp: vi.fn(), + submitToolResult: vi.fn(), + acknowledgeMark: vi.fn(), + close: vi.fn(), + triggerGreeting: vi.fn(), + isConnected: vi.fn(() => true), + }; + const provider: RealtimeVoiceProviderPlugin = { + id: "openai", + label: "OpenAI", + autoSelectOrder: 1, + resolveConfig: ({ rawConfig }) => rawConfig, + isConfigured: () => true, + createBridge: () => bridge, + }; + const runtime = { + nodes: { + invoke: vi.fn(async ({ params }: { params?: { action?: string } }) => { + if (params?.action === "pullAudio") { + throw new Error("node invoke timeout"); + } + return { ok: true }; + }), + }, + }; + + const handle = await startNodeRealtimeAudioBridge({ + config: resolveGoogleMeetConfig({ + realtime: { provider: "openai", model: "gpt-realtime" }, + }), + fullConfig: {} as never, + runtime: runtime as never, + meetingSessionId: "meet-1", + nodeId: "node-1", + bridgeId: "bridge-1", + logger: noopLogger, + providers: [provider], + }); + + await vi.waitFor( + () => { + expect(bridge.close).toHaveBeenCalled(); + }, + { timeout: 3_000 }, + ); + expect(handle.getHealth()).toMatchObject({ + bridgeClosed: true, + consecutiveInputErrors: 5, + lastInputError: "node invoke timeout", + }); + expect(runtime.nodes.invoke).toHaveBeenCalledWith( + expect.objectContaining({ + nodeId: "node-1", + command: "googlemeet.chrome", + params: { action: "stop", bridgeId: "bridge-1" }, + timeoutMs: 5_000, + }), + ); + }); + + it("exposes node-host list and stop-by-url bridge actions", async () => { + const listed = JSON.parse( + await handleGoogleMeetNodeHostCommand( + JSON.stringify({ action: "list", url: "https://meet.google.com/abc-defg-hij" }), + ), + ); + expect(listed).toEqual({ bridges: [] }); + + await expect( + handleGoogleMeetNodeHostCommand(JSON.stringify({ action: "stopByUrl" })), + ).rejects.toThrow("url required"); + }); }); diff --git a/extensions/google-meet/src/node-host.ts b/extensions/google-meet/src/node-host.ts index 16d42ca17bc..c360d52c0ab 100644 --- a/extensions/google-meet/src/node-host.ts +++ b/extensions/google-meet/src/node-host.ts @@ -13,6 +13,8 @@ import { type NodeBridgeSession = { id: string; + url?: string; + mode?: string; input?: ChildProcess; output?: ChildProcess; chunks: Buffer[]; @@ -23,6 +25,7 @@ type NodeBridgeSession = { lastOutputAt?: string; lastInputBytes: number; lastOutputBytes: number; + closedAt?: string; }; const sessions = new Map(); @@ -101,19 +104,24 @@ function stopSession(session: NodeBridgeSession) { return; } session.closed = true; - session.input?.kill("SIGTERM"); - session.output?.kill("SIGTERM"); + session.closedAt = new Date().toISOString(); + terminateChild(session.input); + terminateChild(session.output); wake(session); } function startCommandPair(params: { inputCommand: string[]; outputCommand: string[]; + url?: string; + mode?: string; }): NodeBridgeSession { const input = splitCommand(params.inputCommand); const output = splitCommand(params.outputCommand); const session: NodeBridgeSession = { id: `meet_node_${randomUUID()}`, + url: params.url, + mode: params.mode, chunks: [], waiters: [], closed: false, @@ -147,6 +155,32 @@ function startCommandPair(params: { return session; } +function terminateChild(child?: ChildProcess) { + if (!child) { + return; + } + let exited = child.exitCode !== null || child.signalCode !== null; + child.once?.("exit", () => { + exited = true; + }); + try { + child.kill("SIGTERM"); + } catch { + // Best-effort cleanup for node-host child processes. + } + const timer = setTimeout(() => { + if (exited) { + return; + } + try { + child.kill("SIGKILL"); + } catch { + // Process may have exited after the grace check. + } + }, 2_000); + timer.unref?.(); +} + async function pullAudio(params: Record) { const bridgeId = readString(params.bridgeId); if (!bridgeId) { @@ -227,6 +261,8 @@ function startChrome(params: Record) { outputCommand: readStringArray(params.audioOutputCommand) ?? [ ...DEFAULT_GOOGLE_MEET_AUDIO_OUTPUT_COMMAND, ], + url, + mode: readString(params.mode), }); bridgeId = session.id; audioBridge = { type: "node-command-pair" }; @@ -290,6 +326,72 @@ function bridgeStatus(params: Record) { }; } +function normalizeMeetKey(value?: string): string | undefined { + if (!value) { + return undefined; + } + try { + const url = new URL(value); + if (url.hostname.toLowerCase() !== "meet.google.com") { + return value; + } + const match = /^\/([a-z]{3}-[a-z]{4}-[a-z]{3})(?:$|[/?#])/i.exec(url.pathname); + return match?.[1]?.toLowerCase() ?? value; + } catch { + return value; + } +} + +function summarizeSession(session: NodeBridgeSession) { + return { + bridgeId: session.id, + url: session.url, + mode: session.mode, + closed: session.closed, + createdAt: session.createdAt, + closedAt: session.closedAt, + lastInputAt: session.lastInputAt, + lastOutputAt: session.lastOutputAt, + lastInputBytes: session.lastInputBytes, + lastOutputBytes: session.lastOutputBytes, + }; +} + +function listSessions(params: Record) { + const urlKey = normalizeMeetKey(readString(params.url)); + const mode = readString(params.mode); + const bridges = [...sessions.values()] + .filter((session) => !urlKey || normalizeMeetKey(session.url) === urlKey) + .filter((session) => !mode || session.mode === mode) + .map(summarizeSession); + return { bridges }; +} + +function stopSessionsByUrl(params: Record) { + const urlKey = normalizeMeetKey(readString(params.url)); + if (!urlKey) { + throw new Error("url required"); + } + const mode = readString(params.mode); + const exceptBridgeId = readString(params.exceptBridgeId); + let stopped = 0; + for (const [bridgeId, session] of sessions) { + if (exceptBridgeId && bridgeId === exceptBridgeId) { + continue; + } + if (normalizeMeetKey(session.url) !== urlKey) { + continue; + } + if (mode && session.mode !== mode) { + continue; + } + stopSession(session); + sessions.delete(bridgeId); + stopped += 1; + } + return { ok: true, stopped }; +} + function stopChrome(params: Record) { const bridgeId = readString(params.bridgeId); if (!bridgeId) { @@ -320,6 +422,12 @@ export async function handleGoogleMeetNodeHostCommand(paramsJSON?: string | null case "status": result = bridgeStatus(params); break; + case "list": + result = listSessions(params); + break; + case "stopByUrl": + result = stopSessionsByUrl(params); + break; case "pullAudio": result = await pullAudio(params); break; diff --git a/extensions/google-meet/src/realtime-node.ts b/extensions/google-meet/src/realtime-node.ts index 43e578804b1..7c21f8d0a37 100644 --- a/extensions/google-meet/src/realtime-node.ts +++ b/extensions/google-meet/src/realtime-node.ts @@ -52,6 +52,8 @@ export async function startNodeRealtimeAudioBridge(params: { let lastOutputAt: string | undefined; let lastInputBytes = 0; let lastOutputBytes = 0; + let consecutiveInputErrors = 0; + let lastInputError: string | undefined; const resolved = resolveGoogleMeetRealtimeProvider({ config: params.config, fullConfig: params.fullConfig, @@ -183,6 +185,8 @@ export async function startNodeRealtimeAudioBridge(params: { timeoutMs: 2_000, }); const result = asRecord(asRecord(raw).payload ?? raw); + consecutiveInputErrors = 0; + lastInputError = undefined; const base64 = readString(result.base64); if (base64) { const audio = Buffer.from(base64, "base64"); @@ -195,8 +199,17 @@ export async function startNodeRealtimeAudioBridge(params: { } } catch (error) { if (!stopped) { - params.logger.warn(`[google-meet] node audio input failed: ${formatErrorMessage(error)}`); - await stop(); + const message = formatErrorMessage(error); + consecutiveInputErrors += 1; + lastInputError = message; + params.logger.warn( + `[google-meet] node audio input failed (${consecutiveInputErrors}/5): ${message}`, + ); + if (consecutiveInputErrors >= 5 || /unknown bridgeId|bridge is not open/i.test(message)) { + await stop(); + } else { + await new Promise((resolve) => setTimeout(resolve, 250)); + } } } } @@ -219,6 +232,8 @@ export async function startNodeRealtimeAudioBridge(params: { lastOutputAt, lastInputBytes, lastOutputBytes, + consecutiveInputErrors, + lastInputError, bridgeClosed: stopped, }), stop, diff --git a/extensions/google-meet/src/transports/chrome.ts b/extensions/google-meet/src/transports/chrome.ts index abf7c434f62..47d36d3bfe1 100644 --- a/extensions/google-meet/src/transports/chrome.ts +++ b/extensions/google-meet/src/transports/chrome.ts @@ -620,6 +620,24 @@ export async function launchChromeMeetOnNode(params: { runtime: params.runtime, requestedNode: params.config.chromeNode.node, }); + try { + await params.runtime.nodes.invoke({ + nodeId, + command: "googlemeet.chrome", + params: { + action: "stopByUrl", + url: params.url, + mode: params.mode, + }, + timeoutMs: 5_000, + }); + } catch (error) { + params.logger.debug?.( + `[google-meet] node bridge cleanup before join ignored: ${ + error instanceof Error ? error.message : String(error) + }`, + ); + } const browserControl = await openMeetWithBrowserProxy({ runtime: params.runtime, nodeId, diff --git a/extensions/google-meet/src/transports/types.ts b/extensions/google-meet/src/transports/types.ts index 87c81ec6f71..037b288beeb 100644 --- a/extensions/google-meet/src/transports/types.ts +++ b/extensions/google-meet/src/transports/types.ts @@ -33,6 +33,8 @@ export type GoogleMeetChromeHealth = { lastOutputAt?: string; lastInputBytes?: number; lastOutputBytes?: number; + consecutiveInputErrors?: number; + lastInputError?: string; browserUrl?: string; browserTitle?: string; bridgeClosed?: boolean;