perf(gateway): lazy load ws and aux handlers

This commit is contained in:
Peter Steinberger
2026-05-03 14:54:36 +01:00
parent bd79b25bb7
commit 72072d8b2e
6 changed files with 251 additions and 138 deletions

View File

@@ -18,9 +18,7 @@ import {
} from "./config-reload-plan.js";
import { createExecApprovalIosPushDelivery } from "./exec-approval-ios-push.js";
import { ExecApprovalManager } from "./exec-approval-manager.js";
import { createExecApprovalHandlers } from "./server-methods/exec-approval.js";
import { createPluginApprovalHandlers } from "./server-methods/plugin-approval.js";
import { createSecretsHandlers } from "./server-methods/secrets.js";
import type { GatewayRequestHandler, GatewayRequestHandlers } from "./server-methods/types.js";
import {
disconnectStaleSharedGatewayAuthClients,
setCurrentSharedGatewaySessionGeneration,
@@ -39,6 +37,20 @@ type ReloadSecretsResult = {
warningCount: number;
};
function createLazyHandler(
method: string,
loadHandlers: () => Promise<GatewayRequestHandlers>,
): GatewayRequestHandler {
return async (opts) => {
const handlers = await loadHandlers();
const handler = handlers[method];
if (!handler) {
throw new Error(`lazy gateway handler not found: ${method}`);
}
await handler(opts);
};
}
export function createGatewayAuxHandlers(params: {
log: GatewayAuxHandlerLogger;
activateRuntimeSecrets: ActivateRuntimeSecrets;
@@ -53,15 +65,25 @@ export function createGatewayAuxHandlers(params: {
const execApprovalManager = new ExecApprovalManager();
const execApprovalForwarder = createExecApprovalForwarder();
const execApprovalIosPushDelivery = createExecApprovalIosPushDelivery({ log: params.log });
const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, {
forwarder: execApprovalForwarder,
iosPushDelivery: execApprovalIosPushDelivery,
});
let execApprovalHandlersPromise: Promise<GatewayRequestHandlers> | null = null;
const loadExecApprovalHandlers = () =>
(execApprovalHandlersPromise ??= import("./server-methods/exec-approval.js").then(
({ createExecApprovalHandlers }) =>
createExecApprovalHandlers(execApprovalManager, {
forwarder: execApprovalForwarder,
iosPushDelivery: execApprovalIosPushDelivery,
}),
));
const buildReloadPlan = params.buildReloadPlan ?? buildGatewayReloadPlan;
const pluginApprovalManager = new ExecApprovalManager<PluginApprovalRequestPayload>();
const pluginApprovalHandlers = createPluginApprovalHandlers(pluginApprovalManager, {
forwarder: execApprovalForwarder,
});
let pluginApprovalHandlersPromise: Promise<GatewayRequestHandlers> | null = null;
const loadPluginApprovalHandlers = () =>
(pluginApprovalHandlersPromise ??= import("./server-methods/plugin-approval.js").then(
({ createPluginApprovalHandlers }) =>
createPluginApprovalHandlers(pluginApprovalManager, {
forwarder: execApprovalForwarder,
}),
));
// Serialize the entire `secrets.reload` path (activation + channel restart)
// so concurrent callers cannot overlap the stop/start loop and so the
// "before" snapshot used for the reload-plan diff is always the snapshot
@@ -83,132 +105,168 @@ export function createGatewayAuxHandlers(params: {
reloadInFlight = run;
return run;
};
const secretsHandlers = createSecretsHandlers({
reloadSecrets: () =>
runExclusiveReload(async () => {
const previousSnapshot = getActiveSecretsRuntimeSnapshot();
if (!previousSnapshot) {
throw new Error("Secrets runtime snapshot is not active.");
}
// Snapshot both `current` and `required` because
// `setCurrentSharedGatewaySessionGeneration` can clear `required` as
// a side effect of activating a new generation. Restoring only
// `current` on rollback would leave `required` cleared and weaken
// shared-gateway auth-generation enforcement after a failed reload.
const previousSharedGatewaySessionGeneration =
params.sharedGatewaySessionGenerationState.current;
const previousSharedGatewaySessionGenerationRequired =
params.sharedGatewaySessionGenerationState.required;
let nextSharedGatewaySessionGeneration = previousSharedGatewaySessionGeneration;
let sharedGatewaySessionGenerationChanged = false;
const stoppedChannels: ChannelKind[] = [];
const restartedChannels = new Set<ChannelKind>();
try {
const prepared = await params.activateRuntimeSecrets(previousSnapshot.sourceConfig, {
reason: "reload",
activate: true,
});
nextSharedGatewaySessionGeneration =
params.resolveSharedGatewaySessionGenerationForConfig(prepared.config);
const plan = buildReloadPlan(diffConfigPaths(previousSnapshot.config, prepared.config));
setCurrentSharedGatewaySessionGeneration(
params.sharedGatewaySessionGenerationState,
nextSharedGatewaySessionGeneration,
);
sharedGatewaySessionGenerationChanged =
previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration;
if (sharedGatewaySessionGenerationChanged) {
disconnectStaleSharedGatewayAuthClients({
clients: params.clients,
expectedGeneration: nextSharedGatewaySessionGeneration,
});
}
if (plan.restartChannels.size > 0) {
const restartChannels = [...plan.restartChannels];
if (
isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) ||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS)
) {
throw new Error(
`secrets.reload requires restarting channels: ${restartChannels.join(", ")}`,
);
}
const restartFailures: ChannelKind[] = [];
for (const channel of restartChannels) {
params.logChannels.info(`restarting ${channel} channel after secrets reload`);
// Track for rollback before awaiting stopChannel: if stopChannel
// throws after partially stopping the channel (for example, a
// plugin hook rejects after the runtime already closed the
// socket), we still need the outer catch to attempt restart so
// the channel is not left down after a failed reload.
stoppedChannels.push(channel);
let secretsHandlersPromise: Promise<GatewayRequestHandlers> | null = null;
const loadSecretsHandlers = () =>
(secretsHandlersPromise ??= import("./server-methods/secrets.js").then(
({ createSecretsHandlers }) =>
createSecretsHandlers({
reloadSecrets: () =>
runExclusiveReload(async () => {
const previousSnapshot = getActiveSecretsRuntimeSnapshot();
if (!previousSnapshot) {
throw new Error("Secrets runtime snapshot is not active.");
}
// Snapshot both `current` and `required` because
// `setCurrentSharedGatewaySessionGeneration` can clear `required` as
// a side effect of activating a new generation. Restoring only
// `current` on rollback would leave `required` cleared and weaken
// shared-gateway auth-generation enforcement after a failed reload.
const previousSharedGatewaySessionGeneration =
params.sharedGatewaySessionGenerationState.current;
const previousSharedGatewaySessionGenerationRequired =
params.sharedGatewaySessionGenerationState.required;
let nextSharedGatewaySessionGeneration = previousSharedGatewaySessionGeneration;
let sharedGatewaySessionGenerationChanged = false;
const stoppedChannels: ChannelKind[] = [];
const restartedChannels = new Set<ChannelKind>();
try {
await params.stopChannel(channel);
await params.startChannel(channel);
restartedChannels.add(channel);
} catch {
params.logChannels.info(
`failed to restart ${channel} channel after secrets reload`,
const prepared = await params.activateRuntimeSecrets(
previousSnapshot.sourceConfig,
{
reason: "reload",
activate: true,
},
);
restartFailures.push(channel);
nextSharedGatewaySessionGeneration =
params.resolveSharedGatewaySessionGenerationForConfig(prepared.config);
const plan = buildReloadPlan(
diffConfigPaths(previousSnapshot.config, prepared.config),
);
setCurrentSharedGatewaySessionGeneration(
params.sharedGatewaySessionGenerationState,
nextSharedGatewaySessionGeneration,
);
sharedGatewaySessionGenerationChanged =
previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration;
if (sharedGatewaySessionGenerationChanged) {
disconnectStaleSharedGatewayAuthClients({
clients: params.clients,
expectedGeneration: nextSharedGatewaySessionGeneration,
});
}
if (plan.restartChannels.size > 0) {
const restartChannels = [...plan.restartChannels];
if (
isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) ||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS)
) {
throw new Error(
`secrets.reload requires restarting channels: ${restartChannels.join(", ")}`,
);
}
const restartFailures: ChannelKind[] = [];
for (const channel of restartChannels) {
params.logChannels.info(`restarting ${channel} channel after secrets reload`);
// Track for rollback before awaiting stopChannel: if stopChannel
// throws after partially stopping the channel (for example, a
// plugin hook rejects after the runtime already closed the
// socket), we still need the outer catch to attempt restart so
// the channel is not left down after a failed reload.
stoppedChannels.push(channel);
try {
await params.stopChannel(channel);
await params.startChannel(channel);
restartedChannels.add(channel);
} catch {
params.logChannels.info(
`failed to restart ${channel} channel after secrets reload`,
);
restartFailures.push(channel);
}
}
if (restartFailures.length > 0) {
throw new Error(
`failed to restart channels after secrets reload: ${restartFailures.join(", ")}`,
);
}
}
return { warningCount: prepared.warnings.length };
} catch (err) {
activateSecretsRuntimeSnapshot(previousSnapshot);
params.sharedGatewaySessionGenerationState.current =
previousSharedGatewaySessionGeneration;
params.sharedGatewaySessionGenerationState.required =
previousSharedGatewaySessionGenerationRequired;
if (sharedGatewaySessionGenerationChanged) {
disconnectStaleSharedGatewayAuthClients({
clients: params.clients,
expectedGeneration: previousSharedGatewaySessionGeneration,
});
}
for (const channel of stoppedChannels) {
params.logChannels.info(
`rolling back ${channel} channel after secrets reload failure`,
);
try {
if (restartedChannels.has(channel)) {
await params.stopChannel(channel);
}
await params.startChannel(channel);
} catch {
params.logChannels.info(
`failed to roll back ${channel} channel after secrets reload`,
);
}
}
throw err;
}
}),
log: params.log,
resolveSecrets: async ({ commandName, targetIds }) => {
const { assignments, diagnostics, inactiveRefPaths } =
resolveCommandSecretsFromActiveRuntimeSnapshot({
commandName,
targetIds: new Set(targetIds),
});
if (assignments.length === 0) {
return {
assignments: [] as CommandSecretAssignment[],
diagnostics,
inactiveRefPaths,
};
}
if (restartFailures.length > 0) {
throw new Error(
`failed to restart channels after secrets reload: ${restartFailures.join(", ")}`,
);
}
}
return { warningCount: prepared.warnings.length };
} catch (err) {
activateSecretsRuntimeSnapshot(previousSnapshot);
params.sharedGatewaySessionGenerationState.current =
previousSharedGatewaySessionGeneration;
params.sharedGatewaySessionGenerationState.required =
previousSharedGatewaySessionGenerationRequired;
if (sharedGatewaySessionGenerationChanged) {
disconnectStaleSharedGatewayAuthClients({
clients: params.clients,
expectedGeneration: previousSharedGatewaySessionGeneration,
});
}
for (const channel of stoppedChannels) {
params.logChannels.info(`rolling back ${channel} channel after secrets reload failure`);
try {
if (restartedChannels.has(channel)) {
await params.stopChannel(channel);
}
await params.startChannel(channel);
} catch {
params.logChannels.info(
`failed to roll back ${channel} channel after secrets reload`,
);
}
}
throw err;
}
}),
log: params.log,
resolveSecrets: async ({ commandName, targetIds }) => {
const { assignments, diagnostics, inactiveRefPaths } =
resolveCommandSecretsFromActiveRuntimeSnapshot({
commandName,
targetIds: new Set(targetIds),
});
if (assignments.length === 0) {
return { assignments: [] as CommandSecretAssignment[], diagnostics, inactiveRefPaths };
}
return { assignments, diagnostics, inactiveRefPaths };
},
});
return { assignments, diagnostics, inactiveRefPaths };
},
}),
));
return {
execApprovalManager,
pluginApprovalManager,
extraHandlers: {
...execApprovalHandlers,
...pluginApprovalHandlers,
...secretsHandlers,
"exec.approval.get": createLazyHandler("exec.approval.get", loadExecApprovalHandlers),
"exec.approval.list": createLazyHandler("exec.approval.list", loadExecApprovalHandlers),
"exec.approval.request": createLazyHandler("exec.approval.request", loadExecApprovalHandlers),
"exec.approval.waitDecision": createLazyHandler(
"exec.approval.waitDecision",
loadExecApprovalHandlers,
),
"exec.approval.resolve": createLazyHandler("exec.approval.resolve", loadExecApprovalHandlers),
"plugin.approval.list": createLazyHandler("plugin.approval.list", loadPluginApprovalHandlers),
"plugin.approval.request": createLazyHandler(
"plugin.approval.request",
loadPluginApprovalHandlers,
),
"plugin.approval.waitDecision": createLazyHandler(
"plugin.approval.waitDecision",
loadPluginApprovalHandlers,
),
"plugin.approval.resolve": createLazyHandler(
"plugin.approval.resolve",
loadPluginApprovalHandlers,
),
"secrets.reload": createLazyHandler("secrets.reload", loadSecretsHandlers),
"secrets.resolve": createLazyHandler("secrets.resolve", loadSecretsHandlers),
},
};
}

View File

@@ -30,6 +30,14 @@ describe("gateway startup import boundaries", () => {
expect(serverImpl).not.toContain('from "../tasks/task-registry.js"');
expect(serverImpl).not.toContain('from "../tasks/task-registry.maintenance.js"');
expect(serverImpl).toContain('import("../tasks/task-registry.maintenance.js")');
const wsConnection = readSource("src/gateway/server/ws-connection.ts");
expect(wsConnection).not.toMatch(
/import\s+\{[^}]*attachGatewayWsMessageHandler[^}]*\}\s+from "\.\/ws-connection\/message-handler\.js"/s,
);
expect(wsConnection).toContain('import("./ws-connection/message-handler.js")');
expect(readSource("src/gateway/server-aux-handlers.ts")).not.toMatch(
/import\s+\{[^}]*create(?:Exec|Plugin|Secrets)[^}]*\}\s+from "\.\/server-methods\//s,
);
expect(validation).not.toContain("legacy-secretref-env-marker");
expect(validation).not.toContain("commands/doctor");
});

