fix(gateway): wait for event loop before client start

This commit is contained in:
Peter Steinberger
2026-04-29 14:50:21 +01:00
parent a972c9ec45
commit 6bbacd14a3
24 changed files with 675 additions and 68 deletions

View File

@@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
- CLI/tools: keep the Gateway `tools.*` RPC namespace out of plugin command discovery and managed proxy startup, so stray commands like `openclaw tools effective` fail quickly instead of cold-loading plugin metadata. Refs #73477. Thanks @oromeis.
- CLI/status: keep default text `openclaw status --usage` on metadata-only channel scans unless `--deep` or `--all` is set, and send stray `openclaw tools --help` through the precomputed root-help fast path so latency-triage commands avoid plugin/runtime cold loads before printing. Refs #73477 and #74220. Thanks @oromeis and @NianJiuZst.
- Agents/diagnostics: log slow embedded-run startup and preparation stage timings before model I/O, so Docker/VPS latency reports can identify whether plugin loading, auth/model resolution, tool inventory, bootstrap, MCP/LSP, resource loading, or stream setup is dominating pre-run latency. Refs #73428. Thanks @Dimaoggg, @quangtran88, and @Heyvhuang.
- Gateway/clients: wait for the event loop to become responsive before opening Gateway WebSocket RPC/probe/client connections while charging that readiness wait to caller timeouts, so Windows deferred module-evaluation stalls no longer turn healthy loopback gateways into false handshake timeouts across status, TUI, ACP, MCP, node-host, and plugin client paths. Refs #74279 and #48270. Thanks @wongcode and @joost-heijden.
- Plugins/runtime-deps: memoize packaged bundled runtime dist-mirror preparation after the first successful pass while keeping source-checkout mirrors refreshable, so constrained Docker/VPS installs avoid repeated root scans before chat turns. Refs #73428, #73421, #73532, and #73477. Thanks @Dimaoggg, @oromeis, @oadiazp, @jmfraga, @bstanbury, @antoniusfelix, and @jkobject.
- Channels/Discord: treat bare numeric outbound targets that match the effective Discord DM allowlist as user DMs while preserving account-specific legacy `dm.allowFrom` precedence over inherited root `allowFrom`. (#74303) Thanks @Squirbie.
- Control UI: make the chat sidebar split divider focusable, keyboard-resizable, ARIA-described, and pointer-event based so sidebar resizing works without a mouse. Thanks @BunsDev.

View File

@@ -1,2 +1,2 @@
d5b33ee6be988cd6a844a358aaa098e1f6401b151e5ee1e46dceeccddaeb7434 plugin-sdk-api-baseline.json
dffa8b4afbb085faf42a857805c43708b748111e346552d7ea4654da3bafdee7 plugin-sdk-api-baseline.jsonl
4266568b93a03ce07e7811285ebc67ec90ebf7519ee892bbeb618b4a527f81e9 plugin-sdk-api-baseline.json
545c2ea8874ef1332c7710b74291f391231c861ac296afa46ddcc91fc1c2ed1c plugin-sdk-api-baseline.jsonl

View File

@@ -412,7 +412,7 @@ releases.
| `plugin-sdk/lazy-runtime` | Lazy runtime helpers | `createLazyRuntimeModule`, `createLazyRuntimeMethod`, `createLazyRuntimeMethodBinder`, `createLazyRuntimeNamedExport`, `createLazyRuntimeSurface` |
| `plugin-sdk/process-runtime` | Process helpers | Shared exec helpers |
| `plugin-sdk/cli-runtime` | CLI runtime helpers | Command formatting, waits, version helpers |
| `plugin-sdk/gateway-runtime` | Gateway helpers | Gateway client and channel-status patch helpers |
| `plugin-sdk/gateway-runtime` | Gateway helpers | Gateway client, event-loop-ready start helper, and channel-status patch helpers |
| `plugin-sdk/config-runtime` | Deprecated config compatibility shim | Prefer `config-types`, `plugin-config-runtime`, `runtime-config-snapshot`, and `config-mutation` |
| `plugin-sdk/telegram-command-config` | Telegram command helpers | Fallback-stable Telegram command validation helpers when the bundled Telegram contract surface is unavailable |
| `plugin-sdk/approval-runtime` | Approval prompt helpers | Exec/plugin approval payload, approval capability/profile helpers, native approval routing/runtime helpers, and structured approval display path formatting |

View File

@@ -180,7 +180,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview)
| `plugin-sdk/lazy-runtime` | Lazy runtime import/binding helpers such as `createLazyRuntimeModule`, `createLazyRuntimeMethod`, and `createLazyRuntimeSurface` |
| `plugin-sdk/process-runtime` | Process exec helpers |
| `plugin-sdk/cli-runtime` | CLI formatting, wait, version, argument-invocation, and lazy command-group helpers |
| `plugin-sdk/gateway-runtime` | Gateway client, gateway CLI RPC, gateway protocol errors, and channel-status patch helpers |
| `plugin-sdk/gateway-runtime` | Gateway client, event-loop-ready client start helper, gateway CLI RPC, gateway protocol errors, and channel-status patch helpers |
| `plugin-sdk/config-types` | Type-only config surface for plugin config shapes such as `OpenClawConfig` and channel/provider config types |
| `plugin-sdk/plugin-config-runtime` | Runtime plugin-config lookup helpers such as `requireRuntimeConfig`, `resolvePluginConfigObject`, and `resolveLivePluginConfigObject` |
| `plugin-sdk/config-mutation` | Transactional config mutation helpers such as `mutateConfigFile`, `replaceConfigFile`, and `logConfigUpdated` |

