Files
openclaw/src/plugin-sdk/persistent-dedupe.test.ts
2026-04-25 11:04:16 +01:00

146 lines
6.1 KiB
TypeScript

import path from "node:path";
import { describe, expect, it } from "vitest";
import { createClaimableDedupe, createPersistentDedupe } from "./persistent-dedupe.js";
import { createPluginSdkTestHarness } from "./test-helpers.js";
const { createTempDir } = createPluginSdkTestHarness();
function createDedupe(root: string, overrides?: { ttlMs?: number }) {
return createPersistentDedupe({
ttlMs: overrides?.ttlMs ?? 24 * 60 * 60 * 1000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
});
}
describe("createPersistentDedupe", () => {
it("deduplicates keys, persists across instances, warms up, and checks recent keys", async () => {
const root = await createTempDir("openclaw-dedupe-");
const first = createDedupe(root);
expect(await first.checkAndRecord("m1", { namespace: "a" })).toBe(true);
expect(await first.checkAndRecord("m1", { namespace: "a" })).toBe(false);
expect(await first.checkAndRecord("m2", { namespace: "a" })).toBe(true);
const second = createDedupe(root);
expect(await second.hasRecent("m1", { namespace: "a" })).toBe(true);
expect(await second.hasRecent("missing", { namespace: "a" })).toBe(false);
expect(await second.warmup("a")).toBe(2);
expect(await second.checkAndRecord("m1", { namespace: "a" })).toBe(false);
expect(await second.checkAndRecord("m2", { namespace: "a" })).toBe(false);
expect(await second.checkAndRecord("m3", { namespace: "a" })).toBe(true);
expect(await second.checkAndRecord("m1", { namespace: "b" })).toBe(true);
const raceDedupe = createDedupe(root, { ttlMs: 10_000 });
const [raceFirst, raceSecond] = await Promise.all([
raceDedupe.checkAndRecord("race-key", { namespace: "feishu" }),
raceDedupe.checkAndRecord("race-key", { namespace: "feishu" }),
]);
expect(raceFirst).toBe(true);
expect(raceSecond).toBe(false);
});
it("falls back to memory-only behavior on disk errors", async () => {
const dedupe = createPersistentDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath: () => path.join("/dev/null", "dedupe.json"),
});
expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(true);
expect(await dedupe.checkAndRecord("memory-only", { namespace: "x" })).toBe(false);
});
it("warms empty namespaces and skips expired disk entries", async () => {
const root = await createTempDir("openclaw-dedupe-");
const emptyReader = createDedupe(root, { ttlMs: 10_000 });
expect(await emptyReader.warmup("nonexistent")).toBe(0);
const writer = createDedupe(root, { ttlMs: 1000 });
const oldNow = Date.now() - 2000;
expect(await writer.checkAndRecord("old-msg", { namespace: "acct", now: oldNow })).toBe(true);
expect(await writer.checkAndRecord("new-msg", { namespace: "acct" })).toBe(true);
const reader = createDedupe(root, { ttlMs: 1000 });
expect(await reader.warmup("acct")).toBe(1);
expect(await reader.checkAndRecord("old-msg", { namespace: "acct" })).toBe(true);
expect(await reader.checkAndRecord("new-msg", { namespace: "acct" })).toBe(false);
});
});
describe("createClaimableDedupe", () => {
it("mirrors in-flight duplicates, serializes races, and records on commit", async () => {
const dedupe = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
});
await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "claimed" });
const duplicate = await dedupe.claim("line:evt-1");
expect(duplicate.kind).toBe("inflight");
const commit = dedupe.commit("line:evt-1");
await expect(commit).resolves.toBe(true);
if (duplicate.kind === "inflight") {
await expect(duplicate.pending).resolves.toBe(true);
}
await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "duplicate" });
const claims = await Promise.all([dedupe.claim("line:race-1"), dedupe.claim("line:race-1")]);
expect(claims.filter((claim) => claim.kind === "claimed")).toHaveLength(1);
expect(claims.filter((claim) => claim.kind === "inflight")).toHaveLength(1);
const waitingClaim = claims.find((claim) => claim.kind === "inflight");
await expect(dedupe.commit("line:race-1")).resolves.toBe(true);
if (waitingClaim?.kind === "inflight") {
await expect(waitingClaim.pending).resolves.toBe(true);
}
await expect(dedupe.claim("line:race-1")).resolves.toEqual({ kind: "duplicate" });
});
it("rejects waiting duplicates when the active claim releases with an error", async () => {
const dedupe = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
});
await expect(dedupe.claim("line:evt-2")).resolves.toEqual({ kind: "claimed" });
const duplicate = await dedupe.claim("line:evt-2");
expect(duplicate.kind).toBe("inflight");
const failure = new Error("transient failure");
dedupe.release("line:evt-2", { error: failure });
if (duplicate.kind === "inflight") {
await expect(duplicate.pending).rejects.toThrow("transient failure");
}
await expect(dedupe.claim("line:evt-2")).resolves.toEqual({ kind: "claimed" });
});
it("supports persistent-backed recent checks and warmup", async () => {
const root = await createTempDir("openclaw-claimable-dedupe-");
const writer = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
});
await expect(writer.claim("m1", { namespace: "acct" })).resolves.toEqual({ kind: "claimed" });
await expect(writer.commit("m1", { namespace: "acct" })).resolves.toBe(true);
const reader = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
});
expect(await reader.hasRecent("m1", { namespace: "acct" })).toBe(true);
expect(await reader.warmup("acct")).toBe(1);
await expect(reader.claim("m1", { namespace: "acct" })).resolves.toEqual({
kind: "duplicate",
});
});
});