refactor: consolidate duplicated plugin state and doctor migration plumbing onto SDK seams (#99850)

* refactor(plugin-sdk): add createPersistentDedupeCache and migrate channel presence caches

* refactor(matrix): adopt SDK approval reaction target store

* refactor(plugin-sdk): share doctor legacy-state migration fs helpers

* refactor(memory-core): dedupe qmd cache entry envelope validation

* chore(plugin-sdk): pin surface budgets for shared dedupe and doctor helpers

* test(matrix): use future approval expiry fixtures for reaction targets

* test(matrix): use future approval expiry fixtures for reaction targets
This commit is contained in:
Peter Steinberger
2026-07-04 01:51:03 -07:00
committed by GitHub
parent 3d404478b8
commit eafe2a8d0b
27 changed files with 565 additions and 946 deletions

View File

@@ -440,7 +440,7 @@ releases.
| Heartbeat wake, event, and visibility helpers | `openclaw/plugin-sdk/heartbeat-runtime` |
| Pending delivery queue drain | `openclaw/plugin-sdk/delivery-queue-runtime` |
| Channel activity telemetry | `openclaw/plugin-sdk/channel-activity-runtime` |
| In-memory dedupe caches | `openclaw/plugin-sdk/dedupe-runtime` |
| In-memory and persistent-backed dedupe caches | `openclaw/plugin-sdk/dedupe-runtime` |
| Safe local-file/media path helpers | `openclaw/plugin-sdk/file-access-runtime` |
| Dispatcher-aware fetch | `openclaw/plugin-sdk/runtime-fetch` |
| Proxy and guarded fetch helpers | `openclaw/plugin-sdk/fetch-runtime` |
@@ -567,7 +567,7 @@ releases.
| `plugin-sdk/heartbeat-runtime` | Heartbeat helpers | Heartbeat wake, event, and visibility helpers |
| `plugin-sdk/delivery-queue-runtime` | Delivery queue helpers | `drainPendingDeliveries` |
| `plugin-sdk/channel-activity-runtime` | Channel activity helpers | `recordChannelActivity` |
| `plugin-sdk/dedupe-runtime` | Dedupe helpers | In-memory dedupe caches |
| `plugin-sdk/dedupe-runtime` | Dedupe helpers | In-memory and persistent-backed dedupe caches |
| `plugin-sdk/file-access-runtime` | File access helpers | Safe local-file/media path helpers |
| `plugin-sdk/transport-ready-runtime` | Transport readiness helpers | `waitForTransportReady` |
| `plugin-sdk/exec-approvals-runtime` | Exec approval policy helpers | `loadExecApprovals`, `resolveExecApprovalsFromFile`, `ExecApprovalsFile` |

View File

@@ -290,7 +290,7 @@ usage endpoint failed or returned no usable usage data.
| `plugin-sdk/async-lock-runtime` | Process-local async lock helper for small runtime state files |
| `plugin-sdk/channel-activity-runtime` | Channel activity telemetry helper |
| `plugin-sdk/concurrency-runtime` | Bounded async task concurrency helper |
| `plugin-sdk/dedupe-runtime` | In-memory dedupe cache helpers |
| `plugin-sdk/dedupe-runtime` | In-memory and persistent-backed dedupe cache helpers |
| `plugin-sdk/delivery-queue-runtime` | Outbound pending-delivery drain helper |
| `plugin-sdk/file-access-runtime` | Safe local-file and media-source path helpers |
| `plugin-sdk/heartbeat-runtime` | Heartbeat wake, event, and visibility helpers |

View File

@@ -1,7 +1,10 @@
// ACPX doctor contract migrates shipped plugin-owned runtime state.
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
import {
normalizeAcpxProcessLease,
normalizeAcpxProcessLeaseFile,
@@ -26,15 +29,6 @@ function resolveLegacyProcessLeasePath(stateDir: string): string {
return path.join(stateDir, "acpx", ACPX_LEGACY_PROCESS_LEASE_FILE);
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function readLegacyGatewayInstanceId(filePath: string): Promise<string | null> {
try {
const value = (await fs.readFile(filePath, "utf8")).trim();
@@ -55,27 +49,6 @@ async function readLegacyOpenProcessLeases(filePath: string): Promise<AcpxProces
}
}
async function archiveLegacySource(params: {
filePath: string;
label: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated ACPX ${params.label} source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived ACPX ${params.label} legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving ACPX ${params.label} legacy source: ${String(err)}`);
}
}
export const stateMigrations: PluginDoctorStateMigration[] = [
{
id: "acpx-runtime-state-to-plugin-state",
@@ -171,9 +144,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
}
if (gatewayInstanceId) {
await archiveLegacySource({
await archiveLegacyStateSource({
filePath: gatewayInstancePath,
label: "gateway-instance-id",
label: "ACPX gateway-instance-id",
changes,
warnings,
});
@@ -193,9 +166,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
changes.push(
`Migrated ACPX process leases -> plugin state (${imported} imported, ${alreadyPresent} already present)`,
);
await archiveLegacySource({
await archiveLegacyStateSource({
filePath: processLeasePath,
label: "process-leases",
label: "ACPX process-leases",
changes,
warnings,
});

View File

@@ -5,7 +5,10 @@
import crypto from "node:crypto";
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
type ActiveMemoryToggleEntry = {
sessionKey: string;
@@ -25,15 +28,6 @@ function activeMemoryToggleKey(sessionKey: string): string {
return crypto.createHash("sha256").update(sessionKey, "utf8").digest("hex");
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function readLegacyToggleEntries(filePath: string): Promise<ActiveMemoryToggleEntry[]> {
try {
const parsed = JSON.parse(await fs.readFile(filePath, "utf8")) as unknown;
@@ -64,27 +58,6 @@ async function readLegacyToggleEntries(filePath: string): Promise<ActiveMemoryTo
}
}
async function archiveLegacySource(params: {
filePath: string;
label: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated ${params.label} source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived ${params.label} legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`);
}
}
/** State migrations exposed to OpenClaw doctor for Active Memory. */
export const stateMigrations: PluginDoctorStateMigration[] = [
{
@@ -139,7 +112,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
`Migrated ${imported} Active Memory session toggle ${imported === 1 ? "entry" : "entries"} -> plugin state`,
);
}
await archiveLegacySource({
await archiveLegacyStateSource({
filePath,
label: "Active Memory session toggles",
changes,

View File

@@ -1,7 +1,10 @@
// Device Pair doctor contract migrates shipped plugin-owned state.
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
import {
DEVICE_PAIR_NOTIFY_LEGACY_STATE_FILE,
DEVICE_PAIR_NOTIFY_SUBSCRIBER_MAX_ENTRIES,
@@ -16,15 +19,6 @@ function resolveLegacyNotifyStatePath(stateDir: string): string {
return path.join(stateDir, DEVICE_PAIR_NOTIFY_LEGACY_STATE_FILE);
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function readLegacyNotifyState(filePath: string): Promise<LegacyNotifyStateFile | null> {
try {
return normalizeLegacyNotifyState(JSON.parse(await fs.readFile(filePath, "utf8")) as unknown);
@@ -33,26 +27,6 @@ async function readLegacyNotifyState(filePath: string): Promise<LegacyNotifyStat
}
}
async function archiveLegacySource(params: {
filePath: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated Device Pair notify-state source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived Device Pair notify-state legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving Device Pair notify-state legacy source: ${String(err)}`);
}
}
export const stateMigrations: PluginDoctorStateMigration[] = [
{
id: "device-pair-notify-json-to-plugin-state",
@@ -99,7 +73,12 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
changes.push(
`Migrated Device Pair notify subscribers -> plugin state (${imported} imported, ${alreadyPresent} already present)`,
);
await archiveLegacySource({ filePath, changes, warnings });
await archiveLegacyStateSource({
filePath,
label: "Device Pair notify-state",
changes,
warnings,
});
return { changes, warnings };
},
},

View File

@@ -2,7 +2,10 @@ import type { Dirent } from "node:fs";
// Matrix API module exposes the plugin public contract.
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
legacyStateFileExists,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
import {
hasMatrixSyncCacheStateInStore,
openMatrixSyncCacheStoreOptions,
@@ -43,15 +46,6 @@ export { normalizeCompatibilityConfig, legacyConfigRules } from "./src/doctor-co
const MATRIX_SYNC_CACHE_FILENAME = "bot-storage.json";
const MATRIX_STORAGE_META_FILENAME = "storage-meta.json";
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function collectLegacyMatrixStateRoots(
stateDir: string,
filename: string,
@@ -105,7 +99,7 @@ async function archiveLegacySyncCache(params: {
}): Promise<void> {
const sourcePath = path.join(params.storageRootDir, MATRIX_SYNC_CACHE_FILENAME);
const archivedPath = `${sourcePath}.migrated`;
if (await fileExists(archivedPath)) {
if (await legacyStateFileExists(archivedPath)) {
params.warnings.push(
`Left migrated Matrix sync cache in place because ${archivedPath} already exists`,
);
@@ -128,7 +122,7 @@ async function archiveLegacyMatrixStateFile(params: {
}): Promise<void> {
const sourcePath = path.join(params.storageRootDir, params.filename);
const archivedPath = `${sourcePath}.migrated`;
if (await fileExists(archivedPath)) {
if (await legacyStateFileExists(archivedPath)) {
params.warnings.push(
`Left migrated ${params.label} in place because ${archivedPath} already exists`,
);

View File

@@ -7,7 +7,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import { matrixApprovalNativeRuntime } from "./approval-handler.runtime.js";
import {
clearMatrixApprovalReactionTargetsForTest,
resolveMatrixApprovalReactionTarget,
resolveMatrixApprovalReactionTargetWithPersistence,
} from "./approval-reactions.js";
type MatrixDeliverPendingParams = Parameters<
@@ -72,6 +72,11 @@ function buildMatrixApprovalRoomTarget(
};
}
// Pending approvals expire in the future; the reaction target store TTLs its
// memory layer from `view.expiresAtMs - now`, so epoch-past fixtures would
// evict the mapping before assertions run.
const TEST_APPROVAL_EXPIRES_AT_MS = Date.now() + 5 * 60_000;
function buildExecApprovalView(
overrides: Partial<MatrixPendingExecApprovalView> = {},
): MatrixPendingExecApprovalView {
@@ -102,7 +107,7 @@ function buildExecApprovalView(
command: "/approve req-1 deny",
},
],
expiresAtMs: 1_000,
expiresAtMs: TEST_APPROVAL_EXPIRES_AT_MS,
...overrides,
};
}
@@ -129,7 +134,7 @@ function buildPluginApprovalView(
command: "/approve plugin:req-1 allow-once",
},
],
expiresAtMs: 1_000,
expiresAtMs: TEST_APPROVAL_EXPIRES_AT_MS,
...overrides,
};
}
@@ -273,7 +278,7 @@ describe("matrixApprovalNativeRuntime", () => {
kind: "plugin",
title: "Plugin Approval Required",
description: "Approve the tool call.",
expiresAtMs: 1_000,
expiresAtMs: TEST_APPROVAL_EXPIRES_AT_MS,
metadata: [],
allowedDecisions: ["allow-once"],
actions: [
@@ -304,7 +309,7 @@ describe("matrixApprovalNativeRuntime", () => {
});
const reactMessage = vi.fn().mockImplementation(async () => {
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!room:example.org",
eventId: "$approval",
reactionKey: "✅",
@@ -510,7 +515,7 @@ describe("matrixApprovalNativeRuntime", () => {
eventId: "$primary",
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!room:example.org",
eventId: "$primary",
reactionKey: "✅",
@@ -520,7 +525,7 @@ describe("matrixApprovalNativeRuntime", () => {
decision: "allow-once",
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!room:example.org",
eventId: "$last",
reactionKey: "✅",

View File

@@ -5,7 +5,6 @@ import {
clearMatrixApprovalReactionTargetsForTest,
listMatrixApprovalReactionBindings,
registerMatrixApprovalReactionTarget,
resolveMatrixApprovalReactionTarget,
resolveMatrixApprovalReactionTargetWithPersistence,
unregisterMatrixApprovalReactionTarget,
} from "./approval-reactions.js";
@@ -31,7 +30,7 @@ describe("matrix approval reactions", () => {
);
});
it("resolves a registered approval anchor event back to an approval decision", () => {
it("resolves a registered approval anchor event back to an approval decision", async () => {
registerMatrixApprovalReactionTarget({
roomId: "!ops:example.org",
eventId: "$approval-msg",
@@ -40,7 +39,7 @@ describe("matrix approval reactions", () => {
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!ops:example.org",
eventId: "$approval-msg",
reactionKey: "✅",
@@ -50,7 +49,7 @@ describe("matrix approval reactions", () => {
decision: "allow-once",
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!ops:example.org",
eventId: "$approval-msg",
reactionKey: "♾️",
@@ -60,7 +59,7 @@ describe("matrix approval reactions", () => {
decision: "allow-always",
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!ops:example.org",
eventId: "$approval-msg",
reactionKey: "❌",
@@ -71,7 +70,7 @@ describe("matrix approval reactions", () => {
});
});
it("ignores reactions that are not allowed on the registered approval anchor event", () => {
it("ignores reactions that are not allowed on the registered approval anchor event", async () => {
registerMatrixApprovalReactionTarget({
roomId: "!ops:example.org",
eventId: "$approval-msg",
@@ -80,7 +79,7 @@ describe("matrix approval reactions", () => {
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!ops:example.org",
eventId: "$approval-msg",
reactionKey: "♾️",
@@ -88,7 +87,7 @@ describe("matrix approval reactions", () => {
).toBeNull();
});
it("stops resolving reactions after the approval anchor event is unregistered", () => {
it("stops resolving reactions after the approval anchor event is unregistered", async () => {
registerMatrixApprovalReactionTarget({
roomId: "!ops:example.org",
eventId: "$approval-msg",
@@ -101,7 +100,7 @@ describe("matrix approval reactions", () => {
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!ops:example.org",
eventId: "$approval-msg",
reactionKey: "✅",
@@ -158,7 +157,7 @@ describe("matrix approval reactions", () => {
expect(lookup).toHaveBeenCalledWith("!ops:example.org:$approval-msg-2");
});
it("falls back to in-memory approval reaction targets when persistent state cannot open", () => {
it("falls back to in-memory approval reaction targets when persistent state cannot open", async () => {
const warn = vi.fn();
setMatrixRuntime({
state: {
@@ -177,7 +176,7 @@ describe("matrix approval reactions", () => {
});
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!ops:example.org",
eventId: "$approval-msg-3",
reactionKey: "❌",

View File

@@ -1,7 +1,10 @@
// Matrix plugin module implements approval reactions behavior.
import { createApprovalReactionTargetStore } from "openclaw/plugin-sdk/approval-reaction-runtime";
import type { ExecApprovalReplyDecision } from "openclaw/plugin-sdk/approval-runtime";
import { getOptionalMatrixRuntime } from "./runtime.js";
// Matrix keeps its own reaction emoji set (checkmark/cross render reliably across
// Matrix clients), so decision resolution stays local instead of using the SDK bindings.
const MATRIX_APPROVAL_REACTION_META = {
"allow-once": {
emoji: "✅",
@@ -43,34 +46,6 @@ type MatrixApprovalReactionTarget = {
allowedDecisions: readonly ExecApprovalReplyDecision[];
};
type PersistedMatrixApprovalReactionTarget = {
version: 1;
target: MatrixApprovalReactionTarget;
};
type MatrixApprovalReactionStore = {
register(
key: string,
value: PersistedMatrixApprovalReactionTarget,
opts?: { ttlMs?: number },
): Promise<void>;
lookup(key: string): Promise<PersistedMatrixApprovalReactionTarget | undefined>;
delete(key: string): Promise<boolean>;
};
const matrixApprovalReactionTargets = new Map<string, MatrixApprovalReactionTarget>();
let persistentStore: MatrixApprovalReactionStore | undefined;
let persistentStoreDisabled = false;
function buildReactionTargetKey(roomId: string, eventId: string): string | null {
const normalizedRoomId = roomId.trim();
const normalizedEventId = eventId.trim();
if (!normalizedRoomId || !normalizedEventId) {
return null;
}
return `${normalizedRoomId}:${normalizedEventId}`;
}
function reportPersistentApprovalReactionError(error: unknown): void {
try {
getOptionalMatrixRuntime()
@@ -81,85 +56,34 @@ function reportPersistentApprovalReactionError(error: unknown): void {
}
}
function disablePersistentApprovalReactionStore(error: unknown): void {
persistentStoreDisabled = true;
persistentStore = undefined;
reportPersistentApprovalReactionError(error);
}
function getPersistentApprovalReactionStore(): MatrixApprovalReactionStore | undefined {
if (persistentStoreDisabled) {
return undefined;
}
if (persistentStore) {
return persistentStore;
}
const runtime = getOptionalMatrixRuntime();
if (!runtime) {
return undefined;
}
try {
persistentStore = runtime.state.openKeyedStore<PersistedMatrixApprovalReactionTarget>({
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
defaultTtlMs: DEFAULT_REACTION_TARGET_TTL_MS,
});
return persistentStore;
} catch (error) {
disablePersistentApprovalReactionStore(error);
return undefined;
}
}
function readPersistedTarget(value: unknown): MatrixApprovalReactionTarget | null {
const persisted = value as PersistedMatrixApprovalReactionTarget | undefined;
if (
persisted?.version !== 1 ||
!persisted.target ||
typeof persisted.target.approvalId !== "string" ||
!Array.isArray(persisted.target.allowedDecisions)
) {
function readPersistedTarget(target: unknown): MatrixApprovalReactionTarget | null {
const value = target as Partial<MatrixApprovalReactionTarget> | null | undefined;
if (!value || typeof value.approvalId !== "string" || !Array.isArray(value.allowedDecisions)) {
return null;
}
return persisted.target;
return {
approvalId: value.approvalId,
allowedDecisions: value.allowedDecisions,
};
}
function rememberPersistentApprovalReactionTarget(params: {
key: string;
target: MatrixApprovalReactionTarget;
ttlMs?: number;
}): void {
const ttlMs = params.ttlMs == null ? DEFAULT_REACTION_TARGET_TTL_MS : Math.max(1, params.ttlMs);
const store = getPersistentApprovalReactionStore();
if (!store) {
return;
}
void store
.register(params.key, { version: 1, target: params.target }, { ttlMs })
.catch(disablePersistentApprovalReactionStore);
}
const matrixApprovalReactionTargets =
createApprovalReactionTargetStore<MatrixApprovalReactionTarget>({
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
defaultTtlMs: DEFAULT_REACTION_TARGET_TTL_MS,
openStore: (storeParams) => getOptionalMatrixRuntime()?.state.openKeyedStore(storeParams),
logPersistentError: reportPersistentApprovalReactionError,
readPersistedTarget,
});
function forgetPersistentApprovalReactionTarget(key: string): void {
const store = getPersistentApprovalReactionStore();
if (!store) {
return;
}
void store.delete(key).catch(disablePersistentApprovalReactionStore);
}
async function lookupPersistentApprovalReactionTarget(
key: string,
): Promise<MatrixApprovalReactionTarget | null> {
const store = getPersistentApprovalReactionStore();
if (!store) {
return null;
}
try {
return readPersistedTarget(await store.lookup(key));
} catch (error) {
disablePersistentApprovalReactionStore(error);
function buildReactionTargetKey(roomId: string, eventId: string): string | null {
const normalizedRoomId = roomId.trim();
const normalizedEventId = eventId.trim();
if (!normalizedRoomId || !normalizedEventId) {
return null;
}
return `${normalizedRoomId}:${normalizedEventId}`;
}
export function listMatrixApprovalReactionBindings(
@@ -225,16 +149,11 @@ export function registerMatrixApprovalReactionTarget(params: {
if (!key || !approvalId || allowedDecisions.length === 0) {
return;
}
const target = {
approvalId,
allowedDecisions,
};
matrixApprovalReactionTargets.set(key, target);
rememberPersistentApprovalReactionTarget({
matrixApprovalReactionTargets.register(
key,
target,
ttlMs: params.ttlMs,
});
{ approvalId, allowedDecisions },
{ ttlMs: params.ttlMs },
);
}
export function unregisterMatrixApprovalReactionTarget(params: {
@@ -246,7 +165,6 @@ export function unregisterMatrixApprovalReactionTarget(params: {
return;
}
matrixApprovalReactionTargets.delete(key);
forgetPersistentApprovalReactionTarget(key);
}
function resolveTarget(params: {
@@ -270,21 +188,6 @@ function resolveTarget(params: {
};
}
export function resolveMatrixApprovalReactionTarget(params: {
roomId: string;
eventId: string;
reactionKey: string;
}): MatrixApprovalReactionResolution | null {
const key = buildReactionTargetKey(params.roomId, params.eventId);
if (!key) {
return null;
}
return resolveTarget({
target: matrixApprovalReactionTargets.get(key),
reactionKey: params.reactionKey,
});
}
export async function resolveMatrixApprovalReactionTargetWithPersistence(params: {
roomId: string;
eventId: string;
@@ -294,21 +197,12 @@ export async function resolveMatrixApprovalReactionTargetWithPersistence(params:
if (!key) {
return null;
}
const inMemory = resolveTarget({
target: matrixApprovalReactionTargets.get(key),
reactionKey: params.reactionKey,
});
if (inMemory) {
return inMemory;
}
return resolveTarget({
target: await lookupPersistentApprovalReactionTarget(key),
target: await matrixApprovalReactionTargets.lookup(key),
reactionKey: params.reactionKey,
});
}
export function clearMatrixApprovalReactionTargetsForTest(): void {
matrixApprovalReactionTargets.clear();
persistentStore = undefined;
persistentStoreDisabled = false;
matrixApprovalReactionTargets.clearForTest();
}

View File

@@ -3,7 +3,7 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
import {
clearMatrixApprovalReactionTargetsForTest,
registerMatrixApprovalReactionTarget,
resolveMatrixApprovalReactionTarget,
resolveMatrixApprovalReactionTargetWithPersistence,
} from "../../approval-reactions.js";
import type { CoreConfig } from "../../types.js";
import { handleInboundMatrixReaction } from "./reaction-events.js";
@@ -295,7 +295,7 @@ describe("matrix approval reactions", () => {
expect(client.getEvent).not.toHaveBeenCalled();
expect(
resolveMatrixApprovalReactionTarget({
await resolveMatrixApprovalReactionTargetWithPersistence({
roomId: "!ops:example.org",
eventId: "$approval-msg",
reactionKey: "❌",

View File

@@ -1,11 +1,10 @@
// Mattermost plugin module implements thread participation cache behavior.
import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { getOptionalMattermostRuntime } from "../runtime.js";
/**
* In-memory + persisted cache of Mattermost threads the bot has replied in.
* Lets the bot auto-respond to thread follow-ups without a re-mention after its
* first visible reply. Mirrors the Slack `sent-thread-cache` dual-layer pattern.
* Cache of Mattermost threads the bot has replied in. Lets the bot auto-respond
* to thread follow-ups without a re-mention after its first visible reply.
*/
const TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days
@@ -18,99 +17,37 @@ type MattermostThreadParticipationRecord = {
repliedAt: number;
};
type MattermostThreadParticipationStore = {
register(
key: string,
value: MattermostThreadParticipationRecord,
opts?: { ttlMs?: number },
): Promise<void>;
lookup(key: string): Promise<MattermostThreadParticipationRecord | undefined>;
};
/**
* Keep thread participation shared across bundled chunks so thread auto-reply
* gating does not diverge between the inbound-gate and reply-dispatch paths.
*/
const MATTERMOST_THREAD_PARTICIPATION_KEY = Symbol.for("openclaw.mattermostThreadParticipation");
const threadParticipation = resolveGlobalDedupeCache(MATTERMOST_THREAD_PARTICIPATION_KEY, {
const threadParticipation = createPersistentDedupeCache<MattermostThreadParticipationRecord>({
globalKey: MATTERMOST_THREAD_PARTICIPATION_KEY,
ttlMs: TTL_MS,
maxSize: MAX_ENTRIES,
persistent: {
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
openStore: (options) => getOptionalMattermostRuntime()?.state.openKeyedStore(options),
logError: (error) => {
try {
getOptionalMattermostRuntime()
?.logging.getChildLogger({ plugin: "mattermost", feature: "thread-participation-state" })
.warn("Mattermost persistent thread participation state failed", {
error: String(error),
});
} catch {
// Best effort only: persistent state must never break Mattermost message handling.
}
},
},
});
let persistentStore: MattermostThreadParticipationStore | undefined;
let persistentStoreDisabled = false;
function makeKey(accountId: string, channelId: string, threadRootId: string): string {
return `${accountId}:${channelId}:${threadRootId}`;
}
function reportPersistentThreadParticipationError(error: unknown): void {
try {
getOptionalMattermostRuntime()
?.logging.getChildLogger({ plugin: "mattermost", feature: "thread-participation-state" })
.warn("Mattermost persistent thread participation state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Mattermost message handling.
}
}
function disablePersistentThreadParticipation(error: unknown): void {
persistentStoreDisabled = true;
persistentStore = undefined;
reportPersistentThreadParticipationError(error);
}
function getPersistentThreadParticipationStore(): MattermostThreadParticipationStore | undefined {
if (persistentStoreDisabled) {
return undefined;
}
if (persistentStore) {
return persistentStore;
}
const runtime = getOptionalMattermostRuntime();
if (!runtime) {
return undefined;
}
try {
persistentStore = runtime.state.openKeyedStore<MattermostThreadParticipationRecord>({
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
defaultTtlMs: TTL_MS,
});
return persistentStore;
} catch (error) {
disablePersistentThreadParticipation(error);
return undefined;
}
}
function rememberPersistentThreadParticipation(params: { key: string; agentId?: string }): void {
const store = getPersistentThreadParticipationStore();
if (!store) {
return;
}
void store
.register(params.key, {
// Stored for future per-agent thread routing; current reads only need presence.
...(params.agentId ? { agentId: params.agentId } : {}),
repliedAt: Date.now(),
})
.catch(disablePersistentThreadParticipation);
}
async function lookupPersistentThreadParticipation(key: string): Promise<boolean> {
const store = getPersistentThreadParticipationStore();
if (!store) {
return false;
}
try {
return Boolean(await store.lookup(key));
} catch (error) {
disablePersistentThreadParticipation(error);
return false;
}
}
export function recordMattermostThreadParticipation(
accountId: string,
channelId: string,
@@ -120,9 +57,11 @@ export function recordMattermostThreadParticipation(
if (!accountId || !channelId || !threadRootId) {
return;
}
const key = makeKey(accountId, channelId, threadRootId);
threadParticipation.check(key);
rememberPersistentThreadParticipation({ key, agentId: opts?.agentId });
void threadParticipation.register(makeKey(accountId, channelId, threadRootId), {
// Stored for future per-agent thread routing; current reads only need presence.
...(opts?.agentId ? { agentId: opts.agentId } : {}),
repliedAt: Date.now(),
});
}
export async function hasMattermostThreadParticipationWithPersistence(params: {
@@ -133,19 +72,11 @@ export async function hasMattermostThreadParticipationWithPersistence(params: {
if (!params.accountId || !params.channelId || !params.threadRootId) {
return false;
}
const key = makeKey(params.accountId, params.channelId, params.threadRootId);
if (threadParticipation.peek(key)) {
return true;
}
const found = await lookupPersistentThreadParticipation(key);
if (found) {
threadParticipation.check(key);
}
return found;
return await threadParticipation.lookup(
makeKey(params.accountId, params.channelId, params.threadRootId),
);
}
export function clearMattermostThreadParticipationCache(): void {
threadParticipation.clear();
persistentStore = undefined;
persistentStoreDisabled = false;
threadParticipation.clearForTest();
}

View File

@@ -18,7 +18,11 @@ import {
} from "openclaw/plugin-sdk/memory-core-host-engine-storage";
import { resolveMemoryDreamingWorkspaces } from "openclaw/plugin-sdk/memory-core-host-status";
import { normalizeAgentId } from "openclaw/plugin-sdk/routing";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
legacyStateFileExists,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
import {
ensureOpenClawAgentDatabaseSchema,
resolveOpenClawAgentSqlitePath,
@@ -724,7 +728,7 @@ async function collectLegacyMemorySidecarSources(params: {
async function addSource(agentId: string, legacyPath: string): Promise<void> {
const normalizedPath = path.resolve(legacyPath);
const key = `${agentId}\0${normalizedPath}`;
if (seen.has(key) || !(await fileExists(normalizedPath))) {
if (seen.has(key) || !(await legacyStateFileExists(normalizedPath))) {
return;
}
seen.add(key);
@@ -759,7 +763,7 @@ async function archiveLegacyMemorySidecar(params: {
await Promise.all(
LEGACY_MEMORY_SIDECAR_SUFFIXES.map(async (suffix) => {
const filePath = `${params.source.legacyPath}${suffix}`;
return (await fileExists(filePath)) ? filePath : null;
return (await legacyStateFileExists(filePath)) ? filePath : null;
}),
)
).filter((filePath): filePath is string => filePath !== null);
@@ -770,7 +774,7 @@ async function archiveLegacyMemorySidecar(params: {
await Promise.all(
existingSources.map(async (sourcePath) => {
const archivedPath = `${sourcePath}.migrated`;
return (await fileExists(archivedPath)) ? archivedPath : null;
return (await legacyStateFileExists(archivedPath)) ? archivedPath : null;
}),
)
).filter((filePath): filePath is string => filePath !== null);
@@ -789,7 +793,10 @@ async function archiveLegacyMemorySidecar(params: {
} catch (err) {
for (const entry of renamed.toReversed()) {
try {
if ((await fileExists(entry.archivedPath)) && !(await fileExists(entry.sourcePath))) {
if (
(await legacyStateFileExists(entry.archivedPath)) &&
!(await legacyStateFileExists(entry.sourcePath))
) {
await fs.rename(entry.archivedPath, entry.sourcePath);
}
} catch (rollbackErr) {
@@ -827,7 +834,7 @@ async function preserveLegacyMemorySidecarRetryPath(params: {
await Promise.all(
LEGACY_MEMORY_SIDECAR_SUFFIXES.map(async (suffix) => {
const targetPath = `${retryPath}${suffix}`;
return (await fileExists(targetPath)) ? targetPath : null;
return (await legacyStateFileExists(targetPath)) ? targetPath : null;
}),
)
).filter((targetPath): targetPath is string => targetPath !== null);
@@ -843,14 +850,14 @@ async function preserveLegacyMemorySidecarRetryPath(params: {
.digest("hex")
.slice(0, 12)}.sqlite`,
);
if (await fileExists(targetBasePath)) {
if (await legacyStateFileExists(targetBasePath)) {
return;
}
const existingSources = (
await Promise.all(
LEGACY_MEMORY_SIDECAR_SUFFIXES.map(async (suffix) => {
const sourcePath = `${params.source.legacyPath}${suffix}`;
return (await fileExists(sourcePath))
return (await legacyStateFileExists(sourcePath))
? { sourcePath, targetPath: `${targetBasePath}${suffix}` }
: null;
}),
@@ -985,42 +992,10 @@ function resolveConfiguredWorkspaces(config: unknown, env: NodeJS.ProcessEnv): s
).map((entry) => entry.workspaceDir);
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function readJsonFile(filePath: string): Promise<unknown> {
return JSON.parse(await fs.readFile(filePath, "utf8"));
}
async function archiveLegacySource(params: {
filePath: string;
label: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated Memory Core ${params.label} source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived Memory Core ${params.label} legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(
`Failed archiving Memory Core ${params.label} legacy source: ${String(err)}`,
);
}
}
async function collectLegacySources(
config: unknown,
env: NodeJS.ProcessEnv,
@@ -1035,7 +1010,7 @@ async function collectLegacySources(
];
for (const candidate of candidates) {
const filePath = path.join(workspaceDir, candidate.relativePath);
if (await fileExists(filePath)) {
if (await legacyStateFileExists(filePath)) {
sources.push({ workspaceDir, label: candidate.label, filePath });
}
}
@@ -1200,9 +1175,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
changes.push(
`Migrated Memory Core ${source.label} -> SQLite plugin state (${imported} row(s))`,
);
await archiveLegacySource({
await archiveLegacyStateSource({
filePath: source.filePath,
label: source.label,
label: `Memory Core ${source.label}`,
changes,
warnings,
});

View File

@@ -194,11 +194,19 @@ function multiCollectionProbeEntryKey(params: QmdRuntimeMultiCollectionProbeCach
);
}
function normalizeCollectionValidationEntry(
type QmdRuntimeCacheEnvelope = {
record: Record<string, unknown>;
createdAtMs: number;
expiresAtMs: number;
keyHash: string;
};
/** Validates the shared cache-entry envelope: version, expiry window, and key hash. */
function normalizeCacheEntryEnvelope(
value: unknown,
nowMs: number,
expectedKeyHash: string,
): QmdRuntimeCacheCollectionValidationEntry | undefined {
): QmdRuntimeCacheEnvelope | undefined {
if (typeof value !== "object" || value === null) {
return undefined;
}
@@ -229,6 +237,20 @@ function normalizeCollectionValidationEntry(
return undefined;
}
return { record, createdAtMs, expiresAtMs, keyHash };
}
function normalizeCollectionValidationEntry(
value: unknown,
nowMs: number,
expectedKeyHash: string,
): QmdRuntimeCacheCollectionValidationEntry | undefined {
const envelope = normalizeCacheEntryEnvelope(value, nowMs, expectedKeyHash);
if (!envelope) {
return undefined;
}
const { record, createdAtMs, expiresAtMs, keyHash } = envelope;
const validation = record.validation;
if (typeof validation !== "object" || validation === null) {
return undefined;
@@ -262,35 +284,11 @@ function normalizeMultiCollectionProbeEntry(
nowMs: number,
expectedKeyHash: string,
): QmdRuntimeCacheMultiCollectionProbeEntry | undefined {
if (typeof value !== "object" || value === null) {
return undefined;
}
const record = value as Record<string, unknown>;
if (record.version !== QMD_RUNTIME_CACHE_ENTRY_VERSION) {
return undefined;
}
const createdAtMs =
typeof record.createdAtMs === "number"
? Math.max(0, Math.floor(record.createdAtMs))
: Number.NaN;
const expiresAtMs =
typeof record.expiresAtMs === "number"
? Math.max(0, Math.floor(record.expiresAtMs))
: Number.NaN;
if (
!Number.isFinite(createdAtMs) ||
!Number.isFinite(expiresAtMs) ||
!Number.isFinite(nowMs) ||
nowMs >= expiresAtMs
) {
return undefined;
}
const keyHash = normalizeText(typeof record.keyHash === "string" ? record.keyHash : "");
if (keyHash !== expectedKeyHash) {
const envelope = normalizeCacheEntryEnvelope(value, nowMs, expectedKeyHash);
if (!envelope) {
return undefined;
}
const { record, createdAtMs, expiresAtMs, keyHash } = envelope;
const probe = record.multiCollectionProbe;
if (typeof probe !== "object" || probe === null) {

View File

@@ -166,7 +166,7 @@ describe("memory-wiki doctor source sync migration", () => {
await expect(migration.migrateLegacyState(params)).resolves.toEqual({
changes: [
"Migrated Memory Wiki import runs -> plugin state (1 imported, 0 existing)",
expect.stringContaining("Archived Memory Wiki import-run legacy record ->"),
expect.stringContaining("Archived Memory Wiki import-run legacy source ->"),
],
warnings: [],
});

View File

@@ -2,7 +2,11 @@
import fs from "node:fs/promises";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/plugin-entry";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
legacyStateFileExists,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
import { resolveMemoryWikiConfig, type MemoryWikiPluginConfig } from "./src/config.js";
export { legacyConfigRules, normalizeCompatibilityConfig } from "./src/config-compat.js";
import { isRecord } from "openclaw/plugin-sdk/string-coerce-runtime";
@@ -49,36 +53,6 @@ function resolveConfiguredVaultRoots(params: {
return [resolved.vault.path];
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function archiveLegacySource(params: {
filePath: string;
label: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated ${params.label} in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived ${params.label} -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving ${params.label}: ${String(err)}`);
}
}
async function archiveLegacyImportRunRecords(params: {
vaultRoot: string;
changes: string[];
@@ -97,9 +71,9 @@ async function archiveLegacyImportRunRecords(params: {
if (!entry.isFile() || !entry.name.endsWith(".json")) {
continue;
}
await archiveLegacySource({
await archiveLegacyStateSource({
filePath: path.join(importRunsDir, entry.name),
label: "Memory Wiki import-run legacy record",
label: "Memory Wiki import-run",
changes: params.changes,
warnings: params.warnings,
});
@@ -128,7 +102,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
const filePath = resolveMemoryWikiSourceSyncStatePath(vaultRoot);
const state = await readLegacyMemoryWikiSourceSyncState(vaultRoot);
const count = Object.keys(state.entries).length;
if (count === 0 || !(await fileExists(filePath))) {
if (count === 0 || !(await legacyStateFileExists(filePath))) {
continue;
}
previews.push(
@@ -146,7 +120,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
env: params.env,
})) {
const filePath = resolveMemoryWikiSourceSyncStatePath(vaultRoot);
if (!(await fileExists(filePath))) {
if (!(await legacyStateFileExists(filePath))) {
continue;
}
const state = await readLegacyMemoryWikiSourceSyncState(vaultRoot);
@@ -176,9 +150,9 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
changes.push(
`Migrated Memory Wiki source sync -> plugin state (${importedCount} imported, ${existingCount} existing)`,
);
await archiveLegacySource({
await archiveLegacyStateSource({
filePath,
label: "Memory Wiki source-sync legacy source",
label: "Memory Wiki source-sync",
changes,
warnings,
});

View File

@@ -3,7 +3,10 @@ import crypto from "node:crypto";
import type { Dirent } from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
import { resolveStorePath } from "openclaw/plugin-sdk/session-store-runtime";
import { isRecord } from "openclaw/plugin-sdk/string-coerce-runtime";
import { normalizeStoredConversationId } from "./src/conversation-store-helpers.js";
@@ -133,15 +136,6 @@ function listCandidateStorePaths(params: {
return [...paths];
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
function resolveStateFilePath(stateDir: string, filename: string): string {
return path.join(stateDir, filename);
}
@@ -265,28 +259,6 @@ async function listLegacyLearningFiles(
return files;
}
async function archiveLegacySource(params: {
filePath: string;
label?: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
const label = params.label ?? "Microsoft Teams feedback-learning";
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated ${label} source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived ${label} legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving ${label} legacy source: ${String(err)}`);
}
}
function mergeLearnings(legacy: string[], existing?: FeedbackLearningEntry): string[] {
const seen = new Set<string>();
const merged: string[] = [];
@@ -347,7 +319,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
changes.push(
`Migrated ${imported} ${MSTEAMS_PLUGIN_ID} conversation ${imported === 1 ? "entry" : "entries"} -> plugin state`,
);
await archiveLegacySource({
await archiveLegacyStateSource({
filePath,
label: `${MSTEAMS_PLUGIN_ID} conversation`,
changes,
@@ -422,7 +394,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
changes.push(
`Migrated ${imported} ${MSTEAMS_PLUGIN_ID} poll ${imported === 1 ? "entry" : "entries"} -> plugin state`,
);
await archiveLegacySource({
await archiveLegacyStateSource({
filePath,
label: `${MSTEAMS_PLUGIN_ID} poll`,
changes,
@@ -486,7 +458,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
`Skipped ${skipped} malformed ${MSTEAMS_PLUGIN_ID} SSO token ${skipped === 1 ? "entry" : "entries"} during migration`,
);
}
await archiveLegacySource({
await archiveLegacyStateSource({
filePath,
label: `${MSTEAMS_PLUGIN_ID} SSO-token`,
changes,
@@ -555,7 +527,12 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
updatedAt: Date.now(),
});
imported++;
await archiveLegacySource({ filePath: file.filePath, changes, warnings });
await archiveLegacyStateSource({
filePath: file.filePath,
label: "Microsoft Teams feedback-learning",
changes,
warnings,
});
}
if (imported > 0) {
changes.unshift(

View File

@@ -1,7 +1,9 @@
// Msteams plugin module implements sent message cache behavior.
import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { getOptionalMSTeamsRuntime } from "./runtime.js";
const TTL_MS = 24 * 60 * 60 * 1000;
const MAX_ENTRIES = 20_000;
const PERSISTENT_MAX_ENTRIES = 1000;
const PERSISTENT_NAMESPACE = "msteams.sent-messages";
const MSTEAMS_SENT_MESSAGES_KEY = Symbol.for("openclaw.msteamsSentMessages");
@@ -10,144 +12,45 @@ type MSTeamsSentMessageRecord = {
sentAt: number;
};
type MSTeamsSentMessageStore = {
register(key: string, value: MSTeamsSentMessageRecord, opts?: { ttlMs?: number }): Promise<void>;
lookup(key: string): Promise<MSTeamsSentMessageRecord | undefined>;
};
const sentMessages = createPersistentDedupeCache<MSTeamsSentMessageRecord>({
globalKey: MSTEAMS_SENT_MESSAGES_KEY,
ttlMs: TTL_MS,
maxSize: MAX_ENTRIES,
persistent: {
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
openStore: (options) => getOptionalMSTeamsRuntime()?.state.openKeyedStore(options),
logError: (error) => {
try {
getOptionalMSTeamsRuntime()
?.logging.getChildLogger({ plugin: "msteams", feature: "sent-message-state" })
.warn("Microsoft Teams persistent sent-message state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Teams routing.
}
},
// Re-prime with the original send time so restored entries keep their TTL window.
readTimestamp: (record) => record.sentAt,
},
});
let sentMessageCache: Map<string, Map<string, number>> | undefined;
let persistentStore: MSTeamsSentMessageStore | undefined;
let persistentStoreDisabled = false;
function getSentMessageCache(): Map<string, Map<string, number>> {
if (!sentMessageCache) {
const globalStore = globalThis as Record<PropertyKey, unknown>;
sentMessageCache =
(globalStore[MSTEAMS_SENT_MESSAGES_KEY] as Map<string, Map<string, number>> | undefined) ??
new Map<string, Map<string, number>>();
globalStore[MSTEAMS_SENT_MESSAGES_KEY] = sentMessageCache;
}
return sentMessageCache;
}
function makePersistentKey(conversationId: string, messageId: string): string {
function makeKey(conversationId: string, messageId: string): string {
return `${conversationId}:${messageId}`;
}
function reportPersistentSentMessageError(error: unknown): void {
try {
getOptionalMSTeamsRuntime()
?.logging.getChildLogger({ plugin: "msteams", feature: "sent-message-state" })
.warn("Microsoft Teams persistent sent-message state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Teams routing.
}
}
function disablePersistentSentMessageStore(error: unknown): void {
persistentStoreDisabled = true;
persistentStore = undefined;
reportPersistentSentMessageError(error);
}
function getPersistentSentMessageStore(): MSTeamsSentMessageStore | undefined {
if (persistentStoreDisabled) {
return undefined;
}
if (persistentStore) {
return persistentStore;
}
const runtime = getOptionalMSTeamsRuntime();
if (!runtime) {
return undefined;
}
try {
persistentStore = runtime.state.openKeyedStore<MSTeamsSentMessageRecord>({
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
defaultTtlMs: TTL_MS,
});
return persistentStore;
} catch (error) {
disablePersistentSentMessageStore(error);
return undefined;
}
}
function cleanupExpired(scopeKey: string, entry: Map<string, number>, now: number): void {
for (const [id, timestamp] of entry) {
if (now - timestamp > TTL_MS) {
entry.delete(id);
}
}
if (entry.size === 0) {
getSentMessageCache().delete(scopeKey);
}
}
function rememberSentMessageInMemory(
conversationId: string,
messageId: string,
sentAt: number,
): void {
const store = getSentMessageCache();
let entry = store.get(conversationId);
if (!entry) {
entry = new Map<string, number>();
store.set(conversationId, entry);
}
entry.set(messageId, sentAt);
if (entry.size > 200) {
cleanupExpired(conversationId, entry, sentAt);
}
}
function rememberPersistentSentMessage(params: {
conversationId: string;
messageId: string;
sentAt: number;
}): void {
const store = getPersistentSentMessageStore();
if (!store) {
return;
}
void store
.register(makePersistentKey(params.conversationId, params.messageId), { sentAt: params.sentAt })
.catch(disablePersistentSentMessageStore);
}
async function lookupPersistentSentMessage(params: {
conversationId: string;
messageId: string;
}): Promise<number | undefined> {
const store = getPersistentSentMessageStore();
if (!store) {
return undefined;
}
try {
return (await store.lookup(makePersistentKey(params.conversationId, params.messageId)))?.sentAt;
} catch (error) {
disablePersistentSentMessageStore(error);
return undefined;
}
}
export function recordMSTeamsSentMessage(conversationId: string, messageId: string): void {
if (!conversationId || !messageId) {
return;
}
const now = Date.now();
rememberSentMessageInMemory(conversationId, messageId, now);
rememberPersistentSentMessage({ conversationId, messageId, sentAt: now });
const sentAt = Date.now();
void sentMessages.register(makeKey(conversationId, messageId), { sentAt }, { at: sentAt });
}
export function wasMSTeamsMessageSent(conversationId: string, messageId: string): boolean {
const entry = getSentMessageCache().get(conversationId);
if (!entry) {
if (!conversationId || !messageId) {
return false;
}
cleanupExpired(conversationId, entry, Date.now());
return entry.has(messageId);
return sentMessages.peek(makeKey(conversationId, messageId));
}
export async function wasMSTeamsMessageSentWithPersistence(params: {
@@ -157,19 +60,9 @@ export async function wasMSTeamsMessageSentWithPersistence(params: {
if (!params.conversationId || !params.messageId) {
return false;
}
if (wasMSTeamsMessageSent(params.conversationId, params.messageId)) {
return true;
}
const sentAt = await lookupPersistentSentMessage(params);
if (sentAt == null) {
return false;
}
rememberSentMessageInMemory(params.conversationId, params.messageId, sentAt);
return wasMSTeamsMessageSent(params.conversationId, params.messageId);
return await sentMessages.lookup(makeKey(params.conversationId, params.messageId));
}
export function clearMSTeamsSentMessageCache(): void {
getSentMessageCache().clear();
persistentStore = undefined;
persistentStoreDisabled = false;
sentMessages.clearForTest();
}

View File

@@ -2,7 +2,10 @@
import type { Dirent } from "node:fs";
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
import { normalizeNostrStateAccountId } from "./src/state-account-id.js";
type NostrBusState = {
@@ -75,15 +78,6 @@ function parseProfileState(value: unknown): NostrProfileState | null {
};
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
async function readJsonFile(filePath: string): Promise<unknown> {
return JSON.parse(await fs.readFile(filePath, "utf8")) as unknown;
}
@@ -121,27 +115,6 @@ async function listLegacyFiles(params: {
return files;
}
async function archiveLegacySource(params: {
filePath: string;
label: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated ${params.label} source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived ${params.label} legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`);
}
}
async function ensureStoreCapacity(params: {
files: Array<{ accountId: string }>;
store: { entries: () => Promise<Array<{ key: string; value: unknown }>> };
@@ -210,7 +183,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
existingKeys.add(file.accountId);
imported++;
}
await archiveLegacySource({
await archiveLegacyStateSource({
filePath: file.filePath,
label: "Nostr bus state",
changes,
@@ -272,7 +245,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
existingKeys.add(file.accountId);
imported++;
}
await archiveLegacySource({
await archiveLegacyStateSource({
filePath: file.filePath,
label: "Nostr profile state",
changes,

View File

@@ -1,7 +1,10 @@
// Phone Control API module exposes the plugin public contract.
import fs from "node:fs/promises";
import path from "node:path";
import type { PluginDoctorStateMigration } from "openclaw/plugin-sdk/runtime-doctor";
import {
archiveLegacyStateSource,
type PluginDoctorStateMigration,
} from "openclaw/plugin-sdk/runtime-doctor";
type ArmGroup = "camera" | "screen" | "writes" | "all";
@@ -31,15 +34,6 @@ function resolveArmStatePath(stateDir: string): string {
return path.join(stateDir, "plugins", "phone-control", "armed.json");
}
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
function isStringArray(value: unknown): value is string[] {
return Array.isArray(value) && value.every((entry) => typeof entry === "string");
}
@@ -99,28 +93,6 @@ async function readLegacyArmState(filePath: string): Promise<ArmStateFile | null
}
}
async function archiveLegacySource(params: {
filePath: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated Phone Control armed-state source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived Phone Control armed-state legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(
`Failed archiving Phone Control armed-state legacy source: ${String(err)}`,
);
}
}
export const stateMigrations: PluginDoctorStateMigration[] = [
{
id: "phone-control-armed-json-to-plugin-state",
@@ -156,7 +128,12 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
}
await store.register(ARM_STATE_KEY, state);
changes.push("Migrated Phone Control armed state -> plugin state");
await archiveLegacySource({ filePath, changes, warnings });
await archiveLegacyStateSource({
filePath,
label: "Phone Control armed-state",
changes,
warnings,
});
return { changes, warnings };
},
},

View File

@@ -1,5 +1,5 @@
// Slack plugin module implements inbound delivery state behavior.
import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { getOptionalSlackRuntime } from "../runtime.js";
import type { SlackMessageEvent } from "../types.js";
@@ -13,92 +13,30 @@ type SlackInboundDeliveryRecord = {
deliveredAt: number;
};
type SlackInboundDeliveryStore = {
register(
key: string,
value: SlackInboundDeliveryRecord,
opts?: { ttlMs?: number },
): Promise<void>;
lookup(key: string): Promise<SlackInboundDeliveryRecord | undefined>;
};
const deliveredMessages = resolveGlobalDedupeCache(SLACK_INBOUND_DELIVERIES_KEY, {
const deliveredMessages = createPersistentDedupeCache<SlackInboundDeliveryRecord>({
globalKey: SLACK_INBOUND_DELIVERIES_KEY,
ttlMs: TTL_MS,
maxSize: MAX_ENTRIES,
persistent: {
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
openStore: (options) => getOptionalSlackRuntime()?.state.openKeyedStore(options),
logError: (error) => {
try {
getOptionalSlackRuntime()
?.logging.getChildLogger({ plugin: "slack", feature: "inbound-delivery-state" })
.warn("Slack persistent inbound delivery state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Slack message handling.
}
},
},
});
let persistentStore: SlackInboundDeliveryStore | undefined;
let persistentStoreDisabled = false;
function makeKey(accountId: string, channelId: string, ts: string): string {
return `${accountId}:${channelId}:${ts}`;
}
function reportPersistentInboundDeliveryError(error: unknown): void {
try {
getOptionalSlackRuntime()
?.logging.getChildLogger({ plugin: "slack", feature: "inbound-delivery-state" })
.warn("Slack persistent inbound delivery state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Slack message handling.
}
}
function disablePersistentInboundDelivery(error: unknown): void {
persistentStoreDisabled = true;
persistentStore = undefined;
reportPersistentInboundDeliveryError(error);
}
function getPersistentInboundDeliveryStore(): SlackInboundDeliveryStore | undefined {
if (persistentStoreDisabled) {
return undefined;
}
if (persistentStore) {
return persistentStore;
}
const runtime = getOptionalSlackRuntime();
if (!runtime) {
return undefined;
}
try {
persistentStore = runtime.state.openKeyedStore<SlackInboundDeliveryRecord>({
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
defaultTtlMs: TTL_MS,
});
return persistentStore;
} catch (error) {
disablePersistentInboundDelivery(error);
return undefined;
}
}
async function lookupPersistentInboundDelivery(key: string): Promise<boolean> {
const store = getPersistentInboundDeliveryStore();
if (!store) {
return false;
}
try {
return Boolean(await store.lookup(key));
} catch (error) {
disablePersistentInboundDelivery(error);
return false;
}
}
async function rememberPersistentInboundDelivery(key: string, deliveredAt: number): Promise<void> {
const store = getPersistentInboundDeliveryStore();
if (!store) {
return;
}
try {
await store.register(key, { deliveredAt });
} catch (error) {
disablePersistentInboundDelivery(error);
}
}
export async function hasSlackInboundMessageDelivery(params: {
accountId: string;
channelId: string | undefined;
@@ -107,15 +45,7 @@ export async function hasSlackInboundMessageDelivery(params: {
if (!params.accountId || !params.channelId || !params.ts) {
return false;
}
const key = makeKey(params.accountId, params.channelId, params.ts);
if (deliveredMessages.peek(key)) {
return true;
}
const found = await lookupPersistentInboundDelivery(key);
if (found) {
deliveredMessages.check(key);
}
return found;
return await deliveredMessages.lookup(makeKey(params.accountId, params.channelId, params.ts));
}
export async function recordSlackInboundMessageDeliveries(params: {
@@ -133,17 +63,13 @@ export async function recordSlackInboundMessageDeliveries(params: {
}
keys.add(makeKey(params.accountId, message.channel, message.ts));
}
if (keys.size === 0) {
return;
}
for (const key of keys) {
deliveredMessages.check(key, deliveredAt);
}
await Promise.all(Array.from(keys, (key) => rememberPersistentInboundDelivery(key, deliveredAt)));
await Promise.all(
Array.from(keys, (key) =>
deliveredMessages.register(key, { deliveredAt }, { at: deliveredAt }),
),
);
}
export function clearSlackInboundDeliveryStateForTest(): void {
deliveredMessages.clear();
persistentStore = undefined;
persistentStoreDisabled = false;
deliveredMessages.clearForTest();
}

View File

@@ -1,11 +1,10 @@
// Slack plugin module implements sent thread cache behavior.
import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { createPersistentDedupeCache } from "openclaw/plugin-sdk/dedupe-runtime";
import { getOptionalSlackRuntime } from "./runtime.js";
/**
* In-memory cache of Slack threads the bot has participated in.
* Cache of Slack threads the bot has participated in.
* Used to auto-respond in threads without requiring @mention after the first reply.
* Follows a similar TTL pattern to the MS Teams and Telegram sent-message caches.
*/
const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
@@ -18,99 +17,35 @@ type SlackThreadParticipationRecord = {
repliedAt: number;
};
type SlackThreadParticipationStore = {
register(
key: string,
value: SlackThreadParticipationRecord,
opts?: { ttlMs?: number },
): Promise<void>;
lookup(key: string): Promise<SlackThreadParticipationRecord | undefined>;
};
/**
* Keep Slack thread participation shared across bundled chunks so thread
* auto-reply gating does not diverge between prepare/dispatch call paths.
*/
const SLACK_THREAD_PARTICIPATION_KEY = Symbol.for("openclaw.slackThreadParticipation");
const threadParticipation = resolveGlobalDedupeCache(SLACK_THREAD_PARTICIPATION_KEY, {
const threadParticipation = createPersistentDedupeCache<SlackThreadParticipationRecord>({
globalKey: SLACK_THREAD_PARTICIPATION_KEY,
ttlMs: TTL_MS,
maxSize: MAX_ENTRIES,
persistent: {
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
openStore: (options) => getOptionalSlackRuntime()?.state.openKeyedStore(options),
logError: (error) => {
try {
getOptionalSlackRuntime()
?.logging.getChildLogger({ plugin: "slack", feature: "thread-participation-state" })
.warn("Slack persistent thread participation state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Slack message handling.
}
},
},
});
let persistentStore: SlackThreadParticipationStore | undefined;
let persistentStoreDisabled = false;
function makeKey(accountId: string, channelId: string, threadTs: string): string {
return `${accountId}:${channelId}:${threadTs}`;
}
function reportPersistentThreadParticipationError(error: unknown): void {
try {
getOptionalSlackRuntime()
?.logging.getChildLogger({ plugin: "slack", feature: "thread-participation-state" })
.warn("Slack persistent thread participation state failed", { error: String(error) });
} catch {
// Best effort only: persistent state must never break Slack message handling.
}
}
function disablePersistentThreadParticipation(error: unknown): void {
persistentStoreDisabled = true;
persistentStore = undefined;
reportPersistentThreadParticipationError(error);
}
function getPersistentThreadParticipationStore(): SlackThreadParticipationStore | undefined {
if (persistentStoreDisabled) {
return undefined;
}
if (persistentStore) {
return persistentStore;
}
const runtime = getOptionalSlackRuntime();
if (!runtime) {
return undefined;
}
try {
persistentStore = runtime.state.openKeyedStore<SlackThreadParticipationRecord>({
namespace: PERSISTENT_NAMESPACE,
maxEntries: PERSISTENT_MAX_ENTRIES,
defaultTtlMs: TTL_MS,
});
return persistentStore;
} catch (error) {
disablePersistentThreadParticipation(error);
return undefined;
}
}
function rememberPersistentThreadParticipation(params: { key: string; agentId?: string }): void {
const store = getPersistentThreadParticipationStore();
if (!store) {
return;
}
void store
.register(params.key, {
// Stored for future per-agent thread routing; current reads only need presence.
...(params.agentId ? { agentId: params.agentId } : {}),
repliedAt: Date.now(),
})
.catch(disablePersistentThreadParticipation);
}
async function lookupPersistentThreadParticipation(key: string): Promise<boolean> {
const store = getPersistentThreadParticipationStore();
if (!store) {
return false;
}
try {
return Boolean(await store.lookup(key));
} catch (error) {
disablePersistentThreadParticipation(error);
return false;
}
}
export function recordSlackThreadParticipation(
accountId: string,
channelId: string,
@@ -120,9 +55,11 @@ export function recordSlackThreadParticipation(
if (!accountId || !channelId || !threadTs) {
return;
}
const key = makeKey(accountId, channelId, threadTs);
threadParticipation.check(key);
rememberPersistentThreadParticipation({ key, agentId: opts?.agentId });
void threadParticipation.register(makeKey(accountId, channelId, threadTs), {
// Stored for future per-agent thread routing; current reads only need presence.
...(opts?.agentId ? { agentId: opts.agentId } : {}),
repliedAt: Date.now(),
});
}
export function hasSlackThreadParticipation(
@@ -144,19 +81,11 @@ export async function hasSlackThreadParticipationWithPersistence(params: {
if (!params.accountId || !params.channelId || !params.threadTs) {
return false;
}
const key = makeKey(params.accountId, params.channelId, params.threadTs);
if (threadParticipation.peek(key)) {
return true;
}
const found = await lookupPersistentThreadParticipation(key);
if (found) {
threadParticipation.check(key);
}
return found;
return await threadParticipation.lookup(
makeKey(params.accountId, params.channelId, params.threadTs),
);
}
export function clearSlackThreadParticipationCache(): void {
threadParticipation.clear();
persistentStore = undefined;
persistentStoreDisabled = false;
threadParticipation.clearForTest();
}

View File

@@ -4,9 +4,10 @@ import os from "node:os";
import path from "node:path";
import type { OpenClawConfig } from "openclaw/plugin-sdk/plugin-entry";
import { normalizeAgentId } from "openclaw/plugin-sdk/routing";
import type {
PluginDoctorStateMigration,
PluginStateKeyedStore,
import {
archiveLegacyStateSource,
type PluginDoctorStateMigration,
type PluginStateKeyedStore,
} from "openclaw/plugin-sdk/runtime-doctor";
import {
buildVoiceCallLegacyJsonlEventKey,
@@ -126,14 +127,6 @@ function resolveVoiceCallStorePath(params: {
}
/** Return true when a path exists and is a file. */
async function fileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
/** Build the plugin state key for one migrated event chunk. */
function buildChunkKey(eventKey: string, index: number): string {
@@ -214,25 +207,6 @@ async function readLegacyCallRecords(filePath: string): Promise<{
}
/** Archive the legacy JSONL source after a complete migration. */
async function archiveLegacySource(params: {
filePath: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await fileExists(archivedPath)) {
params.warnings.push(
`Left migrated Voice Call call-log source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived Voice Call call-log legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving Voice Call call-log legacy source: ${String(err)}`);
}
}
/** Select newest missing records that fit remaining plugin state capacity. */
async function selectEntriesForImport(params: {
@@ -356,7 +330,7 @@ export const stateMigrations: PluginDoctorStateMigration[] = [
warnings.push("Left Voice Call call-log source in place because migration was incomplete");
return { changes, warnings };
}
await archiveLegacySource({ filePath, changes, warnings });
await archiveLegacyStateSource({ filePath, label: "Voice Call call-log", changes, warnings });
return { changes, warnings };
},
},

View File

@@ -202,8 +202,8 @@ let publicDeprecatedExportsByEntrypointBudget;
try {
budgets = {
publicEntrypoints: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_ENTRYPOINTS", 323),
publicExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_EXPORTS", 10412),
publicFunctionExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_FUNCTION_EXPORTS", 5227),
publicExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_EXPORTS", 10416),
publicFunctionExports: readBudgetEnv("OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_FUNCTION_EXPORTS", 5230),
publicDeprecatedExports: readBudgetEnv(
"OPENCLAW_PLUGIN_SDK_MAX_PUBLIC_DEPRECATED_EXPORTS",
3261,

View File

@@ -0,0 +1,116 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { createPersistentDedupeCache } from "./dedupe-runtime.js";
type Record = { at: number };
function createMemoryStore() {
const entries = new Map<string, Record>();
return {
entries,
store: {
register: vi.fn(async (key: string, value: Record) => {
entries.set(key, value);
}),
lookup: vi.fn(async (key: string) => entries.get(key)),
},
};
}
function createCache(params?: {
openStore?: () => ReturnType<typeof createMemoryStore>["store"] | undefined;
logError?: (error: unknown) => void;
readTimestamp?: (record: Record) => number | undefined;
}) {
const backing = createMemoryStore();
const cache = createPersistentDedupeCache<Record>({
// Plain Symbol() is unique per cache, so parallel tests never share memory layers.
globalKey: Symbol("test.persistent-dedupe"),
ttlMs: 60_000,
maxSize: 100,
persistent: {
namespace: "test.persistent-dedupe",
maxEntries: 100,
openStore: params?.openStore ?? (() => backing.store),
logError: params?.logError,
readTimestamp: params?.readTimestamp,
},
});
return { cache, backing };
}
describe("createPersistentDedupeCache", () => {
afterEach(() => {
vi.useRealTimers();
});
it("records presence in both layers and answers from memory first", async () => {
const { cache, backing } = createCache();
await cache.register("k1", { at: 1 });
expect(cache.peek("k1")).toBe(true);
expect(await cache.lookup("k1")).toBe(true);
expect(backing.store.register).toHaveBeenCalledWith("k1", { at: 1 });
expect(backing.store.lookup).not.toHaveBeenCalled();
});
it("falls back to persistence and re-primes memory on a hit", async () => {
const { cache, backing } = createCache();
backing.entries.set("k2", { at: 42 });
expect(cache.peek("k2")).toBe(false);
expect(await cache.lookup("k2")).toBe(true);
expect(cache.peek("k2")).toBe(true);
});
it("re-primes memory with the persisted timestamp when provided", async () => {
vi.useFakeTimers();
vi.setSystemTime(1_000_000);
const { cache, backing } = createCache({ readTimestamp: (record) => record.at });
backing.entries.set("k3", { at: 1_000_000 - 59_000 });
expect(await cache.lookup("k3")).toBe(true);
// Re-primed at the original timestamp: expires 59s later instead of a fresh 60s TTL.
vi.setSystemTime(1_000_000 + 2_000);
expect(cache.peek("k3")).toBe(false);
});
it("disables persistence after an open failure and never rejects", async () => {
const logError = vi.fn();
const openStore = vi.fn(() => {
throw new Error("sqlite unavailable");
});
const { cache } = createCache({ openStore, logError });
await expect(cache.register("k4", { at: 1 })).resolves.toBeUndefined();
expect(cache.peek("k4")).toBe(true);
expect(await cache.lookup("k5")).toBe(false);
expect(openStore).toHaveBeenCalledTimes(1);
expect(logError).toHaveBeenCalledTimes(1);
});
it("disables persistence after a lookup failure", async () => {
const logError = vi.fn();
const store = {
register: vi.fn(async () => {}),
lookup: vi.fn(async () => {
throw new Error("read failed");
}),
};
const { cache } = createCache({ openStore: () => store, logError });
expect(await cache.lookup("k6")).toBe(false);
expect(logError).toHaveBeenCalledTimes(1);
await cache.register("k7", { at: 1 });
expect(store.register).not.toHaveBeenCalled();
});
it("clearForTest resets memory and re-enables persistence", async () => {
const openStore = vi
.fn()
.mockImplementationOnce(() => {
throw new Error("boom");
})
.mockImplementation(() => createMemoryStore().store);
const { cache } = createCache({ openStore });
await cache.register("k8", { at: 1 });
cache.clearForTest();
expect(cache.peek("k8")).toBe(false);
await cache.register("k9", { at: 1 });
expect(openStore).toHaveBeenCalledTimes(2);
});
});

View File

@@ -1,3 +1,120 @@
// In-memory dedupe helpers for plugin runtime hot paths.
import { resolveGlobalDedupeCache } from "../infra/dedupe.js";
import type { OpenKeyedStoreOptions } from "./plugin-state-runtime.js";
export { createDedupeCache, resolveGlobalDedupeCache } from "../infra/dedupe.js";
type PersistentDedupeStore<TRecord> = {
register(key: string, value: TRecord, opts?: { ttlMs?: number }): Promise<void>;
lookup(key: string): Promise<TRecord | undefined>;
};
/** Dual-layer presence cache: process-memory dedupe plus best-effort persistent state. */
export type PersistentDedupeCache<TRecord> = {
/** Memory-only presence check without refreshing recency. */
peek(key: string): boolean;
/** Memory-first presence check; falls back to persistence and re-primes memory on a hit. */
lookup(key: string): Promise<boolean>;
/** Records presence in memory and best-effort persistence. Never rejects. */
register(key: string, record: TRecord, opts?: { at?: number }): Promise<void>;
/** Clears memory and re-enables a persistent layer disabled by an earlier failure. */
clearForTest(): void;
};
/**
* Creates a channel-family presence cache backed by a global in-memory dedupe layer
* plus a lazily opened plugin keyed store. Persistence is best effort: the first
* open/read/write failure disables the persistent layer for the process so message
* handling never breaks on state errors, matching the shipped channel-cache contract.
*/
export function createPersistentDedupeCache<TRecord>(params: {
/** Global symbol key so the memory layer stays shared across bundled chunks. */
globalKey: symbol;
ttlMs: number;
maxSize: number;
persistent: {
namespace: string;
maxEntries: number;
/** Usually `() => runtime?.state.openKeyedStore(options)`; undefined skips persistence. */
openStore: (options: OpenKeyedStoreOptions) => PersistentDedupeStore<TRecord> | undefined;
logError?: (error: unknown) => void;
/** Memory re-prime timestamp after a persistent hit; defaults to now. */
readTimestamp?: (record: TRecord) => number | undefined;
};
}): PersistentDedupeCache<TRecord> {
const memory = resolveGlobalDedupeCache(params.globalKey, {
ttlMs: params.ttlMs,
maxSize: params.maxSize,
});
let persistentStore: PersistentDedupeStore<TRecord> | undefined;
let persistentStoreDisabled = false;
const disablePersistentStore = (error: unknown) => {
persistentStoreDisabled = true;
persistentStore = undefined;
params.persistent.logError?.(error);
};
const getPersistentStore = (): PersistentDedupeStore<TRecord> | undefined => {
if (persistentStoreDisabled) {
return undefined;
}
if (persistentStore) {
return persistentStore;
}
try {
persistentStore = params.persistent.openStore({
namespace: params.persistent.namespace,
maxEntries: params.persistent.maxEntries,
defaultTtlMs: params.ttlMs,
});
return persistentStore;
} catch (error) {
disablePersistentStore(error);
return undefined;
}
};
return {
peek: (key) => memory.peek(key),
lookup: async (key) => {
if (memory.peek(key)) {
return true;
}
const store = getPersistentStore();
if (!store) {
return false;
}
let record: TRecord | undefined;
try {
record = await store.lookup(key);
} catch (error) {
disablePersistentStore(error);
return false;
}
if (record === undefined) {
return false;
}
memory.check(key, params.persistent.readTimestamp?.(record));
return true;
},
register: async (key, record, opts) => {
memory.check(key, opts?.at);
const store = getPersistentStore();
if (!store) {
return;
}
try {
await store.register(key, record);
} catch (error) {
disablePersistentStore(error);
}
},
clearForTest: () => {
memory.clear();
persistentStore = undefined;
persistentStoreDisabled = false;
},
};
}

View File

@@ -29,4 +29,8 @@ export type {
PluginDoctorStateMigration,
PluginDoctorStateMigrationContext,
} from "../plugins/doctor-contract-registry.js";
export {
archiveLegacyStateSource,
legacyStateFileExists,
} from "../plugins/doctor-state-migration-fs.js";
export type { DoctorSessionRouteStateOwner } from "../plugins/doctor-session-route-state-owner-types.js";

View File

@@ -0,0 +1,38 @@
// Shared filesystem helpers for plugin doctor legacy-state migrations.
import fs from "node:fs/promises";
/** True when the legacy-state path exists and is a regular file. */
export async function legacyStateFileExists(filePath: string): Promise<boolean> {
try {
const stat = await fs.stat(filePath);
return stat.isFile();
} catch {
return false;
}
}
/**
* Renames a migrated legacy source to `<path>.migrated`, recording the outcome in the
* doctor changes/warnings lists. Never throws: a failed archive leaves the source in
* place so a later doctor run can retry without losing migrated data.
*/
export async function archiveLegacyStateSource(params: {
filePath: string;
label: string;
changes: string[];
warnings: string[];
}): Promise<void> {
const archivedPath = `${params.filePath}.migrated`;
if (await legacyStateFileExists(archivedPath)) {
params.warnings.push(
`Left migrated ${params.label} source in place because ${archivedPath} already exists`,
);
return;
}
try {
await fs.rename(params.filePath, archivedPath);
params.changes.push(`Archived ${params.label} legacy source -> ${archivedPath}`);
} catch (err) {
params.warnings.push(`Failed archiving ${params.label} legacy source: ${String(err)}`);
}
}