fix: keep gateway RPC up during startup (#63480)

Thanks @neeravmakwana.
This commit is contained in:
Neerav Makwana
2026-04-09 07:39:18 -04:00
committed by GitHub
parent 12544e24d7
commit 5577e2d441
7 changed files with 101 additions and 51 deletions

View File

@@ -23,6 +23,7 @@ Docs: https://docs.openclaw.ai
- Status: show configured fallback models in `/status` and shared session status cards so per-agent fallback configuration is visible before a live failover happens. (#33111) Thanks @AnCoSONG.
- Fireworks/FirePass: disable Kimi K2.5 Turbo reasoning output by forcing thinking off on the FirePass path and hardening the provider wrapper so hidden reasoning no longer leaks into visible replies. (#63607) Thanks @frankekn.
- Sessions/model selection: preserve catalog-backed session model labels and keep already-qualified session model refs stable when catalog metadata is unavailable, so Control UI model selection survives reloads without bogus provider-prefixed values. (#61382) Thanks @Mule-ME.
- Gateway/startup: keep WebSocket RPC available while channels and plugin sidecars start, hold `chat.history` unavailable until startup sidecars finish so synchronous history reads cannot stall startup (reported in #63450), refresh advertised gateway methods after deferred plugin reloads, and enforce the pre-auth WebSocket upgrade budget before the no-handler 503 path so upgrade floods cannot bypass connection limits during that window. (#63480) Thanks @neeravmakwana.
## 2026.4.9

View File

@@ -265,6 +265,20 @@ function writeUpgradeAuthFailure(
socket.write("HTTP/1.1 401 Unauthorized\r\nConnection: close\r\n\r\n");
}
function writeUpgradeServiceUnavailable(
socket: { write: (chunk: string) => void },
responseBody: string,
) {
socket.write(
"HTTP/1.1 503 Service Unavailable\r\n" +
"Connection: close\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
`Content-Length: ${Buffer.byteLength(responseBody, "utf8")}\r\n` +
"\r\n" +
responseBody,
);
}
export type HooksRequestHandler = (req: IncomingMessage, res: ServerResponse) => Promise<boolean>;
type GatewayHttpRequestStage = {
@@ -1050,29 +1064,15 @@ export function attachGatewayUpgradeHandler(opts: {
}
}
const preauthBudgetKey = resolveRequestClientIp(req, trustedProxies, allowRealIpFallback);
if (wss.listenerCount("connection") === 0) {
const responseBody = "Gateway websocket handlers unavailable";
socket.write(
"HTTP/1.1 503 Service Unavailable\r\n" +
"Connection: close\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
`Content-Length: ${Buffer.byteLength(responseBody, "utf8")}\r\n` +
"\r\n" +
responseBody,
);
// Keep startup upgrades inside the pre-auth budget until WS handlers attach.
if (!preauthConnectionBudget.acquire(preauthBudgetKey)) {
writeUpgradeServiceUnavailable(socket, "Too many unauthenticated sockets");
socket.destroy();
return;
}
if (!preauthConnectionBudget.acquire(preauthBudgetKey)) {
const responseBody = "Too many unauthenticated sockets";
socket.write(
"HTTP/1.1 503 Service Unavailable\r\n" +
"Connection: close\r\n" +
"Content-Type: text/plain; charset=utf-8\r\n" +
`Content-Length: ${Buffer.byteLength(responseBody, "utf8")}\r\n` +
"\r\n" +
responseBody,
);
if (wss.listenerCount("connection") === 0) {
preauthConnectionBudget.release(preauthBudgetKey);
writeUpgradeServiceUnavailable(socket, "Gateway websocket handlers unavailable");
socket.destroy();
return;
}

View File

@@ -131,6 +131,31 @@ describe("gateway control-plane write rate limit", () => {
expect(handlerCalls).toHaveBeenCalledTimes(4);
});
it("blocks startup-gated methods before dispatch", async () => {
const handlerCalls = vi.fn();
const handler: GatewayRequestHandler = (opts) => {
handlerCalls(opts);
opts.respond(true, undefined, undefined);
};
const context = {
...buildContext(),
unavailableGatewayMethods: new Set(["chat.history"]),
} as Parameters<typeof handleGatewayRequest>[0]["context"];
const client = buildClient();
const blocked = await runRequest({ method: "chat.history", context, client, handler });
expect(handlerCalls).not.toHaveBeenCalled();
expect(blocked).toHaveBeenCalledWith(
false,
undefined,
expect.objectContaining({
code: "UNAVAILABLE",
retryable: true,
}),
);
});
it("uses connId fallback when both device and client IP are unknown", () => {
const key = resolveControlPlaneRateLimitKey({
connect: buildConnect(),

View File

@@ -106,6 +106,17 @@ export async function handleGatewayRequest(
respond(false, undefined, authError);
return;
}
if (context.unavailableGatewayMethods?.has(req.method)) {
respond(
false,
undefined,
errorShape(ErrorCodes.UNAVAILABLE, `${req.method} unavailable during gateway startup`, {
retryable: true,
details: { method: req.method },
}),
);
return;
}
if (CONTROL_PLANE_WRITE_METHODS.has(req.method)) {
const budget = consumeControlPlaneWriteBudget({ client });
if (!budget.allowed) {

View File

@@ -105,6 +105,7 @@ export type GatewayRequestContext = {
prompter: import("../../wizard/prompts.js").WizardPrompter,
) => Promise<void>;
broadcastVoiceWakeChanged: (triggers: string[]) => void;
unavailableGatewayMethods?: ReadonlySet<string>;
};
export type GatewayRequestOptions = {

View File

@@ -641,8 +641,14 @@ export async function startGatewayServer(
const channelRuntimeEnvs = Object.fromEntries(
Object.entries(channelLogs).map(([id, logger]) => [id, runtimeForLogger(logger)]),
) as unknown as Record<ChannelId, RuntimeEnv>;
const channelMethods = listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []);
const gatewayMethods = Array.from(new Set([...baseGatewayMethods, ...channelMethods]));
const listActiveGatewayMethods = (nextBaseGatewayMethods: string[]) =>
Array.from(
new Set([
...nextBaseGatewayMethods,
...listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []),
]),
);
let gatewayMethods = listActiveGatewayMethods(baseGatewayMethods);
let pluginServices: PluginServicesHandle | null = null;
const runtimeConfig = await resolveGatewayRuntimeConfig({
cfg: cfgAtStart,
@@ -1337,6 +1343,9 @@ export async function startGatewayServer(
const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port;
const unavailableGatewayMethods = new Set<string>(
minimalTestGateway ? [] : ["chat.history"],
);
const gatewayRequestContext: import("./server-methods/types.js").GatewayRequestContext = {
deps,
cron,
@@ -1433,13 +1442,26 @@ export async function startGatewayServer(
markChannelLoggedOut,
wizardRunner,
broadcastVoiceWakeChanged,
unavailableGatewayMethods,
};
// Register a lazy fallback for plugin subagent dispatch in non-WS paths
// (Telegram polling, WhatsApp, etc.) so later runtime swaps can expose the
// current gateway context without relying on a startup snapshot.
setFallbackGatewayContextResolver(() => gatewayRequestContext);
if (!minimalTestGateway) {
if (deferredConfiguredChannelPluginIds.length > 0) {
({ pluginRegistry, gatewayMethods: baseGatewayMethods } = reloadDeferredGatewayPlugins({
cfg: gatewayPluginConfigAtStart,
workspaceDir: defaultWorkspaceDir,
log,
coreGatewayHandlers,
baseMethods,
pluginIds: startupPluginIds,
logDiagnostics: false,
}));
gatewayMethods = listActiveGatewayMethods(baseGatewayMethods);
}
}
attachGatewayWsHandlers({
wss,
clients,
@@ -1467,6 +1489,21 @@ export async function startGatewayServer(
broadcast,
context: gatewayRequestContext,
});
if (!minimalTestGateway) {
log.info("starting channels and sidecars...");
({ pluginServices } = await startGatewaySidecars({
cfg: gatewayPluginConfigAtStart,
pluginRegistry,
defaultWorkspaceDir,
deps,
startChannels,
log,
logHooks,
logChannels,
}));
unavailableGatewayMethods.delete("chat.history");
}
logGatewayStartup({
cfg: cfgAtStart,
bindHost,
@@ -1499,31 +1536,6 @@ export async function startGatewayServer(
logTailscale,
});
if (!minimalTestGateway) {
if (deferredConfiguredChannelPluginIds.length > 0) {
({ pluginRegistry } = reloadDeferredGatewayPlugins({
cfg: gatewayPluginConfigAtStart,
workspaceDir: defaultWorkspaceDir,
log,
coreGatewayHandlers,
baseMethods,
pluginIds: startupPluginIds,
logDiagnostics: false,
}));
}
log.info("starting channels and sidecars...");
({ pluginServices } = await startGatewaySidecars({
cfg: gatewayPluginConfigAtStart,
pluginRegistry,
defaultWorkspaceDir,
deps,
startChannels,
log,
logHooks,
logChannels,
}));
}
// Run gateway_start plugin hook (fire-and-forget)
if (!minimalTestGateway) {
const hookRunner = getGlobalHookRunner();

View File

@@ -25,7 +25,7 @@ afterEach(async () => {
});
describe("gateway pre-auth hardening", () => {
it("rejects upgrades before websocket handlers attach without consuming pre-auth budget", async () => {
it("rejects upgrades before websocket handlers attach (pre-auth budget enforced, then released)", async () => {
const clients = new Set<GatewayWsClient>();
const resolvedAuth: ResolvedGatewayAuth = { mode: "none", allowTailscale: false };
const httpServer = createGatewayHttpServer({