refactor(runtime): harden channel-registry cache invalidation and split outbound delivery flow

This commit is contained in:
Peter Steinberger
2026-03-03 00:05:12 +00:00
parent d6491d8d71
commit 1d0a4d1be2
4 changed files with 254 additions and 140 deletions

View File

@@ -1,4 +1,7 @@
import { getActivePluginRegistryKey, requireActivePluginRegistry } from "../../plugins/runtime.js";
import {
getActivePluginRegistryVersion,
requireActivePluginRegistry,
} from "../../plugins/runtime.js";
import { CHAT_CHANNEL_ORDER, type ChatChannelId, normalizeAnyChannelId } from "../registry.js";
import type { ChannelId, ChannelPlugin } from "./types.js";
@@ -23,15 +26,13 @@ function dedupeChannels(channels: ChannelPlugin[]): ChannelPlugin[] {
}
type CachedChannelPlugins = {
registry: ReturnType<typeof requireActivePluginRegistry> | null;
registryKey: string | null;
registryVersion: number;
sorted: ChannelPlugin[];
byId: Map<string, ChannelPlugin>;
};
const EMPTY_CHANNEL_PLUGIN_CACHE: CachedChannelPlugins = {
registry: null,
registryKey: null,
registryVersion: -1,
sorted: [],
byId: new Map(),
};
@@ -40,9 +41,9 @@ let cachedChannelPlugins = EMPTY_CHANNEL_PLUGIN_CACHE;
function resolveCachedChannelPlugins(): CachedChannelPlugins {
const registry = requireActivePluginRegistry();
const registryKey = getActivePluginRegistryKey();
const registryVersion = getActivePluginRegistryVersion();
const cached = cachedChannelPlugins;
if (cached.registry === registry && cached.registryKey === registryKey) {
if (cached.registryVersion === registryVersion) {
return cached;
}
@@ -62,8 +63,7 @@ function resolveCachedChannelPlugins(): CachedChannelPlugins {
}
const next: CachedChannelPlugins = {
registry,
registryKey,
registryVersion,
sorted,
byId,
};

View File

@@ -240,6 +240,212 @@ type DeliverOutboundPayloadsParams = DeliverOutboundPayloadsCoreParams & {
skipQueue?: boolean;
};
type MessageSentEvent = {
success: boolean;
content: string;
error?: string;
messageId?: string;
};
function hasMediaPayload(payload: ReplyPayload): boolean {
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
}
function hasChannelDataPayload(payload: ReplyPayload): boolean {
return Boolean(payload.channelData && Object.keys(payload.channelData).length > 0);
}
function normalizePayloadForChannelDelivery(
payload: ReplyPayload,
channelId: string,
): ReplyPayload | null {
const hasMedia = hasMediaPayload(payload);
const hasChannelData = hasChannelDataPayload(payload);
const rawText = typeof payload.text === "string" ? payload.text : "";
const normalizedText =
channelId === "whatsapp" ? rawText.replace(/^(?:[ \t]*\r?\n)+/, "") : rawText;
if (!normalizedText.trim()) {
if (!hasMedia && !hasChannelData) {
return null;
}
return {
...payload,
text: "",
};
}
if (normalizedText === rawText) {
return payload;
}
return {
...payload,
text: normalizedText,
};
}
function normalizePayloadsForChannelDelivery(
payloads: ReplyPayload[],
channel: Exclude<OutboundChannel, "none">,
): ReplyPayload[] {
const normalizedPayloads: ReplyPayload[] = [];
for (const payload of normalizeReplyPayloadsForDelivery(payloads)) {
let sanitizedPayload = payload;
// Strip HTML tags for plain-text surfaces (WhatsApp, Signal, etc.)
// Models occasionally produce <br>, <b>, etc. that render as literal text.
// See https://github.com/openclaw/openclaw/issues/31884
if (isPlainTextSurface(channel) && payload.text) {
// Telegram sendPayload uses textMode:"html". Preserve raw HTML in this path.
if (!(channel === "telegram" && payload.channelData)) {
sanitizedPayload = { ...payload, text: sanitizeForPlainText(payload.text) };
}
}
const normalized = normalizePayloadForChannelDelivery(sanitizedPayload, channel);
if (normalized) {
normalizedPayloads.push(normalized);
}
}
return normalizedPayloads;
}
function buildPayloadSummary(payload: ReplyPayload): NormalizedOutboundPayload {
return {
text: payload.text ?? "",
mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []),
channelData: payload.channelData,
};
}
function createMessageSentEmitter(params: {
hookRunner: ReturnType<typeof getGlobalHookRunner>;
channel: Exclude<OutboundChannel, "none">;
to: string;
accountId?: string;
sessionKeyForInternalHooks?: string;
mirrorIsGroup?: boolean;
mirrorGroupId?: string;
}): { emitMessageSent: (event: MessageSentEvent) => void; hasMessageSentHooks: boolean } {
const hasMessageSentHooks = params.hookRunner?.hasHooks("message_sent") ?? false;
const canEmitInternalHook = Boolean(params.sessionKeyForInternalHooks);
const emitMessageSent = (event: MessageSentEvent) => {
if (!hasMessageSentHooks && !canEmitInternalHook) {
return;
}
const canonical = buildCanonicalSentMessageHookContext({
to: params.to,
content: event.content,
success: event.success,
error: event.error,
channelId: params.channel,
accountId: params.accountId ?? undefined,
conversationId: params.to,
messageId: event.messageId,
isGroup: params.mirrorIsGroup,
groupId: params.mirrorGroupId,
});
if (hasMessageSentHooks) {
fireAndForgetHook(
params.hookRunner!.runMessageSent(
toPluginMessageSentEvent(canonical),
toPluginMessageContext(canonical),
),
"deliverOutboundPayloads: message_sent plugin hook failed",
(message) => {
log.warn(message);
},
);
}
if (!canEmitInternalHook) {
return;
}
fireAndForgetHook(
triggerInternalHook(
createInternalHookEvent(
"message",
"sent",
params.sessionKeyForInternalHooks!,
toInternalMessageSentContext(canonical),
),
),
"deliverOutboundPayloads: message:sent internal hook failed",
(message) => {
log.warn(message);
},
);
};
return { emitMessageSent, hasMessageSentHooks };
}
async function applyMessageSendingHook(params: {
hookRunner: ReturnType<typeof getGlobalHookRunner>;
enabled: boolean;
payload: ReplyPayload;
payloadSummary: NormalizedOutboundPayload;
to: string;
channel: Exclude<OutboundChannel, "none">;
accountId?: string;
}): Promise<{
cancelled: boolean;
payload: ReplyPayload;
payloadSummary: NormalizedOutboundPayload;
}> {
if (!params.enabled) {
return {
cancelled: false,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
}
try {
const sendingResult = await params.hookRunner!.runMessageSending(
{
to: params.to,
content: params.payloadSummary.text,
metadata: {
channel: params.channel,
accountId: params.accountId,
mediaUrls: params.payloadSummary.mediaUrls,
},
},
{
channelId: params.channel,
accountId: params.accountId ?? undefined,
},
);
if (sendingResult?.cancel) {
return {
cancelled: true,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
}
if (sendingResult?.content == null) {
return {
cancelled: false,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
}
const payload = {
...params.payload,
text: sendingResult.content,
};
return {
cancelled: false,
payload,
payloadSummary: {
...params.payloadSummary,
text: sendingResult.content,
},
};
} catch {
// Don't block delivery on hook failure.
return {
cancelled: false,
payload: params.payload,
payloadSummary: params.payloadSummary,
};
}
}
export async function deliverOutboundPayloads(
params: DeliverOutboundPayloadsParams,
): Promise<OutboundDeliveryResult[]> {
@@ -439,60 +645,21 @@ async function deliverOutboundPayloadsCore(
})),
};
};
const hasMediaPayload = (payload: ReplyPayload): boolean =>
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const hasChannelDataPayload = (payload: ReplyPayload): boolean =>
Boolean(payload.channelData && Object.keys(payload.channelData).length > 0);
const normalizePayloadForChannelDelivery = (
payload: ReplyPayload,
channelId: string,
): ReplyPayload | null => {
const hasMedia = hasMediaPayload(payload);
const hasChannelData = hasChannelDataPayload(payload);
const rawText = typeof payload.text === "string" ? payload.text : "";
const normalizedText =
channelId === "whatsapp" ? rawText.replace(/^(?:[ \t]*\r?\n)+/, "") : rawText;
if (!normalizedText.trim()) {
if (!hasMedia && !hasChannelData) {
return null;
}
return {
...payload,
text: "",
};
}
if (normalizedText === rawText) {
return payload;
}
return {
...payload,
text: normalizedText,
};
};
const normalizedPayloads: ReplyPayload[] = [];
for (const payload of normalizeReplyPayloadsForDelivery(payloads)) {
let sanitizedPayload = payload;
// Strip HTML tags for plain-text surfaces (WhatsApp, Signal, etc.)
// Models occasionally produce <br>, <b>, etc. that render as literal text.
// See https://github.com/openclaw/openclaw/issues/31884
if (isPlainTextSurface(channel) && payload.text) {
// Telegram sendPayload uses textMode:"html". Preserve raw HTML in this path.
if (!(channel === "telegram" && payload.channelData)) {
sanitizedPayload = { ...payload, text: sanitizeForPlainText(payload.text) };
}
}
const normalized = normalizePayloadForChannelDelivery(sanitizedPayload, channel);
if (normalized) {
normalizedPayloads.push(normalized);
}
}
const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, channel);
const hookRunner = getGlobalHookRunner();
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
const mirrorIsGroup = params.mirror?.isGroup;
const mirrorGroupId = params.mirror?.groupId;
const hasMessageSentHooks = hookRunner?.hasHooks("message_sent") ?? false;
const { emitMessageSent, hasMessageSentHooks } = createMessageSentEmitter({
hookRunner,
channel,
to,
accountId,
sessionKeyForInternalHooks,
mirrorIsGroup,
mirrorGroupId,
});
const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false;
const canEmitInternalHook = Boolean(sessionKeyForInternalHooks);
if (hasMessageSentHooks && params.session?.agentId && !sessionKeyForInternalHooks) {
log.warn(
"deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped",
@@ -504,91 +671,25 @@ async function deliverOutboundPayloadsCore(
);
}
for (const payload of normalizedPayloads) {
const payloadSummary: NormalizedOutboundPayload = {
text: payload.text ?? "",
mediaUrls: payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : []),
channelData: payload.channelData,
};
const emitMessageSent = (params: {
success: boolean;
content: string;
error?: string;
messageId?: string;
}) => {
if (!hasMessageSentHooks && !canEmitInternalHook) {
return;
}
const canonical = buildCanonicalSentMessageHookContext({
to,
content: params.content,
success: params.success,
error: params.error,
channelId: channel,
accountId: accountId ?? undefined,
conversationId: to,
messageId: params.messageId,
isGroup: mirrorIsGroup,
groupId: mirrorGroupId,
});
if (hasMessageSentHooks) {
fireAndForgetHook(
hookRunner!.runMessageSent(
toPluginMessageSentEvent(canonical),
toPluginMessageContext(canonical),
),
"deliverOutboundPayloads: message_sent plugin hook failed",
(message) => {
log.warn(message);
},
);
}
if (!canEmitInternalHook) {
return;
}
fireAndForgetHook(
triggerInternalHook(
createInternalHookEvent(
"message",
"sent",
sessionKeyForInternalHooks!,
toInternalMessageSentContext(canonical),
),
),
"deliverOutboundPayloads: message:sent internal hook failed",
(message) => {
log.warn(message);
},
);
};
let payloadSummary = buildPayloadSummary(payload);
try {
throwIfAborted(abortSignal);
// Run message_sending plugin hook (may modify content or cancel)
let effectivePayload = payload;
if (hasMessageSendingHooks) {
try {
const sendingResult = await hookRunner!.runMessageSending(
{
to,
content: payloadSummary.text,
metadata: { channel, accountId, mediaUrls: payloadSummary.mediaUrls },
},
{
channelId: channel,
accountId: accountId ?? undefined,
},
);
if (sendingResult?.cancel) {
continue;
}
if (sendingResult?.content != null) {
effectivePayload = { ...payload, text: sendingResult.content };
payloadSummary.text = sendingResult.content;
}
} catch {
// Don't block delivery on hook failure
}
const hookResult = await applyMessageSendingHook({
hookRunner,
enabled: hasMessageSendingHooks,
payload,
payloadSummary,
to,
channel,
accountId,
});
if (hookResult.cancelled) {
continue;
}
const effectivePayload = hookResult.payload;
payloadSummary = hookResult.payloadSummary;
params.onPayload?.(payloadSummary);
const sendOverrides = {

View File

@@ -79,6 +79,10 @@ function unwrapQuoted(value: string): string | undefined {
return trimmed.slice(1, -1).trim();
}
function mayContainFenceMarkers(input: string): boolean {
return input.includes("```") || input.includes("~~~");
}
// Check if a character offset is inside any fenced code block
function isInsideFence(fenceSpans: Array<{ start: number; end: number }>, offset: number): boolean {
return fenceSpans.some((span) => offset >= span.start && offset < span.end);
@@ -106,7 +110,8 @@ export function splitMediaFromOutput(raw: string): {
let foundMediaToken = false;
// Parse fenced code blocks to avoid extracting MEDIA tokens from inside them
const fenceSpans = parseFenceSpans(trimmedRaw);
const hasFenceMarkers = mayContainFenceMarkers(trimmedRaw);
const fenceSpans = hasFenceMarkers ? parseFenceSpans(trimmedRaw) : [];
// Collect tokens line by line so we can strip them cleanly.
const lines = trimmedRaw.split("\n");
@@ -115,7 +120,7 @@ export function splitMediaFromOutput(raw: string): {
let lineOffset = 0; // Track character offset for fence checking
for (const line of lines) {
// Skip MEDIA extraction if this line is inside a fenced code block
if (isInsideFence(fenceSpans, lineOffset)) {
if (hasFenceMarkers && isInsideFence(fenceSpans, lineOffset)) {
keptLines.push(line);
lineOffset += line.length + 1; // +1 for newline
continue;

View File

@@ -5,6 +5,7 @@ const REGISTRY_STATE = Symbol.for("openclaw.pluginRegistryState");
type RegistryState = {
registry: PluginRegistry | null;
key: string | null;
version: number;
};
const state: RegistryState = (() => {
@@ -15,6 +16,7 @@ const state: RegistryState = (() => {
globalState[REGISTRY_STATE] = {
registry: createEmptyPluginRegistry(),
key: null,
version: 0,
};
}
return globalState[REGISTRY_STATE];
@@ -23,6 +25,7 @@ const state: RegistryState = (() => {
export function setActivePluginRegistry(registry: PluginRegistry, cacheKey?: string) {
state.registry = registry;
state.key = cacheKey ?? null;
state.version += 1;
}
export function getActivePluginRegistry(): PluginRegistry | null {
@@ -32,6 +35,7 @@ export function getActivePluginRegistry(): PluginRegistry | null {
export function requireActivePluginRegistry(): PluginRegistry {
if (!state.registry) {
state.registry = createEmptyPluginRegistry();
state.version += 1;
}
return state.registry;
}
@@ -39,3 +43,7 @@ export function requireActivePluginRegistry(): PluginRegistry {
export function getActivePluginRegistryKey(): string | null {
return state.key;
}
export function getActivePluginRegistryVersion(): number {
return state.version;
}