feat(providers): add streaming stt providers

This commit is contained in:
Peter Steinberger
2026-04-23 03:05:44 +01:00
parent 5b68092351
commit 51ed22e608
32 changed files with 2399 additions and 16 deletions

View File

@@ -1,8 +1,16 @@
import { describe, expect, it } from "vitest";
import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js";
import {
normalizeTranscriptForMatch,
streamAudioForLiveTest,
synthesizeElevenLabsLiveSpeech,
waitForLiveExpectation,
} from "../../test/helpers/stt-live-audio.js";
import { transcribeDeepgramAudio } from "./audio.js";
import { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
const DEEPGRAM_KEY = process.env.DEEPGRAM_API_KEY ?? "";
const ELEVENLABS_KEY = process.env.ELEVENLABS_API_KEY ?? "";
const DEEPGRAM_MODEL = process.env.DEEPGRAM_MODEL?.trim() || "nova-3";
const DEEPGRAM_BASE_URL = process.env.DEEPGRAM_BASE_URL?.trim();
const SAMPLE_URL =
@@ -41,4 +49,50 @@ describeLive("deepgram live", () => {
});
expect(result.text.trim().length).toBeGreaterThan(0);
}, 30000);
// Live end-to-end check: synthesize speech with ElevenLabs TTS, stream it into
// the Deepgram realtime provider as 8 kHz mu-law audio, and expect the phrase back.
it("streams realtime STT through the registered transcription provider", async () => {
  if (!ELEVENLABS_KEY) {
    throw new Error("ELEVENLABS_API_KEY required to synthesize live realtime STT input");
  }
  const provider = buildDeepgramRealtimeTranscriptionProvider();
  // Distinctive phrase so the "openclaw" transcript match cannot come from noise.
  const phrase = "Testing OpenClaw Deepgram realtime transcription integration OK.";
  const speech = await synthesizeElevenLabsLiveSpeech({
    text: phrase,
    apiKey: ELEVENLABS_KEY,
    outputFormat: "ulaw_8000", // matches the provider's default mulaw/8kHz session config
    timeoutMs: 30_000,
  });
  const transcripts: string[] = [];
  const partials: string[] = [];
  const errors: Error[] = [];
  const session = provider.createSession({
    providerConfig: {
      apiKey: DEEPGRAM_KEY,
      language: "en-US",
      endpointingMs: 500,
    },
    onPartial: (partial) => partials.push(partial),
    onTranscript: (transcript) => transcripts.push(transcript),
    onError: (error) => errors.push(error),
  });
  try {
    await session.connect();
    // Pad with 0xff (mu-law encoding of silence) before/after the speech so
    // Deepgram's endpointing has quiet lead-in/lead-out to detect.
    await streamAudioForLiveTest({
      audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
      sendAudio: (chunk) => session.sendAudio(chunk),
    });
    // Poll until a final transcript containing the phrase arrives, surfacing
    // the first provider error immediately.
    await waitForLiveExpectation(() => {
      if (errors[0]) {
        throw errors[0];
      }
      expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
    }, 60_000);
  } finally {
    session.close();
  }
  expect(partials.length + transcripts.length).toBeGreaterThan(0);
}, 90_000);
});

View File

