mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 08:20:43 +00:00
fix(discord): align internal gateway and component parity
This commit is contained in:
@@ -80,6 +80,25 @@ describe("ComponentRegistry", () => {
|
||||
select,
|
||||
);
|
||||
});
|
||||
|
||||
it("uses each registered component parser when resolving specific keys", () => {
|
||||
const registry = new ComponentRegistry<Button>();
|
||||
class EncodedButton extends Button {
|
||||
label = "button";
|
||||
customId = "encoded:seed=one";
|
||||
customIdParser = (id: string) => ({
|
||||
key: id.startsWith("encoded:") ? "encoded" : parseCustomId(id).key,
|
||||
data: {},
|
||||
});
|
||||
}
|
||||
const button = new EncodedButton();
|
||||
|
||||
registry.register(button);
|
||||
|
||||
expect(registry.resolve("encoded:payload=two", { componentType: ComponentType.Button })).toBe(
|
||||
button,
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Client.deployCommands", () => {
|
||||
|
||||
@@ -71,11 +71,19 @@ export class ComponentRegistry<
|
||||
}
|
||||
|
||||
resolve(customId: string, options?: { componentType?: number }): T | undefined {
|
||||
const entries = [
|
||||
...(this.entries.get(parseRegistryKey(customId)) ?? []),
|
||||
...this.wildcardEntries,
|
||||
];
|
||||
return entries.find((entry) => {
|
||||
for (const entries of this.entries.values()) {
|
||||
const match = entries.find((entry) => {
|
||||
if (options?.componentType !== undefined && entry.type !== options.componentType) {
|
||||
return false;
|
||||
}
|
||||
const parser = entry.customIdParser ?? parseCustomId;
|
||||
return parseRegistryKey(entry.customId, parser) === parseRegistryKey(customId, parser);
|
||||
});
|
||||
if (match) {
|
||||
return match;
|
||||
}
|
||||
}
|
||||
return this.wildcardEntries.find((entry) => {
|
||||
if (options?.componentType !== undefined && entry.type !== options.componentType) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -56,6 +56,16 @@ class TestGatewayPlugin extends GatewayPlugin {
|
||||
}
|
||||
}
|
||||
|
||||
type GatewaySessionState = {
|
||||
sessionId: string | null;
|
||||
resumeGatewayUrl: string | null;
|
||||
sequence: number | null;
|
||||
};
|
||||
|
||||
function gatewaySessionState(gateway: GatewayPlugin): GatewaySessionState {
|
||||
return gateway as unknown as GatewaySessionState;
|
||||
}
|
||||
|
||||
describe("GatewayPlugin", () => {
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
@@ -263,20 +273,71 @@ describe("GatewayPlugin", () => {
|
||||
expect(gateway.sockets).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("re-identifies after non-resumable gateway closes", async () => {
|
||||
it.each([GatewayCloseCodes.InvalidSeq, GatewayCloseCodes.AlreadyAuthenticated])(
|
||||
"re-identifies after non-resumable gateway close %s",
|
||||
async (closeCode) => {
|
||||
vi.useFakeTimers();
|
||||
const gateway = new TestGatewayPlugin({
|
||||
autoInteractions: false,
|
||||
url: "wss://gateway.example.test",
|
||||
});
|
||||
|
||||
gateway.connect(false);
|
||||
gateway.sockets[0]?.emit("open");
|
||||
gateway.sockets[0]?.emit("close", closeCode);
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
|
||||
expect(gateway.connectCalls).toEqual([false, false]);
|
||||
expect(gateway.sockets).toHaveLength(2);
|
||||
},
|
||||
);
|
||||
|
||||
it("clears resume state after invalid session false", async () => {
|
||||
vi.useFakeTimers();
|
||||
const gateway = new TestGatewayPlugin({
|
||||
autoInteractions: false,
|
||||
url: "wss://gateway.example.test",
|
||||
});
|
||||
const sessionState = gatewaySessionState(gateway);
|
||||
sessionState.sessionId = "session1";
|
||||
sessionState.resumeGatewayUrl = "wss://resume.example.test";
|
||||
sessionState.sequence = 123;
|
||||
|
||||
gateway.connect(false);
|
||||
gateway.sockets[0]?.emit("open");
|
||||
gateway.sockets[0]?.emit("close", GatewayCloseCodes.InvalidSeq);
|
||||
(
|
||||
gateway as unknown as {
|
||||
handlePayload(payload: { op: number; d: unknown }, resume: boolean): void;
|
||||
}
|
||||
).handlePayload({ op: GatewayOpcodes.InvalidSession, d: false }, true);
|
||||
await vi.advanceTimersByTimeAsync(2_000);
|
||||
|
||||
expect(gateway.connectCalls).toEqual([false, false]);
|
||||
expect(gateway.sockets).toHaveLength(2);
|
||||
expect(sessionState.sessionId).toBeNull();
|
||||
expect(sessionState.resumeGatewayUrl).toBeNull();
|
||||
expect(sessionState.sequence).toBeNull();
|
||||
});
|
||||
|
||||
it("includes close code details when reconnect attempts are exhausted", async () => {
|
||||
vi.useFakeTimers();
|
||||
const gateway = new TestGatewayPlugin({
|
||||
autoInteractions: false,
|
||||
reconnect: { maxAttempts: 0 },
|
||||
url: "wss://gateway.example.test",
|
||||
});
|
||||
const errorSpy = vi.fn();
|
||||
gateway.emitter.on("error", errorSpy);
|
||||
|
||||
gateway.connect(false);
|
||||
gateway.sockets[0]?.emit("open");
|
||||
gateway.sockets[0]?.emit("close", 1006);
|
||||
await vi.advanceTimersByTimeAsync(30_000);
|
||||
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
new Error("Max reconnect attempts (0) reached after close code 1006"),
|
||||
);
|
||||
expect(gateway.connectCalls).toEqual([false]);
|
||||
expect(gateway.sockets).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("does not reconnect after fatal gateway closes", async () => {
|
||||
|
||||
@@ -85,7 +85,8 @@ function canResumeAfterGatewayClose(code: GatewayCloseCodes): boolean {
|
||||
return (
|
||||
code !== GatewayCloseCodes.NotAuthenticated &&
|
||||
code !== GatewayCloseCodes.InvalidSeq &&
|
||||
code !== GatewayCloseCodes.SessionTimedOut
|
||||
code !== GatewayCloseCodes.SessionTimedOut &&
|
||||
code !== GatewayCloseCodes.AlreadyAuthenticated
|
||||
);
|
||||
}
|
||||
|
||||
@@ -232,7 +233,11 @@ export class GatewayPlugin extends Plugin {
|
||||
this.emitter.emit("error", new Error(`Fatal gateway close code: ${code}`));
|
||||
return;
|
||||
}
|
||||
this.scheduleReconnect(canResumeAfterGatewayClose(closeCode));
|
||||
const canResume = canResumeAfterGatewayClose(closeCode);
|
||||
if (!canResume) {
|
||||
this.resetSessionState();
|
||||
}
|
||||
this.scheduleReconnect(canResume, closeCode);
|
||||
});
|
||||
socket.on("error", (error) => {
|
||||
if (socket !== this.ws) {
|
||||
@@ -282,6 +287,9 @@ export class GatewayPlugin extends Plugin {
|
||||
});
|
||||
break;
|
||||
case GatewayOpcodes.InvalidSession:
|
||||
if (!payload.d) {
|
||||
this.resetSessionState();
|
||||
}
|
||||
this.scheduleReconnect(payload.d);
|
||||
break;
|
||||
case GatewayOpcodes.Reconnect:
|
||||
@@ -382,7 +390,13 @@ export class GatewayPlugin extends Plugin {
|
||||
}
|
||||
}
|
||||
|
||||
private scheduleReconnect(resume: boolean): void {
|
||||
private resetSessionState(): void {
|
||||
this.sessionId = null;
|
||||
this.resumeGatewayUrl = null;
|
||||
this.sequence = null;
|
||||
}
|
||||
|
||||
private scheduleReconnect(resume: boolean, closeCode?: number): void {
|
||||
if (!this.shouldReconnect) {
|
||||
return;
|
||||
}
|
||||
@@ -395,7 +409,13 @@ export class GatewayPlugin extends Plugin {
|
||||
this.outboundLimiter.clear();
|
||||
this.reconnectAttempts += 1;
|
||||
if (this.reconnectAttempts > (this.options.reconnect?.maxAttempts ?? 50)) {
|
||||
this.emitter.emit("error", new Error("Max reconnect attempts reached"));
|
||||
const maxAttempts = this.options.reconnect?.maxAttempts ?? 50;
|
||||
this.emitter.emit(
|
||||
"error",
|
||||
new Error(
|
||||
`Max reconnect attempts (${maxAttempts}) reached${closeCode !== undefined ? ` after close code ${closeCode}` : ""}`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const delay = Math.min(30_000, 1_000 * 2 ** Math.min(this.reconnectAttempts, 5));
|
||||
|
||||
Reference in New Issue
Block a user