mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 18:10:45 +00:00
perf: slim gateway startup imports
This commit is contained in:
@@ -241,6 +241,20 @@ async function waitForStart(started: Promise<void>) {
|
||||
await new Promise<void>((resolve) => setImmediate(resolve));
|
||||
}
|
||||
|
||||
/**
 * Polls a synchronous assertion until it stops throwing, yielding one
 * event-loop turn (`setImmediate`) between attempts.
 *
 * The assertion always runs at least once, so a zero, negative, NaN, or
 * fractional `maxTicks` can never turn into a silent pass: the error from
 * the final attempt is always rethrown.
 *
 * @param assertion - Callback that throws while the expected state has not
 *   been reached yet (for example an `expect(...)` call).
 * @param maxTicks - Maximum number of attempts; values below 1 are treated
 *   as a single attempt.
 * @throws Whatever the assertion threw on its final attempt.
 */
async function waitForAssertion(assertion: () => void, maxTicks = 20) {
  // Index of the last permitted attempt. The comparison is false for NaN,
  // which therefore falls back to a single attempt as well.
  const lastTick = maxTicks >= 1 ? maxTicks - 1 : 0;
  for (let tick = 0; ; tick += 1) {
    try {
      assertion();
      return;
    } catch (err) {
      if (tick >= lastTick) {
        throw err;
      }
    }
    // Let pending immediates/microtasks run before checking again.
    await new Promise<void>((resolve) => setImmediate(resolve));
  }
}
|
||||
|
||||
async function createSignaledLoopHarness(exitCallOrder?: string[]) {
|
||||
const close = vi.fn(async () => {});
|
||||
const { start, started } = createSignaledStart(close);
|
||||
@@ -302,7 +316,7 @@ describe("runGatewayLoop", () => {
|
||||
reason: "gateway restarting",
|
||||
restartExpectedMs: 1500,
|
||||
});
|
||||
expect(start).toHaveBeenCalledTimes(2);
|
||||
await waitForAssertion(() => expect(start).toHaveBeenCalledTimes(2));
|
||||
|
||||
sigint();
|
||||
await expect(exited).resolves.toBe(0);
|
||||
|
||||
@@ -1,43 +1,9 @@
|
||||
import net from "node:net";
|
||||
import {
|
||||
abortEmbeddedPiRun,
|
||||
getActiveEmbeddedRunCount,
|
||||
waitForActiveEmbeddedRuns,
|
||||
} from "../../agents/pi-embedded-runner/runs.js";
|
||||
import { getRuntimeConfig } from "../../config/config.js";
|
||||
import type { startGatewayServer } from "../../gateway/server.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { acquireGatewayLock } from "../../infra/gateway-lock.js";
|
||||
import {
|
||||
respawnGatewayProcessForUpdate,
|
||||
restartGatewayProcessWithFreshPid,
|
||||
} from "../../infra/process-respawn.js";
|
||||
import { markUpdateRestartSentinelFailure } from "../../infra/restart-sentinel.js";
|
||||
import {
|
||||
consumeGatewaySigusr1RestartAuthorization,
|
||||
consumeGatewayRestartIntentSync,
|
||||
isGatewaySigusr1RestartExternallyAllowed,
|
||||
markGatewaySigusr1RestartHandled,
|
||||
peekGatewaySigusr1RestartReason,
|
||||
resetGatewayRestartStateForInProcessRestart,
|
||||
scheduleGatewaySigusr1Restart,
|
||||
} from "../../infra/restart.js";
|
||||
import { detectRespawnSupervisor } from "../../infra/supervisor-markers.js";
|
||||
import { writeDiagnosticStabilityBundleForFailureSync } from "../../logging/diagnostic-stability-bundle.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import {
|
||||
getActiveBundledRuntimeDepsInstallCount,
|
||||
waitForBundledRuntimeDepsInstallIdle,
|
||||
} from "../../plugins/bundled-runtime-deps-activity.js";
|
||||
import {
|
||||
getActiveTaskCount,
|
||||
markGatewayDraining,
|
||||
resetAllLanes,
|
||||
waitForActiveTasks,
|
||||
} from "../../process/command-queue.js";
|
||||
import { createRestartIterationHook } from "../../process/restart-recovery.js";
|
||||
import type { RuntimeEnv } from "../../runtime.js";
|
||||
import { reloadTaskRegistryFromStore } from "../../tasks/runtime-internal.js";
|
||||
|
||||
const gatewayLog = createSubsystemLogger("gateway");
|
||||
const LAUNCHD_SUPERVISED_RESTART_EXIT_DELAY_MS = 1500;
|
||||
@@ -49,6 +15,61 @@ const UPDATE_RESPAWN_HEALTH_POLL_MS = 200;
|
||||
type GatewayRunSignalAction = "stop" | "restart";
|
||||
type RestartDrainTimeoutMs = number | undefined;
|
||||
|
||||
// Module shapes for the dependencies that are imported lazily instead of at
// file load time.
type EmbeddedRunsModule = typeof import("../../agents/pi-embedded-runner/runs.js");
type RuntimeConfigModule = typeof import("../../config/config.js");
type ProcessRespawnModule = typeof import("../../infra/process-respawn.js");
type RestartSentinelModule = typeof import("../../infra/restart-sentinel.js");
type RestartModule = typeof import("../../infra/restart.js");
type SupervisorMarkersModule = typeof import("../../infra/supervisor-markers.js");
type DiagnosticStabilityBundleModule =
  typeof import("../../logging/diagnostic-stability-bundle.js");
type BundledRuntimeDepsActivityModule =
  typeof import("../../plugins/bundled-runtime-deps-activity.js");
type CommandQueueModule = typeof import("../../process/command-queue.js");
type RuntimeInternalModule = typeof import("../../tasks/runtime-internal.js");

