fix(ui): handle Google Live binary talk frames

Peter Steinberger
2026-04-28 20:57:41 +01:00
parent 0a2d635e68
commit c399fb750b
3 changed files with 343 additions and 10 deletions


@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Channels/Discord: suppress duplicate gateway monitors when multiple enabled accounts resolve to the same bot token, preferring config tokens over default env fallback and reporting skipped duplicates as disabled. Supersedes #73608. Thanks @kagura-agent.
- Control UI/Talk: decode Google Live binary WebSocket JSON frames and stop queued browser audio on interruption or shutdown, so browser Talk leaves the `Connecting Talk...` state and barge-in no longer plays stale audio (a short illustration follows this list). Fixes #73601 and #73460; supersedes #73466. Thanks @Spolen23 and @WadydX.
- Channels/Discord: ignore stale route-shaped conversation bindings after a Discord channel is reconfigured to another agent, while preserving explicit focus and subagent bindings. Fixes #73626. Thanks @ramitrkar-hash.
- Agents/bootstrap: pass pending BOOTSTRAP.md contents through the first-run user prompt while keeping them out of privileged system context, and show limited bootstrap guidance when workspace file access is unavailable. Fixes #73622. Thanks @mark1010.
- ACP/tasks: classify parent-owned ACP sessions as background work regardless of persistent runtime mode, so delegated ACP output reports through the parent task notifier instead of acting like a normal foreground chat session. Refs #73609. Thanks @joerod26.
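The Talk fix above hinges on a detail worth illustrating: the old handler ran `JSON.parse(String(data))`, and stringifying a binary frame yields a type tag rather than its contents, so every binary frame was silently discarded and Talk never left the connecting state. The decode helper in the diff below normalizes the payload first (and `handleMessage` becomes async because reading a `Blob` awaits, so the listener voids the returned promise). A minimal standalone illustration, not taken from the diff:

// String() on an ArrayBuffer returns a type tag, not the payload, so the old
// JSON.parse(String(data)) path threw and the frame was dropped.
const frame = new TextEncoder().encode('{"setupComplete":{}}').buffer;
console.log(String(frame)); // "[object ArrayBuffer]"
console.log(new TextDecoder().decode(new Uint8Array(frame))); // '{"setupComplete":{}}'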


@@ -74,6 +74,7 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
private playhead = 0;
private closed = false;
private pendingCalls = new Map<string, PendingFunctionCall>();
private readonly sources = new Set<AudioBufferSourceNode>();
constructor(
private readonly session: RealtimeTalkJsonPcmWebSocketSessionResult,
@@ -93,11 +94,17 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
this.inputContext = new AudioContext({ sampleRate: this.session.audio.inputSampleRateHz });
this.outputContext = new AudioContext({ sampleRate: this.session.audio.outputSampleRateHz });
this.ws = new WebSocket(wsUrl);
this.ws.binaryType = "arraybuffer";
this.ws.addEventListener("open", () => {
if (this.closed) {
return;
}
this.send(this.session.initialMessage ?? { setup: {} });
this.startMicrophonePump();
});
this.ws.addEventListener("message", (event) => this.handleMessage(event.data));
this.ws.addEventListener("message", (event) => {
void this.handleMessage(event.data);
});
this.ws.addEventListener("close", () => {
if (!this.closed) {
this.ctx.callbacks.onStatus?.("error", "Realtime connection closed");
@@ -119,6 +126,7 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
this.inputSource = null;
this.media?.getTracks().forEach((track) => track.stop());
this.media = null;
this.stopOutput();
void this.inputContext?.close();
this.inputContext = null;
void this.outputContext?.close();
@@ -128,7 +136,7 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
}
private startMicrophonePump(): void {
- if (!this.media || !this.inputContext) {
+ if (this.closed || !this.media || !this.inputContext) {
return;
}
this.inputSource = this.inputContext.createMediaStreamSource(this.media);
@@ -152,24 +160,30 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
}
private send(message: unknown): void {
- if (this.ws?.readyState === WebSocket.OPEN) {
+ if (!this.closed && this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(message));
}
}
- private handleMessage(data: unknown): void {
+ private async handleMessage(data: unknown): Promise<void> {
if (this.closed) {
return;
}
let message: GoogleLiveMessage;
try {
- message = JSON.parse(String(data)) as GoogleLiveMessage;
+ message = JSON.parse(await decodeGoogleLiveMessageData(data)) as GoogleLiveMessage;
} catch {
return;
}
if (this.closed) {
return;
}
if (message.setupComplete) {
this.ctx.callbacks.onStatus?.("listening");
}
const content = message.serverContent;
if (content?.interrupted) {
- this.playhead = this.outputContext?.currentTime ?? 0;
+ this.stopOutput();
}
if (content?.inputTranscription?.text) {
this.ctx.callbacks.onTranscript?.({
@@ -216,6 +230,8 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
);
buffer.getChannelData(0).set(samples);
const source = this.outputContext.createBufferSource();
this.sources.add(source);
source.addEventListener("ended", () => this.sources.delete(source));
source.buffer = buffer;
source.connect(this.outputContext.destination);
const startAt = Math.max(this.outputContext.currentTime, this.playhead);
@@ -223,6 +239,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
this.playhead = startAt + buffer.duration;
}
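// Every scheduled AudioBufferSourceNode is tracked in `sources` (with an
// "ended" listener pruning finished ones), so barge-in can cancel audio that
// has been queued but not yet played and rewind the playhead to "now".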
private stopOutput(): void {
for (const source of this.sources) {
try {
source.stop();
} catch {} // stop() throws InvalidStateError if the source never started; safe to ignore
}
this.sources.clear();
this.playhead = this.outputContext?.currentTime ?? 0;
}
private async handleToolCall(call: {
id?: string;
name?: string;
@@ -238,13 +264,31 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
return;
}
await submitRealtimeTalkConsult({
- ctx: this.ctx,
+ ctx: this.createActiveContext(),
callId,
args: call.args ?? {},
submit: (toolCallId, result) => this.submitToolResult(toolCallId, result),
});
}
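// A tool consult can settle after stop(); routing it through a context whose
// callbacks check `closed` keeps a late result from reviving Talk status or
// appending transcripts after shutdown.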
private createActiveContext(): RealtimeTalkTransportContext {
return {
...this.ctx,
callbacks: {
onStatus: (status, detail) => {
if (!this.closed) {
this.ctx.callbacks.onStatus?.(status, detail);
}
},
onTranscript: (entry) => {
if (!this.closed) {
this.ctx.callbacks.onTranscript?.(entry);
}
},
},
};
}
private submitToolResult(callId: string, result: unknown): void {
const pending = this.pendingCalls.get(callId);
if (!pending) {
@@ -268,3 +312,25 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
});
}
}
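// A browser WebSocket can deliver a frame as a string, Blob, ArrayBuffer, or a
// typed-array view depending on `binaryType` and the sender; normalize all of
// them to a JSON string before parsing.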
async function decodeGoogleLiveMessageData(data: unknown): Promise<string> {
if (typeof data === "string") {
return data;
}
if (typeof Blob !== "undefined" && data instanceof Blob) {
data = await data.arrayBuffer();
}
if (isArrayBufferLike(data)) {
return new TextDecoder().decode(new Uint8Array(data));
}
if (ArrayBuffer.isView(data)) {
return new TextDecoder().decode(new Uint8Array(data.buffer, data.byteOffset, data.byteLength));
}
return String(data);
}
function isArrayBufferLike(data: unknown): data is ArrayBuffer {
return (
data instanceof ArrayBuffer || Object.prototype.toString.call(data) === "[object ArrayBuffer]"
);
}
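For reference, a small sketch of what the helper returns for each frame shape a browser WebSocket can deliver; the frame contents are hypothetical and the calls assume a context where top-level await is available:

const json = '{"setupComplete":{}}';
const bytes = new TextEncoder().encode(json);
await decodeGoogleLiveMessageData(json); // strings pass through unchanged
await decodeGoogleLiveMessageData(new Blob([json])); // Blob is read via arrayBuffer()
await decodeGoogleLiveMessageData(bytes.buffer); // ArrayBuffer is text-decoded
await decodeGoogleLiveMessageData(bytes.subarray(0)); // views honor byteOffset/byteLength
// Each call above resolves to '{"setupComplete":{}}'.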


@@ -1,6 +1,115 @@
- import { describe, expect, it } from "vitest";
- import { buildGoogleLiveUrl } from "./chat/realtime-talk-google-live.ts";
- import type { RealtimeTalkJsonPcmWebSocketSessionResult } from "./chat/realtime-talk-shared.ts";
+ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+ import {
+ buildGoogleLiveUrl,
+ GoogleLiveRealtimeTalkTransport,
+ } from "./chat/realtime-talk-google-live.ts";
+ import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME } from "./chat/realtime-talk-shared.ts";
+ import type {
+ RealtimeTalkJsonPcmWebSocketSessionResult,
+ RealtimeTalkTransportContext,
+ } from "./chat/realtime-talk-shared.ts";
type MockWebSocketEvent = {
data?: unknown;
code?: number;
reason?: string;
};
type MockWebSocketHandler = (event?: MockWebSocketEvent) => void;
type MockWebSocketEventType = "close" | "error" | "message" | "open";
const wsInstances: MockGoogleLiveWebSocket[] = [];
const createdSources: MockAudioBufferSource[] = [];
class MockGoogleLiveWebSocket {
static OPEN = 1;
readonly handlers: Record<MockWebSocketEventType, MockWebSocketHandler[]> = {
close: [],
error: [],
message: [],
open: [],
};
readonly sent: string[] = [];
binaryType: BinaryType = "blob";
readyState = MockGoogleLiveWebSocket.OPEN;
constructor(readonly url: string) {
wsInstances.push(this);
}
addEventListener(type: MockWebSocketEventType, handler: MockWebSocketHandler) {
this.handlers[type].push(handler);
}
send(data: string) {
this.sent.push(data);
}
close() {
this.readyState = 3; // WebSocket.CLOSED
}
emitOpen() {
for (const handler of this.handlers.open) {
handler();
}
}
emitMessage(data: unknown) {
for (const handler of this.handlers.message) {
handler({ data });
}
}
}
class MockAudioBufferSource {
buffer: unknown = null;
readonly addEventListener = vi.fn();
readonly connect = vi.fn();
readonly start = vi.fn();
readonly stop = vi.fn();
}
class MockAudioContext {
readonly currentTime = 0;
readonly destination = {};
readonly sampleRate: number;
readonly close = vi.fn(async () => undefined);
constructor(options?: { sampleRate?: number }) {
this.sampleRate = options?.sampleRate ?? 24000;
}
createMediaStreamSource() {
return {
connect: vi.fn(),
disconnect: vi.fn(),
};
}
createScriptProcessor() {
return {
connect: vi.fn(),
disconnect: vi.fn(),
onaudioprocess: null,
};
}
createBuffer(_channels: number, length: number, sampleRate: number) {
const channel = new Float32Array(length);
return {
duration: length / sampleRate,
getChannelData: () => channel,
};
}
createBufferSource() {
const source = new MockAudioBufferSource();
createdSources.push(source);
return source;
}
}
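// The mocks above record every WebSocket and buffer source the transport
// creates (`wsInstances`, `createdSources`) so tests can drive frames in and
// assert that interruption stopped scheduled audio.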
function createSession(
websocketUrl: string,
@@ -21,6 +130,163 @@ function createSession(
};
}
function createClient(): RealtimeTalkTransportContext["client"] {
const client = {
addEventListener: vi.fn(() => () => undefined),
request: vi.fn(),
} as unknown as RealtimeTalkTransportContext["client"];
return client;
}
function createTransport(
callbacks: RealtimeTalkTransportContext["callbacks"] = {},
client = createClient(),
) {
return new GoogleLiveRealtimeTalkTransport(
createSession(
"wss://generativelanguage.googleapis.com/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContentConstrained",
),
{
callbacks,
client,
sessionKey: "main",
},
);
}
function encodeJsonFrame(value: unknown): ArrayBuffer {
return new TextEncoder().encode(JSON.stringify(value)).buffer;
}
function latestWebSocket(): MockGoogleLiveWebSocket {
const ws = wsInstances.at(-1);
if (!ws) {
throw new Error("missing WebSocket");
}
return ws;
}
describe("GoogleLiveRealtimeTalkTransport", () => {
beforeEach(() => {
wsInstances.length = 0;
createdSources.length = 0;
vi.stubGlobal("WebSocket", MockGoogleLiveWebSocket);
vi.stubGlobal("AudioContext", MockAudioContext);
vi.stubGlobal("navigator", {
mediaDevices: {
getUserMedia: vi.fn(async () => ({
getTracks: () => [{ stop: vi.fn() }],
})),
},
});
});
afterEach(() => {
vi.unstubAllGlobals();
});
it("requests ArrayBuffer frames and decodes binary setup messages", async () => {
const onStatus = vi.fn();
const transport = createTransport({ onStatus });
await transport.start();
const ws = latestWebSocket();
ws.emitMessage(encodeJsonFrame({ setupComplete: {} }));
expect(ws.binaryType).toBe("arraybuffer");
await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening"));
});
it("decodes Blob setup messages", async () => {
const onStatus = vi.fn();
const transport = createTransport({ onStatus });
await transport.start();
latestWebSocket().emitMessage(new Blob([JSON.stringify({ setupComplete: {} })]));
await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening"));
});
it("stops queued output when Google Live sends interruption", async () => {
const transport = createTransport();
await transport.start();
const ws = latestWebSocket();
ws.emitMessage(
encodeJsonFrame({
serverContent: {
modelTurn: {
parts: [{ inlineData: { data: "AAAAAA==", mimeType: "audio/pcm;rate=24000" } }],
},
},
}),
);
await vi.waitFor(() => expect(createdSources).toHaveLength(1));
const source = createdSources[0];
ws.emitMessage(encodeJsonFrame({ serverContent: { interrupted: true } }));
await vi.waitFor(() => expect(source?.stop).toHaveBeenCalledTimes(1));
});
it("ignores late WebSocket events after stop", async () => {
const onStatus = vi.fn();
const transport = createTransport({ onStatus });
await transport.start();
const ws = latestWebSocket();
transport.stop();
ws.emitOpen();
ws.emitMessage(new Blob([JSON.stringify({ setupComplete: {} })]));
await new Promise((resolve) => setTimeout(resolve, 0));
expect(ws.sent).toEqual([]);
expect(onStatus).not.toHaveBeenCalled();
});
it("does not revive Talk status after stop while a tool consult settles", async () => {
const onStatus = vi.fn();
let runId = "run-1";
const listeners = new Set<(event: { event: string; payload?: unknown }) => void>();
const client = {
addEventListener: vi.fn((listener: (event: { event: string; payload?: unknown }) => void) => {
listeners.add(listener);
return () => listeners.delete(listener);
}),
request: vi.fn(async (_method: string, params: { idempotencyKey?: string }) => {
runId = params.idempotencyKey ?? runId;
return { runId };
}),
} as unknown as RealtimeTalkTransportContext["client"];
const transport = createTransport({ onStatus }, client);
await transport.start();
latestWebSocket().emitMessage(
encodeJsonFrame({
toolCall: {
functionCalls: [
{
id: "call-1",
name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
args: { question: "check the session" },
},
],
},
}),
);
await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("thinking", undefined));
await vi.waitFor(() => expect(listeners.size).toBe(1));
transport.stop();
for (const listener of listeners) {
listener({ event: "chat", payload: { runId, state: "final", message: { text: "done" } } });
}
await new Promise((resolve) => setTimeout(resolve, 0));
expect(onStatus).not.toHaveBeenCalledWith("listening");
});
});
describe("Google Live realtime Talk URL", () => {
it("only preserves the allowlisted Google Live endpoint and appends the ephemeral token", () => {
const url = buildGoogleLiveUrl(