@@ -1,5 +1,6 @@
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
import { deepgramMediaUnderstandingProvider } from "./media-understanding-provider.js";
import { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
export default definePluginEntry({
id: "deepgram",
@@ -7,5 +8,6 @@ export default definePluginEntry({
description: "Bundled Deepgram audio transcription provider",
register(api) {
api.registerMediaUnderstandingProvider(deepgramMediaUnderstandingProvider);
api.registerRealtimeTranscriptionProvider(buildDeepgramRealtimeTranscriptionProvider());
},
});

View File

@@ -5,7 +5,8 @@
"deepgram": ["DEEPGRAM_API_KEY"]
},
"contracts": {
"mediaUnderstandingProviders": ["deepgram"]
"mediaUnderstandingProviders": ["deepgram"],
"realtimeTranscriptionProviders": ["deepgram"]
},
"mediaUnderstandingProviderMetadata": {
"deepgram": {

View File

@@ -4,6 +4,9 @@
"private": true,
"description": "OpenClaw Deepgram media-understanding provider",
"type": "module",
"dependencies": {
"ws": "^8.20.0"
},
"devDependencies": {
"@openclaw/plugin-sdk": "workspace:*"
},

View File

@@ -0,0 +1,68 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../src/config/types.openclaw.js";
import {
__testing,
buildDeepgramRealtimeTranscriptionProvider,
} from "./realtime-transcription-provider.js";
describe("buildDeepgramRealtimeTranscriptionProvider", () => {
  afterEach(() => {
    // The API-key test below stubs DEEPGRAM_API_KEY; restore the real env.
    vi.unstubAllEnvs();
  });
  it("normalizes nested provider config", () => {
    const provider = buildDeepgramRealtimeTranscriptionProvider();
    // Mixed camelCase/snake_case keys plus string-encoded numbers/booleans,
    // as they may arrive from user config files.
    const resolved = provider.resolveConfig?.({
      cfg: {} as OpenClawConfig,
      rawConfig: {
        providers: {
          deepgram: {
            apiKey: "dg-key",
            model: "nova-3",
            encoding: "g711_ulaw",
            sample_rate: "8000",
            interim_results: "true",
            endpointing: "500",
            language: "en-US",
          },
        },
      },
    });
    // Canonical camelCase output with coerced types; the encoding alias is
    // mapped to Deepgram's "mulaw".
    expect(resolved).toMatchObject({
      apiKey: "dg-key",
      model: "nova-3",
      encoding: "mulaw",
      sampleRate: 8000,
      interimResults: true,
      endpointingMs: 500,
      language: "en-US",
    });
  });
  it("builds a Deepgram listen websocket URL", () => {
    const url = __testing.toDeepgramRealtimeWsUrl({
      apiKey: "dg-key",
      baseUrl: "https://api.deepgram.com/v1",
      model: "nova-3",
      providerConfig: {},
      sampleRate: 8000,
      encoding: "mulaw",
      interimResults: true,
      endpointingMs: 800,
    });
    // https base is upgraded to wss and /listen is appended with query params.
    expect(url).toContain("wss://api.deepgram.com/v1/listen?");
    expect(url).toContain("model=nova-3");
    expect(url).toContain("encoding=mulaw");
    expect(url).toContain("sample_rate=8000");
  });
  it("requires an API key when creating sessions", () => {
    // Clear the env fallback so only the (absent) providerConfig key counts.
    vi.stubEnv("DEEPGRAM_API_KEY", "");
    const provider = buildDeepgramRealtimeTranscriptionProvider();
    expect(() => provider.createSession({ providerConfig: {} })).toThrow(
      "Deepgram API key missing",
    );
  });
});

View File

@@ -0,0 +1,504 @@
import { randomUUID } from "node:crypto";
import {
captureWsEvent,
createDebugProxyWebSocketAgent,
resolveDebugProxySettings,
} from "openclaw/plugin-sdk/proxy-capture";
import type {
RealtimeTranscriptionProviderConfig,
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
} from "openclaw/plugin-sdk/realtime-transcription";
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import WebSocket from "ws";
import { DEFAULT_DEEPGRAM_AUDIO_BASE_URL, DEFAULT_DEEPGRAM_AUDIO_MODEL } from "./audio.js";
// Audio encodings accepted by Deepgram's streaming endpoint.
type DeepgramRealtimeTranscriptionEncoding = "linear16" | "mulaw" | "alaw";
// User-supplied (pre-normalization) provider options; all optional.
type DeepgramRealtimeTranscriptionProviderConfig = {
  apiKey?: string;
  baseUrl?: string;
  model?: string;
  language?: string;
  sampleRate?: number;
  encoding?: DeepgramRealtimeTranscriptionEncoding;
  interimResults?: boolean;
  endpointingMs?: number;
};
// Fully-resolved session settings: the create request plus defaults applied.
type DeepgramRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & {
  apiKey: string;
  baseUrl: string;
  model: string;
  sampleRate: number;
  encoding: DeepgramRealtimeTranscriptionEncoding;
  interimResults: boolean;
  endpointingMs: number;
  language?: string;
};
// Subset of the Deepgram websocket message fields this session inspects.
type DeepgramRealtimeTranscriptionEvent = {
  type?: string;
  channel?: {
    alternatives?: Array<{
      transcript?: string;
    }>;
  };
  is_final?: boolean;
  speech_final?: boolean;
  error?: unknown;
  message?: string;
};
// Defaults target 8 kHz mu-law telephony audio.
const DEEPGRAM_REALTIME_DEFAULT_SAMPLE_RATE = 8000;
const DEEPGRAM_REALTIME_DEFAULT_ENCODING: DeepgramRealtimeTranscriptionEncoding = "mulaw";
const DEEPGRAM_REALTIME_DEFAULT_ENDPOINTING_MS = 800;
const DEEPGRAM_REALTIME_CONNECT_TIMEOUT_MS = 10_000;
// Grace period between sending Finalize and force-closing the socket.
const DEEPGRAM_REALTIME_CLOSE_TIMEOUT_MS = 5_000;
// Reconnect with exponential backoff: 1s, 2s, 4s, ... up to 5 attempts.
const DEEPGRAM_REALTIME_MAX_RECONNECT_ATTEMPTS = 5;
const DEEPGRAM_REALTIME_RECONNECT_DELAY_MS = 1000;
// Cap on audio buffered while disconnected; oldest frames are dropped beyond it.
const DEEPGRAM_REALTIME_MAX_QUEUED_BYTES = 2 * 1024 * 1024;
/** Returns `value` as a string-keyed record when it is a plain (non-array) object, else undefined. */
function readRecord(value: unknown): Record<string, unknown> | undefined {
  if (!value || typeof value !== "object" || Array.isArray(value)) {
    return undefined;
  }
  return value as Record<string, unknown>;
}
// Accepts config shaped as { providers: { deepgram: {...} } }, { deepgram: {...} },
// or a flat record, and returns the innermost Deepgram section (empty when absent).
function readNestedDeepgramConfig(rawConfig: RealtimeTranscriptionProviderConfig) {
  const root = readRecord(rawConfig);
  const nested = readRecord(root?.providers)?.deepgram ?? root?.deepgram ?? root;
  return readRecord(nested) ?? {};
}
/** Coerces numbers and numeric strings to a finite number; anything else yields undefined. */
function readFiniteNumber(value: unknown): number | undefined {
  let parsed: number | undefined;
  if (typeof value === "number") {
    parsed = value;
  } else if (typeof value === "string") {
    parsed = Number.parseFloat(value);
  }
  return parsed !== undefined && Number.isFinite(parsed) ? parsed : undefined;
}
/** Parses booleans and common truthy/falsy strings ("1"/"true"/"yes"/"on", …); otherwise undefined. */
function readBoolean(value: unknown): boolean | undefined {
  if (typeof value === "boolean") {
    return value;
  }
  if (typeof value !== "string") {
    return undefined;
  }
  switch (value.trim().toLowerCase()) {
    case "1":
    case "true":
    case "yes":
    case "on":
      return true;
    case "0":
    case "false":
    case "no":
    case "off":
      return false;
    default:
      return undefined;
  }
}
/**
 * Maps user-facing encoding aliases to Deepgram's streaming `encoding` values
 * (`linear16`, `mulaw`, `alaw`).
 *
 * Accepts both underscore and hyphen G.711 spellings; the previous version
 * handled "g711_ulaw"/"g711-mulaw" but rejected the equally common
 * "g711-ulaw"/"g711_mulaw" variants. Returns undefined for empty input and
 * throws on unrecognized encodings so misconfiguration fails loudly.
 */
function normalizeDeepgramEncoding(
  value: unknown,
): DeepgramRealtimeTranscriptionEncoding | undefined {
  const normalized = normalizeOptionalString(value)?.toLowerCase();
  if (!normalized) {
    return undefined;
  }
  switch (normalized) {
    case "pcm":
    case "pcm_s16le":
    case "linear16":
      return "linear16";
    case "ulaw":
    case "mulaw":
    case "g711_ulaw":
    case "g711-ulaw":
    case "g711_mulaw":
    case "g711-mulaw":
      return "mulaw";
    case "alaw":
    case "g711_alaw":
    case "g711-alaw":
      return "alaw";
    default:
      throw new Error(`Invalid Deepgram realtime transcription encoding: ${normalized}`);
  }
}
// Resolves the HTTP(S) base URL for Deepgram: explicit value first, then the
// DEEPGRAM_BASE_URL env var, then the bundled default.
function normalizeDeepgramRealtimeBaseUrl(value?: string): string {
  return (
    normalizeOptionalString(value ?? process.env.DEEPGRAM_BASE_URL) ??
    DEFAULT_DEEPGRAM_AUDIO_BASE_URL
  );
}
// Builds the wss:// `/listen` URL for Deepgram streaming, carrying the
// session's audio/model settings as query parameters.
function toDeepgramRealtimeWsUrl(config: DeepgramRealtimeTranscriptionSessionConfig): string {
  const url = new URL(normalizeDeepgramRealtimeBaseUrl(config.baseUrl));
  // http -> ws; anything else (https) -> wss.
  url.protocol = url.protocol === "http:" ? "ws:" : "wss:";
  // Append /listen without doubling slashes when the base path ends in "/".
  url.pathname = `${url.pathname.replace(/\/+$/, "")}/listen`;
  const params: Array<[string, string]> = [
    ["model", config.model],
    ["encoding", config.encoding],
    ["sample_rate", String(config.sampleRate)],
    ["channels", "1"],
    ["interim_results", String(config.interimResults)],
    ["endpointing", String(config.endpointingMs)],
  ];
  if (config.language) {
    params.push(["language", config.language]);
  }
  for (const [key, val] of params) {
    url.searchParams.set(key, val);
  }
  return url.toString();
}
// Normalizes raw user config (possibly nested, snake_cased, or stringly typed)
// into the canonical camelCase Deepgram realtime provider config.
function normalizeProviderConfig(
  config: RealtimeTranscriptionProviderConfig,
): DeepgramRealtimeTranscriptionProviderConfig {
  const raw = readNestedDeepgramConfig(config);
  return {
    // NOTE(review): the secret path below assumes this config lives under the
    // voice-call plugin's streaming section — confirm for other call sites.
    apiKey: normalizeResolvedSecretInputString({
      value: raw.apiKey,
      path: "plugins.entries.voice-call.config.streaming.providers.deepgram.apiKey",
    }),
    baseUrl: normalizeOptionalString(raw.baseUrl),
    model: normalizeOptionalString(raw.model ?? raw.sttModel),
    language: normalizeOptionalString(raw.language),
    sampleRate: readFiniteNumber(raw.sampleRate ?? raw.sample_rate),
    encoding: normalizeDeepgramEncoding(raw.encoding),
    interimResults: readBoolean(raw.interimResults ?? raw.interim_results),
    // Accept several historical spellings for the endpointing window.
    endpointingMs: readFiniteNumber(raw.endpointingMs ?? raw.endpointing ?? raw.silenceDurationMs),
  };
}
// Coerces any ws RawData shape (Buffer, Buffer[], or ArrayBuffer) into one Buffer.
function rawWsDataToBuffer(data: WebSocket.RawData): Buffer {
  if (Array.isArray(data)) {
    return Buffer.concat(data);
  }
  return Buffer.isBuffer(data) ? data : Buffer.from(data);
}
// Best-effort extraction of an error message from a Deepgram error payload
// (plain string, { message }, or { code }); falls back to a generic label.
function readErrorDetail(value: unknown): string {
  if (typeof value === "string") {
    return value;
  }
  const record = readRecord(value);
  const message = normalizeOptionalString(record?.message);
  const code = normalizeOptionalString(record?.code);
  return message ?? code ?? "Deepgram realtime transcription error";
}
// Pulls the first alternative's transcript out of a Results event, if any.
function readTranscriptText(event: DeepgramRealtimeTranscriptionEvent): string | undefined {
  return normalizeOptionalString(event.channel?.alternatives?.[0]?.transcript);
}
/**
 * One streaming transcription session over a Deepgram `/listen` websocket.
 *
 * Lifecycle: connect() opens the socket and flushes any queued audio;
 * sendAudio() forwards frames (queueing up to a byte cap while disconnected);
 * close() sends a Finalize message, then force-closes after a grace timeout.
 * Unexpected disconnects trigger exponential-backoff reconnects.
 */
class DeepgramRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
  private ws: WebSocket | null = null;
  private connected = false;
  // True once the caller closed the session; suppresses sends and reconnects.
  private closed = false;
  private reconnectAttempts = 0;
  // Audio buffered while the socket is not OPEN; flushed on (re)connect.
  private queuedAudio: Buffer[] = [];
  private queuedBytes = 0;
  // Grace timer armed by close(); force-closes if the server does not hang up first.
  private closeTimer: ReturnType<typeof setTimeout> | undefined;
  // Last emitted final transcript; used to suppress duplicate finals.
  private lastTranscript: string | undefined;
  private speechStarted = false;
  private reconnecting = false;
  // Correlates all captured ws events for this session in debug traces.
  private readonly flowId = randomUUID();
  constructor(private readonly config: DeepgramRealtimeTranscriptionSessionConfig) {}
  /** Opens the websocket; resolves once connected (reconnect counter reset). */
  async connect(): Promise<void> {
    this.closed = false;
    this.reconnectAttempts = 0;
    await this.doConnect();
  }
  /** Sends one audio frame, queuing it when the socket is not open yet. */
  sendAudio(audio: Buffer): void {
    if (this.closed || audio.byteLength === 0) {
      return;
    }
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.sendAudioFrame(audio);
      return;
    }
    this.queueAudio(audio);
  }
  /**
   * Shuts the session down. With an open socket, sends {"type":"Finalize"} so
   * Deepgram flushes buffered audio, then force-closes after a grace period.
   * NOTE(review): Deepgram's documented graceful-shutdown message is
   * {"type":"CloseStream"}; with Finalize alone the socket likely stays open
   * until the grace timer fires — confirm against the streaming API docs.
   */
  close(): void {
    this.closed = true;
    this.connected = false;
    this.queuedAudio = [];
    this.queuedBytes = 0;
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      this.forceClose();
      return;
    }
    this.sendEvent({ type: "Finalize" });
    this.closeTimer = setTimeout(() => this.forceClose(), DEEPGRAM_REALTIME_CLOSE_TIMEOUT_MS);
  }
  isConnected(): boolean {
    return this.connected;
  }
  // Establishes a single websocket connection. Resolves on "open"; rejects on
  // connect timeout or on an error that occurs before the socket ever opened.
  private async doConnect(): Promise<void> {
    await new Promise<void>((resolve, reject) => {
      const url = toDeepgramRealtimeWsUrl(this.config);
      const debugProxy = resolveDebugProxySettings();
      const proxyAgent = createDebugProxyWebSocketAgent(debugProxy);
      let settled = false; // connect promise already resolved/rejected
      let opened = false; // socket reached OPEN at least once
      const finishConnect = () => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(connectTimeout);
        // Deliver audio that arrived while we were (re)connecting.
        this.flushQueuedAudio();
        resolve();
      };
      const failConnect = (error: Error) => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(connectTimeout);
        this.config.onError?.(error);
        // A failed connect is terminal for the session — no reconnect loop here.
        this.closed = true;
        this.forceClose();
        reject(error);
      };
      this.ws = new WebSocket(url, {
        headers: {
          Authorization: `Token ${this.config.apiKey}`,
        },
        ...(proxyAgent ? { agent: proxyAgent } : {}),
      });
      const connectTimeout = setTimeout(() => {
        failConnect(new Error("Deepgram realtime transcription connection timeout"));
      }, DEEPGRAM_REALTIME_CONNECT_TIMEOUT_MS);
      this.ws.on("open", () => {
        opened = true;
        this.connected = true;
        this.reconnectAttempts = 0;
        captureWsEvent({
          url,
          direction: "local",
          kind: "ws-open",
          flowId: this.flowId,
          meta: { provider: "deepgram", capability: "realtime-transcription" },
        });
        finishConnect();
      });
      this.ws.on("message", (data) => {
        const payload = rawWsDataToBuffer(data);
        captureWsEvent({
          url,
          direction: "inbound",
          kind: "ws-frame",
          flowId: this.flowId,
          payload,
          meta: { provider: "deepgram", capability: "realtime-transcription" },
        });
        try {
          // Server frames are JSON; surface parse failures via onError.
          this.handleEvent(JSON.parse(payload.toString()) as DeepgramRealtimeTranscriptionEvent);
        } catch (error) {
          this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
        }
      });
      this.ws.on("error", (error) => {
        captureWsEvent({
          url,
          direction: "local",
          kind: "error",
          flowId: this.flowId,
          errorText: error instanceof Error ? error.message : String(error),
          meta: { provider: "deepgram", capability: "realtime-transcription" },
        });
        if (!opened) {
          // Errors before "open" fail the pending connect promise.
          failConnect(error instanceof Error ? error : new Error(String(error)));
          return;
        }
        this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
      });
      this.ws.on("close", () => {
        clearTimeout(connectTimeout);
        this.connected = false;
        if (this.closeTimer) {
          clearTimeout(this.closeTimer);
          this.closeTimer = undefined;
        }
        // Only reconnect on unexpected closes of a previously-working socket.
        if (this.closed || !opened || !settled) {
          return;
        }
        void this.attemptReconnect();
      });
    });
  }
  // Exponential backoff (1s, 2s, 4s, ...) up to the attempt limit; on failure
  // recurses to schedule the next attempt unless the session was closed.
  private async attemptReconnect(): Promise<void> {
    if (this.closed || this.reconnecting) {
      return;
    }
    if (this.reconnectAttempts >= DEEPGRAM_REALTIME_MAX_RECONNECT_ATTEMPTS) {
      this.config.onError?.(new Error("Deepgram realtime transcription reconnect limit reached"));
      return;
    }
    this.reconnectAttempts += 1;
    const delay = DEEPGRAM_REALTIME_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
    this.reconnecting = true;
    try {
      await new Promise((resolve) => setTimeout(resolve, delay));
      if (!this.closed) {
        await this.doConnect();
      }
    } catch {
      if (!this.closed) {
        // Clear the guard so the recursive call is not a no-op.
        this.reconnecting = false;
        await this.attemptReconnect();
        return;
      }
    } finally {
      this.reconnecting = false;
    }
  }
  // Routes a parsed server event to the partial/final/speech-start/error callbacks.
  private handleEvent(event: DeepgramRealtimeTranscriptionEvent): void {
    switch (event.type) {
      case "Results": {
        const text = readTranscriptText(event);
        if (!text) {
          return;
        }
        if (!this.speechStarted) {
          this.speechStarted = true;
          this.config.onSpeechStart?.();
        }
        if (event.is_final || event.speech_final) {
          this.emitTranscript(text);
          // speech_final marks the end of an utterance; reset for the next one.
          if (event.speech_final) {
            this.speechStarted = false;
          }
          return;
        }
        this.config.onPartial?.(text);
        return;
      }
      case "SpeechStarted":
        this.speechStarted = true;
        this.config.onSpeechStart?.();
        return;
      case "Error":
      case "error":
        this.config.onError?.(new Error(readErrorDetail(event.error ?? event.message)));
        return;
      default:
        return;
    }
  }
  // Emits a final transcript unless it exactly repeats the previous one.
  private emitTranscript(text: string): void {
    if (text === this.lastTranscript) {
      return;
    }
    this.lastTranscript = text;
    this.config.onTranscript?.(text);
  }
  // Buffers audio while disconnected, dropping the oldest frames past the cap.
  private queueAudio(audio: Buffer): void {
    this.queuedAudio.push(Buffer.from(audio));
    this.queuedBytes += audio.byteLength;
    while (this.queuedBytes > DEEPGRAM_REALTIME_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) {
      const dropped = this.queuedAudio.shift();
      this.queuedBytes -= dropped?.byteLength ?? 0;
    }
  }
  // Sends everything buffered while disconnected, then clears the queue.
  private flushQueuedAudio(): void {
    for (const audio of this.queuedAudio) {
      this.sendAudioFrame(audio);
    }
    this.queuedAudio = [];
    this.queuedBytes = 0;
  }
  // Sends one binary frame; re-queues it if the socket dropped in the meantime.
  private sendAudioFrame(audio: Buffer): void {
    if (this.ws?.readyState !== WebSocket.OPEN) {
      this.queueAudio(audio);
      return;
    }
    // NOTE(review): the ws URL is rebuilt per frame only for capture metadata;
    // it could be computed once per connection.
    captureWsEvent({
      url: toDeepgramRealtimeWsUrl(this.config),
      direction: "outbound",
      kind: "ws-frame",
      flowId: this.flowId,
      payload: audio,
      meta: { provider: "deepgram", capability: "realtime-transcription" },
    });
    this.ws.send(audio);
  }
  // Serializes and sends a JSON control message; no-op when the socket is not open.
  private sendEvent(event: unknown): void {
    if (this.ws?.readyState !== WebSocket.OPEN) {
      return;
    }
    const payload = JSON.stringify(event);
    captureWsEvent({
      url: toDeepgramRealtimeWsUrl(this.config),
      direction: "outbound",
      kind: "ws-frame",
      flowId: this.flowId,
      payload,
      meta: { provider: "deepgram", capability: "realtime-transcription" },
    });
    this.ws.send(payload);
  }
  // Immediately closes the socket (normal closure) and clears the grace timer.
  private forceClose(): void {
    if (this.closeTimer) {
      clearTimeout(this.closeTimer);
      this.closeTimer = undefined;
    }
    this.connected = false;
    if (this.ws) {
      this.ws.close(1000, "Transcription session closed");
      this.ws = null;
    }
  }
}
/**
 * Provider plugin wiring for Deepgram realtime STT: config normalization,
 * configured-state detection, and websocket session construction.
 */
