diff --git a/CHANGELOG.md b/CHANGELOG.md index 747d37adfc0..3dd8bca824b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Agents/reasoning: recover fully wrapped unclosed `` replies that would otherwise sanitize to empty text while keeping strict stripping for closed reasoning blocks and unclosed tails after visible text. Fixes #37696; supersedes #51915. Thanks @druide67 and @okuyam2y. +- Control UI/Gateway: bind WebChat handshakes to their active socket and reject post-close server registrations, so aborted connects no longer leave zombie clients or misleading duplicate WebSocket connection logs. Fixes #72753. Thanks @LumenFromTheFuture. - Plugins/Windows: normalize Windows absolute paths before handing bundled plugin modules to Jiti, so Feishu/Lark message sending no longer fails with unsupported `c:` ESM loader URLs. Fixes #72783. Thanks @jackychen-png. - CLI/doctor: run bundled plugin runtime-dependency repairs through the async npm installer with spinner/line progress and heartbeat updates, so long `openclaw doctor --fix` installs no longer look hung in TTY or piped output. Fixes #72775. Thanks @dfpalhano. - Feishu/Windows: normalize bundled channel sidecar loads before Jiti evaluates them, so Feishu outbound sends no longer fail with raw `C:` ESM loader errors on Windows. Fixes #72783. Thanks @jackychen-png. diff --git a/src/gateway/server/ws-connection.test.ts b/src/gateway/server/ws-connection.test.ts index e354429fed2..628edf3b71b 100644 --- a/src/gateway/server/ws-connection.test.ts +++ b/src/gateway/server/ws-connection.test.ts @@ -100,4 +100,69 @@ describe("attachGatewayWsConnectionHandler", () => { resolveSharedGatewaySessionGeneration(currentAuth), ); }); + + it("rejects late client registration after a pre-connect socket close", () => { + const listeners = new Map void>(); + const wss = { + on: vi.fn((event: string, handler: (...args: unknown[]) => void) => { + listeners.set(event, handler); + }), + } as unknown as WebSocketServer; + const socket = Object.assign(new EventEmitter(), { + _socket: { + remoteAddress: "127.0.0.1", + remotePort: 1234, + localAddress: "127.0.0.1", + localPort: 5678, + }, + send: vi.fn(), + close: vi.fn(), + }); + const upgradeReq = { + headers: { host: "127.0.0.1:19001" }, + socket: { localAddress: "127.0.0.1" }, + }; + const clients = new Set(); + + attachGatewayWsConnectionHandler({ + wss, + clients: clients as never, + preauthConnectionBudget: { release: vi.fn() } as never, + port: 19001, + canvasHostEnabled: false, + resolvedAuth: createResolvedAuth("token"), + gatewayMethods: [], + events: [], + logGateway: createLogger() as never, + logHealth: createLogger() as never, + logWsControl: createLogger() as never, + extraHandlers: {}, + broadcast: vi.fn(), + buildRequestContext: () => + ({ + unsubscribeAllSessionEvents: vi.fn(), + nodeRegistry: { unregister: vi.fn() }, + nodeUnsubscribeAll: vi.fn(), + }) as never, + }); + + const onConnection = listeners.get("connection"); + expect(onConnection).toBeTypeOf("function"); + onConnection?.(socket, upgradeReq); + + const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as { + setClient: (client: unknown) => boolean; + }; + socket.emit("close", 1001, Buffer.from("client left")); + + const registered = passed.setClient({ + socket, + connect: { client: { id: "openclaw-control-ui", mode: "webchat" } }, + connId: "late-client", + usesSharedGatewayAuth: false, + }); + + expect(registered).toBe(false); + expect(clients.size).toBe(0); + }); }); diff --git a/src/gateway/server/ws-connection.ts b/src/gateway/server/ws-connection.ts index 5136d83f36e..59c76338f64 100644 --- a/src/gateway/server/ws-connection.ts +++ b/src/gateway/server/ws-connection.ts @@ -408,9 +408,13 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti clearHandshakeTimer: () => clearTimeout(handshakeTimer), getClient: () => client, setClient: (next) => { + if (closed) { + return false; + } releasePreauthBudget(); client = next; clients.add(next); + return true; }, setHandshakeState: (next) => { handshakeState = next; diff --git a/src/gateway/server/ws-connection/message-handler.ts b/src/gateway/server/ws-connection/message-handler.ts index f34ceecd329..115265161c5 100644 --- a/src/gateway/server/ws-connection/message-handler.ts +++ b/src/gateway/server/ws-connection/message-handler.ts @@ -201,7 +201,7 @@ export function attachGatewayWsMessageHandler(params: { isClosed: () => boolean; clearHandshakeTimer: () => void; getClient: () => GatewayWsClient | null; - setClient: (next: GatewayWsClient) => void; + setClient: (next: GatewayWsClient) => boolean; setHandshakeState: (state: "pending" | "connected" | "failed") => void; setCloseCause: (cause: string, meta?: Record) => void; setLastFrameMeta: (meta: { type?: string; method?: string; id?: string }) => void; @@ -1266,39 +1266,12 @@ export function attachGatewayWsMessageHandler(params: { const instanceId = connectParams.client.instanceId; const presenceKey = shouldTrackPresence ? (device?.id ?? instanceId ?? connId) : undefined; - logWs("in", "connect", { - connId, - client: connectParams.client.id, - clientDisplayName: connectParams.client.displayName, - version: connectParams.client.version, - mode: connectParams.client.mode, - clientId, - platform: connectParams.client.platform, - auth: authMethod, - }); - - if (isWebchatConnect(connectParams)) { - logWsControl.info( - `webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${clientLabel} ${connectParams.client.mode} v${connectParams.client.version}`, - ); - } - - if (presenceKey) { - upsertPresence(presenceKey, { - host: connectParams.client.displayName ?? connectParams.client.id ?? os.hostname(), - ip: isLocalClient ? undefined : reportedClientIp, - version: connectParams.client.version, - platform: connectParams.client.platform, - deviceFamily: connectParams.client.deviceFamily, - modelIdentifier: connectParams.client.modelIdentifier, - mode: connectParams.client.mode, - deviceId: device?.id, - roles: [role], - scopes, - instanceId: device?.id ?? instanceId, - reason: "connect", + if (isClosed()) { + setCloseCause("connect-aborted-before-register", { + ...clientMeta, + auth: authMethod, }); - incrementPresenceVersion(); + return; } const snapshot = buildGatewaySnapshot({ @@ -1367,8 +1340,48 @@ export function attachGatewayWsMessageHandler(params: { canvasCapabilityExpiresAtMs, }; setSocketMaxPayload(socket, MAX_PAYLOAD_BYTES); - setClient(nextClient); + if (!setClient(nextClient)) { + setCloseCause("connect-aborted-before-register", { + ...clientMeta, + auth: authMethod, + }); + return; + } setHandshakeState("connected"); + logWs("in", "connect", { + connId, + client: connectParams.client.id, + clientDisplayName: connectParams.client.displayName, + version: connectParams.client.version, + mode: connectParams.client.mode, + clientId, + platform: connectParams.client.platform, + auth: authMethod, + }); + + if (isWebchatConnect(connectParams)) { + logWsControl.info( + `webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${clientLabel} ${connectParams.client.mode} v${connectParams.client.version}`, + ); + } + + if (presenceKey) { + upsertPresence(presenceKey, { + host: connectParams.client.displayName ?? connectParams.client.id ?? os.hostname(), + ip: isLocalClient ? undefined : reportedClientIp, + version: connectParams.client.version, + platform: connectParams.client.platform, + deviceFamily: connectParams.client.deviceFamily, + modelIdentifier: connectParams.client.modelIdentifier, + mode: connectParams.client.mode, + deviceId: device?.id, + roles: [role], + scopes, + instanceId: device?.id ?? instanceId, + reason: "connect", + }); + incrementPresenceVersion(); + } if (role === "node") { const context = buildRequestContext(); const nodeSession = context.nodeRegistry.register(nextClient, { diff --git a/ui/src/ui/gateway.node.test.ts b/ui/src/ui/gateway.node.test.ts index ef8d9a2f21f..ccdd9085a7e 100644 --- a/ui/src/ui/gateway.node.test.ts +++ b/ui/src/ui/gateway.node.test.ts @@ -417,6 +417,56 @@ describe("GatewayBrowserClient", () => { vi.useRealTimers(); }); + it("does not send stale connect frames on a replacement socket", async () => { + vi.useFakeTimers(); + let resolveIdentity!: (identity: DeviceIdentity) => void; + loadOrCreateDeviceIdentityMock.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveIdentity = resolve; + }), + ); + + const client = new GatewayBrowserClient({ + url: "ws://127.0.0.1:18789", + token: "shared-auth-token", + }); + + client.start(); + const firstWs = getLatestWebSocket(); + firstWs.emitOpen(); + firstWs.emitMessage({ + type: "event", + event: "connect.challenge", + payload: { nonce: "nonce-stale" }, + }); + await vi.advanceTimersByTimeAsync(0); + expect(firstWs.sent).toHaveLength(0); + + firstWs.emitClose(1006, "socket lost"); + await vi.advanceTimersByTimeAsync(800); + const secondWs = getLatestWebSocket(); + expect(secondWs).not.toBe(firstWs); + + resolveIdentity({ + deviceId: "device-1", + privateKey: "private-key", // pragma: allowlist secret + publicKey: "public-key", // pragma: allowlist secret + }); + await vi.advanceTimersByTimeAsync(0); + await Promise.resolve(); + + expect(secondWs.sent).toHaveLength(0); + + const { connectFrame } = await continueConnect(secondWs, "nonce-current"); + expect(connectFrame.method).toBe("connect"); + const signedPayload = signDevicePayloadMock.mock.calls.at(-1)?.[1]; + expect(signedPayload).toContain("|shared-auth-token|nonce-current"); + + client.stop(); + vi.useRealTimers(); + }); + it("cancels a scheduled reconnect when stopped before the retry fires", async () => { vi.useFakeTimers(); diff --git a/ui/src/ui/gateway.ts b/ui/src/ui/gateway.ts index 6ef973417d3..a092f75eca8 100644 --- a/ui/src/ui/gateway.ts +++ b/ui/src/ui/gateway.ts @@ -296,6 +296,7 @@ export class GatewayBrowserClient { private connectNonce: string | null = null; private connectSent = false; private connectTimer: number | null = null; + private connectGeneration = 0; private backoffMs = 800; private pendingConnectError: GatewayErrorInfo | undefined; private pendingDeviceTokenRetry = false; @@ -328,10 +329,20 @@ export class GatewayBrowserClient { if (this.closed) { return; } - this.ws = new WebSocket(this.opts.url); - this.ws.addEventListener("open", () => this.queueConnect()); - this.ws.addEventListener("message", (ev) => this.handleMessage(String(ev.data ?? ""))); - this.ws.addEventListener("close", (ev) => { + const ws = new WebSocket(this.opts.url); + const generation = ++this.connectGeneration; + this.ws = ws; + ws.addEventListener("open", () => this.queueConnect(ws, generation)); + ws.addEventListener("message", (ev) => { + if (!this.isActiveSocket(ws, generation)) { + return; + } + this.handleMessage(ws, generation, String(ev.data ?? "")); + }); + ws.addEventListener("close", (ev) => { + if (this.ws !== ws) { + return; + } const reason = ev.reason ?? ""; const connectError = this.pendingConnectError; this.pendingConnectError = undefined; @@ -350,7 +361,7 @@ export class GatewayBrowserClient { this.scheduleReconnect(); } }); - this.ws.addEventListener("error", () => { + ws.addEventListener("error", () => { // ignored; close handler will fire }); } @@ -400,7 +411,7 @@ export class GatewayBrowserClient { }; } - private async buildConnectPlan(): Promise { + private async buildConnectPlan(connectNonce: string | null): Promise { const role = CONTROL_UI_OPERATOR_ROLE; const scopes = [...CONTROL_UI_OPERATOR_SCOPES]; const client = this.buildConnectClient(); @@ -424,9 +435,6 @@ export class GatewayBrowserClient { role, deviceId: deviceIdentity.deviceId, }); - if (this.pendingDeviceTokenRetry && selectedAuth.authDeviceToken) { - this.pendingDeviceTokenRetry = false; - } } return { @@ -443,12 +451,20 @@ export class GatewayBrowserClient { role, scopes, authToken: selectedAuth.authToken, - connectNonce: this.connectNonce, + connectNonce, }), }; } - private handleConnectHello(hello: GatewayHelloOk, plan: ConnectPlan) { + private handleConnectHello( + hello: GatewayHelloOk, + plan: ConnectPlan, + ws: WebSocket, + generation: number, + ) { + if (!this.isActiveSocket(ws, generation)) { + return; + } this.pendingDeviceTokenRetry = false; this.deviceTokenRetryBudgetUsed = false; if (hello?.auth?.deviceToken && plan.deviceIdentity) { @@ -463,7 +479,10 @@ export class GatewayBrowserClient { this.opts.onHello?.(hello); } - private handleConnectFailure(err: unknown, plan: ConnectPlan) { + private handleConnectFailure(err: unknown, plan: ConnectPlan, ws: WebSocket, generation: number) { + if (!this.isActiveSocket(ws, generation)) { + return; + } const connectErrorCode = err instanceof GatewayRequestError ? resolveGatewayErrorDetailCode(err) : null; const recoveryAdvice = @@ -507,23 +526,36 @@ export class GatewayBrowserClient { ) { clearDeviceAuthToken({ deviceId: plan.deviceIdentity.deviceId, role: plan.role }); } - this.ws?.close(CONNECT_FAILED_CLOSE_CODE, "connect failed"); + ws.close(CONNECT_FAILED_CLOSE_CODE, "connect failed"); } - private async sendConnect() { + private isActiveSocket(ws: WebSocket, generation: number): boolean { + return !this.closed && this.ws === ws && this.connectGeneration === generation; + } + + private async sendConnect(ws: WebSocket, generation: number) { + if (!this.isActiveSocket(ws, generation) || ws.readyState !== WebSocket.OPEN) { + return; + } if (this.connectSent) { return; } this.connectSent = true; this.clearConnectTimer(); - const plan = await this.buildConnectPlan(); - void this.request("connect", this.buildConnectParams(plan)) - .then((hello) => this.handleConnectHello(hello, plan)) - .catch((err: unknown) => this.handleConnectFailure(err, plan)); + const plan = await this.buildConnectPlan(this.connectNonce); + if (!this.isActiveSocket(ws, generation) || ws.readyState !== WebSocket.OPEN) { + return; + } + if (this.pendingDeviceTokenRetry && plan.selectedAuth.authDeviceToken) { + this.pendingDeviceTokenRetry = false; + } + void this.requestOnSocket(ws, "connect", this.buildConnectParams(plan)) + .then((hello) => this.handleConnectHello(hello, plan, ws, generation)) + .catch((err: unknown) => this.handleConnectFailure(err, plan, ws, generation)); } - private handleMessage(raw: string) { + private handleMessage(ws: WebSocket, generation: number, raw: string) { let parsed: unknown; try { parsed = JSON.parse(raw); @@ -539,7 +571,7 @@ export class GatewayBrowserClient { const nonce = payload && typeof payload.nonce === "string" ? payload.nonce : null; if (nonce) { this.connectNonce = nonce; - void this.sendConnect(); + void this.sendConnect(ws, generation); } return; } @@ -622,12 +654,23 @@ export class GatewayBrowserClient { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { return Promise.reject(new Error("gateway not connected")); } + return this.requestOnSocket(this.ws, method, params); + } + + private requestOnSocket( + ws: WebSocket, + method: string, + params?: unknown, + ): Promise { + if (this.ws !== ws || ws.readyState !== WebSocket.OPEN) { + return Promise.reject(new Error("gateway not connected")); + } const id = generateUUID(); const frame = { type: "req", id, method, params }; const p = new Promise((resolve, reject) => { this.pending.set(id, { resolve: (v) => resolve(v as T), reject }); }); - this.ws.send(JSON.stringify(frame)); + ws.send(JSON.stringify(frame)); return p; } @@ -638,13 +681,16 @@ export class GatewayBrowserClient { }; } - private queueConnect() { + private queueConnect(ws: WebSocket, generation: number) { + if (!this.isActiveSocket(ws, generation)) { + return; + } this.connectNonce = null; this.connectSent = false; this.clearConnectTimer(); this.connectTimer = window.setTimeout(() => { this.connectTimer = null; - void this.sendConnect(); + void this.sendConnect(ws, generation); }, 750); }