mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-06 13:10:43 +00:00
fix(telegram): tune polling stall threshold
Raise the Telegram polling watchdog default from 90s to 120s and add bounded channels.telegram.pollingStallThresholdMs overrides (30000–600000 ms), including per-account config.

Thanks @Vitalcheffe.
This commit is contained in:
committed by
GitHub
parent
660e4257a7
commit
8c05043eca
@@ -13,6 +13,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Telegram/polling: raise the default polling watchdog threshold from 90s to 120s and add configurable `channels.telegram.pollingStallThresholdMs` (also per-account) so long-running Telegram work gets more room before polling is treated as stalled. (#57737) Thanks @Vitalcheffe.
|
||||
- Telegram/polling: bound the persisted-offset confirmation `getUpdates` probe with a client-side timeout so a zombie socket cannot hang polling recovery before the runner watchdog starts. (#50368) Thanks @boticlaw.
|
||||
- Agents/Pi runner: retry silent `stopReason=error` turns with no output when no side effects ran, so non-frontier providers that briefly return empty error turns get another chance instead of ending the session early. (#68310) Thanks @Chased1k.
|
||||
- Plugins/memory: preserve the active memory capability when read-only snapshot plugin loads run, so status and provider discovery paths no longer wipe memory public artifacts. (#69219) Thanks @zeroaltitude.
|
||||
|
||||
@@ -259,6 +259,7 @@ curl "https://api.telegram.org/bot<bot_token>/getUpdates"
|
||||
- Group sessions are isolated by group ID. Forum topics append `:topic:<threadId>` to keep topics isolated.
|
||||
- DM messages can carry `message_thread_id`; OpenClaw routes them with thread-aware session keys and preserves thread ID for replies.
|
||||
- Long polling uses grammY runner with per-chat/per-thread sequencing. Overall runner sink concurrency uses `agents.defaults.maxConcurrent`.
|
||||
- Long-polling watchdog restarts trigger after 120 seconds without completed `getUpdates` liveness by default. Increase `channels.telegram.pollingStallThresholdMs` only if your deployment still sees false polling-stall restarts during long-running work. The value is in milliseconds and is allowed from `30000` to `600000`; per-account overrides are supported.
|
||||
- Telegram Bot API has no read-receipt support (`sendReadReceipts` does not apply).
|
||||
|
||||
## Feature reference
|
||||
|
||||
@@ -53,6 +53,24 @@ describe("telegram custom commands schema", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("accepts pollingStallThresholdMs overrides per account", () => {
|
||||
const res = TelegramConfigSchema.safeParse({
|
||||
pollingStallThresholdMs: 120_000,
|
||||
accounts: { ops: { pollingStallThresholdMs: 180_000 } },
|
||||
});
|
||||
|
||||
expect(res.success).toBe(true);
|
||||
if (res.success) {
|
||||
expect(res.data.pollingStallThresholdMs).toBe(120_000);
|
||||
expect(res.data.accounts?.ops?.pollingStallThresholdMs).toBe(180_000);
|
||||
}
|
||||
});
|
||||
|
||||
it("rejects pollingStallThresholdMs outside the watchdog bounds", () => {
|
||||
expectTelegramConfigIssue({ pollingStallThresholdMs: 29_999 }, "pollingStallThresholdMs");
|
||||
expectTelegramConfigIssue({ pollingStallThresholdMs: 600_001 }, "pollingStallThresholdMs");
|
||||
});
|
||||
|
||||
it("accepts textChunkLimit", () => {
|
||||
const res = TelegramConfigSchema.safeParse({
|
||||
enabled: true,
|
||||
|
||||
@@ -89,6 +89,10 @@ export const telegramChannelConfigUiHints = {
|
||||
label: "Telegram API Timeout (seconds)",
|
||||
help: "Max seconds before Telegram API requests are aborted (default: 500 per grammY).",
|
||||
},
|
||||
pollingStallThresholdMs: {
|
||||
label: "Telegram Polling Stall Threshold (ms)",
|
||||
help: "Milliseconds without completed Telegram getUpdates liveness before the polling watchdog restarts the polling runner. Default: 120000.",
|
||||
},
|
||||
silentErrorReplies: {
|
||||
label: "Telegram Silent Error Replies",
|
||||
help: "When true, Telegram bot replies marked as errors are sent silently (no notification sound). Default: false.",
|
||||
|
||||
@@ -585,7 +585,7 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
|
||||
await firstCycle.waitForRunStart();
|
||||
|
||||
vi.advanceTimersByTime(120_000);
|
||||
vi.advanceTimersByTime(150_000);
|
||||
await secondCycle.waitForRunStart();
|
||||
await monitor;
|
||||
|
||||
@@ -728,8 +728,8 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
|
||||
await firstCycle.waitForRunStart();
|
||||
|
||||
// Advance time past the stall threshold (90s) + watchdog interval (30s)
|
||||
vi.advanceTimersByTime(120_000);
|
||||
// Advance time past the stall threshold (120s) + watchdog interval (30s)
|
||||
vi.advanceTimersByTime(150_000);
|
||||
await secondCycle.waitForRunStart();
|
||||
await monitor;
|
||||
|
||||
@@ -738,6 +738,31 @@ describe("monitorTelegramProvider (grammY)", () => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("uses configured Telegram polling stall threshold", async () => {
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
const abort = new AbortController();
|
||||
const firstCycle = mockRunOnceWithStalledPollingRunner();
|
||||
const secondCycle = mockRunOnceAndAbort(abort);
|
||||
|
||||
const monitor = monitorTelegramProvider({
|
||||
token: "tok",
|
||||
abortSignal: abort.signal,
|
||||
config: {
|
||||
agents: { defaults: { maxConcurrent: 2 } },
|
||||
channels: { telegram: { pollingStallThresholdMs: 30_000 } },
|
||||
},
|
||||
});
|
||||
await firstCycle.waitForRunStart();
|
||||
|
||||
vi.advanceTimersByTime(60_000);
|
||||
await secondCycle.waitForRunStart();
|
||||
await monitor;
|
||||
|
||||
expect(firstCycle.stop.mock.calls.length).toBeGreaterThanOrEqual(1);
|
||||
expectRecoverableRetryState(2);
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("confirms persisted offset with Telegram before starting runner", async () => {
|
||||
const { order } = await runMonitorAndCaptureStartupOrder({
|
||||
persistedOffset: 549076203,
|
||||
|
||||
@@ -227,6 +227,7 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
|
||||
log,
|
||||
telegramTransport,
|
||||
createTelegramTransport: createTelegramTransportForPolling,
|
||||
stallThresholdMs: account.config.pollingStallThresholdMs,
|
||||
setStatus: opts.setStatus,
|
||||
});
|
||||
await pollingSession.runUntilAbort();
|
||||
|
||||
@@ -51,7 +51,7 @@ function makeBot() {
|
||||
|
||||
function installPollingStallWatchdogHarness(
|
||||
dateNowSequence: readonly number[] = [0, 0],
|
||||
fallbackDateNow = 120_001,
|
||||
fallbackDateNow = 150_001,
|
||||
) {
|
||||
let watchdog: (() => void) | undefined;
|
||||
const setIntervalSpy = vi.spyOn(globalThis, "setInterval").mockImplementation((fn) => {
|
||||
@@ -141,6 +141,7 @@ function createPollingSession(params: {
|
||||
log?: (message: string) => void;
|
||||
telegramTransport?: ReturnType<typeof makeTelegramTransport>;
|
||||
createTelegramTransport?: () => ReturnType<typeof makeTelegramTransport>;
|
||||
stallThresholdMs?: number;
|
||||
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
|
||||
}) {
|
||||
return new TelegramPollingSession({
|
||||
@@ -155,6 +156,7 @@ function createPollingSession(params: {
|
||||
persistUpdateId: async () => undefined,
|
||||
log: params.log ?? (() => undefined),
|
||||
telegramTransport: params.telegramTransport,
|
||||
stallThresholdMs: params.stallThresholdMs,
|
||||
setStatus: params.setStatus,
|
||||
...(params.createTelegramTransport
|
||||
? { createTelegramTransport: params.createTelegramTransport }
|
||||
@@ -393,6 +395,38 @@ describe("TelegramPollingSession", () => {
|
||||
}
|
||||
});
|
||||
|
||||
it("honors a custom polling stall threshold", async () => {
|
||||
const abort = new AbortController();
|
||||
const botStop = vi.fn(async () => undefined);
|
||||
const runnerStop = vi.fn(async () => undefined);
|
||||
mockBotCapturingApiMiddleware(botStop);
|
||||
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
|
||||
const watchdogHarness = installPollingStallWatchdogHarness([0, 0], 150_001);
|
||||
|
||||
const log = vi.fn();
|
||||
const session = createPollingSession({
|
||||
abortSignal: abort.signal,
|
||||
log,
|
||||
stallThresholdMs: 180_000,
|
||||
});
|
||||
|
||||
try {
|
||||
const runPromise = session.runUntilAbort();
|
||||
const watchdog = await watchdogHarness.waitForWatchdog();
|
||||
watchdog?.();
|
||||
|
||||
expect(runnerStop).not.toHaveBeenCalled();
|
||||
expect(botStop).not.toHaveBeenCalled();
|
||||
expect(log).not.toHaveBeenCalledWith(expect.stringContaining("Polling stall detected"));
|
||||
|
||||
abort.abort();
|
||||
resolveFirstTask();
|
||||
await runPromise;
|
||||
} finally {
|
||||
watchdogHarness.restore();
|
||||
}
|
||||
});
|
||||
|
||||
it("rebuilds the transport after a stalled polling cycle", async () => {
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
const abort = new AbortController();
|
||||
@@ -662,8 +696,8 @@ describe("TelegramPollingSession", () => {
|
||||
const resolveFirstTask = mockLongRunningPollingCycle(runnerStop);
|
||||
|
||||
// t=0: lastGetUpdatesAt and lastApiActivityAt initialized
|
||||
// t=120_001: watchdog fires (getUpdates stale for 120s)
|
||||
// But right before watchdog, a sendMessage succeeded at t=120_000
|
||||
// t=150_001: watchdog fires (getUpdates stale for 150s)
|
||||
// But right before watchdog, a sendMessage succeeds at t=150_001
|
||||
// All subsequent Date.now calls return the same value, giving apiIdle = 0.
|
||||
const watchdogHarness = installPollingStallWatchdogHarness();
|
||||
|
||||
@@ -789,7 +823,7 @@ describe("TelegramPollingSession", () => {
|
||||
);
|
||||
const sendPromise = apiMiddleware(slowPrev, "sendMessage", { chat_id: 123, text: "hello" });
|
||||
|
||||
// The in-flight send started at t=1 and is still stuck at t=120_001.
|
||||
// The in-flight send started at t=1 and is still stuck at t=150_001.
|
||||
// That is older than the watchdog threshold, so restart should proceed.
|
||||
watchdog?.();
|
||||
|
||||
|
||||
@@ -22,7 +22,9 @@ const TELEGRAM_POLL_RESTART_POLICY = {
|
||||
jitter: 0.25,
|
||||
};
|
||||
|
||||
const POLL_STALL_THRESHOLD_MS = 90_000;
|
||||
const DEFAULT_POLL_STALL_THRESHOLD_MS = 120_000;
|
||||
const MIN_POLL_STALL_THRESHOLD_MS = 30_000;
|
||||
const MAX_POLL_STALL_THRESHOLD_MS = 600_000;
|
||||
const POLL_WATCHDOG_INTERVAL_MS = 30_000;
|
||||
const POLL_STOP_GRACE_MS = 15_000;
|
||||
const CONFIRM_PERSISTED_OFFSET_TIMEOUT_MS = 10_000;
|
||||
@@ -50,6 +52,16 @@ const waitForGracefulStop = async (stop: () => Promise<void>) => {
|
||||
const telegramApiTimeoutSignal = (timeoutMs: number): TelegramApiAbortSignal =>
|
||||
AbortSignal.timeout(timeoutMs) as unknown as TelegramApiAbortSignal;
|
||||
|
||||
const resolvePollingStallThresholdMs = (value: number | undefined): number => {
|
||||
if (typeof value !== "number" || !Number.isFinite(value)) {
|
||||
return DEFAULT_POLL_STALL_THRESHOLD_MS;
|
||||
}
|
||||
return Math.min(
|
||||
MAX_POLL_STALL_THRESHOLD_MS,
|
||||
Math.max(MIN_POLL_STALL_THRESHOLD_MS, Math.floor(value)),
|
||||
);
|
||||
};
|
||||
|
||||
type TelegramPollingSessionOpts = {
|
||||
token: string;
|
||||
config: Parameters<typeof createTelegramBot>[0]["config"];
|
||||
@@ -65,6 +77,8 @@ type TelegramPollingSessionOpts = {
|
||||
telegramTransport?: TelegramTransport;
|
||||
/** Rebuild Telegram transport after stall/network recovery when marked dirty. */
|
||||
createTelegramTransport?: () => TelegramTransport;
|
||||
/** Stall detection threshold in ms. Defaults to 120_000 (2 min). */
|
||||
stallThresholdMs?: number;
|
||||
setStatus?: (patch: Omit<ChannelAccountSnapshot, "accountId">) => void;
|
||||
};
|
||||
|
||||
@@ -76,6 +90,7 @@ export class TelegramPollingSession {
|
||||
#activeFetchAbort: AbortController | undefined;
|
||||
#transportState: TelegramPollingTransportState;
|
||||
#status: ReturnType<typeof createTelegramPollingStatusPublisher>;
|
||||
#stallThresholdMs: number;
|
||||
|
||||
constructor(private readonly opts: TelegramPollingSessionOpts) {
|
||||
this.#transportState = new TelegramPollingTransportState({
|
||||
@@ -84,6 +99,7 @@ export class TelegramPollingSession {
|
||||
createTelegramTransport: opts.createTelegramTransport,
|
||||
});
|
||||
this.#status = createTelegramPollingStatusPublisher(opts.setStatus);
|
||||
this.#stallThresholdMs = resolvePollingStallThresholdMs(opts.stallThresholdMs);
|
||||
}
|
||||
|
||||
get activeRunner() {
|
||||
@@ -300,7 +316,7 @@ export class TelegramPollingSession {
|
||||
}
|
||||
|
||||
const stall = liveness.detectStall({
|
||||
thresholdMs: POLL_STALL_THRESHOLD_MS,
|
||||
thresholdMs: this.#stallThresholdMs,
|
||||
runnerIsRunning: runner.isRunning(),
|
||||
});
|
||||
if (stall) {
|
||||
|
||||
@@ -152,6 +152,8 @@ export type TelegramAccountConfig = {
|
||||
mediaMaxMb?: number;
|
||||
/** Telegram API client timeout in seconds (grammY ApiClientOptions). */
|
||||
timeoutSeconds?: number;
|
||||
/** Telegram polling watchdog threshold in milliseconds. Default: 120000. */
|
||||
pollingStallThresholdMs?: number;
|
||||
/** Retry policy for outbound Telegram API calls. */
|
||||
retry?: OutboundRetryConfig;
|
||||
/** Network transport overrides for Telegram. */
|
||||
|
||||
@@ -240,6 +240,7 @@ export const TelegramAccountSchemaBase = z
|
||||
streaming: ChannelPreviewStreamingConfigSchema.optional(),
|
||||
mediaMaxMb: z.number().positive().optional(),
|
||||
timeoutSeconds: z.number().int().positive().optional(),
|
||||
pollingStallThresholdMs: z.number().int().min(30_000).max(600_000).optional(),
|
||||
retry: RetryConfigSchema,
|
||||
network: z
|
||||
.object({
|
||||
|
||||
Reference in New Issue
Block a user