From 0b1fbeabed8e4b03c542fb4ca81e00267fb4bcde Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 3 May 2026 13:30:18 +0100 Subject: [PATCH] perf(gateway): defer cron and sentinel startup work --- src/gateway/server-cron-lazy.test.ts | 117 ++++++++++++++++++ src/gateway/server-cron-lazy.ts | 100 +++++++++++++++ src/gateway/server-cron.ts | 3 +- src/gateway/server-import-boundary.test.ts | 22 ++++ src/gateway/server-runtime-services.test.ts | 23 ++++ src/gateway/server-runtime-services.ts | 11 +- .../server-startup-post-attach.test.ts | 28 +++++ src/gateway/server-startup-post-attach.ts | 34 +++-- src/gateway/server.impl.ts | 12 +- 9 files changed, 335 insertions(+), 15 deletions(-) create mode 100644 src/gateway/server-cron-lazy.test.ts create mode 100644 src/gateway/server-cron-lazy.ts create mode 100644 src/gateway/server-import-boundary.test.ts diff --git a/src/gateway/server-cron-lazy.test.ts b/src/gateway/server-cron-lazy.test.ts new file mode 100644 index 00000000000..d6c5286c905 --- /dev/null +++ b/src/gateway/server-cron-lazy.test.ts @@ -0,0 +1,117 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { CliDeps } from "../cli/deps.types.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { CronServiceContract } from "../cron/service-contract.js"; +import type { GatewayCronState } from "./server-cron.js"; + +const hoisted = vi.hoisted(() => { + let state: unknown; + return { + buildGatewayCronService: vi.fn(() => state), + setState(nextState: unknown) { + state = nextState; + }, + }; +}); + +vi.mock("./server-cron.js", () => ({ + buildGatewayCronService: hoisted.buildGatewayCronService, +})); + +const { createLazyGatewayCronState } = await import("./server-cron-lazy.js"); + +describe("createLazyGatewayCronState", () => { + beforeEach(() => { + vi.unstubAllEnvs(); + hoisted.buildGatewayCronService.mockClear(); + }); + + it("does not build the heavy cron service until an async cron operation needs it", async () => { + const cron = createCronService(); + const state = createCronState(cron); + hoisted.setState(state); + + const lazy = createLazyGatewayCronState(createParams()); + + expect(hoisted.buildGatewayCronService).not.toHaveBeenCalled(); + expect(lazy.cron.getJob("demo")).toBeUndefined(); + expect(lazy.cron.getDefaultAgentId()).toBeUndefined(); + + await lazy.cron.status(); + + expect(hoisted.buildGatewayCronService).toHaveBeenCalledTimes(1); + expect(cron.status).toHaveBeenCalledTimes(1); + }); + + it("starts the loaded cron service once", async () => { + const cron = createCronService(); + hoisted.setState(createCronState(cron)); + + const lazy = createLazyGatewayCronState(createParams()); + + await lazy.cron.start(); + await lazy.cron.start(); + + expect(hoisted.buildGatewayCronService).toHaveBeenCalledTimes(1); + expect(cron.start).toHaveBeenCalledTimes(1); + }); + + it("keeps synchronous wake non-blocking before the cron service is loaded", async () => { + const cron = createCronService(); + hoisted.setState(createCronState(cron)); + + const lazy = createLazyGatewayCronState(createParams()); + + expect(lazy.cron.wake({ mode: "now", text: "ping" })).toEqual({ ok: false }); + + await vi.waitFor(() => { + expect(hoisted.buildGatewayCronService).toHaveBeenCalledTimes(1); + }); + expect(cron.wake).not.toHaveBeenCalled(); + }); + + it("preserves the startup cron enabled flag without loading cron runtime", () => { + vi.stubEnv("OPENCLAW_SKIP_CRON", "1"); + + const lazy = createLazyGatewayCronState(createParams()); + + expect(lazy.cronEnabled).toBe(false); + expect(hoisted.buildGatewayCronService).not.toHaveBeenCalled(); + }); +}); + +function createParams(overrides: Partial = {}) { + return { + cfg: { + ...overrides, + } as OpenClawConfig, + deps: {} as CliDeps, + broadcast: vi.fn(), + }; +} + +function createCronState(cron: CronServiceContract): GatewayCronState { + return { + cron, + storePath: "/tmp/openclaw-cron.json", + cronEnabled: true, + } as GatewayCronState; +} + +function createCronService(): CronServiceContract { + return { + start: vi.fn(async () => undefined), + stop: vi.fn(), + status: vi.fn(async () => ({ enabled: true }) as never), + list: vi.fn(async () => [] as never), + listPage: vi.fn(async () => ({ items: [], total: 0 }) as never), + add: vi.fn(async () => ({ ok: true }) as never), + update: vi.fn(async () => ({ ok: true }) as never), + remove: vi.fn(async () => ({ ok: true }) as never), + run: vi.fn(async () => ({ ok: true, ran: false, reason: "invalid-spec" }) as never), + enqueueRun: vi.fn(async () => ({ ok: true, ran: false, reason: "invalid-spec" }) as never), + getJob: vi.fn(() => undefined), + getDefaultAgentId: vi.fn(() => "default"), + wake: vi.fn(() => ({ ok: true })), + }; +} diff --git a/src/gateway/server-cron-lazy.ts b/src/gateway/server-cron-lazy.ts new file mode 100644 index 00000000000..ec416341593 --- /dev/null +++ b/src/gateway/server-cron-lazy.ts @@ -0,0 +1,100 @@ +import type { CliDeps } from "../cli/deps.types.js"; +import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { CronServiceContract } from "../cron/service-contract.js"; +import { resolveCronStorePath } from "../cron/store.js"; +import type { GatewayCronState } from "./server-cron.js"; + +type LazyGatewayCronParams = { + cfg: OpenClawConfig; + deps: CliDeps; + broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void; +}; + +type LoadedGatewayCronState = { + state: GatewayCronState; + started: boolean; +}; + +export function createLazyGatewayCronState(params: LazyGatewayCronParams): GatewayCronState { + const storePath = resolveCronStorePath(params.cfg.cron?.store); + const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false; + let loaded: LoadedGatewayCronState | null = null; + let loading: Promise | null = null; + + const load = async (): Promise => { + if (loaded) { + return loaded; + } + loading ??= import("./server-cron.js").then(({ buildGatewayCronService }) => { + loaded = { + state: buildGatewayCronService(params), + started: false, + }; + return loaded; + }); + return await loading; + }; + + const cron: CronServiceContract = { + async start() { + const resolved = await load(); + if (resolved.started) { + return; + } + resolved.started = true; + await resolved.state.cron.start(); + }, + stop() { + loaded?.state.cron.stop(); + }, + async status() { + return await (await load()).state.cron.status(); + }, + async list(opts) { + return await (await load()).state.cron.list(opts); + }, + async listPage(opts) { + return await (await load()).state.cron.listPage(opts); + }, + async add(input) { + return await (await load()).state.cron.add(input); + }, + async update(id, patch) { + return await (await load()).state.cron.update(id, patch); + }, + async remove(id) { + return await (await load()).state.cron.remove(id); + }, + async run(id, mode) { + return await (await load()).state.cron.run(id, mode); + }, + async enqueueRun(id, mode) { + return await (await load()).state.cron.enqueueRun(id, mode); + }, + getJob(id) { + if (!loaded) { + return undefined; + } + return loaded.state.cron.getJob(id); + }, + getDefaultAgentId() { + if (!loaded) { + return undefined; + } + return loaded.state.cron.getDefaultAgentId(); + }, + wake(opts) { + if (!loaded) { + void load(); + return { ok: false }; + } + return loaded.state.cron.wake(opts); + }, + }; + + return { + cron, + storePath, + cronEnabled, + }; +} diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 9e4f4eaae2a..b06ce9ec0da 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -16,6 +16,7 @@ import { resolveCronRunLogPath, resolveCronRunLogPruneOptions, } from "../cron/run-log.js"; +import type { CronServiceContract } from "../cron/service-contract.js"; import { CronService } from "../cron/service.js"; import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js"; import { resolveCronStorePath } from "../cron/store.js"; @@ -40,7 +41,7 @@ import { } from "./server-cron-notifications.js"; export type GatewayCronState = { - cron: CronService; + cron: CronServiceContract; storePath: string; cronEnabled: boolean; }; diff --git a/src/gateway/server-import-boundary.test.ts b/src/gateway/server-import-boundary.test.ts new file mode 100644 index 00000000000..1f005d69e55 --- /dev/null +++ b/src/gateway/server-import-boundary.test.ts @@ -0,0 +1,22 @@ +import { readFileSync } from "node:fs"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; + +const repoRoot = path.resolve(import.meta.dirname, "../.."); + +function readSource(relativePath: string): string { + return readFileSync(path.join(repoRoot, relativePath), "utf8"); +} + +describe("gateway startup import boundaries", () => { + it("keeps heavy cron and doctor legacy paths out of the server.impl import graph", () => { + const serverImpl = readSource("src/gateway/server.impl.ts"); + const validation = readSource("src/config/validation.ts"); + + expect(serverImpl).not.toContain('from "./server-cron.js"'); + expect(serverImpl).toContain('from "./server-cron-lazy.js"'); + expect(serverImpl).not.toContain('from "./server-methods.js"'); + expect(validation).not.toContain("legacy-secretref-env-marker"); + expect(validation).not.toContain("commands/doctor"); + }); +}); diff --git a/src/gateway/server-runtime-services.test.ts b/src/gateway/server-runtime-services.test.ts index ef3204381f1..1c1b81f010e 100644 --- a/src/gateway/server-runtime-services.test.ts +++ b/src/gateway/server-runtime-services.test.ts @@ -194,6 +194,29 @@ describe("server-runtime-services", () => { ); }); + it("can defer cron startup while activating other scheduled services", async () => { + vi.useFakeTimers(); + const cron = { start: vi.fn(async () => undefined) }; + const log = createLog(); + + activateGatewayScheduledServices({ + minimalTestGateway: false, + cfgAtStart: {} as never, + deps: {} as never, + sessionDeliveryRecoveryMaxEnqueuedAt: 123, + cron, + startCron: false, + logCron: { error: vi.fn() }, + log, + }); + + expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1); + expect(cron.start).not.toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(1_250); + await vi.dynamicImportSettled(); + expect(hoisted.recoverPendingDeliveries).toHaveBeenCalledTimes(1); + }); + it("keeps scheduled services disabled for minimal test gateways", () => { const cron = { start: vi.fn(async () => undefined) }; diff --git a/src/gateway/server-runtime-services.ts b/src/gateway/server-runtime-services.ts index 96eb22be89f..df927da8b9c 100644 --- a/src/gateway/server-runtime-services.ts +++ b/src/gateway/server-runtime-services.ts @@ -148,6 +148,7 @@ export function activateGatewayScheduledServices(params: { deps: import("../cli/deps.types.js").CliDeps; sessionDeliveryRecoveryMaxEnqueuedAt: number; cron: { start: () => Promise }; + startCron?: boolean; logCron: { error: (message: string) => void }; log: GatewayRuntimeServiceLogger; pluginLookUpTable?: PluginMetadataRegistryView; @@ -156,10 +157,12 @@ export function activateGatewayScheduledServices(params: { return { heartbeatRunner: createNoopHeartbeatRunner(), stopModelPricingRefresh: () => {} }; } const heartbeatRunner = startHeartbeatRunner({ cfg: params.cfgAtStart }); - startGatewayCronWithLogging({ - cron: params.cron, - logCron: params.logCron, - }); + if (params.startCron !== false) { + startGatewayCronWithLogging({ + cron: params.cron, + logCron: params.logCron, + }); + } recoverPendingOutboundDeliveries({ cfg: params.cfgAtStart, log: params.log, diff --git a/src/gateway/server-startup-post-attach.test.ts b/src/gateway/server-startup-post-attach.test.ts index 0ead2cc92e1..0afd7622e69 100644 --- a/src/gateway/server-startup-post-attach.test.ts +++ b/src/gateway/server-startup-post-attach.test.ts @@ -253,6 +253,34 @@ describe("startGatewayPostAttachRuntime", () => { expect(hoisted.startGatewayMemoryBackend).not.toHaveBeenCalled(); }); + it("refreshes the restart sentinel after sidecars without blocking post-attach", async () => { + const events: string[] = []; + const refreshLatestUpdateRestartSentinel = vi.fn(async () => { + events.push("sentinel"); + return null; + }); + const startGatewaySidecars = vi.fn(async () => { + events.push("sidecars"); + return { pluginServices: null }; + }); + + await startGatewayPostAttachRuntime( + createPostAttachParams(), + createPostAttachRuntimeDeps({ + refreshLatestUpdateRestartSentinel, + startGatewaySidecars, + }), + ); + + events.push("returned"); + expect(refreshLatestUpdateRestartSentinel).not.toHaveBeenCalled(); + + await vi.waitFor(() => { + expect(refreshLatestUpdateRestartSentinel).toHaveBeenCalledTimes(1); + }); + expect(events).toEqual(["sidecars", "returned", "sentinel"]); + }); + it("loads deferred startup plugins before channel sidecars", async () => { const events: string[] = []; const loadedPluginRegistry = { diff --git a/src/gateway/server-startup-post-attach.ts b/src/gateway/server-startup-post-attach.ts index 9669dbd40d7..a5738f47f0b 100644 --- a/src/gateway/server-startup-post-attach.ts +++ b/src/gateway/server-startup-post-attach.ts @@ -103,6 +103,27 @@ function scheduleGatewayMemoryBackend(params: { timer.unref?.(); } +function schedulePostAttachUpdateSentinelRefresh(params: { + startupTrace?: GatewayStartupTrace; + log: { warn: (msg: string) => void }; + refreshLatestUpdateRestartSentinel: () => Awaitable< + ReturnType + >; +}): void { + const handle = setImmediate(() => { + void measureStartup(params.startupTrace, "post-attach.update-sentinel", async () => { + try { + await params.refreshLatestUpdateRestartSentinel(); + } catch (err) { + params.log.warn(`restart sentinel refresh failed: ${String(err)}`); + } + }).catch((err) => { + params.log.warn(`restart sentinel refresh failed: ${String(err)}`); + }); + }); + handle.unref?.(); +} + function hasGatewayStartHooks(pluginRegistry: ReturnType): boolean { return pluginRegistry.typedHooks.some((hook) => hook.hookName === "gateway_start"); } @@ -597,14 +618,6 @@ export async function startGatewayPostAttachRuntime( }, runtimeDeps: GatewayPostAttachRuntimeDeps = defaultGatewayPostAttachRuntimeDeps, ) { - await measureStartup(params.startupTrace, "post-attach.update-sentinel", async () => { - try { - await runtimeDeps.refreshLatestUpdateRestartSentinel(); - } catch (err) { - params.log.warn(`restart sentinel refresh failed: ${String(err)}`); - } - }); - let pluginRegistry = params.pluginRegistry; if (!params.minimalTestGateway && params.loadStartupPlugins) { params.onStartupPluginsLoading?.(); @@ -691,6 +704,11 @@ export async function startGatewayPostAttachRuntime( if (params.minimalTestGateway) { return; } + schedulePostAttachUpdateSentinelRefresh({ + startupTrace: params.startupTrace, + log: params.log, + refreshLatestUpdateRestartSentinel: runtimeDeps.refreshLatestUpdateRestartSentinel, + }); if (!hasGatewayStartHooks(sidecarsResult.pluginRegistry)) { return; } diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 06176cdd08e..af00deb2cb1 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -68,7 +68,7 @@ import { import { createGatewayAuxHandlers } from "./server-aux-handlers.js"; import { createChannelManager } from "./server-channels.js"; import { resolveGatewayControlUiRootState } from "./server-control-ui-root.js"; -import { buildGatewayCronService } from "./server-cron.js"; +import { createLazyGatewayCronState } from "./server-cron-lazy.js"; import { applyGatewayLaneConcurrency } from "./server-lanes.js"; import { createGatewayServerLiveState, type GatewayServerLiveState } from "./server-live-state.js"; import { GATEWAY_EVENTS } from "./server-methods-list.js"; @@ -81,6 +81,7 @@ import { createGatewayRequestContext } from "./server-request-context.js"; import { resolveGatewayRuntimeConfig } from "./server-runtime-config.js"; import { activateGatewayScheduledServices, + startGatewayCronWithLogging, startGatewayRuntimeServices, } from "./server-runtime-services.js"; import { createGatewayRuntimeState } from "./server-runtime-state.js"; @@ -835,7 +836,7 @@ export async function startGatewayServer( runtimeState = createGatewayServerLiveState({ hooksConfig: initialHooksConfig, hookClientIpConfig: initialHookClientIpConfig, - cronState: buildGatewayCronService({ + cronState: createLazyGatewayCronState({ cfg: cfgAtStart, deps, broadcast, @@ -1306,6 +1307,7 @@ export async function startGatewayServer( deps, sessionDeliveryRecoveryMaxEnqueuedAt, cron: runtimeState.cronState.cron, + startCron: false, logCron, log, pluginLookUpTable, @@ -1423,6 +1425,12 @@ export async function startGatewayServer( await promoteConfigSnapshotToLastKnownGood(startupLastGoodSnapshot).catch((err) => { log.warn(`gateway: failed to promote config last-known-good backup: ${String(err)}`); }); + if (!minimalTestGateway) { + startGatewayCronWithLogging({ + cron: runtimeState.cronState.cron, + logCron, + }); + } } catch (err) { await closeOnStartupFailure(); throw err;