diff --git a/CHANGELOG.md b/CHANGELOG.md index c51d5441568..1479fafeb30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Docs: https://docs.openclaw.ai - CLI/QR/dependencies: internalize small terminal progress and QR wrapper helpers while keeping the real QR encoder dependency direct, reducing the default runtime dependency graph without changing QR output behavior. Thanks @vincentkoc. - Channels: add Yuanbao channel docs entrance so the Tencent Yuanbao bot appears in the channel listing and sidebar navigation. (#73443) Thanks @loongfay. - Active Memory: add optional per-conversation `allowedChatIds` and `deniedChatIds` filters so operators can enable recall only for selected direct, group, or channel conversations while keeping broad sessions skipped. (#67977) Thanks @quengh. +- Added SQLite-backed plugin state store (`api.runtime.state.openKeyedStore`) for restart-safe keyed registries with TTL, eviction, and automatic plugin isolation. Thanks @amknight. - Active Memory: return bounded partial recall summaries when the hidden memory sub-agent times out, including the default temporary-transcript path, so useful recovered context is not discarded. (#73219) Thanks @joeykrug. - Docker setup: add `OPENCLAW_SKIP_ONBOARDING` so automated Docker installs can skip the interactive onboarding step while still applying gateway defaults. (#55518) Thanks @jinjimz. - Gateway/memory: add a read-only `doctor.memory.remHarness` RPC so operator clients can preview bounded REM dreaming output without running mutation paths. (#66673) Thanks @samzong. diff --git a/docs/.generated/plugin-sdk-api-baseline.sha256 b/docs/.generated/plugin-sdk-api-baseline.sha256 index 4ceea21f1c7..df7a39d31ae 100644 --- a/docs/.generated/plugin-sdk-api-baseline.sha256 +++ b/docs/.generated/plugin-sdk-api-baseline.sha256 @@ -1,2 +1,2 @@ -597577966dfee329740d7b0a331263afc26db518fe778f0fad95e2a01da88d83 plugin-sdk-api-baseline.json -65fb1cad5e5ec1764e3ccfcfd3fbb2e5cfb938ad34b45e6416bba0c00a1d735a plugin-sdk-api-baseline.jsonl +d5b33ee6be988cd6a844a358aaa098e1f6401b151e5ee1e46dceeccddaeb7434 plugin-sdk-api-baseline.json +dffa8b4afbb085faf42a857805c43708b748111e346552d7ea4654da3bafdee7 plugin-sdk-api-baseline.jsonl diff --git a/docs/plugins/sdk-runtime.md b/docs/plugins/sdk-runtime.md index fbf4c46f4b5..1742010ce18 100644 --- a/docs/plugins/sdk-runtime.md +++ b/docs/plugins/sdk-runtime.md @@ -394,12 +394,28 @@ Provider and channel execution paths must use the active runtime config snapshot - State directory resolution. + State directory resolution and SQLite-backed keyed storage. ```typescript - const stateDir = api.runtime.state.resolveStateDir(); + const stateDir = api.runtime.state.resolveStateDir(process.env); + const store = api.runtime.state.openKeyedStore({ + namespace: "my-feature", + maxEntries: 200, + defaultTtlMs: 15 * 60_000, + }); + + await store.register("key-1", { value: "hello" }); + const value = await store.lookup("key-1"); + await store.consume("key-1"); + await store.clear(); ``` + Keyed stores survive restarts and are isolated by the runtime-bound plugin id. Limits: `maxEntries` per namespace, 1,000 live rows per plugin, JSON values under 64KB, and optional TTL expiry. + + + Bundled plugins only in this release. + + Memory tool factories and CLI. diff --git a/src/gateway/server-close.ts b/src/gateway/server-close.ts index aa2860b97f1..1a35ec499ce 100644 --- a/src/gateway/server-close.ts +++ b/src/gateway/server-close.ts @@ -7,6 +7,7 @@ import { type ChannelId, listChannelPlugins } from "../channels/plugins/index.js import { createInternalHookEvent, triggerInternalHook } from "../hooks/internal-hooks.js"; import type { HeartbeatRunner } from "../infra/heartbeat-runner.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { closePluginStateSqliteStore } from "../plugin-state/plugin-state-store.js"; import type { PluginServicesHandle } from "../plugins/services.js"; import { normalizeOptionalString } from "../shared/string-coerce.js"; @@ -290,6 +291,7 @@ export function createGatewayCloseHandler(params: { if (params.pluginServices) { await shutdownStep("plugin-services", () => params.pluginServices!.stop(), warnings); } + await shutdownStep("plugin-state-store", () => closePluginStateSqliteStore(), warnings); await shutdownStep("gmail-watcher", () => stopGmailWatcherOnDemand(), warnings); params.cron.stop(); params.heartbeatRunner.stop(); diff --git a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts index f1ba68c2a45..63dfc2047a6 100644 --- a/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts +++ b/src/plugin-sdk/test-helpers/plugin-runtime-mock.ts @@ -458,6 +458,9 @@ export function createPluginRuntimeMock(overrides: DeepPartial = }, state: { resolveStateDir: vi.fn(() => "/tmp/openclaw"), + openKeyedStore: vi.fn(() => { + throw new Error("openKeyedStore mock is not configured"); + }) as unknown as PluginRuntime["state"]["openKeyedStore"], }, tasks: { runs: { diff --git a/src/plugin-state/plugin-state-store.e2e.test.ts b/src/plugin-state/plugin-state-store.e2e.test.ts new file mode 100644 index 00000000000..c64e18e394d --- /dev/null +++ b/src/plugin-state/plugin-state-store.e2e.test.ts @@ -0,0 +1,304 @@ +import { mkdirSync } from "node:fs"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; +import { + closePluginStateSqliteStore, + createPluginStateKeyedStore, + PluginStateStoreError, + probePluginStateStore, + resetPluginStateStoreForTests, + sweepExpiredPluginStateEntries, +} from "./plugin-state-store.js"; +import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js"; +import { MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN } from "./plugin-state-store.sqlite.js"; + +afterEach(() => { + vi.useRealTimers(); + resetPluginStateStoreForTests(); +}); + +// --------------------------------------------------------------------------- +// Runtime smoke +// --------------------------------------------------------------------------- +describe("runtime smoke", () => { + it("creates and exercises a keyed store directly", async () => { + await withOpenClawTestState({ label: "e2e-smoke-load" }, async () => { + const store = createPluginStateKeyedStore<{ ready: boolean }>("fixture-plugin", { + namespace: "boot", + maxEntries: 10, + }); + expect(store).toBeDefined(); + expect(typeof store.register).toBe("function"); + expect(typeof store.lookup).toBe("function"); + expect(typeof store.consume).toBe("function"); + }); + }); + + it("writes and reads a value", async () => { + await withOpenClawTestState({ label: "e2e-smoke-rw" }, async () => { + const store = createPluginStateKeyedStore<{ msg: string }>("fixture-plugin", { + namespace: "data", + maxEntries: 10, + }); + await store.register("greeting", { msg: "hello" }); + await expect(store.lookup("greeting")).resolves.toEqual({ msg: "hello" }); + }); + }); + + it("consumes a value exactly once", async () => { + await withOpenClawTestState({ label: "e2e-smoke-consume" }, async () => { + const store = createPluginStateKeyedStore<{ token: string }>("fixture-plugin", { + namespace: "tokens", + maxEntries: 10, + }); + await store.register("one-shot", { token: "abc123" }); + + const first = await store.consume("one-shot"); + expect(first).toEqual({ token: "abc123" }); + + const second = await store.consume("one-shot"); + expect(second).toBeUndefined(); + + await expect(store.lookup("one-shot")).resolves.toBeUndefined(); + }); + }); +}); + +// --------------------------------------------------------------------------- +// Persistence +// --------------------------------------------------------------------------- +describe("persistence", () => { + it("survives close and reopen of the store", async () => { + await withOpenClawTestState({ label: "e2e-persist" }, async () => { + const storeA = createPluginStateKeyedStore<{ persisted: boolean }>("fixture-plugin", { + namespace: "durable", + maxEntries: 10, + }); + await storeA.register("key1", { persisted: true }); + await storeA.register("key2", { persisted: true }); + + // Tear down the cached DB handle and option signatures – simulates + // a full gateway restart while the on-disk DB survives. + resetPluginStateStoreForTests(); + + const storeB = createPluginStateKeyedStore<{ persisted: boolean }>("fixture-plugin", { + namespace: "durable", + maxEntries: 10, + }); + await expect(storeB.lookup("key1")).resolves.toEqual({ persisted: true }); + await expect(storeB.lookup("key2")).resolves.toEqual({ persisted: true }); + }); + }); +}); + +// --------------------------------------------------------------------------- +// TTL +// --------------------------------------------------------------------------- +describe("TTL", () => { + it("hides expired values and sweep removes the row", async () => { + await withOpenClawTestState({ label: "e2e-ttl" }, async () => { + vi.useFakeTimers(); + vi.setSystemTime(10_000); + + const store = createPluginStateKeyedStore<{ v: number }>("fixture-plugin", { + namespace: "ttl-test", + maxEntries: 10, + }); + await store.register("short", { v: 1 }, { ttlMs: 500 }); + await store.register("long", { v: 2 }, { ttlMs: 60_000 }); + + // Before expiry – both visible. + await expect(store.lookup("short")).resolves.toEqual({ v: 1 }); + await expect(store.lookup("long")).resolves.toEqual({ v: 2 }); + + // Advance past the short TTL. + vi.setSystemTime(10_600); + + // Expired value is invisible to reads. + await expect(store.lookup("short")).resolves.toBeUndefined(); + await expect(store.lookup("long")).resolves.toEqual({ v: 2 }); + + // Sweep physically removes the expired row. + const swept = sweepExpiredPluginStateEntries(); + expect(swept).toBe(1); + + // After sweep the entry list contains only the long-lived record. + const remaining = await store.entries(); + expect(remaining).toHaveLength(1); + expect(remaining[0].key).toBe("long"); + }); + }); +}); + +// --------------------------------------------------------------------------- +// Isolation +// --------------------------------------------------------------------------- +describe("isolation", () => { + it("segregates plugins sharing namespace and key", async () => { + await withOpenClawTestState({ label: "e2e-isolation" }, async () => { + const pluginA = createPluginStateKeyedStore<{ owner: string }>("plugin-a", { + namespace: "x", + maxEntries: 10, + }); + const pluginB = createPluginStateKeyedStore<{ owner: string }>("plugin-b", { + namespace: "x", + maxEntries: 10, + }); + + await pluginA.register("same", { owner: "a" }); + await pluginB.register("same", { owner: "b" }); + + await expect(pluginA.lookup("same")).resolves.toEqual({ owner: "a" }); + await expect(pluginB.lookup("same")).resolves.toEqual({ owner: "b" }); + + // Clearing one plugin's namespace does not affect the other. + await pluginA.clear(); + await expect(pluginA.lookup("same")).resolves.toBeUndefined(); + await expect(pluginB.lookup("same")).resolves.toEqual({ owner: "b" }); + }); + }); +}); + +// --------------------------------------------------------------------------- +// Limits +// --------------------------------------------------------------------------- +describe("limits", () => { + it("accepts a value at the 64 KB boundary", async () => { + await withOpenClawTestState({ label: "e2e-limit-accept" }, async () => { + const store = createPluginStateKeyedStore("fixture-plugin", { + namespace: "size", + maxEntries: 10, + }); + // JSON.stringify wraps a string in quotes (+2 bytes). + // 65 534 chars → 65 536 bytes of JSON → exactly at limit. + const boundary = "x".repeat(65_534); + await expect(store.register("big", boundary)).resolves.toBeUndefined(); + await expect(store.lookup("big")).resolves.toBe(boundary); + }); + }); + + it("rejects a value one byte over 64 KB", async () => { + await withOpenClawTestState({ label: "e2e-limit-reject" }, async () => { + const store = createPluginStateKeyedStore("fixture-plugin", { + namespace: "size", + maxEntries: 10, + }); + // 65 535 chars → 65 537 bytes of JSON → over limit. + const oversize = "x".repeat(65_535); + await expect(store.register("big", oversize)).rejects.toMatchObject({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + }); + }); + }); + + it("enforces the per-plugin live-row cap", async () => { + await withOpenClawTestState({ label: "e2e-limit-plugin" }, async () => { + // Spread MAX_ENTRIES_PER_PLUGIN rows across several namespaces so + // namespace eviction never fires (each namespace has generous room). + const nsCount = 10; + const perNs = MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN / nsCount; // 100 + const stores = Array.from({ length: nsCount }, (_, i) => + createPluginStateKeyedStore("fixture-plugin", { + namespace: `ns-${i}`, + maxEntries: perNs + 1, + }), + ); + + for (let ns = 0; ns < nsCount; ns += 1) { + for (let k = 0; k < perNs; k += 1) { + await stores[ns].register(`k-${k}`, { ns, k }); + } + } + + // One more row tips over the plugin-wide limit. + await expect(stores[0].register("overflow", { boom: true })).rejects.toMatchObject({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + }); + }); + }); + + it("evicts oldest entries when namespace maxEntries is exceeded", async () => { + await withOpenClawTestState({ label: "e2e-limit-eviction" }, async () => { + vi.useFakeTimers(); + const store = createPluginStateKeyedStore("fixture-plugin", { + namespace: "capped", + maxEntries: 3, + }); + + vi.setSystemTime(1000); + await store.register("a", 1); + vi.setSystemTime(2000); + await store.register("b", 2); + vi.setSystemTime(3000); + await store.register("c", 3); + vi.setSystemTime(4000); + await store.register("d", 4); // should evict "a" + + const entries = await store.entries(); + expect(entries).toHaveLength(3); + expect(entries.map((e) => e.key)).toEqual(["b", "c", "d"]); + await expect(store.lookup("a")).resolves.toBeUndefined(); + }); + }); +}); + +// --------------------------------------------------------------------------- +// Failure safety +// --------------------------------------------------------------------------- +describe("failure safety", () => { + it("gives a typed error for unsupported schema versions", async () => { + await withOpenClawTestState({ label: "e2e-fail-schema" }, async () => { + // Pre-seed the DB with a future schema version. + mkdirSync(resolvePluginStateDir(), { recursive: true }); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(resolvePluginStateSqlitePath()); + db.exec("PRAGMA user_version = 99;"); + db.close(); + + const store = createPluginStateKeyedStore("fixture-plugin", { + namespace: "schema", + maxEntries: 10, + }); + const error = await store.register("k", { ok: true }).catch((e: unknown) => e); + expect(error).toBeInstanceOf(PluginStateStoreError); + expect(error).toMatchObject({ code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED" }); + }); + }); + + it("probe returns redacted diagnostics without leaking stored values", async () => { + await withOpenClawTestState({ label: "e2e-fail-probe" }, async () => { + const result = probePluginStateStore(); + expect(result.ok).toBe(true); + expect(result.dbPath).toContain("state.sqlite"); + expect(result.steps.length).toBeGreaterThanOrEqual(4); + expect(result.steps.every((s) => s.ok)).toBe(true); + + // The probe's temporary stored value must not leak into the result. + const serialised = JSON.stringify(result); + expect(serialised).not.toContain("probe-value"); + }); + }); + + it("close and reopen cycle is clean", async () => { + await withOpenClawTestState({ label: "e2e-fail-reopen" }, async () => { + const store = createPluginStateKeyedStore<{ v: number }>("fixture-plugin", { + namespace: "reopen", + maxEntries: 10, + }); + await store.register("k", { v: 1 }); + + // First close. + closePluginStateSqliteStore(); + await expect(store.lookup("k")).resolves.toEqual({ v: 1 }); + + // Second close (idempotent). + closePluginStateSqliteStore(); + await expect(store.lookup("k")).resolves.toEqual({ v: 1 }); + + // Write after reopen. + await store.register("k", { v: 2 }); + await expect(store.lookup("k")).resolves.toEqual({ v: 2 }); + }); + }); +}); diff --git a/src/plugin-state/plugin-state-store.paths.ts b/src/plugin-state/plugin-state-store.paths.ts new file mode 100644 index 00000000000..e84083635d3 --- /dev/null +++ b/src/plugin-state/plugin-state-store.paths.ts @@ -0,0 +1,10 @@ +import path from "node:path"; +import { resolveStateDir } from "../config/paths.js"; + +export function resolvePluginStateDir(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolveStateDir(env), "plugin-state"); +} + +export function resolvePluginStateSqlitePath(env: NodeJS.ProcessEnv = process.env): string { + return path.join(resolvePluginStateDir(env), "state.sqlite"); +} diff --git a/src/plugin-state/plugin-state-store.permissions.test.ts b/src/plugin-state/plugin-state-store.permissions.test.ts new file mode 100644 index 00000000000..e0818b93452 --- /dev/null +++ b/src/plugin-state/plugin-state-store.permissions.test.ts @@ -0,0 +1,56 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; + +afterEach(() => { + vi.doUnmock("node:fs"); + vi.resetModules(); +}); + +describe("plugin state permission hardening", () => { + it("does not reject a committed write when post-commit chmod fails", async () => { + let chmodCalls = 0; + let throwAfter = Number.POSITIVE_INFINITY; + + vi.doMock("node:fs", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + chmodSync: (target: Parameters[0], mode: number) => { + chmodCalls += 1; + if (chmodCalls > throwAfter) { + throw Object.assign(new Error("chmod denied"), { code: "EACCES" }); + } + return actual.chmodSync(target, mode); + }, + existsSync: (target: Parameters[0]) => { + const pathname = String(target); + if (pathname.endsWith("-shm") || pathname.endsWith("-wal")) { + return false; + } + return actual.existsSync(target); + }, + }; + }); + + const { createPluginStateKeyedStore, resetPluginStateStoreForTests } = + await import("./plugin-state-store.js"); + + try { + await withOpenClawTestState({ label: "plugin-state-post-commit-chmod" }, async () => { + const store = createPluginStateKeyedStore<{ value: number }>("fixture-plugin", { + namespace: "post-commit", + maxEntries: 10, + }); + await store.register("first", { value: 1 }); + + chmodCalls = 0; + throwAfter = 2; + + await expect(store.register("second", { value: 2 })).resolves.toBeUndefined(); + await expect(store.lookup("second")).resolves.toEqual({ value: 2 }); + }); + } finally { + resetPluginStateStoreForTests(); + } + }); +}); diff --git a/src/plugin-state/plugin-state-store.sqlite.ts b/src/plugin-state/plugin-state-store.sqlite.ts new file mode 100644 index 00000000000..ab5e98b77a9 --- /dev/null +++ b/src/plugin-state/plugin-state-store.sqlite.ts @@ -0,0 +1,645 @@ +import { chmodSync, existsSync, mkdirSync } from "node:fs"; +import type { DatabaseSync, StatementSync } from "node:sqlite"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import { configureSqliteWalMaintenance, type SqliteWalMaintenance } from "../infra/sqlite-wal.js"; +import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js"; +import { + PluginStateStoreError, + type PluginStateEntry, + type PluginStateStoreErrorCode, + type PluginStateStoreOperation, + type PluginStateStoreProbeResult, + type PluginStateStoreProbeStep, +} from "./plugin-state-store.types.js"; + +const PLUGIN_STATE_SCHEMA_VERSION = 1; +const PLUGIN_STATE_DIR_MODE = 0o700; +const PLUGIN_STATE_FILE_MODE = 0o600; +const PLUGIN_STATE_SIDECAR_SUFFIXES = ["", "-shm", "-wal"] as const; +const MAX_ENTRIES_PER_PLUGIN = 1_000; + +export const MAX_PLUGIN_STATE_VALUE_BYTES = 65_536; +export const MAX_PLUGIN_STATE_ENTRIES_PER_PLUGIN = MAX_ENTRIES_PER_PLUGIN; + +type PluginStateRow = { + plugin_id: string; + namespace: string; + entry_key: string; + value_json: string; + created_at: number | bigint; + expires_at: number | bigint | null; +}; + +type CountRow = { + count: number | bigint; +}; + +type UserVersionRow = { + user_version?: number | bigint; +}; + +type PluginStateStatements = { + upsertEntry: StatementSync; + selectEntry: StatementSync; + selectEntries: StatementSync; + deleteEntry: StatementSync; + clearNamespace: StatementSync; + pruneExpiredNamespace: StatementSync; + countLiveNamespace: StatementSync; + countLivePlugin: StatementSync; + deleteOldestNamespace: StatementSync; + sweepExpired: StatementSync; +}; + +type PluginStateDatabase = { + db: DatabaseSync; + path: string; + statements: PluginStateStatements; + walMaintenance: SqliteWalMaintenance; +}; + +let cachedDatabase: PluginStateDatabase | null = null; + +function normalizeNumber(value: number | bigint | null): number | undefined { + if (typeof value === "bigint") { + return Number(value); + } + return typeof value === "number" ? value : undefined; +} + +function createPluginStateError(params: { + code: PluginStateStoreErrorCode; + operation: PluginStateStoreOperation; + message: string; + path?: string; + cause?: unknown; +}): PluginStateStoreError { + return new PluginStateStoreError(params.message, { + code: params.code, + operation: params.operation, + ...(params.path ? { path: params.path } : {}), + cause: params.cause, + }); +} + +function wrapPluginStateError( + error: unknown, + operation: PluginStateStoreOperation, + fallbackCode: PluginStateStoreErrorCode, + message: string, + pathname = resolvePluginStateSqlitePath(process.env), +): PluginStateStoreError { + if (error instanceof PluginStateStoreError) { + return error; + } + return createPluginStateError({ + code: fallbackCode, + operation, + message, + path: pathname, + cause: error, + }); +} + +function parseStoredJson(raw: string, operation: PluginStateStoreOperation): unknown { + try { + return JSON.parse(raw) as unknown; + } catch (error) { + throw createPluginStateError({ + code: "PLUGIN_STATE_CORRUPT", + operation, + message: "Plugin state entry contains corrupt JSON.", + path: resolvePluginStateSqlitePath(process.env), + cause: error, + }); + } +} + +function rowToEntry( + row: PluginStateRow, + operation: PluginStateStoreOperation, +): PluginStateEntry { + const expiresAt = normalizeNumber(row.expires_at); + return { + key: row.entry_key, + value: parseStoredJson(row.value_json, operation), + createdAt: normalizeNumber(row.created_at) ?? 0, + ...(expiresAt != null ? { expiresAt } : {}), + }; +} + +function getUserVersion(db: DatabaseSync): number { + const row = db.prepare("PRAGMA user_version").get() as UserVersionRow | undefined; + const raw = row?.user_version ?? 0; + return typeof raw === "bigint" ? Number(raw) : raw; +} + +function ensureSchema(db: DatabaseSync, pathname: string) { + const userVersion = getUserVersion(db); + if (userVersion > PLUGIN_STATE_SCHEMA_VERSION) { + throw createPluginStateError({ + code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED", + operation: "ensure-schema", + message: `Plugin state database schema version ${userVersion} is newer than supported version ${PLUGIN_STATE_SCHEMA_VERSION}.`, + path: pathname, + }); + } + + db.exec(` + CREATE TABLE IF NOT EXISTS plugin_state_entries ( + plugin_id TEXT NOT NULL, + namespace TEXT NOT NULL, + entry_key TEXT NOT NULL, + value_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + expires_at INTEGER, + PRIMARY KEY (plugin_id, namespace, entry_key) + ); + + CREATE INDEX IF NOT EXISTS idx_plugin_state_expiry + ON plugin_state_entries(expires_at) + WHERE expires_at IS NOT NULL; + + CREATE INDEX IF NOT EXISTS idx_plugin_state_listing + ON plugin_state_entries(plugin_id, namespace, created_at, entry_key); + + PRAGMA user_version = ${PLUGIN_STATE_SCHEMA_VERSION}; + `); +} + +function createStatements(db: DatabaseSync): PluginStateStatements { + return { + upsertEntry: db.prepare(` + INSERT INTO plugin_state_entries ( + plugin_id, + namespace, + entry_key, + value_json, + created_at, + expires_at + ) VALUES ( + @plugin_id, + @namespace, + @entry_key, + @value_json, + @created_at, + @expires_at + ) + ON CONFLICT(plugin_id, namespace, entry_key) DO UPDATE SET + value_json = excluded.value_json, + created_at = excluded.created_at, + expires_at = excluded.expires_at + `), + selectEntry: db.prepare(` + SELECT plugin_id, namespace, entry_key, value_json, created_at, expires_at + FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND entry_key = ? + AND (expires_at IS NULL OR expires_at > ?) + `), + selectEntries: db.prepare(` + SELECT plugin_id, namespace, entry_key, value_json, created_at, expires_at + FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND (expires_at IS NULL OR expires_at > ?) + ORDER BY created_at ASC, entry_key ASC + `), + deleteEntry: db.prepare(` + DELETE FROM plugin_state_entries + WHERE plugin_id = ? AND namespace = ? AND entry_key = ? + `), + clearNamespace: db.prepare(` + DELETE FROM plugin_state_entries + WHERE plugin_id = ? AND namespace = ? + `), + pruneExpiredNamespace: db.prepare(` + DELETE FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND expires_at IS NOT NULL + AND expires_at <= ? + `), + countLiveNamespace: db.prepare(` + SELECT COUNT(*) AS count + FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND (expires_at IS NULL OR expires_at > ?) + `), + countLivePlugin: db.prepare(` + SELECT COUNT(*) AS count + FROM plugin_state_entries + WHERE plugin_id = ? + AND (expires_at IS NULL OR expires_at > ?) + `), + deleteOldestNamespace: db.prepare(` + DELETE FROM plugin_state_entries + WHERE rowid IN ( + SELECT rowid + FROM plugin_state_entries + WHERE plugin_id = ? + AND namespace = ? + AND (expires_at IS NULL OR expires_at > ?) + ORDER BY created_at ASC, entry_key ASC + LIMIT ? + ) + `), + sweepExpired: db.prepare(` + DELETE FROM plugin_state_entries + WHERE expires_at IS NOT NULL AND expires_at <= ? + `), + }; +} + +function ensurePluginStatePermissions(pathname: string) { + const dir = resolvePluginStateDir(process.env); + mkdirSync(dir, { recursive: true, mode: PLUGIN_STATE_DIR_MODE }); + chmodSync(dir, PLUGIN_STATE_DIR_MODE); + for (const suffix of PLUGIN_STATE_SIDECAR_SUFFIXES) { + const candidate = `${pathname}${suffix}`; + if (existsSync(candidate)) { + chmodSync(candidate, PLUGIN_STATE_FILE_MODE); + } + } +} + +function ensurePluginStatePermissionsBestEffort(pathname: string): void { + try { + ensurePluginStatePermissions(pathname); + } catch { + // The write already committed. Permission hardening is best-effort from here. + } +} + +function openPluginStateDatabase( + operation: PluginStateStoreOperation = "open", +): PluginStateDatabase { + const pathname = resolvePluginStateSqlitePath(process.env); + if (cachedDatabase && cachedDatabase.path === pathname) { + return cachedDatabase; + } + if (cachedDatabase) { + cachedDatabase.walMaintenance.close(); + cachedDatabase.db.close(); + cachedDatabase = null; + } + + try { + ensurePluginStatePermissions(pathname); + } catch (error) { + throw createPluginStateError({ + code: "PLUGIN_STATE_OPEN_FAILED", + operation, + message: "Failed to prepare the plugin state database directory.", + path: pathname, + cause: error, + }); + } + + let sqlite: typeof import("node:sqlite"); + try { + sqlite = requireNodeSqlite(); + } catch (error) { + throw createPluginStateError({ + code: "PLUGIN_STATE_SQLITE_UNAVAILABLE", + operation: "load-sqlite", + message: "SQLite support is unavailable for plugin state storage.", + path: pathname, + cause: error, + }); + } + + try { + const db = new sqlite.DatabaseSync(pathname); + const walMaintenance = configureSqliteWalMaintenance(db); + db.exec("PRAGMA synchronous = NORMAL;"); + db.exec("PRAGMA busy_timeout = 5000;"); + ensureSchema(db, pathname); + ensurePluginStatePermissions(pathname); + cachedDatabase = { + db, + path: pathname, + statements: createStatements(db), + walMaintenance, + }; + return cachedDatabase; + } catch (error) { + throw wrapPluginStateError( + error, + operation, + "PLUGIN_STATE_OPEN_FAILED", + "Failed to open the plugin state database.", + pathname, + ); + } +} + +function countRow(row: CountRow | undefined): number { + const raw = row?.count ?? 0; + return typeof raw === "bigint" ? Number(raw) : raw; +} + +function runWriteTransaction( + operation: PluginStateStoreOperation, + write: (store: PluginStateDatabase) => T, +): T { + const store = openPluginStateDatabase(operation); + ensurePluginStatePermissions(store.path); + store.db.exec("BEGIN IMMEDIATE"); + try { + const result = write(store); + store.db.exec("COMMIT"); + ensurePluginStatePermissionsBestEffort(store.path); + return result; + } catch (error) { + try { + store.db.exec("ROLLBACK"); + } catch { + // Preserve the original failure; rollback errors are secondary here. + } + throw error; + } +} + +export function pluginStateRegister(params: { + pluginId: string; + namespace: string; + key: string; + valueJson: string; + maxEntries: number; + ttlMs?: number; +}): void { + try { + runWriteTransaction("register", (store) => { + const now = Date.now(); + const expiresAt = params.ttlMs == null ? null : now + params.ttlMs; + store.statements.pruneExpiredNamespace.run(params.pluginId, params.namespace, now); + store.statements.upsertEntry.run({ + plugin_id: params.pluginId, + namespace: params.namespace, + entry_key: params.key, + value_json: params.valueJson, + created_at: now, + expires_at: expiresAt, + }); + + const namespaceCount = countRow( + store.statements.countLiveNamespace.get(params.pluginId, params.namespace, now) as + | CountRow + | undefined, + ); + if (namespaceCount > params.maxEntries) { + store.statements.deleteOldestNamespace.run( + params.pluginId, + params.namespace, + now, + namespaceCount - params.maxEntries, + ); + } + + const pluginCount = countRow( + store.statements.countLivePlugin.get(params.pluginId, now) as CountRow | undefined, + ); + if (pluginCount > MAX_ENTRIES_PER_PLUGIN) { + throw createPluginStateError({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + operation: "register", + message: `Plugin state for ${params.pluginId} exceeds the ${MAX_ENTRIES_PER_PLUGIN} live row limit.`, + path: store.path, + }); + } + }); + } catch (error) { + throw wrapPluginStateError( + error, + "register", + "PLUGIN_STATE_WRITE_FAILED", + "Failed to register plugin state entry.", + ); + } +} + +export function pluginStateLookup(params: { + pluginId: string; + namespace: string; + key: string; +}): unknown { + try { + const { statements } = openPluginStateDatabase("lookup"); + const row = statements.selectEntry.get( + params.pluginId, + params.namespace, + params.key, + Date.now(), + ) as PluginStateRow | undefined; + return row ? parseStoredJson(row.value_json, "lookup") : undefined; + } catch (error) { + throw wrapPluginStateError( + error, + "lookup", + "PLUGIN_STATE_READ_FAILED", + "Failed to read plugin state entry.", + ); + } +} + +export function pluginStateConsume(params: { + pluginId: string; + namespace: string; + key: string; +}): unknown { + try { + return runWriteTransaction("consume", (store) => { + const row = store.statements.selectEntry.get( + params.pluginId, + params.namespace, + params.key, + Date.now(), + ) as PluginStateRow | undefined; + if (!row) { + return undefined; + } + store.statements.deleteEntry.run(params.pluginId, params.namespace, params.key); + return parseStoredJson(row.value_json, "consume"); + }); + } catch (error) { + throw wrapPluginStateError( + error, + "consume", + "PLUGIN_STATE_READ_FAILED", + "Failed to consume plugin state entry.", + ); + } +} + +export function pluginStateDelete(params: { + pluginId: string; + namespace: string; + key: string; +}): boolean { + try { + const { statements } = openPluginStateDatabase("delete"); + const result = statements.deleteEntry.run(params.pluginId, params.namespace, params.key); + return result.changes > 0; + } catch (error) { + throw wrapPluginStateError( + error, + "delete", + "PLUGIN_STATE_WRITE_FAILED", + "Failed to delete plugin state entry.", + ); + } +} + +export function pluginStateEntries(params: { + pluginId: string; + namespace: string; +}): PluginStateEntry[] { + try { + const { statements } = openPluginStateDatabase("entries"); + const rows = statements.selectEntries.all( + params.pluginId, + params.namespace, + Date.now(), + ) as PluginStateRow[]; + return rows.map((row) => rowToEntry(row, "entries")); + } catch (error) { + throw wrapPluginStateError( + error, + "entries", + "PLUGIN_STATE_READ_FAILED", + "Failed to list plugin state entries.", + ); + } +} + +export function pluginStateClear(params: { pluginId: string; namespace: string }): void { + try { + const { statements } = openPluginStateDatabase("clear"); + statements.clearNamespace.run(params.pluginId, params.namespace); + } catch (error) { + throw wrapPluginStateError( + error, + "clear", + "PLUGIN_STATE_WRITE_FAILED", + "Failed to clear plugin state namespace.", + ); + } +} + +export function sweepExpiredPluginStateEntries(): number { + try { + const { statements } = openPluginStateDatabase("sweep"); + const result = statements.sweepExpired.run(Date.now()); + return Number(result.changes); + } catch (error) { + throw wrapPluginStateError( + error, + "sweep", + "PLUGIN_STATE_WRITE_FAILED", + "Failed to sweep expired plugin state entries.", + ); + } +} + +export function isPluginStateDatabaseOpen(): boolean { + return cachedDatabase !== null; +} + +export function probePluginStateStore(): PluginStateStoreProbeResult { + const dbPath = resolvePluginStateSqlitePath(process.env); + const steps: PluginStateStoreProbeStep[] = []; + const wasOpen = cachedDatabase !== null; + + const pushOk = (name: string) => steps.push({ name, ok: true }); + const pushFailure = (name: string, error: unknown) => { + const wrapped = + error instanceof PluginStateStoreError + ? error + : createPluginStateError({ + code: "PLUGIN_STATE_OPEN_FAILED", + operation: "probe", + message: error instanceof Error ? error.message : String(error), + path: dbPath, + cause: error, + }); + steps.push({ name, ok: false, code: wrapped.code, message: wrapped.message }); + }; + + try { + ensurePluginStatePermissions(dbPath); + pushOk("state-dir"); + } catch (error) { + pushFailure("state-dir", error); + return { ok: false, dbPath, steps }; + } + + try { + requireNodeSqlite(); + pushOk("load-sqlite"); + } catch (error) { + pushFailure( + "load-sqlite", + createPluginStateError({ + code: "PLUGIN_STATE_SQLITE_UNAVAILABLE", + operation: "load-sqlite", + message: "SQLite support is unavailable for plugin state storage.", + path: dbPath, + cause: error, + }), + ); + return { ok: false, dbPath, steps }; + } + + try { + const store = openPluginStateDatabase("probe"); + pushOk("open"); + ensureSchema(store.db, store.path); + pushOk("schema"); + runWriteTransaction("probe", ({ statements }) => { + const now = Date.now(); + statements.upsertEntry.run({ + plugin_id: "core:plugin-state-probe", + namespace: "diagnostics", + entry_key: "probe", + value_json: JSON.stringify({ ok: true }), + created_at: now, + expires_at: now + 60_000, + }); + statements.selectEntry.get("core:plugin-state-probe", "diagnostics", "probe", now); + statements.deleteEntry.run("core:plugin-state-probe", "diagnostics", "probe"); + }); + pushOk("write-read-delete"); + store.walMaintenance.checkpoint(); + pushOk("checkpoint"); + } catch (error) { + pushFailure("probe", error); + } finally { + if (!wasOpen) { + closePluginStateSqliteStore(); + } + } + + return { ok: steps.every((step) => step.ok), dbPath, steps }; +} + +export function closePluginStateSqliteStore(): void { + if (!cachedDatabase) { + return; + } + try { + cachedDatabase.walMaintenance.close(); + cachedDatabase.db.close(); + cachedDatabase = null; + } catch (error) { + cachedDatabase = null; + throw wrapPluginStateError( + error, + "close", + "PLUGIN_STATE_WRITE_FAILED", + "Failed to close plugin state database.", + ); + } +} diff --git a/src/plugin-state/plugin-state-store.test.ts b/src/plugin-state/plugin-state-store.test.ts new file mode 100644 index 00000000000..03fe8fc849e --- /dev/null +++ b/src/plugin-state/plugin-state-store.test.ts @@ -0,0 +1,379 @@ +import { mkdirSync, statSync } from "node:fs"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { requireNodeSqlite } from "../infra/node-sqlite.js"; +import type { PluginRecord } from "../plugins/registry-types.js"; +import { createPluginRegistry } from "../plugins/registry.js"; +import { createPluginRuntime } from "../plugins/runtime/index.js"; +import { withOpenClawTestState } from "../test-utils/openclaw-test-state.js"; +import { + closePluginStateSqliteStore, + createCorePluginStateKeyedStore, + createPluginStateKeyedStore, + PluginStateStoreError, + probePluginStateStore, + resetPluginStateStoreForTests, + sweepExpiredPluginStateEntries, +} from "./plugin-state-store.js"; +import { resolvePluginStateDir, resolvePluginStateSqlitePath } from "./plugin-state-store.paths.js"; + +function createPluginRecord(id: string, origin: PluginRecord["origin"] = "bundled"): PluginRecord { + return { + id, + name: id, + source: `/plugins/${id}/index.ts`, + origin, + enabled: true, + status: "loaded", + toolNames: [], + hookNames: [], + channelIds: [], + cliBackendIds: [], + providerIds: [], + speechProviderIds: [], + realtimeTranscriptionProviderIds: [], + realtimeVoiceProviderIds: [], + mediaUnderstandingProviderIds: [], + imageGenerationProviderIds: [], + videoGenerationProviderIds: [], + musicGenerationProviderIds: [], + webFetchProviderIds: [], + webSearchProviderIds: [], + migrationProviderIds: [], + memoryEmbeddingProviderIds: [], + agentHarnessIds: [], + gatewayMethods: [], + cliCommands: [], + services: [], + gatewayDiscoveryServiceIds: [], + commands: [], + httpRoutes: 0, + hookCount: 0, + configSchema: false, + } as PluginRecord; +} + +afterEach(() => { + vi.useRealTimers(); + resetPluginStateStoreForTests(); +}); + +describe("plugin state keyed store", () => { + it("registers and looks up values across store instances", async () => { + await withOpenClawTestState({ label: "plugin-state-roundtrip" }, async () => { + const store = createPluginStateKeyedStore<{ count: number }>("discord", { + namespace: "components", + maxEntries: 10, + }); + await store.register("interaction:1", { count: 1 }); + + const reopened = createPluginStateKeyedStore<{ count: number }>("discord", { + namespace: "components", + maxEntries: 10, + }); + await expect(reopened.lookup("interaction:1")).resolves.toEqual({ count: 1 }); + }); + }); + + it("upserts values and refreshes deterministic entry ordering", async () => { + await withOpenClawTestState({ label: "plugin-state-upsert" }, async () => { + vi.useFakeTimers(); + const store = createPluginStateKeyedStore<{ version: number }>("discord", { + namespace: "components", + maxEntries: 10, + }); + vi.setSystemTime(1000); + await store.register("b", { version: 1 }); + vi.setSystemTime(2000); + await store.register("a", { version: 1 }); + vi.setSystemTime(3000); + await store.register("b", { version: 2 }); + + await expect(store.lookup("b")).resolves.toEqual({ version: 2 }); + await expect(store.entries()).resolves.toMatchObject([ + { key: "a", value: { version: 1 }, createdAt: 2000 }, + { key: "b", value: { version: 2 }, createdAt: 3000 }, + ]); + }); + }); + + it("returns undefined for missing lookups and consumes by deleting atomically", async () => { + await withOpenClawTestState({ label: "plugin-state-consume" }, async () => { + const store = createPluginStateKeyedStore<{ ok: boolean }>("discord", { + namespace: "components", + maxEntries: 10, + }); + + await expect(store.lookup("missing")).resolves.toBeUndefined(); + await expect(store.consume("missing")).resolves.toBeUndefined(); + await store.register("k", { ok: true }); + await expect(store.consume("k")).resolves.toEqual({ ok: true }); + await expect(store.lookup("k")).resolves.toBeUndefined(); + }); + }); + + it("deletes and clears only the targeted namespace", async () => { + await withOpenClawTestState({ label: "plugin-state-clear" }, async () => { + const first = createPluginStateKeyedStore("discord", { namespace: "a", maxEntries: 10 }); + const second = createPluginStateKeyedStore("discord", { namespace: "b", maxEntries: 10 }); + await first.register("k1", { value: 1 }); + await second.register("k2", { value: 2 }); + + await expect(first.delete("k1")).resolves.toBe(true); + await expect(first.delete("k1")).resolves.toBe(false); + await first.register("k1", { value: 1 }); + await first.clear(); + + await expect(first.entries()).resolves.toEqual([]); + await expect(second.lookup("k2")).resolves.toEqual({ value: 2 }); + }); + }); + + it("excludes expired entries and sweeps them", async () => { + await withOpenClawTestState({ label: "plugin-state-expiry" }, async () => { + vi.useFakeTimers(); + vi.setSystemTime(1000); + const store = createPluginStateKeyedStore("discord", { + namespace: "ttl", + maxEntries: 10, + defaultTtlMs: 100, + }); + await store.register("default", { value: "default" }); + await store.register("override", { value: "override" }, { ttlMs: 500 }); + + vi.setSystemTime(1200); + await expect(store.lookup("default")).resolves.toBeUndefined(); + await expect(store.lookup("override")).resolves.toEqual({ value: "override" }); + expect(sweepExpiredPluginStateEntries()).toBe(1); + await expect(store.entries()).resolves.toMatchObject([{ key: "override" }]); + }); + }); + + it("evicts oldest live entries over maxEntries", async () => { + await withOpenClawTestState({ label: "plugin-state-eviction" }, async () => { + vi.useFakeTimers(); + const store = createPluginStateKeyedStore("discord", { namespace: "evict", maxEntries: 2 }); + vi.setSystemTime(1000); + await store.register("a", 1); + vi.setSystemTime(2000); + await store.register("b", 2); + vi.setSystemTime(3000); + await store.register("c", 3); + + await expect(store.entries()).resolves.toMatchObject([{ key: "b" }, { key: "c" }]); + }); + }); + + it("rejects when the per-plugin live row ceiling would be exceeded without evicting siblings", async () => { + await withOpenClawTestState({ label: "plugin-state-plugin-limit" }, async () => { + const stores = Array.from({ length: 10 }, (_, index) => + createPluginStateKeyedStore("discord", { + namespace: `ns-${index}`, + maxEntries: 101, + }), + ); + for (let namespaceIndex = 0; namespaceIndex < stores.length; namespaceIndex += 1) { + for (let entryIndex = 0; entryIndex < 100; entryIndex += 1) { + await stores[namespaceIndex].register(`k-${entryIndex}`, { namespaceIndex, entryIndex }); + } + } + + await expect(stores[0].register("overflow", { overflow: true })).rejects.toMatchObject({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + }); + await expect(stores[1].lookup("k-0")).resolves.toEqual({ namespaceIndex: 1, entryIndex: 0 }); + }); + }); + + it("segregates plugins sharing a namespace and key", async () => { + await withOpenClawTestState({ label: "plugin-state-segregation" }, async () => { + const discord = createPluginStateKeyedStore("discord", { namespace: "same", maxEntries: 10 }); + const telegram = createPluginStateKeyedStore("telegram", { + namespace: "same", + maxEntries: 10, + }); + await discord.register("k", { plugin: "discord" }); + await telegram.register("k", { plugin: "telegram" }); + await discord.clear(); + + await expect(discord.lookup("k")).resolves.toBeUndefined(); + await expect(telegram.lookup("k")).resolves.toEqual({ plugin: "telegram" }); + }); + }); + + it("validates namespaces, keys, options, and JSON values before writes", async () => { + await withOpenClawTestState({ label: "plugin-state-validation" }, async () => { + expect(() => + createPluginStateKeyedStore("discord", { namespace: "../bad", maxEntries: 10 }), + ).toThrow(PluginStateStoreError); + expect(() => + createPluginStateKeyedStore("discord", { namespace: "bad-max", maxEntries: 0 }), + ).toThrow(PluginStateStoreError); + + const store = createPluginStateKeyedStore("discord", { namespace: "valid", maxEntries: 10 }); + await expect(store.register(" ", { ok: true })).rejects.toThrow(PluginStateStoreError); + await expect(store.register("undefined", undefined)).rejects.toThrow(PluginStateStoreError); + await expect(store.register("infinity", Number.POSITIVE_INFINITY)).rejects.toThrow( + PluginStateStoreError, + ); + const circular: Record = {}; + circular.self = circular; + await expect(store.register("circular", circular)).rejects.toThrow(PluginStateStoreError); + const sparse = [] as unknown[]; + sparse[1] = "hole"; + await expect(store.register("sparse", sparse)).rejects.toThrow(PluginStateStoreError); + await expect(store.register("date", new Date())).rejects.toThrow(PluginStateStoreError); + await expect(store.register("map", new Map([["k", "v"]]))).rejects.toThrow( + PluginStateStoreError, + ); + const nonEnumerable = { visible: true }; + Object.defineProperty(nonEnumerable, "hidden", { value: true, enumerable: false }); + await expect(store.register("non-enumerable", nonEnumerable)).rejects.toThrow( + PluginStateStoreError, + ); + await expect(store.register("big", "x".repeat(65_537))).rejects.toMatchObject({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + }); + + // Key byte-length limit (512 bytes) + await expect(store.register("k".repeat(513), { ok: true })).rejects.toThrow( + PluginStateStoreError, + ); + + // Namespace byte-length limit (128 bytes) + expect(() => + createPluginStateKeyedStore("discord", { namespace: "a".repeat(129), maxEntries: 10 }), + ).toThrow(PluginStateStoreError); + + // JSON depth limit (64 levels) + let deep: unknown = { leaf: true }; + for (let i = 0; i < 65; i += 1) { + deep = { nested: deep }; + } + await expect(store.register("deep", deep)).rejects.toMatchObject({ + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + }); + + // Validation errors surface the correct operation + await expect(store.lookup(" ")).rejects.toMatchObject({ + code: "PLUGIN_STATE_INVALID_INPUT", + operation: "lookup", + }); + await expect(store.delete(" ")).rejects.toMatchObject({ + code: "PLUGIN_STATE_INVALID_INPUT", + operation: "delete", + }); + }); + }); + + it("rejects reopening the same namespace with incompatible options", async () => { + await withOpenClawTestState({ label: "plugin-state-option-consistency" }, async () => { + createPluginStateKeyedStore("discord", { namespace: "same", maxEntries: 10 }); + expect(() => + createPluginStateKeyedStore("discord", { namespace: "same", maxEntries: 11 }), + ).toThrow(PluginStateStoreError); + }); + }); + + it("allows core owners and reserves core-prefixed plugin ids", async () => { + await withOpenClawTestState({ label: "plugin-state-core" }, async () => { + const store = createCorePluginStateKeyedStore<{ stopped: boolean }>({ + ownerId: "core:channel-intent", + namespace: "stopped", + maxEntries: 10, + }); + await store.register("telegram:personal", { stopped: true }); + await expect(store.lookup("telegram:personal")).resolves.toEqual({ stopped: true }); + expect(() => + createPluginStateKeyedStore("core:not-a-plugin", { namespace: "bad", maxEntries: 10 }), + ).toThrow(PluginStateStoreError); + }); + }); + + it("closes the cached DB handle and reopens cleanly", async () => { + await withOpenClawTestState({ label: "plugin-state-close" }, async () => { + const store = createPluginStateKeyedStore("discord", { namespace: "close", maxEntries: 10 }); + await store.register("k", { ok: true }); + closePluginStateSqliteStore(); + await expect(store.lookup("k")).resolves.toEqual({ ok: true }); + }); + }); + + it.runIf(process.platform !== "win32")("hardens DB directory and file permissions", async () => { + await withOpenClawTestState({ label: "plugin-state-permissions" }, async () => { + const store = createPluginStateKeyedStore("discord", { namespace: "perms", maxEntries: 10 }); + await store.register("k", { ok: true }); + + expect(statSync(resolvePluginStateDir()).mode & 0o777).toBe(0o700); + expect(statSync(resolvePluginStateSqlitePath()).mode & 0o777).toBe(0o600); + }); + }); + + it("reports healthy diagnostics without stored values", async () => { + await withOpenClawTestState({ label: "plugin-state-probe" }, async () => { + const result = probePluginStateStore(); + expect(result.ok).toBe(true); + expect(result.steps.every((step) => step.ok)).toBe(true); + expect(JSON.stringify(result)).not.toContain("probe-value"); + }); + }); + + it("throws on unsupported future schema versions", async () => { + await withOpenClawTestState({ label: "plugin-state-schema" }, async () => { + mkdirSync(resolvePluginStateDir(), { recursive: true }); + const { DatabaseSync } = requireNodeSqlite(); + const db = new DatabaseSync(resolvePluginStateSqlitePath()); + db.exec("PRAGMA user_version = 2;"); + db.close(); + + const store = createPluginStateKeyedStore("discord", { namespace: "schema", maxEntries: 10 }); + await expect(store.register("k", { ok: true })).rejects.toMatchObject({ + code: "PLUGIN_STATE_SCHEMA_UNSUPPORTED", + }); + }); + }); +}); + +describe("plugin runtime state proxy", () => { + it("binds openKeyedStore to the bundled plugin id and keeps resolveStateDir", async () => { + await withOpenClawTestState({ label: "plugin-state-runtime" }, async (state) => { + const registry = createPluginRegistry({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + runtime: createPluginRuntime(), + }); + const record = createPluginRecord("discord", "bundled"); + registry.registry.plugins.push(record); + const api = registry.createApi(record, { config: {} }); + + expect(api.runtime.state.resolveStateDir()).toBe(state.stateDir); + const store = api.runtime.state.openKeyedStore<{ plugin: string }>({ + namespace: "runtime", + maxEntries: 10, + }); + await store.register("k", { plugin: "discord" }); + + const telegram = createPluginRecord("telegram", "bundled"); + registry.registry.plugins.push(telegram); + const telegramApi = registry.createApi(telegram, { config: {} }); + const telegramStore = telegramApi.runtime.state.openKeyedStore<{ plugin: string }>({ + namespace: "runtime", + maxEntries: 10, + }); + await expect(telegramStore.lookup("k")).resolves.toBeUndefined(); + await expect(store.lookup("k")).resolves.toEqual({ plugin: "discord" }); + }); + }); + + it("rejects external plugins in this release", () => { + const registry = createPluginRegistry({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn(), debug: vi.fn() }, + runtime: createPluginRuntime(), + }); + const record = createPluginRecord("external-plugin", "workspace"); + registry.registry.plugins.push(record); + const api = registry.createApi(record, { config: {} }); + + expect(() => + api.runtime.state.openKeyedStore({ namespace: "runtime", maxEntries: 10 }), + ).toThrow("openKeyedStore is only available for bundled plugins"); + }); +}); diff --git a/src/plugin-state/plugin-state-store.ts b/src/plugin-state/plugin-state-store.ts new file mode 100644 index 00000000000..b8002581f06 --- /dev/null +++ b/src/plugin-state/plugin-state-store.ts @@ -0,0 +1,276 @@ +import { + closePluginStateSqliteStore, + MAX_PLUGIN_STATE_VALUE_BYTES, + pluginStateClear, + pluginStateConsume, + pluginStateDelete, + pluginStateEntries, + pluginStateLookup, + pluginStateRegister, +} from "./plugin-state-store.sqlite.js"; +import type { + OpenKeyedStoreOptions, + PluginStateEntry, + PluginStateKeyedStore, + PluginStateStoreOperation, +} from "./plugin-state-store.types.js"; +import { PluginStateStoreError } from "./plugin-state-store.types.js"; + +export type { + OpenKeyedStoreOptions, + PluginStateEntry, + PluginStateKeyedStore, + PluginStateStoreErrorCode, + PluginStateStoreOperation, + PluginStateStoreProbeResult, + PluginStateStoreProbeStep, +} from "./plugin-state-store.types.js"; +export { PluginStateStoreError } from "./plugin-state-store.types.js"; +export { + closePluginStateSqliteStore, + isPluginStateDatabaseOpen, + probePluginStateStore, + sweepExpiredPluginStateEntries, +} from "./plugin-state-store.sqlite.js"; + +const NAMESPACE_PATTERN = /^[a-z0-9][a-z0-9._-]*$/iu; +const MAX_NAMESPACE_BYTES = 128; +const MAX_KEY_BYTES = 512; +const MAX_JSON_DEPTH = 64; + +type StoreOptionSignature = { + maxEntries: number; + defaultTtlMs?: number; +}; + +const namespaceOptionSignatures = new Map(); +const textEncoder = new TextEncoder(); + +function invalidInput( + message: string, + operation: PluginStateStoreOperation = "register", +): PluginStateStoreError { + return new PluginStateStoreError(message, { + code: "PLUGIN_STATE_INVALID_INPUT", + operation, + }); +} + +function assertMaxBytes( + label: string, + value: string, + max: number, + operation: PluginStateStoreOperation = "register", +): void { + if (textEncoder.encode(value).byteLength > max) { + throw invalidInput(`plugin state ${label} must be <= ${max} bytes`, operation); + } +} + +function validateNamespace(value: string, operation: PluginStateStoreOperation = "open"): string { + const trimmed = value.trim(); + if (!NAMESPACE_PATTERN.test(trimmed)) { + throw invalidInput(`plugin state namespace must be a safe path segment: ${value}`, operation); + } + assertMaxBytes("namespace", trimmed, MAX_NAMESPACE_BYTES, operation); + return trimmed; +} + +function validateKey(value: string, operation: PluginStateStoreOperation = "register"): string { + const trimmed = value.trim(); + if (!trimmed) { + throw invalidInput("plugin state entry key must not be empty", operation); + } + assertMaxBytes("entry key", trimmed, MAX_KEY_BYTES, operation); + return trimmed; +} + +function validateMaxEntries(value: number): number { + if (!Number.isInteger(value) || value < 1) { + throw invalidInput("plugin state maxEntries must be an integer >= 1", "open"); + } + return value; +} + +function validateOptionalTtlMs( + value: number | undefined, + operation: PluginStateStoreOperation = "register", +): number | undefined { + if (value == null) { + return undefined; + } + if (!Number.isInteger(value) || value < 1) { + throw invalidInput("plugin state ttlMs must be a positive integer", operation); + } + return value; +} + +function assertPlainJsonValue( + value: unknown, + seen: WeakSet, + path: string, + depth = 0, +): void { + if (depth > MAX_JSON_DEPTH) { + throw new PluginStateStoreError( + `plugin state value nesting exceeds maximum depth of ${MAX_JSON_DEPTH}`, + { code: "PLUGIN_STATE_LIMIT_EXCEEDED", operation: "register" }, + ); + } + if (value === null) { + return; + } + const valueType = typeof value; + if (valueType === "string" || valueType === "boolean") { + return; + } + if (valueType === "number") { + if (!Number.isFinite(value)) { + throw invalidInput(`plugin state value at ${path} must be a finite number`); + } + return; + } + if (valueType !== "object") { + throw invalidInput(`plugin state value at ${path} must be JSON-serializable`); + } + + const objectValue = value as object; + if (seen.has(objectValue)) { + throw invalidInput(`plugin state value at ${path} must not contain circular references`); + } + seen.add(objectValue); + try { + if (Array.isArray(value)) { + for (let index = 0; index < value.length; index += 1) { + if (!(index in value)) { + throw invalidInput(`plugin state array at ${path} must not be sparse`); + } + assertPlainJsonValue(value[index], seen, `${path}[${index}]`, depth + 1); + } + return; + } + + if (Object.getPrototypeOf(objectValue) !== Object.prototype) { + throw invalidInput(`plugin state object at ${path} must be a plain object`); + } + + const descriptorEntries = Object.entries(Object.getOwnPropertyDescriptors(objectValue)); + const enumerableKeys = Object.keys(objectValue); + if (Object.getOwnPropertySymbols(objectValue).length > 0) { + throw invalidInput(`plugin state object at ${path} must not use symbol keys`); + } + if (descriptorEntries.length !== enumerableKeys.length) { + throw invalidInput(`plugin state object at ${path} must not use non-enumerable properties`); + } + for (const [key, descriptor] of descriptorEntries) { + if (descriptor.get || descriptor.set || !("value" in descriptor)) { + throw invalidInput(`plugin state object at ${path}.${key} must use data properties`); + } + assertPlainJsonValue(descriptor.value, seen, `${path}.${key}`, depth + 1); + } + } finally { + seen.delete(objectValue); + } +} + +function assertJsonSerializable(value: unknown): void { + assertPlainJsonValue(value, new WeakSet(), "value"); +} + +function assertValueSize(json: string): void { + if (textEncoder.encode(json).byteLength > MAX_PLUGIN_STATE_VALUE_BYTES) { + throw new PluginStateStoreError("plugin state value exceeds 64KB limit", { + code: "PLUGIN_STATE_LIMIT_EXCEEDED", + operation: "register", + }); + } +} + +function assertConsistentOptions( + pluginId: string, + namespace: string, + signature: StoreOptionSignature, +): void { + const key = `${pluginId}\0${namespace}`; + const existing = namespaceOptionSignatures.get(key); + if (!existing) { + namespaceOptionSignatures.set(key, signature); + return; + } + if ( + existing.maxEntries !== signature.maxEntries || + existing.defaultTtlMs !== signature.defaultTtlMs + ) { + throw invalidInput( + `plugin state namespace ${namespace} for ${pluginId} was reopened with incompatible options`, + "open", + ); + } +} + +function createKeyedStoreForPluginId( + pluginId: string, + options: OpenKeyedStoreOptions, +): PluginStateKeyedStore { + const namespace = validateNamespace(options.namespace); + const maxEntries = validateMaxEntries(options.maxEntries); + const defaultTtlMs = validateOptionalTtlMs(options.defaultTtlMs); + assertConsistentOptions(pluginId, namespace, { maxEntries, defaultTtlMs }); + + return { + async register(key, value, opts) { + const normalizedKey = validateKey(key, "register"); + assertJsonSerializable(value); + const json = JSON.stringify(value); + assertValueSize(json); + const ttlMs = validateOptionalTtlMs(opts?.ttlMs, "register") ?? defaultTtlMs; + pluginStateRegister({ + pluginId, + namespace, + key: normalizedKey, + valueJson: json, + maxEntries, + ...(ttlMs != null ? { ttlMs } : {}), + }); + }, + async lookup(key) { + const normalizedKey = validateKey(key, "lookup"); + return pluginStateLookup({ pluginId, namespace, key: normalizedKey }) as T | undefined; + }, + async consume(key) { + const normalizedKey = validateKey(key, "consume"); + return pluginStateConsume({ pluginId, namespace, key: normalizedKey }) as T | undefined; + }, + async delete(key) { + const normalizedKey = validateKey(key, "delete"); + return pluginStateDelete({ pluginId, namespace, key: normalizedKey }); + }, + async entries() { + return pluginStateEntries({ pluginId, namespace }) as PluginStateEntry[]; + }, + async clear() { + pluginStateClear({ pluginId, namespace }); + }, + }; +} + +export function createPluginStateKeyedStore( + pluginId: string, + options: OpenKeyedStoreOptions, +): PluginStateKeyedStore { + if (pluginId.startsWith("core:")) { + throw invalidInput("Plugin ids starting with 'core:' are reserved for core consumers.", "open"); + } + return createKeyedStoreForPluginId(pluginId, options); +} + +export function createCorePluginStateKeyedStore( + options: OpenKeyedStoreOptions & { ownerId: `core:${string}` }, +): PluginStateKeyedStore { + return createKeyedStoreForPluginId(options.ownerId, options); +} + +export function resetPluginStateStoreForTests(): void { + closePluginStateSqliteStore(); + namespaceOptionSignatures.clear(); +} diff --git a/src/plugin-state/plugin-state-store.types.ts b/src/plugin-state/plugin-state-store.types.ts new file mode 100644 index 00000000000..8cb17bffe62 --- /dev/null +++ b/src/plugin-state/plugin-state-store.types.ts @@ -0,0 +1,81 @@ +export type PluginStateEntry = { + key: string; + value: T; + createdAt: number; + expiresAt?: number; +}; + +export type PluginStateKeyedStore = { + register(key: string, value: T, opts?: { ttlMs?: number }): Promise; + lookup(key: string): Promise; + consume(key: string): Promise; + delete(key: string): Promise; + entries(): Promise[]>; + clear(): Promise; +}; + +export type OpenKeyedStoreOptions = { + namespace: string; + maxEntries: number; + defaultTtlMs?: number; +}; + +export type PluginStateStoreErrorCode = + | "PLUGIN_STATE_SQLITE_UNAVAILABLE" + | "PLUGIN_STATE_OPEN_FAILED" + | "PLUGIN_STATE_SCHEMA_UNSUPPORTED" + | "PLUGIN_STATE_WRITE_FAILED" + | "PLUGIN_STATE_READ_FAILED" + | "PLUGIN_STATE_CORRUPT" + | "PLUGIN_STATE_LIMIT_EXCEEDED" + | "PLUGIN_STATE_INVALID_INPUT"; + +export type PluginStateStoreOperation = + | "load-sqlite" + | "open" + | "ensure-schema" + | "register" + | "lookup" + | "consume" + | "delete" + | "entries" + | "clear" + | "sweep" + | "probe" + | "close"; + +export type PluginStateStoreErrorOptions = { + code: PluginStateStoreErrorCode; + operation: PluginStateStoreOperation; + path?: string; + cause?: unknown; +}; + +export class PluginStateStoreError extends Error { + readonly code: PluginStateStoreErrorCode; + readonly operation: PluginStateStoreOperation; + readonly path?: string; + + constructor(message: string, options: PluginStateStoreErrorOptions) { + super(message, { cause: options.cause }); + this.name = "PluginStateStoreError"; + this.code = options.code; + this.operation = options.operation; + if (options.path) { + this.path = options.path; + } + } +} + +export type PluginStateStoreProbeStep = { + name: string; + ok: boolean; + code?: PluginStateStoreErrorCode; + message?: string; +}; + +export type PluginStateStoreProbeResult = { + ok: boolean; + dbPath: string; + steps: PluginStateStoreProbeStep[]; +}; diff --git a/src/plugins/registry.ts b/src/plugins/registry.ts index f42014fbcc4..5f5c24f5bf1 100644 --- a/src/plugins/registry.ts +++ b/src/plugins/registry.ts @@ -24,6 +24,11 @@ import { NODE_SYSTEM_NOTIFY_COMMAND, NODE_SYSTEM_RUN_COMMANDS, } from "../infra/node-commands.js"; +import { + createPluginStateKeyedStore, + type OpenKeyedStoreOptions, + type PluginStateKeyedStore, +} from "../plugin-state/plugin-state-store.js"; import { normalizePluginGatewayMethodScope } from "../shared/gateway-method-policy.js"; import { resolveGlobalSingleton } from "../shared/global-singleton.js"; import { @@ -1944,6 +1949,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { }); const pluginRuntimeById = new Map(); + const pluginRuntimeRecordById = new Map(); const resolvePluginRuntime = (pluginId: string): PluginRuntime => { const cached = pluginRuntimeById.get(pluginId); @@ -1952,6 +1958,23 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { } const runtime = new Proxy(registryParams.runtime, { get(target, prop, receiver) { + if (prop === "state") { + const baseState = Reflect.get(target, prop, receiver); + return { + ...baseState, + openKeyedStore: (options: OpenKeyedStoreOptions): PluginStateKeyedStore => { + const record = + pluginRuntimeRecordById.get(pluginId) ?? + registry.plugins.find((entry) => entry.id === pluginId); + if (record?.origin !== "bundled") { + throw new Error( + "openKeyedStore is only available for bundled plugins in this release.", + ); + } + return createPluginStateKeyedStore(pluginId, options); + }, + } satisfies PluginRuntime["state"]; + } if (prop !== "subagent") { return Reflect.get(target, prop, receiver); } @@ -1984,6 +2007,7 @@ export function createPluginRegistry(registryParams: PluginRegistryParams) { ): OpenClawPluginApi => { const registrationMode = params.registrationMode ?? "full"; const registrationCapabilities = resolvePluginRegistrationCapabilities(registrationMode); + pluginRuntimeRecordById.set(record.id, record); return buildPluginApi({ id: record.id, name: record.name, diff --git a/src/plugins/runtime/index.ts b/src/plugins/runtime/index.ts index 05dba934b4b..c0179e831b3 100644 --- a/src/plugins/runtime/index.ts +++ b/src/plugins/runtime/index.ts @@ -226,7 +226,12 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): channel: createRuntimeChannel(), events: createRuntimeEvents(), logging: createRuntimeLogging(), - state: { resolveStateDir }, + state: { + resolveStateDir, + openKeyedStore: () => { + throw new Error("openKeyedStore is only available through the plugin runtime proxy."); + }, + }, tasks, taskFlow, } satisfies Omit< @@ -262,7 +267,7 @@ export function createPluginRuntime(_options: CreatePluginRuntimeOptions = {}): defineCachedValue(runtime, "videoGeneration", createRuntimeVideoGeneration); defineCachedValue(runtime, "musicGeneration", createRuntimeMusicGeneration); - return runtime as PluginRuntime; + return runtime as unknown as PluginRuntime; } export type { PluginRuntime } from "./types.js"; diff --git a/src/plugins/runtime/types-core.ts b/src/plugins/runtime/types-core.ts index 3c75c02ed7d..109a1dca911 100644 --- a/src/plugins/runtime/types-core.ts +++ b/src/plugins/runtime/types-core.ts @@ -227,6 +227,9 @@ export type PluginRuntimeCore = { }; state: { resolveStateDir: typeof import("../../config/paths.js").resolveStateDir; + openKeyedStore: ( + options: import("../../plugin-state/plugin-state-store.types.js").OpenKeyedStoreOptions, + ) => import("../../plugin-state/plugin-state-store.types.js").PluginStateKeyedStore; }; tasks: { runs: PluginRuntimeTaskRuns; diff --git a/src/tasks/task-registry.maintenance.ts b/src/tasks/task-registry.maintenance.ts index 2607ab4e7c0..ca5516a81c4 100644 --- a/src/tasks/task-registry.maintenance.ts +++ b/src/tasks/task-registry.maintenance.ts @@ -14,6 +14,10 @@ import type { CronJob, CronStoreFile } from "../cron/types.js"; import { getAgentRunContext } from "../infra/agent-events.js"; import { getSessionBindingService } from "../infra/outbound/session-binding-service.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { + isPluginStateDatabaseOpen, + sweepExpiredPluginStateEntries, +} from "../plugin-state/plugin-state-store.js"; import { parseAgentSessionKey } from "../routing/session-key.js"; import { deriveSessionChatType } from "../sessions/session-chat-type.js"; import { @@ -846,6 +850,13 @@ export async function runTaskRegistryMaintenance(): Promise