Files
openclaw/extensions/openai/realtime-transcription-provider.ts
2026-04-06 14:53:55 +01:00

274 lines
8.0 KiB
TypeScript

import type {
RealtimeTranscriptionProviderConfig,
RealtimeTranscriptionProviderPlugin,
RealtimeTranscriptionSession,
RealtimeTranscriptionSessionCreateRequest,
} from "openclaw/plugin-sdk/realtime-transcription";
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
import WebSocket from "ws";
type OpenAIRealtimeTranscriptionProviderConfig = {
apiKey?: string;
model?: string;
silenceDurationMs?: number;
vadThreshold?: number;
};
type OpenAIRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & {
apiKey: string;
model: string;
silenceDurationMs: number;
vadThreshold: number;
};
type RealtimeEvent = {
type: string;
delta?: string;
transcript?: string;
error?: unknown;
};
function trimToUndefined(value: unknown): string | undefined {
return typeof value === "string" && value.trim() ? value.trim() : undefined;
}
function asNumber(value: unknown): number | undefined {
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function asObject(value: unknown): Record<string, unknown> | undefined {
return typeof value === "object" && value !== null && !Array.isArray(value)
? (value as Record<string, unknown>)
: undefined;
}
function readRealtimeErrorDetail(error: unknown): string {
if (typeof error === "string" && error) {
return error;
}
const message = asObject(error)?.message;
if (typeof message === "string" && message) {
return message;
}
return "Unknown error";
}
function normalizeProviderConfig(
config: RealtimeTranscriptionProviderConfig,
): OpenAIRealtimeTranscriptionProviderConfig {
const providers = asObject(config.providers);
const raw = asObject(providers?.openai) ?? asObject(config.openai) ?? asObject(config);
return {
apiKey:
normalizeResolvedSecretInputString({
value: raw?.apiKey,
path: "plugins.entries.voice-call.config.streaming.providers.openai.apiKey",
}) ??
normalizeResolvedSecretInputString({
value: raw?.openaiApiKey,
path: "plugins.entries.voice-call.config.streaming.openaiApiKey",
}),
model: trimToUndefined(raw?.model) ?? trimToUndefined(raw?.sttModel),
silenceDurationMs: asNumber(raw?.silenceDurationMs),
vadThreshold: asNumber(raw?.vadThreshold),
};
}
function readProviderConfig(
providerConfig: RealtimeTranscriptionProviderConfig,
): OpenAIRealtimeTranscriptionProviderConfig {
return normalizeProviderConfig(providerConfig);
}
class OpenAIRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
private static readonly MAX_RECONNECT_ATTEMPTS = 5;
private static readonly RECONNECT_DELAY_MS = 1000;
private static readonly CONNECT_TIMEOUT_MS = 10_000;
private ws: WebSocket | null = null;
private connected = false;
private closed = false;
private reconnectAttempts = 0;
private pendingTranscript = "";
constructor(private readonly config: OpenAIRealtimeTranscriptionSessionConfig) {}
async connect(): Promise<void> {
this.closed = false;
this.reconnectAttempts = 0;
await this.doConnect();
}
sendAudio(audio: Buffer): void {
if (this.ws?.readyState !== WebSocket.OPEN) {
return;
}
this.sendEvent({
type: "input_audio_buffer.append",
audio: audio.toString("base64"),
});
}
close(): void {
this.closed = true;
this.connected = false;
if (this.ws) {
this.ws.close(1000, "Transcription session closed");
this.ws = null;
}
}
isConnected(): boolean {
return this.connected;
}
private async doConnect(): Promise<void> {
await new Promise<void>((resolve, reject) => {
this.ws = new WebSocket("wss://api.openai.com/v1/realtime?intent=transcription", {
headers: {
Authorization: `Bearer ${this.config.apiKey}`,
"OpenAI-Beta": "realtime=v1",
},
});
const connectTimeout = setTimeout(() => {
reject(new Error("OpenAI realtime transcription connection timeout"));
}, OpenAIRealtimeTranscriptionSession.CONNECT_TIMEOUT_MS);
this.ws.on("open", () => {
clearTimeout(connectTimeout);
this.connected = true;
this.reconnectAttempts = 0;
this.sendEvent({
type: "transcription_session.update",
session: {
input_audio_format: "g711_ulaw",
input_audio_transcription: {
model: this.config.model,
},
turn_detection: {
type: "server_vad",
threshold: this.config.vadThreshold,
prefix_padding_ms: 300,
silence_duration_ms: this.config.silenceDurationMs,
},
},
});
resolve();
});
this.ws.on("message", (data: Buffer) => {
try {
this.handleEvent(JSON.parse(data.toString()) as RealtimeEvent);
} catch (error) {
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
}
});
this.ws.on("error", (error) => {
if (!this.connected) {
clearTimeout(connectTimeout);
reject(error);
return;
}
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
});
this.ws.on("close", () => {
this.connected = false;
if (this.closed) {
return;
}
void this.attemptReconnect();
});
});
}
private async attemptReconnect(): Promise<void> {
if (this.closed) {
return;
}
if (this.reconnectAttempts >= OpenAIRealtimeTranscriptionSession.MAX_RECONNECT_ATTEMPTS) {
this.config.onError?.(new Error("OpenAI realtime transcription reconnect limit reached"));
return;
}
this.reconnectAttempts += 1;
const delay =
OpenAIRealtimeTranscriptionSession.RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
await new Promise((resolve) => setTimeout(resolve, delay));
if (this.closed) {
return;
}
try {
await this.doConnect();
} catch (error) {
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
await this.attemptReconnect();
}
}
private handleEvent(event: RealtimeEvent): void {
switch (event.type) {
case "conversation.item.input_audio_transcription.delta":
if (event.delta) {
this.pendingTranscript += event.delta;
this.config.onPartial?.(this.pendingTranscript);
}
return;
case "conversation.item.input_audio_transcription.completed":
if (event.transcript) {
this.config.onTranscript?.(event.transcript);
}
this.pendingTranscript = "";
return;
case "input_audio_buffer.speech_started":
this.pendingTranscript = "";
this.config.onSpeechStart?.();
return;
case "error": {
const detail = readRealtimeErrorDetail(event.error);
this.config.onError?.(new Error(detail));
return;
}
default:
return;
}
}
private sendEvent(event: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(event));
}
}
}
export function buildOpenAIRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
return {
id: "openai",
label: "OpenAI Realtime Transcription",
aliases: ["openai-realtime"],
autoSelectOrder: 10,
resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig),
isConfigured: ({ providerConfig }) =>
Boolean(readProviderConfig(providerConfig).apiKey || process.env.OPENAI_API_KEY),
createSession: (req) => {
const config = readProviderConfig(req.providerConfig);
const apiKey = config.apiKey || process.env.OPENAI_API_KEY;
if (!apiKey) {
throw new Error("OpenAI API key missing");
}
return new OpenAIRealtimeTranscriptionSession({
...req,
apiKey,
model: config.model ?? "gpt-4o-transcribe",
silenceDurationMs: config.silenceDurationMs ?? 800,
vadThreshold: config.vadThreshold ?? 0.5,
});
},
};
}