export function buildDeepgramRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
  // Prefer the key from normalized config, falling back to the environment.
  const resolveApiKey = (config: ReturnType<typeof normalizeProviderConfig>) =>
    config.apiKey || process.env.DEEPGRAM_API_KEY;
  return {
    id: "deepgram",
    label: "Deepgram Realtime Transcription",
    aliases: ["deepgram-realtime", "nova-3-streaming"],
    autoSelectOrder: 35,
    resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig),
    isConfigured: ({ providerConfig }) =>
      Boolean(resolveApiKey(normalizeProviderConfig(providerConfig))),
    createSession: (req) => {
      const config = normalizeProviderConfig(req.providerConfig);
      const apiKey = resolveApiKey(config);
      if (!apiKey) {
        throw new Error("Deepgram API key missing");
      }
      // Fill unset options with the 8 kHz mu-law streaming defaults.
      return new DeepgramRealtimeTranscriptionSession({
        ...req,
        apiKey,
        baseUrl: normalizeDeepgramRealtimeBaseUrl(config.baseUrl),
        model: config.model ?? DEFAULT_DEEPGRAM_AUDIO_MODEL,
        sampleRate: config.sampleRate ?? DEEPGRAM_REALTIME_DEFAULT_SAMPLE_RATE,
        encoding: config.encoding ?? DEEPGRAM_REALTIME_DEFAULT_ENCODING,
        interimResults: config.interimResults ?? true,
        endpointingMs: config.endpointingMs ?? DEEPGRAM_REALTIME_DEFAULT_ENDPOINTING_MS,
        language: config.language,
      });
    },
  };
}
// Internal helpers exposed solely for unit tests; not part of the public API.
export const __testing = {
  normalizeProviderConfig,
  toDeepgramRealtimeWsUrl,
};

View File

