fix(telegram): prevent preview duplication in partial and block streaming modes

Fix Telegram streamed replies so preview chunks are finalized once in partial and block streaming modes.

Fixes #87624. Thanks @jmao0001.
This commit is contained in:
jmao
2026-06-03 08:06:08 -05:00
committed by GitHub
parent 90493ee8e2
commit e4993ec00f
4 changed files with 348 additions and 20 deletions

View File

@@ -3573,6 +3573,150 @@ describe("dispatchTelegramMessage draft streaming", () => {
await sidePromise;
});
it("does not drop the first chunk of a long final after a generic lane rotation", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await dispatcherOptions.deliver(
{ text: "A".repeat(4000) + "B".repeat(4000) },
{ kind: "final" },
);
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
textLimit: 4000,
});
expect(answerDraftStream.update).toHaveBeenCalledWith("A".repeat(4000));
});
it("does not suppress text-only blocks as delivered when answer draft is inactive", async () => {
setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "forced block" }, { kind: "block" });
await dispatcherOptions.deliver({ text: "final text" }, { kind: "final" });
return { queuedFinal: true };
});
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: {
streaming: { mode: "partial", block: { enabled: true } },
} satisfies Parameters<typeof dispatchTelegramMessage>[0]["telegramCfg"],
});
const deliveredTexts = deliverReplies.mock.calls.flatMap((call) =>
((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map(
(reply) => reply.text,
),
);
expect(deliveredTexts).toContain("forced block");
});
it("does not suppress text-only blocks after a tool-progress draft", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onToolStart?.({ name: "exec", phase: "start" });
await dispatcherOptions.deliver({ text: "block after progress" }, { kind: "block" });
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expect(mockCallArg(answerDraftStream.update)).toContain("Exec");
expect(answerDraftStream.update).toHaveBeenLastCalledWith("block after progress");
});
it("does not suppress button-bearing blocks after answer streaming starts", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
const buttons = [[{ text: "OK", callback_data: "ok" }]];
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "partial answer" });
await dispatcherOptions.deliver(
{ text: "choose now", channelData: { telegram: { buttons } } },
{ kind: "block" },
);
return { queuedFinal: true };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expect(answerDraftStream.update).toHaveBeenLastCalledWith("choose now");
expectRecordFields(mockCallArg(editMessageTelegram, 0, 3), { buttons });
});
it("finalizes a duplicate text-only block when no final follows", async () => {
const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 });
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "partial answer" });
await dispatcherOptions.deliver({ text: "partial answer" }, { kind: "block" });
return { queuedFinal: false };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expect(answerDraftStream.stop).toHaveBeenCalled();
expect(answerDraftStream.clear).not.toHaveBeenCalled();
expectRecordFields(mockCallArg(emitInternalMessageSentHook), {
content: "partial answer",
messageId: 2001,
});
expectRecordFields(mockCallArg(recordOutboundMessageForPromptContext), {
text: "partial answer",
messageId: 2001,
});
});
it("materializes a pending duplicate text-only block before finalizing it", async () => {
const { answerDraftStream } = setupDraftStreams();
answerDraftStream.stop.mockImplementation(async () => {
answerDraftStream.setMessageId(2001);
});
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "pending answer" });
await dispatcherOptions.deliver({ text: "pending answer" }, { kind: "block" });
return { queuedFinal: false };
},
);
await dispatchWithContext({
context: createContext(),
streamMode: "partial",
telegramCfg: { streaming: { mode: "partial" } },
});
expect(answerDraftStream.stop).toHaveBeenCalled();
expect(answerDraftStream.clear).not.toHaveBeenCalled();
expectRecordFields(mockCallArg(emitInternalMessageSentHook), {
content: "pending answer",
messageId: 2001,
});
});
it("keeps queued room events abortable after their source dispatch returns", async () => {
const historyKey = "telegram:group:-100123";
const groupHistories = new Map([[historyKey, []]]);

View File

@@ -19,6 +19,7 @@ import { CURRENT_MESSAGE_MARKER } from "openclaw/plugin-sdk/channel-mention-gati
import {
createChannelMessageReplyPipeline,
createOutboundPayloadPlan,
createPreviewMessageReceipt,
deriveDurableFinalDeliveryRequirements,
projectOutboundPayloadPlanForDelivery,
} from "openclaw/plugin-sdk/channel-outbound";
@@ -899,6 +900,7 @@ export const dispatchTelegramMessage = async ({
renderText: renderStreamText,
onSupersededPreview: (superseded) => {
if (superseded.retain) {
lanes[laneName].activeChunkIndex += 1;
return;
}
void bot.api.deleteMessage(chatId, superseded.messageId).catch((err: unknown) => {
@@ -916,6 +918,7 @@ export const dispatchTelegramMessage = async ({
lastPartialText: "",
hasStreamedMessage: false,
finalized: false,
activeChunkIndex: 0,
};
};
const lanes: Record<LaneName, DraftLaneState> = {
@@ -1075,6 +1078,7 @@ export const dispatchTelegramMessage = async ({
}
lane.hasStreamedMessage = false;
lane.finalized = false;
lane.activeChunkIndex = 0;
if (lane === answerLane) {
resetAnswerToolProgressDraft();
}
@@ -1293,6 +1297,7 @@ export const dispatchTelegramMessage = async ({
const silentErrorReplies = telegramCfg.silentErrorReplies === true;
const isDmTopic = !isGroup && threadSpec.scope === "dm" && threadSpec.id != null;
let queuedFinal = false;
let skippedDuplicateAnswerBlockDraftDelivery = false;
let suppressSilentReplyFallback = false;
let hadErrorReplyFailureOrSkip = false;
let isFirstTurnInSession = false;
@@ -1491,6 +1496,43 @@ export const dispatchTelegramMessage = async ({
});
}
};
const finalizeSkippedDuplicateAnswerBlockDraft = async () => {
if (
!skippedDuplicateAnswerBlockDraftDelivery ||
queuedFinal ||
dispatchError ||
isDispatchSuperseded() ||
answerLane.finalized
) {
return;
}
const stream = answerLane.stream;
const content = answerLane.lastPartialText;
if (!stream || !content) {
return;
}
await stream.stop();
const messageId = stream.messageId();
if (typeof messageId !== "number") {
if (stream.sendMayHaveLanded?.()) {
answerLane.finalized = true;
deliveryState.markDelivered();
}
return;
}
answerLane.finalized = true;
deliveryState.markDelivered();
await emitPreviewFinalizedHook({
kind: "preview-finalized",
delivery: {
content,
promptContextContent: content,
messageId,
buttonsAttached: false,
receipt: createPreviewMessageReceipt({ id: messageId }),
},
});
};
const deliverLaneText = createLaneTextDeliverer({
lanes,
draftMaxChars,
@@ -1760,6 +1802,24 @@ export const dispatchTelegramMessage = async ({
}
await prepareAnswerLaneForToolProgress();
}
const skipTextOnlyBlock =
streamMode === "partial" &&
info.kind === "block" &&
segment.lane === "answer" &&
!reply.hasMedia &&
!hasExecApprovalPayload(effectivePayload) &&
telegramButtons === undefined &&
answerLane.hasStreamedMessage &&
!activeAnswerDraftIsToolProgressOnly &&
segment.update.text.trimEnd() === answerLane.lastPartialText.trimEnd();
if (skipTextOnlyBlock) {
skippedDuplicateAnswerBlockDraftDelivery = true;
blockDelivered = true;
continue;
}
const result =
segment.lane === "answer" && info.kind === "final"
? await deliverFinalAnswerText(
@@ -2085,6 +2145,7 @@ export const dispatchTelegramMessage = async ({
progressDraft.cancel();
await draftLaneEventQueue;
nativeToolProgressDraft?.stop();
await finalizeSkippedDuplicateAnswerBlockDraft();
const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [
{ laneName: "answer", lane: answerLane },
{ laneName: "reasoning", lane: reasoningLane },

View File

@@ -22,6 +22,7 @@ export type DraftLaneState = {
lastPartialText: string;
hasStreamedMessage: boolean;
finalized: boolean;
activeChunkIndex: number;
};
type LanePreviewFinalizedDelivery = {
@@ -275,11 +276,19 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
text.length > params.draftMaxChars
? compactChunks(params.splitFinalTextForStream?.(text) ?? [])
: [text];
const [firstChunk, ...remainingChunks] = chunks;
if (!firstChunk || firstChunk.length > params.draftMaxChars) {
const clampActiveChunkIndex = () =>
Math.min(lane.activeChunkIndex, Math.max(0, chunks.length - 1));
const activeChunkIndex = clampActiveChunkIndex();
const activeChunk = chunks[activeChunkIndex];
const remainingChunks = chunks.slice(activeChunkIndex + 1);
if (!activeChunk || activeChunk.length > params.draftMaxChars) {
return undefined;
}
const finalText = text.trimEnd();
const activeFullText = chunks.slice(activeChunkIndex).join("");
const finalText = activeFullText.trimEnd();
const deliveredStreamTextBeforeUpdate = stream.lastDeliveredText?.();
const deliveredPrefixBeforeUpdate =
isFinal &&
@@ -288,7 +297,8 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
deliveredText: deliveredStreamTextBeforeUpdate,
finalText,
}) &&
deliveredStreamTextBeforeUpdate.length > firstChunk.trimEnd().length;
deliveredStreamTextBeforeUpdate.length > activeChunk.trimEnd().length;
const finalizeDeliveredPrefix = async (
deliveredStreamText: string,
messageId: number,
@@ -310,7 +320,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
}
}
const suffix = finalText.slice(deliveredStreamText.length);
const suffix = activeFullText.slice(deliveredStreamText.length);
if (suffix.trim().length > 0) {
for (const chunk of compactChunks(params.splitFinalTextForStream?.(suffix) ?? [])) {
if (chunk.trim().length === 0) {
@@ -327,17 +337,29 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
});
};
const candidateTexts = [stream.lastDeliveredText?.(), lane.lastPartialText];
if (isFinal && remainingChunks.length === 0 && isPotentialTruncatedFinal(activeFullText)) {
const resolvedFullCandidate = await params.resolveFinalTextCandidate?.({
finalText: text,
laneName,
});
if (resolvedFullCandidate) {
const resolvedChunks =
resolvedFullCandidate.length > params.draftMaxChars
? compactChunks(params.splitFinalTextForStream?.(resolvedFullCandidate) ?? [])
: [resolvedFullCandidate];
candidateTexts.push(resolvedChunks.slice(activeChunkIndex).join(""));
}
}
const retainedPreview =
isFinal && remainingChunks.length === 0 && isPotentialTruncatedFinal(text)
isFinal && remainingChunks.length === 0 && isPotentialTruncatedFinal(activeFullText)
? selectLongerFinalText({
finalText: text,
candidateTexts: [
await params.resolveFinalTextCandidate?.({ finalText: text, laneName }),
stream.lastDeliveredText?.(),
lane.lastPartialText,
],
finalText: activeFullText,
candidateTexts,
})
: undefined;
if (retainedPreview && (!buttons || retainedPreview.length <= params.draftMaxChars)) {
const previewText = retainedPreview;
lane.lastPartialText = previewText;
@@ -376,20 +398,28 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
lane.finalized = true;
params.markDelivered();
return result("preview-finalized", { content: previewText, messageId, buttonsAttached });
return result("preview-finalized", {
content: previewText,
promptContextContent: previewText,
messageId,
buttonsAttached,
});
}
if (!deliveredPrefixBeforeUpdate) {
lane.lastPartialText = firstChunk;
lane.lastPartialText = activeChunk;
lane.hasStreamedMessage = true;
lane.finalized = false;
stream.update(firstChunk);
stream.update(activeChunk);
}
if (isFinal) {
await params.stopDraftLane(lane);
} else {
await params.flushDraftLane(lane);
}
const activeChunkIndexAfterStop = isFinal ? clampActiveChunkIndex() : activeChunkIndex;
const activeChunkAfterStop = chunks[activeChunkIndexAfterStop] ?? activeChunk;
const remainingChunksAfterStop = chunks.slice(activeChunkIndexAfterStop + 1);
const messageId = stream.messageId();
if (typeof messageId !== "number") {
@@ -402,14 +432,19 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
const deliveredStreamTextAfterStop = stream.lastDeliveredText?.();
const activeChunkTextAfterStop = activeChunkAfterStop.trimEnd();
const retainedActiveChunkAfterStop =
activeChunkIndexAfterStop !== activeChunkIndex &&
deliveredStreamTextAfterStop === activeChunk.trimEnd();
if (
isFinal &&
deliveredStreamTextAfterStop !== undefined &&
deliveredStreamTextAfterStop !== firstChunk.trimEnd()
deliveredStreamTextAfterStop !== activeChunkTextAfterStop &&
!retainedActiveChunkAfterStop
) {
if (
isDeliveredPrefix({ deliveredText: deliveredStreamTextAfterStop, finalText }) &&
deliveredStreamTextAfterStop.length > firstChunk.trimEnd().length
deliveredStreamTextAfterStop.length > activeChunkTextAfterStop.length
) {
return await finalizeDeliveredPrefix(deliveredStreamTextAfterStop, messageId);
}
@@ -424,7 +459,12 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
let buttonsAttached = false;
if (buttons) {
try {
await params.editStreamMessage({ laneName, messageId, text: firstChunk, buttons });
await params.editStreamMessage({
laneName,
messageId,
text: activeChunkAfterStop,
buttons,
});
buttonsAttached = true;
} catch (err) {
params.log(`telegram: ${laneName} stream button edit failed: ${String(err)}`);
@@ -433,7 +473,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
if (isFinal) {
lane.finalized = true;
for (const chunk of remainingChunks) {
for (const chunk of remainingChunksAfterStop) {
if (chunk.trim().length === 0) {
continue;
}
@@ -441,7 +481,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
}
return result("preview-finalized", {
content: text,
promptContextContent: firstChunk,
promptContextContent: activeChunkAfterStop,
messageId,
buttonsAttached,
});

View File

@@ -31,12 +31,14 @@ function createHarness(params?: {
lastPartialText: "",
hasStreamedMessage: false,
finalized: false,
activeChunkIndex: 0,
},
reasoning: {
stream: reasoning,
lastPartialText: "",
hasStreamedMessage: false,
finalized: false,
activeChunkIndex: 0,
},
};
const sendPayload = vi.fn().mockResolvedValue(true);
@@ -762,6 +764,87 @@ describe("createLaneTextDeliverer", () => {
expect(harness.markDelivered).toHaveBeenCalledTimes(1);
});
it("does not resend chunks retained while stopping a long streamed final", async () => {
const answer = createTestDraftStream({ messageId: 999 });
const harness = createHarness({
answerStream: answer,
draftMaxChars: 5,
splitFinalTextForStream: () => ["Hello", " world", " again"],
});
harness.lanes.answer.hasStreamedMessage = true;
answer.stop.mockImplementation(async () => {
harness.lanes.answer.activeChunkIndex = 1;
});
const result = await deliverFinalAnswer(harness, "Hello world again");
const delivery = expectPreviewFinalized(result);
expect(delivery.content).toBe("Hello world again");
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith({ text: " again" });
expect(harness.markDelivered).toHaveBeenCalledTimes(1);
});
it("compares retained delivered prefixes against the full final text", async () => {
let deliveredText = "Hello";
const answer = createTestDraftStream({ messageId: 999 });
const harness = createHarness({
answerStream: answer,
draftMaxChars: 5,
splitFinalTextForStream: (text) =>
text === " again" ? [" again"] : ["Hello", " world", " again"],
});
answer.lastDeliveredText.mockImplementation(() => deliveredText);
answer.stop.mockImplementation(async () => {
harness.lanes.answer.activeChunkIndex = 1;
deliveredText = "Hello world";
});
harness.lanes.answer.hasStreamedMessage = true;
const result = await deliverFinalAnswer(harness, "Hello world again");
const delivery = expectPreviewFinalized(result);
expect(delivery.promptContextContent).toBe("Hello world");
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith({ text: " again" });
});
it("edits buttons onto the chunk active after stopping a retained long final", async () => {
const buttons = [[{ text: "OK", callback_data: "ok" }]];
const answer = createTestDraftStream({ messageId: 999 });
const harness = createHarness({
answerStream: answer,
draftMaxChars: 6,
splitFinalTextForStream: () => ["Hello", " world", " again"],
});
harness.lanes.answer.hasStreamedMessage = true;
answer.stop.mockImplementation(async () => {
harness.lanes.answer.activeChunkIndex = 1;
});
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Hello world again",
payload: { text: "Hello world again", channelData: { telegram: { buttons } } },
infoKind: "final",
buttons,
});
const delivery = expectPreviewFinalized(result);
expect(delivery.buttonsAttached).toBe(true);
expect(harness.editStreamMessage).toHaveBeenCalledWith({
laneName: "answer",
messageId: 999,
text: " world",
buttons,
});
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith({
text: " again",
channelData: { telegram: { buttons } },
});
});
it("keeps inline buttons on the current chunk of an already-streamed long final", async () => {
const buttons = [[{ text: "OK", callback_data: "ok" }]];
const fullAnswer = "Hello world again";