View File

@@ -1,5 +1,8 @@
import { setTimeout as sleep } from "node:timers/promises";
import { GatewayClient } from "openclaw/plugin-sdk/gateway-runtime";
import {
GatewayClient,
startGatewayClientWhenEventLoopReady,
} from "openclaw/plugin-sdk/gateway-runtime";
import type { GoogleMeetConfig } from "./config.js";
type VoiceCallGatewayClient = InstanceType<typeof GatewayClient>;
@@ -20,10 +23,11 @@ async function createConnectedGatewayClient(
): Promise<VoiceCallGatewayClient> {
let client: VoiceCallGatewayClient;
await new Promise<void>((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error("gateway connect timeout")),
config.voiceCall.requestTimeoutMs,
);
const abortStart = new AbortController();
const timer = setTimeout(() => {
abortStart.abort();
reject(new Error("gateway connect timeout"));
}, config.voiceCall.requestTimeoutMs);
client = new GatewayClient({
url: config.voiceCall.gatewayUrl,
token: config.voiceCall.token,
@@ -37,10 +41,24 @@ async function createConnectedGatewayClient(
},
onConnectError: (err) => {
clearTimeout(timer);
abortStart.abort();
reject(err);
},
});
client.start();
void startGatewayClientWhenEventLoopReady(client, {
timeoutMs: config.voiceCall.requestTimeoutMs,
signal: abortStart.signal,
})
.then((readiness) => {
if (!readiness.ready && !readiness.aborted) {
clearTimeout(timer);
reject(new Error("gateway event loop readiness timeout"));
}
})
.catch((err) => {
clearTimeout(timer);
reject(err instanceof Error ? err : new Error(String(err)));
});
});
return client!;
}

View File