// One memoized import promise per lazily loaded module. Each slot stays
// `undefined` until its loader below is called for the first time.
let embeddedRunsModule: Promise<EmbeddedRunsModule> | undefined;
let runtimeConfigModule: Promise<RuntimeConfigModule> | undefined;
let processRespawnModule: Promise<ProcessRespawnModule> | undefined;
let restartSentinelModule: Promise<RestartSentinelModule> | undefined;
let restartModule: Promise<RestartModule> | undefined;
let supervisorMarkersModule: Promise<SupervisorMarkersModule> | undefined;
let diagnosticStabilityBundleModule: Promise<DiagnosticStabilityBundleModule> | undefined;
let bundledRuntimeDepsActivityModule: Promise<BundledRuntimeDepsActivityModule> | undefined;
let commandQueueModule: Promise<CommandQueueModule> | undefined;
let runtimeInternalModule: Promise<RuntimeInternalModule> | undefined;

// Lazy loaders: the first call starts the dynamic import and stores its
// promise; later calls return that same promise. Because the promise itself
// is cached, a rejected import is also returned to every later caller.
const loadEmbeddedRunsModule = () =>
  (embeddedRunsModule ??= import("../../agents/pi-embedded-runner/runs.js"));
const loadRuntimeConfigModule = () => (runtimeConfigModule ??= import("../../config/config.js"));
const loadProcessRespawnModule = () =>
  (processRespawnModule ??= import("../../infra/process-respawn.js"));
const loadRestartSentinelModule = () =>
  (restartSentinelModule ??= import("../../infra/restart-sentinel.js"));
const loadRestartModule = () => (restartModule ??= import("../../infra/restart.js"));
const loadSupervisorMarkersModule = () =>
  (supervisorMarkersModule ??= import("../../infra/supervisor-markers.js"));
const loadDiagnosticStabilityBundleModule = () =>
  (diagnosticStabilityBundleModule ??= import("../../logging/diagnostic-stability-bundle.js"));
const loadBundledRuntimeDepsActivityModule = () =>
  (bundledRuntimeDepsActivityModule ??= import("../../plugins/bundled-runtime-deps-activity.js"));
const loadCommandQueueModule = () =>
  (commandQueueModule ??= import("../../process/command-queue.js"));
const loadRuntimeInternalModule = () =>
  (runtimeInternalModule ??= import("../../tasks/runtime-internal.js"));
|
||||
|
||||
/**
 * Creates a hook meant to be awaited at the top of every run-loop iteration.
 *
 * The first invocation is treated as the initial start: it skips `onRestart`
 * and resolves to `false`. Every later invocation awaits `onRestart()` and
 * resolves to `true`.
 *
 * @param onRestart - Work to perform before each restart iteration; may be
 *   synchronous or return a promise.
 * @returns An async function resolving to whether `onRestart` was run.
 *   A throw or rejection from `onRestart` propagates to the caller.
 */
