refactor: dedupe extension runtime caches

This commit is contained in:
Peter Steinberger
2026-03-22 17:58:29 +00:00
parent f095bbd7b0
commit 89d65521fe
8 changed files with 91 additions and 197 deletions

View File

@@ -1,19 +1,10 @@
import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime";
import type { DiscordComponentEntry, DiscordModalEntry } from "./components.js";
const DEFAULT_COMPONENT_TTL_MS = 30 * 60 * 1000;
const DISCORD_COMPONENT_ENTRIES_KEY = Symbol.for("openclaw.discord.componentEntries");
const DISCORD_MODAL_ENTRIES_KEY = Symbol.for("openclaw.discord.modalEntries");
function resolveGlobalMap<TKey, TValue>(key: symbol): Map<TKey, TValue> {
const globalStore = globalThis as Record<PropertyKey, unknown>;
if (globalStore[key] instanceof Map) {
return globalStore[key] as Map<TKey, TValue>;
}
const created = new Map<TKey, TValue>();
globalStore[key] = created;
return created;
}
const componentEntries = resolveGlobalMap<string, DiscordComponentEntry>(
DISCORD_COMPONENT_ENTRIES_KEY,
);
@@ -33,6 +24,42 @@ function normalizeEntryTimestamps<T extends { createdAt?: number; expiresAt?: nu
return { ...entry, createdAt, expiresAt };
}
function registerEntries<
T extends { id: string; messageId?: string; createdAt?: number; expiresAt?: number },
>(
entries: T[],
store: Map<string, T>,
params: { now: number; ttlMs: number; messageId?: string },
): void {
for (const entry of entries) {
const normalized = normalizeEntryTimestamps(
{ ...entry, messageId: params.messageId ?? entry.messageId },
params.now,
params.ttlMs,
);
store.set(entry.id, normalized);
}
}
function resolveEntry<T extends { expiresAt?: number }>(
store: Map<string, T>,
params: { id: string; consume?: boolean },
): T | null {
const entry = store.get(params.id);
if (!entry) {
return null;
}
const now = Date.now();
if (isExpired(entry, now)) {
store.delete(params.id);
return null;
}
if (params.consume !== false) {
store.delete(params.id);
}
return entry;
}
export function registerDiscordComponentEntries(params: {
entries: DiscordComponentEntry[];
modals: DiscordModalEntry[];
@@ -41,60 +68,22 @@ export function registerDiscordComponentEntries(params: {
}): void {
const now = Date.now();
const ttlMs = params.ttlMs ?? DEFAULT_COMPONENT_TTL_MS;
for (const entry of params.entries) {
const normalized = normalizeEntryTimestamps(
{ ...entry, messageId: params.messageId ?? entry.messageId },
now,
ttlMs,
);
componentEntries.set(entry.id, normalized);
}
for (const modal of params.modals) {
const normalized = normalizeEntryTimestamps(
{ ...modal, messageId: params.messageId ?? modal.messageId },
now,
ttlMs,
);
modalEntries.set(modal.id, normalized);
}
registerEntries(params.entries, componentEntries, { now, ttlMs, messageId: params.messageId });
registerEntries(params.modals, modalEntries, { now, ttlMs, messageId: params.messageId });
}
export function resolveDiscordComponentEntry(params: {
id: string;
consume?: boolean;
}): DiscordComponentEntry | null {
const entry = componentEntries.get(params.id);
if (!entry) {
return null;
}
const now = Date.now();
if (isExpired(entry, now)) {
componentEntries.delete(params.id);
return null;
}
if (params.consume !== false) {
componentEntries.delete(params.id);
}
return entry;
return resolveEntry(componentEntries, params);
}
export function resolveDiscordModalEntry(params: {
id: string;
consume?: boolean;
}): DiscordModalEntry | null {
const entry = modalEntries.get(params.id);
if (!entry) {
return null;
}
const now = Date.now();
if (isExpired(entry, now)) {
modalEntries.delete(params.id);
return null;
}
if (params.consume !== false) {
modalEntries.delete(params.id);
}
return entry;
return resolveEntry(modalEntries, params);
}
export function clearDiscordComponentEntries(): void {

View File

@@ -52,16 +52,15 @@ type FeishuThreadBindingsState = {
};
const FEISHU_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.feishuThreadBindingsState");
let state: FeishuThreadBindingsState | undefined;
const state = resolveGlobalSingleton<FeishuThreadBindingsState>(
FEISHU_THREAD_BINDINGS_STATE_KEY,
() => ({
managersByAccountId: new Map(),
bindingsByAccountConversation: new Map(),
}),
);
function getState(): FeishuThreadBindingsState {
state ??= resolveGlobalSingleton<FeishuThreadBindingsState>(
FEISHU_THREAD_BINDINGS_STATE_KEY,
() => ({
managersByAccountId: new Map(),
bindingsByAccountConversation: new Map(),
}),
);
return state;
}

View File

@@ -1,44 +1,23 @@
import { createScopedExpiringIdCache } from "openclaw/plugin-sdk/text-runtime";
const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
type CacheEntry = {
timestamps: Map<string, number>;
};
const sentMessages = new Map<string, CacheEntry>();
function cleanupExpired(entry: CacheEntry): void {
const now = Date.now();
for (const [msgId, timestamp] of entry.timestamps) {
if (now - timestamp > TTL_MS) {
entry.timestamps.delete(msgId);
}
}
}
const sentMessageCache = createScopedExpiringIdCache<string, string>({
store: new Map<string, Map<string, number>>(),
ttlMs: TTL_MS,
cleanupThreshold: 200,
});
export function recordMSTeamsSentMessage(conversationId: string, messageId: string): void {
if (!conversationId || !messageId) {
return;
}
let entry = sentMessages.get(conversationId);
if (!entry) {
entry = { timestamps: new Map() };
sentMessages.set(conversationId, entry);
}
entry.timestamps.set(messageId, Date.now());
if (entry.timestamps.size > 200) {
cleanupExpired(entry);
}
sentMessageCache.record(conversationId, messageId);
}
export function wasMSTeamsMessageSent(conversationId: string, messageId: string): boolean {
const entry = sentMessages.get(conversationId);
if (!entry) {
return false;
}
cleanupExpired(entry);
return entry.timestamps.has(messageId);
return sentMessageCache.has(conversationId, messageId);
}
export function clearMSTeamsSentMessageCache(): void {
sentMessages.clear();
sentMessageCache.clear();
}

View File

@@ -1,4 +1,4 @@
import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime";
import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/infra-runtime";
/**
* In-memory cache of Slack threads the bot has participated in.
@@ -14,34 +14,15 @@ const MAX_ENTRIES = 5000;
* auto-reply gating does not diverge between prepare/dispatch call paths.
*/
const SLACK_THREAD_PARTICIPATION_KEY = Symbol.for("openclaw.slackThreadParticipation");
let threadParticipation: Map<string, number> | undefined;
function getThreadParticipation(): Map<string, number> {
threadParticipation ??= resolveGlobalMap<string, number>(SLACK_THREAD_PARTICIPATION_KEY);
return threadParticipation;
}
const threadParticipation = resolveGlobalDedupeCache(SLACK_THREAD_PARTICIPATION_KEY, {
ttlMs: TTL_MS,
maxSize: MAX_ENTRIES,
});
function makeKey(accountId: string, channelId: string, threadTs: string): string {
return `${accountId}:${channelId}:${threadTs}`;
}
function evictExpired(): void {
const now = Date.now();
for (const [key, timestamp] of getThreadParticipation()) {
if (now - timestamp > TTL_MS) {
getThreadParticipation().delete(key);
}
}
}
function evictOldest(): void {
const oldest = getThreadParticipation().keys().next().value;
if (oldest) {
getThreadParticipation().delete(oldest);
}
}
export function recordSlackThreadParticipation(
accountId: string,
channelId: string,
@@ -50,14 +31,7 @@ export function recordSlackThreadParticipation(
if (!accountId || !channelId || !threadTs) {
return;
}
const threadParticipation = getThreadParticipation();
if (threadParticipation.size >= MAX_ENTRIES) {
evictExpired();
}
if (threadParticipation.size >= MAX_ENTRIES) {
evictOldest();
}
threadParticipation.set(makeKey(accountId, channelId, threadTs), Date.now());
threadParticipation.check(makeKey(accountId, channelId, threadTs));
}
export function hasSlackThreadParticipation(
@@ -68,19 +42,9 @@ export function hasSlackThreadParticipation(
if (!accountId || !channelId || !threadTs) {
return false;
}
const key = makeKey(accountId, channelId, threadTs);
const threadParticipation = getThreadParticipation();
const timestamp = threadParticipation.get(key);
if (timestamp == null) {
return false;
}
if (Date.now() - timestamp > TTL_MS) {
threadParticipation.delete(key);
return false;
}
return true;
return threadParticipation.peek(makeKey(accountId, channelId, threadTs));
}
export function clearSlackThreadParticipationCache(): void {
getThreadParticipation().clear();
threadParticipation.clear();
}

View File

@@ -27,18 +27,11 @@ type TelegramSendMessageDraft = (
* lanes do not accidentally reuse draft ids when code-split entries coexist.
*/
const TELEGRAM_DRAFT_STREAM_STATE_KEY = Symbol.for("openclaw.telegramDraftStreamState");
let draftStreamState: { nextDraftId: number } | undefined;
function getDraftStreamState(): { nextDraftId: number } {
draftStreamState ??= resolveGlobalSingleton(TELEGRAM_DRAFT_STREAM_STATE_KEY, () => ({
nextDraftId: 0,
}));
return draftStreamState;
}
const draftStreamState = resolveGlobalSingleton(TELEGRAM_DRAFT_STREAM_STATE_KEY, () => ({
nextDraftId: 0,
}));
function allocateTelegramDraftId(): number {
const draftStreamState = getDraftStreamState();
draftStreamState.nextDraftId =
draftStreamState.nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : draftStreamState.nextDraftId + 1;
return draftStreamState.nextDraftId;
@@ -460,6 +453,6 @@ export function createTelegramDraftStream(params: {
export const __testing = {
resetTelegramDraftStreamForTests() {
getDraftStreamState().nextDraftId = 0;
draftStreamState.nextDraftId = 0;
},
};

View File

@@ -1,4 +1,4 @@
import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime";
import { createScopedExpiringIdCache, resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime";
/**
* In-memory cache of sent message IDs per chat.
@@ -7,71 +7,42 @@ import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime";
const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours
type CacheEntry = {
timestamps: Map<number, number>;
};
/**
* Keep sent-message tracking shared across bundled chunks so Telegram reaction
* filters see the same sent-message history regardless of which chunk recorded it.
*/
const TELEGRAM_SENT_MESSAGES_KEY = Symbol.for("openclaw.telegramSentMessages");
let sentMessages: Map<string, CacheEntry> | undefined;
let sentMessages: Map<string, Map<string, number>> | undefined;
function getSentMessages(): Map<string, CacheEntry> {
sentMessages ??= resolveGlobalMap<string, CacheEntry>(TELEGRAM_SENT_MESSAGES_KEY);
function getSentMessages(): Map<string, Map<string, number>> {
sentMessages ??= resolveGlobalMap<string, Map<string, number>>(TELEGRAM_SENT_MESSAGES_KEY);
return sentMessages;
}
function getChatKey(chatId: number | string): string {
return String(chatId);
}
function cleanupExpired(entry: CacheEntry): void {
const now = Date.now();
for (const [msgId, timestamp] of entry.timestamps) {
if (now - timestamp > TTL_MS) {
entry.timestamps.delete(msgId);
}
}
}
const sentMessageCache = createScopedExpiringIdCache<number | string, number>({
store: getSentMessages(),
ttlMs: TTL_MS,
cleanupThreshold: 100,
});
/**
* Record a message ID as sent by the bot.
*/
export function recordSentMessage(chatId: number | string, messageId: number): void {
const key = getChatKey(chatId);
const sentMessages = getSentMessages();
let entry = sentMessages.get(key);
if (!entry) {
entry = { timestamps: new Map() };
sentMessages.set(key, entry);
}
entry.timestamps.set(messageId, Date.now());
// Periodic cleanup
if (entry.timestamps.size > 100) {
cleanupExpired(entry);
}
sentMessageCache.record(chatId, messageId);
}
/**
* Check if a message was sent by the bot.
*/
export function wasSentByBot(chatId: number | string, messageId: number): boolean {
const key = getChatKey(chatId);
const entry = getSentMessages().get(key);
if (!entry) {
return false;
}
// Clean up expired entries on read
cleanupExpired(entry);
return entry.timestamps.has(messageId);
return sentMessageCache.has(chatId, messageId);
}
/**
* Clear all cached entries (for testing).
*/
export function clearSentMessageCache(): void {
getSentMessages().clear();
sentMessageCache.clear();
}

View File

@@ -77,18 +77,16 @@ type TelegramThreadBindingsState = {
* binding lookups, and binding mutations all observe the same live registry.
*/
const TELEGRAM_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.telegramThreadBindingsState");
let threadBindingsState: TelegramThreadBindingsState | undefined;
const threadBindingsState = resolveGlobalSingleton<TelegramThreadBindingsState>(
TELEGRAM_THREAD_BINDINGS_STATE_KEY,
() => ({
managersByAccountId: new Map<string, TelegramThreadBindingManager>(),
bindingsByAccountConversation: new Map<string, TelegramThreadBindingRecord>(),
persistQueueByAccountId: new Map<string, Promise<void>>(),
}),
);
function getThreadBindingsState(): TelegramThreadBindingsState {
threadBindingsState ??= resolveGlobalSingleton<TelegramThreadBindingsState>(
TELEGRAM_THREAD_BINDINGS_STATE_KEY,
() => ({
managersByAccountId: new Map<string, TelegramThreadBindingManager>(),
bindingsByAccountConversation: new Map<string, TelegramThreadBindingRecord>(),
persistQueueByAccountId: new Map<string, Promise<void>>(),
}),
);
return threadBindingsState;
}

View File

@@ -10,6 +10,7 @@ export * from "../markdown/render.js";
export * from "../markdown/tables.js";
export * from "../markdown/whatsapp.js";
export * from "../shared/global-singleton.js";
export * from "../shared/scoped-expiring-id-cache.js";
export * from "../shared/string-normalization.js";
export * from "../shared/string-sample.js";
export * from "../shared/text/assistant-visible-text.js";