diff --git a/CHANGELOG.md b/CHANGELOG.md
index 73e7a9d457d..c99244c9da4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
 - Agents/sessions: add mailbox-style `sessions_list` filters for label, agent, and search plus visibility-scoped derived title and last-message previews. (#69839) Thanks @dangoZhang.
 - Providers/GPT-5: move the GPT-5 prompt overlay into the shared provider runtime so compatible GPT-5 models receive the same behavior and heartbeat guidance through OpenAI, OpenRouter, OpenCode, Codex, and other GPT providers; add `agents.defaults.promptOverlays.gpt5.personality` as the global friendly-style toggle while keeping the OpenAI plugin setting as a fallback.
 - Providers/xAI: add image generation, text-to-speech, and speech-to-text support, including `grok-imagine-image` / `grok-imagine-image-pro`, reference-image edits, six live xAI voices, MP3/WAV/PCM/G.711 TTS formats, `grok-stt` audio transcription, and xAI realtime transcription for Voice Call streaming. (#68694) Thanks @KateWilkins.
+- Providers/STT: add Voice Call streaming transcription for Deepgram, ElevenLabs, and Mistral, and add ElevenLabs Scribe v2 batch audio transcription for inbound media.
 - Models/commands: add `/models add` so you can register a model from chat and use it without restarting the gateway; keep `/models` as a simple provider browser while adding clearer add guidance and copy-friendly command examples. (#70211) Thanks @Takhoffman.
 - Pi/models: update the bundled pi packages to `0.68.1` and let the OpenCode Go catalog come from pi instead of plugin-maintained model aliases, adding the refreshed `opencode-go/kimi-k2.6`, Qwen, GLM, MiMo, and MiniMax entries.
 - CLI/doctor plugins: lazy-load doctor plugin paths and prefer installed plugin `dist/*` runtime entries over source-adjacent JavaScript fallbacks, reducing the measured `doctor --non-interactive` runtime by about 74% while keeping cold doctor startup on built plugin artifacts. (#69840) Thanks @gumadeiras.
diff --git a/docs/providers/deepgram.md b/docs/providers/deepgram.md
index e86ebbc38a8..ec2db0fcd1f 100644
--- a/docs/providers/deepgram.md
+++ b/docs/providers/deepgram.md
@@ -2,18 +2,22 @@
 summary: "Deepgram transcription for inbound voice notes"
 read_when:
   - You want Deepgram speech-to-text for audio attachments
+  - You want Deepgram streaming transcription for Voice Call
   - You need a quick Deepgram config example
 title: "Deepgram"
 ---
 
 # Deepgram (Audio Transcription)
 
-Deepgram is a speech-to-text API. In OpenClaw it is used for **inbound audio/voice note
-transcription** via `tools.media.audio`.
+Deepgram is a speech-to-text API. In OpenClaw it is used for inbound
+audio/voice-note transcription through `tools.media.audio` and for Voice Call
+streaming STT through `plugins.entries.voice-call.config.streaming`.
 
-When enabled, OpenClaw uploads the audio file to Deepgram and injects the transcript
-into the reply pipeline (`{{Transcript}}` + `[Audio]` block). This is **not streaming**;
-it uses the pre-recorded transcription endpoint.
+For batch transcription, OpenClaw uploads the complete audio file to Deepgram
+and injects the transcript into the reply pipeline (`{{Transcript}}` +
+`[Audio]` block). For Voice Call streaming, OpenClaw forwards live G.711
+u-law frames to Deepgram's WebSocket `listen` endpoint and emits partial or
+final transcripts as Deepgram returns them.
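+
+For reference, a minimal batch configuration sketch follows. It assumes the
+bundled `deepgram` plugin is enabled and mirrors the `tools.media.audio`
+shape used in the ElevenLabs and Mistral docs; `nova-3` is the default model.
+
+```json5
+{
+  tools: {
+    media: {
+      audio: {
+        enabled: true,
+        models: [{ provider: "deepgram", model: "nova-3" }],
+      },
+    },
+  },
+}
+```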
| Detail | Value | | ------------- | ---------------------------------------------------------- | @@ -101,6 +105,52 @@ it uses the pre-recorded transcription endpoint. +## Voice Call streaming STT + +The bundled `deepgram` plugin also registers a realtime transcription provider +for the Voice Call plugin. + +| Setting | Config path | Default | +| --------------- | ----------------------------------------------------------------------- | -------------------------------- | +| API key | `plugins.entries.voice-call.config.streaming.providers.deepgram.apiKey` | Falls back to `DEEPGRAM_API_KEY` | +| Model | `...deepgram.model` | `nova-3` | +| Language | `...deepgram.language` | (unset) | +| Encoding | `...deepgram.encoding` | `mulaw` | +| Sample rate | `...deepgram.sampleRate` | `8000` | +| Endpointing | `...deepgram.endpointingMs` | `800` | +| Interim results | `...deepgram.interimResults` | `true` | + +```json5 +{ + plugins: { + entries: { + "voice-call": { + config: { + streaming: { + enabled: true, + provider: "deepgram", + providers: { + deepgram: { + apiKey: "${DEEPGRAM_API_KEY}", + model: "nova-3", + endpointingMs: 800, + language: "en-US", + }, + }, + }, + }, + }, + }, + }, +} +``` + + +Voice Call receives telephony audio as 8 kHz G.711 u-law. The Deepgram +streaming provider defaults to `encoding: "mulaw"` and `sampleRate: 8000`, so +Twilio media frames can be forwarded directly. + + ## Notes @@ -118,12 +168,6 @@ it uses the pre-recorded transcription endpoint. - -Deepgram transcription is **pre-recorded only** (not real-time streaming). OpenClaw -uploads the complete audio file and waits for the full transcript before injecting -it into the conversation. - - ## Related diff --git a/docs/providers/elevenlabs.md b/docs/providers/elevenlabs.md new file mode 100644 index 00000000000..29f55c73f70 --- /dev/null +++ b/docs/providers/elevenlabs.md @@ -0,0 +1,111 @@ +--- +summary: "Use ElevenLabs speech, Scribe STT, and realtime transcription with OpenClaw" +read_when: + - You want ElevenLabs text-to-speech in OpenClaw + - You want ElevenLabs Scribe speech-to-text for audio attachments + - You want ElevenLabs realtime transcription for Voice Call +title: "ElevenLabs" +--- + +# ElevenLabs + +OpenClaw uses ElevenLabs for text-to-speech, batch speech-to-text with Scribe +v2, and Voice Call streaming STT with Scribe v2 Realtime. + +| Capability | OpenClaw surface | Default | +| ------------------------ | --------------------------------------------- | ------------------------ | +| Text-to-speech | `messages.tts` / `talk` | `eleven_multilingual_v2` | +| Batch speech-to-text | `tools.media.audio` | `scribe_v2` | +| Streaming speech-to-text | Voice Call `streaming.provider: "elevenlabs"` | `scribe_v2_realtime` | + +## Authentication + +Set `ELEVENLABS_API_KEY` in the environment. `XI_API_KEY` is also accepted for +compatibility with existing ElevenLabs tooling. + +```bash +export ELEVENLABS_API_KEY="..." +``` + +## Text-to-speech + +```json5 +{ + messages: { + tts: { + providers: { + elevenlabs: { + apiKey: "${ELEVENLABS_API_KEY}", + voiceId: "pMsXgVXv3BLzUgSXRplE", + modelId: "eleven_multilingual_v2", + }, + }, + }, + }, +} +``` + +## Speech-to-text + +Use Scribe v2 for inbound audio attachments and short recorded voice segments: + +```json5 +{ + tools: { + media: { + audio: { + enabled: true, + models: [{ provider: "elevenlabs", model: "scribe_v2" }], + }, + }, + }, +} +``` + +OpenClaw sends multipart audio to ElevenLabs `/v1/speech-to-text` with +`model_id: "scribe_v2"`. 
Language hints map to `language_code` when present. + +## Voice Call streaming STT + +The bundled `elevenlabs` plugin registers Scribe v2 Realtime for Voice Call +streaming transcription. + +| Setting | Config path | Default | +| --------------- | ------------------------------------------------------------------------- | ------------------------------------------------- | +| API key | `plugins.entries.voice-call.config.streaming.providers.elevenlabs.apiKey` | Falls back to `ELEVENLABS_API_KEY` / `XI_API_KEY` | +| Model | `...elevenlabs.modelId` | `scribe_v2_realtime` | +| Audio format | `...elevenlabs.audioFormat` | `ulaw_8000` | +| Sample rate | `...elevenlabs.sampleRate` | `8000` | +| Commit strategy | `...elevenlabs.commitStrategy` | `vad` | +| Language | `...elevenlabs.languageCode` | (unset) | + +```json5 +{ + plugins: { + entries: { + "voice-call": { + config: { + streaming: { + enabled: true, + provider: "elevenlabs", + providers: { + elevenlabs: { + apiKey: "${ELEVENLABS_API_KEY}", + audioFormat: "ulaw_8000", + commitStrategy: "vad", + languageCode: "en", + }, + }, + }, + }, + }, + }, + }, +} +``` + + +Voice Call receives Twilio media as 8 kHz G.711 u-law. The ElevenLabs realtime +provider defaults to `ulaw_8000`, so telephony frames can be forwarded without +transcoding. + diff --git a/docs/providers/index.md b/docs/providers/index.md index c6b82cf62fb..02360a9e20a 100644 --- a/docs/providers/index.md +++ b/docs/providers/index.md @@ -82,6 +82,9 @@ Looking for chat channel docs (WhatsApp/Telegram/Discord/Slack/Mattermost (plugi ## Transcription providers - [Deepgram (audio transcription)](/providers/deepgram) +- [ElevenLabs](/providers/elevenlabs#speech-to-text) +- [Mistral](/providers/mistral#audio-transcription-voxtral) +- [OpenAI](/providers/openai#speech-to-text) - [xAI](/providers/xai#speech-to-text) ## Community tools diff --git a/docs/providers/mistral.md b/docs/providers/mistral.md index 81211a49e34..ec48d1ee0e2 100644 --- a/docs/providers/mistral.md +++ b/docs/providers/mistral.md @@ -2,6 +2,7 @@ summary: "Use Mistral models and Voxtral transcription with OpenClaw" read_when: - You want to use Mistral models in OpenClaw + - You want Voxtral realtime transcription for Voice Call - You need Mistral API key onboarding and model refs title: "Mistral" --- @@ -65,7 +66,8 @@ OpenClaw currently ships this bundled Mistral catalog: ## Audio transcription (Voxtral) -Use Voxtral for audio transcription through the media understanding pipeline. +Use Voxtral for batch audio transcription through the media understanding +pipeline. ```json5 { @@ -84,6 +86,48 @@ Use Voxtral for audio transcription through the media understanding pipeline. The media transcription path uses `/v1/audio/transcriptions`. The default audio model for Mistral is `voxtral-mini-latest`. +## Voice Call streaming STT + +The bundled `mistral` plugin registers Voxtral Realtime as a Voice Call +streaming STT provider. 
+
+| Setting | Config path | Default |
+| ------------ | ----------------------------------------------------------------------- | --------------------------------------- |
+| API key | `plugins.entries.voice-call.config.streaming.providers.mistral.apiKey` | Falls back to `MISTRAL_API_KEY` |
+| Model | `...mistral.model` | `voxtral-mini-transcribe-realtime-2602` |
+| Encoding | `...mistral.encoding` | `pcm_mulaw` |
+| Sample rate | `...mistral.sampleRate` | `8000` |
+| Target delay | `...mistral.targetStreamingDelayMs` | `800` |
+
+```json5
+{
+  plugins: {
+    entries: {
+      "voice-call": {
+        config: {
+          streaming: {
+            enabled: true,
+            provider: "mistral",
+            providers: {
+              mistral: {
+                apiKey: "${MISTRAL_API_KEY}",
+                targetStreamingDelayMs: 800,
+              },
+            },
+          },
+        },
+      },
+    },
+  },
+}
+```
+
+OpenClaw defaults Mistral realtime STT to `pcm_mulaw` at 8 kHz so Voice Call
+can forward Twilio media frames directly. Use `encoding: "pcm_s16le"` and a
+matching `sampleRate` only if your upstream audio is already raw PCM.
+
 ## Advanced configuration
diff --git a/docs/tools/media-overview.md b/docs/tools/media-overview.md
index 8bcb923c90b..bb85db1471d 100644
--- a/docs/tools/media-overview.md
+++ b/docs/tools/media-overview.md
@@ -31,11 +31,12 @@ This table shows which providers support which media capabilities across the pla
 | BytePlus | | Yes | | | | |
 | ComfyUI | Yes | Yes | Yes | | | |
 | Deepgram | | | | | Yes | |
-| ElevenLabs | | | | Yes | | |
+| ElevenLabs | | | | Yes | Yes | |
 | fal | Yes | Yes | | | | |
 | Google | Yes | Yes | Yes | | | Yes |
 | Microsoft | | | | Yes | | |
 | MiniMax | Yes | Yes | Yes | Yes | | |
+| Mistral | | | | | Yes | |
 | OpenAI | Yes | Yes | | Yes | Yes | Yes |
 | Qwen | | Yes | | | | |
 | Runway | | Yes | | | | |
@@ -51,6 +52,12 @@ Media understanding uses any vision-capable or audio-capable model registered in
 Video and music generation run as background tasks because provider processing typically takes 30 seconds to several minutes. When the agent calls `video_generate` or `music_generate`, OpenClaw submits the request to the provider, returns a task ID immediately, and tracks the job in the task ledger. The agent continues responding to other messages while the job runs. When the provider finishes, OpenClaw wakes the agent so it can post the finished media back into the original channel. Image generation and TTS are synchronous and complete inline with the reply.
 
+Deepgram, ElevenLabs, Mistral, OpenAI, and xAI can all transcribe inbound
+audio through the batch `tools.media.audio` path when configured. The same
+five providers also register Voice Call streaming STT providers, so live
+phone audio can be forwarded to the selected vendor without waiting for a
+completed recording.
+
 OpenAI maps to OpenClaw's image, video, batch TTS, batch STT, Voice Call streaming STT, realtime voice, and memory embedding surfaces.
xAI currently maps to OpenClaw's image, video, search, code-execution, batch TTS, batch STT, diff --git a/extensions/deepgram/audio.live.test.ts b/extensions/deepgram/audio.live.test.ts index bdfba2fca5e..3a0ff252bd8 100644 --- a/extensions/deepgram/audio.live.test.ts +++ b/extensions/deepgram/audio.live.test.ts @@ -1,8 +1,16 @@ import { describe, expect, it } from "vitest"; import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js"; +import { + normalizeTranscriptForMatch, + streamAudioForLiveTest, + synthesizeElevenLabsLiveSpeech, + waitForLiveExpectation, +} from "../../test/helpers/stt-live-audio.js"; import { transcribeDeepgramAudio } from "./audio.js"; +import { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; const DEEPGRAM_KEY = process.env.DEEPGRAM_API_KEY ?? ""; +const ELEVENLABS_KEY = process.env.ELEVENLABS_API_KEY ?? ""; const DEEPGRAM_MODEL = process.env.DEEPGRAM_MODEL?.trim() || "nova-3"; const DEEPGRAM_BASE_URL = process.env.DEEPGRAM_BASE_URL?.trim(); const SAMPLE_URL = @@ -41,4 +49,50 @@ describeLive("deepgram live", () => { }); expect(result.text.trim().length).toBeGreaterThan(0); }, 30000); + + it("streams realtime STT through the registered transcription provider", async () => { + if (!ELEVENLABS_KEY) { + throw new Error("ELEVENLABS_API_KEY required to synthesize live realtime STT input"); + } + const provider = buildDeepgramRealtimeTranscriptionProvider(); + const phrase = "Testing OpenClaw Deepgram realtime transcription integration OK."; + const speech = await synthesizeElevenLabsLiveSpeech({ + text: phrase, + apiKey: ELEVENLABS_KEY, + outputFormat: "ulaw_8000", + timeoutMs: 30_000, + }); + const transcripts: string[] = []; + const partials: string[] = []; + const errors: Error[] = []; + const session = provider.createSession({ + providerConfig: { + apiKey: DEEPGRAM_KEY, + language: "en-US", + endpointingMs: 500, + }, + onPartial: (partial) => partials.push(partial), + onTranscript: (transcript) => transcripts.push(transcript), + onError: (error) => errors.push(error), + }); + + try { + await session.connect(); + await streamAudioForLiveTest({ + audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), + sendAudio: (chunk) => session.sendAudio(chunk), + }); + + await waitForLiveExpectation(() => { + if (errors[0]) { + throw errors[0]; + } + expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); + }, 60_000); + } finally { + session.close(); + } + + expect(partials.length + transcripts.length).toBeGreaterThan(0); + }, 90_000); }); diff --git a/extensions/deepgram/index.ts b/extensions/deepgram/index.ts index d405722004f..253715aea0b 100644 --- a/extensions/deepgram/index.ts +++ b/extensions/deepgram/index.ts @@ -1,5 +1,6 @@ import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry"; import { deepgramMediaUnderstandingProvider } from "./media-understanding-provider.js"; +import { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; export default definePluginEntry({ id: "deepgram", @@ -7,5 +8,6 @@ export default definePluginEntry({ description: "Bundled Deepgram audio transcription provider", register(api) { api.registerMediaUnderstandingProvider(deepgramMediaUnderstandingProvider); + api.registerRealtimeTranscriptionProvider(buildDeepgramRealtimeTranscriptionProvider()); }, }); diff --git a/extensions/deepgram/openclaw.plugin.json b/extensions/deepgram/openclaw.plugin.json index 6c0c7fb2012..12a38b7149e 100644 --- 
a/extensions/deepgram/openclaw.plugin.json +++ b/extensions/deepgram/openclaw.plugin.json @@ -5,7 +5,8 @@ "deepgram": ["DEEPGRAM_API_KEY"] }, "contracts": { - "mediaUnderstandingProviders": ["deepgram"] + "mediaUnderstandingProviders": ["deepgram"], + "realtimeTranscriptionProviders": ["deepgram"] }, "mediaUnderstandingProviderMetadata": { "deepgram": { diff --git a/extensions/deepgram/package.json b/extensions/deepgram/package.json index b729d652ad3..4f2baed8938 100644 --- a/extensions/deepgram/package.json +++ b/extensions/deepgram/package.json @@ -4,6 +4,9 @@ "private": true, "description": "OpenClaw Deepgram media-understanding provider", "type": "module", + "dependencies": { + "ws": "^8.20.0" + }, "devDependencies": { "@openclaw/plugin-sdk": "workspace:*" }, diff --git a/extensions/deepgram/realtime-transcription-provider.test.ts b/extensions/deepgram/realtime-transcription-provider.test.ts new file mode 100644 index 00000000000..54c1100796f --- /dev/null +++ b/extensions/deepgram/realtime-transcription-provider.test.ts @@ -0,0 +1,68 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../src/config/types.openclaw.js"; +import { + __testing, + buildDeepgramRealtimeTranscriptionProvider, +} from "./realtime-transcription-provider.js"; + +describe("buildDeepgramRealtimeTranscriptionProvider", () => { + afterEach(() => { + vi.unstubAllEnvs(); + }); + + it("normalizes nested provider config", () => { + const provider = buildDeepgramRealtimeTranscriptionProvider(); + const resolved = provider.resolveConfig?.({ + cfg: {} as OpenClawConfig, + rawConfig: { + providers: { + deepgram: { + apiKey: "dg-key", + model: "nova-3", + encoding: "g711_ulaw", + sample_rate: "8000", + interim_results: "true", + endpointing: "500", + language: "en-US", + }, + }, + }, + }); + + expect(resolved).toMatchObject({ + apiKey: "dg-key", + model: "nova-3", + encoding: "mulaw", + sampleRate: 8000, + interimResults: true, + endpointingMs: 500, + language: "en-US", + }); + }); + + it("builds a Deepgram listen websocket URL", () => { + const url = __testing.toDeepgramRealtimeWsUrl({ + apiKey: "dg-key", + baseUrl: "https://api.deepgram.com/v1", + model: "nova-3", + providerConfig: {}, + sampleRate: 8000, + encoding: "mulaw", + interimResults: true, + endpointingMs: 800, + }); + + expect(url).toContain("wss://api.deepgram.com/v1/listen?"); + expect(url).toContain("model=nova-3"); + expect(url).toContain("encoding=mulaw"); + expect(url).toContain("sample_rate=8000"); + }); + + it("requires an API key when creating sessions", () => { + vi.stubEnv("DEEPGRAM_API_KEY", ""); + const provider = buildDeepgramRealtimeTranscriptionProvider(); + expect(() => provider.createSession({ providerConfig: {} })).toThrow( + "Deepgram API key missing", + ); + }); +}); diff --git a/extensions/deepgram/realtime-transcription-provider.ts b/extensions/deepgram/realtime-transcription-provider.ts new file mode 100644 index 00000000000..28ded1d6a43 --- /dev/null +++ b/extensions/deepgram/realtime-transcription-provider.ts @@ -0,0 +1,504 @@ +import { randomUUID } from "node:crypto"; +import { + captureWsEvent, + createDebugProxyWebSocketAgent, + resolveDebugProxySettings, +} from "openclaw/plugin-sdk/proxy-capture"; +import type { + RealtimeTranscriptionProviderConfig, + RealtimeTranscriptionProviderPlugin, + RealtimeTranscriptionSession, + RealtimeTranscriptionSessionCreateRequest, +} from "openclaw/plugin-sdk/realtime-transcription"; +import { normalizeResolvedSecretInputString } from 
"openclaw/plugin-sdk/secret-input"; +import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; +import WebSocket from "ws"; +import { DEFAULT_DEEPGRAM_AUDIO_BASE_URL, DEFAULT_DEEPGRAM_AUDIO_MODEL } from "./audio.js"; + +type DeepgramRealtimeTranscriptionEncoding = "linear16" | "mulaw" | "alaw"; + +type DeepgramRealtimeTranscriptionProviderConfig = { + apiKey?: string; + baseUrl?: string; + model?: string; + language?: string; + sampleRate?: number; + encoding?: DeepgramRealtimeTranscriptionEncoding; + interimResults?: boolean; + endpointingMs?: number; +}; + +type DeepgramRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & { + apiKey: string; + baseUrl: string; + model: string; + sampleRate: number; + encoding: DeepgramRealtimeTranscriptionEncoding; + interimResults: boolean; + endpointingMs: number; + language?: string; +}; + +type DeepgramRealtimeTranscriptionEvent = { + type?: string; + channel?: { + alternatives?: Array<{ + transcript?: string; + }>; + }; + is_final?: boolean; + speech_final?: boolean; + error?: unknown; + message?: string; +}; + +const DEEPGRAM_REALTIME_DEFAULT_SAMPLE_RATE = 8000; +const DEEPGRAM_REALTIME_DEFAULT_ENCODING: DeepgramRealtimeTranscriptionEncoding = "mulaw"; +const DEEPGRAM_REALTIME_DEFAULT_ENDPOINTING_MS = 800; +const DEEPGRAM_REALTIME_CONNECT_TIMEOUT_MS = 10_000; +const DEEPGRAM_REALTIME_CLOSE_TIMEOUT_MS = 5_000; +const DEEPGRAM_REALTIME_MAX_RECONNECT_ATTEMPTS = 5; +const DEEPGRAM_REALTIME_RECONNECT_DELAY_MS = 1000; +const DEEPGRAM_REALTIME_MAX_QUEUED_BYTES = 2 * 1024 * 1024; + +function readRecord(value: unknown): Record | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : undefined; +} + +function readNestedDeepgramConfig(rawConfig: RealtimeTranscriptionProviderConfig) { + const raw = readRecord(rawConfig); + const providers = readRecord(raw?.providers); + return readRecord(providers?.deepgram ?? raw?.deepgram ?? raw) ?? {}; +} + +function readFiniteNumber(value: unknown): number | undefined { + const next = + typeof value === "number" + ? value + : typeof value === "string" + ? Number.parseFloat(value) + : undefined; + return Number.isFinite(next) ? next : undefined; +} + +function readBoolean(value: unknown): boolean | undefined { + if (typeof value === "boolean") { + return value; + } + if (typeof value !== "string") { + return undefined; + } + const normalized = value.trim().toLowerCase(); + if (["1", "true", "yes", "on"].includes(normalized)) { + return true; + } + if (["0", "false", "no", "off"].includes(normalized)) { + return false; + } + return undefined; +} + +function normalizeDeepgramEncoding( + value: unknown, +): DeepgramRealtimeTranscriptionEncoding | undefined { + const normalized = normalizeOptionalString(value)?.toLowerCase(); + if (!normalized) { + return undefined; + } + if (normalized === "pcm" || normalized === "pcm_s16le" || normalized === "linear16") { + return "linear16"; + } + if (normalized === "ulaw" || normalized === "g711_ulaw" || normalized === "g711-mulaw") { + return "mulaw"; + } + if (normalized === "g711_alaw" || normalized === "g711-alaw") { + return "alaw"; + } + if (normalized === "mulaw" || normalized === "alaw") { + return normalized; + } + throw new Error(`Invalid Deepgram realtime transcription encoding: ${normalized}`); +} + +function normalizeDeepgramRealtimeBaseUrl(value?: string): string { + return ( + normalizeOptionalString(value ?? process.env.DEEPGRAM_BASE_URL) ?? 
+ DEFAULT_DEEPGRAM_AUDIO_BASE_URL + ); +} + +function toDeepgramRealtimeWsUrl(config: DeepgramRealtimeTranscriptionSessionConfig): string { + const url = new URL(normalizeDeepgramRealtimeBaseUrl(config.baseUrl)); + url.protocol = url.protocol === "http:" ? "ws:" : "wss:"; + url.pathname = `${url.pathname.replace(/\/+$/, "")}/listen`; + url.searchParams.set("model", config.model); + url.searchParams.set("encoding", config.encoding); + url.searchParams.set("sample_rate", String(config.sampleRate)); + url.searchParams.set("channels", "1"); + url.searchParams.set("interim_results", String(config.interimResults)); + url.searchParams.set("endpointing", String(config.endpointingMs)); + if (config.language) { + url.searchParams.set("language", config.language); + } + return url.toString(); +} + +function normalizeProviderConfig( + config: RealtimeTranscriptionProviderConfig, +): DeepgramRealtimeTranscriptionProviderConfig { + const raw = readNestedDeepgramConfig(config); + return { + apiKey: normalizeResolvedSecretInputString({ + value: raw.apiKey, + path: "plugins.entries.voice-call.config.streaming.providers.deepgram.apiKey", + }), + baseUrl: normalizeOptionalString(raw.baseUrl), + model: normalizeOptionalString(raw.model ?? raw.sttModel), + language: normalizeOptionalString(raw.language), + sampleRate: readFiniteNumber(raw.sampleRate ?? raw.sample_rate), + encoding: normalizeDeepgramEncoding(raw.encoding), + interimResults: readBoolean(raw.interimResults ?? raw.interim_results), + endpointingMs: readFiniteNumber(raw.endpointingMs ?? raw.endpointing ?? raw.silenceDurationMs), + }; +} + +function rawWsDataToBuffer(data: WebSocket.RawData): Buffer { + if (Buffer.isBuffer(data)) { + return data; + } + if (Array.isArray(data)) { + return Buffer.concat(data); + } + return Buffer.from(data); +} + +function readErrorDetail(value: unknown): string { + if (typeof value === "string") { + return value; + } + const record = readRecord(value); + const message = normalizeOptionalString(record?.message); + const code = normalizeOptionalString(record?.code); + return message ?? code ?? 
"Deepgram realtime transcription error"; +} + +function readTranscriptText(event: DeepgramRealtimeTranscriptionEvent): string | undefined { + return normalizeOptionalString(event.channel?.alternatives?.[0]?.transcript); +} + +class DeepgramRealtimeTranscriptionSession implements RealtimeTranscriptionSession { + private ws: WebSocket | null = null; + private connected = false; + private closed = false; + private reconnectAttempts = 0; + private queuedAudio: Buffer[] = []; + private queuedBytes = 0; + private closeTimer: ReturnType | undefined; + private lastTranscript: string | undefined; + private speechStarted = false; + private reconnecting = false; + private readonly flowId = randomUUID(); + + constructor(private readonly config: DeepgramRealtimeTranscriptionSessionConfig) {} + + async connect(): Promise { + this.closed = false; + this.reconnectAttempts = 0; + await this.doConnect(); + } + + sendAudio(audio: Buffer): void { + if (this.closed || audio.byteLength === 0) { + return; + } + if (this.ws?.readyState === WebSocket.OPEN) { + this.sendAudioFrame(audio); + return; + } + this.queueAudio(audio); + } + + close(): void { + this.closed = true; + this.connected = false; + this.queuedAudio = []; + this.queuedBytes = 0; + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + this.forceClose(); + return; + } + this.sendEvent({ type: "Finalize" }); + this.closeTimer = setTimeout(() => this.forceClose(), DEEPGRAM_REALTIME_CLOSE_TIMEOUT_MS); + } + + isConnected(): boolean { + return this.connected; + } + + private async doConnect(): Promise { + await new Promise((resolve, reject) => { + const url = toDeepgramRealtimeWsUrl(this.config); + const debugProxy = resolveDebugProxySettings(); + const proxyAgent = createDebugProxyWebSocketAgent(debugProxy); + let settled = false; + let opened = false; + const finishConnect = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(connectTimeout); + this.flushQueuedAudio(); + resolve(); + }; + const failConnect = (error: Error) => { + if (settled) { + return; + } + settled = true; + clearTimeout(connectTimeout); + this.config.onError?.(error); + this.closed = true; + this.forceClose(); + reject(error); + }; + this.ws = new WebSocket(url, { + headers: { + Authorization: `Token ${this.config.apiKey}`, + }, + ...(proxyAgent ? { agent: proxyAgent } : {}), + }); + + const connectTimeout = setTimeout(() => { + failConnect(new Error("Deepgram realtime transcription connection timeout")); + }, DEEPGRAM_REALTIME_CONNECT_TIMEOUT_MS); + + this.ws.on("open", () => { + opened = true; + this.connected = true; + this.reconnectAttempts = 0; + captureWsEvent({ + url, + direction: "local", + kind: "ws-open", + flowId: this.flowId, + meta: { provider: "deepgram", capability: "realtime-transcription" }, + }); + finishConnect(); + }); + + this.ws.on("message", (data) => { + const payload = rawWsDataToBuffer(data); + captureWsEvent({ + url, + direction: "inbound", + kind: "ws-frame", + flowId: this.flowId, + payload, + meta: { provider: "deepgram", capability: "realtime-transcription" }, + }); + try { + this.handleEvent(JSON.parse(payload.toString()) as DeepgramRealtimeTranscriptionEvent); + } catch (error) { + this.config.onError?.(error instanceof Error ? error : new Error(String(error))); + } + }); + + this.ws.on("error", (error) => { + captureWsEvent({ + url, + direction: "local", + kind: "error", + flowId: this.flowId, + errorText: error instanceof Error ? 
error.message : String(error), + meta: { provider: "deepgram", capability: "realtime-transcription" }, + }); + if (!opened) { + failConnect(error instanceof Error ? error : new Error(String(error))); + return; + } + this.config.onError?.(error instanceof Error ? error : new Error(String(error))); + }); + + this.ws.on("close", () => { + clearTimeout(connectTimeout); + this.connected = false; + if (this.closeTimer) { + clearTimeout(this.closeTimer); + this.closeTimer = undefined; + } + if (this.closed || !opened || !settled) { + return; + } + void this.attemptReconnect(); + }); + }); + } + + private async attemptReconnect(): Promise { + if (this.closed || this.reconnecting) { + return; + } + if (this.reconnectAttempts >= DEEPGRAM_REALTIME_MAX_RECONNECT_ATTEMPTS) { + this.config.onError?.(new Error("Deepgram realtime transcription reconnect limit reached")); + return; + } + this.reconnectAttempts += 1; + const delay = DEEPGRAM_REALTIME_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1); + this.reconnecting = true; + try { + await new Promise((resolve) => setTimeout(resolve, delay)); + if (!this.closed) { + await this.doConnect(); + } + } catch { + if (!this.closed) { + this.reconnecting = false; + await this.attemptReconnect(); + return; + } + } finally { + this.reconnecting = false; + } + } + + private handleEvent(event: DeepgramRealtimeTranscriptionEvent): void { + switch (event.type) { + case "Results": { + const text = readTranscriptText(event); + if (!text) { + return; + } + if (!this.speechStarted) { + this.speechStarted = true; + this.config.onSpeechStart?.(); + } + if (event.is_final || event.speech_final) { + this.emitTranscript(text); + if (event.speech_final) { + this.speechStarted = false; + } + return; + } + this.config.onPartial?.(text); + return; + } + case "SpeechStarted": + this.speechStarted = true; + this.config.onSpeechStart?.(); + return; + case "Error": + case "error": + this.config.onError?.(new Error(readErrorDetail(event.error ?? event.message))); + return; + default: + return; + } + } + + private emitTranscript(text: string): void { + if (text === this.lastTranscript) { + return; + } + this.lastTranscript = text; + this.config.onTranscript?.(text); + } + + private queueAudio(audio: Buffer): void { + this.queuedAudio.push(Buffer.from(audio)); + this.queuedBytes += audio.byteLength; + while (this.queuedBytes > DEEPGRAM_REALTIME_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) { + const dropped = this.queuedAudio.shift(); + this.queuedBytes -= dropped?.byteLength ?? 
0; + } + } + + private flushQueuedAudio(): void { + for (const audio of this.queuedAudio) { + this.sendAudioFrame(audio); + } + this.queuedAudio = []; + this.queuedBytes = 0; + } + + private sendAudioFrame(audio: Buffer): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + this.queueAudio(audio); + return; + } + captureWsEvent({ + url: toDeepgramRealtimeWsUrl(this.config), + direction: "outbound", + kind: "ws-frame", + flowId: this.flowId, + payload: audio, + meta: { provider: "deepgram", capability: "realtime-transcription" }, + }); + this.ws.send(audio); + } + + private sendEvent(event: unknown): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + return; + } + const payload = JSON.stringify(event); + captureWsEvent({ + url: toDeepgramRealtimeWsUrl(this.config), + direction: "outbound", + kind: "ws-frame", + flowId: this.flowId, + payload, + meta: { provider: "deepgram", capability: "realtime-transcription" }, + }); + this.ws.send(payload); + } + + private forceClose(): void { + if (this.closeTimer) { + clearTimeout(this.closeTimer); + this.closeTimer = undefined; + } + this.connected = false; + if (this.ws) { + this.ws.close(1000, "Transcription session closed"); + this.ws = null; + } + } +} + +export function buildDeepgramRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin { + return { + id: "deepgram", + label: "Deepgram Realtime Transcription", + aliases: ["deepgram-realtime", "nova-3-streaming"], + autoSelectOrder: 35, + resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig), + isConfigured: ({ providerConfig }) => + Boolean(normalizeProviderConfig(providerConfig).apiKey || process.env.DEEPGRAM_API_KEY), + createSession: (req) => { + const config = normalizeProviderConfig(req.providerConfig); + const apiKey = config.apiKey || process.env.DEEPGRAM_API_KEY; + if (!apiKey) { + throw new Error("Deepgram API key missing"); + } + return new DeepgramRealtimeTranscriptionSession({ + ...req, + apiKey, + baseUrl: normalizeDeepgramRealtimeBaseUrl(config.baseUrl), + model: config.model ?? DEFAULT_DEEPGRAM_AUDIO_MODEL, + sampleRate: config.sampleRate ?? DEEPGRAM_REALTIME_DEFAULT_SAMPLE_RATE, + encoding: config.encoding ?? DEEPGRAM_REALTIME_DEFAULT_ENCODING, + interimResults: config.interimResults ?? true, + endpointingMs: config.endpointingMs ?? 
DEEPGRAM_REALTIME_DEFAULT_ENDPOINTING_MS, + language: config.language, + }); + }, + }; +} + +export const __testing = { + normalizeProviderConfig, + toDeepgramRealtimeWsUrl, +}; diff --git a/extensions/deepgram/test-api.ts b/extensions/deepgram/test-api.ts index 89dff7f7255..e9136c71743 100644 --- a/extensions/deepgram/test-api.ts +++ b/extensions/deepgram/test-api.ts @@ -1 +1,2 @@ export { deepgramMediaUnderstandingProvider } from "./media-understanding-provider.js"; +export { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; diff --git a/extensions/elevenlabs/elevenlabs.live.test.ts b/extensions/elevenlabs/elevenlabs.live.test.ts new file mode 100644 index 00000000000..3eff0b29d99 --- /dev/null +++ b/extensions/elevenlabs/elevenlabs.live.test.ts @@ -0,0 +1,84 @@ +import { describe, expect, it } from "vitest"; +import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js"; +import { + normalizeTranscriptForMatch, + streamAudioForLiveTest, + synthesizeElevenLabsLiveSpeech, + waitForLiveExpectation, +} from "../../test/helpers/stt-live-audio.js"; +import { elevenLabsMediaUnderstandingProvider } from "./media-understanding-provider.js"; +import { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; + +const ELEVENLABS_KEY = process.env.ELEVENLABS_API_KEY ?? ""; +const LIVE = isLiveTestEnabled(["ELEVENLABS_LIVE_TEST"]); +const describeLive = LIVE && ELEVENLABS_KEY ? describe : describe.skip; + +describeLive("elevenlabs plugin live", () => { + it("transcribes synthesized speech through the media provider", async () => { + const phrase = "Testing OpenClaw ElevenLabs speech to text integration OK."; + const audio = await synthesizeElevenLabsLiveSpeech({ + text: phrase, + apiKey: ELEVENLABS_KEY, + outputFormat: "mp3_44100_128", + timeoutMs: 30_000, + }); + + const transcript = await elevenLabsMediaUnderstandingProvider.transcribeAudio?.({ + buffer: audio, + fileName: "elevenlabs-live.mp3", + mime: "audio/mpeg", + apiKey: ELEVENLABS_KEY, + timeoutMs: 60_000, + }); + + const normalized = normalizeTranscriptForMatch(transcript?.text ?? 
""); + expect(normalized).toContain("openclaw"); + expect(normalized).toContain("elevenlabs"); + }, 90_000); + + it("streams realtime STT through the registered transcription provider", async () => { + const provider = buildElevenLabsRealtimeTranscriptionProvider(); + const phrase = "Testing OpenClaw ElevenLabs realtime transcription integration OK."; + const speech = await synthesizeElevenLabsLiveSpeech({ + text: phrase, + apiKey: ELEVENLABS_KEY, + outputFormat: "ulaw_8000", + timeoutMs: 30_000, + }); + const transcripts: string[] = []; + const partials: string[] = []; + const errors: Error[] = []; + const session = provider.createSession({ + providerConfig: { + apiKey: ELEVENLABS_KEY, + audioFormat: "ulaw_8000", + sampleRate: 8000, + commitStrategy: "vad", + languageCode: "en", + }, + onPartial: (partial) => partials.push(partial), + onTranscript: (transcript) => transcripts.push(transcript), + onError: (error) => errors.push(error), + }); + + try { + await session.connect(); + await streamAudioForLiveTest({ + audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), + sendAudio: (chunk) => session.sendAudio(chunk), + }); + session.close(); + + await waitForLiveExpectation(() => { + if (errors[0]) { + throw errors[0]; + } + expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); + }, 60_000); + } finally { + session.close(); + } + + expect(partials.length + transcripts.length).toBeGreaterThan(0); + }, 90_000); +}); diff --git a/extensions/elevenlabs/index.ts b/extensions/elevenlabs/index.ts index 0a9c3cc6194..f64b7738a75 100644 --- a/extensions/elevenlabs/index.ts +++ b/extensions/elevenlabs/index.ts @@ -1,4 +1,6 @@ import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry"; +import { elevenLabsMediaUnderstandingProvider } from "./media-understanding-provider.js"; +import { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js"; import { buildElevenLabsSpeechProvider } from "./speech-provider.js"; export default definePluginEntry({ @@ -7,5 +9,7 @@ export default definePluginEntry({ description: "Bundled ElevenLabs speech provider", register(api) { api.registerSpeechProvider(buildElevenLabsSpeechProvider()); + api.registerMediaUnderstandingProvider(elevenLabsMediaUnderstandingProvider); + api.registerRealtimeTranscriptionProvider(buildElevenLabsRealtimeTranscriptionProvider()); }, }); diff --git a/extensions/elevenlabs/media-understanding-provider.test.ts b/extensions/elevenlabs/media-understanding-provider.test.ts new file mode 100644 index 00000000000..4362ee91bb4 --- /dev/null +++ b/extensions/elevenlabs/media-understanding-provider.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, it, vi } from "vitest"; +import { + elevenLabsMediaUnderstandingProvider, + transcribeElevenLabsAudio, +} from "./media-understanding-provider.js"; + +describe("elevenLabsMediaUnderstandingProvider", () => { + it("has expected provider metadata", () => { + expect(elevenLabsMediaUnderstandingProvider.id).toBe("elevenlabs"); + expect(elevenLabsMediaUnderstandingProvider.capabilities).toEqual(["audio"]); + expect(elevenLabsMediaUnderstandingProvider.defaultModels?.audio).toBe("scribe_v2"); + expect(elevenLabsMediaUnderstandingProvider.transcribeAudio).toBeDefined(); + }); + + it("posts multipart audio to ElevenLabs speech-to-text", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue(new Response(JSON.stringify({ text: "hello" }))); + + const result = await transcribeElevenLabsAudio({ + buffer: 
Buffer.from("audio"), + fileName: "voice.mp3", + mime: "audio/mpeg", + apiKey: "eleven-key", + model: "scribe_v2", + language: "en", + timeoutMs: 1000, + fetchFn: fetchMock, + }); + + expect(result).toEqual({ text: "hello", model: "scribe_v2" }); + expect(fetchMock).toHaveBeenCalledWith( + "https://api.elevenlabs.io/v1/speech-to-text", + expect.objectContaining({ + method: "POST", + headers: { "xi-api-key": "eleven-key" }, + }), + ); + const init = fetchMock.mock.calls[0]?.[1] as RequestInit; + const form = init.body as FormData; + expect(form.get("model_id")).toBe("scribe_v2"); + expect(form.get("language_code")).toBe("en"); + expect(form.get("file")).toBeInstanceOf(Blob); + }); +}); diff --git a/extensions/elevenlabs/media-understanding-provider.ts b/extensions/elevenlabs/media-understanding-provider.ts new file mode 100644 index 00000000000..a26a04975f2 --- /dev/null +++ b/extensions/elevenlabs/media-understanding-provider.ts @@ -0,0 +1,108 @@ +import path from "node:path"; +import type { + AudioTranscriptionRequest, + AudioTranscriptionResult, + MediaUnderstandingProvider, +} from "openclaw/plugin-sdk/media-understanding"; +import { normalizeElevenLabsBaseUrl } from "./shared.js"; + +const DEFAULT_ELEVENLABS_STT_MODEL = "scribe_v2"; + +function resolveUploadFileName(fileName?: string, mime?: string): string { + const trimmed = fileName?.trim(); + const baseName = trimmed ? path.basename(trimmed) : "audio"; + const lowerMime = mime?.trim().toLowerCase(); + + if (/\.aac$/i.test(baseName)) { + return `${baseName.slice(0, -4) || "audio"}.m4a`; + } + if (!path.extname(baseName) && lowerMime === "audio/aac") { + return `${baseName || "audio"}.m4a`; + } + return baseName; +} + +async function readErrorDetail(res: Response): Promise { + const text = (await res.text()).trim(); + if (!text) { + return undefined; + } + try { + const json = JSON.parse(text) as { + detail?: { message?: string; detail?: string; status?: string; code?: string }; + message?: string; + error?: string; + }; + return ( + json.message ?? + json.detail?.message ?? + json.detail?.detail ?? + json.error ?? + json.detail?.status ?? + json.detail?.code + ); + } catch { + return text.slice(0, 300); + } +} + +export async function transcribeElevenLabsAudio( + req: AudioTranscriptionRequest, +): Promise { + const fetchFn = req.fetchFn ?? fetch; + const apiKey = req.apiKey || process.env.ELEVENLABS_API_KEY || process.env.XI_API_KEY; + if (!apiKey) { + throw new Error("ElevenLabs API key missing"); + } + + const model = req.model?.trim() || DEFAULT_ELEVENLABS_STT_MODEL; + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), req.timeoutMs); + + try { + const form = new FormData(); + const bytes = new Uint8Array(req.buffer); + const blob = new Blob([bytes], { type: req.mime ?? "application/octet-stream" }); + form.append("file", blob, resolveUploadFileName(req.fileName, req.mime)); + form.append("model_id", model); + if (req.language?.trim()) { + form.append("language_code", req.language.trim()); + } + if (req.prompt?.trim()) { + form.append("prompt", req.prompt.trim()); + } + + const res = await fetchFn(`${normalizeElevenLabsBaseUrl(req.baseUrl)}/v1/speech-to-text`, { + method: "POST", + headers: { + "xi-api-key": apiKey, + }, + body: form, + signal: controller.signal, + }); + + if (!res.ok) { + const detail = await readErrorDetail(res); + throw new Error( + `ElevenLabs audio transcription failed (${res.status})${detail ? 
`: ${detail}` : ""}`, + ); + } + + const payload = (await res.json()) as { text?: string }; + const text = payload.text?.trim(); + if (!text) { + throw new Error("ElevenLabs audio transcription response missing text"); + } + return { text, model }; + } finally { + clearTimeout(timeout); + } +} + +export const elevenLabsMediaUnderstandingProvider: MediaUnderstandingProvider = { + id: "elevenlabs", + capabilities: ["audio"], + defaultModels: { audio: DEFAULT_ELEVENLABS_STT_MODEL }, + autoPriority: { audio: 45 }, + transcribeAudio: transcribeElevenLabsAudio, +}; diff --git a/extensions/elevenlabs/openclaw.plugin.json b/extensions/elevenlabs/openclaw.plugin.json index 3b7154df40e..4d172d57694 100644 --- a/extensions/elevenlabs/openclaw.plugin.json +++ b/extensions/elevenlabs/openclaw.plugin.json @@ -1,8 +1,24 @@ { "id": "elevenlabs", "enabledByDefault": true, + "providerAuthEnvVars": { + "elevenlabs": ["ELEVENLABS_API_KEY", "XI_API_KEY"] + }, "contracts": { - "speechProviders": ["elevenlabs"] + "speechProviders": ["elevenlabs"], + "mediaUnderstandingProviders": ["elevenlabs"], + "realtimeTranscriptionProviders": ["elevenlabs"] + }, + "mediaUnderstandingProviderMetadata": { + "elevenlabs": { + "capabilities": ["audio"], + "defaultModels": { + "audio": "scribe_v2" + }, + "autoPriority": { + "audio": 45 + } + } }, "configContracts": { "compatibilityMigrationPaths": [ diff --git a/extensions/elevenlabs/package.json b/extensions/elevenlabs/package.json index f8aca5f71b8..a7a61c6dfc8 100644 --- a/extensions/elevenlabs/package.json +++ b/extensions/elevenlabs/package.json @@ -4,6 +4,9 @@ "private": true, "description": "OpenClaw ElevenLabs speech plugin", "type": "module", + "dependencies": { + "ws": "^8.20.0" + }, "devDependencies": { "@openclaw/plugin-sdk": "workspace:*" }, diff --git a/extensions/elevenlabs/realtime-transcription-provider.test.ts b/extensions/elevenlabs/realtime-transcription-provider.test.ts new file mode 100644 index 00000000000..401166ff867 --- /dev/null +++ b/extensions/elevenlabs/realtime-transcription-provider.test.ts @@ -0,0 +1,54 @@ +import { describe, expect, it } from "vitest"; +import type { OpenClawConfig } from "../../src/config/types.openclaw.js"; +import { + __testing, + buildElevenLabsRealtimeTranscriptionProvider, +} from "./realtime-transcription-provider.js"; + +describe("buildElevenLabsRealtimeTranscriptionProvider", () => { + it("normalizes nested provider config", () => { + const provider = buildElevenLabsRealtimeTranscriptionProvider(); + const resolved = provider.resolveConfig?.({ + cfg: {} as OpenClawConfig, + rawConfig: { + providers: { + elevenlabs: { + apiKey: "eleven-key", + model_id: "scribe_v2_realtime", + audio_format: "ulaw_8000", + sample_rate: "8000", + commit_strategy: "vad", + language: "en", + }, + }, + }, + }); + + expect(resolved).toMatchObject({ + apiKey: "eleven-key", + audioFormat: "ulaw_8000", + sampleRate: 8000, + commitStrategy: "vad", + languageCode: "en", + }); + }); + + it("builds an ElevenLabs realtime websocket URL", () => { + const url = __testing.toElevenLabsRealtimeWsUrl({ + apiKey: "eleven-key", + baseUrl: "https://api.elevenlabs.io", + providerConfig: {}, + modelId: "scribe_v2_realtime", + audioFormat: "ulaw_8000", + sampleRate: 8000, + commitStrategy: "vad", + languageCode: "en", + }); + + expect(url).toContain("wss://api.elevenlabs.io/v1/speech-to-text/realtime?"); + expect(url).toContain("model_id=scribe_v2_realtime"); + expect(url).toContain("audio_format=ulaw_8000"); + expect(url).toContain("commit_strategy=vad"); + 
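+    // sample_rate is not asserted here because it is not encoded in the
+    // realtime URL; the session sends it with each input_audio_chunk
+    // payload instead (see sendAudioChunk in the provider).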
expect(url).toContain("language_code=en"); + }); +}); diff --git a/extensions/elevenlabs/realtime-transcription-provider.ts b/extensions/elevenlabs/realtime-transcription-provider.ts new file mode 100644 index 00000000000..83751388148 --- /dev/null +++ b/extensions/elevenlabs/realtime-transcription-provider.ts @@ -0,0 +1,488 @@ +import { randomUUID } from "node:crypto"; +import { + captureWsEvent, + createDebugProxyWebSocketAgent, + resolveDebugProxySettings, +} from "openclaw/plugin-sdk/proxy-capture"; +import type { + RealtimeTranscriptionProviderConfig, + RealtimeTranscriptionProviderPlugin, + RealtimeTranscriptionSession, + RealtimeTranscriptionSessionCreateRequest, +} from "openclaw/plugin-sdk/realtime-transcription"; +import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input"; +import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; +import WebSocket from "ws"; +import { resolveElevenLabsApiKeyWithProfileFallback } from "./config-api.js"; +import { normalizeElevenLabsBaseUrl } from "./shared.js"; + +type ElevenLabsRealtimeTranscriptionProviderConfig = { + apiKey?: string; + baseUrl?: string; + modelId?: string; + audioFormat?: string; + sampleRate?: number; + languageCode?: string; + commitStrategy?: "manual" | "vad"; + vadSilenceThresholdSecs?: number; + vadThreshold?: number; + minSpeechDurationMs?: number; + minSilenceDurationMs?: number; +}; + +type ElevenLabsRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & { + apiKey: string; + baseUrl: string; + modelId: string; + audioFormat: string; + sampleRate: number; + commitStrategy: "manual" | "vad"; + languageCode?: string; + vadSilenceThresholdSecs?: number; + vadThreshold?: number; + minSpeechDurationMs?: number; + minSilenceDurationMs?: number; +}; + +type ElevenLabsRealtimeTranscriptionEvent = { + message_type?: string; + text?: string; + error?: string; + message?: string; + code?: string; +}; + +const ELEVENLABS_REALTIME_DEFAULT_MODEL = "scribe_v2_realtime"; +const ELEVENLABS_REALTIME_DEFAULT_AUDIO_FORMAT = "ulaw_8000"; +const ELEVENLABS_REALTIME_DEFAULT_SAMPLE_RATE = 8000; +const ELEVENLABS_REALTIME_DEFAULT_COMMIT_STRATEGY: "manual" | "vad" = "vad"; +const ELEVENLABS_REALTIME_CONNECT_TIMEOUT_MS = 10_000; +const ELEVENLABS_REALTIME_CLOSE_TIMEOUT_MS = 5_000; +const ELEVENLABS_REALTIME_MAX_RECONNECT_ATTEMPTS = 5; +const ELEVENLABS_REALTIME_RECONNECT_DELAY_MS = 1000; +const ELEVENLABS_REALTIME_MAX_QUEUED_BYTES = 2 * 1024 * 1024; + +function readRecord(value: unknown): Record | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : undefined; +} + +function readNestedElevenLabsConfig(rawConfig: RealtimeTranscriptionProviderConfig) { + const raw = readRecord(rawConfig); + const providers = readRecord(raw?.providers); + return readRecord(providers?.elevenlabs ?? raw?.elevenlabs ?? raw) ?? {}; +} + +function readFiniteNumber(value: unknown): number | undefined { + const next = + typeof value === "number" + ? value + : typeof value === "string" + ? Number.parseFloat(value) + : undefined; + return Number.isFinite(next) ? 
next : undefined; +} + +function normalizeCommitStrategy(value: unknown): "manual" | "vad" | undefined { + const normalized = normalizeOptionalString(value)?.toLowerCase(); + if (!normalized) { + return undefined; + } + if (normalized === "manual" || normalized === "vad") { + return normalized; + } + throw new Error(`Invalid ElevenLabs realtime transcription commit strategy: ${normalized}`); +} + +function normalizeProviderConfig( + config: RealtimeTranscriptionProviderConfig, +): ElevenLabsRealtimeTranscriptionProviderConfig { + const raw = readNestedElevenLabsConfig(config); + return { + apiKey: normalizeResolvedSecretInputString({ + value: raw.apiKey, + path: "plugins.entries.voice-call.config.streaming.providers.elevenlabs.apiKey", + }), + baseUrl: normalizeOptionalString(raw.baseUrl), + modelId: normalizeOptionalString(raw.modelId ?? raw.model ?? raw.sttModel), + audioFormat: normalizeOptionalString(raw.audioFormat ?? raw.audio_format ?? raw.encoding), + sampleRate: readFiniteNumber(raw.sampleRate ?? raw.sample_rate), + languageCode: normalizeOptionalString(raw.languageCode ?? raw.language), + commitStrategy: normalizeCommitStrategy(raw.commitStrategy ?? raw.commit_strategy), + vadSilenceThresholdSecs: readFiniteNumber( + raw.vadSilenceThresholdSecs ?? raw.vad_silence_threshold_secs, + ), + vadThreshold: readFiniteNumber(raw.vadThreshold ?? raw.vad_threshold), + minSpeechDurationMs: readFiniteNumber(raw.minSpeechDurationMs ?? raw.min_speech_duration_ms), + minSilenceDurationMs: readFiniteNumber(raw.minSilenceDurationMs ?? raw.min_silence_duration_ms), + }; +} + +function normalizeElevenLabsRealtimeBaseUrl(value?: string): string { + const url = new URL(normalizeElevenLabsBaseUrl(value)); + url.protocol = url.protocol === "http:" ? "ws:" : "wss:"; + return url.toString().replace(/\/+$/, ""); +} + +function toElevenLabsRealtimeWsUrl(config: ElevenLabsRealtimeTranscriptionSessionConfig): string { + const url = new URL( + `${normalizeElevenLabsRealtimeBaseUrl(config.baseUrl)}/v1/speech-to-text/realtime`, + ); + url.searchParams.set("model_id", config.modelId); + url.searchParams.set("audio_format", config.audioFormat); + url.searchParams.set("commit_strategy", config.commitStrategy); + url.searchParams.set("include_timestamps", "false"); + url.searchParams.set("include_language_detection", "false"); + if (config.languageCode) { + url.searchParams.set("language_code", config.languageCode); + } + if (config.vadSilenceThresholdSecs != null) { + url.searchParams.set("vad_silence_threshold_secs", String(config.vadSilenceThresholdSecs)); + } + if (config.vadThreshold != null) { + url.searchParams.set("vad_threshold", String(config.vadThreshold)); + } + if (config.minSpeechDurationMs != null) { + url.searchParams.set("min_speech_duration_ms", String(config.minSpeechDurationMs)); + } + if (config.minSilenceDurationMs != null) { + url.searchParams.set("min_silence_duration_ms", String(config.minSilenceDurationMs)); + } + return url.toString(); +} + +function rawWsDataToBuffer(data: WebSocket.RawData): Buffer { + if (Buffer.isBuffer(data)) { + return data; + } + if (Array.isArray(data)) { + return Buffer.concat(data); + } + return Buffer.from(data); +} + +function readErrorDetail(event: ElevenLabsRealtimeTranscriptionEvent): string { + return ( + normalizeOptionalString(event.error) ?? + normalizeOptionalString(event.message) ?? + normalizeOptionalString(event.code) ?? 
+ "ElevenLabs realtime transcription error" + ); +} + +class ElevenLabsRealtimeTranscriptionSession implements RealtimeTranscriptionSession { + private ws: WebSocket | null = null; + private connected = false; + private ready = false; + private closed = false; + private reconnectAttempts = 0; + private queuedAudio: Buffer[] = []; + private queuedBytes = 0; + private closeTimer: ReturnType | undefined; + private lastTranscript: string | undefined; + private reconnecting = false; + private readonly flowId = randomUUID(); + + constructor(private readonly config: ElevenLabsRealtimeTranscriptionSessionConfig) {} + + async connect(): Promise { + this.closed = false; + this.reconnectAttempts = 0; + await this.doConnect(); + } + + sendAudio(audio: Buffer): void { + if (this.closed || audio.byteLength === 0) { + return; + } + if (this.ws?.readyState === WebSocket.OPEN && this.ready) { + this.sendAudioChunk(audio); + return; + } + this.queueAudio(audio); + } + + close(): void { + this.closed = true; + this.connected = false; + this.queuedAudio = []; + this.queuedBytes = 0; + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + this.forceClose(); + return; + } + this.sendJson({ + message_type: "input_audio_chunk", + audio_base_64: "", + sample_rate: this.config.sampleRate, + commit: true, + }); + this.closeTimer = setTimeout(() => this.forceClose(), ELEVENLABS_REALTIME_CLOSE_TIMEOUT_MS); + } + + isConnected(): boolean { + return this.connected; + } + + private async doConnect(): Promise { + await new Promise((resolve, reject) => { + const url = toElevenLabsRealtimeWsUrl(this.config); + const debugProxy = resolveDebugProxySettings(); + const proxyAgent = createDebugProxyWebSocketAgent(debugProxy); + let settled = false; + let opened = false; + const finishConnect = () => { + if (settled) { + return; + } + settled = true; + clearTimeout(connectTimeout); + this.ready = true; + this.flushQueuedAudio(); + resolve(); + }; + const failConnect = (error: Error) => { + if (settled) { + return; + } + settled = true; + clearTimeout(connectTimeout); + this.config.onError?.(error); + this.closed = true; + this.forceClose(); + reject(error); + }; + this.ready = false; + this.ws = new WebSocket(url, { + headers: { + "xi-api-key": this.config.apiKey, + }, + ...(proxyAgent ? { agent: proxyAgent } : {}), + }); + + const connectTimeout = setTimeout(() => { + failConnect(new Error("ElevenLabs realtime transcription connection timeout")); + }, ELEVENLABS_REALTIME_CONNECT_TIMEOUT_MS); + + this.ws.on("open", () => { + opened = true; + this.connected = true; + this.reconnectAttempts = 0; + captureWsEvent({ + url, + direction: "local", + kind: "ws-open", + flowId: this.flowId, + meta: { provider: "elevenlabs", capability: "realtime-transcription" }, + }); + }); + + this.ws.on("message", (data) => { + const payload = rawWsDataToBuffer(data); + captureWsEvent({ + url, + direction: "inbound", + kind: "ws-frame", + flowId: this.flowId, + payload, + meta: { provider: "elevenlabs", capability: "realtime-transcription" }, + }); + try { + const event = JSON.parse(payload.toString()) as ElevenLabsRealtimeTranscriptionEvent; + if (event.message_type === "session_started") { + finishConnect(); + return; + } + if (!this.ready && event.message_type?.includes("error")) { + failConnect(new Error(readErrorDetail(event))); + return; + } + this.handleEvent(event); + } catch (error) { + this.config.onError?.(error instanceof Error ? 
error : new Error(String(error))); + } + }); + + this.ws.on("error", (error) => { + captureWsEvent({ + url, + direction: "local", + kind: "error", + flowId: this.flowId, + errorText: error instanceof Error ? error.message : String(error), + meta: { provider: "elevenlabs", capability: "realtime-transcription" }, + }); + if (!opened) { + failConnect(error instanceof Error ? error : new Error(String(error))); + return; + } + this.config.onError?.(error instanceof Error ? error : new Error(String(error))); + }); + + this.ws.on("close", () => { + clearTimeout(connectTimeout); + this.connected = false; + this.ready = false; + if (this.closed || !opened || !settled) { + return; + } + void this.attemptReconnect(); + }); + }); + } + + private async attemptReconnect(): Promise { + if (this.closed || this.reconnecting) { + return; + } + if (this.reconnectAttempts >= ELEVENLABS_REALTIME_MAX_RECONNECT_ATTEMPTS) { + this.config.onError?.(new Error("ElevenLabs realtime transcription reconnect limit reached")); + return; + } + this.reconnectAttempts += 1; + const delay = ELEVENLABS_REALTIME_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1); + this.reconnecting = true; + try { + await new Promise((resolve) => setTimeout(resolve, delay)); + if (!this.closed) { + await this.doConnect(); + } + } catch { + if (!this.closed) { + this.reconnecting = false; + await this.attemptReconnect(); + return; + } + } finally { + this.reconnecting = false; + } + } + + private handleEvent(event: ElevenLabsRealtimeTranscriptionEvent): void { + switch (event.message_type) { + case "partial_transcript": + if (event.text) { + this.config.onPartial?.(event.text); + } + return; + case "committed_transcript": + case "committed_transcript_with_timestamps": + if (event.text) { + this.emitTranscript(event.text); + } + return; + default: + if (event.message_type?.includes("error")) { + this.config.onError?.(new Error(readErrorDetail(event))); + } + return; + } + } + + private emitTranscript(text: string): void { + if (text === this.lastTranscript) { + return; + } + this.lastTranscript = text; + this.config.onTranscript?.(text); + } + + private queueAudio(audio: Buffer): void { + this.queuedAudio.push(Buffer.from(audio)); + this.queuedBytes += audio.byteLength; + while (this.queuedBytes > ELEVENLABS_REALTIME_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) { + const dropped = this.queuedAudio.shift(); + this.queuedBytes -= dropped?.byteLength ?? 0; + } + } + + private flushQueuedAudio(): void { + for (const audio of this.queuedAudio) { + this.sendAudioChunk(audio); + } + this.queuedAudio = []; + this.queuedBytes = 0; + } + + private sendAudioChunk(audio: Buffer): void { + this.sendJson({ + message_type: "input_audio_chunk", + audio_base_64: audio.toString("base64"), + sample_rate: this.config.sampleRate, + ...(this.config.commitStrategy === "manual" ? 
{ commit: true } : {}), + }); + } + + private sendJson(event: unknown): void { + if (this.ws?.readyState !== WebSocket.OPEN) { + return; + } + const payload = JSON.stringify(event); + captureWsEvent({ + url: toElevenLabsRealtimeWsUrl(this.config), + direction: "outbound", + kind: "ws-frame", + flowId: this.flowId, + payload, + meta: { provider: "elevenlabs", capability: "realtime-transcription" }, + }); + this.ws.send(payload); + } + + private forceClose(): void { + if (this.closeTimer) { + clearTimeout(this.closeTimer); + this.closeTimer = undefined; + } + this.connected = false; + this.ready = false; + if (this.ws) { + this.ws.close(1000, "Transcription session closed"); + this.ws = null; + } + } +} + +export function buildElevenLabsRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin { + return { + id: "elevenlabs", + label: "ElevenLabs Realtime Transcription", + aliases: ["elevenlabs-realtime", "scribe-v2-realtime"], + autoSelectOrder: 40, + resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig), + isConfigured: ({ providerConfig }) => + Boolean( + normalizeProviderConfig(providerConfig).apiKey || + resolveElevenLabsApiKeyWithProfileFallback() || + process.env.XI_API_KEY, + ), + createSession: (req) => { + const config = normalizeProviderConfig(req.providerConfig); + const apiKey = + config.apiKey || resolveElevenLabsApiKeyWithProfileFallback() || process.env.XI_API_KEY; + if (!apiKey) { + throw new Error("ElevenLabs API key missing"); + } + return new ElevenLabsRealtimeTranscriptionSession({ + ...req, + apiKey, + baseUrl: normalizeElevenLabsBaseUrl(config.baseUrl), + modelId: config.modelId ?? ELEVENLABS_REALTIME_DEFAULT_MODEL, + audioFormat: config.audioFormat ?? ELEVENLABS_REALTIME_DEFAULT_AUDIO_FORMAT, + sampleRate: config.sampleRate ?? ELEVENLABS_REALTIME_DEFAULT_SAMPLE_RATE, + commitStrategy: config.commitStrategy ?? 
+    createSession: (req) => {
+      const config = normalizeProviderConfig(req.providerConfig);
+      const apiKey =
+        config.apiKey || resolveElevenLabsApiKeyWithProfileFallback() || process.env.XI_API_KEY;
+      if (!apiKey) {
+        throw new Error("ElevenLabs API key missing");
+      }
+      return new ElevenLabsRealtimeTranscriptionSession({
+        ...req,
+        apiKey,
+        baseUrl: normalizeElevenLabsBaseUrl(config.baseUrl),
+        modelId: config.modelId ?? ELEVENLABS_REALTIME_DEFAULT_MODEL,
+        audioFormat: config.audioFormat ?? ELEVENLABS_REALTIME_DEFAULT_AUDIO_FORMAT,
+        sampleRate: config.sampleRate ?? ELEVENLABS_REALTIME_DEFAULT_SAMPLE_RATE,
+        commitStrategy: config.commitStrategy ?? ELEVENLABS_REALTIME_DEFAULT_COMMIT_STRATEGY,
+        languageCode: config.languageCode,
+        vadSilenceThresholdSecs: config.vadSilenceThresholdSecs,
+        vadThreshold: config.vadThreshold,
+        minSpeechDurationMs: config.minSpeechDurationMs,
+        minSilenceDurationMs: config.minSilenceDurationMs,
+      });
+    },
+  };
+}
+
+export const __testing = {
+  normalizeProviderConfig,
+  toElevenLabsRealtimeWsUrl,
+};
diff --git a/extensions/elevenlabs/test-api.ts b/extensions/elevenlabs/test-api.ts
index 629f0e304dc..0eeca5b5d18 100644
--- a/extensions/elevenlabs/test-api.ts
+++ b/extensions/elevenlabs/test-api.ts
@@ -1 +1,6 @@
+export {
+  elevenLabsMediaUnderstandingProvider,
+  transcribeElevenLabsAudio,
+} from "./media-understanding-provider.js";
+export { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
 export { buildElevenLabsSpeechProvider } from "./speech-provider.js";
diff --git a/extensions/mistral/index.ts b/extensions/mistral/index.ts
index 15aed5131a9..da6cb4906b1 100644
--- a/extensions/mistral/index.ts
+++ b/extensions/mistral/index.ts
@@ -5,6 +5,7 @@ import { mistralMemoryEmbeddingProviderAdapter } from "./memory-embedding-adapte
 import { applyMistralConfig, MISTRAL_DEFAULT_MODEL_REF } from "./onboard.js";
 import { buildMistralProvider } from "./provider-catalog.js";
 import { contributeMistralResolvedModelCompat } from "./provider-compat.js";
+import { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
 const PROVIDER_ID = "mistral";
 export function buildMistralReplayPolicy() {
@@ -55,5 +56,6 @@ export default defineSingleProviderPluginEntry({
   register(api) {
     api.registerMemoryEmbeddingProvider(mistralMemoryEmbeddingProviderAdapter);
     api.registerMediaUnderstandingProvider(mistralMediaUnderstandingProvider);
+    api.registerRealtimeTranscriptionProvider(buildMistralRealtimeTranscriptionProvider());
   },
 });
diff --git a/extensions/mistral/mistral.live.test.ts b/extensions/mistral/mistral.live.test.ts
new file mode 100644
index 00000000000..6aec8fd1c1e
--- /dev/null
+++ b/extensions/mistral/mistral.live.test.ts
@@ -0,0 +1,84 @@
+import { describe, expect, it } from "vitest";
+import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js";
+import {
+  normalizeTranscriptForMatch,
+  streamAudioForLiveTest,
+  synthesizeElevenLabsLiveSpeech,
+  waitForLiveExpectation,
+} from "../../test/helpers/stt-live-audio.js";
+import { mistralMediaUnderstandingProvider } from "./media-understanding-provider.js";
+import { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
+
+const MISTRAL_KEY = process.env.MISTRAL_API_KEY ?? "";
+const ELEVENLABS_KEY = process.env.ELEVENLABS_API_KEY ?? "";
+const LIVE = isLiveTestEnabled(["MISTRAL_LIVE_TEST"]);
+const describeLive = LIVE && MISTRAL_KEY && ELEVENLABS_KEY ? describe : describe.skip;
+
+describeLive("mistral plugin live", () => {
+  it("transcribes synthesized speech through the media provider", async () => {
+    const phrase = "Testing OpenClaw Mistral speech to text integration OK.";
+    const audio = await synthesizeElevenLabsLiveSpeech({
+      text: phrase,
+      apiKey: ELEVENLABS_KEY,
+      outputFormat: "mp3_44100_128",
+      timeoutMs: 30_000,
+    });
+
+    const transcript = await mistralMediaUnderstandingProvider.transcribeAudio?.({
+      buffer: audio,
+      fileName: "mistral-live.mp3",
+      mime: "audio/mpeg",
+      apiKey: MISTRAL_KEY,
+      timeoutMs: 60_000,
+    });
+
+    const normalized = normalizeTranscriptForMatch(transcript?.text ?? "");
+    expect(normalized).toContain("openclaw");
+    expect(normalized).toContain("mistral");
+  }, 90_000);
""); + expect(normalized).toContain("openclaw"); + expect(normalized).toContain("mistral"); + }, 90_000); + + it("streams realtime STT through the registered transcription provider", async () => { + const provider = buildMistralRealtimeTranscriptionProvider(); + const phrase = "Testing OpenClaw Mistral realtime transcription integration OK."; + const speech = await synthesizeElevenLabsLiveSpeech({ + text: phrase, + apiKey: ELEVENLABS_KEY, + outputFormat: "ulaw_8000", + timeoutMs: 30_000, + }); + const transcripts: string[] = []; + const partials: string[] = []; + const errors: Error[] = []; + const session = provider.createSession({ + providerConfig: { + apiKey: MISTRAL_KEY, + sampleRate: 8000, + encoding: "pcm_mulaw", + targetStreamingDelayMs: 800, + }, + onPartial: (partial) => partials.push(partial), + onTranscript: (transcript) => transcripts.push(transcript), + onError: (error) => errors.push(error), + }); + + try { + await session.connect(); + await streamAudioForLiveTest({ + audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]), + sendAudio: (chunk) => session.sendAudio(chunk), + }); + session.close(); + + await waitForLiveExpectation(() => { + if (errors[0]) { + throw errors[0]; + } + expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw"); + }, 60_000); + } finally { + session.close(); + } + + expect(partials.length + transcripts.length).toBeGreaterThan(0); + }, 90_000); +}); diff --git a/extensions/mistral/openclaw.plugin.json b/extensions/mistral/openclaw.plugin.json index 6f75022380e..6ad8939c2de 100644 --- a/extensions/mistral/openclaw.plugin.json +++ b/extensions/mistral/openclaw.plugin.json @@ -22,7 +22,8 @@ ], "contracts": { "memoryEmbeddingProviders": ["mistral"], - "mediaUnderstandingProviders": ["mistral"] + "mediaUnderstandingProviders": ["mistral"], + "realtimeTranscriptionProviders": ["mistral"] }, "mediaUnderstandingProviderMetadata": { "mistral": { diff --git a/extensions/mistral/package.json b/extensions/mistral/package.json index c49bad4118c..ef9903c5576 100644 --- a/extensions/mistral/package.json +++ b/extensions/mistral/package.json @@ -4,6 +4,9 @@ "private": true, "description": "OpenClaw Mistral provider plugin", "type": "module", + "dependencies": { + "ws": "^8.20.0" + }, "devDependencies": { "@openclaw/plugin-sdk": "workspace:*" }, diff --git a/extensions/mistral/realtime-transcription-provider.test.ts b/extensions/mistral/realtime-transcription-provider.test.ts new file mode 100644 index 00000000000..c0fd6c0cc77 --- /dev/null +++ b/extensions/mistral/realtime-transcription-provider.test.ts @@ -0,0 +1,60 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../src/config/types.openclaw.js"; +import { + __testing, + buildMistralRealtimeTranscriptionProvider, +} from "./realtime-transcription-provider.js"; + +describe("buildMistralRealtimeTranscriptionProvider", () => { + afterEach(() => { + vi.unstubAllEnvs(); + }); + + it("normalizes nested provider config", () => { + const provider = buildMistralRealtimeTranscriptionProvider(); + const resolved = provider.resolveConfig?.({ + cfg: {} as OpenClawConfig, + rawConfig: { + providers: { + mistral: { + apiKey: "mistral-key", + model: "voxtral-mini-transcribe-realtime-2602", + encoding: "g711_ulaw", + sample_rate: "8000", + target_streaming_delay_ms: "240", + }, + }, + }, + }); + + expect(resolved).toMatchObject({ + apiKey: "mistral-key", + model: "voxtral-mini-transcribe-realtime-2602", + encoding: 
"pcm_mulaw", + sampleRate: 8000, + targetStreamingDelayMs: 240, + }); + }); + + it("builds a Mistral realtime websocket URL", () => { + const url = __testing.toMistralRealtimeWsUrl({ + apiKey: "mistral-key", + baseUrl: "https://api.mistral.ai/v1", + model: "voxtral-mini-transcribe-realtime-2602", + providerConfig: {}, + sampleRate: 8000, + encoding: "pcm_mulaw", + targetStreamingDelayMs: 800, + }); + + expect(url).toContain("wss://api.mistral.ai/v1/audio/transcriptions/realtime?"); + expect(url).toContain("model=voxtral-mini-transcribe-realtime-2602"); + expect(url).toContain("target_streaming_delay_ms=800"); + }); + + it("requires an API key when creating sessions", () => { + vi.stubEnv("MISTRAL_API_KEY", ""); + const provider = buildMistralRealtimeTranscriptionProvider(); + expect(() => provider.createSession({ providerConfig: {} })).toThrow("Mistral API key missing"); + }); +}); diff --git a/extensions/mistral/realtime-transcription-provider.ts b/extensions/mistral/realtime-transcription-provider.ts new file mode 100644 index 00000000000..290d485c714 --- /dev/null +++ b/extensions/mistral/realtime-transcription-provider.ts @@ -0,0 +1,492 @@ +import { randomUUID } from "node:crypto"; +import { + captureWsEvent, + createDebugProxyWebSocketAgent, + resolveDebugProxySettings, +} from "openclaw/plugin-sdk/proxy-capture"; +import type { + RealtimeTranscriptionProviderConfig, + RealtimeTranscriptionProviderPlugin, + RealtimeTranscriptionSession, + RealtimeTranscriptionSessionCreateRequest, +} from "openclaw/plugin-sdk/realtime-transcription"; +import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input"; +import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; +import WebSocket from "ws"; + +type MistralRealtimeTranscriptionEncoding = + | "pcm_s16le" + | "pcm_s32le" + | "pcm_f16le" + | "pcm_f32le" + | "pcm_mulaw" + | "pcm_alaw"; + +type MistralRealtimeTranscriptionProviderConfig = { + apiKey?: string; + baseUrl?: string; + model?: string; + sampleRate?: number; + encoding?: MistralRealtimeTranscriptionEncoding; + targetStreamingDelayMs?: number; +}; + +type MistralRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & { + apiKey: string; + baseUrl: string; + model: string; + sampleRate: number; + encoding: MistralRealtimeTranscriptionEncoding; + targetStreamingDelayMs?: number; +}; + +type MistralRealtimeTranscriptionEvent = { + type?: string; + text?: string; + error?: { + message?: unknown; + code?: number; + }; +}; + +const MISTRAL_REALTIME_DEFAULT_BASE_URL = "wss://api.mistral.ai"; +const MISTRAL_REALTIME_DEFAULT_MODEL = "voxtral-mini-transcribe-realtime-2602"; +const MISTRAL_REALTIME_DEFAULT_SAMPLE_RATE = 8000; +const MISTRAL_REALTIME_DEFAULT_ENCODING: MistralRealtimeTranscriptionEncoding = "pcm_mulaw"; +const MISTRAL_REALTIME_DEFAULT_DELAY_MS = 800; +const MISTRAL_REALTIME_CONNECT_TIMEOUT_MS = 10_000; +const MISTRAL_REALTIME_CLOSE_TIMEOUT_MS = 5_000; +const MISTRAL_REALTIME_MAX_RECONNECT_ATTEMPTS = 5; +const MISTRAL_REALTIME_RECONNECT_DELAY_MS = 1000; +const MISTRAL_REALTIME_MAX_QUEUED_BYTES = 2 * 1024 * 1024; + +function readRecord(value: unknown): Record | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : undefined; +} + +function readNestedMistralConfig(rawConfig: RealtimeTranscriptionProviderConfig) { + const raw = readRecord(rawConfig); + const providers = readRecord(raw?.providers); + return readRecord(providers?.mistral ?? raw?.mistral ?? raw) ?? 
+function readNestedMistralConfig(rawConfig: RealtimeTranscriptionProviderConfig) {
+  const raw = readRecord(rawConfig);
+  const providers = readRecord(raw?.providers);
+  return readRecord(providers?.mistral ?? raw?.mistral ?? raw) ?? {};
+}
+
+function readFiniteNumber(value: unknown): number | undefined {
+  const next =
+    typeof value === "number"
+      ? value
+      : typeof value === "string"
+        ? Number.parseFloat(value)
+        : undefined;
+  return Number.isFinite(next) ? next : undefined;
+}
+
+function normalizeMistralEncoding(
+  value: unknown,
+): MistralRealtimeTranscriptionEncoding | undefined {
+  const normalized = normalizeOptionalString(value)?.toLowerCase();
+  if (!normalized) {
+    return undefined;
+  }
+  switch (normalized) {
+    case "pcm":
+    case "linear16":
+    case "pcm_s16le":
+      return "pcm_s16le";
+    case "pcm_s32le":
+    case "pcm_f16le":
+    case "pcm_f32le":
+      return normalized;
+    case "mulaw":
+    case "ulaw":
+    case "g711_ulaw":
+    case "g711-mulaw":
+    case "pcm_mulaw":
+      return "pcm_mulaw";
+    case "alaw":
+    case "g711_alaw":
+    case "g711-alaw":
+    case "pcm_alaw":
+      return "pcm_alaw";
+    default:
+      throw new Error(`Invalid Mistral realtime transcription encoding: ${normalized}`);
+  }
+}
+
+function normalizeMistralRealtimeBaseUrl(value?: string): string {
+  const raw = normalizeOptionalString(value ?? process.env.MISTRAL_REALTIME_BASE_URL);
+  if (!raw) {
+    return MISTRAL_REALTIME_DEFAULT_BASE_URL;
+  }
+  const url = new URL(raw);
+  url.protocol =
+    url.protocol === "http:" ? "ws:" : url.protocol === "https:" ? "wss:" : url.protocol;
+  url.pathname = url.pathname.replace(/\/v1\/?$/, "").replace(/\/+$/, "");
+  return url.toString().replace(/\/+$/, "");
+}
+
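+// Produces wss://<host>/v1/audio/transcriptions/realtime?model=...&target_streaming_delay_ms=...
+// from the normalized base URL (http(s) upgraded to ws(s), trailing /v1 stripped).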
+function toMistralRealtimeWsUrl(config: MistralRealtimeTranscriptionSessionConfig): string {
+  const base = new URL(`${normalizeMistralRealtimeBaseUrl(config.baseUrl)}/`);
+  const url = new URL("v1/audio/transcriptions/realtime", base);
+  url.searchParams.set("model", config.model);
+  if (config.targetStreamingDelayMs != null) {
+    url.searchParams.set("target_streaming_delay_ms", String(config.targetStreamingDelayMs));
+  }
+  return url.toString();
+}
+
+function normalizeProviderConfig(
+  config: RealtimeTranscriptionProviderConfig,
+): MistralRealtimeTranscriptionProviderConfig {
+  const raw = readNestedMistralConfig(config);
+  return {
+    apiKey: normalizeResolvedSecretInputString({
+      value: raw.apiKey,
+      path: "plugins.entries.voice-call.config.streaming.providers.mistral.apiKey",
+    }),
+    baseUrl: normalizeOptionalString(raw.baseUrl),
+    model: normalizeOptionalString(raw.model ?? raw.sttModel),
+    sampleRate: readFiniteNumber(raw.sampleRate ?? raw.sample_rate),
+    encoding: normalizeMistralEncoding(raw.encoding),
+    targetStreamingDelayMs: readFiniteNumber(
+      raw.targetStreamingDelayMs ?? raw.target_streaming_delay_ms ?? raw.delayMs,
+    ),
+  };
+}
+
+function rawWsDataToBuffer(data: WebSocket.RawData): Buffer {
+  if (Buffer.isBuffer(data)) {
+    return data;
+  }
+  if (Array.isArray(data)) {
+    return Buffer.concat(data);
+  }
+  return Buffer.from(data);
+}
+
+function readErrorDetail(event: MistralRealtimeTranscriptionEvent): string {
+  const message = event.error?.message;
+  if (typeof message === "string") {
+    return message;
+  }
+  if (message && typeof message === "object") {
+    return JSON.stringify(message);
+  }
+  if (typeof event.error?.code === "number") {
+    return `Mistral realtime transcription error (${event.error.code})`;
+  }
+  return "Mistral realtime transcription error";
+}
+
+class MistralRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
+  private ws: WebSocket | null = null;
+  private connected = false;
+  private ready = false;
+  private closed = false;
+  private reconnectAttempts = 0;
+  private queuedAudio: Buffer[] = [];
+  private queuedBytes = 0;
+  private closeTimer: ReturnType<typeof setTimeout> | undefined;
+  private partialText = "";
+  private reconnecting = false;
+  private readonly flowId = randomUUID();
+
+  constructor(private readonly config: MistralRealtimeTranscriptionSessionConfig) {}
+
+  async connect(): Promise<void> {
+    this.closed = false;
+    this.reconnectAttempts = 0;
+    await this.doConnect();
+  }
+
+  sendAudio(audio: Buffer): void {
+    if (this.closed || audio.byteLength === 0) {
+      return;
+    }
+    if (this.ws?.readyState === WebSocket.OPEN && this.ready) {
+      this.sendJson({
+        type: "input_audio.append",
+        audio: audio.toString("base64"),
+      });
+      return;
+    }
+    this.queueAudio(audio);
+  }
+
+  close(): void {
+    this.closed = true;
+    this.connected = false;
+    this.queuedAudio = [];
+    this.queuedBytes = 0;
+    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
+      this.forceClose();
+      return;
+    }
+    this.sendJson({ type: "input_audio.flush" });
+    this.sendJson({ type: "input_audio.end" });
+    this.closeTimer = setTimeout(() => this.forceClose(), MISTRAL_REALTIME_CLOSE_TIMEOUT_MS);
+  }
+
+  isConnected(): boolean {
+    return this.connected;
+  }
+
+  private async doConnect(): Promise<void> {
+    await new Promise<void>((resolve, reject) => {
+      const url = toMistralRealtimeWsUrl(this.config);
+      const debugProxy = resolveDebugProxySettings();
+      const proxyAgent = createDebugProxyWebSocketAgent(debugProxy);
+      let settled = false;
+      let opened = false;
+      const finishConnect = () => {
+        if (settled) {
+          return;
+        }
+        settled = true;
+        clearTimeout(connectTimeout);
+        this.ready = true;
+        this.flushQueuedAudio();
+        resolve();
+      };
+      const failConnect = (error: Error) => {
+        if (settled) {
+          return;
+        }
+        settled = true;
+        clearTimeout(connectTimeout);
+        this.config.onError?.(error);
+        this.closed = true;
+        this.forceClose();
+        reject(error);
+      };
+      this.ready = false;
+      this.ws = new WebSocket(url, {
+        headers: {
+          Authorization: `Bearer ${this.config.apiKey}`,
+        },
+        ...(proxyAgent ? { agent: proxyAgent } : {}),
+      });
+
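+      // Fail the connect if the session.created handshake does not arrive in
+      // time; finishConnect/failConnect both clear this timer.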
+      const connectTimeout = setTimeout(() => {
+        failConnect(new Error("Mistral realtime transcription connection timeout"));
+      }, MISTRAL_REALTIME_CONNECT_TIMEOUT_MS);
+
+      this.ws.on("open", () => {
+        opened = true;
+        this.connected = true;
+        this.reconnectAttempts = 0;
+        captureWsEvent({
+          url,
+          direction: "local",
+          kind: "ws-open",
+          flowId: this.flowId,
+          meta: { provider: "mistral", capability: "realtime-transcription" },
+        });
+      });
+
+      this.ws.on("message", (data) => {
+        const payload = rawWsDataToBuffer(data);
+        captureWsEvent({
+          url,
+          direction: "inbound",
+          kind: "ws-frame",
+          flowId: this.flowId,
+          payload,
+          meta: { provider: "mistral", capability: "realtime-transcription" },
+        });
+        try {
+          const event = JSON.parse(payload.toString()) as MistralRealtimeTranscriptionEvent;
+          if (event.type === "session.created") {
+            this.sendJson({
+              type: "session.update",
+              session: {
+                audio_format: {
+                  encoding: this.config.encoding,
+                  sample_rate: this.config.sampleRate,
+                },
+              },
+            });
+            finishConnect();
+            return;
+          }
+          if (!this.ready && event.type === "error") {
+            failConnect(new Error(readErrorDetail(event)));
+            return;
+          }
+          this.handleEvent(event);
+        } catch (error) {
+          this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
+        }
+      });
+
+      this.ws.on("error", (error) => {
+        captureWsEvent({
+          url,
+          direction: "local",
+          kind: "error",
+          flowId: this.flowId,
+          errorText: error instanceof Error ? error.message : String(error),
+          meta: { provider: "mistral", capability: "realtime-transcription" },
+        });
+        if (!opened) {
+          failConnect(error instanceof Error ? error : new Error(String(error)));
+          return;
+        }
+        this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
+      });
+
+      this.ws.on("close", () => {
+        clearTimeout(connectTimeout);
+        this.connected = false;
+        this.ready = false;
+        if (this.closeTimer) {
+          clearTimeout(this.closeTimer);
+          this.closeTimer = undefined;
+        }
+        if (this.closed || !opened || !settled) {
+          return;
+        }
+        void this.attemptReconnect();
+      });
+    });
+  }
+
+  private async attemptReconnect(): Promise<void> {
+    if (this.closed || this.reconnecting) {
+      return;
+    }
+    if (this.reconnectAttempts >= MISTRAL_REALTIME_MAX_RECONNECT_ATTEMPTS) {
+      this.config.onError?.(new Error("Mistral realtime transcription reconnect limit reached"));
+      return;
+    }
+    this.reconnectAttempts += 1;
+    const delay = MISTRAL_REALTIME_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
+    this.reconnecting = true;
+    try {
+      await new Promise((resolve) => setTimeout(resolve, delay));
+      if (!this.closed) {
+        await this.doConnect();
+      }
+    } catch {
+      if (!this.closed) {
+        this.reconnecting = false;
+        await this.attemptReconnect();
+        return;
+      }
+    } finally {
+      this.reconnecting = false;
+    }
+  }
+
+  private handleEvent(event: MistralRealtimeTranscriptionEvent): void {
+    switch (event.type) {
+      case "transcription.text.delta":
+        if (event.text) {
+          this.partialText += event.text;
+          this.config.onPartial?.(this.partialText);
+        }
+        return;
+      case "transcription.segment":
+        if (event.text) {
+          this.config.onTranscript?.(event.text);
+          this.partialText = "";
+        }
+        return;
+      case "transcription.done":
+        if (this.partialText.trim()) {
+          this.config.onTranscript?.(this.partialText);
+          this.partialText = "";
+        }
+        this.forceClose();
+        return;
+      case "error":
+        this.config.onError?.(new Error(readErrorDetail(event)));
+        return;
+      default:
+        return;
+    }
+  }
+
+  private queueAudio(audio: Buffer): void {
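+    // Copy each chunk and bound the backlog: once the byte cap is exceeded,
+    // drop the oldest frames so a slow connect cannot grow memory unboundedly.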
+    this.queuedAudio.push(Buffer.from(audio));
+    this.queuedBytes += audio.byteLength;
+    while (this.queuedBytes > MISTRAL_REALTIME_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) {
+      const dropped = this.queuedAudio.shift();
+      this.queuedBytes -= dropped?.byteLength ?? 0;
+    }
+  }
+
+  private flushQueuedAudio(): void {
+    for (const audio of this.queuedAudio) {
+      this.sendJson({
+        type: "input_audio.append",
+        audio: audio.toString("base64"),
+      });
+    }
+    this.queuedAudio = [];
+    this.queuedBytes = 0;
+  }
+
+  private sendJson(event: unknown): void {
+    if (this.ws?.readyState !== WebSocket.OPEN) {
+      return;
+    }
+    const payload = JSON.stringify(event);
+    captureWsEvent({
+      url: toMistralRealtimeWsUrl(this.config),
+      direction: "outbound",
+      kind: "ws-frame",
+      flowId: this.flowId,
+      payload,
+      meta: { provider: "mistral", capability: "realtime-transcription" },
+    });
+    this.ws.send(payload);
+  }
+
+  private forceClose(): void {
+    if (this.closeTimer) {
+      clearTimeout(this.closeTimer);
+      this.closeTimer = undefined;
+    }
+    this.connected = false;
+    this.ready = false;
+    if (this.ws) {
+      this.ws.close(1000, "Transcription session closed");
+      this.ws = null;
+    }
+  }
+}
+
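+// Session defaults target Voice Call telephony audio (8 kHz G.711 u-law), so
+// Twilio-style media frames can be forwarded without transcoding.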
+export function buildMistralRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
+  return {
+    id: "mistral",
+    label: "Mistral Realtime Transcription",
+    aliases: ["mistral-realtime", "voxtral-realtime"],
+    autoSelectOrder: 45,
+    resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig),
+    isConfigured: ({ providerConfig }) =>
+      Boolean(normalizeProviderConfig(providerConfig).apiKey || process.env.MISTRAL_API_KEY),
+    createSession: (req) => {
+      const config = normalizeProviderConfig(req.providerConfig);
+      const apiKey = config.apiKey || process.env.MISTRAL_API_KEY;
+      if (!apiKey) {
+        throw new Error("Mistral API key missing");
+      }
+      return new MistralRealtimeTranscriptionSession({
+        ...req,
+        apiKey,
+        baseUrl: normalizeMistralRealtimeBaseUrl(config.baseUrl),
+        model: config.model ?? MISTRAL_REALTIME_DEFAULT_MODEL,
+        sampleRate: config.sampleRate ?? MISTRAL_REALTIME_DEFAULT_SAMPLE_RATE,
+        encoding: config.encoding ?? MISTRAL_REALTIME_DEFAULT_ENCODING,
+        targetStreamingDelayMs: config.targetStreamingDelayMs ?? MISTRAL_REALTIME_DEFAULT_DELAY_MS,
+      });
+    },
+  };
+}
+
+export const __testing = {
+  normalizeProviderConfig,
+  toMistralRealtimeWsUrl,
+};
diff --git a/extensions/mistral/test-api.ts b/extensions/mistral/test-api.ts
index 14e820308cf..007e600955f 100644
--- a/extensions/mistral/test-api.ts
+++ b/extensions/mistral/test-api.ts
@@ -1 +1,2 @@
 export { mistralMediaUnderstandingProvider } from "./media-understanding-provider.js";
+export { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 1b8dd88bc2c..d9a57b3e825 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -403,6 +403,10 @@ importers:
         version: link:../../packages/plugin-sdk
 
   extensions/deepgram:
+    dependencies:
+      ws:
+        specifier: ^8.20.0
+        version: 8.20.0
     devDependencies:
       '@openclaw/plugin-sdk':
         specifier: workspace:*
@@ -514,6 +518,10 @@ importers:
         version: link:../../packages/plugin-sdk
 
   extensions/elevenlabs:
+    dependencies:
+      ws:
+        specifier: ^8.20.0
+        version: 8.20.0
     devDependencies:
       '@openclaw/plugin-sdk':
         specifier: workspace:*
@@ -836,6 +844,10 @@ importers:
         version: link:../../packages/plugin-sdk
 
   extensions/mistral:
+    dependencies:
+      ws:
+        specifier: ^8.20.0
+        version: 8.20.0
     devDependencies:
       '@openclaw/plugin-sdk':
         specifier: workspace:*
diff --git a/src/media-understanding/defaults.test.ts b/src/media-understanding/defaults.test.ts
index 2e73d32cf54..1effd6da0bc 100644
--- a/src/media-understanding/defaults.test.ts
+++ b/src/media-understanding/defaults.test.ts
@@ -36,6 +36,7 @@ describe("resolveAutoMediaKeyProviders", () => {
       "xai",
       "deepgram",
       "google",
+      "elevenlabs",
       "mistral",
     ]);
 });
diff --git a/test/helpers/stt-live-audio.ts b/test/helpers/stt-live-audio.ts
new file mode 100644
index 00000000000..d03b5d99497
--- /dev/null
+++ b/test/helpers/stt-live-audio.ts
@@ -0,0 +1,77 @@
+const DEFAULT_ELEVENLABS_BASE_URL = "https://api.elevenlabs.io";
+const DEFAULT_ELEVENLABS_VOICE_ID = "pMsXgVXv3BLzUgSXRplE";
+const DEFAULT_ELEVENLABS_TTS_MODEL_ID = "eleven_multilingual_v2";
+
+export function normalizeTranscriptForMatch(value: string): string {
+  return value.toLowerCase().replace(/[^a-z0-9]+/g, "");
+}
+
+export async function waitForLiveExpectation(expectation: () => void, timeoutMs = 30_000) {
+  const started = Date.now();
+  let lastError: unknown;
+  while (Date.now() - started < timeoutMs) {
+    try {
+      expectation();
+      return;
+    } catch (error) {
+      lastError = error;
+      await new Promise((resolve) => setTimeout(resolve, 100));
+    }
+  }
+  throw lastError;
+}
+
+export async function synthesizeElevenLabsLiveSpeech(params: {
+  text: string;
+  apiKey: string;
+  outputFormat: "mp3_44100_128" | "ulaw_8000";
+  timeoutMs?: number;
+}): Promise<Buffer> {
+  const baseUrl = process.env.ELEVENLABS_BASE_URL?.trim() || DEFAULT_ELEVENLABS_BASE_URL;
+  const voiceId = process.env.ELEVENLABS_LIVE_VOICE_ID?.trim() || DEFAULT_ELEVENLABS_VOICE_ID;
+  const controller = new AbortController();
+  const timeout = setTimeout(() => controller.abort(), params.timeoutMs ?? 30_000);
+  try {
+    const url = new URL(`${baseUrl.replace(/\/+$/, "")}/v1/text-to-speech/${voiceId}`);
+    url.searchParams.set("output_format", params.outputFormat);
+    const response = await fetch(url, {
+      method: "POST",
+      headers: {
+        "xi-api-key": params.apiKey,
+        "Content-Type": "application/json",
+      },
+      body: JSON.stringify({
+        text: params.text,
+        model_id: DEFAULT_ELEVENLABS_TTS_MODEL_ID,
+        voice_settings: {
+          stability: 0.5,
+          similarity_boost: 0.75,
+          style: 0,
+          use_speaker_boost: true,
+          speed: 1,
+        },
+      }),
+      signal: controller.signal,
+    });
+    if (!response.ok) {
+      throw new Error(`ElevenLabs live TTS failed (${response.status})`);
+    }
+    return Buffer.from(await response.arrayBuffer());
+  } finally {
+    clearTimeout(timeout);
+  }
+}
+
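+// Paces audio roughly like telephony media: the default 160-byte chunk is
+// 20 ms of 8 kHz mono G.711 (1 byte per sample), and the 5 ms inter-chunk
+// delay keeps the stream ahead of real time without flooding the socket.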
+export async function streamAudioForLiveTest(params: {
+  audio: Buffer;
+  sendAudio: (chunk: Buffer) => void;
+  chunkSize?: number;
+  delayMs?: number;
+}) {
+  const chunkSize = params.chunkSize ?? 160;
+  const delayMs = params.delayMs ?? 5;
+  for (let offset = 0; offset < params.audio.byteLength; offset += chunkSize) {
+    params.sendAudio(params.audio.subarray(offset, offset + chunkSize));
+    await new Promise((resolve) => setTimeout(resolve, delayMs));
+  }
+}