fix(telegram): guard duplicate polling leases

This commit is contained in:
Peter Steinberger
2026-04-25 09:38:12 +01:00
parent c88c2328c2
commit 3169886a21
8 changed files with 473 additions and 60 deletions

View File

@@ -15,6 +15,7 @@ Docs: https://docs.openclaw.ai
### Fixes
- Browser/Playwright: ignore benign already-handled route races during guarded navigation so browser-page tasks no longer fail when Playwright tears down a route mid-flight. (#68708) Thanks @Steady-ai.
- Telegram: prevent duplicate in-process long pollers for the same bot token and add clearer `getUpdates` conflict diagnostics for external duplicate pollers. Fixes #56230.
- Browser/Linux: detect Chromium-based installs under `/opt/google`, `/opt/brave.com`, `/usr/lib/chromium`, and `/usr/lib/chromium-browser` before asking users to set `browser.executablePath`. (#48563) Thanks @lupuletic.
- Sessions/browser: close tracked browser tabs when idle, daily, `/new`, or `/reset` session rollover archives the previous transcript, preventing tabs from leaking past the old session. Thanks @jakozloski.
- Sessions/forking: fall back to transcript-estimated parent token counts when cached totals are stale or missing, so oversized thread forks start fresh instead of cloning the full parent transcript. Thanks @jalehman.

View File

@@ -257,6 +257,7 @@ curl "https://api.telegram.org/bot<bot_token>/getUpdates"
- Group sessions are isolated by group ID. Forum topics append `:topic:<threadId>` to keep topics isolated.
- DM messages can carry `message_thread_id`; OpenClaw routes them with thread-aware session keys and preserves thread ID for replies.
- Long polling uses grammY runner with per-chat/per-thread sequencing. Overall runner sink concurrency uses `agents.defaults.maxConcurrent`.
- Long polling is guarded inside each gateway process so only one active poller can use a bot token at a time. If you still see `getUpdates` 409 conflicts, another OpenClaw gateway, script, or external poller is likely using the same token.
- Long-polling watchdog restarts trigger after 120 seconds without completed `getUpdates` liveness by default. Increase `channels.telegram.pollingStallThresholdMs` only if your deployment still sees false polling-stall restarts during long-running work. The value is in milliseconds and is allowed from `30000` to `600000`; per-account overrides are supported.
- Telegram Bot API has no read-receipt support (`sendReadReceipts` does not apply).

View File

@@ -2,6 +2,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite
type MonitorTelegramOpts = import("./monitor.js").MonitorTelegramOpts;
let monitorTelegramProvider: typeof import("./monitor.js").monitorTelegramProvider;
let resetTelegramPollingLeasesForTests: typeof import("./polling-lease.js").resetTelegramPollingLeasesForTests;
type MockCtx = {
message: {
@@ -337,9 +338,11 @@ describe("monitorTelegramProvider (grammY)", () => {
beforeAll(async () => {
({ monitorTelegramProvider } = await import("./monitor.js"));
({ resetTelegramPollingLeasesForTests } = await import("./polling-lease.js"));
});
beforeEach(() => {
resetTelegramPollingLeasesForTests();
loadConfig.mockReturnValue({
agents: { defaults: { maxConcurrent: 2 } },
channels: { telegram: {} },
@@ -519,6 +522,69 @@ describe("monitorTelegramProvider (grammY)", () => {
expect(createdBotStops[0]).toHaveBeenCalledTimes(1);
});
it("refuses a concurrent same-token polling monitor before starting another runner", async () => {
const abort = new AbortController();
const firstCycle = mockRunOnceWithStalledPollingRunner();
const monitor = monitorTelegramProvider({ token: "tok", abortSignal: abort.signal });
await firstCycle.waitForRunStart();
await expect(monitorTelegramProvider({ token: "tok" })).rejects.toThrow(
"refusing duplicate poller",
);
expect(runSpy).toHaveBeenCalledTimes(1);
abort.abort();
await monitor;
});
it("allows concurrent polling monitors for different bot tokens", async () => {
const firstAbort = new AbortController();
const secondAbort = new AbortController();
const firstCycle = mockRunOnceWithStalledPollingRunner();
const secondCycle = mockRunOnceWithStalledPollingRunner();
const firstMonitor = monitorTelegramProvider({
token: "tok-a",
abortSignal: firstAbort.signal,
});
await firstCycle.waitForRunStart();
const secondMonitor = monitorTelegramProvider({
token: "tok-b",
abortSignal: secondAbort.signal,
});
await secondCycle.waitForRunStart();
expect(runSpy).toHaveBeenCalledTimes(2);
firstAbort.abort();
secondAbort.abort();
await Promise.all([firstMonitor, secondMonitor]);
});
it("starts a same-token replacement after the previous monitor releases", async () => {
const firstAbort = new AbortController();
const secondAbort = new AbortController();
const firstCycle = mockRunOnceWithStalledPollingRunner();
const firstMonitor = monitorTelegramProvider({
token: "tok",
abortSignal: firstAbort.signal,
});
await firstCycle.waitForRunStart();
firstAbort.abort();
const secondCycle = mockRunOnceAndAbort(secondAbort);
const secondMonitor = monitorTelegramProvider({
token: "tok",
abortSignal: secondAbort.signal,
});
await secondCycle.waitForRunStart();
await Promise.all([firstMonitor, secondMonitor]);
expect(runSpy).toHaveBeenCalledTimes(2);
});
it("clears bounded cleanup timers after a clean stop", async () => {
vi.useFakeTimers();
try {

View File

@@ -17,6 +17,7 @@ import {
isRecoverableTelegramNetworkError,
isTelegramPollingNetworkError,
} from "./network-errors.js";
import { acquireTelegramPollingLease } from "./polling-lease.js";
import { makeProxyFetch } from "./proxy.js";
export type { MonitorTelegramOpts } from "./monitor.types.js";
@@ -161,76 +162,96 @@ export async function monitorTelegramProvider(opts: MonitorTelegramOpts = {}) {
const { TelegramPollingSession, readTelegramUpdateOffset, writeTelegramUpdateOffset } =
await loadTelegramMonitorPollingRuntime();
if (isTelegramExecApprovalHandlerConfigured({ cfg, accountId: account.accountId })) {
registerChannelRuntimeContext({
channelRuntime: opts.channelRuntime,
channelId: "telegram",
accountId: account.accountId,
capability: CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY,
context: { token },
abortSignal: opts.abortSignal,
});
}
const persistedOffsetRaw = await readTelegramUpdateOffset({
const pollingLease = await acquireTelegramPollingLease({
token,
accountId: account.accountId,
botToken: token,
abortSignal: opts.abortSignal,
});
let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw);
if (persistedOffsetRaw !== null && lastUpdateId === null) {
if (pollingLease.waitedForPrevious) {
log(
`[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`,
`[telegram][diag] waited for previous polling session for bot token ${pollingLease.tokenFingerprint} before starting account "${account.accountId}".`,
);
}
if (pollingLease.replacedStoppingPrevious) {
log(
`[telegram][diag] previous polling session for bot token ${pollingLease.tokenFingerprint} did not stop within the lease wait; starting a replacement for account "${account.accountId}".`,
);
}
const persistUpdateId = async (updateId: number) => {
const normalizedUpdateId = normalizePersistedUpdateId(updateId);
if (normalizedUpdateId === null) {
log(`[telegram] Ignoring invalid update_id value: ${String(updateId)}`);
return;
}
if (lastUpdateId !== null && normalizedUpdateId <= lastUpdateId) {
return;
}
lastUpdateId = normalizedUpdateId;
try {
await writeTelegramUpdateOffset({
try {
if (isTelegramExecApprovalHandlerConfigured({ cfg, accountId: account.accountId })) {
registerChannelRuntimeContext({
channelRuntime: opts.channelRuntime,
channelId: "telegram",
accountId: account.accountId,
updateId: normalizedUpdateId,
botToken: token,
capability: CHANNEL_APPROVAL_NATIVE_RUNTIME_CONTEXT_CAPABILITY,
context: { token },
abortSignal: opts.abortSignal,
});
} catch (err) {
(opts.runtime?.error ?? console.error)(
`telegram: failed to persist update offset: ${String(err)}`,
}
const persistedOffsetRaw = await readTelegramUpdateOffset({
accountId: account.accountId,
botToken: token,
});
let lastUpdateId = normalizePersistedUpdateId(persistedOffsetRaw);
if (persistedOffsetRaw !== null && lastUpdateId === null) {
log(
`[telegram] Ignoring invalid persisted update offset (${String(persistedOffsetRaw)}); starting without offset confirmation.`,
);
}
};
// Preserve sticky IPv4 fallback state across clean/conflict restarts.
// Dirty polling cycles rebuild transport inside TelegramPollingSession.
const createTelegramTransportForPolling = () =>
resolveTelegramTransport(proxyFetch, {
network: account.config.network,
const persistUpdateId = async (updateId: number) => {
const normalizedUpdateId = normalizePersistedUpdateId(updateId);
if (normalizedUpdateId === null) {
log(`[telegram] Ignoring invalid update_id value: ${String(updateId)}`);
return;
}
if (lastUpdateId !== null && normalizedUpdateId <= lastUpdateId) {
return;
}
lastUpdateId = normalizedUpdateId;
try {
await writeTelegramUpdateOffset({
accountId: account.accountId,
updateId: normalizedUpdateId,
botToken: token,
});
} catch (err) {
(opts.runtime?.error ?? console.error)(
`telegram: failed to persist update offset: ${String(err)}`,
);
}
};
// Preserve sticky IPv4 fallback state across clean/conflict restarts.
// Dirty polling cycles rebuild transport inside TelegramPollingSession.
const createTelegramTransportForPolling = () =>
resolveTelegramTransport(proxyFetch, {
network: account.config.network,
});
const telegramTransport = createTelegramTransportForPolling();
pollingSession = new TelegramPollingSession({
token,
config: cfg,
accountId: account.accountId,
runtime: opts.runtime,
proxyFetch,
abortSignal: opts.abortSignal,
runnerOptions: createTelegramRunnerOptions(cfg),
getLastUpdateId: () => lastUpdateId,
persistUpdateId,
log,
telegramTransport,
createTelegramTransport: createTelegramTransportForPolling,
stallThresholdMs: account.config.pollingStallThresholdMs,
setStatus: opts.setStatus,
});
const telegramTransport = createTelegramTransportForPolling();
pollingSession = new TelegramPollingSession({
token,
config: cfg,
accountId: account.accountId,
runtime: opts.runtime,
proxyFetch,
abortSignal: opts.abortSignal,
runnerOptions: createTelegramRunnerOptions(cfg),
getLastUpdateId: () => lastUpdateId,
persistUpdateId,
log,
telegramTransport,
createTelegramTransport: createTelegramTransportForPolling,
stallThresholdMs: account.config.pollingStallThresholdMs,
setStatus: opts.setStatus,
});
await pollingSession.runUntilAbort();
await pollingSession.runUntilAbort();
} finally {
pollingLease.release();
}
} finally {
unregisterHandler();
}

View File

@@ -0,0 +1,102 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import {
acquireTelegramPollingLease,
resetTelegramPollingLeasesForTests,
} from "./polling-lease.js";
describe("Telegram polling lease", () => {
beforeEach(() => {
resetTelegramPollingLeasesForTests();
});
it("refuses an active duplicate poller for the same bot token", async () => {
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
});
await expect(
acquireTelegramPollingLease({
token: "123:abc",
accountId: "ops",
}),
).rejects.toThrow('refusing duplicate poller for account "ops"');
first.release();
});
it("allows concurrent pollers for different bot tokens", async () => {
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
});
const second = await acquireTelegramPollingLease({
token: "456:def",
accountId: "ops",
});
expect(first.tokenFingerprint).not.toBe(second.tokenFingerprint);
first.release();
second.release();
});
it("waits for an aborting same-token poller before acquiring", async () => {
const oldAbort = new AbortController();
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
abortSignal: oldAbort.signal,
});
oldAbort.abort();
const acquire = acquireTelegramPollingLease({
token: "123:abc",
accountId: "default",
waitMs: 1_000,
});
await Promise.resolve();
first.release();
const second = await acquire;
expect(second.waitedForPrevious).toBe(true);
expect(second.replacedStoppingPrevious).toBe(false);
second.release();
});
it("does not let stale release clear a replacement lease", async () => {
vi.useFakeTimers();
try {
const oldAbort = new AbortController();
const first = await acquireTelegramPollingLease({
token: "123:abc",
accountId: "old",
abortSignal: oldAbort.signal,
});
oldAbort.abort();
const acquireReplacement = acquireTelegramPollingLease({
token: "123:abc",
accountId: "new",
waitMs: 10,
});
await vi.advanceTimersByTimeAsync(10);
const replacement = await acquireReplacement;
expect(replacement.replacedStoppingPrevious).toBe(true);
first.release();
await expect(
acquireTelegramPollingLease({
token: "123:abc",
accountId: "third",
}),
).rejects.toThrow('account "new"');
replacement.release();
} finally {
vi.useRealTimers();
}
});
});

View File

@@ -0,0 +1,193 @@
import { createHash } from "node:crypto";
const TELEGRAM_POLLING_LEASES_KEY = Symbol.for("openclaw.telegram.pollingLeases");
const DEFAULT_TELEGRAM_POLLING_LEASE_WAIT_MS = 5_000;
type TelegramPollingLeaseEntry = {
accountId: string;
abortSignal?: AbortSignal;
done: Promise<void>;
owner: symbol;
resolveDone: () => void;
startedAt: number;
};
type TelegramPollingLeaseRegistry = Map<string, TelegramPollingLeaseEntry>;
export type TelegramPollingLease = {
tokenFingerprint: string;
waitedForPrevious: boolean;
replacedStoppingPrevious: boolean;
release: () => void;
};
type AcquireTelegramPollingLeaseOpts = {
token: string;
accountId: string;
abortSignal?: AbortSignal;
waitMs?: number;
};
type WaitForPreviousResult = "released" | "timeout" | "aborted";
function pollingLeaseRegistry(): TelegramPollingLeaseRegistry {
const proc = process as NodeJS.Process & {
[TELEGRAM_POLLING_LEASES_KEY]?: TelegramPollingLeaseRegistry;
};
proc[TELEGRAM_POLLING_LEASES_KEY] ??= new Map();
return proc[TELEGRAM_POLLING_LEASES_KEY];
}
function tokenFingerprint(token: string): string {
return createHash("sha256").update(token).digest("hex").slice(0, 16);
}
function createDuplicatePollingError(params: {
accountId: string;
existing: TelegramPollingLeaseEntry;
tokenFingerprint: string;
}): Error {
const ageMs = Math.max(0, Date.now() - params.existing.startedAt);
const ageSeconds = Math.round(ageMs / 1000);
return new Error(
`Telegram polling already active for bot token ${params.tokenFingerprint} on account "${params.existing.accountId}" (${ageSeconds}s old); refusing duplicate poller for account "${params.accountId}". Stop the existing OpenClaw gateway/poller or use a different bot token.`,
);
}
async function waitForPreviousRelease(params: {
done: Promise<void>;
signal?: AbortSignal;
waitMs: number;
}): Promise<WaitForPreviousResult> {
if (params.signal?.aborted) {
return "aborted";
}
let timer: ReturnType<typeof setTimeout> | undefined;
let abortListener: (() => void) | undefined;
try {
const timeout = new Promise<"timeout">((resolve) => {
timer = setTimeout(() => resolve("timeout"), Math.max(0, params.waitMs));
timer.unref?.();
});
const aborted = new Promise<"aborted">((resolve) => {
abortListener = () => resolve("aborted");
params.signal?.addEventListener("abort", abortListener, { once: true });
});
const released = params.done.then(() => "released" as const);
return await Promise.race([released, timeout, aborted]);
} finally {
if (timer) {
clearTimeout(timer);
}
if (abortListener) {
params.signal?.removeEventListener("abort", abortListener);
}
}
}
function createLease(params: {
accountId: string;
abortSignal?: AbortSignal;
registry: TelegramPollingLeaseRegistry;
tokenFingerprint: string;
waitedForPrevious: boolean;
replacedStoppingPrevious: boolean;
}): TelegramPollingLease {
let resolveDone!: () => void;
const done = new Promise<void>((resolve) => {
resolveDone = resolve;
});
const owner = Symbol(`telegram-polling:${params.accountId}`);
const entry: TelegramPollingLeaseEntry = {
accountId: params.accountId,
abortSignal: params.abortSignal,
done,
owner,
resolveDone,
startedAt: Date.now(),
};
params.registry.set(params.tokenFingerprint, entry);
let released = false;
return {
tokenFingerprint: params.tokenFingerprint,
waitedForPrevious: params.waitedForPrevious,
replacedStoppingPrevious: params.replacedStoppingPrevious,
release: () => {
if (released) {
return;
}
released = true;
const current = params.registry.get(params.tokenFingerprint);
if (current?.owner === owner) {
params.registry.delete(params.tokenFingerprint);
}
resolveDone();
},
};
}
export async function acquireTelegramPollingLease(
opts: AcquireTelegramPollingLeaseOpts,
): Promise<TelegramPollingLease> {
const registry = pollingLeaseRegistry();
const fingerprint = tokenFingerprint(opts.token);
const waitMs = opts.waitMs ?? DEFAULT_TELEGRAM_POLLING_LEASE_WAIT_MS;
let waitedForPrevious = false;
for (;;) {
const existing = registry.get(fingerprint);
if (!existing) {
return createLease({
accountId: opts.accountId,
abortSignal: opts.abortSignal,
registry,
tokenFingerprint: fingerprint,
waitedForPrevious,
replacedStoppingPrevious: false,
});
}
if (!existing.abortSignal?.aborted) {
throw createDuplicatePollingError({
accountId: opts.accountId,
existing,
tokenFingerprint: fingerprint,
});
}
waitedForPrevious = true;
const waitResult = await waitForPreviousRelease({
done: existing.done,
signal: opts.abortSignal,
waitMs,
});
if (waitResult === "aborted") {
throw new Error(
`Telegram polling start aborted while waiting for previous poller for bot token ${fingerprint} to stop.`,
);
}
const current = registry.get(fingerprint);
if (current !== existing) {
continue;
}
if (waitResult === "released") {
continue;
}
return createLease({
accountId: opts.accountId,
abortSignal: opts.abortSignal,
registry,
tokenFingerprint: fingerprint,
waitedForPrevious,
replacedStoppingPrevious: true,
});
}
}
export function resetTelegramPollingLeasesForTests(): void {
pollingLeaseRegistry().clear();
}

View File

@@ -953,6 +953,32 @@ describe("TelegramPollingSession", () => {
expect(transport2.close).toHaveBeenCalledTimes(1);
});
it("logs an actionable duplicate-poller hint for getUpdates conflicts", async () => {
const abort = new AbortController();
const log = vi.fn();
const conflictError = Object.assign(
new Error("Conflict: terminated by other getUpdates request"),
{
error_code: 409,
method: "getUpdates",
},
);
createTelegramBotMock.mockReturnValueOnce(makeBot()).mockReturnValueOnce(makeBot());
isRecoverableTelegramNetworkErrorMock.mockReturnValue(false);
mockRestartAfterPollingError(conflictError, abort);
const session = createPollingSession({
abortSignal: abort.signal,
log,
});
await session.runUntilAbort();
expect(log).toHaveBeenCalledWith(
expect.stringContaining("Another OpenClaw gateway, script, or Telegram poller"),
);
});
it("closes the transport once when runUntilAbort exits normally", async () => {
const abort = new AbortController();
const transport = makeTelegramTransport();

View File

@@ -381,11 +381,14 @@ export class TelegramPollingSession {
}
const reason = isConflict ? "getUpdates conflict" : "network error";
const errMsg = formatErrorMessage(err);
const conflictHint = isConflict
? " Another OpenClaw gateway, script, or Telegram poller may be using this bot token; stop the duplicate poller or switch this account to webhook mode."
: "";
this.opts.log(
`[telegram][diag] polling cycle error reason=${reason} ${liveness.formatDiagnosticFields("lastGetUpdatesError")} err=${errMsg}`,
`[telegram][diag] polling cycle error reason=${reason} ${liveness.formatDiagnosticFields("lastGetUpdatesError")} err=${errMsg}${conflictHint}`,
);
const shouldRestart = await this.#waitBeforeRestart(
(delay) => `Telegram ${reason}: ${errMsg}; retrying in ${delay}.`,
(delay) => `Telegram ${reason}: ${errMsg};${conflictHint} retrying in ${delay}.`,
);
return shouldRestart ? "continue" : "exit";
} finally {