fix(gateway): prevent 1006 errors from WebSocket upgrade race condition

Attach upgrade handlers before HTTP server starts listening.

Fixes #43381
This commit is contained in:
dalefrieswthat
2026-03-11 12:54:29 -05:00
committed by George Pickett
parent 67719b3c28
commit 722fbd4df6
4 changed files with 88 additions and 43 deletions

View File

@@ -39,16 +39,17 @@ const resolveGatewayPort = vi.hoisted(() => vi.fn((_cfg?: unknown, _env?: unknow
const findVerifiedGatewayListenerPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []);
const signalVerifiedGatewayPidSync = vi.fn<(pid: number, signal: "SIGTERM" | "SIGUSR1") => void>();
const formatGatewayPidList = vi.fn<(pids: number[]) => string>((pids) => pids.join(", "));
const probeGateway = vi.fn<
(opts: {
url: string;
auth?: { token?: string; password?: string };
timeoutMs: number;
}) => Promise<{
ok: boolean;
configSnapshot: unknown;
}>
>();
const probeGateway =
vi.fn<
(opts: {
url: string;
auth?: { token?: string; password?: string };
timeoutMs: number;
}) => Promise<{
ok: boolean;
configSnapshot: unknown;
}>
>();
const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true);
const loadConfig = vi.hoisted(() => vi.fn(() => ({})));
const recoverInstalledLaunchAgent = vi.hoisted(() => vi.fn());

View File

@@ -406,7 +406,17 @@ function formatGatewayCloseError(
const hint =
code === 1006 ? "abnormal closure (no close frame)" : code === 1000 ? "normal closure" : "";
const suffix = hint ? ` ${hint}` : "";
return `gateway closed (${code}${suffix}): ${reasonText}\n${connectionDetails.message}`;
let message = `gateway closed (${code}${suffix}): ${reasonText}\n${connectionDetails.message}`;
// Add troubleshooting hints for common issues
if (code === 1006) {
message +=
"\n\nPossible causes:" +
"\n- Gateway not yet ready to accept connections (retry after a moment)" +
"\n- TLS mismatch (connecting with ws:// to a wss:// gateway, or vice versa)" +
"\n- Gateway crashed or was terminated unexpectedly" +
"\nRun `openclaw doctor` for diagnostics.";
}
return message;
}
function formatGatewayTimeoutError(

View File

@@ -1139,6 +1139,8 @@ export function attachGatewayUpgradeHandler(opts: {
getResolvedAuth?: () => ResolvedGatewayAuth;
/** Optional rate limiter for auth brute-force protection. */
rateLimiter?: AuthRateLimiter;
/** Optional logger for error diagnostics. */
log?: { warn: (msg: string) => void };
}) {
const {
httpServer,
@@ -1148,6 +1150,7 @@ export function attachGatewayUpgradeHandler(opts: {
preauthConnectionBudget,
resolvedAuth,
rateLimiter,
log,
} = opts;
const getResolvedAuth = opts.getResolvedAuth ?? (() => resolvedAuth);
httpServer.on("upgrade", (req, socket, head) => {
@@ -1250,7 +1253,10 @@ export function attachGatewayUpgradeHandler(opts: {
releaseUpgradeBudget();
throw new Error("gateway websocket upgrade failed");
}
})().catch(() => {
})().catch((err) => {
const remoteAddress = (socket as { remoteAddress?: string }).remoteAddress ?? "unknown";
const errorMessage = err instanceof Error ? err.message : String(err);
log?.warn(`ws upgrade error from ${remoteAddress}: ${errorMessage}`);
socket.destroy();
});
});

View File

@@ -84,6 +84,7 @@ export async function createGatewayRuntimeState(params: {
httpServer: HttpServer;
httpServers: HttpServer[];
httpBindHosts: string[];
startListening: () => Promise<void>;
wss: WebSocketServer;
preauthConnectionBudget: PreauthConnectionBudget;
clients: Set<GatewayWsClient>;
@@ -168,9 +169,18 @@ export async function createGatewayRuntimeState(params: {
"Host-header origin fallback weakens origin checks and should only be used as break-glass.",
);
}
// Create WebSocketServer first (with noServer: true) so we can attach upgrade handlers
// before HTTP servers start listening. This prevents a race condition where connections
// arrive before the upgrade handler is attached, which causes silent 1006 errors.
const wss = new WebSocketServer({
noServer: true,
maxPayload: MAX_PREAUTH_PAYLOAD_BYTES,
});
const preauthConnectionBudget = createPreauthConnectionBudget();
const httpServers: HttpServer[] = [];
const httpBindHosts: string[] = [];
for (const host of bindHosts) {
for (const _host of bindHosts) {
const httpServer = createGatewayHttpServer({
canvasHost,
clients,
@@ -191,36 +201,9 @@ export async function createGatewayRuntimeState(params: {
getReadiness: params.getReadiness,
tlsOptions: params.gatewayTls?.enabled ? params.gatewayTls.tlsOptions : undefined,
});
try {
await listenGatewayHttpServer({
httpServer,
bindHost: host,
port: params.port,
});
httpServers.push(httpServer);
httpBindHosts.push(host);
} catch (err) {
if (host === bindHosts[0]) {
throw err;
}
params.log.warn(
`gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`,
);
}
}
const httpServer = httpServers[0];
if (!httpServer) {
throw new Error("Gateway HTTP server failed to start");
}
const wss = new WebSocketServer({
noServer: true,
maxPayload: MAX_PREAUTH_PAYLOAD_BYTES,
});
const preauthConnectionBudget = createPreauthConnectionBudget();
for (const server of httpServers) {
// Attach upgrade handler BEFORE listening to prevent race condition
attachGatewayUpgradeHandler({
httpServer: server,
httpServer,
wss,
canvasHost,
clients,
@@ -228,9 +211,53 @@ export async function createGatewayRuntimeState(params: {
resolvedAuth: params.resolvedAuth,
getResolvedAuth: params.getResolvedAuth,
rateLimiter: params.rateLimiter,
log: params.log,
});
httpServers.push(httpServer);
}
const httpServer = httpServers[0];
if (!httpServer) {
throw new Error("Gateway HTTP server failed to start");
}
let startListeningPromise: Promise<void> | null = null;
const startListening = async (): Promise<void> => {
if (startListeningPromise) {
await startListeningPromise;
return;
}
startListeningPromise = (async () => {
for (const [index, host] of bindHosts.entries()) {
const server = httpServers[index];
if (!server) {
throw new Error(`Missing gateway HTTP server for bind host ${host}`);
}
try {
await listenGatewayHttpServer({
httpServer: server,
bindHost: host,
port: params.port,
});
httpBindHosts.push(host);
} catch (err) {
if (host === bindHosts[0]) {
throw err;
}
params.log.warn(
`gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`,
);
}
}
if (httpBindHosts.length === 0) {
throw new Error("Gateway HTTP server failed to start");
}
})();
try {
await startListeningPromise;
} catch (err) {
startListeningPromise = null;
throw err;
}
};
const agentRunSeq = new Map<string, number>();
const dedupe = new Map<string, DedupeEntry>();
const chatRunState = createChatRunState();
@@ -257,6 +284,7 @@ export async function createGatewayRuntimeState(params: {
httpServer,
httpServers,
httpBindHosts,
startListening,
wss,
preauthConnectionBudget,
clients,