diff --git a/src/auto-reply/inbound-debounce.ts b/src/auto-reply/inbound-debounce.ts index 438adce5945..c8461639010 100644 --- a/src/auto-reply/inbound-debounce.ts +++ b/src/auto-reply/inbound-debounce.ts @@ -37,7 +37,6 @@ type DebounceBuffer = { items: T[]; timeout: ReturnType | null; debounceMs: number; - ready: Promise; releaseReady: () => void; readyReleased: boolean; task: Promise; @@ -86,6 +85,27 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return next; }; + const enqueueReservedKeyTask = (key: string, task: () => Promise) => { + let readyReleased = false; + let releaseReady!: () => void; + const ready = new Promise((resolve) => { + releaseReady = resolve; + }); + return { + task: enqueueKeyTask(key, async () => { + await ready; + await task(); + }), + release: () => { + if (readyReleased) { + return; + } + readyReleased = true; + releaseReady(); + }, + }; + }; + const releaseBuffer = (buffer: DebounceBuffer) => { if (buffer.readyReleased) { return; @@ -132,13 +152,24 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams const canDebounce = debounceMs > 0 && (params.shouldDebounce?.(item) ?? true); if (!canDebounce || !key) { - if (key && buffers.has(key)) { - await flushKey(key); - } if (key) { - await enqueueKeyTask(key, async () => { + if (!buffers.has(key)) { + await enqueueKeyTask(key, async () => { + await runFlush([item]); + }); + return; + } + // Reserve the keyed immediate slot before forcing the pending buffer + // to flush so fire-and-forget callers cannot be overtaken. + const reservedTask = enqueueReservedKeyTask(key, async () => { await runFlush([item]); }); + try { + await flushKey(key); + } finally { + reservedTask.release(); + } + await reservedTask.task; } else { await runFlush([item]); } @@ -153,25 +184,21 @@ export function createInboundDebouncer(params: InboundDebounceCreateParams return; } - let releaseReady!: () => void; - const buffer: DebounceBuffer = { - items: [item], - timeout: null, - debounceMs, - ready: new Promise((resolve) => { - releaseReady = resolve; - }), - releaseReady, - readyReleased: false, - task: Promise.resolve(), - }; - buffer.task = enqueueKeyTask(key, async () => { - await buffer.ready; + let buffer!: DebounceBuffer; + const reservedTask = enqueueReservedKeyTask(key, async () => { if (buffer.items.length === 0) { return; } await runFlush(buffer.items); }); + buffer = { + items: [item], + timeout: null, + debounceMs, + releaseReady: reservedTask.release, + readyReleased: false, + task: reservedTask.task, + }; buffers.set(key, buffer); scheduleFlush(key, buffer); }; diff --git a/src/auto-reply/inbound.test.ts b/src/auto-reply/inbound.test.ts index 3f2fb63d188..2200c1c0554 100644 --- a/src/auto-reply/inbound.test.ts +++ b/src/auto-reply/inbound.test.ts @@ -401,6 +401,74 @@ describe("createInboundDebouncer", () => { setTimeoutSpy.mockRestore(); } }); + + it("keeps fire-and-forget keyed work ahead of a later buffered item", async () => { + const started: string[] = []; + const finished: string[] = []; + let releaseFirst!: () => void; + const firstGate = new Promise((resolve) => { + releaseFirst = resolve; + }); + + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + const debouncer = createInboundDebouncer<{ key: string; id: string; debounce: boolean }>({ + debounceMs: 50, + buildKey: (item) => item.key, + shouldDebounce: (item) => item.debounce, + onFlush: async (items) => { + const ids = items.map((entry) => entry.id).join(","); + started.push(ids); + if (ids === "1") { + await firstGate; + } + finished.push(ids); + }, + }); + + try { + await debouncer.enqueue({ key: "a", id: "1", debounce: true }); + + const firstTimerIndex = setTimeoutSpy.mock.calls.findLastIndex((call) => call[1] === 50); + expect(firstTimerIndex).toBeGreaterThanOrEqual(0); + clearTimeout( + setTimeoutSpy.mock.results[firstTimerIndex]?.value as ReturnType, + ); + const firstFlush = ( + setTimeoutSpy.mock.calls[firstTimerIndex]?.[0] as (() => Promise) | undefined + )?.(); + + await vi.waitFor(() => { + expect(started).toEqual(["1"]); + }); + + const secondEnqueue = debouncer.enqueue({ key: "a", id: "2", debounce: false }); + const thirdEnqueue = debouncer.enqueue({ key: "a", id: "3", debounce: true }); + + const thirdTimerIndex = setTimeoutSpy.mock.calls.findLastIndex( + (call, index) => index > firstTimerIndex && call[1] === 50, + ); + expect(thirdTimerIndex).toBeGreaterThan(firstTimerIndex); + clearTimeout( + setTimeoutSpy.mock.results[thirdTimerIndex]?.value as ReturnType, + ); + const thirdFlush = ( + setTimeoutSpy.mock.calls[thirdTimerIndex]?.[0] as (() => Promise) | undefined + )?.(); + + await Promise.resolve(); + + expect(started).toEqual(["1"]); + expect(finished).toEqual([]); + + releaseFirst(); + await Promise.all([firstFlush, secondEnqueue, thirdFlush, thirdEnqueue]); + + expect(started).toEqual(["1", "2", "3"]); + expect(finished).toEqual(["1", "2", "3"]); + } finally { + setTimeoutSpy.mockRestore(); + } + }); }); describe("initSessionState BodyStripped", () => {