From 1ea12fe3e254a49b7a08874617a84f72291eced0 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Mon, 27 Apr 2026 06:16:17 +0100 Subject: [PATCH] fix: stage bundled plugin runtime deps safely --- CHANGELOG.md | 1 + docs/cli/update.md | 2 + src/cli/gateway-cli/run-loop.test.ts | 51 ++++ src/cli/gateway-cli/run-loop.ts | 18 +- src/gateway/server-plugin-bootstrap.ts | 3 + src/gateway/server-plugins.ts | 3 + src/gateway/server-startup-plugins.test.ts | 191 ++++++++++++++ src/gateway/server-startup-plugins.ts | 78 ++++++ src/gateway/server.impl.ts | 2 + src/plugins/bundled-runtime-deps-activity.ts | 102 ++++++++ src/plugins/bundled-runtime-deps.test.ts | 84 ++++++ src/plugins/bundled-runtime-deps.ts | 261 ++++++++++++++++++- 12 files changed, 781 insertions(+), 15 deletions(-) create mode 100644 src/gateway/server-startup-plugins.test.ts create mode 100644 src/plugins/bundled-runtime-deps-activity.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4863d818205..619c2591922 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Plugins/install: stage bundled plugin runtime dependencies before Gateway startup and drain update restarts while preserving per-plugin isolation when pre-stage scan or install fails. Thanks @codex. - CLI/startup: read generated startup metadata from the bundled `dist` layout before falling back to live help rendering, so root/browser help and channel-option bootstrap stay on the fast path. Thanks @vincentkoc. - CLI/help: treat positional `help` invocations like `openclaw channels help` as help paths for startup gating, avoiding model/auth warmup while preserving positional arguments such as `openclaw docs help`. Thanks @gumadeiras. - Matrix/E2EE: stabilize recovery and broken-device QA flows while avoiding Matrix device-cleanup sync races that could leave shutdown-time crypto work running. Thanks @gumadeiras. diff --git a/docs/cli/update.md b/docs/cli/update.md index 19dc0b64aeb..07803ceeb70 100644 --- a/docs/cli/update.md +++ b/docs/cli/update.md @@ -138,6 +138,8 @@ If an exact pinned npm plugin update resolves to an artifact whose integrity dif Post-update plugin sync failures fail the update result and stop restart follow-up work. Fix the plugin install or update error, then rerun `openclaw update`. +When the updated Gateway starts, enabled bundled plugin runtime dependencies are staged before plugin activation. Update-triggered restarts drain any active runtime-dependency staging before closing the Gateway, so service-manager restarts do not interrupt an in-flight npm install. + If pnpm bootstrap still fails, the updater stops early with a package-manager-specific error instead of trying `npm run build` inside the checkout. diff --git a/src/cli/gateway-cli/run-loop.test.ts b/src/cli/gateway-cli/run-loop.test.ts index 49ab1f53fed..7771bbfdc71 100644 --- a/src/cli/gateway-cli/run-loop.test.ts +++ b/src/cli/gateway-cli/run-loop.test.ts @@ -22,6 +22,11 @@ const getActiveTaskCount = vi.fn(() => 0); const markGatewayDraining = vi.fn(); const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true })); const resetAllLanes = vi.fn(); +const getActiveBundledRuntimeDepsInstallCount = vi.fn(() => 0); +const waitForBundledRuntimeDepsInstallIdle = vi.fn(async (_timeoutMs?: number) => ({ + drained: true, + active: 0, +})); const restartGatewayProcessWithFreshPid = vi.fn< () => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string } >(() => ({ mode: "disabled" })); @@ -68,6 +73,12 @@ vi.mock("../../process/command-queue.js", () => ({ resetAllLanes: () => resetAllLanes(), })); +vi.mock("../../plugins/bundled-runtime-deps-activity.js", () => ({ + getActiveBundledRuntimeDepsInstallCount: () => getActiveBundledRuntimeDepsInstallCount(), + waitForBundledRuntimeDepsInstallIdle: (timeoutMs?: number) => + waitForBundledRuntimeDepsInstallIdle(timeoutMs), +})); + vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({ abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) => abortEmbeddedPiRun(sessionId, opts), @@ -400,6 +411,46 @@ describe("runGatewayLoop", () => { }); }); + it("waits for active runtime-deps installs before restart close", async () => { + vi.clearAllMocks(); + loadConfig.mockReturnValue({ + gateway: { + reload: { + deferralTimeoutMs: 90_000, + }, + }, + }); + let releaseRuntimeDeps!: () => void; + getActiveBundledRuntimeDepsInstallCount.mockReturnValueOnce(1).mockReturnValue(0); + waitForBundledRuntimeDepsInstallIdle.mockReturnValueOnce( + new Promise((resolve) => { + releaseRuntimeDeps = () => resolve({ drained: true, active: 0 }); + }), + ); + + await withIsolatedSignals(async ({ captureSignal }) => { + const { close, start } = await createSignaledLoopHarness(); + const sigusr1 = captureSignal("SIGUSR1"); + + sigusr1(); + await new Promise((resolve) => setImmediate(resolve)); + + expect(markGatewayDraining).toHaveBeenCalledOnce(); + expect(waitForBundledRuntimeDepsInstallIdle).toHaveBeenCalledWith(90_000); + expect(close).not.toHaveBeenCalled(); + + releaseRuntimeDeps(); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(close).toHaveBeenCalledWith({ + reason: "gateway restarting", + restartExpectedMs: 1500, + }); + expect(start).toHaveBeenCalledTimes(2); + }); + }); + it("releases the lock before exiting on spawned restart", async () => { vi.clearAllMocks(); diff --git a/src/cli/gateway-cli/run-loop.ts b/src/cli/gateway-cli/run-loop.ts index 6922c061d5c..a00cc9efab7 100644 --- a/src/cli/gateway-cli/run-loop.ts +++ b/src/cli/gateway-cli/run-loop.ts @@ -18,6 +18,10 @@ import { import { detectRespawnSupervisor } from "../../infra/supervisor-markers.js"; import { writeDiagnosticStabilityBundleForFailureSync } from "../../logging/diagnostic-stability-bundle.js"; import { createSubsystemLogger } from "../../logging/subsystem.js"; +import { + getActiveBundledRuntimeDepsInstallCount, + waitForBundledRuntimeDepsInstallIdle, +} from "../../plugins/bundled-runtime-deps-activity.js"; import { getActiveTaskCount, markGatewayDraining, @@ -187,7 +191,7 @@ export async function runGatewayLoop(params: { const createStillPendingDrainLogger = () => setInterval(() => { gatewayLog.warn( - `still draining ${getActiveTaskCount()} active task(s) and ${getActiveEmbeddedRunCount()} active embedded run(s) before restart`, + `still draining ${getActiveTaskCount()} active task(s), ${getActiveEmbeddedRunCount()} active embedded run(s), and ${getActiveBundledRuntimeDepsInstallCount()} runtime deps install(s) before restart`, ); }, RESTART_DRAIN_STILL_PENDING_WARN_MS); @@ -207,6 +211,7 @@ export async function runGatewayLoop(params: { markGatewayDraining(); const activeTasks = getActiveTaskCount(); const activeRuns = getActiveEmbeddedRunCount(); + const activeRuntimeDepsInstalls = getActiveBundledRuntimeDepsInstallCount(); // Best-effort abort for compacting runs so long compaction operations // don't hold session write locks across restart boundaries. @@ -214,20 +219,23 @@ export async function runGatewayLoop(params: { abortEmbeddedPiRun(undefined, { mode: "compacting" }); } - if (activeTasks > 0 || activeRuns > 0) { + if (activeTasks > 0 || activeRuns > 0 || activeRuntimeDepsInstalls > 0) { gatewayLog.info( - `draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`, + `draining ${activeTasks} active task(s), ${activeRuns} active embedded run(s), and ${activeRuntimeDepsInstalls} runtime deps install(s) before restart ${formatRestartDrainBudget()}`, ); const stillPendingDrainLogger = createStillPendingDrainLogger(); - const [tasksDrain, runsDrain] = await Promise.all([ + const [tasksDrain, runsDrain, runtimeDepsDrain] = await Promise.all([ activeTasks > 0 ? waitForActiveTasks(restartDrainTimeoutMs) : Promise.resolve({ drained: true }), activeRuns > 0 ? waitForActiveEmbeddedRuns(restartDrainTimeoutMs) : Promise.resolve({ drained: true }), + activeRuntimeDepsInstalls > 0 + ? waitForBundledRuntimeDepsInstallIdle(restartDrainTimeoutMs) + : Promise.resolve({ drained: true }), ]).finally(() => clearInterval(stillPendingDrainLogger)); - if (tasksDrain.drained && runsDrain.drained) { + if (tasksDrain.drained && runsDrain.drained && runtimeDepsDrain.drained) { gatewayLog.info("all active work drained"); } else { gatewayLog.warn("drain timeout reached; proceeding with restart"); diff --git a/src/gateway/server-plugin-bootstrap.ts b/src/gateway/server-plugin-bootstrap.ts index ffa0cfe25f8..99e64f70d18 100644 --- a/src/gateway/server-plugin-bootstrap.ts +++ b/src/gateway/server-plugin-bootstrap.ts @@ -1,6 +1,7 @@ import { primeConfiguredBindingRegistry } from "../channels/plugins/binding-registry.js"; import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { BundledRuntimeDepsInstallParams } from "../plugins/bundled-runtime-deps.js"; import type { PluginRegistry } from "../plugins/registry.js"; import { pinActivePluginChannelRegistry } from "../plugins/runtime.js"; import { @@ -34,6 +35,7 @@ type GatewayPluginBootstrapParams = { preferSetupRuntimeForChannelPlugins?: boolean; suppressPluginInfoLogs?: boolean; logDiagnostics?: boolean; + bundledRuntimeDepsInstaller?: (params: BundledRuntimeDepsInstallParams) => void; beforePrimeRegistry?: (pluginRegistry: PluginRegistry) => void; }; @@ -89,6 +91,7 @@ export function prepareGatewayPluginLoad(params: GatewayPluginBootstrapParams) { pluginIds: params.pluginIds, preferSetupRuntimeForChannelPlugins: params.preferSetupRuntimeForChannelPlugins, suppressPluginInfoLogs: params.suppressPluginInfoLogs, + bundledRuntimeDepsInstaller: params.bundledRuntimeDepsInstaller, }); params.beforePrimeRegistry?.(loaded.pluginRegistry); primeConfiguredBindingRegistry({ cfg: resolvedConfig }); diff --git a/src/gateway/server-plugins.ts b/src/gateway/server-plugins.ts index 2da368661e1..0737bfed62e 100644 --- a/src/gateway/server-plugins.ts +++ b/src/gateway/server-plugins.ts @@ -2,6 +2,7 @@ import { randomUUID } from "node:crypto"; import { normalizeModelRef, parseModelRef } from "../agents/model-selection.js"; import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import type { BundledRuntimeDepsInstallParams } from "../plugins/bundled-runtime-deps.js"; import { resolveGatewayStartupPluginIds } from "../plugins/channel-plugin-ids.js"; import { normalizePluginsConfig } from "../plugins/config-state.js"; import { loadOpenClawPlugins } from "../plugins/loader.js"; @@ -448,6 +449,7 @@ export function loadGatewayPlugins(params: { pluginIds?: string[]; preferSetupRuntimeForChannelPlugins?: boolean; suppressPluginInfoLogs?: boolean; + bundledRuntimeDepsInstaller?: (params: BundledRuntimeDepsInstallParams) => void; }) { const activationAutoEnabled = params.activationSourceConfig !== undefined @@ -510,6 +512,7 @@ export function loadGatewayPlugins(params: { allowGatewaySubagentBinding: true, }, preferSetupRuntimeForChannelPlugins: params.preferSetupRuntimeForChannelPlugins, + bundledRuntimeDepsInstaller: params.bundledRuntimeDepsInstaller, }); const pluginMethods = Object.keys(pluginRegistry.gatewayHandlers); const gatewayMethods = Array.from(new Set([...params.baseMethods, ...pluginMethods])); diff --git a/src/gateway/server-startup-plugins.test.ts b/src/gateway/server-startup-plugins.test.ts new file mode 100644 index 00000000000..74be88f3afe --- /dev/null +++ b/src/gateway/server-startup-plugins.test.ts @@ -0,0 +1,191 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const applyPluginAutoEnable = vi.hoisted(() => + vi.fn((params: { config: unknown }) => ({ + config: params.config, + changes: [], + autoEnabledReasons: {}, + })), +); +const initSubagentRegistry = vi.hoisted(() => vi.fn()); +const loadGatewayStartupPlugins = vi.hoisted(() => + vi.fn((_params: unknown) => ({ + pluginRegistry: { diagnostics: [], gatewayHandlers: {}, plugins: [] }, + gatewayMethods: ["ping"], + })), +); +const repairBundledRuntimeDepsInstallRootAsync = vi.hoisted(() => + vi.fn(async (_params: unknown) => ({})), +); +const resolveBundledRuntimeDependencyPackageInstallRoot = vi.hoisted(() => + vi.fn((_packageRoot: string, _params: unknown) => "/runtime"), +); +const resolveConfiguredDeferredChannelPluginIds = vi.hoisted(() => vi.fn((_params: unknown) => [])); +const resolveGatewayStartupPluginIds = vi.hoisted(() => vi.fn((_params: unknown) => ["telegram"])); +const resolveOpenClawPackageRootSync = vi.hoisted(() => vi.fn((_params: unknown) => "/package")); +const runChannelPluginStartupMaintenance = vi.hoisted(() => + vi.fn(async (_params: unknown) => undefined), +); +const runStartupSessionMigration = vi.hoisted(() => vi.fn(async (_params: unknown) => undefined)); +const scanBundledPluginRuntimeDeps = vi.hoisted(() => + vi.fn((_params: unknown) => ({ + deps: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }], + missing: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }], + conflicts: [], + })), +); + +vi.mock("../agents/agent-scope.js", () => ({ + resolveAgentWorkspaceDir: () => "/workspace", + resolveDefaultAgentId: () => "default", +})); + +vi.mock("../agents/subagent-registry.js", () => ({ + initSubagentRegistry: () => initSubagentRegistry(), +})); + +vi.mock("../channels/plugins/lifecycle-startup.js", () => ({ + runChannelPluginStartupMaintenance: (params: unknown) => + runChannelPluginStartupMaintenance(params), +})); + +vi.mock("../config/plugin-auto-enable.js", () => ({ + applyPluginAutoEnable: (params: { config: unknown }) => applyPluginAutoEnable(params), +})); + +vi.mock("../infra/openclaw-root.js", () => ({ + resolveOpenClawPackageRootSync: (params: unknown) => resolveOpenClawPackageRootSync(params), +})); + +vi.mock("../plugins/bundled-runtime-deps.js", () => ({ + repairBundledRuntimeDepsInstallRootAsync: (params: unknown) => + repairBundledRuntimeDepsInstallRootAsync(params), + resolveBundledRuntimeDependencyPackageInstallRoot: (packageRoot: string, params: unknown) => + resolveBundledRuntimeDependencyPackageInstallRoot(packageRoot, params), + scanBundledPluginRuntimeDeps: (params: unknown) => scanBundledPluginRuntimeDeps(params), +})); + +vi.mock("../plugins/channel-plugin-ids.js", () => ({ + resolveConfiguredDeferredChannelPluginIds: (params: unknown) => + resolveConfiguredDeferredChannelPluginIds(params), + resolveGatewayStartupPluginIds: (params: unknown) => resolveGatewayStartupPluginIds(params), +})); + +vi.mock("../plugins/registry.js", () => ({ + createEmptyPluginRegistry: () => ({ diagnostics: [], gatewayHandlers: {}, plugins: [] }), +})); + +vi.mock("../plugins/runtime.js", () => ({ + getActivePluginRegistry: () => undefined, + setActivePluginRegistry: vi.fn(), +})); + +vi.mock("./server-methods-list.js", () => ({ + listGatewayMethods: () => ["ping"], +})); + +vi.mock("./server-methods.js", () => ({ + coreGatewayHandlers: {}, +})); + +vi.mock("./server-plugin-bootstrap.js", () => ({ + loadGatewayStartupPlugins: (params: unknown) => loadGatewayStartupPlugins(params), +})); + +vi.mock("./server-startup-session-migration.js", () => ({ + runStartupSessionMigration: (params: unknown) => runStartupSessionMigration(params), +})); + +function createLog() { + return { + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + debug: vi.fn(), + }; +} + +describe("prepareGatewayPluginBootstrap runtime-deps staging", () => { + beforeEach(() => { + applyPluginAutoEnable.mockClear(); + initSubagentRegistry.mockClear(); + loadGatewayStartupPlugins.mockClear(); + repairBundledRuntimeDepsInstallRootAsync.mockReset().mockResolvedValue({}); + resolveBundledRuntimeDependencyPackageInstallRoot.mockClear(); + resolveConfiguredDeferredChannelPluginIds.mockClear(); + resolveGatewayStartupPluginIds.mockClear().mockReturnValue(["telegram"]); + resolveOpenClawPackageRootSync.mockClear().mockReturnValue("/package"); + runChannelPluginStartupMaintenance.mockClear(); + runStartupSessionMigration.mockClear(); + scanBundledPluginRuntimeDeps.mockClear().mockReturnValue({ + deps: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }], + missing: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }], + conflicts: [], + }); + }); + + it("falls back to per-plugin runtime-deps installs after failed pre-start staging", async () => { + const installError = new Error("offline registry"); + repairBundledRuntimeDepsInstallRootAsync.mockRejectedValueOnce(installError); + const log = createLog(); + const { prepareGatewayPluginBootstrap } = await import("./server-startup-plugins.js"); + + await expect( + prepareGatewayPluginBootstrap({ + cfgAtStart: {}, + startupRuntimeConfig: {}, + minimalTestGateway: false, + log, + }), + ).resolves.toMatchObject({ + baseGatewayMethods: ["ping"], + startupPluginIds: ["telegram"], + }); + + expect(loadGatewayStartupPlugins).toHaveBeenCalledOnce(); + expect(scanBundledPluginRuntimeDeps).toHaveBeenCalledWith( + expect.objectContaining({ + selectedPluginIds: ["telegram"], + }), + ); + expect(log.warn).toHaveBeenCalledWith( + expect.stringContaining( + "gateway startup will continue with per-plugin runtime-deps installs", + ), + ); + expect(loadGatewayStartupPlugins.mock.calls[0]?.[0]).not.toHaveProperty( + "bundledRuntimeDepsInstaller", + ); + }); + + it("falls back to per-plugin runtime-deps installs after failed pre-start scan", async () => { + scanBundledPluginRuntimeDeps.mockImplementationOnce(() => { + throw new Error("unsupported runtime dependency spec"); + }); + const log = createLog(); + const { prepareGatewayPluginBootstrap } = await import("./server-startup-plugins.js"); + + await expect( + prepareGatewayPluginBootstrap({ + cfgAtStart: {}, + startupRuntimeConfig: {}, + minimalTestGateway: false, + log, + }), + ).resolves.toMatchObject({ + baseGatewayMethods: ["ping"], + startupPluginIds: ["telegram"], + }); + + expect(repairBundledRuntimeDepsInstallRootAsync).not.toHaveBeenCalled(); + expect(loadGatewayStartupPlugins).toHaveBeenCalledOnce(); + expect(log.warn).toHaveBeenCalledWith( + expect.stringContaining( + "failed to scan bundled runtime deps before gateway startup; gateway startup will continue with per-plugin runtime-deps installs", + ), + ); + expect(loadGatewayStartupPlugins.mock.calls[0]?.[0]).not.toHaveProperty( + "bundledRuntimeDepsInstaller", + ); + }); +}); diff --git a/src/gateway/server-startup-plugins.ts b/src/gateway/server-startup-plugins.ts index 8f31c99b5b7..b3999ac3cca 100644 --- a/src/gateway/server-startup-plugins.ts +++ b/src/gateway/server-startup-plugins.ts @@ -3,6 +3,12 @@ import { initSubagentRegistry } from "../agents/subagent-registry.js"; import { runChannelPluginStartupMaintenance } from "../channels/plugins/lifecycle-startup.js"; import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js"; import type { OpenClawConfig } from "../config/types.openclaw.js"; +import { resolveOpenClawPackageRootSync } from "../infra/openclaw-root.js"; +import { + repairBundledRuntimeDepsInstallRootAsync, + resolveBundledRuntimeDependencyPackageInstallRoot, + scanBundledPluginRuntimeDeps, +} from "../plugins/bundled-runtime-deps.js"; import { resolveConfiguredDeferredChannelPluginIds, resolveGatewayStartupPluginIds, @@ -20,6 +26,73 @@ type GatewayPluginBootstrapLog = { debug: (message: string) => void; }; +async function prestageGatewayBundledRuntimeDeps(params: { + cfg: OpenClawConfig; + pluginIds: readonly string[]; + log: GatewayPluginBootstrapLog; +}): Promise { + if (params.pluginIds.length === 0) { + return; + } + const packageRoot = resolveOpenClawPackageRootSync({ + argv1: process.argv[1], + cwd: process.cwd(), + moduleUrl: import.meta.url, + }); + if (!packageRoot) { + return; + } + let scanResult: ReturnType; + try { + scanResult = scanBundledPluginRuntimeDeps({ + packageRoot, + config: params.cfg, + selectedPluginIds: [...params.pluginIds], + env: process.env, + }); + } catch (error) { + params.log.warn( + `[plugins] failed to scan bundled runtime deps before gateway startup; gateway startup will continue with per-plugin runtime-deps installs: ${String(error)}`, + ); + return; + } + const { deps, missing, conflicts } = scanResult; + if (conflicts.length > 0) { + params.log.warn( + `[plugins] bundled runtime deps have version conflicts: ${conflicts.map((conflict) => `${conflict.name} (${conflict.versions.join(", ")})`).join("; ")}`, + ); + } + if (missing.length === 0) { + return; + } + const missingSpecs = missing.map((dep) => `${dep.name}@${dep.version}`); + const installSpecs = deps.map((dep) => `${dep.name}@${dep.version}`); + const installRoot = resolveBundledRuntimeDependencyPackageInstallRoot(packageRoot, { + env: process.env, + }); + const startedAt = Date.now(); + params.log.info( + `[plugins] staging bundled runtime deps before gateway startup (${missingSpecs.length} missing, ${installSpecs.length} install specs): ${missingSpecs.join(", ")}`, + ); + try { + await repairBundledRuntimeDepsInstallRootAsync({ + installRoot, + missingSpecs, + installSpecs, + env: process.env, + warn: (message) => params.log.warn(`[plugins] ${message}`), + }); + } catch (error) { + params.log.warn( + `[plugins] failed to stage bundled runtime deps before gateway startup after ${Date.now() - startedAt}ms; gateway startup will continue with per-plugin runtime-deps installs: ${String(error)}`, + ); + return; + } + params.log.info( + `[plugins] installed bundled runtime deps before gateway startup in ${Date.now() - startedAt}ms: ${missingSpecs.join(", ")}`, + ); +} + export async function prepareGatewayPluginBootstrap(params: { cfgAtStart: OpenClawConfig; startupRuntimeConfig: OpenClawConfig; @@ -88,6 +161,11 @@ export async function prepareGatewayPluginBootstrap(params: { let baseGatewayMethods = baseMethods; if (!params.minimalTestGateway) { + await prestageGatewayBundledRuntimeDeps({ + cfg: gatewayPluginConfigAtStart, + pluginIds: startupPluginIds, + log: params.log, + }); ({ pluginRegistry, gatewayMethods: baseGatewayMethods } = loadGatewayStartupPlugins({ cfg: gatewayPluginConfigAtStart, activationSourceConfig: params.cfgAtStart, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 7ff39f5f243..f5ada858852 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -30,6 +30,7 @@ import { enqueueSystemEvent } from "../infra/system-events.js"; import type { VoiceWakeRoutingConfig } from "../infra/voicewake-routing.js"; import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js"; import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js"; +import { getActiveBundledRuntimeDepsInstallCount } from "../plugins/bundled-runtime-deps-activity.js"; import { runGlobalGatewayStopSafely } from "../plugins/hook-runner-global.js"; import { createRuntimeChannel } from "../plugins/runtime/runtime-channel.js"; import type { PluginRuntime } from "../plugins/runtime/types.js"; @@ -328,6 +329,7 @@ export async function startGatewayServer( getTotalQueueSize() + getTotalPendingReplies() + getActiveEmbeddedRunCount() + + getActiveBundledRuntimeDepsInstallCount() + getInspectableTaskRegistrySummary().active, ); // Unconditional startup migration: seed gateway.controlUi.allowedOrigins for existing diff --git a/src/plugins/bundled-runtime-deps-activity.ts b/src/plugins/bundled-runtime-deps-activity.ts new file mode 100644 index 00000000000..a0474d409db --- /dev/null +++ b/src/plugins/bundled-runtime-deps-activity.ts @@ -0,0 +1,102 @@ +export type BundledRuntimeDepsInstallActivity = { + id: number; + installRoot: string; + missingSpecs: string[]; + installSpecs: string[]; + pluginId?: string; + startedAtMs: number; +}; + +type IdleWaiter = () => void; + +let nextActivityId = 1; +const activeInstalls = new Map(); +const idleWaiters = new Set(); + +function notifyIdleWaiters(): void { + if (activeInstalls.size > 0) { + return; + } + const waiters = [...idleWaiters]; + idleWaiters.clear(); + for (const waiter of waiters) { + waiter(); + } +} + +export function beginBundledRuntimeDepsInstall(params: { + installRoot: string; + missingSpecs: readonly string[]; + installSpecs?: readonly string[]; + pluginId?: string; +}): () => void { + const id = nextActivityId++; + activeInstalls.set(id, { + id, + installRoot: params.installRoot, + missingSpecs: [...params.missingSpecs], + installSpecs: [...(params.installSpecs ?? params.missingSpecs)], + ...(params.pluginId ? { pluginId: params.pluginId } : {}), + startedAtMs: Date.now(), + }); + let ended = false; + return () => { + if (ended) { + return; + } + ended = true; + activeInstalls.delete(id); + notifyIdleWaiters(); + }; +} + +export function getActiveBundledRuntimeDepsInstallCount(): number { + return activeInstalls.size; +} + +export function listActiveBundledRuntimeDepsInstalls(): BundledRuntimeDepsInstallActivity[] { + return [...activeInstalls.values()].toSorted((left, right) => left.id - right.id); +} + +export async function waitForBundledRuntimeDepsInstallIdle( + timeoutMs?: number, +): Promise<{ drained: boolean; active: number }> { + if (activeInstalls.size === 0) { + return { drained: true, active: 0 }; + } + + return await new Promise((resolve) => { + let settled = false; + let timer: ReturnType | null = null; + const cleanup = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + idleWaiters.delete(onIdle); + }; + const settle = (drained: boolean) => { + if (settled) { + return; + } + settled = true; + cleanup(); + resolve({ drained, active: activeInstalls.size }); + }; + const onIdle = () => settle(true); + idleWaiters.add(onIdle); + if (typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs >= 0) { + timer = setTimeout(() => settle(false), Math.floor(timeoutMs)); + timer.unref?.(); + } + }); +} + +export const __testing = { + resetBundledRuntimeDepsInstallActivity(): void { + activeInstalls.clear(); + notifyIdleWaiters(); + idleWaiters.clear(); + nextActivityId = 1; + }, +}; diff --git a/src/plugins/bundled-runtime-deps.test.ts b/src/plugins/bundled-runtime-deps.test.ts index 6ece5bd6b04..e4797a6f76a 100644 --- a/src/plugins/bundled-runtime-deps.test.ts +++ b/src/plugins/bundled-runtime-deps.test.ts @@ -4,6 +4,11 @@ import fs from "node:fs"; import os from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it, vi } from "vitest"; +import { + __testing as bundledRuntimeDepsActivityTesting, + getActiveBundledRuntimeDepsInstallCount, + waitForBundledRuntimeDepsInstallIdle, +} from "./bundled-runtime-deps-activity.js"; import { __testing as bundledRuntimeDepsTesting, createBundledRuntimeDependencyAliasMap, @@ -12,6 +17,7 @@ import { ensureBundledPluginRuntimeDeps, installBundledRuntimeDeps, isWritableDirectory, + repairBundledRuntimeDepsInstallRootAsync, resolveBundledRuntimeDependencyInstallRoot, resolveBundledRuntimeDepsNpmRunner, scanBundledPluginRuntimeDeps, @@ -85,6 +91,7 @@ function statfsFixture(params: { afterEach(() => { vi.restoreAllMocks(); spawnSyncMock.mockReset(); + bundledRuntimeDepsActivityTesting.resetBundledRuntimeDepsInstallActivity(); for (const dir of tempDirs.splice(0)) { fs.rmSync(dir, { recursive: true, force: true }); } @@ -773,6 +780,22 @@ describe("scanBundledPluginRuntimeDeps config policy", () => { ]); }); + it("trusts preselected startup plugin ids without reapplying config policy", () => { + const result = scanBundledPluginRuntimeDeps({ + packageRoot: setupPolicyPackageRoot(), + selectedPluginIds: ["telegram"], + config: { + plugins: { allow: ["browser"] }, + channels: { telegram: { botToken: "123:abc" } }, + }, + }); + + expect(result.deps.map((dep) => `${dep.name}@${dep.version}`)).toEqual([ + "telegram-runtime@2.0.0", + ]); + expect(result.conflicts).toEqual([]); + }); + it("reads each bundled plugin manifest once per runtime-deps scan", () => { const packageRoot = makeTempDir(); const pluginRoot = writeBundledPluginPackage({ @@ -1217,6 +1240,67 @@ describe("ensureBundledPluginRuntimeDeps", () => { ]); }); + it("tracks active runtime-deps installs until the installer returns", async () => { + const packageRoot = makeTempDir(); + const pluginRoot = path.join(packageRoot, "dist", "extensions", "browser"); + fs.mkdirSync(pluginRoot, { recursive: true }); + fs.writeFileSync( + path.join(pluginRoot, "package.json"), + JSON.stringify({ dependencies: { "browser-runtime": "1.0.0" } }), + ); + + let idleWait: Promise<{ drained: boolean; active: number }> | null = null; + expect(getActiveBundledRuntimeDepsInstallCount()).toBe(0); + const result = ensureBundledPluginRuntimeDeps({ + env: {}, + installDeps: (params) => { + expect(getActiveBundledRuntimeDepsInstallCount()).toBe(1); + idleWait = waitForBundledRuntimeDepsInstallIdle(); + writeInstalledPackage(params.installRoot, "browser-runtime", "1.0.0"); + }, + pluginId: "browser", + pluginRoot, + }); + + expect(result).toEqual({ + installedSpecs: ["browser-runtime@1.0.0"], + retainSpecs: ["browser-runtime@1.0.0"], + }); + expect(getActiveBundledRuntimeDepsInstallCount()).toBe(0); + await expect(idleWait).resolves.toEqual({ drained: true, active: 0 }); + }); + + it("keeps async repair locks and activity active until npm staging settles", async () => { + const installRoot = makeTempDir(); + const lockDir = path.join(installRoot, ".openclaw-runtime-deps.lock"); + let releaseInstall!: () => void; + const repair = repairBundledRuntimeDepsInstallRootAsync({ + installRoot, + missingSpecs: ["browser-runtime@1.0.0"], + installSpecs: ["browser-runtime@1.0.0"], + env: {}, + installDeps: async (params) => { + expect(fs.existsSync(lockDir)).toBe(true); + expect(getActiveBundledRuntimeDepsInstallCount()).toBe(1); + await new Promise((resolve) => { + releaseInstall = () => { + writeInstalledPackage(params.installRoot, "browser-runtime", "1.0.0"); + resolve(); + }; + }); + }, + }); + + await Promise.resolve(); + expect(fs.existsSync(lockDir)).toBe(true); + expect(getActiveBundledRuntimeDepsInstallCount()).toBe(1); + + releaseInstall(); + await expect(repair).resolves.toEqual({ installSpecs: ["browser-runtime@1.0.0"] }); + expect(fs.existsSync(lockDir)).toBe(false); + expect(getActiveBundledRuntimeDepsInstallCount()).toBe(0); + }); + it("does not expire active runtime-deps install locks by age alone", () => { expect( bundledRuntimeDepsTesting.shouldRemoveRuntimeDepsLock( diff --git a/src/plugins/bundled-runtime-deps.ts b/src/plugins/bundled-runtime-deps.ts index 4e248ef3602..e79cb58be71 100644 --- a/src/plugins/bundled-runtime-deps.ts +++ b/src/plugins/bundled-runtime-deps.ts @@ -1,4 +1,4 @@ -import { spawnSync } from "node:child_process"; +import { spawn, spawnSync } from "node:child_process"; import { createHash } from "node:crypto"; import fs from "node:fs"; import { Module } from "node:module"; @@ -10,6 +10,7 @@ import { createLowDiskSpaceWarning } from "../infra/disk-space.js"; import { resolveHomeRelativePath } from "../infra/home-dir.js"; import { createNpmProjectInstallEnv } from "../infra/npm-install-env.js"; import { normalizeOptionalLowercaseString } from "../shared/string-coerce.js"; +import { beginBundledRuntimeDepsInstall } from "./bundled-runtime-deps-activity.js"; import { normalizePluginsConfig } from "./config-state.js"; import { satisfies, validRange, validSemver } from "./semver.runtime.js"; @@ -187,6 +188,10 @@ function sleepSync(ms: number): void { Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms); } +async function sleep(ms: number): Promise { + await new Promise((resolve) => setTimeout(resolve, ms)); +} + function isProcessAlive(pid: number): boolean { if (!Number.isInteger(pid) || pid <= 0) { return false; @@ -985,11 +990,15 @@ function isBundledPluginConfiguredForRuntimeDeps(params: { function shouldIncludeBundledPluginRuntimeDeps(params: { config?: OpenClawConfig; pluginIds?: ReadonlySet; + selectedPluginIds?: ReadonlySet; pluginId: string; pluginDir: string; includeConfiguredChannels?: boolean; manifestCache?: BundledPluginRuntimeDepsManifestCache; }): boolean { + if (params.selectedPluginIds) { + return params.selectedPluginIds.has(params.pluginId); + } const scopedToPluginIds = Boolean(params.pluginIds); if (params.pluginIds) { if (!params.pluginIds.has(params.pluginId)) { @@ -1023,6 +1032,7 @@ function collectBundledPluginRuntimeDeps(params: { extensionsDir: string; config?: OpenClawConfig; pluginIds?: ReadonlySet; + selectedPluginIds?: ReadonlySet; includeConfiguredChannels?: boolean; }): { deps: RuntimeDepEntry[]; @@ -1041,6 +1051,7 @@ function collectBundledPluginRuntimeDeps(params: { !shouldIncludeBundledPluginRuntimeDeps({ config: params.config, pluginIds: params.pluginIds, + selectedPluginIds: params.selectedPluginIds, pluginId, pluginDir, includeConfiguredChannels: params.includeConfiguredChannels, @@ -1117,6 +1128,7 @@ export function scanBundledPluginRuntimeDeps(params: { packageRoot: string; config?: OpenClawConfig; pluginIds?: readonly string[]; + selectedPluginIds?: readonly string[]; includeConfiguredChannels?: boolean; env?: NodeJS.ProcessEnv; }): { @@ -1135,6 +1147,7 @@ export function scanBundledPluginRuntimeDeps(params: { extensionsDir, config: params.config, pluginIds: normalizePluginIdSet(params.pluginIds), + selectedPluginIds: normalizePluginIdSet(params.selectedPluginIds), includeConfiguredChannels: params.includeConfiguredChannels, }); const packageInstallRoot = resolveBundledRuntimeDependencyPackageInstallRoot(params.packageRoot, { @@ -1250,6 +1263,64 @@ function ensureNpmInstallExecutionManifest(installExecutionRoot: string): void { "utf8", ); } + +function formatBundledRuntimeDepsInstallError(result: { + error?: Error; + signal?: NodeJS.Signals | null; + status?: number | null; + stderr?: string | Buffer | null; + stdout?: string | Buffer | null; +}): string { + const output = [ + result.error?.message, + result.signal ? `terminated by ${result.signal}` : null, + result.stderr, + result.stdout, + ] + .filter(Boolean) + .join("\n") + .trim(); + return output || "npm install failed"; +} + +async function spawnBundledRuntimeDepsInstall(params: { + command: string; + args: string[]; + cwd: string; + env: NodeJS.ProcessEnv; +}): Promise { + await new Promise((resolve, reject) => { + const child = spawn(params.command, params.args, { + cwd: params.cwd, + env: params.env, + stdio: ["ignore", "pipe", "pipe"], + }); + const stdout: Buffer[] = []; + const stderr: Buffer[] = []; + child.stdout?.on("data", (chunk: Buffer) => stdout.push(chunk)); + child.stderr?.on("data", (chunk: Buffer) => stderr.push(chunk)); + child.on("error", (error) => { + reject(new Error(formatBundledRuntimeDepsInstallError({ error }))); + }); + child.on("close", (status, signal) => { + if (status === 0 && !signal) { + resolve(); + return; + } + reject( + new Error( + formatBundledRuntimeDepsInstallError({ + status, + signal, + stdout: Buffer.concat(stdout).toString("utf8"), + stderr: Buffer.concat(stderr).toString("utf8"), + }), + ), + ); + }); + }); +} + export function installBundledRuntimeDeps(params: { installRoot: string; installExecutionRoot?: string; @@ -1296,11 +1367,7 @@ export function installBundledRuntimeDeps(params: { stdio: "pipe", }); if (result.status !== 0 || result.error) { - const output = [result.error?.message, result.stderr, result.stdout] - .filter(Boolean) - .join("\n") - .trim(); - throw new Error(output || "npm install failed"); + throw new Error(formatBundledRuntimeDepsInstallError(result)); } assertBundledRuntimeDepsInstalled(installExecutionRoot, params.missingSpecs); if (isolatedExecutionRoot) { @@ -1323,6 +1390,68 @@ export function installBundledRuntimeDeps(params: { } } +export async function installBundledRuntimeDepsAsync(params: { + installRoot: string; + installExecutionRoot?: string; + linkNodeModulesFromExecutionRoot?: boolean; + missingSpecs: string[]; + env: NodeJS.ProcessEnv; + warn?: (message: string) => void; +}): Promise { + const installExecutionRoot = params.installExecutionRoot ?? params.installRoot; + const isolatedExecutionRoot = + path.resolve(installExecutionRoot) !== path.resolve(params.installRoot); + const cleanInstallExecutionRoot = + isolatedExecutionRoot && + shouldCleanBundledRuntimeDepsInstallExecutionRoot({ + installRoot: params.installRoot, + installExecutionRoot, + }); + try { + fs.mkdirSync(params.installRoot, { recursive: true }); + fs.mkdirSync(installExecutionRoot, { recursive: true }); + const diskWarning = createLowDiskSpaceWarning({ + targetPath: installExecutionRoot, + purpose: "bundled plugin runtime dependency staging", + }); + if (diskWarning) { + params.warn?.(diskWarning); + } + ensureNpmInstallExecutionManifest(installExecutionRoot); + const installEnv = createBundledRuntimeDepsInstallEnv(params.env, { + cacheDir: path.join(installExecutionRoot, ".openclaw-npm-cache"), + }); + const npmRunner = resolveBundledRuntimeDepsNpmRunner({ + env: installEnv, + npmArgs: createBundledRuntimeDepsInstallArgs(params.missingSpecs), + }); + await spawnBundledRuntimeDepsInstall({ + command: npmRunner.command, + args: npmRunner.args, + cwd: installExecutionRoot, + env: npmRunner.env ?? installEnv, + }); + assertBundledRuntimeDepsInstalled(installExecutionRoot, params.missingSpecs); + if (isolatedExecutionRoot) { + const stagedNodeModulesDir = path.join(installExecutionRoot, "node_modules"); + if (!fs.existsSync(stagedNodeModulesDir)) { + throw new Error("npm install did not produce node_modules"); + } + const targetNodeModulesDir = path.join(params.installRoot, "node_modules"); + if (params.linkNodeModulesFromExecutionRoot) { + replaceNodeModulesDirFromCache(targetNodeModulesDir, stagedNodeModulesDir); + } else { + replaceNodeModulesDir(targetNodeModulesDir, stagedNodeModulesDir); + } + assertBundledRuntimeDepsInstalled(params.installRoot, params.missingSpecs); + } + } finally { + if (cleanInstallExecutionRoot) { + fs.rmSync(installExecutionRoot, { recursive: true, force: true }); + } + } +} + export function repairBundledRuntimeDepsInstallRoot(params: { installRoot: string; missingSpecs: string[]; @@ -1345,11 +1474,113 @@ export function repairBundledRuntimeDepsInstallRoot(params: { env: params.env, warn: params.warn, })); - install({ + const finishActivity = beginBundledRuntimeDepsInstall({ installRoot: params.installRoot, missingSpecs: params.missingSpecs, installSpecs, }); + try { + install({ + installRoot: params.installRoot, + missingSpecs: params.missingSpecs, + installSpecs, + }); + } finally { + finishActivity(); + } + writeRetainedRuntimeDepsManifest(params.installRoot, installSpecs); + return { installSpecs }; + }); +} + +async function withBundledRuntimeDepsInstallRootLockAsync( + installRoot: string, + run: () => Promise, +): Promise { + fs.mkdirSync(installRoot, { recursive: true }); + const lockDir = path.join(installRoot, BUNDLED_RUNTIME_DEPS_LOCK_DIR); + const startedAt = Date.now(); + let locked = false; + while (!locked) { + try { + fs.mkdirSync(lockDir); + try { + fs.writeFileSync( + path.join(lockDir, BUNDLED_RUNTIME_DEPS_LOCK_OWNER_FILE), + `${JSON.stringify({ pid: process.pid, createdAtMs: Date.now() }, null, 2)}\n`, + "utf8", + ); + } catch (ownerWriteError) { + fs.rmSync(lockDir, { recursive: true, force: true }); + throw ownerWriteError; + } + locked = true; + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code !== "EEXIST") { + throw error; + } + removeRuntimeDepsLockIfStale(lockDir, Date.now()); + const nowMs = Date.now(); + if (nowMs - startedAt > BUNDLED_RUNTIME_DEPS_LOCK_TIMEOUT_MS) { + throw new Error( + formatRuntimeDepsLockTimeoutMessage({ + lockDir, + owner: readRuntimeDepsLockOwner(lockDir), + waitedMs: nowMs - startedAt, + nowMs, + }), + { + cause: error, + }, + ); + } + await sleep(BUNDLED_RUNTIME_DEPS_LOCK_WAIT_MS); + } + } + try { + return await run(); + } finally { + fs.rmSync(lockDir, { recursive: true, force: true }); + } +} + +export async function repairBundledRuntimeDepsInstallRootAsync(params: { + installRoot: string; + missingSpecs: string[]; + installSpecs: string[]; + env: NodeJS.ProcessEnv; + installDeps?: (params: BundledRuntimeDepsInstallParams) => Promise; + warn?: (message: string) => void; +}): Promise<{ installSpecs: string[] }> { + return await withBundledRuntimeDepsInstallRootLockAsync(params.installRoot, async () => { + const retainedManifestSpecs = readRetainedRuntimeDepsManifest(params.installRoot); + const installSpecs = [...new Set([...retainedManifestSpecs, ...params.installSpecs])].toSorted( + (left, right) => left.localeCompare(right), + ); + const install = + params.installDeps ?? + ((installParams) => + installBundledRuntimeDepsAsync({ + installRoot: installParams.installRoot, + missingSpecs: installParams.installSpecs ?? installParams.missingSpecs, + env: params.env, + warn: params.warn, + })); + const finishActivity = beginBundledRuntimeDepsInstall({ + installRoot: params.installRoot, + missingSpecs: params.missingSpecs, + installSpecs, + }); + try { + await install({ + installRoot: params.installRoot, + missingSpecs: params.missingSpecs, + installSpecs, + }); + } finally { + finishActivity(); + } writeRetainedRuntimeDepsManifest(params.installRoot, installSpecs); return { installSpecs }; }); @@ -1458,13 +1689,23 @@ export function ensureBundledPluginRuntimeDeps(params: { missingSpecs: installParams.installSpecs ?? installParams.missingSpecs, env: params.env, })); - install({ + const finishActivity = beginBundledRuntimeDepsInstall({ installRoot, - installExecutionRoot, - ...(sourceCheckoutCacheStage ? { linkNodeModulesFromExecutionRoot: true } : {}), missingSpecs, installSpecs, + pluginId: params.pluginId, }); + try { + install({ + installRoot, + installExecutionRoot, + ...(sourceCheckoutCacheStage ? { linkNodeModulesFromExecutionRoot: true } : {}), + missingSpecs, + installSpecs, + }); + } finally { + finishActivity(); + } const cacheAlreadyPopulated = Boolean( sourceCheckoutCacheStage && hasAllDependencySentinels(sourceCheckoutCacheStage, deps), );