fix(gateway): bind after websocket runtime is ready

This commit is contained in:
George Pickett
2026-04-20 14:57:47 -07:00
parent 722fbd4df6
commit 880f55b5ac
4 changed files with 163 additions and 2 deletions

View File

@@ -205,4 +205,41 @@ describe("createGatewayCloseHandler", () => {
await closeExpectation;
expect(vi.getTimerCount()).toBe(0);
});
it("ignores unbound http servers during shutdown", async () => {
const close = createGatewayCloseHandler({
bonjourStop: null,
tailscaleCleanup: null,
canvasHost: null,
canvasHostServer: null,
stopChannel: vi.fn(async () => undefined),
pluginServices: null,
cron: { stop: vi.fn() },
heartbeatRunner: { stop: vi.fn() } as never,
updateCheckStop: null,
nodePresenceTimers: new Map(),
broadcast: vi.fn(),
tickInterval: setInterval(() => undefined, 60_000),
healthInterval: setInterval(() => undefined, 60_000),
dedupeCleanup: setInterval(() => undefined, 60_000),
mediaCleanup: null,
agentUnsub: null,
heartbeatUnsub: null,
transcriptUnsub: null,
lifecycleUnsub: null,
chatRunState: { clear: vi.fn() },
clients: new Set(),
configReloader: { stop: vi.fn(async () => undefined) },
wss: { close: (cb: () => void) => cb() } as never,
httpServer: {
close: (cb: (err?: NodeJS.ErrnoException | null) => void) =>
cb(
Object.assign(new Error("Server is not running."), { code: "ERR_SERVER_NOT_RUNNING" }),
),
closeIdleConnections: vi.fn(),
} as never,
});
await expect(close({ reason: "startup failed before bind" })).resolves.toBeUndefined();
});
});

View File

@@ -64,6 +64,15 @@ export async function runGatewayClosePrelude(params: {
await params.closeMcpServer?.().catch(() => {});
}
function isServerNotRunningError(err: unknown): boolean {
return Boolean(
err &&
typeof err === "object" &&
"code" in err &&
(err as { code?: unknown }).code === "ERR_SERVER_NOT_RUNNING",
);
}
export function createGatewayCloseHandler(params: {
bonjourStop: (() => Promise<void>) | null;
tailscaleCleanup: (() => Promise<void>) | null;
@@ -240,7 +249,13 @@ export function createGatewayCloseHandler(params: {
httpServer.closeIdleConnections();
}
const closePromise = new Promise<void>((resolve, reject) =>
httpServer.close((err) => (err ? reject(err) : resolve())),
httpServer.close((err) => {
if (!err || isServerNotRunningError(err)) {
resolve();
return;
}
reject(err);
}),
);
const httpGraceTimeout = createTimeoutRace(HTTP_CLOSE_GRACE_MS, () => false as const);
const closedWithinGrace = await Promise.race([

View File

@@ -479,6 +479,7 @@ export async function startGatewayServer(
httpServer,
httpServers,
httpBindHosts,
startListening,
wss,
preauthConnectionBudget,
clients,
@@ -526,7 +527,6 @@ export async function startGatewayServer(
getReadiness,
}),
);
startupTrace.mark("http.bound");
const {
nodeRegistry,
nodePresenceTimers,
@@ -796,6 +796,8 @@ export async function startGatewayServer(
broadcast,
context: gatewayRequestContext,
});
await startListening();
startupTrace.mark("http.bound");
({
stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck,
tailscaleCleanup: runtimeState.tailscaleCleanup,

View File

@@ -0,0 +1,107 @@
import http from "node:http";
import { afterEach, describe, expect, it, vi } from "vitest";
import { getFreePort, installGatewayTestHooks, startGatewayServer } from "./test-helpers.js";
const machineNameDelay = vi.hoisted(() => {
let enteredResolve = () => {};
let releaseResolve = () => {};
let entered = new Promise<void>((resolve) => {
enteredResolve = resolve;
});
let release = new Promise<void>((resolve) => {
releaseResolve = resolve;
});
return {
waitUntilDelayed: async () => {
await entered;
},
release: () => {
releaseResolve();
},
reset: () => {
entered = new Promise<void>((resolve) => {
enteredResolve = resolve;
});
release = new Promise<void>((resolve) => {
releaseResolve = resolve;
});
},
run: async () => {
enteredResolve();
await release;
return "test-machine";
},
};
});
vi.mock("../infra/machine-name.js", () => ({
getMachineDisplayName: () => machineNameDelay.run(),
}));
installGatewayTestHooks({ scope: "suite" });
afterEach(() => {
machineNameDelay.release();
});
describe("gateway startup websocket readiness", () => {
it("does not bind the websocket port until websocket handlers are attached", async () => {
machineNameDelay.reset();
const previousMinimal = process.env.OPENCLAW_TEST_MINIMAL_GATEWAY;
process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = "0";
let server: Awaited<ReturnType<typeof startGatewayServer>> | undefined;
try {
const port = await getFreePort();
const startup = startGatewayServer(port, {
auth: { mode: "none" },
});
await machineNameDelay.waitUntilDelayed();
const pendingUpgrade = await new Promise<
{ kind: "error"; code?: string } | { kind: "response"; status: number; body: string }
>((resolve) => {
const req = http.request({
host: "127.0.0.1",
port,
path: "/",
headers: {
Connection: "Upgrade",
Upgrade: "websocket",
"Sec-WebSocket-Key": "dGVzdC1rZXktMDEyMzQ1Ng==",
"Sec-WebSocket-Version": "13",
},
});
req.once("error", (err) => {
resolve({ kind: "error", code: (err as NodeJS.ErrnoException).code });
});
req.once("response", (res) => {
let body = "";
res.setEncoding("utf8");
res.on("data", (chunk) => {
body += chunk;
});
res.once("end", () => {
resolve({ kind: "response", status: res.statusCode ?? 0, body });
});
});
req.end();
});
expect(pendingUpgrade).toEqual({ kind: "error", code: "ECONNREFUSED" });
machineNameDelay.release();
server = await startup;
expect(server).toBeDefined();
} finally {
machineNameDelay.release();
if (server) {
await server.close();
}
if (previousMinimal === undefined) {
delete process.env.OPENCLAW_TEST_MINIMAL_GATEWAY;
} else {
process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = previousMinimal;
}
}
});
});