fix: stage bundled plugin runtime deps safely

This commit is contained in:
Peter Steinberger
2026-04-27 06:16:17 +01:00
parent 6038725501
commit 1ea12fe3e2
12 changed files with 781 additions and 15 deletions

View File

@@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Plugins/install: stage bundled plugin runtime dependencies before Gateway startup and drain update restarts while preserving per-plugin isolation when pre-stage scan or install fails. Thanks @codex.
- CLI/startup: read generated startup metadata from the bundled `dist` layout before falling back to live help rendering, so root/browser help and channel-option bootstrap stay on the fast path. Thanks @vincentkoc.
- CLI/help: treat positional `help` invocations like `openclaw channels help` as help paths for startup gating, avoiding model/auth warmup while preserving positional arguments such as `openclaw docs help`. Thanks @gumadeiras.
- Matrix/E2EE: stabilize recovery and broken-device QA flows while avoiding Matrix device-cleanup sync races that could leave shutdown-time crypto work running. Thanks @gumadeiras.

View File

@@ -138,6 +138,8 @@ If an exact pinned npm plugin update resolves to an artifact whose integrity dif
<Note>
Post-update plugin sync failures fail the update result and stop restart follow-up work. Fix the plugin install or update error, then rerun `openclaw update`.
When the updated Gateway starts, enabled bundled plugin runtime dependencies are staged before plugin activation. Update-triggered restarts drain any active runtime-dependency staging before closing the Gateway, so service-manager restarts do not interrupt an in-flight npm install.
If pnpm bootstrap still fails, the updater stops early with a package-manager-specific error instead of trying `npm run build` inside the checkout.
</Note>

View File

