diff --git a/extensions/slack/src/monitor/media-types.ts b/extensions/slack/src/monitor/media-types.ts new file mode 100644 index 00000000000..b5bb17ff7b4 --- /dev/null +++ b/extensions/slack/src/monitor/media-types.ts @@ -0,0 +1,7 @@ +export type SlackMediaResult = { + path: string; + contentType?: string; + placeholder: string; +}; + +export const MAX_SLACK_MEDIA_FILES = 8; diff --git a/extensions/slack/src/monitor/media.test.ts b/extensions/slack/src/monitor/media.test.ts index c0944976da6..e5112a9482d 100644 --- a/extensions/slack/src/monitor/media.test.ts +++ b/extensions/slack/src/monitor/media.test.ts @@ -9,7 +9,7 @@ import { } from "./media.js"; import type { FetchLike, SavedMedia } from "./media.runtime.js"; import * as mediaRuntime from "./media.runtime.js"; -import { logVerbose } from "./media.runtime.js"; +import { logVerbose } from "./thread.runtime.js"; type FetchMock = (input: RequestInfo | URL, init?: RequestInit) => Promise; @@ -67,6 +67,10 @@ vi.mock("./media.runtime.js", () => ({ saveMediaBuffer: saveMediaBufferMock, })); +vi.mock("./thread.runtime.js", () => ({ + logVerbose: logVerboseMock, +})); + function withFetchPreconnect(fetchMock: ReturnType>): typeof fetch { return Object.assign( ((input: RequestInfo | URL, init?: RequestInit) => fetchMock(input, init)) as typeof fetch, diff --git a/extensions/slack/src/monitor/media.ts b/extensions/slack/src/monitor/media.ts index 769d36117a2..89242beefc1 100644 --- a/extensions/slack/src/monitor/media.ts +++ b/extensions/slack/src/monitor/media.ts @@ -1,16 +1,21 @@ -import type { WebClient as SlackWebClient } from "@slack/web-api"; -import { pruneMapToMaxSize } from "openclaw/plugin-sdk/collection-runtime"; -import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { normalizeHostname } from "openclaw/plugin-sdk/host-runtime"; import { resolveRequestUrl } from "openclaw/plugin-sdk/request-url"; import type { SlackAttachment, SlackFile } from "../types.js"; +export { MAX_SLACK_MEDIA_FILES, type SlackMediaResult } from "./media-types.js"; +import { MAX_SLACK_MEDIA_FILES, type SlackMediaResult } from "./media-types.js"; import { type FetchLike, fetchRemoteMedia, fetchWithRuntimeDispatcher, - logVerbose, saveMediaBuffer, } from "./media.runtime.js"; +export { + resetSlackThreadStarterCacheForTest, + resolveSlackThreadHistory, + resolveSlackThreadStarter, + type SlackThreadMessage, + type SlackThreadStarter, +} from "./thread.js"; function normalizeLowercaseStringOrEmpty(value: unknown): string { return typeof value === "string" ? value.trim().toLowerCase() : ""; @@ -165,13 +170,6 @@ function looksLikeHtmlBuffer(buffer: Buffer): boolean { return head.startsWith("(); -const THREAD_STARTER_CACHE_TTL_MS = 6 * 60 * 60_000; -const THREAD_STARTER_CACHE_MAX = 2000; - -function evictThreadStarterCache(): void { - const now = Date.now(); - for (const [cacheKey, entry] of THREAD_STARTER_CACHE.entries()) { - if (now - entry.cachedAt > THREAD_STARTER_CACHE_TTL_MS) { - THREAD_STARTER_CACHE.delete(cacheKey); - } - } - pruneMapToMaxSize(THREAD_STARTER_CACHE, THREAD_STARTER_CACHE_MAX); -} - -function formatSlackFilePlaceholder(files: SlackFile[] | undefined): string { - return `[attached: ${files?.map((file) => file.name ?? "file").join(", ") ?? "file"}]`; -} - -export async function resolveSlackThreadStarter(params: { - channelId: string; - threadTs: string; - client: SlackWebClient; -}): Promise { - evictThreadStarterCache(); - const cacheKey = `${params.channelId}:${params.threadTs}`; - const cached = THREAD_STARTER_CACHE.get(cacheKey); - if (cached && Date.now() - cached.cachedAt <= THREAD_STARTER_CACHE_TTL_MS) { - return cached.value; - } - if (cached) { - THREAD_STARTER_CACHE.delete(cacheKey); - } - try { - const response = (await params.client.conversations.replies({ - channel: params.channelId, - ts: params.threadTs, - limit: 1, - inclusive: true, - })) as { - messages?: Array<{ - text?: string; - user?: string; - bot_id?: string; - ts?: string; - files?: SlackFile[]; - }>; - }; - const message = response?.messages?.[0]; - const text = (message?.text ?? "").trim(); - const files = message?.files?.length ? message.files : undefined; - if (!message || (!text && !files)) { - return null; - } - const starter: SlackThreadStarter = { - text: text || formatSlackFilePlaceholder(files), - userId: message.user, - botId: message.bot_id, - ts: message.ts, - files, - }; - if (THREAD_STARTER_CACHE.has(cacheKey)) { - THREAD_STARTER_CACHE.delete(cacheKey); - } - THREAD_STARTER_CACHE.set(cacheKey, { - value: starter, - cachedAt: Date.now(), - }); - evictThreadStarterCache(); - return starter; - } catch (err) { - logVerbose( - `slack thread starter fetch failed channel=${params.channelId} ts=${params.threadTs}: ${formatErrorMessage(err)}`, - ); - return null; - } -} - -export function resetSlackThreadStarterCacheForTest(): void { - THREAD_STARTER_CACHE.clear(); -} - -export type SlackThreadMessage = { - text: string; - userId?: string; - ts?: string; - botId?: string; - files?: SlackFile[]; -}; - -type SlackRepliesPageMessage = { - text?: string; - user?: string; - bot_id?: string; - ts?: string; - files?: SlackFile[]; -}; - -type SlackRepliesPage = { - messages?: SlackRepliesPageMessage[]; - response_metadata?: { next_cursor?: string }; -}; - -/** - * Fetches the most recent messages in a Slack thread (excluding the current message). - * Used to populate thread context when a new thread session starts. - * - * Uses cursor pagination and keeps only the latest N retained messages so long threads - * still produce up-to-date context without unbounded memory growth. - */ -export async function resolveSlackThreadHistory(params: { - channelId: string; - threadTs: string; - client: SlackWebClient; - currentMessageTs?: string; - limit?: number; -}): Promise { - const maxMessages = params.limit ?? 20; - if (!Number.isFinite(maxMessages) || maxMessages <= 0) { - return []; - } - - // Slack recommends no more than 200 per page. - const fetchLimit = 200; - const retained: SlackRepliesPageMessage[] = []; - let cursor: string | undefined; - - try { - do { - const response = (await params.client.conversations.replies({ - channel: params.channelId, - ts: params.threadTs, - limit: fetchLimit, - inclusive: true, - ...(cursor ? { cursor } : {}), - })) as SlackRepliesPage; - - for (const msg of response.messages ?? []) { - // Keep messages with text OR file attachments - if (!msg.text?.trim() && !msg.files?.length) { - continue; - } - if (params.currentMessageTs && msg.ts === params.currentMessageTs) { - continue; - } - retained.push(msg); - if (retained.length > maxMessages) { - retained.shift(); - } - } - - const next = response.response_metadata?.next_cursor; - cursor = typeof next === "string" && next.trim().length > 0 ? next.trim() : undefined; - } while (cursor); - - return retained.map((msg) => ({ - // For file-only messages, create a placeholder showing attached filenames - text: msg.text?.trim() ? msg.text : formatSlackFilePlaceholder(msg.files), - userId: msg.user, - botId: msg.bot_id, - ts: msg.ts, - files: msg.files, - })); - } catch (err) { - logVerbose( - `slack thread history fetch failed channel=${params.channelId} ts=${params.threadTs}: ${formatErrorMessage(err)}`, - ); - return []; - } -} diff --git a/extensions/slack/src/monitor/message-handler/prepare-content.ts b/extensions/slack/src/monitor/message-handler/prepare-content.ts index 18d02cbdcff..1ff718459d0 100644 --- a/extensions/slack/src/monitor/message-handler/prepare-content.ts +++ b/extensions/slack/src/monitor/message-handler/prepare-content.ts @@ -2,13 +2,8 @@ import { runTasksWithConcurrency } from "openclaw/plugin-sdk/infra-runtime"; import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; import { normalizeOptionalString } from "openclaw/plugin-sdk/text-runtime"; import type { SlackFile, SlackMessageEvent } from "../../types.js"; -import { - MAX_SLACK_MEDIA_FILES, - resolveSlackAttachmentContent, - resolveSlackMedia, - type SlackMediaResult, - type SlackThreadStarter, -} from "../media.js"; +import { MAX_SLACK_MEDIA_FILES, type SlackMediaResult } from "../media-types.js"; +import type { SlackThreadStarter } from "../thread.js"; export type SlackResolvedMessageContent = { rawBody: string; @@ -18,6 +13,14 @@ export type SlackResolvedMessageContent = { const SLACK_MENTION_RESOLUTION_CONCURRENCY = 4; const SLACK_MENTION_RESOLUTION_MAX_LOOKUPS_PER_MESSAGE = 20; +type SlackMediaModule = typeof import("../media.js"); +let slackMediaModulePromise: Promise | undefined; + +function loadSlackMediaModule(): Promise { + slackMediaModulePromise ??= import("../media.js"); + return slackMediaModulePromise; +} + function collectUniqueSlackMentionIds(texts: Array): string[] { const seen = new Set(); const mentionIds: string[] = []; @@ -87,17 +90,29 @@ export async function resolveSlackMessageContent(params: { threadStarter: params.threadStarter, }); - const media = await resolveSlackMedia({ - files: ownFiles, - token: params.botToken, - maxBytes: params.mediaMaxBytes, - }); + const media = + ownFiles && ownFiles.length > 0 + ? await (async () => { + const { resolveSlackMedia } = await loadSlackMediaModule(); + return resolveSlackMedia({ + files: ownFiles, + token: params.botToken, + maxBytes: params.mediaMaxBytes, + }); + })() + : null; - const attachmentContent = await resolveSlackAttachmentContent({ - attachments: params.message.attachments, - token: params.botToken, - maxBytes: params.mediaMaxBytes, - }); + const attachmentContent = + params.message.attachments && params.message.attachments.length > 0 + ? await (async () => { + const { resolveSlackAttachmentContent } = await loadSlackMediaModule(); + return resolveSlackAttachmentContent({ + attachments: params.message.attachments, + token: params.botToken, + maxBytes: params.mediaMaxBytes, + }); + })() + : null; const mergedMedia = [...(media ?? []), ...(attachmentContent?.media ?? [])]; const effectiveDirectMedia = mergedMedia.length > 0 ? mergedMedia : null; diff --git a/extensions/slack/src/monitor/message-handler/prepare-thread-context.ts b/extensions/slack/src/monitor/message-handler/prepare-thread-context.ts index a929df13ff2..3360381a99f 100644 --- a/extensions/slack/src/monitor/message-handler/prepare-thread-context.ts +++ b/extensions/slack/src/monitor/message-handler/prepare-thread-context.ts @@ -10,12 +10,16 @@ import type { SlackMessageEvent } from "../../types.js"; import { resolveSlackAllowListMatch } from "../allow-list.js"; import { readSessionUpdatedAt } from "../config.runtime.js"; import type { SlackMonitorContext } from "../context.js"; -import { - resolveSlackMedia, - resolveSlackThreadHistory, - type SlackMediaResult, - type SlackThreadStarter, -} from "../media.js"; +import type { SlackMediaResult } from "../media-types.js"; +import { resolveSlackThreadHistory, type SlackThreadStarter } from "../thread.js"; + +type SlackMediaModule = typeof import("../media.js"); +let slackMediaModulePromise: Promise | undefined; + +function loadSlackMediaModule(): Promise { + slackMediaModulePromise ??= import("../media.js"); + return slackMediaModulePromise; +} export type SlackThreadContextData = { threadStarterBody: string | undefined; @@ -122,6 +126,7 @@ export async function resolveSlackThreadContextData(params: { const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80); threadLabel = `Slack thread ${params.roomLabel}${snippet ? `: ${snippet}` : ""}`; if (!params.effectiveDirectMedia && starter.files && starter.files.length > 0) { + const { resolveSlackMedia } = await loadSlackMediaModule(); threadStarterMedia = await resolveSlackMedia({ files: starter.files, token: params.ctx.botToken, diff --git a/extensions/slack/src/monitor/message-handler/prepare.test.ts b/extensions/slack/src/monitor/message-handler/prepare.test.ts index 08d2c8bfbb7..88fd7606ce2 100644 --- a/extensions/slack/src/monitor/message-handler/prepare.test.ts +++ b/extensions/slack/src/monitor/message-handler/prepare.test.ts @@ -14,7 +14,7 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vites import type { ResolvedSlackAccount } from "../../accounts.js"; import type { SlackMessageEvent } from "../../types.js"; import type { SlackMonitorContext } from "../context.js"; -import { resetSlackThreadStarterCacheForTest } from "../media.js"; +import { resetSlackThreadStarterCacheForTest } from "../thread.js"; import { resolveSlackMessageContent } from "./prepare-content.js"; import { prepareSlackMessage } from "./prepare.js"; import { diff --git a/extensions/slack/src/monitor/message-handler/prepare.ts b/extensions/slack/src/monitor/message-handler/prepare.ts index 314b7c4a03c..37a8c172b33 100644 --- a/extensions/slack/src/monitor/message-handler/prepare.ts +++ b/extensions/slack/src/monitor/message-handler/prepare.ts @@ -54,9 +54,9 @@ import { } from "../context.js"; import { recordInboundSession, resolveConversationLabel } from "../conversation.runtime.js"; import { authorizeSlackDirectMessage } from "../dm-auth.js"; -import { resolveSlackThreadStarter } from "../media.js"; import { resolveSlackRoomContextHints } from "../room-context.js"; import { sendMessageSlack } from "../send.runtime.js"; +import { resolveSlackThreadStarter } from "../thread.js"; import { resolveSlackMessageContent } from "./prepare-content.js"; import { resolveSlackRoutingContext } from "./prepare-routing.js"; import { resolveSlackThreadContextData } from "./prepare-thread-context.js"; diff --git a/extensions/slack/src/monitor/monitor.media.test.ts b/extensions/slack/src/monitor/monitor.media.test.ts index 3537b374b03..feb9f5871c3 100644 --- a/extensions/slack/src/monitor/monitor.media.test.ts +++ b/extensions/slack/src/monitor/monitor.media.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, expect, it, vi } from "vitest"; -import { resetSlackThreadStarterCacheForTest, resolveSlackThreadStarter } from "./media.js"; +import { resetSlackThreadStarterCacheForTest, resolveSlackThreadStarter } from "./thread.js"; type ThreadStarterClient = Parameters[0]["client"]; diff --git a/extensions/slack/src/monitor/thread.runtime.ts b/extensions/slack/src/monitor/thread.runtime.ts new file mode 100644 index 00000000000..a557af564d3 --- /dev/null +++ b/extensions/slack/src/monitor/thread.runtime.ts @@ -0,0 +1 @@ +export { logVerbose } from "openclaw/plugin-sdk/runtime-env"; diff --git a/extensions/slack/src/monitor/thread.ts b/extensions/slack/src/monitor/thread.ts new file mode 100644 index 00000000000..a24117ee53c --- /dev/null +++ b/extensions/slack/src/monitor/thread.ts @@ -0,0 +1,188 @@ +import type { WebClient as SlackWebClient } from "@slack/web-api"; +import { pruneMapToMaxSize } from "openclaw/plugin-sdk/collection-runtime"; +import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; +import type { SlackFile } from "../types.js"; +import { logVerbose } from "./thread.runtime.js"; + +export type SlackThreadStarter = { + text: string; + userId?: string; + botId?: string; + ts?: string; + files?: SlackFile[]; +}; + +type SlackThreadStarterCacheEntry = { + value: SlackThreadStarter; + cachedAt: number; +}; + +const THREAD_STARTER_CACHE = new Map(); +const THREAD_STARTER_CACHE_TTL_MS = 6 * 60 * 60_000; +const THREAD_STARTER_CACHE_MAX = 2000; + +function evictThreadStarterCache(): void { + const now = Date.now(); + for (const [cacheKey, entry] of THREAD_STARTER_CACHE.entries()) { + if (now - entry.cachedAt > THREAD_STARTER_CACHE_TTL_MS) { + THREAD_STARTER_CACHE.delete(cacheKey); + } + } + pruneMapToMaxSize(THREAD_STARTER_CACHE, THREAD_STARTER_CACHE_MAX); +} + +function formatSlackFilePlaceholder(files: SlackFile[] | undefined): string { + return `[attached: ${files?.map((file) => file.name ?? "file").join(", ") ?? "file"}]`; +} + +export async function resolveSlackThreadStarter(params: { + channelId: string; + threadTs: string; + client: SlackWebClient; +}): Promise { + evictThreadStarterCache(); + const cacheKey = `${params.channelId}:${params.threadTs}`; + const cached = THREAD_STARTER_CACHE.get(cacheKey); + if (cached && Date.now() - cached.cachedAt <= THREAD_STARTER_CACHE_TTL_MS) { + return cached.value; + } + if (cached) { + THREAD_STARTER_CACHE.delete(cacheKey); + } + try { + const response = (await params.client.conversations.replies({ + channel: params.channelId, + ts: params.threadTs, + limit: 1, + inclusive: true, + })) as { + messages?: Array<{ + text?: string; + user?: string; + bot_id?: string; + ts?: string; + files?: SlackFile[]; + }>; + }; + const message = response?.messages?.[0]; + const text = (message?.text ?? "").trim(); + const files = message?.files?.length ? message.files : undefined; + if (!message || (!text && !files)) { + return null; + } + const starter: SlackThreadStarter = { + text: text || formatSlackFilePlaceholder(files), + userId: message.user, + botId: message.bot_id, + ts: message.ts, + files, + }; + if (THREAD_STARTER_CACHE.has(cacheKey)) { + THREAD_STARTER_CACHE.delete(cacheKey); + } + THREAD_STARTER_CACHE.set(cacheKey, { + value: starter, + cachedAt: Date.now(), + }); + evictThreadStarterCache(); + return starter; + } catch (err) { + logVerbose( + `slack thread starter fetch failed channel=${params.channelId} ts=${params.threadTs}: ${formatErrorMessage(err)}`, + ); + return null; + } +} + +export function resetSlackThreadStarterCacheForTest(): void { + THREAD_STARTER_CACHE.clear(); +} + +export type SlackThreadMessage = { + text: string; + userId?: string; + ts?: string; + botId?: string; + files?: SlackFile[]; +}; + +type SlackRepliesPageMessage = { + text?: string; + user?: string; + bot_id?: string; + ts?: string; + files?: SlackFile[]; +}; + +type SlackRepliesPage = { + messages?: SlackRepliesPageMessage[]; + response_metadata?: { next_cursor?: string }; +}; + +/** + * Fetches the most recent messages in a Slack thread (excluding the current message). + * Used to populate thread context when a new thread session starts. + * + * Uses cursor pagination and keeps only the latest N retained messages so long threads + * still produce up-to-date context without unbounded memory growth. + */ +export async function resolveSlackThreadHistory(params: { + channelId: string; + threadTs: string; + client: SlackWebClient; + currentMessageTs?: string; + limit?: number; +}): Promise { + const maxMessages = params.limit ?? 20; + if (!Number.isFinite(maxMessages) || maxMessages <= 0) { + return []; + } + + // Slack recommends no more than 200 per page. + const fetchLimit = 200; + const retained: SlackRepliesPageMessage[] = []; + let cursor: string | undefined; + + try { + do { + const response = (await params.client.conversations.replies({ + channel: params.channelId, + ts: params.threadTs, + limit: fetchLimit, + inclusive: true, + ...(cursor ? { cursor } : {}), + })) as SlackRepliesPage; + + for (const msg of response.messages ?? []) { + // Keep messages with text OR file attachments. + if (!msg.text?.trim() && !msg.files?.length) { + continue; + } + if (params.currentMessageTs && msg.ts === params.currentMessageTs) { + continue; + } + retained.push(msg); + if (retained.length > maxMessages) { + retained.shift(); + } + } + + const next = response.response_metadata?.next_cursor; + cursor = typeof next === "string" && next.trim().length > 0 ? next.trim() : undefined; + } while (cursor); + + return retained.map((msg) => ({ + // For file-only messages, create a placeholder showing attached filenames. + text: msg.text?.trim() ? msg.text : formatSlackFilePlaceholder(msg.files), + userId: msg.user, + botId: msg.bot_id, + ts: msg.ts, + files: msg.files, + })); + } catch (err) { + logVerbose( + `slack thread history fetch failed channel=${params.channelId} ts=${params.threadTs}: ${formatErrorMessage(err)}`, + ); + return []; + } +}