mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-23 23:22:32 +00:00
633 lines
19 KiB
TypeScript
633 lines
19 KiB
TypeScript
/**
|
|
* Media Stream Handler
|
|
*
|
|
* Handles bidirectional audio streaming between Twilio and the AI services.
|
|
* - Receives mu-law audio from Twilio via WebSocket
|
|
* - Forwards to the selected realtime transcription provider
|
|
* - Sends TTS audio back to Twilio
|
|
*/
|
|
|
|
import type { IncomingMessage } from "node:http";
|
|
import type { Duplex } from "node:stream";
|
|
import type {
|
|
RealtimeTranscriptionProviderConfig,
|
|
RealtimeTranscriptionProviderPlugin,
|
|
RealtimeTranscriptionSession,
|
|
} from "openclaw/plugin-sdk/realtime-transcription";
|
|
import { type RawData, WebSocket, WebSocketServer } from "ws";
|
|
|
|
/**
|
|
* Configuration for the media stream handler.
|
|
*/
|
|
export interface MediaStreamConfig {
|
|
/** Realtime transcription provider for streaming STT. */
|
|
transcriptionProvider: RealtimeTranscriptionProviderPlugin;
|
|
/** Provider-owned config blob passed into the transcription session. */
|
|
providerConfig: RealtimeTranscriptionProviderConfig;
|
|
/** Close sockets that never send a valid `start` frame within this window. */
|
|
preStartTimeoutMs?: number;
|
|
/** Max concurrent pre-start sockets. */
|
|
maxPendingConnections?: number;
|
|
/** Max concurrent pre-start sockets from a single source IP. */
|
|
maxPendingConnectionsPerIp?: number;
|
|
/** Max total open sockets (pending + active sessions). */
|
|
maxConnections?: number;
|
|
/** Validate whether to accept a media stream for the given call ID */
|
|
shouldAcceptStream?: (params: { callId: string; streamSid: string; token?: string }) => boolean;
|
|
/** Callback when transcript is received */
|
|
onTranscript?: (callId: string, transcript: string) => void;
|
|
/** Callback for partial transcripts (streaming UI) */
|
|
onPartialTranscript?: (callId: string, partial: string) => void;
|
|
/** Callback when stream connects */
|
|
onConnect?: (callId: string, streamSid: string) => void;
|
|
/** Callback when speech starts (barge-in) */
|
|
onSpeechStart?: (callId: string) => void;
|
|
/** Callback when stream disconnects */
|
|
onDisconnect?: (callId: string, streamSid: string) => void;
|
|
}
|
|
|
|
/**
|
|
* Active media stream session.
|
|
*/
|
|
interface StreamSession {
|
|
callId: string;
|
|
streamSid: string;
|
|
ws: WebSocket;
|
|
sttSession: RealtimeTranscriptionSession;
|
|
}
|
|
|
|
type TtsQueueEntry = {
|
|
playFn: (signal: AbortSignal) => Promise<void>;
|
|
controller: AbortController;
|
|
resolve: () => void;
|
|
reject: (error: unknown) => void;
|
|
};
|
|
|
|
type StreamSendResult = {
|
|
sent: boolean;
|
|
readyState?: number;
|
|
bufferedBeforeBytes: number;
|
|
bufferedAfterBytes: number;
|
|
};
|
|
|
|
type PendingConnection = {
|
|
ip: string;
|
|
timeout: ReturnType<typeof setTimeout>;
|
|
};
|
|
|
|
const DEFAULT_PRE_START_TIMEOUT_MS = 5000;
|
|
const DEFAULT_MAX_PENDING_CONNECTIONS = 32;
|
|
const DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP = 4;
|
|
const DEFAULT_MAX_CONNECTIONS = 128;
|
|
const MAX_INBOUND_MESSAGE_BYTES = 64 * 1024;
|
|
const MAX_WS_BUFFERED_BYTES = 1024 * 1024;
|
|
const CLOSE_REASON_LOG_MAX_CHARS = 120;
|
|
|
|
export function sanitizeLogText(value: string, maxChars: number): string {
|
|
const sanitized = value
|
|
.replace(/\p{Cc}/gu, " ")
|
|
.replace(/\s+/g, " ")
|
|
.trim();
|
|
if (sanitized.length <= maxChars) {
|
|
return sanitized;
|
|
}
|
|
return `${sanitized.slice(0, maxChars)}...`;
|
|
}
|
|
|
|
function normalizeWsMessageData(data: RawData): Buffer {
|
|
if (Buffer.isBuffer(data)) {
|
|
return data;
|
|
}
|
|
if (Array.isArray(data)) {
|
|
return Buffer.concat(data);
|
|
}
|
|
return Buffer.from(data);
|
|
}
|
|
|
|
/**
|
|
* Manages WebSocket connections for Twilio media streams.
|
|
*/
|
|
export class MediaStreamHandler {
|
|
private wss: WebSocketServer | null = null;
|
|
private sessions = new Map<string, StreamSession>();
|
|
private config: MediaStreamConfig;
|
|
/** Pending sockets that have upgraded but not yet sent an accepted `start` frame. */
|
|
private pendingConnections = new Map<WebSocket, PendingConnection>();
|
|
/** Pending socket count per remote IP for pre-auth throttling. */
|
|
private pendingByIp = new Map<string, number>();
|
|
private preStartTimeoutMs: number;
|
|
private maxPendingConnections: number;
|
|
private maxPendingConnectionsPerIp: number;
|
|
private maxConnections: number;
|
|
/** TTS playback queues per stream (serialize audio to prevent overlap) */
|
|
private ttsQueues = new Map<string, TtsQueueEntry[]>();
|
|
/** Whether TTS is currently playing per stream */
|
|
private ttsPlaying = new Map<string, boolean>();
|
|
/** Active TTS playback controllers per stream */
|
|
private ttsActiveControllers = new Map<string, AbortController>();
|
|
|
|
constructor(config: MediaStreamConfig) {
|
|
this.config = config;
|
|
this.preStartTimeoutMs = config.preStartTimeoutMs ?? DEFAULT_PRE_START_TIMEOUT_MS;
|
|
this.maxPendingConnections = config.maxPendingConnections ?? DEFAULT_MAX_PENDING_CONNECTIONS;
|
|
this.maxPendingConnectionsPerIp =
|
|
config.maxPendingConnectionsPerIp ?? DEFAULT_MAX_PENDING_CONNECTIONS_PER_IP;
|
|
this.maxConnections = config.maxConnections ?? DEFAULT_MAX_CONNECTIONS;
|
|
}
|
|
|
|
/**
|
|
* Handle WebSocket upgrade for media stream connections.
|
|
*/
|
|
handleUpgrade(request: IncomingMessage, socket: Duplex, head: Buffer): void {
|
|
if (!this.wss) {
|
|
this.wss = new WebSocketServer({
|
|
noServer: true,
|
|
// Reject oversized frames before app-level parsing runs on unauthenticated sockets.
|
|
maxPayload: MAX_INBOUND_MESSAGE_BYTES,
|
|
});
|
|
this.wss.on("connection", (ws, req) => this.handleConnection(ws, req));
|
|
}
|
|
|
|
const currentConnections = this.wss.clients.size;
|
|
if (currentConnections >= this.maxConnections) {
|
|
this.rejectUpgrade(socket, 503, "Too many media stream connections");
|
|
return;
|
|
}
|
|
|
|
this.wss.handleUpgrade(request, socket, head, (ws) => {
|
|
this.wss?.emit("connection", ws, request);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handle new WebSocket connection from Twilio.
|
|
*/
|
|
private async handleConnection(ws: WebSocket, _request: IncomingMessage): Promise<void> {
|
|
let session: StreamSession | null = null;
|
|
const streamToken = this.getStreamToken(_request);
|
|
const ip = this.getClientIp(_request);
|
|
|
|
if (!this.registerPendingConnection(ws, ip)) {
|
|
ws.close(1013, "Too many pending media stream connections");
|
|
return;
|
|
}
|
|
|
|
ws.on("message", async (data: RawData) => {
|
|
try {
|
|
const raw = normalizeWsMessageData(data);
|
|
const message = JSON.parse(raw.toString("utf8")) as TwilioMediaMessage;
|
|
|
|
switch (message.event) {
|
|
case "connected":
|
|
console.log("[MediaStream] Twilio connected");
|
|
break;
|
|
|
|
case "start":
|
|
session = await this.handleStart(ws, message, streamToken);
|
|
if (session) {
|
|
this.clearPendingConnection(ws);
|
|
}
|
|
break;
|
|
|
|
case "media":
|
|
if (session && message.media?.payload) {
|
|
// Forward audio to STT
|
|
const audioBuffer = Buffer.from(message.media.payload, "base64");
|
|
session.sttSession.sendAudio(audioBuffer);
|
|
}
|
|
break;
|
|
|
|
case "stop":
|
|
if (session) {
|
|
this.handleStop(session);
|
|
session = null;
|
|
}
|
|
break;
|
|
}
|
|
} catch (error) {
|
|
console.error("[MediaStream] Error processing message:", error);
|
|
}
|
|
});
|
|
|
|
ws.on("close", (code, reason) => {
|
|
const rawReason = Buffer.isBuffer(reason) ? reason.toString("utf8") : String(reason || "");
|
|
const reasonText = sanitizeLogText(rawReason, CLOSE_REASON_LOG_MAX_CHARS);
|
|
console.log(
|
|
`[MediaStream] WebSocket closed (code: ${code}, reason: ${reasonText || "none"})`,
|
|
);
|
|
this.clearPendingConnection(ws);
|
|
if (session) {
|
|
this.handleStop(session);
|
|
}
|
|
});
|
|
|
|
ws.on("error", (error) => {
|
|
console.error("[MediaStream] WebSocket error:", error);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handle stream start event.
|
|
*/
|
|
private async handleStart(
|
|
ws: WebSocket,
|
|
message: TwilioMediaMessage,
|
|
streamToken?: string,
|
|
): Promise<StreamSession | null> {
|
|
const streamSid = message.streamSid || "";
|
|
const callSid = message.start?.callSid || "";
|
|
|
|
// Prefer token from start message customParameters (set via TwiML <Parameter>),
|
|
// falling back to query string token. Twilio strips query params from WebSocket
|
|
// URLs but reliably delivers <Parameter> values in customParameters.
|
|
const effectiveToken = message.start?.customParameters?.token ?? streamToken;
|
|
|
|
console.log(`[MediaStream] Stream started: ${streamSid} (call: ${callSid})`);
|
|
if (!callSid) {
|
|
console.warn("[MediaStream] Missing callSid; closing stream");
|
|
ws.close(1008, "Missing callSid");
|
|
return null;
|
|
}
|
|
if (
|
|
this.config.shouldAcceptStream &&
|
|
!this.config.shouldAcceptStream({ callId: callSid, streamSid, token: effectiveToken })
|
|
) {
|
|
console.warn(`[MediaStream] Rejecting stream for unknown call: ${callSid}`);
|
|
ws.close(1008, "Unknown call");
|
|
return null;
|
|
}
|
|
|
|
const sttSession = this.config.transcriptionProvider.createSession({
|
|
providerConfig: this.config.providerConfig,
|
|
onPartial: (partial) => {
|
|
this.config.onPartialTranscript?.(callSid, partial);
|
|
},
|
|
onTranscript: (transcript) => {
|
|
this.config.onTranscript?.(callSid, transcript);
|
|
},
|
|
onSpeechStart: () => {
|
|
this.config.onSpeechStart?.(callSid);
|
|
},
|
|
onError: (error) => {
|
|
console.warn("[MediaStream] Transcription session error:", error.message);
|
|
},
|
|
});
|
|
|
|
const session: StreamSession = {
|
|
callId: callSid,
|
|
streamSid,
|
|
ws,
|
|
sttSession,
|
|
};
|
|
|
|
this.sessions.set(streamSid, session);
|
|
|
|
// Notify connection BEFORE STT connect so TTS can work even if STT fails
|
|
this.config.onConnect?.(callSid, streamSid);
|
|
|
|
// Connect to transcription service (non-blocking, log errors but don't fail the call)
|
|
sttSession.connect().catch((err) => {
|
|
console.warn(`[MediaStream] STT connection failed (TTS still works):`, err.message);
|
|
});
|
|
|
|
return session;
|
|
}
|
|
|
|
/**
|
|
* Handle stream stop event.
|
|
*/
|
|
private handleStop(session: StreamSession): void {
|
|
console.log(`[MediaStream] Stream stopped: ${session.streamSid}`);
|
|
|
|
this.clearTtsState(session.streamSid);
|
|
session.sttSession.close();
|
|
this.sessions.delete(session.streamSid);
|
|
this.config.onDisconnect?.(session.callId, session.streamSid);
|
|
}
|
|
|
|
private getStreamToken(request: IncomingMessage): string | undefined {
|
|
if (!request.url || !request.headers.host) {
|
|
return undefined;
|
|
}
|
|
try {
|
|
const url = new URL(request.url, `http://${request.headers.host}`);
|
|
return url.searchParams.get("token") ?? undefined;
|
|
} catch {
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
private getClientIp(request: IncomingMessage): string {
|
|
return request.socket.remoteAddress || "unknown";
|
|
}
|
|
|
|
private registerPendingConnection(ws: WebSocket, ip: string): boolean {
|
|
if (this.pendingConnections.size >= this.maxPendingConnections) {
|
|
console.warn("[MediaStream] Rejecting connection: pending connection limit reached");
|
|
return false;
|
|
}
|
|
|
|
const pendingForIp = this.pendingByIp.get(ip) ?? 0;
|
|
if (pendingForIp >= this.maxPendingConnectionsPerIp) {
|
|
console.warn(`[MediaStream] Rejecting connection: pending per-IP limit reached (${ip})`);
|
|
return false;
|
|
}
|
|
|
|
const timeout = setTimeout(() => {
|
|
if (!this.pendingConnections.has(ws)) {
|
|
return;
|
|
}
|
|
console.warn(
|
|
`[MediaStream] Closing pre-start idle connection after ${this.preStartTimeoutMs}ms (${ip})`,
|
|
);
|
|
ws.close(1008, "Start timeout");
|
|
}, this.preStartTimeoutMs);
|
|
|
|
timeout.unref?.();
|
|
this.pendingConnections.set(ws, { ip, timeout });
|
|
this.pendingByIp.set(ip, pendingForIp + 1);
|
|
return true;
|
|
}
|
|
|
|
private clearPendingConnection(ws: WebSocket): void {
|
|
const pending = this.pendingConnections.get(ws);
|
|
if (!pending) {
|
|
return;
|
|
}
|
|
|
|
clearTimeout(pending.timeout);
|
|
this.pendingConnections.delete(ws);
|
|
|
|
const current = this.pendingByIp.get(pending.ip) ?? 0;
|
|
if (current <= 1) {
|
|
this.pendingByIp.delete(pending.ip);
|
|
return;
|
|
}
|
|
this.pendingByIp.set(pending.ip, current - 1);
|
|
}
|
|
|
|
private rejectUpgrade(socket: Duplex, statusCode: 429 | 503, message: string): void {
|
|
const statusText = statusCode === 429 ? "Too Many Requests" : "Service Unavailable";
|
|
const body = `${message}\n`;
|
|
socket.write(
|
|
`HTTP/1.1 ${statusCode} ${statusText}\r\n` +
|
|
"Connection: close\r\n" +
|
|
"Content-Type: text/plain; charset=utf-8\r\n" +
|
|
`Content-Length: ${Buffer.byteLength(body)}\r\n` +
|
|
"\r\n" +
|
|
body,
|
|
);
|
|
socket.destroy();
|
|
}
|
|
|
|
/**
|
|
* Get an active session with an open WebSocket, or undefined if unavailable.
|
|
*/
|
|
private getOpenSession(streamSid: string): StreamSession | undefined {
|
|
const session = this.sessions.get(streamSid);
|
|
return session?.ws.readyState === WebSocket.OPEN ? session : undefined;
|
|
}
|
|
|
|
/**
|
|
* Send a message to a stream's WebSocket if available.
|
|
*/
|
|
private sendToStream(streamSid: string, message: unknown): StreamSendResult {
|
|
const session = this.sessions.get(streamSid);
|
|
if (!session) {
|
|
return {
|
|
sent: false,
|
|
bufferedBeforeBytes: 0,
|
|
bufferedAfterBytes: 0,
|
|
};
|
|
}
|
|
|
|
const readyState = session.ws.readyState;
|
|
const bufferedBeforeBytes = session.ws.bufferedAmount;
|
|
if (readyState !== WebSocket.OPEN) {
|
|
return {
|
|
sent: false,
|
|
readyState,
|
|
bufferedBeforeBytes,
|
|
bufferedAfterBytes: session.ws.bufferedAmount,
|
|
};
|
|
}
|
|
if (bufferedBeforeBytes > MAX_WS_BUFFERED_BYTES) {
|
|
try {
|
|
session.ws.close(1013, "Backpressure: send buffer exceeded");
|
|
} catch {
|
|
// Best-effort close; caller still receives sent:false.
|
|
}
|
|
return {
|
|
sent: false,
|
|
readyState,
|
|
bufferedBeforeBytes,
|
|
bufferedAfterBytes: session.ws.bufferedAmount,
|
|
};
|
|
}
|
|
|
|
try {
|
|
session.ws.send(JSON.stringify(message));
|
|
const bufferedAfterBytes = session.ws.bufferedAmount;
|
|
if (bufferedAfterBytes > MAX_WS_BUFFERED_BYTES) {
|
|
try {
|
|
session.ws.close(1013, "Backpressure: send buffer exceeded");
|
|
} catch {
|
|
// Best-effort close; caller still receives sent:false.
|
|
}
|
|
return {
|
|
sent: false,
|
|
readyState,
|
|
bufferedBeforeBytes,
|
|
bufferedAfterBytes,
|
|
};
|
|
}
|
|
return {
|
|
sent: true,
|
|
readyState,
|
|
bufferedBeforeBytes,
|
|
bufferedAfterBytes,
|
|
};
|
|
} catch {
|
|
return {
|
|
sent: false,
|
|
readyState,
|
|
bufferedBeforeBytes,
|
|
bufferedAfterBytes: session.ws.bufferedAmount,
|
|
};
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Send audio to a specific stream (for TTS playback).
|
|
* Audio should be mu-law encoded at 8kHz mono.
|
|
*/
|
|
sendAudio(streamSid: string, muLawAudio: Buffer): StreamSendResult {
|
|
return this.sendToStream(streamSid, {
|
|
event: "media",
|
|
streamSid,
|
|
media: { payload: muLawAudio.toString("base64") },
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Send a mark event to track audio playback position.
|
|
*/
|
|
sendMark(streamSid: string, name: string): StreamSendResult {
|
|
return this.sendToStream(streamSid, {
|
|
event: "mark",
|
|
streamSid,
|
|
mark: { name },
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Clear audio buffer (interrupt playback).
|
|
*/
|
|
clearAudio(streamSid: string): StreamSendResult {
|
|
return this.sendToStream(streamSid, { event: "clear", streamSid });
|
|
}
|
|
|
|
/**
|
|
* Queue a TTS operation for sequential playback.
|
|
* Only one TTS operation plays at a time per stream to prevent overlap.
|
|
*/
|
|
async queueTts(streamSid: string, playFn: (signal: AbortSignal) => Promise<void>): Promise<void> {
|
|
const queue = this.getTtsQueue(streamSid);
|
|
let resolveEntry: () => void;
|
|
let rejectEntry: (error: unknown) => void;
|
|
const promise = new Promise<void>((resolve, reject) => {
|
|
resolveEntry = resolve;
|
|
rejectEntry = reject;
|
|
});
|
|
|
|
queue.push({
|
|
playFn,
|
|
controller: new AbortController(),
|
|
resolve: resolveEntry!,
|
|
reject: rejectEntry!,
|
|
});
|
|
|
|
if (!this.ttsPlaying.get(streamSid)) {
|
|
void this.processQueue(streamSid);
|
|
}
|
|
|
|
return promise;
|
|
}
|
|
|
|
/**
|
|
* Clear TTS queue and interrupt current playback (barge-in).
|
|
*/
|
|
clearTtsQueue(streamSid: string, _reason = "unspecified"): void {
|
|
const queue = this.getTtsQueue(streamSid);
|
|
queue.length = 0;
|
|
this.ttsActiveControllers.get(streamSid)?.abort();
|
|
this.clearAudio(streamSid);
|
|
}
|
|
|
|
/**
|
|
* Get active session by call ID.
|
|
*/
|
|
getSessionByCallId(callId: string): StreamSession | undefined {
|
|
return [...this.sessions.values()].find((session) => session.callId === callId);
|
|
}
|
|
|
|
/**
|
|
* Close all sessions.
|
|
*/
|
|
closeAll(): void {
|
|
for (const session of this.sessions.values()) {
|
|
this.clearTtsState(session.streamSid);
|
|
session.sttSession.close();
|
|
session.ws.close();
|
|
}
|
|
this.sessions.clear();
|
|
}
|
|
|
|
private getTtsQueue(streamSid: string): TtsQueueEntry[] {
|
|
const existing = this.ttsQueues.get(streamSid);
|
|
if (existing) {
|
|
return existing;
|
|
}
|
|
const queue: TtsQueueEntry[] = [];
|
|
this.ttsQueues.set(streamSid, queue);
|
|
return queue;
|
|
}
|
|
|
|
/**
|
|
* Process the TTS queue for a stream.
|
|
* Uses iterative approach to avoid stack accumulation from recursion.
|
|
*/
|
|
private async processQueue(streamSid: string): Promise<void> {
|
|
this.ttsPlaying.set(streamSid, true);
|
|
|
|
while (true) {
|
|
const queue = this.ttsQueues.get(streamSid);
|
|
if (!queue || queue.length === 0) {
|
|
this.ttsPlaying.set(streamSid, false);
|
|
this.ttsActiveControllers.delete(streamSid);
|
|
return;
|
|
}
|
|
|
|
const entry = queue.shift()!;
|
|
this.ttsActiveControllers.set(streamSid, entry.controller);
|
|
|
|
try {
|
|
await entry.playFn(entry.controller.signal);
|
|
entry.resolve();
|
|
} catch (error) {
|
|
if (entry.controller.signal.aborted) {
|
|
entry.resolve();
|
|
} else {
|
|
console.error("[MediaStream] TTS playback error:", error);
|
|
entry.reject(error);
|
|
}
|
|
} finally {
|
|
if (this.ttsActiveControllers.get(streamSid) === entry.controller) {
|
|
this.ttsActiveControllers.delete(streamSid);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
private clearTtsState(streamSid: string): void {
|
|
const queue = this.ttsQueues.get(streamSid);
|
|
if (queue) {
|
|
queue.length = 0;
|
|
}
|
|
this.ttsActiveControllers.get(streamSid)?.abort();
|
|
this.ttsActiveControllers.delete(streamSid);
|
|
this.ttsPlaying.delete(streamSid);
|
|
this.ttsQueues.delete(streamSid);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Twilio Media Stream message format.
|
|
*/
|
|
interface TwilioMediaMessage {
|
|
event: "connected" | "start" | "media" | "stop" | "mark" | "clear";
|
|
sequenceNumber?: string;
|
|
streamSid?: string;
|
|
start?: {
|
|
streamSid: string;
|
|
accountSid: string;
|
|
callSid: string;
|
|
tracks: string[];
|
|
customParameters?: Record<string, string>;
|
|
mediaFormat: {
|
|
encoding: string;
|
|
sampleRate: number;
|
|
channels: number;
|
|
};
|
|
};
|
|
media?: {
|
|
track?: string;
|
|
chunk?: string;
|
|
timestamp?: string;
|
|
payload?: string;
|
|
};
|
|
mark?: {
|
|
name: string;
|
|
};
|
|
}
|