diff --git a/extensions/bluebubbles/src/monitor-debounce.ts b/extensions/bluebubbles/src/monitor-debounce.ts new file mode 100644 index 00000000000..952c591e847 --- /dev/null +++ b/extensions/bluebubbles/src/monitor-debounce.ts @@ -0,0 +1,205 @@ +import type { OpenClawConfig } from "openclaw/plugin-sdk"; +import type { NormalizedWebhookMessage } from "./monitor-normalize.js"; +import type { BlueBubblesCoreRuntime, WebhookTarget } from "./monitor-shared.js"; + +/** + * Entry type for debouncing inbound messages. + * Captures the normalized message and its target for later combined processing. + */ +type BlueBubblesDebounceEntry = { + message: NormalizedWebhookMessage; + target: WebhookTarget; +}; + +export type BlueBubblesDebouncer = { + enqueue: (item: BlueBubblesDebounceEntry) => Promise; + flushKey: (key: string) => Promise; +}; + +export type BlueBubblesDebounceRegistry = { + getOrCreateDebouncer: (target: WebhookTarget) => BlueBubblesDebouncer; + removeDebouncer: (target: WebhookTarget) => void; +}; + +/** + * Default debounce window for inbound message coalescing (ms). + * This helps combine URL text + link preview balloon messages that BlueBubbles + * sends as separate webhook events when no explicit inbound debounce config exists. + */ +const DEFAULT_INBOUND_DEBOUNCE_MS = 500; + +/** + * Combines multiple debounced messages into a single message for processing. + * Used when multiple webhook events arrive within the debounce window. + */ +function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): NormalizedWebhookMessage { + if (entries.length === 0) { + throw new Error("Cannot combine empty entries"); + } + if (entries.length === 1) { + return entries[0].message; + } + + // Use the first message as the base (typically the text message) + const first = entries[0].message; + + // Combine text from all entries, filtering out duplicates and empty strings + const seenTexts = new Set(); + const textParts: string[] = []; + + for (const entry of entries) { + const text = entry.message.text.trim(); + if (!text) { + continue; + } + // Skip duplicate text (URL might be in both text message and balloon) + const normalizedText = text.toLowerCase(); + if (seenTexts.has(normalizedText)) { + continue; + } + seenTexts.add(normalizedText); + textParts.push(text); + } + + // Merge attachments from all entries + const allAttachments = entries.flatMap((e) => e.message.attachments ?? []); + + // Use the latest timestamp + const timestamps = entries + .map((e) => e.message.timestamp) + .filter((t): t is number => typeof t === "number"); + const latestTimestamp = timestamps.length > 0 ? Math.max(...timestamps) : first.timestamp; + + // Collect all message IDs for reference + const messageIds = entries + .map((e) => e.message.messageId) + .filter((id): id is string => Boolean(id)); + + // Prefer reply context from any entry that has it + const entryWithReply = entries.find((e) => e.message.replyToId); + + return { + ...first, + text: textParts.join(" "), + attachments: allAttachments.length > 0 ? allAttachments : first.attachments, + timestamp: latestTimestamp, + // Use first message's ID as primary (for reply reference), but we've coalesced others + messageId: messageIds[0] ?? first.messageId, + // Preserve reply context if present + replyToId: entryWithReply?.message.replyToId ?? first.replyToId, + replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody, + replyToSender: entryWithReply?.message.replyToSender ?? first.replyToSender, + // Clear balloonBundleId since we've combined (the combined message is no longer just a balloon) + balloonBundleId: undefined, + }; +} + +function resolveBlueBubblesDebounceMs( + config: OpenClawConfig, + core: BlueBubblesCoreRuntime, +): number { + const inbound = config.messages?.inbound; + const hasExplicitDebounce = + typeof inbound?.debounceMs === "number" || typeof inbound?.byChannel?.bluebubbles === "number"; + if (!hasExplicitDebounce) { + return DEFAULT_INBOUND_DEBOUNCE_MS; + } + return core.channel.debounce.resolveInboundDebounceMs({ cfg: config, channel: "bluebubbles" }); +} + +export function createBlueBubblesDebounceRegistry(params: { + processMessage: (message: NormalizedWebhookMessage, target: WebhookTarget) => Promise; +}): BlueBubblesDebounceRegistry { + const targetDebouncers = new Map(); + + return { + getOrCreateDebouncer: (target) => { + const existing = targetDebouncers.get(target); + if (existing) { + return existing; + } + + const { account, config, runtime, core } = target; + const debouncer = core.channel.debounce.createInboundDebouncer({ + debounceMs: resolveBlueBubblesDebounceMs(config, core), + buildKey: (entry) => { + const msg = entry.message; + // Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the + // same message (e.g., text-only then text+attachment). + // + // For balloons (URL previews, stickers, etc), BlueBubbles often uses a different + // messageId than the originating text. When present, key by associatedMessageGuid + // to keep text + balloon coalescing working. + const balloonBundleId = msg.balloonBundleId?.trim(); + const associatedMessageGuid = msg.associatedMessageGuid?.trim(); + if (balloonBundleId && associatedMessageGuid) { + return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`; + } + + const messageId = msg.messageId?.trim(); + if (messageId) { + return `bluebubbles:${account.accountId}:msg:${messageId}`; + } + + const chatKey = + msg.chatGuid?.trim() ?? + msg.chatIdentifier?.trim() ?? + (msg.chatId ? String(msg.chatId) : "dm"); + return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`; + }, + shouldDebounce: (entry) => { + const msg = entry.message; + // Skip debouncing for from-me messages (they're just cached, not processed) + if (msg.fromMe) { + return false; + } + // Skip debouncing for control commands - process immediately + if (core.channel.text.hasControlCommand(msg.text, config)) { + return false; + } + // Debounce all other messages to coalesce rapid-fire webhook events + // (e.g., text+image arriving as separate webhooks for the same messageId) + return true; + }, + onFlush: async (entries) => { + if (entries.length === 0) { + return; + } + + // Use target from first entry (all entries have same target due to key structure) + const flushTarget = entries[0].target; + + if (entries.length === 1) { + // Single message - process normally + await params.processMessage(entries[0].message, flushTarget); + return; + } + + // Multiple messages - combine and process + const combined = combineDebounceEntries(entries); + + if (core.logging.shouldLogVerbose()) { + const count = entries.length; + const preview = combined.text.slice(0, 50); + runtime.log?.( + `[bluebubbles] coalesced ${count} messages: "${preview}${combined.text.length > 50 ? "..." : ""}"`, + ); + } + + await params.processMessage(combined, flushTarget); + }, + onError: (err) => { + runtime.error?.( + `[${account.accountId}] [bluebubbles] debounce flush failed: ${String(err)}`, + ); + }, + }); + + targetDebouncers.set(target, debouncer); + return debouncer; + }, + removeDebouncer: (target) => { + targetDebouncers.delete(target); + }, + }; +} diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index 7ec98279dbf..a0e06bce6d8 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -1,19 +1,15 @@ import { timingSafeEqual } from "node:crypto"; import type { IncomingMessage, ServerResponse } from "node:http"; -import type { OpenClawConfig } from "openclaw/plugin-sdk"; import { beginWebhookRequestPipelineOrReject, createWebhookInFlightLimiter, registerWebhookTargetWithPluginRoute, readWebhookBodyOrReject, - resolveSingleWebhookTarget, + resolveWebhookTargetWithAuthOrRejectSync, resolveWebhookTargets, } from "openclaw/plugin-sdk"; -import { - normalizeWebhookMessage, - normalizeWebhookReaction, - type NormalizedWebhookMessage, -} from "./monitor-normalize.js"; +import { createBlueBubblesDebounceRegistry } from "./monitor-debounce.js"; +import { normalizeWebhookMessage, normalizeWebhookReaction } from "./monitor-normalize.js"; import { logVerbose, processMessage, processReaction } from "./monitor-processing.js"; import { _resetBlueBubblesShortIdState, @@ -23,216 +19,15 @@ import { DEFAULT_WEBHOOK_PATH, normalizeWebhookPath, resolveWebhookPathFromConfig, - type BlueBubblesCoreRuntime, type BlueBubblesMonitorOptions, type WebhookTarget, } from "./monitor-shared.js"; import { fetchBlueBubblesServerInfo } from "./probe.js"; import { getBlueBubblesRuntime } from "./runtime.js"; -/** - * Entry type for debouncing inbound messages. - * Captures the normalized message and its target for later combined processing. - */ -type BlueBubblesDebounceEntry = { - message: NormalizedWebhookMessage; - target: WebhookTarget; -}; - -/** - * Default debounce window for inbound message coalescing (ms). - * This helps combine URL text + link preview balloon messages that BlueBubbles - * sends as separate webhook events when no explicit inbound debounce config exists. - */ -const DEFAULT_INBOUND_DEBOUNCE_MS = 500; - -/** - * Combines multiple debounced messages into a single message for processing. - * Used when multiple webhook events arrive within the debounce window. - */ -function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): NormalizedWebhookMessage { - if (entries.length === 0) { - throw new Error("Cannot combine empty entries"); - } - if (entries.length === 1) { - return entries[0].message; - } - - // Use the first message as the base (typically the text message) - const first = entries[0].message; - - // Combine text from all entries, filtering out duplicates and empty strings - const seenTexts = new Set(); - const textParts: string[] = []; - - for (const entry of entries) { - const text = entry.message.text.trim(); - if (!text) { - continue; - } - // Skip duplicate text (URL might be in both text message and balloon) - const normalizedText = text.toLowerCase(); - if (seenTexts.has(normalizedText)) { - continue; - } - seenTexts.add(normalizedText); - textParts.push(text); - } - - // Merge attachments from all entries - const allAttachments = entries.flatMap((e) => e.message.attachments ?? []); - - // Use the latest timestamp - const timestamps = entries - .map((e) => e.message.timestamp) - .filter((t): t is number => typeof t === "number"); - const latestTimestamp = timestamps.length > 0 ? Math.max(...timestamps) : first.timestamp; - - // Collect all message IDs for reference - const messageIds = entries - .map((e) => e.message.messageId) - .filter((id): id is string => Boolean(id)); - - // Prefer reply context from any entry that has it - const entryWithReply = entries.find((e) => e.message.replyToId); - - return { - ...first, - text: textParts.join(" "), - attachments: allAttachments.length > 0 ? allAttachments : first.attachments, - timestamp: latestTimestamp, - // Use first message's ID as primary (for reply reference), but we've coalesced others - messageId: messageIds[0] ?? first.messageId, - // Preserve reply context if present - replyToId: entryWithReply?.message.replyToId ?? first.replyToId, - replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody, - replyToSender: entryWithReply?.message.replyToSender ?? first.replyToSender, - // Clear balloonBundleId since we've combined (the combined message is no longer just a balloon) - balloonBundleId: undefined, - }; -} - const webhookTargets = new Map(); const webhookInFlightLimiter = createWebhookInFlightLimiter(); - -type BlueBubblesDebouncer = { - enqueue: (item: BlueBubblesDebounceEntry) => Promise; - flushKey: (key: string) => Promise; -}; - -/** - * Maps webhook targets to their inbound debouncers. - * Each target gets its own debouncer keyed by a unique identifier. - */ -const targetDebouncers = new Map(); - -function resolveBlueBubblesDebounceMs( - config: OpenClawConfig, - core: BlueBubblesCoreRuntime, -): number { - const inbound = config.messages?.inbound; - const hasExplicitDebounce = - typeof inbound?.debounceMs === "number" || typeof inbound?.byChannel?.bluebubbles === "number"; - if (!hasExplicitDebounce) { - return DEFAULT_INBOUND_DEBOUNCE_MS; - } - return core.channel.debounce.resolveInboundDebounceMs({ cfg: config, channel: "bluebubbles" }); -} - -/** - * Creates or retrieves a debouncer for a webhook target. - */ -function getOrCreateDebouncer(target: WebhookTarget) { - const existing = targetDebouncers.get(target); - if (existing) { - return existing; - } - - const { account, config, runtime, core } = target; - - const debouncer = core.channel.debounce.createInboundDebouncer({ - debounceMs: resolveBlueBubblesDebounceMs(config, core), - buildKey: (entry) => { - const msg = entry.message; - // Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the - // same message (e.g., text-only then text+attachment). - // - // For balloons (URL previews, stickers, etc), BlueBubbles often uses a different - // messageId than the originating text. When present, key by associatedMessageGuid - // to keep text + balloon coalescing working. - const balloonBundleId = msg.balloonBundleId?.trim(); - const associatedMessageGuid = msg.associatedMessageGuid?.trim(); - if (balloonBundleId && associatedMessageGuid) { - return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`; - } - - const messageId = msg.messageId?.trim(); - if (messageId) { - return `bluebubbles:${account.accountId}:msg:${messageId}`; - } - - const chatKey = - msg.chatGuid?.trim() ?? - msg.chatIdentifier?.trim() ?? - (msg.chatId ? String(msg.chatId) : "dm"); - return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`; - }, - shouldDebounce: (entry) => { - const msg = entry.message; - // Skip debouncing for from-me messages (they're just cached, not processed) - if (msg.fromMe) { - return false; - } - // Skip debouncing for control commands - process immediately - if (core.channel.text.hasControlCommand(msg.text, config)) { - return false; - } - // Debounce all other messages to coalesce rapid-fire webhook events - // (e.g., text+image arriving as separate webhooks for the same messageId) - return true; - }, - onFlush: async (entries) => { - if (entries.length === 0) { - return; - } - - // Use target from first entry (all entries have same target due to key structure) - const flushTarget = entries[0].target; - - if (entries.length === 1) { - // Single message - process normally - await processMessage(entries[0].message, flushTarget); - return; - } - - // Multiple messages - combine and process - const combined = combineDebounceEntries(entries); - - if (core.logging.shouldLogVerbose()) { - const count = entries.length; - const preview = combined.text.slice(0, 50); - runtime.log?.( - `[bluebubbles] coalesced ${count} messages: "${preview}${combined.text.length > 50 ? "..." : ""}"`, - ); - } - - await processMessage(combined, flushTarget); - }, - onError: (err) => { - runtime.error?.(`[${account.accountId}] [bluebubbles] debounce flush failed: ${String(err)}`); - }, - }); - - targetDebouncers.set(target, debouncer); - return debouncer; -} - -/** - * Removes a debouncer for a target (called during unregistration). - */ -function removeDebouncer(target: WebhookTarget): void { - targetDebouncers.delete(target); -} +const debounceRegistry = createBlueBubblesDebounceRegistry({ processMessage }); export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => void { const registered = registerWebhookTargetWithPluginRoute({ @@ -258,7 +53,7 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v return () => { registered.unregister(); // Clean up debouncer when target is unregistered - removeDebouncer(registered.target); + debounceRegistry.removeDebouncer(registered.target); }; } @@ -352,28 +147,20 @@ export async function handleBlueBubblesWebhookRequest( req.headers["x-bluebubbles-guid"] ?? req.headers["authorization"]; const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? ""; - const matchedTarget = resolveSingleWebhookTarget(targets, (target) => { - const token = target.account.config.password?.trim() ?? ""; - return safeEqualSecret(guid, token); + const target = resolveWebhookTargetWithAuthOrRejectSync({ + targets, + res, + isMatch: (target) => { + const token = target.account.config.password?.trim() ?? ""; + return safeEqualSecret(guid, token); + }, }); - - if (matchedTarget.kind === "none") { - res.statusCode = 401; - res.end("unauthorized"); + if (!target) { console.warn( - `[bluebubbles] webhook rejected: unauthorized guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`, + `[bluebubbles] webhook rejected: status=${res.statusCode} path=${path} guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`, ); return true; } - - if (matchedTarget.kind === "ambiguous") { - res.statusCode = 401; - res.end("ambiguous webhook target"); - console.warn(`[bluebubbles] webhook rejected: ambiguous target match path=${path}`); - return true; - } - - const target = matchedTarget.target; const body = await readWebhookBodyOrReject({ req, res, @@ -454,7 +241,7 @@ export async function handleBlueBubblesWebhookRequest( } else if (message) { // Route messages through debouncer to coalesce rapid-fire events // (e.g., text message + URL balloon arriving as separate webhooks) - const debouncer = getOrCreateDebouncer(target); + const debouncer = debounceRegistry.getOrCreateDebouncer(target); debouncer.enqueue({ message, target }).catch((err) => { target.runtime.error?.( `[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`, diff --git a/extensions/googlechat/src/monitor-access.ts b/extensions/googlechat/src/monitor-access.ts new file mode 100644 index 00000000000..f057c645de9 --- /dev/null +++ b/extensions/googlechat/src/monitor-access.ts @@ -0,0 +1,357 @@ +import { + GROUP_POLICY_BLOCKED_LABEL, + createScopedPairingAccess, + isDangerousNameMatchingEnabled, + resolveAllowlistProviderRuntimeGroupPolicy, + resolveDefaultGroupPolicy, + resolveDmGroupAccessWithLists, + resolveMentionGatingWithBypass, + warnMissingProviderGroupPolicyFallbackOnce, +} from "openclaw/plugin-sdk"; +import type { OpenClawConfig } from "openclaw/plugin-sdk"; +import type { ResolvedGoogleChatAccount } from "./accounts.js"; +import { sendGoogleChatMessage } from "./api.js"; +import type { GoogleChatCoreRuntime } from "./monitor-types.js"; +import type { GoogleChatAnnotation, GoogleChatMessage, GoogleChatSpace } from "./types.js"; + +function normalizeUserId(raw?: string | null): string { + const trimmed = raw?.trim() ?? ""; + if (!trimmed) { + return ""; + } + return trimmed.replace(/^users\//i, "").toLowerCase(); +} + +function isEmailLike(value: string): boolean { + // Keep this intentionally loose; allowlists are user-provided config. + return value.includes("@"); +} + +export function isSenderAllowed( + senderId: string, + senderEmail: string | undefined, + allowFrom: string[], + allowNameMatching = false, +) { + if (allowFrom.includes("*")) { + return true; + } + const normalizedSenderId = normalizeUserId(senderId); + const normalizedEmail = senderEmail?.trim().toLowerCase() ?? ""; + return allowFrom.some((entry) => { + const normalized = String(entry).trim().toLowerCase(); + if (!normalized) { + return false; + } + + // Accept `googlechat:` but treat `users/...` as an *ID* only (deprecated `users/`). + const withoutPrefix = normalized.replace(/^(googlechat|google-chat|gchat):/i, ""); + if (withoutPrefix.startsWith("users/")) { + return normalizeUserId(withoutPrefix) === normalizedSenderId; + } + + // Raw email allowlist entries are a break-glass override. + if (allowNameMatching && normalizedEmail && isEmailLike(withoutPrefix)) { + return withoutPrefix === normalizedEmail; + } + + return withoutPrefix.replace(/^users\//i, "") === normalizedSenderId; + }); +} + +type GoogleChatGroupEntry = { + requireMention?: boolean; + allow?: boolean; + enabled?: boolean; + users?: Array; + systemPrompt?: string; +}; + +function resolveGroupConfig(params: { + groupId: string; + groupName?: string | null; + groups?: Record; +}) { + const { groupId, groupName, groups } = params; + const entries = groups ?? {}; + const keys = Object.keys(entries); + if (keys.length === 0) { + return { entry: undefined, allowlistConfigured: false }; + } + const normalizedName = groupName?.trim().toLowerCase(); + const candidates = [groupId, groupName ?? "", normalizedName ?? ""].filter(Boolean); + let entry = candidates.map((candidate) => entries[candidate]).find(Boolean); + if (!entry && normalizedName) { + entry = entries[normalizedName]; + } + const fallback = entries["*"]; + return { entry: entry ?? fallback, allowlistConfigured: true, fallback }; +} + +function extractMentionInfo(annotations: GoogleChatAnnotation[], botUser?: string | null) { + const mentionAnnotations = annotations.filter((entry) => entry.type === "USER_MENTION"); + const hasAnyMention = mentionAnnotations.length > 0; + const botTargets = new Set(["users/app", botUser?.trim()].filter(Boolean) as string[]); + const wasMentioned = mentionAnnotations.some((entry) => { + const userName = entry.userMention?.user?.name; + if (!userName) { + return false; + } + if (botTargets.has(userName)) { + return true; + } + return normalizeUserId(userName) === "app"; + }); + return { hasAnyMention, wasMentioned }; +} + +const warnedDeprecatedUsersEmailAllowFrom = new Set(); + +function warnDeprecatedUsersEmailEntries(logVerbose: (message: string) => void, entries: string[]) { + const deprecated = entries.map((v) => String(v).trim()).filter((v) => /^users\/.+@.+/i.test(v)); + if (deprecated.length === 0) { + return; + } + const key = deprecated + .map((v) => v.toLowerCase()) + .sort() + .join(","); + if (warnedDeprecatedUsersEmailAllowFrom.has(key)) { + return; + } + warnedDeprecatedUsersEmailAllowFrom.add(key); + logVerbose( + `Deprecated allowFrom entry detected: "users/" is no longer treated as an email allowlist. Use raw email (alice@example.com) or immutable user id (users/). entries=${deprecated.join(", ")}`, + ); +} + +export async function applyGoogleChatInboundAccessPolicy(params: { + account: ResolvedGoogleChatAccount; + config: OpenClawConfig; + core: GoogleChatCoreRuntime; + space: GoogleChatSpace; + message: GoogleChatMessage; + isGroup: boolean; + senderId: string; + senderName: string; + senderEmail?: string; + rawBody: string; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; + logVerbose: (message: string) => void; +}): Promise< + | { + ok: true; + commandAuthorized: boolean | undefined; + effectiveWasMentioned: boolean | undefined; + groupSystemPrompt: string | undefined; + } + | { ok: false } +> { + const { + account, + config, + core, + space, + message, + isGroup, + senderId, + senderName, + senderEmail, + rawBody, + statusSink, + logVerbose, + } = params; + const allowNameMatching = isDangerousNameMatchingEnabled(account.config); + const spaceId = space.name ?? ""; + const pairing = createScopedPairingAccess({ + core, + channel: "googlechat", + accountId: account.accountId, + }); + + const defaultGroupPolicy = resolveDefaultGroupPolicy(config); + const { groupPolicy, providerMissingFallbackApplied } = + resolveAllowlistProviderRuntimeGroupPolicy({ + providerConfigPresent: config.channels?.googlechat !== undefined, + groupPolicy: account.config.groupPolicy, + defaultGroupPolicy, + }); + warnMissingProviderGroupPolicyFallbackOnce({ + providerMissingFallbackApplied, + providerKey: "googlechat", + accountId: account.accountId, + blockedLabel: GROUP_POLICY_BLOCKED_LABEL.space, + log: logVerbose, + }); + const groupConfigResolved = resolveGroupConfig({ + groupId: spaceId, + groupName: space.displayName ?? null, + groups: account.config.groups ?? undefined, + }); + const groupEntry = groupConfigResolved.entry; + const groupUsers = groupEntry?.users ?? account.config.groupAllowFrom ?? []; + let effectiveWasMentioned: boolean | undefined; + + if (isGroup) { + if (groupPolicy === "disabled") { + logVerbose(`drop group message (groupPolicy=disabled, space=${spaceId})`); + return { ok: false }; + } + const groupAllowlistConfigured = groupConfigResolved.allowlistConfigured; + const groupAllowed = Boolean(groupEntry) || Boolean((account.config.groups ?? {})["*"]); + if (groupPolicy === "allowlist") { + if (!groupAllowlistConfigured) { + logVerbose(`drop group message (groupPolicy=allowlist, no allowlist, space=${spaceId})`); + return { ok: false }; + } + if (!groupAllowed) { + logVerbose(`drop group message (not allowlisted, space=${spaceId})`); + return { ok: false }; + } + } + if (groupEntry?.enabled === false || groupEntry?.allow === false) { + logVerbose(`drop group message (space disabled, space=${spaceId})`); + return { ok: false }; + } + + if (groupUsers.length > 0) { + const normalizedGroupUsers = groupUsers.map((v) => String(v)); + warnDeprecatedUsersEmailEntries(logVerbose, normalizedGroupUsers); + const ok = isSenderAllowed(senderId, senderEmail, normalizedGroupUsers, allowNameMatching); + if (!ok) { + logVerbose(`drop group message (sender not allowed, ${senderId})`); + return { ok: false }; + } + } + } + + const dmPolicy = account.config.dm?.policy ?? "pairing"; + const configAllowFrom = (account.config.dm?.allowFrom ?? []).map((v) => String(v)); + const normalizedGroupUsers = groupUsers.map((v) => String(v)); + const senderGroupPolicy = + groupPolicy === "disabled" + ? "disabled" + : normalizedGroupUsers.length > 0 + ? "allowlist" + : "open"; + const shouldComputeAuth = core.channel.commands.shouldComputeCommandAuthorized(rawBody, config); + const storeAllowFrom = + !isGroup && dmPolicy !== "allowlist" && (dmPolicy !== "open" || shouldComputeAuth) + ? await pairing.readAllowFromStore().catch(() => []) + : []; + const access = resolveDmGroupAccessWithLists({ + isGroup, + dmPolicy, + groupPolicy: senderGroupPolicy, + allowFrom: configAllowFrom, + groupAllowFrom: normalizedGroupUsers, + storeAllowFrom, + groupAllowFromFallbackToAllowFrom: false, + isSenderAllowed: (allowFrom) => + isSenderAllowed(senderId, senderEmail, allowFrom, allowNameMatching), + }); + const effectiveAllowFrom = access.effectiveAllowFrom; + const effectiveGroupAllowFrom = access.effectiveGroupAllowFrom; + warnDeprecatedUsersEmailEntries(logVerbose, effectiveAllowFrom); + const commandAllowFrom = isGroup ? effectiveGroupAllowFrom : effectiveAllowFrom; + const useAccessGroups = config.commands?.useAccessGroups !== false; + const senderAllowedForCommands = isSenderAllowed( + senderId, + senderEmail, + commandAllowFrom, + allowNameMatching, + ); + const commandAuthorized = shouldComputeAuth + ? core.channel.commands.resolveCommandAuthorizedFromAuthorizers({ + useAccessGroups, + authorizers: [ + { configured: commandAllowFrom.length > 0, allowed: senderAllowedForCommands }, + ], + }) + : undefined; + + if (isGroup) { + const requireMention = groupEntry?.requireMention ?? account.config.requireMention ?? true; + const annotations = message.annotations ?? []; + const mentionInfo = extractMentionInfo(annotations, account.config.botUser); + const allowTextCommands = core.channel.commands.shouldHandleTextCommands({ + cfg: config, + surface: "googlechat", + }); + const mentionGate = resolveMentionGatingWithBypass({ + isGroup: true, + requireMention, + canDetectMention: true, + wasMentioned: mentionInfo.wasMentioned, + implicitMention: false, + hasAnyMention: mentionInfo.hasAnyMention, + allowTextCommands, + hasControlCommand: core.channel.text.hasControlCommand(rawBody, config), + commandAuthorized: commandAuthorized === true, + }); + effectiveWasMentioned = mentionGate.effectiveWasMentioned; + if (mentionGate.shouldSkip) { + logVerbose(`drop group message (mention required, space=${spaceId})`); + return { ok: false }; + } + } + + if (isGroup && access.decision !== "allow") { + logVerbose( + `drop group message (sender policy blocked, reason=${access.reason}, space=${spaceId})`, + ); + return { ok: false }; + } + + if (!isGroup) { + if (account.config.dm?.enabled === false) { + logVerbose(`Blocked Google Chat DM from ${senderId} (dmPolicy=disabled)`); + return { ok: false }; + } + + if (access.decision !== "allow") { + if (access.decision === "pairing") { + const { code, created } = await pairing.upsertPairingRequest({ + id: senderId, + meta: { name: senderName || undefined, email: senderEmail }, + }); + if (created) { + logVerbose(`googlechat pairing request sender=${senderId}`); + try { + await sendGoogleChatMessage({ + account, + space: spaceId, + text: core.channel.pairing.buildPairingReply({ + channel: "googlechat", + idLine: `Your Google Chat user id: ${senderId}`, + code, + }), + }); + statusSink?.({ lastOutboundAt: Date.now() }); + } catch (err) { + logVerbose(`pairing reply failed for ${senderId}: ${String(err)}`); + } + } + } else { + logVerbose(`Blocked unauthorized Google Chat sender ${senderId} (dmPolicy=${dmPolicy})`); + } + return { ok: false }; + } + } + + if ( + isGroup && + core.channel.commands.isControlCommandMessage(rawBody, config) && + commandAuthorized !== true + ) { + logVerbose(`googlechat: drop control command from ${senderId}`); + return { ok: false }; + } + + return { + ok: true, + commandAuthorized, + effectiveWasMentioned, + groupSystemPrompt: groupEntry?.systemPrompt?.trim() || undefined, + }; +} diff --git a/extensions/googlechat/src/monitor-types.ts b/extensions/googlechat/src/monitor-types.ts new file mode 100644 index 00000000000..6a0f6d8f847 --- /dev/null +++ b/extensions/googlechat/src/monitor-types.ts @@ -0,0 +1,33 @@ +import type { OpenClawConfig } from "openclaw/plugin-sdk"; +import type { ResolvedGoogleChatAccount } from "./accounts.js"; +import type { GoogleChatAudienceType } from "./auth.js"; +import { getGoogleChatRuntime } from "./runtime.js"; + +export type GoogleChatRuntimeEnv = { + log?: (message: string) => void; + error?: (message: string) => void; +}; + +export type GoogleChatMonitorOptions = { + account: ResolvedGoogleChatAccount; + config: OpenClawConfig; + runtime: GoogleChatRuntimeEnv; + abortSignal: AbortSignal; + webhookPath?: string; + webhookUrl?: string; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; +}; + +export type GoogleChatCoreRuntime = ReturnType; + +export type WebhookTarget = { + account: ResolvedGoogleChatAccount; + config: OpenClawConfig; + runtime: GoogleChatRuntimeEnv; + core: GoogleChatCoreRuntime; + path: string; + audienceType?: GoogleChatAudienceType; + audience?: string; + statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; + mediaMaxMb: number; +}; diff --git a/extensions/googlechat/src/monitor-webhook.ts b/extensions/googlechat/src/monitor-webhook.ts new file mode 100644 index 00000000000..c2978566198 --- /dev/null +++ b/extensions/googlechat/src/monitor-webhook.ts @@ -0,0 +1,216 @@ +import type { IncomingMessage, ServerResponse } from "node:http"; +import { + beginWebhookRequestPipelineOrReject, + readJsonWebhookBodyOrReject, + resolveWebhookTargetWithAuthOrReject, + resolveWebhookTargets, + type WebhookInFlightLimiter, +} from "openclaw/plugin-sdk"; +import { verifyGoogleChatRequest } from "./auth.js"; +import type { WebhookTarget } from "./monitor-types.js"; +import type { + GoogleChatEvent, + GoogleChatMessage, + GoogleChatSpace, + GoogleChatUser, +} from "./types.js"; + +function extractBearerToken(header: unknown): string { + const authHeader = Array.isArray(header) ? String(header[0] ?? "") : String(header ?? ""); + return authHeader.toLowerCase().startsWith("bearer ") + ? authHeader.slice("bearer ".length).trim() + : ""; +} + +type ParsedGoogleChatInboundPayload = + | { ok: true; event: GoogleChatEvent; addOnBearerToken: string } + | { ok: false }; + +function parseGoogleChatInboundPayload( + raw: unknown, + res: ServerResponse, +): ParsedGoogleChatInboundPayload { + if (!raw || typeof raw !== "object" || Array.isArray(raw)) { + res.statusCode = 400; + res.end("invalid payload"); + return { ok: false }; + } + + let eventPayload = raw; + let addOnBearerToken = ""; + + // Transform Google Workspace Add-on format to standard Chat API format. + const rawObj = raw as { + commonEventObject?: { hostApp?: string }; + chat?: { + messagePayload?: { space?: GoogleChatSpace; message?: GoogleChatMessage }; + user?: GoogleChatUser; + eventTime?: string; + }; + authorizationEventObject?: { systemIdToken?: string }; + }; + + if (rawObj.commonEventObject?.hostApp === "CHAT" && rawObj.chat?.messagePayload) { + const chat = rawObj.chat; + const messagePayload = chat.messagePayload; + eventPayload = { + type: "MESSAGE", + space: messagePayload?.space, + message: messagePayload?.message, + user: chat.user, + eventTime: chat.eventTime, + }; + addOnBearerToken = String(rawObj.authorizationEventObject?.systemIdToken ?? "").trim(); + } + + const event = eventPayload as GoogleChatEvent; + const eventType = event.type ?? (eventPayload as { eventType?: string }).eventType; + if (typeof eventType !== "string") { + res.statusCode = 400; + res.end("invalid payload"); + return { ok: false }; + } + + if (!event.space || typeof event.space !== "object" || Array.isArray(event.space)) { + res.statusCode = 400; + res.end("invalid payload"); + return { ok: false }; + } + + if (eventType === "MESSAGE") { + if (!event.message || typeof event.message !== "object" || Array.isArray(event.message)) { + res.statusCode = 400; + res.end("invalid payload"); + return { ok: false }; + } + } + + return { ok: true, event, addOnBearerToken }; +} + +export function createGoogleChatWebhookRequestHandler(params: { + webhookTargets: Map; + webhookInFlightLimiter: WebhookInFlightLimiter; + processEvent: (event: GoogleChatEvent, target: WebhookTarget) => Promise; +}): (req: IncomingMessage, res: ServerResponse) => Promise { + return async (req: IncomingMessage, res: ServerResponse): Promise => { + const resolved = resolveWebhookTargets(req, params.webhookTargets); + if (!resolved) { + return false; + } + const { path, targets } = resolved; + + const requestLifecycle = beginWebhookRequestPipelineOrReject({ + req, + res, + allowMethods: ["POST"], + requireJsonContentType: true, + inFlightLimiter: params.webhookInFlightLimiter, + inFlightKey: `${path}:${req.socket?.remoteAddress ?? "unknown"}`, + }); + if (!requestLifecycle.ok) { + return true; + } + + try { + const headerBearer = extractBearerToken(req.headers.authorization); + let selectedTarget: WebhookTarget | null = null; + let parsedEvent: GoogleChatEvent | null = null; + + if (headerBearer) { + selectedTarget = await resolveWebhookTargetWithAuthOrReject({ + targets, + res, + isMatch: async (target) => { + const verification = await verifyGoogleChatRequest({ + bearer: headerBearer, + audienceType: target.audienceType, + audience: target.audience, + }); + return verification.ok; + }, + }); + if (!selectedTarget) { + return true; + } + + const body = await readJsonWebhookBodyOrReject({ + req, + res, + profile: "post-auth", + emptyObjectOnEmpty: false, + invalidJsonMessage: "invalid payload", + }); + if (!body.ok) { + return true; + } + + const parsed = parseGoogleChatInboundPayload(body.value, res); + if (!parsed.ok) { + return true; + } + parsedEvent = parsed.event; + } else { + const body = await readJsonWebhookBodyOrReject({ + req, + res, + profile: "pre-auth", + emptyObjectOnEmpty: false, + invalidJsonMessage: "invalid payload", + }); + if (!body.ok) { + return true; + } + + const parsed = parseGoogleChatInboundPayload(body.value, res); + if (!parsed.ok) { + return true; + } + parsedEvent = parsed.event; + + if (!parsed.addOnBearerToken) { + res.statusCode = 401; + res.end("unauthorized"); + return true; + } + + selectedTarget = await resolveWebhookTargetWithAuthOrReject({ + targets, + res, + isMatch: async (target) => { + const verification = await verifyGoogleChatRequest({ + bearer: parsed.addOnBearerToken, + audienceType: target.audienceType, + audience: target.audience, + }); + return verification.ok; + }, + }); + if (!selectedTarget) { + return true; + } + } + + if (!selectedTarget || !parsedEvent) { + res.statusCode = 401; + res.end("unauthorized"); + return true; + } + + const dispatchTarget = selectedTarget; + dispatchTarget.statusSink?.({ lastInboundAt: Date.now() }); + params.processEvent(parsedEvent, dispatchTarget).catch((err) => { + dispatchTarget.runtime.error?.( + `[${dispatchTarget.account.accountId}] Google Chat webhook failed: ${String(err)}`, + ); + }); + + res.statusCode = 200; + res.setHeader("Content-Type", "application/json"); + res.end("{}"); + return true; + } finally { + requestLifecycle.release(); + } + }; +} diff --git a/extensions/googlechat/src/monitor.ts b/extensions/googlechat/src/monitor.ts index 97c9c979398..f0079b5c0f8 100644 --- a/extensions/googlechat/src/monitor.ts +++ b/extensions/googlechat/src/monitor.ts @@ -1,23 +1,11 @@ import type { IncomingMessage, ServerResponse } from "node:http"; import type { OpenClawConfig } from "openclaw/plugin-sdk"; import { - beginWebhookRequestPipelineOrReject, createWebhookInFlightLimiter, - GROUP_POLICY_BLOCKED_LABEL, - createScopedPairingAccess, createReplyPrefixOptions, - readJsonWebhookBodyOrReject, registerWebhookTargetWithPluginRoute, - isDangerousNameMatchingEnabled, - resolveAllowlistProviderRuntimeGroupPolicy, - resolveDefaultGroupPolicy, resolveInboundRouteEnvelopeBuilderWithRuntime, - resolveSingleWebhookTargetAsync, resolveWebhookPath, - resolveWebhookTargets, - warnMissingProviderGroupPolicyFallbackOnce, - resolveMentionGatingWithBypass, - resolveDmGroupAccessWithLists, } from "openclaw/plugin-sdk"; import { type ResolvedGoogleChatAccount } from "./accounts.js"; import { @@ -26,48 +14,29 @@ import { sendGoogleChatMessage, updateGoogleChatMessage, } from "./api.js"; -import { verifyGoogleChatRequest, type GoogleChatAudienceType } from "./auth.js"; -import { getGoogleChatRuntime } from "./runtime.js"; +import { type GoogleChatAudienceType } from "./auth.js"; +import { applyGoogleChatInboundAccessPolicy, isSenderAllowed } from "./monitor-access.js"; import type { - GoogleChatAnnotation, - GoogleChatAttachment, - GoogleChatEvent, - GoogleChatSpace, - GoogleChatMessage, - GoogleChatUser, -} from "./types.js"; - -export type GoogleChatRuntimeEnv = { - log?: (message: string) => void; - error?: (message: string) => void; -}; - -export type GoogleChatMonitorOptions = { - account: ResolvedGoogleChatAccount; - config: OpenClawConfig; - runtime: GoogleChatRuntimeEnv; - abortSignal: AbortSignal; - webhookPath?: string; - webhookUrl?: string; - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; -}; - -type GoogleChatCoreRuntime = ReturnType; - -type WebhookTarget = { - account: ResolvedGoogleChatAccount; - config: OpenClawConfig; - runtime: GoogleChatRuntimeEnv; - core: GoogleChatCoreRuntime; - path: string; - audienceType?: GoogleChatAudienceType; - audience?: string; - statusSink?: (patch: { lastInboundAt?: number; lastOutboundAt?: number }) => void; - mediaMaxMb: number; -}; + GoogleChatCoreRuntime, + GoogleChatMonitorOptions, + GoogleChatRuntimeEnv, + WebhookTarget, +} from "./monitor-types.js"; +import { createGoogleChatWebhookRequestHandler } from "./monitor-webhook.js"; +import { getGoogleChatRuntime } from "./runtime.js"; +import type { GoogleChatAttachment, GoogleChatEvent } from "./types.js"; +export type { GoogleChatMonitorOptions, GoogleChatRuntimeEnv } from "./monitor-types.js"; +export { isSenderAllowed }; const webhookTargets = new Map(); const webhookInFlightLimiter = createWebhookInFlightLimiter(); +const googleChatWebhookRequestHandler = createGoogleChatWebhookRequestHandler({ + webhookTargets, + webhookInFlightLimiter, + processEvent: async (event, target) => { + await processGoogleChatEvent(event, target); + }, +}); function logVerbose(core: GoogleChatCoreRuntime, runtime: GoogleChatRuntimeEnv, message: string) { if (core.logging.shouldLogVerbose()) { @@ -75,31 +44,6 @@ function logVerbose(core: GoogleChatCoreRuntime, runtime: GoogleChatRuntimeEnv, } } -const warnedDeprecatedUsersEmailAllowFrom = new Set(); -function warnDeprecatedUsersEmailEntries( - core: GoogleChatCoreRuntime, - runtime: GoogleChatRuntimeEnv, - entries: string[], -) { - const deprecated = entries.map((v) => String(v).trim()).filter((v) => /^users\/.+@.+/i.test(v)); - if (deprecated.length === 0) { - return; - } - const key = deprecated - .map((v) => v.toLowerCase()) - .sort() - .join(","); - if (warnedDeprecatedUsersEmailAllowFrom.has(key)) { - return; - } - warnedDeprecatedUsersEmailAllowFrom.add(key); - logVerbose( - core, - runtime, - `Deprecated allowFrom entry detected: "users/" is no longer treated as an email allowlist. Use raw email (alice@example.com) or immutable user id (users/). entries=${deprecated.join(", ")}`, - ); -} - export function registerGoogleChatWebhookTarget(target: WebhookTarget): () => void { return registerWebhookTargetWithPluginRoute({ targetsByPath: webhookTargets, @@ -138,211 +82,11 @@ function normalizeAudienceType(value?: string | null): GoogleChatAudienceType | return undefined; } -function extractBearerToken(header: unknown): string { - const authHeader = Array.isArray(header) ? String(header[0] ?? "") : String(header ?? ""); - return authHeader.toLowerCase().startsWith("bearer ") - ? authHeader.slice("bearer ".length).trim() - : ""; -} - -type ParsedGoogleChatInboundPayload = - | { ok: true; event: GoogleChatEvent; addOnBearerToken: string } - | { ok: false }; - -function parseGoogleChatInboundPayload( - raw: unknown, - res: ServerResponse, -): ParsedGoogleChatInboundPayload { - if (!raw || typeof raw !== "object" || Array.isArray(raw)) { - res.statusCode = 400; - res.end("invalid payload"); - return { ok: false }; - } - - let eventPayload = raw; - let addOnBearerToken = ""; - - // Transform Google Workspace Add-on format to standard Chat API format. - const rawObj = raw as { - commonEventObject?: { hostApp?: string }; - chat?: { - messagePayload?: { space?: GoogleChatSpace; message?: GoogleChatMessage }; - user?: GoogleChatUser; - eventTime?: string; - }; - authorizationEventObject?: { systemIdToken?: string }; - }; - - if (rawObj.commonEventObject?.hostApp === "CHAT" && rawObj.chat?.messagePayload) { - const chat = rawObj.chat; - const messagePayload = chat.messagePayload; - eventPayload = { - type: "MESSAGE", - space: messagePayload?.space, - message: messagePayload?.message, - user: chat.user, - eventTime: chat.eventTime, - }; - addOnBearerToken = String(rawObj.authorizationEventObject?.systemIdToken ?? "").trim(); - } - - const event = eventPayload as GoogleChatEvent; - const eventType = event.type ?? (eventPayload as { eventType?: string }).eventType; - if (typeof eventType !== "string") { - res.statusCode = 400; - res.end("invalid payload"); - return { ok: false }; - } - - if (!event.space || typeof event.space !== "object" || Array.isArray(event.space)) { - res.statusCode = 400; - res.end("invalid payload"); - return { ok: false }; - } - - if (eventType === "MESSAGE") { - if (!event.message || typeof event.message !== "object" || Array.isArray(event.message)) { - res.statusCode = 400; - res.end("invalid payload"); - return { ok: false }; - } - } - - return { ok: true, event, addOnBearerToken }; -} - -async function resolveGoogleChatWebhookTargetByBearer( - targets: readonly WebhookTarget[], - bearer: string, -) { - return await resolveSingleWebhookTargetAsync(targets, async (target) => { - const verification = await verifyGoogleChatRequest({ - bearer, - audienceType: target.audienceType, - audience: target.audience, - }); - return verification.ok; - }); -} - export async function handleGoogleChatWebhookRequest( req: IncomingMessage, res: ServerResponse, ): Promise { - const resolved = resolveWebhookTargets(req, webhookTargets); - if (!resolved) { - return false; - } - const { path, targets } = resolved; - - const requestLifecycle = beginWebhookRequestPipelineOrReject({ - req, - res, - allowMethods: ["POST"], - requireJsonContentType: true, - inFlightLimiter: webhookInFlightLimiter, - inFlightKey: `${path}:${req.socket?.remoteAddress ?? "unknown"}`, - }); - if (!requestLifecycle.ok) { - return true; - } - - try { - const headerBearer = extractBearerToken(req.headers.authorization); - let matchedTarget: Awaited> | null = - null; - let parsedEvent: GoogleChatEvent | null = null; - let addOnBearerToken = ""; - - if (headerBearer) { - matchedTarget = await resolveGoogleChatWebhookTargetByBearer(targets, headerBearer); - if (matchedTarget.kind === "none") { - res.statusCode = 401; - res.end("unauthorized"); - return true; - } - if (matchedTarget.kind === "ambiguous") { - res.statusCode = 401; - res.end("ambiguous webhook target"); - return true; - } - - const body = await readJsonWebhookBodyOrReject({ - req, - res, - profile: "post-auth", - emptyObjectOnEmpty: false, - invalidJsonMessage: "invalid payload", - }); - if (!body.ok) { - return true; - } - - const parsed = parseGoogleChatInboundPayload(body.value, res); - if (!parsed.ok) { - return true; - } - parsedEvent = parsed.event; - addOnBearerToken = parsed.addOnBearerToken; - } else { - const body = await readJsonWebhookBodyOrReject({ - req, - res, - profile: "pre-auth", - emptyObjectOnEmpty: false, - invalidJsonMessage: "invalid payload", - }); - if (!body.ok) { - return true; - } - - const parsed = parseGoogleChatInboundPayload(body.value, res); - if (!parsed.ok) { - return true; - } - parsedEvent = parsed.event; - addOnBearerToken = parsed.addOnBearerToken; - - if (!addOnBearerToken) { - res.statusCode = 401; - res.end("unauthorized"); - return true; - } - - matchedTarget = await resolveGoogleChatWebhookTargetByBearer(targets, addOnBearerToken); - if (matchedTarget.kind === "none") { - res.statusCode = 401; - res.end("unauthorized"); - return true; - } - if (matchedTarget.kind === "ambiguous") { - res.statusCode = 401; - res.end("ambiguous webhook target"); - return true; - } - } - - if (!matchedTarget || !parsedEvent) { - res.statusCode = 401; - res.end("unauthorized"); - return true; - } - - const selected = matchedTarget.target; - selected.statusSink?.({ lastInboundAt: Date.now() }); - processGoogleChatEvent(parsedEvent, selected).catch((err) => { - selected.runtime.error?.( - `[${selected.account.accountId}] Google Chat webhook failed: ${String(err)}`, - ); - }); - - res.statusCode = 200; - res.setHeader("Content-Type", "application/json"); - res.end("{}"); - return true; - } finally { - requestLifecycle.release(); - } + return await googleChatWebhookRequestHandler(req, res); } async function processGoogleChatEvent(event: GoogleChatEvent, target: WebhookTarget) { @@ -365,98 +109,6 @@ async function processGoogleChatEvent(event: GoogleChatEvent, target: WebhookTar }); } -function normalizeUserId(raw?: string | null): string { - const trimmed = raw?.trim() ?? ""; - if (!trimmed) { - return ""; - } - return trimmed.replace(/^users\//i, "").toLowerCase(); -} - -function isEmailLike(value: string): boolean { - // Keep this intentionally loose; allowlists are user-provided config. - return value.includes("@"); -} - -export function isSenderAllowed( - senderId: string, - senderEmail: string | undefined, - allowFrom: string[], - allowNameMatching = false, -) { - if (allowFrom.includes("*")) { - return true; - } - const normalizedSenderId = normalizeUserId(senderId); - const normalizedEmail = senderEmail?.trim().toLowerCase() ?? ""; - return allowFrom.some((entry) => { - const normalized = String(entry).trim().toLowerCase(); - if (!normalized) { - return false; - } - - // Accept `googlechat:` but treat `users/...` as an *ID* only (deprecated `users/`). - const withoutPrefix = normalized.replace(/^(googlechat|google-chat|gchat):/i, ""); - if (withoutPrefix.startsWith("users/")) { - return normalizeUserId(withoutPrefix) === normalizedSenderId; - } - - // Raw email allowlist entries are a break-glass override. - if (allowNameMatching && normalizedEmail && isEmailLike(withoutPrefix)) { - return withoutPrefix === normalizedEmail; - } - - return withoutPrefix.replace(/^users\//i, "") === normalizedSenderId; - }); -} - -function resolveGroupConfig(params: { - groupId: string; - groupName?: string | null; - groups?: Record< - string, - { - requireMention?: boolean; - allow?: boolean; - enabled?: boolean; - users?: Array; - systemPrompt?: string; - } - >; -}) { - const { groupId, groupName, groups } = params; - const entries = groups ?? {}; - const keys = Object.keys(entries); - if (keys.length === 0) { - return { entry: undefined, allowlistConfigured: false }; - } - const normalizedName = groupName?.trim().toLowerCase(); - const candidates = [groupId, groupName ?? "", normalizedName ?? ""].filter(Boolean); - let entry = candidates.map((candidate) => entries[candidate]).find(Boolean); - if (!entry && normalizedName) { - entry = entries[normalizedName]; - } - const fallback = entries["*"]; - return { entry: entry ?? fallback, allowlistConfigured: true, fallback }; -} - -function extractMentionInfo(annotations: GoogleChatAnnotation[], botUser?: string | null) { - const mentionAnnotations = annotations.filter((entry) => entry.type === "USER_MENTION"); - const hasAnyMention = mentionAnnotations.length > 0; - const botTargets = new Set(["users/app", botUser?.trim()].filter(Boolean) as string[]); - const wasMentioned = mentionAnnotations.some((entry) => { - const userName = entry.userMention?.user?.name; - if (!userName) { - return false; - } - if (botTargets.has(userName)) { - return true; - } - return normalizeUserId(userName) === "app"; - }); - return { hasAnyMention, wasMentioned }; -} - /** * Resolve bot display name with fallback chain: * 1. Account config name @@ -489,11 +141,6 @@ async function processMessageWithPipeline(params: { mediaMaxMb: number; }): Promise { const { event, account, config, runtime, core, statusSink, mediaMaxMb } = params; - const pairing = createScopedPairingAccess({ - core, - channel: "googlechat", - accountId: account.accountId, - }); const space = event.space; const message = event.message; if (!space || !message) { @@ -510,7 +157,6 @@ async function processMessageWithPipeline(params: { const senderId = sender?.name ?? ""; const senderName = sender?.displayName ?? ""; const senderEmail = sender?.email ?? undefined; - const allowNameMatching = isDangerousNameMatchingEnabled(account.config); const allowBots = account.config.allowBots === true; if (!allowBots) { @@ -532,202 +178,24 @@ async function processMessageWithPipeline(params: { return; } - const defaultGroupPolicy = resolveDefaultGroupPolicy(config); - const { groupPolicy, providerMissingFallbackApplied } = - resolveAllowlistProviderRuntimeGroupPolicy({ - providerConfigPresent: config.channels?.googlechat !== undefined, - groupPolicy: account.config.groupPolicy, - defaultGroupPolicy, - }); - warnMissingProviderGroupPolicyFallbackOnce({ - providerMissingFallbackApplied, - providerKey: "googlechat", - accountId: account.accountId, - blockedLabel: GROUP_POLICY_BLOCKED_LABEL.space, - log: (message) => logVerbose(core, runtime, message), - }); - const groupConfigResolved = resolveGroupConfig({ - groupId: spaceId, - groupName: space.displayName ?? null, - groups: account.config.groups ?? undefined, - }); - const groupEntry = groupConfigResolved.entry; - const groupUsers = groupEntry?.users ?? account.config.groupAllowFrom ?? []; - let effectiveWasMentioned: boolean | undefined; - - if (isGroup) { - if (groupPolicy === "disabled") { - logVerbose(core, runtime, `drop group message (groupPolicy=disabled, space=${spaceId})`); - return; - } - const groupAllowlistConfigured = groupConfigResolved.allowlistConfigured; - const groupAllowed = Boolean(groupEntry) || Boolean((account.config.groups ?? {})["*"]); - if (groupPolicy === "allowlist") { - if (!groupAllowlistConfigured) { - logVerbose( - core, - runtime, - `drop group message (groupPolicy=allowlist, no allowlist, space=${spaceId})`, - ); - return; - } - if (!groupAllowed) { - logVerbose(core, runtime, `drop group message (not allowlisted, space=${spaceId})`); - return; - } - } - if (groupEntry?.enabled === false || groupEntry?.allow === false) { - logVerbose(core, runtime, `drop group message (space disabled, space=${spaceId})`); - return; - } - - if (groupUsers.length > 0) { - warnDeprecatedUsersEmailEntries( - core, - runtime, - groupUsers.map((v) => String(v)), - ); - const ok = isSenderAllowed( - senderId, - senderEmail, - groupUsers.map((v) => String(v)), - allowNameMatching, - ); - if (!ok) { - logVerbose(core, runtime, `drop group message (sender not allowed, ${senderId})`); - return; - } - } - } - - const dmPolicy = account.config.dm?.policy ?? "pairing"; - const configAllowFrom = (account.config.dm?.allowFrom ?? []).map((v) => String(v)); - const normalizedGroupUsers = groupUsers.map((v) => String(v)); - const senderGroupPolicy = - groupPolicy === "disabled" - ? "disabled" - : normalizedGroupUsers.length > 0 - ? "allowlist" - : "open"; - const shouldComputeAuth = core.channel.commands.shouldComputeCommandAuthorized(rawBody, config); - const storeAllowFrom = - !isGroup && dmPolicy !== "allowlist" && (dmPolicy !== "open" || shouldComputeAuth) - ? await pairing.readAllowFromStore().catch(() => []) - : []; - const access = resolveDmGroupAccessWithLists({ + const access = await applyGoogleChatInboundAccessPolicy({ + account, + config, + core, + space, + message, isGroup, - dmPolicy, - groupPolicy: senderGroupPolicy, - allowFrom: configAllowFrom, - groupAllowFrom: normalizedGroupUsers, - storeAllowFrom, - groupAllowFromFallbackToAllowFrom: false, - isSenderAllowed: (allowFrom) => - isSenderAllowed(senderId, senderEmail, allowFrom, allowNameMatching), - }); - const effectiveAllowFrom = access.effectiveAllowFrom; - const effectiveGroupAllowFrom = access.effectiveGroupAllowFrom; - warnDeprecatedUsersEmailEntries(core, runtime, effectiveAllowFrom); - const commandAllowFrom = isGroup ? effectiveGroupAllowFrom : effectiveAllowFrom; - const useAccessGroups = config.commands?.useAccessGroups !== false; - const senderAllowedForCommands = isSenderAllowed( senderId, + senderName, senderEmail, - commandAllowFrom, - allowNameMatching, - ); - const commandAuthorized = shouldComputeAuth - ? core.channel.commands.resolveCommandAuthorizedFromAuthorizers({ - useAccessGroups, - authorizers: [ - { configured: commandAllowFrom.length > 0, allowed: senderAllowedForCommands }, - ], - }) - : undefined; - - if (isGroup) { - const requireMention = groupEntry?.requireMention ?? account.config.requireMention ?? true; - const annotations = message.annotations ?? []; - const mentionInfo = extractMentionInfo(annotations, account.config.botUser); - const allowTextCommands = core.channel.commands.shouldHandleTextCommands({ - cfg: config, - surface: "googlechat", - }); - const mentionGate = resolveMentionGatingWithBypass({ - isGroup: true, - requireMention, - canDetectMention: true, - wasMentioned: mentionInfo.wasMentioned, - implicitMention: false, - hasAnyMention: mentionInfo.hasAnyMention, - allowTextCommands, - hasControlCommand: core.channel.text.hasControlCommand(rawBody, config), - commandAuthorized: commandAuthorized === true, - }); - effectiveWasMentioned = mentionGate.effectiveWasMentioned; - if (mentionGate.shouldSkip) { - logVerbose(core, runtime, `drop group message (mention required, space=${spaceId})`); - return; - } - } - - if (isGroup && access.decision !== "allow") { - logVerbose( - core, - runtime, - `drop group message (sender policy blocked, reason=${access.reason}, space=${spaceId})`, - ); - return; - } - - if (!isGroup) { - if (account.config.dm?.enabled === false) { - logVerbose(core, runtime, `Blocked Google Chat DM from ${senderId} (dmPolicy=disabled)`); - return; - } - - if (access.decision !== "allow") { - if (access.decision === "pairing") { - const { code, created } = await pairing.upsertPairingRequest({ - id: senderId, - meta: { name: senderName || undefined, email: senderEmail }, - }); - if (created) { - logVerbose(core, runtime, `googlechat pairing request sender=${senderId}`); - try { - await sendGoogleChatMessage({ - account, - space: spaceId, - text: core.channel.pairing.buildPairingReply({ - channel: "googlechat", - idLine: `Your Google Chat user id: ${senderId}`, - code, - }), - }); - statusSink?.({ lastOutboundAt: Date.now() }); - } catch (err) { - logVerbose(core, runtime, `pairing reply failed for ${senderId}: ${String(err)}`); - } - } - } else { - logVerbose( - core, - runtime, - `Blocked unauthorized Google Chat sender ${senderId} (dmPolicy=${dmPolicy})`, - ); - } - return; - } - } - - if ( - isGroup && - core.channel.commands.isControlCommandMessage(rawBody, config) && - commandAuthorized !== true - ) { - logVerbose(core, runtime, `googlechat: drop control command from ${senderId}`); + rawBody, + statusSink, + logVerbose: (message) => logVerbose(core, runtime, message), + }); + if (!access.ok) { return; } + const { commandAuthorized, effectiveWasMentioned, groupSystemPrompt } = access; const { route, buildEnvelope } = resolveInboundRouteEnvelopeBuilderWithRuntime({ cfg: config, @@ -762,8 +230,6 @@ async function processMessageWithPipeline(params: { body: rawBody, }); - const groupSystemPrompt = groupConfigResolved.entry?.systemPrompt?.trim() || undefined; - const ctxPayload = core.channel.reply.finalizeInboundContext({ Body: body, BodyForAgent: rawBody, diff --git a/scripts/check-webhook-auth-body-order.mjs b/scripts/check-webhook-auth-body-order.mjs index 282909db4b8..aa771cb8e13 100644 --- a/scripts/check-webhook-auth-body-order.mjs +++ b/scripts/check-webhook-auth-body-order.mjs @@ -9,6 +9,7 @@ const sourceRoots = ["extensions"]; const enforcedFiles = new Set([ "extensions/bluebubbles/src/monitor.ts", "extensions/googlechat/src/monitor.ts", + "extensions/zalo/src/monitor.webhook.ts", ]); const blockedCallees = new Set(["readJsonBodyWithLimit", "readRequestBodyWithLimit"]); diff --git a/src/plugin-sdk/index.ts b/src/plugin-sdk/index.ts index 6d77dce043a..81c64f69ab2 100644 --- a/src/plugin-sdk/index.ts +++ b/src/plugin-sdk/index.ts @@ -125,6 +125,8 @@ export { registerWebhookTarget, registerWebhookTargetWithPluginRoute, rejectNonPostWebhookRequest, + resolveWebhookTargetWithAuthOrReject, + resolveWebhookTargetWithAuthOrRejectSync, resolveSingleWebhookTarget, resolveSingleWebhookTargetAsync, resolveWebhookTargets, diff --git a/src/plugin-sdk/webhook-request-guards.ts b/src/plugin-sdk/webhook-request-guards.ts index ce447212ff7..a45df7c06dd 100644 --- a/src/plugin-sdk/webhook-request-guards.ts +++ b/src/plugin-sdk/webhook-request-guards.ts @@ -55,6 +55,32 @@ function resolveWebhookBodyReadLimits(params: { return { maxBytes, timeoutMs }; } +function respondWebhookBodyReadError(params: { + res: ServerResponse; + code: string; + invalidMessage?: string; +}): { ok: false } { + const { res, code, invalidMessage } = params; + if (code === "PAYLOAD_TOO_LARGE") { + res.statusCode = 413; + res.end(requestBodyErrorToText("PAYLOAD_TOO_LARGE")); + return { ok: false }; + } + if (code === "REQUEST_BODY_TIMEOUT") { + res.statusCode = 408; + res.end(requestBodyErrorToText("REQUEST_BODY_TIMEOUT")); + return { ok: false }; + } + if (code === "CONNECTION_CLOSED") { + res.statusCode = 400; + res.end(requestBodyErrorToText("CONNECTION_CLOSED")); + return { ok: false }; + } + res.statusCode = 400; + res.end(invalidMessage ?? "Bad Request"); + return { ok: false }; +} + export function createWebhookInFlightLimiter(options?: { maxInFlightPerKey?: number; maxTrackedKeys?: number; @@ -219,20 +245,18 @@ export async function readWebhookBodyOrReject(params: { return { ok: true, value: raw }; } catch (error) { if (isRequestBodyLimitError(error)) { - params.res.statusCode = - error.code === "PAYLOAD_TOO_LARGE" - ? 413 - : error.code === "REQUEST_BODY_TIMEOUT" - ? 408 - : 400; - params.res.end(requestBodyErrorToText(error.code)); - return { ok: false }; + return respondWebhookBodyReadError({ + res: params.res, + code: error.code, + invalidMessage: params.invalidBodyMessage, + }); } - params.res.statusCode = 400; - params.res.end( - params.invalidBodyMessage ?? (error instanceof Error ? error.message : String(error)), - ); - return { ok: false }; + return respondWebhookBodyReadError({ + res: params.res, + code: "INVALID_BODY", + invalidMessage: + params.invalidBodyMessage ?? (error instanceof Error ? error.message : String(error)), + }); } } @@ -258,15 +282,9 @@ export async function readJsonWebhookBodyOrReject(params: { if (body.ok) { return { ok: true, value: body.value }; } - - params.res.statusCode = - body.code === "PAYLOAD_TOO_LARGE" ? 413 : body.code === "REQUEST_BODY_TIMEOUT" ? 408 : 400; - const message = - body.code === "PAYLOAD_TOO_LARGE" - ? requestBodyErrorToText("PAYLOAD_TOO_LARGE") - : body.code === "REQUEST_BODY_TIMEOUT" - ? requestBodyErrorToText("REQUEST_BODY_TIMEOUT") - : (params.invalidJsonMessage ?? "Bad Request"); - params.res.end(message); - return { ok: false }; + return respondWebhookBodyReadError({ + res: params.res, + code: body.code, + invalidMessage: params.invalidJsonMessage, + }); } diff --git a/src/plugin-sdk/webhook-targets.test.ts b/src/plugin-sdk/webhook-targets.test.ts index d18cb6b22e6..4f428f5b477 100644 --- a/src/plugin-sdk/webhook-targets.test.ts +++ b/src/plugin-sdk/webhook-targets.test.ts @@ -9,6 +9,8 @@ import { rejectNonPostWebhookRequest, resolveSingleWebhookTarget, resolveSingleWebhookTargetAsync, + resolveWebhookTargetWithAuthOrReject, + resolveWebhookTargetWithAuthOrRejectSync, resolveWebhookTargets, } from "./webhook-targets.js"; @@ -212,3 +214,72 @@ describe("resolveSingleWebhookTarget", () => { expect(calls).toEqual(["a", "b"]); }); }); + +describe("resolveWebhookTargetWithAuthOrReject", () => { + it("returns matched target", async () => { + const res = { + statusCode: 200, + setHeader: vi.fn(), + end: vi.fn(), + } as unknown as ServerResponse; + await expect( + resolveWebhookTargetWithAuthOrReject({ + targets: [{ id: "a" }, { id: "b" }], + res, + isMatch: (target) => target.id === "b", + }), + ).resolves.toEqual({ id: "b" }); + }); + + it("writes unauthorized response on no match", async () => { + const endMock = vi.fn(); + const res = { + statusCode: 200, + setHeader: vi.fn(), + end: endMock, + } as unknown as ServerResponse; + await expect( + resolveWebhookTargetWithAuthOrReject({ + targets: [{ id: "a" }], + res, + isMatch: () => false, + }), + ).resolves.toBeNull(); + expect(res.statusCode).toBe(401); + expect(endMock).toHaveBeenCalledWith("unauthorized"); + }); + + it("writes ambiguous response on multi-match", async () => { + const endMock = vi.fn(); + const res = { + statusCode: 200, + setHeader: vi.fn(), + end: endMock, + } as unknown as ServerResponse; + await expect( + resolveWebhookTargetWithAuthOrReject({ + targets: [{ id: "a" }, { id: "b" }], + res, + isMatch: () => true, + }), + ).resolves.toBeNull(); + expect(res.statusCode).toBe(401); + expect(endMock).toHaveBeenCalledWith("ambiguous webhook target"); + }); +}); + +describe("resolveWebhookTargetWithAuthOrRejectSync", () => { + it("returns matched target synchronously", () => { + const res = { + statusCode: 200, + setHeader: vi.fn(), + end: vi.fn(), + } as unknown as ServerResponse; + const target = resolveWebhookTargetWithAuthOrRejectSync({ + targets: [{ id: "a" }, { id: "b" }], + res, + isMatch: (entry) => entry.id === "a", + }); + expect(target).toEqual({ id: "a" }); + }); +}); diff --git a/src/plugin-sdk/webhook-targets.ts b/src/plugin-sdk/webhook-targets.ts index 5d5f4200d23..de90c398667 100644 --- a/src/plugin-sdk/webhook-targets.ts +++ b/src/plugin-sdk/webhook-targets.ts @@ -152,6 +152,57 @@ export async function resolveSingleWebhookTargetAsync( return { kind: "single", target: matched }; } +export async function resolveWebhookTargetWithAuthOrReject(params: { + targets: readonly T[]; + res: ServerResponse; + isMatch: (target: T) => boolean | Promise; + unauthorizedStatusCode?: number; + unauthorizedMessage?: string; + ambiguousStatusCode?: number; + ambiguousMessage?: string; +}): Promise { + const match = await resolveSingleWebhookTargetAsync(params.targets, async (target) => + Boolean(await params.isMatch(target)), + ); + return resolveWebhookTargetMatchOrReject(params, match); +} + +export function resolveWebhookTargetWithAuthOrRejectSync(params: { + targets: readonly T[]; + res: ServerResponse; + isMatch: (target: T) => boolean; + unauthorizedStatusCode?: number; + unauthorizedMessage?: string; + ambiguousStatusCode?: number; + ambiguousMessage?: string; +}): T | null { + const match = resolveSingleWebhookTarget(params.targets, params.isMatch); + return resolveWebhookTargetMatchOrReject(params, match); +} + +function resolveWebhookTargetMatchOrReject( + params: { + res: ServerResponse; + unauthorizedStatusCode?: number; + unauthorizedMessage?: string; + ambiguousStatusCode?: number; + ambiguousMessage?: string; + }, + match: WebhookTargetMatchResult, +): T | null { + if (match.kind === "single") { + return match.target; + } + if (match.kind === "ambiguous") { + params.res.statusCode = params.ambiguousStatusCode ?? 401; + params.res.end(params.ambiguousMessage ?? "ambiguous webhook target"); + return null; + } + params.res.statusCode = params.unauthorizedStatusCode ?? 401; + params.res.end(params.unauthorizedMessage ?? "unauthorized"); + return null; +} + export function rejectNonPostWebhookRequest(req: IncomingMessage, res: ServerResponse): boolean { if (req.method === "POST") { return false;