feat(plugin-sdk): add claimable dedupe helper

This commit is contained in:
Vincent Koc
2026-04-13 15:03:54 +01:00
parent dc5ed7edea
commit 028434a00f
2 changed files with 311 additions and 3 deletions

View File

@@ -1,6 +1,6 @@
import path from "node:path";
import { describe, expect, it } from "vitest";
import { createPersistentDedupe } from "./persistent-dedupe.js";
import { createClaimableDedupe, createPersistentDedupe } from "./persistent-dedupe.js";
import { createPluginSdkTestHarness } from "./test-helpers.js";
const { createTempDir } = createPluginSdkTestHarness();
@@ -64,6 +64,17 @@ describe("createPersistentDedupe", () => {
expect(await reader.checkAndRecord("msg-3", { namespace: "acct" })).toBe(true);
});
it("checks for recent keys without mutating the store", async () => {
const root = await createTempDir("openclaw-dedupe-");
const writer = createDedupe(root);
expect(await writer.checkAndRecord("peek-me", { namespace: "acct" })).toBe(true);
const reader = createDedupe(root);
expect(await reader.hasRecent("peek-me", { namespace: "acct" })).toBe(true);
expect(await reader.hasRecent("missing", { namespace: "acct" })).toBe(false);
expect(await reader.checkAndRecord("peek-me", { namespace: "acct" })).toBe(false);
});
it.each([
{
name: "returns 0 when no disk file exists",
@@ -98,3 +109,67 @@ describe("createPersistentDedupe", () => {
await verify(reader);
});
});
describe("createClaimableDedupe", () => {
it("mirrors concurrent in-flight duplicates and records on commit", async () => {
const dedupe = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
});
await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "claimed" });
const duplicate = await dedupe.claim("line:evt-1");
expect(duplicate.kind).toBe("inflight");
const commit = dedupe.commit("line:evt-1");
await expect(commit).resolves.toBe(true);
if (duplicate.kind === "inflight") {
await expect(duplicate.pending).resolves.toBe(true);
}
await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "duplicate" });
});
it("rejects waiting duplicates when the active claim releases with an error", async () => {
const dedupe = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
});
await expect(dedupe.claim("line:evt-2")).resolves.toEqual({ kind: "claimed" });
const duplicate = await dedupe.claim("line:evt-2");
expect(duplicate.kind).toBe("inflight");
const failure = new Error("transient failure");
dedupe.release("line:evt-2", { error: failure });
if (duplicate.kind === "inflight") {
await expect(duplicate.pending).rejects.toThrow("transient failure");
}
await expect(dedupe.claim("line:evt-2")).resolves.toEqual({ kind: "claimed" });
});
it("supports persistent-backed recent checks and warmup", async () => {
const root = await createTempDir("openclaw-claimable-dedupe-");
const writer = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
});
await expect(writer.claim("m1", { namespace: "acct" })).resolves.toEqual({ kind: "claimed" });
await expect(writer.commit("m1", { namespace: "acct" })).resolves.toBe(true);
const reader = createClaimableDedupe({
ttlMs: 10_000,
memoryMaxSize: 100,
fileMaxEntries: 1000,
resolveFilePath: (namespace) => path.join(root, `${namespace}.json`),
});
expect(await reader.hasRecent("m1", { namespace: "acct" })).toBe(true);
expect(await reader.warmup("acct")).toBe(1);
await expect(reader.claim("m1", { namespace: "acct" })).resolves.toEqual({
kind: "duplicate",
});
});
});

View File

@@ -22,6 +22,49 @@ export type PersistentDedupeCheckOptions = {
export type PersistentDedupe = {
checkAndRecord: (key: string, options?: PersistentDedupeCheckOptions) => Promise<boolean>;
hasRecent: (key: string, options?: PersistentDedupeCheckOptions) => Promise<boolean>;
warmup: (namespace?: string, onError?: (error: unknown) => void) => Promise<number>;
clearMemory: () => void;
memorySize: () => number;
};
export type ClaimableDedupeClaimResult =
| { kind: "claimed" }
| { kind: "duplicate" }
| { kind: "inflight"; pending: Promise<boolean> };
export type ClaimableDedupeOptions =
| {
ttlMs: number;
memoryMaxSize: number;
resolveFilePath: (namespace: string) => string;
fileMaxEntries: number;
lockOptions?: Partial<FileLockOptions>;
onDiskError?: (error: unknown) => void;
}
| {
ttlMs: number;
memoryMaxSize: number;
resolveFilePath?: undefined;
fileMaxEntries?: undefined;
lockOptions?: undefined;
onDiskError?: undefined;
};
export type ClaimableDedupe = {
claim: (
key: string,
options?: PersistentDedupeCheckOptions,
) => Promise<ClaimableDedupeClaimResult>;
commit: (key: string, options?: PersistentDedupeCheckOptions) => Promise<boolean>;
release: (
key: string,
options?: {
namespace?: string;
error?: unknown;
},
) => void;
hasRecent: (key: string, options?: PersistentDedupeCheckOptions) => Promise<boolean>;
warmup: (namespace?: string, onError?: (error: unknown) => void) => Promise<number>;
clearMemory: () => void;
memorySize: () => number;
@@ -91,6 +134,18 @@ function pruneData(
});
}
function resolveNamespace(namespace?: string): string {
return namespace?.trim() || "global";
}
function resolveScopedKey(namespace: string, key: string): string {
return `${namespace}:${key}`;
}
function isRecentTimestamp(seenAt: number | undefined, ttlMs: number, now: number): boolean {
return seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs);
}
/** Create a dedupe helper that combines in-memory fast checks with a lock-protected disk store. */
export function createPersistentDedupe(options: PersistentDedupeOptions): PersistentDedupe {
const ttlMs = Math.max(0, Math.floor(options.ttlMs));
@@ -134,6 +189,33 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
}
}
async function hasRecentInner(
key: string,
namespace: string,
scopedKey: string,
now: number,
onDiskError?: (error: unknown) => void,
): Promise<boolean> {
if (memory.peek(scopedKey, now)) {
return true;
}
const path = options.resolveFilePath(namespace);
try {
const { value } = await readJsonFileWithFallback<PersistentDedupeData>(path, {});
const data = sanitizeData(value);
const seenAt = data[key];
if (!isRecentTimestamp(seenAt, ttlMs, now)) {
return false;
}
memory.check(scopedKey, seenAt);
return true;
} catch (error) {
onDiskError?.(error);
return memory.peek(scopedKey, now);
}
}
async function warmup(namespace = "global", onError?: (error: unknown) => void): Promise<number> {
const filePath = options.resolveFilePath(namespace);
const now = Date.now();
@@ -164,8 +246,8 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
if (!trimmed) {
return true;
}
const namespace = dedupeOptions?.namespace?.trim() || "global";
const scopedKey = `${namespace}:${trimmed}`;
const namespace = resolveNamespace(dedupeOptions?.namespace);
const scopedKey = resolveScopedKey(namespace, trimmed);
if (inflight.has(scopedKey)) {
return false;
}
@@ -181,10 +263,161 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis
}
}
async function hasRecent(
key: string,
dedupeOptions?: PersistentDedupeCheckOptions,
): Promise<boolean> {
const trimmed = key.trim();
if (!trimmed) {
return false;
}
const namespace = resolveNamespace(dedupeOptions?.namespace);
const scopedKey = resolveScopedKey(namespace, trimmed);
const onDiskError = dedupeOptions?.onDiskError ?? options.onDiskError;
const now = dedupeOptions?.now ?? Date.now();
return hasRecentInner(trimmed, namespace, scopedKey, now, onDiskError);
}
return {
checkAndRecord,
hasRecent,
warmup,
clearMemory: () => memory.clear(),
memorySize: () => memory.size(),
};
}
function createReleasedClaimError(scopedKey: string): Error {
return new Error(`claim released before commit: ${scopedKey}`);
}
/** Create a claim/commit/release dedupe guard backed by memory and optional persistent storage. */
export function createClaimableDedupe(options: ClaimableDedupeOptions): ClaimableDedupe {
const ttlMs = Math.max(0, Math.floor(options.ttlMs));
const memoryMaxSize = Math.max(0, Math.floor(options.memoryMaxSize));
const memory = createDedupeCache({ ttlMs, maxSize: memoryMaxSize });
const persistent =
options.resolveFilePath != null
? createPersistentDedupe({
ttlMs,
memoryMaxSize,
fileMaxEntries: Math.max(1, Math.floor(options.fileMaxEntries)),
resolveFilePath: options.resolveFilePath,
lockOptions: options.lockOptions,
onDiskError: options.onDiskError,
})
: null;
const inflight = new Map<
string,
{
promise: Promise<boolean>;
resolve: (result: boolean) => void;
reject: (error: unknown) => void;
}
>();
async function hasRecent(
key: string,
dedupeOptions?: PersistentDedupeCheckOptions,
): Promise<boolean> {
const trimmed = key.trim();
if (!trimmed) {
return false;
}
const namespace = resolveNamespace(dedupeOptions?.namespace);
const scopedKey = resolveScopedKey(namespace, trimmed);
if (persistent) {
return persistent.hasRecent(trimmed, dedupeOptions);
}
return memory.peek(scopedKey, dedupeOptions?.now);
}
async function claim(
key: string,
dedupeOptions?: PersistentDedupeCheckOptions,
): Promise<ClaimableDedupeClaimResult> {
const trimmed = key.trim();
if (!trimmed) {
return { kind: "claimed" };
}
const namespace = resolveNamespace(dedupeOptions?.namespace);
const scopedKey = resolveScopedKey(namespace, trimmed);
const existing = inflight.get(scopedKey);
if (existing) {
return { kind: "inflight", pending: existing.promise };
}
if (await hasRecent(trimmed, dedupeOptions)) {
return { kind: "duplicate" };
}
let resolve!: (result: boolean) => void;
let reject!: (error: unknown) => void;
const promise = new Promise<boolean>((resolvePromise, rejectPromise) => {
resolve = resolvePromise;
reject = rejectPromise;
});
void promise.catch(() => {});
inflight.set(scopedKey, { promise, resolve, reject });
return { kind: "claimed" };
}
async function commit(
key: string,
dedupeOptions?: PersistentDedupeCheckOptions,
): Promise<boolean> {
const trimmed = key.trim();
if (!trimmed) {
return true;
}
const namespace = resolveNamespace(dedupeOptions?.namespace);
const scopedKey = resolveScopedKey(namespace, trimmed);
const claim = inflight.get(scopedKey);
try {
const recorded = persistent
? await persistent.checkAndRecord(trimmed, dedupeOptions)
: !memory.check(scopedKey, dedupeOptions?.now);
claim?.resolve(recorded);
return recorded;
} catch (error) {
claim?.reject(error);
throw error;
} finally {
inflight.delete(scopedKey);
}
}
function release(
key: string,
dedupeOptions?: {
namespace?: string;
error?: unknown;
},
): void {
const trimmed = key.trim();
if (!trimmed) {
return;
}
const namespace = resolveNamespace(dedupeOptions?.namespace);
const scopedKey = resolveScopedKey(namespace, trimmed);
const claim = inflight.get(scopedKey);
if (!claim) {
return;
}
claim.reject(dedupeOptions?.error ?? createReleasedClaimError(scopedKey));
inflight.delete(scopedKey);
}
return {
claim,
commit,
release,
hasRecent,
warmup: persistent?.warmup ?? (async () => 0),
clearMemory: () => {
persistent?.clearMemory();
memory.clear();
},
memorySize: () => persistent?.memorySize() ?? memory.size(),
};
}