feat: unify browser realtime talk clients

This commit is contained in:
Peter Steinberger
2026-05-05 20:59:52 +01:00
parent 466f718320
commit 9e6f38f4e1
9 changed files with 1024 additions and 134 deletions

View File

@@ -3,31 +3,38 @@ import {
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
submitRealtimeTalkConsult,
type RealtimeTalkGatewayRelaySessionResult,
type RealtimeTalkEvent,
type RealtimeTalkTransport,
type RealtimeTalkTransportContext,
} from "./realtime-talk-shared.ts";
type GatewayRelayEvent =
| { relaySessionId?: string; type?: "ready" }
| { relaySessionId?: string; type?: "audio"; audioBase64?: string }
| { relaySessionId?: string; type?: "clear" }
| { relaySessionId?: string; type?: "mark"; markName?: string }
type GatewayRelayEvent = {
relaySessionId?: string;
talkEvent?: RealtimeTalkEvent;
} & (
| { type?: "ready" }
| { type?: "audio"; audioBase64?: string }
| { type?: "clear" }
| { type?: "mark"; markName?: string }
| {
relaySessionId?: string;
type?: "transcript";
role?: "user" | "assistant";
text?: string;
final?: boolean;
}
| {
relaySessionId?: string;
type?: "toolCall";
callId?: string;
name?: string;
args?: unknown;
}
| { relaySessionId?: string; type?: "error"; message?: string }
| { relaySessionId?: string; type?: "close"; reason?: string };
| { type?: "error"; message?: string }
| { type?: "close"; reason?: string }
);
const BARGE_IN_RMS_THRESHOLD = 0.02;
const BARGE_IN_PEAK_THRESHOLD = 0.08;
const BARGE_IN_CONSECUTIVE_SPEECH_FRAMES = 2;
export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport {
private media: MediaStream | null = null;
@@ -39,6 +46,9 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
private playhead = 0;
private closed = false;
private readonly sources = new Set<AudioBufferSourceNode>();
private readonly consultAbortControllers = new Set<AbortController>();
private cancelRequestedForPlayback = false;
private speechFramesDuringPlayback = 0;
constructor(
private readonly session: RealtimeTalkGatewayRelaySessionResult,
@@ -62,7 +72,13 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
}
this.handleRelayEvent(evt.payload as GatewayRelayEvent);
});
this.media = await navigator.mediaDevices.getUserMedia({ audio: true });
this.media = await navigator.mediaDevices.getUserMedia({
audio: {
autoGainControl: true,
echoCancellation: true,
noiseSuppression: true,
},
});
this.inputContext = new AudioContext({ sampleRate: this.session.audio.inputSampleRateHz });
this.outputContext = new AudioContext({ sampleRate: this.session.audio.outputSampleRateHz });
this.startMicrophonePump();
@@ -76,6 +92,7 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
this.inputProcessor = null;
this.inputSource?.disconnect();
this.inputSource = null;
this.abortConsults();
this.media?.getTracks().forEach((track) => track.stop());
this.media = null;
this.stopOutput();
@@ -98,7 +115,11 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
if (this.closed) {
return;
}
const pcm = floatToPcm16(event.inputBuffer.getChannelData(0));
const samples = event.inputBuffer.getChannelData(0);
const pcm = floatToPcm16(samples);
if (this.detectBargeInSpeech(samples)) {
this.cancelOutputForBargeIn();
}
void this.ctx.client.request("talk.realtime.relayAudio", {
relaySessionId: this.session.relaySessionId,
audioBase64: bytesToBase64(pcm),
@@ -113,12 +134,17 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
if (event.relaySessionId !== this.session.relaySessionId || this.closed) {
return;
}
if (event.talkEvent) {
this.ctx.callbacks.onTalkEvent?.(event.talkEvent);
}
switch (event.type) {
case "ready":
this.ctx.callbacks.onStatus?.("listening");
return;
case "audio":
if (event.audioBase64) {
this.cancelRequestedForPlayback = false;
this.speechFramesDuringPlayback = 0;
this.playPcm16(event.audioBase64);
}
return;
@@ -144,6 +170,7 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
this.ctx.callbacks.onStatus?.("error", event.message ?? "Realtime relay failed");
return;
case "close":
this.abortConsults();
if (!this.closed) {
this.ctx.callbacks.onStatus?.(
event.reason === "error" ? "error" : "idle",
@@ -188,6 +215,7 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
}
this.sources.clear();
this.playhead = this.outputContext?.currentTime ?? 0;
this.speechFramesDuringPlayback = 0;
}
private scheduleMarkAck(): void {
@@ -219,12 +247,20 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
this.submitToolResult(callId, { error: `Tool "${name}" not available in browser Talk` });
return;
}
await submitRealtimeTalkConsult({
ctx: this.ctx,
callId,
args: event.args ?? {},
submit: (toolCallId, result) => this.submitToolResult(toolCallId, result),
});
const abortController = new AbortController();
this.consultAbortControllers.add(abortController);
try {
await submitRealtimeTalkConsult({
ctx: this.ctx,
callId,
args: event.args ?? {},
relaySessionId: this.session.relaySessionId,
signal: abortController.signal,
submit: (toolCallId, result) => this.submitToolResult(toolCallId, result),
});
} finally {
this.consultAbortControllers.delete(abortController);
}
}
private submitToolResult(callId: string, result: unknown): void {
@@ -234,4 +270,45 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport
result,
});
}
private cancelOutputForBargeIn(): void {
if (this.sources.size === 0 || this.cancelRequestedForPlayback) {
return;
}
this.cancelRequestedForPlayback = true;
this.stopOutput();
void this.ctx.client.request("talk.realtime.relayCancel", {
relaySessionId: this.session.relaySessionId,
reason: "barge-in",
});
}
private abortConsults(): void {
for (const controller of this.consultAbortControllers) {
controller.abort();
}
this.consultAbortControllers.clear();
}
private detectBargeInSpeech(samples: Float32Array): boolean {
  // Barge-in detection only matters while assistant audio is playing and a
  // cancel has not already been requested; otherwise reset the streak.
  const playbackActive = this.sources.size > 0;
  if (!playbackActive || this.cancelRequestedForPlayback || samples.length === 0) {
    this.speechFramesDuringPlayback = 0;
    return false;
  }
  // Single pass over the frame: accumulate energy (for RMS) and track peak.
  let energy = 0;
  let peak = 0;
  for (let i = 0; i < samples.length; i += 1) {
    const value = samples[i];
    const magnitude = Math.abs(value);
    if (magnitude > peak) {
      peak = magnitude;
    }
    energy += value * value;
  }
  const rms = Math.sqrt(energy / samples.length);
  // Require both thresholds in the same frame; consecutive qualifying frames
  // build the streak, any quiet frame resets it.
  const frameLooksLikeSpeech =
    rms >= BARGE_IN_RMS_THRESHOLD && peak >= BARGE_IN_PEAK_THRESHOLD;
  this.speechFramesDuringPlayback = frameLooksLikeSpeech
    ? this.speechFramesDuringPlayback + 1
    : 0;
  return this.speechFramesDuringPlayback >= BARGE_IN_CONSECUTIVE_SPEECH_FRAMES;
}
}

