fix(telegram): retry webhook registration failures

This commit is contained in:
Peter Steinberger
2026-04-28 04:33:03 +01:00
parent 5a2e5446a4
commit b2d102109b
3 changed files with 159 additions and 26 deletions

View File

@@ -442,6 +442,60 @@ describe("startTelegramWebhook", () => {
);
});
it("keeps local listener alive and retries when setWebhook has a recoverable startup failure", async () => {
  // The first setWebhook call rejects with a fetch-style TypeError, the second resolves.
  const logSpy = vi.fn();
  const errorSpy = vi.fn();
  setWebhookSpy.mockRejectedValueOnce(new TypeError("fetch failed")).mockResolvedValueOnce(true);

  // All-zero backoff: the expected log line below reports a 0ms delay.
  const zeroDelayRetryPolicy = { initialMs: 0, maxMs: 0, factor: 1, jitter: 0 };

  await withStartedWebhook(
    {
      secret: TELEGRAM_SECRET,
      path: TELEGRAM_WEBHOOK_PATH,
      runtime: { log: logSpy, error: errorSpy, exit: vi.fn() },
      webhookRegistrationRetryPolicy: zeroDelayRetryPolicy,
    },
    async ({ port }) => {
      // The local HTTP listener must still answer after the failed registration.
      const { status } = await fetch(`http://127.0.0.1:${port}/healthz`);
      expect(status).toBe(200);
      expect(stopSpy).not.toHaveBeenCalled();

      const failureMessage = expect.stringContaining("telegram setWebhook failed: fetch failed");
      expect(errorSpy).toHaveBeenCalledWith(failureMessage);

      // Wait for the background retry to issue the second setWebhook call.
      await vi.waitFor(() => expect(setWebhookSpy).toHaveBeenCalledTimes(2));
      expect(logSpy).toHaveBeenCalledWith("telegram setWebhook retry 1 scheduled in 0ms");

      const advertisedMessage = expect.stringContaining("webhook advertised to telegram on http://");
      expect(logSpy).toHaveBeenCalledWith(advertisedMessage);
    },
  );
});
it("fails startup when setWebhook has a non-recoverable rejection", async () => {
  // A rejection tagged with error_code 401 is expected to abort startup instead of retrying.
  const errorSpy = vi.fn();
  const unauthorized = Object.assign(new Error("unauthorized"), { error_code: 401 });
  setWebhookSpy.mockRejectedValueOnce(unauthorized);

  const startup = startTelegramWebhook({
    token: TELEGRAM_TOKEN,
    port: 0,
    secret: TELEGRAM_SECRET,
    path: TELEGRAM_WEBHOOK_PATH,
    runtime: { log: vi.fn(), error: errorSpy, exit: vi.fn() },
  });
  await expect(startup).rejects.toThrow("unauthorized");

  // The bot is stopped exactly once and the failure is reported through runtime.error.
  expect(stopSpy).toHaveBeenCalledTimes(1);
  const failureMessage = expect.stringContaining("telegram setWebhook failed: unauthorized");
  expect(errorSpy).toHaveBeenCalledWith(failureMessage);
});
it("registers webhook with certificate when webhookCertPath is provided", async () => {
setWebhookSpy.mockClear();
await withStartedWebhook(

View File

@@ -4,8 +4,13 @@ import net from "node:net";
import * as grammy from "grammy";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { isDiagnosticsEnabled } from "openclaw/plugin-sdk/diagnostic-runtime";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { defaultRuntime } from "openclaw/plugin-sdk/runtime-env";
import type { BackoffPolicy, RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import {
computeBackoff,
defaultRuntime,
formatDurationPrecise,
sleepWithAbort,
} from "openclaw/plugin-sdk/runtime-env";
import { safeEqualSecret } from "openclaw/plugin-sdk/security-runtime";
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
import {
@@ -25,9 +30,20 @@ import { readJsonBodyWithLimit } from "openclaw/plugin-sdk/webhook-request-guard
import { resolveTelegramAllowedUpdates } from "./allowed-updates.js";
import { withTelegramApiErrorLogging } from "./api-logging.js";
import { createTelegramBot } from "./bot.js";
import {
isRecoverableTelegramNetworkError,
isTelegramRateLimitError,
isTelegramServerError,
} from "./network-errors.js";
const TELEGRAM_WEBHOOK_MAX_BODY_BYTES = 1024 * 1024;
const TELEGRAM_WEBHOOK_BODY_TIMEOUT_MS = 30_000;
// Default backoff policy for re-attempting `setWebhook` after a retryable failure.
// Used when the caller does not pass `webhookRegistrationRetryPolicy`; the values are
// handed to `computeBackoff` together with the attempt number.
// NOTE(review): field semantics live in `computeBackoff` (not visible here) — presumably
// a 5s first delay that doubles per attempt, is capped at 60s, and gets ~20% jitter; confirm.
const TELEGRAM_WEBHOOK_REGISTRATION_RETRY_POLICY: BackoffPolicy = {
initialMs: 5_000,
maxMs: 60_000,
factor: 2,
jitter: 0.2,
};
const InputFileCtor: typeof grammy.InputFile =
typeof grammy.InputFile === "function"
? grammy.InputFile
@@ -245,6 +261,7 @@ export async function startTelegramWebhook(opts: {
healthPath?: string;
publicUrl?: string;
webhookCertPath?: string;
webhookRegistrationRetryPolicy?: BackoffPolicy;
}) {
const path = opts.path ?? "/telegram-webhook";
const healthPath = opts.healthPath ?? "/healthz";
@@ -258,6 +275,8 @@ export async function startTelegramWebhook(opts: {
);
}
const runtime = opts.runtime ?? defaultRuntime;
const webhookRegistrationRetryPolicy =
opts.webhookRegistrationRetryPolicy ?? TELEGRAM_WEBHOOK_REGISTRATION_RETRY_POLICY;
const diagnosticsEnabled = isDiagnosticsEnabled(opts.config);
const bot = createTelegramBot({
token: opts.token,
@@ -398,30 +417,8 @@ export async function startTelegramWebhook(opts: {
port,
});
try {
await withTelegramApiErrorLogging({
operation: "setWebhook",
runtime,
fn: () =>
bot.api.setWebhook(publicUrl, {
secret_token: secret,
allowed_updates: resolveTelegramAllowedUpdates(),
certificate: opts.webhookCertPath ? new InputFileCtor(opts.webhookCertPath) : undefined,
}),
});
} catch (err) {
server.close();
void bot.stop();
if (diagnosticsEnabled) {
stopDiagnosticHeartbeat();
}
throw err;
}
runtime.log?.(`webhook local listener on http://${host}:${boundPort}${path}`);
runtime.log?.(`webhook advertised to telegram on ${publicUrl}`);
let shutDown = false;
let webhookAdvertised = false;
const shutdown = () => {
if (shutDown) {
return;
@@ -440,9 +437,90 @@ export async function startTelegramWebhook(opts: {
stopDiagnosticHeartbeat();
}
};
if (opts.abortSignal) {
if (opts.abortSignal?.aborted) {
shutdown();
} else if (opts.abortSignal) {
opts.abortSignal.addEventListener("abort", shutdown, { once: true });
}
/**
 * Registers `publicUrl` with Telegram through `setWebhook`.
 *
 * Does nothing when the webhook is already shut down or the abort signal has fired.
 * If shutdown happens while the call is in flight, the success is neither recorded
 * nor logged. Any rejection from the wrapped call propagates to the caller.
 */
const advertiseWebhook = async (): Promise<void> => {
  if (shutDown || opts.abortSignal?.aborted) {
    return;
  }

  // Built lazily so the option values are resolved at call time, as before.
  const callSetWebhook = () =>
    bot.api.setWebhook(publicUrl, {
      secret_token: secret,
      allowed_updates: resolveTelegramAllowedUpdates(),
      certificate: opts.webhookCertPath ? new InputFileCtor(opts.webhookCertPath) : undefined,
    });

  await withTelegramApiErrorLogging({
    operation: "setWebhook",
    runtime,
    fn: callSetWebhook,
  });

  // Shutdown may have been requested while the request was pending.
  if (!shutDown) {
    webhookAdvertised = true;
    runtime.log?.(`webhook advertised to telegram on ${publicUrl}`);
  }
};
/**
 * Decides whether a failed `setWebhook` call should be retried: true for recoverable
 * network errors (checked in the "webhook" context), Telegram server errors, and
 * rate-limit errors; false for everything else.
 */
const shouldRetryWebhookRegistration = (err: unknown): boolean => {
  if (isRecoverableTelegramNetworkError(err, { context: "webhook" })) {
    return true;
  }
  if (isTelegramServerError(err)) {
    return true;
  }
  return isTelegramRateLimitError(err);
};
/**
 * Background retry loop for webhook registration.
 *
 * Starting at `firstAttempt`, waits for the backoff delay of the current attempt and then
 * calls `advertiseWebhook` again. The loop ends when registration succeeds, when shutdown
 * or abort is observed, when the sleep is interrupted, or when a non-retryable error occurs
 * (which is reported through `runtime.error`).
 */
const retryWebhookRegistration = async (firstAttempt: number): Promise<void> => {
  // True once there is nothing left to do: stopped, aborted, or already advertised.
  const isDone = (): boolean =>
    Boolean(shutDown || opts.abortSignal?.aborted || webhookAdvertised);

  for (let attempt = firstAttempt; !isDone(); attempt += 1) {
    const delayMs = computeBackoff(webhookRegistrationRetryPolicy, attempt);
    runtime.log?.(
      `telegram setWebhook retry ${attempt} scheduled in ${formatDurationPrecise(delayMs)}`,
    );

    try {
      await sleepWithAbort(delayMs, opts.abortSignal);
    } catch {
      // A rejected sleep ends the loop quietly.
      return;
    }

    // State may have changed while sleeping.
    if (isDone()) {
      return;
    }

    try {
      await advertiseWebhook();
      return;
    } catch (err) {
      const retryable = shouldRetryWebhookRegistration(err);
      if (!retryable) {
        runtime.error?.(
          `telegram setWebhook retry stopped after non-recoverable error: ${formatErrorMessage(err)}`,
        );
        return;
      }
      // Retryable failure: fall through to the next attempt.
    }
  }
};
/**
 * Tears down what startup created after a fatal registration failure: marks the webhook
 * as shut down, closes the HTTP server, stops the bot (not awaited), and stops the
 * diagnostic heartbeat when diagnostics are enabled.
 */
const closeAfterStartupFailure = (): void => {
  shutDown = true;
  server.close();
  void bot.stop();
  if (!diagnosticsEnabled) {
    return;
  }
  stopDiagnosticHeartbeat();
};
runtime.log?.(`webhook local listener on http://${host}:${boundPort}${path}`);
// Try to register the webhook once during startup, unless shutdown was already requested.
if (!shutDown) {
  try {
    await advertiseWebhook();
  } catch (err) {
    // Non-retryable failures abort startup: tear everything down and surface the error.
    if (!shouldRetryWebhookRegistration(err)) {
      closeAfterStartupFailure();
      throw err;
    }
    // Retryable failures keep the local listener alive and retry in the background.
    // The retry loop is detached, so attach a rejection handler: an unexpected throw
    // inside it must be reported instead of surfacing as an unhandled promise rejection.
    void retryWebhookRegistration(1).catch((retryErr: unknown) => {
      runtime.error?.(
        `telegram setWebhook retry loop failed unexpectedly: ${formatErrorMessage(retryErr)}`,
      );
    });
  }
}
// Callers get the server, the bot, and a `stop` handle bound to the shutdown routine.
return { server, bot, stop: shutdown };
}