fix(kitchen-sink): clean command groups on parent signal

This commit is contained in:
Vincent Koc
2026-06-20 13:12:00 +02:00
parent 84895e9276
commit 36934fd9f5
2 changed files with 166 additions and 0 deletions

View File

@@ -31,6 +31,7 @@ const DEFAULT_MAX_COMMAND_RSS_MIB = 8192;
const DEFAULT_OUTPUT_CAPTURE_CHARS = 1024 * 1024;
const GATEWAY_TEARDOWN_GRACE_MS = 10000;
const GATEWAY_TEARDOWN_KILL_GRACE_MS = 2000;
const COMMAND_PARENT_SIGNAL_KILL_GRACE_MS = 2000;
const COMMAND_PROCESS_TREE_EXIT_POLL_MS = 50;
const LOG_SCAN_CHUNK_BYTES = 64 * 1024;
const LOG_SCAN_MAX_LINE_CHARS = 16 * 1024;
@@ -56,6 +57,40 @@ const ERROR_LOG_ALLOW_PATTERNS = [
];
let callGatewayModulePromise;
const activeCommandChildren = new Set();
const commandParentSignals =
process.platform === "win32" ? ["SIGINT", "SIGTERM"] : ["SIGINT", "SIGTERM", "SIGHUP"];
let commandShutdownPromise;
let commandSignalHandlersInstalled = false;
function installCommandSignalHandlers() {
if (commandSignalHandlersInstalled) {
return;
}
commandSignalHandlersInstalled = true;
for (const signal of commandParentSignals) {
process.on(signal, commandSignalHandlers.get(signal));
}
}
function removeCommandSignalHandlers() {
if (!commandSignalHandlersInstalled) {
return;
}
commandSignalHandlersInstalled = false;
for (const signal of commandParentSignals) {
process.off(signal, commandSignalHandlers.get(signal));
}
}
const commandSignalHandlers = new Map(
commandParentSignals.map((signal) => [
signal,
() => {
void shutdownActiveCommands(signal);
},
]),
);
function usage() {
return `Usage: node scripts/e2e/kitchen-sink-rpc-walk.mjs
@@ -301,6 +336,11 @@ function formatCapturedOutput(label, buffer) {
}
export function runCommand(command, args, options = {}) {
if (commandShutdownPromise) {
return commandShutdownPromise.then(() => {
throw new Error(`${command} ${args.join(" ")} skipped during parent signal shutdown`);
});
}
return new Promise((resolve, reject) => {
const config = resolveKitchenSinkRpcConfig();
const {
@@ -320,6 +360,8 @@ export function runCommand(command, args, options = {}) {
...spawnOptions,
detached: spawnOptions.detached ?? process.platform !== "win32",
});
activeCommandChildren.add(child);
installCommandSignalHandlers();
const startedAt = Date.now();
let stdout = { text: "", truncatedChars: 0 };
let stderr = { text: "", truncatedChars: 0 };
@@ -393,6 +435,7 @@ export function runCommand(command, args, options = {}) {
clearTimeout(timer);
clearTimeout(forceKillTimer);
forceKillAt = undefined;
releaseCommandChild(child);
void stopResourceSampling().finally(() =>
reject(toLintErrorObject(error, "Command failed before exit")),
);
@@ -402,6 +445,7 @@ export function runCommand(command, args, options = {}) {
const finish = () => {
clearTimeout(forceKillTimer);
forceKillAt = undefined;
releaseCommandChild(child);
void stopResourceSampling().then((resourceSampleFailure) => {
if (!timedOut && status === 0) {
if (resourceSampleFailure) {
@@ -470,6 +514,38 @@ async function finishTimedOutCommandProcessTree(child, { forceKillAt, timeoutKil
await waitForCommandProcessTreeExit(child, timeoutKillGraceMs);
}
function releaseCommandChild(child) {
activeCommandChildren.delete(child);
if (activeCommandChildren.size === 0 && !commandShutdownPromise) {
removeCommandSignalHandlers();
}
}
async function shutdownActiveCommands(signal) {
if (commandShutdownPromise) {
for (const child of activeCommandChildren) {
signalProcessGroup(child, "SIGKILL");
}
return commandShutdownPromise;
}
const children = [...activeCommandChildren];
for (const child of children) {
signalProcessGroup(child, signal);
}
commandShutdownPromise = Promise.all(
children.map((child) =>
finishTimedOutCommandProcessTree(child, {
forceKillAt: Date.now() + COMMAND_PARENT_SIGNAL_KILL_GRACE_MS,
timeoutKillGraceMs: COMMAND_PARENT_SIGNAL_KILL_GRACE_MS,
}),
),
).finally(() => {
removeCommandSignalHandlers();
process.kill(process.pid, signal);
});
return commandShutdownPromise;
}
async function waitForCommandProcessTreeExit(child, timeoutMs) {
const deadlineAt = Date.now() + timeoutMs;
while (Date.now() < deadlineAt) {

View File

@@ -1,4 +1,5 @@
// Kitchen Sink Rpc Walk tests cover kitchen sink rpc walk script behavior.
import { spawn } from "node:child_process";
import { createHash } from "node:crypto";
import { EventEmitter } from "node:events";
import fs, {
@@ -834,6 +835,81 @@ setInterval(() => {}, 1000);
cleanupTempDirs(tempDirs);
}
});
posixIt("cleans active command process groups before parent signal exit", async () => {
const tempDirs: string[] = [];
const root = makeTempDir(tempDirs, "openclaw-kitchen-rpc-parent-signal-");
const runnerPath = path.join(root, "runner.mjs");
const scriptPath = path.join(root, "term-zero-grandchild.mjs");
const grandchildPidPath = path.join(root, "grandchild.pid");
const readyPath = path.join(root, "ready");
let grandchildPid = 0;
let runner: ReturnType<typeof spawn> | undefined;
const grandchildScript = [
"const fs = require('node:fs');",
"process.on('SIGTERM', () => {});",
"process.on('SIGHUP', () => {});",
`fs.writeFileSync(${JSON.stringify(readyPath)}, 'ready');`,
"setInterval(() => {}, 1000);",
].join("\n");
writeFileSync(
scriptPath,
`
import { spawn } from "node:child_process";
import fs from "node:fs";
const grandchild = spawn(process.execPath, ["-e", ${JSON.stringify(grandchildScript)}], {
stdio: "ignore",
});
fs.writeFileSync(${JSON.stringify(grandchildPidPath)}, String(grandchild.pid));
process.on("SIGTERM", () => process.exit(0));
setInterval(() => {}, 1000);
`,
"utf8",
);
writeFileSync(
runnerPath,
`
import { runCommand } from ${JSON.stringify(
new URL("../../scripts/e2e/kitchen-sink-rpc-walk.mjs", import.meta.url).href,
)};
await runCommand(process.execPath, [${JSON.stringify(scriptPath)}], {
timeoutKillGraceMs: 100,
timeoutMs: 30_000,
});
`,
"utf8",
);
try {
runner = spawn(process.execPath, [runnerPath], {
cwd: process.cwd(),
stdio: ["ignore", "ignore", "pipe"],
});
await waitFor(() => existsSync(readyPath) && existsSync(grandchildPidPath));
grandchildPid = Number.parseInt(readText(grandchildPidPath), 10);
expect(Number.isInteger(grandchildPid)).toBe(true);
expect(isProcessAlive(grandchildPid)).toBe(true);
runner.kill("SIGTERM");
await expect(waitForChildClose(runner, 5_000)).resolves.toEqual({
code: null,
signal: "SIGTERM",
});
await waitFor(() => !isProcessAlive(grandchildPid), 5_000);
} finally {
if (grandchildPid && isProcessAlive(grandchildPid)) {
process.kill(grandchildPid, "SIGKILL");
}
if (runner?.pid && isProcessAlive(runner.pid)) {
runner.kill("SIGKILL");
}
cleanupTempDirs(tempDirs);
}
});
});
describe("kitchen-sink RPC payload unwrapping", () => {
@@ -2202,6 +2278,20 @@ async function waitFor(condition: () => boolean, timeoutMs = 3_000) {
}
}
async function waitForChildClose(child: ReturnType<typeof spawn>, timeoutMs = 3_000) {
return await new Promise<{ code: number | null; signal: NodeJS.Signals | null }>(
(resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("child did not close before timeout"));
}, timeoutMs);
child.once("close", (code, signal) => {
clearTimeout(timeout);
resolve({ code, signal });
});
},
);
}
function isProcessAlive(pid: number) {
try {
process.kill(pid, 0);