fix(discord): bound identify limiter clock skew

This commit is contained in:
Peter Steinberger
2026-05-30 17:46:06 -04:00
parent 21478cab93
commit c767b37e3b
2 changed files with 63 additions and 4 deletions

View File

@@ -2,21 +2,36 @@ import { parseFiniteNumber } from "openclaw/plugin-sdk/number-runtime";
const IDENTIFY_WINDOW_MS = 5_000;
type IdentifyRateState = {
lastObservedAt: number;
nextAllowedAt: number;
};
function normalizeMaxConcurrency(value: number | undefined): number {
const parsed = parseFiniteNumber(value);
return parsed === undefined ? 1 : Math.max(1, Math.floor(parsed));
}
class GatewayIdentifyLimiter {
private nextAllowedAtByKey = new Map<number, number>();
private stateByKey = new Map<number, IdentifyRateState>();
async wait(params: { shardId?: number; maxConcurrency?: number }): Promise<void> {
const maxConcurrency = normalizeMaxConcurrency(params.maxConcurrency);
const rateKey = (params.shardId ?? 0) % maxConcurrency;
const now = Date.now();
const nextAllowedAt = this.nextAllowedAtByKey.get(rateKey) ?? now;
const state = this.stateByKey.get(rateKey);
const clockMovedBackward = state !== undefined && now < state.lastObservedAt;
const nextAllowedAt =
state === undefined
? now
: clockMovedBackward
? now + IDENTIFY_WINDOW_MS
: state.nextAllowedAt;
const waitMs = Math.max(0, nextAllowedAt - now);
this.nextAllowedAtByKey.set(rateKey, Math.max(now, nextAllowedAt) + IDENTIFY_WINDOW_MS);
this.stateByKey.set(rateKey, {
lastObservedAt: now,
nextAllowedAt: Math.max(now, nextAllowedAt) + IDENTIFY_WINDOW_MS,
});
if (waitMs > 0) {
await new Promise<void>((resolve) => {
const timer = setTimeout(resolve, waitMs);
@@ -26,7 +41,7 @@ class GatewayIdentifyLimiter {
}
reset(): void {
this.nextAllowedAtByKey.clear();
this.stateByKey.clear();
}
}

View File

@@ -257,6 +257,50 @@ describe("GatewayPlugin", () => {
expect(secondResolved).toBe(true);
});
it("bounds identify waits after a backward clock jump", async () => {
vi.useFakeTimers();
const timeoutSpy = vi.spyOn(globalThis, "setTimeout");
try {
vi.setSystemTime(1_000_000_000_000);
await sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 });
vi.setSystemTime(0);
const second = sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 });
expect(timeoutSpy).toHaveBeenCalledWith(expect.any(Function), 5_000);
await vi.advanceTimersByTimeAsync(5_000);
await expect(second).resolves.toBeUndefined();
} finally {
timeoutSpy.mockRestore();
}
});
it("preserves queued identify spacing in the same bucket", async () => {
vi.useFakeTimers();
vi.setSystemTime(0);
await sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 });
let secondResolved = false;
let thirdResolved = false;
const second = sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }).then(() => {
secondResolved = true;
});
const third = sharedGatewayIdentifyLimiter.wait({ shardId: 0, maxConcurrency: 1 }).then(() => {
thirdResolved = true;
});
await vi.advanceTimersByTimeAsync(5_000);
await second;
expect(secondResolved).toBe(true);
expect(thirdResolved).toBe(false);
await vi.advanceTimersByTimeAsync(5_000);
await third;
expect(thirdResolved).toBe(true);
});
it("preserves MESSAGE_CREATE author payloads for inbound dispatch", async () => {
const gateway = new GatewayPlugin({ autoInteractions: false });
const dispatchGatewayEvent = vi.fn(async (eventValue: string, dataValue: unknown) => {});