mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-18 20:51:10 +00:00
refactor(gateway): split startup and runtime seams (#63975)
Merged via squash.
Prepared head SHA: c6e47efa12
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Co-authored-by: gumadeiras <5599352+gumadeiras@users.noreply.github.com>
Reviewed-by: @gumadeiras
This commit is contained in:
committed by
GitHub
parent
33ad806a14
commit
8de63ca268
@@ -10,6 +10,7 @@ Docs: https://docs.openclaw.ai
|
||||
- macOS/Talk: add an experimental local MLX speech provider for Talk Mode, with explicit provider selection, local utterance playback, interruption handling, and system-voice fallback. (#63539) Thanks @ImLukeF.
|
||||
- Docs i18n: chunk raw doc translation, reject truncated tagged outputs, avoid ambiguous body-only wrapper unwrapping, and recover from terminated Pi translation sessions without changing the default `openai/gpt-5.4` path. (#62969, #63808) Thanks @hxy91819.
|
||||
- QA/testing: add a `--runner multipass` lane for `openclaw qa suite` so repo-backed QA scenarios can run inside a disposable Linux VM and write back the usual report, summary, and VM logs. (#63426) Thanks @shakkernerd.
|
||||
- Gateway: split startup and runtime seams so gateway lifecycle sequencing, reload state, and shutdown behavior stay easier to maintain without changing observed behavior. (#63975) Thanks @gumadeiras.
|
||||
|
||||
### Fixes
|
||||
|
||||
|
||||
94
src/gateway/server-aux-handlers.ts
Normal file
94
src/gateway/server-aux-handlers.ts
Normal file
@@ -0,0 +1,94 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { createExecApprovalForwarder } from "../infra/exec-approval-forwarder.js";
|
||||
import { type PluginApprovalRequestPayload } from "../infra/plugin-approvals.js";
|
||||
import {
|
||||
resolveCommandSecretsFromActiveRuntimeSnapshot,
|
||||
type CommandSecretAssignment,
|
||||
} from "../secrets/runtime-command-secrets.js";
|
||||
import { getActiveSecretsRuntimeSnapshot } from "../secrets/runtime.js";
|
||||
import { createExecApprovalIosPushDelivery } from "./exec-approval-ios-push.js";
|
||||
import { ExecApprovalManager } from "./exec-approval-manager.js";
|
||||
import { createExecApprovalHandlers } from "./server-methods/exec-approval.js";
|
||||
import { createPluginApprovalHandlers } from "./server-methods/plugin-approval.js";
|
||||
import { createSecretsHandlers } from "./server-methods/secrets.js";
|
||||
import {
|
||||
disconnectStaleSharedGatewayAuthClients,
|
||||
setCurrentSharedGatewaySessionGeneration,
|
||||
type SharedGatewayAuthClient,
|
||||
type SharedGatewaySessionGenerationState,
|
||||
} from "./server-shared-auth-generation.js";
|
||||
import type { ActivateRuntimeSecrets } from "./server-startup-config.js";
|
||||
|
||||
type GatewayAuxHandlerLogger = {
|
||||
warn?: (message: string) => void;
|
||||
error?: (message: string) => void;
|
||||
debug?: (message: string) => void;
|
||||
};
|
||||
|
||||
export function createGatewayAuxHandlers(params: {
|
||||
log: GatewayAuxHandlerLogger;
|
||||
activateRuntimeSecrets: ActivateRuntimeSecrets;
|
||||
sharedGatewaySessionGenerationState: SharedGatewaySessionGenerationState;
|
||||
resolveSharedGatewaySessionGenerationForConfig: (config: OpenClawConfig) => string | undefined;
|
||||
clients: Iterable<SharedGatewayAuthClient>;
|
||||
}) {
|
||||
const execApprovalManager = new ExecApprovalManager();
|
||||
const execApprovalForwarder = createExecApprovalForwarder();
|
||||
const execApprovalIosPushDelivery = createExecApprovalIosPushDelivery({ log: params.log });
|
||||
const execApprovalHandlers = createExecApprovalHandlers(execApprovalManager, {
|
||||
forwarder: execApprovalForwarder,
|
||||
iosPushDelivery: execApprovalIosPushDelivery,
|
||||
});
|
||||
const pluginApprovalManager = new ExecApprovalManager<PluginApprovalRequestPayload>();
|
||||
const pluginApprovalHandlers = createPluginApprovalHandlers(pluginApprovalManager, {
|
||||
forwarder: execApprovalForwarder,
|
||||
});
|
||||
const secretsHandlers = createSecretsHandlers({
|
||||
reloadSecrets: async () => {
|
||||
const active = getActiveSecretsRuntimeSnapshot();
|
||||
if (!active) {
|
||||
throw new Error("Secrets runtime snapshot is not active.");
|
||||
}
|
||||
const previousSharedGatewaySessionGeneration =
|
||||
params.sharedGatewaySessionGenerationState.current;
|
||||
const prepared = await params.activateRuntimeSecrets(active.sourceConfig, {
|
||||
reason: "reload",
|
||||
activate: true,
|
||||
});
|
||||
const nextSharedGatewaySessionGeneration =
|
||||
params.resolveSharedGatewaySessionGenerationForConfig(prepared.config);
|
||||
setCurrentSharedGatewaySessionGeneration(
|
||||
params.sharedGatewaySessionGenerationState,
|
||||
nextSharedGatewaySessionGeneration,
|
||||
);
|
||||
if (previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration) {
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
}
|
||||
return { warningCount: prepared.warnings.length };
|
||||
},
|
||||
resolveSecrets: async ({ commandName, targetIds }) => {
|
||||
const { assignments, diagnostics, inactiveRefPaths } =
|
||||
resolveCommandSecretsFromActiveRuntimeSnapshot({
|
||||
commandName,
|
||||
targetIds: new Set(targetIds),
|
||||
});
|
||||
if (assignments.length === 0) {
|
||||
return { assignments: [] as CommandSecretAssignment[], diagnostics, inactiveRefPaths };
|
||||
}
|
||||
return { assignments, diagnostics, inactiveRefPaths };
|
||||
},
|
||||
});
|
||||
|
||||
return {
|
||||
execApprovalManager,
|
||||
pluginApprovalManager,
|
||||
extraHandlers: {
|
||||
...execApprovalHandlers,
|
||||
...pluginApprovalHandlers,
|
||||
...secretsHandlers,
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -12,6 +12,28 @@ const shutdownLog = createSubsystemLogger("gateway/shutdown");
|
||||
const WEBSOCKET_CLOSE_GRACE_MS = 1_000;
|
||||
const WEBSOCKET_CLOSE_FORCE_CONTINUE_MS = 250;
|
||||
|
||||
export async function runGatewayClosePrelude(params: {
|
||||
stopDiagnostics?: () => void;
|
||||
clearSkillsRefreshTimer?: () => void;
|
||||
skillsChangeUnsub?: () => void;
|
||||
disposeAuthRateLimiter?: () => void;
|
||||
disposeBrowserAuthRateLimiter: () => void;
|
||||
stopModelPricingRefresh?: () => void;
|
||||
stopChannelHealthMonitor?: () => void;
|
||||
clearSecretsRuntimeSnapshot?: () => void;
|
||||
closeMcpServer?: () => Promise<void>;
|
||||
}): Promise<void> {
|
||||
params.stopDiagnostics?.();
|
||||
params.clearSkillsRefreshTimer?.();
|
||||
params.skillsChangeUnsub?.();
|
||||
params.disposeAuthRateLimiter?.();
|
||||
params.disposeBrowserAuthRateLimiter();
|
||||
params.stopModelPricingRefresh?.();
|
||||
params.stopChannelHealthMonitor?.();
|
||||
params.clearSecretsRuntimeSnapshot?.();
|
||||
await params.closeMcpServer?.().catch(() => {});
|
||||
}
|
||||
|
||||
export function createGatewayCloseHandler(params: {
|
||||
bonjourStop: (() => Promise<void>) | null;
|
||||
tailscaleCleanup: (() => Promise<void>) | null;
|
||||
|
||||
62
src/gateway/server-control-ui-root.ts
Normal file
62
src/gateway/server-control-ui-root.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import path from "node:path";
|
||||
import {
|
||||
ensureControlUiAssetsBuilt,
|
||||
isPackageProvenControlUiRootSync,
|
||||
resolveControlUiRootOverrideSync,
|
||||
resolveControlUiRootSync,
|
||||
} from "../infra/control-ui-assets.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { ControlUiRootState } from "./control-ui.js";
|
||||
|
||||
export async function resolveGatewayControlUiRootState(params: {
|
||||
controlUiRootOverride?: string;
|
||||
controlUiEnabled: boolean;
|
||||
gatewayRuntime: RuntimeEnv;
|
||||
log: { warn: (message: string) => void };
|
||||
}): Promise<ControlUiRootState | undefined> {
|
||||
if (params.controlUiRootOverride) {
|
||||
const resolvedOverride = resolveControlUiRootOverrideSync(params.controlUiRootOverride);
|
||||
const resolvedOverridePath = path.resolve(params.controlUiRootOverride);
|
||||
if (!resolvedOverride) {
|
||||
params.log.warn(`gateway: controlUi.root not found at ${resolvedOverridePath}`);
|
||||
}
|
||||
return resolvedOverride
|
||||
? { kind: "resolved", path: resolvedOverride }
|
||||
: { kind: "invalid", path: resolvedOverridePath };
|
||||
}
|
||||
|
||||
if (!params.controlUiEnabled) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const resolveRoot = () =>
|
||||
resolveControlUiRootSync({
|
||||
moduleUrl: import.meta.url,
|
||||
argv1: process.argv[1],
|
||||
cwd: process.cwd(),
|
||||
});
|
||||
|
||||
let resolvedRoot = resolveRoot();
|
||||
if (!resolvedRoot) {
|
||||
const ensureResult = await ensureControlUiAssetsBuilt(params.gatewayRuntime);
|
||||
if (!ensureResult.ok && ensureResult.message) {
|
||||
params.log.warn(`gateway: ${ensureResult.message}`);
|
||||
}
|
||||
resolvedRoot = resolveRoot();
|
||||
}
|
||||
|
||||
if (!resolvedRoot) {
|
||||
return { kind: "missing" };
|
||||
}
|
||||
|
||||
return {
|
||||
kind: isPackageProvenControlUiRootSync(resolvedRoot, {
|
||||
moduleUrl: import.meta.url,
|
||||
argv1: process.argv[1],
|
||||
cwd: process.cwd(),
|
||||
})
|
||||
? "bundled"
|
||||
: "resolved",
|
||||
path: resolvedRoot,
|
||||
};
|
||||
}
|
||||
32
src/gateway/server-live-state.ts
Normal file
32
src/gateway/server-live-state.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
import type { PluginServicesHandle } from "../plugins/services.js";
|
||||
import type { HooksConfigResolved } from "./hooks.js";
|
||||
import type { GatewayCronState } from "./server-cron.js";
|
||||
import type { HookClientIpConfig } from "./server-http.js";
|
||||
import {
|
||||
createGatewayServerMutableState,
|
||||
type GatewayServerMutableState,
|
||||
} from "./server-runtime-handles.js";
|
||||
|
||||
export type GatewayServerLiveState = GatewayServerMutableState & {
|
||||
hooksConfig: HooksConfigResolved | null;
|
||||
hookClientIpConfig: HookClientIpConfig;
|
||||
cronState: GatewayCronState;
|
||||
pluginServices: PluginServicesHandle | null;
|
||||
gatewayMethods: string[];
|
||||
};
|
||||
|
||||
export function createGatewayServerLiveState(params: {
|
||||
hooksConfig: HooksConfigResolved | null;
|
||||
hookClientIpConfig: HookClientIpConfig;
|
||||
cronState: GatewayCronState;
|
||||
gatewayMethods: string[];
|
||||
}): GatewayServerLiveState {
|
||||
return {
|
||||
...createGatewayServerMutableState(),
|
||||
hooksConfig: params.hooksConfig,
|
||||
hookClientIpConfig: params.hookClientIpConfig,
|
||||
cronState: params.cronState,
|
||||
pluginServices: null,
|
||||
gatewayMethods: params.gatewayMethods,
|
||||
};
|
||||
}
|
||||
44
src/gateway/server-node-session-runtime.ts
Normal file
44
src/gateway/server-node-session-runtime.ts
Normal file
@@ -0,0 +1,44 @@
|
||||
import { NodeRegistry } from "./node-registry.js";
|
||||
import {
|
||||
createSessionEventSubscriberRegistry,
|
||||
createSessionMessageSubscriberRegistry,
|
||||
} from "./server-chat.js";
|
||||
import { safeParseJson } from "./server-methods/nodes.helpers.js";
|
||||
import { hasConnectedMobileNode } from "./server-mobile-nodes.js";
|
||||
import { createNodeSubscriptionManager } from "./server-node-subscriptions.js";
|
||||
|
||||
export function createGatewayNodeSessionRuntime(params: {
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
}) {
|
||||
const nodeRegistry = new NodeRegistry();
|
||||
const nodePresenceTimers = new Map<string, ReturnType<typeof setInterval>>();
|
||||
const nodeSubscriptions = createNodeSubscriptionManager();
|
||||
const sessionEventSubscribers = createSessionEventSubscriberRegistry();
|
||||
const sessionMessageSubscribers = createSessionMessageSubscriberRegistry();
|
||||
const nodeSendEvent = (opts: { nodeId: string; event: string; payloadJSON?: string | null }) => {
|
||||
const payload = safeParseJson(opts.payloadJSON ?? null);
|
||||
nodeRegistry.sendEvent(opts.nodeId, opts.event, payload);
|
||||
};
|
||||
const nodeSendToSession = (sessionKey: string, event: string, payload: unknown) =>
|
||||
nodeSubscriptions.sendToSession(sessionKey, event, payload, nodeSendEvent);
|
||||
const nodeSendToAllSubscribed = (event: string, payload: unknown) =>
|
||||
nodeSubscriptions.sendToAllSubscribed(event, payload, nodeSendEvent);
|
||||
const broadcastVoiceWakeChanged = (triggers: string[]) => {
|
||||
params.broadcast("voicewake.changed", { triggers }, { dropIfSlow: true });
|
||||
};
|
||||
const hasMobileNodeConnected = () => hasConnectedMobileNode(nodeRegistry);
|
||||
|
||||
return {
|
||||
nodeRegistry,
|
||||
nodePresenceTimers,
|
||||
sessionEventSubscribers,
|
||||
sessionMessageSubscribers,
|
||||
nodeSendToSession,
|
||||
nodeSendToAllSubscribed,
|
||||
nodeSubscribe: nodeSubscriptions.subscribe,
|
||||
nodeUnsubscribe: nodeSubscriptions.unsubscribe,
|
||||
nodeUnsubscribeAll: nodeSubscriptions.unsubscribeAll,
|
||||
broadcastVoiceWakeChanged,
|
||||
hasMobileNodeConnected,
|
||||
};
|
||||
}
|
||||
@@ -16,13 +16,30 @@ import {
|
||||
} from "../infra/restart.js";
|
||||
import { setCommandLaneConcurrency, getTotalQueueSize } from "../process/command-queue.js";
|
||||
import { CommandLane } from "../process/lanes.js";
|
||||
import {
|
||||
activateSecretsRuntimeSnapshot,
|
||||
clearSecretsRuntimeSnapshot,
|
||||
getActiveSecretsRuntimeSnapshot,
|
||||
} from "../secrets/runtime.js";
|
||||
import { getInspectableTaskRegistrySummary } from "../tasks/task-registry.maintenance.js";
|
||||
import type { ChannelHealthMonitor } from "./channel-health-monitor.js";
|
||||
import type { ChannelKind } from "./config-reload-plan.js";
|
||||
import type { GatewayReloadPlan } from "./config-reload.js";
|
||||
import { startGatewayConfigReloader, type GatewayReloadPlan } from "./config-reload.js";
|
||||
import { resolveHooksConfig } from "./hooks.js";
|
||||
import { buildGatewayCronService, type GatewayCronState } from "./server-cron.js";
|
||||
import type { HookClientIpConfig } from "./server-http.js";
|
||||
import {
|
||||
type GatewayChannelManager,
|
||||
startGatewayChannelHealthMonitor,
|
||||
startGatewayCronWithLogging,
|
||||
} from "./server-runtime-services.js";
|
||||
import {
|
||||
disconnectStaleSharedGatewayAuthClients,
|
||||
setCurrentSharedGatewaySessionGeneration,
|
||||
type SharedGatewayAuthClient,
|
||||
type SharedGatewaySessionGenerationState,
|
||||
} from "./server-shared-auth-generation.js";
|
||||
import type { ActivateRuntimeSecrets } from "./server-startup-config.js";
|
||||
import { resolveHookClientIpConfig } from "./server/hooks.js";
|
||||
|
||||
type GatewayHotReloadState = {
|
||||
@@ -48,11 +65,7 @@ export function createGatewayReloadHandlers(params: {
|
||||
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
|
||||
logCron: { error: (msg: string) => void };
|
||||
logReload: { info: (msg: string) => void; warn: (msg: string) => void };
|
||||
createHealthMonitor: (opts: {
|
||||
checkIntervalMs: number;
|
||||
staleEventThresholdMs?: number;
|
||||
maxRestartsPerHour?: number;
|
||||
}) => ChannelHealthMonitor;
|
||||
createHealthMonitor: (config: ReturnType<typeof loadConfig>) => ChannelHealthMonitor | null;
|
||||
}) {
|
||||
const applyHotReload = async (
|
||||
plan: GatewayReloadPlan,
|
||||
@@ -84,25 +97,15 @@ export function createGatewayReloadHandlers(params: {
|
||||
deps: params.deps,
|
||||
broadcast: params.broadcast,
|
||||
});
|
||||
void nextState.cronState.cron
|
||||
.start()
|
||||
.catch((err) => params.logCron.error(`failed to start: ${String(err)}`));
|
||||
startGatewayCronWithLogging({
|
||||
cron: nextState.cronState.cron,
|
||||
logCron: params.logCron,
|
||||
});
|
||||
}
|
||||
|
||||
if (plan.restartHealthMonitor) {
|
||||
state.channelHealthMonitor?.stop();
|
||||
const minutes = nextConfig.gateway?.channelHealthCheckMinutes;
|
||||
const staleMinutes = nextConfig.gateway?.channelStaleEventThresholdMinutes;
|
||||
nextState.channelHealthMonitor =
|
||||
minutes === 0
|
||||
? null
|
||||
: params.createHealthMonitor({
|
||||
checkIntervalMs: (minutes ?? 5) * 60_000,
|
||||
...(staleMinutes != null && { staleEventThresholdMs: staleMinutes * 60_000 }),
|
||||
...(nextConfig.gateway?.channelMaxRestartsPerHour != null && {
|
||||
maxRestartsPerHour: nextConfig.gateway.channelMaxRestartsPerHour,
|
||||
}),
|
||||
});
|
||||
nextState.channelHealthMonitor = params.createHealthMonitor(nextConfig);
|
||||
}
|
||||
|
||||
if (plan.restartGmailWatcher) {
|
||||
@@ -246,3 +249,158 @@ export function createGatewayReloadHandlers(params: {
|
||||
|
||||
return { applyHotReload, requestGatewayRestart };
|
||||
}
|
||||
|
||||
export function startManagedGatewayConfigReloader(params: {
|
||||
minimalTestGateway: boolean;
|
||||
initialConfig: ReturnType<typeof loadConfig>;
|
||||
initialInternalWriteHash: string | null;
|
||||
watchPath: string;
|
||||
readSnapshot: typeof import("../config/config.js").readConfigFileSnapshot;
|
||||
subscribeToWrites: typeof import("../config/config.js").registerConfigWriteListener;
|
||||
deps: CliDeps;
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
getState: () => GatewayHotReloadState;
|
||||
setState: (state: GatewayHotReloadState) => void;
|
||||
startChannel: (name: ChannelKind) => Promise<void>;
|
||||
stopChannel: (name: ChannelKind) => Promise<void>;
|
||||
logHooks: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
error: (msg: string) => void;
|
||||
};
|
||||
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
|
||||
logCron: { error: (msg: string) => void };
|
||||
logReload: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
error: (msg: string) => void;
|
||||
};
|
||||
channelManager: GatewayChannelManager;
|
||||
activateRuntimeSecrets: ActivateRuntimeSecrets;
|
||||
resolveSharedGatewaySessionGenerationForConfig: (
|
||||
config: ReturnType<typeof loadConfig>,
|
||||
) => string | undefined;
|
||||
sharedGatewaySessionGenerationState: SharedGatewaySessionGenerationState;
|
||||
clients: Iterable<SharedGatewayAuthClient>;
|
||||
}) {
|
||||
if (params.minimalTestGateway) {
|
||||
return { stop: async () => {} };
|
||||
}
|
||||
|
||||
const { applyHotReload, requestGatewayRestart } = createGatewayReloadHandlers({
|
||||
deps: params.deps,
|
||||
broadcast: params.broadcast,
|
||||
getState: params.getState,
|
||||
setState: params.setState,
|
||||
startChannel: params.startChannel,
|
||||
stopChannel: params.stopChannel,
|
||||
logHooks: params.logHooks,
|
||||
logChannels: params.logChannels,
|
||||
logCron: params.logCron,
|
||||
logReload: params.logReload,
|
||||
createHealthMonitor: (config) =>
|
||||
startGatewayChannelHealthMonitor({
|
||||
cfg: config,
|
||||
channelManager: params.channelManager,
|
||||
}),
|
||||
});
|
||||
|
||||
return startGatewayConfigReloader({
|
||||
initialConfig: params.initialConfig,
|
||||
initialInternalWriteHash: params.initialInternalWriteHash,
|
||||
readSnapshot: params.readSnapshot,
|
||||
subscribeToWrites: params.subscribeToWrites,
|
||||
onHotReload: async (plan, nextConfig) => {
|
||||
const previousSharedGatewaySessionGeneration =
|
||||
params.sharedGatewaySessionGenerationState.current;
|
||||
const previousSnapshot = getActiveSecretsRuntimeSnapshot();
|
||||
const prepared = await params.activateRuntimeSecrets(nextConfig, {
|
||||
reason: "reload",
|
||||
activate: true,
|
||||
});
|
||||
const nextSharedGatewaySessionGeneration =
|
||||
params.resolveSharedGatewaySessionGenerationForConfig(prepared.config);
|
||||
params.sharedGatewaySessionGenerationState.current = nextSharedGatewaySessionGeneration;
|
||||
const sharedGatewaySessionGenerationChanged =
|
||||
previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration;
|
||||
if (sharedGatewaySessionGenerationChanged) {
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
}
|
||||
try {
|
||||
await applyHotReload(plan, prepared.config);
|
||||
} catch (err) {
|
||||
if (previousSnapshot) {
|
||||
activateSecretsRuntimeSnapshot(previousSnapshot);
|
||||
} else {
|
||||
clearSecretsRuntimeSnapshot();
|
||||
}
|
||||
params.sharedGatewaySessionGenerationState.current = previousSharedGatewaySessionGeneration;
|
||||
if (sharedGatewaySessionGenerationChanged) {
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: previousSharedGatewaySessionGeneration,
|
||||
});
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
setCurrentSharedGatewaySessionGeneration(
|
||||
params.sharedGatewaySessionGenerationState,
|
||||
nextSharedGatewaySessionGeneration,
|
||||
);
|
||||
},
|
||||
onRestart: async (plan, nextConfig) => {
|
||||
const previousRequiredSharedGatewaySessionGeneration =
|
||||
params.sharedGatewaySessionGenerationState.required;
|
||||
const previousSharedGatewaySessionGeneration =
|
||||
params.sharedGatewaySessionGenerationState.current;
|
||||
try {
|
||||
const prepared = await params.activateRuntimeSecrets(nextConfig, {
|
||||
reason: "restart-check",
|
||||
activate: false,
|
||||
});
|
||||
const nextSharedGatewaySessionGeneration =
|
||||
params.resolveSharedGatewaySessionGenerationForConfig(prepared.config);
|
||||
const restartQueued = requestGatewayRestart(plan, nextConfig);
|
||||
if (!restartQueued) {
|
||||
if (previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration) {
|
||||
activateSecretsRuntimeSnapshot(prepared);
|
||||
setCurrentSharedGatewaySessionGeneration(
|
||||
params.sharedGatewaySessionGenerationState,
|
||||
nextSharedGatewaySessionGeneration,
|
||||
);
|
||||
params.sharedGatewaySessionGenerationState.required = null;
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
} else {
|
||||
params.sharedGatewaySessionGenerationState.required = null;
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (previousSharedGatewaySessionGeneration !== nextSharedGatewaySessionGeneration) {
|
||||
params.sharedGatewaySessionGenerationState.required = nextSharedGatewaySessionGeneration;
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
} else {
|
||||
params.sharedGatewaySessionGenerationState.required = null;
|
||||
}
|
||||
} catch (error) {
|
||||
params.sharedGatewaySessionGenerationState.required =
|
||||
previousRequiredSharedGatewaySessionGeneration;
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
log: {
|
||||
info: (msg) => params.logReload.info(msg),
|
||||
warn: (msg) => params.logReload.warn(msg),
|
||||
error: (msg) => params.logReload.error(msg),
|
||||
},
|
||||
watchPath: params.watchPath,
|
||||
});
|
||||
}
|
||||
|
||||
80
src/gateway/server-request-context.test.ts
Normal file
80
src/gateway/server-request-context.test.ts
Normal file
@@ -0,0 +1,80 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { GatewayServerLiveState } from "./server-live-state.js";
|
||||
import { createGatewayRequestContext } from "./server-request-context.js";
|
||||
|
||||
describe("createGatewayRequestContext", () => {
|
||||
it("reads cron state live from runtime state", () => {
|
||||
const cronA = { start: vi.fn(), stop: vi.fn() } as never;
|
||||
const cronB = { start: vi.fn(), stop: vi.fn() } as never;
|
||||
const runtimeState: Pick<GatewayServerLiveState, "cronState"> = {
|
||||
cronState: {
|
||||
cron: cronA,
|
||||
storePath: "/tmp/cron-a",
|
||||
cronEnabled: true,
|
||||
},
|
||||
};
|
||||
|
||||
const context = createGatewayRequestContext({
|
||||
deps: {} as never,
|
||||
runtimeState,
|
||||
execApprovalManager: undefined,
|
||||
pluginApprovalManager: undefined,
|
||||
loadGatewayModelCatalog: vi.fn(async () => []),
|
||||
getHealthCache: vi.fn(() => null),
|
||||
refreshHealthSnapshot: vi.fn(async () => ({}) as never),
|
||||
logHealth: { error: vi.fn() },
|
||||
logGateway: { warn: vi.fn(), info: vi.fn(), error: vi.fn() } as never,
|
||||
incrementPresenceVersion: vi.fn(() => 1),
|
||||
getHealthVersion: vi.fn(() => 1),
|
||||
broadcast: vi.fn(),
|
||||
broadcastToConnIds: vi.fn(),
|
||||
nodeSendToSession: vi.fn(),
|
||||
nodeSendToAllSubscribed: vi.fn(),
|
||||
nodeSubscribe: vi.fn(),
|
||||
nodeUnsubscribe: vi.fn(),
|
||||
nodeUnsubscribeAll: vi.fn(),
|
||||
hasConnectedMobileNode: vi.fn(() => false),
|
||||
clients: new Set(),
|
||||
enforceSharedGatewayAuthGenerationForConfigWrite: vi.fn(),
|
||||
nodeRegistry: {} as never,
|
||||
agentRunSeq: new Map(),
|
||||
chatAbortControllers: new Map(),
|
||||
chatAbortedRuns: new Map(),
|
||||
chatRunBuffers: new Map(),
|
||||
chatDeltaSentAt: new Map(),
|
||||
chatDeltaLastBroadcastLen: new Map(),
|
||||
addChatRun: vi.fn(),
|
||||
removeChatRun: vi.fn(),
|
||||
subscribeSessionEvents: vi.fn(),
|
||||
unsubscribeSessionEvents: vi.fn(),
|
||||
subscribeSessionMessageEvents: vi.fn(),
|
||||
unsubscribeSessionMessageEvents: vi.fn(),
|
||||
unsubscribeAllSessionEvents: vi.fn(),
|
||||
getSessionEventSubscriberConnIds: vi.fn(() => new Set<string>()),
|
||||
registerToolEventRecipient: vi.fn(),
|
||||
dedupe: new Map(),
|
||||
wizardSessions: new Map(),
|
||||
findRunningWizard: vi.fn(() => null),
|
||||
purgeWizardSession: vi.fn(),
|
||||
getRuntimeSnapshot: vi.fn(() => ({}) as never),
|
||||
startChannel: vi.fn(async () => undefined),
|
||||
stopChannel: vi.fn(async () => undefined),
|
||||
markChannelLoggedOut: vi.fn(),
|
||||
wizardRunner: vi.fn(async () => undefined),
|
||||
broadcastVoiceWakeChanged: vi.fn(),
|
||||
unavailableGatewayMethods: new Set(),
|
||||
});
|
||||
|
||||
expect(context.cron).toBe(cronA);
|
||||
expect(context.cronStorePath).toBe("/tmp/cron-a");
|
||||
|
||||
runtimeState.cronState = {
|
||||
cron: cronB,
|
||||
storePath: "/tmp/cron-b",
|
||||
cronEnabled: true,
|
||||
};
|
||||
|
||||
expect(context.cron).toBe(cronB);
|
||||
expect(context.cronStorePath).toBe("/tmp/cron-b");
|
||||
});
|
||||
});
|
||||
154
src/gateway/server-request-context.ts
Normal file
154
src/gateway/server-request-context.ts
Normal file
@@ -0,0 +1,154 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import type { GatewayServerLiveState } from "./server-live-state.js";
|
||||
import type { GatewayRequestContext, GatewayClient } from "./server-methods/types.js";
|
||||
import { disconnectAllSharedGatewayAuthClients } from "./server-shared-auth-generation.js";
|
||||
|
||||
type GatewayRequestContextClient = GatewayClient & {
|
||||
socket: { close: (code: number, reason: string) => void };
|
||||
usesSharedGatewayAuth?: boolean;
|
||||
};
|
||||
|
||||
export type GatewayRequestContextParams = {
|
||||
deps: GatewayRequestContext["deps"];
|
||||
runtimeState: Pick<GatewayServerLiveState, "cronState">;
|
||||
execApprovalManager: GatewayRequestContext["execApprovalManager"];
|
||||
pluginApprovalManager: GatewayRequestContext["pluginApprovalManager"];
|
||||
loadGatewayModelCatalog: GatewayRequestContext["loadGatewayModelCatalog"];
|
||||
getHealthCache: GatewayRequestContext["getHealthCache"];
|
||||
refreshHealthSnapshot: GatewayRequestContext["refreshHealthSnapshot"];
|
||||
logHealth: GatewayRequestContext["logHealth"];
|
||||
logGateway: GatewayRequestContext["logGateway"];
|
||||
incrementPresenceVersion: GatewayRequestContext["incrementPresenceVersion"];
|
||||
getHealthVersion: GatewayRequestContext["getHealthVersion"];
|
||||
broadcast: GatewayRequestContext["broadcast"];
|
||||
broadcastToConnIds: GatewayRequestContext["broadcastToConnIds"];
|
||||
nodeSendToSession: GatewayRequestContext["nodeSendToSession"];
|
||||
nodeSendToAllSubscribed: GatewayRequestContext["nodeSendToAllSubscribed"];
|
||||
nodeSubscribe: GatewayRequestContext["nodeSubscribe"];
|
||||
nodeUnsubscribe: GatewayRequestContext["nodeUnsubscribe"];
|
||||
nodeUnsubscribeAll: GatewayRequestContext["nodeUnsubscribeAll"];
|
||||
hasConnectedMobileNode: GatewayRequestContext["hasConnectedMobileNode"];
|
||||
clients: Set<GatewayRequestContextClient>;
|
||||
enforceSharedGatewayAuthGenerationForConfigWrite: (nextConfig: OpenClawConfig) => void;
|
||||
nodeRegistry: GatewayRequestContext["nodeRegistry"];
|
||||
agentRunSeq: GatewayRequestContext["agentRunSeq"];
|
||||
chatAbortControllers: GatewayRequestContext["chatAbortControllers"];
|
||||
chatAbortedRuns: GatewayRequestContext["chatAbortedRuns"];
|
||||
chatRunBuffers: GatewayRequestContext["chatRunBuffers"];
|
||||
chatDeltaSentAt: GatewayRequestContext["chatDeltaSentAt"];
|
||||
chatDeltaLastBroadcastLen: GatewayRequestContext["chatDeltaLastBroadcastLen"];
|
||||
addChatRun: GatewayRequestContext["addChatRun"];
|
||||
removeChatRun: GatewayRequestContext["removeChatRun"];
|
||||
subscribeSessionEvents: GatewayRequestContext["subscribeSessionEvents"];
|
||||
unsubscribeSessionEvents: GatewayRequestContext["unsubscribeSessionEvents"];
|
||||
subscribeSessionMessageEvents: GatewayRequestContext["subscribeSessionMessageEvents"];
|
||||
unsubscribeSessionMessageEvents: GatewayRequestContext["unsubscribeSessionMessageEvents"];
|
||||
unsubscribeAllSessionEvents: GatewayRequestContext["unsubscribeAllSessionEvents"];
|
||||
getSessionEventSubscriberConnIds: GatewayRequestContext["getSessionEventSubscriberConnIds"];
|
||||
registerToolEventRecipient: GatewayRequestContext["registerToolEventRecipient"];
|
||||
dedupe: GatewayRequestContext["dedupe"];
|
||||
wizardSessions: GatewayRequestContext["wizardSessions"];
|
||||
findRunningWizard: GatewayRequestContext["findRunningWizard"];
|
||||
purgeWizardSession: GatewayRequestContext["purgeWizardSession"];
|
||||
getRuntimeSnapshot: GatewayRequestContext["getRuntimeSnapshot"];
|
||||
startChannel: GatewayRequestContext["startChannel"];
|
||||
stopChannel: GatewayRequestContext["stopChannel"];
|
||||
markChannelLoggedOut: GatewayRequestContext["markChannelLoggedOut"];
|
||||
wizardRunner: GatewayRequestContext["wizardRunner"];
|
||||
broadcastVoiceWakeChanged: GatewayRequestContext["broadcastVoiceWakeChanged"];
|
||||
unavailableGatewayMethods: ReadonlySet<string>;
|
||||
};
|
||||
|
||||
export function createGatewayRequestContext(
|
||||
params: GatewayRequestContextParams,
|
||||
): GatewayRequestContext {
|
||||
return {
|
||||
deps: params.deps,
|
||||
// Keep cron reads live so config hot reload can swap cron/store state without rebuilding
|
||||
// every handler closure that already holds this request context.
|
||||
get cron() {
|
||||
return params.runtimeState.cronState.cron;
|
||||
},
|
||||
get cronStorePath() {
|
||||
return params.runtimeState.cronState.storePath;
|
||||
},
|
||||
execApprovalManager: params.execApprovalManager,
|
||||
pluginApprovalManager: params.pluginApprovalManager,
|
||||
loadGatewayModelCatalog: params.loadGatewayModelCatalog,
|
||||
getHealthCache: params.getHealthCache,
|
||||
refreshHealthSnapshot: params.refreshHealthSnapshot,
|
||||
logHealth: params.logHealth,
|
||||
logGateway: params.logGateway,
|
||||
incrementPresenceVersion: params.incrementPresenceVersion,
|
||||
getHealthVersion: params.getHealthVersion,
|
||||
broadcast: params.broadcast,
|
||||
broadcastToConnIds: params.broadcastToConnIds,
|
||||
nodeSendToSession: params.nodeSendToSession,
|
||||
nodeSendToAllSubscribed: params.nodeSendToAllSubscribed,
|
||||
nodeSubscribe: params.nodeSubscribe,
|
||||
nodeUnsubscribe: params.nodeUnsubscribe,
|
||||
nodeUnsubscribeAll: params.nodeUnsubscribeAll,
|
||||
hasConnectedMobileNode: params.hasConnectedMobileNode,
|
||||
hasExecApprovalClients: (excludeConnId?: string) => {
|
||||
for (const gatewayClient of params.clients) {
|
||||
if (excludeConnId && gatewayClient.connId === excludeConnId) {
|
||||
continue;
|
||||
}
|
||||
const scopes = Array.isArray(gatewayClient.connect.scopes)
|
||||
? gatewayClient.connect.scopes
|
||||
: [];
|
||||
if (scopes.includes("operator.admin") || scopes.includes("operator.approvals")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
},
|
||||
disconnectClientsForDevice: (deviceId: string, opts?: { role?: string }) => {
|
||||
for (const gatewayClient of params.clients) {
|
||||
if (gatewayClient.connect.device?.id !== deviceId) {
|
||||
continue;
|
||||
}
|
||||
if (opts?.role && gatewayClient.connect.role !== opts.role) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
gatewayClient.socket.close(4001, "device removed");
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
},
|
||||
disconnectClientsUsingSharedGatewayAuth: () => {
|
||||
disconnectAllSharedGatewayAuthClients(params.clients);
|
||||
},
|
||||
enforceSharedGatewayAuthGenerationForConfigWrite:
|
||||
params.enforceSharedGatewayAuthGenerationForConfigWrite,
|
||||
nodeRegistry: params.nodeRegistry,
|
||||
agentRunSeq: params.agentRunSeq,
|
||||
chatAbortControllers: params.chatAbortControllers,
|
||||
chatAbortedRuns: params.chatAbortedRuns,
|
||||
chatRunBuffers: params.chatRunBuffers,
|
||||
chatDeltaSentAt: params.chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen,
|
||||
addChatRun: params.addChatRun,
|
||||
removeChatRun: params.removeChatRun,
|
||||
subscribeSessionEvents: params.subscribeSessionEvents,
|
||||
unsubscribeSessionEvents: params.unsubscribeSessionEvents,
|
||||
subscribeSessionMessageEvents: params.subscribeSessionMessageEvents,
|
||||
unsubscribeSessionMessageEvents: params.unsubscribeSessionMessageEvents,
|
||||
unsubscribeAllSessionEvents: params.unsubscribeAllSessionEvents,
|
||||
getSessionEventSubscriberConnIds: params.getSessionEventSubscriberConnIds,
|
||||
registerToolEventRecipient: params.registerToolEventRecipient,
|
||||
dedupe: params.dedupe,
|
||||
wizardSessions: params.wizardSessions,
|
||||
findRunningWizard: params.findRunningWizard,
|
||||
purgeWizardSession: params.purgeWizardSession,
|
||||
getRuntimeSnapshot: params.getRuntimeSnapshot,
|
||||
startChannel: params.startChannel,
|
||||
stopChannel: params.stopChannel,
|
||||
markChannelLoggedOut: params.markChannelLoggedOut,
|
||||
wizardRunner: params.wizardRunner,
|
||||
broadcastVoiceWakeChanged: params.broadcastVoiceWakeChanged,
|
||||
unavailableGatewayMethods: params.unavailableGatewayMethods,
|
||||
};
|
||||
}
|
||||
61
src/gateway/server-runtime-handles.ts
Normal file
61
src/gateway/server-runtime-handles.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import type { HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||
import type { ChannelHealthMonitor } from "./channel-health-monitor.js";
|
||||
|
||||
export type GatewayConfigReloaderHandle = {
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
|
||||
export type GatewayServerMutableState = {
|
||||
bonjourStop: (() => Promise<void>) | null;
|
||||
tickInterval: ReturnType<typeof setInterval>;
|
||||
healthInterval: ReturnType<typeof setInterval>;
|
||||
dedupeCleanup: ReturnType<typeof setInterval>;
|
||||
mediaCleanup: ReturnType<typeof setInterval> | null;
|
||||
heartbeatRunner: HeartbeatRunner;
|
||||
stopGatewayUpdateCheck: () => void;
|
||||
tailscaleCleanup: (() => Promise<void>) | null;
|
||||
skillsRefreshTimer: ReturnType<typeof setTimeout> | null;
|
||||
skillsRefreshDelayMs: number;
|
||||
skillsChangeUnsub: () => void;
|
||||
channelHealthMonitor: ChannelHealthMonitor | null;
|
||||
stopModelPricingRefresh: () => void;
|
||||
mcpServer: { port: number; close: () => Promise<void> } | undefined;
|
||||
configReloader: GatewayConfigReloaderHandle;
|
||||
agentUnsub: (() => void) | null;
|
||||
heartbeatUnsub: (() => void) | null;
|
||||
transcriptUnsub: (() => void) | null;
|
||||
lifecycleUnsub: (() => void) | null;
|
||||
};
|
||||
|
||||
export function createGatewayServerMutableState(): GatewayServerMutableState {
|
||||
const noopInterval = () => {
|
||||
const timer = setInterval(() => {}, 1 << 30);
|
||||
timer.unref?.();
|
||||
return timer;
|
||||
};
|
||||
return {
|
||||
bonjourStop: null as (() => Promise<void>) | null,
|
||||
tickInterval: noopInterval(),
|
||||
healthInterval: noopInterval(),
|
||||
dedupeCleanup: noopInterval(),
|
||||
mediaCleanup: null as ReturnType<typeof setInterval> | null,
|
||||
heartbeatRunner: {
|
||||
stop: () => {},
|
||||
updateConfig: (_cfg: OpenClawConfig) => {},
|
||||
} satisfies HeartbeatRunner,
|
||||
stopGatewayUpdateCheck: () => {},
|
||||
tailscaleCleanup: null as (() => Promise<void>) | null,
|
||||
skillsRefreshTimer: null as ReturnType<typeof setTimeout> | null,
|
||||
skillsRefreshDelayMs: 30_000,
|
||||
skillsChangeUnsub: () => {},
|
||||
channelHealthMonitor: null as ChannelHealthMonitor | null,
|
||||
stopModelPricingRefresh: () => {},
|
||||
mcpServer: undefined as { port: number; close: () => Promise<void> } | undefined,
|
||||
configReloader: { stop: async () => {} } satisfies GatewayConfigReloaderHandle,
|
||||
agentUnsub: null as (() => void) | null,
|
||||
heartbeatUnsub: null as (() => void) | null,
|
||||
transcriptUnsub: null as (() => void) | null,
|
||||
lifecycleUnsub: null as (() => void) | null,
|
||||
};
|
||||
}
|
||||
109
src/gateway/server-runtime-services.ts
Normal file
109
src/gateway/server-runtime-services.ts
Normal file
@@ -0,0 +1,109 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { startHeartbeatRunner, type HeartbeatRunner } from "../infra/heartbeat-runner.js";
|
||||
import type { ChannelHealthMonitor } from "./channel-health-monitor.js";
|
||||
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
|
||||
import { startGatewayModelPricingRefresh } from "./model-pricing-cache.js";
|
||||
|
||||
type GatewayRuntimeServiceLogger = {
|
||||
child: (name: string) => {
|
||||
info: (message: string) => void;
|
||||
warn: (message: string) => void;
|
||||
error: (message: string) => void;
|
||||
};
|
||||
error: (message: string) => void;
|
||||
};
|
||||
|
||||
export type GatewayChannelManager = Parameters<
|
||||
typeof startChannelHealthMonitor
|
||||
>[0]["channelManager"];
|
||||
|
||||
function createNoopHeartbeatRunner(): HeartbeatRunner {
|
||||
return {
|
||||
stop: () => {},
|
||||
updateConfig: (_cfg: OpenClawConfig) => {},
|
||||
};
|
||||
}
|
||||
|
||||
export function startGatewayChannelHealthMonitor(params: {
|
||||
cfg: OpenClawConfig;
|
||||
channelManager: GatewayChannelManager;
|
||||
}): ChannelHealthMonitor | null {
|
||||
const healthCheckMinutes = params.cfg.gateway?.channelHealthCheckMinutes;
|
||||
if (healthCheckMinutes === 0) {
|
||||
return null;
|
||||
}
|
||||
const staleEventThresholdMinutes = params.cfg.gateway?.channelStaleEventThresholdMinutes;
|
||||
const maxRestartsPerHour = params.cfg.gateway?.channelMaxRestartsPerHour;
|
||||
return startChannelHealthMonitor({
|
||||
channelManager: params.channelManager,
|
||||
checkIntervalMs: (healthCheckMinutes ?? 5) * 60_000,
|
||||
...(staleEventThresholdMinutes != null && {
|
||||
staleEventThresholdMs: staleEventThresholdMinutes * 60_000,
|
||||
}),
|
||||
...(maxRestartsPerHour != null && { maxRestartsPerHour }),
|
||||
});
|
||||
}
|
||||
|
||||
export function startGatewayCronWithLogging(params: {
|
||||
cron: { start: () => Promise<void> };
|
||||
logCron: { error: (message: string) => void };
|
||||
}): void {
|
||||
void params.cron.start().catch((err) => params.logCron.error(`failed to start: ${String(err)}`));
|
||||
}
|
||||
|
||||
function recoverPendingOutboundDeliveries(params: {
|
||||
cfg: OpenClawConfig;
|
||||
log: GatewayRuntimeServiceLogger;
|
||||
}): void {
|
||||
void (async () => {
|
||||
const { recoverPendingDeliveries } = await import("../infra/outbound/delivery-queue.js");
|
||||
const { deliverOutboundPayloads } = await import("../infra/outbound/deliver.js");
|
||||
const logRecovery = params.log.child("delivery-recovery");
|
||||
await recoverPendingDeliveries({
|
||||
deliver: deliverOutboundPayloads,
|
||||
log: logRecovery,
|
||||
cfg: params.cfg,
|
||||
});
|
||||
})().catch((err) => params.log.error(`Delivery recovery failed: ${String(err)}`));
|
||||
}
|
||||
|
||||
export function startGatewayRuntimeServices(params: {
|
||||
minimalTestGateway: boolean;
|
||||
cfgAtStart: OpenClawConfig;
|
||||
channelManager: GatewayChannelManager;
|
||||
cron: { start: () => Promise<void> };
|
||||
logCron: { error: (message: string) => void };
|
||||
log: GatewayRuntimeServiceLogger;
|
||||
}): {
|
||||
heartbeatRunner: HeartbeatRunner;
|
||||
channelHealthMonitor: ChannelHealthMonitor | null;
|
||||
stopModelPricingRefresh: () => void;
|
||||
} {
|
||||
const heartbeatRunner = params.minimalTestGateway
|
||||
? createNoopHeartbeatRunner()
|
||||
: startHeartbeatRunner({ cfg: params.cfgAtStart });
|
||||
const channelHealthMonitor = startGatewayChannelHealthMonitor({
|
||||
cfg: params.cfgAtStart,
|
||||
channelManager: params.channelManager,
|
||||
});
|
||||
|
||||
if (!params.minimalTestGateway) {
|
||||
startGatewayCronWithLogging({
|
||||
cron: params.cron,
|
||||
logCron: params.logCron,
|
||||
});
|
||||
recoverPendingOutboundDeliveries({
|
||||
cfg: params.cfgAtStart,
|
||||
log: params.log,
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
heartbeatRunner,
|
||||
channelHealthMonitor,
|
||||
stopModelPricingRefresh:
|
||||
!params.minimalTestGateway && process.env.VITEST !== "1"
|
||||
? startGatewayModelPricingRefresh({ config: params.cfgAtStart })
|
||||
: () => {},
|
||||
};
|
||||
}
|
||||
84
src/gateway/server-runtime-subscriptions.ts
Normal file
84
src/gateway/server-runtime-subscriptions.ts
Normal file
@@ -0,0 +1,84 @@
|
||||
import { onAgentEvent } from "../infra/agent-events.js";
|
||||
import { onHeartbeatEvent } from "../infra/heartbeat-events.js";
|
||||
import { onSessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import {
|
||||
createAgentEventHandler,
|
||||
type ChatRunState,
|
||||
type SessionEventSubscriberRegistry,
|
||||
type SessionMessageSubscriberRegistry,
|
||||
type ToolEventRecipientRegistry,
|
||||
} from "./server-chat.js";
|
||||
import {
|
||||
createLifecycleEventBroadcastHandler,
|
||||
createTranscriptUpdateBroadcastHandler,
|
||||
} from "./server-session-events.js";
|
||||
|
||||
export function startGatewayEventSubscriptions(params: {
|
||||
minimalTestGateway: boolean;
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
broadcastToConnIds: (
|
||||
event: string,
|
||||
payload: unknown,
|
||||
connIds: ReadonlySet<string>,
|
||||
opts?: { dropIfSlow?: boolean },
|
||||
) => void;
|
||||
nodeSendToSession: (sessionKey: string, event: string, payload: unknown) => void;
|
||||
agentRunSeq: Map<string, number>;
|
||||
chatRunState: ChatRunState;
|
||||
resolveSessionKeyForRun: (runId: string) => string | undefined;
|
||||
clearAgentRunContext: (runId: string) => void;
|
||||
toolEventRecipients: ToolEventRecipientRegistry;
|
||||
sessionEventSubscribers: SessionEventSubscriberRegistry;
|
||||
sessionMessageSubscribers: SessionMessageSubscriberRegistry;
|
||||
chatAbortControllers: Map<string, unknown>;
|
||||
}) {
|
||||
const agentUnsub = params.minimalTestGateway
|
||||
? null
|
||||
: onAgentEvent(
|
||||
createAgentEventHandler({
|
||||
broadcast: params.broadcast,
|
||||
broadcastToConnIds: params.broadcastToConnIds,
|
||||
nodeSendToSession: params.nodeSendToSession,
|
||||
agentRunSeq: params.agentRunSeq,
|
||||
chatRunState: params.chatRunState,
|
||||
resolveSessionKeyForRun: params.resolveSessionKeyForRun,
|
||||
clearAgentRunContext: params.clearAgentRunContext,
|
||||
toolEventRecipients: params.toolEventRecipients,
|
||||
sessionEventSubscribers: params.sessionEventSubscribers,
|
||||
isChatSendRunActive: (runId) => params.chatAbortControllers.has(runId),
|
||||
}),
|
||||
);
|
||||
|
||||
const heartbeatUnsub = params.minimalTestGateway
|
||||
? null
|
||||
: onHeartbeatEvent((evt) => {
|
||||
params.broadcast("heartbeat", evt, { dropIfSlow: true });
|
||||
});
|
||||
|
||||
const transcriptUnsub = params.minimalTestGateway
|
||||
? null
|
||||
: onSessionTranscriptUpdate(
|
||||
createTranscriptUpdateBroadcastHandler({
|
||||
broadcastToConnIds: params.broadcastToConnIds,
|
||||
sessionEventSubscribers: params.sessionEventSubscribers,
|
||||
sessionMessageSubscribers: params.sessionMessageSubscribers,
|
||||
}),
|
||||
);
|
||||
|
||||
const lifecycleUnsub = params.minimalTestGateway
|
||||
? null
|
||||
: onSessionLifecycleEvent(
|
||||
createLifecycleEventBroadcastHandler({
|
||||
broadcastToConnIds: params.broadcastToConnIds,
|
||||
sessionEventSubscribers: params.sessionEventSubscribers,
|
||||
}),
|
||||
);
|
||||
|
||||
return {
|
||||
agentUnsub,
|
||||
heartbeatUnsub,
|
||||
transcriptUnsub,
|
||||
lifecycleUnsub,
|
||||
};
|
||||
}
|
||||
177
src/gateway/server-session-events.ts
Normal file
177
src/gateway/server-session-events.ts
Normal file
@@ -0,0 +1,177 @@
|
||||
import type { SessionLifecycleEvent } from "../sessions/session-lifecycle-events.js";
|
||||
import type { SessionTranscriptUpdate } from "../sessions/transcript-events.js";
|
||||
import type { GatewayBroadcastToConnIdsFn } from "./server-broadcast.js";
|
||||
import type {
|
||||
SessionEventSubscriberRegistry,
|
||||
SessionMessageSubscriberRegistry,
|
||||
} from "./server-chat.js";
|
||||
import { resolveSessionKeyForTranscriptFile } from "./session-transcript-key.js";
|
||||
import {
|
||||
attachOpenClawTranscriptMeta,
|
||||
loadGatewaySessionRow,
|
||||
loadSessionEntry,
|
||||
readSessionMessages,
|
||||
type GatewaySessionRow,
|
||||
} from "./session-utils.js";
|
||||
|
||||
type SessionEventSubscribers = Pick<SessionEventSubscriberRegistry, "getAll">;
|
||||
type SessionMessageSubscribers = Pick<SessionMessageSubscriberRegistry, "get">;
|
||||
|
||||
function buildGatewaySessionSnapshot(params: {
|
||||
sessionRow: GatewaySessionRow | null | undefined;
|
||||
includeSession?: boolean;
|
||||
label?: string;
|
||||
displayName?: string;
|
||||
parentSessionKey?: string;
|
||||
}): Record<string, unknown> {
|
||||
const { sessionRow } = params;
|
||||
if (!sessionRow) {
|
||||
return {};
|
||||
}
|
||||
return {
|
||||
...(params.includeSession ? { session: sessionRow } : {}),
|
||||
updatedAt: sessionRow.updatedAt ?? undefined,
|
||||
sessionId: sessionRow.sessionId,
|
||||
kind: sessionRow.kind,
|
||||
channel: sessionRow.channel,
|
||||
subject: sessionRow.subject,
|
||||
groupChannel: sessionRow.groupChannel,
|
||||
space: sessionRow.space,
|
||||
chatType: sessionRow.chatType,
|
||||
origin: sessionRow.origin,
|
||||
spawnedBy: sessionRow.spawnedBy,
|
||||
spawnedWorkspaceDir: sessionRow.spawnedWorkspaceDir,
|
||||
forkedFromParent: sessionRow.forkedFromParent,
|
||||
spawnDepth: sessionRow.spawnDepth,
|
||||
subagentRole: sessionRow.subagentRole,
|
||||
subagentControlScope: sessionRow.subagentControlScope,
|
||||
label: params.label ?? sessionRow.label,
|
||||
displayName: params.displayName ?? sessionRow.displayName,
|
||||
deliveryContext: sessionRow.deliveryContext,
|
||||
parentSessionKey: params.parentSessionKey ?? sessionRow.parentSessionKey,
|
||||
childSessions: sessionRow.childSessions,
|
||||
thinkingLevel: sessionRow.thinkingLevel,
|
||||
fastMode: sessionRow.fastMode,
|
||||
verboseLevel: sessionRow.verboseLevel,
|
||||
reasoningLevel: sessionRow.reasoningLevel,
|
||||
elevatedLevel: sessionRow.elevatedLevel,
|
||||
sendPolicy: sessionRow.sendPolicy,
|
||||
systemSent: sessionRow.systemSent,
|
||||
abortedLastRun: sessionRow.abortedLastRun,
|
||||
inputTokens: sessionRow.inputTokens,
|
||||
outputTokens: sessionRow.outputTokens,
|
||||
lastChannel: sessionRow.lastChannel,
|
||||
lastTo: sessionRow.lastTo,
|
||||
lastAccountId: sessionRow.lastAccountId,
|
||||
lastThreadId: sessionRow.lastThreadId,
|
||||
totalTokens: sessionRow.totalTokens,
|
||||
totalTokensFresh: sessionRow.totalTokensFresh,
|
||||
contextTokens: sessionRow.contextTokens,
|
||||
estimatedCostUsd: sessionRow.estimatedCostUsd,
|
||||
responseUsage: sessionRow.responseUsage,
|
||||
modelProvider: sessionRow.modelProvider,
|
||||
model: sessionRow.model,
|
||||
status: sessionRow.status,
|
||||
startedAt: sessionRow.startedAt,
|
||||
endedAt: sessionRow.endedAt,
|
||||
runtimeMs: sessionRow.runtimeMs,
|
||||
compactionCheckpointCount: sessionRow.compactionCheckpointCount,
|
||||
latestCompactionCheckpoint: sessionRow.latestCompactionCheckpoint,
|
||||
};
|
||||
}
|
||||
|
||||
export function createTranscriptUpdateBroadcastHandler(params: {
|
||||
broadcastToConnIds: GatewayBroadcastToConnIdsFn;
|
||||
sessionEventSubscribers: SessionEventSubscribers;
|
||||
sessionMessageSubscribers: SessionMessageSubscribers;
|
||||
}) {
|
||||
return (update: SessionTranscriptUpdate): void => {
|
||||
const sessionKey = update.sessionKey ?? resolveSessionKeyForTranscriptFile(update.sessionFile);
|
||||
if (!sessionKey || update.message === undefined) {
|
||||
return;
|
||||
}
|
||||
const connIds = new Set<string>();
|
||||
for (const connId of params.sessionEventSubscribers.getAll()) {
|
||||
connIds.add(connId);
|
||||
}
|
||||
for (const connId of params.sessionMessageSubscribers.get(sessionKey)) {
|
||||
connIds.add(connId);
|
||||
}
|
||||
if (connIds.size === 0) {
|
||||
return;
|
||||
}
|
||||
const { entry, storePath } = loadSessionEntry(sessionKey);
|
||||
const messageSeq = entry?.sessionId
|
||||
? readSessionMessages(entry.sessionId, storePath, entry.sessionFile).length
|
||||
: undefined;
|
||||
const sessionSnapshot = buildGatewaySessionSnapshot({
|
||||
sessionRow: loadGatewaySessionRow(sessionKey),
|
||||
includeSession: true,
|
||||
});
|
||||
const message = attachOpenClawTranscriptMeta(update.message, {
|
||||
...(typeof update.messageId === "string" ? { id: update.messageId } : {}),
|
||||
...(typeof messageSeq === "number" ? { seq: messageSeq } : {}),
|
||||
});
|
||||
params.broadcastToConnIds(
|
||||
"session.message",
|
||||
{
|
||||
sessionKey,
|
||||
message,
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
...(typeof messageSeq === "number" ? { messageSeq } : {}),
|
||||
...sessionSnapshot,
|
||||
},
|
||||
connIds,
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
|
||||
const sessionEventConnIds = params.sessionEventSubscribers.getAll();
|
||||
if (sessionEventConnIds.size === 0) {
|
||||
return;
|
||||
}
|
||||
params.broadcastToConnIds(
|
||||
"sessions.changed",
|
||||
{
|
||||
sessionKey,
|
||||
phase: "message",
|
||||
ts: Date.now(),
|
||||
...(typeof update.messageId === "string" ? { messageId: update.messageId } : {}),
|
||||
...(typeof messageSeq === "number" ? { messageSeq } : {}),
|
||||
...sessionSnapshot,
|
||||
},
|
||||
sessionEventConnIds,
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
};
|
||||
}
|
||||
|
||||
export function createLifecycleEventBroadcastHandler(params: {
|
||||
broadcastToConnIds: GatewayBroadcastToConnIdsFn;
|
||||
sessionEventSubscribers: SessionEventSubscribers;
|
||||
}) {
|
||||
return (event: SessionLifecycleEvent): void => {
|
||||
const connIds = params.sessionEventSubscribers.getAll();
|
||||
if (connIds.size === 0) {
|
||||
return;
|
||||
}
|
||||
params.broadcastToConnIds(
|
||||
"sessions.changed",
|
||||
{
|
||||
sessionKey: event.sessionKey,
|
||||
reason: event.reason,
|
||||
parentSessionKey: event.parentSessionKey,
|
||||
label: event.label,
|
||||
displayName: event.displayName,
|
||||
ts: Date.now(),
|
||||
...buildGatewaySessionSnapshot({
|
||||
sessionRow: loadGatewaySessionRow(event.sessionKey),
|
||||
label: event.label,
|
||||
displayName: event.displayName,
|
||||
parentSessionKey: event.parentSessionKey,
|
||||
}),
|
||||
},
|
||||
connIds,
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
};
|
||||
}
|
||||
93
src/gateway/server-shared-auth-generation.ts
Normal file
93
src/gateway/server-shared-auth-generation.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import { resolveGatewayReloadSettings } from "./config-reload.js";
|
||||
|
||||
export type SharedGatewayAuthClient = {
|
||||
usesSharedGatewayAuth?: boolean;
|
||||
sharedGatewaySessionGeneration?: string;
|
||||
socket: { close: (code: number, reason: string) => void };
|
||||
};
|
||||
|
||||
export type SharedGatewaySessionGenerationState = {
|
||||
current: string | undefined;
|
||||
required: string | undefined | null;
|
||||
};
|
||||
|
||||
export function disconnectStaleSharedGatewayAuthClients(params: {
|
||||
clients: Iterable<SharedGatewayAuthClient>;
|
||||
expectedGeneration: string | undefined;
|
||||
}): void {
|
||||
for (const gatewayClient of params.clients) {
|
||||
if (!gatewayClient.usesSharedGatewayAuth) {
|
||||
continue;
|
||||
}
|
||||
if (gatewayClient.sharedGatewaySessionGeneration === params.expectedGeneration) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
gatewayClient.socket.close(4001, "gateway auth changed");
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function disconnectAllSharedGatewayAuthClients(
|
||||
clients: Iterable<SharedGatewayAuthClient>,
|
||||
): void {
|
||||
for (const gatewayClient of clients) {
|
||||
if (!gatewayClient.usesSharedGatewayAuth) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
gatewayClient.socket.close(4001, "gateway auth changed");
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function getRequiredSharedGatewaySessionGeneration(
|
||||
state: SharedGatewaySessionGenerationState,
|
||||
): string | undefined {
|
||||
return state.required === null ? state.current : state.required;
|
||||
}
|
||||
|
||||
export function setCurrentSharedGatewaySessionGeneration(
|
||||
state: SharedGatewaySessionGenerationState,
|
||||
nextGeneration: string | undefined,
|
||||
): void {
|
||||
const previousGeneration = state.current;
|
||||
state.current = nextGeneration;
|
||||
if (state.required === nextGeneration) {
|
||||
state.required = null;
|
||||
return;
|
||||
}
|
||||
if (state.required !== null && previousGeneration !== nextGeneration) {
|
||||
state.required = null;
|
||||
}
|
||||
}
|
||||
|
||||
export function enforceSharedGatewaySessionGenerationForConfigWrite(params: {
|
||||
state: SharedGatewaySessionGenerationState;
|
||||
nextConfig: OpenClawConfig;
|
||||
resolveRuntimeSnapshotGeneration: () => string | undefined;
|
||||
clients: Iterable<SharedGatewayAuthClient>;
|
||||
}): void {
|
||||
const reloadMode = resolveGatewayReloadSettings(params.nextConfig).mode;
|
||||
const nextSharedGatewaySessionGeneration = params.resolveRuntimeSnapshotGeneration();
|
||||
if (reloadMode === "off") {
|
||||
params.state.current = nextSharedGatewaySessionGeneration;
|
||||
params.state.required = nextSharedGatewaySessionGeneration;
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
return;
|
||||
}
|
||||
params.state.required = null;
|
||||
setCurrentSharedGatewaySessionGeneration(params.state, nextSharedGatewaySessionGeneration);
|
||||
disconnectStaleSharedGatewayAuthClients({
|
||||
clients: params.clients,
|
||||
expectedGeneration: nextSharedGatewaySessionGeneration,
|
||||
});
|
||||
}
|
||||
282
src/gateway/server-startup-config.ts
Normal file
282
src/gateway/server-startup-config.ts
Normal file
@@ -0,0 +1,282 @@
|
||||
import { formatCliCommand } from "../cli/command-format.js";
|
||||
import {
|
||||
type ConfigFileSnapshot,
|
||||
type GatewayAuthConfig,
|
||||
type GatewayTailscaleConfig,
|
||||
type OpenClawConfig,
|
||||
applyConfigOverrides,
|
||||
isNixMode,
|
||||
readConfigFileSnapshot,
|
||||
writeConfigFile,
|
||||
} from "../config/config.js";
|
||||
import { formatConfigIssueLines } from "../config/issue-format.js";
|
||||
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import {
|
||||
GATEWAY_AUTH_SURFACE_PATHS,
|
||||
evaluateGatewayAuthSurfaceStates,
|
||||
} from "../secrets/runtime-gateway-auth-surfaces.js";
|
||||
import {
|
||||
activateSecretsRuntimeSnapshot,
|
||||
prepareSecretsRuntimeSnapshot,
|
||||
} from "../secrets/runtime.js";
|
||||
import {
|
||||
ensureGatewayStartupAuth,
|
||||
mergeGatewayAuthConfig,
|
||||
mergeGatewayTailscaleConfig,
|
||||
} from "./startup-auth.js";
|
||||
|
||||
type GatewayStartupLog = {
|
||||
info: (message: string) => void;
|
||||
warn: (message: string) => void;
|
||||
error?: (message: string) => void;
|
||||
};
|
||||
|
||||
type GatewaySecretsStateEventCode = "SECRETS_RELOADER_DEGRADED" | "SECRETS_RELOADER_RECOVERED";
|
||||
|
||||
export type ActivateRuntimeSecrets = (
|
||||
config: OpenClawConfig,
|
||||
params: { reason: "startup" | "reload" | "restart-check"; activate: boolean },
|
||||
) => Promise<Awaited<ReturnType<typeof prepareSecretsRuntimeSnapshot>>>;
|
||||
|
||||
type GatewayStartupConfigOverrides = {
|
||||
auth?: GatewayAuthConfig;
|
||||
tailscale?: GatewayTailscaleConfig;
|
||||
};
|
||||
|
||||
export async function loadGatewayStartupConfigSnapshot(params: {
|
||||
minimalTestGateway: boolean;
|
||||
log: GatewayStartupLog;
|
||||
}): Promise<ConfigFileSnapshot> {
|
||||
let configSnapshot = await readConfigFileSnapshot();
|
||||
if (configSnapshot.legacyIssues.length > 0 && isNixMode) {
|
||||
throw new Error(
|
||||
"Legacy config entries detected while running in Nix mode. Update your Nix config to the latest schema and restart.",
|
||||
);
|
||||
}
|
||||
if (configSnapshot.exists) {
|
||||
assertValidGatewayStartupConfigSnapshot(configSnapshot, { includeDoctorHint: true });
|
||||
}
|
||||
|
||||
const autoEnable = params.minimalTestGateway
|
||||
? { config: configSnapshot.config, changes: [] as string[] }
|
||||
: applyPluginAutoEnable({ config: configSnapshot.config, env: process.env });
|
||||
if (autoEnable.changes.length === 0) {
|
||||
return configSnapshot;
|
||||
}
|
||||
|
||||
try {
|
||||
await writeConfigFile(autoEnable.config);
|
||||
configSnapshot = await readConfigFileSnapshot();
|
||||
assertValidGatewayStartupConfigSnapshot(configSnapshot);
|
||||
params.log.info(
|
||||
`gateway: auto-enabled plugins:\n${autoEnable.changes.map((entry) => `- ${entry}`).join("\n")}`,
|
||||
);
|
||||
} catch (err) {
|
||||
params.log.warn(`gateway: failed to persist plugin auto-enable changes: ${String(err)}`);
|
||||
}
|
||||
|
||||
return configSnapshot;
|
||||
}
|
||||
|
||||
export function createRuntimeSecretsActivator(params: {
|
||||
logSecrets: GatewayStartupLog;
|
||||
emitStateEvent: (
|
||||
code: GatewaySecretsStateEventCode,
|
||||
message: string,
|
||||
cfg: OpenClawConfig,
|
||||
) => void;
|
||||
}): ActivateRuntimeSecrets {
|
||||
let secretsDegraded = false;
|
||||
let secretsActivationTail: Promise<void> = Promise.resolve();
|
||||
|
||||
const runWithSecretsActivationLock = async <T>(operation: () => Promise<T>): Promise<T> => {
|
||||
const run = secretsActivationTail.then(operation, operation);
|
||||
secretsActivationTail = run.then(
|
||||
() => undefined,
|
||||
() => undefined,
|
||||
);
|
||||
return await run;
|
||||
};
|
||||
|
||||
return async (config, activationParams) =>
|
||||
await runWithSecretsActivationLock(async () => {
|
||||
try {
|
||||
const prepared = await prepareSecretsRuntimeSnapshot({
|
||||
config: pruneSkippedStartupSecretSurfaces(config),
|
||||
});
|
||||
if (activationParams.activate) {
|
||||
activateSecretsRuntimeSnapshot(prepared);
|
||||
logGatewayAuthSurfaceDiagnostics(prepared, params.logSecrets);
|
||||
}
|
||||
for (const warning of prepared.warnings) {
|
||||
params.logSecrets.warn(`[${warning.code}] ${warning.message}`);
|
||||
}
|
||||
if (secretsDegraded) {
|
||||
const recoveredMessage =
|
||||
"Secret resolution recovered; runtime remained on last-known-good during the outage.";
|
||||
params.logSecrets.info(`[SECRETS_RELOADER_RECOVERED] ${recoveredMessage}`);
|
||||
params.emitStateEvent("SECRETS_RELOADER_RECOVERED", recoveredMessage, prepared.config);
|
||||
}
|
||||
secretsDegraded = false;
|
||||
return prepared;
|
||||
} catch (err) {
|
||||
const details = String(err);
|
||||
if (!secretsDegraded) {
|
||||
params.logSecrets.error?.(`[SECRETS_RELOADER_DEGRADED] ${details}`);
|
||||
if (activationParams.reason !== "startup") {
|
||||
params.emitStateEvent(
|
||||
"SECRETS_RELOADER_DEGRADED",
|
||||
`Secret resolution failed; runtime remains on last-known-good snapshot. ${details}`,
|
||||
config,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
params.logSecrets.warn(`[SECRETS_RELOADER_DEGRADED] ${details}`);
|
||||
}
|
||||
secretsDegraded = true;
|
||||
if (activationParams.reason === "startup") {
|
||||
throw new Error(`Startup failed: required secrets are unavailable. ${details}`, {
|
||||
cause: err,
|
||||
});
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export function assertValidGatewayStartupConfigSnapshot(
|
||||
snapshot: ConfigFileSnapshot,
|
||||
options: { includeDoctorHint?: boolean } = {},
|
||||
): void {
|
||||
if (snapshot.valid) {
|
||||
return;
|
||||
}
|
||||
const issues =
|
||||
snapshot.issues.length > 0
|
||||
? formatConfigIssueLines(snapshot.issues, "", { normalizeRoot: true }).join("\n")
|
||||
: "Unknown validation issue.";
|
||||
const doctorHint = options.includeDoctorHint
|
||||
? `\nRun "${formatCliCommand("openclaw doctor --fix")}" to repair, then retry.`
|
||||
: "";
|
||||
throw new Error(`Invalid config at ${snapshot.path}.\n${issues}${doctorHint}`);
|
||||
}
|
||||
|
||||
export async function prepareGatewayStartupConfig(params: {
|
||||
configSnapshot: ConfigFileSnapshot;
|
||||
authOverride?: GatewayAuthConfig;
|
||||
tailscaleOverride?: GatewayTailscaleConfig;
|
||||
activateRuntimeSecrets: ActivateRuntimeSecrets;
|
||||
}): Promise<Awaited<ReturnType<typeof ensureGatewayStartupAuth>>> {
|
||||
assertValidGatewayStartupConfigSnapshot(params.configSnapshot);
|
||||
|
||||
const runtimeConfig = applyConfigOverrides(params.configSnapshot.config);
|
||||
const startupPreflightConfig = applyGatewayAuthOverridesForStartupPreflight(runtimeConfig, {
|
||||
auth: params.authOverride,
|
||||
tailscale: params.tailscaleOverride,
|
||||
});
|
||||
const preflightConfig = (
|
||||
await params.activateRuntimeSecrets(startupPreflightConfig, {
|
||||
reason: "startup",
|
||||
activate: false,
|
||||
})
|
||||
).config;
|
||||
const preflightAuthOverride =
|
||||
typeof preflightConfig.gateway?.auth?.token === "string" ||
|
||||
typeof preflightConfig.gateway?.auth?.password === "string"
|
||||
? {
|
||||
...params.authOverride,
|
||||
...(typeof preflightConfig.gateway?.auth?.token === "string"
|
||||
? { token: preflightConfig.gateway.auth.token }
|
||||
: {}),
|
||||
...(typeof preflightConfig.gateway?.auth?.password === "string"
|
||||
? { password: preflightConfig.gateway.auth.password }
|
||||
: {}),
|
||||
}
|
||||
: params.authOverride;
|
||||
|
||||
const authBootstrap = await ensureGatewayStartupAuth({
|
||||
cfg: runtimeConfig,
|
||||
env: process.env,
|
||||
authOverride: preflightAuthOverride,
|
||||
tailscaleOverride: params.tailscaleOverride,
|
||||
persist: true,
|
||||
baseHash: params.configSnapshot.hash,
|
||||
});
|
||||
const runtimeStartupConfig = applyGatewayAuthOverridesForStartupPreflight(authBootstrap.cfg, {
|
||||
auth: params.authOverride,
|
||||
tailscale: params.tailscaleOverride,
|
||||
});
|
||||
const activatedConfig = (
|
||||
await params.activateRuntimeSecrets(runtimeStartupConfig, {
|
||||
reason: "startup",
|
||||
activate: true,
|
||||
})
|
||||
).config;
|
||||
return {
|
||||
...authBootstrap,
|
||||
cfg: activatedConfig,
|
||||
};
|
||||
}
|
||||
|
||||
function pruneSkippedStartupSecretSurfaces(config: OpenClawConfig): OpenClawConfig {
|
||||
const skipChannels =
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) ||
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS);
|
||||
if (!skipChannels || !config.channels) {
|
||||
return config;
|
||||
}
|
||||
return {
|
||||
...config,
|
||||
channels: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
function logGatewayAuthSurfaceDiagnostics(
|
||||
prepared: {
|
||||
sourceConfig: OpenClawConfig;
|
||||
warnings: Array<{ code: string; path: string; message: string }>;
|
||||
},
|
||||
logSecrets: GatewayStartupLog,
|
||||
): void {
|
||||
const states = evaluateGatewayAuthSurfaceStates({
|
||||
config: prepared.sourceConfig,
|
||||
defaults: prepared.sourceConfig.secrets?.defaults,
|
||||
env: process.env,
|
||||
});
|
||||
const inactiveWarnings = new Map<string, string>();
|
||||
for (const warning of prepared.warnings) {
|
||||
if (warning.code !== "SECRETS_REF_IGNORED_INACTIVE_SURFACE") {
|
||||
continue;
|
||||
}
|
||||
inactiveWarnings.set(warning.path, warning.message);
|
||||
}
|
||||
for (const path of GATEWAY_AUTH_SURFACE_PATHS) {
|
||||
const state = states[path];
|
||||
if (!state.hasSecretRef) {
|
||||
continue;
|
||||
}
|
||||
const stateLabel = state.active ? "active" : "inactive";
|
||||
const inactiveDetails =
|
||||
!state.active && inactiveWarnings.get(path) ? inactiveWarnings.get(path) : undefined;
|
||||
const details = inactiveDetails ?? state.reason;
|
||||
logSecrets.info(`[SECRETS_GATEWAY_AUTH_SURFACE] ${path} is ${stateLabel}. ${details}`);
|
||||
}
|
||||
}
|
||||
|
||||
function applyGatewayAuthOverridesForStartupPreflight(
|
||||
config: OpenClawConfig,
|
||||
overrides: GatewayStartupConfigOverrides,
|
||||
): OpenClawConfig {
|
||||
if (!overrides.auth && !overrides.tailscale) {
|
||||
return config;
|
||||
}
|
||||
return {
|
||||
...config,
|
||||
gateway: {
|
||||
...config.gateway,
|
||||
auth: mergeGatewayAuthConfig(config.gateway?.auth, overrides.auth),
|
||||
tailscale: mergeGatewayTailscaleConfig(config.gateway?.tailscale, overrides.tailscale),
|
||||
},
|
||||
};
|
||||
}
|
||||
135
src/gateway/server-startup-early.ts
Normal file
135
src/gateway/server-startup-early.ts
Normal file
@@ -0,0 +1,135 @@
|
||||
import { registerSkillsChangeListener } from "../agents/skills/refresh.js";
|
||||
import type { OpenClawConfig } from "../config/config.js";
|
||||
import type { GatewayTailscaleMode } from "../config/types.gateway.js";
|
||||
import { getMachineDisplayName } from "../infra/machine-name.js";
|
||||
import {
|
||||
primeRemoteSkillsCache,
|
||||
refreshRemoteBinsForConnectedNodes,
|
||||
setSkillsRemoteRegistry,
|
||||
} from "../infra/skills-remote.js";
|
||||
import { startTaskRegistryMaintenance } from "../tasks/task-registry.maintenance.js";
|
||||
import { startMcpLoopbackServer } from "./mcp-http.js";
|
||||
import { startGatewayDiscovery } from "./server-discovery-runtime.js";
|
||||
import { startGatewayMaintenanceTimers } from "./server-maintenance.js";
|
||||
|
||||
export async function startGatewayEarlyRuntime(params: {
|
||||
minimalTestGateway: boolean;
|
||||
cfgAtStart: OpenClawConfig;
|
||||
port: number;
|
||||
gatewayTls: { enabled: boolean; fingerprintSha256?: string };
|
||||
tailscaleMode: GatewayTailscaleMode;
|
||||
log: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
};
|
||||
logDiscovery: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
};
|
||||
nodeRegistry: Parameters<typeof setSkillsRemoteRegistry>[0];
|
||||
broadcast: Parameters<typeof startGatewayMaintenanceTimers>[0]["broadcast"];
|
||||
nodeSendToAllSubscribed: Parameters<
|
||||
typeof startGatewayMaintenanceTimers
|
||||
>[0]["nodeSendToAllSubscribed"];
|
||||
getPresenceVersion: Parameters<typeof startGatewayMaintenanceTimers>[0]["getPresenceVersion"];
|
||||
getHealthVersion: Parameters<typeof startGatewayMaintenanceTimers>[0]["getHealthVersion"];
|
||||
refreshGatewayHealthSnapshot: Parameters<
|
||||
typeof startGatewayMaintenanceTimers
|
||||
>[0]["refreshGatewayHealthSnapshot"];
|
||||
logHealth: Parameters<typeof startGatewayMaintenanceTimers>[0]["logHealth"];
|
||||
dedupe: Parameters<typeof startGatewayMaintenanceTimers>[0]["dedupe"];
|
||||
chatAbortControllers: Parameters<typeof startGatewayMaintenanceTimers>[0]["chatAbortControllers"];
|
||||
chatRunState: Parameters<typeof startGatewayMaintenanceTimers>[0]["chatRunState"];
|
||||
chatRunBuffers: Parameters<typeof startGatewayMaintenanceTimers>[0]["chatRunBuffers"];
|
||||
chatDeltaSentAt: Parameters<typeof startGatewayMaintenanceTimers>[0]["chatDeltaSentAt"];
|
||||
chatDeltaLastBroadcastLen: Parameters<
|
||||
typeof startGatewayMaintenanceTimers
|
||||
>[0]["chatDeltaLastBroadcastLen"];
|
||||
removeChatRun: Parameters<typeof startGatewayMaintenanceTimers>[0]["removeChatRun"];
|
||||
agentRunSeq: Parameters<typeof startGatewayMaintenanceTimers>[0]["agentRunSeq"];
|
||||
nodeSendToSession: Parameters<typeof startGatewayMaintenanceTimers>[0]["nodeSendToSession"];
|
||||
mediaCleanupTtlMs?: number;
|
||||
skillsRefreshDelayMs: number;
|
||||
getSkillsRefreshTimer: () => ReturnType<typeof setTimeout> | null;
|
||||
setSkillsRefreshTimer: (timer: ReturnType<typeof setTimeout> | null) => void;
|
||||
loadConfig: () => OpenClawConfig;
|
||||
}) {
|
||||
let mcpServer: { port: number; close: () => Promise<void> } | undefined;
|
||||
try {
|
||||
mcpServer = await startMcpLoopbackServer(0);
|
||||
params.log.info(`MCP loopback server listening on http://127.0.0.1:${mcpServer.port}/mcp`);
|
||||
} catch (error) {
|
||||
params.log.warn(`MCP loopback server failed to start: ${String(error)}`);
|
||||
}
|
||||
|
||||
let bonjourStop: (() => Promise<void>) | null = null;
|
||||
if (!params.minimalTestGateway) {
|
||||
const machineDisplayName = await getMachineDisplayName();
|
||||
const discovery = await startGatewayDiscovery({
|
||||
machineDisplayName,
|
||||
port: params.port,
|
||||
gatewayTls: params.gatewayTls.enabled
|
||||
? { enabled: true, fingerprintSha256: params.gatewayTls.fingerprintSha256 }
|
||||
: undefined,
|
||||
wideAreaDiscoveryEnabled: params.cfgAtStart.discovery?.wideArea?.enabled === true,
|
||||
wideAreaDiscoveryDomain: params.cfgAtStart.discovery?.wideArea?.domain,
|
||||
tailscaleMode: params.tailscaleMode,
|
||||
mdnsMode: params.cfgAtStart.discovery?.mdns?.mode,
|
||||
logDiscovery: params.logDiscovery,
|
||||
});
|
||||
bonjourStop = discovery.bonjourStop;
|
||||
}
|
||||
|
||||
if (!params.minimalTestGateway) {
|
||||
setSkillsRemoteRegistry(params.nodeRegistry);
|
||||
void primeRemoteSkillsCache();
|
||||
startTaskRegistryMaintenance();
|
||||
}
|
||||
|
||||
const skillsChangeUnsub = params.minimalTestGateway
|
||||
? () => {}
|
||||
: registerSkillsChangeListener((event) => {
|
||||
if (event.reason === "remote-node") {
|
||||
return;
|
||||
}
|
||||
const existingTimer = params.getSkillsRefreshTimer();
|
||||
if (existingTimer) {
|
||||
clearTimeout(existingTimer);
|
||||
}
|
||||
const nextTimer = setTimeout(() => {
|
||||
params.setSkillsRefreshTimer(null);
|
||||
void refreshRemoteBinsForConnectedNodes(params.loadConfig());
|
||||
}, params.skillsRefreshDelayMs);
|
||||
params.setSkillsRefreshTimer(nextTimer);
|
||||
});
|
||||
|
||||
const maintenance = params.minimalTestGateway
|
||||
? null
|
||||
: startGatewayMaintenanceTimers({
|
||||
broadcast: params.broadcast,
|
||||
nodeSendToAllSubscribed: params.nodeSendToAllSubscribed,
|
||||
getPresenceVersion: params.getPresenceVersion,
|
||||
getHealthVersion: params.getHealthVersion,
|
||||
refreshGatewayHealthSnapshot: params.refreshGatewayHealthSnapshot,
|
||||
logHealth: params.logHealth,
|
||||
dedupe: params.dedupe,
|
||||
chatAbortControllers: params.chatAbortControllers,
|
||||
chatRunState: params.chatRunState,
|
||||
chatRunBuffers: params.chatRunBuffers,
|
||||
chatDeltaSentAt: params.chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen: params.chatDeltaLastBroadcastLen,
|
||||
removeChatRun: params.removeChatRun,
|
||||
agentRunSeq: params.agentRunSeq,
|
||||
nodeSendToSession: params.nodeSendToSession,
|
||||
...(typeof params.mediaCleanupTtlMs === "number"
|
||||
? { mediaCleanupTtlMs: params.mediaCleanupTtlMs }
|
||||
: {}),
|
||||
});
|
||||
|
||||
return {
|
||||
mcpServer,
|
||||
bonjourStop,
|
||||
skillsChangeUnsub,
|
||||
maintenance,
|
||||
};
|
||||
}
|
||||
107
src/gateway/server-startup-plugins.ts
Normal file
107
src/gateway/server-startup-plugins.ts
Normal file
@@ -0,0 +1,107 @@
|
||||
import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js";
|
||||
import { initSubagentRegistry } from "../agents/subagent-registry.js";
|
||||
import { runChannelPluginStartupMaintenance } from "../channels/plugins/lifecycle-startup.js";
|
||||
import type { loadConfig } from "../config/config.js";
|
||||
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
|
||||
import {
|
||||
resolveConfiguredDeferredChannelPluginIds,
|
||||
resolveGatewayStartupPluginIds,
|
||||
} from "../plugins/channel-plugin-ids.js";
|
||||
import { createEmptyPluginRegistry } from "../plugins/registry.js";
|
||||
import { getActivePluginRegistry, setActivePluginRegistry } from "../plugins/runtime.js";
|
||||
import { listGatewayMethods } from "./server-methods-list.js";
|
||||
import { coreGatewayHandlers } from "./server-methods.js";
|
||||
import { loadGatewayStartupPlugins } from "./server-plugin-bootstrap.js";
|
||||
import { runStartupSessionMigration } from "./server-startup-session-migration.js";
|
||||
|
||||
type GatewayPluginBootstrapLog = {
|
||||
info: (message: string) => void;
|
||||
warn: (message: string) => void;
|
||||
error: (message: string) => void;
|
||||
debug: (message: string) => void;
|
||||
};
|
||||
|
||||
export async function prepareGatewayPluginBootstrap(params: {
|
||||
cfgAtStart: ReturnType<typeof loadConfig>;
|
||||
startupRuntimeConfig: ReturnType<typeof loadConfig>;
|
||||
minimalTestGateway: boolean;
|
||||
log: GatewayPluginBootstrapLog;
|
||||
}) {
|
||||
const startupMaintenanceConfig =
|
||||
params.cfgAtStart.channels === undefined && params.startupRuntimeConfig.channels !== undefined
|
||||
? {
|
||||
...params.cfgAtStart,
|
||||
channels: params.startupRuntimeConfig.channels,
|
||||
}
|
||||
: params.cfgAtStart;
|
||||
|
||||
if (!params.minimalTestGateway) {
|
||||
await runChannelPluginStartupMaintenance({
|
||||
cfg: startupMaintenanceConfig,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
});
|
||||
await runStartupSessionMigration({
|
||||
cfg: params.cfgAtStart,
|
||||
env: process.env,
|
||||
log: params.log,
|
||||
});
|
||||
}
|
||||
|
||||
initSubagentRegistry();
|
||||
|
||||
const gatewayPluginConfigAtStart = params.minimalTestGateway
|
||||
? params.cfgAtStart
|
||||
: applyPluginAutoEnable({
|
||||
config: params.cfgAtStart,
|
||||
env: process.env,
|
||||
}).config;
|
||||
const defaultAgentId = resolveDefaultAgentId(gatewayPluginConfigAtStart);
|
||||
const defaultWorkspaceDir = resolveAgentWorkspaceDir(gatewayPluginConfigAtStart, defaultAgentId);
|
||||
const deferredConfiguredChannelPluginIds = params.minimalTestGateway
|
||||
? []
|
||||
: resolveConfiguredDeferredChannelPluginIds({
|
||||
config: gatewayPluginConfigAtStart,
|
||||
workspaceDir: defaultWorkspaceDir,
|
||||
env: process.env,
|
||||
});
|
||||
const startupPluginIds = params.minimalTestGateway
|
||||
? []
|
||||
: resolveGatewayStartupPluginIds({
|
||||
config: gatewayPluginConfigAtStart,
|
||||
activationSourceConfig: params.cfgAtStart,
|
||||
workspaceDir: defaultWorkspaceDir,
|
||||
env: process.env,
|
||||
});
|
||||
|
||||
const baseMethods = listGatewayMethods();
|
||||
const emptyPluginRegistry = createEmptyPluginRegistry();
|
||||
let pluginRegistry = emptyPluginRegistry;
|
||||
let baseGatewayMethods = baseMethods;
|
||||
|
||||
if (!params.minimalTestGateway) {
|
||||
({ pluginRegistry, gatewayMethods: baseGatewayMethods } = loadGatewayStartupPlugins({
|
||||
cfg: gatewayPluginConfigAtStart,
|
||||
activationSourceConfig: params.cfgAtStart,
|
||||
workspaceDir: defaultWorkspaceDir,
|
||||
log: params.log,
|
||||
coreGatewayHandlers,
|
||||
baseMethods,
|
||||
pluginIds: startupPluginIds,
|
||||
preferSetupRuntimeForChannelPlugins: deferredConfiguredChannelPluginIds.length > 0,
|
||||
}));
|
||||
} else {
|
||||
pluginRegistry = getActivePluginRegistry() ?? emptyPluginRegistry;
|
||||
setActivePluginRegistry(pluginRegistry);
|
||||
}
|
||||
|
||||
return {
|
||||
gatewayPluginConfigAtStart,
|
||||
defaultWorkspaceDir,
|
||||
deferredConfiguredChannelPluginIds,
|
||||
startupPluginIds,
|
||||
baseMethods,
|
||||
pluginRegistry,
|
||||
baseGatewayMethods,
|
||||
};
|
||||
}
|
||||
161
src/gateway/server-startup-post-attach.test.ts
Normal file
161
src/gateway/server-startup-post-attach.test.ts
Normal file
@@ -0,0 +1,161 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
const hoisted = vi.hoisted(() => {
|
||||
const startPluginServices = vi.fn(async () => null);
|
||||
const startGmailWatcherWithLogs = vi.fn(async () => undefined);
|
||||
const clearInternalHooks = vi.fn();
|
||||
const loadInternalHooks = vi.fn(async () => 0);
|
||||
const startGatewayMemoryBackend = vi.fn(async () => undefined);
|
||||
const scheduleGatewayUpdateCheck = vi.fn(() => () => {});
|
||||
const startGatewayTailscaleExposure = vi.fn(async () => null);
|
||||
const logGatewayStartup = vi.fn();
|
||||
const scheduleSubagentOrphanRecovery = vi.fn();
|
||||
const shouldWakeFromRestartSentinel = vi.fn(() => false);
|
||||
const scheduleRestartSentinelWake = vi.fn();
|
||||
const reconcilePendingSessionIdentities = vi.fn(async () => ({
|
||||
checked: 0,
|
||||
resolved: 0,
|
||||
failed: 0,
|
||||
}));
|
||||
return {
|
||||
startPluginServices,
|
||||
startGmailWatcherWithLogs,
|
||||
clearInternalHooks,
|
||||
loadInternalHooks,
|
||||
startGatewayMemoryBackend,
|
||||
scheduleGatewayUpdateCheck,
|
||||
startGatewayTailscaleExposure,
|
||||
logGatewayStartup,
|
||||
scheduleSubagentOrphanRecovery,
|
||||
shouldWakeFromRestartSentinel,
|
||||
scheduleRestartSentinelWake,
|
||||
reconcilePendingSessionIdentities,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../agents/session-dirs.js", () => ({
|
||||
resolveAgentSessionDirs: vi.fn(async () => []),
|
||||
}));
|
||||
|
||||
vi.mock("../agents/session-write-lock.js", () => ({
|
||||
cleanStaleLockFiles: vi.fn(async () => undefined),
|
||||
}));
|
||||
|
||||
vi.mock("../agents/subagent-registry.js", () => ({
|
||||
scheduleSubagentOrphanRecovery: hoisted.scheduleSubagentOrphanRecovery,
|
||||
}));
|
||||
|
||||
vi.mock("../config/paths.js", () => ({
|
||||
resolveStateDir: vi.fn(() => "/tmp/openclaw-state"),
|
||||
}));
|
||||
|
||||
vi.mock("../hooks/gmail-watcher-lifecycle.js", () => ({
|
||||
startGmailWatcherWithLogs: hoisted.startGmailWatcherWithLogs,
|
||||
}));
|
||||
|
||||
vi.mock("../hooks/internal-hooks.js", () => ({
|
||||
clearInternalHooks: hoisted.clearInternalHooks,
|
||||
createInternalHookEvent: vi.fn(() => ({})),
|
||||
triggerInternalHook: vi.fn(async () => undefined),
|
||||
}));
|
||||
|
||||
vi.mock("../hooks/loader.js", () => ({
|
||||
loadInternalHooks: hoisted.loadInternalHooks,
|
||||
}));
|
||||
|
||||
vi.mock("../plugins/hook-runner-global.js", () => ({
|
||||
getGlobalHookRunner: vi.fn(() => null),
|
||||
}));
|
||||
|
||||
vi.mock("../plugins/services.js", () => ({
|
||||
startPluginServices: hoisted.startPluginServices,
|
||||
}));
|
||||
|
||||
vi.mock("../acp/control-plane/manager.js", () => ({
|
||||
getAcpSessionManager: vi.fn(() => ({
|
||||
reconcilePendingSessionIdentities: hoisted.reconcilePendingSessionIdentities,
|
||||
})),
|
||||
}));
|
||||
|
||||
vi.mock("./server-restart-sentinel.js", () => ({
|
||||
scheduleRestartSentinelWake: hoisted.scheduleRestartSentinelWake,
|
||||
shouldWakeFromRestartSentinel: hoisted.shouldWakeFromRestartSentinel,
|
||||
}));
|
||||
|
||||
vi.mock("./server-startup-memory.js", () => ({
|
||||
startGatewayMemoryBackend: hoisted.startGatewayMemoryBackend,
|
||||
}));
|
||||
|
||||
vi.mock("./server-startup-log.js", () => ({
|
||||
logGatewayStartup: hoisted.logGatewayStartup,
|
||||
}));
|
||||
|
||||
vi.mock("../infra/update-startup.js", () => ({
|
||||
scheduleGatewayUpdateCheck: hoisted.scheduleGatewayUpdateCheck,
|
||||
}));
|
||||
|
||||
vi.mock("./server-tailscale.js", () => ({
|
||||
startGatewayTailscaleExposure: hoisted.startGatewayTailscaleExposure,
|
||||
}));
|
||||
|
||||
const { startGatewayPostAttachRuntime } = await import("./server-startup-post-attach.js");
|
||||
|
||||
describe("startGatewayPostAttachRuntime", () => {
|
||||
beforeEach(() => {
|
||||
hoisted.startPluginServices.mockClear();
|
||||
hoisted.startGmailWatcherWithLogs.mockClear();
|
||||
hoisted.clearInternalHooks.mockClear();
|
||||
hoisted.loadInternalHooks.mockClear();
|
||||
hoisted.startGatewayMemoryBackend.mockClear();
|
||||
hoisted.scheduleGatewayUpdateCheck.mockClear();
|
||||
hoisted.startGatewayTailscaleExposure.mockClear();
|
||||
hoisted.logGatewayStartup.mockClear();
|
||||
hoisted.scheduleSubagentOrphanRecovery.mockClear();
|
||||
hoisted.shouldWakeFromRestartSentinel.mockReturnValue(false);
|
||||
hoisted.scheduleRestartSentinelWake.mockClear();
|
||||
hoisted.reconcilePendingSessionIdentities.mockClear();
|
||||
});
|
||||
|
||||
it("re-enables chat.history after post-attach sidecars start", async () => {
|
||||
const unavailableGatewayMethods = new Set<string>(["chat.history"]);
|
||||
|
||||
await startGatewayPostAttachRuntime({
|
||||
minimalTestGateway: false,
|
||||
cfgAtStart: { hooks: { internal: { enabled: false } } } as never,
|
||||
bindHost: "127.0.0.1",
|
||||
bindHosts: ["127.0.0.1"],
|
||||
port: 18789,
|
||||
tlsEnabled: false,
|
||||
pluginCount: 0,
|
||||
log: { info: vi.fn(), warn: vi.fn() },
|
||||
isNixMode: false,
|
||||
broadcast: vi.fn(),
|
||||
tailscaleMode: "off",
|
||||
resetOnExit: false,
|
||||
controlUiBasePath: "/",
|
||||
logTailscale: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
gatewayPluginConfigAtStart: { hooks: { internal: { enabled: false } } } as never,
|
||||
pluginRegistry: { plugins: [] } as never,
|
||||
defaultWorkspaceDir: "/tmp/openclaw-workspace",
|
||||
deps: {} as never,
|
||||
startChannels: vi.fn(async () => undefined),
|
||||
logHooks: {
|
||||
info: vi.fn(),
|
||||
warn: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
logChannels: {
|
||||
info: vi.fn(),
|
||||
error: vi.fn(),
|
||||
},
|
||||
unavailableGatewayMethods,
|
||||
});
|
||||
|
||||
expect(unavailableGatewayMethods.has("chat.history")).toBe(false);
|
||||
expect(hoisted.startPluginServices).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
332
src/gateway/server-startup-post-attach.ts
Normal file
332
src/gateway/server-startup-post-attach.ts
Normal file
@@ -0,0 +1,332 @@
|
||||
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
|
||||
import { ACP_SESSION_IDENTITY_RENDERER_VERSION } from "../acp/runtime/session-identifiers.js";
|
||||
import { resolveOpenClawAgentDir } from "../agents/agent-paths.js";
|
||||
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js";
|
||||
import { loadModelCatalog } from "../agents/model-catalog.js";
|
||||
import {
|
||||
getModelRefStatus,
|
||||
isCliProvider,
|
||||
resolveConfiguredModelRef,
|
||||
resolveHooksGmailModel,
|
||||
} from "../agents/model-selection.js";
|
||||
import { ensureOpenClawModelsJson } from "../agents/models-config.js";
|
||||
import { resolveModel } from "../agents/pi-embedded-runner/model.js";
|
||||
import { resolveAgentSessionDirs } from "../agents/session-dirs.js";
|
||||
import { cleanStaleLockFiles } from "../agents/session-write-lock.js";
|
||||
import { scheduleSubagentOrphanRecovery } from "../agents/subagent-registry.js";
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import type { loadConfig } from "../config/config.js";
|
||||
import { resolveAgentModelPrimaryValue } from "../config/model-input.js";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import type { GatewayTailscaleMode } from "../config/types.gateway.js";
|
||||
import { startGmailWatcherWithLogs } from "../hooks/gmail-watcher-lifecycle.js";
|
||||
import {
|
||||
clearInternalHooks,
|
||||
createInternalHookEvent,
|
||||
triggerInternalHook,
|
||||
} from "../hooks/internal-hooks.js";
|
||||
import { loadInternalHooks } from "../hooks/loader.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import { scheduleGatewayUpdateCheck } from "../infra/update-startup.js";
|
||||
import { getGlobalHookRunner } from "../plugins/hook-runner-global.js";
|
||||
import type { loadOpenClawPlugins } from "../plugins/loader.js";
|
||||
import { type PluginServicesHandle, startPluginServices } from "../plugins/services.js";
|
||||
import {
|
||||
GATEWAY_EVENT_UPDATE_AVAILABLE,
|
||||
type GatewayUpdateAvailableEventPayload,
|
||||
} from "./events.js";
|
||||
import {
|
||||
scheduleRestartSentinelWake,
|
||||
shouldWakeFromRestartSentinel,
|
||||
} from "./server-restart-sentinel.js";
|
||||
import { logGatewayStartup } from "./server-startup-log.js";
|
||||
import { startGatewayMemoryBackend } from "./server-startup-memory.js";
|
||||
import { startGatewayTailscaleExposure } from "./server-tailscale.js";
|
||||
|
||||
const SESSION_LOCK_STALE_MS = 30 * 60 * 1000;
|
||||
|
||||
async function prewarmConfiguredPrimaryModel(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
log: { warn: (msg: string) => void };
|
||||
}): Promise<void> {
|
||||
const explicitPrimary = resolveAgentModelPrimaryValue(params.cfg.agents?.defaults?.model)?.trim();
|
||||
if (!explicitPrimary) {
|
||||
return;
|
||||
}
|
||||
const { provider, model } = resolveConfiguredModelRef({
|
||||
cfg: params.cfg,
|
||||
defaultProvider: DEFAULT_PROVIDER,
|
||||
defaultModel: DEFAULT_MODEL,
|
||||
});
|
||||
if (isCliProvider(provider, params.cfg)) {
|
||||
return;
|
||||
}
|
||||
const agentDir = resolveOpenClawAgentDir();
|
||||
try {
|
||||
await ensureOpenClawModelsJson(params.cfg, agentDir);
|
||||
const resolved = resolveModel(provider, model, agentDir, params.cfg, {
|
||||
skipProviderRuntimeHooks: true,
|
||||
});
|
||||
if (!resolved.model) {
|
||||
throw new Error(
|
||||
resolved.error ??
|
||||
`Unknown model: ${provider}/${model} (startup warmup only checks static model resolution)`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(`startup model warmup failed for ${provider}/${model}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function startGatewaySidecars(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
pluginRegistry: ReturnType<typeof loadOpenClawPlugins>;
|
||||
defaultWorkspaceDir: string;
|
||||
deps: CliDeps;
|
||||
startChannels: () => Promise<void>;
|
||||
log: { warn: (msg: string) => void };
|
||||
logHooks: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
error: (msg: string) => void;
|
||||
};
|
||||
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
|
||||
}) {
|
||||
try {
|
||||
const stateDir = resolveStateDir(process.env);
|
||||
const sessionDirs = await resolveAgentSessionDirs(stateDir);
|
||||
for (const sessionsDir of sessionDirs) {
|
||||
await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
staleMs: SESSION_LOCK_STALE_MS,
|
||||
removeStale: true,
|
||||
log: { warn: (message) => params.log.warn(message) },
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(`session lock cleanup failed on startup: ${String(err)}`);
|
||||
}
|
||||
|
||||
await startGmailWatcherWithLogs({
|
||||
cfg: params.cfg,
|
||||
log: params.logHooks,
|
||||
});
|
||||
|
||||
if (params.cfg.hooks?.gmail?.model) {
|
||||
const hooksModelRef = resolveHooksGmailModel({
|
||||
cfg: params.cfg,
|
||||
defaultProvider: DEFAULT_PROVIDER,
|
||||
});
|
||||
if (hooksModelRef) {
|
||||
const { provider: resolvedDefaultProvider, model: defaultModel } = resolveConfiguredModelRef({
|
||||
cfg: params.cfg,
|
||||
defaultProvider: DEFAULT_PROVIDER,
|
||||
defaultModel: DEFAULT_MODEL,
|
||||
});
|
||||
const catalog = await loadModelCatalog({ config: params.cfg });
|
||||
const status = getModelRefStatus({
|
||||
cfg: params.cfg,
|
||||
catalog,
|
||||
ref: hooksModelRef,
|
||||
defaultProvider: resolvedDefaultProvider,
|
||||
defaultModel,
|
||||
});
|
||||
if (!status.allowed) {
|
||||
params.logHooks.warn(
|
||||
`hooks.gmail.model "${status.key}" not in agents.defaults.models allowlist (will use primary instead)`,
|
||||
);
|
||||
}
|
||||
if (!status.inCatalog) {
|
||||
params.logHooks.warn(
|
||||
`hooks.gmail.model "${status.key}" not in the model catalog (may fail at runtime)`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
clearInternalHooks();
|
||||
const loadedCount = await loadInternalHooks(params.cfg, params.defaultWorkspaceDir);
|
||||
if (loadedCount > 0) {
|
||||
params.logHooks.info(
|
||||
`loaded ${loadedCount} internal hook handler${loadedCount > 1 ? "s" : ""}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
params.logHooks.error(`failed to load hooks: ${String(err)}`);
|
||||
}
|
||||
|
||||
const skipChannels =
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) ||
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS);
|
||||
if (!skipChannels) {
|
||||
try {
|
||||
await prewarmConfiguredPrimaryModel({
|
||||
cfg: params.cfg,
|
||||
log: params.log,
|
||||
});
|
||||
await params.startChannels();
|
||||
} catch (err) {
|
||||
params.logChannels.error(`channel startup failed: ${String(err)}`);
|
||||
}
|
||||
} else {
|
||||
params.logChannels.info(
|
||||
"skipping channel start (OPENCLAW_SKIP_CHANNELS=1 or OPENCLAW_SKIP_PROVIDERS=1)",
|
||||
);
|
||||
}
|
||||
|
||||
if (params.cfg.hooks?.internal?.enabled !== false) {
|
||||
setTimeout(() => {
|
||||
const hookEvent = createInternalHookEvent("gateway", "startup", "gateway:startup", {
|
||||
cfg: params.cfg,
|
||||
deps: params.deps,
|
||||
workspaceDir: params.defaultWorkspaceDir,
|
||||
});
|
||||
void triggerInternalHook(hookEvent);
|
||||
}, 250);
|
||||
}
|
||||
|
||||
let pluginServices: PluginServicesHandle | null = null;
|
||||
try {
|
||||
pluginServices = await startPluginServices({
|
||||
registry: params.pluginRegistry,
|
||||
config: params.cfg,
|
||||
workspaceDir: params.defaultWorkspaceDir,
|
||||
});
|
||||
} catch (err) {
|
||||
params.log.warn(`plugin services failed to start: ${String(err)}`);
|
||||
}
|
||||
|
||||
if (params.cfg.acp?.enabled) {
|
||||
void getAcpSessionManager()
|
||||
.reconcilePendingSessionIdentities({ cfg: params.cfg })
|
||||
.then((result) => {
|
||||
if (result.checked === 0) {
|
||||
return;
|
||||
}
|
||||
params.log.warn(
|
||||
`acp startup identity reconcile (renderer=${ACP_SESSION_IDENTITY_RENDERER_VERSION}): checked=${result.checked} resolved=${result.resolved} failed=${result.failed}`,
|
||||
);
|
||||
})
|
||||
.catch((err) => {
|
||||
params.log.warn(`acp startup identity reconcile failed: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
|
||||
void startGatewayMemoryBackend({ cfg: params.cfg, log: params.log }).catch((err) => {
|
||||
params.log.warn(`qmd memory startup initialization failed: ${String(err)}`);
|
||||
});
|
||||
|
||||
if (shouldWakeFromRestartSentinel()) {
|
||||
setTimeout(() => {
|
||||
void scheduleRestartSentinelWake({ deps: params.deps });
|
||||
}, 750);
|
||||
}
|
||||
|
||||
scheduleSubagentOrphanRecovery();
|
||||
|
||||
return { pluginServices };
|
||||
}
|
||||
|
||||
export async function startGatewayPostAttachRuntime(params: {
|
||||
minimalTestGateway: boolean;
|
||||
cfgAtStart: ReturnType<typeof loadConfig>;
|
||||
bindHost: string;
|
||||
bindHosts: string[];
|
||||
port: number;
|
||||
tlsEnabled: boolean;
|
||||
pluginCount: number;
|
||||
log: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
};
|
||||
isNixMode: boolean;
|
||||
startupStartedAt?: number;
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
tailscaleMode: GatewayTailscaleMode;
|
||||
resetOnExit: boolean;
|
||||
controlUiBasePath: string;
|
||||
logTailscale: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
error: (msg: string) => void;
|
||||
debug?: (msg: string) => void;
|
||||
};
|
||||
gatewayPluginConfigAtStart: ReturnType<typeof loadConfig>;
|
||||
pluginRegistry: ReturnType<typeof loadOpenClawPlugins>;
|
||||
defaultWorkspaceDir: string;
|
||||
deps: CliDeps;
|
||||
startChannels: () => Promise<void>;
|
||||
logHooks: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
error: (msg: string) => void;
|
||||
};
|
||||
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
|
||||
unavailableGatewayMethods: Set<string>;
|
||||
}) {
|
||||
logGatewayStartup({
|
||||
cfg: params.cfgAtStart,
|
||||
bindHost: params.bindHost,
|
||||
bindHosts: params.bindHosts,
|
||||
port: params.port,
|
||||
tlsEnabled: params.tlsEnabled,
|
||||
pluginCount: params.pluginCount,
|
||||
log: params.log,
|
||||
isNixMode: params.isNixMode,
|
||||
startupStartedAt: params.startupStartedAt,
|
||||
});
|
||||
|
||||
const stopGatewayUpdateCheck = params.minimalTestGateway
|
||||
? () => {}
|
||||
: scheduleGatewayUpdateCheck({
|
||||
cfg: params.cfgAtStart,
|
||||
log: params.log,
|
||||
isNixMode: params.isNixMode,
|
||||
onUpdateAvailableChange: (updateAvailable) => {
|
||||
const payload: GatewayUpdateAvailableEventPayload = { updateAvailable };
|
||||
params.broadcast(GATEWAY_EVENT_UPDATE_AVAILABLE, payload, { dropIfSlow: true });
|
||||
},
|
||||
});
|
||||
|
||||
const tailscaleCleanup = params.minimalTestGateway
|
||||
? null
|
||||
: await startGatewayTailscaleExposure({
|
||||
tailscaleMode: params.tailscaleMode,
|
||||
resetOnExit: params.resetOnExit,
|
||||
port: params.port,
|
||||
controlUiBasePath: params.controlUiBasePath,
|
||||
logTailscale: params.logTailscale,
|
||||
});
|
||||
|
||||
let pluginServices: PluginServicesHandle | null = null;
|
||||
if (!params.minimalTestGateway) {
|
||||
params.log.info("starting channels and sidecars...");
|
||||
({ pluginServices } = await startGatewaySidecars({
|
||||
cfg: params.gatewayPluginConfigAtStart,
|
||||
pluginRegistry: params.pluginRegistry,
|
||||
defaultWorkspaceDir: params.defaultWorkspaceDir,
|
||||
deps: params.deps,
|
||||
startChannels: params.startChannels,
|
||||
log: params.log,
|
||||
logHooks: params.logHooks,
|
||||
logChannels: params.logChannels,
|
||||
}));
|
||||
params.unavailableGatewayMethods.delete("chat.history");
|
||||
}
|
||||
|
||||
if (!params.minimalTestGateway) {
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
if (hookRunner?.hasHooks("gateway_start")) {
|
||||
void hookRunner.runGatewayStart({ port: params.port }, { port: params.port }).catch((err) => {
|
||||
params.log.warn(`gateway_start hook failed: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return { stopGatewayUpdateCheck, tailscaleCleanup, pluginServices };
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
prewarmConfiguredPrimaryModel,
|
||||
};
|
||||
@@ -1,232 +1,6 @@
|
||||
import { getAcpSessionManager } from "../acp/control-plane/manager.js";
|
||||
import { ACP_SESSION_IDENTITY_RENDERER_VERSION } from "../acp/runtime/session-identifiers.js";
|
||||
import { resolveOpenClawAgentDir } from "../agents/agent-paths.js";
|
||||
import { DEFAULT_MODEL, DEFAULT_PROVIDER } from "../agents/defaults.js";
|
||||
import { loadModelCatalog } from "../agents/model-catalog.js";
|
||||
import {
|
||||
getModelRefStatus,
|
||||
isCliProvider,
|
||||
resolveConfiguredModelRef,
|
||||
resolveHooksGmailModel,
|
||||
} from "../agents/model-selection.js";
|
||||
import { ensureOpenClawModelsJson } from "../agents/models-config.js";
|
||||
import { resolveModel } from "../agents/pi-embedded-runner/model.js";
|
||||
import { resolveAgentSessionDirs } from "../agents/session-dirs.js";
|
||||
import { cleanStaleLockFiles } from "../agents/session-write-lock.js";
|
||||
import { scheduleSubagentOrphanRecovery } from "../agents/subagent-registry.js";
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import type { loadConfig } from "../config/config.js";
|
||||
import { resolveAgentModelPrimaryValue } from "../config/model-input.js";
|
||||
import { resolveStateDir } from "../config/paths.js";
|
||||
import { startGmailWatcherWithLogs } from "../hooks/gmail-watcher-lifecycle.js";
|
||||
import {
|
||||
clearInternalHooks,
|
||||
createInternalHookEvent,
|
||||
triggerInternalHook,
|
||||
} from "../hooks/internal-hooks.js";
|
||||
import { loadInternalHooks } from "../hooks/loader.js";
|
||||
import { isTruthyEnvValue } from "../infra/env.js";
|
||||
import type { loadOpenClawPlugins } from "../plugins/loader.js";
|
||||
import { type PluginServicesHandle, startPluginServices } from "../plugins/services.js";
|
||||
import {
|
||||
scheduleRestartSentinelWake,
|
||||
shouldWakeFromRestartSentinel,
|
||||
} from "./server-restart-sentinel.js";
|
||||
import { startGatewayMemoryBackend } from "./server-startup-memory.js";
|
||||
|
||||
const SESSION_LOCK_STALE_MS = 30 * 60 * 1000;
|
||||
|
||||
async function prewarmConfiguredPrimaryModel(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
log: { warn: (msg: string) => void };
|
||||
}): Promise<void> {
|
||||
const explicitPrimary = resolveAgentModelPrimaryValue(params.cfg.agents?.defaults?.model)?.trim();
|
||||
if (!explicitPrimary) {
|
||||
return;
|
||||
}
|
||||
const { provider, model } = resolveConfiguredModelRef({
|
||||
cfg: params.cfg,
|
||||
defaultProvider: DEFAULT_PROVIDER,
|
||||
defaultModel: DEFAULT_MODEL,
|
||||
});
|
||||
if (isCliProvider(provider, params.cfg)) {
|
||||
return;
|
||||
}
|
||||
const agentDir = resolveOpenClawAgentDir();
|
||||
try {
|
||||
await ensureOpenClawModelsJson(params.cfg, agentDir);
|
||||
const resolved = resolveModel(provider, model, agentDir, params.cfg, {
|
||||
skipProviderRuntimeHooks: true,
|
||||
});
|
||||
if (!resolved.model) {
|
||||
throw new Error(
|
||||
resolved.error ??
|
||||
`Unknown model: ${provider}/${model} (startup warmup only checks static model resolution)`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(`startup model warmup failed for ${provider}/${model}: ${String(err)}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function startGatewaySidecars(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
pluginRegistry: ReturnType<typeof loadOpenClawPlugins>;
|
||||
defaultWorkspaceDir: string;
|
||||
deps: CliDeps;
|
||||
startChannels: () => Promise<void>;
|
||||
log: { warn: (msg: string) => void };
|
||||
logHooks: {
|
||||
info: (msg: string) => void;
|
||||
warn: (msg: string) => void;
|
||||
error: (msg: string) => void;
|
||||
};
|
||||
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
|
||||
}) {
|
||||
try {
|
||||
const stateDir = resolveStateDir(process.env);
|
||||
const sessionDirs = await resolveAgentSessionDirs(stateDir);
|
||||
for (const sessionsDir of sessionDirs) {
|
||||
await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
staleMs: SESSION_LOCK_STALE_MS,
|
||||
removeStale: true,
|
||||
log: { warn: (message) => params.log.warn(message) },
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(`session lock cleanup failed on startup: ${String(err)}`);
|
||||
}
|
||||
|
||||
// Start Gmail watcher if configured (hooks.gmail.account).
|
||||
await startGmailWatcherWithLogs({
|
||||
cfg: params.cfg,
|
||||
log: params.logHooks,
|
||||
});
|
||||
|
||||
// Validate hooks.gmail.model if configured.
|
||||
if (params.cfg.hooks?.gmail?.model) {
|
||||
const hooksModelRef = resolveHooksGmailModel({
|
||||
cfg: params.cfg,
|
||||
defaultProvider: DEFAULT_PROVIDER,
|
||||
});
|
||||
if (hooksModelRef) {
|
||||
const { provider: defaultProvider, model: defaultModel } = resolveConfiguredModelRef({
|
||||
cfg: params.cfg,
|
||||
defaultProvider: DEFAULT_PROVIDER,
|
||||
defaultModel: DEFAULT_MODEL,
|
||||
});
|
||||
const catalog = await loadModelCatalog({ config: params.cfg });
|
||||
const status = getModelRefStatus({
|
||||
cfg: params.cfg,
|
||||
catalog,
|
||||
ref: hooksModelRef,
|
||||
defaultProvider,
|
||||
defaultModel,
|
||||
});
|
||||
if (!status.allowed) {
|
||||
params.logHooks.warn(
|
||||
`hooks.gmail.model "${status.key}" not in agents.defaults.models allowlist (will use primary instead)`,
|
||||
);
|
||||
}
|
||||
if (!status.inCatalog) {
|
||||
params.logHooks.warn(
|
||||
`hooks.gmail.model "${status.key}" not in the model catalog (may fail at runtime)`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Load internal hook handlers from configuration and directory discovery.
|
||||
try {
|
||||
// Clear any previously registered hooks to ensure fresh loading
|
||||
clearInternalHooks();
|
||||
const loadedCount = await loadInternalHooks(params.cfg, params.defaultWorkspaceDir);
|
||||
if (loadedCount > 0) {
|
||||
params.logHooks.info(
|
||||
`loaded ${loadedCount} internal hook handler${loadedCount > 1 ? "s" : ""}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
params.logHooks.error(`failed to load hooks: ${String(err)}`);
|
||||
}
|
||||
|
||||
// Launch configured channels so gateway replies via the surface the message came from.
|
||||
// Tests can opt out via OPENCLAW_SKIP_CHANNELS (or legacy OPENCLAW_SKIP_PROVIDERS).
|
||||
const skipChannels =
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_CHANNELS) ||
|
||||
isTruthyEnvValue(process.env.OPENCLAW_SKIP_PROVIDERS);
|
||||
if (!skipChannels) {
|
||||
try {
|
||||
await prewarmConfiguredPrimaryModel({
|
||||
cfg: params.cfg,
|
||||
log: params.log,
|
||||
});
|
||||
await params.startChannels();
|
||||
} catch (err) {
|
||||
params.logChannels.error(`channel startup failed: ${String(err)}`);
|
||||
}
|
||||
} else {
|
||||
params.logChannels.info(
|
||||
"skipping channel start (OPENCLAW_SKIP_CHANNELS=1 or OPENCLAW_SKIP_PROVIDERS=1)",
|
||||
);
|
||||
}
|
||||
|
||||
if (params.cfg.hooks?.internal?.enabled !== false) {
|
||||
setTimeout(() => {
|
||||
const hookEvent = createInternalHookEvent("gateway", "startup", "gateway:startup", {
|
||||
cfg: params.cfg,
|
||||
deps: params.deps,
|
||||
workspaceDir: params.defaultWorkspaceDir,
|
||||
});
|
||||
void triggerInternalHook(hookEvent);
|
||||
}, 250);
|
||||
}
|
||||
|
||||
let pluginServices: PluginServicesHandle | null = null;
|
||||
try {
|
||||
pluginServices = await startPluginServices({
|
||||
registry: params.pluginRegistry,
|
||||
config: params.cfg,
|
||||
workspaceDir: params.defaultWorkspaceDir,
|
||||
});
|
||||
} catch (err) {
|
||||
params.log.warn(`plugin services failed to start: ${String(err)}`);
|
||||
}
|
||||
|
||||
if (params.cfg.acp?.enabled) {
|
||||
void getAcpSessionManager()
|
||||
.reconcilePendingSessionIdentities({ cfg: params.cfg })
|
||||
.then((result) => {
|
||||
if (result.checked === 0) {
|
||||
return;
|
||||
}
|
||||
params.log.warn(
|
||||
`acp startup identity reconcile (renderer=${ACP_SESSION_IDENTITY_RENDERER_VERSION}): checked=${result.checked} resolved=${result.resolved} failed=${result.failed}`,
|
||||
);
|
||||
})
|
||||
.catch((err) => {
|
||||
params.log.warn(`acp startup identity reconcile failed: ${String(err)}`);
|
||||
});
|
||||
}
|
||||
|
||||
void startGatewayMemoryBackend({ cfg: params.cfg, log: params.log }).catch((err) => {
|
||||
params.log.warn(`qmd memory startup initialization failed: ${String(err)}`);
|
||||
});
|
||||
|
||||
if (shouldWakeFromRestartSentinel()) {
|
||||
setTimeout(() => {
|
||||
void scheduleRestartSentinelWake({ deps: params.deps });
|
||||
}, 750);
|
||||
}
|
||||
|
||||
// Same-process SIGUSR1 restarts keep subagent registry memory alive, so
|
||||
// schedule recovery on every startup cycle instead of only cold restore.
|
||||
scheduleSubagentOrphanRecovery();
|
||||
|
||||
return { pluginServices };
|
||||
}
|
||||
|
||||
export const __testing = {
|
||||
prewarmConfiguredPrimaryModel,
|
||||
};
|
||||
export { startGatewayEarlyRuntime } from "./server-startup-early.js";
|
||||
export {
|
||||
__testing,
|
||||
startGatewayPostAttachRuntime,
|
||||
startGatewaySidecars,
|
||||
} from "./server-startup-post-attach.js";
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user