test: share gateway live client helpers

This commit is contained in:
Peter Steinberger
2026-04-20 22:49:37 +01:00
parent c55e1f7566
commit ca2c9fef8c
3 changed files with 48 additions and 184 deletions

View File

@@ -15,8 +15,8 @@ import {
import { extractFirstTextBlock } from "../shared/chat-message-content.js";
import { createTestRegistry } from "../test-utils/channel-plugins.js";
import { sleep } from "../utils.js";
import { GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { GatewayClient } from "./client.js";
import type { GatewayClient } from "./client.js";
import { connectTestGatewayClient } from "./gateway-cli-backend.live-helpers.js";
import {
assertCronJobMatches,
assertCronJobVisibleViaCli,
@@ -161,87 +161,18 @@ async function waitForGatewayPort(params: {
async function connectClient(params: { url: string; token: string; timeoutMs?: number }) {
const timeoutMs = params.timeoutMs ?? CONNECT_TIMEOUT_MS;
const startedAt = Date.now();
let attempt = 0;
let lastError: Error | null = null;
while (Date.now() - startedAt < timeoutMs) {
attempt += 1;
const remainingMs = timeoutMs - (Date.now() - startedAt);
if (remainingMs <= 0) {
break;
}
try {
return await connectClientOnce({
...params,
timeoutMs: Math.min(remainingMs, 35_000),
});
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
if (!isRetryableGatewayConnectError(lastError) || remainingMs <= 5_000) {
throw lastError;
}
logLiveStep(`gateway connect warmup retry ${attempt}: ${lastError.message}`);
await sleep(Math.min(1_000 * attempt, 5_000));
}
}
throw lastError ?? new Error("gateway connect timeout");
}
async function connectClientOnce(params: { url: string; token: string; timeoutMs?: number }) {
const timeoutMs = params.timeoutMs ?? CONNECT_TIMEOUT_MS;
return await new Promise<GatewayClient>((resolve, reject) => {
let done = false;
let client: GatewayClient | undefined;
const finish = (result: { client?: GatewayClient; error?: Error }) => {
if (done) {
return;
}
done = true;
clearTimeout(connectTimeout);
if (result.error) {
if (client) {
void client.stopAndWait({ timeoutMs: 1_000 }).catch(() => {});
}
reject(result.error);
return;
}
resolve(result.client as GatewayClient);
};
client = new GatewayClient({
url: params.url,
token: params.token,
clientName: GATEWAY_CLIENT_NAMES.TEST,
clientVersion: "dev",
mode: "test",
requestTimeoutMs: timeoutMs,
connectChallengeTimeoutMs: timeoutMs,
onHelloOk: () => finish({ client }),
onConnectError: (error) => finish({ error }),
onClose: (code, reason) =>
finish({ error: new Error(`gateway closed during connect (${code}): ${reason}`) }),
});
const connectTimeout = setTimeout(
() => finish({ error: new Error("gateway connect timeout") }),
timeoutMs,
);
connectTimeout.unref();
client.start();
return await connectTestGatewayClient({
...params,
timeoutMs,
maxAttemptTimeoutMs: 35_000,
clientDisplayName: null,
requestTimeoutMs: timeoutMs,
onRetry: (attempt, error) => {
logLiveStep(`gateway connect warmup retry ${attempt}: ${error.message}`);
},
});
}
function isRetryableGatewayConnectError(error: Error): boolean {
const message = error.message.toLowerCase();
return (
message.includes("gateway closed during connect (1000)") ||
message.includes("gateway connect timeout") ||
message.includes("gateway connect challenge timeout")
);
}
function isRetryableAcpBindWarmupText(texts: string[]): boolean {
const combined = texts.join("\n\n").toLowerCase();
return (

View File

@@ -16,7 +16,7 @@ import { isTruthyEnvValue } from "../infra/env.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { getFreePortBlockWithPermissionFallback } from "../test-utils/ports.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { GatewayClient } from "./client.js";
import { GatewayClient, type GatewayClientOptions } from "./client.js";
import {
assertCronJobMatches,
assertCronJobVisibleViaCli,
@@ -472,27 +472,35 @@ export async function connectTestGatewayClient(params: {
url: string;
token: string;
deviceIdentity?: DeviceIdentity;
timeoutMs?: number;
maxAttemptTimeoutMs?: number;
clientDisplayName?: string | null;
requestTimeoutMs?: number;
onRetry?: (attempt: number, error: Error) => void;
}): Promise<GatewayClient> {
const timeoutMs = params.timeoutMs ?? CLI_GATEWAY_CONNECT_TIMEOUT_MS;
const maxAttemptTimeoutMs = params.maxAttemptTimeoutMs ?? 45_000;
const startedAt = Date.now();
let attempt = 0;
let lastError: Error | null = null;
while (Date.now() - startedAt < CLI_GATEWAY_CONNECT_TIMEOUT_MS) {
while (Date.now() - startedAt < timeoutMs) {
attempt += 1;
const remainingMs = CLI_GATEWAY_CONNECT_TIMEOUT_MS - (Date.now() - startedAt);
const remainingMs = timeoutMs - (Date.now() - startedAt);
if (remainingMs <= 0) {
break;
}
try {
return await connectClientOnce({
...params,
timeoutMs: Math.min(remainingMs, 45_000),
timeoutMs: Math.min(remainingMs, maxAttemptTimeoutMs),
});
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error));
if (!isRetryableGatewayConnectError(lastError) || remainingMs <= 5_000) {
throw lastError;
}
params.onRetry?.(attempt, lastError);
await sleep(Math.min(1_000 * attempt, 5_000));
}
}
@@ -505,6 +513,8 @@ async function connectClientOnce(params: {
token: string;
timeoutMs: number;
deviceIdentity?: DeviceIdentity;
clientDisplayName?: string | null;
requestTimeoutMs?: number;
}): Promise<GatewayClient> {
return await new Promise<GatewayClient>((resolve, reject) => {
let done = false;
@@ -528,11 +538,10 @@ async function connectClientOnce(params: {
const failWithClose = (code: number, reason: string) =>
finish({ error: new Error(`gateway closed during connect (${code}): ${reason}`) });
client = new GatewayClient({
const clientOptions: GatewayClientOptions = {
url: params.url,
token: params.token,
clientName: GATEWAY_CLIENT_NAMES.TEST,
clientDisplayName: "vitest-live",
clientVersion: "dev",
mode: GATEWAY_CLIENT_MODES.TEST,
connectChallengeTimeoutMs: params.timeoutMs,
@@ -540,7 +549,15 @@ async function connectClientOnce(params: {
onHelloOk: () => finish({ client }),
onConnectError: (error) => finish({ error }),
onClose: failWithClose,
});
};
if (params.clientDisplayName !== null) {
clientOptions.clientDisplayName = params.clientDisplayName ?? "vitest-live";
}
if (params.requestTimeoutMs !== undefined) {
clientOptions.requestTimeoutMs = params.requestTimeoutMs;
}
client = new GatewayClient(clientOptions);
const connectTimeout = setTimeout(
() => finish({ error: new Error("gateway connect timeout") }),
@@ -620,7 +637,9 @@ function restoreEnvVar(name: string, value: string | undefined): void {
process.env[name] = value;
}
export async function ensurePairedTestGatewayClientIdentity(): Promise<DeviceIdentity> {
export async function ensurePairedTestGatewayClientIdentity(params?: {
displayName?: string;
}): Promise<DeviceIdentity> {
const identity = loadOrCreateDeviceIdentity();
const publicKey = publicKeyRawBase64UrlFromPem(identity.publicKeyPem);
const requiredScopes = ["operator.admin"];
@@ -639,7 +658,7 @@ export async function ensurePairedTestGatewayClientIdentity(): Promise<DeviceIde
const pairing = await requestDevicePairing({
deviceId: identity.deviceId,
publicKey,
displayName: "vitest",
displayName: params?.displayName ?? "vitest",
platform: process.platform,
clientId: GATEWAY_CLIENT_NAMES.TEST,
clientMode: GATEWAY_CLIENT_MODES.TEST,

View File

@@ -6,9 +6,12 @@ import path from "node:path";
import { describe, expect, it } from "vitest";
import { isLiveTestEnabled } from "../agents/live-test-helpers.js";
import type { OpenClawConfig } from "../config/config.js";
import type { DeviceIdentity } from "../infra/device-identity.js";
import { isTruthyEnvValue } from "../infra/env.js";
import type { GatewayClient } from "./client.js";
import {
connectTestGatewayClient,
ensurePairedTestGatewayClientIdentity,
} from "./gateway-cli-backend.live-helpers.js";
import {
EXPECTED_CODEX_MODELS_COMMAND_TEXT,
isExpectedCodexModelsCommandText,
@@ -116,99 +119,6 @@ async function getFreeGatewayPort(): Promise<number> {
return port;
}
async function ensurePairedTestGatewayClientIdentity(): Promise<DeviceIdentity> {
const { loadOrCreateDeviceIdentity, publicKeyRawBase64UrlFromPem } =
await import("../infra/device-identity.js");
const { approveDevicePairing, getPairedDevice, requestDevicePairing } =
await import("../infra/device-pairing.js");
const { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } =
await import("../utils/message-channel.js");
const identity = loadOrCreateDeviceIdentity();
const publicKey = publicKeyRawBase64UrlFromPem(identity.publicKeyPem);
const requiredScopes = ["operator.admin"];
const paired = await getPairedDevice(identity.deviceId);
const pairedScopes = Array.isArray(paired?.approvedScopes)
? paired.approvedScopes
: Array.isArray(paired?.scopes)
? paired.scopes
: [];
if (
paired?.publicKey === publicKey &&
requiredScopes.every((scope) => pairedScopes.includes(scope))
) {
return identity;
}
const pairing = await requestDevicePairing({
deviceId: identity.deviceId,
publicKey,
displayName: "vitest-codex-harness-live",
platform: process.platform,
clientId: GATEWAY_CLIENT_NAMES.TEST,
clientMode: GATEWAY_CLIENT_MODES.TEST,
role: "operator",
scopes: requiredScopes,
silent: true,
});
const approved = await approveDevicePairing(pairing.request.requestId, {
callerScopes: requiredScopes,
});
if (approved?.status !== "approved") {
throw new Error(`failed to pre-pair live test device: ${approved?.status ?? "missing"}`);
}
return identity;
}
async function connectTestGatewayClient(params: {
deviceIdentity: DeviceIdentity;
token: string;
url: string;
}): Promise<GatewayClient> {
const { GatewayClient } = await import("./client.js");
const { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } =
await import("../utils/message-channel.js");
return await new Promise<GatewayClient>((resolve, reject) => {
let done = false;
let client: GatewayClient | undefined;
const connectTimeout = setTimeout(() => {
finish({ error: new Error("gateway connect timeout") });
}, GATEWAY_CONNECT_TIMEOUT_MS);
connectTimeout.unref();
function finish(result: { client?: GatewayClient; error?: Error }): void {
if (done) {
return;
}
done = true;
clearTimeout(connectTimeout);
if (result.error) {
if (client) {
void client.stopAndWait({ timeoutMs: 1_000 }).catch(() => {});
}
reject(result.error);
return;
}
resolve(result.client as GatewayClient);
}
client = new GatewayClient({
url: params.url,
token: params.token,
clientName: GATEWAY_CLIENT_NAMES.TEST,
clientDisplayName: "vitest-codex-harness-live",
clientVersion: "dev",
mode: GATEWAY_CLIENT_MODES.TEST,
connectChallengeTimeoutMs: GATEWAY_CONNECT_TIMEOUT_MS,
deviceIdentity: params.deviceIdentity,
onHelloOk: () => finish({ client }),
onConnectError: (error) => finish({ error }),
onClose: (code, reason) => {
finish({ error: new Error(`gateway closed during connect (${code}): ${reason}`) });
},
});
client.start();
});
}
async function createLiveWorkspace(tempDir: string): Promise<string> {
const workspace = path.join(tempDir, "workspace");
await fs.mkdir(workspace, { recursive: true });
@@ -459,7 +369,9 @@ describeLive("gateway live (Codex harness)", () => {
await fs.mkdir(stateDir, { recursive: true });
await writeLiveGatewayConfig({ configPath, modelKey, port, token, workspace });
const deviceIdentity = await ensurePairedTestGatewayClientIdentity();
const deviceIdentity = await ensurePairedTestGatewayClientIdentity({
displayName: "vitest-codex-harness-live",
});
logCodexLiveStep("config-written", { configPath, modelKey, port });
const server = await startGatewayServer(port, {
@@ -471,6 +383,8 @@ describeLive("gateway live (Codex harness)", () => {
url: `ws://127.0.0.1:${port}`,
token,
deviceIdentity,
timeoutMs: GATEWAY_CONNECT_TIMEOUT_MS,
clientDisplayName: "vitest-codex-harness-live",
});
logCodexLiveStep("client-connected");