function createRestartIterationHook(onRestart: () => Promise<void> | void): () => Promise<boolean> {
  let isFirstIteration = true;
  return async () => {
    if (isFirstIteration) {
      // Initial start: nothing to reset yet.
      isFirstIteration = false;
      return false;
    }
    await onRestart();
    return true;
  };
}
|
||||
|
||||
async function waitForGatewayPortReady(host: string, port: number): Promise<boolean> {
|
||||
return await new Promise<boolean>((resolve) => {
|
||||
const socket = net.createConnection({ host, port });
|
||||
@@ -114,7 +135,9 @@ export async function runGatewayLoop(params: {
|
||||
cleanupSignals();
|
||||
params.runtime.exit(code);
|
||||
};
|
||||
const writeStabilityBundle = (reason: string, error?: unknown) => {
|
||||
const writeStabilityBundle = async (reason: string, error?: unknown) => {
|
||||
const { writeDiagnosticStabilityBundleForFailureSync } =
|
||||
await loadDiagnosticStabilityBundleModule();
|
||||
const result = writeDiagnosticStabilityBundleForFailureSync(reason, error);
|
||||
if ("message" in result) {
|
||||
gatewayLog.warn(result.message);
|
||||
@@ -142,6 +165,10 @@ export async function runGatewayLoop(params: {
|
||||
const handleRestartAfterServerClose = async (restartReason?: string) => {
|
||||
const hadLock = await releaseLockIfHeld();
|
||||
const isUpdateRestart = restartReason === "update.run";
|
||||
const { respawnGatewayProcessForUpdate, restartGatewayProcessWithFreshPid } =
|
||||
await loadProcessRespawnModule();
|
||||
const { detectRespawnSupervisor } = await loadSupervisorMarkersModule();
|
||||
const { markUpdateRestartSentinelFailure } = await loadRestartSentinelModule();
|
||||
|
||||
if (isUpdateRestart) {
|
||||
const respawn = respawnGatewayProcessForUpdate();
|
||||
@@ -228,7 +255,7 @@ export async function runGatewayLoop(params: {
|
||||
return;
|
||||
}
|
||||
if (respawn.mode === "failed") {
|
||||
writeStabilityBundle("gateway.restart_respawn_failed");
|
||||
await writeStabilityBundle("gateway.restart_respawn_failed");
|
||||
gatewayLog.warn(
|
||||
`full process restart failed (${respawn.detail ?? "unknown error"}); falling back to in-process restart`,
|
||||
);
|
||||
@@ -250,8 +277,9 @@ export async function runGatewayLoop(params: {
|
||||
|
||||
const SUPERVISOR_STOP_TIMEOUT_MS = 30_000;
|
||||
const SHUTDOWN_TIMEOUT_MS = SUPERVISOR_STOP_TIMEOUT_MS - 5_000;
|
||||
const resolveRestartDrainTimeoutMs = (): RestartDrainTimeoutMs => {
|
||||
const resolveRestartDrainTimeoutMs = async (): Promise<RestartDrainTimeoutMs> => {
|
||||
try {
|
||||
const { getRuntimeConfig } = await loadRuntimeConfigModule();
|
||||
const timeoutMs = getRuntimeConfig().gateway?.reload?.deferralTimeoutMs;
|
||||
return typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs > 0
|
||||
? timeoutMs
|
||||
@@ -268,7 +296,6 @@ export async function runGatewayLoop(params: {
|
||||
}
|
||||
shuttingDown = true;
|
||||
const isRestart = action === "restart";
|
||||
const restartDrainTimeoutMs = isRestart ? resolveRestartDrainTimeoutMs() : 0;
|
||||
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
|
||||
|
||||
let forceExitTimer: ReturnType<typeof setTimeout> | null = null;
|
||||
@@ -278,13 +305,18 @@ export async function runGatewayLoop(params: {
|
||||
}
|
||||
forceExitTimer = setTimeout(() => {
|
||||
gatewayLog.error("shutdown timed out; exiting without full cleanup");
|
||||
writeStabilityBundle(
|
||||
isRestart ? "gateway.restart_shutdown_timeout" : "gateway.stop_shutdown_timeout",
|
||||
);
|
||||
// Keep the in-process watchdog below the supervisor stop budget so this
|
||||
// path wins before launchd/systemd escalates to a hard kill. Exit
|
||||
// non-zero on any timeout so supervised installs restart cleanly.
|
||||
exitProcess(1);
|
||||
void (async () => {
|
||||
try {
|
||||
await writeStabilityBundle(
|
||||
isRestart ? "gateway.restart_shutdown_timeout" : "gateway.stop_shutdown_timeout",
|
||||
);
|
||||
} finally {
|
||||
// Keep the in-process watchdog below the supervisor stop budget so this
|
||||
// path wins before launchd/systemd escalates to a hard kill. Exit
|
||||
// non-zero on any timeout so supervised installs restart cleanly.
|
||||
exitProcess(1);
|
||||
}
|
||||
})();
|
||||
}, forceExitMs);
|
||||
};
|
||||
const clearForceExitTimer = () => {
|
||||
@@ -295,35 +327,45 @@ export async function runGatewayLoop(params: {
|
||||
forceExitTimer = null;
|
||||
};
|
||||
|
||||
if (!isRestart) {
|
||||
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
|
||||
} else if (restartDrainTimeoutMs !== undefined) {
|
||||
// Allow extra time for draining active turns on explicitly capped restarts.
|
||||
armForceExitTimer(restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
const formatRestartDrainBudget = () =>
|
||||
restartDrainTimeoutMs === undefined
|
||||
? "without a timeout"
|
||||
: `with timeout ${restartDrainTimeoutMs}ms`;
|
||||
const createStillPendingDrainLogger = () =>
|
||||
setInterval(() => {
|
||||
gatewayLog.warn(
|
||||
`still draining ${getActiveTaskCount()} active task(s), ${getActiveEmbeddedRunCount()} active embedded run(s), and ${getActiveBundledRuntimeDepsInstallCount()} runtime deps install(s) before restart`,
|
||||
);
|
||||
}, RESTART_DRAIN_STILL_PENDING_WARN_MS);
|
||||
|
||||
const armCloseForceExitTimerForIndefiniteRestart = () => {
|
||||
if (isRestart && restartDrainTimeoutMs === undefined) {
|
||||
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
|
||||
}
|
||||
};
|
||||
|
||||
void (async () => {
|
||||
const restartDrainTimeoutMs = isRestart ? await resolveRestartDrainTimeoutMs() : 0;
|
||||
if (!isRestart) {
|
||||
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
|
||||
} else if (restartDrainTimeoutMs !== undefined) {
|
||||
// Allow extra time for draining active turns on explicitly capped restarts.
|
||||
armForceExitTimer(restartDrainTimeoutMs + SHUTDOWN_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
const formatRestartDrainBudget = () =>
|
||||
restartDrainTimeoutMs === undefined
|
||||
? "without a timeout"
|
||||
: `with timeout ${restartDrainTimeoutMs}ms`;
|
||||
const armCloseForceExitTimerForIndefiniteRestart = () => {
|
||||
if (isRestart && restartDrainTimeoutMs === undefined) {
|
||||
armForceExitTimer(SHUTDOWN_TIMEOUT_MS);
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
// On restart, wait for in-flight agent turns to finish before
|
||||
// tearing down the server so buffered messages are delivered.
|
||||
if (isRestart) {
|
||||
const [
|
||||
{ abortEmbeddedPiRun, getActiveEmbeddedRunCount, waitForActiveEmbeddedRuns },
|
||||
{ getActiveBundledRuntimeDepsInstallCount, waitForBundledRuntimeDepsInstallIdle },
|
||||
{ getActiveTaskCount, markGatewayDraining, waitForActiveTasks },
|
||||
] = await Promise.all([
|
||||
loadEmbeddedRunsModule(),
|
||||
loadBundledRuntimeDepsActivityModule(),
|
||||
loadCommandQueueModule(),
|
||||
]);
|
||||
const createStillPendingDrainLogger = () =>
|
||||
setInterval(() => {
|
||||
gatewayLog.warn(
|
||||
`still draining ${getActiveTaskCount()} active task(s), ${getActiveEmbeddedRunCount()} active embedded run(s), and ${getActiveBundledRuntimeDepsInstallCount()} runtime deps install(s) before restart`,
|
||||
);
|
||||
}, RESTART_DRAIN_STILL_PENDING_WARN_MS);
|
||||
|
||||
// Reject new enqueues immediately during the drain window so
|
||||
// sessions get an explicit restart error instead of silent task loss.
|
||||
markGatewayDraining();
|
||||
@@ -385,7 +427,10 @@ export async function runGatewayLoop(params: {
|
||||
|
||||
const onSigterm = () => {
|
||||
gatewayLog.info("signal SIGTERM received");
|
||||
request(consumeGatewayRestartIntentSync() ? "restart" : "stop", "SIGTERM");
|
||||
void (async () => {
|
||||
const { consumeGatewayRestartIntentSync } = await loadRestartModule();
|
||||
request(consumeGatewayRestartIntentSync() ? "restart" : "stop", "SIGTERM");
|
||||
})();
|
||||
};
|
||||
const onSigint = () => {
|
||||
gatewayLog.info("signal SIGINT received");
|
||||
@@ -393,26 +438,35 @@ export async function runGatewayLoop(params: {
|
||||
};
|
||||
const onSigusr1 = () => {
|
||||
gatewayLog.info("signal SIGUSR1 received");
|
||||
const authorized = consumeGatewaySigusr1RestartAuthorization();
|
||||
if (!authorized) {
|
||||
if (!isGatewaySigusr1RestartExternallyAllowed()) {
|
||||
gatewayLog.warn(
|
||||
"SIGUSR1 restart ignored (not authorized; commands.restart=false or use gateway tool).",
|
||||
);
|
||||
void (async () => {
|
||||
const {
|
||||
consumeGatewaySigusr1RestartAuthorization,
|
||||
isGatewaySigusr1RestartExternallyAllowed,
|
||||
markGatewaySigusr1RestartHandled,
|
||||
peekGatewaySigusr1RestartReason,
|
||||
scheduleGatewaySigusr1Restart,
|
||||
} = await loadRestartModule();
|
||||
const authorized = consumeGatewaySigusr1RestartAuthorization();
|
||||
if (!authorized) {
|
||||
if (!isGatewaySigusr1RestartExternallyAllowed()) {
|
||||
gatewayLog.warn(
|
||||
"SIGUSR1 restart ignored (not authorized; commands.restart=false or use gateway tool).",
|
||||
);
|
||||
return;
|
||||
}
|
||||
if (shuttingDown) {
|
||||
gatewayLog.info("received SIGUSR1 during shutdown; ignoring");
|
||||
return;
|
||||
}
|
||||
// External SIGUSR1 requests should still reuse the in-process restart
|
||||
// scheduler so idle drain and restart coalescing stay consistent.
|
||||
scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "SIGUSR1" });
|
||||
return;
|
||||
}
|
||||
if (shuttingDown) {
|
||||
gatewayLog.info("received SIGUSR1 during shutdown; ignoring");
|
||||
return;
|
||||
}
|
||||
// External SIGUSR1 requests should still reuse the in-process restart
|
||||
// scheduler so idle drain and restart coalescing stay consistent.
|
||||
scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "SIGUSR1" });
|
||||
return;
|
||||
}
|
||||
const restartReason = peekGatewaySigusr1RestartReason();
|
||||
markGatewaySigusr1RestartHandled();
|
||||
request("restart", "SIGUSR1", restartReason);
|
||||
const restartReason = peekGatewaySigusr1RestartReason();
|
||||
markGatewaySigusr1RestartHandled();
|
||||
request("restart", "SIGUSR1", restartReason);
|
||||
})();
|
||||
};
|
||||
|
||||
process.on("SIGTERM", onSigterm);
|
||||
@@ -420,13 +474,22 @@ export async function runGatewayLoop(params: {
|
||||
process.on("SIGUSR1", onSigusr1);
|
||||
|
||||
try {
|
||||
const onIteration = createRestartIterationHook(() => {
|
||||
const onIteration = createRestartIterationHook(async () => {
|
||||
// After an in-process restart (SIGUSR1), reset command-queue lane state.
|
||||
// Interrupted tasks from the previous lifecycle may have left `active`
|
||||
// counts elevated (their finally blocks never ran), permanently blocking
|
||||
// new work from draining. The same boundary also discards stale restart
|
||||
// deferral timers and reloads the task registry from durable state so
|
||||
// cancelled/completed work is not kept alive by old in-memory maps.
|
||||
const [
|
||||
{ resetAllLanes },
|
||||
{ resetGatewayRestartStateForInProcessRestart },
|
||||
{ reloadTaskRegistryFromStore },
|
||||
] = await Promise.all([
|
||||
loadCommandQueueModule(),
|
||||
loadRestartModule(),
|
||||
loadRuntimeInternalModule(),
|
||||
]);
|
||||
resetAllLanes();
|
||||
resetGatewayRestartStateForInProcessRestart();
|
||||
reloadTaskRegistryFromStore();
|
||||
@@ -436,7 +499,7 @@ export async function runGatewayLoop(params: {
|
||||
// SIGTERM/SIGINT still exit after a graceful shutdown.
|
||||
let isFirstStart = true;
|
||||
for (;;) {
|
||||
onIteration();
|
||||
await onIteration();
|
||||
try {
|
||||
server = await params.start({ startupStartedAt });
|
||||
isFirstStart = false;
|
||||
@@ -456,7 +519,7 @@ export async function runGatewayLoop(params: {
|
||||
await releaseLockIfHeld();
|
||||
const errMsg = formatErrorMessage(err);
|
||||
const errStack = err instanceof Error && err.stack ? `\n${err.stack}` : "";
|
||||
writeStabilityBundle("gateway.restart_startup_failed", err);
|
||||
await writeStabilityBundle("gateway.restart_startup_failed", err);
|
||||
gatewayLog.error(
|
||||
`gateway startup failed: ${errMsg}. ` +
|
||||
`Process will stay alive; fix the issue and restart.${errStack}`,
|
||||
|
||||
@@ -2,29 +2,15 @@ import fs from "node:fs";
|
||||
import { request } from "node:http";
|
||||
import path from "node:path";
|
||||
import type { Command } from "commander";
|
||||
import { readSecretFromFile } from "../../acp/secret-file.js";
|
||||
import type {
|
||||
ConfigFileSnapshot,
|
||||
GatewayAuthMode,
|
||||
GatewayBindMode,
|
||||
GatewayTailscaleMode,
|
||||
} from "../../config/config.js";
|
||||
import {
|
||||
CONFIG_PATH,
|
||||
readBestEffortConfig,
|
||||
readConfigFileSnapshot,
|
||||
recoverConfigFromLastKnownGood,
|
||||
recoverConfigFromJsonRootSuffix,
|
||||
resolveStateDir,
|
||||
resolveGatewayPort,
|
||||
} from "../../config/config.js";
|
||||
import {
|
||||
formatFutureConfigActionBlock,
|
||||
resolveFutureConfigActionBlock,
|
||||
} from "../../config/future-version-guard.js";
|
||||
import { CONFIG_PATH, resolveGatewayPort, resolveStateDir } from "../../config/paths.js";
|
||||
import type { OpenClawConfig } from "../../config/types.openclaw.js";
|
||||
import { hasConfiguredSecretInput } from "../../config/types.secrets.js";
|
||||
import { resolveGatewayAuth } from "../../gateway/auth.js";
|
||||
import {
|
||||
defaultGatewayBindMode,
|
||||
isContainerEnvironment,
|
||||
@@ -33,16 +19,11 @@ import {
|
||||
import type { GatewayWsLogStyle } from "../../gateway/ws-logging.js";
|
||||
import { setGatewayWsLogStyle } from "../../gateway/ws-logging.js";
|
||||
import { setVerbose } from "../../globals.js";
|
||||
import { resolveControlUiRootSync } from "../../infra/control-ui-assets.js";
|
||||
import { isTruthyEnvValue } from "../../infra/env.js";
|
||||
import { formatErrorMessage } from "../../infra/errors.js";
|
||||
import { GatewayLockError } from "../../infra/gateway-lock.js";
|
||||
import { formatPortDiagnostics, inspectPortUsage } from "../../infra/ports.js";
|
||||
import { writeRestartSentinel } from "../../infra/restart-sentinel.js";
|
||||
import { cleanStaleGatewayProcessesSync } from "../../infra/restart-stale-pids.js";
|
||||
import { detectRespawnSupervisor } from "../../infra/supervisor-markers.js";
|
||||
import type { RespawnSupervisor } from "../../infra/supervisor-markers.js";
|
||||
import { setConsoleSubsystemFilter, setConsoleTimestampPrefix } from "../../logging/console.js";
|
||||
import { writeDiagnosticStabilityBundleForFailureSync } from "../../logging/diagnostic-stability-bundle.js";
|
||||
import { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import {
|
||||
@@ -51,16 +32,9 @@ import {
|
||||
} from "../../shared/string-coerce.js";
|
||||
import { formatCliCommand } from "../command-format.js";
|
||||
import { inheritOptionFromParent } from "../command-options.js";
|
||||
import { forceFreePortAndWait, waitForPortBindable } from "../ports.js";
|
||||
import { withProgress } from "../progress.js";
|
||||
import { ensureDevGatewayConfig } from "./dev.js";
|
||||
import { parsePort } from "../shared/parse-port.js";
|
||||
import { runGatewayLoop } from "./run-loop.js";
|
||||
import {
|
||||
extractGatewayMiskeys,
|
||||
maybeExplainGatewayServiceStop,
|
||||
parsePort,
|
||||
toOptionString,
|
||||
} from "./shared.js";
|
||||
|
||||
type GatewayRunOpts = {
|
||||
port?: unknown;
|
||||
@@ -135,6 +109,34 @@ const GATEWAY_AUTH_MODES: readonly GatewayAuthMode[] = [
|
||||
];
|
||||
const GATEWAY_TAILSCALE_MODES: readonly GatewayTailscaleMode[] = ["off", "serve", "funnel"];
|
||||
|
||||
/**
 * Normalizes a raw option value into a string.
 *
 * Strings are returned unchanged (including the empty string); numbers and
 * bigints are converted with `toString()`. Any other value, including
 * `undefined`, `null`, booleans, and objects, yields `undefined`.
 */
const toOptionString = (value: unknown): string | undefined => {
  if (typeof value === "string") {
    return value;
  }
  if (typeof value === "number" || typeof value === "bigint") {
    return value.toString();
  }
  return undefined;
};
|
||||
|
||||
/**
 * Inspects an already-parsed config value for `token` keys under `gateway`.
 *
 * Only key presence is checked (via `in`), so a key whose value is
 * `undefined` or `null` still counts.
 *
 * @param parsed - Arbitrary parsed value; anything that is not an object
 *   with an object-valued `gateway` property yields both flags `false`.
 * @returns `hasGatewayToken` when `gateway.token` exists, and
 *   `hasRemoteToken` when `gateway.remote` is an object containing `token`.
 */
function extractGatewayMiskeys(parsed: unknown): {
  hasGatewayToken: boolean;
  hasRemoteToken: boolean;
} {
  // Single narrowing helper instead of repeating `as Record<string, unknown>`.
  const isRecord = (value: unknown): value is Record<string, unknown> =>
    typeof value === "object" && value !== null;

  if (!isRecord(parsed)) {
    return { hasGatewayToken: false, hasRemoteToken: false };
  }
  const gateway = parsed.gateway;
  if (!isRecord(gateway)) {
    return { hasGatewayToken: false, hasRemoteToken: false };
  }
  const remote = gateway.remote;
  return {
    hasGatewayToken: "token" in gateway,
    hasRemoteToken: isRecord(remote) ? "token" in remote : false,
  };
}
|
||||
|
||||
function createGatewayCliStartupTrace() {
|
||||
const enabled = isTruthyEnvValue(process.env.OPENCLAW_GATEWAY_STARTUP_TRACE);
|
||||
const started = performance.now();
|
||||
@@ -171,13 +173,14 @@ function warnInlinePasswordFlag() {
|
||||
);
|
||||
}
|
||||
|
||||
function resolveGatewayPasswordOption(opts: GatewayRunOpts): string | undefined {
|
||||
async function resolveGatewayPasswordOption(opts: GatewayRunOpts): Promise<string | undefined> {
|
||||
const direct = toOptionString(opts.password);
|
||||
const file = toOptionString(opts.passwordFile);
|
||||
if (direct && file) {
|
||||
throw new Error("Use either --password or --password-file.");
|
||||
}
|
||||
if (file) {
|
||||
const { readSecretFromFile } = await import("../../acp/secret-file.js");
|
||||
return readSecretFromFile(file, "Gateway password");
|
||||
}
|
||||
return direct;
|
||||
@@ -211,13 +214,14 @@ function formatModeErrorList(modes: readonly string[]): string {
|
||||
return `${quoted.slice(0, -1).join(", ")}, or ${quoted[quoted.length - 1]}`;
|
||||
}
|
||||
|
||||
function maybeLogPendingControlUiBuild(cfg: OpenClawConfig): void {
|
||||
async function maybeLogPendingControlUiBuild(cfg: OpenClawConfig): Promise<void> {
|
||||
if (cfg.gateway?.controlUi?.enabled === false) {
|
||||
return;
|
||||
}
|
||||
if (toOptionString(cfg.gateway?.controlUi?.root)) {
|
||||
return;
|
||||
}
|
||||
const { resolveControlUiRootSync } = await import("../../infra/control-ui-assets.js");
|
||||
if (
|
||||
resolveControlUiRootSync({
|
||||
moduleUrl: import.meta.url,
|
||||
@@ -265,6 +269,12 @@ function getGatewayStartGuardErrors(params: {
|
||||
async function readGatewayStartupConfig(params: {
|
||||
startupTrace: ReturnType<typeof createGatewayCliStartupTrace>;
|
||||
}): Promise<{ cfg: OpenClawConfig; snapshot: ConfigFileSnapshot | null }> {
|
||||
const {
|
||||
readBestEffortConfig,
|
||||
readConfigFileSnapshot,
|
||||
recoverConfigFromLastKnownGood,
|
||||
recoverConfigFromJsonRootSuffix,
|
||||
} = await import("../../config/config.js");
|
||||
let cfg = await params.startupTrace.measure("cli.config-load", () => readBestEffortConfig());
|
||||
let snapshot: ConfigFileSnapshot | null = await params.startupTrace.measure(
|
||||
"cli.config-snapshot",
|
||||
@@ -283,6 +293,7 @@ async function readGatewayStartupConfig(params: {
|
||||
`gateway: restored invalid effective config from last-known-good backup: ${invalidSnapshot.path}`,
|
||||
);
|
||||
try {
|
||||
const { writeRestartSentinel } = await import("../../infra/restart-sentinel.js");
|
||||
await writeRestartSentinel({
|
||||
kind: "config-auto-recovery",
|
||||
status: "ok",
|
||||
@@ -400,7 +411,7 @@ async function probeGatewayHealthz(params: {
|
||||
|
||||
async function runGatewayLoopWithSupervisedLockRecovery(params: {
|
||||
startLoop: () => Promise<void>;
|
||||
supervisor: ReturnType<typeof detectRespawnSupervisor>;
|
||||
supervisor: RespawnSupervisor | null;
|
||||
port: number;
|
||||
healthHost: string;
|
||||
log: GatewayRunLogger;
|
||||
@@ -461,7 +472,9 @@ async function runGatewayLoopWithSupervisedLockRecovery(params: {
|
||||
}
|
||||
}
|
||||
|
||||
function maybeWriteGatewayStartupFailureBundle(err: unknown): void {
|
||||
async function maybeWriteGatewayStartupFailureBundle(err: unknown): Promise<void> {
|
||||
const { writeDiagnosticStabilityBundleForFailureSync } =
|
||||
await import("../../logging/diagnostic-stability-bundle.js");
|
||||
const result = writeDiagnosticStabilityBundleForFailureSync("gateway.startup_failed", err);
|
||||
if ("message" in result) {
|
||||
gatewayLog.warn(result.message);
|
||||
@@ -519,6 +532,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
setConsoleTimestampPrefix(true);
|
||||
|
||||
if (devMode) {
|
||||
const { ensureDevGatewayConfig } = await import("./dev.js");
|
||||
await startupTrace.measure("cli.dev-config", () =>
|
||||
ensureDevGatewayConfig({ reset: Boolean(opts.reset) }),
|
||||
);
|
||||
@@ -526,7 +540,9 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
|
||||
gatewayLog.info("loading configuration…");
|
||||
const { cfg, snapshot } = await readGatewayStartupConfig({ startupTrace });
|
||||
maybeLogPendingControlUiBuild(cfg);
|
||||
void maybeLogPendingControlUiBuild(cfg).catch((err) => {
|
||||
gatewayLog.warn(`Control UI asset check failed: ${String(err)}`);
|
||||
});
|
||||
const portOverride = parsePort(opts.port);
|
||||
if (opts.port !== undefined && portOverride === null) {
|
||||
defaultRuntime.error("Invalid port");
|
||||
@@ -537,6 +553,8 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
defaultRuntime.error("Invalid port");
|
||||
defaultRuntime.exit(1);
|
||||
}
|
||||
const { formatFutureConfigActionBlock, resolveFutureConfigActionBlock } =
|
||||
await import("../../config/future-version-guard.js");
|
||||
const futureStartupBlock = resolveFutureConfigActionBlock({
|
||||
action: "start the gateway service",
|
||||
snapshot,
|
||||
@@ -571,6 +589,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
}
|
||||
const bindExplicitRaw = bindExplicitRawStr as GatewayBindMode | undefined;
|
||||
if (process.env.OPENCLAW_SERVICE_MARKER?.trim()) {
|
||||
const { cleanStaleGatewayProcessesSync } = await import("../../infra/restart-stale-pids.js");
|
||||
const stale = cleanStaleGatewayProcessesSync(port);
|
||||
if (stale.length > 0) {
|
||||
gatewayLog.info(
|
||||
@@ -580,6 +599,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
}
|
||||
if (opts.force) {
|
||||
try {
|
||||
const { forceFreePortAndWait, waitForPortBindable } = await import("../ports.js");
|
||||
const { killed, waitedMs, escalatedToSigkill } = await forceFreePortAndWait(port, {
|
||||
timeoutMs: 2000,
|
||||
intervalMs: 100,
|
||||
@@ -656,7 +676,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
|
||||
let passwordRaw: string | undefined;
|
||||
try {
|
||||
passwordRaw = resolveGatewayPasswordOption(opts);
|
||||
passwordRaw = await resolveGatewayPasswordOption(opts);
|
||||
} catch (err) {
|
||||
defaultRuntime.error(formatErrorMessage(err));
|
||||
defaultRuntime.exit(1);
|
||||
@@ -694,6 +714,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
...(passwordRaw ? { password: passwordRaw } : {}),
|
||||
}
|
||||
: undefined;
|
||||
const { resolveGatewayAuth } = await import("../../gateway/auth.js");
|
||||
const resolvedAuth = await startupTrace.measure("cli.auth-resolve", () =>
|
||||
resolveGatewayAuth({
|
||||
authConfig: cfg.gateway?.auth,
|
||||
@@ -801,6 +822,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
});
|
||||
|
||||
try {
|
||||
const { detectRespawnSupervisor } = await import("../../infra/supervisor-markers.js");
|
||||
await runGatewayLoopWithSupervisedLockRecovery({
|
||||
startLoop,
|
||||
supervisor: detectRespawnSupervisor(process.env),
|
||||
@@ -815,6 +837,7 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
`Gateway failed to start: ${errMessage}\nIf the gateway is supervised, stop it with: ${formatCliCommand("openclaw gateway stop")}`,
|
||||
);
|
||||
try {
|
||||
const { formatPortDiagnostics, inspectPortUsage } = await import("../../infra/ports.js");
|
||||
const diagnostics = await inspectPortUsage(port);
|
||||
if (diagnostics.status === "busy") {
|
||||
for (const line of formatPortDiagnostics(diagnostics)) {
|
||||
@@ -824,11 +847,12 @@ async function runGatewayCommand(opts: GatewayRunOpts) {
|
||||
} catch {
|
||||
// ignore diagnostics failures
|
||||
}
|
||||
const { maybeExplainGatewayServiceStop } = await import("./shared.js");
|
||||
await maybeExplainGatewayServiceStop();
|
||||
defaultRuntime.exit(isHealthyGatewayLockError(err) ? 0 : 1);
|
||||
return;
|
||||
}
|
||||
maybeWriteGatewayStartupFailureBundle(err);
|
||||
await maybeWriteGatewayStartupFailureBundle(err);
|
||||
defaultRuntime.error(`Gateway failed to start: ${String(err)}`);
|
||||
defaultRuntime.exit(1);
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ describe("server-runtime-services", () => {
|
||||
hoisted.deliverOutboundPayloads.mockClear();
|
||||
});
|
||||
|
||||
it("keeps scheduled services inert during initial runtime setup", () => {
|
||||
it("keeps scheduled services inert during initial runtime setup", async () => {
|
||||
const services = startGatewayRuntimeServices({
|
||||
minimalTestGateway: false,
|
||||
cfgAtStart: {} as never,
|
||||
@@ -75,6 +75,7 @@ describe("server-runtime-services", () => {
|
||||
});
|
||||
|
||||
expect(hoisted.startChannelHealthMonitor).toHaveBeenCalledTimes(1);
|
||||
await vi.dynamicImportSettled();
|
||||
expect(hoisted.startGatewayModelPricingRefresh).toHaveBeenCalledWith({ config: {} });
|
||||
expect(hoisted.startHeartbeatRunner).not.toHaveBeenCalled();
|
||||
expect(hoisted.recoverPendingDeliveries).not.toHaveBeenCalled();
|
||||
@@ -83,7 +84,7 @@ describe("server-runtime-services", () => {
|
||||
expect(hoisted.heartbeatRunner.stop).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("passes startup plugin lookup metadata to the initial pricing refresh", () => {
|
||||
it("passes startup plugin lookup metadata to the initial pricing refresh", async () => {
|
||||
const pluginLookUpTable = {
|
||||
index: { plugins: [] },
|
||||
manifestRegistry: { plugins: [], diagnostics: [] },
|
||||
@@ -101,12 +102,31 @@ describe("server-runtime-services", () => {
|
||||
pluginLookUpTable: pluginLookUpTable as never,
|
||||
});
|
||||
|
||||
await vi.dynamicImportSettled();
|
||||
expect(hoisted.startGatewayModelPricingRefresh).toHaveBeenCalledWith({
|
||||
config: {},
|
||||
pluginLookUpTable,
|
||||
});
|
||||
});
|
||||
|
||||
// Regression test: stopping the pricing refresh before the on-demand import
// has settled must prevent the refresh from ever being started.
it("does not start model pricing refresh after early stop", async () => {
  const services = startGatewayRuntimeServices({
    minimalTestGateway: false,
    cfgAtStart: {} as never,
    channelManager: {
      getRuntimeSnapshot: vi.fn(),
      isHealthMonitorEnabled: vi.fn(),
      isManuallyStopped: vi.fn(),
    } as never,
    log: createLog(),
  });

  // Stop first, then let any pending dynamic import() calls settle.
  services.stopModelPricingRefresh();
  await vi.dynamicImportSettled();

  expect(hoisted.startGatewayModelPricingRefresh).not.toHaveBeenCalled();
});
|
||||
|
||||
it("activates heartbeat, cron, and delivery recovery after sidecars are ready", async () => {
|
||||
vi.useFakeTimers();
|
||||
const cron = { start: vi.fn(async () => undefined) };
|
||||
|
||||
@@ -4,7 +4,6 @@ import { startHeartbeatRunner, type HeartbeatRunner } from "../infra/heartbeat-r
|
||||
import type { PluginLookUpTable } from "../plugins/plugin-lookup-table.js";
|
||||
import type { ChannelHealthMonitor } from "./channel-health-monitor.js";
|
||||
import { startChannelHealthMonitor } from "./channel-health-monitor.js";
|
||||
import { startGatewayModelPricingRefresh } from "./model-pricing-cache.js";
|
||||
|
||||
type GatewayRuntimeServiceLogger = {
|
||||
child: (name: string) => {
|
||||
@@ -89,6 +88,34 @@ function recoverPendingSessionDeliveries(params: {
|
||||
timer.unref?.();
|
||||
}
|
||||
|
||||
/**
 * Starts the gateway model pricing refresh without a static import of
 * `./model-pricing-cache.js`: the module is loaded with a dynamic `import()`
 * in a fire-and-forget async task, and the refresh is started once it resolves.
 *
 * @param params.config - Config forwarded to `startGatewayModelPricingRefresh`.
 * @param params.pluginLookUpTable - Optional plugin lookup metadata; forwarded
 *   only when provided.
 * @param params.log - Logger that receives an error message if the import or
 *   the start call fails.
 * @returns A stop function. Called before the import resolves, it prevents the
 *   refresh from starting; called afterwards, it invokes the stop handle
 *   returned by `startGatewayModelPricingRefresh`.
 */
function startGatewayModelPricingRefreshOnDemand(params: {
  config: OpenClawConfig;
  pluginLookUpTable?: Pick<PluginLookUpTable, "index" | "manifestRegistry">;
  log: GatewayRuntimeServiceLogger;
}): () => void {
  let stopped = false;
  let stopRefresh: (() => void) | undefined;
  void (async () => {
    const { startGatewayModelPricingRefresh } = await import("./model-pricing-cache.js");
    // The caller may have stopped the service while the module was loading.
    if (stopped) {
      return;
    }
    stopRefresh = startGatewayModelPricingRefresh({
      config: params.config,
      ...(params.pluginLookUpTable ? { pluginLookUpTable: params.pluginLookUpTable } : {}),
    });
    // Defensive re-check: if a stop was requested in the meantime, tear the
    // refresh down immediately instead of leaving it running.
    if (stopped) {
      stopRefresh();
      stopRefresh = undefined;
    }
  })().catch((err) => params.log.error(`Model pricing refresh failed to start: ${String(err)}`));
  return () => {
    stopped = true;
    stopRefresh?.();
    stopRefresh = undefined;
  };
}
|
||||
|
||||
export function startGatewayRuntimeServices(params: {
|
||||
minimalTestGateway: boolean;
|
||||
cfgAtStart: OpenClawConfig;
|
||||
@@ -110,9 +137,10 @@ export function startGatewayRuntimeServices(params: {
|
||||
channelHealthMonitor,
|
||||
stopModelPricingRefresh:
|
||||
!params.minimalTestGateway && !isVitestRuntimeEnv()
|
||||
? startGatewayModelPricingRefresh({
|
||||
? startGatewayModelPricingRefreshOnDemand({
|
||||
config: params.cfgAtStart,
|
||||
...(params.pluginLookUpTable ? { pluginLookUpTable: params.pluginLookUpTable } : {}),
|
||||
log: params.log,
|
||||
})
|
||||
: () => {},
|
||||
};
|
||||
|
||||
@@ -1,7 +1,24 @@
|
||||
export { truncateCloseReason } from "./server/close-reason.js";
|
||||
export type { GatewayServer, GatewayServerOptions } from "./server.impl.js";
|
||||
|
||||
/**
 * Writes one startup-trace line to stderr when the
 * `OPENCLAW_GATEWAY_STARTUP_TRACE` environment variable is set to a non-empty
 * value; otherwise does nothing.
 *
 * @param name - Label of the startup phase being reported.
 * @param durationMs - Duration of the phase in milliseconds (printed with one decimal).
 * @param totalMs - Elapsed total in milliseconds (printed with one decimal).
 */
function emitStartupTrace(name: string, durationMs: number, totalMs: number): void {
  if (!process.env.OPENCLAW_GATEWAY_STARTUP_TRACE) {
    return;
  }
  process.stderr.write(
    `[gateway] startup trace: ${name} ${durationMs.toFixed(1)}ms total=${totalMs.toFixed(1)}ms\n`,
  );
}
|
||||
|
||||
/**
 * Dynamically imports `./server.impl.js` and reports how long the import took
 * via `emitStartupTrace` under the name `gateway.server-impl-import`.
 *
 * @returns The module namespace of `./server.impl.js`.
 * @throws Re-throws whatever the dynamic import rejects with; the trace is
 *   still emitted in that case.
 */
async function loadServerImpl() {
  const startupStartedAt = performance.now();
  const before = performance.now();
  try {
    return await import("./server.impl.js");
  } finally {
    // `finally` so the timing is reported whether the import resolves or rejects.
    const now = performance.now();
    emitStartupTrace("gateway.server-impl-import", now - before, now - startupStartedAt);
  }
}
|
||||
|
||||
export async function startGatewayServer(
|
||||
|
||||
Reference in New Issue
Block a user