mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-04 03:50:23 +00:00
fix: split telegram reasoning and answer draft streams (#20774)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 7458444144
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
@@ -10,11 +10,13 @@ import { EmbeddedBlockChunker } from "../agents/pi-embedded-block-chunker.js";
|
||||
import { resolveChunkMode } from "../auto-reply/chunk.js";
|
||||
import { clearHistoryEntriesIfEnabled } from "../auto-reply/reply/history.js";
|
||||
import { dispatchReplyWithBufferedBlockDispatcher } from "../auto-reply/reply/provider-dispatcher.js";
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
import { removeAckReactionAfterReply } from "../channels/ack-reactions.js";
|
||||
import { logAckFailure, logTypingFailure } from "../channels/logging.js";
|
||||
import { createReplyPrefixOptions } from "../channels/reply-prefix.js";
|
||||
import { createTypingCallbacks } from "../channels/typing.js";
|
||||
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
|
||||
import { loadSessionStore, resolveStorePath } from "../config/sessions.js";
|
||||
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
|
||||
import { danger, logVerbose } from "../globals.js";
|
||||
import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js";
|
||||
@@ -26,6 +28,11 @@ import type { TelegramStreamMode } from "./bot/types.js";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js";
|
||||
import { createTelegramDraftStream } from "./draft-stream.js";
|
||||
import { renderTelegramHtmlText } from "./format.js";
|
||||
import {
|
||||
createTelegramReasoningStepState,
|
||||
splitTelegramReasoningText,
|
||||
} from "./reasoning-lane-coordinator.js";
|
||||
import { editMessageTelegram } from "./send.js";
|
||||
import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
|
||||
|
||||
@@ -60,6 +67,31 @@ type DispatchTelegramMessageParams = {
|
||||
opts: Pick<TelegramBotOptions, "token">;
|
||||
};
|
||||
|
||||
type TelegramReasoningLevel = "off" | "on" | "stream";
|
||||
|
||||
function resolveTelegramReasoningLevel(params: {
|
||||
cfg: OpenClawConfig;
|
||||
sessionKey?: string;
|
||||
agentId: string;
|
||||
}): TelegramReasoningLevel {
|
||||
const { cfg, sessionKey, agentId } = params;
|
||||
if (!sessionKey) {
|
||||
return "off";
|
||||
}
|
||||
try {
|
||||
const storePath = resolveStorePath(cfg.session?.store, { agentId });
|
||||
const store = loadSessionStore(storePath, { skipCache: true });
|
||||
const entry = store[sessionKey.toLowerCase()] ?? store[sessionKey];
|
||||
const level = entry?.reasoningLevel;
|
||||
if (level === "on" || level === "stream") {
|
||||
return level;
|
||||
}
|
||||
} catch {
|
||||
// Fall through to default.
|
||||
}
|
||||
return "off";
|
||||
}
|
||||
|
||||
export const dispatchTelegramMessage = async ({
|
||||
context,
|
||||
bot,
|
||||
@@ -90,112 +122,183 @@ export const dispatchTelegramMessage = async ({
|
||||
} = context;
|
||||
|
||||
const draftMaxChars = Math.min(textLimit, 4096);
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const renderDraftPreview = (text: string) => ({
|
||||
text: renderTelegramHtmlText(text, { tableMode }),
|
||||
parseMode: "HTML" as const,
|
||||
});
|
||||
const accountBlockStreamingEnabled =
|
||||
typeof telegramCfg.blockStreaming === "boolean"
|
||||
? telegramCfg.blockStreaming
|
||||
: cfg.agents?.defaults?.blockStreamingDefault === "on";
|
||||
const canStreamDraft = streamMode !== "off" && !accountBlockStreamingEnabled;
|
||||
const resolvedReasoningLevel = resolveTelegramReasoningLevel({
|
||||
cfg,
|
||||
sessionKey: ctxPayload.SessionKey,
|
||||
agentId: route.agentId,
|
||||
});
|
||||
const forceBlockStreamingForReasoning = resolvedReasoningLevel === "on";
|
||||
const streamReasoningDraft = resolvedReasoningLevel === "stream";
|
||||
const canStreamAnswerDraft =
|
||||
streamMode !== "off" && !accountBlockStreamingEnabled && !forceBlockStreamingForReasoning;
|
||||
const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft;
|
||||
const draftReplyToMessageId =
|
||||
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
|
||||
const draftStream = canStreamDraft
|
||||
? createTelegramDraftStream({
|
||||
api: bot.api,
|
||||
chatId,
|
||||
maxChars: draftMaxChars,
|
||||
thread: threadSpec,
|
||||
replyToMessageId: draftReplyToMessageId,
|
||||
minInitialChars: DRAFT_MIN_INITIAL_CHARS,
|
||||
log: logVerbose,
|
||||
warn: logVerbose,
|
||||
})
|
||||
: undefined;
|
||||
const draftChunking =
|
||||
draftStream && streamMode === "block"
|
||||
? resolveTelegramDraftStreamingChunking(cfg, route.accountId)
|
||||
: undefined;
|
||||
const shouldSplitPreviewMessages = streamMode === "block";
|
||||
const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined;
|
||||
const draftMinInitialChars =
|
||||
streamMode === "partial" || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS;
|
||||
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
|
||||
let lastPartialText = "";
|
||||
let draftText = "";
|
||||
let hasStreamedMessage = false;
|
||||
const updateDraftFromPartial = (text?: string) => {
|
||||
if (!draftStream || !text) {
|
||||
type LaneName = "answer" | "reasoning";
|
||||
type DraftLaneState = {
|
||||
stream: ReturnType<typeof createTelegramDraftStream> | undefined;
|
||||
lastPartialText: string;
|
||||
draftText: string;
|
||||
hasStreamedMessage: boolean;
|
||||
chunker: EmbeddedBlockChunker | undefined;
|
||||
};
|
||||
const createDraftLane = (enabled: boolean): DraftLaneState => {
|
||||
const stream = enabled
|
||||
? createTelegramDraftStream({
|
||||
api: bot.api,
|
||||
chatId,
|
||||
maxChars: draftMaxChars,
|
||||
thread: threadSpec,
|
||||
replyToMessageId: draftReplyToMessageId,
|
||||
minInitialChars: draftMinInitialChars,
|
||||
renderText: renderDraftPreview,
|
||||
log: logVerbose,
|
||||
warn: logVerbose,
|
||||
})
|
||||
: undefined;
|
||||
const chunker =
|
||||
stream && streamMode === "block"
|
||||
? new EmbeddedBlockChunker(resolveTelegramDraftStreamingChunking(cfg, route.accountId))
|
||||
: undefined;
|
||||
return {
|
||||
stream,
|
||||
lastPartialText: "",
|
||||
draftText: "",
|
||||
hasStreamedMessage: false,
|
||||
chunker,
|
||||
};
|
||||
};
|
||||
const lanes: Record<LaneName, DraftLaneState> = {
|
||||
answer: createDraftLane(canStreamAnswerDraft),
|
||||
reasoning: createDraftLane(canStreamReasoningDraft),
|
||||
};
|
||||
const answerLane = lanes.answer;
|
||||
const reasoningLane = lanes.reasoning;
|
||||
let splitReasoningOnNextStream = false;
|
||||
const reasoningStepState = createTelegramReasoningStepState();
|
||||
type SplitLaneSegment = { lane: LaneName; text: string };
|
||||
const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => {
|
||||
const split = splitTelegramReasoningText(text);
|
||||
const segments: SplitLaneSegment[] = [];
|
||||
if (split.reasoningText) {
|
||||
segments.push({ lane: "reasoning", text: split.reasoningText });
|
||||
}
|
||||
if (split.answerText) {
|
||||
segments.push({ lane: "answer", text: split.answerText });
|
||||
}
|
||||
return segments;
|
||||
};
|
||||
const resetDraftLaneState = (lane: DraftLaneState) => {
|
||||
lane.lastPartialText = "";
|
||||
lane.draftText = "";
|
||||
lane.hasStreamedMessage = false;
|
||||
lane.chunker?.reset();
|
||||
};
|
||||
const updateDraftFromPartial = (lane: DraftLaneState, text: string | undefined) => {
|
||||
const laneStream = lane.stream;
|
||||
if (!laneStream || !text) {
|
||||
return;
|
||||
}
|
||||
if (text === lastPartialText) {
|
||||
if (text === lane.lastPartialText) {
|
||||
return;
|
||||
}
|
||||
// Mark that we've received streaming content (for forceNewMessage decision).
|
||||
hasStreamedMessage = true;
|
||||
lane.hasStreamedMessage = true;
|
||||
if (streamMode === "partial") {
|
||||
// Some providers briefly emit a shorter prefix snapshot (for example
|
||||
// "Sure." -> "Sure" -> "Sure."). Keep the longer preview to avoid
|
||||
// visible punctuation flicker.
|
||||
if (
|
||||
lastPartialText &&
|
||||
lastPartialText.startsWith(text) &&
|
||||
text.length < lastPartialText.length
|
||||
lane.lastPartialText &&
|
||||
lane.lastPartialText.startsWith(text) &&
|
||||
text.length < lane.lastPartialText.length
|
||||
) {
|
||||
return;
|
||||
}
|
||||
lastPartialText = text;
|
||||
draftStream.update(text);
|
||||
lane.lastPartialText = text;
|
||||
laneStream.update(text);
|
||||
return;
|
||||
}
|
||||
let delta = text;
|
||||
if (text.startsWith(lastPartialText)) {
|
||||
delta = text.slice(lastPartialText.length);
|
||||
if (text.startsWith(lane.lastPartialText)) {
|
||||
delta = text.slice(lane.lastPartialText.length);
|
||||
} else {
|
||||
// Streaming buffer reset (or non-monotonic stream). Start fresh.
|
||||
draftChunker?.reset();
|
||||
draftText = "";
|
||||
lane.chunker?.reset();
|
||||
lane.draftText = "";
|
||||
}
|
||||
lastPartialText = text;
|
||||
lane.lastPartialText = text;
|
||||
if (!delta) {
|
||||
return;
|
||||
}
|
||||
if (!draftChunker) {
|
||||
draftText = text;
|
||||
draftStream.update(draftText);
|
||||
if (!lane.chunker) {
|
||||
lane.draftText = text;
|
||||
laneStream.update(lane.draftText);
|
||||
return;
|
||||
}
|
||||
draftChunker.append(delta);
|
||||
draftChunker.drain({
|
||||
lane.chunker.append(delta);
|
||||
lane.chunker.drain({
|
||||
force: false,
|
||||
emit: (chunk) => {
|
||||
draftText += chunk;
|
||||
draftStream.update(draftText);
|
||||
lane.draftText += chunk;
|
||||
laneStream.update(lane.draftText);
|
||||
},
|
||||
});
|
||||
};
|
||||
const flushDraft = async () => {
|
||||
if (!draftStream) {
|
||||
const ingestDraftLaneSegments = (text: string | undefined) => {
|
||||
for (const segment of splitTextIntoLaneSegments(text)) {
|
||||
if (segment.lane === "reasoning") {
|
||||
reasoningStepState.noteReasoningHint();
|
||||
reasoningStepState.noteReasoningDelivered();
|
||||
}
|
||||
updateDraftFromPartial(lanes[segment.lane], segment.text);
|
||||
}
|
||||
};
|
||||
const flushDraftLane = async (lane: DraftLaneState) => {
|
||||
if (!lane.stream) {
|
||||
return;
|
||||
}
|
||||
if (draftChunker?.hasBuffered()) {
|
||||
draftChunker.drain({
|
||||
if (lane.chunker?.hasBuffered()) {
|
||||
lane.chunker.drain({
|
||||
force: true,
|
||||
emit: (chunk) => {
|
||||
draftText += chunk;
|
||||
lane.draftText += chunk;
|
||||
},
|
||||
});
|
||||
draftChunker.reset();
|
||||
if (draftText) {
|
||||
draftStream.update(draftText);
|
||||
lane.chunker.reset();
|
||||
if (lane.draftText) {
|
||||
lane.stream.update(lane.draftText);
|
||||
}
|
||||
}
|
||||
await draftStream.flush();
|
||||
await lane.stream.flush();
|
||||
};
|
||||
|
||||
const disableBlockStreaming =
|
||||
streamMode === "off"
|
||||
? true // off mode must always disable block streaming
|
||||
: typeof telegramCfg.blockStreaming === "boolean"
|
||||
? !telegramCfg.blockStreaming
|
||||
: draftStream
|
||||
? true
|
||||
: undefined;
|
||||
? true
|
||||
: forceBlockStreamingForReasoning
|
||||
? false
|
||||
: typeof telegramCfg.blockStreaming === "boolean"
|
||||
? !telegramCfg.blockStreaming
|
||||
: canStreamAnswerDraft
|
||||
? true
|
||||
: undefined;
|
||||
|
||||
const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
|
||||
cfg,
|
||||
@@ -203,11 +306,6 @@ export const dispatchTelegramMessage = async ({
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const tableMode = resolveMarkdownTableMode({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
accountId: route.accountId,
|
||||
});
|
||||
const chunkMode = resolveChunkMode(cfg, "telegram", route.accountId);
|
||||
|
||||
// Handle uncached stickers: get a dedicated vision description before dispatch
|
||||
@@ -271,26 +369,12 @@ export const dispatchTelegramMessage = async ({
|
||||
const deliveryState = {
|
||||
delivered: false,
|
||||
skippedNonSilent: 0,
|
||||
failedDeliveries: 0,
|
||||
failedNonSilent: 0,
|
||||
};
|
||||
let finalizedViaPreviewMessage = false;
|
||||
|
||||
/**
|
||||
* Clean up the draft preview message. The preview must be removed in every
|
||||
* case EXCEPT when it was successfully finalized as the actual response via
|
||||
* an in-place edit (`finalizedViaPreviewMessage === true`).
|
||||
*/
|
||||
const clearDraftPreviewIfNeeded = async () => {
|
||||
if (finalizedViaPreviewMessage) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await draftStream?.clear();
|
||||
} catch (err) {
|
||||
logVerbose(`telegram: draft preview cleanup failed: ${String(err)}`);
|
||||
}
|
||||
const finalizedPreviewByLane: Record<LaneName, boolean> = {
|
||||
answer: false,
|
||||
reasoning: false,
|
||||
};
|
||||
|
||||
const clearGroupHistory = () => {
|
||||
if (isGroup && historyKey) {
|
||||
clearHistoryEntriesIfEnabled({ historyMap: groupHistories, historyKey, limit: historyLimit });
|
||||
@@ -310,9 +394,157 @@ export const dispatchTelegramMessage = async ({
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
replyQuoteText,
|
||||
};
|
||||
const getLanePreviewText = (lane: DraftLaneState) =>
|
||||
streamMode === "block" ? lane.draftText : lane.lastPartialText;
|
||||
const tryUpdatePreviewForLane = async (params: {
|
||||
lane: DraftLaneState;
|
||||
laneName: LaneName;
|
||||
text: string;
|
||||
previewButtons?: TelegramInlineButtons;
|
||||
stopBeforeEdit?: boolean;
|
||||
updateLaneSnapshot?: boolean;
|
||||
skipRegressive: "always" | "existingOnly";
|
||||
context: "final" | "update";
|
||||
}): Promise<boolean> => {
|
||||
const {
|
||||
lane,
|
||||
laneName,
|
||||
text,
|
||||
previewButtons,
|
||||
stopBeforeEdit = false,
|
||||
updateLaneSnapshot = false,
|
||||
skipRegressive,
|
||||
context,
|
||||
} = params;
|
||||
if (!lane.stream) {
|
||||
return false;
|
||||
}
|
||||
const hadPreviewMessage = typeof lane.stream.messageId() === "number";
|
||||
if (stopBeforeEdit) {
|
||||
await lane.stream.stop();
|
||||
}
|
||||
const previewMessageId = lane.stream.messageId();
|
||||
if (typeof previewMessageId !== "number") {
|
||||
return false;
|
||||
}
|
||||
const currentPreviewText = getLanePreviewText(lane);
|
||||
const shouldSkipRegressive =
|
||||
Boolean(currentPreviewText) &&
|
||||
currentPreviewText.startsWith(text) &&
|
||||
text.length < currentPreviewText.length &&
|
||||
(skipRegressive === "always" || hadPreviewMessage);
|
||||
if (shouldSkipRegressive) {
|
||||
// Avoid regressive punctuation/wording flicker from occasional shorter finals.
|
||||
deliveryState.delivered = true;
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
await editMessageTelegram(chatId, previewMessageId, text, {
|
||||
api: bot.api,
|
||||
cfg,
|
||||
accountId: route.accountId,
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
buttons: previewButtons,
|
||||
});
|
||||
if (updateLaneSnapshot) {
|
||||
lane.lastPartialText = text;
|
||||
lane.draftText = text;
|
||||
}
|
||||
deliveryState.delivered = true;
|
||||
return true;
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`telegram: ${laneName} preview ${context} edit failed; falling back to standard send (${String(err)})`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
const applyTextToPayload = (payload: ReplyPayload, text: string): ReplyPayload => {
|
||||
if (payload.text === text) {
|
||||
return payload;
|
||||
}
|
||||
return { ...payload, text };
|
||||
};
|
||||
const sendPayload = async (payload: ReplyPayload) => {
|
||||
const result = await deliverReplies({
|
||||
...deliveryBaseOptions,
|
||||
replies: [payload],
|
||||
onVoiceRecording: sendRecordVoice,
|
||||
});
|
||||
if (result.delivered) {
|
||||
deliveryState.delivered = true;
|
||||
}
|
||||
return result.delivered;
|
||||
};
|
||||
type LaneDeliveryResult = "preview-finalized" | "preview-updated" | "sent" | "skipped";
|
||||
const deliverLaneText = async (params: {
|
||||
laneName: LaneName;
|
||||
text: string;
|
||||
payload: ReplyPayload;
|
||||
infoKind: string;
|
||||
previewButtons?: TelegramInlineButtons;
|
||||
allowPreviewUpdateForNonFinal?: boolean;
|
||||
}): Promise<LaneDeliveryResult> => {
|
||||
const {
|
||||
laneName,
|
||||
text,
|
||||
payload,
|
||||
infoKind,
|
||||
previewButtons,
|
||||
allowPreviewUpdateForNonFinal = false,
|
||||
} = params;
|
||||
const lane = lanes[laneName];
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
const canEditViaPreview =
|
||||
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
|
||||
|
||||
if (infoKind === "final") {
|
||||
if (canEditViaPreview && !finalizedPreviewByLane[laneName]) {
|
||||
await flushDraftLane(lane);
|
||||
const finalized = await tryUpdatePreviewForLane({
|
||||
lane,
|
||||
laneName,
|
||||
text,
|
||||
previewButtons,
|
||||
stopBeforeEdit: true,
|
||||
skipRegressive: "existingOnly",
|
||||
context: "final",
|
||||
});
|
||||
if (finalized) {
|
||||
finalizedPreviewByLane[laneName] = true;
|
||||
return "preview-finalized";
|
||||
}
|
||||
} else if (!hasMedia && !payload.isError && text.length > draftMaxChars) {
|
||||
logVerbose(
|
||||
`telegram: preview final too long for edit (${text.length} > ${draftMaxChars}); falling back to standard send`,
|
||||
);
|
||||
}
|
||||
await lane.stream?.stop();
|
||||
const delivered = await sendPayload(applyTextToPayload(payload, text));
|
||||
return delivered ? "sent" : "skipped";
|
||||
}
|
||||
|
||||
if (allowPreviewUpdateForNonFinal && canEditViaPreview) {
|
||||
const updated = await tryUpdatePreviewForLane({
|
||||
lane,
|
||||
laneName,
|
||||
text,
|
||||
previewButtons,
|
||||
stopBeforeEdit: false,
|
||||
updateLaneSnapshot: true,
|
||||
skipRegressive: "always",
|
||||
context: "update",
|
||||
});
|
||||
if (updated) {
|
||||
return "preview-updated";
|
||||
}
|
||||
}
|
||||
|
||||
const delivered = await sendPayload(applyTextToPayload(payload, text));
|
||||
return delivered ? "sent" : "skipped";
|
||||
};
|
||||
|
||||
let queuedFinal = false;
|
||||
let dispatchError: unknown;
|
||||
try {
|
||||
({ queuedFinal } = await dispatchReplyWithBufferedBlockDispatcher({
|
||||
ctx: ctxPayload,
|
||||
@@ -320,117 +552,86 @@ export const dispatchTelegramMessage = async ({
|
||||
dispatcherOptions: {
|
||||
...prefixOptions,
|
||||
deliver: async (payload, info) => {
|
||||
if (info.kind === "final") {
|
||||
await flushDraft();
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
const previewMessageId = draftStream?.messageId();
|
||||
const finalText = payload.text;
|
||||
const currentPreviewText = streamMode === "block" ? draftText : lastPartialText;
|
||||
const previewButtons = (
|
||||
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
|
||||
const previewButtons = (
|
||||
payload.channelData?.telegram as { buttons?: TelegramInlineButtons } | undefined
|
||||
)?.buttons;
|
||||
const segments = splitTextIntoLaneSegments(payload.text);
|
||||
const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
|
||||
const flushBufferedFinalAnswer = async () => {
|
||||
const buffered = reasoningStepState.takeBufferedFinalAnswer();
|
||||
if (!buffered) {
|
||||
return;
|
||||
}
|
||||
const bufferedButtons = (
|
||||
buffered.payload.channelData?.telegram as
|
||||
| { buttons?: TelegramInlineButtons }
|
||||
| undefined
|
||||
)?.buttons;
|
||||
let draftStoppedForPreviewEdit = false;
|
||||
// Skip preview edit for error payloads to avoid overwriting previous content
|
||||
const canFinalizeViaPreviewEdit =
|
||||
!finalizedViaPreviewMessage &&
|
||||
!hasMedia &&
|
||||
typeof finalText === "string" &&
|
||||
finalText.length > 0 &&
|
||||
typeof previewMessageId === "number" &&
|
||||
finalText.length <= draftMaxChars &&
|
||||
!payload.isError;
|
||||
if (canFinalizeViaPreviewEdit) {
|
||||
await draftStream?.stop();
|
||||
draftStoppedForPreviewEdit = true;
|
||||
if (
|
||||
currentPreviewText &&
|
||||
currentPreviewText.startsWith(finalText) &&
|
||||
finalText.length < currentPreviewText.length
|
||||
) {
|
||||
// Ignore regressive final edits (e.g., "Okay." -> "Ok"), which
|
||||
// can appear transiently in some provider streams.
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await editMessageTelegram(chatId, previewMessageId, finalText, {
|
||||
api: bot.api,
|
||||
cfg,
|
||||
accountId: route.accountId,
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
buttons: previewButtons,
|
||||
});
|
||||
finalizedViaPreviewMessage = true;
|
||||
deliveryState.delivered = true;
|
||||
logVerbose(
|
||||
`telegram: finalized response via preview edit (messageId=${previewMessageId})`,
|
||||
);
|
||||
return;
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`telegram: preview final edit failed; falling back to standard send (${String(err)})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
await deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: buffered.text,
|
||||
payload: buffered.payload,
|
||||
infoKind: "final",
|
||||
previewButtons: bufferedButtons,
|
||||
});
|
||||
reasoningStepState.resetForNextStep();
|
||||
};
|
||||
|
||||
for (const segment of segments) {
|
||||
if (
|
||||
!hasMedia &&
|
||||
!payload.isError &&
|
||||
typeof finalText === "string" &&
|
||||
finalText.length > draftMaxChars
|
||||
segment.lane === "answer" &&
|
||||
info.kind === "final" &&
|
||||
reasoningStepState.shouldBufferFinalAnswer()
|
||||
) {
|
||||
logVerbose(
|
||||
`telegram: preview final too long for edit (${finalText.length} > ${draftMaxChars}); falling back to standard send`,
|
||||
);
|
||||
reasoningStepState.bufferFinalAnswer({ payload, text: segment.text });
|
||||
continue;
|
||||
}
|
||||
if (!draftStoppedForPreviewEdit) {
|
||||
await draftStream?.stop();
|
||||
if (segment.lane === "reasoning") {
|
||||
reasoningStepState.noteReasoningHint();
|
||||
}
|
||||
// Check if stop() sent a message (debounce released on isFinal)
|
||||
// If so, edit that message instead of sending a new one
|
||||
const messageIdAfterStop = draftStream?.messageId();
|
||||
if (
|
||||
!finalizedViaPreviewMessage &&
|
||||
typeof messageIdAfterStop === "number" &&
|
||||
typeof finalText === "string" &&
|
||||
finalText.length > 0 &&
|
||||
finalText.length <= draftMaxChars &&
|
||||
!hasMedia &&
|
||||
!payload.isError
|
||||
) {
|
||||
try {
|
||||
await editMessageTelegram(chatId, messageIdAfterStop, finalText, {
|
||||
api: bot.api,
|
||||
cfg,
|
||||
accountId: route.accountId,
|
||||
linkPreview: telegramCfg.linkPreview,
|
||||
buttons: previewButtons,
|
||||
});
|
||||
finalizedViaPreviewMessage = true;
|
||||
deliveryState.delivered = true;
|
||||
logVerbose(
|
||||
`telegram: finalized response via post-stop preview edit (messageId=${messageIdAfterStop})`,
|
||||
);
|
||||
return;
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`telegram: post-stop preview edit failed; falling back to standard send (${String(err)})`,
|
||||
);
|
||||
const result = await deliverLaneText({
|
||||
laneName: segment.lane,
|
||||
text: segment.text,
|
||||
payload,
|
||||
infoKind: info.kind,
|
||||
previewButtons,
|
||||
allowPreviewUpdateForNonFinal: segment.lane === "reasoning",
|
||||
});
|
||||
if (segment.lane === "reasoning") {
|
||||
if (result !== "skipped") {
|
||||
reasoningStepState.noteReasoningDelivered();
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (info.kind === "final") {
|
||||
if (reasoningLane.hasStreamedMessage) {
|
||||
finalizedPreviewByLane.reasoning = true;
|
||||
}
|
||||
reasoningStepState.resetForNextStep();
|
||||
}
|
||||
}
|
||||
const result = await deliverReplies({
|
||||
...deliveryBaseOptions,
|
||||
replies: [payload],
|
||||
onVoiceRecording: sendRecordVoice,
|
||||
});
|
||||
if (result.delivered) {
|
||||
deliveryState.delivered = true;
|
||||
logVerbose(
|
||||
`telegram: ${info.kind} reply delivered to chat ${chatId}${payload.isError ? " (error payload)" : ""}`,
|
||||
);
|
||||
} else {
|
||||
logVerbose(
|
||||
`telegram: ${info.kind} reply delivery returned not-delivered for chat ${chatId}`,
|
||||
);
|
||||
if (segments.length > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (info.kind === "final") {
|
||||
await answerLane.stream?.stop();
|
||||
await reasoningLane.stream?.stop();
|
||||
reasoningStepState.resetForNextStep();
|
||||
}
|
||||
const canSendAsIs =
|
||||
hasMedia || typeof payload.text !== "string" || payload.text.length > 0;
|
||||
if (!canSendAsIs) {
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
return;
|
||||
}
|
||||
await sendPayload(payload);
|
||||
if (info.kind === "final") {
|
||||
await flushBufferedFinalAnswer();
|
||||
}
|
||||
},
|
||||
onSkip: (_payload, info) => {
|
||||
@@ -439,7 +640,7 @@ export const dispatchTelegramMessage = async ({
|
||||
}
|
||||
},
|
||||
onError: (err, info) => {
|
||||
deliveryState.failedDeliveries += 1;
|
||||
deliveryState.failedNonSilent += 1;
|
||||
runtime.error?.(danger(`telegram ${info.kind} reply failed: ${String(err)}`));
|
||||
},
|
||||
onReplyStart: createTypingCallbacks({
|
||||
@@ -457,60 +658,82 @@ export const dispatchTelegramMessage = async ({
|
||||
replyOptions: {
|
||||
skillFilter,
|
||||
disableBlockStreaming,
|
||||
onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
|
||||
onAssistantMessageStart: draftStream
|
||||
? () => {
|
||||
// Only split preview bubbles in block mode. In partial mode, keep
|
||||
// editing one preview message to avoid flooding the chat.
|
||||
logVerbose(
|
||||
`telegram: onAssistantMessageStart called, hasStreamedMessage=${hasStreamedMessage}`,
|
||||
);
|
||||
if (shouldSplitPreviewMessages && hasStreamedMessage) {
|
||||
logVerbose(`telegram: calling forceNewMessage()`);
|
||||
draftStream.forceNewMessage();
|
||||
onPartialReply:
|
||||
answerLane.stream || reasoningLane.stream
|
||||
? (payload) => ingestDraftLaneSegments(payload.text)
|
||||
: undefined,
|
||||
onReasoningStream: reasoningLane.stream
|
||||
? (payload) => {
|
||||
// Split between reasoning blocks only when the next reasoning
|
||||
// stream starts. Splitting at reasoning-end can orphan the active
|
||||
// preview and cause duplicate reasoning sends on reasoning final.
|
||||
if (splitReasoningOnNextStream) {
|
||||
reasoningLane.stream?.forceNewMessage();
|
||||
resetDraftLaneState(reasoningLane);
|
||||
splitReasoningOnNextStream = false;
|
||||
}
|
||||
lastPartialText = "";
|
||||
draftText = "";
|
||||
draftChunker?.reset();
|
||||
ingestDraftLaneSegments(payload.text);
|
||||
}
|
||||
: undefined,
|
||||
onReasoningEnd: draftStream
|
||||
onAssistantMessageStart: answerLane.stream
|
||||
? () => {
|
||||
// Same policy as assistant-message boundaries: split only in block mode.
|
||||
if (shouldSplitPreviewMessages && hasStreamedMessage) {
|
||||
draftStream.forceNewMessage();
|
||||
reasoningStepState.resetForNextStep();
|
||||
// Keep answer blocks separated in block mode; partial mode keeps one answer lane.
|
||||
if (streamMode === "block" && answerLane.hasStreamedMessage) {
|
||||
answerLane.stream?.forceNewMessage();
|
||||
}
|
||||
lastPartialText = "";
|
||||
draftText = "";
|
||||
draftChunker?.reset();
|
||||
resetDraftLaneState(answerLane);
|
||||
}
|
||||
: undefined,
|
||||
onReasoningEnd: reasoningLane.stream
|
||||
? () => {
|
||||
// Split when/if a later reasoning block begins.
|
||||
splitReasoningOnNextStream = reasoningLane.hasStreamedMessage;
|
||||
}
|
||||
: undefined,
|
||||
onModelSelected,
|
||||
},
|
||||
}));
|
||||
} catch (err) {
|
||||
dispatchError = err;
|
||||
} finally {
|
||||
await draftStream?.stop();
|
||||
// Must stop() first to flush debounced content before clear() wipes state.
|
||||
const streamCleanupStates = new Map<
|
||||
NonNullable<DraftLaneState["stream"]>,
|
||||
{ shouldClear: boolean }
|
||||
>();
|
||||
const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [
|
||||
{ laneName: "answer", lane: answerLane },
|
||||
{ laneName: "reasoning", lane: reasoningLane },
|
||||
];
|
||||
for (const laneState of lanesToCleanup) {
|
||||
const stream = laneState.lane.stream;
|
||||
if (!stream) {
|
||||
continue;
|
||||
}
|
||||
const shouldClear = !finalizedPreviewByLane[laneState.laneName];
|
||||
const existing = streamCleanupStates.get(stream);
|
||||
if (!existing) {
|
||||
streamCleanupStates.set(stream, { shouldClear });
|
||||
continue;
|
||||
}
|
||||
existing.shouldClear = existing.shouldClear && shouldClear;
|
||||
}
|
||||
for (const [stream, cleanupState] of streamCleanupStates) {
|
||||
await stream.stop();
|
||||
if (cleanupState.shouldClear) {
|
||||
await stream.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
let sentFallback = false;
|
||||
try {
|
||||
if (
|
||||
!dispatchError &&
|
||||
!deliveryState.delivered &&
|
||||
(deliveryState.skippedNonSilent > 0 || deliveryState.failedDeliveries > 0)
|
||||
) {
|
||||
const result = await deliverReplies({
|
||||
replies: [{ text: EMPTY_RESPONSE_FALLBACK }],
|
||||
...deliveryBaseOptions,
|
||||
});
|
||||
sentFallback = result.delivered;
|
||||
}
|
||||
} finally {
|
||||
await clearDraftPreviewIfNeeded();
|
||||
}
|
||||
if (dispatchError) {
|
||||
throw dispatchError;
|
||||
if (
|
||||
!deliveryState.delivered &&
|
||||
(deliveryState.skippedNonSilent > 0 || deliveryState.failedNonSilent > 0)
|
||||
) {
|
||||
const result = await deliverReplies({
|
||||
replies: [{ text: EMPTY_RESPONSE_FALLBACK }],
|
||||
...deliveryBaseOptions,
|
||||
});
|
||||
sentFallback = result.delivered;
|
||||
}
|
||||
|
||||
const hasFinalResponse = queuedFinal || sentFallback;
|
||||
|
||||
Reference in New Issue
Block a user