refactor(reply): extract block delivery normalization

This commit is contained in:
Sebastian
2026-02-14 22:59:50 -05:00
parent 1eb023b26c
commit eefb2f8fb3
3 changed files with 155 additions and 93 deletions

View File

@@ -35,9 +35,8 @@ import {
import { stripHeartbeatToken } from "../heartbeat.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import { buildThreadingToolContext, resolveEnforceFinalTag } from "./agent-runner-utils.js";
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import { parseReplyDirectives } from "./reply-directives.js";
import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js";
import { type BlockReplyPipeline } from "./block-reply-pipeline.js";
import { createBlockReplyDeliveryHandler } from "./reply-delivery.js";
export type AgentRunLoopResult =
| {
@@ -367,77 +366,17 @@ export async function runAgentTurnWithFallback(params: {
// even when regular block streaming is disabled. The handler sends directly
// via opts.onBlockReply when the pipeline isn't available.
onBlockReply: params.opts?.onBlockReply
? async (payload) => {
const { text, skip } = normalizeStreamingText(payload);
const hasPayloadMedia = (payload.mediaUrls?.length ?? 0) > 0;
if (skip && !hasPayloadMedia) {
return;
}
const currentMessageId =
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid;
const taggedPayload = applyReplyTagsToPayload(
{
text,
mediaUrls: payload.mediaUrls,
mediaUrl: payload.mediaUrls?.[0],
replyToId:
payload.replyToId ??
(payload.replyToCurrent === false ? undefined : currentMessageId),
replyToTag: payload.replyToTag,
replyToCurrent: payload.replyToCurrent,
},
currentMessageId,
);
// Let through payloads with audioAsVoice flag even if empty (need to track it)
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) {
return;
}
const parsed = parseReplyDirectives(taggedPayload.text ?? "", {
currentMessageId,
silentToken: SILENT_REPLY_TOKEN,
});
const cleaned = parsed.text || undefined;
const hasRenderableMedia =
Boolean(taggedPayload.mediaUrl) || (taggedPayload.mediaUrls?.length ?? 0) > 0;
// Skip empty payloads unless they have audioAsVoice flag (need to track it)
if (
!cleaned &&
!hasRenderableMedia &&
!payload.audioAsVoice &&
!parsed.audioAsVoice
) {
return;
}
if (parsed.isSilent && !hasRenderableMedia) {
return;
}
const blockPayload: ReplyPayload = params.applyReplyToMode({
...taggedPayload,
text: cleaned?.trimStart(),
audioAsVoice: Boolean(parsed.audioAsVoice || payload.audioAsVoice),
replyToId: taggedPayload.replyToId ?? parsed.replyToId,
replyToTag: taggedPayload.replyToTag || parsed.replyToTag,
replyToCurrent: taggedPayload.replyToCurrent || parsed.replyToCurrent,
});
void params.typingSignals
.signalTextDelta(cleaned ?? taggedPayload.text)
.catch((err) => {
logVerbose(`block reply typing signal failed: ${String(err)}`);
});
// Use pipeline if available (block streaming enabled), otherwise send directly
if (params.blockStreamingEnabled && params.blockReplyPipeline) {
params.blockReplyPipeline.enqueue(blockPayload);
} else if (params.blockStreamingEnabled) {
// Send directly when flushing before tool execution (no pipeline but streaming enabled).
// Track sent key to avoid duplicate in final payloads.
directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
await params.opts?.onBlockReply?.(blockPayload);
}
// When streaming is disabled entirely, blocks are accumulated in final text instead.
}
? createBlockReplyDeliveryHandler({
onBlockReply: params.opts.onBlockReply,
currentMessageId:
params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid,
normalizeStreamingText,
applyReplyToMode: params.applyReplyToMode,
typingSignals: params.typingSignals,
blockStreamingEnabled: params.blockStreamingEnabled,
blockReplyPipeline,
directlySentBlockKeys,
})
: undefined,
onBlockReplyFlush:
params.blockStreamingEnabled && blockReplyPipeline

View File

@@ -6,7 +6,7 @@ import { stripHeartbeatToken } from "../heartbeat.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import { formatBunFetchSocketError, isBunFetchSocketError } from "./agent-runner-utils.js";
import { createBlockReplyPayloadKey, type BlockReplyPipeline } from "./block-reply-pipeline.js";
import { parseReplyDirectives } from "./reply-directives.js";
import { normalizeReplyPayloadDirectives } from "./reply-delivery.js";
import {
applyReplyThreading,
filterMessagingToolDuplicates,
@@ -64,24 +64,15 @@ export function buildReplyPayloads(params: {
replyToChannel: params.replyToChannel,
currentMessageId: params.currentMessageId,
})
.map((payload) => {
const parsed = parseReplyDirectives(payload.text ?? "", {
currentMessageId: params.currentMessageId,
silentToken: SILENT_REPLY_TOKEN,
});
const mediaUrls = payload.mediaUrls ?? parsed.mediaUrls;
const mediaUrl = payload.mediaUrl ?? parsed.mediaUrl ?? mediaUrls?.[0];
return {
...payload,
text: parsed.text ? parsed.text : undefined,
mediaUrls,
mediaUrl,
replyToId: payload.replyToId ?? parsed.replyToId,
replyToTag: payload.replyToTag || parsed.replyToTag,
replyToCurrent: payload.replyToCurrent || parsed.replyToCurrent,
audioAsVoice: Boolean(payload.audioAsVoice || parsed.audioAsVoice),
};
})
.map(
(payload) =>
normalizeReplyPayloadDirectives({
payload,
currentMessageId: params.currentMessageId,
silentToken: SILENT_REPLY_TOKEN,
parseMode: "always",
}).payload,
)
.filter(isRenderablePayload);
// Drop final payloads only when block streaming succeeded end-to-end.

View File

@@ -0,0 +1,132 @@
import type { BlockReplyContext, ReplyPayload } from "../types.js";
import type { BlockReplyPipeline } from "./block-reply-pipeline.js";
import type { TypingSignaler } from "./typing-mode.js";
import { logVerbose } from "../../globals.js";
import { SILENT_REPLY_TOKEN } from "../tokens.js";
import { createBlockReplyPayloadKey } from "./block-reply-pipeline.js";
import { parseReplyDirectives } from "./reply-directives.js";
import { applyReplyTagsToPayload, isRenderablePayload } from "./reply-payloads.js";
export type ReplyDirectiveParseMode = "always" | "auto" | "never";
export function normalizeReplyPayloadDirectives(params: {
payload: ReplyPayload;
currentMessageId?: string;
silentToken?: string;
trimLeadingWhitespace?: boolean;
parseMode?: ReplyDirectiveParseMode;
}): { payload: ReplyPayload; isSilent: boolean } {
const parseMode = params.parseMode ?? "always";
const silentToken = params.silentToken ?? SILENT_REPLY_TOKEN;
const sourceText = params.payload.text ?? "";
const shouldParse =
parseMode === "always" ||
(parseMode === "auto" &&
(sourceText.includes("[[") ||
sourceText.includes("MEDIA:") ||
sourceText.includes(silentToken)));
const parsed = shouldParse
? parseReplyDirectives(sourceText, {
currentMessageId: params.currentMessageId,
silentToken,
})
: undefined;
let text = parsed ? parsed.text || undefined : params.payload.text || undefined;
if (params.trimLeadingWhitespace && text) {
text = text.trimStart() || undefined;
}
const mediaUrls = params.payload.mediaUrls ?? parsed?.mediaUrls;
const mediaUrl = params.payload.mediaUrl ?? parsed?.mediaUrl ?? mediaUrls?.[0];
return {
payload: {
...params.payload,
text,
mediaUrls,
mediaUrl,
replyToId: params.payload.replyToId ?? parsed?.replyToId,
replyToTag: params.payload.replyToTag || parsed?.replyToTag,
replyToCurrent: params.payload.replyToCurrent || parsed?.replyToCurrent,
audioAsVoice: Boolean(params.payload.audioAsVoice || parsed?.audioAsVoice),
},
isSilent: parsed?.isSilent ?? false,
};
}
const hasRenderableMedia = (payload: ReplyPayload): boolean =>
Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
export function createBlockReplyDeliveryHandler(params: {
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => Promise<void> | void;
currentMessageId?: string;
normalizeStreamingText: (payload: ReplyPayload) => { text?: string; skip: boolean };
applyReplyToMode: (payload: ReplyPayload) => ReplyPayload;
typingSignals: TypingSignaler;
blockStreamingEnabled: boolean;
blockReplyPipeline: BlockReplyPipeline | null;
directlySentBlockKeys: Set<string>;
}): (payload: ReplyPayload) => Promise<void> {
return async (payload) => {
const { text, skip } = params.normalizeStreamingText(payload);
if (skip && !hasRenderableMedia(payload)) {
return;
}
const taggedPayload = applyReplyTagsToPayload(
{
...payload,
text,
mediaUrl: payload.mediaUrl ?? payload.mediaUrls?.[0],
replyToId:
payload.replyToId ??
(payload.replyToCurrent === false ? undefined : params.currentMessageId),
},
params.currentMessageId,
);
// Let through payloads with audioAsVoice flag even if empty (need to track it).
if (!isRenderablePayload(taggedPayload) && !payload.audioAsVoice) {
return;
}
const normalized = normalizeReplyPayloadDirectives({
payload: taggedPayload,
currentMessageId: params.currentMessageId,
silentToken: SILENT_REPLY_TOKEN,
trimLeadingWhitespace: true,
parseMode: "auto",
});
const blockPayload = params.applyReplyToMode(normalized.payload);
const blockHasMedia = hasRenderableMedia(blockPayload);
// Skip empty payloads unless they have audioAsVoice flag (need to track it).
if (!blockPayload.text && !blockHasMedia && !blockPayload.audioAsVoice) {
return;
}
if (normalized.isSilent && !blockHasMedia) {
return;
}
if (blockPayload.text) {
void params.typingSignals.signalTextDelta(blockPayload.text).catch((err) => {
logVerbose(`block reply typing signal failed: ${String(err)}`);
});
}
// Use pipeline if available (block streaming enabled), otherwise send directly.
if (params.blockStreamingEnabled && params.blockReplyPipeline) {
params.blockReplyPipeline.enqueue(blockPayload);
} else if (params.blockStreamingEnabled) {
// Send directly when flushing before tool execution (no pipeline but streaming enabled).
// Track sent key to avoid duplicate in final payloads.
params.directlySentBlockKeys.add(createBlockReplyPayloadKey(blockPayload));
await params.onBlockReply(blockPayload);
}
// When streaming is disabled entirely, blocks are accumulated in final text instead.
};
}