mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 00:50:22 +00:00
perf(core): trim provider and inbound startup imports (#51927)
* fix(telegram): fail fast on stuck getUpdates * perf(core): trim provider and inbound startup imports
This commit is contained in:
@@ -70,6 +70,26 @@ describe("createTelegramBot fetch abort", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("aborts wrapped getUpdates fetch after the hard polling timeout", async () => {
|
||||
vi.useFakeTimers();
|
||||
const fetchSpy = vi.fn(
|
||||
(_input: RequestInfo | URL, init?: RequestInit) =>
|
||||
new Promise<AbortSignal>((resolve) => {
|
||||
const signal = init?.signal as AbortSignal;
|
||||
signal.addEventListener("abort", () => resolve(signal), { once: true });
|
||||
}),
|
||||
);
|
||||
const { clientFetch } = createWrappedTelegramClientFetch(fetchSpy as unknown as typeof fetch);
|
||||
|
||||
const observedSignalPromise = clientFetch("https://api.telegram.org/bot123456:ABC/getUpdates");
|
||||
await vi.advanceTimersByTimeAsync(45_000);
|
||||
const observedSignal = (await observedSignalPromise) as AbortSignal;
|
||||
|
||||
expect(observedSignal).toBeInstanceOf(AbortSignal);
|
||||
expect(observedSignal.aborted).toBe(true);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("preserves the original fetch error when tagging cannot attach metadata", async () => {
|
||||
const frozenError = Object.freeze(
|
||||
Object.assign(new TypeError("fetch failed"), {
|
||||
|
||||
@@ -81,6 +81,8 @@ const DEFAULT_TELEGRAM_BOT_RUNTIME: TelegramBotRuntime = {
|
||||
apiThrottler,
|
||||
};
|
||||
|
||||
const TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS = 45_000;
|
||||
|
||||
let telegramBotRuntimeForTest: TelegramBotRuntime | undefined;
|
||||
|
||||
export function setTelegramBotRuntimeForTest(runtime?: TelegramBotRuntime): void {
|
||||
@@ -114,7 +116,8 @@ function extractTelegramApiMethod(input: TelegramFetchInput): string | null {
|
||||
try {
|
||||
const pathname = new URL(url).pathname;
|
||||
const segments = pathname.split("/").filter(Boolean);
|
||||
return segments.length > 0 ? (segments.at(-1) ?? null) : null;
|
||||
const method = segments.length > 0 ? (segments.at(-1) ?? null) : null;
|
||||
return method?.toLowerCase() ?? null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
@@ -164,15 +167,12 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
ApiClientOptions["fetch"]
|
||||
>;
|
||||
|
||||
// When a shutdown abort signal is provided, wrap fetch so every Telegram API request
|
||||
// (especially long-polling getUpdates) aborts immediately on shutdown. Without this,
|
||||
// the in-flight getUpdates hangs for up to 30s, and a new gateway instance starting
|
||||
// its own poll triggers a 409 Conflict from Telegram.
|
||||
// Wrap fetch so polling requests cannot hang indefinitely on a wedged network path,
|
||||
// and so shutdown still aborts in-flight Telegram API requests immediately.
|
||||
let finalFetch = shouldProvideFetch ? fetchForClient : undefined;
|
||||
if (opts.fetchAbortSignal) {
|
||||
if (finalFetch || opts.fetchAbortSignal) {
|
||||
const baseFetch =
|
||||
finalFetch ?? (globalThis.fetch as unknown as NonNullable<ApiClientOptions["fetch"]>);
|
||||
const shutdownSignal = opts.fetchAbortSignal;
|
||||
// Cast baseFetch to global fetch to avoid node-fetch ↔ global-fetch type divergence;
|
||||
// they are runtime-compatible (the codebase already casts at every fetch boundary).
|
||||
const callFetch = baseFetch as unknown as typeof globalThis.fetch;
|
||||
@@ -182,11 +182,16 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
finalFetch = ((input: TelegramFetchInput, init?: TelegramFetchInit) => {
|
||||
const controller = new AbortController();
|
||||
const abortWith = (signal: AbortSignal) => controller.abort(signal.reason);
|
||||
const onShutdown = () => abortWith(shutdownSignal);
|
||||
const shutdownSignal = opts.fetchAbortSignal;
|
||||
const onShutdown = () => abortWith(shutdownSignal as AbortSignal);
|
||||
const method = extractTelegramApiMethod(input);
|
||||
const requestTimeoutMs =
|
||||
method === "getupdates" ? TELEGRAM_GET_UPDATES_REQUEST_TIMEOUT_MS : undefined;
|
||||
let requestTimeout: ReturnType<typeof setTimeout> | undefined;
|
||||
let onRequestAbort: (() => void) | undefined;
|
||||
if (shutdownSignal.aborted) {
|
||||
if (shutdownSignal?.aborted) {
|
||||
abortWith(shutdownSignal);
|
||||
} else {
|
||||
} else if (shutdownSignal) {
|
||||
shutdownSignal.addEventListener("abort", onShutdown, { once: true });
|
||||
}
|
||||
if (init?.signal) {
|
||||
@@ -197,11 +202,20 @@ export function createTelegramBot(opts: TelegramBotOptions) {
|
||||
init.signal.addEventListener("abort", onRequestAbort);
|
||||
}
|
||||
}
|
||||
if (requestTimeoutMs) {
|
||||
requestTimeout = setTimeout(() => {
|
||||
controller.abort(new Error(`Telegram ${method} timed out after ${requestTimeoutMs}ms`));
|
||||
}, requestTimeoutMs);
|
||||
requestTimeout.unref?.();
|
||||
}
|
||||
return callFetch(input as GlobalFetchInput, {
|
||||
...(init as GlobalFetchInit),
|
||||
signal: controller.signal,
|
||||
}).finally(() => {
|
||||
shutdownSignal.removeEventListener("abort", onShutdown);
|
||||
if (requestTimeout) {
|
||||
clearTimeout(requestTimeout);
|
||||
}
|
||||
shutdownSignal?.removeEventListener("abort", onShutdown);
|
||||
if (init?.signal && onRequestAbort) {
|
||||
init.signal.removeEventListener("abort", onRequestAbort);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user