From 10bbed8a6d303b2a61512dc7ca446863bca5429b Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Fri, 8 May 2026 18:14:32 +0530 Subject: [PATCH] fix(telegram): chain over-limit stream previews --- CHANGELOG.md | 1 + .../telegram/src/bot-message-dispatch.test.ts | 32 +++++++ .../telegram/src/bot-message-dispatch.ts | 3 + extensions/telegram/src/draft-stream.test.ts | 57 +++++++++++ extensions/telegram/src/draft-stream.ts | 95 ++++++++++++++++--- 5 files changed, 177 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a7bbe85929..2fc697f2200 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai - Agents/compaction: keep contributor diagnostics to a bounded top-three selection without sorting the full history. Thanks @shakkernerd. - Sessions/UI: avoid full-array sorting while selecting ACPX leases, Google Meet calendar events, and latest chat sessions. Thanks @shakkernerd. - Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus. +- Telegram/streaming: continue over-limit draft previews in a new message instead of stopping when rendered preview text crosses Telegram's message limit. (#74508) Thanks @anagnorisis2peripeteia. - Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip. - Telegram: re-probe the primary fetch transport after repeated sticky fallback success so transient IPv4 or pinned-IP fallback promotion can recover without a gateway restart. Fixes #77088. (#77157) Thanks @MkDev11. - Runtime/install: raise the supported Node 22 floor to `22.16+` so native SQLite query handling can rely on the `node:sqlite` statement metadata API while continuing to recommend Node 24. (#78921) diff --git a/extensions/telegram/src/bot-message-dispatch.test.ts b/extensions/telegram/src/bot-message-dispatch.test.ts index f824b407ca3..b64f1e2bb24 100644 --- a/extensions/telegram/src/bot-message-dispatch.test.ts +++ b/extensions/telegram/src/bot-message-dispatch.test.ts @@ -468,6 +468,38 @@ describe("dispatchTelegramMessage draft streaming", () => { expect(draftStream.clear).toHaveBeenCalledTimes(1); }); + it("keeps retained overflow draft previews", async () => { + const draftStream = createDraftStream(); + const bot = createBot(); + createTelegramDraftStream.mockReturnValue(draftStream); + dispatchReplyWithBufferedBlockDispatcher.mockImplementation( + async ({ dispatcherOptions, replyOptions }) => { + await replyOptions?.onPartialReply?.({ text: "Hello" }); + await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" }); + return { queuedFinal: true }; + }, + ); + deliverReplies.mockResolvedValue({ delivered: true }); + + await dispatchWithContext({ context: createContext(), bot }); + + const streamParams = createTelegramDraftStream.mock.calls[0]?.[0] as Parameters< + NonNullable + >[0]; + streamParams.onSupersededPreview?.({ + messageId: 17, + textSnapshot: "first page", + retain: true, + }); + expect(bot.api.deleteMessage).not.toHaveBeenCalled(); + + streamParams.onSupersededPreview?.({ + messageId: 18, + textSnapshot: "stale page", + }); + await vi.waitFor(() => expect(bot.api.deleteMessage).toHaveBeenCalledWith(123, 18)); + }); + it("queues final Telegram replies through outbound delivery when available", async () => { deliverInboundReplyWithMessageSendContext.mockResolvedValue({ status: "handled_visible", diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index e5d9fb373c9..d27460123a2 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -442,6 +442,9 @@ export const dispatchTelegramMessage = async ({ minInitialChars: draftMinInitialChars, renderText: renderStreamText, onSupersededPreview: (superseded) => { + if (superseded.retain) { + return; + } void bot.api.deleteMessage(chatId, superseded.messageId).catch((err: unknown) => { logVerbose( `telegram: superseded ${laneName} stream cleanup failed (${superseded.messageId}): ${String(err)}`, diff --git a/extensions/telegram/src/draft-stream.test.ts b/extensions/telegram/src/draft-stream.test.ts index 41002246a86..cad4fc4ab16 100644 --- a/extensions/telegram/src/draft-stream.test.ts +++ b/extensions/telegram/src/draft-stream.test.ts @@ -389,6 +389,63 @@ describe("createTelegramDraftStream", () => { }); }); + it("continues in a new message when rendered preview crosses maxChars", async () => { + const api = createMockDraftApi(); + api.sendMessage + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }); + const stream = createDraftStream(api, { maxChars: 20 }); + + stream.update("Hello world"); + await stream.flush(); + stream.update("Hello world foo bar baz qux"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(2); + expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "Hello world", undefined); + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "foo bar baz qux", undefined); + }); + + it("splits a first oversized rendered preview into chained messages", async () => { + const api = createMockDraftApi(); + api.sendMessage + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }); + const stream = createDraftStream(api, { maxChars: 10 }); + + stream.update("1234567890ABCDEFGHIJ"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(2); + expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "1234567890", undefined); + expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "ABCDEFGHIJ", undefined); + }); + + it("retains overflow preview pages", async () => { + const api = createMockDraftApi(); + api.sendMessage + .mockResolvedValueOnce({ message_id: 17 }) + .mockResolvedValueOnce({ message_id: 42 }); + const onSupersededPreview = vi.fn(); + const stream = createDraftStream(api, { + maxChars: 20, + onSupersededPreview, + }); + + stream.update("Hello world"); + await stream.flush(); + stream.update("Hello world foo bar baz qux"); + await stream.flush(); + + expect(onSupersededPreview).toHaveBeenCalledWith({ + messageId: 17, + textSnapshot: "Hello world", + parseMode: undefined, + visibleSinceMs: expect.any(Number), + retain: true, + }); + }); + it("enforces maxChars after renderText expansion", async () => { const api = createMockDraftApi(); const warn = vi.fn(); diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index f9ab22c88d7..9a532fc5255 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -53,8 +53,38 @@ type SupersededTelegramPreview = { textSnapshot: string; parseMode?: "HTML"; visibleSinceMs?: number; + retain?: boolean; }; +function renderTelegramDraftPreview( + text: string, + renderText: ((text: string) => TelegramDraftPreview) | undefined, +): TelegramDraftPreview { + const trimmed = text.trimEnd(); + return renderText?.(trimmed) ?? { text: trimmed }; +} + +function findTelegramDraftChunkLength( + text: string, + maxChars: number, + renderText: ((text: string) => TelegramDraftPreview) | undefined, +): number { + let best = 0; + let low = 1; + let high = text.length; + while (low <= high) { + const mid = Math.floor((low + high) / 2); + const renderedText = renderTelegramDraftPreview(text.slice(0, mid), renderText).text.trimEnd(); + if (renderedText && renderedText.length <= maxChars) { + best = mid; + low = mid + 1; + } else { + high = mid - 1; + } + } + return best; +} + export function createTelegramDraftStream(params: { api: Bot["api"]; chatId: Parameters[0]; @@ -98,6 +128,8 @@ export function createTelegramDraftStream(params: { let lastSentParseMode: "HTML" | undefined; let previewRevision = 0; let generation = 0; + let deliveredTextOffset = 0; + let resetStreamToNewMessage: (options?: { keepPending?: boolean; resetOffset?: boolean }) => void; type PreviewSendParams = { renderedText: string; renderedParseMode: "HTML" | undefined; @@ -198,13 +230,45 @@ export function createTelegramDraftStream(params: { if (!trimmed) { return false; } - const rendered = params.renderText?.(trimmed) ?? { text: trimmed }; + const currentText = trimmed.slice(deliveredTextOffset).trimStart(); + if (!currentText) { + return false; + } + const rendered = renderTelegramDraftPreview(currentText, params.renderText); const renderedText = rendered.text.trimEnd(); const renderedParseMode = rendered.parseMode; if (!renderedText) { return false; } if (renderedText.length > maxChars) { + if (lastDeliveredText.length > deliveredTextOffset) { + const supersededMessageId = streamMessageId; + const supersededTextSnapshot = lastSentText; + const supersededParseMode = lastSentParseMode; + const supersededVisibleSinceMs = streamVisibleSinceMs; + deliveredTextOffset = lastDeliveredText.length; + resetStreamToNewMessage({ keepPending: true, resetOffset: false }); + if (typeof supersededMessageId === "number") { + params.onSupersededPreview?.({ + messageId: supersededMessageId, + textSnapshot: supersededTextSnapshot, + parseMode: supersededParseMode, + visibleSinceMs: supersededVisibleSinceMs, + retain: true, + }); + } + return await sendOrEditStreamMessage(trimmed); + } + const chunkLength = findTelegramDraftChunkLength(currentText, maxChars, params.renderText); + if (chunkLength > 0) { + const sent = await sendOrEditStreamMessage( + trimmed.slice(0, deliveredTextOffset) + currentText.slice(0, chunkLength), + ); + if (!sent) { + return false; + } + return await sendOrEditStreamMessage(trimmed); + } streamState.stopped = true; params.warn?.( `telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`, @@ -248,6 +312,24 @@ export function createTelegramDraftStream(params: { sendOrEditStreamMessage, }); + resetStreamToNewMessage = (options) => { + streamState.stopped = false; + streamState.final = false; + generation += 1; + messageSendAttempted = false; + streamMessageId = undefined; + streamVisibleSinceMs = undefined; + lastSentText = ""; + lastSentParseMode = undefined; + if (options?.resetOffset !== false) { + deliveredTextOffset = 0; + } + if (!options?.keepPending) { + loop.resetPending(); + } + loop.resetThrottleWindow(); + }; + const clear = async () => { const messageId = await takeMessageIdAfterStop({ stopForClear, @@ -272,16 +354,7 @@ export function createTelegramDraftStream(params: { }; const forceNewMessage = () => { - streamState.stopped = false; - streamState.final = false; - generation += 1; - messageSendAttempted = false; - streamMessageId = undefined; - streamVisibleSinceMs = undefined; - lastSentText = ""; - lastSentParseMode = undefined; - loop.resetPending(); - loop.resetThrottleWindow(); + resetStreamToNewMessage(); }; const materialize = async (): Promise => {