fix(telegram): add initial message debounce for better push notifications (#18147)

Merged via /review-pr -> /prepare-pr -> /merge-pr.

Prepared head SHA: 5e2285b6a0
Co-authored-by: Marvae <11957602+Marvae@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Hongwei Ma
2026-02-17 13:51:49 +08:00
committed by GitHub
parent 2e375a5498
commit 7ffc8f9f7c
6 changed files with 261 additions and 32 deletions

View File

@@ -41,6 +41,7 @@ Docs: https://docs.openclaw.ai
- Discord: prevent duplicate media delivery when the model uses the `message send` tool with media, by skipping media extraction from messaging tool results since the tool already sent the message directly. (#18270)
- Telegram: keep draft-stream preview replies attached to the user message for `replyToMode: "all"` in groups and DMs, preserving threaded reply context from preview through finalization. (#17880) Thanks @yinghaosang.
- Telegram: prevent streaming final replies from being overwritten by later final/error payloads, and suppress fallback tool-error warnings when a recovered assistant answer already exists after tool calls. (#17883) Thanks @Marvae and @obviyus.
- Telegram: debounce the first draft-stream preview update (30-char threshold) and finalize short responses by editing the stop-time preview message, improving first push notifications and avoiding duplicate final sends. (#18148) Thanks @Marvae.
- Telegram: disable block streaming when `channels.telegram.streamMode` is `off`, preventing newline/content-block replies from splitting into multiple messages. (#17679) Thanks @saivarunk.
- Telegram: route non-abort slash commands on the normal chat/topic sequential lane while keeping true abort requests (`/stop`, `stop`) on the control lane, preventing command/reply race conditions from control-lane bypass. (#17899) Thanks @obviyus.
- Telegram: ignore `<media:...>` placeholder lines when extracting `MEDIA:` tool-result paths, preventing false local-file reads and dropped replies. (#18510) Thanks @yinghaosang.

View File

@@ -9,11 +9,11 @@ export type DraftStreamLoop = {
export function createDraftStreamLoop(params: {
throttleMs: number;
isStopped: () => boolean;
sendOrEditStreamMessage: (text: string) => Promise<void>;
sendOrEditStreamMessage: (text: string) => Promise<void | boolean>;
}): DraftStreamLoop {
let lastSentAt = 0;
let pendingText = "";
let inFlightPromise: Promise<void> | undefined;
let inFlightPromise: Promise<void | boolean> | undefined;
let timer: ReturnType<typeof setTimeout> | undefined;
const flush = async () => {
@@ -32,14 +32,18 @@ export function createDraftStreamLoop(params: {
return;
}
pendingText = "";
lastSentAt = Date.now();
const current = params.sendOrEditStreamMessage(text).finally(() => {
if (inFlightPromise === current) {
inFlightPromise = undefined;
}
});
inFlightPromise = current;
await current;
const sent = await current;
if (sent === false) {
pendingText = text;
return;
}
lastSentAt = Date.now();
if (!pendingText) {
return;
}

View File

@@ -1,5 +1,5 @@
import path from "node:path";
import type { Bot } from "grammy";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { STATE_DIR } from "../config/paths.js";
@@ -47,7 +47,7 @@ describe("dispatchTelegramMessage draft streaming", () => {
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockReturnValue(messageId),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn(),
stop: vi.fn().mockResolvedValue(undefined),
forceNewMessage: vi.fn(),
};
}
@@ -216,6 +216,33 @@ describe("dispatchTelegramMessage draft streaming", () => {
expect(draftStream.stop).toHaveBeenCalled();
});
it("edits the preview message created during stop() final flush", async () => {
let messageId: number | undefined;
const draftStream = {
update: vi.fn(),
flush: vi.fn().mockResolvedValue(undefined),
messageId: vi.fn().mockImplementation(() => messageId),
clear: vi.fn().mockResolvedValue(undefined),
stop: vi.fn().mockImplementation(async () => {
messageId = 777;
}),
forceNewMessage: vi.fn(),
};
createTelegramDraftStream.mockReturnValue(draftStream);
dispatchReplyWithBufferedBlockDispatcher.mockImplementation(async ({ dispatcherOptions }) => {
await dispatcherOptions.deliver({ text: "Short final" }, { kind: "final" });
return { queuedFinal: true };
});
deliverReplies.mockResolvedValue({ delivered: true });
editMessageTelegram.mockResolvedValue({ ok: true, chatId: "123", messageId: "777" });
await dispatchWithContext({ context: createContext() });
expect(editMessageTelegram).toHaveBeenCalledWith(123, 777, "Short final", expect.any(Object));
expect(deliverReplies).not.toHaveBeenCalled();
expect(draftStream.stop).toHaveBeenCalled();
});
it("does not overwrite finalized preview when additional final payloads are sent", async () => {
const draftStream = createDraftStream(999);
createTelegramDraftStream.mockReturnValue(draftStream);

View File

@@ -1,4 +1,10 @@
import type { Bot } from "grammy";
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
import type { RuntimeEnv } from "../runtime.js";
import type { TelegramMessageContext } from "./bot-message-context.js";
import type { TelegramBotOptions } from "./bot.js";
import type { TelegramStreamMode } from "./bot/types.js";
import type { TelegramInlineButtons } from "./button-types.js";
import { resolveAgentDir } from "../agents/agent-scope.js";
import {
findModelInCatalog,
@@ -15,15 +21,9 @@ import { logAckFailure, logTypingFailure } from "../channels/logging.js";
import { createReplyPrefixOptions } from "../channels/reply-prefix.js";
import { createTypingCallbacks } from "../channels/typing.js";
import { resolveMarkdownTableMode } from "../config/markdown-tables.js";
import type { OpenClawConfig, ReplyToMode, TelegramAccountConfig } from "../config/types.js";
import { danger, logVerbose } from "../globals.js";
import { getAgentScopedMediaLocalRoots } from "../media/local-roots.js";
import type { RuntimeEnv } from "../runtime.js";
import type { TelegramMessageContext } from "./bot-message-context.js";
import type { TelegramBotOptions } from "./bot.js";
import { deliverReplies } from "./bot/delivery.js";
import type { TelegramStreamMode } from "./bot/types.js";
import type { TelegramInlineButtons } from "./button-types.js";
import { resolveTelegramDraftStreamingChunking } from "./draft-chunking.js";
import { createTelegramDraftStream } from "./draft-stream.js";
import { editMessageTelegram } from "./send.js";
@@ -31,6 +31,9 @@ import { cacheSticker, describeStickerImage } from "./sticker-cache.js";
const EMPTY_RESPONSE_FALLBACK = "No response generated. Please try again.";
/** Minimum chars before sending first streaming message (improves push notification UX) */
const DRAFT_MIN_INITIAL_CHARS = 30;
async function resolveStickerVisionSupport(cfg: OpenClawConfig, agentId: string) {
try {
const catalog = await loadModelCatalog({ config: cfg });
@@ -101,6 +104,7 @@ export const dispatchTelegramMessage = async ({
maxChars: draftMaxChars,
thread: threadSpec,
replyToMessageId: draftReplyToMessageId,
minInitialChars: DRAFT_MIN_INITIAL_CHARS,
log: logVerbose,
warn: logVerbose,
})
@@ -314,7 +318,7 @@ export const dispatchTelegramMessage = async ({
finalText.length <= draftMaxChars &&
!payload.isError;
if (canFinalizeViaPreviewEdit) {
draftStream?.stop();
await draftStream?.stop();
draftStoppedForPreviewEdit = true;
if (
currentPreviewText &&
@@ -353,7 +357,36 @@ export const dispatchTelegramMessage = async ({
);
}
if (!draftStoppedForPreviewEdit) {
draftStream?.stop();
await draftStream?.stop();
}
// Check if stop() sent a message (debounce released on isFinal)
// If so, edit that message instead of sending a new one
const messageIdAfterStop = draftStream?.messageId();
if (
!finalizedViaPreviewMessage &&
typeof messageIdAfterStop === "number" &&
typeof finalText === "string" &&
finalText.length > 0 &&
finalText.length <= draftMaxChars &&
!hasMedia &&
!payload.isError
) {
try {
await editMessageTelegram(chatId, messageIdAfterStop, finalText, {
api: bot.api,
cfg,
accountId: route.accountId,
linkPreview: telegramCfg.linkPreview,
buttons: previewButtons,
});
finalizedViaPreviewMessage = true;
deliveryState.delivered = true;
return;
} catch (err) {
logVerbose(
`telegram: post-stop preview edit failed; falling back to standard send (${String(err)})`,
);
}
}
}
const result = await deliverReplies({
@@ -421,10 +454,11 @@ export const dispatchTelegramMessage = async ({
},
}));
} finally {
// Must stop() first to flush debounced content before clear() wipes state
await draftStream?.stop();
if (!finalizedViaPreviewMessage) {
await draftStream?.clear();
}
draftStream?.stop();
}
let sentFallback = false;
if (!deliveryState.delivered && deliveryState.skippedNonSilent > 0) {

View File

@@ -1,4 +1,4 @@
import { describe, expect, it, vi } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createTelegramDraftStream } from "./draft-stream.js";
function createMockDraftApi(sendMessageImpl?: () => Promise<{ message_id: number }>) {
@@ -134,3 +134,142 @@ describe("createTelegramDraftStream", () => {
expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined);
});
});
describe("draft stream initial message debounce", () => {
const createMockApi = () => ({
sendMessage: vi.fn().mockResolvedValue({ message_id: 42 }),
editMessageText: vi.fn().mockResolvedValue(true),
deleteMessage: vi.fn().mockResolvedValue(true),
});
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
describe("isFinal has highest priority", () => {
it("sends immediately on stop() even with 1 character", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
minInitialChars: 30,
});
stream.update("Y");
await stream.stop();
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledWith(123, "Y", undefined);
});
it("sends immediately on stop() with short sentence", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
minInitialChars: 30,
});
stream.update("Ok.");
await stream.stop();
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledWith(123, "Ok.", undefined);
});
});
describe("minInitialChars threshold", () => {
it("does not send first message below threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
minInitialChars: 30,
});
stream.update("Processing"); // 10 chars, below 30
await stream.flush();
expect(api.sendMessage).not.toHaveBeenCalled();
});
it("sends first message when reaching threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
minInitialChars: 30,
});
// Exactly 30 chars
stream.update("I am processing your request..");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalled();
});
it("works with longer text above threshold", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
minInitialChars: 30,
});
stream.update("I am processing your request, please wait a moment"); // 50 chars
await stream.flush();
expect(api.sendMessage).toHaveBeenCalled();
});
});
describe("subsequent updates after first message", () => {
it("edits normally after first message is sent", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
minInitialChars: 30,
});
// First message at threshold (30 chars)
stream.update("I am processing your request..");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(1);
// Subsequent updates should edit, not wait for threshold
stream.update("I am processing your request.. and summarizing");
await stream.flush();
expect(api.editMessageText).toHaveBeenCalled();
expect(api.sendMessage).toHaveBeenCalledTimes(1); // still only 1 send
});
});
describe("default behavior without debounce params", () => {
it("sends immediately without minInitialChars set (backward compatible)", async () => {
const api = createMockApi();
const stream = createTelegramDraftStream({
// oxlint-disable-next-line typescript/no-explicit-any
api: api as any,
chatId: 123,
// no minInitialChars (backward-compatible behavior)
});
stream.update("Hi");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledWith(123, "Hi", undefined);
});
});
});

