diff --git a/scripts/e2e/lib/bundled-plugin-install-uninstall/runtime-smoke.mjs b/scripts/e2e/lib/bundled-plugin-install-uninstall/runtime-smoke.mjs index f52d02f9e83..14fbb2ca601 100644 --- a/scripts/e2e/lib/bundled-plugin-install-uninstall/runtime-smoke.mjs +++ b/scripts/e2e/lib/bundled-plugin-install-uninstall/runtime-smoke.mjs @@ -15,6 +15,10 @@ const LOG_SCAN_BYTES = readPositiveInt( process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_LOG_SCAN_BYTES, 256 * 1024, ); +const GATEWAY_LOG_CAPTURE_BYTES = readPositiveInt( + process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_GATEWAY_LOG_BYTES, + 16 * 1024 * 1024, +); const WATCHDOG_MS = readPositiveInt(process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_WATCHDOG_MS, 1000); const READY_TIMEOUT_MS = readPositiveInt( process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_READY_MS, @@ -33,14 +37,26 @@ const HTTP_PROBE_TIMEOUT_MS = readPositiveInt( process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_HTTP_MS, 5000, ); +const GATEWAY_TEARDOWN_GRACE_MS = readPositiveInt( + process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_GRACE_MS, + 10000, +); +const GATEWAY_TEARDOWN_KILL_GRACE_MS = readPositiveInt( + process.env.OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_KILL_GRACE_MS, + 1000, +); const GATEWAY_READY_LOG_NEEDLE = Buffer.from("[gateway] ready"); const READY_OFFSET_LOG_NEEDLES = [ GATEWAY_READY_LOG_NEEDLE, Buffer.from("listening on ws://"), Buffer.from("[gateway] http server listening"), ]; +const GATEWAY_LOG_TRUNCATED_NEEDLE = "[gateway log truncated after "; const FORBIDDEN_POST_READY_DEPS_WORK = [/\b(?:npm|pnpm|yarn|corepack) install\b/iu]; const isolatedStateRoots = new WeakMap(); +const activeGatewayChildren = new Set(); +const parentSignalHandlers = new Map(); +let gatewayExitCleanupInstalled = false; function readPositiveInt(raw, fallback) { const text = String(raw ?? "").trim(); @@ -319,6 +335,44 @@ function formatCapturedOutput(label, buffer) { return `${prefix}${buffer.text}`; } +function createBoundedGatewayLog(logPath) { + fs.mkdirSync(path.dirname(logPath), { recursive: true }); + const fd = fs.openSync(logPath, "w"); + let bytes = 0; + let closed = false; + let truncated = false; + const marker = Buffer.from( + `\n[gateway log truncated after ${String(GATEWAY_LOG_CAPTURE_BYTES)} bytes]\n`, + ); + return { + append(chunk) { + if (closed || truncated) { + return; + } + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(String(chunk)); + const remaining = GATEWAY_LOG_CAPTURE_BYTES - bytes; + if (buffer.length <= remaining) { + fs.writeSync(fd, buffer); + bytes += buffer.length; + return; + } + if (remaining > 0) { + fs.writeSync(fd, buffer.subarray(0, remaining)); + } + fs.writeSync(fd, marker); + bytes = GATEWAY_LOG_CAPTURE_BYTES; + truncated = true; + }, + close() { + if (closed) { + return; + } + closed = true; + fs.closeSync(fd); + }, + }; +} + export function runCommand(command, args, options = {}) { return new Promise((resolve, reject) => { const { timeoutMs = COMMAND_TIMEOUT_MS, ...spawnOptions } = options; @@ -384,8 +438,8 @@ export function runCommand(command, args, options = {}) { }); } -function startGateway(params) { - const log = fs.openSync(params.logPath, "w"); +export function startGateway(params) { + const log = createBoundedGatewayLog(params.logPath); const child = childProcess.spawn( "node", [ @@ -405,11 +459,15 @@ function startGateway(params) { OPENCLAW_SKIP_CHANNELS: params.skipChannels ? "1" : "0", OPENCLAW_SKIP_PROVIDERS: "0", }, - stdio: ["ignore", log, log], - detached: false, + stdio: ["ignore", "pipe", "pipe"], + detached: process.platform !== "win32", }, ); - fs.closeSync(log); + child.stdout?.on("data", (chunk) => log.append(chunk)); + child.stderr?.on("data", (chunk) => log.append(chunk)); + child.once("error", () => log.close()); + child.once("close", () => log.close()); + trackGatewayChild(child); return child; } @@ -417,17 +475,109 @@ export function hasChildExited(child) { return child.exitCode !== null || (child.signalCode ?? null) !== null; } +function trackGatewayChild(child) { + activeGatewayChildren.add(child); + const untrack = () => { + if (!processTreeIsAlive(child)) { + activeGatewayChildren.delete(child); + } + }; + child.once("error", untrack); + child.once("close", untrack); + installGatewayParentCleanup(); +} + +function installGatewayParentCleanup() { + if (!gatewayExitCleanupInstalled) { + gatewayExitCleanupInstalled = true; + process.once("exit", () => { + cleanupActiveGatewayChildren("SIGTERM"); + }); + } + for (const signal of ["SIGHUP", "SIGINT", "SIGTERM"]) { + if (parentSignalHandlers.has(signal)) { + continue; + } + const handler = () => { + cleanupActiveGatewayChildren(signal); + for (const [registeredSignal, registeredHandler] of parentSignalHandlers) { + process.off(registeredSignal, registeredHandler); + } + parentSignalHandlers.clear(); + process.kill(process.pid, signal); + }; + parentSignalHandlers.set(signal, handler); + process.once(signal, handler); + } +} + +function cleanupActiveGatewayChildren(signal) { + for (const child of activeGatewayChildren) { + signalGateway(child, signal); + if (process.platform !== "win32") { + signalGateway(child, "SIGKILL"); + } + } +} + export async function stopGateway(child) { - if (!child || hasChildExited(child)) { + if (!child || !processTreeIsAlive(child)) { return; } - child.kill("SIGTERM"); - const started = Date.now(); - while (!hasChildExited(child) && Date.now() - started < 10000) { - await delay(100); + const waitForExit = async (ms) => { + const deadline = Date.now() + ms; + while (Date.now() < deadline) { + if (!processTreeIsAlive(child)) { + return true; + } + await delay(100); + } + return !processTreeIsAlive(child); + }; + + signalGateway(child, "SIGTERM"); + if (await waitForExit(GATEWAY_TEARDOWN_GRACE_MS)) { + return; } - if (!hasChildExited(child)) { - child.kill("SIGKILL"); + signalGateway(child, "SIGKILL"); + await waitForExit(GATEWAY_TEARDOWN_KILL_GRACE_MS); +} + +function processTreeIsAlive(child) { + if (!child || typeof child.pid !== "number") { + return !hasChildExited(child); + } + if (process.platform === "win32") { + return !hasChildExited(child); + } + try { + process.kill(-child.pid, 0); + return true; + } catch (error) { + if (error?.code === "EPERM") { + return true; + } + return false; + } +} + +function signalGateway(child, signal) { + if (process.platform !== "win32" && typeof child.pid === "number") { + try { + process.kill(-child.pid, signal); + return; + } catch (error) { + if (error?.code === "ESRCH") { + return; + } + } + } + try { + child.kill(signal); + } catch (error) { + if (error?.code !== "ESRCH") { + throw error; + } } } @@ -842,6 +992,7 @@ async function runWatchdog(options) { ); } await retryRpcCall("health", {}, options); + assertGatewayLogNotTruncated(options.logPath); assertNoPostReadyRuntimeDepsWork(options.logPath, readyOffset); await assertNoPackageManagerChildren(options.child.pid); } @@ -850,6 +1001,16 @@ export function findReadyLogOffset(logPath) { return findFirstNeedleOffset(logPath, READY_OFFSET_LOG_NEEDLES); } +export function assertGatewayLogNotTruncated(logPath) { + if (readFileTail(logPath).includes(GATEWAY_LOG_TRUNCATED_NEEDLE)) { + throw new Error( + `gateway log exceeded ${String( + GATEWAY_LOG_CAPTURE_BYTES, + )} bytes; runtime smoke cannot validate complete post-ready output`, + ); + } +} + export function assertNoPostReadyRuntimeDepsWork(logPath, readyOffset) { let stat; try { diff --git a/test/scripts/bundled-plugin-install-uninstall-probe.test.ts b/test/scripts/bundled-plugin-install-uninstall-probe.test.ts index 061d26642c4..c39f7e021e5 100644 --- a/test/scripts/bundled-plugin-install-uninstall-probe.test.ts +++ b/test/scripts/bundled-plugin-install-uninstall-probe.test.ts @@ -1,4 +1,4 @@ -import { spawnSync } from "node:child_process"; +import { spawn, spawnSync } from "node:child_process"; import fs from "node:fs"; import { createServer as createHttpServer, type Server as HttpServer } from "node:http"; import { createServer as createNetServer, type Server as NetServer, type Socket } from "node:net"; @@ -151,6 +151,41 @@ async function closeServer(server: HttpServer | NetServer): Promise { }); } +async function waitForFile(filePath: string, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (fs.existsSync(filePath)) { + return; + } + await new Promise((resolve) => { + setTimeout(resolve, 20); + }); + } + throw new Error(`timeout waiting for ${filePath}`); +} + +function pidIsAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch { + return false; + } +} + +async function waitForDead(pid: number, timeoutMs: number): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!pidIsAlive(pid)) { + return; + } + await new Promise((resolve) => { + setTimeout(resolve, 20); + }); + } + throw new Error(`timeout waiting for pid ${pid} to exit`); +} + afterEach(() => { vi.restoreAllMocks(); for (const dir of tempDirs.splice(0)) { @@ -176,6 +211,55 @@ describe("bundled plugin install/uninstall probe", () => { expect(second).toEqual({ text: "fghij", truncatedChars: 5 }); }); + it("caps noisy runtime gateway logs", async () => { + const runtimeSmoke = await importRuntimeSmokeWithEnv({ + OPENCLAW_BUNDLED_PLUGIN_RUNTIME_GATEWAY_LOG_BYTES: "64", + }); + const root = makePackageRoot(); + const entrypoint = path.join(root, "dist", "noisy-gateway.js"); + const logPath = path.join(root, "gateway.log"); + fs.writeFileSync( + entrypoint, + [ + "if (process.argv[2] === 'gateway') {", + " process.stdout.write('x'.repeat(2048));", + " setInterval(() => {}, 1000);", + "}", + "", + ].join("\n"), + "utf8", + ); + + const child = runtimeSmoke.startGateway({ + entrypoint, + env: {}, + logPath, + port: 19002, + skipChannels: true, + }); + try { + const marker = "[gateway log truncated after 64 bytes]"; + const deadline = Date.now() + 1000; + while (Date.now() < deadline) { + if (fs.existsSync(logPath) && fs.readFileSync(logPath, "utf8").includes(marker)) { + break; + } + await new Promise((resolve) => { + setTimeout(resolve, 20); + }); + } + + const log = fs.readFileSync(logPath, "utf8"); + expect(log).toContain(marker); + expect(log.length).toBeLessThan(256); + expect(() => runtimeSmoke.assertGatewayLogNotTruncated(logPath)).toThrow( + /runtime smoke cannot validate complete post-ready output/u, + ); + } finally { + await runtimeSmoke.stopGateway(child); + } + }); + it("matches runtime slash aliases across command list surfaces", async () => { const runtimeSmoke = await import(pathToFileURL(runtimeSmokePath).href); const payload = { @@ -291,6 +375,136 @@ describe("bundled plugin install/uninstall probe", () => { expect(child.kill).not.toHaveBeenCalled(); }); + it.runIf(process.platform !== "win32")("stops runtime gateway process groups", async () => { + const runtimeSmoke = await importRuntimeSmokeWithEnv({ + OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_GRACE_MS: "50", + OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_KILL_GRACE_MS: "1000", + }); + const root = makePackageRoot(); + const entrypoint = path.join(root, "dist", "gateway-with-sidecar.js"); + const logPath = path.join(root, "gateway.log"); + const descendantPidPath = path.join(root, "descendant.pid"); + const descendantScript = [ + "import fs from 'node:fs';", + `fs.writeFileSync(${JSON.stringify(descendantPidPath)}, String(process.pid));`, + "process.on('SIGTERM', () => {});", + "setInterval(() => {}, 1000);", + ].join("\n"); + fs.writeFileSync( + entrypoint, + [ + "import childProcess from 'node:child_process';", + "if (process.argv[2] === 'gateway') {", + ` childProcess.spawn(process.execPath, ["--input-type=module", "--eval", ${JSON.stringify( + descendantScript, + )}], { stdio: "ignore" });`, + " process.on('SIGTERM', () => process.exit(0));", + " setInterval(() => {}, 1000);", + "}", + "", + ].join("\n"), + "utf8", + ); + + const child = runtimeSmoke.startGateway({ + entrypoint, + env: {}, + logPath, + port: 19003, + skipChannels: true, + }); + let descendantPid: number | undefined; + try { + await waitForFile(descendantPidPath, 1000); + descendantPid = Number(fs.readFileSync(descendantPidPath, "utf8")); + expect(pidIsAlive(descendantPid)).toBe(true); + + await runtimeSmoke.stopGateway(child); + + await waitForDead(descendantPid, 2000); + } finally { + if (descendantPid !== undefined && pidIsAlive(descendantPid)) { + process.kill(descendantPid, "SIGKILL"); + } + } + }); + + it.runIf(process.platform !== "win32")( + "cleans detached runtime gateway groups when the parent is signaled", + async () => { + const root = makePackageRoot(); + const entrypoint = path.join(root, "dist", "gateway-with-signaled-sidecar.js"); + const runnerPath = path.join(root, "run-runtime-smoke.mjs"); + const logPath = path.join(root, "gateway-signal.log"); + const descendantPidPath = path.join(root, "signaled-descendant.pid"); + const descendantScript = [ + "import fs from 'node:fs';", + `fs.writeFileSync(${JSON.stringify(descendantPidPath)}, String(process.pid));`, + "process.on('SIGTERM', () => {});", + "setInterval(() => {}, 1000);", + ].join("\n"); + fs.writeFileSync( + entrypoint, + [ + "import childProcess from 'node:child_process';", + "if (process.argv[2] === 'gateway') {", + ` childProcess.spawn(process.execPath, ["--input-type=module", "--eval", ${JSON.stringify( + descendantScript, + )}], { stdio: "ignore" });`, + " setTimeout(() => process.exit(0), 50);", + "}", + "", + ].join("\n"), + "utf8", + ); + fs.writeFileSync( + runnerPath, + [ + `const runtimeSmoke = await import(${JSON.stringify(pathToFileURL(runtimeSmokePath).href)});`, + "runtimeSmoke.startGateway({", + ` entrypoint: ${JSON.stringify(entrypoint)},`, + " env: {},", + ` logPath: ${JSON.stringify(logPath)},`, + " port: 19004,", + " skipChannels: true,", + "});", + "setInterval(() => {}, 1000);", + "", + ].join("\n"), + "utf8", + ); + + const runner = spawn(process.execPath, [runnerPath], { + env: { + ...process.env, + OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_GRACE_MS: "50", + OPENCLAW_BUNDLED_PLUGIN_RUNTIME_TEARDOWN_KILL_GRACE_MS: "1000", + }, + stdio: "ignore", + }); + let descendantPid: number | undefined; + try { + await waitForFile(descendantPidPath, 1000); + descendantPid = Number(fs.readFileSync(descendantPidPath, "utf8")); + expect(pidIsAlive(descendantPid)).toBe(true); + await new Promise((resolve) => { + setTimeout(resolve, 150); + }); + + runner.kill("SIGTERM"); + + await waitForDead(descendantPid, 2000); + } finally { + if (runner.pid && pidIsAlive(runner.pid)) { + runner.kill("SIGKILL"); + } + if (descendantPid !== undefined && pidIsAlive(descendantPid)) { + process.kill(descendantPid, "SIGKILL"); + } + } + }, + ); + it("does not treat shallow HTTP listen logs as runtime readiness", async () => { const runtimeSmoke = await import(pathToFileURL(runtimeSmokePath).href); const root = makePackageRoot();