From 6bbacd14a36684dcddf42e1b6db8453771ef6663 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 29 Apr 2026 14:50:21 +0100 Subject: [PATCH] fix(gateway): wait for event loop before client start --- CHANGELOG.md | 1 + .../.generated/plugin-sdk-api-baseline.sha256 | 4 +- docs/plugins/sdk-migration.md | 2 +- docs/plugins/sdk-subpaths.md | 2 +- .../google-meet/src/voice-call-gateway.ts | 30 +++- src/acp/server.startup.test.ts | 22 ++- src/acp/server.ts | 8 +- src/gateway/call.test.ts | 145 +++++++++++++----- src/gateway/call.ts | 32 +++- src/gateway/client-callsites.guard.test.ts | 6 +- src/gateway/client-start-readiness.test.ts | 38 +++++ src/gateway/client-start-readiness.ts | 46 ++++++ src/gateway/event-loop-ready.test.ts | 69 +++++++++ src/gateway/event-loop-ready.ts | 114 ++++++++++++++ .../gateway-cli-backend.live-helpers.ts | 17 +- src/gateway/operator-approvals-client.ts | 8 +- src/gateway/probe.test.ts | 75 +++++++++ src/gateway/probe.ts | 40 ++++- .../exec-approval-channel-runtime.test.ts | 45 ++++++ src/infra/exec-approval-channel-runtime.ts | 10 +- src/mcp/channel-bridge.ts | 9 +- src/node-host/runner.ts | 8 +- src/plugin-sdk/gateway-runtime.ts | 1 + src/tui/gateway-chat.ts | 11 +- 24 files changed, 675 insertions(+), 68 deletions(-) create mode 100644 src/gateway/client-start-readiness.test.ts create mode 100644 src/gateway/client-start-readiness.ts create mode 100644 src/gateway/event-loop-ready.test.ts create mode 100644 src/gateway/event-loop-ready.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index e30e090750a..e84fe5221c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai - CLI/tools: keep the Gateway `tools.*` RPC namespace out of plugin command discovery and managed proxy startup, so stray commands like `openclaw tools effective` fail quickly instead of cold-loading plugin metadata. Refs #73477. Thanks @oromeis. - CLI/status: keep default text `openclaw status --usage` on metadata-only channel scans unless `--deep` or `--all` is set, and send stray `openclaw tools --help` through the precomputed root-help fast path so latency-triage commands avoid plugin/runtime cold loads before printing. Refs #73477 and #74220. Thanks @oromeis and @NianJiuZst. - Agents/diagnostics: log slow embedded-run startup and preparation stage timings before model I/O, so Docker/VPS latency reports can identify whether plugin loading, auth/model resolution, tool inventory, bootstrap, MCP/LSP, resource loading, or stream setup is dominating pre-run latency. Refs #73428. Thanks @Dimaoggg, @quangtran88, and @Heyvhuang. +- Gateway/clients: wait for the event loop to become responsive before opening Gateway WebSocket RPC/probe/client connections while charging that readiness wait to caller timeouts, so Windows deferred module-evaluation stalls no longer turn healthy loopback gateways into false handshake timeouts across status, TUI, ACP, MCP, node-host, and plugin client paths. Refs #74279 and #48270. Thanks @wongcode and @joost-heijden. - Plugins/runtime-deps: memoize packaged bundled runtime dist-mirror preparation after the first successful pass while keeping source-checkout mirrors refreshable, so constrained Docker/VPS installs avoid repeated root scans before chat turns. Refs #73428, #73421, #73532, and #73477. Thanks @Dimaoggg, @oromeis, @oadiazp, @jmfraga, @bstanbury, @antoniusfelix, and @jkobject. - Channels/Discord: treat bare numeric outbound targets that match the effective Discord DM allowlist as user DMs while preserving account-specific legacy `dm.allowFrom` precedence over inherited root `allowFrom`. (#74303) Thanks @Squirbie. - Control UI: make the chat sidebar split divider focusable, keyboard-resizable, ARIA-described, and pointer-event based so sidebar resizing works without a mouse. Thanks @BunsDev. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index df7a39d31ae..8d76c0044f9 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -d5b33ee6be988cd6a844a358aaa098e1f6401b151e5ee1e46dceeccddaeb7434 plugin-sdk-api-baseline.json -dffa8b4afbb085faf42a857805c43708b748111e346552d7ea4654da3bafdee7 plugin-sdk-api-baseline.jsonl +4266568b93a03ce07e7811285ebc67ec90ebf7519ee892bbeb618b4a527f81e9 plugin-sdk-api-baseline.json +545c2ea8874ef1332c7710b74291f391231c861ac296afa46ddcc91fc1c2ed1c plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index eb0e2439468..8eb3024b708 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -412,7 +412,7 @@ releases. | `plugin-sdk/lazy-runtime` | Lazy runtime helpers | `createLazyRuntimeModule`, `createLazyRuntimeMethod`, `createLazyRuntimeMethodBinder`, `createLazyRuntimeNamedExport`, `createLazyRuntimeSurface` | | `plugin-sdk/process-runtime` | Process helpers | Shared exec helpers | | `plugin-sdk/cli-runtime` | CLI runtime helpers | Command formatting, waits, version helpers | - | `plugin-sdk/gateway-runtime` | Gateway helpers | Gateway client and channel-status patch helpers | + | `plugin-sdk/gateway-runtime` | Gateway helpers | Gateway client, event-loop-ready start helper, and channel-status patch helpers | | `plugin-sdk/config-runtime` | Deprecated config compatibility shim | Prefer `config-types`, `plugin-config-runtime`, `runtime-config-snapshot`, and `config-mutation` | | `plugin-sdk/telegram-command-config` | Telegram command helpers | Fallback-stable Telegram command validation helpers when the bundled Telegram contract surface is unavailable | | `plugin-sdk/approval-runtime` | Approval prompt helpers | Exec/plugin approval payload, approval capability/profile helpers, native approval routing/runtime helpers, and structured approval display path formatting | diff --git a/docs/plugins/sdk-subpaths.md b/docs/plugins/sdk-subpaths.md index 2049138699a..58bdaac3248 100644 --- a/docs/plugins/sdk-subpaths.md +++ b/docs/plugins/sdk-subpaths.md @@ -180,7 +180,7 @@ For the plugin authoring guide, see [Plugin SDK overview](/plugins/sdk-overview) | `plugin-sdk/lazy-runtime` | Lazy runtime import/binding helpers such as `createLazyRuntimeModule`, `createLazyRuntimeMethod`, and `createLazyRuntimeSurface` | | `plugin-sdk/process-runtime` | Process exec helpers | | `plugin-sdk/cli-runtime` | CLI formatting, wait, version, argument-invocation, and lazy command-group helpers | - | `plugin-sdk/gateway-runtime` | Gateway client, gateway CLI RPC, gateway protocol errors, and channel-status patch helpers | + | `plugin-sdk/gateway-runtime` | Gateway client, event-loop-ready client start helper, gateway CLI RPC, gateway protocol errors, and channel-status patch helpers | | `plugin-sdk/config-types` | Type-only config surface for plugin config shapes such as `OpenClawConfig` and channel/provider config types | | `plugin-sdk/plugin-config-runtime` | Runtime plugin-config lookup helpers such as `requireRuntimeConfig`, `resolvePluginConfigObject`, and `resolveLivePluginConfigObject` | | `plugin-sdk/config-mutation` | Transactional config mutation helpers such as `mutateConfigFile`, `replaceConfigFile`, and `logConfigUpdated` | diff --git a/extensions/google-meet/src/voice-call-gateway.ts b/extensions/google-meet/src/voice-call-gateway.ts index f65a3e376ed..efe1d54c3cd 100644 --- a/extensions/google-meet/src/voice-call-gateway.ts +++ b/extensions/google-meet/src/voice-call-gateway.ts @@ -1,5 +1,8 @@ import { setTimeout as sleep } from "node:timers/promises"; -import { GatewayClient } from "openclaw/plugin-sdk/gateway-runtime"; +import { + GatewayClient, + startGatewayClientWhenEventLoopReady, +} from "openclaw/plugin-sdk/gateway-runtime"; import type { GoogleMeetConfig } from "./config.js"; type VoiceCallGatewayClient = InstanceType; @@ -20,10 +23,11 @@ async function createConnectedGatewayClient( ): Promise { let client: VoiceCallGatewayClient; await new Promise((resolve, reject) => { - const timer = setTimeout( - () => reject(new Error("gateway connect timeout")), - config.voiceCall.requestTimeoutMs, - ); + const abortStart = new AbortController(); + const timer = setTimeout(() => { + abortStart.abort(); + reject(new Error("gateway connect timeout")); + }, config.voiceCall.requestTimeoutMs); client = new GatewayClient({ url: config.voiceCall.gatewayUrl, token: config.voiceCall.token, @@ -37,10 +41,24 @@ async function createConnectedGatewayClient( }, onConnectError: (err) => { clearTimeout(timer); + abortStart.abort(); reject(err); }, }); - client.start(); + void startGatewayClientWhenEventLoopReady(client, { + timeoutMs: config.voiceCall.requestTimeoutMs, + signal: abortStart.signal, + }) + .then((readiness) => { + if (!readiness.ready && !readiness.aborted) { + clearTimeout(timer); + reject(new Error("gateway event loop readiness timeout")); + } + }) + .catch((err) => { + clearTimeout(timer); + reject(err instanceof Error ? err : new Error(String(err))); + }); }); return client!; } diff --git a/src/acp/server.startup.test.ts b/src/acp/server.startup.test.ts index 33ab0edc1f8..acadc23e427 100644 --- a/src/acp/server.startup.test.ts +++ b/src/acp/server.startup.test.ts @@ -107,6 +107,19 @@ vi.mock("../gateway/client.js", () => ({ GatewayClient: MockGatewayClient, })); +vi.mock("../gateway/client-start-readiness.js", () => ({ + startGatewayClientWhenEventLoopReady: vi.fn(async (client: MockGatewayClient) => { + client.start(); + return { + ready: true, + elapsedMs: 0, + maxDriftMs: 0, + checks: 2, + aborted: false, + }; + }), +})); + vi.mock("../infra/is-main.js", () => ({ isMainModule: () => false, })); @@ -158,11 +171,14 @@ describe("serveAcpGateway startup", () => { } async function emitHelloAndWaitForAgentSideConnection() { + await vi.waitFor(() => { + expect(mockState.gateways).toHaveLength(1); + }); const gateway = getMockGateway(); gateway.emitHello(); - await Promise.resolve(); - await Promise.resolve(); - expect(mockState.agentSideConnectionCtor).toHaveBeenCalledTimes(1); + await vi.waitFor(() => { + expect(mockState.agentSideConnectionCtor).toHaveBeenCalledTimes(1); + }); } async function stopServeWithSigint( diff --git a/src/acp/server.ts b/src/acp/server.ts index 6819a347624..1d47545a256 100644 --- a/src/acp/server.ts +++ b/src/acp/server.ts @@ -4,6 +4,7 @@ import { fileURLToPath } from "node:url"; import { AgentSideConnection, ndJsonStream } from "@agentclientprotocol/sdk"; import { getRuntimeConfig } from "../config/config.js"; import { resolveGatewayClientBootstrap } from "../gateway/client-bootstrap.js"; +import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js"; import { GatewayClient } from "../gateway/client.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/client-info.js"; import { isMainModule } from "../infra/is-main.js"; @@ -103,7 +104,12 @@ export async function serveAcpGateway(opts: AcpServerOptions = {}): Promise { shutdown(); throw err; diff --git a/src/gateway/call.test.ts b/src/gateway/call.test.ts index 759ba4b2e7d..fab3f0b26da 100644 --- a/src/gateway/call.test.ts +++ b/src/gateway/call.test.ts @@ -22,6 +22,24 @@ const deviceIdentityState = vi.hoisted(() => ({ throwOnLoad: false, })); +const eventLoopReadyState = vi.hoisted(() => ({ + calls: [] as Array<{ maxWaitMs?: number } | undefined>, + promise: null as Promise<{ + ready: boolean; + elapsedMs: number; + maxDriftMs: number; + checks: number; + aborted: boolean; + }> | null, + result: { + ready: true, + elapsedMs: 0, + maxDriftMs: 0, + checks: 2, + aborted: false, + }, +})); + let lastClientOptions: { url?: string; token?: string; @@ -43,6 +61,7 @@ let lastRequestOptions: { } | null = null; type StartMode = "hello" | "close" | "silent" | "startup-retry-then-hello"; let startMode: StartMode = "hello"; +let startCalls = 0; let closeCode = 1006; let closeReason = ""; let helloMethods: string[] | undefined = ["health", "secrets.resolve"]; @@ -81,6 +100,7 @@ vi.mock("./client.js", () => ({ return { ok: true }; } start() { + startCalls += 1; if (startMode === "hello") { void lastClientOptions?.onHelloOk?.({ features: { @@ -101,6 +121,16 @@ vi.mock("./client.js", () => ({ }, })); +vi.mock("./event-loop-ready.js", () => ({ + waitForEventLoopReady: vi.fn(async (params?: { maxWaitMs?: number }) => { + eventLoopReadyState.calls.push(params); + if (eventLoopReadyState.promise) { + return await eventLoopReadyState.promise; + } + return eventLoopReadyState.result; + }), +})); + const { __testing, buildGatewayConnectionDetails, @@ -134,6 +164,7 @@ class StubGatewayClient { return { ok: true }; } start() { + startCalls += 1; if (startMode === "hello") { void lastClientOptions?.onHelloOk?.({ features: { @@ -161,7 +192,17 @@ function resetGatewayCallMocks() { pickPrimaryLanIPv4.mockClear(); lastClientOptions = null; lastRequestOptions = null; + eventLoopReadyState.calls = []; + eventLoopReadyState.promise = null; + eventLoopReadyState.result = { + ready: true, + elapsedMs: 0, + maxDriftMs: 0, + checks: 2, + aborted: false, + }; startMode = "hello"; + startCalls = 0; closeCode = 1006; closeReason = ""; helloMethods = ["health", "secrets.resolve"]; @@ -570,52 +611,37 @@ describe("callGateway url resolution", () => { expect(lastClientOptions?.clientDisplayName).toBeUndefined(); }); - it("yields one event-loop turn before starting CLI pairing requests", async () => { + it("waits for event-loop readiness before starting CLI pairing requests", async () => { setLocalLoopbackGatewayConfig(); - let preConnectYieldRan = false; - let sawYieldBeforeStart = false; - setImmediate(() => { - preConnectYieldRan = true; + let resolveReady!: (result: { + ready: boolean; + elapsedMs: number; + maxDriftMs: number; + checks: number; + aborted: boolean; + }) => void; + eventLoopReadyState.promise = new Promise((resolve) => { + resolveReady = resolve; }); - __testing.setDepsForTests({ - createGatewayClient: (opts) => - ({ - async request( - method: string, - params: unknown, - requestOpts?: { expectFinal?: boolean; timeoutMs?: number | null }, - ) { - lastRequestOptions = { method, params, opts: requestOpts }; - return { ok: true }; - }, - start() { - sawYieldBeforeStart = preConnectYieldRan; - opts.onHelloOk?.({ - features: { - methods: helloMethods ?? [], - events: [], - }, - } as unknown as Parameters>[0]); - }, - stop() {}, - }) as never, - getRuntimeConfig: getRuntimeConfig as unknown as () => OpenClawConfig, - loadOrCreateDeviceIdentity: () => deviceIdentityState.value, - resolveGatewayPort: resolveGatewayPort as unknown as ( - cfg?: OpenClawConfig, - env?: NodeJS.ProcessEnv, - ) => number, - }); - - await callGateway({ + const promise = callGateway({ method: "device.pair.list", mode: GATEWAY_CLIENT_MODES.CLI, clientName: GATEWAY_CLIENT_NAMES.CLI, }); - expect(sawYieldBeforeStart).toBe(true); + await vi.waitFor(() => { + expect(eventLoopReadyState.calls).toHaveLength(1); + }); + expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(10_000); + expect(lastClientOptions?.clientName).toBe(GATEWAY_CLIENT_NAMES.CLI); + expect(startCalls).toBe(0); + + resolveReady({ ready: true, elapsedMs: 0, maxDriftMs: 0, checks: 2, aborted: false }); + await promise; + + expect(startCalls).toBe(1); }); }); @@ -896,6 +922,51 @@ describe("callGateway error details", () => { }); }); + it("charges event-loop readiness against the wrapper timeout", async () => { + startMode = "silent"; + setLocalLoopbackGatewayConfig(); + eventLoopReadyState.promise = new Promise(() => {}); + + vi.useFakeTimers(); + let errMessage = ""; + const promise = callGateway({ method: "health", timeoutMs: 5 }).catch((caught) => { + errMessage = caught instanceof Error ? caught.message : String(caught); + }); + + await vi.waitFor(() => { + expect(eventLoopReadyState.calls).toHaveLength(1); + }); + expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(5); + expect(startCalls).toBe(0); + await vi.advanceTimersByTimeAsync(5); + await promise; + + expect(startCalls).toBe(0); + expect(errMessage).toContain("gateway timeout after 5ms"); + }); + + it("fails before connecting when event-loop readiness consumes the wrapper timeout", async () => { + startMode = "silent"; + setLocalLoopbackGatewayConfig(); + eventLoopReadyState.result = { + ready: false, + elapsedMs: 5, + maxDriftMs: 400, + checks: 1, + aborted: false, + }; + + await expect(callGateway({ method: "health", timeoutMs: 5 })).rejects.toMatchObject({ + name: "GatewayTransportError", + kind: "timeout", + timeoutMs: 5, + }); + expect(eventLoopReadyState.calls).toHaveLength(1); + expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(5); + expect(lastClientOptions).not.toBeNull(); + expect(startCalls).toBe(0); + }); + it("keeps the default wrapper timeout aligned with configured handshake timeout", async () => { startMode = "silent"; getRuntimeConfig.mockReturnValue({ diff --git a/src/gateway/call.ts b/src/gateway/call.ts index 17c0b98e37e..75dcfb0b3e0 100644 --- a/src/gateway/call.ts +++ b/src/gateway/call.ts @@ -18,6 +18,7 @@ import { } from "../utils/message-channel.js"; import { resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js"; import { VERSION } from "../version.js"; +import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js"; import { GatewayClient, type GatewayClientOptions } from "./client.js"; import { buildGatewayConnectionDetailsWithResolvers, @@ -615,19 +616,16 @@ async function executeGatewayRequestWithScopes(params: { timeoutMs, safeTimerTimeoutMs, } = params; - // Yield to the event loop before starting the WebSocket connection. - // On Windows with large dist bundles, heavy synchronous module loading - // can starve the event loop, preventing timely processing of the - // connect.challenge frame and causing handshake timeouts (#48736). - await new Promise((r) => setImmediate(r)); return await new Promise((resolve, reject) => { let settled = false; let ignoreClose = false; + const startAbort = new AbortController(); const stop = (err?: Error, value?: T) => { if (settled) { return; } settled = true; + startAbort.abort(); clearTimeout(timer); void stopGatewayClient(client).finally(() => { if (err) { @@ -701,7 +699,29 @@ async function executeGatewayRequestWithScopes(params: { ); }, safeTimerTimeoutMs); - client.start(); + void startGatewayClientWhenEventLoopReady(client, { + timeoutMs: safeTimerTimeoutMs, + signal: startAbort.signal, + }) + .then((readiness) => { + if (settled || readiness.ready || readiness.aborted) { + return; + } + ignoreClose = true; + stop( + createGatewayTimeoutTransportError({ + timeoutMs, + connectionDetails: params.connectionDetails, + }), + ); + }) + .catch((err) => { + if (settled) { + return; + } + ignoreClose = true; + stop(err instanceof Error ? err : new Error(String(err))); + }); }); } diff --git a/src/gateway/client-callsites.guard.test.ts b/src/gateway/client-callsites.guard.test.ts index 16c4869306b..49f33ddf3ad 100644 --- a/src/gateway/client-callsites.guard.test.ts +++ b/src/gateway/client-callsites.guard.test.ts @@ -5,6 +5,7 @@ import { describe, expect, it } from "vitest"; const GATEWAY_CLIENT_CONSTRUCTOR_PATTERN = /new\s+GatewayClient\s*\(/; const ALLOWED_GATEWAY_CLIENT_CALLSITES = new Set([ + "extensions/google-meet/src/voice-call-gateway.ts", "src/acp/server.ts", "src/gateway/call.ts", "src/gateway/gateway-cli-backend.live-helpers.ts", @@ -45,7 +46,10 @@ async function collectSourceFiles(dir: string): Promise { describe("GatewayClient production callsites", () => { it("remain constrained to allowlisted files", async () => { const root = process.cwd(); - const sourceFiles = await collectSourceFiles(path.join(root, "src")); + const sourceFiles = [ + ...(await collectSourceFiles(path.join(root, "src"))), + ...(await collectSourceFiles(path.join(root, "extensions"))), + ]; const callsites: string[] = []; for (const fullPath of sourceFiles) { const relativePath = path.relative(root, fullPath).replaceAll(path.sep, "/"); diff --git a/src/gateway/client-start-readiness.test.ts b/src/gateway/client-start-readiness.test.ts new file mode 100644 index 00000000000..c90527b8b00 --- /dev/null +++ b/src/gateway/client-start-readiness.test.ts @@ -0,0 +1,38 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js"; +import type { GatewayClient } from "./client.js"; + +describe("startGatewayClientWhenEventLoopReady", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("starts the client only after the event loop is responsive", async () => { + vi.useFakeTimers(); + const client = { start: vi.fn() } as unknown as GatewayClient; + + const promise = startGatewayClientWhenEventLoopReady(client, { timeoutMs: 100 }); + + await vi.advanceTimersByTimeAsync(1); + expect(client.start).not.toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(1); + await expect(promise).resolves.toMatchObject({ ready: true }); + + expect(client.start).toHaveBeenCalledTimes(1); + }); + + it("does not start the client after an aborted readiness wait", async () => { + vi.useFakeTimers(); + const client = { start: vi.fn() } as unknown as GatewayClient; + const controller = new AbortController(); + + const promise = startGatewayClientWhenEventLoopReady(client, { + timeoutMs: 100, + signal: controller.signal, + }); + controller.abort(); + + await expect(promise).resolves.toMatchObject({ ready: false, aborted: true }); + expect(client.start).not.toHaveBeenCalled(); + }); +}); diff --git a/src/gateway/client-start-readiness.ts b/src/gateway/client-start-readiness.ts new file mode 100644 index 00000000000..e4dbe7c01b4 --- /dev/null +++ b/src/gateway/client-start-readiness.ts @@ -0,0 +1,46 @@ +import type { GatewayClient, GatewayClientOptions } from "./client.js"; +import { waitForEventLoopReady, type EventLoopReadyResult } from "./event-loop-ready.js"; +import { resolveConnectChallengeTimeoutMs } from "./handshake-timeouts.js"; + +export type GatewayClientStartReadinessOptions = { + timeoutMs?: number; + clientOptions?: Pick< + GatewayClientOptions, + "connectChallengeTimeoutMs" | "connectDelayMs" | "preauthHandshakeTimeoutMs" + >; + signal?: AbortSignal; +}; + +export function resolveGatewayClientStartReadinessTimeoutMs( + options: GatewayClientStartReadinessOptions = {}, +): number { + if (typeof options.timeoutMs === "number" && Number.isFinite(options.timeoutMs)) { + return options.timeoutMs; + } + const clientOptions = options.clientOptions ?? {}; + const timeoutOverride = + typeof clientOptions.connectChallengeTimeoutMs === "number" && + Number.isFinite(clientOptions.connectChallengeTimeoutMs) + ? clientOptions.connectChallengeTimeoutMs + : typeof clientOptions.connectDelayMs === "number" && + Number.isFinite(clientOptions.connectDelayMs) + ? clientOptions.connectDelayMs + : undefined; + return resolveConnectChallengeTimeoutMs(timeoutOverride, { + configuredTimeoutMs: clientOptions.preauthHandshakeTimeoutMs, + }); +} + +export async function startGatewayClientWhenEventLoopReady( + client: GatewayClient, + options: GatewayClientStartReadinessOptions = {}, +): Promise { + const readiness = await waitForEventLoopReady({ + maxWaitMs: resolveGatewayClientStartReadinessTimeoutMs(options), + signal: options.signal, + }); + if (readiness.ready && !readiness.aborted && options.signal?.aborted !== true) { + client.start(); + } + return readiness; +} diff --git a/src/gateway/event-loop-ready.test.ts b/src/gateway/event-loop-ready.test.ts new file mode 100644 index 00000000000..6544b438e9d --- /dev/null +++ b/src/gateway/event-loop-ready.test.ts @@ -0,0 +1,69 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { waitForEventLoopReady } from "./event-loop-ready.js"; + +describe("waitForEventLoopReady", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("resolves ready after consecutive low-drift timer checks", async () => { + vi.useFakeTimers(); + + const promise = waitForEventLoopReady({ + maxWaitMs: 100, + intervalMs: 10, + consecutiveReadyChecks: 2, + }); + + await vi.advanceTimersByTimeAsync(20); + + await expect(promise).resolves.toMatchObject({ + ready: true, + aborted: false, + elapsedMs: 20, + checks: 2, + maxDriftMs: 0, + }); + }); + + it("resolves not-ready when the readiness deadline expires", async () => { + vi.useFakeTimers(); + + const promise = waitForEventLoopReady({ + maxWaitMs: 5, + intervalMs: 5, + consecutiveReadyChecks: 2, + }); + + await vi.advanceTimersByTimeAsync(5); + + await expect(promise).resolves.toMatchObject({ + ready: false, + aborted: false, + elapsedMs: 5, + checks: 1, + maxDriftMs: 0, + }); + }); + + it("clears pending readiness timers when aborted", async () => { + vi.useFakeTimers(); + const controller = new AbortController(); + + const promise = waitForEventLoopReady({ + maxWaitMs: 100, + intervalMs: 10, + signal: controller.signal, + }); + + controller.abort(); + + await expect(promise).resolves.toMatchObject({ + ready: false, + aborted: true, + elapsedMs: 0, + checks: 0, + }); + expect(vi.getTimerCount()).toBe(0); + }); +}); diff --git a/src/gateway/event-loop-ready.ts b/src/gateway/event-loop-ready.ts new file mode 100644 index 00000000000..ee3cede53f2 --- /dev/null +++ b/src/gateway/event-loop-ready.ts @@ -0,0 +1,114 @@ +import { resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js"; + +export type EventLoopReadyResult = { + ready: boolean; + elapsedMs: number; + maxDriftMs: number; + checks: number; + aborted: boolean; +}; + +export type EventLoopReadyOptions = { + maxWaitMs?: number; + intervalMs?: number; + driftThresholdMs?: number; + consecutiveReadyChecks?: number; + signal?: AbortSignal; +}; + +const DEFAULT_MAX_WAIT_MS = 10_000; +const DEFAULT_INTERVAL_MS = 1; +const DEFAULT_DRIFT_THRESHOLD_MS = 200; +const DEFAULT_CONSECUTIVE_READY_CHECKS = 2; + +function resolvePositiveInteger(value: number | undefined, fallback: number): number { + return Number.isFinite(value) && value !== undefined ? Math.max(1, Math.floor(value)) : fallback; +} + +export async function waitForEventLoopReady( + options: EventLoopReadyOptions = {}, +): Promise { + const maxWaitMs = resolveSafeTimeoutDelayMs(options.maxWaitMs ?? DEFAULT_MAX_WAIT_MS); + const intervalMs = resolvePositiveInteger(options.intervalMs, DEFAULT_INTERVAL_MS); + const driftThresholdMs = resolvePositiveInteger( + options.driftThresholdMs, + DEFAULT_DRIFT_THRESHOLD_MS, + ); + const consecutiveReadyChecks = resolvePositiveInteger( + options.consecutiveReadyChecks, + DEFAULT_CONSECUTIVE_READY_CHECKS, + ); + const signal = options.signal; + + const startedAt = Date.now(); + let readyChecks = 0; + let checks = 0; + let maxDriftMs = 0; + + return await new Promise((resolve) => { + let settled = false; + let timer: ReturnType | null = null; + const clearTimer = () => { + if (timer) { + clearTimeout(timer); + timer = null; + } + }; + const finish = (ready: boolean, aborted = false) => { + if (settled) { + return; + } + settled = true; + clearTimer(); + signal?.removeEventListener("abort", onAbort); + resolve({ + ready, + elapsedMs: Math.max(0, Date.now() - startedAt), + maxDriftMs, + checks, + aborted, + }); + }; + const onAbort = () => { + finish(false, true); + }; + if (signal?.aborted) { + finish(false, true); + return; + } + signal?.addEventListener("abort", onAbort, { once: true }); + + const scheduleNext = () => { + if (signal?.aborted) { + finish(false, true); + return; + } + const elapsedMs = Math.max(0, Date.now() - startedAt); + const remainingMs = maxWaitMs - elapsedMs; + if (remainingMs <= 0) { + finish(false); + return; + } + const delayMs = Math.min(intervalMs, remainingMs); + const scheduledAt = Date.now(); + timer = setTimeout(() => { + timer = null; + checks += 1; + const driftMs = Math.max(0, Date.now() - scheduledAt - delayMs); + maxDriftMs = Math.max(maxDriftMs, driftMs); + if (driftMs > driftThresholdMs) { + readyChecks = 0; + } else { + readyChecks += 1; + } + if (readyChecks >= consecutiveReadyChecks) { + finish(true); + return; + } + scheduleNext(); + }, delayMs); + }; + + scheduleNext(); + }); +} diff --git a/src/gateway/gateway-cli-backend.live-helpers.ts b/src/gateway/gateway-cli-backend.live-helpers.ts index 347d22a1587..629070a6692 100644 --- a/src/gateway/gateway-cli-backend.live-helpers.ts +++ b/src/gateway/gateway-cli-backend.live-helpers.ts @@ -16,6 +16,7 @@ import { isTruthyEnvValue } from "../infra/env.js"; import { normalizeLowercaseStringOrEmpty } from "../shared/string-coerce.js"; import { getFreePortBlockWithPermissionFallback } from "../test-utils/ports.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../utils/message-channel.js"; +import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js"; import { GatewayClient, type GatewayClientOptions } from "./client.js"; // Aggregate docker live runs can contend on startup enough that the gateway @@ -289,11 +290,13 @@ async function connectClientOnce(params: { return await new Promise((resolve, reject) => { let done = false; let client: GatewayClient | undefined; + const abortStart = new AbortController(); const finish = (result: { client?: GatewayClient; error?: Error }) => { if (done) { return; } done = true; + abortStart.abort(); clearTimeout(connectTimeout); if (result.error) { if (client) { @@ -334,7 +337,19 @@ async function connectClientOnce(params: { params.timeoutMs, ); connectTimeout.unref(); - client.start(); + void startGatewayClientWhenEventLoopReady(client, { + timeoutMs: params.timeoutMs, + signal: abortStart.signal, + }).then( + (readiness) => { + if (!readiness.ready && !readiness.aborted) { + finish({ error: new Error("gateway event loop readiness timeout") }); + } + }, + (error) => { + finish({ error: error instanceof Error ? error : new Error(String(error)) }); + }, + ); }); } diff --git a/src/gateway/operator-approvals-client.ts b/src/gateway/operator-approvals-client.ts index 86a6bcddac7..0fca7bd43d2 100644 --- a/src/gateway/operator-approvals-client.ts +++ b/src/gateway/operator-approvals-client.ts @@ -1,5 +1,6 @@ import type { OpenClawConfig } from "../config/types.openclaw.js"; import { resolveGatewayClientBootstrap } from "./client-bootstrap.js"; +import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js"; import { GatewayClient, type GatewayClientOptions } from "./client.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "./protocol/client-info.js"; @@ -86,7 +87,12 @@ export async function withOperatorApprovalsGatewayClient( }); try { - gatewayClient.start(); + const readiness = await startGatewayClientWhenEventLoopReady(gatewayClient, { + clientOptions: { preauthHandshakeTimeoutMs: params.config.gateway?.handshakeTimeoutMs }, + }); + if (!readiness.ready) { + throw new Error("gateway event loop readiness timeout"); + } await ready; return await run(gatewayClient); } finally { diff --git a/src/gateway/probe.test.ts b/src/gateway/probe.test.ts index 2654b37e01a..fbd57b6785c 100644 --- a/src/gateway/probe.test.ts +++ b/src/gateway/probe.test.ts @@ -3,6 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; const gatewayClientState = vi.hoisted(() => ({ options: null as Record | null, requests: [] as string[], + startCalls: 0, startMode: "hello" as "hello" | "close" | "connect-error-close" | "startup-retry-then-hello", close: { code: 1008, reason: "pairing required" }, helloAuth: { @@ -32,6 +33,17 @@ const deviceIdentityState = vi.hoisted(() => ({ } as Record | null, })); +const eventLoopReadyState = vi.hoisted(() => ({ + calls: [] as Array<{ maxWaitMs?: number } | undefined>, + result: { + ready: true, + elapsedMs: 0, + maxDriftMs: 0, + checks: 2, + aborted: false, + }, +})); + class MockGatewayClientRequestError extends Error { readonly details?: unknown; @@ -51,6 +63,7 @@ class MockGatewayClient { } start(): void { + gatewayClientState.startCalls += 1; void Promise.resolve() .then(async () => { if (gatewayClientState.startMode === "close") { @@ -134,6 +147,13 @@ vi.mock("../infra/device-auth-store.js", () => ({ loadDeviceAuthToken: () => deviceIdentityState.cachedToken, })); +vi.mock("./event-loop-ready.js", () => ({ + waitForEventLoopReady: vi.fn((params?: { maxWaitMs?: number }) => { + eventLoopReadyState.calls.push(params); + return Promise.resolve(eventLoopReadyState.result); + }), +})); + const { clampProbeTimeoutMs, probeGateway } = await import("./probe.js"); describe("probeGateway", () => { @@ -146,6 +166,9 @@ describe("probeGateway", () => { updatedAtMs: 1, }; gatewayClientState.startMode = "hello"; + gatewayClientState.options = null; + gatewayClientState.requests = []; + gatewayClientState.startCalls = 0; gatewayClientState.close = { code: 1008, reason: "pairing required" }; gatewayClientState.helloAuth = { role: "operator", @@ -157,6 +180,14 @@ describe("probeGateway", () => { reason: "scope-upgrade", requestId: "req-123", }; + eventLoopReadyState.calls = []; + eventLoopReadyState.result = { + ready: true, + elapsedMs: 0, + maxDriftMs: 0, + checks: 2, + aborted: false, + }; }); it("clamps probe timeout to timer-safe bounds", () => { @@ -164,6 +195,50 @@ describe("probeGateway", () => { expect(clampProbeTimeoutMs(2_000)).toBe(2_000); expect(clampProbeTimeoutMs(3_000_000_000)).toBe(2_147_483_647); }); + it("waits for event-loop readiness before connecting", async () => { + await probeGateway({ + url: "ws://127.0.0.1:18789", + timeoutMs: 1_000, + includeDetails: false, + }); + + expect(eventLoopReadyState.calls).toHaveLength(1); + expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(1_000); + expect(gatewayClientState.options).not.toBeNull(); + expect(gatewayClientState.startCalls).toBe(1); + }); + + it("fails before connecting when event-loop readiness consumes the initial probe budget", async () => { + eventLoopReadyState.result = { + ready: false, + elapsedMs: 250, + maxDriftMs: 500, + checks: 1, + aborted: false, + }; + + const result = await probeGateway({ + url: "ws://127.0.0.1:18789", + timeoutMs: 1, + includeDetails: false, + }); + + expect(result).toMatchObject({ + ok: false, + error: "timeout", + close: null, + auth: { + role: null, + scopes: [], + capability: "unknown", + }, + }); + expect(eventLoopReadyState.calls).toHaveLength(1); + expect(eventLoopReadyState.calls[0]?.maxWaitMs).toBe(250); + expect(gatewayClientState.options).not.toBeNull(); + expect(gatewayClientState.startCalls).toBe(0); + }); + it("connects with operator.read scope", async () => { const result = await probeGateway({ url: "ws://127.0.0.1:18789", diff --git a/src/gateway/probe.ts b/src/gateway/probe.ts index 4d1acaeac79..6ca77a2ade3 100644 --- a/src/gateway/probe.ts +++ b/src/gateway/probe.ts @@ -3,6 +3,7 @@ import { loadDeviceAuthToken } from "../infra/device-auth-store.js"; import { formatErrorMessage } from "../infra/errors.js"; import type { SystemPresence } from "../infra/system-presence.js"; import { MAX_SAFE_TIMEOUT_DELAY_MS, resolveSafeTimeoutDelayMs } from "../utils/timer-delay.js"; +import { startGatewayClientWhenEventLoopReady } from "./client-start-readiness.js"; import { GatewayClient, GatewayClientRequestError } from "./client.js"; import { READ_SCOPE } from "./method-scopes.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "./protocol/client-info.js"; @@ -185,19 +186,21 @@ export async function probeGateway(opts: { return null; } })(); + const initialProbeTimeoutMs = clampProbeTimeoutMs(opts.timeoutMs); return await new Promise((resolve) => { let settled = false; let timer: ReturnType | null = null; + const startAbort = new AbortController(); const clearProbeTimer = () => { if (timer) { clearTimeout(timer); timer = null; } }; - const armProbeTimer = (onTimeout: () => void) => { + const armProbeTimer = (onTimeout: () => void, timeoutMs = initialProbeTimeoutMs) => { clearProbeTimer(); - timer = setTimeout(onTimeout, clampProbeTimeoutMs(opts.timeoutMs)); + timer = setTimeout(onTimeout, resolveSafeTimeoutDelayMs(timeoutMs)); }; const settle = ( result: Omit & { @@ -208,6 +211,7 @@ export async function probeGateway(opts: { return; } settled = true; + startAbort.abort(); clearProbeTimer(); client.stop(); const { connectErrorDetails: resultConnectErrorDetails, ...rest } = result; @@ -373,6 +377,36 @@ export async function probeGateway(opts: { }); }); - client.start(); + void startGatewayClientWhenEventLoopReady(client, { + timeoutMs: initialProbeTimeoutMs, + signal: startAbort.signal, + }) + .then((readiness) => { + if (settled || readiness.ready || readiness.aborted) { + return; + } + settleProbe({ + ok: false, + error: "timeout", + health: null, + status: null, + presence: null, + configSnapshot: null, + }); + }) + .catch((err) => { + if (settled) { + return; + } + connectError = formatErrorMessage(err); + settleProbe({ + ok: false, + error: connectError, + health: null, + status: null, + presence: null, + configSnapshot: null, + }); + }); }); } diff --git a/src/infra/exec-approval-channel-runtime.test.ts b/src/infra/exec-approval-channel-runtime.test.ts index f7de18d32f3..b15ff27238a 100644 --- a/src/infra/exec-approval-channel-runtime.test.ts +++ b/src/infra/exec-approval-channel-runtime.test.ts @@ -11,6 +11,7 @@ const mockGatewayClientRequests = vi.hoisted(() => })), ); const mockCreateOperatorApprovalsGatewayClient = vi.hoisted(() => vi.fn()); +const mockStartGatewayClientWhenEventLoopReady = vi.hoisted(() => vi.fn()); const loggerMocks = vi.hoisted(() => ({ debug: vi.fn(), error: vi.fn(), @@ -20,6 +21,10 @@ vi.mock("../gateway/operator-approvals-client.js", () => ({ createOperatorApprovalsGatewayClient: mockCreateOperatorApprovalsGatewayClient, })); +vi.mock("../gateway/client-start-readiness.js", () => ({ + startGatewayClientWhenEventLoopReady: mockStartGatewayClientWhenEventLoopReady, +})); + vi.mock("../logging/subsystem.js", () => ({ createSubsystemLogger: () => loggerMocks, })); @@ -97,6 +102,10 @@ beforeEach(() => { mockGatewayClientRequests.mockImplementation(async (method: string) => method.endsWith(".approval.list") ? [] : { ok: true }, ); + mockStartGatewayClientWhenEventLoopReady.mockReset().mockImplementation(async (client) => { + client.start(); + return { ready: true, elapsedMs: 0, maxDriftMs: 0, checks: 2, aborted: false }; + }); loggerMocks.debug.mockReset(); loggerMocks.error.mockReset(); mockCreateOperatorApprovalsGatewayClient.mockReset().mockImplementation(async (params) => ({ @@ -252,12 +261,48 @@ describe("createExecApprovalChannelRuntime", () => { await runtime.request("exec.approval.resolve", { id: "abc", decision: "deny" }); expect(mockGatewayClientStarts).toHaveBeenCalledTimes(1); + expect(mockStartGatewayClientWhenEventLoopReady).toHaveBeenCalledWith( + expect.objectContaining({ start: expect.any(Function) }), + { + clientOptions: { preauthHandshakeTimeoutMs: undefined }, + }, + ); expect(mockGatewayClientRequests).toHaveBeenCalledWith("exec.approval.resolve", { id: "abc", decision: "deny", }); }); + it("fails startup when gateway client readiness times out before start", async () => { + mockStartGatewayClientWhenEventLoopReady.mockResolvedValueOnce({ + ready: false, + elapsedMs: 30_000, + maxDriftMs: 1_000, + checks: 1, + aborted: false, + }); + const runtime = createExecApprovalChannelRuntime({ + label: "test/exec-approvals", + clientDisplayName: "Test Exec Approvals", + cfg: { gateway: { handshakeTimeoutMs: 30_000 } } as never, + isConfigured: () => true, + shouldHandle: () => true, + deliverRequested: async () => [], + finalizeResolved: async () => undefined, + }); + + await expect(runtime.start()).rejects.toThrow("gateway event loop readiness timeout"); + + expect(mockGatewayClientStarts).not.toHaveBeenCalled(); + expect(mockGatewayClientStops).toHaveBeenCalledTimes(1); + expect(mockStartGatewayClientWhenEventLoopReady).toHaveBeenCalledWith( + expect.objectContaining({ start: expect.any(Function) }), + { + clientOptions: { preauthHandshakeTimeoutMs: 30_000 }, + }, + ); + }); + it("can retry start after gateway client creation fails", async () => { const boom = new Error("boom"); mockCreateOperatorApprovalsGatewayClient diff --git a/src/infra/exec-approval-channel-runtime.ts b/src/infra/exec-approval-channel-runtime.ts index c86c8118f46..3c0cd134396 100644 --- a/src/infra/exec-approval-channel-runtime.ts +++ b/src/infra/exec-approval-channel-runtime.ts @@ -1,3 +1,4 @@ +import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js"; import type { GatewayClient, GatewayReconnectPausedInfo } from "../gateway/client.js"; import { createOperatorApprovalsGatewayClient } from "../gateway/operator-approvals-client.js"; import { readConnectErrorDetailCode } from "../gateway/protocol/connect-error-details.js"; @@ -358,7 +359,14 @@ export function createExecApprovalChannelRuntime< await adapter.beforeGatewayClientStart?.(); gatewayClient = client; try { - client.start(); + const readiness = await startGatewayClientWhenEventLoopReady(client, { + clientOptions: { + preauthHandshakeTimeoutMs: adapter.cfg.gateway?.handshakeTimeoutMs, + }, + }); + if (!readiness.ready) { + throw new Error("gateway event loop readiness timeout"); + } await ready; if (stopClientIfInactive(client)) { return; diff --git a/src/mcp/channel-bridge.ts b/src/mcp/channel-bridge.ts index ef420648186..67db49e638f 100644 --- a/src/mcp/channel-bridge.ts +++ b/src/mcp/channel-bridge.ts @@ -88,11 +88,13 @@ export class OpenClawChannelBridge { const [ { resolveGatewayClientBootstrap }, { GatewayClient: GatewayClientCtor }, + { startGatewayClientWhenEventLoopReady }, { APPROVALS_SCOPE, READ_SCOPE, WRITE_SCOPE }, { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES }, ] = await Promise.all([ import("../gateway/client-bootstrap.js"), import("../gateway/client.js"), + import("../gateway/client-start-readiness.js"), import("../gateway/method-scopes.js"), import("../gateway/protocol/client-info.js"), ]); @@ -143,7 +145,12 @@ export class OpenClawChannelBridge { this.retryingInitialConnect = false; }, }); - this.gateway.start(); + const readiness = await startGatewayClientWhenEventLoopReady(this.gateway, { + clientOptions: { preauthHandshakeTimeoutMs: bootstrap.preauthHandshakeTimeoutMs }, + }); + if (!readiness.ready) { + this.rejectReadyOnce(new Error("gateway event loop readiness timeout")); + } await this.readyPromise; } diff --git a/src/node-host/runner.ts b/src/node-host/runner.ts index 14aba18d66d..e0b3b588bfc 100644 --- a/src/node-host/runner.ts +++ b/src/node-host/runner.ts @@ -1,4 +1,5 @@ import { getRuntimeConfig, type OpenClawConfig } from "../config/config.js"; +import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js"; import { GatewayClient, type GatewayReconnectPausedInfo } from "../gateway/client.js"; import { resolveGatewayConnectionAuth } from "../gateway/connection-auth.js"; import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../gateway/protocol/client-info.js"; @@ -269,6 +270,11 @@ export async function runNodeHost(opts: NodeHostRunOptions): Promise { return bins; }, pathEnv); - client.start(); + const readiness = await startGatewayClientWhenEventLoopReady(client, { + clientOptions: { preauthHandshakeTimeoutMs: cfg.gateway?.handshakeTimeoutMs }, + }); + if (!readiness.ready) { + throw new Error("node host gateway event loop readiness timeout"); + } await new Promise(() => {}); } diff --git a/src/plugin-sdk/gateway-runtime.ts b/src/plugin-sdk/gateway-runtime.ts index 119efc343d5..48a28eb30d6 100644 --- a/src/plugin-sdk/gateway-runtime.ts +++ b/src/plugin-sdk/gateway-runtime.ts @@ -18,6 +18,7 @@ export { ensureGatewayStartupAuth } from "../gateway/startup-auth.js"; export { resolveGatewayAuth } from "../gateway/auth.js"; export { rawDataToString } from "../infra/ws.js"; export { GatewayClient } from "../gateway/client.js"; +export { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js"; export { createOperatorApprovalsGatewayClient, withOperatorApprovalsGatewayClient, diff --git a/src/tui/gateway-chat.ts b/src/tui/gateway-chat.ts index 80e78d471f9..bbd302af72e 100644 --- a/src/tui/gateway-chat.ts +++ b/src/tui/gateway-chat.ts @@ -7,6 +7,7 @@ import { ensureExplicitGatewayAuth, resolveExplicitGatewayAuth, } from "../gateway/call.js"; +import { startGatewayClientWhenEventLoopReady } from "../gateway/client-start-readiness.js"; import { GatewayClient, GatewayClientRequestError } from "../gateway/client.js"; import { isLoopbackHost } from "../gateway/net.js"; import { @@ -99,7 +100,7 @@ export class GatewayChatClient implements TuiBackend { private client: GatewayClient; private readyPromise: Promise; private resolveReady?: () => void; - readonly connection: { url: string; token?: string; password?: string }; + readonly connection: ResolvedGatewayConnection; hello?: HelloOk; onEvent?: (evt: GatewayEvent) => void; @@ -160,7 +161,13 @@ export class GatewayChatClient implements TuiBackend { } start() { - this.client.start(); + void startGatewayClientWhenEventLoopReady(this.client, { + clientOptions: { preauthHandshakeTimeoutMs: this.connection.preauthHandshakeTimeoutMs }, + }).then((readiness) => { + if (!readiness.ready && !readiness.aborted) { + this.onDisconnected?.("gateway event loop readiness timeout"); + } + }); } stop() {