diff --git a/CHANGELOG.md b/CHANGELOG.md index c2c72156307..44912314c32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai - Status: show configured fallback models in `/status` and shared session status cards so per-agent fallback configuration is visible before a live failover happens. (#33111) Thanks @AnCoSONG. - Fireworks/FirePass: disable Kimi K2.5 Turbo reasoning output by forcing thinking off on the FirePass path and hardening the provider wrapper so hidden reasoning no longer leaks into visible replies. (#63607) Thanks @frankekn. - Sessions/model selection: preserve catalog-backed session model labels and keep already-qualified session model refs stable when catalog metadata is unavailable, so Control UI model selection survives reloads without bogus provider-prefixed values. (#61382) Thanks @Mule-ME. +- Gateway/startup: keep WebSocket RPC available while channels and plugin sidecars start, hold `chat.history` unavailable until startup sidecars finish so synchronous history reads cannot stall startup (reported in #63450), refresh advertised gateway methods after deferred plugin reloads, and enforce the pre-auth WebSocket upgrade budget before the no-handler 503 path so upgrade floods cannot bypass connection limits during that window. (#63480) Thanks @neeravmakwana. ## 2026.4.9 diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index 9ab47ccb66f..36e0316c611 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -265,6 +265,20 @@ function writeUpgradeAuthFailure( socket.write("HTTP/1.1 401 Unauthorized\r\nConnection: close\r\n\r\n"); } +function writeUpgradeServiceUnavailable( + socket: { write: (chunk: string) => void }, + responseBody: string, +) { + socket.write( + "HTTP/1.1 503 Service Unavailable\r\n" + + "Connection: close\r\n" + + "Content-Type: text/plain; charset=utf-8\r\n" + + `Content-Length: ${Buffer.byteLength(responseBody, "utf8")}\r\n` + + "\r\n" + + responseBody, + ); +} + export type HooksRequestHandler = (req: IncomingMessage, res: ServerResponse) => Promise; type GatewayHttpRequestStage = { @@ -1050,29 +1064,15 @@ export function attachGatewayUpgradeHandler(opts: { } } const preauthBudgetKey = resolveRequestClientIp(req, trustedProxies, allowRealIpFallback); - if (wss.listenerCount("connection") === 0) { - const responseBody = "Gateway websocket handlers unavailable"; - socket.write( - "HTTP/1.1 503 Service Unavailable\r\n" + - "Connection: close\r\n" + - "Content-Type: text/plain; charset=utf-8\r\n" + - `Content-Length: ${Buffer.byteLength(responseBody, "utf8")}\r\n` + - "\r\n" + - responseBody, - ); + // Keep startup upgrades inside the pre-auth budget until WS handlers attach. + if (!preauthConnectionBudget.acquire(preauthBudgetKey)) { + writeUpgradeServiceUnavailable(socket, "Too many unauthenticated sockets"); socket.destroy(); return; } - if (!preauthConnectionBudget.acquire(preauthBudgetKey)) { - const responseBody = "Too many unauthenticated sockets"; - socket.write( - "HTTP/1.1 503 Service Unavailable\r\n" + - "Connection: close\r\n" + - "Content-Type: text/plain; charset=utf-8\r\n" + - `Content-Length: ${Buffer.byteLength(responseBody, "utf8")}\r\n` + - "\r\n" + - responseBody, - ); + if (wss.listenerCount("connection") === 0) { + preauthConnectionBudget.release(preauthBudgetKey); + writeUpgradeServiceUnavailable(socket, "Gateway websocket handlers unavailable"); socket.destroy(); return; } diff --git a/src/gateway/server-methods.control-plane-rate-limit.test.ts b/src/gateway/server-methods.control-plane-rate-limit.test.ts index 2b0247b04dd..f9611c60093 100644 --- a/src/gateway/server-methods.control-plane-rate-limit.test.ts +++ b/src/gateway/server-methods.control-plane-rate-limit.test.ts @@ -131,6 +131,31 @@ describe("gateway control-plane write rate limit", () => { expect(handlerCalls).toHaveBeenCalledTimes(4); }); + it("blocks startup-gated methods before dispatch", async () => { + const handlerCalls = vi.fn(); + const handler: GatewayRequestHandler = (opts) => { + handlerCalls(opts); + opts.respond(true, undefined, undefined); + }; + const context = { + ...buildContext(), + unavailableGatewayMethods: new Set(["chat.history"]), + } as Parameters[0]["context"]; + const client = buildClient(); + + const blocked = await runRequest({ method: "chat.history", context, client, handler }); + + expect(handlerCalls).not.toHaveBeenCalled(); + expect(blocked).toHaveBeenCalledWith( + false, + undefined, + expect.objectContaining({ + code: "UNAVAILABLE", + retryable: true, + }), + ); + }); + it("uses connId fallback when both device and client IP are unknown", () => { const key = resolveControlPlaneRateLimitKey({ connect: buildConnect(), diff --git a/src/gateway/server-methods.ts b/src/gateway/server-methods.ts index 295b190ab0a..93ce9481c29 100644 --- a/src/gateway/server-methods.ts +++ b/src/gateway/server-methods.ts @@ -106,6 +106,17 @@ export async function handleGatewayRequest( respond(false, undefined, authError); return; } + if (context.unavailableGatewayMethods?.has(req.method)) { + respond( + false, + undefined, + errorShape(ErrorCodes.UNAVAILABLE, `${req.method} unavailable during gateway startup`, { + retryable: true, + details: { method: req.method }, + }), + ); + return; + } if (CONTROL_PLANE_WRITE_METHODS.has(req.method)) { const budget = consumeControlPlaneWriteBudget({ client }); if (!budget.allowed) { diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index a7dc1f914f6..88d124a224c 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -105,6 +105,7 @@ export type GatewayRequestContext = { prompter: import("../../wizard/prompts.js").WizardPrompter, ) => Promise; broadcastVoiceWakeChanged: (triggers: string[]) => void; + unavailableGatewayMethods?: ReadonlySet; }; export type GatewayRequestOptions = { diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 4f7e2d1b394..dafc0b44264 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -641,8 +641,14 @@ export async function startGatewayServer( const channelRuntimeEnvs = Object.fromEntries( Object.entries(channelLogs).map(([id, logger]) => [id, runtimeForLogger(logger)]), ) as unknown as Record; - const channelMethods = listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []); - const gatewayMethods = Array.from(new Set([...baseGatewayMethods, ...channelMethods])); + const listActiveGatewayMethods = (nextBaseGatewayMethods: string[]) => + Array.from( + new Set([ + ...nextBaseGatewayMethods, + ...listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []), + ]), + ); + let gatewayMethods = listActiveGatewayMethods(baseGatewayMethods); let pluginServices: PluginServicesHandle | null = null; const runtimeConfig = await resolveGatewayRuntimeConfig({ cfg: cfgAtStart, @@ -1337,6 +1343,9 @@ export async function startGatewayServer( const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port; + const unavailableGatewayMethods = new Set( + minimalTestGateway ? [] : ["chat.history"], + ); const gatewayRequestContext: import("./server-methods/types.js").GatewayRequestContext = { deps, cron, @@ -1433,13 +1442,26 @@ export async function startGatewayServer( markChannelLoggedOut, wizardRunner, broadcastVoiceWakeChanged, + unavailableGatewayMethods, }; - // Register a lazy fallback for plugin subagent dispatch in non-WS paths - // (Telegram polling, WhatsApp, etc.) so later runtime swaps can expose the - // current gateway context without relying on a startup snapshot. setFallbackGatewayContextResolver(() => gatewayRequestContext); + if (!minimalTestGateway) { + if (deferredConfiguredChannelPluginIds.length > 0) { + ({ pluginRegistry, gatewayMethods: baseGatewayMethods } = reloadDeferredGatewayPlugins({ + cfg: gatewayPluginConfigAtStart, + workspaceDir: defaultWorkspaceDir, + log, + coreGatewayHandlers, + baseMethods, + pluginIds: startupPluginIds, + logDiagnostics: false, + })); + gatewayMethods = listActiveGatewayMethods(baseGatewayMethods); + } + } + attachGatewayWsHandlers({ wss, clients, @@ -1467,6 +1489,21 @@ export async function startGatewayServer( broadcast, context: gatewayRequestContext, }); + + if (!minimalTestGateway) { + log.info("starting channels and sidecars..."); + ({ pluginServices } = await startGatewaySidecars({ + cfg: gatewayPluginConfigAtStart, + pluginRegistry, + defaultWorkspaceDir, + deps, + startChannels, + log, + logHooks, + logChannels, + })); + unavailableGatewayMethods.delete("chat.history"); + } logGatewayStartup({ cfg: cfgAtStart, bindHost, @@ -1499,31 +1536,6 @@ export async function startGatewayServer( logTailscale, }); - if (!minimalTestGateway) { - if (deferredConfiguredChannelPluginIds.length > 0) { - ({ pluginRegistry } = reloadDeferredGatewayPlugins({ - cfg: gatewayPluginConfigAtStart, - workspaceDir: defaultWorkspaceDir, - log, - coreGatewayHandlers, - baseMethods, - pluginIds: startupPluginIds, - logDiagnostics: false, - })); - } - log.info("starting channels and sidecars..."); - ({ pluginServices } = await startGatewaySidecars({ - cfg: gatewayPluginConfigAtStart, - pluginRegistry, - defaultWorkspaceDir, - deps, - startChannels, - log, - logHooks, - logChannels, - })); - } - // Run gateway_start plugin hook (fire-and-forget) if (!minimalTestGateway) { const hookRunner = getGlobalHookRunner(); diff --git a/src/gateway/server.preauth-hardening.test.ts b/src/gateway/server.preauth-hardening.test.ts index e52efde44fa..7cf5872d534 100644 --- a/src/gateway/server.preauth-hardening.test.ts +++ b/src/gateway/server.preauth-hardening.test.ts @@ -25,7 +25,7 @@ afterEach(async () => { }); describe("gateway pre-auth hardening", () => { - it("rejects upgrades before websocket handlers attach without consuming pre-auth budget", async () => { + it("rejects upgrades before websocket handlers attach (pre-auth budget enforced, then released)", async () => { const clients = new Set(); const resolvedAuth: ResolvedGatewayAuth = { mode: "none", allowTailscale: false }; const httpServer = createGatewayHttpServer({