From fcb396bf6589402e83ffbd96dd3d88c2f7f7a4f0 Mon Sep 17 00:00:00 2001 From: Alex Knight Date: Mon, 4 May 2026 18:20:04 +1000 Subject: [PATCH] feat(plugin-state): add registerIfAbsent keyed store (#77135) --- CHANGELOG.md | 1 + docs/plugins/sdk-runtime.md | 3 +- .../plugin-state-store.e2e.test.ts | 1 + .../plugin-state-store.runtime.test.ts | 3 +- src/plugin-state/plugin-state-store.sqlite.ts | 128 +++++++++++++--- src/plugin-state/plugin-state-store.test.ts | 139 ++++++++++++++++++ src/plugin-state/plugin-state-store.ts | 41 +++++- src/plugin-state/plugin-state-store.types.ts | 1 + 8 files changed, 283 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 218e7eb92b6..fe843cb3db6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ Docs: https://docs.openclaw.ai - Exec approvals: add a tree-sitter-backed shell command explainer for future approval and command-review surfaces. (#75004) Thanks @jesse-merhi. - Agents/sandbox: store sandbox container and browser registry entries as per-runtime shard files, reducing unrelated session lock contention while `openclaw doctor --fix` migrates legacy monolithic registry files. (#74831) Thanks @luckylhb90. - Plugins/ClawHub: annotate 429 errors from ClawHub with the reset window from `RateLimit-Reset`/`Retry-After` and append a `Sign in for higher rate limits.` hint when the request was unauthenticated, so users can see when downloads will recover and how to lift the cap. Thanks @romneyda. +- Plugins/runtime state: add `registerIfAbsent` for atomic keyed-store dedupe claims that return whether a plugin successfully claimed a key without overwriting an existing live value. Thanks @amknight. ### Fixes diff --git a/docs/plugins/sdk-runtime.md b/docs/plugins/sdk-runtime.md index e0dee0e8824..bd4835df097 100644 --- a/docs/plugins/sdk-runtime.md +++ b/docs/plugins/sdk-runtime.md @@ -417,12 +417,13 @@ Provider and channel execution paths must use the active runtime config snapshot }); await store.register("key-1", { value: "hello" }); + const claimed = await store.registerIfAbsent("dedupe-key", { value: "first" }); const value = await store.lookup("key-1"); await store.consume("key-1"); await store.clear(); ``` - Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry. + Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry. Bundled plugins only in this release. diff --git a/src/plugin-state/plugin-state-store.e2e.test.ts b/src/plugin-state/plugin-state-store.e2e.test.ts index 95bc5ffa593..d6ab66a72de 100644 --- a/src/plugin-state/plugin-state-store.e2e.test.ts +++ b/src/plugin-state/plugin-state-store.e2e.test.ts @@ -31,6 +31,7 @@ describe("runtime smoke", () => { }); expect(store).toBeDefined(); expect(typeof store.register).toBe("function"); + expect(typeof store.registerIfAbsent).toBe("function"); expect(typeof store.lookup).toBe("function"); expect(typeof store.consume).toBe("function"); }); diff --git a/src/plugin-state/plugin-state-store.runtime.test.ts b/src/plugin-state/plugin-state-store.runtime.test.ts index 851cf6bf0ac..b44e19e36c2 100644 --- a/src/plugin-state/plugin-state-store.runtime.test.ts +++ b/src/plugin-state/plugin-state-store.runtime.test.ts @@ -65,7 +65,8 @@ describe("plugin runtime state proxy", () => { namespace: "runtime", maxEntries: 10, }); - await store.register("k", { plugin: "discord" }); + await expect(store.registerIfAbsent("k", { plugin: "discord" })).resolves.toBe(true); + await expect(store.registerIfAbsent("k", { plugin: "duplicate" })).resolves.toBe(false); const telegram = createPluginRecord("telegram", "bundled"); registry.registry.plugins.push(telegram); diff --git a/src/plugin-state/plugin-state-store.sqlite.ts b/src/plugin-state/plugin-state-store.sqlite.ts index ab5e98b77a9..38125dceb7e 100644 --- a/src/plugin-state/plugin-state-store.sqlite.ts +++ b/src/plugin-state/plugin-state-store.sqlite.ts @@ -40,6 +40,7 @@ type UserVersionRow = { type PluginStateStatements = { upsertEntry: StatementSync; + insertEntryIfAbsent: StatementSync; selectEntry: StatementSync; selectEntries: StatementSync; deleteEntry: StatementSync; @@ -190,6 +191,23 @@ function createStatements(db: DatabaseSync): PluginStateStatements { created_at = excluded.created_at, expires_at = excluded.expires_at `), + insertEntryIfAbsent: db.prepare(` + INSERT OR IGNORE INTO plugin_state_entries ( + plugin_id, + namespace, + entry_key, + value_json, + created_at, + expires_at + ) VALUES ( + @plugin_id, + @namespace, + @entry_key, + @value_json, + @created_at, + @expires_at + ) + `), selectEntry: db.prepare(` SELECT plugin_id, namespace, entry_key, value_json, created_at, expires_at FROM plugin_state_entries @@ -363,6 +381,44 @@ function runWriteTransaction( } } +function enforcePostRegisterLimits(params: { + store: PluginStateDatabase; + pluginId: string; + namespace: string; + maxEntries: number; + now: number; +}): void { + const namespaceCount = countRow( + params.store.statements.countLiveNamespace.get( + params.pluginId, + params.namespace, + params.now, + ) as CountRow | undefined, + ); + if (namespaceCount > params.maxEntries) { + params.store.statements.deleteOldestNamespace.run( + params.pluginId, + params.namespace, + params.now, + namespaceCount - params.maxEntries, + ); + } + + const pluginCount = countRow( + params.store.statements.countLivePlugin.get(params.pluginId, params.now) as + | CountRow + | undefined, + ); + if (pluginCount > MAX_ENTRIES_PER_PLUGIN) { + throw createPluginStateError({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + operation: "register", + message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`, + path: params.store.path, + }); + } +} + export function pluginStateRegister(params: { pluginId: string; namespace: string; @@ -384,32 +440,56 @@ export function pluginStateRegister(params: { created_at: now, expires_at: expiresAt, }); + enforcePostRegisterLimits({ + store, + pluginId: params.pluginId, + namespace: params.namespace, + maxEntries: params.maxEntries, + now, + }); + }); + } catch (error) { + throw wrapPluginStateError( + error, + "register", + "PLUGIN_STATE_WRITE_FAILED", + "Failed to register plugin state entry.", + ); + } +} - const namespaceCount = countRow( - store.statements.countLiveNamespace.get(params.pluginId, params.namespace, now) as - | CountRow - | undefined, - ); - if (namespaceCount > params.maxEntries) { - store.statements.deleteOldestNamespace.run( - params.pluginId, - params.namespace, - now, - namespaceCount - params.maxEntries, - ); - } - - const pluginCount = countRow( - store.statements.countLivePlugin.get(params.pluginId, now) as CountRow | undefined, - ); - if (pluginCount > MAX_ENTRIES_PER_PLUGIN) { - throw createPluginStateError({ - code: "PLUGIN_STATE_LIMIT_EXCEEDED", - operation: "register", - message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`, - path: store.path, - }); +export function pluginStateRegisterIfAbsent(params: { + pluginId: string; + namespace: string; + key: string; + valueJson: string; + maxEntries: number; + ttlMs?: number; +}): boolean { + try { + return runWriteTransaction("register", (store) => { + const now = Date.now(); + const expiresAt = params.ttlMs == null ? null : now + params.ttlMs; + store.statements.pruneExpiredNamespace.run(params.pluginId, params.namespace, now); + const result = store.statements.insertEntryIfAbsent.run({ + plugin_id: params.pluginId, + namespace: params.namespace, + entry_key: params.key, + value_json: params.valueJson, + created_at: now, + expires_at: expiresAt, + }); + if (result.changes === 0) { + return false; } + enforcePostRegisterLimits({ + store, + pluginId: params.pluginId, + namespace: params.namespace, + maxEntries: params.maxEntries, + now, + }); + return true; }); } catch (error) { throw wrapPluginStateError( diff --git a/src/plugin-state/plugin-state-store.test.ts b/src/plugin-state/plugin-state-store.test.ts index 3babcb9626d..94521be7af1 100644 --- a/src/plugin-state/plugin-state-store.test.ts +++ b/src/plugin-state/plugin-state-store.test.ts @@ -58,6 +58,145 @@ describe("plugin state keyed store", () => { }); }); + it("registerIfAbsent inserts the first value and preserves live duplicates", async () => { + await withOpenClawTestState({ label: "plugin-state-register-if-absent-live" }, async () => { + vi.useFakeTimers(); + const store = createPluginStateKeyedStore<{ version: number }>("discord", { + namespace: "claims", + maxEntries: 10, + }); + + vi.setSystemTime(1000); + await expect(store.registerIfAbsent("claim", { version: 1 }, { ttlMs: 1000 })).resolves.toBe( + true, + ); + vi.setSystemTime(1200); + await expect(store.registerIfAbsent("claim", { version: 2 }, { ttlMs: 5000 })).resolves.toBe( + false, + ); + + await expect(store.lookup("claim")).resolves.toEqual({ version: 1 }); + await expect(store.entries()).resolves.toMatchObject([ + { key: "claim", value: { version: 1 }, createdAt: 1000, expiresAt: 2000 }, + ]); + }); + }); + + it("registerIfAbsent replaces expired keys", async () => { + await withOpenClawTestState({ label: "plugin-state-register-if-absent-expired" }, async () => { + vi.useFakeTimers(); + const store = createPluginStateKeyedStore<{ version: number }>("discord", { + namespace: "claims-expired", + maxEntries: 10, + }); + + vi.setSystemTime(1000); + await expect(store.registerIfAbsent("claim", { version: 1 }, { ttlMs: 100 })).resolves.toBe( + true, + ); + vi.setSystemTime(1200); + await expect(store.registerIfAbsent("claim", { version: 2 })).resolves.toBe(true); + + await expect(store.lookup("claim")).resolves.toEqual({ version: 2 }); + await expect(store.entries()).resolves.toMatchObject([ + { key: "claim", value: { version: 2 }, createdAt: 1200 }, + ]); + }); + }); + + it("registerIfAbsent keeps plugin and namespace claims isolated", async () => { + await withOpenClawTestState( + { label: "plugin-state-register-if-absent-isolation" }, + async () => { + const discordA = createPluginStateKeyedStore<{ owner: string }>("discord", { + namespace: "claims-a", + maxEntries: 10, + }); + const discordB = createPluginStateKeyedStore<{ owner: string }>("discord", { + namespace: "claims-b", + maxEntries: 10, + }); + const telegramA = createPluginStateKeyedStore<{ owner: string }>("telegram", { + namespace: "claims-a", + maxEntries: 10, + }); + + await expect(discordA.registerIfAbsent("same", { owner: "discord-a" })).resolves.toBe(true); + await expect(discordB.registerIfAbsent("same", { owner: "discord-b" })).resolves.toBe(true); + await expect(telegramA.registerIfAbsent("same", { owner: "telegram-a" })).resolves.toBe( + true, + ); + await expect(discordA.registerIfAbsent("same", { owner: "overwrite" })).resolves.toBe( + false, + ); + + await expect(discordA.lookup("same")).resolves.toEqual({ owner: "discord-a" }); + await expect(discordB.lookup("same")).resolves.toEqual({ owner: "discord-b" }); + await expect(telegramA.lookup("same")).resolves.toEqual({ owner: "telegram-a" }); + }, + ); + }); + + it("registerIfAbsent only lets one parallel claimant win", async () => { + await withOpenClawTestState({ label: "plugin-state-register-if-absent-race" }, async () => { + const store = createPluginStateKeyedStore<{ claimant: number }>("discord", { + namespace: "claims-race", + maxEntries: 10, + }); + + const attempts = await Promise.all( + Array.from({ length: 25 }, async (_, claimant) => + store.registerIfAbsent("claim", { claimant }), + ), + ); + + expect(attempts.filter(Boolean)).toHaveLength(1); + const stored = await store.lookup("claim"); + expect(stored).toBeDefined(); + expect(attempts[stored?.claimant ?? -1]).toBe(true); + }); + }); + + it("registerIfAbsent preserves eviction and plugin row cap behavior", async () => { + await withOpenClawTestState({ label: "plugin-state-register-if-absent-limits" }, async () => { + vi.useFakeTimers(); + const evicting = createPluginStateKeyedStore("discord", { + namespace: "claims-evict", + maxEntries: 2, + }); + vi.setSystemTime(1000); + await evicting.registerIfAbsent("a", 1); + vi.setSystemTime(2000); + await evicting.registerIfAbsent("b", 2); + vi.setSystemTime(3000); + await evicting.registerIfAbsent("c", 3); + await expect(evicting.entries()).resolves.toMatchObject([{ key: "b" }, { key: "c" }]); + + seedPluginStateEntriesForTests([ + ...Array.from({ length: 999 }, (_, entryIndex) => ({ + pluginId: "limited-plugin", + namespace: "limit", + key: `k-${entryIndex}`, + value: { entryIndex }, + })), + { + pluginId: "limited-plugin", + namespace: "sibling", + key: "k-0", + value: { sibling: true }, + }, + ]); + const limited = createPluginStateKeyedStore("limited-plugin", { + namespace: "limit", + maxEntries: 1_001, + }); + await expect(limited.registerIfAbsent("overflow", { overflow: true })).rejects.toMatchObject({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + }); + await expect(limited.lookup("overflow")).resolves.toBeUndefined(); + }); + }); + it("returns undefined for missing lookups and consumes by deleting atomically", async () => { await withOpenClawTestState({ label: "plugin-state-consume" }, async () => { const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", { diff --git a/src/plugin-state/plugin-state-store.ts b/src/plugin-state/plugin-state-store.ts index b8002581f06..5ae358cedb9 100644 --- a/src/plugin-state/plugin-state-store.ts +++ b/src/plugin-state/plugin-state-store.ts @@ -7,6 +7,7 @@ import { pluginStateEntries, pluginStateLookup, pluginStateRegister, + pluginStateRegisterIfAbsent, } from "./plugin-state-store.sqlite.js"; import type { OpenKeyedStoreOptions, @@ -217,20 +218,44 @@ function createKeyedStoreForPluginId( const defaultTtlMs = validateOptionalTtlMs(options.defaultTtlMs); assertConsistentOptions(pluginId, namespace, { maxEntries, defaultTtlMs }); + const prepareRegisterParams = ( + key: string, + value: T, + opts?: { ttlMs?: number }, + ): { key: string; valueJson: string; ttlMs?: number } => { + const normalizedKey = validateKey(key, "register"); + assertJsonSerializable(value); + const json = JSON.stringify(value); + assertValueSize(json); + const ttlMs = validateOptionalTtlMs(opts?.ttlMs, "register") ?? defaultTtlMs; + return { + key: normalizedKey, + valueJson: json, + ...(ttlMs != null ? { ttlMs } : {}), + }; + }; + return { async register(key, value, opts) { - const normalizedKey = validateKey(key, "register"); - assertJsonSerializable(value); - const json = JSON.stringify(value); - assertValueSize(json); - const ttlMs = validateOptionalTtlMs(opts?.ttlMs, "register") ?? defaultTtlMs; + const params = prepareRegisterParams(key, value, opts); pluginStateRegister({ pluginId, namespace, - key: normalizedKey, - valueJson: json, + key: params.key, + valueJson: params.valueJson, maxEntries, - ...(ttlMs != null ? { ttlMs } : {}), + ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), + }); + }, + async registerIfAbsent(key, value, opts) { + const params = prepareRegisterParams(key, value, opts); + return pluginStateRegisterIfAbsent({ + pluginId, + namespace, + key: params.key, + valueJson: params.valueJson, + maxEntries, + ...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}), }); }, async lookup(key) { diff --git a/src/plugin-state/plugin-state-store.types.ts b/src/plugin-state/plugin-state-store.types.ts index 8cb17bffe62..9c07e95a7b3 100644 --- a/src/plugin-state/plugin-state-store.types.ts +++ b/src/plugin-state/plugin-state-store.types.ts @@ -7,6 +7,7 @@ export type PluginStateEntry = { export type PluginStateKeyedStore = { register(key: string, value: T, opts?: { ttlMs?: number }): Promise; + registerIfAbsent(key: string, value: T, opts?: { ttlMs?: number }): Promise; lookup(key: string): Promise; consume(key: string): Promise; delete(key: string): Promise;