From 93bbbe5e37323883bd52334a1750c962b70fd18c Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 14:21:38 +0100 Subject: [PATCH] feat: add browser realtime talk transports --- CHANGELOG.md | 1 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/providers/google.md | 12 +- docs/providers/openai.md | 12 +- docs/web/control-ui.md | 8 +- .../google/realtime-voice-provider.test.ts | 93 +++- extensions/google/realtime-voice-provider.ts | 151 ++++- extensions/openai/realtime-voice-provider.ts | 2 + scripts/dev/realtime-talk-live-smoke.ts | 515 ++++++++++++++++++ src/gateway/method-scopes.ts | 4 + src/gateway/protocol/index.ts | 31 ++ src/gateway/protocol/schema/channels.ts | 103 +++- .../protocol/schema/protocol-schemas.ts | 10 + src/gateway/protocol/schema/types.ts | 5 + src/gateway/server-methods-list.ts | 4 + src/gateway/server-methods/talk.ts | 174 +++++- src/gateway/talk-realtime-relay.test.ts | 181 ++++++ src/gateway/talk-realtime-relay.ts | 248 +++++++++ src/realtime-voice/provider-types.ts | 50 +- ui/src/ui/chat/realtime-talk-audio.ts | 37 ++ ui/src/ui/chat/realtime-talk-gateway-relay.ts | 237 ++++++++ ui/src/ui/chat/realtime-talk-google-live.ts | 247 +++++++++ ui/src/ui/chat/realtime-talk-shared.ts | 184 +++++++ ui/src/ui/chat/realtime-talk-webrtc.ts | 182 +++++++ ui/src/ui/chat/realtime-talk.ts | 305 ++--------- ui/src/ui/realtime-talk.test.ts | 126 +++++ 26 files changed, 2607 insertions(+), 319 deletions(-) create mode 100644 scripts/dev/realtime-talk-live-smoke.ts create mode 100644 src/gateway/talk-realtime-relay.test.ts create mode 100644 src/gateway/talk-realtime-relay.ts create mode 100644 ui/src/ui/chat/realtime-talk-audio.ts create mode 100644 ui/src/ui/chat/realtime-talk-gateway-relay.ts create mode 100644 ui/src/ui/chat/realtime-talk-google-live.ts create mode 100644 ui/src/ui/chat/realtime-talk-shared.ts create mode 100644 ui/src/ui/chat/realtime-talk-webrtc.ts create mode 100644 ui/src/ui/realtime-talk.test.ts diff --git 
a/CHANGELOG.md b/CHANGELOG.md index d3a1b8af263..6ccc0fc4d7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Docs: https://docs.openclaw.ai ### Changes - Plugins/runtime: expose provider-backed thinking policy and normalization through `api.runtime.agent`, letting tool plugins validate thinking overrides without duplicating provider/model level lists. Thanks @openclaw. +- Control UI/Talk: add a generic browser realtime transport contract, Google Live browser Talk sessions with constrained ephemeral tokens, and a Gateway relay for backend-only realtime voice plugins. Thanks @VACInc. - Providers: add Cerebras as a bundled plugin with onboarding, static model catalog, docs, and manifest-owned endpoint metadata. - Memory/OpenAI-compatible: add optional `memorySearch.inputType`, `queryInputType`, and `documentInputType` config for asymmetric embedding endpoints, including direct query embeddings and provider batch indexing. Carries forward #63313 and #60727. Thanks @HOYALIM and @prospect1314521. - Ollama/memory: add model-specific retrieval query prefixes for `nomic-embed-text`, `qwen3-embedding`, and `mxbai-embed-large` memory-search queries while leaving document batches unchanged. Carries forward #45013. Thanks @laolin5564. 
diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index a46623a757d..1fcbe4bb217 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -b81647828ee6599cdd1d76d96ea02c92ccdebb8c1b3b443cefe10ca8bd2ddbfe plugin-sdk-api-baseline.json -ca9f3569352522621857b51872f30b3c31881505fd9eff2451b1b46d77670726 plugin-sdk-api-baseline.jsonl +7178659d932136074130426d08e596738a991c6812b2494149427d1f822f1be8 plugin-sdk-api-baseline.json +fc1e3ab9f21b6f7b6a55498cf5ee322d62dccf4c23322f0ba27559e55a59f901 plugin-sdk-api-baseline.jsonl diff --git a/docs/providers/google.md b/docs/providers/google.md index 8c01ee10876..c3edcc161d7 100644 --- a/docs/providers/google.md +++ b/docs/providers/google.md @@ -352,11 +352,17 @@ SDK rejects language-code hints on this API path. -Control UI Talk browser sessions still require a realtime voice provider with a -browser WebRTC session implementation. Today that path is OpenAI Realtime; the -Google provider is for backend realtime bridges. +Control UI Talk supports Google Live browser sessions with constrained one-use +tokens. Backend-only realtime voice providers can also run through the generic +Gateway relay transport, which keeps provider credentials on the Gateway. +For maintainer live verification, run +`OPENAI_API_KEY=... GEMINI_API_KEY=... node --import tsx scripts/dev/realtime-talk-live-smoke.ts`. +The Google leg mints the same constrained Live API token shape used by Control +UI Talk, opens the browser WebSocket endpoint, sends the initial setup payload, +and waits for `setupComplete`. 
+ ## Advanced configuration diff --git a/docs/providers/openai.md b/docs/providers/openai.md index 7bff1308889..198db564636 100644 --- a/docs/providers/openai.md +++ b/docs/providers/openai.md @@ -546,7 +546,17 @@ Legacy `plugins.entries.openai.config.personality` is still read as a compatibil | API key | `...openai.apiKey` | Falls back to `OPENAI_API_KEY` | - Supports Azure OpenAI via `azureEndpoint` and `azureDeployment` config keys. Supports bidirectional tool calling. Uses G.711 u-law audio format. + Supports Azure OpenAI via `azureEndpoint` and `azureDeployment` config keys for backend realtime bridges. Supports bidirectional tool calling. Uses G.711 u-law audio format. + + + + Control UI Talk uses OpenAI browser realtime sessions with a Gateway-minted + ephemeral client secret and a direct browser WebRTC SDP exchange against the + OpenAI Realtime API. Maintainer live verification is available with + `OPENAI_API_KEY=... GEMINI_API_KEY=... node --import tsx scripts/dev/realtime-talk-live-smoke.ts`; + the OpenAI leg mints a client secret in Node, generates a browser SDP offer + with fake microphone media, posts it to OpenAI, and applies the SDP answer + without logging secrets. diff --git a/docs/web/control-ui.md b/docs/web/control-ui.md index d8f9d5a2e9d..e58899558bc 100644 --- a/docs/web/control-ui.md +++ b/docs/web/control-ui.md @@ -87,7 +87,7 @@ The Control UI can localize itself on first load based on your browser locale. T - Chat with the model via Gateway WS (`chat.history`, `chat.send`, `chat.abort`, `chat.inject`). - - Talk to OpenAI Realtime directly from the browser via WebRTC. The Gateway mints a short-lived Realtime client secret with `talk.realtime.session`; the browser sends microphone audio directly to OpenAI and relays `openclaw_agent_consult` tool calls back through `chat.send` for the larger configured OpenClaw model. + - Talk through browser realtime sessions. 
OpenAI uses direct WebRTC, Google Live uses a constrained one-use browser token over WebSocket, and backend-only realtime voice plugins use the Gateway relay transport. The relay keeps provider credentials on the Gateway while the browser streams microphone PCM through `talk.realtime.relay*` RPCs and sends `openclaw_agent_consult` tool calls back through `chat.send` for the larger configured OpenClaw model. - Stream tool calls + live tool output cards in Chat (agent events). @@ -144,11 +144,13 @@ The Control UI can localize itself on first load based on your browser locale. T - The chat header model and thinking pickers patch the active session immediately through `sessions.patch`; they are persistent session overrides, not one-turn-only send options. - When fresh Gateway session usage reports show high context pressure, the chat composer area shows a context notice and, at recommended compaction levels, a compact button that runs the normal session compaction path. Stale token snapshots are hidden until the Gateway reports fresh usage again. - - Talk mode uses a registered realtime voice provider that supports browser WebRTC sessions. Configure OpenAI with `talk.provider: "openai"` plus `talk.providers.openai.apiKey`, or reuse the Voice Call realtime provider config. The browser never receives the standard OpenAI API key; it receives only the ephemeral Realtime client secret. Google Live realtime voice is supported for backend Voice Call and Google Meet bridges, but not this browser WebRTC path yet. The Realtime session prompt is assembled by the Gateway; `talk.realtime.session` does not accept caller-provided instruction overrides. + + Talk mode uses a registered realtime voice provider. Configure OpenAI with `talk.provider: "openai"` plus `talk.providers.openai.apiKey`, or configure Google with `talk.provider: "google"` plus `talk.providers.google.apiKey`; Voice Call realtime provider config can still be reused as the fallback. 
The browser never receives a standard provider API key. For OpenAI, the browser receives an ephemeral Realtime client secret for WebRTC. For Google Live, it receives a one-use constrained Live API auth token for a browser WebSocket session, with instructions and tool declarations locked into the token by the Gateway. Providers that only expose a backend realtime bridge run through the Gateway relay transport, so credentials and vendor sockets stay server-side while browser audio moves through authenticated Gateway RPCs. The Realtime session prompt is assembled by the Gateway; `talk.realtime.session` does not accept caller-provided instruction overrides. In the Chat composer, the Talk control is the waves button next to the microphone dictation button. When Talk starts, the composer status row shows `Connecting Talk...`, then `Talk live` while audio is connected, or `Asking OpenClaw...` while a realtime tool call is consulting the configured larger model through `chat.send`. + Maintainer live smoke: `OPENAI_API_KEY=... GEMINI_API_KEY=... node --import tsx scripts/dev/realtime-talk-live-smoke.ts` verifies the OpenAI browser WebRTC SDP exchange, Google Live constrained-token browser WebSocket setup, and the Gateway relay browser adapter with fake microphone media. The command prints provider status only and does not log secrets. + - Click **Stop** (calls `chat.abort`). 
diff --git a/extensions/google/realtime-voice-provider.test.ts b/extensions/google/realtime-voice-provider.test.ts index d252df2b98c..814d4cf727a 100644 --- a/extensions/google/realtime-voice-provider.test.ts +++ b/extensions/google/realtime-voice-provider.test.ts @@ -20,7 +20,7 @@ type MockGoogleLiveConnectParams = { }; }; -const { connectMock, session } = vi.hoisted(() => { +const { connectMock, createTokenMock, session } = vi.hoisted(() => { const session: MockGoogleLiveSession = { close: vi.fn(), sendClientContent: vi.fn(), @@ -28,11 +28,17 @@ const { connectMock, session } = vi.hoisted(() => { sendToolResponse: vi.fn(), }; const connectMock = vi.fn(async (_params: MockGoogleLiveConnectParams) => session); - return { connectMock, session }; + const createTokenMock = vi.fn(async (_params: unknown) => ({ + name: "auth_tokens/browser-session", + })); + return { connectMock, createTokenMock, session }; }); vi.mock("./google-genai-runtime.js", () => ({ createGoogleGenAI: vi.fn(() => ({ + authTokens: { + create: createTokenMock, + }, live: { connect: connectMock, }, @@ -50,6 +56,7 @@ function lastConnectParams(): MockGoogleLiveConnectParams { describe("buildGoogleRealtimeVoiceProvider", () => { beforeEach(() => { connectMock.mockClear(); + createTokenMock.mockClear(); session.close.mockClear(); session.sendClientContent.mockClear(); session.sendRealtimeInput.mockClear(); @@ -223,6 +230,88 @@ describe("buildGoogleRealtimeVoiceProvider", () => { expect(lastConnectParams().config).not.toHaveProperty("temperature"); }); + it("creates constrained browser sessions for Google Live Talk", async () => { + const provider = buildGoogleRealtimeVoiceProvider(); + + const session = await provider.createBrowserSession?.({ + providerConfig: { + apiKey: "gemini-key", + model: "gemini-live-2.5-flash-preview", + voice: "Puck", + temperature: 0.4, + }, + instructions: "Speak briefly.", + tools: [ + { + type: "function", + name: "openclaw_agent_consult", + description: "Ask OpenClaw", + 
parameters: { + type: "object", + properties: { + question: { type: "string" }, + }, + required: ["question"], + }, + }, + ], + }); + + expect(createTokenMock).toHaveBeenCalledTimes(1); + expect(createTokenMock.mock.calls[0]?.[0]).toMatchObject({ + config: { + uses: 1, + liveConnectConstraints: { + model: "gemini-live-2.5-flash-preview", + config: { + responseModalities: ["AUDIO"], + temperature: 0.4, + systemInstruction: "Speak briefly.", + speechConfig: { + voiceConfig: { + prebuiltVoiceConfig: { + voiceName: "Puck", + }, + }, + }, + tools: [ + { + functionDeclarations: [ + { + name: "openclaw_agent_consult", + behavior: "NON_BLOCKING", + }, + ], + }, + ], + }, + }, + }, + }); + expect(session).toMatchObject({ + provider: "google", + transport: "json-pcm-websocket", + protocol: "google-live-bidi", + clientSecret: "auth_tokens/browser-session", + websocketUrl: + "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained", + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: 16000, + outputEncoding: "pcm16", + outputSampleRateHz: 24000, + }, + initialMessage: { + setup: { + model: "models/gemini-live-2.5-flash-preview", + generationConfig: { + responseModalities: ["AUDIO"], + }, + }, + }, + }); + }); + it("waits for setup completion before draining audio and firing ready", async () => { const provider = buildGoogleRealtimeVoiceProvider(); const onReady = vi.fn(); diff --git a/extensions/google/realtime-voice-provider.ts b/extensions/google/realtime-voice-provider.ts index 4244e60fc34..cc67a3535ce 100644 --- a/extensions/google/realtime-voice-provider.ts +++ b/extensions/google/realtime-voice-provider.ts @@ -9,6 +9,7 @@ import { TurnCoverage, type FunctionDeclaration, type FunctionResponse, + type LiveConnectConfig, type LiveServerContent, type LiveServerMessage, type LiveServerToolCall, @@ -19,6 +20,8 @@ import type { OpenClawConfig } from "openclaw/plugin-sdk/provider-onboard"; import type 
{ RealtimeVoiceAudioFormat, RealtimeVoiceBridge, + RealtimeVoiceBrowserSession, + RealtimeVoiceBrowserSessionCreateRequest, RealtimeVoiceBridgeCreateRequest, RealtimeVoiceProviderConfig, RealtimeVoiceProviderPlugin, @@ -40,8 +43,13 @@ const GOOGLE_REALTIME_DEFAULT_MODEL = "gemini-2.5-flash-native-audio-preview-12- const GOOGLE_REALTIME_DEFAULT_VOICE = "Kore"; const GOOGLE_REALTIME_DEFAULT_API_VERSION = "v1beta"; const GOOGLE_REALTIME_INPUT_SAMPLE_RATE = 16_000; +const GOOGLE_REALTIME_BROWSER_API_VERSION = "v1alpha"; +const GOOGLE_REALTIME_BROWSER_WEBSOCKET_URL = + "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained"; const MAX_PENDING_AUDIO_CHUNKS = 320; const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 700; +const GOOGLE_REALTIME_BROWSER_SESSION_TTL_MS = 30 * 60 * 1000; +const GOOGLE_REALTIME_BROWSER_NEW_SESSION_TTL_MS = 60 * 1000; type GoogleRealtimeSensitivity = "low" | "high"; type GoogleRealtimeThinkingLevel = "minimal" | "low" | "medium" | "high"; @@ -66,8 +74,10 @@ type GoogleRealtimeVoiceProviderConfig = { thinkingBudget?: number; }; -type GoogleRealtimeVoiceBridgeConfig = RealtimeVoiceBridgeCreateRequest & { +type GoogleRealtimeLiveConfig = { apiKey: string; + instructions?: string; + tools?: RealtimeVoiceTool[]; model?: string; voice?: string; temperature?: number; @@ -84,6 +94,8 @@ type GoogleRealtimeVoiceBridgeConfig = RealtimeVoiceBridgeCreateRequest & { thinkingBudget?: number; }; +type GoogleRealtimeVoiceBridgeConfig = RealtimeVoiceBridgeCreateRequest & GoogleRealtimeLiveConfig; + type GoogleLiveSession = { sendClientContent: (params: { turns?: Array<{ role: string; parts: Array<{ text: string }> }>; @@ -258,7 +270,7 @@ function mapTurnCoverage(value: GoogleRealtimeTurnCoverage | undefined): TurnCov } } -function buildThinkingConfig(config: GoogleRealtimeVoiceBridgeConfig): ThinkingConfig | undefined { +function buildThinkingConfig(config: GoogleRealtimeLiveConfig): 
ThinkingConfig | undefined { if (config.thinkingLevel) { return { thinkingLevel: config.thinkingLevel.toUpperCase() as ThinkingConfig["thinkingLevel"] }; } @@ -269,7 +281,7 @@ function buildThinkingConfig(config: GoogleRealtimeVoiceBridgeConfig): ThinkingC } function buildRealtimeInputConfig( - config: GoogleRealtimeVoiceBridgeConfig, + config: GoogleRealtimeLiveConfig, ): RealtimeInputConfig | undefined { const startSensitivity = mapStartSensitivity(config.startSensitivity); const endSensitivity = mapEndSensitivity(config.endSensitivity); @@ -310,6 +322,51 @@ function buildFunctionDeclarations(tools: RealtimeVoiceTool[] | undefined): Func }); } +function buildGoogleLiveConnectConfig(config: GoogleRealtimeLiveConfig): LiveConnectConfig { + const functionDeclarations = buildFunctionDeclarations(config.tools); + return { + responseModalities: [Modality.AUDIO], + ...(typeof config.temperature === "number" && config.temperature > 0 + ? { temperature: config.temperature } + : {}), + speechConfig: { + voiceConfig: { + prebuiltVoiceConfig: { + voiceName: config.voice ?? GOOGLE_REALTIME_DEFAULT_VOICE, + }, + }, + }, + systemInstruction: config.instructions, + ...(functionDeclarations.length > 0 ? { tools: [{ functionDeclarations }] } : {}), + ...(buildRealtimeInputConfig(config) + ? { realtimeInputConfig: buildRealtimeInputConfig(config) } + : {}), + inputAudioTranscription: {}, + outputAudioTranscription: {}, + ...(typeof config.enableAffectiveDialog === "boolean" + ? { enableAffectiveDialog: config.enableAffectiveDialog } + : {}), + ...(buildThinkingConfig(config) ? { thinkingConfig: buildThinkingConfig(config) } : {}), + }; +} + +function toGoogleModelResource(model: string): string { + return model.startsWith("models/") ? 
model : `models/${model}`; +} + +function buildBrowserInitialSetup(model: string) { + return { + setup: { + model: toGoogleModelResource(model), + generationConfig: { + responseModalities: [Modality.AUDIO], + }, + inputAudioTranscription: {}, + outputAudioTranscription: {}, + }, + }; +} + function parsePcmSampleRate(mimeType: string | undefined): number { const match = mimeType?.match(/(?:^|[;,\s])rate=(\d+)/i); const parsed = match ? Number.parseInt(match[1] ?? "", 10) : Number.NaN; @@ -366,31 +423,9 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { }, }); - const functionDeclarations = buildFunctionDeclarations(this.config.tools); this.session = (await ai.live.connect({ model: this.config.model ?? GOOGLE_REALTIME_DEFAULT_MODEL, - config: { - responseModalities: [Modality.AUDIO], - ...(typeof this.config.temperature === "number" && this.config.temperature > 0 - ? { temperature: this.config.temperature } - : {}), - speechConfig: { - voiceConfig: { - prebuiltVoiceConfig: { - voiceName: this.config.voice ?? GOOGLE_REALTIME_DEFAULT_VOICE, - }, - }, - }, - systemInstruction: this.config.instructions, - ...(functionDeclarations.length > 0 ? { tools: [{ functionDeclarations }] } : {}), - ...(this.realtimeInputConfig ? { realtimeInputConfig: this.realtimeInputConfig } : {}), - inputAudioTranscription: {}, - outputAudioTranscription: {}, - ...(typeof this.config.enableAffectiveDialog === "boolean" - ? { enableAffectiveDialog: this.config.enableAffectiveDialog } - : {}), - ...(this.thinkingConfig ? 
{ thinkingConfig: this.thinkingConfig } : {}), - }, + config: buildGoogleLiveConnectConfig(this.config), callbacks: { onopen: () => { this.connected = true; @@ -657,14 +692,67 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge { }); } } +} - private get realtimeInputConfig(): RealtimeInputConfig | undefined { - return buildRealtimeInputConfig(this.config); +async function createGoogleRealtimeBrowserSession( + req: RealtimeVoiceBrowserSessionCreateRequest, +): Promise { + const config = normalizeProviderConfig(req.providerConfig); + const apiKey = config.apiKey || resolveEnvApiKey(); + if (!apiKey) { + throw new Error("Google Gemini API key missing"); } - private get thinkingConfig(): ThinkingConfig | undefined { - return buildThinkingConfig(this.config); + const model = req.model ?? config.model ?? GOOGLE_REALTIME_DEFAULT_MODEL; + const voice = req.voice ?? config.voice ?? GOOGLE_REALTIME_DEFAULT_VOICE; + const expiresAtMs = Date.now() + GOOGLE_REALTIME_BROWSER_SESSION_TTL_MS; + const newSessionExpiresAtMs = Date.now() + GOOGLE_REALTIME_BROWSER_NEW_SESSION_TTL_MS; + const ai = createGoogleGenAI({ + apiKey, + httpOptions: { + apiVersion: GOOGLE_REALTIME_BROWSER_API_VERSION, + }, + }); + const token = await ai.authTokens.create({ + config: { + uses: 1, + expireTime: new Date(expiresAtMs).toISOString(), + newSessionExpireTime: new Date(newSessionExpiresAtMs).toISOString(), + liveConnectConstraints: { + model, + config: buildGoogleLiveConnectConfig({ + ...config, + apiKey, + model, + voice, + instructions: req.instructions, + tools: req.tools, + }), + }, + }, + }); + const clientSecret = token.name?.trim(); + if (!clientSecret) { + throw new Error("Google Live browser session did not return an ephemeral token"); } + + return { + provider: "google", + transport: "json-pcm-websocket", + protocol: "google-live-bidi", + clientSecret, + websocketUrl: GOOGLE_REALTIME_BROWSER_WEBSOCKET_URL, + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: 
GOOGLE_REALTIME_INPUT_SAMPLE_RATE, + outputEncoding: "pcm16", + outputSampleRateHz: 24_000, + }, + initialMessage: buildBrowserInitialSetup(model), + model, + voice, + expiresAt: Math.floor(expiresAtMs / 1000), + }; } export function buildGoogleRealtimeVoiceProvider(): RealtimeVoiceProviderPlugin { @@ -700,6 +788,7 @@ export function buildGoogleRealtimeVoiceProvider(): RealtimeVoiceProviderPlugin thinkingBudget: config.thinkingBudget, }); }, + createBrowserSession: createGoogleRealtimeBrowserSession, }; } @@ -707,5 +796,7 @@ export { GOOGLE_REALTIME_DEFAULT_API_VERSION, GOOGLE_REALTIME_DEFAULT_MODEL, GOOGLE_REALTIME_DEFAULT_VOICE, + GOOGLE_REALTIME_BROWSER_API_VERSION, + GOOGLE_REALTIME_BROWSER_WEBSOCKET_URL, }; export type { GoogleRealtimeVoiceProviderConfig }; diff --git a/extensions/openai/realtime-voice-provider.ts b/extensions/openai/realtime-voice-provider.ts index 3a43da4c8d0..f9b8ad51e3e 100644 --- a/extensions/openai/realtime-voice-provider.ts +++ b/extensions/openai/realtime-voice-provider.ts @@ -665,7 +665,9 @@ async function createOpenAIRealtimeBrowserSession( : undefined; return { provider: "openai", + transport: "webrtc-sdp", clientSecret, + offerUrl: "https://api.openai.com/v1/realtime/calls", model, voice, ...(typeof expiresAt === "number" ? 
{ expiresAt } : {}), diff --git a/scripts/dev/realtime-talk-live-smoke.ts b/scripts/dev/realtime-talk-live-smoke.ts new file mode 100644 index 00000000000..43f3c544157 --- /dev/null +++ b/scripts/dev/realtime-talk-live-smoke.ts @@ -0,0 +1,515 @@ +import { mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { GoogleGenAI, Modality } from "@google/genai"; +import { chromium, type Browser } from "playwright"; +import { createServer, type ViteDevServer } from "vite"; + +const OPENAI_REALTIME_MODEL = + process.env.OPENCLAW_REALTIME_OPENAI_MODEL?.trim() || "gpt-realtime-1.5"; +const OPENAI_REALTIME_VOICE = process.env.OPENCLAW_REALTIME_OPENAI_VOICE?.trim() || "alloy"; +const GOOGLE_REALTIME_MODEL = + process.env.OPENCLAW_REALTIME_GOOGLE_MODEL?.trim() || + "gemini-2.5-flash-native-audio-preview-12-2025"; +const GOOGLE_REALTIME_VOICE = process.env.OPENCLAW_REALTIME_GOOGLE_VOICE?.trim() || "Kore"; +const GOOGLE_LIVE_WS_URL = + "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained"; + +type SmokeResult = { + name: string; + ok: boolean; + details?: Record; +}; + +function getEnv(name: string): string | undefined { + const value = process.env[name]?.trim(); + return value ? value : undefined; +} + +function shortError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +async function readBoundedText(response: Response): Promise { + const text = await response.text(); + return text.length > 600 ? `${text.slice(0, 600)}...` : text; +} + +function printResult(result: SmokeResult): void { + console.log(`${result.name}: ${result.ok ? "ok" : "failed"}`, result.details ?? {}); +} + +function compareStrings(left: string | undefined, right: string | undefined): number { + return (left ?? "").localeCompare(right ?? 
""); +} + +async function createOpenAIClientSecret(apiKey: string): Promise { + const response = await fetch("https://api.openai.com/v1/realtime/client_secrets", { + method: "POST", + headers: { + Authorization: `Bearer ${apiKey}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + session: { + type: "realtime", + model: OPENAI_REALTIME_MODEL, + audio: { + output: { voice: OPENAI_REALTIME_VOICE }, + }, + }, + }), + }); + if (!response.ok) { + throw new Error( + `OpenAI Realtime client secret failed (${response.status}): ${await readBoundedText( + response, + )}`, + ); + } + const payload = (await response.json()) as Record; + const nested = + payload.client_secret && typeof payload.client_secret === "object" + ? (payload.client_secret as Record) + : undefined; + const value = typeof payload.value === "string" ? payload.value : undefined; + const nestedValue = typeof nested?.value === "string" ? nested.value : undefined; + const secret = value ?? nestedValue; + if (!secret) { + throw new Error("OpenAI Realtime client secret response did not include a value"); + } + return secret; +} + +async function smokeOpenAIWebRtc(browser: Browser, apiKey: string): Promise { + try { + const clientSecret = await createOpenAIClientSecret(apiKey); + const context = await browser.newContext({ + permissions: ["microphone"], + }); + const page = await context.newPage(); + const result = await page.evaluate( + async ({ clientSecret: secret }) => { + let media: MediaStream; + if (navigator.mediaDevices?.getUserMedia) { + media = await navigator.mediaDevices.getUserMedia({ audio: true }); + } else { + const audioContext = new AudioContext(); + const destination = audioContext.createMediaStreamDestination(); + const oscillator = audioContext.createOscillator(); + oscillator.connect(destination); + oscillator.start(); + media = destination.stream; + } + const peer = new RTCPeerConnection(); + for (const track of media.getAudioTracks()) { + peer.addTrack(track, media); + 
} + const channel = peer.createDataChannel("oai-events"); + const connectionState = new Promise((resolve) => { + const timeout = window.setTimeout(() => resolve(peer.connectionState), 12_000); + peer.addEventListener("connectionstatechange", () => { + if (peer.connectionState === "connected" || peer.connectionState === "failed") { + window.clearTimeout(timeout); + resolve(peer.connectionState); + } + }); + channel.addEventListener("open", () => { + window.clearTimeout(timeout); + resolve(peer.connectionState || "data-channel-open"); + }); + }); + const offer = await peer.createOffer(); + await peer.setLocalDescription(offer); + const response = await fetch("https://api.openai.com/v1/realtime/calls", { + method: "POST", + body: offer.sdp, + headers: { + Authorization: `Bearer ${secret}`, + "Content-Type": "application/sdp", + }, + }); + if (!response.ok) { + throw new Error(`OpenAI Realtime SDP offer failed (${response.status})`); + } + const answer = await response.text(); + await peer.setRemoteDescription({ type: "answer", sdp: answer }); + const state = await connectionState; + peer.close(); + media.getTracks().forEach((track) => track.stop()); + return { + answerHasAudio: answer.includes("m=audio"), + remoteDescriptionApplied: peer.remoteDescription?.type === "answer", + connectionState: state, + }; + }, + { clientSecret }, + ); + await context.close(); + return { + name: "openai-webrtc-browser", + ok: result.answerHasAudio && result.remoteDescriptionApplied, + details: { + model: OPENAI_REALTIME_MODEL, + answerHasAudio: result.answerHasAudio, + remoteDescriptionApplied: result.remoteDescriptionApplied, + connectionState: result.connectionState, + }, + }; + } catch (error) { + return { name: "openai-webrtc-browser", ok: false, details: { error: shortError(error) } }; + } +} + +async function createGoogleLiveToken(apiKey: string): Promise { + const ai = new GoogleGenAI({ + apiKey, + httpOptions: { apiVersion: "v1alpha" }, + }); + const now = Date.now(); + const 
token = await ai.authTokens.create({ + config: { + uses: 1, + expireTime: new Date(now + 30 * 60 * 1000).toISOString(), + newSessionExpireTime: new Date(now + 60 * 1000).toISOString(), + liveConnectConstraints: { + model: GOOGLE_REALTIME_MODEL, + config: { + responseModalities: [Modality.AUDIO], + speechConfig: { + voiceConfig: { + prebuiltVoiceConfig: { voiceName: GOOGLE_REALTIME_VOICE }, + }, + }, + systemInstruction: "OpenClaw browser Talk live smoke.", + inputAudioTranscription: {}, + outputAudioTranscription: {}, + }, + }, + }, + }); + const name = token.name?.trim(); + if (!name) { + throw new Error("Google Live auth token response did not include a token name"); + } + return name; +} + +async function smokeGoogleLiveBrowserWs(browser: Browser, apiKey: string): Promise { + try { + const token = await createGoogleLiveToken(apiKey); + const page = await browser.newPage(); + await page.evaluate("globalThis.__name = (fn) => fn"); + const result = await page.evaluate( + async ({ model, tokenName, websocketUrl }) => { + const debug: { + opened: boolean; + messages: string[]; + close?: { code: number; reason: string }; + error: boolean; + } = { opened: false, messages: [], error: false }; + const dataToText = async (data: unknown): Promise => { + if (typeof data === "string") { + return data; + } + if (data instanceof Blob) { + return await data.text(); + } + if (data instanceof ArrayBuffer) { + return new TextDecoder().decode(data); + } + return String(data); + }; + const url = new URL(websocketUrl); + url.searchParams.set("access_token", tokenName); + const ws = new WebSocket(url.toString()); + const done = new Promise>((resolve, reject) => { + const timeout = window.setTimeout( + () => reject(new Error(`Google Live setup timed out: ${JSON.stringify(debug)}`)), + 15_000, + ); + ws.addEventListener("open", () => { + debug.opened = true; + ws.send( + JSON.stringify({ + setup: { + model: model.startsWith("models/") ? 
model : `models/${model}`, + generationConfig: { responseModalities: ["AUDIO"] }, + inputAudioTranscription: {}, + outputAudioTranscription: {}, + }, + }), + ); + }); + ws.addEventListener("message", (event) => { + void (async () => { + const text = await dataToText(event.data); + debug.messages.push(text.slice(0, 300)); + const message = JSON.parse(text) as { setupComplete?: unknown }; + if (!message.setupComplete) { + return; + } + window.clearTimeout(timeout); + resolve({ setupComplete: true, readyState: ws.readyState }); + })().catch((error) => { + window.clearTimeout(timeout); + reject(error); + }); + }); + ws.addEventListener("error", () => { + debug.error = true; + window.clearTimeout(timeout); + reject(new Error("Google Live browser WebSocket errored")); + }); + ws.addEventListener("close", (event) => { + debug.close = { code: event.code, reason: event.reason }; + if (event.code !== 1000) { + window.clearTimeout(timeout); + reject(new Error(`Google Live browser WebSocket closed: ${JSON.stringify(debug)}`)); + } + }); + }); + const value = await done; + ws.close(1000); + return value; + }, + { + model: GOOGLE_REALTIME_MODEL, + tokenName: token, + websocketUrl: GOOGLE_LIVE_WS_URL, + }, + ); + await page.close(); + return { + name: "google-live-browser-ws", + ok: result.setupComplete === true, + details: { model: GOOGLE_REALTIME_MODEL, setupComplete: result.setupComplete === true }, + }; + } catch (error) { + return { name: "google-live-browser-ws", ok: false, details: { error: shortError(error) } }; + } +} + +async function smokeGatewayRelayBrowser(browser: Browser): Promise { + let server: ViteDevServer | undefined; + const dir = await mkdtemp(path.join(tmpdir(), "openclaw-realtime-talk-")); + try { + const repoRoot = process.cwd().replaceAll("\\", "/"); + await writeFile( + path.join(dir, "index.html"), + '', + ); + await writeFile( + path.join(dir, "main.ts"), + ` +import { GatewayRelayRealtimeTalkTransport } from 
"/@fs/${repoRoot}/ui/src/ui/chat/realtime-talk-gateway-relay.ts"; + +const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); +const listeners = new Set(); +const requests = []; +const statuses = []; +const transcripts = []; + +function emit(event) { + for (const listener of [...listeners]) { + listener(event); + } +} + +function base64ZeroPcm(bytes) { + let text = ""; + for (let index = 0; index < bytes; index += 1) { + text += String.fromCharCode(0); + } + return btoa(text); +} + +const client = { + addEventListener(listener) { + listeners.add(listener); + return () => listeners.delete(listener); + }, + async request(method, params) { + requests.push({ method, params }); + if (method === "chat.send") { + const runId = params.idempotencyKey || "run-smoke"; + window.setTimeout(() => { + emit({ event: "chat", payload: { runId, state: "final", message: { text: "relay consult ok" } } }); + }, 50); + return { runId }; + } + return { ok: true }; + }, +}; + +try { + const transport = new GatewayRelayRealtimeTalkTransport( + { + provider: "smoke", + transport: "gateway-relay", + relaySessionId: "relay-live-smoke", + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: 24000, + outputEncoding: "pcm16", + outputSampleRateHz: 24000, + }, + }, + { + client, + sessionKey: "main", + callbacks: { + onStatus: (status, detail) => statuses.push({ status, detail }), + onTranscript: (entry) => transcripts.push(entry), + }, + }, + ); + await transport.start(); + emit({ event: "talk.realtime.relay", payload: { relaySessionId: "relay-live-smoke", type: "ready" } }); + emit({ + event: "talk.realtime.relay", + payload: { relaySessionId: "relay-live-smoke", type: "transcript", role: "user", text: "relay user", final: true }, + }); + emit({ + event: "talk.realtime.relay", + payload: { relaySessionId: "relay-live-smoke", type: "transcript", role: "assistant", text: "relay assistant", final: false }, + }); + emit({ + event: "talk.realtime.relay", + payload: { 
relaySessionId: "relay-live-smoke", type: "audio", audioBase64: base64ZeroPcm(480) }, + }); + const processor = transport.inputProcessor; + processor?.onaudioprocess?.({ + inputBuffer: { getChannelData: () => new Float32Array(160).fill(0.01) }, + }); + emit({ event: "talk.realtime.relay", payload: { relaySessionId: "relay-live-smoke", type: "mark" } }); + emit({ + event: "talk.realtime.relay", + payload: { + relaySessionId: "relay-live-smoke", + type: "toolCall", + callId: "call-smoke", + name: "openclaw_agent_consult", + args: { question: "confirm relay consult path" }, + }, + }); + await delay(400); + transport.stop(); + await delay(100); + window.__relaySmokeResult = { requests, statuses, transcripts }; + window.__relaySmokeDone = true; +} catch (error) { + window.__relaySmokeResult = { error: error instanceof Error ? error.message : String(error), requests, statuses, transcripts }; + window.__relaySmokeDone = true; +} +`, + ); + server = await createServer({ + root: dir, + logLevel: "silent", + server: { host: "127.0.0.1", port: 0 }, + }); + await server.listen(); + const address = server.httpServer?.address(); + if (!address || typeof address === "string") { + throw new Error("Vite did not expose a local port"); + } + const url = `http://127.0.0.1:${address.port}/`; + const context = await browser.newContext({ permissions: ["microphone"] }); + await context.grantPermissions(["microphone"], { origin: url }); + const page = await context.newPage(); + await page.goto(url); + await page.waitForFunction(() => globalThis.__relaySmokeDone === true, undefined, { + timeout: 15_000, + }); + const result = (await page.evaluate(() => globalThis.__relaySmokeResult)) as { + error?: string; + requests?: Array<{ method?: string }>; + statuses?: Array<{ status?: string }>; + transcripts?: Array<{ role?: string; text?: string }>; + }; + await context.close(); + if (result.error) { + throw new Error(result.error); + } + const methods = new Set((result.requests ?? 
[]).map((request) => request.method)); + const statusNames = new Set((result.statuses ?? []).map((entry) => entry.status)); + const transcriptTexts = new Set((result.transcripts ?? []).map((entry) => entry.text)); + const expectedMethods = [ + "talk.realtime.relayAudio", + "talk.realtime.relayMark", + "talk.realtime.relayToolResult", + "talk.realtime.relayStop", + ]; + const ok = + expectedMethods.every((method) => methods.has(method)) && + statusNames.has("listening") && + statusNames.has("thinking") && + transcriptTexts.has("relay user") && + transcriptTexts.has("relay assistant"); + return { + name: "gateway-relay-browser-adapter", + ok, + details: { + methods: [...methods].toSorted(compareStrings), + statuses: [...statusNames].toSorted(compareStrings), + transcripts: [...transcriptTexts].toSorted(compareStrings), + }, + }; + } catch (error) { + return { + name: "gateway-relay-browser-adapter", + ok: false, + details: { error: shortError(error) }, + }; + } finally { + await server?.close(); + await rm(dir, { recursive: true, force: true }); + } +} + +async function main(): Promise { + const openAIKey = getEnv("OPENAI_API_KEY"); + const googleKey = getEnv("GEMINI_API_KEY") ?? 
getEnv("GOOGLE_API_KEY"); + const browser = await chromium.launch({ + headless: true, + args: [ + "--autoplay-policy=no-user-gesture-required", + "--no-sandbox", + "--use-fake-device-for-media-stream", + "--use-fake-ui-for-media-stream", + ], + }); + const results: SmokeResult[] = []; + try { + if (!openAIKey) { + results.push({ + name: "openai-webrtc-browser", + ok: false, + details: { error: "OPENAI_API_KEY missing" }, + }); + } else { + results.push(await smokeOpenAIWebRtc(browser, openAIKey)); + } + if (!googleKey) { + results.push({ + name: "google-live-browser-ws", + ok: false, + details: { error: "GEMINI_API_KEY or GOOGLE_API_KEY missing" }, + }); + } else { + results.push(await smokeGoogleLiveBrowserWs(browser, googleKey)); + } + results.push(await smokeGatewayRelayBrowser(browser)); + } finally { + await browser.close(); + } + for (const result of results) { + printResult(result); + } + if (results.some((result) => !result.ok)) { + process.exitCode = 1; + } +} + +await main(); diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index 0e0d572b75f..6b4f81411e0 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -129,6 +129,10 @@ const METHOD_SCOPE_GROUPS: Record = { "wake", "talk.mode", "talk.realtime.session", + "talk.realtime.relayAudio", + "talk.realtime.relayMark", + "talk.realtime.relayStop", + "talk.realtime.relayToolResult", "talk.speak", "tts.enable", "tts.disable", diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 8275076797f..20be8b61cce 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -52,6 +52,16 @@ import { TalkConfigParamsSchema, type TalkConfigResult, TalkConfigResultSchema, + type TalkRealtimeRelayAudioParams, + TalkRealtimeRelayAudioParamsSchema, + type TalkRealtimeRelayMarkParams, + TalkRealtimeRelayMarkParamsSchema, + type TalkRealtimeRelayOkResult, + TalkRealtimeRelayOkResultSchema, + type TalkRealtimeRelayStopParams, + 
TalkRealtimeRelayStopParamsSchema, + type TalkRealtimeRelayToolResultParams, + TalkRealtimeRelayToolResultParamsSchema, type TalkRealtimeSessionParams, TalkRealtimeSessionParamsSchema, type TalkRealtimeSessionResult, @@ -463,6 +473,17 @@ export const validateTalkRealtimeSessionParams = ajv.compile( TalkRealtimeSessionResultSchema, ); +export const validateTalkRealtimeRelayAudioParams = ajv.compile( + TalkRealtimeRelayAudioParamsSchema, +); +export const validateTalkRealtimeRelayMarkParams = ajv.compile( + TalkRealtimeRelayMarkParamsSchema, +); +export const validateTalkRealtimeRelayStopParams = ajv.compile( + TalkRealtimeRelayStopParamsSchema, +); +export const validateTalkRealtimeRelayToolResultParams = + ajv.compile(TalkRealtimeRelayToolResultParamsSchema); export const validateTalkSpeakParams = ajv.compile(TalkSpeakParamsSchema); export const validateTalkSpeakResult = ajv.compile(TalkSpeakResultSchema); export const validateChannelsStatusParams = ajv.compile( @@ -660,6 +681,11 @@ export { TalkConfigResultSchema, TalkRealtimeSessionParamsSchema, TalkRealtimeSessionResultSchema, + TalkRealtimeRelayAudioParamsSchema, + TalkRealtimeRelayMarkParamsSchema, + TalkRealtimeRelayStopParamsSchema, + TalkRealtimeRelayToolResultParamsSchema, + TalkRealtimeRelayOkResultSchema, TalkSpeakParamsSchema, TalkSpeakResultSchema, ChannelsStatusParamsSchema, @@ -766,6 +792,11 @@ export type { TalkConfigResult, TalkRealtimeSessionParams, TalkRealtimeSessionResult, + TalkRealtimeRelayAudioParams, + TalkRealtimeRelayMarkParams, + TalkRealtimeRelayStopParams, + TalkRealtimeRelayToolResultParams, + TalkRealtimeRelayOkResult, TalkSpeakParams, TalkSpeakResult, TalkModeParams, diff --git a/src/gateway/protocol/schema/channels.ts b/src/gateway/protocol/schema/channels.ts index 45769210a18..d6914488d09 100644 --- a/src/gateway/protocol/schema/channels.ts +++ b/src/gateway/protocol/schema/channels.ts @@ -46,10 +46,62 @@ export const TalkRealtimeSessionParamsSchema = Type.Object( { 
additionalProperties: false }, ); -export const TalkRealtimeSessionResultSchema = Type.Object( +export const TalkRealtimeRelayAudioParamsSchema = Type.Object( + { + relaySessionId: NonEmptyString, + audioBase64: NonEmptyString, + timestamp: Type.Optional(Type.Number()), + }, + { additionalProperties: false }, +); + +export const TalkRealtimeRelayMarkParamsSchema = Type.Object( + { + relaySessionId: NonEmptyString, + markName: Type.Optional(Type.String()), + }, + { additionalProperties: false }, +); + +export const TalkRealtimeRelayStopParamsSchema = Type.Object( + { + relaySessionId: NonEmptyString, + }, + { additionalProperties: false }, +); + +export const TalkRealtimeRelayToolResultParamsSchema = Type.Object( + { + relaySessionId: NonEmptyString, + callId: NonEmptyString, + result: Type.Unknown(), + }, + { additionalProperties: false }, +); + +export const TalkRealtimeRelayOkResultSchema = Type.Object( + { + ok: Type.Boolean(), + }, + { additionalProperties: false }, +); + +const BrowserRealtimeAudioContractSchema = Type.Object( + { + inputEncoding: Type.Union([Type.Literal("pcm16"), Type.Literal("g711_ulaw")]), + inputSampleRateHz: Type.Integer({ minimum: 1 }), + outputEncoding: Type.Union([Type.Literal("pcm16"), Type.Literal("g711_ulaw")]), + outputSampleRateHz: Type.Integer({ minimum: 1 }), + }, + { additionalProperties: false }, +); + +const BrowserRealtimeWebRtcSdpSessionSchema = Type.Object( { provider: NonEmptyString, + transport: Type.Optional(Type.Literal("webrtc-sdp")), clientSecret: NonEmptyString, + offerUrl: Type.Optional(Type.String()), model: Type.Optional(Type.String()), voice: Type.Optional(Type.String()), expiresAt: Type.Optional(Type.Number()), @@ -57,6 +109,55 @@ export const TalkRealtimeSessionResultSchema = Type.Object( { additionalProperties: false }, ); +const BrowserRealtimeJsonPcmWebSocketSessionSchema = Type.Object( + { + provider: NonEmptyString, + transport: Type.Literal("json-pcm-websocket"), + protocol: NonEmptyString, + 
clientSecret: NonEmptyString, + websocketUrl: NonEmptyString, + audio: BrowserRealtimeAudioContractSchema, + initialMessage: Type.Optional(Type.Unknown()), + model: Type.Optional(Type.String()), + voice: Type.Optional(Type.String()), + expiresAt: Type.Optional(Type.Number()), + }, + { additionalProperties: false }, +); + +const BrowserRealtimeGatewayRelaySessionSchema = Type.Object( + { + provider: NonEmptyString, + transport: Type.Literal("gateway-relay"), + relaySessionId: NonEmptyString, + audio: BrowserRealtimeAudioContractSchema, + model: Type.Optional(Type.String()), + voice: Type.Optional(Type.String()), + expiresAt: Type.Optional(Type.Number()), + }, + { additionalProperties: false }, +); + +const BrowserRealtimeManagedRoomSessionSchema = Type.Object( + { + provider: NonEmptyString, + transport: Type.Literal("managed-room"), + roomUrl: NonEmptyString, + token: Type.Optional(Type.String()), + model: Type.Optional(Type.String()), + voice: Type.Optional(Type.String()), + expiresAt: Type.Optional(Type.Number()), + }, + { additionalProperties: false }, +); + +export const TalkRealtimeSessionResultSchema = Type.Union([ + BrowserRealtimeWebRtcSdpSessionSchema, + BrowserRealtimeJsonPcmWebSocketSessionSchema, + BrowserRealtimeGatewayRelaySessionSchema, + BrowserRealtimeManagedRoomSessionSchema, +]); + const talkProviderFieldSchemas = { apiKey: Type.Optional(SecretInputSchema), }; diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index 4180789e3b2..c3df0479f82 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -54,6 +54,11 @@ import { ChannelsLogoutParamsSchema, TalkConfigParamsSchema, TalkConfigResultSchema, + TalkRealtimeRelayAudioParamsSchema, + TalkRealtimeRelayMarkParamsSchema, + TalkRealtimeRelayOkResultSchema, + TalkRealtimeRelayStopParamsSchema, + TalkRealtimeRelayToolResultParamsSchema, TalkRealtimeSessionParamsSchema, 
TalkRealtimeSessionResultSchema, TalkSpeakParamsSchema, @@ -286,6 +291,11 @@ export const ProtocolSchemas = { TalkConfigResult: TalkConfigResultSchema, TalkRealtimeSessionParams: TalkRealtimeSessionParamsSchema, TalkRealtimeSessionResult: TalkRealtimeSessionResultSchema, + TalkRealtimeRelayAudioParams: TalkRealtimeRelayAudioParamsSchema, + TalkRealtimeRelayMarkParams: TalkRealtimeRelayMarkParamsSchema, + TalkRealtimeRelayStopParams: TalkRealtimeRelayStopParamsSchema, + TalkRealtimeRelayToolResultParams: TalkRealtimeRelayToolResultParamsSchema, + TalkRealtimeRelayOkResult: TalkRealtimeRelayOkResultSchema, TalkSpeakParams: TalkSpeakParamsSchema, TalkSpeakResult: TalkSpeakResultSchema, ChannelsStatusParams: ChannelsStatusParamsSchema, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index 0fec4c3492b..1d312c1dd09 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -84,6 +84,11 @@ export type TalkConfigParams = SchemaType<"TalkConfigParams">; export type TalkConfigResult = SchemaType<"TalkConfigResult">; export type TalkRealtimeSessionParams = SchemaType<"TalkRealtimeSessionParams">; export type TalkRealtimeSessionResult = SchemaType<"TalkRealtimeSessionResult">; +export type TalkRealtimeRelayAudioParams = SchemaType<"TalkRealtimeRelayAudioParams">; +export type TalkRealtimeRelayMarkParams = SchemaType<"TalkRealtimeRelayMarkParams">; +export type TalkRealtimeRelayStopParams = SchemaType<"TalkRealtimeRelayStopParams">; +export type TalkRealtimeRelayToolResultParams = SchemaType<"TalkRealtimeRelayToolResultParams">; +export type TalkRealtimeRelayOkResult = SchemaType<"TalkRealtimeRelayOkResult">; export type TalkSpeakParams = SchemaType<"TalkSpeakParams">; export type TalkSpeakResult = SchemaType<"TalkSpeakResult">; export type ChannelsStatusParams = SchemaType<"ChannelsStatusParams">; diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index 
8979342537d..d912d648457 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -51,6 +51,10 @@ const BASE_METHODS = [ "wizard.status", "talk.config", "talk.realtime.session", + "talk.realtime.relayAudio", + "talk.realtime.relayMark", + "talk.realtime.relayStop", + "talk.realtime.relayToolResult", "talk.speak", "talk.mode", "commands.list", diff --git a/src/gateway/server-methods/talk.ts b/src/gateway/server-methods/talk.ts index 08039d957e3..87f745ebb40 100644 --- a/src/gateway/server-methods/talk.ts +++ b/src/gateway/server-methods/talk.ts @@ -29,9 +29,20 @@ import { type TalkSpeakParams, validateTalkConfigParams, validateTalkModeParams, + validateTalkRealtimeRelayAudioParams, + validateTalkRealtimeRelayMarkParams, + validateTalkRealtimeRelayStopParams, + validateTalkRealtimeRelayToolResultParams, validateTalkRealtimeSessionParams, validateTalkSpeakParams, } from "../protocol/index.js"; +import { + acknowledgeTalkRealtimeRelayMark, + createTalkRealtimeRelaySession, + sendTalkRealtimeRelayAudio, + stopTalkRealtimeRelaySession, + submitTalkRealtimeRelayToolResult, +} from "../talk-realtime-relay.js"; import { formatForLog } from "../ws-log.js"; import { asRecord } from "./record-shared.js"; import type { GatewayRequestHandlers } from "./types.js"; @@ -199,6 +210,22 @@ function buildRealtimeInstructions(): string { return `You are OpenClaw's realtime voice interface. Keep spoken replies concise. 
If the user asks for code, repository state, tools, files, current OpenClaw context, or deeper reasoning, call ${REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME} and then summarize the result naturally.`; } +function withRealtimeBrowserOverrides( + providerConfig: RealtimeVoiceProviderConfig, + params: { model?: string; voice?: string }, +): RealtimeVoiceProviderConfig { + const overrides: RealtimeVoiceProviderConfig = {}; + const model = normalizeOptionalString(params.model); + const voice = normalizeOptionalString(params.voice); + if (model) { + overrides.model = model; + } + if (voice) { + overrides.voice = voice; + } + return Object.keys(overrides).length > 0 ? { ...providerConfig, ...overrides } : providerConfig; +} + function isFallbackEligibleTalkReason(reason: TalkSpeakReason): boolean { return ( reason === "talk_unconfigured" || @@ -397,7 +424,7 @@ export const talkHandlers: GatewayRequestHandlers = { respond(true, { config: configPayload }, undefined); }, - "talk.realtime.session": async ({ params, respond, context }) => { + "talk.realtime.session": async ({ params, respond, context, client }) => { if (!validateTalkRealtimeSessionParams(params)) { respond( false, @@ -424,35 +451,146 @@ export const talkHandlers: GatewayRequestHandlers = { cfgForResolve: runtimeConfig, noRegisteredProviderMessage: "No realtime voice provider registered", }); - if (!resolution.provider.createBrowserSession) { + if (resolution.provider.createBrowserSession) { + const session = await resolution.provider.createBrowserSession({ + providerConfig: resolution.providerConfig, + instructions: buildRealtimeInstructions(), + tools: [REALTIME_VOICE_AGENT_CONSULT_TOOL], + model: normalizeOptionalString(typedParams.model), + voice: normalizeOptionalString(typedParams.voice), + }); + respond(true, session, undefined); + return; + } + + const connId = client?.connId; + if (!connId) { respond( false, undefined, - errorShape( - ErrorCodes.UNAVAILABLE, - `Realtime voice provider 
"${resolution.provider.id}" does not support browser WebRTC sessions`, - ), + errorShape(ErrorCodes.UNAVAILABLE, "Realtime relay requires a connected browser client"), ); return; } - const session = await resolution.provider.createBrowserSession({ - providerConfig: resolution.providerConfig, + const model = normalizeOptionalString(typedParams.model); + const voice = normalizeOptionalString(typedParams.voice); + const session = createTalkRealtimeRelaySession({ + context, + connId, + provider: resolution.provider, + providerConfig: withRealtimeBrowserOverrides(resolution.providerConfig, { model, voice }), instructions: buildRealtimeInstructions(), tools: [REALTIME_VOICE_AGENT_CONSULT_TOOL], - model: normalizeOptionalString(typedParams.model), - voice: normalizeOptionalString(typedParams.voice), + model, + voice, }); + respond(true, session, undefined); + } catch (err) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); + } + }, + "talk.realtime.relayAudio": async ({ params, respond, client }) => { + if (!validateTalkRealtimeRelayAudioParams(params)) { respond( - true, - { - provider: session.provider, - clientSecret: session.clientSecret, - ...(session.model ? { model: session.model } : {}), - ...(session.voice ? { voice: session.voice } : {}), - ...(typeof session.expiresAt === "number" ? 
{ expiresAt: session.expiresAt } : {}), - }, + false, undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid talk.realtime.relayAudio params: ${formatValidationErrors(validateTalkRealtimeRelayAudioParams.errors)}`, + ), ); + return; + } + const connId = client?.connId; + if (!connId) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "realtime relay unavailable")); + return; + } + try { + sendTalkRealtimeRelayAudio({ + relaySessionId: params.relaySessionId, + connId, + audioBase64: params.audioBase64, + timestamp: params.timestamp, + }); + respond(true, { ok: true }, undefined); + } catch (err) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); + } + }, + "talk.realtime.relayMark": async ({ params, respond, client }) => { + if (!validateTalkRealtimeRelayMarkParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid talk.realtime.relayMark params: ${formatValidationErrors(validateTalkRealtimeRelayMarkParams.errors)}`, + ), + ); + return; + } + const connId = client?.connId; + if (!connId) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "realtime relay unavailable")); + return; + } + try { + acknowledgeTalkRealtimeRelayMark({ relaySessionId: params.relaySessionId, connId }); + respond(true, { ok: true }, undefined); + } catch (err) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); + } + }, + "talk.realtime.relayStop": async ({ params, respond, client }) => { + if (!validateTalkRealtimeRelayStopParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid talk.realtime.relayStop params: ${formatValidationErrors(validateTalkRealtimeRelayStopParams.errors)}`, + ), + ); + return; + } + const connId = client?.connId; + if (!connId) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "realtime relay unavailable")); + return; + } + try { + 
stopTalkRealtimeRelaySession({ relaySessionId: params.relaySessionId, connId }); + respond(true, { ok: true }, undefined); + } catch (err) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); + } + }, + "talk.realtime.relayToolResult": async ({ params, respond, client }) => { + if (!validateTalkRealtimeRelayToolResultParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid talk.realtime.relayToolResult params: ${formatValidationErrors(validateTalkRealtimeRelayToolResultParams.errors)}`, + ), + ); + return; + } + const connId = client?.connId; + if (!connId) { + respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, "realtime relay unavailable")); + return; + } + try { + submitTalkRealtimeRelayToolResult({ + relaySessionId: params.relaySessionId, + connId, + callId: params.callId, + result: params.result, + }); + respond(true, { ok: true }, undefined); } catch (err) { respond(false, undefined, errorShape(ErrorCodes.UNAVAILABLE, formatForLog(err))); } diff --git a/src/gateway/talk-realtime-relay.test.ts b/src/gateway/talk-realtime-relay.test.ts new file mode 100644 index 00000000000..1a7125b2ae3 --- /dev/null +++ b/src/gateway/talk-realtime-relay.test.ts @@ -0,0 +1,181 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js"; +import type { RealtimeVoiceBridgeCreateRequest } from "../realtime-voice/provider-types.js"; +import { + acknowledgeTalkRealtimeRelayMark, + clearTalkRealtimeRelaySessionsForTest, + createTalkRealtimeRelaySession, + sendTalkRealtimeRelayAudio, + stopTalkRealtimeRelaySession, + submitTalkRealtimeRelayToolResult, +} from "./talk-realtime-relay.js"; + +describe("talk realtime gateway relay", () => { + afterEach(() => { + clearTalkRealtimeRelaySessionsForTest(); + }); + + it("bridges browser audio, transcripts, marks, and tool results through a backend provider", async () => { + let 
bridgeRequest: RealtimeVoiceBridgeCreateRequest | undefined; + const bridge = { + supportsToolResultContinuation: true, + connect: vi.fn(async () => { + bridgeRequest?.onReady?.(); + bridgeRequest?.onAudio(Buffer.from("audio-out")); + bridgeRequest?.onMark?.("mark-1"); + bridgeRequest?.onTranscript?.("user", "hello", true); + bridgeRequest?.onToolCall?.({ + itemId: "item-1", + callId: "call-1", + name: "openclaw_agent_consult", + args: { question: "what now" }, + }); + }), + sendAudio: vi.fn(), + setMediaTimestamp: vi.fn(), + sendUserMessage: vi.fn(), + triggerGreeting: vi.fn(), + submitToolResult: vi.fn(), + acknowledgeMark: vi.fn(), + close: vi.fn(), + isConnected: vi.fn(() => true), + }; + const provider: RealtimeVoiceProviderPlugin = { + id: "relay-test", + label: "Relay Test", + isConfigured: () => true, + createBridge: (req) => { + bridgeRequest = req; + return bridge; + }, + }; + const events: Array<{ event: string; payload: unknown; connIds: string[] }> = []; + const context = { + broadcastToConnIds: (event: string, payload: unknown, connIds: ReadonlySet) => { + events.push({ event, payload, connIds: [...connIds] }); + }, + } as never; + + const session = createTalkRealtimeRelaySession({ + context, + connId: "conn-1", + provider, + providerConfig: { model: "provider-model" }, + instructions: "be brief", + tools: [], + model: "browser-model", + voice: "voice-a", + }); + await Promise.resolve(); + + expect(session).toMatchObject({ + provider: "relay-test", + transport: "gateway-relay", + model: "browser-model", + voice: "voice-a", + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: 24000, + outputEncoding: "pcm16", + outputSampleRateHz: 24000, + }, + }); + expect(bridgeRequest).toMatchObject({ + providerConfig: { model: "provider-model" }, + audioFormat: { encoding: "pcm16", sampleRateHz: 24000, channels: 1 }, + instructions: "be brief", + }); + expect(events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + event: 
"talk.realtime.relay", + connIds: ["conn-1"], + payload: { relaySessionId: session.relaySessionId, type: "ready" }, + }), + expect.objectContaining({ + payload: { + relaySessionId: session.relaySessionId, + type: "audio", + audioBase64: Buffer.from("audio-out").toString("base64"), + }, + }), + expect.objectContaining({ + payload: { relaySessionId: session.relaySessionId, type: "mark", markName: "mark-1" }, + }), + expect.objectContaining({ + payload: { + relaySessionId: session.relaySessionId, + type: "transcript", + role: "user", + text: "hello", + final: true, + }, + }), + expect.objectContaining({ + payload: { + relaySessionId: session.relaySessionId, + type: "toolCall", + itemId: "item-1", + callId: "call-1", + name: "openclaw_agent_consult", + args: { question: "what now" }, + }, + }), + ]), + ); + + sendTalkRealtimeRelayAudio({ + relaySessionId: session.relaySessionId, + connId: "conn-1", + audioBase64: Buffer.from("audio-in").toString("base64"), + timestamp: 123, + }); + acknowledgeTalkRealtimeRelayMark({ relaySessionId: session.relaySessionId, connId: "conn-1" }); + submitTalkRealtimeRelayToolResult({ + relaySessionId: session.relaySessionId, + connId: "conn-1", + callId: "call-1", + result: { ok: true }, + }); + stopTalkRealtimeRelaySession({ relaySessionId: session.relaySessionId, connId: "conn-1" }); + + expect(bridge.sendAudio).toHaveBeenCalledWith(Buffer.from("audio-in")); + expect(bridge.setMediaTimestamp).toHaveBeenCalledWith(123); + expect(bridge.acknowledgeMark).toHaveBeenCalled(); + expect(bridge.submitToolResult).toHaveBeenCalledWith("call-1", { ok: true }, undefined); + expect(bridge.close).toHaveBeenCalled(); + }); + + it("rejects relay control from a different connection", () => { + const provider: RealtimeVoiceProviderPlugin = { + id: "relay-test", + label: "Relay Test", + isConfigured: () => true, + createBridge: () => ({ + connect: vi.fn(async () => undefined), + sendAudio: vi.fn(), + setMediaTimestamp: vi.fn(), + submitToolResult: vi.fn(), 
+ acknowledgeMark: vi.fn(), + close: vi.fn(), + isConnected: vi.fn(() => true), + }), + }; + const session = createTalkRealtimeRelaySession({ + context: { broadcastToConnIds: vi.fn() } as never, + connId: "conn-1", + provider, + providerConfig: {}, + instructions: "brief", + tools: [], + }); + + expect(() => + sendTalkRealtimeRelayAudio({ + relaySessionId: session.relaySessionId, + connId: "conn-2", + audioBase64: Buffer.from("audio").toString("base64"), + }), + ).toThrow("Unknown realtime relay session"); + }); +}); diff --git a/src/gateway/talk-realtime-relay.ts b/src/gateway/talk-realtime-relay.ts new file mode 100644 index 00000000000..9af95d1d25e --- /dev/null +++ b/src/gateway/talk-realtime-relay.ts @@ -0,0 +1,248 @@ +import { randomUUID } from "node:crypto"; +import type { RealtimeVoiceProviderPlugin } from "../plugins/types.js"; +import { + REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, + type RealtimeVoiceBrowserAudioContract, + type RealtimeVoiceProviderConfig, + type RealtimeVoiceTool, +} from "../realtime-voice/provider-types.js"; +import { + createRealtimeVoiceBridgeSession, + type RealtimeVoiceBridgeSession, +} from "../realtime-voice/session-runtime.js"; +import type { GatewayRequestContext } from "./server-methods/shared-types.js"; + +const RELAY_SESSION_TTL_MS = 30 * 60 * 1000; +const MAX_AUDIO_BASE64_BYTES = 512 * 1024; +const RELAY_EVENT = "talk.realtime.relay"; + +export type TalkRealtimeRelayEvent = + | { relaySessionId: string; type: "ready" } + | { relaySessionId: string; type: "audio"; audioBase64: string } + | { relaySessionId: string; type: "clear" } + | { relaySessionId: string; type: "mark"; markName: string } + | { + relaySessionId: string; + type: "transcript"; + role: "user" | "assistant"; + text: string; + final: boolean; + } + | { + relaySessionId: string; + type: "toolCall"; + itemId: string; + callId: string; + name: string; + args: unknown; + } + | { relaySessionId: string; type: "error"; message: string } + | { relaySessionId: string; 
type: "close"; reason: "completed" | "error" }; + +type RelaySession = { + id: string; + connId: string; + context: GatewayRequestContext; + bridge: RealtimeVoiceBridgeSession; + expiresAtMs: number; + cleanupTimer: ReturnType; +}; + +export type CreateTalkRealtimeRelaySessionParams = { + context: GatewayRequestContext; + connId: string; + provider: RealtimeVoiceProviderPlugin; + providerConfig: RealtimeVoiceProviderConfig; + instructions: string; + tools: RealtimeVoiceTool[]; + model?: string; + voice?: string; +}; + +export type TalkRealtimeRelaySessionResult = { + provider: string; + transport: "gateway-relay"; + relaySessionId: string; + audio: RealtimeVoiceBrowserAudioContract; + model?: string; + voice?: string; + expiresAt: number; +}; + +const relaySessions = new Map(); + +function formatError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +function broadcastToOwner( + context: GatewayRequestContext, + connId: string, + event: TalkRealtimeRelayEvent, +): void { + context.broadcastToConnIds(RELAY_EVENT, event, new Set([connId]), { dropIfSlow: true }); +} + +function closeRelaySession(session: RelaySession, reason: "completed" | "error"): void { + relaySessions.delete(session.id); + clearTimeout(session.cleanupTimer); + session.bridge.close(); + broadcastToOwner(session.context, session.connId, { + relaySessionId: session.id, + type: "close", + reason, + }); +} + +export function createTalkRealtimeRelaySession( + params: CreateTalkRealtimeRelaySessionParams, +): TalkRealtimeRelaySessionResult { + const relaySessionId = randomUUID(); + const expiresAtMs = Date.now() + RELAY_SESSION_TTL_MS; + let relay: RelaySession | undefined; + const emit = (event: TalkRealtimeRelayEvent) => + broadcastToOwner(params.context, params.connId, event); + const bridge = createRealtimeVoiceBridgeSession({ + provider: params.provider, + providerConfig: params.providerConfig, + audioFormat: REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ, + 
instructions: params.instructions, + tools: params.tools, + markStrategy: "transport", + audioSink: { + isOpen: () => Boolean(relay && relaySessions.has(relay.id)), + sendAudio: (audio) => + emit({ + relaySessionId, + type: "audio", + audioBase64: audio.toString("base64"), + }), + clearAudio: () => emit({ relaySessionId, type: "clear" }), + sendMark: (markName) => emit({ relaySessionId, type: "mark", markName }), + }, + onTranscript: (role, text, final) => { + emit({ relaySessionId, type: "transcript", role, text, final }); + }, + onToolCall: (toolCall) => { + emit({ + relaySessionId, + type: "toolCall", + itemId: toolCall.itemId, + callId: toolCall.callId, + name: toolCall.name, + args: toolCall.args, + }); + }, + onReady: () => emit({ relaySessionId, type: "ready" }), + onError: (error) => emit({ relaySessionId, type: "error", message: error.message }), + onClose: (reason) => { + const active = relaySessions.get(relaySessionId); + if (!active) { + return; + } + relaySessions.delete(relaySessionId); + clearTimeout(active.cleanupTimer); + emit({ relaySessionId, type: "close", reason }); + }, + }); + relay = { + id: relaySessionId, + connId: params.connId, + context: params.context, + bridge, + expiresAtMs, + cleanupTimer: setTimeout(() => { + const active = relaySessions.get(relaySessionId); + if (active) { + closeRelaySession(active, "completed"); + } + }, RELAY_SESSION_TTL_MS), + }; + relay.cleanupTimer.unref?.(); + relaySessions.set(relaySessionId, relay); + bridge.connect().catch((error: unknown) => { + emit({ relaySessionId, type: "error", message: formatError(error) }); + const active = relaySessions.get(relaySessionId); + if (active) { + closeRelaySession(active, "error"); + } + }); + + return { + provider: params.provider.id, + transport: "gateway-relay", + relaySessionId, + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: REALTIME_VOICE_AUDIO_FORMAT_PCM16_24KHZ.sampleRateHz, + outputEncoding: "pcm16", + outputSampleRateHz: 
/**
 * Resolves a live relay session that belongs to the calling connection.
 *
 * A session that has passed `expiresAtMs` is closed on access. A session owned by a different
 * connection is reported as unknown but is left running, so a connection cannot terminate
 * someone else's session by presenting its id.
 *
 * @param relaySessionId - Id handed out when the relay session was created.
 * @param connId - Gateway connection id of the caller.
 * @returns The matching, unexpired session.
 * @throws Error with message "Unknown realtime relay session" when the id is not registered,
 *   is owned by another connection, or has expired.
 */
function getRelaySession(relaySessionId: string, connId: string): RelaySession {
  const session = relaySessions.get(relaySessionId);
  if (!session || session.connId !== connId) {
    // Same message as "not found" so the response does not reveal that the id exists.
    throw new Error("Unknown realtime relay session");
  }
  if (Date.now() > session.expiresAtMs) {
    // Only the owner reaching an expired session triggers the teardown.
    closeRelaySession(session, "completed");
    throw new Error("Unknown realtime relay session");
  }
  return session;
}
relaySessions.values()) { + clearTimeout(session.cleanupTimer); + session.bridge.close(); + } + relaySessions.clear(); +} diff --git a/src/realtime-voice/provider-types.ts b/src/realtime-voice/provider-types.ts index 46caa9ee58a..21166388164 100644 --- a/src/realtime-voice/provider-types.ts +++ b/src/realtime-voice/provider-types.ts @@ -90,14 +90,62 @@ export type RealtimeVoiceBrowserSessionCreateRequest = { voice?: string; }; -export type RealtimeVoiceBrowserSession = { +export type RealtimeVoiceBrowserAudioContract = { + inputEncoding: "pcm16" | "g711_ulaw"; + inputSampleRateHz: number; + outputEncoding: "pcm16" | "g711_ulaw"; + outputSampleRateHz: number; +}; + +export type RealtimeVoiceBrowserWebRtcSdpSession = { provider: RealtimeVoiceProviderId; + transport?: "webrtc-sdp"; clientSecret: string; + offerUrl?: string; model?: string; voice?: string; expiresAt?: number; }; +export type RealtimeVoiceBrowserJsonPcmWebSocketSession = { + provider: RealtimeVoiceProviderId; + transport: "json-pcm-websocket"; + protocol: string; + clientSecret: string; + websocketUrl: string; + audio: RealtimeVoiceBrowserAudioContract; + initialMessage?: unknown; + model?: string; + voice?: string; + expiresAt?: number; +}; + +export type RealtimeVoiceBrowserGatewayRelaySession = { + provider: RealtimeVoiceProviderId; + transport: "gateway-relay"; + relaySessionId: string; + audio: RealtimeVoiceBrowserAudioContract; + model?: string; + voice?: string; + expiresAt?: number; +}; + +export type RealtimeVoiceBrowserManagedRoomSession = { + provider: RealtimeVoiceProviderId; + transport: "managed-room"; + roomUrl: string; + token?: string; + model?: string; + voice?: string; + expiresAt?: number; +}; + +export type RealtimeVoiceBrowserSession = + | RealtimeVoiceBrowserWebRtcSdpSession + | RealtimeVoiceBrowserJsonPcmWebSocketSession + | RealtimeVoiceBrowserGatewayRelaySession + | RealtimeVoiceBrowserManagedRoomSession; + export type RealtimeVoiceBridge = { supportsToolResultContinuation?: 
/**
 * Encodes raw bytes as a base64 string.
 *
 * The input is converted to a binary string in 0x8000-byte slices so the argument list passed
 * to `String.fromCharCode` stays bounded for large buffers.
 */
export function bytesToBase64(bytes: Uint8Array): string {
  const sliceLength = 0x8000;
  const pieces: string[] = [];
  let start = 0;
  while (start < bytes.length) {
    const end = start + sliceLength;
    pieces.push(String.fromCharCode(...bytes.subarray(start, end)));
    start = end;
  }
  return btoa(pieces.join(""));
}

/** Decodes a base64 string into the bytes it represents. */
export function base64ToBytes(value: string): Uint8Array {
  const decoded = atob(value);
  const out = new Uint8Array(decoded.length);
  let index = 0;
  while (index < decoded.length) {
    out[index] = decoded.charCodeAt(index);
    index += 1;
  }
  return out;
}

/**
 * Converts float samples to little-endian signed 16-bit PCM bytes.
 *
 * Each sample is clamped to [-1, 1]; negative values are scaled by 0x8000 and non-negative
 * values by 0x7fff before being written.
 */
export function floatToPcm16(samples: Float32Array): Uint8Array {
  const pcm = new Uint8Array(samples.length * 2);
  const writer = new DataView(pcm.buffer);
  samples.forEach((raw, index) => {
    const limited = Math.max(-1, Math.min(1, raw));
    const scale = limited < 0 ? 0x8000 : 0x7fff;
    writer.setInt16(index * 2, limited * scale, true);
  });
  return pcm;
}

/**
 * Converts little-endian signed 16-bit PCM bytes to float samples (value / 0x8000).
 *
 * A trailing odd byte is ignored.
 */
export function pcm16ToFloat(bytes: Uint8Array): Float32Array {
  const reader = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength);
  const sampleCount = Math.floor(bytes.byteLength / 2);
  return new Float32Array(sampleCount).map(
    (_unused, index) => reader.getInt16(index * 2, true) / 0x8000,
  );
}
(() => void) | null = null; + private playhead = 0; + private closed = false; + private readonly sources = new Set(); + + constructor( + private readonly session: RealtimeTalkGatewayRelaySessionResult, + private readonly ctx: RealtimeTalkTransportContext, + ) {} + + async start(): Promise { + if (!navigator.mediaDevices?.getUserMedia) { + throw new Error("Realtime Talk requires browser microphone access"); + } + if ( + this.session.audio.inputEncoding !== "pcm16" || + this.session.audio.outputEncoding !== "pcm16" + ) { + throw new Error("Gateway-relay realtime Talk currently requires PCM16 audio"); + } + this.closed = false; + this.unsubscribe = this.ctx.client.addEventListener((evt) => { + if (evt.event !== "talk.realtime.relay") { + return; + } + this.handleRelayEvent(evt.payload as GatewayRelayEvent); + }); + this.media = await navigator.mediaDevices.getUserMedia({ audio: true }); + this.inputContext = new AudioContext({ sampleRate: this.session.audio.inputSampleRateHz }); + this.outputContext = new AudioContext({ sampleRate: this.session.audio.outputSampleRateHz }); + this.startMicrophonePump(); + } + + stop(): void { + this.closed = true; + this.unsubscribe?.(); + this.unsubscribe = null; + this.inputProcessor?.disconnect(); + this.inputProcessor = null; + this.inputSource?.disconnect(); + this.inputSource = null; + this.media?.getTracks().forEach((track) => track.stop()); + this.media = null; + this.stopOutput(); + void this.inputContext?.close(); + this.inputContext = null; + void this.outputContext?.close(); + this.outputContext = null; + void this.ctx.client.request("talk.realtime.relayStop", { + relaySessionId: this.session.relaySessionId, + }); + } + + private startMicrophonePump(): void { + if (!this.media || !this.inputContext) { + return; + } + this.inputSource = this.inputContext.createMediaStreamSource(this.media); + this.inputProcessor = this.inputContext.createScriptProcessor(4096, 1, 1); + this.inputProcessor.onaudioprocess = (event) => { + if 
(this.closed) { + return; + } + const pcm = floatToPcm16(event.inputBuffer.getChannelData(0)); + void this.ctx.client.request("talk.realtime.relayAudio", { + relaySessionId: this.session.relaySessionId, + audioBase64: bytesToBase64(pcm), + timestamp: Math.round((this.inputContext?.currentTime ?? 0) * 1000), + }); + }; + this.inputSource.connect(this.inputProcessor); + this.inputProcessor.connect(this.inputContext.destination); + } + + private handleRelayEvent(event: GatewayRelayEvent): void { + if (event.relaySessionId !== this.session.relaySessionId || this.closed) { + return; + } + switch (event.type) { + case "ready": + this.ctx.callbacks.onStatus?.("listening"); + return; + case "audio": + if (event.audioBase64) { + this.playPcm16(event.audioBase64); + } + return; + case "clear": + this.stopOutput(); + return; + case "mark": + this.scheduleMarkAck(); + return; + case "transcript": + if (event.role && event.text) { + this.ctx.callbacks.onTranscript?.({ + role: event.role, + text: event.text, + final: event.final ?? false, + }); + } + return; + case "toolCall": + void this.handleToolCall(event); + return; + case "error": + this.ctx.callbacks.onStatus?.("error", event.message ?? "Realtime relay failed"); + return; + case "close": + if (!this.closed) { + this.ctx.callbacks.onStatus?.( + event.reason === "error" ? "error" : "idle", + event.reason === "error" ? 
"Realtime relay closed" : undefined, + ); + } + return; + default: + return; + } + } + + private playPcm16(base64: string): void { + if (!this.outputContext) { + return; + } + const samples = pcm16ToFloat(base64ToBytes(base64)); + if (samples.length === 0) { + return; + } + const buffer = this.outputContext.createBuffer( + 1, + samples.length, + this.session.audio.outputSampleRateHz, + ); + buffer.getChannelData(0).set(samples); + const source = this.outputContext.createBufferSource(); + this.sources.add(source); + source.addEventListener("ended", () => this.sources.delete(source)); + source.buffer = buffer; + source.connect(this.outputContext.destination); + const startAt = Math.max(this.outputContext.currentTime, this.playhead); + source.start(startAt); + this.playhead = startAt + buffer.duration; + } + + private stopOutput(): void { + for (const source of this.sources) { + try { + source.stop(); + } catch {} + } + this.sources.clear(); + this.playhead = this.outputContext?.currentTime ?? 0; + } + + private scheduleMarkAck(): void { + const delayMs = Math.max( + 0, + Math.ceil( + ((this.playhead || this.outputContext?.currentTime || 0) - + (this.outputContext?.currentTime ?? 0)) * + 1000, + ), + ); + window.setTimeout(() => { + if (this.closed) { + return; + } + void this.ctx.client.request("talk.realtime.relayMark", { + relaySessionId: this.session.relaySessionId, + }); + }, delayMs); + } + + private async handleToolCall(event: Extract) { + const callId = event.callId?.trim(); + const name = event.name?.trim(); + if (!callId || !name) { + return; + } + if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) { + this.submitToolResult(callId, { error: `Tool "${name}" not available in browser Talk` }); + return; + } + await submitRealtimeTalkConsult({ + ctx: this.ctx, + callId, + args: event.args ?? 
/**
 * Builds the WebSocket URL for a JSON/PCM session by setting the session's client secret as
 * the `access_token` query parameter on `websocketUrl`.
 */
function buildGoogleLiveUrl(session: RealtimeTalkJsonPcmWebSocketSessionResult): string {
  const endpoint = new URL(session.websocketUrl);
  endpoint.searchParams.set("access_token", session.clientSecret);
  return endpoint.href;
}
private inputSource: MediaStreamAudioSourceNode | null = null; + private inputProcessor: ScriptProcessorNode | null = null; + private playhead = 0; + private closed = false; + private pendingCalls = new Map(); + + constructor( + private readonly session: RealtimeTalkJsonPcmWebSocketSessionResult, + private readonly ctx: RealtimeTalkTransportContext, + ) {} + + async start(): Promise { + if (!navigator.mediaDevices?.getUserMedia || typeof WebSocket === "undefined") { + throw new Error("Realtime Talk requires browser WebSocket and microphone access"); + } + if (this.session.protocol !== "google-live-bidi") { + throw new Error(`Unsupported realtime WebSocket protocol: ${this.session.protocol}`); + } + this.closed = false; + this.media = await navigator.mediaDevices.getUserMedia({ audio: true }); + this.inputContext = new AudioContext({ sampleRate: this.session.audio.inputSampleRateHz }); + this.outputContext = new AudioContext({ sampleRate: this.session.audio.outputSampleRateHz }); + this.ws = new WebSocket(buildGoogleLiveUrl(this.session)); + this.ws.addEventListener("open", () => { + this.send(this.session.initialMessage ?? 
{ setup: {} }); + this.startMicrophonePump(); + }); + this.ws.addEventListener("message", (event) => this.handleMessage(event.data)); + this.ws.addEventListener("close", () => { + if (!this.closed) { + this.ctx.callbacks.onStatus?.("error", "Realtime connection closed"); + } + }); + this.ws.addEventListener("error", () => { + if (!this.closed) { + this.ctx.callbacks.onStatus?.("error", "Realtime connection failed"); + } + }); + } + + stop(): void { + this.closed = true; + this.pendingCalls.clear(); + this.inputProcessor?.disconnect(); + this.inputProcessor = null; + this.inputSource?.disconnect(); + this.inputSource = null; + this.media?.getTracks().forEach((track) => track.stop()); + this.media = null; + void this.inputContext?.close(); + this.inputContext = null; + void this.outputContext?.close(); + this.outputContext = null; + this.ws?.close(); + this.ws = null; + } + + private startMicrophonePump(): void { + if (!this.media || !this.inputContext) { + return; + } + this.inputSource = this.inputContext.createMediaStreamSource(this.media); + this.inputProcessor = this.inputContext.createScriptProcessor(4096, 1, 1); + this.inputProcessor.onaudioprocess = (event) => { + if (this.ws?.readyState !== WebSocket.OPEN) { + return; + } + const pcm = floatToPcm16(event.inputBuffer.getChannelData(0)); + this.send({ + realtimeInput: { + audio: { + data: bytesToBase64(pcm), + mimeType: `audio/pcm;rate=${this.inputContext?.sampleRate ?? 
16000}`, + }, + }, + }); + }; + this.inputSource.connect(this.inputProcessor); + this.inputProcessor.connect(this.inputContext.destination); + } + + private send(message: unknown): void { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(message)); + } + } + + private handleMessage(data: unknown): void { + let message: GoogleLiveMessage; + try { + message = JSON.parse(String(data)) as GoogleLiveMessage; + } catch { + return; + } + if (message.setupComplete) { + this.ctx.callbacks.onStatus?.("listening"); + } + const content = message.serverContent; + if (content?.interrupted) { + this.playhead = this.outputContext?.currentTime ?? 0; + } + if (content?.inputTranscription?.text) { + this.ctx.callbacks.onTranscript?.({ + role: "user", + text: content.inputTranscription.text, + final: content.inputTranscription.finished ?? false, + }); + } + if (content?.outputTranscription?.text) { + this.ctx.callbacks.onTranscript?.({ + role: "assistant", + text: content.outputTranscription.text, + final: content.outputTranscription.finished ?? false, + }); + } + for (const part of content?.modelTurn?.parts ?? []) { + if (part.inlineData?.data) { + this.playPcm16(part.inlineData.data); + } else if (!part.thought && typeof part.text === "string" && part.text.trim()) { + this.ctx.callbacks.onTranscript?.({ + role: "assistant", + text: part.text, + final: content?.turnComplete ?? false, + }); + } + } + for (const call of message.toolCall?.functionCalls ?? 
[]) { + void this.handleToolCall(call); + } + } + + private playPcm16(base64: string): void { + if (!this.outputContext) { + return; + } + const samples = pcm16ToFloat(base64ToBytes(base64)); + if (samples.length === 0) { + return; + } + const buffer = this.outputContext.createBuffer( + 1, + samples.length, + this.session.audio.outputSampleRateHz, + ); + buffer.getChannelData(0).set(samples); + const source = this.outputContext.createBufferSource(); + source.buffer = buffer; + source.connect(this.outputContext.destination); + const startAt = Math.max(this.outputContext.currentTime, this.playhead); + source.start(startAt); + this.playhead = startAt + buffer.duration; + } + + private async handleToolCall(call: { + id?: string; + name?: string; + args?: unknown; + }): Promise { + const name = call.name?.trim(); + const callId = call.id?.trim(); + if (!name || !callId) { + return; + } + this.pendingCalls.set(callId, { name, args: call.args ?? {} }); + if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) { + return; + } + await submitRealtimeTalkConsult({ + ctx: this.ctx, + callId, + args: call.args ?? {}, + submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), + }); + } + + private submitToolResult(callId: string, result: unknown): void { + const pending = this.pendingCalls.get(callId); + if (!pending) { + return; + } + this.pendingCalls.delete(callId); + this.send({ + toolResponse: { + functionResponses: [ + { + id: callId, + name: pending.name, + scheduling: "WHEN_IDLE", + response: + result && typeof result === "object" && !Array.isArray(result) + ? 
result + : { output: result }, + }, + ], + }, + }); + } +} diff --git a/ui/src/ui/chat/realtime-talk-shared.ts b/ui/src/ui/chat/realtime-talk-shared.ts new file mode 100644 index 00000000000..d36e1f37d36 --- /dev/null +++ b/ui/src/ui/chat/realtime-talk-shared.ts @@ -0,0 +1,184 @@ +import { + buildRealtimeVoiceAgentConsultChatMessage, + REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, +} from "../../../../src/realtime-voice/agent-consult-tool.js"; +import type { GatewayBrowserClient, GatewayEventFrame } from "../gateway.ts"; +import { generateUUID } from "../uuid.ts"; + +export type RealtimeTalkStatus = "idle" | "connecting" | "listening" | "thinking" | "error"; + +export type RealtimeTalkCallbacks = { + onStatus?: (status: RealtimeTalkStatus, detail?: string) => void; + onTranscript?: (entry: { role: "user" | "assistant"; text: string; final: boolean }) => void; +}; + +export type RealtimeTalkAudioContract = { + inputEncoding: "pcm16" | "g711_ulaw"; + inputSampleRateHz: number; + outputEncoding: "pcm16" | "g711_ulaw"; + outputSampleRateHz: number; +}; + +export type RealtimeTalkWebRtcSdpSessionResult = { + provider: string; + transport?: "webrtc-sdp"; + clientSecret: string; + offerUrl?: string; + model?: string; + voice?: string; + expiresAt?: number; +}; + +export type RealtimeTalkJsonPcmWebSocketSessionResult = { + provider: string; + transport: "json-pcm-websocket"; + protocol: string; + clientSecret: string; + websocketUrl: string; + audio: RealtimeTalkAudioContract; + initialMessage?: unknown; + model?: string; + voice?: string; + expiresAt?: number; +}; + +export type RealtimeTalkGatewayRelaySessionResult = { + provider: string; + transport: "gateway-relay"; + relaySessionId: string; + audio: RealtimeTalkAudioContract; + model?: string; + voice?: string; + expiresAt?: number; +}; + +export type RealtimeTalkManagedRoomSessionResult = { + provider: string; + transport: "managed-room"; + roomUrl: string; + token?: string; + model?: string; + voice?: string; + 
/**
 * Pulls display text out of a chat message payload.
 *
 * A string `text` property is returned as-is. Otherwise the `text` of every
 * `{ type: "text" }` entry in a `content` array is joined with blank lines and trimmed.
 * Anything else yields an empty string.
 */
function extractTextFromMessage(message: unknown): string {
  if (typeof message !== "object" || message === null) {
    return "";
  }
  const record = message as Record<string, unknown>;
  if (typeof record.text === "string") {
    return record.text;
  }
  const content = record.content;
  if (!Array.isArray(content)) {
    return "";
  }
  const texts: string[] = [];
  for (const item of content) {
    if (typeof item !== "object" || item === null) {
      continue;
    }
    const entry = item as Record<string, unknown>;
    // Empty strings are skipped so they do not add blank separators.
    if (entry.type === "text" && typeof entry.text === "string" && entry.text !== "") {
      texts.push(entry.text);
    }
  }
  return texts.join("\n\n").trim();
}
idempotencyKey, + timeoutMs: 120_000, + }); + submit(callId, { result }); + } catch (error) { + submit(callId, { + error: error instanceof Error ? error.message : String(error), + }); + } finally { + ctx.callbacks.onStatus?.("listening"); + } +} + +export { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME }; diff --git a/ui/src/ui/chat/realtime-talk-webrtc.ts b/ui/src/ui/chat/realtime-talk-webrtc.ts new file mode 100644 index 00000000000..74681eff705 --- /dev/null +++ b/ui/src/ui/chat/realtime-talk-webrtc.ts @@ -0,0 +1,182 @@ +import type { RealtimeTalkWebRtcSdpSessionResult } from "./realtime-talk-shared.ts"; +import { + REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + submitRealtimeTalkConsult, + type RealtimeTalkTransport, + type RealtimeTalkTransportContext, +} from "./realtime-talk-shared.ts"; + +type RealtimeServerEvent = { + type?: string; + item_id?: string; + call_id?: string; + name?: string; + delta?: string; + transcript?: string; + arguments?: string; +}; + +type ToolBuffer = { + name: string; + callId: string; + args: string; +}; + +export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { + private peer: RTCPeerConnection | null = null; + private channel: RTCDataChannel | null = null; + private media: MediaStream | null = null; + private audio: HTMLAudioElement | null = null; + private closed = false; + private toolBuffers = new Map(); + + constructor( + private readonly session: RealtimeTalkWebRtcSdpSessionResult, + private readonly ctx: RealtimeTalkTransportContext, + ) {} + + async start(): Promise { + if (!navigator.mediaDevices?.getUserMedia || typeof RTCPeerConnection === "undefined") { + throw new Error("Realtime Talk requires browser WebRTC and microphone access"); + } + this.closed = false; + this.peer = new RTCPeerConnection(); + this.audio = document.createElement("audio"); + this.audio.autoplay = true; + this.audio.style.display = "none"; + document.body.append(this.audio); + this.peer.addEventListener("track", (event) => { + if 
(this.audio) { + this.audio.srcObject = event.streams[0]; + } + }); + this.media = await navigator.mediaDevices.getUserMedia({ audio: true }); + for (const track of this.media.getAudioTracks()) { + this.peer.addTrack(track, this.media); + } + this.channel = this.peer.createDataChannel("oai-events"); + this.channel.addEventListener("open", () => this.ctx.callbacks.onStatus?.("listening")); + this.channel.addEventListener("message", (event) => this.handleRealtimeEvent(event.data)); + this.peer.addEventListener("connectionstatechange", () => { + if (this.closed) { + return; + } + if (this.peer?.connectionState === "failed" || this.peer?.connectionState === "closed") { + this.ctx.callbacks.onStatus?.("error", "Realtime connection closed"); + } + }); + + const offer = await this.peer.createOffer(); + await this.peer.setLocalDescription(offer); + const sdp = await fetch(this.session.offerUrl ?? "https://api.openai.com/v1/realtime/calls", { + method: "POST", + body: offer.sdp, + headers: { + Authorization: `Bearer ${this.session.clientSecret}`, + "Content-Type": "application/sdp", + }, + }); + if (!sdp.ok) { + throw new Error(`Realtime WebRTC setup failed (${sdp.status})`); + } + await this.peer.setRemoteDescription({ + type: "answer", + sdp: await sdp.text(), + }); + } + + stop(): void { + this.closed = true; + this.channel?.close(); + this.channel = null; + this.peer?.close(); + this.peer = null; + this.media?.getTracks().forEach((track) => track.stop()); + this.media = null; + this.audio?.remove(); + this.audio = null; + this.toolBuffers.clear(); + } + + private send(event: unknown): void { + if (this.channel?.readyState === "open") { + this.channel.send(JSON.stringify(event)); + } + } + + private handleRealtimeEvent(data: unknown): void { + let event: RealtimeServerEvent; + try { + event = JSON.parse(String(data)) as RealtimeServerEvent; + } catch { + return; + } + switch (event.type) { + case "conversation.item.input_audio_transcription.completed": + if 
(event.transcript) { + this.ctx.callbacks.onTranscript?.({ role: "user", text: event.transcript, final: true }); + } + return; + case "response.audio_transcript.done": + if (event.transcript) { + this.ctx.callbacks.onTranscript?.({ + role: "assistant", + text: event.transcript, + final: true, + }); + } + return; + case "response.function_call_arguments.delta": + this.bufferToolDelta(event); + return; + case "response.function_call_arguments.done": + void this.handleToolCall(event); + return; + default: + return; + } + } + + private bufferToolDelta(event: RealtimeServerEvent): void { + const key = event.item_id ?? "unknown"; + const existing = this.toolBuffers.get(key); + if (existing) { + existing.args += event.delta ?? ""; + return; + } + this.toolBuffers.set(key, { + name: event.name ?? "", + callId: event.call_id ?? "", + args: event.delta ?? "", + }); + } + + private async handleToolCall(event: RealtimeServerEvent): Promise { + const key = event.item_id ?? "unknown"; + const buffered = this.toolBuffers.get(key); + this.toolBuffers.delete(key); + const name = buffered?.name || event.name || ""; + const callId = buffered?.callId || event.call_id || ""; + if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME || !callId) { + return; + } + await submitRealtimeTalkConsult({ + ctx: this.ctx, + callId, + args: buffered?.args || event.arguments || "{}", + submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), + }); + } + + private submitToolResult(callId: string, result: unknown): void { + this.send({ + type: "conversation.item.create", + item: { + type: "function_call_output", + call_id: callId, + output: JSON.stringify(result), + }, + }); + this.send({ type: "response.create" }); + } +} diff --git a/ui/src/ui/chat/realtime-talk.ts b/ui/src/ui/chat/realtime-talk.ts index 61fcef9af66..67b1abaffe4 100644 --- a/ui/src/ui/chat/realtime-talk.ts +++ b/ui/src/ui/chat/realtime-talk.ts @@ -1,107 +1,50 @@ +import type { GatewayBrowserClient } from 
"../gateway.ts"; +import { GatewayRelayRealtimeTalkTransport } from "./realtime-talk-gateway-relay.ts"; +import { GoogleLiveRealtimeTalkTransport } from "./realtime-talk-google-live.ts"; import { - buildRealtimeVoiceAgentConsultChatMessage, - REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, -} from "../../../../src/realtime-voice/agent-consult-tool.js"; -import type { GatewayBrowserClient, GatewayEventFrame } from "../gateway.ts"; -import { generateUUID } from "../uuid.ts"; + type RealtimeTalkCallbacks, + type RealtimeTalkGatewayRelaySessionResult, + type RealtimeTalkJsonPcmWebSocketSessionResult, + type RealtimeTalkSessionResult, + type RealtimeTalkStatus, + type RealtimeTalkTransport, + type RealtimeTalkTransportContext, + type RealtimeTalkWebRtcSdpSessionResult, +} from "./realtime-talk-shared.ts"; +import { WebRtcSdpRealtimeTalkTransport } from "./realtime-talk-webrtc.ts"; -export type RealtimeTalkStatus = "idle" | "connecting" | "listening" | "thinking" | "error"; +export type { RealtimeTalkCallbacks, RealtimeTalkSessionResult, RealtimeTalkStatus }; -export type RealtimeTalkCallbacks = { - onStatus?: (status: RealtimeTalkStatus, detail?: string) => void; - onTranscript?: (entry: { role: "user" | "assistant"; text: string; final: boolean }) => void; -}; - -export type RealtimeTalkSessionResult = { - provider: string; - clientSecret: string; - model?: string; - voice?: string; - expiresAt?: number; -}; - -type RealtimeServerEvent = { - type?: string; - item_id?: string; - call_id?: string; - name?: string; - delta?: string; - transcript?: string; - arguments?: string; -}; - -type ToolBuffer = { - name: string; - callId: string; - args: string; -}; - -type ChatPayload = { - runId?: string; - state?: string; - errorMessage?: string; - message?: unknown; -}; - -function extractTextFromMessage(message: unknown): string { - if (!message || typeof message !== "object") { - return ""; +function createTransport( + session: RealtimeTalkSessionResult, + ctx: 
RealtimeTalkTransportContext, +): RealtimeTalkTransport { + const transport = session.transport ?? "webrtc-sdp"; + if (transport === "webrtc-sdp") { + return new WebRtcSdpRealtimeTalkTransport(session as RealtimeTalkWebRtcSdpSessionResult, ctx); } - const record = message as Record; - if (typeof record.text === "string") { - return record.text; + if (transport === "json-pcm-websocket") { + return new GoogleLiveRealtimeTalkTransport( + session as RealtimeTalkJsonPcmWebSocketSessionResult, + ctx, + ); } - const content = Array.isArray(record.content) ? record.content : []; - const parts = content - .map((block) => { - if (!block || typeof block !== "object") { - return ""; - } - const entry = block as Record; - return entry.type === "text" && typeof entry.text === "string" ? entry.text : ""; - }) - .filter(Boolean); - return parts.join("\n\n").trim(); -} - -function waitForChatResult(params: { - client: GatewayBrowserClient; - runId: string; - timeoutMs: number; -}): Promise { - return new Promise((resolve, reject) => { - const timer = window.setTimeout(() => { - unsubscribe(); - reject(new Error("OpenClaw tool call timed out")); - }, params.timeoutMs); - const unsubscribe = params.client.addEventListener((evt: GatewayEventFrame) => { - if (evt.event !== "chat") { - return; - } - const payload = evt.payload as ChatPayload | undefined; - if (!payload || payload.runId !== params.runId) { - return; - } - if (payload.state === "final") { - window.clearTimeout(timer); - unsubscribe(); - resolve(extractTextFromMessage(payload.message) || "OpenClaw finished with no text."); - } else if (payload.state === "error") { - window.clearTimeout(timer); - unsubscribe(); - reject(new Error(payload.errorMessage ?? 
"OpenClaw tool call failed")); - } - }); - }); + if (transport === "gateway-relay") { + return new GatewayRelayRealtimeTalkTransport( + session as RealtimeTalkGatewayRelaySessionResult, + ctx, + ); + } + if (transport === "managed-room") { + throw new Error("Managed-room realtime Talk sessions are not available in this UI yet"); + } + const unknownTransport = (session as { transport?: string }).transport ?? "unknown"; + throw new Error(`Unsupported realtime Talk transport: ${unknownTransport}`); } export class RealtimeTalkSession { - private peer: RTCPeerConnection | null = null; - private channel: RTCDataChannel | null = null; - private media: MediaStream | null = null; - private audio: HTMLAudioElement | null = null; + private transport: RealtimeTalkTransport | null = null; private closed = false; - private toolBuffers = new Map(); constructor( private readonly client: GatewayBrowserClient, @@ -110,180 +53,26 @@ export class RealtimeTalkSession { ) {} async start(): Promise { - if (!navigator.mediaDevices?.getUserMedia || typeof RTCPeerConnection === "undefined") { - throw new Error("Realtime Talk requires browser WebRTC and microphone access"); - } this.closed = false; this.callbacks.onStatus?.("connecting"); const session = await this.client.request("talk.realtime.session", { sessionKey: this.sessionKey, }); - this.peer = new RTCPeerConnection(); - this.audio = document.createElement("audio"); - this.audio.autoplay = true; - this.audio.style.display = "none"; - document.body.append(this.audio); - this.peer.addEventListener("track", (event) => { - if (this.audio) { - this.audio.srcObject = event.streams[0]; - } - }); - this.media = await navigator.mediaDevices.getUserMedia({ audio: true }); - for (const track of this.media.getAudioTracks()) { - this.peer.addTrack(track, this.media); + if (this.closed) { + return; } - this.channel = this.peer.createDataChannel("oai-events"); - this.channel.addEventListener("open", () => this.callbacks.onStatus?.("listening")); - 
this.channel.addEventListener("message", (event) => this.handleRealtimeEvent(event.data)); - this.peer.addEventListener("connectionstatechange", () => { - if (this.closed) { - return; - } - if (this.peer?.connectionState === "failed" || this.peer?.connectionState === "closed") { - this.callbacks.onStatus?.("error", "Realtime connection closed"); - } - }); - - const offer = await this.peer.createOffer(); - await this.peer.setLocalDescription(offer); - const sdp = await fetch("https://api.openai.com/v1/realtime/calls", { - method: "POST", - body: offer.sdp, - headers: { - Authorization: `Bearer ${session.clientSecret}`, - "Content-Type": "application/sdp", - }, - }); - if (!sdp.ok) { - throw new Error(`Realtime WebRTC setup failed (${sdp.status})`); - } - await this.peer.setRemoteDescription({ - type: "answer", - sdp: await sdp.text(), + this.transport = createTransport(session, { + client: this.client, + sessionKey: this.sessionKey, + callbacks: this.callbacks, }); + await this.transport.start(); } stop(): void { this.closed = true; this.callbacks.onStatus?.("idle"); - this.channel?.close(); - this.channel = null; - this.peer?.close(); - this.peer = null; - this.media?.getTracks().forEach((track) => track.stop()); - this.media = null; - this.audio?.remove(); - this.audio = null; - this.toolBuffers.clear(); - } - - private send(event: unknown): void { - if (this.channel?.readyState === "open") { - this.channel.send(JSON.stringify(event)); - } - } - - private handleRealtimeEvent(data: unknown): void { - let event: RealtimeServerEvent; - try { - event = JSON.parse(String(data)) as RealtimeServerEvent; - } catch { - return; - } - switch (event.type) { - case "conversation.item.input_audio_transcription.completed": - if (event.transcript) { - this.callbacks.onTranscript?.({ role: "user", text: event.transcript, final: true }); - } - return; - case "response.audio_transcript.done": - if (event.transcript) { - this.callbacks.onTranscript?.({ - role: "assistant", - text: 
event.transcript, - final: true, - }); - } - return; - case "response.function_call_arguments.delta": - this.bufferToolDelta(event); - return; - case "response.function_call_arguments.done": - void this.handleToolCall(event); - return; - default: - return; - } - } - - private bufferToolDelta(event: RealtimeServerEvent): void { - const key = event.item_id ?? "unknown"; - const existing = this.toolBuffers.get(key); - if (existing) { - existing.args += event.delta ?? ""; - return; - } - this.toolBuffers.set(key, { - name: event.name ?? "", - callId: event.call_id ?? "", - args: event.delta ?? "", - }); - } - - private async handleToolCall(event: RealtimeServerEvent): Promise { - const key = event.item_id ?? "unknown"; - const buffered = this.toolBuffers.get(key); - this.toolBuffers.delete(key); - const name = buffered?.name || event.name || ""; - const callId = buffered?.callId || event.call_id || ""; - if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME || !callId) { - return; - } - this.callbacks.onStatus?.("thinking"); - let question = ""; - try { - question = buildRealtimeVoiceAgentConsultChatMessage( - JSON.parse(buffered?.args || event.arguments || "{}"), - ); - } catch {} - if (!question) { - this.submitToolResult(callId, { - error: `${REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME} requires a question`, - }); - this.callbacks.onStatus?.("listening"); - return; - } - try { - const idempotencyKey = generateUUID(); - const response = await this.client.request<{ runId?: string }>("chat.send", { - sessionKey: this.sessionKey, - message: question, - idempotencyKey, - }); - const result = await waitForChatResult({ - client: this.client, - runId: response.runId ?? idempotencyKey, - timeoutMs: 120_000, - }); - this.submitToolResult(callId, { result }); - } catch (error) { - this.submitToolResult(callId, { - error: error instanceof Error ? 
error.message : String(error), - }); - } finally { - this.callbacks.onStatus?.("listening"); - } - } - - private submitToolResult(callId: string, result: unknown): void { - this.send({ - type: "conversation.item.create", - item: { - type: "function_call_output", - call_id: callId, - output: JSON.stringify(result), - }, - }); - this.send({ type: "response.create" }); + this.transport?.stop(); + this.transport = null; } } diff --git a/ui/src/ui/realtime-talk.test.ts b/ui/src/ui/realtime-talk.test.ts new file mode 100644 index 00000000000..8ededad9c8f --- /dev/null +++ b/ui/src/ui/realtime-talk.test.ts @@ -0,0 +1,126 @@ +// @vitest-environment node +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const { + googleStart, + googleStop, + relayStart, + relayStop, + webRtcStart, + webRtcStop, + googleCtor, + relayCtor, + webRtcCtor, +} = vi.hoisted(() => ({ + googleStart: vi.fn(async () => undefined), + googleStop: vi.fn(), + relayStart: vi.fn(async () => undefined), + relayStop: vi.fn(), + webRtcStart: vi.fn(async () => undefined), + webRtcStop: vi.fn(), + googleCtor: vi.fn(function () { + return { start: googleStart, stop: googleStop }; + }), + relayCtor: vi.fn(function () { + return { start: relayStart, stop: relayStop }; + }), + webRtcCtor: vi.fn(function () { + return { start: webRtcStart, stop: webRtcStop }; + }), +})); + +vi.mock("./chat/realtime-talk-google-live.ts", () => ({ + GoogleLiveRealtimeTalkTransport: googleCtor, +})); + +vi.mock("./chat/realtime-talk-gateway-relay.ts", () => ({ + GatewayRelayRealtimeTalkTransport: relayCtor, +})); + +vi.mock("./chat/realtime-talk-webrtc.ts", () => ({ + WebRtcSdpRealtimeTalkTransport: webRtcCtor, +})); + +import { RealtimeTalkSession } from "./chat/realtime-talk.ts"; + +describe("RealtimeTalkSession", () => { + beforeEach(() => { + googleStart.mockClear(); + googleStop.mockClear(); + relayStart.mockClear(); + relayStop.mockClear(); + webRtcStart.mockClear(); + webRtcStop.mockClear(); + 
googleCtor.mockClear(); + relayCtor.mockClear(); + webRtcCtor.mockClear(); + }); + + it("starts the Google Live WebSocket transport from a generic session result", async () => { + const request = vi.fn(async () => ({ + provider: "google", + transport: "json-pcm-websocket", + protocol: "google-live-bidi", + clientSecret: "auth_tokens/session", + websocketUrl: "wss://example.test/live", + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: 16000, + outputEncoding: "pcm16", + outputSampleRateHz: 24000, + }, + })); + const onStatus = vi.fn(); + const session = new RealtimeTalkSession({ request } as never, "main", { onStatus }); + + await session.start(); + + expect(request).toHaveBeenCalledWith("talk.realtime.session", { sessionKey: "main" }); + expect(googleCtor).toHaveBeenCalledTimes(1); + expect(googleStart).toHaveBeenCalledTimes(1); + expect(webRtcCtor).not.toHaveBeenCalled(); + expect(relayCtor).not.toHaveBeenCalled(); + expect(onStatus).toHaveBeenCalledWith("connecting"); + }); + + it("starts the Gateway relay transport for backend-only realtime providers", async () => { + const request = vi.fn(async () => ({ + provider: "example", + transport: "gateway-relay", + relaySessionId: "relay-1", + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: 24000, + outputEncoding: "pcm16", + outputSampleRateHz: 24000, + }, + })); + const session = new RealtimeTalkSession({ request } as never, "main"); + + await session.start(); + session.stop(); + + expect(relayCtor).toHaveBeenCalledTimes(1); + expect(relayStart).toHaveBeenCalledTimes(1); + expect(relayStop).toHaveBeenCalledTimes(1); + expect(googleCtor).not.toHaveBeenCalled(); + expect(webRtcCtor).not.toHaveBeenCalled(); + }); + + it("keeps legacy session results on the OpenAI-style WebRTC transport", async () => { + const request = vi.fn(async () => ({ + provider: "openai", + clientSecret: "secret", + })); + const session = new RealtimeTalkSession({ request } as never, "main"); + + await session.start(); + 
session.stop(); + + expect(webRtcCtor).toHaveBeenCalledTimes(1); + expect(webRtcStart).toHaveBeenCalledTimes(1); + expect(webRtcStop).toHaveBeenCalledTimes(1); + expect(googleCtor).not.toHaveBeenCalled(); + expect(relayCtor).not.toHaveBeenCalled(); + }); +});