fix(telegram): retry transient draft preview failures instead of killing the stream

This commit is contained in:
Ayaan Zaidi
2026-06-10 23:57:16 +05:30
parent a956ab8481
commit ff04e24ead
2 changed files with 165 additions and 4 deletions

View File

@@ -378,11 +378,20 @@ describe("createTelegramDraftStream", () => {
expect(stream.sendMayHaveLanded?.()).toBe(expected);
}
it("clears sendMayHaveLanded on pre-connect first preview send failures", async () => {
await expectSendMayHaveLandedStateAfterFirstFailure(
it("retries pre-connect first preview send failures instead of stopping", async () => {
const api = createMockDraftApi();
api.sendMessage.mockRejectedValueOnce(
Object.assign(new Error("connect ECONNREFUSED"), { code: "ECONNREFUSED" }),
false,
);
const stream = createDraftStream(api);
stream.update("Hello");
await stream.flush();
await stream.flush();
expect(api.sendMessage).toHaveBeenCalledTimes(2);
expect(stream.sendMayHaveLanded?.()).toBe(false);
expect(stream.messageId()).toBe(17);
});
it("clears sendMayHaveLanded on Telegram 4xx client rejections", async () => {
@@ -392,6 +401,103 @@ describe("createTelegramDraftStream", () => {
);
});
it("treats message-is-not-modified edits as delivered", async () => {
const api = createMockDraftApi();
api.editMessageText.mockRejectedValueOnce(
Object.assign(
new Error("Call to 'editMessageText' failed! (400: Bad Request: message is not modified)"),
{ error_code: 400 },
),
);
const warn = vi.fn();
const stream = createDraftStream(api, { warn });
stream.update("Hello");
await stream.flush();
stream.update("Hello again");
await stream.flush();
stream.update("Hello more");
await stream.flush();
expect(api.editMessageText).toHaveBeenCalledTimes(2);
expect(api.editMessageText).toHaveBeenLastCalledWith(123, 17, "Hello more");
expect(warn).not.toHaveBeenCalled();
});
it("retries the preview edit after a transient network failure", async () => {
const api = createMockDraftApi();
api.editMessageText.mockRejectedValueOnce(
Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" }),
);
const warn = vi.fn();
const stream = createDraftStream(api, { warn });
stream.update("Hello");
await stream.flush();
stream.update("Hello again");
await stream.flush();
expect(warn).toHaveBeenCalledWith(
"telegram stream preview edit failed (retrying): read ECONNRESET",
);
await stream.flush();
expect(api.editMessageText).toHaveBeenCalledTimes(2);
expect(api.editMessageText).toHaveBeenLastCalledWith(123, 17, "Hello again");
expect(stream.lastDeliveredText?.()).toBe("Hello again");
});
it("suspends preview edits for retry_after during flood control", async () => {
vi.useFakeTimers();
try {
const api = createMockDraftApi();
api.editMessageText.mockRejectedValueOnce(
Object.assign(
new Error("Call to 'editMessageText' failed! (429: Too Many Requests: retry after 1)"),
{ error_code: 429, parameters: { retry_after: 1 } },
),
);
const stream = createDraftStream(api);
stream.update("Hello");
await stream.flush();
stream.update("Hello again");
await stream.flush();
stream.update("Hello more");
await stream.flush();
expect(api.editMessageText).toHaveBeenCalledTimes(1);
await vi.advanceTimersByTimeAsync(1100);
await stream.flush();
expect(api.editMessageText).toHaveBeenCalledTimes(2);
expect(api.editMessageText).toHaveBeenLastCalledWith(123, 17, "Hello more");
} finally {
vi.useRealTimers();
}
});
it("stops the preview after repeated retryable edit failures", async () => {
const api = createMockDraftApi();
api.editMessageText.mockRejectedValue(
Object.assign(new Error("read ECONNRESET"), { code: "ECONNRESET" }),
);
const warn = vi.fn();
const stream = createDraftStream(api, { warn });
stream.update("Hello");
await stream.flush();
stream.update("Hello again");
await stream.flush();
await stream.flush();
await stream.flush();
await stream.flush();
await stream.flush();
expect(api.editMessageText).toHaveBeenCalledTimes(4);
expect(warn).toHaveBeenCalledWith("telegram stream preview failed: read ECONNRESET");
});
it("supports rendered previews with parse_mode", async () => {
const api = createMockDraftApi();
const stream = createTelegramDraftStream({

View File

@@ -6,11 +6,25 @@ import {
} from "openclaw/plugin-sdk/channel-outbound";
import { formatErrorMessage } from "openclaw/plugin-sdk/error-runtime";
import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js";
import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js";
import {
isRecoverableTelegramNetworkError,
isSafeToRetrySendError,
isTelegramClientRejection,
isTelegramMessageNotModifiedError,
isTelegramRateLimitError,
readTelegramRetryAfterMs,
} from "./network-errors.js";
import { normalizeTelegramReplyToMessageId } from "./outbound-params.js";
const TELEGRAM_STREAM_MAX_CHARS = 4096;
const DEFAULT_THROTTLE_MS = 1000;
// Retryable preview failures keep the latest text pending for the next throttle
// tick; cap consecutive misses so a persistent outage stops the preview instead
// of warn-spamming for the rest of the run.
const MAX_CONSECUTIVE_PREVIEW_FAILURES = 3;
// Flood waits beyond this freeze the preview longer than it is useful; clamp so
// a large retry_after cannot park the suspension past the run's lifetime.
const MAX_PREVIEW_FLOOD_SUSPEND_MS = 60_000;
export type TelegramDraftStream = {
update: (text: string) => void;
@@ -109,6 +123,8 @@ export function createTelegramDraftStream(params: {
const streamState = { stopped: false, final: false };
let messageSendAttempted = false;
let suspendedUntilMs = 0;
let consecutivePreviewFailures = 0;
let streamMessageId: number | undefined;
let streamVisibleSinceMs: number | undefined;
let lastSentText = "";
@@ -198,6 +214,12 @@ export function createTelegramDraftStream(params: {
if (streamState.stopped && !streamState.final) {
return false;
}
// Flood-control suspension: returning false keeps the newest text pending,
// so the first tick after retry_after delivers it. Final flushes still try
// so the last text has a chance to land.
if (!streamState.final && Date.now() < suspendedUntilMs) {
return false;
}
const trimmed = text.trimEnd();
if (!trimmed) {
return false;
@@ -262,6 +284,8 @@ export function createTelegramDraftStream(params: {
}
}
const previousSentText = lastSentText;
const previousSentParseMode = lastSentParseMode;
lastSentText = renderedText;
lastSentParseMode = renderedParseMode;
try {
@@ -273,9 +297,40 @@ export function createTelegramDraftStream(params: {
if (sent) {
previewRevision += 1;
lastDeliveredText = trimmed;
consecutivePreviewFailures = 0;
suspendedUntilMs = 0;
}
return sent;
} catch (err) {
const isEdit = typeof streamMessageId === "number";
if (isEdit && isTelegramMessageNotModifiedError(err)) {
// Telegram already shows exactly this text; count the edit as delivered.
consecutivePreviewFailures = 0;
lastDeliveredText = trimmed;
return true;
}
// Roll back the dedupe snapshot so the retried tick is not skipped as a no-op.
lastSentText = previousSentText;
lastSentParseMode = previousSentParseMode;
// Flood control is always retryable: Telegram rejected the call outright.
// Beyond that, edits retry on any transient network error (re-editing the
// same content is idempotent) while an unsent first preview retries only
// on provably pre-connect failures — anything ambiguous could duplicate
// the preview message.
const retryable =
isTelegramRateLimitError(err) ||
(isEdit ? isRecoverableTelegramNetworkError(err) : isSafeToRetrySendError(err));
consecutivePreviewFailures += 1;
if (retryable && consecutivePreviewFailures <= MAX_CONSECUTIVE_PREVIEW_FAILURES) {
const retryAfterMs = readTelegramRetryAfterMs(err);
if (retryAfterMs !== undefined) {
suspendedUntilMs = Date.now() + Math.min(retryAfterMs, MAX_PREVIEW_FLOOD_SUSPEND_MS);
}
params.warn?.(
`telegram stream preview ${isEdit ? "edit" : "send"} failed (retrying): ${formatErrorMessage(err)}`,
);
return false;
}
streamState.stopped = true;
params.warn?.(`telegram stream preview failed: ${formatErrorMessage(err)}`);
return false;