From eefb2f8fb3dd8b2d0a5f718b2f0e070149a23891 Mon Sep 17 00:00:00 2001 From: Sebastian <19554889+sebslight@users.noreply.github.com> Date: Sat, 14 Feb 2026 22:59:50 -0500 Subject: [PATCH] refactor(reply): extract block delivery normalization --- .../reply/agent-runner-execution.ts | 87 ++---------- src/auto-reply/reply/agent-runner-payloads.ts | 29 ++-- src/auto-reply/reply/reply-delivery.ts | 132 ++++++++++++++++++ 3 files changed, 155 insertions(+), 93 deletions(-) create mode 100644 src/auto-reply/reply/reply-delivery.ts diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 1fb8d219855..488d770a793 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -35,9 +35,8 @@ import { import { stripHeartbeatToken } from "../heartbeat.js"; import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js"; import { buildThreadingToolContext, resolveEnforceFinalTag } from "./agent-runner-utils.js"; -import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; -import { parseReplyDirectives } from "./reply-directives.js"; -import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js"; +import { type BlockReplyPipeline } from "./block-reply-pipeline.js"; +import { createBlockReplyDeliveryHandler } from "./reply-delivery.js"; export type AgentRunLoopResult = | { @@ -367,77 +366,17 @@ export async function runAgentTurnWithFallback(params: { // even when regular block streaming is disabled. The handler sends directly // via opts.onBlockReply when the pipeline isn't available. onBlockReply: params.opts?.onBlockReply - ? async (payload) => { - const { text, skip } = normalizeStreamingText(payload); - const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0; - if (skip && !hasPayloadMedia) { - return; - } - const currentMessageId = - params.sessionCtx.MessageSidFull ?? 
params.sessionCtx.MessageSid; - const taggedPayload = applyReplyTagsToPayload( - { - text, - mediaUrls: payload.mediaUrls, - mediaUrl: payload.mediaUrls?.[0], - replyToId: - payload.replyToId ?? - (payload.replyToCurrent === false ? undefined : currentMessageId), - replyToTag: payload.replyToTag, - replyToCurrent: payload.replyToCurrent, - }, - currentMessageId, - ); - // Let through payloads with audioAsVoice flag even if empty (need to track it) - if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) { - return; - } - const parsed = parseReplyDirectives(taggedPayload.text ?? "", { - currentMessageId, - silentToken: SILENT_REPLY_TOKEN, - }); - const cleaned = parsed.text || undefined; - const hasRenderableMedia = - Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0; - // Skip empty payloads unless they have audioAsVoice flag (need to track it) - if ( - !cleaned && - !hasRenderableMedia && - !payload.audioAsVoice && - !parsed.audioAsVoice - ) { - return; - } - if (parsed.isSilent && !hasRenderableMedia) { - return; - } - - const blockPayload: ReplyPayload = params.applyReplyToMode({ - ...taggedPayload, - text: cleaned?.trimStart(), - audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice), - replyToId: taggedPayload.replyToId ?? parsed.replyToId, - replyToTag: taggedPayload.replyToTag || parsed.replyToTag, - replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent, - }); - - void params.typingSignals - .signalTextDelta(cleaned ?? taggedPayload.text) - .catch((err) => { - logVerbose(`block reply typing signal failed: ${String(err)}`); - }); - - // Use pipeline if available (block streaming enabled), otherwise send directly - if (params.blockStreamingEnabled && params.blockReplyPipeline) { - params.blockReplyPipeline.enqueue(blockPayload); - } else if (params.blockStreamingEnabled) { - // Send directly when flushing before tool execution (no pipeline but streaming enabled). 
- // Track sent key to avoid duplicate in final payloads. - directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); - await params.opts?.onBlockReply?.(blockPayload); - } - // When streaming is disabled entirely, blocks are accumulated in final text instead. - } + ? createBlockReplyDeliveryHandler({ + onBlockReply: params.opts.onBlockReply, + currentMessageId: + params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, + normalizeStreamingText, + applyReplyToMode: params.applyReplyToMode, + typingSignals: params.typingSignals, + blockStreamingEnabled: params.blockStreamingEnabled, + blockReplyPipeline, + directlySentBlockKeys, + }) : undefined, onBlockReplyFlush: params.blockStreamingEnabled && blockReplyPipeline diff --git a/src/auto-reply/reply/agent-runner-payloads.ts b/src/auto-reply/reply/agent-runner-payloads.ts index e8aad67063b..3c2543e9cbd 100644 --- a/src/auto-reply/reply/agent-runner-payloads.ts +++ b/src/auto-reply/reply/agent-runner-payloads.ts @@ -6,7 +6,7 @@ import { stripHeartbeatToken } from "../heartbeat.js"; import { SILENT_REPLY_TOKEN } from "../tokens.js"; import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js"; import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js"; -import { parseReplyDirectives } from "./reply-directives.js"; +import { normalizeReplyPayloadDirectives } from "./reply-delivery.js"; import { applyReplyThreading, filterMessagingToolDuplicates, @@ -64,24 +64,15 @@ export function buildReplyPayloads(params: { replyToChannel: params.replyToChannel, currentMessageId: params.currentMessageId, }) - .map((payload) => { - const parsed = parseReplyDirectives(payload.text ?? "", { - currentMessageId: params.currentMessageId, - silentToken: SILENT_REPLY_TOKEN, - }); - const mediaUrls = payload.mediaUrls ?? parsed.mediaUrls; - const mediaUrl = payload.mediaUrl ?? parsed.mediaUrl ?? mediaUrls?.[0]; - return { - ...payload, - text: parsed.text ? 
parsed.text : undefined, - mediaUrls, - mediaUrl, - replyToId: payload.replyToId ?? parsed.replyToId, - replyToTag: payload.replyToTag || parsed.replyToTag, - replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent, - audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice), - }; - }) + .map( + (payload) => + normalizeReplyPayloadDirectives({ + payload, + currentMessageId: params.currentMessageId, + silentToken: SILENT_REPLY_TOKEN, + parseMode: "always", + }).payload, + ) .filter(isRenderablePayload); // Drop final payloads only when block streaming succeeded end-to-end. diff --git a/src/auto-reply/reply/reply-delivery.ts b/src/auto-reply/reply/reply-delivery.ts new file mode 100644 index 00000000000..367d5b84d93 --- /dev/null +++ b/src/auto-reply/reply/reply-delivery.ts @@ -0,0 +1,132 @@ +import type { BlockReplyContext, ReplyPayload } from "../types.js"; +import type { BlockReplyPipeline } from "./block-reply-pipeline.js"; +import type { TypingSignaler } from "./typing-mode.js"; +import { logVerbose } from "../../globals.js"; +import { SILENT_REPLY_TOKEN } from "../tokens.js"; +import { createBlockReplyPayloadKey } from "./block-reply-pipeline.js"; +import { parseReplyDirectives } from "./reply-directives.js"; +import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js"; + +export type ReplyDirectiveParseMode = "always" | "auto" | "never"; + +export function normalizeReplyPayloadDirectives(params: { + payload: ReplyPayload; + currentMessageId?: string; + silentToken?: string; + trimLeadingWhitespace?: boolean; + parseMode?: ReplyDirectiveParseMode; +}): { payload: ReplyPayload; isSilent: boolean } { + const parseMode = params.parseMode ?? "always"; + const silentToken = params.silentToken ?? SILENT_REPLY_TOKEN; + const sourceText = params.payload.text ?? 
"";
+
+  const shouldParse =
+    parseMode === "always" ||
+    (parseMode === "auto" &&
+      (sourceText.includes("[[") ||
+        sourceText.includes("MEDIA:") ||
+        sourceText.includes(silentToken)));
+
+  const parsed = shouldParse
+    ? parseReplyDirectives(sourceText, {
+        currentMessageId: params.currentMessageId,
+        silentToken,
+      })
+    : undefined;
+
+  let text = parsed ? parsed.text || undefined : params.payload.text || undefined;
+  if (params.trimLeadingWhitespace && text) {
+    text = text.trimStart() || undefined;
+  }
+
+  const mediaUrls = params.payload.mediaUrls ?? parsed?.mediaUrls;
+  const mediaUrl = params.payload.mediaUrl ?? parsed?.mediaUrl ?? mediaUrls?.[0];
+
+  return {
+    payload: {
+      ...params.payload,
+      text,
+      mediaUrls,
+      mediaUrl,
+      replyToId: params.payload.replyToId ?? parsed?.replyToId,
+      replyToTag: params.payload.replyToTag || parsed?.replyToTag,
+      replyToCurrent: params.payload.replyToCurrent || parsed?.replyToCurrent,
+      audioAsVoice: Boolean(params.payload.audioAsVoice || parsed?.audioAsVoice),
+    },
+    isSilent: parsed?.isSilent ?? false,
+  };
+}
+
+const hasRenderableMedia = (payload: ReplyPayload): boolean =>
+  Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
+
+export function createBlockReplyDeliveryHandler(params: {
+  onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
+  currentMessageId?: string;
+  normalizeStreamingText: (payload: ReplyPayload) => { text?: string; skip: boolean };
+  applyReplyToMode: (payload: ReplyPayload) => ReplyPayload;
+  typingSignals: TypingSignaler;
+  blockStreamingEnabled: boolean;
+  blockReplyPipeline: BlockReplyPipeline | null;
+  directlySentBlockKeys: Set<string>;
+}): (payload: ReplyPayload) => Promise<void> {
+  return async (payload) => {
+    const { text, skip } = params.normalizeStreamingText(payload);
+    if (skip && !hasRenderableMedia(payload)) {
+      return;
+    }
+
+    const taggedPayload = applyReplyTagsToPayload(
+      {
+        ...payload,
+        text,
+        mediaUrl: payload.mediaUrl ?? 
payload.mediaUrls?.[0], + replyToId: + payload.replyToId ?? + (payload.replyToCurrent === false ? undefined : params.currentMessageId), + }, + params.currentMessageId, + ); + + // Let through payloads with audioAsVoice flag even if empty (need to track it). + if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) { + return; + } + + const normalized = normalizeReplyPayloadDirectives({ + payload: taggedPayload, + currentMessageId: params.currentMessageId, + silentToken: SILENT_REPLY_TOKEN, + trimLeadingWhitespace: true, + parseMode: "auto", + }); + + const blockPayload = params.applyReplyToMode(normalized.payload); + const blockHasMedia = hasRenderableMedia(blockPayload); + + // Skip empty payloads unless they have audioAsVoice flag (need to track it). + if (!blockPayload.text && !blockHasMedia && !blockPayload.audioAsVoice) { + return; + } + if (normalized.isSilent && !blockHasMedia) { + return; + } + + if (blockPayload.text) { + void params.typingSignals.signalTextDelta(blockPayload.text).catch((err) => { + logVerbose(`block reply typing signal failed: ${String(err)}`); + }); + } + + // Use pipeline if available (block streaming enabled), otherwise send directly. + if (params.blockStreamingEnabled && params.blockReplyPipeline) { + params.blockReplyPipeline.enqueue(blockPayload); + } else if (params.blockStreamingEnabled) { + // Send directly when flushing before tool execution (no pipeline but streaming enabled). + // Track sent key to avoid duplicate in final payloads. + params.directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload)); + await params.onBlockReply(blockPayload); + } + // When streaming is disabled entirely, blocks are accumulated in final text instead. + }; +}