mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:00:43 +00:00
fix(voice-call): stabilize Twilio STT startup (#75257)
Fix Twilio voice-call startup so accepted media streams register immediately, realtime transcription readiness gates only the initial greeting, and early inbound media is preserved while STT connects. Fixes #75197. Thanks @PfanP and @donkeykong91.
This commit is contained in:
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Voice Call/Twilio: register accepted media streams immediately but wait for realtime transcription readiness before speaking the initial greeting, so reconnect grace handling stays live while OpenAI STT startup is no longer starved by TTS. Fixes #75197. (#75257) Thanks @donkeykong91 and @PfanP.
|
||||
- Agents/pi-embedded-runner: extract the `abortable` provider-call wrapper from `runEmbeddedAttempt` to module scope so its promise handlers no longer close over the run lexical context, releasing transcripts, tool buffers, and subscription callbacks when a provider call hangs past abort. (#74182) Thanks @cjboy007.
|
||||
- Docker: restore `python3` in the gateway runtime image after the slim-runtime switch. Fixes #75041.
|
||||
- CLI/Voice Call: scope `voicecall` command activation to the Voice Call plugin so setup and smoke checks no longer broad-load unrelated plugin runtimes or hang after printing JSON. Thanks @vincentkoc.
|
||||
|
||||
@@ -297,6 +297,7 @@ Current runtime behavior:
|
||||
- `streaming.provider` is optional. If unset, Voice Call uses the first registered realtime transcription provider.
|
||||
- Bundled realtime transcription providers: Deepgram (`deepgram`), ElevenLabs (`elevenlabs`), Mistral (`mistral`), OpenAI (`openai`), and xAI (`xai`), registered by their provider plugins.
|
||||
- Provider-owned raw config lives under `streaming.providers.<providerId>`.
|
||||
- After Twilio sends an accepted stream `start` message, Voice Call registers the stream immediately, queues inbound media through the transcription provider while the provider connects, and starts the initial greeting only after realtime transcription is ready.
|
||||
- If `streaming.provider` points at an unregistered provider, or none is registered, Voice Call logs a warning and skips media streaming instead of failing the whole plugin.
|
||||
|
||||
### Streaming provider examples
|
||||
|
||||
@@ -33,6 +33,20 @@ const flush = async (): Promise<void> => {
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
};
|
||||
|
||||
const createDeferred = (): {
|
||||
promise: Promise<void>;
|
||||
resolve: () => void;
|
||||
reject: (error: Error) => void;
|
||||
} => {
|
||||
let resolve!: () => void;
|
||||
let reject!: (error: Error) => void;
|
||||
const promise = new Promise<void>((resolvePromise, rejectPromise) => {
|
||||
resolve = resolvePromise;
|
||||
reject = rejectPromise;
|
||||
});
|
||||
return { promise, resolve, reject };
|
||||
};
|
||||
|
||||
const waitForAbort = (signal: AbortSignal): Promise<void> =>
|
||||
new Promise((resolve) => {
|
||||
if (signal.aborted) {
|
||||
@@ -502,6 +516,211 @@ describe("MediaStreamHandler security hardening", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("keeps accepted streams alive while STT readiness exceeds the pre-start timeout", async () => {
|
||||
const sttReady = createDeferred();
|
||||
const sttConnectStarted = createDeferred();
|
||||
const transcriptionReady = createDeferred();
|
||||
const events: string[] = [];
|
||||
|
||||
const session: RealtimeTranscriptionSession = {
|
||||
connect: async () => {
|
||||
events.push("stt-connect-start");
|
||||
sttConnectStarted.resolve();
|
||||
await sttReady.promise;
|
||||
events.push("stt-connect-ready");
|
||||
},
|
||||
sendAudio: () => {},
|
||||
close: () => {},
|
||||
isConnected: () => false,
|
||||
};
|
||||
|
||||
const handler = new MediaStreamHandler({
|
||||
transcriptionProvider: {
|
||||
createSession: () => session,
|
||||
id: "openai",
|
||||
label: "OpenAI",
|
||||
isConfigured: () => true,
|
||||
},
|
||||
providerConfig: {},
|
||||
preStartTimeoutMs: 40,
|
||||
shouldAcceptStream: () => true,
|
||||
onConnect: () => {
|
||||
events.push("onConnect");
|
||||
},
|
||||
onTranscriptionReady: () => {
|
||||
events.push("onTranscriptionReady");
|
||||
transcriptionReady.resolve();
|
||||
},
|
||||
});
|
||||
const server = await startWsServer(handler);
|
||||
|
||||
try {
|
||||
const ws = await connectWs(server.url);
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
event: "start",
|
||||
streamSid: "MZ-slow-stt",
|
||||
start: { callSid: "CA-slow-stt" },
|
||||
}),
|
||||
);
|
||||
|
||||
await withTimeout(sttConnectStarted.promise);
|
||||
await new Promise((resolve) => setTimeout(resolve, 80));
|
||||
expect(ws.readyState).toBe(WebSocket.OPEN);
|
||||
expect(events).toEqual(["onConnect", "stt-connect-start"]);
|
||||
|
||||
sttReady.resolve();
|
||||
await withTimeout(transcriptionReady.promise);
|
||||
expect(events).toEqual([
|
||||
"onConnect",
|
||||
"stt-connect-start",
|
||||
"stt-connect-ready",
|
||||
"onTranscriptionReady",
|
||||
]);
|
||||
|
||||
ws.close();
|
||||
await waitForClose(ws);
|
||||
} finally {
|
||||
await server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it("forwards early Twilio media into the STT session before readiness", async () => {
|
||||
const sttReady = createDeferred();
|
||||
const sttConnectStarted = createDeferred();
|
||||
const transcriptionReady = createDeferred();
|
||||
const receivedAudio: Buffer[] = [];
|
||||
let onConnectCalls = 0;
|
||||
let onTranscriptionReadyCalls = 0;
|
||||
|
||||
const session: RealtimeTranscriptionSession = {
|
||||
connect: async () => {
|
||||
sttConnectStarted.resolve();
|
||||
await sttReady.promise;
|
||||
},
|
||||
sendAudio: (audio) => {
|
||||
receivedAudio.push(Buffer.from(audio));
|
||||
},
|
||||
close: () => {},
|
||||
isConnected: () => false,
|
||||
};
|
||||
|
||||
const handler = new MediaStreamHandler({
|
||||
transcriptionProvider: {
|
||||
createSession: () => session,
|
||||
id: "openai",
|
||||
label: "OpenAI",
|
||||
isConfigured: () => true,
|
||||
},
|
||||
providerConfig: {},
|
||||
shouldAcceptStream: () => true,
|
||||
onConnect: () => {
|
||||
onConnectCalls += 1;
|
||||
},
|
||||
onTranscriptionReady: () => {
|
||||
onTranscriptionReadyCalls += 1;
|
||||
transcriptionReady.resolve();
|
||||
},
|
||||
});
|
||||
const server = await startWsServer(handler);
|
||||
|
||||
try {
|
||||
const ws = await connectWs(server.url);
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
event: "start",
|
||||
streamSid: "MZ-early-media",
|
||||
start: { callSid: "CA-early-media" },
|
||||
}),
|
||||
);
|
||||
|
||||
await withTimeout(sttConnectStarted.promise);
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
event: "media",
|
||||
streamSid: "MZ-early-media",
|
||||
media: { payload: Buffer.from("early").toString("base64") },
|
||||
}),
|
||||
);
|
||||
await flush();
|
||||
|
||||
expect(Buffer.concat(receivedAudio).toString()).toBe("early");
|
||||
expect(onConnectCalls).toBe(1);
|
||||
expect(onTranscriptionReadyCalls).toBe(0);
|
||||
|
||||
sttReady.resolve();
|
||||
await withTimeout(transcriptionReady.promise);
|
||||
expect(onConnectCalls).toBe(1);
|
||||
expect(onTranscriptionReadyCalls).toBe(1);
|
||||
|
||||
ws.close();
|
||||
await waitForClose(ws);
|
||||
} finally {
|
||||
await server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it("closes the media stream and disconnects once when STT readiness fails", async () => {
|
||||
const sttConnectStarted = createDeferred();
|
||||
const onDisconnectReady = createDeferred();
|
||||
const onConnect = vi.fn();
|
||||
const onTranscriptionReady = vi.fn();
|
||||
const onDisconnect = vi.fn(() => {
|
||||
onDisconnectReady.resolve();
|
||||
});
|
||||
|
||||
const session: RealtimeTranscriptionSession = {
|
||||
connect: async () => {
|
||||
sttConnectStarted.resolve();
|
||||
throw new Error("provider unavailable");
|
||||
},
|
||||
sendAudio: () => {},
|
||||
close: vi.fn(),
|
||||
isConnected: () => false,
|
||||
};
|
||||
|
||||
const handler = new MediaStreamHandler({
|
||||
transcriptionProvider: {
|
||||
createSession: () => session,
|
||||
id: "openai",
|
||||
label: "OpenAI",
|
||||
isConfigured: () => true,
|
||||
},
|
||||
providerConfig: {},
|
||||
shouldAcceptStream: () => true,
|
||||
onConnect,
|
||||
onTranscriptionReady,
|
||||
onDisconnect,
|
||||
});
|
||||
const server = await startWsServer(handler);
|
||||
|
||||
try {
|
||||
const ws = await connectWs(server.url);
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
event: "start",
|
||||
streamSid: "MZ-stt-fail",
|
||||
start: { callSid: "CA-stt-fail" },
|
||||
}),
|
||||
);
|
||||
|
||||
await withTimeout(sttConnectStarted.promise);
|
||||
const closed = await waitForClose(ws);
|
||||
await withTimeout(onDisconnectReady.promise);
|
||||
|
||||
expect(closed.code).toBe(1011);
|
||||
expect(closed.reason).toBe("STT connection failed");
|
||||
expect(onConnect).toHaveBeenCalledTimes(1);
|
||||
expect(onConnect).toHaveBeenCalledWith("CA-stt-fail", "MZ-stt-fail");
|
||||
expect(onTranscriptionReady).not.toHaveBeenCalled();
|
||||
expect(onDisconnect).toHaveBeenCalledTimes(1);
|
||||
expect(onDisconnect).toHaveBeenCalledWith("CA-stt-fail", "MZ-stt-fail");
|
||||
expect(session.close).toHaveBeenCalledTimes(1);
|
||||
} finally {
|
||||
await server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects oversized pre-start frames at the websocket maxPayload guard before validation runs", async () => {
|
||||
const shouldAcceptStreamCalls: Array<{ callId: string; streamSid: string; token?: string }> =
|
||||
[];
|
||||
|
||||
@@ -42,6 +42,8 @@ export interface MediaStreamConfig {
|
||||
onPartialTranscript?: (callId: string, partial: string) => void;
|
||||
/** Callback when stream connects */
|
||||
onConnect?: (callId: string, streamSid: string) => void;
|
||||
/** Callback when realtime transcription is ready for the stream */
|
||||
onTranscriptionReady?: (callId: string, streamSid: string) => void;
|
||||
/** Callback when speech starts (barge-in) */
|
||||
onSpeechStart?: (callId: string) => void;
|
||||
/** Callback when stream disconnects */
|
||||
@@ -213,7 +215,7 @@ export class MediaStreamHandler {
|
||||
break;
|
||||
|
||||
case "start":
|
||||
session = await this.handleStart(ws, message, streamToken);
|
||||
session = this.handleStart(ws, message, streamToken);
|
||||
if (session) {
|
||||
this.clearPendingConnection(ws);
|
||||
}
|
||||
@@ -263,11 +265,11 @@ export class MediaStreamHandler {
|
||||
/**
|
||||
* Handle stream start event.
|
||||
*/
|
||||
private async handleStart(
|
||||
private handleStart(
|
||||
ws: WebSocket,
|
||||
message: TwilioMediaMessage,
|
||||
streamToken?: string,
|
||||
): Promise<StreamSession | null> {
|
||||
): StreamSession | null {
|
||||
const streamSid = message.streamSid || "";
|
||||
const callSid = message.start?.callSid || "";
|
||||
|
||||
@@ -315,18 +317,42 @@ export class MediaStreamHandler {
|
||||
};
|
||||
|
||||
this.sessions.set(streamSid, session);
|
||||
|
||||
// Notify connection BEFORE STT connect so TTS can work even if STT fails
|
||||
this.config.onConnect?.(callSid, streamSid);
|
||||
|
||||
// Connect to transcription service (non-blocking, log errors but don't fail the call)
|
||||
sttSession.connect().catch((err) => {
|
||||
console.warn(`[MediaStream] STT connection failed (TTS still works):`, err.message);
|
||||
});
|
||||
void this.connectTranscriptionAndNotify(session);
|
||||
|
||||
return session;
|
||||
}
|
||||
|
||||
private async connectTranscriptionAndNotify(session: StreamSession): Promise<void> {
|
||||
try {
|
||||
await session.sttSession.connect();
|
||||
} catch (error) {
|
||||
console.warn(
|
||||
"[MediaStream] STT connection failed; closing media stream:",
|
||||
error instanceof Error ? error.message : String(error),
|
||||
);
|
||||
if (
|
||||
this.sessions.get(session.streamSid) === session &&
|
||||
session.ws.readyState === WebSocket.OPEN
|
||||
) {
|
||||
session.ws.close(1011, "STT connection failed");
|
||||
} else {
|
||||
session.sttSession.close();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
this.sessions.get(session.streamSid) !== session ||
|
||||
session.ws.readyState !== WebSocket.OPEN
|
||||
) {
|
||||
session.sttSession.close();
|
||||
return;
|
||||
}
|
||||
|
||||
this.config.onTranscriptionReady?.(session.callId, session.streamSid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle stream stop event.
|
||||
*/
|
||||
|
||||
@@ -1159,7 +1159,7 @@ describe("VoiceCallWebhookServer stream disconnect grace", () => {
|
||||
processEvent: vi.fn(),
|
||||
} as unknown as CallManager;
|
||||
|
||||
let currentStreamSid: string | null = "MZ-new";
|
||||
let currentStreamSid: string | null = "MZ-old";
|
||||
const twilioProvider = createTwilioStreamingProvider({
|
||||
registerCallStream: (_callSid: string, streamSid: string) => {
|
||||
currentStreamSid = streamSid;
|
||||
@@ -1195,16 +1195,23 @@ describe("VoiceCallWebhookServer stream disconnect grace", () => {
|
||||
config: {
|
||||
onDisconnect?: (providerCallId: string, streamSid: string) => void;
|
||||
onConnect?: (providerCallId: string, streamSid: string) => void;
|
||||
onTranscriptionReady?: (providerCallId: string, streamSid: string) => void;
|
||||
};
|
||||
};
|
||||
if (!mediaHandler) {
|
||||
throw new Error("expected webhook server to expose a media stream handler");
|
||||
}
|
||||
|
||||
mediaHandler.config.onConnect?.("CA-stream-1", "MZ-new");
|
||||
mediaHandler.config.onDisconnect?.("CA-stream-1", "MZ-old");
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
mediaHandler.config.onConnect?.("CA-stream-1", "MZ-new");
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
expect(endCall).not.toHaveBeenCalled();
|
||||
expect(speakInitialMessage).not.toHaveBeenCalled();
|
||||
|
||||
mediaHandler.config.onTranscriptionReady?.("CA-stream-1", "MZ-new");
|
||||
expect(speakInitialMessage).toHaveBeenCalledTimes(1);
|
||||
expect(speakInitialMessage).toHaveBeenCalledWith("CA-stream-1");
|
||||
|
||||
mediaHandler.config.onDisconnect?.("CA-stream-1", "MZ-new");
|
||||
await vi.advanceTimersByTimeAsync(2_100);
|
||||
|
||||
@@ -383,8 +383,8 @@ export class VoiceCallWebhookServer {
|
||||
if (this.provider.name === "twilio") {
|
||||
(this.provider as TwilioProvider).registerCallStream(callId, streamSid);
|
||||
}
|
||||
|
||||
// Speak initial message immediately (no delay) to avoid stream timeout
|
||||
},
|
||||
onTranscriptionReady: (callId) => {
|
||||
this.manager.speakInitialMessage(callId).catch((err) => {
|
||||
console.warn(`[voice-call] Failed to speak initial message:`, err);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user