fix(voice-call): pace realtime Twilio audio
Commit 7fc9a82dca (parent 19f948af2e), committed by Peter Steinberger.
@@ -445,7 +445,7 @@ Enable the Voice Call plugin on the Gateway host, not on the Chrome node:
 ```json5
 {
   plugins: {
-    allow: ["google-meet", "voice-call"],
+    allow: ["google-meet", "voice-call", "google"],
     entries: {
       "google-meet": {
         enabled: true,
@@ -458,8 +458,24 @@ Enable the Voice Call plugin on the Gateway host, not on the Chrome node:
         enabled: true,
         config: {
           provider: "twilio",
           inboundPolicy: "allowlist",
+          realtime: {
+            enabled: true,
+            provider: "google",
+            instructions: "Join this Google Meet as an OpenClaw agent. Be brief.",
+            toolPolicy: "safe-read-only",
+            providers: {
+              google: {
+                silenceDurationMs: 500,
+                startSensitivity: "high",
+              },
+            },
+          },
         },
       },
+      google: {
+        enabled: true,
+      },
     },
   },
 }
@@ -472,8 +488,12 @@ secrets out of `openclaw.json`:
 export TWILIO_ACCOUNT_SID=AC...
 export TWILIO_AUTH_TOKEN=...
 export TWILIO_FROM_NUMBER=+15550001234
+export GEMINI_API_KEY=...
 ```
 
+Use `realtime.provider: "openai"` with the OpenAI provider plugin and
+`OPENAI_API_KEY` instead if that is your realtime voice provider.
 
 Restart or reload the Gateway after enabling `voice-call`; plugin config changes
 do not appear in an already running Gateway process until it reloads.
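
For the OpenAI variant mentioned just above, the `realtime` block would look roughly like this (a sketch, not verbatim from the docs; `providers.openai.apiKey` is assumed to parallel the documented `providers.google.apiKey`):

```json5
realtime: {
  enabled: true,
  provider: "openai",
  providers: {
    openai: {
      // Assumed shape; OPENAI_API_KEY in the environment also works per the note above.
      apiKey: "${OPENAI_API_KEY}",
    },
  },
},
```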
@@ -250,6 +250,9 @@ Current runtime behaviour:
 Defaults: API key from `realtime.providers.google.apiKey`,
 `GEMINI_API_KEY`, or `GOOGLE_GENERATIVE_AI_API_KEY`; model
 `gemini-2.5-flash-native-audio-preview-12-2025`; voice `Kore`.
+`sessionResumption` and `contextWindowCompression` default on for longer,
+reconnectable calls. Use `silenceDurationMs`, `startSensitivity`, and
+`endSensitivity` to tune faster turn-taking on telephony audio.
 
 ```json5
 {
@@ -270,6 +273,8 @@ Current runtime behaviour:
         apiKey: "${GEMINI_API_KEY}",
         model: "gemini-2.5-flash-native-audio-preview-12-2025",
         voice: "Kore",
+        silenceDurationMs: 500,
+        startSensitivity: "high",
       },
     },
   },
@@ -107,6 +107,8 @@ describe("buildGoogleRealtimeVoiceProvider", () => {
       turnCoverage: "only-activity",
       automaticActivityDetectionDisabled: false,
       enableAffectiveDialog: undefined,
+      sessionResumption: undefined,
+      contextWindowCompression: undefined,
       thinkingLevel: undefined,
       thinkingBudget: undefined,
     });
@@ -181,6 +183,8 @@ describe("buildGoogleRealtimeVoiceProvider", () => {
       },
       turnCoverage: "TURN_INCLUDES_ONLY_ACTIVITY",
     },
+    sessionResumption: {},
+    contextWindowCompression: { slidingWindow: {} },
     tools: [
       {
         functionDeclarations: [
@@ -312,6 +316,42 @@ describe("buildGoogleRealtimeVoiceProvider", () => {
     });
   });
 
+  it("can opt out of Google Live session resumption and context compression", async () => {
+    const provider = buildGoogleRealtimeVoiceProvider();
+    const bridge = provider.createBridge({
+      providerConfig: {
+        apiKey: "gemini-key",
+        contextWindowCompression: false,
+        sessionResumption: false,
+      },
+      onAudio: vi.fn(),
+      onClearAudio: vi.fn(),
+    });
+
+    await bridge.connect();
+
+    expect(lastConnectParams().config).not.toHaveProperty("contextWindowCompression");
+    expect(lastConnectParams().config).not.toHaveProperty("sessionResumption");
+  });
+
+  it("captures Google Live resumption handles and reuses them on reconnect", async () => {
+    const provider = buildGoogleRealtimeVoiceProvider();
+    const bridge = provider.createBridge({
+      providerConfig: { apiKey: "gemini-key" },
+      onAudio: vi.fn(),
+      onClearAudio: vi.fn(),
+    });
+
+    await bridge.connect();
+    lastConnectParams().callbacks.onmessage({
+      sessionResumptionUpdate: { resumable: true, newHandle: "resume-1" },
+    });
+
+    await bridge.connect();
+
+    expect(lastConnectParams().config.sessionResumption).toEqual({ handle: "resume-1" });
+  });
+
   it("waits for setup completion before draining audio and firing ready", async () => {
     const provider = buildGoogleRealtimeVoiceProvider();
     const onReady = vi.fn();
@@ -1,20 +1,20 @@
 import { randomUUID } from "node:crypto";
-import {
+import type {
   ActivityHandling,
   Behavior,
   EndSensitivity,
+  FunctionDeclaration,
+  FunctionResponse,
   FunctionResponseScheduling,
+  LiveConnectConfig,
+  LiveServerContent,
+  LiveServerMessage,
+  LiveServerToolCall,
   Modality,
+  RealtimeInputConfig,
   StartSensitivity,
+  ThinkingConfig,
   TurnCoverage,
-  type FunctionDeclaration,
-  type FunctionResponse,
-  type LiveConnectConfig,
-  type LiveServerContent,
-  type LiveServerMessage,
-  type LiveServerToolCall,
-  type RealtimeInputConfig,
-  type ThinkingConfig,
 } from "@google/genai";
 import type { OpenClawConfig } from "openclaw/plugin-sdk/provider-onboard";
 import type {
@@ -47,7 +47,7 @@ const GOOGLE_REALTIME_BROWSER_API_VERSION = "v1alpha";
 const GOOGLE_REALTIME_BROWSER_WEBSOCKET_URL =
   "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained";
 const MAX_PENDING_AUDIO_CHUNKS = 320;
-const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 700;
+const DEFAULT_AUDIO_STREAM_END_SILENCE_MS = 500;
 const GOOGLE_REALTIME_BROWSER_SESSION_TTL_MS = 30 * 60 * 1000;
 const GOOGLE_REALTIME_BROWSER_NEW_SESSION_TTL_MS = 60 * 1000;
@@ -70,6 +70,8 @@ type GoogleRealtimeVoiceProviderConfig = {
   turnCoverage?: GoogleRealtimeTurnCoverage;
   automaticActivityDetectionDisabled?: boolean;
   enableAffectiveDialog?: boolean;
+  sessionResumption?: boolean;
+  contextWindowCompression?: boolean;
   thinkingLevel?: GoogleRealtimeThinkingLevel;
   thinkingBudget?: number;
 };
@@ -90,6 +92,8 @@ type GoogleRealtimeLiveConfig = {
   turnCoverage?: GoogleRealtimeTurnCoverage;
   automaticActivityDetectionDisabled?: boolean;
   enableAffectiveDialog?: boolean;
+  sessionResumption?: boolean;
+  contextWindowCompression?: boolean;
   thinkingLevel?: GoogleRealtimeThinkingLevel;
   thinkingBudget?: number;
 };
@@ -209,6 +213,8 @@ function normalizeProviderConfig(
     turnCoverage: asTurnCoverage(raw?.turnCoverage),
     automaticActivityDetectionDisabled: asBoolean(raw?.automaticActivityDetectionDisabled),
     enableAffectiveDialog: asBoolean(raw?.enableAffectiveDialog),
+    sessionResumption: asBoolean(raw?.sessionResumption),
+    contextWindowCompression: asBoolean(raw?.contextWindowCompression),
     thinkingLevel: asThinkingLevel(raw?.thinkingLevel),
     thinkingBudget: asFiniteNumber(raw?.thinkingBudget),
   };
@@ -223,9 +229,9 @@ function mapStartSensitivity(
 ): StartSensitivity | undefined {
   switch (value) {
     case "high":
-      return StartSensitivity.START_SENSITIVITY_HIGH;
+      return "START_SENSITIVITY_HIGH" as StartSensitivity;
     case "low":
-      return StartSensitivity.START_SENSITIVITY_LOW;
+      return "START_SENSITIVITY_LOW" as StartSensitivity;
     default:
       return undefined;
   }
@@ -236,9 +242,9 @@ function mapEndSensitivity(
 ): EndSensitivity | undefined {
   switch (value) {
     case "high":
-      return EndSensitivity.END_SENSITIVITY_HIGH;
+      return "END_SENSITIVITY_HIGH" as EndSensitivity;
     case "low":
-      return EndSensitivity.END_SENSITIVITY_LOW;
+      return "END_SENSITIVITY_LOW" as EndSensitivity;
     default:
       return undefined;
   }
@@ -249,9 +255,9 @@ function mapActivityHandling(
 ): ActivityHandling | undefined {
   switch (value) {
     case "no-interruption":
-      return ActivityHandling.NO_INTERRUPTION;
+      return "NO_INTERRUPTION" as ActivityHandling;
     case "start-of-activity-interrupts":
-      return ActivityHandling.START_OF_ACTIVITY_INTERRUPTS;
+      return "START_OF_ACTIVITY_INTERRUPTS" as ActivityHandling;
     default:
       return undefined;
   }
@@ -260,11 +266,11 @@ function mapActivityHandling(
 function mapTurnCoverage(value: GoogleRealtimeTurnCoverage | undefined): TurnCoverage | undefined {
   switch (value) {
     case "only-activity":
-      return TurnCoverage.TURN_INCLUDES_ONLY_ACTIVITY;
+      return "TURN_INCLUDES_ONLY_ACTIVITY" as TurnCoverage;
     case "all-input":
-      return TurnCoverage.TURN_INCLUDES_ALL_INPUT;
+      return "TURN_INCLUDES_ALL_INPUT" as TurnCoverage;
     case "audio-activity-and-all-video":
-      return TurnCoverage.TURN_INCLUDES_AUDIO_ACTIVITY_AND_ALL_VIDEO;
+      return "TURN_INCLUDES_AUDIO_ACTIVITY_AND_ALL_VIDEO" as TurnCoverage;
     default:
       return undefined;
   }
@@ -316,7 +322,7 @@ function buildFunctionDeclarations(tools: RealtimeVoiceTool[] | undefined): Func
       parametersJsonSchema: tool.parameters,
     };
     if (tool.name === REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) {
-      declaration.behavior = Behavior.NON_BLOCKING;
+      declaration.behavior = "NON_BLOCKING" as Behavior;
     }
     return declaration;
   });
@@ -325,7 +331,7 @@ function buildFunctionDeclarations(tools: RealtimeVoiceTool[] | undefined): Func
 function buildGoogleLiveConnectConfig(config: GoogleRealtimeLiveConfig): LiveConnectConfig {
   const functionDeclarations = buildFunctionDeclarations(config.tools);
   return {
-    responseModalities: [Modality.AUDIO],
+    responseModalities: ["AUDIO" as Modality],
     ...(typeof config.temperature === "number" && config.temperature > 0
       ? { temperature: config.temperature }
       : {}),
@@ -359,7 +365,7 @@ function buildBrowserInitialSetup(model: string) {
     setup: {
       model: toGoogleModelResource(model),
       generationConfig: {
-        responseModalities: [Modality.AUDIO],
+        responseModalities: ["AUDIO" as Modality],
       },
       inputAudioTranscription: {},
       outputAudioTranscription: {},
@@ -403,6 +409,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
   private audioStreamEnded = false;
   private pendingFunctionNames = new Map<string, string>();
   private readonly audioFormat: RealtimeVoiceAudioFormat;
+  private resumptionHandle: string | undefined;
 
   constructor(private readonly config: GoogleRealtimeVoiceBridgeConfig) {
     this.audioFormat = config.audioFormat ?? REALTIME_VOICE_AUDIO_FORMAT_G711_ULAW_8KHZ;
@@ -425,7 +432,17 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
 
     this.session = (await ai.live.connect({
       model: this.config.model ?? GOOGLE_REALTIME_DEFAULT_MODEL,
-      config: buildGoogleLiveConnectConfig(this.config),
+      config: {
+        ...buildGoogleLiveConnectConfig(this.config),
+        ...(this.config.sessionResumption === false
+          ? {}
+          : {
+              sessionResumption: this.resumptionHandle ? { handle: this.resumptionHandle } : {},
+            }),
+        ...(this.config.contextWindowCompression === false
+          ? {}
+          : { contextWindowCompression: { slidingWindow: {} } }),
+      },
       callbacks: {
         onopen: () => {
           this.connected = true;
@@ -548,7 +565,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
         : { output: result },
     };
     if (isConsultTool) {
-      functionResponse.scheduling = FunctionResponseScheduling.WHEN_IDLE;
+      functionResponse.scheduling = "WHEN_IDLE" as FunctionResponseScheduling;
       if (options?.willContinue === true) {
         functionResponse.willContinue = true;
       }
@@ -607,6 +624,7 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
   }
 
   private handleMessage(message: LiveServerMessage): void {
+    this.captureSessionLifecycle(message);
     if (message.setupComplete) {
       this.handleSetupComplete();
     }
@@ -618,6 +636,20 @@ class GoogleRealtimeVoiceBridge implements RealtimeVoiceBridge {
     }
   }
 
+  private captureSessionLifecycle(message: LiveServerMessage): void {
+    const raw = message as unknown as {
+      goAway?: { timeLeft?: string };
+      sessionResumptionUpdate?: { newHandle?: string; resumable?: boolean };
+    };
+    const update = raw.sessionResumptionUpdate;
+    if (update?.resumable && update.newHandle) {
+      this.resumptionHandle = update.newHandle;
+    }
+    if (raw.goAway?.timeLeft) {
+      this.config.onError?.(new Error(`Google Live session goAway: ${raw.goAway.timeLeft}`));
+    }
+  }
+
   private handleSetupComplete(): void {
     this.sessionConfigured = true;
     for (const chunk of this.pendingAudio.splice(0)) {
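
For context, the two lifecycle payloads this method inspects look roughly like the following (shapes inferred from the cast in the code above and from this commit's tests, not from `@google/genai` documentation):

```ts
// Resumption update: the handle is captured so the next connect() can resume.
const resumptionUpdate = {
  sessionResumptionUpdate: { resumable: true, newHandle: "resume-1" },
};

// goAway: the server announces an imminent disconnect; surfaced via onError.
const goAway = { goAway: { timeLeft: "10s" } }; // example duration
```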
@@ -784,6 +816,8 @@ export function buildGoogleRealtimeVoiceProvider(): RealtimeVoiceProviderPlugin
       turnCoverage: config.turnCoverage,
       automaticActivityDetectionDisabled: config.automaticActivityDetectionDisabled,
       enableAffectiveDialog: config.enableAffectiveDialog,
+      sessionResumption: config.sessionResumption,
+      contextWindowCompression: config.contextWindowCompression,
       thinkingLevel: config.thinkingLevel,
       thinkingBudget: config.thinkingBudget,
     });
@@ -589,9 +589,9 @@ export default definePluginEntry({
       respondError(respond, "to required", ErrorCodes.INVALID_REQUEST);
       return;
     }
-    const rt = await ensureRuntime();
     const mode =
       params?.mode === "notify" || params?.mode === "conversation" ? params.mode : undefined;
+    const rt = await ensureRuntime();
     await initiateCallAndRespond({
       rt,
       respond,
@@ -0,0 +1,86 @@
+import { afterEach, describe, expect, it, vi } from "vitest";
+import {
+  RealtimeMulawSpeechStartDetector,
+  RealtimeTwilioAudioPacer,
+  calculateMulawRms,
+} from "./realtime-audio-pacer.js";
+
+describe("RealtimeTwilioAudioPacer", () => {
+  afterEach(() => {
+    vi.useRealTimers();
+  });
+
+  it("paces realtime audio as 20ms telephony frames before marks", async () => {
+    vi.useFakeTimers();
+    const sent: unknown[] = [];
+    const pacer = new RealtimeTwilioAudioPacer({
+      streamSid: "MZ-test",
+      sendJson: (message) => {
+        sent.push(message);
+        return true;
+      },
+    });
+
+    pacer.sendAudio(Buffer.alloc(320, 0x7f));
+    pacer.sendMark("audio-1");
+
+    expect(sent).toHaveLength(1);
+    expect(
+      Buffer.from((sent[0] as { media: { payload: string } }).media.payload, "base64"),
+    ).toHaveLength(160);
+
+    await vi.advanceTimersByTimeAsync(20);
+    expect(sent).toHaveLength(2);
+    expect(
+      Buffer.from((sent[1] as { media: { payload: string } }).media.payload, "base64"),
+    ).toHaveLength(160);
+
+    await vi.advanceTimersByTimeAsync(20);
+    expect(sent[2]).toEqual({
+      event: "mark",
+      streamSid: "MZ-test",
+      mark: { name: "audio-1" },
+    });
+  });
+
+  it("clears queued audio immediately", async () => {
+    vi.useFakeTimers();
+    const sent: unknown[] = [];
+    const pacer = new RealtimeTwilioAudioPacer({
+      streamSid: "MZ-test",
+      sendJson: (message) => {
+        sent.push(message);
+        return true;
+      },
+    });
+
+    pacer.sendAudio(Buffer.alloc(480, 0x7f));
+    pacer.clearAudio();
+    await vi.advanceTimersByTimeAsync(100);
+
+    expect(sent).toHaveLength(2);
+    expect(sent[1]).toEqual({ event: "clear", streamSid: "MZ-test" });
+  });
+});
+
+describe("RealtimeMulawSpeechStartDetector", () => {
+  it("detects a speech start after consecutive loud chunks and resets after quiet", () => {
+    const detector = new RealtimeMulawSpeechStartDetector({
+      requiredLoudChunks: 2,
+      requiredQuietChunks: 2,
+      rmsThreshold: 0.02,
+    });
+    const silence = Buffer.alloc(160, 0xff);
+    const speech = Buffer.alloc(160, 0x00);
+
+    expect(calculateMulawRms(silence)).toBeLessThan(0.02);
+    expect(calculateMulawRms(speech)).toBeGreaterThan(0.02);
+    expect(detector.accept(speech)).toBe(false);
+    expect(detector.accept(speech)).toBe(true);
+    expect(detector.accept(speech)).toBe(false);
+    expect(detector.accept(silence)).toBe(false);
+    expect(detector.accept(silence)).toBe(false);
+    expect(detector.accept(speech)).toBe(false);
+    expect(detector.accept(speech)).toBe(true);
+  });
+});
extensions/voice-call/src/webhook/realtime-audio-pacer.ts (new file, 176 lines)
@@ -0,0 +1,176 @@
+import { mulawToPcm } from "openclaw/plugin-sdk/realtime-voice";
+
+const TELEPHONY_SAMPLE_RATE = 8_000;
+const TELEPHONY_CHUNK_BYTES = 160;
+const TELEPHONY_CHUNK_MS = 20;
+const DEFAULT_SPEECH_RMS_THRESHOLD = 0.02;
+const DEFAULT_REQUIRED_LOUD_CHUNKS = 2;
+const DEFAULT_REQUIRED_QUIET_CHUNKS = 10;
+
+type RealtimeTwilioAudioQueueItem =
+  | {
+      chunk: Buffer;
+      durationMs: number;
+      type: "audio";
+    }
+  | {
+      name: string;
+      type: "mark";
+    };
+
+export type RealtimeTwilioAudioPacerSendJson = (message: unknown) => boolean;
+
+export class RealtimeTwilioAudioPacer {
+  private queue: RealtimeTwilioAudioQueueItem[] = [];
+  private timer: ReturnType<typeof setTimeout> | null = null;
+  private closed = false;
+
+  constructor(
+    private readonly params: {
+      sendJson: RealtimeTwilioAudioPacerSendJson;
+      streamSid: string;
+    },
+  ) {}
+
+  sendAudio(muLaw: Buffer): void {
+    if (this.closed || muLaw.length === 0) {
+      return;
+    }
+    for (let offset = 0; offset < muLaw.length; offset += TELEPHONY_CHUNK_BYTES) {
+      const chunk = Buffer.from(muLaw.subarray(offset, offset + TELEPHONY_CHUNK_BYTES));
+      this.queue.push({
+        type: "audio",
+        chunk,
+        durationMs: Math.max(1, Math.round((chunk.length / TELEPHONY_SAMPLE_RATE) * 1000)),
+      });
+    }
+    this.ensurePump();
+  }
+
+  sendMark(name: string): void {
+    if (this.closed || !name) {
+      return;
+    }
+    this.queue.push({ type: "mark", name });
+    this.ensurePump();
+  }
+
+  clearAudio(): void {
+    if (this.closed) {
+      return;
+    }
+    this.clearTimer();
+    this.queue = [];
+    this.params.sendJson({ event: "clear", streamSid: this.params.streamSid });
+  }
+
+  close(): void {
+    this.closed = true;
+    this.clearTimer();
+    this.queue = [];
+  }
+
+  private clearTimer(): void {
+    if (!this.timer) {
+      return;
+    }
+    clearTimeout(this.timer);
+    this.timer = null;
+  }
+
+  private ensurePump(): void {
+    if (!this.timer) {
+      this.pump();
+    }
+  }
+
+  private pump(): void {
+    this.timer = null;
+    if (this.closed) {
+      return;
+    }
+    const item = this.queue.shift();
+    if (!item) {
+      return;
+    }
+
+    let delayMs = 0;
+    let sent = true;
+    if (item.type === "audio") {
+      sent = this.params.sendJson({
+        event: "media",
+        streamSid: this.params.streamSid,
+        media: { payload: item.chunk.toString("base64") },
+      });
+      delayMs = item.durationMs || TELEPHONY_CHUNK_MS;
+    } else {
+      sent = this.params.sendJson({
+        event: "mark",
+        streamSid: this.params.streamSid,
+        mark: { name: item.name },
+      });
+    }
+
+    if (!sent) {
+      this.queue = [];
+      return;
+    }
+    if (this.queue.length > 0) {
+      this.timer = setTimeout(() => this.pump(), delayMs);
+    }
+  }
+}
+
+export function calculateMulawRms(muLaw: Buffer): number {
+  if (muLaw.length === 0) {
+    return 0;
+  }
+  const pcm = mulawToPcm(muLaw);
+  const samples = Math.floor(pcm.length / 2);
+  if (samples === 0) {
+    return 0;
+  }
+  let sum = 0;
+  for (let i = 0; i < samples; i += 1) {
+    const normalized = pcm.readInt16LE(i * 2) / 32768;
+    sum += normalized * normalized;
+  }
+  return Math.sqrt(sum / samples);
+}
+
+export class RealtimeMulawSpeechStartDetector {
+  private loudChunks = 0;
+  private quietChunks = DEFAULT_REQUIRED_QUIET_CHUNKS;
+  private speaking = false;
+
+  constructor(
+    private readonly params: {
+      requiredLoudChunks?: number;
+      requiredQuietChunks?: number;
+      rmsThreshold?: number;
+    } = {},
+  ) {}
+
+  accept(muLaw: Buffer): boolean {
+    const rms = calculateMulawRms(muLaw);
+    const threshold = this.params.rmsThreshold ?? DEFAULT_SPEECH_RMS_THRESHOLD;
+    if (rms >= threshold) {
+      this.quietChunks = 0;
+      this.loudChunks += 1;
+      const requiredLoudChunks = this.params.requiredLoudChunks ?? DEFAULT_REQUIRED_LOUD_CHUNKS;
+      if (!this.speaking && this.loudChunks >= requiredLoudChunks) {
+        this.speaking = true;
+        return true;
+      }
+      return false;
+    }
+
+    this.loudChunks = 0;
+    this.quietChunks += 1;
+    const requiredQuietChunks = this.params.requiredQuietChunks ?? DEFAULT_REQUIRED_QUIET_CHUNKS;
+    if (this.quietChunks >= requiredQuietChunks) {
+      this.speaking = false;
+    }
+    return false;
+  }
+}
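
A quick note on the framing constants: at 8 kHz μ-law one byte is one sample, so a 20 ms frame is exactly 8000 × 0.020 × 1 = 160 bytes, which is why the pacer slices on `TELEPHONY_CHUNK_BYTES`. A minimal usage sketch (`MZ-example` is a placeholder stream SID; `sendJson` stands in for the Twilio media-stream WebSocket send):

```ts
import { RealtimeTwilioAudioPacer } from "./realtime-audio-pacer.js";

// Hypothetical wiring: in the real handler, sendJson forwards to the
// Twilio WebSocket and reports whether the socket is still writable.
const pacer = new RealtimeTwilioAudioPacer({
  streamSid: "MZ-example",
  sendJson: (message) => {
    console.log(JSON.stringify(message));
    return true;
  },
});

// One second of μ-law silence (0xff is near-zero amplitude) becomes
// fifty paced 20 ms frames of 160 bytes each.
pacer.sendAudio(Buffer.alloc(8000, 0xff));
pacer.sendMark("utterance-1"); // queued behind the audio, as in the tests above
```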
@@ -16,6 +16,10 @@ import type { CallManager } from "../manager.js";
 import type { VoiceCallProvider } from "../providers/base.js";
 import type { CallRecord, NormalizedEvent } from "../types.js";
 import type { WebhookResponsePayload } from "../webhook.types.js";
+import {
+  RealtimeMulawSpeechStartDetector,
+  RealtimeTwilioAudioPacer,
+} from "./realtime-audio-pacer.js";
 
 export type ToolHandlerContext = {
   partialUserTranscript?: string;
@@ -29,6 +33,7 @@ export type ToolHandlerFn = (
 const STREAM_TOKEN_TTL_MS = 30_000;
 const DEFAULT_HOST = "localhost:8443";
 const MAX_REALTIME_MESSAGE_BYTES = 256 * 1024;
+const MAX_REALTIME_WS_BUFFERED_BYTES = 1024 * 1024;
 
 function normalizePath(pathname: string): string {
   const trimmed = pathname.trim();
@@ -179,7 +184,8 @@ export class RealtimeCallHandler {
           ? (msg.media as Record<string, unknown>)
           : undefined;
       if (msg.event === "media" && typeof mediaData?.payload === "string") {
-        bridge.sendAudio(Buffer.from(mediaData.payload, "base64"));
+        const audio = Buffer.from(mediaData.payload, "base64");
+        bridge.sendAudio(audio);
         if (typeof mediaData.timestamp === "number") {
           bridge.setMediaTimestamp(mediaData.timestamp);
         } else if (typeof mediaData.timestamp === "string") {
@@ -278,7 +284,24 @@ export class RealtimeCallHandler {
       this.endCallInManager(callSid, callId, reason);
     };
 
-    const bridge = createRealtimeVoiceBridgeSession({
+    const sendJson = (message: unknown): boolean => {
+      if (ws.readyState !== WebSocket.OPEN) {
+        return false;
+      }
+      if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) {
+        ws.close(1013, "Backpressure: send buffer exceeded");
+        return false;
+      }
+      ws.send(JSON.stringify(message));
+      if (ws.bufferedAmount > MAX_REALTIME_WS_BUFFERED_BYTES) {
+        ws.close(1013, "Backpressure: send buffer exceeded");
+        return false;
+      }
+      return true;
+    };
+    const audioPacer = new RealtimeTwilioAudioPacer({ streamSid, sendJson });
+    const speechDetector = new RealtimeMulawSpeechStartDetector();
+    const session = createRealtimeVoiceBridgeSession({
       provider: this.realtimeProvider,
       providerConfig: this.providerConfig,
       instructions: this.config.instructions,
@@ -288,19 +311,13 @@ export class RealtimeCallHandler {
       audioSink: {
         isOpen: () => ws.readyState === WebSocket.OPEN,
         sendAudio: (muLaw) => {
-          ws.send(
-            JSON.stringify({
-              event: "media",
-              streamSid,
-              media: { payload: muLaw.toString("base64") },
-            }),
-          );
+          audioPacer.sendAudio(muLaw);
         },
         clearAudio: () => {
-          ws.send(JSON.stringify({ event: "clear", streamSid }));
+          audioPacer.clearAudio();
         },
         sendMark: (markName) => {
-          ws.send(JSON.stringify({ event: "mark", streamSid, mark: { name: markName } }));
+          audioPacer.sendMark(markName);
         },
       },
       onTranscript: (role, text, isFinal) => {
@@ -367,24 +384,32 @@ export class RealtimeCallHandler {
         });
       },
     });
-    this.activeBridgesByCallId.set(callId, bridge);
-    this.activeBridgesByCallId.set(callSid, bridge);
-    const closeBridge = bridge.close.bind(bridge);
-    bridge.close = () => {
+    this.activeBridgesByCallId.set(callId, session);
+    this.activeBridgesByCallId.set(callSid, session);
+    const sendAudioToSession = session.sendAudio.bind(session);
+    session.sendAudio = (audio) => {
+      if (speechDetector.accept(audio)) {
+        audioPacer.clearAudio();
+      }
+      sendAudioToSession(audio);
+    };
+    const closeSession = session.close.bind(session);
+    session.close = () => {
       this.activeBridgesByCallId.delete(callId);
       this.activeBridgesByCallId.delete(callSid);
       this.partialUserTranscriptsByCallId.delete(callId);
-      closeBridge();
+      audioPacer.close();
+      closeSession();
     };
 
-    bridge.connect().catch((error: Error) => {
+    session.connect().catch((error: Error) => {
       console.error("[voice-call] Failed to connect realtime bridge:", error);
-      bridge.close();
+      session.close();
       emitCallEnd("error");
       ws.close(1011, "Failed to connect");
     });
 
-    return bridge;
+    return session;
   }
 
   private registerCallInManager(