mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 09:10:45 +00:00
test(voice-call): share websocket test helpers
This commit is contained in:
@@ -1,5 +1,3 @@
|
||||
import { once } from "node:events";
|
||||
import http from "node:http";
|
||||
import type {
|
||||
RealtimeTranscriptionProviderPlugin,
|
||||
RealtimeTranscriptionSession,
|
||||
@@ -7,6 +5,12 @@ import type {
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { WebSocket } from "ws";
|
||||
import { MediaStreamHandler, sanitizeLogText } from "./media-stream.js";
|
||||
import {
|
||||
connectWs,
|
||||
startUpgradeWsServer,
|
||||
waitForClose,
|
||||
withTimeout,
|
||||
} from "./websocket-test-support.js";
|
||||
|
||||
const createStubSession = (): RealtimeTranscriptionSession => ({
|
||||
connect: async () => {},
|
||||
@@ -36,69 +40,18 @@ const waitForAbort = (signal: AbortSignal): Promise<void> =>
|
||||
signal.addEventListener("abort", () => resolve(), { once: true });
|
||||
});
|
||||
|
||||
const withTimeout = async <T>(promise: Promise<T>, timeoutMs = 2000): Promise<T> => {
|
||||
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||
const timeout = new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs);
|
||||
});
|
||||
|
||||
try {
|
||||
return await Promise.race([promise, timeout]);
|
||||
} finally {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const startWsServer = async (
|
||||
handler: MediaStreamHandler,
|
||||
): Promise<{
|
||||
url: string;
|
||||
close: () => Promise<void>;
|
||||
}> => {
|
||||
const server = http.createServer();
|
||||
server.on("upgrade", (request, socket, head) => {
|
||||
handler.handleUpgrade(request, socket, head);
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
server.listen(0, "127.0.0.1", resolve);
|
||||
});
|
||||
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("Failed to resolve test server address");
|
||||
}
|
||||
|
||||
return {
|
||||
url: `ws://127.0.0.1:${address.port}/voice/stream`,
|
||||
close: async () => {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((err) => (err ? reject(err) : resolve()));
|
||||
});
|
||||
}> =>
|
||||
startUpgradeWsServer({
|
||||
urlPath: "/voice/stream",
|
||||
onUpgrade: (request, socket, head) => {
|
||||
handler.handleUpgrade(request, socket, head);
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
const connectWs = async (url: string): Promise<WebSocket> => {
|
||||
const ws = new WebSocket(url);
|
||||
await withTimeout(once(ws, "open") as Promise<[unknown]>);
|
||||
return ws;
|
||||
};
|
||||
|
||||
const waitForClose = async (
|
||||
ws: WebSocket,
|
||||
): Promise<{
|
||||
code: number;
|
||||
reason: string;
|
||||
}> => {
|
||||
const [code, reason] = (await withTimeout(once(ws, "close") as Promise<[number, Buffer]>)) ?? [];
|
||||
return {
|
||||
code,
|
||||
reason: Buffer.isBuffer(reason) ? reason.toString() : String(reason || ""),
|
||||
};
|
||||
};
|
||||
});
|
||||
|
||||
describe("MediaStreamHandler TTS queue", () => {
|
||||
it("serializes TTS playback and resolves in order", async () => {
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import { once } from "node:events";
|
||||
import http from "node:http";
|
||||
import type {
|
||||
RealtimeVoiceBridge,
|
||||
@@ -9,6 +8,7 @@ import { WebSocket } from "ws";
|
||||
import type { VoiceCallRealtimeConfig } from "../config.js";
|
||||
import type { CallManager } from "../manager.js";
|
||||
import type { VoiceCallProvider } from "../providers/base.js";
|
||||
import { connectWs, startUpgradeWsServer, waitForClose } from "../websocket-test-support.js";
|
||||
import { RealtimeCallHandler } from "./realtime-handler.js";
|
||||
|
||||
function makeRequest(url: string, host = "gateway.ts.net"): http.IncomingMessage {
|
||||
@@ -83,21 +83,6 @@ function makeHandler(
|
||||
);
|
||||
}
|
||||
|
||||
const withTimeout = async <T>(promise: Promise<T>, timeoutMs = 2000): Promise<T> => {
|
||||
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||
const timeout = new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs);
|
||||
});
|
||||
|
||||
try {
|
||||
return await Promise.race([promise, timeout]);
|
||||
} finally {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const startRealtimeServer = async (
|
||||
handler: RealtimeCallHandler,
|
||||
): Promise<{
|
||||
@@ -110,47 +95,12 @@ const startRealtimeServer = async (
|
||||
throw new Error("Failed to extract realtime stream path");
|
||||
}
|
||||
|
||||
const server = http.createServer();
|
||||
server.on("upgrade", (request, socket, head) => {
|
||||
handler.handleWebSocketUpgrade(request, socket, head);
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
server.listen(0, "127.0.0.1", resolve);
|
||||
});
|
||||
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("Failed to resolve test server address");
|
||||
}
|
||||
|
||||
return {
|
||||
url: `ws://127.0.0.1:${address.port}${match[1]}`,
|
||||
close: async () => {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((err) => (err ? reject(err) : resolve()));
|
||||
});
|
||||
return await startUpgradeWsServer({
|
||||
urlPath: match[1],
|
||||
onUpgrade: (request, socket, head) => {
|
||||
handler.handleWebSocketUpgrade(request, socket, head);
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
const connectWs = async (url: string): Promise<WebSocket> => {
|
||||
const ws = new WebSocket(url);
|
||||
await withTimeout(once(ws, "open") as Promise<[unknown]>);
|
||||
return ws;
|
||||
};
|
||||
|
||||
const waitForClose = async (
|
||||
ws: WebSocket,
|
||||
): Promise<{
|
||||
code: number;
|
||||
reason: string;
|
||||
}> => {
|
||||
const [code, reason] = (await withTimeout(once(ws, "close") as Promise<[number, Buffer]>)) ?? [];
|
||||
return {
|
||||
code,
|
||||
reason: Buffer.isBuffer(reason) ? reason.toString("utf8") : String(reason || ""),
|
||||
};
|
||||
});
|
||||
};
|
||||
|
||||
describe("RealtimeCallHandler path routing", () => {
|
||||
|
||||
72
extensions/voice-call/src/websocket-test-support.ts
Normal file
72
extensions/voice-call/src/websocket-test-support.ts
Normal file
@@ -0,0 +1,72 @@
|
||||
import { once } from "node:events";
|
||||
import http from "node:http";
|
||||
import { WebSocket } from "ws";
|
||||
|
||||
export const withTimeout = async <T>(promise: Promise<T>, timeoutMs = 2000): Promise<T> => {
|
||||
let timer: ReturnType<typeof setTimeout> | null = null;
|
||||
const timeout = new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(() => reject(new Error(`Timed out after ${timeoutMs}ms`)), timeoutMs);
|
||||
});
|
||||
|
||||
try {
|
||||
return await Promise.race([promise, timeout]);
|
||||
} finally {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export const startUpgradeWsServer = async (params: {
|
||||
urlPath: string;
|
||||
onUpgrade: (
|
||||
request: http.IncomingMessage,
|
||||
socket: Parameters<http.Server["emit"]>[2],
|
||||
head: Buffer,
|
||||
) => void;
|
||||
}): Promise<{
|
||||
url: string;
|
||||
close: () => Promise<void>;
|
||||
}> => {
|
||||
const server = http.createServer();
|
||||
server.on("upgrade", (request, socket, head) => {
|
||||
params.onUpgrade(request, socket, head);
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
server.listen(0, "127.0.0.1", resolve);
|
||||
});
|
||||
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("Failed to resolve test server address");
|
||||
}
|
||||
|
||||
return {
|
||||
url: `ws://127.0.0.1:${address.port}${params.urlPath}`,
|
||||
close: async () => {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((err) => (err ? reject(err) : resolve()));
|
||||
});
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
export const connectWs = async (url: string): Promise<WebSocket> => {
|
||||
const ws = new WebSocket(url);
|
||||
await withTimeout(once(ws, "open") as Promise<[unknown]>);
|
||||
return ws;
|
||||
};
|
||||
|
||||
export const waitForClose = async (
|
||||
ws: WebSocket,
|
||||
): Promise<{
|
||||
code: number;
|
||||
reason: string;
|
||||
}> => {
|
||||
const [code, reason] = (await withTimeout(once(ws, "close") as Promise<[number, Buffer]>)) ?? [];
|
||||
return {
|
||||
code,
|
||||
reason: Buffer.isBuffer(reason) ? reason.toString("utf8") : String(reason || ""),
|
||||
};
|
||||
};
|
||||
Reference in New Issue
Block a user