diff --git a/ui/src/ui/chat/realtime-talk-gateway-relay.ts b/ui/src/ui/chat/realtime-talk-gateway-relay.ts index 5eab5d67252..9d71d26af14 100644 --- a/ui/src/ui/chat/realtime-talk-gateway-relay.ts +++ b/ui/src/ui/chat/realtime-talk-gateway-relay.ts @@ -3,31 +3,38 @@ import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, submitRealtimeTalkConsult, type RealtimeTalkGatewayRelaySessionResult, + type RealtimeTalkEvent, type RealtimeTalkTransport, type RealtimeTalkTransportContext, } from "./realtime-talk-shared.ts"; -type GatewayRelayEvent = - | { relaySessionId?: string; type?: "ready" } - | { relaySessionId?: string; type?: "audio"; audioBase64?: string } - | { relaySessionId?: string; type?: "clear" } - | { relaySessionId?: string; type?: "mark"; markName?: string } +type GatewayRelayEvent = { + relaySessionId?: string; + talkEvent?: RealtimeTalkEvent; +} & ( + | { type?: "ready" } + | { type?: "audio"; audioBase64?: string } + | { type?: "clear" } + | { type?: "mark"; markName?: string } | { - relaySessionId?: string; type?: "transcript"; role?: "user" | "assistant"; text?: string; final?: boolean; } | { - relaySessionId?: string; type?: "toolCall"; callId?: string; name?: string; args?: unknown; } - | { relaySessionId?: string; type?: "error"; message?: string } - | { relaySessionId?: string; type?: "close"; reason?: string }; + | { type?: "error"; message?: string } + | { type?: "close"; reason?: string } +); + +const BARGE_IN_RMS_THRESHOLD = 0.02; +const BARGE_IN_PEAK_THRESHOLD = 0.08; +const BARGE_IN_CONSECUTIVE_SPEECH_FRAMES = 2; export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport { private media: MediaStream | null = null; @@ -39,6 +46,9 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport private playhead = 0; private closed = false; private readonly sources = new Set(); + private readonly consultAbortControllers = new Set(); + private cancelRequestedForPlayback = false; + private speechFramesDuringPlayback = 0; constructor( private readonly session: RealtimeTalkGatewayRelaySessionResult, @@ -62,7 +72,13 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport } this.handleRelayEvent(evt.payload as GatewayRelayEvent); }); - this.media = await navigator.mediaDevices.getUserMedia({ audio: true }); + this.media = await navigator.mediaDevices.getUserMedia({ + audio: { + autoGainControl: true, + echoCancellation: true, + noiseSuppression: true, + }, + }); this.inputContext = new AudioContext({ sampleRate: this.session.audio.inputSampleRateHz }); this.outputContext = new AudioContext({ sampleRate: this.session.audio.outputSampleRateHz }); this.startMicrophonePump(); @@ -76,6 +92,7 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport this.inputProcessor = null; this.inputSource?.disconnect(); this.inputSource = null; + this.abortConsults(); this.media?.getTracks().forEach((track) => track.stop()); this.media = null; this.stopOutput(); @@ -98,7 +115,11 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport if (this.closed) { return; } - const pcm = floatToPcm16(event.inputBuffer.getChannelData(0)); + const samples = event.inputBuffer.getChannelData(0); + const pcm = floatToPcm16(samples); + if (this.detectBargeInSpeech(samples)) { + this.cancelOutputForBargeIn(); + } void this.ctx.client.request("talk.realtime.relayAudio", { relaySessionId: this.session.relaySessionId, audioBase64: bytesToBase64(pcm), @@ -113,12 +134,17 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport if (event.relaySessionId !== this.session.relaySessionId || this.closed) { return; } + if (event.talkEvent) { + this.ctx.callbacks.onTalkEvent?.(event.talkEvent); + } switch (event.type) { case "ready": this.ctx.callbacks.onStatus?.("listening"); return; case "audio": if (event.audioBase64) { + this.cancelRequestedForPlayback = false; + this.speechFramesDuringPlayback = 0; this.playPcm16(event.audioBase64); } return; @@ -144,6 +170,7 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport this.ctx.callbacks.onStatus?.("error", event.message ?? "Realtime relay failed"); return; case "close": + this.abortConsults(); if (!this.closed) { this.ctx.callbacks.onStatus?.( event.reason === "error" ? "error" : "idle", @@ -188,6 +215,7 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport } this.sources.clear(); this.playhead = this.outputContext?.currentTime ?? 0; + this.speechFramesDuringPlayback = 0; } private scheduleMarkAck(): void { @@ -219,12 +247,20 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport this.submitToolResult(callId, { error: `Tool "${name}" not available in browser Talk` }); return; } - await submitRealtimeTalkConsult({ - ctx: this.ctx, - callId, - args: event.args ?? {}, - submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), - }); + const abortController = new AbortController(); + this.consultAbortControllers.add(abortController); + try { + await submitRealtimeTalkConsult({ + ctx: this.ctx, + callId, + args: event.args ?? {}, + relaySessionId: this.session.relaySessionId, + signal: abortController.signal, + submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), + }); + } finally { + this.consultAbortControllers.delete(abortController); + } } private submitToolResult(callId: string, result: unknown): void { @@ -234,4 +270,45 @@ export class GatewayRelayRealtimeTalkTransport implements RealtimeTalkTransport result, }); } + + private cancelOutputForBargeIn(): void { + if (this.sources.size === 0 || this.cancelRequestedForPlayback) { + return; + } + this.cancelRequestedForPlayback = true; + this.stopOutput(); + void this.ctx.client.request("talk.realtime.relayCancel", { + relaySessionId: this.session.relaySessionId, + reason: "barge-in", + }); + } + + private abortConsults(): void { + for (const controller of this.consultAbortControllers) { + controller.abort(); + } + this.consultAbortControllers.clear(); + } + + private detectBargeInSpeech(samples: Float32Array): boolean { + if (this.sources.size === 0 || this.cancelRequestedForPlayback || samples.length === 0) { + this.speechFramesDuringPlayback = 0; + return false; + } + + let sumSquares = 0; + let peak = 0; + for (const sample of samples) { + const abs = Math.abs(sample); + peak = Math.max(peak, abs); + sumSquares += sample * sample; + } + const rms = Math.sqrt(sumSquares / samples.length); + if (rms >= BARGE_IN_RMS_THRESHOLD && peak >= BARGE_IN_PEAK_THRESHOLD) { + this.speechFramesDuringPlayback += 1; + } else { + this.speechFramesDuringPlayback = 0; + } + return this.speechFramesDuringPlayback >= BARGE_IN_CONSECUTIVE_SPEECH_FRAMES; + } } diff --git a/ui/src/ui/chat/realtime-talk-google-live.ts b/ui/src/ui/chat/realtime-talk-google-live.ts index 5d6254e3356..a7cbf2b947b 100644 --- a/ui/src/ui/chat/realtime-talk-google-live.ts +++ b/ui/src/ui/chat/realtime-talk-google-live.ts @@ -2,6 +2,7 @@ import { base64ToBytes, bytesToBase64, floatToPcm16, pcm16ToFloat } from "./real import type { RealtimeTalkJsonPcmWebSocketSessionResult } from "./realtime-talk-shared.ts"; import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + createRealtimeTalkEventEmitter, submitRealtimeTalkConsult, type RealtimeTalkTransport, type RealtimeTalkTransportContext, @@ -74,12 +75,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { private playhead = 0; private closed = false; private pendingCalls = new Map(); + private readonly consultAbortControllers = new Set(); private readonly sources = new Set(); + private readonly emitTalkEvent: ReturnType; constructor( private readonly session: RealtimeTalkJsonPcmWebSocketSessionResult, private readonly ctx: RealtimeTalkTransportContext, - ) {} + ) { + this.emitTalkEvent = createRealtimeTalkEventEmitter(ctx, session); + } async start(): Promise { if (!navigator.mediaDevices?.getUserMedia || typeof WebSocket === "undefined") { @@ -118,7 +123,14 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { } stop(): void { + if (!this.closed) { + this.emitTalkEvent({ type: "session.closed", final: true }); + } this.closed = true; + for (const controller of this.consultAbortControllers) { + controller.abort(); + } + this.consultAbortControllers.clear(); this.pendingCalls.clear(); this.inputProcessor?.disconnect(); this.inputProcessor = null; @@ -180,10 +192,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { } if (message.setupComplete) { this.ctx.callbacks.onStatus?.("listening"); + this.emitTalkEvent({ type: "session.ready" }); } const content = message.serverContent; if (content?.interrupted) { this.stopOutput(); + this.emitTalkEvent({ + type: "turn.cancelled", + final: true, + payload: { reason: "provider-interrupted" }, + }); } if (content?.inputTranscription?.text) { this.ctx.callbacks.onTranscript?.({ @@ -191,6 +209,11 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { text: content.inputTranscription.text, final: content.inputTranscription.finished ?? false, }); + this.emitTalkEvent({ + type: content.inputTranscription.finished ? "transcript.done" : "transcript.delta", + final: content.inputTranscription.finished ?? false, + payload: { role: "user", text: content.inputTranscription.text }, + }); } if (content?.outputTranscription?.text) { this.ctx.callbacks.onTranscript?.({ @@ -198,9 +221,21 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { text: content.outputTranscription.text, final: content.outputTranscription.finished ?? false, }); + this.emitTalkEvent({ + type: content.outputTranscription.finished ? "output.text.done" : "output.text.delta", + final: content.outputTranscription.finished ?? false, + payload: { text: content.outputTranscription.text }, + }); } for (const part of content?.modelTurn?.parts ?? []) { if (part.inlineData?.data) { + this.emitTalkEvent({ + type: "output.audio.delta", + payload: { + byteLength: base64ToBytes(part.inlineData.data).byteLength, + mimeType: part.inlineData.mimeType, + }, + }); this.playPcm16(part.inlineData.data); } else if (!part.thought && typeof part.text === "string" && part.text.trim()) { this.ctx.callbacks.onTranscript?.({ @@ -208,8 +243,16 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { text: part.text, final: content?.turnComplete ?? false, }); + this.emitTalkEvent({ + type: content?.turnComplete ? "output.text.done" : "output.text.delta", + final: content?.turnComplete ?? false, + payload: { text: part.text }, + }); } } + if (content?.turnComplete) { + this.emitTalkEvent({ type: "turn.ended", final: true }); + } for (const call of message.toolCall?.functionCalls ?? []) { void this.handleToolCall(call); } @@ -260,15 +303,27 @@ export class GoogleLiveRealtimeTalkTransport implements RealtimeTalkTransport { return; } this.pendingCalls.set(callId, { name, args: call.args ?? {} }); + this.emitTalkEvent({ + type: "tool.call", + callId, + payload: { name, args: call.args ?? {} }, + }); if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME) { return; } - await submitRealtimeTalkConsult({ - ctx: this.createActiveContext(), - callId, - args: call.args ?? {}, - submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), - }); + const abortController = new AbortController(); + this.consultAbortControllers.add(abortController); + try { + await submitRealtimeTalkConsult({ + ctx: this.createActiveContext(), + callId, + args: call.args ?? {}, + signal: abortController.signal, + submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), + }); + } finally { + this.consultAbortControllers.delete(abortController); + } } private createActiveContext(): RealtimeTalkTransportContext { diff --git a/ui/src/ui/chat/realtime-talk-shared.ts b/ui/src/ui/chat/realtime-talk-shared.ts index 0567544a239..1e2a1399f43 100644 --- a/ui/src/ui/chat/realtime-talk-shared.ts +++ b/ui/src/ui/chat/realtime-talk-shared.ts @@ -1,15 +1,25 @@ -import { - buildRealtimeVoiceAgentConsultChatMessage, - REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, -} from "../../../../src/realtime-voice/agent-consult-tool.js"; +import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME } from "../../../../src/realtime-voice/agent-consult-tool.js"; +import type { TalkEvent } from "../../../../src/realtime-voice/talk-events.js"; import type { GatewayBrowserClient, GatewayEventFrame } from "../gateway.ts"; -import { generateUUID } from "../uuid.ts"; export type RealtimeTalkStatus = "idle" | "connecting" | "listening" | "thinking" | "error"; +export type RealtimeTalkEvent = TalkEvent; export type RealtimeTalkCallbacks = { onStatus?: (status: RealtimeTalkStatus, detail?: string) => void; onTranscript?: (entry: { role: "user" | "assistant"; text: string; final: boolean }) => void; + onTalkEvent?: (event: RealtimeTalkEvent) => void; +}; + +export type RealtimeTalkEventInput = { + type: RealtimeTalkEvent["type"]; + payload?: TPayload; + turnId?: string; + captureId?: string; + final?: boolean; + callId?: string; + itemId?: string; + parentId?: string; }; export type RealtimeTalkAudioContract = { @@ -21,7 +31,7 @@ export type RealtimeTalkAudioContract = { export type RealtimeTalkWebRtcSdpSessionResult = { provider: string; - transport?: "webrtc-sdp"; + transport: "webrtc"; clientSecret: string; offerUrl?: string; offerHeaders?: Record; @@ -32,7 +42,7 @@ export type RealtimeTalkWebRtcSdpSessionResult = { export type RealtimeTalkJsonPcmWebSocketSessionResult = { provider: string; - transport: "json-pcm-websocket"; + transport: "provider-websocket"; protocol: string; clientSecret: string; websocketUrl: string; @@ -80,6 +90,86 @@ export type RealtimeTalkTransportContext = { callbacks: RealtimeTalkCallbacks; }; +export function createRealtimeTalkEventEmitter( + ctx: RealtimeTalkTransportContext, + session: RealtimeTalkSessionResult, +): (input: RealtimeTalkEventInput) => void { + let seq = 0; + let turnSeq = 0; + let activeTurnId: string | undefined; + const sessionId = resolveRealtimeTalkEventSessionId(ctx, session); + return (input) => { + if (!ctx.callbacks.onTalkEvent) { + return; + } + const turnId = resolveRealtimeTalkTurnId(input); + seq += 1; + ctx.callbacks.onTalkEvent({ + id: `${sessionId}:${seq}`, + type: input.type, + sessionId, + turnId, + captureId: input.captureId, + seq, + timestamp: new Date().toISOString(), + mode: "realtime", + transport: session.transport, + brain: "agent-consult", + provider: session.provider, + final: input.final, + callId: input.callId, + itemId: input.itemId, + parentId: input.parentId, + payload: input.payload ?? null, + }); + if ( + input.type === "turn.ended" || + input.type === "turn.cancelled" || + input.type === "session.replaced" || + input.type === "session.closed" + ) { + activeTurnId = undefined; + } + }; + + function resolveRealtimeTalkTurnId(input: RealtimeTalkEventInput): string | undefined { + if (input.type === "turn.started") { + activeTurnId = input.turnId ?? activeTurnId ?? `turn-${++turnSeq}`; + return activeTurnId; + } + if (!isTurnScopedTalkEvent(input.type)) { + return input.turnId; + } + activeTurnId = input.turnId ?? activeTurnId ?? `turn-${++turnSeq}`; + return activeTurnId; + } +} + +function isTurnScopedTalkEvent(type: RealtimeTalkEvent["type"]): boolean { + return ( + type === "turn.ended" || + type === "turn.cancelled" || + type.startsWith("input.audio.") || + type.startsWith("transcript.") || + type.startsWith("output.") || + type.startsWith("tool.") + ); +} + +function resolveRealtimeTalkEventSessionId( + ctx: RealtimeTalkTransportContext, + session: RealtimeTalkSessionResult, +): string { + const explicitSessionId = (session as { sessionId?: unknown }).sessionId; + if (typeof explicitSessionId === "string" && explicitSessionId.trim()) { + return explicitSessionId.trim(); + } + if ("relaySessionId" in session && session.relaySessionId.trim()) { + return session.relaySessionId; + } + return `${ctx.sessionKey}:${session.provider}:${session.transport}`; +} + type ChatPayload = { runId?: string; state?: string; @@ -112,13 +202,24 @@ function waitForChatResult(params: { client: GatewayBrowserClient; runId: string; timeoutMs: number; + signal?: AbortSignal; }): Promise { return new Promise((resolve, reject) => { + if (params.signal?.aborted) { + reject(new DOMException("OpenClaw tool call aborted", "AbortError")); + return; + } const timer = window.setTimeout(() => { - unsubscribe(); + cleanup(); reject(new Error("OpenClaw tool call timed out")); }, params.timeoutMs); - const unsubscribe = params.client.addEventListener((evt: GatewayEventFrame) => { + const onAbort = () => { + cleanup(); + reject(new DOMException("OpenClaw tool call aborted", "AbortError")); + }; + params.signal?.addEventListener("abort", onAbort, { once: true }); + let unsubscribe: () => void = () => undefined; + unsubscribe = params.client.addEventListener((evt: GatewayEventFrame) => { if (evt.event !== "chat") { return; } @@ -127,15 +228,23 @@ function waitForChatResult(params: { return; } if (payload.state === "final") { - window.clearTimeout(timer); - unsubscribe(); + cleanup(); resolve(extractTextFromMessage(payload.message) || "OpenClaw finished with no text."); + } else if (payload.state === "aborted") { + cleanup(); + reject( + new DOMException(payload.errorMessage ?? "OpenClaw tool call aborted", "AbortError"), + ); } else if (payload.state === "error") { - window.clearTimeout(timer); - unsubscribe(); + cleanup(); reject(new Error(payload.errorMessage ?? "OpenClaw tool call failed")); } }); + function cleanup() { + window.clearTimeout(timer); + params.signal?.removeEventListener("abort", onAbort); + unsubscribe(); + } }); } @@ -144,42 +253,72 @@ export async function submitRealtimeTalkConsult(params: { args: unknown; submit: (callId: string, result: unknown) => void; callId: string; + relaySessionId?: string; + signal?: AbortSignal; }): Promise { const { ctx, callId, submit } = params; ctx.callbacks.onStatus?.("thinking"); - let question = ""; + let runId: string | undefined; + let aborted = false; + const abortRun = () => { + aborted = true; + if (runId) { + void ctx.client.request("chat.abort", { sessionKey: ctx.sessionKey, runId }); + } + }; + if (params.signal?.aborted) { + return; + } + params.signal?.addEventListener("abort", abortRun, { once: true }); try { const args = typeof params.args === "string" ? JSON.parse(params.args || "{}") : (params.args ?? {}); - question = buildRealtimeVoiceAgentConsultChatMessage(args); - } catch {} - if (!question) { - submit(callId, { - error: `${REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME} requires a question`, - }); - ctx.callbacks.onStatus?.("listening"); - return; - } - try { - const idempotencyKey = generateUUID(); - const response = await ctx.client.request<{ runId?: string }>("chat.send", { - sessionKey: ctx.sessionKey, - message: question, - idempotencyKey, - }); + const response = await ctx.client.request<{ runId?: string; idempotencyKey?: string }>( + "talk.realtime.toolCall", + { + sessionKey: ctx.sessionKey, + callId, + name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + args, + ...(params.relaySessionId ? { relaySessionId: params.relaySessionId } : {}), + }, + ); + runId = response.runId ?? response.idempotencyKey; + if (!runId) { + throw new Error("OpenClaw realtime tool call did not return a run id"); + } + if (params.signal?.aborted) { + abortRun(); + return; + } const result = await waitForChatResult({ client: ctx.client, - runId: response.runId ?? idempotencyKey, + runId, timeoutMs: 120_000, + signal: params.signal, }); submit(callId, { result }); } catch (error) { + if (aborted || params.signal?.aborted || isAbortError(error)) { + return; + } submit(callId, { error: error instanceof Error ? error.message : String(error), }); } finally { - ctx.callbacks.onStatus?.("listening"); + params.signal?.removeEventListener("abort", abortRun); + if (!aborted && !params.signal?.aborted) { + ctx.callbacks.onStatus?.("listening"); + } } } +function isAbortError(error: unknown): boolean { + return ( + typeof DOMException !== "undefined" && + error instanceof DOMException && + error.name === "AbortError" + ); +} + export { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME }; diff --git a/ui/src/ui/chat/realtime-talk-webrtc.ts b/ui/src/ui/chat/realtime-talk-webrtc.ts index 88bdf7ad506..f6293c66b07 100644 --- a/ui/src/ui/chat/realtime-talk-webrtc.ts +++ b/ui/src/ui/chat/realtime-talk-webrtc.ts @@ -1,6 +1,7 @@ import type { RealtimeTalkWebRtcSdpSessionResult } from "./realtime-talk-shared.ts"; import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + createRealtimeTalkEventEmitter, submitRealtimeTalkConsult, type RealtimeTalkTransport, type RealtimeTalkTransportContext, @@ -34,11 +35,15 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { private audio: HTMLAudioElement | null = null; private closed = false; private toolBuffers = new Map(); + private readonly consultAbortControllers = new Set(); + private readonly emitTalkEvent: ReturnType; constructor( private readonly session: RealtimeTalkWebRtcSdpSessionResult, private readonly ctx: RealtimeTalkTransportContext, - ) {} + ) { + this.emitTalkEvent = createRealtimeTalkEventEmitter(ctx, session); + } async start(): Promise { if (!navigator.mediaDevices?.getUserMedia || typeof RTCPeerConnection === "undefined") { @@ -60,7 +65,10 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { this.peer.addTrack(track, this.media); } this.channel = this.peer.createDataChannel("oai-events"); - this.channel.addEventListener("open", () => this.ctx.callbacks.onStatus?.("listening")); + this.channel.addEventListener("open", () => { + this.ctx.callbacks.onStatus?.("listening"); + this.emitTalkEvent({ type: "session.ready" }); + }); this.channel.addEventListener("message", (event) => this.handleRealtimeEvent(event.data)); this.peer.addEventListener("connectionstatechange", () => { if (this.closed) { @@ -92,6 +100,9 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { } stop(): void { + if (!this.closed) { + this.emitTalkEvent({ type: "session.closed", final: true }); + } this.closed = true; this.channel?.close(); this.channel = null; @@ -101,6 +112,10 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { this.media = null; this.audio?.remove(); this.audio = null; + for (const controller of this.consultAbortControllers) { + controller.abort(); + } + this.consultAbortControllers.clear(); this.toolBuffers.clear(); } @@ -111,6 +126,9 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { } private handleRealtimeEvent(data: unknown): void { + if (this.closed) { + return; + } let event: RealtimeServerEvent; try { event = JSON.parse(String(data)) as RealtimeServerEvent; @@ -121,6 +139,12 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { case "conversation.item.input_audio_transcription.completed": if (event.transcript) { this.ctx.callbacks.onTranscript?.({ role: "user", text: event.transcript, final: true }); + this.emitTalkEvent({ + type: "transcript.done", + final: true, + itemId: event.item_id, + payload: { role: "user", text: event.transcript }, + }); } return; case "response.audio_transcript.done": @@ -130,6 +154,12 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { text: event.transcript, final: true, }); + this.emitTalkEvent({ + type: "output.text.done", + final: true, + itemId: event.item_id, + payload: { text: event.transcript }, + }); } return; case "response.function_call_arguments.delta": @@ -140,18 +170,30 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { return; case "input_audio_buffer.speech_started": this.ctx.callbacks.onStatus?.("listening", "Speech detected"); + this.emitTalkEvent({ type: "turn.started", payload: { source: event.type } }); return; case "input_audio_buffer.speech_stopped": this.ctx.callbacks.onStatus?.("thinking", "Processing speech"); + this.emitTalkEvent({ type: "input.audio.committed", final: true }); return; case "response.created": this.ctx.callbacks.onStatus?.("thinking", "Generating response"); return; case "response.done": this.ctx.callbacks.onStatus?.("listening", this.extractResponseStatus(event)); + this.emitTalkEvent({ + type: "turn.ended", + final: true, + payload: { status: event.response?.status ?? "completed" }, + }); return; case "error": this.ctx.callbacks.onStatus?.("error", this.extractErrorDetail(event.error)); + this.emitTalkEvent({ + type: "session.error", + final: true, + payload: { message: this.extractErrorDetail(event.error) }, + }); return; default: return; @@ -197,12 +239,25 @@ export class WebRtcSdpRealtimeTalkTransport implements RealtimeTalkTransport { if (name !== REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME || !callId) { return; } - await submitRealtimeTalkConsult({ - ctx: this.ctx, + this.emitTalkEvent({ + type: "tool.call", callId, - args: buffered?.args || event.arguments || "{}", - submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), + itemId: key, + payload: { name, args: buffered?.args || event.arguments || "{}" }, }); + const abortController = new AbortController(); + this.consultAbortControllers.add(abortController); + try { + await submitRealtimeTalkConsult({ + ctx: this.ctx, + callId, + args: buffered?.args || event.arguments || "{}", + signal: abortController.signal, + submit: (toolCallId, result) => this.submitToolResult(toolCallId, result), + }); + } finally { + this.consultAbortControllers.delete(abortController); + } } private submitToolResult(callId: string, result: unknown): void { diff --git a/ui/src/ui/chat/realtime-talk.ts b/ui/src/ui/chat/realtime-talk.ts index e348490fadf..4686af7cda1 100644 --- a/ui/src/ui/chat/realtime-talk.ts +++ b/ui/src/ui/chat/realtime-talk.ts @@ -1,8 +1,10 @@ +import { normalizeTalkTransport } from "../../../../src/realtime-voice/talk-session-controller.js"; import type { GatewayBrowserClient } from "../gateway.ts"; import { GatewayRelayRealtimeTalkTransport } from "./realtime-talk-gateway-relay.ts"; import { GoogleLiveRealtimeTalkTransport } from "./realtime-talk-google-live.ts"; import { type RealtimeTalkCallbacks, + type RealtimeTalkEvent, type RealtimeTalkGatewayRelaySessionResult, type RealtimeTalkJsonPcmWebSocketSessionResult, type RealtimeTalkSessionResult, @@ -13,17 +15,22 @@ import { } from "./realtime-talk-shared.ts"; import { WebRtcSdpRealtimeTalkTransport } from "./realtime-talk-webrtc.ts"; -export type { RealtimeTalkCallbacks, RealtimeTalkSessionResult, RealtimeTalkStatus }; +export type { + RealtimeTalkCallbacks, + RealtimeTalkEvent, + RealtimeTalkSessionResult, + RealtimeTalkStatus, +}; function createTransport( session: RealtimeTalkSessionResult, ctx: RealtimeTalkTransportContext, ): RealtimeTalkTransport { const transport = resolveTransport(session); - if (transport === "webrtc-sdp") { + if (transport === "webrtc") { return new WebRtcSdpRealtimeTalkTransport(session as RealtimeTalkWebRtcSdpSessionResult, ctx); } - if (transport === "json-pcm-websocket") { + if (transport === "provider-websocket") { return new GoogleLiveRealtimeTalkTransport( session as RealtimeTalkJsonPcmWebSocketSessionResult, ctx, @@ -43,30 +50,7 @@ function createTransport( } function resolveTransport(session: RealtimeTalkSessionResult): string { - if (session.transport) { - return session.transport; - } - const raw = session as { - provider?: string; - protocol?: string; - websocketUrl?: string; - }; - const provider = raw.provider?.trim().toLowerCase(); - if (provider === "google" && (raw.protocol === "google-live-bidi" || raw.websocketUrl)) { - return "json-pcm-websocket"; - } - if (provider === "google") { - throw new Error(buildGoogleWebRtcUnsupportedMessage()); - } - return "webrtc-sdp"; -} - -function buildGoogleWebRtcUnsupportedMessage(): string { - return [ - 'Realtime voice provider "google" does not support browser WebRTC sessions.', - "Control UI Talk can use Google through the gateway relay or a Google Live WebSocket session instead.", - 'Restart the gateway so it returns "gateway-relay" or "json-pcm-websocket", or switch Talk realtime to a WebRTC-capable provider such as OpenAI.', - ].join(" "); + return normalizeTalkTransport((session as { transport?: string }).transport) ?? "webrtc"; } export class RealtimeTalkSession { diff --git a/ui/src/ui/realtime-talk-gateway-relay.test.ts b/ui/src/ui/realtime-talk-gateway-relay.test.ts new file mode 100644 index 00000000000..a9fdac1744e --- /dev/null +++ b/ui/src/ui/realtime-talk-gateway-relay.test.ts @@ -0,0 +1,361 @@ +// @vitest-environment jsdom +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { GatewayRelayRealtimeTalkTransport } from "./chat/realtime-talk-gateway-relay.ts"; +import { + REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + type RealtimeTalkEvent, + type RealtimeTalkGatewayRelaySessionResult, + type RealtimeTalkTransportContext, +} from "./chat/realtime-talk-shared.ts"; + +type GatewayFrame = { event: string; payload?: unknown }; +type GatewayListener = (event: GatewayFrame) => void; +type MockProcessor = { + connect: ReturnType; + disconnect: ReturnType; + onaudioprocess: + | ((event: { inputBuffer: { getChannelData: (channel: number) => Float32Array } }) => void) + | null; +}; + +const listeners = new Set(); +const processors: MockProcessor[] = []; + +class MockAudioContext { + readonly currentTime = 0; + readonly destination = {}; + readonly close = vi.fn(async () => undefined); + + createMediaStreamSource() { + return { + connect: vi.fn(), + disconnect: vi.fn(), + }; + } + + createScriptProcessor() { + const processor: MockProcessor = { + connect: vi.fn(), + disconnect: vi.fn(), + onaudioprocess: null, + }; + processors.push(processor); + return processor; + } + + createBuffer(_channels: number, length: number, sampleRate: number) { + const channel = new Float32Array(length); + return { + duration: length / sampleRate, + getChannelData: () => channel, + }; + } + + createBufferSource() { + return { + addEventListener: vi.fn(), + buffer: null, + connect: vi.fn(), + start: vi.fn(), + stop: vi.fn(), + }; + } +} + +function createSession(): RealtimeTalkGatewayRelaySessionResult { + return { + provider: "openai", + transport: "gateway-relay", + relaySessionId: "relay-1", + audio: { + inputEncoding: "pcm16", + inputSampleRateHz: 24000, + outputEncoding: "pcm16", + outputSampleRateHz: 24000, + }, + }; +} + +function createClient(): RealtimeTalkTransportContext["client"] { + return { + addEventListener: vi.fn((listener: GatewayListener) => { + listeners.add(listener); + return () => listeners.delete(listener); + }), + request: vi.fn(async () => ({})), + } as unknown as RealtimeTalkTransportContext["client"]; +} + +function emitGatewayFrame(frame: GatewayFrame): void { + for (const listener of listeners) { + listener(frame); + } +} + +function pumpMicrophone(samples: Float32Array): void { + const processor = processors.at(-1); + expect(processor).toBeDefined(); + processor?.onaudioprocess?.({ + inputBuffer: { + getChannelData: () => samples, + }, + }); +} + +describe("GatewayRelayRealtimeTalkTransport", () => { + beforeEach(() => { + listeners.clear(); + processors.length = 0; + vi.stubGlobal("AudioContext", MockAudioContext); + Object.defineProperty(globalThis.navigator, "mediaDevices", { + configurable: true, + value: { + getUserMedia: vi.fn(async () => ({ + getTracks: () => [{ stop: vi.fn() }], + })), + }, + }); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + listeners.clear(); + processors.length = 0; + }); + + it("forwards common Talk events from Gateway relay frames", async () => { + const onTalkEvent = vi.fn(); + const transport = new GatewayRelayRealtimeTalkTransport(createSession(), { + callbacks: { onTalkEvent }, + client: createClient(), + sessionKey: "main", + }); + const talkEvent = { + id: "relay-1:1", + type: "session.ready", + sessionId: "relay-1", + seq: 1, + timestamp: "2026-05-05T00:00:00.000Z", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + payload: {}, + } satisfies RealtimeTalkEvent; + + await transport.start(); + emitGatewayFrame({ + event: "talk.realtime.relay", + payload: { + relaySessionId: "relay-1", + type: "ready", + talkEvent, + }, + }); + + expect(onTalkEvent).toHaveBeenCalledWith(talkEvent); + transport.stop(); + }); + + it("does not forward Talk events for another relay session", async () => { + const onTalkEvent = vi.fn(); + const transport = new GatewayRelayRealtimeTalkTransport(createSession(), { + callbacks: { onTalkEvent }, + client: createClient(), + sessionKey: "main", + }); + + await transport.start(); + emitGatewayFrame({ + event: "talk.realtime.relay", + payload: { + relaySessionId: "relay-other", + type: "ready", + talkEvent: { + id: "relay-other:1", + type: "session.ready", + sessionId: "relay-other", + seq: 1, + timestamp: "2026-05-05T00:00:00.000Z", + mode: "realtime", + transport: "gateway-relay", + brain: "agent-consult", + payload: {}, + } satisfies RealtimeTalkEvent, + }, + }); + + expect(onTalkEvent).not.toHaveBeenCalled(); + transport.stop(); + }); + + it("keeps assistant playback alive while relay input is silence", async () => { + const client = createClient(); + const transport = new GatewayRelayRealtimeTalkTransport(createSession(), { + callbacks: {}, + client, + sessionKey: "main", + }); + + await transport.start(); + emitGatewayFrame({ + event: "talk.realtime.relay", + payload: { + relaySessionId: "relay-1", + type: "audio", + audioBase64: "AAAA", + }, + }); + pumpMicrophone(new Float32Array(4096)); + + expect(client.request).not.toHaveBeenCalledWith("talk.realtime.relayCancel", expect.anything()); + expect(client.request).toHaveBeenCalledWith( + "talk.realtime.relayAudio", + expect.objectContaining({ relaySessionId: "relay-1" }), + ); + transport.stop(); + }); + + it("cancels relay playback after sustained input speech", async () => { + const client = createClient(); + const transport = new GatewayRelayRealtimeTalkTransport(createSession(), { + callbacks: {}, + client, + sessionKey: "main", + }); + const speech = new Float32Array(4096).fill(0.25); + + await transport.start(); + emitGatewayFrame({ + event: "talk.realtime.relay", + payload: { + relaySessionId: "relay-1", + type: "audio", + audioBase64: "AAAA", + }, + }); + pumpMicrophone(speech); + expect(client.request).not.toHaveBeenCalledWith("talk.realtime.relayCancel", expect.anything()); + + pumpMicrophone(speech); + pumpMicrophone(speech); + + const cancelCalls = vi + .mocked(client.request) + .mock.calls.filter(([method]) => method === "talk.realtime.relayCancel"); + expect(cancelCalls).toEqual([ + [ + "talk.realtime.relayCancel", + { + relaySessionId: "relay-1", + reason: "barge-in", + }, + ], + ]); + transport.stop(); + }); + + it("treats aborted consult chat events as cancellation", async () => { + const onStatus = vi.fn(); + const client = createClient(); + vi.mocked(client.request).mockImplementation(async (method) => { + if (method === "talk.realtime.toolCall") { + return { runId: "run-1" }; + } + return {}; + }); + const transport = new GatewayRelayRealtimeTalkTransport(createSession(), { + callbacks: { onStatus }, + client, + sessionKey: "main", + }); + + await transport.start(); + emitGatewayFrame({ + event: "talk.realtime.relay", + payload: { + relaySessionId: "relay-1", + type: "toolCall", + callId: "call-1", + name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + args: { question: "status?" }, + }, + }); + await vi.waitFor(() => + expect(client.request).toHaveBeenCalledWith( + "talk.realtime.toolCall", + expect.objectContaining({ + callId: "call-1", + relaySessionId: "relay-1", + }), + ), + ); + + emitGatewayFrame({ + event: "chat", + payload: { + runId: "run-1", + state: "aborted", + }, + }); + + await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening")); + expect( + vi + .mocked(client.request) + .mock.calls.some(([method]) => method === "talk.realtime.relayToolResult"), + ).toBe(false); + transport.stop(); + }); + + it("aborts in-flight consults when the relay transport stops", async () => { + const client = createClient(); + vi.mocked(client.request).mockImplementation(async (method, params) => { + if (method === "chat.abort") { + expect(params).toEqual({ sessionKey: "main", runId: "run-1" }); + return { ok: true, aborted: true }; + } + if (method === "talk.realtime.toolCall") { + return { runId: "run-1" }; + } + return {}; + }); + const transport = new GatewayRelayRealtimeTalkTransport(createSession(), { + callbacks: {}, + client, + sessionKey: "main", + }); + + await transport.start(); + emitGatewayFrame({ + event: "talk.realtime.relay", + payload: { + relaySessionId: "relay-1", + type: "toolCall", + callId: "call-1", + name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + args: { question: "status?" }, + }, + }); + await vi.waitFor(() => + expect(client.request).toHaveBeenCalledWith("talk.realtime.toolCall", expect.anything()), + ); + + transport.stop(); + await vi.waitFor(() => + expect(client.request).toHaveBeenCalledWith("chat.abort", { + sessionKey: "main", + runId: "run-1", + }), + ); + emitGatewayFrame({ + event: "chat", + payload: { runId: "run-1", state: "final", message: { text: "late answer" } }, + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + expect( + vi + .mocked(client.request) + .mock.calls.some(([method]) => method === "talk.realtime.relayToolResult"), + ).toBe(false); + }); +}); diff --git a/ui/src/ui/realtime-talk-google-live.test.ts b/ui/src/ui/realtime-talk-google-live.test.ts index b2e652d5123..39e85c4b85e 100644 --- a/ui/src/ui/realtime-talk-google-live.test.ts +++ b/ui/src/ui/realtime-talk-google-live.test.ts @@ -117,7 +117,7 @@ function createSession( ): RealtimeTalkJsonPcmWebSocketSessionResult { return { provider: "google", - transport: "json-pcm-websocket", + transport: "provider-websocket", protocol: "google-live-bidi", clientSecret, websocketUrl, @@ -187,7 +187,8 @@ describe("GoogleLiveRealtimeTalkTransport", () => { it("requests ArrayBuffer frames and decodes binary setup messages", async () => { const onStatus = vi.fn(); - const transport = createTransport({ onStatus }); + const onTalkEvent = vi.fn(); + const transport = createTransport({ onStatus, onTalkEvent }); await transport.start(); const ws = latestWebSocket(); @@ -195,6 +196,13 @@ describe("GoogleLiveRealtimeTalkTransport", () => { expect(ws.binaryType).toBe("arraybuffer"); await vi.waitFor(() => expect(onStatus).toHaveBeenCalledWith("listening")); + expect(onTalkEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: "session.ready", + sessionId: "main:google:provider-websocket", + transport: "provider-websocket", + }), + ); }); it("decodes Blob setup messages", async () => { @@ -208,7 +216,8 @@ describe("GoogleLiveRealtimeTalkTransport", () => { }); it("stops queued output when Google Live sends interruption", async () => { - const transport = createTransport(); + const onTalkEvent = vi.fn(); + const transport = createTransport({ onTalkEvent }); await transport.start(); const ws = latestWebSocket(); @@ -227,6 +236,60 @@ describe("GoogleLiveRealtimeTalkTransport", () => { ws.emitMessage(encodeJsonFrame({ serverContent: { interrupted: true } })); await vi.waitFor(() => expect(source?.stop).toHaveBeenCalledTimes(1)); + expect(onTalkEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: "turn.cancelled", + final: true, + payload: { reason: "provider-interrupted" }, + }), + ); + }); + + it("emits common Talk events for Google Live transcript and audio frames", async () => { + const onTranscript = vi.fn(); + const onTalkEvent = vi.fn(); + const transport = createTransport({ onTalkEvent, onTranscript }); + + await transport.start(); + latestWebSocket().emitMessage( + encodeJsonFrame({ + serverContent: { + inputTranscription: { text: "hello", finished: true }, + outputTranscription: { text: "hi", finished: false }, + modelTurn: { + parts: [ + { inlineData: { data: "AAAAAA==", mimeType: "audio/pcm;rate=24000" } }, + { text: "there" }, + ], + }, + turnComplete: true, + }, + }), + ); + + await vi.waitFor(() => + expect(onTalkEvent.mock.calls.map(([event]) => event.type)).toEqual([ + "transcript.done", + "output.text.delta", + "output.audio.delta", + "output.text.done", + "turn.ended", + ]), + ); + expect(onTalkEvent.mock.calls.map(([event]) => event.turnId)).toEqual([ + "turn-1", + "turn-1", + "turn-1", + "turn-1", + "turn-1", + ]); + expect(onTranscript).toHaveBeenCalledWith({ role: "user", text: "hello", final: true }); + expect(onTranscript).toHaveBeenCalledWith({ role: "assistant", text: "hi", final: false }); + expect(onTalkEvent.mock.calls[2]?.[0]).toMatchObject({ + payload: { byteLength: 4, mimeType: "audio/pcm;rate=24000" }, + sessionId: "main:google:provider-websocket", + transport: "provider-websocket", + }); }); it("ignores late WebSocket events after stop", async () => { @@ -253,8 +316,18 @@ describe("GoogleLiveRealtimeTalkTransport", () => { listeners.add(listener); return () => listeners.delete(listener); }), - request: vi.fn(async (_method: string, params: { idempotencyKey?: string }) => { - runId = params.idempotencyKey ?? runId; + request: vi.fn(async (method: string, params: Record) => { + if (method === "chat.abort") { + expect(params).toEqual({ sessionKey: "main", runId }); + return { ok: true, aborted: true }; + } + expect(method).toBe("talk.realtime.toolCall"); + expect(params).toEqual( + expect.objectContaining({ + callId: "call-1", + name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + }), + ); return { runId }; }), } as unknown as RealtimeTalkTransportContext["client"]; @@ -283,6 +356,7 @@ describe("GoogleLiveRealtimeTalkTransport", () => { } await new Promise((resolve) => setTimeout(resolve, 0)); + expect(client.request).toHaveBeenCalledWith("chat.abort", { sessionKey: "main", runId }); expect(onStatus).not.toHaveBeenCalledWith("listening"); }); }); diff --git a/ui/src/ui/realtime-talk-webrtc.test.ts b/ui/src/ui/realtime-talk-webrtc.test.ts index 32e5b1a288b..f44d514c29a 100644 --- a/ui/src/ui/realtime-talk-webrtc.test.ts +++ b/ui/src/ui/realtime-talk-webrtc.test.ts @@ -1,5 +1,6 @@ // @vitest-environment jsdom import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME } from "./chat/realtime-talk-shared.ts"; import { WebRtcSdpRealtimeTalkTransport } from "./chat/realtime-talk-webrtc.ts"; class FakeDataChannel extends EventTarget { @@ -72,7 +73,7 @@ describe("WebRtcSdpRealtimeTalkTransport", () => { const transport = new WebRtcSdpRealtimeTalkTransport( { provider: "openai", - transport: "webrtc-sdp", + transport: "webrtc", clientSecret: "client-secret-123", offerUrl: "https://api.openai.com/v1/realtime/calls", offerHeaders: { @@ -111,7 +112,7 @@ describe("WebRtcSdpRealtimeTalkTransport", () => { const transport = new WebRtcSdpRealtimeTalkTransport( { provider: "openai", - transport: "webrtc-sdp", + transport: "webrtc", clientSecret: "client-secret-123", }, { @@ -142,16 +143,17 @@ describe("WebRtcSdpRealtimeTalkTransport", () => { vi.fn(async () => new Response("answer-sdp")) as unknown as typeof fetch, ); const onStatus = vi.fn(); + const onTalkEvent = vi.fn(); const transport = new WebRtcSdpRealtimeTalkTransport( { provider: "openai", - transport: "webrtc-sdp", + transport: "webrtc", clientSecret: "client-secret-123", }, { client: {} as never, sessionKey: "main", - callbacks: { onStatus }, + callbacks: { onStatus, onTalkEvent }, }, ); @@ -170,6 +172,149 @@ describe("WebRtcSdpRealtimeTalkTransport", () => { expect(onStatus).toHaveBeenCalledWith("thinking", "Processing speech"); expect(onStatus).toHaveBeenCalledWith("thinking", "Generating response"); expect(onStatus).toHaveBeenCalledWith("listening", undefined); + expect(onTalkEvent.mock.calls.map(([event]) => event.type)).toEqual([ + "turn.started", + "input.audio.committed", + "turn.ended", + ]); + expect(onTalkEvent.mock.calls.map(([event]) => event.turnId)).toEqual([ + "turn-1", + "turn-1", + "turn-1", + ]); transport.stop(); }); + + it("emits common Talk transcript events from the OpenAI data channel", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async () => new Response("answer-sdp")) as unknown as typeof fetch, + ); + const onTranscript = vi.fn(); + const onTalkEvent = vi.fn(); + const transport = new WebRtcSdpRealtimeTalkTransport( + { + provider: "openai", + transport: "webrtc", + clientSecret: "client-secret-123", + }, + { + client: {} as never, + sessionKey: "main", + callbacks: { onTranscript, onTalkEvent }, + }, + ); + + await transport.start(); + const peer = FakePeerConnection.instances[0]; + peer?.channel.dispatchEvent( + new MessageEvent("message", { + data: JSON.stringify({ + type: "conversation.item.input_audio_transcription.completed", + item_id: "input-1", + transcript: "hello", + }), + }), + ); + peer?.channel.dispatchEvent( + new MessageEvent("message", { + data: JSON.stringify({ + type: "response.audio_transcript.done", + item_id: "response-1", + transcript: "hi there", + }), + }), + ); + + expect(onTranscript).toHaveBeenCalledWith({ role: "user", text: "hello", final: true }); + expect(onTranscript).toHaveBeenCalledWith({ + role: "assistant", + text: "hi there", + final: true, + }); + expect(onTalkEvent.mock.calls.map(([event]) => event.type)).toEqual([ + "transcript.done", + "output.text.done", + ]); + expect(onTalkEvent.mock.calls.map(([event]) => event.turnId)).toEqual(["turn-1", "turn-1"]); + expect(onTalkEvent.mock.calls[0]?.[0]).toMatchObject({ + itemId: "input-1", + payload: { role: "user", text: "hello" }, + sessionId: "main:openai:webrtc", + transport: "webrtc", + }); + expect(onTalkEvent.mock.calls[1]?.[0]).toMatchObject({ + itemId: "response-1", + payload: { text: "hi there" }, + sessionId: "main:openai:webrtc", + transport: "webrtc", + }); + transport.stop(); + }); + + it("aborts an in-flight OpenAI tool consult when the transport stops", async () => { + vi.stubGlobal( + "fetch", + vi.fn(async () => new Response("answer-sdp")) as unknown as typeof fetch, + ); + const listeners = new Set<(event: { event: string; payload?: unknown }) => void>(); + const request = vi.fn(async (method: string, params: Record) => { + if (method === "chat.abort") { + expect(params).toEqual({ sessionKey: "main", runId: "run-1" }); + return { ok: true, aborted: true }; + } + expect(method).toBe("talk.realtime.toolCall"); + expect(params).toEqual( + expect.objectContaining({ + callId: "call-1", + name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + }), + ); + return { runId: "run-1" }; + }); + const transport = new WebRtcSdpRealtimeTalkTransport( + { + provider: "openai", + transport: "webrtc", + clientSecret: "client-secret-123", + }, + { + client: { + addEventListener: vi.fn( + (listener: (event: { event: string; payload?: unknown }) => void) => { + listeners.add(listener); + return () => listeners.delete(listener); + }, + ), + request, + } as never, + sessionKey: "main", + callbacks: {}, + }, + ); + + await transport.start(); + const peer = FakePeerConnection.instances[0]; + peer?.channel.dispatchEvent( + new MessageEvent("message", { + data: JSON.stringify({ + type: "response.function_call_arguments.done", + item_id: "item-1", + call_id: "call-1", + name: REALTIME_VOICE_AGENT_CONSULT_TOOL_NAME, + arguments: JSON.stringify({ question: "status?" }), + }), + }), + ); + await vi.waitFor(() => + expect(request).toHaveBeenCalledWith("talk.realtime.toolCall", expect.anything()), + ); + + transport.stop(); + + await vi.waitFor(() => + expect(request).toHaveBeenCalledWith("chat.abort", { sessionKey: "main", runId: "run-1" }), + ); + expect(listeners.size).toBe(0); + }); }); diff --git a/ui/src/ui/realtime-talk.test.ts b/ui/src/ui/realtime-talk.test.ts index b2a15f12a58..c8fbf8e040d 100644 --- a/ui/src/ui/realtime-talk.test.ts +++ b/ui/src/ui/realtime-talk.test.ts @@ -59,7 +59,7 @@ describe("RealtimeTalkSession", () => { it("starts the Google Live WebSocket transport from a generic session result", async () => { const request = vi.fn(async () => ({ provider: "google", - transport: "json-pcm-websocket", + transport: "provider-websocket", protocol: "google-live-bidi", clientSecret: "auth_tokens/session", websocketUrl: "wss://example.test/live", @@ -83,11 +83,40 @@ describe("RealtimeTalkSession", () => { expect(onStatus).toHaveBeenCalledWith("connecting"); }); - it("keeps Google Live WebSocket sessions off the WebRTC fallback when transport is omitted", async () => { + it("defaults legacy session results without an explicit transport to WebRTC", async () => { const request = vi.fn(async () => ({ - provider: "google", - protocol: "google-live-bidi", + provider: "openai", clientSecret: "auth_tokens/session", + })); + const session = new RealtimeTalkSession({ request } as never, "main"); + + await session.start(); + + expect(webRtcCtor).toHaveBeenCalledTimes(1); + expect(webRtcStart).toHaveBeenCalledTimes(1); + expect(googleCtor).not.toHaveBeenCalled(); + }); + + it("accepts legacy WebRTC transport names", async () => { + const request = vi.fn(async () => ({ + provider: "openai", + transport: "webrtc-sdp", + clientSecret: "secret", + })); + const session = new RealtimeTalkSession({ request } as never, "main"); + + await session.start(); + + expect(webRtcCtor).toHaveBeenCalledTimes(1); + expect(googleCtor).not.toHaveBeenCalled(); + }); + + it("accepts legacy provider WebSocket transport names", async () => { + const request = vi.fn(async () => ({ + provider: "example", + transport: "json-pcm-websocket", + clientSecret: "secret", + protocol: "google-live-bidi", websocketUrl: "wss://example.test/live", audio: { inputEncoding: "pcm16", @@ -100,38 +129,8 @@ describe("RealtimeTalkSession", () => { await session.start(); + expect(webRtcCtor).not.toHaveBeenCalled(); expect(googleCtor).toHaveBeenCalledTimes(1); - expect(googleStart).toHaveBeenCalledTimes(1); - expect(webRtcCtor).not.toHaveBeenCalled(); - }); - - it("does not treat ambiguous Google sessions as browser WebRTC sessions", async () => { - const request = vi.fn(async () => ({ - provider: "google", - clientSecret: "secret", - })); - const session = new RealtimeTalkSession({ request } as never, "main"); - - await expect(session.start()).rejects.toThrow( - 'Realtime voice provider "google" does not support browser WebRTC sessions. Control UI Talk can use Google through the gateway relay or a Google Live WebSocket session instead. Restart the gateway so it returns "gateway-relay" or "json-pcm-websocket", or switch Talk realtime to a WebRTC-capable provider such as OpenAI.', - ); - - expect(webRtcCtor).not.toHaveBeenCalled(); - expect(googleCtor).not.toHaveBeenCalled(); - }); - - it("does not infer Google Live transport from websocketUrl on non-Google sessions", async () => { - const request = vi.fn(async () => ({ - provider: "example", - clientSecret: "secret", - websocketUrl: "wss://example.test/live", - })); - const session = new RealtimeTalkSession({ request } as never, "main"); - - await session.start(); - - expect(webRtcCtor).toHaveBeenCalledTimes(1); - expect(googleCtor).not.toHaveBeenCalled(); }); it("starts the Gateway relay transport for backend-only realtime providers", async () => { @@ -158,9 +157,10 @@ describe("RealtimeTalkSession", () => { expect(webRtcCtor).not.toHaveBeenCalled(); }); - it("keeps legacy session results on the OpenAI-style WebRTC transport", async () => { + it("starts the WebRTC transport for canonical WebRTC sessions", async () => { const request = vi.fn(async () => ({ provider: "openai", + transport: "webrtc", clientSecret: "secret", })); const session = new RealtimeTalkSession({ request } as never, "main");