@@ -1 +1,2 @@
// Public plugin surface: batch media-understanding provider plus the realtime STT factory.
export { deepgramMediaUnderstandingProvider } from "./media-understanding-provider.js";
export { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";

View File

@@ -0,0 +1,84 @@
import { describe, expect, it } from "vitest";
import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js";
import {
normalizeTranscriptForMatch,
streamAudioForLiveTest,
synthesizeElevenLabsLiveSpeech,
waitForLiveExpectation,
} from "../../test/helpers/stt-live-audio.js";
import { elevenLabsMediaUnderstandingProvider } from "./media-understanding-provider.js";
import { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
// Gate live tests on an explicit opt-in flag plus an available API key.
const ELEVENLABS_KEY = process.env.ELEVENLABS_API_KEY ?? "";
const LIVE = isLiveTestEnabled(["ELEVENLABS_LIVE_TEST"]);
const describeLive = LIVE && ELEVENLABS_KEY ? describe : describe.skip;
describeLive("elevenlabs plugin live", () => {
  // Round-trip: ElevenLabs TTS synthesizes the phrase, then the batch STT
  // provider must transcribe it back ("openclaw"/"elevenlabs" anchor the match).
  it("transcribes synthesized speech through the media provider", async () => {
    const phrase = "Testing OpenClaw ElevenLabs speech to text integration OK.";
    const audio = await synthesizeElevenLabsLiveSpeech({
      text: phrase,
      apiKey: ELEVENLABS_KEY,
      outputFormat: "mp3_44100_128",
      timeoutMs: 30_000,
    });
    const transcript = await elevenLabsMediaUnderstandingProvider.transcribeAudio?.({
      buffer: audio,
      fileName: "elevenlabs-live.mp3",
      mime: "audio/mpeg",
      apiKey: ELEVENLABS_KEY,
      timeoutMs: 60_000,
    });
    const normalized = normalizeTranscriptForMatch(transcript?.text ?? "");
    expect(normalized).toContain("openclaw");
    expect(normalized).toContain("elevenlabs");
  }, 90_000);
  // Live realtime path: synthesize mu-law speech, stream it through the
  // websocket session, and expect final transcripts containing the phrase.
  it("streams realtime STT through the registered transcription provider", async () => {
    const provider = buildElevenLabsRealtimeTranscriptionProvider();
    const phrase = "Testing OpenClaw ElevenLabs realtime transcription integration OK.";
    const speech = await synthesizeElevenLabsLiveSpeech({
      text: phrase,
      apiKey: ELEVENLABS_KEY,
      outputFormat: "ulaw_8000", // matches the session's audioFormat below
      timeoutMs: 30_000,
    });
    const transcripts: string[] = [];
    const partials: string[] = [];
    const errors: Error[] = [];
    const session = provider.createSession({
      providerConfig: {
        apiKey: ELEVENLABS_KEY,
        audioFormat: "ulaw_8000",
        sampleRate: 8000,
        commitStrategy: "vad",
        languageCode: "en",
      },
      onPartial: (partial) => partials.push(partial),
      onTranscript: (transcript) => transcripts.push(transcript),
      onError: (error) => errors.push(error),
    });
    try {
      await session.connect();
      // 0xff padding is mu-law silence, giving the VAD quiet lead-in/lead-out.
      await streamAudioForLiveTest({
        audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
        sendAudio: (chunk) => session.sendAudio(chunk),
      });
      // Close before polling so remaining audio is committed and final
      // transcripts get flushed. NOTE(review): close() runs again in the
      // finally block — presumably idempotent; confirm against the session impl.
      session.close();
      await waitForLiveExpectation(() => {
        if (errors[0]) {
          throw errors[0];
        }
        expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
      }, 60_000);
    } finally {
      session.close();
    }
    expect(partials.length + transcripts.length).toBeGreaterThan(0);
  }, 90_000);
});

View File

@@ -1,4 +1,6 @@
import { definePluginEntry } from "openclaw/plugin-sdk/plugin-entry";
import { elevenLabsMediaUnderstandingProvider } from "./media-understanding-provider.js";
import { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
import { buildElevenLabsSpeechProvider } from "./speech-provider.js";
export default definePluginEntry({
@@ -7,5 +9,7 @@ export default definePluginEntry({
description: "Bundled ElevenLabs speech provider",
register(api) {
api.registerSpeechProvider(buildElevenLabsSpeechProvider());
api.registerMediaUnderstandingProvider(elevenLabsMediaUnderstandingProvider);
api.registerRealtimeTranscriptionProvider(buildElevenLabsRealtimeTranscriptionProvider());
},
});

View File

@@ -0,0 +1,45 @@
import { describe, expect, it, vi } from "vitest";
import {
elevenLabsMediaUnderstandingProvider,
transcribeElevenLabsAudio,
} from "./media-understanding-provider.js";
describe("elevenLabsMediaUnderstandingProvider", () => {
  it("has expected provider metadata", () => {
    expect(elevenLabsMediaUnderstandingProvider.id).toBe("elevenlabs");
    expect(elevenLabsMediaUnderstandingProvider.capabilities).toEqual(["audio"]);
    expect(elevenLabsMediaUnderstandingProvider.defaultModels?.audio).toBe("scribe_v2");
    expect(elevenLabsMediaUnderstandingProvider.transcribeAudio).toBeDefined();
  });
  it("posts multipart audio to ElevenLabs speech-to-text", async () => {
    // Inject a fetch mock so no network traffic occurs.
    const fetchMock = vi
      .fn<typeof fetch>()
      .mockResolvedValue(new Response(JSON.stringify({ text: "hello" })));
    const result = await transcribeElevenLabsAudio({
      buffer: Buffer.from("audio"),
      fileName: "voice.mp3",
      mime: "audio/mpeg",
      apiKey: "eleven-key",
      model: "scribe_v2",
      language: "en",
      timeoutMs: 1000,
      fetchFn: fetchMock,
    });
    expect(result).toEqual({ text: "hello", model: "scribe_v2" });
    expect(fetchMock).toHaveBeenCalledWith(
      "https://api.elevenlabs.io/v1/speech-to-text",
      expect.objectContaining({
        method: "POST",
        headers: { "xi-api-key": "eleven-key" },
      }),
    );
    // Inspect the multipart body the provider actually built.
    const init = fetchMock.mock.calls[0]?.[1] as RequestInit;
    const form = init.body as FormData;
    expect(form.get("model_id")).toBe("scribe_v2");
    expect(form.get("language_code")).toBe("en");
    expect(form.get("file")).toBeInstanceOf(Blob);
  });
});

View File

@@ -0,0 +1,108 @@
import path from "node:path";
import type {
AudioTranscriptionRequest,
AudioTranscriptionResult,
MediaUnderstandingProvider,
} from "openclaw/plugin-sdk/media-understanding";
import { normalizeElevenLabsBaseUrl } from "./shared.js";
// Default ElevenLabs STT model when the request does not specify one.
const DEFAULT_ELEVENLABS_STT_MODEL = "scribe_v2";
// Derives the multipart upload filename: strips directories, defaults to
// "audio", and renames AAC inputs to ".m4a" (by extension, or by mime when
// the name has no extension).
function resolveUploadFileName(fileName?: string, mime?: string): string {
  const cleaned = fileName?.trim();
  const base = cleaned ? path.basename(cleaned) : "audio";
  if (/\.aac$/i.test(base)) {
    const stem = base.slice(0, -4);
    return `${stem || "audio"}.m4a`;
  }
  const hasExtension = Boolean(path.extname(base));
  if (!hasExtension && mime?.trim().toLowerCase() === "audio/aac") {
    return `${base || "audio"}.m4a`;
  }
  return base;
}
// Extracts a human-readable detail string from an ElevenLabs error response:
// prefers structured JSON fields, falls back to the first 300 chars of raw text.
async function readErrorDetail(res: Response): Promise<string | undefined> {
  const text = (await res.text()).trim();
  if (!text) {
    return undefined;
  }
  type ErrorBody = {
    detail?: { message?: string; detail?: string; status?: string; code?: string };
    message?: string;
    error?: string;
  };
  try {
    const body = JSON.parse(text) as ErrorBody;
    const { detail } = body;
    return (
      body.message ??
      detail?.message ??
      detail?.detail ??
      body.error ??
      detail?.status ??
      detail?.code
    );
  } catch {
    // Not JSON — return a truncated slice of the raw body.
    return text.slice(0, 300);
  }
}
/**
 * Transcribes an audio buffer with the ElevenLabs speech-to-text REST API.
 *
 * Resolves the API key from the request or the ELEVENLABS_API_KEY / XI_API_KEY
 * environment variables and posts the audio as multipart form data
 * (`file`, `model_id`, optional `language_code` / `prompt`).
 *
 * @throws when no API key is available, when the HTTP response is not OK, or
 *   when the response payload carries no transcript text.
 */
export async function transcribeElevenLabsAudio(
  req: AudioTranscriptionRequest,
): Promise<AudioTranscriptionResult> {
  const fetchFn = req.fetchFn ?? fetch;
  const apiKey = req.apiKey || process.env.ELEVENLABS_API_KEY || process.env.XI_API_KEY;
  if (!apiKey) {
    throw new Error("ElevenLabs API key missing");
  }
  const model = req.model?.trim() || DEFAULT_ELEVENLABS_STT_MODEL;
  const controller = new AbortController();
  // Only arm the abort timer for a positive timeout: setTimeout(fn, undefined)
  // fires on the next tick and would abort the request immediately.
  const timeout =
    typeof req.timeoutMs === "number" && req.timeoutMs > 0
      ? setTimeout(() => controller.abort(), req.timeoutMs)
      : undefined;
  try {
    const form = new FormData();
    const bytes = new Uint8Array(req.buffer);
    const blob = new Blob([bytes], { type: req.mime ?? "application/octet-stream" });
    form.append("file", blob, resolveUploadFileName(req.fileName, req.mime));
    form.append("model_id", model);
    if (req.language?.trim()) {
      form.append("language_code", req.language.trim());
    }
    if (req.prompt?.trim()) {
      form.append("prompt", req.prompt.trim());
    }
    const res = await fetchFn(`${normalizeElevenLabsBaseUrl(req.baseUrl)}/v1/speech-to-text`, {
      method: "POST",
      headers: {
        "xi-api-key": apiKey,
      },
      body: form,
      signal: controller.signal,
    });
    if (!res.ok) {
      const detail = await readErrorDetail(res);
      throw new Error(
        `ElevenLabs audio transcription failed (${res.status})${detail ? `: ${detail}` : ""}`,
      );
    }
    const payload = (await res.json()) as { text?: string };
    const text = payload.text?.trim();
    if (!text) {
      throw new Error("ElevenLabs audio transcription response missing text");
    }
    return { text, model };
  } finally {
    if (timeout !== undefined) {
      clearTimeout(timeout);
    }
  }
}
// Batch audio transcription provider registration for ElevenLabs Scribe.
export const elevenLabsMediaUnderstandingProvider: MediaUnderstandingProvider = {
  id: "elevenlabs",
  capabilities: ["audio"],
  defaultModels: { audio: DEFAULT_ELEVENLABS_STT_MODEL },
  // Auto-selection priority relative to other audio-capable providers.
  autoPriority: { audio: 45 },
  transcribeAudio: transcribeElevenLabsAudio,
};

View File

@@ -1,8 +1,24 @@
{
"id": "elevenlabs",
"enabledByDefault": true,
"providerAuthEnvVars": {
"elevenlabs": ["ELEVENLABS_API_KEY", "XI_API_KEY"]
},
"contracts": {
"speechProviders": ["elevenlabs"]
"speechProviders": ["elevenlabs"],
"mediaUnderstandingProviders": ["elevenlabs"],
"realtimeTranscriptionProviders": ["elevenlabs"]
},
"mediaUnderstandingProviderMetadata": {
"elevenlabs": {
"capabilities": ["audio"],
"defaultModels": {
"audio": "scribe_v2"
},
"autoPriority": {
"audio": 45
}
}
},
"configContracts": {
"compatibilityMigrationPaths": [

View File

@@ -4,6 +4,9 @@
"private": true,
"description": "OpenClaw ElevenLabs speech plugin",
"type": "module",
"dependencies": {
"ws": "^8.20.0"
},
"devDependencies": {
"@openclaw/plugin-sdk": "workspace:*"
},

View File

@@ -0,0 +1,54 @@
import { describe, expect, it } from "vitest";
import type { OpenClawConfig } from "../../src/config/types.openclaw.js";
import {
__testing,
buildElevenLabsRealtimeTranscriptionProvider,
} from "./realtime-transcription-provider.js";
describe("buildElevenLabsRealtimeTranscriptionProvider", () => {
  it("normalizes nested provider config", () => {
    const provider = buildElevenLabsRealtimeTranscriptionProvider();
    // snake_case keys and string-encoded numbers as they may appear in config files.
    const resolved = provider.resolveConfig?.({
      cfg: {} as OpenClawConfig,
      rawConfig: {
        providers: {
          elevenlabs: {
            apiKey: "eleven-key",
            model_id: "scribe_v2_realtime",
            audio_format: "ulaw_8000",
            sample_rate: "8000",
            commit_strategy: "vad",
            language: "en",
          },
        },
      },
    });
    // Canonical camelCase output with numeric coercion applied.
    expect(resolved).toMatchObject({
      apiKey: "eleven-key",
      audioFormat: "ulaw_8000",
      sampleRate: 8000,
      commitStrategy: "vad",
      languageCode: "en",
    });
  });
  it("builds an ElevenLabs realtime websocket URL", () => {
    const url = __testing.toElevenLabsRealtimeWsUrl({
      apiKey: "eleven-key",
      baseUrl: "https://api.elevenlabs.io",
      providerConfig: {},
      modelId: "scribe_v2_realtime",
      audioFormat: "ulaw_8000",
      sampleRate: 8000,
      commitStrategy: "vad",
      languageCode: "en",
    });
    // https upgrades to wss; session settings ride along as query parameters.
    expect(url).toContain("wss://api.elevenlabs.io/v1/speech-to-text/realtime?");
    expect(url).toContain("model_id=scribe_v2_realtime");
    expect(url).toContain("audio_format=ulaw_8000");
    expect(url).toContain("commit_strategy=vad");
    expect(url).toContain("language_code=en");
  });
});

View File

@@ -0,0 +1,488 @@
import { randomUUID } from "node:crypto";
import {
captureWsEvent,
createDebugProxyWebSocketAgent,
resolveDebugProxySettings,
} from "openclaw/plugin-sdk/proxy-capture";
import type {
RealtimeTranscriptionProviderConfig,
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
} from "openclaw/plugin-sdk/realtime-transcription";
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import WebSocket from "ws";
import { resolveElevenLabsApiKeyWithProfileFallback } from "./config-api.js";
import { normalizeElevenLabsBaseUrl } from "./shared.js";
// User-supplied (pre-normalization) ElevenLabs realtime STT options.
type ElevenLabsRealtimeTranscriptionProviderConfig = {
  apiKey?: string;
  baseUrl?: string;
  modelId?: string;
  audioFormat?: string;
  sampleRate?: number;
  languageCode?: string;
  commitStrategy?: "manual" | "vad";
  vadSilenceThresholdSecs?: number;
  vadThreshold?: number;
  minSpeechDurationMs?: number;
  minSilenceDurationMs?: number;
};
// Fully-resolved session settings: the create request plus defaults applied.
type ElevenLabsRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & {
  apiKey: string;
  baseUrl: string;
  modelId: string;
  audioFormat: string;
  sampleRate: number;
  commitStrategy: "manual" | "vad";
  languageCode?: string;
  vadSilenceThresholdSecs?: number;
  vadThreshold?: number;
  minSpeechDurationMs?: number;
  minSilenceDurationMs?: number;
};
// Subset of the ElevenLabs realtime websocket message fields this session inspects.
type ElevenLabsRealtimeTranscriptionEvent = {
  message_type?: string;
  text?: string;
  error?: string;
  message?: string;
  code?: string;
};
// Defaults target 8 kHz mu-law telephony audio with server-side VAD commits.
const ELEVENLABS_REALTIME_DEFAULT_MODEL = "scribe_v2_realtime";
const ELEVENLABS_REALTIME_DEFAULT_AUDIO_FORMAT = "ulaw_8000";
const ELEVENLABS_REALTIME_DEFAULT_SAMPLE_RATE = 8000;
const ELEVENLABS_REALTIME_DEFAULT_COMMIT_STRATEGY: "manual" | "vad" = "vad";
const ELEVENLABS_REALTIME_CONNECT_TIMEOUT_MS = 10_000;
// Grace period between requesting close and force-closing the socket.
const ELEVENLABS_REALTIME_CLOSE_TIMEOUT_MS = 5_000;
// Reconnect with exponential backoff, up to 5 attempts.
const ELEVENLABS_REALTIME_MAX_RECONNECT_ATTEMPTS = 5;
const ELEVENLABS_REALTIME_RECONNECT_DELAY_MS = 1000;
// Cap on audio buffered while disconnected.
const ELEVENLABS_REALTIME_MAX_QUEUED_BYTES = 2 * 1024 * 1024;
/** Returns `value` as a string-keyed record when it is a plain (non-array) object, else undefined. */
function readRecord(value: unknown): Record<string, unknown> | undefined {
  if (!value || typeof value !== "object" || Array.isArray(value)) {
    return undefined;
  }
  return value as Record<string, unknown>;
}
// Accepts { providers: { elevenlabs: {...} } }, { elevenlabs: {...} }, or a
// flat record, returning the innermost ElevenLabs section (empty when absent).
function readNestedElevenLabsConfig(rawConfig: RealtimeTranscriptionProviderConfig) {
  const root = readRecord(rawConfig);
  const nested = readRecord(root?.providers)?.elevenlabs ?? root?.elevenlabs ?? root;
  return readRecord(nested) ?? {};
}
/** Coerces numbers and numeric strings to a finite number; anything else yields undefined. */
function readFiniteNumber(value: unknown): number | undefined {
  let parsed: number | undefined;
  if (typeof value === "number") {
    parsed = value;
  } else if (typeof value === "string") {
    parsed = Number.parseFloat(value);
  }
  return parsed !== undefined && Number.isFinite(parsed) ? parsed : undefined;
}
/**
 * Validate the commit-strategy setting. Accepts "manual" or "vad"
 * (case-insensitive); empty/unset yields undefined; anything else throws.
 */
function normalizeCommitStrategy(value: unknown): "manual" | "vad" | undefined {
  const lowered = normalizeOptionalString(value)?.toLowerCase();
  switch (lowered) {
    case undefined:
      return undefined;
    case "manual":
    case "vad":
      return lowered;
    default:
      throw new Error(`Invalid ElevenLabs realtime transcription commit strategy: ${lowered}`);
  }
}
/**
 * Resolve the raw (possibly nested) plugin config into the normalized
 * provider shape, accepting both camelCase and snake_case key spellings
 * plus a few aliases (model/sttModel, encoding, language).
 */
function normalizeProviderConfig(
  config: RealtimeTranscriptionProviderConfig,
): ElevenLabsRealtimeTranscriptionProviderConfig {
  const raw = readNestedElevenLabsConfig(config);
  return {
    // Resolves secret-style inputs; the path is used for error reporting.
    apiKey: normalizeResolvedSecretInputString({
      value: raw.apiKey,
      path: "plugins.entries.voice-call.config.streaming.providers.elevenlabs.apiKey",
    }),
    baseUrl: normalizeOptionalString(raw.baseUrl),
    modelId: normalizeOptionalString(raw.modelId ?? raw.model ?? raw.sttModel),
    audioFormat: normalizeOptionalString(raw.audioFormat ?? raw.audio_format ?? raw.encoding),
    sampleRate: readFiniteNumber(raw.sampleRate ?? raw.sample_rate),
    languageCode: normalizeOptionalString(raw.languageCode ?? raw.language),
    commitStrategy: normalizeCommitStrategy(raw.commitStrategy ?? raw.commit_strategy),
    vadSilenceThresholdSecs: readFiniteNumber(
      raw.vadSilenceThresholdSecs ?? raw.vad_silence_threshold_secs,
    ),
    vadThreshold: readFiniteNumber(raw.vadThreshold ?? raw.vad_threshold),
    minSpeechDurationMs: readFiniteNumber(raw.minSpeechDurationMs ?? raw.min_speech_duration_ms),
    minSilenceDurationMs: readFiniteNumber(raw.minSilenceDurationMs ?? raw.min_silence_duration_ms),
  };
}
/**
 * Convert the HTTP(S) ElevenLabs base URL into its ws/wss equivalent and
 * strip trailing slashes so callers can append a path verbatim.
 */
function normalizeElevenLabsRealtimeBaseUrl(value?: string): string {
  const wsUrl = new URL(normalizeElevenLabsBaseUrl(value));
  wsUrl.protocol = wsUrl.protocol === "http:" ? "ws:" : "wss:";
  const rendered = wsUrl.toString();
  return rendered.replace(/\/+$/, "");
}
/**
 * Build the realtime speech-to-text websocket URL, encoding the session
 * options as query parameters. Numeric tuning knobs are only included when
 * explicitly configured.
 */
function toElevenLabsRealtimeWsUrl(config: ElevenLabsRealtimeTranscriptionSessionConfig): string {
  const endpoint = new URL(
    `${normalizeElevenLabsRealtimeBaseUrl(config.baseUrl)}/v1/speech-to-text/realtime`,
  );
  const params = endpoint.searchParams;
  params.set("model_id", config.modelId);
  params.set("audio_format", config.audioFormat);
  params.set("commit_strategy", config.commitStrategy);
  params.set("include_timestamps", "false");
  params.set("include_language_detection", "false");
  if (config.languageCode) {
    params.set("language_code", config.languageCode);
  }
  const optionalNumbers: Array<[string, number | undefined]> = [
    ["vad_silence_threshold_secs", config.vadSilenceThresholdSecs],
    ["vad_threshold", config.vadThreshold],
    ["min_speech_duration_ms", config.minSpeechDurationMs],
    ["min_silence_duration_ms", config.minSilenceDurationMs],
  ];
  for (const [key, raw] of optionalNumbers) {
    if (raw != null) {
      params.set(key, String(raw));
    }
  }
  return endpoint.toString();
}
/** Normalize a ws message payload (Buffer, Buffer[], or ArrayBuffer) into one contiguous Buffer. */
function rawWsDataToBuffer(data: WebSocket.RawData): Buffer {
  if (Array.isArray(data)) {
    return Buffer.concat(data);
  }
  return Buffer.isBuffer(data) ? data : Buffer.from(data);
}
/** Extract a human-readable error message from an event, falling back to a generic label. */
function readErrorDetail(event: ElevenLabsRealtimeTranscriptionEvent): string {
  const detail =
    normalizeOptionalString(event.error) ??
    normalizeOptionalString(event.message) ??
    normalizeOptionalString(event.code);
  return detail ?? "ElevenLabs realtime transcription error";
}
/**
 * Streaming speech-to-text session backed by the ElevenLabs realtime API.
 *
 * `connect()` resolves once the server acknowledges with `session_started`;
 * audio sent before that point is buffered (bounded by
 * ELEVENLABS_REALTIME_MAX_QUEUED_BYTES) and flushed on readiness. Unexpected
 * disconnects are retried with exponential backoff up to
 * ELEVENLABS_REALTIME_MAX_RECONNECT_ATTEMPTS.
 */
class ElevenLabsRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
  private ws: WebSocket | null = null;
  private connected = false;
  // True once the server sent `session_started`; gates direct audio sends.
  private ready = false;
  private closed = false;
  private reconnectAttempts = 0;
  private queuedAudio: Buffer[] = [];
  private queuedBytes = 0;
  private closeTimer: ReturnType<typeof setTimeout> | undefined;
  // Last committed transcript text, used to suppress duplicate emissions.
  private lastTranscript: string | undefined;
  private reconnecting = false;
  private readonly flowId = randomUUID();
  constructor(private readonly config: ElevenLabsRealtimeTranscriptionSessionConfig) {}
  /** Open the websocket and wait for the server-side session to start. */
  async connect(): Promise<void> {
    this.closed = false;
    this.reconnectAttempts = 0;
    await this.doConnect();
  }
  /** Stream an audio chunk; buffers while the socket is not yet ready. */
  sendAudio(audio: Buffer): void {
    if (this.closed || audio.byteLength === 0) {
      return;
    }
    if (this.ws?.readyState === WebSocket.OPEN && this.ready) {
      this.sendAudioChunk(audio);
      return;
    }
    this.queueAudio(audio);
  }
  /**
   * Finish the session: send an empty committing chunk so the server flushes
   * pending transcripts, then force-close after a grace timeout.
   */
  close(): void {
    this.closed = true;
    this.connected = false;
    this.queuedAudio = [];
    this.queuedBytes = 0;
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      this.forceClose();
      return;
    }
    this.sendJson({
      message_type: "input_audio_chunk",
      audio_base_64: "",
      sample_rate: this.config.sampleRate,
      commit: true,
    });
    this.closeTimer = setTimeout(() => this.forceClose(), ELEVENLABS_REALTIME_CLOSE_TIMEOUT_MS);
  }
  isConnected(): boolean {
    return this.connected;
  }
  /** Establish the websocket, wiring debug capture and lifecycle handlers. */
  private async doConnect(): Promise<void> {
    await new Promise<void>((resolve, reject) => {
      const url = toElevenLabsRealtimeWsUrl(this.config);
      const debugProxy = resolveDebugProxySettings();
      const proxyAgent = createDebugProxyWebSocketAgent(debugProxy);
      let settled = false;
      let opened = false;
      // Resolve the connect promise once; flush audio queued while connecting.
      const finishConnect = () => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(connectTimeout);
        this.ready = true;
        this.flushQueuedAudio();
        resolve();
      };
      // Reject the connect promise once; tears the session down permanently.
      const failConnect = (error: Error) => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(connectTimeout);
        this.config.onError?.(error);
        this.closed = true;
        this.forceClose();
        reject(error);
      };
      this.ready = false;
      this.ws = new WebSocket(url, {
        headers: {
          "xi-api-key": this.config.apiKey,
        },
        ...(proxyAgent ? { agent: proxyAgent } : {}),
      });
      const connectTimeout = setTimeout(() => {
        failConnect(new Error("ElevenLabs realtime transcription connection timeout"));
      }, ELEVENLABS_REALTIME_CONNECT_TIMEOUT_MS);
      this.ws.on("open", () => {
        opened = true;
        this.connected = true;
        this.reconnectAttempts = 0;
        captureWsEvent({
          url,
          direction: "local",
          kind: "ws-open",
          flowId: this.flowId,
          meta: { provider: "elevenlabs", capability: "realtime-transcription" },
        });
      });
      this.ws.on("message", (data) => {
        const payload = rawWsDataToBuffer(data);
        captureWsEvent({
          url,
          direction: "inbound",
          kind: "ws-frame",
          flowId: this.flowId,
          payload,
          meta: { provider: "elevenlabs", capability: "realtime-transcription" },
        });
        try {
          const event = JSON.parse(payload.toString()) as ElevenLabsRealtimeTranscriptionEvent;
          if (event.message_type === "session_started") {
            finishConnect();
            return;
          }
          // An error before readiness means the handshake itself failed.
          if (!this.ready && event.message_type?.includes("error")) {
            failConnect(new Error(readErrorDetail(event)));
            return;
          }
          this.handleEvent(event);
        } catch (error) {
          this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
        }
      });
      this.ws.on("error", (error) => {
        captureWsEvent({
          url,
          direction: "local",
          kind: "error",
          flowId: this.flowId,
          errorText: error instanceof Error ? error.message : String(error),
          meta: { provider: "elevenlabs", capability: "realtime-transcription" },
        });
        if (!opened) {
          failConnect(error instanceof Error ? error : new Error(String(error)));
          return;
        }
        this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
      });
      this.ws.on("close", () => {
        clearTimeout(connectTimeout);
        this.connected = false;
        this.ready = false;
        // Cancel the pending force-close timer so a graceful server-side close
        // does not leave a 5s timer armed (mirrors the Mistral session).
        if (this.closeTimer) {
          clearTimeout(this.closeTimer);
          this.closeTimer = undefined;
        }
        if (this.closed || !opened || !settled) {
          return;
        }
        void this.attemptReconnect();
      });
    });
  }
  /** Reconnect with exponential backoff after an unexpected disconnect. */
  private async attemptReconnect(): Promise<void> {
    if (this.closed || this.reconnecting) {
      return;
    }
    if (this.reconnectAttempts >= ELEVENLABS_REALTIME_MAX_RECONNECT_ATTEMPTS) {
      this.config.onError?.(new Error("ElevenLabs realtime transcription reconnect limit reached"));
      return;
    }
    this.reconnectAttempts += 1;
    const delay = ELEVENLABS_REALTIME_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
    this.reconnecting = true;
    try {
      await new Promise((resolve) => setTimeout(resolve, delay));
      if (!this.closed) {
        await this.doConnect();
      }
    } catch {
      // A failed attempt retries (with the next backoff step) unless closed.
      if (!this.closed) {
        this.reconnecting = false;
        await this.attemptReconnect();
        return;
      }
    } finally {
      this.reconnecting = false;
    }
  }
  /** Dispatch post-handshake server events to the session callbacks. */
  private handleEvent(event: ElevenLabsRealtimeTranscriptionEvent): void {
    switch (event.message_type) {
      case "partial_transcript":
        if (event.text) {
          this.config.onPartial?.(event.text);
        }
        return;
      case "committed_transcript":
      case "committed_transcript_with_timestamps":
        if (event.text) {
          this.emitTranscript(event.text);
        }
        return;
      default:
        if (event.message_type?.includes("error")) {
          this.config.onError?.(new Error(readErrorDetail(event)));
        }
        return;
    }
  }
  /** Forward a committed transcript, skipping an exact repeat of the last one. */
  private emitTranscript(text: string): void {
    if (text === this.lastTranscript) {
      return;
    }
    this.lastTranscript = text;
    this.config.onTranscript?.(text);
  }
  /** Buffer audio while disconnected, dropping the oldest chunks past the cap. */
  private queueAudio(audio: Buffer): void {
    this.queuedAudio.push(Buffer.from(audio));
    this.queuedBytes += audio.byteLength;
    while (this.queuedBytes > ELEVENLABS_REALTIME_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) {
      const dropped = this.queuedAudio.shift();
      this.queuedBytes -= dropped?.byteLength ?? 0;
    }
  }
  /** Send everything buffered during (re)connection, then clear the queue. */
  private flushQueuedAudio(): void {
    for (const audio of this.queuedAudio) {
      this.sendAudioChunk(audio);
    }
    this.queuedAudio = [];
    this.queuedBytes = 0;
  }
  /** Send one audio chunk; manual commit strategy commits each chunk. */
  private sendAudioChunk(audio: Buffer): void {
    this.sendJson({
      message_type: "input_audio_chunk",
      audio_base_64: audio.toString("base64"),
      sample_rate: this.config.sampleRate,
      ...(this.config.commitStrategy === "manual" ? { commit: true } : {}),
    });
  }
  /** Serialize and send a frame if the socket is open, mirroring it to capture. */
  private sendJson(event: unknown): void {
    if (this.ws?.readyState !== WebSocket.OPEN) {
      return;
    }
    const payload = JSON.stringify(event);
    captureWsEvent({
      url: toElevenLabsRealtimeWsUrl(this.config),
      direction: "outbound",
      kind: "ws-frame",
      flowId: this.flowId,
      payload,
      meta: { provider: "elevenlabs", capability: "realtime-transcription" },
    });
    this.ws.send(payload);
  }
  /** Immediately tear down the socket and cancel any pending close timer. */
  private forceClose(): void {
    if (this.closeTimer) {
      clearTimeout(this.closeTimer);
      this.closeTimer = undefined;
    }
    this.connected = false;
    this.ready = false;
    if (this.ws) {
      this.ws.close(1000, "Transcription session closed");
      this.ws = null;
    }
  }
}
/**
 * Build the ElevenLabs realtime transcription provider plugin.
 * API key resolution order: explicit config, profile fallback, XI_API_KEY env.
 */
export function buildElevenLabsRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
  return {
    id: "elevenlabs",
    label: "ElevenLabs Realtime Transcription",
    aliases: ["elevenlabs-realtime", "scribe-v2-realtime"],
    autoSelectOrder: 40,
    resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig),
    // Considered configured when any of the API key sources is available.
    isConfigured: ({ providerConfig }) =>
      Boolean(
        normalizeProviderConfig(providerConfig).apiKey ||
          resolveElevenLabsApiKeyWithProfileFallback() ||
          process.env.XI_API_KEY,
      ),
    createSession: (req) => {
      const config = normalizeProviderConfig(req.providerConfig);
      const apiKey =
        config.apiKey || resolveElevenLabsApiKeyWithProfileFallback() || process.env.XI_API_KEY;
      if (!apiKey) {
        throw new Error("ElevenLabs API key missing");
      }
      // Fill in defaults for every setting the user left unset.
      return new ElevenLabsRealtimeTranscriptionSession({
        ...req,
        apiKey,
        baseUrl: normalizeElevenLabsBaseUrl(config.baseUrl),
        modelId: config.modelId ?? ELEVENLABS_REALTIME_DEFAULT_MODEL,
        audioFormat: config.audioFormat ?? ELEVENLABS_REALTIME_DEFAULT_AUDIO_FORMAT,
        sampleRate: config.sampleRate ?? ELEVENLABS_REALTIME_DEFAULT_SAMPLE_RATE,
        commitStrategy: config.commitStrategy ?? ELEVENLABS_REALTIME_DEFAULT_COMMIT_STRATEGY,
        languageCode: config.languageCode,
        vadSilenceThresholdSecs: config.vadSilenceThresholdSecs,
        vadThreshold: config.vadThreshold,
        minSpeechDurationMs: config.minSpeechDurationMs,
        minSilenceDurationMs: config.minSilenceDurationMs,
      });
    },
  };
}
// Internal-only exports for unit tests; not part of the public provider API.
export const __testing = {
  normalizeProviderConfig,
  toElevenLabsRealtimeWsUrl,
};

View File

@@ -1 +1,6 @@
export {
elevenLabsMediaUnderstandingProvider,
transcribeElevenLabsAudio,
} from "./media-understanding-provider.js";
export { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
export { buildElevenLabsSpeechProvider } from "./speech-provider.js";

View File

@@ -5,6 +5,7 @@ import { mistralMemoryEmbeddingProviderAdapter } from "./memory-embedding-adapte
import { applyMistralConfig, MISTRAL_DEFAULT_MODEL_REF } from "./onboard.js";
import { buildMistralProvider } from "./provider-catalog.js";
import { contributeMistralResolvedModelCompat } from "./provider-compat.js";
import { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
const PROVIDER_ID = "mistral";
export function buildMistralReplayPolicy() {
@@ -55,5 +56,6 @@ export default defineSingleProviderPluginEntry({
register(api) {
api.registerMemoryEmbeddingProvider(mistralMemoryEmbeddingProviderAdapter);
api.registerMediaUnderstandingProvider(mistralMediaUnderstandingProvider);
api.registerRealtimeTranscriptionProvider(buildMistralRealtimeTranscriptionProvider());
},
});

View File

@@ -0,0 +1,84 @@
import { describe, expect, it } from "vitest";
import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js";
import {
normalizeTranscriptForMatch,
streamAudioForLiveTest,
synthesizeElevenLabsLiveSpeech,
waitForLiveExpectation,
} from "../../test/helpers/stt-live-audio.js";
import { mistralMediaUnderstandingProvider } from "./media-understanding-provider.js";
import { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
// Live-test credentials: Mistral for STT, ElevenLabs to synthesize the input
// speech. Both are required; the suite is skipped when either is absent.
const MISTRAL_KEY = process.env.MISTRAL_API_KEY ?? "";
const ELEVENLABS_KEY = process.env.ELEVENLABS_API_KEY ?? "";
const LIVE = isLiveTestEnabled(["MISTRAL_LIVE_TEST"]);
const describeLive = LIVE && MISTRAL_KEY && ELEVENLABS_KEY ? describe : describe.skip;
describeLive("mistral plugin live", () => {
  it("transcribes synthesized speech through the media provider", async () => {
    const phrase = "Testing OpenClaw Mistral speech to text integration OK.";
    // Synthesize the spoken phrase via ElevenLabs so the test is self-contained.
    const audio = await synthesizeElevenLabsLiveSpeech({
      text: phrase,
      apiKey: ELEVENLABS_KEY,
      outputFormat: "mp3_44100_128",
      timeoutMs: 30_000,
    });
    const transcript = await mistralMediaUnderstandingProvider.transcribeAudio?.({
      buffer: audio,
      fileName: "mistral-live.mp3",
      mime: "audio/mpeg",
      apiKey: MISTRAL_KEY,
      timeoutMs: 60_000,
    });
    // Match loosely on keywords; exact wording varies between STT runs.
    const normalized = normalizeTranscriptForMatch(transcript?.text ?? "");
    expect(normalized).toContain("openclaw");
    expect(normalized).toContain("mistral");
  }, 90_000);
  it("streams realtime STT through the registered transcription provider", async () => {
    const provider = buildMistralRealtimeTranscriptionProvider();
    const phrase = "Testing OpenClaw Mistral realtime transcription integration OK.";
    // μ-law/8kHz matches the session's pcm_mulaw encoding below.
    const speech = await synthesizeElevenLabsLiveSpeech({
      text: phrase,
      apiKey: ELEVENLABS_KEY,
      outputFormat: "ulaw_8000",
      timeoutMs: 30_000,
    });
    const transcripts: string[] = [];
    const partials: string[] = [];
    const errors: Error[] = [];
    const session = provider.createSession({
      providerConfig: {
        apiKey: MISTRAL_KEY,
        sampleRate: 8000,
        encoding: "pcm_mulaw",
        targetStreamingDelayMs: 800,
      },
      onPartial: (partial) => partials.push(partial),
      onTranscript: (transcript) => transcripts.push(transcript),
      onError: (error) => errors.push(error),
    });
    try {
      await session.connect();
      // Pad with μ-law silence (0xff) before/after so endpointing can settle.
      await streamAudioForLiveTest({
        audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
        sendAudio: (chunk) => session.sendAudio(chunk),
      });
      session.close();
      await waitForLiveExpectation(() => {
        if (errors[0]) {
          throw errors[0];
        }
        expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
      }, 60_000);
    } finally {
      // Idempotent second close guards the error paths above.
      session.close();
    }
    expect(partials.length + transcripts.length).toBeGreaterThan(0);
  }, 90_000);
});

View File

@@ -22,7 +22,8 @@
],
"contracts": {
"memoryEmbeddingProviders": ["mistral"],
"mediaUnderstandingProviders": ["mistral"]
"mediaUnderstandingProviders": ["mistral"],
"realtimeTranscriptionProviders": ["mistral"]
},
"mediaUnderstandingProviderMetadata": {
"mistral": {

View File

@@ -4,6 +4,9 @@
"private": true,
"description": "OpenClaw Mistral provider plugin",
"type": "module",
"dependencies": {
"ws": "^8.20.0"
},
"devDependencies": {
"@openclaw/plugin-sdk": "workspace:*"
},

View File

@@ -0,0 +1,60 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import type { OpenClawConfig } from "../../src/config/types.openclaw.js";
import {
__testing,
buildMistralRealtimeTranscriptionProvider,
} from "./realtime-transcription-provider.js";
describe("buildMistralRealtimeTranscriptionProvider", () => {
  afterEach(() => {
    vi.unstubAllEnvs();
  });
  it("normalizes nested provider config", () => {
    const provider = buildMistralRealtimeTranscriptionProvider();
    // snake_case keys and string numbers must be accepted and normalized.
    const resolved = provider.resolveConfig?.({
      cfg: {} as OpenClawConfig,
      rawConfig: {
        providers: {
          mistral: {
            apiKey: "mistral-key",
            model: "voxtral-mini-transcribe-realtime-2602",
            encoding: "g711_ulaw",
            sample_rate: "8000",
            target_streaming_delay_ms: "240",
          },
        },
      },
    });
    // g711_ulaw is an alias that canonicalizes to pcm_mulaw.
    expect(resolved).toMatchObject({
      apiKey: "mistral-key",
      model: "voxtral-mini-transcribe-realtime-2602",
      encoding: "pcm_mulaw",
      sampleRate: 8000,
      targetStreamingDelayMs: 240,
    });
  });
  it("builds a Mistral realtime websocket URL", () => {
    const url = __testing.toMistralRealtimeWsUrl({
      apiKey: "mistral-key",
      baseUrl: "https://api.mistral.ai/v1",
      model: "voxtral-mini-transcribe-realtime-2602",
      providerConfig: {},
      sampleRate: 8000,
      encoding: "pcm_mulaw",
      targetStreamingDelayMs: 800,
    });
    // https → wss, duplicate /v1 collapsed, options encoded as query params.
    expect(url).toContain("wss://api.mistral.ai/v1/audio/transcriptions/realtime?");
    expect(url).toContain("model=voxtral-mini-transcribe-realtime-2602");
    expect(url).toContain("target_streaming_delay_ms=800");
  });
  it("requires an API key when creating sessions", () => {
    // Clear the env fallback so only the (empty) config key is considered.
    vi.stubEnv("MISTRAL_API_KEY", "");
    const provider = buildMistralRealtimeTranscriptionProvider();
    expect(() => provider.createSession({ providerConfig: {} })).toThrow("Mistral API key missing");
  });
});

View File

@@ -0,0 +1,492 @@
import { randomUUID } from "node:crypto";
import {
captureWsEvent,
createDebugProxyWebSocketAgent,
resolveDebugProxySettings,
} from "openclaw/plugin-sdk/proxy-capture";
import type {
RealtimeTranscriptionProviderConfig,
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
} from "openclaw/plugin-sdk/realtime-transcription";
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import WebSocket from "ws";
// Canonical audio encodings accepted by the Mistral realtime API; user-facing
// aliases (g711_ulaw, linear16, …) are mapped here by normalizeMistralEncoding.
type MistralRealtimeTranscriptionEncoding =
  | "pcm_s16le"
  | "pcm_s32le"
  | "pcm_f16le"
  | "pcm_f32le"
  | "pcm_mulaw"
  | "pcm_alaw";
// Raw (user-facing) provider configuration; every field optional.
type MistralRealtimeTranscriptionProviderConfig = {
  apiKey?: string;
  baseUrl?: string;
  model?: string;
  sampleRate?: number;
  encoding?: MistralRealtimeTranscriptionEncoding;
  targetStreamingDelayMs?: number;
};
// Fully-resolved session configuration: defaults applied, callbacks merged in.
type MistralRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & {
  apiKey: string;
  baseUrl: string;
  model: string;
  sampleRate: number;
  encoding: MistralRealtimeTranscriptionEncoding;
  targetStreamingDelayMs?: number;
};
// JSON frame received on the realtime websocket; fields are optional because
// different event types populate different subsets.
type MistralRealtimeTranscriptionEvent = {
  type?: string;
  text?: string;
  error?: {
    message?: unknown;
    code?: number;
  };
};
// Defaults and operational limits for realtime sessions.
const MISTRAL_REALTIME_DEFAULT_BASE_URL = "wss://api.mistral.ai";
const MISTRAL_REALTIME_DEFAULT_MODEL = "voxtral-mini-transcribe-realtime-2602";
const MISTRAL_REALTIME_DEFAULT_SAMPLE_RATE = 8000;
const MISTRAL_REALTIME_DEFAULT_ENCODING: MistralRealtimeTranscriptionEncoding = "pcm_mulaw";
const MISTRAL_REALTIME_DEFAULT_DELAY_MS = 800;
const MISTRAL_REALTIME_CONNECT_TIMEOUT_MS = 10_000;
// Grace period after input_audio.end before the socket is force-closed.
const MISTRAL_REALTIME_CLOSE_TIMEOUT_MS = 5_000;
const MISTRAL_REALTIME_MAX_RECONNECT_ATTEMPTS = 5;
const MISTRAL_REALTIME_RECONNECT_DELAY_MS = 1000;
// Cap on audio buffered while disconnected; oldest chunks are dropped first.
const MISTRAL_REALTIME_MAX_QUEUED_BYTES = 2 * 1024 * 1024;
/** Return the value as a string-keyed record when it is a non-array object, else undefined. */
function readRecord(value: unknown): Record<string, unknown> | undefined {
  const isPlainObject = typeof value === "object" && value !== null && !Array.isArray(value);
  return isPlainObject ? (value as Record<string, unknown>) : undefined;
}
/**
 * Locate the mistral-specific settings object inside a possibly nested raw
 * config: `providers.mistral`, then a top-level `mistral` key, then the config
 * itself. Always returns an object (possibly empty).
 */
function readNestedMistralConfig(rawConfig: RealtimeTranscriptionProviderConfig) {
  const root = readRecord(rawConfig);
  const candidate = readRecord(root?.providers)?.mistral ?? root?.mistral ?? root;
  return readRecord(candidate) ?? {};
}
/** Accept finite numbers, or strings that parse to finite numbers; reject everything else. */
function readFiniteNumber(value: unknown): number | undefined {
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : undefined;
  }
  if (typeof value === "string") {
    const parsed = Number.parseFloat(value);
    return Number.isFinite(parsed) ? parsed : undefined;
  }
  return undefined;
}
/**
 * Map user-facing encoding names (and common aliases like g711_ulaw/linear16)
 * onto the canonical Mistral encodings. Empty/unset yields undefined; an
 * unrecognized value throws.
 */
function normalizeMistralEncoding(
  value: unknown,
): MistralRealtimeTranscriptionEncoding | undefined {
  const lowered = normalizeOptionalString(value)?.toLowerCase();
  if (!lowered) {
    return undefined;
  }
  const aliases: Record<string, MistralRealtimeTranscriptionEncoding> = {
    pcm: "pcm_s16le",
    linear16: "pcm_s16le",
    pcm_s16le: "pcm_s16le",
    pcm_s32le: "pcm_s32le",
    pcm_f16le: "pcm_f16le",
    pcm_f32le: "pcm_f32le",
    mulaw: "pcm_mulaw",
    ulaw: "pcm_mulaw",
    g711_ulaw: "pcm_mulaw",
    "g711-mulaw": "pcm_mulaw",
    pcm_mulaw: "pcm_mulaw",
    alaw: "pcm_alaw",
    g711_alaw: "pcm_alaw",
    "g711-alaw": "pcm_alaw",
    pcm_alaw: "pcm_alaw",
  };
  const canonical = aliases[lowered];
  if (!canonical) {
    throw new Error(`Invalid Mistral realtime transcription encoding: ${lowered}`);
  }
  return canonical;
}
// Resolve the websocket base URL for realtime STT. Precedence: explicit
// value, MISTRAL_REALTIME_BASE_URL env var, then the public default host.
// http/https schemes are rewritten to ws/wss (others pass through), and a
// trailing "/v1" segment plus trailing slashes are stripped because the
// caller appends the full "v1/audio/transcriptions/realtime" path itself.
function normalizeMistralRealtimeBaseUrl(value?: string): string {
  const raw = normalizeOptionalString(value ?? process.env.MISTRAL_REALTIME_BASE_URL);
  if (!raw) {
    return MISTRAL_REALTIME_DEFAULT_BASE_URL;
  }
  const url = new URL(raw);
  url.protocol =
    url.protocol === "http:" ? "ws:" : url.protocol === "https:" ? "wss:" : url.protocol;
  url.pathname = url.pathname.replace(/\/v1\/?$/, "").replace(/\/+$/, "");
  return url.toString().replace(/\/+$/, "");
}
/**
 * Build the realtime transcription websocket URL from the session config,
 * encoding the model (and optional streaming delay) as query parameters.
 */
function toMistralRealtimeWsUrl(config: MistralRealtimeTranscriptionSessionConfig): string {
  const root = new URL(`${normalizeMistralRealtimeBaseUrl(config.baseUrl)}/`);
  const endpoint = new URL("v1/audio/transcriptions/realtime", root);
  endpoint.searchParams.set("model", config.model);
  const delay = config.targetStreamingDelayMs;
  if (delay != null) {
    endpoint.searchParams.set("target_streaming_delay_ms", String(delay));
  }
  return endpoint.toString();
}
/**
 * Resolve the raw (possibly nested) plugin config into the normalized
 * provider shape, accepting camelCase and snake_case key spellings plus a
 * few aliases (sttModel, delayMs).
 */
function normalizeProviderConfig(
  config: RealtimeTranscriptionProviderConfig,
): MistralRealtimeTranscriptionProviderConfig {
  const raw = readNestedMistralConfig(config);
  return {
    // Resolves secret-style inputs; the path is used for error reporting.
    apiKey: normalizeResolvedSecretInputString({
      value: raw.apiKey,
      path: "plugins.entries.voice-call.config.streaming.providers.mistral.apiKey",
    }),
    baseUrl: normalizeOptionalString(raw.baseUrl),
    model: normalizeOptionalString(raw.model ?? raw.sttModel),
    sampleRate: readFiniteNumber(raw.sampleRate ?? raw.sample_rate),
    encoding: normalizeMistralEncoding(raw.encoding),
    targetStreamingDelayMs: readFiniteNumber(
      raw.targetStreamingDelayMs ?? raw.target_streaming_delay_ms ?? raw.delayMs,
    ),
  };
}
/** Coerce a ws message payload (Buffer, Buffer[], or ArrayBuffer) into one contiguous Buffer. */
function rawWsDataToBuffer(data: WebSocket.RawData): Buffer {
  if (Array.isArray(data)) {
    return Buffer.concat(data);
  }
  if (Buffer.isBuffer(data)) {
    return data;
  }
  return Buffer.from(data);
}
/**
 * Extract a human-readable message from an error event: a string message
 * verbatim, an object message as JSON, a bare numeric code as a labeled
 * fallback, otherwise a generic label.
 */
function readErrorDetail(event: MistralRealtimeTranscriptionEvent): string {
  const message = event.error?.message;
  if (typeof message === "string") {
    return message;
  }
  if (message != null && typeof message === "object") {
    return JSON.stringify(message);
  }
  return typeof event.error?.code === "number"
    ? `Mistral realtime transcription error (${event.error.code})`
    : "Mistral realtime transcription error";
}
/**
 * Streaming speech-to-text session against the Mistral (Voxtral) realtime API.
 *
 * `connect()` resolves once the server sends `session.created`, at which point
 * the audio format is negotiated via `session.update`. Audio sent earlier is
 * buffered (bounded by MISTRAL_REALTIME_MAX_QUEUED_BYTES) and flushed on
 * readiness. Unexpected disconnects are retried with exponential backoff.
 */
class MistralRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
  private ws: WebSocket | null = null;
  private connected = false;
  // True once the server confirmed the session; gates direct audio sends.
  private ready = false;
  private closed = false;
  private reconnectAttempts = 0;
  private queuedAudio: Buffer[] = [];
  private queuedBytes = 0;
  private closeTimer: ReturnType<typeof setTimeout> | undefined;
  // Accumulates transcription.text.delta fragments until a segment/done event.
  private partialText = "";
  private reconnecting = false;
  private readonly flowId = randomUUID();
  constructor(private readonly config: MistralRealtimeTranscriptionSessionConfig) {}
  /** Open the websocket and wait for the server-side session to be created. */
  async connect(): Promise<void> {
    this.closed = false;
    this.reconnectAttempts = 0;
    await this.doConnect();
  }
  /** Stream an audio chunk; buffers while the socket is not yet ready. */
  sendAudio(audio: Buffer): void {
    if (this.closed || audio.byteLength === 0) {
      return;
    }
    if (this.ws?.readyState === WebSocket.OPEN && this.ready) {
      this.sendJson({
        type: "input_audio.append",
        audio: audio.toString("base64"),
      });
      return;
    }
    this.queueAudio(audio);
  }
  /**
   * Finish the session: flush and end the input stream so the server emits
   * final transcripts, then force-close after a grace timeout.
   */
  close(): void {
    this.closed = true;
    this.connected = false;
    this.queuedAudio = [];
    this.queuedBytes = 0;
    if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
      this.forceClose();
      return;
    }
    this.sendJson({ type: "input_audio.flush" });
    this.sendJson({ type: "input_audio.end" });
    this.closeTimer = setTimeout(() => this.forceClose(), MISTRAL_REALTIME_CLOSE_TIMEOUT_MS);
  }
  isConnected(): boolean {
    return this.connected;
  }
  /** Establish the websocket, wiring debug capture and lifecycle handlers. */
  private async doConnect(): Promise<void> {
    await new Promise<void>((resolve, reject) => {
      const url = toMistralRealtimeWsUrl(this.config);
      const debugProxy = resolveDebugProxySettings();
      const proxyAgent = createDebugProxyWebSocketAgent(debugProxy);
      let settled = false;
      let opened = false;
      // Resolve the connect promise once; flush audio queued while connecting.
      const finishConnect = () => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(connectTimeout);
        this.ready = true;
        this.flushQueuedAudio();
        resolve();
      };
      // Reject the connect promise once; tears the session down permanently.
      const failConnect = (error: Error) => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(connectTimeout);
        this.config.onError?.(error);
        this.closed = true;
        this.forceClose();
        reject(error);
      };
      this.ready = false;
      this.ws = new WebSocket(url, {
        headers: {
          Authorization: `Bearer ${this.config.apiKey}`,
        },
        ...(proxyAgent ? { agent: proxyAgent } : {}),
      });
      const connectTimeout = setTimeout(() => {
        failConnect(new Error("Mistral realtime transcription connection timeout"));
      }, MISTRAL_REALTIME_CONNECT_TIMEOUT_MS);
      this.ws.on("open", () => {
        opened = true;
        this.connected = true;
        this.reconnectAttempts = 0;
        captureWsEvent({
          url,
          direction: "local",
          kind: "ws-open",
          flowId: this.flowId,
          meta: { provider: "mistral", capability: "realtime-transcription" },
        });
      });
      this.ws.on("message", (data) => {
        const payload = rawWsDataToBuffer(data);
        captureWsEvent({
          url,
          direction: "inbound",
          kind: "ws-frame",
          flowId: this.flowId,
          payload,
          meta: { provider: "mistral", capability: "realtime-transcription" },
        });
        try {
          const event = JSON.parse(payload.toString()) as MistralRealtimeTranscriptionEvent;
          if (event.type === "session.created") {
            // Negotiate the input audio format before marking the session ready.
            this.sendJson({
              type: "session.update",
              session: {
                audio_format: {
                  encoding: this.config.encoding,
                  sample_rate: this.config.sampleRate,
                },
              },
            });
            finishConnect();
            return;
          }
          // An error before readiness means the handshake itself failed.
          if (!this.ready && event.type === "error") {
            failConnect(new Error(readErrorDetail(event)));
            return;
          }
          this.handleEvent(event);
        } catch (error) {
          this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
        }
      });
      this.ws.on("error", (error) => {
        captureWsEvent({
          url,
          direction: "local",
          kind: "error",
          flowId: this.flowId,
          errorText: error instanceof Error ? error.message : String(error),
          meta: { provider: "mistral", capability: "realtime-transcription" },
        });
        if (!opened) {
          failConnect(error instanceof Error ? error : new Error(String(error)));
          return;
        }
        this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
      });
      this.ws.on("close", () => {
        clearTimeout(connectTimeout);
        this.connected = false;
        this.ready = false;
        // Cancel the pending force-close timer so a graceful server-side
        // close does not leave the grace timer armed.
        if (this.closeTimer) {
          clearTimeout(this.closeTimer);
          this.closeTimer = undefined;
        }
        if (this.closed || !opened || !settled) {
          return;
        }
        void this.attemptReconnect();
      });
    });
  }
  /** Reconnect with exponential backoff after an unexpected disconnect. */
  private async attemptReconnect(): Promise<void> {
    if (this.closed || this.reconnecting) {
      return;
    }
    if (this.reconnectAttempts >= MISTRAL_REALTIME_MAX_RECONNECT_ATTEMPTS) {
      this.config.onError?.(new Error("Mistral realtime transcription reconnect limit reached"));
      return;
    }
    this.reconnectAttempts += 1;
    const delay = MISTRAL_REALTIME_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
    this.reconnecting = true;
    try {
      await new Promise((resolve) => setTimeout(resolve, delay));
      if (!this.closed) {
        await this.doConnect();
      }
    } catch {
      // A failed attempt retries (with the next backoff step) unless closed.
      if (!this.closed) {
        this.reconnecting = false;
        await this.attemptReconnect();
        return;
      }
    } finally {
      this.reconnecting = false;
    }
  }
  /** Dispatch post-handshake server events to the session callbacks. */
  private handleEvent(event: MistralRealtimeTranscriptionEvent): void {
    switch (event.type) {
      case "transcription.text.delta":
        // Deltas accumulate into a running partial that is re-emitted whole.
        if (event.text) {
          this.partialText += event.text;
          this.config.onPartial?.(this.partialText);
        }
        return;
      case "transcription.segment":
        if (event.text) {
          this.config.onTranscript?.(event.text);
          this.partialText = "";
        }
        return;
      case "transcription.done":
        // Promote any leftover partial to a final transcript, then shut down.
        if (this.partialText.trim()) {
          this.config.onTranscript?.(this.partialText);
          this.partialText = "";
        }
        this.forceClose();
        return;
      case "error":
        this.config.onError?.(new Error(readErrorDetail(event)));
        return;
      default:
        return;
    }
  }
  /** Buffer audio while disconnected, dropping the oldest chunks past the cap. */
  private queueAudio(audio: Buffer): void {
    this.queuedAudio.push(Buffer.from(audio));
    this.queuedBytes += audio.byteLength;
    while (this.queuedBytes > MISTRAL_REALTIME_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) {
      const dropped = this.queuedAudio.shift();
      this.queuedBytes -= dropped?.byteLength ?? 0;
    }
  }
  /** Send everything buffered during (re)connection, then clear the queue. */
  private flushQueuedAudio(): void {
    for (const audio of this.queuedAudio) {
      this.sendJson({
        type: "input_audio.append",
        audio: audio.toString("base64"),
      });
    }
    this.queuedAudio = [];
    this.queuedBytes = 0;
  }
  /** Serialize and send a frame if the socket is open, mirroring it to capture. */
  private sendJson(event: unknown): void {
    if (this.ws?.readyState !== WebSocket.OPEN) {
      return;
    }
    const payload = JSON.stringify(event);
    captureWsEvent({
      url: toMistralRealtimeWsUrl(this.config),
      direction: "outbound",
      kind: "ws-frame",
      flowId: this.flowId,
      payload,
      meta: { provider: "mistral", capability: "realtime-transcription" },
    });
    this.ws.send(payload);
  }
  /** Immediately tear down the socket and cancel any pending close timer. */
  private forceClose(): void {
    if (this.closeTimer) {
      clearTimeout(this.closeTimer);
      this.closeTimer = undefined;
    }
    this.connected = false;
    this.ready = false;
    if (this.ws) {
      this.ws.close(1000, "Transcription session closed");
      this.ws = null;
    }
  }
}
/**
 * Build the Mistral realtime transcription provider plugin.
 * API key resolution order: explicit config, then MISTRAL_API_KEY env.
 */
export function buildMistralRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
  return {
    id: "mistral",
    label: "Mistral Realtime Transcription",
    aliases: ["mistral-realtime", "voxtral-realtime"],
    autoSelectOrder: 45,
    resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig),
    // Considered configured when either API key source is available.
    isConfigured: ({ providerConfig }) =>
      Boolean(normalizeProviderConfig(providerConfig).apiKey || process.env.MISTRAL_API_KEY),
    createSession: (req) => {
      const config = normalizeProviderConfig(req.providerConfig);
      const apiKey = config.apiKey || process.env.MISTRAL_API_KEY;
      if (!apiKey) {
        throw new Error("Mistral API key missing");
      }
      // Fill in defaults for every setting the user left unset.
      return new MistralRealtimeTranscriptionSession({
        ...req,
        apiKey,
        baseUrl: normalizeMistralRealtimeBaseUrl(config.baseUrl),
        model: config.model ?? MISTRAL_REALTIME_DEFAULT_MODEL,
        sampleRate: config.sampleRate ?? MISTRAL_REALTIME_DEFAULT_SAMPLE_RATE,
        encoding: config.encoding ?? MISTRAL_REALTIME_DEFAULT_ENCODING,
        targetStreamingDelayMs: config.targetStreamingDelayMs ?? MISTRAL_REALTIME_DEFAULT_DELAY_MS,
      });
    },
  };
}
// Internal-only exports for unit tests; not part of the public provider API.
export const __testing = {
  normalizeProviderConfig,
  toMistralRealtimeWsUrl,
};

View File

@@ -1 +1,2 @@
export { mistralMediaUnderstandingProvider } from "./media-understanding-provider.js";
export { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";