refactor(stt): share transcription helpers

Author: Peter Steinberger
Date: 2026-04-23 04:29:19 +01:00
Parent: a58633d809
Commit: c866820fed
24 changed files with 360 additions and 779 deletions
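
Every live test below swaps its hand-rolled session wiring for `runRealtimeSttLiveTest` from `test/helpers/stt-live-audio.ts`, but the helper itself is not among the hunks in this excerpt. A sketch of its probable shape, reconstructed from the call sites: the option names and the `{ transcripts, partials }` return value match the diff, while the chunking defaults and internal wiring are assumptions (the OpenAI test's removed 160-byte / 5 ms pacing loop suggests those became the defaults).

```ts
// Sketch only: inferred from call sites in this diff, not the actual
// implementation in test/helpers/stt-live-audio.ts.
import { expect } from "vitest";

interface RealtimeSttSession {
  connect(): Promise<void>;
  sendAudio(chunk: Buffer): void;
  close(): void;
}

export interface RunRealtimeSttLiveTestOptions {
  provider: { createSession(req: Record<string, unknown>): RealtimeSttSession };
  providerConfig: Record<string, unknown>;
  audio: Buffer;
  chunkSize?: number; // xai passes one explicitly; 160 is an assumed default
  delayMs?: number; // inter-chunk pacing; xai passes 20, 5 is an assumed default
  closeBeforeWait?: boolean; // elevenlabs/mistral/xai close before awaiting the transcript
}

// Moved here verbatim from the per-test copies removed in this diff.
async function waitForLiveExpectation(expectation: () => void, timeoutMs = 30_000) {
  const started = Date.now();
  let lastError: unknown;
  while (Date.now() - started < timeoutMs) {
    try {
      expectation();
      return;
    } catch (error) {
      lastError = error;
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  }
  throw lastError;
}

function normalizeTranscriptForMatch(value: string): string {
  return value.toLowerCase().replace(/[^a-z0-9]+/g, "");
}

export async function runRealtimeSttLiveTest(
  opts: RunRealtimeSttLiveTestOptions,
): Promise<{ transcripts: string[]; partials: string[] }> {
  const transcripts: string[] = [];
  const partials: string[] = [];
  const errors: Error[] = [];
  const session = opts.provider.createSession({
    providerConfig: opts.providerConfig,
    onPartial: (partial: string) => partials.push(partial),
    onTranscript: (transcript: string) => transcripts.push(transcript),
    onError: (error: Error) => errors.push(error),
  });
  try {
    await session.connect();
    const chunkSize = opts.chunkSize ?? 160; // assumed default
    const delayMs = opts.delayMs ?? 5; // assumed default
    for (let offset = 0; offset < opts.audio.byteLength; offset += chunkSize) {
      session.sendAudio(opts.audio.subarray(offset, offset + chunkSize));
      await new Promise((resolve) => setTimeout(resolve, delayMs));
    }
    if (opts.closeBeforeWait) {
      session.close();
    }
    // Every removed per-test assertion matched "openclaw", so the helper
    // presumably owns that expectation now.
    await waitForLiveExpectation(() => {
      if (errors[0]) {
        throw errors[0];
      }
      expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
    }, 60_000);
    expect(partials.length + transcripts.length).toBeGreaterThan(0);
  } finally {
    session.close();
  }
  return { transcripts, partials };
}
```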

View File

@@ -1,10 +1,8 @@
import { describe, expect, it } from "vitest";
import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js";
import {
normalizeTranscriptForMatch,
streamAudioForLiveTest,
runRealtimeSttLiveTest,
synthesizeElevenLabsLiveSpeech,
waitForLiveExpectation,
} from "../../test/helpers/stt-live-audio.js";
import { transcribeDeepgramAudio } from "./audio.js";
import { buildDeepgramRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
@@ -62,37 +60,15 @@ describeLive("deepgram live", () => {
outputFormat: "ulaw_8000",
timeoutMs: 30_000,
});
const transcripts: string[] = [];
const partials: string[] = [];
const errors: Error[] = [];
const session = provider.createSession({
await runRealtimeSttLiveTest({
provider,
providerConfig: {
apiKey: DEEPGRAM_KEY,
language: "en-US",
endpointingMs: 500,
},
onPartial: (partial) => partials.push(partial),
onTranscript: (transcript) => transcripts.push(transcript),
onError: (error) => errors.push(error),
audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
});
try {
await session.connect();
await streamAudioForLiveTest({
audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
sendAudio: (chunk) => session.sendAudio(chunk),
});
await waitForLiveExpectation(() => {
if (errors[0]) {
throw errors[0];
}
expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
}, 60_000);
} finally {
session.close();
}
expect(partials.length + transcripts.length).toBeGreaterThan(0);
}, 90_000);
});
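
The padding pattern these tests share deserves a gloss: 0xff is G.711 mu-law's encoding of zero amplitude, so the alloc'd buffers are lead-in and tail silence that give server-side VAD/endpointing room to segment the utterance. Equivalent to the inline concat above (`speech` stands for the synthesized `ulaw_8000` sample):

```ts
declare const speech: Buffer; // the synthesized ulaw_8000 sample from the test

// 0xff is mu-law digital silence: 4000 bytes = 0.5 s, 8000 bytes = 1 s
// at 8 kHz mono mu-law.
const audio = Buffer.concat([
  Buffer.alloc(4_000, 0xff), // half a second of lead-in silence
  speech,
  Buffer.alloc(8_000, 0xff), // a second of tail silence so endpointing can fire
]);
```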

View File

@@ -2,9 +2,8 @@ import { describe, expect, it } from "vitest";
import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js";
import {
normalizeTranscriptForMatch,
streamAudioForLiveTest,
runRealtimeSttLiveTest,
synthesizeElevenLabsLiveSpeech,
waitForLiveExpectation,
} from "../../test/helpers/stt-live-audio.js";
import { elevenLabsMediaUnderstandingProvider } from "./media-understanding-provider.js";
import { buildElevenLabsRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
@@ -45,10 +44,9 @@ describeLive("elevenlabs plugin live", () => {
outputFormat: "ulaw_8000",
timeoutMs: 30_000,
});
const transcripts: string[] = [];
const partials: string[] = [];
const errors: Error[] = [];
const session = provider.createSession({
await runRealtimeSttLiveTest({
provider,
providerConfig: {
apiKey: ELEVENLABS_KEY,
audioFormat: "ulaw_8000",
@@ -56,29 +54,8 @@ describeLive("elevenlabs plugin live", () => {
commitStrategy: "vad",
languageCode: "en",
},
onPartial: (partial) => partials.push(partial),
onTranscript: (transcript) => transcripts.push(transcript),
onError: (error) => errors.push(error),
audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
closeBeforeWait: true,
});
try {
await session.connect();
await streamAudioForLiveTest({
audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
sendAudio: (chunk) => session.sendAudio(chunk),
});
session.close();
await waitForLiveExpectation(() => {
if (errors[0]) {
throw errors[0];
}
expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
}, 60_000);
} finally {
session.close();
}
expect(partials.length + transcripts.length).toBeGreaterThan(0);
}, 90_000);
});

View File

@@ -31,12 +31,11 @@ describe("elevenLabsMediaUnderstandingProvider", () => {
expect(result).toEqual({ text: "hello", model: "scribe_v2" });
expect(fetchMock).toHaveBeenCalledWith(
"https://api.elevenlabs.io/v1/speech-to-text",
expect.objectContaining({
method: "POST",
headers: { "xi-api-key": "eleven-key" },
}),
expect.objectContaining({ method: "POST" }),
);
const init = fetchMock.mock.calls[0]?.[1] as RequestInit;
const headers = new Headers(init.headers);
expect(headers.get("xi-api-key")).toBe("eleven-key");
const form = init.body as FormData;
expect(form.get("model_id")).toBe("scribe_v2");
expect(form.get("language_code")).toBe("en");

View File

@@ -1,51 +1,19 @@
import path from "node:path";
import type {
AudioTranscriptionRequest,
AudioTranscriptionResult,
MediaUnderstandingProvider,
} from "openclaw/plugin-sdk/media-understanding";
import { normalizeElevenLabsBaseUrl } from "./shared.js";
import {
assertOkOrThrowHttpError,
buildAudioTranscriptionFormData,
postTranscriptionRequest,
resolveProviderHttpRequestConfig,
requireTranscriptionText,
} from "openclaw/plugin-sdk/provider-http";
import { DEFAULT_ELEVENLABS_BASE_URL, normalizeElevenLabsBaseUrl } from "./shared.js";
const DEFAULT_ELEVENLABS_STT_MODEL = "scribe_v2";
function resolveUploadFileName(fileName?: string, mime?: string): string {
const trimmed = fileName?.trim();
const baseName = trimmed ? path.basename(trimmed) : "audio";
const lowerMime = mime?.trim().toLowerCase();
if (/\.aac$/i.test(baseName)) {
return `${baseName.slice(0, -4) || "audio"}.m4a`;
}
if (!path.extname(baseName) && lowerMime === "audio/aac") {
return `${baseName || "audio"}.m4a`;
}
return baseName;
}
async function readErrorDetail(res: Response): Promise<string | undefined> {
const text = (await res.text()).trim();
if (!text) {
return undefined;
}
try {
const json = JSON.parse(text) as {
detail?: { message?: string; detail?: string; status?: string; code?: string };
message?: string;
error?: string;
};
return (
json.message ??
json.detail?.message ??
json.detail?.detail ??
json.error ??
json.detail?.status ??
json.detail?.code
);
} catch {
return text.slice(0, 300);
}
}
export async function transcribeElevenLabsAudio(
req: AudioTranscriptionRequest,
): Promise<AudioTranscriptionResult> {
@@ -56,46 +24,51 @@ export async function transcribeElevenLabsAudio(
}
const model = req.model?.trim() || DEFAULT_ELEVENLABS_STT_MODEL;
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), req.timeoutMs);
try {
const form = new FormData();
const bytes = new Uint8Array(req.buffer);
const blob = new Blob([bytes], { type: req.mime ?? "application/octet-stream" });
form.append("file", blob, resolveUploadFileName(req.fileName, req.mime));
form.append("model_id", model);
if (req.language?.trim()) {
form.append("language_code", req.language.trim());
}
if (req.prompt?.trim()) {
form.append("prompt", req.prompt.trim());
}
const res = await fetchFn(`${normalizeElevenLabsBaseUrl(req.baseUrl)}/v1/speech-to-text`, {
method: "POST",
headers: {
const { baseUrl, allowPrivateNetwork, headers, dispatcherPolicy } =
resolveProviderHttpRequestConfig({
baseUrl: normalizeElevenLabsBaseUrl(req.baseUrl),
defaultBaseUrl: DEFAULT_ELEVENLABS_BASE_URL,
headers: req.headers,
request: req.request,
defaultHeaders: {
"xi-api-key": apiKey,
},
body: form,
signal: controller.signal,
provider: "elevenlabs",
api: "elevenlabs-speech-to-text",
capability: "audio",
transport: "media-understanding",
});
const form = buildAudioTranscriptionFormData({
buffer: req.buffer,
fileName: req.fileName,
mime: req.mime,
fields: {
model_id: model,
language_code: req.language,
prompt: req.prompt,
},
});
const { response, release } = await postTranscriptionRequest({
url: `${baseUrl}/v1/speech-to-text`,
headers,
body: form,
timeoutMs: req.timeoutMs,
fetchFn,
allowPrivateNetwork,
dispatcherPolicy,
auditContext: "elevenlabs speech-to-text",
});
if (!res.ok) {
const detail = await readErrorDetail(res);
throw new Error(
`ElevenLabs audio transcription failed (${res.status})${detail ? `: ${detail}` : ""}`,
);
}
const payload = (await res.json()) as { text?: string };
const text = payload.text?.trim();
if (!text) {
throw new Error("ElevenLabs audio transcription response missing text");
}
try {
await assertOkOrThrowHttpError(response, "ElevenLabs audio transcription failed");
const payload = (await response.json()) as { text?: string };
const text = requireTranscriptionText(
payload.text,
"ElevenLabs audio transcription response missing text",
);
return { text, model };
} finally {
clearTimeout(timeout);
await release();
}
}
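
The provider-http helpers this file now calls come from `openclaw/plugin-sdk/provider-http`, which is not part of this excerpt. Plausible signatures, inferred from this call site and the xAI `stt.ts` one below; note the `.aac` to `.m4a` upload-name fix removed from this file most likely moved into the form-data helper:

```ts
// Sketch only: signatures inferred from call sites in this diff, not the
// SDK's actual declarations.

// Builds the multipart body; undefined/empty field values are presumably
// skipped, which lets callers pass language/prompt unconditionally.
declare function buildAudioTranscriptionFormData(opts: {
  buffer: ArrayBuffer;
  fileName?: string;
  mime?: string;
  fields: Record<string, string | undefined>;
}): FormData;

// POSTs the form with timeout, proxy, and (judging by allowPrivateNetwork)
// SSRF policy applied; release() replaces the callers' old manual
// clearTimeout bookkeeping in finally blocks.
declare function postTranscriptionRequest(opts: {
  url: string;
  headers: Record<string, string>;
  body: FormData;
  timeoutMs?: number;
  fetchFn: typeof fetch;
  allowPrivateNetwork?: boolean;
  dispatcherPolicy?: unknown;
  auditContext: string;
}): Promise<{ response: Response; release: () => Promise<void> }>;

// Throws with the HTTP status plus any recoverable error detail, subsuming
// the removed readErrorDetail helper.
declare function assertOkOrThrowHttpError(response: Response, message: string): Promise<void>;

// Returns the trimmed text or throws the given message when it is empty.
declare function requireTranscriptionText(
  text: string | undefined,
  message: string,
): string;
```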

View File

@@ -0,0 +1,3 @@
{
"specs": ["ws@^8.20.0"]
}

View File

@@ -2,9 +2,8 @@ import { describe, expect, it } from "vitest";
import { isLiveTestEnabled } from "../../src/agents/live-test-helpers.js";
import {
normalizeTranscriptForMatch,
streamAudioForLiveTest,
runRealtimeSttLiveTest,
synthesizeElevenLabsLiveSpeech,
waitForLiveExpectation,
} from "../../test/helpers/stt-live-audio.js";
import { mistralMediaUnderstandingProvider } from "./media-understanding-provider.js";
import { buildMistralRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
@@ -46,39 +45,17 @@ describeLive("mistral plugin live", () => {
outputFormat: "ulaw_8000",
timeoutMs: 30_000,
});
const transcripts: string[] = [];
const partials: string[] = [];
const errors: Error[] = [];
const session = provider.createSession({
await runRealtimeSttLiveTest({
provider,
providerConfig: {
apiKey: MISTRAL_KEY,
sampleRate: 8000,
encoding: "pcm_mulaw",
targetStreamingDelayMs: 800,
},
onPartial: (partial) => partials.push(partial),
onTranscript: (transcript) => transcripts.push(transcript),
onError: (error) => errors.push(error),
audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
closeBeforeWait: true,
});
try {
await session.connect();
await streamAudioForLiveTest({
audio: Buffer.concat([Buffer.alloc(4000, 0xff), speech, Buffer.alloc(8000, 0xff)]),
sendAudio: (chunk) => session.sendAudio(chunk),
});
session.close();
await waitForLiveExpectation(() => {
if (errors[0]) {
throw errors[0];
}
expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
}, 60_000);
} finally {
session.close();
}
expect(partials.length + transcripts.length).toBeGreaterThan(0);
}, 90_000);
});

View File

@@ -13,6 +13,7 @@ import {
registerProviderPlugin,
requireRegisteredProvider,
} from "../../test/helpers/plugins/provider-registration.js";
import { runRealtimeSttLiveTest } from "../../test/helpers/stt-live-audio.js";
import plugin from "./index.js";
const OPENAI_API_KEY = process.env.OPENAI_API_KEY ?? "";
@@ -140,21 +141,6 @@ async function createTempAgentDir(): Promise<string> {
return await fs.mkdtemp(path.join(os.tmpdir(), "openai-plugin-live-"));
}
async function waitForLiveExpectation(expectation: () => void, timeoutMs = 30_000) {
const started = Date.now();
let lastError: unknown;
while (Date.now() - started < timeoutMs) {
try {
expectation();
return;
} catch (error) {
lastError = error;
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
throw lastError;
}
function normalizeTranscriptForMatch(value: string): string {
return value.toLowerCase().replace(/[^a-z0-9]+/g, "");
}
@@ -339,40 +325,19 @@ describeLive("openai plugin live", () => {
expect(telephony.outputFormat).toBe("pcm");
expect(telephony.sampleRate).toBe(24_000);
const transcripts: string[] = [];
const partials: string[] = [];
const errors: Error[] = [];
const session = realtimeProvider.createSession({
const speech = convertPcm24kToMulaw8k(telephony.audioBuffer);
const silence = Buffer.alloc(8_000, 0xff);
const audio = Buffer.concat([silence.subarray(0, 4_000), speech, silence]);
const { transcripts, partials } = await runRealtimeSttLiveTest({
provider: realtimeProvider,
providerConfig: {
apiKey: OPENAI_API_KEY,
language: "en",
silenceDurationMs: 500,
},
onPartial: (partial) => partials.push(partial),
onTranscript: (transcript) => transcripts.push(transcript),
onError: (error) => errors.push(error),
audio,
});
try {
await session.connect();
const speech = convertPcm24kToMulaw8k(telephony.audioBuffer);
const silence = Buffer.alloc(8_000, 0xff);
const audio = Buffer.concat([silence.subarray(0, 4_000), speech, silence]);
for (let offset = 0; offset < audio.byteLength; offset += 160) {
session.sendAudio(audio.subarray(offset, offset + 160));
await new Promise((resolve) => setTimeout(resolve, 5));
}
await waitForLiveExpectation(() => {
if (errors[0]) {
throw errors[0];
}
expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
}, 60_000);
} finally {
session.close();
}
const normalized = transcripts.join(" ").toLowerCase();
const compact = normalizeTranscriptForMatch(normalized);
expect(compact).toContain("openclaw");

View File

@@ -1,20 +1,14 @@
import { randomUUID } from "node:crypto";
import {
captureWsEvent,
createDebugProxyWebSocketAgent,
resolveDebugProxySettings,
} from "openclaw/plugin-sdk/proxy-capture";
import type {
RealtimeTranscriptionProviderConfig,
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
createRealtimeTranscriptionWebSocketSession,
type RealtimeTranscriptionProviderConfig,
type RealtimeTranscriptionProviderPlugin,
type RealtimeTranscriptionSession,
type RealtimeTranscriptionSessionCreateRequest,
type RealtimeTranscriptionWebSocketTransport,
} from "openclaw/plugin-sdk/realtime-transcription";
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
import WebSocket from "ws";
import {
asFiniteNumber,
captureOpenAIRealtimeWsClose,
readRealtimeErrorDetail,
resolveOpenAIProviderConfigRecord,
trimToUndefined,
@@ -45,6 +39,11 @@ type RealtimeEvent = {
error?: unknown;
};
const OPENAI_REALTIME_TRANSCRIPTION_URL = "wss://api.openai.com/v1/realtime?intent=transcription";
const OPENAI_REALTIME_TRANSCRIPTION_CONNECT_TIMEOUT_MS = 10_000;
const OPENAI_REALTIME_TRANSCRIPTION_MAX_RECONNECT_ATTEMPTS = 5;
const OPENAI_REALTIME_TRANSCRIPTION_RECONNECT_DELAY_MS = 1000;
function normalizeProviderConfig(
config: RealtimeTranscriptionProviderConfig,
): OpenAIRealtimeTranscriptionProviderConfig {
@@ -67,228 +66,84 @@ function normalizeProviderConfig(
};
}
class OpenAIRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
private static readonly MAX_RECONNECT_ATTEMPTS = 5;
private static readonly RECONNECT_DELAY_MS = 1000;
private static readonly CONNECT_TIMEOUT_MS = 10_000;
function createOpenAIRealtimeTranscriptionSession(
config: OpenAIRealtimeTranscriptionSessionConfig,
): RealtimeTranscriptionSession {
let pendingTranscript = "";
private ws: WebSocket | null = null;
private connected = false;
private closed = false;
private reconnectAttempts = 0;
private pendingTranscript = "";
private readonly flowId = randomUUID();
constructor(private readonly config: OpenAIRealtimeTranscriptionSessionConfig) {}
async connect(): Promise<void> {
this.closed = false;
this.reconnectAttempts = 0;
await this.doConnect();
}
sendAudio(audio: Buffer): void {
if (this.ws?.readyState !== WebSocket.OPEN) {
return;
}
this.sendEvent({
type: "input_audio_buffer.append",
audio: audio.toString("base64"),
});
}
close(): void {
this.closed = true;
this.connected = false;
if (this.ws) {
this.ws.close(1000, "Transcription session closed");
this.ws = null;
}
}
isConnected(): boolean {
return this.connected;
}
private async doConnect(): Promise<void> {
await new Promise<void>((resolve, reject) => {
const url = "wss://api.openai.com/v1/realtime?intent=transcription";
const debugProxy = resolveDebugProxySettings();
const proxyAgent = createDebugProxyWebSocketAgent(debugProxy);
this.ws = new WebSocket(url, {
headers: {
Authorization: `Bearer ${this.config.apiKey}`,
"OpenAI-Beta": "realtime=v1",
},
...(proxyAgent ? { agent: proxyAgent } : {}),
});
const connectTimeout = setTimeout(() => {
reject(new Error("OpenAI realtime transcription connection timeout"));
}, OpenAIRealtimeTranscriptionSession.CONNECT_TIMEOUT_MS);
this.ws.on("open", () => {
clearTimeout(connectTimeout);
this.connected = true;
this.reconnectAttempts = 0;
captureWsEvent({
url,
direction: "local",
kind: "ws-open",
flowId: this.flowId,
meta: {
provider: "openai",
capability: "realtime-transcription",
},
});
this.sendEvent({
type: "transcription_session.update",
session: {
input_audio_format: "g711_ulaw",
input_audio_transcription: {
model: this.config.model,
...(this.config.language ? { language: this.config.language } : {}),
...(this.config.prompt ? { prompt: this.config.prompt } : {}),
},
turn_detection: {
type: "server_vad",
threshold: this.config.vadThreshold,
prefix_padding_ms: 300,
silence_duration_ms: this.config.silenceDurationMs,
},
},
});
resolve();
});
this.ws.on("message", (data: Buffer) => {
captureWsEvent({
url,
direction: "inbound",
kind: "ws-frame",
flowId: this.flowId,
payload: data,
meta: {
provider: "openai",
capability: "realtime-transcription",
},
});
try {
this.handleEvent(JSON.parse(data.toString()) as RealtimeEvent);
} catch (error) {
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
}
});
this.ws.on("error", (error) => {
captureWsEvent({
url,
direction: "local",
kind: "error",
flowId: this.flowId,
errorText: error instanceof Error ? error.message : String(error),
meta: {
provider: "openai",
capability: "realtime-transcription",
},
});
if (!this.connected) {
clearTimeout(connectTimeout);
reject(error);
return;
}
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
});
this.ws.on("close", (code, reasonBuffer) => {
captureOpenAIRealtimeWsClose({
url,
flowId: this.flowId,
capability: "realtime-transcription",
code,
reasonBuffer,
});
this.connected = false;
if (this.closed) {
return;
}
void this.attemptReconnect();
});
});
}
private async attemptReconnect(): Promise<void> {
if (this.closed) {
return;
}
if (this.reconnectAttempts >= OpenAIRealtimeTranscriptionSession.MAX_RECONNECT_ATTEMPTS) {
this.config.onError?.(new Error("OpenAI realtime transcription reconnect limit reached"));
return;
}
this.reconnectAttempts += 1;
const delay =
OpenAIRealtimeTranscriptionSession.RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
await new Promise((resolve) => setTimeout(resolve, delay));
if (this.closed) {
return;
}
try {
await this.doConnect();
} catch (error) {
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
await this.attemptReconnect();
}
}
private handleEvent(event: RealtimeEvent): void {
const handleEvent = (event: RealtimeEvent) => {
switch (event.type) {
case "conversation.item.input_audio_transcription.delta":
if (event.delta) {
this.pendingTranscript += event.delta;
this.config.onPartial?.(this.pendingTranscript);
pendingTranscript += event.delta;
config.onPartial?.(pendingTranscript);
}
return;
case "conversation.item.input_audio_transcription.completed":
if (event.transcript) {
this.config.onTranscript?.(event.transcript);
config.onTranscript?.(event.transcript);
}
this.pendingTranscript = "";
pendingTranscript = "";
return;
case "input_audio_buffer.speech_started":
this.pendingTranscript = "";
this.config.onSpeechStart?.();
pendingTranscript = "";
config.onSpeechStart?.();
return;
case "error": {
const detail = readRealtimeErrorDetail(event.error);
this.config.onError?.(new Error(detail));
config.onError?.(new Error(detail));
return;
}
default:
return;
}
}
};
private sendEvent(event: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
const payload = JSON.stringify(event);
captureWsEvent({
url: "wss://api.openai.com/v1/realtime?intent=transcription",
direction: "outbound",
kind: "ws-frame",
flowId: this.flowId,
payload,
meta: {
provider: "openai",
capability: "realtime-transcription",
return createRealtimeTranscriptionWebSocketSession<RealtimeEvent>({
providerId: "openai",
callbacks: config,
url: OPENAI_REALTIME_TRANSCRIPTION_URL,
headers: {
Authorization: `Bearer ${config.apiKey}`,
"OpenAI-Beta": "realtime=v1",
},
readyOnOpen: true,
connectTimeoutMs: OPENAI_REALTIME_TRANSCRIPTION_CONNECT_TIMEOUT_MS,
maxReconnectAttempts: OPENAI_REALTIME_TRANSCRIPTION_MAX_RECONNECT_ATTEMPTS,
reconnectDelayMs: OPENAI_REALTIME_TRANSCRIPTION_RECONNECT_DELAY_MS,
connectTimeoutMessage: "OpenAI realtime transcription connection timeout",
reconnectLimitMessage: "OpenAI realtime transcription reconnect limit reached",
sendAudio: (audio, transport) => {
transport.sendJson({
type: "input_audio_buffer.append",
audio: audio.toString("base64"),
});
},
onOpen: (transport: RealtimeTranscriptionWebSocketTransport) => {
transport.sendJson({
type: "transcription_session.update",
session: {
input_audio_format: "g711_ulaw",
input_audio_transcription: {
model: config.model,
...(config.language ? { language: config.language } : {}),
...(config.prompt ? { prompt: config.prompt } : {}),
},
turn_detection: {
type: "server_vad",
threshold: config.vadThreshold,
prefix_padding_ms: 300,
silence_duration_ms: config.silenceDurationMs,
},
},
});
this.ws.send(payload);
}
}
},
onMessage: handleEvent,
});
}
export function buildOpenAIRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
@@ -306,7 +161,7 @@ export function buildOpenAIRealtimeTranscriptionProvider(): RealtimeTranscriptio
if (!apiKey) {
throw new Error("OpenAI API key missing");
}
return new OpenAIRealtimeTranscriptionSession({
return createOpenAIRealtimeTranscriptionSession({
...req,
apiKey,
language: config.language,
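
Both realtime providers now hand their WebSocket lifecycle (connect timeout, capped exponential-backoff reconnects, proxy capture, graceful close) to the SDK's `createRealtimeTranscriptionWebSocketSession`. The factory's definition is not shown in this diff; judging from this call site and the xAI one below, its option surface is roughly:

```ts
// Hypothetical option surface reconstructed from the two call sites in this
// diff; the SDK's actual definition may differ.
import type {
  RealtimeTranscriptionSession,
  RealtimeTranscriptionWebSocketTransport,
} from "openclaw/plugin-sdk/realtime-transcription";

declare function createRealtimeTranscriptionWebSocketSession<TEvent>(opts: {
  providerId: string;
  callbacks: {
    onPartial?: (text: string) => void;
    onTranscript?: (text: string) => void;
    onSpeechStart?: () => void;
    onError?: (error: Error) => void;
  };
  url: string | (() => string); // xai recomputes its URL per connect
  headers: Record<string, string>;
  readyOnOpen?: boolean; // openai is ready at ws open; xai waits for transcript.created
  connectTimeoutMs: number;
  closeTimeoutMs?: number; // xai: grace period after audio.done before force-closing
  maxReconnectAttempts: number;
  reconnectDelayMs: number; // presumably doubled per attempt, as the removed code did
  maxQueuedBytes?: number; // xai: cap on audio queued before the session is ready
  connectTimeoutMessage: string;
  reconnectLimitMessage: string;
  sendAudio: (audio: Buffer, transport: RealtimeTranscriptionWebSocketTransport) => void;
  onOpen?: (transport: RealtimeTranscriptionWebSocketTransport) => void;
  onClose?: (transport: RealtimeTranscriptionWebSocketTransport) => void;
  onMessage: (event: TEvent, transport: RealtimeTranscriptionWebSocketTransport) => void;
}): RealtimeTranscriptionSession;
```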

View File

@@ -0,0 +1,3 @@
{
"specs": ["@mariozechner/pi-ai@0.68.1", "@sinclair/typebox@0.34.49", "ws@^8.20.0"]
}

View File

@@ -1,18 +1,13 @@
import { randomUUID } from "node:crypto";
import {
captureWsEvent,
createDebugProxyWebSocketAgent,
resolveDebugProxySettings,
} from "openclaw/plugin-sdk/proxy-capture";
import type {
RealtimeTranscriptionProviderConfig,
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
createRealtimeTranscriptionWebSocketSession,
type RealtimeTranscriptionProviderConfig,
type RealtimeTranscriptionProviderPlugin,
type RealtimeTranscriptionSession,
type RealtimeTranscriptionSessionCreateRequest,
type RealtimeTranscriptionWebSocketTransport,
} from "openclaw/plugin-sdk/realtime-transcription";
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
import WebSocket from "ws";
import { XAI_BASE_URL } from "./model-definitions.js";
type XaiRealtimeTranscriptionEncoding = "pcm" | "mulaw" | "alaw";
@@ -160,307 +155,86 @@ function readTranscriptText(event: XaiRealtimeTranscriptionEvent): string | unde
return normalizeOptionalString(event.text ?? event.transcript);
}
class XaiRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
private ws: WebSocket | null = null;
private connected = false;
private ready = false;
private closed = false;
private reconnectAttempts = 0;
private queuedAudio: Buffer[] = [];
private queuedBytes = 0;
private closeTimer: ReturnType<typeof setTimeout> | undefined;
private lastTranscript: string | undefined;
private speechStarted = false;
private reconnecting = false;
private readonly flowId = randomUUID();
function createXaiRealtimeTranscriptionSession(
config: XaiRealtimeTranscriptionSessionConfig,
): RealtimeTranscriptionSession {
let lastTranscript: string | undefined;
let speechStarted = false;
constructor(private readonly config: XaiRealtimeTranscriptionSessionConfig) {}
async connect(): Promise<void> {
this.closed = false;
this.reconnectAttempts = 0;
await this.doConnect();
}
sendAudio(audio: Buffer): void {
if (this.closed) {
const emitTranscript = (text: string) => {
if (text === lastTranscript) {
return;
}
if (this.ws?.readyState === WebSocket.OPEN && this.ready) {
this.sendAudioFrame(audio);
lastTranscript = text;
config.onTranscript?.(text);
};
const handleEvent = (
event: XaiRealtimeTranscriptionEvent,
transport: RealtimeTranscriptionWebSocketTransport,
) => {
if (event.type === "transcript.created") {
transport.markReady();
return;
}
this.queueAudio(audio);
}
close(): void {
this.closed = true;
this.connected = false;
this.queuedAudio = [];
this.queuedBytes = 0;
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
this.forceClose();
if (!transport.isReady() && event.type === "error") {
transport.failConnect(new Error(readErrorDetail(event.error ?? event.message)));
return;
}
this.sendEvent({ type: "audio.done" });
this.closeTimer = setTimeout(() => this.forceClose(), XAI_REALTIME_STT_CLOSE_TIMEOUT_MS);
}
isConnected(): boolean {
return this.connected;
}
private async doConnect(): Promise<void> {
await new Promise<void>((resolve, reject) => {
const url = toXaiRealtimeWsUrl(this.config);
const debugProxy = resolveDebugProxySettings();
const proxyAgent = createDebugProxyWebSocketAgent(debugProxy);
let settled = false;
let opened = false;
const finishConnect = () => {
if (settled) {
return;
}
settled = true;
clearTimeout(connectTimeout);
this.ready = true;
this.flushQueuedAudio();
resolve();
};
const failConnect = (error: Error) => {
if (settled) {
return;
}
settled = true;
clearTimeout(connectTimeout);
this.config.onError?.(error);
this.closed = true;
this.forceClose();
reject(error);
};
this.ready = false;
this.ws = new WebSocket(url, {
headers: {
Authorization: `Bearer ${this.config.apiKey}`,
},
...(proxyAgent ? { agent: proxyAgent } : {}),
});
const connectTimeout = setTimeout(() => {
failConnect(new Error("xAI realtime transcription connection timeout"));
}, XAI_REALTIME_STT_CONNECT_TIMEOUT_MS);
this.ws.on("open", () => {
opened = true;
this.connected = true;
this.reconnectAttempts = 0;
captureWsEvent({
url,
direction: "local",
kind: "ws-open",
flowId: this.flowId,
meta: { provider: "xai", capability: "realtime-transcription" },
});
});
this.ws.on("message", (data: Buffer) => {
captureWsEvent({
url,
direction: "inbound",
kind: "ws-frame",
flowId: this.flowId,
payload: data,
meta: { provider: "xai", capability: "realtime-transcription" },
});
try {
const event = JSON.parse(data.toString()) as XaiRealtimeTranscriptionEvent;
if (event.type === "transcript.created") {
finishConnect();
return;
}
if (!this.ready && event.type === "error") {
failConnect(new Error(readErrorDetail(event.error ?? event.message)));
return;
}
this.handleEvent(event);
} catch (error) {
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
}
});
this.ws.on("error", (error) => {
captureWsEvent({
url,
direction: "local",
kind: "error",
flowId: this.flowId,
errorText: error instanceof Error ? error.message : String(error),
meta: { provider: "xai", capability: "realtime-transcription" },
});
if (!this.ready) {
failConnect(error instanceof Error ? error : new Error(String(error)));
return;
}
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
});
this.ws.on("close", () => {
clearTimeout(connectTimeout);
this.connected = false;
this.ready = false;
if (this.closeTimer) {
clearTimeout(this.closeTimer);
this.closeTimer = undefined;
}
if (this.closed) {
return;
}
if (!opened || !settled) {
return;
}
void this.attemptReconnect();
});
});
}
private async attemptReconnect(): Promise<void> {
if (this.closed) {
return;
}
if (this.reconnecting) {
return;
}
if (this.reconnectAttempts >= XAI_REALTIME_STT_MAX_RECONNECT_ATTEMPTS) {
this.config.onError?.(new Error("xAI realtime transcription reconnect limit reached"));
return;
}
this.reconnectAttempts += 1;
const delay = XAI_REALTIME_STT_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
this.reconnecting = true;
try {
await new Promise((resolve) => setTimeout(resolve, delay));
if (this.closed) {
return;
}
await this.doConnect();
} catch {
if (!this.closed) {
this.reconnecting = false;
await this.attemptReconnect();
return;
}
} finally {
this.reconnecting = false;
}
}
private handleEvent(event: XaiRealtimeTranscriptionEvent): void {
switch (event.type) {
case "transcript.partial": {
const text = readTranscriptText(event);
if (!text) {
return;
}
if (!this.speechStarted) {
this.speechStarted = true;
this.config.onSpeechStart?.();
if (!speechStarted) {
speechStarted = true;
config.onSpeechStart?.();
}
if (event.is_final && event.speech_final) {
this.emitTranscript(text);
this.speechStarted = false;
emitTranscript(text);
speechStarted = false;
return;
}
this.config.onPartial?.(text);
config.onPartial?.(text);
return;
}
case "transcript.done": {
const text = readTranscriptText(event);
if (text) {
this.emitTranscript(text);
emitTranscript(text);
}
this.forceClose();
transport.closeNow();
return;
}
case "error":
this.config.onError?.(new Error(readErrorDetail(event.error ?? event.message)));
config.onError?.(new Error(readErrorDetail(event.error ?? event.message)));
return;
default:
return;
}
}
};
private emitTranscript(text: string): void {
if (text === this.lastTranscript) {
return;
}
this.lastTranscript = text;
this.config.onTranscript?.(text);
}
private queueAudio(audio: Buffer): void {
if (audio.byteLength === 0) {
return;
}
this.queuedAudio.push(Buffer.from(audio));
this.queuedBytes += audio.byteLength;
while (this.queuedBytes > XAI_REALTIME_STT_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) {
const dropped = this.queuedAudio.shift();
this.queuedBytes -= dropped?.byteLength ?? 0;
}
}
private flushQueuedAudio(): void {
for (const audio of this.queuedAudio) {
this.sendAudioFrame(audio);
}
this.queuedAudio = [];
this.queuedBytes = 0;
}
private sendAudioFrame(audio: Buffer): void {
if (this.ws?.readyState !== WebSocket.OPEN) {
this.queueAudio(audio);
return;
}
captureWsEvent({
url: toXaiRealtimeWsUrl(this.config),
direction: "outbound",
kind: "ws-frame",
flowId: this.flowId,
payload: audio,
meta: { provider: "xai", capability: "realtime-transcription" },
});
this.ws.send(audio);
}
private sendEvent(event: unknown): void {
if (this.ws?.readyState !== WebSocket.OPEN) {
return;
}
const payload = JSON.stringify(event);
captureWsEvent({
url: toXaiRealtimeWsUrl(this.config),
direction: "outbound",
kind: "ws-frame",
flowId: this.flowId,
payload,
meta: { provider: "xai", capability: "realtime-transcription" },
});
this.ws.send(payload);
}
private forceClose(): void {
if (this.closeTimer) {
clearTimeout(this.closeTimer);
this.closeTimer = undefined;
}
this.connected = false;
this.ready = false;
if (this.ws) {
this.ws.close(1000, "Transcription session closed");
this.ws = null;
}
}
return createRealtimeTranscriptionWebSocketSession<XaiRealtimeTranscriptionEvent>({
providerId: "xai",
callbacks: config,
url: () => toXaiRealtimeWsUrl(config),
headers: { Authorization: `Bearer ${config.apiKey}` },
connectTimeoutMs: XAI_REALTIME_STT_CONNECT_TIMEOUT_MS,
closeTimeoutMs: XAI_REALTIME_STT_CLOSE_TIMEOUT_MS,
maxReconnectAttempts: XAI_REALTIME_STT_MAX_RECONNECT_ATTEMPTS,
reconnectDelayMs: XAI_REALTIME_STT_RECONNECT_DELAY_MS,
maxQueuedBytes: XAI_REALTIME_STT_MAX_QUEUED_BYTES,
connectTimeoutMessage: "xAI realtime transcription connection timeout",
reconnectLimitMessage: "xAI realtime transcription reconnect limit reached",
sendAudio: (audio, transport) => {
transport.sendBinary(audio);
},
onClose: (transport) => {
transport.sendJson({ type: "audio.done" });
},
onMessage: handleEvent,
});
}
export function buildXaiRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
@@ -478,7 +252,7 @@ export function buildXaiRealtimeTranscriptionProvider(): RealtimeTranscriptionPr
if (!apiKey) {
throw new Error("xAI API key missing");
}
return new XaiRealtimeTranscriptionSession({
return createXaiRealtimeTranscriptionSession({
...req,
apiKey,
baseUrl: normalizeXaiRealtimeBaseUrl(config.baseUrl),
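
The xAI handler exercises more of the transport than the OpenAI one, relying on the ready/queueing contract that previously lived in the hand-written session. Inferred from usage alone, the transport presumably exposes at least:

```ts
// Hypothetical transport surface; names match the diff, semantics are
// educated guesses from the removed code each call replaced.
interface RealtimeTranscriptionWebSocketTransport {
  sendJson(event: unknown): void; // JSON frame + proxy capture, replacing the removed sendEvent
  sendBinary(audio: Buffer): void; // raw audio frame, replacing sendAudioFrame
  markReady(): void; // flips to ready (presumably flushing queued audio) on transcript.created
  isReady(): boolean; // lets pre-ready errors fail the pending connect instead of merely reporting
  failConnect(error: Error): void; // rejects the in-flight connect() promise
  closeNow(): void; // immediate close, replacing the removed forceClose()
}
```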

View File

@@ -5,6 +5,7 @@ import type {
} from "openclaw/plugin-sdk/media-understanding";
import {
assertOkOrThrowHttpError,
buildAudioTranscriptionFormData,
postTranscriptionRequest,
resolveProviderHttpRequestConfig,
requireTranscriptionText,
@@ -41,19 +42,17 @@ export async function transcribeXaiAudio(
transport: "media-understanding",
});
const form = new FormData();
const blob = new Blob([new Uint8Array(params.buffer)], {
type: params.mime ?? "application/octet-stream",
});
form.append("file", blob, params.fileName || "audio");
const model = normalizeOptionalString(params.model);
if (model) {
form.append("model", model);
}
const language = normalizeOptionalString(params.language);
if (language) {
form.append("language", language);
}
const form = buildAudioTranscriptionFormData({
buffer: params.buffer,
fileName: params.fileName,
mime: params.mime,
fields: {
model,
language,
},
});
const { response, release } = await postTranscriptionRequest({
url: `${baseUrl}/stt`,

View File

@@ -8,6 +8,7 @@ import {
registerProviderPlugin,
requireRegisteredProvider,
} from "../../test/helpers/plugins/provider-registration.js";
import { runRealtimeSttLiveTest } from "../../test/helpers/stt-live-audio.js";
import plugin from "./index.js";
import { XAI_DEFAULT_STT_MODEL } from "./stt.js";
@@ -66,21 +67,6 @@ const registerXaiPlugin = () =>
name: "xAI Provider",
});
async function waitForLiveExpectation(expectation: () => void, timeoutMs = 30_000) {
const started = Date.now();
let lastError: unknown;
while (Date.now() - started < timeoutMs) {
try {
expectation();
return;
} catch (error) {
lastError = error;
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
throw lastError;
}
function normalizeTranscriptForMatch(value: string): string {
return value.toLowerCase().replace(/[^a-z0-9]+/g, "");
}
@@ -216,10 +202,9 @@ describeLive("xai plugin live", () => {
expect(telephony.outputFormat).toBe("pcm");
expect(telephony.sampleRate).toBe(24_000);
const transcripts: string[] = [];
const partials: string[] = [];
const errors: Error[] = [];
const session = realtimeProvider.createSession({
const chunkSize = Math.max(1, Math.floor(telephony.sampleRate * 2 * 0.1));
const { transcripts, partials } = await runRealtimeSttLiveTest({
provider: realtimeProvider,
providerConfig: {
apiKey: XAI_API_KEY,
baseUrl: "https://api.x.ai/v1",
@@ -229,26 +214,12 @@ describeLive("xai plugin live", () => {
endpointingMs: 500,
language: "en",
},
onPartial: (partial) => partials.push(partial),
onTranscript: (transcript) => transcripts.push(transcript),
onError: (error) => errors.push(error),
audio: telephony.audioBuffer,
chunkSize,
delayMs: 20,
closeBeforeWait: true,
});
await session.connect();
const audio = telephony.audioBuffer;
const chunkSize = Math.max(1, Math.floor(telephony.sampleRate * 2 * 0.1));
for (let offset = 0; offset < audio.byteLength; offset += chunkSize) {
session.sendAudio(audio.subarray(offset, offset + chunkSize));
await new Promise((resolve) => setTimeout(resolve, 20));
}
session.close();
await waitForLiveExpectation(() => {
if (errors[0]) {
throw errors[0];
}
expect(normalizeTranscriptForMatch(transcripts.join(" "))).toContain("openclaw");
}, 60_000);
const normalized = transcripts.join(" ").toLowerCase();
const compact = normalizeTranscriptForMatch(normalized);
expect(compact).toContain("openclaw");