refactor: deduplicate reply payload handling

This commit is contained in:
Peter Steinberger
2026-03-18 18:14:36 +00:00
parent 152d179302
commit 62edfdffbd
58 changed files with 704 additions and 450 deletions

View File

@@ -1,3 +1,4 @@
import { hasOutboundReplyContent } from "../plugin-sdk/reply-payload.js";
import type { ReplyPayload } from "./types.js";
export function resolveHeartbeatReplyPayload(
@@ -14,7 +15,7 @@ export function resolveHeartbeatReplyPayload(
if (!payload) {
continue;
}
if (payload.text || payload.mediaUrl || (payload.mediaUrls && payload.mediaUrls.length > 0)) {
if (hasOutboundReplyContent(payload)) {
return payload;
}
}

View File

@@ -23,6 +23,7 @@ import {
} from "../../config/sessions.js";
import { logVerbose } from "../../globals.js";
import { emitAgentEvent, registerAgentRunContext } from "../../infra/agent-events.js";
import { resolveSendableOutboundReplyParts } from "../../plugin-sdk/reply-payload.js";
import { defaultRuntime } from "../../runtime.js";
import {
isMarkdownCapableMessageChannel,
@@ -148,6 +149,7 @@ export async function runAgentTurnWithFallback(params: {
try {
const normalizeStreamingText = (payload: ReplyPayload): { text?: string; skip: boolean } => {
let text = payload.text;
const reply = resolveSendableOutboundReplyParts(payload);
if (!params.isHeartbeat && text?.includes("HEARTBEAT_OK")) {
const stripped = stripHeartbeatToken(text, {
mode: "message",
@@ -156,7 +158,7 @@ export async function runAgentTurnWithFallback(params: {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
if (stripped.shouldSkip && (payload.mediaUrls?.length ?? 0) === 0) {
if (stripped.shouldSkip && !reply.hasMedia) {
return { skip: true };
}
text = stripped.text;
@@ -172,7 +174,7 @@ export async function runAgentTurnWithFallback(params: {
}
if (!text) {
// Allow media-only payloads (e.g. tool result screenshots) through.
if ((payload.mediaUrls?.length ?? 0) > 0) {
if (reply.hasMedia) {
return { text: undefined, skip: false };
}
return { skip: true };

View File

@@ -1,5 +1,9 @@
import { loadSessionStore } from "../../config/sessions.js";
import { isAudioFileName } from "../../media/mime.js";
import {
hasOutboundReplyContent,
resolveSendableOutboundReplyParts,
} from "../../plugin-sdk/reply-payload.js";
import { normalizeVerboseLevel, type VerboseLevel } from "../thinking.js";
import type { ReplyPayload } from "../types.js";
import { scheduleFollowupDrain } from "./queue.js";
@@ -9,7 +13,7 @@ const hasAudioMedia = (urls?: string[]): boolean =>
Boolean(urls?.some((url) => isAudioFileName(url)));
// Reports whether a reply payload carries audio media by handing its media URL
// list to hasAudioMedia.
// NOTE(review): the two hasAudioMedia(...) lines below look like the removed and
// added sides of a diff whose +/- markers were stripped — confirm against the
// repository before treating both as live code.
export const isAudioPayload = (payload: ReplyPayload): boolean =>
// First form: `??` falls back to the single mediaUrl only when mediaUrls is
// null/undefined, so an empty mediaUrls array suppresses the fallback.
hasAudioMedia(payload.mediaUrls ?? (payload.mediaUrl ? [payload.mediaUrl] : undefined));
// Second form: reads `.mediaUrls` from resolveSendableOutboundReplyParts(payload).
// Whether that helper treats an empty mediaUrls array the same way is not
// visible here — TODO confirm.
hasAudioMedia(resolveSendableOutboundReplyParts(payload).mediaUrls);
type VerboseGateParams = {
sessionKey?: string;
@@ -63,19 +67,9 @@ export const signalTypingIfNeeded = async (
payloads: ReplyPayload[],
typingSignals: TypingSignaler,
): Promise<void> => {
const shouldSignalTyping = payloads.some((payload) => {
const trimmed = payload.text?.trim();
if (trimmed) {
return true;
}
if (payload.mediaUrl) {
return true;
}
if (payload.mediaUrls && payload.mediaUrls.length > 0) {
return true;
}
return false;
});
const shouldSignalTyping = payloads.some((payload) =>
hasOutboundReplyContent(payload, { trimText: true }),
);
if (shouldSignalTyping) {
await typingSignals.signalRunStart();
}

View File

@@ -1,5 +1,6 @@
import type { ReplyToMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { resolveSendableOutboundReplyParts } from "../../plugin-sdk/reply-payload.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import type { OriginatingChannelType } from "../templating.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
@@ -20,15 +21,11 @@ import {
shouldSuppressMessagingToolReplies,
} from "./reply-payloads.js";
/**
 * Tells whether a reply payload references any media.
 *
 * @param payload - Reply payload to inspect.
 * @returns `true` when `mediaUrl` is truthy or `mediaUrls` has at least one entry.
 */
function hasPayloadMedia(payload: ReplyPayload): boolean {
  if (payload.mediaUrl) {
    return true;
  }
  // A missing list counts as zero entries.
  const listedCount = payload.mediaUrls?.length ?? 0;
  return listedCount > 0;
}
async function normalizeReplyPayloadMedia(params: {
payload: ReplyPayload;
normalizeMediaPaths?: (payload: ReplyPayload) => Promise<ReplyPayload>;
}): Promise<ReplyPayload> {
if (!params.normalizeMediaPaths || !hasPayloadMedia(params.payload)) {
if (!params.normalizeMediaPaths || !resolveSendableOutboundReplyParts(params.payload).hasMedia) {
return params.payload;
}
@@ -69,11 +66,7 @@ async function normalizeSentMediaUrlsForDedupe(params: {
mediaUrl: trimmed,
mediaUrls: [trimmed],
});
const normalizedMediaUrls = normalized.mediaUrls?.length
? normalized.mediaUrls
: normalized.mediaUrl
? [normalized.mediaUrl]
: [];
const normalizedMediaUrls = resolveSendableOutboundReplyParts(normalized).mediaUrls;
for (const mediaUrl of normalizedMediaUrls) {
const candidate = mediaUrl.trim();
if (!candidate || seen.has(candidate)) {
@@ -130,7 +123,7 @@ export async function buildReplyPayloads(params: {
didLogHeartbeatStrip = true;
logVerbose("Stripped stray HEARTBEAT_OK token from reply");
}
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const hasMedia = resolveSendableOutboundReplyParts(payload).hasMedia;
if (stripped.shouldSkip && !hasMedia) {
return [];
}

View File

@@ -1,3 +1,4 @@
import { resolveSendableOutboundReplyParts } from "../../plugin-sdk/reply-payload.js";
import type { ReplyPayload } from "../types.js";
import type { BlockStreamingCoalescing } from "./block-streaming.js";
@@ -75,9 +76,10 @@ export function createBlockReplyCoalescer(params: {
if (shouldAbort()) {
return;
}
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const text = payload.text ?? "";
const hasText = text.trim().length > 0;
const reply = resolveSendableOutboundReplyParts(payload);
const hasMedia = reply.hasMedia;
const text = reply.text;
const hasText = reply.hasText;
if (hasMedia) {
void flush({ force: true });
void onFlush(payload);

View File

@@ -1,4 +1,5 @@
import { logVerbose } from "../../globals.js";
import { resolveSendableOutboundReplyParts } from "../../plugin-sdk/reply-payload.js";
import type { ReplyPayload } from "../types.js";
import { createBlockReplyCoalescer } from "./block-reply-coalescer.js";
import type { BlockStreamingCoalescing } from "./block-streaming.js";
@@ -35,30 +36,20 @@ export function createAudioAsVoiceBuffer(params: {
}
// Builds a string key for a block reply payload by JSON-serialising its text,
// media list and replyToId (null when replyToId is absent).
// NOTE(review): this body interleaves what look like the removed and added lines
// of a diff with +/- markers stripped (two sets of locals, duplicated `text` /
// `mediaList` keys in the object literal) — confirm which lines are current.
export function createBlockReplyPayloadKey(payload: ReplyPayload): string {
// First form: trimmed text, defaulting to "" when text is absent.
const text = payload.text?.trim() ?? "";
// First form: a non-empty mediaUrls wins; otherwise the single mediaUrl is
// wrapped in an array; otherwise the list is empty.
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
// Second form: the same two values are read from the shared helper's result.
// Presumably `trimmedText` / `mediaUrls` match the expressions above — TODO confirm.
const reply = resolveSendableOutboundReplyParts(payload);
return JSON.stringify({
text,
mediaList,
text: reply.trimmedText,
mediaList: reply.mediaUrls,
replyToId: payload.replyToId ?? null,
});
}
// Builds a string key from a payload's text and media list only.
// NOTE(review): this body interleaves what look like the removed and added lines
// of a diff with +/- markers stripped (two sets of locals, two return
// statements) — confirm which lines are current.
export function createBlockReplyContentKey(payload: ReplyPayload): string {
// First form: trimmed text, defaulting to "" when text is absent.
const text = payload.text?.trim() ?? "";
// First form: a non-empty mediaUrls wins; otherwise the single mediaUrl is
// wrapped in an array; otherwise the list is empty.
const mediaList = payload.mediaUrls?.length
? payload.mediaUrls
: payload.mediaUrl
? [payload.mediaUrl]
: [];
// Second form: the same two values are read from the shared helper's result.
// Presumably `trimmedText` / `mediaUrls` match the expressions above — TODO confirm.
const reply = resolveSendableOutboundReplyParts(payload);
// Content-only key used for final-payload suppression after block streaming.
// This intentionally ignores replyToId so a streamed threaded payload and the
// later final payload still collapse when they carry the same content.
return JSON.stringify({ text, mediaList });
return JSON.stringify({ text: reply.trimmedText, mediaList: reply.mediaUrls });
}
const withTimeout = async <T>(
@@ -217,7 +208,7 @@ export function createBlockReplyPipeline(params: {
if (bufferPayload(payload)) {
return;
}
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const hasMedia = resolveSendableOutboundReplyParts(payload).hasMedia;
if (hasMedia) {
void coalescer?.flush({ force: true });
sendPayload(payload, /* bypassSeenCheck */ false);

View File

@@ -2,6 +2,7 @@ import type { OpenClawConfig } from "../../config/config.js";
import type { TtsAutoMode } from "../../config/types.tts.js";
import { logVerbose } from "../../globals.js";
import { runMessageAction } from "../../infra/outbound/message-action-runner.js";
import { hasOutboundReplyContent } from "../../plugin-sdk/reply-payload.js";
import { maybeApplyTtsToPayload } from "../../tts/tts.js";
import type { FinalizedMsgContext } from "../templating.js";
import type { ReplyPayload } from "../types.js";
@@ -127,7 +128,7 @@ export function createAcpDispatchDeliveryCoordinator(params: {
state.blockCount += 1;
}
if ((payload.text?.trim() ?? "").length > 0 || payload.mediaUrl || payload.mediaUrls?.length) {
if (hasOutboundReplyContent(payload, { trimText: true })) {
await startReplyLifecycleOnce();
}

View File

@@ -29,6 +29,7 @@ import {
logMessageQueued,
logSessionStateChange,
} from "../../logging/diagnostic.js";
import { resolveSendableOutboundReplyParts } from "../../plugin-sdk/reply-payload.js";
import {
buildPluginBindingDeclinedText,
buildPluginBindingErrorText,
@@ -532,7 +533,7 @@ export async function dispatchReplyFromConfig(params: {
}
// Group/native flows intentionally suppress tool summary text, but media-only
// tool results (for example TTS audio) must still be delivered.
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const hasMedia = resolveSendableOutboundReplyParts(payload).hasMedia;
if (!hasMedia) {
return null;
}

View File

@@ -9,6 +9,10 @@ import type { SessionEntry } from "../../config/sessions.js";
import type { TypingMode } from "../../config/types.js";
import { logVerbose } from "../../globals.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import {
hasOutboundReplyContent,
resolveSendableOutboundReplyParts,
} from "../../plugin-sdk/reply-payload.js";
import { defaultRuntime } from "../../runtime.js";
import { isInternalMessageChannel } from "../../utils/message-channel.js";
import { stripHeartbeatToken } from "../heartbeat.js";
@@ -81,13 +85,12 @@ export function createFollowupRunner(params: {
}
for (const payload of payloads) {
if (!payload?.text && !payload?.mediaUrl && !payload?.mediaUrls?.length) {
if (!payload || !hasOutboundReplyContent(payload)) {
continue;
}
if (
isSilentReplyText(payload.text, SILENT_REPLY_TOKEN) &&
!payload.mediaUrl &&
!payload.mediaUrls?.length
!resolveSendableOutboundReplyParts(payload).hasMedia
) {
continue;
}
@@ -289,7 +292,7 @@ export function createFollowupRunner(params: {
return [payload];
}
const stripped = stripHeartbeatToken(text, { mode: "message" });
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
const hasMedia = resolveSendableOutboundReplyParts(payload).hasMedia;
if (stripped.shouldSkip && !hasMedia) {
return [];
}

View File

@@ -1,5 +1,5 @@
import { sanitizeUserFacingText } from "../../agents/pi-embedded-helpers.js";
import { hasReplyChannelData, hasReplyContent } from "../../interactive/payload.js";
import { hasReplyPayloadContent } from "../../interactive/payload.js";
import { stripHeartbeatToken } from "../heartbeat.js";
import {
HEARTBEAT_TOKEN,
@@ -32,17 +32,18 @@ export function normalizeReplyPayload(
payload: ReplyPayload,
opts: NormalizeReplyOptions = {},
): ReplyPayload | null {
const hasChannelData = hasReplyChannelData(payload.channelData);
const hasContent = (text: string | undefined) =>
hasReplyPayloadContent(
{
...payload,
text,
},
{
trimText: true,
},
);
const trimmed = payload.text?.trim() ?? "";
if (
!hasReplyContent({
text: trimmed,
mediaUrl: payload.mediaUrl,
mediaUrls: payload.mediaUrls,
interactive: payload.interactive,
hasChannelData,
})
) {
if (!hasContent(trimmed)) {
opts.onSkip?.("empty");
return null;
}
@@ -50,14 +51,7 @@ export function normalizeReplyPayload(
const silentToken = opts.silentToken ?? SILENT_REPLY_TOKEN;
let text = payload.text ?? undefined;
if (text && isSilentReplyText(text, silentToken)) {
if (
!hasReplyContent({
mediaUrl: payload.mediaUrl,
mediaUrls: payload.mediaUrls,
interactive: payload.interactive,
hasChannelData,
})
) {
if (!hasContent("")) {
opts.onSkip?.("silent");
return null;
}
@@ -68,15 +62,7 @@ export function normalizeReplyPayload(
// silent just like the exact-match path above. (#30916, #30955)
if (text && text.includes(silentToken) && !isSilentReplyText(text, silentToken)) {
text = stripSilentToken(text, silentToken);
if (
!hasReplyContent({
text,
mediaUrl: payload.mediaUrl,
mediaUrls: payload.mediaUrls,
interactive: payload.interactive,
hasChannelData,
})
) {
if (!hasContent(text)) {
opts.onSkip?.("silent");
return null;
}
@@ -92,16 +78,7 @@ export function normalizeReplyPayload(
if (stripped.didStrip) {
opts.onHeartbeatStrip?.();
}
if (
stripped.shouldSkip &&
!hasReplyContent({
text: stripped.text,
mediaUrl: payload.mediaUrl,
mediaUrls: payload.mediaUrls,
interactive: payload.interactive,
hasChannelData,
})
) {
if (stripped.shouldSkip && !hasContent(stripped.text)) {
opts.onSkip?.("heartbeat");
return null;
}
@@ -111,15 +88,7 @@ export function normalizeReplyPayload(
if (text) {
text = sanitizeUserFacingText(text, { errorContext: Boolean(payload.isError) });
}
if (
!hasReplyContent({
text,
mediaUrl: payload.mediaUrl,
mediaUrls: payload.mediaUrls,
interactive: payload.interactive,
hasChannelData,
})
) {
if (!hasContent(text)) {
opts.onSkip?.("empty");
return null;
}

View File

@@ -1,4 +1,5 @@
import { logVerbose } from "../../globals.js";
import { resolveSendableOutboundReplyParts } from "../../plugin-sdk/reply-payload.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import type { BlockReplyContext, ReplyPayload } from "../types.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
@@ -57,9 +58,6 @@ export function normalizeReplyPayloadDirectives(params: {
};
}
/**
 * Tells whether a reply payload references any media.
 *
 * @param payload - Reply payload to inspect.
 * @returns `true` when `mediaUrl` is truthy or `mediaUrls` has at least one entry.
 */
const hasRenderableMedia = (payload: ReplyPayload): boolean => {
  if (payload.mediaUrl) {
    return true;
  }
  // A missing list counts as zero entries.
  const listedCount = payload.mediaUrls?.length ?? 0;
  return listedCount > 0;
};
export function createBlockReplyDeliveryHandler(params: {
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
currentMessageId?: string;
@@ -73,7 +71,7 @@ export function createBlockReplyDeliveryHandler(params: {
}): (payload: ReplyPayload) => Promise<void> {
return async (payload) => {
const { text, skip } = params.normalizeStreamingText(payload);
if (skip && !hasRenderableMedia(payload)) {
if (skip && !resolveSendableOutboundReplyParts(payload).hasMedia) {
return;
}
@@ -106,7 +104,7 @@ export function createBlockReplyDeliveryHandler(params: {
? await params.normalizeMediaPaths(normalized.payload)
: normalized.payload;
const blockPayload = params.applyReplyToMode(mediaNormalizedPayload);
const blockHasMedia = hasRenderableMedia(blockPayload);
const blockHasMedia = resolveSendableOutboundReplyParts(blockPayload).hasMedia;
// Skip empty payloads unless they have audioAsVoice flag (need to track it).
if (!blockPayload.text && !blockHasMedia && !blockPayload.audioAsVoice) {

View File

@@ -2,6 +2,7 @@ import { resolvePathFromInput } from "../../agents/path-policy.js";
import { assertMediaNotDataUrl, resolveSandboxedMediaSource } from "../../agents/sandbox-paths.js";
import { ensureSandboxWorkspaceForSession } from "../../agents/sandbox.js";
import type { OpenClawConfig } from "../../config/config.js";
import { resolveSendableOutboundReplyParts } from "../../plugin-sdk/reply-payload.js";
import type { ReplyPayload } from "../types.js";
const HTTP_URL_RE = /^https?:\/\//i;
@@ -25,7 +26,7 @@ function isLikelyLocalMediaSource(media: string): boolean {
}
// Returns the list of media URLs carried by a reply payload.
// NOTE(review): the two return statements look like the removed and added sides
// of a diff whose +/- markers were stripped — confirm which one is current.
function getPayloadMediaList(payload: ReplyPayload): string[] {
// First form: a non-empty mediaUrls wins; otherwise the single mediaUrl is
// wrapped in an array; otherwise the result is an empty array.
return payload.mediaUrls?.length ? payload.mediaUrls : payload.mediaUrl ? [payload.mediaUrl] : [];
// Second form: reads `.mediaUrls` from the shared helper's result. Presumably
// it applies the same precedence — TODO confirm against reply-payload.js.
return resolveSendableOutboundReplyParts(payload).mediaUrls;
}
export function createReplyMediaPathNormalizer(params: {

View File

@@ -4,7 +4,7 @@ import { normalizeChannelId } from "../../channels/plugins/index.js";
import { parseExplicitTargetForChannel } from "../../channels/plugins/target-parsing.js";
import type { ReplyToMode } from "../../config/types.js";
import { normalizeTargetForProvider } from "../../infra/outbound/target-normalization.js";
import { hasReplyChannelData, hasReplyContent } from "../../interactive/payload.js";
import { hasReplyPayloadContent } from "../../interactive/payload.js";
import { normalizeOptionalAccountId } from "../../routing/account-id.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
@@ -75,14 +75,7 @@ export function applyReplyTagsToPayload(
}
// Reports whether a reply payload has anything to render; audioAsVoice is passed
// as extraContent in both forms below.
// NOTE(review): the two return statements look like the removed and added sides
// of a diff whose +/- markers were stripped — confirm which one is current.
export function isRenderablePayload(payload: ReplyPayload): boolean {
// First form: every content field is passed explicitly, and the channelData
// check is precomputed with hasReplyChannelData.
return hasReplyContent({
text: payload.text,
mediaUrl: payload.mediaUrl,
mediaUrls: payload.mediaUrls,
interactive: payload.interactive,
hasChannelData: hasReplyChannelData(payload.channelData),
extraContent: payload.audioAsVoice,
});
// Second form: the whole payload is handed over. Presumably
// hasReplyPayloadContent derives the channelData check itself — TODO confirm.
return hasReplyPayloadContent(payload, { extraContent: payload.audioAsVoice });
}
export function shouldSuppressReasoningPayload(payload: ReplyPayload): boolean {

View File

@@ -12,7 +12,7 @@ import { resolveEffectiveMessagesConfig } from "../../agents/identity.js";
import { getChannelPlugin, normalizeChannelId } from "../../channels/plugins/index.js";
import type { OpenClawConfig } from "../../config/config.js";
import { buildOutboundSessionContext } from "../../infra/outbound/session-context.js";
import { hasReplyContent } from "../../interactive/payload.js";
import { hasReplyPayloadContent } from "../../interactive/payload.js";
import { INTERNAL_MESSAGE_CHANNEL, normalizeMessageChannel } from "../../utils/message-channel.js";
import type { OriginatingChannelType } from "../templating.js";
import type { ReplyPayload } from "../types.js";
@@ -126,12 +126,16 @@ export async function routeReply(params: RouteReplyParams): Promise<RouteReplyRe
// Skip empty replies.
if (
!hasReplyContent({
text,
mediaUrls,
interactive: externalPayload.interactive,
hasChannelData,
})
!hasReplyPayloadContent(
{
...externalPayload,
text,
mediaUrls,
},
{
hasChannelData,
},
)
) {
return { ok: true };
}

View File

@@ -1,4 +1,5 @@
import { splitMediaFromOutput } from "../../media/parse.js";
import { hasOutboundReplyContent } from "../../plugin-sdk/reply-payload.js";
import { parseInlineDirectives } from "../../utils/directive-tags.js";
import { isSilentReplyPrefixText, isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { ReplyDirectiveParseResult } from "./reply-directives.js";
@@ -67,10 +68,7 @@ const parseChunk = (raw: string, options?: { silentToken?: string }): ParsedChun
};
// Reports whether a parsed directive result has anything to render; a truthy
// audioAsVoice counts as content in both forms below.
// NOTE(review): the four-line boolean chain and the single
// hasOutboundReplyContent(...) line look like the removed and added sides of a
// diff whose +/- markers were stripped — confirm which one is current.
const hasRenderableContent = (parsed: ReplyDirectiveParseResult): boolean =>
// First form: truthy text, truthy mediaUrl, or a non-empty mediaUrls list.
Boolean(parsed.text) ||
Boolean(parsed.mediaUrl) ||
(parsed.mediaUrls?.length ?? 0) > 0 ||
Boolean(parsed.audioAsVoice);
// Second form: text/media checks are delegated to hasOutboundReplyContent, called
// without options. Presumably it accepts the ReplyDirectiveParseResult shape and
// does not trim text by default — TODO confirm.
hasOutboundReplyContent(parsed) || Boolean(parsed.audioAsVoice);
export function createStreamingDirectiveAccumulator() {
let pendingTail = "";