feat(plugin-state): add registerIfAbsent keyed store (#77135)

This commit is contained in:
Alex Knight
2026-05-04 18:20:04 +10:00
committed by GitHub
parent 071db2ca69
commit fcb396bf65
8 changed files with 283 additions and 34 deletions

View File

@@ -57,6 +57,7 @@ Docs: https://docs.openclaw.ai
- Exec approvals: add a tree-sitter-backed shell command explainer for future approval and command-review surfaces. (#75004) Thanks @jesse-merhi.
- Agents/sandbox: store sandbox container and browser registry entries as per-runtime shard files, reducing unrelated session lock contention while `openclaw doctor --fix` migrates legacy monolithic registry files. (#74831) Thanks @luckylhb90.
- Plugins/ClawHub: annotate 429 errors from ClawHub with the reset window from `RateLimit-Reset`/`Retry-After` and append a `Sign in for higher rate limits.` hint when the request was unauthenticated, so users can see when downloads will recover and how to lift the cap. Thanks @romneyda.
- Plugins/runtime state: add `registerIfAbsent` for atomic keyed-store dedupe claims that return whether a plugin successfully claimed a key without overwriting an existing live value. Thanks @amknight.
### Fixes

View File

@@ -417,12 +417,13 @@ Provider and channel execution paths must use the active runtime config snapshot
});
await store.register("key-1", { value: "hello" });
const claimed = await store.registerIfAbsent("dedupe-key", { value: "first" });
const value = await store.lookup("key-1");
await store.consume("key-1");
await store.clear();
```
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
<Warning>
Bundled plugins only in this release.

View File

@@ -31,6 +31,7 @@ describe("runtime smoke", () => {
});
expect(store).toBeDefined();
expect(typeof store.register).toBe("function");
expect(typeof store.registerIfAbsent).toBe("function");
expect(typeof store.lookup).toBe("function");
expect(typeof store.consume).toBe("function");
});

View File

@@ -65,7 +65,8 @@ describe("plugin runtime state proxy", () => {
namespace: "runtime",
maxEntries: 10,
});
await store.register("k", { plugin: "discord" });
await expect(store.registerIfAbsent("k", { plugin: "discord" })).resolves.toBe(true);
await expect(store.registerIfAbsent("k", { plugin: "duplicate" })).resolves.toBe(false);
const telegram = createPluginRecord("telegram", "bundled");
registry.registry.plugins.push(telegram);

View File

@@ -40,6 +40,7 @@ type UserVersionRow = {
type PluginStateStatements = {
upsertEntry: StatementSync;
insertEntryIfAbsent: StatementSync;
selectEntry: StatementSync;
selectEntries: StatementSync;
deleteEntry: StatementSync;
@@ -190,6 +191,23 @@ function createStatements(db: DatabaseSync): PluginStateStatements {
created_at = excluded.created_at,
expires_at = excluded.expires_at
`),
insertEntryIfAbsent: db.prepare(`
INSERT OR IGNORE INTO plugin_state_entries (
plugin_id,
namespace,
entry_key,
value_json,
created_at,
expires_at
) VALUES (
@plugin_id,
@namespace,
@entry_key,
@value_json,
@created_at,
@expires_at
)
`),
selectEntry: db.prepare(`
SELECT plugin_id, namespace, entry_key, value_json, created_at, expires_at
FROM plugin_state_entries
@@ -363,6 +381,44 @@ function runWriteTransaction<T>(
}
}
function enforcePostRegisterLimits(params: {
store: PluginStateDatabase;
pluginId: string;
namespace: string;
maxEntries: number;
now: number;
}): void {
const namespaceCount = countRow(
params.store.statements.countLiveNamespace.get(
params.pluginId,
params.namespace,
params.now,
) as CountRow | undefined,
);
if (namespaceCount > params.maxEntries) {
params.store.statements.deleteOldestNamespace.run(
params.pluginId,
params.namespace,
params.now,
namespaceCount - params.maxEntries,
);
}
const pluginCount = countRow(
params.store.statements.countLivePlugin.get(params.pluginId, params.now) as
| CountRow
| undefined,
);
if (pluginCount > MAX_ENTRIES_PER_PLUGIN) {
throw createPluginStateError({
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
operation: "register",
message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`,
path: params.store.path,
});
}
}
export function pluginStateRegister(params: {
pluginId: string;
namespace: string;
@@ -384,32 +440,56 @@ export function pluginStateRegister(params: {
created_at: now,
expires_at: expiresAt,
});
enforcePostRegisterLimits({
store,
pluginId: params.pluginId,
namespace: params.namespace,
maxEntries: params.maxEntries,
now,
});
});
} catch (error) {
throw wrapPluginStateError(
error,
"register",
"PLUGIN_STATE_WRITE_FAILED",
"Failed to register plugin state entry.",
);
}
}
const namespaceCount = countRow(
store.statements.countLiveNamespace.get(params.pluginId, params.namespace, now) as
| CountRow
| undefined,
);
if (namespaceCount > params.maxEntries) {
store.statements.deleteOldestNamespace.run(
params.pluginId,
params.namespace,
now,
namespaceCount - params.maxEntries,
);
}
const pluginCount = countRow(
store.statements.countLivePlugin.get(params.pluginId, now) as CountRow | undefined,
);
if (pluginCount > MAX_ENTRIES_PER_PLUGIN) {
throw createPluginStateError({
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
operation: "register",
message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`,
path: store.path,
});
export function pluginStateRegisterIfAbsent(params: {
pluginId: string;
namespace: string;
key: string;
valueJson: string;
maxEntries: number;
ttlMs?: number;
}): boolean {
try {
return runWriteTransaction("register", (store) => {
const now = Date.now();
const expiresAt = params.ttlMs == null ? null : now + params.ttlMs;
store.statements.pruneExpiredNamespace.run(params.pluginId, params.namespace, now);
const result = store.statements.insertEntryIfAbsent.run({
plugin_id: params.pluginId,
namespace: params.namespace,
entry_key: params.key,
value_json: params.valueJson,
created_at: now,
expires_at: expiresAt,
});
if (result.changes === 0) {
return false;
}
enforcePostRegisterLimits({
store,
pluginId: params.pluginId,
namespace: params.namespace,
maxEntries: params.maxEntries,
now,
});
return true;
});
} catch (error) {
throw wrapPluginStateError(

View File

@@ -58,6 +58,145 @@ describe("plugin state keyed store", () => {
});
});
it("registerIfAbsent inserts the first value and preserves live duplicates", async () => {
await withOpenClawTestState({ label: "plugin-state-register-if-absent-live" }, async () => {
vi.useFakeTimers();
const store = createPluginStateKeyedStore<{ version: number }>("discord", {
namespace: "claims",
maxEntries: 10,
});
vi.setSystemTime(1000);
await expect(store.registerIfAbsent("claim", { version: 1 }, { ttlMs: 1000 })).resolves.toBe(
true,
);
vi.setSystemTime(1200);
await expect(store.registerIfAbsent("claim", { version: 2 }, { ttlMs: 5000 })).resolves.toBe(
false,
);
await expect(store.lookup("claim")).resolves.toEqual({ version: 1 });
await expect(store.entries()).resolves.toMatchObject([
{ key: "claim", value: { version: 1 }, createdAt: 1000, expiresAt: 2000 },
]);
});
});
it("registerIfAbsent replaces expired keys", async () => {
await withOpenClawTestState({ label: "plugin-state-register-if-absent-expired" }, async () => {
vi.useFakeTimers();
const store = createPluginStateKeyedStore<{ version: number }>("discord", {
namespace: "claims-expired",
maxEntries: 10,
});
vi.setSystemTime(1000);
await expect(store.registerIfAbsent("claim", { version: 1 }, { ttlMs: 100 })).resolves.toBe(
true,
);
vi.setSystemTime(1200);
await expect(store.registerIfAbsent("claim", { version: 2 })).resolves.toBe(true);
await expect(store.lookup("claim")).resolves.toEqual({ version: 2 });
await expect(store.entries()).resolves.toMatchObject([
{ key: "claim", value: { version: 2 }, createdAt: 1200 },
]);
});
});
it("registerIfAbsent keeps plugin and namespace claims isolated", async () => {
await withOpenClawTestState(
{ label: "plugin-state-register-if-absent-isolation" },
async () => {
const discordA = createPluginStateKeyedStore<{ owner: string }>("discord", {
namespace: "claims-a",
maxEntries: 10,
});
const discordB = createPluginStateKeyedStore<{ owner: string }>("discord", {
namespace: "claims-b",
maxEntries: 10,
});
const telegramA = createPluginStateKeyedStore<{ owner: string }>("telegram", {
namespace: "claims-a",
maxEntries: 10,
});
await expect(discordA.registerIfAbsent("same", { owner: "discord-a" })).resolves.toBe(true);
await expect(discordB.registerIfAbsent("same", { owner: "discord-b" })).resolves.toBe(true);
await expect(telegramA.registerIfAbsent("same", { owner: "telegram-a" })).resolves.toBe(
true,
);
await expect(discordA.registerIfAbsent("same", { owner: "overwrite" })).resolves.toBe(
false,
);
await expect(discordA.lookup("same")).resolves.toEqual({ owner: "discord-a" });
await expect(discordB.lookup("same")).resolves.toEqual({ owner: "discord-b" });
await expect(telegramA.lookup("same")).resolves.toEqual({ owner: "telegram-a" });
},
);
});
it("registerIfAbsent only lets one parallel claimant win", async () => {
await withOpenClawTestState({ label: "plugin-state-register-if-absent-race" }, async () => {
const store = createPluginStateKeyedStore<{ claimant: number }>("discord", {
namespace: "claims-race",
maxEntries: 10,
});
const attempts = await Promise.all(
Array.from({ length: 25 }, async (_, claimant) =>
store.registerIfAbsent("claim", { claimant }),
),
);
expect(attempts.filter(Boolean)).toHaveLength(1);
const stored = await store.lookup("claim");
expect(stored).toBeDefined();
expect(attempts[stored?.claimant ?? -1]).toBe(true);
});
});
it("registerIfAbsent preserves eviction and plugin row cap behavior", async () => {
await withOpenClawTestState({ label: "plugin-state-register-if-absent-limits" }, async () => {
vi.useFakeTimers();
const evicting = createPluginStateKeyedStore<number>("discord", {
namespace: "claims-evict",
maxEntries: 2,
});
vi.setSystemTime(1000);
await evicting.registerIfAbsent("a", 1);
vi.setSystemTime(2000);
await evicting.registerIfAbsent("b", 2);
vi.setSystemTime(3000);
await evicting.registerIfAbsent("c", 3);
await expect(evicting.entries()).resolves.toMatchObject([{ key: "b" }, { key: "c" }]);
seedPluginStateEntriesForTests([
...Array.from({ length: 999 }, (_, entryIndex) => ({
pluginId: "limited-plugin",
namespace: "limit",
key: `k-${entryIndex}`,
value: { entryIndex },
})),
{
pluginId: "limited-plugin",
namespace: "sibling",
key: "k-0",
value: { sibling: true },
},
]);
const limited = createPluginStateKeyedStore("limited-plugin", {
namespace: "limit",
maxEntries: 1_001,
});
await expect(limited.registerIfAbsent("overflow", { overflow: true })).rejects.toMatchObject({
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
});
await expect(limited.lookup("overflow")).resolves.toBeUndefined();
});
});
it("returns undefined for missing lookups and consumes by deleting atomically", async () => {
await withOpenClawTestState({ label: "plugin-state-consume" }, async () => {
const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", {

View File

@@ -7,6 +7,7 @@ import {
pluginStateEntries,
pluginStateLookup,
pluginStateRegister,
pluginStateRegisterIfAbsent,
} from "./plugin-state-store.sqlite.js";
import type {
OpenKeyedStoreOptions,
@@ -217,20 +218,44 @@ function createKeyedStoreForPluginId<T>(
const defaultTtlMs = validateOptionalTtlMs(options.defaultTtlMs);
assertConsistentOptions(pluginId, namespace, { maxEntries, defaultTtlMs });
const prepareRegisterParams = (
key: string,
value: T,
opts?: { ttlMs?: number },
): { key: string; valueJson: string; ttlMs?: number } => {
const normalizedKey = validateKey(key, "register");
assertJsonSerializable(value);
const json = JSON.stringify(value);
assertValueSize(json);
const ttlMs = validateOptionalTtlMs(opts?.ttlMs, "register") ?? defaultTtlMs;
return {
key: normalizedKey,
valueJson: json,
...(ttlMs != null ? { ttlMs } : {}),
};
};
return {
async register(key, value, opts) {
const normalizedKey = validateKey(key, "register");
assertJsonSerializable(value);
const json = JSON.stringify(value);
assertValueSize(json);
const ttlMs = validateOptionalTtlMs(opts?.ttlMs, "register") ?? defaultTtlMs;
const params = prepareRegisterParams(key, value, opts);
pluginStateRegister({
pluginId,
namespace,
key: normalizedKey,
valueJson: json,
key: params.key,
valueJson: params.valueJson,
maxEntries,
...(ttlMs != null ? { ttlMs } : {}),
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
});
},
async registerIfAbsent(key, value, opts) {
const params = prepareRegisterParams(key, value, opts);
return pluginStateRegisterIfAbsent({
pluginId,
namespace,
key: params.key,
valueJson: params.valueJson,
maxEntries,
...(params.ttlMs != null ? { ttlMs: params.ttlMs } : {}),
});
},
async lookup(key) {

View File

@@ -7,6 +7,7 @@ export type PluginStateEntry<T> = {
export type PluginStateKeyedStore<T> = {
register(key: string, value: T, opts?: { ttlMs?: number }): Promise<void>;
registerIfAbsent(key: string, value: T, opts?: { ttlMs?: number }): Promise<boolean>;
lookup(key: string): Promise<T | undefined>;
consume(key: string): Promise<T | undefined>;
delete(key: string): Promise<boolean>;