From 880f55b5ac4d4a80dbb41ae34ac4a06c0f1ee1dd Mon Sep 17 00:00:00 2001 From: George Pickett Date: Mon, 20 Apr 2026 14:57:47 -0700 Subject: [PATCH] fix(gateway): bind after websocket runtime is ready --- src/gateway/server-close.test.ts | 37 ++++++ src/gateway/server-close.ts | 17 ++- src/gateway/server.impl.ts | 4 +- .../server.startup-websocket-race.test.ts | 107 ++++++++++++++++++ 4 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 src/gateway/server.startup-websocket-race.test.ts diff --git a/src/gateway/server-close.test.ts b/src/gateway/server-close.test.ts index df399d5e197..3c74e34c431 100644 --- a/src/gateway/server-close.test.ts +++ b/src/gateway/server-close.test.ts @@ -205,4 +205,41 @@ describe("createGatewayCloseHandler", () => { await closeExpectation; expect(vi.getTimerCount()).toBe(0); }); + + it("ignores unbound http servers during shutdown", async () => { + const close = createGatewayCloseHandler({ + bonjourStop: null, + tailscaleCleanup: null, + canvasHost: null, + canvasHostServer: null, + stopChannel: vi.fn(async () => undefined), + pluginServices: null, + cron: { stop: vi.fn() }, + heartbeatRunner: { stop: vi.fn() } as never, + updateCheckStop: null, + nodePresenceTimers: new Map(), + broadcast: vi.fn(), + tickInterval: setInterval(() => undefined, 60_000), + healthInterval: setInterval(() => undefined, 60_000), + dedupeCleanup: setInterval(() => undefined, 60_000), + mediaCleanup: null, + agentUnsub: null, + heartbeatUnsub: null, + transcriptUnsub: null, + lifecycleUnsub: null, + chatRunState: { clear: vi.fn() }, + clients: new Set(), + configReloader: { stop: vi.fn(async () => undefined) }, + wss: { close: (cb: () => void) => cb() } as never, + httpServer: { + close: (cb: (err?: NodeJS.ErrnoException | null) => void) => + cb( + Object.assign(new Error("Server is not running."), { code: "ERR_SERVER_NOT_RUNNING" }), + ), + closeIdleConnections: vi.fn(), + } as never, + }); + + await expect(close({ reason: "startup failed before bind" })).resolves.toBeUndefined(); + }); }); diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 5e0311e6381..cecb7114a5c 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -64,6 +64,15 @@ export async function runGatewayClosePrelude(params: { await params.closeMcpServer?.().catch(() => {}); } +function isServerNotRunningError(err: unknown): boolean { + return Boolean( + err && + typeof err === "object" && + "code" in err && + (err as { code?: unknown }).code === "ERR_SERVER_NOT_RUNNING", + ); +} + export function createGatewayCloseHandler(params: { bonjourStop: (() => Promise) | null; tailscaleCleanup: (() => Promise) | null; @@ -240,7 +249,13 @@ export function createGatewayCloseHandler(params: { httpServer.closeIdleConnections(); } const closePromise = new Promise((resolve, reject) => - httpServer.close((err) => (err ? reject(err) : resolve())), + httpServer.close((err) => { + if (!err || isServerNotRunningError(err)) { + resolve(); + return; + } + reject(err); + }), ); const httpGraceTimeout = createTimeoutRace(HTTP_CLOSE_GRACE_MS, () => false as const); const closedWithinGrace = await Promise.race([ diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 365fd6fed6c..77bc19f81b0 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -479,6 +479,7 @@ export async function startGatewayServer( httpServer, httpServers, httpBindHosts, + startListening, wss, preauthConnectionBudget, clients, @@ -526,7 +527,6 @@ export async function startGatewayServer( getReadiness, }), ); - startupTrace.mark("http.bound"); const { nodeRegistry, nodePresenceTimers, @@ -796,6 +796,8 @@ export async function startGatewayServer( broadcast, context: gatewayRequestContext, }); + await startListening(); + startupTrace.mark("http.bound"); ({ stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck, tailscaleCleanup: runtimeState.tailscaleCleanup, diff --git a/src/gateway/server.startup-websocket-race.test.ts b/src/gateway/server.startup-websocket-race.test.ts new file mode 100644 index 00000000000..7508279605e --- /dev/null +++ b/src/gateway/server.startup-websocket-race.test.ts @@ -0,0 +1,107 @@ +import http from "node:http"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { getFreePort, installGatewayTestHooks, startGatewayServer } from "./test-helpers.js"; + +const machineNameDelay = vi.hoisted(() => { + let enteredResolve = () => {}; + let releaseResolve = () => {}; + let entered = new Promise((resolve) => { + enteredResolve = resolve; + }); + let release = new Promise((resolve) => { + releaseResolve = resolve; + }); + return { + waitUntilDelayed: async () => { + await entered; + }, + release: () => { + releaseResolve(); + }, + reset: () => { + entered = new Promise((resolve) => { + enteredResolve = resolve; + }); + release = new Promise((resolve) => { + releaseResolve = resolve; + }); + }, + run: async () => { + enteredResolve(); + await release; + return "test-machine"; + }, + }; +}); + +vi.mock("../infra/machine-name.js", () => ({ + getMachineDisplayName: () => machineNameDelay.run(), +})); + +installGatewayTestHooks({ scope: "suite" }); + +afterEach(() => { + machineNameDelay.release(); +}); + +describe("gateway startup websocket readiness", () => { + it("does not bind the websocket port until websocket handlers are attached", async () => { + machineNameDelay.reset(); + const previousMinimal = process.env.OPENCLAW_TEST_MINIMAL_GATEWAY; + process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = "0"; + let server: Awaited> | undefined; + try { + const port = await getFreePort(); + const startup = startGatewayServer(port, { + auth: { mode: "none" }, + }); + await machineNameDelay.waitUntilDelayed(); + + const pendingUpgrade = await new Promise< + { kind: "error"; code?: string } | { kind: "response"; status: number; body: string } + >((resolve) => { + const req = http.request({ + host: "127.0.0.1", + port, + path: "/", + headers: { + Connection: "Upgrade", + Upgrade: "websocket", + "Sec-WebSocket-Key": "dGVzdC1rZXktMDEyMzQ1Ng==", + "Sec-WebSocket-Version": "13", + }, + }); + req.once("error", (err) => { + resolve({ kind: "error", code: (err as NodeJS.ErrnoException).code }); + }); + req.once("response", (res) => { + let body = ""; + res.setEncoding("utf8"); + res.on("data", (chunk) => { + body += chunk; + }); + res.once("end", () => { + resolve({ kind: "response", status: res.statusCode ?? 0, body }); + }); + }); + req.end(); + }); + + expect(pendingUpgrade).toEqual({ kind: "error", code: "ECONNREFUSED" }); + + machineNameDelay.release(); + server = await startup; + expect(server).toBeDefined(); + } finally { + machineNameDelay.release(); + if (server) { + await server.close(); + } + if (previousMinimal === undefined) { + delete process.env.OPENCLAW_TEST_MINIMAL_GATEWAY; + } else { + process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = previousMinimal; + } + } + }); +});