mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-06 11:52:55 +00:00
refactor: share websocket connection test harness
This commit is contained in:
@@ -1,6 +1,4 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { WebSocketServer } from "ws";
|
||||
import {
|
||||
GATEWAY_CLIENT_MODES,
|
||||
GATEWAY_CLIENT_NAMES,
|
||||
@@ -13,75 +11,34 @@ import {
|
||||
GATEWAY_STARTUP_UNAVAILABLE_REASON,
|
||||
} from "../../../packages/gateway-protocol/src/startup-unavailable.js";
|
||||
import { attachGatewayWsConnectionHandler } from "./ws-connection.js";
|
||||
|
||||
function createLogger() {
|
||||
return {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function createRequestContext() {
|
||||
return {
|
||||
unsubscribeAllSessionEvents: vi.fn(),
|
||||
nodeRegistry: { unregister: vi.fn() },
|
||||
nodeUnsubscribeAll: vi.fn(),
|
||||
};
|
||||
}
|
||||
import {
|
||||
attachGatewayWsForTest,
|
||||
createGatewayWsTestLogger,
|
||||
createGatewayWsTestRequestContext,
|
||||
createGatewayWsTestSocket,
|
||||
} from "./ws-connection.test-helpers.js";
|
||||
|
||||
describe("attachGatewayWsConnectionHandler startup readiness", () => {
|
||||
it("returns a retryable startup-unavailable connect response while sidecars are pending", async () => {
|
||||
const listeners = new Map<string, (...args: unknown[]) => void>();
|
||||
const wss = {
|
||||
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
|
||||
listeners.set(event, handler);
|
||||
}),
|
||||
} as unknown as WebSocketServer;
|
||||
const sent: unknown[] = [];
|
||||
const socket = Object.assign(new EventEmitter(), {
|
||||
_socket: {
|
||||
remoteAddress: "127.0.0.1",
|
||||
remotePort: 1234,
|
||||
localAddress: "127.0.0.1",
|
||||
localPort: 5678,
|
||||
},
|
||||
send: vi.fn((data: string, cb?: (err?: Error) => void) => {
|
||||
const socket = createGatewayWsTestSocket({
|
||||
closeEmits: true,
|
||||
onSend: (data) => {
|
||||
sent.push(JSON.parse(data));
|
||||
cb?.();
|
||||
}),
|
||||
close: vi.fn((code?: number, reason?: string) => {
|
||||
socket.emit("close", code ?? 1000, Buffer.from(reason ?? ""));
|
||||
}),
|
||||
},
|
||||
});
|
||||
const upgradeReq = {
|
||||
headers: { host: "127.0.0.1:19001" },
|
||||
socket: { localAddress: "127.0.0.1" },
|
||||
};
|
||||
const logWsControl = createLogger();
|
||||
const logWsControl = createGatewayWsTestLogger();
|
||||
|
||||
attachGatewayWsConnectionHandler({
|
||||
wss,
|
||||
clients: new Set(),
|
||||
preauthConnectionBudget: { release: vi.fn() } as never,
|
||||
port: 19001,
|
||||
resolvedAuth: { mode: "none", allowTailscale: false },
|
||||
isStartupPending: () => true,
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
refreshHealthSnapshot: vi.fn(async () => ({}) as never),
|
||||
logGateway: createLogger() as never,
|
||||
logHealth: createLogger() as never,
|
||||
logWsControl: logWsControl as never,
|
||||
extraHandlers: {},
|
||||
broadcast: vi.fn(),
|
||||
buildRequestContext: () => createRequestContext() as never,
|
||||
attachGatewayWsForTest({
|
||||
attach: attachGatewayWsConnectionHandler,
|
||||
socket,
|
||||
options: {
|
||||
resolvedAuth: { mode: "none", allowTailscale: false },
|
||||
isStartupPending: () => true,
|
||||
logWsControl: logWsControl as never,
|
||||
buildRequestContext: () => createGatewayWsTestRequestContext() as never,
|
||||
},
|
||||
});
|
||||
|
||||
const onConnection = listeners.get("connection");
|
||||
expect(onConnection).toBeTypeOf("function");
|
||||
onConnection?.(socket, upgradeReq);
|
||||
socket.emit(
|
||||
"message",
|
||||
JSON.stringify({
|
||||
|
||||
129
src/gateway/server/ws-connection.test-helpers.ts
Normal file
129
src/gateway/server/ws-connection.test-helpers.ts
Normal file
@@ -0,0 +1,129 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import { expect, vi } from "vitest";
|
||||
import type { WebSocketServer } from "ws";
|
||||
import type { ResolvedGatewayAuth } from "../auth.js";
|
||||
import type { attachGatewayWsConnectionHandler } from "./ws-connection.js";
|
||||
|
||||
type AttachGatewayWsConnectionParams = Parameters<typeof attachGatewayWsConnectionHandler>[0];
|
||||
|
||||
export type GatewayWsTestSocket = EventEmitter & {
|
||||
_socket: {
|
||||
remoteAddress: string;
|
||||
remotePort: number;
|
||||
localAddress: string;
|
||||
localPort: number;
|
||||
};
|
||||
send: ReturnType<typeof vi.fn>;
|
||||
ping?: ReturnType<typeof vi.fn>;
|
||||
close: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
export function createGatewayWsTestLogger() {
|
||||
return {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
export function createResolvedGatewayTokenAuth(token: string): ResolvedGatewayAuth {
|
||||
return {
|
||||
mode: "token",
|
||||
allowTailscale: false,
|
||||
token,
|
||||
};
|
||||
}
|
||||
|
||||
export function createGatewayWsTestRequestContext(
|
||||
overrides: {
|
||||
nodeRegistry?: { unregister: ReturnType<typeof vi.fn> };
|
||||
} = {},
|
||||
) {
|
||||
return {
|
||||
unsubscribeAllSessionEvents: vi.fn(),
|
||||
nodeRegistry: overrides.nodeRegistry ?? { unregister: vi.fn() },
|
||||
nodeUnsubscribeAll: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
export function createGatewayWsTestSocket(
|
||||
params: {
|
||||
closeEmits?: boolean;
|
||||
onSend?: (data: string) => void;
|
||||
ping?: boolean;
|
||||
} = {},
|
||||
): GatewayWsTestSocket {
|
||||
const socket = Object.assign(new EventEmitter(), {
|
||||
_socket: {
|
||||
remoteAddress: "127.0.0.1",
|
||||
remotePort: 1234,
|
||||
localAddress: "127.0.0.1",
|
||||
localPort: 5678,
|
||||
},
|
||||
send: vi.fn((data: string, cb?: (err?: Error) => void) => {
|
||||
params.onSend?.(data);
|
||||
cb?.();
|
||||
}),
|
||||
...(params.ping ? { ping: vi.fn() } : {}),
|
||||
close: vi.fn((code?: number, reason?: string) => {
|
||||
if (params.closeEmits) {
|
||||
socket.emit("close", code ?? 1000, Buffer.from(reason ?? ""));
|
||||
}
|
||||
}),
|
||||
});
|
||||
return socket;
|
||||
}
|
||||
|
||||
export function attachGatewayWsForTest(params: {
|
||||
attach: typeof attachGatewayWsConnectionHandler;
|
||||
clients?: Set<unknown>;
|
||||
headers?: Record<string, string>;
|
||||
host?: string;
|
||||
options?: Partial<AttachGatewayWsConnectionParams>;
|
||||
socket?: GatewayWsTestSocket;
|
||||
}) {
|
||||
const listeners = new Map<string, (...args: unknown[]) => void>();
|
||||
const wss = {
|
||||
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
|
||||
listeners.set(event, handler);
|
||||
}),
|
||||
} as unknown as WebSocketServer;
|
||||
const socket = params.socket ?? createGatewayWsTestSocket();
|
||||
const upgradeReq = {
|
||||
headers: { host: params.host ?? "127.0.0.1:19001", ...params.headers },
|
||||
socket: { localAddress: "127.0.0.1" },
|
||||
};
|
||||
const clients = params.clients ?? new Set<unknown>();
|
||||
|
||||
params.attach({
|
||||
wss,
|
||||
clients: clients as never,
|
||||
preauthConnectionBudget: { release: vi.fn() } as never,
|
||||
port: 19001,
|
||||
resolvedAuth: createResolvedGatewayTokenAuth("token"),
|
||||
preauthHandshakeTimeoutMs: 60_000,
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
refreshHealthSnapshot: vi.fn(async () => ({}) as never),
|
||||
logGateway: createGatewayWsTestLogger() as never,
|
||||
logHealth: createGatewayWsTestLogger() as never,
|
||||
logWsControl: createGatewayWsTestLogger() as never,
|
||||
extraHandlers: {},
|
||||
broadcast: vi.fn(),
|
||||
buildRequestContext: () => createGatewayWsTestRequestContext() as never,
|
||||
...params.options,
|
||||
});
|
||||
|
||||
const onConnection = listeners.get("connection");
|
||||
expect(onConnection).toBeTypeOf("function");
|
||||
onConnection?.(socket, upgradeReq);
|
||||
|
||||
return {
|
||||
clients,
|
||||
listeners,
|
||||
socket,
|
||||
upgradeReq,
|
||||
wss,
|
||||
};
|
||||
}
|
||||
@@ -1,7 +1,12 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { WebSocketServer } from "ws";
|
||||
import type { ResolvedGatewayAuth } from "../auth.js";
|
||||
import {
|
||||
attachGatewayWsForTest,
|
||||
createGatewayWsTestRequestContext,
|
||||
createGatewayWsTestSocket,
|
||||
createResolvedGatewayTokenAuth,
|
||||
type GatewayWsTestSocket,
|
||||
} from "./ws-connection.test-helpers.js";
|
||||
|
||||
const { attachGatewayWsMessageHandlerMock, broadcastPresenceSnapshotMock, upsertPresenceMock } =
|
||||
vi.hoisted(() => ({
|
||||
@@ -23,23 +28,6 @@ vi.mock("./presence-events.js", () => ({
|
||||
import { attachGatewayWsConnectionHandler } from "./ws-connection.js";
|
||||
import { resolveSharedGatewaySessionGeneration } from "./ws-shared-generation.js";
|
||||
|
||||
function createLogger() {
|
||||
return {
|
||||
debug: vi.fn(),
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function createResolvedAuth(token: string): ResolvedGatewayAuth {
|
||||
return {
|
||||
mode: "token",
|
||||
allowTailscale: false,
|
||||
token,
|
||||
};
|
||||
}
|
||||
|
||||
async function waitForLazyMessageHandler() {
|
||||
await vi.dynamicImportSettled();
|
||||
}
|
||||
@@ -48,86 +36,28 @@ function firstAttachedHandlerParams(): unknown {
|
||||
return attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0];
|
||||
}
|
||||
|
||||
type TestSocket = EventEmitter & {
|
||||
_socket: {
|
||||
remoteAddress: string;
|
||||
remotePort: number;
|
||||
localAddress: string;
|
||||
localPort: number;
|
||||
};
|
||||
send: ReturnType<typeof vi.fn>;
|
||||
ping?: ReturnType<typeof vi.fn>;
|
||||
close: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
function createTestSocket(params: { ping?: boolean } = {}): TestSocket {
|
||||
return Object.assign(new EventEmitter(), {
|
||||
_socket: {
|
||||
remoteAddress: "127.0.0.1",
|
||||
remotePort: 1234,
|
||||
localAddress: "127.0.0.1",
|
||||
localPort: 5678,
|
||||
},
|
||||
send: vi.fn(),
|
||||
...(params.ping ? { ping: vi.fn() } : {}),
|
||||
close: vi.fn(),
|
||||
});
|
||||
}
|
||||
|
||||
async function connectTestWs(
|
||||
params: {
|
||||
host?: string;
|
||||
headers?: Record<string, string>;
|
||||
socket?: TestSocket;
|
||||
socket?: GatewayWsTestSocket;
|
||||
clients?: Set<unknown>;
|
||||
options?: Partial<Parameters<typeof attachGatewayWsConnectionHandler>[0]>;
|
||||
} = {},
|
||||
) {
|
||||
const listeners = new Map<string, (...args: unknown[]) => void>();
|
||||
const wss = {
|
||||
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
|
||||
listeners.set(event, handler);
|
||||
}),
|
||||
} as unknown as WebSocketServer;
|
||||
const socket = params.socket ?? createTestSocket();
|
||||
const upgradeReq = {
|
||||
headers: { host: params.host ?? "127.0.0.1:19001", ...params.headers },
|
||||
socket: { localAddress: "127.0.0.1" },
|
||||
};
|
||||
const clients = params.clients ?? new Set<unknown>();
|
||||
|
||||
attachGatewayWsConnectionHandler({
|
||||
wss,
|
||||
clients: clients as never,
|
||||
preauthConnectionBudget: { release: vi.fn() } as never,
|
||||
port: 19001,
|
||||
resolvedAuth: createResolvedAuth("token"),
|
||||
preauthHandshakeTimeoutMs: 60_000,
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
refreshHealthSnapshot: vi.fn(async () => ({}) as never),
|
||||
logGateway: createLogger() as never,
|
||||
logHealth: createLogger() as never,
|
||||
logWsControl: createLogger() as never,
|
||||
extraHandlers: {},
|
||||
broadcast: vi.fn(),
|
||||
buildRequestContext: () =>
|
||||
({
|
||||
unsubscribeAllSessionEvents: vi.fn(),
|
||||
nodeRegistry: { unregister: vi.fn() },
|
||||
nodeUnsubscribeAll: vi.fn(),
|
||||
}) as never,
|
||||
...params.options,
|
||||
const connected = attachGatewayWsForTest({
|
||||
attach: attachGatewayWsConnectionHandler,
|
||||
clients: params.clients,
|
||||
headers: params.headers,
|
||||
host: params.host,
|
||||
options: params.options,
|
||||
socket: params.socket,
|
||||
});
|
||||
|
||||
const onConnection = listeners.get("connection");
|
||||
expect(onConnection).toBeTypeOf("function");
|
||||
onConnection?.(socket, upgradeReq);
|
||||
await waitForLazyMessageHandler();
|
||||
|
||||
return {
|
||||
clients,
|
||||
socket,
|
||||
clients: connected.clients,
|
||||
socket: connected.socket,
|
||||
passed: firstAttachedHandlerParams(),
|
||||
};
|
||||
}
|
||||
@@ -144,7 +74,7 @@ describe("attachGatewayWsConnectionHandler", () => {
|
||||
});
|
||||
|
||||
it("threads current auth getters into the handshake handler instead of a stale snapshot", async () => {
|
||||
const initialAuth = createResolvedAuth("token-before");
|
||||
const initialAuth = createResolvedGatewayTokenAuth("token-before");
|
||||
let currentAuth = initialAuth;
|
||||
|
||||
const { passed } = await connectTestWs({
|
||||
@@ -160,7 +90,7 @@ describe("attachGatewayWsConnectionHandler", () => {
|
||||
getRequiredSharedGatewaySessionGeneration?: () => string | undefined;
|
||||
};
|
||||
|
||||
currentAuth = createResolvedAuth("token-after");
|
||||
currentAuth = createResolvedGatewayTokenAuth("token-after");
|
||||
|
||||
expect(handlerParams.getResolvedAuth().token).toBe("token-after");
|
||||
expect(handlerParams.getRequiredSharedGatewaySessionGeneration?.()).toBe(
|
||||
@@ -228,7 +158,7 @@ describe("attachGatewayWsConnectionHandler", () => {
|
||||
|
||||
it("sends protocol pings until the connection closes", async () => {
|
||||
vi.useFakeTimers();
|
||||
const socket = createTestSocket({ ping: true });
|
||||
const socket = createGatewayWsTestSocket({ ping: true });
|
||||
const { passed } = await connectTestWs({ socket });
|
||||
const handlerParams = passed as {
|
||||
setClient: (client: unknown) => boolean;
|
||||
@@ -251,53 +181,15 @@ describe("attachGatewayWsConnectionHandler", () => {
|
||||
});
|
||||
|
||||
it("skips node presence disconnects for stale reconnected sockets", async () => {
|
||||
const listeners = new Map<string, (...args: unknown[]) => void>();
|
||||
const unregister = vi.fn(() => null);
|
||||
const wss = {
|
||||
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
|
||||
listeners.set(event, handler);
|
||||
}),
|
||||
} as unknown as WebSocketServer;
|
||||
const socket = Object.assign(new EventEmitter(), {
|
||||
_socket: {
|
||||
remoteAddress: "127.0.0.1",
|
||||
remotePort: 1234,
|
||||
localAddress: "127.0.0.1",
|
||||
localPort: 5678,
|
||||
const { socket } = attachGatewayWsForTest({
|
||||
attach: attachGatewayWsConnectionHandler,
|
||||
options: {
|
||||
refreshHealthSnapshot: vi.fn(),
|
||||
buildRequestContext: () =>
|
||||
createGatewayWsTestRequestContext({ nodeRegistry: { unregister } }) as never,
|
||||
},
|
||||
send: vi.fn(),
|
||||
close: vi.fn(),
|
||||
});
|
||||
const upgradeReq = {
|
||||
headers: { host: "127.0.0.1:19001" },
|
||||
socket: { localAddress: "127.0.0.1" },
|
||||
};
|
||||
|
||||
attachGatewayWsConnectionHandler({
|
||||
wss,
|
||||
clients: new Set(),
|
||||
preauthConnectionBudget: { release: vi.fn() } as never,
|
||||
port: 19001,
|
||||
resolvedAuth: createResolvedAuth("token"),
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
refreshHealthSnapshot: vi.fn(),
|
||||
logGateway: createLogger() as never,
|
||||
logHealth: createLogger() as never,
|
||||
logWsControl: createLogger() as never,
|
||||
extraHandlers: {},
|
||||
broadcast: vi.fn(),
|
||||
buildRequestContext: () =>
|
||||
({
|
||||
unsubscribeAllSessionEvents: vi.fn(),
|
||||
nodeRegistry: { unregister },
|
||||
nodeUnsubscribeAll: vi.fn(),
|
||||
}) as never,
|
||||
});
|
||||
|
||||
const onConnection = listeners.get("connection");
|
||||
expect(onConnection).toBeTypeOf("function");
|
||||
onConnection?.(socket, upgradeReq);
|
||||
await waitForLazyMessageHandler();
|
||||
|
||||
const passed = firstAttachedHandlerParams() as {
|
||||
|
||||
Reference in New Issue
Block a user