fix(telegram): keep unreachable polling sockets non-fatal

* Runtime: suppress transient network uncaught exceptions

* fix(telegram): keep unreachable polling sockets non-fatal

---------

Co-authored-by: Peter Steinberger <steipete@gmail.com>
This commit is contained in:
Hemant Sudarshan
2026-04-30 00:23:43 +05:30
committed by GitHub
parent dabf76b3de
commit db6951088a
6 changed files with 156 additions and 35 deletions

View File

@@ -44,24 +44,40 @@ const { initSpy, runSpy, getRuntimeConfigMock } = vi.hoisted(() => ({
})),
}));
const { registerUnhandledRejectionHandlerMock, emitUnhandledRejection, resetUnhandledRejection } =
vi.hoisted(() => {
let handler: ((reason: unknown) => boolean) | undefined;
return {
registerUnhandledRejectionHandlerMock: vi.fn((next: (reason: unknown) => boolean) => {
handler = next;
return () => {
if (handler === next) {
handler = undefined;
}
};
}),
emitUnhandledRejection: (reason: unknown) => handler?.(reason) ?? false,
resetUnhandledRejection: () => {
handler = undefined;
},
};
});
const {
registerUnhandledRejectionHandlerMock,
registerUncaughtExceptionHandlerMock,
emitUnhandledRejection,
emitUncaughtException,
resetProcessErrorHandlers,
} = vi.hoisted(() => {
let unhandledRejectionHandler: ((reason: unknown) => boolean) | undefined;
let uncaughtExceptionHandler: ((error: unknown) => boolean) | undefined;
return {
registerUnhandledRejectionHandlerMock: vi.fn((next: (reason: unknown) => boolean) => {
unhandledRejectionHandler = next;
return () => {
if (unhandledRejectionHandler === next) {
unhandledRejectionHandler = undefined;
}
};
}),
registerUncaughtExceptionHandlerMock: vi.fn((next: (error: unknown) => boolean) => {
uncaughtExceptionHandler = next;
return () => {
if (uncaughtExceptionHandler === next) {
uncaughtExceptionHandler = undefined;
}
};
}),
emitUnhandledRejection: (reason: unknown) => unhandledRejectionHandler?.(reason) ?? false,
emitUncaughtException: (error: unknown) => uncaughtExceptionHandler?.(error) ?? false,
resetProcessErrorHandlers: () => {
unhandledRejectionHandler = undefined;
uncaughtExceptionHandler = undefined;
},
};
});
const { createTelegramBotErrors } = vi.hoisted(() => ({
createTelegramBotErrors: [] as unknown[],
@@ -113,6 +129,16 @@ function makeRecoverableFetchError() {
});
}
class MockHttpError extends Error {
constructor(
message: string,
public readonly error: unknown,
) {
super(message);
this.name = "HttpError";
}
}
async function makeTaggedPollingFetchError() {
const { tagTelegramNetworkError } = await import("./network-errors.js");
const err = makeRecoverableFetchError();
@@ -123,6 +149,13 @@ async function makeTaggedPollingFetchError() {
return err;
}
async function makeTaggedPollingHttpError() {
return new MockHttpError(
"Network request for 'getUpdates' failed!",
await makeTaggedPollingFetchError(),
);
}
const createAbortTask = (
abort: AbortController,
beforeAbort?: () => void,
@@ -316,6 +349,7 @@ vi.mock("openclaw/plugin-sdk/runtime-env", async () => {
computeBackoff,
sleepWithAbort,
registerUnhandledRejectionHandler: registerUnhandledRejectionHandlerMock,
registerUncaughtExceptionHandler: registerUncaughtExceptionHandlerMock,
};
});
@@ -364,7 +398,8 @@ describe("monitorTelegramProvider (grammY)", () => {
close: vi.fn(async () => undefined),
}));
registerUnhandledRejectionHandlerMock.mockClear();
resetUnhandledRejection();
registerUncaughtExceptionHandlerMock.mockClear();
resetProcessErrorHandlers();
createTelegramBotErrors.length = 0;
createdBotStops.length = 0;
consoleErrorSpy = vi.spyOn(console, "error").mockImplementation(() => {});
@@ -659,6 +694,38 @@ describe("monitorTelegramProvider (grammY)", () => {
expectRecoverableRetryState(2);
});
it("force-restarts polling when uncaught network exception stalls runner", async () => {
const abort = new AbortController();
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceWithStalledPollingRunner();
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await firstCycle.waitForRunStart();
expect(emitUncaughtException(await makeTaggedPollingFetchError())).toBe(true);
expect(firstCycle.stop).toHaveBeenCalledTimes(1);
await secondCycle.waitForRunStart();
abort.abort();
await monitor;
expectRecoverableRetryState(2);
});
it("force-restarts polling when uncaught polling HttpError stalls runner", async () => {
const abort = new AbortController();
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceWithStalledPollingRunner();
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await firstCycle.waitForRunStart();
expect(emitUncaughtException(await makeTaggedPollingHttpError())).toBe(true);
expect(firstCycle.stop).toHaveBeenCalledTimes(1);
await secondCycle.waitForRunStart();
abort.abort();
await monitor;
expectRecoverableRetryState(2);
});
it("rebuilds the resolved transport after a stalled polling restart", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
try {

View File

@@ -4,8 +4,11 @@ import { registerChannelRuntimeContext } from "openclaw/plugin-sdk/channel-runti
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-types";
import { resolveAgentMaxConcurrent } from "openclaw/plugin-sdk/model-session-runtime";
import { getRuntimeConfig } from "openclaw/plugin-sdk/runtime-config-snapshot";
import { waitForAbortSignal } from "openclaw/plugin-sdk/runtime-env";
import { registerUnhandledRejectionHandler } from "openclaw/plugin-sdk/runtime-env";
import {
registerUncaughtExceptionHandler,
registerUnhandledRejectionHandler,
waitForAbortSignal,
} from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { formatErrorMessage } from "openclaw/plugin-sdk/ssrf-runtime";
import { resolveTelegramAccount } from "./accounts.js";
@@ -89,13 +92,9 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const log = opts.runtime?.error ?? console.error;
let pollingSession: TelegramPollingSessionInstance | undefined;
const unregisterHandler = registerUnhandledRejectionHandler((err) => {
const handlePollingNetworkFailure = (err: unknown, label: string) => {
const isNetworkError = isRecoverableTelegramNetworkError(err, { context: "polling" });
const isTelegramPollingError = isTelegramPollingNetworkError(err);
if (isGrammyHttpError(err) && isNetworkError && isTelegramPollingError) {
log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
return true;
}
const activeRunner = pollingSession?.activeRunner;
if (isNetworkError && isTelegramPollingError && activeRunner && activeRunner.isRunning()) {
@@ -104,14 +103,24 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
pollingSession?.abortActiveFetch();
void activeRunner.stop().catch(() => {});
log("[telegram][diag] marking transport dirty after polling network failure");
log(
`[telegram] Restarting polling after unhandled network error: ${formatErrorMessage(err)}`,
);
log(`[telegram] Restarting polling after ${label}: ${formatErrorMessage(err)}`);
return true;
}
if (isGrammyHttpError(err) && isNetworkError && isTelegramPollingError) {
log(`[telegram] Suppressed network error: ${formatErrorMessage(err)}`);
return true;
}
return false;
});
};
const unregisterUnhandledRejectionHandler = registerUnhandledRejectionHandler((err) =>
handlePollingNetworkFailure(err, "unhandled network error"),
);
const unregisterUncaughtExceptionHandler = registerUncaughtExceptionHandler((err) =>
handlePollingNetworkFailure(err, "uncaught network error"),
);
try {
const cfg = opts.config ?? getRuntimeConfig();
@@ -254,6 +263,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
pollingLease.release();
}
} finally {
unregisterHandler();
unregisterUnhandledRejectionHandler();
unregisterUncaughtExceptionHandler();
}
}