mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 14:50:45 +00:00
feat: add xai realtime transcription
This commit is contained in:
@@ -50,8 +50,8 @@ function registerXaiAutoEnableProbe(): XaiAutoEnableProbe {
|
||||
}
|
||||
|
||||
describe("xai provider plugin", () => {
|
||||
it("registers xAI media understanding for batch STT", async () => {
|
||||
const { mediaProviders } = await registerProviderPlugin({
|
||||
it("registers xAI speech providers for batch and streaming STT", async () => {
|
||||
const { mediaProviders, realtimeTranscriptionProviders } = await registerProviderPlugin({
|
||||
plugin,
|
||||
id: "xai",
|
||||
name: "xAI Provider",
|
||||
@@ -66,6 +66,15 @@ describe("xai provider plugin", () => {
|
||||
}),
|
||||
]),
|
||||
);
|
||||
expect(realtimeTranscriptionProviders).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
id: "xai",
|
||||
label: "xAI Realtime Transcription",
|
||||
aliases: expect.arrayContaining(["xai-realtime"]),
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it("declares setup auto-enable reasons for plugin-owned tool config", () => {
|
||||
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
import { applyXaiConfig, XAI_DEFAULT_MODEL_REF } from "./onboard.js";
|
||||
import { buildXaiProvider } from "./provider-catalog.js";
|
||||
import { isModernXaiModel, resolveXaiForwardCompatModel } from "./provider-models.js";
|
||||
import { buildXaiRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
|
||||
import { buildXaiSpeechProvider } from "./speech-provider.js";
|
||||
import { resolveFallbackXaiAuth } from "./src/tool-auth-shared.js";
|
||||
import { resolveEffectiveXSearchConfig } from "./src/x-search-config.js";
|
||||
@@ -209,6 +210,7 @@ export default defineSingleProviderPluginEntry({
|
||||
api.registerVideoGenerationProvider(buildXaiVideoGenerationProvider());
|
||||
api.registerImageGenerationProvider(buildXaiImageGenerationProvider());
|
||||
api.registerSpeechProvider(buildXaiSpeechProvider());
|
||||
api.registerRealtimeTranscriptionProvider(buildXaiRealtimeTranscriptionProvider());
|
||||
api.registerTool((ctx) => createLazyCodeExecutionTool(ctx), { name: "code_execution" });
|
||||
api.registerTool((ctx) => createLazyXSearchTool(ctx), { name: "x_search" });
|
||||
},
|
||||
|
||||
@@ -87,6 +87,7 @@
|
||||
"videoGenerationProviders": ["xai"],
|
||||
"mediaUnderstandingProviders": ["xai"],
|
||||
"speechProviders": ["xai"],
|
||||
"realtimeTranscriptionProviders": ["xai"],
|
||||
"imageGenerationProviders": ["xai"],
|
||||
"tools": ["code_execution", "x_search"]
|
||||
},
|
||||
|
||||
201
extensions/xai/realtime-transcription-provider.test.ts
Normal file
201
extensions/xai/realtime-transcription-provider.test.ts
Normal file
@@ -0,0 +1,201 @@
|
||||
import { createServer } from "node:http";
|
||||
import type { AddressInfo } from "node:net";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import type WebSocket from "ws";
|
||||
import { WebSocketServer } from "ws";
|
||||
import { buildXaiRealtimeTranscriptionProvider } from "./realtime-transcription-provider.js";
|
||||
|
||||
// Per-test teardown registered by createRealtimeSttServer; ensures the mock
// websocket server is shut down so tests cannot leak sockets between cases.
let cleanup: (() => Promise<void>) | undefined;

afterEach(async () => {
  await cleanup?.();
  cleanup = undefined;
});
|
||||
|
||||
/**
 * Spins up a mock xAI realtime STT websocket server on a random local port.
 *
 * Behavior mirrors what the provider under test expects:
 * - sends `initialEvent` (default `{ type: "transcript.created" }`) as soon
 *   as a client connects (the provider's readiness handshake),
 * - answers each binary audio frame with a non-final and then a final
 *   `transcript.partial` event,
 * - answers the JSON `audio.done` control event with `transcript.done` and
 *   records the call on the returned `done` spy.
 *
 * Assigns the module-level `cleanup` so `afterEach` tears everything down.
 */
async function createRealtimeSttServer(params?: {
  onRequest?: (url: URL) => void;
  onBinary?: (audio: Buffer) => void;
  initialEvent?: unknown;
}) {
  const server = createServer();
  const wss = new WebSocketServer({ noServer: true });
  const clients = new Set<WebSocket>();
  const done = vi.fn();

  server.on("upgrade", (request, socket, head) => {
    // Surface the upgrade URL (path + query params) to the test for assertions.
    const url = new URL(request.url ?? "/", "http://127.0.0.1");
    params?.onRequest?.(url);
    wss.handleUpgrade(request, socket, head, (ws) => {
      clients.add(ws);
      ws.on("close", () => clients.delete(ws));
      // Handshake event the provider waits for before flushing buffered audio.
      ws.send(JSON.stringify(params?.initialEvent ?? { type: "transcript.created" }));
      ws.on("message", (data, isBinary) => {
        // ws delivers Buffer | Buffer[] | ArrayBuffer; normalize to one Buffer.
        const buffer = Buffer.isBuffer(data)
          ? data
          : Array.isArray(data)
            ? Buffer.concat(data)
            : Buffer.from(data);
        if (isBinary) {
          params?.onBinary?.(buffer);
          ws.send(
            JSON.stringify({
              type: "transcript.partial",
              text: "hello openclaw",
              is_final: false,
              speech_final: false,
            }),
          );
          ws.send(
            JSON.stringify({
              type: "transcript.partial",
              text: "hello openclaw final",
              is_final: true,
              speech_final: true,
            }),
          );
          return;
        }
        const event = JSON.parse(buffer.toString()) as { type?: string };
        if (event.type === "audio.done") {
          ws.send(JSON.stringify({ type: "transcript.done", text: "hello openclaw final" }));
          done();
        }
      });
    });
  });

  await new Promise<void>((resolve) => server.listen(0, "127.0.0.1", resolve));
  const port = (server.address() as AddressInfo).port;
  cleanup = async () => {
    for (const ws of clients) {
      ws.terminate();
    }
    await new Promise<void>((resolve) => wss.close(() => resolve()));
    await new Promise<void>((resolve) => server.close(() => resolve()));
  };
  return { baseUrl: `http://127.0.0.1:${port}/v1`, done };
}
|
||||
|
||||
async function waitFor(expectation: () => void) {
|
||||
const started = Date.now();
|
||||
let lastError: unknown;
|
||||
while (Date.now() - started < 3000) {
|
||||
try {
|
||||
expectation();
|
||||
return;
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
await new Promise((resolve) => setTimeout(resolve, 25));
|
||||
}
|
||||
}
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
// Unit tests for the xAI realtime transcription provider against the local
// mock server: config normalization, audio streaming/event mapping, setup
// error rejection, and alias registration.
describe("xai realtime transcription provider", () => {
  it("normalizes provider config for voice-call streaming", () => {
    const provider = buildXaiRealtimeTranscriptionProvider();

    expect(
      provider.resolveConfig?.({
        cfg: {} as never,
        rawConfig: {
          providers: {
            xai: {
              apiKey: "xai-test-key",
              baseUrl: "https://api.x.ai/v1",
              sampleRate: 24000,
              encoding: "pcm",
              interimResults: false,
              endpointingMs: 500,
              language: "en",
            },
          },
        },
      }),
    ).toEqual({
      apiKey: "xai-test-key",
      baseUrl: "https://api.x.ai/v1",
      sampleRate: 24000,
      encoding: "pcm",
      interimResults: false,
      endpointingMs: 500,
      language: "en",
    });
  });

  it("streams raw binary audio and maps partial and final transcript events", async () => {
    const binaryFrames: Buffer[] = [];
    const requestUrls: URL[] = [];
    const server = await createRealtimeSttServer({
      onRequest: (url) => requestUrls.push(url),
      onBinary: (audio) => binaryFrames.push(audio),
    });
    const provider = buildXaiRealtimeTranscriptionProvider();
    const onPartial = vi.fn();
    const onTranscript = vi.fn();
    const onSpeechStart = vi.fn();

    const session = provider.createSession({
      providerConfig: {
        apiKey: "xai-test-key",
        baseUrl: server.baseUrl,
        sampleRate: 24000,
        encoding: "pcm",
        endpointingMs: 500,
      },
      onPartial,
      onTranscript,
      onSpeechStart,
    });

    // Audio sent before connect() must be buffered and flushed on readiness.
    session.sendAudio(Buffer.from("queued-before-ready"));
    await session.connect();
    session.sendAudio(Buffer.from("after-ready"));
    await waitFor(() => expect(onTranscript).toHaveBeenCalledWith("hello openclaw final"));
    session.close();
    // close() sends audio.done; the mock's `done` spy records receiving it.
    await waitFor(() => expect(server.done).toHaveBeenCalled());

    // Query parameters derived from providerConfig (interim_results defaults true).
    expect(requestUrls[0]?.pathname).toBe("/v1/stt");
    expect(requestUrls[0]?.searchParams.get("sample_rate")).toBe("24000");
    expect(requestUrls[0]?.searchParams.get("encoding")).toBe("pcm");
    expect(requestUrls[0]?.searchParams.get("interim_results")).toBe("true");
    expect(requestUrls[0]?.searchParams.get("endpointing")).toBe("500");
    expect(Buffer.concat(binaryFrames).toString()).toContain("queued-before-ready");
    expect(Buffer.concat(binaryFrames).toString()).toContain("after-ready");
    expect(onSpeechStart).toHaveBeenCalled();
    expect(onPartial).toHaveBeenCalledWith("hello openclaw");
  });

  it("rejects setup errors before the stream is ready", async () => {
    // Server greets with an error event instead of transcript.created.
    const server = await createRealtimeSttServer({
      initialEvent: {
        type: "error",
        error: {
          message: "Streaming ASR unavailable",
        },
      },
    });
    const provider = buildXaiRealtimeTranscriptionProvider();
    const onError = vi.fn();

    const session = provider.createSession({
      providerConfig: {
        apiKey: "xai-test-key",
        baseUrl: server.baseUrl,
      },
      onError,
    });

    await expect(session.connect()).rejects.toThrow("Streaming ASR unavailable");
    expect(session.isConnected()).toBe(false);
    expect(onError).toHaveBeenCalledWith(expect.any(Error));
    expect(onError.mock.calls[0]?.[0].message).toBe("Streaming ASR unavailable");
  });

  it("accepts xAI realtime aliases", () => {
    const provider = buildXaiRealtimeTranscriptionProvider();
    expect(provider.aliases).toEqual(
      expect.arrayContaining(["xai-realtime", "grok-stt-streaming"]),
    );
  });
});
|
||||
493
extensions/xai/realtime-transcription-provider.ts
Normal file
493
extensions/xai/realtime-transcription-provider.ts
Normal file
@@ -0,0 +1,493 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import {
|
||||
captureWsEvent,
|
||||
createDebugProxyWebSocketAgent,
|
||||
resolveDebugProxySettings,
|
||||
} from "openclaw/plugin-sdk/proxy-capture";
|
||||
import type {
|
||||
RealtimeTranscriptionProviderConfig,
|
||||
RealtimeTranscriptionProviderPlugin,
|
||||
RealtimeTranscriptionSession,
|
||||
RealtimeTranscriptionSessionCreateRequest,
|
||||
} from "openclaw/plugin-sdk/realtime-transcription";
|
||||
import { normalizeResolvedSecretInputString } from "openclaw/plugin-sdk/secret-input";
|
||||
import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime";
|
||||
import WebSocket from "ws";
|
||||
import { XAI_BASE_URL } from "./model-definitions.js";
|
||||
|
||||
// Supported raw audio encodings for the realtime STT stream.
type XaiRealtimeTranscriptionEncoding = "pcm" | "mulaw" | "alaw";

// User-facing provider configuration; every field optional, defaults are
// applied when the session is created.
type XaiRealtimeTranscriptionProviderConfig = {
  apiKey?: string;
  baseUrl?: string;
  sampleRate?: number;
  encoding?: XaiRealtimeTranscriptionEncoding;
  interimResults?: boolean;
  endpointingMs?: number;
  language?: string;
};

// Fully-resolved session configuration: the SDK create request plus the
// required connection and audio settings.
type XaiRealtimeTranscriptionSessionConfig = RealtimeTranscriptionSessionCreateRequest & {
  apiKey: string;
  baseUrl: string;
  sampleRate: number;
  encoding: XaiRealtimeTranscriptionEncoding;
  interimResults: boolean;
  endpointingMs: number;
  language?: string;
};

// Loosely-typed shape of JSON events arriving on the websocket.
type XaiRealtimeTranscriptionEvent = {
  type?: string;
  text?: string;
  transcript?: string;
  is_final?: boolean;
  speech_final?: boolean;
  error?: unknown;
  message?: string;
};

// Defaults look like telephony conventions (8 kHz mu-law) — NOTE(review):
// confirm against the xAI streaming STT documentation.
const XAI_REALTIME_STT_DEFAULT_SAMPLE_RATE = 8000;
const XAI_REALTIME_STT_DEFAULT_ENCODING: XaiRealtimeTranscriptionEncoding = "mulaw";
const XAI_REALTIME_STT_DEFAULT_ENDPOINTING_MS = 800;
const XAI_REALTIME_STT_CONNECT_TIMEOUT_MS = 10_000;
// Grace period after sending audio.done before force-closing the socket.
const XAI_REALTIME_STT_CLOSE_TIMEOUT_MS = 5_000;
const XAI_REALTIME_STT_MAX_RECONNECT_ATTEMPTS = 5;
const XAI_REALTIME_STT_RECONNECT_DELAY_MS = 1000;
// Cap on audio buffered while the socket is not yet ready (bytes).
const XAI_REALTIME_STT_MAX_QUEUED_BYTES = 2 * 1024 * 1024;
|
||||
|
||||
function readRecord(value: unknown): Record<string, unknown> | undefined {
|
||||
return value && typeof value === "object" ? (value as Record<string, unknown>) : undefined;
|
||||
}
|
||||
|
||||
function readNestedXaiConfig(rawConfig: RealtimeTranscriptionProviderConfig) {
|
||||
const raw = readRecord(rawConfig);
|
||||
const providers = readRecord(raw?.providers);
|
||||
return readRecord(providers?.xai ?? raw?.xai ?? raw) ?? {};
|
||||
}
|
||||
|
||||
function readFiniteNumber(value: unknown): number | undefined {
|
||||
const next =
|
||||
typeof value === "number"
|
||||
? value
|
||||
: typeof value === "string"
|
||||
? Number.parseFloat(value)
|
||||
: undefined;
|
||||
return Number.isFinite(next) ? next : undefined;
|
||||
}
|
||||
|
||||
function readBoolean(value: unknown): boolean | undefined {
|
||||
if (typeof value === "boolean") {
|
||||
return value;
|
||||
}
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
const normalized = value.trim().toLowerCase();
|
||||
if (["1", "true", "yes", "on"].includes(normalized)) {
|
||||
return true;
|
||||
}
|
||||
if (["0", "false", "no", "off"].includes(normalized)) {
|
||||
return false;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
|
||||
function normalizeEncoding(value: unknown): XaiRealtimeTranscriptionEncoding | undefined {
|
||||
const normalized = normalizeOptionalString(value)?.toLowerCase();
|
||||
if (!normalized) {
|
||||
return undefined;
|
||||
}
|
||||
if (normalized === "ulaw" || normalized === "g711_ulaw" || normalized === "g711-mulaw") {
|
||||
return "mulaw";
|
||||
}
|
||||
if (normalized === "g711_alaw" || normalized === "g711-alaw") {
|
||||
return "alaw";
|
||||
}
|
||||
if (normalized === "pcm" || normalized === "mulaw" || normalized === "alaw") {
|
||||
return normalized;
|
||||
}
|
||||
throw new Error(`Invalid xAI realtime transcription encoding: ${normalized}`);
|
||||
}
|
||||
|
||||
function normalizeXaiRealtimeBaseUrl(value?: string): string {
|
||||
return normalizeOptionalString(value ?? process.env.XAI_BASE_URL) ?? XAI_BASE_URL;
|
||||
}
|
||||
|
||||
function toXaiRealtimeWsUrl(config: XaiRealtimeTranscriptionSessionConfig): string {
|
||||
const url = new URL(normalizeXaiRealtimeBaseUrl(config.baseUrl));
|
||||
url.protocol = url.protocol === "http:" ? "ws:" : "wss:";
|
||||
url.pathname = `${url.pathname.replace(/\/+$/, "")}/stt`;
|
||||
url.searchParams.set("sample_rate", String(config.sampleRate));
|
||||
url.searchParams.set("encoding", config.encoding);
|
||||
url.searchParams.set("interim_results", String(config.interimResults));
|
||||
url.searchParams.set("endpointing", String(config.endpointingMs));
|
||||
if (config.language) {
|
||||
url.searchParams.set("language", config.language);
|
||||
}
|
||||
return url.toString();
|
||||
}
|
||||
|
||||
function normalizeProviderConfig(
|
||||
config: RealtimeTranscriptionProviderConfig,
|
||||
): XaiRealtimeTranscriptionProviderConfig {
|
||||
const raw = readNestedXaiConfig(config);
|
||||
return {
|
||||
apiKey: normalizeResolvedSecretInputString({
|
||||
value: raw.apiKey,
|
||||
path: "plugins.entries.voice-call.config.streaming.providers.xai.apiKey",
|
||||
}),
|
||||
baseUrl: normalizeOptionalString(raw.baseUrl),
|
||||
sampleRate: readFiniteNumber(raw.sampleRate ?? raw.sample_rate),
|
||||
encoding: normalizeEncoding(raw.encoding),
|
||||
interimResults: readBoolean(raw.interimResults ?? raw.interim_results),
|
||||
endpointingMs: readFiniteNumber(raw.endpointingMs ?? raw.endpointing ?? raw.silenceDurationMs),
|
||||
language: normalizeOptionalString(raw.language),
|
||||
};
|
||||
}
|
||||
|
||||
function readErrorDetail(value: unknown): string {
|
||||
if (typeof value === "string") {
|
||||
return value;
|
||||
}
|
||||
const record = readRecord(value);
|
||||
const message = normalizeOptionalString(record?.message);
|
||||
const code = normalizeOptionalString(record?.code);
|
||||
return message ?? code ?? "xAI realtime transcription error";
|
||||
}
|
||||
|
||||
function readTranscriptText(event: XaiRealtimeTranscriptionEvent): string | undefined {
|
||||
return normalizeOptionalString(event.text ?? event.transcript);
|
||||
}
|
||||
|
||||
class XaiRealtimeTranscriptionSession implements RealtimeTranscriptionSession {
|
||||
private ws: WebSocket | null = null;
|
||||
private connected = false;
|
||||
private ready = false;
|
||||
private closed = false;
|
||||
private reconnectAttempts = 0;
|
||||
private queuedAudio: Buffer[] = [];
|
||||
private queuedBytes = 0;
|
||||
private closeTimer: ReturnType<typeof setTimeout> | undefined;
|
||||
private lastTranscript: string | undefined;
|
||||
private speechStarted = false;
|
||||
private reconnecting = false;
|
||||
private readonly flowId = randomUUID();
|
||||
|
||||
constructor(private readonly config: XaiRealtimeTranscriptionSessionConfig) {}
|
||||
|
||||
async connect(): Promise<void> {
|
||||
this.closed = false;
|
||||
this.reconnectAttempts = 0;
|
||||
await this.doConnect();
|
||||
}
|
||||
|
||||
sendAudio(audio: Buffer): void {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
if (this.ws?.readyState === WebSocket.OPEN && this.ready) {
|
||||
this.sendAudioFrame(audio);
|
||||
return;
|
||||
}
|
||||
this.queueAudio(audio);
|
||||
}
|
||||
|
||||
close(): void {
|
||||
this.closed = true;
|
||||
this.connected = false;
|
||||
this.queuedAudio = [];
|
||||
this.queuedBytes = 0;
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
this.forceClose();
|
||||
return;
|
||||
}
|
||||
this.sendEvent({ type: "audio.done" });
|
||||
this.closeTimer = setTimeout(() => this.forceClose(), XAI_REALTIME_STT_CLOSE_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
isConnected(): boolean {
|
||||
return this.connected;
|
||||
}
|
||||
|
||||
private async doConnect(): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const url = toXaiRealtimeWsUrl(this.config);
|
||||
const debugProxy = resolveDebugProxySettings();
|
||||
const proxyAgent = createDebugProxyWebSocketAgent(debugProxy);
|
||||
let settled = false;
|
||||
let opened = false;
|
||||
const finishConnect = () => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(connectTimeout);
|
||||
this.ready = true;
|
||||
this.flushQueuedAudio();
|
||||
resolve();
|
||||
};
|
||||
const failConnect = (error: Error) => {
|
||||
if (settled) {
|
||||
return;
|
||||
}
|
||||
settled = true;
|
||||
clearTimeout(connectTimeout);
|
||||
this.config.onError?.(error);
|
||||
this.closed = true;
|
||||
this.forceClose();
|
||||
reject(error);
|
||||
};
|
||||
this.ready = false;
|
||||
this.ws = new WebSocket(url, {
|
||||
headers: {
|
||||
Authorization: `Bearer ${this.config.apiKey}`,
|
||||
},
|
||||
...(proxyAgent ? { agent: proxyAgent } : {}),
|
||||
});
|
||||
|
||||
const connectTimeout = setTimeout(() => {
|
||||
failConnect(new Error("xAI realtime transcription connection timeout"));
|
||||
}, XAI_REALTIME_STT_CONNECT_TIMEOUT_MS);
|
||||
|
||||
this.ws.on("open", () => {
|
||||
opened = true;
|
||||
this.connected = true;
|
||||
this.reconnectAttempts = 0;
|
||||
captureWsEvent({
|
||||
url,
|
||||
direction: "local",
|
||||
kind: "ws-open",
|
||||
flowId: this.flowId,
|
||||
meta: { provider: "xai", capability: "realtime-transcription" },
|
||||
});
|
||||
});
|
||||
|
||||
this.ws.on("message", (data: Buffer) => {
|
||||
captureWsEvent({
|
||||
url,
|
||||
direction: "inbound",
|
||||
kind: "ws-frame",
|
||||
flowId: this.flowId,
|
||||
payload: data,
|
||||
meta: { provider: "xai", capability: "realtime-transcription" },
|
||||
});
|
||||
try {
|
||||
const event = JSON.parse(data.toString()) as XaiRealtimeTranscriptionEvent;
|
||||
if (event.type === "transcript.created") {
|
||||
finishConnect();
|
||||
return;
|
||||
}
|
||||
if (!this.ready && event.type === "error") {
|
||||
failConnect(new Error(readErrorDetail(event.error ?? event.message)));
|
||||
return;
|
||||
}
|
||||
this.handleEvent(event);
|
||||
} catch (error) {
|
||||
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
|
||||
}
|
||||
});
|
||||
|
||||
this.ws.on("error", (error) => {
|
||||
captureWsEvent({
|
||||
url,
|
||||
direction: "local",
|
||||
kind: "error",
|
||||
flowId: this.flowId,
|
||||
errorText: error instanceof Error ? error.message : String(error),
|
||||
meta: { provider: "xai", capability: "realtime-transcription" },
|
||||
});
|
||||
if (!this.ready) {
|
||||
failConnect(error instanceof Error ? error : new Error(String(error)));
|
||||
return;
|
||||
}
|
||||
this.config.onError?.(error instanceof Error ? error : new Error(String(error)));
|
||||
});
|
||||
|
||||
this.ws.on("close", () => {
|
||||
clearTimeout(connectTimeout);
|
||||
this.connected = false;
|
||||
this.ready = false;
|
||||
if (this.closeTimer) {
|
||||
clearTimeout(this.closeTimer);
|
||||
this.closeTimer = undefined;
|
||||
}
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
if (!opened || !settled) {
|
||||
return;
|
||||
}
|
||||
void this.attemptReconnect();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private async attemptReconnect(): Promise<void> {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
if (this.reconnecting) {
|
||||
return;
|
||||
}
|
||||
if (this.reconnectAttempts >= XAI_REALTIME_STT_MAX_RECONNECT_ATTEMPTS) {
|
||||
this.config.onError?.(new Error("xAI realtime transcription reconnect limit reached"));
|
||||
return;
|
||||
}
|
||||
this.reconnectAttempts += 1;
|
||||
const delay = XAI_REALTIME_STT_RECONNECT_DELAY_MS * 2 ** (this.reconnectAttempts - 1);
|
||||
this.reconnecting = true;
|
||||
try {
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
await this.doConnect();
|
||||
} catch {
|
||||
if (!this.closed) {
|
||||
this.reconnecting = false;
|
||||
await this.attemptReconnect();
|
||||
return;
|
||||
}
|
||||
} finally {
|
||||
this.reconnecting = false;
|
||||
}
|
||||
}
|
||||
|
||||
private handleEvent(event: XaiRealtimeTranscriptionEvent): void {
|
||||
switch (event.type) {
|
||||
case "transcript.partial": {
|
||||
const text = readTranscriptText(event);
|
||||
if (!text) {
|
||||
return;
|
||||
}
|
||||
if (!this.speechStarted) {
|
||||
this.speechStarted = true;
|
||||
this.config.onSpeechStart?.();
|
||||
}
|
||||
if (event.is_final && event.speech_final) {
|
||||
this.emitTranscript(text);
|
||||
this.speechStarted = false;
|
||||
return;
|
||||
}
|
||||
this.config.onPartial?.(text);
|
||||
return;
|
||||
}
|
||||
case "transcript.done": {
|
||||
const text = readTranscriptText(event);
|
||||
if (text) {
|
||||
this.emitTranscript(text);
|
||||
}
|
||||
this.forceClose();
|
||||
return;
|
||||
}
|
||||
case "error":
|
||||
this.config.onError?.(new Error(readErrorDetail(event.error ?? event.message)));
|
||||
return;
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private emitTranscript(text: string): void {
|
||||
if (text === this.lastTranscript) {
|
||||
return;
|
||||
}
|
||||
this.lastTranscript = text;
|
||||
this.config.onTranscript?.(text);
|
||||
}
|
||||
|
||||
private queueAudio(audio: Buffer): void {
|
||||
if (audio.byteLength === 0) {
|
||||
return;
|
||||
}
|
||||
this.queuedAudio.push(Buffer.from(audio));
|
||||
this.queuedBytes += audio.byteLength;
|
||||
while (this.queuedBytes > XAI_REALTIME_STT_MAX_QUEUED_BYTES && this.queuedAudio.length > 0) {
|
||||
const dropped = this.queuedAudio.shift();
|
||||
this.queuedBytes -= dropped?.byteLength ?? 0;
|
||||
}
|
||||
}
|
||||
|
||||
private flushQueuedAudio(): void {
|
||||
for (const audio of this.queuedAudio) {
|
||||
this.sendAudioFrame(audio);
|
||||
}
|
||||
this.queuedAudio = [];
|
||||
this.queuedBytes = 0;
|
||||
}
|
||||
|
||||
private sendAudioFrame(audio: Buffer): void {
|
||||
if (this.ws?.readyState !== WebSocket.OPEN) {
|
||||
this.queueAudio(audio);
|
||||
return;
|
||||
}
|
||||
captureWsEvent({
|
||||
url: toXaiRealtimeWsUrl(this.config),
|
||||
direction: "outbound",
|
||||
kind: "ws-frame",
|
||||
flowId: this.flowId,
|
||||
payload: audio,
|
||||
meta: { provider: "xai", capability: "realtime-transcription" },
|
||||
});
|
||||
this.ws.send(audio);
|
||||
}
|
||||
|
||||
private sendEvent(event: unknown): void {
|
||||
if (this.ws?.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
const payload = JSON.stringify(event);
|
||||
captureWsEvent({
|
||||
url: toXaiRealtimeWsUrl(this.config),
|
||||
direction: "outbound",
|
||||
kind: "ws-frame",
|
||||
flowId: this.flowId,
|
||||
payload,
|
||||
meta: { provider: "xai", capability: "realtime-transcription" },
|
||||
});
|
||||
this.ws.send(payload);
|
||||
}
|
||||
|
||||
private forceClose(): void {
|
||||
if (this.closeTimer) {
|
||||
clearTimeout(this.closeTimer);
|
||||
this.closeTimer = undefined;
|
||||
}
|
||||
this.connected = false;
|
||||
this.ready = false;
|
||||
if (this.ws) {
|
||||
this.ws.close(1000, "Transcription session closed");
|
||||
this.ws = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function buildXaiRealtimeTranscriptionProvider(): RealtimeTranscriptionProviderPlugin {
|
||||
return {
|
||||
id: "xai",
|
||||
label: "xAI Realtime Transcription",
|
||||
aliases: ["xai-realtime", "grok-stt-streaming"],
|
||||
autoSelectOrder: 25,
|
||||
resolveConfig: ({ rawConfig }) => normalizeProviderConfig(rawConfig),
|
||||
isConfigured: ({ providerConfig }) =>
|
||||
Boolean(normalizeProviderConfig(providerConfig).apiKey || process.env.XAI_API_KEY),
|
||||
createSession: (req) => {
|
||||
const config = normalizeProviderConfig(req.providerConfig);
|
||||
const apiKey = config.apiKey || process.env.XAI_API_KEY;
|
||||
if (!apiKey) {
|
||||
throw new Error("xAI API key missing");
|
||||
}
|
||||
return new XaiRealtimeTranscriptionSession({
|
||||
...req,
|
||||
apiKey,
|
||||
baseUrl: normalizeXaiRealtimeBaseUrl(config.baseUrl),
|
||||
sampleRate: config.sampleRate ?? XAI_REALTIME_STT_DEFAULT_SAMPLE_RATE,
|
||||
encoding: config.encoding ?? XAI_REALTIME_STT_DEFAULT_ENCODING,
|
||||
interimResults: config.interimResults ?? true,
|
||||
endpointingMs: config.endpointingMs ?? XAI_REALTIME_STT_DEFAULT_ENDPOINTING_MS,
|
||||
language: config.language,
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -66,6 +66,21 @@ const registerXaiPlugin = () =>
|
||||
name: "xAI Provider",
|
||||
});
|
||||
|
||||
async function waitForLiveExpectation(expectation: () => void, timeoutMs = 30_000) {
|
||||
const started = Date.now();
|
||||
let lastError: unknown;
|
||||
while (Date.now() - started < timeoutMs) {
|
||||
try {
|
||||
expectation();
|
||||
return;
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
}
|
||||
}
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
describeLive("xai plugin live", () => {
|
||||
it("synthesizes TTS through the registered speech provider", async () => {
|
||||
const { speechProviders } = await registerXaiPlugin();
|
||||
@@ -102,8 +117,11 @@ describeLive("xai plugin live", () => {
|
||||
},
|
||||
timeoutMs: 90_000,
|
||||
});
|
||||
expect(telephony?.outputFormat).toBe("pcm");
|
||||
expect(telephony?.sampleRate).toBe(24_000);
|
||||
if (!telephony) {
|
||||
throw new Error("xAI telephony synthesis did not return audio");
|
||||
}
|
||||
expect(telephony.outputFormat).toBe("pcm");
|
||||
expect(telephony.sampleRate).toBe(24_000);
|
||||
expect(telephony?.audioBuffer.byteLength).toBeGreaterThan(512);
|
||||
}, 120_000);
|
||||
|
||||
@@ -144,6 +162,67 @@ describeLive("xai plugin live", () => {
|
||||
expect(normalized).toContain("integration");
|
||||
}, 180_000);
|
||||
|
||||
  // Live round-trip: synthesize a known phrase with xAI TTS, stream the PCM
  // back through the realtime STT provider, and assert the transcript.
  it("streams realtime STT through the registered transcription provider", async () => {
    const { realtimeTranscriptionProviders, speechProviders } = await registerXaiPlugin();
    const realtimeProvider = requireRegisteredProvider(realtimeTranscriptionProviders, "xai");
    const speechProvider = requireRegisteredProvider(speechProviders, "xai");
    const cfg = createLiveConfig();
    const phrase = "OpenClaw xAI realtime transcription integration test OK.";

    // Produce real speech audio for the STT stream to consume.
    const telephony = await speechProvider.synthesizeTelephony?.({
      text: phrase,
      cfg,
      providerConfig: {
        apiKey: XAI_API_KEY,
        baseUrl: "https://api.x.ai/v1",
        voiceId: "eve",
      },
      timeoutMs: 90_000,
    });
    if (!telephony) {
      throw new Error("xAI telephony synthesis did not return audio");
    }
    expect(telephony.outputFormat).toBe("pcm");
    expect(telephony.sampleRate).toBe(24_000);

    const transcripts: string[] = [];
    const partials: string[] = [];
    const errors: Error[] = [];
    const session = realtimeProvider.createSession({
      providerConfig: {
        apiKey: XAI_API_KEY,
        baseUrl: "https://api.x.ai/v1",
        sampleRate: telephony.sampleRate,
        encoding: "pcm",
        interimResults: true,
        endpointingMs: 500,
        language: "en",
      },
      onPartial: (partial) => partials.push(partial),
      onTranscript: (transcript) => transcripts.push(transcript),
      onError: (error) => errors.push(error),
    });

    await session.connect();
    const audio = telephony.audioBuffer;
    // ~100ms per frame assuming 2 bytes per PCM sample, paced at 20ms
    // intervals to approximate a realtime feed.
    const chunkSize = Math.max(1, Math.floor(telephony.sampleRate * 2 * 0.1));
    for (let offset = 0; offset < audio.byteLength; offset += chunkSize) {
      session.sendAudio(audio.subarray(offset, offset + chunkSize));
      await new Promise((resolve) => setTimeout(resolve, 20));
    }
    session.close();

    // Surface any streaming error first so the test fails with the real cause.
    await waitForLiveExpectation(() => {
      if (errors[0]) {
        throw errors[0];
      }
      expect(transcripts.join(" ").toLowerCase()).toContain("openclaw");
    }, 60_000);
    const normalized = transcripts.join(" ").toLowerCase();
    expect(normalized).toContain("transcription");
    expect(partials.length + transcripts.length).toBeGreaterThan(0);
  }, 180_000);
|
||||
|
||||
it("generates and edits images through the registered image provider", async () => {
|
||||
const { imageProviders } = await registerXaiPlugin();
|
||||
const imageProvider = requireRegisteredProvider(imageProviders, "xai");
|
||||
|
||||
Reference in New Issue
Block a user