From c399fb750b501f4b4ff559f74a4ce4314f6f3994 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 28 Apr 2026 20:57:41 +0100 Subject: [PATCH] fix(ui): handle Google Live binary talk frames --- CHANGELOG.md | 1 + ui/src/ui/chat/realtime-talk-google-live.ts | 80 +++++- ui/src/ui/realtime-talk-google-live.test.ts | 272 +++++++++++++++++++- 3 files changed, 343 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 54482e65d5b..4119f627fd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Channels/Discord: suppress duplicate gateway monitors when multiple enabled accounts resolve to the same bot token, preferring config tokens over default env fallback and reporting skipped duplicates as disabled. Supersedes #73608. Thanks @kagura-agent. +- Control UI/Talk: decode Google Live binary WebSocket JSON frames and stop queued browser audio on interruption or shutdown, so browser Talk leaves `Connecting Talk...` and barge-in no longer plays stale audio. Fixes #73601 and #73460; supersedes #73466. Thanks @Spolen23 and @WadydX. - Channels/Discord: ignore stale route-shaped conversation bindings after a Discord channel is reconfigured to another agent, while preserving explicit focus and subagent bindings. Fixes #73626. Thanks @ramitrkar-hash. - Agents/bootstrap: pass pending BOOTSTRAP.md contents through the first-run user prompt while keeping them out of privileged system context, and show limited bootstrap guidance when workspace file access is unavailable. Fixes #73622. Thanks @mark1010. - ACP/tasks: classify parent-owned ACP sessions as background work regardless of persistent runtime mode, so delegated ACP output reports through the parent task notifier instead of acting like a normal foreground chat session. Refs #73609. Thanks @joerod26. diff --git a/ui/src/ui/chat/realtime-talk-google-live.ts b/ui/src/ui/chat/realtime-talk-google-live.ts index 7108af42280..5d6254e3356 100644 --- a/ui/src/ui/chat/realtime-talk-google-live.ts +++ b/ui/src/ui/chat/realtime-talk-google-live.ts @@ -74,6 +74,7 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { private playhead = 0; private closed = false; private pendingCalls = new Map(); + private readonly sources = new Set(); constructor( private readonly session: RealtimeTalkJsonPcmWebSocketSessionResult, @@ -93,11 +94,17 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { this.inputContext = new AudioContext({ sampleRate: this.session.audio.inputSampleRateHz }); this.outputContext = new AudioContext({ sampleRate: this.session.audio.outputSampleRateHz }); this.ws = new WebSocket(wsUrl); + this.ws.binaryType = "arraybuffer"; this.ws.addEventListener("open", () => { + if (this.closed) { + return; + } this.send(this.session.initialMessage ?? { setup: {} }); this.startMicrophonePump(); }); - this.ws.addEventListener("message", (event) => this.handleMessage(event.data)); + this.ws.addEventListener("message", (event) => { + void this.handleMessage(event.data); + }); this.ws.addEventListener("close", () => { if (!this.closed) { this.ctx.callbacks.onStatus?.("error", "Realtime connection closed"); @@ -119,6 +126,7 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { this.inputSource = null; this.media?.getTracks().forEach((track) => track.stop()); this.media = null; + this.stopOutput(); void this.inputContext?.close(); this.inputContext = null; void this.outputContext?.close(); @@ -128,7 +136,7 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { } private startMicrophonePump(): void { - if (!this.media || !this.inputContext) { + if (this.closed || !this.media || !this.inputContext) { return; } this.inputSource = this.inputContext.createMediaStreamSource(this.media); @@ -152,24 +160,30 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { } private send(message: unknown): void { - if (this.ws?.readyState === WebSocket.OPEN) { + if (!this.closed && this.ws?.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify(message)); } } - private handleMessage(data: unknown): void { + private async handleMessage(data: unknown): Promise { + if (this.closed) { + return; + } let message: GoogleLiveMessage; try { - message = JSON.parse(String(data)) as GoogleLiveMessage; + message = JSON.parse(await decodeGoogleLiveMessageData(data)) as GoogleLiveMessage; } catch { return; } + if (this.closed) { + return; + } if (message.setupComplete) { this.ctx.callbacks.onStatus?.("listening"); } const content = message.serverContent; if (content?.interrupted) { - this.playhead = this.outputContext?.currentTime ?? 0; + this.stopOutput(); } if (content?.inputTranscription?.text) { this.ctx.callbacks.onTranscript?.({ @@ -216,6 +230,8 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { ); buffer.getChannelData(0).set(samples); const source = this.outputContext.createBufferSource(); + this.sources.add(source); + source.addEventListener("ended", () => this.sources.delete(source)); source.buffer = buffer; source.connect(this.outputContext.destination); const startAt = Math.max(this.outputContext.currentTime, this.playhead); @@ -223,6 +239,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { this.playhead = startAt + buffer.duration; } + private stopOutput(): void { + for (const source of this.sources) { + try { + source.stop(); + } catch {} + } + this.sources.clear(); + this.playhead = this.outputContext?.currentTime ?? 0; + } + private async handleToolCall(call: { id?: string; name?: string; @@ -238,13 +264,31 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { return; } await submitRealtimeTalkConsult({ - ctx: this.ctx, + ctx: this.createActiveContext(), callId, args: call.args ?? {}, submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), }); } + private createActiveContext(): RealtimeTalkTransportContext { + return { + ...this.ctx, + callbacks: { + onStatus: (status, detail) => { + if (!this.closed) { + this.ctx.callbacks.onStatus?.(status, detail); + } + }, + onTranscript: (entry) => { + if (!this.closed) { + this.ctx.callbacks.onTranscript?.(entry); + } + }, + }, + }; + } + private submitToolResult(callId: string, result: unknown): void { const pending = this.pendingCalls.get(callId); if (!pending) { @@ -268,3 +312,25 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { }); } } + +async function decodeGoogleLiveMessageData(data: unknown): Promise { + if (typeof data === "string") { + return data; + } + if (typeof Blob !== "undefined" && data instanceof Blob) { + data = await data.arrayBuffer(); + } + if (isArrayBufferLike(data)) { + return new TextDecoder().decode(new Uint8Array(data)); + } + if (ArrayBuffer.isView(data)) { + return new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength)); + } + return String(data); +} + +function isArrayBufferLike(data: unknown): data is ArrayBuffer { + return ( + data instanceof ArrayBuffer || Object.prototype.toString.call(data) === "[object ArrayBuffer]" + ); +} diff --git a/ui/src/ui/realtime-talk-google-live.test.ts b/ui/src/ui/realtime-talk-google-live.test.ts index 005ed7be482..b2e652d5123 100644 --- a/ui/src/ui/realtime-talk-google-live.test.ts +++ b/ui/src/ui/realtime-talk-google-live.test.ts @@ -1,6 +1,115 @@ -import { describe, expect, it } from "vitest"; -import { buildGoogleLiveUrl } from "./chat/realtime-talk-google-live.ts"; -import type { RealtimeTalkJsonPcmWebSocketSessionResult } from "./chat/realtime-talk-shared.ts"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { + buildGoogleLiveUrl, + GoogleLiveRealtimeTalkTransport, +} from "./chat/realtime-talk-google-live.ts"; +import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME } from "./chat/realtime-talk-shared.ts"; +import type { + RealtimeTalkJsonPcmWebSocketSessionResult, + RealtimeTalkTransportContext, +} from "./chat/realtime-talk-shared.ts"; + +type MockWebSocketEvent = { + data?: unknown; + code?: number; + reason?: string; +}; + +type MockWebSocketHandler = (event?: MockWebSocketEvent) => void; +type MockWebSocketEventType = "close" | "error" | "message" | "open"; + +const wsInstances: MockGoogleLiveWebSocket[] = []; +const createdSources: MockAudioBufferSource[] = []; + +class MockGoogleLiveWebSocket { + static OPEN = 1; + + readonly handlers: Record = { + close: [], + error: [], + message: [], + open: [], + }; + readonly sent: string[] = []; + binaryType: BinaryType = "blob"; + readyState = MockGoogleLiveWebSocket.OPEN; + + constructor(readonly url: string) { + wsInstances.push(this); + } + + addEventListener(type: MockWebSocketEventType, handler: MockWebSocketHandler) { + this.handlers[type].push(handler); + } + + send(data: string) { + this.sent.push(data); + } + + close() { + this.readyState = 3; + } + + emitOpen() { + for (const handler of this.handlers.open) { + handler(); + } + } + + emitMessage(data: unknown) { + for (const handler of this.handlers.message) { + handler({ data }); + } + } +} + +class MockAudioBufferSource { + buffer: unknown = null; + readonly addEventListener = vi.fn(); + readonly connect = vi.fn(); + readonly start = vi.fn(); + readonly stop = vi.fn(); +} + +class MockAudioContext { + readonly currentTime = 0; + readonly destination = {}; + readonly sampleRate: number; + readonly close = vi.fn(async () => undefined); + + constructor(options?: { sampleRate?: number }) { + this.sampleRate = options?.sampleRate ?? 24000; + } + + createMediaStreamSource() { + return { + connect: vi.fn(), + disconnect: vi.fn(), + }; + } + + createScriptProcessor() { + return { + connect: vi.fn(), + disconnect: vi.fn(), + onaudioprocess: null, + }; + } + + createBuffer(_channels: number, length: number, sampleRate: number) { + const channel = new Float32Array(length); + return { + duration: length / sampleRate, + getChannelData: () => channel, + }; + } + + createBufferSource() { + const source = new MockAudioBufferSource(); + createdSources.push(source); + return source; + } +} function createSession( websocketUrl: string, @@ -21,6 +130,163 @@ function createSession( }; } +function createClient(): RealtimeTalkTransportContext["client"] { + const client = { + addEventListener: vi.fn(() => () => undefined), + request: vi.fn(), + } as unknown as RealtimeTalkTransportContext["client"]; + return client; +} + +function createTransport( + callbacks: RealtimeTalkTransportContext["callbacks"] = {}, + client = createClient(), +) { + return new GoogleLiveRealtimeTalkTransport( + createSession( + "wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained", + ), + { + callbacks, + client, + sessionKey: "main", + }, + ); +} + +function encodeJsonFrame(value: unknown): ArrayBuffer { + return new TextEncoder().encode(JSON.stringify(value)).buffer; +} + +function latestWebSocket(): MockGoogleLiveWebSocket { + const ws = wsInstances.at(-1); + if (!ws) { + throw new Error("missing WebSocket"); + } + return ws; +} + +describe("GoogleLiveRealtimeTalkTransport", () => { + beforeEach(() => { + wsInstances.length = 0; + createdSources.length = 0; + vi.stubGlobal("WebSocket", MockGoogleLiveWebSocket); + vi.stubGlobal("AudioContext", MockAudioContext); + vi.stubGlobal("navigator", { + mediaDevices: { + getUserMedia: vi.fn(async () => ({ + getTracks: () => [{ stop: vi.fn() }], + })), + }, + }); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + }); + + it("requests ArrayBuffer frames and decodes binary setup messages", async () => { + const onStatus = vi.fn(); + const transport = createTransport({ onStatus }); + + await transport.start(); + const ws = latestWebSocket(); + ws.emitMessage(encodeJsonFrame({ setupComplete: {} })); + + expect(ws.binaryType).toBe("arraybuffer"); + await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening")); + }); + + it("decodes Blob setup messages", async () => { + const onStatus = vi.fn(); + const transport = createTransport({ onStatus }); + + await transport.start(); + latestWebSocket().emitMessage(new Blob([JSON.stringify({ setupComplete: {} })])); + + await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening")); + }); + + it("stops queued output when Google Live sends interruption", async () => { + const transport = createTransport(); + await transport.start(); + const ws = latestWebSocket(); + + ws.emitMessage( + encodeJsonFrame({ + serverContent: { + modelTurn: { + parts: [{ inlineData: { data: "AAAAAA==", mimeType: "audio/pcm;rate=24000" } }], + }, + }, + }), + ); + await vi.waitFor(() => expect(createdSources).toHaveLength(1)); + + const source = createdSources[0]; + ws.emitMessage(encodeJsonFrame({ serverContent: { interrupted: true } })); + + await vi.waitFor(() => expect(source?.stop).toHaveBeenCalledTimes(1)); + }); + + it("ignores late WebSocket events after stop", async () => { + const onStatus = vi.fn(); + const transport = createTransport({ onStatus }); + await transport.start(); + const ws = latestWebSocket(); + + transport.stop(); + ws.emitOpen(); + ws.emitMessage(new Blob([JSON.stringify({ setupComplete: {} })])); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(ws.sent).toEqual([]); + expect(onStatus).not.toHaveBeenCalled(); + }); + + it("does not revive Talk status after stop while a tool consult settles", async () => { + const onStatus = vi.fn(); + let runId = "run-1"; + const listeners = new Set<(event: { event: string; payload?: unknown }) => void>(); + const client = { + addEventListener: vi.fn((listener: (event: { event: string; payload?: unknown }) => void) => { + listeners.add(listener); + return () => listeners.delete(listener); + }), + request: vi.fn(async (_method: string, params: { idempotencyKey?: string }) => { + runId = params.idempotencyKey ?? runId; + return { runId }; + }), + } as unknown as RealtimeTalkTransportContext["client"]; + const transport = createTransport({ onStatus }, client); + await transport.start(); + + latestWebSocket().emitMessage( + encodeJsonFrame({ + toolCall: { + functionCalls: [ + { + id: "call-1", + name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + args: { question: "check the session" }, + }, + ], + }, + }), + ); + await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("thinking", undefined)); + await vi.waitFor(() => expect(listeners.size).toBe(1)); + + transport.stop(); + for (const listener of listeners) { + listener({ event: "chat", payload: { runId, state: "final", message: { text: "done" } } }); + } + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(onStatus).not.toHaveBeenCalledWith("listening"); + }); +}); + describe("Google Live realtime Talk URL", () => { it("only preserves the allowlisted Google Live endpoint and appends the ephemeral token", () => { const url = buildGoogleLiveUrl(