import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import { randomUUID } from "node:crypto"; import fs from "node:fs/promises"; import { request as httpRequest } from "node:http"; import net from "node:net"; import os from "node:os"; import path from "node:path"; import { GatewayClient } from "../../src/gateway/client.js"; import { connectGatewayClient } from "../../src/gateway/test-helpers.e2e.js"; import { loadOrCreateDeviceIdentity } from "../../src/infra/device-identity.js"; import { extractFirstTextBlock } from "../../src/shared/chat-message-content.js"; import { sleep } from "../../src/utils.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../../src/utils/message-channel.js"; export { extractFirstTextBlock }; type NodeListPayload = { nodes?: Array<{ nodeId?: string; connected?: boolean; paired?: boolean }>; }; export type ChatEventPayload = { runId?: string; sessionKey?: string; state?: string; message?: unknown; }; export type GatewayInstance = { name: string; port: number; hookToken: string; gatewayToken: string; homeDir: string; stateDir: string; configPath: string; child: ChildProcessWithoutNullStreams; stdout: string[]; stderr: string[]; }; const GATEWAY_START_TIMEOUT_MS = 60_000; const GATEWAY_STOP_TIMEOUT_MS = 1_500; const GATEWAY_CONNECT_STATUS_TIMEOUT_MS = 2_000; const GATEWAY_NODE_STATUS_TIMEOUT_MS = 4_000; const GATEWAY_NODE_STATUS_POLL_MS = 20; const getFreePort = async () => { const srv = net.createServer(); await new Promise((resolve) => srv.listen(0, "127.0.0.1", resolve)); const addr = srv.address(); if (!addr || typeof addr === "string") { srv.close(); throw new Error("failed to bind ephemeral port"); } await new Promise((resolve) => srv.close(() => resolve())); return addr.port; }; async function waitForPortOpen( proc: ChildProcessWithoutNullStreams, chunksOut: string[], chunksErr: string[], port: number, timeoutMs: number, ) { const startedAt = Date.now(); while (Date.now() - startedAt < timeoutMs) { if (proc.exitCode !== null) { const stdout = chunksOut.join(""); const stderr = chunksErr.join(""); throw new Error( `gateway exited before listening (code=${String(proc.exitCode)} signal=${String(proc.signalCode)})\n` + `--- stdout ---\n${stdout}\n--- stderr ---\n${stderr}`, ); } try { await new Promise((resolve, reject) => { const socket = net.connect({ host: "127.0.0.1", port }); socket.once("connect", () => { socket.destroy(); resolve(); }); socket.once("error", (err) => { socket.destroy(); reject(err); }); }); return; } catch { // keep polling } await sleep(10); } const stdout = chunksOut.join(""); const stderr = chunksErr.join(""); throw new Error( `timeout waiting for gateway to listen on port ${port}\n` + `--- stdout ---\n${stdout}\n--- stderr ---\n${stderr}`, ); } export async function spawnGatewayInstance(name: string): Promise { const port = await getFreePort(); const hookToken = `token-${name}-${randomUUID()}`; const gatewayToken = `gateway-${name}-${randomUUID()}`; const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), `openclaw-e2e-${name}-`)); const configDir = path.join(homeDir, ".openclaw"); await fs.mkdir(configDir, { recursive: true }); const configPath = path.join(configDir, "openclaw.json"); const stateDir = path.join(configDir, "state"); const config = { gateway: { port, auth: { mode: "token", token: gatewayToken }, controlUi: { enabled: false }, }, hooks: { enabled: true, token: hookToken, path: "/hooks" }, }; await fs.writeFile(configPath, JSON.stringify(config, null, 2), "utf8"); const stdout: string[] = []; const stderr: string[] = []; let child: ChildProcessWithoutNullStreams | null = null; try { child = spawn( "node", [ "dist/index.js", "gateway", "--port", String(port), "--bind", "loopback", "--allow-unconfigured", ], { cwd: process.cwd(), env: { ...process.env, HOME: homeDir, OPENCLAW_CONFIG_PATH: configPath, OPENCLAW_STATE_DIR: stateDir, OPENCLAW_GATEWAY_TOKEN: "", OPENCLAW_GATEWAY_PASSWORD: "", OPENCLAW_SKIP_CHANNELS: "1", OPENCLAW_SKIP_PROVIDERS: "1", OPENCLAW_SKIP_GMAIL_WATCHER: "1", OPENCLAW_SKIP_CRON: "1", OPENCLAW_SKIP_BROWSER_CONTROL_SERVER: "1", OPENCLAW_SKIP_CANVAS_HOST: "1", OPENCLAW_TEST_MINIMAL_GATEWAY: "1", VITEST: "1", }, stdio: ["ignore", "pipe", "pipe"], }, ); child.stdout?.setEncoding("utf8"); child.stderr?.setEncoding("utf8"); child.stdout?.on("data", (d) => stdout.push(String(d))); child.stderr?.on("data", (d) => stderr.push(String(d))); await waitForPortOpen(child, stdout, stderr, port, GATEWAY_START_TIMEOUT_MS); return { name, port, hookToken, gatewayToken, homeDir, stateDir, configPath, child, stdout, stderr, }; } catch (err) { if (child && child.exitCode === null && !child.killed) { try { child.kill("SIGKILL"); } catch { // ignore } } await fs.rm(homeDir, { recursive: true, force: true }); throw err; } } export async function stopGatewayInstance(inst: GatewayInstance) { if (inst.child.exitCode === null && !inst.child.killed) { try { inst.child.kill("SIGTERM"); } catch { // ignore } } const exited = await Promise.race([ new Promise((resolve) => { if (inst.child.exitCode !== null) { return resolve(true); } inst.child.once("exit", () => resolve(true)); }), sleep(GATEWAY_STOP_TIMEOUT_MS).then(() => false), ]); if (!exited && inst.child.exitCode === null && !inst.child.killed) { try { inst.child.kill("SIGKILL"); } catch { // ignore } } await fs.rm(inst.homeDir, { recursive: true, force: true }); } export async function postJson( url: string, body: unknown, headers?: Record, ): Promise<{ status: number; json: unknown }> { const payload = JSON.stringify(body); const parsed = new URL(url); return await new Promise<{ status: number; json: unknown }>((resolve, reject) => { const req = httpRequest( { method: "POST", hostname: parsed.hostname, port: Number(parsed.port), path: `${parsed.pathname}${parsed.search}`, headers: { "Content-Type": "application/json", "Content-Length": Buffer.byteLength(payload), ...headers, }, }, (res) => { let data = ""; res.setEncoding("utf8"); res.on("data", (chunk) => { data += chunk; }); res.on("end", () => { let json: unknown = null; if (data.trim()) { try { json = JSON.parse(data); } catch { json = data; } } resolve({ status: res.statusCode ?? 0, json }); }); }, ); req.on("error", reject); req.write(payload); req.end(); }); } export async function connectNode( inst: GatewayInstance, label: string, ): Promise<{ client: GatewayClient; nodeId: string }> { const identityPath = path.join(inst.homeDir, `${label}-device.json`); const deviceIdentity = loadOrCreateDeviceIdentity(identityPath); const nodeId = deviceIdentity.deviceId; const client = await connectGatewayClient({ url: `ws://127.0.0.1:${inst.port}`, token: inst.gatewayToken, clientName: GATEWAY_CLIENT_NAMES.NODE_HOST, clientDisplayName: label, clientVersion: "1.0.0", platform: "ios", mode: GATEWAY_CLIENT_MODES.NODE, role: "node", scopes: [], caps: ["system"], commands: ["system.run"], deviceIdentity, timeoutMessage: `timeout waiting for ${label} to connect`, }); return { client, nodeId }; } async function connectStatusClient( inst: GatewayInstance, timeoutMs = GATEWAY_CONNECT_STATUS_TIMEOUT_MS, ): Promise { let settled = false; let timer: NodeJS.Timeout | null = null; return await new Promise((resolve, reject) => { const finish = (err?: Error) => { if (settled) { return; } settled = true; if (timer) { clearTimeout(timer); } if (err) { reject(err); return; } resolve(client); }; const client = new GatewayClient({ url: `ws://127.0.0.1:${inst.port}`, connectDelayMs: 0, token: inst.gatewayToken, clientName: GATEWAY_CLIENT_NAMES.CLI, clientDisplayName: `status-${inst.name}`, clientVersion: "1.0.0", platform: "test", mode: GATEWAY_CLIENT_MODES.CLI, onHelloOk: () => { finish(); }, onConnectError: (err) => finish(err), onClose: (code, reason) => { finish(new Error(`gateway closed (${code}): ${reason}`)); }, }); timer = setTimeout(() => { finish(new Error("timeout waiting for node.list")); }, timeoutMs); client.start(); }); } export async function waitForNodeStatus( inst: GatewayInstance, nodeId: string, timeoutMs = GATEWAY_NODE_STATUS_TIMEOUT_MS, ) { const deadline = Date.now() + timeoutMs; const client = await connectStatusClient( inst, Math.min(GATEWAY_CONNECT_STATUS_TIMEOUT_MS, timeoutMs), ); try { while (Date.now() < deadline) { const list = await client.request("node.list", {}); const match = list.nodes?.find((n) => n.nodeId === nodeId); if (match?.connected && match?.paired) { return; } await sleep(GATEWAY_NODE_STATUS_POLL_MS); } } finally { client.stop(); } throw new Error(`timeout waiting for node status for ${nodeId}`); } export async function waitForChatFinalEvent(params: { events: ChatEventPayload[]; runId: string; sessionKey: string; timeoutMs?: number; }): Promise { const deadline = Date.now() + (params.timeoutMs ?? 15_000); while (Date.now() < deadline) { const match = params.events.find( (evt) => evt.runId === params.runId && evt.sessionKey === params.sessionKey && evt.state === "final", ); if (match) { return match; } await sleep(20); } throw new Error(`timeout waiting for final chat event (runId=${params.runId})`); }