mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 07:40:44 +00:00
fix(gateway): retry startup handshakes before surfacing failures
This commit is contained in:
@@ -69,6 +69,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Feishu: suppress distinct late `final` text deliveries after a streaming card has already closed, while keeping media attachments deliverable, so late-finals no longer reopen duplicate Feishu cards. Fixes #71977. (#72294) Thanks @MonkeyLeeT.
|
||||
- Gateway: expose `gateway.handshakeTimeoutMs` in config, schema, and docs while preserving `OPENCLAW_HANDSHAKE_TIMEOUT_MS` precedence, so loaded or low-powered hosts can tune local WebSocket pre-auth handshakes without patching dist files. Supersedes #51282; refs #73592 and #73652. Thanks @henry-the-frog.
|
||||
- Gateway/TUI/status: align configured and env-based WebSocket handshake budgets across local clients, probes, and fallback RPCs while preserving explicit status timeouts and paired-device auth fallback, so slow local gateways are not marked unreachable by a shorter client watchdog. Refs #73524, #73535, #73592, and #73602. Thanks @harshcatsystems-collab, @DJBlackhawk, and @Vksh07.
|
||||
- Gateway/startup: return retryable `UNAVAILABLE` during the sidecar startup window and keep CLI/TUI/status clients retrying inside their existing timeout budget, so early connects no longer surface as terminal handshake failures. Fixes #73652. Thanks @spenceryang1996-dot.
|
||||
- Agents/auth: scope external CLI credential discovery to configured providers during model auth status and startup prewarm, so opencode-only and other single-provider gateways do not block on unrelated Claude CLI Keychain probes. Fixes #73908. Thanks @Ailuras.
|
||||
- Agents/model selection: resolve slash-form aliases before provider/model parsing and keep alias-resolved primary models subject to transient provider cooldowns, so cron and persisted sessions do not retry cooled-down raw aliases. Fixes #73573 and #73657. Thanks @akai-shuuichi and @hashslingers.
|
||||
- Agents/Claude CLI: reuse already-cached macOS Keychain credentials for no-prompt Claude credential reads, so doctor/runtime checks do not miss fresh interactive Claude auth. Fixes #73682. Thanks @RyanSandoval.
|
||||
|
||||
@@ -97,6 +97,12 @@ Gateway → Client:
|
||||
}
|
||||
```
|
||||
|
||||
While the Gateway is still finishing startup sidecars, the `connect` request can
return a retryable `UNAVAILABLE` error. That error has `retryable: true`,
`details.reason` set to `"startup-sidecars"`, and a top-level `retryAfterMs`
hint (a sibling of `details`, not nested inside it). Clients should wait for the
hinted delay and retry within their overall connection budget instead of
surfacing the response as a terminal handshake failure.
|
||||
|
||||
`server`, `features`, `snapshot`, and `policy` are all required by the schema
|
||||
(`src/gateway/protocol/schema/frames.ts`). `auth` is also required and reports
|
||||
the negotiated role/scopes. `canvasHostUrl` is optional.
|
||||
|
||||
@@ -41,7 +41,7 @@ let lastRequestOptions: {
|
||||
params?: unknown;
|
||||
opts?: { expectFinal?: boolean; timeoutMs?: number | null };
|
||||
} | null = null;
|
||||
type StartMode = "hello" | "close" | "silent";
|
||||
type StartMode = "hello" | "close" | "silent" | "startup-retry-then-hello";
|
||||
let startMode: StartMode = "hello";
|
||||
let closeCode = 1006;
|
||||
let closeReason = "";
|
||||
@@ -87,6 +87,12 @@ vi.mock("./client.js", () => ({
|
||||
methods: helloMethods,
|
||||
},
|
||||
});
|
||||
} else if (startMode === "startup-retry-then-hello") {
|
||||
void lastClientOptions?.onHelloOk?.({
|
||||
features: {
|
||||
methods: helloMethods,
|
||||
},
|
||||
});
|
||||
} else if (startMode === "close") {
|
||||
lastClientOptions?.onClose?.(closeCode, closeReason);
|
||||
}
|
||||
@@ -134,6 +140,12 @@ class StubGatewayClient {
|
||||
methods: helloMethods,
|
||||
},
|
||||
});
|
||||
} else if (startMode === "startup-retry-then-hello") {
|
||||
void lastClientOptions?.onHelloOk?.({
|
||||
features: {
|
||||
methods: helloMethods,
|
||||
},
|
||||
});
|
||||
} else if (startMode === "close") {
|
||||
lastClientOptions?.onClose?.(closeCode, closeReason);
|
||||
}
|
||||
@@ -835,6 +847,15 @@ describe("callGateway error details", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps the request alive through internally retried startup-unavailable handshakes", async () => {
|
||||
startMode = "startup-retry-then-hello";
|
||||
setLocalLoopbackGatewayConfig();
|
||||
|
||||
await expect(callGateway({ method: "health" })).resolves.toEqual({ ok: true });
|
||||
|
||||
expect(lastRequestOptions?.method).toBe("health");
|
||||
});
|
||||
|
||||
it("includes connection details on timeout", async () => {
|
||||
startMode = "silent";
|
||||
setLocalLoopbackGatewayConfig();
|
||||
|
||||
@@ -29,6 +29,7 @@ class MockWebSocket {
|
||||
private errorHandlers: WsEventHandlers["error"][] = [];
|
||||
readonly sent: string[] = [];
|
||||
closeCalls = 0;
|
||||
lastClose: { code?: number; reason?: string } | null = null;
|
||||
terminateCalls = 0;
|
||||
autoCloseOnClose = true;
|
||||
readyState = MockWebSocket.CONNECTING;
|
||||
@@ -62,6 +63,7 @@ class MockWebSocket {
|
||||
|
||||
close(code?: number, reason?: string): void {
|
||||
this.closeCalls += 1;
|
||||
this.lastClose = { code, reason };
|
||||
this.readyState = MockWebSocket.CLOSING;
|
||||
if (this.autoCloseOnClose) {
|
||||
this.emitClose(code ?? 1000, reason ?? "");
|
||||
@@ -335,6 +337,73 @@ describe("GatewayClient request errors", () => {
|
||||
|
||||
client.stop();
|
||||
});
|
||||
|
||||
it("retries startup-unavailable connect failures without terminal callbacks", async () => {
|
||||
vi.useFakeTimers();
|
||||
wsInstances.length = 0;
|
||||
logDebugMock.mockClear();
|
||||
logErrorMock.mockClear();
|
||||
const onClose = vi.fn();
|
||||
const onConnectError = vi.fn();
|
||||
const client = new GatewayClient({
|
||||
url: "ws://127.0.0.1:18789",
|
||||
deviceIdentity: null,
|
||||
onClose,
|
||||
onConnectError,
|
||||
});
|
||||
try {
|
||||
client.start();
|
||||
const ws = getLatestWs();
|
||||
ws.emitOpen();
|
||||
ws.emitMessage(
|
||||
JSON.stringify({
|
||||
type: "event",
|
||||
event: "connect.challenge",
|
||||
payload: { nonce: "nonce-1" },
|
||||
}),
|
||||
);
|
||||
const connectFrame = JSON.parse(
|
||||
ws.sent.find((frame) => frame.includes('"method":"connect"')) ?? "{}",
|
||||
) as { id?: string };
|
||||
|
||||
ws.emitMessage(
|
||||
JSON.stringify({
|
||||
type: "res",
|
||||
id: connectFrame.id,
|
||||
ok: false,
|
||||
error: {
|
||||
code: "UNAVAILABLE",
|
||||
message: "gateway starting; retry shortly",
|
||||
details: { reason: "startup-sidecars" },
|
||||
retryable: true,
|
||||
retryAfterMs: 250,
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
for (let i = 0; i < 10; i += 1) {
|
||||
await Promise.resolve();
|
||||
}
|
||||
|
||||
expect(onConnectError).not.toHaveBeenCalled();
|
||||
expect(onClose).not.toHaveBeenCalled();
|
||||
expect(ws.lastClose).toEqual({ code: 1013, reason: "gateway starting" });
|
||||
expect(logDebugMock).toHaveBeenCalledWith(expect.stringContaining("gateway connect failed:"));
|
||||
expect(logErrorMock).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining("gateway connect failed:"),
|
||||
);
|
||||
expect(wsInstances).toHaveLength(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(249);
|
||||
expect(wsInstances).toHaveLength(1);
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(wsInstances).toHaveLength(2);
|
||||
} finally {
|
||||
client.stop();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("GatewayClient close handling", () => {
|
||||
|
||||
@@ -50,6 +50,7 @@ import {
|
||||
validateRequestFrame,
|
||||
validateResponseFrame,
|
||||
} from "./protocol/index.js";
|
||||
import { resolveGatewayStartupRetryAfterMs } from "./protocol/startup-unavailable.js";
|
||||
|
||||
type Pending = {
|
||||
resolve: (value: unknown) => void;
|
||||
@@ -168,6 +169,7 @@ export const GATEWAY_CLOSE_CODE_HINTS: Readonly<Record<number, string>> = {
|
||||
1006: "abnormal closure (no close frame)",
|
||||
1008: "policy violation",
|
||||
1012: "service restart",
|
||||
1013: "try again later",
|
||||
};
|
||||
|
||||
export function describeGatewayCloseCode(code: number): string | undefined {
|
||||
@@ -227,6 +229,7 @@ export class GatewayClient {
|
||||
private reconnectTimer: NodeJS.Timeout | null = null;
|
||||
private pendingDeviceTokenRetry = false;
|
||||
private deviceTokenRetryBudgetUsed = false;
|
||||
private pendingStartupReconnectDelayMs: number | null = null;
|
||||
private pendingConnectErrorDetailCode: string | null = null;
|
||||
// Track last tick to detect silent stalls.
|
||||
private lastTick: number | null = null;
|
||||
@@ -350,6 +353,10 @@ export class GatewayClient {
|
||||
}
|
||||
this.socketOpened = false;
|
||||
this.resolvePendingStop(ws);
|
||||
if (this.pendingStartupReconnectDelayMs !== null) {
|
||||
this.scheduleReconnect();
|
||||
return;
|
||||
}
|
||||
// Clear persisted device auth state only when device-token auth was active.
|
||||
// Shared token/password failures can return the same close reason but should
|
||||
// not erase a valid cached device token.
|
||||
@@ -429,6 +436,7 @@ export class GatewayClient {
|
||||
this.closed = true;
|
||||
this.pendingDeviceTokenRetry = false;
|
||||
this.deviceTokenRetryBudgetUsed = false;
|
||||
this.pendingStartupReconnectDelayMs = null;
|
||||
this.pendingConnectErrorDetailCode = null;
|
||||
this.clearReconnectTimer();
|
||||
if (this.tickTimer) {
|
||||
@@ -576,6 +584,7 @@ export class GatewayClient {
|
||||
.then((helloOk) => {
|
||||
this.pendingDeviceTokenRetry = false;
|
||||
this.deviceTokenRetryBudgetUsed = false;
|
||||
this.pendingStartupReconnectDelayMs = null;
|
||||
this.pendingConnectErrorDetailCode = null;
|
||||
const authInfo = helloOk?.auth;
|
||||
if (authInfo?.deviceToken && this.opts.deviceIdentity) {
|
||||
@@ -626,6 +635,13 @@ export class GatewayClient {
|
||||
this.deviceTokenRetryBudgetUsed = true;
|
||||
this.backoffMs = Math.min(this.backoffMs, 250);
|
||||
}
|
||||
const startupRetryAfterMs = resolveGatewayStartupRetryAfterMs(err);
|
||||
if (startupRetryAfterMs !== null) {
|
||||
this.pendingStartupReconnectDelayMs = startupRetryAfterMs;
|
||||
logDebug(`gateway connect failed: ${String(err)}`);
|
||||
this.ws?.close(1013, "gateway starting");
|
||||
return;
|
||||
}
|
||||
this.opts.onConnectError?.(err instanceof Error ? err : new Error(String(err)));
|
||||
const msg = `gateway connect failed: ${String(err)}`;
|
||||
if (this.opts.mode === GATEWAY_CLIENT_MODES.PROBE || isGatewayClientStoppedError(err)) {
|
||||
@@ -916,8 +932,12 @@ export class GatewayClient {
|
||||
this.tickTimer = null;
|
||||
}
|
||||
this.clearReconnectTimer();
|
||||
const delay = this.backoffMs;
|
||||
this.backoffMs = Math.min(this.backoffMs * 2, 30_000);
|
||||
const startupDelay = this.pendingStartupReconnectDelayMs;
|
||||
this.pendingStartupReconnectDelayMs = null;
|
||||
const delay = startupDelay ?? this.backoffMs;
|
||||
if (startupDelay === null) {
|
||||
this.backoffMs = Math.min(this.backoffMs * 2, 30_000);
|
||||
}
|
||||
this.reconnectTimer = setTimeout(() => {
|
||||
this.reconnectTimer = null;
|
||||
this.start();
|
||||
|
||||
@@ -3,7 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
const gatewayClientState = vi.hoisted(() => ({
|
||||
options: null as Record<string, unknown> | null,
|
||||
requests: [] as string[],
|
||||
startMode: "hello" as "hello" | "close" | "connect-error-close",
|
||||
startMode: "hello" as "hello" | "close" | "connect-error-close" | "startup-retry-then-hello",
|
||||
close: { code: 1008, reason: "pairing required" },
|
||||
helloAuth: {
|
||||
role: "operator",
|
||||
@@ -76,6 +76,17 @@ class MockGatewayClient {
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (gatewayClientState.startMode === "startup-retry-then-hello") {
|
||||
const onHelloOk = this.opts.onHelloOk;
|
||||
if (typeof onHelloOk === "function") {
|
||||
await onHelloOk({
|
||||
type: "hello-ok",
|
||||
server: gatewayClientState.helloServer,
|
||||
auth: gatewayClientState.helloAuth,
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
const onHelloOk = this.opts.onHelloOk;
|
||||
if (typeof onHelloOk === "function") {
|
||||
await onHelloOk({
|
||||
@@ -381,4 +392,21 @@ describe("probeGateway", () => {
|
||||
close: { code: 1008, reason: "pairing required" },
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps probing through internally retried startup-unavailable handshakes", async () => {
|
||||
gatewayClientState.startMode = "startup-retry-then-hello";
|
||||
|
||||
const result = await probeGateway({
|
||||
url: "ws://127.0.0.1:18789",
|
||||
auth: { token: "secret" },
|
||||
timeoutMs: 1_000,
|
||||
includeDetails: false,
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({
|
||||
ok: true,
|
||||
error: null,
|
||||
close: null,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
55
src/gateway/protocol/startup-unavailable.ts
Normal file
55
src/gateway/protocol/startup-unavailable.ts
Normal file
@@ -0,0 +1,55 @@
|
||||
/**
 * Shared vocabulary for the "gateway is still starting" connect rejection.
 *
 * The constants, guards, and delay resolver in this module describe one error
 * shape: code `UNAVAILABLE`, `retryable: true`, and
 * `details.reason === "startup-sidecars"`, optionally with a `retryAfterMs` hint.
 */

/** `details.reason` marker that identifies a startup-window rejection. */
export const GATEWAY_STARTUP_UNAVAILABLE_REASON = "startup-sidecars";

/** Retry delay (ms) used when an error carries no usable `retryAfterMs`. */
export const GATEWAY_STARTUP_RETRY_AFTER_MS = 500;

/** Lower bound (ms) applied to any resolved startup retry delay. */
export const GATEWAY_STARTUP_RETRY_MIN_MS = 100;

/** Upper bound (ms) applied to any resolved startup retry delay. */
export const GATEWAY_STARTUP_RETRY_MAX_MS = 2_000;

/** Shape of the `details` payload attached to a startup-unavailable error. */
export type GatewayStartupUnavailableDetails = {
  reason: typeof GATEWAY_STARTUP_UNAVAILABLE_REASON;
};

/**
 * Builds the `details` payload for a startup-unavailable error.
 *
 * @returns A fresh object whose `reason` is {@link GATEWAY_STARTUP_UNAVAILABLE_REASON}.
 */
export function gatewayStartupUnavailableDetails(): GatewayStartupUnavailableDetails {
  return { reason: GATEWAY_STARTUP_UNAVAILABLE_REASON };
}

/**
 * Type guard for the startup-unavailable `details` payload.
 *
 * @param details - Arbitrary value, typically the `details` field of an error.
 * @returns `true` only for a non-null object whose `reason` equals
 *   {@link GATEWAY_STARTUP_UNAVAILABLE_REASON}.
 */
export function isGatewayStartupUnavailableDetails(
  details: unknown,
): details is GatewayStartupUnavailableDetails {
  return (
    typeof details === "object" &&
    details !== null &&
    (details as { reason?: unknown }).reason === GATEWAY_STARTUP_UNAVAILABLE_REASON
  );
}

/**
 * Reports whether `error` is the retryable startup-unavailable rejection.
 *
 * All three conditions must hold: the code is `"UNAVAILABLE"`, `retryable` is
 * strictly `true`, and `details` passes {@link isGatewayStartupUnavailableDetails}.
 *
 * @param error - Any thrown or received value; non-objects are never a match.
 * @returns `true` when the value matches the startup-unavailable shape.
 */
export function isRetryableGatewayStartupUnavailableError(error: unknown): boolean {
  if (!error || typeof error !== "object") {
    return false;
  }
  const shaped = error as {
    code?: unknown;
    gatewayCode?: unknown;
    retryable?: unknown;
    details?: unknown;
  };
  // `gatewayCode` wins when present; `code` is consulted only when
  // `gatewayCode` is null or undefined.
  const code = shaped.gatewayCode ?? shaped.code;
  return (
    code === "UNAVAILABLE" &&
    shaped.retryable === true &&
    isGatewayStartupUnavailableDetails(shaped.details)
  );
}

/**
 * Resolves how long to wait before retrying after a startup-unavailable error.
 *
 * @param error - Any thrown or received value.
 * @returns `null` when `error` is not a retryable startup-unavailable error.
 *   Otherwise the error's finite numeric `retryAfterMs` (or
 *   {@link GATEWAY_STARTUP_RETRY_AFTER_MS} when it is missing, non-numeric, or
 *   non-finite), floored to an integer and clamped to
 *   [{@link GATEWAY_STARTUP_RETRY_MIN_MS}, {@link GATEWAY_STARTUP_RETRY_MAX_MS}].
 */
export function resolveGatewayStartupRetryAfterMs(error: unknown): number | null {
  if (!isRetryableGatewayStartupUnavailableError(error)) {
    return null;
  }
  const retryAfterMs = (error as { retryAfterMs?: unknown }).retryAfterMs;
  const raw =
    typeof retryAfterMs === "number" && Number.isFinite(retryAfterMs)
      ? retryAfterMs
      : GATEWAY_STARTUP_RETRY_AFTER_MS;
  // Clamp so a bad hint can neither cause a hot reconnect loop nor stall the
  // client for longer than the maximum.
  return Math.min(
    Math.max(Math.floor(raw), GATEWAY_STARTUP_RETRY_MIN_MS),
    GATEWAY_STARTUP_RETRY_MAX_MS,
  );
}
|
||||
@@ -36,6 +36,7 @@ export function attachGatewayWsHandlers(params: GatewayWsRuntimeParams) {
|
||||
rateLimiter: params.rateLimiter,
|
||||
browserRateLimiter: params.browserRateLimiter,
|
||||
preauthHandshakeTimeoutMs: params.preauthHandshakeTimeoutMs,
|
||||
isStartupPending: params.isStartupPending,
|
||||
gatewayMethods: params.gatewayMethods,
|
||||
events: params.events,
|
||||
refreshHealthSnapshot: params.context.refreshHealthSnapshot,
|
||||
|
||||
@@ -942,6 +942,7 @@ export async function startGatewayServer(
|
||||
rateLimiter: authRateLimiter,
|
||||
browserRateLimiter: browserAuthRateLimiter,
|
||||
preauthHandshakeTimeoutMs,
|
||||
isStartupPending: () => !startupSidecarsReady,
|
||||
gatewayMethods: runtimeState.gatewayMethods,
|
||||
events: GATEWAY_EVENTS,
|
||||
logGateway: log,
|
||||
|
||||
128
src/gateway/server/ws-connection.startup.test.ts
Normal file
128
src/gateway/server/ws-connection.startup.test.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import type { WebSocketServer } from "ws";
|
||||
import { GATEWAY_CLIENT_MODES, GATEWAY_CLIENT_NAMES } from "../protocol/client-info.js";
|
||||
import { PROTOCOL_VERSION } from "../protocol/index.js";
|
||||
import { GATEWAY_STARTUP_UNAVAILABLE_REASON } from "../protocol/startup-unavailable.js";
|
||||
import { attachGatewayWsConnectionHandler } from "./ws-connection.js";
|
||||
|
||||
/**
 * Builds a stand-in logger whose `debug`, `info`, `warn`, and `error` methods
 * are independent `vi.fn()` spies, so the handler under test can log without
 * side effects.
 */
function createLogger() {
  return {
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  };
}
|
||||
|
||||
/**
 * Builds a minimal request-context stub made only of `vi.fn()` spies:
 * `unsubscribeAllSessionEvents`, `nodeRegistry.unregister`, and
 * `nodeUnsubscribeAll`.
 *
 * NOTE(review): presumably these are the members the connection handler touches
 * during teardown — confirm against `ws-connection.ts`.
 */
function createRequestContext() {
  return {
    unsubscribeAllSessionEvents: vi.fn(),
    nodeRegistry: { unregister: vi.fn() },
    nodeUnsubscribeAll: vi.fn(),
  };
}
|
||||
|
||||
// Verifies that a `connect` request arriving while `isStartupPending()` is true
// gets a retryable UNAVAILABLE response and that the socket is then closed
// with code 1013.
describe("attachGatewayWsConnectionHandler startup readiness", () => {
  it("returns a retryable startup-unavailable connect response while sidecars are pending", async () => {
    // Fake WebSocketServer: record the handlers registered through `on` so the
    // test can invoke the "connection" listener directly.
    const listeners = new Map<string, (...args: unknown[]) => void>();
    const wss = {
      on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
        listeners.set(event, handler);
      }),
    } as unknown as WebSocketServer;

    // Fake client socket: an EventEmitter that captures every sent frame
    // (parsed from JSON) and turns `close()` into an immediate "close" event.
    const sent: unknown[] = [];
    const socket = Object.assign(new EventEmitter(), {
      _socket: {
        remoteAddress: "127.0.0.1",
        remotePort: 1234,
        localAddress: "127.0.0.1",
        localPort: 5678,
      },
      send: vi.fn((data: string, cb?: (err?: Error) => void) => {
        sent.push(JSON.parse(data));
        cb?.();
      }),
      close: vi.fn((code?: number, reason?: string) => {
        socket.emit("close", code ?? 1000, Buffer.from(reason ?? ""));
      }),
    });
    const upgradeReq = {
      headers: { host: "127.0.0.1:19001" },
      socket: { localAddress: "127.0.0.1" },
    };

    attachGatewayWsConnectionHandler({
      wss,
      clients: new Set(),
      preauthConnectionBudget: { release: vi.fn() } as never,
      port: 19001,
      canvasHostEnabled: false,
      resolvedAuth: { mode: "none", allowTailscale: false },
      // Condition under test: startup sidecars are reported as still pending.
      isStartupPending: () => true,
      gatewayMethods: [],
      events: [],
      refreshHealthSnapshot: vi.fn(async () => ({}) as never),
      logGateway: createLogger() as never,
      logHealth: createLogger() as never,
      logWsControl: createLogger() as never,
      extraHandlers: {},
      broadcast: vi.fn(),
      buildRequestContext: () => createRequestContext() as never,
    });

    const onConnection = listeners.get("connection");
    expect(onConnection).toBeTypeOf("function");
    onConnection?.(socket, upgradeReq);

    // Send a `connect` request pinned to the current protocol version.
    socket.emit(
      "message",
      JSON.stringify({
        type: "req",
        id: "connect-1",
        method: "connect",
        params: {
          minProtocol: PROTOCOL_VERSION,
          maxProtocol: PROTOCOL_VERSION,
          client: {
            id: GATEWAY_CLIENT_NAMES.CLI,
            version: "dev",
            platform: "test",
            mode: GATEWAY_CLIENT_MODES.CLI,
          },
          role: "operator",
          scopes: ["operator.read"],
          caps: [],
        },
      }),
    );

    // Poll until a response frame for "connect-1" has been sent, since the
    // handler may reply asynchronously.
    await vi.waitFor(() => {
      expect(
        sent.some(
          (frame) =>
            typeof frame === "object" &&
            frame !== null &&
            (frame as { type?: unknown; id?: unknown; ok?: unknown }).type === "res" &&
            (frame as { id?: unknown }).id === "connect-1",
        ),
      ).toBe(true);
    });

    // The rejection must be retryable and carry the startup marker and hint.
    expect(sent).toContainEqual(
      expect.objectContaining({
        type: "res",
        id: "connect-1",
        ok: false,
        error: expect.objectContaining({
          code: "UNAVAILABLE",
          retryable: true,
          retryAfterMs: 500,
          details: { reason: GATEWAY_STARTUP_UNAVAILABLE_REASON },
        }),
      }),
    );

    // The close may be deferred, so poll for it as well.
    await vi.waitFor(() => {
      expect(socket.close).toHaveBeenCalledWith(1013, "gateway starting");
    });
  });
});
|
||||
@@ -132,6 +132,7 @@ export type GatewayWsSharedHandlerParams = {
|
||||
/** Browser-origin fallback limiter (loopback is never exempt). */
|
||||
browserRateLimiter?: AuthRateLimiter;
|
||||
preauthHandshakeTimeoutMs?: number;
|
||||
isStartupPending?: () => boolean;
|
||||
gatewayMethods: string[];
|
||||
events: string[];
|
||||
refreshHealthSnapshot: GatewayRequestContext["refreshHealthSnapshot"];
|
||||
@@ -168,6 +169,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
|
||||
resolveSharedGatewaySessionGeneration(getResolvedAuth()),
|
||||
rateLimiter,
|
||||
browserRateLimiter,
|
||||
isStartupPending,
|
||||
gatewayMethods,
|
||||
events,
|
||||
refreshHealthSnapshot,
|
||||
@@ -403,6 +405,7 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
|
||||
getRequiredSharedGatewaySessionGeneration,
|
||||
rateLimiter,
|
||||
browserRateLimiter,
|
||||
isStartupPending,
|
||||
gatewayMethods,
|
||||
events,
|
||||
extraHandlers,
|
||||
|
||||
@@ -96,6 +96,10 @@ import {
|
||||
validateConnectParams,
|
||||
validateRequestFrame,
|
||||
} from "../../protocol/index.js";
|
||||
import {
|
||||
gatewayStartupUnavailableDetails,
|
||||
GATEWAY_STARTUP_RETRY_AFTER_MS,
|
||||
} from "../../protocol/startup-unavailable.js";
|
||||
import { parseGatewayRole } from "../../role-policy.js";
|
||||
import {
|
||||
MAX_BUFFERED_BYTES,
|
||||
@@ -191,6 +195,7 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
rateLimiter?: AuthRateLimiter;
|
||||
/** Browser-origin fallback limiter (loopback is never exempt). */
|
||||
browserRateLimiter?: AuthRateLimiter;
|
||||
isStartupPending?: () => boolean;
|
||||
gatewayMethods: string[];
|
||||
events: string[];
|
||||
extraHandlers: GatewayRequestHandlers;
|
||||
@@ -230,6 +235,7 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
getRequiredSharedGatewaySessionGeneration,
|
||||
rateLimiter,
|
||||
browserRateLimiter,
|
||||
isStartupPending,
|
||||
gatewayMethods,
|
||||
events,
|
||||
extraHandlers,
|
||||
@@ -447,6 +453,22 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
});
|
||||
};
|
||||
|
||||
if (isStartupPending?.()) {
|
||||
markHandshakeFailure("startup-sidecars-pending");
|
||||
await sendFrame({
|
||||
type: "res",
|
||||
id: frame.id,
|
||||
ok: false,
|
||||
error: errorShape(ErrorCodes.UNAVAILABLE, "gateway starting; retry shortly", {
|
||||
retryable: true,
|
||||
retryAfterMs: GATEWAY_STARTUP_RETRY_AFTER_MS,
|
||||
details: gatewayStartupUnavailableDetails(),
|
||||
}),
|
||||
}).catch(() => {});
|
||||
queueMicrotask(() => close(1013, "gateway starting"));
|
||||
return;
|
||||
}
|
||||
|
||||
// protocol negotiation
|
||||
const { minProtocol, maxProtocol } = connectParams;
|
||||
if (maxProtocol < PROTOCOL_VERSION || minProtocol > PROTOCOL_VERSION) {
|
||||
|
||||
@@ -38,6 +38,7 @@ class MockWebSocket {
|
||||
};
|
||||
|
||||
readonly sent: string[] = [];
|
||||
lastClose: { code?: number; reason?: string } | null = null;
|
||||
readyState = MockWebSocket.OPEN;
|
||||
|
||||
constructor(_url: string) {
|
||||
@@ -52,7 +53,8 @@ class MockWebSocket {
|
||||
this.sent.push(data);
|
||||
}
|
||||
|
||||
close() {
|
||||
close(code?: number, reason?: string) {
|
||||
this.lastClose = { code, reason };
|
||||
this.readyState = 3;
|
||||
}
|
||||
|
||||
@@ -355,6 +357,47 @@ describe("GatewayBrowserClient", () => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("retries startup-unavailable connect responses without terminal callbacks", async () => {
|
||||
vi.useFakeTimers();
|
||||
const onClose = vi.fn();
|
||||
const client = new GatewayBrowserClient({
|
||||
url: "ws://127.0.0.1:18789",
|
||||
token: "shared-auth-token",
|
||||
onClose,
|
||||
});
|
||||
try {
|
||||
const { ws, connectFrame } = await startConnect(client);
|
||||
|
||||
ws.emitMessage({
|
||||
type: "res",
|
||||
id: connectFrame.id,
|
||||
ok: false,
|
||||
error: {
|
||||
code: "UNAVAILABLE",
|
||||
message: "gateway starting; retry shortly",
|
||||
details: { reason: "startup-sidecars" },
|
||||
retryable: true,
|
||||
retryAfterMs: 250,
|
||||
},
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
|
||||
await expectSocketClosed(ws);
|
||||
expect(ws.lastClose).toEqual({ code: 4013, reason: "gateway starting" });
|
||||
ws.emitClose(4013, "gateway starting");
|
||||
expect(onClose).not.toHaveBeenCalled();
|
||||
expect(wsInstances).toHaveLength(1);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(249);
|
||||
expect(wsInstances).toHaveLength(1);
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
expect(wsInstances).toHaveLength(2);
|
||||
} finally {
|
||||
client.stop();
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("treats IPv6 loopback as trusted for bounded device-token retry", async () => {
|
||||
vi.useFakeTimers();
|
||||
const { client } = await startRetriedDeviceTokenConnect({
|
||||
|
||||
@@ -11,6 +11,10 @@ import {
|
||||
readConnectErrorRecoveryAdvice,
|
||||
readConnectErrorDetailCode,
|
||||
} from "../../../src/gateway/protocol/connect-error-details.js";
|
||||
import {
|
||||
isRetryableGatewayStartupUnavailableError,
|
||||
resolveGatewayStartupRetryAfterMs,
|
||||
} from "../../../src/gateway/protocol/startup-unavailable.js";
|
||||
import { clearDeviceAuthToken, loadDeviceAuthToken, storeDeviceAuthToken } from "./device-auth.ts";
|
||||
import { loadOrCreateDeviceIdentity, signDevicePayload } from "./device-identity.ts";
|
||||
import { generateUUID } from "./uuid.ts";
|
||||
@@ -228,6 +232,7 @@ export type GatewayEventListener = (evt: GatewayEventFrame) => void;
|
||||
|
||||
// 4008 = application-defined code (browser rejects 1008 "Policy Violation")
|
||||
const CONNECT_FAILED_CLOSE_CODE = 4008;
|
||||
const STARTUP_RETRY_CLOSE_CODE = 4013;
|
||||
|
||||
function buildGatewayConnectAuth(
|
||||
selectedAuth: SelectedConnectAuth,
|
||||
@@ -302,6 +307,7 @@ export class GatewayBrowserClient {
|
||||
private pendingConnectError: GatewayErrorInfo | undefined;
|
||||
private pendingDeviceTokenRetry = false;
|
||||
private deviceTokenRetryBudgetUsed = false;
|
||||
private pendingStartupReconnectDelayMs: number | null = null;
|
||||
private eventListeners = new Set<GatewayEventListener>();
|
||||
|
||||
constructor(private opts: GatewayBrowserClientOptions) {}
|
||||
@@ -319,6 +325,7 @@ export class GatewayBrowserClient {
|
||||
this.pendingConnectError = undefined;
|
||||
this.pendingDeviceTokenRetry = false;
|
||||
this.deviceTokenRetryBudgetUsed = false;
|
||||
this.pendingStartupReconnectDelayMs = null;
|
||||
this.flushPending(new Error("gateway client stopped"));
|
||||
}
|
||||
|
||||
@@ -348,6 +355,11 @@ export class GatewayBrowserClient {
|
||||
const connectError = this.pendingConnectError;
|
||||
this.pendingConnectError = undefined;
|
||||
this.ws = null;
|
||||
if (this.pendingStartupReconnectDelayMs !== null) {
|
||||
this.flushPending(new Error(`gateway closed (${ev.code}): ${reason}`));
|
||||
this.scheduleReconnect();
|
||||
return;
|
||||
}
|
||||
this.flushPending(new Error(`gateway closed (${ev.code}): ${reason}`));
|
||||
this.opts.onClose?.({ code: ev.code, reason, error: connectError });
|
||||
const connectErrorCode = resolveGatewayErrorDetailCode(connectError);
|
||||
@@ -371,8 +383,12 @@ export class GatewayBrowserClient {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
const delay = this.backoffMs;
|
||||
this.backoffMs = Math.min(this.backoffMs * 1.7, 15_000);
|
||||
const startupDelay = this.pendingStartupReconnectDelayMs;
|
||||
this.pendingStartupReconnectDelayMs = null;
|
||||
const delay = startupDelay ?? this.backoffMs;
|
||||
if (startupDelay === null) {
|
||||
this.backoffMs = Math.min(this.backoffMs * 1.7, 15_000);
|
||||
}
|
||||
this.clearConnectTimer();
|
||||
this.connectTimer = window.setTimeout(() => {
|
||||
this.connectTimer = null;
|
||||
@@ -468,6 +484,7 @@ export class GatewayBrowserClient {
|
||||
}
|
||||
this.pendingDeviceTokenRetry = false;
|
||||
this.deviceTokenRetryBudgetUsed = false;
|
||||
this.pendingStartupReconnectDelayMs = null;
|
||||
if (hello?.auth?.deviceToken && plan.deviceIdentity) {
|
||||
storeDeviceAuthToken({
|
||||
deviceId: plan.deviceIdentity.deviceId,
|
||||
@@ -531,6 +548,14 @@ export class GatewayBrowserClient {
|
||||
) {
|
||||
clearDeviceAuthToken({ deviceId: plan.deviceIdentity.deviceId, role: plan.role });
|
||||
}
|
||||
const startupRetryAfterMs = resolveGatewayStartupRetryAfterMs(err);
|
||||
if (startupRetryAfterMs !== null) {
|
||||
this.pendingStartupReconnectDelayMs = startupRetryAfterMs;
|
||||
}
|
||||
if (isRetryableGatewayStartupUnavailableError(err)) {
|
||||
ws.close(STARTUP_RETRY_CLOSE_CODE, "gateway starting");
|
||||
return;
|
||||
}
|
||||
ws.close(CONNECT_FAILED_CLOSE_CODE, "connect failed");
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user