refactor: split webhook ingress and policy guards

This commit is contained in:
Peter Steinberger
2026-03-02 18:02:10 +00:00
parent fc0d374390
commit 1c9deeda97
11 changed files with 1026 additions and 819 deletions

View File

@@ -0,0 +1,205 @@
import type { OpenClawConfig } from "openclaw/plugin-sdk";
import type { NormalizedWebhookMessage } from "./monitor-normalize.js";
import type { BlueBubblesCoreRuntime, WebhookTarget } from "./monitor-shared.js";
/**
* Entry type for debouncing inbound messages.
* Captures the normalized message and its target for later combined processing.
*/
type BlueBubblesDebounceEntry = {
message: NormalizedWebhookMessage;
target: WebhookTarget;
};
export type BlueBubblesDebouncer = {
enqueue: (item: BlueBubblesDebounceEntry) => Promise<void>;
flushKey: (key: string) => Promise<void>;
};
export type BlueBubblesDebounceRegistry = {
getOrCreateDebouncer: (target: WebhookTarget) => BlueBubblesDebouncer;
removeDebouncer: (target: WebhookTarget) => void;
};
/**
* Default debounce window for inbound message coalescing (ms).
* This helps combine URL text + link preview balloon messages that BlueBubbles
* sends as separate webhook events when no explicit inbound debounce config exists.
*/
const DEFAULT_INBOUND_DEBOUNCE_MS = 500;
/**
* Combines multiple debounced messages into a single message for processing.
* Used when multiple webhook events arrive within the debounce window.
*/
function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): NormalizedWebhookMessage {
if (entries.length === 0) {
throw new Error("Cannot combine empty entries");
}
if (entries.length === 1) {
return entries[0].message;
}
// Use the first message as the base (typically the text message)
const first = entries[0].message;
// Combine text from all entries, filtering out duplicates and empty strings
const seenTexts = new Set<string>();
const textParts: string[] = [];
for (const entry of entries) {
const text = entry.message.text.trim();
if (!text) {
continue;
}
// Skip duplicate text (URL might be in both text message and balloon)
const normalizedText = text.toLowerCase();
if (seenTexts.has(normalizedText)) {
continue;
}
seenTexts.add(normalizedText);
textParts.push(text);
}
// Merge attachments from all entries
const allAttachments = entries.flatMap((e) => e.message.attachments ?? []);
// Use the latest timestamp
const timestamps = entries
.map((e) => e.message.timestamp)
.filter((t): t is number => typeof t === "number");
const latestTimestamp = timestamps.length > 0 ? Math.max(...timestamps) : first.timestamp;
// Collect all message IDs for reference
const messageIds = entries
.map((e) => e.message.messageId)
.filter((id): id is string => Boolean(id));
// Prefer reply context from any entry that has it
const entryWithReply = entries.find((e) => e.message.replyToId);
return {
...first,
text: textParts.join(" "),
attachments: allAttachments.length > 0 ? allAttachments : first.attachments,
timestamp: latestTimestamp,
// Use first message's ID as primary (for reply reference), but we've coalesced others
messageId: messageIds[0] ?? first.messageId,
// Preserve reply context if present
replyToId: entryWithReply?.message.replyToId ?? first.replyToId,
replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody,
replyToSender: entryWithReply?.message.replyToSender ?? first.replyToSender,
// Clear balloonBundleId since we've combined (the combined message is no longer just a balloon)
balloonBundleId: undefined,
};
}
function resolveBlueBubblesDebounceMs(
config: OpenClawConfig,
core: BlueBubblesCoreRuntime,
): number {
const inbound = config.messages?.inbound;
const hasExplicitDebounce =
typeof inbound?.debounceMs === "number" || typeof inbound?.byChannel?.bluebubbles === "number";
if (!hasExplicitDebounce) {
return DEFAULT_INBOUND_DEBOUNCE_MS;
}
return core.channel.debounce.resolveInboundDebounceMs({ cfg: config, channel: "bluebubbles" });
}
export function createBlueBubblesDebounceRegistry(params: {
processMessage: (message: NormalizedWebhookMessage, target: WebhookTarget) => Promise<void>;
}): BlueBubblesDebounceRegistry {
const targetDebouncers = new Map<WebhookTarget, BlueBubblesDebouncer>();
return {
getOrCreateDebouncer: (target) => {
const existing = targetDebouncers.get(target);
if (existing) {
return existing;
}
const { account, config, runtime, core } = target;
const debouncer = core.channel.debounce.createInboundDebouncer<BlueBubblesDebounceEntry>({
debounceMs: resolveBlueBubblesDebounceMs(config, core),
buildKey: (entry) => {
const msg = entry.message;
// Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the
// same message (e.g., text-only then text+attachment).
//
// For balloons (URL previews, stickers, etc), BlueBubbles often uses a different
// messageId than the originating text. When present, key by associatedMessageGuid
// to keep text + balloon coalescing working.
const balloonBundleId = msg.balloonBundleId?.trim();
const associatedMessageGuid = msg.associatedMessageGuid?.trim();
if (balloonBundleId && associatedMessageGuid) {
return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`;
}
const messageId = msg.messageId?.trim();
if (messageId) {
return `bluebubbles:${account.accountId}:msg:${messageId}`;
}
const chatKey =
msg.chatGuid?.trim() ??
msg.chatIdentifier?.trim() ??
(msg.chatId ? String(msg.chatId) : "dm");
return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`;
},
shouldDebounce: (entry) => {
const msg = entry.message;
// Skip debouncing for from-me messages (they're just cached, not processed)
if (msg.fromMe) {
return false;
}
// Skip debouncing for control commands - process immediately
if (core.channel.text.hasControlCommand(msg.text, config)) {
return false;
}
// Debounce all other messages to coalesce rapid-fire webhook events
// (e.g., text+image arriving as separate webhooks for the same messageId)
return true;
},
onFlush: async (entries) => {
if (entries.length === 0) {
return;
}
// Use target from first entry (all entries have same target due to key structure)
const flushTarget = entries[0].target;
if (entries.length === 1) {
// Single message - process normally
await params.processMessage(entries[0].message, flushTarget);
return;
}
// Multiple messages - combine and process
const combined = combineDebounceEntries(entries);
if (core.logging.shouldLogVerbose()) {
const count = entries.length;
const preview = combined.text.slice(0, 50);
runtime.log?.(
`[bluebubbles] coalesced ${count} messages: "${preview}${combined.text.length > 50 ? "..." : ""}"`,
);
}
await params.processMessage(combined, flushTarget);
},
onError: (err) => {
runtime.error?.(
`[${account.accountId}] [bluebubbles] debounce flush failed: ${String(err)}`,
);
},
});
targetDebouncers.set(target, debouncer);
return debouncer;
},
removeDebouncer: (target) => {
targetDebouncers.delete(target);
},
};
}

View File

@@ -1,19 +1,15 @@
import { timingSafeEqual } from "node:crypto";
import type { IncomingMessage, ServerResponse } from "node:http";
import type { OpenClawConfig } from "openclaw/plugin-sdk";
import {
beginWebhookRequestPipelineOrReject,
createWebhookInFlightLimiter,
registerWebhookTargetWithPluginRoute,
readWebhookBodyOrReject,
resolveSingleWebhookTarget,
resolveWebhookTargetWithAuthOrRejectSync,
resolveWebhookTargets,
} from "openclaw/plugin-sdk";
import {
normalizeWebhookMessage,
normalizeWebhookReaction,
type NormalizedWebhookMessage,
} from "./monitor-normalize.js";
import { createBlueBubblesDebounceRegistry } from "./monitor-debounce.js";
import { normalizeWebhookMessage, normalizeWebhookReaction } from "./monitor-normalize.js";
import { logVerbose, processMessage, processReaction } from "./monitor-processing.js";
import {
_resetBlueBubblesShortIdState,
@@ -23,216 +19,15 @@ import {
DEFAULT_WEBHOOK_PATH,
normalizeWebhookPath,
resolveWebhookPathFromConfig,
type BlueBubblesCoreRuntime,
type BlueBubblesMonitorOptions,
type WebhookTarget,
} from "./monitor-shared.js";
import { fetchBlueBubblesServerInfo } from "./probe.js";
import { getBlueBubblesRuntime } from "./runtime.js";
/**
* Entry type for debouncing inbound messages.
* Captures the normalized message and its target for later combined processing.
*/
type BlueBubblesDebounceEntry = {
message: NormalizedWebhookMessage;
target: WebhookTarget;
};
/**
* Default debounce window for inbound message coalescing (ms).
* This helps combine URL text + link preview balloon messages that BlueBubbles
* sends as separate webhook events when no explicit inbound debounce config exists.
*/
const DEFAULT_INBOUND_DEBOUNCE_MS = 500;
/**
* Combines multiple debounced messages into a single message for processing.
* Used when multiple webhook events arrive within the debounce window.
*/
function combineDebounceEntries(entries: BlueBubblesDebounceEntry[]): NormalizedWebhookMessage {
if (entries.length === 0) {
throw new Error("Cannot combine empty entries");
}
if (entries.length === 1) {
return entries[0].message;
}
// Use the first message as the base (typically the text message)
const first = entries[0].message;
// Combine text from all entries, filtering out duplicates and empty strings
const seenTexts = new Set<string>();
const textParts: string[] = [];
for (const entry of entries) {
const text = entry.message.text.trim();
if (!text) {
continue;
}
// Skip duplicate text (URL might be in both text message and balloon)
const normalizedText = text.toLowerCase();
if (seenTexts.has(normalizedText)) {
continue;
}
seenTexts.add(normalizedText);
textParts.push(text);
}
// Merge attachments from all entries
const allAttachments = entries.flatMap((e) => e.message.attachments ?? []);
// Use the latest timestamp
const timestamps = entries
.map((e) => e.message.timestamp)
.filter((t): t is number => typeof t === "number");
const latestTimestamp = timestamps.length > 0 ? Math.max(...timestamps) : first.timestamp;
// Collect all message IDs for reference
const messageIds = entries
.map((e) => e.message.messageId)
.filter((id): id is string => Boolean(id));
// Prefer reply context from any entry that has it
const entryWithReply = entries.find((e) => e.message.replyToId);
return {
...first,
text: textParts.join(" "),
attachments: allAttachments.length > 0 ? allAttachments : first.attachments,
timestamp: latestTimestamp,
// Use first message's ID as primary (for reply reference), but we've coalesced others
messageId: messageIds[0] ?? first.messageId,
// Preserve reply context if present
replyToId: entryWithReply?.message.replyToId ?? first.replyToId,
replyToBody: entryWithReply?.message.replyToBody ?? first.replyToBody,
replyToSender: entryWithReply?.message.replyToSender ?? first.replyToSender,
// Clear balloonBundleId since we've combined (the combined message is no longer just a balloon)
balloonBundleId: undefined,
};
}
const webhookTargets = new Map<string, WebhookTarget[]>();
const webhookInFlightLimiter = createWebhookInFlightLimiter();
type BlueBubblesDebouncer = {
enqueue: (item: BlueBubblesDebounceEntry) => Promise<void>;
flushKey: (key: string) => Promise<void>;
};
/**
* Maps webhook targets to their inbound debouncers.
* Each target gets its own debouncer keyed by a unique identifier.
*/
const targetDebouncers = new Map<WebhookTarget, BlueBubblesDebouncer>();
function resolveBlueBubblesDebounceMs(
config: OpenClawConfig,
core: BlueBubblesCoreRuntime,
): number {
const inbound = config.messages?.inbound;
const hasExplicitDebounce =
typeof inbound?.debounceMs === "number" || typeof inbound?.byChannel?.bluebubbles === "number";
if (!hasExplicitDebounce) {
return DEFAULT_INBOUND_DEBOUNCE_MS;
}
return core.channel.debounce.resolveInboundDebounceMs({ cfg: config, channel: "bluebubbles" });
}
/**
* Creates or retrieves a debouncer for a webhook target.
*/
function getOrCreateDebouncer(target: WebhookTarget) {
const existing = targetDebouncers.get(target);
if (existing) {
return existing;
}
const { account, config, runtime, core } = target;
const debouncer = core.channel.debounce.createInboundDebouncer<BlueBubblesDebounceEntry>({
debounceMs: resolveBlueBubblesDebounceMs(config, core),
buildKey: (entry) => {
const msg = entry.message;
// Prefer stable, shared identifiers to coalesce rapid-fire webhook events for the
// same message (e.g., text-only then text+attachment).
//
// For balloons (URL previews, stickers, etc), BlueBubbles often uses a different
// messageId than the originating text. When present, key by associatedMessageGuid
// to keep text + balloon coalescing working.
const balloonBundleId = msg.balloonBundleId?.trim();
const associatedMessageGuid = msg.associatedMessageGuid?.trim();
if (balloonBundleId && associatedMessageGuid) {
return `bluebubbles:${account.accountId}:balloon:${associatedMessageGuid}`;
}
const messageId = msg.messageId?.trim();
if (messageId) {
return `bluebubbles:${account.accountId}:msg:${messageId}`;
}
const chatKey =
msg.chatGuid?.trim() ??
msg.chatIdentifier?.trim() ??
(msg.chatId ? String(msg.chatId) : "dm");
return `bluebubbles:${account.accountId}:${chatKey}:${msg.senderId}`;
},
shouldDebounce: (entry) => {
const msg = entry.message;
// Skip debouncing for from-me messages (they're just cached, not processed)
if (msg.fromMe) {
return false;
}
// Skip debouncing for control commands - process immediately
if (core.channel.text.hasControlCommand(msg.text, config)) {
return false;
}
// Debounce all other messages to coalesce rapid-fire webhook events
// (e.g., text+image arriving as separate webhooks for the same messageId)
return true;
},
onFlush: async (entries) => {
if (entries.length === 0) {
return;
}
// Use target from first entry (all entries have same target due to key structure)
const flushTarget = entries[0].target;
if (entries.length === 1) {
// Single message - process normally
await processMessage(entries[0].message, flushTarget);
return;
}
// Multiple messages - combine and process
const combined = combineDebounceEntries(entries);
if (core.logging.shouldLogVerbose()) {
const count = entries.length;
const preview = combined.text.slice(0, 50);
runtime.log?.(
`[bluebubbles] coalesced ${count} messages: "${preview}${combined.text.length > 50 ? "..." : ""}"`,
);
}
await processMessage(combined, flushTarget);
},
onError: (err) => {
runtime.error?.(`[${account.accountId}] [bluebubbles] debounce flush failed: ${String(err)}`);
},
});
targetDebouncers.set(target, debouncer);
return debouncer;
}
/**
* Removes a debouncer for a target (called during unregistration).
*/
function removeDebouncer(target: WebhookTarget): void {
targetDebouncers.delete(target);
}
const debounceRegistry = createBlueBubblesDebounceRegistry({ processMessage });
export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => void {
const registered = registerWebhookTargetWithPluginRoute({
@@ -258,7 +53,7 @@ export function registerBlueBubblesWebhookTarget(target: WebhookTarget): () => v
return () => {
registered.unregister();
// Clean up debouncer when target is unregistered
removeDebouncer(registered.target);
debounceRegistry.removeDebouncer(registered.target);
};
}
@@ -352,28 +147,20 @@ export async function handleBlueBubblesWebhookRequest(
req.headers["x-bluebubbles-guid"] ??
req.headers["authorization"];
const guid = (Array.isArray(headerToken) ? headerToken[0] : headerToken) ?? guidParam ?? "";
const matchedTarget = resolveSingleWebhookTarget(targets, (target) => {
const token = target.account.config.password?.trim() ?? "";
return safeEqualSecret(guid, token);
const target = resolveWebhookTargetWithAuthOrRejectSync({
targets,
res,
isMatch: (target) => {
const token = target.account.config.password?.trim() ?? "";
return safeEqualSecret(guid, token);
},
});
if (matchedTarget.kind === "none") {
res.statusCode = 401;
res.end("unauthorized");
if (!target) {
console.warn(
`[bluebubbles] webhook rejected: unauthorized guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`,
`[bluebubbles] webhook rejected: status=${res.statusCode} path=${path} guid=${maskSecret(url.searchParams.get("guid") ?? url.searchParams.get("password") ?? "")}`,
);
return true;
}
if (matchedTarget.kind === "ambiguous") {
res.statusCode = 401;
res.end("ambiguous webhook target");
console.warn(`[bluebubbles] webhook rejected: ambiguous target match path=${path}`);
return true;
}
const target = matchedTarget.target;
const body = await readWebhookBodyOrReject({
req,
res,
@@ -454,7 +241,7 @@ export async function handleBlueBubblesWebhookRequest(
} else if (message) {
// Route messages through debouncer to coalesce rapid-fire events
// (e.g., text message + URL balloon arriving as separate webhooks)
const debouncer = getOrCreateDebouncer(target);
const debouncer = debounceRegistry.getOrCreateDebouncer(target);
debouncer.enqueue({ message, target }).catch((err) => {
target.runtime.error?.(
`[${target.account.accountId}] BlueBubbles webhook failed: ${String(err)}`,