Files
openclaw/src/signal/monitor/event-handler.ts
zerone0x 1d46ca3a95 fix(signal): enforce mention gating for group messages (#13124)
* fix(signal): enforce mention gating for group messages

Signal group messages bypassed mention gating, causing the bot to reply
even when requireMention was enabled and the message did not mention
the bot. This aligns Signal with Slack, Discord, Telegram, and iMessage
which all enforce mention gating correctly.

Fixes #13106

Co-Authored-By: Claude <noreply@anthropic.com>

* fix(signal): keep pending history context for mention-gated skips (#13124) (thanks @zerone0x)

---------

Co-authored-by: Yansu <no-reply@yansu.ai>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
2026-02-09 23:19:07 -06:00

659 lines
22 KiB
TypeScript

import type { SignalEventHandlerDeps, SignalReceivePayload } from "./event-handler.types.js";
import { resolveHumanDelayConfig } from "../../agents/identity.js";
import { hasControlCommand } from "../../auto-reply/command-detection.js";
import { dispatchInboundMessage } from "../../auto-reply/dispatch.js";
import {
formatInboundEnvelope,
formatInboundFromLabel,
resolveEnvelopeFormatOptions,
} from "../../auto-reply/envelope.js";
import {
createInboundDebouncer,
resolveInboundDebounceMs,
} from "../../auto-reply/inbound-debounce.js";
import {
buildPendingHistoryContextFromMap,
clearHistoryEntriesIfEnabled,
recordPendingHistoryEntryIfEnabled,
} from "../../auto-reply/reply/history.js";
import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js";
import { buildMentionRegexes, matchesMentionPatterns } from "../../auto-reply/reply/mentions.js";
import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js";
import { resolveControlCommandGate } from "../../channels/command-gating.js";
import { logInboundDrop, logTypingFailure } from "../../channels/logging.js";
import { resolveMentionGatingWithBypass } from "../../channels/mention-gating.js";
import { createReplyPrefixOptions } from "../../channels/reply-prefix.js";
import { recordInboundSession } from "../../channels/session.js";
import { createTypingCallbacks } from "../../channels/typing.js";
import { resolveChannelGroupRequireMention } from "../../config/group-policy.js";
import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js";
import { danger, logVerbose, shouldLogVerbose } from "../../globals.js";
import { enqueueSystemEvent } from "../../infra/system-events.js";
import { mediaKindFromMime } from "../../media/constants.js";
import { buildPairingReply } from "../../pairing/pairing-messages.js";
import {
readChannelAllowFromStore,
upsertChannelPairingRequest,
} from "../../pairing/pairing-store.js";
import { resolveAgentRoute } from "../../routing/resolve-route.js";
import { normalizeE164 } from "../../utils.js";
import {
formatSignalPairingIdLine,
formatSignalSenderDisplay,
formatSignalSenderId,
isSignalSenderAllowed,
resolveSignalPeerId,
resolveSignalRecipient,
resolveSignalSender,
} from "../identity.js";
import { sendMessageSignal, sendReadReceiptSignal, sendTypingSignal } from "../send.js";
export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
const inboundDebounceMs = resolveInboundDebounceMs({ cfg: deps.cfg, channel: "signal" });
// Normalized inbound Signal message. The receive handler enqueues one of these
// on the inbound debouncer, which later passes it (possibly merged with other
// entries) to handleSignalInboundMessage.
type SignalInboundEntry = {
// envelope.sourceName when present, otherwise the formatted sender display.
senderName: string;
// Result of formatSignalSenderDisplay(sender); also used as the sender id in envelopes.
senderDisplay: string;
// Result of resolveSignalRecipient(sender); used to build `signal:<recipient>` targets.
senderRecipient: string;
// Result of resolveSignalPeerId(sender); used for direct-chat routing and the debounce key.
senderPeerId: string;
// Group id/name taken from dataMessage.groupInfo; absent for direct messages.
groupId?: string;
groupName?: string;
// True when the message carried a group id.
isGroup: boolean;
// Message text, or a `<media:...>` placeholder, or the quoted text (in that order of preference).
bodyText: string;
// envelope.timestamp as received.
timestamp?: number;
// envelope.timestamp rendered as a string when it is a number.
messageId?: string;
// Local path and content type of the first attachment, when it was fetched.
mediaPath?: string;
mediaType?: string;
// Outcome of command gating (groups) or the DM allow check (direct chats).
commandAuthorized: boolean;
// Effective mention state produced by mention gating; only forwarded for group messages.
wasMentioned?: boolean;
};
/**
 * Turns one normalized (possibly debounce-merged) Signal message into an
 * inbound context, records session metadata, and dispatches it to the
 * auto-reply pipeline.
 *
 * For group messages the formatted body is combined with the pending group
 * history via buildPendingHistoryContextFromMap, and that history is cleared
 * via clearHistoryEntriesIfEnabled once dispatch has completed.
 *
 * @param entry - Normalized inbound message built by the receive handler.
 */
async function handleSignalInboundMessage(entry: SignalInboundEntry) {
  const fromLabel = formatInboundFromLabel({
    isGroup: entry.isGroup,
    groupLabel: entry.groupName ?? undefined,
    groupId: entry.groupId ?? "unknown",
    groupFallback: "Group",
    directLabel: entry.senderName,
    directId: entry.senderDisplay,
  });
  const route = resolveAgentRoute({
    cfg: deps.cfg,
    channel: "signal",
    accountId: deps.accountId,
    peer: {
      kind: entry.isGroup ? "group" : "direct",
      id: entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId,
    },
  });
  const storePath = resolveStorePath(deps.cfg.session?.store, {
    agentId: route.agentId,
  });
  const envelopeOptions = resolveEnvelopeFormatOptions(deps.cfg);
  const previousTimestamp = readSessionUpdatedAt({
    storePath,
    sessionKey: route.sessionKey,
  });
  const body = formatInboundEnvelope({
    channel: "Signal",
    from: fromLabel,
    timestamp: entry.timestamp ?? undefined,
    body: entry.bodyText,
    chatType: entry.isGroup ? "group" : "direct",
    sender: { name: entry.senderName, id: entry.senderDisplay },
    previousTimestamp,
    envelope: envelopeOptions,
  });

  // Group messages get the pending (previously skipped) history prepended.
  let combinedBody = body;
  const historyKey = entry.isGroup ? String(entry.groupId ?? "unknown") : undefined;
  if (entry.isGroup && historyKey) {
    combinedBody = buildPendingHistoryContextFromMap({
      historyMap: deps.groupHistories,
      historyKey,
      limit: deps.historyLimit,
      currentMessage: combinedBody,
      formatEntry: (historyEntry) =>
        formatInboundEnvelope({
          channel: "Signal",
          from: fromLabel,
          timestamp: historyEntry.timestamp,
          body: `${historyEntry.body}${
            historyEntry.messageId ? ` [id:${historyEntry.messageId}]` : ""
          }`,
          chatType: "group",
          senderLabel: historyEntry.sender,
          envelope: envelopeOptions,
        }),
    });
  }

  const signalTo = entry.isGroup ? `group:${entry.groupId}` : `signal:${entry.senderRecipient}`;
  const ctxPayload = finalizeInboundContext({
    Body: combinedBody,
    RawBody: entry.bodyText,
    CommandBody: entry.bodyText,
    From: entry.isGroup
      ? `group:${entry.groupId ?? "unknown"}`
      : `signal:${entry.senderRecipient}`,
    To: signalTo,
    SessionKey: route.sessionKey,
    AccountId: route.accountId,
    ChatType: entry.isGroup ? "group" : "direct",
    ConversationLabel: fromLabel,
    GroupSubject: entry.isGroup ? (entry.groupName ?? undefined) : undefined,
    SenderName: entry.senderName,
    SenderId: entry.senderDisplay,
    Provider: "signal" as const,
    Surface: "signal" as const,
    MessageSid: entry.messageId,
    Timestamp: entry.timestamp ?? undefined,
    MediaPath: entry.mediaPath,
    MediaType: entry.mediaType,
    MediaUrl: entry.mediaPath,
    // Mention state is only meaningful for groups; direct chats leave it undefined.
    WasMentioned: entry.isGroup ? entry.wasMentioned === true : undefined,
    CommandAuthorized: entry.commandAuthorized,
    OriginatingChannel: "signal" as const,
    OriginatingTo: signalTo,
  });

  await recordInboundSession({
    storePath,
    sessionKey: ctxPayload.SessionKey ?? route.sessionKey,
    ctx: ctxPayload,
    // Only direct chats update the "last route" of the main session.
    updateLastRoute: !entry.isGroup
      ? {
          sessionKey: route.mainSessionKey,
          channel: "signal",
          to: entry.senderRecipient,
          accountId: route.accountId,
        }
      : undefined,
    onRecordError: (err) => {
      logVerbose(`signal: failed updating session meta: ${String(err)}`);
    },
  });

  if (shouldLogVerbose()) {
    // Escape real newlines so the preview stays on a single log line.
    const preview = body.slice(0, 200).replace(/\n/g, "\\n");
    logVerbose(`signal inbound: from=${ctxPayload.From} len=${body.length} preview="${preview}"`);
  }

  const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
    cfg: deps.cfg,
    agentId: route.agentId,
    channel: "signal",
    accountId: route.accountId,
  });

  const typingCallbacks = createTypingCallbacks({
    start: async () => {
      if (!ctxPayload.To) {
        return;
      }
      await sendTypingSignal(ctxPayload.To, {
        baseUrl: deps.baseUrl,
        account: deps.account,
        accountId: deps.accountId,
      });
    },
    onStartError: (err) => {
      logTypingFailure({
        log: logVerbose,
        channel: "signal",
        target: ctxPayload.To ?? undefined,
        error: err,
      });
    },
  });

  const { dispatcher, replyOptions, markDispatchIdle } = createReplyDispatcherWithTyping({
    ...prefixOptions,
    humanDelay: resolveHumanDelayConfig(deps.cfg, route.agentId),
    deliver: async (payload) => {
      await deps.deliverReplies({
        replies: [payload],
        target: ctxPayload.To,
        baseUrl: deps.baseUrl,
        account: deps.account,
        accountId: deps.accountId,
        runtime: deps.runtime,
        maxBytes: deps.mediaMaxBytes,
        textLimit: deps.textLimit,
      });
    },
    onError: (err, info) => {
      deps.runtime.error?.(danger(`signal ${info.kind} reply failed: ${String(err)}`));
    },
    onReplyStart: typingCallbacks.onReplyStart,
  });

  await dispatchInboundMessage({
    ctx: ctxPayload,
    cfg: deps.cfg,
    dispatcher,
    replyOptions: {
      ...replyOptions,
      disableBlockStreaming:
        typeof deps.blockStreaming === "boolean" ? !deps.blockStreaming : undefined,
      onModelSelected,
    },
  });
  markDispatchIdle();

  // The pending group history has been folded into this dispatch, so it is
  // cleared regardless of whether a final reply was queued.
  if (entry.isGroup && historyKey) {
    clearHistoryEntriesIfEnabled({
      historyMap: deps.groupHistories,
      historyKey,
      limit: deps.historyLimit,
    });
  }
}
// Coalesces rapid consecutive text messages from the same sender in the same
// conversation into a single call to handleSignalInboundMessage.
const inboundDebouncer = createInboundDebouncer<SignalInboundEntry>({
  debounceMs: inboundDebounceMs,
  // One bucket per (account, conversation, sender). Returns null when either
  // identifier is missing — presumably the debouncer then bypasses batching;
  // confirm against createInboundDebouncer.
  buildKey: (entry) => {
    const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId;
    if (!conversationId || !entry.senderPeerId) {
      return null;
    }
    return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`;
  },
  // Only plain, non-empty text is debounced: blank bodies, media messages and
  // control commands are excluded.
  shouldDebounce: (entry) => {
    if (!entry.bodyText.trim()) {
      return false;
    }
    if (entry.mediaPath || entry.mediaType) {
      return false;
    }
    return !hasControlCommand(entry.bodyText, deps.cfg);
  },
  onFlush: async (entries) => {
    const last = entries.at(-1);
    if (!last) {
      return;
    }
    if (entries.length === 1) {
      await handleSignalInboundMessage(last);
      return;
    }
    // Merge the batch into one message, one original message per line.
    const combinedText = entries
      .map((entry) => entry.bodyText)
      .filter(Boolean)
      .join("\n");
    if (!combinedText.trim()) {
      return;
    }
    // Metadata comes from the newest entry; media fields are dropped for merged text.
    await handleSignalInboundMessage({
      ...last,
      bodyText: combinedText,
      mediaPath: undefined,
      mediaType: undefined,
    });
  },
  onError: (err) => {
    deps.runtime.error?.(`signal debounce flush failed: ${String(err)}`);
  },
});
// Handler for a single event from the Signal receive stream.
// Flow: ignore non-"receive" events -> parse payload -> drop sync/self messages ->
// handle reaction-only messages as system events -> apply DM/group access policy ->
// command gating -> mention gating (groups) -> fetch first attachment ->
// optional read receipt -> enqueue on the inbound debouncer.
return async (event: { event?: string; data?: string }) => {
if (event.event !== "receive" || !event.data) {
return;
}
let payload: SignalReceivePayload | null = null;
try {
// NOTE(review): the parsed JSON is cast, not validated; fields are accessed defensively below.
payload = JSON.parse(event.data) as SignalReceivePayload;
} catch (err) {
deps.runtime.error?.(`failed to parse event: ${String(err)}`);
return;
}
// A receive exception is reported but does not stop processing of the envelope.
if (payload?.exception?.message) {
deps.runtime.error?.(`receive exception: ${payload.exception.message}`);
}
const envelope = payload?.envelope;
if (!envelope) {
return;
}
// Envelopes carrying a syncMessage are ignored entirely.
if (envelope.syncMessage) {
return;
}
const sender = resolveSignalSender(envelope);
if (!sender) {
return;
}
// Drop messages whose phone-number sender equals the configured account (our own number).
if (deps.account && sender.kind === "phone") {
if (sender.e164 === normalizeE164(deps.account)) {
return;
}
}
// Edited messages carry their content under editMessage.dataMessage.
const dataMessage = envelope.dataMessage ?? envelope.editMessage?.dataMessage;
// A reaction may arrive either at the envelope level or nested in the data message.
const reaction = deps.isSignalReactionMessage(envelope.reactionMessage)
? envelope.reactionMessage
: deps.isSignalReactionMessage(dataMessage?.reaction)
? dataMessage?.reaction
: null;
const messageText = (dataMessage?.message ?? "").trim();
const quoteText = dataMessage?.quote?.text?.trim() ?? "";
// Attachments only count as body content when the message is not a reaction.
const hasBodyContent =
Boolean(messageText || quoteText) || Boolean(!reaction && dataMessage?.attachments?.length);
// Reaction-only message: surface it as a system event instead of an inbound message.
if (reaction && !hasBodyContent) {
if (reaction.isRemove) {
return;
} // Ignore reaction removals
const emojiLabel = reaction.emoji?.trim() || "emoji";
const senderDisplay = formatSignalSenderDisplay(sender);
const senderName = envelope.sourceName ?? senderDisplay;
logVerbose(`signal reaction: ${emojiLabel} from ${senderName}`);
const targets = deps.resolveSignalReactionTargets(reaction);
const shouldNotify = deps.shouldEmitSignalReactionNotification({
mode: deps.reactionMode,
account: deps.account,
targets,
sender,
allowlist: deps.reactionAllowlist,
});
if (!shouldNotify) {
return;
}
const groupId = reaction.groupInfo?.groupId ?? undefined;
const groupName = reaction.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
const senderPeerId = resolveSignalPeerId(sender);
const route = resolveAgentRoute({
cfg: deps.cfg,
channel: "signal",
accountId: deps.accountId,
peer: {
kind: isGroup ? "group" : "direct",
id: isGroup ? (groupId ?? "unknown") : senderPeerId,
},
});
const groupLabel = isGroup ? `${groupName ?? "Signal Group"} id:${groupId}` : undefined;
// The reacted-to message is identified by its sent timestamp.
const messageId = reaction.targetSentTimestamp
? String(reaction.targetSentTimestamp)
: "unknown";
const text = deps.buildSignalReactionSystemEventText({
emojiLabel,
actorLabel: senderName,
messageId,
targetLabel: targets[0]?.display,
groupLabel,
});
const senderId = formatSignalSenderId(sender);
// Context key built from message, sender, emoji and group; empty parts are dropped.
const contextKey = [
"signal",
"reaction",
"added",
messageId,
senderId,
emojiLabel,
groupId ?? "",
]
.filter(Boolean)
.join(":");
enqueueSystemEvent(text, { sessionKey: route.sessionKey, contextKey });
return;
}
if (!dataMessage) {
return;
}
const senderDisplay = formatSignalSenderDisplay(sender);
const senderRecipient = resolveSignalRecipient(sender);
const senderPeerId = resolveSignalPeerId(sender);
const senderAllowId = formatSignalSenderId(sender);
// Without a recipient address there is nowhere to send replies, so the message is dropped.
if (!senderRecipient) {
return;
}
const senderIdLine = formatSignalPairingIdLine(sender);
const groupId = dataMessage.groupInfo?.groupId ?? undefined;
const groupName = dataMessage.groupInfo?.groupName ?? undefined;
const isGroup = Boolean(groupId);
// Allowlist entries from the store are merged into both the DM and the group allowlist.
// A store read failure falls back to an empty list.
const storeAllowFrom = await readChannelAllowFromStore("signal").catch(() => []);
const effectiveDmAllow = [...deps.allowFrom, ...storeAllowFrom];
const effectiveGroupAllow = [...deps.groupAllowFrom, ...storeAllowFrom];
const dmAllowed =
deps.dmPolicy === "open" ? true : isSignalSenderAllowed(sender, effectiveDmAllow);
// --- Direct-message access policy ---
if (!isGroup) {
if (deps.dmPolicy === "disabled") {
return;
}
if (!dmAllowed) {
if (deps.dmPolicy === "pairing") {
const senderId = senderAllowId;
const { code, created } = await upsertChannelPairingRequest({
channel: "signal",
id: senderId,
meta: { name: envelope.sourceName ?? undefined },
});
// The pairing reply is only sent when the request was newly created.
if (created) {
logVerbose(`signal pairing request sender=${senderId}`);
try {
await sendMessageSignal(
`signal:${senderRecipient}`,
buildPairingReply({
channel: "signal",
idLine: senderIdLine,
code,
}),
{
baseUrl: deps.baseUrl,
account: deps.account,
maxBytes: deps.mediaMaxBytes,
accountId: deps.accountId,
},
);
} catch (err) {
logVerbose(`signal pairing reply failed for ${senderId}: ${String(err)}`);
}
}
} else {
logVerbose(`Blocked signal sender ${senderDisplay} (dmPolicy=${deps.dmPolicy})`);
}
return;
}
}
// --- Group access policy ---
if (isGroup && deps.groupPolicy === "disabled") {
logVerbose("Blocked signal group message (groupPolicy: disabled)");
return;
}
if (isGroup && deps.groupPolicy === "allowlist") {
// An allowlist policy with no entries blocks every group message.
if (effectiveGroupAllow.length === 0) {
logVerbose("Blocked signal group message (groupPolicy: allowlist, no groupAllowFrom)");
return;
}
if (!isSignalSenderAllowed(sender, effectiveGroupAllow)) {
logVerbose(`Blocked signal group sender ${senderDisplay} (not in groupAllowFrom)`);
return;
}
}
// --- Control-command gating ---
// Access groups are on unless explicitly set to false in config.
const useAccessGroups = deps.cfg.commands?.useAccessGroups !== false;
const ownerAllowedForCommands = isSignalSenderAllowed(sender, effectiveDmAllow);
const groupAllowedForCommands = isSignalSenderAllowed(sender, effectiveGroupAllow);
const hasControlCommandInMessage = hasControlCommand(messageText, deps.cfg);
const commandGate = resolveControlCommandGate({
useAccessGroups,
authorizers: [
{ configured: effectiveDmAllow.length > 0, allowed: ownerAllowedForCommands },
{ configured: effectiveGroupAllow.length > 0, allowed: groupAllowedForCommands },
],
allowTextCommands: true,
hasControlCommand: hasControlCommandInMessage,
});
// In groups, authorization comes from the command gate; in DMs it equals dmAllowed.
const commandAuthorized = isGroup ? commandGate.commandAuthorized : dmAllowed;
if (isGroup && commandGate.shouldBlock) {
logInboundDrop({
log: logVerbose,
channel: "signal",
reason: "control command (unauthorized)",
target: senderDisplay,
});
return;
}
// --- Mention gating (groups only) ---
// The route is resolved here because the mention patterns depend on the agent id.
const route = resolveAgentRoute({
cfg: deps.cfg,
channel: "signal",
accountId: deps.accountId,
peer: {
kind: isGroup ? "group" : "direct",
id: isGroup ? (groupId ?? "unknown") : senderPeerId,
},
});
const mentionRegexes = buildMentionRegexes(deps.cfg, route.agentId);
const wasMentioned = isGroup && matchesMentionPatterns(messageText, mentionRegexes);
const requireMention =
isGroup &&
resolveChannelGroupRequireMention({
cfg: deps.cfg,
channel: "signal",
groupId,
accountId: deps.accountId,
});
// Mentions can only be detected when at least one mention pattern is configured.
const canDetectMention = mentionRegexes.length > 0;
// implicitMention and hasAnyMention are hard-coded to false for Signal here.
const mentionGate = resolveMentionGatingWithBypass({
isGroup,
requireMention: Boolean(requireMention),
canDetectMention,
wasMentioned,
implicitMention: false,
hasAnyMention: false,
allowTextCommands: true,
hasControlCommand: hasControlCommandInMessage,
commandAuthorized,
});
const effectiveWasMentioned = mentionGate.effectiveWasMentioned;
// Skipped group messages are not dispatched, but are recorded as pending history
// so a later mentioned message can include them as context.
if (isGroup && requireMention && canDetectMention && mentionGate.shouldSkip) {
logInboundDrop({
log: logVerbose,
channel: "signal",
reason: "no mention",
target: senderDisplay,
});
// NOTE(review): shadows the outer quoteText, which was computed from the same field.
const quoteText = dataMessage.quote?.text?.trim() || "";
const pendingPlaceholder = (() => {
if (!dataMessage.attachments?.length) {
return "";
}
// When we're skipping a message we intentionally avoid downloading attachments.
// Still record a useful placeholder for pending-history context.
if (deps.ignoreAttachments) {
return "<media:attachment>";
}
const firstContentType = dataMessage.attachments?.[0]?.contentType;
const pendingKind = mediaKindFromMime(firstContentType ?? undefined);
return pendingKind ? `<media:${pendingKind}>` : "<media:attachment>";
})();
// Same preference order as the dispatched path: text, then media placeholder, then quote.
const pendingBodyText = messageText || pendingPlaceholder || quoteText;
const historyKey = groupId ?? "unknown";
recordPendingHistoryEntryIfEnabled({
historyMap: deps.groupHistories,
historyKey,
limit: deps.historyLimit,
entry: {
sender: envelope.sourceName ?? senderDisplay,
body: pendingBodyText,
timestamp: envelope.timestamp ?? undefined,
messageId:
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined,
},
});
return;
}
// --- Attachment handling: only the first attachment is fetched ---
let mediaPath: string | undefined;
let mediaType: string | undefined;
let placeholder = "";
const firstAttachment = dataMessage.attachments?.[0];
if (firstAttachment?.id && !deps.ignoreAttachments) {
try {
const fetched = await deps.fetchAttachment({
baseUrl: deps.baseUrl,
account: deps.account,
attachment: firstAttachment,
sender: senderRecipient,
groupId,
maxBytes: deps.mediaMaxBytes,
});
if (fetched) {
mediaPath = fetched.path;
mediaType = fetched.contentType ?? firstAttachment.contentType ?? undefined;
}
} catch (err) {
// A failed fetch is logged; the message continues with a generic placeholder.
deps.runtime.error?.(danger(`attachment fetch failed: ${String(err)}`));
}
}
const kind = mediaKindFromMime(mediaType ?? undefined);
if (kind) {
placeholder = `<media:${kind}>`;
} else if (dataMessage.attachments?.length) {
placeholder = "<media:attachment>";
}
// Body preference: message text, then media placeholder, then quoted text.
const bodyText = messageText || placeholder || dataMessage.quote?.text?.trim() || "";
if (!bodyText) {
return;
}
// --- Read receipt (direct messages only, when not handled by the daemon) ---
const receiptTimestamp =
typeof envelope.timestamp === "number"
? envelope.timestamp
: typeof dataMessage.timestamp === "number"
? dataMessage.timestamp
: undefined;
if (deps.sendReadReceipts && !deps.readReceiptsViaDaemon && !isGroup && receiptTimestamp) {
try {
await sendReadReceiptSignal(`signal:${senderRecipient}`, receiptTimestamp, {
baseUrl: deps.baseUrl,
account: deps.account,
accountId: deps.accountId,
});
} catch (err) {
// Receipt failures are logged only; the message is still processed.
logVerbose(`signal read receipt failed for ${senderDisplay}: ${String(err)}`);
}
} else if (
deps.sendReadReceipts &&
!deps.readReceiptsViaDaemon &&
!isGroup &&
!receiptTimestamp
) {
logVerbose(`signal read receipt skipped (missing timestamp) for ${senderDisplay}`);
}
const senderName = envelope.sourceName ?? senderDisplay;
// The envelope timestamp doubles as the message id.
const messageId =
typeof envelope.timestamp === "number" ? String(envelope.timestamp) : undefined;
// Hand off to the debouncer, which eventually calls handleSignalInboundMessage.
await inboundDebouncer.enqueue({
senderName,
senderDisplay,
senderRecipient,
senderPeerId,
groupId,
groupName,
isGroup,
bodyText,
timestamp: envelope.timestamp ?? undefined,
messageId,
mediaPath,
mediaType,
commandAuthorized,
wasMentioned: effectiveWasMentioned,
});
};
}