@@ -22,6 +22,11 @@ const getActiveTaskCount = vi.fn(() => 0);
const markGatewayDraining = vi.fn();
const waitForActiveTasks = vi.fn(async (_timeoutMs?: number) => ({ drained: true }));
const resetAllLanes = vi.fn();
const getActiveBundledRuntimeDepsInstallCount = vi.fn(() => 0);
const waitForBundledRuntimeDepsInstallIdle = vi.fn(async (_timeoutMs?: number) => ({
drained: true,
active: 0,
}));
const restartGatewayProcessWithFreshPid = vi.fn<
() => { mode: "spawned" | "supervised" | "disabled" | "failed"; pid?: number; detail?: string }
>(() => ({ mode: "disabled" }));
@@ -68,6 +73,12 @@ vi.mock("../../process/command-queue.js", () => ({
resetAllLanes: () => resetAllLanes(),
}));
vi.mock("../../plugins/bundled-runtime-deps-activity.js", () => ({
getActiveBundledRuntimeDepsInstallCount: () => getActiveBundledRuntimeDepsInstallCount(),
waitForBundledRuntimeDepsInstallIdle: (timeoutMs?: number) =>
waitForBundledRuntimeDepsInstallIdle(timeoutMs),
}));
vi.mock("../../agents/pi-embedded-runner/runs.js", () => ({
abortEmbeddedPiRun: (sessionId?: string, opts?: { mode?: "all" | "compacting" }) =>
abortEmbeddedPiRun(sessionId, opts),
@@ -400,6 +411,46 @@ describe("runGatewayLoop", () => {
});
});
it("waits for active runtime-deps installs before restart close", async () => {
vi.clearAllMocks();
loadConfig.mockReturnValue({
gateway: {
reload: {
deferralTimeoutMs: 90_000,
},
},
});
let releaseRuntimeDeps!: () => void;
getActiveBundledRuntimeDepsInstallCount.mockReturnValueOnce(1).mockReturnValue(0);
waitForBundledRuntimeDepsInstallIdle.mockReturnValueOnce(
new Promise((resolve) => {
releaseRuntimeDeps = () => resolve({ drained: true, active: 0 });
}),
);
await withIsolatedSignals(async ({ captureSignal }) => {
const { close, start } = await createSignaledLoopHarness();
const sigusr1 = captureSignal("SIGUSR1");
sigusr1();
await new Promise<void>((resolve) => setImmediate(resolve));
expect(markGatewayDraining).toHaveBeenCalledOnce();
expect(waitForBundledRuntimeDepsInstallIdle).toHaveBeenCalledWith(90_000);
expect(close).not.toHaveBeenCalled();
releaseRuntimeDeps();
await new Promise<void>((resolve) => setImmediate(resolve));
await new Promise<void>((resolve) => setImmediate(resolve));
expect(close).toHaveBeenCalledWith({
reason: "gateway restarting",
restartExpectedMs: 1500,
});
expect(start).toHaveBeenCalledTimes(2);
});
});
it("releases the lock before exiting on spawned restart", async () => {
vi.clearAllMocks();

View File

@@ -18,6 +18,10 @@ import {
import { detectRespawnSupervisor } from "../../infra/supervisor-markers.js";
import { writeDiagnosticStabilityBundleForFailureSync } from "../../logging/diagnostic-stability-bundle.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import {
getActiveBundledRuntimeDepsInstallCount,
waitForBundledRuntimeDepsInstallIdle,
} from "../../plugins/bundled-runtime-deps-activity.js";
import {
getActiveTaskCount,
markGatewayDraining,
@@ -187,7 +191,7 @@ export async function runGatewayLoop(params: {
const createStillPendingDrainLogger = () =>
setInterval(() => {
gatewayLog.warn(
`still draining ${getActiveTaskCount()} active task(s) and ${getActiveEmbeddedRunCount()} active embedded run(s) before restart`,
`still draining ${getActiveTaskCount()} active task(s), ${getActiveEmbeddedRunCount()} active embedded run(s), and ${getActiveBundledRuntimeDepsInstallCount()} runtime deps install(s) before restart`,
);
}, RESTART_DRAIN_STILL_PENDING_WARN_MS);
@@ -207,6 +211,7 @@ export async function runGatewayLoop(params: {
markGatewayDraining();
const activeTasks = getActiveTaskCount();
const activeRuns = getActiveEmbeddedRunCount();
const activeRuntimeDepsInstalls = getActiveBundledRuntimeDepsInstallCount();
// Best-effort abort for compacting runs so long compaction operations
// don't hold session write locks across restart boundaries.
@@ -214,20 +219,23 @@ export async function runGatewayLoop(params: {
abortEmbeddedPiRun(undefined, { mode: "compacting" });
}
if (activeTasks > 0 || activeRuns > 0) {
if (activeTasks > 0 || activeRuns > 0 || activeRuntimeDepsInstalls > 0) {
gatewayLog.info(
`draining ${activeTasks} active task(s) and ${activeRuns} active embedded run(s) before restart ${formatRestartDrainBudget()}`,
`draining ${activeTasks} active task(s), ${activeRuns} active embedded run(s), and ${activeRuntimeDepsInstalls} runtime deps install(s) before restart ${formatRestartDrainBudget()}`,
);
const stillPendingDrainLogger = createStillPendingDrainLogger();
const [tasksDrain, runsDrain] = await Promise.all([
const [tasksDrain, runsDrain, runtimeDepsDrain] = await Promise.all([
activeTasks > 0
? waitForActiveTasks(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
activeRuns > 0
? waitForActiveEmbeddedRuns(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
activeRuntimeDepsInstalls > 0
? waitForBundledRuntimeDepsInstallIdle(restartDrainTimeoutMs)
: Promise.resolve({ drained: true }),
]).finally(() => clearInterval(stillPendingDrainLogger));
if (tasksDrain.drained && runsDrain.drained) {
if (tasksDrain.drained && runsDrain.drained && runtimeDepsDrain.drained) {
gatewayLog.info("all active work drained");
} else {
gatewayLog.warn("drain timeout reached; proceeding with restart");

View File

@@ -1,6 +1,7 @@
import { primeConfiguredBindingRegistry } from "../channels/plugins/binding-registry.js";
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { BundledRuntimeDepsInstallParams } from "../plugins/bundled-runtime-deps.js";
import type { PluginRegistry } from "../plugins/registry.js";
import { pinActivePluginChannelRegistry } from "../plugins/runtime.js";
import {
@@ -34,6 +35,7 @@ type GatewayPluginBootstrapParams = {
preferSetupRuntimeForChannelPlugins?: boolean;
suppressPluginInfoLogs?: boolean;
logDiagnostics?: boolean;
bundledRuntimeDepsInstaller?: (params: BundledRuntimeDepsInstallParams) => void;
beforePrimeRegistry?: (pluginRegistry: PluginRegistry) => void;
};
@@ -89,6 +91,7 @@ export function prepareGatewayPluginLoad(params: GatewayPluginBootstrapParams) {
pluginIds: params.pluginIds,
preferSetupRuntimeForChannelPlugins: params.preferSetupRuntimeForChannelPlugins,
suppressPluginInfoLogs: params.suppressPluginInfoLogs,
bundledRuntimeDepsInstaller: params.bundledRuntimeDepsInstaller,
});
params.beforePrimeRegistry?.(loaded.pluginRegistry);
primeConfiguredBindingRegistry({ cfg: resolvedConfig });

View File

@@ -2,6 +2,7 @@ import { randomUUID } from "node:crypto";
import { normalizeModelRef, parseModelRef } from "../agents/model-selection.js";
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import type { BundledRuntimeDepsInstallParams } from "../plugins/bundled-runtime-deps.js";
import { resolveGatewayStartupPluginIds } from "../plugins/channel-plugin-ids.js";
import { normalizePluginsConfig } from "../plugins/config-state.js";
import { loadOpenClawPlugins } from "../plugins/loader.js";
@@ -448,6 +449,7 @@ export function loadGatewayPlugins(params: {
pluginIds?: string[];
preferSetupRuntimeForChannelPlugins?: boolean;
suppressPluginInfoLogs?: boolean;
bundledRuntimeDepsInstaller?: (params: BundledRuntimeDepsInstallParams) => void;
}) {
const activationAutoEnabled =
params.activationSourceConfig !== undefined
@@ -510,6 +512,7 @@ export function loadGatewayPlugins(params: {
allowGatewaySubagentBinding: true,
},
preferSetupRuntimeForChannelPlugins: params.preferSetupRuntimeForChannelPlugins,
bundledRuntimeDepsInstaller: params.bundledRuntimeDepsInstaller,
});
const pluginMethods = Object.keys(pluginRegistry.gatewayHandlers);
const gatewayMethods = Array.from(new Set([...params.baseMethods, ...pluginMethods]));

View File

@@ -0,0 +1,191 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const applyPluginAutoEnable = vi.hoisted(() =>
vi.fn((params: { config: unknown }) => ({
config: params.config,
changes: [],
autoEnabledReasons: {},
})),
);
const initSubagentRegistry = vi.hoisted(() => vi.fn());
const loadGatewayStartupPlugins = vi.hoisted(() =>
vi.fn((_params: unknown) => ({
pluginRegistry: { diagnostics: [], gatewayHandlers: {}, plugins: [] },
gatewayMethods: ["ping"],
})),
);
const repairBundledRuntimeDepsInstallRootAsync = vi.hoisted(() =>
vi.fn(async (_params: unknown) => ({})),
);
const resolveBundledRuntimeDependencyPackageInstallRoot = vi.hoisted(() =>
vi.fn((_packageRoot: string, _params: unknown) => "/runtime"),
);
const resolveConfiguredDeferredChannelPluginIds = vi.hoisted(() => vi.fn((_params: unknown) => []));
const resolveGatewayStartupPluginIds = vi.hoisted(() => vi.fn((_params: unknown) => ["telegram"]));
const resolveOpenClawPackageRootSync = vi.hoisted(() => vi.fn((_params: unknown) => "/package"));
const runChannelPluginStartupMaintenance = vi.hoisted(() =>
vi.fn(async (_params: unknown) => undefined),
);
const runStartupSessionMigration = vi.hoisted(() => vi.fn(async (_params: unknown) => undefined));
const scanBundledPluginRuntimeDeps = vi.hoisted(() =>
vi.fn((_params: unknown) => ({
deps: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }],
missing: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }],
conflicts: [],
})),
);
vi.mock("../agents/agent-scope.js", () => ({
resolveAgentWorkspaceDir: () => "/workspace",
resolveDefaultAgentId: () => "default",
}));
vi.mock("../agents/subagent-registry.js", () => ({
initSubagentRegistry: () => initSubagentRegistry(),
}));
vi.mock("../channels/plugins/lifecycle-startup.js", () => ({
runChannelPluginStartupMaintenance: (params: unknown) =>
runChannelPluginStartupMaintenance(params),
}));
vi.mock("../config/plugin-auto-enable.js", () => ({
applyPluginAutoEnable: (params: { config: unknown }) => applyPluginAutoEnable(params),
}));
vi.mock("../infra/openclaw-root.js", () => ({
resolveOpenClawPackageRootSync: (params: unknown) => resolveOpenClawPackageRootSync(params),
}));
vi.mock("../plugins/bundled-runtime-deps.js", () => ({
repairBundledRuntimeDepsInstallRootAsync: (params: unknown) =>
repairBundledRuntimeDepsInstallRootAsync(params),
resolveBundledRuntimeDependencyPackageInstallRoot: (packageRoot: string, params: unknown) =>
resolveBundledRuntimeDependencyPackageInstallRoot(packageRoot, params),
scanBundledPluginRuntimeDeps: (params: unknown) => scanBundledPluginRuntimeDeps(params),
}));
vi.mock("../plugins/channel-plugin-ids.js", () => ({
resolveConfiguredDeferredChannelPluginIds: (params: unknown) =>
resolveConfiguredDeferredChannelPluginIds(params),
resolveGatewayStartupPluginIds: (params: unknown) => resolveGatewayStartupPluginIds(params),
}));
vi.mock("../plugins/registry.js", () => ({
createEmptyPluginRegistry: () => ({ diagnostics: [], gatewayHandlers: {}, plugins: [] }),
}));
vi.mock("../plugins/runtime.js", () => ({
getActivePluginRegistry: () => undefined,
setActivePluginRegistry: vi.fn(),
}));
vi.mock("./server-methods-list.js", () => ({
listGatewayMethods: () => ["ping"],
}));
vi.mock("./server-methods.js", () => ({
coreGatewayHandlers: {},
}));
vi.mock("./server-plugin-bootstrap.js", () => ({
loadGatewayStartupPlugins: (params: unknown) => loadGatewayStartupPlugins(params),
}));
vi.mock("./server-startup-session-migration.js", () => ({
runStartupSessionMigration: (params: unknown) => runStartupSessionMigration(params),
}));
function createLog() {
return {
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
debug: vi.fn(),
};
}
describe("prepareGatewayPluginBootstrap runtime-deps staging", () => {
beforeEach(() => {
applyPluginAutoEnable.mockClear();
initSubagentRegistry.mockClear();
loadGatewayStartupPlugins.mockClear();
repairBundledRuntimeDepsInstallRootAsync.mockReset().mockResolvedValue({});
resolveBundledRuntimeDependencyPackageInstallRoot.mockClear();
resolveConfiguredDeferredChannelPluginIds.mockClear();
resolveGatewayStartupPluginIds.mockClear().mockReturnValue(["telegram"]);
resolveOpenClawPackageRootSync.mockClear().mockReturnValue("/package");
runChannelPluginStartupMaintenance.mockClear();
runStartupSessionMigration.mockClear();
scanBundledPluginRuntimeDeps.mockClear().mockReturnValue({
deps: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }],
missing: [{ name: "grammy", version: "1.37.0", pluginIds: ["telegram"] }],
conflicts: [],
});
});
it("falls back to per-plugin runtime-deps installs after failed pre-start staging", async () => {
const installError = new Error("offline registry");
repairBundledRuntimeDepsInstallRootAsync.mockRejectedValueOnce(installError);
const log = createLog();
const { prepareGatewayPluginBootstrap } = await import("./server-startup-plugins.js");
await expect(
prepareGatewayPluginBootstrap({
cfgAtStart: {},
startupRuntimeConfig: {},
minimalTestGateway: false,
log,
}),
).resolves.toMatchObject({
baseGatewayMethods: ["ping"],
startupPluginIds: ["telegram"],
});
expect(loadGatewayStartupPlugins).toHaveBeenCalledOnce();
expect(scanBundledPluginRuntimeDeps).toHaveBeenCalledWith(
expect.objectContaining({
selectedPluginIds: ["telegram"],
}),
);
expect(log.warn).toHaveBeenCalledWith(
expect.stringContaining(
"gateway startup will continue with per-plugin runtime-deps installs",
),
);
expect(loadGatewayStartupPlugins.mock.calls[0]?.[0]).not.toHaveProperty(
"bundledRuntimeDepsInstaller",
);
});
it("falls back to per-plugin runtime-deps installs after failed pre-start scan", async () => {
scanBundledPluginRuntimeDeps.mockImplementationOnce(() => {
throw new Error("unsupported runtime dependency spec");
});
const log = createLog();
const { prepareGatewayPluginBootstrap } = await import("./server-startup-plugins.js");
await expect(
prepareGatewayPluginBootstrap({
cfgAtStart: {},
startupRuntimeConfig: {},
minimalTestGateway: false,
log,
}),
).resolves.toMatchObject({
baseGatewayMethods: ["ping"],
startupPluginIds: ["telegram"],
});
expect(repairBundledRuntimeDepsInstallRootAsync).not.toHaveBeenCalled();
expect(loadGatewayStartupPlugins).toHaveBeenCalledOnce();
expect(log.warn).toHaveBeenCalledWith(
expect.stringContaining(
"failed to scan bundled runtime deps before gateway startup; gateway startup will continue with per-plugin runtime-deps installs",
),
);
expect(loadGatewayStartupPlugins.mock.calls[0]?.[0]).not.toHaveProperty(
"bundledRuntimeDepsInstaller",
);
});
});

View File

@@ -3,6 +3,12 @@ import { initSubagentRegistry } from "../agents/subagent-registry.js";
import { runChannelPluginStartupMaintenance } from "../channels/plugins/lifecycle-startup.js";
import { applyPluginAutoEnable } from "../config/plugin-auto-enable.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveOpenClawPackageRootSync } from "../infra/openclaw-root.js";
import {
repairBundledRuntimeDepsInstallRootAsync,
resolveBundledRuntimeDependencyPackageInstallRoot,
scanBundledPluginRuntimeDeps,
} from "../plugins/bundled-runtime-deps.js";
import {
resolveConfiguredDeferredChannelPluginIds,
resolveGatewayStartupPluginIds,
@@ -20,6 +26,73 @@ type GatewayPluginBootstrapLog = {
debug: (message: string) => void;
};
async function prestageGatewayBundledRuntimeDeps(params: {
cfg: OpenClawConfig;
pluginIds: readonly string[];
log: GatewayPluginBootstrapLog;
}): Promise<void> {
if (params.pluginIds.length === 0) {
return;
}
const packageRoot = resolveOpenClawPackageRootSync({
argv1: process.argv[1],
cwd: process.cwd(),
moduleUrl: import.meta.url,
});
if (!packageRoot) {
return;
}
let scanResult: ReturnType<typeof scanBundledPluginRuntimeDeps>;
try {
scanResult = scanBundledPluginRuntimeDeps({
packageRoot,
config: params.cfg,
selectedPluginIds: [...params.pluginIds],
env: process.env,
});
} catch (error) {
params.log.warn(
`[plugins] failed to scan bundled runtime deps before gateway startup; gateway startup will continue with per-plugin runtime-deps installs: ${String(error)}`,
);
return;
}
const { deps, missing, conflicts } = scanResult;
if (conflicts.length > 0) {
params.log.warn(
`[plugins] bundled runtime deps have version conflicts: ${conflicts.map((conflict) => `${conflict.name} (${conflict.versions.join(", ")})`).join("; ")}`,
);
}
if (missing.length === 0) {
return;
}
const missingSpecs = missing.map((dep) => `${dep.name}@${dep.version}`);
const installSpecs = deps.map((dep) => `${dep.name}@${dep.version}`);
const installRoot = resolveBundledRuntimeDependencyPackageInstallRoot(packageRoot, {
env: process.env,
});
const startedAt = Date.now();
params.log.info(
`[plugins] staging bundled runtime deps before gateway startup (${missingSpecs.length} missing, ${installSpecs.length} install specs): ${missingSpecs.join(", ")}`,
);
try {
await repairBundledRuntimeDepsInstallRootAsync({
installRoot,
missingSpecs,
installSpecs,
env: process.env,
warn: (message) => params.log.warn(`[plugins] ${message}`),
});
} catch (error) {
params.log.warn(
`[plugins] failed to stage bundled runtime deps before gateway startup after ${Date.now() - startedAt}ms; gateway startup will continue with per-plugin runtime-deps installs: ${String(error)}`,
);
return;
}
params.log.info(
`[plugins] installed bundled runtime deps before gateway startup in ${Date.now() - startedAt}ms: ${missingSpecs.join(", ")}`,
);
}
export async function prepareGatewayPluginBootstrap(params: {
cfgAtStart: OpenClawConfig;
startupRuntimeConfig: OpenClawConfig;
@@ -88,6 +161,11 @@ export async function prepareGatewayPluginBootstrap(params: {
let baseGatewayMethods = baseMethods;
if (!params.minimalTestGateway) {
await prestageGatewayBundledRuntimeDeps({
cfg: gatewayPluginConfigAtStart,
pluginIds: startupPluginIds,
log: params.log,
});
({ pluginRegistry, gatewayMethods: baseGatewayMethods } = loadGatewayStartupPlugins({
cfg: gatewayPluginConfigAtStart,
activationSourceConfig: params.cfgAtStart,

View File

@@ -30,6 +30,7 @@ import { enqueueSystemEvent } from "../infra/system-events.js";
import type { VoiceWakeRoutingConfig } from "../infra/voicewake-routing.js";
import { startDiagnosticHeartbeat, stopDiagnosticHeartbeat } from "../logging/diagnostic.js";
import { createSubsystemLogger, runtimeForLogger } from "../logging/subsystem.js";
import { getActiveBundledRuntimeDepsInstallCount } from "../plugins/bundled-runtime-deps-activity.js";
import { runGlobalGatewayStopSafely } from "../plugins/hook-runner-global.js";
import { createRuntimeChannel } from "../plugins/runtime/runtime-channel.js";
import type { PluginRuntime } from "../plugins/runtime/types.js";
@@ -328,6 +329,7 @@ export async function startGatewayServer(
getTotalQueueSize() +
getTotalPendingReplies() +
getActiveEmbeddedRunCount() +
getActiveBundledRuntimeDepsInstallCount() +
getInspectableTaskRegistrySummary().active,
);
// Unconditional startup migration: seed gateway.controlUi.allowedOrigins for existing

View File

@@ -0,0 +1,102 @@
export type BundledRuntimeDepsInstallActivity = {
id: number;
installRoot: string;
missingSpecs: string[];
installSpecs: string[];
pluginId?: string;
startedAtMs: number;
};
type IdleWaiter = () => void;
let nextActivityId = 1;
const activeInstalls = new Map<number, BundledRuntimeDepsInstallActivity>();
const idleWaiters = new Set<IdleWaiter>();
function notifyIdleWaiters(): void {
if (activeInstalls.size > 0) {
return;
}
const waiters = [...idleWaiters];
idleWaiters.clear();
for (const waiter of waiters) {
waiter();
}
}
export function beginBundledRuntimeDepsInstall(params: {
installRoot: string;
missingSpecs: readonly string[];
installSpecs?: readonly string[];
pluginId?: string;
}): () => void {
const id = nextActivityId++;
activeInstalls.set(id, {
id,
installRoot: params.installRoot,
missingSpecs: [...params.missingSpecs],
installSpecs: [...(params.installSpecs ?? params.missingSpecs)],
...(params.pluginId ? { pluginId: params.pluginId } : {}),
startedAtMs: Date.now(),
});
let ended = false;
return () => {
if (ended) {
return;
}
ended = true;
activeInstalls.delete(id);
notifyIdleWaiters();
};
}
export function getActiveBundledRuntimeDepsInstallCount(): number {
return activeInstalls.size;
}
export function listActiveBundledRuntimeDepsInstalls(): BundledRuntimeDepsInstallActivity[] {
return [...activeInstalls.values()].toSorted((left, right) => left.id - right.id);
}
export async function waitForBundledRuntimeDepsInstallIdle(
timeoutMs?: number,
): Promise<{ drained: boolean; active: number }> {
if (activeInstalls.size === 0) {
return { drained: true, active: 0 };
}
return await new Promise((resolve) => {
let settled = false;
let timer: ReturnType<typeof setTimeout> | null = null;
const cleanup = () => {
if (timer) {
clearTimeout(timer);
timer = null;
}
idleWaiters.delete(onIdle);
};
const settle = (drained: boolean) => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve({ drained, active: activeInstalls.size });
};
const onIdle = () => settle(true);
idleWaiters.add(onIdle);
if (typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs >= 0) {
timer = setTimeout(() => settle(false), Math.floor(timeoutMs));
timer.unref?.();
}
});
}
export const __testing = {
resetBundledRuntimeDepsInstallActivity(): void {
activeInstalls.clear();
notifyIdleWaiters();
idleWaiters.clear();
nextActivityId = 1;
},
};

View File

@@ -4,6 +4,11 @@ import fs from "node:fs";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
__testing as bundledRuntimeDepsActivityTesting,
getActiveBundledRuntimeDepsInstallCount,
waitForBundledRuntimeDepsInstallIdle,
} from "./bundled-runtime-deps-activity.js";
import {
__testing as bundledRuntimeDepsTesting,
createBundledRuntimeDependencyAliasMap,
@@ -12,6 +17,7 @@ import {
ensureBundledPluginRuntimeDeps,
installBundledRuntimeDeps,
isWritableDirectory,
repairBundledRuntimeDepsInstallRootAsync,
resolveBundledRuntimeDependencyInstallRoot,
resolveBundledRuntimeDepsNpmRunner,
scanBundledPluginRuntimeDeps,
@@ -85,6 +91,7 @@ function statfsFixture(params: {
afterEach(() => {
vi.restoreAllMocks();
spawnSyncMock.mockReset();
bundledRuntimeDepsActivityTesting.resetBundledRuntimeDepsInstallActivity();
for (const dir of tempDirs.splice(0)) {
fs.rmSync(dir, { recursive: true, force: true });
}
@@ -773,6 +780,22 @@ describe("scanBundledPluginRuntimeDeps config policy", () => {
]);
});
it("trusts preselected startup plugin ids without reapplying config policy", () => {
const result = scanBundledPluginRuntimeDeps({
packageRoot: setupPolicyPackageRoot(),
selectedPluginIds: ["telegram"],
config: {
plugins: { allow: ["browser"] },
channels: { telegram: { botToken: "123:abc" } },
},
});
expect(result.deps.map((dep) => `${dep.name}@${dep.version}`)).toEqual([
"telegram-runtime@2.0.0",
]);
expect(result.conflicts).toEqual([]);
});
it("reads each bundled plugin manifest once per runtime-deps scan", () => {
const packageRoot = makeTempDir();
const pluginRoot = writeBundledPluginPackage({
@@ -1217,6 +1240,67 @@ describe("ensureBundledPluginRuntimeDeps", () => {
]);
});
it("tracks active runtime-deps installs until the installer returns", async () => {
const packageRoot = makeTempDir();
const pluginRoot = path.join(packageRoot, "dist", "extensions", "browser");
fs.mkdirSync(pluginRoot, { recursive: true });
fs.writeFileSync(
path.join(pluginRoot, "package.json"),
JSON.stringify({ dependencies: { "browser-runtime": "1.0.0" } }),
);
let idleWait: Promise<{ drained: boolean; active: number }> | null = null;
expect(getActiveBundledRuntimeDepsInstallCount()).toBe(0);
const result = ensureBundledPluginRuntimeDeps({
env: {},
installDeps: (params) => {
expect(getActiveBundledRuntimeDepsInstallCount()).toBe(1);
idleWait = waitForBundledRuntimeDepsInstallIdle();
writeInstalledPackage(params.installRoot, "browser-runtime", "1.0.0");
},
pluginId: "browser",
pluginRoot,
});
expect(result).toEqual({
installedSpecs: ["browser-runtime@1.0.0"],
retainSpecs: ["browser-runtime@1.0.0"],
});
expect(getActiveBundledRuntimeDepsInstallCount()).toBe(0);
await expect(idleWait).resolves.toEqual({ drained: true, active: 0 });
});
it("keeps async repair locks and activity active until npm staging settles", async () => {
const installRoot = makeTempDir();
const lockDir = path.join(installRoot, ".openclaw-runtime-deps.lock");
let releaseInstall!: () => void;
const repair = repairBundledRuntimeDepsInstallRootAsync({
installRoot,
missingSpecs: ["browser-runtime@1.0.0"],
installSpecs: ["browser-runtime@1.0.0"],
env: {},
installDeps: async (params) => {
expect(fs.existsSync(lockDir)).toBe(true);
expect(getActiveBundledRuntimeDepsInstallCount()).toBe(1);
await new Promise<void>((resolve) => {
releaseInstall = () => {
writeInstalledPackage(params.installRoot, "browser-runtime", "1.0.0");
resolve();
};
});
},
});
await Promise.resolve();
expect(fs.existsSync(lockDir)).toBe(true);
expect(getActiveBundledRuntimeDepsInstallCount()).toBe(1);
releaseInstall();
await expect(repair).resolves.toEqual({ installSpecs: ["browser-runtime@1.0.0"] });
expect(fs.existsSync(lockDir)).toBe(false);
expect(getActiveBundledRuntimeDepsInstallCount()).toBe(0);
});
it("does not expire active runtime-deps install locks by age alone", () => {
expect(
bundledRuntimeDepsTesting.shouldRemoveRuntimeDepsLock(

View File

@@ -1,4 +1,4 @@
import { spawnSync } from "node:child_process";
import { spawn, spawnSync } from "node:child_process";
import { createHash } from "node:crypto";
import fs from "node:fs";
import { Module } from "node:module";
@@ -10,6 +10,7 @@ import { createLowDiskSpaceWarning } from "../infra/disk-space.js";
import { resolveHomeRelativePath } from "../infra/home-dir.js";
import { createNpmProjectInstallEnv } from "../infra/npm-install-env.js";
import { normalizeOptionalLowercaseString } from "../shared/string-coerce.js";
import { beginBundledRuntimeDepsInstall } from "./bundled-runtime-deps-activity.js";
import { normalizePluginsConfig } from "./config-state.js";
import { satisfies, validRange, validSemver } from "./semver.runtime.js";
@@ -187,6 +188,10 @@ function sleepSync(ms: number): void {
Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms);
}
async function sleep(ms: number): Promise<void> {
await new Promise((resolve) => setTimeout(resolve, ms));
}
function isProcessAlive(pid: number): boolean {
if (!Number.isInteger(pid) || pid <= 0) {
return false;
@@ -985,11 +990,15 @@ function isBundledPluginConfiguredForRuntimeDeps(params: {
function shouldIncludeBundledPluginRuntimeDeps(params: {
config?: OpenClawConfig;
pluginIds?: ReadonlySet<string>;
selectedPluginIds?: ReadonlySet<string>;
pluginId: string;
pluginDir: string;
includeConfiguredChannels?: boolean;
manifestCache?: BundledPluginRuntimeDepsManifestCache;
}): boolean {
if (params.selectedPluginIds) {
return params.selectedPluginIds.has(params.pluginId);
}
const scopedToPluginIds = Boolean(params.pluginIds);
if (params.pluginIds) {
if (!params.pluginIds.has(params.pluginId)) {
@@ -1023,6 +1032,7 @@ function collectBundledPluginRuntimeDeps(params: {
extensionsDir: string;
config?: OpenClawConfig;
pluginIds?: ReadonlySet<string>;
selectedPluginIds?: ReadonlySet<string>;
includeConfiguredChannels?: boolean;
}): {
deps: RuntimeDepEntry[];
@@ -1041,6 +1051,7 @@ function collectBundledPluginRuntimeDeps(params: {
!shouldIncludeBundledPluginRuntimeDeps({
config: params.config,
pluginIds: params.pluginIds,
selectedPluginIds: params.selectedPluginIds,
pluginId,
pluginDir,
includeConfiguredChannels: params.includeConfiguredChannels,
@@ -1117,6 +1128,7 @@ export function scanBundledPluginRuntimeDeps(params: {
packageRoot: string;
config?: OpenClawConfig;
pluginIds?: readonly string[];
selectedPluginIds?: readonly string[];
includeConfiguredChannels?: boolean;
env?: NodeJS.ProcessEnv;
}): {
@@ -1135,6 +1147,7 @@ export function scanBundledPluginRuntimeDeps(params: {
extensionsDir,
config: params.config,
pluginIds: normalizePluginIdSet(params.pluginIds),
selectedPluginIds: normalizePluginIdSet(params.selectedPluginIds),
includeConfiguredChannels: params.includeConfiguredChannels,
});
const packageInstallRoot = resolveBundledRuntimeDependencyPackageInstallRoot(params.packageRoot, {
@@ -1250,6 +1263,64 @@ function ensureNpmInstallExecutionManifest(installExecutionRoot: string): void {
"utf8",
);
}
function formatBundledRuntimeDepsInstallError(result: {
error?: Error;
signal?: NodeJS.Signals | null;
status?: number | null;
stderr?: string | Buffer | null;
stdout?: string | Buffer | null;
}): string {
const output = [
result.error?.message,
result.signal ? `terminated by ${result.signal}` : null,
result.stderr,
result.stdout,
]
.filter(Boolean)
.join("\n")
.trim();
return output || "npm install failed";
}
async function spawnBundledRuntimeDepsInstall(params: {
command: string;
args: string[];
cwd: string;
env: NodeJS.ProcessEnv;
}): Promise<void> {
await new Promise<void>((resolve, reject) => {
const child = spawn(params.command, params.args, {
cwd: params.cwd,
env: params.env,
stdio: ["ignore", "pipe", "pipe"],
});
const stdout: Buffer[] = [];
const stderr: Buffer[] = [];
child.stdout?.on("data", (chunk: Buffer) => stdout.push(chunk));
child.stderr?.on("data", (chunk: Buffer) => stderr.push(chunk));
child.on("error", (error) => {
reject(new Error(formatBundledRuntimeDepsInstallError({ error })));
});
child.on("close", (status, signal) => {
if (status === 0 && !signal) {
resolve();
return;
}
reject(
new Error(
formatBundledRuntimeDepsInstallError({
status,
signal,
stdout: Buffer.concat(stdout).toString("utf8"),
stderr: Buffer.concat(stderr).toString("utf8"),
}),
),
);
});
});
}
export function installBundledRuntimeDeps(params: {
installRoot: string;
installExecutionRoot?: string;
@@ -1296,11 +1367,7 @@ export function installBundledRuntimeDeps(params: {
stdio: "pipe",
});
if (result.status !== 0 || result.error) {
const output = [result.error?.message, result.stderr, result.stdout]
.filter(Boolean)
.join("\n")
.trim();
throw new Error(output || "npm install failed");
throw new Error(formatBundledRuntimeDepsInstallError(result));
}
assertBundledRuntimeDepsInstalled(installExecutionRoot, params.missingSpecs);
if (isolatedExecutionRoot) {
@@ -1323,6 +1390,68 @@ export function installBundledRuntimeDeps(params: {
}
}
export async function installBundledRuntimeDepsAsync(params: {
installRoot: string;
installExecutionRoot?: string;
linkNodeModulesFromExecutionRoot?: boolean;
missingSpecs: string[];
env: NodeJS.ProcessEnv;
warn?: (message: string) => void;
}): Promise<void> {
const installExecutionRoot = params.installExecutionRoot ?? params.installRoot;
const isolatedExecutionRoot =
path.resolve(installExecutionRoot) !== path.resolve(params.installRoot);
const cleanInstallExecutionRoot =
isolatedExecutionRoot &&
shouldCleanBundledRuntimeDepsInstallExecutionRoot({
installRoot: params.installRoot,
installExecutionRoot,
});
try {
fs.mkdirSync(params.installRoot, { recursive: true });
fs.mkdirSync(installExecutionRoot, { recursive: true });
const diskWarning = createLowDiskSpaceWarning({
targetPath: installExecutionRoot,
purpose: "bundled plugin runtime dependency staging",
});
if (diskWarning) {
params.warn?.(diskWarning);
}
ensureNpmInstallExecutionManifest(installExecutionRoot);
const installEnv = createBundledRuntimeDepsInstallEnv(params.env, {
cacheDir: path.join(installExecutionRoot, ".openclaw-npm-cache"),
});
const npmRunner = resolveBundledRuntimeDepsNpmRunner({
env: installEnv,
npmArgs: createBundledRuntimeDepsInstallArgs(params.missingSpecs),
});
await spawnBundledRuntimeDepsInstall({
command: npmRunner.command,
args: npmRunner.args,
cwd: installExecutionRoot,
env: npmRunner.env ?? installEnv,
});
assertBundledRuntimeDepsInstalled(installExecutionRoot, params.missingSpecs);
if (isolatedExecutionRoot) {
const stagedNodeModulesDir = path.join(installExecutionRoot, "node_modules");
if (!fs.existsSync(stagedNodeModulesDir)) {
throw new Error("npm install did not produce node_modules");
}
const targetNodeModulesDir = path.join(params.installRoot, "node_modules");
if (params.linkNodeModulesFromExecutionRoot) {
replaceNodeModulesDirFromCache(targetNodeModulesDir, stagedNodeModulesDir);
} else {
replaceNodeModulesDir(targetNodeModulesDir, stagedNodeModulesDir);
}
assertBundledRuntimeDepsInstalled(params.installRoot, params.missingSpecs);
}
} finally {
if (cleanInstallExecutionRoot) {
fs.rmSync(installExecutionRoot, { recursive: true, force: true });
}
}
}
export function repairBundledRuntimeDepsInstallRoot(params: {
installRoot: string;
missingSpecs: string[];
@@ -1345,11 +1474,113 @@ export function repairBundledRuntimeDepsInstallRoot(params: {
env: params.env,
warn: params.warn,
}));
install({
const finishActivity = beginBundledRuntimeDepsInstall({
installRoot: params.installRoot,
missingSpecs: params.missingSpecs,
installSpecs,
});
try {
install({
installRoot: params.installRoot,
missingSpecs: params.missingSpecs,
installSpecs,
});
} finally {
finishActivity();
}
writeRetainedRuntimeDepsManifest(params.installRoot, installSpecs);
return { installSpecs };
});
}
async function withBundledRuntimeDepsInstallRootLockAsync<T>(
installRoot: string,
run: () => Promise<T>,
): Promise<T> {
fs.mkdirSync(installRoot, { recursive: true });
const lockDir = path.join(installRoot, BUNDLED_RUNTIME_DEPS_LOCK_DIR);
const startedAt = Date.now();
let locked = false;
while (!locked) {
try {
fs.mkdirSync(lockDir);
try {
fs.writeFileSync(
path.join(lockDir, BUNDLED_RUNTIME_DEPS_LOCK_OWNER_FILE),
`${JSON.stringify({ pid: process.pid, createdAtMs: Date.now() }, null, 2)}\n`,
"utf8",
);
} catch (ownerWriteError) {
fs.rmSync(lockDir, { recursive: true, force: true });
throw ownerWriteError;
}
locked = true;
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code !== "EEXIST") {
throw error;
}
removeRuntimeDepsLockIfStale(lockDir, Date.now());
const nowMs = Date.now();
if (nowMs - startedAt > BUNDLED_RUNTIME_DEPS_LOCK_TIMEOUT_MS) {
throw new Error(
formatRuntimeDepsLockTimeoutMessage({
lockDir,
owner: readRuntimeDepsLockOwner(lockDir),
waitedMs: nowMs - startedAt,
nowMs,
}),
{
cause: error,
},
);
}
await sleep(BUNDLED_RUNTIME_DEPS_LOCK_WAIT_MS);
}
}
try {
return await run();
} finally {
fs.rmSync(lockDir, { recursive: true, force: true });
}
}
export async function repairBundledRuntimeDepsInstallRootAsync(params: {
installRoot: string;
missingSpecs: string[];
installSpecs: string[];
env: NodeJS.ProcessEnv;
installDeps?: (params: BundledRuntimeDepsInstallParams) => Promise<void>;
warn?: (message: string) => void;
}): Promise<{ installSpecs: string[] }> {
return await withBundledRuntimeDepsInstallRootLockAsync(params.installRoot, async () => {
const retainedManifestSpecs = readRetainedRuntimeDepsManifest(params.installRoot);
const installSpecs = [...new Set([...retainedManifestSpecs, ...params.installSpecs])].toSorted(
(left, right) => left.localeCompare(right),
);
const install =
params.installDeps ??
((installParams) =>
installBundledRuntimeDepsAsync({
installRoot: installParams.installRoot,
missingSpecs: installParams.installSpecs ?? installParams.missingSpecs,
env: params.env,
warn: params.warn,
}));
const finishActivity = beginBundledRuntimeDepsInstall({
installRoot: params.installRoot,
missingSpecs: params.missingSpecs,
installSpecs,
});
try {
await install({
installRoot: params.installRoot,
missingSpecs: params.missingSpecs,
installSpecs,
});
} finally {
finishActivity();
}
writeRetainedRuntimeDepsManifest(params.installRoot, installSpecs);
return { installSpecs };
});
@@ -1458,13 +1689,23 @@ export function ensureBundledPluginRuntimeDeps(params: {
missingSpecs: installParams.installSpecs ?? installParams.missingSpecs,
env: params.env,
}));
install({
const finishActivity = beginBundledRuntimeDepsInstall({
installRoot,
installExecutionRoot,
...(sourceCheckoutCacheStage ? { linkNodeModulesFromExecutionRoot: true } : {}),
missingSpecs,
installSpecs,
pluginId: params.pluginId,
});
try {
install({
installRoot,
installExecutionRoot,
...(sourceCheckoutCacheStage ? { linkNodeModulesFromExecutionRoot: true } : {}),
missingSpecs,
installSpecs,
});
} finally {
finishActivity();
}
const cacheAlreadyPopulated = Boolean(
sourceCheckoutCacheStage && hasAllDependencySentinels(sourceCheckoutCacheStage, deps),
);