From ff04e24eadefe5ef417d57d19250fe03e8f9cec8 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 10 Jun 2026 23:57:16 +0530 Subject: [PATCH] fix(telegram): retry transient draft preview failures instead of killing the stream --- extensions/telegram/src/draft-stream.test.ts | 112 ++++++++++++++++++- extensions/telegram/src/draft-stream.ts | 57 +++++++++- 2 files changed, 165 insertions(+), 4 deletions(-) diff --git a/extensions/telegram/src/draft-stream.test.ts b/extensions/telegram/src/draft-stream.test.ts index 7e922c4d4e0..3531cedbf86 100644 --- a/extensions/telegram/src/draft-stream.test.ts +++ b/extensions/telegram/src/draft-stream.test.ts @@ -378,11 +378,20 @@ describe("createTelegramDraftStream", () => { expect(stream.sendMayHaveLanded?.()).toBe(expected); } - it("clears sendMayHaveLanded on pre-connect first preview send failures", async () => { - await expectSendMayHaveLandedStateAfterFirstFailure( + it("retries pre-connect first preview send failures instead of stopping", async () => { + const api = createMockDraftApi(); + api.sendMessage.mockRejectedValueOnce( Object.assign(new Error("connect ECONNREFUSED"), { code: "ECONNREFUSED" }), - false, ); + const stream = createDraftStream(api); + + stream.update("Hello"); + await stream.flush(); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(2); + expect(stream.sendMayHaveLanded?.()).toBe(false); + expect(stream.messageId()).toBe(17); }); it("clears sendMayHaveLanded on Telegram 4xx client rejections", async () => { @@ -392,6 +401,103 @@ describe("createTelegramDraftStream", () => { ); }); + it("treats message-is-not-modified edits as delivered", async () => { + const api = createMockDraftApi(); + api.editMessageText.mockRejectedValueOnce( + Object.assign( + new Error("Call to 'editMessageText' failed! (400: Bad Request: message is not modified)"), + { error_code: 400 }, + ), + ); + const warn = vi.fn(); + const stream = createDraftStream(api, { warn }); + + stream.update("Hello"); + await stream.flush(); + stream.update("Hello again"); + await stream.flush(); + stream.update("Hello more"); + await stream.flush(); + + expect(api.editMessageText).toHaveBeenCalledTimes(2); + expect(api.editMessageText).toHaveBeenLastCalledWith(123, 17, "Hello more"); + expect(warn).not.toHaveBeenCalled(); + }); + + it("retries the preview edit after a transient network failure", async () => { + const api = createMockDraftApi(); + api.editMessageText.mockRejectedValueOnce( + Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" }), + ); + const warn = vi.fn(); + const stream = createDraftStream(api, { warn }); + + stream.update("Hello"); + await stream.flush(); + stream.update("Hello again"); + await stream.flush(); + expect(warn).toHaveBeenCalledWith( + "telegram stream preview edit failed (retrying): read ECONNRESET", + ); + + await stream.flush(); + + expect(api.editMessageText).toHaveBeenCalledTimes(2); + expect(api.editMessageText).toHaveBeenLastCalledWith(123, 17, "Hello again"); + expect(stream.lastDeliveredText?.()).toBe("Hello again"); + }); + + it("suspends preview edits for retry_after during flood control", async () => { + vi.useFakeTimers(); + try { + const api = createMockDraftApi(); + api.editMessageText.mockRejectedValueOnce( + Object.assign( + new Error("Call to 'editMessageText' failed! (429: Too Many Requests: retry after 1)"), + { error_code: 429, parameters: { retry_after: 1 } }, + ), + ); + const stream = createDraftStream(api); + + stream.update("Hello"); + await stream.flush(); + stream.update("Hello again"); + await stream.flush(); + stream.update("Hello more"); + await stream.flush(); + expect(api.editMessageText).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(1100); + await stream.flush(); + + expect(api.editMessageText).toHaveBeenCalledTimes(2); + expect(api.editMessageText).toHaveBeenLastCalledWith(123, 17, "Hello more"); + } finally { + vi.useRealTimers(); + } + }); + + it("stops the preview after repeated retryable edit failures", async () => { + const api = createMockDraftApi(); + api.editMessageText.mockRejectedValue( + Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" }), + ); + const warn = vi.fn(); + const stream = createDraftStream(api, { warn }); + + stream.update("Hello"); + await stream.flush(); + stream.update("Hello again"); + await stream.flush(); + await stream.flush(); + await stream.flush(); + await stream.flush(); + await stream.flush(); + + expect(api.editMessageText).toHaveBeenCalledTimes(4); + expect(warn).toHaveBeenCalledWith("telegram stream preview failed: read ECONNRESET"); + }); + it("supports rendered previews with parse_mode", async () => { const api = createMockDraftApi(); const stream = createTelegramDraftStream({ diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index dd0effe3e59..5ffcb698728 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -6,11 +6,25 @@ import { } from "openclaw/plugin-sdk/channel-outbound"; import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; -import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js"; +import { + isRecoverableTelegramNetworkError, + isSafeToRetrySendError, + isTelegramClientRejection, + isTelegramMessageNotModifiedError, + isTelegramRateLimitError, + readTelegramRetryAfterMs, +} from "./network-errors.js"; import { normalizeTelegramReplyToMessageId } from "./outbound-params.js"; const TELEGRAM_STREAM_MAX_CHARS = 4096; const DEFAULT_THROTTLE_MS = 1000; +// Retryable preview failures keep the latest text pending for the next throttle +// tick; cap consecutive misses so a persistent outage stops the preview instead +// of warn-spamming for the rest of the run. +const MAX_CONSECUTIVE_PREVIEW_FAILURES = 3; +// Flood waits beyond this freeze the preview longer than it is useful; clamp so +// a large retry_after cannot park the suspension past the run's lifetime. +const MAX_PREVIEW_FLOOD_SUSPEND_MS = 60_000; export type TelegramDraftStream = { update: (text: string) => void; @@ -109,6 +123,8 @@ export function createTelegramDraftStream(params: { const streamState = { stopped: false, final: false }; let messageSendAttempted = false; + let suspendedUntilMs = 0; + let consecutivePreviewFailures = 0; let streamMessageId: number | undefined; let streamVisibleSinceMs: number | undefined; let lastSentText = ""; @@ -198,6 +214,12 @@ export function createTelegramDraftStream(params: { if (streamState.stopped && !streamState.final) { return false; } + // Flood-control suspension: returning false keeps the newest text pending, + // so the first tick after retry_after delivers it. Final flushes still try + // so the last text has a chance to land. + if (!streamState.final && Date.now() < suspendedUntilMs) { + return false; + } const trimmed = text.trimEnd(); if (!trimmed) { return false; @@ -262,6 +284,8 @@ export function createTelegramDraftStream(params: { } } + const previousSentText = lastSentText; + const previousSentParseMode = lastSentParseMode; lastSentText = renderedText; lastSentParseMode = renderedParseMode; try { @@ -273,9 +297,40 @@ export function createTelegramDraftStream(params: { if (sent) { previewRevision += 1; lastDeliveredText = trimmed; + consecutivePreviewFailures = 0; + suspendedUntilMs = 0; } return sent; } catch (err) { + const isEdit = typeof streamMessageId === "number"; + if (isEdit && isTelegramMessageNotModifiedError(err)) { + // Telegram already shows exactly this text; count the edit as delivered. + consecutivePreviewFailures = 0; + lastDeliveredText = trimmed; + return true; + } + // Roll back the dedupe snapshot so the retried tick is not skipped as a no-op. + lastSentText = previousSentText; + lastSentParseMode = previousSentParseMode; + // Flood control is always retryable: Telegram rejected the call outright. + // Beyond that, edits retry on any transient network error (re-editing the + // same content is idempotent) while an unsent first preview retries only + // on provably pre-connect failures — anything ambiguous could duplicate + // the preview message. + const retryable = + isTelegramRateLimitError(err) || + (isEdit ? isRecoverableTelegramNetworkError(err) : isSafeToRetrySendError(err)); + consecutivePreviewFailures += 1; + if (retryable && consecutivePreviewFailures <= MAX_CONSECUTIVE_PREVIEW_FAILURES) { + const retryAfterMs = readTelegramRetryAfterMs(err); + if (retryAfterMs !== undefined) { + suspendedUntilMs = Date.now() + Math.min(retryAfterMs, MAX_PREVIEW_FLOOD_SUSPEND_MS); + } + params.warn?.( + `telegram stream preview ${isEdit ? "edit" : "send"} failed (retrying): ${formatErrorMessage(err)}`, + ); + return false; + } streamState.stopped = true; params.warn?.(`telegram stream preview failed: ${formatErrorMessage(err)}`); return false;