mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-08 04:32:53 +00:00
perf(scripts): parallelize remote core oxlint shards
This commit is contained in:
@@ -12,6 +12,7 @@ const DEFAULT_WINDOWS_EXTENSION_CHUNK_SIZE = 8;
|
||||
const DEFAULT_SHARD_HEARTBEAT_MS = 30_000;
|
||||
const DEFAULT_SHARD_TIMEOUT_MS = 15 * 60_000;
|
||||
const DEFAULT_SHARD_KILL_GRACE_MS = 5_000;
|
||||
const DEFAULT_SPLIT_CORE_SHARD_CONCURRENCY = 4;
|
||||
const FAST_LOCAL_CHECK_MIN_CPUS = 12;
|
||||
const FAST_LOCAL_CHECK_MIN_MEMORY_BYTES = 48 * 1024 ** 3;
|
||||
const EXTENSION_TS_CONFIG = "config/tsconfig/oxlint.extensions.json";
|
||||
@@ -128,11 +129,13 @@ export function shouldRunOxlintShardsSerial({
|
||||
return false;
|
||||
}
|
||||
const localCheckMode = env.OPENCLAW_LOCAL_CHECK_MODE?.trim().toLowerCase();
|
||||
if (localCheckMode === "full" || localCheckMode === "fast") {
|
||||
return false;
|
||||
}
|
||||
if (localCheckMode === "throttled" || localCheckMode === "low-memory") {
|
||||
return true;
|
||||
if (!isRemoteChangedGateEnv(env)) {
|
||||
if (localCheckMode === "full" || localCheckMode === "fast") {
|
||||
return false;
|
||||
}
|
||||
if (localCheckMode === "throttled" || localCheckMode === "low-memory") {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
const resources = resolveHostResources(hostResources);
|
||||
if (env.CI === "true" || env.GITHUB_ACTIONS === "true") {
|
||||
@@ -147,6 +150,13 @@ export function shouldRunOxlintShardsSerial({
|
||||
);
|
||||
}
|
||||
|
||||
function isRemoteChangedGateEnv(env) {
|
||||
return (
|
||||
env.OPENCLAW_CHECK_CHANGED_REMOTE_CHILD === "1" ||
|
||||
env.OPENCLAW_CHANGED_LANES_RAW_SYNC === "1"
|
||||
);
|
||||
}
|
||||
|
||||
function listExtensionEntries({ cwd, readDir }) {
|
||||
let entries;
|
||||
try {
|
||||
@@ -241,24 +251,25 @@ export async function main(extraArgs = process.argv.slice(2), runtimeEnv = proce
|
||||
if ((prepareResult.status ?? 1) !== 0) {
|
||||
process.exitCode = prepareResult.status ?? 1;
|
||||
} else {
|
||||
const runSerial =
|
||||
shardArgs.splitCore ||
|
||||
shouldRunOxlintShardsSerial({
|
||||
env,
|
||||
platform: process.platform,
|
||||
});
|
||||
const results = runSerial
|
||||
const shardConcurrency = resolveOxlintShardConcurrency({
|
||||
env,
|
||||
platform: process.platform,
|
||||
splitCore: shardArgs.splitCore,
|
||||
});
|
||||
const results = shardConcurrency <= 1
|
||||
? await runShardsSerial({
|
||||
entries: selectedShards,
|
||||
env,
|
||||
extraArgs: shardArgs.oxlintArgs,
|
||||
runner,
|
||||
})
|
||||
: await Promise.all(
|
||||
selectedShards.map((shard) =>
|
||||
runShard({ env, extraArgs: shardArgs.oxlintArgs, runner, shard }),
|
||||
),
|
||||
);
|
||||
: await runShardsParallel({
|
||||
concurrency: Math.min(shardConcurrency, selectedShards.length),
|
||||
entries: selectedShards,
|
||||
env,
|
||||
extraArgs: shardArgs.oxlintArgs,
|
||||
runner,
|
||||
});
|
||||
process.exitCode = results.find((status) => status !== 0) ?? 0;
|
||||
}
|
||||
} finally {
|
||||
@@ -322,6 +333,32 @@ export function filterOxlintShards(shards, only) {
|
||||
return shards.filter((shard) => only.has(shard.name) || only.has(shard.name.split(":")[0]));
|
||||
}
|
||||
|
||||
export function resolveOxlintShardConcurrency({
|
||||
env = process.env,
|
||||
platform = process.platform,
|
||||
hostResources,
|
||||
splitCore = false,
|
||||
} = {}) {
|
||||
if (shouldRunOxlintShardsSerial({ env, platform, hostResources })) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
const explicitConcurrency = resolvePositiveEnvInt(env, "OPENCLAW_OXLINT_SHARD_CONCURRENCY");
|
||||
if (explicitConcurrency !== null) {
|
||||
return explicitConcurrency;
|
||||
}
|
||||
|
||||
if (!splitCore) {
|
||||
return Number.MAX_SAFE_INTEGER;
|
||||
}
|
||||
|
||||
const resources = resolveHostResources(hostResources);
|
||||
return Math.max(
|
||||
1,
|
||||
Math.min(DEFAULT_SPLIT_CORE_SHARD_CONCURRENCY, Math.floor(resources.logicalCpuCount / 4)),
|
||||
);
|
||||
}
|
||||
|
||||
async function runShardsSerial({ entries, env, extraArgs, runner }) {
|
||||
const results = [];
|
||||
for (const shard of entries) {
|
||||
@@ -333,6 +370,30 @@ async function runShardsSerial({ entries, env, extraArgs, runner }) {
|
||||
return results;
|
||||
}
|
||||
|
||||
async function runShardsParallel({ concurrency, entries, env, extraArgs, runner }) {
|
||||
const results = [];
|
||||
results.length = entries.length;
|
||||
let nextIndex = 0;
|
||||
|
||||
const workers = Array.from({ length: concurrency }, async () => {
|
||||
for (;;) {
|
||||
if (isParentTerminationRequested()) {
|
||||
return;
|
||||
}
|
||||
const currentIndex = nextIndex;
|
||||
nextIndex += 1;
|
||||
const shard = entries[currentIndex];
|
||||
if (!shard) {
|
||||
return;
|
||||
}
|
||||
results[currentIndex] = await runShard({ env, extraArgs, runner, shard });
|
||||
}
|
||||
});
|
||||
|
||||
await Promise.all(workers);
|
||||
return results.filter((status) => status !== undefined);
|
||||
}
|
||||
|
||||
export async function runShard({ env, extraArgs, runner, shard }) {
|
||||
console.error(`[oxlint:${shard.name}] starting`);
|
||||
const startedAt = Date.now();
|
||||
@@ -452,6 +513,16 @@ function resolveNonNegativeEnvInt(env, key, defaultValue) {
|
||||
return Number.isFinite(parsedValue) && parsedValue >= 0 ? parsedValue : defaultValue;
|
||||
}
|
||||
|
||||
function resolvePositiveEnvInt(env, key) {
|
||||
const rawValue = env[key];
|
||||
if (rawValue === undefined) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const parsedValue = Number.parseInt(rawValue, 10);
|
||||
return Number.isFinite(parsedValue) && parsedValue > 0 ? parsedValue : null;
|
||||
}
|
||||
|
||||
function signalChildProcess({ child, signal, useProcessGroup }) {
|
||||
if (!child.pid) {
|
||||
return;
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
resolveShardKillGraceMs,
|
||||
resolveShardHeartbeatMs,
|
||||
resolveShardTimeoutMs,
|
||||
resolveOxlintShardConcurrency,
|
||||
resolveWindowsExtensionChunkSize,
|
||||
runShard,
|
||||
shouldRunOxlintShardsSerial,
|
||||
@@ -67,7 +68,7 @@ describe("run-oxlint", () => {
|
||||
expect(childSkipIndex).toBeGreaterThan(lockIndex);
|
||||
});
|
||||
|
||||
it("lets dev update preflight run oxlint shards serially", () => {
|
||||
it("keeps a serial oxlint shard path available", () => {
|
||||
const shardedLintRunner = readFileSync("scripts/run-oxlint-shards.mjs", "utf8");
|
||||
|
||||
expect(shardedLintRunner).toContain("OPENCLAW_OXLINT_SHARDS_SERIAL");
|
||||
@@ -143,6 +144,61 @@ describe("run-oxlint", () => {
|
||||
).toBe(false);
|
||||
});
|
||||
|
||||
it("bounds split-core shard parallelism on roomy CI hosts", () => {
|
||||
const roomyHost = { totalMemoryBytes: 64 * 1024 ** 3, logicalCpuCount: 16 };
|
||||
|
||||
expect(
|
||||
resolveOxlintShardConcurrency({
|
||||
env: { CI: "true" },
|
||||
platform: "linux",
|
||||
hostResources: roomyHost,
|
||||
splitCore: true,
|
||||
}),
|
||||
).toBe(4);
|
||||
});
|
||||
|
||||
it("keeps split-core shard runs serial on constrained hosts", () => {
|
||||
const constrainedHost = { totalMemoryBytes: 8 * 1024 ** 3, logicalCpuCount: 4 };
|
||||
|
||||
expect(
|
||||
resolveOxlintShardConcurrency({
|
||||
env: { CI: "true" },
|
||||
platform: "linux",
|
||||
hostResources: constrainedHost,
|
||||
splitCore: true,
|
||||
}),
|
||||
).toBe(1);
|
||||
});
|
||||
|
||||
it("does not let local throttled mode serialize remote changed gates", () => {
|
||||
const roomyHost = { totalMemoryBytes: 64 * 1024 ** 3, logicalCpuCount: 16 };
|
||||
|
||||
expect(
|
||||
resolveOxlintShardConcurrency({
|
||||
env: {
|
||||
OPENCLAW_CHECK_CHANGED_REMOTE_CHILD: "1",
|
||||
OPENCLAW_LOCAL_CHECK_MODE: "throttled",
|
||||
},
|
||||
platform: "linux",
|
||||
hostResources: roomyHost,
|
||||
splitCore: true,
|
||||
}),
|
||||
).toBe(4);
|
||||
});
|
||||
|
||||
it("honors explicit oxlint shard concurrency overrides", () => {
|
||||
const roomyHost = { totalMemoryBytes: 64 * 1024 ** 3, logicalCpuCount: 16 };
|
||||
|
||||
expect(
|
||||
resolveOxlintShardConcurrency({
|
||||
env: { CI: "true", OPENCLAW_OXLINT_SHARD_CONCURRENCY: "2" },
|
||||
platform: "linux",
|
||||
hostResources: roomyHost,
|
||||
splitCore: true,
|
||||
}),
|
||||
).toBe(2);
|
||||
});
|
||||
|
||||
it("uses a bounded oxlint shard heartbeat by default", () => {
|
||||
expect(resolveShardHeartbeatMs({})).toBe(30_000);
|
||||
expect(resolveShardHeartbeatMs({ OPENCLAW_OXLINT_SHARD_HEARTBEAT_MS: "0" })).toBe(0);
|
||||
|
||||
Reference in New Issue
Block a user