perf(qa): drop per-rpc gateway cli forks

This commit is contained in:
Vincent Koc
2026-04-07 12:23:26 +01:00
committed by Peter Steinberger
parent 02bd9e8c10
commit e7538b4499
4 changed files with 254 additions and 52 deletions

View File

@@ -6,7 +6,7 @@ import net from "node:net";
import os from "node:os";
import path from "node:path";
import { setTimeout as sleep } from "node:timers/promises";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { startQaGatewayRpcClient } from "./gateway-rpc-client.js";
import { seedQaAgentWorkspace } from "./qa-agent-workspace.js";
import { buildQaGatewayConfig } from "./qa-gateway-config.js";
@@ -157,34 +157,6 @@ async function waitForGatewayReady(params: {
throw new Error(`gateway failed to become healthy:\n${params.logs()}`);
}
async function runCliJson(params: { cwd: string; env: NodeJS.ProcessEnv; args: string[] }) {
const stdout: Buffer[] = [];
const stderr: Buffer[] = [];
await new Promise<void>((resolve, reject) => {
const child = spawn(process.execPath, params.args, {
cwd: params.cwd,
env: params.env,
stdio: ["ignore", "pipe", "pipe"],
});
child.stdout.on("data", (chunk) => stdout.push(Buffer.from(chunk)));
child.stderr.on("data", (chunk) => stderr.push(Buffer.from(chunk)));
child.once("error", reject);
child.once("exit", (code) => {
if (code === 0) {
resolve();
return;
}
reject(
new Error(
`gateway cli failed (${code ?? "unknown"}): ${Buffer.concat(stderr).toString("utf8")}`,
),
);
});
});
const text = Buffer.concat(stdout).toString("utf8").trim();
return text ? (JSON.parse(text) as unknown) : {};
}
export function resolveQaControlUiRoot(params: { repoRoot: string; controlUiEnabled?: boolean }) {
if (params.controlUiEnabled === false) {
return undefined;
@@ -288,12 +260,18 @@ export async function startQaGatewayChild(params: {
`${Buffer.concat(stdout).toString("utf8")}\n${Buffer.concat(stderr).toString("utf8")}`.trim();
const keepTemp = process.env.OPENCLAW_QA_KEEP_TEMP === "1";
let rpcClient;
try {
await waitForGatewayReady({
baseUrl,
logs,
child,
});
rpcClient = await startQaGatewayRpcClient({
wsUrl,
token: gatewayToken,
logs,
});
} catch (error) {
child.kill("SIGTERM");
throw error;
@@ -314,31 +292,10 @@ export async function startQaGatewayChild(params: {
rpcParams?: unknown,
opts?: { expectFinal?: boolean; timeoutMs?: number },
) {
return await runCliJson({
cwd: runtimeCwd,
env,
args: [
distEntryPath,
"gateway",
"call",
method,
"--url",
wsUrl,
"--token",
gatewayToken,
"--json",
"--timeout",
String(opts?.timeoutMs ?? 20_000),
...(opts?.expectFinal ? ["--expect-final"] : []),
"--params",
JSON.stringify(rpcParams ?? {}),
],
}).catch((error) => {
const details = formatErrorMessage(error);
throw new Error(`${details}\nGateway logs:\n${logs()}`);
});
return await rpcClient.request(method, rpcParams, opts);
},
async stop(opts?: { keepTemp?: boolean }) {
await rpcClient.stop().catch(() => {});
if (!child.killed) {
child.kill("SIGTERM");
await Promise.race([

View File

@@ -0,0 +1,143 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const gatewayClientMock = vi.hoisted(() => {
const request = vi.fn(async (_method?: string, _params?: unknown, _opts?: unknown) => ({
ok: true,
}));
const stopAndWait = vi.fn(async () => {});
const stop = vi.fn();
const constructorCalls: Array<Record<string, unknown>> = [];
let startMode: "hello" | "connect-error" = "hello";
class MockGatewayClient {
private readonly options: Record<string, unknown>;
constructor(options: Record<string, unknown>) {
this.options = options;
constructorCalls.push(options);
}
start() {
queueMicrotask(() => {
if (startMode === "connect-error") {
const onConnectError = this.options.onConnectError;
if (typeof onConnectError === "function") {
onConnectError(new Error("connect boom"));
}
return;
}
const onHelloOk = this.options.onHelloOk;
if (typeof onHelloOk === "function") {
onHelloOk({});
}
});
}
async request(method: string, params?: unknown, opts?: unknown) {
return await request(method, params, opts);
}
async stopAndWait() {
await stopAndWait();
}
stop() {
stop();
}
}
return {
MockGatewayClient,
request,
stopAndWait,
stop,
constructorCalls,
reset() {
request.mockReset().mockResolvedValue({ ok: true });
stopAndWait.mockReset().mockResolvedValue(undefined);
stop.mockReset();
constructorCalls.splice(0, constructorCalls.length);
startMode = "hello";
},
setStartMode(mode: "hello" | "connect-error") {
startMode = mode;
},
};
});
vi.mock("./runtime-api.js", () => ({
GatewayClient: gatewayClientMock.MockGatewayClient,
}));
import { startQaGatewayRpcClient } from "./gateway-rpc-client.js";
describe("startQaGatewayRpcClient", () => {
beforeEach(() => {
gatewayClientMock.reset();
});
it("starts a gateway client without device identity and forwards requests", async () => {
const client = await startQaGatewayRpcClient({
wsUrl: "ws://127.0.0.1:18789",
token: "qa-token",
logs: () => "qa logs",
});
expect(gatewayClientMock.constructorCalls[0]).toEqual(
expect.objectContaining({
url: "ws://127.0.0.1:18789",
token: "qa-token",
deviceIdentity: null,
scopes: [
"operator.admin",
"operator.read",
"operator.write",
"operator.approvals",
"operator.pairing",
"operator.talk.secrets",
],
}),
);
await expect(
client.request("agent.run", { prompt: "hi" }, { expectFinal: true, timeoutMs: 45_000 }),
).resolves.toEqual({ ok: true });
expect(gatewayClientMock.request).toHaveBeenCalledWith(
"agent.run",
{ prompt: "hi" },
{
expectFinal: true,
timeoutMs: 45_000,
},
);
await client.stop();
expect(gatewayClientMock.stopAndWait).toHaveBeenCalledTimes(1);
});
it("wraps request failures with gateway logs", async () => {
gatewayClientMock.request.mockRejectedValueOnce(new Error("gateway not connected"));
const client = await startQaGatewayRpcClient({
wsUrl: "ws://127.0.0.1:18789",
token: "qa-token",
logs: () => "qa logs",
});
await expect(client.request("health")).rejects.toThrow(
"gateway not connected\nGateway logs:\nqa logs",
);
});
it("wraps connect failures with gateway logs", async () => {
gatewayClientMock.setStartMode("connect-error");
await expect(
startQaGatewayRpcClient({
wsUrl: "ws://127.0.0.1:18789",
token: "qa-token",
logs: () => "qa logs",
}),
).rejects.toThrow("connect boom\nGateway logs:\nqa logs");
});
});

View File

@@ -0,0 +1,101 @@
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { GatewayClient } from "./runtime-api.js";
type QaGatewayRpcRequestOptions = {
expectFinal?: boolean;
timeoutMs?: number;
};
const QA_GATEWAY_RPC_SCOPES = [
"operator.admin",
"operator.read",
"operator.write",
"operator.approvals",
"operator.pairing",
"operator.talk.secrets",
] as const;
export type QaGatewayRpcClient = {
request(method: string, rpcParams?: unknown, opts?: QaGatewayRpcRequestOptions): Promise<unknown>;
stop(): Promise<void>;
};
function formatQaGatewayRpcError(error: unknown, logs: () => string) {
const details = formatErrorMessage(error);
return new Error(`${details}\nGateway logs:\n${logs()}`);
}
export async function startQaGatewayRpcClient(params: {
wsUrl: string;
token: string;
logs: () => string;
}): Promise<QaGatewayRpcClient> {
let readySettled = false;
let stopping = false;
let resolveReady!: () => void;
let rejectReady!: (err: unknown) => void;
const ready = new Promise<void>((resolve, reject) => {
resolveReady = resolve;
rejectReady = reject;
});
const settleReady = (error?: unknown) => {
if (readySettled) {
return;
}
readySettled = true;
if (error) {
rejectReady(error);
return;
}
resolveReady();
};
const wrapError = (error: unknown) => formatQaGatewayRpcError(error, params.logs);
const client = new GatewayClient({
url: params.wsUrl,
token: params.token,
deviceIdentity: null,
// Mirror the old gateway CLI caller scopes so the faster path stays behavior-identical.
scopes: [...QA_GATEWAY_RPC_SCOPES],
onHelloOk: () => {
settleReady();
},
onConnectError: (error) => {
settleReady(wrapError(error));
},
onClose: (code, reason) => {
if (stopping) {
return;
}
const reasonText = reason.trim() || "no close reason";
settleReady(wrapError(new Error(`gateway closed (${code}): ${reasonText}`)));
},
});
client.start();
await ready;
return {
async request(method, rpcParams, opts) {
try {
return await client.request(method, rpcParams, {
expectFinal: opts?.expectFinal,
timeoutMs: opts?.timeoutMs ?? 20_000,
});
} catch (error) {
throw wrapError(error);
}
},
async stop() {
stopping = true;
try {
await client.stopAndWait();
} catch {
client.stop();
}
},
};
}

View File

@@ -1,6 +1,7 @@
export type { Command } from "commander";
export type { OpenClawConfig, PluginRuntime } from "openclaw/plugin-sdk/core";
export { definePluginEntry } from "openclaw/plugin-sdk/core";
export { GatewayClient } from "openclaw/plugin-sdk/gateway-runtime";
export {
buildQaTarget,
createQaBusThread,