mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 06:40:44 +00:00
fix(gateway): drop stale webchat handshakes
This commit is contained in:
@@ -22,6 +22,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Agents/reasoning: recover fully wrapped unclosed `<think>` replies that would otherwise sanitize to empty text while keeping strict stripping for closed reasoning blocks and unclosed tails after visible text. Fixes #37696; supersedes #51915. Thanks @druide67 and @okuyam2y.
|
||||
- Control UI/Gateway: bind WebChat handshakes to their active socket and reject post-close server registrations, so aborted connects no longer leave zombie clients or misleading duplicate WebSocket connection logs. Fixes #72753. Thanks @LumenFromTheFuture.
|
||||
- Plugins/Windows: normalize Windows absolute paths before handing bundled plugin modules to Jiti, so Feishu/Lark message sending no longer fails with unsupported `c:` ESM loader URLs. Fixes #72783. Thanks @jackychen-png.
|
||||
- CLI/doctor: run bundled plugin runtime-dependency repairs through the async npm installer with spinner/line progress and heartbeat updates, so long `openclaw doctor --fix` installs no longer look hung in TTY or piped output. Fixes #72775. Thanks @dfpalhano.
|
||||
- Feishu/Windows: normalize bundled channel sidecar loads before Jiti evaluates them, so Feishu outbound sends no longer fail with raw `C:` ESM loader errors on Windows. Fixes #72783. Thanks @jackychen-png.
|
||||
|
||||
@@ -100,4 +100,69 @@ describe("attachGatewayWsConnectionHandler", () => {
|
||||
resolveSharedGatewaySessionGeneration(currentAuth),
|
||||
);
|
||||
});
|
||||
|
||||
it("rejects late client registration after a pre-connect socket close", () => {
|
||||
const listeners = new Map<string, (...args: unknown[]) => void>();
|
||||
const wss = {
|
||||
on: vi.fn((event: string, handler: (...args: unknown[]) => void) => {
|
||||
listeners.set(event, handler);
|
||||
}),
|
||||
} as unknown as WebSocketServer;
|
||||
const socket = Object.assign(new EventEmitter(), {
|
||||
_socket: {
|
||||
remoteAddress: "127.0.0.1",
|
||||
remotePort: 1234,
|
||||
localAddress: "127.0.0.1",
|
||||
localPort: 5678,
|
||||
},
|
||||
send: vi.fn(),
|
||||
close: vi.fn(),
|
||||
});
|
||||
const upgradeReq = {
|
||||
headers: { host: "127.0.0.1:19001" },
|
||||
socket: { localAddress: "127.0.0.1" },
|
||||
};
|
||||
const clients = new Set();
|
||||
|
||||
attachGatewayWsConnectionHandler({
|
||||
wss,
|
||||
clients: clients as never,
|
||||
preauthConnectionBudget: { release: vi.fn() } as never,
|
||||
port: 19001,
|
||||
canvasHostEnabled: false,
|
||||
resolvedAuth: createResolvedAuth("token"),
|
||||
gatewayMethods: [],
|
||||
events: [],
|
||||
logGateway: createLogger() as never,
|
||||
logHealth: createLogger() as never,
|
||||
logWsControl: createLogger() as never,
|
||||
extraHandlers: {},
|
||||
broadcast: vi.fn(),
|
||||
buildRequestContext: () =>
|
||||
({
|
||||
unsubscribeAllSessionEvents: vi.fn(),
|
||||
nodeRegistry: { unregister: vi.fn() },
|
||||
nodeUnsubscribeAll: vi.fn(),
|
||||
}) as never,
|
||||
});
|
||||
|
||||
const onConnection = listeners.get("connection");
|
||||
expect(onConnection).toBeTypeOf("function");
|
||||
onConnection?.(socket, upgradeReq);
|
||||
|
||||
const passed = attachGatewayWsMessageHandlerMock.mock.calls[0]?.[0] as {
|
||||
setClient: (client: unknown) => boolean;
|
||||
};
|
||||
socket.emit("close", 1001, Buffer.from("client left"));
|
||||
|
||||
const registered = passed.setClient({
|
||||
socket,
|
||||
connect: { client: { id: "openclaw-control-ui", mode: "webchat" } },
|
||||
connId: "late-client",
|
||||
usesSharedGatewayAuth: false,
|
||||
});
|
||||
|
||||
expect(registered).toBe(false);
|
||||
expect(clients.size).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -408,9 +408,13 @@ export function attachGatewayWsConnectionHandler(params: AttachGatewayWsConnecti
|
||||
clearHandshakeTimer: () => clearTimeout(handshakeTimer),
|
||||
getClient: () => client,
|
||||
setClient: (next) => {
|
||||
if (closed) {
|
||||
return false;
|
||||
}
|
||||
releasePreauthBudget();
|
||||
client = next;
|
||||
clients.add(next);
|
||||
return true;
|
||||
},
|
||||
setHandshakeState: (next) => {
|
||||
handshakeState = next;
|
||||
|
||||
@@ -201,7 +201,7 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
isClosed: () => boolean;
|
||||
clearHandshakeTimer: () => void;
|
||||
getClient: () => GatewayWsClient | null;
|
||||
setClient: (next: GatewayWsClient) => void;
|
||||
setClient: (next: GatewayWsClient) => boolean;
|
||||
setHandshakeState: (state: "pending" | "connected" | "failed") => void;
|
||||
setCloseCause: (cause: string, meta?: Record<string, unknown>) => void;
|
||||
setLastFrameMeta: (meta: { type?: string; method?: string; id?: string }) => void;
|
||||
@@ -1266,39 +1266,12 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
const instanceId = connectParams.client.instanceId;
|
||||
const presenceKey = shouldTrackPresence ? (device?.id ?? instanceId ?? connId) : undefined;
|
||||
|
||||
logWs("in", "connect", {
|
||||
connId,
|
||||
client: connectParams.client.id,
|
||||
clientDisplayName: connectParams.client.displayName,
|
||||
version: connectParams.client.version,
|
||||
mode: connectParams.client.mode,
|
||||
clientId,
|
||||
platform: connectParams.client.platform,
|
||||
auth: authMethod,
|
||||
});
|
||||
|
||||
if (isWebchatConnect(connectParams)) {
|
||||
logWsControl.info(
|
||||
`webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${clientLabel} ${connectParams.client.mode} v${connectParams.client.version}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (presenceKey) {
|
||||
upsertPresence(presenceKey, {
|
||||
host: connectParams.client.displayName ?? connectParams.client.id ?? os.hostname(),
|
||||
ip: isLocalClient ? undefined : reportedClientIp,
|
||||
version: connectParams.client.version,
|
||||
platform: connectParams.client.platform,
|
||||
deviceFamily: connectParams.client.deviceFamily,
|
||||
modelIdentifier: connectParams.client.modelIdentifier,
|
||||
mode: connectParams.client.mode,
|
||||
deviceId: device?.id,
|
||||
roles: [role],
|
||||
scopes,
|
||||
instanceId: device?.id ?? instanceId,
|
||||
reason: "connect",
|
||||
if (isClosed()) {
|
||||
setCloseCause("connect-aborted-before-register", {
|
||||
...clientMeta,
|
||||
auth: authMethod,
|
||||
});
|
||||
incrementPresenceVersion();
|
||||
return;
|
||||
}
|
||||
|
||||
const snapshot = buildGatewaySnapshot({
|
||||
@@ -1367,8 +1340,48 @@ export function attachGatewayWsMessageHandler(params: {
|
||||
canvasCapabilityExpiresAtMs,
|
||||
};
|
||||
setSocketMaxPayload(socket, MAX_PAYLOAD_BYTES);
|
||||
setClient(nextClient);
|
||||
if (!setClient(nextClient)) {
|
||||
setCloseCause("connect-aborted-before-register", {
|
||||
...clientMeta,
|
||||
auth: authMethod,
|
||||
});
|
||||
return;
|
||||
}
|
||||
setHandshakeState("connected");
|
||||
logWs("in", "connect", {
|
||||
connId,
|
||||
client: connectParams.client.id,
|
||||
clientDisplayName: connectParams.client.displayName,
|
||||
version: connectParams.client.version,
|
||||
mode: connectParams.client.mode,
|
||||
clientId,
|
||||
platform: connectParams.client.platform,
|
||||
auth: authMethod,
|
||||
});
|
||||
|
||||
if (isWebchatConnect(connectParams)) {
|
||||
logWsControl.info(
|
||||
`webchat connected conn=${connId} remote=${remoteAddr ?? "?"} client=${clientLabel} ${connectParams.client.mode} v${connectParams.client.version}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (presenceKey) {
|
||||
upsertPresence(presenceKey, {
|
||||
host: connectParams.client.displayName ?? connectParams.client.id ?? os.hostname(),
|
||||
ip: isLocalClient ? undefined : reportedClientIp,
|
||||
version: connectParams.client.version,
|
||||
platform: connectParams.client.platform,
|
||||
deviceFamily: connectParams.client.deviceFamily,
|
||||
modelIdentifier: connectParams.client.modelIdentifier,
|
||||
mode: connectParams.client.mode,
|
||||
deviceId: device?.id,
|
||||
roles: [role],
|
||||
scopes,
|
||||
instanceId: device?.id ?? instanceId,
|
||||
reason: "connect",
|
||||
});
|
||||
incrementPresenceVersion();
|
||||
}
|
||||
if (role === "node") {
|
||||
const context = buildRequestContext();
|
||||
const nodeSession = context.nodeRegistry.register(nextClient, {
|
||||
|
||||
@@ -417,6 +417,56 @@ describe("GatewayBrowserClient", () => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("does not send stale connect frames on a replacement socket", async () => {
|
||||
vi.useFakeTimers();
|
||||
let resolveIdentity!: (identity: DeviceIdentity) => void;
|
||||
loadOrCreateDeviceIdentityMock.mockImplementationOnce(
|
||||
() =>
|
||||
new Promise<DeviceIdentity>((resolve) => {
|
||||
resolveIdentity = resolve;
|
||||
}),
|
||||
);
|
||||
|
||||
const client = new GatewayBrowserClient({
|
||||
url: "ws://127.0.0.1:18789",
|
||||
token: "shared-auth-token",
|
||||
});
|
||||
|
||||
client.start();
|
||||
const firstWs = getLatestWebSocket();
|
||||
firstWs.emitOpen();
|
||||
firstWs.emitMessage({
|
||||
type: "event",
|
||||
event: "connect.challenge",
|
||||
payload: { nonce: "nonce-stale" },
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
expect(firstWs.sent).toHaveLength(0);
|
||||
|
||||
firstWs.emitClose(1006, "socket lost");
|
||||
await vi.advanceTimersByTimeAsync(800);
|
||||
const secondWs = getLatestWebSocket();
|
||||
expect(secondWs).not.toBe(firstWs);
|
||||
|
||||
resolveIdentity({
|
||||
deviceId: "device-1",
|
||||
privateKey: "private-key", // pragma: allowlist secret
|
||||
publicKey: "public-key", // pragma: allowlist secret
|
||||
});
|
||||
await vi.advanceTimersByTimeAsync(0);
|
||||
await Promise.resolve();
|
||||
|
||||
expect(secondWs.sent).toHaveLength(0);
|
||||
|
||||
const { connectFrame } = await continueConnect(secondWs, "nonce-current");
|
||||
expect(connectFrame.method).toBe("connect");
|
||||
const signedPayload = signDevicePayloadMock.mock.calls.at(-1)?.[1];
|
||||
expect(signedPayload).toContain("|shared-auth-token|nonce-current");
|
||||
|
||||
client.stop();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("cancels a scheduled reconnect when stopped before the retry fires", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
|
||||
@@ -296,6 +296,7 @@ export class GatewayBrowserClient {
|
||||
private connectNonce: string | null = null;
|
||||
private connectSent = false;
|
||||
private connectTimer: number | null = null;
|
||||
private connectGeneration = 0;
|
||||
private backoffMs = 800;
|
||||
private pendingConnectError: GatewayErrorInfo | undefined;
|
||||
private pendingDeviceTokenRetry = false;
|
||||
@@ -328,10 +329,20 @@ export class GatewayBrowserClient {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
this.ws = new WebSocket(this.opts.url);
|
||||
this.ws.addEventListener("open", () => this.queueConnect());
|
||||
this.ws.addEventListener("message", (ev) => this.handleMessage(String(ev.data ?? "")));
|
||||
this.ws.addEventListener("close", (ev) => {
|
||||
const ws = new WebSocket(this.opts.url);
|
||||
const generation = ++this.connectGeneration;
|
||||
this.ws = ws;
|
||||
ws.addEventListener("open", () => this.queueConnect(ws, generation));
|
||||
ws.addEventListener("message", (ev) => {
|
||||
if (!this.isActiveSocket(ws, generation)) {
|
||||
return;
|
||||
}
|
||||
this.handleMessage(ws, generation, String(ev.data ?? ""));
|
||||
});
|
||||
ws.addEventListener("close", (ev) => {
|
||||
if (this.ws !== ws) {
|
||||
return;
|
||||
}
|
||||
const reason = ev.reason ?? "";
|
||||
const connectError = this.pendingConnectError;
|
||||
this.pendingConnectError = undefined;
|
||||
@@ -350,7 +361,7 @@ export class GatewayBrowserClient {
|
||||
this.scheduleReconnect();
|
||||
}
|
||||
});
|
||||
this.ws.addEventListener("error", () => {
|
||||
ws.addEventListener("error", () => {
|
||||
// ignored; close handler will fire
|
||||
});
|
||||
}
|
||||
@@ -400,7 +411,7 @@ export class GatewayBrowserClient {
|
||||
};
|
||||
}
|
||||
|
||||
private async buildConnectPlan(): Promise<ConnectPlan> {
|
||||
private async buildConnectPlan(connectNonce: string | null): Promise<ConnectPlan> {
|
||||
const role = CONTROL_UI_OPERATOR_ROLE;
|
||||
const scopes = [...CONTROL_UI_OPERATOR_SCOPES];
|
||||
const client = this.buildConnectClient();
|
||||
@@ -424,9 +435,6 @@ export class GatewayBrowserClient {
|
||||
role,
|
||||
deviceId: deviceIdentity.deviceId,
|
||||
});
|
||||
if (this.pendingDeviceTokenRetry && selectedAuth.authDeviceToken) {
|
||||
this.pendingDeviceTokenRetry = false;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
@@ -443,12 +451,20 @@ export class GatewayBrowserClient {
|
||||
role,
|
||||
scopes,
|
||||
authToken: selectedAuth.authToken,
|
||||
connectNonce: this.connectNonce,
|
||||
connectNonce,
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
private handleConnectHello(hello: GatewayHelloOk, plan: ConnectPlan) {
|
||||
private handleConnectHello(
|
||||
hello: GatewayHelloOk,
|
||||
plan: ConnectPlan,
|
||||
ws: WebSocket,
|
||||
generation: number,
|
||||
) {
|
||||
if (!this.isActiveSocket(ws, generation)) {
|
||||
return;
|
||||
}
|
||||
this.pendingDeviceTokenRetry = false;
|
||||
this.deviceTokenRetryBudgetUsed = false;
|
||||
if (hello?.auth?.deviceToken && plan.deviceIdentity) {
|
||||
@@ -463,7 +479,10 @@ export class GatewayBrowserClient {
|
||||
this.opts.onHello?.(hello);
|
||||
}
|
||||
|
||||
private handleConnectFailure(err: unknown, plan: ConnectPlan) {
|
||||
private handleConnectFailure(err: unknown, plan: ConnectPlan, ws: WebSocket, generation: number) {
|
||||
if (!this.isActiveSocket(ws, generation)) {
|
||||
return;
|
||||
}
|
||||
const connectErrorCode =
|
||||
err instanceof GatewayRequestError ? resolveGatewayErrorDetailCode(err) : null;
|
||||
const recoveryAdvice =
|
||||
@@ -507,23 +526,36 @@ export class GatewayBrowserClient {
|
||||
) {
|
||||
clearDeviceAuthToken({ deviceId: plan.deviceIdentity.deviceId, role: plan.role });
|
||||
}
|
||||
this.ws?.close(CONNECT_FAILED_CLOSE_CODE, "connect failed");
|
||||
ws.close(CONNECT_FAILED_CLOSE_CODE, "connect failed");
|
||||
}
|
||||
|
||||
private async sendConnect() {
|
||||
private isActiveSocket(ws: WebSocket, generation: number): boolean {
|
||||
return !this.closed && this.ws === ws && this.connectGeneration === generation;
|
||||
}
|
||||
|
||||
private async sendConnect(ws: WebSocket, generation: number) {
|
||||
if (!this.isActiveSocket(ws, generation) || ws.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
if (this.connectSent) {
|
||||
return;
|
||||
}
|
||||
this.connectSent = true;
|
||||
this.clearConnectTimer();
|
||||
|
||||
const plan = await this.buildConnectPlan();
|
||||
void this.request<GatewayHelloOk>("connect", this.buildConnectParams(plan))
|
||||
.then((hello) => this.handleConnectHello(hello, plan))
|
||||
.catch((err: unknown) => this.handleConnectFailure(err, plan));
|
||||
const plan = await this.buildConnectPlan(this.connectNonce);
|
||||
if (!this.isActiveSocket(ws, generation) || ws.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
if (this.pendingDeviceTokenRetry && plan.selectedAuth.authDeviceToken) {
|
||||
this.pendingDeviceTokenRetry = false;
|
||||
}
|
||||
void this.requestOnSocket<GatewayHelloOk>(ws, "connect", this.buildConnectParams(plan))
|
||||
.then((hello) => this.handleConnectHello(hello, plan, ws, generation))
|
||||
.catch((err: unknown) => this.handleConnectFailure(err, plan, ws, generation));
|
||||
}
|
||||
|
||||
private handleMessage(raw: string) {
|
||||
private handleMessage(ws: WebSocket, generation: number, raw: string) {
|
||||
let parsed: unknown;
|
||||
try {
|
||||
parsed = JSON.parse(raw);
|
||||
@@ -539,7 +571,7 @@ export class GatewayBrowserClient {
|
||||
const nonce = payload && typeof payload.nonce === "string" ? payload.nonce : null;
|
||||
if (nonce) {
|
||||
this.connectNonce = nonce;
|
||||
void this.sendConnect();
|
||||
void this.sendConnect(ws, generation);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -622,12 +654,23 @@ export class GatewayBrowserClient {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
return Promise.reject(new Error("gateway not connected"));
|
||||
}
|
||||
return this.requestOnSocket(this.ws, method, params);
|
||||
}
|
||||
|
||||
private requestOnSocket<T = unknown>(
|
||||
ws: WebSocket,
|
||||
method: string,
|
||||
params?: unknown,
|
||||
): Promise<T> {
|
||||
if (this.ws !== ws || ws.readyState !== WebSocket.OPEN) {
|
||||
return Promise.reject(new Error("gateway not connected"));
|
||||
}
|
||||
const id = generateUUID();
|
||||
const frame = { type: "req", id, method, params };
|
||||
const p = new Promise<T>((resolve, reject) => {
|
||||
this.pending.set(id, { resolve: (v) => resolve(v as T), reject });
|
||||
});
|
||||
this.ws.send(JSON.stringify(frame));
|
||||
ws.send(JSON.stringify(frame));
|
||||
return p;
|
||||
}
|
||||
|
||||
@@ -638,13 +681,16 @@ export class GatewayBrowserClient {
|
||||
};
|
||||
}
|
||||
|
||||
private queueConnect() {
|
||||
private queueConnect(ws: WebSocket, generation: number) {
|
||||
if (!this.isActiveSocket(ws, generation)) {
|
||||
return;
|
||||
}
|
||||
this.connectNonce = null;
|
||||
this.connectSent = false;
|
||||
this.clearConnectTimer();
|
||||
this.connectTimer = window.setTimeout(() => {
|
||||
this.connectTimer = null;
|
||||
void this.sendConnect();
|
||||
void this.sendConnect(ws, generation);
|
||||
}, 750);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user