fix(telegram): chain over-limit stream previews

This commit is contained in:
Ayaan Zaidi
2026-05-08 18:14:32 +05:30
parent c7cf34a955
commit 10bbed8a6d
5 changed files with 177 additions and 11 deletions

View File

@@ -20,6 +20,7 @@ Docs: https://docs.openclaw.ai
- Agents/compaction: keep contributor diagnostics to a bounded top-three selection without sorting the full history. Thanks @shakkernerd.
- Sessions/UI: avoid full-array sorting while selecting ACPX leases, Google Meet calendar events, and latest chat sessions. Thanks @shakkernerd.
- Telegram: preserve the channel-specific 10-option poll cap in the unified outbound adapter so over-limit polls are rejected before send. (#78762) Thanks @obviyus.
- Telegram/streaming: continue over-limit draft previews in a new message instead of stopping when rendered preview text crosses Telegram's message limit. (#74508) Thanks @anagnorisis2peripeteia.
- Slack: route handled top-level channel turns in implicit-conversation channels to thread-scoped sessions when Slack reply threading is enabled, keeping the root turn and later thread replies on one OpenClaw session. (#78522) Thanks @zeroth-blip.
- Telegram: re-probe the primary fetch transport after repeated sticky fallback success so transient IPv4 or pinned-IP fallback promotion can recover without a gateway restart. Fixes #77088. (#77157) Thanks @MkDev11.
- Runtime/install: raise the supported Node 22 floor to `22.16+` so native SQLite query handling can rely on the `node:sqlite` statement metadata API while continuing to recommend Node 24. (#78921)

View File

@@ -468,6 +468,38 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.clear).toHaveBeenCalledTimes(1);
});
it("keeps retained overflow draft previews", async () => {
const draftStream = createDraftStream();
const bot = createBot();
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(
async ({ dispatcherOptions, replyOptions }) => {
await replyOptions?.onPartialReply?.({ text: "Hello" });
await dispatcherOptions.deliver({ text: "Hello" }, { kind: "final" });
return { queuedFinal: true };
},
);
deliverReplies.mockResolvedValue({ delivered: true });
await dispatchWithContext({ context: createContext(), bot });
const streamParams = createTelegramDraftStream.mock.calls[0]?.[0] as Parameters<
NonNullable<TelegramBotDeps["createTelegramDraftStream"]>
>[0];
streamParams.onSupersededPreview?.({
messageId: 17,
textSnapshot: "first page",
retain: true,
});
expect(bot.api.deleteMessage).not.toHaveBeenCalled();
streamParams.onSupersededPreview?.({
messageId: 18,
textSnapshot: "stale page",
});
await vi.waitFor(() => expect(bot.api.deleteMessage).toHaveBeenCalledWith(123, 18));
});
it("queues final Telegram replies through outbound delivery when available", async () => {
deliverInboundReplyWithMessageSendContext.mockResolvedValue({
status: "handled_visible",

View File

@@ -442,6 +442,9 @@ export const dispatchTelegramMessage = async ({
minInitialChars: draftMinInitialChars,
renderText: renderStreamText,
onSupersededPreview: (superseded) => {
if (superseded.retain) {
return;
}
void bot.api.deleteMessage(chatId, superseded.messageId).catch((err: unknown) => {
logVerbose(
`telegram: superseded ${laneName} stream cleanup failed (${superseded.messageId}): ${String(err)}`,

View File

@@ -389,6 +389,63 @@ describe("createTelegramDraftStream", () => {
});
});
it("continues in a new message when rendered preview crosses maxChars", async () => {
const api = createMockDraftApi();
api.sendMessage
.mockResolvedValueOnce({ message_id: 17 })
.mockResolvedValueOnce({ message_id: 42 });
const stream = createDraftStream(api, { maxChars: 20 });
stream.update("Hello world");
await stream.flush();
stream.update("Hello world foo bar baz qux");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(2);
expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "Hello world", undefined);
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "foo bar baz qux", undefined);
});
it("splits a first oversized rendered preview into chained messages", async () => {
const api = createMockDraftApi();
api.sendMessage
.mockResolvedValueOnce({ message_id: 17 })
.mockResolvedValueOnce({ message_id: 42 });
const stream = createDraftStream(api, { maxChars: 10 });
stream.update("1234567890ABCDEFGHIJ");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(2);
expect(api.sendMessage).toHaveBeenNthCalledWith(1, 123, "1234567890", undefined);
expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "ABCDEFGHIJ", undefined);
});
it("retains overflow preview pages", async () => {
const api = createMockDraftApi();
api.sendMessage
.mockResolvedValueOnce({ message_id: 17 })
.mockResolvedValueOnce({ message_id: 42 });
const onSupersededPreview = vi.fn();
const stream = createDraftStream(api, {
maxChars: 20,
onSupersededPreview,
});
stream.update("Hello world");
await stream.flush();
stream.update("Hello world foo bar baz qux");
await stream.flush();
expect(onSupersededPreview).toHaveBeenCalledWith({
messageId: 17,
textSnapshot: "Hello world",
parseMode: undefined,
visibleSinceMs: expect.any(Number),
retain: true,
});
});
it("enforces maxChars after renderText expansion", async () => {
const api = createMockDraftApi();
const warn = vi.fn();

View File

@@ -53,8 +53,38 @@ type SupersededTelegramPreview = {
textSnapshot: string;
parseMode?: "HTML";
visibleSinceMs?: number;
retain?: boolean;
};
function renderTelegramDraftPreview(
text: string,
renderText: ((text: string) => TelegramDraftPreview) | undefined,
): TelegramDraftPreview {
const trimmed = text.trimEnd();
return renderText?.(trimmed) ?? { text: trimmed };
}
function findTelegramDraftChunkLength(
text: string,
maxChars: number,
renderText: ((text: string) => TelegramDraftPreview) | undefined,
): number {
let best = 0;
let low = 1;
let high = text.length;
while (low <= high) {
const mid = Math.floor((low + high) / 2);
const renderedText = renderTelegramDraftPreview(text.slice(0, mid), renderText).text.trimEnd();
if (renderedText && renderedText.length <= maxChars) {
best = mid;
low = mid + 1;
} else {
high = mid - 1;
}
}
return best;
}
export function createTelegramDraftStream(params: {
api: Bot["api"];
chatId: Parameters<Bot["api"]["sendMessage"]>[0];
@@ -98,6 +128,8 @@ export function createTelegramDraftStream(params: {
let lastSentParseMode: "HTML" | undefined;
let previewRevision = 0;
let generation = 0;
let deliveredTextOffset = 0;
let resetStreamToNewMessage: (options?: { keepPending?: boolean; resetOffset?: boolean }) => void;
type PreviewSendParams = {
renderedText: string;
renderedParseMode: "HTML" | undefined;
@@ -198,13 +230,45 @@ export function createTelegramDraftStream(params: {
if (!trimmed) {
return false;
}
const rendered = params.renderText?.(trimmed) ?? { text: trimmed };
const currentText = trimmed.slice(deliveredTextOffset).trimStart();
if (!currentText) {
return false;
}
const rendered = renderTelegramDraftPreview(currentText, params.renderText);
const renderedText = rendered.text.trimEnd();
const renderedParseMode = rendered.parseMode;
if (!renderedText) {
return false;
}
if (renderedText.length > maxChars) {
if (lastDeliveredText.length > deliveredTextOffset) {
const supersededMessageId = streamMessageId;
const supersededTextSnapshot = lastSentText;
const supersededParseMode = lastSentParseMode;
const supersededVisibleSinceMs = streamVisibleSinceMs;
deliveredTextOffset = lastDeliveredText.length;
resetStreamToNewMessage({ keepPending: true, resetOffset: false });
if (typeof supersededMessageId === "number") {
params.onSupersededPreview?.({
messageId: supersededMessageId,
textSnapshot: supersededTextSnapshot,
parseMode: supersededParseMode,
visibleSinceMs: supersededVisibleSinceMs,
retain: true,
});
}
return await sendOrEditStreamMessage(trimmed);
}
const chunkLength = findTelegramDraftChunkLength(currentText, maxChars, params.renderText);
if (chunkLength > 0) {
const sent = await sendOrEditStreamMessage(
trimmed.slice(0, deliveredTextOffset) + currentText.slice(0, chunkLength),
);
if (!sent) {
return false;
}
return await sendOrEditStreamMessage(trimmed);
}
streamState.stopped = true;
params.warn?.(
`telegram stream preview stopped (text length ${renderedText.length} > ${maxChars})`,
@@ -248,6 +312,24 @@ export function createTelegramDraftStream(params: {
sendOrEditStreamMessage,
});
resetStreamToNewMessage = (options) => {
streamState.stopped = false;
streamState.final = false;
generation += 1;
messageSendAttempted = false;
streamMessageId = undefined;
streamVisibleSinceMs = undefined;
lastSentText = "";
lastSentParseMode = undefined;
if (options?.resetOffset !== false) {
deliveredTextOffset = 0;
}
if (!options?.keepPending) {
loop.resetPending();
}
loop.resetThrottleWindow();
};
const clear = async () => {
const messageId = await takeMessageIdAfterStop({
stopForClear,
@@ -272,16 +354,7 @@ export function createTelegramDraftStream(params: {
};
const forceNewMessage = () => {
streamState.stopped = false;
streamState.final = false;
generation += 1;
messageSendAttempted = false;
streamMessageId = undefined;
streamVisibleSinceMs = undefined;
lastSentText = "";
lastSentParseMode = undefined;
loop.resetPending();
loop.resetThrottleWindow();
resetStreamToNewMessage();
};
const materialize = async (): Promise<number | undefined> => {