diff --git a/extensions/discord/src/internal/gateway-identify-limiter.ts b/extensions/discord/src/internal/gateway-identify-limiter.ts index 17b499ad8fd..78f8bdf7707 100644 --- a/extensions/discord/src/internal/gateway-identify-limiter.ts +++ b/extensions/discord/src/internal/gateway-identify-limiter.ts @@ -2,21 +2,36 @@ import { parseFiniteNumber } from "openclaw/plugin-sdk/number-runtime"; const IDENTIFY_WINDOW_MS = 5_000; +type IdentifyRateState = { + lastObservedAt: number; + nextAllowedAt: number; +}; + function normalizeMaxConcurrency(value: number | undefined): number { const parsed = parseFiniteNumber(value); return parsed === undefined ? 1 : Math.max(1, Math.floor(parsed)); } class GatewayIdentifyLimiter { - private nextAllowedAtByKey = new Map(); + private stateByKey = new Map(); async wait(params: { shardId?: number; maxConcurrency?: number }): Promise { const maxConcurrency = normalizeMaxConcurrency(params.maxConcurrency); const rateKey = (params.shardId ?? 0) % maxConcurrency; const now = Date.now(); - const nextAllowedAt = this.nextAllowedAtByKey.get(rateKey) ?? now; + const state = this.stateByKey.get(rateKey); + const clockMovedBackward = state !== undefined && now < state.lastObservedAt; + const nextAllowedAt = + state === undefined + ? now + : clockMovedBackward + ? now + IDENTIFY_WINDOW_MS + : state.nextAllowedAt; const waitMs = Math.max(0, nextAllowedAt - now); - this.nextAllowedAtByKey.set(rateKey, Math.max(now, nextAllowedAt) + IDENTIFY_WINDOW_MS); + this.stateByKey.set(rateKey, { + lastObservedAt: now, + nextAllowedAt: Math.max(now, nextAllowedAt) + IDENTIFY_WINDOW_MS, + }); if (waitMs > 0) { await new Promise((resolve) => { const timer = setTimeout(resolve, waitMs); @@ -26,7 +41,7 @@ class GatewayIdentifyLimiter { } reset(): void { - this.nextAllowedAtByKey.clear(); + this.stateByKey.clear(); } } diff --git a/extensions/discord/src/internal/gateway.test.ts b/extensions/discord/src/internal/gateway.test.ts index d6fcc4a87a5..2de7e776291 100644 --- a/extensions/discord/src/internal/gateway.test.ts +++ b/extensions/discord/src/internal/gateway.test.ts @@ -257,6 +257,50 @@ describe("GatewayPlugin", () => { expect(secondResolved).toBe(true); }); + it("bounds identify waits after a backward clock jump", async () => { + vi.useFakeTimers(); + const timeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + vi.setSystemTime(1_000_000_000_000); + await sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }); + + vi.setSystemTime(0); + const second = sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }); + + expect(timeoutSpy).toHaveBeenCalledWith(expect.any(Function), 5_000); + + await vi.advanceTimersByTimeAsync(5_000); + await expect(second).resolves.toBeUndefined(); + } finally { + timeoutSpy.mockRestore(); + } + }); + + it("preserves queued identify spacing in the same bucket", async () => { + vi.useFakeTimers(); + vi.setSystemTime(0); + + await sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }); + let secondResolved = false; + let thirdResolved = false; + + const second = sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }).then(() => { + secondResolved = true; + }); + const third = sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }).then(() => { + thirdResolved = true; + }); + + await vi.advanceTimersByTimeAsync(5_000); + await second; + expect(secondResolved).toBe(true); + expect(thirdResolved).toBe(false); + + await vi.advanceTimersByTimeAsync(5_000); + await third; + expect(thirdResolved).toBe(true); + }); + it("preserves MESSAGE_CREATE author payloads for inbound dispatch", async () => { const gateway = new GatewayPlugin({ autoInteractions: false }); const dispatchGatewayEvent = vi.fn(async (eventValue: string, dataValue: unknown) => {});