diff --git a/extensions/voice-call/src/media-stream.test.ts b/extensions/voice-call/src/media-stream.test.ts index 1874bec83f5..03c9540af9e 100644 --- a/extensions/voice-call/src/media-stream.test.ts +++ b/extensions/voice-call/src/media-stream.test.ts @@ -1,5 +1,3 @@ -import { once } from "node:events"; -import http from "node:http"; import type { RealtimeTranscriptionProviderPlugin, RealtimeTranscriptionSession, @@ -7,6 +5,12 @@ import type { import { describe, expect, it, vi } from "vitest"; import { WebSocket } from "ws"; import { MediaStreamHandler, sanitizeLogText } from "./media-stream.js"; +import { + connectWs, + startUpgradeWsServer, + waitForClose, + withTimeout, +} from "./websocket-test-support.js"; const createStubSession = (): RealtimeTranscriptionSession => ({ connect: async () => {}, @@ -36,69 +40,18 @@ const waitForAbort = (signal: AbortSignal): Promise => signal.addEventListener("abort", () => resolve(), { once: true }); }); -const withTimeout = async (promise: Promise, timeoutMs = 2000): Promise => { - let timer: ReturnType | null = null; - const timeout = new Promise((_, reject) => { - timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs); - }); - - try { - return await Promise.race([promise, timeout]); - } finally { - if (timer) { - clearTimeout(timer); - } - } -}; - const startWsServer = async ( handler: MediaStreamHandler, ): Promise<{ url: string; close: () => Promise; -}> => { - const server = http.createServer(); - server.on("upgrade", (request, socket, head) => { - handler.handleUpgrade(request, socket, head); - }); - - await new Promise((resolve) => { - server.listen(0, "127.0.0.1", resolve); - }); - - const address = server.address(); - if (!address || typeof address === "string") { - throw new Error("Failed to resolve test server address"); - } - - return { - url: `ws://127.0.0.1:${address.port}/voice/stream`, - close: async () => { - await new Promise((resolve, reject) => { - server.close((err) => (err ? reject(err) : resolve())); - }); +}> => + startUpgradeWsServer({ + urlPath: "/voice/stream", + onUpgrade: (request, socket, head) => { + handler.handleUpgrade(request, socket, head); }, - }; -}; - -const connectWs = async (url: string): Promise => { - const ws = new WebSocket(url); - await withTimeout(once(ws, "open") as Promise<[unknown]>); - return ws; -}; - -const waitForClose = async ( - ws: WebSocket, -): Promise<{ - code: number; - reason: string; -}> => { - const [code, reason] = (await withTimeout(once(ws, "close") as Promise<[number, Buffer]>)) ?? []; - return { - code, - reason: Buffer.isBuffer(reason) ? reason.toString() : String(reason || ""), - }; -}; + }); describe("MediaStreamHandler TTS queue", () => { it("serializes TTS playback and resolves in order", async () => { diff --git a/extensions/voice-call/src/webhook/realtime-handler.test.ts b/extensions/voice-call/src/webhook/realtime-handler.test.ts index 36662b678fd..7593992843c 100644 --- a/extensions/voice-call/src/webhook/realtime-handler.test.ts +++ b/extensions/voice-call/src/webhook/realtime-handler.test.ts @@ -1,4 +1,3 @@ -import { once } from "node:events"; import http from "node:http"; import type { RealtimeVoiceBridge, @@ -9,6 +8,7 @@ import { WebSocket } from "ws"; import type { VoiceCallRealtimeConfig } from "../config.js"; import type { CallManager } from "../manager.js"; import type { VoiceCallProvider } from "../providers/base.js"; +import { connectWs, startUpgradeWsServer, waitForClose } from "../websocket-test-support.js"; import { RealtimeCallHandler } from "./realtime-handler.js"; function makeRequest(url: string, host = "gateway.ts.net"): http.IncomingMessage { @@ -83,21 +83,6 @@ function makeHandler( ); } -const withTimeout = async (promise: Promise, timeoutMs = 2000): Promise => { - let timer: ReturnType | null = null; - const timeout = new Promise((_, reject) => { - timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs); - }); - - try { - return await Promise.race([promise, timeout]); - } finally { - if (timer) { - clearTimeout(timer); - } - } -}; - const startRealtimeServer = async ( handler: RealtimeCallHandler, ): Promise<{ @@ -110,47 +95,12 @@ const startRealtimeServer = async ( throw new Error("Failed to extract realtime stream path"); } - const server = http.createServer(); - server.on("upgrade", (request, socket, head) => { - handler.handleWebSocketUpgrade(request, socket, head); - }); - - await new Promise((resolve) => { - server.listen(0, "127.0.0.1", resolve); - }); - - const address = server.address(); - if (!address || typeof address === "string") { - throw new Error("Failed to resolve test server address"); - } - - return { - url: `ws://127.0.0.1:${address.port}${match[1]}`, - close: async () => { - await new Promise((resolve, reject) => { - server.close((err) => (err ? reject(err) : resolve())); - }); + return await startUpgradeWsServer({ + urlPath: match[1], + onUpgrade: (request, socket, head) => { + handler.handleWebSocketUpgrade(request, socket, head); }, - }; -}; - -const connectWs = async (url: string): Promise => { - const ws = new WebSocket(url); - await withTimeout(once(ws, "open") as Promise<[unknown]>); - return ws; -}; - -const waitForClose = async ( - ws: WebSocket, -): Promise<{ - code: number; - reason: string; -}> => { - const [code, reason] = (await withTimeout(once(ws, "close") as Promise<[number, Buffer]>)) ?? []; - return { - code, - reason: Buffer.isBuffer(reason) ? reason.toString("utf8") : String(reason || ""), - }; + }); }; describe("RealtimeCallHandler path routing", () => { diff --git a/extensions/voice-call/src/websocket-test-support.ts b/extensions/voice-call/src/websocket-test-support.ts new file mode 100644 index 00000000000..dd28d13a74a --- /dev/null +++ b/extensions/voice-call/src/websocket-test-support.ts @@ -0,0 +1,72 @@ +import { once } from "node:events"; +import http from "node:http"; +import { WebSocket } from "ws"; + +export const withTimeout = async (promise: Promise, timeoutMs = 2000): Promise => { + let timer: ReturnType | null = null; + const timeout = new Promise((_, reject) => { + timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs); + }); + + try { + return await Promise.race([promise, timeout]); + } finally { + if (timer) { + clearTimeout(timer); + } + } +}; + +export const startUpgradeWsServer = async (params: { + urlPath: string; + onUpgrade: ( + request: http.IncomingMessage, + socket: Parameters[2], + head: Buffer, + ) => void; +}): Promise<{ + url: string; + close: () => Promise; +}> => { + const server = http.createServer(); + server.on("upgrade", (request, socket, head) => { + params.onUpgrade(request, socket, head); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", resolve); + }); + + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("Failed to resolve test server address"); + } + + return { + url: `ws://127.0.0.1:${address.port}${params.urlPath}`, + close: async () => { + await new Promise((resolve, reject) => { + server.close((err) => (err ? reject(err) : resolve())); + }); + }, + }; +}; + +export const connectWs = async (url: string): Promise => { + const ws = new WebSocket(url); + await withTimeout(once(ws, "open") as Promise<[unknown]>); + return ws; +}; + +export const waitForClose = async ( + ws: WebSocket, +): Promise<{ + code: number; + reason: string; +}> => { + const [code, reason] = (await withTimeout(once(ws, "close") as Promise<[number, Buffer]>)) ?? []; + return { + code, + reason: Buffer.isBuffer(reason) ? reason.toString("utf8") : String(reason || ""), + }; +};