diff --git a/CHANGELOG.md b/CHANGELOG.md index 54d12ecc389..d5bbab52274 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Telegram/polling: raise the default polling watchdog threshold from 90s to 120s and add configurable `channels.telegram.pollingStallThresholdMs` (also per-account) so long-running Telegram work gets more room before polling is treated as stalled. (#57737) Thanks @Vitalcheffe. - Telegram/polling: bound the persisted-offset confirmation `getUpdates` probe with a client-side timeout so a zombie socket cannot hang polling recovery before the runner watchdog starts. (#50368) Thanks @boticlaw. - Agents/Pi runner: retry silent `stopReason=error` turns with no output when no side effects ran, so non-frontier providers that briefly return empty error turns get another chance instead of ending the session early. (#68310) Thanks @Chased1k. - Plugins/memory: preserve the active memory capability when read-only snapshot plugin loads run, so status and provider discovery paths no longer wipe memory public artifacts. (#69219) Thanks @zeroaltitude. diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index 20522e517c2..a39438083bb 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -259,6 +259,7 @@ curl "https://api.telegram.org/bot/getUpdates" - Group sessions are isolated by group ID. Forum topics append `:topic:` to keep topics isolated. - DM messages can carry `message_thread_id`; OpenClaw routes them with thread-aware session keys and preserves thread ID for replies. - Long polling uses grammY runner with per-chat/per-thread sequencing. Overall runner sink concurrency uses `agents.defaults.maxConcurrent`. +- Long-polling watchdog restarts trigger after 120 seconds without completed `getUpdates` liveness by default. Increase `channels.telegram.pollingStallThresholdMs` only if your deployment still sees false polling-stall restarts during long-running work. The value is in milliseconds and is allowed from `30000` to `600000`; per-account overrides are supported. - Telegram Bot API has no read-receipt support (`sendReadReceipts` does not apply). ## Feature reference diff --git a/extensions/telegram/src/config-schema.test.ts b/extensions/telegram/src/config-schema.test.ts index 05a3757167b..37474455704 100644 --- a/extensions/telegram/src/config-schema.test.ts +++ b/extensions/telegram/src/config-schema.test.ts @@ -53,6 +53,24 @@ describe("telegram custom commands schema", () => { } }); + it("accepts pollingStallThresholdMs overrides per account", () => { + const res = TelegramConfigSchema.safeParse({ + pollingStallThresholdMs: 120_000, + accounts: { ops: { pollingStallThresholdMs: 180_000 } }, + }); + + expect(res.success).toBe(true); + if (res.success) { + expect(res.data.pollingStallThresholdMs).toBe(120_000); + expect(res.data.accounts?.ops?.pollingStallThresholdMs).toBe(180_000); + } + }); + + it("rejects pollingStallThresholdMs outside the watchdog bounds", () => { + expectTelegramConfigIssue({ pollingStallThresholdMs: 29_999 }, "pollingStallThresholdMs"); + expectTelegramConfigIssue({ pollingStallThresholdMs: 600_001 }, "pollingStallThresholdMs"); + }); + it("accepts textChunkLimit", () => { const res = TelegramConfigSchema.safeParse({ enabled: true, diff --git a/extensions/telegram/src/config-ui-hints.ts b/extensions/telegram/src/config-ui-hints.ts index 1e5cacf1cdb..9439b487811 100644 --- a/extensions/telegram/src/config-ui-hints.ts +++ b/extensions/telegram/src/config-ui-hints.ts @@ -89,6 +89,10 @@ export const telegramChannelConfigUiHints = { label: "Telegram API Timeout (seconds)", help: "Max seconds before Telegram API requests are aborted (default: 500 per grammY).", }, + pollingStallThresholdMs: { + label: "Telegram Polling Stall Threshold (ms)", + help: "Milliseconds without completed Telegram getUpdates liveness before the polling watchdog restarts the polling runner. Default: 120000.", + }, silentErrorReplies: { label: "Telegram Silent Error Replies", help: "When true, Telegram bot replies marked as errors are sent silently (no notification sound). Default: false.", diff --git a/extensions/telegram/src/monitor.test.ts b/extensions/telegram/src/monitor.test.ts index 55f202fb26e..7bc7ffa654b 100644 --- a/extensions/telegram/src/monitor.test.ts +++ b/extensions/telegram/src/monitor.test.ts @@ -585,7 +585,7 @@ describe("monitorTelegramProvider (grammY)", () => { const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); await firstCycle.waitForRunStart(); - vi.advanceTimersByTime(120_000); + vi.advanceTimersByTime(150_000); await secondCycle.waitForRunStart(); await monitor; @@ -728,8 +728,8 @@ describe("monitorTelegramProvider (grammY)", () => { const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal }); await firstCycle.waitForRunStart(); - // Advance time past the stall threshold (90s) + watchdog interval (30s) - vi.advanceTimersByTime(120_000); + // Advance time past the stall threshold (120s) + watchdog interval (30s) + vi.advanceTimersByTime(150_000); await secondCycle.waitForRunStart(); await monitor; @@ -738,6 +738,31 @@ describe("monitorTelegramProvider (grammY)", () => { vi.useRealTimers(); }); + it("uses configured Telegram polling stall threshold", async () => { + vi.useFakeTimers({ shouldAdvanceTime: true }); + const abort = new AbortController(); + const firstCycle = mockRunOnceWithStalledPollingRunner(); + const secondCycle = mockRunOnceAndAbort(abort); + + const monitor = monitorTelegramProvider({ + token: "tok", + abortSignal: abort.signal, + config: { + agents: { defaults: { maxConcurrent: 2 } }, + channels: { telegram: { pollingStallThresholdMs: 30_000 } }, + }, + }); + await firstCycle.waitForRunStart(); + + vi.advanceTimersByTime(60_000); + await secondCycle.waitForRunStart(); + await monitor; + + expect(firstCycle.stop.mock.calls.length).toBeGreaterThanOrEqual(1); + expectRecoverableRetryState(2); + vi.useRealTimers(); + }); + it("confirms persisted offset with Telegram before starting runner", async () => { const { order } = await runMonitorAndCaptureStartupOrder({ persistedOffset: 549076203, diff --git a/extensions/telegram/src/monitor.ts b/extensions/telegram/src/monitor.ts index 40a4cd06586..1691607d60b 100644 --- a/extensions/telegram/src/monitor.ts +++ b/extensions/telegram/src/monitor.ts @@ -227,6 +227,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) { log, telegramTransport, createTelegramTransport: createTelegramTransportForPolling, + stallThresholdMs: account.config.pollingStallThresholdMs, setStatus: opts.setStatus, }); await pollingSession.runUntilAbort(); diff --git a/extensions/telegram/src/polling-session.test.ts b/extensions/telegram/src/polling-session.test.ts index eabcd6e52c0..4822e20d24a 100644 --- a/extensions/telegram/src/polling-session.test.ts +++ b/extensions/telegram/src/polling-session.test.ts @@ -51,7 +51,7 @@ function makeBot() { function installPollingStallWatchdogHarness( dateNowSequence: readonly number[] = [0, 0], - fallbackDateNow = 120_001, + fallbackDateNow = 150_001, ) { let watchdog: (() => void) | undefined; const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => { @@ -141,6 +141,7 @@ function createPollingSession(params: { log?: (message: string) => void; telegramTransport?: ReturnType; createTelegramTransport?: () => ReturnType; + stallThresholdMs?: number; setStatus?: (patch: Omit) => void; }) { return new TelegramPollingSession({ @@ -155,6 +156,7 @@ function createPollingSession(params: { persistUpdateId: async () => undefined, log: params.log ?? (() => undefined), telegramTransport: params.telegramTransport, + stallThresholdMs: params.stallThresholdMs, setStatus: params.setStatus, ...(params.createTelegramTransport ? { createTelegramTransport: params.createTelegramTransport } @@ -393,6 +395,38 @@ describe("TelegramPollingSession", () => { } }); + it("honors a custom polling stall threshold", async () => { + const abort = new AbortController(); + const botStop = vi.fn(async () => undefined); + const runnerStop = vi.fn(async () => undefined); + mockBotCapturingApiMiddleware(botStop); + const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); + const watchdogHarness = installPollingStallWatchdogHarness([0, 0], 150_001); + + const log = vi.fn(); + const session = createPollingSession({ + abortSignal: abort.signal, + log, + stallThresholdMs: 180_000, + }); + + try { + const runPromise = session.runUntilAbort(); + const watchdog = await watchdogHarness.waitForWatchdog(); + watchdog?.(); + + expect(runnerStop).not.toHaveBeenCalled(); + expect(botStop).not.toHaveBeenCalled(); + expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected")); + + abort.abort(); + resolveFirstTask(); + await runPromise; + } finally { + watchdogHarness.restore(); + } + }); + it("rebuilds the transport after a stalled polling cycle", async () => { vi.useFakeTimers({ shouldAdvanceTime: true }); const abort = new AbortController(); @@ -662,8 +696,8 @@ describe("TelegramPollingSession", () => { const resolveFirstTask = mockLongRunningPollingCycle(runnerStop); // t=0: lastGetUpdatesAt and lastApiActivityAt initialized - // t=120_001: watchdog fires (getUpdates stale for 120s) - // But right before watchdog, a sendMessage succeeded at t=120_000 + // t=150_001: watchdog fires (getUpdates stale for 150s) + // But right before watchdog, a sendMessage succeeds at t=150_001 // All subsequent Date.now calls return the same value, giving apiIdle = 0. const watchdogHarness = installPollingStallWatchdogHarness(); @@ -789,7 +823,7 @@ describe("TelegramPollingSession", () => { ); const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" }); - // The in-flight send started at t=1 and is still stuck at t=120_001. + // The in-flight send started at t=1 and is still stuck at t=150_001. // That is older than the watchdog threshold, so restart should proceed. watchdog?.(); diff --git a/extensions/telegram/src/polling-session.ts b/extensions/telegram/src/polling-session.ts index 894d11cf8c9..7f1cd401c6d 100644 --- a/extensions/telegram/src/polling-session.ts +++ b/extensions/telegram/src/polling-session.ts @@ -22,7 +22,9 @@ const TELEGRAM_POLL_RESTART_POLICY = { jitter: 0.25, }; -const POLL_STALL_THRESHOLD_MS = 90_000; +const DEFAULT_POLL_STALL_THRESHOLD_MS = 120_000; +const MIN_POLL_STALL_THRESHOLD_MS = 30_000; +const MAX_POLL_STALL_THRESHOLD_MS = 600_000; const POLL_WATCHDOG_INTERVAL_MS = 30_000; const POLL_STOP_GRACE_MS = 15_000; const CONFIRM_PERSISTED_OFFSET_TIMEOUT_MS = 10_000; @@ -50,6 +52,16 @@ const waitForGracefulStop = async (stop: () => Promise) => { const telegramApiTimeoutSignal = (timeoutMs: number): TelegramApiAbortSignal => AbortSignal.timeout(timeoutMs) as unknown as TelegramApiAbortSignal; +const resolvePollingStallThresholdMs = (value: number | undefined): number => { + if (typeof value !== "number" || !Number.isFinite(value)) { + return DEFAULT_POLL_STALL_THRESHOLD_MS; + } + return Math.min( + MAX_POLL_STALL_THRESHOLD_MS, + Math.max(MIN_POLL_STALL_THRESHOLD_MS, Math.floor(value)), + ); +}; + type TelegramPollingSessionOpts = { token: string; config: Parameters[0]["config"]; @@ -65,6 +77,8 @@ type TelegramPollingSessionOpts = { telegramTransport?: TelegramTransport; /** Rebuild Telegram transport after stall/network recovery when marked dirty. */ createTelegramTransport?: () => TelegramTransport; + /** Stall detection threshold in ms. Defaults to 120_000 (2 min). */ + stallThresholdMs?: number; setStatus?: (patch: Omit) => void; }; @@ -76,6 +90,7 @@ export class TelegramPollingSession { #activeFetchAbort: AbortController | undefined; #transportState: TelegramPollingTransportState; #status: ReturnType; + #stallThresholdMs: number; constructor(private readonly opts: TelegramPollingSessionOpts) { this.#transportState = new TelegramPollingTransportState({ @@ -84,6 +99,7 @@ export class TelegramPollingSession { createTelegramTransport: opts.createTelegramTransport, }); this.#status = createTelegramPollingStatusPublisher(opts.setStatus); + this.#stallThresholdMs = resolvePollingStallThresholdMs(opts.stallThresholdMs); } get activeRunner() { @@ -300,7 +316,7 @@ export class TelegramPollingSession { } const stall = liveness.detectStall({ - thresholdMs: POLL_STALL_THRESHOLD_MS, + thresholdMs: this.#stallThresholdMs, runnerIsRunning: runner.isRunning(), }); if (stall) { diff --git a/src/config/types.telegram.ts b/src/config/types.telegram.ts index 50d073d6f3f..8b7c7b01f4a 100644 --- a/src/config/types.telegram.ts +++ b/src/config/types.telegram.ts @@ -152,6 +152,8 @@ export type TelegramAccountConfig = { mediaMaxMb?: number; /** Telegram API client timeout in seconds (grammY ApiClientOptions). */ timeoutSeconds?: number; + /** Telegram polling watchdog threshold in milliseconds. Default: 120000. */ + pollingStallThresholdMs?: number; /** Retry policy for outbound Telegram API calls. */ retry?: OutboundRetryConfig; /** Network transport overrides for Telegram. */ diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 8dfa2e59054..8add8ece8f8 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -240,6 +240,7 @@ export const TelegramAccountSchemaBase = z streaming: ChannelPreviewStreamingConfigSchema.optional(), mediaMaxMb: z.number().positive().optional(), timeoutSeconds: z.number().int().positive().optional(), + pollingStallThresholdMs: z.number().int().min(30_000).max(600_000).optional(), retry: RetryConfigSchema, network: z .object({