fix(voice-call): bound realtime audio pacing

This commit is contained in:
Vincent Koc
2026-05-03 22:58:19 -07:00
parent 83037720d9
commit 7050af56d4
5 changed files with 108 additions and 1 deletions

View File

@@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai
- Agents/commands: add `/steer <message>` to steer the current session's active run directly — bypassing the queue — without starting a new turn when the session is idle. (#76934)
- Agents/subagents: preserve every grouped child result when direct completion fallback has to bypass the requester-agent announce turn. Thanks @vincentkoc.
- TTS/telephony: honor provider voice/model overrides in telephony synthesis providers so Google Meet agent speech logs match the backend that actually produced the audio. Thanks @vincentkoc.
- Voice Call/realtime: bound the paced Twilio audio queue and close overloaded realtime streams before provider audio can pile up behind the websocket backpressure guard. Thanks @vincentkoc.
- Tools/BTW: add `/side` as a text and native slash-command alias for `/btw` side questions.
- Doctor/config: `doctor --fix` now commits safe legacy migrations even when unrelated validation issues (e.g. a missing plugin) prevent full validation from passing, so `agents.defaults.llm` and other known-legacy keys are always cleaned up regardless of other config problems. Fixes #76798. (#76800) Thanks @hclsys.
- Docs: clarify that IRC uses raw TCP/TLS sockets outside operator-managed forward proxy routing, so direct IRC egress should be explicitly approved before enabling IRC. Thanks @jesse-merhi.

View File

