mirror of
https://github.com/openclaw/openclaw.git
synced 2026-03-12 07:20:45 +00:00
fix(telegram): harden polling retry setup and teardown order
Co-authored-by: Cklee <99405438+liebertar@users.noreply.github.com> Co-authored-by: Ho Lim <166576253+HOYALIM@users.noreply.github.com>
This commit is contained in:
@@ -27,6 +27,7 @@ Docs: https://docs.openclaw.ai
|
||||
### Fixes
|
||||
|
||||
- Telegram/Webhook: keep webhook monitors alive until gateway abort signals fire, preventing false channel exits and immediate webhook auto-restart loops.
|
||||
- Telegram/Polling: retry recoverable setup-time network failures in monitor startup and await runner teardown before retry to avoid overlapping polling sessions.
|
||||
- Signal/RPC: guard malformed Signal RPC JSON responses with a clear status-scoped error and add regression coverage for invalid JSON responses. (#22995) Thanks @adhitShet.
|
||||
- Gateway/Subagents: guard gateway and subagent session-key/message trim paths against undefined inputs to prevent early `Cannot read properties of undefined (reading 'trim')` crashes during subagent spawn and wait flows.
|
||||
- Agents/Workspace: guard `resolveUserPath` against undefined/null input to prevent `Cannot read properties of undefined (reading 'trim')` crashes when workspace paths are missing in embedded runner flows.
|
||||
|
||||
@@ -55,6 +55,10 @@ const { registerUnhandledRejectionHandlerMock, emitUnhandledRejection, resetUnha
|
||||
};
|
||||
});
|
||||
|
||||
const { createTelegramBotErrors } = vi.hoisted(() => ({
|
||||
createTelegramBotErrors: [] as unknown[],
|
||||
}));
|
||||
|
||||
const { computeBackoff, sleepWithAbort } = vi.hoisted(() => ({
|
||||
computeBackoff: vi.fn(() => 0),
|
||||
sleepWithAbort: vi.fn(async () => undefined),
|
||||
@@ -73,6 +77,10 @@ vi.mock("../config/config.js", async (importOriginal) => {
|
||||
|
||||
vi.mock("./bot.js", () => ({
|
||||
createTelegramBot: () => {
|
||||
const nextError = createTelegramBotErrors.shift();
|
||||
if (nextError) {
|
||||
throw nextError;
|
||||
}
|
||||
handlers.message = async (ctx: MockCtx) => {
|
||||
const chatId = ctx.message.chat.id;
|
||||
const isGroup = ctx.message.chat.type !== "private";
|
||||
@@ -134,6 +142,7 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
startTelegramWebhookSpy.mockClear();
|
||||
registerUnhandledRejectionHandlerMock.mockClear();
|
||||
resetUnhandledRejection();
|
||||
createTelegramBotErrors.length = 0;
|
||||
});
|
||||
|
||||
it("processes a DM and sends reply", async () => {
|
||||
@@ -218,6 +227,62 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("retries setup-time recoverable errors before starting polling", async () => {
|
||||
const setupError = Object.assign(new TypeError("fetch failed"), {
|
||||
cause: Object.assign(new Error("connect timeout"), {
|
||||
code: "UND_ERR_CONNECT_TIMEOUT",
|
||||
}),
|
||||
});
|
||||
createTelegramBotErrors.push(setupError);
|
||||
|
||||
runSpy.mockImplementationOnce(() => ({
|
||||
task: () => Promise.resolve(),
|
||||
stop: vi.fn(),
|
||||
isRunning: () => false,
|
||||
}));
|
||||
|
||||
await monitorTelegramProvider({ token: "tok" });
|
||||
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("awaits runner.stop before retrying after recoverable polling error", async () => {
|
||||
const recoverableError = Object.assign(new TypeError("fetch failed"), {
|
||||
cause: Object.assign(new Error("connect timeout"), {
|
||||
code: "UND_ERR_CONNECT_TIMEOUT",
|
||||
}),
|
||||
});
|
||||
let firstStopped = false;
|
||||
const firstStop = vi.fn(async () => {
|
||||
await Promise.resolve();
|
||||
firstStopped = true;
|
||||
});
|
||||
|
||||
runSpy
|
||||
.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(recoverableError),
|
||||
stop: firstStop,
|
||||
isRunning: () => false,
|
||||
}))
|
||||
.mockImplementationOnce(() => {
|
||||
expect(firstStopped).toBe(true);
|
||||
return {
|
||||
task: () => Promise.resolve(),
|
||||
stop: vi.fn(),
|
||||
isRunning: () => false,
|
||||
};
|
||||
});
|
||||
|
||||
await monitorTelegramProvider({ token: "tok" });
|
||||
|
||||
expect(firstStop).toHaveBeenCalled();
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("surfaces non-recoverable errors", async () => {
|
||||
runSpy.mockImplementationOnce(() => ({
|
||||
task: () => Promise.reject(new Error("bad token")),
|
||||
@@ -256,7 +321,7 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
expect(emitUnhandledRejection(new TypeError("fetch failed"))).toBe(true);
|
||||
await monitor;
|
||||
|
||||
expect(stop).toHaveBeenCalledTimes(1);
|
||||
expect(stop.mock.calls.length).toBeGreaterThanOrEqual(1);
|
||||
expect(computeBackoff).toHaveBeenCalled();
|
||||
expect(sleepWithAbort).toHaveBeenCalled();
|
||||
expect(runSpy).toHaveBeenCalledTimes(2);
|
||||
|
||||
@@ -152,18 +152,6 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
}
|
||||
};
|
||||
|
||||
const bot = createTelegramBot({
|
||||
token,
|
||||
runtime: opts.runtime,
|
||||
proxyFetch,
|
||||
config: cfg,
|
||||
accountId: account.accountId,
|
||||
updateOffset: {
|
||||
lastUpdateId,
|
||||
onUpdateId: persistUpdateId,
|
||||
},
|
||||
});
|
||||
|
||||
if (opts.useWebhook) {
|
||||
await startTelegramWebhook({
|
||||
token,
|
||||
@@ -192,9 +180,46 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
|
||||
// Use grammyjs/runner for concurrent update processing
|
||||
let restartAttempts = 0;
|
||||
const runnerOptions = createTelegramRunnerOptions(cfg);
|
||||
|
||||
while (!opts.abortSignal?.aborted) {
|
||||
const runner = run(bot, createTelegramRunnerOptions(cfg));
|
||||
let bot;
|
||||
try {
|
||||
bot = createTelegramBot({
|
||||
token,
|
||||
runtime: opts.runtime,
|
||||
proxyFetch,
|
||||
config: cfg,
|
||||
accountId: account.accountId,
|
||||
updateOffset: {
|
||||
lastUpdateId,
|
||||
onUpdateId: persistUpdateId,
|
||||
},
|
||||
});
|
||||
} catch (err) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
if (!isRecoverableTelegramNetworkError(err, { context: "unknown" })) {
|
||||
throw err;
|
||||
}
|
||||
restartAttempts += 1;
|
||||
const delayMs = computeBackoff(TELEGRAM_POLL_RESTART_POLICY, restartAttempts);
|
||||
(opts.runtime?.error ?? console.error)(
|
||||
`Telegram setup network error: ${formatErrorMessage(err)}; retrying in ${formatDurationPrecise(delayMs)}.`,
|
||||
);
|
||||
try {
|
||||
await sleepWithAbort(delayMs, opts.abortSignal);
|
||||
} catch (sleepErr) {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
throw sleepErr;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
const runner = run(bot, runnerOptions);
|
||||
activeRunner = runner;
|
||||
const stopOnAbort = () => {
|
||||
if (opts.abortSignal?.aborted) {
|
||||
@@ -243,6 +268,11 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
}
|
||||
} finally {
|
||||
opts.abortSignal?.removeEventListener("abort", stopOnAbort);
|
||||
try {
|
||||
await runner.stop();
|
||||
} catch {
|
||||
// Runner may already be stopped by abort/retry paths.
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
Reference in New Issue
Block a user