From 7050af56d40037c10d966ac0597d275c0bf9c980 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Sun, 3 May 2026 22:58:19 -0700 Subject: [PATCH] fix(voice-call): bound realtime audio pacing --- CHANGELOG.md | 1 + .../src/webhook/realtime-audio-pacer.test.ts | 22 +++++++ .../src/webhook/realtime-audio-pacer.ts | 19 +++++++ .../src/webhook/realtime-handler.test.ts | 57 +++++++++++++++++++ .../src/webhook/realtime-handler.ts | 10 +++- 5 files changed, 108 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 18061cd245c..a1cc3fda725 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Docs: https://docs.openclaw.ai - Agents/commands: add `/steer ` for queue-independent steering of the active current-session run without starting a new turn when the session is idle. (#76934) - Agents/subagents: preserve every grouped child result when direct completion fallback has to bypass the requester-agent announce turn. Thanks @vincentkoc. - TTS/telephony: honor provider voice/model overrides in telephony synthesis providers so Google Meet agent speech logs match the backend that actually produced the audio. Thanks @vincentkoc. +- Voice Call/realtime: bound the paced Twilio audio queue and close overloaded realtime streams before provider audio can pile up behind the websocket backpressure guard. Thanks @vincentkoc. - Tools/BTW: add `/side` as a text and native slash-command alias for `/btw` side questions. - Doctor/config: `doctor --fix` now commits safe legacy migrations even when unrelated validation issues (e.g. a missing plugin) prevent full validation from passing, so `agents.defaults.llm` and other known-legacy keys are always cleaned up by `doctor --fix` regardless of other config problems. Fixes #76798. (#76800) Thanks @hclsys. - Docs: clarify that IRC uses raw TCP/TLS sockets outside operator-managed forward proxy routing, so direct IRC egress should be explicitly approved before enabling IRC. Thanks @jesse-merhi. 
diff --git a/extensions/voice-call/src/webhook/realtime-audio-pacer.test.ts b/extensions/voice-call/src/webhook/realtime-audio-pacer.test.ts index f87a7e78f99..d347ec75e7b 100644 --- a/extensions/voice-call/src/webhook/realtime-audio-pacer.test.ts +++ b/extensions/voice-call/src/webhook/realtime-audio-pacer.test.ts @@ -61,6 +61,28 @@ describe("RealtimeTwilioAudioPacer", () => { expect(sent).toHaveLength(2); expect(sent[1]).toEqual({ event: "clear", streamSid: "MZ-test" }); }); + + it("stops instead of buffering unbounded realtime audio", async () => { + vi.useFakeTimers(); + const sent: unknown[] = []; + const onBackpressure = vi.fn(); + const pacer = new RealtimeTwilioAudioPacer({ + streamSid: "MZ-test", + maxQueuedAudioBytes: 320, + onBackpressure, + sendJson: (message) => { + sent.push(message); + return true; + }, + }); + + pacer.sendAudio(Buffer.alloc(480, 0x7f)); + pacer.sendMark("after-overflow"); + await vi.advanceTimersByTimeAsync(100); + + expect(onBackpressure).toHaveBeenCalledOnce(); + expect(sent).toEqual([]); + }); }); describe("RealtimeMulawSpeechStartDetector", () => { diff --git a/extensions/voice-call/src/webhook/realtime-audio-pacer.ts b/extensions/voice-call/src/webhook/realtime-audio-pacer.ts index 3a820ae626e..3e4c8183f93 100644 --- a/extensions/voice-call/src/webhook/realtime-audio-pacer.ts +++ b/extensions/voice-call/src/webhook/realtime-audio-pacer.ts @@ -4,6 +4,7 @@ const TELEPHONY_CHUNK_MS = 20; const DEFAULT_SPEECH_RMS_THRESHOLD = 0.02; const DEFAULT_REQUIRED_LOUD_CHUNKS = 2; const DEFAULT_REQUIRED_QUIET_CHUNKS = 10; +const DEFAULT_MAX_QUEUED_AUDIO_BYTES = TELEPHONY_SAMPLE_RATE * 120; const PCM16_MAX_AMPLITUDE = 32768; const MULAW_LINEAR_SAMPLES = new Int16Array(256); @@ -27,10 +28,13 @@ export type RealtimeTwilioAudioPacerSendJson = (message: unknown) => boolean; export class RealtimeTwilioAudioPacer { private queue: RealtimeTwilioAudioQueueItem[] = []; private timer: ReturnType<typeof setTimeout> | null = null; + private queuedAudioBytes = 0; private 
closed = false; constructor( private readonly params: { + maxQueuedAudioBytes?: number; + onBackpressure?: () => void; sendJson: RealtimeTwilioAudioPacerSendJson; streamSid: string; }, @@ -40,13 +44,19 @@ export class RealtimeTwilioAudioPacer { if (this.closed || muLaw.length === 0) { return; } + const maxQueuedAudioBytes = this.params.maxQueuedAudioBytes ?? DEFAULT_MAX_QUEUED_AUDIO_BYTES; for (let offset = 0; offset < muLaw.length; offset += TELEPHONY_CHUNK_BYTES) { const chunk = Buffer.from(muLaw.subarray(offset, offset + TELEPHONY_CHUNK_BYTES)); + if (this.queuedAudioBytes + chunk.length > maxQueuedAudioBytes) { + this.failBackpressure(); + return; + } this.queue.push({ type: "audio", chunk, durationMs: Math.max(1, Math.round((chunk.length / TELEPHONY_SAMPLE_RATE) * 1000)), }); + this.queuedAudioBytes += chunk.length; } this.ensurePump(); } @@ -65,6 +75,7 @@ export class RealtimeTwilioAudioPacer { } this.clearTimer(); this.queue = []; + this.queuedAudioBytes = 0; this.params.sendJson({ event: "clear", streamSid: this.params.streamSid }); } @@ -72,6 +83,7 @@ export class RealtimeTwilioAudioPacer { this.closed = true; this.clearTimer(); this.queue = []; + this.queuedAudioBytes = 0; } private clearTimer(): void { @@ -88,6 +100,11 @@ export class RealtimeTwilioAudioPacer { } } + private failBackpressure(): void { + this.close(); + this.params.onBackpressure?.(); + } + private pump(): void { this.timer = null; if (this.closed) { @@ -101,6 +118,7 @@ export class RealtimeTwilioAudioPacer { let delayMs = 0; let sent = true; if (item.type === "audio") { + this.queuedAudioBytes = Math.max(0, this.queuedAudioBytes - item.chunk.length); sent = this.params.sendJson({ event: "media", streamSid: this.params.streamSid, @@ -117,6 +135,7 @@ export class RealtimeTwilioAudioPacer { if (!sent) { this.queue = []; + this.queuedAudioBytes = 0; return; } if (this.queue.length > 0) { diff --git a/extensions/voice-call/src/webhook/realtime-handler.test.ts 
b/extensions/voice-call/src/webhook/realtime-handler.test.ts index 7a846902736..f6d6592361a 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.test.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.test.ts @@ -629,6 +629,63 @@ describe("RealtimeCallHandler path routing", () => { }); describe("RealtimeCallHandler websocket hardening", () => { + it("closes realtime streams when paced outbound audio exceeds the internal queue cap", async () => { + let sendProviderAudio: ((audio: Buffer) => void) | undefined; + const createBridge = vi.fn( + (request: Parameters[0]) => { + sendProviderAudio = request.onAudio; + return makeBridge(); + }, + ); + const handler = makeHandler(undefined, { + manager: { + getCallByProviderCallId: vi.fn( + (): CallRecord => ({ + callId: "call-1", + providerCallId: "CA-backpressure", + provider: "twilio", + direction: "inbound", + state: "ringing", + from: "+15550001234", + to: "+15550009999", + startedAt: Date.now(), + transcript: [], + processedEventIds: [], + metadata: {}, + }), + ), + }, + realtimeProvider: makeRealtimeProvider(createBridge), + }); + const server = await startRealtimeServer(handler); + + try { + const ws = await connectWs(server.url); + try { + ws.send( + JSON.stringify({ + event: "start", + start: { streamSid: "MZ-backpressure", callSid: "CA-backpressure" }, + }), + ); + await vi.waitFor(() => { + expect(sendProviderAudio).toBeDefined(); + }); + + sendProviderAudio?.(Buffer.alloc(8_000 * 121, 0x7f)); + const closed = await waitForClose(ws); + + expect(closed.code).toBe(1013); + } finally { + if (ws.readyState !== WebSocket.CLOSED && ws.readyState !== WebSocket.CLOSING) { + ws.close(); + } + } + } finally { + await server.close(); + } + }); + it("rejects oversized pre-start frames before bridge setup", async () => { const createBridge = vi.fn(() => makeBridge()); const processEvent = vi.fn(); diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts 
b/extensions/voice-call/src/webhook/realtime-handler.ts index 542d9d613a9..e60f1e946e5 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.ts @@ -299,7 +299,15 @@ export class RealtimeCallHandler { } return true; }; - const audioPacer = new RealtimeTwilioAudioPacer({ streamSid, sendJson }); + const audioPacer = new RealtimeTwilioAudioPacer({ + streamSid, + sendJson, + onBackpressure: () => { + if (ws.readyState === WebSocket.OPEN) { + ws.close(1013, "Backpressure: paced audio queue exceeded"); + } + }, + }); const speechDetector = new RealtimeMulawSpeechStartDetector(); const session = createRealtimeVoiceBridgeSession({ provider: this.realtimeProvider,