mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:40:44 +00:00
perf(gateway): defer cron and sentinel startup work
This commit is contained in:
117
src/gateway/server-cron-lazy.test.ts
Normal file
117
src/gateway/server-cron-lazy.test.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { CliDeps } from "../cli/deps.types.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import type { CronServiceContract } from "../cron/service-contract.js";
|
||||
import type { GatewayCronState } from "./server-cron.js";
|
||||
|
||||
const hoisted = vi.hoisted(() => {
|
||||
let state: unknown;
|
||||
return {
|
||||
buildGatewayCronService: vi.fn(() => state),
|
||||
setState(nextState: unknown) {
|
||||
state = nextState;
|
||||
},
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("./server-cron.js", () => ({
|
||||
buildGatewayCronService: hoisted.buildGatewayCronService,
|
||||
}));
|
||||
|
||||
const { createLazyGatewayCronState } = await import("./server-cron-lazy.js");
|
||||
|
||||
describe("createLazyGatewayCronState", () => {
|
||||
beforeEach(() => {
|
||||
vi.unstubAllEnvs();
|
||||
hoisted.buildGatewayCronService.mockClear();
|
||||
});
|
||||
|
||||
it("does not build the heavy cron service until an async cron operation needs it", async () => {
|
||||
const cron = createCronService();
|
||||
const state = createCronState(cron);
|
||||
hoisted.setState(state);
|
||||
|
||||
const lazy = createLazyGatewayCronState(createParams());
|
||||
|
||||
expect(hoisted.buildGatewayCronService).not.toHaveBeenCalled();
|
||||
expect(lazy.cron.getJob("demo")).toBeUndefined();
|
||||
expect(lazy.cron.getDefaultAgentId()).toBeUndefined();
|
||||
|
||||
await lazy.cron.status();
|
||||
|
||||
expect(hoisted.buildGatewayCronService).toHaveBeenCalledTimes(1);
|
||||
expect(cron.status).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("starts the loaded cron service once", async () => {
|
||||
const cron = createCronService();
|
||||
hoisted.setState(createCronState(cron));
|
||||
|
||||
const lazy = createLazyGatewayCronState(createParams());
|
||||
|
||||
await lazy.cron.start();
|
||||
await lazy.cron.start();
|
||||
|
||||
expect(hoisted.buildGatewayCronService).toHaveBeenCalledTimes(1);
|
||||
expect(cron.start).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("keeps synchronous wake non-blocking before the cron service is loaded", async () => {
|
||||
const cron = createCronService();
|
||||
hoisted.setState(createCronState(cron));
|
||||
|
||||
const lazy = createLazyGatewayCronState(createParams());
|
||||
|
||||
expect(lazy.cron.wake({ mode: "now", text: "ping" })).toEqual({ ok: false });
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(hoisted.buildGatewayCronService).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(cron.wake).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("preserves the startup cron enabled flag without loading cron runtime", () => {
|
||||
vi.stubEnv("OPENCLAW_SKIP_CRON", "1");
|
||||
|
||||
const lazy = createLazyGatewayCronState(createParams());
|
||||
|
||||
expect(lazy.cronEnabled).toBe(false);
|
||||
expect(hoisted.buildGatewayCronService).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
function createParams(overrides: Partial<OpenClawConfig> = {}) {
|
||||
return {
|
||||
cfg: {
|
||||
...overrides,
|
||||
} as OpenClawConfig,
|
||||
deps: {} as CliDeps,
|
||||
broadcast: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function createCronState(cron: CronServiceContract): GatewayCronState {
|
||||
return {
|
||||
cron,
|
||||
storePath: "/tmp/openclaw-cron.json",
|
||||
cronEnabled: true,
|
||||
} as GatewayCronState;
|
||||
}
|
||||
|
||||
function createCronService(): CronServiceContract {
|
||||
return {
|
||||
start: vi.fn(async () => undefined),
|
||||
stop: vi.fn(),
|
||||
status: vi.fn(async () => ({ enabled: true }) as never),
|
||||
list: vi.fn(async () => [] as never),
|
||||
listPage: vi.fn(async () => ({ items: [], total: 0 }) as never),
|
||||
add: vi.fn(async () => ({ ok: true }) as never),
|
||||
update: vi.fn(async () => ({ ok: true }) as never),
|
||||
remove: vi.fn(async () => ({ ok: true }) as never),
|
||||
run: vi.fn(async () => ({ ok: true, ran: false, reason: "invalid-spec" }) as never),
|
||||
enqueueRun: vi.fn(async () => ({ ok: true, ran: false, reason: "invalid-spec" }) as never),
|
||||
getJob: vi.fn(() => undefined),
|
||||
getDefaultAgentId: vi.fn(() => "default"),
|
||||
wake: vi.fn(() => ({ ok: true })),
|
||||
};
|
||||
}
|
||||
100
src/gateway/server-cron-lazy.ts
Normal file
100
src/gateway/server-cron-lazy.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
import type { CliDeps } from "../cli/deps.types.js";
|
||||
import type { OpenClawConfig } from "../config/types.openclaw.js";
|
||||
import type { CronServiceContract } from "../cron/service-contract.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
import type { GatewayCronState } from "./server-cron.js";
|
||||
|
||||
type LazyGatewayCronParams = {
|
||||
cfg: OpenClawConfig;
|
||||
deps: CliDeps;
|
||||
broadcast: (event: string, payload: unknown, opts?: { dropIfSlow?: boolean }) => void;
|
||||
};
|
||||
|
||||
type LoadedGatewayCronState = {
|
||||
state: GatewayCronState;
|
||||
started: boolean;
|
||||
};
|
||||
|
||||
export function createLazyGatewayCronState(params: LazyGatewayCronParams): GatewayCronState {
|
||||
const storePath = resolveCronStorePath(params.cfg.cron?.store);
|
||||
const cronEnabled = process.env.OPENCLAW_SKIP_CRON !== "1" && params.cfg.cron?.enabled !== false;
|
||||
let loaded: LoadedGatewayCronState | null = null;
|
||||
let loading: Promise<LoadedGatewayCronState> | null = null;
|
||||
|
||||
const load = async (): Promise<LoadedGatewayCronState> => {
|
||||
if (loaded) {
|
||||
return loaded;
|
||||
}
|
||||
loading ??= import("./server-cron.js").then(({ buildGatewayCronService }) => {
|
||||
loaded = {
|
||||
state: buildGatewayCronService(params),
|
||||
started: false,
|
||||
};
|
||||
return loaded;
|
||||
});
|
||||
return await loading;
|
||||
};
|
||||
|
||||
const cron: CronServiceContract = {
|
||||
async start() {
|
||||
const resolved = await load();
|
||||
if (resolved.started) {
|
||||
return;
|
||||
}
|
||||
resolved.started = true;
|
||||
await resolved.state.cron.start();
|
||||
},
|
||||
stop() {
|
||||
loaded?.state.cron.stop();
|
||||
},
|
||||
async status() {
|
||||
return await (await load()).state.cron.status();
|
||||
},
|
||||
async list(opts) {
|
||||
return await (await load()).state.cron.list(opts);
|
||||
},
|
||||
async listPage(opts) {
|
||||
return await (await load()).state.cron.listPage(opts);
|
||||
},
|
||||
async add(input) {
|
||||
return await (await load()).state.cron.add(input);
|
||||
},
|
||||
async update(id, patch) {
|
||||
return await (await load()).state.cron.update(id, patch);
|
||||
},
|
||||
async remove(id) {
|
||||
return await (await load()).state.cron.remove(id);
|
||||
},
|
||||
async run(id, mode) {
|
||||
return await (await load()).state.cron.run(id, mode);
|
||||
},
|
||||
async enqueueRun(id, mode) {
|
||||
return await (await load()).state.cron.enqueueRun(id, mode);
|
||||
},
|
||||
getJob(id) {
|
||||
if (!loaded) {
|
||||
return undefined;
|
||||
}
|
||||
return loaded.state.cron.getJob(id);
|
||||
},
|
||||
getDefaultAgentId() {
|
||||
if (!loaded) {
|
||||
return undefined;
|
||||
}
|
||||
return loaded.state.cron.getDefaultAgentId();
|
||||
},
|
||||
wake(opts) {
|
||||
if (!loaded) {
|
||||
void load();
|
||||
return { ok: false };
|
||||
}
|
||||
return loaded.state.cron.wake(opts);
|
||||
},
|
||||
};
|
||||
|
||||
return {
|
||||
cron,
|
||||
storePath,
|
||||
cronEnabled,
|
||||
};
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
resolveCronRunLogPath,
|
||||
resolveCronRunLogPruneOptions,
|
||||
} from "../cron/run-log.js";
|
||||
import type { CronServiceContract } from "../cron/service-contract.js";
|
||||
import { CronService } from "../cron/service.js";
|
||||
import { resolveCronSessionTargetSessionKey } from "../cron/session-target.js";
|
||||
import { resolveCronStorePath } from "../cron/store.js";
|
||||
@@ -40,7 +41,7 @@ import {
|
||||
} from "./server-cron-notifications.js";
|
||||
|
||||
export type GatewayCronState = {
|
||||
cron: CronService;
|
||||
cron: CronServiceContract;
|
||||
storePath: string;
|
||||
cronEnabled: boolean;
|
||||
};
|
||||
|
||||
22
src/gateway/server-import-boundary.test.ts
Normal file
22
src/gateway/server-import-boundary.test.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
import { readFileSync } from "node:fs";
|
||||
import path from "node:path";
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
const repoRoot = path.resolve(import.meta.dirname, "../..");
|
||||
|
||||
function readSource(relativePath: string): string {
|
||||
return readFileSync(path.join(repoRoot, relativePath), "utf8");
|
||||
}
|
||||
|
||||
describe("gateway startup import boundaries", () => {
|
||||
it("keeps heavy cron and doctor legacy paths out of the server.impl import graph", () => {
|
||||
const serverImpl = readSource("src/gateway/server.impl.ts");
|
||||
const validation = readSource("src/config/validation.ts");
|
||||
|
||||
expect(serverImpl).not.toContain('from "./server-cron.js"');
|
||||
expect(serverImpl).toContain('from "./server-cron-lazy.js"');
|
||||
expect(serverImpl).not.toContain('from "./server-methods.js"');
|
||||
expect(validation).not.toContain("legacy-secretref-env-marker");
|
||||
expect(validation).not.toContain("commands/doctor");
|
||||
});
|
||||
});
|
||||
@@ -194,6 +194,29 @@ describe("server-runtime-services", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("can defer cron startup while activating other scheduled services", async () => {
|
||||
vi.useFakeTimers();
|
||||
const cron = { start: vi.fn(async () => undefined) };
|
||||
const log = createLog();
|
||||
|
||||
activateGatewayScheduledServices({
|
||||
minimalTestGateway: false,
|
||||
cfgAtStart: {} as never,
|
||||
deps: {} as never,
|
||||
sessionDeliveryRecoveryMaxEnqueuedAt: 123,
|
||||
cron,
|
||||
startCron: false,
|
||||
logCron: { error: vi.fn() },
|
||||
log,
|
||||
});
|
||||
|
||||
expect(hoisted.startHeartbeatRunner).toHaveBeenCalledTimes(1);
|
||||
expect(cron.start).not.toHaveBeenCalled();
|
||||
await vi.advanceTimersByTimeAsync(1_250);
|
||||
await vi.dynamicImportSettled();
|
||||
expect(hoisted.recoverPendingDeliveries).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("keeps scheduled services disabled for minimal test gateways", () => {
|
||||
const cron = { start: vi.fn(async () => undefined) };
|
||||
|
||||
|
||||
@@ -148,6 +148,7 @@ export function activateGatewayScheduledServices(params: {
|
||||
deps: import("../cli/deps.types.js").CliDeps;
|
||||
sessionDeliveryRecoveryMaxEnqueuedAt: number;
|
||||
cron: { start: () => Promise<void> };
|
||||
startCron?: boolean;
|
||||
logCron: { error: (message: string) => void };
|
||||
log: GatewayRuntimeServiceLogger;
|
||||
pluginLookUpTable?: PluginMetadataRegistryView;
|
||||
@@ -156,10 +157,12 @@ export function activateGatewayScheduledServices(params: {
|
||||
return { heartbeatRunner: createNoopHeartbeatRunner(), stopModelPricingRefresh: () => {} };
|
||||
}
|
||||
const heartbeatRunner = startHeartbeatRunner({ cfg: params.cfgAtStart });
|
||||
startGatewayCronWithLogging({
|
||||
cron: params.cron,
|
||||
logCron: params.logCron,
|
||||
});
|
||||
if (params.startCron !== false) {
|
||||
startGatewayCronWithLogging({
|
||||
cron: params.cron,
|
||||
logCron: params.logCron,
|
||||
});
|
||||
}
|
||||
recoverPendingOutboundDeliveries({
|
||||
cfg: params.cfgAtStart,
|
||||
log: params.log,
|
||||
|
||||
@@ -253,6 +253,34 @@ describe("startGatewayPostAttachRuntime", () => {
|
||||
expect(hoisted.startGatewayMemoryBackend).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("refreshes the restart sentinel after sidecars without blocking post-attach", async () => {
|
||||
const events: string[] = [];
|
||||
const refreshLatestUpdateRestartSentinel = vi.fn(async () => {
|
||||
events.push("sentinel");
|
||||
return null;
|
||||
});
|
||||
const startGatewaySidecars = vi.fn(async () => {
|
||||
events.push("sidecars");
|
||||
return { pluginServices: null };
|
||||
});
|
||||
|
||||
await startGatewayPostAttachRuntime(
|
||||
createPostAttachParams(),
|
||||
createPostAttachRuntimeDeps({
|
||||
refreshLatestUpdateRestartSentinel,
|
||||
startGatewaySidecars,
|
||||
}),
|
||||
);
|
||||
|
||||
events.push("returned");
|
||||
expect(refreshLatestUpdateRestartSentinel).not.toHaveBeenCalled();
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(refreshLatestUpdateRestartSentinel).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(events).toEqual(["sidecars", "returned", "sentinel"]);
|
||||
});
|
||||
|
||||
it("loads deferred startup plugins before channel sidecars", async () => {
|
||||
const events: string[] = [];
|
||||
const loadedPluginRegistry = {
|
||||
|
||||
@@ -103,6 +103,27 @@ function scheduleGatewayMemoryBackend(params: {
|
||||
timer.unref?.();
|
||||
}
|
||||
|
||||
function schedulePostAttachUpdateSentinelRefresh(params: {
|
||||
startupTrace?: GatewayStartupTrace;
|
||||
log: { warn: (msg: string) => void };
|
||||
refreshLatestUpdateRestartSentinel: () => Awaitable<
|
||||
ReturnType<typeof refreshLatestUpdateRestartSentinel>
|
||||
>;
|
||||
}): void {
|
||||
const handle = setImmediate(() => {
|
||||
void measureStartup(params.startupTrace, "post-attach.update-sentinel", async () => {
|
||||
try {
|
||||
await params.refreshLatestUpdateRestartSentinel();
|
||||
} catch (err) {
|
||||
params.log.warn(`restart sentinel refresh failed: ${String(err)}`);
|
||||
}
|
||||
}).catch((err) => {
|
||||
params.log.warn(`restart sentinel refresh failed: ${String(err)}`);
|
||||
});
|
||||
});
|
||||
handle.unref?.();
|
||||
}
|
||||
|
||||
function hasGatewayStartHooks(pluginRegistry: ReturnType<typeof loadOpenClawPlugins>): boolean {
|
||||
return pluginRegistry.typedHooks.some((hook) => hook.hookName === "gateway_start");
|
||||
}
|
||||
@@ -597,14 +618,6 @@ export async function startGatewayPostAttachRuntime(
|
||||
},
|
||||
runtimeDeps: GatewayPostAttachRuntimeDeps = defaultGatewayPostAttachRuntimeDeps,
|
||||
) {
|
||||
await measureStartup(params.startupTrace, "post-attach.update-sentinel", async () => {
|
||||
try {
|
||||
await runtimeDeps.refreshLatestUpdateRestartSentinel();
|
||||
} catch (err) {
|
||||
params.log.warn(`restart sentinel refresh failed: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
|
||||
let pluginRegistry = params.pluginRegistry;
|
||||
if (!params.minimalTestGateway && params.loadStartupPlugins) {
|
||||
params.onStartupPluginsLoading?.();
|
||||
@@ -691,6 +704,11 @@ export async function startGatewayPostAttachRuntime(
|
||||
if (params.minimalTestGateway) {
|
||||
return;
|
||||
}
|
||||
schedulePostAttachUpdateSentinelRefresh({
|
||||
startupTrace: params.startupTrace,
|
||||
log: params.log,
|
||||
refreshLatestUpdateRestartSentinel: runtimeDeps.refreshLatestUpdateRestartSentinel,
|
||||
});
|
||||
if (!hasGatewayStartHooks(sidecarsResult.pluginRegistry)) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ import {
|
||||
import { createGatewayAuxHandlers } from "./server-aux-handlers.js";
|
||||
import { createChannelManager } from "./server-channels.js";
|
||||
import { resolveGatewayControlUiRootState } from "./server-control-ui-root.js";
|
||||
import { buildGatewayCronService } from "./server-cron.js";
|
||||
import { createLazyGatewayCronState } from "./server-cron-lazy.js";
|
||||
import { applyGatewayLaneConcurrency } from "./server-lanes.js";
|
||||
import { createGatewayServerLiveState, type GatewayServerLiveState } from "./server-live-state.js";
|
||||
import { GATEWAY_EVENTS } from "./server-methods-list.js";
|
||||
@@ -81,6 +81,7 @@ import { createGatewayRequestContext } from "./server-request-context.js";
|
||||
import { resolveGatewayRuntimeConfig } from "./server-runtime-config.js";
|
||||
import {
|
||||
activateGatewayScheduledServices,
|
||||
startGatewayCronWithLogging,
|
||||
startGatewayRuntimeServices,
|
||||
} from "./server-runtime-services.js";
|
||||
import { createGatewayRuntimeState } from "./server-runtime-state.js";
|
||||
@@ -835,7 +836,7 @@ export async function startGatewayServer(
|
||||
runtimeState = createGatewayServerLiveState({
|
||||
hooksConfig: initialHooksConfig,
|
||||
hookClientIpConfig: initialHookClientIpConfig,
|
||||
cronState: buildGatewayCronService({
|
||||
cronState: createLazyGatewayCronState({
|
||||
cfg: cfgAtStart,
|
||||
deps,
|
||||
broadcast,
|
||||
@@ -1306,6 +1307,7 @@ export async function startGatewayServer(
|
||||
deps,
|
||||
sessionDeliveryRecoveryMaxEnqueuedAt,
|
||||
cron: runtimeState.cronState.cron,
|
||||
startCron: false,
|
||||
logCron,
|
||||
log,
|
||||
pluginLookUpTable,
|
||||
@@ -1423,6 +1425,12 @@ export async function startGatewayServer(
|
||||
await promoteConfigSnapshotToLastKnownGood(startupLastGoodSnapshot).catch((err) => {
|
||||
log.warn(`gateway: failed to promote config last-known-good backup: ${String(err)}`);
|
||||
});
|
||||
if (!minimalTestGateway) {
|
||||
startGatewayCronWithLogging({
|
||||
cron: runtimeState.cronState.cron,
|
||||
logCron,
|
||||
});
|
||||
}
|
||||
} catch (err) {
|
||||
await closeOnStartupFailure();
|
||||
throw err;
|
||||
|
||||
Reference in New Issue
Block a user