fix(plugin-sdk): serialize claimable dedupe races

This commit is contained in:
Vincent Koc
2026-04-13 15:27:52 +01:00
parent df9a38120b
commit 6c12ec1ed2
2 changed files with 30 additions and 4 deletions

View File

@@ -129,6 +129,24 @@ describe("createClaimableDedupe", () => {
await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "duplicate" });
});
it("serializes concurrent first-claim races onto one in-flight owner", async () => {
const dedupe = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
});
const claims = await Promise.all([dedupe.claim("line:race-1"), dedupe.claim("line:race-1")]);
expect(claims.filter((claim) => claim.kind === "claimed")).toHaveLength(1);
expect(claims.filter((claim) => claim.kind === "inflight")).toHaveLength(1);
const waitingClaim = claims.find((claim) => claim.kind === "inflight");
await expect(dedupe.commit("line:race-1")).resolves.toBe(true);
if (waitingClaim?.kind === "inflight") {
await expect(waitingClaim.pending).resolves.toBe(true);
}
await expect(dedupe.claim("line:race-1")).resolves.toEqual({ kind: "duplicate" });
});
it("rejects waiting duplicates when the active claim releases with an error", async () => {
const dedupe = createClaimableDedupe({
ttlMs: 10_000,

View File

@@ -347,9 +347,6 @@ export function createClaimableDedupe(options: ClaimableDedupeOptions): Claimabl
if (existing) {
return { kind: "inflight", pending: existing.promise };
}
if (await hasRecent(trimmed, dedupeOptions)) {
return { kind: "duplicate" };
}
let resolve!: (result: boolean) => void;
let reject!: (error: unknown) => void;
@@ -359,7 +356,18 @@ export function createClaimableDedupe(options: ClaimableDedupeOptions): Claimabl
});
void promise.catch(() => {});
inflight.set(scopedKey, { promise, resolve, reject });
return { kind: "claimed" };
try {
if (await hasRecent(trimmed, dedupeOptions)) {
resolve(false);
inflight.delete(scopedKey);
return { kind: "duplicate" };
}
return { kind: "claimed" };
} catch (error) {
reject(error);
inflight.delete(scopedKey);
throw error;
}
}
async function commit(