diff --git a/CHANGELOG.md b/CHANGELOG.md index 46f94afddd9..8999ae5c03c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ Docs: https://docs.openclaw.ai - Gateway/websocket broadcasts: require `operator.read` (or higher) for chat, agent, and tool-result event frames so pairing-scoped and node-role sessions no longer passively receive session chat content, and scope-gate unknown broadcast events by default. Plugin-defined `plugin.*` broadcasts are scoped to operator.write/admin, and status/transport events (`heartbeat`, `presence`, `tick`, etc.) remain unrestricted. Per-client sequence numbers preserve per-connection monotonicity. (#69373) Thanks @eleqtrizit. - Agents/compaction: always reload embedded Pi resources through an explicit loader and reapply reserve-token overrides so runs without extension factories no longer silently lose compaction settings before session start. (#67146) Thanks @ly85206559. - Memory-core/dreaming: normalize sweep timestamps and reuse hashed narrative session keys for fallback cleanup so Dreaming narrative sub-sessions stop leaking. (#67023) Thanks @chiyouYCH. - +- Gateway/startup: delay HTTP bind until websocket handlers are attached, so immediate post-startup websocket health/connect probes no longer hit the startup race window. (#43392) Thanks @dalefrieswthat. ## 2026.4.20 ### Changes diff --git a/src/cli/daemon-cli/lifecycle.test.ts b/src/cli/daemon-cli/lifecycle.test.ts index 20ee6a69a76..266d85e0775 100644 --- a/src/cli/daemon-cli/lifecycle.test.ts +++ b/src/cli/daemon-cli/lifecycle.test.ts @@ -39,16 +39,17 @@ const resolveGatewayPort = vi.hoisted(() => vi.fn((_cfg?: unknown, _env?: unknow const findVerifiedGatewayListenerPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []); const signalVerifiedGatewayPidSync = vi.fn<(pid: number, signal: "SIGTERM" | "SIGUSR1") => void>(); const formatGatewayPidList = vi.fn<(pids: number[]) => string>((pids) => pids.join(", ")); -const probeGateway = vi.fn< - (opts: { - url: string; - auth?: { token?: string; password?: string }; - timeoutMs: number; - }) => Promise<{ - ok: boolean; - configSnapshot: unknown; - }> ->(); +const probeGateway = + vi.fn< + (opts: { + url: string; + auth?: { token?: string; password?: string }; + timeoutMs: number; + }) => Promise<{ + ok: boolean; + configSnapshot: unknown; + }> + >(); const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true); const loadConfig = vi.hoisted(() => vi.fn(() => ({}))); const recoverInstalledLaunchAgent = vi.hoisted(() => vi.fn()); diff --git a/src/gateway/call.ts b/src/gateway/call.ts index 51a5d6cc4fc..ce2612b4eb8 100644 --- a/src/gateway/call.ts +++ b/src/gateway/call.ts @@ -406,7 +406,17 @@ function formatGatewayCloseError( const hint = code === 1006 ? "abnormal closure (no close frame)" : code === 1000 ? "normal closure" : ""; const suffix = hint ? ` ${hint}` : ""; - return `gateway closed (${code}${suffix}): ${reasonText}\n${connectionDetails.message}`; + let message = `gateway closed (${code}${suffix}): ${reasonText}\n${connectionDetails.message}`; + // Add troubleshooting hints for common issues + if (code === 1006) { + message += + "\n\nPossible causes:" + + "\n- Gateway not yet ready to accept connections (retry after a moment)" + + "\n- TLS mismatch (connecting with ws:// to a wss:// gateway, or vice versa)" + + "\n- Gateway crashed or was terminated unexpectedly" + + "\nRun `openclaw doctor` for diagnostics."; + } + return message; } function formatGatewayTimeoutError( diff --git a/src/gateway/server-close.test.ts b/src/gateway/server-close.test.ts index df399d5e197..3c74e34c431 100644 --- a/src/gateway/server-close.test.ts +++ b/src/gateway/server-close.test.ts @@ -205,4 +205,41 @@ describe("createGatewayCloseHandler", () => { await closeExpectation; expect(vi.getTimerCount()).toBe(0); }); + + it("ignores unbound http servers during shutdown", async () => { + const close = createGatewayCloseHandler({ + bonjourStop: null, + tailscaleCleanup: null, + canvasHost: null, + canvasHostServer: null, + stopChannel: vi.fn(async () => undefined), + pluginServices: null, + cron: { stop: vi.fn() }, + heartbeatRunner: { stop: vi.fn() } as never, + updateCheckStop: null, + nodePresenceTimers: new Map(), + broadcast: vi.fn(), + tickInterval: setInterval(() => undefined, 60_000), + healthInterval: setInterval(() => undefined, 60_000), + dedupeCleanup: setInterval(() => undefined, 60_000), + mediaCleanup: null, + agentUnsub: null, + heartbeatUnsub: null, + transcriptUnsub: null, + lifecycleUnsub: null, + chatRunState: { clear: vi.fn() }, + clients: new Set(), + configReloader: { stop: vi.fn(async () => undefined) }, + wss: { close: (cb: () => void) => cb() } as never, + httpServer: { + close: (cb: (err?: NodeJS.ErrnoException | null) => void) => + cb( + Object.assign(new Error("Server is not running."), { code: "ERR_SERVER_NOT_RUNNING" }), + ), + closeIdleConnections: vi.fn(), + } as never, + }); + + await expect(close({ reason: "startup failed before bind" })).resolves.toBeUndefined(); + }); }); diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index 5e0311e6381..cecb7114a5c 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -64,6 +64,15 @@ export async function runGatewayClosePrelude(params: { await params.closeMcpServer?.().catch(() => {}); } +function isServerNotRunningError(err: unknown): boolean { + return Boolean( + err && + typeof err === "object" && + "code" in err && + (err as { code?: unknown }).code === "ERR_SERVER_NOT_RUNNING", + ); +} + export function createGatewayCloseHandler(params: { bonjourStop: (() => Promise) | null; tailscaleCleanup: (() => Promise) | null; @@ -240,7 +249,13 @@ export function createGatewayCloseHandler(params: { httpServer.closeIdleConnections(); } const closePromise = new Promise((resolve, reject) => - httpServer.close((err) => (err ? reject(err) : resolve())), + httpServer.close((err) => { + if (!err || isServerNotRunningError(err)) { + resolve(); + return; + } + reject(err); + }), ); const httpGraceTimeout = createTimeoutRace(HTTP_CLOSE_GRACE_MS, () => false as const); const closedWithinGrace = await Promise.race([ diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index 396e8b510eb..439fe864aad 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -1139,6 +1139,8 @@ export function attachGatewayUpgradeHandler(opts: { getResolvedAuth?: () => ResolvedGatewayAuth; /** Optional rate limiter for auth brute-force protection. */ rateLimiter?: AuthRateLimiter; + /** Optional logger for error diagnostics. */ + log?: { warn: (msg: string) => void }; }) { const { httpServer, @@ -1148,6 +1150,7 @@ export function attachGatewayUpgradeHandler(opts: { preauthConnectionBudget, resolvedAuth, rateLimiter, + log, } = opts; const getResolvedAuth = opts.getResolvedAuth ?? (() => resolvedAuth); httpServer.on("upgrade", (req, socket, head) => { @@ -1250,7 +1253,10 @@ export function attachGatewayUpgradeHandler(opts: { releaseUpgradeBudget(); throw new Error("gateway websocket upgrade failed"); } - })().catch(() => { + })().catch((err) => { + const remoteAddress = (socket as { remoteAddress?: string }).remoteAddress ?? "unknown"; + const errorMessage = err instanceof Error ? err.message : String(err); + log?.warn(`ws upgrade error from ${remoteAddress}: ${errorMessage}`); socket.destroy(); }); }); diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index 2058636c526..b77548ed615 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -84,6 +84,7 @@ export async function createGatewayRuntimeState(params: { httpServer: HttpServer; httpServers: HttpServer[]; httpBindHosts: string[]; + startListening: () => Promise; wss: WebSocketServer; preauthConnectionBudget: PreauthConnectionBudget; clients: Set; @@ -168,9 +169,18 @@ export async function createGatewayRuntimeState(params: { "Host-header origin fallback weakens origin checks and should only be used as break-glass.", ); } + // Create WebSocketServer first (with noServer: true) so we can attach upgrade handlers + // before HTTP servers start listening. This prevents a race condition where connections + // arrive before the upgrade handler is attached, which causes silent 1006 errors. + const wss = new WebSocketServer({ + noServer: true, + maxPayload: MAX_PREAUTH_PAYLOAD_BYTES, + }); + const preauthConnectionBudget = createPreauthConnectionBudget(); + const httpServers: HttpServer[] = []; const httpBindHosts: string[] = []; - for (const host of bindHosts) { + for (const _host of bindHosts) { const httpServer = createGatewayHttpServer({ canvasHost, clients, @@ -191,36 +201,9 @@ export async function createGatewayRuntimeState(params: { getReadiness: params.getReadiness, tlsOptions: params.gatewayTls?.enabled ? params.gatewayTls.tlsOptions : undefined, }); - try { - await listenGatewayHttpServer({ - httpServer, - bindHost: host, - port: params.port, - }); - httpServers.push(httpServer); - httpBindHosts.push(host); - } catch (err) { - if (host === bindHosts[0]) { - throw err; - } - params.log.warn( - `gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`, - ); - } - } - const httpServer = httpServers[0]; - if (!httpServer) { - throw new Error("Gateway HTTP server failed to start"); - } - - const wss = new WebSocketServer({ - noServer: true, - maxPayload: MAX_PREAUTH_PAYLOAD_BYTES, - }); - const preauthConnectionBudget = createPreauthConnectionBudget(); - for (const server of httpServers) { + // Attach upgrade handler BEFORE listening to prevent race condition attachGatewayUpgradeHandler({ - httpServer: server, + httpServer, wss, canvasHost, clients, @@ -228,9 +211,53 @@ export async function createGatewayRuntimeState(params: { resolvedAuth: params.resolvedAuth, getResolvedAuth: params.getResolvedAuth, rateLimiter: params.rateLimiter, + log: params.log, }); + httpServers.push(httpServer); } - + const httpServer = httpServers[0]; + if (!httpServer) { + throw new Error("Gateway HTTP server failed to start"); + } + let startListeningPromise: Promise | null = null; + const startListening = async (): Promise => { + if (startListeningPromise) { + await startListeningPromise; + return; + } + startListeningPromise = (async () => { + for (const [index, host] of bindHosts.entries()) { + const server = httpServers[index]; + if (!server) { + throw new Error(`Missing gateway HTTP server for bind host ${host}`); + } + try { + await listenGatewayHttpServer({ + httpServer: server, + bindHost: host, + port: params.port, + }); + httpBindHosts.push(host); + } catch (err) { + if (host === bindHosts[0]) { + throw err; + } + params.log.warn( + `gateway: failed to bind loopback alias ${host}:${params.port} (${String(err)})`, + ); + } + } + if (httpBindHosts.length === 0) { + throw new Error("Gateway HTTP server failed to start"); + } + })(); + try { + await startListeningPromise; + } catch (err) { + startListeningPromise = null; + throw err; + } + }; const agentRunSeq = new Map(); const dedupe = new Map(); const chatRunState = createChatRunState(); @@ -257,6 +284,7 @@ export async function createGatewayRuntimeState(params: { httpServer, httpServers, httpBindHosts, + startListening, wss, preauthConnectionBudget, clients, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 365fd6fed6c..77bc19f81b0 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -479,6 +479,7 @@ export async function startGatewayServer( httpServer, httpServers, httpBindHosts, + startListening, wss, preauthConnectionBudget, clients, @@ -526,7 +527,6 @@ export async function startGatewayServer( getReadiness, }), ); - startupTrace.mark("http.bound"); const { nodeRegistry, nodePresenceTimers, @@ -796,6 +796,8 @@ export async function startGatewayServer( broadcast, context: gatewayRequestContext, }); + await startListening(); + startupTrace.mark("http.bound"); ({ stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck, tailscaleCleanup: runtimeState.tailscaleCleanup, diff --git a/src/gateway/server.startup-websocket-race.test.ts b/src/gateway/server.startup-websocket-race.test.ts new file mode 100644 index 00000000000..8c18adec3f1 --- /dev/null +++ b/src/gateway/server.startup-websocket-race.test.ts @@ -0,0 +1,143 @@ +import http from "node:http"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { connectGatewayClient, disconnectGatewayClient } from "./test-helpers.e2e.js"; +import { getFreePort, installGatewayTestHooks, startGatewayServer } from "./test-helpers.js"; + +const machineNameDelay = vi.hoisted(() => { + let enteredResolve = () => {}; + let releaseResolve = () => {}; + let entered = new Promise((resolve) => { + enteredResolve = resolve; + }); + let release = new Promise((resolve) => { + releaseResolve = resolve; + }); + return { + waitUntilDelayed: async () => { + await entered; + }, + release: () => { + releaseResolve(); + }, + reset: () => { + entered = new Promise((resolve) => { + enteredResolve = resolve; + }); + release = new Promise((resolve) => { + releaseResolve = resolve; + }); + }, + run: async () => { + enteredResolve(); + await release; + return "test-machine"; + }, + }; +}); + +vi.mock("../infra/machine-name.js", () => ({ + getMachineDisplayName: () => machineNameDelay.run(), +})); + +installGatewayTestHooks({ scope: "suite" }); + +afterEach(() => { + machineNameDelay.release(); +}); + +describe("gateway startup websocket readiness", () => { + it("does not bind the websocket port until websocket handlers are attached", async () => { + machineNameDelay.reset(); + const previousMinimal = process.env.OPENCLAW_TEST_MINIMAL_GATEWAY; + process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = "0"; + let server: Awaited> | undefined; + try { + const port = await getFreePort(); + const startup = startGatewayServer(port, { + auth: { mode: "none" }, + }); + await machineNameDelay.waitUntilDelayed(); + + const pendingUpgrade = await new Promise< + { kind: "error"; code?: string } | { kind: "response"; status: number; body: string } + >((resolve) => { + const req = http.request({ + host: "127.0.0.1", + port, + path: "/", + headers: { + Connection: "Upgrade", + Upgrade: "websocket", + "Sec-WebSocket-Key": "dGVzdC1rZXktMDEyMzQ1Ng==", + "Sec-WebSocket-Version": "13", + }, + }); + req.once("error", (err) => { + resolve({ kind: "error", code: (err as NodeJS.ErrnoException).code }); + }); + req.once("response", (res) => { + let body = ""; + res.setEncoding("utf8"); + res.on("data", (chunk) => { + body += chunk; + }); + res.once("end", () => { + resolve({ kind: "response", status: res.statusCode ?? 0, body }); + }); + }); + req.end(); + }); + + expect(pendingUpgrade).toEqual({ kind: "error", code: "ECONNREFUSED" }); + + machineNameDelay.release(); + server = await startup; + expect(server).toBeDefined(); + } finally { + machineNameDelay.release(); + if (server) { + await server.close(); + } + if (previousMinimal === undefined) { + delete process.env.OPENCLAW_TEST_MINIMAL_GATEWAY; + } else { + process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = previousMinimal; + } + } + }); + + it("accepts an immediate websocket connection once startup resolves", async () => { + machineNameDelay.reset(); + const previousMinimal = process.env.OPENCLAW_TEST_MINIMAL_GATEWAY; + process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = "0"; + let server: Awaited> | undefined; + let client: Awaited> | undefined; + try { + const port = await getFreePort(); + machineNameDelay.release(); + server = await startGatewayServer(port, { + auth: { mode: "none" }, + }); + + client = await connectGatewayClient({ + url: `ws://127.0.0.1:${port}`, + timeoutMs: 5_000, + timeoutMessage: "expected websocket connect to succeed immediately after startup", + }); + + expect(client).toBeDefined(); + } finally { + if (client) { + await disconnectGatewayClient(client); + } + if (server) { + await server.close(); + } + if (previousMinimal === undefined) { + delete process.env.OPENCLAW_TEST_MINIMAL_GATEWAY; + } else { + process.env.OPENCLAW_TEST_MINIMAL_GATEWAY = previousMinimal; + } + } + }); +});