From 610407730d42556b1ccc8563d2949c96d8bddd06 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 10 Apr 2026 23:29:46 +0100 Subject: [PATCH] fix: stop qa lab children cleanly --- extensions/qa-lab/src/bus-server.test.ts | 59 +++++++++++++++++ extensions/qa-lab/src/bus-server.ts | 22 ++++++- extensions/qa-lab/src/gateway-child.test.ts | 44 +++++++++++++ extensions/qa-lab/src/gateway-child.ts | 66 +++++++++++++++---- extensions/qa-lab/src/lab-server.test.ts | 65 ++++++++++++++++++ extensions/qa-lab/src/lab-server.ts | 15 +++-- extensions/qa-lab/src/mock-openai-server.ts | 5 +- .../qa-lab/src/model-catalog.runtime.ts | 60 ++++++++++++++++- extensions/qa-lab/src/suite.ts | 3 + 9 files changed, 313 insertions(+), 26 deletions(-) create mode 100644 extensions/qa-lab/src/bus-server.test.ts diff --git a/extensions/qa-lab/src/bus-server.test.ts b/extensions/qa-lab/src/bus-server.test.ts new file mode 100644 index 00000000000..56c89f6c208 --- /dev/null +++ b/extensions/qa-lab/src/bus-server.test.ts @@ -0,0 +1,59 @@ +import { Agent, createServer, request } from "node:http"; +import { describe, expect, it } from "vitest"; +import { closeQaHttpServer } from "./bus-server.js"; + +async function listenOnLoopback(server: ReturnType): Promise { + await new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => resolve()); + }); + const address = server.address(); + if (!address || typeof address === "string") { + throw new Error("expected server to bind a TCP port"); + } + return address.port; +} + +async function requestOnce(params: { port: number; agent: Agent }): Promise { + await new Promise((resolve, reject) => { + const req = request( + { + host: "127.0.0.1", + port: params.port, + path: "/", + agent: params.agent, + }, + (res) => { + res.resume(); + res.on("end", resolve); + res.on("error", reject); + }, + ); + req.on("error", reject); + req.end(); + }); +} + +describe("closeQaHttpServer", () => { + it("closes idle keep-alive sockets so suite processes can exit", async () => { + const server = createServer((_req, res) => { + res.writeHead(200, { + "content-type": "text/plain", + connection: "keep-alive", + }); + res.end("ok"); + }); + const agent = new Agent({ keepAlive: true }); + const port = await listenOnLoopback(server); + + try { + await requestOnce({ port, agent }); + const startedAt = Date.now(); + await closeQaHttpServer(server); + expect(Date.now() - startedAt).toBeLessThan(1_000); + } finally { + agent.destroy(); + server.closeAllConnections?.(); + } + }); +}); diff --git a/extensions/qa-lab/src/bus-server.ts b/extensions/qa-lab/src/bus-server.ts index e84a7053158..f5bc37ca459 100644 --- a/extensions/qa-lab/src/bus-server.ts +++ b/extensions/qa-lab/src/bus-server.ts @@ -38,6 +38,24 @@ export function writeError(res: ServerResponse, statusCode: number, error: unkno }); } +export async function closeQaHttpServer(server: Server): Promise { + let forceCloseTimer: NodeJS.Timeout | undefined; + try { + await new Promise((resolve, reject) => { + server.close((error) => (error ? reject(error) : resolve())); + server.closeIdleConnections?.(); + forceCloseTimer = setTimeout(() => { + server.closeAllConnections?.(); + }, 250); + forceCloseTimer.unref(); + }); + } finally { + if (forceCloseTimer) { + clearTimeout(forceCloseTimer); + } + } +} + export async function handleQaBusRequest(params: { req: IncomingMessage; res: ServerResponse; @@ -172,9 +190,7 @@ export async function startQaBusServer(params: { state: QaBusState; port?: numbe port: address.port, baseUrl: `http://127.0.0.1:${address.port}`, async stop() { - await new Promise((resolve, reject) => - server.close((error) => (error ? reject(error) : resolve())), - ); + await closeQaHttpServer(server); }, }; } diff --git a/extensions/qa-lab/src/gateway-child.test.ts b/extensions/qa-lab/src/gateway-child.test.ts index 4ff6dcb6df3..6a11cef254c 100644 --- a/extensions/qa-lab/src/gateway-child.test.ts +++ b/extensions/qa-lab/src/gateway-child.test.ts @@ -1,3 +1,4 @@ +import { spawn } from "node:child_process"; import { lstat, mkdir, mkdtemp, readFile, readdir, rm, writeFile } from "node:fs/promises"; import os from "node:os"; import path from "node:path"; @@ -302,6 +303,49 @@ describe("buildQaRuntimeEnv", () => { ); expect(release).toHaveBeenCalledTimes(1); }); + + it("force-stops gateway children that ignore the graceful signal", async () => { + const child = spawn( + process.execPath, + [ + "-e", + [ + "process.on('SIGTERM', () => {});", + "process.stdout.write('ready\\n');", + "setInterval(() => {}, 1000);", + ].join(""), + ], + { + detached: process.platform !== "win32", + stdio: ["ignore", "pipe", "ignore"], + }, + ); + cleanups.push(async () => { + if (child.exitCode === null && child.signalCode === null) { + try { + if (process.platform === "win32") { + child.kill("SIGKILL"); + } else if (child.pid) { + process.kill(-child.pid, "SIGKILL"); + } + } catch { + // The child already exited. + } + } + }); + + await new Promise((resolve, reject) => { + child.once("error", reject); + child.stdout?.once("data", () => resolve()); + }); + + await __testing.stopQaGatewayChildProcessTree(child, { + gracefulTimeoutMs: 50, + forceTimeoutMs: 1_000, + }); + + expect(child.exitCode !== null || child.signalCode !== null).toBe(true); + }); }); describe("resolveQaControlUiRoot", () => { diff --git a/extensions/qa-lab/src/gateway-child.ts b/extensions/qa-lab/src/gateway-child.ts index 7cffacee349..5a282910b43 100644 --- a/extensions/qa-lab/src/gateway-child.ts +++ b/extensions/qa-lab/src/gateway-child.ts @@ -1,4 +1,4 @@ -import { spawn } from "node:child_process"; +import { spawn, type ChildProcess } from "node:child_process"; import { randomUUID } from "node:crypto"; import { createWriteStream, existsSync } from "node:fs"; import fs from "node:fs/promises"; @@ -339,8 +339,57 @@ export const __testing = { resolveQaBundledPluginsSourceRoot, resolveQaRuntimeHostVersion, createQaBundledPluginsDir, + stopQaGatewayChildProcessTree, }; +function hasChildExited(child: ChildProcess) { + return child.exitCode !== null || child.signalCode !== null; +} + +function signalQaGatewayChildProcessTree(child: ChildProcess, signal: NodeJS.Signals) { + if (!child.pid) { + return; + } + try { + if (process.platform === "win32") { + child.kill(signal); + return; + } + process.kill(-child.pid, signal); + } catch { + try { + child.kill(signal); + } catch { + // The child already exited. + } + } +} + +async function waitForQaGatewayChildExit(child: ChildProcess, timeoutMs: number) { + if (hasChildExited(child)) { + return true; + } + return await Promise.race([ + new Promise((resolve) => child.once("exit", () => resolve(true))), + sleep(timeoutMs).then(() => false), + ]); +} + +async function stopQaGatewayChildProcessTree( + child: ChildProcess, + opts?: { gracefulTimeoutMs?: number; forceTimeoutMs?: number }, +) { + if (hasChildExited(child)) { + return; + } + signalQaGatewayChildProcessTree(child, "SIGTERM"); + if (await waitForQaGatewayChildExit(child, opts?.gracefulTimeoutMs ?? 5_000)) { + return; + } + signalQaGatewayChildProcessTree(child, "SIGKILL"); + await waitForQaGatewayChildExit(child, opts?.forceTimeoutMs ?? 2_000); +} + function resolveQaBundledPluginsSourceRoot(repoRoot: string) { const candidates = [ path.join(repoRoot, "dist", "extensions"), @@ -811,6 +860,7 @@ export async function startQaGatewayChild(params: { { cwd: runtimeCwd, env, + detached: process.platform !== "win32", stdio: ["ignore", "pipe", "pipe"], }, ); @@ -868,7 +918,7 @@ export async function startQaGatewayChild(params: { } catch (error) { stdoutLog.end(); stderrLog.end(); - child.kill("SIGTERM"); + await stopQaGatewayChildProcessTree(child, { gracefulTimeoutMs: 1_000 }).catch(() => {}); if (!keepTemp && stagedBundledPluginsRoot) { await fs.rm(stagedBundledPluginsRoot, { recursive: true, force: true }).catch(() => {}); } @@ -925,17 +975,7 @@ export async function startQaGatewayChild(params: { await rpcClient.stop().catch(() => {}); stdoutLog.end(); stderrLog.end(); - if (!child.killed) { - child.kill("SIGTERM"); - await Promise.race([ - new Promise((resolve) => child.once("exit", () => resolve())), - sleep(5_000).then(() => { - if (!child.killed) { - child.kill("SIGKILL"); - } - }), - ]); - } + await stopQaGatewayChildProcessTree(child); if (!(opts?.keepTemp ?? keepTemp)) { await fs.rm(tempRoot, { recursive: true, force: true }); if (stagedBundledPluginsRoot) { diff --git a/extensions/qa-lab/src/lab-server.test.ts b/extensions/qa-lab/src/lab-server.test.ts index f83dcecb993..709fd9ccd32 100644 --- a/extensions/qa-lab/src/lab-server.test.ts +++ b/extensions/qa-lab/src/lab-server.test.ts @@ -64,6 +64,21 @@ async function waitForRunnerCatalog(baseUrl: string, timeoutMs = 5_000) { throw new Error("runner catalog stayed loading"); } +async function waitForFile(filePath: string, timeoutMs = 5_000) { + const startedAt = Date.now(); + while (Date.now() - startedAt < timeoutMs) { + try { + return await readFile(filePath, "utf8"); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== "ENOENT") { + throw error; + } + await sleep(50); + } + } + throw new Error(`file did not appear: ${filePath}`); +} + describe("qa-lab server", () => { it("serves bootstrap state and writes a self-check report", async () => { const tempDir = await mkdtemp(path.join(os.tmpdir(), "qa-lab-test-")); @@ -405,6 +420,56 @@ describe("qa-lab server", () => { expect(await readFile(markerPath, "utf8")).toContain("models list --all --json"); }); + it("aborts an in-flight runner model catalog when the lab stops", async () => { + const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-lab-abort-catalog-")); + cleanups.push(async () => { + await rm(repoRoot, { recursive: true, force: true }); + }); + const markerPath = path.join(repoRoot, "runner-catalog-started.txt"); + const stoppedPath = path.join(repoRoot, "runner-catalog-stopped.txt"); + + await mkdir(path.join(repoRoot, "dist"), { recursive: true }); + await mkdir(path.join(repoRoot, "extensions/qa-lab/web/dist"), { recursive: true }); + await writeFile( + path.join(repoRoot, "dist/index.js"), + [ + 'const fs = require("node:fs");', + `fs.writeFileSync(${JSON.stringify(markerPath)}, process.env.OPENCLAW_CODEX_DISCOVERY_LIVE || "", "utf8");`, + "process.on('SIGTERM', () => {", + ` fs.writeFileSync(${JSON.stringify(stoppedPath)}, "terminated", "utf8");`, + " process.exit(0);", + "});", + "setInterval(() => {}, 1000);", + ].join("\n"), + "utf8", + ); + await writeFile( + path.join(repoRoot, "extensions/qa-lab/web/dist/index.html"), + "abort catalog", + "utf8", + ); + + const lab = await startQaLabServer({ + host: "127.0.0.1", + port: 0, + repoRoot, + }); + let stopped = false; + cleanups.push(async () => { + if (!stopped) { + await lab.stop(); + } + }); + + const bootstrapResponse = await fetchWithRetry(`${lab.baseUrl}/api/bootstrap`); + expect(bootstrapResponse.status).toBe(200); + expect(await waitForFile(markerPath)).toBe("0"); + + await lab.stop(); + stopped = true; + expect(await waitForFile(stoppedPath)).toBe("terminated"); + }); + it("can disable the embedded echo gateway for real-suite runs", async () => { const lab = await startQaLabServer({ host: "127.0.0.1", diff --git a/extensions/qa-lab/src/lab-server.ts b/extensions/qa-lab/src/lab-server.ts index 440814452be..19abeb4d820 100644 --- a/extensions/qa-lab/src/lab-server.ts +++ b/extensions/qa-lab/src/lab-server.ts @@ -14,7 +14,7 @@ import tls from "node:tls"; import { fileURLToPath } from "node:url"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime"; -import { handleQaBusRequest, writeError, writeJson } from "./bus-server.js"; +import { closeQaHttpServer, handleQaBusRequest, writeError, writeJson } from "./bus-server.js"; import { createQaBusState, type QaBusState } from "./bus-state.js"; import { createQaRunnerRuntime } from "./harness-runtime.js"; import type { @@ -465,22 +465,27 @@ export async function startQaLabServer( let publicBaseUrl = ""; let runnerModelCatalogPromise: Promise | null = null; + let runnerModelCatalogAbort: AbortController | null = null; const ensureRunnerModelCatalog = () => { if (runnerModelCatalogPromise) { return runnerModelCatalogPromise; } + runnerModelCatalogAbort = new AbortController(); runnerModelCatalogPromise = (async () => { try { const { loadQaRunnerModelOptions } = await import("./model-catalog.runtime.js"); runnerModelOptions = await loadQaRunnerModelOptions({ repoRoot, + signal: runnerModelCatalogAbort?.signal, }); runnerModelCatalogStatus = "ready"; } catch { runnerModelOptions = []; runnerModelCatalogStatus = "failed"; } - })(); + })().finally(() => { + runnerModelCatalogAbort = null; + }); return runnerModelCatalogPromise; }; @@ -802,10 +807,10 @@ export async function startQaLabServer( }, runSelfCheck, async stop() { + runnerModelCatalogAbort?.abort(); + await runnerModelCatalogPromise?.catch(() => undefined); await gateway?.stop(); - await new Promise((resolve, reject) => - server.close((error) => (error ? reject(error) : resolve())), - ); + await closeQaHttpServer(server); }, }; labHandle = lab; diff --git a/extensions/qa-lab/src/mock-openai-server.ts b/extensions/qa-lab/src/mock-openai-server.ts index 634c3844537..ff0f5324779 100644 --- a/extensions/qa-lab/src/mock-openai-server.ts +++ b/extensions/qa-lab/src/mock-openai-server.ts @@ -1,5 +1,6 @@ import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; import { setTimeout as sleep } from "node:timers/promises"; +import { closeQaHttpServer } from "./bus-server.js"; type ResponsesInputItem = Record; @@ -805,9 +806,7 @@ export async function startQaMockOpenAiServer(params?: { host?: string; port?: n return { baseUrl: `http://${host}:${address.port}`, async stop() { - await new Promise((resolve, reject) => - server.close((error) => (error ? reject(error) : resolve())), - ); + await closeQaHttpServer(server); }, }; } diff --git a/extensions/qa-lab/src/model-catalog.runtime.ts b/extensions/qa-lab/src/model-catalog.runtime.ts index 36818683a82..54146348c66 100644 --- a/extensions/qa-lab/src/model-catalog.runtime.ts +++ b/extensions/qa-lab/src/model-catalog.runtime.ts @@ -59,7 +59,32 @@ export function selectQaRunnerModelOptions(rows: ModelRow[]): QaRunnerModelOptio }); } -export async function loadQaRunnerModelOptions(params: { repoRoot: string }) { +const CATALOG_ABORT_ERROR_MESSAGE = "qa model catalog aborted"; + +function createCatalogAbortError() { + return new Error(CATALOG_ABORT_ERROR_MESSAGE); +} + +function killProcessTree(pid: number | undefined, signal: NodeJS.Signals) { + if (pid === undefined) { + return; + } + try { + if (process.platform === "win32") { + process.kill(pid, signal); + return; + } + process.kill(-pid, signal); + } catch { + try { + process.kill(pid, signal); + } catch { + // The process already exited. + } + } +} + +export async function loadQaRunnerModelOptions(params: { repoRoot: string; signal?: AbortSignal }) { const tempRoot = await fs.mkdtemp( path.join(resolvePreferredOpenClawTmpDir(), "openclaw-qa-model-catalog-"), ); @@ -92,6 +117,8 @@ export async function loadQaRunnerModelOptions(params: { repoRoot: string }) { const stdout: Buffer[] = []; const stderr: Buffer[] = []; await new Promise((resolve, reject) => { + let aborted = params.signal?.aborted === true; + let forceKillTimer: NodeJS.Timeout | undefined; const child = spawn( process.execPath, ["dist/index.js", "models", "list", "--all", "--json"], @@ -104,14 +131,43 @@ export async function loadQaRunnerModelOptions(params: { repoRoot: string }) { OPENCLAW_CONFIG_PATH: configPath, OPENCLAW_STATE_DIR: stateDir, OPENCLAW_OAUTH_DIR: path.join(stateDir, "credentials"), + OPENCLAW_CODEX_DISCOVERY_LIVE: "0", }, + detached: process.platform !== "win32", stdio: ["ignore", "pipe", "pipe"], }, ); + const cleanup = () => { + params.signal?.removeEventListener("abort", abortCatalogLoad); + if (forceKillTimer) { + clearTimeout(forceKillTimer); + } + }; + const abortCatalogLoad = () => { + aborted = true; + killProcessTree(child.pid, "SIGTERM"); + forceKillTimer = setTimeout(() => { + killProcessTree(child.pid, "SIGKILL"); + }, 1_000); + forceKillTimer.unref(); + }; + if (aborted) { + abortCatalogLoad(); + } else { + params.signal?.addEventListener("abort", abortCatalogLoad, { once: true }); + } child.stdout.on("data", (chunk) => stdout.push(Buffer.from(chunk))); child.stderr.on("data", (chunk) => stderr.push(Buffer.from(chunk))); - child.once("error", reject); + child.once("error", (error) => { + cleanup(); + reject(aborted ? createCatalogAbortError() : error); + }); child.once("exit", (code) => { + cleanup(); + if (aborted) { + reject(createCatalogAbortError()); + return; + } if (code === 0) { resolve(); return; diff --git a/extensions/qa-lab/src/suite.ts b/extensions/qa-lab/src/suite.ts index 18a40c6c324..9cfb3e2be46 100644 --- a/extensions/qa-lab/src/suite.ts +++ b/extensions/qa-lab/src/suite.ts @@ -5,6 +5,7 @@ import path from "node:path"; import { setTimeout as sleep } from "node:timers/promises"; import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; +import { disposeRegisteredAgentHarnesses } from "openclaw/plugin-sdk/agent-harness"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { @@ -1460,6 +1461,7 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise