From 028434a00ffd52f841a501e3a0d7b0c9ab880f10 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 15:03:54 +0100 Subject: [PATCH] feat(plugin-sdk): add claimable dedupe helper --- src/plugin-sdk/persistent-dedupe.test.ts | 77 +++++++- src/plugin-sdk/persistent-dedupe.ts | 237 ++++++++++++++++++++++- 2 files changed, 311 insertions(+), 3 deletions(-) diff --git a/src/plugin-sdk/persistent-dedupe.test.ts b/src/plugin-sdk/persistent-dedupe.test.ts index fbf4282d6aa..f072482b33f 100644 --- a/src/plugin-sdk/persistent-dedupe.test.ts +++ b/src/plugin-sdk/persistent-dedupe.test.ts @@ -1,6 +1,6 @@ import path from "node:path"; import { describe, expect, it } from "vitest"; -import { createPersistentDedupe } from "./persistent-dedupe.js"; +import { createClaimableDedupe, createPersistentDedupe } from "./persistent-dedupe.js"; import { createPluginSdkTestHarness } from "./test-helpers.js"; const { createTempDir } = createPluginSdkTestHarness(); @@ -64,6 +64,17 @@ describe("createPersistentDedupe", () => { expect(await reader.checkAndRecord("msg-3", { namespace: "acct" })).toBe(true); }); + it("checks for recent keys without mutating the store", async () => { + const root = await createTempDir("openclaw-dedupe-"); + const writer = createDedupe(root); + expect(await writer.checkAndRecord("peek-me", { namespace: "acct" })).toBe(true); + + const reader = createDedupe(root); + expect(await reader.hasRecent("peek-me", { namespace: "acct" })).toBe(true); + expect(await reader.hasRecent("missing", { namespace: "acct" })).toBe(false); + expect(await reader.checkAndRecord("peek-me", { namespace: "acct" })).toBe(false); + }); + it.each([ { name: "returns 0 when no disk file exists", @@ -98,3 +109,67 @@ describe("createPersistentDedupe", () => { await verify(reader); }); }); + +describe("createClaimableDedupe", () => { + it("mirrors concurrent in-flight duplicates and records on commit", async () => { + const dedupe = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + }); + + await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "claimed" }); + const duplicate = await dedupe.claim("line:evt-1"); + expect(duplicate.kind).toBe("inflight"); + + const commit = dedupe.commit("line:evt-1"); + await expect(commit).resolves.toBe(true); + if (duplicate.kind === "inflight") { + await expect(duplicate.pending).resolves.toBe(true); + } + await expect(dedupe.claim("line:evt-1")).resolves.toEqual({ kind: "duplicate" }); + }); + + it("rejects waiting duplicates when the active claim releases with an error", async () => { + const dedupe = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + }); + + await expect(dedupe.claim("line:evt-2")).resolves.toEqual({ kind: "claimed" }); + const duplicate = await dedupe.claim("line:evt-2"); + expect(duplicate.kind).toBe("inflight"); + + const failure = new Error("transient failure"); + dedupe.release("line:evt-2", { error: failure }); + if (duplicate.kind === "inflight") { + await expect(duplicate.pending).rejects.toThrow("transient failure"); + } + await expect(dedupe.claim("line:evt-2")).resolves.toEqual({ kind: "claimed" }); + }); + + it("supports persistent-backed recent checks and warmup", async () => { + const root = await createTempDir("openclaw-claimable-dedupe-"); + const writer = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath: (namespace) => path.join(root, `${namespace}.json`), + }); + + await expect(writer.claim("m1", { namespace: "acct" })).resolves.toEqual({ kind: "claimed" }); + await expect(writer.commit("m1", { namespace: "acct" })).resolves.toBe(true); + + const reader = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + fileMaxEntries: 1000, + resolveFilePath: (namespace) => path.join(root, `${namespace}.json`), + }); + + expect(await reader.hasRecent("m1", { namespace: "acct" })).toBe(true); + expect(await reader.warmup("acct")).toBe(1); + await expect(reader.claim("m1", { namespace: "acct" })).resolves.toEqual({ + kind: "duplicate", + }); + }); +}); diff --git a/src/plugin-sdk/persistent-dedupe.ts b/src/plugin-sdk/persistent-dedupe.ts index 7a6ea159841..e07d45073dd 100644 --- a/src/plugin-sdk/persistent-dedupe.ts +++ b/src/plugin-sdk/persistent-dedupe.ts @@ -22,6 +22,49 @@ export type PersistentDedupeCheckOptions = { export type PersistentDedupe = { checkAndRecord: (key: string, options?: PersistentDedupeCheckOptions) => Promise; + hasRecent: (key: string, options?: PersistentDedupeCheckOptions) => Promise; + warmup: (namespace?: string, onError?: (error: unknown) => void) => Promise; + clearMemory: () => void; + memorySize: () => number; +}; + +export type ClaimableDedupeClaimResult = + | { kind: "claimed" } + | { kind: "duplicate" } + | { kind: "inflight"; pending: Promise }; + +export type ClaimableDedupeOptions = + | { + ttlMs: number; + memoryMaxSize: number; + resolveFilePath: (namespace: string) => string; + fileMaxEntries: number; + lockOptions?: Partial; + onDiskError?: (error: unknown) => void; + } + | { + ttlMs: number; + memoryMaxSize: number; + resolveFilePath?: undefined; + fileMaxEntries?: undefined; + lockOptions?: undefined; + onDiskError?: undefined; + }; + +export type ClaimableDedupe = { + claim: ( + key: string, + options?: PersistentDedupeCheckOptions, + ) => Promise; + commit: (key: string, options?: PersistentDedupeCheckOptions) => Promise; + release: ( + key: string, + options?: { + namespace?: string; + error?: unknown; + }, + ) => void; + hasRecent: (key: string, options?: PersistentDedupeCheckOptions) => Promise; warmup: (namespace?: string, onError?: (error: unknown) => void) => Promise; clearMemory: () => void; memorySize: () => number; @@ -91,6 +134,18 @@ function pruneData( }); } +function resolveNamespace(namespace?: string): string { + return namespace?.trim() || "global"; +} + +function resolveScopedKey(namespace: string, key: string): string { + return `${namespace}:${key}`; +} + +function isRecentTimestamp(seenAt: number | undefined, ttlMs: number, now: number): boolean { + return seenAt != null && (ttlMs <= 0 || now - seenAt < ttlMs); +} + /** Create a dedupe helper that combines in-memory fast checks with a lock-protected disk store. */ export function createPersistentDedupe(options: PersistentDedupeOptions): PersistentDedupe { const ttlMs = Math.max(0, Math.floor(options.ttlMs)); @@ -134,6 +189,33 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis } } + async function hasRecentInner( + key: string, + namespace: string, + scopedKey: string, + now: number, + onDiskError?: (error: unknown) => void, + ): Promise { + if (memory.peek(scopedKey, now)) { + return true; + } + + const path = options.resolveFilePath(namespace); + try { + const { value } = await readJsonFileWithFallback(path, {}); + const data = sanitizeData(value); + const seenAt = data[key]; + if (!isRecentTimestamp(seenAt, ttlMs, now)) { + return false; + } + memory.check(scopedKey, seenAt); + return true; + } catch (error) { + onDiskError?.(error); + return memory.peek(scopedKey, now); + } + } + async function warmup(namespace = "global", onError?: (error: unknown) => void): Promise { const filePath = options.resolveFilePath(namespace); const now = Date.now(); @@ -164,8 +246,8 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis if (!trimmed) { return true; } - const namespace = dedupeOptions?.namespace?.trim() || "global"; - const scopedKey = `${namespace}:${trimmed}`; + const namespace = resolveNamespace(dedupeOptions?.namespace); + const scopedKey = resolveScopedKey(namespace, trimmed); if (inflight.has(scopedKey)) { return false; } @@ -181,10 +263,161 @@ export function createPersistentDedupe(options: PersistentDedupeOptions): Persis } } + async function hasRecent( + key: string, + dedupeOptions?: PersistentDedupeCheckOptions, + ): Promise { + const trimmed = key.trim(); + if (!trimmed) { + return false; + } + const namespace = resolveNamespace(dedupeOptions?.namespace); + const scopedKey = resolveScopedKey(namespace, trimmed); + const onDiskError = dedupeOptions?.onDiskError ?? options.onDiskError; + const now = dedupeOptions?.now ?? Date.now(); + return hasRecentInner(trimmed, namespace, scopedKey, now, onDiskError); + } + return { checkAndRecord, + hasRecent, warmup, clearMemory: () => memory.clear(), memorySize: () => memory.size(), }; } + +function createReleasedClaimError(scopedKey: string): Error { + return new Error(`claim released before commit: ${scopedKey}`); +} + +/** Create a claim/commit/release dedupe guard backed by memory and optional persistent storage. */ +export function createClaimableDedupe(options: ClaimableDedupeOptions): ClaimableDedupe { + const ttlMs = Math.max(0, Math.floor(options.ttlMs)); + const memoryMaxSize = Math.max(0, Math.floor(options.memoryMaxSize)); + const memory = createDedupeCache({ ttlMs, maxSize: memoryMaxSize }); + const persistent = + options.resolveFilePath != null + ? createPersistentDedupe({ + ttlMs, + memoryMaxSize, + fileMaxEntries: Math.max(1, Math.floor(options.fileMaxEntries)), + resolveFilePath: options.resolveFilePath, + lockOptions: options.lockOptions, + onDiskError: options.onDiskError, + }) + : null; + + const inflight = new Map< + string, + { + promise: Promise; + resolve: (result: boolean) => void; + reject: (error: unknown) => void; + } + >(); + + async function hasRecent( + key: string, + dedupeOptions?: PersistentDedupeCheckOptions, + ): Promise { + const trimmed = key.trim(); + if (!trimmed) { + return false; + } + const namespace = resolveNamespace(dedupeOptions?.namespace); + const scopedKey = resolveScopedKey(namespace, trimmed); + if (persistent) { + return persistent.hasRecent(trimmed, dedupeOptions); + } + return memory.peek(scopedKey, dedupeOptions?.now); + } + + async function claim( + key: string, + dedupeOptions?: PersistentDedupeCheckOptions, + ): Promise { + const trimmed = key.trim(); + if (!trimmed) { + return { kind: "claimed" }; + } + const namespace = resolveNamespace(dedupeOptions?.namespace); + const scopedKey = resolveScopedKey(namespace, trimmed); + const existing = inflight.get(scopedKey); + if (existing) { + return { kind: "inflight", pending: existing.promise }; + } + if (await hasRecent(trimmed, dedupeOptions)) { + return { kind: "duplicate" }; + } + + let resolve!: (result: boolean) => void; + let reject!: (error: unknown) => void; + const promise = new Promise((resolvePromise, rejectPromise) => { + resolve = resolvePromise; + reject = rejectPromise; + }); + void promise.catch(() => {}); + inflight.set(scopedKey, { promise, resolve, reject }); + return { kind: "claimed" }; + } + + async function commit( + key: string, + dedupeOptions?: PersistentDedupeCheckOptions, + ): Promise { + const trimmed = key.trim(); + if (!trimmed) { + return true; + } + const namespace = resolveNamespace(dedupeOptions?.namespace); + const scopedKey = resolveScopedKey(namespace, trimmed); + const claim = inflight.get(scopedKey); + try { + const recorded = persistent + ? await persistent.checkAndRecord(trimmed, dedupeOptions) + : !memory.check(scopedKey, dedupeOptions?.now); + claim?.resolve(recorded); + return recorded; + } catch (error) { + claim?.reject(error); + throw error; + } finally { + inflight.delete(scopedKey); + } + } + + function release( + key: string, + dedupeOptions?: { + namespace?: string; + error?: unknown; + }, + ): void { + const trimmed = key.trim(); + if (!trimmed) { + return; + } + const namespace = resolveNamespace(dedupeOptions?.namespace); + const scopedKey = resolveScopedKey(namespace, trimmed); + const claim = inflight.get(scopedKey); + if (!claim) { + return; + } + claim.reject(dedupeOptions?.error ?? createReleasedClaimError(scopedKey)); + inflight.delete(scopedKey); + } + + return { + claim, + commit, + release, + hasRecent, + warmup: persistent?.warmup ?? (async () => 0), + clearMemory: () => { + persistent?.clearMemory(); + memory.clear(); + }, + memorySize: () => persistent?.memorySize() ?? memory.size(), + }; +}