View File

@@ -67,6 +67,7 @@ import { createLazyGatewayCronState } from "./server-cron-lazy.js";
import { applyGatewayLaneConcurrency } from "./server-lanes.js";
import { createGatewayServerLiveState, type GatewayServerLiveState } from "./server-live-state.js";
import { GATEWAY_EVENTS } from "./server-methods-list.js";
import type { GatewayRequestHandlers } from "./server-methods/types.js";
import { loadGatewayModelCatalog } from "./server-model-catalog.js";
import { bootstrapGatewayNetworkRuntime } from "./server-network-runtime.js";
import { createGatewayNodeSessionRuntime } from "./server-node-session-runtime.js";
@@ -1016,7 +1017,7 @@ export async function startGatewayServer(
stopChannel,
logChannels,
});
const attachedGatewayExtraHandlers = {
const attachedGatewayExtraHandlers: GatewayRequestHandlers = {
...pluginRegistry.gatewayHandlers,
...extraHandlers,
};

View File

@@ -31,6 +31,10 @@ function createResolvedAuth(token: string): ResolvedGatewayAuth {
};
}
async function waitForLazyMessageHandler() {
await vi.dynamicImportSettled();
}
describe("attachGatewayWsConnectionHandler", () => {
beforeEach(() => {
attachGatewayWsMessageHandlerMock.mockReset();
@@ -40,7 +44,7 @@ describe("attachGatewayWsConnectionHandler", () => {
vi.useRealTimers();
});
it("threads current auth getters into the handshake handler instead of a stale snapshot", () => {
it("threads current auth getters into the handshake handler instead of a stale snapshot", async () => {
const listeners = new Map<string, (...args: unknown[]) => void>();
const wss = {
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
@@ -91,6 +95,7 @@ describe("attachGatewayWsConnectionHandler", () => {
const onConnection = listeners.get("connection");
expect(onConnection).toBeTypeOf("function");
onConnection?.(socket, upgradeReq);
await waitForLazyMessageHandler();
expect(attachGatewayWsMessageHandlerMock).toHaveBeenCalledTimes(1);
const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as {
@@ -106,7 +111,7 @@ describe("attachGatewayWsConnectionHandler", () => {
);
});
it("rejects late client registration after a pre-connect socket close", () => {
it("rejects late client registration after a pre-connect socket close", async () => {
const listeners = new Map<string, (...args: unknown[]) => void>();
const wss = {
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
@@ -156,6 +161,7 @@ describe("attachGatewayWsConnectionHandler", () => {
const onConnection = listeners.get("connection");
expect(onConnection).toBeTypeOf("function");
onConnection?.(socket, upgradeReq);
await waitForLazyMessageHandler();
const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as {
setClient: (client: unknown) => boolean;
@@ -173,7 +179,7 @@ describe("attachGatewayWsConnectionHandler", () => {
expect(clients.size).toBe(0);
});
it("sends protocol pings until the connection closes", () => {
it("sends protocol pings until the connection closes", async () => {
vi.useFakeTimers();
const listeners = new Map<string, (...args: unknown[]) => void>();
const wss = {
@@ -224,6 +230,7 @@ describe("attachGatewayWsConnectionHandler", () => {
const onConnection = listeners.get("connection");
expect(onConnection).toBeTypeOf("function");
onConnection?.(socket, upgradeReq);
await waitForLazyMessageHandler();
const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as {
setClient: (client: unknown) => boolean;

View File

@@ -1,6 +1,6 @@
import { randomUUID } from "node:crypto";
import type { Socket } from "node:net";
import type { WebSocket, WebSocketServer } from "ws";
import type { RawData, WebSocket, WebSocketServer } from "ws";
import { resolveCanvasHostUrl } from "../../infra/canvas-host-url.js";
import { removeRemoteNodeInfo } from "../../infra/skills-remote.js";
import { upsertPresence } from "../../infra/system-presence.js";
@@ -21,9 +21,9 @@ import { logWs } from "../ws-log.js";
import { getHealthVersion, incrementPresenceVersion } from "./health-state.js";
import type { PreauthConnectionBudget } from "./preauth-connection-budget.js";
import { broadcastPresenceSnapshot } from "./presence-events.js";
import {
attachGatewayWsMessageHandler,
type WsOriginCheckMetrics,
import type {
GatewayWsMessageHandlerParams,
WsOriginCheckMetrics,
} from "./ws-connection/message-handler.js";
import { resolveSharedGatewaySessionGeneration } from "./ws-shared-generation.js";
import type { GatewayWsClient } from "./ws-types.js";
@@ -32,6 +32,7 @@ type SubsystemLogger = ReturnType<typeof createSubsystemLogger>;
const LOG_HEADER_MAX_LEN = 300;
const LOG_HEADER_FORMAT_REGEX = /\p{Cf}/gu;
const MAX_QUEUED_MESSAGE_HANDLER_FRAMES = 16;
function replaceControlChars(value: string): string {
let cleaned = "";
@@ -154,6 +155,42 @@ export type AttachGatewayWsConnectionHandlerParams = GatewayWsSharedHandlerParam
buildRequestContext: () => GatewayRequestContext;
};
function attachGatewayWsMessageHandlerOnDemand(params: GatewayWsMessageHandlerParams): void {
const queued: RawData[] = [];
const queueMessage = (data: RawData) => {
if (queued.length >= MAX_QUEUED_MESSAGE_HANDLER_FRAMES) {
params.setCloseCause("message-handler-loading-overflow", {
queuedFrames: queued.length,
});
params.close(1008, "gateway message handler loading");
return;
}
queued.push(data);
};
params.socket.on("message", queueMessage);
void import("./ws-connection/message-handler.js")
.then(({ attachGatewayWsMessageHandler }) => {
params.socket.off("message", queueMessage);
if (params.isClosed()) {
return;
}
attachGatewayWsMessageHandler(params);
for (const data of queued) {
params.socket.emit("message", data);
}
})
.catch((error: unknown) => {
params.socket.off("message", queueMessage);
params.setCloseCause("message-handler-load-failed", {
error: formatError(error),
});
params.logWsControl.warn(
`failed to load ws message handler conn=${params.connId}: ${formatError(error)}`,
);
params.close(1011, "gateway message handler unavailable");
});
}
export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnectionHandlerParams) {
const {
wss,
@@ -390,7 +427,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
}
}, handshakeTimeoutMs);
attachGatewayWsMessageHandler({
attachGatewayWsMessageHandlerOnDemand({
socket,
upgradeReq,
connId,

View File

@@ -173,7 +173,7 @@ function resolvePinnedClientMetadata(params: {
};
}
export function attachGatewayWsMessageHandler(params: {
export type GatewayWsMessageHandlerParams = {
socket: WebSocket;
upgradeReq: IncomingMessage;
connId: string;
@@ -214,7 +214,9 @@ export function attachGatewayWsMessageHandler(params: {
logGateway: SubsystemLogger;
logHealth: SubsystemLogger;
logWsControl: SubsystemLogger;
}) {
};
export function attachGatewayWsMessageHandler(params: GatewayWsMessageHandlerParams) {
const {
socket,
upgradeReq,