From e7538b449944569a956eee64fc8ecee4d062ce07 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Tue, 7 Apr 2026 12:23:26 +0100 Subject: [PATCH] perf(qa): drop per-rpc gateway cli forks --- extensions/qa-lab/src/gateway-child.ts | 61 ++------ .../qa-lab/src/gateway-rpc-client.test.ts | 143 ++++++++++++++++++ extensions/qa-lab/src/gateway-rpc-client.ts | 101 +++++++++++++ extensions/qa-lab/src/runtime-api.ts | 1 + 4 files changed, 254 insertions(+), 52 deletions(-) create mode 100644 extensions/qa-lab/src/gateway-rpc-client.test.ts create mode 100644 extensions/qa-lab/src/gateway-rpc-client.ts diff --git a/extensions/qa-lab/src/gateway-child.ts b/extensions/qa-lab/src/gateway-child.ts index c56e0d5029b..faa8b025e48 100644 --- a/extensions/qa-lab/src/gateway-child.ts +++ b/extensions/qa-lab/src/gateway-child.ts @@ -6,7 +6,7 @@ import net from "node:net"; import os from "node:os"; import path from "node:path"; import { setTimeout as sleep } from "node:timers/promises"; -import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { startQaGatewayRpcClient } from "./gateway-rpc-client.js"; import { seedQaAgentWorkspace } from "./qa-agent-workspace.js"; import { buildQaGatewayConfig } from "./qa-gateway-config.js"; @@ -157,34 +157,6 @@ async function waitForGatewayReady(params: { throw new Error(`gateway failed to become healthy:\n${params.logs()}`); } -async function runCliJson(params: { cwd: string; env: NodeJS.ProcessEnv; args: string[] }) { - const stdout: Buffer[] = []; - const stderr: Buffer[] = []; - await new Promise((resolve, reject) => { - const child = spawn(process.execPath, params.args, { - cwd: params.cwd, - env: params.env, - stdio: ["ignore", "pipe", "pipe"], - }); - child.stdout.on("data", (chunk) => stdout.push(Buffer.from(chunk))); - child.stderr.on("data", (chunk) => stderr.push(Buffer.from(chunk))); - child.once("error", reject); - child.once("exit", (code) => { - if (code === 0) { - resolve(); - return; - } - reject( - new Error( - `gateway cli failed (${code ?? "unknown"}): ${Buffer.concat(stderr).toString("utf8")}`, - ), - ); - }); - }); - const text = Buffer.concat(stdout).toString("utf8").trim(); - return text ? (JSON.parse(text) as unknown) : {}; -} - export function resolveQaControlUiRoot(params: { repoRoot: string; controlUiEnabled?: boolean }) { if (params.controlUiEnabled === false) { return undefined; @@ -288,12 +260,18 @@ export async function startQaGatewayChild(params: { `${Buffer.concat(stdout).toString("utf8")}\n${Buffer.concat(stderr).toString("utf8")}`.trim(); const keepTemp = process.env.OPENCLAW_QA_KEEP_TEMP === "1"; + let rpcClient; try { await waitForGatewayReady({ baseUrl, logs, child, }); + rpcClient = await startQaGatewayRpcClient({ + wsUrl, + token: gatewayToken, + logs, + }); } catch (error) { child.kill("SIGTERM"); throw error; @@ -314,31 +292,10 @@ export async function startQaGatewayChild(params: { rpcParams?: unknown, opts?: { expectFinal?: boolean; timeoutMs?: number }, ) { - return await runCliJson({ - cwd: runtimeCwd, - env, - args: [ - distEntryPath, - "gateway", - "call", - method, - "--url", - wsUrl, - "--token", - gatewayToken, - "--json", - "--timeout", - String(opts?.timeoutMs ?? 20_000), - ...(opts?.expectFinal ? ["--expect-final"] : []), - "--params", - JSON.stringify(rpcParams ?? {}), - ], - }).catch((error) => { - const details = formatErrorMessage(error); - throw new Error(`${details}\nGateway logs:\n${logs()}`); - }); + return await rpcClient.request(method, rpcParams, opts); }, async stop(opts?: { keepTemp?: boolean }) { + await rpcClient.stop().catch(() => {}); if (!child.killed) { child.kill("SIGTERM"); await Promise.race([ diff --git a/extensions/qa-lab/src/gateway-rpc-client.test.ts b/extensions/qa-lab/src/gateway-rpc-client.test.ts new file mode 100644 index 00000000000..fb736826876 --- /dev/null +++ b/extensions/qa-lab/src/gateway-rpc-client.test.ts @@ -0,0 +1,143 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const gatewayClientMock = vi.hoisted(() => { + const request = vi.fn(async (_method?: string, _params?: unknown, _opts?: unknown) => ({ + ok: true, + })); + const stopAndWait = vi.fn(async () => {}); + const stop = vi.fn(); + const constructorCalls: Array> = []; + let startMode: "hello" | "connect-error" = "hello"; + + class MockGatewayClient { + private readonly options: Record; + + constructor(options: Record) { + this.options = options; + constructorCalls.push(options); + } + + start() { + queueMicrotask(() => { + if (startMode === "connect-error") { + const onConnectError = this.options.onConnectError; + if (typeof onConnectError === "function") { + onConnectError(new Error("connect boom")); + } + return; + } + const onHelloOk = this.options.onHelloOk; + if (typeof onHelloOk === "function") { + onHelloOk({}); + } + }); + } + + async request(method: string, params?: unknown, opts?: unknown) { + return await request(method, params, opts); + } + + async stopAndWait() { + await stopAndWait(); + } + + stop() { + stop(); + } + } + + return { + MockGatewayClient, + request, + stopAndWait, + stop, + constructorCalls, + reset() { + request.mockReset().mockResolvedValue({ ok: true }); + stopAndWait.mockReset().mockResolvedValue(undefined); + stop.mockReset(); + constructorCalls.splice(0, constructorCalls.length); + startMode = "hello"; + }, + setStartMode(mode: "hello" | "connect-error") { + startMode = mode; + }, + }; +}); + +vi.mock("./runtime-api.js", () => ({ + GatewayClient: gatewayClientMock.MockGatewayClient, +})); + +import { startQaGatewayRpcClient } from "./gateway-rpc-client.js"; + +describe("startQaGatewayRpcClient", () => { + beforeEach(() => { + gatewayClientMock.reset(); + }); + + it("starts a gateway client without device identity and forwards requests", async () => { + const client = await startQaGatewayRpcClient({ + wsUrl: "ws://127.0.0.1:18789", + token: "qa-token", + logs: () => "qa logs", + }); + + expect(gatewayClientMock.constructorCalls[0]).toEqual( + expect.objectContaining({ + url: "ws://127.0.0.1:18789", + token: "qa-token", + deviceIdentity: null, + scopes: [ + "operator.admin", + "operator.read", + "operator.write", + "operator.approvals", + "operator.pairing", + "operator.talk.secrets", + ], + }), + ); + + await expect( + client.request("agent.run", { prompt: "hi" }, { expectFinal: true, timeoutMs: 45_000 }), + ).resolves.toEqual({ ok: true }); + + expect(gatewayClientMock.request).toHaveBeenCalledWith( + "agent.run", + { prompt: "hi" }, + { + expectFinal: true, + timeoutMs: 45_000, + }, + ); + + await client.stop(); + expect(gatewayClientMock.stopAndWait).toHaveBeenCalledTimes(1); + }); + + it("wraps request failures with gateway logs", async () => { + gatewayClientMock.request.mockRejectedValueOnce(new Error("gateway not connected")); + const client = await startQaGatewayRpcClient({ + wsUrl: "ws://127.0.0.1:18789", + token: "qa-token", + logs: () => "qa logs", + }); + + await expect(client.request("health")).rejects.toThrow( + "gateway not connected\nGateway logs:\nqa logs", + ); + }); + + it("wraps connect failures with gateway logs", async () => { + gatewayClientMock.setStartMode("connect-error"); + + await expect( + startQaGatewayRpcClient({ + wsUrl: "ws://127.0.0.1:18789", + token: "qa-token", + logs: () => "qa logs", + }), + ).rejects.toThrow("connect boom\nGateway logs:\nqa logs"); + }); +}); diff --git a/extensions/qa-lab/src/gateway-rpc-client.ts b/extensions/qa-lab/src/gateway-rpc-client.ts new file mode 100644 index 00000000000..d5ad3036643 --- /dev/null +++ b/extensions/qa-lab/src/gateway-rpc-client.ts @@ -0,0 +1,101 @@ +import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import { GatewayClient } from "./runtime-api.js"; + +type QaGatewayRpcRequestOptions = { + expectFinal?: boolean; + timeoutMs?: number; +}; + +const QA_GATEWAY_RPC_SCOPES = [ + "operator.admin", + "operator.read", + "operator.write", + "operator.approvals", + "operator.pairing", + "operator.talk.secrets", +] as const; + +export type QaGatewayRpcClient = { + request(method: string, rpcParams?: unknown, opts?: QaGatewayRpcRequestOptions): Promise; + stop(): Promise; +}; + +function formatQaGatewayRpcError(error: unknown, logs: () => string) { + const details = formatErrorMessage(error); + return new Error(`${details}\nGateway logs:\n${logs()}`); +} + +export async function startQaGatewayRpcClient(params: { + wsUrl: string; + token: string; + logs: () => string; +}): Promise { + let readySettled = false; + let stopping = false; + let resolveReady!: () => void; + let rejectReady!: (err: unknown) => void; + + const ready = new Promise((resolve, reject) => { + resolveReady = resolve; + rejectReady = reject; + }); + + const settleReady = (error?: unknown) => { + if (readySettled) { + return; + } + readySettled = true; + if (error) { + rejectReady(error); + return; + } + resolveReady(); + }; + + const wrapError = (error: unknown) => formatQaGatewayRpcError(error, params.logs); + + const client = new GatewayClient({ + url: params.wsUrl, + token: params.token, + deviceIdentity: null, + // Mirror the old gateway CLI caller scopes so the faster path stays behavior-identical. + scopes: [...QA_GATEWAY_RPC_SCOPES], + onHelloOk: () => { + settleReady(); + }, + onConnectError: (error) => { + settleReady(wrapError(error)); + }, + onClose: (code, reason) => { + if (stopping) { + return; + } + const reasonText = reason.trim() || "no close reason"; + settleReady(wrapError(new Error(`gateway closed (${code}): ${reasonText}`))); + }, + }); + + client.start(); + await ready; + + return { + async request(method, rpcParams, opts) { + try { + return await client.request(method, rpcParams, { + expectFinal: opts?.expectFinal, + timeoutMs: opts?.timeoutMs ?? 20_000, + }); + } catch (error) { + throw wrapError(error); + } + }, + async stop() { + stopping = true; + try { + await client.stopAndWait(); + } catch { + client.stop(); + } + }, + }; +} diff --git a/extensions/qa-lab/src/runtime-api.ts b/extensions/qa-lab/src/runtime-api.ts index a5b6d0c32c6..00af1ac3a65 100644 --- a/extensions/qa-lab/src/runtime-api.ts +++ b/extensions/qa-lab/src/runtime-api.ts @@ -1,6 +1,7 @@ export type { Command } from "commander"; export type { OpenClawConfig, PluginRuntime } from "openclaw/plugin-sdk/core"; export { definePluginEntry } from "openclaw/plugin-sdk/core"; +export { GatewayClient } from "openclaw/plugin-sdk/gateway-runtime"; export { buildQaTarget, createQaBusThread,