mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-15 19:21:08 +00:00
fix: stop qa lab children cleanly
This commit is contained in:
59
extensions/qa-lab/src/bus-server.test.ts
Normal file
59
extensions/qa-lab/src/bus-server.test.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { Agent, createServer, request } from "node:http";
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { closeQaHttpServer } from "./bus-server.js";
|
||||
|
||||
async function listenOnLoopback(server: ReturnType<typeof createServer>): Promise<number> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.once("error", reject);
|
||||
server.listen(0, "127.0.0.1", () => resolve());
|
||||
});
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("expected server to bind a TCP port");
|
||||
}
|
||||
return address.port;
|
||||
}
|
||||
|
||||
async function requestOnce(params: { port: number; agent: Agent }): Promise<void> {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const req = request(
|
||||
{
|
||||
host: "127.0.0.1",
|
||||
port: params.port,
|
||||
path: "/",
|
||||
agent: params.agent,
|
||||
},
|
||||
(res) => {
|
||||
res.resume();
|
||||
res.on("end", resolve);
|
||||
res.on("error", reject);
|
||||
},
|
||||
);
|
||||
req.on("error", reject);
|
||||
req.end();
|
||||
});
|
||||
}
|
||||
|
||||
describe("closeQaHttpServer", () => {
|
||||
it("closes idle keep-alive sockets so suite processes can exit", async () => {
|
||||
const server = createServer((_req, res) => {
|
||||
res.writeHead(200, {
|
||||
"content-type": "text/plain",
|
||||
connection: "keep-alive",
|
||||
});
|
||||
res.end("ok");
|
||||
});
|
||||
const agent = new Agent({ keepAlive: true });
|
||||
const port = await listenOnLoopback(server);
|
||||
|
||||
try {
|
||||
await requestOnce({ port, agent });
|
||||
const startedAt = Date.now();
|
||||
await closeQaHttpServer(server);
|
||||
expect(Date.now() - startedAt).toBeLessThan(1_000);
|
||||
} finally {
|
||||
agent.destroy();
|
||||
server.closeAllConnections?.();
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -38,6 +38,24 @@ export function writeError(res: ServerResponse, statusCode: number, error: unkno
|
||||
});
|
||||
}
|
||||
|
||||
export async function closeQaHttpServer(server: Server): Promise<void> {
|
||||
let forceCloseTimer: NodeJS.Timeout | undefined;
|
||||
try {
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server.close((error) => (error ? reject(error) : resolve()));
|
||||
server.closeIdleConnections?.();
|
||||
forceCloseTimer = setTimeout(() => {
|
||||
server.closeAllConnections?.();
|
||||
}, 250);
|
||||
forceCloseTimer.unref();
|
||||
});
|
||||
} finally {
|
||||
if (forceCloseTimer) {
|
||||
clearTimeout(forceCloseTimer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function handleQaBusRequest(params: {
|
||||
req: IncomingMessage;
|
||||
res: ServerResponse;
|
||||
@@ -172,9 +190,7 @@ export async function startQaBusServer(params: { state: QaBusState; port?: numbe
|
||||
port: address.port,
|
||||
baseUrl: `http://127.0.0.1:${address.port}`,
|
||||
async stop() {
|
||||
await new Promise<void>((resolve, reject) =>
|
||||
server.close((error) => (error ? reject(error) : resolve())),
|
||||
);
|
||||
await closeQaHttpServer(server);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { lstat, mkdir, mkdtemp, readFile, readdir, rm, writeFile } from "node:fs/promises";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
@@ -302,6 +303,49 @@ describe("buildQaRuntimeEnv", () => {
|
||||
);
|
||||
expect(release).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("force-stops gateway children that ignore the graceful signal", async () => {
|
||||
const child = spawn(
|
||||
process.execPath,
|
||||
[
|
||||
"-e",
|
||||
[
|
||||
"process.on('SIGTERM', () => {});",
|
||||
"process.stdout.write('ready\\n');",
|
||||
"setInterval(() => {}, 1000);",
|
||||
].join(""),
|
||||
],
|
||||
{
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["ignore", "pipe", "ignore"],
|
||||
},
|
||||
);
|
||||
cleanups.push(async () => {
|
||||
if (child.exitCode === null && child.signalCode === null) {
|
||||
try {
|
||||
if (process.platform === "win32") {
|
||||
child.kill("SIGKILL");
|
||||
} else if (child.pid) {
|
||||
process.kill(-child.pid, "SIGKILL");
|
||||
}
|
||||
} catch {
|
||||
// The child already exited.
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
child.once("error", reject);
|
||||
child.stdout?.once("data", () => resolve());
|
||||
});
|
||||
|
||||
await __testing.stopQaGatewayChildProcessTree(child, {
|
||||
gracefulTimeoutMs: 50,
|
||||
forceTimeoutMs: 1_000,
|
||||
});
|
||||
|
||||
expect(child.exitCode !== null || child.signalCode !== null).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("resolveQaControlUiRoot", () => {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import { spawn, type ChildProcess } from "node:child_process";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { createWriteStream, existsSync } from "node:fs";
|
||||
import fs from "node:fs/promises";
|
||||
@@ -339,8 +339,57 @@ export const __testing = {
|
||||
resolveQaBundledPluginsSourceRoot,
|
||||
resolveQaRuntimeHostVersion,
|
||||
createQaBundledPluginsDir,
|
||||
stopQaGatewayChildProcessTree,
|
||||
};
|
||||
|
||||
function hasChildExited(child: ChildProcess) {
|
||||
return child.exitCode !== null || child.signalCode !== null;
|
||||
}
|
||||
|
||||
function signalQaGatewayChildProcessTree(child: ChildProcess, signal: NodeJS.Signals) {
|
||||
if (!child.pid) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (process.platform === "win32") {
|
||||
child.kill(signal);
|
||||
return;
|
||||
}
|
||||
process.kill(-child.pid, signal);
|
||||
} catch {
|
||||
try {
|
||||
child.kill(signal);
|
||||
} catch {
|
||||
// The child already exited.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForQaGatewayChildExit(child: ChildProcess, timeoutMs: number) {
|
||||
if (hasChildExited(child)) {
|
||||
return true;
|
||||
}
|
||||
return await Promise.race([
|
||||
new Promise<boolean>((resolve) => child.once("exit", () => resolve(true))),
|
||||
sleep(timeoutMs).then(() => false),
|
||||
]);
|
||||
}
|
||||
|
||||
async function stopQaGatewayChildProcessTree(
|
||||
child: ChildProcess,
|
||||
opts?: { gracefulTimeoutMs?: number; forceTimeoutMs?: number },
|
||||
) {
|
||||
if (hasChildExited(child)) {
|
||||
return;
|
||||
}
|
||||
signalQaGatewayChildProcessTree(child, "SIGTERM");
|
||||
if (await waitForQaGatewayChildExit(child, opts?.gracefulTimeoutMs ?? 5_000)) {
|
||||
return;
|
||||
}
|
||||
signalQaGatewayChildProcessTree(child, "SIGKILL");
|
||||
await waitForQaGatewayChildExit(child, opts?.forceTimeoutMs ?? 2_000);
|
||||
}
|
||||
|
||||
function resolveQaBundledPluginsSourceRoot(repoRoot: string) {
|
||||
const candidates = [
|
||||
path.join(repoRoot, "dist", "extensions"),
|
||||
@@ -811,6 +860,7 @@ export async function startQaGatewayChild(params: {
|
||||
{
|
||||
cwd: runtimeCwd,
|
||||
env,
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
@@ -868,7 +918,7 @@ export async function startQaGatewayChild(params: {
|
||||
} catch (error) {
|
||||
stdoutLog.end();
|
||||
stderrLog.end();
|
||||
child.kill("SIGTERM");
|
||||
await stopQaGatewayChildProcessTree(child, { gracefulTimeoutMs: 1_000 }).catch(() => {});
|
||||
if (!keepTemp && stagedBundledPluginsRoot) {
|
||||
await fs.rm(stagedBundledPluginsRoot, { recursive: true, force: true }).catch(() => {});
|
||||
}
|
||||
@@ -925,17 +975,7 @@ export async function startQaGatewayChild(params: {
|
||||
await rpcClient.stop().catch(() => {});
|
||||
stdoutLog.end();
|
||||
stderrLog.end();
|
||||
if (!child.killed) {
|
||||
child.kill("SIGTERM");
|
||||
await Promise.race([
|
||||
new Promise<void>((resolve) => child.once("exit", () => resolve())),
|
||||
sleep(5_000).then(() => {
|
||||
if (!child.killed) {
|
||||
child.kill("SIGKILL");
|
||||
}
|
||||
}),
|
||||
]);
|
||||
}
|
||||
await stopQaGatewayChildProcessTree(child);
|
||||
if (!(opts?.keepTemp ?? keepTemp)) {
|
||||
await fs.rm(tempRoot, { recursive: true, force: true });
|
||||
if (stagedBundledPluginsRoot) {
|
||||
|
||||
@@ -64,6 +64,21 @@ async function waitForRunnerCatalog(baseUrl: string, timeoutMs = 5_000) {
|
||||
throw new Error("runner catalog stayed loading");
|
||||
}
|
||||
|
||||
async function waitForFile(filePath: string, timeoutMs = 5_000) {
|
||||
const startedAt = Date.now();
|
||||
while (Date.now() - startedAt < timeoutMs) {
|
||||
try {
|
||||
return await readFile(filePath, "utf8");
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code !== "ENOENT") {
|
||||
throw error;
|
||||
}
|
||||
await sleep(50);
|
||||
}
|
||||
}
|
||||
throw new Error(`file did not appear: ${filePath}`);
|
||||
}
|
||||
|
||||
describe("qa-lab server", () => {
|
||||
it("serves bootstrap state and writes a self-check report", async () => {
|
||||
const tempDir = await mkdtemp(path.join(os.tmpdir(), "qa-lab-test-"));
|
||||
@@ -405,6 +420,56 @@ describe("qa-lab server", () => {
|
||||
expect(await readFile(markerPath, "utf8")).toContain("models list --all --json");
|
||||
});
|
||||
|
||||
it("aborts an in-flight runner model catalog when the lab stops", async () => {
|
||||
const repoRoot = await mkdtemp(path.join(os.tmpdir(), "qa-lab-abort-catalog-"));
|
||||
cleanups.push(async () => {
|
||||
await rm(repoRoot, { recursive: true, force: true });
|
||||
});
|
||||
const markerPath = path.join(repoRoot, "runner-catalog-started.txt");
|
||||
const stoppedPath = path.join(repoRoot, "runner-catalog-stopped.txt");
|
||||
|
||||
await mkdir(path.join(repoRoot, "dist"), { recursive: true });
|
||||
await mkdir(path.join(repoRoot, "extensions/qa-lab/web/dist"), { recursive: true });
|
||||
await writeFile(
|
||||
path.join(repoRoot, "dist/index.js"),
|
||||
[
|
||||
'const fs = require("node:fs");',
|
||||
`fs.writeFileSync(${JSON.stringify(markerPath)}, process.env.OPENCLAW_CODEX_DISCOVERY_LIVE || "", "utf8");`,
|
||||
"process.on('SIGTERM', () => {",
|
||||
` fs.writeFileSync(${JSON.stringify(stoppedPath)}, "terminated", "utf8");`,
|
||||
" process.exit(0);",
|
||||
"});",
|
||||
"setInterval(() => {}, 1000);",
|
||||
].join("\n"),
|
||||
"utf8",
|
||||
);
|
||||
await writeFile(
|
||||
path.join(repoRoot, "extensions/qa-lab/web/dist/index.html"),
|
||||
"<!doctype html><html><body>abort catalog</body></html>",
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const lab = await startQaLabServer({
|
||||
host: "127.0.0.1",
|
||||
port: 0,
|
||||
repoRoot,
|
||||
});
|
||||
let stopped = false;
|
||||
cleanups.push(async () => {
|
||||
if (!stopped) {
|
||||
await lab.stop();
|
||||
}
|
||||
});
|
||||
|
||||
const bootstrapResponse = await fetchWithRetry(`${lab.baseUrl}/api/bootstrap`);
|
||||
expect(bootstrapResponse.status).toBe(200);
|
||||
expect(await waitForFile(markerPath)).toBe("0");
|
||||
|
||||
await lab.stop();
|
||||
stopped = true;
|
||||
expect(await waitForFile(stoppedPath)).toBe("terminated");
|
||||
});
|
||||
|
||||
it("can disable the embedded echo gateway for real-suite runs", async () => {
|
||||
const lab = await startQaLabServer({
|
||||
host: "127.0.0.1",
|
||||
|
||||
@@ -14,7 +14,7 @@ import tls from "node:tls";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import { normalizeLowercaseStringOrEmpty } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { handleQaBusRequest, writeError, writeJson } from "./bus-server.js";
|
||||
import { closeQaHttpServer, handleQaBusRequest, writeError, writeJson } from "./bus-server.js";
|
||||
import { createQaBusState, type QaBusState } from "./bus-state.js";
|
||||
import { createQaRunnerRuntime } from "./harness-runtime.js";
|
||||
import type {
|
||||
@@ -465,22 +465,27 @@ export async function startQaLabServer(
|
||||
|
||||
let publicBaseUrl = "";
|
||||
let runnerModelCatalogPromise: Promise<void> | null = null;
|
||||
let runnerModelCatalogAbort: AbortController | null = null;
|
||||
const ensureRunnerModelCatalog = () => {
|
||||
if (runnerModelCatalogPromise) {
|
||||
return runnerModelCatalogPromise;
|
||||
}
|
||||
runnerModelCatalogAbort = new AbortController();
|
||||
runnerModelCatalogPromise = (async () => {
|
||||
try {
|
||||
const { loadQaRunnerModelOptions } = await import("./model-catalog.runtime.js");
|
||||
runnerModelOptions = await loadQaRunnerModelOptions({
|
||||
repoRoot,
|
||||
signal: runnerModelCatalogAbort?.signal,
|
||||
});
|
||||
runnerModelCatalogStatus = "ready";
|
||||
} catch {
|
||||
runnerModelOptions = [];
|
||||
runnerModelCatalogStatus = "failed";
|
||||
}
|
||||
})();
|
||||
})().finally(() => {
|
||||
runnerModelCatalogAbort = null;
|
||||
});
|
||||
return runnerModelCatalogPromise;
|
||||
};
|
||||
|
||||
@@ -802,10 +807,10 @@ export async function startQaLabServer(
|
||||
},
|
||||
runSelfCheck,
|
||||
async stop() {
|
||||
runnerModelCatalogAbort?.abort();
|
||||
await runnerModelCatalogPromise?.catch(() => undefined);
|
||||
await gateway?.stop();
|
||||
await new Promise<void>((resolve, reject) =>
|
||||
server.close((error) => (error ? reject(error) : resolve())),
|
||||
);
|
||||
await closeQaHttpServer(server);
|
||||
},
|
||||
};
|
||||
labHandle = lab;
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
|
||||
import { setTimeout as sleep } from "node:timers/promises";
|
||||
import { closeQaHttpServer } from "./bus-server.js";
|
||||
|
||||
type ResponsesInputItem = Record<string, unknown>;
|
||||
|
||||
@@ -805,9 +806,7 @@ export async function startQaMockOpenAiServer(params?: { host?: string; port?: n
|
||||
return {
|
||||
baseUrl: `http://${host}:${address.port}`,
|
||||
async stop() {
|
||||
await new Promise<void>((resolve, reject) =>
|
||||
server.close((error) => (error ? reject(error) : resolve())),
|
||||
);
|
||||
await closeQaHttpServer(server);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -59,7 +59,32 @@ export function selectQaRunnerModelOptions(rows: ModelRow[]): QaRunnerModelOptio
|
||||
});
|
||||
}
|
||||
|
||||
export async function loadQaRunnerModelOptions(params: { repoRoot: string }) {
|
||||
const CATALOG_ABORT_ERROR_MESSAGE = "qa model catalog aborted";
|
||||
|
||||
function createCatalogAbortError() {
|
||||
return new Error(CATALOG_ABORT_ERROR_MESSAGE);
|
||||
}
|
||||
|
||||
function killProcessTree(pid: number | undefined, signal: NodeJS.Signals) {
|
||||
if (pid === undefined) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (process.platform === "win32") {
|
||||
process.kill(pid, signal);
|
||||
return;
|
||||
}
|
||||
process.kill(-pid, signal);
|
||||
} catch {
|
||||
try {
|
||||
process.kill(pid, signal);
|
||||
} catch {
|
||||
// The process already exited.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function loadQaRunnerModelOptions(params: { repoRoot: string; signal?: AbortSignal }) {
|
||||
const tempRoot = await fs.mkdtemp(
|
||||
path.join(resolvePreferredOpenClawTmpDir(), "openclaw-qa-model-catalog-"),
|
||||
);
|
||||
@@ -92,6 +117,8 @@ export async function loadQaRunnerModelOptions(params: { repoRoot: string }) {
|
||||
const stdout: Buffer[] = [];
|
||||
const stderr: Buffer[] = [];
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
let aborted = params.signal?.aborted === true;
|
||||
let forceKillTimer: NodeJS.Timeout | undefined;
|
||||
const child = spawn(
|
||||
process.execPath,
|
||||
["dist/index.js", "models", "list", "--all", "--json"],
|
||||
@@ -104,14 +131,43 @@ export async function loadQaRunnerModelOptions(params: { repoRoot: string }) {
|
||||
OPENCLAW_CONFIG_PATH: configPath,
|
||||
OPENCLAW_STATE_DIR: stateDir,
|
||||
OPENCLAW_OAUTH_DIR: path.join(stateDir, "credentials"),
|
||||
OPENCLAW_CODEX_DISCOVERY_LIVE: "0",
|
||||
},
|
||||
detached: process.platform !== "win32",
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
},
|
||||
);
|
||||
const cleanup = () => {
|
||||
params.signal?.removeEventListener("abort", abortCatalogLoad);
|
||||
if (forceKillTimer) {
|
||||
clearTimeout(forceKillTimer);
|
||||
}
|
||||
};
|
||||
const abortCatalogLoad = () => {
|
||||
aborted = true;
|
||||
killProcessTree(child.pid, "SIGTERM");
|
||||
forceKillTimer = setTimeout(() => {
|
||||
killProcessTree(child.pid, "SIGKILL");
|
||||
}, 1_000);
|
||||
forceKillTimer.unref();
|
||||
};
|
||||
if (aborted) {
|
||||
abortCatalogLoad();
|
||||
} else {
|
||||
params.signal?.addEventListener("abort", abortCatalogLoad, { once: true });
|
||||
}
|
||||
child.stdout.on("data", (chunk) => stdout.push(Buffer.from(chunk)));
|
||||
child.stderr.on("data", (chunk) => stderr.push(Buffer.from(chunk)));
|
||||
child.once("error", reject);
|
||||
child.once("error", (error) => {
|
||||
cleanup();
|
||||
reject(aborted ? createCatalogAbortError() : error);
|
||||
});
|
||||
child.once("exit", (code) => {
|
||||
cleanup();
|
||||
if (aborted) {
|
||||
reject(createCatalogAbortError());
|
||||
return;
|
||||
}
|
||||
if (code === 0) {
|
||||
resolve();
|
||||
return;
|
||||
|
||||
@@ -5,6 +5,7 @@ import path from "node:path";
|
||||
import { setTimeout as sleep } from "node:timers/promises";
|
||||
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
|
||||
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
|
||||
import { disposeRegisteredAgentHarnesses } from "openclaw/plugin-sdk/agent-harness";
|
||||
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
|
||||
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
|
||||
import {
|
||||
@@ -1460,6 +1461,7 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise<QaSuiteResu
|
||||
watchUrl: lab.baseUrl,
|
||||
} satisfies QaSuiteResult;
|
||||
} finally {
|
||||
await disposeRegisteredAgentHarnesses();
|
||||
if (ownsLab) {
|
||||
await lab.stop();
|
||||
}
|
||||
@@ -1608,6 +1610,7 @@ export async function runQaSuite(params?: QaSuiteRunParams): Promise<QaSuiteResu
|
||||
await gateway.stop({
|
||||
keepTemp,
|
||||
});
|
||||
await disposeRegisteredAgentHarnesses();
|
||||
await mock?.stop();
|
||||
if (ownsLab) {
|
||||
await lab.stop();
|
||||
|
||||
Reference in New Issue
Block a user