@@ -107,6 +107,19 @@ vi.mock("../gateway/client.js", () => ({
GatewayClient: MockGatewayClient,
}));
vi.mock("../gateway/client-start-readiness.js", () => ({
startGatewayClientWhenEventLoopReady: vi.fn(async (client: MockGatewayClient) => {
client.start();
return {
ready: true,
elapsedMs: 0,
maxDriftMs: 0,
checks: 2,
aborted: false,
};
}),
}));
vi.mock("../infra/is-main.js", () => ({
isMainModule: () => false,
}));
@@ -158,11 +171,14 @@ describe("serveAcpGateway startup", () => {
}
async function emitHelloAndWaitForAgentSideConnection() {
await vi.waitFor(() => {
expect(mockState.gateways).toHaveLength(1);
});
const gateway = getMockGateway();
gateway.emitHello();
await Promise.resolve();
await Promise.resolve();
expect(mockState.agentSideConnectionCtor).toHaveBeenCalledTimes(1);
await vi.waitFor(() => {
expect(mockState.agentSideConnectionCtor).toHaveBeenCalledTimes(1);
});
}
async function stopServeWithSigint(

View File

@@ -4,6 +4,7 @@ import { fileURLToPath } from "node:url";
import { AgentSideConnection, ndJsonStream } from "@agentclientprotocol/sdk";
import { getRuntimeConfig } from "../config/config.js";
import { resolveGatewayClientBootstrap } from "../gateway/client-bootstrap.js";
import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js";
import { GatewayClient } from "../gateway/client.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/client-info.js";
import { isMainModule } from "../infra/is-main.js";
@@ -103,7 +104,12 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise<void
process.once("SIGTERM", shutdown);
// Start gateway first and wait for hello before accepting ACP requests.
gateway.start();
const readiness = await startGatewayClientWhenEventLoopReady(gateway, {
clientOptions: { preauthHandshakeTimeoutMs: bootstrap.preauthHandshakeTimeoutMs },
});
if (!readiness.ready) {
rejectGatewayReady(new Error("gateway event loop readiness timeout"));
}
await gatewayReady.catch((err) => {
shutdown();
throw err;

View File

@@ -22,6 +22,24 @@ const deviceIdentityState = vi.hoisted(() => ({
throwOnLoad: false,
}));
const eventLoopReadyState = vi.hoisted(() => ({
calls: [] as Array<{ maxWaitMs?: number } | undefined>,
promise: null as Promise<{
ready: boolean;
elapsedMs: number;
maxDriftMs: number;
checks: number;
aborted: boolean;
}> | null,
result: {
ready: true,
elapsedMs: 0,
maxDriftMs: 0,
checks: 2,
aborted: false,
},
}));
let lastClientOptions: {
url?: string;
token?: string;
@@ -43,6 +61,7 @@ let lastRequestOptions: {
} | null = null;
type StartMode = "hello" | "close" | "silent" | "startup-retry-then-hello";
let startMode: StartMode = "hello";
let startCalls = 0;
let closeCode = 1006;
let closeReason = "";
let helloMethods: string[] | undefined = ["health", "secrets.resolve"];
@@ -81,6 +100,7 @@ vi.mock("./client.js", () => ({
return { ok: true };
}
start() {
startCalls += 1;
if (startMode === "hello") {
void lastClientOptions?.onHelloOk?.({
features: {
@@ -101,6 +121,16 @@ vi.mock("./client.js", () => ({
},
}));
vi.mock("./event-loop-ready.js", () => ({
waitForEventLoopReady: vi.fn(async (params?: { maxWaitMs?: number }) => {
eventLoopReadyState.calls.push(params);
if (eventLoopReadyState.promise) {
return await eventLoopReadyState.promise;
}
return eventLoopReadyState.result;
}),
}));
const {
__testing,
buildGatewayConnectionDetails,
@@ -134,6 +164,7 @@ class StubGatewayClient {
return { ok: true };
}
start() {
startCalls += 1;
if (startMode === "hello") {
void lastClientOptions?.onHelloOk?.({
features: {
@@ -161,7 +192,17 @@ function resetGatewayCallMocks() {
pickPrimaryLanIPv4.mockClear();
lastClientOptions = null;
lastRequestOptions = null;
eventLoopReadyState.calls = [];
eventLoopReadyState.promise = null;
eventLoopReadyState.result = {
ready: true,
elapsedMs: 0,
maxDriftMs: 0,
checks: 2,
aborted: false,
};
startMode = "hello";
startCalls = 0;
closeCode = 1006;
closeReason = "";
helloMethods = ["health", "secrets.resolve"];
@@ -570,52 +611,37 @@ describe("callGateway url resolution", () => {
expect(lastClientOptions?.clientDisplayName).toBeUndefined();
});
it("yields one event-loop turn before starting CLI pairing requests", async () => {
it("waits for event-loop readiness before starting CLI pairing requests", async () => {
setLocalLoopbackGatewayConfig();
let preConnectYieldRan = false;
let sawYieldBeforeStart = false;
setImmediate(() => {
preConnectYieldRan = true;
let resolveReady!: (result: {
ready: boolean;
elapsedMs: number;
maxDriftMs: number;
checks: number;
aborted: boolean;
}) => void;
eventLoopReadyState.promise = new Promise((resolve) => {
resolveReady = resolve;
});
__testing.setDepsForTests({
createGatewayClient: (opts) =>
({
async request(
method: string,
params: unknown,
requestOpts?: { expectFinal?: boolean; timeoutMs?: number | null },
) {
lastRequestOptions = { method, params, opts: requestOpts };
return { ok: true };
},
start() {
sawYieldBeforeStart = preConnectYieldRan;
opts.onHelloOk?.({
features: {
methods: helloMethods ?? [],
events: [],
},
} as unknown as Parameters<NonNullable<typeof opts.onHelloOk>>[0]);
},
stop() {},
}) as never,
getRuntimeConfig: getRuntimeConfig as unknown as () => OpenClawConfig,
loadOrCreateDeviceIdentity: () => deviceIdentityState.value,
resolveGatewayPort: resolveGatewayPort as unknown as (
cfg?: OpenClawConfig,
env?: NodeJS.ProcessEnv,
) => number,
});
await callGateway({
const promise = callGateway({
method: "device.pair.list",
mode: GATEWAY_CLIENT_MODES.CLI,
clientName: GATEWAY_CLIENT_NAMES.CLI,
});
expect(sawYieldBeforeStart).toBe(true);
await vi.waitFor(() => {
expect(eventLoopReadyState.calls).toHaveLength(1);
});
expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(10_000);
expect(lastClientOptions?.clientName).toBe(GATEWAY_CLIENT_NAMES.CLI);
expect(startCalls).toBe(0);
resolveReady({ ready: true, elapsedMs: 0, maxDriftMs: 0, checks: 2, aborted: false });
await promise;
expect(startCalls).toBe(1);
});
});
@@ -896,6 +922,51 @@ describe("callGateway error details", () => {
});
});
it("charges event-loop readiness against the wrapper timeout", async () => {
startMode = "silent";
setLocalLoopbackGatewayConfig();
eventLoopReadyState.promise = new Promise(() => {});
vi.useFakeTimers();
let errMessage = "";
const promise = callGateway({ method: "health", timeoutMs: 5 }).catch((caught) => {
errMessage = caught instanceof Error ? caught.message : String(caught);
});
await vi.waitFor(() => {
expect(eventLoopReadyState.calls).toHaveLength(1);
});
expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(5);
expect(startCalls).toBe(0);
await vi.advanceTimersByTimeAsync(5);
await promise;
expect(startCalls).toBe(0);
expect(errMessage).toContain("gateway timeout after 5ms");
});
it("fails before connecting when event-loop readiness consumes the wrapper timeout", async () => {
startMode = "silent";
setLocalLoopbackGatewayConfig();
eventLoopReadyState.result = {
ready: false,
elapsedMs: 5,
maxDriftMs: 400,
checks: 1,
aborted: false,
};
await expect(callGateway({ method: "health", timeoutMs: 5 })).rejects.toMatchObject({
name: "GatewayTransportError",
kind: "timeout",
timeoutMs: 5,
});
expect(eventLoopReadyState.calls).toHaveLength(1);
expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(5);
expect(lastClientOptions).not.toBeNull();
expect(startCalls).toBe(0);
});
it("keeps the default wrapper timeout aligned with configured handshake timeout", async () => {
startMode = "silent";
getRuntimeConfig.mockReturnValue({

View File

@@ -18,6 +18,7 @@ import {
} from "../utils/message-channel.js";
import { resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js";
import { VERSION } from "../version.js";
import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js";
import { GatewayClient, type GatewayClientOptions } from "./client.js";
import {
buildGatewayConnectionDetailsWithResolvers,
@@ -615,19 +616,16 @@ async function executeGatewayRequestWithScopes<T>(params: {
timeoutMs,
safeTimerTimeoutMs,
} = params;
// Yield to the event loop before starting the WebSocket connection.
// On Windows with large dist bundles, heavy synchronous module loading
// can starve the event loop, preventing timely processing of the
// connect.challenge frame and causing handshake timeouts (#48736).
await new Promise<void>((r) => setImmediate(r));
return await new Promise<T>((resolve, reject) => {
let settled = false;
let ignoreClose = false;
const startAbort = new AbortController();
const stop = (err?: Error, value?: T) => {
if (settled) {
return;
}
settled = true;
startAbort.abort();
clearTimeout(timer);
void stopGatewayClient(client).finally(() => {
if (err) {
@@ -701,7 +699,29 @@ async function executeGatewayRequestWithScopes<T>(params: {
);
}, safeTimerTimeoutMs);
client.start();
void startGatewayClientWhenEventLoopReady(client, {
timeoutMs: safeTimerTimeoutMs,
signal: startAbort.signal,
})
.then((readiness) => {
if (settled || readiness.ready || readiness.aborted) {
return;
}
ignoreClose = true;
stop(
createGatewayTimeoutTransportError({
timeoutMs,
connectionDetails: params.connectionDetails,
}),
);
})
.catch((err) => {
if (settled) {
return;
}
ignoreClose = true;
stop(err instanceof Error ? err : new Error(String(err)));
});
});
}

View File

@@ -5,6 +5,7 @@ import { describe, expect, it } from "vitest";
const GATEWAY_CLIENT_CONSTRUCTOR_PATTERN = /new\s+GatewayClient\s*\(/;
const ALLOWED_GATEWAY_CLIENT_CALLSITES = new Set([
"extensions/google-meet/src/voice-call-gateway.ts",
"src/acp/server.ts",
"src/gateway/call.ts",
"src/gateway/gateway-cli-backend.live-helpers.ts",
@@ -45,7 +46,10 @@ async function collectSourceFiles(dir: string): Promise<string[]> {
describe("GatewayClient production callsites", () => {
it("remain constrained to allowlisted files", async () => {
const root = process.cwd();
const sourceFiles = await collectSourceFiles(path.join(root, "src"));
const sourceFiles = [
...(await collectSourceFiles(path.join(root, "src"))),
...(await collectSourceFiles(path.join(root, "extensions"))),
];
const callsites: string[] = [];
for (const fullPath of sourceFiles) {
const relativePath = path.relative(root, fullPath).replaceAll(path.sep, "/");

View File

@@ -0,0 +1,38 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js";
import type { GatewayClient } from "./client.js";
describe("startGatewayClientWhenEventLoopReady", () => {
afterEach(() => {
vi.useRealTimers();
});
it("starts the client only after the event loop is responsive", async () => {
vi.useFakeTimers();
const client = { start: vi.fn() } as unknown as GatewayClient;
const promise = startGatewayClientWhenEventLoopReady(client, { timeoutMs: 100 });
await vi.advanceTimersByTimeAsync(1);
expect(client.start).not.toHaveBeenCalled();
await vi.advanceTimersByTimeAsync(1);
await expect(promise).resolves.toMatchObject({ ready: true });
expect(client.start).toHaveBeenCalledTimes(1);
});
it("does not start the client after an aborted readiness wait", async () => {
vi.useFakeTimers();
const client = { start: vi.fn() } as unknown as GatewayClient;
const controller = new AbortController();
const promise = startGatewayClientWhenEventLoopReady(client, {
timeoutMs: 100,
signal: controller.signal,
});
controller.abort();
await expect(promise).resolves.toMatchObject({ ready: false, aborted: true });
expect(client.start).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,46 @@
import type { GatewayClient, GatewayClientOptions } from "./client.js";
import { waitForEventLoopReady, type EventLoopReadyResult } from "./event-loop-ready.js";
import { resolveConnectChallengeTimeoutMs } from "./handshake-timeouts.js";
export type GatewayClientStartReadinessOptions = {
timeoutMs?: number;
clientOptions?: Pick<
GatewayClientOptions,
"connectChallengeTimeoutMs" | "connectDelayMs" | "preauthHandshakeTimeoutMs"
>;
signal?: AbortSignal;
};
export function resolveGatewayClientStartReadinessTimeoutMs(
options: GatewayClientStartReadinessOptions = {},
): number {
if (typeof options.timeoutMs === "number" && Number.isFinite(options.timeoutMs)) {
return options.timeoutMs;
}
const clientOptions = options.clientOptions ?? {};
const timeoutOverride =
typeof clientOptions.connectChallengeTimeoutMs === "number" &&
Number.isFinite(clientOptions.connectChallengeTimeoutMs)
? clientOptions.connectChallengeTimeoutMs
: typeof clientOptions.connectDelayMs === "number" &&
Number.isFinite(clientOptions.connectDelayMs)
? clientOptions.connectDelayMs
: undefined;
return resolveConnectChallengeTimeoutMs(timeoutOverride, {
configuredTimeoutMs: clientOptions.preauthHandshakeTimeoutMs,
});
}
export async function startGatewayClientWhenEventLoopReady(
client: GatewayClient,
options: GatewayClientStartReadinessOptions = {},
): Promise<EventLoopReadyResult> {
const readiness = await waitForEventLoopReady({
maxWaitMs: resolveGatewayClientStartReadinessTimeoutMs(options),
signal: options.signal,
});
if (readiness.ready && !readiness.aborted && options.signal?.aborted !== true) {
client.start();
}
return readiness;
}

View File

@@ -0,0 +1,69 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { waitForEventLoopReady } from "./event-loop-ready.js";
describe("waitForEventLoopReady", () => {
afterEach(() => {
vi.useRealTimers();
});
it("resolves ready after consecutive low-drift timer checks", async () => {
vi.useFakeTimers();
const promise = waitForEventLoopReady({
maxWaitMs: 100,
intervalMs: 10,
consecutiveReadyChecks: 2,
});
await vi.advanceTimersByTimeAsync(20);
await expect(promise).resolves.toMatchObject({
ready: true,
aborted: false,
elapsedMs: 20,
checks: 2,
maxDriftMs: 0,
});
});
it("resolves not-ready when the readiness deadline expires", async () => {
vi.useFakeTimers();
const promise = waitForEventLoopReady({
maxWaitMs: 5,
intervalMs: 5,
consecutiveReadyChecks: 2,
});
await vi.advanceTimersByTimeAsync(5);
await expect(promise).resolves.toMatchObject({
ready: false,
aborted: false,
elapsedMs: 5,
checks: 1,
maxDriftMs: 0,
});
});
it("clears pending readiness timers when aborted", async () => {
vi.useFakeTimers();
const controller = new AbortController();
const promise = waitForEventLoopReady({
maxWaitMs: 100,
intervalMs: 10,
signal: controller.signal,
});
controller.abort();
await expect(promise).resolves.toMatchObject({
ready: false,
aborted: true,
elapsedMs: 0,
checks: 0,
});
expect(vi.getTimerCount()).toBe(0);
});
});

View File

@@ -0,0 +1,114 @@
import { resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js";
export type EventLoopReadyResult = {
ready: boolean;
elapsedMs: number;
maxDriftMs: number;
checks: number;
aborted: boolean;
};
export type EventLoopReadyOptions = {
maxWaitMs?: number;
intervalMs?: number;
driftThresholdMs?: number;
consecutiveReadyChecks?: number;
signal?: AbortSignal;
};
const DEFAULT_MAX_WAIT_MS = 10_000;
const DEFAULT_INTERVAL_MS = 1;
const DEFAULT_DRIFT_THRESHOLD_MS = 200;
const DEFAULT_CONSECUTIVE_READY_CHECKS = 2;
function resolvePositiveInteger(value: number | undefined, fallback: number): number {
return Number.isFinite(value) && value !== undefined ? Math.max(1, Math.floor(value)) : fallback;
}
export async function waitForEventLoopReady(
options: EventLoopReadyOptions = {},
): Promise<EventLoopReadyResult> {
const maxWaitMs = resolveSafeTimeoutDelayMs(options.maxWaitMs ?? DEFAULT_MAX_WAIT_MS);
const intervalMs = resolvePositiveInteger(options.intervalMs, DEFAULT_INTERVAL_MS);
const driftThresholdMs = resolvePositiveInteger(
options.driftThresholdMs,
DEFAULT_DRIFT_THRESHOLD_MS,
);
const consecutiveReadyChecks = resolvePositiveInteger(
options.consecutiveReadyChecks,
DEFAULT_CONSECUTIVE_READY_CHECKS,
);
const signal = options.signal;
const startedAt = Date.now();
let readyChecks = 0;
let checks = 0;
let maxDriftMs = 0;
return await new Promise<EventLoopReadyResult>((resolve) => {
let settled = false;
let timer: ReturnType<typeof setTimeout> | null = null;
const clearTimer = () => {
if (timer) {
clearTimeout(timer);
timer = null;
}
};
const finish = (ready: boolean, aborted = false) => {
if (settled) {
return;
}
settled = true;
clearTimer();
signal?.removeEventListener("abort", onAbort);
resolve({
ready,
elapsedMs: Math.max(0, Date.now() - startedAt),
maxDriftMs,
checks,
aborted,
});
};
const onAbort = () => {
finish(false, true);
};
if (signal?.aborted) {
finish(false, true);
return;
}
signal?.addEventListener("abort", onAbort, { once: true });
const scheduleNext = () => {
if (signal?.aborted) {
finish(false, true);
return;
}
const elapsedMs = Math.max(0, Date.now() - startedAt);
const remainingMs = maxWaitMs - elapsedMs;
if (remainingMs <= 0) {
finish(false);
return;
}
const delayMs = Math.min(intervalMs, remainingMs);
const scheduledAt = Date.now();
timer = setTimeout(() => {
timer = null;
checks += 1;
const driftMs = Math.max(0, Date.now() - scheduledAt - delayMs);
maxDriftMs = Math.max(maxDriftMs, driftMs);
if (driftMs > driftThresholdMs) {
readyChecks = 0;
} else {
readyChecks += 1;
}
if (readyChecks >= consecutiveReadyChecks) {
finish(true);
return;
}
scheduleNext();
}, delayMs);
};
scheduleNext();
});
}

View File

@@ -16,6 +16,7 @@ import { isTruthyEnvValue } from "../infra/env.js";
import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js";
import { getFreePortBlockWithPermissionFallback } from "../test-utils/ports.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js";
import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js";
import { GatewayClient, type GatewayClientOptions } from "./client.js";
// Aggregate docker live runs can contend on startup enough that the gateway
@@ -289,11 +290,13 @@ async function connectClientOnce(params: {
return await new Promise<GatewayClient>((resolve, reject) => {
let done = false;
let client: GatewayClient | undefined;
const abortStart = new AbortController();
const finish = (result: { client?: GatewayClient; error?: Error }) => {
if (done) {
return;
}
done = true;
abortStart.abort();
clearTimeout(connectTimeout);
if (result.error) {
if (client) {
@@ -334,7 +337,19 @@ async function connectClientOnce(params: {
params.timeoutMs,
);
connectTimeout.unref();
client.start();
void startGatewayClientWhenEventLoopReady(client, {
timeoutMs: params.timeoutMs,
signal: abortStart.signal,
}).then(
(readiness) => {
if (!readiness.ready && !readiness.aborted) {
finish({ error: new Error("gateway event loop readiness timeout") });
}
},
(error) => {
finish({ error: error instanceof Error ? error : new Error(String(error)) });
},
);
});
}

View File

@@ -1,5 +1,6 @@
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { resolveGatewayClientBootstrap } from "./client-bootstrap.js";
import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js";
import { GatewayClient, type GatewayClientOptions } from "./client.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "./protocol/client-info.js";
@@ -86,7 +87,12 @@ export async function withOperatorApprovalsGatewayClient<T>(
});
try {
gatewayClient.start();
const readiness = await startGatewayClientWhenEventLoopReady(gatewayClient, {
clientOptions: { preauthHandshakeTimeoutMs: params.config.gateway?.handshakeTimeoutMs },
});
if (!readiness.ready) {
throw new Error("gateway event loop readiness timeout");
}
await ready;
return await run(gatewayClient);
} finally {

View File

@@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
const gatewayClientState = vi.hoisted(() => ({
options: null as Record<string, unknown> | null,
requests: [] as string[],
startCalls: 0,
startMode: "hello" as "hello" | "close" | "connect-error-close" | "startup-retry-then-hello",
close: { code: 1008, reason: "pairing required" },
helloAuth: {
@@ -32,6 +33,17 @@ const deviceIdentityState = vi.hoisted(() => ({
} as Record<string, unknown> | null,
}));
const eventLoopReadyState = vi.hoisted(() => ({
calls: [] as Array<{ maxWaitMs?: number } | undefined>,
result: {
ready: true,
elapsedMs: 0,
maxDriftMs: 0,
checks: 2,
aborted: false,
},
}));
class MockGatewayClientRequestError extends Error {
readonly details?: unknown;
@@ -51,6 +63,7 @@ class MockGatewayClient {
}
start(): void {
gatewayClientState.startCalls += 1;
void Promise.resolve()
.then(async () => {
if (gatewayClientState.startMode === "close") {
@@ -134,6 +147,13 @@ vi.mock("../infra/device-auth-store.js", () => ({
loadDeviceAuthToken: () => deviceIdentityState.cachedToken,
}));
vi.mock("./event-loop-ready.js", () => ({
waitForEventLoopReady: vi.fn((params?: { maxWaitMs?: number }) => {
eventLoopReadyState.calls.push(params);
return Promise.resolve(eventLoopReadyState.result);
}),
}));
const { clampProbeTimeoutMs, probeGateway } = await import("./probe.js");
describe("probeGateway", () => {
@@ -146,6 +166,9 @@ describe("probeGateway", () => {
updatedAtMs: 1,
};
gatewayClientState.startMode = "hello";
gatewayClientState.options = null;
gatewayClientState.requests = [];
gatewayClientState.startCalls = 0;
gatewayClientState.close = { code: 1008, reason: "pairing required" };
gatewayClientState.helloAuth = {
role: "operator",
@@ -157,6 +180,14 @@ describe("probeGateway", () => {
reason: "scope-upgrade",
requestId: "req-123",
};
eventLoopReadyState.calls = [];
eventLoopReadyState.result = {
ready: true,
elapsedMs: 0,
maxDriftMs: 0,
checks: 2,
aborted: false,
};
});
it("clamps probe timeout to timer-safe bounds", () => {
@@ -164,6 +195,50 @@ describe("probeGateway", () => {
expect(clampProbeTimeoutMs(2_000)).toBe(2_000);
expect(clampProbeTimeoutMs(3_000_000_000)).toBe(2_147_483_647);
});
it("waits for event-loop readiness before connecting", async () => {
await probeGateway({
url: "ws://127.0.0.1:18789",
timeoutMs: 1_000,
includeDetails: false,
});
expect(eventLoopReadyState.calls).toHaveLength(1);
expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(1_000);
expect(gatewayClientState.options).not.toBeNull();
expect(gatewayClientState.startCalls).toBe(1);
});
it("fails before connecting when event-loop readiness consumes the initial probe budget", async () => {
eventLoopReadyState.result = {
ready: false,
elapsedMs: 250,
maxDriftMs: 500,
checks: 1,
aborted: false,
};
const result = await probeGateway({
url: "ws://127.0.0.1:18789",
timeoutMs: 1,
includeDetails: false,
});
expect(result).toMatchObject({
ok: false,
error: "timeout",
close: null,
auth: {
role: null,
scopes: [],
capability: "unknown",
},
});
expect(eventLoopReadyState.calls).toHaveLength(1);
expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(250);
expect(gatewayClientState.options).not.toBeNull();
expect(gatewayClientState.startCalls).toBe(0);
});
it("connects with operator.read scope", async () => {
const result = await probeGateway({
url: "ws://127.0.0.1:18789",

View File

@@ -3,6 +3,7 @@ import { loadDeviceAuthToken } from "../infra/device-auth-store.js";
import { formatErrorMessage } from "../infra/errors.js";
import type { SystemPresence } from "../infra/system-presence.js";
import { MAX_SAFE_TIMEOUT_DELAY_MS, resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js";
import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js";
import { GatewayClient, GatewayClientRequestError } from "./client.js";
import { READ_SCOPE } from "./method-scopes.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "./protocol/client-info.js";
@@ -185,19 +186,21 @@ export async function probeGateway(opts: {
return null;
}
})();
const initialProbeTimeoutMs = clampProbeTimeoutMs(opts.timeoutMs);
return await new Promise<GatewayProbeResult>((resolve) => {
let settled = false;
let timer: ReturnType<typeof setTimeout> | null = null;
const startAbort = new AbortController();
const clearProbeTimer = () => {
if (timer) {
clearTimeout(timer);
timer = null;
}
};
const armProbeTimer = (onTimeout: () => void) => {
const armProbeTimer = (onTimeout: () => void, timeoutMs = initialProbeTimeoutMs) => {
clearProbeTimer();
timer = setTimeout(onTimeout, clampProbeTimeoutMs(opts.timeoutMs));
timer = setTimeout(onTimeout, resolveSafeTimeoutDelayMs(timeoutMs));
};
const settle = (
result: Omit<GatewayProbeResult, "url" | "connectErrorDetails"> & {
@@ -208,6 +211,7 @@ export async function probeGateway(opts: {
return;
}
settled = true;
startAbort.abort();
clearProbeTimer();
client.stop();
const { connectErrorDetails: resultConnectErrorDetails, ...rest } = result;
@@ -373,6 +377,36 @@ export async function probeGateway(opts: {
});
});
client.start();
void startGatewayClientWhenEventLoopReady(client, {
timeoutMs: initialProbeTimeoutMs,
signal: startAbort.signal,
})
.then((readiness) => {
if (settled || readiness.ready || readiness.aborted) {
return;
}
settleProbe({
ok: false,
error: "timeout",
health: null,
status: null,
presence: null,
configSnapshot: null,
});
})
.catch((err) => {
if (settled) {
return;
}
connectError = formatErrorMessage(err);
settleProbe({
ok: false,
error: connectError,
health: null,
status: null,
presence: null,
configSnapshot: null,
});
});
});
}

View File

@@ -11,6 +11,7 @@ const mockGatewayClientRequests = vi.hoisted(() =>
})),
);
const mockCreateOperatorApprovalsGatewayClient = vi.hoisted(() => vi.fn());
const mockStartGatewayClientWhenEventLoopReady = vi.hoisted(() => vi.fn());
const loggerMocks = vi.hoisted(() => ({
debug: vi.fn(),
error: vi.fn(),
@@ -20,6 +21,10 @@ vi.mock("../gateway/operator-approvals-client.js", () => ({
createOperatorApprovalsGatewayClient: mockCreateOperatorApprovalsGatewayClient,
}));
vi.mock("../gateway/client-start-readiness.js", () => ({
startGatewayClientWhenEventLoopReady: mockStartGatewayClientWhenEventLoopReady,
}));
vi.mock("../logging/subsystem.js", () => ({
createSubsystemLogger: () => loggerMocks,
}));
@@ -97,6 +102,10 @@ beforeEach(() => {
mockGatewayClientRequests.mockImplementation(async (method: string) =>
method.endsWith(".approval.list") ? [] : { ok: true },
);
mockStartGatewayClientWhenEventLoopReady.mockReset().mockImplementation(async (client) => {
client.start();
return { ready: true, elapsedMs: 0, maxDriftMs: 0, checks: 2, aborted: false };
});
loggerMocks.debug.mockReset();
loggerMocks.error.mockReset();
mockCreateOperatorApprovalsGatewayClient.mockReset().mockImplementation(async (params) => ({
@@ -252,12 +261,48 @@ describe("createExecApprovalChannelRuntime", () => {
await runtime.request("exec.approval.resolve", { id: "abc", decision: "deny" });
expect(mockGatewayClientStarts).toHaveBeenCalledTimes(1);
expect(mockStartGatewayClientWhenEventLoopReady).toHaveBeenCalledWith(
expect.objectContaining({ start: expect.any(Function) }),
{
clientOptions: { preauthHandshakeTimeoutMs: undefined },
},
);
expect(mockGatewayClientRequests).toHaveBeenCalledWith("exec.approval.resolve", {
id: "abc",
decision: "deny",
});
});
it("fails startup when gateway client readiness times out before start", async () => {
mockStartGatewayClientWhenEventLoopReady.mockResolvedValueOnce({
ready: false,
elapsedMs: 30_000,
maxDriftMs: 1_000,
checks: 1,
aborted: false,
});
const runtime = createExecApprovalChannelRuntime({
label: "test/exec-approvals",
clientDisplayName: "Test Exec Approvals",
cfg: { gateway: { handshakeTimeoutMs: 30_000 } } as never,
isConfigured: () => true,
shouldHandle: () => true,
deliverRequested: async () => [],
finalizeResolved: async () => undefined,
});
await expect(runtime.start()).rejects.toThrow("gateway event loop readiness timeout");
expect(mockGatewayClientStarts).not.toHaveBeenCalled();
expect(mockGatewayClientStops).toHaveBeenCalledTimes(1);
expect(mockStartGatewayClientWhenEventLoopReady).toHaveBeenCalledWith(
expect.objectContaining({ start: expect.any(Function) }),
{
clientOptions: { preauthHandshakeTimeoutMs: 30_000 },
},
);
});
it("can retry start after gateway client creation fails", async () => {
const boom = new Error("boom");
mockCreateOperatorApprovalsGatewayClient

View File

@@ -1,3 +1,4 @@
import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js";
import type { GatewayClient, GatewayReconnectPausedInfo } from "../gateway/client.js";
import { createOperatorApprovalsGatewayClient } from "../gateway/operator-approvals-client.js";
import { readConnectErrorDetailCode } from "../gateway/protocol/connect-error-details.js";
@@ -358,7 +359,14 @@ export function createExecApprovalChannelRuntime<
await adapter.beforeGatewayClientStart?.();
gatewayClient = client;
try {
client.start();
const readiness = await startGatewayClientWhenEventLoopReady(client, {
clientOptions: {
preauthHandshakeTimeoutMs: adapter.cfg.gateway?.handshakeTimeoutMs,
},
});
if (!readiness.ready) {
throw new Error("gateway event loop readiness timeout");
}
await ready;
if (stopClientIfInactive(client)) {
return;

View File

@@ -88,11 +88,13 @@ export class OpenClawChannelBridge {
const [
{ resolveGatewayClientBootstrap },
{ GatewayClient: GatewayClientCtor },
{ startGatewayClientWhenEventLoopReady },
{ APPROVALS_SCOPE, READ_SCOPE, WRITE_SCOPE },
{ GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES },
] = await Promise.all([
import("../gateway/client-bootstrap.js"),
import("../gateway/client.js"),
import("../gateway/client-start-readiness.js"),
import("../gateway/method-scopes.js"),
import("../gateway/protocol/client-info.js"),
]);
@@ -143,7 +145,12 @@ export class OpenClawChannelBridge {
this.retryingInitialConnect = false;
},
});
this.gateway.start();
const readiness = await startGatewayClientWhenEventLoopReady(this.gateway, {
clientOptions: { preauthHandshakeTimeoutMs: bootstrap.preauthHandshakeTimeoutMs },
});
if (!readiness.ready) {
this.rejectReadyOnce(new Error("gateway event loop readiness timeout"));
}
await this.readyPromise;
}

View File

@@ -1,4 +1,5 @@
import { getRuntimeConfig, type OpenClawConfig } from "../config/config.js";
import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js";
import { GatewayClient, type GatewayReconnectPausedInfo } from "../gateway/client.js";
import { resolveGatewayConnectionAuth } from "../gateway/connection-auth.js";
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/client-info.js";
@@ -269,6 +270,11 @@ export async function runNodeHost(opts: NodeHostRunOptions): Promise<void> {
return bins;
}, pathEnv);
client.start();
const readiness = await startGatewayClientWhenEventLoopReady(client, {
clientOptions: { preauthHandshakeTimeoutMs: cfg.gateway?.handshakeTimeoutMs },
});
if (!readiness.ready) {
throw new Error("node host gateway event loop readiness timeout");
}
await new Promise(() => {});
}

View File

@@ -18,6 +18,7 @@ export { ensureGatewayStartupAuth } from "../gateway/startup-auth.js";
export { resolveGatewayAuth } from "../gateway/auth.js";
export { rawDataToString } from "../infra/ws.js";
export { GatewayClient } from "../gateway/client.js";
export { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js";
export {
createOperatorApprovalsGatewayClient,
withOperatorApprovalsGatewayClient,

View File

@@ -7,6 +7,7 @@ import {
ensureExplicitGatewayAuth,
resolveExplicitGatewayAuth,
} from "../gateway/call.js";
import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js";
import { GatewayClient, GatewayClientRequestError } from "../gateway/client.js";
import { isLoopbackHost } from "../gateway/net.js";
import {
@@ -99,7 +100,7 @@ export class GatewayChatClient implements TuiBackend {
private client: GatewayClient;
private readyPromise: Promise<void>;
private resolveReady?: () => void;
readonly connection: { url: string; token?: string; password?: string };
readonly connection: ResolvedGatewayConnection;
hello?: HelloOk;
onEvent?: (evt: GatewayEvent) => void;
@@ -160,7 +161,13 @@ export class GatewayChatClient implements TuiBackend {
}
start() {
this.client.start();
void startGatewayClientWhenEventLoopReady(this.client, {
clientOptions: { preauthHandshakeTimeoutMs: this.connection.preauthHandshakeTimeoutMs },
}).then((readiness) => {
if (!readiness.ready && !readiness.aborted) {
this.onDisconnected?.("gateway event loop readiness timeout");
}
});
}
stop() {