mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 05:10:44 +00:00
fix(gateway): prevent 1006 errors from race condition in WebSocket upgrade (#43392)
Merged via squash.
Prepared head SHA: 0bca6d3512
Co-authored-by: dalefrieswthat <176454532+dalefrieswthat@users.noreply.github.com>
Co-authored-by: grp06 <1573959+grp06@users.noreply.github.com>
Reviewed-by: @grp06
This commit is contained in:
@@ -29,7 +29,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Gateway/websocket broadcasts: require `operator.read` (or higher) for chat, agent, and tool-result event frames so pairing-scoped and node-role sessions no longer passively receive session chat content, and scope-gate unknown broadcast events by default. Plugin-defined `plugin.*` broadcasts are scoped to operator.write/admin, and status/transport events (`heartbeat`, `presence`, `tick`, etc.) remain unrestricted. Per-client sequence numbers preserve per-connection monotonicity. (#69373) Thanks @eleqtrizit.
|
||||
- Agents/compaction: always reload embedded Pi resources through an explicit loader and reapply reserve-token overrides so runs without extension factories no longer silently lose compaction settings before session start. (#67146) Thanks @ly85206559.
|
||||
- Memory-core/dreaming: normalize sweep timestamps and reuse hashed narrative session keys for fallback cleanup so Dreaming narrative sub-sessions stop leaking. (#67023) Thanks @chiyouYCH.
|
||||
|
||||
- Gateway/startup: delay HTTP bind until websocket handlers are attached, so immediate post-startup websocket health/connect probes no longer hit the startup race window. (#43392) Thanks @dalefrieswthat.
|
||||
## 2026.4.20
|
||||
|
||||
### Changes
|
||||
|
||||
@@ -39,16 +39,17 @@ const resolveGatewayPort = vi.hoisted(() => vi.fn((_cfg?: unknown, _env?: unknow
|
||||
const findVerifiedGatewayListenerPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []);
|
||||
const signalVerifiedGatewayPidSync = vi.fn<(pid: number, signal: "SIGTERM" | "SIGUSR1") => void>();
|
||||
const formatGatewayPidList = vi.fn<(pids: number[]) => string>((pids) => pids.join(", "));
|
||||
const probeGateway = vi.fn<
|
||||
(opts: {
|
||||
url: string;
|
||||
auth?: { token?: string; password?: string };
|
||||
timeoutMs: number;
|
||||
}) => Promise<{
|
||||
ok: boolean;
|
||||
configSnapshot: unknown;
|
||||
}>
|
||||
>();
|
||||
const probeGateway =
|
||||
vi.fn<
|
||||
(opts: {
|
||||
url: string;
|
||||
auth?: { token?: string; password?: string };
|
||||
timeoutMs: number;
|
||||
}) => Promise<{
|
||||
ok: boolean;
|
||||
configSnapshot: unknown;
|
||||
}>
|
||||
>();
|
||||
const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true);
|
||||
const loadConfig = vi.hoisted(() => vi.fn(() => ({})));
|
||||
const recoverInstalledLaunchAgent = vi.hoisted(() => vi.fn());
|
||||
|
||||
@@ -406,7 +406,17 @@ function formatGatewayCloseError(
|
||||
const hint =
|
||||
code === 1006 ? "abnormal closure (no close frame)" : code === 1000 ? "normal closure" : "";
|
||||
const suffix = hint ? ` ${hint}` : "";
|
||||
return `gateway closed (${code}${suffix}): ${reasonText}\n${connectionDetails.message}`;
|
||||
let message = `gateway closed (${code}${suffix}): ${reasonText}\n${connectionDetails.message}`;
|
||||
// Add troubleshooting hints for common issues
|
||||
if (code === 1006) {
|
||||
message +=
|
||||
"\n\nPossible causes:" +
|
||||
"\n- Gateway not yet ready to accept connections (retry after a moment)" +
|
||||
"\n- TLS mismatch (connecting with ws:// to a wss:// gateway, or vice versa)" +
|
||||
"\n- Gateway crashed or was terminated unexpectedly" +
|
||||
"\nRun `openclaw doctor` for diagnostics.";
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
function formatGatewayTimeoutError(
|
||||
|
||||
@@ -205,4 +205,41 @@ describe("createGatewayCloseHandler", () => {
|
||||
await closeExpectation;
|
||||
expect(vi.getTimerCount()).toBe(0);
|
||||
});
|
||||
|
||||
// Shutdown can run before the HTTP server was ever bound (the close reason used
// below is "startup failed before bind"). The stubbed server reports
// ERR_SERVER_NOT_RUNNING from close(), and the close handler is expected to
// treat that as "already closed" and resolve instead of rejecting.
it("ignores unbound http servers during shutdown", async () => {
  const close = createGatewayCloseHandler({
    bonjourStop: null,
    tailscaleCleanup: null,
    canvasHost: null,
    canvasHostServer: null,
    stopChannel: vi.fn(async () => undefined),
    pluginServices: null,
    cron: { stop: vi.fn() },
    heartbeatRunner: { stop: vi.fn() } as never,
    updateCheckStop: null,
    nodePresenceTimers: new Map(),
    broadcast: vi.fn(),
    tickInterval: setInterval(() => undefined, 60_000),
    healthInterval: setInterval(() => undefined, 60_000),
    dedupeCleanup: setInterval(() => undefined, 60_000),
    mediaCleanup: null,
    agentUnsub: null,
    heartbeatUnsub: null,
    transcriptUnsub: null,
    lifecycleUnsub: null,
    chatRunState: { clear: vi.fn() },
    clients: new Set(),
    configReloader: { stop: vi.fn(async () => undefined) },
    wss: { close: (cb: () => void) => cb() } as never,
    httpServer: {
      // Invoke the callback with the error shape a never-listening server produces.
      close: (cb: (err?: NodeJS.ErrnoException | null) => void) =>
        cb(
          Object.assign(new Error("Server is not running."), { code: "ERR_SERVER_NOT_RUNNING" }),
        ),
      closeIdleConnections: vi.fn(),
    } as never,
  });

  await expect(close({ reason: "startup failed before bind" })).resolves.toBeUndefined();
});
|
||||
});
|
||||
|
||||
@@ -64,6 +64,15 @@ export async function runGatewayClosePrelude(params: {
|
||||
await params.closeMcpServer?.().catch(() => {});
|
||||
}
|
||||
|
||||
/**
 * Reports whether `err` carries the error code `"ERR_SERVER_NOT_RUNNING"`,
 * i.e. the failure reported when closing an HTTP server that is not running.
 *
 * Takes `unknown` so it can be applied directly to callback or catch values.
 *
 * @param err - Any value; typically the argument passed to a `close` callback.
 * @returns `true` only for non-null objects whose `code` property is exactly
 *   `"ERR_SERVER_NOT_RUNNING"`; `false` for primitives, `null`, `undefined`,
 *   and objects with a different or missing `code`.
 */
function isServerNotRunningError(err: unknown): boolean {
  if (typeof err !== "object" || err === null) {
    return false;
  }
  // A missing `code` reads as `undefined`, which fails the comparison below.
  return (err as { code?: unknown }).code === "ERR_SERVER_NOT_RUNNING";
}
|
||||
|
||||
export function createGatewayCloseHandler(params: {
|
||||
bonjourStop: (() => Promise<void>) | null;
|
||||
tailscaleCleanup: (() => Promise<void>) | null;
|
||||
@@ -240,7 +249,13 @@ export function createGatewayCloseHandler(params: {
|
||||
httpServer.closeIdleConnections();
|
||||
}
|
||||
const closePromise = new Promise<void>((resolve, reject) =>
|
||||
httpServer.close((err) => (err ? reject(err) : resolve())),
|
||||
httpServer.close((err) => {
|
||||
if (!err || isServerNotRunningError(err)) {
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
reject(err);
|
||||
}),
|
||||
);
|
||||
const httpGraceTimeout = createTimeoutRace(HTTP_CLOSE_GRACE_MS, () => false as const);
|
||||
const closedWithinGrace = await Promise.race([
|
||||
|
||||
@@ -1139,6 +1139,8 @@ export function attachGatewayUpgradeHandler(opts: {
|
||||
getResolvedAuth?: () => ResolvedGatewayAuth;
|
||||
/** Optional rate limiter for auth brute-force protection. */
|
||||
rateLimiter?: AuthRateLimiter;
|
||||
/** Optional logger for error diagnostics. */
|
||||
log?: { warn: (msg: string) => void };
|
||||
}) {
|
||||
const {
|
||||
httpServer,
|
||||
@@ -1148,6 +1150,7 @@ export function attachGatewayUpgradeHandler(opts: {
|
||||
preauthConnectionBudget,
|
||||
resolvedAuth,
|
||||
rateLimiter,
|
||||
log,
|
||||
} = opts;
|
||||
const getResolvedAuth = opts.getResolvedAuth ?? (() => resolvedAuth);
|
||||
httpServer.on("upgrade", (req, socket, head) => {
|
||||
@@ -1250,7 +1253,10 @@ export function attachGatewayUpgradeHandler(opts: {
|
||||
releaseUpgradeBudget();
|
||||
throw new Error("gateway websocket upgrade failed");
|
||||
}
|
||||
})().catch(() => {
|
||||
})().catch((err) => {
|
||||
const remoteAddress = (socket as { remoteAddress?: string }).remoteAddress ?? "unknown";
|
||||
const errorMessage = err instanceof Error ? err.message : String(err);
|
||||
log?.warn(`ws upgrade error from ${remoteAddress}: ${errorMessage}`);
|
||||
socket.destroy();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -84,6 +84,7 @@ export async function createGatewayRuntimeState(params: {
|
||||
httpServer: HttpServer;
|
||||
httpServers: HttpServer[];
|
||||
httpBindHosts: string[];
|
||||
startListening: () => Promise<void>;
|
||||
wss: WebSocketServer;
|
||||
preauthConnectionBudget: PreauthConnectionBudget;
|
||||
clients: Set<GatewayWsClient>;
|
||||
@@ -168,9 +169,18 @@ export async function createGatewayRuntimeState(params: {
|
||||
"Host-header origin fallback weakens origin checks and should only be used as break-glass.",
|
||||
);
|
||||
}
|
||||
// Create WebSocketServer first (with noServer: true) so we can attach upgrade handlers
|
||||
// before HTTP servers start listening. This prevents a race condition where connections
|
||||
// arrive before the upgrade handler is attached, which causes silent 1006 errors.
|
||||
const wss = new WebSocketServer({
|
||||
noServer: true,
|
||||
maxPayload: MAX_PREAUTH_PAYLOAD_BYTES,
|
||||
});
|
||||
const preauthConnectionBudget = createPreauthConnectionBudget();
|
||||
|
||||
const httpServers: HttpServer[] = [];
|
||||
const httpBindHosts: string[] = [];
|
||||
for (const host of bindHosts) {
|
||||
for (const _host of bindHosts) {
|
||||
const httpServer = createGatewayHttpServer({
|
||||
canvasHost,
|
||||
clients,
|
||||
@@ -191,36 +201,9 @@ export async function createGatewayRuntimeState(params: {
|
||||
getReadiness: params.getReadiness,
|
||||
tlsOptions: params.gatewayTls?.enabled ? params.gatewayTls.tlsOptions : undefined,
|
||||
});
|
||||
try {
|
||||
await listenGatewayHttpServer({
|
||||
httpServer,
|
||||
bindHost: host,
|
||||
port: params.port,
|
||||
});
|
||||
httpServers.push(httpServer);
|
||||
httpBindHosts.push(host);
|
||||
} catch (err) {
|
||||
if (host === bindHosts[0]) {
|
||||
throw err;
|
||||
}
|
||||
params.log.warn(
|
||||
`gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
const httpServer = httpServers[0];
|
||||
if (!httpServer) {
|
||||
throw new Error("Gateway HTTP server failed to start");
|
||||
}
|
||||
|
||||
const wss = new WebSocketServer({
|
||||
noServer: true,
|
||||
maxPayload: MAX_PREAUTH_PAYLOAD_BYTES,
|
||||
});
|
||||
const preauthConnectionBudget = createPreauthConnectionBudget();
|
||||
for (const server of httpServers) {
|
||||
// Attach upgrade handler BEFORE listening to prevent race condition
|
||||
attachGatewayUpgradeHandler({
|
||||
httpServer: server,
|
||||
httpServer,
|
||||
wss,
|
||||
canvasHost,
|
||||
clients,
|
||||
@@ -228,9 +211,53 @@ export async function createGatewayRuntimeState(params: {
|
||||
resolvedAuth: params.resolvedAuth,
|
||||
getResolvedAuth: params.getResolvedAuth,
|
||||
rateLimiter: params.rateLimiter,
|
||||
log: params.log,
|
||||
});
|
||||
httpServers.push(httpServer);
|
||||
}
|
||||
|
||||
const httpServer = httpServers[0];
|
||||
if (!httpServer) {
|
||||
throw new Error("Gateway HTTP server failed to start");
|
||||
}
|
||||
// In-flight or completed bind attempt; null until first use and after a failed attempt.
let startListeningPromise: Promise<void> | null = null;

/**
 * Binds each prepared HTTP server to its bind host on `params.port`.
 *
 * - Memoised: while an attempt is pending or has succeeded, further calls await
 *   that same attempt instead of binding again.
 * - A bind failure on the first host is rethrown; a failure on any later host is
 *   only logged as a warning and that host is left out of `httpBindHosts`.
 * - When an attempt fails, the memoised promise is cleared so a later call can retry.
 *
 * @throws If a server is missing for a bind host, if the first host fails to bind,
 *   or if no host ends up bound.
 */
const startListening = async (): Promise<void> => {
  if (startListeningPromise) {
    await startListeningPromise;
    return;
  }
  startListeningPromise = (async () => {
    for (const [index, host] of bindHosts.entries()) {
      // Servers are looked up by position, so httpServers must parallel bindHosts.
      const server = httpServers[index];
      if (!server) {
        throw new Error(`Missing gateway HTTP server for bind host ${host}`);
      }
      try {
        await listenGatewayHttpServer({
          httpServer: server,
          bindHost: host,
          port: params.port,
        });
        httpBindHosts.push(host);
      } catch (err) {
        // Only the first bind host is mandatory; the rest are best-effort.
        if (host === bindHosts[0]) {
          throw err;
        }
        params.log.warn(
          `gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`,
        );
      }
    }
    if (httpBindHosts.length === 0) {
      throw new Error("Gateway HTTP server failed to start");
    }
  })();
  try {
    await startListeningPromise;
  } catch (err) {
    // Forget the failed attempt so the next call starts a fresh one.
    startListeningPromise = null;
    throw err;
  }
};
|
||||
const agentRunSeq = new Map<string, number>();
|
||||
const dedupe = new Map<string, DedupeEntry>();
|
||||
const chatRunState = createChatRunState();
|
||||
@@ -257,6 +284,7 @@ export async function createGatewayRuntimeState(params: {
|
||||
httpServer,
|
||||
httpServers,
|
||||
httpBindHosts,
|
||||
startListening,
|
||||
wss,
|
||||
preauthConnectionBudget,
|
||||
clients,
|
||||
|
||||
@@ -479,6 +479,7 @@ export async function startGatewayServer(
|
||||
httpServer,
|
||||
httpServers,
|
||||
httpBindHosts,
|
||||
startListening,
|
||||
wss,
|
||||
preauthConnectionBudget,
|
||||
clients,
|
||||
@@ -526,7 +527,6 @@ export async function startGatewayServer(
|
||||
getReadiness,
|
||||
}),
|
||||
);
|
||||
startupTrace.mark("http.bound");
|
||||
const {
|
||||
nodeRegistry,
|
||||
nodePresenceTimers,
|
||||
@@ -796,6 +796,8 @@ export async function startGatewayServer(
|
||||
broadcast,
|
||||
context: gatewayRequestContext,
|
||||
});
|
||||
await startListening();
|
||||
startupTrace.mark("http.bound");
|
||||
({
|
||||
stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck,
|
||||
tailscaleCleanup: runtimeState.tailscaleCleanup,
|
||||
|
||||
143
src/gateway/server.startup-websocket-race.test.ts
Normal file
143
src/gateway/server.startup-websocket-race.test.ts
Normal file
@@ -0,0 +1,143 @@
|
||||
import http from "node:http";
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { connectGatewayClient, disconnectGatewayClient } from "./test-helpers.e2e.js";
|
||||
import { getFreePort, installGatewayTestHooks, startGatewayServer } from "./test-helpers.js";
|
||||
|
||||
/**
 * Test-controlled stand-in for an async lookup that can be held open.
 *
 * `run()` signals that it has been entered, then blocks until `release()` is
 * called and finally resolves to `"test-machine"`. Tests use
 * `waitUntilDelayed()` to learn that `run()` has been reached.
 *
 * Created through `vi.hoisted` so the object already exists when hoisted
 * `vi.mock` factories reference it.
 */
const machineNameDelay = vi.hoisted(() => {
  // A promise paired with the function that resolves it.
  type Gate = { promise: Promise<void>; open: () => void };

  const createGate = (): Gate => {
    let open: () => void = () => {};
    const promise = new Promise<void>((resolve) => {
      open = resolve;
    });
    return { promise, open };
  };

  let entered = createGate();
  let released = createGate();

  return {
    /** Resolves once `run()` has been entered since the last `reset()`. */
    waitUntilDelayed: async () => {
      await entered.promise;
    },
    /** Lets a blocked (or future) `run()` finish; safe to call repeatedly. */
    release: () => {
      released.open();
    },
    /** Re-arms both gates for the next test. */
    reset: () => {
      // Open the old release gate first so a run() still parked on it cannot hang forever.
      released.open();
      entered = createGate();
      released = createGate();
    },
    /** Body of the mocked lookup: announce entry, wait for release, return a fixed name. */
    run: async () => {
      entered.open();
      await released.promise;
      return "test-machine";
    },
  };
});
|
||||
|
||||
// Replace the machine-name lookup with the controllable delay so a test can
// pause whatever awaits getMachineDisplayName() at a known point.
vi.mock("../infra/machine-name.js", () => ({
  getMachineDisplayName: () => machineNameDelay.run(),
}));

installGatewayTestHooks({ scope: "suite" });

// Always release after each test so a lookup left blocked by a failing test
// cannot stall later tests or teardown.
afterEach(() => {
  machineNameDelay.release();
});
|
||||
|
||||
describe("gateway startup websocket readiness", () => {
  /**
   * Sets OPENCLAW_TEST_MINIMAL_GATEWAY to "0" for the duration of a test
   * (presumably selecting the non-minimal startup path — confirm against the
   * test helpers) and returns a callback that restores the previous value.
   */
  const overrideMinimalGatewayEnv = (): (() => void) => {
    const previousMinimal = process.env.OPENCLAW_TEST_MINIMAL_GATEWAY;
    process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = "0";
    return () => {
      if (previousMinimal === undefined) {
        delete process.env.OPENCLAW_TEST_MINIMAL_GATEWAY;
      } else {
        process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = previousMinimal;
      }
    };
  };

  it("does not bind the websocket port until websocket handlers are attached", async () => {
    machineNameDelay.reset();
    const restoreEnv = overrideMinimalGatewayEnv();
    // Declared outside the try block so cleanup can always reach the in-flight startup.
    let startup: ReturnType<typeof startGatewayServer> | undefined;
    try {
      const port = await getFreePort();
      startup = startGatewayServer(port, {
        auth: { mode: "none" },
      });
      // Startup is now parked inside the mocked machine-name lookup.
      await machineNameDelay.waitUntilDelayed();

      // Send a raw websocket upgrade request while startup is still paused.
      const pendingUpgrade = await new Promise<
        { kind: "error"; code?: string } | { kind: "response"; status: number; body: string }
      >((resolve) => {
        const req = http.request({
          host: "127.0.0.1",
          port,
          path: "/",
          headers: {
            Connection: "Upgrade",
            Upgrade: "websocket",
            "Sec-WebSocket-Key": "dGVzdC1rZXktMDEyMzQ1Ng==",
            "Sec-WebSocket-Version": "13",
          },
        });
        req.once("error", (err) => {
          resolve({ kind: "error", code: (err as NodeJS.ErrnoException).code });
        });
        req.once("response", (res) => {
          let body = "";
          res.setEncoding("utf8");
          res.on("data", (chunk) => {
            body += chunk;
          });
          res.once("end", () => {
            resolve({ kind: "response", status: res.statusCode ?? 0, body });
          });
        });
        req.end();
      });

      // The port must not be listening yet, so the connection is refused outright.
      expect(pendingUpgrade).toEqual({ kind: "error", code: "ECONNREFUSED" });

      machineNameDelay.release();
      expect(await startup).toBeDefined();
    } finally {
      machineNameDelay.release();
      // Await startup even when an assertion above threw; otherwise the gateway
      // would keep running after the test and its rejection could go unhandled.
      const server = await startup?.catch(() => undefined);
      if (server) {
        await server.close();
      }
      restoreEnv();
    }
  });

  it("accepts an immediate websocket connection once startup resolves", async () => {
    machineNameDelay.reset();
    const restoreEnv = overrideMinimalGatewayEnv();
    let server: Awaited<ReturnType<typeof startGatewayServer>> | undefined;
    let client: Awaited<ReturnType<typeof connectGatewayClient>> | undefined;
    try {
      const port = await getFreePort();
      // Open the gate up front so startup runs straight through.
      machineNameDelay.release();
      server = await startGatewayServer(port, {
        auth: { mode: "none" },
      });

      client = await connectGatewayClient({
        url: `ws://127.0.0.1:${port}`,
        timeoutMs: 5_000,
        timeoutMessage: "expected websocket connect to succeed immediately after startup",
      });

      expect(client).toBeDefined();
    } finally {
      if (client) {
        await disconnectGatewayClient(client);
      }
      if (server) {
        await server.close();
      }
      restoreEnv();
    }
  });
});
|
||||
Reference in New Issue
Block a user