fix(plugin-state): evict current namespace on plugin row cap

Make plugin-state enforce the plugin-wide live-row fuse by evicting only from the namespace currently being written, preserving sibling namespace rows and still failing atomically when the current namespace cannot free enough rows.

Raise the plugin-wide cap to 6,000 rows, keep Telegram's persistent message-cache namespace at 3,000 entries, and document the updated SDK runtime contract. Harden legacy plugin-state import so capacity pressure cannot archive a source after losing imported keys, with focused regression coverage for Telegram-shaped namespaces and migration rollback.

Also restore the Docker runtime-assets preflight step in full release validation so release workflow contract tests stay aligned.

Verification: focused plugin-state, migration, Telegram, workflow-contract, lint, deprecated-API, diff-check, Blacksmith Testbox, CI, CodeQL, Workflow Sanity, OpenGrep, and autoreview all passed on PR head fee021cfa6.

Co-authored-by: Keshav's Bot <keshavbotagent@gmail.com>
This commit is contained in:
keshavbotagent
2026-05-28 01:03:40 +05:30
committed by GitHub
parent 90f30075aa
commit e339586750
10 changed files with 291 additions and 40 deletions

View File

@@ -240,6 +240,16 @@ jobs:
fetch-depth: 1
persist-credentials: true
- name: Verify Docker runtime-assets prune path
env:
DOCKER_BUILDKIT: "1"
run: |
set -euo pipefail
timeout --kill-after=30s 35m docker build \
--target runtime-assets \
--build-arg OPENCLAW_EXTENSIONS="diagnostics-otel,codex" \
.
- name: Build and smoke test final Docker runtime image
env:
DOCKER_BUILDKIT: "1"

View File

