From 37cd82913fa361fb976f66436264ec1049b21271 Mon Sep 17 00:00:00 2001 From: Gio Della-Libera Date: Sat, 16 May 2026 10:19:35 -0700 Subject: [PATCH] fix(discord): bind delayed identify to socket generation (#82225) * fix(discord): bind delayed identify to socket generation * chore: refresh CI after main repairs --- .../discord/src/internal/gateway.test.ts | 51 +++++++++++++++++-- extensions/discord/src/internal/gateway.ts | 25 +++++---- 2 files changed, 62 insertions(+), 14 deletions(-) diff --git a/extensions/discord/src/internal/gateway.test.ts b/extensions/discord/src/internal/gateway.test.ts index 86a34ed42c9..c845cd5eb29 100644 --- a/extensions/discord/src/internal/gateway.test.ts +++ b/extensions/discord/src/internal/gateway.test.ts @@ -180,15 +180,60 @@ describe("GatewayPlugin", () => { } await vi.advanceTimersByTimeAsync(5_000); - expect(errorSpy).toHaveBeenCalledWith( - new Error("Discord gateway socket closed before IDENTIFY could be sent"), - ); + expect(errorSpy).not.toHaveBeenCalled(); await vi.advanceTimersByTimeAsync(2_000); expect(gateway.connectCalls).toEqual([false, false]); expect(gateway.sockets).toHaveLength(2); }); + it("does not identify a replacement socket from a stale HELLO", async () => { + vi.useFakeTimers(); + vi.setSystemTime(0); + await sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }); + const gateway = new TestGatewayPlugin({ + autoInteractions: false, + url: "wss://gateway.example.test", + }); + + gateway.connect(false); + const originalSocket = gateway.sockets[0]; + originalSocket?.emit("open"); + originalSocket?.emit( + "message", + JSON.stringify({ + op: GatewayOpcodes.Hello, + d: { heartbeat_interval: 45_000 }, + s: null, + }), + ); + originalSocket?.emit("close", 1006); + + await vi.advanceTimersByTimeAsync(2_000); + expect(gateway.connectCalls).toEqual([false, true]); + const replacementSocket = gateway.sockets[1]; + replacementSocket?.emit("open"); + + await vi.advanceTimersByTimeAsync(3_000); + expect(replacementSocket?.send).not.toHaveBeenCalledWith( + expect.stringContaining(`"op":${GatewayOpcodes.Identify}`), + ); + + replacementSocket?.emit( + "message", + JSON.stringify({ + op: GatewayOpcodes.Hello, + d: { heartbeat_interval: 45_000 }, + s: null, + }), + ); + + await vi.advanceTimersByTimeAsync(5_000); + expect(replacementSocket?.send).toHaveBeenCalledWith( + expect.stringContaining(`"op":${GatewayOpcodes.Identify}`), + ); + }); + it("preserves MESSAGE_CREATE author payloads for inbound dispatch", async () => { const gateway = new GatewayPlugin({ autoInteractions: false }); const dispatchGatewayEvent = vi.fn(async (_event: string, _data: unknown) => {}); diff --git a/extensions/discord/src/internal/gateway.ts b/extensions/discord/src/internal/gateway.ts index 666a00e2d63..c5a6305212e 100644 --- a/extensions/discord/src/internal/gateway.ts +++ b/extensions/discord/src/internal/gateway.ts @@ -195,7 +195,7 @@ export class GatewayPlugin extends Plugin { this.emitter.emit("error", new Error("Invalid gateway payload")); return; } - this.handlePayload(payload, resume); + this.handlePayload(payload, resume, socket); }); socket.on("close", (code) => { if (socket !== this.ws) { @@ -229,7 +229,11 @@ export class GatewayPlugin extends Plugin { }); } - private handlePayload(payload: GatewayReceivePayload, resume: boolean): void { + private handlePayload( + payload: GatewayReceivePayload, + resume: boolean, + sourceSocket?: ws.WebSocket, + ): void { if (payload.s !== null && payload.s !== undefined) { this.sequence = payload.s; } @@ -251,7 +255,7 @@ export class GatewayPlugin extends Plugin { true, ); } else { - void this.identifyWithConcurrency().catch((error: unknown) => { + void this.identifyWithConcurrency(sourceSocket).catch((error: unknown) => { this.emitter.emit( "error", error instanceof Error ? error : new Error(String(error), { cause: error }), @@ -332,18 +336,17 @@ export class GatewayPlugin extends Plugin { ); } - private async identifyWithConcurrency(): Promise { + private async identifyWithConcurrency(sourceSocket?: ws.WebSocket): Promise { await sharedGatewayIdentifyLimiter.wait({ shardId: this.shardId, maxConcurrency: this.gatewayInfo?.session_start_limit.max_concurrency, }); - const socket = this.ws; - if (!socket || socket.readyState !== READY_STATE_OPEN) { - const error = new Error("Discord gateway socket closed before IDENTIFY could be sent"); - this.emitter.emit("error", error); - if (socket) { - this.scheduleReconnect(false); - } + const socket = sourceSocket ?? this.ws; + if (!socket || socket !== this.ws) { + return; + } + if (socket.readyState !== READY_STATE_OPEN) { + this.scheduleReconnect(false); return; } this.identify();