From 1d0a4d1be2da883c117cc2ab05eacc890c5210df Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Tue, 3 Mar 2026 00:05:12 +0000 Subject: [PATCH] refactor(runtime): harden channel-registry cache invalidation and split outbound delivery flow --- src/channels/plugins/index.ts | 18 +- src/infra/outbound/deliver.ts | 359 ++++++++++++++++++++++------------ src/media/parse.ts | 9 +- src/plugins/runtime.ts | 8 + 4 files changed, 254 insertions(+), 140 deletions(-) diff --git a/src/channels/plugins/index.ts b/src/channels/plugins/index.ts index d5aa15d3cb8..43b0aa99452 100644 --- a/src/channels/plugins/index.ts +++ b/src/channels/plugins/index.ts @@ -1,4 +1,7 @@ -import { getActivePluginRegistryKey, requireActivePluginRegistry } from "../../plugins/runtime.js"; +import { + getActivePluginRegistryVersion, + requireActivePluginRegistry, +} from "../../plugins/runtime.js"; import { CHAT_CHANNEL_ORDER, type ChatChannelId, normalizeAnyChannelId } from "../registry.js"; import type { ChannelId, ChannelPlugin } from "./types.js"; @@ -23,15 +26,13 @@ function dedupeChannels(channels: ChannelPlugin[]): ChannelPlugin[] { } type CachedChannelPlugins = { - registry: ReturnType | null; - registryKey: string | null; + registryVersion: number; sorted: ChannelPlugin[]; byId: Map; }; const EMPTY_CHANNEL_PLUGIN_CACHE: CachedChannelPlugins = { - registry: null, - registryKey: null, + registryVersion: -1, sorted: [], byId: new Map(), }; @@ -40,9 +41,9 @@ let cachedChannelPlugins = EMPTY_CHANNEL_PLUGIN_CACHE; function resolveCachedChannelPlugins(): CachedChannelPlugins { const registry = requireActivePluginRegistry(); - const registryKey = getActivePluginRegistryKey(); + const registryVersion = getActivePluginRegistryVersion(); const cached = cachedChannelPlugins; - if (cached.registry === registry && cached.registryKey === registryKey) { + if (cached.registryVersion === registryVersion) { return cached; } @@ -62,8 +63,7 @@ function resolveCachedChannelPlugins(): CachedChannelPlugins { } const next: CachedChannelPlugins = { - registry, - registryKey, + registryVersion, sorted, byId, }; diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 3b77e54a374..ac1e957c73d 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -240,6 +240,212 @@ type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & { skipQueue?: boolean; }; +type MessageSentEvent = { + success: boolean; + content: string; + error?: string; + messageId?: string; +}; + +function hasMediaPayload(payload: ReplyPayload): boolean { + return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; +} + +function hasChannelDataPayload(payload: ReplyPayload): boolean { + return Boolean(payload.channelData && Object.keys(payload.channelData).length > 0); +} + +function normalizePayloadForChannelDelivery( + payload: ReplyPayload, + channelId: string, +): ReplyPayload | null { + const hasMedia = hasMediaPayload(payload); + const hasChannelData = hasChannelDataPayload(payload); + const rawText = typeof payload.text === "string" ? payload.text : ""; + const normalizedText = + channelId === "whatsapp" ? rawText.replace(/^(?:[ \t]*\r?\n)+/, "") : rawText; + if (!normalizedText.trim()) { + if (!hasMedia && !hasChannelData) { + return null; + } + return { + ...payload, + text: "", + }; + } + if (normalizedText === rawText) { + return payload; + } + return { + ...payload, + text: normalizedText, + }; +} + +function normalizePayloadsForChannelDelivery( + payloads: ReplyPayload[], + channel: Exclude, +): ReplyPayload[] { + const normalizedPayloads: ReplyPayload[] = []; + for (const payload of normalizeReplyPayloadsForDelivery(payloads)) { + let sanitizedPayload = payload; + // Strip HTML tags for plain-text surfaces (WhatsApp, Signal, etc.) + // Models occasionally produce
, , etc. that render as literal text. + // See https://github.com/openclaw/openclaw/issues/31884 + if (isPlainTextSurface(channel) && payload.text) { + // Telegram sendPayload uses textMode:"html". Preserve raw HTML in this path. + if (!(channel === "telegram" && payload.channelData)) { + sanitizedPayload = { ...payload, text: sanitizeForPlainText(payload.text) }; + } + } + const normalized = normalizePayloadForChannelDelivery(sanitizedPayload, channel); + if (normalized) { + normalizedPayloads.push(normalized); + } + } + return normalizedPayloads; +} + +function buildPayloadSummary(payload: ReplyPayload): NormalizedOutboundPayload { + return { + text: payload.text ?? "", + mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []), + channelData: payload.channelData, + }; +} + +function createMessageSentEmitter(params: { + hookRunner: ReturnType; + channel: Exclude; + to: string; + accountId?: string; + sessionKeyForInternalHooks?: string; + mirrorIsGroup?: boolean; + mirrorGroupId?: string; +}): { emitMessageSent: (event: MessageSentEvent) => void; hasMessageSentHooks: boolean } { + const hasMessageSentHooks = params.hookRunner?.hasHooks("message_sent") ?? false; + const canEmitInternalHook = Boolean(params.sessionKeyForInternalHooks); + const emitMessageSent = (event: MessageSentEvent) => { + if (!hasMessageSentHooks && !canEmitInternalHook) { + return; + } + const canonical = buildCanonicalSentMessageHookContext({ + to: params.to, + content: event.content, + success: event.success, + error: event.error, + channelId: params.channel, + accountId: params.accountId ?? undefined, + conversationId: params.to, + messageId: event.messageId, + isGroup: params.mirrorIsGroup, + groupId: params.mirrorGroupId, + }); + if (hasMessageSentHooks) { + fireAndForgetHook( + params.hookRunner!.runMessageSent( + toPluginMessageSentEvent(canonical), + toPluginMessageContext(canonical), + ), + "deliverOutboundPayloads: message_sent plugin hook failed", + (message) => { + log.warn(message); + }, + ); + } + if (!canEmitInternalHook) { + return; + } + fireAndForgetHook( + triggerInternalHook( + createInternalHookEvent( + "message", + "sent", + params.sessionKeyForInternalHooks!, + toInternalMessageSentContext(canonical), + ), + ), + "deliverOutboundPayloads: message:sent internal hook failed", + (message) => { + log.warn(message); + }, + ); + }; + return { emitMessageSent, hasMessageSentHooks }; +} + +async function applyMessageSendingHook(params: { + hookRunner: ReturnType; + enabled: boolean; + payload: ReplyPayload; + payloadSummary: NormalizedOutboundPayload; + to: string; + channel: Exclude; + accountId?: string; +}): Promise<{ + cancelled: boolean; + payload: ReplyPayload; + payloadSummary: NormalizedOutboundPayload; +}> { + if (!params.enabled) { + return { + cancelled: false, + payload: params.payload, + payloadSummary: params.payloadSummary, + }; + } + try { + const sendingResult = await params.hookRunner!.runMessageSending( + { + to: params.to, + content: params.payloadSummary.text, + metadata: { + channel: params.channel, + accountId: params.accountId, + mediaUrls: params.payloadSummary.mediaUrls, + }, + }, + { + channelId: params.channel, + accountId: params.accountId ?? undefined, + }, + ); + if (sendingResult?.cancel) { + return { + cancelled: true, + payload: params.payload, + payloadSummary: params.payloadSummary, + }; + } + if (sendingResult?.content == null) { + return { + cancelled: false, + payload: params.payload, + payloadSummary: params.payloadSummary, + }; + } + const payload = { + ...params.payload, + text: sendingResult.content, + }; + return { + cancelled: false, + payload, + payloadSummary: { + ...params.payloadSummary, + text: sendingResult.content, + }, + }; + } catch { + // Don't block delivery on hook failure. + return { + cancelled: false, + payload: params.payload, + payloadSummary: params.payloadSummary, + }; + } +} + export async function deliverOutboundPayloads( params: DeliverOutboundPayloadsParams, ): Promise { @@ -439,60 +645,21 @@ async function deliverOutboundPayloadsCore( })), }; }; - const hasMediaPayload = (payload: ReplyPayload): boolean => - Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; - const hasChannelDataPayload = (payload: ReplyPayload): boolean => - Boolean(payload.channelData && Object.keys(payload.channelData).length > 0); - const normalizePayloadForChannelDelivery = ( - payload: ReplyPayload, - channelId: string, - ): ReplyPayload | null => { - const hasMedia = hasMediaPayload(payload); - const hasChannelData = hasChannelDataPayload(payload); - const rawText = typeof payload.text === "string" ? payload.text : ""; - const normalizedText = - channelId === "whatsapp" ? rawText.replace(/^(?:[ \t]*\r?\n)+/, "") : rawText; - if (!normalizedText.trim()) { - if (!hasMedia && !hasChannelData) { - return null; - } - return { - ...payload, - text: "", - }; - } - if (normalizedText === rawText) { - return payload; - } - return { - ...payload, - text: normalizedText, - }; - }; - const normalizedPayloads: ReplyPayload[] = []; - for (const payload of normalizeReplyPayloadsForDelivery(payloads)) { - let sanitizedPayload = payload; - // Strip HTML tags for plain-text surfaces (WhatsApp, Signal, etc.) - // Models occasionally produce
, , etc. that render as literal text. - // See https://github.com/openclaw/openclaw/issues/31884 - if (isPlainTextSurface(channel) && payload.text) { - // Telegram sendPayload uses textMode:"html". Preserve raw HTML in this path. - if (!(channel === "telegram" && payload.channelData)) { - sanitizedPayload = { ...payload, text: sanitizeForPlainText(payload.text) }; - } - } - const normalized = normalizePayloadForChannelDelivery(sanitizedPayload, channel); - if (normalized) { - normalizedPayloads.push(normalized); - } - } + const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, channel); const hookRunner = getGlobalHookRunner(); const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key; const mirrorIsGroup = params.mirror?.isGroup; const mirrorGroupId = params.mirror?.groupId; - const hasMessageSentHooks = hookRunner?.hasHooks("message_sent") ?? false; + const { emitMessageSent, hasMessageSentHooks } = createMessageSentEmitter({ + hookRunner, + channel, + to, + accountId, + sessionKeyForInternalHooks, + mirrorIsGroup, + mirrorGroupId, + }); const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false; - const canEmitInternalHook = Boolean(sessionKeyForInternalHooks); if (hasMessageSentHooks && params.session?.agentId && !sessionKeyForInternalHooks) { log.warn( "deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped", @@ -504,91 +671,25 @@ async function deliverOutboundPayloadsCore( ); } for (const payload of normalizedPayloads) { - const payloadSummary: NormalizedOutboundPayload = { - text: payload.text ?? "", - mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []), - channelData: payload.channelData, - }; - const emitMessageSent = (params: { - success: boolean; - content: string; - error?: string; - messageId?: string; - }) => { - if (!hasMessageSentHooks && !canEmitInternalHook) { - return; - } - const canonical = buildCanonicalSentMessageHookContext({ - to, - content: params.content, - success: params.success, - error: params.error, - channelId: channel, - accountId: accountId ?? undefined, - conversationId: to, - messageId: params.messageId, - isGroup: mirrorIsGroup, - groupId: mirrorGroupId, - }); - if (hasMessageSentHooks) { - fireAndForgetHook( - hookRunner!.runMessageSent( - toPluginMessageSentEvent(canonical), - toPluginMessageContext(canonical), - ), - "deliverOutboundPayloads: message_sent plugin hook failed", - (message) => { - log.warn(message); - }, - ); - } - if (!canEmitInternalHook) { - return; - } - fireAndForgetHook( - triggerInternalHook( - createInternalHookEvent( - "message", - "sent", - sessionKeyForInternalHooks!, - toInternalMessageSentContext(canonical), - ), - ), - "deliverOutboundPayloads: message:sent internal hook failed", - (message) => { - log.warn(message); - }, - ); - }; + let payloadSummary = buildPayloadSummary(payload); try { throwIfAborted(abortSignal); // Run message_sending plugin hook (may modify content or cancel) - let effectivePayload = payload; - if (hasMessageSendingHooks) { - try { - const sendingResult = await hookRunner!.runMessageSending( - { - to, - content: payloadSummary.text, - metadata: { channel, accountId, mediaUrls: payloadSummary.mediaUrls }, - }, - { - channelId: channel, - accountId: accountId ?? undefined, - }, - ); - if (sendingResult?.cancel) { - continue; - } - if (sendingResult?.content != null) { - effectivePayload = { ...payload, text: sendingResult.content }; - payloadSummary.text = sendingResult.content; - } - } catch { - // Don't block delivery on hook failure - } + const hookResult = await applyMessageSendingHook({ + hookRunner, + enabled: hasMessageSendingHooks, + payload, + payloadSummary, + to, + channel, + accountId, + }); + if (hookResult.cancelled) { + continue; } + const effectivePayload = hookResult.payload; + payloadSummary = hookResult.payloadSummary; params.onPayload?.(payloadSummary); const sendOverrides = { diff --git a/src/media/parse.ts b/src/media/parse.ts index 8dd69d31bbb..9aa8893d095 100644 --- a/src/media/parse.ts +++ b/src/media/parse.ts @@ -79,6 +79,10 @@ function unwrapQuoted(value: string): string | undefined { return trimmed.slice(1, -1).trim(); } +function mayContainFenceMarkers(input: string): boolean { + return input.includes("```") || input.includes("~~~"); +} + // Check if a character offset is inside any fenced code block function isInsideFence(fenceSpans: Array<{ start: number; end: number }>, offset: number): boolean { return fenceSpans.some((span) => offset >= span.start && offset < span.end); @@ -106,7 +110,8 @@ export function splitMediaFromOutput(raw: string): { let foundMediaToken = false; // Parse fenced code blocks to avoid extracting MEDIA tokens from inside them - const fenceSpans = parseFenceSpans(trimmedRaw); + const hasFenceMarkers = mayContainFenceMarkers(trimmedRaw); + const fenceSpans = hasFenceMarkers ? parseFenceSpans(trimmedRaw) : []; // Collect tokens line by line so we can strip them cleanly. const lines = trimmedRaw.split("\n"); @@ -115,7 +120,7 @@ export function splitMediaFromOutput(raw: string): { let lineOffset = 0; // Track character offset for fence checking for (const line of lines) { // Skip MEDIA extraction if this line is inside a fenced code block - if (isInsideFence(fenceSpans, lineOffset)) { + if (hasFenceMarkers && isInsideFence(fenceSpans, lineOffset)) { keptLines.push(line); lineOffset += line.length + 1; // +1 for newline continue; diff --git a/src/plugins/runtime.ts b/src/plugins/runtime.ts index 10177d74f46..752908ddf75 100644 --- a/src/plugins/runtime.ts +++ b/src/plugins/runtime.ts @@ -5,6 +5,7 @@ const REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState"); type RegistryState = { registry: PluginRegistry | null; key: string | null; + version: number; }; const state: RegistryState = (() => { @@ -15,6 +16,7 @@ const state: RegistryState = (() => { globalState[REGISTRY_STATE] = { registry: createEmptyPluginRegistry(), key: null, + version: 0, }; } return globalState[REGISTRY_STATE]; @@ -23,6 +25,7 @@ const state: RegistryState = (() => { export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: string) { state.registry = registry; state.key = cacheKey ?? null; + state.version += 1; } export function getActivePluginRegistry(): PluginRegistry | null { @@ -32,6 +35,7 @@ export function getActivePluginRegistry(): PluginRegistry | null { export function requireActivePluginRegistry(): PluginRegistry { if (!state.registry) { state.registry = createEmptyPluginRegistry(); + state.version += 1; } return state.registry; } @@ -39,3 +43,7 @@ export function requireActivePluginRegistry(): PluginRegistry { export function getActivePluginRegistryKey(): string | null { return state.key; } + +export function getActivePluginRegistryVersion(): number { + return state.version; +}