@@ -61,6 +61,28 @@ describe("RealtimeTwilioAudioPacer", () => {
expect(sent).toHaveLength(2);
expect(sent[1]).toEqual({ event: "clear", streamSid: "MZ-test" });
});
it("stops instead of buffering unbounded realtime audio", async () => {
  vi.useFakeTimers();
  const backpressure = vi.fn();
  const delivered: unknown[] = [];
  // 480 bytes of audio against a 320-byte cap must trip the overflow guard
  // before anything reaches the (always-successful) transport stub.
  const pacer = new RealtimeTwilioAudioPacer({
    maxQueuedAudioBytes: 320,
    onBackpressure: backpressure,
    sendJson: (frame) => {
      delivered.push(frame);
      return true;
    },
    streamSid: "MZ-test",
  });
  pacer.sendAudio(Buffer.alloc(480, 0x7f));
  pacer.sendMark("after-overflow");
  await vi.advanceTimersByTimeAsync(100);
  // The pacer shut itself down: the overflow callback fired exactly once
  // and no media or mark frame was ever emitted.
  expect(backpressure).toHaveBeenCalledOnce();
  expect(delivered).toEqual([]);
});
});
describe("RealtimeMulawSpeechStartDetector", () => {

View File

@@ -4,6 +4,7 @@ const TELEPHONY_CHUNK_MS = 20;
const DEFAULT_SPEECH_RMS_THRESHOLD = 0.02;
const DEFAULT_REQUIRED_LOUD_CHUNKS = 2;
const DEFAULT_REQUIRED_QUIET_CHUNKS = 10;
const DEFAULT_MAX_QUEUED_AUDIO_BYTES = TELEPHONY_SAMPLE_RATE * 120;
const PCM16_MAX_AMPLITUDE = 32768;
const MULAW_LINEAR_SAMPLES = new Int16Array(256);
@@ -27,10 +28,13 @@ export type RealtimeTwilioAudioPacerSendJson = (message: unknown) => boolean;
export class RealtimeTwilioAudioPacer {
private queue: RealtimeTwilioAudioQueueItem[] = [];
private timer: ReturnType<typeof setTimeout> | null = null;
private queuedAudioBytes = 0;
private closed = false;
constructor(
private readonly params: {
maxQueuedAudioBytes?: number;
onBackpressure?: () => void;
sendJson: RealtimeTwilioAudioPacerSendJson;
streamSid: string;
},
@@ -40,13 +44,19 @@ export class RealtimeTwilioAudioPacer {
if (this.closed || muLaw.length === 0) {
return;
}
const maxQueuedAudioBytes = this.params.maxQueuedAudioBytes ?? DEFAULT_MAX_QUEUED_AUDIO_BYTES;
for (let offset = 0; offset < muLaw.length; offset += TELEPHONY_CHUNK_BYTES) {
const chunk = Buffer.from(muLaw.subarray(offset, offset + TELEPHONY_CHUNK_BYTES));
if (this.queuedAudioBytes + chunk.length > maxQueuedAudioBytes) {
this.failBackpressure();
return;
}
this.queue.push({
type: "audio",
chunk,
durationMs: Math.max(1, Math.round((chunk.length / TELEPHONY_SAMPLE_RATE) * 1000)),
});
this.queuedAudioBytes += chunk.length;
}
this.ensurePump();
}
@@ -65,6 +75,7 @@ export class RealtimeTwilioAudioPacer {
}
this.clearTimer();
this.queue = [];
this.queuedAudioBytes = 0;
this.params.sendJson({ event: "clear", streamSid: this.params.streamSid });
}
@@ -72,6 +83,7 @@ export class RealtimeTwilioAudioPacer {
this.closed = true;
this.clearTimer();
this.queue = [];
this.queuedAudioBytes = 0;
}
private clearTimer(): void {
@@ -88,6 +100,11 @@ export class RealtimeTwilioAudioPacer {
}
}
// Invoked when queued audio would exceed the configured byte cap.
// Tears the pacer down first so no further frames can be queued or sent,
// then notifies the owner via the optional callback (which, per the
// handler wiring, typically closes the websocket with code 1013).
private failBackpressure(): void {
// Order matters: close() before the callback so a re-entrant caller
// observes the pacer already closed.
this.close();
this.params.onBackpressure?.();
}
private pump(): void {
this.timer = null;
if (this.closed) {
@@ -101,6 +118,7 @@ export class RealtimeTwilioAudioPacer {
let delayMs = 0;
let sent = true;
if (item.type === "audio") {
this.queuedAudioBytes = Math.max(0, this.queuedAudioBytes - item.chunk.length);
sent = this.params.sendJson({
event: "media",
streamSid: this.params.streamSid,
@@ -117,6 +135,7 @@ export class RealtimeTwilioAudioPacer {
if (!sent) {
this.queue = [];
this.queuedAudioBytes = 0;
return;
}
if (this.queue.length > 0) {

View File

@@ -629,6 +629,63 @@ describe("RealtimeCallHandler path routing", () => {
});
describe("RealtimeCallHandler websocket hardening", () => {
// End-to-end guard test: flooding the pacer with more provider audio than
// the default queue cap allows must close the Twilio stream websocket with
// close code 1013 ("try again later") rather than buffer without bound.
it("closes realtime streams when paced outbound audio exceeds the internal queue cap", async () => {
// Captured from the bridge request so the test can inject provider audio
// directly into the outbound path.
let sendProviderAudio: ((audio: Buffer) => void) | undefined;
const createBridge = vi.fn(
(request: Parameters<RealtimeVoiceProviderPlugin["createBridge"]>[0]) => {
sendProviderAudio = request.onAudio;
return makeBridge();
},
);
// Minimal call record so the handler accepts the stream's callSid.
const handler = makeHandler(undefined, {
manager: {
getCallByProviderCallId: vi.fn(
(): CallRecord => ({
callId: "call-1",
providerCallId: "CA-backpressure",
provider: "twilio",
direction: "inbound",
state: "ringing",
from: "+15550001234",
to: "+15550009999",
startedAt: Date.now(),
transcript: [],
processedEventIds: [],
metadata: {},
}),
),
},
realtimeProvider: makeRealtimeProvider(createBridge),
});
const server = await startRealtimeServer(handler);
try {
const ws = await connectWs(server.url);
try {
// Simulate Twilio's "start" frame to trigger bridge setup.
ws.send(
JSON.stringify({
event: "start",
start: { streamSid: "MZ-backpressure", callSid: "CA-backpressure" },
}),
);
await vi.waitFor(() => {
expect(sendProviderAudio).toBeDefined();
});
// 121 seconds of 8 kHz mu-law audio — just past the pacer's default
// 120-second byte cap — so a single push overflows the queue.
sendProviderAudio?.(Buffer.alloc(8_000 * 121, 0x7f));
const closed = await waitForClose(ws);
// 1013 = "try again later": the handler's backpressure close code.
expect(closed.code).toBe(1013);
} finally {
// Only close if the backpressure path didn't already do it.
if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) {
ws.close();
}
}
} finally {
await server.close();
}
});
it("rejects oversized pre-start frames before bridge setup", async () => {
const createBridge = vi.fn(() => makeBridge());
const processEvent = vi.fn();

View File

@@ -299,7 +299,15 @@ export class RealtimeCallHandler {
}
return true;
};
const audioPacer = new RealtimeTwilioAudioPacer({ streamSid, sendJson });
const audioPacer = new RealtimeTwilioAudioPacer({
streamSid,
sendJson,
onBackpressure: () => {
if (ws.readyState === WebSocket.OPEN) {
ws.close(1013, "Backpressure: paced audio queue exceeded");
}
},
});
const speechDetector = new RealtimeMulawSpeechStartDetector();
const session = createRealtimeVoiceBridgeSession({
provider: this.realtimeProvider,