Gateway: add request timeouts to client RPCs

This commit is contained in:
Vincent Koc
2026-03-07 09:42:15 -08:00
parent f03f305ade
commit 7f9eaed281
2 changed files with 70 additions and 4 deletions

View File

@@ -19,11 +19,14 @@ type WsEventHandlers = {
}; };
class MockWebSocket { class MockWebSocket {
static readonly OPEN = 1;
static readonly CLOSED = 3;
private openHandlers: WsEventHandlers["open"][] = []; private openHandlers: WsEventHandlers["open"][] = [];
private messageHandlers: WsEventHandlers["message"][] = []; private messageHandlers: WsEventHandlers["message"][] = [];
private closeHandlers: WsEventHandlers["close"][] = []; private closeHandlers: WsEventHandlers["close"][] = [];
private errorHandlers: WsEventHandlers["error"][] = []; private errorHandlers: WsEventHandlers["error"][] = [];
readonly sent: string[] = []; readonly sent: string[] = [];
readyState = MockWebSocket.CLOSED;
constructor(_url: string, _options?: unknown) { constructor(_url: string, _options?: unknown) {
wsInstances.push(this); wsInstances.push(this);
@@ -59,6 +62,7 @@ class MockWebSocket {
} }
emitOpen(): void { emitOpen(): void {
this.readyState = MockWebSocket.OPEN;
for (const handler of this.openHandlers) { for (const handler of this.openHandlers) {
handler(); handler();
} }
@@ -71,6 +75,7 @@ class MockWebSocket {
} }
emitClose(code: number, reason: string): void { emitClose(code: number, reason: string): void {
this.readyState = MockWebSocket.CLOSED;
for (const handler of this.closeHandlers) { for (const handler of this.closeHandlers) {
handler(code, Buffer.from(reason)); handler(code, Buffer.from(reason));
} }
@@ -438,4 +443,30 @@ describe("GatewayClient connect auth payload", () => {
}); });
client.stop(); client.stop();
}); });
it("times out pending requests and cleans them up", async () => {
vi.useFakeTimers();
try {
const client = new GatewayClient({
url: "ws://127.0.0.1:18789",
requestTimeoutMs: 25,
});
client.start();
const ws = getLatestWs();
ws.emitOpen();
const pending = client.request("health.check");
const observed = pending.catch((err) => err);
await vi.advanceTimersByTimeAsync(25);
await expect(observed).resolves.toMatchObject({
message: "gateway request timeout for health.check",
});
expect((client as unknown as { pending: Map<string, unknown> }).pending.size).toBe(0);
client.stop();
} finally {
vi.useRealTimers();
}
});
}); });

View File

@@ -39,12 +39,14 @@ type Pending = {
resolve: (value: unknown) => void; resolve: (value: unknown) => void;
reject: (err: unknown) => void; reject: (err: unknown) => void;
expectFinal: boolean; expectFinal: boolean;
cleanup?: () => void;
}; };
export type GatewayClientOptions = { export type GatewayClientOptions = {
url?: string; // ws://127.0.0.1:18789 url?: string; // ws://127.0.0.1:18789
connectDelayMs?: number; connectDelayMs?: number;
tickWatchMinIntervalMs?: number; tickWatchMinIntervalMs?: number;
requestTimeoutMs?: number;
token?: string; token?: string;
deviceToken?: string; deviceToken?: string;
password?: string; password?: string;
@@ -442,6 +444,7 @@ export class GatewayClient {
private flushPendingErrors(err: Error) { private flushPendingErrors(err: Error) {
for (const [, p] of this.pending) { for (const [, p] of this.pending) {
p.cleanup?.();
p.reject(err); p.reject(err);
} }
this.pending.clear(); this.pending.clear();
@@ -501,7 +504,7 @@ export class GatewayClient {
async request<T = Record<string, unknown>>( async request<T = Record<string, unknown>>(
method: string, method: string,
params?: unknown, params?: unknown,
opts?: { expectFinal?: boolean }, opts?: { expectFinal?: boolean; timeoutMs?: number },
): Promise<T> { ): Promise<T> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error("gateway not connected"); throw new Error("gateway not connected");
@@ -514,14 +517,46 @@ export class GatewayClient {
); );
} }
const expectFinal = opts?.expectFinal === true; const expectFinal = opts?.expectFinal === true;
const rawTimeoutMs = opts?.timeoutMs ?? this.opts.requestTimeoutMs;
const timeoutMs =
typeof rawTimeoutMs === "number" && Number.isFinite(rawTimeoutMs)
? Math.max(1, Math.min(300_000, rawTimeoutMs))
: 30_000;
const p = new Promise<T>((resolve, reject) => { const p = new Promise<T>((resolve, reject) => {
let timeout: NodeJS.Timeout | null = setTimeout(() => {
timeout = null;
this.pending.delete(id);
reject(new Error(`gateway request timeout for ${method}`));
}, timeoutMs);
timeout.unref?.();
const cleanup = () => {
if (!timeout) {
return;
}
clearTimeout(timeout);
timeout = null;
};
this.pending.set(id, { this.pending.set(id, {
resolve: (value) => resolve(value as T), resolve: (value) => {
reject, cleanup();
resolve(value as T);
},
reject: (err) => {
cleanup();
reject(err);
},
expectFinal, expectFinal,
cleanup,
}); });
}); });
this.ws.send(JSON.stringify(frame)); try {
this.ws.send(JSON.stringify(frame));
} catch (err) {
const pending = this.pending.get(id);
pending?.cleanup?.();
this.pending.delete(id);
throw err;
}
return p; return p;
} }
} }