diff --git a/scripts/dev/gateway-ws-client.ts b/scripts/dev/gateway-ws-client.ts index 4070399d33f..2e0a5f21e2d 100644 --- a/scripts/dev/gateway-ws-client.ts +++ b/scripts/dev/gateway-ws-client.ts @@ -65,8 +65,20 @@ export function createGatewayWsClient(params: { } >(); + const rejectPending = (error: Error) => { + for (const waiter of pending.values()) { + clearTimeout(waiter.timeout); + waiter.reject(error); + } + pending.clear(); + }; + const request = (method: string, paramsObj?: unknown, timeoutMs = 12_000) => new Promise((resolve, reject) => { + if (ws.readyState !== WebSocket.OPEN) { + reject(new Error(`gateway websocket is not open for ${method}`)); + return; + } const id = randomUUID(); const frame: GatewayReqFrame = { type: "req", id, method, params: paramsObj }; const timeout = setTimeout(() => { @@ -74,7 +86,24 @@ export function createGatewayWsClient(params: { reject(new Error(`timeout waiting for ${method}`)); }, timeoutMs); pending.set(id, { resolve, reject, timeout }); - ws.send(JSON.stringify(frame)); + try { + ws.send(JSON.stringify(frame), (err) => { + if (!err) { + return; + } + const waiter = pending.get(id); + if (!waiter) { + return; + } + pending.delete(id); + clearTimeout(waiter.timeout); + waiter.reject(err instanceof Error ? err : new Error(String(err))); + }); + } catch (err) { + pending.delete(id); + clearTimeout(timeout); + reject(err instanceof Error ? err : new Error(String(err))); + } }); const waitOpen = () => @@ -119,12 +148,16 @@ export function createGatewayWsClient(params: { params.onEvent?.(evt); } }); + ws.on("close", (code, reason) => { + const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : ""; + rejectPending(new Error(`gateway websocket closed (${code})${suffix}`)); + }); + ws.on("error", (err) => { + rejectPending(err instanceof Error ? err : new Error(String(err))); + }); const close = () => { - for (const waiter of pending.values()) { - clearTimeout(waiter.timeout); - } - pending.clear(); + rejectPending(new Error("gateway websocket client closed")); ws.close(); }; diff --git a/test/scripts/gateway-ws-client.test.ts b/test/scripts/gateway-ws-client.test.ts new file mode 100644 index 00000000000..353219bb9cc --- /dev/null +++ b/test/scripts/gateway-ws-client.test.ts @@ -0,0 +1,73 @@ +import { createServer, type Server } from "node:http"; +import { afterEach, describe, expect, it } from "vitest"; +import { WebSocketServer, type WebSocket } from "ws"; +import { createGatewayWsClient } from "../../scripts/dev/gateway-ws-client.js"; + +let server: Server | undefined; +let wss: WebSocketServer | undefined; + +afterEach(async () => { + await new Promise((resolve) => { + wss?.close(() => resolve()); + if (!wss) { + resolve(); + } + }); + wss = undefined; + + await new Promise((resolve, reject) => { + server?.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + if (!server) { + resolve(); + } + }); + server = undefined; +}); + +async function listen(handler: (ws: WebSocket) => void): Promise { + server = createServer(); + wss = new WebSocketServer({ server }); + wss.on("connection", handler); + await new Promise((resolve) => { + server?.listen(0, "127.0.0.1", resolve); + }); + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("test websocket server did not get a TCP address"); + } + return `ws://127.0.0.1:${address.port}`; +} + +describe("createGatewayWsClient", () => { + it("rejects pending RPC requests when the client closes", async () => { + const url = await listen(() => {}); + const client = createGatewayWsClient({ url }); + await client.waitOpen(); + + const pending = client.request("health", {}, 1000); + client.close(); + + await expect(pending).rejects.toThrow("gateway websocket client closed"); + }); + + it("rejects pending RPC requests when the gateway closes the socket", async () => { + const url = await listen((ws) => { + ws.on("message", () => { + ws.close(1011, "boom"); + }); + }); + const client = createGatewayWsClient({ url }); + await client.waitOpen(); + + await expect(client.request("health", {}, 1000)).rejects.toThrow( + "gateway websocket closed (1011): boom", + ); + client.close(); + }); +});