fix(qa): preserve gateway cli auth in no-fork rpc path

This commit is contained in:
Vincent Koc
2026-04-07 12:48:52 +01:00
committed by Peter Steinberger
parent e7538b4499
commit f312d6c106
4 changed files with 116 additions and 159 deletions

View File

@@ -270,6 +270,7 @@ export async function startQaGatewayChild(params: {
rpcClient = await startQaGatewayRpcClient({
wsUrl,
token: gatewayToken,
env,
logs,
});
} catch (error) {

View File

@@ -1,126 +1,77 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const gatewayClientMock = vi.hoisted(() => {
const request = vi.fn(async (_method?: string, _params?: unknown, _opts?: unknown) => ({
ok: true,
}));
const stopAndWait = vi.fn(async () => {});
const stop = vi.fn();
const constructorCalls: Array<Record<string, unknown>> = [];
let startMode: "hello" | "connect-error" = "hello";
class MockGatewayClient {
private readonly options: Record<string, unknown>;
constructor(options: Record<string, unknown>) {
this.options = options;
constructorCalls.push(options);
}
start() {
queueMicrotask(() => {
if (startMode === "connect-error") {
const onConnectError = this.options.onConnectError;
if (typeof onConnectError === "function") {
onConnectError(new Error("connect boom"));
}
return;
}
const onHelloOk = this.options.onHelloOk;
if (typeof onHelloOk === "function") {
onHelloOk({});
}
});
}
async request(method: string, params?: unknown, opts?: unknown) {
return await request(method, params, opts);
}
async stopAndWait() {
await stopAndWait();
}
stop() {
stop();
}
}
const gatewayRpcMock = vi.hoisted(() => {
const callGatewayFromCli = vi.fn(async () => ({ ok: true }));
return {
MockGatewayClient,
request,
stopAndWait,
stop,
constructorCalls,
callGatewayFromCli,
reset() {
request.mockReset().mockResolvedValue({ ok: true });
stopAndWait.mockReset().mockResolvedValue(undefined);
stop.mockReset();
constructorCalls.splice(0, constructorCalls.length);
startMode = "hello";
},
setStartMode(mode: "hello" | "connect-error") {
startMode = mode;
callGatewayFromCli.mockReset().mockResolvedValue({ ok: true });
},
};
});
vi.mock("./runtime-api.js", () => ({
GatewayClient: gatewayClientMock.MockGatewayClient,
callGatewayFromCli: gatewayRpcMock.callGatewayFromCli,
}));
import { startQaGatewayRpcClient } from "./gateway-rpc-client.js";
describe("startQaGatewayRpcClient", () => {
beforeEach(() => {
gatewayClientMock.reset();
gatewayRpcMock.reset();
});
it("starts a gateway client without device identity and forwards requests", async () => {
it("calls the in-process gateway cli helper with the qa runtime env", async () => {
const originalHome = process.env.OPENCLAW_HOME;
delete process.env.OPENCLAW_HOME;
delete process.env.OPENCLAW_QA_TEST_ONLY;
gatewayRpcMock.callGatewayFromCli.mockImplementationOnce(async () => {
expect(process.env.OPENCLAW_HOME).toBe("/tmp/openclaw-home");
expect(process.env.OPENCLAW_QA_TEST_ONLY).toBe("1");
return { ok: true };
});
const client = await startQaGatewayRpcClient({
wsUrl: "ws://127.0.0.1:18789",
token: "qa-token",
env: {
OPENCLAW_HOME: "/tmp/openclaw-home",
OPENCLAW_QA_TEST_ONLY: "1",
} as NodeJS.ProcessEnv,
logs: () => "qa logs",
});
expect(gatewayClientMock.constructorCalls[0]).toEqual(
expect.objectContaining({
url: "ws://127.0.0.1:18789",
token: "qa-token",
deviceIdentity: null,
scopes: [
"operator.admin",
"operator.read",
"operator.write",
"operator.approvals",
"operator.pairing",
"operator.talk.secrets",
],
}),
);
await expect(
client.request("agent.run", { prompt: "hi" }, { expectFinal: true, timeoutMs: 45_000 }),
).resolves.toEqual({ ok: true });
expect(gatewayClientMock.request).toHaveBeenCalledWith(
expect(gatewayRpcMock.callGatewayFromCli).toHaveBeenCalledWith(
"agent.run",
{
url: "ws://127.0.0.1:18789",
token: "qa-token",
timeout: "45000",
expectFinal: true,
json: true,
},
{ prompt: "hi" },
{
expectFinal: true,
timeoutMs: 45_000,
progress: false,
},
);
await client.stop();
expect(gatewayClientMock.stopAndWait).toHaveBeenCalledTimes(1);
expect(process.env.OPENCLAW_HOME).toBe(originalHome);
expect(process.env.OPENCLAW_QA_TEST_ONLY).toBeUndefined();
});
it("wraps request failures with gateway logs", async () => {
gatewayClientMock.request.mockRejectedValueOnce(new Error("gateway not connected"));
gatewayRpcMock.callGatewayFromCli.mockRejectedValueOnce(new Error("gateway not connected"));
const client = await startQaGatewayRpcClient({
wsUrl: "ws://127.0.0.1:18789",
token: "qa-token",
env: { OPENCLAW_HOME: "/tmp/openclaw-home" } as NodeJS.ProcessEnv,
logs: () => "qa logs",
});
@@ -129,15 +80,18 @@ describe("startQaGatewayRpcClient", () => {
);
});
it("wraps connect failures with gateway logs", async () => {
gatewayClientMock.setStartMode("connect-error");
it("rejects new requests after stop", async () => {
const client = await startQaGatewayRpcClient({
wsUrl: "ws://127.0.0.1:18789",
token: "qa-token",
env: { OPENCLAW_HOME: "/tmp/openclaw-home" } as NodeJS.ProcessEnv,
logs: () => "qa logs",
});
await expect(
startQaGatewayRpcClient({
wsUrl: "ws://127.0.0.1:18789",
token: "qa-token",
logs: () => "qa logs",
}),
).rejects.toThrow("connect boom\nGateway logs:\nqa logs");
await client.stop();
await expect(client.request("health")).rejects.toThrow(
"gateway rpc client already stopped\nGateway logs:\nqa logs",
);
});
});

View File

@@ -1,20 +1,11 @@
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { GatewayClient } from "./runtime-api.js";
import { callGatewayFromCli } from "./runtime-api.js";
type QaGatewayRpcRequestOptions = {
expectFinal?: boolean;
timeoutMs?: number;
};
const QA_GATEWAY_RPC_SCOPES = [
"operator.admin",
"operator.read",
"operator.write",
"operator.approvals",
"operator.pairing",
"operator.talk.secrets",
] as const;
export type QaGatewayRpcClient = {
request(method: string, rpcParams?: unknown, opts?: QaGatewayRpcRequestOptions): Promise<unknown>;
stop(): Promise<void>;
@@ -25,77 +16,88 @@ function formatQaGatewayRpcError(error: unknown, logs: () => string) {
return new Error(`${details}\nGateway logs:\n${logs()}`);
}
let qaGatewayRpcQueue = Promise.resolve();
async function withScopedProcessEnv<T>(env: NodeJS.ProcessEnv, task: () => Promise<T>): Promise<T> {
const original = new Map<string, string | undefined>();
const keys = new Set([...Object.keys(process.env), ...Object.keys(env)]);
for (const key of keys) {
original.set(key, process.env[key]);
const nextValue = env[key];
if (nextValue === undefined) {
delete process.env[key];
continue;
}
process.env[key] = nextValue;
}
try {
return await task();
} finally {
for (const key of keys) {
const previousValue = original.get(key);
if (previousValue === undefined) {
delete process.env[key];
continue;
}
process.env[key] = previousValue;
}
}
}
async function runQueuedQaGatewayRpc<T>(task: () => Promise<T>): Promise<T> {
const run = qaGatewayRpcQueue.then(task, task);
qaGatewayRpcQueue = run.then(
() => undefined,
() => undefined,
);
return await run;
}
export async function startQaGatewayRpcClient(params: {
wsUrl: string;
token: string;
env: NodeJS.ProcessEnv;
logs: () => string;
}): Promise<QaGatewayRpcClient> {
let readySettled = false;
let stopping = false;
let resolveReady!: () => void;
let rejectReady!: (err: unknown) => void;
const ready = new Promise<void>((resolve, reject) => {
resolveReady = resolve;
rejectReady = reject;
});
const settleReady = (error?: unknown) => {
if (readySettled) {
return;
}
readySettled = true;
if (error) {
rejectReady(error);
return;
}
resolveReady();
};
const wrapError = (error: unknown) => formatQaGatewayRpcError(error, params.logs);
const client = new GatewayClient({
url: params.wsUrl,
token: params.token,
deviceIdentity: null,
// Mirror the old gateway CLI caller scopes so the faster path stays behavior-identical.
scopes: [...QA_GATEWAY_RPC_SCOPES],
onHelloOk: () => {
settleReady();
},
onConnectError: (error) => {
settleReady(wrapError(error));
},
onClose: (code, reason) => {
if (stopping) {
return;
}
const reasonText = reason.trim() || "no close reason";
settleReady(wrapError(new Error(`gateway closed (${code}): ${reasonText}`)));
},
});
client.start();
await ready;
let stopped = false;
return {
async request(method, rpcParams, opts) {
if (stopped) {
throw wrapError(new Error("gateway rpc client already stopped"));
}
try {
return await client.request(method, rpcParams, {
expectFinal: opts?.expectFinal,
timeoutMs: opts?.timeoutMs ?? 20_000,
});
return await runQueuedQaGatewayRpc(
async () =>
await withScopedProcessEnv(
params.env,
async () =>
await callGatewayFromCli(
method,
{
url: params.wsUrl,
token: params.token,
timeout: String(opts?.timeoutMs ?? 20_000),
expectFinal: opts?.expectFinal,
json: true,
},
rpcParams ?? {},
{
expectFinal: opts?.expectFinal,
progress: false,
},
),
),
);
} catch (error) {
throw wrapError(error);
}
},
async stop() {
stopping = true;
try {
await client.stopAndWait();
} catch {
client.stop();
}
stopped = true;
},
};
}

View File

@@ -1,7 +1,7 @@
export type { Command } from "commander";
export type { OpenClawConfig, PluginRuntime } from "openclaw/plugin-sdk/core";
export { definePluginEntry } from "openclaw/plugin-sdk/core";
export { GatewayClient } from "openclaw/plugin-sdk/gateway-runtime";
export { callGatewayFromCli } from "openclaw/plugin-sdk/browser-node-runtime";
export {
buildQaTarget,
createQaBusThread,