fix(telegram): prevent duplicate messages with slow LLM providers (#41932)

Merged via squash.

Prepared head SHA: 2f50c51d5a
Co-authored-by: hougangdev <105773686+hougangdev@users.noreply.github.com>
Co-authored-by: obviyus <22031114+obviyus@users.noreply.github.com>
Reviewed-by: @obviyus
This commit is contained in:
Wayne
2026-03-11 13:49:55 +08:00
committed by GitHub
parent 7761e7626f
commit e37e1ed24e
10 changed files with 299 additions and 20 deletions

View File

@@ -13,6 +13,7 @@ export type TestDraftStream = {
stop: ReturnType<typeof vi.fn<() => Promise<void>>>;
materialize: ReturnType<typeof vi.fn<() => Promise<number | undefined>>>;
forceNewMessage: ReturnType<typeof vi.fn<() => void>>;
sendMayHaveLanded: ReturnType<typeof vi.fn<() => boolean>>;
setMessageId: (value: number | undefined) => void;
};
@@ -47,6 +48,7 @@ export function createTestDraftStream(params?: {
messageId = undefined;
}
}),
sendMayHaveLanded: vi.fn().mockReturnValue(false),
setMessageId: (value: number | undefined) => {
messageId = value;
},
@@ -77,6 +79,7 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft
forceNewMessage: vi.fn().mockImplementation(() => {
activeMessageId = undefined;
}),
sendMayHaveLanded: vi.fn().mockReturnValue(false),
setMessageId: (value: number | undefined) => {
activeMessageId = value;
},

View File

