From 89d65521fe34b3f0104a262a44f181bcb25c174a Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sun, 22 Mar 2026 17:58:29 +0000 Subject: [PATCH] refactor: dedupe extension runtime caches --- extensions/discord/src/components-registry.ts | 93 ++++++++----------- extensions/feishu/src/thread-bindings.ts | 15 ++- extensions/msteams/src/sent-message-cache.ts | 41 ++------ extensions/slack/src/sent-thread-cache.ts | 52 ++--------- extensions/telegram/src/draft-stream.ts | 15 +-- extensions/telegram/src/sent-message-cache.ts | 53 +++-------- extensions/telegram/src/thread-bindings.ts | 18 ++-- src/plugin-sdk/text-runtime.ts | 1 + 8 files changed, 91 insertions(+), 197 deletions(-) diff --git a/extensions/discord/src/components-registry.ts b/extensions/discord/src/components-registry.ts index 17bbe1408b4..b839eb433e7 100644 --- a/extensions/discord/src/components-registry.ts +++ b/extensions/discord/src/components-registry.ts @@ -1,19 +1,10 @@ +import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime"; import type { DiscordComponentEntry, DiscordModalEntry } from "./components.js"; const DEFAULT_COMPONENT_TTL_MS = 30 * 60 * 1000; const DISCORD_COMPONENT_ENTRIES_KEY = Symbol.for("openclaw.discord.componentEntries"); const DISCORD_MODAL_ENTRIES_KEY = Symbol.for("openclaw.discord.modalEntries"); -function resolveGlobalMap(key: symbol): Map { - const globalStore = globalThis as Record; - if (globalStore[key] instanceof Map) { - return globalStore[key] as Map; - } - const created = new Map(); - globalStore[key] = created; - return created; -} - const componentEntries = resolveGlobalMap( DISCORD_COMPONENT_ENTRIES_KEY, ); @@ -33,6 +24,42 @@ function normalizeEntryTimestamps( + entries: T[], + store: Map, + params: { now: number; ttlMs: number; messageId?: string }, +): void { + for (const entry of entries) { + const normalized = normalizeEntryTimestamps( + { ...entry, messageId: params.messageId ?? entry.messageId }, + params.now, + params.ttlMs, + ); + store.set(entry.id, normalized); + } +} + +function resolveEntry( + store: Map, + params: { id: string; consume?: boolean }, +): T | null { + const entry = store.get(params.id); + if (!entry) { + return null; + } + const now = Date.now(); + if (isExpired(entry, now)) { + store.delete(params.id); + return null; + } + if (params.consume !== false) { + store.delete(params.id); + } + return entry; +} + export function registerDiscordComponentEntries(params: { entries: DiscordComponentEntry[]; modals: DiscordModalEntry[]; @@ -41,60 +68,22 @@ export function registerDiscordComponentEntries(params: { }): void { const now = Date.now(); const ttlMs = params.ttlMs ?? DEFAULT_COMPONENT_TTL_MS; - for (const entry of params.entries) { - const normalized = normalizeEntryTimestamps( - { ...entry, messageId: params.messageId ?? entry.messageId }, - now, - ttlMs, - ); - componentEntries.set(entry.id, normalized); - } - for (const modal of params.modals) { - const normalized = normalizeEntryTimestamps( - { ...modal, messageId: params.messageId ?? modal.messageId }, - now, - ttlMs, - ); - modalEntries.set(modal.id, normalized); - } + registerEntries(params.entries, componentEntries, { now, ttlMs, messageId: params.messageId }); + registerEntries(params.modals, modalEntries, { now, ttlMs, messageId: params.messageId }); } export function resolveDiscordComponentEntry(params: { id: string; consume?: boolean; }): DiscordComponentEntry | null { - const entry = componentEntries.get(params.id); - if (!entry) { - return null; - } - const now = Date.now(); - if (isExpired(entry, now)) { - componentEntries.delete(params.id); - return null; - } - if (params.consume !== false) { - componentEntries.delete(params.id); - } - return entry; + return resolveEntry(componentEntries, params); } export function resolveDiscordModalEntry(params: { id: string; consume?: boolean; }): DiscordModalEntry | null { - const entry = modalEntries.get(params.id); - if (!entry) { - return null; - } - const now = Date.now(); - if (isExpired(entry, now)) { - modalEntries.delete(params.id); - return null; - } - if (params.consume !== false) { - modalEntries.delete(params.id); - } - return entry; + return resolveEntry(modalEntries, params); } export function clearDiscordComponentEntries(): void { diff --git a/extensions/feishu/src/thread-bindings.ts b/extensions/feishu/src/thread-bindings.ts index b1eb1cf8b09..4ae79a35e0e 100644 --- a/extensions/feishu/src/thread-bindings.ts +++ b/extensions/feishu/src/thread-bindings.ts @@ -52,16 +52,15 @@ type FeishuThreadBindingsState = { }; const FEISHU_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.feishuThreadBindingsState"); -let state: FeishuThreadBindingsState | undefined; +const state = resolveGlobalSingleton( + FEISHU_THREAD_BINDINGS_STATE_KEY, + () => ({ + managersByAccountId: new Map(), + bindingsByAccountConversation: new Map(), + }), +); function getState(): FeishuThreadBindingsState { - state ??= resolveGlobalSingleton( - FEISHU_THREAD_BINDINGS_STATE_KEY, - () => ({ - managersByAccountId: new Map(), - bindingsByAccountConversation: new Map(), - }), - ); return state; } diff --git a/extensions/msteams/src/sent-message-cache.ts b/extensions/msteams/src/sent-message-cache.ts index f31647cefc9..26fedf2ddb5 100644 --- a/extensions/msteams/src/sent-message-cache.ts +++ b/extensions/msteams/src/sent-message-cache.ts @@ -1,44 +1,23 @@ +import { createScopedExpiringIdCache } from "openclaw/plugin-sdk/text-runtime"; + const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours - -type CacheEntry = { - timestamps: Map; -}; - -const sentMessages = new Map(); - -function cleanupExpired(entry: CacheEntry): void { - const now = Date.now(); - for (const [msgId, timestamp] of entry.timestamps) { - if (now - timestamp > TTL_MS) { - entry.timestamps.delete(msgId); - } - } -} +const sentMessageCache = createScopedExpiringIdCache({ + store: new Map>(), + ttlMs: TTL_MS, + cleanupThreshold: 200, +}); export function recordMSTeamsSentMessage(conversationId: string, messageId: string): void { if (!conversationId || !messageId) { return; } - let entry = sentMessages.get(conversationId); - if (!entry) { - entry = { timestamps: new Map() }; - sentMessages.set(conversationId, entry); - } - entry.timestamps.set(messageId, Date.now()); - if (entry.timestamps.size > 200) { - cleanupExpired(entry); - } + sentMessageCache.record(conversationId, messageId); } export function wasMSTeamsMessageSent(conversationId: string, messageId: string): boolean { - const entry = sentMessages.get(conversationId); - if (!entry) { - return false; - } - cleanupExpired(entry); - return entry.timestamps.has(messageId); + return sentMessageCache.has(conversationId, messageId); } export function clearMSTeamsSentMessageCache(): void { - sentMessages.clear(); + sentMessageCache.clear(); } diff --git a/extensions/slack/src/sent-thread-cache.ts b/extensions/slack/src/sent-thread-cache.ts index 332a7d65496..f09732345f1 100644 --- a/extensions/slack/src/sent-thread-cache.ts +++ b/extensions/slack/src/sent-thread-cache.ts @@ -1,4 +1,4 @@ -import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime"; +import { resolveGlobalDedupeCache } from "openclaw/plugin-sdk/infra-runtime"; /** * In-memory cache of Slack threads the bot has participated in. @@ -14,34 +14,15 @@ const MAX_ENTRIES = 5000; * auto-reply gating does not diverge between prepare/dispatch call paths. */ const SLACK_THREAD_PARTICIPATION_KEY = Symbol.for("openclaw.slackThreadParticipation"); - -let threadParticipation: Map | undefined; - -function getThreadParticipation(): Map { - threadParticipation ??= resolveGlobalMap(SLACK_THREAD_PARTICIPATION_KEY); - return threadParticipation; -} +const threadParticipation = resolveGlobalDedupeCache(SLACK_THREAD_PARTICIPATION_KEY, { + ttlMs: TTL_MS, + maxSize: MAX_ENTRIES, +}); function makeKey(accountId: string, channelId: string, threadTs: string): string { return `${accountId}:${channelId}:${threadTs}`; } -function evictExpired(): void { - const now = Date.now(); - for (const [key, timestamp] of getThreadParticipation()) { - if (now - timestamp > TTL_MS) { - getThreadParticipation().delete(key); - } - } -} - -function evictOldest(): void { - const oldest = getThreadParticipation().keys().next().value; - if (oldest) { - getThreadParticipation().delete(oldest); - } -} - export function recordSlackThreadParticipation( accountId: string, channelId: string, @@ -50,14 +31,7 @@ export function recordSlackThreadParticipation( if (!accountId || !channelId || !threadTs) { return; } - const threadParticipation = getThreadParticipation(); - if (threadParticipation.size >= MAX_ENTRIES) { - evictExpired(); - } - if (threadParticipation.size >= MAX_ENTRIES) { - evictOldest(); - } - threadParticipation.set(makeKey(accountId, channelId, threadTs), Date.now()); + threadParticipation.check(makeKey(accountId, channelId, threadTs)); } export function hasSlackThreadParticipation( @@ -68,19 +42,9 @@ export function hasSlackThreadParticipation( if (!accountId || !channelId || !threadTs) { return false; } - const key = makeKey(accountId, channelId, threadTs); - const threadParticipation = getThreadParticipation(); - const timestamp = threadParticipation.get(key); - if (timestamp == null) { - return false; - } - if (Date.now() - timestamp > TTL_MS) { - threadParticipation.delete(key); - return false; - } - return true; + return threadParticipation.peek(makeKey(accountId, channelId, threadTs)); } export function clearSlackThreadParticipationCache(): void { - getThreadParticipation().clear(); + threadParticipation.clear(); } diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index 7b10e52312a..3cb2d4074f2 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -27,18 +27,11 @@ type TelegramSendMessageDraft = ( * lanes do not accidentally reuse draft ids when code-split entries coexist. */ const TELEGRAM_DRAFT_STREAM_STATE_KEY = Symbol.for("openclaw.telegramDraftStreamState"); - -let draftStreamState: { nextDraftId: number } | undefined; - -function getDraftStreamState(): { nextDraftId: number } { - draftStreamState ??= resolveGlobalSingleton(TELEGRAM_DRAFT_STREAM_STATE_KEY, () => ({ - nextDraftId: 0, - })); - return draftStreamState; -} +const draftStreamState = resolveGlobalSingleton(TELEGRAM_DRAFT_STREAM_STATE_KEY, () => ({ + nextDraftId: 0, +})); function allocateTelegramDraftId(): number { - const draftStreamState = getDraftStreamState(); draftStreamState.nextDraftId = draftStreamState.nextDraftId >= TELEGRAM_DRAFT_ID_MAX ? 1 : draftStreamState.nextDraftId + 1; return draftStreamState.nextDraftId; @@ -460,6 +453,6 @@ export function createTelegramDraftStream(params: { export const __testing = { resetTelegramDraftStreamForTests() { - getDraftStreamState().nextDraftId = 0; + draftStreamState.nextDraftId = 0; }, }; diff --git a/extensions/telegram/src/sent-message-cache.ts b/extensions/telegram/src/sent-message-cache.ts index f10f56b68f7..59101229d65 100644 --- a/extensions/telegram/src/sent-message-cache.ts +++ b/extensions/telegram/src/sent-message-cache.ts @@ -1,4 +1,4 @@ -import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime"; +import { createScopedExpiringIdCache, resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime"; /** * In-memory cache of sent message IDs per chat. @@ -7,71 +7,42 @@ import { resolveGlobalMap } from "openclaw/plugin-sdk/text-runtime"; const TTL_MS = 24 * 60 * 60 * 1000; // 24 hours -type CacheEntry = { - timestamps: Map; -}; - /** * Keep sent-message tracking shared across bundled chunks so Telegram reaction * filters see the same sent-message history regardless of which chunk recorded it. */ const TELEGRAM_SENT_MESSAGES_KEY = Symbol.for("openclaw.telegramSentMessages"); -let sentMessages: Map | undefined; +let sentMessages: Map> | undefined; -function getSentMessages(): Map { - sentMessages ??= resolveGlobalMap(TELEGRAM_SENT_MESSAGES_KEY); +function getSentMessages(): Map> { + sentMessages ??= resolveGlobalMap>(TELEGRAM_SENT_MESSAGES_KEY); return sentMessages; } -function getChatKey(chatId: number | string): string { - return String(chatId); -} - -function cleanupExpired(entry: CacheEntry): void { - const now = Date.now(); - for (const [msgId, timestamp] of entry.timestamps) { - if (now - timestamp > TTL_MS) { - entry.timestamps.delete(msgId); - } - } -} +const sentMessageCache = createScopedExpiringIdCache({ + store: getSentMessages(), + ttlMs: TTL_MS, + cleanupThreshold: 100, +}); /** * Record a message ID as sent by the bot. */ export function recordSentMessage(chatId: number | string, messageId: number): void { - const key = getChatKey(chatId); - const sentMessages = getSentMessages(); - let entry = sentMessages.get(key); - if (!entry) { - entry = { timestamps: new Map() }; - sentMessages.set(key, entry); - } - entry.timestamps.set(messageId, Date.now()); - // Periodic cleanup - if (entry.timestamps.size > 100) { - cleanupExpired(entry); - } + sentMessageCache.record(chatId, messageId); } /** * Check if a message was sent by the bot. */ export function wasSentByBot(chatId: number | string, messageId: number): boolean { - const key = getChatKey(chatId); - const entry = getSentMessages().get(key); - if (!entry) { - return false; - } - // Clean up expired entries on read - cleanupExpired(entry); - return entry.timestamps.has(messageId); + return sentMessageCache.has(chatId, messageId); } /** * Clear all cached entries (for testing). */ export function clearSentMessageCache(): void { - getSentMessages().clear(); + sentMessageCache.clear(); } diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index 7f6b74d0a2b..7972d0bfc97 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -77,18 +77,16 @@ type TelegramThreadBindingsState = { * binding lookups, and binding mutations all observe the same live registry. */ const TELEGRAM_THREAD_BINDINGS_STATE_KEY = Symbol.for("openclaw.telegramThreadBindingsState"); - -let threadBindingsState: TelegramThreadBindingsState | undefined; +const threadBindingsState = resolveGlobalSingleton( + TELEGRAM_THREAD_BINDINGS_STATE_KEY, + () => ({ + managersByAccountId: new Map(), + bindingsByAccountConversation: new Map(), + persistQueueByAccountId: new Map>(), + }), +); function getThreadBindingsState(): TelegramThreadBindingsState { - threadBindingsState ??= resolveGlobalSingleton( - TELEGRAM_THREAD_BINDINGS_STATE_KEY, - () => ({ - managersByAccountId: new Map(), - bindingsByAccountConversation: new Map(), - persistQueueByAccountId: new Map>(), - }), - ); return threadBindingsState; } diff --git a/src/plugin-sdk/text-runtime.ts b/src/plugin-sdk/text-runtime.ts index 5dd70cdcc3c..58d0fdb260a 100644 --- a/src/plugin-sdk/text-runtime.ts +++ b/src/plugin-sdk/text-runtime.ts @@ -10,6 +10,7 @@ export * from "../markdown/render.js"; export * from "../markdown/tables.js"; export * from "../markdown/whatsapp.js"; export * from "../shared/global-singleton.js"; +export * from "../shared/scoped-expiring-id-cache.js"; export * from "../shared/string-normalization.js"; export * from "../shared/string-sample.js"; export * from "../shared/text/assistant-visible-text.js";