View File

@@ -2,6 +2,7 @@ import { base64ToBytes, bytesToBase64, floatToPcm16, pcm16ToFloat } from "./real
import type { RealtimeTalkJsonPcmWebSocketSessionResult } from "./realtime-talk-shared.ts";
import {
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
createRealtimeTalkEventEmitter,
submitRealtimeTalkConsult,
type RealtimeTalkTransport,
type RealtimeTalkTransportContext,
@@ -74,12 +75,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
private playhead = 0;
private closed = false;
private pendingCalls = new Map<string, PendingFunctionCall>();
private readonly consultAbortControllers = new Set<AbortController>();
private readonly sources = new Set<AudioBufferSourceNode>();
private readonly emitTalkEvent: ReturnType<typeof createRealtimeTalkEventEmitter>;
constructor(
private readonly session: RealtimeTalkJsonPcmWebSocketSessionResult,
private readonly ctx: RealtimeTalkTransportContext,
) {}
) {
this.emitTalkEvent = createRealtimeTalkEventEmitter(ctx, session);
}
async start(): Promise<void> {
if (!navigator.mediaDevices?.getUserMedia || typeof WebSocket === "undefined") {
@@ -118,7 +123,14 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
}
stop(): void {
if (!this.closed) {
this.emitTalkEvent({ type: "session.closed", final: true });
}
this.closed = true;
for (const controller of this.consultAbortControllers) {
controller.abort();
}
this.consultAbortControllers.clear();
this.pendingCalls.clear();
this.inputProcessor?.disconnect();
this.inputProcessor = null;
@@ -180,10 +192,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
}
if (message.setupComplete) {
this.ctx.callbacks.onStatus?.("listening");
this.emitTalkEvent({ type: "session.ready" });
}
const content = message.serverContent;
if (content?.interrupted) {
this.stopOutput();
this.emitTalkEvent({
type: "turn.cancelled",
final: true,
payload: { reason: "provider-interrupted" },
});
}
if (content?.inputTranscription?.text) {
this.ctx.callbacks.onTranscript?.({
@@ -191,6 +209,11 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
text: content.inputTranscription.text,
final: content.inputTranscription.finished ?? false,
});
this.emitTalkEvent({
type: content.inputTranscription.finished ? "transcript.done" : "transcript.delta",
final: content.inputTranscription.finished ?? false,
payload: { role: "user", text: content.inputTranscription.text },
});
}
if (content?.outputTranscription?.text) {
this.ctx.callbacks.onTranscript?.({
@@ -198,9 +221,21 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
text: content.outputTranscription.text,
final: content.outputTranscription.finished ?? false,
});
this.emitTalkEvent({
type: content.outputTranscription.finished ? "output.text.done" : "output.text.delta",
final: content.outputTranscription.finished ?? false,
payload: { text: content.outputTranscription.text },
});
}
for (const part of content?.modelTurn?.parts ?? []) {
if (part.inlineData?.data) {
this.emitTalkEvent({
type: "output.audio.delta",
payload: {
byteLength: base64ToBytes(part.inlineData.data).byteLength,
mimeType: part.inlineData.mimeType,
},
});
this.playPcm16(part.inlineData.data);
} else if (!part.thought && typeof part.text === "string" && part.text.trim()) {
this.ctx.callbacks.onTranscript?.({
@@ -208,8 +243,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
text: part.text,
final: content?.turnComplete ?? false,
});
this.emitTalkEvent({
type: content?.turnComplete ? "output.text.done" : "output.text.delta",
final: content?.turnComplete ?? false,
payload: { text: part.text },
});
}
}
if (content?.turnComplete) {
this.emitTalkEvent({ type: "turn.ended", final: true });
}
for (const call of message.toolCall?.functionCalls ?? []) {
void this.handleToolCall(call);
}
@@ -260,15 +303,27 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport {
return;
}
this.pendingCalls.set(callId, { name, args: call.args ?? {} });
this.emitTalkEvent({
type: "tool.call",
callId,
payload: { name, args: call.args ?? {} },
});
if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) {
return;
}
await submitRealtimeTalkConsult({
ctx: this.createActiveContext(),
callId,
args: call.args ?? {},
submit: (toolCallId, result) => this.submitToolResult(toolCallId, result),
});
const abortController = new AbortController();
this.consultAbortControllers.add(abortController);
try {
await submitRealtimeTalkConsult({
ctx: this.createActiveContext(),
callId,
args: call.args ?? {},
signal: abortController.signal,
submit: (toolCallId, result) => this.submitToolResult(toolCallId, result),
});
} finally {
this.consultAbortControllers.delete(abortController);
}
}
private createActiveContext(): RealtimeTalkTransportContext {

View File

@@ -1,15 +1,25 @@
import {
buildRealtimeVoiceAgentConsultChatMessage,
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
} from "../../../../src/realtime-voice/agent-consult-tool.js";
import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME } from "../../../../src/realtime-voice/agent-consult-tool.js";
import type { TalkEvent } from "../../../../src/realtime-voice/talk-events.js";
import type { GatewayBrowserClient, GatewayEventFrame } from "../gateway.ts";
import { generateUUID } from "../uuid.ts";
export type RealtimeTalkStatus = "idle" | "connecting" | "listening" | "thinking" | "error";
export type RealtimeTalkEvent = TalkEvent;
export type RealtimeTalkCallbacks = {
onStatus?: (status: RealtimeTalkStatus, detail?: string) => void;
onTranscript?: (entry: { role: "user" | "assistant"; text: string; final: boolean }) => void;
onTalkEvent?: (event: RealtimeTalkEvent) => void;
};
export type RealtimeTalkEventInput<TPayload = unknown> = {
type: RealtimeTalkEvent["type"];
payload?: TPayload;
turnId?: string;
captureId?: string;
final?: boolean;
callId?: string;
itemId?: string;
parentId?: string;
};
export type RealtimeTalkAudioContract = {
@@ -21,7 +31,7 @@ export type RealtimeTalkAudioContract = {
export type RealtimeTalkWebRtcSdpSessionResult = {
provider: string;
transport?: "webrtc-sdp";
transport: "webrtc";
clientSecret: string;
offerUrl?: string;
offerHeaders?: Record<string, string>;
@@ -32,7 +42,7 @@ export type RealtimeTalkWebRtcSdpSessionResult = {
export type RealtimeTalkJsonPcmWebSocketSessionResult = {
provider: string;
transport: "json-pcm-websocket";
transport: "provider-websocket";
protocol: string;
clientSecret: string;
websocketUrl: string;
@@ -80,6 +90,86 @@ export type RealtimeTalkTransportContext = {
callbacks: RealtimeTalkCallbacks;
};
export function createRealtimeTalkEventEmitter(
ctx: RealtimeTalkTransportContext,
session: RealtimeTalkSessionResult,
): (input: RealtimeTalkEventInput) => void {
let seq = 0;
let turnSeq = 0;
let activeTurnId: string | undefined;
const sessionId = resolveRealtimeTalkEventSessionId(ctx, session);
return (input) => {
if (!ctx.callbacks.onTalkEvent) {
return;
}
const turnId = resolveRealtimeTalkTurnId(input);
seq += 1;
ctx.callbacks.onTalkEvent({
id: `${sessionId}:${seq}`,
type: input.type,
sessionId,
turnId,
captureId: input.captureId,
seq,
timestamp: new Date().toISOString(),
mode: "realtime",
transport: session.transport,
brain: "agent-consult",
provider: session.provider,
final: input.final,
callId: input.callId,
itemId: input.itemId,
parentId: input.parentId,
payload: input.payload ?? null,
});
if (
input.type === "turn.ended" ||
input.type === "turn.cancelled" ||
input.type === "session.replaced" ||
input.type === "session.closed"
) {
activeTurnId = undefined;
}
};
function resolveRealtimeTalkTurnId(input: RealtimeTalkEventInput): string | undefined {
if (input.type === "turn.started") {
activeTurnId = input.turnId ?? activeTurnId ?? `turn-${++turnSeq}`;
return activeTurnId;
}
if (!isTurnScopedTalkEvent(input.type)) {
return input.turnId;
}
activeTurnId = input.turnId ?? activeTurnId ?? `turn-${++turnSeq}`;
return activeTurnId;
}
}
function isTurnScopedTalkEvent(type: RealtimeTalkEvent["type"]): boolean {
  // A turn-scoped event is either a turn terminator or any input/transcript/
  // output/tool activity belonging to the current turn.
  if (type === "turn.ended" || type === "turn.cancelled") {
    return true;
  }
  const turnScopedPrefixes = ["input.audio.", "transcript.", "output.", "tool."];
  return turnScopedPrefixes.some((prefix) => type.startsWith(prefix));
}
function resolveRealtimeTalkEventSessionId(
  ctx: RealtimeTalkTransportContext,
  session: RealtimeTalkSessionResult,
): string {
  // Prefer an explicit provider-supplied session id when present.
  const explicitSessionId = (session as { sessionId?: unknown }).sessionId;
  if (typeof explicitSessionId === "string" && explicitSessionId.trim()) {
    return explicitSessionId.trim();
  }
  // Gateway-relay sessions carry their own id. Return it trimmed for
  // consistency with the explicit-id path above (previously the untrimmed
  // value was returned even though trimming was used for the check).
  if ("relaySessionId" in session && session.relaySessionId.trim()) {
    return session.relaySessionId.trim();
  }
  // Fall back to a deterministic composite key for this transport context.
  return `${ctx.sessionKey}:${session.provider}:${session.transport}`;
}
type ChatPayload = {
runId?: string;
state?: string;
@@ -112,13 +202,24 @@ function waitForChatResult(params: {
client: GatewayBrowserClient;
runId: string;
timeoutMs: number;
signal?: AbortSignal;
}): Promise<string> {
return new Promise((resolve, reject) => {
if (params.signal?.aborted) {
reject(new DOMException("OpenClaw tool call aborted", "AbortError"));
return;
}
const timer = window.setTimeout(() => {
unsubscribe();
cleanup();
reject(new Error("OpenClaw tool call timed out"));
}, params.timeoutMs);
const unsubscribe = params.client.addEventListener((evt: GatewayEventFrame) => {
const onAbort = () => {
cleanup();
reject(new DOMException("OpenClaw tool call aborted", "AbortError"));
};
params.signal?.addEventListener("abort", onAbort, { once: true });
let unsubscribe: () => void = () => undefined;
unsubscribe = params.client.addEventListener((evt: GatewayEventFrame) => {
if (evt.event !== "chat") {
return;
}
@@ -127,15 +228,23 @@ function waitForChatResult(params: {
return;
}
if (payload.state === "final") {
window.clearTimeout(timer);
unsubscribe();
cleanup();
resolve(extractTextFromMessage(payload.message) || "OpenClaw finished with no text.");
} else if (payload.state === "aborted") {
cleanup();
reject(
new DOMException(payload.errorMessage ?? "OpenClaw tool call aborted", "AbortError"),
);
} else if (payload.state === "error") {
window.clearTimeout(timer);
unsubscribe();
cleanup();
reject(new Error(payload.errorMessage ?? "OpenClaw tool call failed"));
}
});
function cleanup() {
window.clearTimeout(timer);
params.signal?.removeEventListener("abort", onAbort);
unsubscribe();
}
});
}
@@ -144,42 +253,72 @@ export async function submitRealtimeTalkConsult(params: {
args: unknown;
submit: (callId: string, result: unknown) => void;
callId: string;
relaySessionId?: string;
signal?: AbortSignal;
}): Promise<void> {
const { ctx, callId, submit } = params;
ctx.callbacks.onStatus?.("thinking");
let question = "";
let runId: string | undefined;
let aborted = false;
const abortRun = () => {
aborted = true;
if (runId) {
void ctx.client.request("chat.abort", { sessionKey: ctx.sessionKey, runId });
}
};
if (params.signal?.aborted) {
return;
}
params.signal?.addEventListener("abort", abortRun, { once: true });
try {
const args =
typeof params.args === "string" ? JSON.parse(params.args || "{}") : (params.args ?? {});
question = buildRealtimeVoiceAgentConsultChatMessage(args);
} catch {}
if (!question) {
submit(callId, {
error: `${REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME} requires a question`,
});
ctx.callbacks.onStatus?.("listening");
return;
}
try {
const idempotencyKey = generateUUID();
const response = await ctx.client.request<{ runId?: string }>("chat.send", {
sessionKey: ctx.sessionKey,
message: question,
idempotencyKey,
});
const response = await ctx.client.request<{ runId?: string; idempotencyKey?: string }>(
"talk.realtime.toolCall",
{
sessionKey: ctx.sessionKey,
callId,
name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
args,
...(params.relaySessionId ? { relaySessionId: params.relaySessionId } : {}),
},
);
runId = response.runId ?? response.idempotencyKey;
if (!runId) {
throw new Error("OpenClaw realtime tool call did not return a run id");
}
if (params.signal?.aborted) {
abortRun();
return;
}
const result = await waitForChatResult({
client: ctx.client,
runId: response.runId ?? idempotencyKey,
runId,
timeoutMs: 120_000,
signal: params.signal,
});
submit(callId, { result });
} catch (error) {
if (aborted || params.signal?.aborted || isAbortError(error)) {
return;
}
submit(callId, {
error: error instanceof Error ? error.message : String(error),
});
} finally {
ctx.callbacks.onStatus?.("listening");
params.signal?.removeEventListener("abort", abortRun);
if (!aborted && !params.signal?.aborted) {
ctx.callbacks.onStatus?.("listening");
}
}
}
function isAbortError(error: unknown): boolean {
  // DOMException may be absent in non-browser runtimes; guard before using
  // it as an instanceof target.
  if (typeof DOMException === "undefined") {
    return false;
  }
  return error instanceof DOMException && error.name === "AbortError";
}
export { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME };

View File

@@ -1,6 +1,7 @@
import type { RealtimeTalkWebRtcSdpSessionResult } from "./realtime-talk-shared.ts";
import {
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
createRealtimeTalkEventEmitter,
submitRealtimeTalkConsult,
type RealtimeTalkTransport,
type RealtimeTalkTransportContext,
@@ -34,11 +35,15 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
private audio: HTMLAudioElement | null = null;
private closed = false;
private toolBuffers = new Map<string, ToolBuffer>();
private readonly consultAbortControllers = new Set<AbortController>();
private readonly emitTalkEvent: ReturnType<typeof createRealtimeTalkEventEmitter>;
constructor(
private readonly session: RealtimeTalkWebRtcSdpSessionResult,
private readonly ctx: RealtimeTalkTransportContext,
) {}
) {
this.emitTalkEvent = createRealtimeTalkEventEmitter(ctx, session);
}
async start(): Promise<void> {
if (!navigator.mediaDevices?.getUserMedia || typeof RTCPeerConnection === "undefined") {
@@ -60,7 +65,10 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
this.peer.addTrack(track, this.media);
}
this.channel = this.peer.createDataChannel("oai-events");
this.channel.addEventListener("open", () => this.ctx.callbacks.onStatus?.("listening"));
this.channel.addEventListener("open", () => {
this.ctx.callbacks.onStatus?.("listening");
this.emitTalkEvent({ type: "session.ready" });
});
this.channel.addEventListener("message", (event) => this.handleRealtimeEvent(event.data));
this.peer.addEventListener("connectionstatechange", () => {
if (this.closed) {
@@ -92,6 +100,9 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
}
stop(): void {
if (!this.closed) {
this.emitTalkEvent({ type: "session.closed", final: true });
}
this.closed = true;
this.channel?.close();
this.channel = null;
@@ -101,6 +112,10 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
this.media = null;
this.audio?.remove();
this.audio = null;
for (const controller of this.consultAbortControllers) {
controller.abort();
}
this.consultAbortControllers.clear();
this.toolBuffers.clear();
}
@@ -111,6 +126,9 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
}
private handleRealtimeEvent(data: unknown): void {
if (this.closed) {
return;
}
let event: RealtimeServerEvent;
try {
event = JSON.parse(String(data)) as RealtimeServerEvent;
@@ -121,6 +139,12 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
case "conversation.item.input_audio_transcription.completed":
if (event.transcript) {
this.ctx.callbacks.onTranscript?.({ role: "user", text: event.transcript, final: true });
this.emitTalkEvent({
type: "transcript.done",
final: true,
itemId: event.item_id,
payload: { role: "user", text: event.transcript },
});
}
return;
case "response.audio_transcript.done":
@@ -130,6 +154,12 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
text: event.transcript,
final: true,
});
this.emitTalkEvent({
type: "output.text.done",
final: true,
itemId: event.item_id,
payload: { text: event.transcript },
});
}
return;
case "response.function_call_arguments.delta":
@@ -140,18 +170,30 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
return;
case "input_audio_buffer.speech_started":
this.ctx.callbacks.onStatus?.("listening", "Speech detected");
this.emitTalkEvent({ type: "turn.started", payload: { source: event.type } });
return;
case "input_audio_buffer.speech_stopped":
this.ctx.callbacks.onStatus?.("thinking", "Processing speech");
this.emitTalkEvent({ type: "input.audio.committed", final: true });
return;
case "response.created":
this.ctx.callbacks.onStatus?.("thinking", "Generating response");
return;
case "response.done":
this.ctx.callbacks.onStatus?.("listening", this.extractResponseStatus(event));
this.emitTalkEvent({
type: "turn.ended",
final: true,
payload: { status: event.response?.status ?? "completed" },
});
return;
case "error":
this.ctx.callbacks.onStatus?.("error", this.extractErrorDetail(event.error));
this.emitTalkEvent({
type: "session.error",
final: true,
payload: { message: this.extractErrorDetail(event.error) },
});
return;
default:
return;
@@ -197,12 +239,25 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport {
if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME || !callId) {
return;
}
await submitRealtimeTalkConsult({
ctx: this.ctx,
this.emitTalkEvent({
type: "tool.call",
callId,
args: buffered?.args || event.arguments || "{}",
submit: (toolCallId, result) => this.submitToolResult(toolCallId, result),
itemId: key,
payload: { name, args: buffered?.args || event.arguments || "{}" },
});
const abortController = new AbortController();
this.consultAbortControllers.add(abortController);
try {
await submitRealtimeTalkConsult({
ctx: this.ctx,
callId,
args: buffered?.args || event.arguments || "{}",
signal: abortController.signal,
submit: (toolCallId, result) => this.submitToolResult(toolCallId, result),
});
} finally {
this.consultAbortControllers.delete(abortController);
}
}
private submitToolResult(callId: string, result: unknown): void {

View File

@@ -1,8 +1,10 @@
import { normalizeTalkTransport } from "../../../../src/realtime-voice/talk-session-controller.js";
import type { GatewayBrowserClient } from "../gateway.ts";
import { GatewayRelayRealtimeTalkTransport } from "./realtime-talk-gateway-relay.ts";
import { GoogleLiveRealtimeTalkTransport } from "./realtime-talk-google-live.ts";
import {
type RealtimeTalkCallbacks,
type RealtimeTalkEvent,
type RealtimeTalkGatewayRelaySessionResult,
type RealtimeTalkJsonPcmWebSocketSessionResult,
type RealtimeTalkSessionResult,
@@ -13,17 +15,22 @@ import {
} from "./realtime-talk-shared.ts";
import { WebRtcSdpRealtimeTalkTransport } from "./realtime-talk-webrtc.ts";
export type { RealtimeTalkCallbacks, RealtimeTalkSessionResult, RealtimeTalkStatus };
export type {
RealtimeTalkCallbacks,
RealtimeTalkEvent,
RealtimeTalkSessionResult,
RealtimeTalkStatus,
};
function createTransport(
session: RealtimeTalkSessionResult,
ctx: RealtimeTalkTransportContext,
): RealtimeTalkTransport {
const transport = resolveTransport(session);
if (transport === "webrtc-sdp") {
if (transport === "webrtc") {
return new WebRtcSdpRealtimeTalkTransport(session as RealtimeTalkWebRtcSdpSessionResult, ctx);
}
if (transport === "json-pcm-websocket") {
if (transport === "provider-websocket") {
return new GoogleLiveRealtimeTalkTransport(
session as RealtimeTalkJsonPcmWebSocketSessionResult,
ctx,
@@ -43,30 +50,7 @@ function createTransport(
}
function resolveTransport(session: RealtimeTalkSessionResult): string {
if (session.transport) {
return session.transport;
}
const raw = session as {
provider?: string;
protocol?: string;
websocketUrl?: string;
};
const provider = raw.provider?.trim().toLowerCase();
if (provider === "google" && (raw.protocol === "google-live-bidi" || raw.websocketUrl)) {
return "json-pcm-websocket";
}
if (provider === "google") {
throw new Error(buildGoogleWebRtcUnsupportedMessage());
}
return "webrtc-sdp";
}
function buildGoogleWebRtcUnsupportedMessage(): string {
return [
'Realtime voice provider "google" does not support browser WebRTC sessions.',
"Control UI Talk can use Google through the gateway relay or a Google Live WebSocket session instead.",
'Restart the gateway so it returns "gateway-relay" or "json-pcm-websocket", or switch Talk realtime to a WebRTC-capable provider such as OpenAI.',
].join(" ");
return normalizeTalkTransport((session as { transport?: string }).transport) ?? "webrtc";
}
export class RealtimeTalkSession {

View File

@@ -0,0 +1,361 @@
// @vitest-environment jsdom
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { GatewayRelayRealtimeTalkTransport } from "./chat/realtime-talk-gateway-relay.ts";
import {
REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
type RealtimeTalkEvent,
type RealtimeTalkGatewayRelaySessionResult,
type RealtimeTalkTransportContext,
} from "./chat/realtime-talk-shared.ts";
type GatewayFrame = { event: string; payload?: unknown };
type GatewayListener = (event: GatewayFrame) => void;
type MockProcessor = {
connect: ReturnType<typeof vi.fn>;
disconnect: ReturnType<typeof vi.fn>;
onaudioprocess:
| ((event: { inputBuffer: { getChannelData: (channel: number) => Float32Array } }) => void)
| null;
};
const listeners = new Set<GatewayListener>();
const processors: MockProcessor[] = [];
class MockAudioContext {
readonly currentTime = 0;
readonly destination = {};
readonly close = vi.fn(async () => undefined);
createMediaStreamSource() {
return {
connect: vi.fn(),
disconnect: vi.fn(),
};
}
createScriptProcessor() {
const processor: MockProcessor = {
connect: vi.fn(),
disconnect: vi.fn(),
onaudioprocess: null,
};
processors.push(processor);
return processor;
}
createBuffer(_channels: number, length: number, sampleRate: number) {
const channel = new Float32Array(length);
return {
duration: length / sampleRate,
getChannelData: () => channel,
};
}
createBufferSource() {
return {
addEventListener: vi.fn(),
buffer: null,
connect: vi.fn(),
start: vi.fn(),
stop: vi.fn(),
};
}
}
function createSession(): RealtimeTalkGatewayRelaySessionResult {
  // Canonical relay-session fixture: symmetric 24 kHz PCM16 audio contract.
  const session: RealtimeTalkGatewayRelaySessionResult = {
    provider: "openai",
    transport: "gateway-relay",
    relaySessionId: "relay-1",
    audio: {
      inputEncoding: "pcm16",
      inputSampleRateHz: 24000,
      outputEncoding: "pcm16",
      outputSampleRateHz: 24000,
    },
  };
  return session;
}
function createClient(): RealtimeTalkTransportContext["client"] {
  // Gateway client double: registers listeners in the shared set so tests
  // can push frames, and resolves every RPC request with an empty payload.
  const fakeClient = {
    addEventListener: vi.fn((listener: GatewayListener) => {
      listeners.add(listener);
      return () => listeners.delete(listener);
    }),
    request: vi.fn(async () => ({})),
  };
  return fakeClient as unknown as RealtimeTalkTransportContext["client"];
}
function emitGatewayFrame(frame: GatewayFrame): void {
for (const listener of listeners) {
listener(frame);
}
}
function pumpMicrophone(samples: Float32Array): void {
const processor = processors.at(-1);
expect(processor).toBeDefined();
processor?.onaudioprocess?.({
inputBuffer: {
getChannelData: () => samples,
},
});
}
describe("GatewayRelayRealtimeTalkTransport", () => {
beforeEach(() => {
listeners.clear();
processors.length = 0;
vi.stubGlobal("AudioContext", MockAudioContext);
Object.defineProperty(globalThis.navigator, "mediaDevices", {
configurable: true,
value: {
getUserMedia: vi.fn(async () => ({
getTracks: () => [{ stop: vi.fn() }],
})),
},
});
});
afterEach(() => {
vi.unstubAllGlobals();
listeners.clear();
processors.length = 0;
});
it("forwards common Talk events from Gateway relay frames", async () => {
const onTalkEvent = vi.fn();
const transport = new GatewayRelayRealtimeTalkTransport(createSession(), {
callbacks: { onTalkEvent },
client: createClient(),
sessionKey: "main",
});
const talkEvent = {
id: "relay-1:1",
type: "session.ready",
sessionId: "relay-1",
seq: 1,
timestamp: "2026-05-05T00:00:00.000Z",
mode: "realtime",
transport: "gateway-relay",
brain: "agent-consult",
payload: {},
} satisfies RealtimeTalkEvent;
await transport.start();
emitGatewayFrame({
event: "talk.realtime.relay",
payload: {
relaySessionId: "relay-1",
type: "ready",
talkEvent,
},
});
expect(onTalkEvent).toHaveBeenCalledWith(talkEvent);
transport.stop();
});
it("does not forward Talk events for another relay session", async () => {
const onTalkEvent = vi.fn();
const transport = new GatewayRelayRealtimeTalkTransport(createSession(), {
callbacks: { onTalkEvent },
client: createClient(),
sessionKey: "main",
});
await transport.start();
emitGatewayFrame({
event: "talk.realtime.relay",
payload: {
relaySessionId: "relay-other",
type: "ready",
talkEvent: {
id: "relay-other:1",
type: "session.ready",
sessionId: "relay-other",
seq: 1,
timestamp: "2026-05-05T00:00:00.000Z",
mode: "realtime",
transport: "gateway-relay",
brain: "agent-consult",
payload: {},
} satisfies RealtimeTalkEvent,
},
});
expect(onTalkEvent).not.toHaveBeenCalled();
transport.stop();
});
it("keeps assistant playback alive while relay input is silence", async () => {
const client = createClient();
const transport = new GatewayRelayRealtimeTalkTransport(createSession(), {
callbacks: {},
client,
sessionKey: "main",
});
await transport.start();
emitGatewayFrame({
event: "talk.realtime.relay",
payload: {
relaySessionId: "relay-1",
type: "audio",
audioBase64: "AAAA",
},
});
pumpMicrophone(new Float32Array(4096));
expect(client.request).not.toHaveBeenCalledWith("talk.realtime.relayCancel", expect.anything());
expect(client.request).toHaveBeenCalledWith(
"talk.realtime.relayAudio",
expect.objectContaining({ relaySessionId: "relay-1" }),
);
transport.stop();
});
it("cancels relay playback after sustained input speech", async () => {
const client = createClient();
const transport = new GatewayRelayRealtimeTalkTransport(createSession(), {
callbacks: {},
client,
sessionKey: "main",
});
const speech = new Float32Array(4096).fill(0.25);
await transport.start();
emitGatewayFrame({
event: "talk.realtime.relay",
payload: {
relaySessionId: "relay-1",
type: "audio",
audioBase64: "AAAA",
},
});
pumpMicrophone(speech);
expect(client.request).not.toHaveBeenCalledWith("talk.realtime.relayCancel", expect.anything());
pumpMicrophone(speech);
pumpMicrophone(speech);
const cancelCalls = vi
.mocked(client.request)
.mock.calls.filter(([method]) => method === "talk.realtime.relayCancel");
expect(cancelCalls).toEqual([
[
"talk.realtime.relayCancel",
{
relaySessionId: "relay-1",
reason: "barge-in",
},
],
]);
transport.stop();
});
it("treats aborted consult chat events as cancellation", async () => {
const onStatus = vi.fn();
const client = createClient();
vi.mocked(client.request).mockImplementation(async (method) => {
if (method === "talk.realtime.toolCall") {
return { runId: "run-1" };
}
return {};
});
const transport = new GatewayRelayRealtimeTalkTransport(createSession(), {
callbacks: { onStatus },
client,
sessionKey: "main",
});
await transport.start();
emitGatewayFrame({
event: "talk.realtime.relay",
payload: {
relaySessionId: "relay-1",
type: "toolCall",
callId: "call-1",
name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
args: { question: "status?" },
},
});
await vi.waitFor(() =>
expect(client.request).toHaveBeenCalledWith(
"talk.realtime.toolCall",
expect.objectContaining({
callId: "call-1",
relaySessionId: "relay-1",
}),
),
);
emitGatewayFrame({
event: "chat",
payload: {
runId: "run-1",
state: "aborted",
},
});
await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening"));
expect(
vi
.mocked(client.request)
.mock.calls.some(([method]) => method === "talk.realtime.relayToolResult"),
).toBe(false);
transport.stop();
});
it("aborts in-flight consults when the relay transport stops", async () => {
const client = createClient();
vi.mocked(client.request).mockImplementation(async (method, params) => {
if (method === "chat.abort") {
expect(params).toEqual({ sessionKey: "main", runId: "run-1" });
return { ok: true, aborted: true };
}
if (method === "talk.realtime.toolCall") {
return { runId: "run-1" };
}
return {};
});
const transport = new GatewayRelayRealtimeTalkTransport(createSession(), {
callbacks: {},
client,
sessionKey: "main",
});
await transport.start();
emitGatewayFrame({
event: "talk.realtime.relay",
payload: {
relaySessionId: "relay-1",
type: "toolCall",
callId: "call-1",
name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
args: { question: "status?" },
},
});
await vi.waitFor(() =>
expect(client.request).toHaveBeenCalledWith("talk.realtime.toolCall", expect.anything()),
);
transport.stop();
await vi.waitFor(() =>
expect(client.request).toHaveBeenCalledWith("chat.abort", {
sessionKey: "main",
runId: "run-1",
}),
);
emitGatewayFrame({
event: "chat",
payload: { runId: "run-1", state: "final", message: { text: "late answer" } },
});
await new Promise((resolve) => setTimeout(resolve, 0));
expect(
vi
.mocked(client.request)
.mock.calls.some(([method]) => method === "talk.realtime.relayToolResult"),
).toBe(false);
});
});

View File

@@ -117,7 +117,7 @@ function createSession(
): RealtimeTalkJsonPcmWebSocketSessionResult {
return {
provider: "google",
transport: "json-pcm-websocket",
transport: "provider-websocket",
protocol: "google-live-bidi",
clientSecret,
websocketUrl,
@@ -187,7 +187,8 @@ describe("GoogleLiveRealtimeTalkTransport", () => {
it("requests ArrayBuffer frames and decodes binary setup messages", async () => {
const onStatus = vi.fn();
const transport = createTransport({ onStatus });
const onTalkEvent = vi.fn();
const transport = createTransport({ onStatus, onTalkEvent });
await transport.start();
const ws = latestWebSocket();
@@ -195,6 +196,13 @@ describe("GoogleLiveRealtimeTalkTransport", () => {
expect(ws.binaryType).toBe("arraybuffer");
await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening"));
expect(onTalkEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: "session.ready",
sessionId: "main:google:provider-websocket",
transport: "provider-websocket",
}),
);
});
it("decodes Blob setup messages", async () => {
@@ -208,7 +216,8 @@ describe("GoogleLiveRealtimeTalkTransport", () => {
});
it("stops queued output when Google Live sends interruption", async () => {
const transport = createTransport();
const onTalkEvent = vi.fn();
const transport = createTransport({ onTalkEvent });
await transport.start();
const ws = latestWebSocket();
@@ -227,6 +236,60 @@ describe("GoogleLiveRealtimeTalkTransport", () => {
ws.emitMessage(encodeJsonFrame({ serverContent: { interrupted: true } }));
await vi.waitFor(() => expect(source?.stop).toHaveBeenCalledTimes(1));
expect(onTalkEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: "turn.cancelled",
final: true,
payload: { reason: "provider-interrupted" },
}),
);
});
it("emits common Talk events for Google Live transcript and audio frames", async () => {
const onTranscript = vi.fn();
const onTalkEvent = vi.fn();
const transport = createTransport({ onTalkEvent, onTranscript });
await transport.start();
latestWebSocket().emitMessage(
encodeJsonFrame({
serverContent: {
inputTranscription: { text: "hello", finished: true },
outputTranscription: { text: "hi", finished: false },
modelTurn: {
parts: [
{ inlineData: { data: "AAAAAA==", mimeType: "audio/pcm;rate=24000" } },
{ text: "there" },
],
},
turnComplete: true,
},
}),
);
await vi.waitFor(() =>
expect(onTalkEvent.mock.calls.map(([event]) => event.type)).toEqual([
"transcript.done",
"output.text.delta",
"output.audio.delta",
"output.text.done",
"turn.ended",
]),
);
expect(onTalkEvent.mock.calls.map(([event]) => event.turnId)).toEqual([
"turn-1",
"turn-1",
"turn-1",
"turn-1",
"turn-1",
]);
expect(onTranscript).toHaveBeenCalledWith({ role: "user", text: "hello", final: true });
expect(onTranscript).toHaveBeenCalledWith({ role: "assistant", text: "hi", final: false });
expect(onTalkEvent.mock.calls[2]?.[0]).toMatchObject({
payload: { byteLength: 4, mimeType: "audio/pcm;rate=24000" },
sessionId: "main:google:provider-websocket",
transport: "provider-websocket",
});
});
it("ignores late WebSocket events after stop", async () => {
@@ -253,8 +316,18 @@ describe("GoogleLiveRealtimeTalkTransport", () => {
listeners.add(listener);
return () => listeners.delete(listener);
}),
request: vi.fn(async (_method: string, params: { idempotencyKey?: string }) => {
runId = params.idempotencyKey ?? runId;
request: vi.fn(async (method: string, params: Record<string, unknown>) => {
if (method === "chat.abort") {
expect(params).toEqual({ sessionKey: "main", runId });
return { ok: true, aborted: true };
}
expect(method).toBe("talk.realtime.toolCall");
expect(params).toEqual(
expect.objectContaining({
callId: "call-1",
name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
}),
);
return { runId };
}),
} as unknown as RealtimeTalkTransportContext["client"];
@@ -283,6 +356,7 @@ describe("GoogleLiveRealtimeTalkTransport", () => {
}
await new Promise((resolve) => setTimeout(resolve, 0));
expect(client.request).toHaveBeenCalledWith("chat.abort", { sessionKey: "main", runId });
expect(onStatus).not.toHaveBeenCalledWith("listening");
});
});

View File

@@ -1,5 +1,6 @@
// @vitest-environment jsdom
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME } from "./chat/realtime-talk-shared.ts";
import { WebRtcSdpRealtimeTalkTransport } from "./chat/realtime-talk-webrtc.ts";
class FakeDataChannel extends EventTarget {
@@ -72,7 +73,7 @@ describe("WebRtcSdpRealtimeTalkTransport", () => {
const transport = new WebRtcSdpRealtimeTalkTransport(
{
provider: "openai",
transport: "webrtc-sdp",
transport: "webrtc",
clientSecret: "client-secret-123",
offerUrl: "https://api.openai.com/v1/realtime/calls",
offerHeaders: {
@@ -111,7 +112,7 @@ describe("WebRtcSdpRealtimeTalkTransport", () => {
const transport = new WebRtcSdpRealtimeTalkTransport(
{
provider: "openai",
transport: "webrtc-sdp",
transport: "webrtc",
clientSecret: "client-secret-123",
},
{
@@ -142,16 +143,17 @@ describe("WebRtcSdpRealtimeTalkTransport", () => {
vi.fn(async () => new Response("answer-sdp")) as unknown as typeof fetch,
);
const onStatus = vi.fn();
const onTalkEvent = vi.fn();
const transport = new WebRtcSdpRealtimeTalkTransport(
{
provider: "openai",
transport: "webrtc-sdp",
transport: "webrtc",
clientSecret: "client-secret-123",
},
{
client: {} as never,
sessionKey: "main",
callbacks: { onStatus },
callbacks: { onStatus, onTalkEvent },
},
);
@@ -170,6 +172,149 @@ describe("WebRtcSdpRealtimeTalkTransport", () => {
expect(onStatus).toHaveBeenCalledWith("thinking", "Processing speech");
expect(onStatus).toHaveBeenCalledWith("thinking", "Generating response");
expect(onStatus).toHaveBeenCalledWith("listening", undefined);
expect(onTalkEvent.mock.calls.map(([event]) => event.type)).toEqual([
"turn.started",
"input.audio.committed",
"turn.ended",
]);
expect(onTalkEvent.mock.calls.map(([event]) => event.turnId)).toEqual([
"turn-1",
"turn-1",
"turn-1",
]);
transport.stop();
});
it("emits common Talk transcript events from the OpenAI data channel", async () => {
vi.stubGlobal(
"fetch",
vi.fn(async () => new Response("answer-sdp")) as unknown as typeof fetch,
);
const onTranscript = vi.fn();
const onTalkEvent = vi.fn();
const transport = new WebRtcSdpRealtimeTalkTransport(
{
provider: "openai",
transport: "webrtc",
clientSecret: "client-secret-123",
},
{
client: {} as never,
sessionKey: "main",
callbacks: { onTranscript, onTalkEvent },
},
);
await transport.start();
const peer = FakePeerConnection.instances[0];
peer?.channel.dispatchEvent(
new MessageEvent("message", {
data: JSON.stringify({
type: "conversation.item.input_audio_transcription.completed",
item_id: "input-1",
transcript: "hello",
}),
}),
);
peer?.channel.dispatchEvent(
new MessageEvent("message", {
data: JSON.stringify({
type: "response.audio_transcript.done",
item_id: "response-1",
transcript: "hi there",
}),
}),
);
expect(onTranscript).toHaveBeenCalledWith({ role: "user", text: "hello", final: true });
expect(onTranscript).toHaveBeenCalledWith({
role: "assistant",
text: "hi there",
final: true,
});
expect(onTalkEvent.mock.calls.map(([event]) => event.type)).toEqual([
"transcript.done",
"output.text.done",
]);
expect(onTalkEvent.mock.calls.map(([event]) => event.turnId)).toEqual(["turn-1", "turn-1"]);
expect(onTalkEvent.mock.calls[0]?.[0]).toMatchObject({
itemId: "input-1",
payload: { role: "user", text: "hello" },
sessionId: "main:openai:webrtc",
transport: "webrtc",
});
expect(onTalkEvent.mock.calls[1]?.[0]).toMatchObject({
itemId: "response-1",
payload: { text: "hi there" },
sessionId: "main:openai:webrtc",
transport: "webrtc",
});
transport.stop();
});
it("aborts an in-flight OpenAI tool consult when the transport stops", async () => {
vi.stubGlobal(
"fetch",
vi.fn(async () => new Response("answer-sdp")) as unknown as typeof fetch,
);
const listeners = new Set<(event: { event: string; payload?: unknown }) => void>();
const request = vi.fn(async (method: string, params: Record<string, unknown>) => {
if (method === "chat.abort") {
expect(params).toEqual({ sessionKey: "main", runId: "run-1" });
return { ok: true, aborted: true };
}
expect(method).toBe("talk.realtime.toolCall");
expect(params).toEqual(
expect.objectContaining({
callId: "call-1",
name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
}),
);
return { runId: "run-1" };
});
const transport = new WebRtcSdpRealtimeTalkTransport(
{
provider: "openai",
transport: "webrtc",
clientSecret: "client-secret-123",
},
{
client: {
addEventListener: vi.fn(
(listener: (event: { event: string; payload?: unknown }) => void) => {
listeners.add(listener);
return () => listeners.delete(listener);
},
),
request,
} as never,
sessionKey: "main",
callbacks: {},
},
);
await transport.start();
const peer = FakePeerConnection.instances[0];
peer?.channel.dispatchEvent(
new MessageEvent("message", {
data: JSON.stringify({
type: "response.function_call_arguments.done",
item_id: "item-1",
call_id: "call-1",
name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME,
arguments: JSON.stringify({ question: "status?" }),
}),
}),
);
await vi.waitFor(() =>
expect(request).toHaveBeenCalledWith("talk.realtime.toolCall", expect.anything()),
);
transport.stop();
await vi.waitFor(() =>
expect(request).toHaveBeenCalledWith("chat.abort", { sessionKey: "main", runId: "run-1" }),
);
expect(listeners.size).toBe(0);
});
});

View File

@@ -59,7 +59,7 @@ describe("RealtimeTalkSession", () => {
it("starts the Google Live WebSocket transport from a generic session result", async () => {
const request = vi.fn(async () => ({
provider: "google",
transport: "json-pcm-websocket",
transport: "provider-websocket",
protocol: "google-live-bidi",
clientSecret: "auth_tokens/session",
websocketUrl: "wss://example.test/live",
@@ -83,11 +83,40 @@ describe("RealtimeTalkSession", () => {
expect(onStatus).toHaveBeenCalledWith("connecting");
});
it("keeps Google Live WebSocket sessions off the WebRTC fallback when transport is omitted", async () => {
it("defaults legacy session results without an explicit transport to WebRTC", async () => {
const request = vi.fn(async () => ({
provider: "google",
protocol: "google-live-bidi",
provider: "openai",
clientSecret: "auth_tokens/session",
}));
const session = new RealtimeTalkSession({ request } as never, "main");
await session.start();
expect(webRtcCtor).toHaveBeenCalledTimes(1);
expect(webRtcStart).toHaveBeenCalledTimes(1);
expect(googleCtor).not.toHaveBeenCalled();
});
it("accepts legacy WebRTC transport names", async () => {
const request = vi.fn(async () => ({
provider: "openai",
transport: "webrtc-sdp",
clientSecret: "secret",
}));
const session = new RealtimeTalkSession({ request } as never, "main");
await session.start();
expect(webRtcCtor).toHaveBeenCalledTimes(1);
expect(googleCtor).not.toHaveBeenCalled();
});
it("accepts legacy provider WebSocket transport names", async () => {
const request = vi.fn(async () => ({
provider: "example",
transport: "json-pcm-websocket",
clientSecret: "secret",
protocol: "google-live-bidi",
websocketUrl: "wss://example.test/live",
audio: {
inputEncoding: "pcm16",
@@ -100,38 +129,8 @@ describe("RealtimeTalkSession", () => {
await session.start();
expect(webRtcCtor).not.toHaveBeenCalled();
expect(googleCtor).toHaveBeenCalledTimes(1);
expect(googleStart).toHaveBeenCalledTimes(1);
expect(webRtcCtor).not.toHaveBeenCalled();
});
it("does not treat ambiguous Google sessions as browser WebRTC sessions", async () => {
const request = vi.fn(async () => ({
provider: "google",
clientSecret: "secret",
}));
const session = new RealtimeTalkSession({ request } as never, "main");
await expect(session.start()).rejects.toThrow(
'Realtime voice provider "google" does not support browser WebRTC sessions. Control UI Talk can use Google through the gateway relay or a Google Live WebSocket session instead. Restart the gateway so it returns "gateway-relay" or "json-pcm-websocket", or switch Talk realtime to a WebRTC-capable provider such as OpenAI.',
);
expect(webRtcCtor).not.toHaveBeenCalled();
expect(googleCtor).not.toHaveBeenCalled();
});
it("does not infer Google Live transport from websocketUrl on non-Google sessions", async () => {
const request = vi.fn(async () => ({
provider: "example",
clientSecret: "secret",
websocketUrl: "wss://example.test/live",
}));
const session = new RealtimeTalkSession({ request } as never, "main");
await session.start();
expect(webRtcCtor).toHaveBeenCalledTimes(1);
expect(googleCtor).not.toHaveBeenCalled();
});
it("starts the Gateway relay transport for backend-only realtime providers", async () => {
@@ -158,9 +157,10 @@ describe("RealtimeTalkSession", () => {
expect(webRtcCtor).not.toHaveBeenCalled();
});
it("keeps legacy session results on the OpenAI-style WebRTC transport", async () => {
it("starts the WebRTC transport for canonical WebRTC sessions", async () => {
const request = vi.fn(async () => ({
provider: "openai",
transport: "webrtc",
clientSecret: "secret",
}));
const session = new RealtimeTalkSession({ request } as never, "main");