From 6640d57b647eb7189b44bf3fc44e47b7efabb013 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 1 Jun 2026 09:21:43 +0200 Subject: [PATCH] refactor: share websocket connection test harness --- .../server/ws-connection.startup.test.ts | 83 +++------ .../server/ws-connection.test-helpers.ts | 129 ++++++++++++++ src/gateway/server/ws-connection.test.ts | 160 +++--------------- 3 files changed, 175 insertions(+), 197 deletions(-) create mode 100644 src/gateway/server/ws-connection.test-helpers.ts diff --git a/src/gateway/server/ws-connection.startup.test.ts b/src/gateway/server/ws-connection.startup.test.ts index 72de53c0a1c..1e23c0d9de9 100644 --- a/src/gateway/server/ws-connection.startup.test.ts +++ b/src/gateway/server/ws-connection.startup.test.ts @@ -1,6 +1,4 @@ -import { EventEmitter } from "node:events"; import { describe, expect, it, vi } from "vitest"; -import type { WebSocketServer } from "ws"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES, @@ -13,75 +11,34 @@ import { GATEWAY_STARTUP_UNAVAILABLE_REASON, } from "../../../packages/gateway-protocol/src/startup-unavailable.js"; import { attachGatewayWsConnectionHandler } from "./ws-connection.js"; - -function createLogger() { - return { - debug: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - }; -} - -function createRequestContext() { - return { - unsubscribeAllSessionEvents: vi.fn(), - nodeRegistry: { unregister: vi.fn() }, - nodeUnsubscribeAll: vi.fn(), - }; -} +import { + attachGatewayWsForTest, + createGatewayWsTestLogger, + createGatewayWsTestRequestContext, + createGatewayWsTestSocket, +} from "./ws-connection.test-helpers.js"; describe("attachGatewayWsConnectionHandler startup readiness", () => { it("returns a retryable startup-unavailable connect response while sidecars are pending", async () => { - const listeners = new Map void>(); - const wss = { - on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { - listeners.set(event, handler); - }), - } as unknown as WebSocketServer; const sent: unknown[] = []; - const socket = Object.assign(new EventEmitter(), { - _socket: { - remoteAddress: "127.0.0.1", - remotePort: 1234, - localAddress: "127.0.0.1", - localPort: 5678, - }, - send: vi.fn((data: string, cb?: (err?: Error) => void) => { + const socket = createGatewayWsTestSocket({ + closeEmits: true, + onSend: (data) => { sent.push(JSON.parse(data)); - cb?.(); - }), - close: vi.fn((code?: number, reason?: string) => { - socket.emit("close", code ?? 1000, Buffer.from(reason ?? "")); - }), + }, }); - const upgradeReq = { - headers: { host: "127.0.0.1:19001" }, - socket: { localAddress: "127.0.0.1" }, - }; - const logWsControl = createLogger(); + const logWsControl = createGatewayWsTestLogger(); - attachGatewayWsConnectionHandler({ - wss, - clients: new Set(), - preauthConnectionBudget: { release: vi.fn() } as never, - port: 19001, - resolvedAuth: { mode: "none", allowTailscale: false }, - isStartupPending: () => true, - gatewayMethods: [], - events: [], - refreshHealthSnapshot: vi.fn(async () => ({}) as never), - logGateway: createLogger() as never, - logHealth: createLogger() as never, - logWsControl: logWsControl as never, - extraHandlers: {}, - broadcast: vi.fn(), - buildRequestContext: () => createRequestContext() as never, + attachGatewayWsForTest({ + attach: attachGatewayWsConnectionHandler, + socket, + options: { + resolvedAuth: { mode: "none", allowTailscale: false }, + isStartupPending: () => true, + logWsControl: logWsControl as never, + buildRequestContext: () => createGatewayWsTestRequestContext() as never, + }, }); - - const onConnection = listeners.get("connection"); - expect(onConnection).toBeTypeOf("function"); - onConnection?.(socket, upgradeReq); socket.emit( "message", JSON.stringify({ diff --git a/src/gateway/server/ws-connection.test-helpers.ts b/src/gateway/server/ws-connection.test-helpers.ts new file mode 100644 index 00000000000..b7ebb18213c --- /dev/null +++ b/src/gateway/server/ws-connection.test-helpers.ts @@ -0,0 +1,129 @@ +import { EventEmitter } from "node:events"; +import { expect, vi } from "vitest"; +import type { WebSocketServer } from "ws"; +import type { ResolvedGatewayAuth } from "../auth.js"; +import type { attachGatewayWsConnectionHandler } from "./ws-connection.js"; + +type AttachGatewayWsConnectionParams = Parameters[0]; + +export type GatewayWsTestSocket = EventEmitter & { + _socket: { + remoteAddress: string; + remotePort: number; + localAddress: string; + localPort: number; + }; + send: ReturnType; + ping?: ReturnType; + close: ReturnType; +}; + +export function createGatewayWsTestLogger() { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; +} + +export function createResolvedGatewayTokenAuth(token: string): ResolvedGatewayAuth { + return { + mode: "token", + allowTailscale: false, + token, + }; +} + +export function createGatewayWsTestRequestContext( + overrides: { + nodeRegistry?: { unregister: ReturnType }; + } = {}, +) { + return { + unsubscribeAllSessionEvents: vi.fn(), + nodeRegistry: overrides.nodeRegistry ?? { unregister: vi.fn() }, + nodeUnsubscribeAll: vi.fn(), + }; +} + +export function createGatewayWsTestSocket( + params: { + closeEmits?: boolean; + onSend?: (data: string) => void; + ping?: boolean; + } = {}, +): GatewayWsTestSocket { + const socket = Object.assign(new EventEmitter(), { + _socket: { + remoteAddress: "127.0.0.1", + remotePort: 1234, + localAddress: "127.0.0.1", + localPort: 5678, + }, + send: vi.fn((data: string, cb?: (err?: Error) => void) => { + params.onSend?.(data); + cb?.(); + }), + ...(params.ping ? { ping: vi.fn() } : {}), + close: vi.fn((code?: number, reason?: string) => { + if (params.closeEmits) { + socket.emit("close", code ?? 1000, Buffer.from(reason ?? "")); + } + }), + }); + return socket; +} + +export function attachGatewayWsForTest(params: { + attach: typeof attachGatewayWsConnectionHandler; + clients?: Set; + headers?: Record; + host?: string; + options?: Partial; + socket?: GatewayWsTestSocket; +}) { + const listeners = new Map void>(); + const wss = { + on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + listeners.set(event, handler); + }), + } as unknown as WebSocketServer; + const socket = params.socket ?? createGatewayWsTestSocket(); + const upgradeReq = { + headers: { host: params.host ?? "127.0.0.1:19001", ...params.headers }, + socket: { localAddress: "127.0.0.1" }, + }; + const clients = params.clients ?? new Set(); + + params.attach({ + wss, + clients: clients as never, + preauthConnectionBudget: { release: vi.fn() } as never, + port: 19001, + resolvedAuth: createResolvedGatewayTokenAuth("token"), + preauthHandshakeTimeoutMs: 60_000, + gatewayMethods: [], + events: [], + refreshHealthSnapshot: vi.fn(async () => ({}) as never), + logGateway: createGatewayWsTestLogger() as never, + logHealth: createGatewayWsTestLogger() as never, + logWsControl: createGatewayWsTestLogger() as never, + extraHandlers: {}, + broadcast: vi.fn(), + buildRequestContext: () => createGatewayWsTestRequestContext() as never, + ...params.options, + }); + + const onConnection = listeners.get("connection"); + expect(onConnection).toBeTypeOf("function"); + onConnection?.(socket, upgradeReq); + + return { + clients, + listeners, + socket, + upgradeReq, + wss, + }; +} diff --git a/src/gateway/server/ws-connection.test.ts b/src/gateway/server/ws-connection.test.ts index 7e583c568ae..ee62f7a16cf 100644 --- a/src/gateway/server/ws-connection.test.ts +++ b/src/gateway/server/ws-connection.test.ts @@ -1,7 +1,12 @@ -import { EventEmitter } from "node:events"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { WebSocketServer } from "ws"; import type { ResolvedGatewayAuth } from "../auth.js"; +import { + attachGatewayWsForTest, + createGatewayWsTestRequestContext, + createGatewayWsTestSocket, + createResolvedGatewayTokenAuth, + type GatewayWsTestSocket, +} from "./ws-connection.test-helpers.js"; const { attachGatewayWsMessageHandlerMock, broadcastPresenceSnapshotMock, upsertPresenceMock } = vi.hoisted(() => ({ @@ -23,23 +28,6 @@ vi.mock("./presence-events.js", () => ({ import { attachGatewayWsConnectionHandler } from "./ws-connection.js"; import { resolveSharedGatewaySessionGeneration } from "./ws-shared-generation.js"; -function createLogger() { - return { - debug: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - }; -} - -function createResolvedAuth(token: string): ResolvedGatewayAuth { - return { - mode: "token", - allowTailscale: false, - token, - }; -} - async function waitForLazyMessageHandler() { await vi.dynamicImportSettled(); } @@ -48,86 +36,28 @@ function firstAttachedHandlerParams(): unknown { return attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0]; } -type TestSocket = EventEmitter & { - _socket: { - remoteAddress: string; - remotePort: number; - localAddress: string; - localPort: number; - }; - send: ReturnType; - ping?: ReturnType; - close: ReturnType; -}; - -function createTestSocket(params: { ping?: boolean } = {}): TestSocket { - return Object.assign(new EventEmitter(), { - _socket: { - remoteAddress: "127.0.0.1", - remotePort: 1234, - localAddress: "127.0.0.1", - localPort: 5678, - }, - send: vi.fn(), - ...(params.ping ? { ping: vi.fn() } : {}), - close: vi.fn(), - }); -} - async function connectTestWs( params: { host?: string; headers?: Record; - socket?: TestSocket; + socket?: GatewayWsTestSocket; clients?: Set; options?: Partial[0]>; } = {}, ) { - const listeners = new Map void>(); - const wss = { - on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { - listeners.set(event, handler); - }), - } as unknown as WebSocketServer; - const socket = params.socket ?? createTestSocket(); - const upgradeReq = { - headers: { host: params.host ?? "127.0.0.1:19001", ...params.headers }, - socket: { localAddress: "127.0.0.1" }, - }; - const clients = params.clients ?? new Set(); - - attachGatewayWsConnectionHandler({ - wss, - clients: clients as never, - preauthConnectionBudget: { release: vi.fn() } as never, - port: 19001, - resolvedAuth: createResolvedAuth("token"), - preauthHandshakeTimeoutMs: 60_000, - gatewayMethods: [], - events: [], - refreshHealthSnapshot: vi.fn(async () => ({}) as never), - logGateway: createLogger() as never, - logHealth: createLogger() as never, - logWsControl: createLogger() as never, - extraHandlers: {}, - broadcast: vi.fn(), - buildRequestContext: () => - ({ - unsubscribeAllSessionEvents: vi.fn(), - nodeRegistry: { unregister: vi.fn() }, - nodeUnsubscribeAll: vi.fn(), - }) as never, - ...params.options, + const connected = attachGatewayWsForTest({ + attach: attachGatewayWsConnectionHandler, + clients: params.clients, + headers: params.headers, + host: params.host, + options: params.options, + socket: params.socket, }); - - const onConnection = listeners.get("connection"); - expect(onConnection).toBeTypeOf("function"); - onConnection?.(socket, upgradeReq); await waitForLazyMessageHandler(); return { - clients, - socket, + clients: connected.clients, + socket: connected.socket, passed: firstAttachedHandlerParams(), }; } @@ -144,7 +74,7 @@ describe("attachGatewayWsConnectionHandler", () => { }); it("threads current auth getters into the handshake handler instead of a stale snapshot", async () => { - const initialAuth = createResolvedAuth("token-before"); + const initialAuth = createResolvedGatewayTokenAuth("token-before"); let currentAuth = initialAuth; const { passed } = await connectTestWs({ @@ -160,7 +90,7 @@ describe("attachGatewayWsConnectionHandler", () => { getRequiredSharedGatewaySessionGeneration?: () => string | undefined; }; - currentAuth = createResolvedAuth("token-after"); + currentAuth = createResolvedGatewayTokenAuth("token-after"); expect(handlerParams.getResolvedAuth().token).toBe("token-after"); expect(handlerParams.getRequiredSharedGatewaySessionGeneration?.()).toBe( @@ -228,7 +158,7 @@ describe("attachGatewayWsConnectionHandler", () => { it("sends protocol pings until the connection closes", async () => { vi.useFakeTimers(); - const socket = createTestSocket({ ping: true }); + const socket = createGatewayWsTestSocket({ ping: true }); const { passed } = await connectTestWs({ socket }); const handlerParams = passed as { setClient: (client: unknown) => boolean; @@ -251,53 +181,15 @@ describe("attachGatewayWsConnectionHandler", () => { }); it("skips node presence disconnects for stale reconnected sockets", async () => { - const listeners = new Map void>(); const unregister = vi.fn(() => null); - const wss = { - on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { - listeners.set(event, handler); - }), - } as unknown as WebSocketServer; - const socket = Object.assign(new EventEmitter(), { - _socket: { - remoteAddress: "127.0.0.1", - remotePort: 1234, - localAddress: "127.0.0.1", - localPort: 5678, + const { socket } = attachGatewayWsForTest({ + attach: attachGatewayWsConnectionHandler, + options: { + refreshHealthSnapshot: vi.fn(), + buildRequestContext: () => + createGatewayWsTestRequestContext({ nodeRegistry: { unregister } }) as never, }, - send: vi.fn(), - close: vi.fn(), }); - const upgradeReq = { - headers: { host: "127.0.0.1:19001" }, - socket: { localAddress: "127.0.0.1" }, - }; - - attachGatewayWsConnectionHandler({ - wss, - clients: new Set(), - preauthConnectionBudget: { release: vi.fn() } as never, - port: 19001, - resolvedAuth: createResolvedAuth("token"), - gatewayMethods: [], - events: [], - refreshHealthSnapshot: vi.fn(), - logGateway: createLogger() as never, - logHealth: createLogger() as never, - logWsControl: createLogger() as never, - extraHandlers: {}, - broadcast: vi.fn(), - buildRequestContext: () => - ({ - unsubscribeAllSessionEvents: vi.fn(), - nodeRegistry: { unregister }, - nodeUnsubscribeAll: vi.fn(), - }) as never, - }); - - const onConnection = listeners.get("connection"); - expect(onConnection).toBeTypeOf("function"); - onConnection?.(socket, upgradeReq); await waitForLazyMessageHandler(); const passed = firstAttachedHandlerParams() as {