diff --git a/extensions/mattermost/src/mattermost/monitor.test.ts b/extensions/mattermost/src/mattermost/monitor.test.ts index 06fe1abc1bb..3394dc2c995 100644 --- a/extensions/mattermost/src/mattermost/monitor.test.ts +++ b/extensions/mattermost/src/mattermost/monitor.test.ts @@ -1,8 +1,11 @@ +import { createClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../runtime-api.js"; import { resolveMattermostAccount } from "./accounts.js"; import { evaluateMattermostMentionGate, + MattermostRetryableInboundError, + processMattermostReplayGuardedPost, resolveMattermostReactionChannelId, resolveMattermostEffectiveReplyToId, resolveMattermostReplyRootId, @@ -281,6 +284,100 @@ describe("resolveMattermostThreadSessionContext", () => { }); }); +describe("processMattermostReplayGuardedPost", () => { + it("skips duplicate message batches after a successful commit", async () => { + const replayGuard = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + }); + const handlePost = vi.fn(async () => undefined); + + await expect( + processMattermostReplayGuardedPost({ + replayGuard, + accountId: "acct", + messageIds: ["post-1"], + handlePost, + }), + ).resolves.toBe("processed"); + await expect( + processMattermostReplayGuardedPost({ + replayGuard, + accountId: "acct", + messageIds: ["post-1"], + handlePost, + }), + ).resolves.toBe("duplicate"); + + expect(handlePost).toHaveBeenCalledTimes(1); + }); + + it("releases claims for explicit retryable failures", async () => { + const replayGuard = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + }); + let attempts = 0; + const handlePost = vi.fn(async () => { + attempts += 1; + if (attempts === 1) { + throw new MattermostRetryableInboundError("retry me"); + } + }); + + await expect( + processMattermostReplayGuardedPost({ + replayGuard, + accountId: "acct", + messageIds: ["post-2"], + handlePost, + }), + ).rejects.toThrow("retry me"); + await expect( + processMattermostReplayGuardedPost({ + replayGuard, + accountId: "acct", + messageIds: ["post-2"], + handlePost, + }), + ).resolves.toBe("processed"); + + expect(handlePost).toHaveBeenCalledTimes(2); + }); + + it("keeps replay committed after a non-retryable failure", async () => { + const replayGuard = createClaimableDedupe({ + ttlMs: 10_000, + memoryMaxSize: 100, + }); + const visibleSideEffect = vi.fn(); + const handlePost = vi.fn(async () => { + visibleSideEffect(); + throw new Error("post-send failure"); + }); + + await expect( + processMattermostReplayGuardedPost({ + replayGuard, + accountId: "acct", + messageIds: ["post-3"], + handlePost, + }), + ).rejects.toThrow("post-send failure"); + await expect( + processMattermostReplayGuardedPost({ + replayGuard, + accountId: "acct", + messageIds: ["post-3"], + handlePost, + }), + ).resolves.toBe("duplicate"); + + expect(handlePost).toHaveBeenCalledTimes(1); + expect(visibleSideEffect).toHaveBeenCalledTimes(1); + }); +}); + describe("resolveMattermostReactionChannelId", () => { it("prefers broadcast channel_id when present", () => { expect( diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 7692e3c7a0c..bf1279596bc 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1,3 +1,4 @@ +import { createClaimableDedupe, type ClaimableDedupe } from "openclaw/plugin-sdk/persistent-dedupe"; import { isPrivateNetworkOptInEnabled } from "openclaw/plugin-sdk/ssrf-runtime"; import { normalizeLowercaseStringOrEmpty, @@ -37,7 +38,6 @@ import { mapMattermostChannelTypeToChatType, } from "./monitor-gating.js"; import { - createDedupeCache, formatInboundFromLabel, normalizeMention, resolveThreadSessionKeys, @@ -127,11 +127,68 @@ function normalizeInteractionSourceIps(values?: string[]): string[] { .filter((value): value is string => Boolean(value)); } -const recentInboundMessages = createDedupeCache({ +const recentInboundMessages = createClaimableDedupe({ ttlMs: RECENT_MATTERMOST_MESSAGE_TTL_MS, - maxSize: RECENT_MATTERMOST_MESSAGE_MAX, + memoryMaxSize: RECENT_MATTERMOST_MESSAGE_MAX, }); +export class MattermostRetryableInboundError extends Error { + constructor(message: string, options?: ErrorOptions) { + super(message, options); + this.name = "MattermostRetryableInboundError"; + } +} + +function buildMattermostInboundReplayKeys(params: { + accountId: string; + messageIds: string[]; +}): string[] { + return [...new Set(params.messageIds.map((id) => `${params.accountId}:${id.trim()}`))].filter( + (key) => !key.endsWith(":"), + ); +} + +export async function processMattermostReplayGuardedPost(params: { + accountId: string; + messageIds: string[]; + handlePost: () => Promise; + replayGuard?: ClaimableDedupe; +}): Promise<"processed" | "duplicate"> { + const replayGuard = params.replayGuard ?? recentInboundMessages; + const replayKeys = buildMattermostInboundReplayKeys({ + accountId: params.accountId, + messageIds: params.messageIds, + }); + if (replayKeys.length === 0) { + await params.handlePost(); + return "processed"; + } + + const claimedKeys: string[] = []; + for (const replayKey of replayKeys) { + const claim = await replayGuard.claim(replayKey); + if (claim.kind === "claimed") { + claimedKeys.push(replayKey); + } + } + if (claimedKeys.length === 0) { + return "duplicate"; + } + + try { + await params.handlePost(); + await Promise.all(claimedKeys.map((replayKey) => replayGuard.commit(replayKey))); + return "processed"; + } catch (error) { + if (error instanceof MattermostRetryableInboundError) { + claimedKeys.forEach((replayKey) => replayGuard.release(replayKey, { error })); + } else { + await Promise.all(claimedKeys.map((replayKey) => replayGuard.commit(replayKey))); + } + throw error; + } +} + function resolveRuntime(opts: MonitorMattermostOpts): RuntimeEnv { return ( opts.runtime ?? { @@ -1013,491 +1070,497 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} logVerboseMessage("mattermost: drop post (missing message id)"); return; } - const dedupeEntries = allMessageIds.map((id) => - recentInboundMessages.check(`${account.accountId}:${id}`), - ); - if (dedupeEntries.length > 0 && dedupeEntries.every(Boolean)) { + const replayResult = await processMattermostReplayGuardedPost({ + accountId: account.accountId, + messageIds: allMessageIds, + handlePost: async () => { + const senderId = post.user_id ?? payload.broadcast?.user_id; + if (!senderId) { + logVerboseMessage("mattermost: drop post (missing sender id)"); + return; + } + if (senderId === botUserId) { + logVerboseMessage(`mattermost: drop post (self sender=${senderId})`); + return; + } + if (isSystemPost(post)) { + logVerboseMessage(`mattermost: drop post (system post type=${post.type ?? "unknown"})`); + return; + } + + const channelInfo = await resolveChannelInfo(channelId); + const channelType = payload.data?.channel_type ?? channelInfo?.type ?? undefined; + const kind = mapMattermostChannelTypeToChatType(channelType); + const chatType = channelChatType(kind); + + const senderName = + normalizeOptionalString(payload.data?.sender_name) ?? + normalizeOptionalString((await resolveUserInfo(senderId))?.username) ?? + senderId; + const rawText = normalizeOptionalString(post.message) ?? ""; + const dmPolicy = account.config.dmPolicy ?? "pairing"; + const normalizedAllowFrom = normalizeMattermostAllowList(account.config.allowFrom ?? []); + const normalizedGroupAllowFrom = normalizeMattermostAllowList( + account.config.groupAllowFrom ?? [], + ); + const storeAllowFrom = normalizeMattermostAllowList( + await readStoreAllowFromForDmPolicy({ + provider: "mattermost", + accountId: account.accountId, + dmPolicy, + readStore: pairing.readStoreForDmPolicy, + }), + ); + const accessDecision = resolveDmGroupAccessWithLists({ + isGroup: kind !== "direct", + dmPolicy, + groupPolicy, + allowFrom: normalizedAllowFrom, + groupAllowFrom: normalizedGroupAllowFrom, + storeAllowFrom, + isSenderAllowed: (allowFrom) => + isMattermostSenderAllowed({ + senderId, + senderName, + allowFrom, + allowNameMatching, + }), + }); + const effectiveAllowFrom = accessDecision.effectiveAllowFrom; + const effectiveGroupAllowFrom = accessDecision.effectiveGroupAllowFrom; + const allowTextCommands = core.channel.commands.shouldHandleTextCommands({ + cfg, + surface: "mattermost", + }); + const hasControlCommand = core.channel.text.hasControlCommand(rawText, cfg); + const isControlCommand = allowTextCommands && hasControlCommand; + const useAccessGroups = cfg.commands?.useAccessGroups !== false; + const commandDmAllowFrom = kind === "direct" ? effectiveAllowFrom : normalizedAllowFrom; + const senderAllowedForCommands = isMattermostSenderAllowed({ + senderId, + senderName, + allowFrom: commandDmAllowFrom, + allowNameMatching, + }); + const groupAllowedForCommands = isMattermostSenderAllowed({ + senderId, + senderName, + allowFrom: effectiveGroupAllowFrom, + allowNameMatching, + }); + const commandGate = resolveControlCommandGate({ + useAccessGroups, + authorizers: [ + { configured: commandDmAllowFrom.length > 0, allowed: senderAllowedForCommands }, + { + configured: effectiveGroupAllowFrom.length > 0, + allowed: groupAllowedForCommands, + }, + ], + allowTextCommands, + hasControlCommand, + }); + const commandAuthorized = commandGate.commandAuthorized; + + if (accessDecision.decision !== "allow") { + if (kind === "direct") { + if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) { + logVerboseMessage(`mattermost: drop dm (dmPolicy=disabled sender=${senderId})`); + return; + } + if (accessDecision.decision === "pairing") { + const { code, created } = await pairing.upsertPairingRequest({ + id: senderId, + meta: { name: senderName }, + }); + logVerboseMessage( + `mattermost: pairing request sender=${senderId} created=${created}`, + ); + if (created) { + try { + await sendMessageMattermost( + `user:${senderId}`, + core.channel.pairing.buildPairingReply({ + channel: "mattermost", + idLine: `Your Mattermost user id: ${senderId}`, + code, + }), + { cfg, accountId: account.accountId }, + ); + opts.statusSink?.({ lastOutboundAt: Date.now() }); + } catch (err) { + logVerboseMessage( + `mattermost: pairing reply failed for ${senderId}: ${String(err)}`, + ); + } + } + return; + } + logVerboseMessage(`mattermost: drop dm sender=${senderId} (dmPolicy=${dmPolicy})`); + return; + } + if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_DISABLED) { + logVerboseMessage("mattermost: drop group message (groupPolicy=disabled)"); + return; + } + if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) { + logVerboseMessage("mattermost: drop group message (no group allowlist)"); + return; + } + if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) { + logVerboseMessage(`mattermost: drop group sender=${senderId} (not in groupAllowFrom)`); + return; + } + logVerboseMessage( + `mattermost: drop group message (groupPolicy=${groupPolicy} reason=${accessDecision.reason})`, + ); + return; + } + + if (kind !== "direct" && commandGate.shouldBlock) { + logInboundDrop({ + log: logVerboseMessage, + channel: "mattermost", + reason: "control command (unauthorized)", + target: senderId, + }); + return; + } + + const teamId = payload.data?.team_id ?? channelInfo?.team_id ?? undefined; + const channelName = payload.data?.channel_name ?? channelInfo?.name ?? ""; + const channelDisplay = + payload.data?.channel_display_name ?? channelInfo?.display_name ?? channelName; + const roomLabel = channelName ? `#${channelName}` : channelDisplay || `#${channelId}`; + + const route = core.channel.routing.resolveAgentRoute({ + cfg, + channel: "mattermost", + accountId: account.accountId, + teamId, + peer: { + kind, + id: kind === "direct" ? senderId : channelId, + }, + }); + + const baseSessionKey = route.sessionKey; + const threadRootId = normalizeOptionalString(post.root_id); + const replyToMode = resolveMattermostReplyToMode(account, kind); + const threadContext = resolveMattermostThreadSessionContext({ + baseSessionKey, + kind, + postId: post.id, + replyToMode, + threadRootId, + }); + const { effectiveReplyToId, sessionKey, parentSessionKey } = threadContext; + const historyKey = kind === "direct" ? null : sessionKey; + + const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg, route.agentId); + const wasMentioned = + kind !== "direct" && + ((botUsername + ? normalizeLowercaseStringOrEmpty(rawText).includes( + `@${normalizeLowercaseStringOrEmpty(botUsername)}`, + ) + : false) || + core.channel.mentions.matchesMentionPatterns(rawText, mentionRegexes)); + const pendingBody = + rawText || + (post.file_ids?.length + ? `[Mattermost ${post.file_ids.length === 1 ? "file" : "files"}]` + : ""); + const pendingSender = senderName; + const recordPendingHistory = () => { + const trimmed = pendingBody.trim(); + recordPendingHistoryEntryIfEnabled({ + historyMap: channelHistories, + limit: historyLimit, + historyKey: historyKey ?? "", + entry: + historyKey && trimmed + ? { + sender: pendingSender, + body: trimmed, + timestamp: typeof post.create_at === "number" ? post.create_at : undefined, + messageId: post.id ?? undefined, + } + : null, + }); + }; + + const oncharEnabled = account.chatmode === "onchar" && kind !== "direct"; + const oncharPrefixes = oncharEnabled ? resolveOncharPrefixes(account.oncharPrefixes) : []; + const oncharResult = oncharEnabled + ? stripOncharPrefix(rawText, oncharPrefixes) + : { triggered: false, stripped: rawText }; + const oncharTriggered = oncharResult.triggered; + const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0; + const mentionDecision = evaluateMattermostMentionGate({ + kind, + cfg, + accountId: account.accountId, + channelId, + threadRootId, + requireMentionOverride: account.requireMention, + resolveRequireMention: core.channel.groups.resolveRequireMention, + wasMentioned, + isControlCommand, + commandAuthorized, + oncharEnabled, + oncharTriggered, + canDetectMention, + }); + const { shouldRequireMention, shouldBypassMention } = mentionDecision; + + if (mentionDecision.dropReason === "onchar-not-triggered") { + logVerboseMessage( + `mattermost: drop group message (onchar not triggered channel=${channelId} sender=${senderId})`, + ); + recordPendingHistory(); + return; + } + + if (mentionDecision.dropReason === "missing-mention") { + logVerboseMessage( + `mattermost: drop group message (missing mention channel=${channelId} sender=${senderId} requireMention=${shouldRequireMention} bypass=${shouldBypassMention} canDetectMention=${canDetectMention})`, + ); + recordPendingHistory(); + return; + } + const mediaList = await resolveMattermostMedia(post.file_ids); + const mediaPlaceholder = buildMattermostAttachmentPlaceholder(mediaList); + const bodySource = oncharTriggered ? oncharResult.stripped : rawText; + const baseText = [bodySource, mediaPlaceholder].filter(Boolean).join("\n").trim(); + const bodyText = normalizeMention(baseText, botUsername); + if (!bodyText) { + logVerboseMessage( + `mattermost: drop group message (empty body after normalization channel=${channelId} sender=${senderId})`, + ); + return; + } + + core.channel.activity.record({ + channel: "mattermost", + accountId: account.accountId, + direction: "inbound", + }); + + const fromLabel = formatInboundFromLabel({ + isGroup: kind !== "direct", + groupLabel: channelDisplay || roomLabel, + groupId: channelId, + groupFallback: roomLabel || "Channel", + directLabel: senderName, + directId: senderId, + }); + + const preview = bodyText.replace(/\s+/g, " ").slice(0, 160); + const inboundLabel = + kind === "direct" + ? `Mattermost DM from ${senderName}` + : `Mattermost message in ${roomLabel} from ${senderName}`; + core.system.enqueueSystemEvent(`${inboundLabel}: ${preview}`, { + sessionKey, + contextKey: `mattermost:message:${channelId}:${post.id ?? "unknown"}`, + }); + + const textWithId = `${bodyText}\n[mattermost message id: ${post.id ?? "unknown"} channel: ${channelId}]`; + const body = core.channel.reply.formatInboundEnvelope({ + channel: "Mattermost", + from: fromLabel, + timestamp: typeof post.create_at === "number" ? post.create_at : undefined, + body: textWithId, + chatType, + sender: { name: senderName, id: senderId }, + }); + let combinedBody = body; + if (historyKey) { + combinedBody = buildPendingHistoryContextFromMap({ + historyMap: channelHistories, + historyKey, + limit: historyLimit, + currentMessage: combinedBody, + formatEntry: (entry) => + core.channel.reply.formatInboundEnvelope({ + channel: "Mattermost", + from: fromLabel, + timestamp: entry.timestamp, + body: `${entry.body}${ + entry.messageId ? ` [id:${entry.messageId} channel:${channelId}]` : "" + }`, + chatType, + senderLabel: entry.sender, + }), + }); + } + + const to = kind === "direct" ? `user:${senderId}` : `channel:${channelId}`; + const mediaPayload = buildAgentMediaPayload(mediaList); + const commandBody = rawText.trim(); + const inboundHistory = + historyKey && historyLimit > 0 + ? (channelHistories.get(historyKey) ?? []).map((entry) => ({ + sender: entry.sender, + body: entry.body, + timestamp: entry.timestamp, + })) + : undefined; + const ctxPayload = core.channel.reply.finalizeInboundContext({ + Body: combinedBody, + BodyForAgent: bodyText, + InboundHistory: inboundHistory, + RawBody: bodyText, + CommandBody: commandBody, + BodyForCommands: commandBody, + From: + kind === "direct" + ? `mattermost:${senderId}` + : kind === "group" + ? `mattermost:group:${channelId}` + : `mattermost:channel:${channelId}`, + To: to, + SessionKey: sessionKey, + ParentSessionKey: parentSessionKey, + AccountId: route.accountId, + ChatType: chatType, + ConversationLabel: fromLabel, + GroupSubject: kind !== "direct" ? channelDisplay || roomLabel : undefined, + GroupChannel: channelName ? `#${channelName}` : undefined, + GroupSpace: teamId, + SenderName: senderName, + SenderId: senderId, + Provider: "mattermost" as const, + Surface: "mattermost" as const, + MessageSid: post.id ?? undefined, + MessageSids: allMessageIds.length > 1 ? allMessageIds : undefined, + MessageSidFirst: allMessageIds.length > 1 ? allMessageIds[0] : undefined, + MessageSidLast: + allMessageIds.length > 1 ? allMessageIds[allMessageIds.length - 1] : undefined, + ReplyToId: effectiveReplyToId, + MessageThreadId: effectiveReplyToId, + Timestamp: typeof post.create_at === "number" ? post.create_at : undefined, + WasMentioned: kind !== "direct" ? mentionDecision.effectiveWasMentioned : undefined, + CommandAuthorized: commandAuthorized, + OriginatingChannel: "mattermost" as const, + OriginatingTo: to, + ...mediaPayload, + }); + + if (kind === "direct") { + const sessionCfg = cfg.session; + const storePath = core.channel.session.resolveStorePath(sessionCfg?.store, { + agentId: route.agentId, + }); + await core.channel.session.updateLastRoute({ + storePath, + sessionKey: route.mainSessionKey, + deliveryContext: { + channel: "mattermost", + to, + accountId: route.accountId, + }, + }); + } + + const previewLine = bodyText.slice(0, 200).replace(/\n/g, "\\n"); + logVerboseMessage( + `mattermost inbound: from=${ctxPayload.From} len=${bodyText.length} preview="${previewLine}"`, + ); + + const textLimit = core.channel.text.resolveTextChunkLimit( + cfg, + "mattermost", + account.accountId, + { + fallbackLimit: account.textChunkLimit ?? 4000, + }, + ); + const tableMode = core.channel.text.resolveMarkdownTableMode({ + cfg, + channel: "mattermost", + accountId: account.accountId, + }); + + const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({ + cfg, + agentId: route.agentId, + channel: "mattermost", + accountId: account.accountId, + typing: { + start: () => sendTypingIndicator(channelId, effectiveReplyToId), + onStartError: (err) => { + logTypingFailure({ + log: (message) => logger.debug?.(message), + channel: "mattermost", + target: channelId, + error: err, + }); + }, + }, + }); + const { dispatcher, replyOptions, markDispatchIdle } = + core.channel.reply.createReplyDispatcherWithTyping({ + ...replyPipeline, + humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), + typingCallbacks, + deliver: async (payload: ReplyPayload) => { + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + runtime.log?.(`delivered reply to ${to}`); + }, + onError: (err, info) => { + runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); + }, + }); + + await core.channel.reply.withReplyDispatcher({ + dispatcher, + onSettled: () => { + markDispatchIdle(); + }, + run: () => + core.channel.reply.dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions: { + ...replyOptions, + disableBlockStreaming: + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + onModelSelected, + }, + }), + }); + if (historyKey) { + clearHistoryEntriesIfEnabled({ + historyMap: channelHistories, + historyKey, + limit: historyLimit, + }); + } + }, + }); + if (replayResult === "duplicate") { logVerboseMessage( `mattermost: drop post (dedupe account=${account.accountId} ids=${allMessageIds.length})`, ); return; } - - const senderId = post.user_id ?? payload.broadcast?.user_id; - if (!senderId) { - logVerboseMessage("mattermost: drop post (missing sender id)"); - return; - } - if (senderId === botUserId) { - logVerboseMessage(`mattermost: drop post (self sender=${senderId})`); - return; - } - if (isSystemPost(post)) { - logVerboseMessage(`mattermost: drop post (system post type=${post.type ?? "unknown"})`); - return; - } - - const channelInfo = await resolveChannelInfo(channelId); - const channelType = payload.data?.channel_type ?? channelInfo?.type ?? undefined; - const kind = mapMattermostChannelTypeToChatType(channelType); - const chatType = channelChatType(kind); - - const senderName = - normalizeOptionalString(payload.data?.sender_name) ?? - normalizeOptionalString((await resolveUserInfo(senderId))?.username) ?? - senderId; - const rawText = normalizeOptionalString(post.message) ?? ""; - const dmPolicy = account.config.dmPolicy ?? "pairing"; - const normalizedAllowFrom = normalizeMattermostAllowList(account.config.allowFrom ?? []); - const normalizedGroupAllowFrom = normalizeMattermostAllowList( - account.config.groupAllowFrom ?? [], - ); - const storeAllowFrom = normalizeMattermostAllowList( - await readStoreAllowFromForDmPolicy({ - provider: "mattermost", - accountId: account.accountId, - dmPolicy, - readStore: pairing.readStoreForDmPolicy, - }), - ); - const accessDecision = resolveDmGroupAccessWithLists({ - isGroup: kind !== "direct", - dmPolicy, - groupPolicy, - allowFrom: normalizedAllowFrom, - groupAllowFrom: normalizedGroupAllowFrom, - storeAllowFrom, - isSenderAllowed: (allowFrom) => - isMattermostSenderAllowed({ - senderId, - senderName, - allowFrom, - allowNameMatching, - }), - }); - const effectiveAllowFrom = accessDecision.effectiveAllowFrom; - const effectiveGroupAllowFrom = accessDecision.effectiveGroupAllowFrom; - const allowTextCommands = core.channel.commands.shouldHandleTextCommands({ - cfg, - surface: "mattermost", - }); - const hasControlCommand = core.channel.text.hasControlCommand(rawText, cfg); - const isControlCommand = allowTextCommands && hasControlCommand; - const useAccessGroups = cfg.commands?.useAccessGroups !== false; - const commandDmAllowFrom = kind === "direct" ? effectiveAllowFrom : normalizedAllowFrom; - const senderAllowedForCommands = isMattermostSenderAllowed({ - senderId, - senderName, - allowFrom: commandDmAllowFrom, - allowNameMatching, - }); - const groupAllowedForCommands = isMattermostSenderAllowed({ - senderId, - senderName, - allowFrom: effectiveGroupAllowFrom, - allowNameMatching, - }); - const commandGate = resolveControlCommandGate({ - useAccessGroups, - authorizers: [ - { configured: commandDmAllowFrom.length > 0, allowed: senderAllowedForCommands }, - { - configured: effectiveGroupAllowFrom.length > 0, - allowed: groupAllowedForCommands, - }, - ], - allowTextCommands, - hasControlCommand, - }); - const commandAuthorized = commandGate.commandAuthorized; - - if (accessDecision.decision !== "allow") { - if (kind === "direct") { - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.DM_POLICY_DISABLED) { - logVerboseMessage(`mattermost: drop dm (dmPolicy=disabled sender=${senderId})`); - return; - } - if (accessDecision.decision === "pairing") { - const { code, created } = await pairing.upsertPairingRequest({ - id: senderId, - meta: { name: senderName }, - }); - logVerboseMessage(`mattermost: pairing request sender=${senderId} created=${created}`); - if (created) { - try { - await sendMessageMattermost( - `user:${senderId}`, - core.channel.pairing.buildPairingReply({ - channel: "mattermost", - idLine: `Your Mattermost user id: ${senderId}`, - code, - }), - { cfg, accountId: account.accountId }, - ); - opts.statusSink?.({ lastOutboundAt: Date.now() }); - } catch (err) { - logVerboseMessage(`mattermost: pairing reply failed for ${senderId}: ${String(err)}`); - } - } - return; - } - logVerboseMessage(`mattermost: drop dm sender=${senderId} (dmPolicy=${dmPolicy})`); - return; - } - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_DISABLED) { - logVerboseMessage("mattermost: drop group message (groupPolicy=disabled)"); - return; - } - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_EMPTY_ALLOWLIST) { - logVerboseMessage("mattermost: drop group message (no group allowlist)"); - return; - } - if (accessDecision.reasonCode === DM_GROUP_ACCESS_REASON.GROUP_POLICY_NOT_ALLOWLISTED) { - logVerboseMessage(`mattermost: drop group sender=${senderId} (not in groupAllowFrom)`); - return; - } - logVerboseMessage( - `mattermost: drop group message (groupPolicy=${groupPolicy} reason=${accessDecision.reason})`, - ); - return; - } - - if (kind !== "direct" && commandGate.shouldBlock) { - logInboundDrop({ - log: logVerboseMessage, - channel: "mattermost", - reason: "control command (unauthorized)", - target: senderId, - }); - return; - } - - const teamId = payload.data?.team_id ?? channelInfo?.team_id ?? undefined; - const channelName = payload.data?.channel_name ?? channelInfo?.name ?? ""; - const channelDisplay = - payload.data?.channel_display_name ?? channelInfo?.display_name ?? channelName; - const roomLabel = channelName ? `#${channelName}` : channelDisplay || `#${channelId}`; - - const route = core.channel.routing.resolveAgentRoute({ - cfg, - channel: "mattermost", - accountId: account.accountId, - teamId, - peer: { - kind, - id: kind === "direct" ? senderId : channelId, - }, - }); - - const baseSessionKey = route.sessionKey; - const threadRootId = normalizeOptionalString(post.root_id); - const replyToMode = resolveMattermostReplyToMode(account, kind); - const threadContext = resolveMattermostThreadSessionContext({ - baseSessionKey, - kind, - postId: post.id, - replyToMode, - threadRootId, - }); - const { effectiveReplyToId, sessionKey, parentSessionKey } = threadContext; - const historyKey = kind === "direct" ? null : sessionKey; - - const mentionRegexes = core.channel.mentions.buildMentionRegexes(cfg, route.agentId); - const wasMentioned = - kind !== "direct" && - ((botUsername - ? normalizeLowercaseStringOrEmpty(rawText).includes( - `@${normalizeLowercaseStringOrEmpty(botUsername)}`, - ) - : false) || - core.channel.mentions.matchesMentionPatterns(rawText, mentionRegexes)); - const pendingBody = - rawText || - (post.file_ids?.length - ? `[Mattermost ${post.file_ids.length === 1 ? "file" : "files"}]` - : ""); - const pendingSender = senderName; - const recordPendingHistory = () => { - const trimmed = pendingBody.trim(); - recordPendingHistoryEntryIfEnabled({ - historyMap: channelHistories, - limit: historyLimit, - historyKey: historyKey ?? "", - entry: - historyKey && trimmed - ? { - sender: pendingSender, - body: trimmed, - timestamp: typeof post.create_at === "number" ? post.create_at : undefined, - messageId: post.id ?? undefined, - } - : null, - }); - }; - - const oncharEnabled = account.chatmode === "onchar" && kind !== "direct"; - const oncharPrefixes = oncharEnabled ? resolveOncharPrefixes(account.oncharPrefixes) : []; - const oncharResult = oncharEnabled - ? stripOncharPrefix(rawText, oncharPrefixes) - : { triggered: false, stripped: rawText }; - const oncharTriggered = oncharResult.triggered; - const canDetectMention = Boolean(botUsername) || mentionRegexes.length > 0; - const mentionDecision = evaluateMattermostMentionGate({ - kind, - cfg, - accountId: account.accountId, - channelId, - threadRootId, - requireMentionOverride: account.requireMention, - resolveRequireMention: core.channel.groups.resolveRequireMention, - wasMentioned, - isControlCommand, - commandAuthorized, - oncharEnabled, - oncharTriggered, - canDetectMention, - }); - const { shouldRequireMention, shouldBypassMention } = mentionDecision; - - if (mentionDecision.dropReason === "onchar-not-triggered") { - logVerboseMessage( - `mattermost: drop group message (onchar not triggered channel=${channelId} sender=${senderId})`, - ); - recordPendingHistory(); - return; - } - - if (mentionDecision.dropReason === "missing-mention") { - logVerboseMessage( - `mattermost: drop group message (missing mention channel=${channelId} sender=${senderId} requireMention=${shouldRequireMention} bypass=${shouldBypassMention} canDetectMention=${canDetectMention})`, - ); - recordPendingHistory(); - return; - } - const mediaList = await resolveMattermostMedia(post.file_ids); - const mediaPlaceholder = buildMattermostAttachmentPlaceholder(mediaList); - const bodySource = oncharTriggered ? oncharResult.stripped : rawText; - const baseText = [bodySource, mediaPlaceholder].filter(Boolean).join("\n").trim(); - const bodyText = normalizeMention(baseText, botUsername); - if (!bodyText) { - logVerboseMessage( - `mattermost: drop group message (empty body after normalization channel=${channelId} sender=${senderId})`, - ); - return; - } - - core.channel.activity.record({ - channel: "mattermost", - accountId: account.accountId, - direction: "inbound", - }); - - const fromLabel = formatInboundFromLabel({ - isGroup: kind !== "direct", - groupLabel: channelDisplay || roomLabel, - groupId: channelId, - groupFallback: roomLabel || "Channel", - directLabel: senderName, - directId: senderId, - }); - - const preview = bodyText.replace(/\s+/g, " ").slice(0, 160); - const inboundLabel = - kind === "direct" - ? `Mattermost DM from ${senderName}` - : `Mattermost message in ${roomLabel} from ${senderName}`; - core.system.enqueueSystemEvent(`${inboundLabel}: ${preview}`, { - sessionKey, - contextKey: `mattermost:message:${channelId}:${post.id ?? "unknown"}`, - }); - - const textWithId = `${bodyText}\n[mattermost message id: ${post.id ?? "unknown"} channel: ${channelId}]`; - const body = core.channel.reply.formatInboundEnvelope({ - channel: "Mattermost", - from: fromLabel, - timestamp: typeof post.create_at === "number" ? post.create_at : undefined, - body: textWithId, - chatType, - sender: { name: senderName, id: senderId }, - }); - let combinedBody = body; - if (historyKey) { - combinedBody = buildPendingHistoryContextFromMap({ - historyMap: channelHistories, - historyKey, - limit: historyLimit, - currentMessage: combinedBody, - formatEntry: (entry) => - core.channel.reply.formatInboundEnvelope({ - channel: "Mattermost", - from: fromLabel, - timestamp: entry.timestamp, - body: `${entry.body}${ - entry.messageId ? ` [id:${entry.messageId} channel:${channelId}]` : "" - }`, - chatType, - senderLabel: entry.sender, - }), - }); - } - - const to = kind === "direct" ? `user:${senderId}` : `channel:${channelId}`; - const mediaPayload = buildAgentMediaPayload(mediaList); - const commandBody = rawText.trim(); - const inboundHistory = - historyKey && historyLimit > 0 - ? (channelHistories.get(historyKey) ?? []).map((entry) => ({ - sender: entry.sender, - body: entry.body, - timestamp: entry.timestamp, - })) - : undefined; - const ctxPayload = core.channel.reply.finalizeInboundContext({ - Body: combinedBody, - BodyForAgent: bodyText, - InboundHistory: inboundHistory, - RawBody: bodyText, - CommandBody: commandBody, - BodyForCommands: commandBody, - From: - kind === "direct" - ? `mattermost:${senderId}` - : kind === "group" - ? `mattermost:group:${channelId}` - : `mattermost:channel:${channelId}`, - To: to, - SessionKey: sessionKey, - ParentSessionKey: parentSessionKey, - AccountId: route.accountId, - ChatType: chatType, - ConversationLabel: fromLabel, - GroupSubject: kind !== "direct" ? channelDisplay || roomLabel : undefined, - GroupChannel: channelName ? `#${channelName}` : undefined, - GroupSpace: teamId, - SenderName: senderName, - SenderId: senderId, - Provider: "mattermost" as const, - Surface: "mattermost" as const, - MessageSid: post.id ?? undefined, - MessageSids: allMessageIds.length > 1 ? allMessageIds : undefined, - MessageSidFirst: allMessageIds.length > 1 ? allMessageIds[0] : undefined, - MessageSidLast: - allMessageIds.length > 1 ? allMessageIds[allMessageIds.length - 1] : undefined, - ReplyToId: effectiveReplyToId, - MessageThreadId: effectiveReplyToId, - Timestamp: typeof post.create_at === "number" ? post.create_at : undefined, - WasMentioned: kind !== "direct" ? mentionDecision.effectiveWasMentioned : undefined, - CommandAuthorized: commandAuthorized, - OriginatingChannel: "mattermost" as const, - OriginatingTo: to, - ...mediaPayload, - }); - - if (kind === "direct") { - const sessionCfg = cfg.session; - const storePath = core.channel.session.resolveStorePath(sessionCfg?.store, { - agentId: route.agentId, - }); - await core.channel.session.updateLastRoute({ - storePath, - sessionKey: route.mainSessionKey, - deliveryContext: { - channel: "mattermost", - to, - accountId: route.accountId, - }, - }); - } - - const previewLine = bodyText.slice(0, 200).replace(/\n/g, "\\n"); - logVerboseMessage( - `mattermost inbound: from=${ctxPayload.From} len=${bodyText.length} preview="${previewLine}"`, - ); - - const textLimit = core.channel.text.resolveTextChunkLimit( - cfg, - "mattermost", - account.accountId, - { - fallbackLimit: account.textChunkLimit ?? 4000, - }, - ); - const tableMode = core.channel.text.resolveMarkdownTableMode({ - cfg, - channel: "mattermost", - accountId: account.accountId, - }); - - const { onModelSelected, typingCallbacks, ...replyPipeline } = createChannelReplyPipeline({ - cfg, - agentId: route.agentId, - channel: "mattermost", - accountId: account.accountId, - typing: { - start: () => sendTypingIndicator(channelId, effectiveReplyToId), - onStartError: (err) => { - logTypingFailure({ - log: (message) => logger.debug?.(message), - channel: "mattermost", - target: channelId, - error: err, - }); - }, - }, - }); - const { dispatcher, replyOptions, markDispatchIdle } = - core.channel.reply.createReplyDispatcherWithTyping({ - ...replyPipeline, - humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), - typingCallbacks, - deliver: async (payload: ReplyPayload) => { - await deliverMattermostReplyPayload({ - core, - cfg, - payload, - to, - accountId: account.accountId, - agentId: route.agentId, - replyToId: resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }), - textLimit, - tableMode, - sendMessage: sendMessageMattermost, - }); - runtime.log?.(`delivered reply to ${to}`); - }, - onError: (err, info) => { - runtime.error?.(`mattermost ${info.kind} reply failed: ${String(err)}`); - }, - }); - - await core.channel.reply.withReplyDispatcher({ - dispatcher, - onSettled: () => { - markDispatchIdle(); - }, - run: () => - core.channel.reply.dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions: { - ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, - onModelSelected, - }, - }), - }); - if (historyKey) { - clearHistoryEntriesIfEnabled({ - historyMap: channelHistories, - historyKey, - limit: historyLimit, - }); - } }; const handleReactionEvent = async (payload: MattermostEventPayload) => {