diff --git a/CHANGELOG.md b/CHANGELOG.md index e0a763c1418..849b72ed9fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ Docs: https://docs.openclaw.ai - Commands/config writes: enforce `configWrites` against both the originating account and the targeted account scope for `/config` and config-backed `/allowlist` edits, blocking sibling-account mutations while preserving gateway `operator.admin` flows. Thanks @tdjackey for reporting. - Security/system.run: fail closed for approval-backed interpreter/runtime commands when OpenClaw cannot bind exactly one concrete local file operand, while extending best-effort direct-file binding to additional runtime forms. Thanks @tdjackey for reporting. - Gateway/session reset auth: split conversation `/new` and `/reset` handling away from the admin-only `sessions.reset` control-plane RPC so write-scoped gateway callers can no longer reach the privileged reset path through `agent`. Thanks @tdjackey for reporting. +- Telegram/final preview delivery followup: keep ambiguous first preview sends without a returned `message_id` instead of falling back to a second final send, so slow-provider Telegram replies stop duplicating on the first preview-final seam. (#41932) thanks @hougangdev. ## 2026.3.8 diff --git a/src/telegram/draft-stream.test-helpers.ts b/src/telegram/draft-stream.test-helpers.ts index 0a6073309c7..b68a3c226b1 100644 --- a/src/telegram/draft-stream.test-helpers.ts +++ b/src/telegram/draft-stream.test-helpers.ts @@ -13,6 +13,7 @@ export type TestDraftStream = { stop: ReturnType Promise>>; materialize: ReturnType Promise>>; forceNewMessage: ReturnType void>>; + sendMayHaveLanded: ReturnType boolean>>; setMessageId: (value: number | undefined) => void; }; @@ -47,6 +48,7 @@ export function createTestDraftStream(params?: { messageId = undefined; } }), + sendMayHaveLanded: vi.fn().mockReturnValue(false), setMessageId: (value: number | undefined) => { messageId = value; }, @@ -77,6 +79,7 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft forceNewMessage: vi.fn().mockImplementation(() => { activeMessageId = undefined; }), + sendMayHaveLanded: vi.fn().mockReturnValue(false), setMessageId: (value: number | undefined) => { activeMessageId = value; }, diff --git a/src/telegram/draft-stream.test.ts b/src/telegram/draft-stream.test.ts index fc65dd6b82a..58990c41abf 100644 --- a/src/telegram/draft-stream.test.ts +++ b/src/telegram/draft-stream.test.ts @@ -435,6 +435,46 @@ describe("createTelegramDraftStream", () => { expect(api.editMessageText).not.toHaveBeenCalledWith(123, 17, "Message B partial"); }); + it("marks sendMayHaveLanded after an ambiguous first preview send failure", async () => { + const api = createMockDraftApi(); + api.sendMessage.mockRejectedValueOnce(new Error("timeout after Telegram accepted send")); + const stream = createDraftStream(api); + + stream.update("Hello"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(1); + expect(stream.sendMayHaveLanded?.()).toBe(true); + }); + + it("clears sendMayHaveLanded on pre-connect first preview send failures", async () => { + const api = createMockDraftApi(); + api.sendMessage.mockRejectedValueOnce( + Object.assign(new Error("connect ECONNREFUSED"), { code: "ECONNREFUSED" }), + ); + const stream = createDraftStream(api); + + stream.update("Hello"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(1); + expect(stream.sendMayHaveLanded?.()).toBe(false); + }); + + it("clears sendMayHaveLanded on Telegram 4xx client rejections", async () => { + const api = createMockDraftApi(); + api.sendMessage.mockRejectedValueOnce( + Object.assign(new Error("403: Forbidden"), { error_code: 403 }), + ); + const stream = createDraftStream(api); + + stream.update("Hello"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(1); + expect(stream.sendMayHaveLanded?.()).toBe(false); + }); + it("supports rendered previews with parse_mode", async () => { const api = createMockDraftApi(); const stream = createTelegramDraftStream({ diff --git a/src/telegram/draft-stream.ts b/src/telegram/draft-stream.ts index f7f03db48f6..ddb0595312b 100644 --- a/src/telegram/draft-stream.ts +++ b/src/telegram/draft-stream.ts @@ -1,6 +1,7 @@ import type { Bot } from "grammy"; import { createFinalizableDraftLifecycle } from "../channels/draft-stream-controls.js"; import { buildTelegramThreadParams, type TelegramThreadSpec } from "./bot/helpers.js"; +import { isSafeToRetrySendError, isTelegramClientRejection } from "./network-errors.js"; const TELEGRAM_STREAM_MAX_CHARS = 4096; const DEFAULT_THROTTLE_MS = 1000; @@ -66,6 +67,8 @@ export type TelegramDraftStream = { materialize?: () => Promise; /** Reset internal state so the next update creates a new message instead of editing. */ forceNewMessage: () => void; + /** True when a preview sendMessage was attempted but the response was lost. */ + sendMayHaveLanded?: () => boolean; }; type TelegramDraftPreview = { @@ -126,6 +129,7 @@ export function createTelegramDraftStream(params: { } const streamState = { stopped: false, final: false }; + let messageSendAttempted = false; let streamMessageId: number | undefined; let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined; let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message"; @@ -192,12 +196,24 @@ export function createTelegramDraftStream(params: { } return true; } - const { sent } = await sendRenderedMessageWithThreadFallback({ - renderedText, - renderedParseMode, - fallbackWarnMessage: - "telegram stream preview send failed with message_thread_id, retrying without thread", - }); + messageSendAttempted = true; + let sent: Awaited>["sent"]; + try { + ({ sent } = await sendRenderedMessageWithThreadFallback({ + renderedText, + renderedParseMode, + fallbackWarnMessage: + "telegram stream preview send failed with message_thread_id, retrying without thread", + })); + } catch (err) { + // Pre-connect failures (DNS, refused) and explicit Telegram rejections (4xx) + // guarantee the message was never delivered — clear the flag so + // sendMayHaveLanded() doesn't suppress fallback. + if (isSafeToRetrySendError(err) || isTelegramClientRejection(err)) { + messageSendAttempted = false; + } + throw err; + } const sentMessageId = sent?.message_id; if (typeof sentMessageId !== "number" || !Number.isFinite(sentMessageId)) { streamState.stopped = true; @@ -345,6 +361,7 @@ export function createTelegramDraftStream(params: { // Re-open the stream lifecycle for the next assistant segment. streamState.final = false; generation += 1; + messageSendAttempted = false; streamMessageId = undefined; if (previewTransport === "draft") { streamDraftId = allocateTelegramDraftId(); @@ -421,5 +438,6 @@ export function createTelegramDraftStream(params: { stop, materialize, forceNewMessage, + sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number", }; } diff --git a/src/telegram/lane-delivery-text-deliverer.ts b/src/telegram/lane-delivery-text-deliverer.ts index c8eb10a9bb1..d5614057452 100644 --- a/src/telegram/lane-delivery-text-deliverer.ts +++ b/src/telegram/lane-delivery-text-deliverer.ts @@ -1,7 +1,11 @@ import type { ReplyPayload } from "../auto-reply/types.js"; import type { TelegramInlineButtons } from "./button-types.js"; import type { TelegramDraftStream } from "./draft-stream.js"; -import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js"; +import { + isRecoverableTelegramNetworkError, + isSafeToRetrySendError, + isTelegramClientRejection, +} from "./network-errors.js"; const MESSAGE_NOT_MODIFIED_RE = /400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i; @@ -270,10 +274,18 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { params.markDelivered(); return "retained"; } + if (isTelegramClientRejection(err)) { + params.log( + `telegram: ${args.laneName} preview final edit rejected by Telegram (client error); falling back to standard send (${String(err)})`, + ); + return "fallback"; + } + // Default: ambiguous error — prefer incomplete over duplicate params.log( - `telegram: ${args.laneName} preview final edit rejected by Telegram; falling back to standard send (${String(err)})`, + `telegram: ${args.laneName} preview final edit failed with ambiguous error; keeping existing preview to avoid duplicate (${String(err)})`, ); - return "fallback"; + params.markDelivered(); + return "retained"; } params.log( `telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`, @@ -353,6 +365,13 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { context, }); if (typeof previewTargetAfterStop.previewMessageId !== "number") { + if (lane.stream?.sendMayHaveLanded?.()) { + params.log( + `telegram: ${laneName} first preview send may have landed despite missing message id; keeping to avoid duplicate`, + ); + params.markDelivered(); + return "retained"; + } return "fallback"; } return finalizePreview(previewTargetAfterStop.previewMessageId, true, false); @@ -367,6 +386,17 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { context, }); if (typeof previewTargetAfterStop.previewMessageId !== "number") { + // Only retain for final delivery when a prior preview is already visible + // to the user — otherwise falling back is safer than silence. For updates, + // always fall back so the caller can attempt sendPayload without stale + // markDelivered() state. + if (context === "final" && lane.hasStreamedMessage && lane.stream?.sendMayHaveLanded?.()) { + params.log( + `telegram: ${laneName} preview send may have landed despite missing message id; keeping to avoid duplicate`, + ); + params.markDelivered(); + return "retained"; + } return "fallback"; } const activePreviewMessageId = lane.stream?.messageId(); diff --git a/src/telegram/lane-delivery.test.ts b/src/telegram/lane-delivery.test.ts index a2dae1f05b9..1243ae4a266 100644 --- a/src/telegram/lane-delivery.test.ts +++ b/src/telegram/lane-delivery.test.ts @@ -174,8 +174,9 @@ describe("createLaneTextDeliverer", () => { ); }); - it("falls back to sendPayload when an existing preview final edit is rejected", async () => { + it("retains preview when an existing preview final edit fails with ambiguous error", async () => { const harness = createHarness({ answerMessageId: 999 }); + // Plain Error with no error_code → ambiguous, prefer incomplete over duplicate harness.editPreview.mockRejectedValue(new Error("500: preview edit failed")); const result = await harness.deliverLaneText({ @@ -185,13 +186,11 @@ describe("createLaneTextDeliverer", () => { infoKind: "final", }); - expect(result).toBe("sent"); + expect(result).toBe("preview-retained"); expect(harness.editPreview).toHaveBeenCalledTimes(1); - expect(harness.sendPayload).toHaveBeenCalledWith( - expect.objectContaining({ text: "Hello final" }), - ); + expect(harness.sendPayload).not.toHaveBeenCalled(); expect(harness.log).toHaveBeenCalledWith( - expect.stringContaining("edit rejected by Telegram; falling back"), + expect.stringContaining("ambiguous error; keeping existing preview to avoid duplicate"), ); }); @@ -433,8 +432,9 @@ describe("createLaneTextDeliverer", () => { // During final delivery, only ambiguous post-connect failures keep the // preview. Definite non-delivery falls back to a real send. - it("falls back on API error during final", async () => { + it("retains preview on ambiguous API error during final", async () => { const harness = createHarness({ answerMessageId: 999 }); + // Plain Error with no error_code → ambiguous, prefer incomplete over duplicate harness.editPreview.mockRejectedValue(new Error("500: Internal Server Error")); const result = await harness.deliverLaneText({ @@ -444,9 +444,9 @@ describe("createLaneTextDeliverer", () => { infoKind: "final", }); - expect(result).toBe("sent"); + expect(result).toBe("preview-retained"); expect(harness.editPreview).toHaveBeenCalledTimes(1); - expect(harness.sendPayload).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).not.toHaveBeenCalled(); }); it("falls back when an archived preview edit target is missing and no alternate preview exists", async () => { @@ -497,6 +497,94 @@ describe("createLaneTextDeliverer", () => { ); }); + it("falls back on 4xx client rejection with error_code during final", async () => { + const harness = createHarness({ answerMessageId: 999 }); + const err = Object.assign(new Error("403: Forbidden"), { error_code: 403 }); + harness.editPreview.mockRejectedValue(err); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Hello final", + payload: { text: "Hello final" }, + infoKind: "final", + }); + + expect(result).toBe("sent"); + expect(harness.editPreview).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: "Hello final" }), + ); + expect(harness.log).toHaveBeenCalledWith( + expect.stringContaining("rejected by Telegram (client error); falling back"), + ); + }); + + it("retains preview on 502 with error_code during final (ambiguous server error)", async () => { + const harness = createHarness({ answerMessageId: 999 }); + const err = Object.assign(new Error("502: Bad Gateway"), { error_code: 502 }); + harness.editPreview.mockRejectedValue(err); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Hello final", + payload: { text: "Hello final" }, + infoKind: "final", + }); + + expect(result).toBe("preview-retained"); + expect(harness.sendPayload).not.toHaveBeenCalled(); + expect(harness.log).toHaveBeenCalledWith( + expect.stringContaining("ambiguous error; keeping existing preview to avoid duplicate"), + ); + }); + + it("retains when the first preview send may have landed without a message id", async () => { + const stream = createTestDraftStream(); + stream.sendMayHaveLanded.mockReturnValue(true); + const harness = createHarness({ answerStream: stream }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Hello final", + payload: { text: "Hello final" }, + infoKind: "final", + }); + + expect(result).toBe("preview-retained"); + expect(harness.sendPayload).not.toHaveBeenCalled(); + expect(harness.log).toHaveBeenCalledWith( + expect.stringContaining("first preview send may have landed despite missing message id"), + ); + }); + + it("retains when sendMayHaveLanded is true and a prior preview was visible", async () => { + // Stream has a messageId (visible preview) but loses it after stop + const stream = createTestDraftStream({ messageId: 999 }); + stream.sendMayHaveLanded.mockReturnValue(true); + const harness = createHarness({ + answerStream: stream, + answerHasStreamedMessage: true, + }); + // Simulate messageId lost after stop (e.g. forceNewMessage or timeout) + harness.stopDraftLane.mockImplementation(async (lane: DraftLaneState) => { + stream.setMessageId(undefined); + await lane.stream?.stop(); + }); + + const result = await harness.deliverLaneText({ + laneName: "answer", + text: "Hello final", + payload: { text: "Hello final" }, + infoKind: "final", + }); + + expect(result).toBe("preview-retained"); + expect(harness.sendPayload).not.toHaveBeenCalled(); + expect(harness.log).toHaveBeenCalledWith( + expect.stringContaining("preview send may have landed despite missing message id"), + ); + }); + it("deletes consumed boundary previews after fallback final send", async () => { const harness = createHarness(); harness.archivedAnswerPreviews.push({ diff --git a/src/telegram/network-errors.test.ts b/src/telegram/network-errors.test.ts index d4572eda9c8..6624b8f63a0 100644 --- a/src/telegram/network-errors.test.ts +++ b/src/telegram/network-errors.test.ts @@ -1,5 +1,10 @@ import { describe, expect, it } from "vitest"; -import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js"; +import { + isRecoverableTelegramNetworkError, + isSafeToRetrySendError, + isTelegramClientRejection, + isTelegramServerError, +} from "./network-errors.js"; describe("isRecoverableTelegramNetworkError", () => { it("detects recoverable error codes", () => { @@ -164,3 +169,51 @@ describe("isSafeToRetrySendError", () => { expect(isSafeToRetrySendError(wrapped)).toBe(true); }); }); + +describe("isTelegramServerError", () => { + it("returns true for error_code 500", () => { + const err = Object.assign(new Error("Internal Server Error"), { error_code: 500 }); + expect(isTelegramServerError(err)).toBe(true); + }); + + it("returns true for error_code 502", () => { + const err = Object.assign(new Error("Bad Gateway"), { error_code: 502 }); + expect(isTelegramServerError(err)).toBe(true); + }); + + it("returns false for error_code 403", () => { + const err = Object.assign(new Error("Forbidden"), { error_code: 403 }); + expect(isTelegramServerError(err)).toBe(false); + }); + + it("returns false for plain Error", () => { + expect(isTelegramServerError(new Error("500: Internal Server Error"))).toBe(false); + }); +}); + +describe("isTelegramClientRejection", () => { + it("returns true for error_code 400", () => { + const err = Object.assign(new Error("Bad Request"), { error_code: 400 }); + expect(isTelegramClientRejection(err)).toBe(true); + }); + + it("returns true for error_code 403", () => { + const err = Object.assign(new Error("Forbidden"), { error_code: 403 }); + expect(isTelegramClientRejection(err)).toBe(true); + }); + + it("returns false for error_code 502", () => { + const err = Object.assign(new Error("Bad Gateway"), { error_code: 502 }); + expect(isTelegramClientRejection(err)).toBe(false); + }); + + it("returns false for plain Error", () => { + expect(isTelegramClientRejection(new Error("400: Bad Request"))).toBe(false); + }); + + it("detects error_code in nested cause", () => { + const inner = Object.assign(new Error("Forbidden"), { error_code: 403 }); + const outer = Object.assign(new Error("wrapped"), { cause: inner }); + expect(isTelegramClientRejection(outer)).toBe(true); + }); +}); diff --git a/src/telegram/network-errors.ts b/src/telegram/network-errors.ts index bf5aa9cbcbe..66da37c4dd4 100644 --- a/src/telegram/network-errors.ts +++ b/src/telegram/network-errors.ts @@ -123,6 +123,29 @@ export function isSafeToRetrySendError(err: unknown): boolean { return false; } +function hasTelegramErrorCode(err: unknown, matches: (code: number) => boolean): boolean { + for (const candidate of collectTelegramErrorCandidates(err)) { + if (!candidate || typeof candidate !== "object" || !("error_code" in candidate)) { + continue; + } + const code = (candidate as { error_code: unknown }).error_code; + if (typeof code === "number" && matches(code)) { + return true; + } + } + return false; +} + +/** Returns true for HTTP 5xx server errors (error may have been processed). */ +export function isTelegramServerError(err: unknown): boolean { + return hasTelegramErrorCode(err, (code) => code >= 500); +} + +/** Returns true for HTTP 4xx client errors (Telegram explicitly rejected, not applied). */ +export function isTelegramClientRejection(err: unknown): boolean { + return hasTelegramErrorCode(err, (code) => code >= 400 && code < 500); +} + export function isRecoverableTelegramNetworkError( err: unknown, options: { context?: TelegramNetworkErrorContext; allowMessageMatch?: boolean } = {}, diff --git a/src/telegram/send.test.ts b/src/telegram/send.test.ts index a00d1b2e89e..2bd6556ee42 100644 --- a/src/telegram/send.test.ts +++ b/src/telegram/send.test.ts @@ -1734,6 +1734,22 @@ describe("editMessageTelegram", () => { expect(botApi.editMessageText).toHaveBeenCalledTimes(1); }); + it("retries editMessageTelegram on Telegram 5xx errors", async () => { + botApi.editMessageText + .mockRejectedValueOnce(Object.assign(new Error("502: Bad Gateway"), { error_code: 502 })) + .mockResolvedValueOnce({ message_id: 1, chat: { id: "123" } }); + + await expect( + editMessageTelegram("123", 1, "hi", { + token: "tok", + cfg: {}, + retry: { attempts: 2, minDelayMs: 0, maxDelayMs: 0, jitter: 0 }, + }), + ).resolves.toEqual({ ok: true, messageId: "1", chatId: "123" }); + + expect(botApi.editMessageText).toHaveBeenCalledTimes(2); + }); + it("disables link previews when linkPreview is false", async () => { botApi.editMessageText.mockResolvedValue({ message_id: 1, chat: { id: "123" } }); diff --git a/src/telegram/send.ts b/src/telegram/send.ts index 44e18ee2340..5261887779f 100644 --- a/src/telegram/send.ts +++ b/src/telegram/send.ts @@ -27,7 +27,11 @@ import type { TelegramInlineButtons } from "./button-types.js"; import { splitTelegramCaption } from "./caption.js"; import { resolveTelegramFetch } from "./fetch.js"; import { renderTelegramHtmlText, splitTelegramHtmlChunks } from "./format.js"; -import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js"; +import { + isRecoverableTelegramNetworkError, + isSafeToRetrySendError, + isTelegramServerError, +} from "./network-errors.js"; import { makeProxyFetch } from "./proxy.js"; import { recordSentMessage } from "./sent-message-cache.js"; import { maybePersistResolvedTelegramTarget } from "./target-writeback.js"; @@ -1150,6 +1154,9 @@ export async function editMessageTelegram( account, retry: opts.retry, verbose: opts.verbose, + shouldRetry: (err) => + isRecoverableTelegramNetworkError(err, { allowMessageMatch: true }) || + isTelegramServerError(err), }); const requestWithEditShouldLog = ( fn: () => Promise,