mirror of
https://github.com/openclaw/openclaw.git
synced 2026-06-03 14:44:05 +00:00
fix(dev): reject closed gateway websocket calls
This commit is contained in:
@@ -65,8 +65,20 @@ export function createGatewayWsClient(params: {
|
||||
}
|
||||
>();
|
||||
|
||||
const rejectPending = (error: Error) => {
|
||||
for (const waiter of pending.values()) {
|
||||
clearTimeout(waiter.timeout);
|
||||
waiter.reject(error);
|
||||
}
|
||||
pending.clear();
|
||||
};
|
||||
|
||||
const request = (method: string, paramsObj?: unknown, timeoutMs = 12_000) =>
|
||||
new Promise<GatewayResFrame>((resolve, reject) => {
|
||||
if (ws.readyState !== WebSocket.OPEN) {
|
||||
reject(new Error(`gateway websocket is not open for ${method}`));
|
||||
return;
|
||||
}
|
||||
const id = randomUUID();
|
||||
const frame: GatewayReqFrame = { type: "req", id, method, params: paramsObj };
|
||||
const timeout = setTimeout(() => {
|
||||
@@ -74,7 +86,24 @@ export function createGatewayWsClient(params: {
|
||||
reject(new Error(`timeout waiting for ${method}`));
|
||||
}, timeoutMs);
|
||||
pending.set(id, { resolve, reject, timeout });
|
||||
ws.send(JSON.stringify(frame));
|
||||
try {
|
||||
ws.send(JSON.stringify(frame), (err) => {
|
||||
if (!err) {
|
||||
return;
|
||||
}
|
||||
const waiter = pending.get(id);
|
||||
if (!waiter) {
|
||||
return;
|
||||
}
|
||||
pending.delete(id);
|
||||
clearTimeout(waiter.timeout);
|
||||
waiter.reject(err instanceof Error ? err : new Error(String(err)));
|
||||
});
|
||||
} catch (err) {
|
||||
pending.delete(id);
|
||||
clearTimeout(timeout);
|
||||
reject(err instanceof Error ? err : new Error(String(err)));
|
||||
}
|
||||
});
|
||||
|
||||
const waitOpen = () =>
|
||||
@@ -119,12 +148,16 @@ export function createGatewayWsClient(params: {
|
||||
params.onEvent?.(evt);
|
||||
}
|
||||
});
|
||||
ws.on("close", (code, reason) => {
|
||||
const suffix = reason.length > 0 ? `: ${reason.toString("utf8")}` : "";
|
||||
rejectPending(new Error(`gateway websocket closed (${code})${suffix}`));
|
||||
});
|
||||
ws.on("error", (err) => {
|
||||
rejectPending(err instanceof Error ? err : new Error(String(err)));
|
||||
});
|
||||
|
||||
const close = () => {
|
||||
for (const waiter of pending.values()) {
|
||||
clearTimeout(waiter.timeout);
|
||||
}
|
||||
pending.clear();
|
||||
rejectPending(new Error("gateway websocket client closed"));
|
||||
ws.close();
|
||||
};
|
||||
|
||||
|
||||
73
test/scripts/gateway-ws-client.test.ts
Normal file
73
test/scripts/gateway-ws-client.test.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { createServer, type Server } from "node:http";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
import { WebSocketServer, type WebSocket } from "ws";
|
||||
import { createGatewayWsClient } from "../../scripts/dev/gateway-ws-client.js";
|
||||
|
||||
let server: Server | undefined;
|
||||
let wss: WebSocketServer | undefined;
|
||||
|
||||
afterEach(async () => {
|
||||
await new Promise<void>((resolve) => {
|
||||
wss?.close(() => resolve());
|
||||
if (!wss) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
wss = undefined;
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
server?.close((error) => {
|
||||
if (error) {
|
||||
reject(error);
|
||||
return;
|
||||
}
|
||||
resolve();
|
||||
});
|
||||
if (!server) {
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
server = undefined;
|
||||
});
|
||||
|
||||
async function listen(handler: (ws: WebSocket) => void): Promise<string> {
|
||||
server = createServer();
|
||||
wss = new WebSocketServer({ server });
|
||||
wss.on("connection", handler);
|
||||
await new Promise<void>((resolve) => {
|
||||
server?.listen(0, "127.0.0.1", resolve);
|
||||
});
|
||||
const address = server.address();
|
||||
if (!address || typeof address === "string") {
|
||||
throw new Error("test websocket server did not get a TCP address");
|
||||
}
|
||||
return `ws://127.0.0.1:${address.port}`;
|
||||
}
|
||||
|
||||
describe("createGatewayWsClient", () => {
|
||||
it("rejects pending RPC requests when the client closes", async () => {
|
||||
const url = await listen(() => {});
|
||||
const client = createGatewayWsClient({ url });
|
||||
await client.waitOpen();
|
||||
|
||||
const pending = client.request("health", {}, 1000);
|
||||
client.close();
|
||||
|
||||
await expect(pending).rejects.toThrow("gateway websocket client closed");
|
||||
});
|
||||
|
||||
it("rejects pending RPC requests when the gateway closes the socket", async () => {
|
||||
const url = await listen((ws) => {
|
||||
ws.on("message", () => {
|
||||
ws.close(1011, "boom");
|
||||
});
|
||||
});
|
||||
const client = createGatewayWsClient({ url });
|
||||
await client.waitOpen();
|
||||
|
||||
await expect(client.request("health", {}, 1000)).rejects.toThrow(
|
||||
"gateway websocket closed (1011): boom",
|
||||
);
|
||||
client.close();
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user