From 5056dd47ca00a6dbfc9954193e7ca06cbcdcb1d5 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 2 Jun 2026 08:18:25 -0700 Subject: [PATCH] chore(scripts): add gateway rpc rtt probe --- scripts/measure-rpc-rtt.mjs | 456 ++++++++++++++++++++++++++++++++++++ 1 file changed, 456 insertions(+) create mode 100644 scripts/measure-rpc-rtt.mjs diff --git a/scripts/measure-rpc-rtt.mjs b/scripts/measure-rpc-rtt.mjs new file mode 100644 index 00000000000..32d0efe9755 --- /dev/null +++ b/scripts/measure-rpc-rtt.mjs @@ -0,0 +1,456 @@ +import { spawn } from "node:child_process"; +import { randomUUID } from "node:crypto"; +import fs from "node:fs/promises"; +import { createRequire } from "node:module"; +import net from "node:net"; +import path from "node:path"; +import { performance } from "node:perf_hooks"; +import { pathToFileURL } from "node:url"; + +const DEFAULT_METHODS = ["health", "config.get"]; +const DEFAULT_ITERATIONS = 10; +const READY_TIMEOUT_MS = 120_000; + +function usage() { + return [ + "Usage: node --import tsx scripts/measure-rpc-rtt.mjs", + " --output-dir ", + " [--repo-root ]", + " [--iterations ]", + " [--methods ]", + ].join("\n"); +} + +function parseArgs(argv) { + const args = { + iterations: DEFAULT_ITERATIONS, + methods: DEFAULT_METHODS, + }; + for (let index = 0; index < argv.length; index += 1) { + const arg = argv[index]; + if (arg === "--output-dir") { + args.outputDir = argv[(index += 1)]; + continue; + } + if (arg === "--repo-root") { + args.repoRoot = argv[(index += 1)]; + continue; + } + if (arg === "--iterations") { + args.iterations = Number(argv[(index += 1)]); + continue; + } + if (arg === "--methods") { + args.methods = argv[(index += 1)] + .split(",") + .map((entry) => entry.trim()) + .filter(Boolean); + continue; + } + throw new Error(`Unknown argument: ${arg}\n${usage()}`); + } + if (!args.outputDir) { + throw new Error(usage()); + } + if (!Number.isInteger(args.iterations) || args.iterations < 1) { + throw new Error("--iterations must be a positive integer."); + } + if (args.methods.length === 0) { + throw new Error("--methods must include at least one gateway method."); + } + return args; +} + +async function getFreePort() { + return await new Promise((resolve, reject) => { + const server = net.createServer(); + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address(); + server.close(() => { + if (address && typeof address === "object") { + resolve(address.port); + return; + } + reject(new Error("failed to allocate loopback port")); + }); + }); + }); +} + +async function sleep(ms) { + await new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +async function waitForGatewayReady({ child, port, stderrPath }) { + const startedAt = Date.now(); + let childExit = null; + child.once("exit", (code, signal) => { + childExit = { code, signal }; + }); + while (Date.now() - startedAt < READY_TIMEOUT_MS) { + if (childExit) { + const stderr = await fs.readFile(stderrPath, "utf8").catch(() => ""); + throw new Error( + `gateway exited before readiness code=${childExit.code ?? "null"} signal=${childExit.signal ?? "null"}\n${stderr.slice(-4000)}`, + ); + } + for (const endpoint of ["/readyz", "/healthz"]) { + try { + const response = await fetch(`http://127.0.0.1:${port}${endpoint}`); + if (response.ok) { + return; + } + } catch { + // The gateway may not have bound the port yet. + } + } + await sleep(250); + } + const stderr = await fs.readFile(stderrPath, "utf8").catch(() => ""); + throw new Error( + `gateway did not become ready after ${READY_TIMEOUT_MS}ms\n${stderr.slice(-4000)}`, + ); +} + +async function stopGateway(child) { + if (child.exitCode !== null || child.signalCode !== null) { + return; + } + child.kill("SIGTERM"); + const exited = await new Promise((resolve) => { + const timer = setTimeout(() => resolve(false), 1_500); + child.once("exit", () => { + clearTimeout(timer); + resolve(true); + }); + }); + if (!exited && child.exitCode === null && child.signalCode === null) { + child.kill("SIGKILL"); + } +} + +function quantile(sorted, q) { + return sorted[Math.min(sorted.length - 1, Math.max(0, Math.ceil(sorted.length * q) - 1))]; +} + +function stats(samples) { + const sorted = samples.toSorted((left, right) => left - right); + return { + avgMs: Math.round(sorted.reduce((sum, value) => sum + value, 0) / sorted.length), + maxMs: Math.round(sorted.at(-1)), + minMs: Math.round(sorted[0]), + p50Ms: Math.round(quantile(sorted, 0.5)), + p95Ms: Math.round(quantile(sorted, 0.95)), + }; +} + +function toText(data) { + if (typeof data === "string") { + return data; + } + if (data instanceof ArrayBuffer) { + return Buffer.from(data).toString("utf8"); + } + if (Array.isArray(data)) { + return Buffer.concat(data.map((chunk) => Buffer.from(chunk))).toString("utf8"); + } + return Buffer.from(data).toString("utf8"); +} + +function createGatewayClient({ WebSocket, url }) { + const ws = new WebSocket(url, { handshakeTimeout: 8_000 }); + const pending = new Map(); + const rejectPending = (error) => { + for (const waiter of pending.values()) { + clearTimeout(waiter.timeout); + waiter.reject(error); + } + pending.clear(); + }; + ws.on("message", (data) => { + let frame; + try { + frame = JSON.parse(toText(data)); + } catch { + return; + } + if (frame?.type === "res") { + const waiter = pending.get(frame.id); + if (!waiter) { + return; + } + pending.delete(frame.id); + clearTimeout(waiter.timeout); + waiter.resolve(frame); + } + }); + ws.on("close", (code, reason) => { + rejectPending(new Error(`gateway websocket closed (${code}): ${toText(reason)}`)); + }); + ws.on("error", (error) => { + rejectPending(error instanceof Error ? error : new Error(String(error))); + }); + const waitOpen = async () => + await new Promise((resolve, reject) => { + const timer = setTimeout(() => reject(new Error("gateway websocket open timeout")), 8_000); + ws.once("open", () => { + clearTimeout(timer); + resolve(); + }); + ws.once("error", (error) => { + clearTimeout(timer); + reject(error instanceof Error ? error : new Error(String(error))); + }); + }); + const request = async (method, params, timeoutMs = 10_000) => + await new Promise((resolve, reject) => { + if (ws.readyState !== WebSocket.OPEN) { + reject(new Error(`gateway websocket is not open for ${method}`)); + return; + } + const id = randomUUID(); + const timeout = setTimeout(() => { + pending.delete(id); + reject(new Error(`timeout waiting for ${method}`)); + }, timeoutMs); + pending.set(id, { resolve, reject, timeout }); + ws.send(JSON.stringify({ type: "req", id, method, params }), (error) => { + if (!error) { + return; + } + const waiter = pending.get(id); + if (!waiter) { + return; + } + pending.delete(id); + clearTimeout(waiter.timeout); + waiter.reject(error instanceof Error ? error : new Error(String(error))); + }); + }); + const close = () => { + rejectPending(new Error("gateway websocket client closed")); + ws.close(); + }; + return { close, request, waitOpen }; +} + +async function writeSummary({ + details, + events, + finishedAt, + outputDir, + measurement, + startedAt, + status, +}) { + await fs.mkdir(outputDir, { recursive: true }); + await fs.writeFile( + path.join(outputDir, "rpc-events.json"), + `${JSON.stringify(events, null, 2)}\n`, + ); + await fs.writeFile( + path.join(outputDir, "qa-suite-summary.json"), + `${JSON.stringify( + { + counts: { + total: 1, + passed: status === "pass" ? 1 : 0, + failed: status === "pass" ? 0 : 1, + }, + run: { + startedAt: startedAt.toISOString(), + finishedAt: finishedAt.toISOString(), + providerMode: "gateway-rpc", + scenarioIds: ["rpc-gateway-smoke"], + }, + scenarios: [ + { + id: "rpc-gateway-smoke", + title: "Gateway RPC loopback smoke", + status, + details, + ...(measurement ? { rttMeasurement: measurement } : {}), + }, + ], + }, + null, + 2, + )}\n`, + ); +} + +async function main() { + const args = parseArgs(process.argv.slice(2)); + const repoRoot = path.resolve(args.repoRoot ?? process.env.OPENCLAW_REPO_ROOT ?? process.cwd()); + const outputDir = path.resolve(args.outputDir); + await fs.mkdir(outputDir, { recursive: true }); + const tempRoot = await fs.mkdtemp(path.join(outputDir, "..", ".rpc-rtt-")); + const startedAt = new Date(); + const token = `rpc-rtt-${randomUUID()}`; + const port = await getFreePort(); + const configPath = path.join(tempRoot, "openclaw.json"); + const stdoutPath = path.join(tempRoot, "gateway.stdout.log"); + const stderrPath = path.join(tempRoot, "gateway.stderr.log"); + let gatewayChild; + let status = "fail"; + let details = ""; + let measurement; + const events = []; + try { + await fs.writeFile( + configPath, + `${JSON.stringify( + { + gateway: { + mode: "local", + bind: "loopback", + port, + auth: { mode: "token", token }, + controlUi: { enabled: false }, + }, + plugins: { enabled: false }, + }, + null, + 2, + )}\n`, + ); + const stdout = await fs.open(stdoutPath, "w"); + const stderr = await fs.open(stderrPath, "w"); + gatewayChild = spawn( + "pnpm", + [ + "openclaw", + "gateway", + "run", + "--port", + String(port), + "--bind", + "loopback", + "--allow-unconfigured", + ], + { + cwd: repoRoot, + env: { + ...process.env, + HOME: path.join(tempRoot, "home"), + XDG_CONFIG_HOME: path.join(tempRoot, "xdg-config"), + XDG_DATA_HOME: path.join(tempRoot, "xdg-data"), + XDG_CACHE_HOME: path.join(tempRoot, "xdg-cache"), + OPENCLAW_CONFIG_PATH: configPath, + OPENCLAW_STATE_DIR: path.join(tempRoot, "state"), + OPENCLAW_GATEWAY_TOKEN: token, + OPENCLAW_SKIP_BROWSER_CONTROL_SERVER: "1", + OPENCLAW_SKIP_GMAIL_WATCHER: "1", + OPENCLAW_SKIP_CANVAS_HOST: "1", + OPENCLAW_NO_RESPAWN: "1", + OPENCLAW_TEST_FAST: "1", + }, + stdio: ["ignore", stdout.fd, stderr.fd], + }, + ); + await waitForGatewayReady({ child: gatewayChild, port, stderrPath }); + + const requireFromOpenClaw = createRequire(path.join(repoRoot, "package.json")); + const WebSocket = requireFromOpenClaw("ws"); + const protocol = await import( + pathToFileURL(path.join(repoRoot, "packages/gateway-protocol/src/version.ts")).href + ); + const client = createGatewayClient({ WebSocket, url: `ws://127.0.0.1:${port}` }); + await client.waitOpen(); + const connectStarted = performance.now(); + const connect = await client.request( + "connect", + { + minProtocol: protocol.MIN_CLIENT_PROTOCOL_VERSION, + maxProtocol: protocol.PROTOCOL_VERSION, + client: { + id: "gateway-client", + displayName: "openclaw-rtt rpc probe", + version: "rtt", + platform: process.platform, + mode: "backend", + instanceId: `openclaw-rtt-rpc-${randomUUID()}`, + }, + locale: "en-US", + userAgent: "openclaw-rtt-rpc", + role: "operator", + scopes: ["operator.admin"], + caps: [], + auth: { token }, + }, + 10_000, + ); + if (!connect.ok) { + throw new Error(`connect failed: ${JSON.stringify(connect.error)}`); + } + events.push({ + event: "gateway-rpc.connect", + payload: { + method: "connect", + ok: true, + durationMs: Math.round(performance.now() - connectStarted), + }, + }); + const samples = []; + for (const method of args.methods) { + for (let iteration = 1; iteration <= args.iterations; iteration += 1) { + const requestStartedAtMs = performance.now(); + const response = await client.request(method, {}, 10_000); + const durationMs = Math.round(performance.now() - requestStartedAtMs); + if (!response.ok) { + throw new Error(`${method} failed: ${JSON.stringify(response.error)}`); + } + samples.push({ method, durationMs }); + events.push({ + event: "gateway-rpc", + payload: { kind: "gateway-rpc", method, ok: true, durationMs, iteration }, + }); + } + } + client.close(); + const sampleStats = stats(samples.map((sample) => sample.durationMs)); + const byMethod = Object.fromEntries( + args.methods.map((method) => [ + method, + stats( + samples.filter((sample) => sample.method === method).map((sample) => sample.durationMs), + ), + ]), + ); + measurement = { + finalMatchedReplyRttMs: sampleStats.p50Ms, + durationMs: sampleStats.p50Ms, + method: args.methods.join(","), + source: "gateway-rpc", + }; + details = JSON.stringify({ + iterations: args.iterations, + methods: args.methods, + stats: sampleStats, + byMethod, + }); + status = "pass"; + } catch (error) { + details = error instanceof Error ? (error.stack ?? error.message) : String(error); + } finally { + if (gatewayChild) { + await stopGateway(gatewayChild).catch(() => {}); + } + await fs.rm(tempRoot, { force: true, recursive: true }).catch(() => {}); + } + const finishedAt = new Date(); + await writeSummary({ details, events, finishedAt, outputDir, measurement, startedAt, status }); + if (status !== "pass") { + throw new Error(details || "RPC RTT measurement failed"); + } +} + +main().catch( + /** @param {unknown} error */ (error) => { + process.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); + process.exitCode = 1; + }, +);