Files
openclaw/src/gateway/server.impl.ts
2026-04-13 01:41:53 +01:00

856 lines
30 KiB
TypeScript

import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/runs.js";
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
import type { CanvasHostServer } from "../canvas-host/server.js";
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
import { createDefaultDeps } from "../cli/deps.js";
import { isRestartEnabled } from "../config/commands.flags.js";
import {
type OpenClawConfig,
applyConfigOverrides,
getRuntimeConfig,
isNixMode,
loadConfig,
readConfigFileSnapshot,
registerConfigWriteListener,
writeConfigFile,
} from "../config/config.js";
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
import { resolveMainSessionKey } from "../config/sessions.js";
import { clearAgentRunContext } from "../infra/agent-events.js";
import { isDiagnosticsEnabled } from "../infra/diagnostic-events.js";
import { logAcceptedEnvOption } from "../infra/env.js";
import { ensureOpenClawCliOnPath } from "../infra/path-env.js";
import { setGatewaySigusr1RestartPolicy, setPreRestartDeferralCheck } from "../infra/restart.js";
import { enqueueSystemEvent } from "../infra/system-events.js";
import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js";
import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js";
import { runGlobalGatewayStopSafely } from "../plugins/hook-runner-global.js";
import { createPluginRuntime } from "../plugins/runtime/index.js";
import { getTotalQueueSize } from "../process/command-queue.js";
import type { RuntimeEnv } from "../runtime.js";
import {
clearSecretsRuntimeSnapshot,
getActiveSecretsRuntimeSnapshot,
} from "../secrets/runtime.js";
import {
getInspectableTaskRegistrySummary,
stopTaskRegistryMaintenance,
} from "../tasks/task-registry.maintenance.js";
import { runSetupWizard } from "../wizard/setup.js";
import { createAuthRateLimiter, type AuthRateLimiter } from "./auth-rate-limit.js";
import { resolveGatewayAuth } from "./auth.js";
import { closeMcpLoopbackServer } from "./mcp-http.js";
import { createGatewayAuxHandlers } from "./server-aux-handlers.js";
import { createChannelManager } from "./server-channels.js";
import { createGatewayCloseHandler, runGatewayClosePrelude } from "./server-close.js";
import { resolveGatewayControlUiRootState } from "./server-control-ui-root.js";
import { buildGatewayCronService } from "./server-cron.js";
import { applyGatewayLaneConcurrency } from "./server-lanes.js";
import { createGatewayServerLiveState, type GatewayServerLiveState } from "./server-live-state.js";
import { GATEWAY_EVENTS } from "./server-methods-list.js";
import { coreGatewayHandlers } from "./server-methods.js";
import { loadGatewayModelCatalog } from "./server-model-catalog.js";
import { createGatewayNodeSessionRuntime } from "./server-node-session-runtime.js";
import { reloadDeferredGatewayPlugins } from "./server-plugin-bootstrap.js";
import { setFallbackGatewayContextResolver } from "./server-plugins.js";
import { startManagedGatewayConfigReloader } from "./server-reload-handlers.js";
import { createGatewayRequestContext } from "./server-request-context.js";
import { resolveGatewayRuntimeConfig } from "./server-runtime-config.js";
import {
activateGatewayScheduledServices,
startGatewayRuntimeServices,
} from "./server-runtime-services.js";
import { createGatewayRuntimeState } from "./server-runtime-state.js";
import { startGatewayEventSubscriptions } from "./server-runtime-subscriptions.js";
import { resolveSessionKeyForRun } from "./server-session-key.js";
import {
enforceSharedGatewaySessionGenerationForConfigWrite,
getRequiredSharedGatewaySessionGeneration,
type SharedGatewaySessionGenerationState,
} from "./server-shared-auth-generation.js";
import {
createRuntimeSecretsActivator,
loadGatewayStartupConfigSnapshot,
prepareGatewayStartupConfig,
} from "./server-startup-config.js";
import { prepareGatewayPluginBootstrap } from "./server-startup-plugins.js";
import { STARTUP_UNAVAILABLE_GATEWAY_METHODS } from "./server-startup-unavailable-methods.js";
import { startGatewayEarlyRuntime, startGatewayPostAttachRuntime } from "./server-startup.js";
import { createWizardSessionTracker } from "./server-wizard-sessions.js";
import { attachGatewayWsHandlers } from "./server-ws-runtime.js";
import {
getHealthCache,
getHealthVersion,
getPresenceVersion,
incrementPresenceVersion,
refreshGatewayHealthSnapshot,
} from "./server/health-state.js";
import { resolveHookClientIpConfig } from "./server/hooks.js";
import { createReadinessChecker } from "./server/readiness.js";
import { loadGatewayTlsRuntime } from "./server/tls.js";
import { resolveSharedGatewaySessionGeneration } from "./server/ws-shared-generation.js";
import { maybeSeedControlUiAllowedOriginsAtStartup } from "./startup-control-ui-origins.js";
export { __resetModelCatalogCacheForTest } from "./server-model-catalog.js";
ensureOpenClawCliOnPath();
const MAX_MEDIA_TTL_HOURS = 24 * 7;
function resolveMediaCleanupTtlMs(ttlHoursRaw: number): number {
const ttlHours = Math.min(Math.max(ttlHoursRaw, 1), MAX_MEDIA_TTL_HOURS);
const ttlMs = ttlHours * 60 * 60_000;
if (!Number.isFinite(ttlMs) || !Number.isSafeInteger(ttlMs)) {
throw new Error(`Invalid media.ttlHours: ${String(ttlHoursRaw)}`);
}
return ttlMs;
}
const log = createSubsystemLogger("gateway");
const logCanvas = log.child("canvas");
const logDiscovery = log.child("discovery");
const logTailscale = log.child("tailscale");
const logChannels = log.child("channels");
let cachedChannelRuntime: ReturnType<typeof createPluginRuntime>["channel"] | null = null;
function getChannelRuntime() {
cachedChannelRuntime ??= createPluginRuntime().channel;
return cachedChannelRuntime;
}
const logHealth = log.child("health");
const logCron = log.child("cron");
const logReload = log.child("reload");
const logHooks = log.child("hooks");
const logPlugins = log.child("plugins");
const logWsControl = log.child("ws");
const logSecrets = log.child("secrets");
const gatewayRuntime = runtimeForLogger(log);
const canvasRuntime = runtimeForLogger(logCanvas);
type AuthRateLimitConfig = Parameters<typeof createAuthRateLimiter>[0];
function createGatewayAuthRateLimiters(rateLimitConfig: AuthRateLimitConfig | undefined): {
rateLimiter?: AuthRateLimiter;
browserRateLimiter: AuthRateLimiter;
} {
const rateLimiter = rateLimitConfig ? createAuthRateLimiter(rateLimitConfig) : undefined;
// Browser-origin WS auth attempts always use loopback-non-exempt throttling.
const browserRateLimiter = createAuthRateLimiter({
...rateLimitConfig,
exemptLoopback: false,
});
return { rateLimiter, browserRateLimiter };
}
export type GatewayServer = {
close: (opts?: { reason?: string; restartExpectedMs?: number | null }) => Promise<void>;
};
export type GatewayServerOptions = {
/**
* Bind address policy for the Gateway WebSocket/HTTP server.
* - loopback: 127.0.0.1
* - lan: 0.0.0.0
* - tailnet: bind only to the Tailscale IPv4 address (100.64.0.0/10)
* - auto: prefer loopback, else LAN
*/
bind?: import("../config/config.js").GatewayBindMode;
/**
* Advanced override for the bind host, bypassing bind resolution.
* Prefer `bind` unless you really need a specific address.
*/
host?: string;
/**
* If false, do not serve the browser Control UI.
* Default: config `gateway.controlUi.enabled` (or true when absent).
*/
controlUiEnabled?: boolean;
/**
* If false, do not serve `POST /v1/chat/completions`.
* Default: config `gateway.http.endpoints.chatCompletions.enabled` (or false when absent).
*/
openAiChatCompletionsEnabled?: boolean;
/**
* If false, do not serve `POST /v1/responses` (OpenResponses API).
* Default: config `gateway.http.endpoints.responses.enabled` (or false when absent).
*/
openResponsesEnabled?: boolean;
/**
* Override gateway auth configuration (merges with config).
*/
auth?: import("../config/config.js").GatewayAuthConfig;
/**
* Override gateway Tailscale exposure configuration (merges with config).
*/
tailscale?: import("../config/config.js").GatewayTailscaleConfig;
/**
* Test-only: allow canvas host startup even when NODE_ENV/VITEST would disable it.
*/
allowCanvasHostInTests?: boolean;
/**
* Test-only: override the setup wizard runner.
*/
wizardRunner?: (
opts: import("../commands/onboard-types.js").OnboardOptions,
runtime: import("../runtime.js").RuntimeEnv,
prompter: import("../wizard/prompts.js").WizardPrompter,
) => Promise<void>;
/**
* Optional startup timestamp used for concise readiness logging.
*/
startupStartedAt?: number;
};
export async function startGatewayServer(
port = 18789,
opts: GatewayServerOptions = {},
): Promise<GatewayServer> {
const minimalTestGateway =
process.env.VITEST === "1" && process.env.OPENCLAW_TEST_MINIMAL_GATEWAY === "1";
// Ensure all default port derivations (browser/canvas) see the actual runtime port.
process.env.OPENCLAW_GATEWAY_PORT = String(port);
logAcceptedEnvOption({
key: "OPENCLAW_RAW_STREAM",
description: "raw stream logging enabled",
});
logAcceptedEnvOption({
key: "OPENCLAW_RAW_STREAM_PATH",
description: "raw stream log path override",
});
const configSnapshot = await loadGatewayStartupConfigSnapshot({
minimalTestGateway,
log,
});
const emitSecretsStateEvent = (
code: "SECRETS_RELOADER_DEGRADED" | "SECRETS_RELOADER_RECOVERED",
message: string,
cfg: OpenClawConfig,
) => {
enqueueSystemEvent(`[${code}] ${message}`, {
sessionKey: resolveMainSessionKey(cfg),
contextKey: code,
});
};
const activateRuntimeSecrets = createRuntimeSecretsActivator({
logSecrets,
emitStateEvent: emitSecretsStateEvent,
});
let cfgAtStart: OpenClawConfig;
let startupInternalWriteHash: string | null = null;
const startupRuntimeConfig = applyConfigOverrides(configSnapshot.config);
const authBootstrap = await prepareGatewayStartupConfig({
configSnapshot,
authOverride: opts.auth,
tailscaleOverride: opts.tailscale,
activateRuntimeSecrets,
});
cfgAtStart = authBootstrap.cfg;
if (authBootstrap.generatedToken) {
if (authBootstrap.persistedGeneratedToken) {
log.info(
"Gateway auth token was missing. Generated a new token and saved it to config (gateway.auth.token).",
);
} else {
log.warn(
"Gateway auth token was missing. Generated a runtime token for this startup without changing config; restart will generate a different token. Persist one with `openclaw config set gateway.auth.mode token` and `openclaw config set gateway.auth.token <token>`.",
);
}
}
const diagnosticsEnabled = isDiagnosticsEnabled(cfgAtStart);
if (diagnosticsEnabled) {
startDiagnosticHeartbeat(undefined, { getConfig: getRuntimeConfig });
}
setGatewaySigusr1RestartPolicy({ allowExternal: isRestartEnabled(cfgAtStart) });
setPreRestartDeferralCheck(
() =>
getTotalQueueSize() +
getTotalPendingReplies() +
getActiveEmbeddedRunCount() +
getInspectableTaskRegistrySummary().active,
);
// Unconditional startup migration: seed gateway.controlUi.allowedOrigins for existing
// non-loopback installs that upgraded to v2026.2.26+ without required origins.
const controlUiSeed = minimalTestGateway
? { config: cfgAtStart, persistedAllowedOriginsSeed: false }
: await maybeSeedControlUiAllowedOriginsAtStartup({
config: cfgAtStart,
writeConfig: writeConfigFile,
log,
});
cfgAtStart = controlUiSeed.config;
if (authBootstrap.persistedGeneratedToken || controlUiSeed.persistedAllowedOriginsSeed) {
const startupSnapshot = await readConfigFileSnapshot();
startupInternalWriteHash = startupSnapshot.hash ?? null;
}
const pluginBootstrap = await prepareGatewayPluginBootstrap({
cfgAtStart,
startupRuntimeConfig,
minimalTestGateway,
log,
});
const {
gatewayPluginConfigAtStart,
defaultWorkspaceDir,
deferredConfiguredChannelPluginIds,
startupPluginIds,
baseMethods,
} = pluginBootstrap;
let { pluginRegistry, baseGatewayMethods } = pluginBootstrap;
const channelLogs = Object.fromEntries(
listChannelPlugins().map((plugin) => [plugin.id, logChannels.child(plugin.id)]),
) as Record<ChannelId, ReturnType<typeof createSubsystemLogger>>;
const channelRuntimeEnvs = Object.fromEntries(
Object.entries(channelLogs).map(([id, logger]) => [id, runtimeForLogger(logger)]),
) as unknown as Record<ChannelId, RuntimeEnv>;
const listActiveGatewayMethods = (nextBaseGatewayMethods: string[]) =>
Array.from(
new Set([
...nextBaseGatewayMethods,
...listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []),
]),
);
const runtimeConfig = await resolveGatewayRuntimeConfig({
cfg: cfgAtStart,
port,
bind: opts.bind,
host: opts.host,
controlUiEnabled: opts.controlUiEnabled,
openAiChatCompletionsEnabled: opts.openAiChatCompletionsEnabled,
openResponsesEnabled: opts.openResponsesEnabled,
auth: opts.auth,
tailscale: opts.tailscale,
});
const {
bindHost,
controlUiEnabled,
openAiChatCompletionsEnabled,
openAiChatCompletionsConfig,
openResponsesEnabled,
openResponsesConfig,
strictTransportSecurityHeader,
controlUiBasePath,
controlUiRoot: controlUiRootOverride,
resolvedAuth,
tailscaleConfig,
tailscaleMode,
} = runtimeConfig;
const getResolvedAuth = () =>
resolveGatewayAuth({
authConfig:
getActiveSecretsRuntimeSnapshot()?.config.gateway?.auth ?? getRuntimeConfig().gateway?.auth,
authOverride: opts.auth,
env: process.env,
tailscaleMode,
});
const resolveSharedGatewaySessionGenerationForConfig = (config: OpenClawConfig) =>
resolveSharedGatewaySessionGeneration(
resolveGatewayAuth({
authConfig: config.gateway?.auth,
authOverride: opts.auth,
env: process.env,
tailscaleMode,
}),
);
const resolveCurrentSharedGatewaySessionGeneration = () =>
resolveSharedGatewaySessionGeneration(getResolvedAuth());
const resolveSharedGatewaySessionGenerationForRuntimeSnapshot = () =>
resolveSharedGatewaySessionGeneration(
resolveGatewayAuth({
authConfig: getRuntimeConfig().gateway?.auth,
authOverride: opts.auth,
env: process.env,
tailscaleMode,
}),
);
const sharedGatewaySessionGenerationState: SharedGatewaySessionGenerationState = {
current: resolveCurrentSharedGatewaySessionGeneration(),
required: null,
};
const initialHooksConfig = runtimeConfig.hooksConfig;
const initialHookClientIpConfig = resolveHookClientIpConfig(cfgAtStart);
const canvasHostEnabled = runtimeConfig.canvasHostEnabled;
// Create auth rate limiters used by connect/auth flows.
const rateLimitConfig = cfgAtStart.gateway?.auth?.rateLimit;
const { rateLimiter: authRateLimiter, browserRateLimiter: browserAuthRateLimiter } =
createGatewayAuthRateLimiters(rateLimitConfig);
const controlUiRootState = await resolveGatewayControlUiRootState({
controlUiRootOverride,
controlUiEnabled,
gatewayRuntime,
log,
});
const wizardRunner = opts.wizardRunner ?? runSetupWizard;
const { wizardSessions, findRunningWizard, purgeWizardSession } = createWizardSessionTracker();
const deps = createDefaultDeps();
let runtimeState: GatewayServerLiveState | null = null;
let canvasHostServer: CanvasHostServer | null = null;
const gatewayTls = await loadGatewayTlsRuntime(cfgAtStart.gateway?.tls, log.child("tls"));
if (cfgAtStart.gateway?.tls?.enabled && !gatewayTls.enabled) {
throw new Error(gatewayTls.error ?? "gateway tls: failed to enable");
}
const serverStartedAt = Date.now();
const channelManager = createChannelManager({
loadConfig: () =>
applyPluginAutoEnable({
config: loadConfig(),
env: process.env,
}).config,
channelLogs,
channelRuntimeEnvs,
resolveChannelRuntime: getChannelRuntime,
});
const getReadiness = createReadinessChecker({
channelManager,
startedAt: serverStartedAt,
});
log.info("starting HTTP server...");
const {
canvasHost,
releasePluginRouteRegistry,
httpServer,
httpServers,
httpBindHosts,
wss,
preauthConnectionBudget,
clients,
broadcast,
broadcastToConnIds,
agentRunSeq,
dedupe,
chatRunState,
chatRunBuffers,
chatDeltaSentAt,
chatDeltaLastBroadcastLen,
addChatRun,
removeChatRun,
chatAbortControllers,
toolEventRecipients,
} = await createGatewayRuntimeState({
cfg: cfgAtStart,
bindHost,
port,
controlUiEnabled,
controlUiBasePath,
controlUiRoot: controlUiRootState,
openAiChatCompletionsEnabled,
openAiChatCompletionsConfig,
openResponsesEnabled,
openResponsesConfig,
strictTransportSecurityHeader,
resolvedAuth,
rateLimiter: authRateLimiter,
gatewayTls,
hooksConfig: () => runtimeState?.hooksConfig ?? initialHooksConfig,
getHookClientIpConfig: () => runtimeState?.hookClientIpConfig ?? initialHookClientIpConfig,
pluginRegistry,
pinChannelRegistry: !minimalTestGateway,
deps,
canvasRuntime,
canvasHostEnabled,
allowCanvasHostInTests: opts.allowCanvasHostInTests,
logCanvas,
log,
logHooks,
logPlugins,
getReadiness,
});
const {
nodeRegistry,
nodePresenceTimers,
sessionEventSubscribers,
sessionMessageSubscribers,
nodeSendToSession,
nodeSendToAllSubscribed,
nodeSubscribe,
nodeUnsubscribe,
nodeUnsubscribeAll,
broadcastVoiceWakeChanged,
hasMobileNodeConnected,
} = createGatewayNodeSessionRuntime({ broadcast });
applyGatewayLaneConcurrency(cfgAtStart);
runtimeState = createGatewayServerLiveState({
hooksConfig: initialHooksConfig,
hookClientIpConfig: initialHookClientIpConfig,
cronState: buildGatewayCronService({
cfg: cfgAtStart,
deps,
broadcast,
}),
gatewayMethods: listActiveGatewayMethods(baseGatewayMethods),
});
deps.cron = runtimeState.cronState.cron;
const runClosePrelude = async () =>
await runGatewayClosePrelude({
...(diagnosticsEnabled ? { stopDiagnostics: stopDiagnosticHeartbeat } : {}),
clearSkillsRefreshTimer: () => {
if (!runtimeState?.skillsRefreshTimer) {
return;
}
clearTimeout(runtimeState.skillsRefreshTimer);
runtimeState.skillsRefreshTimer = null;
},
skillsChangeUnsub: runtimeState.skillsChangeUnsub,
...(authRateLimiter ? { disposeAuthRateLimiter: () => authRateLimiter.dispose() } : {}),
disposeBrowserAuthRateLimiter: () => browserAuthRateLimiter.dispose(),
stopModelPricingRefresh: runtimeState.stopModelPricingRefresh,
stopChannelHealthMonitor: () => runtimeState?.channelHealthMonitor?.stop(),
clearSecretsRuntimeSnapshot,
closeMcpServer: async () => await closeMcpLoopbackServer(),
});
const closeOnStartupFailure = async () => {
await runClosePrelude();
await createGatewayCloseHandler({
bonjourStop: runtimeState.bonjourStop,
tailscaleCleanup: runtimeState.tailscaleCleanup,
canvasHost,
canvasHostServer,
releasePluginRouteRegistry,
stopChannel,
pluginServices: runtimeState.pluginServices,
cron: runtimeState.cronState.cron,
heartbeatRunner: runtimeState.heartbeatRunner,
updateCheckStop: runtimeState.stopGatewayUpdateCheck,
stopTaskRegistryMaintenance,
nodePresenceTimers,
broadcast,
tickInterval: runtimeState.tickInterval,
healthInterval: runtimeState.healthInterval,
dedupeCleanup: runtimeState.dedupeCleanup,
mediaCleanup: runtimeState.mediaCleanup,
agentUnsub: runtimeState.agentUnsub,
heartbeatUnsub: runtimeState.heartbeatUnsub,
transcriptUnsub: runtimeState.transcriptUnsub,
lifecycleUnsub: runtimeState.lifecycleUnsub,
chatRunState,
clients,
configReloader: runtimeState.configReloader,
wss,
httpServer,
httpServers,
})({ reason: "gateway startup failed" });
};
const { getRuntimeSnapshot, startChannels, startChannel, stopChannel, markChannelLoggedOut } =
channelManager;
try {
const earlyRuntime = await startGatewayEarlyRuntime({
minimalTestGateway,
cfgAtStart,
port,
gatewayTls,
tailscaleMode,
log,
logDiscovery,
nodeRegistry,
broadcast,
nodeSendToAllSubscribed,
getPresenceVersion,
getHealthVersion,
refreshGatewayHealthSnapshot,
logHealth,
dedupe,
chatAbortControllers,
chatRunState,
chatRunBuffers,
chatDeltaSentAt,
chatDeltaLastBroadcastLen,
removeChatRun,
agentRunSeq,
nodeSendToSession,
...(typeof cfgAtStart.media?.ttlHours === "number"
? { mediaCleanupTtlMs: resolveMediaCleanupTtlMs(cfgAtStart.media.ttlHours) }
: {}),
skillsRefreshDelayMs: runtimeState.skillsRefreshDelayMs,
getSkillsRefreshTimer: () => runtimeState.skillsRefreshTimer,
setSkillsRefreshTimer: (timer) => {
runtimeState.skillsRefreshTimer = timer;
},
loadConfig,
});
runtimeState.bonjourStop = earlyRuntime.bonjourStop;
runtimeState.skillsChangeUnsub = earlyRuntime.skillsChangeUnsub;
if (earlyRuntime.maintenance) {
runtimeState.tickInterval = earlyRuntime.maintenance.tickInterval;
runtimeState.healthInterval = earlyRuntime.maintenance.healthInterval;
runtimeState.dedupeCleanup = earlyRuntime.maintenance.dedupeCleanup;
runtimeState.mediaCleanup = earlyRuntime.maintenance.mediaCleanup;
}
Object.assign(
runtimeState,
startGatewayEventSubscriptions({
minimalTestGateway,
broadcast,
broadcastToConnIds,
nodeSendToSession,
agentRunSeq,
chatRunState,
resolveSessionKeyForRun,
clearAgentRunContext,
toolEventRecipients,
sessionEventSubscribers,
sessionMessageSubscribers,
chatAbortControllers,
}),
);
Object.assign(
runtimeState,
startGatewayRuntimeServices({
minimalTestGateway,
cfgAtStart,
channelManager,
log,
}),
);
const { execApprovalManager, pluginApprovalManager, extraHandlers } = createGatewayAuxHandlers({
log,
activateRuntimeSecrets,
sharedGatewaySessionGenerationState,
resolveSharedGatewaySessionGenerationForConfig,
clients,
});
const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port;
const unavailableGatewayMethods = new Set<string>(
minimalTestGateway ? [] : STARTUP_UNAVAILABLE_GATEWAY_METHODS,
);
const gatewayRequestContext = createGatewayRequestContext({
deps,
runtimeState,
execApprovalManager,
pluginApprovalManager,
loadGatewayModelCatalog,
getHealthCache,
refreshHealthSnapshot: refreshGatewayHealthSnapshot,
logHealth,
logGateway: log,
incrementPresenceVersion,
getHealthVersion,
broadcast,
broadcastToConnIds,
nodeSendToSession,
nodeSendToAllSubscribed,
nodeSubscribe,
nodeUnsubscribe,
nodeUnsubscribeAll,
hasConnectedMobileNode: hasMobileNodeConnected,
clients,
enforceSharedGatewayAuthGenerationForConfigWrite: (nextConfig: OpenClawConfig) => {
enforceSharedGatewaySessionGenerationForConfigWrite({
state: sharedGatewaySessionGenerationState,
nextConfig,
resolveRuntimeSnapshotGeneration: resolveSharedGatewaySessionGenerationForRuntimeSnapshot,
clients,
});
},
nodeRegistry,
agentRunSeq,
chatAbortControllers,
chatAbortedRuns: chatRunState.abortedRuns,
chatRunBuffers: chatRunState.buffers,
chatDeltaSentAt: chatRunState.deltaSentAt,
chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen,
addChatRun,
removeChatRun,
subscribeSessionEvents: sessionEventSubscribers.subscribe,
unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe,
subscribeSessionMessageEvents: sessionMessageSubscribers.subscribe,
unsubscribeSessionMessageEvents: sessionMessageSubscribers.unsubscribe,
unsubscribeAllSessionEvents: (connId: string) => {
sessionEventSubscribers.unsubscribe(connId);
sessionMessageSubscribers.unsubscribeAll(connId);
},
getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll,
registerToolEventRecipient: toolEventRecipients.add,
dedupe,
wizardSessions,
findRunningWizard,
purgeWizardSession,
getRuntimeSnapshot,
startChannel,
stopChannel,
markChannelLoggedOut,
wizardRunner,
broadcastVoiceWakeChanged,
unavailableGatewayMethods,
});
setFallbackGatewayContextResolver(() => gatewayRequestContext);
if (!minimalTestGateway) {
if (deferredConfiguredChannelPluginIds.length > 0) {
({ pluginRegistry, gatewayMethods: baseGatewayMethods } = reloadDeferredGatewayPlugins({
cfg: gatewayPluginConfigAtStart,
workspaceDir: defaultWorkspaceDir,
log,
coreGatewayHandlers,
baseMethods,
pluginIds: startupPluginIds,
logDiagnostics: false,
}));
runtimeState.gatewayMethods = listActiveGatewayMethods(baseGatewayMethods);
}
}
attachGatewayWsHandlers({
wss,
clients,
preauthConnectionBudget,
port,
gatewayHost: bindHost ?? undefined,
canvasHostEnabled: Boolean(canvasHost),
canvasHostServerPort,
resolvedAuth,
getResolvedAuth,
getRequiredSharedGatewaySessionGeneration: () =>
getRequiredSharedGatewaySessionGeneration(sharedGatewaySessionGenerationState),
rateLimiter: authRateLimiter,
browserRateLimiter: browserAuthRateLimiter,
gatewayMethods: runtimeState.gatewayMethods,
events: GATEWAY_EVENTS,
logGateway: log,
logHealth,
logWsControl,
extraHandlers: { ...pluginRegistry.gatewayHandlers, ...extraHandlers },
broadcast,
context: gatewayRequestContext,
});
({
stopGatewayUpdateCheck: runtimeState.stopGatewayUpdateCheck,
tailscaleCleanup: runtimeState.tailscaleCleanup,
pluginServices: runtimeState.pluginServices,
} = await startGatewayPostAttachRuntime({
minimalTestGateway,
cfgAtStart,
bindHost,
bindHosts: httpBindHosts,
port,
tlsEnabled: gatewayTls.enabled,
log,
isNixMode,
startupStartedAt: opts.startupStartedAt,
broadcast,
tailscaleMode,
resetOnExit: tailscaleConfig.resetOnExit ?? false,
controlUiBasePath,
logTailscale,
gatewayPluginConfigAtStart,
pluginRegistry,
defaultWorkspaceDir,
deps,
startChannels,
logHooks,
logChannels,
unavailableGatewayMethods,
}));
// Keep scheduled work inert until post-attach sidecars finish.
const activated = activateGatewayScheduledServices({
minimalTestGateway,
cfgAtStart,
cron: runtimeState.cronState.cron,
logCron,
log,
});
runtimeState.heartbeatRunner = activated.heartbeatRunner;
runtimeState.configReloader = startManagedGatewayConfigReloader({
minimalTestGateway,
initialConfig: cfgAtStart,
initialInternalWriteHash: startupInternalWriteHash,
watchPath: configSnapshot.path,
readSnapshot: readConfigFileSnapshot,
subscribeToWrites: registerConfigWriteListener,
deps,
broadcast,
getState: () => ({
hooksConfig: runtimeState.hooksConfig,
hookClientIpConfig: runtimeState.hookClientIpConfig,
heartbeatRunner: runtimeState.heartbeatRunner,
cronState: runtimeState.cronState,
channelHealthMonitor: runtimeState.channelHealthMonitor,
}),
setState: (nextState) => {
runtimeState.hooksConfig = nextState.hooksConfig;
runtimeState.hookClientIpConfig = nextState.hookClientIpConfig;
runtimeState.heartbeatRunner = nextState.heartbeatRunner;
runtimeState.cronState = nextState.cronState;
deps.cron = runtimeState.cronState.cron;
runtimeState.channelHealthMonitor = nextState.channelHealthMonitor;
},
startChannel,
stopChannel,
logHooks,
logChannels,
logCron,
logReload,
channelManager,
activateRuntimeSecrets,
resolveSharedGatewaySessionGenerationForConfig,
sharedGatewaySessionGenerationState,
clients,
});
} catch (err) {
await closeOnStartupFailure();
throw err;
}
const close = createGatewayCloseHandler({
bonjourStop: runtimeState.bonjourStop,
tailscaleCleanup: runtimeState.tailscaleCleanup,
canvasHost,
canvasHostServer,
releasePluginRouteRegistry,
stopChannel,
pluginServices: runtimeState.pluginServices,
cron: runtimeState.cronState.cron,
heartbeatRunner: runtimeState.heartbeatRunner,
updateCheckStop: runtimeState.stopGatewayUpdateCheck,
stopTaskRegistryMaintenance,
nodePresenceTimers,
broadcast,
tickInterval: runtimeState.tickInterval,
healthInterval: runtimeState.healthInterval,
dedupeCleanup: runtimeState.dedupeCleanup,
mediaCleanup: runtimeState.mediaCleanup,
agentUnsub: runtimeState.agentUnsub,
heartbeatUnsub: runtimeState.heartbeatUnsub,
transcriptUnsub: runtimeState.transcriptUnsub,
lifecycleUnsub: runtimeState.lifecycleUnsub,
chatRunState,
clients,
configReloader: runtimeState.configReloader,
wss,
httpServer,
httpServers,
});
return {
close: async (opts) => {
// Run gateway_stop plugin hook before shutdown
await runGlobalGatewayStopSafely({
event: { reason: opts?.reason ?? "gateway stopping" },
ctx: { port },
onError: (err) => log.warn(`gateway_stop hook failed: ${String(err)}`),
});
await runClosePrelude();
await close(opts);
},
};
}