diff --git a/CHANGELOG.md b/CHANGELOG.md index ceab2e1061f..13d9b910d01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -322,6 +322,7 @@ Docs: https://docs.openclaw.ai - Queue/followup dedupe across drain restarts: dedupe queued redelivery `message_id` values after queue recreation so busy-session followups no longer duplicate on replayed inbound events. Landed from contributor PR #33168 by @rylena. Thanks @rylena. - Telegram/preview-final edit idempotence: treat `message is not modified` errors during preview finalization as delivered so partial-stream final replies do not fall back to duplicate sends. Landed from contributor PR #34983 by @HOYALIM. Thanks @HOYALIM. - Telegram/DM streaming transport parity: use message preview transport for all DM streaming lanes so final delivery can edit the active preview instead of sending duplicate finals. Landed from contributor PR #38906 by @gambletan. Thanks @gambletan. +- Telegram/send retry safety: retry non-idempotent send paths only for pre-connect failures and make custom retry predicates strict, preventing ambiguous reconnect retries from sending duplicate messages. Landed from contributor PR #34238 by @hal-crackbot. Thanks @hal-crackbot. 
## 2026.3.2 diff --git a/src/infra/retry-policy.test.ts b/src/infra/retry-policy.test.ts new file mode 100644 index 00000000000..76a4415deee --- /dev/null +++ b/src/infra/retry-policy.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it, vi } from "vitest"; +import { createTelegramRetryRunner } from "./retry-policy.js"; + +describe("createTelegramRetryRunner", () => { + describe("strictShouldRetry", () => { + it("without strictShouldRetry: ECONNRESET is retried via regex fallback even when predicate returns false", async () => { + const fn = vi + .fn() + .mockRejectedValue(Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" })); + const runner = createTelegramRetryRunner({ + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + shouldRetry: () => false, // predicate says no + // strictShouldRetry not set — regex fallback still applies + }); + await expect(runner(fn, "test")).rejects.toThrow("ECONNRESET"); + // Regex matches "reset" so it retried despite shouldRetry returning false + expect(fn).toHaveBeenCalledTimes(2); + }); + + it("with strictShouldRetry=true: ECONNRESET is NOT retried when predicate returns false", async () => { + const fn = vi + .fn() + .mockRejectedValue(Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" })); + const runner = createTelegramRetryRunner({ + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + shouldRetry: () => false, + strictShouldRetry: true, // predicate is authoritative + }); + await expect(runner(fn, "test")).rejects.toThrow("ECONNRESET"); + // No retry — predicate returned false and regex fallback was suppressed + expect(fn).toHaveBeenCalledTimes(1); + }); + + it("with strictShouldRetry=true: ECONNREFUSED is still retried when predicate returns true", async () => { + const fn = vi + .fn() + .mockRejectedValueOnce(Object.assign(new Error("ECONNREFUSED"), { code: "ECONNREFUSED" })) + .mockResolvedValue("ok"); + const runner = createTelegramRetryRunner({ + retry: { 
attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + shouldRetry: (err) => (err as { code?: string }).code === "ECONNREFUSED", + strictShouldRetry: true, + }); + await expect(runner(fn, "test")).resolves.toBe("ok"); + expect(fn).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/src/infra/retry-policy.ts b/src/infra/retry-policy.ts index 78737241e0b..26ecb71e8a0 100644 --- a/src/infra/retry-policy.ts +++ b/src/infra/retry-policy.ts @@ -76,13 +76,23 @@ export function createTelegramRetryRunner(params: { configRetry?: RetryConfig; verbose?: boolean; shouldRetry?: (err: unknown) => boolean; + /** + * When true, the custom shouldRetry predicate is used exclusively — + * the default TELEGRAM_RETRY_RE fallback regex is NOT OR'd in. + * Use this for non-idempotent operations (e.g. sendMessage) where + * the regex fallback would cause duplicate message delivery. Has no effect when shouldRetry is omitted. + */ + strictShouldRetry?: boolean; }): RetryRunner { const retryConfig = resolveRetryConfig(TELEGRAM_RETRY_DEFAULTS, { ...params.configRetry, ...params.retry, }); const shouldRetry = params.shouldRetry - ? (err: unknown) => params.shouldRetry?.(err) || TELEGRAM_RETRY_RE.test(formatErrorMessage(err)) + ? params.strictShouldRetry + ? 
params.shouldRetry + : (err: unknown) => + params.shouldRetry?.(err) || TELEGRAM_RETRY_RE.test(formatErrorMessage(err)) : (err: unknown) => TELEGRAM_RETRY_RE.test(formatErrorMessage(err)); return <T>(fn: () => Promise<T>, label?: string) => diff --git a/src/telegram/network-errors.test.ts b/src/telegram/network-errors.test.ts index 4eff7b4da2e..d4572eda9c8 100644 --- a/src/telegram/network-errors.test.ts +++ b/src/telegram/network-errors.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from "vitest"; -import { isRecoverableTelegramNetworkError } from "./network-errors.js"; +import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js"; describe("isRecoverableTelegramNetworkError", () => { it("detects recoverable error codes", () => { @@ -106,3 +106,61 @@ describe("isRecoverableTelegramNetworkError", () => { }); }); }); + +describe("isSafeToRetrySendError", () => { + it("allows retry for ECONNREFUSED (pre-connect, message not sent)", () => { + const err = Object.assign(new Error("connect ECONNREFUSED"), { code: "ECONNREFUSED" }); + expect(isSafeToRetrySendError(err)).toBe(true); + }); + + it("allows retry for ENOTFOUND (DNS failure, message not sent)", () => { + const err = Object.assign(new Error("getaddrinfo ENOTFOUND"), { code: "ENOTFOUND" }); + expect(isSafeToRetrySendError(err)).toBe(true); + }); + + it("allows retry for EAI_AGAIN (transient DNS, message not sent)", () => { + const err = Object.assign(new Error("getaddrinfo EAI_AGAIN"), { code: "EAI_AGAIN" }); + expect(isSafeToRetrySendError(err)).toBe(true); + }); + + it("allows retry for ENETUNREACH (no route to host, message not sent)", () => { + const err = Object.assign(new Error("connect ENETUNREACH"), { code: "ENETUNREACH" }); + expect(isSafeToRetrySendError(err)).toBe(true); + }); + + it("allows retry for EHOSTUNREACH (host unreachable, message not sent)", () => { + const err = Object.assign(new Error("connect EHOSTUNREACH"), { code: "EHOSTUNREACH" }); + 
expect(isSafeToRetrySendError(err)).toBe(true); + }); + + it("does NOT allow retry for ECONNRESET (message may already be delivered)", () => { + const err = Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" }); + expect(isSafeToRetrySendError(err)).toBe(false); + }); + + it("does NOT allow retry for ETIMEDOUT (message may already be delivered)", () => { + const err = Object.assign(new Error("connect ETIMEDOUT"), { code: "ETIMEDOUT" }); + expect(isSafeToRetrySendError(err)).toBe(false); + }); + + it("does NOT allow retry for EPIPE (connection broken mid-transfer, message may be delivered)", () => { + const err = Object.assign(new Error("write EPIPE"), { code: "EPIPE" }); + expect(isSafeToRetrySendError(err)).toBe(false); + }); + + it("does NOT allow retry for UND_ERR_CONNECT_TIMEOUT (ambiguous timing)", () => { + const err = Object.assign(new Error("connect timeout"), { code: "UND_ERR_CONNECT_TIMEOUT" }); + expect(isSafeToRetrySendError(err)).toBe(false); + }); + + it("does NOT allow retry for non-network errors", () => { + expect(isSafeToRetrySendError(new Error("400: Bad Request"))).toBe(false); + expect(isSafeToRetrySendError(null)).toBe(false); + }); + + it("detects pre-connect error nested in cause chain", () => { + const root = Object.assign(new Error("ECONNREFUSED"), { code: "ECONNREFUSED" }); + const wrapped = Object.assign(new Error("fetch failed"), { cause: root }); + expect(isSafeToRetrySendError(wrapped)).toBe(true); + }); +}); diff --git a/src/telegram/network-errors.ts b/src/telegram/network-errors.ts index b670bc48212..9438eddd7c7 100644 --- a/src/telegram/network-errors.ts +++ b/src/telegram/network-errors.ts @@ -24,6 +24,24 @@ const RECOVERABLE_ERROR_CODES = new Set([ "ERR_NETWORK", ]); +/** + * Error codes that are safe to retry for non-idempotent send operations (e.g. sendMessage). 
+ * + * These represent failures that occur *before* the request reaches Telegram's servers, + * meaning the message was definitely not delivered and it is safe to retry. + * + * Contrast with RECOVERABLE_ERROR_CODES which includes codes like ECONNRESET and ETIMEDOUT + * that can fire *after* Telegram has already received and delivered a message — retrying + * those would cause duplicate messages. + */ +const PRE_CONNECT_ERROR_CODES = new Set([ + "ECONNREFUSED", // Server actively refused the connection (never reached Telegram) + "ENOTFOUND", // DNS resolution failed (never sent) + "EAI_AGAIN", // Transient DNS failure (never sent) + "ENETUNREACH", // No route to host (never sent) + "EHOSTUNREACH", // Host unreachable (never sent) +]); + const RECOVERABLE_ERROR_NAMES = new Set([ "AbortError", "TimeoutError", @@ -71,6 +89,36 @@ function getErrorCode(err: unknown): string | undefined { export type TelegramNetworkErrorContext = "polling" | "send" | "webhook" | "unknown"; +/** + * Returns true if the error is safe to retry for a non-idempotent Telegram send operation + * (e.g. sendMessage). Only matches errors that are guaranteed to have occurred *before* + * the request reached Telegram's servers, preventing duplicate message delivery. + * + * Use this instead of isRecoverableTelegramNetworkError for sendMessage/sendPhoto/etc. + * calls where a retry would create a duplicate visible message. 
+ */ +export function isSafeToRetrySendError(err: unknown): boolean { + if (!err) { + return false; + } + for (const candidate of collectErrorGraphCandidates(err, (current) => { + const nested: Array<unknown> = [current.cause, current.reason]; + if (Array.isArray(current.errors)) { + nested.push(...current.errors); + } + if (readErrorName(current) === "HttpError") { + nested.push(current.error); + } + return nested; + })) { + const code = normalizeCode(getErrorCode(candidate)); + if (code && PRE_CONNECT_ERROR_CODES.has(code)) { + return true; + } + } + return false; +} + export function isRecoverableTelegramNetworkError( err: unknown, options: { context?: TelegramNetworkErrorContext; allowMessageMatch?: boolean } = {}, diff --git a/src/telegram/send.ts b/src/telegram/send.ts index 88b9df19dd7..83d550b7498 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -27,7 +27,7 @@ import type { TelegramInlineButtons } from "./button-types.js"; import { splitTelegramCaption } from "./caption.js"; import { resolveTelegramFetch } from "./fetch.js"; import { renderTelegramHtmlText } from "./format.js"; -import { isRecoverableTelegramNetworkError } from "./network-errors.js"; +import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js"; import { makeProxyFetch } from "./proxy.js"; import { recordSentMessage } from "./sent-message-cache.js"; import { maybePersistResolvedTelegramTarget } from "./target-writeback.js"; @@ -349,6 +349,8 @@ function createTelegramRequestWithDiag(params: { retry?: RetryConfig; verbose?: boolean; shouldRetry?: (err: unknown) => boolean; + /** When true, the shouldRetry predicate is used exclusively without the TELEGRAM_RETRY_RE fallback. 
*/ + strictShouldRetry?: boolean; useApiErrorLogging?: boolean; }): TelegramRequestWithDiag { const request = createTelegramRetryRunner({ @@ -356,6 +358,7 @@ function createTelegramRequestWithDiag(params: { configRetry: params.account.config.retry, verbose: params.verbose, ...(params.shouldRetry ? { shouldRetry: params.shouldRetry } : {}), + ...(params.strictShouldRetry ? { strictShouldRetry: true } : {}), }); const logHttpError = createTelegramHttpLogger(params.cfg); return ( @@ -491,7 +494,8 @@ export async function sendMessageTelegram( account, retry: opts.retry, verbose: opts.verbose, - shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), + shouldRetry: (err) => isSafeToRetrySendError(err), + strictShouldRetry: true, }); const requestWithChatNotFound = createRequestWithChatNotFound({ requestWithDiag, @@ -1098,7 +1102,8 @@ export async function sendPollTelegram( account, retry: opts.retry, verbose: opts.verbose, - shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), + shouldRetry: (err) => isSafeToRetrySendError(err), + strictShouldRetry: true, }); const requestWithChatNotFound = createRequestWithChatNotFound({ requestWithDiag, @@ -1217,7 +1222,8 @@ export async function createForumTopicTelegram( retry: opts.retry, configRetry: account.config.retry, verbose: opts.verbose, - shouldRetry: (err) => isRecoverableTelegramNetworkError(err, { context: "send" }), + shouldRetry: (err) => isSafeToRetrySendError(err), + strictShouldRetry: true, }); const logHttpError = createTelegramHttpLogger(cfg); const requestWithDiag = <T>(fn: () => Promise<T>, label?: string) =>