fix(telegram): split streaming preview per assistant block (#22613)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: 26f35f4411
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Ayaan Zaidi
2026-02-21 18:05:23 +05:30
committed by GitHub
parent 36a0df423d
commit 8b1fe0d1e2
14 changed files with 277 additions and 19 deletions

View File

@@ -85,6 +85,7 @@ Docs: https://docs.openclaw.ai
- Memory: return empty snippets when `memory_get`/QMD read files that have not been created yet, and harden memory indexing/session helpers against ENOENT races so missing Markdown no longer crashes tools. (#20680) Thanks @pahdo.
- Telegram/Streaming: always clean up draft previews even when dispatch throws before fallback handling, preventing orphaned preview messages during failed runs. (#19041) Thanks @mudrii.
- Telegram/Streaming: split reasoning and answer draft preview lanes to prevent cross-lane overwrites, and ignore literal `<think>` tags inside inline/fenced code snippets so sample markup is not misrouted as reasoning. (#20774) Thanks @obviyus.
- Telegram/Streaming: restore 30-char first-preview debounce and scope `NO_REPLY` prefix suppression to partial sentinel fragments so normal `No...` text is not filtered. (#22613) Thanks @obviyus.
- Telegram/Status reactions: refresh stall timers on repeated phase updates and honor ack-reaction scope when lifecycle reactions are enabled, preventing false stall emojis and unwanted group reactions. Thanks @wolly-tundracube and @thewilloftheshadow.
- Telegram/Status reactions: keep lifecycle reactions active when available-reactions lookup fails by falling back to unrestricted variant selection instead of suppressing reaction updates. (#22380) Thanks @obviyus.
- Discord/Streaming: apply `replyToMode: first` only to the first Discord chunk so block-streamed replies do not spam mention pings. (#20726) Thanks @thewilloftheshadow for the report.

View File

@@ -28,7 +28,7 @@ import {
import { stripHeartbeatToken } from "../heartbeat.js";
import type { TemplateContext } from "../templating.js";
import type { VerboseLevel } from "../thinking.js";
import { isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import { isSilentReplyPrefixText, isSilentReplyText, SILENT_REPLY_TOKEN } from "../tokens.js";
import type { GetReplyOptions, ReplyPayload } from "../types.js";
import {
buildEmbeddedRunBaseParams,
@@ -157,6 +157,9 @@ export async function runAgentTurnWithFallback(params: {
return { text: sanitized, skip: false };
};
const handlePartialForTyping = async (payload: ReplyPayload): Promise<string | undefined> => {
if (isSilentReplyPrefixText(payload.text, SILENT_REPLY_TOKEN)) {
return undefined;
}
const { text, skip } = normalizeStreamingText(payload);
if (skip || !text) {
return undefined;

View File

@@ -383,6 +383,50 @@ describe("runReplyAgent typing (heartbeat)", () => {
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("suppresses partial streaming for NO_REPLY prefixes", async () => {
const onPartialReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "NO_" });
await params.onPartialReply?.({ text: "NO_RE" });
await params.onPartialReply?.({ text: "NO_REPLY" });
return { payloads: [{ text: "NO_REPLY" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: false, onPartialReply },
typingMode: "message",
});
await run();
expect(onPartialReply).not.toHaveBeenCalled();
expect(typing.startTypingOnText).not.toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("does not suppress partial streaming for normal 'No' prefixes", async () => {
const onPartialReply = vi.fn();
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onPartialReply?.({ text: "No" });
await params.onPartialReply?.({ text: "No, that is valid" });
return { payloads: [{ text: "No, that is valid" }], meta: {} };
});
const { run, typing } = createMinimalRun({
opts: { isHeartbeat: false, onPartialReply },
typingMode: "message",
});
await run();
expect(onPartialReply).toHaveBeenCalledTimes(2);
expect(onPartialReply).toHaveBeenNthCalledWith(1, { text: "No", mediaUrls: undefined });
expect(onPartialReply).toHaveBeenNthCalledWith(2, {
text: "No, that is valid",
mediaUrls: undefined,
});
expect(typing.startTypingOnText).toHaveBeenCalled();
expect(typing.startTypingLoop).not.toHaveBeenCalled();
});
it("does not start typing on assistant message start without prior text in message mode", async () => {
state.runEmbeddedPiAgentMock.mockImplementationOnce(async (params: AgentRunParams) => {
await params.onAssistantMessageStart?.();

View File

@@ -18,3 +18,23 @@ export function isSilentReplyText(
const suffix = new RegExp(`\\b${escaped}\\b\\W*$`);
return suffix.test(text);
}
/**
 * Returns true when `text` looks like a partial prefix of the silent-reply
 * sentinel token (e.g. "NO_", "NO_RE" for "NO_REPLY"), so streaming preview
 * handlers can suppress the fragment until it either completes or diverges
 * into real text.
 *
 * Ordinary language such as "No" or "No, that is valid" is never treated as
 * a prefix: the fragment must contain an underscore and consist solely of
 * uppercase letters/underscores after normalization.
 *
 * @param text  Candidate streaming fragment; undefined/empty returns false.
 * @param token Sentinel token to match against (default: SILENT_REPLY_TOKEN).
 */
export function isSilentReplyPrefixText(
  text: string | undefined,
  token: string = SILENT_REPLY_TOKEN,
): boolean {
  if (!text) {
    return false;
  }
  // trim() (not trimStart()) so a fragment like "NO_REPLY " or "NO_REPLY\n"
  // is still recognized as the bare sentinel; any non-whitespace text after
  // the token fails the character-class check below and streams normally.
  const normalized = text.trim().toUpperCase();
  if (!normalized) {
    return false;
  }
  // Require an underscore so natural-language prefixes like "No" / "NO"
  // are never suppressed.
  if (!normalized.includes("_")) {
    return false;
  }
  // Sentinel fragments contain only uppercase letters and underscores.
  if (/[^A-Z_]/.test(normalized)) {
    return false;
  }
  return token.toUpperCase().startsWith(normalized);
}

View File

@@ -3,6 +3,7 @@ export type DraftStreamLoop = {
flush: () => Promise<void>;
stop: () => void;
resetPending: () => void;
resetThrottleWindow: () => void;
waitForInFlight: () => Promise<void>;
};
@@ -87,6 +88,13 @@ export function createDraftStreamLoop(params: {
resetPending: () => {
pendingText = "";
},
resetThrottleWindow: () => {
lastSentAt = 0;
if (timer) {
clearTimeout(timer);
timer = undefined;
}
},
waitForInFlight: async () => {
if (inFlightPromise) {
await inFlightPromise;

View File

@@ -378,11 +378,11 @@ describe("legacy config detection", () => {
expect(res.config.channels?.telegram?.groupPolicy).toBe("allowlist");
}
});
it("defaults telegram.streaming to true when telegram section exists", async () => {
it("defaults telegram.streaming to false when telegram section exists", async () => {
const res = validateConfigObject({ channels: { telegram: {} } });
expect(res.ok).toBe(true);
if (res.ok) {
expect(res.config.channels?.telegram?.streaming).toBe(true);
expect(res.config.channels?.telegram?.streaming).toBe(false);
expect(res.config.channels?.telegram?.streamMode).toBeUndefined();
}
});

View File

@@ -392,7 +392,7 @@ export const FIELD_HELP: Record<string, string> = {
"channels.telegram.dmPolicy":
'Direct message access control ("pairing" recommended). "open" requires channels.telegram.allowFrom=["*"].',
"channels.telegram.streaming":
"Enable Telegram live stream preview via message edits (default: true; legacy streamMode auto-maps here).",
"Enable Telegram live stream preview via message edits (default: false; legacy streamMode auto-maps here).",
"channels.discord.streamMode":
"Live stream preview mode for Discord replies (off | partial | block). Separate from block streaming; uses sendMessage + editMessage.",
"channels.discord.draftChunk.minChars":

View File

@@ -117,7 +117,7 @@ function normalizeTelegramStreamingConfig(value: {
delete value.streamMode;
return;
}
value.streaming = true;
value.streaming = false;
}
export const TelegramAccountSchemaBase = z

View File

@@ -63,6 +63,25 @@ describe("dispatchTelegramMessage draft streaming", () => {
};
}
// Mock draft stream whose preview message id advances each time
// forceNewMessage() retires the current preview: the following update()
// allocates a fresh sequential id, mirroring how the real Telegram draft
// stream sends a brand-new message after a forced split.
function createSequencedDraftStream(startMessageId = 1001) {
  let currentId: number | undefined;
  let upcomingId = startMessageId;
  // Lazily assign an id on the first update() after a split.
  const allocateOnUpdate = () => {
    if (currentId == null) {
      currentId = upcomingId;
      upcomingId += 1;
    }
  };
  // Drop the active preview so the next update() starts a new message.
  const retireCurrent = () => {
    currentId = undefined;
  };
  return {
    update: vi.fn().mockImplementation(allocateOnUpdate),
    flush: vi.fn().mockResolvedValue(undefined),
    messageId: vi.fn().mockImplementation(() => currentId),
    clear: vi.fn().mockResolvedValue(undefined),
    stop: vi.fn().mockResolvedValue(undefined),
    forceNewMessage: vi.fn().mockImplementation(retireCurrent),
  };
}
function setupDraftStreams(params?: { answerMessageId?: number; reasoningMessageId?: number }) {
const answerDraftStream = createDraftStream(params?.answerMessageId);
const reasoningDraftStream = createDraftStream(params?.reasoningMessageId);
@@ -172,7 +191,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect.objectContaining({
chatId: 123,
thread: { id: 777, scope: "dm" },
minInitialChars: 1,
minInitialChars: 30,
}),
);
expect(draftStream.update).toHaveBeenCalledWith("Hello");
@@ -193,7 +212,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("uses immediate preview updates for legacy block stream mode", async () => {
it("uses 30-char preview debounce for legacy block stream mode", async () => {
const draftStream = createDraftStream();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -209,7 +228,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(createTelegramDraftStream).toHaveBeenCalledWith(
expect.objectContaining({
minInitialChars: 1,
minInitialChars: 30,
}),
);
});
@@ -445,7 +464,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
);
});
it("does not force new message for legacy block stream mode", async () => {
it("forces new message for next assistant block in legacy block stream mode", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -464,10 +483,10 @@ describe("dispatchTelegramMessage draft streaming", () => {
await dispatchWithContext({ context: createContext(), streamMode: "block" });
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
});
it("does not force new message in partial mode when assistant message restarts", async () => {
it("forces new message in partial mode when assistant message restarts", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
@@ -483,7 +502,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
expect(draftStream.forceNewMessage).toHaveBeenCalledTimes(1);
});
it("does not force new message on first assistant message start", async () => {
@@ -508,6 +527,56 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.forceNewMessage).not.toHaveBeenCalled();
});
it("finalizes multi-message assistant stream to matching preview messages in order", async () => {
const answerDraftStream = createSequencedDraftStream(1001);
const reasoningDraftStream = createDraftStream();
createTelegramDraftStream
.mockImplementationOnce(() => answerDraftStream)
.mockImplementationOnce(() => reasoningDraftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Message A partial" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message B partial" });
await replyOptions?.onAssistantMessageStart?.();
await replyOptions?.onPartialReply?.({ text: "Message C partial" });
await dispatcherOptions.deliver({ text: "Message A final" }, { kind: "final" });
await dispatcherOptions.deliver({ text: "Message B final" }, { kind: "final" });
await dispatcherOptions.deliver({ text: "Message C final" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "1001" });
await dispatchWithContext({ context: createContext(), streamMode: "partial" });
expect(answerDraftStream.forceNewMessage).toHaveBeenCalledTimes(2);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
1,
123,
1001,
"Message A final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
2,
123,
1002,
"Message B final",
expect.any(Object),
);
expect(editMessageTelegram).toHaveBeenNthCalledWith(
3,
123,
1003,
"Message C final",
expect.any(Object),
);
expect(deliverReplies).not.toHaveBeenCalled();
});
it.each(["block", "partial"] as const)(
"splits reasoning lane only when a later reasoning block starts (%s mode)",
async (streamMode) => {

View File

@@ -147,8 +147,7 @@ export const dispatchTelegramMessage = async ({
const canStreamReasoningDraft = canStreamAnswerDraft || streamReasoningDraft;
const draftReplyToMessageId =
replyToMode !== "off" && typeof msg.message_id === "number" ? msg.message_id : undefined;
const draftMinInitialChars =
previewStreamingEnabled || streamReasoningDraft ? 1 : DRAFT_MIN_INITIAL_CHARS;
const draftMinInitialChars = DRAFT_MIN_INITIAL_CHARS;
const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
type LaneName = "answer" | "reasoning";
type DraftLaneState = {
@@ -184,6 +183,8 @@ export const dispatchTelegramMessage = async ({
const reasoningLane = lanes.reasoning;
let splitReasoningOnNextStream = false;
const reasoningStepState = createTelegramReasoningStepState();
type ArchivedPreview = { messageId: number; textSnapshot: string };
const archivedAnswerPreviews: ArchivedPreview[] = [];
type SplitLaneSegment = { lane: LaneName; text: string };
const splitTextIntoLaneSegments = (text?: string): SplitLaneSegment[] => {
const split = splitTelegramReasoningText(text);
@@ -353,6 +354,8 @@ export const dispatchTelegramMessage = async ({
updateLaneSnapshot?: boolean;
skipRegressive: "always" | "existingOnly";
context: "final" | "update";
previewMessageId?: number;
previewTextSnapshot?: string;
}): Promise<boolean> => {
const {
lane,
@@ -363,19 +366,26 @@ export const dispatchTelegramMessage = async ({
updateLaneSnapshot = false,
skipRegressive,
context,
previewMessageId: previewMessageIdOverride,
previewTextSnapshot,
} = params;
if (!lane.stream) {
return false;
}
const hadPreviewMessage = typeof lane.stream.messageId() === "number";
const lanePreviewMessageId = lane.stream.messageId();
const hadPreviewMessage =
typeof previewMessageIdOverride === "number" || typeof lanePreviewMessageId === "number";
if (stopBeforeEdit) {
await lane.stream.stop();
}
const previewMessageId = lane.stream.messageId();
const previewMessageId =
typeof previewMessageIdOverride === "number"
? previewMessageIdOverride
: lane.stream.messageId();
if (typeof previewMessageId !== "number") {
return false;
}
const currentPreviewText = getLanePreviewText(lane);
const currentPreviewText = previewTextSnapshot ?? getLanePreviewText(lane);
const shouldSkipRegressive =
Boolean(currentPreviewText) &&
currentPreviewText.startsWith(text) &&
@@ -446,6 +456,36 @@ export const dispatchTelegramMessage = async ({
!hasMedia && text.length > 0 && text.length <= draftMaxChars && !payload.isError;
if (infoKind === "final") {
if (laneName === "answer" && archivedAnswerPreviews.length > 0) {
const archivedPreview = archivedAnswerPreviews.shift();
if (archivedPreview) {
if (canEditViaPreview) {
const finalized = await tryUpdatePreviewForLane({
lane,
laneName,
text,
previewButtons,
stopBeforeEdit: false,
skipRegressive: "existingOnly",
context: "final",
previewMessageId: archivedPreview.messageId,
previewTextSnapshot: archivedPreview.textSnapshot,
});
if (finalized) {
return "preview-finalized";
}
}
try {
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
} catch (err) {
logVerbose(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
const delivered = await sendPayload(applyTextToPayload(payload, text));
return delivered ? "sent" : "skipped";
}
}
if (canEditViaPreview && !finalizedPreviewByLane[laneName]) {
await flushDraftLane(lane);
const finalized = await tryUpdatePreviewForLane({
@@ -628,8 +668,18 @@ export const dispatchTelegramMessage = async ({
}
: undefined,
onAssistantMessageStart: answerLane.stream
? () => {
? async () => {
reasoningStepState.resetForNextStep();
if (answerLane.hasStreamedMessage) {
const previewMessageId = answerLane.stream?.messageId();
if (typeof previewMessageId === "number") {
archivedAnswerPreviews.push({
messageId: previewMessageId,
textSnapshot: answerLane.lastPartialText,
});
}
answerLane.stream?.forceNewMessage();
}
resetDraftLaneState(answerLane);
}
: undefined,
@@ -676,6 +726,15 @@ export const dispatchTelegramMessage = async ({
await stream.clear();
}
}
for (const archivedPreview of archivedAnswerPreviews) {
try {
await bot.api.deleteMessage(chatId, archivedPreview.messageId);
} catch (err) {
logVerbose(
`telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`,
);
}
}
}
let sentFallback = false;
if (

View File

@@ -0,0 +1,20 @@
// Unit tests for resolveTelegramStreamMode: maps account config
// (explicit `streaming` boolean plus legacy `streamMode`) onto the
// effective preview mode ("off" | "partial").
import { describe, expect, it } from "vitest";
import { resolveTelegramStreamMode } from "./bot/helpers.js";
describe("resolveTelegramStreamMode", () => {
// Streaming previews are opt-in: absent/empty config resolves to "off".
it("defaults to off when telegram streaming is unset", () => {
expect(resolveTelegramStreamMode(undefined)).toBe("off");
expect(resolveTelegramStreamMode({})).toBe("off");
});
// An explicit `streaming` boolean takes precedence.
it("prefers explicit streaming boolean", () => {
expect(resolveTelegramStreamMode({ streaming: true })).toBe("partial");
expect(resolveTelegramStreamMode({ streaming: false })).toBe("off");
});
// Legacy streamMode values collapse onto the two supported modes;
// "block" is treated as "partial".
it("maps legacy streamMode values", () => {
expect(resolveTelegramStreamMode({ streamMode: "off" })).toBe("off");
expect(resolveTelegramStreamMode({ streamMode: "partial" })).toBe("partial");
expect(resolveTelegramStreamMode({ streamMode: "block" })).toBe("partial");
});
});

View File

@@ -167,7 +167,7 @@ export function resolveTelegramStreamMode(telegramCfg?: {
if (raw === "partial" || raw === "block") {
return "partial";
}
return "partial";
return "off";
}
export function buildTelegramGroupPeerId(chatId: number | string, messageThreadId?: number) {

View File

@@ -134,6 +134,39 @@ describe("createTelegramDraftStream", () => {
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined);
});
it("sends first update immediately after forceNewMessage within throttle window", async () => {
vi.useFakeTimers();
try {
const api = {
sendMessage: vi
.fn()
.mockResolvedValueOnce({ message_id: 17 })
.mockResolvedValueOnce({ message_id: 42 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
};
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
throttleMs: 1000,
});
stream.update("Hello");
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(1));
stream.update("Hello edited");
expect(api.editMessageText).not.toHaveBeenCalled();
stream.forceNewMessage();
stream.update("Second message");
await vi.waitFor(() => expect(api.sendMessage).toHaveBeenCalledTimes(2));
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "Second message", undefined);
} finally {
vi.useRealTimers();
}
});
it("supports rendered previews with parse_mode", async () => {
const api = createMockDraftApi();
const stream = createTelegramDraftStream({

View File

@@ -167,6 +167,7 @@ export function createTelegramDraftStream(params: {
lastSentText = "";
lastSentParseMode = undefined;
loop.resetPending();
loop.resetThrottleWindow();
};
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);