mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix(telegram): land #34238 from @hal-crackbot
Landed from contributor PR #34238 by @hal-crackbot. Co-authored-by: Hal Crackbot <hal@crackbot.dev>
This commit is contained in:
@@ -322,6 +322,7 @@ Docs: https://docs.openclaw.ai
|
||||
- Queue/followup dedupe across drain restarts: dedupe queued redelivery `message_id` values after queue recreation so busy-session followups no longer duplicate on replayed inbound events. Landed from contributor PR #33168 by @rylena. Thanks @rylena.
|
||||
- Telegram/preview-final edit idempotence: treat `message is not modified` errors during preview finalization as delivered so partial-stream final replies do not fall back to duplicate sends. Landed from contributor PR #34983 by @HOYALIM. Thanks @HOYALIM.
|
||||
- Telegram/DM streaming transport parity: use message preview transport for all DM streaming lanes so final delivery can edit the active preview instead of sending duplicate finals. Landed from contributor PR #38906 by @gambletan. Thanks @gambletan.
|
||||
- Telegram/send retry safety: retry non-idempotent send paths only for pre-connect failures and make custom retry predicates strict, preventing ambiguous reconnect retries from sending duplicate messages. Landed from contributor PR #34238 by @hal-crackbot. Thanks @hal-crackbot.
|
||||
|
||||
## 2026.3.2
|
||||
|
||||
|
||||
48
src/infra/retry-policy.test.ts
Normal file
48
src/infra/retry-policy.test.ts
Normal file
@@ -0,0 +1,48 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
import { createTelegramRetryRunner } from "./retry-policy.js";
|
||||
|
||||
describe("createTelegramRetryRunner", () => {
|
||||
describe("strictShouldRetry", () => {
|
||||
it("without strictShouldRetry: ECONNRESET is retried via regex fallback even when predicate returns false", async () => {
|
||||
const fn = vi
|
||||
.fn()
|
||||
.mockRejectedValue(Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" }));
|
||||
const runner = createTelegramRetryRunner({
|
||||
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
|
||||
shouldRetry: () => false, // predicate says no
|
||||
// strictShouldRetry not set — regex fallback still applies
|
||||
});
|
||||
await expect(runner(fn, "test")).rejects.toThrow("ECONNRESET");
|
||||
// Regex matches "reset" so it retried despite shouldRetry returning false
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("with strictShouldRetry=true: ECONNRESET is NOT retried when predicate returns false", async () => {
|
||||
const fn = vi
|
||||
.fn()
|
||||
.mockRejectedValue(Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" }));
|
||||
const runner = createTelegramRetryRunner({
|
||||
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
|
||||
shouldRetry: () => false,
|
||||
strictShouldRetry: true, // predicate is authoritative
|
||||
});
|
||||
await expect(runner(fn, "test")).rejects.toThrow("ECONNRESET");
|
||||
// No retry — predicate returned false and regex fallback was suppressed
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("with strictShouldRetry=true: ECONNREFUSED is still retried when predicate returns true", async () => {
|
||||
const fn = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(Object.assign(new Error("ECONNREFUSED"), { code: "ECONNREFUSED" }))
|
||||
.mockResolvedValue("ok");
|
||||
const runner = createTelegramRetryRunner({
|
||||
retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 },
|
||||
shouldRetry: (err) => (err as { code?: string }).code === "ECONNREFUSED",
|
||||
strictShouldRetry: true,
|
||||
});
|
||||
await expect(runner(fn, "test")).resolves.toBe("ok");
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -76,13 +76,23 @@ export function createTelegramRetryRunner(params: {
|
||||
configRetry?: RetryConfig;
|
||||
verbose?: boolean;
|
||||
shouldRetry?: (err: unknown) => boolean;
|
||||
/**
|
||||
* When true, the custom shouldRetry predicate is used exclusively —
|
||||
* the default TELEGRAM_RETRY_RE fallback regex is NOT OR'd in.
|
||||
* Use this for non-idempotent operations (e.g. sendMessage) where
|
||||
* the regex fallback would cause duplicate message delivery.
|
||||
*/
|
||||
strictShouldRetry?: boolean;
|
||||
}): RetryRunner {
|
||||
const retryConfig = resolveRetryConfig(TELEGRAM_RETRY_DEFAULTS, {
|
||||
...params.configRetry,
|
||||
...params.retry,
|
||||
});
|
||||
const shouldRetry = params.shouldRetry
|
||||
? (err: unknown) => params.shouldRetry?.(err) || TELEGRAM_RETRY_RE.test(formatErrorMessage(err))
|
||||
? params.strictShouldRetry
|
||||
? params.shouldRetry
|
||||
: (err: unknown) =>
|
||||
params.shouldRetry?.(err) || TELEGRAM_RETRY_RE.test(formatErrorMessage(err))
|
||||
: (err: unknown) => TELEGRAM_RETRY_RE.test(formatErrorMessage(err));
|
||||
|
||||
return <T>(fn: () => Promise<T>, label?: string) =>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
||||
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
|
||||
|
||||
describe("isRecoverableTelegramNetworkError", () => {
|
||||
it("detects recoverable error codes", () => {
|
||||
@@ -106,3 +106,61 @@ describe("isRecoverableTelegramNetworkError", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("isSafeToRetrySendError", () => {
|
||||
it("allows retry for ECONNREFUSED (pre-connect, message not sent)", () => {
|
||||
const err = Object.assign(new Error("connect ECONNREFUSED"), { code: "ECONNREFUSED" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("allows retry for ENOTFOUND (DNS failure, message not sent)", () => {
|
||||
const err = Object.assign(new Error("getaddrinfo ENOTFOUND"), { code: "ENOTFOUND" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("allows retry for EAI_AGAIN (transient DNS, message not sent)", () => {
|
||||
const err = Object.assign(new Error("getaddrinfo EAI_AGAIN"), { code: "EAI_AGAIN" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("allows retry for ENETUNREACH (no route to host, message not sent)", () => {
|
||||
const err = Object.assign(new Error("connect ENETUNREACH"), { code: "ENETUNREACH" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("allows retry for EHOSTUNREACH (host unreachable, message not sent)", () => {
|
||||
const err = Object.assign(new Error("connect EHOSTUNREACH"), { code: "EHOSTUNREACH" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("does NOT allow retry for ECONNRESET (message may already be delivered)", () => {
|
||||
const err = Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(false);
|
||||
});
|
||||
|
||||
it("does NOT allow retry for ETIMEDOUT (message may already be delivered)", () => {
|
||||
const err = Object.assign(new Error("connect ETIMEDOUT"), { code: "ETIMEDOUT" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(false);
|
||||
});
|
||||
|
||||
it("does NOT allow retry for EPIPE (connection broken mid-transfer, message may be delivered)", () => {
|
||||
const err = Object.assign(new Error("write EPIPE"), { code: "EPIPE" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(false);
|
||||
});
|
||||
|
||||
it("does NOT allow retry for UND_ERR_CONNECT_TIMEOUT (ambiguous timing)", () => {
|
||||
const err = Object.assign(new Error("connect timeout"), { code: "UND_ERR_CONNECT_TIMEOUT" });
|
||||
expect(isSafeToRetrySendError(err)).toBe(false);
|
||||
});
|
||||
|
||||
it("does NOT allow retry for non-network errors", () => {
|
||||
expect(isSafeToRetrySendError(new Error("400: Bad Request"))).toBe(false);
|
||||
expect(isSafeToRetrySendError(null)).toBe(false);
|
||||
});
|
||||
|
||||
it("detects pre-connect error nested in cause chain", () => {
|
||||
const root = Object.assign(new Error("ECONNREFUSED"), { code: "ECONNREFUSED" });
|
||||
const wrapped = Object.assign(new Error("fetch failed"), { cause: root });
|
||||
expect(isSafeToRetrySendError(wrapped)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -24,6 +24,24 @@ const RECOVERABLE_ERROR_CODES = new Set([
|
||||
"ERR_NETWORK",
|
||||
]);
|
||||
|
||||
/**
|
||||
* Error codes that are safe to retry for non-idempotent send operations (e.g. sendMessage).
|
||||
*
|
||||
* These represent failures that occur *before* the request reaches Telegram's servers,
|
||||
* meaning the message was definitely not delivered and it is safe to retry.
|
||||
*
|
||||
* Contrast with RECOVERABLE_ERROR_CODES which includes codes like ECONNRESET and ETIMEDOUT
|
||||
* that can fire *after* Telegram has already received and delivered a message — retrying
|
||||
* those would cause duplicate messages.
|
||||
*/
|
||||
const PRE_CONNECT_ERROR_CODES = new Set([
|
||||
"ECONNREFUSED", // Server actively refused the connection (never reached Telegram)
|
||||
"ENOTFOUND", // DNS resolution failed (never sent)
|
||||
"EAI_AGAIN", // Transient DNS failure (never sent)
|
||||
"ENETUNREACH", // No route to host (never sent)
|
||||
"EHOSTUNREACH", // Host unreachable (never sent)
|
||||
]);
|
||||
|
||||
const RECOVERABLE_ERROR_NAMES = new Set([
|
||||
"AbortError",
|
||||
"TimeoutError",
|
||||
@@ -71,6 +89,36 @@ function getErrorCode(err: unknown): string | undefined {
|
||||
|
||||
export type TelegramNetworkErrorContext = "polling" | "send" | "webhook" | "unknown";
|
||||
|
||||
/**
|
||||
* Returns true if the error is safe to retry for a non-idempotent Telegram send operation
|
||||
* (e.g. sendMessage). Only matches errors that are guaranteed to have occurred *before*
|
||||
* the request reached Telegram's servers, preventing duplicate message delivery.
|
||||
*
|
||||
* Use this instead of isRecoverableTelegramNetworkError for sendMessage/sendPhoto/etc.
|
||||
* calls where a retry would create a duplicate visible message.
|
||||
*/
|
||||
export function isSafeToRetrySendError(err: unknown): boolean {
|
||||
if (!err) {
|
||||
return false;
|
||||
}
|
||||
for (const candidate of collectErrorGraphCandidates(err, (current) => {
|
||||
const nested: Array<unknown> = [current.cause, current.reason];
|
||||
if (Array.isArray(current.errors)) {
|
||||
nested.push(...current.errors);
|
||||
}
|
||||
if (readErrorName(current) === "HttpError") {
|
||||
nested.push(current.error);
|
||||
}
|
||||
return nested;
|
||||
})) {
|
||||
const code = normalizeCode(getErrorCode(candidate));
|
||||
if (code && PRE_CONNECT_ERROR_CODES.has(code)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function isRecoverableTelegramNetworkError(
|
||||
err: unknown,
|
||||
options: { context?: TelegramNetworkErrorContext; allowMessageMatch?: boolean } = {},
|
||||
|
||||
@@ -27,7 +27,7 @@ import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { splitTelegramCaption } from "./caption.js";
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
import { renderTelegramHtmlText } from "./format.js";
|
||||
import { isRecoverableTelegramNetworkError } from "./network-errors.js";
|
||||
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
|
||||
import { makeProxyFetch } from "./proxy.js";
|
||||
import { recordSentMessage } from "./sent-message-cache.js";
|
||||
import { maybePersistResolvedTelegramTarget } from "./target-writeback.js";
|
||||
@@ -349,6 +349,8 @@ function createTelegramRequestWithDiag(params: {
|
||||
retry?: RetryConfig;
|
||||
verbose?: boolean;
|
||||
shouldRetry?: (err: unknown) => boolean;
|
||||
/** When true, the shouldRetry predicate is used exclusively without the TELEGRAM_RETRY_RE fallback. */
|
||||
strictShouldRetry?: boolean;
|
||||
useApiErrorLogging?: boolean;
|
||||
}): TelegramRequestWithDiag {
|
||||
const request = createTelegramRetryRunner({
|
||||
@@ -356,6 +358,7 @@ function createTelegramRequestWithDiag(params: {
|
||||
configRetry: params.account.config.retry,
|
||||
verbose: params.verbose,
|
||||
...(params.shouldRetry ? { shouldRetry: params.shouldRetry } : {}),
|
||||
...(params.strictShouldRetry ? { strictShouldRetry: true } : {}),
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(params.cfg);
|
||||
return <T>(
|
||||
@@ -491,7 +494,8 @@ export async function sendMessageTelegram(
|
||||
account,
|
||||
retry: opts.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
shouldRetry: (err) => isSafeToRetrySendError(err),
|
||||
strictShouldRetry: true,
|
||||
});
|
||||
const requestWithChatNotFound = createRequestWithChatNotFound({
|
||||
requestWithDiag,
|
||||
@@ -1098,7 +1102,8 @@ export async function sendPollTelegram(
|
||||
account,
|
||||
retry: opts.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
shouldRetry: (err) => isSafeToRetrySendError(err),
|
||||
strictShouldRetry: true,
|
||||
});
|
||||
const requestWithChatNotFound = createRequestWithChatNotFound({
|
||||
requestWithDiag,
|
||||
@@ -1217,7 +1222,8 @@ export async function createForumTopicTelegram(
|
||||
retry: opts.retry,
|
||||
configRetry: account.config.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }),
|
||||
shouldRetry: (err) => isSafeToRetrySendError(err),
|
||||
strictShouldRetry: true,
|
||||
});
|
||||
const logHttpError = createTelegramHttpLogger(cfg);
|
||||
const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>
|
||||
|
||||
Reference in New Issue
Block a user