Telegram: preserve inbound debounce order

This commit is contained in:
Onur Solmaz
2026-03-23 17:19:42 +01:00
committed by Peter Steinberger
parent b15462ebaf
commit 9a34a602bd
4 changed files with 277 additions and 14 deletions

View File

@@ -37,6 +37,10 @@ type DebounceBuffer<T> = {
items: T[];
timeout: ReturnType<typeof setTimeout> | null;
debounceMs: number;
ready: Promise<void>;
releaseReady: () => void;
readyReleased: boolean;
task: Promise<void>;
};
export type InboundDebounceCreateParams<T> = {
@@ -50,6 +54,7 @@ export type InboundDebounceCreateParams<T> = {
export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>) {
const buffers = new Map<string, DebounceBuffer<T>>();
const keyChains = new Map<string, Promise<void>>();
const defaultDebounceMs = Math.max(0, Math.trunc(params.debounceMs));
const resolveDebounceMs = (item: T) => {
@@ -60,20 +65,47 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
return Math.max(0, Math.trunc(resolved));
};
const runFlush = async (items: T[]) => {
try {
await params.onFlush(items);
} catch (err) {
params.onError?.(err, items);
}
};
const enqueueKeyTask = (key: string, task: () => Promise<void>) => {
const previous = keyChains.get(key) ?? Promise.resolve();
const next = previous.catch(() => undefined).then(task);
const settled = next.catch(() => undefined);
keyChains.set(key, settled);
void settled.finally(() => {
if (keyChains.get(key) === settled) {
keyChains.delete(key);
}
});
return next;
};
const releaseBuffer = (buffer: DebounceBuffer<T>) => {
if (buffer.readyReleased) {
return;
}
buffer.readyReleased = true;
buffer.releaseReady();
};
const flushBuffer = async (key: string, buffer: DebounceBuffer<T>) => {
buffers.delete(key);
if (buffers.get(key) === buffer) {
buffers.delete(key);
}
if (buffer.timeout) {
clearTimeout(buffer.timeout);
buffer.timeout = null;
}
if (buffer.items.length === 0) {
return;
}
try {
await params.onFlush(buffer.items);
} catch (err) {
params.onError?.(err, buffer.items);
}
// Reserve each key's execution slot as soon as the first buffered item
// arrives, so later same-key work cannot overtake a timer-backed flush.
releaseBuffer(buffer);
await buffer.task;
};
const flushKey = async (key: string) => {
@@ -103,10 +135,12 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
if (key && buffers.has(key)) {
await flushKey(key);
}
try {
await params.onFlush([item]);
} catch (err) {
params.onError?.(err, [item]);
if (key) {
await enqueueKeyTask(key, async () => {
await runFlush([item]);
});
} else {
await runFlush([item]);
}
return;
}
@@ -119,7 +153,25 @@ export function createInboundDebouncer<T>(params: InboundDebounceCreateParams<T>
return;
}
const buffer: DebounceBuffer<T> = { items: [item], timeout: null, debounceMs };
let releaseReady!: () => void;
const buffer: DebounceBuffer<T> = {
items: [item],
timeout: null,
debounceMs,
ready: new Promise<void>((resolve) => {
releaseReady = resolve;
}),
releaseReady,
readyReleased: false,
task: Promise.resolve(),
};
buffer.task = enqueueKeyTask(key, async () => {
await buffer.ready;
if (buffer.items.length === 0) {
return;
}
await runFlush(buffer.items);
});
buffers.set(key, buffer);
scheduleFlush(key, buffer);
};

View File

@@ -347,6 +347,60 @@ describe("createInboundDebouncer", () => {
vi.useRealTimers();
});
it("keeps later same-key work behind a timer-backed flush that already started", async () => {
const started: string[] = [];
const finished: string[] = [];
let releaseFirst!: () => void;
const firstGate = new Promise<void>((resolve) => {
releaseFirst = resolve;
});
const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout");
const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({
debounceMs: 50,
buildKey: (item) => item.key,
shouldDebounce: (item) => item.debounce,
onFlush: async (items) => {
const ids = items.map((entry) => entry.id).join(",");
started.push(ids);
if (ids === "1") {
await firstGate;
}
finished.push(ids);
},
});
try {
await debouncer.enqueue({ key: "a", id: "1", debounce: true });
const timerIndex = setTimeoutSpy.mock.calls.findLastIndex((call) => call[1] === 50);
expect(timerIndex).toBeGreaterThanOrEqual(0);
clearTimeout(setTimeoutSpy.mock.results[timerIndex]?.value as ReturnType<typeof setTimeout>);
const flushTimer = setTimeoutSpy.mock.calls[timerIndex]?.[0] as
| (() => Promise<void>)
| undefined;
const firstFlush = flushTimer?.();
await vi.waitFor(() => {
expect(started).toEqual(["1"]);
});
const secondEnqueue = debouncer.enqueue({ key: "a", id: "2", debounce: false });
await Promise.resolve();
expect(started).toEqual(["1"]);
expect(finished).toEqual([]);
releaseFirst();
await Promise.all([firstFlush, secondEnqueue]);
expect(started).toEqual(["1", "2"]);
expect(finished).toEqual(["1", "2"]);
} finally {
setTimeoutSpy.mockRestore();
}
});
});
describe("initSessionState BodyStripped", () => {