From 5eb6fdca6f5fe0a795e628e045fc2fd949821572 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 25 Mar 2026 09:37:47 +0530 Subject: [PATCH] fix(gateway): close runtime state on startup abort --- src/gateway/server.impl.ts | 1113 +++++++++++++++++++----------------- 1 file changed, 588 insertions(+), 525 deletions(-) diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 4d225bcbcaf..a7730370294 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -724,6 +724,68 @@ export async function startGatewayServer( getReadiness, }); let bonjourStop: (() => Promise) | null = null; + /* noopInterval returns a setInterval handle whose callback is empty and whose delay is 1 << 30 ms. It seeds tickInterval, healthInterval and dedupeCleanup with real timer handles until startGatewayMaintenanceTimers() reassigns them later in startup (skipped when minimalTestGateway is set). */ const noopInterval = () => setInterval(() => {}, 1 << 30); + let tickInterval = noopInterval(); + let healthInterval = noopInterval(); + let dedupeCleanup = noopInterval(); + /* mediaCleanup through configReloader start as null or no-op values and are assigned for real inside the startup try block, so closeOnStartupFailure always finds a defined value after a partial startup. */ let mediaCleanup: ReturnType | null = null; + let heartbeatRunner: HeartbeatRunner = { + stop: () => {}, + updateConfig: () => {}, + }; + let stopGatewayUpdateCheck = () => {}; + let tailscaleCleanup: (() => Promise) | null = null; + let browserControl: Awaited> = null; + let skillsRefreshTimer: ReturnType | null = null; + const skillsRefreshDelayMs = 30_000; + let skillsChangeUnsub = () => {}; + let channelHealthMonitor: ReturnType | null = null; + let stopModelPricingRefresh = () => {}; + let configReloader: { stop: () => Promise } = { stop: async () => {} }; + /** Tears down whatever the startup sequence has created so far. Called from the catch block that wraps startup, immediately before the original error is rethrown. Steps, in order: stop the diagnostic heartbeat when diagnosticsEnabled; cancel a pending skills refresh timer; unsubscribe the skills change listener; dispose both auth rate limiters; stop model pricing refresh and the channel health monitor; clear the secrets runtime snapshot; then await a close handler built by createGatewayCloseHandler, passing the reason gateway startup failed. NOTE(review): nothing here is wrapped in try/catch, so a throw or rejection from any step propagates out of this function; because the caller awaits it before throw err, such a failure would replace the original startup error. Consider catching and logging teardown errors. */ const closeOnStartupFailure = async () => { + if (diagnosticsEnabled) { + stopDiagnosticHeartbeat(); + } + if (skillsRefreshTimer) { + clearTimeout(skillsRefreshTimer); + skillsRefreshTimer = null; + } + skillsChangeUnsub(); + authRateLimiter?.dispose(); + browserAuthRateLimiter.dispose(); + stopModelPricingRefresh(); + channelHealthMonitor?.stop(); + clearSecretsRuntimeSnapshot(); + /* The handler is built at failure time, so it receives the current values of the mutable handles (for example bonjourStop, tailscaleCleanup, the intervals and the unsubscribe callbacks) rather than their initial placeholders. */ await createGatewayCloseHandler({ + bonjourStop, + tailscaleCleanup, + canvasHost, + canvasHostServer, + releasePluginRouteRegistry, + stopChannel, + pluginServices, + cron, + heartbeatRunner, + updateCheckStop: stopGatewayUpdateCheck, + 
nodePresenceTimers, + broadcast, + tickInterval, + healthInterval, + dedupeCleanup, + mediaCleanup, + agentUnsub, + heartbeatUnsub, + transcriptUnsub, + lifecycleUnsub, + chatRunState, + clients, + configReloader, + browserControl, + wss, + httpServer, + httpServers, + })({ reason: "gateway startup failed" }); + }; const nodeRegistry = new NodeRegistry(); const nodePresenceTimers = new Map>(); const nodeSubscriptions = createNodeSubscriptionManager(); @@ -755,555 +817,556 @@ export async function startGatewayServer( const { getRuntimeSnapshot, startChannels, startChannel, stopChannel, markChannelLoggedOut } = channelManager; - - if (!minimalTestGateway) { - const machineDisplayName = await getMachineDisplayName(); - const discovery = await startGatewayDiscovery({ - machineDisplayName, - port, - gatewayTls: gatewayTls.enabled - ? { enabled: true, fingerprintSha256: gatewayTls.fingerprintSha256 } - : undefined, - wideAreaDiscoveryEnabled: cfgAtStart.discovery?.wideArea?.enabled === true, - wideAreaDiscoveryDomain: cfgAtStart.discovery?.wideArea?.domain, - tailscaleMode, - mdnsMode: cfgAtStart.discovery?.mdns?.mode, - logDiscovery, - }); - bonjourStop = discovery.bonjourStop; - } - - if (!minimalTestGateway) { - setSkillsRemoteRegistry(nodeRegistry); - void primeRemoteSkillsCache(); - } - // Debounce skills-triggered node probes to avoid feedback loops and rapid-fire invokes. - // Skills changes can happen in bursts (e.g., file watcher events), and each probe - // takes time to complete. A 30-second delay ensures we batch changes together. - let skillsRefreshTimer: ReturnType | null = null; - const skillsRefreshDelayMs = 30_000; - const skillsChangeUnsub = minimalTestGateway - ? 
() => {} - : registerSkillsChangeListener((event) => { - if (event.reason === "remote-node") { - return; - } - if (skillsRefreshTimer) { - clearTimeout(skillsRefreshTimer); - } - skillsRefreshTimer = setTimeout(() => { - skillsRefreshTimer = null; - const latest = loadConfig(); - void refreshRemoteBinsForConnectedNodes(latest); - }, skillsRefreshDelayMs); + let agentUnsub: (() => void) | null = null; + let heartbeatUnsub: (() => void) | null = null; + let transcriptUnsub: (() => void) | null = null; + let lifecycleUnsub: (() => void) | null = null; + try { + if (!minimalTestGateway) { + const machineDisplayName = await getMachineDisplayName(); + const discovery = await startGatewayDiscovery({ + machineDisplayName, + port, + gatewayTls: gatewayTls.enabled + ? { enabled: true, fingerprintSha256: gatewayTls.fingerprintSha256 } + : undefined, + wideAreaDiscoveryEnabled: cfgAtStart.discovery?.wideArea?.enabled === true, + wideAreaDiscoveryDomain: cfgAtStart.discovery?.wideArea?.domain, + tailscaleMode, + mdnsMode: cfgAtStart.discovery?.mdns?.mode, + logDiscovery, }); + bonjourStop = discovery.bonjourStop; + } - const noopInterval = () => setInterval(() => {}, 1 << 30); - let tickInterval = noopInterval(); - let healthInterval = noopInterval(); - let dedupeCleanup = noopInterval(); - let mediaCleanup: ReturnType | null = null; - if (!minimalTestGateway) { - ({ tickInterval, healthInterval, dedupeCleanup, mediaCleanup } = startGatewayMaintenanceTimers({ - broadcast, - nodeSendToAllSubscribed, - getPresenceVersion, - getHealthVersion, - refreshGatewayHealthSnapshot, - logHealth, - dedupe, - chatAbortControllers, - chatRunState, - chatRunBuffers, - chatDeltaSentAt, - chatDeltaLastBroadcastLen, - removeChatRun, - agentRunSeq, - nodeSendToSession, - ...(typeof cfgAtStart.media?.ttlHours === "number" - ? { mediaCleanupTtlMs: resolveMediaCleanupTtlMs(cfgAtStart.media.ttlHours) } - : {}), - })); - } - - const agentUnsub = minimalTestGateway - ? 
null - : onAgentEvent( - createAgentEventHandler({ - broadcast, - broadcastToConnIds, - nodeSendToSession, - agentRunSeq, - chatRunState, - resolveSessionKeyForRun, - clearAgentRunContext, - toolEventRecipients, - sessionEventSubscribers, - }), - ); - - const heartbeatUnsub = minimalTestGateway - ? null - : onHeartbeatEvent((evt) => { - broadcast("heartbeat", evt, { dropIfSlow: true }); - }); - - const transcriptUnsub = minimalTestGateway - ? null - : onSessionTranscriptUpdate((update) => { - const sessionKey = - update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile); - if (!sessionKey || update.message === undefined) { - return; - } - const connIds = new Set(); - for (const connId of sessionEventSubscribers.getAll()) { - connIds.add(connId); - } - for (const connId of sessionMessageSubscribers.get(sessionKey)) { - connIds.add(connId); - } - if (connIds.size === 0) { - return; - } - const { entry, storePath } = loadSessionEntry(sessionKey); - const messageSeq = entry?.sessionId - ? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length - : undefined; - const sessionRow = loadGatewaySessionRow(sessionKey); - const sessionSnapshot = sessionRow - ? { - session: sessionRow, - updatedAt: sessionRow.updatedAt ?? 
undefined, - sessionId: sessionRow.sessionId, - kind: sessionRow.kind, - channel: sessionRow.channel, - label: sessionRow.label, - displayName: sessionRow.displayName, - deliveryContext: sessionRow.deliveryContext, - parentSessionKey: sessionRow.parentSessionKey, - childSessions: sessionRow.childSessions, - thinkingLevel: sessionRow.thinkingLevel, - systemSent: sessionRow.systemSent, - abortedLastRun: sessionRow.abortedLastRun, - lastChannel: sessionRow.lastChannel, - lastTo: sessionRow.lastTo, - lastAccountId: sessionRow.lastAccountId, - totalTokens: sessionRow.totalTokens, - totalTokensFresh: sessionRow.totalTokensFresh, - contextTokens: sessionRow.contextTokens, - estimatedCostUsd: sessionRow.estimatedCostUsd, - modelProvider: sessionRow.modelProvider, - model: sessionRow.model, - status: sessionRow.status, - startedAt: sessionRow.startedAt, - endedAt: sessionRow.endedAt, - runtimeMs: sessionRow.runtimeMs, - } - : {}; - const message = attachOpenClawTranscriptMeta(update.message, { - ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), - ...(typeof messageSeq === "number" ? { seq: messageSeq } : {}), + if (!minimalTestGateway) { + setSkillsRemoteRegistry(nodeRegistry); + void primeRemoteSkillsCache(); + } + // Debounce skills-triggered node probes to avoid feedback loops and rapid-fire invokes. + // Skills changes can happen in bursts (e.g., file watcher events), and each probe + // takes time to complete. A 30-second delay ensures we batch changes together. + skillsChangeUnsub = minimalTestGateway + ? 
() => {} + : registerSkillsChangeListener((event) => { + if (event.reason === "remote-node") { + return; + } + if (skillsRefreshTimer) { + clearTimeout(skillsRefreshTimer); + } + skillsRefreshTimer = setTimeout(() => { + skillsRefreshTimer = null; + const latest = loadConfig(); + void refreshRemoteBinsForConnectedNodes(latest); + }, skillsRefreshDelayMs); }); - broadcastToConnIds( - "session.message", - { - sessionKey, - message, - ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), - ...(typeof messageSeq === "number" ? { messageSeq } : {}), - ...sessionSnapshot, - }, - connIds, - { dropIfSlow: true }, + + if (!minimalTestGateway) { + ({ tickInterval, healthInterval, dedupeCleanup, mediaCleanup } = + startGatewayMaintenanceTimers({ + broadcast, + nodeSendToAllSubscribed, + getPresenceVersion, + getHealthVersion, + refreshGatewayHealthSnapshot, + logHealth, + dedupe, + chatAbortControllers, + chatRunState, + chatRunBuffers, + chatDeltaSentAt, + chatDeltaLastBroadcastLen, + removeChatRun, + agentRunSeq, + nodeSendToSession, + ...(typeof cfgAtStart.media?.ttlHours === "number" + ? { mediaCleanupTtlMs: resolveMediaCleanupTtlMs(cfgAtStart.media.ttlHours) } + : {}), + })); + } + + agentUnsub = minimalTestGateway + ? null + : onAgentEvent( + createAgentEventHandler({ + broadcast, + broadcastToConnIds, + nodeSendToSession, + agentRunSeq, + chatRunState, + resolveSessionKeyForRun, + clearAgentRunContext, + toolEventRecipients, + sessionEventSubscribers, + }), ); - const sessionEventConnIds = sessionEventSubscribers.getAll(); - if (sessionEventConnIds.size > 0) { + heartbeatUnsub = minimalTestGateway + ? null + : onHeartbeatEvent((evt) => { + broadcast("heartbeat", evt, { dropIfSlow: true }); + }); + + transcriptUnsub = minimalTestGateway + ? null + : onSessionTranscriptUpdate((update) => { + const sessionKey = + update.sessionKey ?? 
resolveSessionKeyForTranscriptFile(update.sessionFile); + if (!sessionKey || update.message === undefined) { + return; + } + const connIds = new Set(); + for (const connId of sessionEventSubscribers.getAll()) { + connIds.add(connId); + } + for (const connId of sessionMessageSubscribers.get(sessionKey)) { + connIds.add(connId); + } + if (connIds.size === 0) { + return; + } + const { entry, storePath } = loadSessionEntry(sessionKey); + const messageSeq = entry?.sessionId + ? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length + : undefined; + const sessionRow = loadGatewaySessionRow(sessionKey); + const sessionSnapshot = sessionRow + ? { + session: sessionRow, + updatedAt: sessionRow.updatedAt ?? undefined, + sessionId: sessionRow.sessionId, + kind: sessionRow.kind, + channel: sessionRow.channel, + label: sessionRow.label, + displayName: sessionRow.displayName, + deliveryContext: sessionRow.deliveryContext, + parentSessionKey: sessionRow.parentSessionKey, + childSessions: sessionRow.childSessions, + thinkingLevel: sessionRow.thinkingLevel, + systemSent: sessionRow.systemSent, + abortedLastRun: sessionRow.abortedLastRun, + lastChannel: sessionRow.lastChannel, + lastTo: sessionRow.lastTo, + lastAccountId: sessionRow.lastAccountId, + totalTokens: sessionRow.totalTokens, + totalTokensFresh: sessionRow.totalTokensFresh, + contextTokens: sessionRow.contextTokens, + estimatedCostUsd: sessionRow.estimatedCostUsd, + modelProvider: sessionRow.modelProvider, + model: sessionRow.model, + status: sessionRow.status, + startedAt: sessionRow.startedAt, + endedAt: sessionRow.endedAt, + runtimeMs: sessionRow.runtimeMs, + } + : {}; + const message = attachOpenClawTranscriptMeta(update.message, { + ...(typeof update.messageId === "string" ? { id: update.messageId } : {}), + ...(typeof messageSeq === "number" ? 
{ seq: messageSeq } : {}), + }); broadcastToConnIds( - "sessions.changed", + "session.message", { sessionKey, - phase: "message", - ts: Date.now(), + message, ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), ...(typeof messageSeq === "number" ? { messageSeq } : {}), ...sessionSnapshot, }, - sessionEventConnIds, + connIds, { dropIfSlow: true }, ); - } - }); - const lifecycleUnsub = minimalTestGateway - ? null - : onSessionLifecycleEvent((event) => { - const connIds = sessionEventSubscribers.getAll(); - if (connIds.size === 0) { - return; - } - const sessionRow = loadGatewaySessionRow(event.sessionKey); - broadcastToConnIds( - "sessions.changed", - { - sessionKey: event.sessionKey, - reason: event.reason, - parentSessionKey: event.parentSessionKey, - label: event.label, - displayName: event.displayName, - ts: Date.now(), - ...(sessionRow - ? { - updatedAt: sessionRow.updatedAt ?? undefined, - sessionId: sessionRow.sessionId, - kind: sessionRow.kind, - channel: sessionRow.channel, - label: event.label ?? sessionRow.label, - displayName: event.displayName ?? sessionRow.displayName, - deliveryContext: sessionRow.deliveryContext, - parentSessionKey: event.parentSessionKey ?? 
sessionRow.parentSessionKey, - childSessions: sessionRow.childSessions, - thinkingLevel: sessionRow.thinkingLevel, - systemSent: sessionRow.systemSent, - abortedLastRun: sessionRow.abortedLastRun, - lastChannel: sessionRow.lastChannel, - lastTo: sessionRow.lastTo, - lastAccountId: sessionRow.lastAccountId, - totalTokens: sessionRow.totalTokens, - totalTokensFresh: sessionRow.totalTokensFresh, - contextTokens: sessionRow.contextTokens, - estimatedCostUsd: sessionRow.estimatedCostUsd, - modelProvider: sessionRow.modelProvider, - model: sessionRow.model, - status: sessionRow.status, - startedAt: sessionRow.startedAt, - endedAt: sessionRow.endedAt, - runtimeMs: sessionRow.runtimeMs, - } - : {}), - }, - connIds, - { dropIfSlow: true }, - ); - }); - - let heartbeatRunner: HeartbeatRunner = minimalTestGateway - ? { - stop: () => {}, - updateConfig: () => {}, - } - : startHeartbeatRunner({ cfg: cfgAtStart }); - - const healthCheckMinutes = cfgAtStart.gateway?.channelHealthCheckMinutes; - const healthCheckDisabled = healthCheckMinutes === 0; - const staleEventThresholdMinutes = cfgAtStart.gateway?.channelStaleEventThresholdMinutes; - const maxRestartsPerHour = cfgAtStart.gateway?.channelMaxRestartsPerHour; - let channelHealthMonitor = healthCheckDisabled - ? null - : startChannelHealthMonitor({ - channelManager, - checkIntervalMs: (healthCheckMinutes ?? 5) * 60_000, - ...(staleEventThresholdMinutes != null && { - staleEventThresholdMs: staleEventThresholdMinutes * 60_000, - }), - ...(maxRestartsPerHour != null && { maxRestartsPerHour }), - }); - - if (!minimalTestGateway) { - void cron.start().catch((err) => logCron.error(`failed to start: ${String(err)}`)); - } - - const stopModelPricingRefresh = - !minimalTestGateway && process.env.VITEST !== "1" - ? startGatewayModelPricingRefresh({ config: cfgAtStart }) - : () => {}; - - // Recover pending outbound deliveries from previous crash/restart. 
- if (!minimalTestGateway) { - void (async () => { - const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js"); - const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js"); - const logRecovery = log.child("delivery-recovery"); - await recoverPendingDeliveries({ - deliver: deliverOutboundPayloads, - log: logRecovery, - cfg: cfgAtStart, - }); - })().catch((err) => log.error(`Delivery recovery failed: ${String(err)}`)); - } - - const execApprovalManager = new ExecApprovalManager(); - const execApprovalForwarder = createExecApprovalForwarder(); - const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, { - forwarder: execApprovalForwarder, - }); - const secretsHandlers = createSecretsHandlers({ - reloadSecrets: async () => { - const active = getActiveSecretsRuntimeSnapshot(); - if (!active) { - throw new Error("Secrets runtime snapshot is not active."); - } - const prepared = await activateRuntimeSecrets(active.sourceConfig, { - reason: "reload", - activate: true, - }); - return { warningCount: prepared.warnings.length }; - }, - resolveSecrets: async ({ commandName, targetIds }) => { - const { assignments, diagnostics, inactiveRefPaths } = - resolveCommandSecretsFromActiveRuntimeSnapshot({ - commandName, - targetIds: new Set(targetIds), + const sessionEventConnIds = sessionEventSubscribers.getAll(); + if (sessionEventConnIds.size > 0) { + broadcastToConnIds( + "sessions.changed", + { + sessionKey, + phase: "message", + ts: Date.now(), + ...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}), + ...(typeof messageSeq === "number" ? 
{ messageSeq } : {}), + ...sessionSnapshot, + }, + sessionEventConnIds, + { dropIfSlow: true }, + ); + } }); - if (assignments.length === 0) { - return { assignments: [] as CommandSecretAssignment[], diagnostics, inactiveRefPaths }; - } - return { assignments, diagnostics, inactiveRefPaths }; - }, - }); - const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port; + lifecycleUnsub = minimalTestGateway + ? null + : onSessionLifecycleEvent((event) => { + const connIds = sessionEventSubscribers.getAll(); + if (connIds.size === 0) { + return; + } + const sessionRow = loadGatewaySessionRow(event.sessionKey); + broadcastToConnIds( + "sessions.changed", + { + sessionKey: event.sessionKey, + reason: event.reason, + parentSessionKey: event.parentSessionKey, + label: event.label, + displayName: event.displayName, + ts: Date.now(), + ...(sessionRow + ? { + updatedAt: sessionRow.updatedAt ?? undefined, + sessionId: sessionRow.sessionId, + kind: sessionRow.kind, + channel: sessionRow.channel, + label: event.label ?? sessionRow.label, + displayName: event.displayName ?? sessionRow.displayName, + deliveryContext: sessionRow.deliveryContext, + parentSessionKey: event.parentSessionKey ?? 
sessionRow.parentSessionKey, + childSessions: sessionRow.childSessions, + thinkingLevel: sessionRow.thinkingLevel, + systemSent: sessionRow.systemSent, + abortedLastRun: sessionRow.abortedLastRun, + lastChannel: sessionRow.lastChannel, + lastTo: sessionRow.lastTo, + lastAccountId: sessionRow.lastAccountId, + totalTokens: sessionRow.totalTokens, + totalTokensFresh: sessionRow.totalTokensFresh, + contextTokens: sessionRow.contextTokens, + estimatedCostUsd: sessionRow.estimatedCostUsd, + modelProvider: sessionRow.modelProvider, + model: sessionRow.model, + status: sessionRow.status, + startedAt: sessionRow.startedAt, + endedAt: sessionRow.endedAt, + runtimeMs: sessionRow.runtimeMs, + } + : {}), + }, + connIds, + { dropIfSlow: true }, + ); + }); - const gatewayRequestContext: import("./server-methods/types.js").GatewayRequestContext = { - deps, - cron, - cronStorePath, - execApprovalManager, - loadGatewayModelCatalog, - getHealthCache, - refreshHealthSnapshot: refreshGatewayHealthSnapshot, - logHealth, - logGateway: log, - incrementPresenceVersion, - getHealthVersion, - broadcast, - broadcastToConnIds, - nodeSendToSession, - nodeSendToAllSubscribed, - nodeSubscribe, - nodeUnsubscribe, - nodeUnsubscribeAll, - hasConnectedMobileNode: hasMobileNodeConnected, - hasExecApprovalClients: () => { - for (const gatewayClient of clients) { - const scopes = Array.isArray(gatewayClient.connect.scopes) - ? 
gatewayClient.connect.scopes - : []; - if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) { - return true; - } - } - return false; - }, - nodeRegistry, - agentRunSeq, - chatAbortControllers, - chatAbortedRuns: chatRunState.abortedRuns, - chatRunBuffers: chatRunState.buffers, - chatDeltaSentAt: chatRunState.deltaSentAt, - chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen, - addChatRun, - removeChatRun, - subscribeSessionEvents: sessionEventSubscribers.subscribe, - unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe, - subscribeSessionMessageEvents: sessionMessageSubscribers.subscribe, - unsubscribeSessionMessageEvents: sessionMessageSubscribers.unsubscribe, - unsubscribeAllSessionEvents: (connId: string) => { - sessionEventSubscribers.unsubscribe(connId); - sessionMessageSubscribers.unsubscribeAll(connId); - }, - getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll, - registerToolEventRecipient: toolEventRecipients.add, - dedupe, - wizardSessions, - findRunningWizard, - purgeWizardSession, - getRuntimeSnapshot, - startChannel, - stopChannel, - markChannelLoggedOut, - wizardRunner, - broadcastVoiceWakeChanged, - }; - - // Register a lazy fallback for plugin subagent dispatch in non-WS paths - // (Telegram polling, WhatsApp, etc.) so later runtime swaps can expose the - // current gateway context without relying on a startup snapshot. - setFallbackGatewayContextResolver(() => gatewayRequestContext); - - attachGatewayWsHandlers({ - wss, - clients, - port, - gatewayHost: bindHost ?? 
undefined, - canvasHostEnabled: Boolean(canvasHost), - canvasHostServerPort, - resolvedAuth, - rateLimiter: authRateLimiter, - browserRateLimiter: browserAuthRateLimiter, - gatewayMethods, - events: GATEWAY_EVENTS, - logGateway: log, - logHealth, - logWsControl, - extraHandlers: { - ...pluginRegistry.gatewayHandlers, - ...execApprovalHandlers, - ...secretsHandlers, - }, - broadcast, - context: gatewayRequestContext, - }); - logGatewayStartup({ - cfg: cfgAtStart, - bindHost, - bindHosts: httpBindHosts, - port, - tlsEnabled: gatewayTls.enabled, - log, - isNixMode, - }); - const stopGatewayUpdateCheck = minimalTestGateway - ? () => {} - : scheduleGatewayUpdateCheck({ - cfg: cfgAtStart, - log, - isNixMode, - onUpdateAvailableChange: (updateAvailable) => { - const payload: GatewayUpdateAvailableEventPayload = { updateAvailable }; - broadcast(GATEWAY_EVENT_UPDATE_AVAILABLE, payload, { dropIfSlow: true }); - }, - }); - const tailscaleCleanup = minimalTestGateway - ? null - : await startGatewayTailscaleExposure({ - tailscaleMode, - resetOnExit: tailscaleConfig.resetOnExit, - port, - controlUiBasePath, - logTailscale, - }); - - let browserControl: Awaited> = null; - if (!minimalTestGateway) { - if (deferredConfiguredChannelPluginIds.length > 0) { - ({ pluginRegistry } = loadGatewayPlugins({ - cfg: cfgAtStart, - workspaceDir: defaultWorkspaceDir, - log, - coreGatewayHandlers, - baseMethods, - logDiagnostics: false, - })); - // Re-pin: the deferred reload replaces setup-entry channel objects with - // full runtime implementations. Update the pinned channel registry so - // getChannelPlugin() resolves against the complete set. 
- pinActivePluginChannelRegistry(pluginRegistry); + if (!minimalTestGateway) { + heartbeatRunner = startHeartbeatRunner({ cfg: cfgAtStart }); } - ({ browserControl, pluginServices } = await startGatewaySidecars({ - cfg: cfgAtStart, - pluginRegistry, - defaultWorkspaceDir, - deps, - startChannels, - log, - logHooks, - logChannels, - logBrowser, - })); - } - // Run gateway_start plugin hook (fire-and-forget) - if (!minimalTestGateway) { - const hookRunner = getGlobalHookRunner(); - if (hookRunner?.hasHooks("gateway_start")) { - void hookRunner.runGatewayStart({ port }, { port }).catch((err) => { - log.warn(`gateway_start hook failed: ${String(err)}`); - }); - } - } - - const configReloader = minimalTestGateway - ? { stop: async () => {} } - : (() => { - const { applyHotReload, requestGatewayRestart } = createGatewayReloadHandlers({ - deps, - broadcast, - getState: () => ({ - hooksConfig, - hookClientIpConfig, - heartbeatRunner, - cronState, - browserControl, - channelHealthMonitor, + const healthCheckMinutes = cfgAtStart.gateway?.channelHealthCheckMinutes; + const healthCheckDisabled = healthCheckMinutes === 0; + const staleEventThresholdMinutes = cfgAtStart.gateway?.channelStaleEventThresholdMinutes; + const maxRestartsPerHour = cfgAtStart.gateway?.channelMaxRestartsPerHour; + channelHealthMonitor = healthCheckDisabled + ? null + : startChannelHealthMonitor({ + channelManager, + checkIntervalMs: (healthCheckMinutes ?? 
5) * 60_000, + ...(staleEventThresholdMinutes != null && { + staleEventThresholdMs: staleEventThresholdMinutes * 60_000, }), - setState: (nextState) => { - hooksConfig = nextState.hooksConfig; - hookClientIpConfig = nextState.hookClientIpConfig; - heartbeatRunner = nextState.heartbeatRunner; - cronState = nextState.cronState; - cron = cronState.cron; - cronStorePath = cronState.storePath; - browserControl = nextState.browserControl; - channelHealthMonitor = nextState.channelHealthMonitor; - }, - startChannel, - stopChannel, - logHooks, - logBrowser, - logChannels, - logCron, - logReload, - createHealthMonitor: (opts: { - checkIntervalMs: number; - staleEventThresholdMs?: number; - maxRestartsPerHour?: number; - }) => - startChannelHealthMonitor({ - channelManager, - checkIntervalMs: opts.checkIntervalMs, - ...(opts.staleEventThresholdMs != null && { - staleEventThresholdMs: opts.staleEventThresholdMs, - }), - ...(opts.maxRestartsPerHour != null && { - maxRestartsPerHour: opts.maxRestartsPerHour, - }), - }), + ...(maxRestartsPerHour != null && { maxRestartsPerHour }), }); - return startGatewayConfigReloader({ - initialConfig: cfgAtStart, - readSnapshot: readConfigFileSnapshot, - onHotReload: async (plan, nextConfig) => { - const previousSnapshot = getActiveSecretsRuntimeSnapshot(); - const prepared = await activateRuntimeSecrets(nextConfig, { - reason: "reload", - activate: true, - }); - try { - await applyHotReload(plan, prepared.config); - } catch (err) { - if (previousSnapshot) { - activateSecretsRuntimeSnapshot(previousSnapshot); - } else { - clearSecretsRuntimeSnapshot(); - } - throw err; - } - }, - onRestart: async (plan, nextConfig) => { - await activateRuntimeSecrets(nextConfig, { reason: "restart-check", activate: false }); - requestGatewayRestart(plan, nextConfig); - }, - log: { - info: (msg) => logReload.info(msg), - warn: (msg) => logReload.warn(msg), - error: (msg) => logReload.error(msg), - }, - watchPath: configSnapshot.path, + if 
(!minimalTestGateway) { + void cron.start().catch((err) => logCron.error(`failed to start: ${String(err)}`)); + } + + stopModelPricingRefresh = + !minimalTestGateway && process.env.VITEST !== "1" + ? startGatewayModelPricingRefresh({ config: cfgAtStart }) + : () => {}; + + // Recover pending outbound deliveries from previous crash/restart. + if (!minimalTestGateway) { + void (async () => { + const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js"); + const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js"); + const logRecovery = log.child("delivery-recovery"); + await recoverPendingDeliveries({ + deliver: deliverOutboundPayloads, + log: logRecovery, + cfg: cfgAtStart, }); - })(); + })().catch((err) => log.error(`Delivery recovery failed: ${String(err)}`)); + } + + const execApprovalManager = new ExecApprovalManager(); + const execApprovalForwarder = createExecApprovalForwarder(); + const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, { + forwarder: execApprovalForwarder, + }); + const secretsHandlers = createSecretsHandlers({ + reloadSecrets: async () => { + const active = getActiveSecretsRuntimeSnapshot(); + if (!active) { + throw new Error("Secrets runtime snapshot is not active."); + } + const prepared = await activateRuntimeSecrets(active.sourceConfig, { + reason: "reload", + activate: true, + }); + return { warningCount: prepared.warnings.length }; + }, + resolveSecrets: async ({ commandName, targetIds }) => { + const { assignments, diagnostics, inactiveRefPaths } = + resolveCommandSecretsFromActiveRuntimeSnapshot({ + commandName, + targetIds: new Set(targetIds), + }); + if (assignments.length === 0) { + return { assignments: [] as CommandSecretAssignment[], diagnostics, inactiveRefPaths }; + } + return { assignments, diagnostics, inactiveRefPaths }; + }, + }); + + const canvasHostServerPort = (canvasHostServer as CanvasHostServer | null)?.port; + + const gatewayRequestContext: 
import("./server-methods/types.js").GatewayRequestContext = { + deps, + cron, + cronStorePath, + execApprovalManager, + loadGatewayModelCatalog, + getHealthCache, + refreshHealthSnapshot: refreshGatewayHealthSnapshot, + logHealth, + logGateway: log, + incrementPresenceVersion, + getHealthVersion, + broadcast, + broadcastToConnIds, + nodeSendToSession, + nodeSendToAllSubscribed, + nodeSubscribe, + nodeUnsubscribe, + nodeUnsubscribeAll, + hasConnectedMobileNode: hasMobileNodeConnected, + hasExecApprovalClients: () => { + for (const gatewayClient of clients) { + const scopes = Array.isArray(gatewayClient.connect.scopes) + ? gatewayClient.connect.scopes + : []; + if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) { + return true; + } + } + return false; + }, + nodeRegistry, + agentRunSeq, + chatAbortControllers, + chatAbortedRuns: chatRunState.abortedRuns, + chatRunBuffers: chatRunState.buffers, + chatDeltaSentAt: chatRunState.deltaSentAt, + chatDeltaLastBroadcastLen: chatRunState.deltaLastBroadcastLen, + addChatRun, + removeChatRun, + subscribeSessionEvents: sessionEventSubscribers.subscribe, + unsubscribeSessionEvents: sessionEventSubscribers.unsubscribe, + subscribeSessionMessageEvents: sessionMessageSubscribers.subscribe, + unsubscribeSessionMessageEvents: sessionMessageSubscribers.unsubscribe, + unsubscribeAllSessionEvents: (connId: string) => { + sessionEventSubscribers.unsubscribe(connId); + sessionMessageSubscribers.unsubscribeAll(connId); + }, + getSessionEventSubscriberConnIds: sessionEventSubscribers.getAll, + registerToolEventRecipient: toolEventRecipients.add, + dedupe, + wizardSessions, + findRunningWizard, + purgeWizardSession, + getRuntimeSnapshot, + startChannel, + stopChannel, + markChannelLoggedOut, + wizardRunner, + broadcastVoiceWakeChanged, + }; + + // Register a lazy fallback for plugin subagent dispatch in non-WS paths + // (Telegram polling, WhatsApp, etc.) 
so later runtime swaps can expose the + // current gateway context without relying on a startup snapshot. + setFallbackGatewayContextResolver(() => gatewayRequestContext); + + attachGatewayWsHandlers({ + wss, + clients, + port, + gatewayHost: bindHost ?? undefined, + canvasHostEnabled: Boolean(canvasHost), + canvasHostServerPort, + resolvedAuth, + rateLimiter: authRateLimiter, + browserRateLimiter: browserAuthRateLimiter, + gatewayMethods, + events: GATEWAY_EVENTS, + logGateway: log, + logHealth, + logWsControl, + extraHandlers: { + ...pluginRegistry.gatewayHandlers, + ...execApprovalHandlers, + ...secretsHandlers, + }, + broadcast, + context: gatewayRequestContext, + }); + logGatewayStartup({ + cfg: cfgAtStart, + bindHost, + bindHosts: httpBindHosts, + port, + tlsEnabled: gatewayTls.enabled, + log, + isNixMode, + }); + stopGatewayUpdateCheck = minimalTestGateway + ? () => {} + : scheduleGatewayUpdateCheck({ + cfg: cfgAtStart, + log, + isNixMode, + onUpdateAvailableChange: (updateAvailable) => { + const payload: GatewayUpdateAvailableEventPayload = { updateAvailable }; + broadcast(GATEWAY_EVENT_UPDATE_AVAILABLE, payload, { dropIfSlow: true }); + }, + }); + tailscaleCleanup = minimalTestGateway + ? null + : await startGatewayTailscaleExposure({ + tailscaleMode, + resetOnExit: tailscaleConfig.resetOnExit, + port, + controlUiBasePath, + logTailscale, + }); + + if (!minimalTestGateway) { + if (deferredConfiguredChannelPluginIds.length > 0) { + ({ pluginRegistry } = loadGatewayPlugins({ + cfg: cfgAtStart, + workspaceDir: defaultWorkspaceDir, + log, + coreGatewayHandlers, + baseMethods, + logDiagnostics: false, + })); + // Re-pin: the deferred reload replaces setup-entry channel objects with + // full runtime implementations. Update the pinned channel registry so + // getChannelPlugin() resolves against the complete set. 
+ pinActivePluginChannelRegistry(pluginRegistry); + } + ({ browserControl, pluginServices } = await startGatewaySidecars({ + cfg: cfgAtStart, + pluginRegistry, + defaultWorkspaceDir, + deps, + startChannels, + log, + logHooks, + logChannels, + logBrowser, + })); + } + + // Run gateway_start plugin hook (fire-and-forget) + if (!minimalTestGateway) { + const hookRunner = getGlobalHookRunner(); + if (hookRunner?.hasHooks("gateway_start")) { + void hookRunner.runGatewayStart({ port }, { port }).catch((err) => { + log.warn(`gateway_start hook failed: ${String(err)}`); + }); + } + } + + configReloader = minimalTestGateway + ? { stop: async () => {} } + : (() => { + const { applyHotReload, requestGatewayRestart } = createGatewayReloadHandlers({ + deps, + broadcast, + getState: () => ({ + hooksConfig, + hookClientIpConfig, + heartbeatRunner, + cronState, + browserControl, + channelHealthMonitor, + }), + setState: (nextState) => { + hooksConfig = nextState.hooksConfig; + hookClientIpConfig = nextState.hookClientIpConfig; + heartbeatRunner = nextState.heartbeatRunner; + cronState = nextState.cronState; + cron = cronState.cron; + cronStorePath = cronState.storePath; + browserControl = nextState.browserControl; + channelHealthMonitor = nextState.channelHealthMonitor; + }, + startChannel, + stopChannel, + logHooks, + logBrowser, + logChannels, + logCron, + logReload, + createHealthMonitor: (opts: { + checkIntervalMs: number; + staleEventThresholdMs?: number; + maxRestartsPerHour?: number; + }) => + startChannelHealthMonitor({ + channelManager, + checkIntervalMs: opts.checkIntervalMs, + ...(opts.staleEventThresholdMs != null && { + staleEventThresholdMs: opts.staleEventThresholdMs, + }), + ...(opts.maxRestartsPerHour != null && { + maxRestartsPerHour: opts.maxRestartsPerHour, + }), + }), + }); + + return startGatewayConfigReloader({ + initialConfig: cfgAtStart, + readSnapshot: readConfigFileSnapshot, + onHotReload: async (plan, nextConfig) => { + const previousSnapshot = 
getActiveSecretsRuntimeSnapshot(); + const prepared = await activateRuntimeSecrets(nextConfig, { + reason: "reload", + activate: true, + }); + try { + await applyHotReload(plan, prepared.config); + } catch (err) { + if (previousSnapshot) { + activateSecretsRuntimeSnapshot(previousSnapshot); + } else { + clearSecretsRuntimeSnapshot(); + } + throw err; + } + }, + onRestart: async (plan, nextConfig) => { + await activateRuntimeSecrets(nextConfig, { + reason: "restart-check", + activate: false, + }); + requestGatewayRestart(plan, nextConfig); + }, + log: { + info: (msg) => logReload.info(msg), + warn: (msg) => logReload.warn(msg), + error: (msg) => logReload.error(msg), + }, + watchPath: configSnapshot.path, + }); + })(); + } catch (err) { + await closeOnStartupFailure(); + throw err; + } const close = createGatewayCloseHandler({ bonjourStop,