From eafe2a8d0bb6a67fe43784bc5d3f4a73b33dc06e Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 4 Jul 2026 01:51:03 -0700 Subject: [PATCH] refactor: consolidate duplicated plugin state and doctor migration plumbing onto SDK seams (#99850) * refactor(plugin-sdk): add createPersistentDedupeCache and migrate channel presence caches * refactor(matrix): adopt SDK approval reaction target store * refactor(plugin-sdk): share doctor legacy-state migration fs helpers * refactor(memory-core): dedupe qmd cache entry envelope validation * chore(plugin-sdk): pin surface budgets for shared dedupe and doctor helpers * test(matrix): use future approval expiry fixtures for reaction targets * test(matrix): use future approval expiry fixtures for reaction targets --- docs/plugins/sdk-migration.md | 4 +- docs/plugins/sdk-subpaths.md | 2 +- extensions/acpx/doctor-contract-api.ts | 43 +---- .../active-memory/doctor-contract-api.ts | 37 +--- extensions/device-pair/doctor-contract-api.ts | 41 ++--- extensions/matrix/doctor-contract-api.ts | 18 +- .../src/approval-handler.runtime.test.ts | 19 +- .../matrix/src/approval-reactions.test.ts | 21 ++- extensions/matrix/src/approval-reactions.ts | 166 ++++------------- .../matrix/monitor/reaction-events.test.ts | 4 +- .../src/mattermost/thread-participation.ts | 129 ++++---------- extensions/memory-core/doctor-contract-api.ts | 61 ++----- .../src/memory/qmd-runtime-cache.ts | 56 +++--- .../memory-wiki/doctor-contract-api.test.ts | 2 +- extensions/memory-wiki/doctor-contract-api.ts | 48 ++--- extensions/msteams/doctor-contract-api.ts | 49 ++--- extensions/msteams/src/sent-message-cache.ts | 167 ++++-------------- extensions/nostr/doctor-contract-api.ts | 39 +--- .../phone-control/doctor-contract-api.ts | 43 ++--- .../src/monitor/inbound-delivery-state.ts | 122 +++---------- extensions/slack/src/sent-thread-cache.ts | 125 +++---------- extensions/voice-call/doctor-contract-api.ts | 36 +--- scripts/plugin-sdk-surface-report.mjs | 4 +- src/plugin-sdk/dedupe-runtime.test.ts | 116 ++++++++++++ src/plugin-sdk/dedupe-runtime.ts | 117 ++++++++++++ src/plugin-sdk/runtime-doctor.ts | 4 + src/plugins/doctor-state-migration-fs.ts | 38 ++++ 27 files changed, 565 insertions(+), 946 deletions(-) create mode 100644 src/plugin-sdk/dedupe-runtime.test.ts create mode 100644 src/plugins/doctor-state-migration-fs.ts diff --git a/docs/plugins/sdk-migration.md b/docs/plugins/sdk-migration.md index 3087ebd14ea6..25191d9ee7a2 100644 --- a/docs/plugins/sdk-migration.md +++ b/docs/plugins/sdk-migration.md @@ -440,7 +440,7 @@ releases. | Heartbeat wake, event, and visibility helpers | `openclaw/plugin-sdk/heartbeat-runtime` | | Pending delivery queue drain | `openclaw/plugin-sdk/delivery-queue-runtime` | | Channel activity telemetry | `openclaw/plugin-sdk/channel-activity-runtime` | - | In-memory dedupe caches | `openclaw/plugin-sdk/dedupe-runtime` | + | In-memory and persistent-backed dedupe caches | `openclaw/plugin-sdk/dedupe-runtime` | | Safe local-file/media path helpers | `openclaw/plugin-sdk/file-access-runtime` | | Dispatcher-aware fetch | `openclaw/plugin-sdk/runtime-fetch` | | Proxy and guarded fetch helpers | `openclaw/plugin-sdk/fetch-runtime` | @@ -567,7 +567,7 @@ releases. | `plugin-sdk/heartbeat-runtime` | Heartbeat helpers | Heartbeat wake, event, and visibility helpers | | `plugin-sdk/delivery-queue-runtime` | Delivery queue helpers | `drainPendingDeliveries` | | `plugin-sdk/channel-activity-runtime` | Channel activity helpers | `recordChannelActivity` | - | `plugin-sdk/dedupe-runtime` | Dedupe helpers | In-memory dedupe caches | + | `plugin-sdk/dedupe-runtime` | Dedupe helpers | In-memory and persistent-backed dedupe caches | | `plugin-sdk/file-access-runtime` | File access helpers | Safe local-file/media path helpers | | `plugin-sdk/transport-ready-runtime` | Transport readiness helpers | `waitForTransportReady` | | `plugin-sdk/exec-approvals-runtime` | Exec approval policy helpers | `loadExecApprovals`, `resolveExecApprovalsFromFile`, `ExecApprovalsFile` | diff --git a/docs/plugins/sdk-subpaths.md b/docs/plugins/sdk-subpaths.md index 92148e0c4faf..1c5b34c941ad 100644 --- a/docs/plugins/sdk-subpaths.md +++ b/docs/plugins/sdk-subpaths.md @@ -290,7 +290,7 @@ usage endpoint failed or returned no usable usage data. | `plugin-sdk/async-lock-runtime` | Process-local async lock helper for small runtime state files | | `plugin-sdk/channel-activity-runtime` | Channel activity telemetry helper | | `plugin-sdk/concurrency-runtime` | Bounded async task concurrency helper | - | `plugin-sdk/dedupe-runtime` | In-memory dedupe cache helpers | + | `plugin-sdk/dedupe-runtime` | In-memory and persistent-backed dedupe cache helpers | | `plugin-sdk/delivery-queue-runtime` | Outbound pending-delivery drain helper | | `plugin-sdk/file-access-runtime` | Safe local-file and media-source path helpers | | `plugin-sdk/heartbeat-runtime` | Heartbeat wake, event, and visibility helpers | diff --git a/extensions/acpx/doctor-contract-api.ts b/extensions/acpx/doctor-contract-api.ts index f64952d7eab0..1968d118d300 100644 --- a/extensions/acpx/doctor-contract-api.ts +++ b/extensions/acpx/doctor-contract-api.ts @@ -1,7 +1,10 @@ // ACPX doctor contract migrates shipped plugin-owned runtime state. import fs from "node:fs/promises"; import path from "node:path"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; import { normalizeAcpxProcessLease, normalizeAcpxProcessLeaseFile, @@ -26,15 +29,6 @@ function resolveLegacyProcessLeasePath(stateDir: string): string { return path.join(stateDir, "acpx", ACPX_LEGACY_PROCESS_LEASE_FILE); } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - async function readLegacyGatewayInstanceId(filePath: string): Promise { try { const value = (await fs.readFile(filePath, "utf8")).trim(); @@ -55,27 +49,6 @@ async function readLegacyOpenProcessLeases(filePath: string): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated ACPX ${params.label} source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived ACPX ${params.label} legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push(`Failed archiving ACPX ${params.label} legacy source: ${String(err)}`); - } -} - export const stateMigrations: PluginDoctorStateMigration[] = [ { id: "acpx-runtime-state-to-plugin-state", @@ -171,9 +144,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ } if (gatewayInstanceId) { - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath: gatewayInstancePath, - label: "gateway-instance-id", + label: "ACPX gateway-instance-id", changes, warnings, }); @@ -193,9 +166,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ changes.push( `Migrated ACPX process leases -> plugin state (${imported} imported, ${alreadyPresent} already present)`, ); - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath: processLeasePath, - label: "process-leases", + label: "ACPX process-leases", changes, warnings, }); diff --git a/extensions/active-memory/doctor-contract-api.ts b/extensions/active-memory/doctor-contract-api.ts index 7d02c87f610b..3ff55e516068 100644 --- a/extensions/active-memory/doctor-contract-api.ts +++ b/extensions/active-memory/doctor-contract-api.ts @@ -5,7 +5,10 @@ import crypto from "node:crypto"; import fs from "node:fs/promises"; import path from "node:path"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; type ActiveMemoryToggleEntry = { sessionKey: string; @@ -25,15 +28,6 @@ function activeMemoryToggleKey(sessionKey: string): string { return crypto.createHash("sha256").update(sessionKey, "utf8").digest("hex"); } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - async function readLegacyToggleEntries(filePath: string): Promise { try { const parsed = JSON.parse(await fs.readFile(filePath, "utf8")) as unknown; @@ -64,27 +58,6 @@ async function readLegacyToggleEntries(filePath: string): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated ${params.label} source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived ${params.label} legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`); - } -} - /** State migrations exposed to OpenClaw doctor for Active Memory. */ export const stateMigrations: PluginDoctorStateMigration[] = [ { @@ -139,7 +112,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ `Migrated ${imported} Active Memory session toggle ${imported === 1 ? "entry" : "entries"} -> plugin state`, ); } - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath, label: "Active Memory session toggles", changes, diff --git a/extensions/device-pair/doctor-contract-api.ts b/extensions/device-pair/doctor-contract-api.ts index 78a4da5baca2..06cbe65eb67d 100644 --- a/extensions/device-pair/doctor-contract-api.ts +++ b/extensions/device-pair/doctor-contract-api.ts @@ -1,7 +1,10 @@ // Device Pair doctor contract migrates shipped plugin-owned state. import fs from "node:fs/promises"; import path from "node:path"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; import { DEVICE_PAIR_NOTIFY_LEGACY_STATE_FILE, DEVICE_PAIR_NOTIFY_SUBSCRIBER_MAX_ENTRIES, @@ -16,15 +19,6 @@ function resolveLegacyNotifyStatePath(stateDir: string): string { return path.join(stateDir, DEVICE_PAIR_NOTIFY_LEGACY_STATE_FILE); } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - async function readLegacyNotifyState(filePath: string): Promise { try { return normalizeLegacyNotifyState(JSON.parse(await fs.readFile(filePath, "utf8")) as unknown); @@ -33,26 +27,6 @@ async function readLegacyNotifyState(filePath: string): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated Device Pair notify-state source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived Device Pair notify-state legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push(`Failed archiving Device Pair notify-state legacy source: ${String(err)}`); - } -} - export const stateMigrations: PluginDoctorStateMigration[] = [ { id: "device-pair-notify-json-to-plugin-state", @@ -99,7 +73,12 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ changes.push( `Migrated Device Pair notify subscribers -> plugin state (${imported} imported, ${alreadyPresent} already present)`, ); - await archiveLegacySource({ filePath, changes, warnings }); + await archiveLegacyStateSource({ + filePath, + label: "Device Pair notify-state", + changes, + warnings, + }); return { changes, warnings }; }, }, diff --git a/extensions/matrix/doctor-contract-api.ts b/extensions/matrix/doctor-contract-api.ts index 810faf9af496..29df71ce46f7 100644 --- a/extensions/matrix/doctor-contract-api.ts +++ b/extensions/matrix/doctor-contract-api.ts @@ -2,7 +2,10 @@ import type { Dirent } from "node:fs"; // Matrix API module exposes the plugin public contract. import fs from "node:fs/promises"; import path from "node:path"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + legacyStateFileExists, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; import { hasMatrixSyncCacheStateInStore, openMatrixSyncCacheStoreOptions, @@ -43,15 +46,6 @@ export { normalizeCompatibilityConfig, legacyConfigRules } from "./src/doctor-co const MATRIX_SYNC_CACHE_FILENAME = "bot-storage.json"; const MATRIX_STORAGE_META_FILENAME = "storage-meta.json"; -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - async function collectLegacyMatrixStateRoots( stateDir: string, filename: string, @@ -105,7 +99,7 @@ async function archiveLegacySyncCache(params: { }): Promise { const sourcePath = path.join(params.storageRootDir, MATRIX_SYNC_CACHE_FILENAME); const archivedPath = `${sourcePath}.migrated`; - if (await fileExists(archivedPath)) { + if (await legacyStateFileExists(archivedPath)) { params.warnings.push( `Left migrated Matrix sync cache in place because ${archivedPath} already exists`, ); @@ -128,7 +122,7 @@ async function archiveLegacyMatrixStateFile(params: { }): Promise { const sourcePath = path.join(params.storageRootDir, params.filename); const archivedPath = `${sourcePath}.migrated`; - if (await fileExists(archivedPath)) { + if (await legacyStateFileExists(archivedPath)) { params.warnings.push( `Left migrated ${params.label} in place because ${archivedPath} already exists`, ); diff --git a/extensions/matrix/src/approval-handler.runtime.test.ts b/extensions/matrix/src/approval-handler.runtime.test.ts index 56f2ebfa6740..b649f28b8193 100644 --- a/extensions/matrix/src/approval-handler.runtime.test.ts +++ b/extensions/matrix/src/approval-handler.runtime.test.ts @@ -7,7 +7,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { matrixApprovalNativeRuntime } from "./approval-handler.runtime.js"; import { clearMatrixApprovalReactionTargetsForTest, - resolveMatrixApprovalReactionTarget, + resolveMatrixApprovalReactionTargetWithPersistence, } from "./approval-reactions.js"; type MatrixDeliverPendingParams = Parameters< @@ -72,6 +72,11 @@ function buildMatrixApprovalRoomTarget( }; } +// Pending approvals expire in the future; the reaction target store TTLs its +// memory layer from `view.expiresAtMs - now`, so epoch-past fixtures would +// evict the mapping before assertions run. +const TEST_APPROVAL_EXPIRES_AT_MS = Date.now() + 5 * 60_000; + function buildExecApprovalView( overrides: Partial = {}, ): MatrixPendingExecApprovalView { @@ -102,7 +107,7 @@ function buildExecApprovalView( command: "/approve req-1 deny", }, ], - expiresAtMs: 1_000, + expiresAtMs: TEST_APPROVAL_EXPIRES_AT_MS, ...overrides, }; } @@ -129,7 +134,7 @@ function buildPluginApprovalView( command: "/approve plugin:req-1 allow-once", }, ], - expiresAtMs: 1_000, + expiresAtMs: TEST_APPROVAL_EXPIRES_AT_MS, ...overrides, }; } @@ -273,7 +278,7 @@ describe("matrixApprovalNativeRuntime", () => { kind: "plugin", title: "Plugin Approval Required", description: "Approve the tool call.", - expiresAtMs: 1_000, + expiresAtMs: TEST_APPROVAL_EXPIRES_AT_MS, metadata: [], allowedDecisions: ["allow-once"], actions: [ @@ -304,7 +309,7 @@ describe("matrixApprovalNativeRuntime", () => { }); const reactMessage = vi.fn().mockImplementation(async () => { expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!room:example.org", eventId: "$approval", reactionKey: "✅", @@ -510,7 +515,7 @@ describe("matrixApprovalNativeRuntime", () => { eventId: "$primary", }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!room:example.org", eventId: "$primary", reactionKey: "✅", @@ -520,7 +525,7 @@ describe("matrixApprovalNativeRuntime", () => { decision: "allow-once", }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!room:example.org", eventId: "$last", reactionKey: "✅", diff --git a/extensions/matrix/src/approval-reactions.test.ts b/extensions/matrix/src/approval-reactions.test.ts index 11e98d4b4c7c..d8bd041c62f9 100644 --- a/extensions/matrix/src/approval-reactions.test.ts +++ b/extensions/matrix/src/approval-reactions.test.ts @@ -5,7 +5,6 @@ import { clearMatrixApprovalReactionTargetsForTest, listMatrixApprovalReactionBindings, registerMatrixApprovalReactionTarget, - resolveMatrixApprovalReactionTarget, resolveMatrixApprovalReactionTargetWithPersistence, unregisterMatrixApprovalReactionTarget, } from "./approval-reactions.js"; @@ -31,7 +30,7 @@ describe("matrix approval reactions", () => { ); }); - it("resolves a registered approval anchor event back to an approval decision", () => { + it("resolves a registered approval anchor event back to an approval decision", async () => { registerMatrixApprovalReactionTarget({ roomId: "!ops:example.org", eventId: "$approval-msg", @@ -40,7 +39,7 @@ describe("matrix approval reactions", () => { }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!ops:example.org", eventId: "$approval-msg", reactionKey: "✅", @@ -50,7 +49,7 @@ describe("matrix approval reactions", () => { decision: "allow-once", }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!ops:example.org", eventId: "$approval-msg", reactionKey: "♾️", @@ -60,7 +59,7 @@ describe("matrix approval reactions", () => { decision: "allow-always", }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!ops:example.org", eventId: "$approval-msg", reactionKey: "❌", @@ -71,7 +70,7 @@ describe("matrix approval reactions", () => { }); }); - it("ignores reactions that are not allowed on the registered approval anchor event", () => { + it("ignores reactions that are not allowed on the registered approval anchor event", async () => { registerMatrixApprovalReactionTarget({ roomId: "!ops:example.org", eventId: "$approval-msg", @@ -80,7 +79,7 @@ describe("matrix approval reactions", () => { }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!ops:example.org", eventId: "$approval-msg", reactionKey: "♾️", @@ -88,7 +87,7 @@ describe("matrix approval reactions", () => { ).toBeNull(); }); - it("stops resolving reactions after the approval anchor event is unregistered", () => { + it("stops resolving reactions after the approval anchor event is unregistered", async () => { registerMatrixApprovalReactionTarget({ roomId: "!ops:example.org", eventId: "$approval-msg", @@ -101,7 +100,7 @@ describe("matrix approval reactions", () => { }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!ops:example.org", eventId: "$approval-msg", reactionKey: "✅", @@ -158,7 +157,7 @@ describe("matrix approval reactions", () => { expect(lookup).toHaveBeenCalledWith("!ops:example.org:$approval-msg-2"); }); - it("falls back to in-memory approval reaction targets when persistent state cannot open", () => { + it("falls back to in-memory approval reaction targets when persistent state cannot open", async () => { const warn = vi.fn(); setMatrixRuntime({ state: { @@ -177,7 +176,7 @@ describe("matrix approval reactions", () => { }); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!ops:example.org", eventId: "$approval-msg-3", reactionKey: "❌", diff --git a/extensions/matrix/src/approval-reactions.ts b/extensions/matrix/src/approval-reactions.ts index 4b8c20702736..f7d481918599 100644 --- a/extensions/matrix/src/approval-reactions.ts +++ b/extensions/matrix/src/approval-reactions.ts @@ -1,7 +1,10 @@ // Matrix plugin module implements approval reactions behavior. +import { createApprovalReactionTargetStore } from "openclaw/plugin-sdk/approval-reaction-runtime"; import type { ExecApprovalReplyDecision } from "openclaw/plugin-sdk/approval-runtime"; import { getOptionalMatrixRuntime } from "./runtime.js"; +// Matrix keeps its own reaction emoji set (checkmark/cross render reliably across +// Matrix clients), so decision resolution stays local instead of using the SDK bindings. const MATRIX_APPROVAL_REACTION_META = { "allow-once": { emoji: "✅", @@ -43,34 +46,6 @@ type MatrixApprovalReactionTarget = { allowedDecisions: readonly ExecApprovalReplyDecision[]; }; -type PersistedMatrixApprovalReactionTarget = { - version: 1; - target: MatrixApprovalReactionTarget; -}; - -type MatrixApprovalReactionStore = { - register( - key: string, - value: PersistedMatrixApprovalReactionTarget, - opts?: { ttlMs?: number }, - ): Promise; - lookup(key: string): Promise; - delete(key: string): Promise; -}; - -const matrixApprovalReactionTargets = new Map(); -let persistentStore: MatrixApprovalReactionStore | undefined; -let persistentStoreDisabled = false; - -function buildReactionTargetKey(roomId: string, eventId: string): string | null { - const normalizedRoomId = roomId.trim(); - const normalizedEventId = eventId.trim(); - if (!normalizedRoomId || !normalizedEventId) { - return null; - } - return `${normalizedRoomId}:${normalizedEventId}`; -} - function reportPersistentApprovalReactionError(error: unknown): void { try { getOptionalMatrixRuntime() @@ -81,85 +56,34 @@ function reportPersistentApprovalReactionError(error: unknown): void { } } -function disablePersistentApprovalReactionStore(error: unknown): void { - persistentStoreDisabled = true; - persistentStore = undefined; - reportPersistentApprovalReactionError(error); -} - -function getPersistentApprovalReactionStore(): MatrixApprovalReactionStore | undefined { - if (persistentStoreDisabled) { - return undefined; - } - if (persistentStore) { - return persistentStore; - } - const runtime = getOptionalMatrixRuntime(); - if (!runtime) { - return undefined; - } - try { - persistentStore = runtime.state.openKeyedStore({ - namespace: PERSISTENT_NAMESPACE, - maxEntries: PERSISTENT_MAX_ENTRIES, - defaultTtlMs: DEFAULT_REACTION_TARGET_TTL_MS, - }); - return persistentStore; - } catch (error) { - disablePersistentApprovalReactionStore(error); - return undefined; - } -} - -function readPersistedTarget(value: unknown): MatrixApprovalReactionTarget | null { - const persisted = value as PersistedMatrixApprovalReactionTarget | undefined; - if ( - persisted?.version !== 1 || - !persisted.target || - typeof persisted.target.approvalId !== "string" || - !Array.isArray(persisted.target.allowedDecisions) - ) { +function readPersistedTarget(target: unknown): MatrixApprovalReactionTarget | null { + const value = target as Partial | null | undefined; + if (!value || typeof value.approvalId !== "string" || !Array.isArray(value.allowedDecisions)) { return null; } - return persisted.target; + return { + approvalId: value.approvalId, + allowedDecisions: value.allowedDecisions, + }; } -function rememberPersistentApprovalReactionTarget(params: { - key: string; - target: MatrixApprovalReactionTarget; - ttlMs?: number; -}): void { - const ttlMs = params.ttlMs == null ? DEFAULT_REACTION_TARGET_TTL_MS : Math.max(1, params.ttlMs); - const store = getPersistentApprovalReactionStore(); - if (!store) { - return; - } - void store - .register(params.key, { version: 1, target: params.target }, { ttlMs }) - .catch(disablePersistentApprovalReactionStore); -} +const matrixApprovalReactionTargets = + createApprovalReactionTargetStore({ + namespace: PERSISTENT_NAMESPACE, + maxEntries: PERSISTENT_MAX_ENTRIES, + defaultTtlMs: DEFAULT_REACTION_TARGET_TTL_MS, + openStore: (storeParams) => getOptionalMatrixRuntime()?.state.openKeyedStore(storeParams), + logPersistentError: reportPersistentApprovalReactionError, + readPersistedTarget, + }); -function forgetPersistentApprovalReactionTarget(key: string): void { - const store = getPersistentApprovalReactionStore(); - if (!store) { - return; - } - void store.delete(key).catch(disablePersistentApprovalReactionStore); -} - -async function lookupPersistentApprovalReactionTarget( - key: string, -): Promise { - const store = getPersistentApprovalReactionStore(); - if (!store) { - return null; - } - try { - return readPersistedTarget(await store.lookup(key)); - } catch (error) { - disablePersistentApprovalReactionStore(error); +function buildReactionTargetKey(roomId: string, eventId: string): string | null { + const normalizedRoomId = roomId.trim(); + const normalizedEventId = eventId.trim(); + if (!normalizedRoomId || !normalizedEventId) { return null; } + return `${normalizedRoomId}:${normalizedEventId}`; } export function listMatrixApprovalReactionBindings( @@ -225,16 +149,11 @@ export function registerMatrixApprovalReactionTarget(params: { if (!key || !approvalId || allowedDecisions.length === 0) { return; } - const target = { - approvalId, - allowedDecisions, - }; - matrixApprovalReactionTargets.set(key, target); - rememberPersistentApprovalReactionTarget({ + matrixApprovalReactionTargets.register( key, - target, - ttlMs: params.ttlMs, - }); + { approvalId, allowedDecisions }, + { ttlMs: params.ttlMs }, + ); } export function unregisterMatrixApprovalReactionTarget(params: { @@ -246,7 +165,6 @@ export function unregisterMatrixApprovalReactionTarget(params: { return; } matrixApprovalReactionTargets.delete(key); - forgetPersistentApprovalReactionTarget(key); } function resolveTarget(params: { @@ -270,21 +188,6 @@ function resolveTarget(params: { }; } -export function resolveMatrixApprovalReactionTarget(params: { - roomId: string; - eventId: string; - reactionKey: string; -}): MatrixApprovalReactionResolution | null { - const key = buildReactionTargetKey(params.roomId, params.eventId); - if (!key) { - return null; - } - return resolveTarget({ - target: matrixApprovalReactionTargets.get(key), - reactionKey: params.reactionKey, - }); -} - export async function resolveMatrixApprovalReactionTargetWithPersistence(params: { roomId: string; eventId: string; @@ -294,21 +197,12 @@ export async function resolveMatrixApprovalReactionTargetWithPersistence(params: if (!key) { return null; } - const inMemory = resolveTarget({ - target: matrixApprovalReactionTargets.get(key), - reactionKey: params.reactionKey, - }); - if (inMemory) { - return inMemory; - } return resolveTarget({ - target: await lookupPersistentApprovalReactionTarget(key), + target: await matrixApprovalReactionTargets.lookup(key), reactionKey: params.reactionKey, }); } export function clearMatrixApprovalReactionTargetsForTest(): void { - matrixApprovalReactionTargets.clear(); - persistentStore = undefined; - persistentStoreDisabled = false; + matrixApprovalReactionTargets.clearForTest(); } diff --git a/extensions/matrix/src/matrix/monitor/reaction-events.test.ts b/extensions/matrix/src/matrix/monitor/reaction-events.test.ts index b317a364b64d..5c96b48f1b68 100644 --- a/extensions/matrix/src/matrix/monitor/reaction-events.test.ts +++ b/extensions/matrix/src/matrix/monitor/reaction-events.test.ts @@ -3,7 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { clearMatrixApprovalReactionTargetsForTest, registerMatrixApprovalReactionTarget, - resolveMatrixApprovalReactionTarget, + resolveMatrixApprovalReactionTargetWithPersistence, } from "../../approval-reactions.js"; import type { CoreConfig } from "../../types.js"; import { handleInboundMatrixReaction } from "./reaction-events.js"; @@ -295,7 +295,7 @@ describe("matrix approval reactions", () => { expect(client.getEvent).not.toHaveBeenCalled(); expect( - resolveMatrixApprovalReactionTarget({ + await resolveMatrixApprovalReactionTargetWithPersistence({ roomId: "!ops:example.org", eventId: "$approval-msg", reactionKey: "❌", diff --git a/extensions/mattermost/src/mattermost/thread-participation.ts b/extensions/mattermost/src/mattermost/thread-participation.ts index b4df9f8f8c9a..d1cd38f076c3 100644 --- a/extensions/mattermost/src/mattermost/thread-participation.ts +++ b/extensions/mattermost/src/mattermost/thread-participation.ts @@ -1,11 +1,10 @@ // Mattermost plugin module implements thread participation cache behavior. -import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; +import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; import { getOptionalMattermostRuntime } from "../runtime.js"; /** - * In-memory + persisted cache of Mattermost threads the bot has replied in. - * Lets the bot auto-respond to thread follow-ups without a re-mention after its - * first visible reply. Mirrors the Slack `sent-thread-cache` dual-layer pattern. + * Cache of Mattermost threads the bot has replied in. Lets the bot auto-respond + * to thread follow-ups without a re-mention after its first visible reply. */ const TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days @@ -18,99 +17,37 @@ type MattermostThreadParticipationRecord = { repliedAt: number; }; -type MattermostThreadParticipationStore = { - register( - key: string, - value: MattermostThreadParticipationRecord, - opts?: { ttlMs?: number }, - ): Promise; - lookup(key: string): Promise; -}; - /** * Keep thread participation shared across bundled chunks so thread auto-reply * gating does not diverge between the inbound-gate and reply-dispatch paths. */ const MATTERMOST_THREAD_PARTICIPATION_KEY = Symbol.for("openclaw.mattermostThreadParticipation"); -const threadParticipation = resolveGlobalDedupeCache(MATTERMOST_THREAD_PARTICIPATION_KEY, { +const threadParticipation = createPersistentDedupeCache({ + globalKey: MATTERMOST_THREAD_PARTICIPATION_KEY, ttlMs: TTL_MS, maxSize: MAX_ENTRIES, + persistent: { + namespace: PERSISTENT_NAMESPACE, + maxEntries: PERSISTENT_MAX_ENTRIES, + openStore: (options) => getOptionalMattermostRuntime()?.state.openKeyedStore(options), + logError: (error) => { + try { + getOptionalMattermostRuntime() + ?.logging.getChildLogger({ plugin: "mattermost", feature: "thread-participation-state" }) + .warn("Mattermost persistent thread participation state failed", { + error: String(error), + }); + } catch { + // Best effort only: persistent state must never break Mattermost message handling. + } + }, + }, }); -let persistentStore: MattermostThreadParticipationStore | undefined; -let persistentStoreDisabled = false; - function makeKey(accountId: string, channelId: string, threadRootId: string): string { return `${accountId}:${channelId}:${threadRootId}`; } -function reportPersistentThreadParticipationError(error: unknown): void { - try { - getOptionalMattermostRuntime() - ?.logging.getChildLogger({ plugin: "mattermost", feature: "thread-participation-state" }) - .warn("Mattermost persistent thread participation state failed", { error: String(error) }); - } catch { - // Best effort only: persistent state must never break Mattermost message handling. - } -} - -function disablePersistentThreadParticipation(error: unknown): void { - persistentStoreDisabled = true; - persistentStore = undefined; - reportPersistentThreadParticipationError(error); -} - -function getPersistentThreadParticipationStore(): MattermostThreadParticipationStore | undefined { - if (persistentStoreDisabled) { - return undefined; - } - if (persistentStore) { - return persistentStore; - } - const runtime = getOptionalMattermostRuntime(); - if (!runtime) { - return undefined; - } - try { - persistentStore = runtime.state.openKeyedStore({ - namespace: PERSISTENT_NAMESPACE, - maxEntries: PERSISTENT_MAX_ENTRIES, - defaultTtlMs: TTL_MS, - }); - return persistentStore; - } catch (error) { - disablePersistentThreadParticipation(error); - return undefined; - } -} - -function rememberPersistentThreadParticipation(params: { key: string; agentId?: string }): void { - const store = getPersistentThreadParticipationStore(); - if (!store) { - return; - } - void store - .register(params.key, { - // Stored for future per-agent thread routing; current reads only need presence. - ...(params.agentId ? { agentId: params.agentId } : {}), - repliedAt: Date.now(), - }) - .catch(disablePersistentThreadParticipation); -} - -async function lookupPersistentThreadParticipation(key: string): Promise { - const store = getPersistentThreadParticipationStore(); - if (!store) { - return false; - } - try { - return Boolean(await store.lookup(key)); - } catch (error) { - disablePersistentThreadParticipation(error); - return false; - } -} - export function recordMattermostThreadParticipation( accountId: string, channelId: string, @@ -120,9 +57,11 @@ export function recordMattermostThreadParticipation( if (!accountId || !channelId || !threadRootId) { return; } - const key = makeKey(accountId, channelId, threadRootId); - threadParticipation.check(key); - rememberPersistentThreadParticipation({ key, agentId: opts?.agentId }); + void threadParticipation.register(makeKey(accountId, channelId, threadRootId), { + // Stored for future per-agent thread routing; current reads only need presence. + ...(opts?.agentId ? { agentId: opts.agentId } : {}), + repliedAt: Date.now(), + }); } export async function hasMattermostThreadParticipationWithPersistence(params: { @@ -133,19 +72,11 @@ export async function hasMattermostThreadParticipationWithPersistence(params: { if (!params.accountId || !params.channelId || !params.threadRootId) { return false; } - const key = makeKey(params.accountId, params.channelId, params.threadRootId); - if (threadParticipation.peek(key)) { - return true; - } - const found = await lookupPersistentThreadParticipation(key); - if (found) { - threadParticipation.check(key); - } - return found; + return await threadParticipation.lookup( + makeKey(params.accountId, params.channelId, params.threadRootId), + ); } export function clearMattermostThreadParticipationCache(): void { - threadParticipation.clear(); - persistentStore = undefined; - persistentStoreDisabled = false; + threadParticipation.clearForTest(); } diff --git a/extensions/memory-core/doctor-contract-api.ts b/extensions/memory-core/doctor-contract-api.ts index 6b3b02010a69..a6b8f9d0939d 100644 --- a/extensions/memory-core/doctor-contract-api.ts +++ b/extensions/memory-core/doctor-contract-api.ts @@ -18,7 +18,11 @@ import { } from "openclaw/plugin-sdk/memory-core-host-engine-storage"; import { resolveMemoryDreamingWorkspaces } from "openclaw/plugin-sdk/memory-core-host-status"; import { normalizeAgentId } from "openclaw/plugin-sdk/routing"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + legacyStateFileExists, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; import { ensureOpenClawAgentDatabaseSchema, resolveOpenClawAgentSqlitePath, @@ -724,7 +728,7 @@ async function collectLegacyMemorySidecarSources(params: { async function addSource(agentId: string, legacyPath: string): Promise { const normalizedPath = path.resolve(legacyPath); const key = `${agentId}\0${normalizedPath}`; - if (seen.has(key) || !(await fileExists(normalizedPath))) { + if (seen.has(key) || !(await legacyStateFileExists(normalizedPath))) { return; } seen.add(key); @@ -759,7 +763,7 @@ async function archiveLegacyMemorySidecar(params: { await Promise.all( LEGACY_MEMORY_SIDECAR_SUFFIXES.map(async (suffix) => { const filePath = `${params.source.legacyPath}${suffix}`; - return (await fileExists(filePath)) ? filePath : null; + return (await legacyStateFileExists(filePath)) ? filePath : null; }), ) ).filter((filePath): filePath is string => filePath !== null); @@ -770,7 +774,7 @@ async function archiveLegacyMemorySidecar(params: { await Promise.all( existingSources.map(async (sourcePath) => { const archivedPath = `${sourcePath}.migrated`; - return (await fileExists(archivedPath)) ? archivedPath : null; + return (await legacyStateFileExists(archivedPath)) ? archivedPath : null; }), ) ).filter((filePath): filePath is string => filePath !== null); @@ -789,7 +793,10 @@ async function archiveLegacyMemorySidecar(params: { } catch (err) { for (const entry of renamed.toReversed()) { try { - if ((await fileExists(entry.archivedPath)) && !(await fileExists(entry.sourcePath))) { + if ( + (await legacyStateFileExists(entry.archivedPath)) && + !(await legacyStateFileExists(entry.sourcePath)) + ) { await fs.rename(entry.archivedPath, entry.sourcePath); } } catch (rollbackErr) { @@ -827,7 +834,7 @@ async function preserveLegacyMemorySidecarRetryPath(params: { await Promise.all( LEGACY_MEMORY_SIDECAR_SUFFIXES.map(async (suffix) => { const targetPath = `${retryPath}${suffix}`; - return (await fileExists(targetPath)) ? targetPath : null; + return (await legacyStateFileExists(targetPath)) ? targetPath : null; }), ) ).filter((targetPath): targetPath is string => targetPath !== null); @@ -843,14 +850,14 @@ async function preserveLegacyMemorySidecarRetryPath(params: { .digest("hex") .slice(0, 12)}.sqlite`, ); - if (await fileExists(targetBasePath)) { + if (await legacyStateFileExists(targetBasePath)) { return; } const existingSources = ( await Promise.all( LEGACY_MEMORY_SIDECAR_SUFFIXES.map(async (suffix) => { const sourcePath = `${params.source.legacyPath}${suffix}`; - return (await fileExists(sourcePath)) + return (await legacyStateFileExists(sourcePath)) ? { sourcePath, targetPath: `${targetBasePath}${suffix}` } : null; }), @@ -985,42 +992,10 @@ function resolveConfiguredWorkspaces(config: unknown, env: NodeJS.ProcessEnv): s ).map((entry) => entry.workspaceDir); } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - async function readJsonFile(filePath: string): Promise { return JSON.parse(await fs.readFile(filePath, "utf8")); } -async function archiveLegacySource(params: { - filePath: string; - label: string; - changes: string[]; - warnings: string[]; -}): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated Memory Core ${params.label} source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived Memory Core ${params.label} legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push( - `Failed archiving Memory Core ${params.label} legacy source: ${String(err)}`, - ); - } -} - async function collectLegacySources( config: unknown, env: NodeJS.ProcessEnv, @@ -1035,7 +1010,7 @@ async function collectLegacySources( ]; for (const candidate of candidates) { const filePath = path.join(workspaceDir, candidate.relativePath); - if (await fileExists(filePath)) { + if (await legacyStateFileExists(filePath)) { sources.push({ workspaceDir, label: candidate.label, filePath }); } } @@ -1200,9 +1175,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ changes.push( `Migrated Memory Core ${source.label} -> SQLite plugin state (${imported} row(s))`, ); - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath: source.filePath, - label: source.label, + label: `Memory Core ${source.label}`, changes, warnings, }); diff --git a/extensions/memory-core/src/memory/qmd-runtime-cache.ts b/extensions/memory-core/src/memory/qmd-runtime-cache.ts index 8e1be67ce33b..9732d4a690f6 100644 --- a/extensions/memory-core/src/memory/qmd-runtime-cache.ts +++ b/extensions/memory-core/src/memory/qmd-runtime-cache.ts @@ -194,11 +194,19 @@ function multiCollectionProbeEntryKey(params: QmdRuntimeMultiCollectionProbeCach ); } -function normalizeCollectionValidationEntry( +type QmdRuntimeCacheEnvelope = { + record: Record; + createdAtMs: number; + expiresAtMs: number; + keyHash: string; +}; + +/** Validates the shared cache-entry envelope: version, expiry window, and key hash. */ +function normalizeCacheEntryEnvelope( value: unknown, nowMs: number, expectedKeyHash: string, -): QmdRuntimeCacheCollectionValidationEntry | undefined { +): QmdRuntimeCacheEnvelope | undefined { if (typeof value !== "object" || value === null) { return undefined; } @@ -229,6 +237,20 @@ function normalizeCollectionValidationEntry( return undefined; } + return { record, createdAtMs, expiresAtMs, keyHash }; +} + +function normalizeCollectionValidationEntry( + value: unknown, + nowMs: number, + expectedKeyHash: string, +): QmdRuntimeCacheCollectionValidationEntry | undefined { + const envelope = normalizeCacheEntryEnvelope(value, nowMs, expectedKeyHash); + if (!envelope) { + return undefined; + } + const { record, createdAtMs, expiresAtMs, keyHash } = envelope; + const validation = record.validation; if (typeof validation !== "object" || validation === null) { return undefined; @@ -262,35 +284,11 @@ function normalizeMultiCollectionProbeEntry( nowMs: number, expectedKeyHash: string, ): QmdRuntimeCacheMultiCollectionProbeEntry | undefined { - if (typeof value !== "object" || value === null) { - return undefined; - } - const record = value as Record; - if (record.version !== QMD_RUNTIME_CACHE_ENTRY_VERSION) { - return undefined; - } - - const createdAtMs = - typeof record.createdAtMs === "number" - ? Math.max(0, Math.floor(record.createdAtMs)) - : Number.NaN; - const expiresAtMs = - typeof record.expiresAtMs === "number" - ? Math.max(0, Math.floor(record.expiresAtMs)) - : Number.NaN; - if ( - !Number.isFinite(createdAtMs) || - !Number.isFinite(expiresAtMs) || - !Number.isFinite(nowMs) || - nowMs >= expiresAtMs - ) { - return undefined; - } - - const keyHash = normalizeText(typeof record.keyHash === "string" ? record.keyHash : ""); - if (keyHash !== expectedKeyHash) { + const envelope = normalizeCacheEntryEnvelope(value, nowMs, expectedKeyHash); + if (!envelope) { return undefined; } + const { record, createdAtMs, expiresAtMs, keyHash } = envelope; const probe = record.multiCollectionProbe; if (typeof probe !== "object" || probe === null) { diff --git a/extensions/memory-wiki/doctor-contract-api.test.ts b/extensions/memory-wiki/doctor-contract-api.test.ts index 5abc1349c2de..d16413b8219b 100644 --- a/extensions/memory-wiki/doctor-contract-api.test.ts +++ b/extensions/memory-wiki/doctor-contract-api.test.ts @@ -166,7 +166,7 @@ describe("memory-wiki doctor source sync migration", () => { await expect(migration.migrateLegacyState(params)).resolves.toEqual({ changes: [ "Migrated Memory Wiki import runs -> plugin state (1 imported, 0 existing)", - expect.stringContaining("Archived Memory Wiki import-run legacy record ->"), + expect.stringContaining("Archived Memory Wiki import-run legacy source ->"), ], warnings: [], }); diff --git a/extensions/memory-wiki/doctor-contract-api.ts b/extensions/memory-wiki/doctor-contract-api.ts index adc4731cf4b1..0252672cb72b 100644 --- a/extensions/memory-wiki/doctor-contract-api.ts +++ b/extensions/memory-wiki/doctor-contract-api.ts @@ -2,7 +2,11 @@ import fs from "node:fs/promises"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/plugin-entry"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + legacyStateFileExists, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; import { resolveMemoryWikiConfig, type MemoryWikiPluginConfig } from "./src/config.js"; export { legacyConfigRules, normalizeCompatibilityConfig } from "./src/config-compat.js"; import { isRecord } from "openclaw/plugin-sdk/string-coerce-runtime"; @@ -49,36 +53,6 @@ function resolveConfiguredVaultRoots(params: { return [resolved.vault.path]; } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - -async function archiveLegacySource(params: { - filePath: string; - label: string; - changes: string[]; - warnings: string[]; -}): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated ${params.label} in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived ${params.label} -> ${archivedPath}`); - } catch (err) { - params.warnings.push(`Failed archiving ${params.label}: ${String(err)}`); - } -} - async function archiveLegacyImportRunRecords(params: { vaultRoot: string; changes: string[]; @@ -97,9 +71,9 @@ async function archiveLegacyImportRunRecords(params: { if (!entry.isFile() || !entry.name.endsWith(".json")) { continue; } - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath: path.join(importRunsDir, entry.name), - label: "Memory Wiki import-run legacy record", + label: "Memory Wiki import-run", changes: params.changes, warnings: params.warnings, }); @@ -128,7 +102,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ const filePath = resolveMemoryWikiSourceSyncStatePath(vaultRoot); const state = await readLegacyMemoryWikiSourceSyncState(vaultRoot); const count = Object.keys(state.entries).length; - if (count === 0 || !(await fileExists(filePath))) { + if (count === 0 || !(await legacyStateFileExists(filePath))) { continue; } previews.push( @@ -146,7 +120,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ env: params.env, })) { const filePath = resolveMemoryWikiSourceSyncStatePath(vaultRoot); - if (!(await fileExists(filePath))) { + if (!(await legacyStateFileExists(filePath))) { continue; } const state = await readLegacyMemoryWikiSourceSyncState(vaultRoot); @@ -176,9 +150,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ changes.push( `Migrated Memory Wiki source sync -> plugin state (${importedCount} imported, ${existingCount} existing)`, ); - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath, - label: "Memory Wiki source-sync legacy source", + label: "Memory Wiki source-sync", changes, warnings, }); diff --git a/extensions/msteams/doctor-contract-api.ts b/extensions/msteams/doctor-contract-api.ts index 165d3fe064d8..4071e58b36fd 100644 --- a/extensions/msteams/doctor-contract-api.ts +++ b/extensions/msteams/doctor-contract-api.ts @@ -3,7 +3,10 @@ import crypto from "node:crypto"; import type { Dirent } from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime"; import { isRecord } from "openclaw/plugin-sdk/string-coerce-runtime"; import { normalizeStoredConversationId } from "./src/conversation-store-helpers.js"; @@ -133,15 +136,6 @@ function listCandidateStorePaths(params: { return [...paths]; } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - function resolveStateFilePath(stateDir: string, filename: string): string { return path.join(stateDir, filename); } @@ -265,28 +259,6 @@ async function listLegacyLearningFiles( return files; } -async function archiveLegacySource(params: { - filePath: string; - label?: string; - changes: string[]; - warnings: string[]; -}): Promise { - const archivedPath = `${params.filePath}.migrated`; - const label = params.label ?? "Microsoft Teams feedback-learning"; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated ${label} source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived ${label} legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push(`Failed archiving ${label} legacy source: ${String(err)}`); - } -} - function mergeLearnings(legacy: string[], existing?: FeedbackLearningEntry): string[] { const seen = new Set(); const merged: string[] = []; @@ -347,7 +319,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ changes.push( `Migrated ${imported} ${MSTEAMS_PLUGIN_ID} conversation ${imported === 1 ? "entry" : "entries"} -> plugin state`, ); - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath, label: `${MSTEAMS_PLUGIN_ID} conversation`, changes, @@ -422,7 +394,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ changes.push( `Migrated ${imported} ${MSTEAMS_PLUGIN_ID} poll ${imported === 1 ? "entry" : "entries"} -> plugin state`, ); - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath, label: `${MSTEAMS_PLUGIN_ID} poll`, changes, @@ -486,7 +458,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ `Skipped ${skipped} malformed ${MSTEAMS_PLUGIN_ID} SSO token ${skipped === 1 ? "entry" : "entries"} during migration`, ); } - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath, label: `${MSTEAMS_PLUGIN_ID} SSO-token`, changes, @@ -555,7 +527,12 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ updatedAt: Date.now(), }); imported++; - await archiveLegacySource({ filePath: file.filePath, changes, warnings }); + await archiveLegacyStateSource({ + filePath: file.filePath, + label: "Microsoft Teams feedback-learning", + changes, + warnings, + }); } if (imported > 0) { changes.unshift( diff --git a/extensions/msteams/src/sent-message-cache.ts b/extensions/msteams/src/sent-message-cache.ts index 4ff25602a420..83fccdb519ba 100644 --- a/extensions/msteams/src/sent-message-cache.ts +++ b/extensions/msteams/src/sent-message-cache.ts @@ -1,7 +1,9 @@ // Msteams plugin module implements sent message cache behavior. +import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; import { getOptionalMSTeamsRuntime } from "./runtime.js"; const TTL_MS = 24 * 60 * 60 * 1000; +const MAX_ENTRIES = 20_000; const PERSISTENT_MAX_ENTRIES = 1000; const PERSISTENT_NAMESPACE = "msteams.sent-messages"; const MSTEAMS_SENT_MESSAGES_KEY = Symbol.for("openclaw.msteamsSentMessages"); @@ -10,144 +12,45 @@ type MSTeamsSentMessageRecord = { sentAt: number; }; -type MSTeamsSentMessageStore = { - register(key: string, value: MSTeamsSentMessageRecord, opts?: { ttlMs?: number }): Promise; - lookup(key: string): Promise; -}; +const sentMessages = createPersistentDedupeCache({ + globalKey: MSTEAMS_SENT_MESSAGES_KEY, + ttlMs: TTL_MS, + maxSize: MAX_ENTRIES, + persistent: { + namespace: PERSISTENT_NAMESPACE, + maxEntries: PERSISTENT_MAX_ENTRIES, + openStore: (options) => getOptionalMSTeamsRuntime()?.state.openKeyedStore(options), + logError: (error) => { + try { + getOptionalMSTeamsRuntime() + ?.logging.getChildLogger({ plugin: "msteams", feature: "sent-message-state" }) + .warn("Microsoft Teams persistent sent-message state failed", { error: String(error) }); + } catch { + // Best effort only: persistent state must never break Teams routing. + } + }, + // Re-prime with the original send time so restored entries keep their TTL window. + readTimestamp: (record) => record.sentAt, + }, +}); -let sentMessageCache: Map> | undefined; -let persistentStore: MSTeamsSentMessageStore | undefined; -let persistentStoreDisabled = false; - -function getSentMessageCache(): Map> { - if (!sentMessageCache) { - const globalStore = globalThis as Record; - sentMessageCache = - (globalStore[MSTEAMS_SENT_MESSAGES_KEY] as Map> | undefined) ?? - new Map>(); - globalStore[MSTEAMS_SENT_MESSAGES_KEY] = sentMessageCache; - } - return sentMessageCache; -} - -function makePersistentKey(conversationId: string, messageId: string): string { +function makeKey(conversationId: string, messageId: string): string { return `${conversationId}:${messageId}`; } -function reportPersistentSentMessageError(error: unknown): void { - try { - getOptionalMSTeamsRuntime() - ?.logging.getChildLogger({ plugin: "msteams", feature: "sent-message-state" }) - .warn("Microsoft Teams persistent sent-message state failed", { error: String(error) }); - } catch { - // Best effort only: persistent state must never break Teams routing. - } -} - -function disablePersistentSentMessageStore(error: unknown): void { - persistentStoreDisabled = true; - persistentStore = undefined; - reportPersistentSentMessageError(error); -} - -function getPersistentSentMessageStore(): MSTeamsSentMessageStore | undefined { - if (persistentStoreDisabled) { - return undefined; - } - if (persistentStore) { - return persistentStore; - } - const runtime = getOptionalMSTeamsRuntime(); - if (!runtime) { - return undefined; - } - try { - persistentStore = runtime.state.openKeyedStore({ - namespace: PERSISTENT_NAMESPACE, - maxEntries: PERSISTENT_MAX_ENTRIES, - defaultTtlMs: TTL_MS, - }); - return persistentStore; - } catch (error) { - disablePersistentSentMessageStore(error); - return undefined; - } -} - -function cleanupExpired(scopeKey: string, entry: Map, now: number): void { - for (const [id, timestamp] of entry) { - if (now - timestamp > TTL_MS) { - entry.delete(id); - } - } - if (entry.size === 0) { - getSentMessageCache().delete(scopeKey); - } -} - -function rememberSentMessageInMemory( - conversationId: string, - messageId: string, - sentAt: number, -): void { - const store = getSentMessageCache(); - let entry = store.get(conversationId); - if (!entry) { - entry = new Map(); - store.set(conversationId, entry); - } - entry.set(messageId, sentAt); - if (entry.size > 200) { - cleanupExpired(conversationId, entry, sentAt); - } -} - -function rememberPersistentSentMessage(params: { - conversationId: string; - messageId: string; - sentAt: number; -}): void { - const store = getPersistentSentMessageStore(); - if (!store) { - return; - } - void store - .register(makePersistentKey(params.conversationId, params.messageId), { sentAt: params.sentAt }) - .catch(disablePersistentSentMessageStore); -} - -async function lookupPersistentSentMessage(params: { - conversationId: string; - messageId: string; -}): Promise { - const store = getPersistentSentMessageStore(); - if (!store) { - return undefined; - } - try { - return (await store.lookup(makePersistentKey(params.conversationId, params.messageId)))?.sentAt; - } catch (error) { - disablePersistentSentMessageStore(error); - return undefined; - } -} - export function recordMSTeamsSentMessage(conversationId: string, messageId: string): void { if (!conversationId || !messageId) { return; } - const now = Date.now(); - rememberSentMessageInMemory(conversationId, messageId, now); - rememberPersistentSentMessage({ conversationId, messageId, sentAt: now }); + const sentAt = Date.now(); + void sentMessages.register(makeKey(conversationId, messageId), { sentAt }, { at: sentAt }); } export function wasMSTeamsMessageSent(conversationId: string, messageId: string): boolean { - const entry = getSentMessageCache().get(conversationId); - if (!entry) { + if (!conversationId || !messageId) { return false; } - cleanupExpired(conversationId, entry, Date.now()); - return entry.has(messageId); + return sentMessages.peek(makeKey(conversationId, messageId)); } export async function wasMSTeamsMessageSentWithPersistence(params: { @@ -157,19 +60,9 @@ export async function wasMSTeamsMessageSentWithPersistence(params: { if (!params.conversationId || !params.messageId) { return false; } - if (wasMSTeamsMessageSent(params.conversationId, params.messageId)) { - return true; - } - const sentAt = await lookupPersistentSentMessage(params); - if (sentAt == null) { - return false; - } - rememberSentMessageInMemory(params.conversationId, params.messageId, sentAt); - return wasMSTeamsMessageSent(params.conversationId, params.messageId); + return await sentMessages.lookup(makeKey(params.conversationId, params.messageId)); } export function clearMSTeamsSentMessageCache(): void { - getSentMessageCache().clear(); - persistentStore = undefined; - persistentStoreDisabled = false; + sentMessages.clearForTest(); } diff --git a/extensions/nostr/doctor-contract-api.ts b/extensions/nostr/doctor-contract-api.ts index dbb6e1aec55f..56436b083311 100644 --- a/extensions/nostr/doctor-contract-api.ts +++ b/extensions/nostr/doctor-contract-api.ts @@ -2,7 +2,10 @@ import type { Dirent } from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; import { normalizeNostrStateAccountId } from "./src/state-account-id.js"; type NostrBusState = { @@ -75,15 +78,6 @@ function parseProfileState(value: unknown): NostrProfileState | null { }; } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - async function readJsonFile(filePath: string): Promise { return JSON.parse(await fs.readFile(filePath, "utf8")) as unknown; } @@ -121,27 +115,6 @@ async function listLegacyFiles(params: { return files; } -async function archiveLegacySource(params: { - filePath: string; - label: string; - changes: string[]; - warnings: string[]; -}): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated ${params.label} source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived ${params.label} legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`); - } -} - async function ensureStoreCapacity(params: { files: Array<{ accountId: string }>; store: { entries: () => Promise> }; @@ -210,7 +183,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ existingKeys.add(file.accountId); imported++; } - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath: file.filePath, label: "Nostr bus state", changes, @@ -272,7 +245,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ existingKeys.add(file.accountId); imported++; } - await archiveLegacySource({ + await archiveLegacyStateSource({ filePath: file.filePath, label: "Nostr profile state", changes, diff --git a/extensions/phone-control/doctor-contract-api.ts b/extensions/phone-control/doctor-contract-api.ts index 5cef4ebd03ed..66cca40f9205 100644 --- a/extensions/phone-control/doctor-contract-api.ts +++ b/extensions/phone-control/doctor-contract-api.ts @@ -1,7 +1,10 @@ // Phone Control API module exposes the plugin public contract. import fs from "node:fs/promises"; import path from "node:path"; -import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor"; +import { + archiveLegacyStateSource, + type PluginDoctorStateMigration, +} from "openclaw/plugin-sdk/runtime-doctor"; type ArmGroup = "camera" | "screen" | "writes" | "all"; @@ -31,15 +34,6 @@ function resolveArmStatePath(stateDir: string): string { return path.join(stateDir, "plugins", "phone-control", "armed.json"); } -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} - function isStringArray(value: unknown): value is string[] { return Array.isArray(value) && value.every((entry) => typeof entry === "string"); } @@ -99,28 +93,6 @@ async function readLegacyArmState(filePath: string): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated Phone Control armed-state source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived Phone Control armed-state legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push( - `Failed archiving Phone Control armed-state legacy source: ${String(err)}`, - ); - } -} - export const stateMigrations: PluginDoctorStateMigration[] = [ { id: "phone-control-armed-json-to-plugin-state", @@ -156,7 +128,12 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ } await store.register(ARM_STATE_KEY, state); changes.push("Migrated Phone Control armed state -> plugin state"); - await archiveLegacySource({ filePath, changes, warnings }); + await archiveLegacyStateSource({ + filePath, + label: "Phone Control armed-state", + changes, + warnings, + }); return { changes, warnings }; }, }, diff --git a/extensions/slack/src/monitor/inbound-delivery-state.ts b/extensions/slack/src/monitor/inbound-delivery-state.ts index c8470cfbe95c..1815779d5281 100644 --- a/extensions/slack/src/monitor/inbound-delivery-state.ts +++ b/extensions/slack/src/monitor/inbound-delivery-state.ts @@ -1,5 +1,5 @@ // Slack plugin module implements inbound delivery state behavior. -import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; +import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; import { getOptionalSlackRuntime } from "../runtime.js"; import type { SlackMessageEvent } from "../types.js"; @@ -13,92 +13,30 @@ type SlackInboundDeliveryRecord = { deliveredAt: number; }; -type SlackInboundDeliveryStore = { - register( - key: string, - value: SlackInboundDeliveryRecord, - opts?: { ttlMs?: number }, - ): Promise; - lookup(key: string): Promise; -}; - -const deliveredMessages = resolveGlobalDedupeCache(SLACK_INBOUND_DELIVERIES_KEY, { +const deliveredMessages = createPersistentDedupeCache({ + globalKey: SLACK_INBOUND_DELIVERIES_KEY, ttlMs: TTL_MS, maxSize: MAX_ENTRIES, + persistent: { + namespace: PERSISTENT_NAMESPACE, + maxEntries: PERSISTENT_MAX_ENTRIES, + openStore: (options) => getOptionalSlackRuntime()?.state.openKeyedStore(options), + logError: (error) => { + try { + getOptionalSlackRuntime() + ?.logging.getChildLogger({ plugin: "slack", feature: "inbound-delivery-state" }) + .warn("Slack persistent inbound delivery state failed", { error: String(error) }); + } catch { + // Best effort only: persistent state must never break Slack message handling. + } + }, + }, }); -let persistentStore: SlackInboundDeliveryStore | undefined; -let persistentStoreDisabled = false; - function makeKey(accountId: string, channelId: string, ts: string): string { return `${accountId}:${channelId}:${ts}`; } -function reportPersistentInboundDeliveryError(error: unknown): void { - try { - getOptionalSlackRuntime() - ?.logging.getChildLogger({ plugin: "slack", feature: "inbound-delivery-state" }) - .warn("Slack persistent inbound delivery state failed", { error: String(error) }); - } catch { - // Best effort only: persistent state must never break Slack message handling. - } -} - -function disablePersistentInboundDelivery(error: unknown): void { - persistentStoreDisabled = true; - persistentStore = undefined; - reportPersistentInboundDeliveryError(error); -} - -function getPersistentInboundDeliveryStore(): SlackInboundDeliveryStore | undefined { - if (persistentStoreDisabled) { - return undefined; - } - if (persistentStore) { - return persistentStore; - } - const runtime = getOptionalSlackRuntime(); - if (!runtime) { - return undefined; - } - try { - persistentStore = runtime.state.openKeyedStore({ - namespace: PERSISTENT_NAMESPACE, - maxEntries: PERSISTENT_MAX_ENTRIES, - defaultTtlMs: TTL_MS, - }); - return persistentStore; - } catch (error) { - disablePersistentInboundDelivery(error); - return undefined; - } -} - -async function lookupPersistentInboundDelivery(key: string): Promise { - const store = getPersistentInboundDeliveryStore(); - if (!store) { - return false; - } - try { - return Boolean(await store.lookup(key)); - } catch (error) { - disablePersistentInboundDelivery(error); - return false; - } -} - -async function rememberPersistentInboundDelivery(key: string, deliveredAt: number): Promise { - const store = getPersistentInboundDeliveryStore(); - if (!store) { - return; - } - try { - await store.register(key, { deliveredAt }); - } catch (error) { - disablePersistentInboundDelivery(error); - } -} - export async function hasSlackInboundMessageDelivery(params: { accountId: string; channelId: string | undefined; @@ -107,15 +45,7 @@ export async function hasSlackInboundMessageDelivery(params: { if (!params.accountId || !params.channelId || !params.ts) { return false; } - const key = makeKey(params.accountId, params.channelId, params.ts); - if (deliveredMessages.peek(key)) { - return true; - } - const found = await lookupPersistentInboundDelivery(key); - if (found) { - deliveredMessages.check(key); - } - return found; + return await deliveredMessages.lookup(makeKey(params.accountId, params.channelId, params.ts)); } export async function recordSlackInboundMessageDeliveries(params: { @@ -133,17 +63,13 @@ export async function recordSlackInboundMessageDeliveries(params: { } keys.add(makeKey(params.accountId, message.channel, message.ts)); } - if (keys.size === 0) { - return; - } - for (const key of keys) { - deliveredMessages.check(key, deliveredAt); - } - await Promise.all(Array.from(keys, (key) => rememberPersistentInboundDelivery(key, deliveredAt))); + await Promise.all( + Array.from(keys, (key) => + deliveredMessages.register(key, { deliveredAt }, { at: deliveredAt }), + ), + ); } export function clearSlackInboundDeliveryStateForTest(): void { - deliveredMessages.clear(); - persistentStore = undefined; - persistentStoreDisabled = false; + deliveredMessages.clearForTest(); } diff --git a/extensions/slack/src/sent-thread-cache.ts b/extensions/slack/src/sent-thread-cache.ts index d19650f585e1..1be35d97b5fc 100644 --- a/extensions/slack/src/sent-thread-cache.ts +++ b/extensions/slack/src/sent-thread-cache.ts @@ -1,11 +1,10 @@ // Slack plugin module implements sent thread cache behavior. -import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; +import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime"; import { getOptionalSlackRuntime } from "./runtime.js"; /** - * In-memory cache of Slack threads the bot has participated in. + * Cache of Slack threads the bot has participated in. * Used to auto-respond in threads without requiring @mention after the first reply. - * Follows a similar TTL pattern to the MS Teams and Telegram sent-message caches. */ const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours @@ -18,99 +17,35 @@ type SlackThreadParticipationRecord = { repliedAt: number; }; -type SlackThreadParticipationStore = { - register( - key: string, - value: SlackThreadParticipationRecord, - opts?: { ttlMs?: number }, - ): Promise; - lookup(key: string): Promise; -}; - /** * Keep Slack thread participation shared across bundled chunks so thread * auto-reply gating does not diverge between prepare/dispatch call paths. */ const SLACK_THREAD_PARTICIPATION_KEY = Symbol.for("openclaw.slackThreadParticipation"); -const threadParticipation = resolveGlobalDedupeCache(SLACK_THREAD_PARTICIPATION_KEY, { +const threadParticipation = createPersistentDedupeCache({ + globalKey: SLACK_THREAD_PARTICIPATION_KEY, ttlMs: TTL_MS, maxSize: MAX_ENTRIES, + persistent: { + namespace: PERSISTENT_NAMESPACE, + maxEntries: PERSISTENT_MAX_ENTRIES, + openStore: (options) => getOptionalSlackRuntime()?.state.openKeyedStore(options), + logError: (error) => { + try { + getOptionalSlackRuntime() + ?.logging.getChildLogger({ plugin: "slack", feature: "thread-participation-state" }) + .warn("Slack persistent thread participation state failed", { error: String(error) }); + } catch { + // Best effort only: persistent state must never break Slack message handling. + } + }, + }, }); -let persistentStore: SlackThreadParticipationStore | undefined; -let persistentStoreDisabled = false; - function makeKey(accountId: string, channelId: string, threadTs: string): string { return `${accountId}:${channelId}:${threadTs}`; } -function reportPersistentThreadParticipationError(error: unknown): void { - try { - getOptionalSlackRuntime() - ?.logging.getChildLogger({ plugin: "slack", feature: "thread-participation-state" }) - .warn("Slack persistent thread participation state failed", { error: String(error) }); - } catch { - // Best effort only: persistent state must never break Slack message handling. - } -} - -function disablePersistentThreadParticipation(error: unknown): void { - persistentStoreDisabled = true; - persistentStore = undefined; - reportPersistentThreadParticipationError(error); -} - -function getPersistentThreadParticipationStore(): SlackThreadParticipationStore | undefined { - if (persistentStoreDisabled) { - return undefined; - } - if (persistentStore) { - return persistentStore; - } - const runtime = getOptionalSlackRuntime(); - if (!runtime) { - return undefined; - } - try { - persistentStore = runtime.state.openKeyedStore({ - namespace: PERSISTENT_NAMESPACE, - maxEntries: PERSISTENT_MAX_ENTRIES, - defaultTtlMs: TTL_MS, - }); - return persistentStore; - } catch (error) { - disablePersistentThreadParticipation(error); - return undefined; - } -} - -function rememberPersistentThreadParticipation(params: { key: string; agentId?: string }): void { - const store = getPersistentThreadParticipationStore(); - if (!store) { - return; - } - void store - .register(params.key, { - // Stored for future per-agent thread routing; current reads only need presence. - ...(params.agentId ? { agentId: params.agentId } : {}), - repliedAt: Date.now(), - }) - .catch(disablePersistentThreadParticipation); -} - -async function lookupPersistentThreadParticipation(key: string): Promise { - const store = getPersistentThreadParticipationStore(); - if (!store) { - return false; - } - try { - return Boolean(await store.lookup(key)); - } catch (error) { - disablePersistentThreadParticipation(error); - return false; - } -} - export function recordSlackThreadParticipation( accountId: string, channelId: string, @@ -120,9 +55,11 @@ export function recordSlackThreadParticipation( if (!accountId || !channelId || !threadTs) { return; } - const key = makeKey(accountId, channelId, threadTs); - threadParticipation.check(key); - rememberPersistentThreadParticipation({ key, agentId: opts?.agentId }); + void threadParticipation.register(makeKey(accountId, channelId, threadTs), { + // Stored for future per-agent thread routing; current reads only need presence. + ...(opts?.agentId ? { agentId: opts.agentId } : {}), + repliedAt: Date.now(), + }); } export function hasSlackThreadParticipation( @@ -144,19 +81,11 @@ export async function hasSlackThreadParticipationWithPersistence(params: { if (!params.accountId || !params.channelId || !params.threadTs) { return false; } - const key = makeKey(params.accountId, params.channelId, params.threadTs); - if (threadParticipation.peek(key)) { - return true; - } - const found = await lookupPersistentThreadParticipation(key); - if (found) { - threadParticipation.check(key); - } - return found; + return await threadParticipation.lookup( + makeKey(params.accountId, params.channelId, params.threadTs), + ); } export function clearSlackThreadParticipationCache(): void { - threadParticipation.clear(); - persistentStore = undefined; - persistentStoreDisabled = false; + threadParticipation.clearForTest(); } diff --git a/extensions/voice-call/doctor-contract-api.ts b/extensions/voice-call/doctor-contract-api.ts index 7c0600906828..666e74c5da60 100644 --- a/extensions/voice-call/doctor-contract-api.ts +++ b/extensions/voice-call/doctor-contract-api.ts @@ -4,9 +4,10 @@ import os from "node:os"; import path from "node:path"; import type { OpenClawConfig } from "openclaw/plugin-sdk/plugin-entry"; import { normalizeAgentId } from "openclaw/plugin-sdk/routing"; -import type { - PluginDoctorStateMigration, - PluginStateKeyedStore, +import { + archiveLegacyStateSource, + type PluginDoctorStateMigration, + type PluginStateKeyedStore, } from "openclaw/plugin-sdk/runtime-doctor"; import { buildVoiceCallLegacyJsonlEventKey, @@ -126,14 +127,6 @@ function resolveVoiceCallStorePath(params: { } /** Return true when a path exists and is a file. */ -async function fileExists(filePath: string): Promise { - try { - const stat = await fs.stat(filePath); - return stat.isFile(); - } catch { - return false; - } -} /** Build the plugin state key for one migrated event chunk. */ function buildChunkKey(eventKey: string, index: number): string { @@ -214,25 +207,6 @@ async function readLegacyCallRecords(filePath: string): Promise<{ } /** Archive the legacy JSONL source after a complete migration. */ -async function archiveLegacySource(params: { - filePath: string; - changes: string[]; - warnings: string[]; -}): Promise { - const archivedPath = `${params.filePath}.migrated`; - if (await fileExists(archivedPath)) { - params.warnings.push( - `Left migrated Voice Call call-log source in place because ${archivedPath} already exists`, - ); - return; - } - try { - await fs.rename(params.filePath, archivedPath); - params.changes.push(`Archived Voice Call call-log legacy source -> ${archivedPath}`); - } catch (err) { - params.warnings.push(`Failed archiving Voice Call call-log legacy source: ${String(err)}`); - } -} /** Select newest missing records that fit remaining plugin state capacity. */ async function selectEntriesForImport(params: { @@ -356,7 +330,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [ warnings.push("Left Voice Call call-log source in place because migration was incomplete"); return { changes, warnings }; } - await archiveLegacySource({ filePath, changes, warnings }); + await archiveLegacyStateSource({ filePath, label: "Voice Call call-log", changes, warnings }); return { changes, warnings }; }, }, diff --git a/scripts/plugin-sdk-surface-report.mjs b/scripts/plugin-sdk-surface-report.mjs index e8675b768123..06a2a3a907c7 100644 --- a/scripts/plugin-sdk-surface-report.mjs +++ b/scripts/plugin-sdk-surface-report.mjs @@ -202,8 +202,8 @@ let publicDeprecatedExportsByEntrypointBudget; try { budgets = { publicEntrypoints: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_ENTRYPOINTS", 323), - publicExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_EXPORTS", 10412), - publicFunctionExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_FUNCTION_EXPORTS", 5227), + publicExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_EXPORTS", 10416), + publicFunctionExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_FUNCTION_EXPORTS", 5230), publicDeprecatedExports: readBudgetEnv( "OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_DEPRECATED_EXPORTS", 3261, diff --git a/src/plugin-sdk/dedupe-runtime.test.ts b/src/plugin-sdk/dedupe-runtime.test.ts new file mode 100644 index 000000000000..449b5b7bab2d --- /dev/null +++ b/src/plugin-sdk/dedupe-runtime.test.ts @@ -0,0 +1,116 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createPersistentDedupeCache } from "./dedupe-runtime.js"; + +type Record = { at: number }; + +function createMemoryStore() { + const entries = new Map(); + return { + entries, + store: { + register: vi.fn(async (key: string, value: Record) => { + entries.set(key, value); + }), + lookup: vi.fn(async (key: string) => entries.get(key)), + }, + }; +} + +function createCache(params?: { + openStore?: () => ReturnType["store"] | undefined; + logError?: (error: unknown) => void; + readTimestamp?: (record: Record) => number | undefined; +}) { + const backing = createMemoryStore(); + const cache = createPersistentDedupeCache({ + // Plain Symbol() is unique per cache, so parallel tests never share memory layers. + globalKey: Symbol("test.persistent-dedupe"), + ttlMs: 60_000, + maxSize: 100, + persistent: { + namespace: "test.persistent-dedupe", + maxEntries: 100, + openStore: params?.openStore ?? (() => backing.store), + logError: params?.logError, + readTimestamp: params?.readTimestamp, + }, + }); + return { cache, backing }; +} + +describe("createPersistentDedupeCache", () => { + afterEach(() => { + vi.useRealTimers(); + }); + + it("records presence in both layers and answers from memory first", async () => { + const { cache, backing } = createCache(); + await cache.register("k1", { at: 1 }); + expect(cache.peek("k1")).toBe(true); + expect(await cache.lookup("k1")).toBe(true); + expect(backing.store.register).toHaveBeenCalledWith("k1", { at: 1 }); + expect(backing.store.lookup).not.toHaveBeenCalled(); + }); + + it("falls back to persistence and re-primes memory on a hit", async () => { + const { cache, backing } = createCache(); + backing.entries.set("k2", { at: 42 }); + expect(cache.peek("k2")).toBe(false); + expect(await cache.lookup("k2")).toBe(true); + expect(cache.peek("k2")).toBe(true); + }); + + it("re-primes memory with the persisted timestamp when provided", async () => { + vi.useFakeTimers(); + vi.setSystemTime(1_000_000); + const { cache, backing } = createCache({ readTimestamp: (record) => record.at }); + backing.entries.set("k3", { at: 1_000_000 - 59_000 }); + expect(await cache.lookup("k3")).toBe(true); + // Re-primed at the original timestamp: expires 59s later instead of a fresh 60s TTL. + vi.setSystemTime(1_000_000 + 2_000); + expect(cache.peek("k3")).toBe(false); + }); + + it("disables persistence after an open failure and never rejects", async () => { + const logError = vi.fn(); + const openStore = vi.fn(() => { + throw new Error("sqlite unavailable"); + }); + const { cache } = createCache({ openStore, logError }); + await expect(cache.register("k4", { at: 1 })).resolves.toBeUndefined(); + expect(cache.peek("k4")).toBe(true); + expect(await cache.lookup("k5")).toBe(false); + expect(openStore).toHaveBeenCalledTimes(1); + expect(logError).toHaveBeenCalledTimes(1); + }); + + it("disables persistence after a lookup failure", async () => { + const logError = vi.fn(); + const store = { + register: vi.fn(async () => {}), + lookup: vi.fn(async () => { + throw new Error("read failed"); + }), + }; + const { cache } = createCache({ openStore: () => store, logError }); + expect(await cache.lookup("k6")).toBe(false); + expect(logError).toHaveBeenCalledTimes(1); + await cache.register("k7", { at: 1 }); + expect(store.register).not.toHaveBeenCalled(); + }); + + it("clearForTest resets memory and re-enables persistence", async () => { + const openStore = vi + .fn() + .mockImplementationOnce(() => { + throw new Error("boom"); + }) + .mockImplementation(() => createMemoryStore().store); + const { cache } = createCache({ openStore }); + await cache.register("k8", { at: 1 }); + cache.clearForTest(); + expect(cache.peek("k8")).toBe(false); + await cache.register("k9", { at: 1 }); + expect(openStore).toHaveBeenCalledTimes(2); + }); +}); diff --git a/src/plugin-sdk/dedupe-runtime.ts b/src/plugin-sdk/dedupe-runtime.ts index e5a116242477..a4993ae87111 100644 --- a/src/plugin-sdk/dedupe-runtime.ts +++ b/src/plugin-sdk/dedupe-runtime.ts @@ -1,3 +1,120 @@ // In-memory dedupe helpers for plugin runtime hot paths. +import { resolveGlobalDedupeCache } from "../infra/dedupe.js"; +import type { OpenKeyedStoreOptions } from "./plugin-state-runtime.js"; + export { createDedupeCache, resolveGlobalDedupeCache } from "../infra/dedupe.js"; + +type PersistentDedupeStore = { + register(key: string, value: TRecord, opts?: { ttlMs?: number }): Promise; + lookup(key: string): Promise; +}; + +/** Dual-layer presence cache: process-memory dedupe plus best-effort persistent state. */ +export type PersistentDedupeCache = { + /** Memory-only presence check without refreshing recency. */ + peek(key: string): boolean; + /** Memory-first presence check; falls back to persistence and re-primes memory on a hit. */ + lookup(key: string): Promise; + /** Records presence in memory and best-effort persistence. Never rejects. */ + register(key: string, record: TRecord, opts?: { at?: number }): Promise; + /** Clears memory and re-enables a persistent layer disabled by an earlier failure. */ + clearForTest(): void; +}; + +/** + * Creates a channel-family presence cache backed by a global in-memory dedupe layer + * plus a lazily opened plugin keyed store. Persistence is best effort: the first + * open/read/write failure disables the persistent layer for the process so message + * handling never breaks on state errors, matching the shipped channel-cache contract. + */ +export function createPersistentDedupeCache(params: { + /** Global symbol key so the memory layer stays shared across bundled chunks. */ + globalKey: symbol; + ttlMs: number; + maxSize: number; + persistent: { + namespace: string; + maxEntries: number; + /** Usually `() => runtime?.state.openKeyedStore(options)`; undefined skips persistence. */ + openStore: (options: OpenKeyedStoreOptions) => PersistentDedupeStore | undefined; + logError?: (error: unknown) => void; + /** Memory re-prime timestamp after a persistent hit; defaults to now. */ + readTimestamp?: (record: TRecord) => number | undefined; + }; +}): PersistentDedupeCache { + const memory = resolveGlobalDedupeCache(params.globalKey, { + ttlMs: params.ttlMs, + maxSize: params.maxSize, + }); + let persistentStore: PersistentDedupeStore | undefined; + let persistentStoreDisabled = false; + + const disablePersistentStore = (error: unknown) => { + persistentStoreDisabled = true; + persistentStore = undefined; + params.persistent.logError?.(error); + }; + + const getPersistentStore = (): PersistentDedupeStore | undefined => { + if (persistentStoreDisabled) { + return undefined; + } + if (persistentStore) { + return persistentStore; + } + try { + persistentStore = params.persistent.openStore({ + namespace: params.persistent.namespace, + maxEntries: params.persistent.maxEntries, + defaultTtlMs: params.ttlMs, + }); + return persistentStore; + } catch (error) { + disablePersistentStore(error); + return undefined; + } + }; + + return { + peek: (key) => memory.peek(key), + lookup: async (key) => { + if (memory.peek(key)) { + return true; + } + const store = getPersistentStore(); + if (!store) { + return false; + } + let record: TRecord | undefined; + try { + record = await store.lookup(key); + } catch (error) { + disablePersistentStore(error); + return false; + } + if (record === undefined) { + return false; + } + memory.check(key, params.persistent.readTimestamp?.(record)); + return true; + }, + register: async (key, record, opts) => { + memory.check(key, opts?.at); + const store = getPersistentStore(); + if (!store) { + return; + } + try { + await store.register(key, record); + } catch (error) { + disablePersistentStore(error); + } + }, + clearForTest: () => { + memory.clear(); + persistentStore = undefined; + persistentStoreDisabled = false; + }, + }; +} diff --git a/src/plugin-sdk/runtime-doctor.ts b/src/plugin-sdk/runtime-doctor.ts index b8d35ee130f8..6e50804d7792 100644 --- a/src/plugin-sdk/runtime-doctor.ts +++ b/src/plugin-sdk/runtime-doctor.ts @@ -29,4 +29,8 @@ export type { PluginDoctorStateMigration, PluginDoctorStateMigrationContext, } from "../plugins/doctor-contract-registry.js"; +export { + archiveLegacyStateSource, + legacyStateFileExists, +} from "../plugins/doctor-state-migration-fs.js"; export type { DoctorSessionRouteStateOwner } from "../plugins/doctor-session-route-state-owner-types.js"; diff --git a/src/plugins/doctor-state-migration-fs.ts b/src/plugins/doctor-state-migration-fs.ts new file mode 100644 index 000000000000..c25b7b620cc0 --- /dev/null +++ b/src/plugins/doctor-state-migration-fs.ts @@ -0,0 +1,38 @@ +// Shared filesystem helpers for plugin doctor legacy-state migrations. +import fs from "node:fs/promises"; + +/** True when the legacy-state path exists and is a regular file. */ +export async function legacyStateFileExists(filePath: string): Promise { + try { + const stat = await fs.stat(filePath); + return stat.isFile(); + } catch { + return false; + } +} + +/** + * Renames a migrated legacy source to `.migrated`, recording the outcome in the + * doctor changes/warnings lists. Never throws: a failed archive leaves the source in + * place so a later doctor run can retry without losing migrated data. + */ +export async function archiveLegacyStateSource(params: { + filePath: string; + label: string; + changes: string[]; + warnings: string[]; +}): Promise { + const archivedPath = `${params.filePath}.migrated`; + if (await legacyStateFileExists(archivedPath)) { + params.warnings.push( + `Left migrated ${params.label} source in place because ${archivedPath} already exists`, + ); + return; + } + try { + await fs.rename(params.filePath, archivedPath); + params.changes.push(`Archived ${params.label} legacy source -> ${archivedPath}`); + } catch (err) { + params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`); + } +}