@@ -524,7 +524,7 @@ two-party event loops that do not go through the shared inbound reply runner.
await store.clear();
```
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry.
Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Use `registerIfAbsent(...)` for atomic dedupe claims: it returns `true` when the key was missing or expired and registered, or `false` when a live value already exists without overwriting its value, creation time, or TTL. Limits: `maxEntries` per namespace, 6,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry. When a write would exceed the plugin row cap, the runtime may evict the oldest live rows from the namespace being written; sibling namespaces are not evicted for that write, and the write still fails if the namespace cannot free enough rows.
<Warning>
Bundled plugins only in this release.

View File

@@ -7,6 +7,7 @@ import {
createTelegramMessageCache,
resetTelegramMessageCacheBucketsForTest,
resolveTelegramMessageCachePath,
TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES,
type TelegramMessageCachePersistentStore,
} from "./message-cache.js";
@@ -24,7 +25,7 @@ type PersistedCacheValue = {
let persistentStoreId = 0;
function createMemoryPersistentStore(maxEntries = 1000): {
function createMemoryPersistentStore(maxEntries = TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES): {
bucketKey: string;
entries: Map<string, PersistedCacheValue>;
store: TelegramMessageCachePersistentStore;

View File

@@ -82,7 +82,7 @@ type TelegramCachedMessageObservation = {
type TelegramEmbeddedReplyMessage = NonNullable<Message["reply_to_message"]>;
const DEFAULT_MAX_MESSAGES = 5000;
export const TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES = 1000;
export const TELEGRAM_MESSAGE_CACHE_PERSISTENT_MAX_MESSAGES = 3000;
export const TELEGRAM_MESSAGE_CACHE_PERSISTENT_NAMESPACE = "telegram.message-cache";
const PERSISTENT_BUCKET_KEY = `plugin-state:${TELEGRAM_MESSAGE_CACHE_PERSISTENT_NAMESPACE}`;
const COMPACT_THRESHOLD_RATIO = 2;

View File

@@ -7,6 +7,7 @@ import {
createPluginStateKeyedStore,
resetPluginStateStoreForTests,
} from "../plugin-state/plugin-state-store.js";
import { seedPluginStateEntriesForTests } from "../plugin-state/plugin-state-store.test-helpers.js";
import {
autoMigrateLegacyStateDir,
autoMigrateLegacyState,
@@ -711,6 +712,69 @@ describe("doctor legacy state migrations", () => {
});
});
it("keeps plugin-state import source when plugin cap eviction drops an imported row", async () => {
const root = await makeTempRoot();
const sourcePath = path.join(root, "legacy-cache.json");
fs.writeFileSync(sourcePath, "legacy", "utf-8");
mockedChannelMigrationPlans.plans = [
{
kind: "plugin-state-import",
label: "Test capped cache",
sourcePath,
targetPath: "plugin state:test.capped-cache",
pluginId: "telegram",
namespace: "test.capped-cache",
maxEntries: 6_000,
scopeKey: "scope",
cleanupSource: "rename",
readEntries: () => [
{ key: "first", value: { body: "first" } },
{ key: "second", value: { body: "second" } },
],
},
];
await withStateDir(root, async () => {
seedPluginStateEntriesForTests(
Array.from({ length: 5_999 }, (_, index) => ({
pluginId: "telegram",
namespace: "test.sibling-cache",
key: `sibling-${index}`,
value: { body: "sibling" },
})),
);
});
resetPluginStateStoreForTests();
const detected = await detectLegacyStateMigrations({
cfg: {},
env: { OPENCLAW_STATE_DIR: root } as NodeJS.ProcessEnv,
});
const result = await runLegacyStateMigrations({ detected });
expect(result.warnings).toStrictEqual([
"Skipped migrating Test capped cache because plugin state has room for 1 of 2 missing entries; left legacy source in place",
]);
expect(result.changes).not.toContain("Migrated 2 Test capped cache entries → plugin state");
expect(result.changes).not.toContain(
`Archived Test capped cache legacy source → ${sourcePath}.migrated`,
);
expect(fs.existsSync(sourcePath)).toBe(true);
expect(fs.existsSync(`${sourcePath}.migrated`)).toBe(false);
await withStateDir(root, async () => {
const store = createPluginStateKeyedStore<{ body: string }>("telegram", {
namespace: "test.capped-cache",
maxEntries: 6_000,
});
const valuesByKey = new Map(
(await store.entries()).map(({ key, value }) => [key, value.body]),
);
expect(valuesByKey.has("scope:first")).toBe(false);
expect(valuesByKey.has("scope:second")).toBe(false);
});
});
it("routes legacy state to the default agent entry", async () => {
const root = await makeTempRoot();
const cfg: OpenClawConfig = {

View File

@@ -19,7 +19,11 @@ import { canonicalizeMainSessionAlias } from "../config/sessions/main-session.js
import type { SessionScope } from "../config/sessions/types.js";
import type { OpenClawConfig } from "../config/types.openclaw.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { createPluginStateKeyedStore } from "../plugin-state/plugin-state-store.js";
import {
countPluginStateLiveEntries,
createPluginStateKeyedStore,
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
} from "../plugin-state/plugin-state-store.js";
import {
buildAgentMainSessionKey,
DEFAULT_AGENT_ID,
@@ -127,6 +131,15 @@ function resolvePluginStateImportTargetKey(scopeKey: string, key: string): strin
return scopeKey ? `${scopeKey}:${key}` : key;
}
function findMissingKey(expected: Set<string>, actual: Set<string>): string | undefined {
for (const key of expected) {
if (!actual.has(key)) {
return key;
}
}
return undefined;
}
async function withPluginStateImportEnv<T>(
plan: Extract<ChannelLegacyStateMigrationPlan, { kind: "plugin-state-import" }>,
run: () => Promise<T>,
@@ -155,13 +168,15 @@ async function runLegacyMigrationPlans(
for (const plan of plans) {
if (plan.kind === "plugin-state-import") {
await withPluginStateImportEnv(plan, async () => {
let storeEntries: Array<{ key: string }> = [];
let storeEntries: Array<{ key: string; value: unknown }> = [];
let pluginEntryCount = 0;
const store = createPluginStateKeyedStore<unknown>(plan.pluginId, {
namespace: plan.namespace,
maxEntries: plan.maxEntries,
});
try {
storeEntries = await store.entries();
pluginEntryCount = countPluginStateLiveEntries(plan.pluginId);
} catch (err) {
warnings.push(
`Failed reading ${plan.label} plugin state before migration: ${String(err)}`,
@@ -169,9 +184,25 @@ async function runLegacyMigrationPlans(
return;
}
const existingKeys = new Set(storeEntries.map(({ key }) => key));
const existingValuesByKey = new Map(storeEntries.map(({ key, value }) => [key, value]));
const expectedKeys = new Set(existingKeys);
let remainingCapacity = Math.max(0, plan.maxEntries - storeEntries.length);
const entries = await plan.readEntries();
const missingEntries = entries.filter(
({ key }) => !existingKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
);
const pluginRemainingCapacity = Math.max(
0,
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN - pluginEntryCount,
);
if (missingEntries.length > pluginRemainingCapacity) {
warnings.push(
`Skipped migrating ${plan.label} because plugin state has room for ${pluginRemainingCapacity} of ${missingEntries.length} missing entries; left legacy source in place`,
);
return;
}
let imported = 0;
const importedKeys: string[] = [];
for (const entry of entries) {
const targetKey = resolvePluginStateImportTargetKey(plan.scopeKey, entry.key);
if (existingKeys.has(targetKey)) {
@@ -182,7 +213,26 @@ async function runLegacyMigrationPlans(
}
try {
await store.register(targetKey, entry.value);
const nextExpectedKeys = new Set(expectedKeys);
nextExpectedKeys.add(targetKey);
const liveKeys = new Set((await store.entries()).map(({ key }) => key));
const missingKey = findMissingKey(nextExpectedKeys, liveKeys);
if (missingKey) {
for (const importedKey of importedKeys.toReversed()) {
await store.delete(importedKey);
}
await store.delete(targetKey);
if (existingValuesByKey.has(missingKey)) {
await store.register(missingKey, existingValuesByKey.get(missingKey));
}
warnings.push(
`Stopped migrating ${plan.label} because plugin state cap evicted ${missingKey}; left legacy source in place`,
);
return;
}
expectedKeys.add(targetKey);
existingKeys.add(targetKey);
importedKeys.push(targetKey);
remainingCapacity--;
imported++;
} catch (err) {
@@ -194,10 +244,14 @@ async function runLegacyMigrationPlans(
`Migrated ${imported} ${plan.label} ${imported === 1 ? "entry" : "entries"} → plugin state`,
);
}
let cleanupKeys = existingKeys;
if (plan.cleanupSource === "rename") {
cleanupKeys = expectedKeys;
}
const allEntriesCovered =
entries.length > 0 &&
entries.every(({ key }) =>
existingKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
cleanupKeys.has(resolvePluginStateImportTargetKey(plan.scopeKey, key)),
);
if (allEntriesCovered && plan.cleanupSource === "rename" && fileExists(plan.sourcePath)) {
const archivedPath = `${plan.sourcePath}.migrated`;

View File

@@ -196,10 +196,9 @@ describe("limits", () => {
it("enforces the per-plugin live-row cap", async () => {
await withOpenClawTestState({ label: "e2e-limit-plugin" }, async () => {
// Spread MAX_ENTRIES_PER_PLUGIN rows across several namespaces so
// namespace eviction never fires (each namespace has generous room).
// Fill the plugin budget outside the namespace that attempts the write.
const nsCount = 10;
const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100
const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount;
seedPluginStateEntriesForTests(
Array.from({ length: MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN }, (_, index) => {
const ns = Math.floor(index / perNs);
@@ -213,8 +212,8 @@ describe("limits", () => {
}),
);
const store = createPluginStateKeyedStore("fixture-plugin", {
namespace: "ns-0",
maxEntries: perNs + 1,
namespace: "overflow-ns",
maxEntries: 10,
});
// One more row tips over the plugin-wide limit.

View File

@@ -16,7 +16,8 @@ const PLUGIN_STATE_SCHEMA_VERSION = 1;
const PLUGIN_STATE_DIR_MODE = 0o700;
const PLUGIN_STATE_FILE_MODE = 0o600;
const PLUGIN_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const;
const MAX_ENTRIES_PER_PLUGIN = 1_000;
// Plugin-wide fuse only; namespace maxEntries still owns normal cache eviction.
const MAX_ENTRIES_PER_PLUGIN = 6_000;
export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536;
export const MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = MAX_ENTRIES_PER_PLUGIN;
@@ -421,7 +422,24 @@ function enforcePostRegisterLimits(params: {
| CountRow
| undefined,
);
if (pluginCount > MAX_ENTRIES_PER_PLUGIN) {
if (pluginCount <= MAX_ENTRIES_PER_PLUGIN) {
return;
}
// Shed rows from the namespace that grew before failing the plugin write.
params.store.statements.deleteOldestNamespace.run(
params.pluginId,
params.namespace,
params.protectedKey,
params.now,
pluginCount - MAX_ENTRIES_PER_PLUGIN,
);
const remainingPluginCount = countRow(
params.store.statements.countLivePlugin.get(params.pluginId, params.now) as
| CountRow
| undefined,
);
if (remainingPluginCount > MAX_ENTRIES_PER_PLUGIN) {
throw createPluginStateError({
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
operation: "register",
@@ -609,6 +627,20 @@ export function pluginStateEntries(params: {
}
}
export function countPluginStateLiveEntries(pluginId: string): number {
try {
const { statements } = openPluginStateDatabase("entries");
return countRow(statements.countLivePlugin.get(pluginId, Date.now()) as CountRow | undefined);
} catch (error) {
throw wrapPluginStateError(
error,
"entries",
"PLUGIN_STATE_READ_FAILED",
"Failed to count plugin state entries.",
);
}
}
export function pluginStateClear(params: { pluginId: string; namespace: string }): void {
try {
const { statements } = openPluginStateDatabase("clear");

View File

@@ -210,7 +210,7 @@ describe("plugin state keyed store", () => {
expect((await evicting.entries()).map((entry) => entry.key)).toEqual(["b", "c"]);
seedPluginStateEntriesForTests([
...Array.from({ length: 999 }, (_, entryIndex) => ({
...Array.from({ length: 5_999 }, (_, entryIndex) => ({
pluginId: "limited-plugin",
namespace: "limit",
key: `k-${entryIndex}`,
@@ -225,12 +225,16 @@ describe("plugin state keyed store", () => {
]);
const limited = createPluginStateKeyedStore("limited-plugin", {
namespace: "limit",
maxEntries: 1_001,
maxEntries: 6_001,
});
await expectPluginStateStoreError(limited.registerIfAbsent("overflow", { overflow: true }), {
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
const sibling = createPluginStateKeyedStore("limited-plugin", {
namespace: "sibling",
maxEntries: 10,
});
await expect(limited.lookup("overflow")).resolves.toBeUndefined();
await expect(limited.registerIfAbsent("overflow", { overflow: true })).resolves.toBe(true);
await expect(limited.lookup("k-0")).resolves.toBeUndefined();
await expect(limited.lookup("overflow")).resolves.toEqual({ overflow: true });
await expect(sibling.lookup("k-0")).resolves.toEqual({ sibling: true });
});
});
@@ -335,40 +339,125 @@ describe("plugin state keyed store", () => {
});
});
it("rejects when the per-plugin live row ceiling would be exceeded without evicting siblings", async () => {
it("evicts current namespace rows when sibling namespaces consume plugin row budget", async () => {
await withPluginStateTestState(async () => {
seedPluginStateEntriesForTests([
...Array.from({ length: 999 }, (_, entryIndex) => ({
pluginId: "discord",
namespace: "limit",
...Array.from({ length: 5_989 }, (_, entryIndex) => ({
pluginId: "telegram",
namespace: "telegram.message-cache",
key: `k-${entryIndex}`,
value: { namespaceIndex: 0, entryIndex },
value: { kind: "message", entryIndex },
})),
...Array.from({ length: 11 }, (_, entryIndex) => ({
pluginId: "telegram",
namespace: "telegram.topic-name-cache",
key: `topic-${entryIndex}`,
value: { kind: "topic", entryIndex },
})),
{
pluginId: "discord",
namespace: "sibling",
key: "k-0",
value: { namespaceIndex: 1, entryIndex: 0 },
},
]);
const limitStore = createPluginStateKeyedStore("discord", {
namespace: "limit",
maxEntries: 1_001,
const messageStore = createPluginStateKeyedStore("telegram", {
namespace: "telegram.message-cache",
maxEntries: 6_000,
});
const siblingStore = createPluginStateKeyedStore("discord", {
namespace: "sibling",
maxEntries: 10,
const topicStore = createPluginStateKeyedStore("telegram", {
namespace: "telegram.topic-name-cache",
maxEntries: 100,
});
await expectPluginStateStoreError(limitStore.register("overflow", { overflow: true }), {
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
await expect(
messageStore.register("new-message", { kind: "message", fresh: true }),
).resolves.toBeUndefined();
await expect(messageStore.lookup("k-0")).resolves.toBeUndefined();
await expect(messageStore.lookup("new-message")).resolves.toEqual({
kind: "message",
fresh: true,
});
await expect(siblingStore.lookup("k-0")).resolves.toEqual({
namespaceIndex: 1,
await expect(topicStore.lookup("topic-0")).resolves.toEqual({
kind: "topic",
entryIndex: 0,
});
await expect(limitStore.lookup("overflow")).resolves.toBeUndefined();
await expect(messageStore.entries()).resolves.toHaveLength(5_989);
await expect(topicStore.entries()).resolves.toHaveLength(11);
});
});
it("leaves room for Telegram sibling namespaces at their persistent budgets", async () => {
await withPluginStateTestState(async () => {
seedPluginStateEntriesForTests([
...Array.from({ length: 3_000 }, (_, entryIndex) => ({
pluginId: "telegram",
namespace: "telegram.message-cache",
key: `message-${entryIndex}`,
value: { kind: "message", entryIndex },
})),
...Array.from({ length: 2_047 }, (_, entryIndex) => ({
pluginId: "telegram",
namespace: "telegram.topic-name-cache",
key: `topic-${entryIndex}`,
value: { kind: "topic", updatedAt: entryIndex },
})),
...Array.from({ length: 127 }, (_, entryIndex) => ({
pluginId: "telegram",
namespace: "telegram.bot-info-cache",
key: `bot-${entryIndex}`,
value: { kind: "bot-info", fetchedAt: String(entryIndex) },
})),
]);
const topicStore = createPluginStateKeyedStore("telegram", {
namespace: "telegram.topic-name-cache",
maxEntries: 2_048,
});
const botInfoStore = createPluginStateKeyedStore("telegram", {
namespace: "telegram.bot-info-cache",
maxEntries: 128,
});
await expect(
topicStore.register("topic-final", { kind: "topic", updatedAt: 2_048 }),
).resolves.toBeUndefined();
await expect(
botInfoStore.register("default", { kind: "bot-info", fetchedAt: "now" }),
).resolves.toBeUndefined();
await expect(topicStore.lookup("topic-final")).resolves.toEqual({
kind: "topic",
updatedAt: 2_048,
});
await expect(botInfoStore.lookup("default")).resolves.toEqual({
kind: "bot-info",
fetchedAt: "now",
});
});
});
it("rejects plugin overflow when the current namespace cannot shed old rows", async () => {
await withPluginStateTestState(async () => {
seedPluginStateEntriesForTests(
Array.from({ length: 6_000 }, (_, entryIndex) => ({
pluginId: "telegram",
namespace: "telegram.topic-name-cache",
key: `topic-${entryIndex}`,
value: { entryIndex },
})),
);
const messageStore = createPluginStateKeyedStore("telegram", {
namespace: "telegram.message-cache",
maxEntries: 6_000,
});
const topicStore = createPluginStateKeyedStore("telegram", {
namespace: "telegram.topic-name-cache",
maxEntries: 6_000,
});
await expectPluginStateStoreError(messageStore.register("new-message", { fresh: true }), {
code: "PLUGIN_STATE_LIMIT_EXCEEDED",
});
await expect(messageStore.lookup("new-message")).resolves.toBeUndefined();
await expect(topicStore.lookup("topic-0")).resolves.toEqual({ entryIndex: 0 });
});
});

View File

@@ -30,6 +30,8 @@ export type {
export { PluginStateStoreError } from "./plugin-state-store.types.js";
export {
closePluginStateSqliteStore,
countPluginStateLiveEntries,
MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN,
isPluginStateDatabaseOpen,
probePluginStateStore,
sweepExpiredPluginStateEntries,