diff --git a/src/plugin-sdk/persistent-dedupe.test.ts b/src/plugin-sdk/persistent-dedupe.test.ts index f072482b33f..31c740ff6e7 100644 --- a/src/plugin-sdk/persistent-dedupe.test.ts +++ b/src/plugin-sdk/persistent-dedupe.test.ts @@ -129,6 +129,24 @@ describe("createClaimableDedupe", () => { await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "duplicate" }); }); + it("serializes concurrent first-claim races onto one in-flight owner", async () => { + const dedupe = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + }); + + const claims = await Promise.all([dedupe.claim("line:race-1"), dedupe.claim("line:race-1")]); + expect(claims.filter((claim) => claim.kind === "claimed")).toHaveLength(1); + expect(claims.filter((claim) => claim.kind === "inflight")).toHaveLength(1); + + const waitingClaim = claims.find((claim) => claim.kind === "inflight"); + await expect(dedupe.commit("line:race-1")).resolves.toBe(true); + if (waitingClaim?.kind === "inflight") { + await expect(waitingClaim.pending).resolves.toBe(true); + } + await expect(dedupe.claim("line:race-1")).resolves.toEqual({ kind: "duplicate" }); + }); + it("rejects waiting duplicates when the active claim releases with an error", async () => { const dedupe = createClaimableDedupe({ ttlMs: 10_000, diff --git a/src/plugin-sdk/persistent-dedupe.ts b/src/plugin-sdk/persistent-dedupe.ts index e07d45073dd..79dd33e2779 100644 --- a/src/plugin-sdk/persistent-dedupe.ts +++ b/src/plugin-sdk/persistent-dedupe.ts @@ -347,9 +347,6 @@ export function createClaimableDedupe(options: ClaimableDedupeOptions): Claimabl if (existing) { return { kind: "inflight", pending: existing.promise }; } - if (await hasRecent(trimmed, dedupeOptions)) { - return { kind: "duplicate" }; - } let resolve!: (result: boolean) => void; let reject!: (error: unknown) => void; @@ -359,7 +356,18 @@ export function createClaimableDedupe(options: ClaimableDedupeOptions): Claimabl }); void promise.catch(() => {}); inflight.set(scopedKey, { promise, resolve, reject }); - return { kind: "claimed" }; + try { + if (await hasRecent(trimmed, dedupeOptions)) { + resolve(false); + inflight.delete(scopedKey); + return { kind: "duplicate" }; + } + return { kind: "claimed" }; + } catch (error) { + reject(error); + inflight.delete(scopedKey); + throw error; + } } async function commit(