mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:20:43 +00:00
perf: reduce gateway startup readiness latency
This commit is contained in:
@@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Discord/status: let explicit reaction tool calls opt into tracking subsequent tool progress on the reacted message with `trackToolCalls: true`, and use the shared tool display emoji table for status reactions.
|
||||
- Gateway/config: stop Gateway startup and hot reload from auto-restoring invalid config; invalid config now fails closed and `openclaw doctor --fix` owns last-known-good repair.
|
||||
- Gateway/performance: lazy-load early runtime discovery and shutdown-hook helpers, defer maintenance timers until after readiness, and trim duplicate plugin auto-enable work during Gateway startup.
|
||||
- Gateway/performance: defer non-readiness sidecars until after the ready signal, avoid hot-path channel plugin barrel imports, and fast-path trusted bundled plugin metadata during Gateway startup.
|
||||
- QA/Mantis: add a `pnpm openclaw qa mantis discord-smoke` runner and manual GitHub workflow that verify the Mantis Discord bot can see the configured guild/channel, post a smoke message, add a reaction, and upload artifacts.
|
||||
- QA/Mantis: add `pnpm openclaw qa mantis slack-desktop-smoke` to run Slack live QA inside a Crabbox VNC desktop, open Slack Web, and capture desktop screenshots beside the Slack QA artifacts.
|
||||
- QA/Mantis: pass the runtime env through desktop-browser Crabbox and artifact-copy child commands, so embedded Mantis callers can provide Crabbox credentials without mutating the parent process. Thanks @vincentkoc.
|
||||
|
||||
@@ -430,4 +430,4 @@ export const ProtocolSchemas = {
|
||||
ShutdownEvent: ShutdownEventSchema,
|
||||
} satisfies Record<string, TSchema>;
|
||||
|
||||
export const PROTOCOL_VERSION = 3 as const;
|
||||
export { PROTOCOL_VERSION } from "../version.js";
|
||||
|
||||
1
src/gateway/protocol/version.ts
Normal file
1
src/gateway/protocol/version.ts
Normal file
@@ -0,0 +1 @@
|
||||
export const PROTOCOL_VERSION = 3 as const;
|
||||
@@ -1,6 +1,10 @@
|
||||
import { listChannelPlugins } from "../channels/plugins/index.js";
|
||||
import { listLoadedChannelPlugins } from "../channels/plugins/registry-loaded.js";
|
||||
import { GATEWAY_EVENT_UPDATE_AVAILABLE } from "./events.js";
|
||||
|
||||
type GatewayMethodChannelPlugin = {
|
||||
gatewayMethods?: readonly string[];
|
||||
};
|
||||
|
||||
const BASE_METHODS = [
|
||||
"health",
|
||||
"diagnostics.stability",
|
||||
@@ -158,7 +162,9 @@ const BASE_METHODS = [
|
||||
];
|
||||
|
||||
export function listGatewayMethods(): string[] {
|
||||
const channelMethods = listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []);
|
||||
const channelMethods = (listLoadedChannelPlugins() as GatewayMethodChannelPlugin[]).flatMap(
|
||||
(plugin) => plugin.gatewayMethods ?? [],
|
||||
);
|
||||
return Array.from(new Set([...BASE_METHODS, ...channelMethods]));
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ import { resolveGlobalSingleton } from "../shared/global-singleton.js";
|
||||
import { ADMIN_SCOPE, WRITE_SCOPE } from "./method-scopes.js";
|
||||
import { GATEWAY_CLIENT_IDS, GATEWAY_CLIENT_MODES } from "./protocol/client-info.js";
|
||||
import type { ErrorShape } from "./protocol/index.js";
|
||||
import { PROTOCOL_VERSION } from "./protocol/index.js";
|
||||
import { PROTOCOL_VERSION } from "./protocol/version.js";
|
||||
import type {
|
||||
GatewayRequestContext,
|
||||
GatewayRequestHandler,
|
||||
|
||||
@@ -113,6 +113,7 @@ type GatewayReloadHandlerParams = {
|
||||
logCron: { error: (msg: string) => void };
|
||||
logReload: GatewayReloadLog;
|
||||
createHealthMonitor: (config: OpenClawConfig) => ChannelHealthMonitor | null;
|
||||
onCronRestart?: () => void;
|
||||
};
|
||||
|
||||
type ManagedGatewayConfigReloaderParams = Omit<
|
||||
@@ -310,6 +311,7 @@ export function createGatewayReloadHandlers(params: GatewayReloadHandlerParams)
|
||||
}
|
||||
|
||||
if (plan.restartCron) {
|
||||
params.onCronRestart?.();
|
||||
state.cronState.cron.stop();
|
||||
nextState.cronState = buildGatewayCronService({
|
||||
cfg: nextConfig,
|
||||
@@ -489,6 +491,7 @@ export function startManagedGatewayConfigReloader(params: ManagedGatewayConfigRe
|
||||
logChannels: params.logChannels,
|
||||
logCron: params.logCron,
|
||||
logReload: params.logReload,
|
||||
...(params.onCronRestart ? { onCronRestart: params.onCronRestart } : {}),
|
||||
createHealthMonitor: (config) =>
|
||||
startGatewayChannelHealthMonitor({
|
||||
cfg: config,
|
||||
|
||||
@@ -130,6 +130,20 @@ function schedulePostAttachUpdateSentinelRefresh(params: {
|
||||
handle.unref?.();
|
||||
}
|
||||
|
||||
function schedulePostReadySidecarTask(params: {
|
||||
startupTrace?: GatewayStartupTrace;
|
||||
name: string;
|
||||
log: { warn: (msg: string) => void };
|
||||
run: () => Awaitable<void>;
|
||||
}): void {
|
||||
const handle = setImmediate(() => {
|
||||
void measureStartup(params.startupTrace, params.name, params.run).catch((err) => {
|
||||
params.log.warn(`${params.name} failed after gateway ready: ${String(err)}`);
|
||||
});
|
||||
});
|
||||
handle.unref?.();
|
||||
}
|
||||
|
||||
function resolveRestartSentinelPathFast(env: NodeJS.ProcessEnv = process.env): string {
|
||||
const normalizePathEnv = (value: string | undefined) => {
|
||||
const trimmed = value?.trim();
|
||||
@@ -369,37 +383,6 @@ export async function startGatewaySidecars(params: {
|
||||
logChannels: { info: (msg: string) => void; error: (msg: string) => void };
|
||||
startupTrace?: GatewayStartupTrace;
|
||||
}) {
|
||||
await measureStartup(params.startupTrace, "sidecars.session-locks", async () => {
|
||||
try {
|
||||
const [{ resolveStateDir }, { resolveAgentSessionDirs }, { cleanStaleLockFiles }] =
|
||||
await Promise.all([
|
||||
import("../config/paths.js"),
|
||||
import("../agents/session-dirs.js"),
|
||||
import("../agents/session-write-lock.js"),
|
||||
]);
|
||||
const stateDir = resolveStateDir(process.env);
|
||||
const sessionDirs = await resolveAgentSessionDirs(stateDir);
|
||||
for (const sessionsDir of sessionDirs) {
|
||||
const result = await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
staleMs: SESSION_LOCK_STALE_MS,
|
||||
removeStale: true,
|
||||
log: { warn: (message) => params.log.warn(message) },
|
||||
});
|
||||
if (result.cleaned.length > 0) {
|
||||
const { markRestartAbortedMainSessionsFromLocks } =
|
||||
await import("../agents/main-session-restart-recovery.js");
|
||||
await markRestartAbortedMainSessionsFromLocks({
|
||||
sessionsDir,
|
||||
cleanedLocks: result.cleaned,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(`session lock cleanup failed on startup: ${String(err)}`);
|
||||
}
|
||||
});
|
||||
|
||||
await measureStartup(params.startupTrace, "sidecars.gmail-watch", async () => {
|
||||
if (params.cfg.hooks?.enabled && params.cfg.hooks.gmail?.account) {
|
||||
const { startGmailWatcherWithLogs } = await import("../hooks/gmail-watcher-lifecycle.js");
|
||||
@@ -564,33 +547,84 @@ export async function startGatewaySidecars(params: {
|
||||
scheduleGatewayMemoryBackend({ cfg: params.cfg, log: params.log, policy });
|
||||
});
|
||||
|
||||
await measureStartup(params.startupTrace, "sidecars.restart-sentinel", async () => {
|
||||
if (!shouldCheckRestartSentinel()) {
|
||||
return;
|
||||
}
|
||||
if (!hasRestartSentinelFileFast()) {
|
||||
return;
|
||||
}
|
||||
setTimeout(() => {
|
||||
void import("./server-restart-sentinel.js")
|
||||
.then(({ scheduleRestartSentinelWake }) =>
|
||||
scheduleRestartSentinelWake({ deps: params.deps }),
|
||||
)
|
||||
.catch((err) => {
|
||||
params.log.warn(`restart sentinel wake failed to schedule: ${String(err)}`);
|
||||
});
|
||||
}, 750);
|
||||
schedulePostReadySidecarTask({
|
||||
startupTrace: params.startupTrace,
|
||||
name: "sidecars.session-locks",
|
||||
log: params.log,
|
||||
run: async () => {
|
||||
try {
|
||||
const [{ resolveStateDir }, { resolveAgentSessionDirs }, { cleanStaleLockFiles }] =
|
||||
await Promise.all([
|
||||
import("../config/paths.js"),
|
||||
import("../agents/session-dirs.js"),
|
||||
import("../agents/session-write-lock.js"),
|
||||
]);
|
||||
const stateDir = resolveStateDir(process.env);
|
||||
const sessionDirs = await resolveAgentSessionDirs(stateDir);
|
||||
for (const sessionsDir of sessionDirs) {
|
||||
const result = await cleanStaleLockFiles({
|
||||
sessionsDir,
|
||||
staleMs: SESSION_LOCK_STALE_MS,
|
||||
removeStale: true,
|
||||
log: { warn: (message) => params.log.warn(message) },
|
||||
});
|
||||
if (result.cleaned.length > 0) {
|
||||
const { markRestartAbortedMainSessionsFromLocks } =
|
||||
await import("../agents/main-session-restart-recovery.js");
|
||||
await markRestartAbortedMainSessionsFromLocks({
|
||||
sessionsDir,
|
||||
cleanedLocks: result.cleaned,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
params.log.warn(`session lock cleanup failed on startup: ${String(err)}`);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await measureStartup(params.startupTrace, "sidecars.subagent-recovery", async () => {
|
||||
const { scheduleSubagentOrphanRecovery } = await import("../agents/subagent-registry.js");
|
||||
scheduleSubagentOrphanRecovery();
|
||||
schedulePostReadySidecarTask({
|
||||
startupTrace: params.startupTrace,
|
||||
name: "sidecars.restart-sentinel",
|
||||
log: params.log,
|
||||
run: async () => {
|
||||
if (!shouldCheckRestartSentinel()) {
|
||||
return;
|
||||
}
|
||||
if (!hasRestartSentinelFileFast()) {
|
||||
return;
|
||||
}
|
||||
setTimeout(() => {
|
||||
void import("./server-restart-sentinel.js")
|
||||
.then(({ scheduleRestartSentinelWake }) =>
|
||||
scheduleRestartSentinelWake({ deps: params.deps }),
|
||||
)
|
||||
.catch((err) => {
|
||||
params.log.warn(`restart sentinel wake failed to schedule: ${String(err)}`);
|
||||
});
|
||||
}, 750);
|
||||
},
|
||||
});
|
||||
|
||||
await measureStartup(params.startupTrace, "sidecars.main-session-recovery", async () => {
|
||||
const { scheduleRestartAbortedMainSessionRecovery } =
|
||||
await import("../agents/main-session-restart-recovery.js");
|
||||
scheduleRestartAbortedMainSessionRecovery();
|
||||
schedulePostReadySidecarTask({
|
||||
startupTrace: params.startupTrace,
|
||||
name: "sidecars.subagent-recovery",
|
||||
log: params.log,
|
||||
run: async () => {
|
||||
const { scheduleSubagentOrphanRecovery } = await import("../agents/subagent-registry.js");
|
||||
scheduleSubagentOrphanRecovery();
|
||||
},
|
||||
});
|
||||
|
||||
schedulePostReadySidecarTask({
|
||||
startupTrace: params.startupTrace,
|
||||
name: "sidecars.main-session-recovery",
|
||||
log: params.log,
|
||||
run: async () => {
|
||||
const { scheduleRestartAbortedMainSessionRecovery } =
|
||||
await import("../agents/main-session-restart-recovery.js");
|
||||
scheduleRestartAbortedMainSessionRecovery();
|
||||
},
|
||||
});
|
||||
|
||||
return { pluginServices };
|
||||
|
||||
@@ -3,8 +3,11 @@ import { getActiveEmbeddedRunCount } from "../agents/pi-embedded-runner/run-stat
|
||||
import { getTotalPendingReplies } from "../auto-reply/reply/dispatcher-registry.js";
|
||||
import type { CanvasHostServer } from "../canvas-host/server.js";
|
||||
import type { ChannelRuntimeSurface } from "../channels/plugins/channel-runtime-surface.types.js";
|
||||
import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js";
|
||||
import { getLoadedChannelPluginEntryById } from "../channels/plugins/registry-loaded.js";
|
||||
import {
|
||||
getLoadedChannelPluginEntryById,
|
||||
listLoadedChannelPlugins,
|
||||
} from "../channels/plugins/registry-loaded.js";
|
||||
import type { ChannelId } from "../channels/plugins/types.public.js";
|
||||
import { createDefaultDeps } from "../cli/deps.js";
|
||||
import { isRestartEnabled } from "../config/commands.flags.js";
|
||||
import {
|
||||
@@ -74,11 +77,6 @@ import {
|
||||
type SharedGatewaySessionGenerationState,
|
||||
} from "./server-shared-auth-generation.js";
|
||||
import { STARTUP_UNAVAILABLE_GATEWAY_METHODS } from "./server-startup-unavailable-methods.js";
|
||||
import {
|
||||
startGatewayEarlyRuntime,
|
||||
startGatewayPluginDiscovery,
|
||||
startGatewayPostAttachRuntime,
|
||||
} from "./server-startup.js";
|
||||
import { createWizardSessionTracker } from "./server-wizard-sessions.js";
|
||||
import { createGatewayEventLoopHealthMonitor } from "./server/event-loop-health.js";
|
||||
import {
|
||||
@@ -99,6 +97,37 @@ export { __resetModelCatalogCacheForTest } from "./server-model-catalog.js";
|
||||
ensureOpenClawCliOnPath();
|
||||
|
||||
const MAX_MEDIA_TTL_HOURS = 24 * 7;
|
||||
const POST_READY_MAINTENANCE_DELAY_MS = 250;
|
||||
|
||||
type GatewayStartupChannelPlugin = {
|
||||
id: ChannelId;
|
||||
gatewayMethods?: readonly string[];
|
||||
meta: {
|
||||
aliases?: readonly string[];
|
||||
};
|
||||
};
|
||||
|
||||
let gatewayStartupEarlyModulePromise: Promise<typeof import("./server-startup-early.js")> | null =
|
||||
null;
|
||||
let gatewayStartupPostAttachModulePromise: Promise<
|
||||
typeof import("./server-startup-post-attach.js")
|
||||
> | null = null;
|
||||
|
||||
function loadGatewayStartupEarlyModule(): Promise<typeof import("./server-startup-early.js")> {
|
||||
gatewayStartupEarlyModulePromise ??= import("./server-startup-early.js");
|
||||
return gatewayStartupEarlyModulePromise;
|
||||
}
|
||||
|
||||
function loadGatewayStartupPostAttachModule(): Promise<
|
||||
typeof import("./server-startup-post-attach.js")
|
||||
> {
|
||||
gatewayStartupPostAttachModulePromise ??= import("./server-startup-post-attach.js");
|
||||
return gatewayStartupPostAttachModulePromise;
|
||||
}
|
||||
|
||||
function listGatewayStartupChannelPlugins(): GatewayStartupChannelPlugin[] {
|
||||
return listLoadedChannelPlugins() as GatewayStartupChannelPlugin[];
|
||||
}
|
||||
|
||||
function resolveMediaCleanupTtlMs(ttlHoursRaw: number): number {
|
||||
const ttlHours = Math.min(Math.max(ttlHoursRaw, 1), MAX_MEDIA_TTL_HOURS);
|
||||
@@ -647,7 +676,7 @@ export async function startGatewayServer(
|
||||
}
|
||||
let { pluginRegistry, baseGatewayMethods } = pluginBootstrap;
|
||||
const channelLogs = Object.fromEntries(
|
||||
listChannelPlugins().map((plugin) => [plugin.id, logChannels.child(plugin.id)]),
|
||||
listGatewayStartupChannelPlugins().map((plugin) => [plugin.id, logChannels.child(plugin.id)]),
|
||||
) as Record<ChannelId, ReturnType<typeof createSubsystemLogger>>;
|
||||
const channelRuntimeEnvs = Object.fromEntries(
|
||||
Object.entries(channelLogs).map(([id, logger]) => [id, runtimeForLogger(logger)]),
|
||||
@@ -656,7 +685,7 @@ export async function startGatewayServer(
|
||||
Array.from(
|
||||
new Set([
|
||||
...nextBaseGatewayMethods,
|
||||
...listChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []),
|
||||
...listGatewayStartupChannelPlugins().flatMap((plugin) => plugin.gatewayMethods ?? []),
|
||||
]),
|
||||
);
|
||||
const runtimeConfig = await startupTrace.measure("runtime.config", async () => {
|
||||
@@ -744,6 +773,7 @@ export async function startGatewayServer(
|
||||
|
||||
const deps = createDefaultDeps();
|
||||
let runtimeState: GatewayServerLiveState | null = null;
|
||||
let gatewayCronStartHandled = false;
|
||||
let canvasHostServer: CanvasHostServer | null = null;
|
||||
const gatewayTls = await startupTrace.measure("tls.runtime", () =>
|
||||
loadGatewayTlsRuntime(cfgAtStart.gateway?.tls, log.child("tls")),
|
||||
@@ -941,42 +971,44 @@ export async function startGatewayServer(
|
||||
|
||||
try {
|
||||
const earlyRuntime = await startupTrace.measure("runtime.early", () =>
|
||||
startGatewayEarlyRuntime({
|
||||
minimalTestGateway,
|
||||
cfgAtStart,
|
||||
port,
|
||||
gatewayTls,
|
||||
tailscaleMode,
|
||||
log,
|
||||
logDiscovery,
|
||||
nodeRegistry,
|
||||
pluginRegistry,
|
||||
broadcast,
|
||||
nodeSendToAllSubscribed,
|
||||
getPresenceVersion,
|
||||
getHealthVersion,
|
||||
refreshGatewayHealthSnapshot: refreshGatewayHealthSnapshotWithRuntime,
|
||||
logHealth,
|
||||
dedupe,
|
||||
chatAbortControllers,
|
||||
chatRunState,
|
||||
chatRunBuffers,
|
||||
chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen,
|
||||
removeChatRun,
|
||||
agentRunSeq,
|
||||
nodeSendToSession,
|
||||
...(typeof cfgAtStart.media?.ttlHours === "number"
|
||||
? { mediaCleanupTtlMs: resolveMediaCleanupTtlMs(cfgAtStart.media.ttlHours) }
|
||||
: {}),
|
||||
skillsRefreshDelayMs: runtimeState.skillsRefreshDelayMs,
|
||||
getSkillsRefreshTimer: () => runtimeState.skillsRefreshTimer,
|
||||
setSkillsRefreshTimer: (timer) => {
|
||||
runtimeState.skillsRefreshTimer = timer;
|
||||
},
|
||||
getRuntimeConfig,
|
||||
startupTrace,
|
||||
}),
|
||||
loadGatewayStartupEarlyModule().then(({ startGatewayEarlyRuntime }) =>
|
||||
startGatewayEarlyRuntime({
|
||||
minimalTestGateway,
|
||||
cfgAtStart,
|
||||
port,
|
||||
gatewayTls,
|
||||
tailscaleMode,
|
||||
log,
|
||||
logDiscovery,
|
||||
nodeRegistry,
|
||||
pluginRegistry,
|
||||
broadcast,
|
||||
nodeSendToAllSubscribed,
|
||||
getPresenceVersion,
|
||||
getHealthVersion,
|
||||
refreshGatewayHealthSnapshot: refreshGatewayHealthSnapshotWithRuntime,
|
||||
logHealth,
|
||||
dedupe,
|
||||
chatAbortControllers,
|
||||
chatRunState,
|
||||
chatRunBuffers,
|
||||
chatDeltaSentAt,
|
||||
chatDeltaLastBroadcastLen,
|
||||
removeChatRun,
|
||||
agentRunSeq,
|
||||
nodeSendToSession,
|
||||
...(typeof cfgAtStart.media?.ttlHours === "number"
|
||||
? { mediaCleanupTtlMs: resolveMediaCleanupTtlMs(cfgAtStart.media.ttlHours) }
|
||||
: {}),
|
||||
skillsRefreshDelayMs: runtimeState.skillsRefreshDelayMs,
|
||||
getSkillsRefreshTimer: () => runtimeState.skillsRefreshTimer,
|
||||
setSkillsRefreshTimer: (timer) => {
|
||||
runtimeState.skillsRefreshTimer = timer;
|
||||
},
|
||||
getRuntimeConfig,
|
||||
startupTrace,
|
||||
}),
|
||||
),
|
||||
);
|
||||
runtimeState.bonjourStop = earlyRuntime.bonjourStop;
|
||||
getActiveTaskCount = earlyRuntime.getActiveTaskCount;
|
||||
@@ -1064,6 +1096,7 @@ export async function startGatewayServer(
|
||||
);
|
||||
}
|
||||
}
|
||||
const { startGatewayPluginDiscovery } = await loadGatewayStartupEarlyModule();
|
||||
runtimeState.bonjourStop = await startGatewayPluginDiscovery({
|
||||
minimalTestGateway,
|
||||
cfgAtStart,
|
||||
@@ -1079,7 +1112,7 @@ export async function startGatewayServer(
|
||||
};
|
||||
const listAttachedChannelConfigTargets = () =>
|
||||
new Map(
|
||||
listChannelPlugins().map((plugin) => [
|
||||
listGatewayStartupChannelPlugins().map((plugin) => [
|
||||
plugin.id,
|
||||
listChannelPluginConfigTargetIds({
|
||||
channelId: plugin.id,
|
||||
@@ -1340,64 +1373,66 @@ export async function startGatewayServer(
|
||||
tailscaleCleanup: runtimeState.tailscaleCleanup,
|
||||
pluginServices: runtimeState.pluginServices,
|
||||
} = await startupTrace.measure("runtime.post-attach", () =>
|
||||
startGatewayPostAttachRuntime({
|
||||
minimalTestGateway,
|
||||
cfgAtStart,
|
||||
bindHost,
|
||||
bindHosts: httpBindHosts,
|
||||
port,
|
||||
tlsEnabled: gatewayTls.enabled,
|
||||
log,
|
||||
isNixMode,
|
||||
startupStartedAt: opts.startupStartedAt,
|
||||
broadcast,
|
||||
tailscaleMode,
|
||||
resetOnExit: tailscaleConfig.resetOnExit ?? false,
|
||||
controlUiBasePath,
|
||||
logTailscale,
|
||||
gatewayPluginConfigAtStart,
|
||||
pluginRegistry,
|
||||
defaultWorkspaceDir,
|
||||
deps,
|
||||
startChannels,
|
||||
logHooks,
|
||||
logChannels,
|
||||
unavailableGatewayMethods,
|
||||
loadStartupPlugins: runtimePluginsLoaded
|
||||
? undefined
|
||||
: async () => {
|
||||
const { loadGatewayStartupPluginRuntime } = await loadStartupPluginsModule();
|
||||
return loadGatewayStartupPluginRuntime({
|
||||
cfg: gatewayPluginConfigAtStart,
|
||||
activationSourceConfig: startupActivationSourceConfig,
|
||||
workspaceDir: defaultWorkspaceDir,
|
||||
log,
|
||||
baseMethods,
|
||||
startupPluginIds,
|
||||
pluginLookUpTable,
|
||||
startupTrace,
|
||||
});
|
||||
},
|
||||
onStartupPluginsLoading: () => {
|
||||
startupPendingReason = "startup-sidecars";
|
||||
},
|
||||
onStartupPluginsLoaded: async (loaded) => {
|
||||
replaceAttachedPluginRuntime(loaded);
|
||||
startupPendingReason = "startup-sidecars";
|
||||
await refreshAttachedGatewayDiscovery(loaded.pluginRegistry);
|
||||
},
|
||||
getCronService: () =>
|
||||
runtimeState?.cronState.cron as PluginHookGatewayCronService | undefined,
|
||||
onPluginServices: (pluginServices) => {
|
||||
runtimeState.pluginServices = pluginServices;
|
||||
},
|
||||
onSidecarsReady: () => {
|
||||
startupSidecarsReady = true;
|
||||
activateScheduledServicesWhenReady();
|
||||
},
|
||||
startupTrace,
|
||||
deferSidecars: opts.deferStartupSidecars === true,
|
||||
}),
|
||||
loadGatewayStartupPostAttachModule().then(({ startGatewayPostAttachRuntime }) =>
|
||||
startGatewayPostAttachRuntime({
|
||||
minimalTestGateway,
|
||||
cfgAtStart,
|
||||
bindHost,
|
||||
bindHosts: httpBindHosts,
|
||||
port,
|
||||
tlsEnabled: gatewayTls.enabled,
|
||||
log,
|
||||
isNixMode,
|
||||
startupStartedAt: opts.startupStartedAt,
|
||||
broadcast,
|
||||
tailscaleMode,
|
||||
resetOnExit: tailscaleConfig.resetOnExit ?? false,
|
||||
controlUiBasePath,
|
||||
logTailscale,
|
||||
gatewayPluginConfigAtStart,
|
||||
pluginRegistry,
|
||||
defaultWorkspaceDir,
|
||||
deps,
|
||||
startChannels,
|
||||
logHooks,
|
||||
logChannels,
|
||||
unavailableGatewayMethods,
|
||||
loadStartupPlugins: runtimePluginsLoaded
|
||||
? undefined
|
||||
: async () => {
|
||||
const { loadGatewayStartupPluginRuntime } = await loadStartupPluginsModule();
|
||||
return loadGatewayStartupPluginRuntime({
|
||||
cfg: gatewayPluginConfigAtStart,
|
||||
activationSourceConfig: startupActivationSourceConfig,
|
||||
workspaceDir: defaultWorkspaceDir,
|
||||
log,
|
||||
baseMethods,
|
||||
startupPluginIds,
|
||||
pluginLookUpTable,
|
||||
startupTrace,
|
||||
});
|
||||
},
|
||||
onStartupPluginsLoading: () => {
|
||||
startupPendingReason = "startup-sidecars";
|
||||
},
|
||||
onStartupPluginsLoaded: async (loaded) => {
|
||||
replaceAttachedPluginRuntime(loaded);
|
||||
startupPendingReason = "startup-sidecars";
|
||||
await refreshAttachedGatewayDiscovery(loaded.pluginRegistry);
|
||||
},
|
||||
getCronService: () =>
|
||||
runtimeState?.cronState.cron as PluginHookGatewayCronService | undefined,
|
||||
onPluginServices: (pluginServices) => {
|
||||
runtimeState.pluginServices = pluginServices;
|
||||
},
|
||||
onSidecarsReady: () => {
|
||||
startupSidecarsReady = true;
|
||||
activateScheduledServicesWhenReady();
|
||||
},
|
||||
startupTrace,
|
||||
deferSidecars: opts.deferStartupSidecars === true,
|
||||
}),
|
||||
),
|
||||
));
|
||||
startupTrace.detail("memory.ready", collectProcessMemoryUsageMb());
|
||||
startupTrace.mark("ready");
|
||||
@@ -1424,12 +1459,16 @@ export async function startGatewayServer(
|
||||
channelHealthMonitor: runtimeState.channelHealthMonitor,
|
||||
}),
|
||||
setState: (nextState) => {
|
||||
const cronStateChanged = nextState.cronState !== runtimeState.cronState;
|
||||
runtimeState.hooksConfig = nextState.hooksConfig;
|
||||
runtimeState.hookClientIpConfig = nextState.hookClientIpConfig;
|
||||
runtimeState.heartbeatRunner = nextState.heartbeatRunner;
|
||||
runtimeState.cronState = nextState.cronState;
|
||||
deps.cron = runtimeState.cronState.cron;
|
||||
runtimeState.channelHealthMonitor = nextState.channelHealthMonitor;
|
||||
if (cronStateChanged) {
|
||||
gatewayCronStartHandled = true;
|
||||
}
|
||||
},
|
||||
startChannel,
|
||||
stopChannel,
|
||||
@@ -1438,6 +1477,9 @@ export async function startGatewayServer(
|
||||
logChannels,
|
||||
logCron,
|
||||
logReload,
|
||||
onCronRestart: () => {
|
||||
gatewayCronStartHandled = true;
|
||||
},
|
||||
channelManager,
|
||||
activateRuntimeSecrets,
|
||||
resolveSharedGatewaySessionGenerationForConfig,
|
||||
@@ -1448,19 +1490,31 @@ export async function startGatewayServer(
|
||||
log.warn(`gateway: failed to promote config last-known-good backup: ${String(err)}`);
|
||||
});
|
||||
if (!minimalTestGateway) {
|
||||
const maintenance = await earlyRuntime.startMaintenance();
|
||||
if (maintenance) {
|
||||
runtimeState.tickInterval = maintenance.tickInterval;
|
||||
runtimeState.healthInterval = maintenance.healthInterval;
|
||||
runtimeState.dedupeCleanup = maintenance.dedupeCleanup;
|
||||
runtimeState.mediaCleanup = maintenance.mediaCleanup;
|
||||
}
|
||||
gatewayRuntimeServices.startGatewayCronWithLogging({
|
||||
cron: runtimeState.cronState.cron,
|
||||
logCron,
|
||||
});
|
||||
const handle = setTimeout(() => {
|
||||
void (async () => {
|
||||
const maintenance = await earlyRuntime.startMaintenance();
|
||||
if (maintenance) {
|
||||
runtimeState.tickInterval = maintenance.tickInterval;
|
||||
runtimeState.healthInterval = maintenance.healthInterval;
|
||||
runtimeState.dedupeCleanup = maintenance.dedupeCleanup;
|
||||
runtimeState.mediaCleanup = maintenance.mediaCleanup;
|
||||
}
|
||||
if (!gatewayCronStartHandled) {
|
||||
gatewayCronStartHandled = true;
|
||||
gatewayRuntimeServices.startGatewayCronWithLogging({
|
||||
cron: runtimeState.cronState.cron,
|
||||
logCron,
|
||||
});
|
||||
}
|
||||
startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb());
|
||||
})().catch((err) => {
|
||||
log.warn(`gateway post-ready maintenance startup failed: ${String(err)}`);
|
||||
});
|
||||
}, POST_READY_MAINTENANCE_DELAY_MS);
|
||||
handle.unref?.();
|
||||
} else {
|
||||
startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb());
|
||||
}
|
||||
startupTrace.detail("memory.post-ready", collectProcessMemoryUsageMb());
|
||||
} catch (err) {
|
||||
await closeOnStartupFailure();
|
||||
throw err;
|
||||
|
||||
@@ -703,12 +703,18 @@ describe("gateway hot reload", () => {
|
||||
expect.objectContaining(nextConfig),
|
||||
);
|
||||
|
||||
expect(hoisted.cronInstances.length).toBe(2);
|
||||
await vi.waitFor(() => {
|
||||
expect(
|
||||
hoisted.cronInstances.some((instance) => instance.start.mock.calls.length === 1),
|
||||
).toBe(true);
|
||||
expect(hoisted.cronInstances.length).toBeGreaterThanOrEqual(1);
|
||||
});
|
||||
const restartedCron = hoisted.cronInstances.at(-1);
|
||||
if (!restartedCron) {
|
||||
throw new Error("expected cron restart to create a cron service");
|
||||
}
|
||||
await vi.waitFor(() => {
|
||||
expect(restartedCron.start).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 300));
|
||||
expect(restartedCron.start).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(hoisted.providerManager.stopChannel).toHaveBeenCalledTimes(5);
|
||||
expect(hoisted.providerManager.startChannel).toHaveBeenCalledTimes(5);
|
||||
|
||||
@@ -435,6 +435,26 @@ function readPackageManifest(
|
||||
}
|
||||
}
|
||||
|
||||
function readTrustedPackageManifest(dir: string): PackageManifest | null {
|
||||
try {
|
||||
return JSON.parse(fs.readFileSync(path.join(dir, "package.json"), "utf8")) as PackageManifest;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function readCandidatePackageManifest(params: {
|
||||
dir: string;
|
||||
origin: PluginOrigin;
|
||||
rejectHardlinks: boolean;
|
||||
rootRealPath?: string;
|
||||
}): PackageManifest | null {
|
||||
if (params.origin === "bundled") {
|
||||
return readTrustedPackageManifest(params.dir);
|
||||
}
|
||||
return readPackageManifest(params.dir, params.rejectHardlinks, params.rootRealPath);
|
||||
}
|
||||
|
||||
function deriveIdHint(params: {
|
||||
filePath: string;
|
||||
manifestId?: string;
|
||||
@@ -660,7 +680,12 @@ function discoverInDirectory(params: {
|
||||
|
||||
const rejectHardlinks = params.origin !== "bundled";
|
||||
const fullPathRealPath = safeRealpathSync(fullPath, params.realpathCache) ?? undefined;
|
||||
const manifest = readPackageManifest(fullPath, rejectHardlinks, fullPathRealPath);
|
||||
const manifest = readCandidatePackageManifest({
|
||||
dir: fullPath,
|
||||
origin: params.origin,
|
||||
rejectHardlinks,
|
||||
...(fullPathRealPath !== undefined ? { rootRealPath: fullPathRealPath } : {}),
|
||||
});
|
||||
const extensionResolution = resolvePackageExtensionEntries(manifest ?? undefined);
|
||||
const extensions = extensionResolution.status === "ok" ? extensionResolution.entries : [];
|
||||
const manifestId = resolveIdHintManifestId(fullPath, rejectHardlinks, fullPathRealPath);
|
||||
@@ -860,7 +885,12 @@ function discoverFromPath(params: {
|
||||
if (stat.isDirectory()) {
|
||||
const rejectHardlinks = params.origin !== "bundled";
|
||||
const resolvedRealPath = safeRealpathSync(resolved, params.realpathCache) ?? undefined;
|
||||
const manifest = readPackageManifest(resolved, rejectHardlinks, resolvedRealPath);
|
||||
const manifest = readCandidatePackageManifest({
|
||||
dir: resolved,
|
||||
origin: params.origin,
|
||||
rejectHardlinks,
|
||||
...(resolvedRealPath !== undefined ? { rootRealPath: resolvedRealPath } : {}),
|
||||
});
|
||||
const extensionResolution = resolvePackageExtensionEntries(manifest ?? undefined);
|
||||
const extensions = extensionResolution.status === "ok" ? extensionResolution.entries : [];
|
||||
const manifestId = resolveIdHintManifestId(resolved, rejectHardlinks, resolvedRealPath);
|
||||
|
||||
@@ -1335,6 +1335,21 @@ function validatePluginConfig(params: {
|
||||
if (!schema) {
|
||||
return { ok: true, value: params.value as Record<string, unknown> | undefined };
|
||||
}
|
||||
if (isEmptyPluginConfigJsonSchema(schema)) {
|
||||
if (
|
||||
params.value === undefined ||
|
||||
(params.value &&
|
||||
typeof params.value === "object" &&
|
||||
!Array.isArray(params.value) &&
|
||||
Object.keys(params.value).length === 0)
|
||||
) {
|
||||
return { ok: true, value: {} };
|
||||
}
|
||||
if (!params.value || typeof params.value !== "object" || Array.isArray(params.value)) {
|
||||
return { ok: false, errors: ["<root>: must be object"] };
|
||||
}
|
||||
return { ok: false, errors: ["<root>: config must be empty"] };
|
||||
}
|
||||
const cacheKey = params.cacheKey ?? JSON.stringify(schema);
|
||||
const result = validateJsonSchemaValue({
|
||||
schema,
|
||||
@@ -1348,6 +1363,31 @@ function validatePluginConfig(params: {
|
||||
return { ok: false, errors: result.errors.map((error) => error.text) };
|
||||
}
|
||||
|
||||
function isEmptyPluginConfigJsonSchema(schema: Record<string, unknown>): boolean {
|
||||
if (schema.type !== "object" || schema.additionalProperties !== false) {
|
||||
return false;
|
||||
}
|
||||
const properties = schema.properties;
|
||||
if (
|
||||
!properties ||
|
||||
typeof properties !== "object" ||
|
||||
Array.isArray(properties) ||
|
||||
Object.keys(properties).length > 0
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
return !(
|
||||
"required" in schema ||
|
||||
"dependentRequired" in schema ||
|
||||
"dependencies" in schema ||
|
||||
"minProperties" in schema ||
|
||||
"allOf" in schema ||
|
||||
"anyOf" in schema ||
|
||||
"oneOf" in schema ||
|
||||
"not" in schema
|
||||
);
|
||||
}
|
||||
|
||||
function resolvePluginModuleExport(moduleExport: unknown): {
|
||||
definition?: OpenClawPluginDefinition;
|
||||
register?: OpenClawPluginDefinition["register"];
|
||||
|
||||
@@ -155,6 +155,9 @@ function hasStalePersistedPluginMetadata(index: InstalledPluginIndex): boolean {
|
||||
packageJsonPath,
|
||||
plugin.packageJson.fileSignature,
|
||||
);
|
||||
if (packageJsonSignatureMatches === true && plugin.origin === "bundled") {
|
||||
return false;
|
||||
}
|
||||
if (packageJsonSignatureMatches === false) {
|
||||
return hashExistingFile(packageJsonPath) !== plugin.packageJson.hash;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user