View File

@@ -10,7 +10,7 @@ export type TelegramDraftStream = {
flush: () => Promise<void>;
messageId: () => number | undefined;
clear: () => Promise<void>;
stop: () => void;
stop: () => Promise<void>;
/** Reset internal state so the next update creates a new message instead of editing. */
forceNewMessage: () => void;
};
@@ -22,6 +22,8 @@ export function createTelegramDraftStream(params: {
thread?: TelegramThreadSpec | null;
replyToMessageId?: number;
throttleMs?: number;
/** Minimum chars before sending first message (debounce for push notifications) */
minInitialChars?: number;
log?: (message: string) => void;
warn?: (message: string) => void;
}): TelegramDraftStream {
@@ -30,6 +32,7 @@ export function createTelegramDraftStream(params: {
TELEGRAM_STREAM_MAX_CHARS,
);
const throttleMs = Math.max(250, params.throttleMs ?? DEFAULT_THROTTLE_MS);
const minInitialChars = params.minInitialChars;
const chatId = params.chatId;
const threadParams = buildTelegramThreadParams(params.thread);
const replyParams =
@@ -40,14 +43,16 @@ export function createTelegramDraftStream(params: {
let streamMessageId: number | undefined;
let lastSentText = "";
let stopped = false;
let isFinal = false;
const sendOrEditStreamMessage = async (text: string) => {
if (stopped) {
return;
const sendOrEditStreamMessage = async (text: string): Promise<boolean> => {
// Allow final flush even if stopped (e.g., after clear()).
if (stopped && !isFinal) {
return false;
}
const trimmed = text.trimEnd();
if (!trimmed) {
return;
return false;
}
if (trimmed.length > maxChars) {
// Telegram text messages/edits cap at 4096 chars.
@@ -56,40 +61,64 @@ export function createTelegramDraftStream(params: {
params.warn?.(
`telegram stream preview stopped (text length ${trimmed.length} > ${maxChars})`,
);
return;
return false;
}
if (trimmed === lastSentText) {
return;
return true;
}
// Debounce first preview send for better push notification quality.
if (typeof streamMessageId !== "number" && minInitialChars != null && !isFinal) {
if (trimmed.length < minInitialChars) {
return false;
}
}
lastSentText = trimmed;
try {
if (typeof streamMessageId === "number") {
await params.api.editMessageText(chatId, streamMessageId, trimmed);
return;
return true;
}
const sent = await params.api.sendMessage(chatId, trimmed, replyParams);
const sentMessageId = sent?.message_id;
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
stopped = true;
params.warn?.("telegram stream preview stopped (missing message id from sendMessage)");
return;
return false;
}
streamMessageId = Math.trunc(sentMessageId);
return true;
} catch (err) {
stopped = true;
params.warn?.(
`telegram stream preview failed: ${err instanceof Error ? err.message : String(err)}`,
);
return false;
}
};
const loop = createDraftStreamLoop({
throttleMs,
isStopped: () => stopped,
sendOrEditStreamMessage,
});
const update = (text: string) => {
if (stopped || isFinal) {
return;
}
loop.update(text);
};
const stop = async (): Promise<void> => {
isFinal = true;
await loop.flush();
};
const clear = async () => {
stop();
stopped = true;
loop.stop();
await loop.waitForInFlight();
const messageId = streamMessageId;
streamMessageId = undefined;
@@ -105,11 +134,6 @@ export function createTelegramDraftStream(params: {
}
};
const stop = () => {
stopped = true;
loop.stop();
};
const forceNewMessage = () => {
streamMessageId = undefined;
lastSentText = "";
@@ -119,7 +143,7 @@ export function createTelegramDraftStream(params: {
params.log?.(`telegram stream preview ready (maxChars=${maxChars}, throttleMs=${throttleMs})`);
return {
update: loop.update,
update,
flush: loop.flush,
messageId: () => streamMessageId,
clear,