@@ -435,6 +435,46 @@ describe("createTelegramDraftStream", () => {
expect(api.editMessageText).not.toHaveBeenCalledWith(123, 17, "Message B partial");
});
it("marks sendMayHaveLanded after an ambiguous first preview send failure", async () => {
const api = createMockDraftApi();
api.sendMessage.mockRejectedValueOnce(new Error("timeout after Telegram accepted send"));
const stream = createDraftStream(api);
stream.update("Hello");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(1);
expect(stream.sendMayHaveLanded?.()).toBe(true);
});
it("clears sendMayHaveLanded on pre-connect first preview send failures", async () => {
const api = createMockDraftApi();
api.sendMessage.mockRejectedValueOnce(
Object.assign(new Error("connect ECONNREFUSED"), { code: "ECONNREFUSED" }),
);
const stream = createDraftStream(api);
stream.update("Hello");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(1);
expect(stream.sendMayHaveLanded?.()).toBe(false);
});
it("clears sendMayHaveLanded on Telegram 4xx client rejections", async () => {
const api = createMockDraftApi();
api.sendMessage.mockRejectedValueOnce(
Object.assign(new Error("403: Forbidden"), { error_code: 403 }),
);
const stream = createDraftStream(api);
stream.update("Hello");
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(1);
expect(stream.sendMayHaveLanded?.()).toBe(false);
});
it("supports rendered previews with parse_mode", async () => {
const api = createMockDraftApi();
const stream = createTelegramDraftStream({

View File

@@ -1,6 +1,7 @@
import type { Bot } from "grammy";
import { createFinalizableDraftLifecycle } from "../channels/draft-stream-controls.js";
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js";
const TELEGRAM_STREAM_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 1000;
@@ -66,6 +67,8 @@ export type TelegramDraftStream = {
materialize?: () => Promise<number | undefined>;
/** Reset internal state so the next update creates a new message instead of editing. */
forceNewMessage: () => void;
/** True when a preview sendMessage was attempted but the response was lost. */
sendMayHaveLanded?: () => boolean;
};
type TelegramDraftPreview = {
@@ -126,6 +129,7 @@ export function createTelegramDraftStream(params: {
}
const streamState = { stopped: false, final: false };
let messageSendAttempted = false;
let streamMessageId: number | undefined;
let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined;
let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message";
@@ -192,12 +196,24 @@ export function createTelegramDraftStream(params: {
}
return true;
}
const { sent } = await sendRenderedMessageWithThreadFallback({
renderedText,
renderedParseMode,
fallbackWarnMessage:
"telegram stream preview send failed with message_thread_id, retrying without thread",
});
messageSendAttempted = true;
let sent: Awaited<ReturnType<typeof sendRenderedMessageWithThreadFallback>>["sent"];
try {
({ sent } = await sendRenderedMessageWithThreadFallback({
renderedText,
renderedParseMode,
fallbackWarnMessage:
"telegram stream preview send failed with message_thread_id, retrying without thread",
}));
} catch (err) {
// Pre-connect failures (DNS, refused) and explicit Telegram rejections (4xx)
// guarantee the message was never delivered — clear the flag so
// sendMayHaveLanded() doesn't suppress fallback.
if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) {
messageSendAttempted = false;
}
throw err;
}
const sentMessageId = sent?.message_id;
if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) {
streamState.stopped = true;
@@ -345,6 +361,7 @@ export function createTelegramDraftStream(params: {
// Re-open the stream lifecycle for the next assistant segment.
streamState.final = false;
generation += 1;
messageSendAttempted = false;
streamMessageId = undefined;
if (previewTransport === "draft") {
streamDraftId = allocateTelegramDraftId();
@@ -421,5 +438,6 @@ export function createTelegramDraftStream(params: {
stop,
materialize,
forceNewMessage,
sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number",
};
}

View File

@@ -1,7 +1,11 @@
import type { ReplyPayload } from "../auto-reply/types.js";
import type { TelegramInlineButtons } from "./button-types.js";
import type { TelegramDraftStream } from "./draft-stream.js";
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
import {
isRecoverableTelegramNetworkError,
isSafeToRetrySendError,
isTelegramClientRejection,
} from "./network-errors.js";
const MESSAGE_NOT_MODIFIED_RE =
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
@@ -270,10 +274,18 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
params.markDelivered();
return "retained";
}
if (isTelegramClientRejection(err)) {
params.log(
`telegram: ${args.laneName} preview final edit rejected by Telegram (client error); falling back to standard send (${String(err)})`,
);
return "fallback";
}
// Default: ambiguous error — prefer incomplete over duplicate
params.log(
`telegram: ${args.laneName} preview final edit rejected by Telegram; falling back to standard send (${String(err)})`,
`telegram: ${args.laneName} preview final edit failed with ambiguous error; keeping existing preview to avoid duplicate (${String(err)})`,
);
return "fallback";
params.markDelivered();
return "retained";
}
params.log(
`telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`,
@@ -353,6 +365,13 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
context,
});
if (typeof previewTargetAfterStop.previewMessageId !== "number") {
if (lane.stream?.sendMayHaveLanded?.()) {
params.log(
`telegram: ${laneName} first preview send may have landed despite missing message id; keeping to avoid duplicate`,
);
params.markDelivered();
return "retained";
}
return "fallback";
}
return finalizePreview(previewTargetAfterStop.previewMessageId, true, false);
@@ -367,6 +386,17 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
context,
});
if (typeof previewTargetAfterStop.previewMessageId !== "number") {
// Only retain for final delivery when a prior preview is already visible
// to the user — otherwise falling back is safer than silence. For updates,
// always fall back so the caller can attempt sendPayload without stale
// markDelivered() state.
if (context === "final" && lane.hasStreamedMessage && lane.stream?.sendMayHaveLanded?.()) {
params.log(
`telegram: ${laneName} preview send may have landed despite missing message id; keeping to avoid duplicate`,
);
params.markDelivered();
return "retained";
}
return "fallback";
}
const activePreviewMessageId = lane.stream?.messageId();

View File

@@ -174,8 +174,9 @@ describe("createLaneTextDeliverer", () => {
);
});
it("falls back to sendPayload when an existing preview final edit is rejected", async () => {
it("retains preview when an existing preview final edit fails with ambiguous error", async () => {
const harness = createHarness({ answerMessageId: 999 });
// Plain Error with no error_code → ambiguous, prefer incomplete over duplicate
harness.editPreview.mockRejectedValue(new Error("500: preview edit failed"));
const result = await harness.deliverLaneText({
@@ -185,13 +186,11 @@ describe("createLaneTextDeliverer", () => {
infoKind: "final",
});
expect(result).toBe("sent");
expect(result).toBe("preview-retained");
expect(harness.editPreview).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Hello final" }),
);
expect(harness.sendPayload).not.toHaveBeenCalled();
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("edit rejected by Telegram; falling back"),
expect.stringContaining("ambiguous error; keeping existing preview to avoid duplicate"),
);
});
@@ -433,8 +432,9 @@ describe("createLaneTextDeliverer", () => {
// During final delivery, only ambiguous post-connect failures keep the
// preview. Definite non-delivery falls back to a real send.
it("falls back on API error during final", async () => {
it("retains preview on ambiguous API error during final", async () => {
const harness = createHarness({ answerMessageId: 999 });
// Plain Error with no error_code → ambiguous, prefer incomplete over duplicate
harness.editPreview.mockRejectedValue(new Error("500: Internal Server Error"));
const result = await harness.deliverLaneText({
@@ -444,9 +444,9 @@ describe("createLaneTextDeliverer", () => {
infoKind: "final",
});
expect(result).toBe("sent");
expect(result).toBe("preview-retained");
expect(harness.editPreview).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).not.toHaveBeenCalled();
});
it("falls back when an archived preview edit target is missing and no alternate preview exists", async () => {
@@ -497,6 +497,94 @@ describe("createLaneTextDeliverer", () => {
);
});
it("falls back on 4xx client rejection with error_code during final", async () => {
const harness = createHarness({ answerMessageId: 999 });
const err = Object.assign(new Error("403: Forbidden"), { error_code: 403 });
harness.editPreview.mockRejectedValue(err);
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Hello final",
payload: { text: "Hello final" },
infoKind: "final",
});
expect(result).toBe("sent");
expect(harness.editPreview).toHaveBeenCalledTimes(1);
expect(harness.sendPayload).toHaveBeenCalledWith(
expect.objectContaining({ text: "Hello final" }),
);
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("rejected by Telegram (client error); falling back"),
);
});
it("retains preview on 502 with error_code during final (ambiguous server error)", async () => {
const harness = createHarness({ answerMessageId: 999 });
const err = Object.assign(new Error("502: Bad Gateway"), { error_code: 502 });
harness.editPreview.mockRejectedValue(err);
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Hello final",
payload: { text: "Hello final" },
infoKind: "final",
});
expect(result).toBe("preview-retained");
expect(harness.sendPayload).not.toHaveBeenCalled();
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("ambiguous error; keeping existing preview to avoid duplicate"),
);
});
it("retains when the first preview send may have landed without a message id", async () => {
const stream = createTestDraftStream();
stream.sendMayHaveLanded.mockReturnValue(true);
const harness = createHarness({ answerStream: stream });
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Hello final",
payload: { text: "Hello final" },
infoKind: "final",
});
expect(result).toBe("preview-retained");
expect(harness.sendPayload).not.toHaveBeenCalled();
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("first preview send may have landed despite missing message id"),
);
});
it("retains when sendMayHaveLanded is true and a prior preview was visible", async () => {
// Stream has a messageId (visible preview) but loses it after stop
const stream = createTestDraftStream({ messageId: 999 });
stream.sendMayHaveLanded.mockReturnValue(true);
const harness = createHarness({
answerStream: stream,
answerHasStreamedMessage: true,
});
// Simulate messageId lost after stop (e.g. forceNewMessage or timeout)
harness.stopDraftLane.mockImplementation(async (lane: DraftLaneState) => {
stream.setMessageId(undefined);
await lane.stream?.stop();
});
const result = await harness.deliverLaneText({
laneName: "answer",
text: "Hello final",
payload: { text: "Hello final" },
infoKind: "final",
});
expect(result).toBe("preview-retained");
expect(harness.sendPayload).not.toHaveBeenCalled();
expect(harness.log).toHaveBeenCalledWith(
expect.stringContaining("preview send may have landed despite missing message id"),
);
});
it("deletes consumed boundary previews after fallback final send", async () => {
const harness = createHarness();
harness.archivedAnswerPreviews.push({

View File

@@ -1,5 +1,10 @@
import { describe, expect, it } from "vitest";
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
import {
isRecoverableTelegramNetworkError,
isSafeToRetrySendError,
isTelegramClientRejection,
isTelegramServerError,
} from "./network-errors.js";
describe("isRecoverableTelegramNetworkError", () => {
it("detects recoverable error codes", () => {
@@ -164,3 +169,51 @@ describe("isSafeToRetrySendError", () => {
expect(isSafeToRetrySendError(wrapped)).toBe(true);
});
});
describe("isTelegramServerError", () => {
it("returns true for error_code 500", () => {
const err = Object.assign(new Error("Internal Server Error"), { error_code: 500 });
expect(isTelegramServerError(err)).toBe(true);
});
it("returns true for error_code 502", () => {
const err = Object.assign(new Error("Bad Gateway"), { error_code: 502 });
expect(isTelegramServerError(err)).toBe(true);
});
it("returns false for error_code 403", () => {
const err = Object.assign(new Error("Forbidden"), { error_code: 403 });
expect(isTelegramServerError(err)).toBe(false);
});
it("returns false for plain Error", () => {
expect(isTelegramServerError(new Error("500: Internal Server Error"))).toBe(false);
});
});
describe("isTelegramClientRejection", () => {
it("returns true for error_code 400", () => {
const err = Object.assign(new Error("Bad Request"), { error_code: 400 });
expect(isTelegramClientRejection(err)).toBe(true);
});
it("returns true for error_code 403", () => {
const err = Object.assign(new Error("Forbidden"), { error_code: 403 });
expect(isTelegramClientRejection(err)).toBe(true);
});
it("returns false for error_code 502", () => {
const err = Object.assign(new Error("Bad Gateway"), { error_code: 502 });
expect(isTelegramClientRejection(err)).toBe(false);
});
it("returns false for plain Error", () => {
expect(isTelegramClientRejection(new Error("400: Bad Request"))).toBe(false);
});
it("detects error_code in nested cause", () => {
const inner = Object.assign(new Error("Forbidden"), { error_code: 403 });
const outer = Object.assign(new Error("wrapped"), { cause: inner });
expect(isTelegramClientRejection(outer)).toBe(true);
});
});

View File

@@ -123,6 +123,29 @@ export function isSafeToRetrySendError(err: unknown): boolean {
return false;
}
function hasTelegramErrorCode(err: unknown, matches: (code: number) => boolean): boolean {
for (const candidate of collectTelegramErrorCandidates(err)) {
if (!candidate || typeof candidate !== "object" || !("error_code" in candidate)) {
continue;
}
const code = (candidate as { error_code: unknown }).error_code;
if (typeof code === "number" && matches(code)) {
return true;
}
}
return false;
}
/** Returns true for HTTP 5xx server errors (error may have been processed). */
export function isTelegramServerError(err: unknown): boolean {
return hasTelegramErrorCode(err, (code) => code >= 500);
}
/** Returns true for HTTP 4xx client errors (Telegram explicitly rejected, not applied). */
export function isTelegramClientRejection(err: unknown): boolean {
return hasTelegramErrorCode(err, (code) => code >= 400 && code < 500);
}
export function isRecoverableTelegramNetworkError(
err: unknown,
options: { context?: TelegramNetworkErrorContext; allowMessageMatch?: boolean } = {},

View File

@@ -1734,6 +1734,22 @@ describe("editMessageTelegram", () => {
expect(botApi.editMessageText).toHaveBeenCalledTimes(1);
});
it("retries editMessageTelegram on Telegram 5xx errors", async () => {
botApi.editMessageText
.mockRejectedValueOnce(Object.assign(new Error("502: Bad Gateway"), { error_code: 502 }))
.mockResolvedValueOnce({ message_id: 1, chat: { id: "123" } });
await expect(
editMessageTelegram("123", 1, "hi", {
token: "tok",
cfg: {},
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
}),
).resolves.toEqual({ ok: true, messageId: "1", chatId: "123" });
expect(botApi.editMessageText).toHaveBeenCalledTimes(2);
});
it("disables link previews when linkPreview is false", async () => {
botApi.editMessageText.mockResolvedValue({ message_id: 1, chat: { id: "123" } });

View File

@@ -27,7 +27,11 @@ import type { TelegramInlineButtons } from "./button-types.js";
import { splitTelegramCaption } from "./caption.js";
import { resolveTelegramFetch } from "./fetch.js";
import { renderTelegramHtmlText, splitTelegramHtmlChunks } from "./format.js";
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
import {
isRecoverableTelegramNetworkError,
isSafeToRetrySendError,
isTelegramServerError,
} from "./network-errors.js";
import { makeProxyFetch } from "./proxy.js";
import { recordSentMessage } from "./sent-message-cache.js";
import { maybePersistResolvedTelegramTarget } from "./target-writeback.js";
@@ -1150,6 +1154,9 @@ export async function editMessageTelegram(
account,
retry: opts.retry,
verbose: opts.verbose,
shouldRetry: (err) =>
isRecoverableTelegramNetworkError(err, { allowMessageMatch: true }) ||
isTelegramServerError(err),
});
const requestWithEditShouldLog = <T>(
fn: () => Promise<T>,