mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix(telegram): prevent duplicate messages with slow LLM providers
Retry 5xx in editMessageTelegram, invert ambiguous-error default to retain preview, and track sendMayHaveLanded in draft stream.
This commit is contained in:
@@ -13,6 +13,7 @@ export type TestDraftStream = {
|
||||
stop: ReturnType<typeof vi.fn<() => Promise<void>>>;
|
||||
materialize: ReturnType<typeof vi.fn<() => Promise<number | undefined>>>;
|
||||
forceNewMessage: ReturnType<typeof vi.fn<() => void>>;
|
||||
sendMayHaveLanded: ReturnType<typeof vi.fn<() => boolean>>;
|
||||
setMessageId: (value: number | undefined) => void;
|
||||
};
|
||||
|
||||
@@ -47,6 +48,7 @@ export function createTestDraftStream(params?: {
|
||||
messageId = undefined;
|
||||
}
|
||||
}),
|
||||
sendMayHaveLanded: vi.fn().mockReturnValue(false),
|
||||
setMessageId: (value: number | undefined) => {
|
||||
messageId = value;
|
||||
},
|
||||
@@ -77,6 +79,7 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft
|
||||
forceNewMessage: vi.fn().mockImplementation(() => {
|
||||
activeMessageId = undefined;
|
||||
}),
|
||||
sendMayHaveLanded: vi.fn().mockReturnValue(false),
|
||||
setMessageId: (value: number | undefined) => {
|
||||
activeMessageId = value;
|
||||
},
|
||||
|
||||
@@ -66,6 +66,8 @@ export type TelegramDraftStream = {
|
||||
materialize?: () => Promise<number | undefined>;
|
||||
/** Reset internal state so the next update creates a new message instead of editing. */
|
||||
forceNewMessage: () => void;
|
||||
/** True when a preview sendMessage was attempted but the response was lost. */
|
||||
sendMayHaveLanded?: () => boolean;
|
||||
};
|
||||
|
||||
type TelegramDraftPreview = {
|
||||
@@ -126,6 +128,7 @@ export function createTelegramDraftStream(params: {
|
||||
}
|
||||
|
||||
const streamState = { stopped: false, final: false };
|
||||
let messageSendAttempted = false;
|
||||
let streamMessageId: number | undefined;
|
||||
let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined;
|
||||
let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message";
|
||||
@@ -192,6 +195,7 @@ export function createTelegramDraftStream(params: {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
messageSendAttempted = true;
|
||||
const { sent } = await sendRenderedMessageWithThreadFallback({
|
||||
renderedText,
|
||||
renderedParseMode,
|
||||
@@ -345,6 +349,7 @@ export function createTelegramDraftStream(params: {
|
||||
// Re-open the stream lifecycle for the next assistant segment.
|
||||
streamState.final = false;
|
||||
generation += 1;
|
||||
messageSendAttempted = false;
|
||||
streamMessageId = undefined;
|
||||
if (previewTransport === "draft") {
|
||||
streamDraftId = allocateTelegramDraftId();
|
||||
@@ -421,5 +426,6 @@ export function createTelegramDraftStream(params: {
|
||||
stop,
|
||||
materialize,
|
||||
forceNewMessage,
|
||||
sendMayHaveLanded: () => messageSendAttempted && typeof streamMessageId !== "number",
|
||||
};
|
||||
}
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
import type { ReplyPayload } from "../auto-reply/types.js";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import type { TelegramDraftStream } from "./draft-stream.js";
|
||||
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
|
||||
import {
|
||||
isRecoverableTelegramNetworkError,
|
||||
isSafeToRetrySendError,
|
||||
isTelegramClientRejection,
|
||||
} from "./network-errors.js";
|
||||
|
||||
const MESSAGE_NOT_MODIFIED_RE =
|
||||
/400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i;
|
||||
@@ -270,10 +274,18 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
params.markDelivered();
|
||||
return "retained";
|
||||
}
|
||||
if (isTelegramClientRejection(err)) {
|
||||
params.log(
|
||||
`telegram: ${args.laneName} preview final edit rejected by Telegram (client error); falling back to standard send (${String(err)})`,
|
||||
);
|
||||
return "fallback";
|
||||
}
|
||||
// Default: ambiguous error — prefer incomplete over duplicate
|
||||
params.log(
|
||||
`telegram: ${args.laneName} preview final edit rejected by Telegram; falling back to standard send (${String(err)})`,
|
||||
`telegram: ${args.laneName} preview final edit failed with ambiguous error; keeping existing preview to avoid duplicate (${String(err)})`,
|
||||
);
|
||||
return "fallback";
|
||||
params.markDelivered();
|
||||
return "retained";
|
||||
}
|
||||
params.log(
|
||||
`telegram: ${args.laneName} preview ${args.context} edit failed; falling back to standard send (${String(err)})`,
|
||||
@@ -353,6 +365,13 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
context,
|
||||
});
|
||||
if (typeof previewTargetAfterStop.previewMessageId !== "number") {
|
||||
if (lane.stream?.sendMayHaveLanded?.()) {
|
||||
params.log(
|
||||
`telegram: ${laneName} preview send may have landed despite missing message id; keeping to avoid duplicate`,
|
||||
);
|
||||
params.markDelivered();
|
||||
return "retained";
|
||||
}
|
||||
return "fallback";
|
||||
}
|
||||
return finalizePreview(previewTargetAfterStop.previewMessageId, true, false);
|
||||
@@ -367,6 +386,13 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) {
|
||||
context,
|
||||
});
|
||||
if (typeof previewTargetAfterStop.previewMessageId !== "number") {
|
||||
if (lane.stream?.sendMayHaveLanded?.()) {
|
||||
params.log(
|
||||
`telegram: ${laneName} preview send may have landed despite missing message id; keeping to avoid duplicate`,
|
||||
);
|
||||
params.markDelivered();
|
||||
return "retained";
|
||||
}
|
||||
return "fallback";
|
||||
}
|
||||
const activePreviewMessageId = lane.stream?.messageId();
|
||||
|
||||
@@ -174,8 +174,9 @@ describe("createLaneTextDeliverer", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to sendPayload when an existing preview final edit is rejected", async () => {
|
||||
it("retains preview when an existing preview final edit fails with ambiguous error", async () => {
|
||||
const harness = createHarness({ answerMessageId: 999 });
|
||||
// Plain Error with no error_code → ambiguous, prefer incomplete over duplicate
|
||||
harness.editPreview.mockRejectedValue(new Error("500: preview edit failed"));
|
||||
|
||||
const result = await harness.deliverLaneText({
|
||||
@@ -185,13 +186,11 @@ describe("createLaneTextDeliverer", () => {
|
||||
infoKind: "final",
|
||||
});
|
||||
|
||||
expect(result).toBe("sent");
|
||||
expect(result).toBe("preview-retained");
|
||||
expect(harness.editPreview).toHaveBeenCalledTimes(1);
|
||||
expect(harness.sendPayload).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ text: "Hello final" }),
|
||||
);
|
||||
expect(harness.sendPayload).not.toHaveBeenCalled();
|
||||
expect(harness.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("edit rejected by Telegram; falling back"),
|
||||
expect.stringContaining("ambiguous error; keeping existing preview to avoid duplicate"),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -433,8 +432,9 @@ describe("createLaneTextDeliverer", () => {
|
||||
// During final delivery, only ambiguous post-connect failures keep the
|
||||
// preview. Definite non-delivery falls back to a real send.
|
||||
|
||||
it("falls back on API error during final", async () => {
|
||||
it("retains preview on ambiguous API error during final", async () => {
|
||||
const harness = createHarness({ answerMessageId: 999 });
|
||||
// Plain Error with no error_code → ambiguous, prefer incomplete over duplicate
|
||||
harness.editPreview.mockRejectedValue(new Error("500: Internal Server Error"));
|
||||
|
||||
const result = await harness.deliverLaneText({
|
||||
@@ -444,9 +444,9 @@ describe("createLaneTextDeliverer", () => {
|
||||
infoKind: "final",
|
||||
});
|
||||
|
||||
expect(result).toBe("sent");
|
||||
expect(result).toBe("preview-retained");
|
||||
expect(harness.editPreview).toHaveBeenCalledTimes(1);
|
||||
expect(harness.sendPayload).toHaveBeenCalledTimes(1);
|
||||
expect(harness.sendPayload).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("falls back when an archived preview edit target is missing and no alternate preview exists", async () => {
|
||||
@@ -497,6 +497,67 @@ describe("createLaneTextDeliverer", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back on 4xx client rejection with error_code during final", async () => {
|
||||
const harness = createHarness({ answerMessageId: 999 });
|
||||
const err = Object.assign(new Error("403: Forbidden"), { error_code: 403 });
|
||||
harness.editPreview.mockRejectedValue(err);
|
||||
|
||||
const result = await harness.deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: "Hello final",
|
||||
payload: { text: "Hello final" },
|
||||
infoKind: "final",
|
||||
});
|
||||
|
||||
expect(result).toBe("sent");
|
||||
expect(harness.editPreview).toHaveBeenCalledTimes(1);
|
||||
expect(harness.sendPayload).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ text: "Hello final" }),
|
||||
);
|
||||
expect(harness.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("rejected by Telegram (client error); falling back"),
|
||||
);
|
||||
});
|
||||
|
||||
it("retains preview on 502 with error_code during final (ambiguous server error)", async () => {
|
||||
const harness = createHarness({ answerMessageId: 999 });
|
||||
const err = Object.assign(new Error("502: Bad Gateway"), { error_code: 502 });
|
||||
harness.editPreview.mockRejectedValue(err);
|
||||
|
||||
const result = await harness.deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: "Hello final",
|
||||
payload: { text: "Hello final" },
|
||||
infoKind: "final",
|
||||
});
|
||||
|
||||
expect(result).toBe("preview-retained");
|
||||
expect(harness.sendPayload).not.toHaveBeenCalled();
|
||||
expect(harness.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("ambiguous error; keeping existing preview to avoid duplicate"),
|
||||
);
|
||||
});
|
||||
|
||||
it("retains preview when sendMayHaveLanded is true and no messageId", async () => {
|
||||
const stream = createTestDraftStream();
|
||||
stream.sendMayHaveLanded.mockReturnValue(true);
|
||||
// No messageId → resolvePreviewTarget returns undefined
|
||||
const harness = createHarness({ answerStream: stream });
|
||||
|
||||
const result = await harness.deliverLaneText({
|
||||
laneName: "answer",
|
||||
text: "Hello final",
|
||||
payload: { text: "Hello final" },
|
||||
infoKind: "final",
|
||||
});
|
||||
|
||||
expect(result).toBe("preview-retained");
|
||||
expect(harness.sendPayload).not.toHaveBeenCalled();
|
||||
expect(harness.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("preview send may have landed despite missing message id"),
|
||||
);
|
||||
});
|
||||
|
||||
it("deletes consumed boundary previews after fallback final send", async () => {
|
||||
const harness = createHarness();
|
||||
harness.archivedAnswerPreviews.push({
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
|
||||
import {
|
||||
isRecoverableTelegramNetworkError,
|
||||
isSafeToRetrySendError,
|
||||
isTelegramClientRejection,
|
||||
isTelegramServerError,
|
||||
} from "./network-errors.js";
|
||||
|
||||
describe("isRecoverableTelegramNetworkError", () => {
|
||||
it("detects recoverable error codes", () => {
|
||||
@@ -164,3 +169,51 @@ describe("isSafeToRetrySendError", () => {
|
||||
expect(isSafeToRetrySendError(wrapped)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isTelegramServerError", () => {
|
||||
it("returns true for error_code 500", () => {
|
||||
const err = Object.assign(new Error("Internal Server Error"), { error_code: 500 });
|
||||
expect(isTelegramServerError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("returns true for error_code 502", () => {
|
||||
const err = Object.assign(new Error("Bad Gateway"), { error_code: 502 });
|
||||
expect(isTelegramServerError(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("returns false for error_code 403", () => {
|
||||
const err = Object.assign(new Error("Forbidden"), { error_code: 403 });
|
||||
expect(isTelegramServerError(err)).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false for plain Error", () => {
|
||||
expect(isTelegramServerError(new Error("500: Internal Server Error"))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("isTelegramClientRejection", () => {
|
||||
it("returns true for error_code 400", () => {
|
||||
const err = Object.assign(new Error("Bad Request"), { error_code: 400 });
|
||||
expect(isTelegramClientRejection(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("returns true for error_code 403", () => {
|
||||
const err = Object.assign(new Error("Forbidden"), { error_code: 403 });
|
||||
expect(isTelegramClientRejection(err)).toBe(true);
|
||||
});
|
||||
|
||||
it("returns false for error_code 502", () => {
|
||||
const err = Object.assign(new Error("Bad Gateway"), { error_code: 502 });
|
||||
expect(isTelegramClientRejection(err)).toBe(false);
|
||||
});
|
||||
|
||||
it("returns false for plain Error", () => {
|
||||
expect(isTelegramClientRejection(new Error("400: Bad Request"))).toBe(false);
|
||||
});
|
||||
|
||||
it("detects error_code in nested cause", () => {
|
||||
const inner = Object.assign(new Error("Forbidden"), { error_code: 403 });
|
||||
const outer = Object.assign(new Error("wrapped"), { cause: inner });
|
||||
expect(isTelegramClientRejection(outer)).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -123,6 +123,32 @@ export function isSafeToRetrySendError(err: unknown): boolean {
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Returns true for HTTP 5xx server errors (error may have been processed). */
|
||||
export function isTelegramServerError(err: unknown): boolean {
|
||||
for (const candidate of collectTelegramErrorCandidates(err)) {
|
||||
if (candidate && typeof candidate === "object" && "error_code" in candidate) {
|
||||
const code = (candidate as { error_code: unknown }).error_code;
|
||||
if (typeof code === "number" && code >= 500) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Returns true for HTTP 4xx client errors (Telegram explicitly rejected, not applied). */
|
||||
export function isTelegramClientRejection(err: unknown): boolean {
|
||||
for (const candidate of collectTelegramErrorCandidates(err)) {
|
||||
if (candidate && typeof candidate === "object" && "error_code" in candidate) {
|
||||
const code = (candidate as { error_code: unknown }).error_code;
|
||||
if (typeof code === "number" && code >= 400 && code < 500) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export function isRecoverableTelegramNetworkError(
|
||||
err: unknown,
|
||||
options: { context?: TelegramNetworkErrorContext; allowMessageMatch?: boolean } = {},
|
||||
|
||||
@@ -27,7 +27,11 @@ import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { splitTelegramCaption } from "./caption.js";
|
||||
import { resolveTelegramFetch } from "./fetch.js";
|
||||
import { renderTelegramHtmlText, splitTelegramHtmlChunks } from "./format.js";
|
||||
import { isRecoverableTelegramNetworkError, isSafeToRetrySendError } from "./network-errors.js";
|
||||
import {
|
||||
isRecoverableTelegramNetworkError,
|
||||
isSafeToRetrySendError,
|
||||
isTelegramServerError,
|
||||
} from "./network-errors.js";
|
||||
import { makeProxyFetch } from "./proxy.js";
|
||||
import { recordSentMessage } from "./sent-message-cache.js";
|
||||
import { maybePersistResolvedTelegramTarget } from "./target-writeback.js";
|
||||
@@ -1150,6 +1154,9 @@ export async function editMessageTelegram(
|
||||
account,
|
||||
retry: opts.retry,
|
||||
verbose: opts.verbose,
|
||||
shouldRetry: (err) =>
|
||||
isRecoverableTelegramNetworkError(err, { allowMessageMatch: true }) ||
|
||||
isTelegramServerError(err),
|
||||
});
|
||||
const requestWithEditShouldLog = <T>(
|
||||
fn: () => Promise<T>,
|
||||
|
||||
Reference in New Issue
Block a user