fix(e2e): bound bundled runtime gateway cleanup

This commit is contained in:
Vincent Koc
2026-06-01 12:11:44 +02:00
parent f30235bed2
commit 4685a84e9b
2 changed files with 388 additions and 13 deletions

View File

@@ -15,6 +15,10 @@ const LOG_SCAN_BYTES = readPositiveInt(
process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_LOG_SCAN_BYTES,
256 * 1024,
);
const GATEWAY_LOG_CAPTURE_BYTES = readPositiveInt(
process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_GATEWAY_LOG_BYTES,
16 * 1024 * 1024,
);
const WATCHDOG_MS = readPositiveInt(process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_WATCHDOG_MS, 1000);
const READY_TIMEOUT_MS = readPositiveInt(
process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_READY_MS,
@@ -33,14 +37,26 @@ const HTTP_PROBE_TIMEOUT_MS = readPositiveInt(
process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_HTTP_MS,
5000,
);
const GATEWAY_TEARDOWN_GRACE_MS = readPositiveInt(
process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_GRACE_MS,
10000,
);
const GATEWAY_TEARDOWN_KILL_GRACE_MS = readPositiveInt(
process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_KILL_GRACE_MS,
1000,
);
const GATEWAY_READY_LOG_NEEDLE = Buffer.from("[gateway] ready");
const READY_OFFSET_LOG_NEEDLES = [
GATEWAY_READY_LOG_NEEDLE,
Buffer.from("listening on ws://"),
Buffer.from("[gateway] http server listening"),
];
const GATEWAY_LOG_TRUNCATED_NEEDLE = "[gateway log truncated after ";
const FORBIDDEN_POST_READY_DEPS_WORK = [/\b(?:npm|pnpm|yarn|corepack) install\b/iu];
const isolatedStateRoots = new WeakMap();
const activeGatewayChildren = new Set();
const parentSignalHandlers = new Map();
let gatewayExitCleanupInstalled = false;
function readPositiveInt(raw, fallback) {
const text = String(raw ?? "").trim();
@@ -319,6 +335,44 @@ function formatCapturedOutput(label, buffer) {
return `${prefix}${buffer.text}`;
}
function createBoundedGatewayLog(logPath) {
fs.mkdirSync(path.dirname(logPath), { recursive: true });
const fd = fs.openSync(logPath, "w");
let bytes = 0;
let closed = false;
let truncated = false;
const marker = Buffer.from(
`\n[gateway log truncated after ${String(GATEWAY_LOG_CAPTURE_BYTES)} bytes]\n`,
);
return {
append(chunk) {
if (closed || truncated) {
return;
}
const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk));
const remaining = GATEWAY_LOG_CAPTURE_BYTES - bytes;
if (buffer.length <= remaining) {
fs.writeSync(fd, buffer);
bytes += buffer.length;
return;
}
if (remaining > 0) {
fs.writeSync(fd, buffer.subarray(0, remaining));
}
fs.writeSync(fd, marker);
bytes = GATEWAY_LOG_CAPTURE_BYTES;
truncated = true;
},
close() {
if (closed) {
return;
}
closed = true;
fs.closeSync(fd);
},
};
}
export function runCommand(command, args, options = {}) {
return new Promise((resolve, reject) => {
const { timeoutMs = COMMAND_TIMEOUT_MS, ...spawnOptions } = options;
@@ -384,8 +438,8 @@ export function runCommand(command, args, options = {}) {
});
}
function startGateway(params) {
const log = fs.openSync(params.logPath, "w");
export function startGateway(params) {
const log = createBoundedGatewayLog(params.logPath);
const child = childProcess.spawn(
"node",
[
@@ -405,11 +459,15 @@ function startGateway(params) {
OPENCLAW_SKIP_CHANNELS: params.skipChannels ? "1" : "0",
OPENCLAW_SKIP_PROVIDERS: "0",
},
stdio: ["ignore", log, log],
detached: false,
stdio: ["ignore", "pipe", "pipe"],
detached: process.platform !== "win32",
},
);
fs.closeSync(log);
child.stdout?.on("data", (chunk) => log.append(chunk));
child.stderr?.on("data", (chunk) => log.append(chunk));
child.once("error", () => log.close());
child.once("close", () => log.close());
trackGatewayChild(child);
return child;
}
@@ -417,17 +475,109 @@ export function hasChildExited(child) {
return child.exitCode !== null || (child.signalCode ?? null) !== null;
}
function trackGatewayChild(child) {
activeGatewayChildren.add(child);
const untrack = () => {
if (!processTreeIsAlive(child)) {
activeGatewayChildren.delete(child);
}
};
child.once("error", untrack);
child.once("close", untrack);
installGatewayParentCleanup();
}
function installGatewayParentCleanup() {
if (!gatewayExitCleanupInstalled) {
gatewayExitCleanupInstalled = true;
process.once("exit", () => {
cleanupActiveGatewayChildren("SIGTERM");
});
}
for (const signal of ["SIGHUP", "SIGINT", "SIGTERM"]) {
if (parentSignalHandlers.has(signal)) {
continue;
}
const handler = () => {
cleanupActiveGatewayChildren(signal);
for (const [registeredSignal, registeredHandler] of parentSignalHandlers) {
process.off(registeredSignal, registeredHandler);
}
parentSignalHandlers.clear();
process.kill(process.pid, signal);
};
parentSignalHandlers.set(signal, handler);
process.once(signal, handler);
}
}
function cleanupActiveGatewayChildren(signal) {
for (const child of activeGatewayChildren) {
signalGateway(child, signal);
if (process.platform !== "win32") {
signalGateway(child, "SIGKILL");
}
}
}
export async function stopGateway(child) {
if (!child || hasChildExited(child)) {
if (!child || !processTreeIsAlive(child)) {
return;
}
child.kill("SIGTERM");
const started = Date.now();
while (!hasChildExited(child) && Date.now() - started < 10000) {
await delay(100);
const waitForExit = async (ms) => {
const deadline = Date.now() + ms;
while (Date.now() < deadline) {
if (!processTreeIsAlive(child)) {
return true;
}
await delay(100);
}
return !processTreeIsAlive(child);
};
signalGateway(child, "SIGTERM");
if (await waitForExit(GATEWAY_TEARDOWN_GRACE_MS)) {
return;
}
if (!hasChildExited(child)) {
child.kill("SIGKILL");
signalGateway(child, "SIGKILL");
await waitForExit(GATEWAY_TEARDOWN_KILL_GRACE_MS);
}
function processTreeIsAlive(child) {
if (!child || typeof child.pid !== "number") {
return !hasChildExited(child);
}
if (process.platform === "win32") {
return !hasChildExited(child);
}
try {
process.kill(-child.pid, 0);
return true;
} catch (error) {
if (error?.code === "EPERM") {
return true;
}
return false;
}
}
function signalGateway(child, signal) {
if (process.platform !== "win32" && typeof child.pid === "number") {
try {
process.kill(-child.pid, signal);
return;
} catch (error) {
if (error?.code === "ESRCH") {
return;
}
}
}
try {
child.kill(signal);
} catch (error) {
if (error?.code !== "ESRCH") {
throw error;
}
}
}
@@ -842,6 +992,7 @@ async function runWatchdog(options) {
);
}
await retryRpcCall("health", {}, options);
assertGatewayLogNotTruncated(options.logPath);
assertNoPostReadyRuntimeDepsWork(options.logPath, readyOffset);
await assertNoPackageManagerChildren(options.child.pid);
}
@@ -850,6 +1001,16 @@ export function findReadyLogOffset(logPath) {
return findFirstNeedleOffset(logPath, READY_OFFSET_LOG_NEEDLES);
}
export function assertGatewayLogNotTruncated(logPath) {
if (readFileTail(logPath).includes(GATEWAY_LOG_TRUNCATED_NEEDLE)) {
throw new Error(
`gateway log exceeded ${String(
GATEWAY_LOG_CAPTURE_BYTES,
)} bytes; runtime smoke cannot validate complete post-ready output`,
);
}
}
export function assertNoPostReadyRuntimeDepsWork(logPath, readyOffset) {
let stat;
try {