mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-18 20:24:46 +00:00
fix(discord): bind delayed identify to socket generation (#82225)
* fix(discord): bind delayed identify to socket generation * chore: refresh CI after main repairs
This commit is contained in:
@@ -180,15 +180,60 @@ describe("GatewayPlugin", () => {
|
||||
}
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5_000);
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
new Error("Discord gateway socket closed before IDENTIFY could be sent"),
|
||||
);
|
||||
expect(errorSpy).not.toHaveBeenCalled();
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
|
||||
expect(gateway.connectCalls).toEqual([false, false]);
|
||||
expect(gateway.sockets).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("does not identify a replacement socket from a stale HELLO", async () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(0);
|
||||
await sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 });
|
||||
const gateway = new TestGatewayPlugin({
|
||||
autoInteractions: false,
|
||||
url: "wss://gateway.example.test",
|
||||
});
|
||||
|
||||
gateway.connect(false);
|
||||
const originalSocket = gateway.sockets[0];
|
||||
originalSocket?.emit("open");
|
||||
originalSocket?.emit(
|
||||
"message",
|
||||
JSON.stringify({
|
||||
op: GatewayOpcodes.Hello,
|
||||
d: { heartbeat_interval: 45_000 },
|
||||
s: null,
|
||||
}),
|
||||
);
|
||||
originalSocket?.emit("close", 1006);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
expect(gateway.connectCalls).toEqual([false, true]);
|
||||
const replacementSocket = gateway.sockets[1];
|
||||
replacementSocket?.emit("open");
|
||||
|
||||
await vi.advanceTimersByTimeAsync(3_000);
|
||||
expect(replacementSocket?.send).not.toHaveBeenCalledWith(
|
||||
expect.stringContaining(`"op":${GatewayOpcodes.Identify}`),
|
||||
);
|
||||
|
||||
replacementSocket?.emit(
|
||||
"message",
|
||||
JSON.stringify({
|
||||
op: GatewayOpcodes.Hello,
|
||||
d: { heartbeat_interval: 45_000 },
|
||||
s: null,
|
||||
}),
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5_000);
|
||||
expect(replacementSocket?.send).toHaveBeenCalledWith(
|
||||
expect.stringContaining(`"op":${GatewayOpcodes.Identify}`),
|
||||
);
|
||||
});
|
||||
|
||||
it("preserves MESSAGE_CREATE author payloads for inbound dispatch", async () => {
|
||||
const gateway = new GatewayPlugin({ autoInteractions: false });
|
||||
const dispatchGatewayEvent = vi.fn(async (_event: string, _data: unknown) => {});
|
||||
|
||||
@@ -195,7 +195,7 @@ export class GatewayPlugin extends Plugin {
|
||||
this.emitter.emit("error", new Error("Invalid gateway payload"));
|
||||
return;
|
||||
}
|
||||
this.handlePayload(payload, resume);
|
||||
this.handlePayload(payload, resume, socket);
|
||||
});
|
||||
socket.on("close", (code) => {
|
||||
if (socket !== this.ws) {
|
||||
@@ -229,7 +229,11 @@ export class GatewayPlugin extends Plugin {
|
||||
});
|
||||
}
|
||||
|
||||
private handlePayload(payload: GatewayReceivePayload, resume: boolean): void {
|
||||
private handlePayload(
|
||||
payload: GatewayReceivePayload,
|
||||
resume: boolean,
|
||||
sourceSocket?: ws.WebSocket,
|
||||
): void {
|
||||
if (payload.s !== null && payload.s !== undefined) {
|
||||
this.sequence = payload.s;
|
||||
}
|
||||
@@ -251,7 +255,7 @@ export class GatewayPlugin extends Plugin {
|
||||
true,
|
||||
);
|
||||
} else {
|
||||
void this.identifyWithConcurrency().catch((error: unknown) => {
|
||||
void this.identifyWithConcurrency(sourceSocket).catch((error: unknown) => {
|
||||
this.emitter.emit(
|
||||
"error",
|
||||
error instanceof Error ? error : new Error(String(error), { cause: error }),
|
||||
@@ -332,18 +336,17 @@ export class GatewayPlugin extends Plugin {
|
||||
);
|
||||
}
|
||||
|
||||
private async identifyWithConcurrency(): Promise<void> {
|
||||
private async identifyWithConcurrency(sourceSocket?: ws.WebSocket): Promise<void> {
|
||||
await sharedGatewayIdentifyLimiter.wait({
|
||||
shardId: this.shardId,
|
||||
maxConcurrency: this.gatewayInfo?.session_start_limit.max_concurrency,
|
||||
});
|
||||
const socket = this.ws;
|
||||
if (!socket || socket.readyState !== READY_STATE_OPEN) {
|
||||
const error = new Error("Discord gateway socket closed before IDENTIFY could be sent");
|
||||
this.emitter.emit("error", error);
|
||||
if (socket) {
|
||||
this.scheduleReconnect(false);
|
||||
}
|
||||
const socket = sourceSocket ?? this.ws;
|
||||
if (!socket || socket !== this.ws) {
|
||||
return;
|
||||
}
|
||||
if (socket.readyState !== READY_STATE_OPEN) {
|
||||
this.scheduleReconnect(false);
|
||||
return;
|
||||
}
|
||||
this.identify();
|
||||
|
||||
Reference in New Issue
Block a user