From 72072d8b2eba1ec1a2c7d43aae70d6b01932daaa Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 3 May 2026 14:54:36 +0100 Subject: [PATCH] perf(gateway): lazy load ws and aux handlers --- src/gateway/server-aux-handlers.ts | 312 +++++++++++------- src/gateway/server-import-boundary.test.ts | 8 + src/gateway/server.impl.ts | 3 +- src/gateway/server/ws-connection.test.ts | 13 +- src/gateway/server/ws-connection.ts | 47 ++- .../server/ws-connection/message-handler.ts | 6 +- 6 files changed, 251 insertions(+), 138 deletions(-) diff --git a/src/gateway/server-aux-handlers.ts b/src/gateway/server-aux-handlers.ts index fba7148d764..05145478bde 100644 --- a/src/gateway/server-aux-handlers.ts +++ b/src/gateway/server-aux-handlers.ts @@ -18,9 +18,7 @@ import { } from "./config-reload-plan.js"; import { createExecApprovalIosPushDelivery } from "./exec-approval-ios-push.js"; import { ExecApprovalManager } from "./exec-approval-manager.js"; -import { createExecApprovalHandlers } from "./server-methods/exec-approval.js"; -import { createPluginApprovalHandlers } from "./server-methods/plugin-approval.js"; -import { createSecretsHandlers } from "./server-methods/secrets.js"; +import type { GatewayRequestHandler, GatewayRequestHandlers } from "./server-methods/types.js"; import { disconnectStaleSharedGatewayAuthClients, setCurrentSharedGatewaySessionGeneration, @@ -39,6 +37,20 @@ type ReloadSecretsResult = { warningCount: number; }; +function createLazyHandler( + method: string, + loadHandlers: () => Promise, +): GatewayRequestHandler { + return async (opts) => { + const handlers = await loadHandlers(); + const handler = handlers[method]; + if (!handler) { + throw new Error(`lazy gateway handler not found: ${method}`); + } + await handler(opts); + }; +} + export function createGatewayAuxHandlers(params: { log: GatewayAuxHandlerLogger; activateRuntimeSecrets: ActivateRuntimeSecrets; @@ -53,15 +65,25 @@ export function createGatewayAuxHandlers(params: { const execApprovalManager = new ExecApprovalManager(); const execApprovalForwarder = createExecApprovalForwarder(); const execApprovalIosPushDelivery = createExecApprovalIosPushDelivery({ log: params.log }); - const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, { - forwarder: execApprovalForwarder, - iosPushDelivery: execApprovalIosPushDelivery, - }); + let execApprovalHandlersPromise: Promise | null = null; + const loadExecApprovalHandlers = () => + (execApprovalHandlersPromise ??= import("./server-methods/exec-approval.js").then( + ({ createExecApprovalHandlers }) => + createExecApprovalHandlers(execApprovalManager, { + forwarder: execApprovalForwarder, + iosPushDelivery: execApprovalIosPushDelivery, + }), + )); const buildReloadPlan = params.buildReloadPlan ?? buildGatewayReloadPlan; const pluginApprovalManager = new ExecApprovalManager(); - const pluginApprovalHandlers = createPluginApprovalHandlers(pluginApprovalManager, { - forwarder: execApprovalForwarder, - }); + let pluginApprovalHandlersPromise: Promise | null = null; + const loadPluginApprovalHandlers = () => + (pluginApprovalHandlersPromise ??= import("./server-methods/plugin-approval.js").then( + ({ createPluginApprovalHandlers }) => + createPluginApprovalHandlers(pluginApprovalManager, { + forwarder: execApprovalForwarder, + }), + )); // Serialize the entire `secrets.reload` path (activation + channel restart) // so concurrent callers cannot overlap the stop/start loop and so the // "before" snapshot used for the reload-plan diff is always the snapshot @@ -83,132 +105,168 @@ export function createGatewayAuxHandlers(params: { reloadInFlight = run; return run; }; - const secretsHandlers = createSecretsHandlers({ - reloadSecrets: () => - runExclusiveReload(async () => { - const previousSnapshot = getActiveSecretsRuntimeSnapshot(); - if (!previousSnapshot) { - throw new Error("Secrets runtime snapshot is not active."); - } - // Snapshot both `current` and `required` because - // `setCurrentSharedGatewaySessionGeneration` can clear `required` as - // a side effect of activating a new generation. Restoring only - // `current` on rollback would leave `required` cleared and weaken - // shared-gateway auth-generation enforcement after a failed reload. - const previousSharedGatewaySessionGeneration = - params.sharedGatewaySessionGenerationState.current; - const previousSharedGatewaySessionGenerationRequired = - params.sharedGatewaySessionGenerationState.required; - let nextSharedGatewaySessionGeneration = previousSharedGatewaySessionGeneration; - let sharedGatewaySessionGenerationChanged = false; - const stoppedChannels: ChannelKind[] = []; - const restartedChannels = new Set(); - try { - const prepared = await params.activateRuntimeSecrets(previousSnapshot.sourceConfig, { - reason: "reload", - activate: true, - }); - nextSharedGatewaySessionGeneration = - params.resolveSharedGatewaySessionGenerationForConfig(prepared.config); - const plan = buildReloadPlan(diffConfigPaths(previousSnapshot.config, prepared.config)); - setCurrentSharedGatewaySessionGeneration( - params.sharedGatewaySessionGenerationState, - nextSharedGatewaySessionGeneration, - ); - sharedGatewaySessionGenerationChanged = - previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration; - if (sharedGatewaySessionGenerationChanged) { - disconnectStaleSharedGatewayAuthClients({ - clients: params.clients, - expectedGeneration: nextSharedGatewaySessionGeneration, - }); - } - if (plan.restartChannels.size > 0) { - const restartChannels = [...plan.restartChannels]; - if ( - isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) || - isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS) - ) { - throw new Error( - `secrets.reload requires restarting channels: ${restartChannels.join(", ")}`, - ); - } - const restartFailures: ChannelKind[] = []; - for (const channel of restartChannels) { - params.logChannels.info(`restarting ${channel} channel after secrets reload`); - // Track for rollback before awaiting stopChannel: if stopChannel - // throws after partially stopping the channel (for example, a - // plugin hook rejects after the runtime already closed the - // socket), we still need the outer catch to attempt restart so - // the channel is not left down after a failed reload. - stoppedChannels.push(channel); + let secretsHandlersPromise: Promise | null = null; + const loadSecretsHandlers = () => + (secretsHandlersPromise ??= import("./server-methods/secrets.js").then( + ({ createSecretsHandlers }) => + createSecretsHandlers({ + reloadSecrets: () => + runExclusiveReload(async () => { + const previousSnapshot = getActiveSecretsRuntimeSnapshot(); + if (!previousSnapshot) { + throw new Error("Secrets runtime snapshot is not active."); + } + // Snapshot both `current` and `required` because + // `setCurrentSharedGatewaySessionGeneration` can clear `required` as + // a side effect of activating a new generation. Restoring only + // `current` on rollback would leave `required` cleared and weaken + // shared-gateway auth-generation enforcement after a failed reload. + const previousSharedGatewaySessionGeneration = + params.sharedGatewaySessionGenerationState.current; + const previousSharedGatewaySessionGenerationRequired = + params.sharedGatewaySessionGenerationState.required; + let nextSharedGatewaySessionGeneration = previousSharedGatewaySessionGeneration; + let sharedGatewaySessionGenerationChanged = false; + const stoppedChannels: ChannelKind[] = []; + const restartedChannels = new Set(); try { - await params.stopChannel(channel); - await params.startChannel(channel); - restartedChannels.add(channel); - } catch { - params.logChannels.info( - `failed to restart ${channel} channel after secrets reload`, + const prepared = await params.activateRuntimeSecrets( + previousSnapshot.sourceConfig, + { + reason: "reload", + activate: true, + }, ); - restartFailures.push(channel); + nextSharedGatewaySessionGeneration = + params.resolveSharedGatewaySessionGenerationForConfig(prepared.config); + const plan = buildReloadPlan( + diffConfigPaths(previousSnapshot.config, prepared.config), + ); + setCurrentSharedGatewaySessionGeneration( + params.sharedGatewaySessionGenerationState, + nextSharedGatewaySessionGeneration, + ); + sharedGatewaySessionGenerationChanged = + previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration; + if (sharedGatewaySessionGenerationChanged) { + disconnectStaleSharedGatewayAuthClients({ + clients: params.clients, + expectedGeneration: nextSharedGatewaySessionGeneration, + }); + } + if (plan.restartChannels.size > 0) { + const restartChannels = [...plan.restartChannels]; + if ( + isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) || + isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS) + ) { + throw new Error( + `secrets.reload requires restarting channels: ${restartChannels.join(", ")}`, + ); + } + const restartFailures: ChannelKind[] = []; + for (const channel of restartChannels) { + params.logChannels.info(`restarting ${channel} channel after secrets reload`); + // Track for rollback before awaiting stopChannel: if stopChannel + // throws after partially stopping the channel (for example, a + // plugin hook rejects after the runtime already closed the + // socket), we still need the outer catch to attempt restart so + // the channel is not left down after a failed reload. + stoppedChannels.push(channel); + try { + await params.stopChannel(channel); + await params.startChannel(channel); + restartedChannels.add(channel); + } catch { + params.logChannels.info( + `failed to restart ${channel} channel after secrets reload`, + ); + restartFailures.push(channel); + } + } + if (restartFailures.length > 0) { + throw new Error( + `failed to restart channels after secrets reload: ${restartFailures.join(", ")}`, + ); + } + } + return { warningCount: prepared.warnings.length }; + } catch (err) { + activateSecretsRuntimeSnapshot(previousSnapshot); + params.sharedGatewaySessionGenerationState.current = + previousSharedGatewaySessionGeneration; + params.sharedGatewaySessionGenerationState.required = + previousSharedGatewaySessionGenerationRequired; + if (sharedGatewaySessionGenerationChanged) { + disconnectStaleSharedGatewayAuthClients({ + clients: params.clients, + expectedGeneration: previousSharedGatewaySessionGeneration, + }); + } + for (const channel of stoppedChannels) { + params.logChannels.info( + `rolling back ${channel} channel after secrets reload failure`, + ); + try { + if (restartedChannels.has(channel)) { + await params.stopChannel(channel); + } + await params.startChannel(channel); + } catch { + params.logChannels.info( + `failed to roll back ${channel} channel after secrets reload`, + ); + } + } + throw err; } + }), + log: params.log, + resolveSecrets: async ({ commandName, targetIds }) => { + const { assignments, diagnostics, inactiveRefPaths } = + resolveCommandSecretsFromActiveRuntimeSnapshot({ + commandName, + targetIds: new Set(targetIds), + }); + if (assignments.length === 0) { + return { + assignments: [] as CommandSecretAssignment[], + diagnostics, + inactiveRefPaths, + }; } - if (restartFailures.length > 0) { - throw new Error( - `failed to restart channels after secrets reload: ${restartFailures.join(", ")}`, - ); - } - } - return { warningCount: prepared.warnings.length }; - } catch (err) { - activateSecretsRuntimeSnapshot(previousSnapshot); - params.sharedGatewaySessionGenerationState.current = - previousSharedGatewaySessionGeneration; - params.sharedGatewaySessionGenerationState.required = - previousSharedGatewaySessionGenerationRequired; - if (sharedGatewaySessionGenerationChanged) { - disconnectStaleSharedGatewayAuthClients({ - clients: params.clients, - expectedGeneration: previousSharedGatewaySessionGeneration, - }); - } - for (const channel of stoppedChannels) { - params.logChannels.info(`rolling back ${channel} channel after secrets reload failure`); - try { - if (restartedChannels.has(channel)) { - await params.stopChannel(channel); - } - await params.startChannel(channel); - } catch { - params.logChannels.info( - `failed to roll back ${channel} channel after secrets reload`, - ); - } - } - throw err; - } - }), - log: params.log, - resolveSecrets: async ({ commandName, targetIds }) => { - const { assignments, diagnostics, inactiveRefPaths } = - resolveCommandSecretsFromActiveRuntimeSnapshot({ - commandName, - targetIds: new Set(targetIds), - }); - if (assignments.length === 0) { - return { assignments: [] as CommandSecretAssignment[], diagnostics, inactiveRefPaths }; - } - return { assignments, diagnostics, inactiveRefPaths }; - }, - }); + return { assignments, diagnostics, inactiveRefPaths }; + }, + }), + )); return { execApprovalManager, pluginApprovalManager, extraHandlers: { - ...execApprovalHandlers, - ...pluginApprovalHandlers, - ...secretsHandlers, + "exec.approval.get": createLazyHandler("exec.approval.get", loadExecApprovalHandlers), + "exec.approval.list": createLazyHandler("exec.approval.list", loadExecApprovalHandlers), + "exec.approval.request": createLazyHandler("exec.approval.request", loadExecApprovalHandlers), + "exec.approval.waitDecision": createLazyHandler( + "exec.approval.waitDecision", + loadExecApprovalHandlers, + ), + "exec.approval.resolve": createLazyHandler("exec.approval.resolve", loadExecApprovalHandlers), + "plugin.approval.list": createLazyHandler("plugin.approval.list", loadPluginApprovalHandlers), + "plugin.approval.request": createLazyHandler( + "plugin.approval.request", + loadPluginApprovalHandlers, + ), + "plugin.approval.waitDecision": createLazyHandler( + "plugin.approval.waitDecision", + loadPluginApprovalHandlers, + ), + "plugin.approval.resolve": createLazyHandler( + "plugin.approval.resolve", + loadPluginApprovalHandlers, + ), + "secrets.reload": createLazyHandler("secrets.reload", loadSecretsHandlers), + "secrets.resolve": createLazyHandler("secrets.resolve", loadSecretsHandlers), }, }; } diff --git a/src/gateway/server-import-boundary.test.ts b/src/gateway/server-import-boundary.test.ts index 5761aa2026d..0f986c03950 100644 --- a/src/gateway/server-import-boundary.test.ts +++ b/src/gateway/server-import-boundary.test.ts @@ -30,6 +30,14 @@ describe("gateway startup import boundaries", () => { expect(serverImpl).not.toContain('from "../tasks/task-registry.js"'); expect(serverImpl).not.toContain('from "../tasks/task-registry.maintenance.js"'); expect(serverImpl).toContain('import("../tasks/task-registry.maintenance.js")'); + const wsConnection = readSource("src/gateway/server/ws-connection.ts"); + expect(wsConnection).not.toMatch( + /import\s+\{[^}]*attachGatewayWsMessageHandler[^}]*\}\s+from "\.\/ws-connection\/message-handler\.js"/s, + ); + expect(wsConnection).toContain('import("./ws-connection/message-handler.js")'); + expect(readSource("src/gateway/server-aux-handlers.ts")).not.toMatch( + /import\s+\{[^}]*create(?:Exec|Plugin|Secrets)[^}]*\}\s+from "\.\/server-methods\//s, + ); expect(validation).not.toContain("legacy-secretref-env-marker"); expect(validation).not.toContain("commands/doctor"); }); diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 6f5df6476b7..52e4de73200 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -67,6 +67,7 @@ import { createLazyGatewayCronState } from "./server-cron-lazy.js"; import { applyGatewayLaneConcurrency } from "./server-lanes.js"; import { createGatewayServerLiveState, type GatewayServerLiveState } from "./server-live-state.js"; import { GATEWAY_EVENTS } from "./server-methods-list.js"; +import type { GatewayRequestHandlers } from "./server-methods/types.js"; import { loadGatewayModelCatalog } from "./server-model-catalog.js"; import { bootstrapGatewayNetworkRuntime } from "./server-network-runtime.js"; import { createGatewayNodeSessionRuntime } from "./server-node-session-runtime.js"; @@ -1016,7 +1017,7 @@ export async function startGatewayServer( stopChannel, logChannels, }); - const attachedGatewayExtraHandlers = { + const attachedGatewayExtraHandlers: GatewayRequestHandlers = { ...pluginRegistry.gatewayHandlers, ...extraHandlers, }; diff --git a/src/gateway/server/ws-connection.test.ts b/src/gateway/server/ws-connection.test.ts index c21593cd457..3e15cd44e3d 100644 --- a/src/gateway/server/ws-connection.test.ts +++ b/src/gateway/server/ws-connection.test.ts @@ -31,6 +31,10 @@ function createResolvedAuth(token: string): ResolvedGatewayAuth { }; } +async function waitForLazyMessageHandler() { + await vi.dynamicImportSettled(); +} + describe("attachGatewayWsConnectionHandler", () => { beforeEach(() => { attachGatewayWsMessageHandlerMock.mockReset(); @@ -40,7 +44,7 @@ describe("attachGatewayWsConnectionHandler", () => { vi.useRealTimers(); }); - it("threads current auth getters into the handshake handler instead of a stale snapshot", () => { + it("threads current auth getters into the handshake handler instead of a stale snapshot", async () => { const listeners = new Map void>(); const wss = { on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { @@ -91,6 +95,7 @@ describe("attachGatewayWsConnectionHandler", () => { const onConnection = listeners.get("connection"); expect(onConnection).toBeTypeOf("function"); onConnection?.(socket, upgradeReq); + await waitForLazyMessageHandler(); expect(attachGatewayWsMessageHandlerMock).toHaveBeenCalledTimes(1); const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as { @@ -106,7 +111,7 @@ describe("attachGatewayWsConnectionHandler", () => { ); }); - it("rejects late client registration after a pre-connect socket close", () => { + it("rejects late client registration after a pre-connect socket close", async () => { const listeners = new Map void>(); const wss = { on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { @@ -156,6 +161,7 @@ describe("attachGatewayWsConnectionHandler", () => { const onConnection = listeners.get("connection"); expect(onConnection).toBeTypeOf("function"); onConnection?.(socket, upgradeReq); + await waitForLazyMessageHandler(); const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as { setClient: (client: unknown) => boolean; @@ -173,7 +179,7 @@ describe("attachGatewayWsConnectionHandler", () => { expect(clients.size).toBe(0); }); - it("sends protocol pings until the connection closes", () => { + it("sends protocol pings until the connection closes", async () => { vi.useFakeTimers(); const listeners = new Map void>(); const wss = { @@ -224,6 +230,7 @@ describe("attachGatewayWsConnectionHandler", () => { const onConnection = listeners.get("connection"); expect(onConnection).toBeTypeOf("function"); onConnection?.(socket, upgradeReq); + await waitForLazyMessageHandler(); const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as { setClient: (client: unknown) => boolean; diff --git a/src/gateway/server/ws-connection.ts b/src/gateway/server/ws-connection.ts index 95a0bf9ffa3..22d9ebae3be 100644 --- a/src/gateway/server/ws-connection.ts +++ b/src/gateway/server/ws-connection.ts @@ -1,6 +1,6 @@ import { randomUUID } from "node:crypto"; import type { Socket } from "node:net"; -import type { WebSocket, WebSocketServer } from "ws"; +import type { RawData, WebSocket, WebSocketServer } from "ws"; import { resolveCanvasHostUrl } from "../../infra/canvas-host-url.js"; import { removeRemoteNodeInfo } from "../../infra/skills-remote.js"; import { upsertPresence } from "../../infra/system-presence.js"; @@ -21,9 +21,9 @@ import { logWs } from "../ws-log.js"; import { getHealthVersion, incrementPresenceVersion } from "./health-state.js"; import type { PreauthConnectionBudget } from "./preauth-connection-budget.js"; import { broadcastPresenceSnapshot } from "./presence-events.js"; -import { - attachGatewayWsMessageHandler, - type WsOriginCheckMetrics, +import type { + GatewayWsMessageHandlerParams, + WsOriginCheckMetrics, } from "./ws-connection/message-handler.js"; import { resolveSharedGatewaySessionGeneration } from "./ws-shared-generation.js"; import type { GatewayWsClient } from "./ws-types.js"; @@ -32,6 +32,7 @@ type SubsystemLogger = ReturnType; const LOG_HEADER_MAX_LEN = 300; const LOG_HEADER_FORMAT_REGEX = /\p{Cf}/gu; +const MAX_QUEUED_MESSAGE_HANDLER_FRAMES = 16; function replaceControlChars(value: string): string { let cleaned = ""; @@ -154,6 +155,42 @@ export type AttachGatewayWsConnectionHandlerParams = GatewayWsSharedHandlerParam buildRequestContext: () => GatewayRequestContext; }; +function attachGatewayWsMessageHandlerOnDemand(params: GatewayWsMessageHandlerParams): void { + const queued: RawData[] = []; + const queueMessage = (data: RawData) => { + if (queued.length >= MAX_QUEUED_MESSAGE_HANDLER_FRAMES) { + params.setCloseCause("message-handler-loading-overflow", { + queuedFrames: queued.length, + }); + params.close(1008, "gateway message handler loading"); + return; + } + queued.push(data); + }; + params.socket.on("message", queueMessage); + void import("./ws-connection/message-handler.js") + .then(({ attachGatewayWsMessageHandler }) => { + params.socket.off("message", queueMessage); + if (params.isClosed()) { + return; + } + attachGatewayWsMessageHandler(params); + for (const data of queued) { + params.socket.emit("message", data); + } + }) + .catch((error: unknown) => { + params.socket.off("message", queueMessage); + params.setCloseCause("message-handler-load-failed", { + error: formatError(error), + }); + params.logWsControl.warn( + `failed to load ws message handler conn=${params.connId}: ${formatError(error)}`, + ); + params.close(1011, "gateway message handler unavailable"); + }); +} + export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnectionHandlerParams) { const { wss, @@ -390,7 +427,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti } }, handshakeTimeoutMs); - attachGatewayWsMessageHandler({ + attachGatewayWsMessageHandlerOnDemand({ socket, upgradeReq, connId, diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index 88ed1a5f849..9cd048aa56d 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -173,7 +173,7 @@ function resolvePinnedClientMetadata(params: { }; } -export function attachGatewayWsMessageHandler(params: { +export type GatewayWsMessageHandlerParams = { socket: WebSocket; upgradeReq: IncomingMessage; connId: string; @@ -214,7 +214,9 @@ export function attachGatewayWsMessageHandler(params: { logGateway: SubsystemLogger; logHealth: SubsystemLogger; logWsControl: SubsystemLogger; -}) { +}; + +export function attachGatewayWsMessageHandler(params: GatewayWsMessageHandlerParams) { const { socket, upgradeReq,