diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index 0c59c713ed2..05bd3ce935a 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -3573,6 +3573,150 @@ describe("dispatchTelegramMessage draft streaming", () => { await sidePromise; }); + it("does not drop the first chunk of a long final after a generic lane rotation", async () => { + const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + await dispatcherOptions.deliver( + { text: "A".repeat(4000) + "B".repeat(4000) }, + { kind: "final" }, + ); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + textLimit: 4000, + }); + + expect(answerDraftStream.update).toHaveBeenCalledWith("A".repeat(4000)); + }); + + it("does not suppress text-only blocks as delivered when answer draft is inactive", async () => { + setupDraftStreams({ answerMessageId: 2001 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => { + await dispatcherOptions.deliver({ text: "forced block" }, { kind: "block" }); + await dispatcherOptions.deliver({ text: "final text" }, { kind: "final" }); + return { queuedFinal: true }; + }); + + await dispatchWithContext({ + context: createContext(), + streamMode: "partial", + telegramCfg: { + streaming: { mode: "partial", block: { enabled: true } }, + } satisfies Parameters[0]["telegramCfg"], + }); + + const deliveredTexts = deliverReplies.mock.calls.flatMap((call) => + ((call[0] as { replies?: Array<{ text?: string }> }).replies ?? []).map( + (reply) => reply.text, + ), + ); + expect(deliveredTexts).toContain("forced block"); + }); + + it("does not suppress text-only blocks after a tool-progress draft", async () => { + const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onToolStart?.({ name: "exec", phase: "start" }); + await dispatcherOptions.deliver({ text: "block after progress" }, { kind: "block" }); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "partial", + telegramCfg: { streaming: { mode: "partial" } }, + }); + + expect(mockCallArg(answerDraftStream.update)).toContain("Exec"); + expect(answerDraftStream.update).toHaveBeenLastCalledWith("block after progress"); + }); + + it("does not suppress button-bearing blocks after answer streaming starts", async () => { + const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); + const buttons = [[{ text: "OK", callback_data: "ok" }]]; + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "partial answer" }); + await dispatcherOptions.deliver( + { text: "choose now", channelData: { telegram: { buttons } } }, + { kind: "block" }, + ); + return { queuedFinal: true }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "partial", + telegramCfg: { streaming: { mode: "partial" } }, + }); + + expect(answerDraftStream.update).toHaveBeenLastCalledWith("choose now"); + expectRecordFields(mockCallArg(editMessageTelegram, 0, 3), { buttons }); + }); + + it("finalizes a duplicate text-only block when no final follows", async () => { + const { answerDraftStream } = setupDraftStreams({ answerMessageId: 2001 }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "partial answer" }); + await dispatcherOptions.deliver({ text: "partial answer" }, { kind: "block" }); + return { queuedFinal: false }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "partial", + telegramCfg: { streaming: { mode: "partial" } }, + }); + + expect(answerDraftStream.stop).toHaveBeenCalled(); + expect(answerDraftStream.clear).not.toHaveBeenCalled(); + expectRecordFields(mockCallArg(emitInternalMessageSentHook), { + content: "partial answer", + messageId: 2001, + }); + expectRecordFields(mockCallArg(recordOutboundMessageForPromptContext), { + text: "partial answer", + messageId: 2001, + }); + }); + + it("materializes a pending duplicate text-only block before finalizing it", async () => { + const { answerDraftStream } = setupDraftStreams(); + answerDraftStream.stop.mockImplementation(async () => { + answerDraftStream.setMessageId(2001); + }); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "pending answer" }); + await dispatcherOptions.deliver({ text: "pending answer" }, { kind: "block" }); + return { queuedFinal: false }; + }, + ); + + await dispatchWithContext({ + context: createContext(), + streamMode: "partial", + telegramCfg: { streaming: { mode: "partial" } }, + }); + + expect(answerDraftStream.stop).toHaveBeenCalled(); + expect(answerDraftStream.clear).not.toHaveBeenCalled(); + expectRecordFields(mockCallArg(emitInternalMessageSentHook), { + content: "pending answer", + messageId: 2001, + }); + }); + it("keeps queued room events abortable after their source dispatch returns", async () => { const historyKey = "telegram:group:-100123"; const groupHistories = new Map([[historyKey, []]]); diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index a78e0eff59e..2007dd9dadf 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -19,6 +19,7 @@ import { CURRENT_MESSAGE_MARKER } from "openclaw/plugin-sdk/channel-mention-gati import { createChannelMessageReplyPipeline, createOutboundPayloadPlan, + createPreviewMessageReceipt, deriveDurableFinalDeliveryRequirements, projectOutboundPayloadPlanForDelivery, } from "openclaw/plugin-sdk/channel-outbound"; @@ -899,6 +900,7 @@ export const dispatchTelegramMessage = async ({ renderText: renderStreamText, onSupersededPreview: (superseded) => { if (superseded.retain) { + lanes[laneName].activeChunkIndex += 1; return; } void bot.api.deleteMessage(chatId, superseded.messageId).catch((err: unknown) => { @@ -916,6 +918,7 @@ export const dispatchTelegramMessage = async ({ lastPartialText: "", hasStreamedMessage: false, finalized: false, + activeChunkIndex: 0, }; }; const lanes: Record = { @@ -1075,6 +1078,7 @@ export const dispatchTelegramMessage = async ({ } lane.hasStreamedMessage = false; lane.finalized = false; + lane.activeChunkIndex = 0; if (lane === answerLane) { resetAnswerToolProgressDraft(); } @@ -1293,6 +1297,7 @@ export const dispatchTelegramMessage = async ({ const silentErrorReplies = telegramCfg.silentErrorReplies === true; const isDmTopic = !isGroup && threadSpec.scope === "dm" && threadSpec.id != null; let queuedFinal = false; + let skippedDuplicateAnswerBlockDraftDelivery = false; let suppressSilentReplyFallback = false; let hadErrorReplyFailureOrSkip = false; let isFirstTurnInSession = false; @@ -1491,6 +1496,43 @@ export const dispatchTelegramMessage = async ({ }); } }; + const finalizeSkippedDuplicateAnswerBlockDraft = async () => { + if ( + !skippedDuplicateAnswerBlockDraftDelivery || + queuedFinal || + dispatchError || + isDispatchSuperseded() || + answerLane.finalized + ) { + return; + } + const stream = answerLane.stream; + const content = answerLane.lastPartialText; + if (!stream || !content) { + return; + } + await stream.stop(); + const messageId = stream.messageId(); + if (typeof messageId !== "number") { + if (stream.sendMayHaveLanded?.()) { + answerLane.finalized = true; + deliveryState.markDelivered(); + } + return; + } + answerLane.finalized = true; + deliveryState.markDelivered(); + await emitPreviewFinalizedHook({ + kind: "preview-finalized", + delivery: { + content, + promptContextContent: content, + messageId, + buttonsAttached: false, + receipt: createPreviewMessageReceipt({ id: messageId }), + }, + }); + }; const deliverLaneText = createLaneTextDeliverer({ lanes, draftMaxChars, @@ -1760,6 +1802,24 @@ export const dispatchTelegramMessage = async ({ } await prepareAnswerLaneForToolProgress(); } + + const skipTextOnlyBlock = + streamMode === "partial" && + info.kind === "block" && + segment.lane === "answer" && + !reply.hasMedia && + !hasExecApprovalPayload(effectivePayload) && + telegramButtons === undefined && + answerLane.hasStreamedMessage && + !activeAnswerDraftIsToolProgressOnly && + segment.update.text.trimEnd() === answerLane.lastPartialText.trimEnd(); + + if (skipTextOnlyBlock) { + skippedDuplicateAnswerBlockDraftDelivery = true; + blockDelivered = true; + continue; + } + const result = segment.lane === "answer" && info.kind === "final" ? await deliverFinalAnswerText( @@ -2085,6 +2145,7 @@ export const dispatchTelegramMessage = async ({ progressDraft.cancel(); await draftLaneEventQueue; nativeToolProgressDraft?.stop(); + await finalizeSkippedDuplicateAnswerBlockDraft(); const lanesToCleanup: Array<{ laneName: LaneName; lane: DraftLaneState }> = [ { laneName: "answer", lane: answerLane }, { laneName: "reasoning", lane: reasoningLane }, diff --git a/extensions/telegram/src/lane-delivery-text-deliverer.ts b/extensions/telegram/src/lane-delivery-text-deliverer.ts index 50b42e21424..a4fd2e0d048 100644 --- a/extensions/telegram/src/lane-delivery-text-deliverer.ts +++ b/extensions/telegram/src/lane-delivery-text-deliverer.ts @@ -22,6 +22,7 @@ export type DraftLaneState = { lastPartialText: string; hasStreamedMessage: boolean; finalized: boolean; + activeChunkIndex: number; }; type LanePreviewFinalizedDelivery = { @@ -275,11 +276,19 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { text.length > params.draftMaxChars ? compactChunks(params.splitFinalTextForStream?.(text) ?? []) : [text]; - const [firstChunk, ...remainingChunks] = chunks; - if (!firstChunk || firstChunk.length > params.draftMaxChars) { + + const clampActiveChunkIndex = () => + Math.min(lane.activeChunkIndex, Math.max(0, chunks.length - 1)); + const activeChunkIndex = clampActiveChunkIndex(); + const activeChunk = chunks[activeChunkIndex]; + const remainingChunks = chunks.slice(activeChunkIndex + 1); + + if (!activeChunk || activeChunk.length > params.draftMaxChars) { return undefined; } - const finalText = text.trimEnd(); + + const activeFullText = chunks.slice(activeChunkIndex).join(""); + const finalText = activeFullText.trimEnd(); const deliveredStreamTextBeforeUpdate = stream.lastDeliveredText?.(); const deliveredPrefixBeforeUpdate = isFinal && @@ -288,7 +297,8 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { deliveredText: deliveredStreamTextBeforeUpdate, finalText, }) && - deliveredStreamTextBeforeUpdate.length > firstChunk.trimEnd().length; + deliveredStreamTextBeforeUpdate.length > activeChunk.trimEnd().length; + const finalizeDeliveredPrefix = async ( deliveredStreamText: string, messageId: number, @@ -310,7 +320,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } } } - const suffix = finalText.slice(deliveredStreamText.length); + const suffix = activeFullText.slice(deliveredStreamText.length); if (suffix.trim().length > 0) { for (const chunk of compactChunks(params.splitFinalTextForStream?.(suffix) ?? [])) { if (chunk.trim().length === 0) { @@ -327,17 +337,29 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { }); }; + const candidateTexts = [stream.lastDeliveredText?.(), lane.lastPartialText]; + if (isFinal && remainingChunks.length === 0 && isPotentialTruncatedFinal(activeFullText)) { + const resolvedFullCandidate = await params.resolveFinalTextCandidate?.({ + finalText: text, + laneName, + }); + if (resolvedFullCandidate) { + const resolvedChunks = + resolvedFullCandidate.length > params.draftMaxChars + ? compactChunks(params.splitFinalTextForStream?.(resolvedFullCandidate) ?? []) + : [resolvedFullCandidate]; + candidateTexts.push(resolvedChunks.slice(activeChunkIndex).join("")); + } + } + const retainedPreview = - isFinal && remainingChunks.length === 0 && isPotentialTruncatedFinal(text) + isFinal && remainingChunks.length === 0 && isPotentialTruncatedFinal(activeFullText) ? selectLongerFinalText({ - finalText: text, - candidateTexts: [ - await params.resolveFinalTextCandidate?.({ finalText: text, laneName }), - stream.lastDeliveredText?.(), - lane.lastPartialText, - ], + finalText: activeFullText, + candidateTexts, }) : undefined; + if (retainedPreview && (!buttons || retainedPreview.length <= params.draftMaxChars)) { const previewText = retainedPreview; lane.lastPartialText = previewText; @@ -376,20 +398,28 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } lane.finalized = true; params.markDelivered(); - return result("preview-finalized", { content: previewText, messageId, buttonsAttached }); + return result("preview-finalized", { + content: previewText, + promptContextContent: previewText, + messageId, + buttonsAttached, + }); } if (!deliveredPrefixBeforeUpdate) { - lane.lastPartialText = firstChunk; + lane.lastPartialText = activeChunk; lane.hasStreamedMessage = true; lane.finalized = false; - stream.update(firstChunk); + stream.update(activeChunk); } if (isFinal) { await params.stopDraftLane(lane); } else { await params.flushDraftLane(lane); } + const activeChunkIndexAfterStop = isFinal ? clampActiveChunkIndex() : activeChunkIndex; + const activeChunkAfterStop = chunks[activeChunkIndexAfterStop] ?? activeChunk; + const remainingChunksAfterStop = chunks.slice(activeChunkIndexAfterStop + 1); const messageId = stream.messageId(); if (typeof messageId !== "number") { @@ -402,14 +432,19 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } const deliveredStreamTextAfterStop = stream.lastDeliveredText?.(); + const activeChunkTextAfterStop = activeChunkAfterStop.trimEnd(); + const retainedActiveChunkAfterStop = + activeChunkIndexAfterStop !== activeChunkIndex && + deliveredStreamTextAfterStop === activeChunk.trimEnd(); if ( isFinal && deliveredStreamTextAfterStop !== undefined && - deliveredStreamTextAfterStop !== firstChunk.trimEnd() + deliveredStreamTextAfterStop !== activeChunkTextAfterStop && + !retainedActiveChunkAfterStop ) { if ( isDeliveredPrefix({ deliveredText: deliveredStreamTextAfterStop, finalText }) && - deliveredStreamTextAfterStop.length > firstChunk.trimEnd().length + deliveredStreamTextAfterStop.length > activeChunkTextAfterStop.length ) { return await finalizeDeliveredPrefix(deliveredStreamTextAfterStop, messageId); } @@ -424,7 +459,12 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { let buttonsAttached = false; if (buttons) { try { - await params.editStreamMessage({ laneName, messageId, text: firstChunk, buttons }); + await params.editStreamMessage({ + laneName, + messageId, + text: activeChunkAfterStop, + buttons, + }); buttonsAttached = true; } catch (err) { params.log(`telegram: ${laneName} stream button edit failed: ${String(err)}`); @@ -433,7 +473,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { if (isFinal) { lane.finalized = true; - for (const chunk of remainingChunks) { + for (const chunk of remainingChunksAfterStop) { if (chunk.trim().length === 0) { continue; } @@ -441,7 +481,7 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { } return result("preview-finalized", { content: text, - promptContextContent: firstChunk, + promptContextContent: activeChunkAfterStop, messageId, buttonsAttached, }); diff --git a/extensions/telegram/src/lane-delivery.test.ts b/extensions/telegram/src/lane-delivery.test.ts index df9ff08f02c..6813d0ae614 100644 --- a/extensions/telegram/src/lane-delivery.test.ts +++ b/extensions/telegram/src/lane-delivery.test.ts @@ -31,12 +31,14 @@ function createHarness(params?: { lastPartialText: "", hasStreamedMessage: false, finalized: false, + activeChunkIndex: 0, }, reasoning: { stream: reasoning, lastPartialText: "", hasStreamedMessage: false, finalized: false, + activeChunkIndex: 0, }, }; const sendPayload = vi.fn().mockResolvedValue(true); @@ -762,6 +764,87 @@ describe("createLaneTextDeliverer", () => { expect(harness.markDelivered).toHaveBeenCalledTimes(1); }); + it("does not resend chunks retained while stopping a long streamed final", async () => { + const answer = createTestDraftStream({ messageId: 999 }); + const harness = createHarness({ + answerStream: answer, + draftMaxChars: 5, + splitFinalTextForStream: () => ["Hello", " world", " again"], + }); + harness.lanes.answer.hasStreamedMessage = true; + answer.stop.mockImplementation(async () => { + harness.lanes.answer.activeChunkIndex = 1; + }); + + const result = await deliverFinalAnswer(harness, "Hello world again"); + + const delivery = expectPreviewFinalized(result); + expect(delivery.content).toBe("Hello world again"); + expect(harness.sendPayload).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).toHaveBeenCalledWith({ text: " again" }); + expect(harness.markDelivered).toHaveBeenCalledTimes(1); + }); + + it("compares retained delivered prefixes against the full final text", async () => { + let deliveredText = "Hello"; + const answer = createTestDraftStream({ messageId: 999 }); + const harness = createHarness({ + answerStream: answer, + draftMaxChars: 5, + splitFinalTextForStream: (text) => + text === " again" ? [" again"] : ["Hello", " world", " again"], + }); + answer.lastDeliveredText.mockImplementation(() => deliveredText); + answer.stop.mockImplementation(async () => { + harness.lanes.answer.activeChunkIndex = 1; + deliveredText = "Hello world"; + }); + harness.lanes.answer.hasStreamedMessage = true; + + const result = await deliverFinalAnswer(harness, "Hello world again"); + + const delivery = expectPreviewFinalized(result); + expect(delivery.promptContextContent).toBe("Hello world"); + expect(harness.sendPayload).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).toHaveBeenCalledWith({ text: " again" }); + }); + + it("edits buttons onto the chunk active after stopping a retained long final", async () => { + const buttons = [[{ text: "OK", callback_data: "ok" }]]; + const answer = createTestDraftStream({ messageId: 999 }); + const harness = createHarness({ + answerStream: answer, + draftMaxChars: 6, + splitFinalTextForStream: () => ["Hello", " world", " again"], + }); + harness.lanes.answer.hasStreamedMessage = true; + answer.stop.mockImplementation(async () => { + harness.lanes.answer.activeChunkIndex = 1; + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Hello world again", + payload: { text: "Hello world again", channelData: { telegram: { buttons } } }, + infoKind: "final", + buttons, + }); + + const delivery = expectPreviewFinalized(result); + expect(delivery.buttonsAttached).toBe(true); + expect(harness.editStreamMessage).toHaveBeenCalledWith({ + laneName: "answer", + messageId: 999, + text: " world", + buttons, + }); + expect(harness.sendPayload).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).toHaveBeenCalledWith({ + text: " again", + channelData: { telegram: { buttons } }, + }); + }); + it("keeps inline buttons on the current chunk of an already-streamed long final", async () => { const buttons = [[{ text: "OK", callback_data: "ok" }]]; const fullAnswer = "Hello world again";