test: add weighted Docker aggregate scheduler

This commit is contained in:
Peter Steinberger
2026-04-24 20:41:20 +01:00
parent 88c91675e2
commit 0f689d22f4
4 changed files with 279 additions and 64 deletions

View File

@@ -11,81 +11,166 @@ const DEFAULT_TAIL_PARALLELISM = 10;
const DEFAULT_FAILURE_TAIL_LINES = 80;
const DEFAULT_LANE_TIMEOUT_MS = 120 * 60 * 1000;
const DEFAULT_LANE_START_STAGGER_MS = 2_000;
const DEFAULT_RESOURCE_LIMITS = {
docker: DEFAULT_PARALLELISM,
live: 4,
npm: 4,
service: 5,
};
const bundledChannelLaneCommand =
"OPENCLAW_SKIP_DOCKER_BUILD=1 OPENCLAW_BUNDLED_CHANNEL_UPDATE_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_ROOT_OWNED_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_SETUP_ENTRY_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_LOAD_FAILURE_SCENARIO=0 pnpm test:docker:bundled-channel-deps";
function lane(name, command, options = {}) {
return {
command,
name,
resources: options.resources ?? [],
weight: options.weight ?? 1,
};
}
function liveLane(name, command, options = {}) {
return lane(name, command, {
resources: ["live", ...(options.resources ?? [])],
weight: options.weight ?? 3,
});
}
function npmLane(name, command, options = {}) {
return lane(name, command, {
resources: ["npm", ...(options.resources ?? [])],
weight: options.weight ?? 2,
});
}
function serviceLane(name, command, options = {}) {
return lane(name, command, {
resources: ["service", ...(options.resources ?? [])],
weight: options.weight ?? 2,
});
}
const bundledScenarioLanes = [
["bundled-channel-telegram", `OPENCLAW_BUNDLED_CHANNELS=telegram ${bundledChannelLaneCommand}`],
["bundled-channel-discord", `OPENCLAW_BUNDLED_CHANNELS=discord ${bundledChannelLaneCommand}`],
["bundled-channel-slack", `OPENCLAW_BUNDLED_CHANNELS=slack ${bundledChannelLaneCommand}`],
["bundled-channel-feishu", `OPENCLAW_BUNDLED_CHANNELS=feishu ${bundledChannelLaneCommand}`],
[
npmLane(
"bundled-channel-telegram",
`OPENCLAW_BUNDLED_CHANNELS=telegram ${bundledChannelLaneCommand}`,
),
npmLane(
"bundled-channel-discord",
`OPENCLAW_BUNDLED_CHANNELS=discord ${bundledChannelLaneCommand}`,
),
npmLane("bundled-channel-slack", `OPENCLAW_BUNDLED_CHANNELS=slack ${bundledChannelLaneCommand}`),
npmLane(
"bundled-channel-feishu",
`OPENCLAW_BUNDLED_CHANNELS=feishu ${bundledChannelLaneCommand}`,
),
npmLane(
"bundled-channel-memory-lancedb",
`OPENCLAW_BUNDLED_CHANNELS=memory-lancedb ${bundledChannelLaneCommand}`,
],
[
),
npmLane(
"bundled-channel-update",
"OPENCLAW_SKIP_DOCKER_BUILD=1 OPENCLAW_BUNDLED_CHANNEL_SCENARIOS=0 OPENCLAW_BUNDLED_CHANNEL_UPDATE_SCENARIO=1 OPENCLAW_BUNDLED_CHANNEL_ROOT_OWNED_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_SETUP_ENTRY_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_LOAD_FAILURE_SCENARIO=0 pnpm test:docker:bundled-channel-deps",
],
[
),
npmLane(
"bundled-channel-root-owned",
"OPENCLAW_SKIP_DOCKER_BUILD=1 OPENCLAW_BUNDLED_CHANNEL_SCENARIOS=0 OPENCLAW_BUNDLED_CHANNEL_UPDATE_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_ROOT_OWNED_SCENARIO=1 OPENCLAW_BUNDLED_CHANNEL_SETUP_ENTRY_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_LOAD_FAILURE_SCENARIO=0 pnpm test:docker:bundled-channel-deps",
],
[
),
npmLane(
"bundled-channel-setup-entry",
"OPENCLAW_SKIP_DOCKER_BUILD=1 OPENCLAW_BUNDLED_CHANNEL_SCENARIOS=0 OPENCLAW_BUNDLED_CHANNEL_UPDATE_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_ROOT_OWNED_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_SETUP_ENTRY_SCENARIO=1 OPENCLAW_BUNDLED_CHANNEL_LOAD_FAILURE_SCENARIO=0 pnpm test:docker:bundled-channel-deps",
],
[
),
npmLane(
"bundled-channel-load-failure",
"OPENCLAW_SKIP_DOCKER_BUILD=1 OPENCLAW_BUNDLED_CHANNEL_SCENARIOS=0 OPENCLAW_BUNDLED_CHANNEL_UPDATE_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_ROOT_OWNED_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_SETUP_ENTRY_SCENARIO=0 OPENCLAW_BUNDLED_CHANNEL_LOAD_FAILURE_SCENARIO=1 pnpm test:docker:bundled-channel-deps",
],
),
];
const lanes = [
["live-models", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-models"],
["live-gateway", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-gateway"],
[
liveLane("live-models", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-models", {
weight: 4,
}),
liveLane("live-gateway", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-gateway", {
weight: 4,
}),
liveLane(
"live-cli-backend-claude",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-cli-backend:claude",
],
[
{ resources: ["npm"], weight: 3 },
),
liveLane(
"live-cli-backend-gemini",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-cli-backend:gemini",
],
["openwebui", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:openwebui"],
["onboard", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:onboard"],
[
{ resources: ["npm"], weight: 3 },
),
serviceLane("openwebui", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:openwebui", {
weight: 3,
}),
serviceLane("onboard", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:onboard", {
weight: 2,
}),
npmLane(
"npm-onboard-channel-agent",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:npm-onboard-channel-agent",
],
["gateway-network", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:gateway-network"],
["mcp-channels", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:mcp-channels"],
["pi-bundle-mcp-tools", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:pi-bundle-mcp-tools"],
["cron-mcp-cleanup", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:cron-mcp-cleanup"],
["doctor-switch", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:doctor-switch"],
["plugins", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:plugins"],
["plugin-update", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:plugin-update"],
["config-reload", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:config-reload"],
{ resources: ["service"], weight: 3 },
),
serviceLane("gateway-network", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:gateway-network"),
serviceLane("mcp-channels", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:mcp-channels", {
resources: ["npm"],
weight: 3,
}),
lane("pi-bundle-mcp-tools", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:pi-bundle-mcp-tools"),
serviceLane(
"cron-mcp-cleanup",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:cron-mcp-cleanup",
{ resources: ["npm"], weight: 3 },
),
npmLane("doctor-switch", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:doctor-switch", {
weight: 3,
}),
npmLane("plugins", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:plugins", { weight: 2 }),
npmLane("plugin-update", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:plugin-update"),
serviceLane("config-reload", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:config-reload"),
...bundledScenarioLanes,
["openai-image-auth", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:openai-image-auth"],
["qr", "pnpm test:docker:qr"],
lane("openai-image-auth", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:openai-image-auth"),
lane("qr", "pnpm test:docker:qr"),
];
const exclusiveLanes = [
[
serviceLane(
"openai-web-search-minimal",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:openai-web-search-minimal",
],
["live-codex-harness", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-codex-harness"],
["live-codex-bind", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-codex-bind"],
[
),
liveLane(
"live-codex-harness",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-codex-harness",
{ resources: ["npm"], weight: 3 },
),
liveLane("live-codex-bind", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-codex-bind", {
resources: ["npm"],
weight: 3,
}),
liveLane(
"live-cli-backend-codex",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-cli-backend:codex",
],
["live-acp-bind-claude", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-acp-bind:claude"],
["live-acp-bind-codex", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-acp-bind:codex"],
["live-acp-bind-gemini", "OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-acp-bind:gemini"],
{ resources: ["npm"], weight: 3 },
),
liveLane(
"live-acp-bind-claude",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-acp-bind:claude",
{ resources: ["npm"], weight: 3 },
),
liveLane(
"live-acp-bind-codex",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-acp-bind:codex",
{ resources: ["npm"], weight: 3 },
),
liveLane(
"live-acp-bind-gemini",
"OPENCLAW_SKIP_DOCKER_BUILD=1 pnpm test:docker:live-acp-bind:gemini",
{ resources: ["npm"], weight: 3 },
),
];
const tailLanes = exclusiveLanes;
@@ -119,6 +204,41 @@ function parseBool(raw, fallback) {
return !/^(?:0|false|no)$/i.test(raw);
}
function parseResourceLimit(env, resource, parallelism, fallback) {
const envName = `OPENCLAW_DOCKER_ALL_${resource.toUpperCase()}_LIMIT`;
return parsePositiveInt(env[envName], Math.min(parallelism, fallback), envName);
}
function parseSchedulerOptions(env, parallelism) {
const weightLimit = parsePositiveInt(
env.OPENCLAW_DOCKER_ALL_WEIGHT_LIMIT,
parallelism,
"OPENCLAW_DOCKER_ALL_WEIGHT_LIMIT",
);
return {
resourceLimits: {
docker: parseResourceLimit(env, "docker", parallelism, parallelism),
live: parseResourceLimit(env, "live", parallelism, DEFAULT_RESOURCE_LIMITS.live),
npm: parseResourceLimit(env, "npm", parallelism, DEFAULT_RESOURCE_LIMITS.npm),
service: parseResourceLimit(env, "service", parallelism, DEFAULT_RESOURCE_LIMITS.service),
},
weightLimit,
};
}
function laneWeight(poolLane) {
return Math.max(1, poolLane.weight ?? 1);
}
function laneResources(poolLane) {
return ["docker", ...(poolLane.resources ?? [])];
}
function laneSummary(poolLane) {
const resources = laneResources(poolLane).join(",");
return `${poolLane.name}(w=${laneWeight(poolLane)} r=${resources})`;
}
function sleep(ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
@@ -287,7 +407,7 @@ function laneEnv(name, baseEnv, logDir) {
}
async function runLane(lane, baseEnv, logDir, timeoutMs) {
const [name, command] = lane;
const { command, name } = lane;
const logFile = path.join(logDir, `${name}.log`);
const env = laneEnv(name, baseEnv, logDir);
await mkdir(env.OPENCLAW_DOCKER_CLI_TOOLS_DIR, { recursive: true });
@@ -323,7 +443,13 @@ async function runLane(lane, baseEnv, logDir, timeoutMs) {
async function runLanePool(poolLanes, baseEnv, logDir, parallelism, options) {
const failures = [];
let nextIndex = 0;
const pending = [...poolLanes];
const running = new Set();
const active = {
count: 0,
resources: new Map(),
weight: 0,
};
let lastLaneStartAt = 0;
let laneStartQueue = Promise.resolve();
@@ -345,25 +471,96 @@ async function runLanePool(poolLanes, baseEnv, logDir, parallelism, options) {
releaseQueue();
}
async function worker() {
while (nextIndex < poolLanes.length) {
if (options.failFast && failures.length > 0) {
return;
function canStartLane(candidate) {
const weight = laneWeight(candidate);
if (active.count >= parallelism || active.weight + weight > options.weightLimit) {
return false;
}
for (const resource of laneResources(candidate)) {
const limit = options.resourceLimits[resource] ?? options.weightLimit;
const current = active.resources.get(resource) ?? 0;
if (current + weight > limit) {
return false;
}
const lane = poolLanes[nextIndex++];
await waitForLaneStartSlot();
const result = await runLane(lane, baseEnv, logDir, options.timeoutMs);
if (result.status !== 0) {
failures.push(result);
if (options.failFast) {
return;
}
}
return true;
}
function reserve(candidate) {
const weight = laneWeight(candidate);
active.count += 1;
active.weight += weight;
for (const resource of laneResources(candidate)) {
active.resources.set(resource, (active.resources.get(resource) ?? 0) + weight);
}
}
function release(candidate) {
const weight = laneWeight(candidate);
active.count -= 1;
active.weight -= weight;
for (const resource of laneResources(candidate)) {
const next = (active.resources.get(resource) ?? 0) - weight;
if (next > 0) {
active.resources.set(resource, next);
} else {
active.resources.delete(resource);
}
}
}
const workerCount = Math.min(parallelism, poolLanes.length);
await Promise.all(Array.from({ length: workerCount }, () => worker()));
async function startLane(poolLane) {
await waitForLaneStartSlot();
reserve(poolLane);
let promise;
promise = runLane(poolLane, baseEnv, logDir, options.timeoutMs)
.then((result) => ({ lane: poolLane, promise, result }))
.finally(() => {
release(poolLane);
});
running.add(promise);
}
while (pending.length > 0 || running.size > 0) {
let started = false;
if (!options.failFast || failures.length === 0) {
for (let index = 0; index < pending.length; ) {
const candidate = pending[index];
if (!canStartLane(candidate)) {
index += 1;
continue;
}
pending.splice(index, 1);
await startLane(candidate);
started = true;
}
}
if (started) {
continue;
}
if (running.size === 0) {
const blocked = pending.map(laneSummary).join(", ");
throw new Error(`No Docker lanes fit scheduler limits: ${blocked}`);
}
const { promise, result } = await Promise.race(running);
running.delete(promise);
if (result.status !== 0) {
failures.push(result);
}
if (options.failFast && failures.length > 0) {
const remainingResults = await Promise.all(running);
running.clear();
for (const remaining of remainingResults) {
if (remaining.result.status !== 0) {
failures.push(remaining.result);
}
}
break;
}
}
return failures;
}
@@ -448,6 +645,14 @@ async function main() {
console.log(`==> Lane start stagger: ${laneStartStaggerMs}ms`);
console.log(`==> Fail fast: ${failFast ? "yes" : "no"}`);
console.log(`==> Live-test bundled plugin deps: ${baseEnv.OPENCLAW_DOCKER_BUILD_EXTENSIONS}`);
const schedulerOptions = parseSchedulerOptions(process.env, parallelism);
const tailSchedulerOptions = parseSchedulerOptions(process.env, tailParallelism);
console.log(
`==> Scheduler: weight=${schedulerOptions.weightLimit} docker=${schedulerOptions.resourceLimits.docker} live=${schedulerOptions.resourceLimits.live} npm=${schedulerOptions.resourceLimits.npm} service=${schedulerOptions.resourceLimits.service}`,
);
console.log(
`==> Tail scheduler: weight=${tailSchedulerOptions.weightLimit} docker=${tailSchedulerOptions.resourceLimits.docker} live=${tailSchedulerOptions.resourceLimits.live} npm=${tailSchedulerOptions.resourceLimits.npm} service=${tailSchedulerOptions.resourceLimits.service}`,
);
await runForegroundGroup(
[
@@ -461,7 +666,12 @@ async function main() {
);
await prepareBundledChannelPackage(baseEnv, logDir);
const options = { failFast, startStaggerMs: laneStartStaggerMs, timeoutMs: laneTimeoutMs };
const options = {
...schedulerOptions,
failFast,
startStaggerMs: laneStartStaggerMs,
timeoutMs: laneTimeoutMs,
};
const failures = await runLanePool(lanes, baseEnv, logDir, parallelism, options);
if (failFast && failures.length > 0) {
await printFailureSummary(failures, tailLines);
@@ -469,7 +679,12 @@ async function main() {
}
console.log("==> Running provider-sensitive Docker tail lanes");
failures.push(...(await runLanePool(tailLanes, baseEnv, logDir, tailParallelism, options)));
failures.push(
...(await runLanePool(tailLanes, baseEnv, logDir, tailParallelism, {
...options,
...tailSchedulerOptions,
})),
);
if (failures.length > 0) {
await printFailureSummary(failures, tailLines);
process.exit(1);